Roberto de Vargas Neto

Market Data Integrator: Consuming Real-Time Data with Python, MongoDB, and Kafka

Hey folks!

Continuing the My Broker B3 series, today we'll build the first microservice in the ecosystem: broker-market-data-api.

This service has a simple and well-defined responsibility: fetch real quotes for Brazilian assets from the Brapi API and distribute them to the ecosystem β€” persisting the history in MongoDB and publishing updated prices to Kafka for other services to consume.


πŸ—οΈ This Service's Role in the Ecosystem

[brapi.dev] ◀── HTTP ── [broker-market-data-api]
                                  β”‚
                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                    β–Ό                             β–Ό
             MongoDB                           Kafka
        (price_history)          (trading-assets-market-data-v1)
                                               β”‚
                                    [broker-asset-api] (consumer)

It's the entry point for real data into the system. Without it, the ecosystem operates on static prices.


🎯 MVP Focus

The goal here is clear: have real quotes available in Kafka for other services to react to. In this phase, I prioritized:

  • Scheduled ingestion of 50 assets (Blue Chips, REITs and ETFs)
  • Historical persistence in MongoDB for audit
  • Kafka event publishing for downstream consumers
  • Rate limit protection for the Brapi API (free plan)
  • Immediate execution on service startup

πŸ› οΈ Tech Stack

Technology        Usage
----------        -----
Python 3.12       Service runtime
requests          HTTP client for the Brapi API
pymongo           MongoDB persistence
confluent-kafka   Kafka producer
schedule          Periodic scheduling
python-dotenv     Environment variables

πŸ—οΈ The Implementation

The Ingestion Loop

The heart of the service is the run_ingestion() function, which iterates over the watchlist, fetches each asset's price and distributes the data:

# Module-level imports (logger, MongoRepository, WATCHLIST, TOPIC_NAME
# and delivery_report are defined elsewhere in the module)
import json
import os
import time

import requests
from confluent_kafka import Producer


def run_ingestion():
    """Fetches market data from Brapi, persists to MongoDB and publishes to Kafka."""
    logger.info("Market data ingestion job started")

    token = os.getenv("BRAPI_TOKEN")
    if not token:
        logger.error("BRAPI_TOKEN not set. Aborting.")
        return

    # Producer created per run to avoid crash on import if Kafka is unavailable
    producer = Producer({
        'bootstrap.servers': os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        'client.id': 'broker-market-data-api'
    })

    mongo = MongoRepository()
    success_count = 0
    error_count = 0

    for ticker in WATCHLIST:
        try:
            url = f"https://brapi.dev/api/quote/{ticker}?token={token}"
            response = requests.get(url, timeout=10)
            response.raise_for_status()

            result = response.json()["results"][0]
            payload = {
                "ticker": result.get("symbol"),
                "name": result.get("longName"),
                "short_name": result.get("shortName"),
                "price": result.get("regularMarketPrice"),
                "volume": result.get("regularMarketVolume"),
                "updated_at": result.get("regularMarketTime")
            }

            # 1. Persist historical record in MongoDB
            mongo.save(payload.copy())

            # 2. Publish event to Kafka
            producer.produce(
                topic=TOPIC_NAME,
                key=ticker,
                value=json.dumps(payload).encode('utf-8'),
                callback=delivery_report
            )
            producer.poll(0)

            success_count += 1
            time.sleep(0.5)  # Rate limiting β€” respect Brapi free plan

        except Exception as e:
            error_count += 1
            logger.error(f"Error processing ticker {ticker}: {e}")

    producer.flush()
    logger.info(f"Ingestion completed: {success_count} succeeded, {error_count} failed")
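The delivery_report callback passed to producer.produce() isn't shown in the post. A minimal sketch of what it could look like, following the standard confluent-kafka delivery-callback signature (err, msg) — its exact body here is my assumption:

```python
import logging

logger = logging.getLogger(__name__)


def delivery_report(err, msg):
    """confluent-kafka delivery callback: err is a KafkaError (or None on
    success) and msg is the produced Message."""
    if err is not None:
        logger.error(f"Delivery failed for key {msg.key()}: {err}")
    else:
        logger.info(f"Delivered to {msg.topic()} [partition {msg.partition()}]")
```

The callback fires from producer.poll() / producer.flush(), which is why the loop calls producer.poll(0) after each produce.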

Important design decisions:

Why create the Producer inside the function? If it were at module level, an unavailable Kafka at import time would crash the script before executing any logic. Creating it inside the function means a Kafka failure doesn't prevent execution β€” only that run fails.

Why a try/except per ticker? If one asset fails (timeout, invalid ticker), the loop continues with the rest. The error_count logged at the end shows how many failed without stopping the run.

The time.sleep(0.5) respects the rate limit of Brapi's free plan. For 50 assets, that alone adds ~25 seconds of deliberate pause, so a full run takes a bit longer once request latency is included.
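To see the field-mapping step in isolation, here is the same extraction applied to a mocked Brapi results[0] object (the sample values below are illustrative, not real market data):

```python
def build_payload(result: dict) -> dict:
    """Map a Brapi quote result to the event payload used in run_ingestion."""
    return {
        "ticker": result.get("symbol"),
        "name": result.get("longName"),
        "short_name": result.get("shortName"),
        "price": result.get("regularMarketPrice"),
        "volume": result.get("regularMarketVolume"),
        "updated_at": result.get("regularMarketTime"),
    }


# Mocked Brapi response fragment (illustrative values)
sample = {
    "symbol": "PETR4",
    "longName": "Petróleo Brasileiro S.A. - Petrobras",
    "shortName": "PETROBRAS PN",
    "regularMarketPrice": 35.50,
    "regularMarketVolume": 15234567,
    "regularMarketTime": 1714234567,
}

payload = build_payload(sample)
# Fields missing from the API response simply come back as None,
# which is why .get() is used instead of direct indexing.
```
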

The Scheduling

if __name__ == "__main__":
    logger.info(f"Starting β€” sync every {SYNC_INTERVAL_MINUTES} minutes")

    # Run immediately on startup, then on schedule
    run_ingestion()

    schedule.every(SYNC_INTERVAL_MINUTES).minutes.do(run_ingestion)

    while True:
        schedule.run_pending()
        time.sleep(1)

Why run immediately on startup? If the service restarts, we don't want to wait 30 minutes for data in Kafka. The first execution is immediate, subsequent ones follow the schedule.
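The constants used above (SYNC_INTERVAL_MINUTES, TOPIC_NAME) aren't shown in the post. One plausible way to define them, with environment overrides — the names of the environment variables and their defaults are my assumption (the 30-minute default matches the validation section later in the post):

```python
import os

# Assumed configuration block; in the real module, python-dotenv's
# load_dotenv() would run before these lookups so .env values are visible.
SYNC_INTERVAL_MINUTES = int(os.getenv("SYNC_INTERVAL_MINUTES", "30"))
TOPIC_NAME = os.getenv("KAFKA_TOPIC", "trading-assets-market-data-v1")
```
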

The Kafka Payload

{
  "ticker": "PETR4",
  "name": "PetrΓ³leo Brasileiro S.A. - Petrobras",
  "short_name": "PETROBRAS PN",
  "price": 35.50,
  "volume": 15234567,
  "updated_at": 1714234567
}

Topic: trading-assets-market-data-v1
Key: asset symbol (e.g. PETR4) β€” ensures events for the same asset go to the same partition
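That per-partition guarantee comes from Kafka hashing the message key to choose a partition. The following is a simplified stand-in, just to illustrate the property (this is not Kafka's actual partitioner — librdkafka uses its own consistent hash — but the invariant is the same: equal keys always map to the same partition):

```python
import hashlib


def pick_partition(key: str, num_partitions: int) -> int:
    """Illustrative deterministic key -> partition mapping. Kafka uses a
    different hash internally, but the guarantee shown here is identical:
    the same key always yields the same partition, so events for one
    ticker are consumed in order."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions
```

Because ordering is only guaranteed within a partition, keying by ticker means a consumer never sees an older PETR4 price after a newer one.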


πŸ“‹ The Watchlist

50 assets covering the main B3 segments:

WATCHLIST = [
    # Blue Chips
    "PETR4", "PETR3", "VALE3", "ITUB4", "BBDC4", "BBAS3", ...
    # REITs / FIIs
    "MXRF11", "HGLG11", "KNIP11", "XPLG11", ...
    # ETFs
    "BOVA11", "IVVB11"
]

πŸ” Security: Token Out of Code

# .env β€” never committed
BRAPI_TOKEN=your_token_here
# application β€” no fallback, required variable
token = os.getenv("BRAPI_TOKEN")
if not token:
    logger.error("BRAPI_TOKEN not set. Aborting.")
    return

Never expose credentials as default values in public repositories.


βœ… Validating the Execution

With the service running locally:

  • βœ… First run executes immediately on startup
  • βœ… MongoDB receives documents with created_at automatically
  • βœ… Kafka receives messages with key = ticker
  • βœ… Scheduler waits 30 minutes for the next round

πŸš€ What's Next?

With broker-market-data-api publishing quotes to Kafka, the next step is b3-market-sync-api β€” the Java service that consumes these events and synchronizes prices to Redis, feeding the Matching Engine with real-time data.


πŸ”Ž About the Series

⬅️ Previous Post: Infrastructure as Code

➑️ Next Post: Syncing the Real Market: Consuming Brapi and Feeding Redis with Spring Boot

πŸ“˜ Series Index: Series Roadmap

