summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/litestar/response/base.py
blob: 67eec091598466a62f6f9bf9c3fff941d95d2203 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
from __future__ import annotations

import itertools
import re
from typing import TYPE_CHECKING, Any, ClassVar, Generic, Iterable, Literal, Mapping, TypeVar, overload

from litestar.datastructures.cookie import Cookie
from litestar.datastructures.headers import ETag, MutableScopeHeaders
from litestar.enums import MediaType, OpenAPIMediaType
from litestar.exceptions import ImproperlyConfiguredException
from litestar.serialization import default_serializer, encode_json, encode_msgpack, get_serializer
from litestar.status_codes import HTTP_200_OK, HTTP_204_NO_CONTENT, HTTP_304_NOT_MODIFIED
from litestar.types.empty import Empty
from litestar.utils.deprecation import deprecated, warn_deprecation
from litestar.utils.helpers import get_enum_string_value

if TYPE_CHECKING:
    from typing import Optional

    from litestar.app import Litestar
    from litestar.background_tasks import BackgroundTask, BackgroundTasks
    from litestar.connection import Request
    from litestar.types import (
        HTTPResponseBodyEvent,
        HTTPResponseStartEvent,
        Receive,
        ResponseCookies,
        ResponseHeaders,
        Scope,
        Send,
        Serializer,
        TypeEncodersMap,
    )

__all__ = ("ASGIResponse", "Response")

T = TypeVar("T")

MEDIA_TYPE_APPLICATION_JSON_PATTERN = re.compile(r"^application/(?:.+\+)?json")


class ASGIResponse:
    """A low-level ASGI response class."""

    __slots__ = (
        "background",
        "body",
        "content_length",
        "encoding",
        "is_head_response",
        "status_code",
        "_encoded_cookies",
        "headers",
    )

    _should_set_content_length: ClassVar[bool] = True
    """A flag to indicate whether the content-length header should be set by default or not."""

    def __init__(
        self,
        *,
        background: BackgroundTask | BackgroundTasks | None = None,
        body: bytes | str = b"",
        content_length: int | None = None,
        cookies: Iterable[Cookie] | None = None,
        encoded_headers: Iterable[tuple[bytes, bytes]] | None = None,
        encoding: str = "utf-8",
        headers: dict[str, Any] | Iterable[tuple[str, str]] | None = None,
        is_head_response: bool = False,
        media_type: MediaType | str | None = None,
        status_code: int | None = None,
    ) -> None:
        """A low-level ASGI response class.

        Args:
            background: A background task or a list of background tasks to be executed after the response is sent.
            body: encoded content to send in the response body.
            content_length: The response content length.
            cookies: The response cookies.
            encoded_headers: The response headers.
            encoding: The response encoding.
            headers: The response headers.
            is_head_response: A boolean indicating if the response is a HEAD response.
            media_type: The response media type.
            status_code: The response status code.
        """
        body = body.encode() if isinstance(body, str) else body
        status_code = status_code or HTTP_200_OK
        self.headers = MutableScopeHeaders()

        if encoded_headers is not None:
            warn_deprecation("3.0", kind="parameter", deprecated_name="encoded_headers", alternative="headers")
            for header_name, header_value in encoded_headers:
                self.headers.add(header_name.decode("latin-1"), header_value.decode("latin-1"))

        if headers is not None:
            for k, v in headers.items() if isinstance(headers, dict) else headers:
                self.headers.add(k, v)  # pyright: ignore

        media_type = get_enum_string_value(media_type or MediaType.JSON)

        status_allows_body = (
            status_code not in {HTTP_204_NO_CONTENT, HTTP_304_NOT_MODIFIED} and status_code >= HTTP_200_OK
        )

        if content_length is None:
            content_length = len(body)

        if not status_allows_body or is_head_response:
            if body and body != b"null":
                raise ImproperlyConfiguredException(
                    "response content is not supported for HEAD responses and responses with a status code "
                    "that does not allow content (304, 204, < 200)"
                )
            body = b""
        else:
            self.headers.setdefault(
                "content-type", (f"{media_type}; charset={encoding}" if media_type.startswith("text/") else media_type)
            )

            if self._should_set_content_length:
                self.headers.setdefault("content-length", str(content_length))

        self.background = background
        self.body = body
        self.content_length = content_length
        self._encoded_cookies = tuple(
            cookie.to_encoded_header() for cookie in (cookies or ()) if not cookie.documentation_only
        )
        self.encoding = encoding
        self.is_head_response = is_head_response
        self.status_code = status_code

    @property
    @deprecated("3.0", kind="property", alternative="encode_headers()")
    def encoded_headers(self) -> list[tuple[bytes, bytes]]:
        return self.encode_headers()

    def encode_headers(self) -> list[tuple[bytes, bytes]]:
        return [*self.headers.headers, *self._encoded_cookies]

    async def after_response(self) -> None:
        """Execute after the response is sent.

        Returns:
            None
        """
        if self.background is not None:
            await self.background()

    async def start_response(self, send: Send) -> None:
        """Emit the start event of the response. This event includes the headers and status codes.

        Args:
            send: The ASGI send function.

        Returns:
            None
        """
        event: HTTPResponseStartEvent = {
            "type": "http.response.start",
            "status": self.status_code,
            "headers": self.encode_headers(),
        }
        await send(event)

    async def send_body(self, send: Send, receive: Receive) -> None:
        """Emit the response body.

        Args:
            send: The ASGI send function.
            receive: The ASGI receive function.

        Notes:
            - Response subclasses should customize this method if there is a need to customize sending data.

        Returns:
            None
        """
        event: HTTPResponseBodyEvent = {"type": "http.response.body", "body": self.body, "more_body": False}
        await send(event)

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """ASGI callable of the ``Response``.

        Args:
            scope: The ASGI connection scope.
            receive: The ASGI receive function.
            send: The ASGI send function.

        Returns:
            None
        """
        await self.start_response(send=send)

        if self.is_head_response:
            event: HTTPResponseBodyEvent = {"type": "http.response.body", "body": b"", "more_body": False}
            await send(event)
        else:
            await self.send_body(send=send, receive=receive)

        await self.after_response()


