summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/litestar/stores/file.py
blob: 25c52eb6b54f9ff2814d5be5ca48d542cb209b29 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
from __future__ import annotations

import os
import shutil
import unicodedata
from tempfile import mkstemp
from typing import TYPE_CHECKING

from anyio import Path

from litestar.concurrency import sync_to_thread

from .base import NamespacedStore, StorageObject

__all__ = ("FileStore",)


if TYPE_CHECKING:
    from datetime import timedelta
    from os import PathLike


def _safe_file_name(name: str) -> str:
    name = unicodedata.normalize("NFKD", name)
    return "".join(c if c.isalnum() else str(ord(c)) for c in name)


class FileStore(NamespacedStore):
    """File based, thread and process safe, asynchronous key/value store.

    Every key is stored as a single file directly under :attr:`path`; the file
    name is derived from the key. Namespaces created via :meth:`with_namespace`
    are stores rooted at a sub-directory of :attr:`path`.
    """

    __slots__ = {"path": "file path"}

    def __init__(self, path: PathLike[str]) -> None:
        """Initialize ``FileStore``.

        Args:
            path: Path to store data under
        """
        self.path = Path(path)

    def with_namespace(self, namespace: str) -> FileStore:
        """Return a new instance of :class:`FileStore`, using a sub-path of the current store's path.

        Args:
            namespace: Name of the sub-directory to use. Must be alphanumeric

        Raises:
            ValueError: If ``namespace`` is not alphanumeric
        """
        if not namespace.isalnum():
            raise ValueError(f"Invalid namespace: {namespace!r}")
        return FileStore(self.path / namespace)

    def _path_from_key(self, key: str) -> Path:
        """Return the path of the file that holds the value for ``key``."""
        return self.path / _safe_file_name(key)

    @staticmethod
    async def _load_from_path(path: Path) -> StorageObject | None:
        """Read and deserialize the storage object at ``path``.

        Returns:
            The deserialized object, or ``None`` if no file exists at ``path``
        """
        try:
            data = await path.read_bytes()
            return StorageObject.from_bytes(data)
        except FileNotFoundError:
            return None

    def _write_sync(self, target_file: Path, storage_obj: StorageObject) -> None:
        """Serialize ``storage_obj`` and replace ``target_file`` with the result.

        The data is first written to a temporary file created in the store's
        directory, which is then moved over ``target_file`` using ``os.replace``,
        so ``target_file`` is only ever replaced by a completely written file. If
        the temporary file could not be moved into place it is removed again.

        Any :exc:`OSError` raised along the way is suppressed, in which case
        ``target_file`` is left as it was.
        """
        try:
            tmp_file_fd, tmp_file_name = mkstemp(dir=self.path, prefix=f"{target_file.name}.tmp")
            renamed = False
            try:
                # write through a buffered file object instead of a bare ``os.write``:
                # the latter may write fewer bytes than requested, which would leave
                # a truncated value behind. the context manager also closes the
                # descriptor, whether or not the write succeeded
                with os.fdopen(tmp_file_fd, "wb") as tmp_file:
                    tmp_file.write(storage_obj.to_bytes())

                os.replace(tmp_file_name, target_file)  # noqa: PTH105
                renamed = True
            finally:
                if not renamed:
                    os.unlink(tmp_file_name)  # noqa: PTH108
        except OSError:
            # writes are best effort: a failed write is deliberately not propagated
            # to the caller
            pass

    async def _write(self, target_file: Path, storage_obj: StorageObject) -> None:
        """Run :meth:`_write_sync` in a worker thread via ``sync_to_thread``."""
        await sync_to_thread(self._write_sync, target_file, storage_obj)

    async def set(self, key: str, value: str | bytes, expires_in: int | timedelta | None = None) -> None:
        """Set a value.

        Args:
            key: Key to associate the value with
            value: Value to store. Strings are encoded as UTF-8
            expires_in: Time in seconds before the key is considered expired

        Returns:
            ``None``
        """

        # ``parents=True`` so that a namespaced store (which lives in a sub-directory)
        # can be written to even if the parent store's directory does not exist yet
        await self.path.mkdir(exist_ok=True, parents=True)
        path = self._path_from_key(key)
        if isinstance(value, str):
            value = value.encode("utf-8")
        storage_obj = StorageObject.new(data=value, expires_in=expires_in)
        await self._write(path, storage_obj)

    async def get(self, key: str, renew_for: int | timedelta | None = None) -> bytes | None:
        """Get a value.

        If the stored value is found to be expired, its file is deleted.

        Args:
            key: Key associated with the value
            renew_for: If given and the value had an initial expiry time set, renew the
                expiry time for ``renew_for`` seconds. If the value has not been set
                with an expiry time this is a no-op

        Returns:
            The value associated with ``key`` if it exists and is not expired, else
            ``None``
        """
        path = self._path_from_key(key)
        storage_obj = await self._load_from_path(path)

        if not storage_obj:
            return None

        if storage_obj.expired:
            await path.unlink(missing_ok=True)
            return None

        if renew_for and storage_obj.expires_at:
            # renewing is done by storing the same data again with a new expiry time
            await self.set(key, value=storage_obj.data, expires_in=renew_for)

        return storage_obj.data

    async def delete(self, key: str) -> None:
        """Delete a value.

        If no such key exists, this is a no-op.

        Args:
            key: Key of the value to delete
        """
        path = self._path_from_key(key)
        await path.unlink(missing_ok=True)

    async def delete_all(self) -> None:
        """Delete all stored values.

        If :attr:`FileStore.path` does not exist yet, it is only created.

        Note:
            This deletes and recreates :attr:`FileStore.path`
        """

        try:
            await sync_to_thread(shutil.rmtree, self.path)
        except FileNotFoundError:
            # the directory is only created on the first write, so a store that has
            # never been written to has nothing to delete
            pass
        await self.path.mkdir(exist_ok=True, parents=True)

    async def delete_expired(self) -> None:
        """Delete expired items.

        Since expired items are normally only cleared on access (i.e. when calling
        :meth:`.get`), this method should be called in regular intervals
        to free disk space.

        Only files directly under :attr:`FileStore.path` are considered;
        sub-directories are skipped. If :attr:`FileStore.path` does not exist,
        this is a no-op.
        """
        try:
            async for file in self.path.iterdir():
                # sub-directories (e.g. those backing namespaced stores) are not
                # stored values and cannot be read as such
                if not await file.is_file():
                    continue
                wrapper = await self._load_from_path(file)
                if wrapper and wrapper.expired:
                    await file.unlink(missing_ok=True)
        except FileNotFoundError:
            # the directory does not exist (anymore), so there is nothing to expire
            return

    async def exists(self, key: str) -> bool:
        """Check if a given ``key`` exists.

        Note:
            This only checks for the presence of the file backing ``key``; the expiry
            time is not evaluated, so an expired value that has not been deleted yet
            is reported as existing.
        """
        path = self._path_from_key(key)
        return await path.exists()

    async def expires_in(self, key: str) -> int | None:
        """Get the time in seconds ``key`` expires in. If no such ``key`` exists or no
        expiry time was set, return ``None``.
        """
        if storage_obj := await self._load_from_path(self._path_from_key(key)):
            return storage_obj.expires_in
        return None