summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/litestar/events/emitter.py
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
commit6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
treeb1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/litestar/events/emitter.py
parent4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/events/emitter.py')
-rw-r--r--venv/lib/python3.11/site-packages/litestar/events/emitter.py134
1 files changed, 134 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/events/emitter.py b/venv/lib/python3.11/site-packages/litestar/events/emitter.py
new file mode 100644
index 0000000..7c33c9e
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/events/emitter.py
@@ -0,0 +1,134 @@
+from __future__ import annotations
+
+import math
+import sys
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from contextlib import AsyncExitStack
+from functools import partial
+from typing import TYPE_CHECKING, Any, Sequence
+
+if sys.version_info < (3, 9):
+ from typing import AsyncContextManager
+else:
+ from contextlib import AbstractAsyncContextManager as AsyncContextManager
+
+import anyio
+
+from litestar.exceptions import ImproperlyConfiguredException
+
+if TYPE_CHECKING:
+ from types import TracebackType
+
+ from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
+
+ from litestar.events.listener import EventListener
+
+__all__ = ("BaseEventEmitterBackend", "SimpleEventEmitter")
+
+
+class BaseEventEmitterBackend(AsyncContextManager["BaseEventEmitterBackend"], ABC):
+ """Abstract class used to define event emitter backends."""
+
+ __slots__ = ("listeners",)
+
+ listeners: defaultdict[str, set[EventListener]]
+
+ def __init__(self, listeners: Sequence[EventListener]) -> None:
+ """Create an event emitter instance.
+
+ Args:
+ listeners: A list of listeners.
+ """
+ self.listeners = defaultdict(set)
+ for listener in listeners:
+ for event_id in listener.event_ids:
+ self.listeners[event_id].add(listener)
+
+ @abstractmethod
+ def emit(self, event_id: str, *args: Any, **kwargs: Any) -> None:
+ """Emit an event to all attached listeners.
+
+ Args:
+ event_id: The ID of the event to emit, e.g 'my_event'.
+ *args: args to pass to the listener(s).
+ **kwargs: kwargs to pass to the listener(s)
+
+ Returns:
+ None
+ """
+ raise NotImplementedError("not implemented")
+
+
+class SimpleEventEmitter(BaseEventEmitterBackend):
+ """Event emitter the works only in the current process"""
+
+ __slots__ = ("_queue", "_exit_stack", "_receive_stream", "_send_stream")
+
+ def __init__(self, listeners: Sequence[EventListener]) -> None:
+ """Create an event emitter instance.
+
+ Args:
+ listeners: A list of listeners.
+ """
+ super().__init__(listeners=listeners)
+ self._receive_stream: MemoryObjectReceiveStream | None = None
+ self._send_stream: MemoryObjectSendStream | None = None
+ self._exit_stack: AsyncExitStack | None = None
+
+ async def _worker(self, receive_stream: MemoryObjectReceiveStream) -> None:
+ """Run items from ``receive_stream`` in a task group.
+
+ Returns:
+ None
+ """
+ async with receive_stream, anyio.create_task_group() as task_group:
+ async for item in receive_stream:
+ fn, args, kwargs = item
+ if kwargs:
+ fn = partial(fn, **kwargs)
+ task_group.start_soon(fn, *args) # pyright: ignore[reportGeneralTypeIssues]
+
+ async def __aenter__(self) -> SimpleEventEmitter:
+ self._exit_stack = AsyncExitStack()
+ send_stream, receive_stream = anyio.create_memory_object_stream(math.inf) # type: ignore[var-annotated]
+ self._send_stream = send_stream
+ task_group = anyio.create_task_group()
+
+ await self._exit_stack.enter_async_context(task_group)
+ await self._exit_stack.enter_async_context(send_stream)
+ task_group.start_soon(self._worker, receive_stream)
+
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> None:
+ if self._exit_stack:
+ await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
+
+ self._exit_stack = None
+ self._send_stream = None
+
+ def emit(self, event_id: str, *args: Any, **kwargs: Any) -> None:
+ """Emit an event to all attached listeners.
+
+ Args:
+ event_id: The ID of the event to emit, e.g 'my_event'.
+ *args: args to pass to the listener(s).
+ **kwargs: kwargs to pass to the listener(s)
+
+ Returns:
+ None
+ """
+ if not (self._send_stream and self._exit_stack):
+ raise RuntimeError("Emitter not initialized")
+
+ if listeners := self.listeners.get(event_id):
+ for listener in listeners:
+ self._send_stream.send_nowait((listener.fn, args, kwargs))
+ return
+ raise ImproperlyConfiguredException(f"no event listeners are registered for event ID: {event_id}")