Server-Sent Events for Starlette and FastAPI

Background: https://sysid.github.io/server-sent-events/

Production-ready Server-Sent Events implementation for Starlette and FastAPI, following the W3C SSE specification.

Installation

pip install sse-starlette
uv add sse-starlette

# To run the examples and demonstrations
# (quotes keep shells such as zsh from expanding the brackets)
uv add "sse-starlette[examples]"

# Recommended ASGI servers
uv add "sse-starlette[uvicorn,granian,daphne]"

Quick Start

import asyncio
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette import EventSourceResponse

async def generate_events():
    for i in range(10):
        yield {"data": f"Event {i}"}
        await asyncio.sleep(1)

async def sse_endpoint(request):
    return EventSourceResponse(generate_events())

app = Starlette(routes=[Route("/events", sse_endpoint)])

Core Features

  • Standards Compliant: Full SSE specification implementation
  • Framework Integration: Native Starlette and FastAPI support
  • Async/Await: Built on modern Python async patterns
  • Connection Management: Automatic client disconnect detection
  • Graceful Shutdown: Proper cleanup on server termination

Key Components

EventSourceResponse

The main response class that handles SSE streaming:

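from sse_starlette import EventSourceResponse

# Basic usage: `data` stands for any iterable of application objects
async def stream_data():
    for item in data:
        # Each yielded dict becomes one SSE event
        yield {"data": item, "event": "update", "id": str(item.id)}

async def endpoint(request):
    return EventSourceResponse(stream_data())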

ServerSentEvent

For structured event creation:

from sse_starlette import ServerSentEvent

event = ServerSentEvent(
    data="Custom message",
    event="notification", 
    id="msg-123",
    retry=5000
)
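
To make these fields concrete, the sketch below serializes an event into the text framing defined by the SSE specification. `encode_sse` is a hypothetical helper written for illustration, not the library's serializer, and the field order it emits is an assumption. The framing rules it follows (one `field: value` line per field, one `data:` line per line of payload, a blank line to end the event) come from the specification.

```python
def encode_sse(data=None, event=None, id=None, retry=None, comment=None, sep="\r\n"):
    """Serialize one event into SSE wire format (illustrative sketch only)."""
    lines = []
    if comment is not None:
        # Comment lines start with a colon and are ignored by clients
        lines.extend(f": {part}" for part in str(comment).splitlines())
    if id is not None:
        lines.append(f"id: {id}")
    if event is not None:
        lines.append(f"event: {event}")
    if data is not None:
        # Multi-line payloads become one "data:" line per line of text
        lines.extend(f"data: {part}" for part in str(data).splitlines())
    if retry is not None:
        lines.append(f"retry: {int(retry)}")
    # A blank line terminates the event
    return sep.join(lines) + sep + sep


frame = encode_sse(data="Custom message", event="notification", id="msg-123", retry=5000)
print(repr(frame))
```

Passing `sep="\n"` produces the same frame with bare line feeds, which the specification also accepts.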

Advanced Usage

Custom Ping Configuration

from sse_starlette import EventSourceResponse, ServerSentEvent

def custom_ping():
    # Sent as an SSE comment line, which clients ignore
    return ServerSentEvent(comment="Custom ping message")

async def endpoint(request):
    return EventSourceResponse(
        generate_events(),
        ping=10,  # Ping every 10 seconds
        ping_message_factory=custom_ping
    )
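
Pings built this way travel as SSE comment lines, which begin with a colon and are dropped by clients before any event is dispatched. The simplified parser below is a sketch of that client-side rule. `parse_sse` is a hypothetical helper, not part of sse-starlette, and it leaves out parts of the specification such as `retry` handling and last-event-ID tracking.

```python
def parse_sse(stream_text):
    """Parse SSE text into a list of event dicts (simplified sketch)."""
    events, fields, data_lines = [], {}, []
    for line in stream_text.splitlines():
        if line == "":
            # Blank line: dispatch only if the event carried data
            if data_lines:
                fields["data"] = "\n".join(data_lines)
                events.append(fields)
            fields, data_lines = {}, []
        elif line.startswith(":"):
            continue  # comment line such as a ping; never reaches the application
        else:
            name, _, value = line.partition(":")
            # A single leading space after the colon is not part of the value
            value = value[1:] if value.startswith(" ") else value
            if name == "data":
                data_lines.append(value)
            else:
                fields[name] = value
    return events


stream = (
    ": Custom ping message\r\n\r\n"
    "data: Event 0\r\n\r\n"
    ": Custom ping message\r\n\r\n"
    "event: update\r\ndata: Event 1\r\n\r\n"
)
events = parse_sse(stream)
print(events)
```

Only the two data-bearing events come out of the parser; both ping comments are skipped.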

Database Streaming (Thread-Safe)

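from sqlalchemy import select

# AsyncSession and User stand for the application's own session factory and model
async def stream_database_results(request):
    # CORRECT: Create session within generator context
    async with AsyncSession() as session:
        results = await session.execute(select(User))
        # .scalars() yields User objects rather than one-element rows
        for user in results.scalars():
            if await request.is_disconnected():
                break
            yield {"data": user.name, "id": str(user.id)}

async def endpoint(request):
    return EventSourceResponse(stream_database_results(request))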

Error Handling and Timeouts

async def robust_stream(request):
    try:
        for i in range(100):
            if await request.is_disconnected():
                break
            yield {"data": f"Item {i}"}
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        # Client disconnected - perform cleanup
        raise

async def endpoint(request):
    return EventSourceResponse(
        robust_stream(request),
        send_timeout=30,  # Timeout hanging sends
        headers={"Cache-Control": "no-cache"}
    )
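
The cleanup path can be exercised without a server. The sketch below uses only the standard library: `task.cancel()` stands in for a client disconnect, and `cleanup_log`, `consume`, and `main` are names invented for this illustration. It shows the cancellation arriving inside the generator at its `await`, the `except` branch running, and a `finally` block running afterwards.

```python
import asyncio

cleanup_log = []

async def robust_stream():
    try:
        for i in range(100):
            yield {"data": f"Item {i}"}
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        cleanup_log.append("cancelled")  # react to the cancellation
        raise                            # always re-raise so it propagates
    finally:
        cleanup_log.append("closed")     # runs on every exit path

async def consume(received):
    async for event in robust_stream():
        received.append(event)

async def main():
    received = []
    task = asyncio.create_task(consume(received))
    await asyncio.sleep(0.035)  # let a few events through
    task.cancel()               # stands in for a client disconnect
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received

received = asyncio.run(main())
print(len(received), cleanup_log)
```

Resources that must be released whether or not the stream was cancelled are usually better placed in `finally` than in the `except` branch.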

Memory Channels Alternative

For complex data flows, use memory channels instead of generators:

import anyio
from functools import partial
from sse_starlette import EventSourceResponse

async def data_producer(send_channel):
    async with send_channel:
        for i in range(10):
            await send_channel.send({"data": f"Item {i}"})
            await anyio.sleep(1)

async def channel_endpoint(request):
    send_channel, receive_channel = anyio.create_memory_object_stream(10)
    
    return EventSourceResponse(
        receive_channel,
        data_sender_callable=partial(data_producer, send_channel)
    )

Configuration Options

EventSourceResponse Parameters

Parameter             Type           Default   Description
content               ContentStream  Required  Async generator or iterable
ping                  int            15        Ping interval in seconds
sep                   str            "\r\n"    Line separator (\r\n, \r, \n)
send_timeout          float          None      Send operation timeout
headers               dict           None      Additional HTTP headers
ping_message_factory  Callable       None      Custom ping message creator

Client Disconnection

async def monitored_stream(request):
    events_sent = 0
    try:
        while events_sent < 100:
            if await request.is_disconnected():
                print(f"Client disconnected after {events_sent} events")
                break
            
            yield {"data": f"Event {events_sent}"}
            events_sent += 1
            await asyncio.sleep(1)
            
    except asyncio.CancelledError:
        print("Stream cancelled")
        raise

Testing

When testing SSE endpoints with pytest, reset the global event state:

import pytest
from sse_starlette.sse import AppStatus

@pytest.fixture
def reset_sse_app_status():
    AppStatus.should_exit_event = None
    yield
    AppStatus.should_exit_event = None

Production Considerations

Performance Limits

  • Memory: Each connection maintains a buffer. Monitor memory usage.
  • Connections: Limited by system file descriptors and application design.
  • Network: High-frequency events can saturate bandwidth.
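
Because every open SSE connection holds one socket, the per-process file descriptor limit gives a rough upper bound on concurrent clients. A minimal way to inspect it on Unix-like systems is sketched below, using the standard library `resource` module (not available on Windows). `max_open_files` is a name chosen for this sketch.

```python
import resource  # Unix-only standard library module

def max_open_files():
    """Return the (soft, hard) limits on open file descriptors for this process."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    return soft, hard

soft_limit, hard_limit = max_open_files()
print(f"soft={soft_limit} hard={hard_limit}")
```

The usable number of connections is lower than the soft limit, since log files, database connections, and listening sockets draw from the same pool.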

Error Recovery

Implement client-side reconnection logic:

function createEventSource(url) {
    const eventSource = new EventSource(url);

    eventSource.onerror = function() {
        // Close first so the browser's built-in retry does not run alongside ours
        eventSource.close();
        setTimeout(() => {
            createEventSource(url);  // Reconnect after delay
        }, 5000);
    };

    return eventSource;
}
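
A fixed delay is the simplest policy. When many clients lose their connection at once, a growing delay with some randomness spreads the reconnects out. The sketch below computes such a schedule in Python. `reconnect_delays` and all of its parameters are hypothetical names and values chosen for illustration, not part of sse-starlette or the EventSource API.

```python
import random

def reconnect_delays(base=1.0, factor=2.0, cap=30.0, attempts=6, jitter=0.0):
    """Return a list of delays (seconds): exponential growth, capped, plus jitter."""
    delays = []
    for attempt in range(attempts):
        delay = min(cap, base * factor ** attempt)
        # Add up to `jitter` (a fraction of the delay) of random extra wait
        delay += random.uniform(0, jitter * delay)
        delays.append(delay)
    return delays

schedule = reconnect_delays()
print(schedule)
```

With the default `jitter=0.0` the schedule is deterministic. A value such as `jitter=0.5` lengthens each delay by a random amount of up to 50%.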

Learning Resources

Examples Directory

The examples/ directory contains production-ready patterns:

  • 01_basic_sse.py: Fundamental SSE concepts
  • 02_message_broadcasting.py: Multi-client message distribution
  • 03_database_streaming.py: Thread-safe database integration
  • 04_advanced_features.py: Custom protocols and error handling

Demonstrations Directory

The examples/demonstrations/ directory provides educational scenarios:

Basic Patterns (basic_patterns/):

  • Client disconnect detection and cleanup
  • Graceful server shutdown behavior

Production Scenarios (production_scenarios/):

  • Load testing with concurrent clients
  • Network interruption handling

Advanced Patterns (advanced_patterns/):

  • Memory channels vs generators
  • Error recovery and circuit breakers
  • Custom protocol development

Run any demonstration:

python examples/demonstrations/basic_patterns/client_disconnect.py
python examples/demonstrations/production_scenarios/load_simulations.py
python examples/demonstrations/advanced_patterns/error_recovery.py

Troubleshooting

Common Issues

"RuntimeError: Event object bound to different event loop"

  • Reset AppStatus.should_exit_event = None between tests

Database session errors with async generators

  • Create database sessions inside generators, not as dependencies

Hanging connections after client disconnect

  • Always check await request.is_disconnected() in loops
  • Use send_timeout parameter to detect dead connections

If you are using Postman, please see: https://github.com/sysid/sse-starlette/issues/47#issuecomment-1445953826

Performance Optimization

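# Connection limits
class ConnectionLimiter:
    def __init__(self, max_connections=100):
        self.semaphore = asyncio.Semaphore(max_connections)

    async def _limited(self, events):
        # Hold a slot for the whole stream; releasing it when the endpoint
        # returns would free it before any event is sent
        async with self.semaphore:
            async for event in events:
                yield event

    async def limited_endpoint(self, request):
        return EventSourceResponse(self._limited(generate_events()))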
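
A limit of this kind only holds if the semaphore stays acquired while events are being produced. One way to check that, without a server, is to drain several streams concurrently and record the peak number active at once. In this standard-library sketch, `limited`, `drain`, and the `stats` dict are names invented for the illustration.

```python
import asyncio

async def generate_events(n=3):
    for i in range(n):
        yield {"data": f"Event {i}"}
        await asyncio.sleep(0.01)

async def limited(semaphore, events, stats):
    # The slot is held until the wrapped stream is exhausted or closed
    async with semaphore:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        try:
            async for event in events:
                yield event
        finally:
            stats["active"] -= 1

async def drain(stream):
    return [event async for event in stream]

async def main():
    semaphore = asyncio.Semaphore(2)
    stats = {"active": 0, "peak": 0}
    streams = [limited(semaphore, generate_events(), stats) for _ in range(5)]
    results = await asyncio.gather(*(drain(s) for s in streams))
    return stats, results

stats, results = asyncio.run(main())
print(stats["peak"], [len(r) for r in results])
```

Five streams are requested but at most two run at a time; the remaining ones wait for a slot and then complete normally.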

Contributing

See examples and demonstrations for implementation patterns. Run tests with:

make test-unit  # Unit tests only
make test       # All tests including integration