summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/litestar/stores/file.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/stores/file.py')
-rw-r--r--venv/lib/python3.11/site-packages/litestar/stores/file.py170
1 files changed, 0 insertions, 170 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/stores/file.py b/venv/lib/python3.11/site-packages/litestar/stores/file.py
deleted file mode 100644
index 25c52eb..0000000
--- a/venv/lib/python3.11/site-packages/litestar/stores/file.py
+++ /dev/null
@@ -1,170 +0,0 @@
-from __future__ import annotations
-
-import os
-import shutil
-import unicodedata
-from tempfile import mkstemp
-from typing import TYPE_CHECKING
-
-from anyio import Path
-
-from litestar.concurrency import sync_to_thread
-
-from .base import NamespacedStore, StorageObject
-
-__all__ = ("FileStore",)
-
-
-if TYPE_CHECKING:
- from datetime import timedelta
- from os import PathLike
-
-
-def _safe_file_name(name: str) -> str:
- name = unicodedata.normalize("NFKD", name)
- return "".join(c if c.isalnum() else str(ord(c)) for c in name)
-
-
-class FileStore(NamespacedStore):
- """File based, thread and process safe, asynchronous key/value store."""
-
- __slots__ = {"path": "file path"}
-
- def __init__(self, path: PathLike[str]) -> None:
- """Initialize ``FileStorage``.
-
- Args:
- path: Path to store data under
- """
- self.path = Path(path)
-
- def with_namespace(self, namespace: str) -> FileStore:
- """Return a new instance of :class:`FileStore`, using a sub-path of the current store's path."""
- if not namespace.isalnum():
- raise ValueError(f"Invalid namespace: {namespace!r}")
- return FileStore(self.path / namespace)
-
- def _path_from_key(self, key: str) -> Path:
- return self.path / _safe_file_name(key)
-
- @staticmethod
- async def _load_from_path(path: Path) -> StorageObject | None:
- try:
- data = await path.read_bytes()
- return StorageObject.from_bytes(data)
- except FileNotFoundError:
- return None
-
- def _write_sync(self, target_file: Path, storage_obj: StorageObject) -> None:
- try:
- tmp_file_fd, tmp_file_name = mkstemp(dir=self.path, prefix=f"{target_file.name}.tmp")
- renamed = False
- try:
- try:
- os.write(tmp_file_fd, storage_obj.to_bytes())
- finally:
- os.close(tmp_file_fd)
-
- os.replace(tmp_file_name, target_file) # noqa: PTH105
- renamed = True
- finally:
- if not renamed:
- os.unlink(tmp_file_name) # noqa: PTH108
- except OSError:
- pass
-
- async def _write(self, target_file: Path, storage_obj: StorageObject) -> None:
- await sync_to_thread(self._write_sync, target_file, storage_obj)
-
- async def set(self, key: str, value: str | bytes, expires_in: int | timedelta | None = None) -> None:
- """Set a value.
-
- Args:
- key: Key to associate the value with
- value: Value to store
- expires_in: Time in seconds before the key is considered expired
-
- Returns:
- ``None``
- """
-
- await self.path.mkdir(exist_ok=True)
- path = self._path_from_key(key)
- if isinstance(value, str):
- value = value.encode("utf-8")
- storage_obj = StorageObject.new(data=value, expires_in=expires_in)
- await self._write(path, storage_obj)
-
- async def get(self, key: str, renew_for: int | timedelta | None = None) -> bytes | None:
- """Get a value.
-
- Args:
- key: Key associated with the value
- renew_for: If given and the value had an initial expiry time set, renew the
- expiry time for ``renew_for`` seconds. If the value has not been set
- with an expiry time this is a no-op
-
- Returns:
- The value associated with ``key`` if it exists and is not expired, else
- ``None``
- """
- path = self._path_from_key(key)
- storage_obj = await self._load_from_path(path)
-
- if not storage_obj:
- return None
-
- if storage_obj.expired:
- await path.unlink(missing_ok=True)
- return None
-
- if renew_for and storage_obj.expires_at:
- await self.set(key, value=storage_obj.data, expires_in=renew_for)
-
- return storage_obj.data
-
- async def delete(self, key: str) -> None:
- """Delete a value.
-
- If no such key exists, this is a no-op.
-
- Args:
- key: Key of the value to delete
- """
- path = self._path_from_key(key)
- await path.unlink(missing_ok=True)
-
- async def delete_all(self) -> None:
- """Delete all stored values.
-
- Note:
- This deletes and recreates :attr:`FileStore.path`
- """
-
- await sync_to_thread(shutil.rmtree, self.path)
- await self.path.mkdir(exist_ok=True)
-
- async def delete_expired(self) -> None:
- """Delete expired items.
-
- Since expired items are normally only cleared on access (i.e. when calling
- :meth:`.get`), this method should be called in regular intervals
- to free disk space.
- """
- async for file in self.path.iterdir():
- wrapper = await self._load_from_path(file)
- if wrapper and wrapper.expired:
- await file.unlink(missing_ok=True)
-
- async def exists(self, key: str) -> bool:
- """Check if a given ``key`` exists."""
- path = self._path_from_key(key)
- return await path.exists()
-
- async def expires_in(self, key: str) -> int | None:
- """Get the time in seconds ``key`` expires in. If no such ``key`` exists or no
- expiry time was set, return ``None``.
- """
- if storage_obj := await self._load_from_path(self._path_from_key(key)):
- return storage_obj.expires_in
- return None