summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/anyio/abc/_eventloop.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/anyio/abc/_eventloop.py')
-rw-r--r--venv/lib/python3.11/site-packages/anyio/abc/_eventloop.py392
1 files changed, 392 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/anyio/abc/_eventloop.py b/venv/lib/python3.11/site-packages/anyio/abc/_eventloop.py
new file mode 100644
index 0000000..4470d83
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/anyio/abc/_eventloop.py
@@ -0,0 +1,392 @@
+from __future__ import annotations
+
+import math
+import sys
+from abc import ABCMeta, abstractmethod
+from collections.abc import AsyncIterator, Awaitable, Mapping
+from os import PathLike
+from signal import Signals
+from socket import AddressFamily, SocketKind, socket
+from typing import (
+ IO,
+ TYPE_CHECKING,
+ Any,
+ Callable,
+ ContextManager,
+ Sequence,
+ TypeVar,
+ overload,
+)
+
+if sys.version_info >= (3, 11):
+ from typing import TypeVarTuple, Unpack
+else:
+ from typing_extensions import TypeVarTuple, Unpack
+
+if TYPE_CHECKING:
+ from typing import Literal
+
+ from .._core._synchronization import CapacityLimiter, Event
+ from .._core._tasks import CancelScope
+ from .._core._testing import TaskInfo
+ from ..from_thread import BlockingPortal
+ from ._sockets import (
+ ConnectedUDPSocket,
+ ConnectedUNIXDatagramSocket,
+ IPSockAddrType,
+ SocketListener,
+ SocketStream,
+ UDPSocket,
+ UNIXDatagramSocket,
+ UNIXSocketStream,
+ )
+ from ._subprocesses import Process
+ from ._tasks import TaskGroup
+ from ._testing import TestRunner
+
+T_Retval = TypeVar("T_Retval")
+PosArgsT = TypeVarTuple("PosArgsT")
+
+
+class AsyncBackend(metaclass=ABCMeta):
+ @classmethod
+ @abstractmethod
+ def run(
+ cls,
+ func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
+ args: tuple[Unpack[PosArgsT]],
+ kwargs: dict[str, Any],
+ options: dict[str, Any],
+ ) -> T_Retval:
+ """
+ Run the given coroutine function in an asynchronous event loop.
+
+ The current thread must not be already running an event loop.
+
+ :param func: a coroutine function
+ :param args: positional arguments to ``func``
+ :param kwargs: positional arguments to ``func``
+ :param options: keyword arguments to call the backend ``run()`` implementation
+ with
+ :return: the return value of the coroutine function
+ """
+
+ @classmethod
+ @abstractmethod
+ def current_token(cls) -> object:
+ """
+
+ :return:
+ """
+
+ @classmethod
+ @abstractmethod
+ def current_time(cls) -> float:
+ """
+ Return the current value of the event loop's internal clock.
+
+ :return: the clock value (seconds)
+ """
+
+ @classmethod
+ @abstractmethod
+ def cancelled_exception_class(cls) -> type[BaseException]:
+ """Return the exception class that is raised in a task if it's cancelled."""
+
+ @classmethod
+ @abstractmethod
+ async def checkpoint(cls) -> None:
+ """
+ Check if the task has been cancelled, and allow rescheduling of other tasks.
+
+ This is effectively the same as running :meth:`checkpoint_if_cancelled` and then
+ :meth:`cancel_shielded_checkpoint`.
+ """
+
+ @classmethod
+ async def checkpoint_if_cancelled(cls) -> None:
+ """
+ Check if the current task group has been cancelled.
+
+ This will check if the task has been cancelled, but will not allow other tasks
+ to be scheduled if not.
+
+ """
+ if cls.current_effective_deadline() == -math.inf:
+ await cls.checkpoint()
+
+ @classmethod
+ async def cancel_shielded_checkpoint(cls) -> None:
+ """
+ Allow the rescheduling of other tasks.
+
+ This will give other tasks the opportunity to run, but without checking if the
+ current task group has been cancelled, unlike with :meth:`checkpoint`.
+
+ """
+ with cls.create_cancel_scope(shield=True):
+ await cls.sleep(0)
+
+ @classmethod
+ @abstractmethod
+ async def sleep(cls, delay: float) -> None:
+ """
+ Pause the current task for the specified duration.
+
+ :param delay: the duration, in seconds
+ """
+
+ @classmethod
+ @abstractmethod
+ def create_cancel_scope(
+ cls, *, deadline: float = math.inf, shield: bool = False
+ ) -> CancelScope:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def current_effective_deadline(cls) -> float:
+ """
+ Return the nearest deadline among all the cancel scopes effective for the
+ current task.
+
+ :return:
+ - a clock value from the event loop's internal clock
+ - ``inf`` if there is no deadline in effect
+ - ``-inf`` if the current scope has been cancelled
+ :rtype: float
+ """
+
+ @classmethod
+ @abstractmethod
+ def create_task_group(cls) -> TaskGroup:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def create_event(cls) -> Event:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
+ pass
+
+ @classmethod
+ @abstractmethod
+ async def run_sync_in_worker_thread(
+ cls,
+ func: Callable[[Unpack[PosArgsT]], T_Retval],
+ args: tuple[Unpack[PosArgsT]],
+ abandon_on_cancel: bool = False,
+ limiter: CapacityLimiter | None = None,
+ ) -> T_Retval:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def check_cancelled(cls) -> None:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def run_async_from_thread(
+ cls,
+ func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
+ args: tuple[Unpack[PosArgsT]],
+ token: object,
+ ) -> T_Retval:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def run_sync_from_thread(
+ cls,
+ func: Callable[[Unpack[PosArgsT]], T_Retval],
+ args: tuple[Unpack[PosArgsT]],
+ token: object,
+ ) -> T_Retval:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def create_blocking_portal(cls) -> BlockingPortal:
+ pass
+
+ @classmethod
+ @overload
+ async def open_process(
+ cls,
+ command: str | bytes,
+ *,
+ shell: Literal[True],
+ stdin: int | IO[Any] | None,
+ stdout: int | IO[Any] | None,
+ stderr: int | IO[Any] | None,
+ cwd: str | bytes | PathLike[str] | None = None,
+ env: Mapping[str, str] | None = None,
+ start_new_session: bool = False,
+ ) -> Process:
+ pass
+
+ @classmethod
+ @overload
+ async def open_process(
+ cls,
+ command: Sequence[str | bytes],
+ *,
+ shell: Literal[False],
+ stdin: int | IO[Any] | None,
+ stdout: int | IO[Any] | None,
+ stderr: int | IO[Any] | None,
+ cwd: str | bytes | PathLike[str] | None = None,
+ env: Mapping[str, str] | None = None,
+ start_new_session: bool = False,
+ ) -> Process:
+ pass
+
+ @classmethod
+ @abstractmethod
+ async def open_process(
+ cls,
+ command: str | bytes | Sequence[str | bytes],
+ *,
+ shell: bool,
+ stdin: int | IO[Any] | None,
+ stdout: int | IO[Any] | None,
+ stderr: int | IO[Any] | None,
+ cwd: str | bytes | PathLike[str] | None = None,
+ env: Mapping[str, str] | None = None,
+ start_new_session: bool = False,
+ ) -> Process:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None:
+ pass
+
+ @classmethod
+ @abstractmethod
+ async def connect_tcp(
+ cls, host: str, port: int, local_address: IPSockAddrType | None = None
+ ) -> SocketStream:
+ pass
+
+ @classmethod
+ @abstractmethod
+ async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def create_tcp_listener(cls, sock: socket) -> SocketListener:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def create_unix_listener(cls, sock: socket) -> SocketListener:
+ pass
+
+ @classmethod
+ @abstractmethod
+ async def create_udp_socket(
+ cls,
+ family: AddressFamily,
+ local_address: IPSockAddrType | None,
+ remote_address: IPSockAddrType | None,
+ reuse_port: bool,
+ ) -> UDPSocket | ConnectedUDPSocket:
+ pass
+
+ @classmethod
+ @overload
+ async def create_unix_datagram_socket(
+ cls, raw_socket: socket, remote_path: None
+ ) -> UNIXDatagramSocket:
+ ...
+
+ @classmethod
+ @overload
+ async def create_unix_datagram_socket(
+ cls, raw_socket: socket, remote_path: str | bytes
+ ) -> ConnectedUNIXDatagramSocket:
+ ...
+
+ @classmethod
+ @abstractmethod
+ async def create_unix_datagram_socket(
+ cls, raw_socket: socket, remote_path: str | bytes | None
+ ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket:
+ pass
+
+ @classmethod
+ @abstractmethod
+ async def getaddrinfo(
+ cls,
+ host: bytes | str | None,
+ port: str | int | None,
+ *,
+ family: int | AddressFamily = 0,
+ type: int | SocketKind = 0,
+ proto: int = 0,
+ flags: int = 0,
+ ) -> list[
+ tuple[
+ AddressFamily,
+ SocketKind,
+ int,
+ str,
+ tuple[str, int] | tuple[str, int, int, int],
+ ]
+ ]:
+ pass
+
+ @classmethod
+ @abstractmethod
+ async def getnameinfo(
+ cls, sockaddr: IPSockAddrType, flags: int = 0
+ ) -> tuple[str, str]:
+ pass
+
+ @classmethod
+ @abstractmethod
+ async def wait_socket_readable(cls, sock: socket) -> None:
+ pass
+
+ @classmethod
+ @abstractmethod
+ async def wait_socket_writable(cls, sock: socket) -> None:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def current_default_thread_limiter(cls) -> CapacityLimiter:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def open_signal_receiver(
+ cls, *signals: Signals
+ ) -> ContextManager[AsyncIterator[Signals]]:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def get_current_task(cls) -> TaskInfo:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def get_running_tasks(cls) -> list[TaskInfo]:
+ pass
+
+ @classmethod
+ @abstractmethod
+ async def wait_all_tasks_blocked(cls) -> None:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
+ pass