summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/uvloop/handles/process.pyx')
-rw-r--r--venv/lib/python3.11/site-packages/uvloop/handles/process.pyx792
1 files changed, 0 insertions, 792 deletions
diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx
deleted file mode 100644
index 63b982a..0000000
--- a/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx
+++ /dev/null
@@ -1,792 +0,0 @@
-@cython.no_gc_clear
-cdef class UVProcess(UVHandle):
- """Abstract class; wrapper over uv_process_t handle."""
-
- def __cinit__(self):
- self.uv_opt_env = NULL
- self.uv_opt_args = NULL
- self._returncode = None
- self._pid = None
- self._fds_to_close = list()
- self._preexec_fn = None
- self._restore_signals = True
- self.context = Context_CopyCurrent()
-
- cdef _close_process_handle(self):
- # XXX: This is a workaround for a libuv bug:
- # - https://github.com/libuv/libuv/issues/1933
- # - https://github.com/libuv/libuv/pull/551
- if self._handle is NULL:
- return
- self._handle.data = NULL
- uv.uv_close(self._handle, __uv_close_process_handle_cb)
- self._handle = NULL # close callback will free() the memory
-
- cdef _init(self, Loop loop, list args, dict env,
- cwd, start_new_session,
- _stdin, _stdout, _stderr, # std* can be defined as macros in C
- pass_fds, debug_flags, preexec_fn, restore_signals):
-
- global __forking
- global __forking_loop
- global __forkHandler
-
- cdef int err
-
- self._start_init(loop)
-
- self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(
- sizeof(uv.uv_process_t))
- if self._handle is NULL:
- self._abort_init()
- raise MemoryError()
-
- # Too early to call _finish_init, but still a lot of work to do.
- # Let's set handle.data to NULL, so in case something goes wrong,
- # callbacks have a chance to avoid casting *something* into UVHandle.
- self._handle.data = NULL
-
- force_fork = False
- if system.PLATFORM_IS_APPLE and not (
- preexec_fn is None
- and not pass_fds
- ):
- # see _execute_child() in CPython/subprocess.py
- force_fork = True
-
- try:
- self._init_options(args, env, cwd, start_new_session,
- _stdin, _stdout, _stderr, force_fork)
-
- restore_inheritable = set()
- if pass_fds:
- for fd in pass_fds:
- if not os_get_inheritable(fd):
- restore_inheritable.add(fd)
- os_set_inheritable(fd, True)
- except Exception:
- self._abort_init()
- raise
-
- if __forking or loop.active_process_handler is not None:
- # Our pthread_atfork handlers won't work correctly when
- # another loop is forking in another thread (even though
- # GIL should help us to avoid that.)
- self._abort_init()
- raise RuntimeError(
- 'Racing with another loop to spawn a process.')
-
- self._errpipe_read, self._errpipe_write = os_pipe()
- fds_to_close = self._fds_to_close
- self._fds_to_close = None
- fds_to_close.append(self._errpipe_read)
- # add the write pipe last so we can close it early
- fds_to_close.append(self._errpipe_write)
- try:
- os_set_inheritable(self._errpipe_write, True)
-
- self._preexec_fn = preexec_fn
- self._restore_signals = restore_signals
-
- loop.active_process_handler = self
- __forking = 1
- __forking_loop = loop
- system.setForkHandler(<system.OnForkHandler>&__get_fork_handler)
-
- PyOS_BeforeFork()
-
- err = uv.uv_spawn(loop.uvloop,
- <uv.uv_process_t*>self._handle,
- &self.options)
-
- __forking = 0
- __forking_loop = None
- system.resetForkHandler()
- loop.active_process_handler = None
-
- PyOS_AfterFork_Parent()
-
- if err < 0:
- self._close_process_handle()
- self._abort_init()
- raise convert_error(err)
-
- self._finish_init()
-
- # close the write pipe early
- os_close(fds_to_close.pop())
-
- if preexec_fn is not None:
- errpipe_data = bytearray()
- while True:
- # XXX: This is a blocking code that has to be
- # rewritten (using loop.connect_read_pipe() or
- # otherwise.)
- part = os_read(self._errpipe_read, 50000)
- errpipe_data += part
- if not part or len(errpipe_data) > 50000:
- break
-
- finally:
- while fds_to_close:
- os_close(fds_to_close.pop())
-
- for fd in restore_inheritable:
- os_set_inheritable(fd, False)
-
- # asyncio caches the PID in BaseSubprocessTransport,
- # so that the transport knows what the PID was even
- # after the process is finished.
- self._pid = (<uv.uv_process_t*>self._handle).pid
-
- # Track the process handle (create a strong ref to it)
- # to guarantee that __dealloc__ doesn't happen in an
- # uncontrolled fashion. We want to wait until the process
- # exits and libuv calls __uvprocess_on_exit_callback,
- # which will call `UVProcess._close()`, which will, in turn,
- # untrack this handle.
- self._loop._track_process(self)
-
- if debug_flags & __PROCESS_DEBUG_SLEEP_AFTER_FORK:
- time_sleep(1)
-
- if preexec_fn is not None and errpipe_data:
- # preexec_fn has raised an exception. The child
- # process must be dead now.
- try:
- exc_name, exc_msg = errpipe_data.split(b':', 1)
- exc_name = exc_name.decode()
- exc_msg = exc_msg.decode()
- except Exception:
- self._close()
- raise subprocess_SubprocessError(
- 'Bad exception data from child: {!r}'.format(
- errpipe_data))
- exc_cls = getattr(__builtins__, exc_name,
- subprocess_SubprocessError)
-
- exc = subprocess_SubprocessError(
- 'Exception occurred in preexec_fn.')
- exc.__cause__ = exc_cls(exc_msg)
- self._close()
- raise exc
-
- cdef _after_fork(self):
- # See CPython/_posixsubprocess.c for details
- cdef int err
-
- if self._restore_signals:
- _Py_RestoreSignals()
-
- PyOS_AfterFork_Child()
-
- err = uv.uv_loop_fork(self._loop.uvloop)
- if err < 0:
- raise convert_error(err)
-
- if self._preexec_fn is not None:
- try:
- gc_disable()
- self._preexec_fn()
- except BaseException as ex:
- try:
- with open(self._errpipe_write, 'wb') as f:
- f.write(str(ex.__class__.__name__).encode())
- f.write(b':')
- f.write(str(ex.args[0]).encode())
- finally:
- system._exit(255)
- return
- else:
- os_close(self._errpipe_write)
- else:
- os_close(self._errpipe_write)
-
- cdef _close_after_spawn(self, int fd):
- if self._fds_to_close is None:
- raise RuntimeError(
- 'UVProcess._close_after_spawn called after uv_spawn')
- self._fds_to_close.append(fd)
-
- def __dealloc__(self):
- if self.uv_opt_env is not NULL:
- PyMem_RawFree(self.uv_opt_env)
- self.uv_opt_env = NULL
-
- if self.uv_opt_args is not NULL:
- PyMem_RawFree(self.uv_opt_args)
- self.uv_opt_args = NULL
-
- cdef char** __to_cstring_array(self, list arr):
- cdef:
- int i
- ssize_t arr_len = len(arr)
- bytes el
-
- char **ret
-
- ret = <char **>PyMem_RawMalloc((arr_len + 1) * sizeof(char *))
- if ret is NULL:
- raise MemoryError()
-
- for i in range(arr_len):
- el = arr[i]
- # NB: PyBytes_AsString doesn't copy the data;
- # we have to be careful when the "arr" is GCed,
- # and it shouldn't be ever mutated.
- ret[i] = PyBytes_AsString(el)
-
- ret[arr_len] = NULL
- return ret
-
- cdef _init_options(self, list args, dict env, cwd, start_new_session,
- _stdin, _stdout, _stderr, bint force_fork):
-
- memset(&self.options, 0, sizeof(uv.uv_process_options_t))
-
- self._init_env(env)
- self.options.env = self.uv_opt_env
-
- self._init_args(args)
- self.options.file = self.uv_opt_file
- self.options.args = self.uv_opt_args
-
- if start_new_session:
- self.options.flags |= uv.UV_PROCESS_DETACHED
-
- if force_fork:
- # This is a hack to work around the change in libuv 1.44:
- # > macos: use posix_spawn instead of fork
- # where Python subprocess options like preexec_fn are
- # crippled. CPython only uses posix_spawn under a pretty
- # strict list of conditions (see subprocess.py), and falls
- # back to using fork() otherwise. We'd like to simulate such
- # behavior with libuv, but unfortunately libuv doesn't
- # provide explicit API to choose such implementation detail.
- # Based on current (libuv 1.46) behavior, setting
- # UV_PROCESS_SETUID or UV_PROCESS_SETGID would reliably make
- # libuv fallback to use fork, so let's just use it for now.
- self.options.flags |= uv.UV_PROCESS_SETUID
- self.options.uid = uv.getuid()
-
- if cwd is not None:
- cwd = os_fspath(cwd)
-
- if isinstance(cwd, str):
- cwd = PyUnicode_EncodeFSDefault(cwd)
- if not isinstance(cwd, bytes):
- raise ValueError('cwd must be a str or bytes object')
-
- self.__cwd = cwd
- self.options.cwd = PyBytes_AsString(self.__cwd)
-
- self.options.exit_cb = &__uvprocess_on_exit_callback
-
- self._init_files(_stdin, _stdout, _stderr)
-
- cdef _init_args(self, list args):
- cdef:
- bytes path
- int an = len(args)
-
- if an < 1:
- raise ValueError('cannot spawn a process: args are empty')
-
- self.__args = args.copy()
- for i in range(an):
- arg = os_fspath(args[i])
- if isinstance(arg, str):
- self.__args[i] = PyUnicode_EncodeFSDefault(arg)
- elif not isinstance(arg, bytes):
- raise TypeError('all args must be str or bytes')
-
- path = self.__args[0]
- self.uv_opt_file = PyBytes_AsString(path)
- self.uv_opt_args = self.__to_cstring_array(self.__args)
-
- cdef _init_env(self, dict env):
- if env is not None:
- self.__env = list()
- for key in env:
- val = env[key]
-
- if isinstance(key, str):
- key = PyUnicode_EncodeFSDefault(key)
- elif not isinstance(key, bytes):
- raise TypeError(
- 'all environment vars must be bytes or str')
-
- if isinstance(val, str):
- val = PyUnicode_EncodeFSDefault(val)
- elif not isinstance(val, bytes):
- raise TypeError(
- 'all environment values must be bytes or str')
-
- self.__env.append(key + b'=' + val)
-
- self.uv_opt_env = self.__to_cstring_array(self.__env)
- else:
- self.__env = None
-
- cdef _init_files(self, _stdin, _stdout, _stderr):
- self.options.stdio_count = 0
-
- cdef _kill(self, int signum):
- cdef int err
- self._ensure_alive()
- err = uv.uv_process_kill(<uv.uv_process_t*>self._handle, signum)
- if err < 0:
- raise convert_error(err)
-
- cdef _on_exit(self, int64_t exit_status, int term_signal):
- if term_signal:
- # From Python docs:
- # A negative value -N indicates that the child was
- # terminated by signal N (POSIX only).
- self._returncode = -term_signal
- else:
- self._returncode = exit_status
-
- self._close()
-
- cdef _close(self):
- try:
- if self._loop is not None:
- self._loop._untrack_process(self)
- finally:
- UVHandle._close(self)
-
-
-DEF _CALL_PIPE_DATA_RECEIVED = 0
-DEF _CALL_PIPE_CONNECTION_LOST = 1
-DEF _CALL_PROCESS_EXITED = 2
-DEF _CALL_CONNECTION_LOST = 3
-
-
-@cython.no_gc_clear
-cdef class UVProcessTransport(UVProcess):
- def __cinit__(self):
- self._exit_waiters = []
- self._protocol = None
-
- self._init_futs = []
- self._pending_calls = []
- self._stdio_ready = 0
-
- self._stdin = self._stdout = self._stderr = None
- self.stdin_proto = self.stdout_proto = self.stderr_proto = None
-
- self._finished = 0
-
- cdef _on_exit(self, int64_t exit_status, int term_signal):
- UVProcess._on_exit(self, exit_status, term_signal)
-
- if self._stdio_ready:
- self._loop.call_soon(self._protocol.process_exited,
- context=self.context)
- else:
- self._pending_calls.append((_CALL_PROCESS_EXITED, None, None))
-
- self._try_finish()
-
- for waiter in self._exit_waiters:
- if not waiter.cancelled():
- waiter.set_result(self._returncode)
- self._exit_waiters.clear()
-
- self._close()
-
- cdef _check_proc(self):
- if not self._is_alive() or self._returncode is not None:
- raise ProcessLookupError()
-
- cdef _pipe_connection_lost(self, int fd, exc):
- if self._stdio_ready:
- self._loop.call_soon(self._protocol.pipe_connection_lost, fd, exc,
- context=self.context)
- self._try_finish()
- else:
- self._pending_calls.append((_CALL_PIPE_CONNECTION_LOST, fd, exc))
-
- cdef _pipe_data_received(self, int fd, data):
- if self._stdio_ready:
- self._loop.call_soon(self._protocol.pipe_data_received, fd, data,
- context=self.context)
- else:
- self._pending_calls.append((_CALL_PIPE_DATA_RECEIVED, fd, data))
-
- cdef _file_redirect_stdio(self, int fd):
- fd = os_dup(fd)
- os_set_inheritable(fd, True)
- self._close_after_spawn(fd)
- return fd
-
- cdef _file_devnull(self):
- dn = os_open(os_devnull, os_O_RDWR)
- os_set_inheritable(dn, True)
- self._close_after_spawn(dn)
- return dn
-
- cdef _file_outpipe(self):
- r, w = __socketpair()
- os_set_inheritable(w, True)
- self._close_after_spawn(w)
- return r, w
-
- cdef _file_inpipe(self):
- r, w = __socketpair()
- os_set_inheritable(r, True)
- self._close_after_spawn(r)
- return r, w
-
- cdef _init_files(self, _stdin, _stdout, _stderr):
- cdef uv.uv_stdio_container_t *iocnt
-
- UVProcess._init_files(self, _stdin, _stdout, _stderr)
-
- io = [None, None, None]
-
- self.options.stdio_count = 3
- self.options.stdio = self.iocnt
-
- if _stdin is not None:
- if _stdin == subprocess_PIPE:
- r, w = self._file_inpipe()
- io[0] = r
-
- self.stdin_proto = WriteSubprocessPipeProto(self, 0)
- waiter = self._loop._new_future()
- self._stdin = WriteUnixTransport.new(
- self._loop, self.stdin_proto, None, waiter)
- self._init_futs.append(waiter)
- self._stdin._open(w)
- self._stdin._init_protocol()
- elif _stdin == subprocess_DEVNULL:
- io[0] = self._file_devnull()
- elif _stdout == subprocess_STDOUT:
- raise ValueError(
- 'subprocess.STDOUT is supported only by stderr parameter')
- else:
- io[0] = self._file_redirect_stdio(_stdin)
- else:
- io[0] = self._file_redirect_stdio(0)
-
- if _stdout is not None:
- if _stdout == subprocess_PIPE:
- # We can't use UV_CREATE_PIPE here, since 'stderr' might be
- # set to 'subprocess.STDOUT', and there is no way to
- # emulate that functionality with libuv high-level
- # streams API. Therefore, we create pipes for stdout and
- # stderr manually.
-
- r, w = self._file_outpipe()
- io[1] = w
-
- self.stdout_proto = ReadSubprocessPipeProto(self, 1)
- waiter = self._loop._new_future()
- self._stdout = ReadUnixTransport.new(
- self._loop, self.stdout_proto, None, waiter)
- self._init_futs.append(waiter)
- self._stdout._open(r)
- self._stdout._init_protocol()
- elif _stdout == subprocess_DEVNULL:
- io[1] = self._file_devnull()
- elif _stdout == subprocess_STDOUT:
- raise ValueError(
- 'subprocess.STDOUT is supported only by stderr parameter')
- else:
- io[1] = self._file_redirect_stdio(_stdout)
- else:
- io[1] = self._file_redirect_stdio(1)
-
- if _stderr is not None:
- if _stderr == subprocess_PIPE:
- r, w = self._file_outpipe()
- io[2] = w
-
- self.stderr_proto = ReadSubprocessPipeProto(self, 2)
- waiter = self._loop._new_future()
- self._stderr = ReadUnixTransport.new(
- self._loop, self.stderr_proto, None, waiter)
- self._init_futs.append(waiter)
- self._stderr._open(r)
- self._stderr._init_protocol()
- elif _stderr == subprocess_STDOUT:
- if io[1] is None:
- # shouldn't ever happen
- raise RuntimeError('cannot apply subprocess.STDOUT')
-
- io[2] = self._file_redirect_stdio(io[1])
- elif _stderr == subprocess_DEVNULL:
- io[2] = self._file_devnull()
- else:
- io[2] = self._file_redirect_stdio(_stderr)
- else:
- io[2] = self._file_redirect_stdio(2)
-
- assert len(io) == 3
- for idx in range(3):
- iocnt = &self.iocnt[idx]
- if io[idx] is not None:
- iocnt.flags = uv.UV_INHERIT_FD
- iocnt.data.fd = io[idx]
- else:
- iocnt.flags = uv.UV_IGNORE
-
- cdef _call_connection_made(self, waiter):
- try:
- # we're always called in the right context, so just call the user's
- self._protocol.connection_made(self)
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException as ex:
- if waiter is not None and not waiter.cancelled():
- waiter.set_exception(ex)
- else:
- raise
- else:
- if waiter is not None and not waiter.cancelled():
- waiter.set_result(True)
-
- self._stdio_ready = 1
- if self._pending_calls:
- pending_calls = self._pending_calls.copy()
- self._pending_calls.clear()
- for (type, fd, arg) in pending_calls:
- if type == _CALL_PIPE_CONNECTION_LOST:
- self._pipe_connection_lost(fd, arg)
- elif type == _CALL_PIPE_DATA_RECEIVED:
- self._pipe_data_received(fd, arg)
- elif type == _CALL_PROCESS_EXITED:
- self._loop.call_soon(self._protocol.process_exited)
- elif type == _CALL_CONNECTION_LOST:
- self._loop.call_soon(self._protocol.connection_lost, None)
-
- cdef _try_finish(self):
- if self._returncode is None or self._finished:
- return
-
- if ((self.stdin_proto is None or self.stdin_proto.disconnected) and
- (self.stdout_proto is None or
- self.stdout_proto.disconnected) and
- (self.stderr_proto is None or
- self.stderr_proto.disconnected)):
-
- self._finished = 1
-
- if self._stdio_ready:
- # copy self.context for simplicity
- self._loop.call_soon(self._protocol.connection_lost, None,
- context=self.context)
- else:
- self._pending_calls.append((_CALL_CONNECTION_LOST, None, None))
-
- def __stdio_inited(self, waiter, stdio_fut):
- exc = stdio_fut.exception()
- if exc is not None:
- if waiter is None:
- raise exc
- else:
- waiter.set_exception(exc)
- else:
- self._loop._call_soon_handle(
- new_MethodHandle1(self._loop,
- "UVProcessTransport._call_connection_made",
- <method1_t>self._call_connection_made,
- None, # means to copy the current context
- self, waiter))
-
- @staticmethod
- cdef UVProcessTransport new(Loop loop, protocol, args, env,
- cwd, start_new_session,
- _stdin, _stdout, _stderr, pass_fds,
- waiter,
- debug_flags,
- preexec_fn,
- restore_signals):
-
- cdef UVProcessTransport handle
- handle = UVProcessTransport.__new__(UVProcessTransport)
- handle._protocol = protocol
- handle._init(loop, args, env, cwd, start_new_session,
- __process_convert_fileno(_stdin),
- __process_convert_fileno(_stdout),
- __process_convert_fileno(_stderr),
- pass_fds,
- debug_flags,
- preexec_fn,
- restore_signals)
-
- if handle._init_futs:
- handle._stdio_ready = 0
- init_fut = aio_gather(*handle._init_futs)
- # add_done_callback will copy the current context and run the
- # callback within the context
- init_fut.add_done_callback(
- ft_partial(handle.__stdio_inited, waiter))
- else:
- handle._stdio_ready = 1
- loop._call_soon_handle(
- new_MethodHandle1(loop,
- "UVProcessTransport._call_connection_made",
- <method1_t>handle._call_connection_made,
- None, # means to copy the current context
- handle, waiter))
-
- return handle
-
- def get_protocol(self):
- return self._protocol
-
- def set_protocol(self, protocol):
- self._protocol = protocol
-
- def get_pid(self):
- return self._pid
-
- def get_returncode(self):
- return self._returncode
-
- def get_pipe_transport(self, fd):
- if fd == 0:
- return self._stdin
- elif fd == 1:
- return self._stdout
- elif fd == 2:
- return self._stderr
-
- def terminate(self):
- self._check_proc()
- self._kill(uv.SIGTERM)
-
- def kill(self):
- self._check_proc()
- self._kill(uv.SIGKILL)
-
- def send_signal(self, int signal):
- self._check_proc()
- self._kill(signal)
-
- def is_closing(self):
- return self._closed
-
- def close(self):
- if self._returncode is None:
- self._kill(uv.SIGKILL)
-
- if self._stdin is not None:
- self._stdin.close()
- if self._stdout is not None:
- self._stdout.close()
- if self._stderr is not None:
- self._stderr.close()
-
- if self._returncode is not None:
- # The process is dead, just close the UV handle.
- #
- # (If "self._returncode is None", the process should have been
- # killed already and we're just waiting for a SIGCHLD; after
- # which the transport will be GC'ed and the uvhandle will be
- # closed in UVHandle.__dealloc__.)
- self._close()
-
- def get_extra_info(self, name, default=None):
- return default
-
- def _wait(self):
- fut = self._loop._new_future()
- if self._returncode is not None:
- fut.set_result(self._returncode)
- return fut
-
- self._exit_waiters.append(fut)
- return fut
-
-
-class WriteSubprocessPipeProto(aio_BaseProtocol):
-
- def __init__(self, proc, fd):
- if UVLOOP_DEBUG:
- if type(proc) is not UVProcessTransport:
- raise TypeError
- if not isinstance(fd, int):
- raise TypeError
- self.proc = proc
- self.fd = fd
- self.pipe = None
- self.disconnected = False
-
- def connection_made(self, transport):
- self.pipe = transport
-
- def __repr__(self):
- return ('<%s fd=%s pipe=%r>'
- % (self.__class__.__name__, self.fd, self.pipe))
-
- def connection_lost(self, exc):
- self.disconnected = True
- (<UVProcessTransport>self.proc)._pipe_connection_lost(self.fd, exc)
- self.proc = None
-
- def pause_writing(self):
- (<UVProcessTransport>self.proc)._protocol.pause_writing()
-
- def resume_writing(self):
- (<UVProcessTransport>self.proc)._protocol.resume_writing()
-
-
-class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
- aio_Protocol):
-
- def data_received(self, data):
- (<UVProcessTransport>self.proc)._pipe_data_received(self.fd, data)
-
-
-cdef __process_convert_fileno(object obj):
- if obj is None or isinstance(obj, int):
- return obj
-
- fileno = obj.fileno()
- if not isinstance(fileno, int):
- raise TypeError(
- '{!r}.fileno() returned non-integer'.format(obj))
- return fileno
-
-
-cdef void __uvprocess_on_exit_callback(
- uv.uv_process_t *handle,
- int64_t exit_status,
- int term_signal,
-) noexcept with gil:
-
- if __ensure_handle_data(<uv.uv_handle_t*>handle,
- "UVProcess exit callback") == 0:
- return
-
- cdef UVProcess proc = <UVProcess> handle.data
- try:
- proc._on_exit(exit_status, term_signal)
- except BaseException as ex:
- proc._error(ex, False)
-
-
-cdef __socketpair():
- cdef:
- int fds[2]
- int err
-
- err = system.socketpair(uv.AF_UNIX, uv.SOCK_STREAM, 0, fds)
- if err:
- exc = convert_error(-err)
- raise exc
-
- os_set_inheritable(fds[0], False)
- os_set_inheritable(fds[1], False)
-
- return fds[0], fds[1]
-
-
-cdef void __uv_close_process_handle_cb(
- uv.uv_handle_t* handle
-) noexcept with gil:
- PyMem_RawFree(handle)