class Response(Generic[T]):
    """Base Litestar HTTP response class, used as the basis for all other response classes."""

    __slots__ = (
        "background",
        "content",
        "cookies",
        "encoding",
        "headers",
        "media_type",
        "status_code",
        "response_type_encoders",
    )

    content: T
    type_encoders: Optional[TypeEncodersMap] = None  # noqa: UP007

    def __init__(
        self,
        content: T,
        *,
        background: BackgroundTask | BackgroundTasks | None = None,
        cookies: ResponseCookies | None = None,
        encoding: str = "utf-8",
        headers: ResponseHeaders | None = None,
        media_type: MediaType | OpenAPIMediaType | str | None = None,
        status_code: int | None = None,
        type_encoders: TypeEncodersMap | None = None,
    ) -> None:
        """Initialize the response.

        Args:
            content: A value for the response body that will be rendered into bytes string.
            status_code: An HTTP status code.
            media_type: A value for the response ``Content-Type`` header.
            background: A :class:`BackgroundTask <.background_tasks.BackgroundTask>` instance or
                :class:`BackgroundTasks <.background_tasks.BackgroundTasks>` to execute after the response is finished.
                Defaults to ``None``.
            headers: A string keyed dictionary of response headers. Header keys are insensitive.
            cookies: A list of :class:`Cookie <.datastructures.Cookie>` instances to be set under the response
                ``Set-Cookie`` header.
            encoding: The encoding to be used for the response headers.
            type_encoders: A mapping of types to callables that transform them into types supported for serialization.
        """
        self.content = content
        self.background = background
        self.cookies: list[Cookie] = (
            [Cookie(key=key, value=value) for key, value in cookies.items()]
            if isinstance(cookies, Mapping)
            else list(cookies or [])
        )
        self.encoding = encoding
        self.headers: dict[str, Any] = (
            dict(headers) if isinstance(headers, Mapping) else {h.name: h.value for h in headers or {}}
        )
        self.media_type = media_type
        self.status_code = status_code
        self.response_type_encoders = {**(self.type_encoders or {}), **(type_encoders or {})}

    @overload
    def set_cookie(self, /, cookie: Cookie) -> None: ...

    @overload
    def set_cookie(
        self,
        key: str,
        value: str | None = None,
        max_age: int | None = None,
        expires: int | None = None,
        path: str = "/",
        domain: str | None = None,
        secure: bool = False,
        httponly: bool = False,
        samesite: Literal["lax", "strict", "none"] = "lax",
    ) -> None: ...

    def set_cookie(  # type: ignore[misc]
        self,
        key: str | Cookie,
        value: str | None = None,
        max_age: int | None = None,
        expires: int | None = None,
        path: str = "/",
        domain: str | None = None,
        secure: bool = False,
        httponly: bool = False,
        samesite: Literal["lax", "strict", "none"] = "lax",
    ) -> None:
        """Set a cookie on the response. If passed a :class:`Cookie <.datastructures.Cookie>` instance, keyword
        arguments will be ignored.

        Args:
            key: Key for the cookie or a :class:`Cookie <.datastructures.Cookie>` instance.
            value: Value for the cookie, if none given defaults to empty string.
            max_age: Maximal age of the cookie before its invalidated.
            expires: Seconds from now until the cookie expires.
            path: Path fragment that must exist in the request url for the cookie to be valid. Defaults to ``/``.
            domain: Domain for which the cookie is valid.
            secure: Https is required for the cookie.
            httponly: Forbids javascript to access the cookie via ``document.cookie``.
            samesite: Controls whether a cookie is sent with cross-site requests. Defaults to ``lax``.

        Returns:
            None.
        """
        if not isinstance(key, Cookie):
            key = Cookie(
                domain=domain,
                expires=expires,
                httponly=httponly,
                key=key,
                max_age=max_age,
                path=path,
                samesite=samesite,
                secure=secure,
                value=value,
            )
        self.cookies.append(key)

    def set_header(self, key: str, value: Any) -> None:
        """Set a header on the response.

        Args:
            key: Header key.
            value: Header value.

        Returns:
            None.
        """
        self.headers[key] = value

    def set_etag(self, etag: str | ETag) -> None:
        """Set an etag header.

        Args:
            etag: An etag value.

        Returns:
            None
        """
        self.headers["etag"] = etag.to_header() if isinstance(etag, ETag) else etag

    def delete_cookie(
        self,
        key: str,
        path: str = "/",
        domain: str | None = None,
    ) -> None:
        """Delete a cookie.

        Args:
            key: Key of the cookie.
            path: Path of the cookie.
            domain: Domain of the cookie.

        Returns:
            None.
        """
        cookie = Cookie(key=key, path=path, domain=domain, expires=0, max_age=0)
        self.cookies = [c for c in self.cookies if c != cookie]
        self.cookies.append(cookie)

    def render(self, content: Any, media_type: str, enc_hook: Serializer = default_serializer) -> bytes:
        """Handle the rendering of content into a bytes string.

        Returns:
            An encoded bytes string
        """
        if isinstance(content, bytes):
            return content

        if content is Empty:
            raise RuntimeError("The `Empty` sentinel cannot be used as response content")

        try:
            if media_type.startswith("text/") and not content:
                return b""

            if isinstance(content, str):
                return content.encode(self.encoding)

            if media_type == MediaType.MESSAGEPACK:
                return encode_msgpack(content, enc_hook)

            if MEDIA_TYPE_APPLICATION_JSON_PATTERN.match(
                media_type,
            ):
                return encode_json(content, enc_hook)

            raise ImproperlyConfiguredException(f"unsupported media_type {media_type} for content {content!r}")
        except (AttributeError, ValueError, TypeError) as e:
            raise ImproperlyConfiguredException("Unable to serialize response content") from e

    def to_asgi_response(
        self,
        app: Litestar | None,
        request: Request,
        *,
        background: BackgroundTask | BackgroundTasks | None = None,
        cookies: Iterable[Cookie] | None = None,
        encoded_headers: Iterable[tuple[bytes, bytes]] | None = None,
        headers: dict[str, str] | None = None,
        is_head_response: bool = False,
        media_type: MediaType | str | None = None,
        status_code: int | None = None,
        type_encoders: TypeEncodersMap | None = None,
    ) -> ASGIResponse:
        """Create an ASGIResponse from a Response instance.

        Args:
            app: The :class:`Litestar <.app.Litestar>` application instance.
            background: Background task(s) to be executed after the response is sent.
            cookies: A list of cookies to be set on the response.
            encoded_headers: A list of already encoded headers.
            headers: Additional headers to be merged with the response headers. Response headers take precedence.
            is_head_response: Whether the response is a HEAD response.
            media_type: Media type for the response. If ``media_type`` is already set on the response, this is ignored.
            request: The :class:`Request <.connection.Request>` instance.
            status_code: Status code for the response. If ``status_code`` is already set on the response, this is
            type_encoders: A dictionary of type encoders to use for encoding the response content.

        Returns:
            An ASGIResponse instance.
        """

        if app is not None:
            warn_deprecation(
                version="2.1",
                deprecated_name="app",
                kind="parameter",
                removal_in="3.0.0",
                alternative="request.app",
            )

        headers = {**headers, **self.headers} if headers is not None else self.headers
        cookies = self.cookies if cookies is None else itertools.chain(self.cookies, cookies)

        if type_encoders:
            type_encoders = {**type_encoders, **(self.response_type_encoders or {})}
        else:
            type_encoders = self.response_type_encoders

        media_type = get_enum_string_value(self.media_type or media_type or MediaType.JSON)

        return ASGIResponse(
            background=self.background or background,
            body=self.render(self.content, media_type, get_serializer(type_encoders)),
            cookies=cookies,
            encoded_headers=encoded_headers,
            encoding=self.encoding,
            headers=headers,
            is_head_response=is_head_response,
            media_type=media_type,
            status_code=self.status_code or status_code,
        )