Cover image for Streaming and Background Work in FastAPI: SSE and BackgroundTasks

At a glance

Reading time

~200 words/min

Published

4 hours ago

Jun 10, 2026

Views

4

All-time total

Streaming and Background Work in FastAPI: SSE and BackgroundTasks

Part 6 adds two patterns that LLM apps lean on heavily. Streaming sends output token by token so users see a response forming instead of staring at a spinner. Background work hands slow tasks off so the request can return immediately. This part covers server sent events for streaming and BackgroundTasks for fire and forget work, with the option to graduate to a real queue.

What you will build

  • A streaming endpoint using server sent events
  • An async generator that yields chunks as they are produced
  • BackgroundTasks for work that should not block the response
  • When to move from BackgroundTasks to a real task queue

1. Why streaming matters for LLMs

A model can take several seconds to finish a long answer. If you wait for the whole thing, the user waits too. Streaming flips that: the first words appear almost immediately and the rest follow. Server sent events are the simplest transport, a long lived HTTP response that emits text chunks.

import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def token_stream():
    for word in "streaming responses feel instant to users".split():
        yield f"data: {word}\n\n"   # SSE frame
        await asyncio.sleep(0.1)

@app.get("/stream")
async def stream():
    return StreamingResponse(token_stream(), media_type="text/event-stream")

Each yielded value is flushed to the client as it is produced. In Part 11 you will replace the fake generator with a real model stream, but the FastAPI side stays exactly this shape.

💡

Tip

Test it from the terminal

Run the app and use curl -N http://localhost:8000/stream to watch frames arrive one at a time. The -N flag disables buffering so you see the stream live.

2. Background tasks

Some work should happen after the response is sent: writing an audit log, sending a notification, warming a cache. BackgroundTasks runs it in the same process after the response returns, so the client is not kept waiting.

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def record_event(name: str) -> None:
    # runs after the response is sent
    print(f"event recorded: {name}")

@app.post("/projects")
async def create_project(tasks: BackgroundTasks) -> dict:
    project_id = 1
    tasks.add_task(record_event, f"project_created:{project_id}")
    return {"id": project_id}

3. When to reach for a real queue

BackgroundTasks runs in your web process. That is fine for short, best effort work. For anything slow, retryable, or important enough that it must survive a restart, use a real task queue such as Celery, RQ, or Dramatiq with a Redis or RabbitMQ broker.

BackgroundTasks vs a task queue
Concern BackgroundTasks Task queue
Runs in Web process Separate workers
Survives restart No Yes
Retries Manual Built in
Best for Fast, best effort work Slow, important, retryable work

Checkpoint

A user uploads a document that takes 40 seconds to index. Where should that work go?

4. Structure your SSE events

A bare stream of text is fine for a demo, but real clients want to tell apart a token, a status update, and the end of the stream. Server sent events support a named event type alongside the data line, and you can put JSON in the data so a chunk carries structure. Encoding each frame as JSON also sidesteps the newline problem, since a raw token containing a line break would otherwise split across frames.

import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def events():
    for word in ["retrieval", "augmented", "generation"]:
        yield f"event: token\ndata: {json.dumps({'text': word})}\n\n"
    yield f"event: done\ndata: {json.dumps({'reason': 'end'})}\n\n"

@app.get("/stream")
async def stream():
    return StreamingResponse(events(), media_type="text/event-stream")

The client now switches on the event name: append on a token event, show a spinner state on a status event, and close the connection on done. This is exactly the shape Part 11 uses to forward real model tokens, so getting the framing right here pays off later.

5. Keep long streams alive

Proxies and load balancers will close a connection that looks idle, even if the model is simply thinking before it emits the next token. A periodic heartbeat, a comment frame that the browser ignores, keeps the connection open without polluting the output. Send one every several seconds while you wait on the model.

import asyncio

async def events_with_heartbeat(source):
    async for token in source:
        yield f"data: {token}\n\n"
    # while waiting, a `: keep-alive` comment frame prevents idle timeouts:
    # yield ": keep-alive\n\n"
    await asyncio.sleep(0)

6. Make background tasks safe to retry

Any task that can be retried can run twice, because a worker might crash after doing the work but before recording success. Design side effects to be idempotent, so running them twice has the same result as running them once. The usual technique is an idempotency key: record that you processed a given event id, and skip it if you see it again.

PROCESSED: set[str] = set()

def send_welcome_email(event_id: str, user_email: str) -> None:
    if event_id in PROCESSED:
        return                      # already handled; safe to retry
    # ... actually send the email ...
    PROCESSED.add(event_id)

Warning

