summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/litestar/security/jwt
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/security/jwt')
-rw-r--r--venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py23
-rw-r--r--venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pycbin733 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pycbin26027 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pycbin8900 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pycbin7141 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py691
-rw-r--r--venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py188
-rw-r--r--venv/lib/python3.11/site-packages/litestar/security/jwt/token.py119
8 files changed, 0 insertions, 1021 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py
deleted file mode 100644
index 4fd88de..0000000
--- a/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py
+++ /dev/null
@@ -1,23 +0,0 @@
-from litestar.security.jwt.auth import (
- BaseJWTAuth,
- JWTAuth,
- JWTCookieAuth,
- OAuth2Login,
- OAuth2PasswordBearerAuth,
-)
-from litestar.security.jwt.middleware import (
- JWTAuthenticationMiddleware,
- JWTCookieAuthenticationMiddleware,
-)
-from litestar.security.jwt.token import Token
-
-__all__ = (
- "BaseJWTAuth",
- "JWTAuth",
- "JWTAuthenticationMiddleware",
- "JWTCookieAuth",
- "JWTCookieAuthenticationMiddleware",
- "OAuth2Login",
- "OAuth2PasswordBearerAuth",
- "Token",
-)
diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc
deleted file mode 100644
index f04d57f..0000000
--- a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc
deleted file mode 100644
index cec42c0..0000000
--- a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc
deleted file mode 100644
index 8d5603e..0000000
--- a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc
deleted file mode 100644
index b4f8c45..0000000
--- a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py
deleted file mode 100644
index 2a0f094..0000000
--- a/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py
+++ /dev/null
@@ -1,691 +0,0 @@
-from __future__ import annotations
-
-from dataclasses import asdict, dataclass, field
-from datetime import datetime, timedelta, timezone
-from typing import TYPE_CHECKING, Any, Callable, Generic, Iterable, Literal, Sequence, TypeVar, cast
-
-from litestar.datastructures import Cookie
-from litestar.enums import MediaType
-from litestar.middleware import DefineMiddleware
-from litestar.openapi.spec import Components, OAuthFlow, OAuthFlows, SecurityRequirement, SecurityScheme
-from litestar.security.base import AbstractSecurityConfig
-from litestar.security.jwt.middleware import JWTAuthenticationMiddleware, JWTCookieAuthenticationMiddleware
-from litestar.security.jwt.token import Token
-from litestar.status_codes import HTTP_201_CREATED
-from litestar.types import ControllerRouterHandler, Empty, Guard, Method, Scopes, SyncOrAsyncUnion, TypeEncodersMap
-
-__all__ = ("BaseJWTAuth", "JWTAuth", "JWTCookieAuth", "OAuth2Login", "OAuth2PasswordBearerAuth")
-
-
-if TYPE_CHECKING:
- from litestar import Response
- from litestar.connection import ASGIConnection
- from litestar.di import Provide
-
-
-UserType = TypeVar("UserType")
-
-
-class BaseJWTAuth(Generic[UserType], AbstractSecurityConfig[UserType, Token]):
- """Base class for JWT Auth backends"""
-
- token_secret: str
- """Key with which to generate the token hash.
-
- Notes:
- - This value should be kept as a secret and the standard practice is to inject it into the environment.
- """
- retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]]
- """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value.
-
- Notes:
- - User and Auth can be any arbitrary values specified by the security backend.
- - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively.
- Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties.
- - The callable can be sync or async. If it is sync, it will be wrapped to support async.
-
- """
- algorithm: str
- """Algorithm to use for JWT hashing."""
- auth_header: str
- """Request header key from which to retrieve the token.
-
- E.g. ``Authorization`` or ``X-Api-Key``.
- """
- default_token_expiration: timedelta
- """The default value for token expiration."""
- openapi_security_scheme_name: str
- """The value to use for the OpenAPI security scheme and security requirements."""
- description: str
- """Description for the OpenAPI security scheme."""
- authentication_middleware_class: type[JWTAuthenticationMiddleware] # pyright: ignore
- """The authentication middleware class to use.
-
- Must inherit from :class:`JWTAuthenticationMiddleware`
- """
-
- @property
- def openapi_components(self) -> Components:
- """Create OpenAPI documentation for the JWT auth schema used.
-
- Returns:
- An :class:`Components <litestar.openapi.spec.components.Components>` instance.
- """
- return Components(
- security_schemes={
- self.openapi_security_scheme_name: SecurityScheme(
- type="http",
- scheme="Bearer",
- name=self.auth_header,
- bearer_format="JWT",
- description=self.description,
- )
- }
- )
-
- @property
- def security_requirement(self) -> SecurityRequirement:
- """Return OpenAPI 3.1.
-
- :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>`
-
- Returns:
- An OpenAPI 3.1
- :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>`
- dictionary.
- """
- return {self.openapi_security_scheme_name: []}
-
- @property
- def middleware(self) -> DefineMiddleware:
- """Create :class:`JWTAuthenticationMiddleware` wrapped in
- :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`.
-
- Returns:
- An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`.
- """
- return DefineMiddleware(
- self.authentication_middleware_class,
- algorithm=self.algorithm,
- auth_header=self.auth_header,
- exclude=self.exclude,
- exclude_opt_key=self.exclude_opt_key,
- exclude_http_methods=self.exclude_http_methods,
- retrieve_user_handler=self.retrieve_user_handler,
- scopes=self.scopes,
- token_secret=self.token_secret,
- )
-
- def login(
- self,
- identifier: str,
- *,
- response_body: Any = Empty,
- response_media_type: str | MediaType = MediaType.JSON,
- response_status_code: int = HTTP_201_CREATED,
- token_expiration: timedelta | None = None,
- token_issuer: str | None = None,
- token_audience: str | None = None,
- token_unique_jwt_id: str | None = None,
- token_extras: dict[str, Any] | None = None,
- send_token_as_response_body: bool = False,
- ) -> Response[Any]:
- """Create a response with a JWT header.
-
- Args:
- identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value.
- response_body: An optional response body to send.
- response_media_type: An optional ``Content-Type``. Defaults to ``application/json``.
- response_status_code: An optional status code for the response. Defaults to ``201``.
- token_expiration: An optional timedelta for the token expiration.
- token_issuer: An optional value of the token ``iss`` field.
- token_audience: An optional value for the token ``aud`` field.
- token_unique_jwt_id: An optional value for the token ``jti`` field.
- token_extras: An optional dictionary to include in the token ``extras`` field.
- send_token_as_response_body: If ``True`` the response will be a dict including the token: ``{ "token": <token> }``
- will be returned as the response body. Note: if a response body is passed this setting will be ignored.
-
- Returns:
- A :class:`Response <.response.Response>` instance.
- """
- encoded_token = self.create_token(
- identifier=identifier,
- token_expiration=token_expiration,
- token_issuer=token_issuer,
- token_audience=token_audience,
- token_unique_jwt_id=token_unique_jwt_id,
- token_extras=token_extras,
- )
-
- if response_body is not Empty:
- body = response_body
- elif send_token_as_response_body:
- body = {"token": encoded_token}
- else:
- body = None
-
- return self.create_response(
- content=body,
- headers={self.auth_header: self.format_auth_header(encoded_token)},
- media_type=response_media_type,
- status_code=response_status_code,
- )
-
- def create_token(
- self,
- identifier: str,
- token_expiration: timedelta | None = None,
- token_issuer: str | None = None,
- token_audience: str | None = None,
- token_unique_jwt_id: str | None = None,
- token_extras: dict | None = None,
- ) -> str:
- """Create a Token instance from the passed in parameters, persists and returns it.
-
- Args:
- identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value.
- token_expiration: An optional timedelta for the token expiration.
- token_issuer: An optional value of the token ``iss`` field.
- token_audience: An optional value for the token ``aud`` field.
- token_unique_jwt_id: An optional value for the token ``jti`` field.
- token_extras: An optional dictionary to include in the token ``extras`` field.
-
- Returns:
- The created token.
- """
- token = Token(
- sub=identifier,
- exp=(datetime.now(timezone.utc) + (token_expiration or self.default_token_expiration)),
- iss=token_issuer,
- aud=token_audience,
- jti=token_unique_jwt_id,
- extras=token_extras or {},
- )
- return token.encode(secret=self.token_secret, algorithm=self.algorithm)
-
- def format_auth_header(self, encoded_token: str) -> str:
- """Format a token according to the specified OpenAPI scheme.
-
- Args:
- encoded_token: An encoded JWT token
-
- Returns:
- The encoded token formatted for the HTTP headers
- """
- security = self.openapi_components.security_schemes.get(self.openapi_security_scheme_name, None) # type: ignore[union-attr]
- return f"{security.scheme} {encoded_token}" if isinstance(security, SecurityScheme) else encoded_token
-
-
-@dataclass
-class JWTAuth(Generic[UserType], BaseJWTAuth[UserType]):
- """JWT Authentication Configuration.
-
- This class is the main entry point to the library, and it includes methods to create the middleware, provide login
- functionality, and create OpenAPI documentation.
- """
-
- token_secret: str
- """Key with which to generate the token hash.
-
- Notes:
- - This value should be kept as a secret and the standard practice is to inject it into the environment.
- """
- retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]]
- """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value.
-
- Notes:
- - User and Auth can be any arbitrary values specified by the security backend.
- - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively.
- Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties.
- - The callable can be sync or async. If it is sync, it will be wrapped to support async.
-
- """
- guards: Iterable[Guard] | None = field(default=None)
- """An iterable of guards to call for requests, providing authorization functionalities."""
- exclude: str | list[str] | None = field(default=None)
- """A pattern or list of patterns to skip in the authentication middleware."""
- exclude_opt_key: str = field(default="exclude_from_auth")
- """An identifier to use on routes to disable authentication and authorization checks for a particular route."""
- exclude_http_methods: Sequence[Method] | None = field(
- default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"])
- )
- """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']"""
- scopes: Scopes | None = field(default=None)
- """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be
- processed."""
- route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None)
- """An optional iterable of route handlers to register."""
- dependencies: dict[str, Provide] | None = field(default=None)
- """An optional dictionary of dependency providers."""
-
- type_encoders: TypeEncodersMap | None = field(default=None)
- """A mapping of types to callables that transform them into types supported for serialization."""
-
- algorithm: str = field(default="HS256")
- """Algorithm to use for JWT hashing."""
- auth_header: str = field(default="Authorization")
- """Request header key from which to retrieve the token.
-
- E.g. ``Authorization`` or ``X-Api-Key``.
- """
- default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1))
- """The default value for token expiration."""
- openapi_security_scheme_name: str = field(default="BearerToken")
- """The value to use for the OpenAPI security scheme and security requirements."""
- description: str = field(default="JWT api-key authentication and authorization.")
- """Description for the OpenAPI security scheme."""
- authentication_middleware_class: type[JWTAuthenticationMiddleware] = field(default=JWTAuthenticationMiddleware)
- """The authentication middleware class to use.
-
- Must inherit from :class:`JWTAuthenticationMiddleware`
- """
-
-
-@dataclass
-class JWTCookieAuth(Generic[UserType], BaseJWTAuth[UserType]):
- """JWT Cookie Authentication Configuration.
-
- This class is an alternate entry point to the library, and it includes all the functionality of the :class:`JWTAuth`
- class and adds support for passing JWT tokens ``HttpOnly`` cookies.
- """
-
- token_secret: str
- """Key with which to generate the token hash.
-
- Notes:
- - This value should be kept as a secret and the standard practice is to inject it into the environment.
- """
- retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]]
- """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value.
-
- Notes:
- - User and Auth can be any arbitrary values specified by the security backend.
- - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively.
- Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties.
- - The callable can be sync or async. If it is sync, it will be wrapped to support async.
-
- """
- guards: Iterable[Guard] | None = field(default=None)
- """An iterable of guards to call for requests, providing authorization functionalities."""
- exclude: str | list[str] | None = field(default=None)
- """A pattern or list of patterns to skip in the authentication middleware."""
- exclude_opt_key: str = field(default="exclude_from_auth")
- """An identifier to use on routes to disable authentication and authorization checks for a particular route."""
- scopes: Scopes | None = field(default=None)
- """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be
- processed."""
- exclude_http_methods: Sequence[Method] | None = field(
- default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"])
- )
- """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']"""
- route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None)
- """An optional iterable of route handlers to register."""
- dependencies: dict[str, Provide] | None = field(default=None)
- """An optional dictionary of dependency providers."""
-
- type_encoders: TypeEncodersMap | None = field(default=None)
- """A mapping of types to callables that transform them into types supported for serialization."""
-
- algorithm: str = field(default="HS256")
- """Algorithm to use for JWT hashing."""
- auth_header: str = field(default="Authorization")
- """Request header key from which to retrieve the token.
-
- E.g. ``Authorization`` or ``X-Api-Key``.
- """
- default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1))
- """The default value for token expiration."""
- openapi_security_scheme_name: str = field(default="BearerToken")
- """The value to use for the OpenAPI security scheme and security requirements."""
- key: str = field(default="token")
- """Key for the cookie."""
- path: str = field(default="/")
- """Path fragment that must exist in the request url for the cookie to be valid.
-
- Defaults to ``/``.
- """
- domain: str | None = field(default=None)
- """Domain for which the cookie is valid."""
- secure: bool | None = field(default=None)
- """Https is required for the cookie."""
- samesite: Literal["lax", "strict", "none"] = field(default="lax")
- """Controls whether or not a cookie is sent with cross-site requests. Defaults to ``lax``. """
- description: str = field(default="JWT cookie-based authentication and authorization.")
- """Description for the OpenAPI security scheme."""
- authentication_middleware_class: type[JWTCookieAuthenticationMiddleware] = field( # pyright: ignore
- default=JWTCookieAuthenticationMiddleware
- )
- """The authentication middleware class to use. Must inherit from :class:`JWTCookieAuthenticationMiddleware`
- """
-
- @property
- def openapi_components(self) -> Components:
- """Create OpenAPI documentation for the JWT Cookie auth scheme.
-
- Returns:
- A :class:`Components <litestar.openapi.spec.components.Components>` instance.
- """
- return Components(
- security_schemes={
- self.openapi_security_scheme_name: SecurityScheme(
- type="http",
- scheme="Bearer",
- name=self.key,
- security_scheme_in="cookie",
- bearer_format="JWT",
- description=self.description,
- )
- }
- )
-
- @property
- def middleware(self) -> DefineMiddleware:
- """Create :class:`JWTCookieAuthenticationMiddleware` wrapped in
- :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`.
-
- Returns:
- An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`.
- """
- return DefineMiddleware(
- self.authentication_middleware_class,
- algorithm=self.algorithm,
- auth_cookie_key=self.key,
- auth_header=self.auth_header,
- exclude=self.exclude,
- exclude_opt_key=self.exclude_opt_key,
- exclude_http_methods=self.exclude_http_methods,
- retrieve_user_handler=self.retrieve_user_handler,
- scopes=self.scopes,
- token_secret=self.token_secret,
- )
-
- def login(
- self,
- identifier: str,
- *,
- response_body: Any = Empty,
- response_media_type: str | MediaType = MediaType.JSON,
- response_status_code: int = HTTP_201_CREATED,
- token_expiration: timedelta | None = None,
- token_issuer: str | None = None,
- token_audience: str | None = None,
- token_unique_jwt_id: str | None = None,
- token_extras: dict[str, Any] | None = None,
- send_token_as_response_body: bool = False,
- ) -> Response[Any]:
- """Create a response with a JWT header.
-
- Args:
- identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value.
- response_body: An optional response body to send.
- response_media_type: An optional 'Content-Type'. Defaults to 'application/json'.
- response_status_code: An optional status code for the response. Defaults to '201 Created'.
- token_expiration: An optional timedelta for the token expiration.
- token_issuer: An optional value of the token ``iss`` field.
- token_audience: An optional value for the token ``aud`` field.
- token_unique_jwt_id: An optional value for the token ``jti`` field.
- token_extras: An optional dictionary to include in the token ``extras`` field.
- send_token_as_response_body: If ``True`` the response will be a dict including the token: ``{ "token": <token> }``
- will be returned as the response body. Note: if a response body is passed this setting will be ignored.
-
- Returns:
- A :class:`Response <.response.Response>` instance.
- """
-
- encoded_token = self.create_token(
- identifier=identifier,
- token_expiration=token_expiration,
- token_issuer=token_issuer,
- token_audience=token_audience,
- token_unique_jwt_id=token_unique_jwt_id,
- token_extras=token_extras,
- )
- cookie = Cookie(
- key=self.key,
- path=self.path,
- httponly=True,
- value=self.format_auth_header(encoded_token),
- max_age=int((token_expiration or self.default_token_expiration).total_seconds()),
- secure=self.secure,
- samesite=self.samesite,
- domain=self.domain,
- )
-
- if response_body is not Empty:
- body = response_body
- elif send_token_as_response_body:
- body = {"token": encoded_token}
- else:
- body = None
-
- return self.create_response(
- content=body,
- headers={self.auth_header: self.format_auth_header(encoded_token)},
- cookies=[cookie],
- media_type=response_media_type,
- status_code=response_status_code,
- )
-
-
-@dataclass
-class OAuth2Login:
- """OAuth2 Login DTO"""
-
- access_token: str
- """Valid JWT access token"""
- token_type: str
- """Type of the OAuth token used"""
- refresh_token: str | None = field(default=None)
- """Optional valid refresh token JWT"""
- expires_in: int | None = field(default=None)
- """Expiration time of the token in seconds. """
-
-
-@dataclass
-class OAuth2PasswordBearerAuth(Generic[UserType], BaseJWTAuth[UserType]):
- """OAUTH2 Schema for Password Bearer Authentication.
-
- This class implements an OAUTH2 authentication flow entry point to the library, and it includes all the
- functionality of the :class:`JWTAuth` class and adds support for passing JWT tokens ``HttpOnly`` cookies.
-
- ``token_url`` is the only additional argument that is required, and it should point at your login route
- """
-
- token_secret: str
- """Key with which to generate the token hash.
-
- Notes:
- - This value should be kept as a secret and the standard practice is to inject it into the environment.
- """
- token_url: str
- """The URL for retrieving a new token."""
- retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]]
- """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value.
-
- Notes:
- - User and Auth can be any arbitrary values specified by the security backend.
- - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively.
- Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties.
- - The callable can be sync or async. If it is sync, it will be wrapped to support async.
-
- """
- guards: Iterable[Guard] | None = field(default=None)
- """An iterable of guards to call for requests, providing authorization functionalities."""
- exclude: str | list[str] | None = field(default=None)
- """A pattern or list of patterns to skip in the authentication middleware."""
- exclude_opt_key: str = field(default="exclude_from_auth")
- """An identifier to use on routes to disable authentication and authorization checks for a particular route."""
- exclude_http_methods: Sequence[Method] | None = field(
- default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"])
- )
- """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']"""
- scopes: Scopes | None = field(default=None)
- """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be
- processed."""
- route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None)
- """An optional iterable of route handlers to register."""
- dependencies: dict[str, Provide] | None = field(default=None)
- """An optional dictionary of dependency providers."""
- type_encoders: TypeEncodersMap | None = field(default=None)
- """A mapping of types to callables that transform them into types supported for serialization."""
- algorithm: str = field(default="HS256")
- """Algorithm to use for JWT hashing."""
- auth_header: str = field(default="Authorization")
- """Request header key from which to retrieve the token.
-
- E.g. ``Authorization`` or 'X-Api-Key'.
- """
- default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1))
- """The default value for token expiration."""
- openapi_security_scheme_name: str = field(default="BearerToken")
- """The value to use for the OpenAPI security scheme and security requirements."""
- oauth_scopes: dict[str, str] | None = field(default=None)
- """Oauth Scopes available for the token."""
- key: str = field(default="token")
- """Key for the cookie."""
- path: str = field(default="/")
- """Path fragment that must exist in the request url for the cookie to be valid.
-
- Defaults to ``/``.
- """
- domain: str | None = field(default=None)
- """Domain for which the cookie is valid."""
- secure: bool | None = field(default=None)
- """Https is required for the cookie."""
- samesite: Literal["lax", "strict", "none"] = field(default="lax")
- """Controls whether or not a cookie is sent with cross-site requests. Defaults to ``lax``. """
- description: str = field(default="OAUTH2 password bearer authentication and authorization.")
- """Description for the OpenAPI security scheme."""
- authentication_middleware_class: type[JWTCookieAuthenticationMiddleware] = field( # pyright: ignore
- default=JWTCookieAuthenticationMiddleware
- )
- """The authentication middleware class to use.
-
- Must inherit from :class:`JWTCookieAuthenticationMiddleware`
- """
-
- @property
- def middleware(self) -> DefineMiddleware:
- """Create ``JWTCookieAuthenticationMiddleware`` wrapped in
- :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`.
-
- Returns:
- An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`.
- """
- return DefineMiddleware(
- self.authentication_middleware_class,
- algorithm=self.algorithm,
- auth_cookie_key=self.key,
- auth_header=self.auth_header,
- exclude=self.exclude,
- exclude_opt_key=self.exclude_opt_key,
- exclude_http_methods=self.exclude_http_methods,
- retrieve_user_handler=self.retrieve_user_handler,
- scopes=self.scopes,
- token_secret=self.token_secret,
- )
-
- @property
- def oauth_flow(self) -> OAuthFlow:
- """Create an OpenAPI OAuth2 flow for the password bearer authentication scheme.
-
- Returns:
- An :class:`OAuthFlow <litestar.openapi.spec.oauth_flow.OAuthFlow>` instance.
- """
- return OAuthFlow(
- token_url=self.token_url,
- scopes=self.oauth_scopes,
- )
-
- @property
- def openapi_components(self) -> Components:
- """Create OpenAPI documentation for the OAUTH2 Password bearer auth scheme.
-
- Returns:
- An :class:`Components <litestar.openapi.spec.components.Components>` instance.
- """
- return Components(
- security_schemes={
- self.openapi_security_scheme_name: SecurityScheme(
- type="oauth2",
- scheme="Bearer",
- name=self.auth_header,
- security_scheme_in="header",
- flows=OAuthFlows(password=self.oauth_flow), # pyright: ignore[reportGeneralTypeIssues]
- bearer_format="JWT",
- description=self.description,
- )
- }
- )
-
- def login(
- self,
- identifier: str,
- *,
- response_body: Any = Empty,
- response_media_type: str | MediaType = MediaType.JSON,
- response_status_code: int = HTTP_201_CREATED,
- token_expiration: timedelta | None = None,
- token_issuer: str | None = None,
- token_audience: str | None = None,
- token_unique_jwt_id: str | None = None,
- token_extras: dict[str, Any] | None = None,
- send_token_as_response_body: bool = True,
- ) -> Response[Any]:
- """Create a response with a JWT header.
-
- Args:
- identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value.
- response_body: An optional response body to send.
- response_media_type: An optional ``Content-Type``. Defaults to ``application/json``.
- response_status_code: An optional status code for the response. Defaults to ``201``.
- token_expiration: An optional timedelta for the token expiration.
- token_issuer: An optional value of the token ``iss`` field.
- token_audience: An optional value for the token ``aud`` field.
- token_unique_jwt_id: An optional value for the token ``jti`` field.
- token_extras: An optional dictionary to include in the token ``extras`` field.
- send_token_as_response_body: If ``True`` the response will be an oAuth2 token response dict.
- Note: if a response body is passed this setting will be ignored.
-
- Returns:
- A :class:`Response <.response.Response>` instance.
- """
- encoded_token = self.create_token(
- identifier=identifier,
- token_expiration=token_expiration,
- token_issuer=token_issuer,
- token_audience=token_audience,
- token_unique_jwt_id=token_unique_jwt_id,
- token_extras=token_extras,
- )
- expires_in = int((token_expiration or self.default_token_expiration).total_seconds())
- cookie = Cookie(
- key=self.key,
- path=self.path,
- httponly=True,
- value=self.format_auth_header(encoded_token),
- max_age=expires_in,
- secure=self.secure,
- samesite=self.samesite,
- domain=self.domain,
- )
-
- if response_body is not Empty:
- body = response_body
- elif send_token_as_response_body:
- token_dto = OAuth2Login(
- access_token=encoded_token,
- expires_in=expires_in,
- token_type="bearer", # noqa: S106
- )
- body = asdict(token_dto)
- else:
- body = None
-
- return self.create_response(
- content=body,
- headers={self.auth_header: self.format_auth_header(encoded_token)},
- cookies=[cookie],
- media_type=response_media_type,
- status_code=response_status_code,
- )
diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py
deleted file mode 100644
index 84326da..0000000
--- a/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py
+++ /dev/null
@@ -1,188 +0,0 @@
-from __future__ import annotations
-
-from typing import TYPE_CHECKING, Awaitable, Callable, Sequence
-
-from litestar.exceptions import NotAuthorizedException
-from litestar.middleware.authentication import (
- AbstractAuthenticationMiddleware,
- AuthenticationResult,
-)
-from litestar.security.jwt.token import Token
-
-__all__ = ("JWTAuthenticationMiddleware", "JWTCookieAuthenticationMiddleware")
-
-
-if TYPE_CHECKING:
- from typing import Any
-
- from litestar.connection import ASGIConnection
- from litestar.types import ASGIApp, Method, Scopes
-
-
-class JWTAuthenticationMiddleware(AbstractAuthenticationMiddleware):
- """JWT Authentication middleware.
-
- This class provides JWT authentication functionalities.
- """
-
- __slots__ = (
- "algorithm",
- "auth_header",
- "retrieve_user_handler",
- "token_secret",
- )
-
- def __init__(
- self,
- algorithm: str,
- app: ASGIApp,
- auth_header: str,
- exclude: str | list[str] | None,
- exclude_http_methods: Sequence[Method] | None,
- exclude_opt_key: str,
- retrieve_user_handler: Callable[[Token, ASGIConnection[Any, Any, Any, Any]], Awaitable[Any]],
- scopes: Scopes,
- token_secret: str,
- ) -> None:
- """Check incoming requests for an encoded token in the auth header specified, and if present retrieve the user
- from persistence using the provided function.
-
- Args:
- algorithm: JWT hashing algorithm to use.
- app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack.
- auth_header: Request header key from which to retrieve the token. E.g. ``Authorization`` or ``X-Api-Key``.
- exclude: A pattern or list of patterns to skip.
- exclude_opt_key: An identifier to use on routes to disable authentication for a particular route.
- exclude_http_methods: A sequence of http methods that do not require authentication.
- retrieve_user_handler: A function that receives a :class:`Token <.security.jwt.Token>` and returns a user,
- which can be any arbitrary value.
- scopes: ASGI scopes processed by the authentication middleware.
- token_secret: Secret for decoding the JWT token. This value should be equivalent to the secret used to
- encode it.
- """
- super().__init__(
- app=app,
- exclude=exclude,
- exclude_from_auth_key=exclude_opt_key,
- exclude_http_methods=exclude_http_methods,
- scopes=scopes,
- )
- self.algorithm = algorithm
- self.auth_header = auth_header
- self.retrieve_user_handler = retrieve_user_handler
- self.token_secret = token_secret
-
- async def authenticate_request(self, connection: ASGIConnection[Any, Any, Any, Any]) -> AuthenticationResult:
- """Given an HTTP Connection, parse the JWT api key stored in the header and retrieve the user correlating to the
- token from the DB.
-
- Args:
- connection: An Litestar HTTPConnection instance.
-
- Returns:
- AuthenticationResult
-
- Raises:
- NotAuthorizedException: If token is invalid or user is not found.
- """
- auth_header = connection.headers.get(self.auth_header)
- if not auth_header:
- raise NotAuthorizedException("No JWT token found in request header")
- encoded_token = auth_header.partition(" ")[-1]
- return await self.authenticate_token(encoded_token=encoded_token, connection=connection)
-
- async def authenticate_token(
- self, encoded_token: str, connection: ASGIConnection[Any, Any, Any, Any]
- ) -> AuthenticationResult:
- """Given an encoded JWT token, parse, validate and look up sub within token.
-
- Args:
- encoded_token: Encoded JWT token.
- connection: An ASGI connection instance.
-
- Raises:
- NotAuthorizedException: If token is invalid or user is not found.
-
- Returns:
- AuthenticationResult
- """
- token = Token.decode(
- encoded_token=encoded_token,
- secret=self.token_secret,
- algorithm=self.algorithm,
- )
-
- user = await self.retrieve_user_handler(token, connection)
-
- if not user:
- raise NotAuthorizedException()
-
- return AuthenticationResult(user=user, auth=token)
-
-
-class JWTCookieAuthenticationMiddleware(JWTAuthenticationMiddleware):
- """Cookie based JWT authentication middleware."""
-
- __slots__ = ("auth_cookie_key",)
-
- def __init__(
- self,
- algorithm: str,
- app: ASGIApp,
- auth_cookie_key: str,
- auth_header: str,
- exclude: str | list[str] | None,
- exclude_opt_key: str,
- exclude_http_methods: Sequence[Method] | None,
- retrieve_user_handler: Callable[[Token, ASGIConnection[Any, Any, Any, Any]], Awaitable[Any]],
- scopes: Scopes,
- token_secret: str,
- ) -> None:
- """Check incoming requests for an encoded token in the auth header or cookie name specified, and if present
- retrieves the user from persistence using the provided function.
-
- Args:
- algorithm: JWT hashing algorithm to use.
- app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack.
- auth_cookie_key: Cookie name from which to retrieve the token. E.g. ``token`` or ``accessToken``.
- auth_header: Request header key from which to retrieve the token. E.g. ``Authorization`` or ``X-Api-Key``.
- exclude: A pattern or list of patterns to skip.
- exclude_opt_key: An identifier to use on routes to disable authentication for a particular route.
- exclude_http_methods: A sequence of http methods that do not require authentication.
- retrieve_user_handler: A function that receives a :class:`Token <.security.jwt.Token>` and returns a user,
- which can be any arbitrary value.
- scopes: ASGI scopes processed by the authentication middleware.
- token_secret: Secret for decoding the JWT token. This value should be equivalent to the secret used to
- encode it.
- """
- super().__init__(
- algorithm=algorithm,
- app=app,
- auth_header=auth_header,
- exclude=exclude,
- exclude_http_methods=exclude_http_methods,
- exclude_opt_key=exclude_opt_key,
- retrieve_user_handler=retrieve_user_handler,
- scopes=scopes,
- token_secret=token_secret,
- )
- self.auth_cookie_key = auth_cookie_key
-
- async def authenticate_request(self, connection: ASGIConnection[Any, Any, Any, Any]) -> AuthenticationResult:
- """Given an HTTP Connection, parse the JWT api key stored in the header and retrieve the user correlating to the
- token from the DB.
-
- Args:
- connection: An Litestar HTTPConnection instance.
-
- Raises:
- NotAuthorizedException: If token is invalid or user is not found.
-
- Returns:
- AuthenticationResult
- """
- auth_header = connection.headers.get(self.auth_header) or connection.cookies.get(self.auth_cookie_key)
- if not auth_header:
- raise NotAuthorizedException("No JWT token found in request header or cookies")
- encoded_token = auth_header.partition(" ")[-1]
- return await self.authenticate_token(encoded_token=encoded_token, connection=connection)
diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py
deleted file mode 100644
index 279111a..0000000
--- a/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py
+++ /dev/null
@@ -1,119 +0,0 @@
-from __future__ import annotations
-
-import dataclasses
-from dataclasses import asdict, dataclass, field
-from datetime import datetime, timezone
-from typing import TYPE_CHECKING, Any
-
-from jose import JWSError, JWTError, jwt
-
-from litestar.exceptions import ImproperlyConfiguredException, NotAuthorizedException
-
-if TYPE_CHECKING:
- from typing_extensions import Self
-
-
-__all__ = ("Token",)
-
-
-def _normalize_datetime(value: datetime) -> datetime:
- """Convert the given value into UTC and strip microseconds.
-
- Args:
- value: A datetime instance
-
- Returns:
- A datetime instance
- """
- if value.tzinfo is not None:
- value.astimezone(timezone.utc)
-
- return value.replace(microsecond=0)
-
-
-@dataclass
-class Token:
- """JWT Token DTO."""
-
- exp: datetime
- """Expiration - datetime for token expiration."""
- sub: str
- """Subject - usually a unique identifier of the user or equivalent entity."""
- iat: datetime = field(default_factory=lambda: _normalize_datetime(datetime.now(timezone.utc)))
- """Issued at - should always be current now."""
- iss: str | None = field(default=None)
- """Issuer - optional unique identifier for the issuer."""
- aud: str | None = field(default=None)
- """Audience - intended audience."""
- jti: str | None = field(default=None)
- """JWT ID - a unique identifier of the JWT between different issuers."""
- extras: dict[str, Any] = field(default_factory=dict)
- """Extra fields that were found on the JWT token."""
-
- def __post_init__(self) -> None:
- if len(self.sub) < 1:
- raise ImproperlyConfiguredException("sub must be a string with a length greater than 0")
-
- if isinstance(self.exp, datetime) and (
- (exp := _normalize_datetime(self.exp)).timestamp()
- >= _normalize_datetime(datetime.now(timezone.utc)).timestamp()
- ):
- self.exp = exp
- else:
- raise ImproperlyConfiguredException("exp value must be a datetime in the future")
-
- if isinstance(self.iat, datetime) and (
- (iat := _normalize_datetime(self.iat)).timestamp()
- <= _normalize_datetime(datetime.now(timezone.utc)).timestamp()
- ):
- self.iat = iat
- else:
- raise ImproperlyConfiguredException("iat must be a current or past time")
-
- @classmethod
- def decode(cls, encoded_token: str, secret: str | dict[str, str], algorithm: str) -> Self:
- """Decode a passed in token string and returns a Token instance.
-
- Args:
- encoded_token: A base64 string containing an encoded JWT.
- secret: The secret with which the JWT is encoded. It may optionally be an individual JWK or JWS set dict
- algorithm: The algorithm used to encode the JWT.
-
- Returns:
- A decoded Token instance.
-
- Raises:
- NotAuthorizedException: If the token is invalid.
- """
- try:
- payload = jwt.decode(token=encoded_token, key=secret, algorithms=[algorithm], options={"verify_aud": False})
- exp = datetime.fromtimestamp(payload.pop("exp"), tz=timezone.utc)
- iat = datetime.fromtimestamp(payload.pop("iat"), tz=timezone.utc)
- field_names = {f.name for f in dataclasses.fields(Token)}
- extra_fields = payload.keys() - field_names
- extras = payload.pop("extras", {})
- for key in extra_fields:
- extras[key] = payload.pop(key)
- return cls(exp=exp, iat=iat, **payload, extras=extras)
- except (KeyError, JWTError, ImproperlyConfiguredException) as e:
- raise NotAuthorizedException("Invalid token") from e
-
- def encode(self, secret: str, algorithm: str) -> str:
- """Encode the token instance into a string.
-
- Args:
- secret: The secret with which the JWT is encoded.
- algorithm: The algorithm used to encode the JWT.
-
- Returns:
- An encoded token string.
-
- Raises:
- ImproperlyConfiguredException: If encoding fails.
- """
- try:
- return jwt.encode(
- claims={k: v for k, v in asdict(self).items() if v is not None}, key=secret, algorithm=algorithm
- )
- except (JWTError, JWSError) as e:
- raise ImproperlyConfiguredException("Failed to encode token") from e