diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/security/jwt')
| -rw-r--r-- | venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py | 23 | ||||
| -rw-r--r-- | venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc | bin | 0 -> 733 bytes | |||
| -rw-r--r-- | venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc | bin | 0 -> 26027 bytes | |||
| -rw-r--r-- | venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc | bin | 0 -> 8900 bytes | |||
| -rw-r--r-- | venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc | bin | 0 -> 7141 bytes | |||
| -rw-r--r-- | venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py | 691 | ||||
| -rw-r--r-- | venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py | 188 | ||||
| -rw-r--r-- | venv/lib/python3.11/site-packages/litestar/security/jwt/token.py | 119 | 
8 files changed, 1021 insertions, 0 deletions
| diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py new file mode 100644 index 0000000..4fd88de --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py @@ -0,0 +1,23 @@ +from litestar.security.jwt.auth import ( +    BaseJWTAuth, +    JWTAuth, +    JWTCookieAuth, +    OAuth2Login, +    OAuth2PasswordBearerAuth, +) +from litestar.security.jwt.middleware import ( +    JWTAuthenticationMiddleware, +    JWTCookieAuthenticationMiddleware, +) +from litestar.security.jwt.token import Token + +__all__ = ( +    "BaseJWTAuth", +    "JWTAuth", +    "JWTAuthenticationMiddleware", +    "JWTCookieAuth", +    "JWTCookieAuthenticationMiddleware", +    "OAuth2Login", +    "OAuth2PasswordBearerAuth", +    "Token", +) diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pycBinary files differ new file mode 100644 index 0000000..f04d57f --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pycBinary files differ new file mode 100644 index 0000000..cec42c0 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pycBinary files differ new file mode 100644 index 0000000..8d5603e --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pycBinary files differ new file mode 100644 index 0000000..b4f8c45 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py new file mode 100644 index 0000000..2a0f094 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py @@ -0,0 +1,691 @@ +from __future__ import annotations + +from dataclasses import asdict, dataclass, field +from datetime import datetime, timedelta, timezone +from typing import TYPE_CHECKING, Any, Callable, Generic, Iterable, Literal, Sequence, TypeVar, cast + +from litestar.datastructures import Cookie +from litestar.enums import MediaType +from litestar.middleware import DefineMiddleware +from litestar.openapi.spec import Components, OAuthFlow, OAuthFlows, SecurityRequirement, SecurityScheme +from litestar.security.base import AbstractSecurityConfig +from litestar.security.jwt.middleware import JWTAuthenticationMiddleware, JWTCookieAuthenticationMiddleware +from litestar.security.jwt.token import Token +from litestar.status_codes import HTTP_201_CREATED +from litestar.types import ControllerRouterHandler, Empty, Guard, Method, Scopes, SyncOrAsyncUnion, TypeEncodersMap + +__all__ = ("BaseJWTAuth", "JWTAuth", "JWTCookieAuth", "OAuth2Login", "OAuth2PasswordBearerAuth") + + +if TYPE_CHECKING: +    from litestar import Response +    from litestar.connection import ASGIConnection +    from litestar.di import Provide + + +UserType = TypeVar("UserType") + + +class BaseJWTAuth(Generic[UserType], AbstractSecurityConfig[UserType, Token]): +    """Base class for JWT Auth backends""" + +    token_secret: str +    """Key with which to generate the token hash. + +    Notes: +        - This value should be kept as a secret and the standard practice is to inject it into the environment. +    """ +    retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] +    """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. + +    Notes: +        - User and Auth can be any arbitrary values specified by the security backend. +        - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. +          Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. +        - The callable can be sync or async. If it is sync, it will be wrapped to support async. + +    """ +    algorithm: str +    """Algorithm to use for JWT hashing.""" +    auth_header: str +    """Request header key from which to retrieve the token. + +    E.g. ``Authorization`` or ``X-Api-Key``. +    """ +    default_token_expiration: timedelta +    """The default value for token expiration.""" +    openapi_security_scheme_name: str +    """The value to use for the OpenAPI security scheme and security requirements.""" +    description: str +    """Description for the OpenAPI security scheme.""" +    authentication_middleware_class: type[JWTAuthenticationMiddleware]  # pyright: ignore +    """The authentication middleware class to use. + +    Must inherit from :class:`JWTAuthenticationMiddleware` +    """ + +    @property +    def openapi_components(self) -> Components: +        """Create OpenAPI documentation for the JWT auth schema used. + +        Returns: +            An :class:`Components <litestar.openapi.spec.components.Components>` instance. +        """ +        return Components( +            security_schemes={ +                self.openapi_security_scheme_name: SecurityScheme( +                    type="http", +                    scheme="Bearer", +                    name=self.auth_header, +                    bearer_format="JWT", +                    description=self.description, +                ) +            } +        ) + +    @property +    def security_requirement(self) -> SecurityRequirement: +        """Return OpenAPI 3.1. + +        :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` + +        Returns: +            An OpenAPI 3.1 +            :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` +            dictionary. +        """ +        return {self.openapi_security_scheme_name: []} + +    @property +    def middleware(self) -> DefineMiddleware: +        """Create :class:`JWTAuthenticationMiddleware` wrapped in +        :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. + +        Returns: +            An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. +        """ +        return DefineMiddleware( +            self.authentication_middleware_class, +            algorithm=self.algorithm, +            auth_header=self.auth_header, +            exclude=self.exclude, +            exclude_opt_key=self.exclude_opt_key, +            exclude_http_methods=self.exclude_http_methods, +            retrieve_user_handler=self.retrieve_user_handler, +            scopes=self.scopes, +            token_secret=self.token_secret, +        ) + +    def login( +        self, +        identifier: str, +        *, +        response_body: Any = Empty, +        response_media_type: str | MediaType = MediaType.JSON, +        response_status_code: int = HTTP_201_CREATED, +        token_expiration: timedelta | None = None, +        token_issuer: str | None = None, +        token_audience: str | None = None, +        token_unique_jwt_id: str | None = None, +        token_extras: dict[str, Any] | None = None, +        send_token_as_response_body: bool = False, +    ) -> Response[Any]: +        """Create a response with a JWT header. + +        Args: +            identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value. +            response_body: An optional response body to send. +            response_media_type: An optional ``Content-Type``. Defaults to ``application/json``. +            response_status_code: An optional status code for the response. Defaults to ``201``. +            token_expiration: An optional timedelta for the token expiration. +            token_issuer: An optional value of the token ``iss`` field. +            token_audience: An optional value for the token ``aud`` field. +            token_unique_jwt_id: An optional value for the token ``jti`` field. +            token_extras: An optional dictionary to include in the token ``extras`` field. +            send_token_as_response_body: If ``True`` the response will be a dict including the token: ``{ "token": <token> }`` +                will be returned as the response body. Note: if a response body is passed this setting will be ignored. + +        Returns: +            A :class:`Response <.response.Response>` instance. +        """ +        encoded_token = self.create_token( +            identifier=identifier, +            token_expiration=token_expiration, +            token_issuer=token_issuer, +            token_audience=token_audience, +            token_unique_jwt_id=token_unique_jwt_id, +            token_extras=token_extras, +        ) + +        if response_body is not Empty: +            body = response_body +        elif send_token_as_response_body: +            body = {"token": encoded_token} +        else: +            body = None + +        return self.create_response( +            content=body, +            headers={self.auth_header: self.format_auth_header(encoded_token)}, +            media_type=response_media_type, +            status_code=response_status_code, +        ) + +    def create_token( +        self, +        identifier: str, +        token_expiration: timedelta | None = None, +        token_issuer: str | None = None, +        token_audience: str | None = None, +        token_unique_jwt_id: str | None = None, +        token_extras: dict | None = None, +    ) -> str: +        """Create a Token instance from the passed in parameters, persists and returns it. + +        Args: +            identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value. +            token_expiration: An optional timedelta for the token expiration. +            token_issuer: An optional value of the token ``iss`` field. +            token_audience: An optional value for the token ``aud`` field. +            token_unique_jwt_id: An optional value for the token ``jti`` field. +            token_extras: An optional dictionary to include in the token ``extras`` field. + +        Returns: +            The created token. +        """ +        token = Token( +            sub=identifier, +            exp=(datetime.now(timezone.utc) + (token_expiration or self.default_token_expiration)), +            iss=token_issuer, +            aud=token_audience, +            jti=token_unique_jwt_id, +            extras=token_extras or {}, +        ) +        return token.encode(secret=self.token_secret, algorithm=self.algorithm) + +    def format_auth_header(self, encoded_token: str) -> str: +        """Format a token according to the specified OpenAPI scheme. + +        Args: +            encoded_token: An encoded JWT token + +        Returns: +            The encoded token formatted for the HTTP headers +        """ +        security = self.openapi_components.security_schemes.get(self.openapi_security_scheme_name, None)  # type: ignore[union-attr] +        return f"{security.scheme} {encoded_token}" if isinstance(security, SecurityScheme) else encoded_token + + +@dataclass +class JWTAuth(Generic[UserType], BaseJWTAuth[UserType]): +    """JWT Authentication Configuration. + +    This class is the main entry point to the library, and it includes methods to create the middleware, provide login +    functionality, and create OpenAPI documentation. +    """ + +    token_secret: str +    """Key with which to generate the token hash. + +    Notes: +        - This value should be kept as a secret and the standard practice is to inject it into the environment. +    """ +    retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] +    """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. + +    Notes: +        - User and Auth can be any arbitrary values specified by the security backend. +        - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. +          Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. +        - The callable can be sync or async. If it is sync, it will be wrapped to support async. + +    """ +    guards: Iterable[Guard] | None = field(default=None) +    """An iterable of guards to call for requests, providing authorization functionalities.""" +    exclude: str | list[str] | None = field(default=None) +    """A pattern or list of patterns to skip in the authentication middleware.""" +    exclude_opt_key: str = field(default="exclude_from_auth") +    """An identifier to use on routes to disable authentication and authorization checks for a particular route.""" +    exclude_http_methods: Sequence[Method] | None = field( +        default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"]) +    ) +    """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']""" +    scopes: Scopes | None = field(default=None) +    """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be +    processed.""" +    route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None) +    """An optional iterable of route handlers to register.""" +    dependencies: dict[str, Provide] | None = field(default=None) +    """An optional dictionary of dependency providers.""" + +    type_encoders: TypeEncodersMap | None = field(default=None) +    """A mapping of types to callables that transform them into types supported for serialization.""" + +    algorithm: str = field(default="HS256") +    """Algorithm to use for JWT hashing.""" +    auth_header: str = field(default="Authorization") +    """Request header key from which to retrieve the token. + +    E.g. ``Authorization`` or ``X-Api-Key``. +    """ +    default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1)) +    """The default value for token expiration.""" +    openapi_security_scheme_name: str = field(default="BearerToken") +    """The value to use for the OpenAPI security scheme and security requirements.""" +    description: str = field(default="JWT api-key authentication and authorization.") +    """Description for the OpenAPI security scheme.""" +    authentication_middleware_class: type[JWTAuthenticationMiddleware] = field(default=JWTAuthenticationMiddleware) +    """The authentication middleware class to use. + +    Must inherit from :class:`JWTAuthenticationMiddleware` +    """ + + +@dataclass +class JWTCookieAuth(Generic[UserType], BaseJWTAuth[UserType]): +    """JWT Cookie Authentication Configuration. + +    This class is an alternate entry point to the library, and it includes all the functionality of the :class:`JWTAuth` +    class and adds support for passing JWT tokens ``HttpOnly`` cookies. +    """ + +    token_secret: str +    """Key with which to generate the token hash. + +    Notes: +        - This value should be kept as a secret and the standard practice is to inject it into the environment. +    """ +    retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] +    """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. + +    Notes: +        - User and Auth can be any arbitrary values specified by the security backend. +        - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. +          Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. +        - The callable can be sync or async. If it is sync, it will be wrapped to support async. + +    """ +    guards: Iterable[Guard] | None = field(default=None) +    """An iterable of guards to call for requests, providing authorization functionalities.""" +    exclude: str | list[str] | None = field(default=None) +    """A pattern or list of patterns to skip in the authentication middleware.""" +    exclude_opt_key: str = field(default="exclude_from_auth") +    """An identifier to use on routes to disable authentication and authorization checks for a particular route.""" +    scopes: Scopes | None = field(default=None) +    """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be +    processed.""" +    exclude_http_methods: Sequence[Method] | None = field( +        default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"]) +    ) +    """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']""" +    route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None) +    """An optional iterable of route handlers to register.""" +    dependencies: dict[str, Provide] | None = field(default=None) +    """An optional dictionary of dependency providers.""" + +    type_encoders: TypeEncodersMap | None = field(default=None) +    """A mapping of types to callables that transform them into types supported for serialization.""" + +    algorithm: str = field(default="HS256") +    """Algorithm to use for JWT hashing.""" +    auth_header: str = field(default="Authorization") +    """Request header key from which to retrieve the token. + +    E.g. ``Authorization`` or ``X-Api-Key``. +    """ +    default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1)) +    """The default value for token expiration.""" +    openapi_security_scheme_name: str = field(default="BearerToken") +    """The value to use for the OpenAPI security scheme and security requirements.""" +    key: str = field(default="token") +    """Key for the cookie.""" +    path: str = field(default="/") +    """Path fragment that must exist in the request url for the cookie to be valid. + +    Defaults to ``/``. +    """ +    domain: str | None = field(default=None) +    """Domain for which the cookie is valid.""" +    secure: bool | None = field(default=None) +    """Https is required for the cookie.""" +    samesite: Literal["lax", "strict", "none"] = field(default="lax") +    """Controls whether or not a cookie is sent with cross-site requests. Defaults to ``lax``. """ +    description: str = field(default="JWT cookie-based authentication and authorization.") +    """Description for the OpenAPI security scheme.""" +    authentication_middleware_class: type[JWTCookieAuthenticationMiddleware] = field(  # pyright: ignore +        default=JWTCookieAuthenticationMiddleware +    ) +    """The authentication middleware class to use. Must inherit from :class:`JWTCookieAuthenticationMiddleware` +    """ + +    @property +    def openapi_components(self) -> Components: +        """Create OpenAPI documentation for the JWT Cookie auth scheme. + +        Returns: +            A :class:`Components <litestar.openapi.spec.components.Components>` instance. +        """ +        return Components( +            security_schemes={ +                self.openapi_security_scheme_name: SecurityScheme( +                    type="http", +                    scheme="Bearer", +                    name=self.key, +                    security_scheme_in="cookie", +                    bearer_format="JWT", +                    description=self.description, +                ) +            } +        ) + +    @property +    def middleware(self) -> DefineMiddleware: +        """Create :class:`JWTCookieAuthenticationMiddleware` wrapped in +            :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. + +        Returns: +            An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. +        """ +        return DefineMiddleware( +            self.authentication_middleware_class, +            algorithm=self.algorithm, +            auth_cookie_key=self.key, +            auth_header=self.auth_header, +            exclude=self.exclude, +            exclude_opt_key=self.exclude_opt_key, +            exclude_http_methods=self.exclude_http_methods, +            retrieve_user_handler=self.retrieve_user_handler, +            scopes=self.scopes, +            token_secret=self.token_secret, +        ) + +    def login( +        self, +        identifier: str, +        *, +        response_body: Any = Empty, +        response_media_type: str | MediaType = MediaType.JSON, +        response_status_code: int = HTTP_201_CREATED, +        token_expiration: timedelta | None = None, +        token_issuer: str | None = None, +        token_audience: str | None = None, +        token_unique_jwt_id: str | None = None, +        token_extras: dict[str, Any] | None = None, +        send_token_as_response_body: bool = False, +    ) -> Response[Any]: +        """Create a response with a JWT header. + +        Args: +            identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value. +            response_body: An optional response body to send. +            response_media_type: An optional 'Content-Type'. Defaults to 'application/json'. +            response_status_code: An optional status code for the response. Defaults to '201 Created'. +            token_expiration: An optional timedelta for the token expiration. +            token_issuer: An optional value of the token ``iss`` field. +            token_audience: An optional value for the token ``aud`` field. +            token_unique_jwt_id: An optional value for the token ``jti`` field. +            token_extras: An optional dictionary to include in the token ``extras`` field. +            send_token_as_response_body: If ``True`` the response will be a dict including the token: ``{ "token": <token> }`` +                will be returned as the response body. Note: if a response body is passed this setting will be ignored. + +        Returns: +            A :class:`Response <.response.Response>` instance. +        """ + +        encoded_token = self.create_token( +            identifier=identifier, +            token_expiration=token_expiration, +            token_issuer=token_issuer, +            token_audience=token_audience, +            token_unique_jwt_id=token_unique_jwt_id, +            token_extras=token_extras, +        ) +        cookie = Cookie( +            key=self.key, +            path=self.path, +            httponly=True, +            value=self.format_auth_header(encoded_token), +            max_age=int((token_expiration or self.default_token_expiration).total_seconds()), +            secure=self.secure, +            samesite=self.samesite, +            domain=self.domain, +        ) + +        if response_body is not Empty: +            body = response_body +        elif send_token_as_response_body: +            body = {"token": encoded_token} +        else: +            body = None + +        return self.create_response( +            content=body, +            headers={self.auth_header: self.format_auth_header(encoded_token)}, +            cookies=[cookie], +            media_type=response_media_type, +            status_code=response_status_code, +        ) + + +@dataclass +class OAuth2Login: +    """OAuth2 Login DTO""" + +    access_token: str +    """Valid JWT access token""" +    token_type: str +    """Type of the OAuth token used""" +    refresh_token: str | None = field(default=None) +    """Optional valid refresh token JWT""" +    expires_in: int | None = field(default=None) +    """Expiration time of the token in seconds. """ + + +@dataclass +class OAuth2PasswordBearerAuth(Generic[UserType], BaseJWTAuth[UserType]): +    """OAUTH2 Schema for Password Bearer Authentication. + +    This class implements an OAUTH2 authentication flow entry point to the library, and it includes all the +    functionality of the :class:`JWTAuth` class and adds support for passing JWT tokens ``HttpOnly`` cookies. + +    ``token_url`` is the only additional argument that is required, and it should point at your login route +    """ + +    token_secret: str +    """Key with which to generate the token hash. + +    Notes: +        - This value should be kept as a secret and the standard practice is to inject it into the environment. +    """ +    token_url: str +    """The URL for retrieving a new token.""" +    retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] +    """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. + +    Notes: +        - User and Auth can be any arbitrary values specified by the security backend. +        - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. +          Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. +        - The callable can be sync or async. If it is sync, it will be wrapped to support async. + +    """ +    guards: Iterable[Guard] | None = field(default=None) +    """An iterable of guards to call for requests, providing authorization functionalities.""" +    exclude: str | list[str] | None = field(default=None) +    """A pattern or list of patterns to skip in the authentication middleware.""" +    exclude_opt_key: str = field(default="exclude_from_auth") +    """An identifier to use on routes to disable authentication and authorization checks for a particular route.""" +    exclude_http_methods: Sequence[Method] | None = field( +        default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"]) +    ) +    """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']""" +    scopes: Scopes | None = field(default=None) +    """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be +    processed.""" +    route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None) +    """An optional iterable of route handlers to register.""" +    dependencies: dict[str, Provide] | None = field(default=None) +    """An optional dictionary of dependency providers.""" +    type_encoders: TypeEncodersMap | None = field(default=None) +    """A mapping of types to callables that transform them into types supported for serialization.""" +    algorithm: str = field(default="HS256") +    """Algorithm to use for JWT hashing.""" +    auth_header: str = field(default="Authorization") +    """Request header key from which to retrieve the token. + +    E.g. ``Authorization`` or 'X-Api-Key'. +    """ +    default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1)) +    """The default value for token expiration.""" +    openapi_security_scheme_name: str = field(default="BearerToken") +    """The value to use for the OpenAPI security scheme and security requirements.""" +    oauth_scopes: dict[str, str] | None = field(default=None) +    """Oauth Scopes available for the token.""" +    key: str = field(default="token") +    """Key for the cookie.""" +    path: str = field(default="/") +    """Path fragment that must exist in the request url for the cookie to be valid. + +    Defaults to ``/``. +    """ +    domain: str | None = field(default=None) +    """Domain for which the cookie is valid.""" +    secure: bool | None = field(default=None) +    """Https is required for the cookie.""" +    samesite: Literal["lax", "strict", "none"] = field(default="lax") +    """Controls whether or not a cookie is sent with cross-site requests. Defaults to ``lax``. """ +    description: str = field(default="OAUTH2 password bearer authentication and authorization.") +    """Description for the OpenAPI security scheme.""" +    authentication_middleware_class: type[JWTCookieAuthenticationMiddleware] = field(  # pyright: ignore +        default=JWTCookieAuthenticationMiddleware +    ) +    """The authentication middleware class to use. + +    Must inherit from :class:`JWTCookieAuthenticationMiddleware` +    """ + +    @property +    def middleware(self) -> DefineMiddleware: +        """Create ``JWTCookieAuthenticationMiddleware`` wrapped in +            :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. + +        Returns: +            An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. +        """ +        return DefineMiddleware( +            self.authentication_middleware_class, +            algorithm=self.algorithm, +            auth_cookie_key=self.key, +            auth_header=self.auth_header, +            exclude=self.exclude, +            exclude_opt_key=self.exclude_opt_key, +            exclude_http_methods=self.exclude_http_methods, +            retrieve_user_handler=self.retrieve_user_handler, +            scopes=self.scopes, +            token_secret=self.token_secret, +        ) + +    @property +    def oauth_flow(self) -> OAuthFlow: +        """Create an OpenAPI OAuth2 flow for the password bearer authentication scheme. + +        Returns: +            An :class:`OAuthFlow <litestar.openapi.spec.oauth_flow.OAuthFlow>` instance. +        """ +        return OAuthFlow( +            token_url=self.token_url, +            scopes=self.oauth_scopes, +        ) + +    @property +    def openapi_components(self) -> Components: +        """Create OpenAPI documentation for the OAUTH2 Password bearer auth scheme. + +        Returns: +            An :class:`Components <litestar.openapi.spec.components.Components>` instance. +        """ +        return Components( +            security_schemes={ +                self.openapi_security_scheme_name: SecurityScheme( +                    type="oauth2", +                    scheme="Bearer", +                    name=self.auth_header, +                    security_scheme_in="header", +                    flows=OAuthFlows(password=self.oauth_flow),  # pyright: ignore[reportGeneralTypeIssues] +                    bearer_format="JWT", +                    description=self.description, +                ) +            } +        ) + +    def login( +        self, +        identifier: str, +        *, +        response_body: Any = Empty, +        response_media_type: str | MediaType = MediaType.JSON, +        response_status_code: int = HTTP_201_CREATED, +        token_expiration: timedelta | None = None, +        token_issuer: str | None = None, +        token_audience: str | None = None, +        token_unique_jwt_id: str | None = None, +        token_extras: dict[str, Any] | None = None, +        send_token_as_response_body: bool = True, +    ) -> Response[Any]: +        """Create a response with a JWT header. + +        Args: +            identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value. +            response_body: An optional response body to send. +            response_media_type: An optional ``Content-Type``. Defaults to ``application/json``. +            response_status_code: An optional status code for the response. Defaults to ``201``. +            token_expiration: An optional timedelta for the token expiration. +            token_issuer: An optional value of the token ``iss`` field. +            token_audience: An optional value for the token ``aud`` field. +            token_unique_jwt_id: An optional value for the token ``jti`` field. +            token_extras: An optional dictionary to include in the token ``extras`` field. +            send_token_as_response_body: If ``True`` the response will be an oAuth2 token response dict. +                Note: if a response body is passed this setting will be ignored. + +        Returns: +            A :class:`Response <.response.Response>` instance. +        """ +        encoded_token = self.create_token( +            identifier=identifier, +            token_expiration=token_expiration, +            token_issuer=token_issuer, +            token_audience=token_audience, +            token_unique_jwt_id=token_unique_jwt_id, +            token_extras=token_extras, +        ) +        expires_in = int((token_expiration or self.default_token_expiration).total_seconds()) +        cookie = Cookie( +            key=self.key, +            path=self.path, +            httponly=True, +            value=self.format_auth_header(encoded_token), +            max_age=expires_in, +            secure=self.secure, +            samesite=self.samesite, +            domain=self.domain, +        ) + +        if response_body is not Empty: +            body = response_body +        elif send_token_as_response_body: +            token_dto = OAuth2Login( +                access_token=encoded_token, +                expires_in=expires_in, +                token_type="bearer",  # noqa: S106 +            ) +            body = asdict(token_dto) +        else: +            body = None + +        return self.create_response( +            content=body, +            headers={self.auth_header: self.format_auth_header(encoded_token)}, +            cookies=[cookie], +            media_type=response_media_type, +            status_code=response_status_code, +        ) diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py new file mode 100644 index 0000000..84326da --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py @@ -0,0 +1,188 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Awaitable, Callable, Sequence + +from litestar.exceptions import NotAuthorizedException +from litestar.middleware.authentication import ( +    AbstractAuthenticationMiddleware, +    AuthenticationResult, +) +from litestar.security.jwt.token import Token + +__all__ = ("JWTAuthenticationMiddleware", "JWTCookieAuthenticationMiddleware") + + +if TYPE_CHECKING: +    from typing import Any + +    from litestar.connection import ASGIConnection +    from litestar.types import ASGIApp, Method, Scopes + + +class JWTAuthenticationMiddleware(AbstractAuthenticationMiddleware): +    """JWT Authentication middleware. + +    This class provides JWT authentication functionalities. +    """ + +    __slots__ = ( +        "algorithm", +        "auth_header", +        "retrieve_user_handler", +        "token_secret", +    ) + +    def __init__( +        self, +        algorithm: str, +        app: ASGIApp, +        auth_header: str, +        exclude: str | list[str] | None, +        exclude_http_methods: Sequence[Method] | None, +        exclude_opt_key: str, +        retrieve_user_handler: Callable[[Token, ASGIConnection[Any, Any, Any, Any]], Awaitable[Any]], +        scopes: Scopes, +        token_secret: str, +    ) -> None: +        """Check incoming requests for an encoded token in the auth header specified, and if present retrieve the user +        from persistence using the provided function. + +        Args: +            algorithm: JWT hashing algorithm to use. +            app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack. +            auth_header: Request header key from which to retrieve the token. E.g. ``Authorization`` or ``X-Api-Key``. +            exclude: A pattern or list of patterns to skip. +            exclude_opt_key: An identifier to use on routes to disable authentication for a particular route. +            exclude_http_methods: A sequence of http methods that do not require authentication. +            retrieve_user_handler: A function that receives a :class:`Token <.security.jwt.Token>` and returns a user, +                which can be any arbitrary value. +            scopes: ASGI scopes processed by the authentication middleware. +            token_secret: Secret for decoding the JWT token. This value should be equivalent to the secret used to +                encode it. +        """ +        super().__init__( +            app=app, +            exclude=exclude, +            exclude_from_auth_key=exclude_opt_key, +            exclude_http_methods=exclude_http_methods, +            scopes=scopes, +        ) +        self.algorithm = algorithm +        self.auth_header = auth_header +        self.retrieve_user_handler = retrieve_user_handler +        self.token_secret = token_secret + +    async def authenticate_request(self, connection: ASGIConnection[Any, Any, Any, Any]) -> AuthenticationResult: +        """Given an HTTP Connection, parse the JWT api key stored in the header and retrieve the user correlating to the +        token from the DB. + +        Args: +            connection: An Litestar HTTPConnection instance. + +        Returns: +            AuthenticationResult + +        Raises: +            NotAuthorizedException: If token is invalid or user is not found. +        """ +        auth_header = connection.headers.get(self.auth_header) +        if not auth_header: +            raise NotAuthorizedException("No JWT token found in request header") +        encoded_token = auth_header.partition(" ")[-1] +        return await self.authenticate_token(encoded_token=encoded_token, connection=connection) + +    async def authenticate_token( +        self, encoded_token: str, connection: ASGIConnection[Any, Any, Any, Any] +    ) -> AuthenticationResult: +        """Given an encoded JWT token, parse, validate and look up sub within token. + +        Args: +            encoded_token: Encoded JWT token. +            connection: An ASGI connection instance. + +        Raises: +            NotAuthorizedException: If token is invalid or user is not found. + +        Returns: +            AuthenticationResult +        """ +        token = Token.decode( +            encoded_token=encoded_token, +            secret=self.token_secret, +            algorithm=self.algorithm, +        ) + +        user = await self.retrieve_user_handler(token, connection) + +        if not user: +            raise NotAuthorizedException() + +        return AuthenticationResult(user=user, auth=token) + + +class JWTCookieAuthenticationMiddleware(JWTAuthenticationMiddleware): +    """Cookie based JWT authentication middleware.""" + +    __slots__ = ("auth_cookie_key",) + +    def __init__( +        self, +        algorithm: str, +        app: ASGIApp, +        auth_cookie_key: str, +        auth_header: str, +        exclude: str | list[str] | None, +        exclude_opt_key: str, +        exclude_http_methods: Sequence[Method] | None, +        retrieve_user_handler: Callable[[Token, ASGIConnection[Any, Any, Any, Any]], Awaitable[Any]], +        scopes: Scopes, +        token_secret: str, +    ) -> None: +        """Check incoming requests for an encoded token in the auth header or cookie name specified, and if present +        retrieves the user from persistence using the provided function. + +        Args: +            algorithm: JWT hashing algorithm to use. +            app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack. +            auth_cookie_key: Cookie name from which to retrieve the token. E.g. ``token`` or ``accessToken``. +            auth_header: Request header key from which to retrieve the token. E.g. ``Authorization`` or ``X-Api-Key``. +            exclude: A pattern or list of patterns to skip. +            exclude_opt_key: An identifier to use on routes to disable authentication for a particular route. +            exclude_http_methods: A sequence of http methods that do not require authentication. +            retrieve_user_handler: A function that receives a :class:`Token <.security.jwt.Token>` and returns a user, +                which can be any arbitrary value. +            scopes: ASGI scopes processed by the authentication middleware. +            token_secret: Secret for decoding the JWT token. This value should be equivalent to the secret used to +                encode it. +        """ +        super().__init__( +            algorithm=algorithm, +            app=app, +            auth_header=auth_header, +            exclude=exclude, +            exclude_http_methods=exclude_http_methods, +            exclude_opt_key=exclude_opt_key, +            retrieve_user_handler=retrieve_user_handler, +            scopes=scopes, +            token_secret=token_secret, +        ) +        self.auth_cookie_key = auth_cookie_key + +    async def authenticate_request(self, connection: ASGIConnection[Any, Any, Any, Any]) -> AuthenticationResult: +        """Given an HTTP Connection, parse the JWT api key stored in the header and retrieve the user correlating to the +        token from the DB. + +        Args: +            connection: An Litestar HTTPConnection instance. + +        Raises: +            NotAuthorizedException: If token is invalid or user is not found. + +        Returns: +            AuthenticationResult +        """ +        auth_header = connection.headers.get(self.auth_header) or connection.cookies.get(self.auth_cookie_key) +        if not auth_header: +            raise NotAuthorizedException("No JWT token found in request header or cookies") +        encoded_token = auth_header.partition(" ")[-1] +        return await self.authenticate_token(encoded_token=encoded_token, connection=connection) diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py new file mode 100644 index 0000000..279111a --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py @@ -0,0 +1,119 @@ +from __future__ import annotations + +import dataclasses +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Any + +from jose import JWSError, JWTError, jwt + +from litestar.exceptions import ImproperlyConfiguredException, NotAuthorizedException + +if TYPE_CHECKING: +    from typing_extensions import Self + + +__all__ = ("Token",) + + +def _normalize_datetime(value: datetime) -> datetime: +    """Convert the given value into UTC and strip microseconds. + +    Args: +        value: A datetime instance + +    Returns: +        A datetime instance +    """ +    if value.tzinfo is not None: +        value.astimezone(timezone.utc) + +    return value.replace(microsecond=0) + + +@dataclass +class Token: +    """JWT Token DTO.""" + +    exp: datetime +    """Expiration - datetime for token expiration.""" +    sub: str +    """Subject - usually a unique identifier of the user or equivalent entity.""" +    iat: datetime = field(default_factory=lambda: _normalize_datetime(datetime.now(timezone.utc))) +    """Issued at - should always be current now.""" +    iss: str | None = field(default=None) +    """Issuer - optional unique identifier for the issuer.""" +    aud: str | None = field(default=None) +    """Audience - intended audience.""" +    jti: str | None = field(default=None) +    """JWT ID - a unique identifier of the JWT between different issuers.""" +    extras: dict[str, Any] = field(default_factory=dict) +    """Extra fields that were found on the JWT token.""" + +    def __post_init__(self) -> None: +        if len(self.sub) < 1: +            raise ImproperlyConfiguredException("sub must be a string with a length greater than 0") + +        if isinstance(self.exp, datetime) and ( +            (exp := _normalize_datetime(self.exp)).timestamp() +            >= _normalize_datetime(datetime.now(timezone.utc)).timestamp() +        ): +            self.exp = exp +        else: +            raise ImproperlyConfiguredException("exp value must be a datetime in the future") + +        if isinstance(self.iat, datetime) and ( +            (iat := _normalize_datetime(self.iat)).timestamp() +            <= _normalize_datetime(datetime.now(timezone.utc)).timestamp() +        ): +            self.iat = iat +        else: +            raise ImproperlyConfiguredException("iat must be a current or past time") + +    @classmethod +    def decode(cls, encoded_token: str, secret: str | dict[str, str], algorithm: str) -> Self: +        """Decode a passed in token string and returns a Token instance. + +        Args: +            encoded_token: A base64 string containing an encoded JWT. +            secret: The secret with which the JWT is encoded. It may optionally be an individual JWK or JWS set dict +            algorithm: The algorithm used to encode the JWT. + +        Returns: +            A decoded Token instance. + +        Raises: +            NotAuthorizedException: If the token is invalid. +        """ +        try: +            payload = jwt.decode(token=encoded_token, key=secret, algorithms=[algorithm], options={"verify_aud": False}) +            exp = datetime.fromtimestamp(payload.pop("exp"), tz=timezone.utc) +            iat = datetime.fromtimestamp(payload.pop("iat"), tz=timezone.utc) +            field_names = {f.name for f in dataclasses.fields(Token)} +            extra_fields = payload.keys() - field_names +            extras = payload.pop("extras", {}) +            for key in extra_fields: +                extras[key] = payload.pop(key) +            return cls(exp=exp, iat=iat, **payload, extras=extras) +        except (KeyError, JWTError, ImproperlyConfiguredException) as e: +            raise NotAuthorizedException("Invalid token") from e + +    def encode(self, secret: str, algorithm: str) -> str: +        """Encode the token instance into a string. + +        Args: +            secret: The secret with which the JWT is encoded. +            algorithm: The algorithm used to encode the JWT. + +        Returns: +            An encoded token string. + +        Raises: +            ImproperlyConfiguredException: If encoding fails. +        """ +        try: +            return jwt.encode( +                claims={k: v for k, v in asdict(self).items() if v is not None}, key=secret, algorithm=algorithm +            ) +        except (JWTError, JWSError) as e: +            raise ImproperlyConfiguredException("Failed to encode token") from e | 
