httpx-sse 0.4.0
Consume Server-Sent Event (SSE) messages with HTTPX.
pip install httpx-sse
Requires Python: >=3.8
Installation
NOTE: This is beta software. Please be sure to pin your dependencies.
pip install httpx-sse=="0.4.*"
Quickstart
httpx-sse provides the connect_sse and aconnect_sse helpers for connecting to an SSE endpoint. The resulting EventSource object exposes the .iter_sse() and .aiter_sse() methods to iterate over the server-sent events.
Example usage:
import httpx
from httpx_sse import connect_sse
with httpx.Client() as client:
    with connect_sse(client, "GET", "http://localhost:8000/sse") as event_source:
        for sse in event_source.iter_sse():
            print(sse.event, sse.data, sse.id, sse.retry)
You can try this against this example Starlette server (credit):
# Requirements: pip install uvicorn starlette sse-starlette
import asyncio
import uvicorn
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette.sse import EventSourceResponse
async def numbers(minimum, maximum):
    for i in range(minimum, maximum + 1):
        await asyncio.sleep(0.9)
        yield {"data": i}

async def sse(request):
    generator = numbers(1, 5)
    return EventSourceResponse(generator)

routes = [
    Route("/sse", endpoint=sse)
]

app = Starlette(routes=routes)

if __name__ == "__main__":
    uvicorn.run(app)
How-To
Calling into Python web apps
You can call into Python web apps with HTTPX and httpx-sse to test SSE endpoints directly.
Here's an example of calling into a Starlette ASGI app...
import asyncio
import httpx
from httpx_sse import aconnect_sse
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route
async def auth_events(request):
    async def events():
        yield {
            "event": "login",
            "data": '{"user_id": "4135"}',
        }

    return EventSourceResponse(events())

app = Starlette(routes=[Route("/sse/auth/", endpoint=auth_events)])

async def main():
    # ASGITransport sends requests straight to the app, without a network socket.
    # (Recent HTTPX releases no longer accept the `app=app` client shortcut.)
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport) as client:
        async with aconnect_sse(
            client, "GET", "http://localhost:8000/sse/auth/"
        ) as event_source:
            events = [sse async for sse in event_source.aiter_sse()]
            (sse,) = events
            assert sse.event == "login"
            assert sse.json() == {"user_id": "4135"}

asyncio.run(main())
Handling reconnections
(Advanced)
SSETransport and AsyncSSETransport don't have reconnection built in. This is because how to perform retries generally depends on your use case. As a result, if the connection breaks while attempting to read from the server, you will get an httpx.ReadError from iter_sse() (or aiter_sse()).
However, httpx-sse does allow implementing reconnection by using the Last-Event-ID and reconnection time (in milliseconds), exposed as sse.id and sse.retry respectively.
Here's how you might achieve this using stamina...
import time
from typing import Iterator
import httpx
from httpx_sse import connect_sse, ServerSentEvent
from stamina import retry
def iter_sse_retrying(client, method, url):
    last_event_id = ""
    reconnection_delay = 0.0

    # `stamina` will apply jitter and exponential backoff on top of
    # the `retry` reconnection delay sent by the server.
    @retry(on=httpx.ReadError)
    def _iter_sse():
        nonlocal last_event_id, reconnection_delay
        time.sleep(reconnection_delay)
        headers = {"Accept": "text/event-stream"}
        if last_event_id:
            headers["Last-Event-ID"] = last_event_id
        with connect_sse(client, method, url, headers=headers) as event_source:
            for sse in event_source.iter_sse():
                last_event_id = sse.id
                if sse.retry is not None:
                    reconnection_delay = sse.retry / 1000
                yield sse

    return _iter_sse()
Usage:
with httpx.Client() as client:
    for sse in iter_sse_retrying(client, "GET", "http://localhost:8000/sse"):
        print(sse.event, sse.data)
API Reference
connect_sse
def connect_sse(
    client: httpx.Client,
    method: str,
    url: Union[str, httpx.URL],
    **kwargs,
) -> ContextManager[EventSource]
Connect to an SSE endpoint and return an EventSource context manager.
This sets Cache-Control: no-store on the request, as per the SSE spec, as well as Accept: text/event-stream.
If the response Content-Type is not text/event-stream, an SSEError is raised when iterating over events with iter_sse() (see the 0.3.0 changelog entry).
aconnect_sse
async def aconnect_sse(
    client: httpx.AsyncClient,
    method: str,
    url: Union[str, httpx.URL],
    **kwargs,
) -> AsyncContextManager[EventSource]
An async equivalent to connect_sse.
EventSource
def __init__(response: httpx.Response)
Helper for working with an SSE response.
response
The underlying httpx.Response.
iter_sse
def iter_sse() -> Iterator[ServerSentEvent]
Decode the response content and yield the corresponding ServerSentEvent objects.
Example usage:
for sse in event_source.iter_sse():
    ...
aiter_sse
async def aiter_sse() -> AsyncIterator[ServerSentEvent]
An async equivalent to iter_sse.
ServerSentEvent
Represents a server-sent event.
- event: str - Defaults to "message".
- data: str - Defaults to "".
- id: str - Defaults to "".
- retry: int | None - Defaults to None.
Methods:
- json() -> Any - Returns sse.data decoded as JSON.
SSEError
An error that occurred while making a request to an SSE endpoint.
Parents:
httpx.TransportError
License
MIT
Changelog
All notable changes to this project will be documented in this file.
The format is based on Keep a Changelog.
0.4.0 - 2023-12-22
Removed
- Dropped Python 3.7 support, as it has reached EOL. (Pull #21)
Added
- Add official support for Python 3.12. (Pull #21)
Fixed
- Allow Content-Type values that contain, but are not strictly, text/event-stream. (Pull #22 by @dbuades)
- Improve error message when Content-Type is missing. (Pull #20 by @jamesbraza)
0.3.1 - 2023-06-01
Added
- Add __repr__() for ServerSentEvent model, which may help with debugging and other tasks. (Pull #16)
0.3.0 - 2023-04-27
Changed
- Raising an SSEError if the response content type is not text/event-stream is now performed as part of iter_sse() / aiter_sse(), instead of connect_sse() / aconnect_sse(). This allows inspecting the response before iterating on server-sent events, such as checking for error responses. (Pull #12)
0.2.0 - 2023-03-27
Changed
- connect_sse() and aconnect_sse() now require a method argument: connect_sse(client, "GET", "https://example.org"). This provides support for SSE requests with HTTP verbs other than GET. (Pull #7)
0.1.0 - 2023-02-05
Initial release
Added
- Add connect_sse(), aconnect_sse(), ServerSentEvent and SSEError.