SSE Client

The Server-Sent Events (SSE) client allows consuming a stream of events over a single HTTP connection.

This module exposes the high-level aiosonic.SSEClient API as well as the low-level connection objects used internally. The client is designed to be used with async context managers and async iteration.

Key features

  • Connect to standard SSE endpoints that return text/event-stream.

  • Automatic reconnection support with configurable retry delay and Last-Event-ID handling.

  • Event parsing compatible with the SSE specification (data, event, id, retry fields).

  • Deduplication of events by id and by last yielded data to reduce duplicate deliveries after reconnects.

Usage overview

Typical usage follows one of two patterns:

  • Await the connect awaitable to obtain an SSEConnection and iterate over events:

from aiosonic import SSEClient
import asyncio

async def main():
    client = SSEClient()
    sse_conn = await client.connect("https://example.com/stream")
    try:
        async for event in sse_conn:
            print(event["event"], event["data"])  # event is a dict with keys: data, event, id, retry
    finally:
        await sse_conn.close()  # close the connection even if iteration raises

asyncio.run(main())

  • Use nested async context managers (recommended) for automatic cleanup:

async def main():
    async with SSEClient() as client:
        async with client.connect("https://example.com/stream") as sse_conn:
            async for event in sse_conn:
                print(event["data"])  # handle events

asyncio.run(main())

Parameters and configuration

The aiosonic.SSEClient.connect() method accepts the same general parameters as the HTTP client, with additional SSE-specific options:

  • reconnect (bool): automatically reconnect when the server closes the stream. Default: True.

  • retry_delay (int): milliseconds to wait between reconnect attempts. Default: 3000 (3 seconds).

  • keep_connection (bool): if True the underlying TCP connection will be kept open even after the SSE stream ends. This is experimental and may be unsafe with some servers.
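
As a hedged illustration of how these options might be passed, the sketch below collects the three SSE-specific keyword arguments in a dict and forwards them to connect(). The option values, the SSE_OPTIONS name, and the stream() helper are assumptions made for illustration; only the parameter names come from the list above.

```python
import asyncio

# SSE-specific options documented above; the values are illustrative only.
SSE_OPTIONS = {
    "reconnect": True,         # reconnect when the server closes the stream
    "retry_delay": 5000,       # milliseconds between reconnect attempts
    "keep_connection": False,  # experimental option, left at its default
}


async def stream(url: str) -> None:
    """Hypothetical helper: print every event received from `url`."""
    # Imported lazily so the options can be inspected without aiosonic installed.
    from aiosonic import SSEClient

    async with SSEClient() as client:
        async with client.connect(url, **SSE_OPTIONS) as sse_conn:
            async for event in sse_conn:
                print(event["id"], event["data"])


# Example call (requires aiosonic and a reachable SSE endpoint):
# asyncio.run(stream("https://example.com/stream"))
```

Because retry_delay is expressed in milliseconds, a value of 5000 corresponds to a five-second pause between attempts.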

Event parsing and semantics

Each yielded event is a mapping with the following keys:

  • data (str): the combined data lines for the event (newlines preserved).

  • event (Optional[str]): the event type if provided by the server.

  • id (Optional[str]): the event id. Used for deduplication and Last-Event-ID when reconnecting.

  • retry (Optional[int]): if set, indicates the server-suggested retry interval in milliseconds.

The implementation follows the SSE spec for line parsing. Malformed lines or invalid retry values will raise aiosonic.exceptions.SSEParsingError.
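
To make the field semantics concrete, here is a minimal parsing sketch, assuming the input has already been split into text lines. It is not aiosonic's parser: parse_sse_lines and SSEParseError are hypothetical names, and the choice to reset the id after each event is a simplification.

```python
from typing import Dict, Iterable, Iterator, List, Optional, Union


class SSEParseError(ValueError):
    """Hypothetical stand-in for aiosonic.exceptions.SSEParsingError."""


def parse_sse_lines(
    lines: Iterable[str],
) -> Iterator[Dict[str, Union[str, int, None]]]:
    """Yield one event dict per blank-line-terminated block of fields."""
    data_lines: List[str] = []
    event_type: Optional[str] = None
    event_id: Optional[str] = None
    retry: Optional[int] = None

    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line ends the block; emit it only if it carried data.
            if data_lines:
                yield {
                    "data": "\n".join(data_lines),
                    "event": event_type,
                    "id": event_id,
                    "retry": retry,
                }
            data_lines, event_type, event_id, retry = [], None, None, None
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space is not part of the value
        if field == "data":
            data_lines.append(value)
        elif field == "event":
            event_type = value
        elif field == "id":
            event_id = value
        elif field == "retry":
            if not (value.isascii() and value.isdigit()):
                raise SSEParseError(f"invalid retry value: {value!r}")
            retry = int(value)
        # Unknown field names are ignored.
```

Multiple data lines in one block are joined with newlines, which is why the data key preserves line breaks.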

Error handling and reconnection

  • Initial connection failures raise aiosonic.exceptions.SSEConnectionError unless reconnect is enabled, in which case the client will retry using the configured retry_delay.

  • While iterating, transient socket or parse errors will either trigger reconnection (if enabled) or raise aiosonic.exceptions.SSEConnectionError.

  • When reconnecting, the client will include the last seen event id in the Last-Event-ID header, and deduplicate events by id and by the last yielded data to minimize duplicate deliveries.
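
The deduplication rule in the last bullet can be sketched as a small filter. This is one possible reading of that rule, not aiosonic's implementation: dedupe_events and resume_headers are hypothetical helpers, and the sketch assumes events with an id are compared by id while events without one are compared against the last yielded data.

```python
from typing import Dict, Iterable, Iterator, Optional, Set


def resume_headers(last_event_id: Optional[str]) -> Dict[str, str]:
    """Build the header a reconnect would send to resume after `last_event_id`."""
    return {"Last-Event-ID": last_event_id} if last_event_id is not None else {}


def dedupe_events(events: Iterable[dict]) -> Iterator[dict]:
    """Drop events already seen by id, or repeating the last yielded data."""
    seen_ids: Set[str] = set()  # grows without bound; fine for a sketch
    last_data: Optional[str] = None

    for event in events:
        event_id = event.get("id")
        if event_id is not None:
            if event_id in seen_ids:
                continue  # replayed after a reconnect
            seen_ids.add(event_id)
        elif event.get("data") == last_data:
            continue  # no id to compare, so fall back to the payload
        last_data = event.get("data")
        yield event
```

A server that honours Last-Event-ID resumes after the given id, so the filter mainly guards against servers that replay the last event anyway.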

Examples

OpenAI-style streaming with a POST and JSON body:

async with SSEClient() as client:
    async with client.connect(
        "https://api.openai.com/v1/chat/completions",
        method="POST",
        json={"model": "gpt-4", "messages": [...], "stream": True},
        headers={"Authorization": "Bearer <token>"},
    ) as sse_conn:
        async for event in sse_conn:
            # event['data'] contains the partial JSON payloads
            print(event['data'])
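
For this kind of endpoint, each event's data usually needs decoding before use. The helper below is a sketch that assumes every data payload is a complete JSON chunk in the Chat Completions streaming shape (choices[0].delta.content) and that the stream ends with a literal [DONE] payload; delta_text is a hypothetical name and is not part of aiosonic.

```python
import json
from typing import Optional

DONE_SENTINEL = "[DONE]"  # assumed end-of-stream marker sent as the data payload


def delta_text(data: str) -> Optional[str]:
    """Return the text fragment in one chunk, or None once the stream is done."""
    if data.strip() == DONE_SENTINEL:
        return None
    chunk = json.loads(data)
    choices = chunk.get("choices") or []
    if not choices:
        return ""  # a chunk without choices carries no text
    return choices[0].get("delta", {}).get("content") or ""
```

Inside the loop above, this could be used as text = delta_text(event['data']), breaking out of the loop when it returns None.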

Notes and tips

  • If you need precise control over reconnection or want to implement custom retry backoff, set reconnect=False and handle reconnection logic in your application.

  • For idempotent processing across restarts, persist the last-seen event id reported in event['id'].
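
Both tips can be combined in an application-level loop. The sketch below is an assumption-laden outline rather than a recommended implementation: open_stream is a hypothetical callable that would wrap client.connect(url, reconnect=False, ...) and send the given id in the Last-Event-ID header, and the built-in ConnectionError stands in for aiosonic.exceptions.SSEConnectionError.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Iterator, Optional


def backoff_delays(
    base_ms: float = 3000, factor: float = 2.0, max_ms: float = 60000
) -> Iterator[float]:
    """Yield exponentially growing delays in milliseconds, capped at max_ms."""
    delay = base_ms
    while True:
        yield delay
        delay = min(delay * factor, max_ms)


async def consume_with_backoff(
    open_stream: Callable[[Optional[str]], AsyncIterator[dict]],
    handle: Callable[[dict], None],
    last_event_id: Optional[str] = None,  # pass a persisted id to resume
    max_attempts: int = 5,
    base_ms: float = 3000,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Optional[str]:
    """Consume events, reconnecting with backoff; return the last seen id."""
    delays = backoff_delays(base_ms)
    failures = 0
    while True:
        try:
            async for event in open_stream(last_event_id):
                # Any received event counts as progress, so reset the backoff.
                failures, delays = 0, backoff_delays(base_ms)
                if event.get("id") is not None:
                    last_event_id = event["id"]
                handle(event)
            return last_event_id  # the stream ended without an error
        except ConnectionError:
            failures += 1
            if failures >= max_attempts:
                raise
            await sleep(next(delays) / 1000)  # delays are in milliseconds
```

The returned id is the value to persist, and passing it back in as last_event_id on the next start lets processing resume where it stopped.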

class aiosonic.sse_client.SSEClient(http_client: HTTPClient | None = None)

Bases: object

SSE client for aiosonic.

Examples:

Basic usage with async context manager:

>>> import asyncio
>>> from aiosonic import SSEClient
>>>
>>> async def main():
...     async with SSEClient() as client:
...         async with client.connect("http://example.com/sse") as sse_conn:
...             async for event in sse_conn:
...                 print(f"Event: {event['event']}, Data: {event['data']}")
...                 if event['data'] == 'stop':
...                     break
>>>
>>> asyncio.run(main())

POST request with JSON body (OpenAI-style streaming):

>>> async def main():
...     async with SSEClient() as client:
...         async with client.connect(
...             "https://api.openai.com/v1/chat/completions",
...             method="POST",
...             json={"model": "gpt-4", "messages": [...], "stream": True},
...             headers={"Authorization": "Bearer token"}
...         ) as sse_conn:
...             async for event in sse_conn:
...                 print(event['data'])
>>>
>>> asyncio.run(main())

connect(url: str, method: str = 'GET', headers: Dict[str, str] | List[Tuple[str, str]] | HttpHeaders | None = None, params: Dict[str, str] | Sequence[Tuple[str, str]] | None = None, data: str | bytes | dict | tuple | AsyncIterator[bytes] | Iterator[bytes] | MultipartForm | None = None, json: dict | list | None = None, multipart: bool = False, verify: bool = True, ssl: SSLContext | None = None, timeouts: Timeouts | None = None, follow: bool = False, http2: bool = False, reconnect: bool = True, retry_delay: int = 3000, json_serializer=<function dumps>, keep_connection: bool = False) -> _ConnectAwaitable

Return an awaitable/async-context-manager that establishes an SSE connection.

Args:

  • url: The URL to connect to.

  • method: HTTP method to use (GET, POST, PUT, PATCH, DELETE, etc.).

  • headers: HTTP headers to send.

  • params: Query parameters to include in the URL.

  • data: Request body data.

  • json: JSON data to send as the request body.

  • multipart: Whether to send data as a multipart form.

  • verify: Whether to verify SSL certificates.

  • ssl: Custom SSL context.

  • timeouts: Request timeout settings.

  • follow: Whether to follow redirects.

  • http2: Whether to use HTTP/2.

  • reconnect: Whether to automatically reconnect on connection loss.

  • retry_delay: Delay between reconnection attempts, in milliseconds.

  • json_serializer: Custom JSON serializer function.

  • keep_connection: Whether to keep the connection open after the SSE stream ends (experimental).

Usage:
  • sse_conn = await client.connect(url, method="POST", json=data)

  • async with client.connect(url, method="POST", json=data) as sse_conn:

class aiosonic.sse_client.SSEConnection(response, client: HTTPClient, config: RequestConfig)

Bases: object

Manages an active SSE connection.

async close()

Close the connection.
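
When the connection is obtained with await rather than async with, close() has to be called explicitly. One way to guarantee that is a small wrapper like the one below; closing_sse is a hypothetical helper, not part of aiosonic, and it only assumes the wrapped object exposes an async close() method as documented here.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def closing_sse(sse_conn):
    """Yield `sse_conn` and always await its close() on exit."""
    try:
        yield sse_conn
    finally:
        await sse_conn.close()


# Intended usage (requires aiosonic and a reachable endpoint):
#
#     sse_conn = await client.connect(url)
#     async with closing_sse(sse_conn):
#         async for event in sse_conn:
#             ...
```

The standard library's contextlib.aclosing is not a drop-in substitute here because it calls aclose(), whereas this class documents close().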