Background failures are silent

An exception inside a BackgroundTask cannot affect the response that already went out, so the error vanishes unless you log it. Always wrap background work in its own try and except with logging, and monitor it.

7. Knowing when to graduate to a queue

The honest signal that you have outgrown BackgroundTasks is when losing a task would matter. If a failed task must be retried automatically, if work must survive a deploy, or if it is heavy enough to steal capacity from request handling, move it to a worker. The endpoint code barely changes: instead of tasks.add_task you enqueue a job, and a separate worker process runs it with retries and visibility.

8. A complete streaming sketch

Here is the whole pattern in one place, still with a stand-in generator so you can run the FastAPI side today and swap in a real model stream in Part 11. The endpoint validates its body, streams structured token events, and ends with a done event. Note that the generator is where cancellation lands: if the client disconnects, FastAPI stops iterating it, so any cleanup belongs in a finally block.

import asyncio, json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class Ask(BaseModel):
    question: str

async def fake_model_stream(question: str):
    for word in f"you asked: {question}".split():
        yield word
        await asyncio.sleep(0.05)

@app.post("/chat/stream")
async def chat_stream(body: Ask):
    async def events():
        try:
            async for token in fake_model_stream(body.question):
                yield f"event: token\ndata: {json.dumps({'text': token})}\n\n"
            yield f"event: done\ndata: {json.dumps({'reason': 'end'})}\n\n"
        finally:
            pass  # release any per-stream resources here on disconnect
    return StreamingResponse(events(), media_type="text/event-stream")

When Part 11 replaces fake_model_stream with the model client stream, nothing else in this endpoint changes. That is the value of getting the framing, the event types, and the cancellation handling right now, before a real, billable model is on the other end.

9. Choosing the right tool for the work

To pick between inline work, BackgroundTasks, and a queue, ask two questions: how long does it take, and does it matter if it is lost. Fast and best effort, like recording an analytics event, fits BackgroundTasks. Slow, or important enough to require a retry and survive a restart, belongs in a queue. Anything the user is actively waiting on the result of stays inline, and if that is slow, stream it so the wait feels short.

Where work should run
Work Latency to user Run it
Log an event after responding None BackgroundTasks
Resize an uploaded image Seconds BackgroundTasks or queue
Index a document for RAG Tens of seconds Queue with a worker
Answer a user question Immediate Inline, streamed

The bottom line

Streaming and background work are what make an LLM app feel responsive. Stream long answers with server sent events, push fast side effects to BackgroundTasks, and graduate to a real queue when work is slow or must not be lost. The deeper lesson is about perceived speed: users do not experience your average latency, they experience the wait before something happens, and both techniques attack that wait directly. Streaming turns a multi second silence into words appearing immediately, and moving slow work off the request path turns a blocked spinner into an instant response. Get the event framing, heartbeats, and cancellation right now, with a stand-in generator, and the only thing left for Part 11 is to connect a real model to the other end. Reach for a real queue the moment losing a task would matter, keep best effort side effects in BackgroundTasks, and make anything retryable idempotent so running it twice is harmless. With this in place, the next part makes sure all of it stays correct under change by adding tests.

? Frequently asked questions

Should I use WebSockets instead of SSE? +

For one way model output, SSE is simpler and proxy friendly. Use WebSockets when you need full duplex communication, such as a live collaborative session.

Can BackgroundTasks fail silently? +

Yes. An exception in a background task does not affect the already sent response, so log inside the task and monitor it.

How do I test a streaming endpoint? +

The test client can read a streamed response and let you assert on the chunks, and a fake generator stands in for the model so the test stays fast and deterministic, as covered in the testing part.

Up next: Part 7, testing FastAPI.

Newsletter

Want more posts like this?

Get practical software notes and tutorials delivered when something new is published.

No spam. Unsubscribe anytime.

How did this land?

Comments

0
Log in or sign up to join the discussion and react to this post.

No comments yet. Be the first to share your thoughts.

Related posts

FastAPI Fundamentals: Routing, Pydantic Models, and Dependency Injection

Build real FastAPI endpoints: typed routing, Pydantic request and response models, dependency injection, and automatic docs.

4 hours ago

FastAPI in Production: Settings, Auth, Middleware, and Project Structure

Harden a FastAPI app for production: typed settings with pydantic-settings, bearer auth, logging and CORS middleware, and a scalable project structure.

4 hours ago

Testing FastAPI the Right Way: pytest, the Test Client, and Validation

Test FastAPI with pytest and the test client: assert on validation, override dependencies to isolate from real services, and cover async and streaming code.

4 hours ago