summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/anyio/_backends
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/anyio/_backends')
-rw-r--r--venv/lib/python3.11/site-packages/anyio/_backends/__init__.py0
-rw-r--r--venv/lib/python3.11/site-packages/anyio/_backends/__pycache__/__init__.cpython-311.pycbin0 -> 199 bytes
-rw-r--r--venv/lib/python3.11/site-packages/anyio/_backends/__pycache__/_asyncio.cpython-311.pycbin0 -> 132800 bytes
-rw-r--r--venv/lib/python3.11/site-packages/anyio/_backends/__pycache__/_trio.cpython-311.pycbin0 -> 68671 bytes
-rw-r--r--venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py2478
-rw-r--r--venv/lib/python3.11/site-packages/anyio/_backends/_trio.py1169
6 files changed, 3647 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/anyio/_backends/__init__.py b/venv/lib/python3.11/site-packages/anyio/_backends/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/anyio/_backends/__init__.py
diff --git a/venv/lib/python3.11/site-packages/anyio/_backends/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/anyio/_backends/__pycache__/__init__.cpython-311.pyc
new file mode 100644
index 0000000..b016e2e
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/anyio/_backends/__pycache__/__init__.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/anyio/_backends/__pycache__/_asyncio.cpython-311.pyc b/venv/lib/python3.11/site-packages/anyio/_backends/__pycache__/_asyncio.cpython-311.pyc
new file mode 100644
index 0000000..fd8f88a
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/anyio/_backends/__pycache__/_asyncio.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/anyio/_backends/__pycache__/_trio.cpython-311.pyc b/venv/lib/python3.11/site-packages/anyio/_backends/__pycache__/_trio.cpython-311.pyc
new file mode 100644
index 0000000..aa2c0cd
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/anyio/_backends/__pycache__/_trio.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py b/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py
new file mode 100644
index 0000000..2699bf8
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py
@@ -0,0 +1,2478 @@
+from __future__ import annotations
+
+import array
+import asyncio
+import concurrent.futures
+import math
+import socket
+import sys
+import threading
+from asyncio import (
+ AbstractEventLoop,
+ CancelledError,
+ all_tasks,
+ create_task,
+ current_task,
+ get_running_loop,
+ sleep,
+)
+from asyncio.base_events import _run_until_complete_cb # type: ignore[attr-defined]
+from collections import OrderedDict, deque
+from collections.abc import AsyncIterator, Generator, Iterable
+from concurrent.futures import Future
+from contextlib import suppress
+from contextvars import Context, copy_context
+from dataclasses import dataclass
+from functools import partial, wraps
+from inspect import (
+ CORO_RUNNING,
+ CORO_SUSPENDED,
+ getcoroutinestate,
+ iscoroutine,
+)
+from io import IOBase
+from os import PathLike
+from queue import Queue
+from signal import Signals
+from socket import AddressFamily, SocketKind
+from threading import Thread
+from types import TracebackType
+from typing import (
+ IO,
+ Any,
+ AsyncGenerator,
+ Awaitable,
+ Callable,
+ Collection,
+ ContextManager,
+ Coroutine,
+ Mapping,
+ Optional,
+ Sequence,
+ Tuple,
+ TypeVar,
+ cast,
+)
+from weakref import WeakKeyDictionary
+
+import sniffio
+
+from .. import CapacityLimiterStatistics, EventStatistics, TaskInfo, abc
+from .._core._eventloop import claim_worker_thread, threadlocals
+from .._core._exceptions import (
+ BrokenResourceError,
+ BusyResourceError,
+ ClosedResourceError,
+ EndOfStream,
+ WouldBlock,
+)
+from .._core._sockets import convert_ipv6_sockaddr
+from .._core._streams import create_memory_object_stream
+from .._core._synchronization import CapacityLimiter as BaseCapacityLimiter
+from .._core._synchronization import Event as BaseEvent
+from .._core._synchronization import ResourceGuard
+from .._core._tasks import CancelScope as BaseCancelScope
+from ..abc import (
+ AsyncBackend,
+ IPSockAddrType,
+ SocketListener,
+ UDPPacketType,
+ UNIXDatagramPacketType,
+)
+from ..lowlevel import RunVar
+from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
+
+if sys.version_info >= (3, 10):
+ from typing import ParamSpec
+else:
+ from typing_extensions import ParamSpec
+
+if sys.version_info >= (3, 11):
+ from asyncio import Runner
+ from typing import TypeVarTuple, Unpack
+else:
+ import contextvars
+ import enum
+ import signal
+ from asyncio import coroutines, events, exceptions, tasks
+
+ from exceptiongroup import BaseExceptionGroup
+ from typing_extensions import TypeVarTuple, Unpack
+
+ class _State(enum.Enum):
+ CREATED = "created"
+ INITIALIZED = "initialized"
+ CLOSED = "closed"
+
+ class Runner:
+ # Copied from CPython 3.11
+ def __init__(
+ self,
+ *,
+ debug: bool | None = None,
+ loop_factory: Callable[[], AbstractEventLoop] | None = None,
+ ):
+ self._state = _State.CREATED
+ self._debug = debug
+ self._loop_factory = loop_factory
+ self._loop: AbstractEventLoop | None = None
+ self._context = None
+ self._interrupt_count = 0
+ self._set_event_loop = False
+
+ def __enter__(self) -> Runner:
+ self._lazy_init()
+ return self
+
+ def __exit__(
+ self,
+ exc_type: type[BaseException],
+ exc_val: BaseException,
+ exc_tb: TracebackType,
+ ) -> None:
+ self.close()
+
+ def close(self) -> None:
+ """Shutdown and close event loop."""
+ if self._state is not _State.INITIALIZED:
+ return
+ try:
+ loop = self._loop
+ _cancel_all_tasks(loop)
+ loop.run_until_complete(loop.shutdown_asyncgens())
+ if hasattr(loop, "shutdown_default_executor"):
+ loop.run_until_complete(loop.shutdown_default_executor())
+ else:
+ loop.run_until_complete(_shutdown_default_executor(loop))
+ finally:
+ if self._set_event_loop:
+ events.set_event_loop(None)
+ loop.close()
+ self._loop = None
+ self._state = _State.CLOSED
+
+ def get_loop(self) -> AbstractEventLoop:
+ """Return embedded event loop."""
+ self._lazy_init()
+ return self._loop
+
+ def run(self, coro: Coroutine[T_Retval], *, context=None) -> T_Retval:
+ """Run a coroutine inside the embedded event loop."""
+ if not coroutines.iscoroutine(coro):
+ raise ValueError(f"a coroutine was expected, got {coro!r}")
+
+ if events._get_running_loop() is not None:
+ # fail fast with short traceback
+ raise RuntimeError(
+ "Runner.run() cannot be called from a running event loop"
+ )
+
+ self._lazy_init()
+
+ if context is None:
+ context = self._context
+ task = context.run(self._loop.create_task, coro)
+
+ if (
+ threading.current_thread() is threading.main_thread()
+ and signal.getsignal(signal.SIGINT) is signal.default_int_handler
+ ):
+ sigint_handler = partial(self._on_sigint, main_task=task)
+ try:
+ signal.signal(signal.SIGINT, sigint_handler)
+ except ValueError:
+ # `signal.signal` may throw if `threading.main_thread` does
+ # not support signals (e.g. embedded interpreter with signals
+ # not registered - see gh-91880)
+ sigint_handler = None
+ else:
+ sigint_handler = None
+
+ self._interrupt_count = 0
+ try:
+ return self._loop.run_until_complete(task)
+ except exceptions.CancelledError:
+ if self._interrupt_count > 0:
+ uncancel = getattr(task, "uncancel", None)
+ if uncancel is not None and uncancel() == 0:
+ raise KeyboardInterrupt()
+ raise # CancelledError
+ finally:
+ if (
+ sigint_handler is not None
+ and signal.getsignal(signal.SIGINT) is sigint_handler
+ ):
+ signal.signal(signal.SIGINT, signal.default_int_handler)
+
+ def _lazy_init(self) -> None:
+ if self._state is _State.CLOSED:
+ raise RuntimeError("Runner is closed")
+ if self._state is _State.INITIALIZED:
+ return
+ if self._loop_factory is None:
+ self._loop = events.new_event_loop()
+ if not self._set_event_loop:
+ # Call set_event_loop only once to avoid calling
+ # attach_loop multiple times on child watchers
+ events.set_event_loop(self._loop)
+ self._set_event_loop = True
+ else:
+ self._loop = self._loop_factory()
+ if self._debug is not None:
+ self._loop.set_debug(self._debug)
+ self._context = contextvars.copy_context()
+ self._state = _State.INITIALIZED
+
+ def _on_sigint(self, signum, frame, main_task: asyncio.Task) -> None:
+ self._interrupt_count += 1
+ if self._interrupt_count == 1 and not main_task.done():
+ main_task.cancel()
+ # wakeup loop if it is blocked by select() with long timeout
+ self._loop.call_soon_threadsafe(lambda: None)
+ return
+ raise KeyboardInterrupt()
+
+ def _cancel_all_tasks(loop: AbstractEventLoop) -> None:
+ to_cancel = tasks.all_tasks(loop)
+ if not to_cancel:
+ return
+
+ for task in to_cancel:
+ task.cancel()
+
+ loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
+
+ for task in to_cancel:
+ if task.cancelled():
+ continue
+ if task.exception() is not None:
+ loop.call_exception_handler(
+ {
+ "message": "unhandled exception during asyncio.run() shutdown",
+ "exception": task.exception(),
+ "task": task,
+ }
+ )
+
+ async def _shutdown_default_executor(loop: AbstractEventLoop) -> None:
+ """Schedule the shutdown of the default executor."""
+
+ def _do_shutdown(future: asyncio.futures.Future) -> None:
+ try:
+ loop._default_executor.shutdown(wait=True) # type: ignore[attr-defined]
+ loop.call_soon_threadsafe(future.set_result, None)
+ except Exception as ex:
+ loop.call_soon_threadsafe(future.set_exception, ex)
+
+ loop._executor_shutdown_called = True
+ if loop._default_executor is None:
+ return
+ future = loop.create_future()
+ thread = threading.Thread(target=_do_shutdown, args=(future,))
+ thread.start()
+ try:
+ await future
+ finally:
+ thread.join()
+
+
+T_Retval = TypeVar("T_Retval")
+T_contra = TypeVar("T_contra", contravariant=True)
+PosArgsT = TypeVarTuple("PosArgsT")
+P = ParamSpec("P")
+
+_root_task: RunVar[asyncio.Task | None] = RunVar("_root_task")
+
+
+def find_root_task() -> asyncio.Task:
+ root_task = _root_task.get(None)
+ if root_task is not None and not root_task.done():
+ return root_task
+
+ # Look for a task that has been started via run_until_complete()
+ for task in all_tasks():
+ if task._callbacks and not task.done():
+ callbacks = [cb for cb, context in task._callbacks]
+ for cb in callbacks:
+ if (
+ cb is _run_until_complete_cb
+ or getattr(cb, "__module__", None) == "uvloop.loop"
+ ):
+ _root_task.set(task)
+ return task
+
+ # Look up the topmost task in the AnyIO task tree, if possible
+ task = cast(asyncio.Task, current_task())
+ state = _task_states.get(task)
+ if state:
+ cancel_scope = state.cancel_scope
+ while cancel_scope and cancel_scope._parent_scope is not None:
+ cancel_scope = cancel_scope._parent_scope
+
+ if cancel_scope is not None:
+ return cast(asyncio.Task, cancel_scope._host_task)
+
+ return task
+
+
+def get_callable_name(func: Callable) -> str:
+ module = getattr(func, "__module__", None)
+ qualname = getattr(func, "__qualname__", None)
+ return ".".join([x for x in (module, qualname) if x])
+
+
+#
+# Event loop
+#
+
+_run_vars: WeakKeyDictionary[asyncio.AbstractEventLoop, Any] = WeakKeyDictionary()
+
+
+def _task_started(task: asyncio.Task) -> bool:
+ """Return ``True`` if the task has been started and has not finished."""
+ try:
+ return getcoroutinestate(task.get_coro()) in (CORO_RUNNING, CORO_SUSPENDED)
+ except AttributeError:
+ # task coro is async_genenerator_asend https://bugs.python.org/issue37771
+ raise Exception(f"Cannot determine if task {task} has started or not") from None
+
+
+#
+# Timeouts and cancellation
+#
+
+
+class CancelScope(BaseCancelScope):
+ def __new__(
+ cls, *, deadline: float = math.inf, shield: bool = False
+ ) -> CancelScope:
+ return object.__new__(cls)
+
+ def __init__(self, deadline: float = math.inf, shield: bool = False):
+ self._deadline = deadline
+ self._shield = shield
+ self._parent_scope: CancelScope | None = None
+ self._child_scopes: set[CancelScope] = set()
+ self._cancel_called = False
+ self._cancelled_caught = False
+ self._active = False
+ self._timeout_handle: asyncio.TimerHandle | None = None
+ self._cancel_handle: asyncio.Handle | None = None
+ self._tasks: set[asyncio.Task] = set()
+ self._host_task: asyncio.Task | None = None
+ self._cancel_calls: int = 0
+ self._cancelling: int | None = None
+
+ def __enter__(self) -> CancelScope:
+ if self._active:
+ raise RuntimeError(
+ "Each CancelScope may only be used for a single 'with' block"
+ )
+
+ self._host_task = host_task = cast(asyncio.Task, current_task())
+ self._tasks.add(host_task)
+ try:
+ task_state = _task_states[host_task]
+ except KeyError:
+ task_state = TaskState(None, self)
+ _task_states[host_task] = task_state
+ else:
+ self._parent_scope = task_state.cancel_scope
+ task_state.cancel_scope = self
+ if self._parent_scope is not None:
+ self._parent_scope._child_scopes.add(self)
+ self._parent_scope._tasks.remove(host_task)
+
+ self._timeout()
+ self._active = True
+ if sys.version_info >= (3, 11):
+ self._cancelling = self._host_task.cancelling()
+
+ # Start cancelling the host task if the scope was cancelled before entering
+ if self._cancel_called:
+ self._deliver_cancellation(self)
+
+ return self
+
+ def __exit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> bool | None:
+ if not self._active:
+ raise RuntimeError("This cancel scope is not active")
+ if current_task() is not self._host_task:
+ raise RuntimeError(
+ "Attempted to exit cancel scope in a different task than it was "
+ "entered in"
+ )
+
+ assert self._host_task is not None
+ host_task_state = _task_states.get(self._host_task)
+ if host_task_state is None or host_task_state.cancel_scope is not self:
+ raise RuntimeError(
+ "Attempted to exit a cancel scope that isn't the current tasks's "
+ "current cancel scope"
+ )
+
+ self._active = False
+ if self._timeout_handle:
+ self._timeout_handle.cancel()
+ self._timeout_handle = None
+
+ self._tasks.remove(self._host_task)
+ if self._parent_scope is not None:
+ self._parent_scope._child_scopes.remove(self)
+ self._parent_scope._tasks.add(self._host_task)
+
+ host_task_state.cancel_scope = self._parent_scope
+
+ # Restart the cancellation effort in the closest directly cancelled parent
+ # scope if this one was shielded
+ self._restart_cancellation_in_parent()
+
+ if self._cancel_called and exc_val is not None:
+ for exc in iterate_exceptions(exc_val):
+ if isinstance(exc, CancelledError):
+ self._cancelled_caught = self._uncancel(exc)
+ if self._cancelled_caught:
+ break
+
+ return self._cancelled_caught
+
+ return None
+
+ def _uncancel(self, cancelled_exc: CancelledError) -> bool:
+ if sys.version_info < (3, 9) or self._host_task is None:
+ self._cancel_calls = 0
+ return True
+
+ # Undo all cancellations done by this scope
+ if self._cancelling is not None:
+ while self._cancel_calls:
+ self._cancel_calls -= 1
+ if self._host_task.uncancel() <= self._cancelling:
+ return True
+
+ self._cancel_calls = 0
+ return f"Cancelled by cancel scope {id(self):x}" in cancelled_exc.args
+
+ def _timeout(self) -> None:
+ if self._deadline != math.inf:
+ loop = get_running_loop()
+ if loop.time() >= self._deadline:
+ self.cancel()
+ else:
+ self._timeout_handle = loop.call_at(self._deadline, self._timeout)
+
+ def _deliver_cancellation(self, origin: CancelScope) -> bool:
+ """
+ Deliver cancellation to directly contained tasks and nested cancel scopes.
+
+ Schedule another run at the end if we still have tasks eligible for
+ cancellation.
+
+ :param origin: the cancel scope that originated the cancellation
+ :return: ``True`` if the delivery needs to be retried on the next cycle
+
+ """
+ should_retry = False
+ current = current_task()
+ for task in self._tasks:
+ if task._must_cancel: # type: ignore[attr-defined]
+ continue
+
+ # The task is eligible for cancellation if it has started
+ should_retry = True
+ if task is not current and (task is self._host_task or _task_started(task)):
+ waiter = task._fut_waiter # type: ignore[attr-defined]
+ if not isinstance(waiter, asyncio.Future) or not waiter.done():
+ self._cancel_calls += 1
+ if sys.version_info >= (3, 9):
+ task.cancel(f"Cancelled by cancel scope {id(origin):x}")
+ else:
+ task.cancel()
+
+ # Deliver cancellation to child scopes that aren't shielded or running their own
+ # cancellation callbacks
+ for scope in self._child_scopes:
+ if not scope._shield and not scope.cancel_called:
+ should_retry = scope._deliver_cancellation(origin) or should_retry
+
+ # Schedule another callback if there are still tasks left
+ if origin is self:
+ if should_retry:
+ self._cancel_handle = get_running_loop().call_soon(
+ self._deliver_cancellation, origin
+ )
+ else:
+ self._cancel_handle = None
+
+ return should_retry
+
+ def _restart_cancellation_in_parent(self) -> None:
+ """
+ Restart the cancellation effort in the closest directly cancelled parent scope.
+
+ """
+ scope = self._parent_scope
+ while scope is not None:
+ if scope._cancel_called:
+ if scope._cancel_handle is None:
+ scope._deliver_cancellation(scope)
+
+ break
+
+ # No point in looking beyond any shielded scope
+ if scope._shield:
+ break
+
+ scope = scope._parent_scope
+
+ def _parent_cancelled(self) -> bool:
+ # Check whether any parent has been cancelled
+ cancel_scope = self._parent_scope
+ while cancel_scope is not None and not cancel_scope._shield:
+ if cancel_scope._cancel_called:
+ return True
+ else:
+ cancel_scope = cancel_scope._parent_scope
+
+ return False
+
+ def cancel(self) -> None:
+ if not self._cancel_called:
+ if self._timeout_handle:
+ self._timeout_handle.cancel()
+ self._timeout_handle = None
+
+ self._cancel_called = True
+ if self._host_task is not None:
+ self._deliver_cancellation(self)
+
+ @property
+ def deadline(self) -> float:
+ return self._deadline
+
+ @deadline.setter
+ def deadline(self, value: float) -> None:
+ self._deadline = float(value)
+ if self._timeout_handle is not None:
+ self._timeout_handle.cancel()
+ self._timeout_handle = None
+
+ if self._active and not self._cancel_called:
+ self._timeout()
+
+ @property
+ def cancel_called(self) -> bool:
+ return self._cancel_called
+
+ @property
+ def cancelled_caught(self) -> bool:
+ return self._cancelled_caught
+
+ @property
+ def shield(self) -> bool:
+ return self._shield
+
+ @shield.setter
+ def shield(self, value: bool) -> None:
+ if self._shield != value:
+ self._shield = value
+ if not value:
+ self._restart_cancellation_in_parent()
+
+
+#
+# Task states
+#
+
+
+class TaskState:
+ """
+ Encapsulates auxiliary task information that cannot be added to the Task instance
+ itself because there are no guarantees about its implementation.
+ """
+
+ __slots__ = "parent_id", "cancel_scope"
+
+ def __init__(self, parent_id: int | None, cancel_scope: CancelScope | None):
+ self.parent_id = parent_id
+ self.cancel_scope = cancel_scope
+
+
+_task_states = WeakKeyDictionary() # type: WeakKeyDictionary[asyncio.Task, TaskState]
+
+
+#
+# Task groups
+#
+
+
+class _AsyncioTaskStatus(abc.TaskStatus):
+ def __init__(self, future: asyncio.Future, parent_id: int):
+ self._future = future
+ self._parent_id = parent_id
+
+ def started(self, value: T_contra | None = None) -> None:
+ try:
+ self._future.set_result(value)
+ except asyncio.InvalidStateError:
+ raise RuntimeError(
+ "called 'started' twice on the same task status"
+ ) from None
+
+ task = cast(asyncio.Task, current_task())
+ _task_states[task].parent_id = self._parent_id
+
+
+def iterate_exceptions(
+ exception: BaseException,
+) -> Generator[BaseException, None, None]:
+ if isinstance(exception, BaseExceptionGroup):
+ for exc in exception.exceptions:
+ yield from iterate_exceptions(exc)
+ else:
+ yield exception
+
+
+class TaskGroup(abc.TaskGroup):
+ def __init__(self) -> None:
+ self.cancel_scope: CancelScope = CancelScope()
+ self._active = False
+ self._exceptions: list[BaseException] = []
+ self._tasks: set[asyncio.Task] = set()
+
+ async def __aenter__(self) -> TaskGroup:
+ self.cancel_scope.__enter__()
+ self._active = True
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> bool | None:
+ ignore_exception = self.cancel_scope.__exit__(exc_type, exc_val, exc_tb)
+ if exc_val is not None:
+ self.cancel_scope.cancel()
+ if not isinstance(exc_val, CancelledError):
+ self._exceptions.append(exc_val)
+
+ cancelled_exc_while_waiting_tasks: CancelledError | None = None
+ while self._tasks:
+ try:
+ await asyncio.wait(self._tasks)
+ except CancelledError as exc:
+ # This task was cancelled natively; reraise the CancelledError later
+ # unless this task was already interrupted by another exception
+ self.cancel_scope.cancel()
+ if cancelled_exc_while_waiting_tasks is None:
+ cancelled_exc_while_waiting_tasks = exc
+
+ self._active = False
+ if self._exceptions:
+ raise BaseExceptionGroup(
+ "unhandled errors in a TaskGroup", self._exceptions
+ )
+
+ # Raise the CancelledError received while waiting for child tasks to exit,
+ # unless the context manager itself was previously exited with another
+ # exception, or if any of the child tasks raised an exception other than
+ # CancelledError
+ if cancelled_exc_while_waiting_tasks:
+ if exc_val is None or ignore_exception:
+ raise cancelled_exc_while_waiting_tasks
+
+ return ignore_exception
+
+ def _spawn(
+ self,
+ func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
+ args: tuple[Unpack[PosArgsT]],
+ name: object,
+ task_status_future: asyncio.Future | None = None,
+ ) -> asyncio.Task:
+ def task_done(_task: asyncio.Task) -> None:
+ task_state = _task_states[_task]
+ assert task_state.cancel_scope is not None
+ assert _task in task_state.cancel_scope._tasks
+ task_state.cancel_scope._tasks.remove(_task)
+ self._tasks.remove(task)
+ del _task_states[_task]
+
+ try:
+ exc = _task.exception()
+ except CancelledError as e:
+ while isinstance(e.__context__, CancelledError):
+ e = e.__context__
+
+ exc = e
+
+ if exc is not None:
+ if task_status_future is None or task_status_future.done():
+ if not isinstance(exc, CancelledError):
+ self._exceptions.append(exc)
+
+ if not self.cancel_scope._parent_cancelled():
+ self.cancel_scope.cancel()
+ else:
+ task_status_future.set_exception(exc)
+ elif task_status_future is not None and not task_status_future.done():
+ task_status_future.set_exception(
+ RuntimeError("Child exited without calling task_status.started()")
+ )
+
+ if not self._active:
+ raise RuntimeError(
+ "This task group is not active; no new tasks can be started."
+ )
+
+ kwargs = {}
+ if task_status_future:
+ parent_id = id(current_task())
+ kwargs["task_status"] = _AsyncioTaskStatus(
+ task_status_future, id(self.cancel_scope._host_task)
+ )
+ else:
+ parent_id = id(self.cancel_scope._host_task)
+
+ coro = func(*args, **kwargs)
+ if not iscoroutine(coro):
+ prefix = f"{func.__module__}." if hasattr(func, "__module__") else ""
+ raise TypeError(
+ f"Expected {prefix}{func.__qualname__}() to return a coroutine, but "
+ f"the return value ({coro!r}) is not a coroutine object"
+ )
+
+ name = get_callable_name(func) if name is None else str(name)
+ task = create_task(coro, name=name)
+ task.add_done_callback(task_done)
+
+ # Make the spawned task inherit the task group's cancel scope
+ _task_states[task] = TaskState(
+ parent_id=parent_id, cancel_scope=self.cancel_scope
+ )
+ self.cancel_scope._tasks.add(task)
+ self._tasks.add(task)
+ return task
+
+ def start_soon(
+ self,
+ func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
+ *args: Unpack[PosArgsT],
+ name: object = None,
+ ) -> None:
+ self._spawn(func, args, name)
+
+ async def start(
+ self, func: Callable[..., Awaitable[Any]], *args: object, name: object = None
+ ) -> Any:
+ future: asyncio.Future = asyncio.Future()
+ task = self._spawn(func, args, name, future)
+
+ # If the task raises an exception after sending a start value without a switch
+ # point between, the task group is cancelled and this method never proceeds to
+ # process the completed future. That's why we have to have a shielded cancel
+ # scope here.
+ try:
+ return await future
+ except CancelledError:
+ # Cancel the task and wait for it to exit before returning
+ task.cancel()
+ with CancelScope(shield=True), suppress(CancelledError):
+ await task
+
+ raise
+
+
+#
+# Threads
+#
+
+_Retval_Queue_Type = Tuple[Optional[T_Retval], Optional[BaseException]]
+
+
+class WorkerThread(Thread):
+ MAX_IDLE_TIME = 10 # seconds
+
+ def __init__(
+ self,
+ root_task: asyncio.Task,
+ workers: set[WorkerThread],
+ idle_workers: deque[WorkerThread],
+ ):
+ super().__init__(name="AnyIO worker thread")
+ self.root_task = root_task
+ self.workers = workers
+ self.idle_workers = idle_workers
+ self.loop = root_task._loop
+ self.queue: Queue[
+ tuple[Context, Callable, tuple, asyncio.Future, CancelScope] | None
+ ] = Queue(2)
+ self.idle_since = AsyncIOBackend.current_time()
+ self.stopping = False
+
+ def _report_result(
+ self, future: asyncio.Future, result: Any, exc: BaseException | None
+ ) -> None:
+ self.idle_since = AsyncIOBackend.current_time()
+ if not self.stopping:
+ self.idle_workers.append(self)
+
+ if not future.cancelled():
+ if exc is not None:
+ if isinstance(exc, StopIteration):
+ new_exc = RuntimeError("coroutine raised StopIteration")
+ new_exc.__cause__ = exc
+ exc = new_exc
+
+ future.set_exception(exc)
+ else:
+ future.set_result(result)
+
+ def run(self) -> None:
+ with claim_worker_thread(AsyncIOBackend, self.loop):
+ while True:
+ item = self.queue.get()
+ if item is None:
+ # Shutdown command received
+ return
+
+ context, func, args, future, cancel_scope = item
+ if not future.cancelled():
+ result = None
+ exception: BaseException | None = None
+ threadlocals.current_cancel_scope = cancel_scope
+ try:
+ result = context.run(func, *args)
+ except BaseException as exc:
+ exception = exc
+ finally:
+ del threadlocals.current_cancel_scope
+
+ if not self.loop.is_closed():
+ self.loop.call_soon_threadsafe(
+ self._report_result, future, result, exception
+ )
+
+ self.queue.task_done()
+
+ def stop(self, f: asyncio.Task | None = None) -> None:
+ self.stopping = True
+ self.queue.put_nowait(None)
+ self.workers.discard(self)
+ try:
+ self.idle_workers.remove(self)
+ except ValueError:
+ pass
+
+
+_threadpool_idle_workers: RunVar[deque[WorkerThread]] = RunVar(
+ "_threadpool_idle_workers"
+)
+_threadpool_workers: RunVar[set[WorkerThread]] = RunVar("_threadpool_workers")
+
+
+class BlockingPortal(abc.BlockingPortal):
+ def __new__(cls) -> BlockingPortal:
+ return object.__new__(cls)
+
+ def __init__(self) -> None:
+ super().__init__()
+ self._loop = get_running_loop()
+
+ def _spawn_task_from_thread(
+ self,
+ func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
+ args: tuple[Unpack[PosArgsT]],
+ kwargs: dict[str, Any],
+ name: object,
+ future: Future[T_Retval],
+ ) -> None:
+ AsyncIOBackend.run_sync_from_thread(
+ partial(self._task_group.start_soon, name=name),
+ (self._call_func, func, args, kwargs, future),
+ self._loop,
+ )
+
+
+#
+# Subprocesses
+#
+
+
+@dataclass(eq=False)
+class StreamReaderWrapper(abc.ByteReceiveStream):
+ _stream: asyncio.StreamReader
+
+ async def receive(self, max_bytes: int = 65536) -> bytes:
+ data = await self._stream.read(max_bytes)
+ if data:
+ return data
+ else:
+ raise EndOfStream
+
+ async def aclose(self) -> None:
+ self._stream.feed_eof()
+ await AsyncIOBackend.checkpoint()
+
+
+@dataclass(eq=False)
+class StreamWriterWrapper(abc.ByteSendStream):
+ _stream: asyncio.StreamWriter
+
+ async def send(self, item: bytes) -> None:
+ self._stream.write(item)
+ await self._stream.drain()
+
+ async def aclose(self) -> None:
+ self._stream.close()
+ await AsyncIOBackend.checkpoint()
+
+
+@dataclass(eq=False)
+class Process(abc.Process):
+ _process: asyncio.subprocess.Process
+ _stdin: StreamWriterWrapper | None
+ _stdout: StreamReaderWrapper | None
+ _stderr: StreamReaderWrapper | None
+
+ async def aclose(self) -> None:
+ with CancelScope(shield=True):
+ if self._stdin:
+ await self._stdin.aclose()
+ if self._stdout:
+ await self._stdout.aclose()
+ if self._stderr:
+ await self._stderr.aclose()
+
+ try:
+ await self.wait()
+ except BaseException:
+ self.kill()
+ with CancelScope(shield=True):
+ await self.wait()
+
+ raise
+
+ async def wait(self) -> int:
+ return await self._process.wait()
+
+ def terminate(self) -> None:
+ self._process.terminate()
+
+ def kill(self) -> None:
+ self._process.kill()
+
+ def send_signal(self, signal: int) -> None:
+ self._process.send_signal(signal)
+
+ @property
+ def pid(self) -> int:
+ return self._process.pid
+
+ @property
+ def returncode(self) -> int | None:
+ return self._process.returncode
+
+ @property
+ def stdin(self) -> abc.ByteSendStream | None:
+ return self._stdin
+
+ @property
+ def stdout(self) -> abc.ByteReceiveStream | None:
+ return self._stdout
+
+ @property
+ def stderr(self) -> abc.ByteReceiveStream | None:
+ return self._stderr
+
+
+def _forcibly_shutdown_process_pool_on_exit(
+ workers: set[Process], _task: object
+) -> None:
+ """
+ Forcibly shuts down worker processes belonging to this event loop."""
+ child_watcher: asyncio.AbstractChildWatcher | None = None
+ if sys.version_info < (3, 12):
+ try:
+ child_watcher = asyncio.get_event_loop_policy().get_child_watcher()
+ except NotImplementedError:
+ pass
+
+ # Close as much as possible (w/o async/await) to avoid warnings
+ for process in workers:
+ if process.returncode is None:
+ continue
+
+ process._stdin._stream._transport.close() # type: ignore[union-attr]
+ process._stdout._stream._transport.close() # type: ignore[union-attr]
+ process._stderr._stream._transport.close() # type: ignore[union-attr]
+ process.kill()
+ if child_watcher:
+ child_watcher.remove_child_handler(process.pid)
+
+
+async def _shutdown_process_pool_on_exit(workers: set[abc.Process]) -> None:
+ """
+ Shuts down worker processes belonging to this event loop.
+
+ NOTE: this only works when the event loop was started using asyncio.run() or
+ anyio.run().
+
+ """
+ process: abc.Process
+ try:
+ await sleep(math.inf)
+ except asyncio.CancelledError:
+ for process in workers:
+ if process.returncode is None:
+ process.kill()
+
+ for process in workers:
+ await process.aclose()
+
+
+#
+# Sockets and networking
+#
+
+
+class StreamProtocol(asyncio.Protocol):
+ read_queue: deque[bytes]
+ read_event: asyncio.Event
+ write_event: asyncio.Event
+ exception: Exception | None = None
+
+ def connection_made(self, transport: asyncio.BaseTransport) -> None:
+ self.read_queue = deque()
+ self.read_event = asyncio.Event()
+ self.write_event = asyncio.Event()
+ self.write_event.set()
+ cast(asyncio.Transport, transport).set_write_buffer_limits(0)
+
+ def connection_lost(self, exc: Exception | None) -> None:
+ if exc:
+ self.exception = BrokenResourceError()
+ self.exception.__cause__ = exc
+
+ self.read_event.set()
+ self.write_event.set()
+
+ def data_received(self, data: bytes) -> None:
+ self.read_queue.append(data)
+ self.read_event.set()
+
+ def eof_received(self) -> bool | None:
+ self.read_event.set()
+ return True
+
+ def pause_writing(self) -> None:
+ self.write_event = asyncio.Event()
+
+ def resume_writing(self) -> None:
+ self.write_event.set()
+
+
+class DatagramProtocol(asyncio.DatagramProtocol):
+ read_queue: deque[tuple[bytes, IPSockAddrType]]
+ read_event: asyncio.Event
+ write_event: asyncio.Event
+ exception: Exception | None = None
+
+ def connection_made(self, transport: asyncio.BaseTransport) -> None:
+ self.read_queue = deque(maxlen=100) # arbitrary value
+ self.read_event = asyncio.Event()
+ self.write_event = asyncio.Event()
+ self.write_event.set()
+
+ def connection_lost(self, exc: Exception | None) -> None:
+ self.read_event.set()
+ self.write_event.set()
+
+ def datagram_received(self, data: bytes, addr: IPSockAddrType) -> None:
+ addr = convert_ipv6_sockaddr(addr)
+ self.read_queue.append((data, addr))
+ self.read_event.set()
+
+ def error_received(self, exc: Exception) -> None:
+ self.exception = exc
+
+ def pause_writing(self) -> None:
+ self.write_event.clear()
+
+ def resume_writing(self) -> None:
+ self.write_event.set()
+
+
+class SocketStream(abc.SocketStream):
+ def __init__(self, transport: asyncio.Transport, protocol: StreamProtocol):
+ self._transport = transport
+ self._protocol = protocol
+ self._receive_guard = ResourceGuard("reading from")
+ self._send_guard = ResourceGuard("writing to")
+ self._closed = False
+
+ @property
+ def _raw_socket(self) -> socket.socket:
+ return self._transport.get_extra_info("socket")
+
+ async def receive(self, max_bytes: int = 65536) -> bytes:
+ with self._receive_guard:
+ await AsyncIOBackend.checkpoint()
+
+ if (
+ not self._protocol.read_event.is_set()
+ and not self._transport.is_closing()
+ ):
+ self._transport.resume_reading()
+ await self._protocol.read_event.wait()
+ self._transport.pause_reading()
+
+ try:
+ chunk = self._protocol.read_queue.popleft()
+ except IndexError:
+ if self._closed:
+ raise ClosedResourceError from None
+ elif self._protocol.exception:
+ raise self._protocol.exception from None
+ else:
+ raise EndOfStream from None
+
+ if len(chunk) > max_bytes:
+ # Split the oversized chunk
+ chunk, leftover = chunk[:max_bytes], chunk[max_bytes:]
+ self._protocol.read_queue.appendleft(leftover)
+
+ # If the read queue is empty, clear the flag so that the next call will
+ # block until data is available
+ if not self._protocol.read_queue:
+ self._protocol.read_event.clear()
+
+ return chunk
+
+ async def send(self, item: bytes) -> None:
+ with self._send_guard:
+ await AsyncIOBackend.checkpoint()
+
+ if self._closed:
+ raise ClosedResourceError
+ elif self._protocol.exception is not None:
+ raise self._protocol.exception
+
+ try:
+ self._transport.write(item)
+ except RuntimeError as exc:
+ if self._transport.is_closing():
+ raise BrokenResourceError from exc
+ else:
+ raise
+
+ await self._protocol.write_event.wait()
+
+ async def send_eof(self) -> None:
+ try:
+ self._transport.write_eof()
+ except OSError:
+ pass
+
+ async def aclose(self) -> None:
+ if not self._transport.is_closing():
+ self._closed = True
+ try:
+ self._transport.write_eof()
+ except OSError:
+ pass
+
+ self._transport.close()
+ await sleep(0)
+ self._transport.abort()
+
+
+class _RawSocketMixin:
+ _receive_future: asyncio.Future | None = None
+ _send_future: asyncio.Future | None = None
+ _closing = False
+
+ def __init__(self, raw_socket: socket.socket):
+ self.__raw_socket = raw_socket
+ self._receive_guard = ResourceGuard("reading from")
+ self._send_guard = ResourceGuard("writing to")
+
+ @property
+ def _raw_socket(self) -> socket.socket:
+ return self.__raw_socket
+
+ def _wait_until_readable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
+ def callback(f: object) -> None:
+ del self._receive_future
+ loop.remove_reader(self.__raw_socket)
+
+ f = self._receive_future = asyncio.Future()
+ loop.add_reader(self.__raw_socket, f.set_result, None)
+ f.add_done_callback(callback)
+ return f
+
+ def _wait_until_writable(self, loop: asyncio.AbstractEventLoop) -> asyncio.Future:
+ def callback(f: object) -> None:
+ del self._send_future
+ loop.remove_writer(self.__raw_socket)
+
+ f = self._send_future = asyncio.Future()
+ loop.add_writer(self.__raw_socket, f.set_result, None)
+ f.add_done_callback(callback)
+ return f
+
+ async def aclose(self) -> None:
+ if not self._closing:
+ self._closing = True
+ if self.__raw_socket.fileno() != -1:
+ self.__raw_socket.close()
+
+ if self._receive_future:
+ self._receive_future.set_result(None)
+ if self._send_future:
+ self._send_future.set_result(None)
+
+
+class UNIXSocketStream(_RawSocketMixin, abc.UNIXSocketStream):
+ async def send_eof(self) -> None:
+ with self._send_guard:
+ self._raw_socket.shutdown(socket.SHUT_WR)
+
+ async def receive(self, max_bytes: int = 65536) -> bytes:
+ loop = get_running_loop()
+ await AsyncIOBackend.checkpoint()
+ with self._receive_guard:
+ while True:
+ try:
+ data = self._raw_socket.recv(max_bytes)
+ except BlockingIOError:
+ await self._wait_until_readable(loop)
+ except OSError as exc:
+ if self._closing:
+ raise ClosedResourceError from None
+ else:
+ raise BrokenResourceError from exc
+ else:
+ if not data:
+ raise EndOfStream
+
+ return data
+
+ async def send(self, item: bytes) -> None:
+ loop = get_running_loop()
+ await AsyncIOBackend.checkpoint()
+ with self._send_guard:
+ view = memoryview(item)
+ while view:
+ try:
+ bytes_sent = self._raw_socket.send(view)
+ except BlockingIOError:
+ await self._wait_until_writable(loop)
+ except OSError as exc:
+ if self._closing:
+ raise ClosedResourceError from None
+ else:
+ raise BrokenResourceError from exc
+ else:
+ view = view[bytes_sent:]
+
+ async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
+ if not isinstance(msglen, int) or msglen < 0:
+ raise ValueError("msglen must be a non-negative integer")
+ if not isinstance(maxfds, int) or maxfds < 1:
+ raise ValueError("maxfds must be a positive integer")
+
+ loop = get_running_loop()
+ fds = array.array("i")
+ await AsyncIOBackend.checkpoint()
+ with self._receive_guard:
+ while True:
+ try:
+ message, ancdata, flags, addr = self._raw_socket.recvmsg(
+ msglen, socket.CMSG_LEN(maxfds * fds.itemsize)
+ )
+ except BlockingIOError:
+ await self._wait_until_readable(loop)
+ except OSError as exc:
+ if self._closing:
+ raise ClosedResourceError from None
+ else:
+ raise BrokenResourceError from exc
+ else:
+ if not message and not ancdata:
+ raise EndOfStream
+
+ break
+
+ for cmsg_level, cmsg_type, cmsg_data in ancdata:
+ if cmsg_level != socket.SOL_SOCKET or cmsg_type != socket.SCM_RIGHTS:
+ raise RuntimeError(
+ f"Received unexpected ancillary data; message = {message!r}, "
+ f"cmsg_level = {cmsg_level}, cmsg_type = {cmsg_type}"
+ )
+
+ fds.frombytes(cmsg_data[: len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
+
+ return message, list(fds)
+
+ async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
+ if not message:
+ raise ValueError("message must not be empty")
+ if not fds:
+ raise ValueError("fds must not be empty")
+
+ loop = get_running_loop()
+ filenos: list[int] = []
+ for fd in fds:
+ if isinstance(fd, int):
+ filenos.append(fd)
+ elif isinstance(fd, IOBase):
+ filenos.append(fd.fileno())
+
+ fdarray = array.array("i", filenos)
+ await AsyncIOBackend.checkpoint()
+ with self._send_guard:
+ while True:
+ try:
+ # The ignore can be removed after mypy picks up
+ # https://github.com/python/typeshed/pull/5545
+ self._raw_socket.sendmsg(
+ [message], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fdarray)]
+ )
+ break
+ except BlockingIOError:
+ await self._wait_until_writable(loop)
+ except OSError as exc:
+ if self._closing:
+ raise ClosedResourceError from None
+ else:
+ raise BrokenResourceError from exc
+
+
+class TCPSocketListener(abc.SocketListener):
+ _accept_scope: CancelScope | None = None
+ _closed = False
+
+ def __init__(self, raw_socket: socket.socket):
+ self.__raw_socket = raw_socket
+ self._loop = cast(asyncio.BaseEventLoop, get_running_loop())
+ self._accept_guard = ResourceGuard("accepting connections from")
+
+ @property
+ def _raw_socket(self) -> socket.socket:
+ return self.__raw_socket
+
+ async def accept(self) -> abc.SocketStream:
+ if self._closed:
+ raise ClosedResourceError
+
+ with self._accept_guard:
+ await AsyncIOBackend.checkpoint()
+ with CancelScope() as self._accept_scope:
+ try:
+ client_sock, _addr = await self._loop.sock_accept(self._raw_socket)
+ except asyncio.CancelledError:
+ # Workaround for https://bugs.python.org/issue41317
+ try:
+ self._loop.remove_reader(self._raw_socket)
+ except (ValueError, NotImplementedError):
+ pass
+
+ if self._closed:
+ raise ClosedResourceError from None
+
+ raise
+ finally:
+ self._accept_scope = None
+
+ client_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ transport, protocol = await self._loop.connect_accepted_socket(
+ StreamProtocol, client_sock
+ )
+ return SocketStream(transport, protocol)
+
+ async def aclose(self) -> None:
+ if self._closed:
+ return
+
+ self._closed = True
+ if self._accept_scope:
+ # Workaround for https://bugs.python.org/issue41317
+ try:
+ self._loop.remove_reader(self._raw_socket)
+ except (ValueError, NotImplementedError):
+ pass
+
+ self._accept_scope.cancel()
+ await sleep(0)
+
+ self._raw_socket.close()
+
+
+class UNIXSocketListener(abc.SocketListener):
+ def __init__(self, raw_socket: socket.socket):
+ self.__raw_socket = raw_socket
+ self._loop = get_running_loop()
+ self._accept_guard = ResourceGuard("accepting connections from")
+ self._closed = False
+
+ async def accept(self) -> abc.SocketStream:
+ await AsyncIOBackend.checkpoint()
+ with self._accept_guard:
+ while True:
+ try:
+ client_sock, _ = self.__raw_socket.accept()
+ client_sock.setblocking(False)
+ return UNIXSocketStream(client_sock)
+ except BlockingIOError:
+ f: asyncio.Future = asyncio.Future()
+ self._loop.add_reader(self.__raw_socket, f.set_result, None)
+ f.add_done_callback(
+ lambda _: self._loop.remove_reader(self.__raw_socket)
+ )
+ await f
+ except OSError as exc:
+ if self._closed:
+ raise ClosedResourceError from None
+ else:
+ raise BrokenResourceError from exc
+
+ async def aclose(self) -> None:
+ self._closed = True
+ self.__raw_socket.close()
+
+ @property
+ def _raw_socket(self) -> socket.socket:
+ return self.__raw_socket
+
+
+class UDPSocket(abc.UDPSocket):
+ def __init__(
+ self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
+ ):
+ self._transport = transport
+ self._protocol = protocol
+ self._receive_guard = ResourceGuard("reading from")
+ self._send_guard = ResourceGuard("writing to")
+ self._closed = False
+
+ @property
+ def _raw_socket(self) -> socket.socket:
+ return self._transport.get_extra_info("socket")
+
+ async def aclose(self) -> None:
+ if not self._transport.is_closing():
+ self._closed = True
+ self._transport.close()
+
+ async def receive(self) -> tuple[bytes, IPSockAddrType]:
+ with self._receive_guard:
+ await AsyncIOBackend.checkpoint()
+
+ # If the buffer is empty, ask for more data
+ if not self._protocol.read_queue and not self._transport.is_closing():
+ self._protocol.read_event.clear()
+ await self._protocol.read_event.wait()
+
+ try:
+ return self._protocol.read_queue.popleft()
+ except IndexError:
+ if self._closed:
+ raise ClosedResourceError from None
+ else:
+ raise BrokenResourceError from None
+
+ async def send(self, item: UDPPacketType) -> None:
+ with self._send_guard:
+ await AsyncIOBackend.checkpoint()
+ await self._protocol.write_event.wait()
+ if self._closed:
+ raise ClosedResourceError
+ elif self._transport.is_closing():
+ raise BrokenResourceError
+ else:
+ self._transport.sendto(*item)
+
+
+class ConnectedUDPSocket(abc.ConnectedUDPSocket):
+ def __init__(
+ self, transport: asyncio.DatagramTransport, protocol: DatagramProtocol
+ ):
+ self._transport = transport
+ self._protocol = protocol
+ self._receive_guard = ResourceGuard("reading from")
+ self._send_guard = ResourceGuard("writing to")
+ self._closed = False
+
+ @property
+ def _raw_socket(self) -> socket.socket:
+ return self._transport.get_extra_info("socket")
+
+ async def aclose(self) -> None:
+ if not self._transport.is_closing():
+ self._closed = True
+ self._transport.close()
+
+ async def receive(self) -> bytes:
+ with self._receive_guard:
+ await AsyncIOBackend.checkpoint()
+
+ # If the buffer is empty, ask for more data
+ if not self._protocol.read_queue and not self._transport.is_closing():
+ self._protocol.read_event.clear()
+ await self._protocol.read_event.wait()
+
+ try:
+ packet = self._protocol.read_queue.popleft()
+ except IndexError:
+ if self._closed:
+ raise ClosedResourceError from None
+ else:
+ raise BrokenResourceError from None
+
+ return packet[0]
+
+ async def send(self, item: bytes) -> None:
+ with self._send_guard:
+ await AsyncIOBackend.checkpoint()
+ await self._protocol.write_event.wait()
+ if self._closed:
+ raise ClosedResourceError
+ elif self._transport.is_closing():
+ raise BrokenResourceError
+ else:
+ self._transport.sendto(item)
+
+
+class UNIXDatagramSocket(_RawSocketMixin, abc.UNIXDatagramSocket):
+ async def receive(self) -> UNIXDatagramPacketType:
+ loop = get_running_loop()
+ await AsyncIOBackend.checkpoint()
+ with self._receive_guard:
+ while True:
+ try:
+ data = self._raw_socket.recvfrom(65536)
+ except BlockingIOError:
+ await self._wait_until_readable(loop)
+ except OSError as exc:
+ if self._closing:
+ raise ClosedResourceError from None
+ else:
+ raise BrokenResourceError from exc
+ else:
+ return data
+
+ async def send(self, item: UNIXDatagramPacketType) -> None:
+ loop = get_running_loop()
+ await AsyncIOBackend.checkpoint()
+ with self._send_guard:
+ while True:
+ try:
+ self._raw_socket.sendto(*item)
+ except BlockingIOError:
+ await self._wait_until_writable(loop)
+ except OSError as exc:
+ if self._closing:
+ raise ClosedResourceError from None
+ else:
+ raise BrokenResourceError from exc
+ else:
+ return
+
+
+class ConnectedUNIXDatagramSocket(_RawSocketMixin, abc.ConnectedUNIXDatagramSocket):
+ async def receive(self) -> bytes:
+ loop = get_running_loop()
+ await AsyncIOBackend.checkpoint()
+ with self._receive_guard:
+ while True:
+ try:
+ data = self._raw_socket.recv(65536)
+ except BlockingIOError:
+ await self._wait_until_readable(loop)
+ except OSError as exc:
+ if self._closing:
+ raise ClosedResourceError from None
+ else:
+ raise BrokenResourceError from exc
+ else:
+ return data
+
+ async def send(self, item: bytes) -> None:
+ loop = get_running_loop()
+ await AsyncIOBackend.checkpoint()
+ with self._send_guard:
+ while True:
+ try:
+ self._raw_socket.send(item)
+ except BlockingIOError:
+ await self._wait_until_writable(loop)
+ except OSError as exc:
+ if self._closing:
+ raise ClosedResourceError from None
+ else:
+ raise BrokenResourceError from exc
+ else:
+ return
+
+
+_read_events: RunVar[dict[Any, asyncio.Event]] = RunVar("read_events")
+_write_events: RunVar[dict[Any, asyncio.Event]] = RunVar("write_events")
+
+
+#
+# Synchronization
+#
+
+
+class Event(BaseEvent):
+ def __new__(cls) -> Event:
+ return object.__new__(cls)
+
+ def __init__(self) -> None:
+ self._event = asyncio.Event()
+
+ def set(self) -> None:
+ self._event.set()
+
+ def is_set(self) -> bool:
+ return self._event.is_set()
+
+ async def wait(self) -> None:
+ if self.is_set():
+ await AsyncIOBackend.checkpoint()
+ else:
+ await self._event.wait()
+
+ def statistics(self) -> EventStatistics:
+ return EventStatistics(len(self._event._waiters)) # type: ignore[attr-defined]
+
+
+class CapacityLimiter(BaseCapacityLimiter):
+ _total_tokens: float = 0
+
+ def __new__(cls, total_tokens: float) -> CapacityLimiter:
+ return object.__new__(cls)
+
+ def __init__(self, total_tokens: float):
+ self._borrowers: set[Any] = set()
+ self._wait_queue: OrderedDict[Any, asyncio.Event] = OrderedDict()
+ self.total_tokens = total_tokens
+
+ async def __aenter__(self) -> None:
+ await self.acquire()
+
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> None:
+ self.release()
+
+ @property
+ def total_tokens(self) -> float:
+ return self._total_tokens
+
+ @total_tokens.setter
+ def total_tokens(self, value: float) -> None:
+ if not isinstance(value, int) and not math.isinf(value):
+ raise TypeError("total_tokens must be an int or math.inf")
+ if value < 1:
+ raise ValueError("total_tokens must be >= 1")
+
+ waiters_to_notify = max(value - self._total_tokens, 0)
+ self._total_tokens = value
+
+ # Notify waiting tasks that they have acquired the limiter
+ while self._wait_queue and waiters_to_notify:
+ event = self._wait_queue.popitem(last=False)[1]
+ event.set()
+ waiters_to_notify -= 1
+
+ @property
+ def borrowed_tokens(self) -> int:
+ return len(self._borrowers)
+
+ @property
+ def available_tokens(self) -> float:
+ return self._total_tokens - len(self._borrowers)
+
+ def acquire_nowait(self) -> None:
+ self.acquire_on_behalf_of_nowait(current_task())
+
+ def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
+ if borrower in self._borrowers:
+ raise RuntimeError(
+ "this borrower is already holding one of this CapacityLimiter's "
+ "tokens"
+ )
+
+ if self._wait_queue or len(self._borrowers) >= self._total_tokens:
+ raise WouldBlock
+
+ self._borrowers.add(borrower)
+
+ async def acquire(self) -> None:
+ return await self.acquire_on_behalf_of(current_task())
+
+ async def acquire_on_behalf_of(self, borrower: object) -> None:
+ await AsyncIOBackend.checkpoint_if_cancelled()
+ try:
+ self.acquire_on_behalf_of_nowait(borrower)
+ except WouldBlock:
+ event = asyncio.Event()
+ self._wait_queue[borrower] = event
+ try:
+ await event.wait()
+ except BaseException:
+ self._wait_queue.pop(borrower, None)
+ raise
+
+ self._borrowers.add(borrower)
+ else:
+ try:
+ await AsyncIOBackend.cancel_shielded_checkpoint()
+ except BaseException:
+ self.release()
+ raise
+
+ def release(self) -> None:
+ self.release_on_behalf_of(current_task())
+
+ def release_on_behalf_of(self, borrower: object) -> None:
+ try:
+ self._borrowers.remove(borrower)
+ except KeyError:
+ raise RuntimeError(
+ "this borrower isn't holding any of this CapacityLimiter's " "tokens"
+ ) from None
+
+ # Notify the next task in line if this limiter has free capacity now
+ if self._wait_queue and len(self._borrowers) < self._total_tokens:
+ event = self._wait_queue.popitem(last=False)[1]
+ event.set()
+
+ def statistics(self) -> CapacityLimiterStatistics:
+ return CapacityLimiterStatistics(
+ self.borrowed_tokens,
+ self.total_tokens,
+ tuple(self._borrowers),
+ len(self._wait_queue),
+ )
+
+
+_default_thread_limiter: RunVar[CapacityLimiter] = RunVar("_default_thread_limiter")
+
+
+#
+# Operating system signals
+#
+
+
+class _SignalReceiver:
+ def __init__(self, signals: tuple[Signals, ...]):
+ self._signals = signals
+ self._loop = get_running_loop()
+ self._signal_queue: deque[Signals] = deque()
+ self._future: asyncio.Future = asyncio.Future()
+ self._handled_signals: set[Signals] = set()
+
+ def _deliver(self, signum: Signals) -> None:
+ self._signal_queue.append(signum)
+ if not self._future.done():
+ self._future.set_result(None)
+
+ def __enter__(self) -> _SignalReceiver:
+ for sig in set(self._signals):
+ self._loop.add_signal_handler(sig, self._deliver, sig)
+ self._handled_signals.add(sig)
+
+ return self
+
+ def __exit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> bool | None:
+ for sig in self._handled_signals:
+ self._loop.remove_signal_handler(sig)
+ return None
+
+ def __aiter__(self) -> _SignalReceiver:
+ return self
+
+ async def __anext__(self) -> Signals:
+ await AsyncIOBackend.checkpoint()
+ if not self._signal_queue:
+ self._future = asyncio.Future()
+ await self._future
+
+ return self._signal_queue.popleft()
+
+
+#
+# Testing and debugging
+#
+
+
+def _create_task_info(task: asyncio.Task) -> TaskInfo:
+ task_state = _task_states.get(task)
+ if task_state is None:
+ parent_id = None
+ else:
+ parent_id = task_state.parent_id
+
+ return TaskInfo(id(task), parent_id, task.get_name(), task.get_coro())
+
+
+class TestRunner(abc.TestRunner):
+ _send_stream: MemoryObjectSendStream[tuple[Awaitable[Any], asyncio.Future[Any]]]
+
+ def __init__(
+ self,
+ *,
+ debug: bool | None = None,
+ use_uvloop: bool = False,
+ loop_factory: Callable[[], AbstractEventLoop] | None = None,
+ ) -> None:
+ if use_uvloop and loop_factory is None:
+ import uvloop
+
+ loop_factory = uvloop.new_event_loop
+
+ self._runner = Runner(debug=debug, loop_factory=loop_factory)
+ self._exceptions: list[BaseException] = []
+ self._runner_task: asyncio.Task | None = None
+
+ def __enter__(self) -> TestRunner:
+ self._runner.__enter__()
+ self.get_loop().set_exception_handler(self._exception_handler)
+ return self
+
+ def __exit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> None:
+ self._runner.__exit__(exc_type, exc_val, exc_tb)
+
+ def get_loop(self) -> AbstractEventLoop:
+ return self._runner.get_loop()
+
+ def _exception_handler(
+ self, loop: asyncio.AbstractEventLoop, context: dict[str, Any]
+ ) -> None:
+ if isinstance(context.get("exception"), Exception):
+ self._exceptions.append(context["exception"])
+ else:
+ loop.default_exception_handler(context)
+
+ def _raise_async_exceptions(self) -> None:
+ # Re-raise any exceptions raised in asynchronous callbacks
+ if self._exceptions:
+ exceptions, self._exceptions = self._exceptions, []
+ if len(exceptions) == 1:
+ raise exceptions[0]
+ elif exceptions:
+ raise BaseExceptionGroup(
+ "Multiple exceptions occurred in asynchronous callbacks", exceptions
+ )
+
+ @staticmethod
+ async def _run_tests_and_fixtures(
+ receive_stream: MemoryObjectReceiveStream[
+ tuple[Awaitable[T_Retval], asyncio.Future[T_Retval]]
+ ],
+ ) -> None:
+ with receive_stream:
+ async for coro, future in receive_stream:
+ try:
+ retval = await coro
+ except BaseException as exc:
+ if not future.cancelled():
+ future.set_exception(exc)
+ else:
+ if not future.cancelled():
+ future.set_result(retval)
+
+ async def _call_in_runner_task(
+ self,
+ func: Callable[P, Awaitable[T_Retval]],
+ *args: P.args,
+ **kwargs: P.kwargs,
+ ) -> T_Retval:
+ if not self._runner_task:
+ self._send_stream, receive_stream = create_memory_object_stream[
+ Tuple[Awaitable[Any], asyncio.Future]
+ ](1)
+ self._runner_task = self.get_loop().create_task(
+ self._run_tests_and_fixtures(receive_stream)
+ )
+
+ coro = func(*args, **kwargs)
+ future: asyncio.Future[T_Retval] = self.get_loop().create_future()
+ self._send_stream.send_nowait((coro, future))
+ return await future
+
+ def run_asyncgen_fixture(
+ self,
+ fixture_func: Callable[..., AsyncGenerator[T_Retval, Any]],
+ kwargs: dict[str, Any],
+ ) -> Iterable[T_Retval]:
+ asyncgen = fixture_func(**kwargs)
+ fixturevalue: T_Retval = self.get_loop().run_until_complete(
+ self._call_in_runner_task(asyncgen.asend, None)
+ )
+ self._raise_async_exceptions()
+
+ yield fixturevalue
+
+ try:
+ self.get_loop().run_until_complete(
+ self._call_in_runner_task(asyncgen.asend, None)
+ )
+ except StopAsyncIteration:
+ self._raise_async_exceptions()
+ else:
+ self.get_loop().run_until_complete(asyncgen.aclose())
+ raise RuntimeError("Async generator fixture did not stop")
+
+ def run_fixture(
+ self,
+ fixture_func: Callable[..., Coroutine[Any, Any, T_Retval]],
+ kwargs: dict[str, Any],
+ ) -> T_Retval:
+ retval = self.get_loop().run_until_complete(
+ self._call_in_runner_task(fixture_func, **kwargs)
+ )
+ self._raise_async_exceptions()
+ return retval
+
+ def run_test(
+ self, test_func: Callable[..., Coroutine[Any, Any, Any]], kwargs: dict[str, Any]
+ ) -> None:
+ try:
+ self.get_loop().run_until_complete(
+ self._call_in_runner_task(test_func, **kwargs)
+ )
+ except Exception as exc:
+ self._exceptions.append(exc)
+
+ self._raise_async_exceptions()
+
+
+class AsyncIOBackend(AsyncBackend):
+ @classmethod
+ def run(
+ cls,
+ func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
+ args: tuple[Unpack[PosArgsT]],
+ kwargs: dict[str, Any],
+ options: dict[str, Any],
+ ) -> T_Retval:
+ @wraps(func)
+ async def wrapper() -> T_Retval:
+ task = cast(asyncio.Task, current_task())
+ task.set_name(get_callable_name(func))
+ _task_states[task] = TaskState(None, None)
+
+ try:
+ return await func(*args)
+ finally:
+ del _task_states[task]
+
+ debug = options.get("debug", False)
+ loop_factory = options.get("loop_factory", None)
+ if loop_factory is None and options.get("use_uvloop", False):
+ import uvloop
+
+ loop_factory = uvloop.new_event_loop
+
+ with Runner(debug=debug, loop_factory=loop_factory) as runner:
+ return runner.run(wrapper())
+
+ @classmethod
+ def current_token(cls) -> object:
+ return get_running_loop()
+
+ @classmethod
+ def current_time(cls) -> float:
+ return get_running_loop().time()
+
+ @classmethod
+ def cancelled_exception_class(cls) -> type[BaseException]:
+ return CancelledError
+
+ @classmethod
+ async def checkpoint(cls) -> None:
+ await sleep(0)
+
+ @classmethod
+ async def checkpoint_if_cancelled(cls) -> None:
+ task = current_task()
+ if task is None:
+ return
+
+ try:
+ cancel_scope = _task_states[task].cancel_scope
+ except KeyError:
+ return
+
+ while cancel_scope:
+ if cancel_scope.cancel_called:
+ await sleep(0)
+ elif cancel_scope.shield:
+ break
+ else:
+ cancel_scope = cancel_scope._parent_scope
+
+ @classmethod
+ async def cancel_shielded_checkpoint(cls) -> None:
+ with CancelScope(shield=True):
+ await sleep(0)
+
+ @classmethod
+ async def sleep(cls, delay: float) -> None:
+ await sleep(delay)
+
+ @classmethod
+ def create_cancel_scope(
+ cls, *, deadline: float = math.inf, shield: bool = False
+ ) -> CancelScope:
+ return CancelScope(deadline=deadline, shield=shield)
+
+ @classmethod
+ def current_effective_deadline(cls) -> float:
+ try:
+ cancel_scope = _task_states[
+ current_task() # type: ignore[index]
+ ].cancel_scope
+ except KeyError:
+ return math.inf
+
+ deadline = math.inf
+ while cancel_scope:
+ deadline = min(deadline, cancel_scope.deadline)
+ if cancel_scope._cancel_called:
+ deadline = -math.inf
+ break
+ elif cancel_scope.shield:
+ break
+ else:
+ cancel_scope = cancel_scope._parent_scope
+
+ return deadline
+
+ @classmethod
+ def create_task_group(cls) -> abc.TaskGroup:
+ return TaskGroup()
+
+ @classmethod
+ def create_event(cls) -> abc.Event:
+ return Event()
+
+ @classmethod
+ def create_capacity_limiter(cls, total_tokens: float) -> abc.CapacityLimiter:
+ return CapacityLimiter(total_tokens)
+
+ @classmethod
+ async def run_sync_in_worker_thread(
+ cls,
+ func: Callable[[Unpack[PosArgsT]], T_Retval],
+ args: tuple[Unpack[PosArgsT]],
+ abandon_on_cancel: bool = False,
+ limiter: abc.CapacityLimiter | None = None,
+ ) -> T_Retval:
+ await cls.checkpoint()
+
+ # If this is the first run in this event loop thread, set up the necessary
+ # variables
+ try:
+ idle_workers = _threadpool_idle_workers.get()
+ workers = _threadpool_workers.get()
+ except LookupError:
+ idle_workers = deque()
+ workers = set()
+ _threadpool_idle_workers.set(idle_workers)
+ _threadpool_workers.set(workers)
+
+ async with limiter or cls.current_default_thread_limiter():
+ with CancelScope(shield=not abandon_on_cancel) as scope:
+ future: asyncio.Future = asyncio.Future()
+ root_task = find_root_task()
+ if not idle_workers:
+ worker = WorkerThread(root_task, workers, idle_workers)
+ worker.start()
+ workers.add(worker)
+ root_task.add_done_callback(worker.stop)
+ else:
+ worker = idle_workers.pop()
+
+ # Prune any other workers that have been idle for MAX_IDLE_TIME
+ # seconds or longer
+ now = cls.current_time()
+ while idle_workers:
+ if (
+ now - idle_workers[0].idle_since
+ < WorkerThread.MAX_IDLE_TIME
+ ):
+ break
+
+ expired_worker = idle_workers.popleft()
+ expired_worker.root_task.remove_done_callback(
+ expired_worker.stop
+ )
+ expired_worker.stop()
+
+ context = copy_context()
+ context.run(sniffio.current_async_library_cvar.set, None)
+ if abandon_on_cancel or scope._parent_scope is None:
+ worker_scope = scope
+ else:
+ worker_scope = scope._parent_scope
+
+ worker.queue.put_nowait((context, func, args, future, worker_scope))
+ return await future
+
+ @classmethod
+ def check_cancelled(cls) -> None:
+ scope: CancelScope | None = threadlocals.current_cancel_scope
+ while scope is not None:
+ if scope.cancel_called:
+ raise CancelledError(f"Cancelled by cancel scope {id(scope):x}")
+
+ if scope.shield:
+ return
+
+ scope = scope._parent_scope
+
+ @classmethod
+ def run_async_from_thread(
+ cls,
+ func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
+ args: tuple[Unpack[PosArgsT]],
+ token: object,
+ ) -> T_Retval:
+ async def task_wrapper(scope: CancelScope) -> T_Retval:
+ __tracebackhide__ = True
+ task = cast(asyncio.Task, current_task())
+ _task_states[task] = TaskState(None, scope)
+ scope._tasks.add(task)
+ try:
+ return await func(*args)
+ except CancelledError as exc:
+ raise concurrent.futures.CancelledError(str(exc)) from None
+ finally:
+ scope._tasks.discard(task)
+
+ loop = cast(AbstractEventLoop, token)
+ context = copy_context()
+ context.run(sniffio.current_async_library_cvar.set, "asyncio")
+ wrapper = task_wrapper(threadlocals.current_cancel_scope)
+ f: concurrent.futures.Future[T_Retval] = context.run(
+ asyncio.run_coroutine_threadsafe, wrapper, loop
+ )
+ return f.result()
+
+ @classmethod
+ def run_sync_from_thread(
+ cls,
+ func: Callable[[Unpack[PosArgsT]], T_Retval],
+ args: tuple[Unpack[PosArgsT]],
+ token: object,
+ ) -> T_Retval:
+ @wraps(func)
+ def wrapper() -> None:
+ try:
+ sniffio.current_async_library_cvar.set("asyncio")
+ f.set_result(func(*args))
+ except BaseException as exc:
+ f.set_exception(exc)
+ if not isinstance(exc, Exception):
+ raise
+
+ f: concurrent.futures.Future[T_Retval] = Future()
+ loop = cast(AbstractEventLoop, token)
+ loop.call_soon_threadsafe(wrapper)
+ return f.result()
+
+ @classmethod
+ def create_blocking_portal(cls) -> abc.BlockingPortal:
+ return BlockingPortal()
+
+ @classmethod
+ async def open_process(
+ cls,
+ command: str | bytes | Sequence[str | bytes],
+ *,
+ shell: bool,
+ stdin: int | IO[Any] | None,
+ stdout: int | IO[Any] | None,
+ stderr: int | IO[Any] | None,
+ cwd: str | bytes | PathLike | None = None,
+ env: Mapping[str, str] | None = None,
+ start_new_session: bool = False,
+ ) -> Process:
+ await cls.checkpoint()
+ if shell:
+ process = await asyncio.create_subprocess_shell(
+ cast("str | bytes", command),
+ stdin=stdin,
+ stdout=stdout,
+ stderr=stderr,
+ cwd=cwd,
+ env=env,
+ start_new_session=start_new_session,
+ )
+ else:
+ process = await asyncio.create_subprocess_exec(
+ *command,
+ stdin=stdin,
+ stdout=stdout,
+ stderr=stderr,
+ cwd=cwd,
+ env=env,
+ start_new_session=start_new_session,
+ )
+
+ stdin_stream = StreamWriterWrapper(process.stdin) if process.stdin else None
+ stdout_stream = StreamReaderWrapper(process.stdout) if process.stdout else None
+ stderr_stream = StreamReaderWrapper(process.stderr) if process.stderr else None
+ return Process(process, stdin_stream, stdout_stream, stderr_stream)
+
+ @classmethod
+ def setup_process_pool_exit_at_shutdown(cls, workers: set[abc.Process]) -> None:
+ create_task(
+ _shutdown_process_pool_on_exit(workers),
+ name="AnyIO process pool shutdown task",
+ )
+ find_root_task().add_done_callback(
+ partial(_forcibly_shutdown_process_pool_on_exit, workers)
+ )
+
+ @classmethod
+ async def connect_tcp(
+ cls, host: str, port: int, local_address: IPSockAddrType | None = None
+ ) -> abc.SocketStream:
+ transport, protocol = cast(
+ Tuple[asyncio.Transport, StreamProtocol],
+ await get_running_loop().create_connection(
+ StreamProtocol, host, port, local_addr=local_address
+ ),
+ )
+ transport.pause_reading()
+ return SocketStream(transport, protocol)
+
+ @classmethod
+ async def connect_unix(cls, path: str | bytes) -> abc.UNIXSocketStream:
+ await cls.checkpoint()
+ loop = get_running_loop()
+ raw_socket = socket.socket(socket.AF_UNIX)
+ raw_socket.setblocking(False)
+ while True:
+ try:
+ raw_socket.connect(path)
+ except BlockingIOError:
+ f: asyncio.Future = asyncio.Future()
+ loop.add_writer(raw_socket, f.set_result, None)
+ f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
+ await f
+ except BaseException:
+ raw_socket.close()
+ raise
+ else:
+ return UNIXSocketStream(raw_socket)
+
+ @classmethod
+ def create_tcp_listener(cls, sock: socket.socket) -> SocketListener:
+ return TCPSocketListener(sock)
+
+ @classmethod
+ def create_unix_listener(cls, sock: socket.socket) -> SocketListener:
+ return UNIXSocketListener(sock)
+
+ @classmethod
+ async def create_udp_socket(
+ cls,
+ family: AddressFamily,
+ local_address: IPSockAddrType | None,
+ remote_address: IPSockAddrType | None,
+ reuse_port: bool,
+ ) -> UDPSocket | ConnectedUDPSocket:
+ transport, protocol = await get_running_loop().create_datagram_endpoint(
+ DatagramProtocol,
+ local_addr=local_address,
+ remote_addr=remote_address,
+ family=family,
+ reuse_port=reuse_port,
+ )
+ if protocol.exception:
+ transport.close()
+ raise protocol.exception
+
+ if not remote_address:
+ return UDPSocket(transport, protocol)
+ else:
+ return ConnectedUDPSocket(transport, protocol)
+
+ @classmethod
+ async def create_unix_datagram_socket( # type: ignore[override]
+ cls, raw_socket: socket.socket, remote_path: str | bytes | None
+ ) -> abc.UNIXDatagramSocket | abc.ConnectedUNIXDatagramSocket:
+ await cls.checkpoint()
+ loop = get_running_loop()
+
+ if remote_path:
+ while True:
+ try:
+ raw_socket.connect(remote_path)
+ except BlockingIOError:
+ f: asyncio.Future = asyncio.Future()
+ loop.add_writer(raw_socket, f.set_result, None)
+ f.add_done_callback(lambda _: loop.remove_writer(raw_socket))
+ await f
+ except BaseException:
+ raw_socket.close()
+ raise
+ else:
+ return ConnectedUNIXDatagramSocket(raw_socket)
+ else:
+ return UNIXDatagramSocket(raw_socket)
+
+ @classmethod
+ async def getaddrinfo(
+ cls,
+ host: bytes | str | None,
+ port: str | int | None,
+ *,
+ family: int | AddressFamily = 0,
+ type: int | SocketKind = 0,
+ proto: int = 0,
+ flags: int = 0,
+ ) -> list[
+ tuple[
+ AddressFamily,
+ SocketKind,
+ int,
+ str,
+ tuple[str, int] | tuple[str, int, int, int],
+ ]
+ ]:
+ return await get_running_loop().getaddrinfo(
+ host, port, family=family, type=type, proto=proto, flags=flags
+ )
+
+ @classmethod
+ async def getnameinfo(
+ cls, sockaddr: IPSockAddrType, flags: int = 0
+ ) -> tuple[str, str]:
+ return await get_running_loop().getnameinfo(sockaddr, flags)
+
+ @classmethod
+ async def wait_socket_readable(cls, sock: socket.socket) -> None:
+ await cls.checkpoint()
+ try:
+ read_events = _read_events.get()
+ except LookupError:
+ read_events = {}
+ _read_events.set(read_events)
+
+ if read_events.get(sock):
+ raise BusyResourceError("reading from") from None
+
+ loop = get_running_loop()
+ event = read_events[sock] = asyncio.Event()
+ loop.add_reader(sock, event.set)
+ try:
+ await event.wait()
+ finally:
+ if read_events.pop(sock, None) is not None:
+ loop.remove_reader(sock)
+ readable = True
+ else:
+ readable = False
+
+ if not readable:
+ raise ClosedResourceError
+
+ @classmethod
+ async def wait_socket_writable(cls, sock: socket.socket) -> None:
+ await cls.checkpoint()
+ try:
+ write_events = _write_events.get()
+ except LookupError:
+ write_events = {}
+ _write_events.set(write_events)
+
+ if write_events.get(sock):
+ raise BusyResourceError("writing to") from None
+
+ loop = get_running_loop()
+ event = write_events[sock] = asyncio.Event()
+ loop.add_writer(sock.fileno(), event.set)
+ try:
+ await event.wait()
+ finally:
+ if write_events.pop(sock, None) is not None:
+ loop.remove_writer(sock)
+ writable = True
+ else:
+ writable = False
+
+ if not writable:
+ raise ClosedResourceError
+
+ @classmethod
+ def current_default_thread_limiter(cls) -> CapacityLimiter:
+ try:
+ return _default_thread_limiter.get()
+ except LookupError:
+ limiter = CapacityLimiter(40)
+ _default_thread_limiter.set(limiter)
+ return limiter
+
+ @classmethod
+ def open_signal_receiver(
+ cls, *signals: Signals
+ ) -> ContextManager[AsyncIterator[Signals]]:
+ return _SignalReceiver(signals)
+
+ @classmethod
+ def get_current_task(cls) -> TaskInfo:
+ return _create_task_info(current_task()) # type: ignore[arg-type]
+
+ @classmethod
+ def get_running_tasks(cls) -> list[TaskInfo]:
+ return [_create_task_info(task) for task in all_tasks() if not task.done()]
+
+ @classmethod
+ async def wait_all_tasks_blocked(cls) -> None:
+ await cls.checkpoint()
+ this_task = current_task()
+ while True:
+ for task in all_tasks():
+ if task is this_task:
+ continue
+
+ waiter = task._fut_waiter # type: ignore[attr-defined]
+ if waiter is None or waiter.done():
+ await sleep(0.1)
+ break
+ else:
+ return
+
+ @classmethod
+ def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
+ return TestRunner(**options)
+
+
+backend_class = AsyncIOBackend
diff --git a/venv/lib/python3.11/site-packages/anyio/_backends/_trio.py b/venv/lib/python3.11/site-packages/anyio/_backends/_trio.py
new file mode 100644
index 0000000..1a47192
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/anyio/_backends/_trio.py
@@ -0,0 +1,1169 @@
+from __future__ import annotations
+
+import array
+import math
+import socket
+import sys
+import types
+from collections.abc import AsyncIterator, Iterable
+from concurrent.futures import Future
+from dataclasses import dataclass
+from functools import partial
+from io import IOBase
+from os import PathLike
+from signal import Signals
+from socket import AddressFamily, SocketKind
+from types import TracebackType
+from typing import (
+ IO,
+ Any,
+ AsyncGenerator,
+ Awaitable,
+ Callable,
+ Collection,
+ ContextManager,
+ Coroutine,
+ Generic,
+ Mapping,
+ NoReturn,
+ Sequence,
+ TypeVar,
+ cast,
+ overload,
+)
+
+import trio.from_thread
+import trio.lowlevel
+from outcome import Error, Outcome, Value
+from trio.lowlevel import (
+ current_root_task,
+ current_task,
+ wait_readable,
+ wait_writable,
+)
+from trio.socket import SocketType as TrioSocketType
+from trio.to_thread import run_sync
+
+from .. import CapacityLimiterStatistics, EventStatistics, TaskInfo, abc
+from .._core._eventloop import claim_worker_thread
+from .._core._exceptions import (
+ BrokenResourceError,
+ BusyResourceError,
+ ClosedResourceError,
+ EndOfStream,
+)
+from .._core._sockets import convert_ipv6_sockaddr
+from .._core._streams import create_memory_object_stream
+from .._core._synchronization import CapacityLimiter as BaseCapacityLimiter
+from .._core._synchronization import Event as BaseEvent
+from .._core._synchronization import ResourceGuard
+from .._core._tasks import CancelScope as BaseCancelScope
+from ..abc import IPSockAddrType, UDPPacketType, UNIXDatagramPacketType
+from ..abc._eventloop import AsyncBackend
+from ..streams.memory import MemoryObjectSendStream
+
+if sys.version_info >= (3, 10):
+ from typing import ParamSpec
+else:
+ from typing_extensions import ParamSpec
+
+if sys.version_info >= (3, 11):
+ from typing import TypeVarTuple, Unpack
+else:
+ from exceptiongroup import BaseExceptionGroup
+ from typing_extensions import TypeVarTuple, Unpack
+
+T = TypeVar("T")
+T_Retval = TypeVar("T_Retval")
+T_SockAddr = TypeVar("T_SockAddr", str, IPSockAddrType)
+PosArgsT = TypeVarTuple("PosArgsT")
+P = ParamSpec("P")
+
+
+#
+# Event loop
+#
+
+RunVar = trio.lowlevel.RunVar
+
+
+#
+# Timeouts and cancellation
+#
+
+
+class CancelScope(BaseCancelScope):
+ def __new__(
+ cls, original: trio.CancelScope | None = None, **kwargs: object
+ ) -> CancelScope:
+ return object.__new__(cls)
+
+ def __init__(self, original: trio.CancelScope | None = None, **kwargs: Any) -> None:
+ self.__original = original or trio.CancelScope(**kwargs)
+
+ def __enter__(self) -> CancelScope:
+ self.__original.__enter__()
+ return self
+
+ def __exit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> bool | None:
+ # https://github.com/python-trio/trio-typing/pull/79
+ return self.__original.__exit__(exc_type, exc_val, exc_tb)
+
+ def cancel(self) -> None:
+ self.__original.cancel()
+
+ @property
+ def deadline(self) -> float:
+ return self.__original.deadline
+
+ @deadline.setter
+ def deadline(self, value: float) -> None:
+ self.__original.deadline = value
+
+ @property
+ def cancel_called(self) -> bool:
+ return self.__original.cancel_called
+
+ @property
+ def cancelled_caught(self) -> bool:
+ return self.__original.cancelled_caught
+
+ @property
+ def shield(self) -> bool:
+ return self.__original.shield
+
+ @shield.setter
+ def shield(self, value: bool) -> None:
+ self.__original.shield = value
+
+
+#
+# Task groups
+#
+
+
+class TaskGroup(abc.TaskGroup):
+ def __init__(self) -> None:
+ self._active = False
+ self._nursery_manager = trio.open_nursery(strict_exception_groups=True)
+ self.cancel_scope = None # type: ignore[assignment]
+
+ async def __aenter__(self) -> TaskGroup:
+ self._active = True
+ self._nursery = await self._nursery_manager.__aenter__()
+ self.cancel_scope = CancelScope(self._nursery.cancel_scope)
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> bool | None:
+ try:
+ return await self._nursery_manager.__aexit__(exc_type, exc_val, exc_tb)
+ except BaseExceptionGroup as exc:
+ _, rest = exc.split(trio.Cancelled)
+ if not rest:
+ cancelled_exc = trio.Cancelled._create()
+ raise cancelled_exc from exc
+
+ raise
+ finally:
+ self._active = False
+
+ def start_soon(
+ self,
+ func: Callable[[Unpack[PosArgsT]], Awaitable[Any]],
+ *args: Unpack[PosArgsT],
+ name: object = None,
+ ) -> None:
+ if not self._active:
+ raise RuntimeError(
+ "This task group is not active; no new tasks can be started."
+ )
+
+ self._nursery.start_soon(func, *args, name=name)
+
+ async def start(
+ self, func: Callable[..., Awaitable[Any]], *args: object, name: object = None
+ ) -> Any:
+ if not self._active:
+ raise RuntimeError(
+ "This task group is not active; no new tasks can be started."
+ )
+
+ return await self._nursery.start(func, *args, name=name)
+
+
+#
+# Threads
+#
+
+
+class BlockingPortal(abc.BlockingPortal):
+ def __new__(cls) -> BlockingPortal:
+ return object.__new__(cls)
+
+ def __init__(self) -> None:
+ super().__init__()
+ self._token = trio.lowlevel.current_trio_token()
+
+ def _spawn_task_from_thread(
+ self,
+ func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval] | T_Retval],
+ args: tuple[Unpack[PosArgsT]],
+ kwargs: dict[str, Any],
+ name: object,
+ future: Future[T_Retval],
+ ) -> None:
+ trio.from_thread.run_sync(
+ partial(self._task_group.start_soon, name=name),
+ self._call_func,
+ func,
+ args,
+ kwargs,
+ future,
+ trio_token=self._token,
+ )
+
+
+#
+# Subprocesses
+#
+
+
+@dataclass(eq=False)
+class ReceiveStreamWrapper(abc.ByteReceiveStream):
+ _stream: trio.abc.ReceiveStream
+
+ async def receive(self, max_bytes: int | None = None) -> bytes:
+ try:
+ data = await self._stream.receive_some(max_bytes)
+ except trio.ClosedResourceError as exc:
+ raise ClosedResourceError from exc.__cause__
+ except trio.BrokenResourceError as exc:
+ raise BrokenResourceError from exc.__cause__
+
+ if data:
+ return data
+ else:
+ raise EndOfStream
+
+ async def aclose(self) -> None:
+ await self._stream.aclose()
+
+
+@dataclass(eq=False)
+class SendStreamWrapper(abc.ByteSendStream):
+ _stream: trio.abc.SendStream
+
+ async def send(self, item: bytes) -> None:
+ try:
+ await self._stream.send_all(item)
+ except trio.ClosedResourceError as exc:
+ raise ClosedResourceError from exc.__cause__
+ except trio.BrokenResourceError as exc:
+ raise BrokenResourceError from exc.__cause__
+
+ async def aclose(self) -> None:
+ await self._stream.aclose()
+
+
+@dataclass(eq=False)
+class Process(abc.Process):
+ _process: trio.Process
+ _stdin: abc.ByteSendStream | None
+ _stdout: abc.ByteReceiveStream | None
+ _stderr: abc.ByteReceiveStream | None
+
+ async def aclose(self) -> None:
+ with CancelScope(shield=True):
+ if self._stdin:
+ await self._stdin.aclose()
+ if self._stdout:
+ await self._stdout.aclose()
+ if self._stderr:
+ await self._stderr.aclose()
+
+ try:
+ await self.wait()
+ except BaseException:
+ self.kill()
+ with CancelScope(shield=True):
+ await self.wait()
+ raise
+
+ async def wait(self) -> int:
+ return await self._process.wait()
+
+ def terminate(self) -> None:
+ self._process.terminate()
+
+ def kill(self) -> None:
+ self._process.kill()
+
+ def send_signal(self, signal: Signals) -> None:
+ self._process.send_signal(signal)
+
+ @property
+ def pid(self) -> int:
+ return self._process.pid
+
+ @property
+ def returncode(self) -> int | None:
+ return self._process.returncode
+
+ @property
+ def stdin(self) -> abc.ByteSendStream | None:
+ return self._stdin
+
+ @property
+ def stdout(self) -> abc.ByteReceiveStream | None:
+ return self._stdout
+
+ @property
+ def stderr(self) -> abc.ByteReceiveStream | None:
+ return self._stderr
+
+
+class _ProcessPoolShutdownInstrument(trio.abc.Instrument):
+ def after_run(self) -> None:
+ super().after_run()
+
+
+current_default_worker_process_limiter: trio.lowlevel.RunVar = RunVar(
+ "current_default_worker_process_limiter"
+)
+
+
+async def _shutdown_process_pool(workers: set[abc.Process]) -> None:
+ try:
+ await trio.sleep(math.inf)
+ except trio.Cancelled:
+ for process in workers:
+ if process.returncode is None:
+ process.kill()
+
+ with CancelScope(shield=True):
+ for process in workers:
+ await process.aclose()
+
+
+#
+# Sockets and networking
+#
+
+
+class _TrioSocketMixin(Generic[T_SockAddr]):
+ def __init__(self, trio_socket: TrioSocketType) -> None:
+ self._trio_socket = trio_socket
+ self._closed = False
+
+ def _check_closed(self) -> None:
+ if self._closed:
+ raise ClosedResourceError
+ if self._trio_socket.fileno() < 0:
+ raise BrokenResourceError
+
+ @property
+ def _raw_socket(self) -> socket.socket:
+ return self._trio_socket._sock # type: ignore[attr-defined]
+
+ async def aclose(self) -> None:
+ if self._trio_socket.fileno() >= 0:
+ self._closed = True
+ self._trio_socket.close()
+
+ def _convert_socket_error(self, exc: BaseException) -> NoReturn:
+ if isinstance(exc, trio.ClosedResourceError):
+ raise ClosedResourceError from exc
+ elif self._trio_socket.fileno() < 0 and self._closed:
+ raise ClosedResourceError from None
+ elif isinstance(exc, OSError):
+ raise BrokenResourceError from exc
+ else:
+ raise exc
+
+
+class SocketStream(_TrioSocketMixin, abc.SocketStream):
+ def __init__(self, trio_socket: TrioSocketType) -> None:
+ super().__init__(trio_socket)
+ self._receive_guard = ResourceGuard("reading from")
+ self._send_guard = ResourceGuard("writing to")
+
+ async def receive(self, max_bytes: int = 65536) -> bytes:
+ with self._receive_guard:
+ try:
+ data = await self._trio_socket.recv(max_bytes)
+ except BaseException as exc:
+ self._convert_socket_error(exc)
+
+ if data:
+ return data
+ else:
+ raise EndOfStream
+
+ async def send(self, item: bytes) -> None:
+ with self._send_guard:
+ view = memoryview(item)
+ while view:
+ try:
+ bytes_sent = await self._trio_socket.send(view)
+ except BaseException as exc:
+ self._convert_socket_error(exc)
+
+ view = view[bytes_sent:]
+
+ async def send_eof(self) -> None:
+ self._trio_socket.shutdown(socket.SHUT_WR)
+
+
+class UNIXSocketStream(SocketStream, abc.UNIXSocketStream):
+ async def receive_fds(self, msglen: int, maxfds: int) -> tuple[bytes, list[int]]:
+ if not isinstance(msglen, int) or msglen < 0:
+ raise ValueError("msglen must be a non-negative integer")
+ if not isinstance(maxfds, int) or maxfds < 1:
+ raise ValueError("maxfds must be a positive integer")
+
+ fds = array.array("i")
+ await trio.lowlevel.checkpoint()
+ with self._receive_guard:
+ while True:
+ try:
+ message, ancdata, flags, addr = await self._trio_socket.recvmsg(
+ msglen, socket.CMSG_LEN(maxfds * fds.itemsize)
+ )
+ except BaseException as exc:
+ self._convert_socket_error(exc)
+ else:
+ if not message and not ancdata:
+ raise EndOfStream
+
+ break
+
+ for cmsg_level, cmsg_type, cmsg_data in ancdata:
+ if cmsg_level != socket.SOL_SOCKET or cmsg_type != socket.SCM_RIGHTS:
+ raise RuntimeError(
+ f"Received unexpected ancillary data; message = {message!r}, "
+ f"cmsg_level = {cmsg_level}, cmsg_type = {cmsg_type}"
+ )
+
+ fds.frombytes(cmsg_data[: len(cmsg_data) - (len(cmsg_data) % fds.itemsize)])
+
+ return message, list(fds)
+
+ async def send_fds(self, message: bytes, fds: Collection[int | IOBase]) -> None:
+ if not message:
+ raise ValueError("message must not be empty")
+ if not fds:
+ raise ValueError("fds must not be empty")
+
+ filenos: list[int] = []
+ for fd in fds:
+ if isinstance(fd, int):
+ filenos.append(fd)
+ elif isinstance(fd, IOBase):
+ filenos.append(fd.fileno())
+
+ fdarray = array.array("i", filenos)
+ await trio.lowlevel.checkpoint()
+ with self._send_guard:
+ while True:
+ try:
+ await self._trio_socket.sendmsg(
+ [message],
+ [
+ (
+ socket.SOL_SOCKET,
+ socket.SCM_RIGHTS,
+ fdarray,
+ )
+ ],
+ )
+ break
+ except BaseException as exc:
+ self._convert_socket_error(exc)
+
+
+class TCPSocketListener(_TrioSocketMixin, abc.SocketListener):
+ def __init__(self, raw_socket: socket.socket):
+ super().__init__(trio.socket.from_stdlib_socket(raw_socket))
+ self._accept_guard = ResourceGuard("accepting connections from")
+
+ async def accept(self) -> SocketStream:
+ with self._accept_guard:
+ try:
+ trio_socket, _addr = await self._trio_socket.accept()
+ except BaseException as exc:
+ self._convert_socket_error(exc)
+
+ trio_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ return SocketStream(trio_socket)
+
+
+class UNIXSocketListener(_TrioSocketMixin, abc.SocketListener):
+ def __init__(self, raw_socket: socket.socket):
+ super().__init__(trio.socket.from_stdlib_socket(raw_socket))
+ self._accept_guard = ResourceGuard("accepting connections from")
+
+ async def accept(self) -> UNIXSocketStream:
+ with self._accept_guard:
+ try:
+ trio_socket, _addr = await self._trio_socket.accept()
+ except BaseException as exc:
+ self._convert_socket_error(exc)
+
+ return UNIXSocketStream(trio_socket)
+
+
+class UDPSocket(_TrioSocketMixin[IPSockAddrType], abc.UDPSocket):
+ def __init__(self, trio_socket: TrioSocketType) -> None:
+ super().__init__(trio_socket)
+ self._receive_guard = ResourceGuard("reading from")
+ self._send_guard = ResourceGuard("writing to")
+
+ async def receive(self) -> tuple[bytes, IPSockAddrType]:
+ with self._receive_guard:
+ try:
+ data, addr = await self._trio_socket.recvfrom(65536)
+ return data, convert_ipv6_sockaddr(addr)
+ except BaseException as exc:
+ self._convert_socket_error(exc)
+
+ async def send(self, item: UDPPacketType) -> None:
+ with self._send_guard:
+ try:
+ await self._trio_socket.sendto(*item)
+ except BaseException as exc:
+ self._convert_socket_error(exc)
+
+
+class ConnectedUDPSocket(_TrioSocketMixin[IPSockAddrType], abc.ConnectedUDPSocket):
+ def __init__(self, trio_socket: TrioSocketType) -> None:
+ super().__init__(trio_socket)
+ self._receive_guard = ResourceGuard("reading from")
+ self._send_guard = ResourceGuard("writing to")
+
+ async def receive(self) -> bytes:
+ with self._receive_guard:
+ try:
+ return await self._trio_socket.recv(65536)
+ except BaseException as exc:
+ self._convert_socket_error(exc)
+
+ async def send(self, item: bytes) -> None:
+ with self._send_guard:
+ try:
+ await self._trio_socket.send(item)
+ except BaseException as exc:
+ self._convert_socket_error(exc)
+
+
+class UNIXDatagramSocket(_TrioSocketMixin[str], abc.UNIXDatagramSocket):
+ def __init__(self, trio_socket: TrioSocketType) -> None:
+ super().__init__(trio_socket)
+ self._receive_guard = ResourceGuard("reading from")
+ self._send_guard = ResourceGuard("writing to")
+
+ async def receive(self) -> UNIXDatagramPacketType:
+ with self._receive_guard:
+ try:
+ data, addr = await self._trio_socket.recvfrom(65536)
+ return data, addr
+ except BaseException as exc:
+ self._convert_socket_error(exc)
+
+ async def send(self, item: UNIXDatagramPacketType) -> None:
+ with self._send_guard:
+ try:
+ await self._trio_socket.sendto(*item)
+ except BaseException as exc:
+ self._convert_socket_error(exc)
+
+
+class ConnectedUNIXDatagramSocket(
+ _TrioSocketMixin[str], abc.ConnectedUNIXDatagramSocket
+):
+ def __init__(self, trio_socket: TrioSocketType) -> None:
+ super().__init__(trio_socket)
+ self._receive_guard = ResourceGuard("reading from")
+ self._send_guard = ResourceGuard("writing to")
+
+ async def receive(self) -> bytes:
+ with self._receive_guard:
+ try:
+ return await self._trio_socket.recv(65536)
+ except BaseException as exc:
+ self._convert_socket_error(exc)
+
+ async def send(self, item: bytes) -> None:
+ with self._send_guard:
+ try:
+ await self._trio_socket.send(item)
+ except BaseException as exc:
+ self._convert_socket_error(exc)
+
+
+#
+# Synchronization
+#
+
+
+class Event(BaseEvent):
+ def __new__(cls) -> Event:
+ return object.__new__(cls)
+
+ def __init__(self) -> None:
+ self.__original = trio.Event()
+
+ def is_set(self) -> bool:
+ return self.__original.is_set()
+
+ async def wait(self) -> None:
+ return await self.__original.wait()
+
+ def statistics(self) -> EventStatistics:
+ orig_statistics = self.__original.statistics()
+ return EventStatistics(tasks_waiting=orig_statistics.tasks_waiting)
+
+ def set(self) -> None:
+ self.__original.set()
+
+
+class CapacityLimiter(BaseCapacityLimiter):
+ def __new__(
+ cls,
+ total_tokens: float | None = None,
+ *,
+ original: trio.CapacityLimiter | None = None,
+ ) -> CapacityLimiter:
+ return object.__new__(cls)
+
+ def __init__(
+ self,
+ total_tokens: float | None = None,
+ *,
+ original: trio.CapacityLimiter | None = None,
+ ) -> None:
+ if original is not None:
+ self.__original = original
+ else:
+ assert total_tokens is not None
+ self.__original = trio.CapacityLimiter(total_tokens)
+
+ async def __aenter__(self) -> None:
+ return await self.__original.__aenter__()
+
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> None:
+ await self.__original.__aexit__(exc_type, exc_val, exc_tb)
+
+ @property
+ def total_tokens(self) -> float:
+ return self.__original.total_tokens
+
+ @total_tokens.setter
+ def total_tokens(self, value: float) -> None:
+ self.__original.total_tokens = value
+
+ @property
+ def borrowed_tokens(self) -> int:
+ return self.__original.borrowed_tokens
+
+ @property
+ def available_tokens(self) -> float:
+ return self.__original.available_tokens
+
+ def acquire_nowait(self) -> None:
+ self.__original.acquire_nowait()
+
+ def acquire_on_behalf_of_nowait(self, borrower: object) -> None:
+ self.__original.acquire_on_behalf_of_nowait(borrower)
+
+ async def acquire(self) -> None:
+ await self.__original.acquire()
+
+ async def acquire_on_behalf_of(self, borrower: object) -> None:
+ await self.__original.acquire_on_behalf_of(borrower)
+
+ def release(self) -> None:
+ return self.__original.release()
+
+ def release_on_behalf_of(self, borrower: object) -> None:
+ return self.__original.release_on_behalf_of(borrower)
+
+ def statistics(self) -> CapacityLimiterStatistics:
+ orig = self.__original.statistics()
+ return CapacityLimiterStatistics(
+ borrowed_tokens=orig.borrowed_tokens,
+ total_tokens=orig.total_tokens,
+ borrowers=tuple(orig.borrowers),
+ tasks_waiting=orig.tasks_waiting,
+ )
+
+
+_capacity_limiter_wrapper: trio.lowlevel.RunVar = RunVar("_capacity_limiter_wrapper")
+
+
+#
+# Signal handling
+#
+
+
+class _SignalReceiver:
+ _iterator: AsyncIterator[int]
+
+ def __init__(self, signals: tuple[Signals, ...]):
+ self._signals = signals
+
+ def __enter__(self) -> _SignalReceiver:
+ self._cm = trio.open_signal_receiver(*self._signals)
+ self._iterator = self._cm.__enter__()
+ return self
+
+ def __exit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: TracebackType | None,
+ ) -> bool | None:
+ return self._cm.__exit__(exc_type, exc_val, exc_tb)
+
+ def __aiter__(self) -> _SignalReceiver:
+ return self
+
+ async def __anext__(self) -> Signals:
+ signum = await self._iterator.__anext__()
+ return Signals(signum)
+
+
+#
+# Testing and debugging
+#
+
+
+class TestRunner(abc.TestRunner):
+ def __init__(self, **options: Any) -> None:
+ from queue import Queue
+
+ self._call_queue: Queue[Callable[[], object]] = Queue()
+ self._send_stream: MemoryObjectSendStream | None = None
+ self._options = options
+
+ def __exit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_val: BaseException | None,
+ exc_tb: types.TracebackType | None,
+ ) -> None:
+ if self._send_stream:
+ self._send_stream.close()
+ while self._send_stream is not None:
+ self._call_queue.get()()
+
+ async def _run_tests_and_fixtures(self) -> None:
+ self._send_stream, receive_stream = create_memory_object_stream(1)
+ with receive_stream:
+ async for coro, outcome_holder in receive_stream:
+ try:
+ retval = await coro
+ except BaseException as exc:
+ outcome_holder.append(Error(exc))
+ else:
+ outcome_holder.append(Value(retval))
+
+ def _main_task_finished(self, outcome: object) -> None:
+ self._send_stream = None
+
+ def _call_in_runner_task(
+ self,
+ func: Callable[P, Awaitable[T_Retval]],
+ *args: P.args,
+ **kwargs: P.kwargs,
+ ) -> T_Retval:
+ if self._send_stream is None:
+ trio.lowlevel.start_guest_run(
+ self._run_tests_and_fixtures,
+ run_sync_soon_threadsafe=self._call_queue.put,
+ done_callback=self._main_task_finished,
+ **self._options,
+ )
+ while self._send_stream is None:
+ self._call_queue.get()()
+
+ outcome_holder: list[Outcome] = []
+ self._send_stream.send_nowait((func(*args, **kwargs), outcome_holder))
+ while not outcome_holder:
+ self._call_queue.get()()
+
+ return outcome_holder[0].unwrap()
+
+ def run_asyncgen_fixture(
+ self,
+ fixture_func: Callable[..., AsyncGenerator[T_Retval, Any]],
+ kwargs: dict[str, Any],
+ ) -> Iterable[T_Retval]:
+ asyncgen = fixture_func(**kwargs)
+ fixturevalue: T_Retval = self._call_in_runner_task(asyncgen.asend, None)
+
+ yield fixturevalue
+
+ try:
+ self._call_in_runner_task(asyncgen.asend, None)
+ except StopAsyncIteration:
+ pass
+ else:
+ self._call_in_runner_task(asyncgen.aclose)
+ raise RuntimeError("Async generator fixture did not stop")
+
+ def run_fixture(
+ self,
+ fixture_func: Callable[..., Coroutine[Any, Any, T_Retval]],
+ kwargs: dict[str, Any],
+ ) -> T_Retval:
+ return self._call_in_runner_task(fixture_func, **kwargs)
+
+ def run_test(
+ self, test_func: Callable[..., Coroutine[Any, Any, Any]], kwargs: dict[str, Any]
+ ) -> None:
+ self._call_in_runner_task(test_func, **kwargs)
+
+
+class TrioBackend(AsyncBackend):
+ @classmethod
+ def run(
+ cls,
+ func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
+ args: tuple[Unpack[PosArgsT]],
+ kwargs: dict[str, Any],
+ options: dict[str, Any],
+ ) -> T_Retval:
+ return trio.run(func, *args)
+
+ @classmethod
+ def current_token(cls) -> object:
+ return trio.lowlevel.current_trio_token()
+
+ @classmethod
+ def current_time(cls) -> float:
+ return trio.current_time()
+
+ @classmethod
+ def cancelled_exception_class(cls) -> type[BaseException]:
+ return trio.Cancelled
+
+ @classmethod
+ async def checkpoint(cls) -> None:
+ await trio.lowlevel.checkpoint()
+
+ @classmethod
+ async def checkpoint_if_cancelled(cls) -> None:
+ await trio.lowlevel.checkpoint_if_cancelled()
+
+ @classmethod
+ async def cancel_shielded_checkpoint(cls) -> None:
+ await trio.lowlevel.cancel_shielded_checkpoint()
+
+ @classmethod
+ async def sleep(cls, delay: float) -> None:
+ await trio.sleep(delay)
+
+ @classmethod
+ def create_cancel_scope(
+ cls, *, deadline: float = math.inf, shield: bool = False
+ ) -> abc.CancelScope:
+ return CancelScope(deadline=deadline, shield=shield)
+
+ @classmethod
+ def current_effective_deadline(cls) -> float:
+ return trio.current_effective_deadline()
+
+ @classmethod
+ def create_task_group(cls) -> abc.TaskGroup:
+ return TaskGroup()
+
+ @classmethod
+ def create_event(cls) -> abc.Event:
+ return Event()
+
+ @classmethod
+ def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
+ return CapacityLimiter(total_tokens)
+
+ @classmethod
+ async def run_sync_in_worker_thread(
+ cls,
+ func: Callable[[Unpack[PosArgsT]], T_Retval],
+ args: tuple[Unpack[PosArgsT]],
+ abandon_on_cancel: bool = False,
+ limiter: abc.CapacityLimiter | None = None,
+ ) -> T_Retval:
+ def wrapper() -> T_Retval:
+ with claim_worker_thread(TrioBackend, token):
+ return func(*args)
+
+ token = TrioBackend.current_token()
+ return await run_sync(
+ wrapper,
+ abandon_on_cancel=abandon_on_cancel,
+ limiter=cast(trio.CapacityLimiter, limiter),
+ )
+
+ @classmethod
+ def check_cancelled(cls) -> None:
+ trio.from_thread.check_cancelled()
+
+ @classmethod
+ def run_async_from_thread(
+ cls,
+ func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
+ args: tuple[Unpack[PosArgsT]],
+ token: object,
+ ) -> T_Retval:
+ return trio.from_thread.run(func, *args)
+
+ @classmethod
+ def run_sync_from_thread(
+ cls,
+ func: Callable[[Unpack[PosArgsT]], T_Retval],
+ args: tuple[Unpack[PosArgsT]],
+ token: object,
+ ) -> T_Retval:
+ return trio.from_thread.run_sync(func, *args)
+
+ @classmethod
+ def create_blocking_portal(cls) -> abc.BlockingPortal:
+ return BlockingPortal()
+
+ @classmethod
+ async def open_process(
+ cls,
+ command: str | bytes | Sequence[str | bytes],
+ *,
+ shell: bool,
+ stdin: int | IO[Any] | None,
+ stdout: int | IO[Any] | None,
+ stderr: int | IO[Any] | None,
+ cwd: str | bytes | PathLike | None = None,
+ env: Mapping[str, str] | None = None,
+ start_new_session: bool = False,
+ ) -> Process:
+ process = await trio.lowlevel.open_process( # type: ignore[misc]
+ command, # type: ignore[arg-type]
+ stdin=stdin,
+ stdout=stdout,
+ stderr=stderr,
+ shell=shell,
+ cwd=cwd,
+ env=env,
+ start_new_session=start_new_session,
+ )
+ stdin_stream = SendStreamWrapper(process.stdin) if process.stdin else None
+ stdout_stream = ReceiveStreamWrapper(process.stdout) if process.stdout else None
+ stderr_stream = ReceiveStreamWrapper(process.stderr) if process.stderr else None
+ return Process(process, stdin_stream, stdout_stream, stderr_stream)
+
+ @classmethod
+ def setup_process_pool_exit_at_shutdown(cls, workers: set[abc.Process]) -> None:
+ trio.lowlevel.spawn_system_task(_shutdown_process_pool, workers)
+
+ @classmethod
+ async def connect_tcp(
+ cls, host: str, port: int, local_address: IPSockAddrType | None = None
+ ) -> SocketStream:
+ family = socket.AF_INET6 if ":" in host else socket.AF_INET
+ trio_socket = trio.socket.socket(family)
+ trio_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+ if local_address:
+ await trio_socket.bind(local_address)
+
+ try:
+ await trio_socket.connect((host, port))
+ except BaseException:
+ trio_socket.close()
+ raise
+
+ return SocketStream(trio_socket)
+
+ @classmethod
+ async def connect_unix(cls, path: str | bytes) -> abc.UNIXSocketStream:
+ trio_socket = trio.socket.socket(socket.AF_UNIX)
+ try:
+ await trio_socket.connect(path)
+ except BaseException:
+ trio_socket.close()
+ raise
+
+ return UNIXSocketStream(trio_socket)
+
+ @classmethod
+ def create_tcp_listener(cls, sock: socket.socket) -> abc.SocketListener:
+ return TCPSocketListener(sock)
+
+ @classmethod
+ def create_unix_listener(cls, sock: socket.socket) -> abc.SocketListener:
+ return UNIXSocketListener(sock)
+
+ @classmethod
+ async def create_udp_socket(
+ cls,
+ family: socket.AddressFamily,
+ local_address: IPSockAddrType | None,
+ remote_address: IPSockAddrType | None,
+ reuse_port: bool,
+ ) -> UDPSocket | ConnectedUDPSocket:
+ trio_socket = trio.socket.socket(family=family, type=socket.SOCK_DGRAM)
+
+ if reuse_port:
+ trio_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+
+ if local_address:
+ await trio_socket.bind(local_address)
+
+ if remote_address:
+ await trio_socket.connect(remote_address)
+ return ConnectedUDPSocket(trio_socket)
+ else:
+ return UDPSocket(trio_socket)
+
+ @classmethod
+ @overload
+ async def create_unix_datagram_socket(
+ cls, raw_socket: socket.socket, remote_path: None
+ ) -> abc.UNIXDatagramSocket:
+ ...
+
+ @classmethod
+ @overload
+ async def create_unix_datagram_socket(
+ cls, raw_socket: socket.socket, remote_path: str | bytes
+ ) -> abc.ConnectedUNIXDatagramSocket:
+ ...
+
+ @classmethod
+ async def create_unix_datagram_socket(
+ cls, raw_socket: socket.socket, remote_path: str | bytes | None
+ ) -> abc.UNIXDatagramSocket | abc.ConnectedUNIXDatagramSocket:
+ trio_socket = trio.socket.from_stdlib_socket(raw_socket)
+
+ if remote_path:
+ await trio_socket.connect(remote_path)
+ return ConnectedUNIXDatagramSocket(trio_socket)
+ else:
+ return UNIXDatagramSocket(trio_socket)
+
+ @classmethod
+ async def getaddrinfo(
+ cls,
+ host: bytes | str | None,
+ port: str | int | None,
+ *,
+ family: int | AddressFamily = 0,
+ type: int | SocketKind = 0,
+ proto: int = 0,
+ flags: int = 0,
+ ) -> list[
+ tuple[
+ AddressFamily,
+ SocketKind,
+ int,
+ str,
+ tuple[str, int] | tuple[str, int, int, int],
+ ]
+ ]:
+ return await trio.socket.getaddrinfo(host, port, family, type, proto, flags)
+
+ @classmethod
+ async def getnameinfo(
+ cls, sockaddr: IPSockAddrType, flags: int = 0
+ ) -> tuple[str, str]:
+ return await trio.socket.getnameinfo(sockaddr, flags)
+
+ @classmethod
+ async def wait_socket_readable(cls, sock: socket.socket) -> None:
+ try:
+ await wait_readable(sock)
+ except trio.ClosedResourceError as exc:
+ raise ClosedResourceError().with_traceback(exc.__traceback__) from None
+ except trio.BusyResourceError:
+ raise BusyResourceError("reading from") from None
+
+ @classmethod
+ async def wait_socket_writable(cls, sock: socket.socket) -> None:
+ try:
+ await wait_writable(sock)
+ except trio.ClosedResourceError as exc:
+ raise ClosedResourceError().with_traceback(exc.__traceback__) from None
+ except trio.BusyResourceError:
+ raise BusyResourceError("writing to") from None
+
+ @classmethod
+ def current_default_thread_limiter(cls) -> CapacityLimiter:
+ try:
+ return _capacity_limiter_wrapper.get()
+ except LookupError:
+ limiter = CapacityLimiter(
+ original=trio.to_thread.current_default_thread_limiter()
+ )
+ _capacity_limiter_wrapper.set(limiter)
+ return limiter
+
+ @classmethod
+ def open_signal_receiver(
+ cls, *signals: Signals
+ ) -> ContextManager[AsyncIterator[Signals]]:
+ return _SignalReceiver(signals)
+
+ @classmethod
+ def get_current_task(cls) -> TaskInfo:
+ task = current_task()
+
+ parent_id = None
+ if task.parent_nursery and task.parent_nursery.parent_task:
+ parent_id = id(task.parent_nursery.parent_task)
+
+ return TaskInfo(id(task), parent_id, task.name, task.coro)
+
+ @classmethod
+ def get_running_tasks(cls) -> list[TaskInfo]:
+ root_task = current_root_task()
+ assert root_task
+ task_infos = [TaskInfo(id(root_task), None, root_task.name, root_task.coro)]
+ nurseries = root_task.child_nurseries
+ while nurseries:
+ new_nurseries: list[trio.Nursery] = []
+ for nursery in nurseries:
+ for task in nursery.child_tasks:
+ task_infos.append(
+ TaskInfo(
+ id(task), id(nursery.parent_task), task.name, task.coro
+ )
+ )
+ new_nurseries.extend(task.child_nurseries)
+
+ nurseries = new_nurseries
+
+ return task_infos
+
+ @classmethod
+ async def wait_all_tasks_blocked(cls) -> None:
+ from trio.testing import wait_all_tasks_blocked
+
+ await wait_all_tasks_blocked()
+
+ @classmethod
+ def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
+ return TestRunner(**options)
+
+
+backend_class = TrioBackend