summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/anyio/abc/_eventloop.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/anyio/abc/_eventloop.py')
-rw-r--r--venv/lib/python3.11/site-packages/anyio/abc/_eventloop.py392
1 files changed, 0 insertions, 392 deletions
diff --git a/venv/lib/python3.11/site-packages/anyio/abc/_eventloop.py b/venv/lib/python3.11/site-packages/anyio/abc/_eventloop.py
deleted file mode 100644
index 4470d83..0000000
--- a/venv/lib/python3.11/site-packages/anyio/abc/_eventloop.py
+++ /dev/null
@@ -1,392 +0,0 @@
-from __future__ import annotations
-
-import math
-import sys
-from abc import ABCMeta, abstractmethod
-from collections.abc import AsyncIterator, Awaitable, Mapping
-from os import PathLike
-from signal import Signals
-from socket import AddressFamily, SocketKind, socket
-from typing import (
- IO,
- TYPE_CHECKING,
- Any,
- Callable,
- ContextManager,
- Sequence,
- TypeVar,
- overload,
-)
-
-if sys.version_info >= (3, 11):
- from typing import TypeVarTuple, Unpack
-else:
- from typing_extensions import TypeVarTuple, Unpack
-
-if TYPE_CHECKING:
- from typing import Literal
-
- from .._core._synchronization import CapacityLimiter, Event
- from .._core._tasks import CancelScope
- from .._core._testing import TaskInfo
- from ..from_thread import BlockingPortal
- from ._sockets import (
- ConnectedUDPSocket,
- ConnectedUNIXDatagramSocket,
- IPSockAddrType,
- SocketListener,
- SocketStream,
- UDPSocket,
- UNIXDatagramSocket,
- UNIXSocketStream,
- )
- from ._subprocesses import Process
- from ._tasks import TaskGroup
- from ._testing import TestRunner
-
-T_Retval = TypeVar("T_Retval")
-PosArgsT = TypeVarTuple("PosArgsT")
-
-
-class AsyncBackend(metaclass=ABCMeta):
- @classmethod
- @abstractmethod
- def run(
- cls,
- func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
- args: tuple[Unpack[PosArgsT]],
- kwargs: dict[str, Any],
- options: dict[str, Any],
- ) -> T_Retval:
- """
- Run the given coroutine function in an asynchronous event loop.
-
- The current thread must not be already running an event loop.
-
- :param func: a coroutine function
- :param args: positional arguments to ``func``
- :param kwargs: positional arguments to ``func``
- :param options: keyword arguments to call the backend ``run()`` implementation
- with
- :return: the return value of the coroutine function
- """
-
- @classmethod
- @abstractmethod
- def current_token(cls) -> object:
- """
-
- :return:
- """
-
- @classmethod
- @abstractmethod
- def current_time(cls) -> float:
- """
- Return the current value of the event loop's internal clock.
-
- :return: the clock value (seconds)
- """
-
- @classmethod
- @abstractmethod
- def cancelled_exception_class(cls) -> type[BaseException]:
- """Return the exception class that is raised in a task if it's cancelled."""
-
- @classmethod
- @abstractmethod
- async def checkpoint(cls) -> None:
- """
- Check if the task has been cancelled, and allow rescheduling of other tasks.
-
- This is effectively the same as running :meth:`checkpoint_if_cancelled` and then
- :meth:`cancel_shielded_checkpoint`.
- """
-
- @classmethod
- async def checkpoint_if_cancelled(cls) -> None:
- """
- Check if the current task group has been cancelled.
-
- This will check if the task has been cancelled, but will not allow other tasks
- to be scheduled if not.
-
- """
- if cls.current_effective_deadline() == -math.inf:
- await cls.checkpoint()
-
- @classmethod
- async def cancel_shielded_checkpoint(cls) -> None:
- """
- Allow the rescheduling of other tasks.
-
- This will give other tasks the opportunity to run, but without checking if the
- current task group has been cancelled, unlike with :meth:`checkpoint`.
-
- """
- with cls.create_cancel_scope(shield=True):
- await cls.sleep(0)
-
- @classmethod
- @abstractmethod
- async def sleep(cls, delay: float) -> None:
- """
- Pause the current task for the specified duration.
-
- :param delay: the duration, in seconds
- """
-
- @classmethod
- @abstractmethod
- def create_cancel_scope(
- cls, *, deadline: float = math.inf, shield: bool = False
- ) -> CancelScope:
- pass
-
- @classmethod
- @abstractmethod
- def current_effective_deadline(cls) -> float:
- """
- Return the nearest deadline among all the cancel scopes effective for the
- current task.
-
- :return:
- - a clock value from the event loop's internal clock
- - ``inf`` if there is no deadline in effect
- - ``-inf`` if the current scope has been cancelled
- :rtype: float
- """
-
- @classmethod
- @abstractmethod
- def create_task_group(cls) -> TaskGroup:
- pass
-
- @classmethod
- @abstractmethod
- def create_event(cls) -> Event:
- pass
-
- @classmethod
- @abstractmethod
- def create_capacity_limiter(cls, total_tokens: float) -> CapacityLimiter:
- pass
-
- @classmethod
- @abstractmethod
- async def run_sync_in_worker_thread(
- cls,
- func: Callable[[Unpack[PosArgsT]], T_Retval],
- args: tuple[Unpack[PosArgsT]],
- abandon_on_cancel: bool = False,
- limiter: CapacityLimiter | None = None,
- ) -> T_Retval:
- pass
-
- @classmethod
- @abstractmethod
- def check_cancelled(cls) -> None:
- pass
-
- @classmethod
- @abstractmethod
- def run_async_from_thread(
- cls,
- func: Callable[[Unpack[PosArgsT]], Awaitable[T_Retval]],
- args: tuple[Unpack[PosArgsT]],
- token: object,
- ) -> T_Retval:
- pass
-
- @classmethod
- @abstractmethod
- def run_sync_from_thread(
- cls,
- func: Callable[[Unpack[PosArgsT]], T_Retval],
- args: tuple[Unpack[PosArgsT]],
- token: object,
- ) -> T_Retval:
- pass
-
- @classmethod
- @abstractmethod
- def create_blocking_portal(cls) -> BlockingPortal:
- pass
-
- @classmethod
- @overload
- async def open_process(
- cls,
- command: str | bytes,
- *,
- shell: Literal[True],
- stdin: int | IO[Any] | None,
- stdout: int | IO[Any] | None,
- stderr: int | IO[Any] | None,
- cwd: str | bytes | PathLike[str] | None = None,
- env: Mapping[str, str] | None = None,
- start_new_session: bool = False,
- ) -> Process:
- pass
-
- @classmethod
- @overload
- async def open_process(
- cls,
- command: Sequence[str | bytes],
- *,
- shell: Literal[False],
- stdin: int | IO[Any] | None,
- stdout: int | IO[Any] | None,
- stderr: int | IO[Any] | None,
- cwd: str | bytes | PathLike[str] | None = None,
- env: Mapping[str, str] | None = None,
- start_new_session: bool = False,
- ) -> Process:
- pass
-
- @classmethod
- @abstractmethod
- async def open_process(
- cls,
- command: str | bytes | Sequence[str | bytes],
- *,
- shell: bool,
- stdin: int | IO[Any] | None,
- stdout: int | IO[Any] | None,
- stderr: int | IO[Any] | None,
- cwd: str | bytes | PathLike[str] | None = None,
- env: Mapping[str, str] | None = None,
- start_new_session: bool = False,
- ) -> Process:
- pass
-
- @classmethod
- @abstractmethod
- def setup_process_pool_exit_at_shutdown(cls, workers: set[Process]) -> None:
- pass
-
- @classmethod
- @abstractmethod
- async def connect_tcp(
- cls, host: str, port: int, local_address: IPSockAddrType | None = None
- ) -> SocketStream:
- pass
-
- @classmethod
- @abstractmethod
- async def connect_unix(cls, path: str | bytes) -> UNIXSocketStream:
- pass
-
- @classmethod
- @abstractmethod
- def create_tcp_listener(cls, sock: socket) -> SocketListener:
- pass
-
- @classmethod
- @abstractmethod
- def create_unix_listener(cls, sock: socket) -> SocketListener:
- pass
-
- @classmethod
- @abstractmethod
- async def create_udp_socket(
- cls,
- family: AddressFamily,
- local_address: IPSockAddrType | None,
- remote_address: IPSockAddrType | None,
- reuse_port: bool,
- ) -> UDPSocket | ConnectedUDPSocket:
- pass
-
- @classmethod
- @overload
- async def create_unix_datagram_socket(
- cls, raw_socket: socket, remote_path: None
- ) -> UNIXDatagramSocket:
- ...
-
- @classmethod
- @overload
- async def create_unix_datagram_socket(
- cls, raw_socket: socket, remote_path: str | bytes
- ) -> ConnectedUNIXDatagramSocket:
- ...
-
- @classmethod
- @abstractmethod
- async def create_unix_datagram_socket(
- cls, raw_socket: socket, remote_path: str | bytes | None
- ) -> UNIXDatagramSocket | ConnectedUNIXDatagramSocket:
- pass
-
- @classmethod
- @abstractmethod
- async def getaddrinfo(
- cls,
- host: bytes | str | None,
- port: str | int | None,
- *,
- family: int | AddressFamily = 0,
- type: int | SocketKind = 0,
- proto: int = 0,
- flags: int = 0,
- ) -> list[
- tuple[
- AddressFamily,
- SocketKind,
- int,
- str,
- tuple[str, int] | tuple[str, int, int, int],
- ]
- ]:
- pass
-
- @classmethod
- @abstractmethod
- async def getnameinfo(
- cls, sockaddr: IPSockAddrType, flags: int = 0
- ) -> tuple[str, str]:
- pass
-
- @classmethod
- @abstractmethod
- async def wait_socket_readable(cls, sock: socket) -> None:
- pass
-
- @classmethod
- @abstractmethod
- async def wait_socket_writable(cls, sock: socket) -> None:
- pass
-
- @classmethod
- @abstractmethod
- def current_default_thread_limiter(cls) -> CapacityLimiter:
- pass
-
- @classmethod
- @abstractmethod
- def open_signal_receiver(
- cls, *signals: Signals
- ) -> ContextManager[AsyncIterator[Signals]]:
- pass
-
- @classmethod
- @abstractmethod
- def get_current_task(cls) -> TaskInfo:
- pass
-
- @classmethod
- @abstractmethod
- def get_running_tasks(cls) -> list[TaskInfo]:
- pass
-
- @classmethod
- @abstractmethod
- async def wait_all_tasks_blocked(cls) -> None:
- pass
-
- @classmethod
- @abstractmethod
- def create_test_runner(cls, options: dict[str, Any]) -> TestRunner:
- pass