summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/litestar/events
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/litestar/events')
-rw-r--r--venv/lib/python3.11/site-packages/litestar/events/__init__.py4
-rw-r--r--venv/lib/python3.11/site-packages/litestar/events/__pycache__/__init__.cpython-311.pycbin0 -> 418 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/events/__pycache__/emitter.cpython-311.pycbin0 -> 7064 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/events/__pycache__/listener.cpython-311.pycbin0 -> 3902 bytes
-rw-r--r--venv/lib/python3.11/site-packages/litestar/events/emitter.py134
-rw-r--r--venv/lib/python3.11/site-packages/litestar/events/listener.py76
6 files changed, 214 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/litestar/events/__init__.py b/venv/lib/python3.11/site-packages/litestar/events/__init__.py
new file mode 100644
index 0000000..a291141
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/events/__init__.py
@@ -0,0 +1,4 @@
+from .emitter import BaseEventEmitterBackend, SimpleEventEmitter
+from .listener import EventListener, listener
+
+__all__ = ("EventListener", "SimpleEventEmitter", "BaseEventEmitterBackend", "listener")
diff --git a/venv/lib/python3.11/site-packages/litestar/events/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/events/__pycache__/__init__.cpython-311.pyc
new file mode 100644
index 0000000..bc8ee50
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/events/__pycache__/__init__.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/events/__pycache__/emitter.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/events/__pycache__/emitter.cpython-311.pyc
new file mode 100644
index 0000000..f59ec24
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/events/__pycache__/emitter.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/events/__pycache__/listener.cpython-311.pyc b/venv/lib/python3.11/site-packages/litestar/events/__pycache__/listener.cpython-311.pyc
new file mode 100644
index 0000000..666bcbc
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/events/__pycache__/listener.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/litestar/events/emitter.py b/venv/lib/python3.11/site-packages/litestar/events/emitter.py
new file mode 100644
index 0000000..7c33c9e
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/events/emitter.py
@@ -0,0 +1,134 @@
+from __future__ import annotations
+
+import math
+import sys
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from contextlib import AsyncExitStack
+from functools import partial
+from typing import TYPE_CHECKING, Any, Sequence
+
+if sys.version_info < (3, 9):
+ from typing import AsyncContextManager
+else:
+ from contextlib import AbstractAsyncContextManager as AsyncContextManager
+
+import anyio
+
+from litestar.exceptions import ImproperlyConfiguredException
+
+if TYPE_CHECKING:
+ from types import TracebackType
+
+ from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
+
+ from litestar.events.listener import EventListener
+
+__all__ = ("BaseEventEmitterBackend", "SimpleEventEmitter")
+
+
+class BaseEventEmitterBackend(AsyncContextManager["BaseEventEmitterBackend"], ABC):
+ """Abstract class used to define event emitter backends."""
+
+ __slots__ = ("listeners",)
+
+ listeners: defaultdict[str, set[EventListener]]
+
+ def __init__(self, listeners: Sequence[EventListener]) -> None:
+ """Create an event emitter instance.
+
+ Args:
+ listeners: A list of listeners.
+ """
+ self.listeners = defaultdict(set)
+ for listener in listeners:
+ for event_id in listener.event_ids:
+ self.listeners[event_id].add(listener)
+
+ @abstractmethod
+ def emit(self, event_id: str, *args: Any, **kwargs: Any) -> None:
+ """Emit an event to all attached listeners.
+
+ Args:
+ event_id: The ID of the event to emit, e.g 'my_event'.
+ *args: args to pass to the listener(s).
+ **kwargs: kwargs to pass to the listener(s)
+
+ Returns:
+ None
+ """
+ raise NotImplementedError("not implemented")
+
+
+class SimpleEventEmitter(BaseEventEmitterBackend):
+ """Event emitter the works only in the current process"""
+
+ __slots__ = ("_queue", "_exit_stack", "_receive_stream", "_send_stream")
+
+ def __init__(self, listeners: Sequence[EventListener]) -> None:
+ """Create an event emitter instance.
+
+ Args:
+ listeners: A list of listeners.
+ """
+ super().__init__(listeners=listeners)
+ self._receive_stream: MemoryObjectReceiveStream | None = None
+ self._send_stream: MemoryObjectSendStream | None = None
+ self._exit_stack: AsyncExitStack | None = None
+
+ async def _worker(self, receive_stream: MemoryObjectReceiveStream) -> None:
+ """Run items from ``receive_stream`` in a task group.
+
+ Returns:
+ None
+ """
+ async with receive_stream, anyio.create_task_group() as task_group:
+ async for item in receive_stream:
+ fn, args, kwargs = item
+ if kwargs:
+ fn = partial(fn, **kwargs)
+ task_group.start_soon(fn, *args) # pyright: ignore[reportGeneralTypeIssues]
+
+ async def __aenter__(self) -> SimpleEventEmitter:
+ self._exit_stack = AsyncExitStack()
+ send_stream, receive_stream = anyio.create_memory_object_stream(math.inf) # type: ignore[var-annotated]
+ self._send_stream = send_stream
+ task_group = anyio.create_task_group()
+
+ await self._exit_stack.enter_async_context(task_group)
+ await self._exit_stack.enter_async_context(send_stream)
+ task_group.start_soon(self._worker, receive_stream)
+
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> None:
+ if self._exit_stack:
+ await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb)
+
+ self._exit_stack = None
+ self._send_stream = None
+
+ def emit(self, event_id: str, *args: Any, **kwargs: Any) -> None:
+ """Emit an event to all attached listeners.
+
+ Args:
+ event_id: The ID of the event to emit, e.g 'my_event'.
+ *args: args to pass to the listener(s).
+ **kwargs: kwargs to pass to the listener(s)
+
+ Returns:
+ None
+ """
+ if not (self._send_stream and self._exit_stack):
+ raise RuntimeError("Emitter not initialized")
+
+ if listeners := self.listeners.get(event_id):
+ for listener in listeners:
+ self._send_stream.send_nowait((listener.fn, args, kwargs))
+ return
+ raise ImproperlyConfiguredException(f"no event listeners are registered for event ID: {event_id}")
diff --git a/venv/lib/python3.11/site-packages/litestar/events/listener.py b/venv/lib/python3.11/site-packages/litestar/events/listener.py
new file mode 100644
index 0000000..63c9848
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/litestar/events/listener.py
@@ -0,0 +1,76 @@
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING, Any
+
+from litestar.exceptions import ImproperlyConfiguredException
+from litestar.utils import ensure_async_callable
+
+if TYPE_CHECKING:
+ from litestar.types import AnyCallable, AsyncAnyCallable
+
+__all__ = ("EventListener", "listener")
+
+logger = logging.getLogger(__name__)
+
+
+class EventListener:
+ """Decorator for event listeners"""
+
+ __slots__ = ("event_ids", "fn", "listener_id")
+
+ fn: AsyncAnyCallable
+
+ def __init__(self, *event_ids: str) -> None:
+ """Create a decorator for event handlers.
+
+ Args:
+ *event_ids: The id of the event to listen to or a list of
+ event ids to listen to.
+ """
+ self.event_ids: frozenset[str] = frozenset(event_ids)
+
+ def __call__(self, fn: AnyCallable) -> EventListener:
+ """Decorate a callable by wrapping it inside an instance of EventListener.
+
+ Args:
+ fn: Callable to decorate.
+
+ Returns:
+ An instance of EventListener
+ """
+ if not callable(fn):
+ raise ImproperlyConfiguredException("EventListener instance should be called as a decorator on a callable")
+
+ self.fn = self.wrap_in_error_handler(ensure_async_callable(fn))
+
+ return self
+
+ @staticmethod
+ def wrap_in_error_handler(fn: AsyncAnyCallable) -> AsyncAnyCallable:
+ """Wrap a listener function to handle errors.
+
+ Listeners are executed concurrently in a TaskGroup, so we need to ensure that exceptions do not propagate
+ to the task group which results in any other unfinished listeners to be cancelled, and the receive stream to
+ be closed.
+
+ See https://github.com/litestar-org/litestar/issues/2809
+
+ Args:
+ fn: The listener function to wrap.
+ """
+
+ async def wrapped(*args: Any, **kwargs: Any) -> None:
+ """Wrap a listener function to handle errors."""
+ try:
+ await fn(*args, **kwargs)
+ except Exception as exc:
+ logger.exception("Error while executing listener %s: %s", fn.__name__, exc)
+
+ return wrapped
+
+ def __hash__(self) -> int:
+ return hash(self.event_ids) + hash(self.fn)
+
+
+listener = EventListener