summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/anyio/to_process.py
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
commit12cf076118570eebbff08c6b3090e0d4798447a1 (patch)
tree3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/anyio/to_process.py
parentc45662ff3923b34614ddcc8feb9195541166dcc5 (diff)
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/anyio/to_process.py')
-rw-r--r--venv/lib/python3.11/site-packages/anyio/to_process.py259
1 files changed, 0 insertions, 259 deletions
diff --git a/venv/lib/python3.11/site-packages/anyio/to_process.py b/venv/lib/python3.11/site-packages/anyio/to_process.py
deleted file mode 100644
index 1ff06f0..0000000
--- a/venv/lib/python3.11/site-packages/anyio/to_process.py
+++ /dev/null
@@ -1,259 +0,0 @@
-from __future__ import annotations
-
-import os
-import pickle
-import subprocess
-import sys
-from collections import deque
-from collections.abc import Callable
-from importlib.util import module_from_spec, spec_from_file_location
-from typing import TypeVar, cast
-
-from ._core._eventloop import current_time, get_async_backend, get_cancelled_exc_class
-from ._core._exceptions import BrokenWorkerProcess
-from ._core._subprocesses import open_process
-from ._core._synchronization import CapacityLimiter
-from ._core._tasks import CancelScope, fail_after
-from .abc import ByteReceiveStream, ByteSendStream, Process
-from .lowlevel import RunVar, checkpoint_if_cancelled
-from .streams.buffered import BufferedByteReceiveStream
-
-if sys.version_info >= (3, 11):
- from typing import TypeVarTuple, Unpack
-else:
- from typing_extensions import TypeVarTuple, Unpack
-
-WORKER_MAX_IDLE_TIME = 300 # 5 minutes
-
-T_Retval = TypeVar("T_Retval")
-PosArgsT = TypeVarTuple("PosArgsT")
-
-_process_pool_workers: RunVar[set[Process]] = RunVar("_process_pool_workers")
-_process_pool_idle_workers: RunVar[deque[tuple[Process, float]]] = RunVar(
- "_process_pool_idle_workers"
-)
-_default_process_limiter: RunVar[CapacityLimiter] = RunVar("_default_process_limiter")
-
-
-async def run_sync(
- func: Callable[[Unpack[PosArgsT]], T_Retval],
- *args: Unpack[PosArgsT],
- cancellable: bool = False,
- limiter: CapacityLimiter | None = None,
-) -> T_Retval:
- """
- Call the given function with the given arguments in a worker process.
-
- If the ``cancellable`` option is enabled and the task waiting for its completion is
- cancelled, the worker process running it will be abruptly terminated using SIGKILL
- (or ``terminateProcess()`` on Windows).
-
- :param func: a callable
- :param args: positional arguments for the callable
- :param cancellable: ``True`` to allow cancellation of the operation while it's
- running
- :param limiter: capacity limiter to use to limit the total amount of processes
- running (if omitted, the default limiter is used)
- :return: an awaitable that yields the return value of the function.
-
- """
-
- async def send_raw_command(pickled_cmd: bytes) -> object:
- try:
- await stdin.send(pickled_cmd)
- response = await buffered.receive_until(b"\n", 50)
- status, length = response.split(b" ")
- if status not in (b"RETURN", b"EXCEPTION"):
- raise RuntimeError(
- f"Worker process returned unexpected response: {response!r}"
- )
-
- pickled_response = await buffered.receive_exactly(int(length))
- except BaseException as exc:
- workers.discard(process)
- try:
- process.kill()
- with CancelScope(shield=True):
- await process.aclose()
- except ProcessLookupError:
- pass
-
- if isinstance(exc, get_cancelled_exc_class()):
- raise
- else:
- raise BrokenWorkerProcess from exc
-
- retval = pickle.loads(pickled_response)
- if status == b"EXCEPTION":
- assert isinstance(retval, BaseException)
- raise retval
- else:
- return retval
-
- # First pickle the request before trying to reserve a worker process
- await checkpoint_if_cancelled()
- request = pickle.dumps(("run", func, args), protocol=pickle.HIGHEST_PROTOCOL)
-
- # If this is the first run in this event loop thread, set up the necessary variables
- try:
- workers = _process_pool_workers.get()
- idle_workers = _process_pool_idle_workers.get()
- except LookupError:
- workers = set()
- idle_workers = deque()
- _process_pool_workers.set(workers)
- _process_pool_idle_workers.set(idle_workers)
- get_async_backend().setup_process_pool_exit_at_shutdown(workers)
-
- async with limiter or current_default_process_limiter():
- # Pop processes from the pool (starting from the most recently used) until we
- # find one that hasn't exited yet
- process: Process
- while idle_workers:
- process, idle_since = idle_workers.pop()
- if process.returncode is None:
- stdin = cast(ByteSendStream, process.stdin)
- buffered = BufferedByteReceiveStream(
- cast(ByteReceiveStream, process.stdout)
- )
-
- # Prune any other workers that have been idle for WORKER_MAX_IDLE_TIME
- # seconds or longer
- now = current_time()
- killed_processes: list[Process] = []
- while idle_workers:
- if now - idle_workers[0][1] < WORKER_MAX_IDLE_TIME:
- break
-
- process_to_kill, idle_since = idle_workers.popleft()
- process_to_kill.kill()
- workers.remove(process_to_kill)
- killed_processes.append(process_to_kill)
-
- with CancelScope(shield=True):
- for killed_process in killed_processes:
- await killed_process.aclose()
-
- break
-
- workers.remove(process)
- else:
- command = [sys.executable, "-u", "-m", __name__]
- process = await open_process(
- command, stdin=subprocess.PIPE, stdout=subprocess.PIPE
- )
- try:
- stdin = cast(ByteSendStream, process.stdin)
- buffered = BufferedByteReceiveStream(
- cast(ByteReceiveStream, process.stdout)
- )
- with fail_after(20):
- message = await buffered.receive(6)
-
- if message != b"READY\n":
- raise BrokenWorkerProcess(
- f"Worker process returned unexpected response: {message!r}"
- )
-
- main_module_path = getattr(sys.modules["__main__"], "__file__", None)
- pickled = pickle.dumps(
- ("init", sys.path, main_module_path),
- protocol=pickle.HIGHEST_PROTOCOL,
- )
- await send_raw_command(pickled)
- except (BrokenWorkerProcess, get_cancelled_exc_class()):
- raise
- except BaseException as exc:
- process.kill()
- raise BrokenWorkerProcess(
- "Error during worker process initialization"
- ) from exc
-
- workers.add(process)
-
- with CancelScope(shield=not cancellable):
- try:
- return cast(T_Retval, await send_raw_command(request))
- finally:
- if process in workers:
- idle_workers.append((process, current_time()))
-
-
-def current_default_process_limiter() -> CapacityLimiter:
- """
- Return the capacity limiter that is used by default to limit the number of worker
- processes.
-
- :return: a capacity limiter object
-
- """
- try:
- return _default_process_limiter.get()
- except LookupError:
- limiter = CapacityLimiter(os.cpu_count() or 2)
- _default_process_limiter.set(limiter)
- return limiter
-
-
-def process_worker() -> None:
- # Redirect standard streams to os.devnull so that user code won't interfere with the
- # parent-worker communication
- stdin = sys.stdin
- stdout = sys.stdout
- sys.stdin = open(os.devnull)
- sys.stdout = open(os.devnull, "w")
-
- stdout.buffer.write(b"READY\n")
- while True:
- retval = exception = None
- try:
- command, *args = pickle.load(stdin.buffer)
- except EOFError:
- return
- except BaseException as exc:
- exception = exc
- else:
- if command == "run":
- func, args = args
- try:
- retval = func(*args)
- except BaseException as exc:
- exception = exc
- elif command == "init":
- main_module_path: str | None
- sys.path, main_module_path = args
- del sys.modules["__main__"]
- if main_module_path:
- # Load the parent's main module but as __mp_main__ instead of
- # __main__ (like multiprocessing does) to avoid infinite recursion
- try:
- spec = spec_from_file_location("__mp_main__", main_module_path)
- if spec and spec.loader:
- main = module_from_spec(spec)
- spec.loader.exec_module(main)
- sys.modules["__main__"] = main
- except BaseException as exc:
- exception = exc
-
- try:
- if exception is not None:
- status = b"EXCEPTION"
- pickled = pickle.dumps(exception, pickle.HIGHEST_PROTOCOL)
- else:
- status = b"RETURN"
- pickled = pickle.dumps(retval, pickle.HIGHEST_PROTOCOL)
- except BaseException as exc:
- exception = exc
- status = b"EXCEPTION"
- pickled = pickle.dumps(exc, pickle.HIGHEST_PROTOCOL)
-
- stdout.buffer.write(b"%s %d\n" % (status, len(pickled)))
- stdout.buffer.write(pickled)
-
- # Respect SIGTERM
- if isinstance(exception, SystemExit):
- raise exception
-
-
-if __name__ == "__main__":
- process_worker()