diff options
| author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 | 
|---|---|---|
| committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 | 
| commit | 6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch) | |
| tree | b1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/litestar/channels/backends | |
| parent | 4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff) | |
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/channels/backends')
15 files changed, 579 insertions, 0 deletions
| diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pycBinary files differ new file mode 100644 index 0000000..ab4e477 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pycBinary files differ new file mode 100644 index 0000000..a577096 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pycBinary files differ new file mode 100644 index 0000000..334d295 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pycBinary files differ new file mode 100644 index 0000000..9a87da5 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pycBinary files differ new file mode 100644 index 0000000..f663280 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pycBinary files differ new file mode 100644 index 0000000..bf86a3e --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua new file mode 100644 index 0000000..a3faa6e --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua @@ -0,0 +1,15 @@ +local key_pattern = ARGV[1] + +local cursor = 0 +local deleted_streams = 0 + +repeat +    local result = redis.call('SCAN', cursor, 'MATCH', key_pattern) +    for _,key in ipairs(result[2]) do +        redis.call('DEL', key) +        deleted_streams = deleted_streams + 1 +    end +    cursor = tonumber(result[1]) +until cursor == 0 + +return deleted_streams diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua new file mode 100644 index 0000000..8402d08 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua @@ -0,0 +1,5 @@ +local data = ARGV[1] + +for _, channel in ipairs(KEYS) do +    redis.call("PUBLISH", channel, data) +end diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua new file mode 100644 index 0000000..f6b322f --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua @@ -0,0 +1,13 @@ +local data = ARGV[1] +local limit = ARGV[2] +local exp = ARGV[3] +local maxlen_approx = ARGV[4] + +for i, key in ipairs(KEYS) do +    if maxlen_approx == 1 then +        redis.call("XADD", key, "MAXLEN", "~", limit, "*", "data", data, "channel", ARGV[i + 4]) +    else +        redis.call("XADD", key, "MAXLEN", limit, "*", "data", data, "channel", ARGV[i + 4]) +    end +    redis.call("PEXPIRE", key, exp) +end diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py new file mode 100644 index 0000000..967b208 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +import asyncio +from contextlib import AsyncExitStack +from functools import partial +from typing import AsyncGenerator, Awaitable, Callable, Iterable, overload + +import asyncpg + +from litestar.channels import ChannelsBackend +from litestar.exceptions import ImproperlyConfiguredException + + +class AsyncPgChannelsBackend(ChannelsBackend): +    _listener_conn: asyncpg.Connection + +    @overload +    def __init__(self, dsn: str) -> None: ... + +    @overload +    def __init__( +        self, +        *, +        make_connection: Callable[[], Awaitable[asyncpg.Connection]], +    ) -> None: ... + +    def __init__( +        self, +        dsn: str | None = None, +        *, +        make_connection: Callable[[], Awaitable[asyncpg.Connection]] | None = None, +    ) -> None: +        if not (dsn or make_connection): +            raise ImproperlyConfiguredException("Need to specify dsn or make_connection") + +        self._subscribed_channels: set[str] = set() +        self._exit_stack = AsyncExitStack() +        self._connect = make_connection or partial(asyncpg.connect, dsn=dsn) +        self._queue: asyncio.Queue[tuple[str, bytes]] | None = None + +    async def on_startup(self) -> None: +        self._queue = asyncio.Queue() +        self._listener_conn = await self._connect() + +    async def on_shutdown(self) -> None: +        await self._listener_conn.close() +        self._queue = None + +    async def publish(self, data: bytes, channels: Iterable[str]) -> None: +        if self._queue is None: +            raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?") + +        dec_data = data.decode("utf-8") + +        conn = await self._connect() +        try: +            for channel in channels: +                await conn.execute("SELECT pg_notify($1, $2);", channel, dec_data) +        finally: +            await conn.close() + +    async def subscribe(self, channels: Iterable[str]) -> None: +        for channel in set(channels) - self._subscribed_channels: +            await self._listener_conn.add_listener(channel, self._listener)  # type: ignore[arg-type] +            self._subscribed_channels.add(channel) + +    async def unsubscribe(self, channels: Iterable[str]) -> None: +        for channel in channels: +            await self._listener_conn.remove_listener(channel, self._listener)  # type: ignore[arg-type] +        self._subscribed_channels = self._subscribed_channels - set(channels) + +    async def stream_events(self) -> AsyncGenerator[tuple[str, bytes], None]: +        if self._queue is None: +            raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?") + +        while True: +            channel, message = await self._queue.get() +            self._queue.task_done() +            # an UNLISTEN may be in transit while we're getting here, so we double-check +            # that we are actually supposed to deliver this message +            if channel in self._subscribed_channels: +                yield channel, message + +    async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]: +        raise NotImplementedError() + +    def _listener(self, /, connection: asyncpg.Connection, pid: int, channel: str, payload: object) -> None: +        if not isinstance(payload, str): +            raise RuntimeError("Invalid data received") +        self._queue.put_nowait((channel, payload.encode("utf-8")))  # type: ignore[union-attr] diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/base.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/base.py new file mode 100644 index 0000000..ce7ee81 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/base.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import AsyncGenerator, Iterable + + +class ChannelsBackend(ABC): +    @abstractmethod +    async def on_startup(self) -> None: +        """Called by the plugin on application startup""" +        ... + +    @abstractmethod +    async def on_shutdown(self) -> None: +        """Called by the plugin on application shutdown""" +        ... + +    @abstractmethod +    async def publish(self, data: bytes, channels: Iterable[str]) -> None: +        """Publish the message ``data`` to all ``channels``""" +        ... + +    @abstractmethod +    async def subscribe(self, channels: Iterable[str]) -> None: +        """Start listening for events on ``channels``""" +        ... + +    @abstractmethod +    async def unsubscribe(self, channels: Iterable[str]) -> None: +        """Stop listening for events on ``channels``""" +        ... + +    @abstractmethod +    def stream_events(self) -> AsyncGenerator[tuple[str, bytes], None]: +        """Return a generator, iterating over events of subscribed channels as they become available""" +        ... + +    @abstractmethod +    async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]: +        """Return the event history of ``channel``, at most ``limit`` entries""" +        ... diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py new file mode 100644 index 0000000..a96a66b --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +from asyncio import Queue +from collections import defaultdict, deque +from typing import Any, AsyncGenerator, Iterable + +from litestar.channels.backends.base import ChannelsBackend + + +class MemoryChannelsBackend(ChannelsBackend): +    """An in-memory channels backend""" + +    def __init__(self, history: int = 0) -> None: +        self._max_history_length = history +        self._channels: set[str] = set() +        self._queue: Queue[tuple[str, bytes]] | None = None +        self._history: defaultdict[str, deque[bytes]] = defaultdict(lambda: deque(maxlen=self._max_history_length)) + +    async def on_startup(self) -> None: +        self._queue = Queue() + +    async def on_shutdown(self) -> None: +        self._queue = None + +    async def publish(self, data: bytes, channels: Iterable[str]) -> None: +        """Publish ``data`` to ``channels``. If a channel has not yet been subscribed to, +        this will be a no-op. + +        Args: +            data: Data to publish +            channels: Channels to publish to + +        Returns: +            None + +        Raises: +            RuntimeError: If ``on_startup`` has not been called yet +        """ +        if not self._queue: +            raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?") + +        for channel in channels: +            if channel not in self._channels: +                continue + +            self._queue.put_nowait((channel, data)) +        if self._max_history_length: +            for channel in channels: +                self._history[channel].append(data) + +    async def subscribe(self, channels: Iterable[str]) -> None: +        """Subscribe to ``channels``, and enable publishing to them""" +        self._channels.update(channels) + +    async def unsubscribe(self, channels: Iterable[str]) -> None: +        """Unsubscribe from ``channels``""" +        self._channels -= set(channels) +        try: +            for channel in channels: +                del self._history[channel] +        except KeyError: +            pass + +    async def stream_events(self) -> AsyncGenerator[tuple[str, Any], None]: +        """Return a generator, iterating over events of subscribed channels as they become available""" +        if self._queue is None: +            raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?") + +        while True: +            channel, message = await self._queue.get() +            self._queue.task_done() + +            # if a message is published to a channel and the channel is then +            # unsubscribed before retrieving that message from the stream, it can still +            # end up here, so we double-check if we still are interested in this message +            if channel in self._channels: +                yield channel, message + +    async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]: +        """Return the event history of ``channel``, at most ``limit`` entries""" +        history = list(self._history[channel]) +        if limit: +            history = history[-limit:] +        return history diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py new file mode 100644 index 0000000..14b53bc --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +from contextlib import AsyncExitStack +from typing import AsyncGenerator, Iterable + +import psycopg + +from .base import ChannelsBackend + + +def _safe_quote(ident: str) -> str: +    return '"{}"'.format(ident.replace('"', '""'))  # sourcery skip + + +class PsycoPgChannelsBackend(ChannelsBackend): +    _listener_conn: psycopg.AsyncConnection + +    def __init__(self, pg_dsn: str) -> None: +        self._pg_dsn = pg_dsn +        self._subscribed_channels: set[str] = set() +        self._exit_stack = AsyncExitStack() + +    async def on_startup(self) -> None: +        self._listener_conn = await psycopg.AsyncConnection.connect(self._pg_dsn, autocommit=True) +        await self._exit_stack.enter_async_context(self._listener_conn) + +    async def on_shutdown(self) -> None: +        await self._exit_stack.aclose() + +    async def publish(self, data: bytes, channels: Iterable[str]) -> None: +        dec_data = data.decode("utf-8") +        async with await psycopg.AsyncConnection.connect(self._pg_dsn) as conn: +            for channel in channels: +                await conn.execute("SELECT pg_notify(%s, %s);", (channel, dec_data)) + +    async def subscribe(self, channels: Iterable[str]) -> None: +        for channel in set(channels) - self._subscribed_channels: +            # can't use placeholders in LISTEN +            await self._listener_conn.execute(f"LISTEN {_safe_quote(channel)};")  # pyright: ignore + +            self._subscribed_channels.add(channel) + +    async def unsubscribe(self, channels: Iterable[str]) -> None: +        for channel in channels: +            # can't use placeholders in UNLISTEN +            await self._listener_conn.execute(f"UNLISTEN {_safe_quote(channel)};")  # pyright: ignore +        self._subscribed_channels = self._subscribed_channels - set(channels) + +    async def stream_events(self) -> AsyncGenerator[tuple[str, bytes], None]: +        async for notify in self._listener_conn.notifies(): +            yield notify.channel, notify.payload.encode("utf-8") + +    async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]: +        raise NotImplementedError() diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py new file mode 100644 index 0000000..f03c9f2 --- /dev/null +++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py @@ -0,0 +1,277 @@ +from __future__ import annotations + +import asyncio +import sys + +if sys.version_info < (3, 9): +    import importlib_resources  # pyright: ignore +else: +    import importlib.resources as importlib_resources +from abc import ABC +from datetime import timedelta +from typing import TYPE_CHECKING, Any, AsyncGenerator, Iterable, cast + +from litestar.channels.backends.base import ChannelsBackend + +if TYPE_CHECKING: +    from redis.asyncio import Redis +    from redis.asyncio.client import PubSub + +_resource_path = importlib_resources.files("litestar.channels.backends") +_PUBSUB_PUBLISH_SCRIPT = (_resource_path / "_redis_pubsub_publish.lua").read_text() +_FLUSHALL_STREAMS_SCRIPT = (_resource_path / "_redis_flushall_streams.lua").read_text() +_XADD_EXPIRE_SCRIPT = (_resource_path / "_redis_xadd_expire.lua").read_text() + + +class _LazyEvent: +    """A lazy proxy to asyncio.Event that only creates the event once it's accessed. + +    It ensures that the Event is created within a running event loop. If it's not, there can be an issue where a future +    within the event itself is attached to a different loop. + +    This happens in our tests and could also happen when a user creates an instance of the backend outside an event loop +    in their application. +    """ + +    def __init__(self) -> None: +        self.__event: asyncio.Event | None = None + +    @property +    def _event(self) -> asyncio.Event: +        if self.__event is None: +            self.__event = asyncio.Event() +        return self.__event + +    def set(self) -> None: +        self._event.set() + +    def clear(self) -> None: +        self._event.clear() + +    async def wait(self) -> None: +        await self._event.wait() + + +class RedisChannelsBackend(ChannelsBackend, ABC): +    def __init__(self, *, redis: Redis, key_prefix: str, stream_sleep_no_subscriptions: int) -> None: +        """Base redis channels backend. + +        Args: +            redis: A :class:`redis.asyncio.Redis` instance +            key_prefix: Key prefix to use for storing data in redis +            stream_sleep_no_subscriptions: Amount of time in milliseconds to pause the +                :meth:`stream_events` generator, should no subscribers exist +        """ +        self._redis = redis +        self._key_prefix = key_prefix +        self._stream_sleep_no_subscriptions = stream_sleep_no_subscriptions + +    def _make_key(self, channel: str) -> str: +        return f"{self._key_prefix}_{channel.upper()}" + + +class RedisChannelsPubSubBackend(RedisChannelsBackend): +    def __init__( +        self, *, redis: Redis, stream_sleep_no_subscriptions: int = 1, key_prefix: str = "LITESTAR_CHANNELS" +    ) -> None: +        """Redis channels backend, `Pub/Sub <https://redis.io/docs/manual/pubsub/>`_. + +        This backend provides low overhead and resource usage but no support for history. + +        Args: +            redis: A :class:`redis.asyncio.Redis` instance +            key_prefix: Key prefix to use for storing data in redis +            stream_sleep_no_subscriptions: Amount of time in milliseconds to pause the +                :meth:`stream_events` generator, should no subscribers exist +        """ +        super().__init__( +            redis=redis, stream_sleep_no_subscriptions=stream_sleep_no_subscriptions, key_prefix=key_prefix +        ) +        self.__pub_sub: PubSub | None = None +        self._publish_script = self._redis.register_script(_PUBSUB_PUBLISH_SCRIPT) +        self._has_subscribed = _LazyEvent() + +    @property +    def _pub_sub(self) -> PubSub: +        if self.__pub_sub is None: +            self.__pub_sub = self._redis.pubsub() +        return self.__pub_sub + +    async def on_startup(self) -> None: +        # this method should not do anything in this case +        pass + +    async def on_shutdown(self) -> None: +        await self._pub_sub.reset() + +    async def subscribe(self, channels: Iterable[str]) -> None: +        """Subscribe to ``channels``, and enable publishing to them""" +        await self._pub_sub.subscribe(*channels) +        self._has_subscribed.set() + +    async def unsubscribe(self, channels: Iterable[str]) -> None: +        """Stop listening for events on ``channels``""" +        await self._pub_sub.unsubscribe(*channels) +        # if we have no active subscriptions, or only subscriptions which are pending +        # to be unsubscribed we consider the backend to be unsubscribed from all +        # channels, so we reset the event +        if not self._pub_sub.channels.keys() - self._pub_sub.pending_unsubscribe_channels: +            self._has_subscribed.clear() + +    async def publish(self, data: bytes, channels: Iterable[str]) -> None: +        """Publish ``data`` to ``channels`` + +        .. note:: +            This operation is performed atomically, using a lua script +        """ +        await self._publish_script(keys=list(set(channels)), args=[data]) + +    async def stream_events(self) -> AsyncGenerator[tuple[str, Any], None]: +        """Return a generator, iterating over events of subscribed channels as they become available. + +        If no channels have been subscribed to yet via :meth:`subscribe`, sleep for ``stream_sleep_no_subscriptions`` +        milliseconds. +        """ + +        while True: +            await self._has_subscribed.wait() +            message = await self._pub_sub.get_message( +                ignore_subscribe_messages=True, timeout=self._stream_sleep_no_subscriptions +            ) +            if message is None: +                continue + +            channel: str = message["channel"].decode() +            data: bytes = message["data"] +            # redis handles the unsubscibes with a queue; Unsubscribing doesn't mean the +            # unsubscribe will happen immediately after requesting it, so we could +            # receive a message on a channel that, from a client's perspective, it's not +            # subscribed to anymore +            if channel.encode() in self._pub_sub.channels.keys() - self._pub_sub.pending_unsubscribe_channels: +                yield channel, data + +    async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]: +        """Not implemented""" +        raise NotImplementedError() + + +class RedisChannelsStreamBackend(RedisChannelsBackend): +    def __init__( +        self, +        history: int, +        *, +        redis: Redis, +        stream_sleep_no_subscriptions: int = 1, +        cap_streams_approximate: bool = True, +        stream_ttl: int | timedelta = timedelta(seconds=60), +        key_prefix: str = "LITESTAR_CHANNELS", +    ) -> None: +        """Redis channels backend, `streams <https://redis.io/docs/data-types/streams/>`_. + +        Args: +            history: Amount of messages to keep. This will set a ``MAXLEN`` to the streams +            redis: A :class:`redis.asyncio.Redis` instance +            key_prefix: Key prefix to use for streams +            stream_sleep_no_subscriptions: Amount of time in milliseconds to pause the +                :meth:`stream_events` generator, should no subscribers exist +            cap_streams_approximate: Set the streams ``MAXLEN`` using the ``~`` approximation +                operator for improved performance +            stream_ttl: TTL of a stream in milliseconds or as a timedelta. A streams TTL will be set on each publishing +                operation using ``PEXPIRE`` +        """ +        super().__init__( +            redis=redis, stream_sleep_no_subscriptions=stream_sleep_no_subscriptions, key_prefix=key_prefix +        ) + +        self._history_limit = history +        self._subscribed_channels: set[str] = set() +        self._cap_streams_approximate = cap_streams_approximate +        self._stream_ttl = stream_ttl if isinstance(stream_ttl, int) else int(stream_ttl.total_seconds() * 1000) +        self._flush_all_streams_script = self._redis.register_script(_FLUSHALL_STREAMS_SCRIPT) +        self._publish_script = self._redis.register_script(_XADD_EXPIRE_SCRIPT) +        self._has_subscribed_channels = _LazyEvent() + +    async def on_startup(self) -> None: +        """Called on application startup""" + +    async def on_shutdown(self) -> None: +        """Called on application shutdown""" + +    async def subscribe(self, channels: Iterable[str]) -> None: +        """Subscribe to ``channels``""" +        self._subscribed_channels.update(channels) +        self._has_subscribed_channels.set() + +    async def unsubscribe(self, channels: Iterable[str]) -> None: +        """Unsubscribe from ``channels``""" +        self._subscribed_channels -= set(channels) +        if not len(self._subscribed_channels): +            self._has_subscribed_channels.clear() + +    async def publish(self, data: bytes, channels: Iterable[str]) -> None: +        """Publish ``data`` to ``channels``. + +        .. note:: +            This operation is performed atomically, using a Lua script +        """ +        channels = set(channels) +        await self._publish_script( +            keys=[self._make_key(key) for key in channels], +            args=[ +                data, +                self._history_limit, +                self._stream_ttl, +                int(self._cap_streams_approximate), +                *channels, +            ], +        ) + +    async def _get_subscribed_channels(self) -> set[str]: +        """Get subscribed channels. If no channels are currently subscribed, wait""" +        await self._has_subscribed_channels.wait() +        return self._subscribed_channels + +    async def stream_events(self) -> AsyncGenerator[tuple[str, Any], None]: +        """Return a generator, iterating over events of subscribed channels as they become available. + +        If no channels have been subscribed to yet via :meth:`subscribe`, sleep for ``stream_sleep_no_subscriptions`` +        milliseconds. +        """ +        stream_ids: dict[str, bytes] = {} +        while True: +            # We wait for subscribed channels, because we can't pass an empty dict to +            # xread and block for subscribers +            stream_keys = [self._make_key(c) for c in await self._get_subscribed_channels()] + +            data: list[tuple[bytes, list[tuple[bytes, dict[bytes, bytes]]]]] = await self._redis.xread( +                {key: stream_ids.get(key, 0) for key in stream_keys}, block=self._stream_sleep_no_subscriptions +            ) + +            if not data: +                continue + +            for stream_key, channel_events in data: +                for event in channel_events: +                    event_data = event[1][b"data"] +                    channel_name = event[1][b"channel"].decode() +                    stream_ids[stream_key.decode()] = event[0] +                    yield channel_name, event_data + +    async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]: +        """Return the history of ``channels``, returning at most ``limit`` messages""" +        data: Iterable[tuple[bytes, dict[bytes, bytes]]] +        if limit: +            data = reversed(await self._redis.xrevrange(self._make_key(channel), count=limit)) +        else: +            data = await self._redis.xrange(self._make_key(channel)) + +        return [event[b"data"] for _, event in data] + +    async def flush_all(self) -> int: +        """Delete all stream keys with the ``key_prefix``. + +        .. important:: +            This method is incompatible with redis clusters +        """ +        deleted_streams = await self._flush_all_streams_script(keys=[], args=[f"{self._key_prefix}*"]) +        return cast("int", deleted_streams) | 
