diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/security')
18 files changed, 0 insertions, 1473 deletions
| diff --git a/venv/lib/python3.11/site-packages/litestar/security/__init__.py b/venv/lib/python3.11/site-packages/litestar/security/__init__.py deleted file mode 100644 index d864d43..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from litestar.security.base import AbstractSecurityConfig - -__all__ = ("AbstractSecurityConfig",) diff --git a/venv/lib/python3.11/site-packages/litestar/security/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/__pycache__/__init__.cpython-311.pycBinary files differ deleted file mode 100644 index 419b39e..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/__pycache__/__init__.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/security/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/__pycache__/base.cpython-311.pycBinary files differ deleted file mode 100644 index 6133974..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/__pycache__/base.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/security/base.py b/venv/lib/python3.11/site-packages/litestar/security/base.py deleted file mode 100644 index fbe7913..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/base.py +++ /dev/null @@ -1,183 +0,0 @@ -from __future__ import annotations - -from abc import ABC, abstractmethod -from copy import copy -from dataclasses import field -from typing import TYPE_CHECKING, Any, Callable, Generic, Iterable, Sequence, TypeVar, cast - -from litestar import Response -from litestar.utils.sync import ensure_async_callable - -if TYPE_CHECKING: -    from litestar.config.app import AppConfig -    from litestar.connection import ASGIConnection -    from litestar.di import Provide -    from litestar.enums import MediaType, OpenAPIMediaType -    from litestar.middleware.authentication import AbstractAuthenticationMiddleware -    from litestar.middleware.base import DefineMiddleware -    from litestar.openapi.spec import Components, SecurityRequirement -    from litestar.types import ( -        ControllerRouterHandler, -        Guard, -        Method, -        ResponseCookies, -        Scopes, -        SyncOrAsyncUnion, -        TypeEncodersMap, -    ) - -__all__ = ("AbstractSecurityConfig",) - -UserType = TypeVar("UserType") -AuthType = TypeVar("AuthType") - - -class AbstractSecurityConfig(ABC, Generic[UserType, AuthType]): -    """A base class for Security Configs - this class can be used on the application level -    or be manually configured on the router / controller level to provide auth. -    """ - -    authentication_middleware_class: type[AbstractAuthenticationMiddleware] -    """The authentication middleware class to use. - -    Must inherit from -    :class:`AbstractAuthenticationMiddleware <litestar.middleware.authentication.AbstractAuthenticationMiddleware>` -    """ -    guards: Iterable[Guard] | None = None -    """An iterable of guards to call for requests, providing authorization functionalities.""" -    exclude: str | list[str] | None = None -    """A pattern or list of patterns to skip in the authentication middleware.""" -    exclude_opt_key: str = "exclude_from_auth" -    """An identifier to use on routes to disable authentication and authorization checks for a particular route.""" -    exclude_http_methods: Sequence[Method] | None = field( -        default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"]) -    ) -    """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']""" -    scopes: Scopes | None = None -    """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be -    processed.""" -    route_handlers: Iterable[ControllerRouterHandler] | None = None -    """An optional iterable of route handlers to register.""" -    dependencies: dict[str, Provide] | None = None -    """An optional dictionary of dependency providers.""" -    retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] -    """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. - -    Notes: -        - User and Auth can be any arbitrary values specified by the security backend. -        - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. -          Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. -        - The callable can be sync or async. If it is sync, it will be wrapped to support async. - -    """ -    type_encoders: TypeEncodersMap | None = None -    """A mapping of types to callables that transform them into types supported for serialization.""" - -    def on_app_init(self, app_config: AppConfig) -> AppConfig: -        """Handle app init by injecting middleware, guards etc. into the app. This method can be used only on the app -        level. - -        Args: -            app_config: An instance of :class:`AppConfig <.config.app.AppConfig>` - -        Returns: -            The :class:`AppConfig <.config.app.AppConfig>`. -        """ -        app_config.middleware.insert(0, self.middleware) - -        if app_config.openapi_config: -            app_config.openapi_config = copy(app_config.openapi_config) -            if isinstance(app_config.openapi_config.components, list): -                app_config.openapi_config.components.append(self.openapi_components) -            elif app_config.openapi_config.components: -                app_config.openapi_config.components = [self.openapi_components, app_config.openapi_config.components] -            else: -                app_config.openapi_config.components = [self.openapi_components] - -            if isinstance(app_config.openapi_config.security, list): -                app_config.openapi_config.security.append(self.security_requirement) -            else: -                app_config.openapi_config.security = [self.security_requirement] - -        if self.guards: -            app_config.guards.extend(self.guards) - -        if self.dependencies: -            app_config.dependencies.update(self.dependencies) - -        if self.route_handlers: -            app_config.route_handlers.extend(self.route_handlers) - -        if self.type_encoders is None: -            self.type_encoders = app_config.type_encoders - -        return app_config - -    def create_response( -        self, -        content: Any | None, -        status_code: int, -        media_type: MediaType | OpenAPIMediaType | str, -        headers: dict[str, Any] | None = None, -        cookies: ResponseCookies | None = None, -    ) -> Response[Any]: -        """Create a response object. - -        Handles setting the type encoders mapping on the response. - -        Args: -            content: A value for the response body that will be rendered into bytes string. -            status_code: An HTTP status code. -            media_type: A value for the response 'Content-Type' header. -            headers: A string keyed dictionary of response headers. Header keys are insensitive. -            cookies: A list of :class:`Cookie <litestar.datastructures.Cookie>` instances to be set under -                the response 'Set-Cookie' header. - -        Returns: -            A response object. -        """ -        return Response( -            content=content, -            status_code=status_code, -            media_type=media_type, -            headers=headers, -            cookies=cookies, -            type_encoders=self.type_encoders, -        ) - -    def __post_init__(self) -> None: -        self.retrieve_user_handler = ensure_async_callable(self.retrieve_user_handler) - -    @property -    @abstractmethod -    def openapi_components(self) -> Components: -        """Create OpenAPI documentation for the JWT auth schema used. - -        Returns: -            An :class:`Components <litestar.openapi.spec.components.Components>` instance. -        """ -        raise NotImplementedError - -    @property -    @abstractmethod -    def security_requirement(self) -> SecurityRequirement: -        """Return OpenAPI 3.1. - -        :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` for the auth -        backend. - -        Returns: -            An OpenAPI 3.1 :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` dictionary. -        """ -        raise NotImplementedError - -    @property -    @abstractmethod -    def middleware(self) -> DefineMiddleware: -        """Create an instance of the config's ``authentication_middleware_class`` attribute and any required kwargs, -        wrapping it in Litestar's ``DefineMiddleware``. - -        Returns: -            An instance of :class:`DefineMiddleware <litestar.middleware.base.DefineMiddleware>`. -        """ -        raise NotImplementedError diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py deleted file mode 100644 index 4fd88de..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -from litestar.security.jwt.auth import ( -    BaseJWTAuth, -    JWTAuth, -    JWTCookieAuth, -    OAuth2Login, -    OAuth2PasswordBearerAuth, -) -from litestar.security.jwt.middleware import ( -    JWTAuthenticationMiddleware, -    JWTCookieAuthenticationMiddleware, -) -from litestar.security.jwt.token import Token - -__all__ = ( -    "BaseJWTAuth", -    "JWTAuth", -    "JWTAuthenticationMiddleware", -    "JWTCookieAuth", -    "JWTCookieAuthenticationMiddleware", -    "OAuth2Login", -    "OAuth2PasswordBearerAuth", -    "Token", -) diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pycBinary files differ deleted file mode 100644 index f04d57f..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pycBinary files differ deleted file mode 100644 index cec42c0..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pycBinary files differ deleted file mode 100644 index 8d5603e..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pycBinary files differ deleted file mode 100644 index b4f8c45..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py deleted file mode 100644 index 2a0f094..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py +++ /dev/null @@ -1,691 +0,0 @@ -from __future__ import annotations - -from dataclasses import asdict, dataclass, field -from datetime import datetime, timedelta, timezone -from typing import TYPE_CHECKING, Any, Callable, Generic, Iterable, Literal, Sequence, TypeVar, cast - -from litestar.datastructures import Cookie -from litestar.enums import MediaType -from litestar.middleware import DefineMiddleware -from litestar.openapi.spec import Components, OAuthFlow, OAuthFlows, SecurityRequirement, SecurityScheme -from litestar.security.base import AbstractSecurityConfig -from litestar.security.jwt.middleware import JWTAuthenticationMiddleware, JWTCookieAuthenticationMiddleware -from litestar.security.jwt.token import Token -from litestar.status_codes import HTTP_201_CREATED -from litestar.types import ControllerRouterHandler, Empty, Guard, Method, Scopes, SyncOrAsyncUnion, TypeEncodersMap - -__all__ = ("BaseJWTAuth", "JWTAuth", "JWTCookieAuth", "OAuth2Login", "OAuth2PasswordBearerAuth") - - -if TYPE_CHECKING: -    from litestar import Response -    from litestar.connection import ASGIConnection -    from litestar.di import Provide - - -UserType = TypeVar("UserType") - - -class BaseJWTAuth(Generic[UserType], AbstractSecurityConfig[UserType, Token]): -    """Base class for JWT Auth backends""" - -    token_secret: str -    """Key with which to generate the token hash. - -    Notes: -        - This value should be kept as a secret and the standard practice is to inject it into the environment. -    """ -    retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] -    """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. - -    Notes: -        - User and Auth can be any arbitrary values specified by the security backend. -        - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. -          Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. -        - The callable can be sync or async. If it is sync, it will be wrapped to support async. - -    """ -    algorithm: str -    """Algorithm to use for JWT hashing.""" -    auth_header: str -    """Request header key from which to retrieve the token. - -    E.g. ``Authorization`` or ``X-Api-Key``. -    """ -    default_token_expiration: timedelta -    """The default value for token expiration.""" -    openapi_security_scheme_name: str -    """The value to use for the OpenAPI security scheme and security requirements.""" -    description: str -    """Description for the OpenAPI security scheme.""" -    authentication_middleware_class: type[JWTAuthenticationMiddleware]  # pyright: ignore -    """The authentication middleware class to use. - -    Must inherit from :class:`JWTAuthenticationMiddleware` -    """ - -    @property -    def openapi_components(self) -> Components: -        """Create OpenAPI documentation for the JWT auth schema used. - -        Returns: -            An :class:`Components <litestar.openapi.spec.components.Components>` instance. -        """ -        return Components( -            security_schemes={ -                self.openapi_security_scheme_name: SecurityScheme( -                    type="http", -                    scheme="Bearer", -                    name=self.auth_header, -                    bearer_format="JWT", -                    description=self.description, -                ) -            } -        ) - -    @property -    def security_requirement(self) -> SecurityRequirement: -        """Return OpenAPI 3.1. - -        :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` - -        Returns: -            An OpenAPI 3.1 -            :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` -            dictionary. -        """ -        return {self.openapi_security_scheme_name: []} - -    @property -    def middleware(self) -> DefineMiddleware: -        """Create :class:`JWTAuthenticationMiddleware` wrapped in -        :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. - -        Returns: -            An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. -        """ -        return DefineMiddleware( -            self.authentication_middleware_class, -            algorithm=self.algorithm, -            auth_header=self.auth_header, -            exclude=self.exclude, -            exclude_opt_key=self.exclude_opt_key, -            exclude_http_methods=self.exclude_http_methods, -            retrieve_user_handler=self.retrieve_user_handler, -            scopes=self.scopes, -            token_secret=self.token_secret, -        ) - -    def login( -        self, -        identifier: str, -        *, -        response_body: Any = Empty, -        response_media_type: str | MediaType = MediaType.JSON, -        response_status_code: int = HTTP_201_CREATED, -        token_expiration: timedelta | None = None, -        token_issuer: str | None = None, -        token_audience: str | None = None, -        token_unique_jwt_id: str | None = None, -        token_extras: dict[str, Any] | None = None, -        send_token_as_response_body: bool = False, -    ) -> Response[Any]: -        """Create a response with a JWT header. - -        Args: -            identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value. -            response_body: An optional response body to send. -            response_media_type: An optional ``Content-Type``. Defaults to ``application/json``. -            response_status_code: An optional status code for the response. Defaults to ``201``. -            token_expiration: An optional timedelta for the token expiration. -            token_issuer: An optional value of the token ``iss`` field. -            token_audience: An optional value for the token ``aud`` field. -            token_unique_jwt_id: An optional value for the token ``jti`` field. -            token_extras: An optional dictionary to include in the token ``extras`` field. -            send_token_as_response_body: If ``True`` the response will be a dict including the token: ``{ "token": <token> }`` -                will be returned as the response body. Note: if a response body is passed this setting will be ignored. - -        Returns: -            A :class:`Response <.response.Response>` instance. -        """ -        encoded_token = self.create_token( -            identifier=identifier, -            token_expiration=token_expiration, -            token_issuer=token_issuer, -            token_audience=token_audience, -            token_unique_jwt_id=token_unique_jwt_id, -            token_extras=token_extras, -        ) - -        if response_body is not Empty: -            body = response_body -        elif send_token_as_response_body: -            body = {"token": encoded_token} -        else: -            body = None - -        return self.create_response( -            content=body, -            headers={self.auth_header: self.format_auth_header(encoded_token)}, -            media_type=response_media_type, -            status_code=response_status_code, -        ) - -    def create_token( -        self, -        identifier: str, -        token_expiration: timedelta | None = None, -        token_issuer: str | None = None, -        token_audience: str | None = None, -        token_unique_jwt_id: str | None = None, -        token_extras: dict | None = None, -    ) -> str: -        """Create a Token instance from the passed in parameters, persists and returns it. - -        Args: -            identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value. -            token_expiration: An optional timedelta for the token expiration. -            token_issuer: An optional value of the token ``iss`` field. -            token_audience: An optional value for the token ``aud`` field. -            token_unique_jwt_id: An optional value for the token ``jti`` field. -            token_extras: An optional dictionary to include in the token ``extras`` field. - -        Returns: -            The created token. -        """ -        token = Token( -            sub=identifier, -            exp=(datetime.now(timezone.utc) + (token_expiration or self.default_token_expiration)), -            iss=token_issuer, -            aud=token_audience, -            jti=token_unique_jwt_id, -            extras=token_extras or {}, -        ) -        return token.encode(secret=self.token_secret, algorithm=self.algorithm) - -    def format_auth_header(self, encoded_token: str) -> str: -        """Format a token according to the specified OpenAPI scheme. - -        Args: -            encoded_token: An encoded JWT token - -        Returns: -            The encoded token formatted for the HTTP headers -        """ -        security = self.openapi_components.security_schemes.get(self.openapi_security_scheme_name, None)  # type: ignore[union-attr] -        return f"{security.scheme} {encoded_token}" if isinstance(security, SecurityScheme) else encoded_token - - -@dataclass -class JWTAuth(Generic[UserType], BaseJWTAuth[UserType]): -    """JWT Authentication Configuration. - -    This class is the main entry point to the library, and it includes methods to create the middleware, provide login -    functionality, and create OpenAPI documentation. -    """ - -    token_secret: str -    """Key with which to generate the token hash. - -    Notes: -        - This value should be kept as a secret and the standard practice is to inject it into the environment. -    """ -    retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] -    """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. - -    Notes: -        - User and Auth can be any arbitrary values specified by the security backend. -        - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. -          Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. -        - The callable can be sync or async. If it is sync, it will be wrapped to support async. - -    """ -    guards: Iterable[Guard] | None = field(default=None) -    """An iterable of guards to call for requests, providing authorization functionalities.""" -    exclude: str | list[str] | None = field(default=None) -    """A pattern or list of patterns to skip in the authentication middleware.""" -    exclude_opt_key: str = field(default="exclude_from_auth") -    """An identifier to use on routes to disable authentication and authorization checks for a particular route.""" -    exclude_http_methods: Sequence[Method] | None = field( -        default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"]) -    ) -    """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']""" -    scopes: Scopes | None = field(default=None) -    """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be -    processed.""" -    route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None) -    """An optional iterable of route handlers to register.""" -    dependencies: dict[str, Provide] | None = field(default=None) -    """An optional dictionary of dependency providers.""" - -    type_encoders: TypeEncodersMap | None = field(default=None) -    """A mapping of types to callables that transform them into types supported for serialization.""" - -    algorithm: str = field(default="HS256") -    """Algorithm to use for JWT hashing.""" -    auth_header: str = field(default="Authorization") -    """Request header key from which to retrieve the token. - -    E.g. ``Authorization`` or ``X-Api-Key``. -    """ -    default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1)) -    """The default value for token expiration.""" -    openapi_security_scheme_name: str = field(default="BearerToken") -    """The value to use for the OpenAPI security scheme and security requirements.""" -    description: str = field(default="JWT api-key authentication and authorization.") -    """Description for the OpenAPI security scheme.""" -    authentication_middleware_class: type[JWTAuthenticationMiddleware] = field(default=JWTAuthenticationMiddleware) -    """The authentication middleware class to use. - -    Must inherit from :class:`JWTAuthenticationMiddleware` -    """ - - -@dataclass -class JWTCookieAuth(Generic[UserType], BaseJWTAuth[UserType]): -    """JWT Cookie Authentication Configuration. - -    This class is an alternate entry point to the library, and it includes all the functionality of the :class:`JWTAuth` -    class and adds support for passing JWT tokens ``HttpOnly`` cookies. -    """ - -    token_secret: str -    """Key with which to generate the token hash. - -    Notes: -        - This value should be kept as a secret and the standard practice is to inject it into the environment. -    """ -    retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] -    """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. - -    Notes: -        - User and Auth can be any arbitrary values specified by the security backend. -        - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. -          Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. -        - The callable can be sync or async. If it is sync, it will be wrapped to support async. - -    """ -    guards: Iterable[Guard] | None = field(default=None) -    """An iterable of guards to call for requests, providing authorization functionalities.""" -    exclude: str | list[str] | None = field(default=None) -    """A pattern or list of patterns to skip in the authentication middleware.""" -    exclude_opt_key: str = field(default="exclude_from_auth") -    """An identifier to use on routes to disable authentication and authorization checks for a particular route.""" -    scopes: Scopes | None = field(default=None) -    """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be -    processed.""" -    exclude_http_methods: Sequence[Method] | None = field( -        default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"]) -    ) -    """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']""" -    route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None) -    """An optional iterable of route handlers to register.""" -    dependencies: dict[str, Provide] | None = field(default=None) -    """An optional dictionary of dependency providers.""" - -    type_encoders: TypeEncodersMap | None = field(default=None) -    """A mapping of types to callables that transform them into types supported for serialization.""" - -    algorithm: str = field(default="HS256") -    """Algorithm to use for JWT hashing.""" -    auth_header: str = field(default="Authorization") -    """Request header key from which to retrieve the token. - -    E.g. ``Authorization`` or ``X-Api-Key``. -    """ -    default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1)) -    """The default value for token expiration.""" -    openapi_security_scheme_name: str = field(default="BearerToken") -    """The value to use for the OpenAPI security scheme and security requirements.""" -    key: str = field(default="token") -    """Key for the cookie.""" -    path: str = field(default="/") -    """Path fragment that must exist in the request url for the cookie to be valid. - -    Defaults to ``/``. -    """ -    domain: str | None = field(default=None) -    """Domain for which the cookie is valid.""" -    secure: bool | None = field(default=None) -    """Https is required for the cookie.""" -    samesite: Literal["lax", "strict", "none"] = field(default="lax") -    """Controls whether or not a cookie is sent with cross-site requests. Defaults to ``lax``. """ -    description: str = field(default="JWT cookie-based authentication and authorization.") -    """Description for the OpenAPI security scheme.""" -    authentication_middleware_class: type[JWTCookieAuthenticationMiddleware] = field(  # pyright: ignore -        default=JWTCookieAuthenticationMiddleware -    ) -    """The authentication middleware class to use. Must inherit from :class:`JWTCookieAuthenticationMiddleware` -    """ - -    @property -    def openapi_components(self) -> Components: -        """Create OpenAPI documentation for the JWT Cookie auth scheme. - -        Returns: -            A :class:`Components <litestar.openapi.spec.components.Components>` instance. -        """ -        return Components( -            security_schemes={ -                self.openapi_security_scheme_name: SecurityScheme( -                    type="http", -                    scheme="Bearer", -                    name=self.key, -                    security_scheme_in="cookie", -                    bearer_format="JWT", -                    description=self.description, -                ) -            } -        ) - -    @property -    def middleware(self) -> DefineMiddleware: -        """Create :class:`JWTCookieAuthenticationMiddleware` wrapped in -            :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. - -        Returns: -            An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. -        """ -        return DefineMiddleware( -            self.authentication_middleware_class, -            algorithm=self.algorithm, -            auth_cookie_key=self.key, -            auth_header=self.auth_header, -            exclude=self.exclude, -            exclude_opt_key=self.exclude_opt_key, -            exclude_http_methods=self.exclude_http_methods, -            retrieve_user_handler=self.retrieve_user_handler, -            scopes=self.scopes, -            token_secret=self.token_secret, -        ) - -    def login( -        self, -        identifier: str, -        *, -        response_body: Any = Empty, -        response_media_type: str | MediaType = MediaType.JSON, -        response_status_code: int = HTTP_201_CREATED, -        token_expiration: timedelta | None = None, -        token_issuer: str | None = None, -        token_audience: str | None = None, -        token_unique_jwt_id: str | None = None, -        token_extras: dict[str, Any] | None = None, -        send_token_as_response_body: bool = False, -    ) -> Response[Any]: -        """Create a response with a JWT header. - -        Args: -            identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value. -            response_body: An optional response body to send. -            response_media_type: An optional 'Content-Type'. Defaults to 'application/json'. -            response_status_code: An optional status code for the response. Defaults to '201 Created'. -            token_expiration: An optional timedelta for the token expiration. -            token_issuer: An optional value of the token ``iss`` field. -            token_audience: An optional value for the token ``aud`` field. -            token_unique_jwt_id: An optional value for the token ``jti`` field. -            token_extras: An optional dictionary to include in the token ``extras`` field. -            send_token_as_response_body: If ``True`` the response will be a dict including the token: ``{ "token": <token> }`` -                will be returned as the response body. Note: if a response body is passed this setting will be ignored. - -        Returns: -            A :class:`Response <.response.Response>` instance. -        """ - -        encoded_token = self.create_token( -            identifier=identifier, -            token_expiration=token_expiration, -            token_issuer=token_issuer, -            token_audience=token_audience, -            token_unique_jwt_id=token_unique_jwt_id, -            token_extras=token_extras, -        ) -        cookie = Cookie( -            key=self.key, -            path=self.path, -            httponly=True, -            value=self.format_auth_header(encoded_token), -            max_age=int((token_expiration or self.default_token_expiration).total_seconds()), -            secure=self.secure, -            samesite=self.samesite, -            domain=self.domain, -        ) - -        if response_body is not Empty: -            body = response_body -        elif send_token_as_response_body: -            body = {"token": encoded_token} -        else: -            body = None - -        return self.create_response( -            content=body, -            headers={self.auth_header: self.format_auth_header(encoded_token)}, -            cookies=[cookie], -            media_type=response_media_type, -            status_code=response_status_code, -        ) - - -@dataclass -class OAuth2Login: -    """OAuth2 Login DTO""" - -    access_token: str -    """Valid JWT access token""" -    token_type: str -    """Type of the OAuth token used""" -    refresh_token: str | None = field(default=None) -    """Optional valid refresh token JWT""" -    expires_in: int | None = field(default=None) -    """Expiration time of the token in seconds. """ - - -@dataclass -class OAuth2PasswordBearerAuth(Generic[UserType], BaseJWTAuth[UserType]): -    """OAUTH2 Schema for Password Bearer Authentication. - -    This class implements an OAUTH2 authentication flow entry point to the library, and it includes all the -    functionality of the :class:`JWTAuth` class and adds support for passing JWT tokens ``HttpOnly`` cookies. - -    ``token_url`` is the only additional argument that is required, and it should point at your login route -    """ - -    token_secret: str -    """Key with which to generate the token hash. - -    Notes: -        - This value should be kept as a secret and the standard practice is to inject it into the environment. -    """ -    token_url: str -    """The URL for retrieving a new token.""" -    retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] -    """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. - -    Notes: -        - User and Auth can be any arbitrary values specified by the security backend. -        - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. -          Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. -        - The callable can be sync or async. If it is sync, it will be wrapped to support async. - -    """ -    guards: Iterable[Guard] | None = field(default=None) -    """An iterable of guards to call for requests, providing authorization functionalities.""" -    exclude: str | list[str] | None = field(default=None) -    """A pattern or list of patterns to skip in the authentication middleware.""" -    exclude_opt_key: str = field(default="exclude_from_auth") -    """An identifier to use on routes to disable authentication and authorization checks for a particular route.""" -    exclude_http_methods: Sequence[Method] | None = field( -        default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"]) -    ) -    """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']""" -    scopes: Scopes | None = field(default=None) -    """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be -    processed.""" -    route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None) -    """An optional iterable of route handlers to register.""" -    dependencies: dict[str, Provide] | None = field(default=None) -    """An optional dictionary of dependency providers.""" -    type_encoders: TypeEncodersMap | None = field(default=None) -    """A mapping of types to callables that transform them into types supported for serialization.""" -    algorithm: str = field(default="HS256") -    """Algorithm to use for JWT hashing.""" -    auth_header: str = field(default="Authorization") -    """Request header key from which to retrieve the token. - -    E.g. ``Authorization`` or 'X-Api-Key'. -    """ -    default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1)) -    """The default value for token expiration.""" -    openapi_security_scheme_name: str = field(default="BearerToken") -    """The value to use for the OpenAPI security scheme and security requirements.""" -    oauth_scopes: dict[str, str] | None = field(default=None) -    """Oauth Scopes available for the token.""" -    key: str = field(default="token") -    """Key for the cookie.""" -    path: str = field(default="/") -    """Path fragment that must exist in the request url for the cookie to be valid. - -    Defaults to ``/``. -    """ -    domain: str | None = field(default=None) -    """Domain for which the cookie is valid.""" -    secure: bool | None = field(default=None) -    """Https is required for the cookie.""" -    samesite: Literal["lax", "strict", "none"] = field(default="lax") -    """Controls whether or not a cookie is sent with cross-site requests. Defaults to ``lax``. """ -    description: str = field(default="OAUTH2 password bearer authentication and authorization.") -    """Description for the OpenAPI security scheme.""" -    authentication_middleware_class: type[JWTCookieAuthenticationMiddleware] = field(  # pyright: ignore -        default=JWTCookieAuthenticationMiddleware -    ) -    """The authentication middleware class to use. - -    Must inherit from :class:`JWTCookieAuthenticationMiddleware` -    """ - -    @property -    def middleware(self) -> DefineMiddleware: -        """Create ``JWTCookieAuthenticationMiddleware`` wrapped in -            :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. - -        Returns: -            An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. -        """ -        return DefineMiddleware( -            self.authentication_middleware_class, -            algorithm=self.algorithm, -            auth_cookie_key=self.key, -            auth_header=self.auth_header, -            exclude=self.exclude, -            exclude_opt_key=self.exclude_opt_key, -            exclude_http_methods=self.exclude_http_methods, -            retrieve_user_handler=self.retrieve_user_handler, -            scopes=self.scopes, -            token_secret=self.token_secret, -        ) - -    @property -    def oauth_flow(self) -> OAuthFlow: -        """Create an OpenAPI OAuth2 flow for the password bearer authentication scheme. - -        Returns: -            An :class:`OAuthFlow <litestar.openapi.spec.oauth_flow.OAuthFlow>` instance. -        """ -        return OAuthFlow( -            token_url=self.token_url, -            scopes=self.oauth_scopes, -        ) - -    @property -    def openapi_components(self) -> Components: -        """Create OpenAPI documentation for the OAUTH2 Password bearer auth scheme. - -        Returns: -            An :class:`Components <litestar.openapi.spec.components.Components>` instance. -        """ -        return Components( -            security_schemes={ -                self.openapi_security_scheme_name: SecurityScheme( -                    type="oauth2", -                    scheme="Bearer", -                    name=self.auth_header, -                    security_scheme_in="header", -                    flows=OAuthFlows(password=self.oauth_flow),  # pyright: ignore[reportGeneralTypeIssues] -                    bearer_format="JWT", -                    description=self.description, -                ) -            } -        ) - -    def login( -        self, -        identifier: str, -        *, -        response_body: Any = Empty, -        response_media_type: str | MediaType = MediaType.JSON, -        response_status_code: int = HTTP_201_CREATED, -        token_expiration: timedelta | None = None, -        token_issuer: str | None = None, -        token_audience: str | None = None, -        token_unique_jwt_id: str | None = None, -        token_extras: dict[str, Any] | None = None, -        send_token_as_response_body: bool = True, -    ) -> Response[Any]: -        """Create a response with a JWT header. - -        Args: -            identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value. -            response_body: An optional response body to send. -            response_media_type: An optional ``Content-Type``. Defaults to ``application/json``. -            response_status_code: An optional status code for the response. Defaults to ``201``. -            token_expiration: An optional timedelta for the token expiration. -            token_issuer: An optional value of the token ``iss`` field. -            token_audience: An optional value for the token ``aud`` field. -            token_unique_jwt_id: An optional value for the token ``jti`` field. -            token_extras: An optional dictionary to include in the token ``extras`` field. -            send_token_as_response_body: If ``True`` the response will be an oAuth2 token response dict. -                Note: if a response body is passed this setting will be ignored. - -        Returns: -            A :class:`Response <.response.Response>` instance. -        """ -        encoded_token = self.create_token( -            identifier=identifier, -            token_expiration=token_expiration, -            token_issuer=token_issuer, -            token_audience=token_audience, -            token_unique_jwt_id=token_unique_jwt_id, -            token_extras=token_extras, -        ) -        expires_in = int((token_expiration or self.default_token_expiration).total_seconds()) -        cookie = Cookie( -            key=self.key, -            path=self.path, -            httponly=True, -            value=self.format_auth_header(encoded_token), -            max_age=expires_in, -            secure=self.secure, -            samesite=self.samesite, -            domain=self.domain, -        ) - -        if response_body is not Empty: -            body = response_body -        elif send_token_as_response_body: -            token_dto = OAuth2Login( -                access_token=encoded_token, -                expires_in=expires_in, -                token_type="bearer",  # noqa: S106 -            ) -            body = asdict(token_dto) -        else: -            body = None - -        return self.create_response( -            content=body, -            headers={self.auth_header: self.format_auth_header(encoded_token)}, -            cookies=[cookie], -            media_type=response_media_type, -            status_code=response_status_code, -        ) diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py deleted file mode 100644 index 84326da..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py +++ /dev/null @@ -1,188 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING, Awaitable, Callable, Sequence - -from litestar.exceptions import NotAuthorizedException -from litestar.middleware.authentication import ( -    AbstractAuthenticationMiddleware, -    AuthenticationResult, -) -from litestar.security.jwt.token import Token - -__all__ = ("JWTAuthenticationMiddleware", "JWTCookieAuthenticationMiddleware") - - -if TYPE_CHECKING: -    from typing import Any - -    from litestar.connection import ASGIConnection -    from litestar.types import ASGIApp, Method, Scopes - - -class JWTAuthenticationMiddleware(AbstractAuthenticationMiddleware): -    """JWT Authentication middleware. - -    This class provides JWT authentication functionalities. -    """ - -    __slots__ = ( -        "algorithm", -        "auth_header", -        "retrieve_user_handler", -        "token_secret", -    ) - -    def __init__( -        self, -        algorithm: str, -        app: ASGIApp, -        auth_header: str, -        exclude: str | list[str] | None, -        exclude_http_methods: Sequence[Method] | None, -        exclude_opt_key: str, -        retrieve_user_handler: Callable[[Token, ASGIConnection[Any, Any, Any, Any]], Awaitable[Any]], -        scopes: Scopes, -        token_secret: str, -    ) -> None: -        """Check incoming requests for an encoded token in the auth header specified, and if present retrieve the user -        from persistence using the provided function. - -        Args: -            algorithm: JWT hashing algorithm to use. -            app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack. -            auth_header: Request header key from which to retrieve the token. E.g. ``Authorization`` or ``X-Api-Key``. -            exclude: A pattern or list of patterns to skip. -            exclude_opt_key: An identifier to use on routes to disable authentication for a particular route. -            exclude_http_methods: A sequence of http methods that do not require authentication. -            retrieve_user_handler: A function that receives a :class:`Token <.security.jwt.Token>` and returns a user, -                which can be any arbitrary value. -            scopes: ASGI scopes processed by the authentication middleware. -            token_secret: Secret for decoding the JWT token. This value should be equivalent to the secret used to -                encode it. -        """ -        super().__init__( -            app=app, -            exclude=exclude, -            exclude_from_auth_key=exclude_opt_key, -            exclude_http_methods=exclude_http_methods, -            scopes=scopes, -        ) -        self.algorithm = algorithm -        self.auth_header = auth_header -        self.retrieve_user_handler = retrieve_user_handler -        self.token_secret = token_secret - -    async def authenticate_request(self, connection: ASGIConnection[Any, Any, Any, Any]) -> AuthenticationResult: -        """Given an HTTP Connection, parse the JWT api key stored in the header and retrieve the user correlating to the -        token from the DB. - -        Args: -            connection: An Litestar HTTPConnection instance. - -        Returns: -            AuthenticationResult - -        Raises: -            NotAuthorizedException: If token is invalid or user is not found. -        """ -        auth_header = connection.headers.get(self.auth_header) -        if not auth_header: -            raise NotAuthorizedException("No JWT token found in request header") -        encoded_token = auth_header.partition(" ")[-1] -        return await self.authenticate_token(encoded_token=encoded_token, connection=connection) - -    async def authenticate_token( -        self, encoded_token: str, connection: ASGIConnection[Any, Any, Any, Any] -    ) -> AuthenticationResult: -        """Given an encoded JWT token, parse, validate and look up sub within token. - -        Args: -            encoded_token: Encoded JWT token. -            connection: An ASGI connection instance. - -        Raises: -            NotAuthorizedException: If token is invalid or user is not found. - -        Returns: -            AuthenticationResult -        """ -        token = Token.decode( -            encoded_token=encoded_token, -            secret=self.token_secret, -            algorithm=self.algorithm, -        ) - -        user = await self.retrieve_user_handler(token, connection) - -        if not user: -            raise NotAuthorizedException() - -        return AuthenticationResult(user=user, auth=token) - - -class JWTCookieAuthenticationMiddleware(JWTAuthenticationMiddleware): -    """Cookie based JWT authentication middleware.""" - -    __slots__ = ("auth_cookie_key",) - -    def __init__( -        self, -        algorithm: str, -        app: ASGIApp, -        auth_cookie_key: str, -        auth_header: str, -        exclude: str | list[str] | None, -        exclude_opt_key: str, -        exclude_http_methods: Sequence[Method] | None, -        retrieve_user_handler: Callable[[Token, ASGIConnection[Any, Any, Any, Any]], Awaitable[Any]], -        scopes: Scopes, -        token_secret: str, -    ) -> None: -        """Check incoming requests for an encoded token in the auth header or cookie name specified, and if present -        retrieves the user from persistence using the provided function. - -        Args: -            algorithm: JWT hashing algorithm to use. -            app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack. -            auth_cookie_key: Cookie name from which to retrieve the token. E.g. ``token`` or ``accessToken``. -            auth_header: Request header key from which to retrieve the token. E.g. ``Authorization`` or ``X-Api-Key``. -            exclude: A pattern or list of patterns to skip. -            exclude_opt_key: An identifier to use on routes to disable authentication for a particular route. -            exclude_http_methods: A sequence of http methods that do not require authentication. -            retrieve_user_handler: A function that receives a :class:`Token <.security.jwt.Token>` and returns a user, -                which can be any arbitrary value. -            scopes: ASGI scopes processed by the authentication middleware. -            token_secret: Secret for decoding the JWT token. This value should be equivalent to the secret used to -                encode it. -        """ -        super().__init__( -            algorithm=algorithm, -            app=app, -            auth_header=auth_header, -            exclude=exclude, -            exclude_http_methods=exclude_http_methods, -            exclude_opt_key=exclude_opt_key, -            retrieve_user_handler=retrieve_user_handler, -            scopes=scopes, -            token_secret=token_secret, -        ) -        self.auth_cookie_key = auth_cookie_key - -    async def authenticate_request(self, connection: ASGIConnection[Any, Any, Any, Any]) -> AuthenticationResult: -        """Given an HTTP Connection, parse the JWT api key stored in the header and retrieve the user correlating to the -        token from the DB. - -        Args: -            connection: An Litestar HTTPConnection instance. - -        Raises: -            NotAuthorizedException: If token is invalid or user is not found. - -        Returns: -            AuthenticationResult -        """ -        auth_header = connection.headers.get(self.auth_header) or connection.cookies.get(self.auth_cookie_key) -        if not auth_header: -            raise NotAuthorizedException("No JWT token found in request header or cookies") -        encoded_token = auth_header.partition(" ")[-1] -        return await self.authenticate_token(encoded_token=encoded_token, connection=connection) diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py deleted file mode 100644 index 279111a..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py +++ /dev/null @@ -1,119 +0,0 @@ -from __future__ import annotations - -import dataclasses -from dataclasses import asdict, dataclass, field -from datetime import datetime, timezone -from typing import TYPE_CHECKING, Any - -from jose import JWSError, JWTError, jwt - -from litestar.exceptions import ImproperlyConfiguredException, NotAuthorizedException - -if TYPE_CHECKING: -    from typing_extensions import Self - - -__all__ = ("Token",) - - -def _normalize_datetime(value: datetime) -> datetime: -    """Convert the given value into UTC and strip microseconds. - -    Args: -        value: A datetime instance - -    Returns: -        A datetime instance -    """ -    if value.tzinfo is not None: -        value.astimezone(timezone.utc) - -    return value.replace(microsecond=0) - - -@dataclass -class Token: -    """JWT Token DTO.""" - -    exp: datetime -    """Expiration - datetime for token expiration.""" -    sub: str -    """Subject - usually a unique identifier of the user or equivalent entity.""" -    iat: datetime = field(default_factory=lambda: _normalize_datetime(datetime.now(timezone.utc))) -    """Issued at - should always be current now.""" -    iss: str | None = field(default=None) -    """Issuer - optional unique identifier for the issuer.""" -    aud: str | None = field(default=None) -    """Audience - intended audience.""" -    jti: str | None = field(default=None) -    """JWT ID - a unique identifier of the JWT between different issuers.""" -    extras: dict[str, Any] = field(default_factory=dict) -    """Extra fields that were found on the JWT token.""" - -    def __post_init__(self) -> None: -        if len(self.sub) < 1: -            raise ImproperlyConfiguredException("sub must be a string with a length greater than 0") - -        if isinstance(self.exp, datetime) and ( -            (exp := _normalize_datetime(self.exp)).timestamp() -            >= _normalize_datetime(datetime.now(timezone.utc)).timestamp() -        ): -            self.exp = exp -        else: -            raise ImproperlyConfiguredException("exp value must be a datetime in the future") - -        if isinstance(self.iat, datetime) and ( -            (iat := _normalize_datetime(self.iat)).timestamp() -            <= _normalize_datetime(datetime.now(timezone.utc)).timestamp() -        ): -            self.iat = iat -        else: -            raise ImproperlyConfiguredException("iat must be a current or past time") - -    @classmethod -    def decode(cls, encoded_token: str, secret: str | dict[str, str], algorithm: str) -> Self: -        """Decode a passed in token string and returns a Token instance. - -        Args: -            encoded_token: A base64 string containing an encoded JWT. -            secret: The secret with which the JWT is encoded. It may optionally be an individual JWK or JWS set dict -            algorithm: The algorithm used to encode the JWT. - -        Returns: -            A decoded Token instance. - -        Raises: -            NotAuthorizedException: If the token is invalid. -        """ -        try: -            payload = jwt.decode(token=encoded_token, key=secret, algorithms=[algorithm], options={"verify_aud": False}) -            exp = datetime.fromtimestamp(payload.pop("exp"), tz=timezone.utc) -            iat = datetime.fromtimestamp(payload.pop("iat"), tz=timezone.utc) -            field_names = {f.name for f in dataclasses.fields(Token)} -            extra_fields = payload.keys() - field_names -            extras = payload.pop("extras", {}) -            for key in extra_fields: -                extras[key] = payload.pop(key) -            return cls(exp=exp, iat=iat, **payload, extras=extras) -        except (KeyError, JWTError, ImproperlyConfiguredException) as e: -            raise NotAuthorizedException("Invalid token") from e - -    def encode(self, secret: str, algorithm: str) -> str: -        """Encode the token instance into a string. - -        Args: -            secret: The secret with which the JWT is encoded. -            algorithm: The algorithm used to encode the JWT. - -        Returns: -            An encoded token string. - -        Raises: -            ImproperlyConfiguredException: If encoding fails. -        """ -        try: -            return jwt.encode( -                claims={k: v for k, v in asdict(self).items() if v is not None}, key=secret, algorithm=algorithm -            ) -        except (JWTError, JWSError) as e: -            raise ImproperlyConfiguredException("Failed to encode token") from e diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/__init__.py b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__init__.py deleted file mode 100644 index 7c83991..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/session_auth/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from litestar.security.session_auth.auth import SessionAuth -from litestar.security.session_auth.middleware import SessionAuthMiddleware - -__all__ = ("SessionAuth", "SessionAuthMiddleware") diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/__init__.cpython-311.pycBinary files differ deleted file mode 100644 index 95bf5c1..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/__init__.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/auth.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/auth.cpython-311.pycBinary files differ deleted file mode 100644 index 8d4aa6c..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/auth.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/middleware.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/middleware.cpython-311.pycBinary files differ deleted file mode 100644 index 27e4213..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/middleware.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/auth.py b/venv/lib/python3.11/site-packages/litestar/security/session_auth/auth.py deleted file mode 100644 index 7a5c542..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/session_auth/auth.py +++ /dev/null @@ -1,137 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Iterable, Sequence, cast - -from litestar.middleware.base import DefineMiddleware -from litestar.middleware.session.base import BaseBackendConfig, BaseSessionBackendT -from litestar.openapi.spec import Components, SecurityRequirement, SecurityScheme -from litestar.security.base import AbstractSecurityConfig, UserType -from litestar.security.session_auth.middleware import MiddlewareWrapper, SessionAuthMiddleware - -__all__ = ("SessionAuth",) - -if TYPE_CHECKING: -    from litestar.connection import ASGIConnection -    from litestar.di import Provide -    from litestar.types import ControllerRouterHandler, Guard, Method, Scopes, SyncOrAsyncUnion, TypeEncodersMap - - -@dataclass -class SessionAuth(Generic[UserType, BaseSessionBackendT], AbstractSecurityConfig[UserType, Dict[str, Any]]): -    """Session Based Security Backend.""" - -    session_backend_config: BaseBackendConfig[BaseSessionBackendT]  # pyright: ignore -    """A session backend config.""" -    retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] -    """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. - -    Notes: -        - User and Auth can be any arbitrary values specified by the security backend. -        - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. -          Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. -        - The callable can be sync or async. If it is sync, it will be wrapped to support async. - -    """ - -    authentication_middleware_class: type[SessionAuthMiddleware] = field(default=SessionAuthMiddleware)  # pyright: ignore -    """The authentication middleware class to use. - -    Must inherit from :class:`SessionAuthMiddleware <litestar.security.session_auth.middleware.SessionAuthMiddleware>` -    """ - -    guards: Iterable[Guard] | None = field(default=None) -    """An iterable of guards to call for requests, providing authorization functionalities.""" -    exclude: str | list[str] | None = field(default=None) -    """A pattern or list of patterns to skip in the authentication middleware.""" -    exclude_opt_key: str = field(default="exclude_from_auth") -    """An identifier to use on routes to disable authentication and authorization checks for a particular route.""" -    exclude_http_methods: Sequence[Method] | None = field( -        default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"]) -    ) -    """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']""" -    scopes: Scopes | None = field(default=None) -    """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be -    processed.""" -    route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None) -    """An optional iterable of route handlers to register.""" -    dependencies: dict[str, Provide] | None = field(default=None) -    """An optional dictionary of dependency providers.""" - -    type_encoders: TypeEncodersMap | None = field(default=None) -    """A mapping of types to callables that transform them into types supported for serialization.""" - -    @property -    def middleware(self) -> DefineMiddleware: -        """Use this property to insert the config into a middleware list on one of the application layers. - -        Examples: -            .. code-block:: python - -                from typing import Any -                from os import urandom - -                from litestar import Litestar, Request, get -                from litestar_session import SessionAuth - - -                async def retrieve_user_from_session(session: dict[str, Any]) -> Any: -                    # implement logic here to retrieve a ``user`` datum given the session dictionary -                    ... - - -                session_auth_config = SessionAuth( -                    secret=urandom(16), retrieve_user_handler=retrieve_user_from_session -                ) - - -                @get("/") -                def my_handler(request: Request) -> None: ... - - -                app = Litestar(route_handlers=[my_handler], middleware=[session_auth_config.middleware]) - - -        Returns: -            An instance of DefineMiddleware including ``self`` as the config kwarg value. -        """ -        return DefineMiddleware(MiddlewareWrapper, config=self) - -    @property -    def session_backend(self) -> BaseSessionBackendT: -        """Create a session backend. - -        Returns: -            A subclass of :class:`BaseSessionBackend <litestar.middleware.session.base.BaseSessionBackend>` -        """ -        return self.session_backend_config._backend_class(config=self.session_backend_config)  # pyright: ignore - -    @property -    def openapi_components(self) -> Components: -        """Create OpenAPI documentation for the Session Authentication schema used. - -        Returns: -            An :class:`Components <litestar.openapi.spec.components.Components>` instance. -        """ -        return Components( -            security_schemes={ -                "sessionCookie": SecurityScheme( -                    type="apiKey", -                    name=self.session_backend_config.key, -                    security_scheme_in="cookie",  # pyright: ignore -                    description="Session cookie authentication.", -                ) -            } -        ) - -    @property -    def security_requirement(self) -> SecurityRequirement: -        """Return OpenAPI 3.1. - -        :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` for the auth -        backend. - -        Returns: -            An OpenAPI 3.1 :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` dictionary. -        """ -        return {"sessionCookie": []} diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/middleware.py b/venv/lib/python3.11/site-packages/litestar/security/session_auth/middleware.py deleted file mode 100644 index bb3fce4..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/session_auth/middleware.py +++ /dev/null @@ -1,125 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING, Any, Awaitable, Callable, Sequence - -from litestar.exceptions import NotAuthorizedException -from litestar.middleware.authentication import ( -    AbstractAuthenticationMiddleware, -    AuthenticationResult, -) -from litestar.middleware.exceptions import ExceptionHandlerMiddleware -from litestar.types import Empty, Method, Scopes - -__all__ = ("MiddlewareWrapper", "SessionAuthMiddleware") - -if TYPE_CHECKING: -    from litestar.connection import ASGIConnection -    from litestar.security.session_auth.auth import SessionAuth -    from litestar.types import ASGIApp, Receive, Scope, Send - - -class MiddlewareWrapper: -    """Wrapper class that serves as the middleware entry point.""" - -    def __init__(self, app: ASGIApp, config: SessionAuth[Any, Any]) -> None: -        """Wrap the SessionAuthMiddleware inside ExceptionHandlerMiddleware, and it wraps this inside SessionMiddleware. -        This allows the auth middleware to raise exceptions and still have the response handled, while having the -        session cleared. - -        Args: -            app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack. -            config: An instance of SessionAuth. -        """ -        self.app = app -        self.config = config -        self.has_wrapped_middleware = False - -    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: -        """Handle creating a middleware stack and calling it. - -        Args: -            scope: The ASGI connection scope. -            receive: The ASGI receive function. -            send: The ASGI send function. - -        Returns: -            None -        """ -        if not self.has_wrapped_middleware: -            litestar_app = scope["app"] -            auth_middleware = self.config.authentication_middleware_class( -                app=self.app, -                exclude=self.config.exclude, -                exclude_http_methods=self.config.exclude_http_methods, -                exclude_opt_key=self.config.exclude_opt_key, -                scopes=self.config.scopes, -                retrieve_user_handler=self.config.retrieve_user_handler,  # type: ignore[arg-type] -            ) -            exception_middleware = ExceptionHandlerMiddleware( -                app=auth_middleware, -                exception_handlers=litestar_app.exception_handlers or {},  # pyright: ignore -                debug=None, -            ) -            self.app = self.config.session_backend_config.middleware.middleware( -                app=exception_middleware, -                backend=self.config.session_backend, -            ) -            self.has_wrapped_middleware = True -        await self.app(scope, receive, send) - - -class SessionAuthMiddleware(AbstractAuthenticationMiddleware): -    """Session Authentication Middleware.""" - -    def __init__( -        self, -        app: ASGIApp, -        exclude: str | list[str] | None, -        exclude_http_methods: Sequence[Method] | None, -        exclude_opt_key: str, -        retrieve_user_handler: Callable[[dict[str, Any], ASGIConnection[Any, Any, Any, Any]], Awaitable[Any]], -        scopes: Scopes | None, -    ) -> None: -        """Session based authentication middleware. - -        Args: -            app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack. -            exclude: A pattern or list of patterns to skip in the authentication middleware. -            exclude_http_methods: A sequence of http methods that do not require authentication. -            exclude_opt_key: An identifier to use on routes to disable authentication and authorization checks for a particular route. -            scopes: ASGI scopes processed by the authentication middleware. -            retrieve_user_handler: Callable that receives the ``session`` value from the authentication middleware and returns a ``user`` value. -        """ -        super().__init__( -            app=app, -            exclude=exclude, -            exclude_from_auth_key=exclude_opt_key, -            exclude_http_methods=exclude_http_methods, -            scopes=scopes, -        ) -        self.retrieve_user_handler = retrieve_user_handler - -    async def authenticate_request(self, connection: ASGIConnection[Any, Any, Any, Any]) -> AuthenticationResult: -        """Authenticate an incoming connection. - -        Args: -            connection: An :class:`ASGIConnection <.connection.ASGIConnection>` instance. - -        Raises: -            NotAuthorizedException: if session data is empty or user is not found. - -        Returns: -            :class:`AuthenticationResult <.middleware.authentication.AuthenticationResult>` -        """ -        if not connection.session or connection.scope["session"] is Empty: -            # the assignment of 'Empty' forces the session middleware to clear session data. -            connection.scope["session"] = Empty -            raise NotAuthorizedException("no session data found") - -        user = await self.retrieve_user_handler(connection.session, connection) - -        if not user: -            connection.scope["session"] = Empty -            raise NotAuthorizedException("no user correlating to session found") - -        return AuthenticationResult(user=user, auth=connection.session) | 
