summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/litestar/datastructures
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
commit6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
treeb1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/litestar/datastructures
parent4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/datastructures')
-rw-r--r--venv/lib/python3.11/site-packages/litestar/datastructures/__init__.py39
-rw-r--r--venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/__init__.cpython-311.pycbin0 -> 1221 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/cookie.cpython-311.pycbin0 -> 5293 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/headers.cpython-311.pycbin0 -> 30616 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/multi_dicts.cpython-311.pycbin0 -> 6713 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/response_header.cpython-311.pycbin0 -> 2338 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/state.cpython-311.pycbin0 -> 13750 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/upload_file.cpython-311.pycbin0 -> 4572 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/url.cpython-311.pycbin0 -> 10120 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/datastructures/cookie.py112
-rw-r--r--venv/lib/python3.11/site-packages/litestar/datastructures/headers.py534
-rw-r--r--venv/lib/python3.11/site-packages/litestar/datastructures/multi_dicts.py106
-rw-r--r--venv/lib/python3.11/site-packages/litestar/datastructures/response_header.py125
-rw-r--r--venv/lib/python3.11/site-packages/litestar/datastructures/state.py313
-rw-r--r--venv/lib/python3.11/site-packages/litestar/datastructures/upload_file.py101
-rw-r--r--venv/lib/python3.11/site-packages/litestar/datastructures/url.py262
16 files changed, 1592 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__init__.py b/venv/lib/python3.11/site-packages/litestar/datastructures/__init__.py
new file mode 100644
index 0000000..74fc25b
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__init__.py
@@ -0,0 +1,39 @@
+from litestar.datastructures.cookie import Cookie
+from litestar.datastructures.headers import (
+ Accept,
+ CacheControlHeader,
+ ETag,
+ Header,
+ Headers,
+ MutableScopeHeaders,
+)
+from litestar.datastructures.multi_dicts import (
+ FormMultiDict,
+ ImmutableMultiDict,
+ MultiDict,
+ MultiMixin,
+)
+from litestar.datastructures.response_header import ResponseHeader
+from litestar.datastructures.state import ImmutableState, State
+from litestar.datastructures.upload_file import UploadFile
+from litestar.datastructures.url import URL, Address
+
+__all__ = (
+ "Accept",
+ "Address",
+ "CacheControlHeader",
+ "Cookie",
+ "ETag",
+ "FormMultiDict",
+ "Header",
+ "Headers",
+ "ImmutableMultiDict",
+ "ImmutableState",
+ "MultiDict",
+ "MultiMixin",
+ "MutableScopeHeaders",
+ "ResponseHeader",
+ "State",
+ "UploadFile",
+ "URL",
+)
diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/__init__.cpython-311.pyc
new file mode 100644
index 0000000..085a180
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/__init__.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/cookie.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/cookie.cpython-311.pyc
new file mode 100644
index 0000000..87a2646
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/cookie.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/headers.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/headers.cpython-311.pyc
new file mode 100644
index 0000000..4a8e1ad
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/headers.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/multi_dicts.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/multi_dicts.cpython-311.pyc
new file mode 100644
index 0000000..c5a3dd4
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/multi_dicts.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/response_header.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/response_header.cpython-311.pyc
new file mode 100644
index 0000000..0c83cae
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/response_header.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/state.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/state.cpython-311.pyc
new file mode 100644
index 0000000..6cd1f18
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/state.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/upload_file.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/upload_file.cpython-311.pyc
new file mode 100644
index 0000000..4db0e27
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/upload_file.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/url.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/url.cpython-311.pyc
new file mode 100644
index 0000000..a9c2138
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/datastructures/__pycache__/url.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/cookie.py b/venv/lib/python3.11/site-packages/litestar/datastructures/cookie.py
new file mode 100644
index 0000000..21cedc3
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/datastructures/cookie.py
@@ -0,0 +1,112 @@
+from __future__ import annotations
+
+from dataclasses import asdict, dataclass, field
+from http.cookies import SimpleCookie
+from typing import Any, Literal
+
+__all__ = ("Cookie",)
+
+
+@dataclass
+class Cookie:
+ """Container class for defining a cookie using the ``Set-Cookie`` header.
+
+ See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Set-Cookie for more details regarding this header.
+ """
+
+ key: str
+ """Key for the cookie."""
+ path: str = "/"
+ """Path fragment that must exist in the request url for the cookie to be valid.
+
+ Defaults to ``/``.
+ """
+ value: str | None = field(default=None)
+ """Value for the cookie, if none given defaults to empty string."""
+ max_age: int | None = field(default=None)
+ """Maximal age of the cookie before its invalidated."""
+ expires: int | None = field(default=None)
+ """Seconds from now until the cookie expires."""
+ domain: str | None = field(default=None)
+ """Domain for which the cookie is valid."""
+ secure: bool | None = field(default=None)
+ """Https is required for the cookie."""
+ httponly: bool | None = field(default=None)
+ """Forbids javascript to access the cookie via ``document.cookie``."""
+ samesite: Literal["lax", "strict", "none"] = field(default="lax")
+ """Controls whether or not a cookie is sent with cross-site requests.
+
+ Defaults to 'lax'.
+ """
+ description: str | None = field(default=None)
+ """Description of the response cookie header for OpenAPI documentation."""
+ documentation_only: bool = field(default=False)
+ """Defines the Cookie instance as for OpenAPI documentation purpose only."""
+
+ @property
+ def simple_cookie(self) -> SimpleCookie:
+ """Get a simple cookie object from the values.
+
+ Returns:
+ A :class:`SimpleCookie <http.cookies.SimpleCookie>`
+ """
+ simple_cookie: SimpleCookie = SimpleCookie()
+ simple_cookie[self.key] = self.value or ""
+
+ namespace = simple_cookie[self.key]
+ for key, value in self.dict.items():
+ if key in {"key", "value"}:
+ continue
+ if value is not None:
+ updated_key = key
+ if updated_key == "max_age":
+ updated_key = "max-age"
+ namespace[updated_key] = value
+
+ return simple_cookie
+
+ def to_header(self, **kwargs: Any) -> str:
+ """Return a string representation suitable to be sent as HTTP headers.
+
+ Args:
+ **kwargs: Any kwargs to pass to the simple cookie output method.
+ """
+ return self.simple_cookie.output(**kwargs).strip()
+
+ def to_encoded_header(self) -> tuple[bytes, bytes]:
+ """Create encoded header for ASGI ``send``.
+
+ Returns:
+ A two tuple of bytes.
+ """
+ return b"set-cookie", self.to_header(header="").strip().encode("latin-1")
+
+ @property
+ def dict(self) -> dict[str, Any]:
+ """Get the cookie as a dict.
+
+ Returns:
+ A dict of values
+ """
+ return {
+ k: v
+ for k, v in asdict(self).items()
+ if k not in {"documentation_only", "description", "__pydantic_initialised__"}
+ }
+
+ def __hash__(self) -> int:
+ return hash((self.key, self.path, self.domain))
+
+ def __eq__(self, other: Any) -> bool:
+ """Determine whether two cookie instances are equal according to the cookie spec, i.e. hey have a similar path,
+ domain and key.
+
+ Args:
+ other: An arbitrary value
+
+ Returns:
+ A boolean
+ """
+ if isinstance(other, Cookie):
+ return other.key == self.key and other.path == self.path and other.domain == self.domain
+ return False
diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/headers.py b/venv/lib/python3.11/site-packages/litestar/datastructures/headers.py
new file mode 100644
index 0000000..f3e9bd7
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/datastructures/headers.py
@@ -0,0 +1,534 @@
+import re
+from abc import ABC, abstractmethod
+from contextlib import suppress
+from copy import copy
+from dataclasses import dataclass
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ ClassVar,
+ Dict,
+ Iterable,
+ Iterator,
+ List,
+ Mapping,
+ MutableMapping,
+ Optional,
+ Pattern,
+ Tuple,
+ Union,
+ cast,
+)
+
+from multidict import CIMultiDict, CIMultiDictProxy, MultiMapping
+from typing_extensions import get_type_hints
+
+from litestar._multipart import parse_content_header
+from litestar.datastructures.multi_dicts import MultiMixin
+from litestar.dto.base_dto import AbstractDTO
+from litestar.exceptions import ImproperlyConfiguredException, ValidationException
+from litestar.types.empty import Empty
+from litestar.typing import FieldDefinition
+from litestar.utils.dataclass import simple_asdict
+from litestar.utils.scope.state import ScopeState
+
+if TYPE_CHECKING:
+ from litestar.types.asgi_types import (
+ HeaderScope,
+ Message,
+ RawHeaders,
+ RawHeadersList,
+ Scope,
+ )
+
+__all__ = ("Accept", "CacheControlHeader", "ETag", "Header", "Headers", "MutableScopeHeaders")
+
+ETAG_RE = re.compile(r'([Ww]/)?"(.+)"')
+PRINTABLE_ASCII_RE: Pattern[str] = re.compile(r"^[ -~]+$")
+
+
+def _encode_headers(headers: Iterable[Tuple[str, str]]) -> "RawHeadersList":
+ return [(key.lower().encode("latin-1"), value.encode("latin-1")) for key, value in headers]
+
+
+class Headers(CIMultiDictProxy[str], MultiMixin[str]):
+ """An immutable, case-insensitive multi dict for HTTP headers."""
+
+ def __init__(self, headers: Optional[Union[Mapping[str, str], "RawHeaders", MultiMapping]] = None) -> None:
+ """Initialize ``Headers``.
+
+ Args:
+ headers: Initial value.
+ """
+ if not isinstance(headers, MultiMapping):
+ headers_: Union[Mapping[str, str], List[Tuple[str, str]]] = {}
+ if headers:
+ if isinstance(headers, Mapping):
+ headers_ = headers # pyright: ignore
+ else:
+ headers_ = [(key.decode("latin-1"), value.decode("latin-1")) for key, value in headers]
+
+ super().__init__(CIMultiDict(headers_))
+ else:
+ super().__init__(headers)
+ self._header_list: Optional[RawHeadersList] = None
+
+ @classmethod
+ def from_scope(cls, scope: "Scope") -> "Headers":
+ """Create headers from a send-message.
+
+ Args:
+ scope: The ASGI connection scope.
+
+ Returns:
+ Headers
+
+ Raises:
+ ValueError: If the message does not have a ``headers`` key
+ """
+ connection_state = ScopeState.from_scope(scope)
+ if (headers := connection_state.headers) is Empty:
+ headers = connection_state.headers = cls(scope["headers"])
+ return headers
+
+ def to_header_list(self) -> "RawHeadersList":
+ """Raw header value.
+
+ Returns:
+ A list of tuples contain the header and header-value as bytes
+ """
+ # Since ``Headers`` are immutable, this can be cached
+ if not self._header_list:
+ self._header_list = _encode_headers((key, value) for key in set(self) for value in self.getall(key))
+ return self._header_list
+
+
+class MutableScopeHeaders(MutableMapping):
+ """A case-insensitive, multidict-like structure that can be used to mutate headers within a
+ :class:`Scope <.types.Scope>`
+ """
+
+ def __init__(self, scope: Optional["HeaderScope"] = None) -> None:
+ """Initialize ``MutableScopeHeaders`` from a ``HeaderScope``.
+
+ Args:
+ scope: The ASGI connection scope.
+ """
+ self.headers: RawHeadersList
+ if scope is not None:
+ if not isinstance(scope["headers"], list):
+ scope["headers"] = list(scope["headers"])
+
+ self.headers = cast("RawHeadersList", scope["headers"])
+ else:
+ self.headers = []
+
+ @classmethod
+ def from_message(cls, message: "Message") -> "MutableScopeHeaders":
+ """Construct a header from a message object.
+
+ Args:
+ message: :class:`Message <.types.Message>`.
+
+ Returns:
+ MutableScopeHeaders.
+
+ Raises:
+ ValueError: If the message does not have a ``headers`` key.
+ """
+ if "headers" not in message:
+ raise ValueError(f"Invalid message type: {message['type']!r}")
+
+ return cls(cast("HeaderScope", message))
+
+ def add(self, key: str, value: str) -> None:
+ """Add a header to the scope.
+
+ Notes:
+ - This method keeps duplicates.
+
+ Args:
+ key: Header key.
+ value: Header value.
+
+ Returns:
+ None.
+ """
+ self.headers.append((key.lower().encode("latin-1"), value.encode("latin-1")))
+
+ def getall(self, key: str, default: Optional[List[str]] = None) -> List[str]:
+ """Get all values of a header.
+
+ Args:
+ key: Header key.
+ default: Default value to return if ``name`` is not found.
+
+ Returns:
+ A list of strings.
+
+ Raises:
+ KeyError: if no header for ``name`` was found and ``default`` is not given.
+ """
+ name = key.lower()
+ values = [
+ header_value.decode("latin-1")
+ for header_name, header_value in self.headers
+ if header_name.decode("latin-1").lower() == name
+ ]
+ if not values:
+ if default:
+ return default
+ raise KeyError
+ return values
+
+ def extend_header_value(self, key: str, value: str) -> None:
+ """Extend a multivalued header.
+
+ Notes:
+ - A multivalues header is a header that can take a comma separated list.
+ - If the header previously did not exist, it will be added.
+
+ Args:
+ key: Header key.
+ value: Header value to add,
+
+ Returns:
+ None
+ """
+ existing = self.get(key)
+ if existing is not None:
+ value = ",".join([*existing.split(","), value])
+ self[key] = value
+
+ def __getitem__(self, key: str) -> str:
+ """Get the first header matching ``name``"""
+ name = key.lower()
+ for header in self.headers:
+ if header[0].decode("latin-1").lower() == name:
+ return header[1].decode("latin-1")
+ raise KeyError
+
+ def _find_indices(self, key: str) -> List[int]:
+ name = key.lower()
+ return [i for i, (name_, _) in enumerate(self.headers) if name_.decode("latin-1").lower() == name]
+
+ def __setitem__(self, key: str, value: str) -> None:
+ """Set a header in the scope, overwriting duplicates."""
+ name_encoded = key.lower().encode("latin-1")
+ value_encoded = value.encode("latin-1")
+ if indices := self._find_indices(key):
+ for i in indices[1:]:
+ del self.headers[i]
+ self.headers[indices[0]] = (name_encoded, value_encoded)
+ else:
+ self.headers.append((name_encoded, value_encoded))
+
+ def __delitem__(self, key: str) -> None:
+ """Delete all headers matching ``name``"""
+ indices = self._find_indices(key)
+ for i in indices[::-1]:
+ del self.headers[i]
+
+ def __len__(self) -> int:
+ """Return the length of the internally stored headers, including duplicates."""
+ return len(self.headers)
+
+ def __iter__(self) -> Iterator[str]:
+ """Create an iterator of header names including duplicates."""
+ return iter(h[0].decode("latin-1") for h in self.headers)
+
+
+@dataclass
+class Header(ABC):
+ """An abstract type for HTTP headers."""
+
+ HEADER_NAME: ClassVar[str] = ""
+
+ documentation_only: bool = False
+ """Defines the header instance as for OpenAPI documentation purpose only."""
+
+ @abstractmethod
+ def _get_header_value(self) -> str:
+ """Get the header value as string."""
+ raise NotImplementedError
+
+ @classmethod
+ @abstractmethod
+ def from_header(cls, header_value: str) -> "Header":
+ """Construct a header from its string representation."""
+
+ def to_header(self, include_header_name: bool = False) -> str:
+ """Get the header as string.
+
+ Args:
+ include_header_name: should include the header name in the return value. If set to false
+ the return value will only include the header value. if set to true the return value
+ will be: ``<header name>: <header value>``. Defaults to false.
+ """
+
+ if not self.HEADER_NAME:
+ raise ImproperlyConfiguredException("Missing header name")
+
+ return (f"{self.HEADER_NAME}: " if include_header_name else "") + self._get_header_value()
+
+
+@dataclass
+class CacheControlHeader(Header):
+ """A ``cache-control`` header."""
+
+ HEADER_NAME: ClassVar[str] = "cache-control"
+
+ max_age: Optional[int] = None
+ """Accessor for the ``max-age`` directive."""
+ s_maxage: Optional[int] = None
+ """Accessor for the ``s-maxage`` directive."""
+ no_cache: Optional[bool] = None
+ """Accessor for the ``no-cache`` directive."""
+ no_store: Optional[bool] = None
+ """Accessor for the ``no-store`` directive."""
+ private: Optional[bool] = None
+ """Accessor for the ``private`` directive."""
+ public: Optional[bool] = None
+ """Accessor for the ``public`` directive."""
+ no_transform: Optional[bool] = None
+ """Accessor for the ``no-transform`` directive."""
+ must_revalidate: Optional[bool] = None
+ """Accessor for the ``must-revalidate`` directive."""
+ proxy_revalidate: Optional[bool] = None
+ """Accessor for the ``proxy-revalidate`` directive."""
+ must_understand: Optional[bool] = None
+ """Accessor for the ``must-understand`` directive."""
+ immutable: Optional[bool] = None
+ """Accessor for the ``immutable`` directive."""
+ stale_while_revalidate: Optional[int] = None
+ """Accessor for the ``stale-while-revalidate`` directive."""
+
+ _field_definitions: ClassVar[Optional[Dict[str, FieldDefinition]]] = None
+
+ def _get_header_value(self) -> str:
+ """Get the header value as string."""
+
+ cc_items = [
+ key.replace("_", "-") if isinstance(value, bool) else f"{key.replace('_', '-')}={value}"
+ for key, value in simple_asdict(self, exclude_none=True, exclude={"documentation_only"}).items()
+ ]
+ return ", ".join(cc_items)
+
+ @classmethod
+ def from_header(cls, header_value: str) -> "CacheControlHeader":
+ """Create a ``CacheControlHeader`` instance from the header value.
+
+ Args:
+ header_value: the header value as string
+
+ Returns:
+ An instance of ``CacheControlHeader``
+ """
+
+ cc_items = [v.strip() for v in header_value.split(",")]
+ kwargs: Dict[str, Any] = {}
+ field_definitions = cls._get_field_definitions()
+ for cc_item in cc_items:
+ key_value = cc_item.split("=")
+ key_value[0] = key_value[0].replace("-", "_")
+ if len(key_value) == 1:
+ kwargs[key_value[0]] = True
+ elif len(key_value) == 2:
+ key, value = key_value
+ if key not in field_definitions:
+ raise ImproperlyConfiguredException("Invalid cache-control header")
+ kwargs[key] = cls._convert_to_type(value, field_definition=field_definitions[key])
+ else:
+ raise ImproperlyConfiguredException("Invalid cache-control header value")
+
+ try:
+ return CacheControlHeader(**kwargs)
+ except TypeError as exc:
+ raise ImproperlyConfiguredException from exc
+
+ @classmethod
+ def prevent_storing(cls) -> "CacheControlHeader":
+ """Create a ``cache-control`` header with the ``no-store`` directive which indicates that any caches of any kind
+ (private or shared) should not store this response.
+ """
+
+ return cls(no_store=True)
+
+ @classmethod
+ def _get_field_definitions(cls) -> Dict[str, FieldDefinition]:
+ """Get the type annotations for the ``CacheControlHeader`` class properties.
+
+ This is needed due to the conversion from pydantic models to dataclasses. Dataclasses do not support
+ automatic conversion of types like pydantic models do.
+
+ Returns:
+ A dictionary of type annotations
+
+ """
+
+ if cls._field_definitions is None:
+ cls._field_definitions = {}
+ for key, value in get_type_hints(cls, include_extras=True).items():
+ definition = FieldDefinition.from_kwarg(annotation=value, name=key)
+ # resolve_model_type so that field_definition.raw has the real raw type e.g. <class 'bool'>
+ cls._field_definitions[key] = AbstractDTO.resolve_model_type(definition)
+ return cls._field_definitions
+
+ @classmethod
+ def _convert_to_type(cls, value: str, field_definition: FieldDefinition) -> Any:
+ """Convert the value to the expected type.
+
+ Args:
+ value: the value of the cache-control directive
+ field_definition: the field definition for the value to convert
+
+ Returns:
+ The value converted to the expected type
+ """
+ # bool values shouldn't be initiated since they should have been caught earlier in the from_header method and
+ # set with a value of True
+ expected_type = field_definition.raw
+ if expected_type is bool:
+ raise ImproperlyConfiguredException("Invalid cache-control header value")
+ return expected_type(value)
+
+
+@dataclass
+class ETag(Header):
+ """An ``etag`` header."""
+
+ HEADER_NAME: ClassVar[str] = "etag"
+
+ weak: bool = False
+ value: Optional[str] = None # only ASCII characters
+
+ def _get_header_value(self) -> str:
+ value = f'"{self.value}"'
+ return f"W/{value}" if self.weak else value
+
+ @classmethod
+ def from_header(cls, header_value: str) -> "ETag":
+ """Construct an ``etag`` header from its string representation.
+
+ Note that this will unquote etag-values
+ """
+ match = ETAG_RE.match(header_value)
+ if not match:
+ raise ImproperlyConfiguredException
+ weak, value = match.group(1, 2)
+ try:
+ return cls(weak=bool(weak), value=value)
+ except ValueError as exc:
+ raise ImproperlyConfiguredException from exc
+
+ def __post_init__(self) -> None:
+ if self.documentation_only is False and self.value is None:
+ raise ValidationException("value must be set if documentation_only is false")
+ if self.value and not PRINTABLE_ASCII_RE.fullmatch(self.value):
+ raise ValidationException("value must only contain ASCII printable characters")
+
+
+class MediaTypeHeader:
+ """A helper class for ``Accept`` header parsing."""
+
+ __slots__ = ("maintype", "subtype", "params", "_params_str")
+
+ def __init__(self, type_str: str) -> None:
+ # preserve the original parameters, because the order might be
+ # changed in the dict
+ self._params_str = "".join(type_str.partition(";")[1:])
+
+ full_type, self.params = parse_content_header(type_str)
+ self.maintype, _, self.subtype = full_type.partition("/")
+
+ def __str__(self) -> str:
+ return f"{self.maintype}/{self.subtype}{self._params_str}"
+
+ @property
+ def priority(self) -> Tuple[int, int]:
+ # Use fixed point values with two decimals to avoid problems
+ # when comparing float values
+ quality = 100
+ if "q" in self.params:
+ with suppress(ValueError):
+ quality = int(100 * float(self.params["q"]))
+
+ if self.maintype == "*":
+ specificity = 0
+ elif self.subtype == "*":
+ specificity = 1
+ elif not self.params or ("q" in self.params and len(self.params) == 1):
+ # no params or 'q' is the only one which we ignore
+ specificity = 2
+ else:
+ specificity = 3
+
+ return quality, specificity
+
+ def match(self, other: "MediaTypeHeader") -> bool:
+ return next(
+ (False for key, value in self.params.items() if key != "q" and value != other.params.get(key)),
+ False
+ if self.subtype != "*" and other.subtype != "*" and self.subtype != other.subtype
+ else self.maintype == "*" or other.maintype == "*" or self.maintype == other.maintype,
+ )
+
+
+class Accept:
+ """An ``Accept`` header."""
+
+ __slots__ = ("_accepted_types",)
+
+ def __init__(self, accept_value: str) -> None:
+ self._accepted_types = [MediaTypeHeader(t) for t in accept_value.split(",")]
+ self._accepted_types.sort(key=lambda t: t.priority, reverse=True)
+
+ def __len__(self) -> int:
+ return len(self._accepted_types)
+
+ def __getitem__(self, key: int) -> str:
+ return str(self._accepted_types[key])
+
+ def __iter__(self) -> Iterator[str]:
+ return map(str, self._accepted_types)
+
+ def best_match(self, provided_types: List[str], default: Optional[str] = None) -> Optional[str]:
+ """Find the best matching media type for the request.
+
+ Args:
+ provided_types: A list of media types that can be provided as a response. These types
+ can contain a wildcard ``*`` character in the main- or subtype part.
+ default: The media type that is returned if none of the provided types match.
+
+ Returns:
+ The best matching media type. If the matching provided type contains wildcard characters,
+ they are replaced with the corresponding part of the accepted type. Otherwise the
+ provided type is returned as-is.
+ """
+ types = [MediaTypeHeader(t) for t in provided_types]
+
+ for accepted in self._accepted_types:
+ for provided in types:
+ if provided.match(accepted):
+ # Return the accepted type with wildcards replaced
+ # by concrete parts from the provided type
+ result = copy(provided)
+ if result.subtype == "*":
+ result.subtype = accepted.subtype
+ if result.maintype == "*":
+ result.maintype = accepted.maintype
+ return str(result)
+ return default
+
+ def accepts(self, media_type: str) -> bool:
+ """Check if the request accepts the specified media type.
+
+ If multiple media types can be provided, it is better to use :func:`best_match`.
+
+ Args:
+ media_type: The media type to check for.
+
+ Returns:
+ True if the request accepts ``media_type``.
+ """
+ return self.best_match([media_type]) == media_type
diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/multi_dicts.py b/venv/lib/python3.11/site-packages/litestar/datastructures/multi_dicts.py
new file mode 100644
index 0000000..7702e1a
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/datastructures/multi_dicts.py
@@ -0,0 +1,106 @@
+from __future__ import annotations
+
+from abc import ABC
+from typing import TYPE_CHECKING, Any, Generator, Generic, Iterable, Mapping, TypeVar
+
+from multidict import MultiDict as BaseMultiDict
+from multidict import MultiDictProxy, MultiMapping
+
+from litestar.datastructures.upload_file import UploadFile
+
+if TYPE_CHECKING:
+ from typing_extensions import Self
+
+
+__all__ = ("FormMultiDict", "ImmutableMultiDict", "MultiDict", "MultiMixin")
+
+
+T = TypeVar("T")
+
+
+class MultiMixin(Generic[T], MultiMapping[T], ABC):
+ """Mixin providing common methods for multi dicts, used by :class:`ImmutableMultiDict` and :class:`MultiDict`"""
+
+ def dict(self) -> dict[str, list[Any]]:
+ """Return the multi-dict as a dict of lists.
+
+ Returns:
+ A dict of lists
+ """
+ return {k: self.getall(k) for k in set(self.keys())}
+
+ def multi_items(self) -> Generator[tuple[str, T], None, None]:
+ """Get all keys and values, including duplicates.
+
+ Returns:
+ A list of tuples containing key-value pairs
+ """
+ for key in set(self):
+ for value in self.getall(key):
+ yield key, value
+
+
+class MultiDict(BaseMultiDict[T], MultiMixin[T], Generic[T]):
+ """MultiDict, using :class:`MultiDict <multidict.MultiDictProxy>`."""
+
+ def __init__(self, args: MultiMapping | Mapping[str, T] | Iterable[tuple[str, T]] | None = None) -> None:
+ """Initialize ``MultiDict`` from a`MultiMapping``,
+ :class:`Mapping <typing.Mapping>` or an iterable of tuples.
+
+ Args:
+ args: Mapping-like structure to create the ``MultiDict`` from
+ """
+ super().__init__(args or {})
+
+ def immutable(self) -> ImmutableMultiDict[T]:
+ """Create an.
+
+ :class:`ImmutableMultiDict` view.
+
+ Returns:
+ An immutable multi dict
+ """
+ return ImmutableMultiDict[T](self) # pyright: ignore
+
+ def copy(self) -> Self:
+ """Return a shallow copy"""
+ return type(self)(list(self.multi_items()))
+
+
+class ImmutableMultiDict(MultiDictProxy[T], MultiMixin[T], Generic[T]):
+ """Immutable MultiDict, using class:`MultiDictProxy <multidict.MultiDictProxy>`."""
+
+ def __init__(self, args: MultiMapping | Mapping[str, Any] | Iterable[tuple[str, Any]] | None = None) -> None:
+ """Initialize ``ImmutableMultiDict`` from a `MultiMapping``,
+ :class:`Mapping <typing.Mapping>` or an iterable of tuples.
+
+ Args:
+ args: Mapping-like structure to create the ``ImmutableMultiDict`` from
+ """
+ super().__init__(BaseMultiDict(args or {}))
+
+ def mutable_copy(self) -> MultiDict[T]:
+ """Create a mutable copy as a :class:`MultiDict`
+
+ Returns:
+ A mutable multi dict
+ """
+ return MultiDict(list(self.multi_items()))
+
+ def copy(self) -> Self: # type: ignore[override]
+ """Return a shallow copy"""
+ return type(self)(self.items())
+
+
+class FormMultiDict(ImmutableMultiDict[Any]):
+ """MultiDict for form data."""
+
+ async def close(self) -> None:
+ """Close all files in the multi-dict.
+
+ Returns:
+ None
+ """
+ for _, value in self.multi_items():
+ if isinstance(value, UploadFile):
+ await value.close()
diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/response_header.py b/venv/lib/python3.11/site-packages/litestar/datastructures/response_header.py
new file mode 100644
index 0000000..f781d0c
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/datastructures/response_header.py
@@ -0,0 +1,125 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any
+
+from litestar.exceptions import ImproperlyConfiguredException
+
+if TYPE_CHECKING:
+ from litestar.openapi.spec import Example
+
+__all__ = ("ResponseHeader",)
+
+
+@dataclass
+class ResponseHeader:
+ """Container type for a response header."""
+
+ name: str
+ """Header name"""
+
+ documentation_only: bool = False
+ """Defines the ResponseHeader instance as for OpenAPI documentation purpose only."""
+
+ value: str | None = None
+ """Value to set for the response header."""
+
+ description: str | None = None
+ """A brief description of the parameter. This could contain examples of
+ use.
+
+ [CommonMark syntax](https://spec.commonmark.org/) MAY be used for
+ rich text representation.
+ """
+
+ required: bool = False
+ """Determines whether this parameter is mandatory.
+
+ If the [parameter location](https://spec.openapis.org/oas/v3.1.0#parameterIn) is `"path"`, this property is **REQUIRED** and its value MUST be `true`.
+ Otherwise, the property MAY be included and its default value is `false`.
+ """
+
+ deprecated: bool = False
+ """Specifies that a parameter is deprecated and SHOULD be transitioned out
+ of usage.
+
+ Default value is `false`.
+ """
+
+ allow_empty_value: bool = False
+ """Sets the ability to pass empty-valued parameters. This is valid only for
+ `query` parameters and allows sending a parameter with an empty value.
+ Default value is `false`. If.
+
+ [style](https://spec.openapis.org/oas/v3.1.0#parameterStyle) is used, and if behavior is `n/a` (cannot be
+ serialized), the value of `allowEmptyValue` SHALL be ignored. Use of this property is NOT RECOMMENDED, as it is
+ likely to be removed in a later revision.
+
+ The rules for serialization of the parameter are specified in one of two ways.
+ For simpler scenarios, a [schema](https://spec.openapis.org/oas/v3.1.0#parameterSchema) and [style](https://spec.openapis.org/oas/v3.1.0#parameterStyle)
+ can describe the structure and syntax of the parameter.
+ """
+
+ style: str | None = None
+ """Describes how the parameter value will be serialized depending on the
+ type of the parameter value. Default values (based on value of `in`):
+
+ - for `query` - `form`;
+ - for `path` - `simple`;
+ - for `header` - `simple`;
+ - for `cookie` - `form`.
+ """
+
+ explode: bool | None = None
+ """When this is true, parameter values of type `array` or `object` generate
+ separate parameters for each value of the array or key-value pair of the
+ map.
+
+ For other types of parameters this property has no effect.
+ When [style](https://spec.openapis.org/oas/v3.1.0#parameterStyle) is `form`, the default value is `true`.
+ For all other styles, the default value is `false`.
+ """
+
+ allow_reserved: bool = False
+ """Determines whether the parameter value SHOULD allow reserved characters,
+ as defined by.
+
+ [RFC3986](https://tools.ietf.org/html/rfc3986#section-2.2) `:/?#[]@!$&'()*+,;=` to be included without percent-
+ encoding.
+
+ This property only applies to parameters with an `in` value of `query`. The default value is `false`.
+ """
+
+ example: Any | None = None
+ """Example of the parameter's potential value.
+
+ The example SHOULD match the specified schema and encoding
+ properties if present. The `example` field is mutually exclusive of
+ the `examples` field. Furthermore, if referencing a `schema` that
+ contains an example, the `example` value SHALL _override_ the
+ example provided by the schema. To represent examples of media types
+ that cannot naturally be represented in JSON or YAML, a string value
+ can contain the example with escaping where necessary.
+ """
+
+ examples: dict[str, Example] | None = None
+ """Examples of the parameter's potential value. Each example SHOULD contain
+ a value in the correct format as specified in the parameter encoding. The
+ `examples` field is mutually exclusive of the `example` field. Furthermore,
+ if referencing a `schema` that contains an example, the `examples` value
+ SHALL _override_ the example provided by the schema.
+
+ For more complex scenarios, the [content](https://spec.openapis.org/oas/v3.1.0#parameterContent) property
+ can define the media type and schema of the parameter.
+ A parameter MUST contain either a `schema` property, or a `content` property, but not both.
+ When `example` or `examples` are provided in conjunction with the `schema` object,
+ the example MUST follow the prescribed serialization strategy for the parameter.
+ """
+
+ def __post_init__(self) -> None:
+ """Ensure that either value is set or the instance is for documentation_only."""
+ if not self.documentation_only and self.value is None:
+ raise ImproperlyConfiguredException("value must be set if documentation_only is false")
+
+ def __hash__(self) -> int:
+ return hash(self.name)
diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/state.py b/venv/lib/python3.11/site-packages/litestar/datastructures/state.py
new file mode 100644
index 0000000..71980e0
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/datastructures/state.py
@@ -0,0 +1,313 @@
+from __future__ import annotations
+
+from copy import copy, deepcopy
+from threading import RLock
+from typing import TYPE_CHECKING, Any, Callable, Generator, Iterable, Iterator, Mapping, MutableMapping
+
+if TYPE_CHECKING:
+ from typing_extensions import Self
+
+__all__ = ("ImmutableState", "State")
+
+
+class ImmutableState(Mapping[str, Any]):
+ """An object meant to store arbitrary state.
+
+ It can be accessed using dot notation while exposing dict like functionalities.
+ """
+
+ __slots__ = (
+ "_state",
+ "_deep_copy",
+ )
+
+ _state: dict[str, Any]
+
+ def __init__(
+ self, state: ImmutableState | Mapping[str, Any] | Iterable[tuple[str, Any]], deep_copy: bool = True
+ ) -> None:
+ """Initialize an ``ImmutableState`` instance.
+
+ Args:
+ state: An object to initialize the state from. Can be a dict, an instance of :class:`ImmutableState`, or a tuple
+ of key value paris.
+ deep_copy: Whether to 'deepcopy' the passed in state.
+
+ Examples:
+ .. code-block:: python
+
+ from litestar.datastructures import ImmutableState
+
+ state_dict = {"first": 1, "second": 2, "third": 3, "fourth": 4}
+ state = ImmutableState(state_dict)
+
+ # state implements the Mapping type:
+ assert len(state) == 3
+ assert "first" in state
+ assert not "fourth" in state
+ assert state["first"] == 1
+ assert [(k, v) for k, v in state.items()] == [("first", 1), ("second", 2), ("third", 3)]
+
+ # state implements __bool__
+ assert state # state is true when it has values.
+ assert not State() # state is empty when it has no values.
+
+ # it has a 'dict' method to retrieve a shallow copy of the underlying dict
+ inner_dict = state.dict()
+ assert inner_dict == state_dict
+
+ # you can also retrieve a mutable State by calling 'mutable_copy'
+ mutable_state = state.mutable_copy()
+ del state["first"]
+ assert "first" not in state
+
+ """
+ if isinstance(state, ImmutableState):
+ state = state._state
+
+ if not isinstance(state, dict) and isinstance(state, Iterable):
+ state = dict(state)
+
+ super().__setattr__("_deep_copy", deep_copy)
+ super().__setattr__("_state", deepcopy(state) if deep_copy else state)
+
+ def __bool__(self) -> bool:
+ """Return a boolean indicating whether the wrapped dict instance has values."""
+ return bool(self._state)
+
+ def __getitem__(self, key: str) -> Any:
+ """Get the value for the corresponding key from the wrapped state object using subscription notation.
+
+ Args:
+ key: Key to access.
+
+ Raises:
+ KeyError
+
+ Returns:
+ A value from the wrapped state instance.
+ """
+ return self._state[key]
+
+ def __iter__(self) -> Iterator[str]:
+ """Return an iterator iterating the wrapped state dict.
+
+ Returns:
+ An iterator of strings
+ """
+ return iter(self._state)
+
+ def __len__(self) -> int:
+ """Return length of the wrapped state dict.
+
+ Returns:
+ An integer
+ """
+ return len(self._state)
+
+ def __getattr__(self, key: str) -> Any:
+ """Get the value for the corresponding key from the wrapped state object using attribute notation.
+
+ Args:
+ key: Key to retrieve
+
+ Raises:
+ AttributeError: if the given attribute is not set.
+
+ Returns:
+ The retrieved value
+ """
+ try:
+ return self._state[key]
+ except KeyError as e:
+ raise AttributeError from e
+
+ def __copy__(self) -> Self:
+ """Return a shallow copy of the given state object.
+
+ Customizes how the builtin "copy" function will work.
+ """
+ return self.__class__(self._state, deep_copy=self._deep_copy) # pyright: ignore
+
+ def mutable_copy(self) -> State:
+ """Return a mutable copy of the state object.
+
+ Returns:
+ A ``State``
+ """
+ return State(self._state, deep_copy=self._deep_copy)
+
+ def dict(self) -> dict[str, Any]:
+ """Return a shallow copy of the wrapped dict.
+
+ Returns:
+ A dict
+ """
+ return copy(self._state)
+
+ @classmethod
+ def __get_validators__(
+ cls,
+ ) -> Generator[Callable[[ImmutableState | dict[str, Any] | Iterable[tuple[str, Any]]], ImmutableState], None, None]: # type: ignore[valid-type]
+ """Pydantic compatible method to allow custom parsing of state instances in a SignatureModel."""
+ yield cls.validate
+
+ @classmethod
+ def validate(cls, value: ImmutableState | dict[str, Any] | Iterable[tuple[str, Any]]) -> Self: # type: ignore[valid-type]
+ """Parse a value and instantiate state inside a SignatureModel. This allows us to use custom subclasses of
+ state, as well as allows users to decide whether state is mutable or immutable.
+
+ Args:
+ value: The value from which to initialize the state instance.
+
+ Returns:
+ An ImmutableState instance
+ """
+ deep_copy = value._deep_copy if isinstance(value, ImmutableState) else False
+ return cls(value, deep_copy=deep_copy)
+
+
+class State(ImmutableState, MutableMapping[str, Any]):
+ """An object meant to store arbitrary state.
+
+ It can be accessed using dot notation while exposing dict like functionalities.
+ """
+
+ __slots__ = ("_lock",)
+
+ _lock: RLock
+
+ def __init__(
+ self,
+ state: ImmutableState | Mapping[str, Any] | Iterable[tuple[str, Any]] | None = None,
+ deep_copy: bool = False,
+ ) -> None:
+ """Initialize a ``State`` instance with an optional value.
+
+ Args:
+ state: An object to initialize the state from. Can be a dict, an instance of 'ImmutableState', or a tuple of key value paris.
+ deep_copy: Whether to 'deepcopy' the passed in state.
+
+ .. code-block:: python
+ :caption: Examples
+
+ from litestar.datastructures import State
+
+ state_dict = {"first": 1, "second": 2, "third": 3, "fourth": 4}
+ state = State(state_dict)
+
+ # state can be accessed using '.' notation
+ assert state.fourth == 4
+ del state.fourth
+
+ # state implements the Mapping type:
+ assert len(state) == 3
+ assert "first" in state
+ assert not "fourth" in state
+ assert state["first"] == 1
+ assert [(k, v) for k, v in state.items()] == [("first", 1), ("second", 2), ("third", 3)]
+
+ state["fourth"] = 4
+ assert "fourth" in state
+ del state["fourth"]
+
+ # state implements __bool__
+ assert state # state is true when it has values.
+ assert not State() # state is empty when it has no values.
+
+ # it has shallow copy
+ copied_state = state.copy()
+ del copied_state.first
+ assert state.first
+
+ # it has a 'dict' method to retrieve a shallow copy of the underlying dict
+ inner_dict = state.dict()
+ assert inner_dict == state_dict
+
+ # you can get an immutable copy of the state by calling 'immutable_immutable_copy'
+ immutable_copy = state.immutable_copy()
+ del immutable_copy.first # raises AttributeError
+
+ """
+
+ super().__init__(state if state is not None else {}, deep_copy=deep_copy)
+ super().__setattr__("_lock", RLock())
+
+ def __delitem__(self, key: str) -> None:
+ """Delete the value from the key from the wrapped state object using subscription notation.
+
+ Args:
+ key: Key to delete
+
+ Raises:
+ KeyError: if the given attribute is not set.
+
+ Returns:
+ None
+ """
+
+ with self._lock:
+ del self._state[key]
+
+ def __setitem__(self, key: str, value: Any) -> None:
+ """Set an item in the state using subscription notation.
+
+ Args:
+ key: Key to set.
+ value: Value to set.
+
+ Returns:
+ None
+ """
+
+ with self._lock:
+ self._state[key] = value
+
+ def __setattr__(self, key: str, value: Any) -> None:
+ """Set an item in the state using attribute notation.
+
+ Args:
+ key: Key to set.
+ value: Value to set.
+
+ Returns:
+ None
+ """
+
+ with self._lock:
+ self._state[key] = value
+
+ def __delattr__(self, key: str) -> None:
+ """Delete the value from the key from the wrapped state object using attribute notation.
+
+ Args:
+ key: Key to delete
+
+ Raises:
+ AttributeError: if the given attribute is not set.
+
+ Returns:
+ None
+ """
+
+ try:
+ with self._lock:
+ del self._state[key]
+ except KeyError as e:
+ raise AttributeError from e
+
+ def copy(self) -> Self:
+ """Return a shallow copy of the state object.
+
+ Returns:
+ A ``State``
+ """
+ return self.__class__(self.dict(), deep_copy=self._deep_copy) # pyright: ignore
+
+ def immutable_copy(self) -> ImmutableState:
+ """Return a shallow copy of the state object, setting it to be frozen.
+
+ Returns:
+ A ``State``
+ """
+ return ImmutableState(self, deep_copy=self._deep_copy)
diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/upload_file.py b/venv/lib/python3.11/site-packages/litestar/datastructures/upload_file.py
new file mode 100644
index 0000000..09ad2d3
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/datastructures/upload_file.py
@@ -0,0 +1,101 @@
+from __future__ import annotations
+
+from tempfile import SpooledTemporaryFile
+
+from litestar.concurrency import sync_to_thread
+from litestar.constants import ONE_MEGABYTE
+
+__all__ = ("UploadFile",)
+
+
+class UploadFile:
+ """Representation of a file upload"""
+
+ __slots__ = ("filename", "file", "content_type", "headers")
+
+ def __init__(
+ self,
+ content_type: str,
+ filename: str,
+ file_data: bytes | None = None,
+ headers: dict[str, str] | None = None,
+ max_spool_size: int = ONE_MEGABYTE,
+ ) -> None:
+ """Upload file in-memory container.
+
+ Args:
+ content_type: Content type for the file.
+ filename: The filename.
+ file_data: File data.
+ headers: Any attached headers.
+ max_spool_size: The size above which the temporary file will be rolled to disk.
+ """
+ self.filename = filename
+ self.content_type = content_type
+ self.file = SpooledTemporaryFile(max_size=max_spool_size)
+ self.headers = headers or {}
+
+ if file_data:
+ self.file.write(file_data)
+ self.file.seek(0)
+
+ @property
+ def rolled_to_disk(self) -> bool:
+ """Determine whether the spooled file exceeded the rolled-to-disk threshold and is no longer in memory.
+
+ Returns:
+ A boolean flag
+ """
+ return getattr(self.file, "_rolled", False)
+
+ async def write(self, data: bytes) -> int:
+ """Proxy for data writing.
+
+ Args:
+ data: Byte string to write.
+
+ Returns:
+ None
+ """
+ if self.rolled_to_disk:
+ return await sync_to_thread(self.file.write, data)
+ return self.file.write(data)
+
+ async def read(self, size: int = -1) -> bytes:
+ """Proxy for data reading.
+
+ Args:
+ size: position from which to read.
+
+ Returns:
+ Byte string.
+ """
+ if self.rolled_to_disk:
+ return await sync_to_thread(self.file.read, size)
+ return self.file.read(size)
+
+ async def seek(self, offset: int) -> int:
+ """Async proxy for file seek.
+
+ Args:
+ offset: start position..
+
+ Returns:
+ None.
+ """
+ if self.rolled_to_disk:
+ return await sync_to_thread(self.file.seek, offset)
+ return self.file.seek(offset)
+
+ async def close(self) -> None:
+ """Async proxy for file close.
+
+ Returns:
+ None.
+ """
+ if self.rolled_to_disk:
+ return await sync_to_thread(self.file.close)
+ return self.file.close()
+
+ def __repr__(self) -> str:
+ return f"{self.filename} - {self.content_type}"
diff --git a/venv/lib/python3.11/site-packages/litestar/datastructures/url.py b/venv/lib/python3.11/site-packages/litestar/datastructures/url.py
new file mode 100644
index 0000000..f3441d0
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/datastructures/url.py
@@ -0,0 +1,262 @@
+from __future__ import annotations
+
+from functools import lru_cache
+from typing import TYPE_CHECKING, Any, NamedTuple
+from urllib.parse import SplitResult, urlencode, urlsplit, urlunsplit
+
+from litestar._parsers import parse_query_string
+from litestar.datastructures import MultiDict
+from litestar.types import Empty
+
+if TYPE_CHECKING:
+ from typing_extensions import Self
+
+ from litestar.types import EmptyType, Scope
+
+__all__ = ("Address", "URL")
+
+_DEFAULT_SCHEME_PORTS = {"http": 80, "https": 443, "ftp": 21, "ws": 80, "wss": 443}
+
+
+class Address(NamedTuple):
+ """Just a network address."""
+
+ host: str
+ """Address host."""
+ port: int
+ """Address port."""
+
+
+def make_absolute_url(path: str | URL, base: str | URL) -> str:
+ """Create an absolute URL.
+
+ Args:
+ path: URL path to make absolute
+ base: URL to use as a base
+
+ Returns:
+ A string representing the new, absolute URL
+ """
+ url = base if isinstance(base, URL) else URL(base)
+ netloc = url.netloc
+ path = url.path.rstrip("/") + str(path)
+ return str(URL.from_components(scheme=url.scheme, netloc=netloc, path=path))
+
+
+class URL:
+ """Representation and modification utilities of a URL."""
+
+ __slots__ = (
+ "_query_params",
+ "_parsed_url",
+ "fragment",
+ "hostname",
+ "netloc",
+ "password",
+ "path",
+ "port",
+ "query",
+ "scheme",
+ "username",
+ )
+
+ _query_params: EmptyType | MultiDict
+ _parsed_url: str | None
+
+ scheme: str
+ """URL scheme."""
+ netloc: str
+ """Network location."""
+ path: str
+ """Hierarchical path."""
+ fragment: str
+ """Fragment component."""
+ query: str
+ """Query string."""
+ username: str | None
+ """Username if specified."""
+ password: str | None
+ """Password if specified."""
+ port: int | None
+ """Port if specified."""
+ hostname: str | None
+ """Hostname if specified."""
+
+ def __new__(cls, url: str | SplitResult) -> URL:
+ """Create a new instance.
+
+ Args:
+ url: url string or split result to represent.
+ """
+ return cls._new(url=url)
+
+ @classmethod
+ @lru_cache
+ def _new(cls, url: str | SplitResult) -> URL:
+ instance = super().__new__(cls)
+ instance._parsed_url = None
+
+ if isinstance(url, str):
+ result = urlsplit(url)
+ instance._parsed_url = url
+ else:
+ result = url
+
+ instance.scheme = result.scheme
+ instance.netloc = result.netloc
+ instance.path = result.path
+ instance.fragment = result.fragment
+ instance.query = result.query
+ instance.username = result.username
+ instance.password = result.password
+ instance.port = result.port
+ instance.hostname = result.hostname
+ instance._query_params = Empty
+
+ return instance
+
+ @property
+ def _url(self) -> str:
+ if not self._parsed_url:
+ self._parsed_url = str(
+ urlunsplit(
+ SplitResult(
+ scheme=self.scheme,
+ netloc=self.netloc,
+ path=self.path,
+ fragment=self.fragment,
+ query=self.query,
+ )
+ )
+ )
+ return self._parsed_url
+
+ @classmethod
+ @lru_cache
+ def from_components(
+ cls,
+ scheme: str = "",
+ netloc: str = "",
+ path: str = "",
+ fragment: str = "",
+ query: str = "",
+ ) -> Self:
+ """Create a new URL from components.
+
+ Args:
+ scheme: URL scheme
+ netloc: Network location
+ path: Hierarchical path
+ query: Query component
+ fragment: Fragment identifier
+
+ Returns:
+ A new URL with the given components
+ """
+ return cls(
+ SplitResult(
+ scheme=scheme,
+ netloc=netloc,
+ path=path,
+ fragment=fragment,
+ query=query,
+ )
+ )
+
+ @classmethod
+ def from_scope(cls, scope: Scope) -> Self:
+ """Construct a URL from a :class:`Scope <.types.Scope>`
+
+ Args:
+ scope: A scope
+
+ Returns:
+ A URL
+ """
+ scheme = scope.get("scheme", "http")
+ server = scope.get("server")
+ path = scope.get("root_path", "") + scope["path"]
+ query_string = scope.get("query_string", b"")
+
+ # we use iteration here because it's faster, and headers might not yet be cached
+ host = next(
+ (
+ header_value.decode("latin-1")
+ for header_name, header_value in scope.get("headers", [])
+ if header_name == b"host"
+ ),
+ "",
+ )
+ if server and not host:
+ host, port = server
+ default_port = _DEFAULT_SCHEME_PORTS[scheme]
+ if port != default_port:
+ host = f"{host}:{port}"
+
+ return cls.from_components(
+ scheme=scheme if server else "",
+ query=query_string.decode(),
+ netloc=host,
+ path=path,
+ )
+
+ def with_replacements(
+ self,
+ scheme: str = "",
+ netloc: str = "",
+ path: str = "",
+ query: str | MultiDict | None | EmptyType = Empty,
+ fragment: str = "",
+ ) -> Self:
+ """Create a new URL, replacing the given components.
+
+ Args:
+ scheme: URL scheme
+ netloc: Network location
+ path: Hierarchical path
+ query: Raw query string
+ fragment: Fragment identifier
+
+ Returns:
+ A new URL with the given components replaced
+ """
+ if isinstance(query, MultiDict):
+ query = urlencode(query=query)
+
+ query = (query if query is not Empty else self.query) or ""
+
+ return type(self).from_components(
+ scheme=scheme or self.scheme,
+ netloc=netloc or self.netloc,
+ path=path or self.path,
+ query=query,
+ fragment=fragment or self.fragment,
+ )
+
+ @property
+ def query_params(self) -> MultiDict:
+ """Query parameters of a URL as a :class:`MultiDict <.datastructures.multi_dicts.MultiDict>`
+
+ Returns:
+ A :class:`MultiDict <.datastructures.multi_dicts.MultiDict>` with query parameters
+
+ Notes:
+ - The returned ``MultiDict`` is mutable, :class:`URL` itself is *immutable*,
+ therefore mutating the query parameters will not directly mutate the ``URL``.
+ If you want to modify query parameters, make modifications in the
+ multidict and pass them back to :meth:`with_replacements`
+ """
+ if self._query_params is Empty:
+ self._query_params = MultiDict(parse_query_string(query_string=self.query.encode()))
+ return self._query_params
+
+ def __str__(self) -> str:
+ return self._url
+
+ def __eq__(self, other: Any) -> bool:
+ if isinstance(other, (str, URL)):
+ return str(self) == str(other)
+ return NotImplemented # pragma: no cover
+
+ def __repr__(self) -> str:
+ return f"{type(self).__name__}({self._url!r})"