diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/dto')
22 files changed, 2290 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/dto/__init__.py b/venv/lib/python3.11/site-packages/litestar/dto/__init__.py new file mode 100644 index 0000000..052e6a4 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/__init__.py @@ -0,0 +1,20 @@ +from .base_dto import AbstractDTO +from .config import DTOConfig +from .data_structures import DTOData, DTOFieldDefinition +from .dataclass_dto import DataclassDTO +from .field import DTOField, Mark, dto_field +from .msgspec_dto import MsgspecDTO +from .types import RenameStrategy + +__all__ = ( + "AbstractDTO", + "DTOConfig", + "DTOData", + "DTOField", + "DTOFieldDefinition", + "DataclassDTO", + "Mark", + "MsgspecDTO", + "RenameStrategy", + "dto_field", +) diff --git a/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..7551946 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/_backend.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/_backend.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..d39aea9 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/_backend.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/_codegen_backend.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/_codegen_backend.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..610f4b4 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/_codegen_backend.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/_types.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/_types.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..53877b3 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/_types.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/base_dto.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/base_dto.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..eb9af7a --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/base_dto.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/config.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/config.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..ba3f03d --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/config.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/data_structures.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/data_structures.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..785f3b0 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/data_structures.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/dataclass_dto.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/dataclass_dto.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..2388d4a --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/dataclass_dto.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/field.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/field.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..a6d1cbb --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/field.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/msgspec_dto.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/msgspec_dto.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..0d15d6b --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/msgspec_dto.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/types.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/types.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..ec01cd4 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/__pycache__/types.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/dto/_backend.py b/venv/lib/python3.11/site-packages/litestar/dto/_backend.py new file mode 100644 index 0000000..1c48dc0 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/_backend.py @@ -0,0 +1,911 @@ +"""DTO backends do the heavy lifting of decoding and validating raw bytes into domain models, and +back again, to bytes. +""" + +from __future__ import annotations + +from dataclasses import replace +from typing import ( + TYPE_CHECKING, + AbstractSet, + Any, + ClassVar, + Collection, + Final, + Mapping, + Protocol, + Union, + cast, +) + +import msgspec +from msgspec import UNSET, Struct, UnsetType, convert, defstruct, field +from typing_extensions import Annotated + +from litestar.dto._types import ( + CollectionType, + CompositeType, + MappingType, + NestedFieldInfo, + SimpleType, + TransferDTOFieldDefinition, + TransferType, + TupleType, + UnionType, +) +from litestar.dto.data_structures import DTOData, DTOFieldDefinition +from litestar.dto.field import Mark +from litestar.enums import RequestEncodingType +from litestar.params import KwargDefinition +from litestar.serialization import decode_json, decode_msgpack +from litestar.types import Empty +from litestar.typing import FieldDefinition +from litestar.utils import unique_name_for_scope + +if TYPE_CHECKING: + from litestar.connection import ASGIConnection + from litestar.dto import AbstractDTO, RenameStrategy + from litestar.types.serialization import LitestarEncodableType + +__all__ = ("DTOBackend",) + + +class CompositeTypeHandler(Protocol): + def __call__( + self, + field_definition: FieldDefinition, + exclude: AbstractSet[str], + include: AbstractSet[str], + rename_fields: dict[str, str], + unique_name: str, + nested_depth: int, + ) -> CompositeType: ... + + +class DTOBackend: + __slots__ = ( + "annotation", + "dto_data_type", + "dto_factory", + "field_definition", + "handler_id", + "is_data_field", + "model_type", + "parsed_field_definitions", + "reverse_name_map", + "transfer_model_type", + "wrapper_attribute_name", + ) + + _seen_model_names: ClassVar[set[str]] = set() + + def __init__( + self, + dto_factory: type[AbstractDTO], + field_definition: FieldDefinition, + handler_id: str, + is_data_field: bool, + model_type: type[Any], + wrapper_attribute_name: str | None, + ) -> None: + """Create dto backend instance. + + Args: + dto_factory: The DTO factory class calling this backend. + field_definition: Parsed type. + handler_id: The name of the handler that this backend is for. + is_data_field: Whether the field is a subclass of DTOData. + model_type: Model type. + wrapper_attribute_name: If the data that DTO should operate upon is wrapped in a generic datastructure, this is the name of the attribute that the data is stored in. + """ + self.dto_factory: Final[type[AbstractDTO]] = dto_factory + self.field_definition: Final[FieldDefinition] = field_definition + self.is_data_field: Final[bool] = is_data_field + self.handler_id: Final[str] = handler_id + self.model_type: Final[type[Any]] = model_type + self.wrapper_attribute_name: Final[str | None] = wrapper_attribute_name + + self.parsed_field_definitions = self.parse_model( + model_type=model_type, + exclude=self.dto_factory.config.exclude, + include=self.dto_factory.config.include, + rename_fields=self.dto_factory.config.rename_fields, + ) + self.transfer_model_type = self.create_transfer_model_type( + model_name=model_type.__name__, field_definitions=self.parsed_field_definitions + ) + self.dto_data_type: type[DTOData] | None = None + + if field_definition.is_subclass_of(DTOData): + self.dto_data_type = field_definition.annotation + field_definition = self.field_definition.inner_types[0] + + self.annotation = build_annotation_for_backend(model_type, field_definition, self.transfer_model_type) + + def parse_model( + self, + model_type: Any, + exclude: AbstractSet[str], + include: AbstractSet[str], + rename_fields: dict[str, str], + nested_depth: int = 0, + ) -> tuple[TransferDTOFieldDefinition, ...]: + """Reduce :attr:`model_type` to a tuple :class:`TransferDTOFieldDefinition` instances. + + Returns: + Fields for data transfer. + """ + defined_fields = [] + generic_field_definitons = list(FieldDefinition.from_annotation(model_type).generic_types or ()) + for field_definition in self.dto_factory.generate_field_definitions(model_type): + if field_definition.is_type_var: + base_arg_field = generic_field_definitons.pop() + field_definition = replace( + field_definition, annotation=base_arg_field.annotation, raw=base_arg_field.raw + ) + + if _should_mark_private(field_definition, self.dto_factory.config.underscore_fields_private): + field_definition.dto_field.mark = Mark.PRIVATE + + try: + transfer_type = self._create_transfer_type( + field_definition=field_definition, + exclude=exclude, + include=include, + rename_fields=rename_fields, + field_name=field_definition.name, + unique_name=field_definition.model_name, + nested_depth=nested_depth, + ) + except RecursionError: + continue + + transfer_field_definition = TransferDTOFieldDefinition.from_dto_field_definition( + field_definition=field_definition, + serialization_name=rename_fields.get(field_definition.name), + transfer_type=transfer_type, + is_partial=self.dto_factory.config.partial, + is_excluded=_should_exclude_field( + field_definition=field_definition, + exclude=exclude, + include=include, + is_data_field=self.is_data_field, + ), + ) + defined_fields.append(transfer_field_definition) + return tuple(defined_fields) + + def _create_transfer_model_name(self, model_name: str) -> str: + long_name_prefix = self.handler_id.split("::")[0] + short_name_prefix = _camelize(long_name_prefix.split(".")[-1], True) + + name_suffix = "RequestBody" if self.is_data_field else "ResponseBody" + + if (short_name := f"{short_name_prefix}{model_name}{name_suffix}") not in self._seen_model_names: + name = short_name + elif (long_name := f"{long_name_prefix}{model_name}{name_suffix}") not in self._seen_model_names: + name = long_name + else: + name = unique_name_for_scope(long_name, self._seen_model_names) + + self._seen_model_names.add(name) + + return name + + def create_transfer_model_type( + self, + model_name: str, + field_definitions: tuple[TransferDTOFieldDefinition, ...], + ) -> type[Struct]: + """Create a model for data transfer. + + Args: + model_name: name for the type that should be unique across all transfer types. + field_definitions: field definitions for the container type. + + Returns: + A ``BackendT`` class. + """ + struct_name = self._create_transfer_model_name(model_name) + + struct = _create_struct_for_field_definitions( + struct_name, field_definitions, self.dto_factory.config.rename_strategy + ) + setattr(struct, "__schema_name__", struct_name) + return struct + + def parse_raw(self, raw: bytes, asgi_connection: ASGIConnection) -> Struct | Collection[Struct]: + """Parse raw bytes into transfer model type. + + Args: + raw: bytes + asgi_connection: The current ASGI Connection + + Returns: + The raw bytes parsed into transfer model type. + """ + request_encoding = RequestEncodingType.JSON + + if (content_type := getattr(asgi_connection, "content_type", None)) and (media_type := content_type[0]): + request_encoding = media_type + + type_decoders = asgi_connection.route_handler.resolve_type_decoders() + + if request_encoding == RequestEncodingType.MESSAGEPACK: + result = decode_msgpack(value=raw, target_type=self.annotation, type_decoders=type_decoders) + else: + result = decode_json(value=raw, target_type=self.annotation, type_decoders=type_decoders) + + return cast("Struct | Collection[Struct]", result) + + def parse_builtins(self, builtins: Any, asgi_connection: ASGIConnection) -> Any: + """Parse builtin types into transfer model type. + + Args: + builtins: Builtin type. + asgi_connection: The current ASGI Connection + + Returns: + The builtin type parsed into transfer model type. + """ + return convert( + obj=builtins, + type=self.annotation, + dec_hook=asgi_connection.route_handler.default_deserializer, + strict=False, + str_keys=True, + ) + + def populate_data_from_builtins(self, builtins: Any, asgi_connection: ASGIConnection) -> Any: + """Populate model instance from builtin types. + + Args: + builtins: Builtin type. + asgi_connection: The current ASGI Connection + + Returns: + Instance or collection of ``model_type`` instances. + """ + if self.dto_data_type: + return self.dto_data_type( + backend=self, + data_as_builtins=_transfer_data( + destination_type=dict, + source_data=self.parse_builtins(builtins, asgi_connection), + field_definitions=self.parsed_field_definitions, + field_definition=self.field_definition, + is_data_field=self.is_data_field, + ), + ) + return self.transfer_data_from_builtins(self.parse_builtins(builtins, asgi_connection)) + + def transfer_data_from_builtins(self, builtins: Any) -> Any: + """Populate model instance from builtin types. + + Args: + builtins: Builtin type. + + Returns: + Instance or collection of ``model_type`` instances. + """ + return _transfer_data( + destination_type=self.model_type, + source_data=builtins, + field_definitions=self.parsed_field_definitions, + field_definition=self.field_definition, + is_data_field=self.is_data_field, + ) + + def populate_data_from_raw(self, raw: bytes, asgi_connection: ASGIConnection) -> Any: + """Parse raw bytes into instance of `model_type`. + + Args: + raw: bytes + asgi_connection: The current ASGI Connection + + Returns: + Instance or collection of ``model_type`` instances. + """ + if self.dto_data_type: + return self.dto_data_type( + backend=self, + data_as_builtins=_transfer_data( + destination_type=dict, + source_data=self.parse_raw(raw, asgi_connection), + field_definitions=self.parsed_field_definitions, + field_definition=self.field_definition, + is_data_field=self.is_data_field, + ), + ) + return _transfer_data( + destination_type=self.model_type, + source_data=self.parse_raw(raw, asgi_connection), + field_definitions=self.parsed_field_definitions, + field_definition=self.field_definition, + is_data_field=self.is_data_field, + ) + + def encode_data(self, data: Any) -> LitestarEncodableType: + """Encode data into a ``LitestarEncodableType``. + + Args: + data: Data to encode. + + Returns: + Encoded data. + """ + if self.wrapper_attribute_name: + wrapped_transfer = _transfer_data( + destination_type=self.transfer_model_type, + source_data=getattr(data, self.wrapper_attribute_name), + field_definitions=self.parsed_field_definitions, + field_definition=self.field_definition, + is_data_field=self.is_data_field, + ) + setattr( + data, + self.wrapper_attribute_name, + wrapped_transfer, + ) + return cast("LitestarEncodableType", data) + + return cast( + "LitestarEncodableType", + _transfer_data( + destination_type=self.transfer_model_type, + source_data=data, + field_definitions=self.parsed_field_definitions, + field_definition=self.field_definition, + is_data_field=self.is_data_field, + ), + ) + + def _get_handler_for_field_definition(self, field_definition: FieldDefinition) -> CompositeTypeHandler | None: + if field_definition.is_union: + return self._create_union_type + + if field_definition.is_tuple: + if len(field_definition.inner_types) == 2 and field_definition.inner_types[1].annotation is Ellipsis: + return self._create_collection_type + return self._create_tuple_type + + if field_definition.is_mapping: + return self._create_mapping_type + + if field_definition.is_non_string_collection: + return self._create_collection_type + return None + + def _create_transfer_type( + self, + field_definition: FieldDefinition, + exclude: AbstractSet[str], + include: AbstractSet[str], + rename_fields: dict[str, str], + field_name: str, + unique_name: str, + nested_depth: int, + ) -> CompositeType | SimpleType: + exclude = _filter_nested_field(exclude, field_name) + include = _filter_nested_field(include, field_name) + rename_fields = _filter_nested_field_mapping(rename_fields, field_name) + + if composite_type_handler := self._get_handler_for_field_definition(field_definition): + return composite_type_handler( + field_definition=field_definition, + exclude=exclude, + include=include, + rename_fields=rename_fields, + unique_name=unique_name, + nested_depth=nested_depth, + ) + + transfer_model: NestedFieldInfo | None = None + + if self.dto_factory.detect_nested_field(field_definition): + if nested_depth == self.dto_factory.config.max_nested_depth: + raise RecursionError + + nested_field_definitions = self.parse_model( + model_type=field_definition.annotation, + exclude=exclude, + include=include, + rename_fields=rename_fields, + nested_depth=nested_depth + 1, + ) + + transfer_model = NestedFieldInfo( + model=self.create_transfer_model_type(unique_name, nested_field_definitions), + field_definitions=nested_field_definitions, + ) + + return SimpleType(field_definition, nested_field_info=transfer_model) + + def _create_collection_type( + self, + field_definition: FieldDefinition, + exclude: AbstractSet[str], + include: AbstractSet[str], + rename_fields: dict[str, str], + unique_name: str, + nested_depth: int, + ) -> CollectionType: + inner_types = field_definition.inner_types + inner_type = self._create_transfer_type( + field_definition=inner_types[0] if inner_types else FieldDefinition.from_annotation(Any), + exclude=exclude, + include=include, + field_name="0", + unique_name=f"{unique_name}_0", + nested_depth=nested_depth, + rename_fields=rename_fields, + ) + return CollectionType( + field_definition=field_definition, inner_type=inner_type, has_nested=inner_type.has_nested + ) + + def _create_mapping_type( + self, + field_definition: FieldDefinition, + exclude: AbstractSet[str], + include: AbstractSet[str], + rename_fields: dict[str, str], + unique_name: str, + nested_depth: int, + ) -> MappingType: + inner_types = field_definition.inner_types + key_type = self._create_transfer_type( + field_definition=inner_types[0] if inner_types else FieldDefinition.from_annotation(Any), + exclude=exclude, + include=include, + field_name="0", + unique_name=f"{unique_name}_0", + nested_depth=nested_depth, + rename_fields=rename_fields, + ) + value_type = self._create_transfer_type( + field_definition=inner_types[1] if inner_types else FieldDefinition.from_annotation(Any), + exclude=exclude, + include=include, + field_name="1", + unique_name=f"{unique_name}_1", + nested_depth=nested_depth, + rename_fields=rename_fields, + ) + return MappingType( + field_definition=field_definition, + key_type=key_type, + value_type=value_type, + has_nested=key_type.has_nested or value_type.has_nested, + ) + + def _create_tuple_type( + self, + field_definition: FieldDefinition, + exclude: AbstractSet[str], + include: AbstractSet[str], + rename_fields: dict[str, str], + unique_name: str, + nested_depth: int, + ) -> TupleType: + inner_types = tuple( + self._create_transfer_type( + field_definition=inner_type, + exclude=exclude, + include=include, + field_name=str(i), + unique_name=f"{unique_name}_{i}", + nested_depth=nested_depth, + rename_fields=rename_fields, + ) + for i, inner_type in enumerate(field_definition.inner_types) + ) + return TupleType( + field_definition=field_definition, + inner_types=inner_types, + has_nested=any(t.has_nested for t in inner_types), + ) + + def _create_union_type( + self, + field_definition: FieldDefinition, + exclude: AbstractSet[str], + include: AbstractSet[str], + rename_fields: dict[str, str], + unique_name: str, + nested_depth: int, + ) -> UnionType: + inner_types = tuple( + self._create_transfer_type( + field_definition=inner_type, + exclude=exclude, + include=include, + field_name=str(i), + unique_name=f"{unique_name}_{i}", + nested_depth=nested_depth, + rename_fields=rename_fields, + ) + for i, inner_type in enumerate(field_definition.inner_types) + ) + return UnionType( + field_definition=field_definition, + inner_types=inner_types, + has_nested=any(t.has_nested for t in inner_types), + ) + + +def _camelize(value: str, capitalize_first_letter: bool) -> str: + return "".join( + word if index == 0 and not capitalize_first_letter else word.capitalize() + for index, word in enumerate(value.split("_")) + ) + + +def _filter_nested_field(field_name_set: AbstractSet[str], field_name: str) -> AbstractSet[str]: + """Filter a nested field name.""" + return {split[1] for s in field_name_set if (split := s.split(".", 1))[0] == field_name and len(split) > 1} + + +def _filter_nested_field_mapping(field_name_mapping: Mapping[str, str], field_name: str) -> dict[str, str]: + """Filter a nested field name.""" + return { + split[1]: v + for s, v in field_name_mapping.items() + if (split := s.split(".", 1))[0] == field_name and len(split) > 1 + } + + +def _transfer_data( + destination_type: type[Any], + source_data: Any | Collection[Any], + field_definitions: tuple[TransferDTOFieldDefinition, ...], + field_definition: FieldDefinition, + is_data_field: bool, +) -> Any: + """Create instance or iterable of instances of ``destination_type``. + + Args: + destination_type: the model type received by the DTO on type narrowing. + source_data: data that has been parsed and validated via the backend. + field_definitions: model field definitions. + field_definition: the parsed type that represents the handler annotation for which the DTO is being applied. + is_data_field: whether the DTO is being applied to a ``data`` field. + + Returns: + Data parsed into ``destination_type``. + """ + if field_definition.is_non_string_collection: + if not field_definition.is_mapping: + return field_definition.instantiable_origin( + _transfer_data( + destination_type=destination_type, + source_data=item, + field_definitions=field_definitions, + field_definition=field_definition.inner_types[0], + is_data_field=is_data_field, + ) + for item in source_data + ) + return field_definition.instantiable_origin( + ( + key, + _transfer_data( + destination_type=destination_type, + source_data=value, + field_definitions=field_definitions, + field_definition=field_definition.inner_types[1], + is_data_field=is_data_field, + ), + ) + for key, value in source_data.items() # type: ignore[union-attr] + ) + + return _transfer_instance_data( + destination_type=destination_type, + source_instance=source_data, + field_definitions=field_definitions, + is_data_field=is_data_field, + ) + + +def _transfer_instance_data( + destination_type: type[Any], + source_instance: Any, + field_definitions: tuple[TransferDTOFieldDefinition, ...], + is_data_field: bool, +) -> Any: + """Create instance of ``destination_type`` with data from ``source_instance``. + + Args: + destination_type: the model type received by the DTO on type narrowing. + source_instance: primitive data that has been parsed and validated via the backend. + field_definitions: model field definitions. + is_data_field: whether the given field is a 'data' kwarg field. + + Returns: + Data parsed into ``model_type``. + """ + unstructured_data = {} + + for field_definition in field_definitions: + if not is_data_field: + if field_definition.is_excluded: + continue + elif not ( + field_definition.name in source_instance + if isinstance(source_instance, Mapping) + else hasattr(source_instance, field_definition.name) + ): + continue + + transfer_type = field_definition.transfer_type + source_value = ( + source_instance[field_definition.name] + if isinstance(source_instance, Mapping) + else getattr(source_instance, field_definition.name) + ) + + if field_definition.is_partial and is_data_field and source_value is UNSET: + continue + + unstructured_data[field_definition.name] = _transfer_type_data( + source_value=source_value, + transfer_type=transfer_type, + nested_as_dict=destination_type is dict, + is_data_field=is_data_field, + ) + + return destination_type(**unstructured_data) + + +def _transfer_type_data( + source_value: Any, + transfer_type: TransferType, + nested_as_dict: bool, + is_data_field: bool, +) -> Any: + if isinstance(transfer_type, SimpleType) and transfer_type.nested_field_info: + if nested_as_dict: + destination_type: Any = dict + elif is_data_field: + destination_type = transfer_type.field_definition.annotation + else: + destination_type = transfer_type.nested_field_info.model + + return _transfer_instance_data( + destination_type=destination_type, + source_instance=source_value, + field_definitions=transfer_type.nested_field_info.field_definitions, + is_data_field=is_data_field, + ) + + if isinstance(transfer_type, UnionType) and transfer_type.has_nested: + return _transfer_nested_union_type_data( + transfer_type=transfer_type, + source_value=source_value, + is_data_field=is_data_field, + ) + + if isinstance(transfer_type, CollectionType): + if transfer_type.has_nested: + return transfer_type.field_definition.instantiable_origin( + _transfer_type_data( + source_value=item, + transfer_type=transfer_type.inner_type, + nested_as_dict=False, + is_data_field=is_data_field, + ) + for item in source_value + ) + + return transfer_type.field_definition.instantiable_origin(source_value) + return source_value + + +def _transfer_nested_union_type_data( + transfer_type: UnionType, + source_value: Any, + is_data_field: bool, +) -> Any: + for inner_type in transfer_type.inner_types: + if isinstance(inner_type, CompositeType): + raise RuntimeError("Composite inner types not (yet) supported for nested unions.") + + if inner_type.nested_field_info and isinstance( + source_value, + inner_type.nested_field_info.model if is_data_field else inner_type.field_definition.annotation, + ): + return _transfer_instance_data( + destination_type=inner_type.field_definition.annotation + if is_data_field + else inner_type.nested_field_info.model, + source_instance=source_value, + field_definitions=inner_type.nested_field_info.field_definitions, + is_data_field=is_data_field, + ) + return source_value + + +def _create_msgspec_field(field_definition: TransferDTOFieldDefinition) -> Any: + kwargs: dict[str, Any] = {} + if field_definition.is_partial: + kwargs["default"] = UNSET + + elif field_definition.default is not Empty: + kwargs["default"] = field_definition.default + + elif field_definition.default_factory is not None: + kwargs["default_factory"] = field_definition.default_factory + + if field_definition.serialization_name is not None: + kwargs["name"] = field_definition.serialization_name + + return field(**kwargs) + + +def _create_struct_field_meta_for_field_definition(field_definition: TransferDTOFieldDefinition) -> msgspec.Meta | None: + if (kwarg_definition := field_definition.kwarg_definition) is None or not isinstance( + kwarg_definition, KwargDefinition + ): + return None + + return msgspec.Meta( + description=kwarg_definition.description, + examples=[e.value for e in kwarg_definition.examples or []], + ge=kwarg_definition.ge, + gt=kwarg_definition.gt, + le=kwarg_definition.le, + lt=kwarg_definition.lt, + max_length=kwarg_definition.max_length if not field_definition.is_partial else None, + min_length=kwarg_definition.min_length if not field_definition.is_partial else None, + multiple_of=kwarg_definition.multiple_of, + pattern=kwarg_definition.pattern, + title=kwarg_definition.title, + ) + + +def _create_struct_for_field_definitions( + model_name: str, + field_definitions: tuple[TransferDTOFieldDefinition, ...], + rename_strategy: RenameStrategy | dict[str, str] | None, +) -> type[Struct]: + struct_fields: list[tuple[str, type] | tuple[str, type, type]] = [] + + for field_definition in field_definitions: + if field_definition.is_excluded: + continue + + field_type = _create_transfer_model_type_annotation(field_definition.transfer_type) + if field_definition.is_partial: + field_type = Union[field_type, UnsetType] + + if (field_meta := _create_struct_field_meta_for_field_definition(field_definition)) is not None: + field_type = Annotated[field_type, field_meta] + + struct_fields.append( + ( + field_definition.name, + field_type, + _create_msgspec_field(field_definition), + ) + ) + return defstruct(model_name, struct_fields, frozen=True, kw_only=True, rename=rename_strategy) + + +def build_annotation_for_backend( + model_type: type[Any], field_definition: FieldDefinition, transfer_model: type[Struct] +) -> Any: + """A helper to re-build a generic outer type with new inner type. + + Args: + model_type: The original model type. + field_definition: The parsed type that represents the handler annotation for which the DTO is being applied. + transfer_model: The transfer model generated to represent the model type. + + Returns: + Annotation with new inner type if applicable. + """ + if not field_definition.inner_types: + if field_definition.is_subclass_of(model_type): + return transfer_model + return field_definition.annotation + + inner_types = tuple( + build_annotation_for_backend(model_type, inner_type, transfer_model) + for inner_type in field_definition.inner_types + ) + + return field_definition.safe_generic_origin[inner_types] + + +def _should_mark_private(field_definition: DTOFieldDefinition, underscore_fields_private: bool) -> bool: + """Returns ``True`` where a field should be marked as private. + + Fields should be marked as private when: + - the ``underscore_fields_private`` flag is set. + - the field is not already marked. + - the field name is prefixed with an underscore + + Args: + field_definition: defined DTO field + underscore_fields_private: whether fields prefixed with an underscore should be marked as private. + """ + return bool( + underscore_fields_private and field_definition.dto_field.mark is None and field_definition.name.startswith("_") + ) + + +def _should_exclude_field( + field_definition: DTOFieldDefinition, exclude: AbstractSet[str], include: AbstractSet[str], is_data_field: bool +) -> bool: + """Returns ``True`` where a field should be excluded from data transfer. + + Args: + field_definition: defined DTO field + exclude: names of fields to exclude + include: names of fields to exclude + is_data_field: whether the field is a data field + + Returns: + ``True`` if the field should not be included in any data transfer. + """ + field_name = field_definition.name + if field_name in exclude: + return True + if include and field_name not in include and not (any(f.startswith(f"{field_name}.") for f in include)): + return True + if field_definition.dto_field.mark is Mark.PRIVATE: + return True + if is_data_field and field_definition.dto_field.mark is Mark.READ_ONLY: + return True + return not is_data_field and field_definition.dto_field.mark is Mark.WRITE_ONLY + + +def _create_transfer_model_type_annotation(transfer_type: TransferType) -> Any: + """Create a type annotation for a transfer model. + + Uses the parsed type that originates from the data model and the transfer model generated to represent a nested + type to reconstruct the type annotation for the transfer model. + """ + if isinstance(transfer_type, SimpleType): + if transfer_type.nested_field_info: + return transfer_type.nested_field_info.model + return transfer_type.field_definition.annotation + + if isinstance(transfer_type, CollectionType): + return _create_transfer_model_collection_type(transfer_type) + + if isinstance(transfer_type, MappingType): + return _create_transfer_model_mapping_type(transfer_type) + + if isinstance(transfer_type, TupleType): + return _create_transfer_model_tuple_type(transfer_type) + + if isinstance(transfer_type, UnionType): + return _create_transfer_model_union_type(transfer_type) + + raise RuntimeError(f"Unexpected transfer type: {type(transfer_type)}") + + +def _create_transfer_model_collection_type(transfer_type: CollectionType) -> Any: + generic_collection_type = transfer_type.field_definition.safe_generic_origin + inner_type = _create_transfer_model_type_annotation(transfer_type.inner_type) + if transfer_type.field_definition.origin is tuple: + return generic_collection_type[inner_type, ...] + return generic_collection_type[inner_type] + + +def _create_transfer_model_tuple_type(transfer_type: TupleType) -> Any: + inner_types = tuple(_create_transfer_model_type_annotation(t) for t in transfer_type.inner_types) + return transfer_type.field_definition.safe_generic_origin[inner_types] + + +def _create_transfer_model_union_type(transfer_type: UnionType) -> Any: + inner_types = tuple(_create_transfer_model_type_annotation(t) for t in transfer_type.inner_types) + return transfer_type.field_definition.safe_generic_origin[inner_types] + + +def _create_transfer_model_mapping_type(transfer_type: MappingType) -> Any: + key_type = _create_transfer_model_type_annotation(transfer_type.key_type) + value_type = _create_transfer_model_type_annotation(transfer_type.value_type) + return transfer_type.field_definition.safe_generic_origin[key_type, value_type] diff --git a/venv/lib/python3.11/site-packages/litestar/dto/_codegen_backend.py b/venv/lib/python3.11/site-packages/litestar/dto/_codegen_backend.py new file mode 100644 index 0000000..deff908 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/_codegen_backend.py @@ -0,0 +1,541 @@ +"""DTO backends do the heavy lifting of decoding and validating raw bytes into domain models, and +back again, to bytes. +""" + +from __future__ import annotations + +import re +import textwrap +from contextlib import contextmanager, nullcontext +from typing import ( + TYPE_CHECKING, + Any, + Callable, + ContextManager, + Generator, + Mapping, + Protocol, + cast, +) + +from msgspec import UNSET + +from litestar.dto._backend import DTOBackend +from litestar.dto._types import ( + CollectionType, + CompositeType, + SimpleType, + TransferDTOFieldDefinition, + TransferType, + UnionType, +) +from litestar.utils.helpers import unique_name_for_scope + +if TYPE_CHECKING: + from litestar.connection import ASGIConnection + from litestar.dto import AbstractDTO + from litestar.types.serialization import LitestarEncodableType + from litestar.typing import FieldDefinition + +__all__ = ("DTOCodegenBackend",) + + +class DTOCodegenBackend(DTOBackend): + __slots__ = ( + "_transfer_to_dict", + "_transfer_to_model_type", + "_transfer_data_from_builtins", + "_transfer_data_from_builtins_with_overrides", + "_encode_data", + ) + + def __init__( + self, + dto_factory: type[AbstractDTO], + field_definition: FieldDefinition, + handler_id: str, + is_data_field: bool, + model_type: type[Any], + wrapper_attribute_name: str | None, + ) -> None: + """Create dto backend instance. + + Args: + dto_factory: The DTO factory class calling this backend. + field_definition: Parsed type. + handler_id: The name of the handler that this backend is for. + is_data_field: Whether the field is a subclass of DTOData. + model_type: Model type. + wrapper_attribute_name: If the data that DTO should operate upon is wrapped in a generic datastructure, + this is the name of the attribute that the data is stored in. + """ + super().__init__( + dto_factory=dto_factory, + field_definition=field_definition, + handler_id=handler_id, + is_data_field=is_data_field, + model_type=model_type, + wrapper_attribute_name=wrapper_attribute_name, + ) + self._transfer_to_dict = self._create_transfer_data_fn( + destination_type=dict, + field_definition=self.field_definition, + ) + self._transfer_to_model_type = self._create_transfer_data_fn( + destination_type=self.model_type, + field_definition=self.field_definition, + ) + self._transfer_data_from_builtins = self._create_transfer_data_fn( + destination_type=self.model_type, + field_definition=self.field_definition, + ) + self._transfer_data_from_builtins_with_overrides = self._create_transfer_data_fn( + destination_type=self.model_type, + field_definition=self.field_definition, + ) + self._encode_data = self._create_transfer_data_fn( + destination_type=self.transfer_model_type, + field_definition=self.field_definition, + ) + + def populate_data_from_builtins(self, builtins: Any, asgi_connection: ASGIConnection) -> Any: + """Populate model instance from builtin types. + + Args: + builtins: Builtin type. + asgi_connection: The current ASGI Connection + + Returns: + Instance or collection of ``model_type`` instances. + """ + if self.dto_data_type: + return self.dto_data_type( + backend=self, + data_as_builtins=self._transfer_to_dict(self.parse_builtins(builtins, asgi_connection)), + ) + return self.transfer_data_from_builtins(self.parse_builtins(builtins, asgi_connection)) + + def transfer_data_from_builtins(self, builtins: Any) -> Any: + """Populate model instance from builtin types. + + Args: + builtins: Builtin type. + + Returns: + Instance or collection of ``model_type`` instances. + """ + return self._transfer_data_from_builtins(builtins) + + def populate_data_from_raw(self, raw: bytes, asgi_connection: ASGIConnection) -> Any: + """Parse raw bytes into instance of `model_type`. + + Args: + raw: bytes + asgi_connection: The current ASGI Connection + + Returns: + Instance or collection of ``model_type`` instances. + """ + if self.dto_data_type: + return self.dto_data_type( + backend=self, + data_as_builtins=self._transfer_to_dict(self.parse_raw(raw, asgi_connection)), + ) + return self._transfer_to_model_type(self.parse_raw(raw, asgi_connection)) + + def encode_data(self, data: Any) -> LitestarEncodableType: + """Encode data into a ``LitestarEncodableType``. + + Args: + data: Data to encode. + + Returns: + Encoded data. + """ + if self.wrapper_attribute_name: + wrapped_transfer = self._encode_data(getattr(data, self.wrapper_attribute_name)) + setattr(data, self.wrapper_attribute_name, wrapped_transfer) + return cast("LitestarEncodableType", data) + + return cast("LitestarEncodableType", self._encode_data(data)) + + def _create_transfer_data_fn( + self, + destination_type: type[Any], + field_definition: FieldDefinition, + ) -> Any: + """Create instance or iterable of instances of ``destination_type``. + + Args: + destination_type: the model type received by the DTO on type narrowing. + field_definition: the parsed type that represents the handler annotation for which the DTO is being applied. + + Returns: + Data parsed into ``destination_type``. + """ + + return TransferFunctionFactory.create_transfer_data( + destination_type=destination_type, + field_definitions=self.parsed_field_definitions, + is_data_field=self.is_data_field, + field_definition=field_definition, + ) + + +class FieldAccessManager(Protocol): + def __call__(self, source_name: str, field_name: str, expect_optional: bool) -> ContextManager[str]: ... + + +class TransferFunctionFactory: + def __init__(self, is_data_field: bool, nested_as_dict: bool) -> None: + self.is_data_field = is_data_field + self._fn_locals: dict[str, Any] = { + "Mapping": Mapping, + "UNSET": UNSET, + } + self._indentation = 1 + self._body = "" + self.names: set[str] = set() + self.nested_as_dict = nested_as_dict + self._re_index_access = re.compile(r"\[['\"](\w+?)['\"]]") + + def _add_to_fn_globals(self, name: str, value: Any) -> str: + unique_name = unique_name_for_scope(name, self._fn_locals) + self._fn_locals[unique_name] = value + return unique_name + + def _create_local_name(self, name: str) -> str: + unique_name = unique_name_for_scope(name, self.names) + self.names.add(unique_name) + return unique_name + + def _make_function( + self, source_value_name: str, return_value_name: str, fn_name: str = "func" + ) -> Callable[[Any], Any]: + """Wrap the current body contents in a function definition and turn it into a callable object""" + source = f"def {fn_name}({source_value_name}):\n{self._body} return {return_value_name}" + ctx: dict[str, Any] = {**self._fn_locals} + exec(source, ctx) # noqa: S102 + return ctx["func"] # type: ignore[no-any-return] + + def _add_stmt(self, stmt: str) -> None: + self._body += textwrap.indent(stmt + "\n", " " * self._indentation) + + @contextmanager + def _start_block(self, expr: str | None = None) -> Generator[None, None, None]: + """Start an indented block. If `expr` is given, use it as the "opening line" + of the block. + """ + if expr is not None: + self._add_stmt(expr) + self._indentation += 1 + yield + self._indentation -= 1 + + @contextmanager + def _try_except_pass(self, exception: str) -> Generator[None, None, None]: + """Enter a `try / except / pass` block. Content written while inside this context + will go into the `try` block. + """ + with self._start_block("try:"): + yield + with self._start_block(expr=f"except {exception}:"): + self._add_stmt("pass") + + @contextmanager + def _access_mapping_item( + self, source_name: str, field_name: str, expect_optional: bool + ) -> Generator[str, None, None]: + """Enter a context within which an item of a mapping can be accessed safely, + i.e. only if it is contained within that mapping. + Yields an expression that accesses the mapping item. Content written while + within this context can use this expression to access the desired value. + """ + value_expr = f"{source_name}['{field_name}']" + + # if we expect an optional item, it's faster to check if it exists beforehand + if expect_optional: + with self._start_block(f"if '{field_name}' in {source_name}:"): + yield value_expr + # the happy path of a try/except will be faster than that, so we use that if + # we expect a value + else: + with self._try_except_pass("KeyError"): + yield value_expr + + @contextmanager + def _access_attribute(self, source_name: str, field_name: str, expect_optional: bool) -> Generator[str, None, None]: + """Enter a context within which an attribute of an object can be accessed + safely, i.e. only if the object actually has the attribute. + Yields an expression that retrieves the object attribute. Content written while + within this context can use this expression to access the desired value. + """ + + value_expr = f"{source_name}.{field_name}" + + # if we expect an optional attribute it's faster to check with hasattr + if expect_optional: + with self._start_block(f"if hasattr({source_name}, '{field_name}'):"): + yield value_expr + # the happy path of a try/except will be faster than that, so we use that if + # we expect a value + else: + with self._try_except_pass("AttributeError"): + yield value_expr + + @classmethod + def create_transfer_instance_data( + cls, + field_definitions: tuple[TransferDTOFieldDefinition, ...], + destination_type: type[Any], + is_data_field: bool, + ) -> Callable[[Any], Any]: + factory = cls(is_data_field=is_data_field, nested_as_dict=destination_type is dict) + tmp_return_type_name = factory._create_local_name("tmp_return_type") + source_instance_name = factory._create_local_name("source_instance") + destination_type_name = factory._add_to_fn_globals("destination_type", destination_type) + factory._create_transfer_instance_data( + tmp_return_type_name=tmp_return_type_name, + source_instance_name=source_instance_name, + destination_type_name=destination_type_name, + field_definitions=field_definitions, + destination_type_is_dict=destination_type is dict, + ) + return factory._make_function(source_value_name=source_instance_name, return_value_name=tmp_return_type_name) + + @classmethod + def create_transfer_type_data( + cls, + transfer_type: TransferType, + is_data_field: bool, + ) -> Callable[[Any], Any]: + factory = cls(is_data_field=is_data_field, nested_as_dict=False) + tmp_return_type_name = factory._create_local_name("tmp_return_type") + source_value_name = factory._create_local_name("source_value") + factory._create_transfer_type_data_body( + transfer_type=transfer_type, + nested_as_dict=False, + assignment_target=tmp_return_type_name, + source_value_name=source_value_name, + ) + return factory._make_function(source_value_name=source_value_name, return_value_name=tmp_return_type_name) + + @classmethod + def create_transfer_data( + cls, + destination_type: type[Any], + field_definitions: tuple[TransferDTOFieldDefinition, ...], + is_data_field: bool, + field_definition: FieldDefinition | None = None, + ) -> Callable[[Any], Any]: + if field_definition and field_definition.is_non_string_collection: + factory = cls( + is_data_field=is_data_field, + nested_as_dict=False, + ) + source_value_name = factory._create_local_name("source_value") + return_value_name = factory._create_local_name("tmp_return_value") + factory._create_transfer_data_body_nested( + field_definitions=field_definitions, + field_definition=field_definition, + destination_type=destination_type, + source_data_name=source_value_name, + assignment_target=return_value_name, + ) + return factory._make_function(source_value_name=source_value_name, return_value_name=return_value_name) + + return cls.create_transfer_instance_data( + destination_type=destination_type, + field_definitions=field_definitions, + is_data_field=is_data_field, + ) + + def _create_transfer_data_body_nested( + self, + field_definition: FieldDefinition, + field_definitions: tuple[TransferDTOFieldDefinition, ...], + destination_type: type[Any], + source_data_name: str, + assignment_target: str, + ) -> None: + origin_name = self._add_to_fn_globals("origin", field_definition.instantiable_origin) + transfer_func = TransferFunctionFactory.create_transfer_data( + is_data_field=self.is_data_field, + destination_type=destination_type, + field_definition=field_definition.inner_types[0], + field_definitions=field_definitions, + ) + transfer_func_name = self._add_to_fn_globals("transfer_data", transfer_func) + if field_definition.is_mapping: + self._add_stmt( + f"{assignment_target} = {origin_name}((key, {transfer_func_name}(item)) for key, item in {source_data_name}.items())" + ) + else: + self._add_stmt( + f"{assignment_target} = {origin_name}({transfer_func_name}(item) for item in {source_data_name})" + ) + + def _create_transfer_instance_data( + self, + tmp_return_type_name: str, + source_instance_name: str, + destination_type_name: str, + field_definitions: tuple[TransferDTOFieldDefinition, ...], + destination_type_is_dict: bool, + ) -> None: + local_dict_name = self._create_local_name("unstructured_data") + self._add_stmt(f"{local_dict_name} = {{}}") + + if field_definitions := tuple(f for f in field_definitions if self.is_data_field or not f.is_excluded): + if len(field_definitions) > 1 and ("." in source_instance_name or "[" in source_instance_name): + # If there's more than one field we have to access, we check if it is + # nested. If it is nested, we assign it to a local variable to avoid + # repeated lookups. This is only a small performance improvement for + # regular attributes, but can be quite significant for properties or + # other types of descriptors, where I/O may be involved, such as the + # case for lazy loaded relationships in SQLAlchemy + if "." in source_instance_name: + level_1, level_2 = source_instance_name.split(".", 1) + else: + level_1, level_2, *_ = self._re_index_access.split(source_instance_name, maxsplit=1) + + new_source_instance_name = self._create_local_name(f"{level_1}_{level_2}") + self._add_stmt(f"{new_source_instance_name} = {source_instance_name}") + source_instance_name = new_source_instance_name + + for source_type in ("mapping", "object"): + if source_type == "mapping": + block_expr = f"if isinstance({source_instance_name}, Mapping):" + access_item = self._access_mapping_item + else: + block_expr = "else:" + access_item = self._access_attribute + + with self._start_block(expr=block_expr): + self._create_transfer_instance_data_inner( + local_dict_name=local_dict_name, + field_definitions=field_definitions, + access_field_safe=access_item, + source_instance_name=source_instance_name, + ) + + # if the destination type is a dict we can reuse our temporary dictionary of + # unstructured data as the "return value" + if not destination_type_is_dict: + self._add_stmt(f"{tmp_return_type_name} = {destination_type_name}(**{local_dict_name})") + else: + self._add_stmt(f"{tmp_return_type_name} = {local_dict_name}") + + def _create_transfer_instance_data_inner( + self, + *, + local_dict_name: str, + field_definitions: tuple[TransferDTOFieldDefinition, ...], + access_field_safe: FieldAccessManager, + source_instance_name: str, + ) -> None: + for field_definition in field_definitions: + with access_field_safe( + source_name=source_instance_name, + field_name=field_definition.name, + expect_optional=field_definition.is_partial or field_definition.is_optional, + ) as source_value_expr: + if self.is_data_field and field_definition.is_partial: + # we assign the source value to a name here, so we can skip + # getting it twice from the source instance + source_value_name = self._create_local_name("source_value") + self._add_stmt(f"{source_value_name} = {source_value_expr}") + ctx = self._start_block(f"if {source_value_name} is not UNSET:") + else: + # in these cases, we only ever access the source value once, so + # we can skip assigning it + source_value_name = source_value_expr + ctx = nullcontext() # type: ignore[assignment] + with ctx: + self._create_transfer_type_data_body( + transfer_type=field_definition.transfer_type, + nested_as_dict=self.nested_as_dict, + source_value_name=source_value_name, + assignment_target=f"{local_dict_name}['{field_definition.name}']", + ) + + def _create_transfer_type_data_body( + self, + transfer_type: TransferType, + nested_as_dict: bool, + source_value_name: str, + assignment_target: str, + ) -> None: + if isinstance(transfer_type, SimpleType) and transfer_type.nested_field_info: + if nested_as_dict: + destination_type: Any = dict + elif self.is_data_field: + destination_type = transfer_type.field_definition.annotation + else: + destination_type = transfer_type.nested_field_info.model + + self._create_transfer_instance_data( + field_definitions=transfer_type.nested_field_info.field_definitions, + tmp_return_type_name=assignment_target, + source_instance_name=source_value_name, + destination_type_name=self._add_to_fn_globals("destination_type", destination_type), + destination_type_is_dict=destination_type is dict, + ) + return + + if isinstance(transfer_type, UnionType) and transfer_type.has_nested: + self._create_transfer_nested_union_type_data( + transfer_type=transfer_type, + source_value_name=source_value_name, + assignment_target=assignment_target, + ) + return + + if isinstance(transfer_type, CollectionType): + origin_name = self._add_to_fn_globals("origin", transfer_type.field_definition.instantiable_origin) + if transfer_type.has_nested: + transfer_type_data_fn = TransferFunctionFactory.create_transfer_type_data( + is_data_field=self.is_data_field, transfer_type=transfer_type.inner_type + ) + transfer_type_data_name = self._add_to_fn_globals("transfer_type_data", transfer_type_data_fn) + self._add_stmt( + f"{assignment_target} = {origin_name}({transfer_type_data_name}(item) for item in {source_value_name})" + ) + return + + self._add_stmt(f"{assignment_target} = {origin_name}({source_value_name})") + return + + self._add_stmt(f"{assignment_target} = {source_value_name}") + + def _create_transfer_nested_union_type_data( + self, + transfer_type: UnionType, + source_value_name: str, + assignment_target: str, + ) -> None: + for inner_type in transfer_type.inner_types: + if isinstance(inner_type, CompositeType): + continue + + if inner_type.nested_field_info: + if self.is_data_field: + constraint_type = inner_type.nested_field_info.model + destination_type = inner_type.field_definition.annotation + else: + constraint_type = inner_type.field_definition.annotation + destination_type = inner_type.nested_field_info.model + + constraint_type_name = self._add_to_fn_globals("constraint_type", constraint_type) + destination_type_name = self._add_to_fn_globals("destination_type", destination_type) + + with self._start_block(f"if isinstance({source_value_name}, {constraint_type_name}):"): + self._create_transfer_instance_data( + destination_type_name=destination_type_name, + destination_type_is_dict=destination_type is dict, + field_definitions=inner_type.nested_field_info.field_definitions, + source_instance_name=source_value_name, + tmp_return_type_name=assignment_target, + ) + return + self._add_stmt(f"{assignment_target} = {source_value_name}") diff --git a/venv/lib/python3.11/site-packages/litestar/dto/_types.py b/venv/lib/python3.11/site-packages/litestar/dto/_types.py new file mode 100644 index 0000000..24e99b7 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/_types.py @@ -0,0 +1,148 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from litestar.dto.data_structures import DTOFieldDefinition + +if TYPE_CHECKING: + from typing import Any + + from typing_extensions import Self + + from litestar.typing import FieldDefinition + + +@dataclass(frozen=True) +class NestedFieldInfo: + """Type for representing fields and model type of nested model type.""" + + __slots__ = ("model", "field_definitions") + + model: type[Any] + field_definitions: tuple[TransferDTOFieldDefinition, ...] + + +@dataclass(frozen=True) +class TransferType: + """Type for representing model types for data transfer.""" + + __slots__ = ("field_definition",) + + field_definition: FieldDefinition + + +@dataclass(frozen=True) +class SimpleType(TransferType): + """Represents indivisible, non-composite types.""" + + __slots__ = ("nested_field_info",) + + nested_field_info: NestedFieldInfo | None + """If the type is a 'nested' type, this is the model generated for transfer to/from it.""" + + @property + def has_nested(self) -> bool: + return self.nested_field_info is not None + + +@dataclass(frozen=True) +class CompositeType(TransferType): + """A type that is made up of other types.""" + + __slots__ = ("has_nested",) + + has_nested: bool + """Whether the type represents nested model types within itself.""" + + +@dataclass(frozen=True) +class UnionType(CompositeType): + """Type for representing union types for data transfer.""" + + __slots__ = ("inner_types",) + + inner_types: tuple[CompositeType | SimpleType, ...] + + +@dataclass(frozen=True) +class CollectionType(CompositeType): + """Type for representing collection types for data transfer.""" + + __slots__ = ("inner_type",) + + inner_type: CompositeType | SimpleType + + +@dataclass(frozen=True) +class TupleType(CompositeType): + """Type for representing tuples for data transfer.""" + + __slots__ = ("inner_types",) + + inner_types: tuple[CompositeType | SimpleType, ...] + + +@dataclass(frozen=True) +class MappingType(CompositeType): + """Type for representing mappings for data transfer.""" + + __slots__ = ("key_type", "value_type") + + key_type: CompositeType | SimpleType + value_type: CompositeType | SimpleType + + +@dataclass(frozen=True) +class TransferDTOFieldDefinition(DTOFieldDefinition): + __slots__ = ( + "default_factory", + "dto_field", + "model_name", + "is_excluded", + "is_partial", + "serialization_name", + "transfer_type", + "unique_name", + ) + + transfer_type: TransferType + """Type of the field for transfer.""" + serialization_name: str | None + """Name of the field as it should appear in serialized form.""" + is_partial: bool + """Whether the field is optional for transfer.""" + is_excluded: bool + """Whether the field should be excluded from transfer.""" + + @classmethod + def from_dto_field_definition( + cls, + field_definition: DTOFieldDefinition, + transfer_type: TransferType, + serialization_name: str | None, + is_partial: bool, + is_excluded: bool, + ) -> Self: + return cls( + annotation=field_definition.annotation, + args=field_definition.args, + default=field_definition.default, + default_factory=field_definition.default_factory, + dto_field=field_definition.dto_field, + extra=field_definition.extra, + inner_types=field_definition.inner_types, + instantiable_origin=field_definition.instantiable_origin, + is_excluded=is_excluded, + is_partial=is_partial, + kwarg_definition=field_definition.kwarg_definition, + metadata=field_definition.metadata, + name=field_definition.name, + origin=field_definition.origin, + raw=field_definition.raw, + safe_generic_origin=field_definition.safe_generic_origin, + serialization_name=serialization_name, + transfer_type=transfer_type, + type_wrappers=field_definition.type_wrappers, + model_name=field_definition.model_name, + ) diff --git a/venv/lib/python3.11/site-packages/litestar/dto/base_dto.py b/venv/lib/python3.11/site-packages/litestar/dto/base_dto.py new file mode 100644 index 0000000..991b09f --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/base_dto.py @@ -0,0 +1,313 @@ +from __future__ import annotations + +import typing +from abc import abstractmethod +from inspect import getmodule +from typing import TYPE_CHECKING, Collection, Generic, TypeVar + +from typing_extensions import NotRequired, TypedDict, get_type_hints + +from litestar.dto._backend import DTOBackend +from litestar.dto._codegen_backend import DTOCodegenBackend +from litestar.dto.config import DTOConfig +from litestar.dto.data_structures import DTOData +from litestar.dto.types import RenameStrategy +from litestar.enums import RequestEncodingType +from litestar.exceptions.dto_exceptions import InvalidAnnotationException +from litestar.types.builtin_types import NoneType +from litestar.types.composite_types import TypeEncodersMap +from litestar.typing import FieldDefinition + +if TYPE_CHECKING: + from typing import Any, ClassVar, Generator + + from typing_extensions import Self + + from litestar._openapi.schema_generation import SchemaCreator + from litestar.connection import ASGIConnection + from litestar.dto.data_structures import DTOFieldDefinition + from litestar.openapi.spec import Reference, Schema + from litestar.types.serialization import LitestarEncodableType + +__all__ = ("AbstractDTO",) + +T = TypeVar("T") + + +class _BackendDict(TypedDict): + data_backend: NotRequired[DTOBackend] + return_backend: NotRequired[DTOBackend] + + +class AbstractDTO(Generic[T]): + """Base class for DTO types.""" + + __slots__ = ("asgi_connection",) + + config: ClassVar[DTOConfig] + """Config objects to define properties of the DTO.""" + model_type: type[T] + """If ``annotation`` is an iterable, this is the inner type, otherwise will be the same as ``annotation``.""" + + _dto_backends: ClassVar[dict[str, _BackendDict]] = {} + + def __init__(self, asgi_connection: ASGIConnection) -> None: + """Create an AbstractDTOFactory type. + + Args: + asgi_connection: A :class:`ASGIConnection <litestar.connection.base.ASGIConnection>` instance. + """ + self.asgi_connection = asgi_connection + + def __class_getitem__(cls, annotation: Any) -> type[Self]: + field_definition = FieldDefinition.from_annotation(annotation) + + if (field_definition.is_optional and len(field_definition.args) > 2) or ( + field_definition.is_union and not field_definition.is_optional + ): + raise InvalidAnnotationException("Unions are currently not supported as type argument to DTOs.") + + if field_definition.is_forward_ref: + raise InvalidAnnotationException("Forward references are not supported as type argument to DTO") + + # if a configuration is not provided, and the type narrowing is a type var, we don't want to create a subclass + config = cls.get_dto_config_from_annotated_type(field_definition) + + if not config: + if field_definition.is_type_var: + return cls + config = cls.config if hasattr(cls, "config") else DTOConfig() + + cls_dict: dict[str, Any] = {"config": config, "_type_backend_map": {}, "_handler_backend_map": {}} + if not field_definition.is_type_var: + cls_dict.update(model_type=field_definition.annotation) + + return type(f"{cls.__name__}[{annotation}]", (cls,), cls_dict) # pyright: ignore + + def decode_builtins(self, value: dict[str, Any]) -> Any: + """Decode a dictionary of Python values into an the DTO's datatype.""" + + backend = self._dto_backends[self.asgi_connection.route_handler.handler_id]["data_backend"] # pyright: ignore + return backend.populate_data_from_builtins(value, self.asgi_connection) + + def decode_bytes(self, value: bytes) -> Any: + """Decode a byte string into an the DTO's datatype.""" + + backend = self._dto_backends[self.asgi_connection.route_handler.handler_id]["data_backend"] # pyright: ignore + return backend.populate_data_from_raw(value, self.asgi_connection) + + def data_to_encodable_type(self, data: T | Collection[T]) -> LitestarEncodableType: + backend = self._dto_backends[self.asgi_connection.route_handler.handler_id]["return_backend"] # pyright: ignore + return backend.encode_data(data) + + @classmethod + @abstractmethod + def generate_field_definitions(cls, model_type: type[Any]) -> Generator[DTOFieldDefinition, None, None]: + """Generate ``FieldDefinition`` instances from ``model_type``. + + Yields: + ``FieldDefinition`` instances. + """ + + @classmethod + @abstractmethod + def detect_nested_field(cls, field_definition: FieldDefinition) -> bool: + """Return ``True`` if ``field_definition`` represents a nested model field. + + Args: + field_definition: inspect type to determine if field represents a nested model. + + Returns: + ``True`` if ``field_definition`` represents a nested model field. + """ + + @classmethod + def is_supported_model_type_field(cls, field_definition: FieldDefinition) -> bool: + """Check support for the given type. + + Args: + field_definition: A :class:`FieldDefinition <litestar.typing.FieldDefinition>` instance. + + Returns: + Whether the type of the field definition is supported by the DTO. + """ + return field_definition.is_subclass_of(cls.model_type) or ( + field_definition.origin + and any( + cls.resolve_model_type(inner_field).is_subclass_of(cls.model_type) + for inner_field in field_definition.inner_types + ) + ) + + @classmethod + def create_for_field_definition( + cls, + field_definition: FieldDefinition, + handler_id: str, + backend_cls: type[DTOBackend] | None = None, + ) -> None: + """Creates a DTO subclass for a field definition. + + Args: + field_definition: A :class:`FieldDefinition <litestar.typing.FieldDefinition>` instance. + handler_id: ID of the route handler for which to create a DTO instance. + backend_cls: Alternative DTO backend class to use + + Returns: + None + """ + + if handler_id not in cls._dto_backends: + cls._dto_backends[handler_id] = {} + + backend_context = cls._dto_backends[handler_id] + key = "data_backend" if field_definition.name == "data" else "return_backend" + + if key not in backend_context: + model_type_field_definition = cls.resolve_model_type(field_definition=field_definition) + wrapper_attribute_name: str | None = None + + if not model_type_field_definition.is_subclass_of(cls.model_type): + if resolved_generic_result := cls.resolve_generic_wrapper_type( + field_definition=model_type_field_definition + ): + model_type_field_definition, field_definition, wrapper_attribute_name = resolved_generic_result + else: + raise InvalidAnnotationException( + f"DTO narrowed with '{cls.model_type}', handler type is '{field_definition.annotation}'" + ) + + if backend_cls is None: + backend_cls = DTOCodegenBackend if cls.config.experimental_codegen_backend else DTOBackend + elif backend_cls is DTOCodegenBackend and cls.config.experimental_codegen_backend is False: + backend_cls = DTOBackend + + backend_context[key] = backend_cls( # type: ignore[literal-required] + dto_factory=cls, + field_definition=field_definition, + model_type=model_type_field_definition.annotation, + wrapper_attribute_name=wrapper_attribute_name, + is_data_field=field_definition.name == "data", + handler_id=handler_id, + ) + + @classmethod + def create_openapi_schema( + cls, field_definition: FieldDefinition, handler_id: str, schema_creator: SchemaCreator + ) -> Reference | Schema: + """Create an OpenAPI request body. + + Returns: + OpenAPI request body. + """ + key = "data_backend" if field_definition.name == "data" else "return_backend" + backend = cls._dto_backends[handler_id][key] # type: ignore[literal-required] + return schema_creator.for_field_definition(FieldDefinition.from_annotation(backend.annotation)) + + @classmethod + def resolve_generic_wrapper_type( + cls, field_definition: FieldDefinition + ) -> tuple[FieldDefinition, FieldDefinition, str] | None: + """Handle where DTO supported data is wrapped in a generic container type. + + Args: + field_definition: A parsed type annotation that represents the annotation used to narrow the DTO type. + + Returns: + The data model type. + """ + if field_definition.origin and ( + inner_fields := [ + inner_field + for inner_field in field_definition.inner_types + if cls.resolve_model_type(inner_field).is_subclass_of(cls.model_type) + ] + ): + inner_field = inner_fields[0] + model_field_definition = cls.resolve_model_type(inner_field) + + for attr, attr_type in cls.get_model_type_hints(field_definition.origin).items(): + if isinstance(attr_type.annotation, TypeVar) or any( + isinstance(t.annotation, TypeVar) for t in attr_type.inner_types + ): + if attr_type.is_non_string_collection: + # the inner type of the collection type is the type var, so we need to specialize the + # collection type with the DTO supported type. + specialized_annotation = attr_type.safe_generic_origin[model_field_definition.annotation] + return model_field_definition, FieldDefinition.from_annotation(specialized_annotation), attr + return model_field_definition, inner_field, attr + return None + + @staticmethod + def get_model_type_hints( + model_type: type[Any], namespace: dict[str, Any] | None = None + ) -> dict[str, FieldDefinition]: + """Retrieve type annotations for ``model_type``. + + Args: + model_type: Any type-annotated class. + namespace: Optional namespace to use for resolving type hints. + + Returns: + Parsed type hints for ``model_type`` resolved within the scope of its module. + """ + namespace = namespace or {} + namespace.update(vars(typing)) + namespace.update( + { + "TypeEncodersMap": TypeEncodersMap, + "DTOConfig": DTOConfig, + "RenameStrategy": RenameStrategy, + "RequestEncodingType": RequestEncodingType, + } + ) + + if model_module := getmodule(model_type): + namespace.update(vars(model_module)) + + return { + k: FieldDefinition.from_kwarg(annotation=v, name=k) + for k, v in get_type_hints(model_type, localns=namespace, include_extras=True).items() # pyright: ignore + } + + @staticmethod + def get_dto_config_from_annotated_type(field_definition: FieldDefinition) -> DTOConfig | None: + """Extract data type and config instances from ``Annotated`` annotation. + + Args: + field_definition: A parsed type annotation that represents the annotation used to narrow the DTO type. + + Returns: + The type and config object extracted from the annotation. + """ + return next((item for item in field_definition.metadata if isinstance(item, DTOConfig)), None) + + @classmethod + def resolve_model_type(cls, field_definition: FieldDefinition) -> FieldDefinition: + """Resolve the data model type from a parsed type. + + Args: + field_definition: A parsed type annotation that represents the annotation used to narrow the DTO type. + + Returns: + A :class:`FieldDefinition <.typing.FieldDefinition>` that represents the data model type. + """ + if field_definition.is_optional: + return cls.resolve_model_type( + next(t for t in field_definition.inner_types if not t.is_subclass_of(NoneType)) + ) + + if field_definition.is_subclass_of(DTOData): + return cls.resolve_model_type(field_definition.inner_types[0]) + + if field_definition.is_collection: + if field_definition.is_mapping: + return cls.resolve_model_type(field_definition.inner_types[1]) + + if field_definition.is_tuple: + if any(t is Ellipsis for t in field_definition.args): + return cls.resolve_model_type(field_definition.inner_types[0]) + elif field_definition.is_non_string_collection: + return cls.resolve_model_type(field_definition.inner_types[0]) + + return field_definition diff --git a/venv/lib/python3.11/site-packages/litestar/dto/config.py b/venv/lib/python3.11/site-packages/litestar/dto/config.py new file mode 100644 index 0000000..e213d17 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/config.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +from litestar.exceptions import ImproperlyConfiguredException + +if TYPE_CHECKING: + from typing import AbstractSet + + from litestar.dto.types import RenameStrategy + +__all__ = ("DTOConfig",) + + +@dataclass(frozen=True) +class DTOConfig: + """Control the generated DTO.""" + + exclude: AbstractSet[str] = field(default_factory=set) + """Explicitly exclude fields from the generated DTO. + + If exclude is specified, all fields not specified in exclude will be included by default. + + Notes: + - The field names are dot-separated paths to nested fields, e.g. ``"address.street"`` will + exclude the ``"street"`` field from a nested ``"address"`` model. + - 'exclude' mutually exclusive with 'include' - specifying both values will raise an + ``ImproperlyConfiguredException``. + """ + include: AbstractSet[str] = field(default_factory=set) + """Explicitly include fields in the generated DTO. + + If include is specified, all fields not specified in include will be excluded by default. + + Notes: + - The field names are dot-separated paths to nested fields, e.g. ``"address.street"`` will + include the ``"street"`` field from a nested ``"address"`` model. + - 'include' mutually exclusive with 'exclude' - specifying both values will raise an + ``ImproperlyConfiguredException``. + """ + rename_fields: dict[str, str] = field(default_factory=dict) + """Mapping of field names, to new name.""" + rename_strategy: RenameStrategy | None = None + """Rename all fields using a pre-defined strategy or a custom strategy. + + The pre-defined strategies are: `upper`, `lower`, `camel`, `pascal`. + + A custom strategy is any callable that accepts a string as an argument and + return a string. + + Fields defined in ``rename_fields`` are ignored.""" + max_nested_depth: int = 1 + """The maximum depth of nested items allowed for data transfer.""" + partial: bool = False + """Allow transfer of partial data.""" + underscore_fields_private: bool = True + """Fields starting with an underscore are considered private and excluded from data transfer.""" + experimental_codegen_backend: bool | None = None + """Use the experimental codegen backend""" + + def __post_init__(self) -> None: + if self.include and self.exclude: + raise ImproperlyConfiguredException( + "'include' and 'exclude' are mutually exclusive options, please use one of them" + ) diff --git a/venv/lib/python3.11/site-packages/litestar/dto/data_structures.py b/venv/lib/python3.11/site-packages/litestar/dto/data_structures.py new file mode 100644 index 0000000..a5c3386 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/data_structures.py @@ -0,0 +1,116 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Generic, TypeVar + +from litestar.typing import FieldDefinition + +if TYPE_CHECKING: + from typing import Any, Callable + + from litestar.dto import DTOField + from litestar.dto._backend import DTOBackend + +T = TypeVar("T") + + +class DTOData(Generic[T]): + """DTO validated data and utility methods.""" + + __slots__ = ("_backend", "_data_as_builtins") + + def __init__(self, backend: DTOBackend, data_as_builtins: Any) -> None: + self._backend = backend + self._data_as_builtins = data_as_builtins + + def create_instance(self, **kwargs: Any) -> T: + """Create an instance of the DTO validated data. + + Args: + **kwargs: Additional data to create the instance with. Takes precedence over DTO validated data. + """ + data = dict(self._data_as_builtins) + for k, v in kwargs.items(): + _set_nested_dict_value(data, k.split("__"), v) + return self._backend.transfer_data_from_builtins(data) # type: ignore[no-any-return] + + def update_instance(self, instance: T, **kwargs: Any) -> T: + """Update an instance with the DTO validated data. + + Args: + instance: The instance to update. + **kwargs: Additional data to update the instance with. Takes precedence over DTO validated data. + """ + data = {**self._data_as_builtins, **kwargs} + for k, v in data.items(): + setattr(instance, k, v) + return instance + + def as_builtins(self) -> Any: + """Return the DTO validated data as builtins.""" + return self._data_as_builtins + + +def _set_nested_dict_value(d: dict[str, Any], keys: list[str], value: Any) -> None: + if len(keys) == 1: + d[keys[0]] = value + else: + key = keys[0] + d.setdefault(key, {}) + _set_nested_dict_value(d[key], keys[1:], value) + + +@dataclass(frozen=True) +class DTOFieldDefinition(FieldDefinition): + """A model field representation for purposes of generating a DTO backend model type.""" + + __slots__ = ( + "default_factory", + "dto_field", + "model_name", + ) + + model_name: str + """The name of the model for which the field is generated.""" + default_factory: Callable[[], Any] | None + """Default factory of the field.""" + dto_field: DTOField + """DTO field configuration.""" + + @classmethod + def from_field_definition( + cls, + field_definition: FieldDefinition, + model_name: str, + default_factory: Callable[[], Any] | None, + dto_field: DTOField, + ) -> DTOFieldDefinition: + """Create a :class:`FieldDefinition` from a :class:`FieldDefinition`. + + Args: + field_definition: A :class:`FieldDefinition` to create a :class:`FieldDefinition` from. + model_name: The name of the model. + default_factory: Default factory function, if any. + dto_field: DTOField instance. + + Returns: + A :class:`FieldDefinition` instance. + """ + return DTOFieldDefinition( + annotation=field_definition.annotation, + args=field_definition.args, + default=field_definition.default, + default_factory=default_factory, + dto_field=dto_field, + extra=field_definition.extra, + inner_types=field_definition.inner_types, + instantiable_origin=field_definition.instantiable_origin, + kwarg_definition=field_definition.kwarg_definition, + metadata=field_definition.metadata, + model_name=model_name, + name=field_definition.name, + origin=field_definition.origin, + raw=field_definition.raw, + safe_generic_origin=field_definition.safe_generic_origin, + type_wrappers=field_definition.type_wrappers, + ) diff --git a/venv/lib/python3.11/site-packages/litestar/dto/dataclass_dto.py b/venv/lib/python3.11/site-packages/litestar/dto/dataclass_dto.py new file mode 100644 index 0000000..554b0f3 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/dataclass_dto.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +from dataclasses import MISSING, fields, replace +from typing import TYPE_CHECKING, Generic, TypeVar + +from litestar.dto.base_dto import AbstractDTO +from litestar.dto.data_structures import DTOFieldDefinition +from litestar.dto.field import DTO_FIELD_META_KEY, DTOField +from litestar.params import DependencyKwarg, KwargDefinition +from litestar.types.empty import Empty + +if TYPE_CHECKING: + from typing import Collection, Generator + + from litestar.types.protocols import DataclassProtocol + from litestar.typing import FieldDefinition + + +__all__ = ("DataclassDTO", "T") + +T = TypeVar("T", bound="DataclassProtocol | Collection[DataclassProtocol]") +AnyDataclass = TypeVar("AnyDataclass", bound="DataclassProtocol") + + +class DataclassDTO(AbstractDTO[T], Generic[T]): + """Support for domain modelling with dataclasses.""" + + @classmethod + def generate_field_definitions( + cls, model_type: type[DataclassProtocol] + ) -> Generator[DTOFieldDefinition, None, None]: + dc_fields = {f.name: f for f in fields(model_type)} + for key, field_definition in cls.get_model_type_hints(model_type).items(): + if not (dc_field := dc_fields.get(key)): + continue + + default = dc_field.default if dc_field.default is not MISSING else Empty + default_factory = dc_field.default_factory if dc_field.default_factory is not MISSING else None + field_defintion = replace( + DTOFieldDefinition.from_field_definition( + field_definition=field_definition, + default_factory=default_factory, + dto_field=dc_field.metadata.get(DTO_FIELD_META_KEY, DTOField()), + model_name=model_type.__name__, + ), + name=key, + default=default, + ) + + yield ( + replace(field_defintion, default=Empty, kwarg_definition=default) + if isinstance(default, (KwargDefinition, DependencyKwarg)) + else field_defintion + ) + + @classmethod + def detect_nested_field(cls, field_definition: FieldDefinition) -> bool: + return hasattr(field_definition.annotation, "__dataclass_fields__") diff --git a/venv/lib/python3.11/site-packages/litestar/dto/field.py b/venv/lib/python3.11/site-packages/litestar/dto/field.py new file mode 100644 index 0000000..7ef8a39 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/field.py @@ -0,0 +1,50 @@ +"""DTO domain types.""" + +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum +from typing import Literal + +__all__ = ( + "DTO_FIELD_META_KEY", + "DTOField", + "Mark", + "dto_field", +) + +DTO_FIELD_META_KEY = "__dto__" + + +class Mark(str, Enum): + """For marking field definitions on domain models.""" + + READ_ONLY = "read-only" + """To mark a field that can be read, but not updated by clients.""" + WRITE_ONLY = "write-only" + """To mark a field that can be written to, but not read by clients.""" + PRIVATE = "private" + """To mark a field that can neither be read or updated by clients.""" + + +@dataclass +class DTOField: + """For configuring DTO behavior on model fields.""" + + mark: Mark | Literal["read-only", "write-only", "private"] | None = None + """Mark the field as read-only, or private.""" + + +def dto_field(mark: Literal["read-only", "write-only", "private"] | Mark) -> dict[str, DTOField]: + """Create a field metadata mapping. + + Args: + mark: A DTO mark for the field, e.g., "read-only". + + Returns: + A dict for setting as field metadata, such as the dataclass "metadata" field key, or the SQLAlchemy "info" + field. + + Marking a field automates its inclusion/exclusion from DTO field definitions, depending on the DTO's purpose. + """ + return {DTO_FIELD_META_KEY: DTOField(mark=Mark(mark))} diff --git a/venv/lib/python3.11/site-packages/litestar/dto/msgspec_dto.py b/venv/lib/python3.11/site-packages/litestar/dto/msgspec_dto.py new file mode 100644 index 0000000..826a1d2 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/msgspec_dto.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +from dataclasses import replace +from typing import TYPE_CHECKING, Generic, TypeVar + +from msgspec import NODEFAULT, Struct, structs + +from litestar.dto.base_dto import AbstractDTO +from litestar.dto.data_structures import DTOFieldDefinition +from litestar.dto.field import DTO_FIELD_META_KEY, DTOField +from litestar.types.empty import Empty + +if TYPE_CHECKING: + from typing import Any, Collection, Generator + + from litestar.typing import FieldDefinition + + +__all__ = ("MsgspecDTO",) + +T = TypeVar("T", bound="Struct | Collection[Struct]") + + +class MsgspecDTO(AbstractDTO[T], Generic[T]): + """Support for domain modelling with Msgspec.""" + + @classmethod + def generate_field_definitions(cls, model_type: type[Struct]) -> Generator[DTOFieldDefinition, None, None]: + msgspec_fields = {f.name: f for f in structs.fields(model_type)} + + def default_or_empty(value: Any) -> Any: + return Empty if value is NODEFAULT else value + + def default_or_none(value: Any) -> Any: + return None if value is NODEFAULT else value + + for key, field_definition in cls.get_model_type_hints(model_type).items(): + msgspec_field = msgspec_fields[key] + dto_field = (field_definition.extra or {}).pop(DTO_FIELD_META_KEY, DTOField()) + + yield replace( + DTOFieldDefinition.from_field_definition( + field_definition=field_definition, + dto_field=dto_field, + model_name=model_type.__name__, + default_factory=default_or_none(msgspec_field.default_factory), + ), + default=default_or_empty(msgspec_field.default), + name=key, + ) + + @classmethod + def detect_nested_field(cls, field_definition: FieldDefinition) -> bool: + return field_definition.is_subclass_of(Struct) diff --git a/venv/lib/python3.11/site-packages/litestar/dto/types.py b/venv/lib/python3.11/site-packages/litestar/dto/types.py new file mode 100644 index 0000000..f154e49 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/dto/types.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Callable, Literal + + from typing_extensions import TypeAlias + +__all__ = ("RenameStrategy",) + +RenameStrategy: TypeAlias = 'Literal["lower", "upper", "camel", "pascal", "kebab"] | Callable[[str], str]' +"""A pre-defined strategy or a custom callback for converting DTO field names.""" |