diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/uvloop/handles/process.pyx')
-rw-r--r-- | venv/lib/python3.11/site-packages/uvloop/handles/process.pyx | 792 |
1 files changed, 0 insertions, 792 deletions
diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx deleted file mode 100644 index 63b982a..0000000 --- a/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx +++ /dev/null @@ -1,792 +0,0 @@ -@cython.no_gc_clear -cdef class UVProcess(UVHandle): - """Abstract class; wrapper over uv_process_t handle.""" - - def __cinit__(self): - self.uv_opt_env = NULL - self.uv_opt_args = NULL - self._returncode = None - self._pid = None - self._fds_to_close = list() - self._preexec_fn = None - self._restore_signals = True - self.context = Context_CopyCurrent() - - cdef _close_process_handle(self): - # XXX: This is a workaround for a libuv bug: - # - https://github.com/libuv/libuv/issues/1933 - # - https://github.com/libuv/libuv/pull/551 - if self._handle is NULL: - return - self._handle.data = NULL - uv.uv_close(self._handle, __uv_close_process_handle_cb) - self._handle = NULL # close callback will free() the memory - - cdef _init(self, Loop loop, list args, dict env, - cwd, start_new_session, - _stdin, _stdout, _stderr, # std* can be defined as macros in C - pass_fds, debug_flags, preexec_fn, restore_signals): - - global __forking - global __forking_loop - global __forkHandler - - cdef int err - - self._start_init(loop) - - self._handle = <uv.uv_handle_t*>PyMem_RawMalloc( - sizeof(uv.uv_process_t)) - if self._handle is NULL: - self._abort_init() - raise MemoryError() - - # Too early to call _finish_init, but still a lot of work to do. - # Let's set handle.data to NULL, so in case something goes wrong, - # callbacks have a chance to avoid casting *something* into UVHandle. - self._handle.data = NULL - - force_fork = False - if system.PLATFORM_IS_APPLE and not ( - preexec_fn is None - and not pass_fds - ): - # see _execute_child() in CPython/subprocess.py - force_fork = True - - try: - self._init_options(args, env, cwd, start_new_session, - _stdin, _stdout, _stderr, force_fork) - - restore_inheritable = set() - if pass_fds: - for fd in pass_fds: - if not os_get_inheritable(fd): - restore_inheritable.add(fd) - os_set_inheritable(fd, True) - except Exception: - self._abort_init() - raise - - if __forking or loop.active_process_handler is not None: - # Our pthread_atfork handlers won't work correctly when - # another loop is forking in another thread (even though - # GIL should help us to avoid that.) - self._abort_init() - raise RuntimeError( - 'Racing with another loop to spawn a process.') - - self._errpipe_read, self._errpipe_write = os_pipe() - fds_to_close = self._fds_to_close - self._fds_to_close = None - fds_to_close.append(self._errpipe_read) - # add the write pipe last so we can close it early - fds_to_close.append(self._errpipe_write) - try: - os_set_inheritable(self._errpipe_write, True) - - self._preexec_fn = preexec_fn - self._restore_signals = restore_signals - - loop.active_process_handler = self - __forking = 1 - __forking_loop = loop - system.setForkHandler(<system.OnForkHandler>&__get_fork_handler) - - PyOS_BeforeFork() - - err = uv.uv_spawn(loop.uvloop, - <uv.uv_process_t*>self._handle, - &self.options) - - __forking = 0 - __forking_loop = None - system.resetForkHandler() - loop.active_process_handler = None - - PyOS_AfterFork_Parent() - - if err < 0: - self._close_process_handle() - self._abort_init() - raise convert_error(err) - - self._finish_init() - - # close the write pipe early - os_close(fds_to_close.pop()) - - if preexec_fn is not None: - errpipe_data = bytearray() - while True: - # XXX: This is a blocking code that has to be - # rewritten (using loop.connect_read_pipe() or - # otherwise.) - part = os_read(self._errpipe_read, 50000) - errpipe_data += part - if not part or len(errpipe_data) > 50000: - break - - finally: - while fds_to_close: - os_close(fds_to_close.pop()) - - for fd in restore_inheritable: - os_set_inheritable(fd, False) - - # asyncio caches the PID in BaseSubprocessTransport, - # so that the transport knows what the PID was even - # after the process is finished. - self._pid = (<uv.uv_process_t*>self._handle).pid - - # Track the process handle (create a strong ref to it) - # to guarantee that __dealloc__ doesn't happen in an - # uncontrolled fashion. We want to wait until the process - # exits and libuv calls __uvprocess_on_exit_callback, - # which will call `UVProcess._close()`, which will, in turn, - # untrack this handle. - self._loop._track_process(self) - - if debug_flags & __PROCESS_DEBUG_SLEEP_AFTER_FORK: - time_sleep(1) - - if preexec_fn is not None and errpipe_data: - # preexec_fn has raised an exception. The child - # process must be dead now. - try: - exc_name, exc_msg = errpipe_data.split(b':', 1) - exc_name = exc_name.decode() - exc_msg = exc_msg.decode() - except Exception: - self._close() - raise subprocess_SubprocessError( - 'Bad exception data from child: {!r}'.format( - errpipe_data)) - exc_cls = getattr(__builtins__, exc_name, - subprocess_SubprocessError) - - exc = subprocess_SubprocessError( - 'Exception occurred in preexec_fn.') - exc.__cause__ = exc_cls(exc_msg) - self._close() - raise exc - - cdef _after_fork(self): - # See CPython/_posixsubprocess.c for details - cdef int err - - if self._restore_signals: - _Py_RestoreSignals() - - PyOS_AfterFork_Child() - - err = uv.uv_loop_fork(self._loop.uvloop) - if err < 0: - raise convert_error(err) - - if self._preexec_fn is not None: - try: - gc_disable() - self._preexec_fn() - except BaseException as ex: - try: - with open(self._errpipe_write, 'wb') as f: - f.write(str(ex.__class__.__name__).encode()) - f.write(b':') - f.write(str(ex.args[0]).encode()) - finally: - system._exit(255) - return - else: - os_close(self._errpipe_write) - else: - os_close(self._errpipe_write) - - cdef _close_after_spawn(self, int fd): - if self._fds_to_close is None: - raise RuntimeError( - 'UVProcess._close_after_spawn called after uv_spawn') - self._fds_to_close.append(fd) - - def __dealloc__(self): - if self.uv_opt_env is not NULL: - PyMem_RawFree(self.uv_opt_env) - self.uv_opt_env = NULL - - if self.uv_opt_args is not NULL: - PyMem_RawFree(self.uv_opt_args) - self.uv_opt_args = NULL - - cdef char** __to_cstring_array(self, list arr): - cdef: - int i - ssize_t arr_len = len(arr) - bytes el - - char **ret - - ret = <char **>PyMem_RawMalloc((arr_len + 1) * sizeof(char *)) - if ret is NULL: - raise MemoryError() - - for i in range(arr_len): - el = arr[i] - # NB: PyBytes_AsString doesn't copy the data; - # we have to be careful when the "arr" is GCed, - # and it shouldn't be ever mutated. - ret[i] = PyBytes_AsString(el) - - ret[arr_len] = NULL - return ret - - cdef _init_options(self, list args, dict env, cwd, start_new_session, - _stdin, _stdout, _stderr, bint force_fork): - - memset(&self.options, 0, sizeof(uv.uv_process_options_t)) - - self._init_env(env) - self.options.env = self.uv_opt_env - - self._init_args(args) - self.options.file = self.uv_opt_file - self.options.args = self.uv_opt_args - - if start_new_session: - self.options.flags |= uv.UV_PROCESS_DETACHED - - if force_fork: - # This is a hack to work around the change in libuv 1.44: - # > macos: use posix_spawn instead of fork - # where Python subprocess options like preexec_fn are - # crippled. CPython only uses posix_spawn under a pretty - # strict list of conditions (see subprocess.py), and falls - # back to using fork() otherwise. We'd like to simulate such - # behavior with libuv, but unfortunately libuv doesn't - # provide explicit API to choose such implementation detail. - # Based on current (libuv 1.46) behavior, setting - # UV_PROCESS_SETUID or UV_PROCESS_SETGID would reliably make - # libuv fallback to use fork, so let's just use it for now. - self.options.flags |= uv.UV_PROCESS_SETUID - self.options.uid = uv.getuid() - - if cwd is not None: - cwd = os_fspath(cwd) - - if isinstance(cwd, str): - cwd = PyUnicode_EncodeFSDefault(cwd) - if not isinstance(cwd, bytes): - raise ValueError('cwd must be a str or bytes object') - - self.__cwd = cwd - self.options.cwd = PyBytes_AsString(self.__cwd) - - self.options.exit_cb = &__uvprocess_on_exit_callback - - self._init_files(_stdin, _stdout, _stderr) - - cdef _init_args(self, list args): - cdef: - bytes path - int an = len(args) - - if an < 1: - raise ValueError('cannot spawn a process: args are empty') - - self.__args = args.copy() - for i in range(an): - arg = os_fspath(args[i]) - if isinstance(arg, str): - self.__args[i] = PyUnicode_EncodeFSDefault(arg) - elif not isinstance(arg, bytes): - raise TypeError('all args must be str or bytes') - - path = self.__args[0] - self.uv_opt_file = PyBytes_AsString(path) - self.uv_opt_args = self.__to_cstring_array(self.__args) - - cdef _init_env(self, dict env): - if env is not None: - self.__env = list() - for key in env: - val = env[key] - - if isinstance(key, str): - key = PyUnicode_EncodeFSDefault(key) - elif not isinstance(key, bytes): - raise TypeError( - 'all environment vars must be bytes or str') - - if isinstance(val, str): - val = PyUnicode_EncodeFSDefault(val) - elif not isinstance(val, bytes): - raise TypeError( - 'all environment values must be bytes or str') - - self.__env.append(key + b'=' + val) - - self.uv_opt_env = self.__to_cstring_array(self.__env) - else: - self.__env = None - - cdef _init_files(self, _stdin, _stdout, _stderr): - self.options.stdio_count = 0 - - cdef _kill(self, int signum): - cdef int err - self._ensure_alive() - err = uv.uv_process_kill(<uv.uv_process_t*>self._handle, signum) - if err < 0: - raise convert_error(err) - - cdef _on_exit(self, int64_t exit_status, int term_signal): - if term_signal: - # From Python docs: - # A negative value -N indicates that the child was - # terminated by signal N (POSIX only). - self._returncode = -term_signal - else: - self._returncode = exit_status - - self._close() - - cdef _close(self): - try: - if self._loop is not None: - self._loop._untrack_process(self) - finally: - UVHandle._close(self) - - -DEF _CALL_PIPE_DATA_RECEIVED = 0 -DEF _CALL_PIPE_CONNECTION_LOST = 1 -DEF _CALL_PROCESS_EXITED = 2 -DEF _CALL_CONNECTION_LOST = 3 - - -@cython.no_gc_clear -cdef class UVProcessTransport(UVProcess): - def __cinit__(self): - self._exit_waiters = [] - self._protocol = None - - self._init_futs = [] - self._pending_calls = [] - self._stdio_ready = 0 - - self._stdin = self._stdout = self._stderr = None - self.stdin_proto = self.stdout_proto = self.stderr_proto = None - - self._finished = 0 - - cdef _on_exit(self, int64_t exit_status, int term_signal): - UVProcess._on_exit(self, exit_status, term_signal) - - if self._stdio_ready: - self._loop.call_soon(self._protocol.process_exited, - context=self.context) - else: - self._pending_calls.append((_CALL_PROCESS_EXITED, None, None)) - - self._try_finish() - - for waiter in self._exit_waiters: - if not waiter.cancelled(): - waiter.set_result(self._returncode) - self._exit_waiters.clear() - - self._close() - - cdef _check_proc(self): - if not self._is_alive() or self._returncode is not None: - raise ProcessLookupError() - - cdef _pipe_connection_lost(self, int fd, exc): - if self._stdio_ready: - self._loop.call_soon(self._protocol.pipe_connection_lost, fd, exc, - context=self.context) - self._try_finish() - else: - self._pending_calls.append((_CALL_PIPE_CONNECTION_LOST, fd, exc)) - - cdef _pipe_data_received(self, int fd, data): - if self._stdio_ready: - self._loop.call_soon(self._protocol.pipe_data_received, fd, data, - context=self.context) - else: - self._pending_calls.append((_CALL_PIPE_DATA_RECEIVED, fd, data)) - - cdef _file_redirect_stdio(self, int fd): - fd = os_dup(fd) - os_set_inheritable(fd, True) - self._close_after_spawn(fd) - return fd - - cdef _file_devnull(self): - dn = os_open(os_devnull, os_O_RDWR) - os_set_inheritable(dn, True) - self._close_after_spawn(dn) - return dn - - cdef _file_outpipe(self): - r, w = __socketpair() - os_set_inheritable(w, True) - self._close_after_spawn(w) - return r, w - - cdef _file_inpipe(self): - r, w = __socketpair() - os_set_inheritable(r, True) - self._close_after_spawn(r) - return r, w - - cdef _init_files(self, _stdin, _stdout, _stderr): - cdef uv.uv_stdio_container_t *iocnt - - UVProcess._init_files(self, _stdin, _stdout, _stderr) - - io = [None, None, None] - - self.options.stdio_count = 3 - self.options.stdio = self.iocnt - - if _stdin is not None: - if _stdin == subprocess_PIPE: - r, w = self._file_inpipe() - io[0] = r - - self.stdin_proto = WriteSubprocessPipeProto(self, 0) - waiter = self._loop._new_future() - self._stdin = WriteUnixTransport.new( - self._loop, self.stdin_proto, None, waiter) - self._init_futs.append(waiter) - self._stdin._open(w) - self._stdin._init_protocol() - elif _stdin == subprocess_DEVNULL: - io[0] = self._file_devnull() - elif _stdout == subprocess_STDOUT: - raise ValueError( - 'subprocess.STDOUT is supported only by stderr parameter') - else: - io[0] = self._file_redirect_stdio(_stdin) - else: - io[0] = self._file_redirect_stdio(0) - - if _stdout is not None: - if _stdout == subprocess_PIPE: - # We can't use UV_CREATE_PIPE here, since 'stderr' might be - # set to 'subprocess.STDOUT', and there is no way to - # emulate that functionality with libuv high-level - # streams API. Therefore, we create pipes for stdout and - # stderr manually. - - r, w = self._file_outpipe() - io[1] = w - - self.stdout_proto = ReadSubprocessPipeProto(self, 1) - waiter = self._loop._new_future() - self._stdout = ReadUnixTransport.new( - self._loop, self.stdout_proto, None, waiter) - self._init_futs.append(waiter) - self._stdout._open(r) - self._stdout._init_protocol() - elif _stdout == subprocess_DEVNULL: - io[1] = self._file_devnull() - elif _stdout == subprocess_STDOUT: - raise ValueError( - 'subprocess.STDOUT is supported only by stderr parameter') - else: - io[1] = self._file_redirect_stdio(_stdout) - else: - io[1] = self._file_redirect_stdio(1) - - if _stderr is not None: - if _stderr == subprocess_PIPE: - r, w = self._file_outpipe() - io[2] = w - - self.stderr_proto = ReadSubprocessPipeProto(self, 2) - waiter = self._loop._new_future() - self._stderr = ReadUnixTransport.new( - self._loop, self.stderr_proto, None, waiter) - self._init_futs.append(waiter) - self._stderr._open(r) - self._stderr._init_protocol() - elif _stderr == subprocess_STDOUT: - if io[1] is None: - # shouldn't ever happen - raise RuntimeError('cannot apply subprocess.STDOUT') - - io[2] = self._file_redirect_stdio(io[1]) - elif _stderr == subprocess_DEVNULL: - io[2] = self._file_devnull() - else: - io[2] = self._file_redirect_stdio(_stderr) - else: - io[2] = self._file_redirect_stdio(2) - - assert len(io) == 3 - for idx in range(3): - iocnt = &self.iocnt[idx] - if io[idx] is not None: - iocnt.flags = uv.UV_INHERIT_FD - iocnt.data.fd = io[idx] - else: - iocnt.flags = uv.UV_IGNORE - - cdef _call_connection_made(self, waiter): - try: - # we're always called in the right context, so just call the user's - self._protocol.connection_made(self) - except (KeyboardInterrupt, SystemExit): - raise - except BaseException as ex: - if waiter is not None and not waiter.cancelled(): - waiter.set_exception(ex) - else: - raise - else: - if waiter is not None and not waiter.cancelled(): - waiter.set_result(True) - - self._stdio_ready = 1 - if self._pending_calls: - pending_calls = self._pending_calls.copy() - self._pending_calls.clear() - for (type, fd, arg) in pending_calls: - if type == _CALL_PIPE_CONNECTION_LOST: - self._pipe_connection_lost(fd, arg) - elif type == _CALL_PIPE_DATA_RECEIVED: - self._pipe_data_received(fd, arg) - elif type == _CALL_PROCESS_EXITED: - self._loop.call_soon(self._protocol.process_exited) - elif type == _CALL_CONNECTION_LOST: - self._loop.call_soon(self._protocol.connection_lost, None) - - cdef _try_finish(self): - if self._returncode is None or self._finished: - return - - if ((self.stdin_proto is None or self.stdin_proto.disconnected) and - (self.stdout_proto is None or - self.stdout_proto.disconnected) and - (self.stderr_proto is None or - self.stderr_proto.disconnected)): - - self._finished = 1 - - if self._stdio_ready: - # copy self.context for simplicity - self._loop.call_soon(self._protocol.connection_lost, None, - context=self.context) - else: - self._pending_calls.append((_CALL_CONNECTION_LOST, None, None)) - - def __stdio_inited(self, waiter, stdio_fut): - exc = stdio_fut.exception() - if exc is not None: - if waiter is None: - raise exc - else: - waiter.set_exception(exc) - else: - self._loop._call_soon_handle( - new_MethodHandle1(self._loop, - "UVProcessTransport._call_connection_made", - <method1_t>self._call_connection_made, - None, # means to copy the current context - self, waiter)) - - @staticmethod - cdef UVProcessTransport new(Loop loop, protocol, args, env, - cwd, start_new_session, - _stdin, _stdout, _stderr, pass_fds, - waiter, - debug_flags, - preexec_fn, - restore_signals): - - cdef UVProcessTransport handle - handle = UVProcessTransport.__new__(UVProcessTransport) - handle._protocol = protocol - handle._init(loop, args, env, cwd, start_new_session, - __process_convert_fileno(_stdin), - __process_convert_fileno(_stdout), - __process_convert_fileno(_stderr), - pass_fds, - debug_flags, - preexec_fn, - restore_signals) - - if handle._init_futs: - handle._stdio_ready = 0 - init_fut = aio_gather(*handle._init_futs) - # add_done_callback will copy the current context and run the - # callback within the context - init_fut.add_done_callback( - ft_partial(handle.__stdio_inited, waiter)) - else: - handle._stdio_ready = 1 - loop._call_soon_handle( - new_MethodHandle1(loop, - "UVProcessTransport._call_connection_made", - <method1_t>handle._call_connection_made, - None, # means to copy the current context - handle, waiter)) - - return handle - - def get_protocol(self): - return self._protocol - - def set_protocol(self, protocol): - self._protocol = protocol - - def get_pid(self): - return self._pid - - def get_returncode(self): - return self._returncode - - def get_pipe_transport(self, fd): - if fd == 0: - return self._stdin - elif fd == 1: - return self._stdout - elif fd == 2: - return self._stderr - - def terminate(self): - self._check_proc() - self._kill(uv.SIGTERM) - - def kill(self): - self._check_proc() - self._kill(uv.SIGKILL) - - def send_signal(self, int signal): - self._check_proc() - self._kill(signal) - - def is_closing(self): - return self._closed - - def close(self): - if self._returncode is None: - self._kill(uv.SIGKILL) - - if self._stdin is not None: - self._stdin.close() - if self._stdout is not None: - self._stdout.close() - if self._stderr is not None: - self._stderr.close() - - if self._returncode is not None: - # The process is dead, just close the UV handle. - # - # (If "self._returncode is None", the process should have been - # killed already and we're just waiting for a SIGCHLD; after - # which the transport will be GC'ed and the uvhandle will be - # closed in UVHandle.__dealloc__.) - self._close() - - def get_extra_info(self, name, default=None): - return default - - def _wait(self): - fut = self._loop._new_future() - if self._returncode is not None: - fut.set_result(self._returncode) - return fut - - self._exit_waiters.append(fut) - return fut - - -class WriteSubprocessPipeProto(aio_BaseProtocol): - - def __init__(self, proc, fd): - if UVLOOP_DEBUG: - if type(proc) is not UVProcessTransport: - raise TypeError - if not isinstance(fd, int): - raise TypeError - self.proc = proc - self.fd = fd - self.pipe = None - self.disconnected = False - - def connection_made(self, transport): - self.pipe = transport - - def __repr__(self): - return ('<%s fd=%s pipe=%r>' - % (self.__class__.__name__, self.fd, self.pipe)) - - def connection_lost(self, exc): - self.disconnected = True - (<UVProcessTransport>self.proc)._pipe_connection_lost(self.fd, exc) - self.proc = None - - def pause_writing(self): - (<UVProcessTransport>self.proc)._protocol.pause_writing() - - def resume_writing(self): - (<UVProcessTransport>self.proc)._protocol.resume_writing() - - -class ReadSubprocessPipeProto(WriteSubprocessPipeProto, - aio_Protocol): - - def data_received(self, data): - (<UVProcessTransport>self.proc)._pipe_data_received(self.fd, data) - - -cdef __process_convert_fileno(object obj): - if obj is None or isinstance(obj, int): - return obj - - fileno = obj.fileno() - if not isinstance(fileno, int): - raise TypeError( - '{!r}.fileno() returned non-integer'.format(obj)) - return fileno - - -cdef void __uvprocess_on_exit_callback( - uv.uv_process_t *handle, - int64_t exit_status, - int term_signal, -) noexcept with gil: - - if __ensure_handle_data(<uv.uv_handle_t*>handle, - "UVProcess exit callback") == 0: - return - - cdef UVProcess proc = <UVProcess> handle.data - try: - proc._on_exit(exit_status, term_signal) - except BaseException as ex: - proc._error(ex, False) - - -cdef __socketpair(): - cdef: - int fds[2] - int err - - err = system.socketpair(uv.AF_UNIX, uv.SOCK_STREAM, 0, fds) - if err: - exc = convert_error(-err) - raise exc - - os_set_inheritable(fds[0], False) - os_set_inheritable(fds[1], False) - - return fds[0], fds[1] - - -cdef void __uv_close_process_handle_cb( - uv.uv_handle_t* handle -) noexcept with gil: - PyMem_RawFree(handle) |