Skip to content

API Reference


sse_stream

django_flosse.decorators.sse_stream(_view_func: Optional[Callable] = None, *, retry: Optional[int] = None, permission_classes: Sequence[Type[BaseSSEPermission]] = ()) -> Callable

Decorator that turns a generator view into an SSE endpoint.

Supports both sync and async generator views — detected automatically::

# sync
@sse_stream
def feed(request):
    yield ("update", {"value": 1})

# async — requires ASGI server (Uvicorn, Daphne)
@sse_stream
async def feed(request):
    async for item in my_async_source():
        yield ("update", {"value": item})

Can be used with or without parentheses::

@sse_stream
@sse_stream()
@sse_stream(retry=3000, permission_classes=[IsAuthenticated])

Parameters:

Name Type Description Default
retry Optional[int]

If given, sends a retry: directive (in ms) as the first frame, telling the browser how long to wait before reconnecting.

None
permission_classes Sequence[Type[BaseSSEPermission]]

Sequence of :class:~django_flosse.permissions.BaseSSEPermission classes (not instances). All must pass or the response is HTTP 403.

()
Yield styles
  • str → plain data event
  • (event_name, data) → named event
  • (event_name, data, id) → named event with ID
  • dict with "data" key → mapped to SSEEvent fields
  • dict without "data" → whole dict serialised as JSON
  • :class:~django_flosse.events.SSEEvent → used directly
Heartbeats

django-flosse does not manage heartbeats automatically. Emit a keep-alive ping yourself if behind a proxy::

@sse_stream
async def live_feed(request):
    while True:
        data = await get_new_data()
        if data:
            yield ("update", data)
        else:
            yield SSEEvent(data="", event="ping")
        await asyncio.sleep(5)
