diff options
author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 |
---|---|---|
committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 |
commit | 6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch) | |
tree | b1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/litestar/_openapi | |
parent | 4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff) |
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/_openapi')
44 files changed, 2914 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/__init__.py b/venv/lib/python3.11/site-packages/litestar/_openapi/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/__init__.py diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..e090039 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/datastructures.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/datastructures.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..9dd5b10 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/datastructures.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/parameters.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/parameters.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..1a17187 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/parameters.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/path_item.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/path_item.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..a440e7d --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/path_item.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/plugin.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/plugin.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..13b3ab3 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/plugin.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/request_body.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/request_body.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..2f093bf --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/request_body.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/responses.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/responses.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..10d877b --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/responses.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/utils.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/utils.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..97c7c13 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/__pycache__/utils.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/datastructures.py b/venv/lib/python3.11/site-packages/litestar/_openapi/datastructures.py new file mode 100644 index 0000000..d97c8db --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/datastructures.py @@ -0,0 +1,179 @@ +from __future__ import annotations + +from collections import defaultdict +from typing import TYPE_CHECKING, Iterator, Sequence + +from litestar.exceptions import ImproperlyConfiguredException +from litestar.openapi.spec import Reference, Schema + +if TYPE_CHECKING: + from litestar.openapi import OpenAPIConfig + from litestar.plugins import OpenAPISchemaPluginProtocol + + +class RegisteredSchema: + """Object to store a schema and any references to it.""" + + def __init__(self, key: tuple[str, ...], schema: Schema, references: list[Reference]) -> None: + """Create a new RegisteredSchema object. + + Args: + key: The key used to register the schema. + schema: The schema object. + references: A list of references to the schema. + """ + self.key = key + self.schema = schema + self.references = references + + +class SchemaRegistry: + """A registry for object schemas. + + This class is used to store schemas that we reference from other parts of the spec. + + Its main purpose is to allow us to generate the components/schemas section of the spec once we have + collected all the schemas that should be included. + + This allows us to determine a path to the schema in the components/schemas section of the spec that + is unique and as short as possible. + """ + + def __init__(self) -> None: + self._schema_key_map: dict[tuple[str, ...], RegisteredSchema] = {} + self._schema_reference_map: dict[int, RegisteredSchema] = {} + self._model_name_groups: defaultdict[str, list[RegisteredSchema]] = defaultdict(list) + + def get_schema_for_key(self, key: tuple[str, ...]) -> Schema: + """Get a registered schema by its key. + + Args: + key: The key to the schema to get. + + Returns: + A RegisteredSchema object. + """ + if key not in self._schema_key_map: + self._schema_key_map[key] = registered_schema = RegisteredSchema(key, Schema(), []) + self._model_name_groups[key[-1]].append(registered_schema) + return self._schema_key_map[key].schema + + def get_reference_for_key(self, key: tuple[str, ...]) -> Reference | None: + """Get a reference to a registered schema by its key. + + Args: + key: The key to the schema to get. + + Returns: + A Reference object. + """ + if key not in self._schema_key_map: + return None + registered_schema = self._schema_key_map[key] + reference = Reference(f"#/components/schemas/{'_'.join(key)}") + registered_schema.references.append(reference) + self._schema_reference_map[id(reference)] = registered_schema + return reference + + def from_reference(self, reference: Reference) -> RegisteredSchema: + """Get a registered schema by its reference. + + Args: + reference: The reference to the schema to get. + + Returns: + A RegisteredSchema object. + """ + return self._schema_reference_map[id(reference)] + + def __iter__(self) -> Iterator[RegisteredSchema]: + """Iterate over the registered schemas.""" + return iter(self._schema_key_map.values()) + + @staticmethod + def set_reference_paths(name: str, registered_schema: RegisteredSchema) -> None: + """Set the reference paths for a registered schema.""" + for reference in registered_schema.references: + reference.ref = f"#/components/schemas/{name}" + + @staticmethod + def remove_common_prefix(tuples: list[tuple[str, ...]]) -> list[tuple[str, ...]]: + """Remove the common prefix from a list of tuples. + + Args: + tuples: A list of tuples to remove the common prefix from. + + Returns: + A list of tuples with the common prefix removed. + """ + + def longest_common_prefix(tuples_: list[tuple[str, ...]]) -> tuple[str, ...]: + """Find the longest common prefix of a list of tuples. + + Args: + tuples_: A list of tuples to find the longest common prefix of. + + Returns: + The longest common prefix of the tuples. + """ + prefix_ = tuples_[0] + for t in tuples_: + # Compare the current prefix with each tuple and shorten it + prefix_ = prefix_[: min(len(prefix_), len(t))] + for i in range(len(prefix_)): + if prefix_[i] != t[i]: + prefix_ = prefix_[:i] + break + return prefix_ + + prefix = longest_common_prefix(tuples) + prefix_length = len(prefix) + return [t[prefix_length:] for t in tuples] + + def generate_components_schemas(self) -> dict[str, Schema]: + """Generate the components/schemas section of the spec. + + Returns: + A dictionary of schemas. + """ + components_schemas: dict[str, Schema] = {} + + for name, name_group in self._model_name_groups.items(): + if len(name_group) == 1: + self.set_reference_paths(name, name_group[0]) + components_schemas[name] = name_group[0].schema + continue + + full_keys = [registered_schema.key for registered_schema in name_group] + names = ["_".join(k) for k in self.remove_common_prefix(full_keys)] + for name_, registered_schema in zip(names, name_group): + self.set_reference_paths(name_, registered_schema) + components_schemas[name_] = registered_schema.schema + + # Sort them by name to ensure they're always generated in the same order. + return {name: components_schemas[name] for name in sorted(components_schemas.keys())} + + +class OpenAPIContext: + def __init__( + self, + openapi_config: OpenAPIConfig, + plugins: Sequence[OpenAPISchemaPluginProtocol], + ) -> None: + self.openapi_config = openapi_config + self.plugins = plugins + self.operation_ids: set[str] = set() + self.schema_registry = SchemaRegistry() + + def add_operation_id(self, operation_id: str) -> None: + """Add an operation ID to the context. + + Args: + operation_id: Operation ID to add. + """ + if operation_id in self.operation_ids: + raise ImproperlyConfiguredException( + "operation_ids must be unique, " + f"please ensure the value of 'operation_id' is either not set or unique for {operation_id}" + ) + self.operation_ids.add(operation_id) diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/parameters.py b/venv/lib/python3.11/site-packages/litestar/_openapi/parameters.py new file mode 100644 index 0000000..c3da5c4 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/parameters.py @@ -0,0 +1,233 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from litestar._openapi.schema_generation import SchemaCreator +from litestar._openapi.schema_generation.utils import get_formatted_examples +from litestar.constants import RESERVED_KWARGS +from litestar.enums import ParamType +from litestar.exceptions import ImproperlyConfiguredException +from litestar.openapi.spec.parameter import Parameter +from litestar.openapi.spec.schema import Schema +from litestar.params import DependencyKwarg, ParameterKwarg +from litestar.types import Empty +from litestar.typing import FieldDefinition + +if TYPE_CHECKING: + from litestar._openapi.datastructures import OpenAPIContext + from litestar.handlers.base import BaseRouteHandler + from litestar.openapi.spec import Reference + from litestar.types.internal_types import PathParameterDefinition + +__all__ = ("create_parameters_for_handler",) + + +class ParameterCollection: + """Facilitates conditional deduplication of parameters. + + If multiple parameters with the same name are produced for a handler, the condition is ignored if the two + ``Parameter`` instances are the same (the first is retained and any duplicates are ignored). If the ``Parameter`` + instances are not the same, an exception is raised. + """ + + def __init__(self, route_handler: BaseRouteHandler) -> None: + """Initialize ``ParameterCollection``. + + Args: + route_handler: Associated route handler + """ + self.route_handler = route_handler + self._parameters: dict[tuple[str, str], Parameter] = {} + + def add(self, parameter: Parameter) -> None: + """Add a ``Parameter`` to the collection. + + If an existing parameter with the same name and type already exists, the + parameter is ignored. + + If an existing parameter with the same name but different type exists, raises + ``ImproperlyConfiguredException``. + """ + + if (parameter.name, parameter.param_in) not in self._parameters: + # because we are defining routes as unique per path, we have to handle here a situation when there is an optional + # path parameter. e.g. get(path=["/", "/{param:str}"]). When parsing the parameter for path, the route handler + # would still have a kwarg called param: + # def handler(param: str | None) -> ... + if parameter.param_in != ParamType.QUERY or all( + f"{{{parameter.name}:" not in path for path in self.route_handler.paths + ): + self._parameters[(parameter.name, parameter.param_in)] = parameter + return + + pre_existing = self._parameters[(parameter.name, parameter.param_in)] + if parameter == pre_existing: + return + + raise ImproperlyConfiguredException( + f"OpenAPI schema generation for handler `{self.route_handler}` detected multiple parameters named " + f"'{parameter.name}' with different types." + ) + + def list(self) -> list[Parameter]: + """Return a list of all ``Parameter``'s in the collection.""" + return list(self._parameters.values()) + + +class ParameterFactory: + """Factory for creating OpenAPI Parameters for a given route handler.""" + + def __init__( + self, + context: OpenAPIContext, + route_handler: BaseRouteHandler, + path_parameters: tuple[PathParameterDefinition, ...], + ) -> None: + """Initialize ParameterFactory. + + Args: + context: The OpenAPI context. + route_handler: The route handler. + path_parameters: The path parameters for the route. + """ + self.context = context + self.schema_creator = SchemaCreator.from_openapi_context(self.context, prefer_alias=True) + self.route_handler = route_handler + self.parameters = ParameterCollection(route_handler) + self.dependency_providers = route_handler.resolve_dependencies() + self.layered_parameters = route_handler.resolve_layered_parameters() + self.path_parameters_names = {p.name for p in path_parameters} + + def create_parameter(self, field_definition: FieldDefinition, parameter_name: str) -> Parameter: + """Create an OpenAPI Parameter instance for a field definition. + + Args: + field_definition: The field definition. + parameter_name: The name of the parameter. + """ + + result: Schema | Reference | None = None + kwarg_definition = ( + field_definition.kwarg_definition if isinstance(field_definition.kwarg_definition, ParameterKwarg) else None + ) + + if parameter_name in self.path_parameters_names: + param_in = ParamType.PATH + is_required = True + result = self.schema_creator.for_field_definition(field_definition) + elif kwarg_definition and kwarg_definition.header: + parameter_name = kwarg_definition.header + param_in = ParamType.HEADER + is_required = field_definition.is_required + elif kwarg_definition and kwarg_definition.cookie: + parameter_name = kwarg_definition.cookie + param_in = ParamType.COOKIE + is_required = field_definition.is_required + else: + is_required = field_definition.is_required + param_in = ParamType.QUERY + parameter_name = kwarg_definition.query if kwarg_definition and kwarg_definition.query else parameter_name + + if not result: + result = self.schema_creator.for_field_definition(field_definition) + + schema = result if isinstance(result, Schema) else self.context.schema_registry.from_reference(result).schema + + examples_list = kwarg_definition.examples or [] if kwarg_definition else [] + examples = get_formatted_examples(field_definition, examples_list) + + return Parameter( + description=schema.description, + name=parameter_name, + param_in=param_in, + required=is_required, + schema=result, + examples=examples or None, + ) + + def get_layered_parameter(self, field_name: str, field_definition: FieldDefinition) -> Parameter: + """Create a parameter for a field definition that has a KwargDefinition defined on the layers. + + Args: + field_name: The name of the field. + field_definition: The field definition. + """ + layer_field = self.layered_parameters[field_name] + + field = field_definition if field_definition.is_parameter_field else layer_field + default = layer_field.default if field_definition.has_default else field_definition.default + annotation = field_definition.annotation if field_definition is not Empty else layer_field.annotation + + parameter_name = field_name + if isinstance(field.kwarg_definition, ParameterKwarg): + parameter_name = ( + field.kwarg_definition.query + or field.kwarg_definition.header + or field.kwarg_definition.cookie + or field_name + ) + + field_definition = FieldDefinition.from_kwarg( + inner_types=field.inner_types, + default=default, + extra=field.extra, + annotation=annotation, + kwarg_definition=field.kwarg_definition, + name=field_name, + ) + return self.create_parameter(field_definition=field_definition, parameter_name=parameter_name) + + def create_parameters_for_field_definitions(self, fields: dict[str, FieldDefinition]) -> None: + """Add Parameter models to the handler's collection for the given field definitions. + + Args: + fields: The field definitions. + """ + unique_handler_fields = ( + (k, v) for k, v in fields.items() if k not in RESERVED_KWARGS and k not in self.layered_parameters + ) + unique_layered_fields = ( + (k, v) for k, v in self.layered_parameters.items() if k not in RESERVED_KWARGS and k not in fields + ) + intersection_fields = ( + (k, v) for k, v in fields.items() if k not in RESERVED_KWARGS and k in self.layered_parameters + ) + + for field_name, field_definition in unique_handler_fields: + if ( + isinstance(field_definition.kwarg_definition, DependencyKwarg) + and field_name not in self.dependency_providers + ): + # never document explicit dependencies + continue + + if provider := self.dependency_providers.get(field_name): + self.create_parameters_for_field_definitions(fields=provider.parsed_fn_signature.parameters) + else: + self.parameters.add(self.create_parameter(field_definition=field_definition, parameter_name=field_name)) + + for field_name, field_definition in unique_layered_fields: + self.parameters.add(self.create_parameter(field_definition=field_definition, parameter_name=field_name)) + + for field_name, field_definition in intersection_fields: + self.parameters.add(self.get_layered_parameter(field_name=field_name, field_definition=field_definition)) + + def create_parameters_for_handler(self) -> list[Parameter]: + """Create a list of path/query/header Parameter models for the given PathHandler.""" + handler_fields = self.route_handler.parsed_fn_signature.parameters + self.create_parameters_for_field_definitions(handler_fields) + return self.parameters.list() + + +def create_parameters_for_handler( + context: OpenAPIContext, + route_handler: BaseRouteHandler, + path_parameters: tuple[PathParameterDefinition, ...], +) -> list[Parameter]: + """Create a list of path/query/header Parameter models for the given PathHandler.""" + factory = ParameterFactory( + context=context, + route_handler=route_handler, + path_parameters=path_parameters, + ) + return factory.create_parameters_for_handler() diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/path_item.py b/venv/lib/python3.11/site-packages/litestar/_openapi/path_item.py new file mode 100644 index 0000000..74a04ce --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/path_item.py @@ -0,0 +1,137 @@ +from __future__ import annotations + +from inspect import cleandoc +from typing import TYPE_CHECKING + +from litestar._openapi.parameters import create_parameters_for_handler +from litestar._openapi.request_body import create_request_body +from litestar._openapi.responses import create_responses_for_handler +from litestar._openapi.utils import SEPARATORS_CLEANUP_PATTERN +from litestar.enums import HttpMethod +from litestar.openapi.spec import Operation, PathItem +from litestar.utils.helpers import unwrap_partial + +if TYPE_CHECKING: + from litestar._openapi.datastructures import OpenAPIContext + from litestar.handlers.http_handlers import HTTPRouteHandler + from litestar.routes import HTTPRoute + +__all__ = ("create_path_item_for_route",) + + +class PathItemFactory: + """Factory for creating a PathItem instance for a given route.""" + + def __init__(self, openapi_context: OpenAPIContext, route: HTTPRoute) -> None: + self.context = openapi_context + self.route = route + self._path_item = PathItem() + + def create_path_item(self) -> PathItem: + """Create a PathItem for the given route parsing all http_methods into Operation Models. + + Returns: + A PathItem instance. + """ + for http_method, handler_tuple in self.route.route_handler_map.items(): + route_handler, _ = handler_tuple + + if not route_handler.resolve_include_in_schema(): + continue + + operation = self.create_operation_for_handler_method(route_handler, HttpMethod(http_method)) + + setattr(self._path_item, http_method.lower(), operation) + + return self._path_item + + def create_operation_for_handler_method( + self, route_handler: HTTPRouteHandler, http_method: HttpMethod + ) -> Operation: + """Create an Operation instance for a given route handler and http method. + + Args: + route_handler: A route handler instance. + http_method: An HttpMethod enum value. + + Returns: + An Operation instance. + """ + operation_id = self.create_operation_id(route_handler, http_method) + parameters = create_parameters_for_handler(self.context, route_handler, self.route.path_parameters) + signature_fields = route_handler.parsed_fn_signature.parameters + + request_body = None + if data_field := signature_fields.get("data"): + request_body = create_request_body( + self.context, route_handler.handler_id, route_handler.resolve_data_dto(), data_field + ) + + raises_validation_error = bool(data_field or self._path_item.parameters or parameters) + responses = create_responses_for_handler( + self.context, route_handler, raises_validation_error=raises_validation_error + ) + + return route_handler.operation_class( + operation_id=operation_id, + tags=route_handler.resolve_tags() or None, + summary=route_handler.summary or SEPARATORS_CLEANUP_PATTERN.sub("", route_handler.handler_name.title()), + description=self.create_description_for_handler(route_handler), + deprecated=route_handler.deprecated, + responses=responses, + request_body=request_body, + parameters=parameters or None, # type: ignore[arg-type] + security=route_handler.resolve_security() or None, + ) + + def create_operation_id(self, route_handler: HTTPRouteHandler, http_method: HttpMethod) -> str: + """Create an operation id for a given route handler and http method. + + Adds the operation id to the context's operation id set, where it is checked for uniqueness. + + Args: + route_handler: A route handler instance. + http_method: An HttpMethod enum value. + + Returns: + An operation id string. + """ + if isinstance(route_handler.operation_id, str): + operation_id = route_handler.operation_id + elif callable(route_handler.operation_id): + operation_id = route_handler.operation_id(route_handler, http_method, self.route.path_components) + else: + operation_id = self.context.openapi_config.operation_id_creator( + route_handler, http_method, self.route.path_components + ) + self.context.add_operation_id(operation_id) + return operation_id + + def create_description_for_handler(self, route_handler: HTTPRouteHandler) -> str | None: + """Produce the operation description for a route handler. + + Args: + route_handler: A route handler instance. + + Returns: + An optional description string + """ + handler_description = route_handler.description + if handler_description is None and self.context.openapi_config.use_handler_docstrings: + fn = unwrap_partial(route_handler.fn) + return cleandoc(fn.__doc__) if fn.__doc__ else None + return handler_description + + +def create_path_item_for_route(openapi_context: OpenAPIContext, route: HTTPRoute) -> PathItem: + """Create a PathItem for the given route parsing all http_methods into Operation Models. + + Args: + openapi_context: The OpenAPIContext instance. + route: The route to create a PathItem for. + + Returns: + A PathItem instance. + """ + path_item_factory = PathItemFactory(openapi_context, route) + return path_item_factory.create_path_item() diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/plugin.py b/venv/lib/python3.11/site-packages/litestar/_openapi/plugin.py new file mode 100644 index 0000000..9bdbdec --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/plugin.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from litestar._openapi.datastructures import OpenAPIContext +from litestar._openapi.path_item import create_path_item_for_route +from litestar.exceptions import ImproperlyConfiguredException +from litestar.plugins import InitPluginProtocol +from litestar.plugins.base import ReceiveRoutePlugin +from litestar.routes import HTTPRoute + +if TYPE_CHECKING: + from litestar.app import Litestar + from litestar.config.app import AppConfig + from litestar.openapi.config import OpenAPIConfig + from litestar.openapi.spec import OpenAPI + from litestar.routes import BaseRoute + + +class OpenAPIPlugin(InitPluginProtocol, ReceiveRoutePlugin): + __slots__ = ( + "app", + "included_routes", + "_openapi_config", + "_openapi_schema", + ) + + def __init__(self, app: Litestar) -> None: + self.app = app + self.included_routes: dict[str, HTTPRoute] = {} + self._openapi_config: OpenAPIConfig | None = None + self._openapi_schema: OpenAPI | None = None + + def _build_openapi_schema(self) -> OpenAPI: + openapi_config = self.openapi_config + + if openapi_config.create_examples: + from litestar._openapi.schema_generation.examples import ExampleFactory + + ExampleFactory.seed_random(openapi_config.random_seed) + + openapi = openapi_config.to_openapi_schema() + context = OpenAPIContext(openapi_config=openapi_config, plugins=self.app.plugins.openapi) + openapi.paths = { + route.path_format or "/": create_path_item_for_route(context, route) + for route in self.included_routes.values() + } + openapi.components.schemas = context.schema_registry.generate_components_schemas() + return openapi + + def provide_openapi(self) -> OpenAPI: + if not self._openapi_schema: + self._openapi_schema = self._build_openapi_schema() + return self._openapi_schema + + def on_app_init(self, app_config: AppConfig) -> AppConfig: + if app_config.openapi_config: + self._openapi_config = app_config.openapi_config + app_config.route_handlers.append(self.openapi_config.openapi_controller) + return app_config + + @property + def openapi_config(self) -> OpenAPIConfig: + if not self._openapi_config: + raise ImproperlyConfiguredException("OpenAPIConfig not initialized") + return self._openapi_config + + def receive_route(self, route: BaseRoute) -> None: + if not isinstance(route, HTTPRoute): + return + + if any(route_handler.resolve_include_in_schema() for route_handler, _ in route.route_handler_map.values()): + # Force recompute the schema if a new route is added + self._openapi_schema = None + self.included_routes[route.path] = route diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/request_body.py b/venv/lib/python3.11/site-packages/litestar/_openapi/request_body.py new file mode 100644 index 0000000..7a5cf37 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/request_body.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from litestar._openapi.schema_generation import SchemaCreator +from litestar.enums import RequestEncodingType +from litestar.openapi.spec.media_type import OpenAPIMediaType +from litestar.openapi.spec.request_body import RequestBody +from litestar.params import BodyKwarg + +__all__ = ("create_request_body",) + + +if TYPE_CHECKING: + from litestar._openapi.datastructures import OpenAPIContext + from litestar.dto import AbstractDTO + from litestar.typing import FieldDefinition + + +def create_request_body( + context: OpenAPIContext, + handler_id: str, + resolved_data_dto: type[AbstractDTO] | None, + data_field: FieldDefinition, +) -> RequestBody: + """Create a RequestBody instance for the given route handler's data field. + + Args: + context: The OpenAPIContext instance. + handler_id: The handler id. + resolved_data_dto: The resolved data dto. + data_field: The data field. + + Returns: + A RequestBody instance. + """ + media_type: RequestEncodingType | str = RequestEncodingType.JSON + schema_creator = SchemaCreator.from_openapi_context(context, prefer_alias=True) + if isinstance(data_field.kwarg_definition, BodyKwarg) and data_field.kwarg_definition.media_type: + media_type = data_field.kwarg_definition.media_type + + if resolved_data_dto: + schema = resolved_data_dto.create_openapi_schema( + field_definition=data_field, + handler_id=handler_id, + schema_creator=schema_creator, + ) + else: + schema = schema_creator.for_field_definition(data_field) + + return RequestBody(required=True, content={media_type: OpenAPIMediaType(schema=schema)}) diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/responses.py b/venv/lib/python3.11/site-packages/litestar/_openapi/responses.py new file mode 100644 index 0000000..6b0f312 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/responses.py @@ -0,0 +1,335 @@ +from __future__ import annotations + +import contextlib +import re +from copy import copy +from dataclasses import asdict +from http import HTTPStatus +from operator import attrgetter +from typing import TYPE_CHECKING, Any, Iterator + +from litestar._openapi.schema_generation import SchemaCreator +from litestar._openapi.schema_generation.utils import get_formatted_examples +from litestar.enums import MediaType +from litestar.exceptions import HTTPException, ValidationException +from litestar.openapi.spec import Example, OpenAPIResponse, Reference +from litestar.openapi.spec.enums import OpenAPIFormat, OpenAPIType +from litestar.openapi.spec.header import OpenAPIHeader +from litestar.openapi.spec.media_type import OpenAPIMediaType +from litestar.openapi.spec.schema import Schema +from litestar.response import ( + File, + Redirect, + Stream, + Template, +) +from litestar.response import ( + Response as LitestarResponse, +) +from litestar.response.base import ASGIResponse +from litestar.types.builtin_types import NoneType +from litestar.typing import FieldDefinition +from litestar.utils import get_enum_string_value, get_name + +if TYPE_CHECKING: + from litestar._openapi.datastructures import OpenAPIContext + from litestar.datastructures.cookie import Cookie + from litestar.handlers.http_handlers import HTTPRouteHandler + from litestar.openapi.spec.responses import Responses + + +__all__ = ("create_responses_for_handler",) + +CAPITAL_LETTERS_PATTERN = re.compile(r"(?=[A-Z])") + + +def pascal_case_to_text(string: str) -> str: + """Given a 'PascalCased' string, return its split form- 'Pascal Cased'.""" + return " ".join(re.split(CAPITAL_LETTERS_PATTERN, string)).strip() + + +def create_cookie_schema(cookie: Cookie) -> Schema: + """Given a Cookie instance, return its corresponding OpenAPI schema. + + Args: + cookie: Cookie + + Returns: + Schema + """ + cookie_copy = copy(cookie) + cookie_copy.value = "<string>" + value = cookie_copy.to_header(header="") + return Schema(description=cookie.description or "", example=value) + + +class ResponseFactory: + """Factory for creating a Response instance for a given route handler.""" + + def __init__(self, context: OpenAPIContext, route_handler: HTTPRouteHandler) -> None: + """Initialize the factory. + + Args: + context: An OpenAPIContext instance. + route_handler: An HTTPRouteHandler instance. + """ + self.context = context + self.route_handler = route_handler + self.field_definition = route_handler.parsed_fn_signature.return_type + self.schema_creator = SchemaCreator.from_openapi_context(context, prefer_alias=False) + + def create_responses(self, raises_validation_error: bool) -> Responses | None: + """Create the schema for responses, if any. + + Args: + raises_validation_error: Boolean flag indicating whether the handler raises a ValidationException. + + Returns: + Responses + """ + responses: Responses = { + str(self.route_handler.status_code): self.create_success_response(), + } + + exceptions = list(self.route_handler.raises or []) + if raises_validation_error and ValidationException not in exceptions: + exceptions.append(ValidationException) + + for status_code, response in create_error_responses(exceptions=exceptions): + responses[status_code] = response + + for status_code, response in self.create_additional_responses(): + responses[status_code] = response + + return responses or None + + def create_description(self) -> str: + """Create the description for a success response.""" + default_descriptions: dict[Any, str] = { + Stream: "Stream Response", + Redirect: "Redirect Response", + File: "File Download", + } + return ( + self.route_handler.response_description + or default_descriptions.get(self.field_definition.annotation) + or HTTPStatus(self.route_handler.status_code).description + ) + + def create_success_response(self) -> OpenAPIResponse: + """Create the schema for a success response.""" + if self.field_definition.is_subclass_of((NoneType, ASGIResponse)): + response = OpenAPIResponse(content=None, description=self.create_description()) + elif self.field_definition.is_subclass_of(Redirect): + response = self.create_redirect_response() + elif self.field_definition.is_subclass_of((File, Stream)): + response = self.create_file_response() + else: + media_type = self.route_handler.media_type + + if dto := self.route_handler.resolve_return_dto(): + result = dto.create_openapi_schema( + field_definition=self.field_definition, + handler_id=self.route_handler.handler_id, + schema_creator=self.schema_creator, + ) + else: + if self.field_definition.is_subclass_of(Template): + field_def = FieldDefinition.from_annotation(str) + media_type = media_type or MediaType.HTML + elif self.field_definition.is_subclass_of(LitestarResponse): + field_def = ( + self.field_definition.inner_types[0] + if self.field_definition.inner_types + else FieldDefinition.from_annotation(Any) + ) + media_type = media_type or MediaType.JSON + else: + field_def = self.field_definition + + result = self.schema_creator.for_field_definition(field_def) + + schema = ( + result if isinstance(result, Schema) else self.context.schema_registry.from_reference(result).schema + ) + schema.content_encoding = self.route_handler.content_encoding + schema.content_media_type = self.route_handler.content_media_type + response = OpenAPIResponse( + content={get_enum_string_value(media_type): OpenAPIMediaType(schema=result)}, + description=self.create_description(), + ) + self.set_success_response_headers(response) + return response + + def create_redirect_response(self) -> OpenAPIResponse: + """Create the schema for a redirect response.""" + return OpenAPIResponse( + content=None, + description=self.create_description(), + headers={ + "location": OpenAPIHeader( + schema=Schema(type=OpenAPIType.STRING), description="target path for the redirect" + ) + }, + ) + + def create_file_response(self) -> OpenAPIResponse: + """Create the schema for a file/stream response.""" + return OpenAPIResponse( + content={ + self.route_handler.media_type: OpenAPIMediaType( + schema=Schema( + type=OpenAPIType.STRING, + content_encoding=self.route_handler.content_encoding, + content_media_type=self.route_handler.content_media_type or "application/octet-stream", + ), + ) + }, + description=self.create_description(), + headers={ + "content-length": OpenAPIHeader( + schema=Schema(type=OpenAPIType.STRING), description="File size in bytes" + ), + "last-modified": OpenAPIHeader( + schema=Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.DATE_TIME), + description="Last modified data-time in RFC 2822 format", + ), + "etag": OpenAPIHeader(schema=Schema(type=OpenAPIType.STRING), description="Entity tag"), + }, + ) + + def set_success_response_headers(self, response: OpenAPIResponse) -> None: + """Set the schema for success response headers, if any.""" + + if response.headers is None: + response.headers = {} + + if not self.schema_creator.generate_examples: + schema_creator = self.schema_creator + else: + schema_creator = SchemaCreator.from_openapi_context(self.context, generate_examples=False) + + for response_header in self.route_handler.resolve_response_headers(): + header = OpenAPIHeader() + for attribute_name, attribute_value in ( + (k, v) for k, v in asdict(response_header).items() if v is not None + ): + if attribute_name == "value": + header.schema = schema_creator.for_field_definition( + FieldDefinition.from_annotation(type(attribute_value)) + ) + elif attribute_name != "documentation_only": + setattr(header, attribute_name, attribute_value) + + response.headers[response_header.name] = header + + if cookies := self.route_handler.resolve_response_cookies(): + response.headers["Set-Cookie"] = OpenAPIHeader( + schema=Schema( + all_of=[create_cookie_schema(cookie=cookie) for cookie in sorted(cookies, key=attrgetter("key"))] + ) + ) + + def create_additional_responses(self) -> Iterator[tuple[str, OpenAPIResponse]]: + """Create the schema for additional responses, if any.""" + if not self.route_handler.responses: + return + + for status_code, additional_response in self.route_handler.responses.items(): + schema_creator = SchemaCreator.from_openapi_context( + self.context, + prefer_alias=False, + generate_examples=additional_response.generate_examples, + ) + field_def = FieldDefinition.from_annotation(additional_response.data_container) + + examples: dict[str, Example | Reference] | None = ( + dict(get_formatted_examples(field_def, additional_response.examples)) + if additional_response.examples + else None + ) + + content: dict[str, OpenAPIMediaType] | None + if additional_response.data_container is not None: + schema = schema_creator.for_field_definition(field_def) + content = {additional_response.media_type: OpenAPIMediaType(schema=schema, examples=examples)} + else: + content = None + + yield ( + str(status_code), + OpenAPIResponse( + description=additional_response.description, + content=content, + ), + ) + + +def create_error_responses(exceptions: list[type[HTTPException]]) -> Iterator[tuple[str, OpenAPIResponse]]: + """Create the schema for error responses, if any.""" + grouped_exceptions: dict[int, list[type[HTTPException]]] = {} + for exc in exceptions: + if not grouped_exceptions.get(exc.status_code): + grouped_exceptions[exc.status_code] = [] + grouped_exceptions[exc.status_code].append(exc) + for status_code, exception_group in grouped_exceptions.items(): + exceptions_schemas = [] + group_description: str = "" + for exc in exception_group: + example_detail = "" + if hasattr(exc, "detail") and exc.detail: + group_description = exc.detail + example_detail = exc.detail + + if not example_detail: + with contextlib.suppress(Exception): + example_detail = HTTPStatus(status_code).phrase + + exceptions_schemas.append( + Schema( + type=OpenAPIType.OBJECT, + required=["detail", "status_code"], + properties={ + "status_code": Schema(type=OpenAPIType.INTEGER), + "detail": Schema(type=OpenAPIType.STRING), + "extra": Schema( + type=[OpenAPIType.NULL, OpenAPIType.OBJECT, OpenAPIType.ARRAY], + additional_properties=Schema(), + ), + }, + description=pascal_case_to_text(get_name(exc)), + examples=[{"status_code": status_code, "detail": example_detail, "extra": {}}], + ) + ) + if len(exceptions_schemas) > 1: # noqa: SIM108 + schema = Schema(one_of=exceptions_schemas) + else: + schema = exceptions_schemas[0] + + if not group_description: + with contextlib.suppress(Exception): + group_description = HTTPStatus(status_code).description + + yield ( + str(status_code), + OpenAPIResponse( + description=group_description, + content={MediaType.JSON: OpenAPIMediaType(schema=schema)}, + ), + ) + + +def create_responses_for_handler( + context: OpenAPIContext, route_handler: HTTPRouteHandler, raises_validation_error: bool +) -> Responses | None: + """Create the schema for responses, if any. + + Args: + context: An OpenAPIContext instance. + route_handler: An HTTPRouteHandler instance. + raises_validation_error: Boolean flag indicating whether the handler raises a ValidationException. + + Returns: + Responses + """ + return ResponseFactory(context, route_handler).create_responses(raises_validation_error=raises_validation_error) diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__init__.py b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__init__.py new file mode 100644 index 0000000..8b9183e --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__init__.py @@ -0,0 +1,7 @@ +from .plugins import openapi_schema_plugins +from .schema import SchemaCreator + +__all__ = ( + "SchemaCreator", + "openapi_schema_plugins", +) diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..60cdb7e --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__pycache__/constrained_fields.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__pycache__/constrained_fields.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..156b683 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__pycache__/constrained_fields.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__pycache__/examples.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__pycache__/examples.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..1d1a327 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__pycache__/examples.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__pycache__/schema.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__pycache__/schema.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..c382161 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__pycache__/schema.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__pycache__/utils.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__pycache__/utils.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..8f1ed6b --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/__pycache__/utils.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/constrained_fields.py b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/constrained_fields.py new file mode 100644 index 0000000..80f355d --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/constrained_fields.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +from datetime import date, datetime, timezone +from re import Pattern +from typing import TYPE_CHECKING + +from litestar.openapi.spec.enums import OpenAPIFormat, OpenAPIType +from litestar.openapi.spec.schema import Schema + +if TYPE_CHECKING: + from decimal import Decimal + + from litestar.params import KwargDefinition + +__all__ = ( + "create_date_constrained_field_schema", + "create_numerical_constrained_field_schema", + "create_string_constrained_field_schema", +) + + +def create_numerical_constrained_field_schema( + field_type: type[int] | type[float] | type[Decimal], + kwarg_definition: KwargDefinition, +) -> Schema: + """Create Schema from Constrained Int/Float/Decimal field.""" + schema = Schema(type=OpenAPIType.INTEGER if issubclass(field_type, int) else OpenAPIType.NUMBER) + if kwarg_definition.le is not None: + schema.maximum = float(kwarg_definition.le) + if kwarg_definition.lt is not None: + schema.exclusive_maximum = float(kwarg_definition.lt) + if kwarg_definition.ge is not None: + schema.minimum = float(kwarg_definition.ge) + if kwarg_definition.gt is not None: + schema.exclusive_minimum = float(kwarg_definition.gt) + if kwarg_definition.multiple_of is not None: + schema.multiple_of = float(kwarg_definition.multiple_of) + return schema + + +def create_date_constrained_field_schema( + field_type: type[date] | type[datetime], + kwarg_definition: KwargDefinition, +) -> Schema: + """Create Schema from Constrained Date Field.""" + schema = Schema( + type=OpenAPIType.STRING, format=OpenAPIFormat.DATE if issubclass(field_type, date) else OpenAPIFormat.DATE_TIME + ) + for kwarg_definition_attr, schema_attr in [ + ("le", "maximum"), + ("lt", "exclusive_maximum"), + ("ge", "minimum"), + ("gt", "exclusive_minimum"), + ]: + if attr := getattr(kwarg_definition, kwarg_definition_attr): + setattr( + schema, + schema_attr, + datetime.combine( + datetime.fromtimestamp(attr, tz=timezone.utc) if isinstance(attr, (float, int)) else attr, + datetime.min.time(), + tzinfo=timezone.utc, + ).timestamp(), + ) + + return schema + + +def create_string_constrained_field_schema( + field_type: type[str] | type[bytes], + kwarg_definition: KwargDefinition, +) -> Schema: + """Create Schema from Constrained Str/Bytes field.""" + schema = Schema(type=OpenAPIType.STRING) + if issubclass(field_type, bytes): + schema.content_encoding = "utf-8" + if kwarg_definition.min_length: + schema.min_length = kwarg_definition.min_length + if kwarg_definition.max_length: + schema.max_length = kwarg_definition.max_length + if kwarg_definition.pattern: + schema.pattern = ( + kwarg_definition.pattern.pattern # type: ignore[attr-defined] + if isinstance(kwarg_definition.pattern, Pattern) # type: ignore[unreachable] + else kwarg_definition.pattern + ) + if kwarg_definition.lower_case: + schema.description = "must be in lower case" + if kwarg_definition.upper_case: + schema.description = "must be in upper case" + return schema diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/examples.py b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/examples.py new file mode 100644 index 0000000..49edf72 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/examples.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +import typing +from dataclasses import replace +from decimal import Decimal +from enum import Enum +from typing import TYPE_CHECKING, Any + +import msgspec +from polyfactory.exceptions import ParameterException +from polyfactory.factories import DataclassFactory +from polyfactory.field_meta import FieldMeta, Null +from polyfactory.utils.helpers import unwrap_annotation +from polyfactory.utils.predicates import is_union +from typing_extensions import get_args + +from litestar.contrib.pydantic.utils import is_pydantic_model_instance +from litestar.openapi.spec import Example +from litestar.types import Empty + +if TYPE_CHECKING: + from litestar.typing import FieldDefinition + + +class ExampleFactory(DataclassFactory[Example]): + __model__ = Example + __random_seed__ = 10 + + +def _normalize_example_value(value: Any) -> Any: + """Normalize the example value to make it look a bit prettier.""" + # if UnsetType is part of the union, then it might get chosen as the value + # but that will not be properly serialized by msgspec unless it is for a field + # in a msgspec Struct + if is_union(value): + args = list(get_args(value)) + try: + args.remove(msgspec.UnsetType) + value = typing.Union[tuple(args)] # pyright: ignore + except ValueError: + # UnsetType not part of the Union + pass + + value = unwrap_annotation(annotation=value, random=ExampleFactory.__random__) + if isinstance(value, (Decimal, float)): + value = round(float(value), 2) + if isinstance(value, Enum): + value = value.value + if is_pydantic_model_instance(value): + from litestar.contrib.pydantic import _model_dump + + value = _model_dump(value) + if isinstance(value, (list, set)): + value = [_normalize_example_value(v) for v in value] + if isinstance(value, dict): + for k, v in value.items(): + value[k] = _normalize_example_value(v) + return value + + +def _create_field_meta(field: FieldDefinition) -> FieldMeta: + return FieldMeta.from_type( + annotation=field.annotation, + default=field.default if field.default is not Empty else Null, + name=field.name, + random=ExampleFactory.__random__, + ) + + +def create_examples_for_field(field: FieldDefinition) -> list[Example]: + """Create an OpenAPI Example instance. + + Args: + field: A signature field. + + Returns: + A list including a single example. + """ + try: + field_meta = _create_field_meta(replace(field, annotation=_normalize_example_value(field.annotation))) + value = ExampleFactory.get_field_value(field_meta) + return [Example(description=f"Example {field.name} value", value=value)] + except ParameterException: + return [] diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__init__.py b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__init__.py new file mode 100644 index 0000000..1b12b1e --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__init__.py @@ -0,0 +1,16 @@ +from .dataclass import DataclassSchemaPlugin +from .pagination import PaginationSchemaPlugin +from .struct import StructSchemaPlugin +from .typed_dict import TypedDictSchemaPlugin + +__all__ = ("openapi_schema_plugins",) + +# NOTE: The Pagination type plugin has to come before the Dataclass plugin since the Pagination +# classes are dataclasses, but we want to handle them differently from how dataclasses are normally +# handled. +openapi_schema_plugins = [ + PaginationSchemaPlugin(), + StructSchemaPlugin(), + DataclassSchemaPlugin(), + TypedDictSchemaPlugin(), +] diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..f022bdd --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__pycache__/dataclass.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__pycache__/dataclass.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..7cbfe6f --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__pycache__/dataclass.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__pycache__/pagination.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__pycache__/pagination.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..0bc4add --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__pycache__/pagination.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__pycache__/struct.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__pycache__/struct.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..cef9d12 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__pycache__/struct.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__pycache__/typed_dict.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__pycache__/typed_dict.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..3d534ba --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/__pycache__/typed_dict.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/dataclass.py b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/dataclass.py new file mode 100644 index 0000000..fb5da35 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/dataclass.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +from dataclasses import MISSING, fields +from typing import TYPE_CHECKING + +from litestar.plugins import OpenAPISchemaPlugin +from litestar.typing import FieldDefinition +from litestar.utils.predicates import is_optional_union + +if TYPE_CHECKING: + from litestar._openapi.schema_generation import SchemaCreator + from litestar.openapi.spec import Schema + + +class DataclassSchemaPlugin(OpenAPISchemaPlugin): + def is_plugin_supported_field(self, field_definition: FieldDefinition) -> bool: + return field_definition.is_dataclass_type + + def to_openapi_schema(self, field_definition: FieldDefinition, schema_creator: SchemaCreator) -> Schema: + type_hints = field_definition.get_type_hints(include_extras=True, resolve_generics=True) + dataclass_fields = fields(field_definition.type_) + return schema_creator.create_component_schema( + field_definition, + required=sorted( + field.name + for field in dataclass_fields + if ( + field.default is MISSING + and field.default_factory is MISSING + and not is_optional_union(type_hints[field.name]) + ) + ), + property_fields={ + field.name: FieldDefinition.from_kwarg(type_hints[field.name], field.name) for field in dataclass_fields + }, + ) diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/pagination.py b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/pagination.py new file mode 100644 index 0000000..9b4f6c6 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/pagination.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from litestar.openapi.spec import OpenAPIType, Schema +from litestar.pagination import ClassicPagination, CursorPagination, OffsetPagination +from litestar.plugins import OpenAPISchemaPlugin + +if TYPE_CHECKING: + from litestar._openapi.schema_generation import SchemaCreator + from litestar.typing import FieldDefinition + + +class PaginationSchemaPlugin(OpenAPISchemaPlugin): + def is_plugin_supported_field(self, field_definition: FieldDefinition) -> bool: + return field_definition.origin in (ClassicPagination, CursorPagination, OffsetPagination) + + def to_openapi_schema(self, field_definition: FieldDefinition, schema_creator: SchemaCreator) -> Schema: + if field_definition.origin is ClassicPagination: + return Schema( + type=OpenAPIType.OBJECT, + properties={ + "items": Schema( + type=OpenAPIType.ARRAY, + items=schema_creator.for_field_definition(field_definition.inner_types[0]), + ), + "page_size": Schema(type=OpenAPIType.INTEGER, description="Number of items per page."), + "current_page": Schema(type=OpenAPIType.INTEGER, description="Current page number."), + "total_pages": Schema(type=OpenAPIType.INTEGER, description="Total number of pages."), + }, + ) + + if field_definition.origin is OffsetPagination: + return Schema( + type=OpenAPIType.OBJECT, + properties={ + "items": Schema( + type=OpenAPIType.ARRAY, + items=schema_creator.for_field_definition(field_definition.inner_types[0]), + ), + "limit": Schema(type=OpenAPIType.INTEGER, description="Maximal number of items to send."), + "offset": Schema(type=OpenAPIType.INTEGER, description="Offset from the beginning of the query."), + "total": Schema(type=OpenAPIType.INTEGER, description="Total number of items."), + }, + ) + + cursor_schema = schema_creator.not_generating_examples.for_field_definition(field_definition.inner_types[0]) + cursor_schema.description = "Unique ID, designating the last identifier in the given data set. This value can be used to request the 'next' batch of records." + + return Schema( + type=OpenAPIType.OBJECT, + properties={ + "items": Schema( + type=OpenAPIType.ARRAY, + items=schema_creator.for_field_definition(field_definition=field_definition.inner_types[1]), + ), + "cursor": cursor_schema, + "results_per_page": Schema(type=OpenAPIType.INTEGER, description="Maximal number of items to send."), + }, + ) diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/struct.py b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/struct.py new file mode 100644 index 0000000..aabfdb3 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/struct.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from msgspec import Struct +from msgspec.structs import fields + +from litestar.plugins import OpenAPISchemaPlugin +from litestar.types.empty import Empty +from litestar.typing import FieldDefinition +from litestar.utils.predicates import is_optional_union + +if TYPE_CHECKING: + from msgspec.structs import FieldInfo + + from litestar._openapi.schema_generation import SchemaCreator + from litestar.openapi.spec import Schema + + +class StructSchemaPlugin(OpenAPISchemaPlugin): + def is_plugin_supported_field(self, field_definition: FieldDefinition) -> bool: + return not field_definition.is_union and field_definition.is_subclass_of(Struct) + + def to_openapi_schema(self, field_definition: FieldDefinition, schema_creator: SchemaCreator) -> Schema: + def is_field_required(field: FieldInfo) -> bool: + return field.required or field.default_factory is Empty + + type_hints = field_definition.get_type_hints(include_extras=True, resolve_generics=True) + struct_fields = fields(field_definition.type_) + + return schema_creator.create_component_schema( + field_definition, + required=sorted( + [ + field.encode_name + for field in struct_fields + if is_field_required(field=field) and not is_optional_union(type_hints[field.name]) + ] + ), + property_fields={ + field.encode_name: FieldDefinition.from_kwarg(type_hints[field.name], field.encode_name) + for field in struct_fields + }, + ) diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/typed_dict.py b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/typed_dict.py new file mode 100644 index 0000000..ef34e2b --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/plugins/typed_dict.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from litestar.plugins import OpenAPISchemaPlugin +from litestar.typing import FieldDefinition + +if TYPE_CHECKING: + from litestar._openapi.schema_generation import SchemaCreator + from litestar.openapi.spec import Schema + + +class TypedDictSchemaPlugin(OpenAPISchemaPlugin): + def is_plugin_supported_field(self, field_definition: FieldDefinition) -> bool: + return field_definition.is_typeddict_type + + def to_openapi_schema(self, field_definition: FieldDefinition, schema_creator: SchemaCreator) -> Schema: + type_hints = field_definition.get_type_hints(include_extras=True, resolve_generics=True) + + return schema_creator.create_component_schema( + field_definition, + required=sorted(getattr(field_definition.type_, "__required_keys__", [])), + property_fields={k: FieldDefinition.from_kwarg(v, k) for k, v in type_hints.items()}, + ) diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/schema.py b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/schema.py new file mode 100644 index 0000000..0b7d6c6 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/schema.py @@ -0,0 +1,616 @@ +from __future__ import annotations + +from collections import deque +from copy import copy +from datetime import date, datetime, time, timedelta +from decimal import Decimal +from enum import Enum, EnumMeta +from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network +from pathlib import Path +from typing import ( + TYPE_CHECKING, + Any, + DefaultDict, + Deque, + Dict, + FrozenSet, + Hashable, + Iterable, + List, + Literal, + Mapping, + MutableMapping, + MutableSequence, + OrderedDict, + Pattern, + Sequence, + Set, + Tuple, + Union, + cast, +) +from uuid import UUID + +from typing_extensions import Self, get_args + +from litestar._openapi.datastructures import SchemaRegistry +from litestar._openapi.schema_generation.constrained_fields import ( + create_date_constrained_field_schema, + create_numerical_constrained_field_schema, + create_string_constrained_field_schema, +) +from litestar._openapi.schema_generation.utils import ( + _get_normalized_schema_key, + _should_create_enum_schema, + _should_create_literal_schema, + _type_or_first_not_none_inner_type, + get_json_schema_formatted_examples, +) +from litestar.datastructures import UploadFile +from litestar.exceptions import ImproperlyConfiguredException +from litestar.openapi.spec.enums import OpenAPIFormat, OpenAPIType +from litestar.openapi.spec.schema import Schema, SchemaDataContainer +from litestar.params import BodyKwarg, ParameterKwarg +from litestar.plugins import OpenAPISchemaPlugin +from litestar.types import Empty +from litestar.types.builtin_types import NoneType +from litestar.typing import FieldDefinition +from litestar.utils.helpers import get_name +from litestar.utils.predicates import ( + is_class_and_subclass, + is_undefined_sentinel, +) +from litestar.utils.typing import ( + get_origin_or_inner_type, + make_non_optional_union, +) + +if TYPE_CHECKING: + from litestar._openapi.datastructures import OpenAPIContext + from litestar.openapi.spec import Example, Reference + from litestar.plugins import OpenAPISchemaPluginProtocol + +KWARG_DEFINITION_ATTRIBUTE_TO_OPENAPI_PROPERTY_MAP: dict[str, str] = { + "content_encoding": "content_encoding", + "default": "default", + "description": "description", + "enum": "enum", + "examples": "examples", + "external_docs": "external_docs", + "format": "format", + "ge": "minimum", + "gt": "exclusive_minimum", + "le": "maximum", + "lt": "exclusive_maximum", + "max_items": "max_items", + "max_length": "max_length", + "min_items": "min_items", + "min_length": "min_length", + "multiple_of": "multiple_of", + "pattern": "pattern", + "title": "title", + "read_only": "read_only", +} + +TYPE_MAP: dict[type[Any] | None | Any, Schema] = { + Decimal: Schema(type=OpenAPIType.NUMBER), + DefaultDict: Schema(type=OpenAPIType.OBJECT), + Deque: Schema(type=OpenAPIType.ARRAY), + Dict: Schema(type=OpenAPIType.OBJECT), + FrozenSet: Schema(type=OpenAPIType.ARRAY), + IPv4Address: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.IPV4), + IPv4Interface: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.IPV4), + IPv4Network: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.IPV4), + IPv6Address: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.IPV6), + IPv6Interface: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.IPV6), + IPv6Network: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.IPV6), + Iterable: Schema(type=OpenAPIType.ARRAY), + List: Schema(type=OpenAPIType.ARRAY), + Mapping: Schema(type=OpenAPIType.OBJECT), + MutableMapping: Schema(type=OpenAPIType.OBJECT), + MutableSequence: Schema(type=OpenAPIType.ARRAY), + None: Schema(type=OpenAPIType.NULL), + NoneType: Schema(type=OpenAPIType.NULL), + OrderedDict: Schema(type=OpenAPIType.OBJECT), + Path: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.URI), + Pattern: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.REGEX), + Sequence: Schema(type=OpenAPIType.ARRAY), + Set: Schema(type=OpenAPIType.ARRAY), + Tuple: Schema(type=OpenAPIType.ARRAY), + UUID: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.UUID), + bool: Schema(type=OpenAPIType.BOOLEAN), + bytearray: Schema(type=OpenAPIType.STRING), + bytes: Schema(type=OpenAPIType.STRING), + date: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.DATE), + datetime: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.DATE_TIME), + deque: Schema(type=OpenAPIType.ARRAY), + dict: Schema(type=OpenAPIType.OBJECT), + float: Schema(type=OpenAPIType.NUMBER), + frozenset: Schema(type=OpenAPIType.ARRAY), + int: Schema(type=OpenAPIType.INTEGER), + list: Schema(type=OpenAPIType.ARRAY), + set: Schema(type=OpenAPIType.ARRAY), + str: Schema(type=OpenAPIType.STRING), + time: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.DURATION), + timedelta: Schema(type=OpenAPIType.STRING, format=OpenAPIFormat.DURATION), + tuple: Schema(type=OpenAPIType.ARRAY), +} + + +def _types_in_list(lst: list[Any]) -> list[OpenAPIType] | OpenAPIType: + """Extract unique OpenAPITypes present in the values of a list. + + Args: + lst: A list of values + + Returns: + OpenAPIType in the given list. If more then one exists, return + a list of OpenAPITypes. + """ + schema_types: list[OpenAPIType] = [] + for item in lst: + schema_type = TYPE_MAP[type(item)].type + if isinstance(schema_type, OpenAPIType): + schema_types.append(schema_type) + else: + raise RuntimeError("Unexpected type for schema item") # pragma: no cover + schema_types = list(set(schema_types)) + return schema_types[0] if len(schema_types) == 1 else schema_types + + +def _get_type_schema_name(field_definition: FieldDefinition) -> str: + """Extract the schema name from a data container. + + Args: + field_definition: A field definition instance. + + Returns: + A string + """ + + if name := getattr(field_definition.annotation, "__schema_name__", None): + return cast("str", name) + + name = get_name(field_definition.annotation) + if field_definition.inner_types: + inner_parts = ", ".join(_get_type_schema_name(t) for t in field_definition.inner_types) + return f"{name}[{inner_parts}]" + + return name + + +def create_enum_schema(annotation: EnumMeta, include_null: bool = False) -> Schema: + """Create a schema instance for an enum. + + Args: + annotation: An enum. + include_null: Whether to include null as a possible value. + + Returns: + A schema instance. + """ + enum_values: list[str | int | None] = [v.value for v in annotation] # type: ignore[var-annotated] + if include_null and None not in enum_values: + enum_values.append(None) + return Schema(type=_types_in_list(enum_values), enum=enum_values) + + +def _iter_flat_literal_args(annotation: Any) -> Iterable[Any]: + """Iterate over the flattened arguments of a Literal. + + Args: + annotation: An Literal annotation. + + Yields: + The flattened arguments of the Literal. + """ + for arg in get_args(annotation): + if get_origin_or_inner_type(arg) is Literal: + yield from _iter_flat_literal_args(arg) + else: + yield arg.value if isinstance(arg, Enum) else arg + + +def create_literal_schema(annotation: Any, include_null: bool = False) -> Schema: + """Create a schema instance for a Literal. + + Args: + annotation: An Literal annotation. + include_null: Whether to include null as a possible value. + + Returns: + A schema instance. + """ + args = list(_iter_flat_literal_args(annotation)) + if include_null and None not in args: + args.append(None) + schema = Schema(type=_types_in_list(args)) + if len(args) > 1: + schema.enum = args + else: + schema.const = args[0] + return schema + + +def create_schema_for_annotation(annotation: Any) -> Schema: + """Get a schema from the type mapping - if possible. + + Args: + annotation: A type annotation. + + Returns: + A schema instance or None. + """ + + return copy(TYPE_MAP[annotation]) if annotation in TYPE_MAP else Schema() + + +class SchemaCreator: + __slots__ = ("generate_examples", "plugins", "prefer_alias", "schema_registry") + + def __init__( + self, + generate_examples: bool = False, + plugins: Iterable[OpenAPISchemaPluginProtocol] | None = None, + prefer_alias: bool = True, + schema_registry: SchemaRegistry | None = None, + ) -> None: + """Instantiate a SchemaCreator. + + Args: + generate_examples: Whether to generate examples if none are given. + plugins: A list of plugins. + prefer_alias: Whether to prefer the alias name for the schema. + schema_registry: A SchemaRegistry instance. + """ + self.generate_examples = generate_examples + self.plugins = plugins if plugins is not None else [] + self.prefer_alias = prefer_alias + self.schema_registry = schema_registry or SchemaRegistry() + + @classmethod + def from_openapi_context(cls, context: OpenAPIContext, prefer_alias: bool = True, **kwargs: Any) -> Self: + kwargs.setdefault("generate_examples", context.openapi_config.create_examples) + kwargs.setdefault("plugins", context.plugins) + kwargs.setdefault("schema_registry", context.schema_registry) + return cls(**kwargs, prefer_alias=prefer_alias) + + @property + def not_generating_examples(self) -> SchemaCreator: + """Return a SchemaCreator with generate_examples set to False.""" + if not self.generate_examples: + return self + return type(self)(generate_examples=False, plugins=self.plugins, prefer_alias=False) + + @staticmethod + def plugin_supports_field(plugin: OpenAPISchemaPluginProtocol, field: FieldDefinition) -> bool: + if predicate := getattr(plugin, "is_plugin_supported_field", None): + return predicate(field) # type: ignore[no-any-return] + return plugin.is_plugin_supported_type(field.annotation) + + def get_plugin_for(self, field_definition: FieldDefinition) -> OpenAPISchemaPluginProtocol | None: + return next( + (plugin for plugin in self.plugins if self.plugin_supports_field(plugin, field_definition)), + None, + ) + + def is_constrained_field(self, field_definition: FieldDefinition) -> bool: + """Return if the field is constrained, taking into account constraints defined by plugins""" + return ( + isinstance(field_definition.kwarg_definition, (ParameterKwarg, BodyKwarg)) + and field_definition.kwarg_definition.is_constrained + ) or any( + p.is_constrained_field(field_definition) + for p in self.plugins + if isinstance(p, OpenAPISchemaPlugin) and p.is_plugin_supported_field(field_definition) + ) + + def is_undefined(self, value: Any) -> bool: + """Return if the field is undefined, taking into account undefined types defined by plugins""" + return is_undefined_sentinel(value) or any( + p.is_undefined_sentinel(value) for p in self.plugins if isinstance(p, OpenAPISchemaPlugin) + ) + + def for_field_definition(self, field_definition: FieldDefinition) -> Schema | Reference: + """Create a Schema for a given FieldDefinition. + + Args: + field_definition: A signature field instance. + + Returns: + A schema instance. + """ + + result: Schema | Reference + + if plugin_for_annotation := self.get_plugin_for(field_definition): + result = self.for_plugin(field_definition, plugin_for_annotation) + elif _should_create_enum_schema(field_definition): + annotation = _type_or_first_not_none_inner_type(field_definition) + result = create_enum_schema(annotation, include_null=field_definition.is_optional) + elif _should_create_literal_schema(field_definition): + annotation = ( + make_non_optional_union(field_definition.annotation) + if field_definition.is_optional + else field_definition.annotation + ) + result = create_literal_schema(annotation, include_null=field_definition.is_optional) + elif field_definition.is_optional: + result = self.for_optional_field(field_definition) + elif field_definition.is_union: + result = self.for_union_field(field_definition) + elif field_definition.is_type_var: + result = self.for_typevar() + elif field_definition.inner_types and not field_definition.is_generic: + result = self.for_object_type(field_definition) + elif self.is_constrained_field(field_definition): + result = self.for_constrained_field(field_definition) + elif field_definition.is_subclass_of(UploadFile): + result = self.for_upload_file(field_definition) + else: + result = create_schema_for_annotation(field_definition.annotation) + + return self.process_schema_result(field_definition, result) if isinstance(result, Schema) else result + + @staticmethod + def for_upload_file(field_definition: FieldDefinition) -> Schema: + """Create schema for UploadFile. + + Args: + field_definition: A field definition instance. + + Returns: + A Schema instance. + """ + + property_key = "file" + schema = Schema( + type=OpenAPIType.STRING, + content_media_type="application/octet-stream", + format=OpenAPIFormat.BINARY, + ) + + # If the type is `dict[str, UploadFile]`, then it's the same as a `list[UploadFile]` + # but we will internally convert that into a `dict[str, UploadFile]`. + if field_definition.is_non_string_sequence or field_definition.is_mapping: + property_key = "files" + schema = Schema(type=OpenAPIType.ARRAY, items=schema) + + # If the uploadfile is annotated directly on the handler, then the + # 'properties' needs to be created. Else, the 'properties' will be + # created by the corresponding plugin. + is_defined_on_handler = field_definition.name == "data" and isinstance( + field_definition.kwarg_definition, BodyKwarg + ) + if is_defined_on_handler: + return Schema(type=OpenAPIType.OBJECT, properties={property_key: schema}) + + return schema + + @staticmethod + def for_typevar() -> Schema: + """Create a schema for a TypeVar. + + Returns: + A schema instance. + """ + + return Schema(type=OpenAPIType.OBJECT) + + def for_optional_field(self, field_definition: FieldDefinition) -> Schema: + """Create a Schema for an optional FieldDefinition. + + Args: + field_definition: A signature field instance. + + Returns: + A schema instance. + """ + schema_or_reference = self.for_field_definition( + FieldDefinition.from_kwarg( + annotation=make_non_optional_union(field_definition.annotation), + name=field_definition.name, + default=field_definition.default, + ) + ) + if isinstance(schema_or_reference, Schema) and isinstance(schema_or_reference.one_of, list): + result = schema_or_reference.one_of + else: + result = [schema_or_reference] + + return Schema(one_of=[Schema(type=OpenAPIType.NULL), *result]) + + def for_union_field(self, field_definition: FieldDefinition) -> Schema: + """Create a Schema for a union FieldDefinition. + + Args: + field_definition: A signature field instance. + + Returns: + A schema instance. + """ + inner_types = (f for f in (field_definition.inner_types or []) if not self.is_undefined(f.annotation)) + values = list(map(self.for_field_definition, inner_types)) + return Schema(one_of=values) + + def for_object_type(self, field_definition: FieldDefinition) -> Schema: + """Create schema for object types (dict, Mapping, list, Sequence etc.) types. + + Args: + field_definition: A signature field instance. + + Returns: + A schema instance. + """ + if field_definition.has_inner_subclass_of(UploadFile): + return self.for_upload_file(field_definition) + + if field_definition.is_mapping: + return Schema( + type=OpenAPIType.OBJECT, + additional_properties=( + self.for_field_definition(field_definition.inner_types[1]) + if field_definition.inner_types and len(field_definition.inner_types) == 2 + else None + ), + ) + + if field_definition.is_non_string_sequence or field_definition.is_non_string_iterable: + # filters out ellipsis from tuple[int, ...] type annotations + inner_types = (f for f in field_definition.inner_types if f.annotation is not Ellipsis) + items = list(map(self.for_field_definition, inner_types or ())) + + return Schema( + type=OpenAPIType.ARRAY, + items=Schema(one_of=items) if len(items) > 1 else items[0], + ) + + raise ImproperlyConfiguredException( # pragma: no cover + f"Parameter '{field_definition.name}' with type '{field_definition.annotation}' could not be mapped to an Open API type. " + f"This can occur if a user-defined generic type is resolved as a parameter. If '{field_definition.name}' should " + "not be documented as a parameter, annotate it using the `Dependency` function, e.g., " + f"`{field_definition.name}: ... = Dependency(...)`." + ) + + def for_plugin(self, field_definition: FieldDefinition, plugin: OpenAPISchemaPluginProtocol) -> Schema | Reference: + """Create a schema using a plugin. + + Args: + field_definition: A signature field instance. + plugin: A plugin for the field type. + + Returns: + A schema instance. + """ + key = _get_normalized_schema_key(field_definition.annotation) + if (ref := self.schema_registry.get_reference_for_key(key)) is not None: + return ref + + schema = plugin.to_openapi_schema(field_definition=field_definition, schema_creator=self) + if isinstance(schema, SchemaDataContainer): # pragma: no cover + return self.for_field_definition( + FieldDefinition.from_kwarg( + annotation=schema.data_container, + name=field_definition.name, + default=field_definition.default, + extra=field_definition.extra, + kwarg_definition=field_definition.kwarg_definition, + ) + ) + return schema + + def for_constrained_field(self, field: FieldDefinition) -> Schema: + """Create Schema for Pydantic Constrained fields (created using constr(), conint() and so forth, or by subclassing + Constrained*) + + Args: + field: A signature field instance. + + Returns: + A schema instance. + """ + kwarg_definition = cast(Union[ParameterKwarg, BodyKwarg], field.kwarg_definition) + if any(is_class_and_subclass(field.annotation, t) for t in (int, float, Decimal)): + return create_numerical_constrained_field_schema(field.annotation, kwarg_definition) + if any(is_class_and_subclass(field.annotation, t) for t in (str, bytes)): # type: ignore[arg-type] + return create_string_constrained_field_schema(field.annotation, kwarg_definition) + if any(is_class_and_subclass(field.annotation, t) for t in (date, datetime)): + return create_date_constrained_field_schema(field.annotation, kwarg_definition) + return self.for_collection_constrained_field(field) + + def for_collection_constrained_field(self, field_definition: FieldDefinition) -> Schema: + """Create Schema from Constrained List/Set field. + + Args: + field_definition: A signature field instance. + + Returns: + A schema instance. + """ + schema = Schema(type=OpenAPIType.ARRAY) + kwarg_definition = cast(Union[ParameterKwarg, BodyKwarg], field_definition.kwarg_definition) + if kwarg_definition.min_items: + schema.min_items = kwarg_definition.min_items + if kwarg_definition.max_items: + schema.max_items = kwarg_definition.max_items + if any(is_class_and_subclass(field_definition.annotation, t) for t in (set, frozenset)): # type: ignore[arg-type] + schema.unique_items = True + + item_creator = self.not_generating_examples + if field_definition.inner_types: + items = list(map(item_creator.for_field_definition, field_definition.inner_types)) + schema.items = Schema(one_of=items) if len(items) > 1 else items[0] + else: + schema.items = item_creator.for_field_definition( + FieldDefinition.from_kwarg( + field_definition.annotation.item_type, f"{field_definition.annotation.__name__}Field" + ) + ) + return schema + + def process_schema_result(self, field: FieldDefinition, schema: Schema) -> Schema | Reference: + if field.kwarg_definition and field.is_const and field.has_default and schema.const is None: + schema.const = field.default + + if field.kwarg_definition: + for kwarg_definition_key, schema_key in KWARG_DEFINITION_ATTRIBUTE_TO_OPENAPI_PROPERTY_MAP.items(): + if (value := getattr(field.kwarg_definition, kwarg_definition_key, Empty)) and ( + not isinstance(value, Hashable) or not self.is_undefined(value) + ): + if schema_key == "examples": + value = get_json_schema_formatted_examples(cast("list[Example]", value)) + + # we only want to transfer values from the `KwargDefinition` to `Schema` if the schema object + # doesn't already have a value for that property. For example, if a field is a constrained date, + # by this point, we have already set the `exclusive_minimum` and/or `exclusive_maximum` fields + # to floating point timestamp values on the schema object. However, the original `date` objects + # that define those constraints on `KwargDefinition` are still `date` objects. We don't want to + # overwrite them here. + if getattr(schema, schema_key, None) is None: + setattr(schema, schema_key, value) + + if not schema.examples and self.generate_examples: + from litestar._openapi.schema_generation.examples import create_examples_for_field + + schema.examples = get_json_schema_formatted_examples(create_examples_for_field(field)) + + if schema.title and schema.type == OpenAPIType.OBJECT: + key = _get_normalized_schema_key(field.annotation) + return self.schema_registry.get_reference_for_key(key) or schema + return schema + + def create_component_schema( + self, + type_: FieldDefinition, + /, + required: list[str], + property_fields: Mapping[str, FieldDefinition], + openapi_type: OpenAPIType = OpenAPIType.OBJECT, + title: str | None = None, + examples: list[Any] | None = None, + ) -> Schema: + """Create a schema for the components/schemas section of the OpenAPI spec. + + These are schemas that can be referenced by other schemas in the document, including self references. + + To support self referencing schemas, the schema is added to the registry before schemas for its properties + are created. This allows the schema to be referenced by its properties. + + Args: + type_: ``FieldDefinition`` instance of the type to create a schema for. + required: A list of required fields. + property_fields: Mapping of name to ``FieldDefinition`` instances for the properties of the schema. + openapi_type: The OpenAPI type, defaults to ``OpenAPIType.OBJECT``. + title: The schema title, generated if not provided. + examples: A mapping of example names to ``Example`` instances, not required. + + Returns: + A schema instance. + """ + schema = self.schema_registry.get_schema_for_key(_get_normalized_schema_key(type_.annotation)) + schema.title = title or _get_type_schema_name(type_) + schema.required = required + schema.type = openapi_type + schema.properties = {k: self.for_field_definition(v) for k, v in property_fields.items()} + schema.examples = examples + return schema diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/utils.py b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/utils.py new file mode 100644 index 0000000..7ce27ca --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/schema_generation/utils.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +from enum import Enum +from typing import TYPE_CHECKING, Any, Mapping, _GenericAlias # type: ignore[attr-defined] + +from litestar.utils.helpers import get_name + +if TYPE_CHECKING: + from collections.abc import Sequence + + from litestar.openapi.spec import Example + from litestar.typing import FieldDefinition + +__all__ = ( + "_type_or_first_not_none_inner_type", + "_should_create_enum_schema", + "_should_create_literal_schema", + "_get_normalized_schema_key", +) + + +def _type_or_first_not_none_inner_type(field_definition: FieldDefinition) -> Any: + """Get the first inner type that is not None. + + This is a narrow focussed utility to be used when we know that a field definition either represents + a single type, or a single type in a union with `None`, and we want the single type. + + Args: + field_definition: A field definition instance. + + Returns: + A field definition instance. + """ + if not field_definition.is_optional: + return field_definition.annotation + inner = next((t for t in field_definition.inner_types if not t.is_none_type), None) + if inner is None: + raise ValueError("Field definition has no inner type that is not None") + return inner.annotation + + +def _should_create_enum_schema(field_definition: FieldDefinition) -> bool: + """Predicate to determine if we should create an enum schema for the field def, or not. + + This returns true if the field definition is an enum, or if the field definition is a union + of an enum and ``None``. + + When an annotation is ``SomeEnum | None`` we should create a schema for the enum that includes ``null`` + in the enum values. + + Args: + field_definition: A field definition instance. + + Returns: + A boolean + """ + return field_definition.is_subclass_of(Enum) or ( + field_definition.is_optional + and len(field_definition.args) == 2 + and field_definition.has_inner_subclass_of(Enum) + ) + + +def _should_create_literal_schema(field_definition: FieldDefinition) -> bool: + """Predicate to determine if we should create a literal schema for the field def, or not. + + This returns ``True`` if the field definition is an literal, or if the field definition is a union + of a literal and None. + + When an annotation is `Literal["anything"] | None` we should create a schema for the literal that includes `null` + in the enum values. + + Args: + field_definition: A field definition instance. + + Returns: + A boolean + """ + return ( + field_definition.is_literal + or field_definition.is_optional + and all(inner.is_literal for inner in field_definition.inner_types if not inner.is_none_type) + ) + + +def _get_normalized_schema_key(annotation: Any) -> tuple[str, ...]: + """Create a key for a type annotation. + + The key should be a tuple such as ``("path", "to", "type", "TypeName")``. + + Args: + annotation: a type annotation + + Returns: + A tuple of strings. + """ + module = getattr(annotation, "__module__", "") + name = str(annotation)[len(module) + 1 :] if isinstance(annotation, _GenericAlias) else annotation.__qualname__ + name = name.replace(".<locals>.", ".") + return *module.split("."), name + + +def get_formatted_examples(field_definition: FieldDefinition, examples: Sequence[Example]) -> Mapping[str, Example]: + """Format the examples into the OpenAPI schema format.""" + + name = field_definition.name or get_name(field_definition.type_) + name = name.lower() + + return {f"{name}-example-{i}": example for i, example in enumerate(examples, 1)} + + +def get_json_schema_formatted_examples(examples: Sequence[Example]) -> list[Any]: + """Format the examples into the JSON schema format.""" + return [example.value for example in examples] diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/__init__.py b/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/__init__.py diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..7fc0cb2 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/__pycache__/converter.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/__pycache__/converter.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..5bc5015 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/__pycache__/converter.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/__pycache__/schema_parsing.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/__pycache__/schema_parsing.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..f6e0196 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/__pycache__/schema_parsing.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/__pycache__/types.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/__pycache__/types.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..ffe7efb --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/__pycache__/types.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/converter.py b/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/converter.py new file mode 100644 index 0000000..4782dbe --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/converter.py @@ -0,0 +1,308 @@ +from __future__ import annotations + +from copy import copy +from dataclasses import fields +from typing import Any, TypeVar, cast + +from litestar._openapi.typescript_converter.schema_parsing import ( + normalize_typescript_namespace, + parse_schema, +) +from litestar._openapi.typescript_converter.types import ( + TypeScriptInterface, + TypeScriptNamespace, + TypeScriptPrimitive, + TypeScriptProperty, + TypeScriptType, + TypeScriptUnion, +) +from litestar.enums import HttpMethod, ParamType +from litestar.openapi.spec import ( + Components, + OpenAPI, + Operation, + Parameter, + Reference, + RequestBody, + Responses, + Schema, +) + +__all__ = ( + "convert_openapi_to_typescript", + "deref_container", + "get_openapi_type", + "parse_params", + "parse_request_body", + "parse_responses", + "resolve_ref", +) + +from litestar.openapi.spec.base import BaseSchemaObject + +T = TypeVar("T") + + +def _deref_schema_object(value: BaseSchemaObject, components: Components) -> BaseSchemaObject: + for field in fields(value): + if field_value := getattr(value, field.name, None): + if isinstance(field_value, Reference): + setattr( + value, + field.name, + deref_container(resolve_ref(field_value, components=components), components=components), + ) + elif isinstance(field_value, (Schema, dict, list)): + setattr(value, field.name, deref_container(field_value, components=components)) + return value + + +def _deref_dict(value: dict[str, Any], components: Components) -> dict[str, Any]: + for k, v in value.items(): + if isinstance(v, Reference): + value[k] = deref_container(resolve_ref(v, components=components), components=components) + elif isinstance(v, (Schema, dict, list)): + value[k] = deref_container(v, components=components) + return value + + +def _deref_list(values: list[Any], components: Components) -> list[Any]: + for i, value in enumerate(values): + if isinstance(value, Reference): + values[i] = deref_container(resolve_ref(value, components=components), components=components) + elif isinstance(value, (Schema, (dict, list))): + values[i] = deref_container(value, components=components) + return values + + +def deref_container(open_api_container: T, components: Components) -> T: + """Dereference an object that may contain Reference instances. + + Args: + open_api_container: Either an OpenAPI content, a dict or a list. + components: The OpenAPI schema Components section. + + Returns: + A dereferenced object. + """ + if isinstance(open_api_container, BaseSchemaObject): + return cast("T", _deref_schema_object(open_api_container, components)) + + if isinstance(open_api_container, dict): + return cast("T", _deref_dict(copy(open_api_container), components)) + + if isinstance(open_api_container, list): + return cast("T", _deref_list(copy(open_api_container), components)) + raise ValueError(f"unexpected container type {type(open_api_container).__name__}") # pragma: no cover + + +def resolve_ref(ref: Reference, components: Components) -> Schema: + """Resolve a reference object into the actual value it points at. + + Args: + ref: A Reference instance. + components: The OpenAPI schema Components section. + + Returns: + An OpenAPI schema instance. + """ + current: Any = components + for path in [p for p in ref.ref.split("/") if p not in {"#", "components"}]: + current = current[path] if isinstance(current, dict) else getattr(current, path, None) + + if not isinstance(current, Schema): # pragma: no cover + raise ValueError( + f"unexpected value type, expected schema but received {type(current).__name__ if current is not None else 'None'}" + ) + + return current + + +def get_openapi_type(value: Reference | T, components: Components) -> T: + """Extract or dereference an OpenAPI container type. + + Args: + value: Either a reference or a container type. + components: The OpenAPI schema Components section. + + Returns: + The extracted container. + """ + if isinstance(value, Reference): + resolved_ref = resolve_ref(value, components=components) + return cast("T", deref_container(open_api_container=resolved_ref, components=components)) + + return deref_container(open_api_container=value, components=components) + + +def parse_params( + params: list[Parameter], + components: Components, +) -> tuple[TypeScriptInterface, ...]: + """Parse request parameters. + + Args: + params: An OpenAPI Operation parameters. + components: The OpenAPI schema Components section. + + Returns: + A tuple of resolved interfaces. + """ + cookie_params: list[TypeScriptProperty] = [] + header_params: list[TypeScriptProperty] = [] + path_params: list[TypeScriptProperty] = [] + query_params: list[TypeScriptProperty] = [] + + for param in params: + if param.schema: + schema = get_openapi_type(param.schema, components) + ts_prop = TypeScriptProperty( + key=normalize_typescript_namespace(param.name, allow_quoted=True), + required=param.required, + value=parse_schema(schema), + ) + if param.param_in == ParamType.COOKIE: + cookie_params.append(ts_prop) + elif param.param_in == ParamType.HEADER: + header_params.append(ts_prop) + elif param.param_in == ParamType.PATH: + path_params.append(ts_prop) + else: + query_params.append(ts_prop) + + result: list[TypeScriptInterface] = [] + + if cookie_params: + result.append(TypeScriptInterface("CookieParameters", tuple(cookie_params))) + if header_params: + result.append(TypeScriptInterface("HeaderParameters", tuple(header_params))) + if path_params: + result.append(TypeScriptInterface("PathParameters", tuple(path_params))) + if query_params: + result.append(TypeScriptInterface("QueryParameters", tuple(query_params))) + + return tuple(result) + + +def parse_request_body(body: RequestBody, components: Components) -> TypeScriptType: + """Parse the schema request body. + + Args: + body: An OpenAPI RequestBody instance. + components: The OpenAPI schema Components section. + + Returns: + A TypeScript type. + """ + undefined = TypeScriptPrimitive("undefined") + if not body.content: + return TypeScriptType("RequestBody", undefined) + + if content := [get_openapi_type(v.schema, components) for v in body.content.values() if v.schema]: + schema = content[0] + return TypeScriptType( + "RequestBody", + parse_schema(schema) if body.required else TypeScriptUnion((parse_schema(schema), undefined)), + ) + + return TypeScriptType("RequestBody", undefined) + + +def parse_responses(responses: Responses, components: Components) -> tuple[TypeScriptNamespace, ...]: + """Parse a given Operation's Responses object. + + Args: + responses: An OpenAPI Responses object. + components: The OpenAPI schema Components section. + + Returns: + A tuple of namespaces, mapping response codes to data. + """ + result: list[TypeScriptNamespace] = [] + for http_status, response in [ + (status, get_openapi_type(res, components=components)) for status, res in responses.items() + ]: + if response.content and ( + content := [get_openapi_type(v.schema, components) for v in response.content.values() if v.schema] + ): + ts_type = parse_schema(content[0]) + else: + ts_type = TypeScriptPrimitive("undefined") + + containers = [ + TypeScriptType("ResponseBody", ts_type), + TypeScriptInterface( + "ResponseHeaders", + tuple( + TypeScriptProperty( + required=get_openapi_type(header, components=components).required, + key=normalize_typescript_namespace(key, allow_quoted=True), + value=TypeScriptPrimitive("string"), + ) + for key, header in response.headers.items() + ), + ) + if response.headers + else None, + ] + + result.append(TypeScriptNamespace(f"Http{http_status}", tuple(c for c in containers if c))) + + return tuple(result) + + +def convert_openapi_to_typescript(openapi_schema: OpenAPI, namespace: str = "API") -> TypeScriptNamespace: + """Convert an OpenAPI Schema instance to a TypeScript namespace. This function is the main entry point for the + TypeScript converter. + + Args: + openapi_schema: An OpenAPI Schema instance. + namespace: The namespace to use. + + Returns: + A string representing the generated types. + """ + if not openapi_schema.paths: # pragma: no cover + raise ValueError("OpenAPI schema has no paths") + if not openapi_schema.components: # pragma: no cover + raise ValueError("OpenAPI schema has no components") + + operations: list[TypeScriptNamespace] = [] + + for path_item in openapi_schema.paths.values(): + shared_params = [ + get_openapi_type(p, components=openapi_schema.components) for p in (path_item.parameters or []) + ] + for method in HttpMethod: + if ( + operation := cast("Operation | None", getattr(path_item, method.lower(), "None")) + ) and operation.operation_id: + params = parse_params( + [ + *( + get_openapi_type(p, components=openapi_schema.components) + for p in (operation.parameters or []) + ), + *shared_params, + ], + components=openapi_schema.components, + ) + request_body = ( + parse_request_body( + get_openapi_type(operation.request_body, components=openapi_schema.components), + components=openapi_schema.components, + ) + if operation.request_body + else None + ) + + responses = parse_responses(operation.responses or {}, components=openapi_schema.components) + + operations.append( + TypeScriptNamespace( + normalize_typescript_namespace(operation.operation_id, allow_quoted=False), + tuple(container for container in (*params, request_body, *responses) if container), + ) + ) + + return TypeScriptNamespace(namespace, tuple(operations)) diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/schema_parsing.py b/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/schema_parsing.py new file mode 100644 index 0000000..c5cbbd0 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/schema_parsing.py @@ -0,0 +1,150 @@ +from __future__ import annotations + +import re +from typing import TYPE_CHECKING, Any, Literal, overload + +from litestar._openapi.typescript_converter.types import ( + TypeScriptAnonymousInterface, + TypeScriptArray, + TypeScriptElement, + TypeScriptInterface, + TypeScriptIntersection, + TypeScriptLiteral, + TypeScriptPrimitive, + TypeScriptProperty, + TypeScriptUnion, +) +from litestar.openapi.spec import Schema +from litestar.openapi.spec.enums import OpenAPIType + +__all__ = ("create_interface", "is_schema_value", "normalize_typescript_namespace", "parse_schema", "parse_type_schema") + +if TYPE_CHECKING: + from typing_extensions import TypeGuard + +openapi_typescript_equivalent_types = Literal[ + "string", "boolean", "number", "null", "Record<string, unknown>", "unknown[]" +] + +openapi_to_typescript_type_map: dict[OpenAPIType, openapi_typescript_equivalent_types] = { + OpenAPIType.ARRAY: "unknown[]", + OpenAPIType.BOOLEAN: "boolean", + OpenAPIType.INTEGER: "number", + OpenAPIType.NULL: "null", + OpenAPIType.NUMBER: "number", + OpenAPIType.OBJECT: "Record<string, unknown>", + OpenAPIType.STRING: "string", +} + +invalid_namespace_re = re.compile(r"[^\w+_$]*") +allowed_key_re = re.compile(r"[\w+_$]*") + + +def normalize_typescript_namespace(value: str, allow_quoted: bool) -> str: + """Normalize a namespace, e.g. variable name, or object key, to values supported by TS. + + Args: + value: A string to normalize. + allow_quoted: Whether to allow quoting the value. + + Returns: + A normalized value + """ + if not allow_quoted and not value[0].isalpha() and value[0] not in {"_", "$"}: + raise ValueError(f"invalid typescript namespace {value}") + if allow_quoted: + return value if allowed_key_re.fullmatch(value) else f'"{value}"' + return invalid_namespace_re.sub("", value) + + +def is_schema_value(value: Any) -> TypeGuard[Schema]: + """Typeguard for a schema value. + + Args: + value: An arbitrary value + + Returns: + A typeguard boolean dictating whether the passed in value is a Schema. + """ + return isinstance(value, Schema) + + +@overload +def create_interface(properties: dict[str, Schema], required: set[str] | None) -> TypeScriptAnonymousInterface: ... + + +@overload +def create_interface(properties: dict[str, Schema], required: set[str] | None, name: str) -> TypeScriptInterface: ... + + +def create_interface( + properties: dict[str, Schema], required: set[str] | None = None, name: str | None = None +) -> TypeScriptAnonymousInterface | TypeScriptInterface: + """Create a typescript interface from the given schema.properties values. + + Args: + properties: schema.properties mapping. + required: An optional list of required properties. + name: An optional string representing the interface name. + + Returns: + A typescript interface or anonymous interface. + """ + parsed_properties = tuple( + TypeScriptProperty( + key=normalize_typescript_namespace(key, allow_quoted=True), + value=parse_schema(schema), + required=key in required if required is not None else True, + ) + for key, schema in properties.items() + ) + return ( + TypeScriptInterface(name=name, properties=parsed_properties) + if name is not None + else TypeScriptAnonymousInterface(properties=parsed_properties) + ) + + +def parse_type_schema(schema: Schema) -> TypeScriptPrimitive | TypeScriptLiteral | TypeScriptUnion: + """Parse an OpenAPI schema representing a primitive type(s). + + Args: + schema: An OpenAPI schema. + + Returns: + A typescript type. + """ + if schema.enum: + return TypeScriptUnion(types=tuple(TypeScriptLiteral(value=value) for value in schema.enum)) + if schema.const: + return TypeScriptLiteral(value=schema.const) + if isinstance(schema.type, list): + return TypeScriptUnion( + tuple(TypeScriptPrimitive(openapi_to_typescript_type_map[s_type]) for s_type in schema.type) + ) + if schema.type in openapi_to_typescript_type_map and isinstance(schema.type, OpenAPIType): + return TypeScriptPrimitive(openapi_to_typescript_type_map[schema.type]) + raise TypeError(f"received an unexpected openapi type: {schema.type}") # pragma: no cover + + +def parse_schema(schema: Schema) -> TypeScriptElement: + """Parse an OpenAPI schema object recursively to create typescript types. + + Args: + schema: An OpenAPI Schema object. + + Returns: + A typescript type. + """ + if schema.all_of: + return TypeScriptIntersection(tuple(parse_schema(s) for s in schema.all_of if is_schema_value(s))) + if schema.one_of: + return TypeScriptUnion(tuple(parse_schema(s) for s in schema.one_of if is_schema_value(s))) + if is_schema_value(schema.items): + return TypeScriptArray(parse_schema(schema.items)) + if schema.type == OpenAPIType.OBJECT: + return create_interface( + properties={k: v for k, v in schema.properties.items() if is_schema_value(v)} if schema.properties else {}, + required=set(schema.required) if schema.required else None, + ) + return parse_type_schema(schema=schema) diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/types.py b/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/types.py new file mode 100644 index 0000000..ff265d4 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/typescript_converter/types.py @@ -0,0 +1,308 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any, Literal + +__all__ = ( + "TypeScriptAnonymousInterface", + "TypeScriptArray", + "TypeScriptConst", + "TypeScriptContainer", + "TypeScriptElement", + "TypeScriptEnum", + "TypeScriptInterface", + "TypeScriptIntersection", + "TypeScriptLiteral", + "TypeScriptNamespace", + "TypeScriptPrimitive", + "TypeScriptProperty", + "TypeScriptType", + "TypeScriptUnion", +) + + +def _as_string(value: Any) -> str: + if isinstance(value, str): + return f'"{value}"' + + if isinstance(value, bool): + return "true" if value else "false" + + return "null" if value is None else str(value) + + +class TypeScriptElement(ABC): + """A class representing a TypeScript type element.""" + + @abstractmethod + def write(self) -> str: + """Write a typescript value corresponding to the given typescript element. + + Returns: + A typescript string + """ + raise NotImplementedError("") + + +class TypeScriptContainer(TypeScriptElement): + """A class representing a TypeScript type container.""" + + name: str + + @abstractmethod + def write(self) -> str: + """Write a typescript value corresponding to the given typescript container. + + Returns: + A typescript string + """ + raise NotImplementedError("") + + +@dataclass(unsafe_hash=True) +class TypeScriptIntersection(TypeScriptElement): + """A class representing a TypeScript intersection type.""" + + types: tuple[TypeScriptElement, ...] + + def write(self) -> str: + """Write a typescript intersection value. + + Example: + { prop: string } & { another: number } + + Returns: + A typescript string + """ + return " & ".join(t.write() for t in self.types) + + +@dataclass(unsafe_hash=True) +class TypeScriptUnion(TypeScriptElement): + """A class representing a TypeScript union type.""" + + types: tuple[TypeScriptElement, ...] + + def write(self) -> str: + """Write a typescript union value. + + Example: + string | number + + Returns: + A typescript string + """ + return " | ".join(sorted(t.write() for t in self.types)) + + +@dataclass(unsafe_hash=True) +class TypeScriptPrimitive(TypeScriptElement): + """A class representing a TypeScript primitive type.""" + + type: Literal[ + "string", "number", "boolean", "any", "null", "undefined", "symbol", "Record<string, unknown>", "unknown[]" + ] + + def write(self) -> str: + """Write a typescript primitive type. + + Example: + null + + Returns: + A typescript string + """ + return self.type + + +@dataclass(unsafe_hash=True) +class TypeScriptLiteral(TypeScriptElement): + """A class representing a TypeScript literal type.""" + + value: str | int | float | bool | None + + def write(self) -> str: + """Write a typescript literal type. + + Example: + "someValue" + + Returns: + A typescript string + """ + return _as_string(self.value) + + +@dataclass(unsafe_hash=True) +class TypeScriptArray(TypeScriptElement): + """A class representing a TypeScript array type.""" + + item_type: TypeScriptElement + + def write(self) -> str: + """Write a typescript array type. + + Example: + number[] + + Returns: + A typescript string + """ + value = ( + f"({self.item_type.write()})" + if isinstance(self.item_type, (TypeScriptUnion, TypeScriptIntersection)) + else self.item_type.write() + ) + return f"{value}[]" + + +@dataclass(unsafe_hash=True) +class TypeScriptProperty(TypeScriptElement): + """A class representing a TypeScript interface property.""" + + required: bool + key: str + value: TypeScriptElement + + def write(self) -> str: + """Write a typescript property. This class is used exclusively inside interfaces. + + Example: + key: string; + optional?: number; + + Returns: + A typescript string + """ + return f"{self.key}{':' if self.required else '?:'} {self.value.write()};" + + +@dataclass(unsafe_hash=True) +class TypeScriptAnonymousInterface(TypeScriptElement): + """A class representing a TypeScript anonymous interface.""" + + properties: tuple[TypeScriptProperty, ...] + + def write(self) -> str: + """Write a typescript interface object, without a name. + + Example: + { + key: string; + optional?: number; + } + + Returns: + A typescript string + """ + props = "\t" + "\n\t".join([prop.write() for prop in sorted(self.properties, key=lambda prop: prop.key)]) + return f"{{\n{props}\n}}" + + +@dataclass(unsafe_hash=True) +class TypeScriptInterface(TypeScriptContainer): + """A class representing a TypeScript interface.""" + + name: str + properties: tuple[TypeScriptProperty, ...] + + def write(self) -> str: + """Write a typescript interface. + + Example: + export interface MyInterface { + key: string; + optional?: number; + }; + + Returns: + A typescript string + """ + interface = TypeScriptAnonymousInterface(properties=self.properties) + return f"export interface {self.name} {interface.write()};" + + +@dataclass(unsafe_hash=True) +class TypeScriptEnum(TypeScriptContainer): + """A class representing a TypeScript enum.""" + + name: str + values: tuple[tuple[str, str], ...] | tuple[tuple[str, int | float], ...] + + def write(self) -> str: + """Write a typescript enum. + + Example: + export enum MyEnum { + DOG = "canine", + CAT = "feline", + }; + + Returns: + A typescript string + """ + members = "\t" + "\n\t".join( + [f"{key} = {_as_string(value)}," for key, value in sorted(self.values, key=lambda member: member[0])] + ) + return f"export enum {self.name} {{\n{members}\n}};" + + +@dataclass(unsafe_hash=True) +class TypeScriptType(TypeScriptContainer): + """A class representing a TypeScript type.""" + + name: str + value: TypeScriptElement + + def write(self) -> str: + """Write a typescript type. + + Example: + export type MyType = number | "42"; + + Returns: + A typescript string + """ + return f"export type {self.name} = {self.value.write()};" + + +@dataclass(unsafe_hash=True) +class TypeScriptConst(TypeScriptContainer): + """A class representing a TypeScript const.""" + + name: str + value: TypeScriptPrimitive | TypeScriptLiteral + + def write(self) -> str: + """Write a typescript const. + + Example: + export const MyConst: number; + + Returns: + A typescript string + """ + return f"export const {self.name}: {self.value.write()};" + + +@dataclass(unsafe_hash=True) +class TypeScriptNamespace(TypeScriptContainer): + """A class representing a TypeScript namespace.""" + + name: str + values: tuple[TypeScriptContainer, ...] + + def write(self) -> str: + """Write a typescript namespace. + + Example: + export MyNamespace { + export const MyConst: number; + } + + Returns: + A typescript string + """ + members = "\t" + "\n\n\t".join([value.write() for value in sorted(self.values, key=lambda el: el.name)]) + return f"export namespace {self.name} {{\n{members}\n}};" diff --git a/venv/lib/python3.11/site-packages/litestar/_openapi/utils.py b/venv/lib/python3.11/site-packages/litestar/_openapi/utils.py new file mode 100644 index 0000000..b1950fa --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_openapi/utils.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +import re +from typing import TYPE_CHECKING + +from litestar.types.internal_types import PathParameterDefinition + +if TYPE_CHECKING: + from litestar.handlers.http_handlers import HTTPRouteHandler + from litestar.types import Method + + +__all__ = ("default_operation_id_creator", "SEPARATORS_CLEANUP_PATTERN") + +SEPARATORS_CLEANUP_PATTERN = re.compile(r"[!#$%&'*+\-.^_`|~:]+") + + +def default_operation_id_creator( + route_handler: HTTPRouteHandler, + http_method: Method, + path_components: list[str | PathParameterDefinition], +) -> str: + """Create a unique 'operationId' for an OpenAPI PathItem entry. + + Args: + route_handler: The HTTP Route Handler instance. + http_method: The HTTP method for the given PathItem. + path_components: A list of path components. + + Returns: + A camelCased operationId created from the handler function name, + http method and path components. + """ + + handler_namespace = ( + http_method.title() + route_handler.handler_name.title() + if len(route_handler.http_methods) > 1 + else route_handler.handler_name.title() + ) + + components_namespace = "" + for component in (c.name if isinstance(c, PathParameterDefinition) else c for c in path_components): + if component.title() not in components_namespace: + components_namespace += component.title() + + return SEPARATORS_CLEANUP_PATTERN.sub("", components_namespace + handler_namespace) |