DEV Community

Jangwook Kim

Posted on • Originally published at jangwook.net

Building a FastAPI + Claude API Streaming Production Backend — SSE, Retry, and Error Recovery Guide

When building an AI backend, you eventually hit the same question: "Can I make users wait until the whole response is generated?" Most of the time the answer is no. When a model like Claude is producing a long piece of text, buffering everything and sending it all at once kills the UX.

Having integrated this into actual services, what I found is that streaming itself isn't the hard part. The real complexity is around it: what to do when you hit a rate limit, how to classify errors and handle each one differently, which headers you need to make SSE flow properly behind Nginx. This guide covers those production patterns — implemented and tested against FastAPI 0.136 and Anthropic SDK 0.97.

Prerequisites

  • Python 3.11 or later (3.12 recommended)
  • Anthropic API key (ANTHROPIC_API_KEY)
  • Basic understanding of FastAPI and asyncio

You only need four dependencies:

```bash
pip install fastapi uvicorn anthropic httpx
```

If you're new to Python environment setup, setting up a Python AI development environment with uv is a good first read. It cleanly solves virtual environment and dependency conflict issues.

Step 1: Project Structure and Basic Setup

Start with a clean directory layout:

```text
claude-streaming-api/
├── main.py          # FastAPI app + endpoints
├── retry.py         # retry logic
├── .env             # API key (gitignored)
├── Dockerfile
└── docker-compose.yml
```

The skeleton of main.py:

```python
import os
import anthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI(title="Claude Streaming API", version="1.0.0")

client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))


class ChatRequest(BaseModel):
    message: str
    max_tokens: int = 1024
    system: str = "You are a helpful assistant."
```

Defining the request schema with Pydantic's BaseModel gives you automatic input validation and OpenAPI docs from FastAPI for free. As you can see in the screenshot below, the Swagger UI generates automatically.

FastAPI Swagger UI — Claude Streaming API endpoints

Running uvicorn main:app --reload locally and opening /docs gives you a live Swagger UI you can test directly. That convenience is one of the main reasons I reach for FastAPI.
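The validation goes further than type checking. If you want to reject bad input before it ever reaches the Claude API, Pydantic's `Field` constraints do it declaratively — a sketch with illustrative bounds of my own choosing, not values from the code above:

```python
from pydantic import BaseModel, Field, ValidationError


class ChatRequest(BaseModel):
    # Bounds here are illustrative; pick limits that match your model and plan
    message: str = Field(min_length=1, max_length=32_000)
    max_tokens: int = Field(default=1024, ge=1, le=8192)
    system: str = "You are a helpful assistant."


# An empty message plus an oversized max_tokens is rejected before your
# handler ever runs; FastAPI surfaces this as a 422 response automatically
try:
    ChatRequest(message="", max_tokens=999_999)
except ValidationError as e:
    print(f"{len(e.errors())} validation errors")
```

Every constraint also shows up in the generated OpenAPI schema, so the Swagger UI documents your limits for free.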

Step 2: Implementing the SSE Streaming Endpoint

Server-Sent Events (SSE) is the simplest way to push a one-directional real-time stream over HTTP. It's simpler to implement than WebSocket and fits perfectly for the pattern of streaming text from server to client — exactly what Claude does.

The key is combining FastAPI's StreamingResponse with Anthropic SDK's stream() context manager:

```python
import asyncio
import json
from typing import AsyncGenerator


async def stream_claude(request: ChatRequest) -> AsyncGenerator[str, None]:
    """Claude API streaming → SSE event generator"""
    try:
        with client.messages.stream(
            model="claude-opus-4-7-20251101",
            max_tokens=request.max_tokens,
            system=request.system,
            messages=[{"role": "user", "content": request.message}],
        ) as stream:
            for text in stream.text_stream:
                # SSE format: "data: {...}\n\n"
                yield f"data: {json.dumps({'text': text, 'type': 'delta'})}\n\n"

            yield f"data: {json.dumps({'type': 'done'})}\n\n"

    except anthropic.RateLimitError:
        yield f"data: {json.dumps({'type': 'error', 'error': 'rate_limit', 'retry_after': 30})}\n\n"
    except anthropic.AuthenticationError:
        yield f"data: {json.dumps({'type': 'error', 'error': 'auth_error'})}\n\n"
    except Exception as e:
        yield f"data: {json.dumps({'type': 'error', 'error': 'unknown', 'message': str(e)})}\n\n"


@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    return StreamingResponse(
        stream_claude(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable Nginx buffering — critical
        },
    )
```

Testing with curl against a live server, the SSE stream looks like this:

```bash
$ curl -sN -X POST http://localhost:8000/chat/stream \
       -H "Content-Type: application/json" \
       -d '{"message": "Explain FastAPI and Claude integration"}'

data: {"type": "delta", "text": "FastAPI"}
data: {"type": "delta", "text": " and "}
data: {"type": "delta", "text": "Claude"}
...
data: {"type": "done"}
```

The SSE format rules are simple: data: prefix + JSON + two newlines (\n\n). Follow that format and the browser's EventSource API or most SSE clients will parse it automatically.
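Because the format is easy to break by one missing newline, it's worth centralizing the serialization in a helper instead of repeating the f-string. A minimal sketch — the function names `sse_event` and `parse_sse` are mine, not part of the post's code:

```python
import json


def sse_event(payload: dict) -> str:
    """Serialize a payload as one SSE data event: 'data: <json>\\n\\n'."""
    return f"data: {json.dumps(payload)}\n\n"


def parse_sse(buffer: str) -> list[dict]:
    """Parse a buffer of complete SSE events back into payloads (for tests)."""
    return [
        json.loads(block[len("data: "):])
        for block in buffer.split("\n\n")
        if block.startswith("data: ")
    ]
```

Round-tripping through `parse_sse` in your test suite is a cheap way to catch formatting regressions before a browser client does.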

One thing to watch: anthropic.Anthropic()'s messages.stream() is a synchronous context manager. To avoid blocking uvicorn's event loop inside an async FastAPI route, use AsyncAnthropic instead:

```python
client = anthropic.AsyncAnthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))

async def stream_claude(request: ChatRequest) -> AsyncGenerator[str, None]:
    async with client.messages.stream(...) as stream:
        async for text in stream.text_stream:
            yield f"data: {json.dumps({'text': text, 'type': 'delta'})}\n\n"
```

With AsyncAnthropic, you won't block uvicorn's event loop. The sync client works fine for low-traffic early-stage projects, but production warrants the async client.

Step 3: Error Classification and Retry Strategy

Don't handle all AI API errors the same way. Each error type calls for a different response:

| Error Type | Classification | Correct Action |
| --- | --- | --- |
| `RateLimitError` | `rate_limit` | Retry with exponential backoff |
| `AuthenticationError` | `auth_error` | Fail immediately, check API key |
| `BadRequestError` | `token_limit` | Fail immediately, shorten message |
| `APIConnectionError` | `network_error` | Retry with limits |
| Other | `unknown` | Fail immediately, log the event |

An exponential backoff function that only retries rate limits and network errors:

```python
MAX_RETRIES = 3
BASE_DELAY = 1.0  # seconds


async def call_with_retry(fn, *args, **kwargs):
    """Exponential backoff retry — only for rate_limit and network_error"""
    for attempt in range(MAX_RETRIES):
        try:
            return await fn(*args, **kwargs)
        except anthropic.RateLimitError as e:
            if attempt == MAX_RETRIES - 1:
                raise
            delay = BASE_DELAY * (2 ** attempt)
            print(f"[retry] rate_limit, waiting {delay}s (attempt {attempt + 1}/{MAX_RETRIES})")
            await asyncio.sleep(delay)
        except anthropic.APIConnectionError:
            if attempt == MAX_RETRIES - 1:
                raise
            await asyncio.sleep(BASE_DELAY * (2 ** attempt))
        except (anthropic.AuthenticationError, anthropic.BadRequestError):
            raise  # No point retrying these — propagate immediately
```

When I tested this pattern locally — simulating a flaky API that fails twice before succeeding — the result was Result: success (after 3 attempts). The backoff worked as expected.
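That experiment is easy to reproduce without the SDK. Here's a self-contained sketch of the same backoff shape, using a stand-in exception class (`FakeRateLimit` and `flaky_call` are mine, substituting for the real anthropic exceptions) so it runs anywhere:

```python
import asyncio


class FakeRateLimit(Exception):
    """Stand-in for anthropic.RateLimitError so this runs without the SDK."""


calls = {"n": 0}


async def flaky_call():
    calls["n"] += 1
    if calls["n"] < 3:  # fail twice, then succeed
        raise FakeRateLimit()
    return "success"


async def call_with_retry(fn, max_retries=3, base_delay=0.01):
    for attempt in range(max_retries):
        try:
            return await fn()
        except FakeRateLimit:
            if attempt == max_retries - 1:
                raise
            # delays: 0.01s, 0.02s, ... (kept tiny here so the demo is fast)
            await asyncio.sleep(base_delay * (2 ** attempt))


result = asyncio.run(call_with_retry(flaky_call))
print(f"Result: {result} (after {calls['n']} attempts)")
```

Swap `FakeRateLimit` for `anthropic.RateLimitError` and the structure is the same as the production version above.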

Honestly, the part of this I'm most uncertain about is the MAX_RETRIES and BASE_DELAY values. Rate limits differ per Anthropic plan, and if your retry interval is too short, you'll hit the same rate limit again. I'd recommend externalizing these values as environment variables based on your API plan.
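One way to externalize them — the environment variable names here are my suggestion, not a convention from the SDK:

```python
import os

# Tune per your Anthropic plan: lower-tier plans need a longer BASE_DELAY,
# or you'll land in the same rate-limit window on every retry
MAX_RETRIES = int(os.environ.get("CLAUDE_MAX_RETRIES", "3"))
BASE_DELAY = float(os.environ.get("CLAUDE_BASE_DELAY", "1.0"))
```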

Step 4: Health Checks and Production Deployment

In container environments like Kubernetes or ECS, a health check endpoint is non-negotiable:

```python
import time


@app.get("/health")
async def health_check():
    """For K8s readiness / liveness probes"""
    return {"status": "ok", "timestamp": time.time()}
```

Docker image:

```dockerfile
FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
```

For Nginx reverse proxy, you must disable buffering to let SSE flow properly:

```nginx
location /chat/stream {
    proxy_pass         http://backend:8000;
    proxy_buffering    off;           # Critical: disable SSE buffering
    proxy_cache        off;
    proxy_set_header   Connection     '';
    proxy_http_version 1.1;
    proxy_read_timeout 300s;          # Allow long streaming sessions
    chunked_transfer_encoding on;
}
```

Leaving out proxy_buffering off means Nginx collects the entire stream in its buffer and sends it all at once. That's not streaming — it's just a slow response. This is a mistake nearly everyone makes the first time they put SSE behind Nginx.

Step 5: Client Integration — Browser EventSource and Python

Browser (JavaScript):

```javascript
// EventSource is GET-only — for POST requests, use fetch + ReadableStream
const response = await fetch('/chat/stream', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ message: 'Hello!' }),
});

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;

  // NOTE: this assumes each chunk contains only complete events;
  // production code should buffer partial events across reads
  const text = decoder.decode(value);
  const lines = text.split('\n\n').filter(l => l.startsWith('data:'));

  for (const line of lines) {
    const data = JSON.parse(line.slice(6));
    if (data.type === 'delta') {
      outputElement.textContent += data.text;
    }
  }
}
```

Python (httpx):

```python
import httpx
import json

async def stream_chat(message: str):
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/chat/stream",
            json={"message": message},
            timeout=60.0,
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data:"):
                    event = json.loads(line[6:])
                    if event["type"] == "delta":
                        print(event["text"], end="", flush=True)
```

If you have a frontend using the Vercel AI SDK, building a Claude streaming agent with the Vercel AI SDK shows how to wire this up on the frontend side. The useChat hook handles SSE parsing for you, which makes client-side code much simpler.

Limitations and Where You'll Actually Get Stuck

Here are the honest limitations I hit when using this stack in real projects.

First, combining streaming with prompt caching is tricky. Claude's prompt caching reduces input token costs significantly. But when using streaming and caching together, you can't know mid-stream whether the cache was hit. The usage object is available after streaming completes, but if you need to reflect cache status in real time, the implementation gets complex. Read Claude API prompt caching cost optimization before you design your architecture around caching.

Second, uvicorn worker count and connection management are more involved than they look. SSE keeps connections open for a long time. Each async worker can multiplex many concurrent streams, so --workers 4 doesn't cap you at 4 connections, but long-lived streams still pile up: per-connection memory, file descriptor limits, and your Anthropic rate limits become the bottleneck well before CPU does. When real traffic outgrows a single box, you'll need horizontal scaling on Kubernetes or the gunicorn + uvicorn worker class combination.
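If you go the gunicorn route, the standard invocation pairs gunicorn's process management with uvicorn's async worker class (flags shown are the common baseline; tune them for your traffic):

```bash
# 4 async worker processes, each able to hold many concurrent SSE connections
gunicorn main:app \
  --workers 4 \
  --worker-class uvicorn.workers.UvicornWorker \
  --bind 0.0.0.0:8000 \
  --timeout 300
```

The generous `--timeout` matters for the same reason as Nginx's `proxy_read_timeout`: long streaming sessions look idle to a supervisor that only counts request duration.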

Third, retry logic mid-stream is a hard problem. What do you do when a network error hits halfway through a stream? Restarting the request from scratch means the client gets duplicate text. The practical solution — having the client track last-event-id so the server can resume — is outside this guide's scope, but worth planning for early.
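The core of that resume pattern is small, even if wiring it end to end isn't: the server tags each event with an id, the client remembers the last id it rendered, and on reconnect the server drops everything at or below it. A minimal sketch — the helper names are mine:

```python
import json


def sse_event_with_id(event_id: int, payload: dict) -> str:
    """SSE event carrying an id; clients send it back as Last-Event-ID."""
    return f"id: {event_id}\ndata: {json.dumps(payload)}\n\n"


def drop_replayed(events: list[tuple[int, dict]], last_event_id: int) -> list[dict]:
    """On resume, discard events the client has already rendered."""
    return [payload for eid, payload in events if eid > last_event_id]
```

The hard part this sketch skips is retention: the server has to keep recent events (or be able to regenerate them deterministically) for the resume window, which usually means a short-lived buffer in Redis or memory.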

This pattern is also overkill for bulk processing where streaming isn't the point. If you're processing 1,000 documents in batch, the Anthropic Message Batches API is far cheaper and more appropriate.

Troubleshooting FAQ

Q: SSE arrives all at once instead of streaming

In most cases, proxy_buffering off is missing from the Nginx config. Also check that the Content-Type: text/event-stream header is present — without it, browsers won't recognize the response as SSE.

Q: Intermittent asyncio.CancelledError

When a client disconnects mid-stream, FastAPI cancels the generator. Adding except asyncio.CancelledError: return inside stream_claude exits cleanly.
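In isolation, the shape of that fix looks like this — a toy generator standing in for stream_claude, with asyncio.sleep in place of the real Claude stream:

```python
import asyncio


async def stream_with_cleanup(chunks):
    """SSE generator that exits quietly when the consumer is cancelled."""
    try:
        for chunk in chunks:
            await asyncio.sleep(0)  # stand-in for awaiting the Claude stream
            yield f"data: {chunk}\n\n"
    except asyncio.CancelledError:
        # Client disconnected mid-stream: stop generating instead of
        # letting the cancellation surface as an error in your logs
        return
```

This is also the right place to release any per-request resources (counters, upstream stream handles) before returning.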

Q: RuntimeError: Event loop is closed

This can happen when using the synchronous anthropic.Anthropic() client inside an async context. Switching to anthropic.AsyncAnthropic() is the root fix.

Q: Rate limited, retries keep failing

Either BASE_DELAY is too short or burst traffic is hammering the same window. Check Anthropic's Rate Limits page for your plan's TPM/RPM limits and set BASE_DELAY to at least 5 seconds.

Closing: When to Choose This Stack

FastAPI + AsyncAnthropic + uvicorn is a good fit when:

  • You have a Python team and want to avoid the cost of adopting a new language stack
  • Streaming is a core UX element — AI chat, code generation, document drafting
  • You want OpenAPI documentation auto-generation and Pydantic validation out of the box

To be honest, this isn't the right stack for every situation. If you have a Node.js team, the Vercel AI SDK is faster to ship. If you need massive concurrent real-time connections, WebSocket or gRPC Streaming might be better. But for getting a Python AI streaming backend running quickly, this is the most practical starting point I've personally verified.

Next steps: apply prompt caching to cut costs, add OpenTelemetry tracing to your streaming responses, and make latency and token usage visible.
