summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/litestar/response/streaming.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/response/streaming.py')
-rw-r--r--venv/lib/python3.11/site-packages/litestar/response/streaming.py239
1 files changed, 0 insertions, 239 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/response/streaming.py b/venv/lib/python3.11/site-packages/litestar/response/streaming.py
deleted file mode 100644
index fc76522..0000000
--- a/venv/lib/python3.11/site-packages/litestar/response/streaming.py
+++ /dev/null
@@ -1,239 +0,0 @@
-from __future__ import annotations
-
-import itertools
-from functools import partial
-from typing import TYPE_CHECKING, Any, AsyncGenerator, AsyncIterable, AsyncIterator, Callable, Iterable, Iterator, Union
-
-from anyio import CancelScope, create_task_group
-
-from litestar.enums import MediaType
-from litestar.response.base import ASGIResponse, Response
-from litestar.types.helper_types import StreamType
-from litestar.utils.deprecation import warn_deprecation
-from litestar.utils.helpers import get_enum_string_value
-from litestar.utils.sync import AsyncIteratorWrapper
-
-if TYPE_CHECKING:
- from litestar.app import Litestar
- from litestar.background_tasks import BackgroundTask, BackgroundTasks
- from litestar.connection import Request
- from litestar.datastructures.cookie import Cookie
- from litestar.enums import OpenAPIMediaType
- from litestar.types import HTTPResponseBodyEvent, Receive, ResponseCookies, ResponseHeaders, Send, TypeEncodersMap
-
-__all__ = (
- "ASGIStreamingResponse",
- "Stream",
-)
-
-
-class ASGIStreamingResponse(ASGIResponse):
- """A streaming response."""
-
- __slots__ = ("iterator",)
-
- _should_set_content_length = False
-
- def __init__(
- self,
- *,
- iterator: StreamType,
- background: BackgroundTask | BackgroundTasks | None = None,
- body: bytes | str = b"",
- content_length: int | None = None,
- cookies: Iterable[Cookie] | None = None,
- encoded_headers: Iterable[tuple[bytes, bytes]] | None = None,
- encoding: str = "utf-8",
- headers: dict[str, Any] | None = None,
- is_head_response: bool = False,
- media_type: MediaType | str | None = None,
- status_code: int | None = None,
- ) -> None:
- """A low-level ASGI streaming response.
-
- Args:
- background: A background task or a list of background tasks to be executed after the response is sent.
- body: encoded content to send in the response body.
- content_length: The response content length.
- cookies: The response cookies.
- encoded_headers: The response headers.
- encoding: The response encoding.
- headers: The response headers.
- is_head_response: A boolean indicating if the response is a HEAD response.
- iterator: An async iterator or iterable.
- media_type: The response media type.
- status_code: The response status code.
- """
- super().__init__(
- background=background,
- body=body,
- content_length=content_length,
- cookies=cookies,
- encoding=encoding,
- headers=headers,
- is_head_response=is_head_response,
- media_type=media_type,
- status_code=status_code,
- encoded_headers=encoded_headers,
- )
- self.iterator: AsyncIterable[str | bytes] | AsyncGenerator[str | bytes, None] = (
- iterator if isinstance(iterator, (AsyncIterable, AsyncIterator)) else AsyncIteratorWrapper(iterator)
- )
-
- async def _listen_for_disconnect(self, cancel_scope: CancelScope, receive: Receive) -> None:
- """Listen for a cancellation message, and if received - call cancel on the cancel scope.
-
- Args:
- cancel_scope: A task group cancel scope instance.
- receive: The ASGI receive function.
-
- Returns:
- None
- """
- if not cancel_scope.cancel_called:
- message = await receive()
- if message["type"] == "http.disconnect":
- # despite the IDE warning, this is not a coroutine because anyio 3+ changed this.
- # therefore make sure not to await this.
- cancel_scope.cancel()
- else:
- await self._listen_for_disconnect(cancel_scope=cancel_scope, receive=receive)
-
- async def _stream(self, send: Send) -> None:
- """Send the chunks from the iterator as a stream of ASGI 'http.response.body' events.
-
- Args:
- send: The ASGI Send function.
-
- Returns:
- None
- """
- async for chunk in self.iterator:
- stream_event: HTTPResponseBodyEvent = {
- "type": "http.response.body",
- "body": chunk if isinstance(chunk, bytes) else chunk.encode(self.encoding),
- "more_body": True,
- }
- await send(stream_event)
- terminus_event: HTTPResponseBodyEvent = {"type": "http.response.body", "body": b"", "more_body": False}
- await send(terminus_event)
-
- async def send_body(self, send: Send, receive: Receive) -> None:
- """Emit a stream of events correlating with the response body.
-
- Args:
- send: The ASGI send function.
- receive: The ASGI receive function.
-
- Returns:
- None
- """
-
- async with create_task_group() as task_group:
- task_group.start_soon(partial(self._stream, send))
- await self._listen_for_disconnect(cancel_scope=task_group.cancel_scope, receive=receive)
-
-
-class Stream(Response[StreamType[Union[str, bytes]]]):
- """An HTTP response that streams the response data as a series of ASGI ``http.response.body`` events."""
-
- __slots__ = ("iterator",)
-
- def __init__(
- self,
- content: StreamType[str | bytes] | Callable[[], StreamType[str | bytes]],
- *,
- background: BackgroundTask | BackgroundTasks | None = None,
- cookies: ResponseCookies | None = None,
- encoding: str = "utf-8",
- headers: ResponseHeaders | None = None,
- media_type: MediaType | OpenAPIMediaType | str | None = None,
- status_code: int | None = None,
- ) -> None:
- """Initialize the response.
-
- Args:
- content: A sync or async iterator or iterable.
- background: A :class:`BackgroundTask <.background_tasks.BackgroundTask>` instance or
- :class:`BackgroundTasks <.background_tasks.BackgroundTasks>` to execute after the response is finished.
- Defaults to None.
- cookies: A list of :class:`Cookie <.datastructures.Cookie>` instances to be set under the response
- ``Set-Cookie`` header.
- encoding: The encoding to be used for the response headers.
- headers: A string keyed dictionary of response headers. Header keys are insensitive.
- media_type: A value for the response ``Content-Type`` header.
- status_code: An HTTP status code.
- """
- super().__init__(
- background=background,
- content=b"", # type: ignore[arg-type]
- cookies=cookies,
- encoding=encoding,
- headers=headers,
- media_type=media_type,
- status_code=status_code,
- )
- self.iterator = content
-
- def to_asgi_response(
- self,
- app: Litestar | None,
- request: Request,
- *,
- background: BackgroundTask | BackgroundTasks | None = None,
- cookies: Iterable[Cookie] | None = None,
- encoded_headers: Iterable[tuple[bytes, bytes]] | None = None,
- headers: dict[str, str] | None = None,
- is_head_response: bool = False,
- media_type: MediaType | str | None = None,
- status_code: int | None = None,
- type_encoders: TypeEncodersMap | None = None,
- ) -> ASGIResponse:
- """Create an ASGIStreamingResponse from a StremaingResponse instance.
-
- Args:
- app: The :class:`Litestar <.app.Litestar>` application instance.
- background: Background task(s) to be executed after the response is sent.
- cookies: A list of cookies to be set on the response.
- encoded_headers: A list of already encoded headers.
- headers: Additional headers to be merged with the response headers. Response headers take precedence.
- is_head_response: Whether the response is a HEAD response.
- media_type: Media type for the response. If ``media_type`` is already set on the response, this is ignored.
- request: The :class:`Request <.connection.Request>` instance.
- status_code: Status code for the response. If ``status_code`` is already set on the response, this is
- type_encoders: A dictionary of type encoders to use for encoding the response content.
-
- Returns:
- An ASGIStreamingResponse instance.
- """
- if app is not None:
- warn_deprecation(
- version="2.1",
- deprecated_name="app",
- kind="parameter",
- removal_in="3.0.0",
- alternative="request.app",
- )
-
- headers = {**headers, **self.headers} if headers is not None else self.headers
- cookies = self.cookies if cookies is None else itertools.chain(self.cookies, cookies)
-
- media_type = get_enum_string_value(media_type or self.media_type or MediaType.JSON)
-
- iterator = self.iterator
- if not isinstance(iterator, (Iterable, Iterator, AsyncIterable, AsyncIterator)) and callable(iterator):
- iterator = iterator()
-
- return ASGIStreamingResponse(
- background=self.background or background,
- body=b"",
- content_length=0,
- cookies=cookies,
- encoded_headers=encoded_headers,
- encoding=self.encoding,
- headers=headers,
- is_head_response=is_head_response,
- iterator=iterator,
- media_type=media_type,
- status_code=self.status_code or status_code,
- )