Source code in django_flosse/decorators.py
def sse_stream(
    _view_func: Optional[Callable] = None,
    *,
    retry: Optional[int] = None,
    permission_classes: Sequence[Type[BaseSSEPermission]] = (),
) -> Callable:
    """
    Decorator that turns a **generator view** into an SSE endpoint.

    Supports both sync and async generator views — detected automatically::

        # sync
        @sse_stream
        def feed(request):
            yield ("update", {"value": 1})

        # async — requires ASGI server (Uvicorn, Daphne)
        @sse_stream
        async def feed(request):
            async for item in my_async_source():
                yield ("update", {"value": item})

    Can be used with or without parentheses::

        @sse_stream
        @sse_stream()
        @sse_stream(retry=3000, permission_classes=[IsAuthenticated])

    Parameters
    ----------
    retry:
        If given, sends a ``retry:`` directive (in ms) as the first frame,
        telling the browser how long to wait before reconnecting.
    permission_classes:
        Sequence of :class:`~django_flosse.permissions.BaseSSEPermission`
        **classes** (not instances). All must pass or the response is HTTP 403.

    Yield styles
    ------------
    * ``str``                        → plain data event
    * ``(event_name, data)``         → named event
    * ``(event_name, data, id)``     → named event with ID
    * ``dict`` with ``"data"`` key   → mapped to SSEEvent fields
    * ``dict`` without ``"data"``    → whole dict serialised as JSON
    * :class:`~django_flosse.events.SSEEvent` → used directly

    Heartbeats
    ----------
    django-flosse does **not** manage heartbeats automatically.
    Emit a keep-alive ping yourself if behind a proxy::

        @sse_stream
        async def live_feed(request):
            while True:
                data = await get_new_data()
                if data:
                    yield ("update", data)
                else:
                    yield SSEEvent(data="", event="ping")
                await asyncio.sleep(5)
    """

    def decorator(view_func: Callable) -> Callable:
        # -------------------------------------------------------------------- #
        # Async path                                                           #
        # -------------------------------------------------------------------- #
        if inspect.isasyncgenfunction(view_func):

            @wraps(view_func)
            async def awrapper(
                request: HttpRequest, *args: Any, **kwargs: Any
            ) -> HttpResponse:
                # ---------------------------------------------------------------- #
                # 1. Permission checks                                             #
                # ---------------------------------------------------------------- #
                denied = _check_permissions(request, permission_classes)
                if denied:
                    return denied

                # ---------------------------------------------------------------- #
                # 2. Async streaming generator                                     #
                # ---------------------------------------------------------------- #
                async def astream():
                    if retry is not None:
                        yield f"retry: {retry}\n\n"

                    try:
                        async for item in view_func(request, *args, **kwargs):
                            try:
                                yield to_sse(item)
                            except Exception as fmt_err:  # noqa: BLE001
                                logger.warning("SSE format error: %s", fmt_err)
                                yield SSEEvent(
                                    data=str(fmt_err), event="error"
                                ).encode()

                    except Exception as exc:  # noqa: BLE001
                        logger.exception(
                            "SSE async producer raised an exception: %s", exc
                        )
                        yield SSEEvent(data=str(exc), event="error").encode()

                # ---------------------------------------------------------------- #
                # 3. Build response                                                #
                # ---------------------------------------------------------------- #
                return _build_response(astream())

            return awrapper

        # -------------------------------------------------------------------- #
        # Sync path                                                            #
        # -------------------------------------------------------------------- #
        @wraps(view_func)
        def wrapper(request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:

            # ---------------------------------------------------------------- #
            # 1. Permission checks                                             #
            # ---------------------------------------------------------------- #
            denied = _check_permissions(request, permission_classes)
            if denied:
                return denied

            # ---------------------------------------------------------------- #
            # 2. Streaming generator                                           #
            # ---------------------------------------------------------------- #
            def _stream():
                if retry is not None:
                    yield f"retry: {retry}\n\n"

                try:
                    for item in view_func(request, *args, **kwargs):
                        try:
                            yield to_sse(item)
                        except Exception as fmt_err:  # noqa: BLE001
                            logger.warning("SSE format error: %s", fmt_err)
                            yield SSEEvent(data=str(fmt_err), event="error").encode()

                except GeneratorExit:  # pragma: no cover
                    logger.debug("SSE client disconnected from %s.", request.path)

                except Exception as exc:  # noqa: BLE001
                    logger.exception("SSE producer raised an exception: %s", exc)
                    yield SSEEvent(data=str(exc), event="error").encode()

            # ---------------------------------------------------------------- #
            # 3. Build response                                                #
            # ---------------------------------------------------------------- #
            return _build_response(_stream())

        return wrapper

    # Support both @sse_stream and @sse_stream()
    if _view_func is not None:
        return decorator(_view_func)

    return decorator

SSEEvent

django_flosse.events.SSEEvent dataclass

Represents a single Server-Sent Event.

Attributes: data: The payload. Dicts/lists are serialised to JSON automatically. event: Optional named event type (maps to the event: field). id: Optional event ID for client reconnection (maps to id:). retry: Optional reconnection delay in milliseconds (maps to retry:).

Examples::

SSEEvent(data="hello")
SSEEvent(data={"count": 1}, event="update")
SSEEvent(data="done", event="finish", id="42")
Source code in django_flosse/events.py
@dataclass
class SSEEvent:
    """
    Represents a single Server-Sent Event.

    Attributes:
        data:   The payload. Dicts/lists are serialised to JSON automatically.
        event:  Optional named event type (maps to the ``event:`` field).
        id:     Optional event ID for client reconnection (maps to ``id:``).
        retry:  Optional reconnection delay in milliseconds (maps to ``retry:``).

    Examples::

        SSEEvent(data="hello")
        SSEEvent(data={"count": 1}, event="update")
        SSEEvent(data="done", event="finish", id="42")
    """

    data: Any
    event: Optional[str] = None
    id: Optional[str] = None
    retry: Optional[int] = None

    def encode(self) -> str:
        """Serialise to the SSE wire format (ends with a blank line)."""
        lines: list[str] = []

        if self.retry is not None:
            lines.append(f"retry: {self.retry}")

        if self.id is not None:
            lines.append(f"id: {self.id}")

        if self.event is not None:
            lines.append(f"event: {self.event}")

        payload = self.data
        if not isinstance(payload, str):
            payload = json.dumps(payload, ensure_ascii=False)

        # Multi-line data: each line must have its own "data:" prefix.
        for line in payload.splitlines():
            lines.append(f"data: {line}")

        lines.append("")  # Blank line to terminate the event
        return "\n".join(lines) + "\n"

encode() -> str

Serialise to the SSE wire format (ends with a blank line).

Source code in django_flosse/events.py
def encode(self) -> str:
    """Serialise to the SSE wire format (ends with a blank line)."""
    lines: list[str] = []

    if self.retry is not None:
        lines.append(f"retry: {self.retry}")

    if self.id is not None:
        lines.append(f"id: {self.id}")

    if self.event is not None:
        lines.append(f"event: {self.event}")

    payload = self.data
    if not isinstance(payload, str):
        payload = json.dumps(payload, ensure_ascii=False)

    # Multi-line data: each line must have its own "data:" prefix.
    for line in payload.splitlines():
        lines.append(f"data: {line}")

    lines.append("")  # Blank line to terminate the event
    return "\n".join(lines) + "\n"

to_sse

django_flosse.formatters.to_sse(raw: Any) -> str

Convert any value yielded by a view into an SSE-formatted string.

Supported yield styles
  • str → treated as plain data
  • (event, data) → named event with data
  • dict → passed as keyword args to SSEEvent; if no "data" key, the whole dict is the data
  • SSEEvent → encoded as-is

Anything else is coerced to str and used as data.

Source code in django_flosse/formatters.py
def to_sse(raw: Any) -> str:
    """
    Convert any value yielded by a view into an SSE-formatted string.

    Supported yield styles
    ----------------------
    - ``str``              → treated as plain data
    - ``(event, data)``   → named event with data
    - ``dict``            → passed as keyword args to SSEEvent;
                            if no ``"data"`` key, the whole dict is the data
    - ``SSEEvent``        → encoded as-is

    Anything else is coerced to ``str`` and used as data.
    """

    if isinstance(raw, SSEEvent):
        return raw.encode()

    if isinstance(raw, str):
        return SSEEvent(data=raw).encode()

    if isinstance(raw, tuple):
        if len(raw) == 2:
            event_name, data = raw
            return SSEEvent(event=event_name, data=data).encode()

        if len(raw) == 3:
            event_name, data, id_ = raw
            return SSEEvent(event=event_name, data=data, id=str(id_)).encode()

        raise ValueError(
            f"Tuple yields must be (event, data) or (event, data, id), got {len(raw)}-tuple."
        )

    if isinstance(raw, dict):
        if "data" in raw:
            return SSEEvent(
                **{
                    k: v
                    for k, v in raw.items()
                    if k in ("data", "event", "id", "retry")
                }
            ).encode()

        return SSEEvent(data=raw).encode()

    # Fallback
    return SSEEvent(data=str(raw)).encode()

Permissions

django_flosse.permissions.BaseSSEPermission

Minimal permission interface for @sse_stream.

Subclass this and override has_permission to add your own auth logic. Returning False causes the decorator to respond with HTTP 403.

Example::

class LoginRequired(BaseSSEPermission):
    def has_permission(self, request: HttpRequest) -> bool:
        return request.user.is_authenticated
Source code in django_flosse/permissions.py
class BaseSSEPermission:
    """
    Minimal permission interface for ``@sse_stream``.

    Subclass this and override ``has_permission`` to add your own auth logic.
    Returning ``False`` causes the decorator to respond with HTTP 403.

    Example::

        class LoginRequired(BaseSSEPermission):
            def has_permission(self, request: HttpRequest) -> bool:
                return request.user.is_authenticated
    """

    def has_permission(self, request: HttpRequest) -> bool:  # noqa: ARG002
        return True

django_flosse.permissions.AllowAny

Bases: BaseSSEPermission

Allow unrestricted access (default behaviour).

Source code in django_flosse/permissions.py
class AllowAny(BaseSSEPermission):
    """Allow unrestricted access (default behaviour)."""

    def has_permission(self, request: HttpRequest) -> bool:
        return True

django_flosse.permissions.IsAuthenticated

Bases: BaseSSEPermission

Allow access only to authenticated users (session / token auth).

Source code in django_flosse/permissions.py
class IsAuthenticated(BaseSSEPermission):
    """Allow access only to authenticated users (session / token auth)."""

    def has_permission(self, request: HttpRequest) -> bool:
        return bool(getattr(request, "user", None) and request.user.is_authenticated)

django_flosse.permissions.IsAdminUser

Bases: BaseSSEPermission

Allow access only to staff / superusers.

Source code in django_flosse/permissions.py
class IsAdminUser(BaseSSEPermission):
    """Allow access only to staff / superusers."""

    def has_permission(self, request: HttpRequest) -> bool:
        return bool(
            getattr(request, "user", None)
            and request.user.is_authenticated
            and request.user.is_staff
        )

Exceptions

django_flosse.exceptions.SSEClientDisconnected

Bases: Exception

Raised internally when the client closes the connection.

Source code in django_flosse/exceptions.py
class SSEClientDisconnected(Exception):
    """Raised internally when the client closes the connection."""

django_flosse.exceptions.SSEPermissionDenied

Bases: Exception

Raised when a permission check fails.

Source code in django_flosse/exceptions.py
class SSEPermissionDenied(Exception):
    """Raised when a permission check fails."""

django_flosse.exceptions.SSEYieldError

Bases: ValueError

Raised when a view yields a value that cannot be converted to an SSE event.

Source code in django_flosse/exceptions.py
class SSEYieldError(ValueError):
    """Raised when a view yields a value that cannot be converted to an SSE event."""