SSE Client
The Server-Sent Events (SSE) client allows consuming a stream of events over a single HTTP connection.
This module exposes the high-level aiosonic.SSEClient API as well as low-level connection objects
used internally. The client is designed to be used with async context managers and async iteration.
Key features
- Connect to standard SSE endpoints that return text/event-stream.
- Automatic reconnection support with configurable retry delay and Last-Event-ID handling.
- Event parsing compatible with the SSE specification (data, event, id, retry fields).
- Deduplication of events by id and by last yielded data to reduce duplicate deliveries after reconnects.
Usage overview
Typical usage follows one of two patterns:
Await the connect awaitable to obtain an SSEConnection and iterate over events:
import asyncio
from aiosonic import SSEClient

async def main():
    client = SSEClient()
    sse_conn = await client.connect("https://example.com/stream")
    async for event in sse_conn:
        # event is a dict with keys: data, event, id, retry
        print(event["event"], event["data"])
    await sse_conn.close()

asyncio.run(main())
Use nested async context managers (recommended) for automatic cleanup:
async with SSEClient() as client:
    async with client.connect("https://example.com/stream") as sse_conn:
        async for event in sse_conn:
            print(event["data"])  # handle events
Parameters and configuration
The aiosonic.SSEClient.connect() method accepts the same general parameters as the HTTP client, with
additional SSE-specific options:
- reconnect (bool): automatically reconnect when the server closes the stream. Default: True.
- retry_delay (int): milliseconds to wait between reconnect attempts. Default: 3000 (3 seconds).
- keep_connection (bool): if True, the underlying TCP connection is kept open even after the SSE stream ends. This is experimental and may be unsafe with some servers.
Event parsing and semantics
Each yielded event is a mapping with the following keys:
- data (str): the combined data lines for the event (newlines preserved).
- event (Optional[str]): the event type if provided by the server.
- id (Optional[str]): the event id. Used for deduplication and Last-Event-ID when reconnecting.
- retry (Optional[int]): if set, indicates the server-suggested retry interval in milliseconds.
The implementation follows the SSE spec for line parsing. Malformed lines or invalid retry values will
raise aiosonic.exceptions.SSEParsingError.
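
The field handling described above can be illustrated with a small standalone parser. This is only a sketch of the SSE line format, not aiosonic's internal parser: the function name parse_sse_block is hypothetical, it handles a single event block (the text between two blank lines), and it raises a plain ValueError on an invalid retry value as a stand-in for the library's SSEParsingError.

```python
import re
from typing import Dict, List, Union

Event = Dict[str, Union[str, int, None]]


def parse_sse_block(block: str) -> Event:
    """Parse the lines of one SSE event into a dict (hypothetical helper)."""
    data_lines: List[str] = []
    event: Event = {"data": "", "event": None, "id": None, "retry": None}
    # SSE lines may end with LF, CRLF, or a bare CR.
    for line in re.split(r"\r\n|\n|\r", block):
        if not line or line.startswith(":"):
            continue  # empty line or comment line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is dropped
        if field == "data":
            data_lines.append(value)
        elif field == "event":
            event["event"] = value
        elif field == "id":
            event["id"] = value
        elif field == "retry":
            if not (value.isascii() and value.isdigit()):
                raise ValueError(f"invalid retry value: {value!r}")
            event["retry"] = int(value)
        # any other field name is ignored
    # Multiple data lines are joined with newlines.
    event["data"] = "\n".join(data_lines)
    return event
```

Calling parse_sse_block on a block with two data lines yields a single data string with the newline between them preserved, which matches the description of the data key above.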
Error handling and reconnection
- Initial connection failures raise aiosonic.exceptions.SSEConnectionError unless reconnect is enabled, in which case the client will retry using the configured retry_delay.
- While iterating, transient socket or parse errors will either trigger reconnection (if enabled) or raise aiosonic.exceptions.SSEConnectionError.
- When reconnecting, the client will include the last seen event id in the Last-Event-ID header, and deduplicate events by id and by the last yielded data to minimize duplicate deliveries.
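
To make the deduplication rule more concrete, here is a minimal sketch of one way such a filter could behave. It is not the library's implementation, and the exact rule is an assumption: an event is dropped if its id has already been seen, or, when it carries no id, if its data equals the data of the last yielded event. The name dedupe_events is hypothetical.

```python
from typing import Any, Dict, Iterable, Iterator, Optional, Set


def dedupe_events(events: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield events, skipping likely duplicates (assumed rule, see lead-in)."""
    seen_ids: Set[str] = set()
    last_data: Optional[str] = None
    for event in events:
        event_id = event.get("id")
        if event_id is not None:
            if event_id in seen_ids:
                continue  # same id delivered again, e.g. after a reconnect
            seen_ids.add(event_id)
        elif event.get("data") == last_data:
            continue  # no id available: fall back to comparing payloads
        last_data = event.get("data")
        yield event
```

A filter like this trades a small risk of dropping a legitimate repeated payload (for events without an id) against fewer duplicate deliveries after a reconnect.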
Examples
OpenAI-style streaming with a POST and JSON body:
async with SSEClient() as client:
    async with client.connect(
        "https://api.openai.com/v1/chat/completions",
        method="POST",
        json={"model": "gpt-4", "messages": [...], "stream": True},
        headers={"Authorization": "Bearer <token>"},
    ) as sse_conn:
        async for event in sse_conn:
            # event['data'] contains the partial JSON payloads
            print(event['data'])
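
Because each data payload in such a stream is a JSON document, a consumer usually decodes it before use. The helper below is a minimal sketch, assuming the OpenAI chat-completions chunk layout (choices[0].delta.content) and its "[DONE]" terminator; extract_delta is a hypothetical name and not part of aiosonic.

```python
import json
from typing import Optional


def extract_delta(data: str) -> Optional[str]:
    """Return the text fragment carried by one streamed chunk.

    Returns None when the stream terminator is seen, and an empty string
    for chunks that carry no text (for example a role-only delta).
    """
    if data.strip() == "[DONE]":
        return None  # assumed end-of-stream sentinel
    chunk = json.loads(data)
    choices = chunk.get("choices") or []
    if not choices:
        return ""
    return choices[0].get("delta", {}).get("content") or ""
```

Inside the async for loop, the result can be checked against None to stop iterating, and any other value can be appended to the accumulated reply.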
Notes and tips
- If you need precise control over reconnection or want to implement custom retry backoff, set reconnect=False and handle reconnection logic in your application.
- For idempotent processing across restarts, persist the last-seen event id reported in event['id'].
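
Both tips can be combined in an application-level retry loop. The following is a sketch under stated assumptions rather than a recommended implementation: consume_with_backoff and its parameters are hypothetical, the client argument is expected to behave like SSEClient (only connect() with the url, headers, and reconnect arguments is used), the Last-Event-ID header is set manually on the assumption that the client does not add it when reconnect=False, and the broad except clause should be narrowed to aiosonic.exceptions.SSEConnectionError in real code.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional


async def consume_with_backoff(
    client: Any,
    url: str,
    handle: Callable[[Dict[str, Any]], Awaitable[None]],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> Optional[str]:
    """Consume a stream, reconnecting manually with exponential backoff.

    Returns the last event id seen, so the caller can persist it.
    """
    last_id: Optional[str] = None
    for attempt in range(max_attempts):
        # Resume from the last seen event, if any (assumed to be manual here).
        headers = {"Last-Event-ID": last_id} if last_id else {}
        try:
            async with client.connect(url, headers=headers, reconnect=False) as conn:
                async for event in conn:
                    if event.get("id"):
                        last_id = event["id"]
                    await handle(event)
            return last_id  # the stream ended without an error
        except Exception:  # narrow this to the library's connection error
            if attempt == max_attempts - 1:
                raise
            # Delays in seconds: base, 2*base, 4*base, ... capped at max_delay.
            await asyncio.sleep(min(max_delay, base_delay * 2 ** attempt))
    return last_id
```

The returned id can be written to durable storage and passed back in on the next start, which is one way to approach idempotent processing across restarts.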
- class aiosonic.sse_client.SSEClient(http_client: HTTPClient | None = None)
Bases: object
SSE client for aiosonic.
- Examples:
Basic usage with async context manager:
>>> import asyncio
>>> from aiosonic import SSEClient
>>>
>>> async def main():
...     async with SSEClient() as client:
...         async with client.connect("http://example.com/sse") as sse_conn:
...             async for event in sse_conn:
...                 print(f"Event: {event['event']}, Data: {event['data']}")
...                 if event['data'] == 'stop':
...                     break
>>>
>>> asyncio.run(main())
POST request with JSON body (OpenAI-style streaming):
>>> async def main():
...     async with SSEClient() as client:
...         async with client.connect(
...             "https://api.openai.com/v1/chat/completions",
...             method="POST",
...             json={"model": "gpt-4", "messages": [...], "stream": True},
...             headers={"Authorization": "Bearer token"}
...         ) as sse_conn:
...             async for event in sse_conn:
...                 print(event['data'])
>>>
>>> asyncio.run(main())
- connect(url: str, method: str = 'GET', headers: Dict[str, str] | List[Tuple[str, str]] | HttpHeaders | None = None, params: Dict[str, str] | Sequence[Tuple[str, str]] | None = None, data: str | bytes | dict | tuple | AsyncIterator[bytes] | Iterator[bytes] | MultipartForm | None = None, json: dict | list | None = None, multipart: bool = False, verify: bool = True, ssl: SSLContext | None = None, timeouts: Timeouts | None = None, follow: bool = False, http2: bool = False, reconnect: bool = True, retry_delay: int = 3000, json_serializer=<function dumps>, keep_connection: bool = False) -> _ConnectAwaitable
Return an awaitable/async-context-manager that establishes an SSE connection.
- Args:
url: The URL to connect to
method: HTTP method to use (GET, POST, PUT, PATCH, DELETE, etc.)
headers: HTTP headers to send
params: Query parameters to include in the URL
data: Request body data
json: JSON data to send as request body
multipart: Whether to send data as multipart form
verify: Whether to verify SSL certificates
ssl: Custom SSL context
timeouts: Request timeout settings
follow: Whether to follow redirects
http2: Whether to use HTTP/2
reconnect: Whether to automatically reconnect on connection loss
retry_delay: Delay between reconnection attempts in milliseconds
json_serializer: Custom JSON serializer function
keep_connection: Whether to keep the connection open after the SSE stream ends (experimental)
- Usage:
sse_conn = await client.connect(url, method="POST", json=data)
async with client.connect(url, method="POST", json=data) as sse_conn:
    ...
- class aiosonic.sse_client.SSEConnection(response, client: HTTPClient, config: RequestConfig)
Bases: object
Manages an active SSE connection.
- async close()
Close the connection.