diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/datastructures')
16 files changed, 1592 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__init__.py b/venv/lib/python3.11/site-packages/litestar/datastructures/__init__.py new file mode 100644 index 0000000..74fc25b --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__init__.py @@ -0,0 +1,39 @@ +from litestar.datastructures.cookie import Cookie +from litestar.datastructures.headers import ( + Accept, + CacheControlHeader, + ETag, + Header, + Headers, + MutableScopeHeaders, +) +from litestar.datastructures.multi_dicts import ( + FormMultiDict, + ImmutableMultiDict, + MultiDict, + MultiMixin, +) +from litestar.datastructures.response_header import ResponseHeader +from litestar.datastructures.state import ImmutableState, State +from litestar.datastructures.upload_file import UploadFile +from litestar.datastructures.url import URL, Address + +__all__ = ( + "Accept", + "Address", + "CacheControlHeader", + "Cookie", + "ETag", + "FormMultiDict", + "Header", + "Headers", + "ImmutableMultiDict", + "ImmutableState", + "MultiDict", + "MultiMixin", + "MutableScopeHeaders", + "ResponseHeader", + "State", + "UploadFile", + "URL", +) diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..085a180 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/cookie.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/cookie.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..87a2646 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/cookie.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/headers.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/headers.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..4a8e1ad --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/headers.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/multi_dicts.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/multi_dicts.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..c5a3dd4 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/multi_dicts.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/response_header.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/response_header.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..0c83cae --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/response_header.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/state.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/state.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..6cd1f18 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/state.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/upload_file.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/upload_file.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..4db0e27 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/upload_file.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/url.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/url.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..a9c2138 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/url.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/cookie.py b/venv/lib/python3.11/site-packages/litestar/datastructures/cookie.py new file mode 100644 index 0000000..21cedc3 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/datastructures/cookie.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +from dataclasses import asdict, dataclass, field +from http.cookies import SimpleCookie +from typing import Any, Literal + +__all__ = ("Cookie",) + + +@dataclass +class Cookie: + """Container class for defining a cookie using the ``Set-Cookie`` header. + + See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Set-Cookie for more details regarding this header. + """ + + key: str + """Key for the cookie.""" + path: str = "/" + """Path fragment that must exist in the request url for the cookie to be valid. + + Defaults to ``/``. + """ + value: str | None = field(default=None) + """Value for the cookie, if none given defaults to empty string.""" + max_age: int | None = field(default=None) + """Maximal age of the cookie before its invalidated.""" + expires: int | None = field(default=None) + """Seconds from now until the cookie expires.""" + domain: str | None = field(default=None) + """Domain for which the cookie is valid.""" + secure: bool | None = field(default=None) + """Https is required for the cookie.""" + httponly: bool | None = field(default=None) + """Forbids javascript to access the cookie via ``document.cookie``.""" + samesite: Literal["lax", "strict", "none"] = field(default="lax") + """Controls whether or not a cookie is sent with cross-site requests. + + Defaults to 'lax'. + """ + description: str | None = field(default=None) + """Description of the response cookie header for OpenAPI documentation.""" + documentation_only: bool = field(default=False) + """Defines the Cookie instance as for OpenAPI documentation purpose only.""" + + @property + def simple_cookie(self) -> SimpleCookie: + """Get a simple cookie object from the values. + + Returns: + A :class:`SimpleCookie <http.cookies.SimpleCookie>` + """ + simple_cookie: SimpleCookie = SimpleCookie() + simple_cookie[self.key] = self.value or "" + + namespace = simple_cookie[self.key] + for key, value in self.dict.items(): + if key in {"key", "value"}: + continue + if value is not None: + updated_key = key + if updated_key == "max_age": + updated_key = "max-age" + namespace[updated_key] = value + + return simple_cookie + + def to_header(self, **kwargs: Any) -> str: + """Return a string representation suitable to be sent as HTTP headers. + + Args: + **kwargs: Any kwargs to pass to the simple cookie output method. + """ + return self.simple_cookie.output(**kwargs).strip() + + def to_encoded_header(self) -> tuple[bytes, bytes]: + """Create encoded header for ASGI ``send``. + + Returns: + A two tuple of bytes. + """ + return b"set-cookie", self.to_header(header="").strip().encode("latin-1") + + @property + def dict(self) -> dict[str, Any]: + """Get the cookie as a dict. + + Returns: + A dict of values + """ + return { + k: v + for k, v in asdict(self).items() + if k not in {"documentation_only", "description", "__pydantic_initialised__"} + } + + def __hash__(self) -> int: + return hash((self.key, self.path, self.domain)) + + def __eq__(self, other: Any) -> bool: + """Determine whether two cookie instances are equal according to the cookie spec, i.e. hey have a similar path, + domain and key. + + Args: + other: An arbitrary value + + Returns: + A boolean + """ + if isinstance(other, Cookie): + return other.key == self.key and other.path == self.path and other.domain == self.domain + return False diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/headers.py b/venv/lib/python3.11/site-packages/litestar/datastructures/headers.py new file mode 100644 index 0000000..f3e9bd7 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/datastructures/headers.py @@ -0,0 +1,534 @@ +import re +from abc import ABC, abstractmethod +from contextlib import suppress +from copy import copy +from dataclasses import dataclass +from typing import ( + TYPE_CHECKING, + Any, + ClassVar, + Dict, + Iterable, + Iterator, + List, + Mapping, + MutableMapping, + Optional, + Pattern, + Tuple, + Union, + cast, +) + +from multidict import CIMultiDict, CIMultiDictProxy, MultiMapping +from typing_extensions import get_type_hints + +from litestar._multipart import parse_content_header +from litestar.datastructures.multi_dicts import MultiMixin +from litestar.dto.base_dto import AbstractDTO +from litestar.exceptions import ImproperlyConfiguredException, ValidationException +from litestar.types.empty import Empty +from litestar.typing import FieldDefinition +from litestar.utils.dataclass import simple_asdict +from litestar.utils.scope.state import ScopeState + +if TYPE_CHECKING: + from litestar.types.asgi_types import ( + HeaderScope, + Message, + RawHeaders, + RawHeadersList, + Scope, + ) + +__all__ = ("Accept", "CacheControlHeader", "ETag", "Header", "Headers", "MutableScopeHeaders") + +ETAG_RE = re.compile(r'([Ww]/)?"(.+)"') +PRINTABLE_ASCII_RE: Pattern[str] = re.compile(r"^[ -~]+$") + + +def _encode_headers(headers: Iterable[Tuple[str, str]]) -> "RawHeadersList": + return [(key.lower().encode("latin-1"), value.encode("latin-1")) for key, value in headers] + + +class Headers(CIMultiDictProxy[str], MultiMixin[str]): + """An immutable, case-insensitive multi dict for HTTP headers.""" + + def __init__(self, headers: Optional[Union[Mapping[str, str], "RawHeaders", MultiMapping]] = None) -> None: + """Initialize ``Headers``. + + Args: + headers: Initial value. + """ + if not isinstance(headers, MultiMapping): + headers_: Union[Mapping[str, str], List[Tuple[str, str]]] = {} + if headers: + if isinstance(headers, Mapping): + headers_ = headers # pyright: ignore + else: + headers_ = [(key.decode("latin-1"), value.decode("latin-1")) for key, value in headers] + + super().__init__(CIMultiDict(headers_)) + else: + super().__init__(headers) + self._header_list: Optional[RawHeadersList] = None + + @classmethod + def from_scope(cls, scope: "Scope") -> "Headers": + """Create headers from a send-message. + + Args: + scope: The ASGI connection scope. + + Returns: + Headers + + Raises: + ValueError: If the message does not have a ``headers`` key + """ + connection_state = ScopeState.from_scope(scope) + if (headers := connection_state.headers) is Empty: + headers = connection_state.headers = cls(scope["headers"]) + return headers + + def to_header_list(self) -> "RawHeadersList": + """Raw header value. + + Returns: + A list of tuples contain the header and header-value as bytes + """ + # Since ``Headers`` are immutable, this can be cached + if not self._header_list: + self._header_list = _encode_headers((key, value) for key in set(self) for value in self.getall(key)) + return self._header_list + + +class MutableScopeHeaders(MutableMapping): + """A case-insensitive, multidict-like structure that can be used to mutate headers within a + :class:`Scope <.types.Scope>` + """ + + def __init__(self, scope: Optional["HeaderScope"] = None) -> None: + """Initialize ``MutableScopeHeaders`` from a ``HeaderScope``. + + Args: + scope: The ASGI connection scope. + """ + self.headers: RawHeadersList + if scope is not None: + if not isinstance(scope["headers"], list): + scope["headers"] = list(scope["headers"]) + + self.headers = cast("RawHeadersList", scope["headers"]) + else: + self.headers = [] + + @classmethod + def from_message(cls, message: "Message") -> "MutableScopeHeaders": + """Construct a header from a message object. + + Args: + message: :class:`Message <.types.Message>`. + + Returns: + MutableScopeHeaders. + + Raises: + ValueError: If the message does not have a ``headers`` key. + """ + if "headers" not in message: + raise ValueError(f"Invalid message type: {message['type']!r}") + + return cls(cast("HeaderScope", message)) + + def add(self, key: str, value: str) -> None: + """Add a header to the scope. + + Notes: + - This method keeps duplicates. + + Args: + key: Header key. + value: Header value. + + Returns: + None. + """ + self.headers.append((key.lower().encode("latin-1"), value.encode("latin-1"))) + + def getall(self, key: str, default: Optional[List[str]] = None) -> List[str]: + """Get all values of a header. + + Args: + key: Header key. + default: Default value to return if ``name`` is not found. + + Returns: + A list of strings. + + Raises: + KeyError: if no header for ``name`` was found and ``default`` is not given. + """ + name = key.lower() + values = [ + header_value.decode("latin-1") + for header_name, header_value in self.headers + if header_name.decode("latin-1").lower() == name + ] + if not values: + if default: + return default + raise KeyError + return values + + def extend_header_value(self, key: str, value: str) -> None: + """Extend a multivalued header. + + Notes: + - A multivalues header is a header that can take a comma separated list. + - If the header previously did not exist, it will be added. + + Args: + key: Header key. + value: Header value to add, + + Returns: + None + """ + existing = self.get(key) + if existing is not None: + value = ",".join([*existing.split(","), value]) + self[key] = value + + def __getitem__(self, key: str) -> str: + """Get the first header matching ``name``""" + name = key.lower() + for header in self.headers: + if header[0].decode("latin-1").lower() == name: + return header[1].decode("latin-1") + raise KeyError + + def _find_indices(self, key: str) -> List[int]: + name = key.lower() + return [i for i, (name_, _) in enumerate(self.headers) if name_.decode("latin-1").lower() == name] + + def __setitem__(self, key: str, value: str) -> None: + """Set a header in the scope, overwriting duplicates.""" + name_encoded = key.lower().encode("latin-1") + value_encoded = value.encode("latin-1") + if indices := self._find_indices(key): + for i in indices[1:]: + del self.headers[i] + self.headers[indices[0]] = (name_encoded, value_encoded) + else: + self.headers.append((name_encoded, value_encoded)) + + def __delitem__(self, key: str) -> None: + """Delete all headers matching ``name``""" + indices = self._find_indices(key) + for i in indices[::-1]: + del self.headers[i] + + def __len__(self) -> int: + """Return the length of the internally stored headers, including duplicates.""" + return len(self.headers) + + def __iter__(self) -> Iterator[str]: + """Create an iterator of header names including duplicates.""" + return iter(h[0].decode("latin-1") for h in self.headers) + + +@dataclass +class Header(ABC): + """An abstract type for HTTP headers.""" + + HEADER_NAME: ClassVar[str] = "" + + documentation_only: bool = False + """Defines the header instance as for OpenAPI documentation purpose only.""" + + @abstractmethod + def _get_header_value(self) -> str: + """Get the header value as string.""" + raise NotImplementedError + + @classmethod + @abstractmethod + def from_header(cls, header_value: str) -> "Header": + """Construct a header from its string representation.""" + + def to_header(self, include_header_name: bool = False) -> str: + """Get the header as string. + + Args: + include_header_name: should include the header name in the return value. If set to false + the return value will only include the header value. if set to true the return value + will be: ``<header name>: <header value>``. Defaults to false. + """ + + if not self.HEADER_NAME: + raise ImproperlyConfiguredException("Missing header name") + + return (f"{self.HEADER_NAME}: " if include_header_name else "") + self._get_header_value() + + +@dataclass +class CacheControlHeader(Header): + """A ``cache-control`` header.""" + + HEADER_NAME: ClassVar[str] = "cache-control" + + max_age: Optional[int] = None + """Accessor for the ``max-age`` directive.""" + s_maxage: Optional[int] = None + """Accessor for the ``s-maxage`` directive.""" + no_cache: Optional[bool] = None + """Accessor for the ``no-cache`` directive.""" + no_store: Optional[bool] = None + """Accessor for the ``no-store`` directive.""" + private: Optional[bool] = None + """Accessor for the ``private`` directive.""" + public: Optional[bool] = None + """Accessor for the ``public`` directive.""" + no_transform: Optional[bool] = None + """Accessor for the ``no-transform`` directive.""" + must_revalidate: Optional[bool] = None + """Accessor for the ``must-revalidate`` directive.""" + proxy_revalidate: Optional[bool] = None + """Accessor for the ``proxy-revalidate`` directive.""" + must_understand: Optional[bool] = None + """Accessor for the ``must-understand`` directive.""" + immutable: Optional[bool] = None + """Accessor for the ``immutable`` directive.""" + stale_while_revalidate: Optional[int] = None + """Accessor for the ``stale-while-revalidate`` directive.""" + + _field_definitions: ClassVar[Optional[Dict[str, FieldDefinition]]] = None + + def _get_header_value(self) -> str: + """Get the header value as string.""" + + cc_items = [ + key.replace("_", "-") if isinstance(value, bool) else f"{key.replace('_', '-')}={value}" + for key, value in simple_asdict(self, exclude_none=True, exclude={"documentation_only"}).items() + ] + return ", ".join(cc_items) + + @classmethod + def from_header(cls, header_value: str) -> "CacheControlHeader": + """Create a ``CacheControlHeader`` instance from the header value. + + Args: + header_value: the header value as string + + Returns: + An instance of ``CacheControlHeader`` + """ + + cc_items = [v.strip() for v in header_value.split(",")] + kwargs: Dict[str, Any] = {} + field_definitions = cls._get_field_definitions() + for cc_item in cc_items: + key_value = cc_item.split("=") + key_value[0] = key_value[0].replace("-", "_") + if len(key_value) == 1: + kwargs[key_value[0]] = True + elif len(key_value) == 2: + key, value = key_value + if key not in field_definitions: + raise ImproperlyConfiguredException("Invalid cache-control header") + kwargs[key] = cls._convert_to_type(value, field_definition=field_definitions[key]) + else: + raise ImproperlyConfiguredException("Invalid cache-control header value") + + try: + return CacheControlHeader(**kwargs) + except TypeError as exc: + raise ImproperlyConfiguredException from exc + + @classmethod + def prevent_storing(cls) -> "CacheControlHeader": + """Create a ``cache-control`` header with the ``no-store`` directive which indicates that any caches of any kind + (private or shared) should not store this response. + """ + + return cls(no_store=True) + + @classmethod + def _get_field_definitions(cls) -> Dict[str, FieldDefinition]: + """Get the type annotations for the ``CacheControlHeader`` class properties. + + This is needed due to the conversion from pydantic models to dataclasses. Dataclasses do not support + automatic conversion of types like pydantic models do. + + Returns: + A dictionary of type annotations + + """ + + if cls._field_definitions is None: + cls._field_definitions = {} + for key, value in get_type_hints(cls, include_extras=True).items(): + definition = FieldDefinition.from_kwarg(annotation=value, name=key) + # resolve_model_type so that field_definition.raw has the real raw type e.g. <class 'bool'> + cls._field_definitions[key] = AbstractDTO.resolve_model_type(definition) + return cls._field_definitions + + @classmethod + def _convert_to_type(cls, value: str, field_definition: FieldDefinition) -> Any: + """Convert the value to the expected type. + + Args: + value: the value of the cache-control directive + field_definition: the field definition for the value to convert + + Returns: + The value converted to the expected type + """ + # bool values shouldn't be initiated since they should have been caught earlier in the from_header method and + # set with a value of True + expected_type = field_definition.raw + if expected_type is bool: + raise ImproperlyConfiguredException("Invalid cache-control header value") + return expected_type(value) + + +@dataclass +class ETag(Header): + """An ``etag`` header.""" + + HEADER_NAME: ClassVar[str] = "etag" + + weak: bool = False + value: Optional[str] = None # only ASCII characters + + def _get_header_value(self) -> str: + value = f'"{self.value}"' + return f"W/{value}" if self.weak else value + + @classmethod + def from_header(cls, header_value: str) -> "ETag": + """Construct an ``etag`` header from its string representation. + + Note that this will unquote etag-values + """ + match = ETAG_RE.match(header_value) + if not match: + raise ImproperlyConfiguredException + weak, value = match.group(1, 2) + try: + return cls(weak=bool(weak), value=value) + except ValueError as exc: + raise ImproperlyConfiguredException from exc + + def __post_init__(self) -> None: + if self.documentation_only is False and self.value is None: + raise ValidationException("value must be set if documentation_only is false") + if self.value and not PRINTABLE_ASCII_RE.fullmatch(self.value): + raise ValidationException("value must only contain ASCII printable characters") + + +class MediaTypeHeader: + """A helper class for ``Accept`` header parsing.""" + + __slots__ = ("maintype", "subtype", "params", "_params_str") + + def __init__(self, type_str: str) -> None: + # preserve the original parameters, because the order might be + # changed in the dict + self._params_str = "".join(type_str.partition(";")[1:]) + + full_type, self.params = parse_content_header(type_str) + self.maintype, _, self.subtype = full_type.partition("/") + + def __str__(self) -> str: + return f"{self.maintype}/{self.subtype}{self._params_str}" + + @property + def priority(self) -> Tuple[int, int]: + # Use fixed point values with two decimals to avoid problems + # when comparing float values + quality = 100 + if "q" in self.params: + with suppress(ValueError): + quality = int(100 * float(self.params["q"])) + + if self.maintype == "*": + specificity = 0 + elif self.subtype == "*": + specificity = 1 + elif not self.params or ("q" in self.params and len(self.params) == 1): + # no params or 'q' is the only one which we ignore + specificity = 2 + else: + specificity = 3 + + return quality, specificity + + def match(self, other: "MediaTypeHeader") -> bool: + return next( + (False for key, value in self.params.items() if key != "q" and value != other.params.get(key)), + False + if self.subtype != "*" and other.subtype != "*" and self.subtype != other.subtype + else self.maintype == "*" or other.maintype == "*" or self.maintype == other.maintype, + ) + + +class Accept: + """An ``Accept`` header.""" + + __slots__ = ("_accepted_types",) + + def __init__(self, accept_value: str) -> None: + self._accepted_types = [MediaTypeHeader(t) for t in accept_value.split(",")] + self._accepted_types.sort(key=lambda t: t.priority, reverse=True) + + def __len__(self) -> int: + return len(self._accepted_types) + + def __getitem__(self, key: int) -> str: + return str(self._accepted_types[key]) + + def __iter__(self) -> Iterator[str]: + return map(str, self._accepted_types) + + def best_match(self, provided_types: List[str], default: Optional[str] = None) -> Optional[str]: + """Find the best matching media type for the request. + + Args: + provided_types: A list of media types that can be provided as a response. These types + can contain a wildcard ``*`` character in the main- or subtype part. + default: The media type that is returned if none of the provided types match. + + Returns: + The best matching media type. If the matching provided type contains wildcard characters, + they are replaced with the corresponding part of the accepted type. Otherwise the + provided type is returned as-is. + """ + types = [MediaTypeHeader(t) for t in provided_types] + + for accepted in self._accepted_types: + for provided in types: + if provided.match(accepted): + # Return the accepted type with wildcards replaced + # by concrete parts from the provided type + result = copy(provided) + if result.subtype == "*": + result.subtype = accepted.subtype + if result.maintype == "*": + result.maintype = accepted.maintype + return str(result) + return default + + def accepts(self, media_type: str) -> bool: + """Check if the request accepts the specified media type. + + If multiple media types can be provided, it is better to use :func:`best_match`. + + Args: + media_type: The media type to check for. + + Returns: + True if the request accepts ``media_type``. + """ + return self.best_match([media_type]) == media_type diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/multi_dicts.py b/venv/lib/python3.11/site-packages/litestar/datastructures/multi_dicts.py new file mode 100644 index 0000000..7702e1a --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/datastructures/multi_dicts.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +from abc import ABC +from typing import TYPE_CHECKING, Any, Generator, Generic, Iterable, Mapping, TypeVar + +from multidict import MultiDict as BaseMultiDict +from multidict import MultiDictProxy, MultiMapping + +from litestar.datastructures.upload_file import UploadFile + +if TYPE_CHECKING: + from typing_extensions import Self + + +__all__ = ("FormMultiDict", "ImmutableMultiDict", "MultiDict", "MultiMixin") + + +T = TypeVar("T") + + +class MultiMixin(Generic[T], MultiMapping[T], ABC): + """Mixin providing common methods for multi dicts, used by :class:`ImmutableMultiDict` and :class:`MultiDict`""" + + def dict(self) -> dict[str, list[Any]]: + """Return the multi-dict as a dict of lists. + + Returns: + A dict of lists + """ + return {k: self.getall(k) for k in set(self.keys())} + + def multi_items(self) -> Generator[tuple[str, T], None, None]: + """Get all keys and values, including duplicates. + + Returns: + A list of tuples containing key-value pairs + """ + for key in set(self): + for value in self.getall(key): + yield key, value + + +class MultiDict(BaseMultiDict[T], MultiMixin[T], Generic[T]): + """MultiDict, using :class:`MultiDict <multidict.MultiDictProxy>`.""" + + def __init__(self, args: MultiMapping | Mapping[str, T] | Iterable[tuple[str, T]] | None = None) -> None: + """Initialize ``MultiDict`` from a`MultiMapping``, + :class:`Mapping <typing.Mapping>` or an iterable of tuples. + + Args: + args: Mapping-like structure to create the ``MultiDict`` from + """ + super().__init__(args or {}) + + def immutable(self) -> ImmutableMultiDict[T]: + """Create an. + + :class:`ImmutableMultiDict` view. + + Returns: + An immutable multi dict + """ + return ImmutableMultiDict[T](self) # pyright: ignore + + def copy(self) -> Self: + """Return a shallow copy""" + return type(self)(list(self.multi_items())) + + +class ImmutableMultiDict(MultiDictProxy[T], MultiMixin[T], Generic[T]): + """Immutable MultiDict, using class:`MultiDictProxy <multidict.MultiDictProxy>`.""" + + def __init__(self, args: MultiMapping | Mapping[str, Any] | Iterable[tuple[str, Any]] | None = None) -> None: + """Initialize ``ImmutableMultiDict`` from a `MultiMapping``, + :class:`Mapping <typing.Mapping>` or an iterable of tuples. + + Args: + args: Mapping-like structure to create the ``ImmutableMultiDict`` from + """ + super().__init__(BaseMultiDict(args or {})) + + def mutable_copy(self) -> MultiDict[T]: + """Create a mutable copy as a :class:`MultiDict` + + Returns: + A mutable multi dict + """ + return MultiDict(list(self.multi_items())) + + def copy(self) -> Self: # type: ignore[override] + """Return a shallow copy""" + return type(self)(self.items()) + + +class FormMultiDict(ImmutableMultiDict[Any]): + """MultiDict for form data.""" + + async def close(self) -> None: + """Close all files in the multi-dict. + + Returns: + None + """ + for _, value in self.multi_items(): + if isinstance(value, UploadFile): + await value.close() diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/response_header.py b/venv/lib/python3.11/site-packages/litestar/datastructures/response_header.py new file mode 100644 index 0000000..f781d0c --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/datastructures/response_header.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from litestar.exceptions import ImproperlyConfiguredException + +if TYPE_CHECKING: + from litestar.openapi.spec import Example + +__all__ = ("ResponseHeader",) + + +@dataclass +class ResponseHeader: + """Container type for a response header.""" + + name: str + """Header name""" + + documentation_only: bool = False + """Defines the ResponseHeader instance as for OpenAPI documentation purpose only.""" + + value: str | None = None + """Value to set for the response header.""" + + description: str | None = None + """A brief description of the parameter. This could contain examples of + use. + + [CommonMark syntax](https://spec.commonmark.org/) MAY be used for + rich text representation. + """ + + required: bool = False + """Determines whether this parameter is mandatory. + + If the [parameter location](https://spec.openapis.org/oas/v3.1.0#parameterIn) is `"path"`, this property is **REQUIRED** and its value MUST be `true`. + Otherwise, the property MAY be included and its default value is `false`. + """ + + deprecated: bool = False + """Specifies that a parameter is deprecated and SHOULD be transitioned out + of usage. + + Default value is `false`. + """ + + allow_empty_value: bool = False + """Sets the ability to pass empty-valued parameters. This is valid only for + `query` parameters and allows sending a parameter with an empty value. + Default value is `false`. If. + + [style](https://spec.openapis.org/oas/v3.1.0#parameterStyle) is used, and if behavior is `n/a` (cannot be + serialized), the value of `allowEmptyValue` SHALL be ignored. Use of this property is NOT RECOMMENDED, as it is + likely to be removed in a later revision. + + The rules for serialization of the parameter are specified in one of two ways. + For simpler scenarios, a [schema](https://spec.openapis.org/oas/v3.1.0#parameterSchema) and [style](https://spec.openapis.org/oas/v3.1.0#parameterStyle) + can describe the structure and syntax of the parameter. + """ + + style: str | None = None + """Describes how the parameter value will be serialized depending on the + type of the parameter value. Default values (based on value of `in`): + + - for `query` - `form`; + - for `path` - `simple`; + - for `header` - `simple`; + - for `cookie` - `form`. + """ + + explode: bool | None = None + """When this is true, parameter values of type `array` or `object` generate + separate parameters for each value of the array or key-value pair of the + map. + + For other types of parameters this property has no effect. + When [style](https://spec.openapis.org/oas/v3.1.0#parameterStyle) is `form`, the default value is `true`. + For all other styles, the default value is `false`. + """ + + allow_reserved: bool = False + """Determines whether the parameter value SHOULD allow reserved characters, + as defined by. + + [RFC3986](https://tools.ietf.org/html/rfc3986#section-2.2) `:/?#[]@!$&'()*+,;=` to be included without percent- + encoding. + + This property only applies to parameters with an `in` value of `query`. The default value is `false`. + """ + + example: Any | None = None + """Example of the parameter's potential value. + + The example SHOULD match the specified schema and encoding + properties if present. The `example` field is mutually exclusive of + the `examples` field. Furthermore, if referencing a `schema` that + contains an example, the `example` value SHALL _override_ the + example provided by the schema. To represent examples of media types + that cannot naturally be represented in JSON or YAML, a string value + can contain the example with escaping where necessary. + """ + + examples: dict[str, Example] | None = None + """Examples of the parameter's potential value. Each example SHOULD contain + a value in the correct format as specified in the parameter encoding. The + `examples` field is mutually exclusive of the `example` field. Furthermore, + if referencing a `schema` that contains an example, the `examples` value + SHALL _override_ the example provided by the schema. + + For more complex scenarios, the [content](https://spec.openapis.org/oas/v3.1.0#parameterContent) property + can define the media type and schema of the parameter. + A parameter MUST contain either a `schema` property, or a `content` property, but not both. + When `example` or `examples` are provided in conjunction with the `schema` object, + the example MUST follow the prescribed serialization strategy for the parameter. + """ + + def __post_init__(self) -> None: + """Ensure that either value is set or the instance is for documentation_only.""" + if not self.documentation_only and self.value is None: + raise ImproperlyConfiguredException("value must be set if documentation_only is false") + + def __hash__(self) -> int: + return hash(self.name) diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/state.py b/venv/lib/python3.11/site-packages/litestar/datastructures/state.py new file mode 100644 index 0000000..71980e0 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/datastructures/state.py @@ -0,0 +1,313 @@ +from __future__ import annotations + +from copy import copy, deepcopy +from threading import RLock +from typing import TYPE_CHECKING, Any, Callable, Generator, Iterable, Iterator, Mapping, MutableMapping + +if TYPE_CHECKING: + from typing_extensions import Self + +__all__ = ("ImmutableState", "State") + + +class ImmutableState(Mapping[str, Any]): + """An object meant to store arbitrary state. + + It can be accessed using dot notation while exposing dict like functionalities. + """ + + __slots__ = ( + "_state", + "_deep_copy", + ) + + _state: dict[str, Any] + + def __init__( + self, state: ImmutableState | Mapping[str, Any] | Iterable[tuple[str, Any]], deep_copy: bool = True + ) -> None: + """Initialize an ``ImmutableState`` instance. + + Args: + state: An object to initialize the state from. Can be a dict, an instance of :class:`ImmutableState`, or a tuple + of key value paris. + deep_copy: Whether to 'deepcopy' the passed in state. + + Examples: + .. code-block:: python + + from litestar.datastructures import ImmutableState + + state_dict = {"first": 1, "second": 2, "third": 3, "fourth": 4} + state = ImmutableState(state_dict) + + # state implements the Mapping type: + assert len(state) == 3 + assert "first" in state + assert not "fourth" in state + assert state["first"] == 1 + assert [(k, v) for k, v in state.items()] == [("first", 1), ("second", 2), ("third", 3)] + + # state implements __bool__ + assert state # state is true when it has values. + assert not State() # state is empty when it has no values. + + # it has a 'dict' method to retrieve a shallow copy of the underlying dict + inner_dict = state.dict() + assert inner_dict == state_dict + + # you can also retrieve a mutable State by calling 'mutable_copy' + mutable_state = state.mutable_copy() + del state["first"] + assert "first" not in state + + """ + if isinstance(state, ImmutableState): + state = state._state + + if not isinstance(state, dict) and isinstance(state, Iterable): + state = dict(state) + + super().__setattr__("_deep_copy", deep_copy) + super().__setattr__("_state", deepcopy(state) if deep_copy else state) + + def __bool__(self) -> bool: + """Return a boolean indicating whether the wrapped dict instance has values.""" + return bool(self._state) + + def __getitem__(self, key: str) -> Any: + """Get the value for the corresponding key from the wrapped state object using subscription notation. + + Args: + key: Key to access. + + Raises: + KeyError + + Returns: + A value from the wrapped state instance. + """ + return self._state[key] + + def __iter__(self) -> Iterator[str]: + """Return an iterator iterating the wrapped state dict. + + Returns: + An iterator of strings + """ + return iter(self._state) + + def __len__(self) -> int: + """Return length of the wrapped state dict. + + Returns: + An integer + """ + return len(self._state) + + def __getattr__(self, key: str) -> Any: + """Get the value for the corresponding key from the wrapped state object using attribute notation. + + Args: + key: Key to retrieve + + Raises: + AttributeError: if the given attribute is not set. + + Returns: + The retrieved value + """ + try: + return self._state[key] + except KeyError as e: + raise AttributeError from e + + def __copy__(self) -> Self: + """Return a shallow copy of the given state object. + + Customizes how the builtin "copy" function will work. + """ + return self.__class__(self._state, deep_copy=self._deep_copy) # pyright: ignore + + def mutable_copy(self) -> State: + """Return a mutable copy of the state object. + + Returns: + A ``State`` + """ + return State(self._state, deep_copy=self._deep_copy) + + def dict(self) -> dict[str, Any]: + """Return a shallow copy of the wrapped dict. + + Returns: + A dict + """ + return copy(self._state) + + @classmethod + def __get_validators__( + cls, + ) -> Generator[Callable[[ImmutableState | dict[str, Any] | Iterable[tuple[str, Any]]], ImmutableState], None, None]: # type: ignore[valid-type] + """Pydantic compatible method to allow custom parsing of state instances in a SignatureModel.""" + yield cls.validate + + @classmethod + def validate(cls, value: ImmutableState | dict[str, Any] | Iterable[tuple[str, Any]]) -> Self: # type: ignore[valid-type] + """Parse a value and instantiate state inside a SignatureModel. This allows us to use custom subclasses of + state, as well as allows users to decide whether state is mutable or immutable. + + Args: + value: The value from which to initialize the state instance. + + Returns: + An ImmutableState instance + """ + deep_copy = value._deep_copy if isinstance(value, ImmutableState) else False + return cls(value, deep_copy=deep_copy) + + +class State(ImmutableState, MutableMapping[str, Any]): + """An object meant to store arbitrary state. + + It can be accessed using dot notation while exposing dict like functionalities. + """ + + __slots__ = ("_lock",) + + _lock: RLock + + def __init__( + self, + state: ImmutableState | Mapping[str, Any] | Iterable[tuple[str, Any]] | None = None, + deep_copy: bool = False, + ) -> None: + """Initialize a ``State`` instance with an optional value. + + Args: + state: An object to initialize the state from. Can be a dict, an instance of 'ImmutableState', or a tuple of key value paris. + deep_copy: Whether to 'deepcopy' the passed in state. + + .. code-block:: python + :caption: Examples + + from litestar.datastructures import State + + state_dict = {"first": 1, "second": 2, "third": 3, "fourth": 4} + state = State(state_dict) + + # state can be accessed using '.' notation + assert state.fourth == 4 + del state.fourth + + # state implements the Mapping type: + assert len(state) == 3 + assert "first" in state + assert not "fourth" in state + assert state["first"] == 1 + assert [(k, v) for k, v in state.items()] == [("first", 1), ("second", 2), ("third", 3)] + + state["fourth"] = 4 + assert "fourth" in state + del state["fourth"] + + # state implements __bool__ + assert state # state is true when it has values. + assert not State() # state is empty when it has no values. + + # it has shallow copy + copied_state = state.copy() + del copied_state.first + assert state.first + + # it has a 'dict' method to retrieve a shallow copy of the underlying dict + inner_dict = state.dict() + assert inner_dict == state_dict + + # you can get an immutable copy of the state by calling 'immutable_immutable_copy' + immutable_copy = state.immutable_copy() + del immutable_copy.first # raises AttributeError + + """ + + super().__init__(state if state is not None else {}, deep_copy=deep_copy) + super().__setattr__("_lock", RLock()) + + def __delitem__(self, key: str) -> None: + """Delete the value from the key from the wrapped state object using subscription notation. + + Args: + key: Key to delete + + Raises: + KeyError: if the given attribute is not set. + + Returns: + None + """ + + with self._lock: + del self._state[key] + + def __setitem__(self, key: str, value: Any) -> None: + """Set an item in the state using subscription notation. + + Args: + key: Key to set. + value: Value to set. + + Returns: + None + """ + + with self._lock: + self._state[key] = value + + def __setattr__(self, key: str, value: Any) -> None: + """Set an item in the state using attribute notation. + + Args: + key: Key to set. + value: Value to set. + + Returns: + None + """ + + with self._lock: + self._state[key] = value + + def __delattr__(self, key: str) -> None: + """Delete the value from the key from the wrapped state object using attribute notation. + + Args: + key: Key to delete + + Raises: + AttributeError: if the given attribute is not set. + + Returns: + None + """ + + try: + with self._lock: + del self._state[key] + except KeyError as e: + raise AttributeError from e + + def copy(self) -> Self: + """Return a shallow copy of the state object. + + Returns: + A ``State`` + """ + return self.__class__(self.dict(), deep_copy=self._deep_copy) # pyright: ignore + + def immutable_copy(self) -> ImmutableState: + """Return a shallow copy of the state object, setting it to be frozen. + + Returns: + A ``State`` + """ + return ImmutableState(self, deep_copy=self._deep_copy) diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/upload_file.py b/venv/lib/python3.11/site-packages/litestar/datastructures/upload_file.py new file mode 100644 index 0000000..09ad2d3 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/datastructures/upload_file.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +from tempfile import SpooledTemporaryFile + +from litestar.concurrency import sync_to_thread +from litestar.constants import ONE_MEGABYTE + +__all__ = ("UploadFile",) + + +class UploadFile: + """Representation of a file upload""" + + __slots__ = ("filename", "file", "content_type", "headers") + + def __init__( + self, + content_type: str, + filename: str, + file_data: bytes | None = None, + headers: dict[str, str] | None = None, + max_spool_size: int = ONE_MEGABYTE, + ) -> None: + """Upload file in-memory container. + + Args: + content_type: Content type for the file. + filename: The filename. + file_data: File data. + headers: Any attached headers. + max_spool_size: The size above which the temporary file will be rolled to disk. + """ + self.filename = filename + self.content_type = content_type + self.file = SpooledTemporaryFile(max_size=max_spool_size) + self.headers = headers or {} + + if file_data: + self.file.write(file_data) + self.file.seek(0) + + @property + def rolled_to_disk(self) -> bool: + """Determine whether the spooled file exceeded the rolled-to-disk threshold and is no longer in memory. + + Returns: + A boolean flag + """ + return getattr(self.file, "_rolled", False) + + async def write(self, data: bytes) -> int: + """Proxy for data writing. + + Args: + data: Byte string to write. + + Returns: + None + """ + if self.rolled_to_disk: + return await sync_to_thread(self.file.write, data) + return self.file.write(data) + + async def read(self, size: int = -1) -> bytes: + """Proxy for data reading. + + Args: + size: position from which to read. + + Returns: + Byte string. + """ + if self.rolled_to_disk: + return await sync_to_thread(self.file.read, size) + return self.file.read(size) + + async def seek(self, offset: int) -> int: + """Async proxy for file seek. + + Args: + offset: start position.. + + Returns: + None. + """ + if self.rolled_to_disk: + return await sync_to_thread(self.file.seek, offset) + return self.file.seek(offset) + + async def close(self) -> None: + """Async proxy for file close. + + Returns: + None. + """ + if self.rolled_to_disk: + return await sync_to_thread(self.file.close) + return self.file.close() + + def __repr__(self) -> str: + return f"{self.filename} - {self.content_type}" diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/url.py b/venv/lib/python3.11/site-packages/litestar/datastructures/url.py new file mode 100644 index 0000000..f3441d0 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/datastructures/url.py @@ -0,0 +1,262 @@ +from __future__ import annotations + +from functools import lru_cache +from typing import TYPE_CHECKING, Any, NamedTuple +from urllib.parse import SplitResult, urlencode, urlsplit, urlunsplit + +from litestar._parsers import parse_query_string +from litestar.datastructures import MultiDict +from litestar.types import Empty + +if TYPE_CHECKING: + from typing_extensions import Self + + from litestar.types import EmptyType, Scope + +__all__ = ("Address", "URL") + +_DEFAULT_SCHEME_PORTS = {"http": 80, "https": 443, "ftp": 21, "ws": 80, "wss": 443} + + +class Address(NamedTuple): + """Just a network address.""" + + host: str + """Address host.""" + port: int + """Address port.""" + + +def make_absolute_url(path: str | URL, base: str | URL) -> str: + """Create an absolute URL. + + Args: + path: URL path to make absolute + base: URL to use as a base + + Returns: + A string representing the new, absolute URL + """ + url = base if isinstance(base, URL) else URL(base) + netloc = url.netloc + path = url.path.rstrip("/") + str(path) + return str(URL.from_components(scheme=url.scheme, netloc=netloc, path=path)) + + +class URL: + """Representation and modification utilities of a URL.""" + + __slots__ = ( + "_query_params", + "_parsed_url", + "fragment", + "hostname", + "netloc", + "password", + "path", + "port", + "query", + "scheme", + "username", + ) + + _query_params: EmptyType | MultiDict + _parsed_url: str | None + + scheme: str + """URL scheme.""" + netloc: str + """Network location.""" + path: str + """Hierarchical path.""" + fragment: str + """Fragment component.""" + query: str + """Query string.""" + username: str | None + """Username if specified.""" + password: str | None + """Password if specified.""" + port: int | None + """Port if specified.""" + hostname: str | None + """Hostname if specified.""" + + def __new__(cls, url: str | SplitResult) -> URL: + """Create a new instance. + + Args: + url: url string or split result to represent. + """ + return cls._new(url=url) + + @classmethod + @lru_cache + def _new(cls, url: str | SplitResult) -> URL: + instance = super().__new__(cls) + instance._parsed_url = None + + if isinstance(url, str): + result = urlsplit(url) + instance._parsed_url = url + else: + result = url + + instance.scheme = result.scheme + instance.netloc = result.netloc + instance.path = result.path + instance.fragment = result.fragment + instance.query = result.query + instance.username = result.username + instance.password = result.password + instance.port = result.port + instance.hostname = result.hostname + instance._query_params = Empty + + return instance + + @property + def _url(self) -> str: + if not self._parsed_url: + self._parsed_url = str( + urlunsplit( + SplitResult( + scheme=self.scheme, + netloc=self.netloc, + path=self.path, + fragment=self.fragment, + query=self.query, + ) + ) + ) + return self._parsed_url + + @classmethod + @lru_cache + def from_components( + cls, + scheme: str = "", + netloc: str = "", + path: str = "", + fragment: str = "", + query: str = "", + ) -> Self: + """Create a new URL from components. + + Args: + scheme: URL scheme + netloc: Network location + path: Hierarchical path + query: Query component + fragment: Fragment identifier + + Returns: + A new URL with the given components + """ + return cls( + SplitResult( + scheme=scheme, + netloc=netloc, + path=path, + fragment=fragment, + query=query, + ) + ) + + @classmethod + def from_scope(cls, scope: Scope) -> Self: + """Construct a URL from a :class:`Scope <.types.Scope>` + + Args: + scope: A scope + + Returns: + A URL + """ + scheme = scope.get("scheme", "http") + server = scope.get("server") + path = scope.get("root_path", "") + scope["path"] + query_string = scope.get("query_string", b"") + + # we use iteration here because it's faster, and headers might not yet be cached + host = next( + ( + header_value.decode("latin-1") + for header_name, header_value in scope.get("headers", []) + if header_name == b"host" + ), + "", + ) + if server and not host: + host, port = server + default_port = _DEFAULT_SCHEME_PORTS[scheme] + if port != default_port: + host = f"{host}:{port}" + + return cls.from_components( + scheme=scheme if server else "", + query=query_string.decode(), + netloc=host, + path=path, + ) + + def with_replacements( + self, + scheme: str = "", + netloc: str = "", + path: str = "", + query: str | MultiDict | None | EmptyType = Empty, + fragment: str = "", + ) -> Self: + """Create a new URL, replacing the given components. + + Args: + scheme: URL scheme + netloc: Network location + path: Hierarchical path + query: Raw query string + fragment: Fragment identifier + + Returns: + A new URL with the given components replaced + """ + if isinstance(query, MultiDict): + query = urlencode(query=query) + + query = (query if query is not Empty else self.query) or "" + + return type(self).from_components( + scheme=scheme or self.scheme, + netloc=netloc or self.netloc, + path=path or self.path, + query=query, + fragment=fragment or self.fragment, + ) + + @property + def query_params(self) -> MultiDict: + """Query parameters of a URL as a :class:`MultiDict <.datastructures.multi_dicts.MultiDict>` + + Returns: + A :class:`MultiDict <.datastructures.multi_dicts.MultiDict>` with query parameters + + Notes: + - The returned ``MultiDict`` is mutable, :class:`URL` itself is *immutable*, + therefore mutating the query parameters will not directly mutate the ``URL``. + If you want to modify query parameters, make modifications in the + multidict and pass them back to :meth:`with_replacements` + """ + if self._query_params is Empty: + self._query_params = MultiDict(parse_query_string(query_string=self.query.encode())) + return self._query_params + + def __str__(self) -> str: + return self._url + + def __eq__(self, other: Any) -> bool: + if isinstance(other, (str, URL)): + return str(self) == str(other) + return NotImplemented # pragma: no cover + + def __repr__(self) -> str: + return f"{type(self).__name__}({self._url!r})" |