summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/litestar/channels/backends
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/channels/backends')
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py0
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pycbin210 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pycbin6422 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pycbin2607 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pycbin5024 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pycbin5220 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pycbin17950 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua15
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua5
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua13
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py90
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/base.py41
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py84
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py54
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py277
15 files changed, 0 insertions, 579 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py
+++ /dev/null
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pyc
deleted file mode 100644
index ab4e477..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pyc
deleted file mode 100644
index a577096..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pyc
deleted file mode 100644
index 334d295..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pyc
deleted file mode 100644
index 9a87da5..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pyc
deleted file mode 100644
index f663280..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pyc
deleted file mode 100644
index bf86a3e..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua
deleted file mode 100644
index a3faa6e..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua
+++ /dev/null
@@ -1,15 +0,0 @@
-local key_pattern = ARGV[1]
-
-local cursor = 0
-local deleted_streams = 0
-
-repeat
- local result = redis.call('SCAN', cursor, 'MATCH', key_pattern)
- for _,key in ipairs(result[2]) do
- redis.call('DEL', key)
- deleted_streams = deleted_streams + 1
- end
- cursor = tonumber(result[1])
-until cursor == 0
-
-return deleted_streams
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua
deleted file mode 100644
index 8402d08..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua
+++ /dev/null
@@ -1,5 +0,0 @@
-local data = ARGV[1]
-
-for _, channel in ipairs(KEYS) do
- redis.call("PUBLISH", channel, data)
-end
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua
deleted file mode 100644
index f6b322f..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua
+++ /dev/null
@@ -1,13 +0,0 @@
-local data = ARGV[1]
-local limit = ARGV[2]
-local exp = ARGV[3]
-local maxlen_approx = ARGV[4]
-
-for i, key in ipairs(KEYS) do
- if maxlen_approx == 1 then
- redis.call("XADD", key, "MAXLEN", "~", limit, "*", "data", data, "channel", ARGV[i + 4])
- else
- redis.call("XADD", key, "MAXLEN", limit, "*", "data", data, "channel", ARGV[i + 4])
- end
- redis.call("PEXPIRE", key, exp)
-end
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py
deleted file mode 100644
index 967b208..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py
+++ /dev/null
@@ -1,90 +0,0 @@
-from __future__ import annotations
-
-import asyncio
-from contextlib import AsyncExitStack
-from functools import partial
-from typing import AsyncGenerator, Awaitable, Callable, Iterable, overload
-
-import asyncpg
-
-from litestar.channels import ChannelsBackend
-from litestar.exceptions import ImproperlyConfiguredException
-
-
-class AsyncPgChannelsBackend(ChannelsBackend):
- _listener_conn: asyncpg.Connection
-
- @overload
- def __init__(self, dsn: str) -> None: ...
-
- @overload
- def __init__(
- self,
- *,
- make_connection: Callable[[], Awaitable[asyncpg.Connection]],
- ) -> None: ...
-
- def __init__(
- self,
- dsn: str | None = None,
- *,
- make_connection: Callable[[], Awaitable[asyncpg.Connection]] | None = None,
- ) -> None:
- if not (dsn or make_connection):
- raise ImproperlyConfiguredException("Need to specify dsn or make_connection")
-
- self._subscribed_channels: set[str] = set()
- self._exit_stack = AsyncExitStack()
- self._connect = make_connection or partial(asyncpg.connect, dsn=dsn)
- self._queue: asyncio.Queue[tuple[str, bytes]] | None = None
-
- async def on_startup(self) -> None:
- self._queue = asyncio.Queue()
- self._listener_conn = await self._connect()
-
- async def on_shutdown(self) -> None:
- await self._listener_conn.close()
- self._queue = None
-
- async def publish(self, data: bytes, channels: Iterable[str]) -> None:
- if self._queue is None:
- raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?")
-
- dec_data = data.decode("utf-8")
-
- conn = await self._connect()
- try:
- for channel in channels:
- await conn.execute("SELECT pg_notify($1, $2);", channel, dec_data)
- finally:
- await conn.close()
-
- async def subscribe(self, channels: Iterable[str]) -> None:
- for channel in set(channels) - self._subscribed_channels:
- await self._listener_conn.add_listener(channel, self._listener) # type: ignore[arg-type]
- self._subscribed_channels.add(channel)
-
- async def unsubscribe(self, channels: Iterable[str]) -> None:
- for channel in channels:
- await self._listener_conn.remove_listener(channel, self._listener) # type: ignore[arg-type]
- self._subscribed_channels = self._subscribed_channels - set(channels)
-
- async def stream_events(self) -> AsyncGenerator[tuple[str, bytes], None]:
- if self._queue is None:
- raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?")
-
- while True:
- channel, message = await self._queue.get()
- self._queue.task_done()
- # an UNLISTEN may be in transit while we're getting here, so we double-check
- # that we are actually supposed to deliver this message
- if channel in self._subscribed_channels:
- yield channel, message
-
- async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
- raise NotImplementedError()
-
- def _listener(self, /, connection: asyncpg.Connection, pid: int, channel: str, payload: object) -> None:
- if not isinstance(payload, str):
- raise RuntimeError("Invalid data received")
- self._queue.put_nowait((channel, payload.encode("utf-8"))) # type: ignore[union-attr]
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/base.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/base.py
deleted file mode 100644
index ce7ee81..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/base.py
+++ /dev/null
@@ -1,41 +0,0 @@
-from __future__ import annotations
-
-from abc import ABC, abstractmethod
-from typing import AsyncGenerator, Iterable
-
-
-class ChannelsBackend(ABC):
- @abstractmethod
- async def on_startup(self) -> None:
- """Called by the plugin on application startup"""
- ...
-
- @abstractmethod
- async def on_shutdown(self) -> None:
- """Called by the plugin on application shutdown"""
- ...
-
- @abstractmethod
- async def publish(self, data: bytes, channels: Iterable[str]) -> None:
- """Publish the message ``data`` to all ``channels``"""
- ...
-
- @abstractmethod
- async def subscribe(self, channels: Iterable[str]) -> None:
- """Start listening for events on ``channels``"""
- ...
-
- @abstractmethod
- async def unsubscribe(self, channels: Iterable[str]) -> None:
- """Stop listening for events on ``channels``"""
- ...
-
- @abstractmethod
- def stream_events(self) -> AsyncGenerator[tuple[str, bytes], None]:
- """Return a generator, iterating over events of subscribed channels as they become available"""
- ...
-
- @abstractmethod
- async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
- """Return the event history of ``channel``, at most ``limit`` entries"""
- ...
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py
deleted file mode 100644
index a96a66b..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py
+++ /dev/null
@@ -1,84 +0,0 @@
-from __future__ import annotations
-
-from asyncio import Queue
-from collections import defaultdict, deque
-from typing import Any, AsyncGenerator, Iterable
-
-from litestar.channels.backends.base import ChannelsBackend
-
-
-class MemoryChannelsBackend(ChannelsBackend):
- """An in-memory channels backend"""
-
- def __init__(self, history: int = 0) -> None:
- self._max_history_length = history
- self._channels: set[str] = set()
- self._queue: Queue[tuple[str, bytes]] | None = None
- self._history: defaultdict[str, deque[bytes]] = defaultdict(lambda: deque(maxlen=self._max_history_length))
-
- async def on_startup(self) -> None:
- self._queue = Queue()
-
- async def on_shutdown(self) -> None:
- self._queue = None
-
- async def publish(self, data: bytes, channels: Iterable[str]) -> None:
- """Publish ``data`` to ``channels``. If a channel has not yet been subscribed to,
- this will be a no-op.
-
- Args:
- data: Data to publish
- channels: Channels to publish to
-
- Returns:
- None
-
- Raises:
- RuntimeError: If ``on_startup`` has not been called yet
- """
- if not self._queue:
- raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?")
-
- for channel in channels:
- if channel not in self._channels:
- continue
-
- self._queue.put_nowait((channel, data))
- if self._max_history_length:
- for channel in channels:
- self._history[channel].append(data)
-
- async def subscribe(self, channels: Iterable[str]) -> None:
- """Subscribe to ``channels``, and enable publishing to them"""
- self._channels.update(channels)
-
- async def unsubscribe(self, channels: Iterable[str]) -> None:
- """Unsubscribe from ``channels``"""
- self._channels -= set(channels)
- try:
- for channel in channels:
- del self._history[channel]
- except KeyError:
- pass
-
- async def stream_events(self) -> AsyncGenerator[tuple[str, Any], None]:
- """Return a generator, iterating over events of subscribed channels as they become available"""
- if self._queue is None:
- raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?")
-
- while True:
- channel, message = await self._queue.get()
- self._queue.task_done()
-
- # if a message is published to a channel and the channel is then
- # unsubscribed before retrieving that message from the stream, it can still
- # end up here, so we double-check if we still are interested in this message
- if channel in self._channels:
- yield channel, message
-
- async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
- """Return the event history of ``channel``, at most ``limit`` entries"""
- history = list(self._history[channel])
- if limit:
- history = history[-limit:]
- return history
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py
deleted file mode 100644
index 14b53bc..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py
+++ /dev/null
@@ -1,54 +0,0 @@
-from __future__ import annotations
-
-from contextlib import AsyncExitStack
-from typing import AsyncGenerator, Iterable
-
-import psycopg
-
-from .base import ChannelsBackend
-
-
-def _safe_quote(ident: str) -> str:
- return '"{}"'.format(ident.replace('"', '""')) # sourcery skip
-
-
-class PsycoPgChannelsBackend(ChannelsBackend):
- _listener_conn: psycopg.AsyncConnection
-
- def __init__(self, pg_dsn: str) -> None:
- self._pg_dsn = pg_dsn
- self._subscribed_channels: set[str] = set()
- self._exit_stack = AsyncExitStack()
-
- async def on_startup(self) -> None:
- self._listener_conn = await psycopg.AsyncConnection.connect(self._pg_dsn, autocommit=True)
- await self._exit_stack.enter_async_context(self._listener_conn)
-
- async def on_shutdown(self) -> None:
- await self._exit_stack.aclose()
-
- async def publish(self, data: bytes, channels: Iterable[str]) -> None:
- dec_data = data.decode("utf-8")
- async with await psycopg.AsyncConnection.connect(self._pg_dsn) as conn:
- for channel in channels:
- await conn.execute("SELECT pg_notify(%s, %s);", (channel, dec_data))
-
- async def subscribe(self, channels: Iterable[str]) -> None:
- for channel in set(channels) - self._subscribed_channels:
- # can't use placeholders in LISTEN
- await self._listener_conn.execute(f"LISTEN {_safe_quote(channel)};") # pyright: ignore
-
- self._subscribed_channels.add(channel)
-
- async def unsubscribe(self, channels: Iterable[str]) -> None:
- for channel in channels:
- # can't use placeholders in UNLISTEN
- await self._listener_conn.execute(f"UNLISTEN {_safe_quote(channel)};") # pyright: ignore
- self._subscribed_channels = self._subscribed_channels - set(channels)
-
- async def stream_events(self) -> AsyncGenerator[tuple[str, bytes], None]:
- async for notify in self._listener_conn.notifies():
- yield notify.channel, notify.payload.encode("utf-8")
-
- async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
- raise NotImplementedError()
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py
deleted file mode 100644
index f03c9f2..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py
+++ /dev/null
@@ -1,277 +0,0 @@
-from __future__ import annotations
-
-import asyncio
-import sys
-
-if sys.version_info < (3, 9):
- import importlib_resources # pyright: ignore
-else:
- import importlib.resources as importlib_resources
-from abc import ABC
-from datetime import timedelta
-from typing import TYPE_CHECKING, Any, AsyncGenerator, Iterable, cast
-
-from litestar.channels.backends.base import ChannelsBackend
-
-if TYPE_CHECKING:
- from redis.asyncio import Redis
- from redis.asyncio.client import PubSub
-
-_resource_path = importlib_resources.files("litestar.channels.backends")
-_PUBSUB_PUBLISH_SCRIPT = (_resource_path / "_redis_pubsub_publish.lua").read_text()
-_FLUSHALL_STREAMS_SCRIPT = (_resource_path / "_redis_flushall_streams.lua").read_text()
-_XADD_EXPIRE_SCRIPT = (_resource_path / "_redis_xadd_expire.lua").read_text()
-
-
-class _LazyEvent:
- """A lazy proxy to asyncio.Event that only creates the event once it's accessed.
-
- It ensures that the Event is created within a running event loop. If it's not, there can be an issue where a future
- within the event itself is attached to a different loop.
-
- This happens in our tests and could also happen when a user creates an instance of the backend outside an event loop
- in their application.
- """
-
- def __init__(self) -> None:
- self.__event: asyncio.Event | None = None
-
- @property
- def _event(self) -> asyncio.Event:
- if self.__event is None:
- self.__event = asyncio.Event()
- return self.__event
-
- def set(self) -> None:
- self._event.set()
-
- def clear(self) -> None:
- self._event.clear()
-
- async def wait(self) -> None:
- await self._event.wait()
-
-
-class RedisChannelsBackend(ChannelsBackend, ABC):
- def __init__(self, *, redis: Redis, key_prefix: str, stream_sleep_no_subscriptions: int) -> None:
- """Base redis channels backend.
-
- Args:
- redis: A :class:`redis.asyncio.Redis` instance
- key_prefix: Key prefix to use for storing data in redis
- stream_sleep_no_subscriptions: Amount of time in milliseconds to pause the
- :meth:`stream_events` generator, should no subscribers exist
- """
- self._redis = redis
- self._key_prefix = key_prefix
- self._stream_sleep_no_subscriptions = stream_sleep_no_subscriptions
-
- def _make_key(self, channel: str) -> str:
- return f"{self._key_prefix}_{channel.upper()}"
-
-
-class RedisChannelsPubSubBackend(RedisChannelsBackend):
- def __init__(
- self, *, redis: Redis, stream_sleep_no_subscriptions: int = 1, key_prefix: str = "LITESTAR_CHANNELS"
- ) -> None:
- """Redis channels backend, `Pub/Sub <https://redis.io/docs/manual/pubsub/>`_.
-
- This backend provides low overhead and resource usage but no support for history.
-
- Args:
- redis: A :class:`redis.asyncio.Redis` instance
- key_prefix: Key prefix to use for storing data in redis
- stream_sleep_no_subscriptions: Amount of time in milliseconds to pause the
- :meth:`stream_events` generator, should no subscribers exist
- """
- super().__init__(
- redis=redis, stream_sleep_no_subscriptions=stream_sleep_no_subscriptions, key_prefix=key_prefix
- )
- self.__pub_sub: PubSub | None = None
- self._publish_script = self._redis.register_script(_PUBSUB_PUBLISH_SCRIPT)
- self._has_subscribed = _LazyEvent()
-
- @property
- def _pub_sub(self) -> PubSub:
- if self.__pub_sub is None:
- self.__pub_sub = self._redis.pubsub()
- return self.__pub_sub
-
- async def on_startup(self) -> None:
- # this method should not do anything in this case
- pass
-
- async def on_shutdown(self) -> None:
- await self._pub_sub.reset()
-
- async def subscribe(self, channels: Iterable[str]) -> None:
- """Subscribe to ``channels``, and enable publishing to them"""
- await self._pub_sub.subscribe(*channels)
- self._has_subscribed.set()
-
- async def unsubscribe(self, channels: Iterable[str]) -> None:
- """Stop listening for events on ``channels``"""
- await self._pub_sub.unsubscribe(*channels)
- # if we have no active subscriptions, or only subscriptions which are pending
- # to be unsubscribed we consider the backend to be unsubscribed from all
- # channels, so we reset the event
- if not self._pub_sub.channels.keys() - self._pub_sub.pending_unsubscribe_channels:
- self._has_subscribed.clear()
-
- async def publish(self, data: bytes, channels: Iterable[str]) -> None:
- """Publish ``data`` to ``channels``
-
- .. note::
- This operation is performed atomically, using a lua script
- """
- await self._publish_script(keys=list(set(channels)), args=[data])
-
- async def stream_events(self) -> AsyncGenerator[tuple[str, Any], None]:
- """Return a generator, iterating over events of subscribed channels as they become available.
-
- If no channels have been subscribed to yet via :meth:`subscribe`, sleep for ``stream_sleep_no_subscriptions``
- milliseconds.
- """
-
- while True:
- await self._has_subscribed.wait()
- message = await self._pub_sub.get_message(
- ignore_subscribe_messages=True, timeout=self._stream_sleep_no_subscriptions
- )
- if message is None:
- continue
-
- channel: str = message["channel"].decode()
- data: bytes = message["data"]
- # redis handles the unsubscibes with a queue; Unsubscribing doesn't mean the
- # unsubscribe will happen immediately after requesting it, so we could
- # receive a message on a channel that, from a client's perspective, it's not
- # subscribed to anymore
- if channel.encode() in self._pub_sub.channels.keys() - self._pub_sub.pending_unsubscribe_channels:
- yield channel, data
-
- async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
- """Not implemented"""
- raise NotImplementedError()
-
-
-class RedisChannelsStreamBackend(RedisChannelsBackend):
- def __init__(
- self,
- history: int,
- *,
- redis: Redis,
- stream_sleep_no_subscriptions: int = 1,
- cap_streams_approximate: bool = True,
- stream_ttl: int | timedelta = timedelta(seconds=60),
- key_prefix: str = "LITESTAR_CHANNELS",
- ) -> None:
- """Redis channels backend, `streams <https://redis.io/docs/data-types/streams/>`_.
-
- Args:
- history: Amount of messages to keep. This will set a ``MAXLEN`` to the streams
- redis: A :class:`redis.asyncio.Redis` instance
- key_prefix: Key prefix to use for streams
- stream_sleep_no_subscriptions: Amount of time in milliseconds to pause the
- :meth:`stream_events` generator, should no subscribers exist
- cap_streams_approximate: Set the streams ``MAXLEN`` using the ``~`` approximation
- operator for improved performance
- stream_ttl: TTL of a stream in milliseconds or as a timedelta. A streams TTL will be set on each publishing
- operation using ``PEXPIRE``
- """
- super().__init__(
- redis=redis, stream_sleep_no_subscriptions=stream_sleep_no_subscriptions, key_prefix=key_prefix
- )
-
- self._history_limit = history
- self._subscribed_channels: set[str] = set()
- self._cap_streams_approximate = cap_streams_approximate
- self._stream_ttl = stream_ttl if isinstance(stream_ttl, int) else int(stream_ttl.total_seconds() * 1000)
- self._flush_all_streams_script = self._redis.register_script(_FLUSHALL_STREAMS_SCRIPT)
- self._publish_script = self._redis.register_script(_XADD_EXPIRE_SCRIPT)
- self._has_subscribed_channels = _LazyEvent()
-
- async def on_startup(self) -> None:
- """Called on application startup"""
-
- async def on_shutdown(self) -> None:
- """Called on application shutdown"""
-
- async def subscribe(self, channels: Iterable[str]) -> None:
- """Subscribe to ``channels``"""
- self._subscribed_channels.update(channels)
- self._has_subscribed_channels.set()
-
- async def unsubscribe(self, channels: Iterable[str]) -> None:
- """Unsubscribe from ``channels``"""
- self._subscribed_channels -= set(channels)
- if not len(self._subscribed_channels):
- self._has_subscribed_channels.clear()
-
- async def publish(self, data: bytes, channels: Iterable[str]) -> None:
- """Publish ``data`` to ``channels``.
-
- .. note::
- This operation is performed atomically, using a Lua script
- """
- channels = set(channels)
- await self._publish_script(
- keys=[self._make_key(key) for key in channels],
- args=[
- data,
- self._history_limit,
- self._stream_ttl,
- int(self._cap_streams_approximate),
- *channels,
- ],
- )
-
- async def _get_subscribed_channels(self) -> set[str]:
- """Get subscribed channels. If no channels are currently subscribed, wait"""
- await self._has_subscribed_channels.wait()
- return self._subscribed_channels
-
- async def stream_events(self) -> AsyncGenerator[tuple[str, Any], None]:
- """Return a generator, iterating over events of subscribed channels as they become available.
-
- If no channels have been subscribed to yet via :meth:`subscribe`, sleep for ``stream_sleep_no_subscriptions``
- milliseconds.
- """
- stream_ids: dict[str, bytes] = {}
- while True:
- # We wait for subscribed channels, because we can't pass an empty dict to
- # xread and block for subscribers
- stream_keys = [self._make_key(c) for c in await self._get_subscribed_channels()]
-
- data: list[tuple[bytes, list[tuple[bytes, dict[bytes, bytes]]]]] = await self._redis.xread(
- {key: stream_ids.get(key, 0) for key in stream_keys}, block=self._stream_sleep_no_subscriptions
- )
-
- if not data:
- continue
-
- for stream_key, channel_events in data:
- for event in channel_events:
- event_data = event[1][b"data"]
- channel_name = event[1][b"channel"].decode()
- stream_ids[stream_key.decode()] = event[0]
- yield channel_name, event_data
-
- async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
- """Return the history of ``channels``, returning at most ``limit`` messages"""
- data: Iterable[tuple[bytes, dict[bytes, bytes]]]
- if limit:
- data = reversed(await self._redis.xrevrange(self._make_key(channel), count=limit))
- else:
- data = await self._redis.xrange(self._make_key(channel))
-
- return [event[b"data"] for _, event in data]
-
- async def flush_all(self) -> int:
- """Delete all stream keys with the ``key_prefix``.
-
- .. important::
- This method is incompatible with redis clusters
- """
- deleted_streams = await self._flush_all_streams_script(keys=[], args=[f"{self._key_prefix}*"])
- return cast("int", deleted_streams)