diff options
| author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:17:55 -0400 | 
|---|---|---|
| committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:17:55 -0400 | 
| commit | 12cf076118570eebbff08c6b3090e0d4798447a1 (patch) | |
| tree | 3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/anyio/to_process.py | |
| parent | c45662ff3923b34614ddcc8feb9195541166dcc5 (diff) | |
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/anyio/to_process.py')
| -rw-r--r-- | venv/lib/python3.11/site-packages/anyio/to_process.py | 259 | 
1 files changed, 0 insertions, 259 deletions
| diff --git a/venv/lib/python3.11/site-packages/anyio/to_process.py b/venv/lib/python3.11/site-packages/anyio/to_process.py deleted file mode 100644 index 1ff06f0..0000000 --- a/venv/lib/python3.11/site-packages/anyio/to_process.py +++ /dev/null @@ -1,259 +0,0 @@ -from __future__ import annotations - -import os -import pickle -import subprocess -import sys -from collections import deque -from collections.abc import Callable -from importlib.util import module_from_spec, spec_from_file_location -from typing import TypeVar, cast - -from ._core._eventloop import current_time, get_async_backend, get_cancelled_exc_class -from ._core._exceptions import BrokenWorkerProcess -from ._core._subprocesses import open_process -from ._core._synchronization import CapacityLimiter -from ._core._tasks import CancelScope, fail_after -from .abc import ByteReceiveStream, ByteSendStream, Process -from .lowlevel import RunVar, checkpoint_if_cancelled -from .streams.buffered import BufferedByteReceiveStream - -if sys.version_info >= (3, 11): -    from typing import TypeVarTuple, Unpack -else: -    from typing_extensions import TypeVarTuple, Unpack - -WORKER_MAX_IDLE_TIME = 300  # 5 minutes - -T_Retval = TypeVar("T_Retval") -PosArgsT = TypeVarTuple("PosArgsT") - -_process_pool_workers: RunVar[set[Process]] = RunVar("_process_pool_workers") -_process_pool_idle_workers: RunVar[deque[tuple[Process, float]]] = RunVar( -    "_process_pool_idle_workers" -) -_default_process_limiter: RunVar[CapacityLimiter] = RunVar("_default_process_limiter") - - -async def run_sync( -    func: Callable[[Unpack[PosArgsT]], T_Retval], -    *args: Unpack[PosArgsT], -    cancellable: bool = False, -    limiter: CapacityLimiter | None = None, -) -> T_Retval: -    """ -    Call the given function with the given arguments in a worker process. - -    If the ``cancellable`` option is enabled and the task waiting for its completion is -    cancelled, the worker process running it will be abruptly terminated using SIGKILL -    (or ``terminateProcess()`` on Windows). - -    :param func: a callable -    :param args: positional arguments for the callable -    :param cancellable: ``True`` to allow cancellation of the operation while it's -        running -    :param limiter: capacity limiter to use to limit the total amount of processes -        running (if omitted, the default limiter is used) -    :return: an awaitable that yields the return value of the function. - -    """ - -    async def send_raw_command(pickled_cmd: bytes) -> object: -        try: -            await stdin.send(pickled_cmd) -            response = await buffered.receive_until(b"\n", 50) -            status, length = response.split(b" ") -            if status not in (b"RETURN", b"EXCEPTION"): -                raise RuntimeError( -                    f"Worker process returned unexpected response: {response!r}" -                ) - -            pickled_response = await buffered.receive_exactly(int(length)) -        except BaseException as exc: -            workers.discard(process) -            try: -                process.kill() -                with CancelScope(shield=True): -                    await process.aclose() -            except ProcessLookupError: -                pass - -            if isinstance(exc, get_cancelled_exc_class()): -                raise -            else: -                raise BrokenWorkerProcess from exc - -        retval = pickle.loads(pickled_response) -        if status == b"EXCEPTION": -            assert isinstance(retval, BaseException) -            raise retval -        else: -            return retval - -    # First pickle the request before trying to reserve a worker process -    await checkpoint_if_cancelled() -    request = pickle.dumps(("run", func, args), protocol=pickle.HIGHEST_PROTOCOL) - -    # If this is the first run in this event loop thread, set up the necessary variables -    try: -        workers = _process_pool_workers.get() -        idle_workers = _process_pool_idle_workers.get() -    except LookupError: -        workers = set() -        idle_workers = deque() -        _process_pool_workers.set(workers) -        _process_pool_idle_workers.set(idle_workers) -        get_async_backend().setup_process_pool_exit_at_shutdown(workers) - -    async with limiter or current_default_process_limiter(): -        # Pop processes from the pool (starting from the most recently used) until we -        # find one that hasn't exited yet -        process: Process -        while idle_workers: -            process, idle_since = idle_workers.pop() -            if process.returncode is None: -                stdin = cast(ByteSendStream, process.stdin) -                buffered = BufferedByteReceiveStream( -                    cast(ByteReceiveStream, process.stdout) -                ) - -                # Prune any other workers that have been idle for WORKER_MAX_IDLE_TIME -                # seconds or longer -                now = current_time() -                killed_processes: list[Process] = [] -                while idle_workers: -                    if now - idle_workers[0][1] < WORKER_MAX_IDLE_TIME: -                        break - -                    process_to_kill, idle_since = idle_workers.popleft() -                    process_to_kill.kill() -                    workers.remove(process_to_kill) -                    killed_processes.append(process_to_kill) - -                with CancelScope(shield=True): -                    for killed_process in killed_processes: -                        await killed_process.aclose() - -                break - -            workers.remove(process) -        else: -            command = [sys.executable, "-u", "-m", __name__] -            process = await open_process( -                command, stdin=subprocess.PIPE, stdout=subprocess.PIPE -            ) -            try: -                stdin = cast(ByteSendStream, process.stdin) -                buffered = BufferedByteReceiveStream( -                    cast(ByteReceiveStream, process.stdout) -                ) -                with fail_after(20): -                    message = await buffered.receive(6) - -                if message != b"READY\n": -                    raise BrokenWorkerProcess( -                        f"Worker process returned unexpected response: {message!r}" -                    ) - -                main_module_path = getattr(sys.modules["__main__"], "__file__", None) -                pickled = pickle.dumps( -                    ("init", sys.path, main_module_path), -                    protocol=pickle.HIGHEST_PROTOCOL, -                ) -                await send_raw_command(pickled) -            except (BrokenWorkerProcess, get_cancelled_exc_class()): -                raise -            except BaseException as exc: -                process.kill() -                raise BrokenWorkerProcess( -                    "Error during worker process initialization" -                ) from exc - -            workers.add(process) - -        with CancelScope(shield=not cancellable): -            try: -                return cast(T_Retval, await send_raw_command(request)) -            finally: -                if process in workers: -                    idle_workers.append((process, current_time())) - - -def current_default_process_limiter() -> CapacityLimiter: -    """ -    Return the capacity limiter that is used by default to limit the number of worker -    processes. - -    :return: a capacity limiter object - -    """ -    try: -        return _default_process_limiter.get() -    except LookupError: -        limiter = CapacityLimiter(os.cpu_count() or 2) -        _default_process_limiter.set(limiter) -        return limiter - - -def process_worker() -> None: -    # Redirect standard streams to os.devnull so that user code won't interfere with the -    # parent-worker communication -    stdin = sys.stdin -    stdout = sys.stdout -    sys.stdin = open(os.devnull) -    sys.stdout = open(os.devnull, "w") - -    stdout.buffer.write(b"READY\n") -    while True: -        retval = exception = None -        try: -            command, *args = pickle.load(stdin.buffer) -        except EOFError: -            return -        except BaseException as exc: -            exception = exc -        else: -            if command == "run": -                func, args = args -                try: -                    retval = func(*args) -                except BaseException as exc: -                    exception = exc -            elif command == "init": -                main_module_path: str | None -                sys.path, main_module_path = args -                del sys.modules["__main__"] -                if main_module_path: -                    # Load the parent's main module but as __mp_main__ instead of -                    # __main__ (like multiprocessing does) to avoid infinite recursion -                    try: -                        spec = spec_from_file_location("__mp_main__", main_module_path) -                        if spec and spec.loader: -                            main = module_from_spec(spec) -                            spec.loader.exec_module(main) -                            sys.modules["__main__"] = main -                    except BaseException as exc: -                        exception = exc - -        try: -            if exception is not None: -                status = b"EXCEPTION" -                pickled = pickle.dumps(exception, pickle.HIGHEST_PROTOCOL) -            else: -                status = b"RETURN" -                pickled = pickle.dumps(retval, pickle.HIGHEST_PROTOCOL) -        except BaseException as exc: -            exception = exc -            status = b"EXCEPTION" -            pickled = pickle.dumps(exc, pickle.HIGHEST_PROTOCOL) - -        stdout.buffer.write(b"%s %d\n" % (status, len(pickled))) -        stdout.buffer.write(pickled) - -        # Respect SIGTERM -        if isinstance(exception, SystemExit): -            raise exception - - -if __name__ == "__main__": -    process_worker() | 
