diff options
author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 |
---|---|---|
committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 |
commit | 6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch) | |
tree | b1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/litestar/security | |
parent | 4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff) |
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/security')
18 files changed, 1473 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/security/__init__.py b/venv/lib/python3.11/site-packages/litestar/security/__init__.py new file mode 100644 index 0000000..d864d43 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/__init__.py @@ -0,0 +1,3 @@ +from litestar.security.base import AbstractSecurityConfig + +__all__ = ("AbstractSecurityConfig",) diff --git a/venv/lib/python3.11/site-packages/litestar/security/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..419b39e --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/security/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/__pycache__/base.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..6133974 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/__pycache__/base.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/security/base.py b/venv/lib/python3.11/site-packages/litestar/security/base.py new file mode 100644 index 0000000..fbe7913 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/base.py @@ -0,0 +1,183 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from copy import copy +from dataclasses import field +from typing import TYPE_CHECKING, Any, Callable, Generic, Iterable, Sequence, TypeVar, cast + +from litestar import Response +from litestar.utils.sync import ensure_async_callable + +if TYPE_CHECKING: + from litestar.config.app import AppConfig + from litestar.connection import ASGIConnection + from litestar.di import Provide + from litestar.enums import MediaType, OpenAPIMediaType + from litestar.middleware.authentication import AbstractAuthenticationMiddleware + from litestar.middleware.base import DefineMiddleware + from litestar.openapi.spec import Components, SecurityRequirement + from litestar.types import ( + ControllerRouterHandler, + Guard, + Method, + ResponseCookies, + Scopes, + SyncOrAsyncUnion, + TypeEncodersMap, + ) + +__all__ = ("AbstractSecurityConfig",) + +UserType = TypeVar("UserType") +AuthType = TypeVar("AuthType") + + +class AbstractSecurityConfig(ABC, Generic[UserType, AuthType]): + """A base class for Security Configs - this class can be used on the application level + or be manually configured on the router / controller level to provide auth. + """ + + authentication_middleware_class: type[AbstractAuthenticationMiddleware] + """The authentication middleware class to use. + + Must inherit from + :class:`AbstractAuthenticationMiddleware <litestar.middleware.authentication.AbstractAuthenticationMiddleware>` + """ + guards: Iterable[Guard] | None = None + """An iterable of guards to call for requests, providing authorization functionalities.""" + exclude: str | list[str] | None = None + """A pattern or list of patterns to skip in the authentication middleware.""" + exclude_opt_key: str = "exclude_from_auth" + """An identifier to use on routes to disable authentication and authorization checks for a particular route.""" + exclude_http_methods: Sequence[Method] | None = field( + default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"]) + ) + """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']""" + scopes: Scopes | None = None + """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be + processed.""" + route_handlers: Iterable[ControllerRouterHandler] | None = None + """An optional iterable of route handlers to register.""" + dependencies: dict[str, Provide] | None = None + """An optional dictionary of dependency providers.""" + retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] + """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. + + Notes: + - User and Auth can be any arbitrary values specified by the security backend. + - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. + Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. + - The callable can be sync or async. If it is sync, it will be wrapped to support async. + + """ + type_encoders: TypeEncodersMap | None = None + """A mapping of types to callables that transform them into types supported for serialization.""" + + def on_app_init(self, app_config: AppConfig) -> AppConfig: + """Handle app init by injecting middleware, guards etc. into the app. This method can be used only on the app + level. + + Args: + app_config: An instance of :class:`AppConfig <.config.app.AppConfig>` + + Returns: + The :class:`AppConfig <.config.app.AppConfig>`. + """ + app_config.middleware.insert(0, self.middleware) + + if app_config.openapi_config: + app_config.openapi_config = copy(app_config.openapi_config) + if isinstance(app_config.openapi_config.components, list): + app_config.openapi_config.components.append(self.openapi_components) + elif app_config.openapi_config.components: + app_config.openapi_config.components = [self.openapi_components, app_config.openapi_config.components] + else: + app_config.openapi_config.components = [self.openapi_components] + + if isinstance(app_config.openapi_config.security, list): + app_config.openapi_config.security.append(self.security_requirement) + else: + app_config.openapi_config.security = [self.security_requirement] + + if self.guards: + app_config.guards.extend(self.guards) + + if self.dependencies: + app_config.dependencies.update(self.dependencies) + + if self.route_handlers: + app_config.route_handlers.extend(self.route_handlers) + + if self.type_encoders is None: + self.type_encoders = app_config.type_encoders + + return app_config + + def create_response( + self, + content: Any | None, + status_code: int, + media_type: MediaType | OpenAPIMediaType | str, + headers: dict[str, Any] | None = None, + cookies: ResponseCookies | None = None, + ) -> Response[Any]: + """Create a response object. + + Handles setting the type encoders mapping on the response. + + Args: + content: A value for the response body that will be rendered into bytes string. + status_code: An HTTP status code. + media_type: A value for the response 'Content-Type' header. + headers: A string keyed dictionary of response headers. Header keys are insensitive. + cookies: A list of :class:`Cookie <litestar.datastructures.Cookie>` instances to be set under + the response 'Set-Cookie' header. + + Returns: + A response object. + """ + return Response( + content=content, + status_code=status_code, + media_type=media_type, + headers=headers, + cookies=cookies, + type_encoders=self.type_encoders, + ) + + def __post_init__(self) -> None: + self.retrieve_user_handler = ensure_async_callable(self.retrieve_user_handler) + + @property + @abstractmethod + def openapi_components(self) -> Components: + """Create OpenAPI documentation for the JWT auth schema used. + + Returns: + An :class:`Components <litestar.openapi.spec.components.Components>` instance. + """ + raise NotImplementedError + + @property + @abstractmethod + def security_requirement(self) -> SecurityRequirement: + """Return OpenAPI 3.1. + + :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` for the auth + backend. + + Returns: + An OpenAPI 3.1 :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` dictionary. + """ + raise NotImplementedError + + @property + @abstractmethod + def middleware(self) -> DefineMiddleware: + """Create an instance of the config's ``authentication_middleware_class`` attribute and any required kwargs, + wrapping it in Litestar's ``DefineMiddleware``. + + Returns: + An instance of :class:`DefineMiddleware <litestar.middleware.base.DefineMiddleware>`. + """ + raise NotImplementedError diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py new file mode 100644 index 0000000..4fd88de --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/__init__.py @@ -0,0 +1,23 @@ +from litestar.security.jwt.auth import ( + BaseJWTAuth, + JWTAuth, + JWTCookieAuth, + OAuth2Login, + OAuth2PasswordBearerAuth, +) +from litestar.security.jwt.middleware import ( + JWTAuthenticationMiddleware, + JWTCookieAuthenticationMiddleware, +) +from litestar.security.jwt.token import Token + +__all__ = ( + "BaseJWTAuth", + "JWTAuth", + "JWTAuthenticationMiddleware", + "JWTCookieAuth", + "JWTCookieAuthenticationMiddleware", + "OAuth2Login", + "OAuth2PasswordBearerAuth", + "Token", +) diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..f04d57f --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..cec42c0 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/auth.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..8d5603e --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/middleware.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..b4f8c45 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/__pycache__/token.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py new file mode 100644 index 0000000..2a0f094 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/auth.py @@ -0,0 +1,691 @@ +from __future__ import annotations + +from dataclasses import asdict, dataclass, field +from datetime import datetime, timedelta, timezone +from typing import TYPE_CHECKING, Any, Callable, Generic, Iterable, Literal, Sequence, TypeVar, cast + +from litestar.datastructures import Cookie +from litestar.enums import MediaType +from litestar.middleware import DefineMiddleware +from litestar.openapi.spec import Components, OAuthFlow, OAuthFlows, SecurityRequirement, SecurityScheme +from litestar.security.base import AbstractSecurityConfig +from litestar.security.jwt.middleware import JWTAuthenticationMiddleware, JWTCookieAuthenticationMiddleware +from litestar.security.jwt.token import Token +from litestar.status_codes import HTTP_201_CREATED +from litestar.types import ControllerRouterHandler, Empty, Guard, Method, Scopes, SyncOrAsyncUnion, TypeEncodersMap + +__all__ = ("BaseJWTAuth", "JWTAuth", "JWTCookieAuth", "OAuth2Login", "OAuth2PasswordBearerAuth") + + +if TYPE_CHECKING: + from litestar import Response + from litestar.connection import ASGIConnection + from litestar.di import Provide + + +UserType = TypeVar("UserType") + + +class BaseJWTAuth(Generic[UserType], AbstractSecurityConfig[UserType, Token]): + """Base class for JWT Auth backends""" + + token_secret: str + """Key with which to generate the token hash. + + Notes: + - This value should be kept as a secret and the standard practice is to inject it into the environment. + """ + retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] + """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. + + Notes: + - User and Auth can be any arbitrary values specified by the security backend. + - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. + Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. + - The callable can be sync or async. If it is sync, it will be wrapped to support async. + + """ + algorithm: str + """Algorithm to use for JWT hashing.""" + auth_header: str + """Request header key from which to retrieve the token. + + E.g. ``Authorization`` or ``X-Api-Key``. + """ + default_token_expiration: timedelta + """The default value for token expiration.""" + openapi_security_scheme_name: str + """The value to use for the OpenAPI security scheme and security requirements.""" + description: str + """Description for the OpenAPI security scheme.""" + authentication_middleware_class: type[JWTAuthenticationMiddleware] # pyright: ignore + """The authentication middleware class to use. + + Must inherit from :class:`JWTAuthenticationMiddleware` + """ + + @property + def openapi_components(self) -> Components: + """Create OpenAPI documentation for the JWT auth schema used. + + Returns: + An :class:`Components <litestar.openapi.spec.components.Components>` instance. + """ + return Components( + security_schemes={ + self.openapi_security_scheme_name: SecurityScheme( + type="http", + scheme="Bearer", + name=self.auth_header, + bearer_format="JWT", + description=self.description, + ) + } + ) + + @property + def security_requirement(self) -> SecurityRequirement: + """Return OpenAPI 3.1. + + :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` + + Returns: + An OpenAPI 3.1 + :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` + dictionary. + """ + return {self.openapi_security_scheme_name: []} + + @property + def middleware(self) -> DefineMiddleware: + """Create :class:`JWTAuthenticationMiddleware` wrapped in + :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. + + Returns: + An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. + """ + return DefineMiddleware( + self.authentication_middleware_class, + algorithm=self.algorithm, + auth_header=self.auth_header, + exclude=self.exclude, + exclude_opt_key=self.exclude_opt_key, + exclude_http_methods=self.exclude_http_methods, + retrieve_user_handler=self.retrieve_user_handler, + scopes=self.scopes, + token_secret=self.token_secret, + ) + + def login( + self, + identifier: str, + *, + response_body: Any = Empty, + response_media_type: str | MediaType = MediaType.JSON, + response_status_code: int = HTTP_201_CREATED, + token_expiration: timedelta | None = None, + token_issuer: str | None = None, + token_audience: str | None = None, + token_unique_jwt_id: str | None = None, + token_extras: dict[str, Any] | None = None, + send_token_as_response_body: bool = False, + ) -> Response[Any]: + """Create a response with a JWT header. + + Args: + identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value. + response_body: An optional response body to send. + response_media_type: An optional ``Content-Type``. Defaults to ``application/json``. + response_status_code: An optional status code for the response. Defaults to ``201``. + token_expiration: An optional timedelta for the token expiration. + token_issuer: An optional value of the token ``iss`` field. + token_audience: An optional value for the token ``aud`` field. + token_unique_jwt_id: An optional value for the token ``jti`` field. + token_extras: An optional dictionary to include in the token ``extras`` field. + send_token_as_response_body: If ``True`` the response will be a dict including the token: ``{ "token": <token> }`` + will be returned as the response body. Note: if a response body is passed this setting will be ignored. + + Returns: + A :class:`Response <.response.Response>` instance. + """ + encoded_token = self.create_token( + identifier=identifier, + token_expiration=token_expiration, + token_issuer=token_issuer, + token_audience=token_audience, + token_unique_jwt_id=token_unique_jwt_id, + token_extras=token_extras, + ) + + if response_body is not Empty: + body = response_body + elif send_token_as_response_body: + body = {"token": encoded_token} + else: + body = None + + return self.create_response( + content=body, + headers={self.auth_header: self.format_auth_header(encoded_token)}, + media_type=response_media_type, + status_code=response_status_code, + ) + + def create_token( + self, + identifier: str, + token_expiration: timedelta | None = None, + token_issuer: str | None = None, + token_audience: str | None = None, + token_unique_jwt_id: str | None = None, + token_extras: dict | None = None, + ) -> str: + """Create a Token instance from the passed in parameters, persists and returns it. + + Args: + identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value. + token_expiration: An optional timedelta for the token expiration. + token_issuer: An optional value of the token ``iss`` field. + token_audience: An optional value for the token ``aud`` field. + token_unique_jwt_id: An optional value for the token ``jti`` field. + token_extras: An optional dictionary to include in the token ``extras`` field. + + Returns: + The created token. + """ + token = Token( + sub=identifier, + exp=(datetime.now(timezone.utc) + (token_expiration or self.default_token_expiration)), + iss=token_issuer, + aud=token_audience, + jti=token_unique_jwt_id, + extras=token_extras or {}, + ) + return token.encode(secret=self.token_secret, algorithm=self.algorithm) + + def format_auth_header(self, encoded_token: str) -> str: + """Format a token according to the specified OpenAPI scheme. + + Args: + encoded_token: An encoded JWT token + + Returns: + The encoded token formatted for the HTTP headers + """ + security = self.openapi_components.security_schemes.get(self.openapi_security_scheme_name, None) # type: ignore[union-attr] + return f"{security.scheme} {encoded_token}" if isinstance(security, SecurityScheme) else encoded_token + + +@dataclass +class JWTAuth(Generic[UserType], BaseJWTAuth[UserType]): + """JWT Authentication Configuration. + + This class is the main entry point to the library, and it includes methods to create the middleware, provide login + functionality, and create OpenAPI documentation. + """ + + token_secret: str + """Key with which to generate the token hash. + + Notes: + - This value should be kept as a secret and the standard practice is to inject it into the environment. + """ + retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] + """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. + + Notes: + - User and Auth can be any arbitrary values specified by the security backend. + - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. + Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. + - The callable can be sync or async. If it is sync, it will be wrapped to support async. + + """ + guards: Iterable[Guard] | None = field(default=None) + """An iterable of guards to call for requests, providing authorization functionalities.""" + exclude: str | list[str] | None = field(default=None) + """A pattern or list of patterns to skip in the authentication middleware.""" + exclude_opt_key: str = field(default="exclude_from_auth") + """An identifier to use on routes to disable authentication and authorization checks for a particular route.""" + exclude_http_methods: Sequence[Method] | None = field( + default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"]) + ) + """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']""" + scopes: Scopes | None = field(default=None) + """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be + processed.""" + route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None) + """An optional iterable of route handlers to register.""" + dependencies: dict[str, Provide] | None = field(default=None) + """An optional dictionary of dependency providers.""" + + type_encoders: TypeEncodersMap | None = field(default=None) + """A mapping of types to callables that transform them into types supported for serialization.""" + + algorithm: str = field(default="HS256") + """Algorithm to use for JWT hashing.""" + auth_header: str = field(default="Authorization") + """Request header key from which to retrieve the token. + + E.g. ``Authorization`` or ``X-Api-Key``. + """ + default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1)) + """The default value for token expiration.""" + openapi_security_scheme_name: str = field(default="BearerToken") + """The value to use for the OpenAPI security scheme and security requirements.""" + description: str = field(default="JWT api-key authentication and authorization.") + """Description for the OpenAPI security scheme.""" + authentication_middleware_class: type[JWTAuthenticationMiddleware] = field(default=JWTAuthenticationMiddleware) + """The authentication middleware class to use. + + Must inherit from :class:`JWTAuthenticationMiddleware` + """ + + +@dataclass +class JWTCookieAuth(Generic[UserType], BaseJWTAuth[UserType]): + """JWT Cookie Authentication Configuration. + + This class is an alternate entry point to the library, and it includes all the functionality of the :class:`JWTAuth` + class and adds support for passing JWT tokens ``HttpOnly`` cookies. + """ + + token_secret: str + """Key with which to generate the token hash. + + Notes: + - This value should be kept as a secret and the standard practice is to inject it into the environment. + """ + retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] + """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. + + Notes: + - User and Auth can be any arbitrary values specified by the security backend. + - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. + Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. + - The callable can be sync or async. If it is sync, it will be wrapped to support async. + + """ + guards: Iterable[Guard] | None = field(default=None) + """An iterable of guards to call for requests, providing authorization functionalities.""" + exclude: str | list[str] | None = field(default=None) + """A pattern or list of patterns to skip in the authentication middleware.""" + exclude_opt_key: str = field(default="exclude_from_auth") + """An identifier to use on routes to disable authentication and authorization checks for a particular route.""" + scopes: Scopes | None = field(default=None) + """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be + processed.""" + exclude_http_methods: Sequence[Method] | None = field( + default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"]) + ) + """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']""" + route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None) + """An optional iterable of route handlers to register.""" + dependencies: dict[str, Provide] | None = field(default=None) + """An optional dictionary of dependency providers.""" + + type_encoders: TypeEncodersMap | None = field(default=None) + """A mapping of types to callables that transform them into types supported for serialization.""" + + algorithm: str = field(default="HS256") + """Algorithm to use for JWT hashing.""" + auth_header: str = field(default="Authorization") + """Request header key from which to retrieve the token. + + E.g. ``Authorization`` or ``X-Api-Key``. + """ + default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1)) + """The default value for token expiration.""" + openapi_security_scheme_name: str = field(default="BearerToken") + """The value to use for the OpenAPI security scheme and security requirements.""" + key: str = field(default="token") + """Key for the cookie.""" + path: str = field(default="/") + """Path fragment that must exist in the request url for the cookie to be valid. + + Defaults to ``/``. + """ + domain: str | None = field(default=None) + """Domain for which the cookie is valid.""" + secure: bool | None = field(default=None) + """Https is required for the cookie.""" + samesite: Literal["lax", "strict", "none"] = field(default="lax") + """Controls whether or not a cookie is sent with cross-site requests. Defaults to ``lax``. """ + description: str = field(default="JWT cookie-based authentication and authorization.") + """Description for the OpenAPI security scheme.""" + authentication_middleware_class: type[JWTCookieAuthenticationMiddleware] = field( # pyright: ignore + default=JWTCookieAuthenticationMiddleware + ) + """The authentication middleware class to use. Must inherit from :class:`JWTCookieAuthenticationMiddleware` + """ + + @property + def openapi_components(self) -> Components: + """Create OpenAPI documentation for the JWT Cookie auth scheme. + + Returns: + A :class:`Components <litestar.openapi.spec.components.Components>` instance. + """ + return Components( + security_schemes={ + self.openapi_security_scheme_name: SecurityScheme( + type="http", + scheme="Bearer", + name=self.key, + security_scheme_in="cookie", + bearer_format="JWT", + description=self.description, + ) + } + ) + + @property + def middleware(self) -> DefineMiddleware: + """Create :class:`JWTCookieAuthenticationMiddleware` wrapped in + :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. + + Returns: + An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. + """ + return DefineMiddleware( + self.authentication_middleware_class, + algorithm=self.algorithm, + auth_cookie_key=self.key, + auth_header=self.auth_header, + exclude=self.exclude, + exclude_opt_key=self.exclude_opt_key, + exclude_http_methods=self.exclude_http_methods, + retrieve_user_handler=self.retrieve_user_handler, + scopes=self.scopes, + token_secret=self.token_secret, + ) + + def login( + self, + identifier: str, + *, + response_body: Any = Empty, + response_media_type: str | MediaType = MediaType.JSON, + response_status_code: int = HTTP_201_CREATED, + token_expiration: timedelta | None = None, + token_issuer: str | None = None, + token_audience: str | None = None, + token_unique_jwt_id: str | None = None, + token_extras: dict[str, Any] | None = None, + send_token_as_response_body: bool = False, + ) -> Response[Any]: + """Create a response with a JWT header. + + Args: + identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value. + response_body: An optional response body to send. + response_media_type: An optional 'Content-Type'. Defaults to 'application/json'. + response_status_code: An optional status code for the response. Defaults to '201 Created'. + token_expiration: An optional timedelta for the token expiration. + token_issuer: An optional value of the token ``iss`` field. + token_audience: An optional value for the token ``aud`` field. + token_unique_jwt_id: An optional value for the token ``jti`` field. + token_extras: An optional dictionary to include in the token ``extras`` field. + send_token_as_response_body: If ``True`` the response will be a dict including the token: ``{ "token": <token> }`` + will be returned as the response body. Note: if a response body is passed this setting will be ignored. + + Returns: + A :class:`Response <.response.Response>` instance. + """ + + encoded_token = self.create_token( + identifier=identifier, + token_expiration=token_expiration, + token_issuer=token_issuer, + token_audience=token_audience, + token_unique_jwt_id=token_unique_jwt_id, + token_extras=token_extras, + ) + cookie = Cookie( + key=self.key, + path=self.path, + httponly=True, + value=self.format_auth_header(encoded_token), + max_age=int((token_expiration or self.default_token_expiration).total_seconds()), + secure=self.secure, + samesite=self.samesite, + domain=self.domain, + ) + + if response_body is not Empty: + body = response_body + elif send_token_as_response_body: + body = {"token": encoded_token} + else: + body = None + + return self.create_response( + content=body, + headers={self.auth_header: self.format_auth_header(encoded_token)}, + cookies=[cookie], + media_type=response_media_type, + status_code=response_status_code, + ) + + +@dataclass +class OAuth2Login: + """OAuth2 Login DTO""" + + access_token: str + """Valid JWT access token""" + token_type: str + """Type of the OAuth token used""" + refresh_token: str | None = field(default=None) + """Optional valid refresh token JWT""" + expires_in: int | None = field(default=None) + """Expiration time of the token in seconds. """ + + +@dataclass +class OAuth2PasswordBearerAuth(Generic[UserType], BaseJWTAuth[UserType]): + """OAUTH2 Schema for Password Bearer Authentication. + + This class implements an OAUTH2 authentication flow entry point to the library, and it includes all the + functionality of the :class:`JWTAuth` class and adds support for passing JWT tokens ``HttpOnly`` cookies. + + ``token_url`` is the only additional argument that is required, and it should point at your login route + """ + + token_secret: str + """Key with which to generate the token hash. + + Notes: + - This value should be kept as a secret and the standard practice is to inject it into the environment. + """ + token_url: str + """The URL for retrieving a new token.""" + retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] + """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. + + Notes: + - User and Auth can be any arbitrary values specified by the security backend. + - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. + Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. + - The callable can be sync or async. If it is sync, it will be wrapped to support async. + + """ + guards: Iterable[Guard] | None = field(default=None) + """An iterable of guards to call for requests, providing authorization functionalities.""" + exclude: str | list[str] | None = field(default=None) + """A pattern or list of patterns to skip in the authentication middleware.""" + exclude_opt_key: str = field(default="exclude_from_auth") + """An identifier to use on routes to disable authentication and authorization checks for a particular route.""" + exclude_http_methods: Sequence[Method] | None = field( + default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"]) + ) + """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']""" + scopes: Scopes | None = field(default=None) + """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be + processed.""" + route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None) + """An optional iterable of route handlers to register.""" + dependencies: dict[str, Provide] | None = field(default=None) + """An optional dictionary of dependency providers.""" + type_encoders: TypeEncodersMap | None = field(default=None) + """A mapping of types to callables that transform them into types supported for serialization.""" + algorithm: str = field(default="HS256") + """Algorithm to use for JWT hashing.""" + auth_header: str = field(default="Authorization") + """Request header key from which to retrieve the token. + + E.g. ``Authorization`` or 'X-Api-Key'. + """ + default_token_expiration: timedelta = field(default_factory=lambda: timedelta(days=1)) + """The default value for token expiration.""" + openapi_security_scheme_name: str = field(default="BearerToken") + """The value to use for the OpenAPI security scheme and security requirements.""" + oauth_scopes: dict[str, str] | None = field(default=None) + """Oauth Scopes available for the token.""" + key: str = field(default="token") + """Key for the cookie.""" + path: str = field(default="/") + """Path fragment that must exist in the request url for the cookie to be valid. + + Defaults to ``/``. + """ + domain: str | None = field(default=None) + """Domain for which the cookie is valid.""" + secure: bool | None = field(default=None) + """Https is required for the cookie.""" + samesite: Literal["lax", "strict", "none"] = field(default="lax") + """Controls whether or not a cookie is sent with cross-site requests. Defaults to ``lax``. """ + description: str = field(default="OAUTH2 password bearer authentication and authorization.") + """Description for the OpenAPI security scheme.""" + authentication_middleware_class: type[JWTCookieAuthenticationMiddleware] = field( # pyright: ignore + default=JWTCookieAuthenticationMiddleware + ) + """The authentication middleware class to use. + + Must inherit from :class:`JWTCookieAuthenticationMiddleware` + """ + + @property + def middleware(self) -> DefineMiddleware: + """Create ``JWTCookieAuthenticationMiddleware`` wrapped in + :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. + + Returns: + An instance of :class:`DefineMiddleware <.middleware.base.DefineMiddleware>`. + """ + return DefineMiddleware( + self.authentication_middleware_class, + algorithm=self.algorithm, + auth_cookie_key=self.key, + auth_header=self.auth_header, + exclude=self.exclude, + exclude_opt_key=self.exclude_opt_key, + exclude_http_methods=self.exclude_http_methods, + retrieve_user_handler=self.retrieve_user_handler, + scopes=self.scopes, + token_secret=self.token_secret, + ) + + @property + def oauth_flow(self) -> OAuthFlow: + """Create an OpenAPI OAuth2 flow for the password bearer authentication scheme. + + Returns: + An :class:`OAuthFlow <litestar.openapi.spec.oauth_flow.OAuthFlow>` instance. + """ + return OAuthFlow( + token_url=self.token_url, + scopes=self.oauth_scopes, + ) + + @property + def openapi_components(self) -> Components: + """Create OpenAPI documentation for the OAUTH2 Password bearer auth scheme. + + Returns: + An :class:`Components <litestar.openapi.spec.components.Components>` instance. + """ + return Components( + security_schemes={ + self.openapi_security_scheme_name: SecurityScheme( + type="oauth2", + scheme="Bearer", + name=self.auth_header, + security_scheme_in="header", + flows=OAuthFlows(password=self.oauth_flow), # pyright: ignore[reportGeneralTypeIssues] + bearer_format="JWT", + description=self.description, + ) + } + ) + + def login( + self, + identifier: str, + *, + response_body: Any = Empty, + response_media_type: str | MediaType = MediaType.JSON, + response_status_code: int = HTTP_201_CREATED, + token_expiration: timedelta | None = None, + token_issuer: str | None = None, + token_audience: str | None = None, + token_unique_jwt_id: str | None = None, + token_extras: dict[str, Any] | None = None, + send_token_as_response_body: bool = True, + ) -> Response[Any]: + """Create a response with a JWT header. + + Args: + identifier: Unique identifier of the token subject. Usually this is a user ID or equivalent kind of value. + response_body: An optional response body to send. + response_media_type: An optional ``Content-Type``. Defaults to ``application/json``. + response_status_code: An optional status code for the response. Defaults to ``201``. + token_expiration: An optional timedelta for the token expiration. + token_issuer: An optional value of the token ``iss`` field. + token_audience: An optional value for the token ``aud`` field. + token_unique_jwt_id: An optional value for the token ``jti`` field. + token_extras: An optional dictionary to include in the token ``extras`` field. + send_token_as_response_body: If ``True`` the response will be an oAuth2 token response dict. + Note: if a response body is passed this setting will be ignored. + + Returns: + A :class:`Response <.response.Response>` instance. + """ + encoded_token = self.create_token( + identifier=identifier, + token_expiration=token_expiration, + token_issuer=token_issuer, + token_audience=token_audience, + token_unique_jwt_id=token_unique_jwt_id, + token_extras=token_extras, + ) + expires_in = int((token_expiration or self.default_token_expiration).total_seconds()) + cookie = Cookie( + key=self.key, + path=self.path, + httponly=True, + value=self.format_auth_header(encoded_token), + max_age=expires_in, + secure=self.secure, + samesite=self.samesite, + domain=self.domain, + ) + + if response_body is not Empty: + body = response_body + elif send_token_as_response_body: + token_dto = OAuth2Login( + access_token=encoded_token, + expires_in=expires_in, + token_type="bearer", # noqa: S106 + ) + body = asdict(token_dto) + else: + body = None + + return self.create_response( + content=body, + headers={self.auth_header: self.format_auth_header(encoded_token)}, + cookies=[cookie], + media_type=response_media_type, + status_code=response_status_code, + ) diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py new file mode 100644 index 0000000..84326da --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/middleware.py @@ -0,0 +1,188 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Awaitable, Callable, Sequence + +from litestar.exceptions import NotAuthorizedException +from litestar.middleware.authentication import ( + AbstractAuthenticationMiddleware, + AuthenticationResult, +) +from litestar.security.jwt.token import Token + +__all__ = ("JWTAuthenticationMiddleware", "JWTCookieAuthenticationMiddleware") + + +if TYPE_CHECKING: + from typing import Any + + from litestar.connection import ASGIConnection + from litestar.types import ASGIApp, Method, Scopes + + +class JWTAuthenticationMiddleware(AbstractAuthenticationMiddleware): + """JWT Authentication middleware. + + This class provides JWT authentication functionalities. + """ + + __slots__ = ( + "algorithm", + "auth_header", + "retrieve_user_handler", + "token_secret", + ) + + def __init__( + self, + algorithm: str, + app: ASGIApp, + auth_header: str, + exclude: str | list[str] | None, + exclude_http_methods: Sequence[Method] | None, + exclude_opt_key: str, + retrieve_user_handler: Callable[[Token, ASGIConnection[Any, Any, Any, Any]], Awaitable[Any]], + scopes: Scopes, + token_secret: str, + ) -> None: + """Check incoming requests for an encoded token in the auth header specified, and if present retrieve the user + from persistence using the provided function. + + Args: + algorithm: JWT hashing algorithm to use. + app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack. + auth_header: Request header key from which to retrieve the token. E.g. ``Authorization`` or ``X-Api-Key``. + exclude: A pattern or list of patterns to skip. + exclude_opt_key: An identifier to use on routes to disable authentication for a particular route. + exclude_http_methods: A sequence of http methods that do not require authentication. + retrieve_user_handler: A function that receives a :class:`Token <.security.jwt.Token>` and returns a user, + which can be any arbitrary value. + scopes: ASGI scopes processed by the authentication middleware. + token_secret: Secret for decoding the JWT token. This value should be equivalent to the secret used to + encode it. + """ + super().__init__( + app=app, + exclude=exclude, + exclude_from_auth_key=exclude_opt_key, + exclude_http_methods=exclude_http_methods, + scopes=scopes, + ) + self.algorithm = algorithm + self.auth_header = auth_header + self.retrieve_user_handler = retrieve_user_handler + self.token_secret = token_secret + + async def authenticate_request(self, connection: ASGIConnection[Any, Any, Any, Any]) -> AuthenticationResult: + """Given an HTTP Connection, parse the JWT api key stored in the header and retrieve the user correlating to the + token from the DB. + + Args: + connection: An Litestar HTTPConnection instance. + + Returns: + AuthenticationResult + + Raises: + NotAuthorizedException: If token is invalid or user is not found. + """ + auth_header = connection.headers.get(self.auth_header) + if not auth_header: + raise NotAuthorizedException("No JWT token found in request header") + encoded_token = auth_header.partition(" ")[-1] + return await self.authenticate_token(encoded_token=encoded_token, connection=connection) + + async def authenticate_token( + self, encoded_token: str, connection: ASGIConnection[Any, Any, Any, Any] + ) -> AuthenticationResult: + """Given an encoded JWT token, parse, validate and look up sub within token. + + Args: + encoded_token: Encoded JWT token. + connection: An ASGI connection instance. + + Raises: + NotAuthorizedException: If token is invalid or user is not found. + + Returns: + AuthenticationResult + """ + token = Token.decode( + encoded_token=encoded_token, + secret=self.token_secret, + algorithm=self.algorithm, + ) + + user = await self.retrieve_user_handler(token, connection) + + if not user: + raise NotAuthorizedException() + + return AuthenticationResult(user=user, auth=token) + + +class JWTCookieAuthenticationMiddleware(JWTAuthenticationMiddleware): + """Cookie based JWT authentication middleware.""" + + __slots__ = ("auth_cookie_key",) + + def __init__( + self, + algorithm: str, + app: ASGIApp, + auth_cookie_key: str, + auth_header: str, + exclude: str | list[str] | None, + exclude_opt_key: str, + exclude_http_methods: Sequence[Method] | None, + retrieve_user_handler: Callable[[Token, ASGIConnection[Any, Any, Any, Any]], Awaitable[Any]], + scopes: Scopes, + token_secret: str, + ) -> None: + """Check incoming requests for an encoded token in the auth header or cookie name specified, and if present + retrieves the user from persistence using the provided function. + + Args: + algorithm: JWT hashing algorithm to use. + app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack. + auth_cookie_key: Cookie name from which to retrieve the token. E.g. ``token`` or ``accessToken``. + auth_header: Request header key from which to retrieve the token. E.g. ``Authorization`` or ``X-Api-Key``. + exclude: A pattern or list of patterns to skip. + exclude_opt_key: An identifier to use on routes to disable authentication for a particular route. + exclude_http_methods: A sequence of http methods that do not require authentication. + retrieve_user_handler: A function that receives a :class:`Token <.security.jwt.Token>` and returns a user, + which can be any arbitrary value. + scopes: ASGI scopes processed by the authentication middleware. + token_secret: Secret for decoding the JWT token. This value should be equivalent to the secret used to + encode it. + """ + super().__init__( + algorithm=algorithm, + app=app, + auth_header=auth_header, + exclude=exclude, + exclude_http_methods=exclude_http_methods, + exclude_opt_key=exclude_opt_key, + retrieve_user_handler=retrieve_user_handler, + scopes=scopes, + token_secret=token_secret, + ) + self.auth_cookie_key = auth_cookie_key + + async def authenticate_request(self, connection: ASGIConnection[Any, Any, Any, Any]) -> AuthenticationResult: + """Given an HTTP Connection, parse the JWT api key stored in the header and retrieve the user correlating to the + token from the DB. + + Args: + connection: An Litestar HTTPConnection instance. + + Raises: + NotAuthorizedException: If token is invalid or user is not found. + + Returns: + AuthenticationResult + """ + auth_header = connection.headers.get(self.auth_header) or connection.cookies.get(self.auth_cookie_key) + if not auth_header: + raise NotAuthorizedException("No JWT token found in request header or cookies") + encoded_token = auth_header.partition(" ")[-1] + return await self.authenticate_token(encoded_token=encoded_token, connection=connection) diff --git a/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py b/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py new file mode 100644 index 0000000..279111a --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/jwt/token.py @@ -0,0 +1,119 @@ +from __future__ import annotations + +import dataclasses +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Any + +from jose import JWSError, JWTError, jwt + +from litestar.exceptions import ImproperlyConfiguredException, NotAuthorizedException + +if TYPE_CHECKING: + from typing_extensions import Self + + +__all__ = ("Token",) + + +def _normalize_datetime(value: datetime) -> datetime: + """Convert the given value into UTC and strip microseconds. + + Args: + value: A datetime instance + + Returns: + A datetime instance + """ + if value.tzinfo is not None: + value.astimezone(timezone.utc) + + return value.replace(microsecond=0) + + +@dataclass +class Token: + """JWT Token DTO.""" + + exp: datetime + """Expiration - datetime for token expiration.""" + sub: str + """Subject - usually a unique identifier of the user or equivalent entity.""" + iat: datetime = field(default_factory=lambda: _normalize_datetime(datetime.now(timezone.utc))) + """Issued at - should always be current now.""" + iss: str | None = field(default=None) + """Issuer - optional unique identifier for the issuer.""" + aud: str | None = field(default=None) + """Audience - intended audience.""" + jti: str | None = field(default=None) + """JWT ID - a unique identifier of the JWT between different issuers.""" + extras: dict[str, Any] = field(default_factory=dict) + """Extra fields that were found on the JWT token.""" + + def __post_init__(self) -> None: + if len(self.sub) < 1: + raise ImproperlyConfiguredException("sub must be a string with a length greater than 0") + + if isinstance(self.exp, datetime) and ( + (exp := _normalize_datetime(self.exp)).timestamp() + >= _normalize_datetime(datetime.now(timezone.utc)).timestamp() + ): + self.exp = exp + else: + raise ImproperlyConfiguredException("exp value must be a datetime in the future") + + if isinstance(self.iat, datetime) and ( + (iat := _normalize_datetime(self.iat)).timestamp() + <= _normalize_datetime(datetime.now(timezone.utc)).timestamp() + ): + self.iat = iat + else: + raise ImproperlyConfiguredException("iat must be a current or past time") + + @classmethod + def decode(cls, encoded_token: str, secret: str | dict[str, str], algorithm: str) -> Self: + """Decode a passed in token string and returns a Token instance. + + Args: + encoded_token: A base64 string containing an encoded JWT. + secret: The secret with which the JWT is encoded. It may optionally be an individual JWK or JWS set dict + algorithm: The algorithm used to encode the JWT. + + Returns: + A decoded Token instance. + + Raises: + NotAuthorizedException: If the token is invalid. + """ + try: + payload = jwt.decode(token=encoded_token, key=secret, algorithms=[algorithm], options={"verify_aud": False}) + exp = datetime.fromtimestamp(payload.pop("exp"), tz=timezone.utc) + iat = datetime.fromtimestamp(payload.pop("iat"), tz=timezone.utc) + field_names = {f.name for f in dataclasses.fields(Token)} + extra_fields = payload.keys() - field_names + extras = payload.pop("extras", {}) + for key in extra_fields: + extras[key] = payload.pop(key) + return cls(exp=exp, iat=iat, **payload, extras=extras) + except (KeyError, JWTError, ImproperlyConfiguredException) as e: + raise NotAuthorizedException("Invalid token") from e + + def encode(self, secret: str, algorithm: str) -> str: + """Encode the token instance into a string. + + Args: + secret: The secret with which the JWT is encoded. + algorithm: The algorithm used to encode the JWT. + + Returns: + An encoded token string. + + Raises: + ImproperlyConfiguredException: If encoding fails. + """ + try: + return jwt.encode( + claims={k: v for k, v in asdict(self).items() if v is not None}, key=secret, algorithm=algorithm + ) + except (JWTError, JWSError) as e: + raise ImproperlyConfiguredException("Failed to encode token") from e diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/__init__.py b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__init__.py new file mode 100644 index 0000000..7c83991 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__init__.py @@ -0,0 +1,4 @@ +from litestar.security.session_auth.auth import SessionAuth +from litestar.security.session_auth.middleware import SessionAuthMiddleware + +__all__ = ("SessionAuth", "SessionAuthMiddleware") diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..95bf5c1 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/auth.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/auth.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..8d4aa6c --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/auth.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/middleware.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/middleware.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..27e4213 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/session_auth/__pycache__/middleware.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/auth.py b/venv/lib/python3.11/site-packages/litestar/security/session_auth/auth.py new file mode 100644 index 0000000..7a5c542 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/session_auth/auth.py @@ -0,0 +1,137 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Iterable, Sequence, cast + +from litestar.middleware.base import DefineMiddleware +from litestar.middleware.session.base import BaseBackendConfig, BaseSessionBackendT +from litestar.openapi.spec import Components, SecurityRequirement, SecurityScheme +from litestar.security.base import AbstractSecurityConfig, UserType +from litestar.security.session_auth.middleware import MiddlewareWrapper, SessionAuthMiddleware + +__all__ = ("SessionAuth",) + +if TYPE_CHECKING: + from litestar.connection import ASGIConnection + from litestar.di import Provide + from litestar.types import ControllerRouterHandler, Guard, Method, Scopes, SyncOrAsyncUnion, TypeEncodersMap + + +@dataclass +class SessionAuth(Generic[UserType, BaseSessionBackendT], AbstractSecurityConfig[UserType, Dict[str, Any]]): + """Session Based Security Backend.""" + + session_backend_config: BaseBackendConfig[BaseSessionBackendT] # pyright: ignore + """A session backend config.""" + retrieve_user_handler: Callable[[Any, ASGIConnection], SyncOrAsyncUnion[Any | None]] + """Callable that receives the ``auth`` value from the authentication middleware and returns a ``user`` value. + + Notes: + - User and Auth can be any arbitrary values specified by the security backend. + - The User and Auth values will be set by the middleware as ``scope["user"]`` and ``scope["auth"]`` respectively. + Once provided, they can access via the ``connection.user`` and ``connection.auth`` properties. + - The callable can be sync or async. If it is sync, it will be wrapped to support async. + + """ + + authentication_middleware_class: type[SessionAuthMiddleware] = field(default=SessionAuthMiddleware) # pyright: ignore + """The authentication middleware class to use. + + Must inherit from :class:`SessionAuthMiddleware <litestar.security.session_auth.middleware.SessionAuthMiddleware>` + """ + + guards: Iterable[Guard] | None = field(default=None) + """An iterable of guards to call for requests, providing authorization functionalities.""" + exclude: str | list[str] | None = field(default=None) + """A pattern or list of patterns to skip in the authentication middleware.""" + exclude_opt_key: str = field(default="exclude_from_auth") + """An identifier to use on routes to disable authentication and authorization checks for a particular route.""" + exclude_http_methods: Sequence[Method] | None = field( + default_factory=lambda: cast("Sequence[Method]", ["OPTIONS", "HEAD"]) + ) + """A sequence of http methods that do not require authentication. Defaults to ['OPTIONS', 'HEAD']""" + scopes: Scopes | None = field(default=None) + """ASGI scopes processed by the authentication middleware, if ``None``, both ``http`` and ``websocket`` will be + processed.""" + route_handlers: Iterable[ControllerRouterHandler] | None = field(default=None) + """An optional iterable of route handlers to register.""" + dependencies: dict[str, Provide] | None = field(default=None) + """An optional dictionary of dependency providers.""" + + type_encoders: TypeEncodersMap | None = field(default=None) + """A mapping of types to callables that transform them into types supported for serialization.""" + + @property + def middleware(self) -> DefineMiddleware: + """Use this property to insert the config into a middleware list on one of the application layers. + + Examples: + .. code-block:: python + + from typing import Any + from os import urandom + + from litestar import Litestar, Request, get + from litestar_session import SessionAuth + + + async def retrieve_user_from_session(session: dict[str, Any]) -> Any: + # implement logic here to retrieve a ``user`` datum given the session dictionary + ... + + + session_auth_config = SessionAuth( + secret=urandom(16), retrieve_user_handler=retrieve_user_from_session + ) + + + @get("/") + def my_handler(request: Request) -> None: ... + + + app = Litestar(route_handlers=[my_handler], middleware=[session_auth_config.middleware]) + + + Returns: + An instance of DefineMiddleware including ``self`` as the config kwarg value. + """ + return DefineMiddleware(MiddlewareWrapper, config=self) + + @property + def session_backend(self) -> BaseSessionBackendT: + """Create a session backend. + + Returns: + A subclass of :class:`BaseSessionBackend <litestar.middleware.session.base.BaseSessionBackend>` + """ + return self.session_backend_config._backend_class(config=self.session_backend_config) # pyright: ignore + + @property + def openapi_components(self) -> Components: + """Create OpenAPI documentation for the Session Authentication schema used. + + Returns: + An :class:`Components <litestar.openapi.spec.components.Components>` instance. + """ + return Components( + security_schemes={ + "sessionCookie": SecurityScheme( + type="apiKey", + name=self.session_backend_config.key, + security_scheme_in="cookie", # pyright: ignore + description="Session cookie authentication.", + ) + } + ) + + @property + def security_requirement(self) -> SecurityRequirement: + """Return OpenAPI 3.1. + + :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` for the auth + backend. + + Returns: + An OpenAPI 3.1 :data:`SecurityRequirement <.openapi.spec.SecurityRequirement>` dictionary. + """ + return {"sessionCookie": []} diff --git a/venv/lib/python3.11/site-packages/litestar/security/session_auth/middleware.py b/venv/lib/python3.11/site-packages/litestar/security/session_auth/middleware.py new file mode 100644 index 0000000..bb3fce4 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/security/session_auth/middleware.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Sequence + +from litestar.exceptions import NotAuthorizedException +from litestar.middleware.authentication import ( + AbstractAuthenticationMiddleware, + AuthenticationResult, +) +from litestar.middleware.exceptions import ExceptionHandlerMiddleware +from litestar.types import Empty, Method, Scopes + +__all__ = ("MiddlewareWrapper", "SessionAuthMiddleware") + +if TYPE_CHECKING: + from litestar.connection import ASGIConnection + from litestar.security.session_auth.auth import SessionAuth + from litestar.types import ASGIApp, Receive, Scope, Send + + +class MiddlewareWrapper: + """Wrapper class that serves as the middleware entry point.""" + + def __init__(self, app: ASGIApp, config: SessionAuth[Any, Any]) -> None: + """Wrap the SessionAuthMiddleware inside ExceptionHandlerMiddleware, and it wraps this inside SessionMiddleware. + This allows the auth middleware to raise exceptions and still have the response handled, while having the + session cleared. + + Args: + app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack. + config: An instance of SessionAuth. + """ + self.app = app + self.config = config + self.has_wrapped_middleware = False + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + """Handle creating a middleware stack and calling it. + + Args: + scope: The ASGI connection scope. + receive: The ASGI receive function. + send: The ASGI send function. + + Returns: + None + """ + if not self.has_wrapped_middleware: + litestar_app = scope["app"] + auth_middleware = self.config.authentication_middleware_class( + app=self.app, + exclude=self.config.exclude, + exclude_http_methods=self.config.exclude_http_methods, + exclude_opt_key=self.config.exclude_opt_key, + scopes=self.config.scopes, + retrieve_user_handler=self.config.retrieve_user_handler, # type: ignore[arg-type] + ) + exception_middleware = ExceptionHandlerMiddleware( + app=auth_middleware, + exception_handlers=litestar_app.exception_handlers or {}, # pyright: ignore + debug=None, + ) + self.app = self.config.session_backend_config.middleware.middleware( + app=exception_middleware, + backend=self.config.session_backend, + ) + self.has_wrapped_middleware = True + await self.app(scope, receive, send) + + +class SessionAuthMiddleware(AbstractAuthenticationMiddleware): + """Session Authentication Middleware.""" + + def __init__( + self, + app: ASGIApp, + exclude: str | list[str] | None, + exclude_http_methods: Sequence[Method] | None, + exclude_opt_key: str, + retrieve_user_handler: Callable[[dict[str, Any], ASGIConnection[Any, Any, Any, Any]], Awaitable[Any]], + scopes: Scopes | None, + ) -> None: + """Session based authentication middleware. + + Args: + app: An ASGIApp, this value is the next ASGI handler to call in the middleware stack. + exclude: A pattern or list of patterns to skip in the authentication middleware. + exclude_http_methods: A sequence of http methods that do not require authentication. + exclude_opt_key: An identifier to use on routes to disable authentication and authorization checks for a particular route. + scopes: ASGI scopes processed by the authentication middleware. + retrieve_user_handler: Callable that receives the ``session`` value from the authentication middleware and returns a ``user`` value. + """ + super().__init__( + app=app, + exclude=exclude, + exclude_from_auth_key=exclude_opt_key, + exclude_http_methods=exclude_http_methods, + scopes=scopes, + ) + self.retrieve_user_handler = retrieve_user_handler + + async def authenticate_request(self, connection: ASGIConnection[Any, Any, Any, Any]) -> AuthenticationResult: + """Authenticate an incoming connection. + + Args: + connection: An :class:`ASGIConnection <.connection.ASGIConnection>` instance. + + Raises: + NotAuthorizedException: if session data is empty or user is not found. + + Returns: + :class:`AuthenticationResult <.middleware.authentication.AuthenticationResult>` + """ + if not connection.session or connection.scope["session"] is Empty: + # the assignment of 'Empty' forces the session middleware to clear session data. + connection.scope["session"] = Empty + raise NotAuthorizedException("no session data found") + + user = await self.retrieve_user_handler(connection.session, connection) + + if not user: + connection.scope["session"] = Empty + raise NotAuthorizedException("no user correlating to session found") + + return AuthenticationResult(user=user, auth=connection.session) |