diff options
author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 |
---|---|---|
committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 |
commit | 6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch) | |
tree | b1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/litestar/utils | |
parent | 4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff) |
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/utils')
34 files changed, 1925 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/utils/__init__.py b/venv/lib/python3.11/site-packages/litestar/utils/__init__.py new file mode 100644 index 0000000..3f62792 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/__init__.py @@ -0,0 +1,86 @@ +from typing import Any + +from litestar.utils.deprecation import deprecated, warn_deprecation + +from .helpers import get_enum_string_value, get_name, unique_name_for_scope, url_quote +from .path import join_paths, normalize_path +from .predicates import ( + _is_sync_or_async_generator, + is_annotated_type, + is_any, + is_async_callable, + is_attrs_class, + is_class_and_subclass, + is_class_var, + is_dataclass_class, + is_dataclass_instance, + is_generic, + is_mapping, + is_non_string_iterable, + is_non_string_sequence, + is_optional_union, + is_undefined_sentinel, + is_union, +) +from .scope import ( # type: ignore[attr-defined] + _delete_litestar_scope_state, + _get_litestar_scope_state, + _set_litestar_scope_state, + get_serializer_from_scope, +) +from .sequence import find_index, unique +from .sync import AsyncIteratorWrapper, ensure_async_callable +from .typing import get_origin_or_inner_type, make_non_optional_union + +__all__ = ( + "ensure_async_callable", + "AsyncIteratorWrapper", + "deprecated", + "find_index", + "get_enum_string_value", + "get_name", + "get_origin_or_inner_type", + "get_serializer_from_scope", + "is_annotated_type", + "is_any", + "is_async_callable", + "is_attrs_class", + "is_class_and_subclass", + "is_class_var", + "is_dataclass_class", + "is_dataclass_instance", + "is_generic", + "is_mapping", + "is_non_string_iterable", + "is_non_string_sequence", + "is_optional_union", + "is_undefined_sentinel", + "is_union", + "join_paths", + "make_non_optional_union", + "normalize_path", + "unique", + "unique_name_for_scope", + "url_quote", + "warn_deprecation", +) + +_deprecated_names = { + "get_litestar_scope_state": _get_litestar_scope_state, + "set_litestar_scope_state": _set_litestar_scope_state, + "delete_litestar_scope_state": _delete_litestar_scope_state, + "is_sync_or_async_generator": _is_sync_or_async_generator, +} + + +def __getattr__(name: str) -> Any: + if name in _deprecated_names: + warn_deprecation( + deprecated_name=f"litestar.utils.{name}", + version="2.4", + kind="import", + removal_in="3.0", + info=f"'litestar.utils.{name}' is deprecated.", + ) + return globals()["_deprecated_names"][name] + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") # pragma: no cover diff --git a/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..da9ad02 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/compat.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/compat.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..39d39d6 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/compat.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/dataclass.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/dataclass.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..addf140 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/dataclass.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/deprecation.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/deprecation.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..ddf3b7c --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/deprecation.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/empty.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/empty.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..2b358ad --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/empty.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/helpers.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/helpers.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..9db59c4 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/helpers.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/module_loader.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/module_loader.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..f32ca7e --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/module_loader.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/path.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/path.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..6602b02 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/path.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/predicates.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/predicates.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..983250b --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/predicates.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/sequence.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/sequence.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..b2f8c1a --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/sequence.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/signature.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/signature.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..d81050f --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/signature.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/sync.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/sync.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..8a18dfb --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/sync.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/typing.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/typing.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..53db129 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/typing.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/version.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/version.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..f656ef4 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/version.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/warnings.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/warnings.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..3fa1cf1 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/__pycache__/warnings.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/compat.py b/venv/lib/python3.11/site-packages/litestar/utils/compat.py new file mode 100644 index 0000000..384db76 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/compat.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, TypeVar + +from litestar.types import Empty, EmptyType + +__all__ = ("async_next",) + + +if TYPE_CHECKING: + from typing import Any, AsyncGenerator + +T = TypeVar("T") +D = TypeVar("D") + +try: + async_next = anext # type: ignore[name-defined] +except NameError: + + async def async_next(gen: AsyncGenerator[T, Any], default: D | EmptyType = Empty) -> T | D: + """Backwards compatibility shim for Python<3.10.""" + try: + return await gen.__anext__() + except StopAsyncIteration as exc: + if default is not Empty: + return default + raise exc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/dataclass.py b/venv/lib/python3.11/site-packages/litestar/utils/dataclass.py new file mode 100644 index 0000000..597465d --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/dataclass.py @@ -0,0 +1,115 @@ +from __future__ import annotations + +from dataclasses import Field, fields +from typing import TYPE_CHECKING + +from litestar.types import Empty +from litestar.utils.predicates import is_dataclass_instance + +if TYPE_CHECKING: + from typing import AbstractSet, Any, Iterable + + from litestar.types.protocols import DataclassProtocol + +__all__ = ( + "extract_dataclass_fields", + "extract_dataclass_items", + "simple_asdict", +) + + +def extract_dataclass_fields( + dt: DataclassProtocol, + exclude_none: bool = False, + exclude_empty: bool = False, + include: AbstractSet[str] | None = None, + exclude: AbstractSet[str] | None = None, +) -> tuple[Field[Any], ...]: + """Extract dataclass fields. + + Args: + dt: A dataclass instance. + exclude_none: Whether to exclude None values. + exclude_empty: Whether to exclude Empty values. + include: An iterable of fields to include. + exclude: An iterable of fields to exclude. + + + Returns: + A tuple of dataclass fields. + """ + include = include or set() + exclude = exclude or set() + + if common := (include & exclude): + raise ValueError(f"Fields {common} are both included and excluded.") + + dataclass_fields: Iterable[Field[Any]] = fields(dt) + if exclude_none: + dataclass_fields = (field for field in dataclass_fields if getattr(dt, field.name) is not None) + if exclude_empty: + dataclass_fields = (field for field in dataclass_fields if getattr(dt, field.name) is not Empty) + if include: + dataclass_fields = (field for field in dataclass_fields if field.name in include) + if exclude: + dataclass_fields = (field for field in dataclass_fields if field.name not in exclude) + + return tuple(dataclass_fields) + + +def extract_dataclass_items( + dt: DataclassProtocol, + exclude_none: bool = False, + exclude_empty: bool = False, + include: AbstractSet[str] | None = None, + exclude: AbstractSet[str] | None = None, +) -> tuple[tuple[str, Any], ...]: + """Extract dataclass name, value pairs. + + Unlike the 'asdict' method exports by the stlib, this function does not pickle values. + + Args: + dt: A dataclass instance. + exclude_none: Whether to exclude None values. + exclude_empty: Whether to exclude Empty values. + include: An iterable of fields to include. + exclude: An iterable of fields to exclude. + + Returns: + A tuple of key/value pairs. + """ + dataclass_fields = extract_dataclass_fields(dt, exclude_none, exclude_empty, include, exclude) + return tuple((field.name, getattr(dt, field.name)) for field in dataclass_fields) + + +def simple_asdict( + obj: DataclassProtocol, + exclude_none: bool = False, + exclude_empty: bool = False, + convert_nested: bool = True, + exclude: set[str] | None = None, +) -> dict[str, Any]: + """Convert a dataclass to a dictionary. + + This method has important differences to the standard library version: + - it does not deepcopy values + - it does not recurse into collections + + Args: + obj: A dataclass instance. + exclude_none: Whether to exclude None values. + exclude_empty: Whether to exclude Empty values. + convert_nested: Whether to recursively convert nested dataclasses. + exclude: An iterable of fields to exclude. + + Returns: + A dictionary of key/value pairs. + """ + ret = {} + for field in extract_dataclass_fields(obj, exclude_none, exclude_empty, exclude=exclude): + value = getattr(obj, field.name) + if is_dataclass_instance(value) and convert_nested: + ret[field.name] = simple_asdict(value, exclude_none, exclude_empty) + else: + ret[field.name] = getattr(obj, field.name) + return ret diff --git a/venv/lib/python3.11/site-packages/litestar/utils/deprecation.py b/venv/lib/python3.11/site-packages/litestar/utils/deprecation.py new file mode 100644 index 0000000..b1b8725 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/deprecation.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +import inspect +from functools import wraps +from typing import Callable, Literal, TypeVar +from warnings import warn + +from typing_extensions import ParamSpec + +__all__ = ("deprecated", "warn_deprecation") + + +T = TypeVar("T") +P = ParamSpec("P") +DeprecatedKind = Literal["function", "method", "classmethod", "attribute", "property", "class", "parameter", "import"] + + +def warn_deprecation( + version: str, + deprecated_name: str, + kind: DeprecatedKind, + *, + removal_in: str | None = None, + alternative: str | None = None, + info: str | None = None, + pending: bool = False, +) -> None: + """Warn about a call to a (soon to be) deprecated function. + + Args: + version: Litestar version where the deprecation will occur + deprecated_name: Name of the deprecated function + removal_in: Litestar version where the deprecated function will be removed + alternative: Name of a function that should be used instead + info: Additional information + pending: Use ``PendingDeprecationWarning`` instead of ``DeprecationWarning`` + kind: Type of the deprecated thing + """ + parts = [] + + if kind == "import": + access_type = "Import of" + elif kind in {"function", "method"}: + access_type = "Call to" + else: + access_type = "Use of" + + if pending: + parts.append(f"{access_type} {kind} awaiting deprecation {deprecated_name!r}") + else: + parts.append(f"{access_type} deprecated {kind} {deprecated_name!r}") + + parts.extend( + ( + f"Deprecated in litestar {version}", + f"This {kind} will be removed in {removal_in or 'the next major version'}", + ) + ) + if alternative: + parts.append(f"Use {alternative!r} instead") + + if info: + parts.append(info) + + text = ". ".join(parts) + warning_class = PendingDeprecationWarning if pending else DeprecationWarning + + warn(text, warning_class, stacklevel=2) + + +def deprecated( + version: str, + *, + removal_in: str | None = None, + alternative: str | None = None, + info: str | None = None, + pending: bool = False, + kind: Literal["function", "method", "classmethod", "property"] | None = None, +) -> Callable[[Callable[P, T]], Callable[P, T]]: + """Create a decorator wrapping a function, method or property with a warning call about a (pending) deprecation. + + Args: + version: Litestar version where the deprecation will occur + removal_in: Litestar version where the deprecated function will be removed + alternative: Name of a function that should be used instead + info: Additional information + pending: Use ``PendingDeprecationWarning`` instead of ``DeprecationWarning`` + kind: Type of the deprecated callable. If ``None``, will use ``inspect`` to figure + out if it's a function or method + + Returns: + A decorator wrapping the function call with a warning + """ + + def decorator(func: Callable[P, T]) -> Callable[P, T]: + @wraps(func) + def wrapped(*args: P.args, **kwargs: P.kwargs) -> T: + warn_deprecation( + version=version, + deprecated_name=func.__name__, + info=info, + alternative=alternative, + pending=pending, + removal_in=removal_in, + kind=kind or ("method" if inspect.ismethod(func) else "function"), + ) + return func(*args, **kwargs) + + return wrapped + + return decorator diff --git a/venv/lib/python3.11/site-packages/litestar/utils/empty.py b/venv/lib/python3.11/site-packages/litestar/utils/empty.py new file mode 100644 index 0000000..cdde871 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/empty.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, TypeVar + +from litestar.types.empty import Empty + +if TYPE_CHECKING: + from litestar.types.empty import EmptyType + +ValueT = TypeVar("ValueT") +DefaultT = TypeVar("DefaultT") + + +def value_or_default(value: ValueT | EmptyType, default: DefaultT) -> ValueT | DefaultT: + """Return `value` handling the case where it is empty. + + If `value` is `Empty`, `default` is returned. + + Args: + value: The value to check. + default: The default value to return if `value` is `Empty`. + + Returns: + The value or default value. + """ + return default if value is Empty else value diff --git a/venv/lib/python3.11/site-packages/litestar/utils/helpers.py b/venv/lib/python3.11/site-packages/litestar/utils/helpers.py new file mode 100644 index 0000000..c25fe35 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/helpers.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +from enum import Enum +from functools import partial +from typing import TYPE_CHECKING, TypeVar, cast +from urllib.parse import quote + +from litestar.utils.typing import get_origin_or_inner_type + +if TYPE_CHECKING: + from collections.abc import Container + + from litestar.types import MaybePartial + +__all__ = ( + "get_enum_string_value", + "get_name", + "unwrap_partial", + "url_quote", + "unique_name_for_scope", +) + +T = TypeVar("T") + + +def get_name(value: object) -> str: + """Get the ``__name__`` of an object. + + Args: + value: An arbitrary object. + + Returns: + A name string. + """ + + name = getattr(value, "__name__", None) + if name is not None: + return cast("str", name) + + # On Python 3.8 and 3.9, Foo[int] does not have the __name__ attribute. + if origin := get_origin_or_inner_type(value): + return cast("str", origin.__name__) + + return type(value).__name__ + + +def get_enum_string_value(value: Enum | str) -> str: + """Return the string value of a string enum. + + See: https://github.com/litestar-org/litestar/pull/633#issuecomment-1286519267 + + Args: + value: An enum or string. + + Returns: + A string. + """ + return value.value if isinstance(value, Enum) else value # type: ignore[no-any-return] + + +def unwrap_partial(value: MaybePartial[T]) -> T: + """Unwraps a partial, returning the underlying callable. + + Args: + value: A partial function. + + Returns: + Callable + """ + from litestar.utils.sync import AsyncCallable + + return cast("T", value.func if isinstance(value, (partial, AsyncCallable)) else value) + + +def url_quote(value: str | bytes) -> str: + """Quote a URL. + + Args: + value: A URL. + + Returns: + A quoted URL. + """ + return quote(value, safe="/#%[]=:;$&()+,!?*@'~") + + +def unique_name_for_scope(base_name: str, scope: Container[str]) -> str: + """Create a name derived from ``base_name`` that's unique within ``scope``""" + i = 0 + while True: + if (unique_name := f"{base_name}_{i}") not in scope: + return unique_name + i += 1 + + +def get_exception_group() -> type[BaseException]: + """Get the exception group class with version compatibility.""" + try: + return cast("type[BaseException]", ExceptionGroup) # type:ignore[name-defined] + except NameError: + from exceptiongroup import ExceptionGroup as _ExceptionGroup # pyright: ignore + + return cast("type[BaseException]", _ExceptionGroup) diff --git a/venv/lib/python3.11/site-packages/litestar/utils/module_loader.py b/venv/lib/python3.11/site-packages/litestar/utils/module_loader.py new file mode 100644 index 0000000..09dbf9f --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/module_loader.py @@ -0,0 +1,92 @@ +"""General utility functions.""" + +from __future__ import annotations + +import os.path +import sys +from importlib import import_module +from importlib.util import find_spec +from pathlib import Path +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from types import ModuleType + +__all__ = ( + "import_string", + "module_to_os_path", +) + + +def module_to_os_path(dotted_path: str = "app") -> Path: + """Find Module to OS Path. + + Return a path to the base directory of the project or the module + specified by `dotted_path`. + + Args: + dotted_path: The path to the module. Defaults to "app". + + Raises: + TypeError: The module could not be found. + + Returns: + Path: The path to the module. + """ + try: + if (src := find_spec(dotted_path)) is None: # pragma: no cover + raise TypeError(f"Couldn't find the path for {dotted_path}") + except ModuleNotFoundError as e: + raise TypeError(f"Couldn't find the path for {dotted_path}") from e + + return Path(str(src.origin).rsplit(os.path.sep + "__init__.py", maxsplit=1)[0]) + + +def import_string(dotted_path: str) -> Any: + """Dotted Path Import. + + Import a dotted module path and return the attribute/class designated by the + last name in the path. Raise ImportError if the import failed. + + Args: + dotted_path: The path of the module to import. + + Raises: + ImportError: Could not import the module. + + Returns: + object: The imported object. + """ + + def _is_loaded(module: ModuleType | None) -> bool: + spec = getattr(module, "__spec__", None) + initializing = getattr(spec, "_initializing", False) + return bool(module and spec and not initializing) + + def _cached_import(module_path: str, class_name: str) -> Any: + """Import and cache a class from a module. + + Args: + module_path: dotted path to module. + class_name: Class or function name. + + Returns: + object: The imported class or function + """ + # Check whether module is loaded and fully initialized. + module = sys.modules.get(module_path) + if not _is_loaded(module): + module = import_module(module_path) + return getattr(module, class_name) + + try: + module_path, class_name = dotted_path.rsplit(".", 1) + except ValueError as e: + msg = "%s doesn't look like a module path" + raise ImportError(msg, dotted_path) from e + + try: + return _cached_import(module_path, class_name) + except AttributeError as e: + msg = "Module '%s' does not define a '%s' attribute/class" + raise ImportError(msg, module_path, class_name) from e diff --git a/venv/lib/python3.11/site-packages/litestar/utils/path.py b/venv/lib/python3.11/site-packages/litestar/utils/path.py new file mode 100644 index 0000000..76b43af --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/path.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +import re +from typing import Iterable + +__all__ = ("join_paths", "normalize_path") + + +multi_slash_pattern = re.compile("//+") + + +def normalize_path(path: str) -> str: + """Normalize a given path by ensuring it starts with a slash and does not end with a slash. + + Args: + path: Path string + + Returns: + Path string + """ + path = path.strip("/") + path = f"/{path}" + return multi_slash_pattern.sub("/", path) + + +def join_paths(paths: Iterable[str]) -> str: + """Normalize and joins path fragments. + + Args: + paths: An iterable of path fragments. + + Returns: + A normalized joined path string. + """ + return normalize_path("/".join(paths)) diff --git a/venv/lib/python3.11/site-packages/litestar/utils/predicates.py b/venv/lib/python3.11/site-packages/litestar/utils/predicates.py new file mode 100644 index 0000000..11d5f79 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/predicates.py @@ -0,0 +1,339 @@ +from __future__ import annotations + +from asyncio import iscoroutinefunction +from collections import defaultdict, deque +from collections.abc import Iterable as CollectionsIterable +from dataclasses import is_dataclass +from inspect import isasyncgenfunction, isclass, isgeneratorfunction +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + ClassVar, + DefaultDict, + Deque, + Dict, + FrozenSet, + Generic, + Iterable, + List, + Mapping, + Optional, + Sequence, + Set, + Tuple, + TypeVar, +) + +from typing_extensions import ( + ParamSpec, + TypeGuard, + _AnnotatedAlias, + get_args, +) + +from litestar.constants import UNDEFINED_SENTINELS +from litestar.types import Empty +from litestar.types.builtin_types import NoneType, UnionTypes +from litestar.utils.deprecation import warn_deprecation +from litestar.utils.helpers import unwrap_partial +from litestar.utils.typing import get_origin_or_inner_type + +if TYPE_CHECKING: + from litestar.types.callable_types import AnyGenerator + from litestar.types.protocols import DataclassProtocol + +try: + import attrs +except ImportError: + attrs = Empty # type: ignore[assignment] + +__all__ = ( + "is_annotated_type", + "is_any", + "is_async_callable", + "is_attrs_class", + "is_class_and_subclass", + "is_class_var", + "is_dataclass_class", + "is_dataclass_instance", + "is_generic", + "is_mapping", + "is_non_string_iterable", + "is_non_string_sequence", + "is_optional_union", + "is_undefined_sentinel", + "is_union", +) + +P = ParamSpec("P") +T = TypeVar("T") + + +def is_async_callable(value: Callable[P, T]) -> TypeGuard[Callable[P, Awaitable[T]]]: + """Extend :func:`asyncio.iscoroutinefunction` to additionally detect async :func:`functools.partial` objects and + class instances with ``async def __call__()`` defined. + + Args: + value: Any + + Returns: + Bool determining if type of ``value`` is an awaitable. + """ + value = unwrap_partial(value) + + return iscoroutinefunction(value) or ( + callable(value) and iscoroutinefunction(value.__call__) # type: ignore[operator] + ) + + +def is_dataclass_instance(obj: Any) -> TypeGuard[DataclassProtocol]: + """Check if an object is a dataclass instance. + + Args: + obj: An object to check. + + Returns: + True if the object is a dataclass instance. + """ + return hasattr(type(obj), "__dataclass_fields__") + + +def is_dataclass_class(annotation: Any) -> TypeGuard[type[DataclassProtocol]]: + """Wrap :func:`is_dataclass <dataclasses.is_dataclass>` in a :data:`typing.TypeGuard`. + + Args: + annotation: tested to determine if instance or type of :class:`dataclasses.dataclass`. + + Returns: + ``True`` if instance or type of ``dataclass``. + """ + try: + origin = get_origin_or_inner_type(annotation) + annotation = origin or annotation + + return isclass(annotation) and is_dataclass(annotation) + except TypeError: # pragma: no cover + return False + + +def is_class_and_subclass(annotation: Any, type_or_type_tuple: type[T] | tuple[type[T], ...]) -> TypeGuard[type[T]]: + """Return ``True`` if ``value`` is a ``class`` and is a subtype of ``t_type``. + + See https://github.com/litestar-org/litestar/issues/367 + + Args: + annotation: The value to check if is class and subclass of ``t_type``. + type_or_type_tuple: Type used for :func:`issubclass` check of ``value`` + + Returns: + bool + """ + origin = get_origin_or_inner_type(annotation) + if not origin and not isclass(annotation): + return False + try: + return issubclass(origin or annotation, type_or_type_tuple) + except TypeError: # pragma: no cover + return False + + +def is_generic(annotation: Any) -> bool: + """Given a type annotation determine if the annotation is a generic class. + + Args: + annotation: A type. + + Returns: + True if the annotation is a subclass of :data:`Generic <typing.Generic>` otherwise ``False``. + """ + return is_class_and_subclass(annotation, Generic) # type: ignore[arg-type] + + +def is_mapping(annotation: Any) -> TypeGuard[Mapping[Any, Any]]: + """Given a type annotation determine if the annotation is a mapping type. + + Args: + annotation: A type. + + Returns: + A typeguard determining whether the type can be cast as :class:`Mapping <typing.Mapping>`. + """ + _type = get_origin_or_inner_type(annotation) or annotation + return isclass(_type) and issubclass(_type, (dict, defaultdict, DefaultDict, Mapping)) + + +def is_non_string_iterable(annotation: Any) -> TypeGuard[Iterable[Any]]: + """Given a type annotation determine if the annotation is an iterable. + + Args: + annotation: A type. + + Returns: + A typeguard determining whether the type can be cast as :class:`Iterable <typing.Iterable>` that is not a string. + """ + origin = get_origin_or_inner_type(annotation) + if not origin and not isclass(annotation): + return False + try: + return not issubclass(origin or annotation, (str, bytes)) and ( + issubclass(origin or annotation, (Iterable, CollectionsIterable, Dict, dict, Mapping)) + or is_non_string_sequence(annotation) + ) + except TypeError: # pragma: no cover + return False + + +def is_non_string_sequence(annotation: Any) -> TypeGuard[Sequence[Any]]: + """Given a type annotation determine if the annotation is a sequence. + + Args: + annotation: A type. + + Returns: + A typeguard determining whether the type can be cast as :class`Sequence <typing.Sequence>` that is not a string. + """ + origin = get_origin_or_inner_type(annotation) + if not origin and not isclass(annotation): + return False + try: + return not issubclass(origin or annotation, (str, bytes)) and issubclass( + origin or annotation, + ( # type: ignore[arg-type] + Tuple, + List, + Set, + FrozenSet, + Deque, + Sequence, + list, + tuple, + deque, + set, + frozenset, + ), + ) + except TypeError: # pragma: no cover + return False + + +def is_any(annotation: Any) -> TypeGuard[Any]: + """Given a type annotation determine if the annotation is Any. + + Args: + annotation: A type. + + Returns: + A typeguard determining whether the type is :data:`Any <typing.Any>`. + """ + return ( + annotation is Any + or getattr(annotation, "_name", "") == "typing.Any" + or (get_origin_or_inner_type(annotation) in UnionTypes and Any in get_args(annotation)) + ) + + +def is_union(annotation: Any) -> bool: + """Given a type annotation determine if the annotation infers an optional union. + + Args: + annotation: A type. + + Returns: + A boolean determining whether the type is :data:`Union typing.Union>`. + """ + return get_origin_or_inner_type(annotation) in UnionTypes + + +def is_optional_union(annotation: Any) -> TypeGuard[Any | None]: + """Given a type annotation determine if the annotation infers an optional union. + + Args: + annotation: A type. + + Returns: + A typeguard determining whether the type is :data:`Union typing.Union>` with a + None value or :data:`Optional <typing.Optional>` which is equivalent. + """ + origin = get_origin_or_inner_type(annotation) + return origin is Optional or ( + get_origin_or_inner_type(annotation) in UnionTypes and NoneType in get_args(annotation) + ) + + +def is_attrs_class(annotation: Any) -> TypeGuard[type[attrs.AttrsInstance]]: # pyright: ignore + """Given a type annotation determine if the annotation is a class that includes an attrs attribute. + + Args: + annotation: A type. + + Returns: + A typeguard determining whether the type is an attrs class. + """ + return attrs.has(annotation) if attrs is not Empty else False # type: ignore[comparison-overlap] + + +def is_class_var(annotation: Any) -> bool: + """Check if the given annotation is a ClassVar. + + Args: + annotation: A type annotation + + Returns: + A boolean. + """ + annotation = get_origin_or_inner_type(annotation) or annotation + return annotation is ClassVar + + +def _is_sync_or_async_generator(obj: Any) -> TypeGuard[AnyGenerator]: + """Check if the given annotation is a sync or async generator. + + Args: + obj: type to be tested for sync or async generator. + + Returns: + A boolean. + """ + return isgeneratorfunction(obj) or isasyncgenfunction(obj) + + +def is_annotated_type(annotation: Any) -> bool: + """Check if the given annotation is an Annotated. + + Args: + annotation: A type annotation + + Returns: + A boolean. + """ + return isinstance(annotation, _AnnotatedAlias) and getattr(annotation, "__args__", None) is not None + + +def is_undefined_sentinel(value: Any) -> bool: + """Check if the given value is the undefined sentinel. + + Args: + value: A value to be tested for undefined sentinel. + + Returns: + A boolean. + """ + return any(v is value for v in UNDEFINED_SENTINELS) + + +_deprecated_names = {"is_sync_or_async_generator": _is_sync_or_async_generator} + + +def __getattr__(name: str) -> Any: + if name in _deprecated_names: + warn_deprecation( + deprecated_name=f"litestar.utils.scope.{name}", + version="2.4", + kind="import", + removal_in="3.0", + info=f"'litestar.utils.predicates.{name}' is deprecated.", + ) + return globals()["_deprecated_names"][name] + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") # pragma: no cover diff --git a/venv/lib/python3.11/site-packages/litestar/utils/scope/__init__.py b/venv/lib/python3.11/site-packages/litestar/utils/scope/__init__.py new file mode 100644 index 0000000..e5757d3 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/scope/__init__.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from litestar.serialization import get_serializer +from litestar.utils.deprecation import warn_deprecation +from litestar.utils.scope.state import delete_litestar_scope_state as _delete_litestar_scope_state +from litestar.utils.scope.state import get_litestar_scope_state as _get_litestar_scope_state +from litestar.utils.scope.state import set_litestar_scope_state as _set_litestar_scope_state + +if TYPE_CHECKING: + from litestar.types import Scope, Serializer + +__all__ = ("get_serializer_from_scope",) + + +def get_serializer_from_scope(scope: Scope) -> Serializer: + """Return a serializer given a scope object. + + Args: + scope: The ASGI connection scope. + + Returns: + A serializer function + """ + route_handler = scope["route_handler"] + app = scope["app"] + + if hasattr(route_handler, "resolve_type_encoders"): + type_encoders = route_handler.resolve_type_encoders() + else: + type_encoders = app.type_encoders or {} + + if response_class := ( + route_handler.resolve_response_class() # pyright: ignore + if hasattr(route_handler, "resolve_response_class") + else app.response_class + ): + type_encoders = {**type_encoders, **(response_class.type_encoders or {})} + + return get_serializer(type_encoders) + + +_deprecated_names = { + "get_litestar_scope_state": _get_litestar_scope_state, + "set_litestar_scope_state": _set_litestar_scope_state, + "delete_litestar_scope_state": _delete_litestar_scope_state, +} + + +def __getattr__(name: str) -> Any: + if name in _deprecated_names: + warn_deprecation( + deprecated_name=f"litestar.utils.scope.{name}", + version="2.4", + kind="import", + removal_in="3.0", + info=f"'litestar.utils.scope.{name}' is deprecated. The Litestar scope state is private and should not be " + f"used. Plugin authors should maintain their own scope state namespace.", + ) + return globals()["_deprecated_names"][name] + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") # pragma: no cover diff --git a/venv/lib/python3.11/site-packages/litestar/utils/scope/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/utils/scope/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..ee218d5 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/scope/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/scope/__pycache__/state.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/utils/scope/__pycache__/state.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..4615004 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/scope/__pycache__/state.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/utils/scope/state.py b/venv/lib/python3.11/site-packages/litestar/utils/scope/state.py new file mode 100644 index 0000000..bed4394 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/scope/state.py @@ -0,0 +1,158 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Final + +from litestar.types import Empty, EmptyType +from litestar.utils.empty import value_or_default + +if TYPE_CHECKING: + from typing_extensions import Self + + from litestar.datastructures import URL, Accept, Headers + from litestar.types.asgi_types import Scope + +CONNECTION_STATE_KEY: Final = "_ls_connection_state" + + +@dataclass +class ScopeState: + """An object for storing connection state. + + This is an internal API, and subject to change without notice. + + All types are a union with `EmptyType` and are seeded with the `Empty` value. + """ + + __slots__ = ( + "accept", + "base_url", + "body", + "content_type", + "cookies", + "csrf_token", + "dependency_cache", + "do_cache", + "form", + "headers", + "is_cached", + "json", + "log_context", + "msgpack", + "parsed_query", + "response_compressed", + "session_id", + "url", + "_compat_ns", + ) + + def __init__(self) -> None: + self.accept = Empty + self.base_url = Empty + self.body = Empty + self.content_type = Empty + self.cookies = Empty + self.csrf_token = Empty + self.dependency_cache = Empty + self.do_cache = Empty + self.form = Empty + self.headers = Empty + self.is_cached = Empty + self.json = Empty + self.log_context: dict[str, Any] = {} + self.msgpack = Empty + self.parsed_query = Empty + self.response_compressed = Empty + self.session_id = Empty + self.url = Empty + self._compat_ns: dict[str, Any] = {} + + accept: Accept | EmptyType + base_url: URL | EmptyType + body: bytes | EmptyType + content_type: tuple[str, dict[str, str]] | EmptyType + cookies: dict[str, str] | EmptyType + csrf_token: str | EmptyType + dependency_cache: dict[str, Any] | EmptyType + do_cache: bool | EmptyType + form: dict[str, str | list[str]] | EmptyType + headers: Headers | EmptyType + is_cached: bool | EmptyType + json: Any | EmptyType + log_context: dict[str, Any] + msgpack: Any | EmptyType + parsed_query: tuple[tuple[str, str], ...] | EmptyType + response_compressed: bool | EmptyType + session_id: str | None | EmptyType + url: URL | EmptyType + _compat_ns: dict[str, Any] + + @classmethod + def from_scope(cls, scope: Scope) -> Self: + """Create a new `ConnectionState` object from a scope. + + Object is cached in the scope's state under the `SCOPE_STATE_NAMESPACE` key. + + Args: + scope: The ASGI connection scope. + + Returns: + A `ConnectionState` object. + """ + base_scope_state = scope.setdefault("state", {}) + if (state := base_scope_state.get(CONNECTION_STATE_KEY)) is None: + state = base_scope_state[CONNECTION_STATE_KEY] = cls() + return state + + +def get_litestar_scope_state(scope: Scope, key: str, default: Any = None, pop: bool = False) -> Any: + """Get an internal value from connection scope state. + + Args: + scope: The connection scope. + key: Key to get from internal namespace in scope state. + default: Default value to return. + pop: Boolean flag dictating whether the value should be deleted from the state. + + Returns: + Value mapped to ``key`` in internal connection scope namespace. + """ + scope_state = ScopeState.from_scope(scope) + try: + val = value_or_default(getattr(scope_state, key), default) + if pop: + setattr(scope_state, key, Empty) + return val + except AttributeError: + if pop: + return scope_state._compat_ns.pop(key, default) + return scope_state._compat_ns.get(key, default) + + +def set_litestar_scope_state(scope: Scope, key: str, value: Any) -> None: + """Set an internal value in connection scope state. + + Args: + scope: The connection scope. + key: Key to set under internal namespace in scope state. + value: Value for key. + """ + scope_state = ScopeState.from_scope(scope) + if hasattr(scope_state, key): + setattr(scope_state, key, value) + else: + scope_state._compat_ns[key] = value + + +def delete_litestar_scope_state(scope: Scope, key: str) -> None: + """Delete an internal value from connection scope state. + + Args: + scope: The connection scope. + key: Key to set under internal namespace in scope state. + """ + scope_state = ScopeState.from_scope(scope) + if hasattr(scope_state, key): + setattr(scope_state, key, Empty) + else: + del scope_state._compat_ns[key] diff --git a/venv/lib/python3.11/site-packages/litestar/utils/sequence.py b/venv/lib/python3.11/site-packages/litestar/utils/sequence.py new file mode 100644 index 0000000..01ef1a8 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/sequence.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from typing import Callable, Sequence, TypeVar + +__all__ = ("find_index", "unique") + + +T = TypeVar("T") + + +def find_index(target_list: Sequence[T], predicate: Callable[[T], bool]) -> int: + """Find element in list given a key and value. + + List elements can be dicts or classes + """ + return next((i for i, element in enumerate(target_list) if predicate(element)), -1) + + +def unique(value: Sequence[T]) -> list[T]: + """Return all unique values in a given sequence or iterator.""" + try: + return list(set(value)) + except TypeError: + output: list[T] = [] + for element in value: + if element not in output: + output.append(element) + return output diff --git a/venv/lib/python3.11/site-packages/litestar/utils/signature.py b/venv/lib/python3.11/site-packages/litestar/utils/signature.py new file mode 100644 index 0000000..eb58599 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/signature.py @@ -0,0 +1,267 @@ +from __future__ import annotations + +import sys +import typing +from copy import deepcopy +from dataclasses import dataclass, replace +from inspect import Signature, getmembers, isclass, ismethod +from itertools import chain +from typing import TYPE_CHECKING, Any, Union + +from typing_extensions import Annotated, Self, get_args, get_origin, get_type_hints + +from litestar import connection, datastructures, types +from litestar.exceptions import ImproperlyConfiguredException +from litestar.types import Empty +from litestar.typing import FieldDefinition +from litestar.utils.typing import unwrap_annotation + +if TYPE_CHECKING: + from typing import Sequence + + from litestar.types import AnyCallable + +if sys.version_info < (3, 11): + from typing import _get_defaults # type: ignore[attr-defined] +else: + + def _get_defaults(_: Any) -> Any: ... + + +__all__ = ( + "add_types_to_signature_namespace", + "get_fn_type_hints", + "ParsedSignature", +) + +_GLOBAL_NAMES = { + namespace: export + for namespace, export in chain( + tuple(getmembers(types)), tuple(getmembers(connection)), tuple(getmembers(datastructures)) + ) + if namespace[0].isupper() and namespace in chain(types.__all__, connection.__all__, datastructures.__all__) # pyright: ignore +} +"""A mapping of names used for handler signature forward-ref resolution. + +This allows users to include these names within an `if TYPE_CHECKING:` block in their handler module. +""" + + +def _unwrap_implicit_optional_hints(defaults: dict[str, Any], hints: dict[str, Any]) -> dict[str, Any]: + """Unwrap implicit optional hints. + + On Python<3.11, if a function parameter annotation has a ``None`` default, it is unconditionally wrapped in an + ``Optional`` type. + + If the annotation is not annotated, then any nested unions are flattened, e.g.,: + + .. code-block:: python + + def foo(a: Optional[Union[str, int]] = None): ... + + ...will become `Union[str, int, NoneType]`. + + However, if the annotation is annotated, then we end up with an optional union around the annotated type, e.g.,: + + .. code-block:: python + + def foo(a: Annotated[Optional[Union[str, int]], ...] = None): ... + + ... becomes `Union[Annotated[Union[str, int, NoneType], ...], NoneType]` + + This function makes the latter case consistent with the former by either removing the outer union if it is redundant + or flattening the union if it is not. The latter case would become `Annotated[Union[str, int, NoneType], ...]`. + + Args: + defaults: Mapping of names to default values. + hints: Mapping of names to types. + + Returns: + Mapping of names to types. + """ + + def _is_two_arg_optional(origin_: Any, args_: Any) -> bool: + """Check if a type is a two-argument optional type. + + If the type has been wrapped in `Optional` by `get_type_hints()` it will always be a union of a type and + `NoneType`. + + See: https://github.com/litestar-org/litestar/pull/2516 + """ + return origin_ is Union and len(args_) == 2 and args_[1] is type(None) + + def _is_any_optional(origin_: Any, args_: tuple[Any, ...]) -> bool: + """Detect if a type is a union with `NoneType`. + + After detecting that a type is a two-argument optional type, this function can be used to detect if the + inner type is a union with `NoneType` at all. + + We only want to perform the unwrapping of the optional union if the inner type is optional as well. + """ + return origin_ is Union and any(arg is type(None) for arg in args_) + + for name, default in defaults.items(): + if default is not None: + continue + + hint = hints[name] + origin = get_origin(hint) + args = get_args(hint) + + if _is_two_arg_optional(origin, args): + unwrapped_inner, meta, wrappers = unwrap_annotation(args[0]) + + if Annotated not in wrappers: + continue + + inner_args = get_args(unwrapped_inner) + + if not _is_any_optional(get_origin(unwrapped_inner), inner_args): + # this is where hint is like `Union[Annotated[Union[str, int], ...], NoneType]`, we add the outer union + # into the inner one, and re-wrap with Annotated + union_args = (*(inner_args or (unwrapped_inner,)), type(None)) + # calling `__class_getitem__` directly as in earlier py vers it is a syntax error to unpack into + # the getitem brackets, e.g., Annotated[T, *meta]. + hints[name] = Annotated.__class_getitem__((Union[union_args], *meta)) # type: ignore[attr-defined] + continue + + # this is where hint is like `Union[Annotated[Union[str, NoneType], ...], NoneType]`, we remove the + # redundant outer union + hints[name] = args[0] + return hints + + +def get_fn_type_hints(fn: Any, namespace: dict[str, Any] | None = None) -> dict[str, Any]: + """Resolve type hints for ``fn``. + + Args: + fn: Callable that is being inspected + namespace: Extra names for resolution of forward references. + + Returns: + Mapping of names to types. + """ + fn_to_inspect: Any = fn + + module_name = fn_to_inspect.__module__ + + if isclass(fn_to_inspect): + fn_to_inspect = fn_to_inspect.__init__ + + # detect objects that are not functions and that have a `__call__` method + if callable(fn_to_inspect) and ismethod(fn_to_inspect.__call__): + fn_to_inspect = fn_to_inspect.__call__ + + # inspect the underlying function for methods + if hasattr(fn_to_inspect, "__func__"): + fn_to_inspect = fn_to_inspect.__func__ + + # Order important. If a litestar name has been overridden in the function module, we want + # to use that instead of the litestar one. + namespace = { + **_GLOBAL_NAMES, + **vars(typing), + **vars(sys.modules[module_name]), + **(namespace or {}), + } + hints = get_type_hints(fn_to_inspect, globalns=namespace, include_extras=True) + + if sys.version_info < (3, 11): + # see https://github.com/litestar-org/litestar/pull/2516 + defaults = _get_defaults(fn_to_inspect) + hints = _unwrap_implicit_optional_hints(defaults, hints) + + return hints + + +@dataclass(frozen=True) +class ParsedSignature: + """Parsed signature. + + This object is the primary source of handler/dependency signature information. + + The only post-processing that occurs is the conversion of any forward referenced type annotations. + """ + + __slots__ = ("parameters", "return_type", "original_signature") + + parameters: dict[str, FieldDefinition] + """A mapping of parameter names to ParsedSignatureParameter instances.""" + return_type: FieldDefinition + """The return annotation of the callable.""" + original_signature: Signature + """The raw signature as returned by :func:`inspect.signature`""" + + def __deepcopy__(self, memo: dict[str, Any]) -> Self: + return type(self)( + parameters={k: deepcopy(v) for k, v in self.parameters.items()}, + return_type=deepcopy(self.return_type), + original_signature=deepcopy(self.original_signature), + ) + + @classmethod + def from_fn(cls, fn: AnyCallable, signature_namespace: dict[str, Any]) -> Self: + """Parse a function signature. + + Args: + fn: Any callable. + signature_namespace: mapping of names to types for forward reference resolution + + Returns: + ParsedSignature + """ + signature = Signature.from_callable(fn) + fn_type_hints = get_fn_type_hints(fn, namespace=signature_namespace) + + return cls.from_signature(signature, fn_type_hints) + + @classmethod + def from_signature(cls, signature: Signature, fn_type_hints: dict[str, type]) -> Self: + """Parse an :class:`inspect.Signature` instance. + + Args: + signature: An :class:`inspect.Signature` instance. + fn_type_hints: mapping of types + + Returns: + ParsedSignature + """ + + parameters = tuple( + FieldDefinition.from_parameter(parameter=parameter, fn_type_hints=fn_type_hints) + for name, parameter in signature.parameters.items() + if name not in ("self", "cls") + ) + + return_type = FieldDefinition.from_annotation(fn_type_hints.get("return", Any)) + + return cls( + parameters={p.name: p for p in parameters}, + return_type=return_type if "return" in fn_type_hints else replace(return_type, annotation=Empty), + original_signature=signature, + ) + + +def add_types_to_signature_namespace( + signature_types: Sequence[Any], signature_namespace: dict[str, Any] +) -> dict[str, Any]: + """Add types to ith signature namespace mapping. + + Types are added mapped to their `__name__` attribute. + + Args: + signature_types: A list of types to add to the signature namespace. + signature_namespace: The signature namespace to add types to. + + Raises: + ImproperlyConfiguredException: If a type is already defined in the signature namespace. + AttributeError: If a type does not have a `__name__` attribute. + + Returns: + The updated signature namespace. + """ + for typ in signature_types: + if (name := typ.__name__) in signature_namespace: + raise ImproperlyConfiguredException(f"Type '{name}' is already defined in the signature namespace") + signature_namespace[name] = typ + return signature_namespace diff --git a/venv/lib/python3.11/site-packages/litestar/utils/sync.py b/venv/lib/python3.11/site-packages/litestar/utils/sync.py new file mode 100644 index 0000000..02acabf --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/sync.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +from typing import ( + AsyncGenerator, + Awaitable, + Callable, + Generic, + Iterable, + Iterator, + TypeVar, +) + +from typing_extensions import ParamSpec + +from litestar.concurrency import sync_to_thread +from litestar.utils.predicates import is_async_callable + +__all__ = ("ensure_async_callable", "AsyncIteratorWrapper", "AsyncCallable", "is_async_callable") + + +P = ParamSpec("P") +T = TypeVar("T") + + +def ensure_async_callable(fn: Callable[P, T]) -> Callable[P, Awaitable[T]]: + """Ensure that ``fn`` is an asynchronous callable. + If it is an asynchronous, return the original object, else wrap it in an + ``AsyncCallable`` + """ + if is_async_callable(fn): + return fn + return AsyncCallable(fn) # pyright: ignore + + +class AsyncCallable: + """Wrap a given callable to be called in a thread pool using + ``anyio.to_thread.run_sync``, keeping a reference to the original callable as + :attr:`func` + """ + + def __init__(self, fn: Callable[P, T]) -> None: # pyright: ignore + self.func = fn + + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Awaitable[T]: # pyright: ignore + return sync_to_thread(self.func, *args, **kwargs) # pyright: ignore + + +class AsyncIteratorWrapper(Generic[T]): + """Asynchronous generator, wrapping an iterable or iterator.""" + + __slots__ = ("iterator", "generator") + + def __init__(self, iterator: Iterator[T] | Iterable[T]) -> None: + """Take a sync iterator or iterable and yields values from it asynchronously. + + Args: + iterator: A sync iterator or iterable. + """ + self.iterator = iterator if isinstance(iterator, Iterator) else iter(iterator) + self.generator = self._async_generator() + + def _call_next(self) -> T: + try: + return next(self.iterator) + except StopIteration as e: + raise ValueError from e + + async def _async_generator(self) -> AsyncGenerator[T, None]: + while True: + try: + yield await sync_to_thread(self._call_next) + except ValueError: + return + + def __aiter__(self) -> AsyncIteratorWrapper[T]: + return self + + async def __anext__(self) -> T: + return await self.generator.__anext__() diff --git a/venv/lib/python3.11/site-packages/litestar/utils/typing.py b/venv/lib/python3.11/site-packages/litestar/utils/typing.py new file mode 100644 index 0000000..9da6c2a --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/typing.py @@ -0,0 +1,284 @@ +from __future__ import annotations + +import re +from collections import abc, defaultdict, deque +from typing import ( + AbstractSet, + Any, + AsyncGenerator, + AsyncIterable, + AsyncIterator, + Awaitable, + Collection, + Container, + Coroutine, + DefaultDict, + Deque, + Dict, + FrozenSet, + Generator, + ItemsView, + Iterable, + Iterator, + KeysView, + List, + Mapping, + MappingView, + MutableMapping, + MutableSequence, + MutableSet, + Reversible, + Sequence, + Set, + Tuple, + TypeVar, + Union, + ValuesView, + cast, +) + +from typing_extensions import Annotated, NotRequired, Required, get_args, get_origin, get_type_hints + +from litestar.types.builtin_types import NoneType, UnionTypes + +__all__ = ( + "get_instantiable_origin", + "get_origin_or_inner_type", + "get_safe_generic_origin", + "instantiable_type_mapping", + "make_non_optional_union", + "safe_generic_origin_map", + "unwrap_annotation", +) + + +T = TypeVar("T") +UnionT = TypeVar("UnionT", bound="Union") + +tuple_types_regex = re.compile( + "^" + + "|".join( + [*[repr(x) for x in (List, Sequence, Iterable, Iterator, Tuple, Deque)], "tuple", "list", "collections.deque"] + ) +) + +instantiable_type_mapping = { + AbstractSet: set, + DefaultDict: defaultdict, + Deque: deque, + Dict: dict, + FrozenSet: frozenset, + List: list, + Mapping: dict, + MutableMapping: dict, + MutableSequence: list, + MutableSet: set, + Sequence: list, + Set: set, + Tuple: tuple, + abc.Mapping: dict, + abc.MutableMapping: dict, + abc.MutableSequence: list, + abc.MutableSet: set, + abc.Sequence: list, + abc.Set: set, + defaultdict: defaultdict, + deque: deque, + dict: dict, + frozenset: frozenset, + list: list, + set: set, + tuple: tuple, +} + +safe_generic_origin_map = { + set: AbstractSet, + defaultdict: DefaultDict, + deque: Deque, + dict: Dict, + frozenset: FrozenSet, + list: List, + tuple: Tuple, + abc.Mapping: Mapping, + abc.MutableMapping: MutableMapping, + abc.MutableSequence: MutableSequence, + abc.MutableSet: MutableSet, + abc.Sequence: Sequence, + abc.Set: AbstractSet, + abc.Collection: Collection, + abc.Container: Container, + abc.ItemsView: ItemsView, + abc.KeysView: KeysView, + abc.MappingView: MappingView, + abc.ValuesView: ValuesView, + abc.Iterable: Iterable, + abc.Iterator: Iterator, + abc.Generator: Generator, + abc.Reversible: Reversible, + abc.Coroutine: Coroutine, + abc.AsyncGenerator: AsyncGenerator, + abc.AsyncIterable: AsyncIterable, + abc.AsyncIterator: AsyncIterator, + abc.Awaitable: Awaitable, + **{union_t: Union for union_t in UnionTypes}, +} +"""A mapping of types to equivalent types that are safe to be used as generics across all Python versions. + +This is necessary because occasionally we want to rebuild a generic outer type with different args, and types such as +``collections.abc.Mapping``, are not valid generic types in Python 3.8. +""" + +wrapper_type_set = {Annotated, Required, NotRequired} +"""Types that always contain a wrapped type annotation as their first arg.""" + + +def normalize_type_annotation(annotation: Any) -> Any: + """Normalize a type annotation to a standard form.""" + return instantiable_type_mapping.get(annotation, annotation) + + +def make_non_optional_union(annotation: UnionT | None) -> UnionT: + """Make a :data:`Union <typing.Union>` type that excludes ``NoneType``. + + Args: + annotation: A type annotation. + + Returns: + The union with all original members, except ``NoneType``. + """ + args = tuple(tp for tp in get_args(annotation) if tp is not NoneType) + return cast("UnionT", Union[args]) # pyright: ignore + + +def unwrap_annotation(annotation: Any) -> tuple[Any, tuple[Any, ...], set[Any]]: + """Remove "wrapper" annotation types, such as ``Annotated``, ``Required``, and ``NotRequired``. + + Note: + ``annotation`` should have been retrieved from :func:`get_type_hints()` with ``include_extras=True``. This + ensures that any nested ``Annotated`` types are flattened according to the PEP 593 specification. + + Args: + annotation: A type annotation. + + Returns: + A tuple of the unwrapped annotation and any ``Annotated`` metadata, and a set of any wrapper types encountered. + """ + origin = get_origin(annotation) + wrappers = set() + metadata = [] + while origin in wrapper_type_set: + wrappers.add(origin) + annotation, *meta = get_args(annotation) + metadata.extend(meta) + origin = get_origin(annotation) + return annotation, tuple(metadata), wrappers + + +def get_origin_or_inner_type(annotation: Any) -> Any: + """Get origin or unwrap it. Returns None for non-generic types. + + Args: + annotation: A type annotation. + + Returns: + Any type. + """ + origin = get_origin(annotation) + if origin in wrapper_type_set: + inner, _, _ = unwrap_annotation(annotation) + # we need to recursively call here 'get_origin_or_inner_type' because we might be dealing + # with a generic type alias e.g. Annotated[dict[str, list[int]] + origin = get_origin_or_inner_type(inner) + return instantiable_type_mapping.get(origin, origin) + + +def get_safe_generic_origin(origin_type: Any, annotation: Any) -> Any: + """Get a type that is safe to use as a generic type across all supported Python versions. + + If a builtin collection type is annotated without generic args, e.g, ``a: dict``, then the origin type will be + ``None``. In this case, we can use the annotation to determine the correct generic type, if one exists. + + Args: + origin_type: A type - would be the return value of :func:`get_origin()`. + annotation: Type annotation associated with the origin type. Should be unwrapped from any wrapper types, such + as ``Annotated``. + + Returns: + The ``typing`` module equivalent of the given type, if it exists. Otherwise, the original type is returned. + """ + if origin_type is None: + return safe_generic_origin_map.get(annotation) + return safe_generic_origin_map.get(origin_type, origin_type) + + +def get_instantiable_origin(origin_type: Any, annotation: Any) -> Any: + """Get a type that is safe to instantiate for the given origin type. + + If a builtin collection type is annotated without generic args, e.g, ``a: dict``, then the origin type will be + ``None``. In this case, we can use the annotation to determine the correct instantiable type, if one exists. + + Args: + origin_type: A type - would be the return value of :func:`get_origin()`. + annotation: Type annotation associated with the origin type. Should be unwrapped from any wrapper types, such + as ``Annotated``. + + Returns: + A builtin type that is safe to instantiate for the given origin type. + """ + if origin_type is None: + return instantiable_type_mapping.get(annotation) + return instantiable_type_mapping.get(origin_type, origin_type) + + +def get_type_hints_with_generics_resolved( + annotation: Any, + globalns: dict[str, Any] | None = None, + localns: dict[str, Any] | None = None, + include_extras: bool = False, + type_hints: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Get the type hints for the given object after resolving the generic types as much as possible. + + Args: + annotation: A type annotation. + globalns: The global namespace. + localns: The local namespace. + include_extras: A flag indicating whether to include the ``Annotated[T, ...]`` or not. + type_hints: Already resolved type hints + """ + origin = get_origin(annotation) + + if origin is None: + # Implies the generic types have not been specified in the annotation + if type_hints is None: # pragma: no cover + type_hints = get_type_hints(annotation, globalns=globalns, localns=localns, include_extras=include_extras) + typevar_map = {p: p for p in annotation.__parameters__} + else: + if type_hints is None: # pragma: no cover + type_hints = get_type_hints(origin, globalns=globalns, localns=localns, include_extras=include_extras) + # the __parameters__ is only available on the origin itself and not the annotation + typevar_map = dict(zip(origin.__parameters__, get_args(annotation))) + + return {n: _substitute_typevars(type_, typevar_map) for n, type_ in type_hints.items()} + + +def _substitute_typevars(obj: Any, typevar_map: Mapping[Any, Any]) -> Any: + if params := getattr(obj, "__parameters__", None): + args = tuple(_substitute_typevars(typevar_map.get(p, p), typevar_map) for p in params) + return obj[args] + + if isinstance(obj, TypeVar): + # If there's a mapped type for the TypeVar already, then it should be returned instead + # of considering __constraints__ or __bound__. For a generic `Foo[T]`, if Foo[int] is given + # then int should be returned and if `Foo` is given then the __bounds__ and __constraints__ + # should be considered. + if (type_ := typevar_map.get(obj, None)) is not None and not isinstance(type_, TypeVar): + return type_ + + if obj.__bound__ is not None: + return obj.__bound__ + + if obj.__constraints__: + return Union[obj.__constraints__] # pyright: ignore + + return obj diff --git a/venv/lib/python3.11/site-packages/litestar/utils/version.py b/venv/lib/python3.11/site-packages/litestar/utils/version.py new file mode 100644 index 0000000..d7974eb --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/version.py @@ -0,0 +1,62 @@ +from __future__ import annotations + +import re +import sys +from typing import Literal, NamedTuple + +__all__ = ("Version", "get_version", "parse_version") + + +if sys.version_info >= (3, 10): + import importlib.metadata as importlib_metadata +else: + import importlib_metadata + + +_ReleaseLevel = Literal["alpha", "beta", "rc", "final"] +_PRE_RELEASE_TAGS = {"alpha", "a", "beta", "b", "rc"} +_PRE_RELEASE_TAGS_CONVERSIONS: dict[str, _ReleaseLevel] = {"a": "alpha", "b": "beta"} + +_VERSION_PARTS_RE = re.compile(r"(\d+|[a-z]+|\.)") + + +class Version(NamedTuple): + """Litestar version information""" + + major: int + minor: int + patch: int + release_level: _ReleaseLevel + serial: int + + def formatted(self, short: bool = False) -> str: + version = f"{self.major}.{self.minor}.{self.patch}" + if not short: + version += f"{self.release_level}{self.serial}" + return version + + +def parse_version(raw_version: str) -> Version: + """Parse a version string into a :class:`Version`""" + parts = [p for p in _VERSION_PARTS_RE.split(raw_version) if p and p != "."] + release_level: _ReleaseLevel = "final" + serial = "0" + + if len(parts) == 3: + major, minor, patch = parts + elif len(parts) == 5: + major, minor, patch, release_level, serial = parts # type: ignore[assignment] + if release_level not in _PRE_RELEASE_TAGS: + raise ValueError(f"Invalid release level: {release_level}") + release_level = _PRE_RELEASE_TAGS_CONVERSIONS.get(release_level, release_level) + else: + raise ValueError(f"Invalid version: {raw_version}") + + return Version( + major=int(major), minor=int(minor), patch=int(patch), release_level=release_level, serial=int(serial) + ) + + +def get_version() -> Version: + """Get the version of the installed litestar package""" + return parse_version(importlib_metadata.version("litestar")) diff --git a/venv/lib/python3.11/site-packages/litestar/utils/warnings.py b/venv/lib/python3.11/site-packages/litestar/utils/warnings.py new file mode 100644 index 0000000..e20484b --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/utils/warnings.py @@ -0,0 +1,51 @@ +import os +import warnings + +from litestar.exceptions import LitestarWarning +from litestar.types import AnyCallable, AnyGenerator + + +def warn_implicit_sync_to_thread(source: AnyCallable, stacklevel: int = 2) -> None: + if os.getenv("LITESTAR_WARN_IMPLICIT_SYNC_TO_THREAD") == "0": + return + + warnings.warn( + f"Use of a synchronous callable {source} without setting sync_to_thread is " + "discouraged since synchronous callables can block the main thread if they " + "perform blocking operations. If the callable is guaranteed to be non-blocking, " + "you can set sync_to_thread=False to skip this warning, or set the environment" + "variable LITESTAR_WARN_IMPLICIT_SYNC_TO_THREAD=0 to disable warnings of this " + "type entirely.", + category=LitestarWarning, + stacklevel=stacklevel, + ) + + +def warn_sync_to_thread_with_async_callable(source: AnyCallable, stacklevel: int = 2) -> None: + if os.getenv("LITESTAR_WARN_SYNC_TO_THREAD_WITH_ASYNC") == "0": + return + + warnings.warn( + f"Use of an asynchronous callable {source} with sync_to_thread; sync_to_thread " + "has no effect on async callable. You can disable this warning by setting " + "LITESTAR_WARN_SYNC_TO_THREAD_WITH_ASYNC=0", + category=LitestarWarning, + stacklevel=stacklevel, + ) + + +def warn_sync_to_thread_with_generator(source: AnyGenerator, stacklevel: int = 2) -> None: + if os.getenv("LITESTAR_WARN_SYNC_TO_THREAD_WITH_GENERATOR") == "0": + return + + warnings.warn( + f"Use of generator {source} with sync_to_thread; sync_to_thread has no effect " + "on generators. You can disable this warning by setting " + "LITESTAR_WARN_SYNC_TO_THREAD_WITH_GENERATOR=0", + category=LitestarWarning, + stacklevel=stacklevel, + ) + + +def warn_pdb_on_exception(stacklevel: int = 2) -> None: + warnings.warn("Python Debugger on exception enabled", category=LitestarWarning, stacklevel=stacklevel) |