summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/litestar/channels/backends
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
commit6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
treeb1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/litestar/channels/backends
parent4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/channels/backends')
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py0
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pycbin0 -> 210 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pycbin0 -> 6422 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pycbin0 -> 2607 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pycbin0 -> 5024 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pycbin0 -> 5220 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pycbin0 -> 17950 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua15
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua5
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua13
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py90
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/base.py41
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py84
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py54
-rw-r--r--venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py277
15 files changed, 579 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pyc
new file mode 100644
index 0000000..ab4e477
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pyc
new file mode 100644
index 0000000..a577096
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pyc
new file mode 100644
index 0000000..334d295
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pyc
new file mode 100644
index 0000000..9a87da5
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pyc
new file mode 100644
index 0000000..f663280
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pyc
new file mode 100644
index 0000000..bf86a3e
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua
new file mode 100644
index 0000000..a3faa6e
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua
@@ -0,0 +1,15 @@
+local key_pattern = ARGV[1]
+
+local cursor = 0
+local deleted_streams = 0
+
+repeat
+ local result = redis.call('SCAN', cursor, 'MATCH', key_pattern)
+ for _,key in ipairs(result[2]) do
+ redis.call('DEL', key)
+ deleted_streams = deleted_streams + 1
+ end
+ cursor = tonumber(result[1])
+until cursor == 0
+
+return deleted_streams
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua
new file mode 100644
index 0000000..8402d08
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua
@@ -0,0 +1,5 @@
+local data = ARGV[1]
+
+for _, channel in ipairs(KEYS) do
+ redis.call("PUBLISH", channel, data)
+end
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua
new file mode 100644
index 0000000..f6b322f
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua
@@ -0,0 +1,13 @@
+local data = ARGV[1]
+local limit = ARGV[2]
+local exp = ARGV[3]
+local maxlen_approx = ARGV[4]
+
+for i, key in ipairs(KEYS) do
+ if maxlen_approx == 1 then
+ redis.call("XADD", key, "MAXLEN", "~", limit, "*", "data", data, "channel", ARGV[i + 4])
+ else
+ redis.call("XADD", key, "MAXLEN", limit, "*", "data", data, "channel", ARGV[i + 4])
+ end
+ redis.call("PEXPIRE", key, exp)
+end
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py
new file mode 100644
index 0000000..967b208
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py
@@ -0,0 +1,90 @@
+from __future__ import annotations
+
+import asyncio
+from contextlib import AsyncExitStack
+from functools import partial
+from typing import AsyncGenerator, Awaitable, Callable, Iterable, overload
+
+import asyncpg
+
+from litestar.channels import ChannelsBackend
+from litestar.exceptions import ImproperlyConfiguredException
+
+
+class AsyncPgChannelsBackend(ChannelsBackend):
+ _listener_conn: asyncpg.Connection
+
+ @overload
+ def __init__(self, dsn: str) -> None: ...
+
+ @overload
+ def __init__(
+ self,
+ *,
+ make_connection: Callable[[], Awaitable[asyncpg.Connection]],
+ ) -> None: ...
+
+ def __init__(
+ self,
+ dsn: str | None = None,
+ *,
+ make_connection: Callable[[], Awaitable[asyncpg.Connection]] | None = None,
+ ) -> None:
+ if not (dsn or make_connection):
+ raise ImproperlyConfiguredException("Need to specify dsn or make_connection")
+
+ self._subscribed_channels: set[str] = set()
+ self._exit_stack = AsyncExitStack()
+ self._connect = make_connection or partial(asyncpg.connect, dsn=dsn)
+ self._queue: asyncio.Queue[tuple[str, bytes]] | None = None
+
+ async def on_startup(self) -> None:
+ self._queue = asyncio.Queue()
+ self._listener_conn = await self._connect()
+
+ async def on_shutdown(self) -> None:
+ await self._listener_conn.close()
+ self._queue = None
+
+ async def publish(self, data: bytes, channels: Iterable[str]) -> None:
+ if self._queue is None:
+ raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?")
+
+ dec_data = data.decode("utf-8")
+
+ conn = await self._connect()
+ try:
+ for channel in channels:
+ await conn.execute("SELECT pg_notify($1, $2);", channel, dec_data)
+ finally:
+ await conn.close()
+
+ async def subscribe(self, channels: Iterable[str]) -> None:
+ for channel in set(channels) - self._subscribed_channels:
+ await self._listener_conn.add_listener(channel, self._listener) # type: ignore[arg-type]
+ self._subscribed_channels.add(channel)
+
+ async def unsubscribe(self, channels: Iterable[str]) -> None:
+ for channel in channels:
+ await self._listener_conn.remove_listener(channel, self._listener) # type: ignore[arg-type]
+ self._subscribed_channels = self._subscribed_channels - set(channels)
+
+ async def stream_events(self) -> AsyncGenerator[tuple[str, bytes], None]:
+ if self._queue is None:
+ raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?")
+
+ while True:
+ channel, message = await self._queue.get()
+ self._queue.task_done()
+ # an UNLISTEN may be in transit while we're getting here, so we double-check
+ # that we are actually supposed to deliver this message
+ if channel in self._subscribed_channels:
+ yield channel, message
+
+ async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
+ raise NotImplementedError()
+
+ def _listener(self, /, connection: asyncpg.Connection, pid: int, channel: str, payload: object) -> None:
+ if not isinstance(payload, str):
+ raise RuntimeError("Invalid data received")
+ self._queue.put_nowait((channel, payload.encode("utf-8"))) # type: ignore[union-attr]
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/base.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/base.py
new file mode 100644
index 0000000..ce7ee81
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/base.py
@@ -0,0 +1,41 @@
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from typing import AsyncGenerator, Iterable
+
+
+class ChannelsBackend(ABC):
+ @abstractmethod
+ async def on_startup(self) -> None:
+ """Called by the plugin on application startup"""
+ ...
+
+ @abstractmethod
+ async def on_shutdown(self) -> None:
+ """Called by the plugin on application shutdown"""
+ ...
+
+ @abstractmethod
+ async def publish(self, data: bytes, channels: Iterable[str]) -> None:
+ """Publish the message ``data`` to all ``channels``"""
+ ...
+
+ @abstractmethod
+ async def subscribe(self, channels: Iterable[str]) -> None:
+ """Start listening for events on ``channels``"""
+ ...
+
+ @abstractmethod
+ async def unsubscribe(self, channels: Iterable[str]) -> None:
+ """Stop listening for events on ``channels``"""
+ ...
+
+ @abstractmethod
+ def stream_events(self) -> AsyncGenerator[tuple[str, bytes], None]:
+ """Return a generator, iterating over events of subscribed channels as they become available"""
+ ...
+
+ @abstractmethod
+ async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
+ """Return the event history of ``channel``, at most ``limit`` entries"""
+ ...
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py
new file mode 100644
index 0000000..a96a66b
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py
@@ -0,0 +1,84 @@
+from __future__ import annotations
+
+from asyncio import Queue
+from collections import defaultdict, deque
+from typing import Any, AsyncGenerator, Iterable
+
+from litestar.channels.backends.base import ChannelsBackend
+
+
+class MemoryChannelsBackend(ChannelsBackend):
+ """An in-memory channels backend"""
+
+ def __init__(self, history: int = 0) -> None:
+ self._max_history_length = history
+ self._channels: set[str] = set()
+ self._queue: Queue[tuple[str, bytes]] | None = None
+ self._history: defaultdict[str, deque[bytes]] = defaultdict(lambda: deque(maxlen=self._max_history_length))
+
+ async def on_startup(self) -> None:
+ self._queue = Queue()
+
+ async def on_shutdown(self) -> None:
+ self._queue = None
+
+ async def publish(self, data: bytes, channels: Iterable[str]) -> None:
+ """Publish ``data`` to ``channels``. If a channel has not yet been subscribed to,
+ this will be a no-op.
+
+ Args:
+ data: Data to publish
+ channels: Channels to publish to
+
+ Returns:
+ None
+
+ Raises:
+ RuntimeError: If ``on_startup`` has not been called yet
+ """
+ if not self._queue:
+ raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?")
+
+ for channel in channels:
+ if channel not in self._channels:
+ continue
+
+ self._queue.put_nowait((channel, data))
+ if self._max_history_length:
+ for channel in channels:
+ self._history[channel].append(data)
+
+ async def subscribe(self, channels: Iterable[str]) -> None:
+ """Subscribe to ``channels``, and enable publishing to them"""
+ self._channels.update(channels)
+
+ async def unsubscribe(self, channels: Iterable[str]) -> None:
+ """Unsubscribe from ``channels``"""
+ self._channels -= set(channels)
+ try:
+ for channel in channels:
+ del self._history[channel]
+ except KeyError:
+ pass
+
+ async def stream_events(self) -> AsyncGenerator[tuple[str, Any], None]:
+ """Return a generator, iterating over events of subscribed channels as they become available"""
+ if self._queue is None:
+ raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?")
+
+ while True:
+ channel, message = await self._queue.get()
+ self._queue.task_done()
+
+ # if a message is published to a channel and the channel is then
+ # unsubscribed before retrieving that message from the stream, it can still
+ # end up here, so we double-check if we still are interested in this message
+ if channel in self._channels:
+ yield channel, message
+
+ async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
+ """Return the event history of ``channel``, at most ``limit`` entries"""
+ history = list(self._history[channel])
+ if limit:
+ history = history[-limit:]
+ return history
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py
new file mode 100644
index 0000000..14b53bc
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py
@@ -0,0 +1,54 @@
+from __future__ import annotations
+
+from contextlib import AsyncExitStack
+from typing import AsyncGenerator, Iterable
+
+import psycopg
+
+from .base import ChannelsBackend
+
+
+def _safe_quote(ident: str) -> str:
+ return '"{}"'.format(ident.replace('"', '""')) # sourcery skip
+
+
+class PsycoPgChannelsBackend(ChannelsBackend):
+ _listener_conn: psycopg.AsyncConnection
+
+ def __init__(self, pg_dsn: str) -> None:
+ self._pg_dsn = pg_dsn
+ self._subscribed_channels: set[str] = set()
+ self._exit_stack = AsyncExitStack()
+
+ async def on_startup(self) -> None:
+ self._listener_conn = await psycopg.AsyncConnection.connect(self._pg_dsn, autocommit=True)
+ await self._exit_stack.enter_async_context(self._listener_conn)
+
+ async def on_shutdown(self) -> None:
+ await self._exit_stack.aclose()
+
+ async def publish(self, data: bytes, channels: Iterable[str]) -> None:
+ dec_data = data.decode("utf-8")
+ async with await psycopg.AsyncConnection.connect(self._pg_dsn) as conn:
+ for channel in channels:
+ await conn.execute("SELECT pg_notify(%s, %s);", (channel, dec_data))
+
+ async def subscribe(self, channels: Iterable[str]) -> None:
+ for channel in set(channels) - self._subscribed_channels:
+ # can't use placeholders in LISTEN
+ await self._listener_conn.execute(f"LISTEN {_safe_quote(channel)};") # pyright: ignore
+
+ self._subscribed_channels.add(channel)
+
+ async def unsubscribe(self, channels: Iterable[str]) -> None:
+ for channel in channels:
+ # can't use placeholders in UNLISTEN
+ await self._listener_conn.execute(f"UNLISTEN {_safe_quote(channel)};") # pyright: ignore
+ self._subscribed_channels = self._subscribed_channels - set(channels)
+
+ async def stream_events(self) -> AsyncGenerator[tuple[str, bytes], None]:
+ async for notify in self._listener_conn.notifies():
+ yield notify.channel, notify.payload.encode("utf-8")
+
+ async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
+ raise NotImplementedError()
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py
new file mode 100644
index 0000000..f03c9f2
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py
@@ -0,0 +1,277 @@
+from __future__ import annotations
+
+import asyncio
+import sys
+
+if sys.version_info < (3, 9):
+ import importlib_resources # pyright: ignore
+else:
+ import importlib.resources as importlib_resources
+from abc import ABC
+from datetime import timedelta
+from typing import TYPE_CHECKING, Any, AsyncGenerator, Iterable, cast
+
+from litestar.channels.backends.base import ChannelsBackend
+
+if TYPE_CHECKING:
+ from redis.asyncio import Redis
+ from redis.asyncio.client import PubSub
+
+_resource_path = importlib_resources.files("litestar.channels.backends")
+_PUBSUB_PUBLISH_SCRIPT = (_resource_path / "_redis_pubsub_publish.lua").read_text()
+_FLUSHALL_STREAMS_SCRIPT = (_resource_path / "_redis_flushall_streams.lua").read_text()
+_XADD_EXPIRE_SCRIPT = (_resource_path / "_redis_xadd_expire.lua").read_text()
+
+
+class _LazyEvent:
+ """A lazy proxy to asyncio.Event that only creates the event once it's accessed.
+
+ It ensures that the Event is created within a running event loop. If it's not, there can be an issue where a future
+ within the event itself is attached to a different loop.
+
+ This happens in our tests and could also happen when a user creates an instance of the backend outside an event loop
+ in their application.
+ """
+
+ def __init__(self) -> None:
+ self.__event: asyncio.Event | None = None
+
+ @property
+ def _event(self) -> asyncio.Event:
+ if self.__event is None:
+ self.__event = asyncio.Event()
+ return self.__event
+
+ def set(self) -> None:
+ self._event.set()
+
+ def clear(self) -> None:
+ self._event.clear()
+
+ async def wait(self) -> None:
+ await self._event.wait()
+
+
+class RedisChannelsBackend(ChannelsBackend, ABC):
+ def __init__(self, *, redis: Redis, key_prefix: str, stream_sleep_no_subscriptions: int) -> None:
+ """Base redis channels backend.
+
+ Args:
+ redis: A :class:`redis.asyncio.Redis` instance
+ key_prefix: Key prefix to use for storing data in redis
+ stream_sleep_no_subscriptions: Amount of time in milliseconds to pause the
+ :meth:`stream_events` generator, should no subscribers exist
+ """
+ self._redis = redis
+ self._key_prefix = key_prefix
+ self._stream_sleep_no_subscriptions = stream_sleep_no_subscriptions
+
+ def _make_key(self, channel: str) -> str:
+ return f"{self._key_prefix}_{channel.upper()}"
+
+
+class RedisChannelsPubSubBackend(RedisChannelsBackend):
+ def __init__(
+ self, *, redis: Redis, stream_sleep_no_subscriptions: int = 1, key_prefix: str = "LITESTAR_CHANNELS"
+ ) -> None:
+ """Redis channels backend, `Pub/Sub <https://redis.io/docs/manual/pubsub/>`_.
+
+ This backend provides low overhead and resource usage but no support for history.
+
+ Args:
+ redis: A :class:`redis.asyncio.Redis` instance
+ key_prefix: Key prefix to use for storing data in redis
+ stream_sleep_no_subscriptions: Amount of time in milliseconds to pause the
+ :meth:`stream_events` generator, should no subscribers exist
+ """
+ super().__init__(
+ redis=redis, stream_sleep_no_subscriptions=stream_sleep_no_subscriptions, key_prefix=key_prefix
+ )
+ self.__pub_sub: PubSub | None = None
+ self._publish_script = self._redis.register_script(_PUBSUB_PUBLISH_SCRIPT)
+ self._has_subscribed = _LazyEvent()
+
+ @property
+ def _pub_sub(self) -> PubSub:
+ if self.__pub_sub is None:
+ self.__pub_sub = self._redis.pubsub()
+ return self.__pub_sub
+
+ async def on_startup(self) -> None:
+ # this method should not do anything in this case
+ pass
+
+ async def on_shutdown(self) -> None:
+ await self._pub_sub.reset()
+
+ async def subscribe(self, channels: Iterable[str]) -> None:
+ """Subscribe to ``channels``, and enable publishing to them"""
+ await self._pub_sub.subscribe(*channels)
+ self._has_subscribed.set()
+
+ async def unsubscribe(self, channels: Iterable[str]) -> None:
+ """Stop listening for events on ``channels``"""
+ await self._pub_sub.unsubscribe(*channels)
+ # if we have no active subscriptions, or only subscriptions which are pending
+ # to be unsubscribed we consider the backend to be unsubscribed from all
+ # channels, so we reset the event
+ if not self._pub_sub.channels.keys() - self._pub_sub.pending_unsubscribe_channels:
+ self._has_subscribed.clear()
+
+ async def publish(self, data: bytes, channels: Iterable[str]) -> None:
+ """Publish ``data`` to ``channels``
+
+ .. note::
+ This operation is performed atomically, using a lua script
+ """
+ await self._publish_script(keys=list(set(channels)), args=[data])
+
+ async def stream_events(self) -> AsyncGenerator[tuple[str, Any], None]:
+ """Return a generator, iterating over events of subscribed channels as they become available.
+
+ If no channels have been subscribed to yet via :meth:`subscribe`, sleep for ``stream_sleep_no_subscriptions``
+ milliseconds.
+ """
+
+ while True:
+ await self._has_subscribed.wait()
+ message = await self._pub_sub.get_message(
+ ignore_subscribe_messages=True, timeout=self._stream_sleep_no_subscriptions
+ )
+ if message is None:
+ continue
+
+ channel: str = message["channel"].decode()
+ data: bytes = message["data"]
+ # redis handles the unsubscibes with a queue; Unsubscribing doesn't mean the
+ # unsubscribe will happen immediately after requesting it, so we could
+ # receive a message on a channel that, from a client's perspective, it's not
+ # subscribed to anymore
+ if channel.encode() in self._pub_sub.channels.keys() - self._pub_sub.pending_unsubscribe_channels:
+ yield channel, data
+
+ async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
+ """Not implemented"""
+ raise NotImplementedError()
+
+
+class RedisChannelsStreamBackend(RedisChannelsBackend):
+ def __init__(
+ self,
+ history: int,
+ *,
+ redis: Redis,
+ stream_sleep_no_subscriptions: int = 1,
+ cap_streams_approximate: bool = True,
+ stream_ttl: int | timedelta = timedelta(seconds=60),
+ key_prefix: str = "LITESTAR_CHANNELS",
+ ) -> None:
+ """Redis channels backend, `streams <https://redis.io/docs/data-types/streams/>`_.
+
+ Args:
+ history: Amount of messages to keep. This will set a ``MAXLEN`` to the streams
+ redis: A :class:`redis.asyncio.Redis` instance
+ key_prefix: Key prefix to use for streams
+ stream_sleep_no_subscriptions: Amount of time in milliseconds to pause the
+ :meth:`stream_events` generator, should no subscribers exist
+ cap_streams_approximate: Set the streams ``MAXLEN`` using the ``~`` approximation
+ operator for improved performance
+ stream_ttl: TTL of a stream in milliseconds or as a timedelta. A streams TTL will be set on each publishing
+ operation using ``PEXPIRE``
+ """
+ super().__init__(
+ redis=redis, stream_sleep_no_subscriptions=stream_sleep_no_subscriptions, key_prefix=key_prefix
+ )
+
+ self._history_limit = history
+ self._subscribed_channels: set[str] = set()
+ self._cap_streams_approximate = cap_streams_approximate
+ self._stream_ttl = stream_ttl if isinstance(stream_ttl, int) else int(stream_ttl.total_seconds() * 1000)
+ self._flush_all_streams_script = self._redis.register_script(_FLUSHALL_STREAMS_SCRIPT)
+ self._publish_script = self._redis.register_script(_XADD_EXPIRE_SCRIPT)
+ self._has_subscribed_channels = _LazyEvent()
+
+ async def on_startup(self) -> None:
+ """Called on application startup"""
+
+ async def on_shutdown(self) -> None:
+ """Called on application shutdown"""
+
+ async def subscribe(self, channels: Iterable[str]) -> None:
+ """Subscribe to ``channels``"""
+ self._subscribed_channels.update(channels)
+ self._has_subscribed_channels.set()
+
+ async def unsubscribe(self, channels: Iterable[str]) -> None:
+ """Unsubscribe from ``channels``"""
+ self._subscribed_channels -= set(channels)
+ if not len(self._subscribed_channels):
+ self._has_subscribed_channels.clear()
+
+ async def publish(self, data: bytes, channels: Iterable[str]) -> None:
+ """Publish ``data`` to ``channels``.
+
+ .. note::
+ This operation is performed atomically, using a Lua script
+ """
+ channels = set(channels)
+ await self._publish_script(
+ keys=[self._make_key(key) for key in channels],
+ args=[
+ data,
+ self._history_limit,
+ self._stream_ttl,
+ int(self._cap_streams_approximate),
+ *channels,
+ ],
+ )
+
+ async def _get_subscribed_channels(self) -> set[str]:
+ """Get subscribed channels. If no channels are currently subscribed, wait"""
+ await self._has_subscribed_channels.wait()
+ return self._subscribed_channels
+
+ async def stream_events(self) -> AsyncGenerator[tuple[str, Any], None]:
+ """Return a generator, iterating over events of subscribed channels as they become available.
+
+ If no channels have been subscribed to yet via :meth:`subscribe`, sleep for ``stream_sleep_no_subscriptions``
+ milliseconds.
+ """
+ stream_ids: dict[str, bytes] = {}
+ while True:
+ # We wait for subscribed channels, because we can't pass an empty dict to
+ # xread and block for subscribers
+ stream_keys = [self._make_key(c) for c in await self._get_subscribed_channels()]
+
+ data: list[tuple[bytes, list[tuple[bytes, dict[bytes, bytes]]]]] = await self._redis.xread(
+ {key: stream_ids.get(key, 0) for key in stream_keys}, block=self._stream_sleep_no_subscriptions
+ )
+
+ if not data:
+ continue
+
+ for stream_key, channel_events in data:
+ for event in channel_events:
+ event_data = event[1][b"data"]
+ channel_name = event[1][b"channel"].decode()
+ stream_ids[stream_key.decode()] = event[0]
+ yield channel_name, event_data
+
+ async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
+ """Return the history of ``channels``, returning at most ``limit`` messages"""
+ data: Iterable[tuple[bytes, dict[bytes, bytes]]]
+ if limit:
+ data = reversed(await self._redis.xrevrange(self._make_key(channel), count=limit))
+ else:
+ data = await self._redis.xrange(self._make_key(channel))
+
+ return [event[b"data"] for _, event in data]
+
+ async def flush_all(self) -> int:
+ """Delete all stream keys with the ``key_prefix``.
+
+ .. important::
+ This method is incompatible with redis clusters
+ """
+ deleted_streams = await self._flush_all_streams_script(keys=[], args=[f"{self._key_prefix}*"])
+ return cast("int", deleted_streams)