diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/middleware/exceptions')
10 files changed, 709 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/__init__.py b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/__init__.py new file mode 100644 index 0000000..5328adf --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/__init__.py @@ -0,0 +1,3 @@ +from litestar.middleware.exceptions.middleware import ExceptionHandlerMiddleware + +__all__ = ("ExceptionHandlerMiddleware",) diff --git a/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..c443e00 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/__pycache__/_debug_response.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/__pycache__/_debug_response.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..b41fc85 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/__pycache__/_debug_response.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/__pycache__/middleware.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/__pycache__/middleware.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..2259206 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/__pycache__/middleware.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/_debug_response.py b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/_debug_response.py new file mode 100644 index 0000000..99e8c87 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/_debug_response.py @@ -0,0 +1,210 @@ +from __future__ import annotations + +from html import escape +from inspect import getinnerframes +from pathlib import Path +from traceback import format_exception +from typing import TYPE_CHECKING, Any + +from litestar.enums import MediaType +from litestar.response import Response +from litestar.status_codes import HTTP_500_INTERNAL_SERVER_ERROR +from litestar.utils import get_name + +__all__ = ( + "create_debug_response", + "create_exception_html", + "create_frame_html", + "create_html_response_content", + "create_line_html", + "create_plain_text_response_content", + "get_symbol_name", +) + + +if TYPE_CHECKING: + from inspect import FrameInfo + + from litestar.connection import Request + from litestar.types import TypeEncodersMap + +tpl_dir = Path(__file__).parent / "templates" + + +def get_symbol_name(frame: FrameInfo) -> str: + """Return full name of the function that is being executed by the given frame. + + Args: + frame: An instance of [FrameInfo](https://docs.python.org/3/library/inspect.html#inspect.FrameInfo). + + Notes: + - class detection assumes standard names (self and cls) of params. + - if current class name can not be determined only function (method) name will be returned. + - we can not distinguish static methods from ordinary functions at the moment. + + Returns: + A string containing full function name. + """ + + locals_dict = frame.frame.f_locals + # this piece assumes that the code uses standard names "self" and "cls" + # in instance and class methods + instance_or_cls = inst if (inst := locals_dict.get("self")) is not None else locals_dict.get("cls") + + classname = f"{get_name(instance_or_cls)}." if instance_or_cls is not None else "" + + return f"{classname}{frame.function}" + + +def create_line_html( + line: str, + line_no: int, + frame_index: int, + idx: int, +) -> str: + """Produce HTML representation of a line including real line number in the source code. + + Args: + line: A string representing the current line. + line_no: The line number associated with the executed line. + frame_index: Index of the executed line in the code context. + idx: Index of the current line in the code context. + + Returns: + A string containing HTML representation of the given line. + """ + template = '<tr class="{line_class}"><td class="line_no">{line_no}</td><td class="code_line">{line}</td></tr>' + data = { + # line_no - frame_index produces actual line number of the very first line in the frame code context. + # so adding index (aka relative number) of a line in the code context we can calculate its actual number in the source file, + "line_no": line_no - frame_index + idx, + "line": escape(line).replace(" ", " "), + "line_class": "executed-line" if idx == frame_index else "", + } + return template.format(**data) + + +def create_frame_html(frame: FrameInfo, collapsed: bool) -> str: + """Produce HTML representation of the given frame object including filename containing source code and name of the + function being executed. + + Args: + frame: An instance of [FrameInfo](https://docs.python.org/3/library/inspect.html#inspect.FrameInfo). + collapsed: Flag controlling whether frame should be collapsed on the page load. + + Returns: + A string containing HTML representation of the execution frame. + """ + frame_tpl = (tpl_dir / "frame.html").read_text() + + code_lines: list[str] = [ + create_line_html(line, frame.lineno, frame.index or 0, idx) for idx, line in enumerate(frame.code_context or []) + ] + data = { + "file": escape(frame.filename), + "line": frame.lineno, + "symbol_name": escape(get_symbol_name(frame)), + "code": "".join(code_lines), + "frame_class": "collapsed" if collapsed else "", + } + return frame_tpl.format(**data) + + +def create_exception_html(exc: BaseException, line_limit: int) -> str: + """Produce HTML representation of the exception frames. + + Args: + exc: An Exception instance to generate. + line_limit: Number of lines of code context to return, which are centered around the executed line. + + Returns: + A string containing HTML representation of the execution frames related to the exception. + """ + frames = getinnerframes(exc.__traceback__, line_limit) if exc.__traceback__ else [] + result = [create_frame_html(frame=frame, collapsed=idx > 0) for idx, frame in enumerate(reversed(frames))] + return "".join(result) + + +def create_html_response_content(exc: Exception, request: Request, line_limit: int = 15) -> str: + """Given an exception, produces its traceback in HTML. + + Args: + exc: An Exception instance to render debug response from. + request: A :class:`Request <litestar.connection.Request>` instance. + line_limit: Number of lines of code context to return, which are centered around the executed line. + + Returns: + A string containing HTML page with exception traceback. + """ + exception_data: list[str] = [create_exception_html(exc, line_limit)] + cause = exc.__cause__ + while cause: + cause_data = create_exception_html(cause, line_limit) + cause_header = '<h4 class="cause-header">The above exception was caused by</h4>' + cause_error_description = f"<h3><span>{escape(str(cause))}</span></h3>" + cause_error = f"<h4><span>{escape(cause.__class__.__name__)}</span></h4>" + exception_data.append( + f'<div class="cause-wrapper">{cause_header}{cause_error}{cause_error_description}{cause_data}</div>' + ) + cause = cause.__cause__ + + scripts = (tpl_dir / "scripts.js").read_text() + styles = (tpl_dir / "styles.css").read_text() + body_tpl = (tpl_dir / "body.html").read_text() + return body_tpl.format( + scripts=scripts, + styles=styles, + error=f"<span>{escape(exc.__class__.__name__)}</span> on {request.method} {escape(request.url.path)}", + error_description=escape(str(exc)), + exception_data="".join(exception_data), + ) + + +def create_plain_text_response_content(exc: Exception) -> str: + """Given an exception, produces its traceback in plain text. + + Args: + exc: An Exception instance to render debug response from. + + Returns: + A string containing exception traceback. + """ + return "".join(format_exception(type(exc), value=exc, tb=exc.__traceback__)) + + +def create_debug_response(request: Request, exc: Exception) -> Response: + """Create debug response either in plain text or HTML depending on client capabilities. + + Args: + request: A :class:`Request <litestar.connection.Request>` instance. + exc: An Exception instance to render debug response from. + + Returns: + A response with a rendered exception traceback. + """ + if MediaType.HTML in request.headers.get("accept", ""): + content: Any = create_html_response_content(exc=exc, request=request) + media_type = MediaType.HTML + elif MediaType.JSON in request.headers.get("accept", ""): + content = {"details": create_plain_text_response_content(exc), "status_code": HTTP_500_INTERNAL_SERVER_ERROR} + media_type = MediaType.JSON + else: + content = create_plain_text_response_content(exc) + media_type = MediaType.TEXT + + return Response( + content=content, + media_type=media_type, + status_code=HTTP_500_INTERNAL_SERVER_ERROR, + type_encoders=_get_type_encoders_for_request(request), + ) + + +def _get_type_encoders_for_request(request: Request) -> TypeEncodersMap | None: + try: + return request.route_handler.resolve_type_encoders() + # we might be in a 404, or before we could resolve the handler, so this + # could potentially error out. In this case we fall back on the application + # type encoders + except (KeyError, AttributeError): + return request.app.type_encoders diff --git a/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/middleware.py b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/middleware.py new file mode 100644 index 0000000..f3ff157 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/middleware.py @@ -0,0 +1,316 @@ +from __future__ import annotations + +import pdb # noqa: T100 +from dataclasses import asdict, dataclass, field +from inspect import getmro +from sys import exc_info +from traceback import format_exception +from typing import TYPE_CHECKING, Any, Type, cast + +from litestar.datastructures import Headers +from litestar.enums import MediaType, ScopeType +from litestar.exceptions import HTTPException, LitestarException, WebSocketException +from litestar.middleware.cors import CORSMiddleware +from litestar.middleware.exceptions._debug_response import _get_type_encoders_for_request, create_debug_response +from litestar.serialization import encode_json +from litestar.status_codes import HTTP_500_INTERNAL_SERVER_ERROR +from litestar.utils.deprecation import warn_deprecation + +__all__ = ("ExceptionHandlerMiddleware", "ExceptionResponseContent", "create_exception_response") + + +if TYPE_CHECKING: + from starlette.exceptions import HTTPException as StarletteHTTPException + + from litestar import Response + from litestar.app import Litestar + from litestar.connection import Request + from litestar.logging import BaseLoggingConfig + from litestar.types import ( + ASGIApp, + ExceptionHandler, + ExceptionHandlersMap, + Logger, + Receive, + Scope, + Send, + ) + from litestar.types.asgi_types import WebSocketCloseEvent + + +def get_exception_handler(exception_handlers: ExceptionHandlersMap, exc: Exception) -> ExceptionHandler | None: + """Given a dictionary that maps exceptions and status codes to handler functions, and an exception, returns the + appropriate handler if existing. + + Status codes are given preference over exception type. + + If no status code match exists, each class in the MRO of the exception type is checked and + the first matching handler is returned. + + Finally, if a ``500`` handler is registered, it will be returned for any exception that isn't a + subclass of :class:`HTTPException <litestar.exceptions.HTTPException>`. + + Args: + exception_handlers: Mapping of status codes and exception types to handlers. + exc: Exception Instance to be resolved to a handler. + + Returns: + Optional exception handler callable. + """ + if not exception_handlers: + return None + + default_handler: ExceptionHandler | None = None + if isinstance(exc, HTTPException): + if exception_handler := exception_handlers.get(exc.status_code): + return exception_handler + else: + default_handler = exception_handlers.get(HTTP_500_INTERNAL_SERVER_ERROR) + + return next( + (exception_handlers[cast("Type[Exception]", cls)] for cls in getmro(type(exc)) if cls in exception_handlers), + default_handler, + ) + + +@dataclass +class ExceptionResponseContent: + """Represent the contents of an exception-response.""" + + status_code: int + """Exception status code.""" + detail: str + """Exception details or message.""" + media_type: MediaType | str + """Media type of the response.""" + headers: dict[str, str] | None = field(default=None) + """Headers to attach to the response.""" + extra: dict[str, Any] | list[Any] | None = field(default=None) + """An extra mapping to attach to the exception.""" + + def to_response(self, request: Request | None = None) -> Response: + """Create a response from the model attributes. + + Returns: + A response instance. + """ + from litestar.response import Response + + content: Any = {k: v for k, v in asdict(self).items() if k not in ("headers", "media_type") and v is not None} + + if self.media_type != MediaType.JSON: + content = encode_json(content) + + return Response( + content=content, + headers=self.headers, + status_code=self.status_code, + media_type=self.media_type, + type_encoders=_get_type_encoders_for_request(request) if request is not None else None, + ) + + +def _starlette_exception_handler(request: Request[Any, Any, Any], exc: StarletteHTTPException) -> Response: + return create_exception_response( + request=request, + exc=HTTPException( + detail=exc.detail, + status_code=exc.status_code, + headers=exc.headers, + ), + ) + + +def create_exception_response(request: Request[Any, Any, Any], exc: Exception) -> Response: + """Construct a response from an exception. + + Notes: + - For instances of :class:`HTTPException <litestar.exceptions.HTTPException>` or other exception classes that have a + ``status_code`` attribute (e.g. Starlette exceptions), the status code is drawn from the exception, otherwise + response status is ``HTTP_500_INTERNAL_SERVER_ERROR``. + + Args: + request: The request that triggered the exception. + exc: An exception. + + Returns: + Response: HTTP response constructed from exception details. + """ + headers: dict[str, Any] | None + extra: dict[str, Any] | list | None + + if isinstance(exc, HTTPException): + status_code = exc.status_code + headers = exc.headers + extra = exc.extra + else: + status_code = HTTP_500_INTERNAL_SERVER_ERROR + headers = None + extra = None + + detail = ( + exc.detail + if isinstance(exc, LitestarException) and status_code != HTTP_500_INTERNAL_SERVER_ERROR + else "Internal Server Error" + ) + + try: + media_type = request.route_handler.media_type + except (KeyError, AttributeError): + media_type = MediaType.JSON + + content = ExceptionResponseContent( + status_code=status_code, + detail=detail, + headers=headers, + extra=extra, + media_type=media_type, + ) + return content.to_response(request=request) + + +class ExceptionHandlerMiddleware: + """Middleware used to wrap an ASGIApp inside a try catch block and handle any exceptions raised. + + This used in multiple layers of Litestar. + """ + + def __init__(self, app: ASGIApp, debug: bool | None, exception_handlers: ExceptionHandlersMap) -> None: + """Initialize ``ExceptionHandlerMiddleware``. + + Args: + app: The ``next`` ASGI app to call. + debug: Whether ``debug`` mode is enabled. Deprecated. Debug mode will be inferred from the request scope + exception_handlers: A dictionary mapping status codes and/or exception types to handler functions. + + .. deprecated:: 2.0.0 + The ``debug`` parameter is deprecated. It will be inferred from the request scope + """ + self.app = app + self.exception_handlers = exception_handlers + self.debug = debug + if debug is not None: + warn_deprecation( + "2.0.0", + deprecated_name="debug", + kind="parameter", + info="Debug mode will be inferred from the request scope", + ) + + self._get_debug = self._get_debug_scope if debug is None else lambda *a: debug + + @staticmethod + def _get_debug_scope(scope: Scope) -> bool: + return scope["app"].debug + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + """ASGI-callable. + + Args: + scope: The ASGI connection scope. + receive: The ASGI receive function. + send: The ASGI send function. + + Returns: + None + """ + try: + await self.app(scope, receive, send) + except Exception as e: # noqa: BLE001 + litestar_app = scope["app"] + + if litestar_app.logging_config and (logger := litestar_app.logger): + self.handle_exception_logging(logger=logger, logging_config=litestar_app.logging_config, scope=scope) + + for hook in litestar_app.after_exception: + await hook(e, scope) + + if litestar_app.pdb_on_exception: + pdb.post_mortem() + + if scope["type"] == ScopeType.HTTP: + await self.handle_request_exception( + litestar_app=litestar_app, scope=scope, receive=receive, send=send, exc=e + ) + else: + await self.handle_websocket_exception(send=send, exc=e) + + async def handle_request_exception( + self, litestar_app: Litestar, scope: Scope, receive: Receive, send: Send, exc: Exception + ) -> None: + """Handle exception raised inside 'http' scope routes. + + Args: + litestar_app: The litestar app instance. + scope: The ASGI connection scope. + receive: The ASGI receive function. + send: The ASGI send function. + exc: The caught exception. + + Returns: + None. + """ + + headers = Headers.from_scope(scope=scope) + if litestar_app.cors_config and (origin := headers.get("origin")): + cors_middleware = CORSMiddleware(app=self.app, config=litestar_app.cors_config) + send = cors_middleware.send_wrapper(send=send, origin=origin, has_cookie="cookie" in headers) + + exception_handler = get_exception_handler(self.exception_handlers, exc) or self.default_http_exception_handler + request: Request[Any, Any, Any] = litestar_app.request_class(scope=scope, receive=receive, send=send) + response = exception_handler(request, exc) + await response.to_asgi_response(app=None, request=request)(scope=scope, receive=receive, send=send) + + @staticmethod + async def handle_websocket_exception(send: Send, exc: Exception) -> None: + """Handle exception raised inside 'websocket' scope routes. + + Args: + send: The ASGI send function. + exc: The caught exception. + + Returns: + None. + """ + code = 4000 + HTTP_500_INTERNAL_SERVER_ERROR + reason = "Internal Server Error" + if isinstance(exc, WebSocketException): + code = exc.code + reason = exc.detail + elif isinstance(exc, LitestarException): + reason = exc.detail + + event: WebSocketCloseEvent = {"type": "websocket.close", "code": code, "reason": reason} + await send(event) + + def default_http_exception_handler(self, request: Request, exc: Exception) -> Response[Any]: + """Handle an HTTP exception by returning the appropriate response. + + Args: + request: An HTTP Request instance. + exc: The caught exception. + + Returns: + An HTTP response. + """ + status_code = exc.status_code if isinstance(exc, HTTPException) else HTTP_500_INTERNAL_SERVER_ERROR + if status_code == HTTP_500_INTERNAL_SERVER_ERROR and self._get_debug_scope(request.scope): + return create_debug_response(request=request, exc=exc) + return create_exception_response(request=request, exc=exc) + + def handle_exception_logging(self, logger: Logger, logging_config: BaseLoggingConfig, scope: Scope) -> None: + """Handle logging - if the litestar app has a logging config in place. + + Args: + logger: A logger instance. + logging_config: Logging Config instance. + scope: The ASGI connection scope. + + Returns: + None + """ + if ( + logging_config.log_exceptions == "always" + or (logging_config.log_exceptions == "debug" and self._get_debug_scope(scope)) + ) and logging_config.exception_logging_handler: + logging_config.exception_logging_handler(logger, scope, format_exception(*exc_info())) diff --git a/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/templates/body.html b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/templates/body.html new file mode 100644 index 0000000..1c6705c --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/templates/body.html @@ -0,0 +1,20 @@ +<!doctype html> + +<html lang="en"> + <head> + <meta charset="utf-8" /> + <style type="text/css"> + {styles} + </style> + <title>Litestar exception page</title> + </head> + <body> + <h4>{error}</h4> + <h3><span>{error_description}</span></h3> + {exception_data} + <script type="text/javascript"> + // prettier-ignore + {scripts} // NOSONAR + </script> + </body> +</html> diff --git a/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/templates/frame.html b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/templates/frame.html new file mode 100644 index 0000000..2ead8dd --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/templates/frame.html @@ -0,0 +1,12 @@ +<div class="frame {frame_class}"> + <div class="frame-name"> + <span class="expander">▼</span> + <span class="breakable">{file}</span> in <span>{symbol_name}</span> at line + <span>{line}</span> + </div> + <div class="code-snippet-wrapper"> + <table role="presentation" class="code-snippet"> + {code} + </table> + </div> +</div> diff --git a/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/templates/scripts.js b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/templates/scripts.js new file mode 100644 index 0000000..014a256 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/templates/scripts.js @@ -0,0 +1,27 @@ +const expanders = document.querySelectorAll(".frame .expander"); + +for (const expander of expanders) { + expander.addEventListener("click", (evt) => { + const currentSnippet = evt.currentTarget.closest(".frame"); + const snippetWrapper = currentSnippet.querySelector( + ".code-snippet-wrapper", + ); + if (currentSnippet.classList.contains("collapsed")) { + snippetWrapper.style.height = `${snippetWrapper.scrollHeight}px`; + currentSnippet.classList.remove("collapsed"); + } else { + currentSnippet.classList.add("collapsed"); + snippetWrapper.style.height = "0px"; + } + }); +} + +// init height for non-collapsed code snippets so animation will be show +// their first collapse +const nonCollapsedSnippets = document.querySelectorAll( + ".frame:not(.collapsed) .code-snippet-wrapper", +); + +for (const snippet of nonCollapsedSnippets) { + snippet.style.height = `${snippet.scrollHeight}px`; +} diff --git a/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/templates/styles.css b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/templates/styles.css new file mode 100644 index 0000000..6b98b89 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/middleware/exceptions/templates/styles.css @@ -0,0 +1,121 @@ +:root { + --code-background-color: #f5f5f5; + --code-background-color-dark: #b8b8b8; + --code-color: #1d2534; + --code-color-light: #546996; + --code-font-family: Consolas, monospace; + --header-color: #303b55; + --warn-color: hsl(356, 92%, 60%); + --text-font-family: -apple-system, BlinkMacSystemFont, Helvetica, Arial, + sans-serif; +} + +html { + font-size: 20px; +} + +body { + font-family: var(--text-font-family); + font-size: 0.8rem; +} + +h1, +h2, +h3, +h4 { + color: var(--header-color); +} + +h4 { + font-size: 1rem; +} + +h3 { + font-size: 1.35rem; +} + +h2 { + font-size: 1.83rem; +} + +h3 span, +h4 span { + color: var(--warn-color); +} + +.frame { + background-color: var(--code-background-color); + border-radius: 0.2rem; + margin-bottom: 20px; +} + +.frame-name { + border-bottom: 1px solid var(--code-color-light); + padding: 10px 16px; +} + +.frame.collapsed .frame-name { + border-bottom: none; +} + +.frame-name span { + font-weight: 700; +} + +span.expander { + display: inline-block; + margin-right: 10px; + cursor: pointer; + transition: transform 0.33s ease-in-out; +} + +.frame.collapsed span.expander { + transform: rotate(-90deg); +} + +.frame-name span.breakable { + word-break: break-all; +} + +.code-snippet-wrapper { + height: auto; + overflow-y: hidden; + transition: height 0.33s ease-in-out; +} + +.frame.collapsed .code-snippet-wrapper { + height: 0; +} + +.code-snippet { + margin: 10px 16px; + border-spacing: 0 0; + color: var(--code-color); + font-family: var(--code-font-family); + font-size: 0.68rem; +} + +.code-snippet td { + padding: 0; + text-align: left; +} + +td.line_no { + color: var(--code-color-light); + min-width: 4ch; + padding-right: 20px; + text-align: right; + user-select: none; +} + +td.code_line { + width: 99%; +} + +tr.executed-line { + background-color: var(--code-background-color-dark); +} + +.cause-wrapper { + margin-top: 50px; +} |