summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/litestar/stores/base.py
blob: 34aa514fcaaa34365ed2c97f481e81c4cae4448c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from __future__ import annotations

from abc import ABC, abstractmethod
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Optional

from msgspec import Struct
from msgspec.msgpack import decode as msgpack_decode
from msgspec.msgpack import encode as msgpack_encode

if TYPE_CHECKING:
    from types import TracebackType

    from typing_extensions import Self


__all__ = ("Store", "NamespacedStore", "StorageObject")


class Store(ABC):
    """Thread and process safe asynchronous key/value store."""

    @abstractmethod
    async def set(self, key: str, value: str | bytes, expires_in: int | timedelta | None = None) -> None:
        """Set a value.

        Args:
            key: Key to associate the value with
            value: Value to store
            expires_in: Time in seconds before the key is considered expired

        Returns:
            ``None``
        """
        raise NotImplementedError

    @abstractmethod
    async def get(self, key: str, renew_for: int | timedelta | None = None) -> bytes | None:
        """Get a value.

        Args:
            key: Key associated with the value
            renew_for: If given and the value had an initial expiry time set, renew the
                expiry time for ``renew_for`` seconds. If the value has not been set
                with an expiry time this is a no-op

        Returns:
            The value associated with ``key`` if it exists and is not expired, else
            ``None``
        """
        raise NotImplementedError

    @abstractmethod
    async def delete(self, key: str) -> None:
        """Delete a value.

        If no such key exists, this is a no-op.

        Args:
            key: Key of the value to delete
        """
        raise NotImplementedError

    @abstractmethod
    async def delete_all(self) -> None:
        """Delete all stored values."""
        raise NotImplementedError

    @abstractmethod
    async def exists(self, key: str) -> bool:
        """Check if a given ``key`` exists."""
        raise NotImplementedError

    @abstractmethod
    async def expires_in(self, key: str) -> int | None:
        """Get the time in seconds ``key`` expires in. If no such ``key`` exists or no
        expiry time was set, return ``None``.
        """
        raise NotImplementedError

    async def __aenter__(self) -> None:  # noqa: B027
        pass

    async def __aexit__(  # noqa: B027
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        pass


class NamespacedStore(Store):
    """A subclass of :class:`Store`, offering hierarchical namespacing.

    Bulk actions on a parent namespace should affect all child namespaces, whereas other operations on all namespaces
    should be isolated.
    """

    @abstractmethod
    def with_namespace(self, namespace: str) -> Self:
        """Return a new instance of :class:`NamespacedStore`, which exists in a child namespace of the current namespace.
        Bulk actions on the parent namespace should affect all child namespaces, whereas other operations on all
        namespaces should be isolated.
        """


class StorageObject(Struct):
    """:class:`msgspec.Struct` to store serialized data alongside with their expiry time."""

    expires_at: Optional[datetime]  # noqa: UP007
    data: bytes

    @classmethod
    def new(cls, data: bytes, expires_in: int | timedelta | None) -> StorageObject:
        """Construct a new :class:`StorageObject` instance."""
        if expires_in is not None and not isinstance(expires_in, timedelta):
            expires_in = timedelta(seconds=expires_in)
        return cls(
            data=data,
            expires_at=(datetime.now(tz=timezone.utc) + expires_in) if expires_in else None,
        )

    @property
    def expired(self) -> bool:
        """Return if the :class:`StorageObject` is expired"""
        return self.expires_at is not None and datetime.now(tz=timezone.utc) >= self.expires_at

    @property
    def expires_in(self) -> int:
        """Return the expiry time of this ``StorageObject`` in seconds. If no expiry time
        was set, return ``-1``.
        """
        if self.expires_at:
            return int(self.expires_at.timestamp() - datetime.now(tz=timezone.utc).timestamp())
        return -1

    def to_bytes(self) -> bytes:
        """Encode the instance to bytes"""
        return msgpack_encode(self)

    @classmethod
    def from_bytes(cls, raw: bytes) -> StorageObject:
        """Load a previously encoded with :meth:`StorageObject.to_bytes`"""
        return msgpack_decode(raw, type=cls)