author    cyfraeviolae <cyfraeviolae>    2024-04-03 03:17:55 -0400
committer cyfraeviolae <cyfraeviolae>    2024-04-03 03:17:55 -0400
commit    12cf076118570eebbff08c6b3090e0d4798447a1 (patch)
tree      3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/litestar/channels
parent    c45662ff3923b34614ddcc8feb9195541166dcc5 (diff)
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/channels')
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/__init__.py                                  5
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/__pycache__/__init__.cpython-311.pyc         bin 423 -> 0 bytes
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/__pycache__/plugin.cpython-311.pyc           bin 23104 -> 0 bytes
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/__pycache__/subscriber.cpython-311.pyc       bin 8595 -> 0 bytes
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py                         0
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pyc    bin 210 -> 0 bytes
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pyc     bin 6422 -> 0 bytes
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pyc        bin 2607 -> 0 bytes
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pyc      bin 5024 -> 0 bytes
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pyc     bin 5220 -> 0 bytes
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pyc       bin 17950 -> 0 bytes
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua         15
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua           5
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua              13
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py                          90
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/backends/base.py                             41
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py                           84
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py                          54
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py                            277
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/plugin.py                                    359
-rw-r--r--  venv/lib/python3.11/site-packages/litestar/channels/subscriber.py                                135
21 files changed, 0 insertions, 1078 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/__init__.py b/venv/lib/python3.11/site-packages/litestar/channels/__init__.py
deleted file mode 100644
index 0167223..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from .backends.base import ChannelsBackend
-from .plugin import ChannelsPlugin
-from .subscriber import Subscriber
-
-__all__ = ("ChannelsPlugin", "ChannelsBackend", "Subscriber")
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/__pycache__/__init__.cpython-311.pyc
deleted file mode 100644
index bf9d6bd..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/__pycache__/__init__.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/__pycache__/plugin.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/__pycache__/plugin.cpython-311.pyc
deleted file mode 100644
index 08361dc..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/__pycache__/plugin.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/__pycache__/subscriber.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/__pycache__/subscriber.cpython-311.pyc
deleted file mode 100644
index 8d609b4..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/__pycache__/subscriber.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__init__.py
+++ /dev/null
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pyc
deleted file mode 100644
index ab4e477..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/__init__.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pyc
deleted file mode 100644
index a577096..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/asyncpg.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pyc
deleted file mode 100644
index 334d295..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/base.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pyc
deleted file mode 100644
index 9a87da5..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/memory.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pyc
deleted file mode 100644
index f663280..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/psycopg.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pyc
deleted file mode 100644
index bf86a3e..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/__pycache__/redis.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua
deleted file mode 100644
index a3faa6e..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_flushall_streams.lua
+++ /dev/null
@@ -1,15 +0,0 @@
-local key_pattern = ARGV[1]
-
-local cursor = 0
-local deleted_streams = 0
-
-repeat
- local result = redis.call('SCAN', cursor, 'MATCH', key_pattern)
- for _,key in ipairs(result[2]) do
- redis.call('DEL', key)
- deleted_streams = deleted_streams + 1
- end
- cursor = tonumber(result[1])
-until cursor == 0
-
-return deleted_streams
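For orientation, a minimal sketch of invoking this script from redis-py; the real call site is RedisChannelsStreamBackend.flush_all in redis.py further down. The helper name flush_streams and the local-Redis usage are illustrative.

import asyncio

from redis.asyncio import Redis


async def flush_streams(redis: Redis, script_source: str, key_prefix: str) -> int:
    # register_script wraps the source in a callable that EVALSHAs the cached
    # script, falling back to EVAL when it is not yet loaded on the server
    flush = redis.register_script(script_source)
    deleted = await flush(keys=[], args=[f"{key_prefix}*"])
    return int(deleted)


# with open("_redis_flushall_streams.lua") as f:
#     asyncio.run(flush_streams(Redis(), f.read(), "LITESTAR_CHANNELS"))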
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua
deleted file mode 100644
index 8402d08..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_pubsub_publish.lua
+++ /dev/null
@@ -1,5 +0,0 @@
-local data = ARGV[1]
-
-for _, channel in ipairs(KEYS) do
- redis.call("PUBLISH", channel, data)
-end
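For contrast, the equivalent client-side loop below issues one PUBLISH per round-trip and lets other commands interleave; the script above performs all PUBLISH calls atomically on the server. A sketch, with the helper name invented:

from typing import Iterable

from redis.asyncio import Redis


async def publish_naive(redis: Redis, data: bytes, channels: Iterable[str]) -> None:
    # one network round-trip per channel, and not atomic across channels
    for channel in set(channels):
        await redis.publish(channel, data)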
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua b/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua
deleted file mode 100644
index f6b322f..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/_redis_xadd_expire.lua
+++ /dev/null
@@ -1,13 +0,0 @@
-local data = ARGV[1]
-local limit = ARGV[2]
-local exp = ARGV[3]
-local maxlen_approx = ARGV[4]
-
-for i, key in ipairs(KEYS) do
-    -- ARGV values are always strings in Redis Lua, so convert before comparing
-    if tonumber(maxlen_approx) == 1 then
-        redis.call("XADD", key, "MAXLEN", "~", limit, "*", "data", data, "channel", ARGV[i + 4])
-    else
-        redis.call("XADD", key, "MAXLEN", limit, "*", "data", data, "channel", ARGV[i + 4])
-    end
-    redis.call("PEXPIRE", key, exp)
-end
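This script pairs KEYS[i] with ARGV[i + 4]: after the four fixed arguments come the channel names, one per stream key. A condensed sketch of how RedisChannelsStreamBackend.publish (in redis.py below) builds the call; values are illustrative:

channels = ["events", "alerts"]
keys = [f"LITESTAR_CHANNELS_{c.upper()}" for c in channels]  # one stream key per channel
args = [
    b"payload",  # ARGV[1]: data
    1000,        # ARGV[2]: MAXLEN limit
    60_000,      # ARGV[3]: TTL in milliseconds, applied via PEXPIRE
    1,           # ARGV[4]: 1 to cap with the "~" approximation operator
    *channels,   # ARGV[5:]: channel names, stored alongside the data
]
# await xadd_expire_script(keys=keys, args=args)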
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py
deleted file mode 100644
index 967b208..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/asyncpg.py
+++ /dev/null
@@ -1,90 +0,0 @@
-from __future__ import annotations
-
-import asyncio
-from contextlib import AsyncExitStack
-from functools import partial
-from typing import AsyncGenerator, Awaitable, Callable, Iterable, overload
-
-import asyncpg
-
-from litestar.channels import ChannelsBackend
-from litestar.exceptions import ImproperlyConfiguredException
-
-
-class AsyncPgChannelsBackend(ChannelsBackend):
- _listener_conn: asyncpg.Connection
-
- @overload
- def __init__(self, dsn: str) -> None: ...
-
- @overload
- def __init__(
- self,
- *,
- make_connection: Callable[[], Awaitable[asyncpg.Connection]],
- ) -> None: ...
-
- def __init__(
- self,
- dsn: str | None = None,
- *,
- make_connection: Callable[[], Awaitable[asyncpg.Connection]] | None = None,
- ) -> None:
- if not (dsn or make_connection):
- raise ImproperlyConfiguredException("Need to specify dsn or make_connection")
-
- self._subscribed_channels: set[str] = set()
- self._exit_stack = AsyncExitStack()
- self._connect = make_connection or partial(asyncpg.connect, dsn=dsn)
- self._queue: asyncio.Queue[tuple[str, bytes]] | None = None
-
- async def on_startup(self) -> None:
- self._queue = asyncio.Queue()
- self._listener_conn = await self._connect()
-
- async def on_shutdown(self) -> None:
- await self._listener_conn.close()
- self._queue = None
-
- async def publish(self, data: bytes, channels: Iterable[str]) -> None:
- if self._queue is None:
- raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?")
-
- dec_data = data.decode("utf-8")
-
- conn = await self._connect()
- try:
- for channel in channels:
- await conn.execute("SELECT pg_notify($1, $2);", channel, dec_data)
- finally:
- await conn.close()
-
- async def subscribe(self, channels: Iterable[str]) -> None:
- for channel in set(channels) - self._subscribed_channels:
- await self._listener_conn.add_listener(channel, self._listener) # type: ignore[arg-type]
- self._subscribed_channels.add(channel)
-
- async def unsubscribe(self, channels: Iterable[str]) -> None:
- for channel in channels:
- await self._listener_conn.remove_listener(channel, self._listener) # type: ignore[arg-type]
- self._subscribed_channels = self._subscribed_channels - set(channels)
-
- async def stream_events(self) -> AsyncGenerator[tuple[str, bytes], None]:
- if self._queue is None:
- raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?")
-
- while True:
- channel, message = await self._queue.get()
- self._queue.task_done()
- # an UNLISTEN may be in transit while we're getting here, so we double-check
- # that we are actually supposed to deliver this message
- if channel in self._subscribed_channels:
- yield channel, message
-
- async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
- raise NotImplementedError()
-
- def _listener(self, /, connection: asyncpg.Connection, pid: int, channel: str, payload: object) -> None:
- if not isinstance(payload, str):
- raise RuntimeError("Invalid data received")
- self._queue.put_nowait((channel, payload.encode("utf-8"))) # type: ignore[union-attr]
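A standalone lifecycle sketch for this backend; the DSN and channel name are illustrative, and in an application the ChannelsPlugin drives these calls:

import asyncio

from litestar.channels.backends.asyncpg import AsyncPgChannelsBackend


async def main() -> None:
    backend = AsyncPgChannelsBackend("postgres://app:secret@localhost/app")
    await backend.on_startup()
    await backend.subscribe(["notifications"])
    await backend.publish(b"hello", ["notifications"])  # delivered via pg_notify
    # stream_events() loops forever; take a single event for the demo
    async for channel, payload in backend.stream_events():
        print(channel, payload)  # notifications b'hello'
        break
    await backend.on_shutdown()


asyncio.run(main())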
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/base.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/base.py
deleted file mode 100644
index ce7ee81..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/base.py
+++ /dev/null
@@ -1,41 +0,0 @@
-from __future__ import annotations
-
-from abc import ABC, abstractmethod
-from typing import AsyncGenerator, Iterable
-
-
-class ChannelsBackend(ABC):
- @abstractmethod
- async def on_startup(self) -> None:
- """Called by the plugin on application startup"""
- ...
-
- @abstractmethod
- async def on_shutdown(self) -> None:
- """Called by the plugin on application shutdown"""
- ...
-
- @abstractmethod
- async def publish(self, data: bytes, channels: Iterable[str]) -> None:
- """Publish the message ``data`` to all ``channels``"""
- ...
-
- @abstractmethod
- async def subscribe(self, channels: Iterable[str]) -> None:
- """Start listening for events on ``channels``"""
- ...
-
- @abstractmethod
- async def unsubscribe(self, channels: Iterable[str]) -> None:
- """Stop listening for events on ``channels``"""
- ...
-
- @abstractmethod
- def stream_events(self) -> AsyncGenerator[tuple[str, bytes], None]:
- """Return a generator, iterating over events of subscribed channels as they become available"""
- ...
-
- @abstractmethod
- async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
- """Return the event history of ``channel``, at most ``limit`` entries"""
- ...
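To illustrate the contract, a toy in-process implementation of this ABC; it is essentially a stripped-down MemoryChannelsBackend (which follows below), and the class name is invented:

from __future__ import annotations

import asyncio
from typing import AsyncGenerator, Iterable

from litestar.channels.backends.base import ChannelsBackend


class LoopbackBackend(ChannelsBackend):
    """Toy backend: publish() feeds subscribed channels through a local queue."""

    def __init__(self) -> None:
        self._channels: set[str] = set()
        self._queue: asyncio.Queue[tuple[str, bytes]] = asyncio.Queue()

    async def on_startup(self) -> None:
        pass

    async def on_shutdown(self) -> None:
        pass

    async def publish(self, data: bytes, channels: Iterable[str]) -> None:
        for channel in channels:
            if channel in self._channels:
                self._queue.put_nowait((channel, data))

    async def subscribe(self, channels: Iterable[str]) -> None:
        self._channels.update(channels)

    async def unsubscribe(self, channels: Iterable[str]) -> None:
        self._channels -= set(channels)

    async def stream_events(self) -> AsyncGenerator[tuple[str, bytes], None]:
        while True:
            yield await self._queue.get()

    async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
        raise NotImplementedError  # history support is backend-specific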
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py
deleted file mode 100644
index a96a66b..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/memory.py
+++ /dev/null
@@ -1,84 +0,0 @@
-from __future__ import annotations
-
-from asyncio import Queue
-from collections import defaultdict, deque
-from typing import Any, AsyncGenerator, Iterable
-
-from litestar.channels.backends.base import ChannelsBackend
-
-
-class MemoryChannelsBackend(ChannelsBackend):
- """An in-memory channels backend"""
-
- def __init__(self, history: int = 0) -> None:
- self._max_history_length = history
- self._channels: set[str] = set()
- self._queue: Queue[tuple[str, bytes]] | None = None
- self._history: defaultdict[str, deque[bytes]] = defaultdict(lambda: deque(maxlen=self._max_history_length))
-
- async def on_startup(self) -> None:
- self._queue = Queue()
-
- async def on_shutdown(self) -> None:
- self._queue = None
-
- async def publish(self, data: bytes, channels: Iterable[str]) -> None:
- """Publish ``data`` to ``channels``. If a channel has not yet been subscribed to,
- this will be a no-op.
-
- Args:
- data: Data to publish
- channels: Channels to publish to
-
- Returns:
- None
-
- Raises:
- RuntimeError: If ``on_startup`` has not been called yet
- """
- if not self._queue:
- raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?")
-
- for channel in channels:
- if channel not in self._channels:
- continue
-
- self._queue.put_nowait((channel, data))
- if self._max_history_length:
- for channel in channels:
- self._history[channel].append(data)
-
- async def subscribe(self, channels: Iterable[str]) -> None:
- """Subscribe to ``channels``, and enable publishing to them"""
- self._channels.update(channels)
-
- async def unsubscribe(self, channels: Iterable[str]) -> None:
- """Unsubscribe from ``channels``"""
- self._channels -= set(channels)
-        for channel in channels:
-            # pop() tolerates channels that never accumulated any history
-            self._history.pop(channel, None)
-
- async def stream_events(self) -> AsyncGenerator[tuple[str, Any], None]:
- """Return a generator, iterating over events of subscribed channels as they become available"""
- if self._queue is None:
- raise RuntimeError("Backend not yet initialized. Did you forget to call on_startup?")
-
- while True:
- channel, message = await self._queue.get()
- self._queue.task_done()
-
- # if a message is published to a channel and the channel is then
- # unsubscribed before retrieving that message from the stream, it can still
- # end up here, so we double-check if we still are interested in this message
- if channel in self._channels:
- yield channel, message
-
- async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
- """Return the event history of ``channel``, at most ``limit`` entries"""
- history = list(self._history[channel])
- if limit:
- history = history[-limit:]
- return history
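A quick demonstration of the bounded history above; with history=2 the per-channel deque keeps only the two most recent payloads. The channel name is illustrative:

import asyncio

from litestar.channels.backends.memory import MemoryChannelsBackend


async def main() -> None:
    backend = MemoryChannelsBackend(history=2)
    await backend.on_startup()
    await backend.subscribe(["events"])
    for payload in (b"a", b"b", b"c"):
        await backend.publish(payload, ["events"])
    print(await backend.get_history("events"))           # [b'b', b'c']
    print(await backend.get_history("events", limit=1))  # [b'c']
    await backend.on_shutdown()


asyncio.run(main())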
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py
deleted file mode 100644
index 14b53bc..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/psycopg.py
+++ /dev/null
@@ -1,54 +0,0 @@
-from __future__ import annotations
-
-from contextlib import AsyncExitStack
-from typing import AsyncGenerator, Iterable
-
-import psycopg
-
-from .base import ChannelsBackend
-
-
-def _safe_quote(ident: str) -> str:
- return '"{}"'.format(ident.replace('"', '""')) # sourcery skip
-
-
-class PsycoPgChannelsBackend(ChannelsBackend):
- _listener_conn: psycopg.AsyncConnection
-
- def __init__(self, pg_dsn: str) -> None:
- self._pg_dsn = pg_dsn
- self._subscribed_channels: set[str] = set()
- self._exit_stack = AsyncExitStack()
-
- async def on_startup(self) -> None:
- self._listener_conn = await psycopg.AsyncConnection.connect(self._pg_dsn, autocommit=True)
- await self._exit_stack.enter_async_context(self._listener_conn)
-
- async def on_shutdown(self) -> None:
- await self._exit_stack.aclose()
-
- async def publish(self, data: bytes, channels: Iterable[str]) -> None:
- dec_data = data.decode("utf-8")
- async with await psycopg.AsyncConnection.connect(self._pg_dsn) as conn:
- for channel in channels:
- await conn.execute("SELECT pg_notify(%s, %s);", (channel, dec_data))
-
- async def subscribe(self, channels: Iterable[str]) -> None:
- for channel in set(channels) - self._subscribed_channels:
- # can't use placeholders in LISTEN
- await self._listener_conn.execute(f"LISTEN {_safe_quote(channel)};") # pyright: ignore
-
- self._subscribed_channels.add(channel)
-
- async def unsubscribe(self, channels: Iterable[str]) -> None:
- for channel in channels:
- # can't use placeholders in UNLISTEN
- await self._listener_conn.execute(f"UNLISTEN {_safe_quote(channel)};") # pyright: ignore
- self._subscribed_channels = self._subscribed_channels - set(channels)
-
- async def stream_events(self) -> AsyncGenerator[tuple[str, bytes], None]:
- async for notify in self._listener_conn.notifies():
- yield notify.channel, notify.payload.encode("utf-8")
-
- async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
- raise NotImplementedError()
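Since LISTEN and UNLISTEN take an identifier rather than a bind parameter, the channel name is escaped as a quoted identifier. A short illustration of the private _safe_quote helper, shown only to make the quoting rule concrete:

from litestar.channels.backends.psycopg import _safe_quote

print(_safe_quote("events"))   # "events"
print(_safe_quote('we"ird'))   # "we""ird"  (embedded quotes are doubled)
# executed as: LISTEN "we""ird";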
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py b/venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py
deleted file mode 100644
index f03c9f2..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/backends/redis.py
+++ /dev/null
@@ -1,277 +0,0 @@
-from __future__ import annotations
-
-import asyncio
-import sys
-
-if sys.version_info < (3, 9):
- import importlib_resources # pyright: ignore
-else:
- import importlib.resources as importlib_resources
-from abc import ABC
-from datetime import timedelta
-from typing import TYPE_CHECKING, Any, AsyncGenerator, Iterable, cast
-
-from litestar.channels.backends.base import ChannelsBackend
-
-if TYPE_CHECKING:
- from redis.asyncio import Redis
- from redis.asyncio.client import PubSub
-
-_resource_path = importlib_resources.files("litestar.channels.backends")
-_PUBSUB_PUBLISH_SCRIPT = (_resource_path / "_redis_pubsub_publish.lua").read_text()
-_FLUSHALL_STREAMS_SCRIPT = (_resource_path / "_redis_flushall_streams.lua").read_text()
-_XADD_EXPIRE_SCRIPT = (_resource_path / "_redis_xadd_expire.lua").read_text()
-
-
-class _LazyEvent:
- """A lazy proxy to asyncio.Event that only creates the event once it's accessed.
-
-    It ensures that the Event is created within a running event loop; otherwise, the future
-    inside the event may end up attached to a different loop.
-
- This happens in our tests and could also happen when a user creates an instance of the backend outside an event loop
- in their application.
- """
-
- def __init__(self) -> None:
- self.__event: asyncio.Event | None = None
-
- @property
- def _event(self) -> asyncio.Event:
- if self.__event is None:
- self.__event = asyncio.Event()
- return self.__event
-
- def set(self) -> None:
- self._event.set()
-
- def clear(self) -> None:
- self._event.clear()
-
- async def wait(self) -> None:
- await self._event.wait()
-
-
-class RedisChannelsBackend(ChannelsBackend, ABC):
- def __init__(self, *, redis: Redis, key_prefix: str, stream_sleep_no_subscriptions: int) -> None:
- """Base redis channels backend.
-
- Args:
- redis: A :class:`redis.asyncio.Redis` instance
- key_prefix: Key prefix to use for storing data in redis
- stream_sleep_no_subscriptions: Amount of time in milliseconds to pause the
- :meth:`stream_events` generator, should no subscribers exist
- """
- self._redis = redis
- self._key_prefix = key_prefix
- self._stream_sleep_no_subscriptions = stream_sleep_no_subscriptions
-
- def _make_key(self, channel: str) -> str:
- return f"{self._key_prefix}_{channel.upper()}"
-
-
-class RedisChannelsPubSubBackend(RedisChannelsBackend):
- def __init__(
- self, *, redis: Redis, stream_sleep_no_subscriptions: int = 1, key_prefix: str = "LITESTAR_CHANNELS"
- ) -> None:
- """Redis channels backend, `Pub/Sub <https://redis.io/docs/manual/pubsub/>`_.
-
- This backend provides low overhead and resource usage but no support for history.
-
- Args:
- redis: A :class:`redis.asyncio.Redis` instance
- key_prefix: Key prefix to use for storing data in redis
- stream_sleep_no_subscriptions: Amount of time in milliseconds to pause the
- :meth:`stream_events` generator, should no subscribers exist
- """
- super().__init__(
- redis=redis, stream_sleep_no_subscriptions=stream_sleep_no_subscriptions, key_prefix=key_prefix
- )
- self.__pub_sub: PubSub | None = None
- self._publish_script = self._redis.register_script(_PUBSUB_PUBLISH_SCRIPT)
- self._has_subscribed = _LazyEvent()
-
- @property
- def _pub_sub(self) -> PubSub:
- if self.__pub_sub is None:
- self.__pub_sub = self._redis.pubsub()
- return self.__pub_sub
-
- async def on_startup(self) -> None:
-        # nothing to do here; the PubSub instance is created lazily on first use
- pass
-
- async def on_shutdown(self) -> None:
- await self._pub_sub.reset()
-
- async def subscribe(self, channels: Iterable[str]) -> None:
- """Subscribe to ``channels``, and enable publishing to them"""
- await self._pub_sub.subscribe(*channels)
- self._has_subscribed.set()
-
- async def unsubscribe(self, channels: Iterable[str]) -> None:
- """Stop listening for events on ``channels``"""
- await self._pub_sub.unsubscribe(*channels)
- # if we have no active subscriptions, or only subscriptions which are pending
- # to be unsubscribed we consider the backend to be unsubscribed from all
- # channels, so we reset the event
- if not self._pub_sub.channels.keys() - self._pub_sub.pending_unsubscribe_channels:
- self._has_subscribed.clear()
-
- async def publish(self, data: bytes, channels: Iterable[str]) -> None:
- """Publish ``data`` to ``channels``
-
- .. note::
- This operation is performed atomically, using a lua script
- """
- await self._publish_script(keys=list(set(channels)), args=[data])
-
- async def stream_events(self) -> AsyncGenerator[tuple[str, Any], None]:
- """Return a generator, iterating over events of subscribed channels as they become available.
-
- If no channels have been subscribed to yet via :meth:`subscribe`, sleep for ``stream_sleep_no_subscriptions``
- milliseconds.
- """
-
- while True:
- await self._has_subscribed.wait()
- message = await self._pub_sub.get_message(
- ignore_subscribe_messages=True, timeout=self._stream_sleep_no_subscriptions
- )
- if message is None:
- continue
-
- channel: str = message["channel"].decode()
- data: bytes = message["data"]
-            # redis handles unsubscribes via a queue; an unsubscribe doesn't take
-            # effect immediately after being requested, so we could still receive a
-            # message on a channel the client considers itself no longer
-            # subscribed to
- if channel.encode() in self._pub_sub.channels.keys() - self._pub_sub.pending_unsubscribe_channels:
- yield channel, data
-
- async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
- """Not implemented"""
- raise NotImplementedError()
-
-
-class RedisChannelsStreamBackend(RedisChannelsBackend):
- def __init__(
- self,
- history: int,
- *,
- redis: Redis,
- stream_sleep_no_subscriptions: int = 1,
- cap_streams_approximate: bool = True,
- stream_ttl: int | timedelta = timedelta(seconds=60),
- key_prefix: str = "LITESTAR_CHANNELS",
- ) -> None:
- """Redis channels backend, `streams <https://redis.io/docs/data-types/streams/>`_.
-
- Args:
- history: Amount of messages to keep. This will set a ``MAXLEN`` to the streams
- redis: A :class:`redis.asyncio.Redis` instance
- key_prefix: Key prefix to use for streams
- stream_sleep_no_subscriptions: Amount of time in milliseconds to pause the
- :meth:`stream_events` generator, should no subscribers exist
- cap_streams_approximate: Set the streams ``MAXLEN`` using the ``~`` approximation
- operator for improved performance
-            stream_ttl: TTL of a stream in milliseconds or as a timedelta. A stream's TTL will be set on each
-                publishing operation using ``PEXPIRE``
- """
- super().__init__(
- redis=redis, stream_sleep_no_subscriptions=stream_sleep_no_subscriptions, key_prefix=key_prefix
- )
-
- self._history_limit = history
- self._subscribed_channels: set[str] = set()
- self._cap_streams_approximate = cap_streams_approximate
- self._stream_ttl = stream_ttl if isinstance(stream_ttl, int) else int(stream_ttl.total_seconds() * 1000)
- self._flush_all_streams_script = self._redis.register_script(_FLUSHALL_STREAMS_SCRIPT)
- self._publish_script = self._redis.register_script(_XADD_EXPIRE_SCRIPT)
- self._has_subscribed_channels = _LazyEvent()
-
- async def on_startup(self) -> None:
- """Called on application startup"""
-
- async def on_shutdown(self) -> None:
- """Called on application shutdown"""
-
- async def subscribe(self, channels: Iterable[str]) -> None:
- """Subscribe to ``channels``"""
- self._subscribed_channels.update(channels)
- self._has_subscribed_channels.set()
-
- async def unsubscribe(self, channels: Iterable[str]) -> None:
- """Unsubscribe from ``channels``"""
- self._subscribed_channels -= set(channels)
-        if not self._subscribed_channels:
- self._has_subscribed_channels.clear()
-
- async def publish(self, data: bytes, channels: Iterable[str]) -> None:
- """Publish ``data`` to ``channels``.
-
- .. note::
- This operation is performed atomically, using a Lua script
- """
- channels = set(channels)
- await self._publish_script(
- keys=[self._make_key(key) for key in channels],
- args=[
- data,
- self._history_limit,
- self._stream_ttl,
- int(self._cap_streams_approximate),
- *channels,
- ],
- )
-
- async def _get_subscribed_channels(self) -> set[str]:
- """Get subscribed channels. If no channels are currently subscribed, wait"""
- await self._has_subscribed_channels.wait()
- return self._subscribed_channels
-
- async def stream_events(self) -> AsyncGenerator[tuple[str, Any], None]:
- """Return a generator, iterating over events of subscribed channels as they become available.
-
- If no channels have been subscribed to yet via :meth:`subscribe`, sleep for ``stream_sleep_no_subscriptions``
- milliseconds.
- """
- stream_ids: dict[str, bytes] = {}
- while True:
- # We wait for subscribed channels, because we can't pass an empty dict to
- # xread and block for subscribers
- stream_keys = [self._make_key(c) for c in await self._get_subscribed_channels()]
-
- data: list[tuple[bytes, list[tuple[bytes, dict[bytes, bytes]]]]] = await self._redis.xread(
- {key: stream_ids.get(key, 0) for key in stream_keys}, block=self._stream_sleep_no_subscriptions
- )
-
- if not data:
- continue
-
- for stream_key, channel_events in data:
- for event in channel_events:
- event_data = event[1][b"data"]
- channel_name = event[1][b"channel"].decode()
- stream_ids[stream_key.decode()] = event[0]
- yield channel_name, event_data
-
- async def get_history(self, channel: str, limit: int | None = None) -> list[bytes]:
- """Return the history of ``channels``, returning at most ``limit`` messages"""
- data: Iterable[tuple[bytes, dict[bytes, bytes]]]
- if limit:
- data = reversed(await self._redis.xrevrange(self._make_key(channel), count=limit))
- else:
- data = await self._redis.xrange(self._make_key(channel))
-
- return [event[b"data"] for _, event in data]
-
- async def flush_all(self) -> int:
- """Delete all stream keys with the ``key_prefix``.
-
- .. important::
- This method is incompatible with redis clusters
- """
- deleted_streams = await self._flush_all_streams_script(keys=[], args=[f"{self._key_prefix}*"])
- return cast("int", deleted_streams)
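A construction sketch contrasting the two backends above; the connection details are illustrative:

from datetime import timedelta

from redis.asyncio import Redis

from litestar.channels.backends.redis import (
    RedisChannelsPubSubBackend,
    RedisChannelsStreamBackend,
)

redis = Redis(host="localhost")

# Pub/Sub: lowest overhead, but get_history() raises NotImplementedError
pubsub_backend = RedisChannelsPubSubBackend(redis=redis)

# Streams: per-channel history via XADD/XRANGE; each stream is capped at
# 100 entries and expires 30 seconds after its last publish
stream_backend = RedisChannelsStreamBackend(
    100,
    redis=redis,
    stream_ttl=timedelta(seconds=30),
)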
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/plugin.py b/venv/lib/python3.11/site-packages/litestar/channels/plugin.py
deleted file mode 100644
index 5988445..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/plugin.py
+++ /dev/null
@@ -1,359 +0,0 @@
-from __future__ import annotations
-
-import asyncio
-from asyncio import CancelledError, Queue, Task, create_task
-from contextlib import AbstractAsyncContextManager, asynccontextmanager, suppress
-from functools import partial
-from typing import TYPE_CHECKING, AsyncGenerator, Awaitable, Callable, Iterable
-
-import msgspec.json
-
-from litestar.di import Provide
-from litestar.exceptions import ImproperlyConfiguredException, LitestarException
-from litestar.handlers import WebsocketRouteHandler
-from litestar.plugins import InitPluginProtocol
-from litestar.serialization import default_serializer
-
-from .subscriber import BacklogStrategy, EventCallback, Subscriber
-
-if TYPE_CHECKING:
- from types import TracebackType
-
- from litestar.channels.backends.base import ChannelsBackend
- from litestar.config.app import AppConfig
- from litestar.connection import WebSocket
- from litestar.types import LitestarEncodableType, TypeEncodersMap
- from litestar.types.asgi_types import WebSocketMode
-
-
-class ChannelsException(LitestarException):
- pass
-
-
-class ChannelsPlugin(InitPluginProtocol, AbstractAsyncContextManager):
- def __init__(
- self,
- backend: ChannelsBackend,
- *,
- channels: Iterable[str] | None = None,
- arbitrary_channels_allowed: bool = False,
- create_ws_route_handlers: bool = False,
- ws_handler_send_history: int = 0,
- ws_handler_base_path: str = "/",
- ws_send_mode: WebSocketMode = "text",
- subscriber_max_backlog: int | None = None,
- subscriber_backlog_strategy: BacklogStrategy = "backoff",
- subscriber_class: type[Subscriber] = Subscriber,
- type_encoders: TypeEncodersMap | None = None,
- ) -> None:
- """Plugin to handle broadcasting to WebSockets with support for channels.
-
- This plugin is available as an injected dependency using the ``channels`` key.
-
- Args:
- backend: Backend to store data in
- channels: Channels to serve. If ``arbitrary_channels_allowed`` is ``False`` (the default), trying to
- subscribe to a channel not in ``channels`` will raise an exception
- arbitrary_channels_allowed: Allow the creation of channels on the fly
- create_ws_route_handlers: If ``True``, websocket route handlers will be created for all channels defined in
- ``channels``. If ``arbitrary_channels_allowed`` is ``True``, a single handler will be created instead,
- handling all channels. The handlers created will accept WebSocket connections and start sending received
- events for their respective channels.
- ws_handler_send_history: Amount of history entries to send from the generated websocket route handlers after
- a client has connected. A value of ``0`` indicates to not send a history
- ws_handler_base_path: Path prefix used for the generated route handlers
- ws_send_mode: Send mode to use for sending data through a :class:`WebSocket <.connection.WebSocket>`.
- This will be used when sending within generated route handlers or :meth:`Subscriber.run_in_background`
-            subscriber_max_backlog: Maximum amount of unsent messages to be held in memory for a given subscriber. If
-                that limit is reached, new messages will be treated according to ``backlog_strategy``
-            subscriber_backlog_strategy: Define the behaviour if ``max_backlog`` is reached for a subscriber.
-                ``backoff`` will result in new messages being dropped until older ones have been processed.
-                ``dropleft`` will drop older messages in favour of new ones.
- subscriber_class: A :class:`Subscriber` subclass to return from :meth:`subscribe`
- type_encoders: An additional mapping of type encoders used to encode data before sending
-
- """
- self._backend = backend
- self._pub_queue: Queue[tuple[bytes, list[str]]] | None = None
- self._pub_task: Task | None = None
- self._sub_task: Task | None = None
-
- if not (channels or arbitrary_channels_allowed):
- raise ImproperlyConfiguredException("Must define either channels or set arbitrary_channels_allowed=True")
-
-        # ensure the path ends with a slash, so we can simply concatenate channel names later
- if not ws_handler_base_path.endswith("/"):
- ws_handler_base_path += "/"
-
- self._arbitrary_channels_allowed = arbitrary_channels_allowed
- self._create_route_handlers = create_ws_route_handlers
- self._handler_root_path = ws_handler_base_path
- self._socket_send_mode: WebSocketMode = ws_send_mode
- self._encode_json = msgspec.json.Encoder(
- enc_hook=partial(default_serializer, type_encoders=type_encoders)
- ).encode
- self._handler_should_send_history = bool(ws_handler_send_history)
- self._history_limit = None if ws_handler_send_history < 0 else ws_handler_send_history
- self._max_backlog = subscriber_max_backlog
- self._backlog_strategy: BacklogStrategy = subscriber_backlog_strategy
- self._subscriber_class = subscriber_class
-
- self._channels: dict[str, set[Subscriber]] = {channel: set() for channel in channels or []}
-
- def encode_data(self, data: LitestarEncodableType) -> bytes:
- """Encode data before storing it in the backend"""
- if isinstance(data, bytes):
- return data
-
- return data.encode() if isinstance(data, str) else self._encode_json(data)
-
- def on_app_init(self, app_config: AppConfig) -> AppConfig:
- """Plugin hook. Set up a ``channels`` dependency, add route handlers and register application hooks"""
- app_config.dependencies["channels"] = Provide(lambda: self, use_cache=True, sync_to_thread=False)
- app_config.lifespan.append(self)
- app_config.signature_namespace.update(ChannelsPlugin=ChannelsPlugin)
-
- if self._create_route_handlers:
- if self._arbitrary_channels_allowed:
- path = self._handler_root_path + "{channel_name:str}"
- route_handlers = [WebsocketRouteHandler(path)(self._ws_handler_func)]
- else:
- route_handlers = [
- WebsocketRouteHandler(self._handler_root_path + channel_name)(
- self._create_ws_handler_func(channel_name)
- )
- for channel_name in self._channels
- ]
- app_config.route_handlers.extend(route_handlers)
-
- return app_config
-
- def publish(self, data: LitestarEncodableType, channels: str | Iterable[str]) -> None:
- """Schedule ``data`` to be published to ``channels``.
-
- .. note::
- This is a synchronous method that returns immediately. There are no
- guarantees that when this method returns the data will have been published
- to the backend. For that, use :meth:`wait_published`
-
- """
- if isinstance(channels, str):
- channels = [channels]
- data = self.encode_data(data)
- try:
- self._pub_queue.put_nowait((data, list(channels))) # type: ignore[union-attr]
- except AttributeError as e:
- raise RuntimeError("Plugin not yet initialized. Did you forget to call on_startup?") from e
-
- async def wait_published(self, data: LitestarEncodableType, channels: str | Iterable[str]) -> None:
- """Publish ``data`` to ``channels``"""
- if isinstance(channels, str):
- channels = [channels]
- data = self.encode_data(data)
-
- await self._backend.publish(data, channels)
-
- async def subscribe(self, channels: str | Iterable[str], history: int | None = None) -> Subscriber:
- """Create a :class:`Subscriber`, providing a stream of all events in ``channels``.
-
- The created subscriber will be passive by default and has to be consumed manually,
- either by using :meth:`Subscriber.run_in_background` or iterating over events
- using :meth:`Subscriber.iter_events`.
-
- Args:
- channels: Channel(s) to subscribe to
- history: If a non-negative integer, add this amount of history entries from
- each channel to the subscriber's event stream. Note that this will wait
- until all history entries are fetched and pushed to the subscriber's
- stream. For more control use :meth:`put_subscriber_history`.
-
- Returns:
- A :class:`Subscriber`
-
- Raises:
- ChannelsException: If a channel in ``channels`` has not been declared on this backend and
- ``arbitrary_channels_allowed`` has not been set to ``True``
- """
- if isinstance(channels, str):
- channels = [channels]
-
- subscriber = self._subscriber_class(
- plugin=self,
- max_backlog=self._max_backlog,
- backlog_strategy=self._backlog_strategy,
- )
- channels_to_subscribe = set()
-
- for channel in channels:
- if channel not in self._channels:
- if not self._arbitrary_channels_allowed:
- raise ChannelsException(
- f"Unknown channel: {channel!r}. Either explicitly defined the channel or set "
- "arbitrary_channels_allowed=True"
- )
- self._channels[channel] = set()
- channel_subscribers = self._channels[channel]
- if not channel_subscribers:
- channels_to_subscribe.add(channel)
-
- channel_subscribers.add(subscriber)
-
- if channels_to_subscribe:
- await self._backend.subscribe(channels_to_subscribe)
-
- if history:
- await self.put_subscriber_history(subscriber=subscriber, limit=history, channels=channels)
-
- return subscriber
-
- async def unsubscribe(self, subscriber: Subscriber, channels: str | Iterable[str] | None = None) -> None:
- """Unsubscribe a :class:`Subscriber` from ``channels``. If the subscriber has a running sending task, it will
- be stopped.
-
- Args:
- channels: Channels to unsubscribe from. If ``None``, unsubscribe from all channels
- subscriber: :class:`Subscriber` to unsubscribe
- """
- if channels is None:
- channels = list(self._channels.keys())
- elif isinstance(channels, str):
- channels = [channels]
-
- channels_to_unsubscribe: set[str] = set()
-
- for channel in channels:
- channel_subscribers = self._channels[channel]
-
- try:
- channel_subscribers.remove(subscriber)
- except KeyError: # subscriber was not subscribed to this channel. This may happen if channels is None
- continue
-
- if not channel_subscribers:
- channels_to_unsubscribe.add(channel)
-
- if all(subscriber not in queues for queues in self._channels.values()):
- await subscriber.put(None) # this will stop any running task or generator by breaking the inner loop
- if subscriber.is_running:
- await subscriber.stop()
-
- if channels_to_unsubscribe:
- await self._backend.unsubscribe(channels_to_unsubscribe)
-
- @asynccontextmanager
- async def start_subscription(
- self, channels: str | Iterable[str], history: int | None = None
- ) -> AsyncGenerator[Subscriber, None]:
- """Create a :class:`Subscriber` and tie its subscriptions to a context manager;
- Upon exiting the context, :meth:`unsubscribe` will be called.
-
- Args:
- channels: Channel(s) to subscribe to
- history: If a non-negative integer, add this amount of history entries from
- each channel to the subscriber's event stream. Note that this will wait
- until all history entries are fetched and pushed to the subscriber's
- stream. For more control use :meth:`put_subscriber_history`.
-
- Returns:
- A :class:`Subscriber`
- """
- subscriber = await self.subscribe(channels, history=history)
-
- try:
- yield subscriber
- finally:
- await self.unsubscribe(subscriber, channels)
-
- async def put_subscriber_history(
- self, subscriber: Subscriber, channels: str | Iterable[str], limit: int | None = None
- ) -> None:
- """Fetch the history of ``channels`` from the backend and put them in the
- subscriber's stream
- """
- if isinstance(channels, str):
- channels = [channels]
-
- for channel in channels:
- history = await self._backend.get_history(channel, limit)
- for entry in history:
- await subscriber.put(entry)
-
- async def _ws_handler_func(self, channel_name: str, socket: WebSocket) -> None:
- await socket.accept()
-
- # the ternary operator triggers a mypy bug: https://github.com/python/mypy/issues/10740
- on_event: EventCallback = socket.send_text if self._socket_send_mode == "text" else socket.send_bytes # type: ignore[assignment]
-
- async with self.start_subscription(channel_name) as subscriber:
- if self._handler_should_send_history:
- await self.put_subscriber_history(subscriber, channels=channel_name, limit=self._history_limit)
-
- # use the background task, so we can block on receive(), breaking the loop when a connection closes
- async with subscriber.run_in_background(on_event):
- while (await socket.receive())["type"] != "websocket.disconnect":
- continue
-
- def _create_ws_handler_func(self, channel_name: str) -> Callable[[WebSocket], Awaitable[None]]:
- async def ws_handler_func(socket: WebSocket) -> None:
- await self._ws_handler_func(channel_name=channel_name, socket=socket)
-
- return ws_handler_func
-
- async def _pub_worker(self) -> None:
- while self._pub_queue:
- data, channels = await self._pub_queue.get()
- await self._backend.publish(data, channels)
- self._pub_queue.task_done()
-
- async def _sub_worker(self) -> None:
- async for channel, payload in self._backend.stream_events():
- for subscriber in self._channels.get(channel, []):
- subscriber.put_nowait(payload)
-
- async def _on_startup(self) -> None:
- await self._backend.on_startup()
- self._pub_queue = Queue()
- self._pub_task = create_task(self._pub_worker())
- self._sub_task = create_task(self._sub_worker())
- if self._channels:
- await self._backend.subscribe(list(self._channels))
-
- async def _on_shutdown(self) -> None:
- if self._pub_queue:
- await self._pub_queue.join()
- self._pub_queue = None
-
- await asyncio.gather(
- *[
- subscriber.stop(join=False)
- for subscribers in self._channels.values()
- for subscriber in subscribers
- if subscriber.is_running
- ]
- )
-
- if self._sub_task:
- self._sub_task.cancel()
- with suppress(CancelledError):
- await self._sub_task
- self._sub_task = None
-
- if self._pub_task:
- self._pub_task.cancel()
- with suppress(CancelledError):
- await self._pub_task
-            self._pub_task = None
-
- await self._backend.on_shutdown()
-
- async def __aenter__(self) -> ChannelsPlugin:
- await self._on_startup()
- return self
-
- async def __aexit__(
- self,
- exc_type: type[BaseException] | None,
- exc_val: BaseException | None,
- exc_tb: TracebackType | None,
- ) -> None:
- await self._on_shutdown()
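A minimal wiring sketch for the plugin above; the channel name, route path, and payload are illustrative:

from litestar import Litestar, WebSocket, websocket
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.memory import MemoryChannelsBackend

channels_plugin = ChannelsPlugin(
    backend=MemoryChannelsBackend(history=10),
    channels=["notifications"],
    create_ws_route_handlers=True,  # generates a handler at /notifications
    ws_handler_send_history=10,     # replay up to 10 entries on connect
)


@websocket("/custom")
async def custom_handler(socket: WebSocket, channels: ChannelsPlugin) -> None:
    # "channels" is provided by the dependency registered in on_app_init
    await socket.accept()
    channels.publish({"connected": True}, "notifications")
    async with channels.start_subscription("notifications") as subscriber:
        async for event in subscriber.iter_events():
            await socket.send_text(event.decode())


app = Litestar(route_handlers=[custom_handler], plugins=[channels_plugin])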
diff --git a/venv/lib/python3.11/site-packages/litestar/channels/subscriber.py b/venv/lib/python3.11/site-packages/litestar/channels/subscriber.py
deleted file mode 100644
index b358bc4..0000000
--- a/venv/lib/python3.11/site-packages/litestar/channels/subscriber.py
+++ /dev/null
@@ -1,135 +0,0 @@
-from __future__ import annotations
-
-import asyncio
-from asyncio import CancelledError, Queue, QueueFull
-from collections import deque
-from contextlib import AsyncExitStack, asynccontextmanager, suppress
-from typing import TYPE_CHECKING, Any, AsyncGenerator, Awaitable, Callable, Generic, Literal, TypeVar
-
-if TYPE_CHECKING:
- from litestar.channels import ChannelsPlugin
-
-
-T = TypeVar("T")
-
-BacklogStrategy = Literal["backoff", "dropleft"]
-
-EventCallback = Callable[[bytes], Awaitable[Any]]
-
-
-class AsyncDeque(Queue, Generic[T]):
- def __init__(self, maxsize: int | None) -> None:
- self._deque_maxlen = maxsize
- super().__init__()
-
- def _init(self, maxsize: int) -> None:
- self._queue: deque[T] = deque(maxlen=self._deque_maxlen)
-
-
-class Subscriber:
- """A wrapper around a stream of events published to subscribed channels"""
-
- def __init__(
- self,
- plugin: ChannelsPlugin,
- max_backlog: int | None = None,
- backlog_strategy: BacklogStrategy = "backoff",
- ) -> None:
- self._task: asyncio.Task | None = None
- self._plugin = plugin
- self._backend = plugin._backend
- self._queue: Queue[bytes | None] | AsyncDeque[bytes | None]
-
- if max_backlog and backlog_strategy == "dropleft":
- self._queue = AsyncDeque(maxsize=max_backlog or 0)
- else:
- self._queue = Queue(maxsize=max_backlog or 0)
-
- async def put(self, item: bytes | None) -> None:
- await self._queue.put(item)
-
- def put_nowait(self, item: bytes | None) -> bool:
- """Put an item in the subscriber's stream without waiting"""
- try:
- self._queue.put_nowait(item)
- return True
- except QueueFull:
- return False
-
- @property
- def qsize(self) -> int:
- return self._queue.qsize()
-
- async def iter_events(self) -> AsyncGenerator[bytes, None]:
- """Iterate over the stream of events. If no items are available, block until
- one becomes available
- """
- while True:
- item = await self._queue.get()
- if item is None:
- self._queue.task_done()
- break
- yield item
- self._queue.task_done()
-
- @asynccontextmanager
- async def run_in_background(self, on_event: EventCallback, join: bool = True) -> AsyncGenerator[None, None]:
- """Start a task in the background that sends events from the subscriber's stream
- to ``socket`` as they become available. On exit, it will prevent the stream from
- accepting new events and wait until the currently enqueued ones are processed.
- Should the context be left with an exception, the task will be cancelled
- immediately.
-
- Args:
- on_event: Callback to invoke with the event data for every event
- join: If ``True``, wait for all items in the stream to be processed before
- stopping the worker. Note that an error occurring within the context
- will always lead to the immediate cancellation of the worker
- """
- self._start_in_background(on_event=on_event)
- async with AsyncExitStack() as exit_stack:
- exit_stack.push_async_callback(self.stop, join=False)
- yield
- exit_stack.pop_all()
- await self.stop(join=join)
-
- async def _worker(self, on_event: EventCallback) -> None:
- async for event in self.iter_events():
- await on_event(event)
-
- def _start_in_background(self, on_event: EventCallback) -> None:
- """Start a task in the background that sends events from the subscriber's stream
- to ``socket`` as they become available.
-
- Args:
- on_event: Callback to invoke with the event data for every event
- """
- if self._task is not None:
- raise RuntimeError("Subscriber is already running")
- self._task = asyncio.create_task(self._worker(on_event))
-
- @property
- def is_running(self) -> bool:
- """Return whether a sending task is currently running"""
- return self._task is not None
-
- async def stop(self, join: bool = False) -> None:
- """Stop a task was previously started with :meth:`run_in_background`. If the
- task is not yet done it will be cancelled and awaited
-
- Args:
- join: If ``True`` wait for all items to be processed before stopping the task
- """
- if not self._task:
- return
-
- if join:
- await self._queue.join()
-
- if not self._task.done():
- self._task.cancel()
-
- with suppress(CancelledError):
- await self._task
-
- self._task = None
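Finally, a self-contained sketch of the ``dropleft`` backlog strategy: with a backlog of two, the AsyncDeque above silently discards the oldest pending events. The channel name is illustrative:

import asyncio

from litestar.channels import ChannelsPlugin
from litestar.channels.backends.memory import MemoryChannelsBackend


async def main() -> None:
    plugin = ChannelsPlugin(
        backend=MemoryChannelsBackend(),
        channels=["metrics"],
        subscriber_max_backlog=2,
        subscriber_backlog_strategy="dropleft",  # keep newest, drop oldest
    )
    async with plugin:  # drives _on_startup/_on_shutdown
        subscriber = await plugin.subscribe("metrics")
        for i in range(4):
            subscriber.put_nowait(f"m{i}".encode())
        print(subscriber.qsize)  # 2 -> only m2 and m3 remain
        await plugin.unsubscribe(subscriber)


asyncio.run(main())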