diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/uvloop/handles/process.pyx')
-rw-r--r-- | venv/lib/python3.11/site-packages/uvloop/handles/process.pyx | 792 |
1 files changed, 792 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx new file mode 100644 index 0000000..63b982a --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx @@ -0,0 +1,792 @@ +@cython.no_gc_clear +cdef class UVProcess(UVHandle): + """Abstract class; wrapper over uv_process_t handle.""" + + def __cinit__(self): + self.uv_opt_env = NULL + self.uv_opt_args = NULL + self._returncode = None + self._pid = None + self._fds_to_close = list() + self._preexec_fn = None + self._restore_signals = True + self.context = Context_CopyCurrent() + + cdef _close_process_handle(self): + # XXX: This is a workaround for a libuv bug: + # - https://github.com/libuv/libuv/issues/1933 + # - https://github.com/libuv/libuv/pull/551 + if self._handle is NULL: + return + self._handle.data = NULL + uv.uv_close(self._handle, __uv_close_process_handle_cb) + self._handle = NULL # close callback will free() the memory + + cdef _init(self, Loop loop, list args, dict env, + cwd, start_new_session, + _stdin, _stdout, _stderr, # std* can be defined as macros in C + pass_fds, debug_flags, preexec_fn, restore_signals): + + global __forking + global __forking_loop + global __forkHandler + + cdef int err + + self._start_init(loop) + + self._handle = <uv.uv_handle_t*>PyMem_RawMalloc( + sizeof(uv.uv_process_t)) + if self._handle is NULL: + self._abort_init() + raise MemoryError() + + # Too early to call _finish_init, but still a lot of work to do. + # Let's set handle.data to NULL, so in case something goes wrong, + # callbacks have a chance to avoid casting *something* into UVHandle. + self._handle.data = NULL + + force_fork = False + if system.PLATFORM_IS_APPLE and not ( + preexec_fn is None + and not pass_fds + ): + # see _execute_child() in CPython/subprocess.py + force_fork = True + + try: + self._init_options(args, env, cwd, start_new_session, + _stdin, _stdout, _stderr, force_fork) + + restore_inheritable = set() + if pass_fds: + for fd in pass_fds: + if not os_get_inheritable(fd): + restore_inheritable.add(fd) + os_set_inheritable(fd, True) + except Exception: + self._abort_init() + raise + + if __forking or loop.active_process_handler is not None: + # Our pthread_atfork handlers won't work correctly when + # another loop is forking in another thread (even though + # GIL should help us to avoid that.) + self._abort_init() + raise RuntimeError( + 'Racing with another loop to spawn a process.') + + self._errpipe_read, self._errpipe_write = os_pipe() + fds_to_close = self._fds_to_close + self._fds_to_close = None + fds_to_close.append(self._errpipe_read) + # add the write pipe last so we can close it early + fds_to_close.append(self._errpipe_write) + try: + os_set_inheritable(self._errpipe_write, True) + + self._preexec_fn = preexec_fn + self._restore_signals = restore_signals + + loop.active_process_handler = self + __forking = 1 + __forking_loop = loop + system.setForkHandler(<system.OnForkHandler>&__get_fork_handler) + + PyOS_BeforeFork() + + err = uv.uv_spawn(loop.uvloop, + <uv.uv_process_t*>self._handle, + &self.options) + + __forking = 0 + __forking_loop = None + system.resetForkHandler() + loop.active_process_handler = None + + PyOS_AfterFork_Parent() + + if err < 0: + self._close_process_handle() + self._abort_init() + raise convert_error(err) + + self._finish_init() + + # close the write pipe early + os_close(fds_to_close.pop()) + + if preexec_fn is not None: + errpipe_data = bytearray() + while True: + # XXX: This is a blocking code that has to be + # rewritten (using loop.connect_read_pipe() or + # otherwise.) + part = os_read(self._errpipe_read, 50000) + errpipe_data += part + if not part or len(errpipe_data) > 50000: + break + + finally: + while fds_to_close: + os_close(fds_to_close.pop()) + + for fd in restore_inheritable: + os_set_inheritable(fd, False) + + # asyncio caches the PID in BaseSubprocessTransport, + # so that the transport knows what the PID was even + # after the process is finished. + self._pid = (<uv.uv_process_t*>self._handle).pid + + # Track the process handle (create a strong ref to it) + # to guarantee that __dealloc__ doesn't happen in an + # uncontrolled fashion. We want to wait until the process + # exits and libuv calls __uvprocess_on_exit_callback, + # which will call `UVProcess._close()`, which will, in turn, + # untrack this handle. + self._loop._track_process(self) + + if debug_flags & __PROCESS_DEBUG_SLEEP_AFTER_FORK: + time_sleep(1) + + if preexec_fn is not None and errpipe_data: + # preexec_fn has raised an exception. The child + # process must be dead now. + try: + exc_name, exc_msg = errpipe_data.split(b':', 1) + exc_name = exc_name.decode() + exc_msg = exc_msg.decode() + except Exception: + self._close() + raise subprocess_SubprocessError( + 'Bad exception data from child: {!r}'.format( + errpipe_data)) + exc_cls = getattr(__builtins__, exc_name, + subprocess_SubprocessError) + + exc = subprocess_SubprocessError( + 'Exception occurred in preexec_fn.') + exc.__cause__ = exc_cls(exc_msg) + self._close() + raise exc + + cdef _after_fork(self): + # See CPython/_posixsubprocess.c for details + cdef int err + + if self._restore_signals: + _Py_RestoreSignals() + + PyOS_AfterFork_Child() + + err = uv.uv_loop_fork(self._loop.uvloop) + if err < 0: + raise convert_error(err) + + if self._preexec_fn is not None: + try: + gc_disable() + self._preexec_fn() + except BaseException as ex: + try: + with open(self._errpipe_write, 'wb') as f: + f.write(str(ex.__class__.__name__).encode()) + f.write(b':') + f.write(str(ex.args[0]).encode()) + finally: + system._exit(255) + return + else: + os_close(self._errpipe_write) + else: + os_close(self._errpipe_write) + + cdef _close_after_spawn(self, int fd): + if self._fds_to_close is None: + raise RuntimeError( + 'UVProcess._close_after_spawn called after uv_spawn') + self._fds_to_close.append(fd) + + def __dealloc__(self): + if self.uv_opt_env is not NULL: + PyMem_RawFree(self.uv_opt_env) + self.uv_opt_env = NULL + + if self.uv_opt_args is not NULL: + PyMem_RawFree(self.uv_opt_args) + self.uv_opt_args = NULL + + cdef char** __to_cstring_array(self, list arr): + cdef: + int i + ssize_t arr_len = len(arr) + bytes el + + char **ret + + ret = <char **>PyMem_RawMalloc((arr_len + 1) * sizeof(char *)) + if ret is NULL: + raise MemoryError() + + for i in range(arr_len): + el = arr[i] + # NB: PyBytes_AsString doesn't copy the data; + # we have to be careful when the "arr" is GCed, + # and it shouldn't be ever mutated. + ret[i] = PyBytes_AsString(el) + + ret[arr_len] = NULL + return ret + + cdef _init_options(self, list args, dict env, cwd, start_new_session, + _stdin, _stdout, _stderr, bint force_fork): + + memset(&self.options, 0, sizeof(uv.uv_process_options_t)) + + self._init_env(env) + self.options.env = self.uv_opt_env + + self._init_args(args) + self.options.file = self.uv_opt_file + self.options.args = self.uv_opt_args + + if start_new_session: + self.options.flags |= uv.UV_PROCESS_DETACHED + + if force_fork: + # This is a hack to work around the change in libuv 1.44: + # > macos: use posix_spawn instead of fork + # where Python subprocess options like preexec_fn are + # crippled. CPython only uses posix_spawn under a pretty + # strict list of conditions (see subprocess.py), and falls + # back to using fork() otherwise. We'd like to simulate such + # behavior with libuv, but unfortunately libuv doesn't + # provide explicit API to choose such implementation detail. + # Based on current (libuv 1.46) behavior, setting + # UV_PROCESS_SETUID or UV_PROCESS_SETGID would reliably make + # libuv fallback to use fork, so let's just use it for now. + self.options.flags |= uv.UV_PROCESS_SETUID + self.options.uid = uv.getuid() + + if cwd is not None: + cwd = os_fspath(cwd) + + if isinstance(cwd, str): + cwd = PyUnicode_EncodeFSDefault(cwd) + if not isinstance(cwd, bytes): + raise ValueError('cwd must be a str or bytes object') + + self.__cwd = cwd + self.options.cwd = PyBytes_AsString(self.__cwd) + + self.options.exit_cb = &__uvprocess_on_exit_callback + + self._init_files(_stdin, _stdout, _stderr) + + cdef _init_args(self, list args): + cdef: + bytes path + int an = len(args) + + if an < 1: + raise ValueError('cannot spawn a process: args are empty') + + self.__args = args.copy() + for i in range(an): + arg = os_fspath(args[i]) + if isinstance(arg, str): + self.__args[i] = PyUnicode_EncodeFSDefault(arg) + elif not isinstance(arg, bytes): + raise TypeError('all args must be str or bytes') + + path = self.__args[0] + self.uv_opt_file = PyBytes_AsString(path) + self.uv_opt_args = self.__to_cstring_array(self.__args) + + cdef _init_env(self, dict env): + if env is not None: + self.__env = list() + for key in env: + val = env[key] + + if isinstance(key, str): + key = PyUnicode_EncodeFSDefault(key) + elif not isinstance(key, bytes): + raise TypeError( + 'all environment vars must be bytes or str') + + if isinstance(val, str): + val = PyUnicode_EncodeFSDefault(val) + elif not isinstance(val, bytes): + raise TypeError( + 'all environment values must be bytes or str') + + self.__env.append(key + b'=' + val) + + self.uv_opt_env = self.__to_cstring_array(self.__env) + else: + self.__env = None + + cdef _init_files(self, _stdin, _stdout, _stderr): + self.options.stdio_count = 0 + + cdef _kill(self, int signum): + cdef int err + self._ensure_alive() + err = uv.uv_process_kill(<uv.uv_process_t*>self._handle, signum) + if err < 0: + raise convert_error(err) + + cdef _on_exit(self, int64_t exit_status, int term_signal): + if term_signal: + # From Python docs: + # A negative value -N indicates that the child was + # terminated by signal N (POSIX only). + self._returncode = -term_signal + else: + self._returncode = exit_status + + self._close() + + cdef _close(self): + try: + if self._loop is not None: + self._loop._untrack_process(self) + finally: + UVHandle._close(self) + + +DEF _CALL_PIPE_DATA_RECEIVED = 0 +DEF _CALL_PIPE_CONNECTION_LOST = 1 +DEF _CALL_PROCESS_EXITED = 2 +DEF _CALL_CONNECTION_LOST = 3 + + +@cython.no_gc_clear +cdef class UVProcessTransport(UVProcess): + def __cinit__(self): + self._exit_waiters = [] + self._protocol = None + + self._init_futs = [] + self._pending_calls = [] + self._stdio_ready = 0 + + self._stdin = self._stdout = self._stderr = None + self.stdin_proto = self.stdout_proto = self.stderr_proto = None + + self._finished = 0 + + cdef _on_exit(self, int64_t exit_status, int term_signal): + UVProcess._on_exit(self, exit_status, term_signal) + + if self._stdio_ready: + self._loop.call_soon(self._protocol.process_exited, + context=self.context) + else: + self._pending_calls.append((_CALL_PROCESS_EXITED, None, None)) + + self._try_finish() + + for waiter in self._exit_waiters: + if not waiter.cancelled(): + waiter.set_result(self._returncode) + self._exit_waiters.clear() + + self._close() + + cdef _check_proc(self): + if not self._is_alive() or self._returncode is not None: + raise ProcessLookupError() + + cdef _pipe_connection_lost(self, int fd, exc): + if self._stdio_ready: + self._loop.call_soon(self._protocol.pipe_connection_lost, fd, exc, + context=self.context) + self._try_finish() + else: + self._pending_calls.append((_CALL_PIPE_CONNECTION_LOST, fd, exc)) + + cdef _pipe_data_received(self, int fd, data): + if self._stdio_ready: + self._loop.call_soon(self._protocol.pipe_data_received, fd, data, + context=self.context) + else: + self._pending_calls.append((_CALL_PIPE_DATA_RECEIVED, fd, data)) + + cdef _file_redirect_stdio(self, int fd): + fd = os_dup(fd) + os_set_inheritable(fd, True) + self._close_after_spawn(fd) + return fd + + cdef _file_devnull(self): + dn = os_open(os_devnull, os_O_RDWR) + os_set_inheritable(dn, True) + self._close_after_spawn(dn) + return dn + + cdef _file_outpipe(self): + r, w = __socketpair() + os_set_inheritable(w, True) + self._close_after_spawn(w) + return r, w + + cdef _file_inpipe(self): + r, w = __socketpair() + os_set_inheritable(r, True) + self._close_after_spawn(r) + return r, w + + cdef _init_files(self, _stdin, _stdout, _stderr): + cdef uv.uv_stdio_container_t *iocnt + + UVProcess._init_files(self, _stdin, _stdout, _stderr) + + io = [None, None, None] + + self.options.stdio_count = 3 + self.options.stdio = self.iocnt + + if _stdin is not None: + if _stdin == subprocess_PIPE: + r, w = self._file_inpipe() + io[0] = r + + self.stdin_proto = WriteSubprocessPipeProto(self, 0) + waiter = self._loop._new_future() + self._stdin = WriteUnixTransport.new( + self._loop, self.stdin_proto, None, waiter) + self._init_futs.append(waiter) + self._stdin._open(w) + self._stdin._init_protocol() + elif _stdin == subprocess_DEVNULL: + io[0] = self._file_devnull() + elif _stdout == subprocess_STDOUT: + raise ValueError( + 'subprocess.STDOUT is supported only by stderr parameter') + else: + io[0] = self._file_redirect_stdio(_stdin) + else: + io[0] = self._file_redirect_stdio(0) + + if _stdout is not None: + if _stdout == subprocess_PIPE: + # We can't use UV_CREATE_PIPE here, since 'stderr' might be + # set to 'subprocess.STDOUT', and there is no way to + # emulate that functionality with libuv high-level + # streams API. Therefore, we create pipes for stdout and + # stderr manually. + + r, w = self._file_outpipe() + io[1] = w + + self.stdout_proto = ReadSubprocessPipeProto(self, 1) + waiter = self._loop._new_future() + self._stdout = ReadUnixTransport.new( + self._loop, self.stdout_proto, None, waiter) + self._init_futs.append(waiter) + self._stdout._open(r) + self._stdout._init_protocol() + elif _stdout == subprocess_DEVNULL: + io[1] = self._file_devnull() + elif _stdout == subprocess_STDOUT: + raise ValueError( + 'subprocess.STDOUT is supported only by stderr parameter') + else: + io[1] = self._file_redirect_stdio(_stdout) + else: + io[1] = self._file_redirect_stdio(1) + + if _stderr is not None: + if _stderr == subprocess_PIPE: + r, w = self._file_outpipe() + io[2] = w + + self.stderr_proto = ReadSubprocessPipeProto(self, 2) + waiter = self._loop._new_future() + self._stderr = ReadUnixTransport.new( + self._loop, self.stderr_proto, None, waiter) + self._init_futs.append(waiter) + self._stderr._open(r) + self._stderr._init_protocol() + elif _stderr == subprocess_STDOUT: + if io[1] is None: + # shouldn't ever happen + raise RuntimeError('cannot apply subprocess.STDOUT') + + io[2] = self._file_redirect_stdio(io[1]) + elif _stderr == subprocess_DEVNULL: + io[2] = self._file_devnull() + else: + io[2] = self._file_redirect_stdio(_stderr) + else: + io[2] = self._file_redirect_stdio(2) + + assert len(io) == 3 + for idx in range(3): + iocnt = &self.iocnt[idx] + if io[idx] is not None: + iocnt.flags = uv.UV_INHERIT_FD + iocnt.data.fd = io[idx] + else: + iocnt.flags = uv.UV_IGNORE + + cdef _call_connection_made(self, waiter): + try: + # we're always called in the right context, so just call the user's + self._protocol.connection_made(self) + except (KeyboardInterrupt, SystemExit): + raise + except BaseException as ex: + if waiter is not None and not waiter.cancelled(): + waiter.set_exception(ex) + else: + raise + else: + if waiter is not None and not waiter.cancelled(): + waiter.set_result(True) + + self._stdio_ready = 1 + if self._pending_calls: + pending_calls = self._pending_calls.copy() + self._pending_calls.clear() + for (type, fd, arg) in pending_calls: + if type == _CALL_PIPE_CONNECTION_LOST: + self._pipe_connection_lost(fd, arg) + elif type == _CALL_PIPE_DATA_RECEIVED: + self._pipe_data_received(fd, arg) + elif type == _CALL_PROCESS_EXITED: + self._loop.call_soon(self._protocol.process_exited) + elif type == _CALL_CONNECTION_LOST: + self._loop.call_soon(self._protocol.connection_lost, None) + + cdef _try_finish(self): + if self._returncode is None or self._finished: + return + + if ((self.stdin_proto is None or self.stdin_proto.disconnected) and + (self.stdout_proto is None or + self.stdout_proto.disconnected) and + (self.stderr_proto is None or + self.stderr_proto.disconnected)): + + self._finished = 1 + + if self._stdio_ready: + # copy self.context for simplicity + self._loop.call_soon(self._protocol.connection_lost, None, + context=self.context) + else: + self._pending_calls.append((_CALL_CONNECTION_LOST, None, None)) + + def __stdio_inited(self, waiter, stdio_fut): + exc = stdio_fut.exception() + if exc is not None: + if waiter is None: + raise exc + else: + waiter.set_exception(exc) + else: + self._loop._call_soon_handle( + new_MethodHandle1(self._loop, + "UVProcessTransport._call_connection_made", + <method1_t>self._call_connection_made, + None, # means to copy the current context + self, waiter)) + + @staticmethod + cdef UVProcessTransport new(Loop loop, protocol, args, env, + cwd, start_new_session, + _stdin, _stdout, _stderr, pass_fds, + waiter, + debug_flags, + preexec_fn, + restore_signals): + + cdef UVProcessTransport handle + handle = UVProcessTransport.__new__(UVProcessTransport) + handle._protocol = protocol + handle._init(loop, args, env, cwd, start_new_session, + __process_convert_fileno(_stdin), + __process_convert_fileno(_stdout), + __process_convert_fileno(_stderr), + pass_fds, + debug_flags, + preexec_fn, + restore_signals) + + if handle._init_futs: + handle._stdio_ready = 0 + init_fut = aio_gather(*handle._init_futs) + # add_done_callback will copy the current context and run the + # callback within the context + init_fut.add_done_callback( + ft_partial(handle.__stdio_inited, waiter)) + else: + handle._stdio_ready = 1 + loop._call_soon_handle( + new_MethodHandle1(loop, + "UVProcessTransport._call_connection_made", + <method1_t>handle._call_connection_made, + None, # means to copy the current context + handle, waiter)) + + return handle + + def get_protocol(self): + return self._protocol + + def set_protocol(self, protocol): + self._protocol = protocol + + def get_pid(self): + return self._pid + + def get_returncode(self): + return self._returncode + + def get_pipe_transport(self, fd): + if fd == 0: + return self._stdin + elif fd == 1: + return self._stdout + elif fd == 2: + return self._stderr + + def terminate(self): + self._check_proc() + self._kill(uv.SIGTERM) + + def kill(self): + self._check_proc() + self._kill(uv.SIGKILL) + + def send_signal(self, int signal): + self._check_proc() + self._kill(signal) + + def is_closing(self): + return self._closed + + def close(self): + if self._returncode is None: + self._kill(uv.SIGKILL) + + if self._stdin is not None: + self._stdin.close() + if self._stdout is not None: + self._stdout.close() + if self._stderr is not None: + self._stderr.close() + + if self._returncode is not None: + # The process is dead, just close the UV handle. + # + # (If "self._returncode is None", the process should have been + # killed already and we're just waiting for a SIGCHLD; after + # which the transport will be GC'ed and the uvhandle will be + # closed in UVHandle.__dealloc__.) + self._close() + + def get_extra_info(self, name, default=None): + return default + + def _wait(self): + fut = self._loop._new_future() + if self._returncode is not None: + fut.set_result(self._returncode) + return fut + + self._exit_waiters.append(fut) + return fut + + +class WriteSubprocessPipeProto(aio_BaseProtocol): + + def __init__(self, proc, fd): + if UVLOOP_DEBUG: + if type(proc) is not UVProcessTransport: + raise TypeError + if not isinstance(fd, int): + raise TypeError + self.proc = proc + self.fd = fd + self.pipe = None + self.disconnected = False + + def connection_made(self, transport): + self.pipe = transport + + def __repr__(self): + return ('<%s fd=%s pipe=%r>' + % (self.__class__.__name__, self.fd, self.pipe)) + + def connection_lost(self, exc): + self.disconnected = True + (<UVProcessTransport>self.proc)._pipe_connection_lost(self.fd, exc) + self.proc = None + + def pause_writing(self): + (<UVProcessTransport>self.proc)._protocol.pause_writing() + + def resume_writing(self): + (<UVProcessTransport>self.proc)._protocol.resume_writing() + + +class ReadSubprocessPipeProto(WriteSubprocessPipeProto, + aio_Protocol): + + def data_received(self, data): + (<UVProcessTransport>self.proc)._pipe_data_received(self.fd, data) + + +cdef __process_convert_fileno(object obj): + if obj is None or isinstance(obj, int): + return obj + + fileno = obj.fileno() + if not isinstance(fileno, int): + raise TypeError( + '{!r}.fileno() returned non-integer'.format(obj)) + return fileno + + +cdef void __uvprocess_on_exit_callback( + uv.uv_process_t *handle, + int64_t exit_status, + int term_signal, +) noexcept with gil: + + if __ensure_handle_data(<uv.uv_handle_t*>handle, + "UVProcess exit callback") == 0: + return + + cdef UVProcess proc = <UVProcess> handle.data + try: + proc._on_exit(exit_status, term_signal) + except BaseException as ex: + proc._error(ex, False) + + +cdef __socketpair(): + cdef: + int fds[2] + int err + + err = system.socketpair(uv.AF_UNIX, uv.SOCK_STREAM, 0, fds) + if err: + exc = convert_error(-err) + raise exc + + os_set_inheritable(fds[0], False) + os_set_inheritable(fds[1], False) + + return fds[0], fds[1] + + +cdef void __uv_close_process_handle_cb( + uv.uv_handle_t* handle +) noexcept with gil: + PyMem_RawFree(handle) |