diff options
| author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 | 
|---|---|---|
| committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 | 
| commit | 6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch) | |
| tree | b1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/litestar/security/session_auth | |
| parent | 4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff) | |
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/security/session_auth')
6 files changed, 266 insertions, 0 deletions
| diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/__init__.py b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__init__.py new file mode 100644 index 0000000..7c83991 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__init__.py @@ -0,0 +1,4 @@ +from litestar.security.session_auth.auth import SessionAuth +from litestar.security.session_auth.middleware import SessionAuthMiddleware + +__all__ = ("SessionAuth", "SessionAuthMiddleware") diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/__init__.cpython-311.pycBinary files differ new file mode 100644 index 0000000..95bf5c1 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/auth.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/auth.cpython-311.pycBinary files differ new file mode 100644 index 0000000..8d4aa6c --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/auth.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/middleware.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/middleware.cpython-311.pycBinary files differ new file mode 100644 index 0000000..27e4213 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/middleware.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/auth.py b/venv/lib/python3.11/site-packages/litestar/security/session_auth/auth.py new file mode 100644 index 0000000..7a5c542 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/session_auth/auth.py @@ -0,0 +1,137 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Iterable, Sequence, cast + +from litestar.middleware.base import DefineMiddleware +from litestar.middleware.session.base import BaseBackendConfig, BaseSessionBackendT +from litestar.openapi.spec import Components, SecurityRequirement, SecurityScheme +from litestar.security.base import AbstractSecurityConfig, UserType +from litestar.security.session_auth.middleware import MiddlewareWrapper, SessionAuthMiddleware + +__all__ = ("SessionAuth",) + +if TYPE_CHECKING: +    from litestar.connection import ASGIConnection +    from litestar.di import Provide +    from litestar.types import ControllerRouterHandler, Guard, Method, Scopes, SyncOrAsyncUnion, TypeEncodersMap + + +@dataclass +class SessionAuth(Generic[UserType, BaseSessionBackendT], AbstractSecurityConfig[UserType, Dict[str, Any]]): +    """Session Based Security Backend.""" + +    session_backend_config: BaseBackendConfig[BaseSessionBackendT]  # pyright: ignore +    """A session backend config.""" +    retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] +    """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. + +    Notes: +        - User and Auth can be any arbitrary values specified by the security backend. +        - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. +          Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. +        - The callable can be sync or async. If it is sync, it will be wrapped to support async. + +    """ + +    authentication_middleware_class: type[SessionAuthMiddleware] = field(default=SessionAuthMiddleware)  # pyright: ignore +    """The authentication middleware class to use. + +    Must inherit from :class:`SessionAuthMiddleware <litestar.security.session_auth.middleware.SessionAuthMiddleware>` +    """ + +    guards: Iterable[Guard] | None = field(default=None) +    """An iterable of guards to call for requests, providing authorization functionalities.""" +    exclude: str | list[str] | None = field(default=None) +    """A pattern or list of patterns to skip in the authentication middleware.""" +    exclude_opt_key: str = field(default="exclude_from_auth") +    """An identifier to use on routes to disable authentication and authorization checks for a particular route.""" +    exclude_http_methods: Sequence[Method] | None = field( +        default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"]) +    ) +    """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']""" +    scopes: Scopes | None = field(default=None) +    """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be +    processed.""" +    route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None) +    """An optional iterable of route handlers to register.""" +    dependencies: dict[str, Provide] | None = field(default=None) +    """An optional dictionary of dependency providers.""" + +    type_encoders: TypeEncodersMap | None = field(default=None) +    """A mapping of types to callables that transform them into types supported for serialization.""" + +    @property +    def middleware(self) -> DefineMiddleware: +        """Use this property to insert the config into a middleware list on one of the application layers. + +        Examples: +            .. code-block:: python + +                from typing import Any +                from os import urandom + +                from litestar import Litestar, Request, get +                from litestar_session import SessionAuth + + +                async def retrieve_user_from_session(session: dict[str, Any]) -> Any: +                    # implement logic here to retrieve a ``user`` datum given the session dictionary +                    ... + + +                session_auth_config = SessionAuth( +                    secret=urandom(16), retrieve_user_handler=retrieve_user_from_session +                ) + + +                @get("/") +                def my_handler(request: Request) -> None: ... + + +                app = Litestar(route_handlers=[my_handler], middleware=[session_auth_config.middleware]) + + +        Returns: +            An instance of DefineMiddleware including ``self`` as the config kwarg value. +        """ +        return DefineMiddleware(MiddlewareWrapper, config=self) + +    @property +    def session_backend(self) -> BaseSessionBackendT: +        """Create a session backend. + +        Returns: +            A subclass of :class:`BaseSessionBackend <litestar.middleware.session.base.BaseSessionBackend>` +        """ +        return self.session_backend_config._backend_class(config=self.session_backend_config)  # pyright: ignore + +    @property +    def openapi_components(self) -> Components: +        """Create OpenAPI documentation for the Session Authentication schema used. + +        Returns: +            An :class:`Components <litestar.openapi.spec.components.Components>` instance. +        """ +        return Components( +            security_schemes={ +                "sessionCookie": SecurityScheme( +                    type="apiKey", +                    name=self.session_backend_config.key, +                    security_scheme_in="cookie",  # pyright: ignore +                    description="Session cookie authentication.", +                ) +            } +        ) + +    @property +    def security_requirement(self) -> SecurityRequirement: +        """Return OpenAPI 3.1. + +        :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` for the auth +        backend. + +        Returns: +            An OpenAPI 3.1 :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` dictionary. +        """ +        return {"sessionCookie": []} diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/middleware.py b/venv/lib/python3.11/site-packages/litestar/security/session_auth/middleware.py new file mode 100644 index 0000000..bb3fce4 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/session_auth/middleware.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Sequence + +from litestar.exceptions import NotAuthorizedException +from litestar.middleware.authentication import ( +    AbstractAuthenticationMiddleware, +    AuthenticationResult, +) +from litestar.middleware.exceptions import ExceptionHandlerMiddleware +from litestar.types import Empty, Method, Scopes + +__all__ = ("MiddlewareWrapper", "SessionAuthMiddleware") + +if TYPE_CHECKING: +    from litestar.connection import ASGIConnection +    from litestar.security.session_auth.auth import SessionAuth +    from litestar.types import ASGIApp, Receive, Scope, Send + + +class MiddlewareWrapper: +    """Wrapper class that serves as the middleware entry point.""" + +    def __init__(self, app: ASGIApp, config: SessionAuth[Any, Any]) -> None: +        """Wrap the SessionAuthMiddleware inside ExceptionHandlerMiddleware, and it wraps this inside SessionMiddleware. +        This allows the auth middleware to raise exceptions and still have the response handled, while having the +        session cleared. + +        Args: +            app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack. +            config: An instance of SessionAuth. +        """ +        self.app = app +        self.config = config +        self.has_wrapped_middleware = False + +    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: +        """Handle creating a middleware stack and calling it. + +        Args: +            scope: The ASGI connection scope. +            receive: The ASGI receive function. +            send: The ASGI send function. + +        Returns: +            None +        """ +        if not self.has_wrapped_middleware: +            litestar_app = scope["app"] +            auth_middleware = self.config.authentication_middleware_class( +                app=self.app, +                exclude=self.config.exclude, +                exclude_http_methods=self.config.exclude_http_methods, +                exclude_opt_key=self.config.exclude_opt_key, +                scopes=self.config.scopes, +                retrieve_user_handler=self.config.retrieve_user_handler,  # type: ignore[arg-type] +            ) +            exception_middleware = ExceptionHandlerMiddleware( +                app=auth_middleware, +                exception_handlers=litestar_app.exception_handlers or {},  # pyright: ignore +                debug=None, +            ) +            self.app = self.config.session_backend_config.middleware.middleware( +                app=exception_middleware, +                backend=self.config.session_backend, +            ) +            self.has_wrapped_middleware = True +        await self.app(scope, receive, send) + + +class SessionAuthMiddleware(AbstractAuthenticationMiddleware): +    """Session Authentication Middleware.""" + +    def __init__( +        self, +        app: ASGIApp, +        exclude: str | list[str] | None, +        exclude_http_methods: Sequence[Method] | None, +        exclude_opt_key: str, +        retrieve_user_handler: Callable[[dict[str, Any], ASGIConnection[Any, Any, Any, Any]], Awaitable[Any]], +        scopes: Scopes | None, +    ) -> None: +        """Session based authentication middleware. + +        Args: +            app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack. +            exclude: A pattern or list of patterns to skip in the authentication middleware. +            exclude_http_methods: A sequence of http methods that do not require authentication. +            exclude_opt_key: An identifier to use on routes to disable authentication and authorization checks for a particular route. +            scopes: ASGI scopes processed by the authentication middleware. +            retrieve_user_handler: Callable that receives the ``session`` value from the authentication middleware and returns a ``user`` value. +        """ +        super().__init__( +            app=app, +            exclude=exclude, +            exclude_from_auth_key=exclude_opt_key, +            exclude_http_methods=exclude_http_methods, +            scopes=scopes, +        ) +        self.retrieve_user_handler = retrieve_user_handler + +    async def authenticate_request(self, connection: ASGIConnection[Any, Any, Any, Any]) -> AuthenticationResult: +        """Authenticate an incoming connection. + +        Args: +            connection: An :class:`ASGIConnection <.connection.ASGIConnection>` instance. + +        Raises: +            NotAuthorizedException: if session data is empty or user is not found. + +        Returns: +            :class:`AuthenticationResult <.middleware.authentication.AuthenticationResult>` +        """ +        if not connection.session or connection.scope["session"] is Empty: +            # the assignment of 'Empty' forces the session middleware to clear session data. +            connection.scope["session"] = Empty +            raise NotAuthorizedException("no session data found") + +        user = await self.retrieve_user_handler(connection.session, connection) + +        if not user: +            connection.scope["session"] = Empty +            raise NotAuthorizedException("no user correlating to session found") + +        return AuthenticationResult(user=user, auth=connection.session) | 
