diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/response')
14 files changed, 1611 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/response/__init__.py b/venv/lib/python3.11/site-packages/litestar/response/__init__.py new file mode 100644 index 0000000..c655758 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/response/__init__.py @@ -0,0 +1,16 @@ +from .base import Response +from .file import File +from .redirect import Redirect +from .sse import ServerSentEvent, ServerSentEventMessage +from .streaming import Stream +from .template import Template + +__all__ = ( + "File", + "Redirect", + "Response", + "ServerSentEvent", + "ServerSentEventMessage", + "Stream", + "Template", +) diff --git a/venv/lib/python3.11/site-packages/litestar/response/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/response/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..a806282 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/response/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/response/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/response/__pycache__/base.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..885cbe8 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/response/__pycache__/base.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/response/__pycache__/file.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/response/__pycache__/file.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..23c09f2 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/response/__pycache__/file.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/response/__pycache__/redirect.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/response/__pycache__/redirect.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..d776095 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/response/__pycache__/redirect.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/response/__pycache__/sse.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/response/__pycache__/sse.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..2a03883 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/response/__pycache__/sse.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/response/__pycache__/streaming.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/response/__pycache__/streaming.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..350cf9e --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/response/__pycache__/streaming.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/response/__pycache__/template.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/response/__pycache__/template.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..91dde51 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/response/__pycache__/template.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/response/base.py b/venv/lib/python3.11/site-packages/litestar/response/base.py new file mode 100644 index 0000000..67eec09 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/response/base.py @@ -0,0 +1,459 @@ +from __future__ import annotations + +import itertools +import re +from typing import TYPE_CHECKING, Any, ClassVar, Generic, Iterable, Literal, Mapping, TypeVar, overload + +from litestar.datastructures.cookie import Cookie +from litestar.datastructures.headers import ETag, MutableScopeHeaders +from litestar.enums import MediaType, OpenAPIMediaType +from litestar.exceptions import ImproperlyConfiguredException +from litestar.serialization import default_serializer, encode_json, encode_msgpack, get_serializer +from litestar.status_codes import HTTP_200_OK, HTTP_204_NO_CONTENT, HTTP_304_NOT_MODIFIED +from litestar.types.empty import Empty +from litestar.utils.deprecation import deprecated, warn_deprecation +from litestar.utils.helpers import get_enum_string_value + +if TYPE_CHECKING: + from typing import Optional + + from litestar.app import Litestar + from litestar.background_tasks import BackgroundTask, BackgroundTasks + from litestar.connection import Request + from litestar.types import ( + HTTPResponseBodyEvent, + HTTPResponseStartEvent, + Receive, + ResponseCookies, + ResponseHeaders, + Scope, + Send, + Serializer, + TypeEncodersMap, + ) + +__all__ = ("ASGIResponse", "Response") + +T = TypeVar("T") + +MEDIA_TYPE_APPLICATION_JSON_PATTERN = re.compile(r"^application/(?:.+\+)?json") + + +class ASGIResponse: + """A low-level ASGI response class.""" + + __slots__ = ( + "background", + "body", + "content_length", + "encoding", + "is_head_response", + "status_code", + "_encoded_cookies", + "headers", + ) + + _should_set_content_length: ClassVar[bool] = True + """A flag to indicate whether the content-length header should be set by default or not.""" + + def __init__( + self, + *, + background: BackgroundTask | BackgroundTasks | None = None, + body: bytes | str = b"", + content_length: int | None = None, + cookies: Iterable[Cookie] | None = None, + encoded_headers: Iterable[tuple[bytes, bytes]] | None = None, + encoding: str = "utf-8", + headers: dict[str, Any] | Iterable[tuple[str, str]] | None = None, + is_head_response: bool = False, + media_type: MediaType | str | None = None, + status_code: int | None = None, + ) -> None: + """A low-level ASGI response class. + + Args: + background: A background task or a list of background tasks to be executed after the response is sent. + body: encoded content to send in the response body. + content_length: The response content length. + cookies: The response cookies. + encoded_headers: The response headers. + encoding: The response encoding. + headers: The response headers. + is_head_response: A boolean indicating if the response is a HEAD response. + media_type: The response media type. + status_code: The response status code. + """ + body = body.encode() if isinstance(body, str) else body + status_code = status_code or HTTP_200_OK + self.headers = MutableScopeHeaders() + + if encoded_headers is not None: + warn_deprecation("3.0", kind="parameter", deprecated_name="encoded_headers", alternative="headers") + for header_name, header_value in encoded_headers: + self.headers.add(header_name.decode("latin-1"), header_value.decode("latin-1")) + + if headers is not None: + for k, v in headers.items() if isinstance(headers, dict) else headers: + self.headers.add(k, v) # pyright: ignore + + media_type = get_enum_string_value(media_type or MediaType.JSON) + + status_allows_body = ( + status_code not in {HTTP_204_NO_CONTENT, HTTP_304_NOT_MODIFIED} and status_code >= HTTP_200_OK + ) + + if content_length is None: + content_length = len(body) + + if not status_allows_body or is_head_response: + if body and body != b"null": + raise ImproperlyConfiguredException( + "response content is not supported for HEAD responses and responses with a status code " + "that does not allow content (304, 204, < 200)" + ) + body = b"" + else: + self.headers.setdefault( + "content-type", (f"{media_type}; charset={encoding}" if media_type.startswith("text/") else media_type) + ) + + if self._should_set_content_length: + self.headers.setdefault("content-length", str(content_length)) + + self.background = background + self.body = body + self.content_length = content_length + self._encoded_cookies = tuple( + cookie.to_encoded_header() for cookie in (cookies or ()) if not cookie.documentation_only + ) + self.encoding = encoding + self.is_head_response = is_head_response + self.status_code = status_code + + @property + @deprecated("3.0", kind="property", alternative="encode_headers()") + def encoded_headers(self) -> list[tuple[bytes, bytes]]: + return self.encode_headers() + + def encode_headers(self) -> list[tuple[bytes, bytes]]: + return [*self.headers.headers, *self._encoded_cookies] + + async def after_response(self) -> None: + """Execute after the response is sent. + + Returns: + None + """ + if self.background is not None: + await self.background() + + async def start_response(self, send: Send) -> None: + """Emit the start event of the response. This event includes the headers and status codes. + + Args: + send: The ASGI send function. + + Returns: + None + """ + event: HTTPResponseStartEvent = { + "type": "http.response.start", + "status": self.status_code, + "headers": self.encode_headers(), + } + await send(event) + + async def send_body(self, send: Send, receive: Receive) -> None: + """Emit the response body. + + Args: + send: The ASGI send function. + receive: The ASGI receive function. + + Notes: + - Response subclasses should customize this method if there is a need to customize sending data. + + Returns: + None + """ + event: HTTPResponseBodyEvent = {"type": "http.response.body", "body": self.body, "more_body": False} + await send(event) + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + """ASGI callable of the ``Response``. + + Args: + scope: The ASGI connection scope. + receive: The ASGI receive function. + send: The ASGI send function. + + Returns: + None + """ + await self.start_response(send=send) + + if self.is_head_response: + event: HTTPResponseBodyEvent = {"type": "http.response.body", "body": b"", "more_body": False} + await send(event) + else: + await self.send_body(send=send, receive=receive) + + await self.after_response() + + +class Response(Generic[T]): + """Base Litestar HTTP response class, used as the basis for all other response classes.""" + + __slots__ = ( + "background", + "content", + "cookies", + "encoding", + "headers", + "media_type", + "status_code", + "response_type_encoders", + ) + + content: T + type_encoders: Optional[TypeEncodersMap] = None # noqa: UP007 + + def __init__( + self, + content: T, + *, + background: BackgroundTask | BackgroundTasks | None = None, + cookies: ResponseCookies | None = None, + encoding: str = "utf-8", + headers: ResponseHeaders | None = None, + media_type: MediaType | OpenAPIMediaType | str | None = None, + status_code: int | None = None, + type_encoders: TypeEncodersMap | None = None, + ) -> None: + """Initialize the response. + + Args: + content: A value for the response body that will be rendered into bytes string. + status_code: An HTTP status code. + media_type: A value for the response ``Content-Type`` header. + background: A :class:`BackgroundTask <.background_tasks.BackgroundTask>` instance or + :class:`BackgroundTasks <.background_tasks.BackgroundTasks>` to execute after the response is finished. + Defaults to ``None``. + headers: A string keyed dictionary of response headers. Header keys are insensitive. + cookies: A list of :class:`Cookie <.datastructures.Cookie>` instances to be set under the response + ``Set-Cookie`` header. + encoding: The encoding to be used for the response headers. + type_encoders: A mapping of types to callables that transform them into types supported for serialization. + """ + self.content = content + self.background = background + self.cookies: list[Cookie] = ( + [Cookie(key=key, value=value) for key, value in cookies.items()] + if isinstance(cookies, Mapping) + else list(cookies or []) + ) + self.encoding = encoding + self.headers: dict[str, Any] = ( + dict(headers) if isinstance(headers, Mapping) else {h.name: h.value for h in headers or {}} + ) + self.media_type = media_type + self.status_code = status_code + self.response_type_encoders = {**(self.type_encoders or {}), **(type_encoders or {})} + + @overload + def set_cookie(self, /, cookie: Cookie) -> None: ... + + @overload + def set_cookie( + self, + key: str, + value: str | None = None, + max_age: int | None = None, + expires: int | None = None, + path: str = "/", + domain: str | None = None, + secure: bool = False, + httponly: bool = False, + samesite: Literal["lax", "strict", "none"] = "lax", + ) -> None: ... + + def set_cookie( # type: ignore[misc] + self, + key: str | Cookie, + value: str | None = None, + max_age: int | None = None, + expires: int | None = None, + path: str = "/", + domain: str | None = None, + secure: bool = False, + httponly: bool = False, + samesite: Literal["lax", "strict", "none"] = "lax", + ) -> None: + """Set a cookie on the response. If passed a :class:`Cookie <.datastructures.Cookie>` instance, keyword + arguments will be ignored. + + Args: + key: Key for the cookie or a :class:`Cookie <.datastructures.Cookie>` instance. + value: Value for the cookie, if none given defaults to empty string. + max_age: Maximal age of the cookie before its invalidated. + expires: Seconds from now until the cookie expires. + path: Path fragment that must exist in the request url for the cookie to be valid. Defaults to ``/``. + domain: Domain for which the cookie is valid. + secure: Https is required for the cookie. + httponly: Forbids javascript to access the cookie via ``document.cookie``. + samesite: Controls whether a cookie is sent with cross-site requests. Defaults to ``lax``. + + Returns: + None. + """ + if not isinstance(key, Cookie): + key = Cookie( + domain=domain, + expires=expires, + httponly=httponly, + key=key, + max_age=max_age, + path=path, + samesite=samesite, + secure=secure, + value=value, + ) + self.cookies.append(key) + + def set_header(self, key: str, value: Any) -> None: + """Set a header on the response. + + Args: + key: Header key. + value: Header value. + + Returns: + None. + """ + self.headers[key] = value + + def set_etag(self, etag: str | ETag) -> None: + """Set an etag header. + + Args: + etag: An etag value. + + Returns: + None + """ + self.headers["etag"] = etag.to_header() if isinstance(etag, ETag) else etag + + def delete_cookie( + self, + key: str, + path: str = "/", + domain: str | None = None, + ) -> None: + """Delete a cookie. + + Args: + key: Key of the cookie. + path: Path of the cookie. + domain: Domain of the cookie. + + Returns: + None. + """ + cookie = Cookie(key=key, path=path, domain=domain, expires=0, max_age=0) + self.cookies = [c for c in self.cookies if c != cookie] + self.cookies.append(cookie) + + def render(self, content: Any, media_type: str, enc_hook: Serializer = default_serializer) -> bytes: + """Handle the rendering of content into a bytes string. + + Returns: + An encoded bytes string + """ + if isinstance(content, bytes): + return content + + if content is Empty: + raise RuntimeError("The `Empty` sentinel cannot be used as response content") + + try: + if media_type.startswith("text/") and not content: + return b"" + + if isinstance(content, str): + return content.encode(self.encoding) + + if media_type == MediaType.MESSAGEPACK: + return encode_msgpack(content, enc_hook) + + if MEDIA_TYPE_APPLICATION_JSON_PATTERN.match( + media_type, + ): + return encode_json(content, enc_hook) + + raise ImproperlyConfiguredException(f"unsupported media_type {media_type} for content {content!r}") + except (AttributeError, ValueError, TypeError) as e: + raise ImproperlyConfiguredException("Unable to serialize response content") from e + + def to_asgi_response( + self, + app: Litestar | None, + request: Request, + *, + background: BackgroundTask | BackgroundTasks | None = None, + cookies: Iterable[Cookie] | None = None, + encoded_headers: Iterable[tuple[bytes, bytes]] | None = None, + headers: dict[str, str] | None = None, + is_head_response: bool = False, + media_type: MediaType | str | None = None, + status_code: int | None = None, + type_encoders: TypeEncodersMap | None = None, + ) -> ASGIResponse: + """Create an ASGIResponse from a Response instance. + + Args: + app: The :class:`Litestar <.app.Litestar>` application instance. + background: Background task(s) to be executed after the response is sent. + cookies: A list of cookies to be set on the response. + encoded_headers: A list of already encoded headers. + headers: Additional headers to be merged with the response headers. Response headers take precedence. + is_head_response: Whether the response is a HEAD response. + media_type: Media type for the response. If ``media_type`` is already set on the response, this is ignored. + request: The :class:`Request <.connection.Request>` instance. + status_code: Status code for the response. If ``status_code`` is already set on the response, this is + type_encoders: A dictionary of type encoders to use for encoding the response content. + + Returns: + An ASGIResponse instance. + """ + + if app is not None: + warn_deprecation( + version="2.1", + deprecated_name="app", + kind="parameter", + removal_in="3.0.0", + alternative="request.app", + ) + + headers = {**headers, **self.headers} if headers is not None else self.headers + cookies = self.cookies if cookies is None else itertools.chain(self.cookies, cookies) + + if type_encoders: + type_encoders = {**type_encoders, **(self.response_type_encoders or {})} + else: + type_encoders = self.response_type_encoders + + media_type = get_enum_string_value(self.media_type or media_type or MediaType.JSON) + + return ASGIResponse( + background=self.background or background, + body=self.render(self.content, media_type, get_serializer(type_encoders)), + cookies=cookies, + encoded_headers=encoded_headers, + encoding=self.encoding, + headers=headers, + is_head_response=is_head_response, + media_type=media_type, + status_code=self.status_code or status_code, + ) diff --git a/venv/lib/python3.11/site-packages/litestar/response/file.py b/venv/lib/python3.11/site-packages/litestar/response/file.py new file mode 100644 index 0000000..1fc6f86 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/response/file.py @@ -0,0 +1,386 @@ +from __future__ import annotations + +import itertools +from email.utils import formatdate +from inspect import iscoroutine +from mimetypes import encodings_map, guess_type +from typing import TYPE_CHECKING, Any, AsyncGenerator, Coroutine, Iterable, Literal, cast +from urllib.parse import quote +from zlib import adler32 + +from litestar.constants import ONE_MEGABYTE +from litestar.exceptions import ImproperlyConfiguredException +from litestar.file_system import BaseLocalFileSystem, FileSystemAdapter +from litestar.response.base import Response +from litestar.response.streaming import ASGIStreamingResponse +from litestar.utils.deprecation import warn_deprecation +from litestar.utils.helpers import get_enum_string_value + +if TYPE_CHECKING: + from os import PathLike + from os import stat_result as stat_result_type + + from anyio import Path + + from litestar.app import Litestar + from litestar.background_tasks import BackgroundTask, BackgroundTasks + from litestar.connection import Request + from litestar.datastructures.cookie import Cookie + from litestar.datastructures.headers import ETag + from litestar.enums import MediaType + from litestar.types import ( + HTTPResponseBodyEvent, + PathType, + Receive, + ResponseCookies, + ResponseHeaders, + Send, + TypeEncodersMap, + ) + from litestar.types.file_types import FileInfo, FileSystemProtocol + +__all__ = ( + "ASGIFileResponse", + "File", + "async_file_iterator", + "create_etag_for_file", +) + +# brotli not supported in 'mimetypes.encodings_map' until py 3.9. +encodings_map[".br"] = "br" + + +async def async_file_iterator( + file_path: PathType, chunk_size: int, adapter: FileSystemAdapter +) -> AsyncGenerator[bytes, None]: + """Return an async that asynchronously reads a file and yields its chunks. + + Args: + file_path: A path to a file. + chunk_size: The chunk file to use. + adapter: File system adapter class. + adapter: File system adapter class. + + Returns: + An async generator. + """ + async with await adapter.open(file_path) as file: + while chunk := await file.read(chunk_size): + yield chunk + + +def create_etag_for_file(path: PathType, modified_time: float, file_size: int) -> str: + """Create an etag. + + Notes: + - Function is derived from flask. + + Returns: + An etag. + """ + check = adler32(str(path).encode("utf-8")) & 0xFFFFFFFF + return f'"{modified_time}-{file_size}-{check}"' + + +class ASGIFileResponse(ASGIStreamingResponse): + """A low-level ASGI response, streaming a file as response body.""" + + def __init__( + self, + *, + background: BackgroundTask | BackgroundTasks | None = None, + body: bytes | str = b"", + chunk_size: int = ONE_MEGABYTE, + content_disposition_type: Literal["attachment", "inline"] = "attachment", + content_length: int | None = None, + cookies: Iterable[Cookie] | None = None, + encoded_headers: Iterable[tuple[bytes, bytes]] | None = None, + encoding: str = "utf-8", + etag: ETag | None = None, + file_info: FileInfo | Coroutine[None, None, FileInfo] | None = None, + file_path: str | PathLike | Path, + file_system: FileSystemProtocol | None = None, + filename: str = "", + headers: dict[str, str] | None = None, + is_head_response: bool = False, + media_type: MediaType | str | None = None, + stat_result: stat_result_type | None = None, + status_code: int | None = None, + ) -> None: + """A low-level ASGI response, streaming a file as response body. + + Args: + background: A background task or a list of background tasks to be executed after the response is sent. + body: encoded content to send in the response body. + chunk_size: The chunk size to use. + content_disposition_type: The type of the ``Content-Disposition``. Either ``inline`` or ``attachment``. + content_length: The response content length. + cookies: The response cookies. + encoded_headers: A list of encoded headers. + encoding: The response encoding. + etag: An etag. + file_info: A file info. + file_path: A path to a file. + file_system: A file system adapter. + filename: The name of the file. + headers: A dictionary of headers. + headers: The response headers. + is_head_response: A boolean indicating if the response is a HEAD response. + media_type: The media type of the file. + stat_result: A stat result. + status_code: The response status code. + """ + headers = headers or {} + if not media_type: + mimetype, content_encoding = guess_type(filename) if filename else (None, None) + media_type = mimetype or "application/octet-stream" + if content_encoding is not None: + headers.update({"content-encoding": content_encoding}) + + self.adapter = FileSystemAdapter(file_system or BaseLocalFileSystem()) + + super().__init__( + iterator=async_file_iterator(file_path=file_path, chunk_size=chunk_size, adapter=self.adapter), + headers=headers, + media_type=media_type, + cookies=cookies, + background=background, + status_code=status_code, + body=body, + content_length=content_length, + encoding=encoding, + is_head_response=is_head_response, + encoded_headers=encoded_headers, + ) + + quoted_filename = quote(filename) + is_utf8 = quoted_filename == filename + if is_utf8: + content_disposition = f'{content_disposition_type}; filename="{filename}"' + else: + content_disposition = f"{content_disposition_type}; filename*=utf-8''{quoted_filename}" + + self.headers.setdefault("content-disposition", content_disposition) + + self.chunk_size = chunk_size + self.etag = etag + self.file_path = file_path + + if file_info: + self.file_info: FileInfo | Coroutine[Any, Any, FileInfo] = file_info + elif stat_result: + self.file_info = self.adapter.parse_stat_result(result=stat_result, path=file_path) + else: + self.file_info = self.adapter.info(self.file_path) + + async def send_body(self, send: Send, receive: Receive) -> None: + """Emit a stream of events correlating with the response body. + + Args: + send: The ASGI send function. + receive: The ASGI receive function. + + Returns: + None + """ + if self.chunk_size < self.content_length: + await super().send_body(send=send, receive=receive) + return + + async with await self.adapter.open(self.file_path) as file: + body_event: HTTPResponseBodyEvent = { + "type": "http.response.body", + "body": await file.read(), + "more_body": False, + } + await send(body_event) + + async def start_response(self, send: Send) -> None: + """Emit the start event of the response. This event includes the headers and status codes. + + Args: + send: The ASGI send function. + + Returns: + None + """ + try: + fs_info = self.file_info = cast( + "FileInfo", (await self.file_info if iscoroutine(self.file_info) else self.file_info) + ) + except FileNotFoundError as e: + raise ImproperlyConfiguredException(f"{self.file_path} does not exist") from e + + if fs_info["type"] != "file": + raise ImproperlyConfiguredException(f"{self.file_path} is not a file") + + self.content_length = fs_info["size"] + + self.headers.setdefault("content-length", str(self.content_length)) + self.headers.setdefault("last-modified", formatdate(fs_info["mtime"], usegmt=True)) + + if self.etag: + self.headers.setdefault("etag", self.etag.to_header()) + else: + self.headers.setdefault( + "etag", + create_etag_for_file(path=self.file_path, modified_time=fs_info["mtime"], file_size=fs_info["size"]), + ) + + await super().start_response(send=send) + + +class File(Response): + """A response, streaming a file as response body.""" + + __slots__ = ( + "chunk_size", + "content_disposition_type", + "etag", + "file_path", + "file_system", + "filename", + "file_info", + "stat_result", + ) + + def __init__( + self, + path: str | PathLike | Path, + *, + background: BackgroundTask | BackgroundTasks | None = None, + chunk_size: int = ONE_MEGABYTE, + content_disposition_type: Literal["attachment", "inline"] = "attachment", + cookies: ResponseCookies | None = None, + encoding: str = "utf-8", + etag: ETag | None = None, + file_info: FileInfo | Coroutine[Any, Any, FileInfo] | None = None, + file_system: FileSystemProtocol | None = None, + filename: str | None = None, + headers: ResponseHeaders | None = None, + media_type: Literal[MediaType.TEXT] | str | None = None, + stat_result: stat_result_type | None = None, + status_code: int | None = None, + ) -> None: + """Initialize ``File`` + + Notes: + - This class extends the :class:`Stream <.response.Stream>` class. + + Args: + path: A file path in one of the supported formats. + background: A :class:`BackgroundTask <.background_tasks.BackgroundTask>` instance or + :class:`BackgroundTasks <.background_tasks.BackgroundTasks>` to execute after the response is finished. + Defaults to None. + chunk_size: The chunk sizes to use when streaming the file. Defaults to 1MB. + content_disposition_type: The type of the ``Content-Disposition``. Either ``inline`` or ``attachment``. + cookies: A list of :class:`Cookie <.datastructures.Cookie>` instances to be set under the response + ``Set-Cookie`` header. + encoding: The encoding to be used for the response headers. + etag: An optional :class:`ETag <.datastructures.ETag>` instance. If not provided, an etag will be + generated. + file_info: The output of calling :meth:`file_system.info <types.FileSystemProtocol.info>`, equivalent to + providing an :class:`os.stat_result`. + file_system: An implementation of the :class:`FileSystemProtocol <.types.FileSystemProtocol>`. If provided + it will be used to load the file. + filename: An optional filename to set in the header. + headers: A string keyed dictionary of response headers. Header keys are insensitive. + media_type: A value for the response ``Content-Type`` header. If not provided, the value will be either + derived from the filename if provided and supported by the stdlib, or will default to + ``application/octet-stream``. + stat_result: An optional result of calling :func:os.stat:. If not provided, this will be done by the + response constructor. + status_code: An HTTP status code. + """ + + if file_system is not None and not ( + callable(getattr(file_system, "info", None)) and callable(getattr(file_system, "open", None)) + ): + raise ImproperlyConfiguredException("file_system must adhere to the FileSystemProtocol type") + + self.chunk_size = chunk_size + self.content_disposition_type = content_disposition_type + self.etag = etag + self.file_info = file_info + self.file_path = path + self.file_system = file_system + self.filename = filename or "" + self.stat_result = stat_result + + super().__init__( + content=None, + status_code=status_code, + media_type=media_type, + background=background, + headers=headers, + cookies=cookies, + encoding=encoding, + ) + + def to_asgi_response( + self, + app: Litestar | None, + request: Request, + *, + background: BackgroundTask | BackgroundTasks | None = None, + encoded_headers: Iterable[tuple[bytes, bytes]] | None = None, + cookies: Iterable[Cookie] | None = None, + headers: dict[str, str] | None = None, + is_head_response: bool = False, + media_type: MediaType | str | None = None, + status_code: int | None = None, + type_encoders: TypeEncodersMap | None = None, + ) -> ASGIFileResponse: + """Create an :class:`ASGIFileResponse <litestar.response.file.ASGIFileResponse>` instance. + + Args: + app: The :class:`Litestar <.app.Litestar>` application instance. + background: Background task(s) to be executed after the response is sent. + cookies: A list of cookies to be set on the response. + encoded_headers: A list of already encoded headers. + headers: Additional headers to be merged with the response headers. Response headers take precedence. + is_head_response: Whether the response is a HEAD response. + media_type: Media type for the response. If ``media_type`` is already set on the response, this is ignored. + request: The :class:`Request <.connection.Request>` instance. + status_code: Status code for the response. If ``status_code`` is already set on the response, this is + type_encoders: A dictionary of type encoders to use for encoding the response content. + + Returns: + A low-level ASGI file response. + """ + if app is not None: + warn_deprecation( + version="2.1", + deprecated_name="app", + kind="parameter", + removal_in="3.0.0", + alternative="request.app", + ) + + headers = {**headers, **self.headers} if headers is not None else self.headers + cookies = self.cookies if cookies is None else itertools.chain(self.cookies, cookies) + + media_type = self.media_type or media_type + if media_type is not None: + media_type = get_enum_string_value(media_type) + + return ASGIFileResponse( + background=self.background or background, + body=b"", + chunk_size=self.chunk_size, + content_disposition_type=self.content_disposition_type, # pyright: ignore + content_length=0, + cookies=cookies, + encoded_headers=encoded_headers, + encoding=self.encoding, + etag=self.etag, + file_info=self.file_info, + file_path=self.file_path, + file_system=self.file_system, + filename=self.filename, + headers=headers, + is_head_response=is_head_response, + media_type=media_type, + stat_result=self.stat_result, + status_code=self.status_code or status_code, + ) diff --git a/venv/lib/python3.11/site-packages/litestar/response/redirect.py b/venv/lib/python3.11/site-packages/litestar/response/redirect.py new file mode 100644 index 0000000..6a07076 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/response/redirect.py @@ -0,0 +1,169 @@ +from __future__ import annotations + +import itertools +from typing import TYPE_CHECKING, Any, Iterable, Literal + +from litestar.constants import REDIRECT_ALLOWED_MEDIA_TYPES, REDIRECT_STATUS_CODES +from litestar.enums import MediaType +from litestar.exceptions import ImproperlyConfiguredException +from litestar.response.base import ASGIResponse, Response +from litestar.status_codes import HTTP_302_FOUND +from litestar.utils import url_quote +from litestar.utils.deprecation import warn_deprecation +from litestar.utils.helpers import get_enum_string_value + +if TYPE_CHECKING: + from litestar.app import Litestar + from litestar.background_tasks import BackgroundTask, BackgroundTasks + from litestar.connection import Request + from litestar.datastructures import Cookie + from litestar.types import ResponseCookies, ResponseHeaders, TypeEncodersMap + +__all__ = ( + "ASGIRedirectResponse", + "Redirect", +) + + +RedirectStatusType = Literal[301, 302, 303, 307, 308] +"""Acceptable status codes for redirect responses.""" + + +class ASGIRedirectResponse(ASGIResponse): + """A low-level ASGI redirect response class.""" + + def __init__( + self, + path: str | bytes, + media_type: str | None = None, + status_code: RedirectStatusType | None = None, + headers: dict[str, Any] | None = None, + encoded_headers: Iterable[tuple[bytes, bytes]] | None = None, + background: BackgroundTask | BackgroundTasks | None = None, + body: bytes | str = b"", + content_length: int | None = None, + cookies: Iterable[Cookie] | None = None, + encoding: str = "utf-8", + is_head_response: bool = False, + ) -> None: + headers = {**(headers or {}), "location": url_quote(path)} + media_type = media_type or MediaType.TEXT + status_code = status_code or HTTP_302_FOUND + + if status_code not in REDIRECT_STATUS_CODES: + raise ImproperlyConfiguredException( + f"{status_code} is not a valid for this response. " + f"Redirect responses should have one of " + f"the following status codes: {', '.join([str(s) for s in REDIRECT_STATUS_CODES])}" + ) + + if media_type not in REDIRECT_ALLOWED_MEDIA_TYPES: + raise ImproperlyConfiguredException( + f"{media_type} media type is not supported yet. " + f"Media type should be one of " + f"the following values: {', '.join([str(s) for s in REDIRECT_ALLOWED_MEDIA_TYPES])}" + ) + + super().__init__( + status_code=status_code, + headers=headers, + media_type=media_type, + background=background, + is_head_response=is_head_response, + encoding=encoding, + cookies=cookies, + content_length=content_length, + body=body, + encoded_headers=encoded_headers, + ) + + +class Redirect(Response[Any]): + """A redirect response.""" + + __slots__ = ("url",) + + def __init__( + self, + path: str, + *, + background: BackgroundTask | BackgroundTasks | None = None, + cookies: ResponseCookies | None = None, + encoding: str = "utf-8", + headers: ResponseHeaders | None = None, + media_type: str | MediaType | None = None, + status_code: RedirectStatusType | None = None, + type_encoders: TypeEncodersMap | None = None, + ) -> None: + """Initialize the response. + + Args: + path: A path to redirect to. + background: A background task or tasks to be run after the response is sent. + cookies: A list of :class:`Cookie <.datastructures.Cookie>` instances to be set under the response + ``Set-Cookie`` header. + encoding: The encoding to be used for the response headers. + headers: A string keyed dictionary of response headers. Header keys are insensitive. + media_type: A value for the response ``Content-Type`` header. + status_code: An HTTP status code. The status code should be one of 301, 302, 303, 307 or 308, + otherwise an exception will be raised. + type_encoders: A mapping of types to callables that transform them into types supported for serialization. + + Raises: + ImproperlyConfiguredException: Either if status code is not a redirect status code or media type is not + supported. + """ + self.url = path + if status_code is None: + status_code = HTTP_302_FOUND + super().__init__( + background=background, + content=b"", + cookies=cookies, + encoding=encoding, + headers=headers, + media_type=media_type, + status_code=status_code, + type_encoders=type_encoders, + ) + + def to_asgi_response( + self, + app: Litestar | None, + request: Request, + *, + background: BackgroundTask | BackgroundTasks | None = None, + cookies: Iterable[Cookie] | None = None, + encoded_headers: Iterable[tuple[bytes, bytes]] | None = None, + headers: dict[str, str] | None = None, + is_head_response: bool = False, + media_type: MediaType | str | None = None, + status_code: int | None = None, + type_encoders: TypeEncodersMap | None = None, + ) -> ASGIResponse: + headers = {**headers, **self.headers} if headers is not None else self.headers + cookies = self.cookies if cookies is None else itertools.chain(self.cookies, cookies) + media_type = get_enum_string_value(self.media_type or media_type or MediaType.TEXT) + + if app is not None: + warn_deprecation( + version="2.1", + deprecated_name="app", + kind="parameter", + removal_in="3.0.0", + alternative="request.app", + ) + + return ASGIRedirectResponse( + path=self.url, + background=self.background or background, + body=b"", + content_length=None, + cookies=cookies, + encoded_headers=encoded_headers, + encoding=self.encoding, + headers=headers, + is_head_response=is_head_response, + media_type=media_type, + status_code=self.status_code or status_code, # type:ignore[arg-type] + ) diff --git a/venv/lib/python3.11/site-packages/litestar/response/sse.py b/venv/lib/python3.11/site-packages/litestar/response/sse.py new file mode 100644 index 0000000..48a9192 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/response/sse.py @@ -0,0 +1,180 @@ +from __future__ import annotations + +import io +import re +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, AsyncGenerator, AsyncIterable, AsyncIterator, Iterable, Iterator + +from litestar.concurrency import sync_to_thread +from litestar.exceptions import ImproperlyConfiguredException +from litestar.response.streaming import Stream +from litestar.utils import AsyncIteratorWrapper + +if TYPE_CHECKING: + from litestar.background_tasks import BackgroundTask, BackgroundTasks + from litestar.types import ResponseCookies, ResponseHeaders, SSEData, StreamType + +_LINE_BREAK_RE = re.compile(r"\r\n|\r|\n") +DEFAULT_SEPARATOR = "\r\n" + + +class _ServerSentEventIterator(AsyncIteratorWrapper[bytes]): + __slots__ = ("content_async_iterator", "event_id", "event_type", "retry_duration", "comment_message") + + content_async_iterator: AsyncIterable[SSEData] + + def __init__( + self, + content: str | bytes | StreamType[SSEData], + event_type: str | None = None, + event_id: int | str | None = None, + retry_duration: int | None = None, + comment_message: str | None = None, + ) -> None: + self.comment_message = comment_message + self.event_id = event_id + self.event_type = event_type + self.retry_duration = retry_duration + chunks: list[bytes] = [] + if comment_message is not None: + chunks.extend([f": {chunk}\r\n".encode() for chunk in _LINE_BREAK_RE.split(comment_message)]) + + if event_id is not None: + chunks.append(f"id: {event_id}\r\n".encode()) + + if event_type is not None: + chunks.append(f"event: {event_type}\r\n".encode()) + + if retry_duration is not None: + chunks.append(f"retry: {retry_duration}\r\n".encode()) + + super().__init__(iterator=chunks) + + if not isinstance(content, (Iterator, AsyncIterator, AsyncIteratorWrapper)) and callable(content): + content = content() # type: ignore[unreachable] + + if isinstance(content, (str, bytes)): + self.content_async_iterator = AsyncIteratorWrapper([content]) + elif isinstance(content, (Iterable, Iterator)): + self.content_async_iterator = AsyncIteratorWrapper(content) + elif isinstance(content, (AsyncIterable, AsyncIterator, AsyncIteratorWrapper)): + self.content_async_iterator = content + else: + raise ImproperlyConfiguredException(f"Invalid type {type(content)} for ServerSentEvent") + + def ensure_bytes(self, data: str | int | bytes | dict | ServerSentEventMessage | Any, sep: str) -> bytes: + if isinstance(data, ServerSentEventMessage): + return data.encode() + if isinstance(data, dict): + data["sep"] = sep + return ServerSentEventMessage(**data).encode() + + return ServerSentEventMessage( + data=data, id=self.event_id, event=self.event_type, retry=self.retry_duration, sep=sep + ).encode() + + def _call_next(self) -> bytes: + try: + return next(self.iterator) + except StopIteration as e: + raise ValueError from e + + async def _async_generator(self) -> AsyncGenerator[bytes, None]: + while True: + try: + yield await sync_to_thread(self._call_next) + except ValueError: + async for value in self.content_async_iterator: + yield self.ensure_bytes(value, DEFAULT_SEPARATOR) + break + + +@dataclass +class ServerSentEventMessage: + data: str | int | bytes | None = "" + event: str | None = None + id: int | str | None = None + retry: int | None = None + comment: str | None = None + sep: str = DEFAULT_SEPARATOR + + def encode(self) -> bytes: + buffer = io.StringIO() + if self.comment is not None: + for chunk in _LINE_BREAK_RE.split(str(self.comment)): + buffer.write(f": {chunk}") + buffer.write(self.sep) + + if self.id is not None: + buffer.write(_LINE_BREAK_RE.sub("", f"id: {self.id}")) + buffer.write(self.sep) + + if self.event is not None: + buffer.write(_LINE_BREAK_RE.sub("", f"event: {self.event}")) + buffer.write(self.sep) + + if self.data is not None: + data = self.data + for chunk in _LINE_BREAK_RE.split(data.decode() if isinstance(data, bytes) else str(data)): + buffer.write(f"data: {chunk}") + buffer.write(self.sep) + + if self.retry is not None: + buffer.write(f"retry: {self.retry}") + buffer.write(self.sep) + + buffer.write(self.sep) + return buffer.getvalue().encode("utf-8") + + +class ServerSentEvent(Stream): + def __init__( + self, + content: str | bytes | StreamType[SSEData], + *, + background: BackgroundTask | BackgroundTasks | None = None, + cookies: ResponseCookies | None = None, + encoding: str = "utf-8", + headers: ResponseHeaders | None = None, + event_type: str | None = None, + event_id: int | str | None = None, + retry_duration: int | None = None, + comment_message: str | None = None, + status_code: int | None = None, + ) -> None: + """Initialize the response. + + Args: + content: Bytes, string or a sync or async iterator or iterable. + background: A :class:`BackgroundTask <.background_tasks.BackgroundTask>` instance or + :class:`BackgroundTasks <.background_tasks.BackgroundTasks>` to execute after the response is finished. + Defaults to None. + cookies: A list of :class:`Cookie <.datastructures.Cookie>` instances to be set under the response + ``Set-Cookie`` header. + encoding: The encoding to be used for the response headers. + headers: A string keyed dictionary of response headers. Header keys are insensitive. + status_code: The response status code. Defaults to 200. + event_type: The type of the SSE event. If given, the browser will sent the event to any 'event-listener' + declared for it (e.g. via 'addEventListener' in JS). + event_id: The event ID. This sets the event source's 'last event id'. + retry_duration: Retry duration in milliseconds. + comment_message: A comment message. This value is ignored by clients and is used mostly for pinging. + """ + super().__init__( + content=_ServerSentEventIterator( + content=content, + event_type=event_type, + event_id=event_id, + retry_duration=retry_duration, + comment_message=comment_message, + ), + media_type="text/event-stream", + background=background, + cookies=cookies, + encoding=encoding, + headers=headers, + status_code=status_code, + ) + self.headers.setdefault("Cache-Control", "no-cache") + self.headers["Connection"] = "keep-alive" + self.headers["X-Accel-Buffering"] = "no" diff --git a/venv/lib/python3.11/site-packages/litestar/response/streaming.py b/venv/lib/python3.11/site-packages/litestar/response/streaming.py new file mode 100644 index 0000000..fc76522 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/response/streaming.py @@ -0,0 +1,239 @@ +from __future__ import annotations + +import itertools +from functools import partial +from typing import TYPE_CHECKING, Any, AsyncGenerator, AsyncIterable, AsyncIterator, Callable, Iterable, Iterator, Union + +from anyio import CancelScope, create_task_group + +from litestar.enums import MediaType +from litestar.response.base import ASGIResponse, Response +from litestar.types.helper_types import StreamType +from litestar.utils.deprecation import warn_deprecation +from litestar.utils.helpers import get_enum_string_value +from litestar.utils.sync import AsyncIteratorWrapper + +if TYPE_CHECKING: + from litestar.app import Litestar + from litestar.background_tasks import BackgroundTask, BackgroundTasks + from litestar.connection import Request + from litestar.datastructures.cookie import Cookie + from litestar.enums import OpenAPIMediaType + from litestar.types import HTTPResponseBodyEvent, Receive, ResponseCookies, ResponseHeaders, Send, TypeEncodersMap + +__all__ = ( + "ASGIStreamingResponse", + "Stream", +) + + +class ASGIStreamingResponse(ASGIResponse): + """A streaming response.""" + + __slots__ = ("iterator",) + + _should_set_content_length = False + + def __init__( + self, + *, + iterator: StreamType, + background: BackgroundTask | BackgroundTasks | None = None, + body: bytes | str = b"", + content_length: int | None = None, + cookies: Iterable[Cookie] | None = None, + encoded_headers: Iterable[tuple[bytes, bytes]] | None = None, + encoding: str = "utf-8", + headers: dict[str, Any] | None = None, + is_head_response: bool = False, + media_type: MediaType | str | None = None, + status_code: int | None = None, + ) -> None: + """A low-level ASGI streaming response. + + Args: + background: A background task or a list of background tasks to be executed after the response is sent. + body: encoded content to send in the response body. + content_length: The response content length. + cookies: The response cookies. + encoded_headers: The response headers. + encoding: The response encoding. + headers: The response headers. + is_head_response: A boolean indicating if the response is a HEAD response. + iterator: An async iterator or iterable. + media_type: The response media type. + status_code: The response status code. + """ + super().__init__( + background=background, + body=body, + content_length=content_length, + cookies=cookies, + encoding=encoding, + headers=headers, + is_head_response=is_head_response, + media_type=media_type, + status_code=status_code, + encoded_headers=encoded_headers, + ) + self.iterator: AsyncIterable[str | bytes] | AsyncGenerator[str | bytes, None] = ( + iterator if isinstance(iterator, (AsyncIterable, AsyncIterator)) else AsyncIteratorWrapper(iterator) + ) + + async def _listen_for_disconnect(self, cancel_scope: CancelScope, receive: Receive) -> None: + """Listen for a cancellation message, and if received - call cancel on the cancel scope. + + Args: + cancel_scope: A task group cancel scope instance. + receive: The ASGI receive function. + + Returns: + None + """ + if not cancel_scope.cancel_called: + message = await receive() + if message["type"] == "http.disconnect": + # despite the IDE warning, this is not a coroutine because anyio 3+ changed this. + # therefore make sure not to await this. + cancel_scope.cancel() + else: + await self._listen_for_disconnect(cancel_scope=cancel_scope, receive=receive) + + async def _stream(self, send: Send) -> None: + """Send the chunks from the iterator as a stream of ASGI 'http.response.body' events. + + Args: + send: The ASGI Send function. + + Returns: + None + """ + async for chunk in self.iterator: + stream_event: HTTPResponseBodyEvent = { + "type": "http.response.body", + "body": chunk if isinstance(chunk, bytes) else chunk.encode(self.encoding), + "more_body": True, + } + await send(stream_event) + terminus_event: HTTPResponseBodyEvent = {"type": "http.response.body", "body": b"", "more_body": False} + await send(terminus_event) + + async def send_body(self, send: Send, receive: Receive) -> None: + """Emit a stream of events correlating with the response body. + + Args: + send: The ASGI send function. + receive: The ASGI receive function. + + Returns: + None + """ + + async with create_task_group() as task_group: + task_group.start_soon(partial(self._stream, send)) + await self._listen_for_disconnect(cancel_scope=task_group.cancel_scope, receive=receive) + + +class Stream(Response[StreamType[Union[str, bytes]]]): + """An HTTP response that streams the response data as a series of ASGI ``http.response.body`` events.""" + + __slots__ = ("iterator",) + + def __init__( + self, + content: StreamType[str | bytes] | Callable[[], StreamType[str | bytes]], + *, + background: BackgroundTask | BackgroundTasks | None = None, + cookies: ResponseCookies | None = None, + encoding: str = "utf-8", + headers: ResponseHeaders | None = None, + media_type: MediaType | OpenAPIMediaType | str | None = None, + status_code: int | None = None, + ) -> None: + """Initialize the response. + + Args: + content: A sync or async iterator or iterable. + background: A :class:`BackgroundTask <.background_tasks.BackgroundTask>` instance or + :class:`BackgroundTasks <.background_tasks.BackgroundTasks>` to execute after the response is finished. + Defaults to None. + cookies: A list of :class:`Cookie <.datastructures.Cookie>` instances to be set under the response + ``Set-Cookie`` header. + encoding: The encoding to be used for the response headers. + headers: A string keyed dictionary of response headers. Header keys are insensitive. + media_type: A value for the response ``Content-Type`` header. + status_code: An HTTP status code. + """ + super().__init__( + background=background, + content=b"", # type: ignore[arg-type] + cookies=cookies, + encoding=encoding, + headers=headers, + media_type=media_type, + status_code=status_code, + ) + self.iterator = content + + def to_asgi_response( + self, + app: Litestar | None, + request: Request, + *, + background: BackgroundTask | BackgroundTasks | None = None, + cookies: Iterable[Cookie] | None = None, + encoded_headers: Iterable[tuple[bytes, bytes]] | None = None, + headers: dict[str, str] | None = None, + is_head_response: bool = False, + media_type: MediaType | str | None = None, + status_code: int | None = None, + type_encoders: TypeEncodersMap | None = None, + ) -> ASGIResponse: + """Create an ASGIStreamingResponse from a StremaingResponse instance. + + Args: + app: The :class:`Litestar <.app.Litestar>` application instance. + background: Background task(s) to be executed after the response is sent. + cookies: A list of cookies to be set on the response. + encoded_headers: A list of already encoded headers. + headers: Additional headers to be merged with the response headers. Response headers take precedence. + is_head_response: Whether the response is a HEAD response. + media_type: Media type for the response. If ``media_type`` is already set on the response, this is ignored. + request: The :class:`Request <.connection.Request>` instance. + status_code: Status code for the response. If ``status_code`` is already set on the response, this is + type_encoders: A dictionary of type encoders to use for encoding the response content. + + Returns: + An ASGIStreamingResponse instance. + """ + if app is not None: + warn_deprecation( + version="2.1", + deprecated_name="app", + kind="parameter", + removal_in="3.0.0", + alternative="request.app", + ) + + headers = {**headers, **self.headers} if headers is not None else self.headers + cookies = self.cookies if cookies is None else itertools.chain(self.cookies, cookies) + + media_type = get_enum_string_value(media_type or self.media_type or MediaType.JSON) + + iterator = self.iterator + if not isinstance(iterator, (Iterable, Iterator, AsyncIterable, AsyncIterator)) and callable(iterator): + iterator = iterator() + + return ASGIStreamingResponse( + background=self.background or background, + body=b"", + content_length=0, + cookies=cookies, + encoded_headers=encoded_headers, + encoding=self.encoding, + headers=headers, + is_head_response=is_head_response, + iterator=iterator, + media_type=media_type, + status_code=self.status_code or status_code, + ) diff --git a/venv/lib/python3.11/site-packages/litestar/response/template.py b/venv/lib/python3.11/site-packages/litestar/response/template.py new file mode 100644 index 0000000..6499aae --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/response/template.py @@ -0,0 +1,162 @@ +from __future__ import annotations + +import itertools +from mimetypes import guess_type +from pathlib import PurePath +from typing import TYPE_CHECKING, Any, Iterable, cast + +from litestar.enums import MediaType +from litestar.exceptions import ImproperlyConfiguredException +from litestar.response.base import ASGIResponse, Response +from litestar.status_codes import HTTP_200_OK +from litestar.utils.deprecation import warn_deprecation +from litestar.utils.empty import value_or_default +from litestar.utils.scope.state import ScopeState + +if TYPE_CHECKING: + from litestar.app import Litestar + from litestar.background_tasks import BackgroundTask, BackgroundTasks + from litestar.connection import Request + from litestar.datastructures import Cookie + from litestar.types import ResponseCookies, TypeEncodersMap + +__all__ = ("Template",) + + +class Template(Response[bytes]): + """Template-based response, rendering a given template into a bytes string.""" + + __slots__ = ( + "template_name", + "template_str", + "context", + ) + + def __init__( + self, + template_name: str | None = None, + *, + template_str: str | None = None, + background: BackgroundTask | BackgroundTasks | None = None, + context: dict[str, Any] | None = None, + cookies: ResponseCookies | None = None, + encoding: str = "utf-8", + headers: dict[str, Any] | None = None, + media_type: MediaType | str | None = None, + status_code: int = HTTP_200_OK, + ) -> None: + """Handle the rendering of a given template into a bytes string. + + Args: + template_name: Path-like name for the template to be rendered, e.g. ``index.html``. + template_str: A string representing the template, e.g. ``tmpl = "Hello <strong>World</strong>"``. + background: A :class:`BackgroundTask <.background_tasks.BackgroundTask>` instance or + :class:`BackgroundTasks <.background_tasks.BackgroundTasks>` to execute after the response is finished. + Defaults to ``None``. + context: A dictionary of key/value pairs to be passed to the temple engine's render method. + cookies: A list of :class:`Cookie <.datastructures.Cookie>` instances to be set under the response + ``Set-Cookie`` header. + encoding: Content encoding + headers: A string keyed dictionary of response headers. Header keys are insensitive. + media_type: A string or member of the :class:`MediaType <.enums.MediaType>` enum. If not set, try to infer + the media type based on the template name. If this fails, fall back to ``text/plain``. + status_code: A value for the response HTTP status code. + """ + if not (template_name or template_str): + raise ValueError("Either template_name or template_str must be provided.") + + if template_name and template_str: + raise ValueError("Either template_name or template_str must be provided, not both.") + + super().__init__( + background=background, + content=b"", + cookies=cookies, + encoding=encoding, + headers=headers, + media_type=media_type, + status_code=status_code, + ) + self.context = context or {} + self.template_name = template_name + self.template_str = template_str + + def create_template_context(self, request: Request) -> dict[str, Any]: + """Create a context object for the template. + + Args: + request: A :class:`Request <.connection.Request>` instance. + + Returns: + A dictionary holding the template context + """ + csrf_token = value_or_default(ScopeState.from_scope(request.scope).csrf_token, "") + return { + **self.context, + "request": request, + "csrf_input": f'<input type="hidden" name="_csrf_token" value="{csrf_token}" />', + } + + def to_asgi_response( + self, + app: Litestar | None, + request: Request, + *, + background: BackgroundTask | BackgroundTasks | None = None, + cookies: Iterable[Cookie] | None = None, + encoded_headers: Iterable[tuple[bytes, bytes]] | None = None, + headers: dict[str, str] | None = None, + is_head_response: bool = False, + media_type: MediaType | str | None = None, + status_code: int | None = None, + type_encoders: TypeEncodersMap | None = None, + ) -> ASGIResponse: + if app is not None: + warn_deprecation( + version="2.1", + deprecated_name="app", + kind="parameter", + removal_in="3.0.0", + alternative="request.app", + ) + + if not (template_engine := request.app.template_engine): + raise ImproperlyConfiguredException("Template engine is not configured") + + headers = {**headers, **self.headers} if headers is not None else self.headers + cookies = self.cookies if cookies is None else itertools.chain(self.cookies, cookies) + + media_type = self.media_type or media_type + if not media_type: + if self.template_name: + suffixes = PurePath(self.template_name).suffixes + for suffix in suffixes: + if _type := guess_type(f"name{suffix}")[0]: + media_type = _type + break + else: + media_type = MediaType.TEXT + else: + media_type = MediaType.HTML + + context = self.create_template_context(request) + + if self.template_str is not None: + body = template_engine.render_string(self.template_str, context) + else: + # cast to str b/c we know that either template_name cannot be None if template_str is None + template = template_engine.get_template(cast("str", self.template_name)) + body = template.render(**context).encode(self.encoding) + + return ASGIResponse( + background=self.background or background, + body=body, + content_length=None, + cookies=cookies, + encoded_headers=encoded_headers, + encoding=self.encoding, + headers=headers, + is_head_response=is_head_response, + media_type=media_type, + status_code=self.status_code or status_code, + ) |