Oven logo

Oven

Published

Consume Server-Sent Event (SSE) messages with HTTPX.

pip install httpx-sse

Package Downloads

Weekly DownloadsMonthly Downloads

Project URLs

Requires Python

>=3.8

Dependencies

    httpx-sse

    Build Status Coverage Package version

    Consume Server-Sent Event (SSE) messages with HTTPX.

    Table of contents

    Installation

    NOTE: This is beta software. Please be sure to pin your dependencies.

    pip install httpx-sse=="0.4.*"
    

    Quickstart

    httpx-sse provides the connect_sse and aconnect_sse helpers for connecting to an SSE endpoint. The resulting EventSource object exposes the .iter_sse() and .aiter_sse() methods to iterate over the server-sent events.

    Example usage:

    import httpx
    from httpx_sse import connect_sse
    
    with httpx.Client() as client:
        with connect_sse(client, "GET", "http://localhost:8000/sse") as event_source:
            for sse in event_source.iter_sse():
                print(sse.event, sse.data, sse.id, sse.retry)
    

    You can try this against this example Starlette server (credit):

    # Requirements: pip install uvicorn starlette sse-starlette
    import asyncio
    import uvicorn
    from starlette.applications import Starlette
    from starlette.routing import Route
    from sse_starlette.sse import EventSourceResponse
    
    async def numbers(minimum, maximum):
        for i in range(minimum, maximum + 1):
            await asyncio.sleep(0.9)
            yield {"data": i}
    
    async def sse(request):
        generator = numbers(1, 5)
        return EventSourceResponse(generator)
    
    routes = [
        Route("/sse", endpoint=sse)
    ]
    
    app = Starlette(routes=routes)
    
    if __name__ == "__main__":
        uvicorn.run(app)
    

    How-To

    Calling into Python web apps

    You can call into Python web apps with HTTPX and httpx-sse to test SSE endpoints directly.

    Here's an example of calling into a Starlette ASGI app...

    import asyncio
    
    import httpx
    from httpx_sse import aconnect_sse
    from sse_starlette.sse import EventSourceResponse
    from starlette.applications import Starlette
    from starlette.routing import Route
    
    async def auth_events(request):
        async def events():
            yield {
                "event": "login",
                "data": '{"user_id": "4135"}',
            }
    
        return EventSourceResponse(events())
    
    app = Starlette(routes=[Route("/sse/auth/", endpoint=auth_events)])
    
    async def main():
        async with httpx.AsyncClient(app=app) as client:
            async with aconnect_sse(
                client, "GET", "http://localhost:8000/sse/auth/"
            ) as event_source:
                events = [sse async for sse in event_source.aiter_sse()]
                (sse,) = events
                assert sse.event == "login"
                assert sse.json() == {"user_id": "4135"}
    
    asyncio.run(main())
    

    Handling reconnections

    (Advanced)

    SSETransport and AsyncSSETransport don't have reconnection built-in. This is because how to perform retries is generally dependent on your use case. As a result, if the connection breaks while attempting to read from the server, you will get an httpx.ReadError from iter_sse() (or aiter_sse()).

    However, httpx-sse does allow implementing reconnection by using the Last-Event-ID and reconnection time (in milliseconds), exposed as sse.id and sse.retry respectively.

    Here's how you might achieve this using stamina...

    import time
    from typing import Iterator
    
    import httpx
    from httpx_sse import connect_sse, ServerSentEvent
    from stamina import retry
    
    def iter_sse_retrying(client, method, url):
        last_event_id = ""
        reconnection_delay = 0.0
    
        # `stamina` will apply jitter and exponential backoff on top of
        # the `retry` reconnection delay sent by the server.
        @retry(on=httpx.ReadError)
        def _iter_sse():
            nonlocal last_event_id, reconnection_delay
    
            time.sleep(reconnection_delay)
    
            headers = {"Accept": "text/event-stream"}
    
            if last_event_id:
                headers["Last-Event-ID"] = last_event_id
    
            with connect_sse(client, method, url, headers=headers) as event_source:
                for sse in event_source.iter_sse():
                    last_event_id = sse.id
    
                    if sse.retry is not None:
                        reconnection_delay = sse.retry / 1000
    
                    yield sse
    
        return _iter_sse()
    

    Usage:

    with httpx.Client() as client:
        for sse in iter_sse_retrying(client, "GET", "http://localhost:8000/sse"):
            print(sse.event, sse.data)
    

    API Reference

    connect_sse

    def connect_sse(
        client: httpx.Client,
        method: str,
        url: Union[str, httpx.URL],
        **kwargs,
    ) -> ContextManager[EventSource]
    

    Connect to an SSE endpoint and return an EventSource context manager.

    This sets Cache-Control: no-store on the request, as per the SSE spec, as well as Accept: text/event-stream.

    If the response Content-Type is not text/event-stream, this will raise an SSEError.

    aconnect_sse

    async def aconnect_sse(
        client: httpx.AsyncClient,
        method: str,
        url: Union[str, httpx.URL],
        **kwargs,
    ) -> AsyncContextManager[EventSource]
    

    An async equivalent to connect_sse.

    EventSource

    def __init__(response: httpx.Response)
    

    Helper for working with an SSE response.

    response

    The underlying httpx.Response.

    iter_sse

    def iter_sse() -> Iterator[ServerSentEvent]
    

    Decode the response content and yield corresponding ServerSentEvent.

    Example usage:

    for sse in event_source.iter_sse():
        ...
    

    aiter_sse

    async def iter_sse() -> AsyncIterator[ServerSentEvent]
    

    An async equivalent to iter_sse.

    ServerSentEvent

    Represents a server-sent event.

    • event: str - Defaults to "message".
    • data: str - Defaults to "".
    • id: str - Defaults to "".
    • retry: str | None - Defaults to None.

    Methods:

    • json() -> Any - Returns sse.data decoded as JSON.

    SSEError

    An error that occurred while making a request to an SSE endpoint.

    Parents:

    • httpx.TransportError

    License

    MIT

    Changelog

    All notable changes to this project will be documented in this file.

    The format is based on Keep a Changelog.

    0.4.0 - 2023-12-22

    Removed

    • Dropped Python 3.7 support, as it has reached EOL. (Pull #21)

    Added

    • Add official support for Python 3.12. (Pull #21)

    Fixed

    • Allow Content-Type that contain but are not strictly text/event-stream. (Pull #22 by @dbuades)
    • Improve error message when Content-Type is missing. (Pull #20 by @jamesbraza)

    0.3.1 - 2023-06-01

    Added

    • Add __repr__() for ServerSentEvent model, which may help with debugging and other tasks. (Pull #16)

    0.3.0 - 2023-04-27

    Changed

    • Raising an SSEError if the response content type is not text/event-stream is now performed as part of iter_sse() / aiter_sse(), instead of connect_sse() / aconnect_sse(). This allows inspecting the response before iterating on server-sent events, such as checking for error responses. (Pull #12)

    0.2.0 - 2023-03-27

    Changed

    • connect_sse() and aconnect_sse() now require a method argument: connect_sse(client, "GET", "https://example.org"). This provides support for SSE requests with HTTP verbs other than GET. (Pull #7)

    0.1.0 - 2023-02-05

    Initial release

    Added

    • Add connect_sse, aconnect_sse(), ServerSentEvent and SSEError.