From 12cf076118570eebbff08c6b3090e0d4798447a1 Mon Sep 17 00:00:00 2001 From: cyfraeviolae Date: Wed, 3 Apr 2024 03:17:55 -0400 Subject: no venv --- .../litestar/middleware/session/client_side.py | 264 --------------------- 1 file changed, 264 deletions(-) delete mode 100644 venv/lib/python3.11/site-packages/litestar/middleware/session/client_side.py (limited to 'venv/lib/python3.11/site-packages/litestar/middleware/session/client_side.py') diff --git a/venv/lib/python3.11/site-packages/litestar/middleware/session/client_side.py b/venv/lib/python3.11/site-packages/litestar/middleware/session/client_side.py deleted file mode 100644 index f709410..0000000 --- a/venv/lib/python3.11/site-packages/litestar/middleware/session/client_side.py +++ /dev/null @@ -1,264 +0,0 @@ -from __future__ import annotations - -import binascii -import contextlib -import re -import time -from base64 import b64decode, b64encode -from dataclasses import dataclass, field -from os import urandom -from typing import TYPE_CHECKING, Any, Literal - -from litestar.datastructures import MutableScopeHeaders -from litestar.datastructures.cookie import Cookie -from litestar.enums import ScopeType -from litestar.exceptions import ( - ImproperlyConfiguredException, - MissingDependencyException, -) -from litestar.serialization import decode_json, encode_json -from litestar.types import Empty, Scopes -from litestar.utils.dataclass import extract_dataclass_items - -from .base import ONE_DAY_IN_SECONDS, BaseBackendConfig, BaseSessionBackend - -__all__ = ("ClientSideSessionBackend", "CookieBackendConfig") - - -try: - from cryptography.exceptions import InvalidTag - from cryptography.hazmat.primitives.ciphers.aead import AESGCM -except ImportError as e: - raise MissingDependencyException("cryptography") from e - -if TYPE_CHECKING: - from litestar.connection import ASGIConnection - from litestar.types import Message, Scope, ScopeSession - -NONCE_SIZE = 12 -CHUNK_SIZE = 4096 - 64 -AAD = b"additional_authenticated_data=" - - -class ClientSideSessionBackend(BaseSessionBackend["CookieBackendConfig"]): - """Cookie backend for SessionMiddleware.""" - - __slots__ = ("aesgcm", "cookie_re") - - def __init__(self, config: CookieBackendConfig) -> None: - """Initialize ``ClientSideSessionBackend``. - - Args: - config: SessionCookieConfig instance. - """ - super().__init__(config) - self.aesgcm = AESGCM(config.secret) - self.cookie_re = re.compile(rf"{self.config.key}(?:-\d+)?") - - def dump_data(self, data: Any, scope: Scope | None = None) -> list[bytes]: - """Given serializable data, including pydantic models and numpy types, dump it into a bytes string, encrypt, - encode and split it into chunks of the desirable size. - - Args: - data: Data to serialize, encrypt, encode and chunk. - scope: The ASGI connection scope. - - Notes: - - The returned list is composed of a chunks of a single base64 encoded - string that is encrypted using AES-CGM. - - Returns: - List of encoded bytes string of a maximum length equal to the ``CHUNK_SIZE`` constant. - """ - serialized = self.serialize_data(data, scope) - associated_data = encode_json({"expires_at": round(time.time()) + self.config.max_age}) - nonce = urandom(NONCE_SIZE) - encrypted = self.aesgcm.encrypt(nonce, serialized, associated_data=associated_data) - encoded = b64encode(nonce + encrypted + AAD + associated_data) - return [encoded[i : i + CHUNK_SIZE] for i in range(0, len(encoded), CHUNK_SIZE)] - - def load_data(self, data: list[bytes]) -> dict[str, Any]: - """Given a list of strings, decodes them into the session object. - - Args: - data: A list of strings derived from the request's session cookie(s). - - Returns: - A deserialized session value. - """ - decoded = b64decode(b"".join(data)) - nonce = decoded[:NONCE_SIZE] - aad_starts_from = decoded.find(AAD) - associated_data = decoded[aad_starts_from:].replace(AAD, b"") if aad_starts_from != -1 else None - if associated_data and decode_json(value=associated_data)["expires_at"] > round(time.time()): - encrypted_session = decoded[NONCE_SIZE:aad_starts_from] - decrypted = self.aesgcm.decrypt(nonce, encrypted_session, associated_data=associated_data) - return self.deserialize_data(decrypted) - return {} - - def get_cookie_keys(self, connection: ASGIConnection) -> list[str]: - """Return a list of cookie-keys from the connection if they match the session-cookie pattern. - - Args: - connection: An ASGIConnection instance - - Returns: - A list of session-cookie keys - """ - return sorted(key for key in connection.cookies if self.cookie_re.fullmatch(key)) - - def _create_session_cookies(self, data: list[bytes], cookie_params: dict[str, Any] | None = None) -> list[Cookie]: - """Create a list of cookies containing the session data. - If the data is split into multiple cookies, the key will be of the format ``session-{segment number}``, - however if only one cookie is needed, the key will be ``session``. - """ - if cookie_params is None: - cookie_params = dict( - extract_dataclass_items( - self.config, - exclude_none=True, - include={f for f in Cookie.__dict__ if f not in ("key", "secret")}, - ) - ) - - if len(data) == 1: - return [ - Cookie( - value=data[0].decode("utf-8"), - key=self.config.key, - **cookie_params, - ) - ] - - return [ - Cookie( - value=datum.decode("utf-8"), - key=f"{self.config.key}-{i}", - **cookie_params, - ) - for i, datum in enumerate(data) - ] - - async def store_in_message(self, scope_session: ScopeSession, message: Message, connection: ASGIConnection) -> None: - """Store data from ``scope_session`` in ``Message`` in the form of cookies. If the contents of ``scope_session`` - are too large to fit a single cookie, it will be split across several cookies, following the naming scheme of - ``-``. If the session is empty or shrinks, cookies will be cleared by setting their value to - ``"null"`` - - Args: - scope_session: Current session to store - message: Outgoing send-message - connection: Originating ASGIConnection containing the scope - - Returns: - None - """ - - scope = connection.scope - headers = MutableScopeHeaders.from_message(message) - cookie_keys = self.get_cookie_keys(connection) - - if scope_session and scope_session is not Empty: - data = self.dump_data(scope_session, scope=scope) - cookie_params = dict( - extract_dataclass_items( - self.config, - exclude_none=True, - include={f for f in Cookie.__dict__ if f not in ("key", "secret")}, - ) - ) - for cookie in self._create_session_cookies(data, cookie_params): - headers.add("Set-Cookie", cookie.to_header(header="")) - # Cookies with the same key overwrite the earlier cookie with that key. To expire earlier session - # cookies, first check how many session cookies will not be overwritten in this upcoming response. - # If leftover cookies are greater than or equal to 1, that means older session cookies have to be - # expired and their names are in cookie_keys. - cookies_to_clear = cookie_keys[len(data) :] if len(cookie_keys) - len(data) > 0 else [] - else: - cookies_to_clear = cookie_keys - - for cookie_key in cookies_to_clear: - cookie_params = dict( - extract_dataclass_items( - self.config, - exclude_none=True, - include={f for f in Cookie.__dict__ if f not in ("key", "secret", "max_age")}, - ) - ) - headers.add( - "Set-Cookie", - Cookie(value="null", key=cookie_key, expires=0, **cookie_params).to_header(header=""), - ) - - async def load_from_connection(self, connection: ASGIConnection) -> dict[str, Any]: - """Load session data from a connection's session-cookies and return it as a dictionary. - - Args: - connection: Originating ASGIConnection - - Returns: - The session data - """ - if cookie_keys := self.get_cookie_keys(connection): - data = [connection.cookies[key].encode("utf-8") for key in cookie_keys] - # If these exceptions occur, the session must remain empty so do nothing. - with contextlib.suppress(InvalidTag, binascii.Error): - return self.load_data(data) - return {} - - def get_session_id(self, connection: ASGIConnection) -> str | None: - return None - - -@dataclass -class CookieBackendConfig(BaseBackendConfig[ClientSideSessionBackend]): # pyright: ignore - """Configuration for [SessionMiddleware] middleware.""" - - _backend_class = ClientSideSessionBackend - - secret: bytes - """A secret key to use for generating an encryption key. - - Must have a length of 16 (128 bits), 24 (192 bits) or 32 (256 bits) characters. - """ - key: str = field(default="session") - """Key to use for the cookie inside the header, e.g. ``session=`` where ``session`` is the cookie key and - ```` is the session data. - - Notes: - - If a session cookie exceeds 4KB in size it is split. In this case the key will be of the format - ``session-{segment number}``. - - """ - max_age: int = field(default=ONE_DAY_IN_SECONDS * 14) - """Maximal age of the cookie before its invalidated.""" - scopes: Scopes = field(default_factory=lambda: {ScopeType.HTTP, ScopeType.WEBSOCKET}) - """Scopes for the middleware - options are ``http`` and ``websocket`` with the default being both""" - path: str = field(default="/") - """Path fragment that must exist in the request url for the cookie to be valid. - - Defaults to ``'/'``. - """ - domain: str | None = field(default=None) - """Domain for which the cookie is valid.""" - secure: bool = field(default=False) - """Https is required for the cookie.""" - httponly: bool = field(default=True) - """Forbids javascript to access the cookie via 'Document.cookie'.""" - samesite: Literal["lax", "strict", "none"] = field(default="lax") - """Controls whether or not a cookie is sent with cross-site requests. - - Defaults to ``lax``. - """ - exclude: str | list[str] | None = field(default=None) - """A pattern or list of patterns to skip in the session middleware.""" - exclude_opt_key: str = field(default="skip_session") - """An identifier to use on routes to disable the session middleware for a particular route.""" - - def __post_init__(self) -> None: - if len(self.key) < 1 or len(self.key) > 256: - raise ImproperlyConfiguredException("key must be a string with a length between 1-256") - if self.max_age < 1: - raise ImproperlyConfiguredException("max_age must be greater than 0") - if len(self.secret) not in {16, 24, 32}: - raise ImproperlyConfiguredException("secret length must be 16 (128 bit), 24 (192 bit) or 32 (256 bit)") -- cgit v1.2.3