sse-starlette2.3.6
Published
SSE plugin for Starlette
pip install sse-starlette
Package Downloads
Authors
Project URLs
Requires Python
>=3.9
Dependencies
- anyio
>=4.7.0
- uvicorn
>=0.34.0; extra == "examples"
- fastapi
>=0.115.12; extra == "examples"
- sqlalchemy
[asyncio,examples]>=2.0.41; extra == "examples"
- starlette
>=0.41.3; extra == "examples"
- aiosqlite
>=0.21.0; extra == "examples"
- uvicorn
>=0.34.0; extra == "uvicorn"
- granian
>=2.3.1; extra == "granian"
- daphne
>=4.2.0; extra == "daphne"
Server-Sent Events for Starlette and FastAPI
Background: https://sysid.github.io/server-sent-events/
Production ready Server-Sent Events implementation for Starlette and FastAPI following the W3C SSE specification.
Installation
pip install sse-starlette
uv add sse-starlette
# To run the examples and demonstrations
uv add sse-starlette[examples]
# Recommended ASGI server
uv add sse-starlette[uvicorn,granian,daphne]
Quick Start
import asyncio
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette import EventSourceResponse
async def generate_events():
for i in range(10):
yield {"data": f"Event {i}"}
await asyncio.sleep(1)
async def sse_endpoint(request):
return EventSourceResponse(generate_events())
app = Starlette(routes=[Route("/events", sse_endpoint)])
Core Features
- Standards Compliant: Full SSE specification implementation
- Framework Integration: Native Starlette and FastAPI support
- Async/Await: Built on modern Python async patterns
- Connection Management: Automatic client disconnect detection
- Graceful Shutdown: Proper cleanup on server termination
Key Components
EventSourceResponse
The main response class that handles SSE streaming:
from sse_starlette import EventSourceResponse
# Basic usage
async def stream_data():
for item in data:
yield {"data": item, "event": "update", "id": str(item.id)}
return EventSourceResponse(stream_data())
ServerSentEvent
For structured event creation:
from sse_starlette import ServerSentEvent
event = ServerSentEvent(
data="Custom message",
event="notification",
id="msg-123",
retry=5000
)
Advanced Usage
Custom Ping Configuration
from sse_starlette import ServerSentEvent
def custom_ping():
return ServerSentEvent(comment="Custom ping message")
return EventSourceResponse(
generate_events(),
ping=10, # Ping every 10 seconds
ping_message_factory=custom_ping
)
Database Streaming (Thread-Safe)
async def stream_database_results(request):
# CORRECT: Create session within generator context
async with AsyncSession() as session:
results = await session.execute(select(User))
for row in results:
if await request.is_disconnected():
break
yield {"data": row.name, "id": str(row.id)}
return EventSourceResponse(stream_database_results(request))
Error Handling and Timeouts
async def robust_stream(request):
try:
for i in range(100):
if await request.is_disconnected():
break
yield {"data": f"Item {i}"}
await asyncio.sleep(0.5)
except asyncio.CancelledError:
# Client disconnected - perform cleanup
raise
return EventSourceResponse(
robust_stream(request),
send_timeout=30, # Timeout hanging sends
headers={"Cache-Control": "no-cache"}
)
Memory Channels Alternative
For complex data flows, use memory channels instead of generators:
import anyio
from functools import partial
async def data_producer(send_channel):
async with send_channel:
for i in range(10):
await send_channel.send({"data": f"Item {i}"})
await anyio.sleep(1)
async def channel_endpoint(request):
send_channel, receive_channel = anyio.create_memory_object_stream(10)
return EventSourceResponse(
receive_channel,
data_sender_callable=partial(data_producer, send_channel)
)
Configuration Options
EventSourceResponse Parameters
Parameter | Type | Default | Description |
---|---|---|---|
content | ContentStream | Required | Async generator or iterable |
ping | int | 15 | Ping interval in seconds |
sep | str | "\r\n" | Line separator (\r\n , \r , \n ) |
send_timeout | float | None | Send operation timeout |
headers | dict | None | Additional HTTP headers |
ping_message_factory | Callable | None | Custom ping message creator |
Client Disconnection
async def monitored_stream(request):
events_sent = 0
try:
while events_sent < 100:
if await request.is_disconnected():
print(f"Client disconnected after {events_sent} events")
break
yield {"data": f"Event {events_sent}"}
events_sent += 1
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Stream cancelled")
raise
Testing
When testing SSE endpoints with pytest, reset the global event state:
import pytest
from sse_starlette.sse import AppStatus
@pytest.fixture
def reset_sse_app_status():
AppStatus.should_exit_event = None
yield
AppStatus.should_exit_event = None
Production Considerations
Performance Limits
- Memory: Each connection maintains a buffer. Monitor memory usage.
- Connections: Limited by system file descriptors and application design.
- Network: High-frequency events can saturate bandwidth.
Error Recovery
Implement client-side reconnection logic:
function createEventSource(url) {
const eventSource = new EventSource(url);
eventSource.onerror = function() {
setTimeout(() => {
createEventSource(url); // Reconnect after delay
}, 5000);
};
return eventSource;
}
Learning Resources
Examples Directory
The examples/
directory contains production-ready patterns:
01_basic_sse.py
: Fundamental SSE concepts02_message_broadcasting.py
: Multi-client message distribution03_database_streaming.py
: Thread-safe database integration04_advanced_features.py
: Custom protocols and error handling
Demonstrations Directory
The examples/demonstrations/
directory provides educational scenarios:
Basic Patterns (basic_patterns/
):
- Client disconnect detection and cleanup
- Graceful server shutdown behavior
Production Scenarios (production_scenarios/
):
- Load testing with concurrent clients
- Network interruption handling
Advanced Patterns (advanced_patterns/
):
- Memory channels vs generators
- Error recovery and circuit breakers
- Custom protocol development
Run any demonstration:
python examples/demonstrations/basic_patterns/client_disconnect.py
python examples/demonstrations/production_scenarios/load_simulations.py
python examples/demonstrations/advanced_patterns/error_recovery.py
Troubleshooting
Common Issues
"RuntimeError: Event object bound to different event loop"
- Reset
AppStatus.should_exit_event = None
between tests
Database session errors with async generators
- Create database sessions inside generators, not as dependencies
Hanging connections after client disconnect
- Always check
await request.is_disconnected()
in loops - Use
send_timeout
parameter to detect dead connections
If you are using Postman, please see: https://github.com/sysid/sse-starlette/issues/47#issuecomment-1445953826
Performance Optimization
# Connection limits
class ConnectionLimiter:
def __init__(self, max_connections=100):
self.semaphore = asyncio.Semaphore(max_connections)
async def limited_endpoint(self, request):
async with self.semaphore:
return EventSourceResponse(generate_events())
Contributing
See examples and demonstrations for implementation patterns. Run tests with:
make test-unit # Unit tests only
make test # All tests including integration