summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/litestar/security/jwt
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
commit6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
treeb1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/litestar/security/jwt
parent4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/security/jwt')
-rw-r--r--venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py23
-rw-r--r--venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pycbin0 -> 733 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pycbin0 -> 26027 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pycbin0 -> 8900 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pycbin0 -> 7141 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py691
-rw-r--r--venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py188
-rw-r--r--venv/lib/python3.11/site-packages/litestar/security/jwt/token.py119
8 files changed, 1021 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py
new file mode 100644
index 0000000..4fd88de
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py
@@ -0,0 +1,23 @@
+from litestar.security.jwt.auth import (
+ BaseJWTAuth,
+ JWTAuth,
+ JWTCookieAuth,
+ OAuth2Login,
+ OAuth2PasswordBearerAuth,
+)
+from litestar.security.jwt.middleware import (
+ JWTAuthenticationMiddleware,
+ JWTCookieAuthenticationMiddleware,
+)
+from litestar.security.jwt.token import Token
+
+__all__ = (
+ "BaseJWTAuth",
+ "JWTAuth",
+ "JWTAuthenticationMiddleware",
+ "JWTCookieAuth",
+ "JWTCookieAuthenticationMiddleware",
+ "OAuth2Login",
+ "OAuth2PasswordBearerAuth",
+ "Token",
+)
diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc
new file mode 100644
index 0000000..f04d57f
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc
new file mode 100644
index 0000000..cec42c0
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc
new file mode 100644
index 0000000..8d5603e
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc
new file mode 100644
index 0000000..b4f8c45
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py
new file mode 100644
index 0000000..2a0f094
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py
@@ -0,0 +1,691 @@
+from __future__ import annotations
+
+from dataclasses import asdict, dataclass, field
+from datetime import datetime, timedelta, timezone
+from typing import TYPE_CHECKING, Any, Callable, Generic, Iterable, Literal, Sequence, TypeVar, cast
+
+from litestar.datastructures import Cookie
+from litestar.enums import MediaType
+from litestar.middleware import DefineMiddleware
+from litestar.openapi.spec import Components, OAuthFlow, OAuthFlows, SecurityRequirement, SecurityScheme
+from litestar.security.base import AbstractSecurityConfig
+from litestar.security.jwt.middleware import JWTAuthenticationMiddleware, JWTCookieAuthenticationMiddleware
+from litestar.security.jwt.token import Token
+from litestar.status_codes import HTTP_201_CREATED
+from litestar.types import ControllerRouterHandler, Empty, Guard, Method, Scopes, SyncOrAsyncUnion, TypeEncodersMap
+
+__all__ = ("BaseJWTAuth", "JWTAuth", "JWTCookieAuth", "OAuth2Login", "OAuth2PasswordBearerAuth")
+
+
+if TYPE_CHECKING:
+ from litestar import Response
+ from litestar.connection import ASGIConnection
+ from litestar.di import Provide
+
+
+UserType = TypeVar("UserType")
+
+
+class BaseJWTAuth(Generic[UserType], AbstractSecurityConfig[UserType, Token]):
+ """Base class for JWT Auth backends"""
+
+ token_secret: str
+ """Key with which to generate the token hash.
+
+ Notes:
+ - This value should be kept as a secret and the standard practice is to inject it into the environment.
+ """
+ retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]]
+ """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value.
+
+ Notes:
+ - User and Auth can be any arbitrary values specified by the security backend.
+ - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively.
+ Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties.
+ - The callable can be sync or async. If it is sync, it will be wrapped to support async.
+
+ """
+ algorithm: str
+ """Algorithm to use for JWT hashing."""
+ auth_header: str
+ """Request header key from which to retrieve the token.
+
+ E.g. ``Authorization`` or ``X-Api-Key``.
+ """
+ default_token_expiration: timedelta
+ """The default value for token expiration."""
+ openapi_security_scheme_name: str
+ """The value to use for the OpenAPI security scheme and security requirements."""
+ description: str
+ """Description for the OpenAPI security scheme."""
+ authentication_middleware_class: type[JWTAuthenticationMiddleware] # pyright: ignore
+ """The authentication middleware class to use.
+
+ Must inherit from :class:`JWTAuthenticationMiddleware`
+ """
+
+ @property
+ def openapi_components(self) -> Components:
+ """Create OpenAPI documentation for the JWT auth schema used.
+
+ Returns:
+ An :class:`Components <litestar.openapi.spec.components.Components>` instance.
+ """
+ return Components(
+ security_schemes={
+ self.openapi_security_scheme_name: SecurityScheme(
+ type="http",
+ scheme="Bearer",
+ name=self.auth_header,
+ bearer_format="JWT",
+ description=self.description,
+ )
+ }
+ )
+
+ @property
+ def security_requirement(self) -> SecurityRequirement:
+ """Return OpenAPI 3.1.
+
+ :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>`
+
+ Returns:
+ An OpenAPI 3.1
+ :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>`
+ dictionary.
+ """
+ return {self.openapi_security_scheme_name: []}
+
+ @property
+ def middleware(self) -> DefineMiddleware:
+ """Create :class:`JWTAuthenticationMiddleware` wrapped in
+ :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`.
+
+ Returns:
+ An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`.
+ """
+ return DefineMiddleware(
+ self.authentication_middleware_class,
+ algorithm=self.algorithm,
+ auth_header=self.auth_header,
+ exclude=self.exclude,
+ exclude_opt_key=self.exclude_opt_key,
+ exclude_http_methods=self.exclude_http_methods,
+ retrieve_user_handler=self.retrieve_user_handler,
+ scopes=self.scopes,
+ token_secret=self.token_secret,
+ )
+
+ def login(
+ self,
+ identifier: str,
+ *,
+ response_body: Any = Empty,
+ response_media_type: str | MediaType = MediaType.JSON,
+ response_status_code: int = HTTP_201_CREATED,
+ token_expiration: timedelta | None = None,
+ token_issuer: str | None = None,
+ token_audience: str | None = None,
+ token_unique_jwt_id: str | None = None,
+ token_extras: dict[str, Any] | None = None,
+ send_token_as_response_body: bool = False,
+ ) -> Response[Any]:
+ """Create a response with a JWT header.
+
+ Args:
+ identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value.
+ response_body: An optional response body to send.
+ response_media_type: An optional ``Content-Type``. Defaults to ``application/json``.
+ response_status_code: An optional status code for the response. Defaults to ``201``.
+ token_expiration: An optional timedelta for the token expiration.
+ token_issuer: An optional value of the token ``iss`` field.
+ token_audience: An optional value for the token ``aud`` field.
+ token_unique_jwt_id: An optional value for the token ``jti`` field.
+ token_extras: An optional dictionary to include in the token ``extras`` field.
+ send_token_as_response_body: If ``True`` the response will be a dict including the token: ``{ "token": <token> }``
+ will be returned as the response body. Note: if a response body is passed this setting will be ignored.
+
+ Returns:
+ A :class:`Response <.response.Response>` instance.
+ """
+ encoded_token = self.create_token(
+ identifier=identifier,
+ token_expiration=token_expiration,
+ token_issuer=token_issuer,
+ token_audience=token_audience,
+ token_unique_jwt_id=token_unique_jwt_id,
+ token_extras=token_extras,
+ )
+
+ if response_body is not Empty:
+ body = response_body
+ elif send_token_as_response_body:
+ body = {"token": encoded_token}
+ else:
+ body = None
+
+ return self.create_response(
+ content=body,
+ headers={self.auth_header: self.format_auth_header(encoded_token)},
+ media_type=response_media_type,
+ status_code=response_status_code,
+ )
+
+ def create_token(
+ self,
+ identifier: str,
+ token_expiration: timedelta | None = None,
+ token_issuer: str | None = None,
+ token_audience: str | None = None,
+ token_unique_jwt_id: str | None = None,
+ token_extras: dict | None = None,
+ ) -> str:
+ """Create a Token instance from the passed in parameters, persists and returns it.
+
+ Args:
+ identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value.
+ token_expiration: An optional timedelta for the token expiration.
+ token_issuer: An optional value of the token ``iss`` field.
+ token_audience: An optional value for the token ``aud`` field.
+ token_unique_jwt_id: An optional value for the token ``jti`` field.
+ token_extras: An optional dictionary to include in the token ``extras`` field.
+
+ Returns:
+ The created token.
+ """
+ token = Token(
+ sub=identifier,
+ exp=(datetime.now(timezone.utc) + (token_expiration or self.default_token_expiration)),
+ iss=token_issuer,
+ aud=token_audience,
+ jti=token_unique_jwt_id,
+ extras=token_extras or {},
+ )
+ return token.encode(secret=self.token_secret, algorithm=self.algorithm)
+
+ def format_auth_header(self, encoded_token: str) -> str:
+ """Format a token according to the specified OpenAPI scheme.
+
+ Args:
+ encoded_token: An encoded JWT token
+
+ Returns:
+ The encoded token formatted for the HTTP headers
+ """
+ security = self.openapi_components.security_schemes.get(self.openapi_security_scheme_name, None) # type: ignore[union-attr]
+ return f"{security.scheme} {encoded_token}" if isinstance(security, SecurityScheme) else encoded_token
+
+
+@dataclass
+class JWTAuth(Generic[UserType], BaseJWTAuth[UserType]):
+ """JWT Authentication Configuration.
+
+ This class is the main entry point to the library, and it includes methods to create the middleware, provide login
+ functionality, and create OpenAPI documentation.
+ """
+
+ token_secret: str
+ """Key with which to generate the token hash.
+
+ Notes:
+ - This value should be kept as a secret and the standard practice is to inject it into the environment.
+ """
+ retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]]
+ """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value.
+
+ Notes:
+ - User and Auth can be any arbitrary values specified by the security backend.
+ - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively.
+ Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties.
+ - The callable can be sync or async. If it is sync, it will be wrapped to support async.
+
+ """
+ guards: Iterable[Guard] | None = field(default=None)
+ """An iterable of guards to call for requests, providing authorization functionalities."""
+ exclude: str | list[str] | None = field(default=None)
+ """A pattern or list of patterns to skip in the authentication middleware."""
+ exclude_opt_key: str = field(default="exclude_from_auth")
+ """An identifier to use on routes to disable authentication and authorization checks for a particular route."""
+ exclude_http_methods: Sequence[Method] | None = field(
+ default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"])
+ )
+ """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']"""
+ scopes: Scopes | None = field(default=None)
+ """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be
+ processed."""
+ route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None)
+ """An optional iterable of route handlers to register."""
+ dependencies: dict[str, Provide] | None = field(default=None)
+ """An optional dictionary of dependency providers."""
+
+ type_encoders: TypeEncodersMap | None = field(default=None)
+ """A mapping of types to callables that transform them into types supported for serialization."""
+
+ algorithm: str = field(default="HS256")
+ """Algorithm to use for JWT hashing."""
+ auth_header: str = field(default="Authorization")
+ """Request header key from which to retrieve the token.
+
+ E.g. ``Authorization`` or ``X-Api-Key``.
+ """
+ default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1))
+ """The default value for token expiration."""
+ openapi_security_scheme_name: str = field(default="BearerToken")
+ """The value to use for the OpenAPI security scheme and security requirements."""
+ description: str = field(default="JWT api-key authentication and authorization.")
+ """Description for the OpenAPI security scheme."""
+ authentication_middleware_class: type[JWTAuthenticationMiddleware] = field(default=JWTAuthenticationMiddleware)
+ """The authentication middleware class to use.
+
+ Must inherit from :class:`JWTAuthenticationMiddleware`
+ """
+
+
+@dataclass
+class JWTCookieAuth(Generic[UserType], BaseJWTAuth[UserType]):
+ """JWT Cookie Authentication Configuration.
+
+ This class is an alternate entry point to the library, and it includes all the functionality of the :class:`JWTAuth`
+ class and adds support for passing JWT tokens ``HttpOnly`` cookies.
+ """
+
+ token_secret: str
+ """Key with which to generate the token hash.
+
+ Notes:
+ - This value should be kept as a secret and the standard practice is to inject it into the environment.
+ """
+ retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]]
+ """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value.
+
+ Notes:
+ - User and Auth can be any arbitrary values specified by the security backend.
+ - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively.
+ Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties.
+ - The callable can be sync or async. If it is sync, it will be wrapped to support async.
+
+ """
+ guards: Iterable[Guard] | None = field(default=None)
+ """An iterable of guards to call for requests, providing authorization functionalities."""
+ exclude: str | list[str] | None = field(default=None)
+ """A pattern or list of patterns to skip in the authentication middleware."""
+ exclude_opt_key: str = field(default="exclude_from_auth")
+ """An identifier to use on routes to disable authentication and authorization checks for a particular route."""
+ scopes: Scopes | None = field(default=None)
+ """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be
+ processed."""
+ exclude_http_methods: Sequence[Method] | None = field(
+ default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"])
+ )
+ """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']"""
+ route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None)
+ """An optional iterable of route handlers to register."""
+ dependencies: dict[str, Provide] | None = field(default=None)
+ """An optional dictionary of dependency providers."""
+
+ type_encoders: TypeEncodersMap | None = field(default=None)
+ """A mapping of types to callables that transform them into types supported for serialization."""
+
+ algorithm: str = field(default="HS256")
+ """Algorithm to use for JWT hashing."""
+ auth_header: str = field(default="Authorization")
+ """Request header key from which to retrieve the token.
+
+ E.g. ``Authorization`` or ``X-Api-Key``.
+ """
+ default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1))
+ """The default value for token expiration."""
+ openapi_security_scheme_name: str = field(default="BearerToken")
+ """The value to use for the OpenAPI security scheme and security requirements."""
+ key: str = field(default="token")
+ """Key for the cookie."""
+ path: str = field(default="/")
+ """Path fragment that must exist in the request url for the cookie to be valid.
+
+ Defaults to ``/``.
+ """
+ domain: str | None = field(default=None)
+ """Domain for which the cookie is valid."""
+ secure: bool | None = field(default=None)
+ """Https is required for the cookie."""
+ samesite: Literal["lax", "strict", "none"] = field(default="lax")
+ """Controls whether or not a cookie is sent with cross-site requests. Defaults to ``lax``. """
+ description: str = field(default="JWT cookie-based authentication and authorization.")
+ """Description for the OpenAPI security scheme."""
+ authentication_middleware_class: type[JWTCookieAuthenticationMiddleware] = field( # pyright: ignore
+ default=JWTCookieAuthenticationMiddleware
+ )
+ """The authentication middleware class to use. Must inherit from :class:`JWTCookieAuthenticationMiddleware`
+ """
+
+ @property
+ def openapi_components(self) -> Components:
+ """Create OpenAPI documentation for the JWT Cookie auth scheme.
+
+ Returns:
+ A :class:`Components <litestar.openapi.spec.components.Components>` instance.
+ """
+ return Components(
+ security_schemes={
+ self.openapi_security_scheme_name: SecurityScheme(
+ type="http",
+ scheme="Bearer",
+ name=self.key,
+ security_scheme_in="cookie",
+ bearer_format="JWT",
+ description=self.description,
+ )
+ }
+ )
+
+ @property
+ def middleware(self) -> DefineMiddleware:
+ """Create :class:`JWTCookieAuthenticationMiddleware` wrapped in
+ :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`.
+
+ Returns:
+ An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`.
+ """
+ return DefineMiddleware(
+ self.authentication_middleware_class,
+ algorithm=self.algorithm,
+ auth_cookie_key=self.key,
+ auth_header=self.auth_header,
+ exclude=self.exclude,
+ exclude_opt_key=self.exclude_opt_key,
+ exclude_http_methods=self.exclude_http_methods,
+ retrieve_user_handler=self.retrieve_user_handler,
+ scopes=self.scopes,
+ token_secret=self.token_secret,
+ )
+
+ def login(
+ self,
+ identifier: str,
+ *,
+ response_body: Any = Empty,
+ response_media_type: str | MediaType = MediaType.JSON,
+ response_status_code: int = HTTP_201_CREATED,
+ token_expiration: timedelta | None = None,
+ token_issuer: str | None = None,
+ token_audience: str | None = None,
+ token_unique_jwt_id: str | None = None,
+ token_extras: dict[str, Any] | None = None,
+ send_token_as_response_body: bool = False,
+ ) -> Response[Any]:
+ """Create a response with a JWT header.
+
+ Args:
+ identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value.
+ response_body: An optional response body to send.
+ response_media_type: An optional 'Content-Type'. Defaults to 'application/json'.
+ response_status_code: An optional status code for the response. Defaults to '201 Created'.
+ token_expiration: An optional timedelta for the token expiration.
+ token_issuer: An optional value of the token ``iss`` field.
+ token_audience: An optional value for the token ``aud`` field.
+ token_unique_jwt_id: An optional value for the token ``jti`` field.
+ token_extras: An optional dictionary to include in the token ``extras`` field.
+ send_token_as_response_body: If ``True`` the response will be a dict including the token: ``{ "token": <token> }``
+ will be returned as the response body. Note: if a response body is passed this setting will be ignored.
+
+ Returns:
+ A :class:`Response <.response.Response>` instance.
+ """
+
+ encoded_token = self.create_token(
+ identifier=identifier,
+ token_expiration=token_expiration,
+ token_issuer=token_issuer,
+ token_audience=token_audience,
+ token_unique_jwt_id=token_unique_jwt_id,
+ token_extras=token_extras,
+ )
+ cookie = Cookie(
+ key=self.key,
+ path=self.path,
+ httponly=True,
+ value=self.format_auth_header(encoded_token),
+ max_age=int((token_expiration or self.default_token_expiration).total_seconds()),
+ secure=self.secure,
+ samesite=self.samesite,
+ domain=self.domain,
+ )
+
+ if response_body is not Empty:
+ body = response_body
+ elif send_token_as_response_body:
+ body = {"token": encoded_token}
+ else:
+ body = None
+
+ return self.create_response(
+ content=body,
+ headers={self.auth_header: self.format_auth_header(encoded_token)},
+ cookies=[cookie],
+ media_type=response_media_type,
+ status_code=response_status_code,
+ )
+
+
+@dataclass
+class OAuth2Login:
+ """OAuth2 Login DTO"""
+
+ access_token: str
+ """Valid JWT access token"""
+ token_type: str
+ """Type of the OAuth token used"""
+ refresh_token: str | None = field(default=None)
+ """Optional valid refresh token JWT"""
+ expires_in: int | None = field(default=None)
+ """Expiration time of the token in seconds. """
+
+
+@dataclass
+class OAuth2PasswordBearerAuth(Generic[UserType], BaseJWTAuth[UserType]):
+ """OAUTH2 Schema for Password Bearer Authentication.
+
+ This class implements an OAUTH2 authentication flow entry point to the library, and it includes all the
+ functionality of the :class:`JWTAuth` class and adds support for passing JWT tokens ``HttpOnly`` cookies.
+
+ ``token_url`` is the only additional argument that is required, and it should point at your login route
+ """
+
+ token_secret: str
+ """Key with which to generate the token hash.
+
+ Notes:
+ - This value should be kept as a secret and the standard practice is to inject it into the environment.
+ """
+ token_url: str
+ """The URL for retrieving a new token."""
+ retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]]
+ """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value.
+
+ Notes:
+ - User and Auth can be any arbitrary values specified by the security backend.
+ - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively.
+ Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties.
+ - The callable can be sync or async. If it is sync, it will be wrapped to support async.
+
+ """
+ guards: Iterable[Guard] | None = field(default=None)
+ """An iterable of guards to call for requests, providing authorization functionalities."""
+ exclude: str | list[str] | None = field(default=None)
+ """A pattern or list of patterns to skip in the authentication middleware."""
+ exclude_opt_key: str = field(default="exclude_from_auth")
+ """An identifier to use on routes to disable authentication and authorization checks for a particular route."""
+ exclude_http_methods: Sequence[Method] | None = field(
+ default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"])
+ )
+ """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']"""
+ scopes: Scopes | None = field(default=None)
+ """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be
+ processed."""
+ route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None)
+ """An optional iterable of route handlers to register."""
+ dependencies: dict[str, Provide] | None = field(default=None)
+ """An optional dictionary of dependency providers."""
+ type_encoders: TypeEncodersMap | None = field(default=None)
+ """A mapping of types to callables that transform them into types supported for serialization."""
+ algorithm: str = field(default="HS256")
+ """Algorithm to use for JWT hashing."""
+ auth_header: str = field(default="Authorization")
+ """Request header key from which to retrieve the token.
+
+ E.g. ``Authorization`` or 'X-Api-Key'.
+ """
+ default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1))
+ """The default value for token expiration."""
+ openapi_security_scheme_name: str = field(default="BearerToken")
+ """The value to use for the OpenAPI security scheme and security requirements."""
+ oauth_scopes: dict[str, str] | None = field(default=None)
+ """Oauth Scopes available for the token."""
+ key: str = field(default="token")
+ """Key for the cookie."""
+ path: str = field(default="/")
+ """Path fragment that must exist in the request url for the cookie to be valid.
+
+ Defaults to ``/``.
+ """
+ domain: str | None = field(default=None)
+ """Domain for which the cookie is valid."""
+ secure: bool | None = field(default=None)
+ """Https is required for the cookie."""
+ samesite: Literal["lax", "strict", "none"] = field(default="lax")
+ """Controls whether or not a cookie is sent with cross-site requests. Defaults to ``lax``. """
+ description: str = field(default="OAUTH2 password bearer authentication and authorization.")
+ """Description for the OpenAPI security scheme."""
+ authentication_middleware_class: type[JWTCookieAuthenticationMiddleware] = field( # pyright: ignore
+ default=JWTCookieAuthenticationMiddleware
+ )
+ """The authentication middleware class to use.
+
+ Must inherit from :class:`JWTCookieAuthenticationMiddleware`
+ """
+
+ @property
+ def middleware(self) -> DefineMiddleware:
+ """Create ``JWTCookieAuthenticationMiddleware`` wrapped in
+ :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`.
+
+ Returns:
+ An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`.
+ """
+ return DefineMiddleware(
+ self.authentication_middleware_class,
+ algorithm=self.algorithm,
+ auth_cookie_key=self.key,
+ auth_header=self.auth_header,
+ exclude=self.exclude,
+ exclude_opt_key=self.exclude_opt_key,
+ exclude_http_methods=self.exclude_http_methods,
+ retrieve_user_handler=self.retrieve_user_handler,
+ scopes=self.scopes,
+ token_secret=self.token_secret,
+ )
+
+ @property
+ def oauth_flow(self) -> OAuthFlow:
+ """Create an OpenAPI OAuth2 flow for the password bearer authentication scheme.
+
+ Returns:
+ An :class:`OAuthFlow <litestar.openapi.spec.oauth_flow.OAuthFlow>` instance.
+ """
+ return OAuthFlow(
+ token_url=self.token_url,
+ scopes=self.oauth_scopes,
+ )
+
+ @property
+ def openapi_components(self) -> Components:
+ """Create OpenAPI documentation for the OAUTH2 Password bearer auth scheme.
+
+ Returns:
+ An :class:`Components <litestar.openapi.spec.components.Components>` instance.
+ """
+ return Components(
+ security_schemes={
+ self.openapi_security_scheme_name: SecurityScheme(
+ type="oauth2",
+ scheme="Bearer",
+ name=self.auth_header,
+ security_scheme_in="header",
+ flows=OAuthFlows(password=self.oauth_flow), # pyright: ignore[reportGeneralTypeIssues]
+ bearer_format="JWT",
+ description=self.description,
+ )
+ }
+ )
+
+ def login(
+ self,
+ identifier: str,
+ *,
+ response_body: Any = Empty,
+ response_media_type: str | MediaType = MediaType.JSON,
+ response_status_code: int = HTTP_201_CREATED,
+ token_expiration: timedelta | None = None,
+ token_issuer: str | None = None,
+ token_audience: str | None = None,
+ token_unique_jwt_id: str | None = None,
+ token_extras: dict[str, Any] | None = None,
+ send_token_as_response_body: bool = True,
+ ) -> Response[Any]:
+ """Create a response with a JWT header.
+
+ Args:
+ identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value.
+ response_body: An optional response body to send.
+ response_media_type: An optional ``Content-Type``. Defaults to ``application/json``.
+ response_status_code: An optional status code for the response. Defaults to ``201``.
+ token_expiration: An optional timedelta for the token expiration.
+ token_issuer: An optional value of the token ``iss`` field.
+ token_audience: An optional value for the token ``aud`` field.
+ token_unique_jwt_id: An optional value for the token ``jti`` field.
+ token_extras: An optional dictionary to include in the token ``extras`` field.
+ send_token_as_response_body: If ``True`` the response will be an oAuth2 token response dict.
+ Note: if a response body is passed this setting will be ignored.
+
+ Returns:
+ A :class:`Response <.response.Response>` instance.
+ """
+ encoded_token = self.create_token(
+ identifier=identifier,
+ token_expiration=token_expiration,
+ token_issuer=token_issuer,
+ token_audience=token_audience,
+ token_unique_jwt_id=token_unique_jwt_id,
+ token_extras=token_extras,
+ )
+ expires_in = int((token_expiration or self.default_token_expiration).total_seconds())
+ cookie = Cookie(
+ key=self.key,
+ path=self.path,
+ httponly=True,
+ value=self.format_auth_header(encoded_token),
+ max_age=expires_in,
+ secure=self.secure,
+ samesite=self.samesite,
+ domain=self.domain,
+ )
+
+ if response_body is not Empty:
+ body = response_body
+ elif send_token_as_response_body:
+ token_dto = OAuth2Login(
+ access_token=encoded_token,
+ expires_in=expires_in,
+ token_type="bearer", # noqa: S106
+ )
+ body = asdict(token_dto)
+ else:
+ body = None
+
+ return self.create_response(
+ content=body,
+ headers={self.auth_header: self.format_auth_header(encoded_token)},
+ cookies=[cookie],
+ media_type=response_media_type,
+ status_code=response_status_code,
+ )
diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py
new file mode 100644
index 0000000..84326da
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py
@@ -0,0 +1,188 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Awaitable, Callable, Sequence
+
+from litestar.exceptions import NotAuthorizedException
+from litestar.middleware.authentication import (
+ AbstractAuthenticationMiddleware,
+ AuthenticationResult,
+)
+from litestar.security.jwt.token import Token
+
+__all__ = ("JWTAuthenticationMiddleware", "JWTCookieAuthenticationMiddleware")
+
+
+if TYPE_CHECKING:
+ from typing import Any
+
+ from litestar.connection import ASGIConnection
+ from litestar.types import ASGIApp, Method, Scopes
+
+
+class JWTAuthenticationMiddleware(AbstractAuthenticationMiddleware):
+ """JWT Authentication middleware.
+
+ This class provides JWT authentication functionalities.
+ """
+
+ __slots__ = (
+ "algorithm",
+ "auth_header",
+ "retrieve_user_handler",
+ "token_secret",
+ )
+
+ def __init__(
+ self,
+ algorithm: str,
+ app: ASGIApp,
+ auth_header: str,
+ exclude: str | list[str] | None,
+ exclude_http_methods: Sequence[Method] | None,
+ exclude_opt_key: str,
+ retrieve_user_handler: Callable[[Token, ASGIConnection[Any, Any, Any, Any]], Awaitable[Any]],
+ scopes: Scopes,
+ token_secret: str,
+ ) -> None:
+ """Check incoming requests for an encoded token in the auth header specified, and if present retrieve the user
+ from persistence using the provided function.
+
+ Args:
+ algorithm: JWT hashing algorithm to use.
+ app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack.
+ auth_header: Request header key from which to retrieve the token. E.g. ``Authorization`` or ``X-Api-Key``.
+ exclude: A pattern or list of patterns to skip.
+ exclude_opt_key: An identifier to use on routes to disable authentication for a particular route.
+ exclude_http_methods: A sequence of http methods that do not require authentication.
+ retrieve_user_handler: A function that receives a :class:`Token <.security.jwt.Token>` and returns a user,
+ which can be any arbitrary value.
+ scopes: ASGI scopes processed by the authentication middleware.
+ token_secret: Secret for decoding the JWT token. This value should be equivalent to the secret used to
+ encode it.
+ """
+ super().__init__(
+ app=app,
+ exclude=exclude,
+ exclude_from_auth_key=exclude_opt_key,
+ exclude_http_methods=exclude_http_methods,
+ scopes=scopes,
+ )
+ self.algorithm = algorithm
+ self.auth_header = auth_header
+ self.retrieve_user_handler = retrieve_user_handler
+ self.token_secret = token_secret
+
+ async def authenticate_request(self, connection: ASGIConnection[Any, Any, Any, Any]) -> AuthenticationResult:
+ """Given an HTTP Connection, parse the JWT api key stored in the header and retrieve the user correlating to the
+ token from the DB.
+
+ Args:
+ connection: An Litestar HTTPConnection instance.
+
+ Returns:
+ AuthenticationResult
+
+ Raises:
+ NotAuthorizedException: If token is invalid or user is not found.
+ """
+ auth_header = connection.headers.get(self.auth_header)
+ if not auth_header:
+ raise NotAuthorizedException("No JWT token found in request header")
+ encoded_token = auth_header.partition(" ")[-1]
+ return await self.authenticate_token(encoded_token=encoded_token, connection=connection)
+
+ async def authenticate_token(
+ self, encoded_token: str, connection: ASGIConnection[Any, Any, Any, Any]
+ ) -> AuthenticationResult:
+ """Given an encoded JWT token, parse, validate and look up sub within token.
+
+ Args:
+ encoded_token: Encoded JWT token.
+ connection: An ASGI connection instance.
+
+ Raises:
+ NotAuthorizedException: If token is invalid or user is not found.
+
+ Returns:
+ AuthenticationResult
+ """
+ token = Token.decode(
+ encoded_token=encoded_token,
+ secret=self.token_secret,
+ algorithm=self.algorithm,
+ )
+
+ user = await self.retrieve_user_handler(token, connection)
+
+ if not user:
+ raise NotAuthorizedException()
+
+ return AuthenticationResult(user=user, auth=token)
+
+
+class JWTCookieAuthenticationMiddleware(JWTAuthenticationMiddleware):
+ """Cookie based JWT authentication middleware."""
+
+ __slots__ = ("auth_cookie_key",)
+
+ def __init__(
+ self,
+ algorithm: str,
+ app: ASGIApp,
+ auth_cookie_key: str,
+ auth_header: str,
+ exclude: str | list[str] | None,
+ exclude_opt_key: str,
+ exclude_http_methods: Sequence[Method] | None,
+ retrieve_user_handler: Callable[[Token, ASGIConnection[Any, Any, Any, Any]], Awaitable[Any]],
+ scopes: Scopes,
+ token_secret: str,
+ ) -> None:
+ """Check incoming requests for an encoded token in the auth header or cookie name specified, and if present
+ retrieves the user from persistence using the provided function.
+
+ Args:
+ algorithm: JWT hashing algorithm to use.
+ app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack.
+ auth_cookie_key: Cookie name from which to retrieve the token. E.g. ``token`` or ``accessToken``.
+ auth_header: Request header key from which to retrieve the token. E.g. ``Authorization`` or ``X-Api-Key``.
+ exclude: A pattern or list of patterns to skip.
+ exclude_opt_key: An identifier to use on routes to disable authentication for a particular route.
+ exclude_http_methods: A sequence of http methods that do not require authentication.
+ retrieve_user_handler: A function that receives a :class:`Token <.security.jwt.Token>` and returns a user,
+ which can be any arbitrary value.
+ scopes: ASGI scopes processed by the authentication middleware.
+ token_secret: Secret for decoding the JWT token. This value should be equivalent to the secret used to
+ encode it.
+ """
+ super().__init__(
+ algorithm=algorithm,
+ app=app,
+ auth_header=auth_header,
+ exclude=exclude,
+ exclude_http_methods=exclude_http_methods,
+ exclude_opt_key=exclude_opt_key,
+ retrieve_user_handler=retrieve_user_handler,
+ scopes=scopes,
+ token_secret=token_secret,
+ )
+ self.auth_cookie_key = auth_cookie_key
+
+ async def authenticate_request(self, connection: ASGIConnection[Any, Any, Any, Any]) -> AuthenticationResult:
+ """Given an HTTP Connection, parse the JWT api key stored in the header and retrieve the user correlating to the
+ token from the DB.
+
+ Args:
+ connection: An Litestar HTTPConnection instance.
+
+ Raises:
+ NotAuthorizedException: If token is invalid or user is not found.
+
+ Returns:
+ AuthenticationResult
+ """
+ auth_header = connection.headers.get(self.auth_header) or connection.cookies.get(self.auth_cookie_key)
+ if not auth_header:
+ raise NotAuthorizedException("No JWT token found in request header or cookies")
+ encoded_token = auth_header.partition(" ")[-1]
+ return await self.authenticate_token(encoded_token=encoded_token, connection=connection)
diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py
new file mode 100644
index 0000000..279111a
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py
@@ -0,0 +1,119 @@
+from __future__ import annotations
+
+import dataclasses
+from dataclasses import asdict, dataclass, field
+from datetime import datetime, timezone
+from typing import TYPE_CHECKING, Any
+
+from jose import JWSError, JWTError, jwt
+
+from litestar.exceptions import ImproperlyConfiguredException, NotAuthorizedException
+
+if TYPE_CHECKING:
+ from typing_extensions import Self
+
+
+__all__ = ("Token",)
+
+
+def _normalize_datetime(value: datetime) -> datetime:
+ """Convert the given value into UTC and strip microseconds.
+
+ Args:
+ value: A datetime instance
+
+ Returns:
+ A datetime instance
+ """
+ if value.tzinfo is not None:
+ value.astimezone(timezone.utc)
+
+ return value.replace(microsecond=0)
+
+
+@dataclass
+class Token:
+ """JWT Token DTO."""
+
+ exp: datetime
+ """Expiration - datetime for token expiration."""
+ sub: str
+ """Subject - usually a unique identifier of the user or equivalent entity."""
+ iat: datetime = field(default_factory=lambda: _normalize_datetime(datetime.now(timezone.utc)))
+ """Issued at - should always be current now."""
+ iss: str | None = field(default=None)
+ """Issuer - optional unique identifier for the issuer."""
+ aud: str | None = field(default=None)
+ """Audience - intended audience."""
+ jti: str | None = field(default=None)
+ """JWT ID - a unique identifier of the JWT between different issuers."""
+ extras: dict[str, Any] = field(default_factory=dict)
+ """Extra fields that were found on the JWT token."""
+
+ def __post_init__(self) -> None:
+ if len(self.sub) < 1:
+ raise ImproperlyConfiguredException("sub must be a string with a length greater than 0")
+
+ if isinstance(self.exp, datetime) and (
+ (exp := _normalize_datetime(self.exp)).timestamp()
+ >= _normalize_datetime(datetime.now(timezone.utc)).timestamp()
+ ):
+ self.exp = exp
+ else:
+ raise ImproperlyConfiguredException("exp value must be a datetime in the future")
+
+ if isinstance(self.iat, datetime) and (
+ (iat := _normalize_datetime(self.iat)).timestamp()
+ <= _normalize_datetime(datetime.now(timezone.utc)).timestamp()
+ ):
+ self.iat = iat
+ else:
+ raise ImproperlyConfiguredException("iat must be a current or past time")
+
+ @classmethod
+ def decode(cls, encoded_token: str, secret: str | dict[str, str], algorithm: str) -> Self:
+ """Decode a passed in token string and returns a Token instance.
+
+ Args:
+ encoded_token: A base64 string containing an encoded JWT.
+ secret: The secret with which the JWT is encoded. It may optionally be an individual JWK or JWS set dict
+ algorithm: The algorithm used to encode the JWT.
+
+ Returns:
+ A decoded Token instance.
+
+ Raises:
+ NotAuthorizedException: If the token is invalid.
+ """
+ try:
+ payload = jwt.decode(token=encoded_token, key=secret, algorithms=[algorithm], options={"verify_aud": False})
+ exp = datetime.fromtimestamp(payload.pop("exp"), tz=timezone.utc)
+ iat = datetime.fromtimestamp(payload.pop("iat"), tz=timezone.utc)
+ field_names = {f.name for f in dataclasses.fields(Token)}
+ extra_fields = payload.keys() - field_names
+ extras = payload.pop("extras", {})
+ for key in extra_fields:
+ extras[key] = payload.pop(key)
+ return cls(exp=exp, iat=iat, **payload, extras=extras)
+ except (KeyError, JWTError, ImproperlyConfiguredException) as e:
+ raise NotAuthorizedException("Invalid token") from e
+
+ def encode(self, secret: str, algorithm: str) -> str:
+ """Encode the token instance into a string.
+
+ Args:
+ secret: The secret with which the JWT is encoded.
+ algorithm: The algorithm used to encode the JWT.
+
+ Returns:
+ An encoded token string.
+
+ Raises:
+ ImproperlyConfiguredException: If encoding fails.
+ """
+ try:
+ return jwt.encode(
+ claims={k: v for k, v in asdict(self).items() if v is not None}, key=secret, algorithm=algorithm
+ )
+ except (JWTError, JWSError) as e:
+ raise ImproperlyConfiguredException("Failed to encode token") from e