diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/security/jwt')
-rw-r--r-- | venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py | 23 | ||||
-rw-r--r-- | venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc | bin | 733 -> 0 bytes | |||
-rw-r--r-- | venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc | bin | 26027 -> 0 bytes | |||
-rw-r--r-- | venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc | bin | 8900 -> 0 bytes | |||
-rw-r--r-- | venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc | bin | 7141 -> 0 bytes | |||
-rw-r--r-- | venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py | 691 | ||||
-rw-r--r-- | venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py | 188 | ||||
-rw-r--r-- | venv/lib/python3.11/site-packages/litestar/security/jwt/token.py | 119 |
8 files changed, 0 insertions, 1021 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py deleted file mode 100644 index 4fd88de..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -from litestar.security.jwt.auth import ( - BaseJWTAuth, - JWTAuth, - JWTCookieAuth, - OAuth2Login, - OAuth2PasswordBearerAuth, -) -from litestar.security.jwt.middleware import ( - JWTAuthenticationMiddleware, - JWTCookieAuthenticationMiddleware, -) -from litestar.security.jwt.token import Token - -__all__ = ( - "BaseJWTAuth", - "JWTAuth", - "JWTAuthenticationMiddleware", - "JWTCookieAuth", - "JWTCookieAuthenticationMiddleware", - "OAuth2Login", - "OAuth2PasswordBearerAuth", - "Token", -) diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc Binary files differdeleted file mode 100644 index f04d57f..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc Binary files differdeleted file mode 100644 index cec42c0..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc Binary files differdeleted file mode 100644 index 8d5603e..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc Binary files differdeleted file mode 100644 index b4f8c45..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py deleted file mode 100644 index 2a0f094..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py +++ /dev/null @@ -1,691 +0,0 @@ -from __future__ import annotations - -from dataclasses import asdict, dataclass, field -from datetime import datetime, timedelta, timezone -from typing import TYPE_CHECKING, Any, Callable, Generic, Iterable, Literal, Sequence, TypeVar, cast - -from litestar.datastructures import Cookie -from litestar.enums import MediaType -from litestar.middleware import DefineMiddleware -from litestar.openapi.spec import Components, OAuthFlow, OAuthFlows, SecurityRequirement, SecurityScheme -from litestar.security.base import AbstractSecurityConfig -from litestar.security.jwt.middleware import JWTAuthenticationMiddleware, JWTCookieAuthenticationMiddleware -from litestar.security.jwt.token import Token -from litestar.status_codes import HTTP_201_CREATED -from litestar.types import ControllerRouterHandler, Empty, Guard, Method, Scopes, SyncOrAsyncUnion, TypeEncodersMap - -__all__ = ("BaseJWTAuth", "JWTAuth", "JWTCookieAuth", "OAuth2Login", "OAuth2PasswordBearerAuth") - - -if TYPE_CHECKING: - from litestar import Response - from litestar.connection import ASGIConnection - from litestar.di import Provide - - -UserType = TypeVar("UserType") - - -class BaseJWTAuth(Generic[UserType], AbstractSecurityConfig[UserType, Token]): - """Base class for JWT Auth backends""" - - token_secret: str - """Key with which to generate the token hash. - - Notes: - - This value should be kept as a secret and the standard practice is to inject it into the environment. - """ - retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] - """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. - - Notes: - - User and Auth can be any arbitrary values specified by the security backend. - - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. - Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. - - The callable can be sync or async. If it is sync, it will be wrapped to support async. - - """ - algorithm: str - """Algorithm to use for JWT hashing.""" - auth_header: str - """Request header key from which to retrieve the token. - - E.g. ``Authorization`` or ``X-Api-Key``. - """ - default_token_expiration: timedelta - """The default value for token expiration.""" - openapi_security_scheme_name: str - """The value to use for the OpenAPI security scheme and security requirements.""" - description: str - """Description for the OpenAPI security scheme.""" - authentication_middleware_class: type[JWTAuthenticationMiddleware] # pyright: ignore - """The authentication middleware class to use. - - Must inherit from :class:`JWTAuthenticationMiddleware` - """ - - @property - def openapi_components(self) -> Components: - """Create OpenAPI documentation for the JWT auth schema used. - - Returns: - An :class:`Components <litestar.openapi.spec.components.Components>` instance. - """ - return Components( - security_schemes={ - self.openapi_security_scheme_name: SecurityScheme( - type="http", - scheme="Bearer", - name=self.auth_header, - bearer_format="JWT", - description=self.description, - ) - } - ) - - @property - def security_requirement(self) -> SecurityRequirement: - """Return OpenAPI 3.1. - - :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` - - Returns: - An OpenAPI 3.1 - :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` - dictionary. - """ - return {self.openapi_security_scheme_name: []} - - @property - def middleware(self) -> DefineMiddleware: - """Create :class:`JWTAuthenticationMiddleware` wrapped in - :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. - - Returns: - An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. - """ - return DefineMiddleware( - self.authentication_middleware_class, - algorithm=self.algorithm, - auth_header=self.auth_header, - exclude=self.exclude, - exclude_opt_key=self.exclude_opt_key, - exclude_http_methods=self.exclude_http_methods, - retrieve_user_handler=self.retrieve_user_handler, - scopes=self.scopes, - token_secret=self.token_secret, - ) - - def login( - self, - identifier: str, - *, - response_body: Any = Empty, - response_media_type: str | MediaType = MediaType.JSON, - response_status_code: int = HTTP_201_CREATED, - token_expiration: timedelta | None = None, - token_issuer: str | None = None, - token_audience: str | None = None, - token_unique_jwt_id: str | None = None, - token_extras: dict[str, Any] | None = None, - send_token_as_response_body: bool = False, - ) -> Response[Any]: - """Create a response with a JWT header. - - Args: - identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value. - response_body: An optional response body to send. - response_media_type: An optional ``Content-Type``. Defaults to ``application/json``. - response_status_code: An optional status code for the response. Defaults to ``201``. - token_expiration: An optional timedelta for the token expiration. - token_issuer: An optional value of the token ``iss`` field. - token_audience: An optional value for the token ``aud`` field. - token_unique_jwt_id: An optional value for the token ``jti`` field. - token_extras: An optional dictionary to include in the token ``extras`` field. - send_token_as_response_body: If ``True`` the response will be a dict including the token: ``{ "token": <token> }`` - will be returned as the response body. Note: if a response body is passed this setting will be ignored. - - Returns: - A :class:`Response <.response.Response>` instance. - """ - encoded_token = self.create_token( - identifier=identifier, - token_expiration=token_expiration, - token_issuer=token_issuer, - token_audience=token_audience, - token_unique_jwt_id=token_unique_jwt_id, - token_extras=token_extras, - ) - - if response_body is not Empty: - body = response_body - elif send_token_as_response_body: - body = {"token": encoded_token} - else: - body = None - - return self.create_response( - content=body, - headers={self.auth_header: self.format_auth_header(encoded_token)}, - media_type=response_media_type, - status_code=response_status_code, - ) - - def create_token( - self, - identifier: str, - token_expiration: timedelta | None = None, - token_issuer: str | None = None, - token_audience: str | None = None, - token_unique_jwt_id: str | None = None, - token_extras: dict | None = None, - ) -> str: - """Create a Token instance from the passed in parameters, persists and returns it. - - Args: - identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value. - token_expiration: An optional timedelta for the token expiration. - token_issuer: An optional value of the token ``iss`` field. - token_audience: An optional value for the token ``aud`` field. - token_unique_jwt_id: An optional value for the token ``jti`` field. - token_extras: An optional dictionary to include in the token ``extras`` field. - - Returns: - The created token. - """ - token = Token( - sub=identifier, - exp=(datetime.now(timezone.utc) + (token_expiration or self.default_token_expiration)), - iss=token_issuer, - aud=token_audience, - jti=token_unique_jwt_id, - extras=token_extras or {}, - ) - return token.encode(secret=self.token_secret, algorithm=self.algorithm) - - def format_auth_header(self, encoded_token: str) -> str: - """Format a token according to the specified OpenAPI scheme. - - Args: - encoded_token: An encoded JWT token - - Returns: - The encoded token formatted for the HTTP headers - """ - security = self.openapi_components.security_schemes.get(self.openapi_security_scheme_name, None) # type: ignore[union-attr] - return f"{security.scheme} {encoded_token}" if isinstance(security, SecurityScheme) else encoded_token - - -@dataclass -class JWTAuth(Generic[UserType], BaseJWTAuth[UserType]): - """JWT Authentication Configuration. - - This class is the main entry point to the library, and it includes methods to create the middleware, provide login - functionality, and create OpenAPI documentation. - """ - - token_secret: str - """Key with which to generate the token hash. - - Notes: - - This value should be kept as a secret and the standard practice is to inject it into the environment. - """ - retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] - """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. - - Notes: - - User and Auth can be any arbitrary values specified by the security backend. - - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. - Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. - - The callable can be sync or async. If it is sync, it will be wrapped to support async. - - """ - guards: Iterable[Guard] | None = field(default=None) - """An iterable of guards to call for requests, providing authorization functionalities.""" - exclude: str | list[str] | None = field(default=None) - """A pattern or list of patterns to skip in the authentication middleware.""" - exclude_opt_key: str = field(default="exclude_from_auth") - """An identifier to use on routes to disable authentication and authorization checks for a particular route.""" - exclude_http_methods: Sequence[Method] | None = field( - default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"]) - ) - """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']""" - scopes: Scopes | None = field(default=None) - """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be - processed.""" - route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None) - """An optional iterable of route handlers to register.""" - dependencies: dict[str, Provide] | None = field(default=None) - """An optional dictionary of dependency providers.""" - - type_encoders: TypeEncodersMap | None = field(default=None) - """A mapping of types to callables that transform them into types supported for serialization.""" - - algorithm: str = field(default="HS256") - """Algorithm to use for JWT hashing.""" - auth_header: str = field(default="Authorization") - """Request header key from which to retrieve the token. - - E.g. ``Authorization`` or ``X-Api-Key``. - """ - default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1)) - """The default value for token expiration.""" - openapi_security_scheme_name: str = field(default="BearerToken") - """The value to use for the OpenAPI security scheme and security requirements.""" - description: str = field(default="JWT api-key authentication and authorization.") - """Description for the OpenAPI security scheme.""" - authentication_middleware_class: type[JWTAuthenticationMiddleware] = field(default=JWTAuthenticationMiddleware) - """The authentication middleware class to use. - - Must inherit from :class:`JWTAuthenticationMiddleware` - """ - - -@dataclass -class JWTCookieAuth(Generic[UserType], BaseJWTAuth[UserType]): - """JWT Cookie Authentication Configuration. - - This class is an alternate entry point to the library, and it includes all the functionality of the :class:`JWTAuth` - class and adds support for passing JWT tokens ``HttpOnly`` cookies. - """ - - token_secret: str - """Key with which to generate the token hash. - - Notes: - - This value should be kept as a secret and the standard practice is to inject it into the environment. - """ - retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] - """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. - - Notes: - - User and Auth can be any arbitrary values specified by the security backend. - - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. - Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. - - The callable can be sync or async. If it is sync, it will be wrapped to support async. - - """ - guards: Iterable[Guard] | None = field(default=None) - """An iterable of guards to call for requests, providing authorization functionalities.""" - exclude: str | list[str] | None = field(default=None) - """A pattern or list of patterns to skip in the authentication middleware.""" - exclude_opt_key: str = field(default="exclude_from_auth") - """An identifier to use on routes to disable authentication and authorization checks for a particular route.""" - scopes: Scopes | None = field(default=None) - """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be - processed.""" - exclude_http_methods: Sequence[Method] | None = field( - default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"]) - ) - """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']""" - route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None) - """An optional iterable of route handlers to register.""" - dependencies: dict[str, Provide] | None = field(default=None) - """An optional dictionary of dependency providers.""" - - type_encoders: TypeEncodersMap | None = field(default=None) - """A mapping of types to callables that transform them into types supported for serialization.""" - - algorithm: str = field(default="HS256") - """Algorithm to use for JWT hashing.""" - auth_header: str = field(default="Authorization") - """Request header key from which to retrieve the token. - - E.g. ``Authorization`` or ``X-Api-Key``. - """ - default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1)) - """The default value for token expiration.""" - openapi_security_scheme_name: str = field(default="BearerToken") - """The value to use for the OpenAPI security scheme and security requirements.""" - key: str = field(default="token") - """Key for the cookie.""" - path: str = field(default="/") - """Path fragment that must exist in the request url for the cookie to be valid. - - Defaults to ``/``. - """ - domain: str | None = field(default=None) - """Domain for which the cookie is valid.""" - secure: bool | None = field(default=None) - """Https is required for the cookie.""" - samesite: Literal["lax", "strict", "none"] = field(default="lax") - """Controls whether or not a cookie is sent with cross-site requests. Defaults to ``lax``. """ - description: str = field(default="JWT cookie-based authentication and authorization.") - """Description for the OpenAPI security scheme.""" - authentication_middleware_class: type[JWTCookieAuthenticationMiddleware] = field( # pyright: ignore - default=JWTCookieAuthenticationMiddleware - ) - """The authentication middleware class to use. Must inherit from :class:`JWTCookieAuthenticationMiddleware` - """ - - @property - def openapi_components(self) -> Components: - """Create OpenAPI documentation for the JWT Cookie auth scheme. - - Returns: - A :class:`Components <litestar.openapi.spec.components.Components>` instance. - """ - return Components( - security_schemes={ - self.openapi_security_scheme_name: SecurityScheme( - type="http", - scheme="Bearer", - name=self.key, - security_scheme_in="cookie", - bearer_format="JWT", - description=self.description, - ) - } - ) - - @property - def middleware(self) -> DefineMiddleware: - """Create :class:`JWTCookieAuthenticationMiddleware` wrapped in - :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. - - Returns: - An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. - """ - return DefineMiddleware( - self.authentication_middleware_class, - algorithm=self.algorithm, - auth_cookie_key=self.key, - auth_header=self.auth_header, - exclude=self.exclude, - exclude_opt_key=self.exclude_opt_key, - exclude_http_methods=self.exclude_http_methods, - retrieve_user_handler=self.retrieve_user_handler, - scopes=self.scopes, - token_secret=self.token_secret, - ) - - def login( - self, - identifier: str, - *, - response_body: Any = Empty, - response_media_type: str | MediaType = MediaType.JSON, - response_status_code: int = HTTP_201_CREATED, - token_expiration: timedelta | None = None, - token_issuer: str | None = None, - token_audience: str | None = None, - token_unique_jwt_id: str | None = None, - token_extras: dict[str, Any] | None = None, - send_token_as_response_body: bool = False, - ) -> Response[Any]: - """Create a response with a JWT header. - - Args: - identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value. - response_body: An optional response body to send. - response_media_type: An optional 'Content-Type'. Defaults to 'application/json'. - response_status_code: An optional status code for the response. Defaults to '201 Created'. - token_expiration: An optional timedelta for the token expiration. - token_issuer: An optional value of the token ``iss`` field. - token_audience: An optional value for the token ``aud`` field. - token_unique_jwt_id: An optional value for the token ``jti`` field. - token_extras: An optional dictionary to include in the token ``extras`` field. - send_token_as_response_body: If ``True`` the response will be a dict including the token: ``{ "token": <token> }`` - will be returned as the response body. Note: if a response body is passed this setting will be ignored. - - Returns: - A :class:`Response <.response.Response>` instance. - """ - - encoded_token = self.create_token( - identifier=identifier, - token_expiration=token_expiration, - token_issuer=token_issuer, - token_audience=token_audience, - token_unique_jwt_id=token_unique_jwt_id, - token_extras=token_extras, - ) - cookie = Cookie( - key=self.key, - path=self.path, - httponly=True, - value=self.format_auth_header(encoded_token), - max_age=int((token_expiration or self.default_token_expiration).total_seconds()), - secure=self.secure, - samesite=self.samesite, - domain=self.domain, - ) - - if response_body is not Empty: - body = response_body - elif send_token_as_response_body: - body = {"token": encoded_token} - else: - body = None - - return self.create_response( - content=body, - headers={self.auth_header: self.format_auth_header(encoded_token)}, - cookies=[cookie], - media_type=response_media_type, - status_code=response_status_code, - ) - - -@dataclass -class OAuth2Login: - """OAuth2 Login DTO""" - - access_token: str - """Valid JWT access token""" - token_type: str - """Type of the OAuth token used""" - refresh_token: str | None = field(default=None) - """Optional valid refresh token JWT""" - expires_in: int | None = field(default=None) - """Expiration time of the token in seconds. """ - - -@dataclass -class OAuth2PasswordBearerAuth(Generic[UserType], BaseJWTAuth[UserType]): - """OAUTH2 Schema for Password Bearer Authentication. - - This class implements an OAUTH2 authentication flow entry point to the library, and it includes all the - functionality of the :class:`JWTAuth` class and adds support for passing JWT tokens ``HttpOnly`` cookies. - - ``token_url`` is the only additional argument that is required, and it should point at your login route - """ - - token_secret: str - """Key with which to generate the token hash. - - Notes: - - This value should be kept as a secret and the standard practice is to inject it into the environment. - """ - token_url: str - """The URL for retrieving a new token.""" - retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] - """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. - - Notes: - - User and Auth can be any arbitrary values specified by the security backend. - - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. - Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. - - The callable can be sync or async. If it is sync, it will be wrapped to support async. - - """ - guards: Iterable[Guard] | None = field(default=None) - """An iterable of guards to call for requests, providing authorization functionalities.""" - exclude: str | list[str] | None = field(default=None) - """A pattern or list of patterns to skip in the authentication middleware.""" - exclude_opt_key: str = field(default="exclude_from_auth") - """An identifier to use on routes to disable authentication and authorization checks for a particular route.""" - exclude_http_methods: Sequence[Method] | None = field( - default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"]) - ) - """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']""" - scopes: Scopes | None = field(default=None) - """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be - processed.""" - route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None) - """An optional iterable of route handlers to register.""" - dependencies: dict[str, Provide] | None = field(default=None) - """An optional dictionary of dependency providers.""" - type_encoders: TypeEncodersMap | None = field(default=None) - """A mapping of types to callables that transform them into types supported for serialization.""" - algorithm: str = field(default="HS256") - """Algorithm to use for JWT hashing.""" - auth_header: str = field(default="Authorization") - """Request header key from which to retrieve the token. - - E.g. ``Authorization`` or 'X-Api-Key'. - """ - default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1)) - """The default value for token expiration.""" - openapi_security_scheme_name: str = field(default="BearerToken") - """The value to use for the OpenAPI security scheme and security requirements.""" - oauth_scopes: dict[str, str] | None = field(default=None) - """Oauth Scopes available for the token.""" - key: str = field(default="token") - """Key for the cookie.""" - path: str = field(default="/") - """Path fragment that must exist in the request url for the cookie to be valid. - - Defaults to ``/``. - """ - domain: str | None = field(default=None) - """Domain for which the cookie is valid.""" - secure: bool | None = field(default=None) - """Https is required for the cookie.""" - samesite: Literal["lax", "strict", "none"] = field(default="lax") - """Controls whether or not a cookie is sent with cross-site requests. Defaults to ``lax``. """ - description: str = field(default="OAUTH2 password bearer authentication and authorization.") - """Description for the OpenAPI security scheme.""" - authentication_middleware_class: type[JWTCookieAuthenticationMiddleware] = field( # pyright: ignore - default=JWTCookieAuthenticationMiddleware - ) - """The authentication middleware class to use. - - Must inherit from :class:`JWTCookieAuthenticationMiddleware` - """ - - @property - def middleware(self) -> DefineMiddleware: - """Create ``JWTCookieAuthenticationMiddleware`` wrapped in - :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. - - Returns: - An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. - """ - return DefineMiddleware( - self.authentication_middleware_class, - algorithm=self.algorithm, - auth_cookie_key=self.key, - auth_header=self.auth_header, - exclude=self.exclude, - exclude_opt_key=self.exclude_opt_key, - exclude_http_methods=self.exclude_http_methods, - retrieve_user_handler=self.retrieve_user_handler, - scopes=self.scopes, - token_secret=self.token_secret, - ) - - @property - def oauth_flow(self) -> OAuthFlow: - """Create an OpenAPI OAuth2 flow for the password bearer authentication scheme. - - Returns: - An :class:`OAuthFlow <litestar.openapi.spec.oauth_flow.OAuthFlow>` instance. - """ - return OAuthFlow( - token_url=self.token_url, - scopes=self.oauth_scopes, - ) - - @property - def openapi_components(self) -> Components: - """Create OpenAPI documentation for the OAUTH2 Password bearer auth scheme. - - Returns: - An :class:`Components <litestar.openapi.spec.components.Components>` instance. - """ - return Components( - security_schemes={ - self.openapi_security_scheme_name: SecurityScheme( - type="oauth2", - scheme="Bearer", - name=self.auth_header, - security_scheme_in="header", - flows=OAuthFlows(password=self.oauth_flow), # pyright: ignore[reportGeneralTypeIssues] - bearer_format="JWT", - description=self.description, - ) - } - ) - - def login( - self, - identifier: str, - *, - response_body: Any = Empty, - response_media_type: str | MediaType = MediaType.JSON, - response_status_code: int = HTTP_201_CREATED, - token_expiration: timedelta | None = None, - token_issuer: str | None = None, - token_audience: str | None = None, - token_unique_jwt_id: str | None = None, - token_extras: dict[str, Any] | None = None, - send_token_as_response_body: bool = True, - ) -> Response[Any]: - """Create a response with a JWT header. - - Args: - identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value. - response_body: An optional response body to send. - response_media_type: An optional ``Content-Type``. Defaults to ``application/json``. - response_status_code: An optional status code for the response. Defaults to ``201``. - token_expiration: An optional timedelta for the token expiration. - token_issuer: An optional value of the token ``iss`` field. - token_audience: An optional value for the token ``aud`` field. - token_unique_jwt_id: An optional value for the token ``jti`` field. - token_extras: An optional dictionary to include in the token ``extras`` field. - send_token_as_response_body: If ``True`` the response will be an oAuth2 token response dict. - Note: if a response body is passed this setting will be ignored. - - Returns: - A :class:`Response <.response.Response>` instance. - """ - encoded_token = self.create_token( - identifier=identifier, - token_expiration=token_expiration, - token_issuer=token_issuer, - token_audience=token_audience, - token_unique_jwt_id=token_unique_jwt_id, - token_extras=token_extras, - ) - expires_in = int((token_expiration or self.default_token_expiration).total_seconds()) - cookie = Cookie( - key=self.key, - path=self.path, - httponly=True, - value=self.format_auth_header(encoded_token), - max_age=expires_in, - secure=self.secure, - samesite=self.samesite, - domain=self.domain, - ) - - if response_body is not Empty: - body = response_body - elif send_token_as_response_body: - token_dto = OAuth2Login( - access_token=encoded_token, - expires_in=expires_in, - token_type="bearer", # noqa: S106 - ) - body = asdict(token_dto) - else: - body = None - - return self.create_response( - content=body, - headers={self.auth_header: self.format_auth_header(encoded_token)}, - cookies=[cookie], - media_type=response_media_type, - status_code=response_status_code, - ) diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py deleted file mode 100644 index 84326da..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py +++ /dev/null @@ -1,188 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING, Awaitable, Callable, Sequence - -from litestar.exceptions import NotAuthorizedException -from litestar.middleware.authentication import ( - AbstractAuthenticationMiddleware, - AuthenticationResult, -) -from litestar.security.jwt.token import Token - -__all__ = ("JWTAuthenticationMiddleware", "JWTCookieAuthenticationMiddleware") - - -if TYPE_CHECKING: - from typing import Any - - from litestar.connection import ASGIConnection - from litestar.types import ASGIApp, Method, Scopes - - -class JWTAuthenticationMiddleware(AbstractAuthenticationMiddleware): - """JWT Authentication middleware. - - This class provides JWT authentication functionalities. - """ - - __slots__ = ( - "algorithm", - "auth_header", - "retrieve_user_handler", - "token_secret", - ) - - def __init__( - self, - algorithm: str, - app: ASGIApp, - auth_header: str, - exclude: str | list[str] | None, - exclude_http_methods: Sequence[Method] | None, - exclude_opt_key: str, - retrieve_user_handler: Callable[[Token, ASGIConnection[Any, Any, Any, Any]], Awaitable[Any]], - scopes: Scopes, - token_secret: str, - ) -> None: - """Check incoming requests for an encoded token in the auth header specified, and if present retrieve the user - from persistence using the provided function. - - Args: - algorithm: JWT hashing algorithm to use. - app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack. - auth_header: Request header key from which to retrieve the token. E.g. ``Authorization`` or ``X-Api-Key``. - exclude: A pattern or list of patterns to skip. - exclude_opt_key: An identifier to use on routes to disable authentication for a particular route. - exclude_http_methods: A sequence of http methods that do not require authentication. - retrieve_user_handler: A function that receives a :class:`Token <.security.jwt.Token>` and returns a user, - which can be any arbitrary value. - scopes: ASGI scopes processed by the authentication middleware. - token_secret: Secret for decoding the JWT token. This value should be equivalent to the secret used to - encode it. - """ - super().__init__( - app=app, - exclude=exclude, - exclude_from_auth_key=exclude_opt_key, - exclude_http_methods=exclude_http_methods, - scopes=scopes, - ) - self.algorithm = algorithm - self.auth_header = auth_header - self.retrieve_user_handler = retrieve_user_handler - self.token_secret = token_secret - - async def authenticate_request(self, connection: ASGIConnection[Any, Any, Any, Any]) -> AuthenticationResult: - """Given an HTTP Connection, parse the JWT api key stored in the header and retrieve the user correlating to the - token from the DB. - - Args: - connection: An Litestar HTTPConnection instance. - - Returns: - AuthenticationResult - - Raises: - NotAuthorizedException: If token is invalid or user is not found. - """ - auth_header = connection.headers.get(self.auth_header) - if not auth_header: - raise NotAuthorizedException("No JWT token found in request header") - encoded_token = auth_header.partition(" ")[-1] - return await self.authenticate_token(encoded_token=encoded_token, connection=connection) - - async def authenticate_token( - self, encoded_token: str, connection: ASGIConnection[Any, Any, Any, Any] - ) -> AuthenticationResult: - """Given an encoded JWT token, parse, validate and look up sub within token. - - Args: - encoded_token: Encoded JWT token. - connection: An ASGI connection instance. - - Raises: - NotAuthorizedException: If token is invalid or user is not found. - - Returns: - AuthenticationResult - """ - token = Token.decode( - encoded_token=encoded_token, - secret=self.token_secret, - algorithm=self.algorithm, - ) - - user = await self.retrieve_user_handler(token, connection) - - if not user: - raise NotAuthorizedException() - - return AuthenticationResult(user=user, auth=token) - - -class JWTCookieAuthenticationMiddleware(JWTAuthenticationMiddleware): - """Cookie based JWT authentication middleware.""" - - __slots__ = ("auth_cookie_key",) - - def __init__( - self, - algorithm: str, - app: ASGIApp, - auth_cookie_key: str, - auth_header: str, - exclude: str | list[str] | None, - exclude_opt_key: str, - exclude_http_methods: Sequence[Method] | None, - retrieve_user_handler: Callable[[Token, ASGIConnection[Any, Any, Any, Any]], Awaitable[Any]], - scopes: Scopes, - token_secret: str, - ) -> None: - """Check incoming requests for an encoded token in the auth header or cookie name specified, and if present - retrieves the user from persistence using the provided function. - - Args: - algorithm: JWT hashing algorithm to use. - app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack. - auth_cookie_key: Cookie name from which to retrieve the token. E.g. ``token`` or ``accessToken``. - auth_header: Request header key from which to retrieve the token. E.g. ``Authorization`` or ``X-Api-Key``. - exclude: A pattern or list of patterns to skip. - exclude_opt_key: An identifier to use on routes to disable authentication for a particular route. - exclude_http_methods: A sequence of http methods that do not require authentication. - retrieve_user_handler: A function that receives a :class:`Token <.security.jwt.Token>` and returns a user, - which can be any arbitrary value. - scopes: ASGI scopes processed by the authentication middleware. - token_secret: Secret for decoding the JWT token. This value should be equivalent to the secret used to - encode it. - """ - super().__init__( - algorithm=algorithm, - app=app, - auth_header=auth_header, - exclude=exclude, - exclude_http_methods=exclude_http_methods, - exclude_opt_key=exclude_opt_key, - retrieve_user_handler=retrieve_user_handler, - scopes=scopes, - token_secret=token_secret, - ) - self.auth_cookie_key = auth_cookie_key - - async def authenticate_request(self, connection: ASGIConnection[Any, Any, Any, Any]) -> AuthenticationResult: - """Given an HTTP Connection, parse the JWT api key stored in the header and retrieve the user correlating to the - token from the DB. - - Args: - connection: An Litestar HTTPConnection instance. - - Raises: - NotAuthorizedException: If token is invalid or user is not found. - - Returns: - AuthenticationResult - """ - auth_header = connection.headers.get(self.auth_header) or connection.cookies.get(self.auth_cookie_key) - if not auth_header: - raise NotAuthorizedException("No JWT token found in request header or cookies") - encoded_token = auth_header.partition(" ")[-1] - return await self.authenticate_token(encoded_token=encoded_token, connection=connection) diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py deleted file mode 100644 index 279111a..0000000 --- a/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py +++ /dev/null @@ -1,119 +0,0 @@ -from __future__ import annotations - -import dataclasses -from dataclasses import asdict, dataclass, field -from datetime import datetime, timezone -from typing import TYPE_CHECKING, Any - -from jose import JWSError, JWTError, jwt - -from litestar.exceptions import ImproperlyConfiguredException, NotAuthorizedException - -if TYPE_CHECKING: - from typing_extensions import Self - - -__all__ = ("Token",) - - -def _normalize_datetime(value: datetime) -> datetime: - """Convert the given value into UTC and strip microseconds. - - Args: - value: A datetime instance - - Returns: - A datetime instance - """ - if value.tzinfo is not None: - value.astimezone(timezone.utc) - - return value.replace(microsecond=0) - - -@dataclass -class Token: - """JWT Token DTO.""" - - exp: datetime - """Expiration - datetime for token expiration.""" - sub: str - """Subject - usually a unique identifier of the user or equivalent entity.""" - iat: datetime = field(default_factory=lambda: _normalize_datetime(datetime.now(timezone.utc))) - """Issued at - should always be current now.""" - iss: str | None = field(default=None) - """Issuer - optional unique identifier for the issuer.""" - aud: str | None = field(default=None) - """Audience - intended audience.""" - jti: str | None = field(default=None) - """JWT ID - a unique identifier of the JWT between different issuers.""" - extras: dict[str, Any] = field(default_factory=dict) - """Extra fields that were found on the JWT token.""" - - def __post_init__(self) -> None: - if len(self.sub) < 1: - raise ImproperlyConfiguredException("sub must be a string with a length greater than 0") - - if isinstance(self.exp, datetime) and ( - (exp := _normalize_datetime(self.exp)).timestamp() - >= _normalize_datetime(datetime.now(timezone.utc)).timestamp() - ): - self.exp = exp - else: - raise ImproperlyConfiguredException("exp value must be a datetime in the future") - - if isinstance(self.iat, datetime) and ( - (iat := _normalize_datetime(self.iat)).timestamp() - <= _normalize_datetime(datetime.now(timezone.utc)).timestamp() - ): - self.iat = iat - else: - raise ImproperlyConfiguredException("iat must be a current or past time") - - @classmethod - def decode(cls, encoded_token: str, secret: str | dict[str, str], algorithm: str) -> Self: - """Decode a passed in token string and returns a Token instance. - - Args: - encoded_token: A base64 string containing an encoded JWT. - secret: The secret with which the JWT is encoded. It may optionally be an individual JWK or JWS set dict - algorithm: The algorithm used to encode the JWT. - - Returns: - A decoded Token instance. - - Raises: - NotAuthorizedException: If the token is invalid. - """ - try: - payload = jwt.decode(token=encoded_token, key=secret, algorithms=[algorithm], options={"verify_aud": False}) - exp = datetime.fromtimestamp(payload.pop("exp"), tz=timezone.utc) - iat = datetime.fromtimestamp(payload.pop("iat"), tz=timezone.utc) - field_names = {f.name for f in dataclasses.fields(Token)} - extra_fields = payload.keys() - field_names - extras = payload.pop("extras", {}) - for key in extra_fields: - extras[key] = payload.pop(key) - return cls(exp=exp, iat=iat, **payload, extras=extras) - except (KeyError, JWTError, ImproperlyConfiguredException) as e: - raise NotAuthorizedException("Invalid token") from e - - def encode(self, secret: str, algorithm: str) -> str: - """Encode the token instance into a string. - - Args: - secret: The secret with which the JWT is encoded. - algorithm: The algorithm used to encode the JWT. - - Returns: - An encoded token string. - - Raises: - ImproperlyConfiguredException: If encoding fails. - """ - try: - return jwt.encode( - claims={k: v for k, v in asdict(self).items() if v is not None}, key=secret, algorithm=algorithm - ) - except (JWTError, JWSError) as e: - raise ImproperlyConfiguredException("Failed to encode token") from e |