diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/response/base.py')
-rw-r--r-- | venv/lib/python3.11/site-packages/litestar/response/base.py | 459 |
1 files changed, 459 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/response/base.py b/venv/lib/python3.11/site-packages/litestar/response/base.py new file mode 100644 index 0000000..67eec09 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/response/base.py @@ -0,0 +1,459 @@ +from __future__ import annotations + +import itertools +import re +from typing import TYPE_CHECKING, Any, ClassVar, Generic, Iterable, Literal, Mapping, TypeVar, overload + +from litestar.datastructures.cookie import Cookie +from litestar.datastructures.headers import ETag, MutableScopeHeaders +from litestar.enums import MediaType, OpenAPIMediaType +from litestar.exceptions import ImproperlyConfiguredException +from litestar.serialization import default_serializer, encode_json, encode_msgpack, get_serializer +from litestar.status_codes import HTTP_200_OK, HTTP_204_NO_CONTENT, HTTP_304_NOT_MODIFIED +from litestar.types.empty import Empty +from litestar.utils.deprecation import deprecated, warn_deprecation +from litestar.utils.helpers import get_enum_string_value + +if TYPE_CHECKING: + from typing import Optional + + from litestar.app import Litestar + from litestar.background_tasks import BackgroundTask, BackgroundTasks + from litestar.connection import Request + from litestar.types import ( + HTTPResponseBodyEvent, + HTTPResponseStartEvent, + Receive, + ResponseCookies, + ResponseHeaders, + Scope, + Send, + Serializer, + TypeEncodersMap, + ) + +__all__ = ("ASGIResponse", "Response") + +T = TypeVar("T") + +MEDIA_TYPE_APPLICATION_JSON_PATTERN = re.compile(r"^application/(?:.+\+)?json") + + +class ASGIResponse: + """A low-level ASGI response class.""" + + __slots__ = ( + "background", + "body", + "content_length", + "encoding", + "is_head_response", + "status_code", + "_encoded_cookies", + "headers", + ) + + _should_set_content_length: ClassVar[bool] = True + """A flag to indicate whether the content-length header should be set by default or not.""" + + def __init__( + self, + *, + background: BackgroundTask | BackgroundTasks | None = None, + body: bytes | str = b"", + content_length: int | None = None, + cookies: Iterable[Cookie] | None = None, + encoded_headers: Iterable[tuple[bytes, bytes]] | None = None, + encoding: str = "utf-8", + headers: dict[str, Any] | Iterable[tuple[str, str]] | None = None, + is_head_response: bool = False, + media_type: MediaType | str | None = None, + status_code: int | None = None, + ) -> None: + """A low-level ASGI response class. + + Args: + background: A background task or a list of background tasks to be executed after the response is sent. + body: encoded content to send in the response body. + content_length: The response content length. + cookies: The response cookies. + encoded_headers: The response headers. + encoding: The response encoding. + headers: The response headers. + is_head_response: A boolean indicating if the response is a HEAD response. + media_type: The response media type. + status_code: The response status code. + """ + body = body.encode() if isinstance(body, str) else body + status_code = status_code or HTTP_200_OK + self.headers = MutableScopeHeaders() + + if encoded_headers is not None: + warn_deprecation("3.0", kind="parameter", deprecated_name="encoded_headers", alternative="headers") + for header_name, header_value in encoded_headers: + self.headers.add(header_name.decode("latin-1"), header_value.decode("latin-1")) + + if headers is not None: + for k, v in headers.items() if isinstance(headers, dict) else headers: + self.headers.add(k, v) # pyright: ignore + + media_type = get_enum_string_value(media_type or MediaType.JSON) + + status_allows_body = ( + status_code not in {HTTP_204_NO_CONTENT, HTTP_304_NOT_MODIFIED} and status_code >= HTTP_200_OK + ) + + if content_length is None: + content_length = len(body) + + if not status_allows_body or is_head_response: + if body and body != b"null": + raise ImproperlyConfiguredException( + "response content is not supported for HEAD responses and responses with a status code " + "that does not allow content (304, 204, < 200)" + ) + body = b"" + else: + self.headers.setdefault( + "content-type", (f"{media_type}; charset={encoding}" if media_type.startswith("text/") else media_type) + ) + + if self._should_set_content_length: + self.headers.setdefault("content-length", str(content_length)) + + self.background = background + self.body = body + self.content_length = content_length + self._encoded_cookies = tuple( + cookie.to_encoded_header() for cookie in (cookies or ()) if not cookie.documentation_only + ) + self.encoding = encoding + self.is_head_response = is_head_response + self.status_code = status_code + + @property + @deprecated("3.0", kind="property", alternative="encode_headers()") + def encoded_headers(self) -> list[tuple[bytes, bytes]]: + return self.encode_headers() + + def encode_headers(self) -> list[tuple[bytes, bytes]]: + return [*self.headers.headers, *self._encoded_cookies] + + async def after_response(self) -> None: + """Execute after the response is sent. + + Returns: + None + """ + if self.background is not None: + await self.background() + + async def start_response(self, send: Send) -> None: + """Emit the start event of the response. This event includes the headers and status codes. + + Args: + send: The ASGI send function. + + Returns: + None + """ + event: HTTPResponseStartEvent = { + "type": "http.response.start", + "status": self.status_code, + "headers": self.encode_headers(), + } + await send(event) + + async def send_body(self, send: Send, receive: Receive) -> None: + """Emit the response body. + + Args: + send: The ASGI send function. + receive: The ASGI receive function. + + Notes: + - Response subclasses should customize this method if there is a need to customize sending data. + + Returns: + None + """ + event: HTTPResponseBodyEvent = {"type": "http.response.body", "body": self.body, "more_body": False} + await send(event) + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + """ASGI callable of the ``Response``. + + Args: + scope: The ASGI connection scope. + receive: The ASGI receive function. + send: The ASGI send function. + + Returns: + None + """ + await self.start_response(send=send) + + if self.is_head_response: + event: HTTPResponseBodyEvent = {"type": "http.response.body", "body": b"", "more_body": False} + await send(event) + else: + await self.send_body(send=send, receive=receive) + + await self.after_response() + + +class Response(Generic[T]): + """Base Litestar HTTP response class, used as the basis for all other response classes.""" + + __slots__ = ( + "background", + "content", + "cookies", + "encoding", + "headers", + "media_type", + "status_code", + "response_type_encoders", + ) + + content: T + type_encoders: Optional[TypeEncodersMap] = None # noqa: UP007 + + def __init__( + self, + content: T, + *, + background: BackgroundTask | BackgroundTasks | None = None, + cookies: ResponseCookies | None = None, + encoding: str = "utf-8", + headers: ResponseHeaders | None = None, + media_type: MediaType | OpenAPIMediaType | str | None = None, + status_code: int | None = None, + type_encoders: TypeEncodersMap | None = None, + ) -> None: + """Initialize the response. + + Args: + content: A value for the response body that will be rendered into bytes string. + status_code: An HTTP status code. + media_type: A value for the response ``Content-Type`` header. + background: A :class:`BackgroundTask <.background_tasks.BackgroundTask>` instance or + :class:`BackgroundTasks <.background_tasks.BackgroundTasks>` to execute after the response is finished. + Defaults to ``None``. + headers: A string keyed dictionary of response headers. Header keys are insensitive. + cookies: A list of :class:`Cookie <.datastructures.Cookie>` instances to be set under the response + ``Set-Cookie`` header. + encoding: The encoding to be used for the response headers. + type_encoders: A mapping of types to callables that transform them into types supported for serialization. + """ + self.content = content + self.background = background + self.cookies: list[Cookie] = ( + [Cookie(key=key, value=value) for key, value in cookies.items()] + if isinstance(cookies, Mapping) + else list(cookies or []) + ) + self.encoding = encoding + self.headers: dict[str, Any] = ( + dict(headers) if isinstance(headers, Mapping) else {h.name: h.value for h in headers or {}} + ) + self.media_type = media_type + self.status_code = status_code + self.response_type_encoders = {**(self.type_encoders or {}), **(type_encoders or {})} + + @overload + def set_cookie(self, /, cookie: Cookie) -> None: ... + + @overload + def set_cookie( + self, + key: str, + value: str | None = None, + max_age: int | None = None, + expires: int | None = None, + path: str = "/", + domain: str | None = None, + secure: bool = False, + httponly: bool = False, + samesite: Literal["lax", "strict", "none"] = "lax", + ) -> None: ... + + def set_cookie( # type: ignore[misc] + self, + key: str | Cookie, + value: str | None = None, + max_age: int | None = None, + expires: int | None = None, + path: str = "/", + domain: str | None = None, + secure: bool = False, + httponly: bool = False, + samesite: Literal["lax", "strict", "none"] = "lax", + ) -> None: + """Set a cookie on the response. If passed a :class:`Cookie <.datastructures.Cookie>` instance, keyword + arguments will be ignored. + + Args: + key: Key for the cookie or a :class:`Cookie <.datastructures.Cookie>` instance. + value: Value for the cookie, if none given defaults to empty string. + max_age: Maximal age of the cookie before its invalidated. + expires: Seconds from now until the cookie expires. + path: Path fragment that must exist in the request url for the cookie to be valid. Defaults to ``/``. + domain: Domain for which the cookie is valid. + secure: Https is required for the cookie. + httponly: Forbids javascript to access the cookie via ``document.cookie``. + samesite: Controls whether a cookie is sent with cross-site requests. Defaults to ``lax``. + + Returns: + None. + """ + if not isinstance(key, Cookie): + key = Cookie( + domain=domain, + expires=expires, + httponly=httponly, + key=key, + max_age=max_age, + path=path, + samesite=samesite, + secure=secure, + value=value, + ) + self.cookies.append(key) + + def set_header(self, key: str, value: Any) -> None: + """Set a header on the response. + + Args: + key: Header key. + value: Header value. + + Returns: + None. + """ + self.headers[key] = value + + def set_etag(self, etag: str | ETag) -> None: + """Set an etag header. + + Args: + etag: An etag value. + + Returns: + None + """ + self.headers["etag"] = etag.to_header() if isinstance(etag, ETag) else etag + + def delete_cookie( + self, + key: str, + path: str = "/", + domain: str | None = None, + ) -> None: + """Delete a cookie. + + Args: + key: Key of the cookie. + path: Path of the cookie. + domain: Domain of the cookie. + + Returns: + None. + """ + cookie = Cookie(key=key, path=path, domain=domain, expires=0, max_age=0) + self.cookies = [c for c in self.cookies if c != cookie] + self.cookies.append(cookie) + + def render(self, content: Any, media_type: str, enc_hook: Serializer = default_serializer) -> bytes: + """Handle the rendering of content into a bytes string. + + Returns: + An encoded bytes string + """ + if isinstance(content, bytes): + return content + + if content is Empty: + raise RuntimeError("The `Empty` sentinel cannot be used as response content") + + try: + if media_type.startswith("text/") and not content: + return b"" + + if isinstance(content, str): + return content.encode(self.encoding) + + if media_type == MediaType.MESSAGEPACK: + return encode_msgpack(content, enc_hook) + + if MEDIA_TYPE_APPLICATION_JSON_PATTERN.match( + media_type, + ): + return encode_json(content, enc_hook) + + raise ImproperlyConfiguredException(f"unsupported media_type {media_type} for content {content!r}") + except (AttributeError, ValueError, TypeError) as e: + raise ImproperlyConfiguredException("Unable to serialize response content") from e + + def to_asgi_response( + self, + app: Litestar | None, + request: Request, + *, + background: BackgroundTask | BackgroundTasks | None = None, + cookies: Iterable[Cookie] | None = None, + encoded_headers: Iterable[tuple[bytes, bytes]] | None = None, + headers: dict[str, str] | None = None, + is_head_response: bool = False, + media_type: MediaType | str | None = None, + status_code: int | None = None, + type_encoders: TypeEncodersMap | None = None, + ) -> ASGIResponse: + """Create an ASGIResponse from a Response instance. + + Args: + app: The :class:`Litestar <.app.Litestar>` application instance. + background: Background task(s) to be executed after the response is sent. + cookies: A list of cookies to be set on the response. + encoded_headers: A list of already encoded headers. + headers: Additional headers to be merged with the response headers. Response headers take precedence. + is_head_response: Whether the response is a HEAD response. + media_type: Media type for the response. If ``media_type`` is already set on the response, this is ignored. + request: The :class:`Request <.connection.Request>` instance. + status_code: Status code for the response. If ``status_code`` is already set on the response, this is + type_encoders: A dictionary of type encoders to use for encoding the response content. + + Returns: + An ASGIResponse instance. + """ + + if app is not None: + warn_deprecation( + version="2.1", + deprecated_name="app", + kind="parameter", + removal_in="3.0.0", + alternative="request.app", + ) + + headers = {**headers, **self.headers} if headers is not None else self.headers + cookies = self.cookies if cookies is None else itertools.chain(self.cookies, cookies) + + if type_encoders: + type_encoders = {**type_encoders, **(self.response_type_encoders or {})} + else: + type_encoders = self.response_type_encoders + + media_type = get_enum_string_value(self.media_type or media_type or MediaType.JSON) + + return ASGIResponse( + background=self.background or background, + body=self.render(self.content, media_type, get_serializer(type_encoders)), + cookies=cookies, + encoded_headers=encoded_headers, + encoding=self.encoding, + headers=headers, + is_head_response=is_head_response, + media_type=media_type, + status_code=self.status_code or status_code, + ) |