Olá, pessoal!
Continuando a série My Broker B3, hoje vamos construir o primeiro microserviço do ecossistema: o trading-broker-market-data.
Este serviço tem uma responsabilidade simples e bem definida: buscar cotações reais de ativos brasileiros na API Brapi e distribuí-las para o ecossistema — persistindo o histórico no MongoDB e publicando os preços atualizados no Kafka para que outros serviços consumam.
🏗️ O Papel deste Serviço no Ecossistema
[brapi.dev] ◀── HTTP ── [broker-market-data-api]
│
┌─────────────┴──────────────┐
▼ ▼
MongoDB Kafka
(price_history) (trading-assets-market-data-v1)
│
[broker-asset-api] (consumer)
É o ponto de entrada de dados reais no sistema. Sem ele, o ecossistema opera com preços estáticos.
🎯 Foco no MVP
Neste serviço o objetivo é claro: ter cotações reais disponíveis no Kafka para que outros serviços possam reagir. Nesta fase, priorizei:
- Ingestão agendada de 50 ativos (Blue Chips, FIIs e ETFs)
- Persistência histórica no MongoDB para auditoria
- Publicação de eventos no Kafka para os consumers downstream
- Proteção contra rate limit da API Brapi (plano gratuito)
- Inicialização imediata ao subir o serviço
🛠️ Stack Tecnológica
| Tecnologia | Uso |
|---|---|
| Python 3.12 | Runtime do serviço |
| requests | Cliente HTTP para a API Brapi |
| pymongo | Persistência no MongoDB |
| confluent-kafka | Producer Kafka |
| schedule | Agendamento periódico |
| python-dotenv | Variáveis de ambiente |
🏗️ A Implementação
O Loop de Ingestão
O coração do serviço é a função run_ingestion(), que itera sobre a watchlist, busca o preço de cada ativo e distribui os dados:
def run_ingestion():
"""Fetches market data from Brapi, persists to MongoDB and publishes to Kafka."""
logger.info("Market data ingestion job started")
token = os.getenv("BRAPI_TOKEN")
if not token:
logger.error("BRAPI_TOKEN not set. Aborting.")
return
# Producer created per run to avoid crash on import if Kafka is unavailable
producer = Producer({
'bootstrap.servers': os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
'client.id': 'broker-market-data-api'
})
mongo = MongoRepository()
success_count = 0
error_count = 0
for ticker in WATCHLIST:
try:
url = f"https://brapi.dev/api/quote/{ticker}?token={token}"
response = requests.get(url, timeout=10)
response.raise_for_status()
result = response.json()["results"][0]
payload = {
"ticker": result.get("symbol"),
"name": result.get("longName"),
"short_name": result.get("shortName"),
"price": result.get("regularMarketPrice"),
"volume": result.get("regularMarketVolume"),
"updated_at": result.get("regularMarketTime")
}
# 1. Persist historical record in MongoDB
mongo.save(payload.copy())
# 2. Publish event to Kafka
producer.produce(
topic=TOPIC_NAME,
key=ticker,
value=json.dumps(payload).encode('utf-8'),
callback=delivery_report
)
producer.poll(0)
success_count += 1
time.sleep(0.5) # Rate limiting — respect Brapi free plan
except Exception as e:
error_count += 1
logger.error(f"Error processing ticker {ticker}: {e}")
producer.flush()
logger.info(f"Ingestion completed: {success_count} succeeded, {error_count} failed")
Decisões de design importantes:
Por que o Producer é criado dentro da função? Se estivesse no nível de módulo, um Kafka indisponível no momento do import faria o script crashar antes de executar qualquer lógica. Criando dentro da função, uma falha do Kafka não impede a execução — apenas aquela rodada falha.
Por que try/catch por ticker? Se um ativo falhar (timeout, ticker inválido), o loop continua para os próximos. O error_count no final mostra quantos falharam sem interromper o trabalho.
O time.sleep(0.5) — respeita o rate limit do plano gratuito da Brapi. Para 50 ativos, a rodada completa leva ~25 segundos.
O Agendamento
if __name__ == "__main__":
logger.info(f"Starting — sync every {SYNC_INTERVAL_MINUTES} minutes")
# Run immediately on startup, then on schedule
run_ingestion()
schedule.every(SYNC_INTERVAL_MINUTES).minutes.do(run_ingestion)
while True:
schedule.run_pending()
time.sleep(1)
Por que executar imediatamente no startup? Se o serviço reiniciar, não queremos esperar 30 minutos para ter dados no Kafka. A primeira execução é imediata, as seguintes seguem o agendamento.
O Payload Kafka
{
"ticker": "PETR4",
"name": "Petróleo Brasileiro S.A. - Petrobras",
"short_name": "PETROBRAS PN",
"price": 35.50,
"volume": 15234567,
"updated_at": 1714234567
}
Tópico: trading-assets-market-data-v1
Key: símbolo do ativo (ex: PETR4) — garante que eventos do mesmo ativo vão para a mesma partição
📋 A Watchlist
50 ativos cobrindo os principais segmentos da B3:
WATCHLIST = [
# Blue Chips
"PETR4", "PETR3", "VALE3", "ITUB4", "BBDC4", "BBAS3", ...
# FIIs
"MXRF11", "HGLG11", "KNIP11", "XPLG11", ...
# ETFs
"BOVA11", "IVVB11"
]
🔐 Segurança: Token Fora do Código
# .env — nunca commitado
BRAPI_TOKEN=seu_token_aqui
# application — sem fallback, variável obrigatória
token = os.getenv("BRAPI_TOKEN")
if not token:
logger.error("BRAPI_TOKEN not set. Aborting.")
return
Nunca exponha credenciais com valores default em repositórios públicos.
✅ Validando a Execução
Com o serviço rodando localmente:
- ✅ Primeira rodada executa imediatamente no startup
- ✅ MongoDB recebe documentos com
created_atautomaticamente - ✅ Kafka recebe mensagens com key = ticker
- ✅ Scheduler aguarda 30 minutos para a próxima rodada
🚀 O que vem a seguir?
Com o broker-market-data-api publicando cotações no Kafka, o próximo passo é o b3-market-sync-api — o serviço Java que consome esses eventos e sincroniza os preços para o Redis, alimentando o Matching Engine com dados em tempo real.
🔎 Sobre a série
⬅️ Post Anterior: Infraestrutura como Código
➡️ Próximo Post: Sincronizando o Mercado Real: Consumindo a Brapi e Alimentando o Redis com Spring Boot
📘 Índice da Série: Guia da Série
Links:
Top comments (0)