@cython.no_gc_clear cdef class UVProcess(UVHandle): """Abstract class; wrapper over uv_process_t handle.""" def __cinit__(self): self.uv_opt_env = NULL self.uv_opt_args = NULL self._returncode = None self._pid = None self._fds_to_close = list() self._preexec_fn = None self._restore_signals = True self.context = Context_CopyCurrent() cdef _close_process_handle(self): # XXX: This is a workaround for a libuv bug: # - https://github.com/libuv/libuv/issues/1933 # - https://github.com/libuv/libuv/pull/551 if self._handle is NULL: return self._handle.data = NULL uv.uv_close(self._handle, __uv_close_process_handle_cb) self._handle = NULL # close callback will free() the memory cdef _init(self, Loop loop, list args, dict env, cwd, start_new_session, _stdin, _stdout, _stderr, # std* can be defined as macros in C pass_fds, debug_flags, preexec_fn, restore_signals): global __forking global __forking_loop global __forkHandler cdef int err self._start_init(loop) self._handle = PyMem_RawMalloc( sizeof(uv.uv_process_t)) if self._handle is NULL: self._abort_init() raise MemoryError() # Too early to call _finish_init, but still a lot of work to do. # Let's set handle.data to NULL, so in case something goes wrong, # callbacks have a chance to avoid casting *something* into UVHandle. self._handle.data = NULL force_fork = False if system.PLATFORM_IS_APPLE and not ( preexec_fn is None and not pass_fds ): # see _execute_child() in CPython/subprocess.py force_fork = True try: self._init_options(args, env, cwd, start_new_session, _stdin, _stdout, _stderr, force_fork) restore_inheritable = set() if pass_fds: for fd in pass_fds: if not os_get_inheritable(fd): restore_inheritable.add(fd) os_set_inheritable(fd, True) except Exception: self._abort_init() raise if __forking or loop.active_process_handler is not None: # Our pthread_atfork handlers won't work correctly when # another loop is forking in another thread (even though # GIL should help us to avoid that.) self._abort_init() raise RuntimeError( 'Racing with another loop to spawn a process.') self._errpipe_read, self._errpipe_write = os_pipe() fds_to_close = self._fds_to_close self._fds_to_close = None fds_to_close.append(self._errpipe_read) # add the write pipe last so we can close it early fds_to_close.append(self._errpipe_write) try: os_set_inheritable(self._errpipe_write, True) self._preexec_fn = preexec_fn self._restore_signals = restore_signals loop.active_process_handler = self __forking = 1 __forking_loop = loop system.setForkHandler(&__get_fork_handler) PyOS_BeforeFork() err = uv.uv_spawn(loop.uvloop, self._handle, &self.options) __forking = 0 __forking_loop = None system.resetForkHandler() loop.active_process_handler = None PyOS_AfterFork_Parent() if err < 0: self._close_process_handle() self._abort_init() raise convert_error(err) self._finish_init() # close the write pipe early os_close(fds_to_close.pop()) if preexec_fn is not None: errpipe_data = bytearray() while True: # XXX: This is a blocking code that has to be # rewritten (using loop.connect_read_pipe() or # otherwise.) part = os_read(self._errpipe_read, 50000) errpipe_data += part if not part or len(errpipe_data) > 50000: break finally: while fds_to_close: os_close(fds_to_close.pop()) for fd in restore_inheritable: os_set_inheritable(fd, False) # asyncio caches the PID in BaseSubprocessTransport, # so that the transport knows what the PID was even # after the process is finished. self._pid = (self._handle).pid # Track the process handle (create a strong ref to it) # to guarantee that __dealloc__ doesn't happen in an # uncontrolled fashion. We want to wait until the process # exits and libuv calls __uvprocess_on_exit_callback, # which will call `UVProcess._close()`, which will, in turn, # untrack this handle. self._loop._track_process(self) if debug_flags & __PROCESS_DEBUG_SLEEP_AFTER_FORK: time_sleep(1) if preexec_fn is not None and errpipe_data: # preexec_fn has raised an exception. The child # process must be dead now. try: exc_name, exc_msg = errpipe_data.split(b':', 1) exc_name = exc_name.decode() exc_msg = exc_msg.decode() except Exception: self._close() raise subprocess_SubprocessError( 'Bad exception data from child: {!r}'.format( errpipe_data)) exc_cls = getattr(__builtins__, exc_name, subprocess_SubprocessError) exc = subprocess_SubprocessError( 'Exception occurred in preexec_fn.') exc.__cause__ = exc_cls(exc_msg) self._close() raise exc cdef _after_fork(self): # See CPython/_posixsubprocess.c for details cdef int err if self._restore_signals: _Py_RestoreSignals() PyOS_AfterFork_Child() err = uv.uv_loop_fork(self._loop.uvloop) if err < 0: raise convert_error(err) if self._preexec_fn is not None: try: gc_disable() self._preexec_fn() except BaseException as ex: try: with open(self._errpipe_write, 'wb') as f: f.write(str(ex.__class__.__name__).encode()) f.write(b':') f.write(str(ex.args[0]).encode()) finally: system._exit(255) return else: os_close(self._errpipe_write) else: os_close(self._errpipe_write) cdef _close_after_spawn(self, int fd): if self._fds_to_close is None: raise RuntimeError( 'UVProcess._close_after_spawn called after uv_spawn') self._fds_to_close.append(fd) def __dealloc__(self): if self.uv_opt_env is not NULL: PyMem_RawFree(self.uv_opt_env) self.uv_opt_env = NULL if self.uv_opt_args is not NULL: PyMem_RawFree(self.uv_opt_args) self.uv_opt_args = NULL cdef char** __to_cstring_array(self, list arr): cdef: int i ssize_t arr_len = len(arr) bytes el char **ret ret = PyMem_RawMalloc((arr_len + 1) * sizeof(char *)) if ret is NULL: raise MemoryError() for i in range(arr_len): el = arr[i] # NB: PyBytes_AsString doesn't copy the data; # we have to be careful when the "arr" is GCed, # and it shouldn't be ever mutated. ret[i] = PyBytes_AsString(el) ret[arr_len] = NULL return ret cdef _init_options(self, list args, dict env, cwd, start_new_session, _stdin, _stdout, _stderr, bint force_fork): memset(&self.options, 0, sizeof(uv.uv_process_options_t)) self._init_env(env) self.options.env = self.uv_opt_env self._init_args(args) self.options.file = self.uv_opt_file self.options.args = self.uv_opt_args if start_new_session: self.options.flags |= uv.UV_PROCESS_DETACHED if force_fork: # This is a hack to work around the change in libuv 1.44: # > macos: use posix_spawn instead of fork # where Python subprocess options like preexec_fn are # crippled. CPython only uses posix_spawn under a pretty # strict list of conditions (see subprocess.py), and falls # back to using fork() otherwise. We'd like to simulate such # behavior with libuv, but unfortunately libuv doesn't # provide explicit API to choose such implementation detail. # Based on current (libuv 1.46) behavior, setting # UV_PROCESS_SETUID or UV_PROCESS_SETGID would reliably make # libuv fallback to use fork, so let's just use it for now. self.options.flags |= uv.UV_PROCESS_SETUID self.options.uid = uv.getuid() if cwd is not None: cwd = os_fspath(cwd) if isinstance(cwd, str): cwd = PyUnicode_EncodeFSDefault(cwd) if not isinstance(cwd, bytes): raise ValueError('cwd must be a str or bytes object') self.__cwd = cwd self.options.cwd = PyBytes_AsString(self.__cwd) self.options.exit_cb = &__uvprocess_on_exit_callback self._init_files(_stdin, _stdout, _stderr) cdef _init_args(self, list args): cdef: bytes path int an = len(args) if an < 1: raise ValueError('cannot spawn a process: args are empty') self.__args = args.copy() for i in range(an): arg = os_fspath(args[i]) if isinstance(arg, str): self.__args[i] = PyUnicode_EncodeFSDefault(arg) elif not isinstance(arg, bytes): raise TypeError('all args must be str or bytes') path = self.__args[0] self.uv_opt_file = PyBytes_AsString(path) self.uv_opt_args = self.__to_cstring_array(self.__args) cdef _init_env(self, dict env): if env is not None: self.__env = list() for key in env: val = env[key] if isinstance(key, str): key = PyUnicode_EncodeFSDefault(key) elif not isinstance(key, bytes): raise TypeError( 'all environment vars must be bytes or str') if isinstance(val, str): val = PyUnicode_EncodeFSDefault(val) elif not isinstance(val, bytes): raise TypeError( 'all environment values must be bytes or str') self.__env.append(key + b'=' + val) self.uv_opt_env = self.__to_cstring_array(self.__env) else: self.__env = None cdef _init_files(self, _stdin, _stdout, _stderr): self.options.stdio_count = 0 cdef _kill(self, int signum): cdef int err self._ensure_alive() err = uv.uv_process_kill(self._handle, signum) if err < 0: raise convert_error(err) cdef _on_exit(self, int64_t exit_status, int term_signal): if term_signal: # From Python docs: # A negative value -N indicates that the child was # terminated by signal N (POSIX only). self._returncode = -term_signal else: self._returncode = exit_status self._close() cdef _close(self): try: if self._loop is not None: self._loop._untrack_process(self) finally: UVHandle._close(self) DEF _CALL_PIPE_DATA_RECEIVED = 0 DEF _CALL_PIPE_CONNECTION_LOST = 1 DEF _CALL_PROCESS_EXITED = 2 DEF _CALL_CONNECTION_LOST = 3 @cython.no_gc_clear cdef class UVProcessTransport(UVProcess): def __cinit__(self): self._exit_waiters = [] self._protocol = None self._init_futs = [] self._pending_calls = [] self._stdio_ready = 0 self._stdin = self._stdout = self._stderr = None self.stdin_proto = self.stdout_proto = self.stderr_proto = None self._finished = 0 cdef _on_exit(self, int64_t exit_status, int term_signal): UVProcess._on_exit(self, exit_status, term_signal) if self._stdio_ready: self._loop.call_soon(self._protocol.process_exited, context=self.context) else: self._pending_calls.append((_CALL_PROCESS_EXITED, None, None)) self._try_finish() for waiter in self._exit_waiters: if not waiter.cancelled(): waiter.set_result(self._returncode) self._exit_waiters.clear() self._close() cdef _check_proc(self): if not self._is_alive() or self._returncode is not None: raise ProcessLookupError() cdef _pipe_connection_lost(self, int fd, exc): if self._stdio_ready: self._loop.call_soon(self._protocol.pipe_connection_lost, fd, exc, context=self.context) self._try_finish() else: self._pending_calls.append((_CALL_PIPE_CONNECTION_LOST, fd, exc)) cdef _pipe_data_received(self, int fd, data): if self._stdio_ready: self._loop.call_soon(self._protocol.pipe_data_received, fd, data, context=self.context) else: self._pending_calls.append((_CALL_PIPE_DATA_RECEIVED, fd, data)) cdef _file_redirect_stdio(self, int fd): fd = os_dup(fd) os_set_inheritable(fd, True) self._close_after_spawn(fd) return fd cdef _file_devnull(self): dn = os_open(os_devnull, os_O_RDWR) os_set_inheritable(dn, True) self._close_after_spawn(dn) return dn cdef _file_outpipe(self): r, w = __socketpair() os_set_inheritable(w, True) self._close_after_spawn(w) return r, w cdef _file_inpipe(self): r, w = __socketpair() os_set_inheritable(r, True) self._close_after_spawn(r) return r, w cdef _init_files(self, _stdin, _stdout, _stderr): cdef uv.uv_stdio_container_t *iocnt UVProcess._init_files(self, _stdin, _stdout, _stderr) io = [None, None, None] self.options.stdio_count = 3 self.options.stdio = self.iocnt if _stdin is not None: if _stdin == subprocess_PIPE: r, w = self._file_inpipe() io[0] = r self.stdin_proto = WriteSubprocessPipeProto(self, 0) waiter = self._loop._new_future() self._stdin = WriteUnixTransport.new( self._loop, self.stdin_proto, None, waiter) self._init_futs.append(waiter) self._stdin._open(w) self._stdin._init_protocol() elif _stdin == subprocess_DEVNULL: io[0] = self._file_devnull() elif _stdout == subprocess_STDOUT: raise ValueError( 'subprocess.STDOUT is supported only by stderr parameter') else: io[0] = self._file_redirect_stdio(_stdin) else: io[0] = self._file_redirect_stdio(0) if _stdout is not None: if _stdout == subprocess_PIPE: # We can't use UV_CREATE_PIPE here, since 'stderr' might be # set to 'subprocess.STDOUT', and there is no way to # emulate that functionality with libuv high-level # streams API. Therefore, we create pipes for stdout and # stderr manually. r, w = self._file_outpipe() io[1] = w self.stdout_proto = ReadSubprocessPipeProto(self, 1) waiter = self._loop._new_future() self._stdout = ReadUnixTransport.new( self._loop, self.stdout_proto, None, waiter) self._init_futs.append(waiter) self._stdout._open(r) self._stdout._init_protocol() elif _stdout == subprocess_DEVNULL: io[1] = self._file_devnull() elif _stdout == subprocess_STDOUT: raise ValueError( 'subprocess.STDOUT is supported only by stderr parameter') else: io[1] = self._file_redirect_stdio(_stdout) else: io[1] = self._file_redirect_stdio(1) if _stderr is not None: if _stderr == subprocess_PIPE: r, w = self._file_outpipe() io[2] = w self.stderr_proto = ReadSubprocessPipeProto(self, 2) waiter = self._loop._new_future() self._stderr = ReadUnixTransport.new( self._loop, self.stderr_proto, None, waiter) self._init_futs.append(waiter) self._stderr._open(r) self._stderr._init_protocol() elif _stderr == subprocess_STDOUT: if io[1] is None: # shouldn't ever happen raise RuntimeError('cannot apply subprocess.STDOUT') io[2] = self._file_redirect_stdio(io[1]) elif _stderr == subprocess_DEVNULL: io[2] = self._file_devnull() else: io[2] = self._file_redirect_stdio(_stderr) else: io[2] = self._file_redirect_stdio(2) assert len(io) == 3 for idx in range(3): iocnt = &self.iocnt[idx] if io[idx] is not None: iocnt.flags = uv.UV_INHERIT_FD iocnt.data.fd = io[idx] else: iocnt.flags = uv.UV_IGNORE cdef _call_connection_made(self, waiter): try: # we're always called in the right context, so just call the user's self._protocol.connection_made(self) except (KeyboardInterrupt, SystemExit): raise except BaseException as ex: if waiter is not None and not waiter.cancelled(): waiter.set_exception(ex) else: raise else: if waiter is not None and not waiter.cancelled(): waiter.set_result(True) self._stdio_ready = 1 if self._pending_calls: pending_calls = self._pending_calls.copy() self._pending_calls.clear() for (type, fd, arg) in pending_calls: if type == _CALL_PIPE_CONNECTION_LOST: self._pipe_connection_lost(fd, arg) elif type == _CALL_PIPE_DATA_RECEIVED: self._pipe_data_received(fd, arg) elif type == _CALL_PROCESS_EXITED: self._loop.call_soon(self._protocol.process_exited) elif type == _CALL_CONNECTION_LOST: self._loop.call_soon(self._protocol.connection_lost, None) cdef _try_finish(self): if self._returncode is None or self._finished: return if ((self.stdin_proto is None or self.stdin_proto.disconnected) and (self.stdout_proto is None or self.stdout_proto.disconnected) and (self.stderr_proto is None or self.stderr_proto.disconnected)): self._finished = 1 if self._stdio_ready: # copy self.context for simplicity self._loop.call_soon(self._protocol.connection_lost, None, context=self.context) else: self._pending_calls.append((_CALL_CONNECTION_LOST, None, None)) def __stdio_inited(self, waiter, stdio_fut): exc = stdio_fut.exception() if exc is not None: if waiter is None: raise exc else: waiter.set_exception(exc) else: self._loop._call_soon_handle( new_MethodHandle1(self._loop, "UVProcessTransport._call_connection_made", self._call_connection_made, None, # means to copy the current context self, waiter)) @staticmethod cdef UVProcessTransport new(Loop loop, protocol, args, env, cwd, start_new_session, _stdin, _stdout, _stderr, pass_fds, waiter, debug_flags, preexec_fn, restore_signals): cdef UVProcessTransport handle handle = UVProcessTransport.__new__(UVProcessTransport) handle._protocol = protocol handle._init(loop, args, env, cwd, start_new_session, __process_convert_fileno(_stdin), __process_convert_fileno(_stdout), __process_convert_fileno(_stderr), pass_fds, debug_flags, preexec_fn, restore_signals) if handle._init_futs: handle._stdio_ready = 0 init_fut = aio_gather(*handle._init_futs) # add_done_callback will copy the current context and run the # callback within the context init_fut.add_done_callback( ft_partial(handle.__stdio_inited, waiter)) else: handle._stdio_ready = 1 loop._call_soon_handle( new_MethodHandle1(loop, "UVProcessTransport._call_connection_made", handle._call_connection_made, None, # means to copy the current context handle, waiter)) return handle def get_protocol(self): return self._protocol def set_protocol(self, protocol): self._protocol = protocol def get_pid(self): return self._pid def get_returncode(self): return self._returncode def get_pipe_transport(self, fd): if fd == 0: return self._stdin elif fd == 1: return self._stdout elif fd == 2: return self._stderr def terminate(self): self._check_proc() self._kill(uv.SIGTERM) def kill(self): self._check_proc() self._kill(uv.SIGKILL) def send_signal(self, int signal): self._check_proc() self._kill(signal) def is_closing(self): return self._closed def close(self): if self._returncode is None: self._kill(uv.SIGKILL) if self._stdin is not None: self._stdin.close() if self._stdout is not None: self._stdout.close() if self._stderr is not None: self._stderr.close() if self._returncode is not None: # The process is dead, just close the UV handle. # # (If "self._returncode is None", the process should have been # killed already and we're just waiting for a SIGCHLD; after # which the transport will be GC'ed and the uvhandle will be # closed in UVHandle.__dealloc__.) self._close() def get_extra_info(self, name, default=None): return default def _wait(self): fut = self._loop._new_future() if self._returncode is not None: fut.set_result(self._returncode) return fut self._exit_waiters.append(fut) return fut class WriteSubprocessPipeProto(aio_BaseProtocol): def __init__(self, proc, fd): if UVLOOP_DEBUG: if type(proc) is not UVProcessTransport: raise TypeError if not isinstance(fd, int): raise TypeError self.proc = proc self.fd = fd self.pipe = None self.disconnected = False def connection_made(self, transport): self.pipe = transport def __repr__(self): return ('<%s fd=%s pipe=%r>' % (self.__class__.__name__, self.fd, self.pipe)) def connection_lost(self, exc): self.disconnected = True (self.proc)._pipe_connection_lost(self.fd, exc) self.proc = None def pause_writing(self): (self.proc)._protocol.pause_writing() def resume_writing(self): (self.proc)._protocol.resume_writing() class ReadSubprocessPipeProto(WriteSubprocessPipeProto, aio_Protocol): def data_received(self, data): (self.proc)._pipe_data_received(self.fd, data) cdef __process_convert_fileno(object obj): if obj is None or isinstance(obj, int): return obj fileno = obj.fileno() if not isinstance(fileno, int): raise TypeError( '{!r}.fileno() returned non-integer'.format(obj)) return fileno cdef void __uvprocess_on_exit_callback( uv.uv_process_t *handle, int64_t exit_status, int term_signal, ) noexcept with gil: if __ensure_handle_data(handle, "UVProcess exit callback") == 0: return cdef UVProcess proc = handle.data try: proc._on_exit(exit_status, term_signal) except BaseException as ex: proc._error(ex, False) cdef __socketpair(): cdef: int fds[2] int err err = system.socketpair(uv.AF_UNIX, uv.SOCK_STREAM, 0, fds) if err: exc = convert_error(-err) raise exc os_set_inheritable(fds[0], False) os_set_inheritable(fds[1], False) return fds[0], fds[1] cdef void __uv_close_process_handle_cb( uv.uv_handle_t* handle ) noexcept with gil: PyMem_RawFree(handle)