diff options
author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 |
---|---|---|
committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 |
commit | 6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch) | |
tree | b1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/litestar/contrib/pydantic | |
parent | 4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff) |
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/contrib/pydantic')
14 files changed, 918 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__init__.py b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__init__.py new file mode 100644 index 0000000..9bab707 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__init__.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from litestar.plugins import InitPluginProtocol + +from .pydantic_di_plugin import PydanticDIPlugin +from .pydantic_dto_factory import PydanticDTO +from .pydantic_init_plugin import PydanticInitPlugin +from .pydantic_schema_plugin import PydanticSchemaPlugin + +if TYPE_CHECKING: + from pydantic import BaseModel + from pydantic.v1 import BaseModel as BaseModelV1 + + from litestar.config.app import AppConfig + +__all__ = ( + "PydanticDTO", + "PydanticInitPlugin", + "PydanticSchemaPlugin", + "PydanticPlugin", + "PydanticDIPlugin", +) + + +def _model_dump(model: BaseModel | BaseModelV1, *, by_alias: bool = False) -> dict[str, Any]: + return ( + model.model_dump(mode="json", by_alias=by_alias) # pyright: ignore + if hasattr(model, "model_dump") + else {k: v.decode() if isinstance(v, bytes) else v for k, v in model.dict(by_alias=by_alias).items()} + ) + + +def _model_dump_json(model: BaseModel | BaseModelV1, by_alias: bool = False) -> str: + return ( + model.model_dump_json(by_alias=by_alias) # pyright: ignore + if hasattr(model, "model_dump_json") + else model.json(by_alias=by_alias) # pyright: ignore + ) + + +class PydanticPlugin(InitPluginProtocol): + """A plugin that provides Pydantic integration.""" + + __slots__ = ("prefer_alias",) + + def __init__(self, prefer_alias: bool = False) -> None: + """Initialize ``PydanticPlugin``. + + Args: + prefer_alias: OpenAPI and ``type_encoders`` will export by alias + """ + self.prefer_alias = prefer_alias + + def on_app_init(self, app_config: AppConfig) -> AppConfig: + """Configure application for use with Pydantic. + + Args: + app_config: The :class:`AppConfig <.config.app.AppConfig>` instance. + """ + app_config.plugins.extend( + [ + PydanticInitPlugin(prefer_alias=self.prefer_alias), + PydanticSchemaPlugin(prefer_alias=self.prefer_alias), + PydanticDIPlugin(), + ] + ) + return app_config diff --git a/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..a62eb0f --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/config.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/config.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..5515df6 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/config.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/pydantic_di_plugin.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/pydantic_di_plugin.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..f8c4e00 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/pydantic_di_plugin.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/pydantic_dto_factory.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/pydantic_dto_factory.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..490108e --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/pydantic_dto_factory.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/pydantic_init_plugin.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/pydantic_init_plugin.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..13788d4 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/pydantic_init_plugin.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/pydantic_schema_plugin.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/pydantic_schema_plugin.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..8b0946b --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/pydantic_schema_plugin.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/utils.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/utils.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..dbb0e54 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/__pycache__/utils.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/config.py b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/config.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/config.py diff --git a/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/pydantic_di_plugin.py b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/pydantic_di_plugin.py new file mode 100644 index 0000000..2096fd4 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/pydantic_di_plugin.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +import inspect +from inspect import Signature +from typing import Any + +from litestar.contrib.pydantic.utils import is_pydantic_model_class +from litestar.plugins import DIPlugin + + +class PydanticDIPlugin(DIPlugin): + def has_typed_init(self, type_: Any) -> bool: + return is_pydantic_model_class(type_) + + def get_typed_init(self, type_: Any) -> tuple[Signature, dict[str, Any]]: + try: + model_fields = dict(type_.model_fields) + except AttributeError: + model_fields = {k: model_field.field_info for k, model_field in type_.__fields__.items()} + + parameters = [ + inspect.Parameter(name=field_name, kind=inspect.Parameter.KEYWORD_ONLY, annotation=Any) + for field_name in model_fields + ] + type_hints = {field_name: Any for field_name in model_fields} + return Signature(parameters), type_hints diff --git a/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/pydantic_dto_factory.py b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/pydantic_dto_factory.py new file mode 100644 index 0000000..d61f95d --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/pydantic_dto_factory.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +from dataclasses import replace +from typing import TYPE_CHECKING, Collection, Generic, TypeVar + +from typing_extensions import TypeAlias, override + +from litestar.contrib.pydantic.utils import is_pydantic_undefined +from litestar.dto.base_dto import AbstractDTO +from litestar.dto.data_structures import DTOFieldDefinition +from litestar.dto.field import DTO_FIELD_META_KEY, DTOField +from litestar.exceptions import MissingDependencyException, ValidationException +from litestar.types.empty import Empty + +if TYPE_CHECKING: + from typing import Any, Generator + + from litestar.typing import FieldDefinition + +try: + import pydantic as _ # noqa: F401 +except ImportError as e: + raise MissingDependencyException("pydantic") from e + + +try: + import pydantic as pydantic_v2 + from pydantic import ValidationError as ValidationErrorV2 + from pydantic import v1 as pydantic_v1 + from pydantic.v1 import ValidationError as ValidationErrorV1 + + ModelType: TypeAlias = "pydantic_v1.BaseModel | pydantic_v2.BaseModel" + +except ImportError: + import pydantic as pydantic_v1 # type: ignore[no-redef] + + pydantic_v2 = Empty # type: ignore[assignment] + from pydantic import ValidationError as ValidationErrorV1 # type: ignore[assignment] + + ValidationErrorV2 = ValidationErrorV1 # type: ignore[assignment, misc] + ModelType = "pydantic_v1.BaseModel" # type: ignore[misc] + + +T = TypeVar("T", bound="ModelType | Collection[ModelType]") + + +__all__ = ("PydanticDTO",) + + +class PydanticDTO(AbstractDTO[T], Generic[T]): + """Support for domain modelling with Pydantic.""" + + @override + def decode_builtins(self, value: dict[str, Any]) -> Any: + try: + return super().decode_builtins(value) + except (ValidationErrorV2, ValidationErrorV1) as ex: + raise ValidationException(extra=ex.errors()) from ex + + @override + def decode_bytes(self, value: bytes) -> Any: + try: + return super().decode_bytes(value) + except (ValidationErrorV2, ValidationErrorV1) as ex: + raise ValidationException(extra=ex.errors()) from ex + + @classmethod + def generate_field_definitions( + cls, model_type: type[pydantic_v1.BaseModel | pydantic_v2.BaseModel] + ) -> Generator[DTOFieldDefinition, None, None]: + model_field_definitions = cls.get_model_type_hints(model_type) + + model_fields: dict[str, pydantic_v1.fields.FieldInfo | pydantic_v2.fields.FieldInfo] + try: + model_fields = dict(model_type.model_fields) # type: ignore[union-attr] + except AttributeError: + model_fields = { + k: model_field.field_info + for k, model_field in model_type.__fields__.items() # type: ignore[union-attr] + } + + for field_name, field_info in model_fields.items(): + field_definition = model_field_definitions[field_name] + dto_field = (field_definition.extra or {}).pop(DTO_FIELD_META_KEY, DTOField()) + + if not is_pydantic_undefined(field_info.default): + default = field_info.default + elif field_definition.is_optional: + default = None + else: + default = Empty + + yield replace( + DTOFieldDefinition.from_field_definition( + field_definition=field_definition, + dto_field=dto_field, + model_name=model_type.__name__, + default_factory=field_info.default_factory + if field_info.default_factory and not is_pydantic_undefined(field_info.default_factory) + else None, + ), + default=default, + name=field_name, + ) + + @classmethod + def detect_nested_field(cls, field_definition: FieldDefinition) -> bool: + if pydantic_v2 is not Empty: # type: ignore[comparison-overlap] + return field_definition.is_subclass_of((pydantic_v1.BaseModel, pydantic_v2.BaseModel)) + return field_definition.is_subclass_of(pydantic_v1.BaseModel) # type: ignore[unreachable] diff --git a/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/pydantic_init_plugin.py b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/pydantic_init_plugin.py new file mode 100644 index 0000000..1261cd8 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/pydantic_init_plugin.py @@ -0,0 +1,182 @@ +from __future__ import annotations + +from contextlib import suppress +from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast +from uuid import UUID + +from msgspec import ValidationError +from typing_extensions import Buffer, TypeGuard + +from litestar._signature.types import ExtendedMsgSpecValidationError +from litestar.contrib.pydantic.utils import is_pydantic_constrained_field +from litestar.exceptions import MissingDependencyException +from litestar.plugins import InitPluginProtocol +from litestar.typing import _KWARG_META_EXTRACTORS +from litestar.utils import is_class_and_subclass + +try: + # check if we have pydantic v2 installed, and try to import both versions + import pydantic as pydantic_v2 + from pydantic import v1 as pydantic_v1 +except ImportError: + # check if pydantic 1 is installed and import it + try: + import pydantic as pydantic_v1 # type: ignore[no-redef] + + pydantic_v2 = None # type: ignore[assignment] + except ImportError as e: + raise MissingDependencyException("pydantic") from e + + +if TYPE_CHECKING: + from litestar.config.app import AppConfig + + +T = TypeVar("T") + + +def _dec_pydantic_v1(model_type: type[pydantic_v1.BaseModel], value: Any) -> pydantic_v1.BaseModel: + try: + return model_type.parse_obj(value) + except pydantic_v1.ValidationError as e: + raise ExtendedMsgSpecValidationError(errors=cast("list[dict[str, Any]]", e.errors())) from e + + +def _dec_pydantic_v2(model_type: type[pydantic_v2.BaseModel], value: Any) -> pydantic_v2.BaseModel: + try: + return model_type.model_validate(value, strict=False) + except pydantic_v2.ValidationError as e: + raise ExtendedMsgSpecValidationError(errors=cast("list[dict[str, Any]]", e.errors())) from e + + +def _dec_pydantic_uuid( + uuid_type: type[pydantic_v1.UUID1] | type[pydantic_v1.UUID3] | type[pydantic_v1.UUID4] | type[pydantic_v1.UUID5], + value: Any, +) -> ( + type[pydantic_v1.UUID1] | type[pydantic_v1.UUID3] | type[pydantic_v1.UUID4] | type[pydantic_v1.UUID5] +): # pragma: no cover + if isinstance(value, str): + value = uuid_type(value) + + elif isinstance(value, Buffer): + value = bytes(value) + try: + value = uuid_type(value.decode()) + except ValueError: + # 16 bytes in big-endian order as the bytes argument fail + # the above check + value = uuid_type(bytes=value) + elif isinstance(value, UUID): + value = uuid_type(str(value)) + + if not isinstance(value, uuid_type): + raise ValidationError(f"Invalid UUID: {value!r}") + + if value._required_version != value.version: + raise ValidationError(f"Invalid UUID version: {value!r}") + + return cast( + "type[pydantic_v1.UUID1] | type[pydantic_v1.UUID3] | type[pydantic_v1.UUID4] | type[pydantic_v1.UUID5]", value + ) + + +def _is_pydantic_v1_uuid(value: Any) -> bool: # pragma: no cover + return is_class_and_subclass(value, (pydantic_v1.UUID1, pydantic_v1.UUID3, pydantic_v1.UUID4, pydantic_v1.UUID5)) + + +_base_encoders: dict[Any, Callable[[Any], Any]] = { + pydantic_v1.EmailStr: str, + pydantic_v1.NameEmail: str, + pydantic_v1.ByteSize: lambda val: val.real, +} + +if pydantic_v2 is not None: # pragma: no cover + _base_encoders.update( + { + pydantic_v2.EmailStr: str, + pydantic_v2.NameEmail: str, + pydantic_v2.ByteSize: lambda val: val.real, + } + ) + + +def is_pydantic_v1_model_class(annotation: Any) -> TypeGuard[type[pydantic_v1.BaseModel]]: + return is_class_and_subclass(annotation, pydantic_v1.BaseModel) + + +def is_pydantic_v2_model_class(annotation: Any) -> TypeGuard[type[pydantic_v2.BaseModel]]: + return is_class_and_subclass(annotation, pydantic_v2.BaseModel) + + +class ConstrainedFieldMetaExtractor: + @staticmethod + def matches(annotation: Any, name: str | None, default: Any) -> bool: + return is_pydantic_constrained_field(annotation) + + @staticmethod + def extract(annotation: Any, default: Any) -> Any: + return [annotation] + + +class PydanticInitPlugin(InitPluginProtocol): + __slots__ = ("prefer_alias",) + + def __init__(self, prefer_alias: bool = False) -> None: + self.prefer_alias = prefer_alias + + @classmethod + def encoders(cls, prefer_alias: bool = False) -> dict[Any, Callable[[Any], Any]]: + encoders = {**_base_encoders, **cls._create_pydantic_v1_encoders(prefer_alias)} + if pydantic_v2 is not None: # pragma: no cover + encoders.update(cls._create_pydantic_v2_encoders(prefer_alias)) + return encoders + + @classmethod + def decoders(cls) -> list[tuple[Callable[[Any], bool], Callable[[Any, Any], Any]]]: + decoders: list[tuple[Callable[[Any], bool], Callable[[Any, Any], Any]]] = [ + (is_pydantic_v1_model_class, _dec_pydantic_v1) + ] + + if pydantic_v2 is not None: # pragma: no cover + decoders.append((is_pydantic_v2_model_class, _dec_pydantic_v2)) + + decoders.append((_is_pydantic_v1_uuid, _dec_pydantic_uuid)) + + return decoders + + @staticmethod + def _create_pydantic_v1_encoders(prefer_alias: bool = False) -> dict[Any, Callable[[Any], Any]]: # pragma: no cover + return { + pydantic_v1.BaseModel: lambda model: { + k: v.decode() if isinstance(v, bytes) else v for k, v in model.dict(by_alias=prefer_alias).items() + }, + pydantic_v1.SecretField: str, + pydantic_v1.StrictBool: int, + pydantic_v1.color.Color: str, + pydantic_v1.ConstrainedBytes: lambda val: val.decode("utf-8"), + pydantic_v1.ConstrainedDate: lambda val: val.isoformat(), + pydantic_v1.AnyUrl: str, + } + + @staticmethod + def _create_pydantic_v2_encoders(prefer_alias: bool = False) -> dict[Any, Callable[[Any], Any]]: + encoders: dict[Any, Callable[[Any], Any]] = { + pydantic_v2.BaseModel: lambda model: model.model_dump(mode="json", by_alias=prefer_alias), + pydantic_v2.types.SecretStr: lambda val: "**********" if val else "", + pydantic_v2.types.SecretBytes: lambda val: "**********" if val else "", + pydantic_v2.AnyUrl: str, + } + + with suppress(ImportError): + from pydantic_extra_types import color + + encoders[color.Color] = str + + return encoders + + def on_app_init(self, app_config: AppConfig) -> AppConfig: + app_config.type_encoders = {**self.encoders(self.prefer_alias), **(app_config.type_encoders or {})} + app_config.type_decoders = [*self.decoders(), *(app_config.type_decoders or [])] + + _KWARG_META_EXTRACTORS.add(ConstrainedFieldMetaExtractor) + return app_config diff --git a/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/pydantic_schema_plugin.py b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/pydantic_schema_plugin.py new file mode 100644 index 0000000..2c189e4 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/pydantic_schema_plugin.py @@ -0,0 +1,317 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Optional + +from typing_extensions import Annotated + +from litestar.contrib.pydantic.utils import ( + create_field_definitions_for_computed_fields, + is_pydantic_2_model, + is_pydantic_constrained_field, + is_pydantic_model_class, + is_pydantic_undefined, + pydantic_get_type_hints_with_generics_resolved, + pydantic_unwrap_and_get_origin, +) +from litestar.exceptions import MissingDependencyException +from litestar.openapi.spec import OpenAPIFormat, OpenAPIType, Schema +from litestar.plugins import OpenAPISchemaPlugin +from litestar.types import Empty +from litestar.typing import FieldDefinition +from litestar.utils import is_class_and_subclass, is_generic + +try: + # check if we have pydantic v2 installed, and try to import both versions + import pydantic as pydantic_v2 + from pydantic import v1 as pydantic_v1 +except ImportError: + # check if pydantic 1 is installed and import it + try: + import pydantic as pydantic_v1 # type: ignore[no-redef] + + pydantic_v2 = None # type: ignore[assignment] + except ImportError as e: + raise MissingDependencyException("pydantic") from e + +if TYPE_CHECKING: + from litestar._openapi.schema_generation.schema import SchemaCreator + +PYDANTIC_TYPE_MAP: dict[type[Any] | None | Any, Schema] = { + pydantic_v1.ByteSize: Schema(type=OpenAPIType.INTEGER), + pydantic_v1.EmailStr: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.EMAIL), + pydantic_v1.IPvAnyAddress: Schema( + one_of=[ + Schema( + type=OpenAPIType.STRING, + format=OpenAPIFormat.IPV4, + description="IPv4 address", + ), + Schema( + type=OpenAPIType.STRING, + format=OpenAPIFormat.IPV6, + description="IPv6 address", + ), + ] + ), + pydantic_v1.IPvAnyInterface: Schema( + one_of=[ + Schema( + type=OpenAPIType.STRING, + format=OpenAPIFormat.IPV4, + description="IPv4 interface", + ), + Schema( + type=OpenAPIType.STRING, + format=OpenAPIFormat.IPV6, + description="IPv6 interface", + ), + ] + ), + pydantic_v1.IPvAnyNetwork: Schema( + one_of=[ + Schema( + type=OpenAPIType.STRING, + format=OpenAPIFormat.IPV4, + description="IPv4 network", + ), + Schema( + type=OpenAPIType.STRING, + format=OpenAPIFormat.IPV6, + description="IPv6 network", + ), + ] + ), + pydantic_v1.Json: Schema(type=OpenAPIType.OBJECT, format=OpenAPIFormat.JSON_POINTER), + pydantic_v1.NameEmail: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.EMAIL, description="Name and email"), + # removed in v2 + pydantic_v1.PyObject: Schema( + type=OpenAPIType.STRING, + description="dot separated path identifying a python object, e.g. 'decimal.Decimal'", + ), + # annotated in v2 + pydantic_v1.UUID1: Schema( + type=OpenAPIType.STRING, + format=OpenAPIFormat.UUID, + description="UUID1 string", + ), + pydantic_v1.UUID3: Schema( + type=OpenAPIType.STRING, + format=OpenAPIFormat.UUID, + description="UUID3 string", + ), + pydantic_v1.UUID4: Schema( + type=OpenAPIType.STRING, + format=OpenAPIFormat.UUID, + description="UUID4 string", + ), + pydantic_v1.UUID5: Schema( + type=OpenAPIType.STRING, + format=OpenAPIFormat.UUID, + description="UUID5 string", + ), + pydantic_v1.DirectoryPath: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.URI_REFERENCE), + pydantic_v1.AnyUrl: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.URL), + pydantic_v1.AnyHttpUrl: Schema( + type=OpenAPIType.STRING, format=OpenAPIFormat.URL, description="must be a valid HTTP based URL" + ), + pydantic_v1.FilePath: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.URI_REFERENCE), + pydantic_v1.HttpUrl: Schema( + type=OpenAPIType.STRING, + format=OpenAPIFormat.URL, + description="must be a valid HTTP based URL", + max_length=2083, + ), + pydantic_v1.RedisDsn: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.URI, description="redis DSN"), + pydantic_v1.PostgresDsn: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.URI, description="postgres DSN"), + pydantic_v1.SecretBytes: Schema(type=OpenAPIType.STRING), + pydantic_v1.SecretStr: Schema(type=OpenAPIType.STRING), + pydantic_v1.StrictBool: Schema(type=OpenAPIType.BOOLEAN), + pydantic_v1.StrictBytes: Schema(type=OpenAPIType.STRING), + pydantic_v1.StrictFloat: Schema(type=OpenAPIType.NUMBER), + pydantic_v1.StrictInt: Schema(type=OpenAPIType.INTEGER), + pydantic_v1.StrictStr: Schema(type=OpenAPIType.STRING), + pydantic_v1.NegativeFloat: Schema(type=OpenAPIType.NUMBER, exclusive_maximum=0.0), + pydantic_v1.NegativeInt: Schema(type=OpenAPIType.INTEGER, exclusive_maximum=0), + pydantic_v1.NonNegativeInt: Schema(type=OpenAPIType.INTEGER, minimum=0), + pydantic_v1.NonPositiveFloat: Schema(type=OpenAPIType.NUMBER, maximum=0.0), + pydantic_v1.PaymentCardNumber: Schema(type=OpenAPIType.STRING, min_length=12, max_length=19), + pydantic_v1.PositiveFloat: Schema(type=OpenAPIType.NUMBER, exclusive_minimum=0.0), + pydantic_v1.PositiveInt: Schema(type=OpenAPIType.INTEGER, exclusive_minimum=0), +} + +if pydantic_v2 is not None: # pragma: no cover + PYDANTIC_TYPE_MAP.update( + { + pydantic_v2.SecretStr: Schema(type=OpenAPIType.STRING), + pydantic_v2.SecretBytes: Schema(type=OpenAPIType.STRING), + pydantic_v2.ByteSize: Schema(type=OpenAPIType.INTEGER), + pydantic_v2.EmailStr: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.EMAIL), + pydantic_v2.IPvAnyAddress: Schema( + one_of=[ + Schema( + type=OpenAPIType.STRING, + format=OpenAPIFormat.IPV4, + description="IPv4 address", + ), + Schema( + type=OpenAPIType.STRING, + format=OpenAPIFormat.IPV6, + description="IPv6 address", + ), + ] + ), + pydantic_v2.IPvAnyInterface: Schema( + one_of=[ + Schema( + type=OpenAPIType.STRING, + format=OpenAPIFormat.IPV4, + description="IPv4 interface", + ), + Schema( + type=OpenAPIType.STRING, + format=OpenAPIFormat.IPV6, + description="IPv6 interface", + ), + ] + ), + pydantic_v2.IPvAnyNetwork: Schema( + one_of=[ + Schema( + type=OpenAPIType.STRING, + format=OpenAPIFormat.IPV4, + description="IPv4 network", + ), + Schema( + type=OpenAPIType.STRING, + format=OpenAPIFormat.IPV6, + description="IPv6 network", + ), + ] + ), + pydantic_v2.Json: Schema(type=OpenAPIType.OBJECT, format=OpenAPIFormat.JSON_POINTER), + pydantic_v2.NameEmail: Schema( + type=OpenAPIType.STRING, format=OpenAPIFormat.EMAIL, description="Name and email" + ), + pydantic_v2.AnyUrl: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.URL), + } + ) + + +_supported_types = (pydantic_v1.BaseModel, *PYDANTIC_TYPE_MAP.keys()) +if pydantic_v2 is not None: # pragma: no cover + _supported_types = (pydantic_v2.BaseModel, *_supported_types) + + +class PydanticSchemaPlugin(OpenAPISchemaPlugin): + __slots__ = ("prefer_alias",) + + def __init__(self, prefer_alias: bool = False) -> None: + self.prefer_alias = prefer_alias + + @staticmethod + def is_plugin_supported_type(value: Any) -> bool: + return isinstance(value, _supported_types) or is_class_and_subclass(value, _supported_types) # type: ignore[arg-type] + + @staticmethod + def is_undefined_sentinel(value: Any) -> bool: + return is_pydantic_undefined(value) + + @staticmethod + def is_constrained_field(field_definition: FieldDefinition) -> bool: + return is_pydantic_constrained_field(field_definition.annotation) + + def to_openapi_schema(self, field_definition: FieldDefinition, schema_creator: SchemaCreator) -> Schema: + """Given a type annotation, transform it into an OpenAPI schema class. + + Args: + field_definition: FieldDefinition instance. + schema_creator: An instance of the schema creator class + + Returns: + An :class:`OpenAPI <litestar.openapi.spec.schema.Schema>` instance. + """ + if schema_creator.prefer_alias != self.prefer_alias: + schema_creator.prefer_alias = True + if is_pydantic_model_class(field_definition.annotation): + return self.for_pydantic_model(field_definition=field_definition, schema_creator=schema_creator) + return PYDANTIC_TYPE_MAP[field_definition.annotation] # pragma: no cover + + @classmethod + def for_pydantic_model(cls, field_definition: FieldDefinition, schema_creator: SchemaCreator) -> Schema: # pyright: ignore + """Create a schema object for a given pydantic model class. + + Args: + field_definition: FieldDefinition instance. + schema_creator: An instance of the schema creator class + + Returns: + A schema instance. + """ + + annotation = field_definition.annotation + if is_generic(annotation): + is_generic_model = True + model = pydantic_unwrap_and_get_origin(annotation) or annotation + else: + is_generic_model = False + model = annotation + + if is_pydantic_2_model(model): + model_config = model.model_config + model_field_info = model.model_fields + title = model_config.get("title") + example = model_config.get("example") + is_v2_model = True + else: + model_config = annotation.__config__ + model_field_info = model.__fields__ + title = getattr(model_config, "title", None) + example = getattr(model_config, "example", None) + is_v2_model = False + + model_fields: dict[str, pydantic_v1.fields.FieldInfo | pydantic_v2.fields.FieldInfo] = { # pyright: ignore + k: getattr(f, "field_info", f) for k, f in model_field_info.items() + } + + if is_v2_model: + # extract the annotations from the FieldInfo. This allows us to skip fields + # which have been marked as private + model_annotations = {k: field_info.annotation for k, field_info in model_fields.items()} # type: ignore[union-attr] + + else: + # pydantic v1 requires some workarounds here + model_annotations = { + k: f.outer_type_ if f.required else Optional[f.outer_type_] for k, f in model.__fields__.items() + } + + if is_generic_model: + # if the model is generic, resolve the type variables. We pass in the + # already extracted annotations, to keep the logic of respecting private + # fields consistent with the above + model_annotations = pydantic_get_type_hints_with_generics_resolved( + annotation, model_annotations=model_annotations, include_extras=True + ) + + property_fields = { + field_info.alias if field_info.alias and schema_creator.prefer_alias else k: FieldDefinition.from_kwarg( + annotation=Annotated[model_annotations[k], field_info, field_info.metadata] # type: ignore[union-attr] + if is_v2_model + else Annotated[model_annotations[k], field_info], # pyright: ignore + name=field_info.alias if field_info.alias and schema_creator.prefer_alias else k, + default=Empty if schema_creator.is_undefined(field_info.default) else field_info.default, + ) + for k, field_info in model_fields.items() + } + + computed_field_definitions = create_field_definitions_for_computed_fields( + annotation, schema_creator.prefer_alias + ) + property_fields.update(computed_field_definitions) + + return schema_creator.create_component_schema( + field_definition, + required=sorted(f.name for f in property_fields.values() if f.is_required), + property_fields=property_fields, + title=title, + examples=None if example is None else [example], + ) diff --git a/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/utils.py b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/utils.py new file mode 100644 index 0000000..6aee322 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/contrib/pydantic/utils.py @@ -0,0 +1,214 @@ +# mypy: strict-equality=False +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from typing_extensions import Annotated, get_type_hints + +from litestar.params import KwargDefinition +from litestar.types import Empty +from litestar.typing import FieldDefinition +from litestar.utils import deprecated, is_class_and_subclass +from litestar.utils.predicates import is_generic +from litestar.utils.typing import ( + _substitute_typevars, + get_origin_or_inner_type, + get_type_hints_with_generics_resolved, + normalize_type_annotation, +) + +# isort: off +try: + from pydantic import v1 as pydantic_v1 + import pydantic as pydantic_v2 + from pydantic.fields import PydanticUndefined as Pydantic2Undefined # type: ignore[attr-defined] + from pydantic.v1.fields import Undefined as Pydantic1Undefined + + PYDANTIC_UNDEFINED_SENTINELS = {Pydantic1Undefined, Pydantic2Undefined} +except ImportError: + try: + import pydantic as pydantic_v1 # type: ignore[no-redef] + from pydantic.fields import Undefined as Pydantic1Undefined # type: ignore[attr-defined, no-redef] + + pydantic_v2 = Empty # type: ignore[assignment] + PYDANTIC_UNDEFINED_SENTINELS = {Pydantic1Undefined} + + except ImportError: # pyright: ignore + pydantic_v1 = Empty # type: ignore[assignment] + pydantic_v2 = Empty # type: ignore[assignment] + PYDANTIC_UNDEFINED_SENTINELS = set() +# isort: on + + +if TYPE_CHECKING: + from typing_extensions import TypeGuard + + +def is_pydantic_model_class( + annotation: Any, +) -> TypeGuard[type[pydantic_v1.BaseModel | pydantic_v2.BaseModel]]: # pyright: ignore + """Given a type annotation determine if the annotation is a subclass of pydantic's BaseModel. + + Args: + annotation: A type. + + Returns: + A typeguard determining whether the type is :data:`BaseModel pydantic.BaseModel>`. + """ + tests: list[bool] = [] + + if pydantic_v1 is not Empty: # pragma: no cover + tests.append(is_class_and_subclass(annotation, pydantic_v1.BaseModel)) + + if pydantic_v2 is not Empty: # pragma: no cover + tests.append(is_class_and_subclass(annotation, pydantic_v2.BaseModel)) + + return any(tests) + + +def is_pydantic_model_instance( + annotation: Any, +) -> TypeGuard[pydantic_v1.BaseModel | pydantic_v2.BaseModel]: # pyright: ignore + """Given a type annotation determine if the annotation is an instance of pydantic's BaseModel. + + Args: + annotation: A type. + + Returns: + A typeguard determining whether the type is :data:`BaseModel pydantic.BaseModel>`. + """ + tests: list[bool] = [] + + if pydantic_v1 is not Empty: # pragma: no cover + tests.append(isinstance(annotation, pydantic_v1.BaseModel)) + + if pydantic_v2 is not Empty: # pragma: no cover + tests.append(isinstance(annotation, pydantic_v2.BaseModel)) + + return any(tests) + + +def is_pydantic_constrained_field(annotation: Any) -> bool: + """Check if the given annotation is a constrained pydantic type. + + Args: + annotation: A type annotation + + Returns: + True if pydantic is installed and the type is a constrained type, otherwise False. + """ + if pydantic_v1 is Empty: # pragma: no cover + return False # type: ignore[unreachable] + + return any( + is_class_and_subclass(annotation, constrained_type) # pyright: ignore + for constrained_type in ( + pydantic_v1.ConstrainedBytes, + pydantic_v1.ConstrainedDate, + pydantic_v1.ConstrainedDecimal, + pydantic_v1.ConstrainedFloat, + pydantic_v1.ConstrainedFrozenSet, + pydantic_v1.ConstrainedInt, + pydantic_v1.ConstrainedList, + pydantic_v1.ConstrainedSet, + pydantic_v1.ConstrainedStr, + ) + ) + + +def pydantic_unwrap_and_get_origin(annotation: Any) -> Any | None: + if pydantic_v2 is Empty or (pydantic_v1 is not Empty and is_class_and_subclass(annotation, pydantic_v1.BaseModel)): + return get_origin_or_inner_type(annotation) + + origin = annotation.__pydantic_generic_metadata__["origin"] + return normalize_type_annotation(origin) + + +def pydantic_get_type_hints_with_generics_resolved( + annotation: Any, + globalns: dict[str, Any] | None = None, + localns: dict[str, Any] | None = None, + include_extras: bool = False, + model_annotations: dict[str, Any] | None = None, +) -> dict[str, Any]: + if pydantic_v2 is Empty or (pydantic_v1 is not Empty and is_class_and_subclass(annotation, pydantic_v1.BaseModel)): + return get_type_hints_with_generics_resolved(annotation, type_hints=model_annotations) + + origin = pydantic_unwrap_and_get_origin(annotation) + if origin is None: + if model_annotations is None: # pragma: no cover + model_annotations = get_type_hints( + annotation, globalns=globalns, localns=localns, include_extras=include_extras + ) + typevar_map = {p: p for p in annotation.__pydantic_generic_metadata__["parameters"]} + else: + if model_annotations is None: + model_annotations = get_type_hints( + origin, globalns=globalns, localns=localns, include_extras=include_extras + ) + args = annotation.__pydantic_generic_metadata__["args"] + parameters = origin.__pydantic_generic_metadata__["parameters"] + typevar_map = dict(zip(parameters, args)) + + return {n: _substitute_typevars(type_, typevar_map) for n, type_ in model_annotations.items()} + + +@deprecated(version="2.6.2") +def pydantic_get_unwrapped_annotation_and_type_hints(annotation: Any) -> tuple[Any, dict[str, Any]]: # pragma: pver + """Get the unwrapped annotation and the type hints after resolving generics. + + Args: + annotation: A type annotation. + + Returns: + A tuple containing the unwrapped annotation and the type hints. + """ + + if is_generic(annotation): + origin = pydantic_unwrap_and_get_origin(annotation) + return origin or annotation, pydantic_get_type_hints_with_generics_resolved(annotation, include_extras=True) + return annotation, get_type_hints(annotation, include_extras=True) + + +def is_pydantic_2_model( + obj: type[pydantic_v1.BaseModel | pydantic_v2.BaseModel], # pyright: ignore +) -> TypeGuard[pydantic_v2.BaseModel]: # pyright: ignore + return pydantic_v2 is not Empty and issubclass(obj, pydantic_v2.BaseModel) + + +def is_pydantic_undefined(value: Any) -> bool: + return any(v is value for v in PYDANTIC_UNDEFINED_SENTINELS) + + +def create_field_definitions_for_computed_fields( + model: type[pydantic_v1.BaseModel | pydantic_v2.BaseModel], # pyright: ignore + prefer_alias: bool, +) -> dict[str, FieldDefinition]: + """Create field definitions for computed fields. + + Args: + model: A pydantic model. + prefer_alias: Whether to prefer the alias or the name of the field. + + Returns: + A dictionary containing the field definitions for the computed fields. + """ + pydantic_decorators = getattr(model, "__pydantic_decorators__", None) + if pydantic_decorators is None: + return {} + + def get_name(k: str, dec: Any) -> str: + if not dec.info.alias: + return k + return dec.info.alias if prefer_alias else k # type: ignore[no-any-return] + + return { + (name := get_name(k, dec)): FieldDefinition.from_annotation( + Annotated[ + dec.info.return_type, + KwargDefinition(title=dec.info.title, description=dec.info.description, read_only=True), + ], + name=name, + ) + for k, dec in pydantic_decorators.computed_fields.items() + } |