author     cyfraeviolae <cyfraeviolae>    2024-04-03 03:10:44 -0400
committer  cyfraeviolae <cyfraeviolae>    2024-04-03 03:10:44 -0400
commit     6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
tree       b1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/anyio/to_process.py
parent     4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)

venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/anyio/to_process.py')
-rw-r--r--  venv/lib/python3.11/site-packages/anyio/to_process.py | 259
1 file changed, 259 insertions(+), 0 deletions(-)
diff --git a/venv/lib/python3.11/site-packages/anyio/to_process.py b/venv/lib/python3.11/site-packages/anyio/to_process.py
new file mode 100644
index 0000000..1ff06f0
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/anyio/to_process.py
@@ -0,0 +1,259 @@
+from __future__ import annotations
+
+import os
+import pickle
+import subprocess
+import sys
+from collections import deque
+from collections.abc import Callable
+from importlib.util import module_from_spec, spec_from_file_location
+from typing import TypeVar, cast
+
+from ._core._eventloop import current_time, get_async_backend, get_cancelled_exc_class
+from ._core._exceptions import BrokenWorkerProcess
+from ._core._subprocesses import open_process
+from ._core._synchronization import CapacityLimiter
+from ._core._tasks import CancelScope, fail_after
+from .abc import ByteReceiveStream, ByteSendStream, Process
+from .lowlevel import RunVar, checkpoint_if_cancelled
+from .streams.buffered import BufferedByteReceiveStream
+
+if sys.version_info >= (3, 11):
+ from typing import TypeVarTuple, Unpack
+else:
+ from typing_extensions import TypeVarTuple, Unpack
+
+WORKER_MAX_IDLE_TIME = 300 # 5 minutes
+
+T_Retval = TypeVar("T_Retval")
+PosArgsT = TypeVarTuple("PosArgsT")
+
+_process_pool_workers: RunVar[set[Process]] = RunVar("_process_pool_workers")
+_process_pool_idle_workers: RunVar[deque[tuple[Process, float]]] = RunVar(
+ "_process_pool_idle_workers"
+)
+_default_process_limiter: RunVar[CapacityLimiter] = RunVar("_default_process_limiter")
+
+
+async def run_sync(
+ func: Callable[[Unpack[PosArgsT]], T_Retval],
+ *args: Unpack[PosArgsT],
+ cancellable: bool = False,
+ limiter: CapacityLimiter | None = None,
+) -> T_Retval:
+ """
+ Call the given function with the given arguments in a worker process.
+
+ If the ``cancellable`` option is enabled and the task waiting for its completion is
+ cancelled, the worker process running it will be abruptly terminated using SIGKILL
+    (or ``TerminateProcess()`` on Windows).
+
+ :param func: a callable
+ :param args: positional arguments for the callable
+ :param cancellable: ``True`` to allow cancellation of the operation while it's
+ running
+    :param limiter: capacity limiter to use to limit the total number of worker
+        processes running (if omitted, the default limiter is used)
+    :return: the return value of ``func``
+
+ """
+
+ async def send_raw_command(pickled_cmd: bytes) -> object:
+ try:
+ await stdin.send(pickled_cmd)
+ response = await buffered.receive_until(b"\n", 50)
+ status, length = response.split(b" ")
+ if status not in (b"RETURN", b"EXCEPTION"):
+ raise RuntimeError(
+ f"Worker process returned unexpected response: {response!r}"
+ )
+
+ pickled_response = await buffered.receive_exactly(int(length))
+ except BaseException as exc:
+ workers.discard(process)
+ try:
+ process.kill()
+ with CancelScope(shield=True):
+ await process.aclose()
+ except ProcessLookupError:
+ pass
+
+ if isinstance(exc, get_cancelled_exc_class()):
+ raise
+ else:
+ raise BrokenWorkerProcess from exc
+
+ retval = pickle.loads(pickled_response)
+ if status == b"EXCEPTION":
+ assert isinstance(retval, BaseException)
+ raise retval
+ else:
+ return retval
+
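+    # Framing note (inferred from send_raw_command above): every worker reply
+    # is a status line such as b"RETURN 23\n" (a status token and a payload
+    # length), followed by exactly that many bytes of pickled data.
+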
+ # First pickle the request before trying to reserve a worker process
+ await checkpoint_if_cancelled()
+ request = pickle.dumps(("run", func, args), protocol=pickle.HIGHEST_PROTOCOL)
+
+ # If this is the first run in this event loop thread, set up the necessary variables
+ try:
+ workers = _process_pool_workers.get()
+ idle_workers = _process_pool_idle_workers.get()
+ except LookupError:
+ workers = set()
+ idle_workers = deque()
+ _process_pool_workers.set(workers)
+ _process_pool_idle_workers.set(idle_workers)
+ get_async_backend().setup_process_pool_exit_at_shutdown(workers)
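+        # RunVar values are local to the current event loop, so each loop gets
+        # its own worker pool; the call above makes the backend close these
+        # workers when the event loop shuts down.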
+
+ async with limiter or current_default_process_limiter():
+ # Pop processes from the pool (starting from the most recently used) until we
+ # find one that hasn't exited yet
+ process: Process
+ while idle_workers:
+ process, idle_since = idle_workers.pop()
+ if process.returncode is None:
+ stdin = cast(ByteSendStream, process.stdin)
+ buffered = BufferedByteReceiveStream(
+ cast(ByteReceiveStream, process.stdout)
+ )
+
+ # Prune any other workers that have been idle for WORKER_MAX_IDLE_TIME
+ # seconds or longer
+ now = current_time()
+ killed_processes: list[Process] = []
+ while idle_workers:
+ if now - idle_workers[0][1] < WORKER_MAX_IDLE_TIME:
+ break
+
+ process_to_kill, idle_since = idle_workers.popleft()
+ process_to_kill.kill()
+ workers.remove(process_to_kill)
+ killed_processes.append(process_to_kill)
+
+ with CancelScope(shield=True):
+ for killed_process in killed_processes:
+ await killed_process.aclose()
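+                # idle_workers keeps its oldest entries on the left: pop()
+                # above took the most recently used worker, and popleft()
+                # pruned the stalest ones.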
+
+ break
+
+ workers.remove(process)
+ else:
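+            # No reusable idle worker: spawn a fresh interpreter running this
+            # module; -m makes it hit the process_worker() entry point below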
+ command = [sys.executable, "-u", "-m", __name__]
+ process = await open_process(
+ command, stdin=subprocess.PIPE, stdout=subprocess.PIPE
+ )
+ try:
+ stdin = cast(ByteSendStream, process.stdin)
+ buffered = BufferedByteReceiveStream(
+ cast(ByteReceiveStream, process.stdout)
+ )
+ with fail_after(20):
+ message = await buffered.receive(6)
+
+ if message != b"READY\n":
+ raise BrokenWorkerProcess(
+ f"Worker process returned unexpected response: {message!r}"
+ )
+
+ main_module_path = getattr(sys.modules["__main__"], "__file__", None)
+ pickled = pickle.dumps(
+ ("init", sys.path, main_module_path),
+ protocol=pickle.HIGHEST_PROTOCOL,
+ )
+ await send_raw_command(pickled)
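+                # "init" ships the parent's sys.path and main module path so
+                # that the worker can unpickle functions defined in __main__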
+ except (BrokenWorkerProcess, get_cancelled_exc_class()):
+ raise
+ except BaseException as exc:
+ process.kill()
+ raise BrokenWorkerProcess(
+ "Error during worker process initialization"
+ ) from exc
+
+ workers.add(process)
+
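+    # Unless the call is cancellable, shield it so that cancelling the caller
+    # cannot abandon a worker mid-request; when cancellable, send_raw_command()
+    # kills the worker on cancellation instead.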
+ with CancelScope(shield=not cancellable):
+ try:
+ return cast(T_Retval, await send_raw_command(request))
+ finally:
+ if process in workers:
+ idle_workers.append((process, current_time()))
+
+
+def current_default_process_limiter() -> CapacityLimiter:
+ """
+ Return the capacity limiter that is used by default to limit the number of worker
+ processes.
+
+ :return: a capacity limiter object
+
+ """
+ try:
+ return _default_process_limiter.get()
+ except LookupError:
+ limiter = CapacityLimiter(os.cpu_count() or 2)
+ _default_process_limiter.set(limiter)
+ return limiter
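+
+
+# A sketch of passing a custom limiter (illustrative; ``heavy_task`` is a
+# hypothetical user function):
+#
+#     from anyio import CapacityLimiter, to_process
+#
+#     async def main() -> None:
+#         limiter = CapacityLimiter(2)  # at most two worker processes
+#         await to_process.run_sync(heavy_task, limiter=limiter)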
+
+
+def process_worker() -> None:
+    # Redirect the standard streams to os.devnull so that user code won't
+    # interfere with parent-worker communication
+ stdin = sys.stdin
+ stdout = sys.stdout
+ sys.stdin = open(os.devnull)
+ sys.stdout = open(os.devnull, "w")
+
+ stdout.buffer.write(b"READY\n")
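+    # The parent waits for these exact six bytes (with a 20-second timeout)
+    # before sending any commands; see the handshake in run_sync() above.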
+ while True:
+ retval = exception = None
+ try:
+ command, *args = pickle.load(stdin.buffer)
+ except EOFError:
+ return
+ except BaseException as exc:
+ exception = exc
+ else:
+ if command == "run":
+ func, args = args
+ try:
+ retval = func(*args)
+ except BaseException as exc:
+ exception = exc
+ elif command == "init":
+ main_module_path: str | None
+ sys.path, main_module_path = args
+ del sys.modules["__main__"]
+ if main_module_path:
+ # Load the parent's main module but as __mp_main__ instead of
+ # __main__ (like multiprocessing does) to avoid infinite recursion
+ try:
+ spec = spec_from_file_location("__mp_main__", main_module_path)
+ if spec and spec.loader:
+ main = module_from_spec(spec)
+ spec.loader.exec_module(main)
+ sys.modules["__main__"] = main
+ except BaseException as exc:
+ exception = exc
+
+ try:
+ if exception is not None:
+ status = b"EXCEPTION"
+ pickled = pickle.dumps(exception, pickle.HIGHEST_PROTOCOL)
+ else:
+ status = b"RETURN"
+ pickled = pickle.dumps(retval, pickle.HIGHEST_PROTOCOL)
+ except BaseException as exc:
+ exception = exc
+ status = b"EXCEPTION"
+ pickled = pickle.dumps(exc, pickle.HIGHEST_PROTOCOL)
+
+ stdout.buffer.write(b"%s %d\n" % (status, len(pickled)))
+ stdout.buffer.write(pickled)
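+        # The worker runs with -u, so stdout's binary buffer is unbuffered and
+        # the response needs no explicit flush.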
+
+ # Respect SIGTERM
+ if isinstance(exception, SystemExit):
+ raise exception
+
+
+if __name__ == "__main__":
+ process_worker()