diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie')
10 files changed, 529 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__init__.py b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__init__.py new file mode 100644 index 0000000..948e394 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__init__.py @@ -0,0 +1,6 @@ +from litestar._asgi.routing_trie.mapping import add_route_to_trie +from litestar._asgi.routing_trie.traversal import parse_path_to_route +from litestar._asgi.routing_trie.types import RouteTrieNode +from litestar._asgi.routing_trie.validate import validate_node + +__all__ = ("RouteTrieNode", "add_route_to_trie", "parse_path_to_route", "validate_node") diff --git a/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..f8658f9 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__pycache__/mapping.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__pycache__/mapping.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..73323bf --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__pycache__/mapping.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__pycache__/traversal.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__pycache__/traversal.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..9ed2983 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__pycache__/traversal.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__pycache__/types.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__pycache__/types.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..5247c5b --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__pycache__/types.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__pycache__/validate.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__pycache__/validate.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..57fdcdd --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/__pycache__/validate.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/mapping.py b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/mapping.py new file mode 100644 index 0000000..7a56b97 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/mapping.py @@ -0,0 +1,221 @@ +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING, Any, cast + +from litestar._asgi.routing_trie.types import ( + ASGIHandlerTuple, + PathParameterSentinel, + create_node, +) +from litestar._asgi.utils import wrap_in_exception_handler +from litestar.types.internal_types import PathParameterDefinition + +__all__ = ("add_mount_route", "add_route_to_trie", "build_route_middleware_stack", "configure_node") + + +if TYPE_CHECKING: + from litestar._asgi.routing_trie.types import RouteTrieNode + from litestar.app import Litestar + from litestar.routes import ASGIRoute, HTTPRoute, WebSocketRoute + from litestar.types import ASGIApp, RouteHandlerType + + +def add_mount_route( + current_node: RouteTrieNode, + mount_routes: dict[str, RouteTrieNode], + root_node: RouteTrieNode, + route: ASGIRoute, +) -> RouteTrieNode: + """Add a node for a mount route. + + Args: + current_node: The current trie node that is being mapped. + mount_routes: A dictionary mapping static routes to trie nodes. + root_node: The root trie node. + route: The route that is being added. + + Returns: + A trie node. + """ + + # we need to ensure that we can traverse the map both through the full path key, e.g. "/my-route/sub-path" and + # via the components keys ["my-route, "sub-path"] + if route.path not in current_node.children: + root_node = current_node + for component in route.path_components: + if component not in current_node.children: + current_node.children[component] = create_node() # type: ignore[index] + current_node = current_node.children[component] # type: ignore[index] + + current_node.is_mount = True + current_node.is_static = route.route_handler.is_static + + if route.path != "/": + mount_routes[route.path] = root_node.children[route.path] = current_node + else: + mount_routes[route.path] = current_node + + return current_node + + +def add_route_to_trie( + app: Litestar, + mount_routes: dict[str, RouteTrieNode], + plain_routes: set[str], + root_node: RouteTrieNode, + route: HTTPRoute | WebSocketRoute | ASGIRoute, +) -> RouteTrieNode: + """Add a new route path (e.g. '/foo/bar/{param:int}') into the route_map tree. + + Inserts non-parameter paths ('plain routes') off the tree's root + node. For paths containing parameters, splits the path on '/' and + nests each path segment under the previous segment's node (see + prefix tree / trie). + + Args: + app: The Litestar app instance. + mount_routes: A dictionary mapping static routes to trie nodes. + plain_routes: A set of routes that do not have path parameters. + root_node: The root trie node. + route: The route that is being added. + + Returns: + A RouteTrieNode instance. + """ + current_node = root_node + + has_path_parameters = bool(route.path_parameters) + + if (route_handler := getattr(route, "route_handler", None)) and getattr(route_handler, "is_mount", False): + current_node = add_mount_route( + current_node=current_node, + mount_routes=mount_routes, + root_node=root_node, + route=cast("ASGIRoute", route), + ) + + elif not has_path_parameters: + plain_routes.add(route.path) + if route.path not in root_node.children: + current_node.children[route.path] = create_node() + current_node = root_node.children[route.path] + + else: + for component in route.path_components: + if isinstance(component, PathParameterDefinition): + current_node.is_path_param_node = True + next_node_key: type[PathParameterSentinel] | str = PathParameterSentinel + + else: + next_node_key = component + + if next_node_key not in current_node.children: + current_node.children[next_node_key] = create_node() + + current_node.child_keys = set(current_node.children.keys()) + current_node = current_node.children[next_node_key] + + if isinstance(component, PathParameterDefinition) and component.type is Path: + current_node.is_path_type = True + + configure_node(route=route, app=app, node=current_node) + return current_node + + +def configure_node( + app: Litestar, + route: HTTPRoute | WebSocketRoute | ASGIRoute, + node: RouteTrieNode, +) -> None: + """Set required attributes and route handlers on route_map tree node. + + Args: + app: The Litestar app instance. + route: The route that is being added. + node: The trie node being configured. + + Returns: + None + """ + from litestar.routes import HTTPRoute, WebSocketRoute + + if not node.path_parameters: + node.path_parameters = {} + + if isinstance(route, HTTPRoute): + for method, handler_mapping in route.route_handler_map.items(): + handler, _ = handler_mapping + node.asgi_handlers[method] = ASGIHandlerTuple( + asgi_app=build_route_middleware_stack(app=app, route=route, route_handler=handler), + handler=handler, + ) + node.path_parameters[method] = route.path_parameters + + elif isinstance(route, WebSocketRoute): + node.asgi_handlers["websocket"] = ASGIHandlerTuple( + asgi_app=build_route_middleware_stack(app=app, route=route, route_handler=route.route_handler), + handler=route.route_handler, + ) + node.path_parameters["websocket"] = route.path_parameters + + else: + node.asgi_handlers["asgi"] = ASGIHandlerTuple( + asgi_app=build_route_middleware_stack(app=app, route=route, route_handler=route.route_handler), + handler=route.route_handler, + ) + node.path_parameters["asgi"] = route.path_parameters + node.is_asgi = True + + +def build_route_middleware_stack( + app: Litestar, + route: HTTPRoute | WebSocketRoute | ASGIRoute, + route_handler: RouteHandlerType, +) -> ASGIApp: + """Construct a middleware stack that serves as the point of entry for each route. + + Args: + app: The Litestar app instance. + route: The route that is being added. + route_handler: The route handler that is being wrapped. + + Returns: + An ASGIApp that is composed of a "stack" of middlewares. + """ + from litestar.middleware.allowed_hosts import AllowedHostsMiddleware + from litestar.middleware.compression import CompressionMiddleware + from litestar.middleware.csrf import CSRFMiddleware + from litestar.middleware.response_cache import ResponseCacheMiddleware + from litestar.routes import HTTPRoute + + # we wrap the route.handle method in the ExceptionHandlerMiddleware + asgi_handler = wrap_in_exception_handler( + app=route.handle, # type: ignore[arg-type] + exception_handlers=route_handler.resolve_exception_handlers(), + ) + + if app.csrf_config: + asgi_handler = CSRFMiddleware(app=asgi_handler, config=app.csrf_config) + + if app.compression_config: + asgi_handler = CompressionMiddleware(app=asgi_handler, config=app.compression_config) + + if isinstance(route, HTTPRoute) and any(r.cache for r in route.route_handlers): + asgi_handler = ResponseCacheMiddleware(app=asgi_handler, config=app.response_cache_config) + + if app.allowed_hosts: + asgi_handler = AllowedHostsMiddleware(app=asgi_handler, config=app.allowed_hosts) + + for middleware in route_handler.resolve_middleware(): + if hasattr(middleware, "__iter__"): + handler, kwargs = cast("tuple[Any, dict[str, Any]]", middleware) + asgi_handler = handler(app=asgi_handler, **kwargs) + else: + asgi_handler = middleware(app=asgi_handler) # type: ignore[call-arg] + + # we wrap the entire stack again in ExceptionHandlerMiddleware + return wrap_in_exception_handler( + app=cast("ASGIApp", asgi_handler), + exception_handlers=route_handler.resolve_exception_handlers(), + ) # pyright: ignore diff --git a/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/traversal.py b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/traversal.py new file mode 100644 index 0000000..b7788bd --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/traversal.py @@ -0,0 +1,170 @@ +from __future__ import annotations + +from functools import lru_cache +from typing import TYPE_CHECKING, Any, Pattern + +from litestar._asgi.routing_trie.types import PathParameterSentinel +from litestar.exceptions import MethodNotAllowedException, NotFoundException +from litestar.utils import normalize_path + +__all__ = ("parse_node_handlers", "parse_path_params", "parse_path_to_route", "traverse_route_map") + + +if TYPE_CHECKING: + from litestar._asgi.routing_trie.types import ASGIHandlerTuple, RouteTrieNode + from litestar.types import ASGIApp, Method, RouteHandlerType + from litestar.types.internal_types import PathParameterDefinition + + +def traverse_route_map( + root_node: RouteTrieNode, + path: str, +) -> tuple[RouteTrieNode, list[str], str]: + """Traverses the application route mapping and retrieves the correct node for the request url. + + Args: + root_node: The root trie node. + path: The request's path. + + Raises: + NotFoundException: If no correlating node is found. + + Returns: + A tuple containing the target RouteMapNode and a list containing all path parameter values. + """ + current_node = root_node + path_params: list[str] = [] + path_components = [p for p in path.split("/") if p] + + for i, component in enumerate(path_components): + if component in current_node.child_keys: + current_node = current_node.children[component] + continue + + if current_node.is_path_param_node: + current_node = current_node.children[PathParameterSentinel] + + if current_node.is_path_type: + path_params.append(normalize_path("/".join(path_components[i:]))) + break + + path_params.append(component) + continue + + raise NotFoundException() + + if not current_node.asgi_handlers: + raise NotFoundException() + + return current_node, path_params, path + + +def parse_node_handlers( + node: RouteTrieNode, + method: Method | None, +) -> ASGIHandlerTuple: + """Retrieve the handler tuple from the node. + + Args: + node: The trie node to parse. + method: The scope's method. + + Raises: + KeyError: If no matching method is found. + + Returns: + An ASGI Handler tuple. + """ + + if node.is_asgi: + return node.asgi_handlers["asgi"] + if method: + return node.asgi_handlers[method] + return node.asgi_handlers["websocket"] + + +@lru_cache(1024) +def parse_path_params( + parameter_definitions: tuple[PathParameterDefinition, ...], path_param_values: tuple[str, ...] +) -> dict[str, Any]: + """Parse path parameters into a dictionary of values. + + Args: + parameter_definitions: The parameter definitions tuple from the route. + path_param_values: The string values extracted from the url + + Raises: + ValueError: If any of path parameters can not be parsed into a value. + + Returns: + A dictionary of parsed path parameters. + """ + return { + param_definition.name: param_definition.parser(value) if param_definition.parser else value + for param_definition, value in zip(parameter_definitions, path_param_values) + } + + +def parse_path_to_route( + method: Method | None, + mount_paths_regex: Pattern | None, + mount_routes: dict[str, RouteTrieNode], + path: str, + plain_routes: set[str], + root_node: RouteTrieNode, +) -> tuple[ASGIApp, RouteHandlerType, str, dict[str, Any]]: + """Given a scope object, retrieve the asgi_handlers and is_mount boolean values from correct trie node. + + Args: + method: The scope's method, if any. + root_node: The root trie node. + path: The path to resolve scope instance. + plain_routes: The set of plain routes. + mount_routes: Mapping of mount routes to trie nodes. + mount_paths_regex: A compiled regex to match the mount routes. + + Raises: + MethodNotAllowedException: if no matching method is found. + NotFoundException: If no correlating node is found or if path params can not be parsed into values according to the node definition. + + Returns: + A tuple containing the stack of middlewares and the route handler that is wrapped by it. + """ + + try: + if path in plain_routes: + asgi_app, handler = parse_node_handlers(node=root_node.children[path], method=method) + return asgi_app, handler, path, {} + + if mount_paths_regex and (match := mount_paths_regex.search(path)): + mount_path = path[match.start() : match.end()] + mount_node = mount_routes[mount_path] + remaining_path = path[match.end() :] + # since we allow regular handlers under static paths, we must validate that the request does not match + # any such handler. + children = [sub_route for sub_route in mount_node.children or [] if sub_route != mount_path] + if not children or all(sub_route not in path for sub_route in children): # type: ignore[operator] + asgi_app, handler = parse_node_handlers(node=mount_node, method=method) + remaining_path = remaining_path or "/" + if not mount_node.is_static: + remaining_path = remaining_path if remaining_path.endswith("/") else f"{remaining_path}/" + return asgi_app, handler, remaining_path, {} + + node, path_parameters, path = traverse_route_map( + root_node=root_node, + path=path, + ) + asgi_app, handler = parse_node_handlers(node=node, method=method) + key = method or ("asgi" if node.is_asgi else "websocket") + parsed_path_parameters = parse_path_params(node.path_parameters[key], tuple(path_parameters)) + + return ( + asgi_app, + handler, + path, + parsed_path_parameters, + ) + except KeyError as e: + raise MethodNotAllowedException() from e + except ValueError as e: + raise NotFoundException() from e diff --git a/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/types.py b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/types.py new file mode 100644 index 0000000..d1fc368 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/types.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Literal, NamedTuple + +__all__ = ("ASGIHandlerTuple", "PathParameterSentinel", "RouteTrieNode", "create_node") + + +if TYPE_CHECKING: + from litestar.types import ASGIApp, Method, RouteHandlerType + from litestar.types.internal_types import PathParameterDefinition + + +class PathParameterSentinel: + """Sentinel class designating a path parameter.""" + + +class ASGIHandlerTuple(NamedTuple): + """Encapsulation of a route handler node.""" + + asgi_app: ASGIApp + """An ASGI stack, composed of a handler function and layers of middleware that wrap it.""" + handler: RouteHandlerType + """The route handler instance.""" + + +@dataclass(unsafe_hash=True) +class RouteTrieNode: + """A radix trie node.""" + + __slots__ = ( + "asgi_handlers", + "child_keys", + "children", + "is_asgi", + "is_mount", + "is_static", + "is_path_param_node", + "is_path_type", + "path_parameters", + ) + + asgi_handlers: dict[Method | Literal["websocket", "asgi"], ASGIHandlerTuple] + """A mapping of ASGI handlers stored on the node.""" + child_keys: set[str | type[PathParameterSentinel]] + """ + A set containing the child keys, same as the children dictionary - but as a set, which offers faster lookup. + """ + children: dict[str | type[PathParameterSentinel], RouteTrieNode] + """A dictionary mapping path components or using the PathParameterSentinel class to child nodes.""" + is_path_param_node: bool + """Designates the node as having a path parameter.""" + is_path_type: bool + """Designates the node as having a 'path' type path parameter.""" + is_asgi: bool + """Designate the node as having an `asgi` type handler.""" + is_mount: bool + """Designate the node as being a mount route.""" + is_static: bool + """Designate the node as being a static mount route.""" + path_parameters: dict[Method | Literal["websocket"] | Literal["asgi"], tuple[PathParameterDefinition, ...]] + """A list of tuples containing path parameter definitions. + + This is used for parsing extracted path parameter values. + """ + + +def create_node() -> RouteTrieNode: + """Create a RouteMapNode instance. + + Returns: + A route map node instance. + """ + + return RouteTrieNode( + asgi_handlers={}, + child_keys=set(), + children={}, + is_path_param_node=False, + is_asgi=False, + is_mount=False, + is_static=False, + is_path_type=False, + path_parameters={}, + ) diff --git a/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/validate.py b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/validate.py new file mode 100644 index 0000000..5c29fac --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/_asgi/routing_trie/validate.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +from itertools import chain +from typing import TYPE_CHECKING + +from litestar.exceptions import ImproperlyConfiguredException + +__all__ = ("validate_node",) + + +if TYPE_CHECKING: + from litestar._asgi.routing_trie.types import RouteTrieNode + + +def validate_node(node: RouteTrieNode) -> None: + """Recursively traverses the trie from the given node upwards. + + Args: + node: A trie node. + + Raises: + ImproperlyConfiguredException + + Returns: + None + """ + if node.is_asgi and bool(set(node.asgi_handlers).difference({"asgi"})): + raise ImproperlyConfiguredException("ASGI handlers must have a unique path not shared by other route handlers.") + + if ( + node.is_mount + and node.children + and any( + chain.from_iterable( + list(child.path_parameters.values()) + if isinstance(child.path_parameters, dict) + else child.path_parameters + for child in node.children.values() + ) + ) + ): + raise ImproperlyConfiguredException("Path parameters are not allowed under a static or mount route.") + + for child in node.children.values(): + if child is node: + continue + validate_node(node=child) |