1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
|
from __future__ import annotations
import os
import shutil
import unicodedata
from tempfile import mkstemp
from typing import TYPE_CHECKING
from anyio import Path
from litestar.concurrency import sync_to_thread
from .base import NamespacedStore, StorageObject
# Public API of this module: only the store class itself is exported.
__all__ = ("FileStore",)
if TYPE_CHECKING:
from datetime import timedelta
from os import PathLike
def _safe_file_name(name: str) -> str:
name = unicodedata.normalize("NFKD", name)
return "".join(c if c.isalnum() else str(ord(c)) for c in name)
class FileStore(NamespacedStore):
    """File based, thread and process safe, asynchronous key/value store.

    Every key is stored as a single file directly below :attr:`path`; the file
    name is derived from the key with ``_safe_file_name``. Values are written to
    a temporary file in the same directory first and then moved over the target
    with ``os.replace``. Namespaces are sub-directories of :attr:`path`.
    """

    __slots__ = {"path": "file path"}

    def __init__(self, path: PathLike[str]) -> None:
        """Initialize ``FileStore``.

        Args:
            path: Path to store data under
        """
        self.path = Path(path)

    def with_namespace(self, namespace: str) -> FileStore:
        """Return a new instance of :class:`FileStore`, using a sub-path of the current store's path.

        Args:
            namespace: Name of the sub-directory to use. Must be alphanumeric

        Returns:
            A new :class:`FileStore` rooted at ``self.path / namespace``

        Raises:
            ValueError: If ``namespace`` is not alphanumeric. This also rejects
                the empty string and anything containing path separators
        """
        if not namespace.isalnum():
            raise ValueError(f"Invalid namespace: {namespace!r}")
        return FileStore(self.path / namespace)

    def _path_from_key(self, key: str) -> Path:
        """Return the path of the file that holds the value for ``key``."""
        return self.path / _safe_file_name(key)

    @staticmethod
    async def _load_from_path(path: Path) -> StorageObject | None:
        """Read and decode the object stored at ``path``.

        Returns:
            The decoded :class:`StorageObject`, or ``None`` if the file does not
            exist
        """
        try:
            data = await path.read_bytes()
            return StorageObject.from_bytes(data)
        except FileNotFoundError:
            return None

    def _write_sync(self, target_file: Path, storage_obj: StorageObject) -> None:
        """Write ``storage_obj`` to ``target_file`` via a temporary file.

        The data is written to a temporary file inside :attr:`path` and then
        moved over ``target_file`` with ``os.replace``, so ``target_file`` is
        only ever touched by that final move.

        This is best effort: any ``OSError`` raised while creating, writing,
        moving or cleaning up the temporary file is swallowed, in which case a
        previously stored value (if any) stays in place.
        NOTE(review): presumably the silent failure is deliberate - confirm
        before surfacing these errors to callers.
        """
        try:
            tmp_file_fd, tmp_file_name = mkstemp(dir=self.path, prefix=f"{target_file.name}.tmp")
            renamed = False
            try:
                try:
                    os.write(tmp_file_fd, storage_obj.to_bytes())
                finally:
                    # Always release the descriptor, even if the write failed
                    os.close(tmp_file_fd)

                os.replace(tmp_file_name, target_file)  # noqa: PTH105
                renamed = True
            finally:
                # Do not leave the temporary file behind if it was never moved
                if not renamed:
                    os.unlink(tmp_file_name)  # noqa: PTH108
        except OSError:
            pass

    async def _write(self, target_file: Path, storage_obj: StorageObject) -> None:
        """Run :meth:`_write_sync` through ``sync_to_thread``."""
        await sync_to_thread(self._write_sync, target_file, storage_obj)

    async def set(self, key: str, value: str | bytes, expires_in: int | timedelta | None = None) -> None:
        """Set a value.

        Args:
            key: Key to associate the value with
            value: Value to store. Strings are encoded as UTF-8
            expires_in: Time in seconds before the key is considered expired

        Returns:
            ``None``
        """
        # ``parents=True`` so that a namespaced store (a sub-directory of another
        # store's path) works even if the parent directory was never created
        await self.path.mkdir(exist_ok=True, parents=True)
        path = self._path_from_key(key)
        if isinstance(value, str):
            value = value.encode("utf-8")
        storage_obj = StorageObject.new(data=value, expires_in=expires_in)
        await self._write(path, storage_obj)

    async def get(self, key: str, renew_for: int | timedelta | None = None) -> bytes | None:
        """Get a value.

        Args:
            key: Key associated with the value
            renew_for: If given and the value had an initial expiry time set, renew the
                expiry time for ``renew_for`` seconds. If the value has not been set
                with an expiry time this is a no-op

        Returns:
            The value associated with ``key`` if it exists and is not expired, else
            ``None``
        """
        path = self._path_from_key(key)
        storage_obj = await self._load_from_path(path)

        if not storage_obj:
            return None

        if storage_obj.expired:
            # Expired values are removed lazily, on access
            await path.unlink(missing_ok=True)
            return None

        if renew_for and storage_obj.expires_at:
            # Renewing means storing the same data again with a fresh expiry
            await self.set(key, value=storage_obj.data, expires_in=renew_for)

        return storage_obj.data

    async def delete(self, key: str) -> None:
        """Delete a value.

        If no such key exists, this is a no-op.

        Args:
            key: Key of the value to delete
        """
        path = self._path_from_key(key)
        await path.unlink(missing_ok=True)

    async def delete_all(self) -> None:
        """Delete all stored values.

        If the storage directory does not exist yet, nothing is removed and the
        directory is simply created.

        Note:
            This deletes and recreates :attr:`FileStore.path`
        """
        try:
            await sync_to_thread(shutil.rmtree, self.path)
        except FileNotFoundError:
            # Nothing has been stored yet, so there is nothing to remove
            pass
        await self.path.mkdir(exist_ok=True, parents=True)

    async def delete_expired(self) -> None:
        """Delete expired items.

        Since expired items are normally only cleared on access (i.e. when calling
        :meth:`.get`), this method should be called in regular intervals
        to free disk space.

        Only files directly inside :attr:`path` are inspected; sub-directories
        (such as those created for namespaces) are skipped. If the storage
        directory does not exist, this is a no-op.
        """
        try:
            async for file in self.path.iterdir():
                # Sub-directories are namespaces, not stored values; trying to
                # read them as a value would raise
                if not await file.is_file():
                    continue
                wrapper = await self._load_from_path(file)
                if wrapper and wrapper.expired:
                    await file.unlink(missing_ok=True)
        except FileNotFoundError:
            # The storage directory has not been created yet
            return

    async def exists(self, key: str) -> bool:
        """Check if a given ``key`` exists.

        This only checks for the presence of the backing file; expiry is not
        evaluated, so an expired value that has not been removed yet is still
        reported as existing.
        """
        path = self._path_from_key(key)
        return await path.exists()

    async def expires_in(self, key: str) -> int | None:
        """Get the time in seconds ``key`` expires in.

        Returns:
            ``StorageObject.expires_in`` of the stored object, or ``None`` if no
            such ``key`` exists.

            NOTE(review): the value reported for keys stored without an expiry
            time is whatever ``StorageObject.expires_in`` yields in that case -
            confirm against that class.
        """
        if storage_obj := await self._load_from_path(self._path_from_key(key)):
            return storage_obj.expires_in
        return None
|