diff options
author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:17:55 -0400 |
---|---|---|
committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:17:55 -0400 |
commit | 12cf076118570eebbff08c6b3090e0d4798447a1 (patch) | |
tree | 3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/litestar/channels/backends | |
parent | c45662ff3923b34614ddcc8feb9195541166dcc5 (diff) |
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/channels/backends')
15 files changed, 0 insertions, 579 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pyc Binary files differdeleted file mode 100644 index ab4e477..0000000 --- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pyc Binary files differdeleted file mode 100644 index a577096..0000000 --- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pyc Binary files differdeleted file mode 100644 index 334d295..0000000 --- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pyc Binary files differdeleted file mode 100644 index 9a87da5..0000000 --- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pyc Binary files differdeleted file mode 100644 index f663280..0000000 --- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pyc Binary files differdeleted file mode 100644 index bf86a3e..0000000 --- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua deleted file mode 100644 index a3faa6e..0000000 --- a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua +++ /dev/null @@ -1,15 +0,0 @@ -local key_pattern = ARGV[1] - -local cursor = 0 -local deleted_streams = 0 - -repeat - local result = redis.call('SCAN', cursor, 'MATCH', key_pattern) - for _,key in ipairs(result[2]) do - redis.call('DEL', key) - deleted_streams = deleted_streams + 1 - end - cursor = tonumber(result[1]) -until cursor == 0 - -return deleted_streams diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua deleted file mode 100644 index 8402d08..0000000 --- a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua +++ /dev/null @@ -1,5 +0,0 @@ -local data = ARGV[1] - -for _, channel in ipairs(KEYS) do - redis.call("PUBLISH", channel, data) -end diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua deleted file mode 100644 index f6b322f..0000000 --- a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua +++ /dev/null @@ -1,13 +0,0 @@ -local data = ARGV[1] -local limit = ARGV[2] -local exp = ARGV[3] -local maxlen_approx = ARGV[4] - -for i, key in ipairs(KEYS) do - if maxlen_approx == 1 then - redis.call("XADD", key, "MAXLEN", "~", limit, "*", "data", data, "channel", ARGV[i + 4]) - else - redis.call("XADD", key, "MAXLEN", limit, "*", "data", data, "channel", ARGV[i + 4]) - end - redis.call("PEXPIRE", key, exp) -end diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py deleted file mode 100644 index 967b208..0000000 --- a/venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py +++ /dev/null @@ -1,90 +0,0 @@ -from __future__ import annotations - -import asyncio -from contextlib import AsyncExitStack -from functools import partial -from typing import AsyncGenerator, Awaitable, Callable, Iterable, overload - -import asyncpg - -from litestar.channels import ChannelsBackend -from litestar.exceptions import ImproperlyConfiguredException - - -class AsyncPgChannelsBackend(ChannelsBackend): - _listener_conn: asyncpg.Connection - - @overload - def __init__(self, dsn: str) -> None: ... - - @overload - def __init__( - self, - *, - make_connection: Callable[[], Awaitable[asyncpg.Connection]], - ) -> None: ... - - def __init__( - self, - dsn: str | None = None, - *, - make_connection: Callable[[], Awaitable[asyncpg.Connection]] | None = None, - ) -> None: - if not (dsn or make_connection): - raise ImproperlyConfiguredException("Need to specify dsn or make_connection") - - self._subscribed_channels: set[str] = set() - self._exit_stack = AsyncExitStack() - self._connect = make_connection or partial(asyncpg.connect, dsn=dsn) - self._queue: asyncio.Queue[tuple[str, bytes]] | None = None - - async def on_startup(self) -> None: - self._queue = asyncio.Queue() - self._listener_conn = await self._connect() - - async def on_shutdown(self) -> None: - await self._listener_conn.close() - self._queue = None - - async def publish(self, data: bytes, channels: Iterable[str]) -> None: - if self._queue is None: - raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?") - - dec_data = data.decode("utf-8") - - conn = await self._connect() - try: - for channel in channels: - await conn.execute("SELECT pg_notify($1, $2);", channel, dec_data) - finally: - await conn.close() - - async def subscribe(self, channels: Iterable[str]) -> None: - for channel in set(channels) - self._subscribed_channels: - await self._listener_conn.add_listener(channel, self._listener) # type: ignore[arg-type] - self._subscribed_channels.add(channel) - - async def unsubscribe(self, channels: Iterable[str]) -> None: - for channel in channels: - await self._listener_conn.remove_listener(channel, self._listener) # type: ignore[arg-type] - self._subscribed_channels = self._subscribed_channels - set(channels) - - async def stream_events(self) -> AsyncGenerator[tuple[str, bytes], None]: - if self._queue is None: - raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?") - - while True: - channel, message = await self._queue.get() - self._queue.task_done() - # an UNLISTEN may be in transit while we're getting here, so we double-check - # that we are actually supposed to deliver this message - if channel in self._subscribed_channels: - yield channel, message - - async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]: - raise NotImplementedError() - - def _listener(self, /, connection: asyncpg.Connection, pid: int, channel: str, payload: object) -> None: - if not isinstance(payload, str): - raise RuntimeError("Invalid data received") - self._queue.put_nowait((channel, payload.encode("utf-8"))) # type: ignore[union-attr] diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/base.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/base.py deleted file mode 100644 index ce7ee81..0000000 --- a/venv/lib/python3.11/site-packages/litestar/channels/backends/base.py +++ /dev/null @@ -1,41 +0,0 @@ -from __future__ import annotations - -from abc import ABC, abstractmethod -from typing import AsyncGenerator, Iterable - - -class ChannelsBackend(ABC): - @abstractmethod - async def on_startup(self) -> None: - """Called by the plugin on application startup""" - ... - - @abstractmethod - async def on_shutdown(self) -> None: - """Called by the plugin on application shutdown""" - ... - - @abstractmethod - async def publish(self, data: bytes, channels: Iterable[str]) -> None: - """Publish the message ``data`` to all ``channels``""" - ... - - @abstractmethod - async def subscribe(self, channels: Iterable[str]) -> None: - """Start listening for events on ``channels``""" - ... - - @abstractmethod - async def unsubscribe(self, channels: Iterable[str]) -> None: - """Stop listening for events on ``channels``""" - ... - - @abstractmethod - def stream_events(self) -> AsyncGenerator[tuple[str, bytes], None]: - """Return a generator, iterating over events of subscribed channels as they become available""" - ... - - @abstractmethod - async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]: - """Return the event history of ``channel``, at most ``limit`` entries""" - ... diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py deleted file mode 100644 index a96a66b..0000000 --- a/venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py +++ /dev/null @@ -1,84 +0,0 @@ -from __future__ import annotations - -from asyncio import Queue -from collections import defaultdict, deque -from typing import Any, AsyncGenerator, Iterable - -from litestar.channels.backends.base import ChannelsBackend - - -class MemoryChannelsBackend(ChannelsBackend): - """An in-memory channels backend""" - - def __init__(self, history: int = 0) -> None: - self._max_history_length = history - self._channels: set[str] = set() - self._queue: Queue[tuple[str, bytes]] | None = None - self._history: defaultdict[str, deque[bytes]] = defaultdict(lambda: deque(maxlen=self._max_history_length)) - - async def on_startup(self) -> None: - self._queue = Queue() - - async def on_shutdown(self) -> None: - self._queue = None - - async def publish(self, data: bytes, channels: Iterable[str]) -> None: - """Publish ``data`` to ``channels``. If a channel has not yet been subscribed to, - this will be a no-op. - - Args: - data: Data to publish - channels: Channels to publish to - - Returns: - None - - Raises: - RuntimeError: If ``on_startup`` has not been called yet - """ - if not self._queue: - raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?") - - for channel in channels: - if channel not in self._channels: - continue - - self._queue.put_nowait((channel, data)) - if self._max_history_length: - for channel in channels: - self._history[channel].append(data) - - async def subscribe(self, channels: Iterable[str]) -> None: - """Subscribe to ``channels``, and enable publishing to them""" - self._channels.update(channels) - - async def unsubscribe(self, channels: Iterable[str]) -> None: - """Unsubscribe from ``channels``""" - self._channels -= set(channels) - try: - for channel in channels: - del self._history[channel] - except KeyError: - pass - - async def stream_events(self) -> AsyncGenerator[tuple[str, Any], None]: - """Return a generator, iterating over events of subscribed channels as they become available""" - if self._queue is None: - raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?") - - while True: - channel, message = await self._queue.get() - self._queue.task_done() - - # if a message is published to a channel and the channel is then - # unsubscribed before retrieving that message from the stream, it can still - # end up here, so we double-check if we still are interested in this message - if channel in self._channels: - yield channel, message - - async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]: - """Return the event history of ``channel``, at most ``limit`` entries""" - history = list(self._history[channel]) - if limit: - history = history[-limit:] - return history diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py deleted file mode 100644 index 14b53bc..0000000 --- a/venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py +++ /dev/null @@ -1,54 +0,0 @@ -from __future__ import annotations - -from contextlib import AsyncExitStack -from typing import AsyncGenerator, Iterable - -import psycopg - -from .base import ChannelsBackend - - -def _safe_quote(ident: str) -> str: - return '"{}"'.format(ident.replace('"', '""')) # sourcery skip - - -class PsycoPgChannelsBackend(ChannelsBackend): - _listener_conn: psycopg.AsyncConnection - - def __init__(self, pg_dsn: str) -> None: - self._pg_dsn = pg_dsn - self._subscribed_channels: set[str] = set() - self._exit_stack = AsyncExitStack() - - async def on_startup(self) -> None: - self._listener_conn = await psycopg.AsyncConnection.connect(self._pg_dsn, autocommit=True) - await self._exit_stack.enter_async_context(self._listener_conn) - - async def on_shutdown(self) -> None: - await self._exit_stack.aclose() - - async def publish(self, data: bytes, channels: Iterable[str]) -> None: - dec_data = data.decode("utf-8") - async with await psycopg.AsyncConnection.connect(self._pg_dsn) as conn: - for channel in channels: - await conn.execute("SELECT pg_notify(%s, %s);", (channel, dec_data)) - - async def subscribe(self, channels: Iterable[str]) -> None: - for channel in set(channels) - self._subscribed_channels: - # can't use placeholders in LISTEN - await self._listener_conn.execute(f"LISTEN {_safe_quote(channel)};") # pyright: ignore - - self._subscribed_channels.add(channel) - - async def unsubscribe(self, channels: Iterable[str]) -> None: - for channel in channels: - # can't use placeholders in UNLISTEN - await self._listener_conn.execute(f"UNLISTEN {_safe_quote(channel)};") # pyright: ignore - self._subscribed_channels = self._subscribed_channels - set(channels) - - async def stream_events(self) -> AsyncGenerator[tuple[str, bytes], None]: - async for notify in self._listener_conn.notifies(): - yield notify.channel, notify.payload.encode("utf-8") - - async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]: - raise NotImplementedError() diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py deleted file mode 100644 index f03c9f2..0000000 --- a/venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py +++ /dev/null @@ -1,277 +0,0 @@ -from __future__ import annotations - -import asyncio -import sys - -if sys.version_info < (3, 9): - import importlib_resources # pyright: ignore -else: - import importlib.resources as importlib_resources -from abc import ABC -from datetime import timedelta -from typing import TYPE_CHECKING, Any, AsyncGenerator, Iterable, cast - -from litestar.channels.backends.base import ChannelsBackend - -if TYPE_CHECKING: - from redis.asyncio import Redis - from redis.asyncio.client import PubSub - -_resource_path = importlib_resources.files("litestar.channels.backends") -_PUBSUB_PUBLISH_SCRIPT = (_resource_path / "_redis_pubsub_publish.lua").read_text() -_FLUSHALL_STREAMS_SCRIPT = (_resource_path / "_redis_flushall_streams.lua").read_text() -_XADD_EXPIRE_SCRIPT = (_resource_path / "_redis_xadd_expire.lua").read_text() - - -class _LazyEvent: - """A lazy proxy to asyncio.Event that only creates the event once it's accessed. - - It ensures that the Event is created within a running event loop. If it's not, there can be an issue where a future - within the event itself is attached to a different loop. - - This happens in our tests and could also happen when a user creates an instance of the backend outside an event loop - in their application. - """ - - def __init__(self) -> None: - self.__event: asyncio.Event | None = None - - @property - def _event(self) -> asyncio.Event: - if self.__event is None: - self.__event = asyncio.Event() - return self.__event - - def set(self) -> None: - self._event.set() - - def clear(self) -> None: - self._event.clear() - - async def wait(self) -> None: - await self._event.wait() - - -class RedisChannelsBackend(ChannelsBackend, ABC): - def __init__(self, *, redis: Redis, key_prefix: str, stream_sleep_no_subscriptions: int) -> None: - """Base redis channels backend. - - Args: - redis: A :class:`redis.asyncio.Redis` instance - key_prefix: Key prefix to use for storing data in redis - stream_sleep_no_subscriptions: Amount of time in milliseconds to pause the - :meth:`stream_events` generator, should no subscribers exist - """ - self._redis = redis - self._key_prefix = key_prefix - self._stream_sleep_no_subscriptions = stream_sleep_no_subscriptions - - def _make_key(self, channel: str) -> str: - return f"{self._key_prefix}_{channel.upper()}" - - -class RedisChannelsPubSubBackend(RedisChannelsBackend): - def __init__( - self, *, redis: Redis, stream_sleep_no_subscriptions: int = 1, key_prefix: str = "LITESTAR_CHANNELS" - ) -> None: - """Redis channels backend, `Pub/Sub <https://redis.io/docs/manual/pubsub/>`_. - - This backend provides low overhead and resource usage but no support for history. - - Args: - redis: A :class:`redis.asyncio.Redis` instance - key_prefix: Key prefix to use for storing data in redis - stream_sleep_no_subscriptions: Amount of time in milliseconds to pause the - :meth:`stream_events` generator, should no subscribers exist - """ - super().__init__( - redis=redis, stream_sleep_no_subscriptions=stream_sleep_no_subscriptions, key_prefix=key_prefix - ) - self.__pub_sub: PubSub | None = None - self._publish_script = self._redis.register_script(_PUBSUB_PUBLISH_SCRIPT) - self._has_subscribed = _LazyEvent() - - @property - def _pub_sub(self) -> PubSub: - if self.__pub_sub is None: - self.__pub_sub = self._redis.pubsub() - return self.__pub_sub - - async def on_startup(self) -> None: - # this method should not do anything in this case - pass - - async def on_shutdown(self) -> None: - await self._pub_sub.reset() - - async def subscribe(self, channels: Iterable[str]) -> None: - """Subscribe to ``channels``, and enable publishing to them""" - await self._pub_sub.subscribe(*channels) - self._has_subscribed.set() - - async def unsubscribe(self, channels: Iterable[str]) -> None: - """Stop listening for events on ``channels``""" - await self._pub_sub.unsubscribe(*channels) - # if we have no active subscriptions, or only subscriptions which are pending - # to be unsubscribed we consider the backend to be unsubscribed from all - # channels, so we reset the event - if not self._pub_sub.channels.keys() - self._pub_sub.pending_unsubscribe_channels: - self._has_subscribed.clear() - - async def publish(self, data: bytes, channels: Iterable[str]) -> None: - """Publish ``data`` to ``channels`` - - .. note:: - This operation is performed atomically, using a lua script - """ - await self._publish_script(keys=list(set(channels)), args=[data]) - - async def stream_events(self) -> AsyncGenerator[tuple[str, Any], None]: - """Return a generator, iterating over events of subscribed channels as they become available. - - If no channels have been subscribed to yet via :meth:`subscribe`, sleep for ``stream_sleep_no_subscriptions`` - milliseconds. - """ - - while True: - await self._has_subscribed.wait() - message = await self._pub_sub.get_message( - ignore_subscribe_messages=True, timeout=self._stream_sleep_no_subscriptions - ) - if message is None: - continue - - channel: str = message["channel"].decode() - data: bytes = message["data"] - # redis handles the unsubscibes with a queue; Unsubscribing doesn't mean the - # unsubscribe will happen immediately after requesting it, so we could - # receive a message on a channel that, from a client's perspective, it's not - # subscribed to anymore - if channel.encode() in self._pub_sub.channels.keys() - self._pub_sub.pending_unsubscribe_channels: - yield channel, data - - async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]: - """Not implemented""" - raise NotImplementedError() - - -class RedisChannelsStreamBackend(RedisChannelsBackend): - def __init__( - self, - history: int, - *, - redis: Redis, - stream_sleep_no_subscriptions: int = 1, - cap_streams_approximate: bool = True, - stream_ttl: int | timedelta = timedelta(seconds=60), - key_prefix: str = "LITESTAR_CHANNELS", - ) -> None: - """Redis channels backend, `streams <https://redis.io/docs/data-types/streams/>`_. - - Args: - history: Amount of messages to keep. This will set a ``MAXLEN`` to the streams - redis: A :class:`redis.asyncio.Redis` instance - key_prefix: Key prefix to use for streams - stream_sleep_no_subscriptions: Amount of time in milliseconds to pause the - :meth:`stream_events` generator, should no subscribers exist - cap_streams_approximate: Set the streams ``MAXLEN`` using the ``~`` approximation - operator for improved performance - stream_ttl: TTL of a stream in milliseconds or as a timedelta. A streams TTL will be set on each publishing - operation using ``PEXPIRE`` - """ - super().__init__( - redis=redis, stream_sleep_no_subscriptions=stream_sleep_no_subscriptions, key_prefix=key_prefix - ) - - self._history_limit = history - self._subscribed_channels: set[str] = set() - self._cap_streams_approximate = cap_streams_approximate - self._stream_ttl = stream_ttl if isinstance(stream_ttl, int) else int(stream_ttl.total_seconds() * 1000) - self._flush_all_streams_script = self._redis.register_script(_FLUSHALL_STREAMS_SCRIPT) - self._publish_script = self._redis.register_script(_XADD_EXPIRE_SCRIPT) - self._has_subscribed_channels = _LazyEvent() - - async def on_startup(self) -> None: - """Called on application startup""" - - async def on_shutdown(self) -> None: - """Called on application shutdown""" - - async def subscribe(self, channels: Iterable[str]) -> None: - """Subscribe to ``channels``""" - self._subscribed_channels.update(channels) - self._has_subscribed_channels.set() - - async def unsubscribe(self, channels: Iterable[str]) -> None: - """Unsubscribe from ``channels``""" - self._subscribed_channels -= set(channels) - if not len(self._subscribed_channels): - self._has_subscribed_channels.clear() - - async def publish(self, data: bytes, channels: Iterable[str]) -> None: - """Publish ``data`` to ``channels``. - - .. note:: - This operation is performed atomically, using a Lua script - """ - channels = set(channels) - await self._publish_script( - keys=[self._make_key(key) for key in channels], - args=[ - data, - self._history_limit, - self._stream_ttl, - int(self._cap_streams_approximate), - *channels, - ], - ) - - async def _get_subscribed_channels(self) -> set[str]: - """Get subscribed channels. If no channels are currently subscribed, wait""" - await self._has_subscribed_channels.wait() - return self._subscribed_channels - - async def stream_events(self) -> AsyncGenerator[tuple[str, Any], None]: - """Return a generator, iterating over events of subscribed channels as they become available. - - If no channels have been subscribed to yet via :meth:`subscribe`, sleep for ``stream_sleep_no_subscriptions`` - milliseconds. - """ - stream_ids: dict[str, bytes] = {} - while True: - # We wait for subscribed channels, because we can't pass an empty dict to - # xread and block for subscribers - stream_keys = [self._make_key(c) for c in await self._get_subscribed_channels()] - - data: list[tuple[bytes, list[tuple[bytes, dict[bytes, bytes]]]]] = await self._redis.xread( - {key: stream_ids.get(key, 0) for key in stream_keys}, block=self._stream_sleep_no_subscriptions - ) - - if not data: - continue - - for stream_key, channel_events in data: - for event in channel_events: - event_data = event[1][b"data"] - channel_name = event[1][b"channel"].decode() - stream_ids[stream_key.decode()] = event[0] - yield channel_name, event_data - - async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]: - """Return the history of ``channels``, returning at most ``limit`` messages""" - data: Iterable[tuple[bytes, dict[bytes, bytes]]] - if limit: - data = reversed(await self._redis.xrevrange(self._make_key(channel), count=limit)) - else: - data = await self._redis.xrange(self._make_key(channel)) - - return [event[b"data"] for _, event in data] - - async def flush_all(self) -> int: - """Delete all stream keys with the ``key_prefix``. - - .. important:: - This method is incompatible with redis clusters - """ - deleted_streams = await self._flush_all_streams_script(keys=[], args=[f"{self._key_prefix}*"]) - return cast("int", deleted_streams) |