By the end of this tutorial, you’ll have a working FastAPI backend that streams Claude’s token output and tool call events over Server-Sent Events, plus a minimal JavaScript frontend that renders agent progress in real-time — no page refresh, no waiting for the full response to complete.
Streaming Claude API agents changes the perceived performance of your product more than almost any other optimization. A response that takes 8 seconds to complete feels fast when users see tokens appearing after 200ms. The same 8 seconds feels broken when the page sits blank. This tutorial covers the architecture, the actual SSE plumbing, and the parts the Anthropic docs gloss over — like how to surface tool call events mid-stream.
- Install dependencies: Set up FastAPI, the Anthropic SDK, and a basic project structure
- Configure the streaming client: Initialize the Anthropic client and handle the streaming context manager pattern
- Build the SSE endpoint: Create a FastAPI route that yields SSE-formatted events from Claude’s stream
- Handle tool call events mid-stream: Parse and forward tool use blocks as they arrive, not after completion
- Wire up the frontend: Read the stream with fetch (EventSource cannot send a POST body) and render tokens and tool status in real time
- Add error boundaries and reconnect logic: Handle disconnects, rate limits, and stream interruptions gracefully
Why Streaming Matters More for Agents Than for Simple Chat
For a single-turn completion, streaming is a nice UX improvement. For agents that use tools, it’s close to essential. An agent that calls three tools before responding might take 15–30 seconds total. Without streaming, your users stare at a spinner for the entire duration. With streaming, they can watch the agent announce “searching the database… found 12 results… analyzing…” as each step completes.
This is especially true if you’re building on top of Claude tool use with Python — multi-step workflows where the agent’s reasoning process is itself valuable information to surface. The architecture we’re building here lets you stream both the thinking and the doing.
At the list price when this was written, Claude 3.5 Sonnet (the model used in the code below) costs $15 per million output tokens, or $0.015 per 1K. A typical agent response with tool calls runs 400–800 output tokens, so you’re looking at roughly $0.006–$0.012 in output tokens per agent interaction, plus input tokens for the prompt and tool definitions. Streaming doesn’t change your token costs: you’re delivering the same tokens progressively.
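The arithmetic is easy to keep in a helper so you can re-run it when prices change. This is a minimal sketch: output_cost_usd is a hypothetical name, and the $0.001 per 1K figure is a placeholder chosen to make the math easy to follow, not a quoted price.

```python
def output_cost_usd(output_tokens: int, usd_per_1k_output: float) -> float:
    """Output-token cost of one response; input tokens are billed separately."""
    return output_tokens / 1000 * usd_per_1k_output


# Placeholder price (assumption): $0.001 per 1K output tokens.
PLACEHOLDER_PRICE = 0.001

low = output_cost_usd(400, PLACEHOLDER_PRICE)
high = output_cost_usd(800, PLACEHOLDER_PRICE)
print(f"${low:.4f} to ${high:.4f} per interaction")
```

Swap in the current per-1K output price for your model to get a real estimate.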
Step 1: Install Dependencies
You need Python 3.11+, FastAPI, Uvicorn, and the Anthropic SDK. Pin your versions — the streaming API interface changed between anthropic 0.18 and 0.25, and it’ll change again.
# requirements.txt
fastapi==0.111.0
uvicorn[standard]==0.30.1
anthropic==0.28.0
python-dotenv==1.0.1
httpx==0.27.0 # anthropic's internal dep, pin it too
pip install -r requirements.txt
Create a .env file at the project root:
ANTHROPIC_API_KEY=sk-ant-your-key-here
Step 2: Configure the Streaming Client
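The Anthropic Python SDK exposes streaming through a context manager. The distinction most tutorials miss: client.messages.stream() and client.messages.create(stream=True) both stream, but at different levels. create(stream=True) returns a plain iterator of raw event objects and leaves accumulation and cleanup to you; stream() wraps that iterator in a context manager that closes the connection on exit and adds helpers such as text_stream and get_final_message(). This tutorial uses stream().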
# client.py
import os

import anthropic
from dotenv import load_dotenv

load_dotenv()

# Initialize once at module level; don't re-create per request
client = anthropic.Anthropic(
    api_key=os.environ["ANTHROPIC_API_KEY"],
    # Optional: set a custom timeout for long agent runs
    timeout=120.0,
)

# Tool definition for demonstration
TOOLS = [
    {
        "name": "search_database",
        "description": "Search a database for relevant records",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search query"},
                "limit": {"type": "integer", "description": "Max results", "default": 5},
            },
            "required": ["query"],
        },
    }
]
Step 3: Build the SSE Endpoint
SSE is just HTTP with Content-Type: text/event-stream and a specific line format. FastAPI’s StreamingResponse handles the connection; you write the event formatting yourself. Each SSE message is data: {json}\n\n. That double newline is mandatory — miss it and the browser EventSource parser silently drops events.
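To make the framing rule concrete, here is a minimal sketch of an incremental SSE parser in Python. feed_sse is a hypothetical helper (not part of any library), and it assumes every data payload is JSON, as in this tutorial. It shows why the blank line matters: a frame without its terminating double newline stays in the buffer and is never delivered.

```python
import json
from typing import Dict, List, Tuple


def feed_sse(buffer: str, chunk: str) -> Tuple[List[Dict], str]:
    """Append chunk to buffer; return (complete events, leftover text)."""
    buffer += chunk
    # Everything before the last "\n\n" is complete; the tail may be partial.
    *frames, rest = buffer.split("\n\n")
    events = []
    for frame in frames:
        event_type, data_lines = "message", []
        for line in frame.split("\n"):
            if line.startswith(":"):
                continue  # comment line, e.g. a keepalive
            if line.startswith("event:"):
                event_type = line[len("event:"):].strip()
            elif line.startswith("data:"):
                value = line[len("data:"):]
                # The format allows one optional space after the colon.
                data_lines.append(value[1:] if value.startswith(" ") else value)
        if data_lines:
            events.append({"event": event_type, "data": json.loads("\n".join(data_lines))})
    return events, rest


# First chunk ends mid-frame: only the terminated event is returned.
events, rest = feed_sse("", 'event: token\ndata: {"text": "Hel"}\n\nevent: token\ndata: {"te')
# The second chunk completes the frame, so it is delivered now.
events2, rest2 = feed_sse(rest, 'xt": "lo"}\n\n')
```

The same buffer-and-split approach is what a fetch-based browser client has to do by hand.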
# main.py
import json

import anthropic
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware

from client import client, TOOLS

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Tighten this in production
    allow_methods=["POST"],
    allow_headers=["*"],
)

# Async client for streaming; the sync client would block the event loop.
# Created once at module level and reused across requests.
async_client = anthropic.AsyncAnthropic(api_key=client.api_key, timeout=120.0)


def format_sse(event_type: str, data: dict) -> str:
    """Format a dict as an SSE message with an event type."""
    payload = json.dumps(data)
    return f"event: {event_type}\ndata: {payload}\n\n"


async def stream_agent_response(user_message: str):
    """Async generator that yields SSE-formatted events from Claude's stream."""
    # Yield a "start" event immediately so the client knows we're connected
    yield format_sse("start", {"status": "connected"})

    stop_reason = None
    async with async_client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        tools=TOOLS,
        messages=[{"role": "user", "content": user_message}],
    ) as stream:
        async for event in stream:
            event_type = event.type
            if event_type == "content_block_start":
                if event.content_block.type == "tool_use":
                    # Surface the tool call immediately; don't wait for completion
                    yield format_sse("tool_start", {
                        "tool_name": event.content_block.name,
                        "tool_id": event.content_block.id,
                    })
            elif event_type == "content_block_delta":
                delta = event.delta
                if delta.type == "text_delta":
                    yield format_sse("token", {"text": delta.text})
                elif delta.type == "input_json_delta":
                    # Partial tool input: useful for showing what the agent is querying
                    yield format_sse("tool_input", {"partial": delta.partial_json})
            elif event_type == "content_block_stop":
                yield format_sse("block_stop", {"index": event.index})
            elif event_type == "message_delta":
                # The actual stop reason (end_turn, tool_use, max_tokens) arrives here
                stop_reason = event.delta.stop_reason
            elif event_type == "message_stop":
                yield format_sse("done", {"stop_reason": stop_reason})


@app.post("/stream")
async def stream_endpoint(request: Request):
    body = await request.json()
    user_message = body.get("message", "")
    if not user_message:
        # Return a real 400 instead of a 200 response with an error body
        raise HTTPException(status_code=400, detail="message is required")
    return StreamingResponse(
        stream_agent_response(user_message),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Critical: disables Nginx buffering
        },
    )
The X-Accel-Buffering: no header is the one that catches people. If you deploy behind Nginx (which most production setups do), it’ll buffer your SSE stream and deliver it in chunks or all at once. This header turns that off.
Step 4: Handle Tool Call Events Mid-Stream
The pattern above surfaces tool names as soon as Claude decides to call them, before the input arguments are complete. This is what lets you show “🔍 Searching database…” in your UI while the agent is still constructing the query. If your agent workflow involves multiple tool calls, this creates a visible progress timeline rather than a single long pause.
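If you also want the complete tool input once Claude has finished writing it, one option is to collect the input_json_delta fragments per content block and parse them when the block stops. The sketch below assumes you feed it the index, id, name, and partial_json values from the stream events; ToolInputAccumulator is a hypothetical helper, not an SDK class.

```python
import json
from typing import Dict, Optional


class ToolInputAccumulator:
    """Collects input_json_delta fragments, keyed by content block index."""

    def __init__(self) -> None:
        self._open: Dict[int, dict] = {}

    def start(self, index: int, tool_id: str, name: str) -> None:
        # Call on content_block_start when the block type is tool_use.
        self._open[index] = {"id": tool_id, "name": name, "parts": []}

    def add(self, index: int, partial_json: str) -> None:
        # Call on each input_json_delta; fragments are not valid JSON on their own.
        if index in self._open:
            self._open[index]["parts"].append(partial_json)

    def finish(self, index: int) -> Optional[dict]:
        # Call on content_block_stop; returns None for non-tool blocks.
        block = self._open.pop(index, None)
        if block is None:
            return None
        raw = "".join(block["parts"])
        return {
            "id": block["id"],
            "name": block["name"],
            "input": json.loads(raw) if raw else {},
        }


acc = ToolInputAccumulator()
acc.start(1, "toolu_01", "search_database")
acc.add(1, '{"query": "re')
acc.add(1, 'funds", "limit": 3}')
call = acc.finish(1)
```

The parsed call is what you would hand to a tool executor before starting the next turn.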
For agents that execute tool calls server-side and re-inject results (the full agentic loop), you’ll want to extend this into a multi-turn pattern. Here’s the tool execution layer:
# tool_executor.py
import asyncio
from typing import Any


async def execute_tool(tool_name: str, tool_input: dict) -> Any:
    """Execute a tool and return its result. Add your real implementations here."""
    if tool_name == "search_database":
        # Simulate async database search
        await asyncio.sleep(0.5)  # Replace with real DB call
        return {
            "results": [
                {"id": 1, "title": "Result 1", "score": 0.95},
                {"id": 2, "title": "Result 2", "score": 0.87},
            ],
            "total": 2,
        }
    raise ValueError(f"Unknown tool: {tool_name}")
The multi-turn agentic loop (where Claude sees tool results and continues) requires more complex state management. Check out our guide on building Claude agents with persistent memory across sessions for the full architecture — the streaming layer here composes cleanly with that pattern.
Step 5: Wire Up the Frontend
The browser EventSource API only supports GET requests, which is why we’re using fetch with a custom reader instead. This is a common gotcha — SSE via POST requires manual stream reading.
<!-- index.html -->
<div id="status"></div>
<div id="output"></div>
<div id="tools"></div>
<script>
async function sendMessage(message) {
  const outputEl = document.getElementById('output');
  const toolsEl = document.getElementById('tools');
  const statusEl = document.getElementById('status');
  outputEl.textContent = '';
  toolsEl.innerHTML = '';

  const response = await fetch('/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message }),
  });
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });

    // SSE messages are separated by double newlines
    const frames = buffer.split('\n\n');
    buffer = frames.pop(); // Keep incomplete message in buffer

    for (const frame of frames) {
      if (!frame.trim()) continue;

      // Parse event type and data from SSE format
      const lines = frame.split('\n');
      let eventType = 'message';
      let data = '';
      for (const line of lines) {
        if (line.startsWith('event: ')) eventType = line.slice(7);
        if (line.startsWith('data: ')) data = line.slice(6);
      }
      if (!data) continue;

      try {
        const payload = JSON.parse(data);
        if (eventType === 'token') {
          outputEl.textContent += payload.text; // Append tokens as they arrive
        } else if (eventType === 'tool_start') {
          const toolEl = document.createElement('div');
          toolEl.id = `tool-${payload.tool_id}`;
          toolEl.textContent = `🔍 Calling ${payload.tool_name}...`;
          toolsEl.appendChild(toolEl);
        } else if (eventType === 'error') {
          // Server-side error events carry a human-readable message
          statusEl.textContent = `Error: ${payload.message}`;
        } else if (eventType === 'done') {
          statusEl.textContent = 'Complete';
        }
      } catch (e) {
        console.error('Parse error:', e, data);
      }
    }
  }
}
</script>
Step 6: Add Error Boundaries and Reconnect Logic
Streams die. Network hiccups, Anthropic rate limits, Lambda timeouts — all of these will interrupt your SSE connection mid-response. You need a client-side reconnect strategy and server-side error events. This is also where you should integrate with your broader LLM fallback and retry logic — streaming failures need the same graceful degradation patterns as any other API call.
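# Add to your stream generator: wrap the stream in try/except.
# Assumes main.py imports anthropic alongside client, TOOLS and format_sse.
async def stream_agent_response(user_message: str):
    yield format_sse("start", {"status": "connected"})
    try:
        # Or reuse a module-level AsyncAnthropic instance if you have one
        async with anthropic.AsyncAnthropic(
            api_key=client.api_key,
            timeout=120.0,
        ).messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            tools=TOOLS,
            messages=[{"role": "user", "content": user_message}],
        ) as stream:
            async for event in stream:
                # ... event handling as above
                pass
    except anthropic.RateLimitError:
        yield format_sse("error", {
            "type": "rate_limit",
            "message": "Rate limit hit. Please retry in a moment",
            "retryable": True,
        })
    except anthropic.APITimeoutError:
        yield format_sse("error", {
            "type": "timeout",
            "message": "Request timed out",
            "retryable": True,
        })
    except Exception as e:
        # In production, log the exception and send a generic message instead
        yield format_sse("error", {
            "type": "unknown",
            "message": str(e),
            "retryable": False,
        })
    # Emit stream_end after the try block rather than in finally: yielding
    # inside finally raises RuntimeError when the client disconnects and the
    # generator is closed early.
    yield format_sse("stream_end", {})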
On the client, listen for error events and implement exponential backoff before retrying. A max of 3 retries with 1s, 2s, 4s delays covers most transient failures without hammering the API.
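The backoff schedule itself is language-independent. Here is a minimal sketch in Python, which is handy for a CLI or server-side consumer of the same endpoint. RetryableStreamError and with_backoff are hypothetical names: the assumption is that your stream reader raises RetryableStreamError when it sees an error event with retryable set to true.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class RetryableStreamError(Exception):
    """Hypothetical: raised when an SSE error event has retryable=True."""


async def with_backoff(
    attempt: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Run attempt(); on a retryable failure wait 1s, 2s, 4s, then give up."""
    for retry in range(max_retries + 1):
        try:
            return await attempt()
        except RetryableStreamError:
            if retry == max_retries:
                raise  # retries exhausted: surface the failure to the caller
            await sleep(base_delay * 2 ** retry)
    raise AssertionError("unreachable")
```

The sleep parameter is injectable so the schedule can be checked in tests without waiting. Non-retryable errors are not caught and propagate immediately.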
Common Errors
Stream completes instantly with no tokens
You’re almost certainly using the sync anthropic.Anthropic client inside an async generator. The sync client blocks the event loop while it waits on the network, so nothing is flushed to the browser until the whole response has finished. Switch to anthropic.AsyncAnthropic and use async for as shown above. If you must use the sync client, run it in a worker thread with asyncio.to_thread or loop.run_in_executor and hand events back to the event loop as they arrive.
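If the sync client is unavoidable, one way to keep the event loop free is to iterate in a worker thread and pass items back through an asyncio.Queue. This is a sketch under assumptions: iterate_in_thread is a hypothetical helper, and make_iterable stands in for whatever function opens the sync stream and yields its events.

```python
import asyncio
import threading
from typing import AsyncIterator, Callable, Iterable, TypeVar

T = TypeVar("T")
_END = object()  # sentinel marking the end of the sync iterable


async def iterate_in_thread(make_iterable: Callable[[], Iterable[T]]) -> AsyncIterator[T]:
    """Consume a blocking iterable in a thread and yield its items asynchronously."""
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()

    def worker() -> None:
        try:
            for item in make_iterable():
                # asyncio.Queue is not thread-safe, so hop back onto the loop.
                loop.call_soon_threadsafe(queue.put_nowait, item)
        except Exception as exc:
            loop.call_soon_threadsafe(queue.put_nowait, exc)
        finally:
            loop.call_soon_threadsafe(queue.put_nowait, _END)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = await queue.get()
        if item is _END:
            return
        if isinstance(item, Exception):
            raise item  # re-raise worker errors in the async caller
        yield item
```

Note that this sketch does not stop the worker if the consumer goes away early; the async client avoids that problem entirely, which is why it remains the better default.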
Nginx buffers the entire stream and delivers it all at once
Two fixes: set X-Accel-Buffering: no in your response headers (shown above), and add proxy_buffering off; to your Nginx location block. If you’re on a managed platform (Railway, Render, etc.), check their SSE documentation — some require a specific header or environment variable to disable buffering.
EventSource or fetch stream drops silently after ~30 seconds
Load balancers and reverse proxies often have idle connection timeouts of 30–60 seconds. Send a keepalive event every 15 seconds to prevent this: yield ": keepalive\n\n" — SSE comments (lines starting with :) are ignored by the client parser but reset the idle timer.
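A keepalive can be layered on top of any SSE generator without touching its internals. The sketch below is one possible approach, with with_keepalive as a hypothetical wrapper: it waits for the next event with a timeout and emits a comment line whenever the source stays quiet. The pending read is kept in a task rather than cancelled on timeout, because cancelling a generator mid-read would break it.

```python
import asyncio
from typing import AsyncIterator

_DONE = object()  # sentinel: the wrapped generator is exhausted


async def _next_or_done(iterator):
    try:
        return await iterator.__anext__()
    except StopAsyncIteration:
        return _DONE


async def with_keepalive(source: AsyncIterator[str], interval: float = 15.0) -> AsyncIterator[str]:
    """Yield items from source, inserting an SSE comment after each quiet interval."""
    iterator = source.__aiter__()
    pending = None
    try:
        while True:
            if pending is None:
                pending = asyncio.ensure_future(_next_or_done(iterator))
            done, _ = await asyncio.wait({pending}, timeout=interval)
            if not done:
                yield ": keepalive\n\n"  # ignored by SSE parsers
                continue  # keep waiting on the same pending read
            task, pending = pending, None
            item = task.result()
            if item is _DONE:
                return
            yield item
    finally:
        if pending is not None:
            pending.cancel()  # consumer went away while a read was in flight
```

With the endpoint from Step 3, usage would look like StreamingResponse(with_keepalive(stream_agent_response(user_message)), media_type="text/event-stream").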
Deployment Considerations
SSE streams require persistent connections, which rules out serverless functions with short timeouts: AWS Lambda’s default timeout is 3 seconds, and API Gateway’s default 29-second integration timeout will cut off long agent runs even after you raise the Lambda limit. Use a long-running process on Railway, Fly.io, or a VM. If you need serverless, raise the Lambda timeout (the maximum is 15 minutes) and enable response streaming through a Lambda function URL; AWS added native Lambda response streaming in 2023.
For observability, you’ll want to log token counts and latency at the stream level. Integrate with an LLM observability platform — we’ve compared Helicone vs LangSmith vs Langfuse in detail if you’re choosing between them. Both Helicone and Langfuse support streaming spans natively.
What to Build Next
Add a streaming agent that surfaces its reasoning steps. Extended thinking is available in the API from Claude 3.7 Sonnet onward; Claude 3.5 Sonnet, the model used in this tutorial, does not support it, and you will need a newer SDK release than the 0.28.0 pinned above. Thinking content arrives as content_block_delta events whose delta type is thinking_delta. Forward those as a separate SSE event type and render them in a collapsible “Agent reasoning” panel. Users who want to understand why the agent made a decision can expand it; everyone else ignores it. This is one of the highest-leverage UX patterns for building trust in agent-powered features, and it pairs naturally with the Claude Agent SDK patterns if you want to move toward a more structured agent loop architecture.
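One way to keep the routing tidy is a single function that maps each delta to an SSE event name and payload. This is a sketch under assumptions: delta_to_sse_event is a hypothetical helper, the SSE event names are this tutorial's own, and the thinking attribute on thinking_delta should be checked against the SDK version you install. SimpleNamespace objects stand in for SDK delta objects so the example runs without an API call.

```python
from types import SimpleNamespace
from typing import Optional, Tuple


def delta_to_sse_event(delta) -> Optional[Tuple[str, dict]]:
    """Map a content_block_delta payload to (sse_event_name, data), or None to skip."""
    if delta.type == "text_delta":
        return "token", {"text": delta.text}
    if delta.type == "thinking_delta":
        # Assumption: thinking text is exposed as delta.thinking.
        return "thinking_delta", {"text": delta.thinking}
    if delta.type == "input_json_delta":
        return "tool_input", {"partial": delta.partial_json}
    return None  # other delta types carry nothing to render


# Stand-ins for SDK objects, for illustration only.
thinking = delta_to_sse_event(SimpleNamespace(type="thinking_delta", thinking="Check stock first."))
text = delta_to_sse_event(SimpleNamespace(type="text_delta", text="Hello"))
```

On the frontend, thinking_delta events would append to the collapsible panel while token events keep appending to the main output.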
Bottom Line
Solo founders: Implement the basic token streaming endpoint first (Steps 1–3), ship it, and add tool call visibility later. The perceived latency improvement alone is worth the 2 hours of work.
Teams building production agents: Invest in the full event type taxonomy from the start — tool_start, tool_input, tool_result, token, error, done. Retrofitting this onto a flat text stream is painful. The reconnect logic and keepalive pings are not optional — you’ll need them within the first week of real user traffic.
Budget-conscious builders: Streaming Claude API agents costs exactly the same as non-streaming — you’re paying for tokens, not connection time. The only cost difference is slightly higher server costs from persistent connections, which is negligible at typical agent traffic volumes. The UX improvement is essentially free.
Frequently Asked Questions
Can I stream Claude’s tool call results, not just the arguments?
Tool results aren’t streamed by the API — they’re injected back into the conversation as complete messages. What you can stream is the moment Claude decides to call a tool (content_block_start with type tool_use) and the partial JSON input as it’s generated (input_json_delta). To show tool results progressively, execute the tool server-side and yield an SSE event with the result before passing it back to Claude for the next turn.
Why does my SSE stream work locally but buffer in production?
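Almost always Nginx buffering. Add X-Accel-Buffering: no to your response headers and proxy_buffering off; to your Nginx config, and raise proxy_read_timeout if long gaps between events are expected. If you’re on a managed platform such as Render, Railway, or Fly.io, check its documentation for a way to disable response buffering. Behind an AWS Application Load Balancer, also check the idle timeout (60 seconds by default), which closes connections that stay silent for too long.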
How do I handle streaming with Claude when using tool use in a loop?
Each turn of the agentic loop is a separate streaming request. After Claude’s stream ends with stop_reason: tool_use, execute the tools, collect their results, and start a new streaming request with the full conversation history including tool results. Yield SSE events between turns to keep the client informed of progress — a tool_executing event while you wait for the tool result prevents the client from thinking the stream has died.
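The loop described above can be sketched as an async generator that forwards each turn's SSE output, runs the requested tools, and starts the next turn. Everything here is an assumption made for illustration: run_turn is a hypothetical callable that yields ("sse", text) pairs while streaming and one ("final", {...}) pair carrying stop_reason and the assistant content blocks as plain dicts, and execute_tool is whatever async tool runner you use.

```python
import asyncio
import json
from typing import AsyncIterator


def format_sse(event_type: str, data: dict) -> str:
    return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"


async def agent_loop(messages, run_turn, execute_tool, max_turns: int = 5) -> AsyncIterator[str]:
    """Stream turns until Claude stops asking for tools or max_turns is reached."""
    history = list(messages)  # don't mutate the caller's list
    for _ in range(max_turns):
        final = None
        async for kind, value in run_turn(history):
            if kind == "sse":
                yield value  # forward tokens and tool_start events as-is
            else:
                final = value
        if final is None or final["stop_reason"] != "tool_use":
            return
        results = []
        for block in final["content"]:
            if block["type"] != "tool_use":
                continue
            # Tell the client a tool is running so the pause is not mistaken for a dead stream.
            yield format_sse("tool_executing", {"tool_id": block["id"], "tool_name": block["name"]})
            result = await execute_tool(block["name"], block["input"])
            yield format_sse("tool_result", {"tool_id": block["id"], "result": result})
            results.append({
                "type": "tool_result",
                "tool_use_id": block["id"],
                "content": json.dumps(result),
            })
        # Next request: full history plus the assistant turn and the tool results.
        history += [
            {"role": "assistant", "content": final["content"]},
            {"role": "user", "content": results},
        ]
```

The max_turns cap is a safety net against a model that keeps requesting tools indefinitely.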
What’s the difference between streaming with EventSource vs fetch?
The browser EventSource API only supports GET requests with no custom body, which makes it unsuitable for sending user messages. Use fetch with a ReadableStream reader instead — it supports POST with a JSON body and gives you the same streaming behavior. The parsing logic is a few extra lines but the control is much better. EventSource is fine for one-way notification streams where you don’t need to send data with the request.
Does streaming work with Claude on AWS Bedrock or Vertex AI?
Yes, but the event format depends on how you call it. If you go through boto3’s invoke_model_with_response_stream, you’ll need to parse Bedrock’s chunk structure rather than Anthropic’s native SDK events, and Vertex AI has its own streaming interface as well. The Anthropic SDK also ships AnthropicBedrock and AnthropicVertex clients that expose the same Messages interface; check their documentation for streaming support in your SDK version. If you’re committed to the event types shown here and want the fewest surprises, the Anthropic API is the most direct route.
Editorial note: API pricing, model capabilities, and tool features change frequently — always verify current details on the vendor’s website before building in production. Code examples are tested at time of writing; pin your dependency versions to avoid breaking changes. Some links in this article may be affiliate links — we may earn a commission if you sign up, at no extra cost to you.

