sse-starlette 3.0.3
SSE plugin for Starlette
pip install sse-starlette
Requires Python
>=3.9
Dependencies
- anyio>=4.7.0
- uvicorn>=0.34.0; extra == "examples"
- fastapi>=0.115.12; extra == "examples"
- sqlalchemy[asyncio]>=2.0.41; extra == "examples"
- starlette>=0.49.1; extra == "examples"
- aiosqlite>=0.21.0; extra == "examples"
- uvicorn>=0.34.0; extra == "uvicorn"
- granian>=2.3.1; extra == "granian"
- daphne>=4.2.0; extra == "daphne"
Server-Sent Events for Starlette and FastAPI
Background: https://sysid.github.io/server-sent-events/
Production-ready Server-Sent Events implementation for Starlette and FastAPI, following the W3C SSE specification.
Installation
pip install sse-starlette
uv add sse-starlette
# To run the examples and demonstrations (quotes keep shells such as zsh from expanding the brackets)
uv add "sse-starlette[examples]"
# Recommended ASGI servers
uv add "sse-starlette[uvicorn,granian,daphne]"
Quick Start
import asyncio
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette import EventSourceResponse
async def generate_events():
    for i in range(10):
        yield {"data": f"Event {i}"}
        await asyncio.sleep(1)
async def sse_endpoint(request):
    return EventSourceResponse(generate_events())
app = Starlette(routes=[Route("/events", sse_endpoint)])
Core Features
- Standards Compliant: Full SSE specification implementation
- Framework Integration: Native Starlette and FastAPI support
- Async/Await: Built on modern Python async patterns
- Connection Management: Automatic client disconnect detection
- Graceful Shutdown: Proper cleanup on server termination
- Thread Safety: Context-local event management for multi-threaded applications
- Multi-Loop Support: Works correctly with multiple asyncio event loops
Key Components
EventSourceResponse
The main response class that handles SSE streaming:
from sse_starlette import EventSourceResponse
# Basic usage (`data` stands for your own collection of items)
async def stream_data():
    for item in data:
        yield {"data": item, "event": "update", "id": str(item.id)}
async def stream_endpoint(request):
    return EventSourceResponse(stream_data())
ServerSentEvent
For structured event creation:
from sse_starlette import ServerSentEvent
event = ServerSentEvent(
    data="Custom message",
    event="notification", 
    id="msg-123",
    retry=5000
)
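To make the fields above concrete, the sketch below hand-encodes an event into the text framing defined by the SSE specification. `encode_sse` is a hypothetical helper written for illustration only; it is not how sse-starlette serializes events internally, and it assumes the default "\r\n" separator.

```python
def encode_sse(data=None, event=None, id=None, retry=None, comment=None, sep="\r\n"):
    """Hypothetical helper: render one event in SSE wire format."""
    lines = []
    if comment is not None:
        # Comment lines start with a colon and are ignored by clients
        lines.extend(f": {part}" for part in str(comment).splitlines())
    if id is not None:
        lines.append(f"id: {id}")
    if event is not None:
        lines.append(f"event: {event}")
    if data is not None:
        # Multi-line payloads become one "data:" line per line of text
        lines.extend(f"data: {part}" for part in str(data).splitlines())
    if retry is not None:
        lines.append(f"retry: {int(retry)}")
    # A blank line terminates the event
    return sep.join(lines) + sep + sep


message = encode_sse(data="Custom message", event="notification", id="msg-123", retry=5000)
print(repr(message))
```

Printing the `repr` makes the separators visible, which helps when debugging a client that never fires its event handler because the terminating blank line is missing.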
JSONServerSentEvent
An easy way to send JSON data as SSE events:
from sse_starlette import JSONServerSentEvent
event = JSONServerSentEvent(
    data={"field":"value"}, # Anything serializable with json.dumps
)
Advanced Usage
Custom Ping Configuration
from sse_starlette import EventSourceResponse, ServerSentEvent
def custom_ping():
    return ServerSentEvent(comment="Custom ping message")
async def sse_endpoint(request):
    return EventSourceResponse(
        generate_events(),  # generate_events as defined in Quick Start
        ping=10,  # Ping every 10 seconds
        ping_message_factory=custom_ping
    )
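A ping built from a comment travels as a line that starts with a colon, which clients discard per the SSE specification. The sketch below shows a minimal client-side parser that skips such lines. `parse_sse` is a hypothetical helper, handles only the "data", "event", and "id" fields, and is not part of sse-starlette.

```python
def parse_sse(text):
    """Hypothetical helper: split raw SSE text into a list of event dicts."""
    events, current = [], {}
    for line in text.splitlines():
        if line == "":
            # Blank line: dispatch the event collected so far, if any
            if current:
                events.append(current)
                current = {}
        elif line.startswith(":"):
            # Comment line (for example a ping): ignored by clients
            continue
        else:
            field, _, value = line.partition(":")
            # A single leading space after the colon is not part of the value
            value = value[1:] if value.startswith(" ") else value
            if field == "data" and "data" in current:
                # Multiple data lines are joined with newlines
                current["data"] += "\n" + value
            elif field in ("data", "event", "id"):
                current[field] = value
    return events


raw = ": Custom ping message\r\n\r\nevent: update\r\ndata: Event 0\r\n\r\n"
print(parse_sse(raw))
```

The ping keeps the connection alive for proxies and load balancers without ever reaching application code on the client.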
Multi-Threaded Usage
sse-starlette now supports usage in multi-threaded applications and with multiple asyncio event loops:
import threading
import asyncio
from sse_starlette import EventSourceResponse
def run_sse_in_thread():
    """SSE streaming works correctly in separate threads"""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    
    async def thread_events():
        for i in range(5):
            yield {"data": f"Thread event {i}"}
            await asyncio.sleep(1)
    
    # This works without "Event bound to different loop" errors
    response = EventSourceResponse(thread_events())
    loop.close()
# Start SSE in multiple threads
for i in range(3):
    thread = threading.Thread(target=run_sse_in_thread)
    thread.start()
Database Streaming (Thread-Safe)
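from sqlalchemy import select
from sse_starlette import EventSourceResponse
# AsyncSession is assumed to be your configured async session factory; User is your ORM model
async def stream_database_results(request):
    # CORRECT: Create session within generator context
    async with AsyncSession() as session:
        results = await session.execute(select(User))
        # scalars() yields User objects rather than one-element Row tuples
        for user in results.scalars():
            if await request.is_disconnected():
                break
            yield {"data": user.name, "id": str(user.id)}
async def database_endpoint(request):
    return EventSourceResponse(stream_database_results(request))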
Error Handling and Timeouts
import asyncio
from sse_starlette import EventSourceResponse
async def robust_stream(request):
    try:
        for i in range(100):
            if await request.is_disconnected():
                break
            yield {"data": f"Item {i}"}
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        # Client disconnected - perform cleanup, then re-raise so cancellation propagates
        raise
async def robust_endpoint(request):
    return EventSourceResponse(
        robust_stream(request),
        send_timeout=30,  # Timeout hanging sends
        headers={"Cache-Control": "no-cache"}
    )
Memory Channels Alternative
For complex data flows, use memory channels instead of generators:
import anyio
from functools import partial
async def data_producer(send_channel):
    async with send_channel:
        for i in range(10):
            await send_channel.send({"data": f"Item {i}"})
            await anyio.sleep(1)
async def channel_endpoint(request):
    send_channel, receive_channel = anyio.create_memory_object_stream(10)
    
    return EventSourceResponse(
        receive_channel,
        data_sender_callable=partial(data_producer, send_channel)
    )
Configuration Options
EventSourceResponse Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| content | ContentStream | Required | Async generator or iterable |
| ping | int | 15 | Ping interval in seconds |
| sep | str | "\r\n" | Line separator ("\r\n", "\r", or "\n") |
| send_timeout | float | None | Send operation timeout in seconds |
| headers | dict | None | Additional HTTP headers |
| ping_message_factory | Callable | None | Custom ping message creator |
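The intent of send_timeout can be illustrated with a plain asyncio sketch: bound each send so that a stalled client cannot block the stream forever. `send_with_timeout`, `fast_send`, and `stalled_send` are hypothetical names, and `asyncio.wait_for` is used here for illustration; this is not necessarily how sse-starlette implements the timeout.

```python
import asyncio


async def send_with_timeout(send, message, timeout):
    """Hypothetical helper: give up on a send that takes longer than `timeout` seconds."""
    try:
        await asyncio.wait_for(send(message), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        # The peer is not reading; the caller can now close the stream
        return False


async def fast_send(message):
    # Simulates a healthy connection
    await asyncio.sleep(0)


async def stalled_send(message):
    # Simulates a dead connection that never accepts data
    await asyncio.sleep(3600)


async def main():
    ok = await send_with_timeout(fast_send, "data: hi\r\n\r\n", timeout=0.1)
    stalled = await send_with_timeout(stalled_send, "data: hi\r\n\r\n", timeout=0.1)
    return ok, stalled


result = asyncio.run(main())
print(result)
```

Without such a bound, a client that stops reading but never closes its socket can hold server resources indefinitely.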
Client Disconnection
async def monitored_stream(request):
    events_sent = 0
    try:
        while events_sent < 100:
            if await request.is_disconnected():
                print(f"Client disconnected after {events_sent} events")
                break
            
            yield {"data": f"Event {events_sent}"}
            events_sent += 1
            await asyncio.sleep(1)
            
    except asyncio.CancelledError:
        print("Stream cancelled")
        raise
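Whichever way a stream ends, cleanup is safest in a `finally` block of the generator. The self-contained sketch below uses only asyncio (no HTTP layer) to show that closing an async generator early still runs its `finally` clause. The names `events` and `cleaned_up` are illustrative.

```python
import asyncio

cleaned_up = []


async def events():
    try:
        n = 0
        while True:
            yield {"data": f"Event {n}"}
            n += 1
    finally:
        # Runs on normal exit, on cancellation, and when the generator is closed early
        cleaned_up.append("released resources")


async def main():
    stream = events()
    received = []
    async for event in stream:
        received.append(event["data"])
        if len(received) == 3:
            break  # Simulates the consumer going away
    # Explicitly close the generator so its finally block runs now
    await stream.aclose()
    return received


received = asyncio.run(main())
print(received, cleaned_up)
```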
Testing
sse-starlette now provides comprehensive test isolation without manual setup. The library automatically handles event loop contexts, so manual state resets like the fixture below are no longer required:
# This fixture is deprecated and no longer needed since version 3.0.0
import pytest
from sse_starlette.sse import AppStatus
@pytest.fixture
def reset_sse_app_status():
    AppStatus.should_exit_event = None
    yield
    AppStatus.should_exit_event = None
Production Considerations
Performance Limits
- Memory: Each connection maintains a buffer. Monitor memory usage.
- Connections: Limited by system file descriptors and application design.
- Network: High-frequency events can saturate bandwidth.
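Because each open SSE connection holds a socket, the process's file descriptor limit is a practical ceiling on concurrent clients. A minimal sketch for inspecting that limit, assuming a Unix-like system where the standard-library `resource` module is available (`describe` is an illustrative helper):

```python
import resource

# RLIMIT_NOFILE bounds open file descriptors; each SSE connection holds one socket
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)


def describe(limit):
    # RLIM_INFINITY means the limit is not enforced
    return "unlimited" if limit == resource.RLIM_INFINITY else str(limit)


print(f"soft limit: {describe(soft)}, hard limit: {describe(hard)}")
```

The soft limit is the one that applies to the running process; it can usually be raised up to the hard limit with `ulimit -n` or the service manager's configuration.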
Error Recovery
Implement client-side reconnection logic:
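function createEventSource(url) {
    const eventSource = new EventSource(url);
    
    eventSource.onerror = function() {
        // Close the failed connection so the browser's built-in retry
        // does not race with the manual reconnect below
        eventSource.close();
        setTimeout(() => {
            createEventSource(url);  // Reconnect after delay
        }, 5000);
    };
    
    return eventSource;
}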
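A fixed 5-second delay is a reasonable default. When many clients reconnect at once, spacing retries out gives the server room to recover. The Python sketch below computes a capped exponential backoff schedule that a non-browser client could sleep on between attempts. The function name and the default values are assumptions chosen for illustration, not part of sse-starlette.

```python
def backoff_delays(base=1.0, factor=2.0, cap=30.0, attempts=6):
    """Hypothetical helper: delays in seconds to wait before each reconnect attempt."""
    delays = []
    delay = base
    for _ in range(attempts):
        # Never wait longer than the cap, however many attempts have failed
        delays.append(min(delay, cap))
        delay *= factor
    return delays


print(backoff_delays())
```

Adding a small random jitter to each delay is a common refinement, since it keeps clients from retrying in lockstep.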
Learning Resources
Examples Directory
The examples/ directory contains production-ready patterns:
- 01_basic_sse.py: Fundamental SSE concepts
- 02_message_broadcasting.py: Multi-client message distribution
- 03_database_streaming.py: Thread-safe database integration
- 04_advanced_features.py: Custom protocols and error handling
Demonstrations Directory
The examples/demonstrations/ directory provides educational scenarios:
Basic Patterns (basic_patterns/):
- Client disconnect detection and cleanup
- Graceful server shutdown behavior
Production Scenarios (production_scenarios/):
- Load testing with concurrent clients
- Network interruption handling
Advanced Patterns (advanced_patterns/):
- Memory channels vs generators
- Error recovery and circuit breakers
- Custom protocol development
Run any demonstration:
python examples/demonstrations/basic_patterns/client_disconnect.py
python examples/demonstrations/production_scenarios/load_simulations.py
python examples/demonstrations/advanced_patterns/error_recovery.py
Troubleshooting
Common Issues
Database session errors with async generators
- Create database sessions inside generators, not as dependencies
Hanging connections after client disconnect
- Always check await request.is_disconnected() in loops
- Use the send_timeout parameter to detect dead connections
If you are using Postman, please see: https://github.com/sysid/sse-starlette/issues/47#issuecomment-1445953826
Performance Optimization
# Connection limits
class ConnectionLimiter:
    def __init__(self, max_connections=100):
        self.semaphore = asyncio.Semaphore(max_connections)
    
    async def limited_endpoint(self, request):
        async with self.semaphore:
            return EventSourceResponse(generate_events())
Network-Level Gotchas
Network infrastructure components can buffer SSE streams, breaking real-time delivery. Here are the most common issues and solutions:
Reverse Proxy Buffering (Nginx/Apache)
Problem: Nginx buffers responses by default, delaying SSE events until ~16KB accumulates.
Solution: Add the X-Accel-Buffering: no header.
Nginx Configuration (if you can't modify app headers):
location /events {
    proxy_pass http://localhost:8000;
    proxy_http_version 1.1;
    proxy_set_header Connection '';
    proxy_buffering off;           # Disable for this location
    chunked_transfer_encoding off;
}
CDN Issues
Cloudflare: Buffers ~100KB before flushing to clients, breaking real-time delivery.
Akamai: Edge servers buffer by default.
Load Balancer Problems
HAProxy: Timeout settings must exceed heartbeat frequency.
# Ensure timeouts > ping interval
timeout client 60s    # If ping every 45s
timeout server 60s
F5 Load Balancers: Buffer responses by default.
Contributing
See examples and demonstrations for implementation patterns. Run tests with:
make test-unit  # Unit tests only
make test       # All tests including integration