Part 11 connects two earlier threads. In Part 6 you streamed fake tokens from FastAPI with server-sent events (SSE); in Part 8 you called a model. Now you stream a real model response to the browser, so answers appear as they are written. This is the difference between an app that feels slow and one that feels alive.
What you will build
- A model stream consumed token by token in Python
- A FastAPI SSE endpoint that forwards model tokens
- Browser code that renders the stream as it arrives
- Handling disconnects and errors mid-stream
1. Streaming from the model
The SDK exposes a streaming context manager. Iterate its text stream to get chunks as the model produces them, instead of waiting for the whole message. This is also the recommended path for long outputs, because it avoids request timeouts.
from anthropic import Anthropic

client = Anthropic()

with client.messages.stream(
    model="claude-opus-4-8",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain RAG in three sentences."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    final = stream.get_final_message()  # full message once streaming ends
2. Forwarding the stream through FastAPI
Combine the model stream with the SSE pattern from Part 6. The endpoint becomes an async generator that yields each model chunk as a server-sent event. The browser receives tokens the moment the model writes them.
from anthropic import AsyncAnthropic
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()
client = AsyncAnthropic()


class Ask(BaseModel):
    question: str


@app.post("/chat/stream")
async def chat_stream(body: Ask):
    async def events():
        async with client.messages.stream(
            model="claude-opus-4-8",
            max_tokens=1024,
            messages=[{"role": "user", "content": body.question}],
        ) as stream:
            async for text in stream.text_stream:
                yield f"data: {text}\n\n"
        yield "event: done\ndata: end\n\n"

    return StreamingResponse(events(), media_type="text/event-stream")
Warning
Mind the SSE format
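Each frame is data: followed by your text and then two newlines. A raw newline inside a token ends the data line early, and a blank line ends the whole frame. If your tokens can contain newlines, encode them, for example as JSON, so each chunk always stays inside a single frame.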
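One way to follow the advice in this warning is to JSON-encode every payload before framing it. The sketch below is a minimal illustration using only the standard library. The helper name sse_frame is hypothetical and is not part of FastAPI or the SDK.

```python
import json
from typing import Optional


def sse_frame(payload: dict, event: Optional[str] = None) -> str:
    """Build one SSE frame whose data field is a single line of JSON.

    json.dumps escapes newline characters inside strings, so the payload
    can never contain the blank line that would end the frame early.
    """
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    lines.append(f"data: {json.dumps(payload)}")
    return "\n".join(lines) + "\n\n"


# A chunk with embedded newlines still fits in one frame.
frame = sse_frame({"text": "line one\n\nline two"}, event="token")
print(repr(frame))
```

The receiving side reverses the step with a JSON parse of the data line, which restores the original newlines.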
3. Rendering in the browser
On the client, read the stream and append text as it arrives. The fetch reader below works with the endpoint above and updates the page on every chunk.
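const resp = await fetch("/chat/stream", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ question: "Explain RAG in three sentences." }),
});
const reader = resp.body.getReader();
const decoder = new TextDecoder();
const out = document.getElementById("answer");
let pending = ""; // partial line carried over between reads
let event = "message"; // SSE default event name
while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  // stream: true keeps multi-byte characters intact across reads
  pending += decoder.decode(value, { stream: true });
  const lines = pending.split("\n");
  pending = lines.pop(); // the last piece may be an incomplete line
  for (const line of lines) {
    if (line === "") event = "message"; // blank line ends a frame
    else if (line.startsWith("event: ")) event = line.slice(7);
    else if (line.startsWith("data: ") && event === "message") {
      out.textContent += line.slice(6); // skips the payload of the done frame
    }
  }
}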
4. Disconnects and errors
Users close tabs mid-answer. When the client disconnects, the server cancels the generator, so structure the model work so that cancellation closes the model stream promptly and you stop paying for tokens nobody will read. Catch model errors inside the generator and emit an error event so the browser can show a message instead of hanging.
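The cleanup and error handling described above can be sketched without a network call. This is a minimal illustration under stated assumptions: fake_model_stream is a hypothetical stand-in for the model's text stream, and the disconnect is simulated by closing the generator, because exactly how cancellation reaches your generator depends on the server.

```python
import asyncio
import json


async def fake_model_stream(fail_at=None):
    """Hypothetical stand-in for a model text stream; makes no network calls."""
    for i, token in enumerate(["Hello", ", ", "world"]):
        if i == fail_at:
            raise RuntimeError("upstream failure")
        await asyncio.sleep(0)  # yield control, as a real network read would
        yield token


async def events(source, log):
    """Wrap a token source as SSE frames with error handling and cleanup."""
    try:
        async for text in source:
            yield f"event: token\ndata: {json.dumps({'text': text})}\n\n"
        yield "event: done\ndata: {}\n\n"
    except Exception:
        # A model or network failure becomes an event the browser can display.
        yield f"event: error\ndata: {json.dumps({'message': 'stream failed'})}\n\n"
    finally:
        # Runs on normal completion, on errors, and when the consumer goes away.
        await source.aclose()
        log.append("closed")


async def simulate_disconnect():
    log = []
    gen = events(fake_model_stream(), log)
    first = await gen.__anext__()  # the client receives one frame...
    await gen.aclose()             # ...then disconnects
    return first, log


first, log = asyncio.run(simulate_disconnect())
print(repr(first), log)
```

With the real SDK, the async with block around the model stream plays the role of the finally clause here, since leaving the block closes the stream.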
Checkpoint
Why stream long model answers instead of returning them all at once?
5. Capture usage after the stream
Streaming should not cost you observability. The streaming context manager still gives you the complete message once it finishes, including the usage object, so you keep the per-call logging from Part 8. Call get_final_message after the loop and log tokens and the stop reason exactly as you would for a non-streamed call. You get the responsive feel of streaming and the operational visibility of a normal request.
with client.messages.stream(
    model="claude-opus-4-8",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain RAG."}],
) as stream:
    for text in stream.text_stream:
        ...  # forward each chunk to the client
    final = stream.get_final_message()

u = final.usage
print(f"in={u.input_tokens} out={u.output_tokens} stop={final.stop_reason}")
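If you prefer structured logs over a printed line, the same three fields can be emitted as one JSON object per call. The sketch below is only an illustration: usage_record and log_usage are hypothetical helper names, and fake_final is a made-up stand-in for the object returned by get_final_message, so the example runs without calling the model.

```python
import json
import logging
from types import SimpleNamespace

logger = logging.getLogger("chat.usage")


def usage_record(final) -> dict:
    """Flatten the fields logged above into a plain dict."""
    return {
        "input_tokens": final.usage.input_tokens,
        "output_tokens": final.usage.output_tokens,
        "stop_reason": final.stop_reason,
    }


def log_usage(final) -> str:
    """Emit one JSON line per call so log tooling can aggregate it."""
    line = json.dumps(usage_record(final), sort_keys=True)
    logger.info(line)
    return line


# Stand-in for a finished message; the values are invented for illustration.
fake_final = SimpleNamespace(
    usage=SimpleNamespace(input_tokens=12, output_tokens=48),
    stop_reason="end_turn",
)
print(log_usage(fake_final))
```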
6. Stream structured events, not raw text
Forwarding bare text works until you need to signal anything other than content: an error, the end of the stream, or later a tool call. Reuse the JSON event framing from Part 6 so each frame carries a type. The browser then knows the difference between a token to append, a done event to close on, and an error event to surface, instead of trying to infer meaning from a stream of words.
import json

from anthropic import AsyncAnthropic

client = AsyncAnthropic()


async def model_events(question: str):
    try:
        async with client.messages.stream(
            model="claude-opus-4-8",
            max_tokens=1024,
            messages=[{"role": "user", "content": question}],
        ) as stream:
            async for text in stream.text_stream:
                yield f"event: token\ndata: {json.dumps({'text': text})}\n\n"
        yield f"event: done\ndata: {json.dumps({'reason': 'end'})}\n\n"
    except Exception:
        yield f"event: error\ndata: {json.dumps({'message': 'stream failed'})}\n\n"
7. Disconnects, abuse, and trust
A streaming endpoint is still a public endpoint, so the production rules from Part 5 apply. Authenticate it, rate-limit it, and validate the request body, because a model call is expensive and an open streaming route is an inviting target. Treat everything from the browser as untrusted: never put raw user text into a system instruction, and never let the client choose the model or token budget. When the user navigates away mid-answer, the generator is cancelled, so wrap the model work so that it stops promptly and you stop paying for tokens nobody will read.
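The trust rules above can be sketched as a single function that turns an untrusted request body into model call arguments. This is an illustration using only the standard library, not the validation from Part 5. The function name build_model_request, the character limit, and the system prompt text are all hypothetical choices made for this example.

```python
MODEL = "claude-opus-4-8"   # fixed on the server, never taken from the client
MAX_TOKENS = 1024           # fixed on the server, never taken from the client
MAX_QUESTION_CHARS = 2000   # hypothetical limit; tune it for your app
SYSTEM_PROMPT = "You are a concise assistant."  # hypothetical; never built from user text


def build_model_request(body: dict) -> dict:
    """Validate an untrusted request body and return model call arguments."""
    question = body.get("question")
    if not isinstance(question, str) or not question.strip():
        raise ValueError("question must be a non-empty string")
    if len(question) > MAX_QUESTION_CHARS:
        raise ValueError("question is too long")
    # Client-supplied "model" or "max_tokens" keys are deliberately ignored.
    return {
        "model": MODEL,
        "max_tokens": MAX_TOKENS,
        "system": SYSTEM_PROMPT,
        "messages": [{"role": "user", "content": question.strip()}],
    }


request = build_model_request({"question": "Explain RAG.", "model": "something-else"})
print(request["model"], request["max_tokens"])
```

User text only ever lands in the user message, so the system instruction stays fully under server control.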
Warning
Streamed tokens are not validated JSON
If a feature needs structured output, do not try to parse half-formed JSON as it streams. Stream the human-facing text for responsiveness, and make a separate, non-streamed structured-output call when you need a reliable object, as in Part 9.
8. Polish the browser experience
Raw token-by-token rendering can feel jittery, and updating the DOM on every single token is wasteful. Two small touches help. Buffer tokens briefly and flush on an animation frame so the page updates smoothly rather than thrashing. And if your answers contain markdown, keep the full text received so far and re-render that accumulated text on each flush, rather than trying to parse each half-formed chunk on its own. The reader sees a steady, readable stream instead of a flicker of partial formatting.
let buffer = "";
let scheduled = false;
const out = document.getElementById("answer");

function flush() {
  out.textContent += buffer; // or render accumulated markdown
  buffer = "";
  scheduled = false;
}

function onToken(text) {
  buffer += text;
  if (!scheduled) {
    scheduled = true;
    requestAnimationFrame(flush); // batch DOM updates to one per frame
  }
}
The reader cannot tell whether tokens arrived every ten milliseconds or were batched into frames, but the page stays smooth and the browser does far less work. This matters most on long answers, where naive per-token updates can visibly stutter.
9. Parse SSE events on the client
Once the server sends structured events, the client should read them as events rather than scanning for the word data. Buffer the decoded text, split it into complete lines, track the current event name, and act on the JSON payload. A network read can end in the middle of a line, so keep the unfinished tail for the next read. This is the counterpart to the structured framing on the server, and it is what lets a token, a done signal, and an error travel down the same stream and be handled differently.
async function readEvents(resp) {
  const reader = resp.body.getReader();
  const decoder = new TextDecoder();
  let pending = ""; // partial line carried over between reads
  let event = "message"; // SSE default event name
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    pending += decoder.decode(value, { stream: true });
    const lines = pending.split("\n");
    pending = lines.pop(); // keep the incomplete tail for the next read
    for (const line of lines) {
      if (line === "") event = "message"; // blank line ends the frame
      else if (line.startsWith("event: ")) event = line.slice(7).trim();
      else if (line.startsWith("data: ")) {
        const payload = JSON.parse(line.slice(6));
        if (event === "token") onToken(payload.text);
        else if (event === "error") showError(payload.message);
        else if (event === "done") return;
      }
    }
  }
}
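The event parsing described in this section can also be exercised outside the browser, for example when testing the endpoint from a script. The sketch below is a Python version under stated assumptions: SSEParser is a hypothetical helper, it handles only the event and data fields used in this part, and it buffers partial lines because a network read can end mid-line.

```python
import json


class SSEParser:
    """Incrementally parse SSE text that may arrive split at arbitrary points."""

    def __init__(self):
        self._pending = ""       # partial line carried between chunks
        self._event = "message"  # SSE default event name
        self._data = []          # data lines collected for the current frame

    def feed(self, chunk: str):
        """Return the (event, payload) pairs completed by this chunk."""
        self._pending += chunk
        *lines, self._pending = self._pending.split("\n")
        completed = []
        for line in lines:
            if line == "":  # a blank line ends the frame
                if self._data:
                    payload = json.loads("\n".join(self._data))
                    completed.append((self._event, payload))
                self._event, self._data = "message", []
            elif line.startswith("event: "):
                self._event = line[len("event: "):].strip()
            elif line.startswith("data: "):
                self._data.append(line[len("data: "):])
        return completed


parser = SSEParser()
wire = 'event: token\ndata: {"text": "Hi"}\n\nevent: done\ndata: {"reason": "end"}\n\n'
events = []
for i in range(0, len(wire), 7):  # deliver the text in awkward 7-character chunks
    events.extend(parser.feed(wire[i:i + 7]))
print(events)
```

Feeding the text in small slices is a cheap way to confirm that the parser does not depend on frames lining up with reads.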
Tip
EventSource is an option for GET streams
The browser-native EventSource API parses SSE for you, but it only supports GET requests. For a POST with a JSON body, the manual fetch reader above is the right tool, and it gives you full control over headers and cancellation.
The bottom line
Streaming is the feature users feel most. Consume the model stream in Python, forward it through a FastAPI SSE endpoint, render it in the browser, and handle disconnects so you do not waste tokens. Keep the structured event framing from Part 6, capture usage with get_final_message so you keep full observability, and batch DOM updates so long answers stay smooth. Remember too that a streaming route is still a public, billable endpoint, so the authentication, rate limiting, and input validation from Part 5 all apply here without exception. Your app now responds the moment the model starts writing, which is the single change that does the most to make it feel fast. The final part ties everything together into a small agent that can take actions.
Frequently asked questions
SSE or WebSockets for this?
SSE is ideal for one-way model output and works through most proxies. Use WebSockets only if you need two-way messaging on the same connection.
How do I stream and still log usage?
Call get_final_message after the stream ends. It returns the complete message including the usage object you logged in Part 8.
Should the browser ever pick the model or token budget?
No. Treat those as server-side settings. Letting the client choose them invites abuse and surprise cost, so accept only the user content and decide the rest on the server.
Up next: Part 12, building an AI agent API.