DEV Community

Roberto de Vargas Neto
Roberto de Vargas Neto

Posted on • Edited on • Originally published at dev.to

Integrador de Market Data: Consumindo Dados Reais com Python, MongoDB e Kafka

Olá, pessoal!

Continuando a série My Broker B3, hoje vamos construir o primeiro microserviço do ecossistema: o trading-broker-market-data.

Este serviço tem uma responsabilidade simples e bem definida: buscar cotações reais de ativos brasileiros na API Brapi e distribuí-las para o ecossistema — persistindo o histórico no MongoDB e publicando os preços atualizados no Kafka para que outros serviços consumam.


🏗️ O Papel deste Serviço no Ecossistema

[brapi.dev] ◀── HTTP ── [broker-market-data-api]
                                  │
                    ┌─────────────┴──────────────┐
                    ▼                             ▼
             MongoDB                           Kafka
        (price_history)          (trading-assets-market-data-v1)
                                               │
                                    [broker-asset-api] (consumer)
Enter fullscreen mode Exit fullscreen mode

É o ponto de entrada de dados reais no sistema. Sem ele, o ecossistema opera com preços estáticos.


🎯 Foco no MVP

Neste serviço o objetivo é claro: ter cotações reais disponíveis no Kafka para que outros serviços possam reagir. Nesta fase, priorizei:

  • Ingestão agendada de 50 ativos (Blue Chips, FIIs e ETFs)
  • Persistência histórica no MongoDB para auditoria
  • Publicação de eventos no Kafka para os consumers downstream
  • Proteção contra rate limit da API Brapi (plano gratuito)
  • Inicialização imediata ao subir o serviço

🛠️ Stack Tecnológica

Tecnologia Uso
Python 3.12 Runtime do serviço
requests Cliente HTTP para a API Brapi
pymongo Persistência no MongoDB
confluent-kafka Producer Kafka
schedule Agendamento periódico
python-dotenv Variáveis de ambiente

🏗️ A Implementação

O Loop de Ingestão

O coração do serviço é a função run_ingestion(), que itera sobre a watchlist, busca o preço de cada ativo e distribui os dados:

def run_ingestion():
    """Fetches market data from Brapi, persists to MongoDB and publishes to Kafka."""
    logger.info("Market data ingestion job started")

    token = os.getenv("BRAPI_TOKEN")
    if not token:
        logger.error("BRAPI_TOKEN not set. Aborting.")
        return

    # Producer created per run to avoid crash on import if Kafka is unavailable
    producer = Producer({
        'bootstrap.servers': os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        'client.id': 'broker-market-data-api'
    })

    mongo = MongoRepository()
    success_count = 0
    error_count = 0

    for ticker in WATCHLIST:
        try:
            url = f"https://brapi.dev/api/quote/{ticker}?token={token}"
            response = requests.get(url, timeout=10)
            response.raise_for_status()

            result = response.json()["results"][0]
            payload = {
                "ticker": result.get("symbol"),
                "name": result.get("longName"),
                "short_name": result.get("shortName"),
                "price": result.get("regularMarketPrice"),
                "volume": result.get("regularMarketVolume"),
                "updated_at": result.get("regularMarketTime")
            }

            # 1. Persist historical record in MongoDB
            mongo.save(payload.copy())

            # 2. Publish event to Kafka
            producer.produce(
                topic=TOPIC_NAME,
                key=ticker,
                value=json.dumps(payload).encode('utf-8'),
                callback=delivery_report
            )
            producer.poll(0)

            success_count += 1
            time.sleep(0.5)  # Rate limiting — respect Brapi free plan

        except Exception as e:
            error_count += 1
            logger.error(f"Error processing ticker {ticker}: {e}")

    producer.flush()
    logger.info(f"Ingestion completed: {success_count} succeeded, {error_count} failed")
Enter fullscreen mode Exit fullscreen mode

Decisões de design importantes:

Por que o Producer é criado dentro da função? Se estivesse no nível de módulo, um Kafka indisponível no momento do import faria o script crashar antes de executar qualquer lógica. Criando dentro da função, uma falha do Kafka não impede a execução — apenas aquela rodada falha.

Por que try/catch por ticker? Se um ativo falhar (timeout, ticker inválido), o loop continua para os próximos. O error_count no final mostra quantos falharam sem interromper o trabalho.

O time.sleep(0.5) — respeita o rate limit do plano gratuito da Brapi. Para 50 ativos, a rodada completa leva ~25 segundos.

O Agendamento

if __name__ == "__main__":
    logger.info(f"Starting — sync every {SYNC_INTERVAL_MINUTES} minutes")

    # Run immediately on startup, then on schedule
    run_ingestion()

    schedule.every(SYNC_INTERVAL_MINUTES).minutes.do(run_ingestion)

    while True:
        schedule.run_pending()
        time.sleep(1)
Enter fullscreen mode Exit fullscreen mode

Por que executar imediatamente no startup? Se o serviço reiniciar, não queremos esperar 30 minutos para ter dados no Kafka. A primeira execução é imediata, as seguintes seguem o agendamento.

O Payload Kafka

{
  "ticker": "PETR4",
  "name": "Petróleo Brasileiro S.A. - Petrobras",
  "short_name": "PETROBRAS PN",
  "price": 35.50,
  "volume": 15234567,
  "updated_at": 1714234567
}
Enter fullscreen mode Exit fullscreen mode

Tópico: trading-assets-market-data-v1
Key: símbolo do ativo (ex: PETR4) — garante que eventos do mesmo ativo vão para a mesma partição


📋 A Watchlist

50 ativos cobrindo os principais segmentos da B3:

WATCHLIST = [
    # Blue Chips
    "PETR4", "PETR3", "VALE3", "ITUB4", "BBDC4", "BBAS3", ...
    # FIIs
    "MXRF11", "HGLG11", "KNIP11", "XPLG11", ...
    # ETFs
    "BOVA11", "IVVB11"
]
Enter fullscreen mode Exit fullscreen mode

🔐 Segurança: Token Fora do Código

# .env — nunca commitado
BRAPI_TOKEN=seu_token_aqui
Enter fullscreen mode Exit fullscreen mode
# application — sem fallback, variável obrigatória
token = os.getenv("BRAPI_TOKEN")
if not token:
    logger.error("BRAPI_TOKEN not set. Aborting.")
    return
Enter fullscreen mode Exit fullscreen mode

Nunca exponha credenciais com valores default em repositórios públicos.


✅ Validando a Execução

Com o serviço rodando localmente:

  • ✅ Primeira rodada executa imediatamente no startup
  • ✅ MongoDB recebe documentos com created_at automaticamente
  • ✅ Kafka recebe mensagens com key = ticker
  • ✅ Scheduler aguarda 30 minutos para a próxima rodada

🚀 O que vem a seguir?

Com o broker-market-data-api publicando cotações no Kafka, o próximo passo é o b3-market-sync-api — o serviço Java que consome esses eventos e sincroniza os preços para o Redis, alimentando o Matching Engine com dados em tempo real.


🔎 Sobre a série

⬅️ Post Anterior: Infraestrutura como Código

➡️ Próximo Post: Sincronizando o Mercado Real: Consumindo a Brapi e Alimentando o Redis com Spring Boot

📘 Índice da Série: Guia da Série


Links:

Top comments (0)