summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/uvloop/handles/process.pyx')
-rw-r--r--venv/lib/python3.11/site-packages/uvloop/handles/process.pyx792
1 files changed, 792 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx
new file mode 100644
index 0000000..63b982a
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx
@@ -0,0 +1,792 @@
+@cython.no_gc_clear
+cdef class UVProcess(UVHandle):
+ """Abstract class; wrapper over uv_process_t handle."""
+
+ def __cinit__(self):
+ self.uv_opt_env = NULL
+ self.uv_opt_args = NULL
+ self._returncode = None
+ self._pid = None
+ self._fds_to_close = list()
+ self._preexec_fn = None
+ self._restore_signals = True
+ self.context = Context_CopyCurrent()
+
+ cdef _close_process_handle(self):
+ # XXX: This is a workaround for a libuv bug:
+ # - https://github.com/libuv/libuv/issues/1933
+ # - https://github.com/libuv/libuv/pull/551
+ if self._handle is NULL:
+ return
+ self._handle.data = NULL
+ uv.uv_close(self._handle, __uv_close_process_handle_cb)
+ self._handle = NULL # close callback will free() the memory
+
+ cdef _init(self, Loop loop, list args, dict env,
+ cwd, start_new_session,
+ _stdin, _stdout, _stderr, # std* can be defined as macros in C
+ pass_fds, debug_flags, preexec_fn, restore_signals):
+
+ global __forking
+ global __forking_loop
+ global __forkHandler
+
+ cdef int err
+
+ self._start_init(loop)
+
+ self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(
+ sizeof(uv.uv_process_t))
+ if self._handle is NULL:
+ self._abort_init()
+ raise MemoryError()
+
+ # Too early to call _finish_init, but still a lot of work to do.
+ # Let's set handle.data to NULL, so in case something goes wrong,
+ # callbacks have a chance to avoid casting *something* into UVHandle.
+ self._handle.data = NULL
+
+ force_fork = False
+ if system.PLATFORM_IS_APPLE and not (
+ preexec_fn is None
+ and not pass_fds
+ ):
+ # see _execute_child() in CPython/subprocess.py
+ force_fork = True
+
+ try:
+ self._init_options(args, env, cwd, start_new_session,
+ _stdin, _stdout, _stderr, force_fork)
+
+ restore_inheritable = set()
+ if pass_fds:
+ for fd in pass_fds:
+ if not os_get_inheritable(fd):
+ restore_inheritable.add(fd)
+ os_set_inheritable(fd, True)
+ except Exception:
+ self._abort_init()
+ raise
+
+ if __forking or loop.active_process_handler is not None:
+ # Our pthread_atfork handlers won't work correctly when
+ # another loop is forking in another thread (even though
+ # GIL should help us to avoid that.)
+ self._abort_init()
+ raise RuntimeError(
+ 'Racing with another loop to spawn a process.')
+
+ self._errpipe_read, self._errpipe_write = os_pipe()
+ fds_to_close = self._fds_to_close
+ self._fds_to_close = None
+ fds_to_close.append(self._errpipe_read)
+ # add the write pipe last so we can close it early
+ fds_to_close.append(self._errpipe_write)
+ try:
+ os_set_inheritable(self._errpipe_write, True)
+
+ self._preexec_fn = preexec_fn
+ self._restore_signals = restore_signals
+
+ loop.active_process_handler = self
+ __forking = 1
+ __forking_loop = loop
+ system.setForkHandler(<system.OnForkHandler>&__get_fork_handler)
+
+ PyOS_BeforeFork()
+
+ err = uv.uv_spawn(loop.uvloop,
+ <uv.uv_process_t*>self._handle,
+ &self.options)
+
+ __forking = 0
+ __forking_loop = None
+ system.resetForkHandler()
+ loop.active_process_handler = None
+
+ PyOS_AfterFork_Parent()
+
+ if err < 0:
+ self._close_process_handle()
+ self._abort_init()
+ raise convert_error(err)
+
+ self._finish_init()
+
+ # close the write pipe early
+ os_close(fds_to_close.pop())
+
+ if preexec_fn is not None:
+ errpipe_data = bytearray()
+ while True:
+ # XXX: This is a blocking code that has to be
+ # rewritten (using loop.connect_read_pipe() or
+ # otherwise.)
+ part = os_read(self._errpipe_read, 50000)
+ errpipe_data += part
+ if not part or len(errpipe_data) > 50000:
+ break
+
+ finally:
+ while fds_to_close:
+ os_close(fds_to_close.pop())
+
+ for fd in restore_inheritable:
+ os_set_inheritable(fd, False)
+
+ # asyncio caches the PID in BaseSubprocessTransport,
+ # so that the transport knows what the PID was even
+ # after the process is finished.
+ self._pid = (<uv.uv_process_t*>self._handle).pid
+
+ # Track the process handle (create a strong ref to it)
+ # to guarantee that __dealloc__ doesn't happen in an
+ # uncontrolled fashion. We want to wait until the process
+ # exits and libuv calls __uvprocess_on_exit_callback,
+ # which will call `UVProcess._close()`, which will, in turn,
+ # untrack this handle.
+ self._loop._track_process(self)
+
+ if debug_flags & __PROCESS_DEBUG_SLEEP_AFTER_FORK:
+ time_sleep(1)
+
+ if preexec_fn is not None and errpipe_data:
+ # preexec_fn has raised an exception. The child
+ # process must be dead now.
+ try:
+ exc_name, exc_msg = errpipe_data.split(b':', 1)
+ exc_name = exc_name.decode()
+ exc_msg = exc_msg.decode()
+ except Exception:
+ self._close()
+ raise subprocess_SubprocessError(
+ 'Bad exception data from child: {!r}'.format(
+ errpipe_data))
+ exc_cls = getattr(__builtins__, exc_name,
+ subprocess_SubprocessError)
+
+ exc = subprocess_SubprocessError(
+ 'Exception occurred in preexec_fn.')
+ exc.__cause__ = exc_cls(exc_msg)
+ self._close()
+ raise exc
+
+ cdef _after_fork(self):
+ # See CPython/_posixsubprocess.c for details
+ cdef int err
+
+ if self._restore_signals:
+ _Py_RestoreSignals()
+
+ PyOS_AfterFork_Child()
+
+ err = uv.uv_loop_fork(self._loop.uvloop)
+ if err < 0:
+ raise convert_error(err)
+
+ if self._preexec_fn is not None:
+ try:
+ gc_disable()
+ self._preexec_fn()
+ except BaseException as ex:
+ try:
+ with open(self._errpipe_write, 'wb') as f:
+ f.write(str(ex.__class__.__name__).encode())
+ f.write(b':')
+ f.write(str(ex.args[0]).encode())
+ finally:
+ system._exit(255)
+ return
+ else:
+ os_close(self._errpipe_write)
+ else:
+ os_close(self._errpipe_write)
+
+ cdef _close_after_spawn(self, int fd):
+ if self._fds_to_close is None:
+ raise RuntimeError(
+ 'UVProcess._close_after_spawn called after uv_spawn')
+ self._fds_to_close.append(fd)
+
+ def __dealloc__(self):
+ if self.uv_opt_env is not NULL:
+ PyMem_RawFree(self.uv_opt_env)
+ self.uv_opt_env = NULL
+
+ if self.uv_opt_args is not NULL:
+ PyMem_RawFree(self.uv_opt_args)
+ self.uv_opt_args = NULL
+
+ cdef char** __to_cstring_array(self, list arr):
+ cdef:
+ int i
+ ssize_t arr_len = len(arr)
+ bytes el
+
+ char **ret
+
+ ret = <char **>PyMem_RawMalloc((arr_len + 1) * sizeof(char *))
+ if ret is NULL:
+ raise MemoryError()
+
+ for i in range(arr_len):
+ el = arr[i]
+ # NB: PyBytes_AsString doesn't copy the data;
+ # we have to be careful when the "arr" is GCed,
+ # and it shouldn't be ever mutated.
+ ret[i] = PyBytes_AsString(el)
+
+ ret[arr_len] = NULL
+ return ret
+
+ cdef _init_options(self, list args, dict env, cwd, start_new_session,
+ _stdin, _stdout, _stderr, bint force_fork):
+
+ memset(&self.options, 0, sizeof(uv.uv_process_options_t))
+
+ self._init_env(env)
+ self.options.env = self.uv_opt_env
+
+ self._init_args(args)
+ self.options.file = self.uv_opt_file
+ self.options.args = self.uv_opt_args
+
+ if start_new_session:
+ self.options.flags |= uv.UV_PROCESS_DETACHED
+
+ if force_fork:
+ # This is a hack to work around the change in libuv 1.44:
+ # > macos: use posix_spawn instead of fork
+ # where Python subprocess options like preexec_fn are
+ # crippled. CPython only uses posix_spawn under a pretty
+ # strict list of conditions (see subprocess.py), and falls
+ # back to using fork() otherwise. We'd like to simulate such
+ # behavior with libuv, but unfortunately libuv doesn't
+ # provide explicit API to choose such implementation detail.
+ # Based on current (libuv 1.46) behavior, setting
+ # UV_PROCESS_SETUID or UV_PROCESS_SETGID would reliably make
+ # libuv fallback to use fork, so let's just use it for now.
+ self.options.flags |= uv.UV_PROCESS_SETUID
+ self.options.uid = uv.getuid()
+
+ if cwd is not None:
+ cwd = os_fspath(cwd)
+
+ if isinstance(cwd, str):
+ cwd = PyUnicode_EncodeFSDefault(cwd)
+ if not isinstance(cwd, bytes):
+ raise ValueError('cwd must be a str or bytes object')
+
+ self.__cwd = cwd
+ self.options.cwd = PyBytes_AsString(self.__cwd)
+
+ self.options.exit_cb = &__uvprocess_on_exit_callback
+
+ self._init_files(_stdin, _stdout, _stderr)
+
+ cdef _init_args(self, list args):
+ cdef:
+ bytes path
+ int an = len(args)
+
+ if an < 1:
+ raise ValueError('cannot spawn a process: args are empty')
+
+ self.__args = args.copy()
+ for i in range(an):
+ arg = os_fspath(args[i])
+ if isinstance(arg, str):
+ self.__args[i] = PyUnicode_EncodeFSDefault(arg)
+ elif not isinstance(arg, bytes):
+ raise TypeError('all args must be str or bytes')
+
+ path = self.__args[0]
+ self.uv_opt_file = PyBytes_AsString(path)
+ self.uv_opt_args = self.__to_cstring_array(self.__args)
+
+ cdef _init_env(self, dict env):
+ if env is not None:
+ self.__env = list()
+ for key in env:
+ val = env[key]
+
+ if isinstance(key, str):
+ key = PyUnicode_EncodeFSDefault(key)
+ elif not isinstance(key, bytes):
+ raise TypeError(
+ 'all environment vars must be bytes or str')
+
+ if isinstance(val, str):
+ val = PyUnicode_EncodeFSDefault(val)
+ elif not isinstance(val, bytes):
+ raise TypeError(
+ 'all environment values must be bytes or str')
+
+ self.__env.append(key + b'=' + val)
+
+ self.uv_opt_env = self.__to_cstring_array(self.__env)
+ else:
+ self.__env = None
+
+ cdef _init_files(self, _stdin, _stdout, _stderr):
+ self.options.stdio_count = 0
+
+ cdef _kill(self, int signum):
+ cdef int err
+ self._ensure_alive()
+ err = uv.uv_process_kill(<uv.uv_process_t*>self._handle, signum)
+ if err < 0:
+ raise convert_error(err)
+
+ cdef _on_exit(self, int64_t exit_status, int term_signal):
+ if term_signal:
+ # From Python docs:
+ # A negative value -N indicates that the child was
+ # terminated by signal N (POSIX only).
+ self._returncode = -term_signal
+ else:
+ self._returncode = exit_status
+
+ self._close()
+
+ cdef _close(self):
+ try:
+ if self._loop is not None:
+ self._loop._untrack_process(self)
+ finally:
+ UVHandle._close(self)
+
+
+DEF _CALL_PIPE_DATA_RECEIVED = 0
+DEF _CALL_PIPE_CONNECTION_LOST = 1
+DEF _CALL_PROCESS_EXITED = 2
+DEF _CALL_CONNECTION_LOST = 3
+
+
+@cython.no_gc_clear
+cdef class UVProcessTransport(UVProcess):
+ def __cinit__(self):
+ self._exit_waiters = []
+ self._protocol = None
+
+ self._init_futs = []
+ self._pending_calls = []
+ self._stdio_ready = 0
+
+ self._stdin = self._stdout = self._stderr = None
+ self.stdin_proto = self.stdout_proto = self.stderr_proto = None
+
+ self._finished = 0
+
+ cdef _on_exit(self, int64_t exit_status, int term_signal):
+ UVProcess._on_exit(self, exit_status, term_signal)
+
+ if self._stdio_ready:
+ self._loop.call_soon(self._protocol.process_exited,
+ context=self.context)
+ else:
+ self._pending_calls.append((_CALL_PROCESS_EXITED, None, None))
+
+ self._try_finish()
+
+ for waiter in self._exit_waiters:
+ if not waiter.cancelled():
+ waiter.set_result(self._returncode)
+ self._exit_waiters.clear()
+
+ self._close()
+
+ cdef _check_proc(self):
+ if not self._is_alive() or self._returncode is not None:
+ raise ProcessLookupError()
+
+ cdef _pipe_connection_lost(self, int fd, exc):
+ if self._stdio_ready:
+ self._loop.call_soon(self._protocol.pipe_connection_lost, fd, exc,
+ context=self.context)
+ self._try_finish()
+ else:
+ self._pending_calls.append((_CALL_PIPE_CONNECTION_LOST, fd, exc))
+
+ cdef _pipe_data_received(self, int fd, data):
+ if self._stdio_ready:
+ self._loop.call_soon(self._protocol.pipe_data_received, fd, data,
+ context=self.context)
+ else:
+ self._pending_calls.append((_CALL_PIPE_DATA_RECEIVED, fd, data))
+
+ cdef _file_redirect_stdio(self, int fd):
+ fd = os_dup(fd)
+ os_set_inheritable(fd, True)
+ self._close_after_spawn(fd)
+ return fd
+
+ cdef _file_devnull(self):
+ dn = os_open(os_devnull, os_O_RDWR)
+ os_set_inheritable(dn, True)
+ self._close_after_spawn(dn)
+ return dn
+
+ cdef _file_outpipe(self):
+ r, w = __socketpair()
+ os_set_inheritable(w, True)
+ self._close_after_spawn(w)
+ return r, w
+
+ cdef _file_inpipe(self):
+ r, w = __socketpair()
+ os_set_inheritable(r, True)
+ self._close_after_spawn(r)
+ return r, w
+
+ cdef _init_files(self, _stdin, _stdout, _stderr):
+ cdef uv.uv_stdio_container_t *iocnt
+
+ UVProcess._init_files(self, _stdin, _stdout, _stderr)
+
+ io = [None, None, None]
+
+ self.options.stdio_count = 3
+ self.options.stdio = self.iocnt
+
+ if _stdin is not None:
+ if _stdin == subprocess_PIPE:
+ r, w = self._file_inpipe()
+ io[0] = r
+
+ self.stdin_proto = WriteSubprocessPipeProto(self, 0)
+ waiter = self._loop._new_future()
+ self._stdin = WriteUnixTransport.new(
+ self._loop, self.stdin_proto, None, waiter)
+ self._init_futs.append(waiter)
+ self._stdin._open(w)
+ self._stdin._init_protocol()
+ elif _stdin == subprocess_DEVNULL:
+ io[0] = self._file_devnull()
+ elif _stdout == subprocess_STDOUT:
+ raise ValueError(
+ 'subprocess.STDOUT is supported only by stderr parameter')
+ else:
+ io[0] = self._file_redirect_stdio(_stdin)
+ else:
+ io[0] = self._file_redirect_stdio(0)
+
+ if _stdout is not None:
+ if _stdout == subprocess_PIPE:
+ # We can't use UV_CREATE_PIPE here, since 'stderr' might be
+ # set to 'subprocess.STDOUT', and there is no way to
+ # emulate that functionality with libuv high-level
+ # streams API. Therefore, we create pipes for stdout and
+ # stderr manually.
+
+ r, w = self._file_outpipe()
+ io[1] = w
+
+ self.stdout_proto = ReadSubprocessPipeProto(self, 1)
+ waiter = self._loop._new_future()
+ self._stdout = ReadUnixTransport.new(
+ self._loop, self.stdout_proto, None, waiter)
+ self._init_futs.append(waiter)
+ self._stdout._open(r)
+ self._stdout._init_protocol()
+ elif _stdout == subprocess_DEVNULL:
+ io[1] = self._file_devnull()
+ elif _stdout == subprocess_STDOUT:
+ raise ValueError(
+ 'subprocess.STDOUT is supported only by stderr parameter')
+ else:
+ io[1] = self._file_redirect_stdio(_stdout)
+ else:
+ io[1] = self._file_redirect_stdio(1)
+
+ if _stderr is not None:
+ if _stderr == subprocess_PIPE:
+ r, w = self._file_outpipe()
+ io[2] = w
+
+ self.stderr_proto = ReadSubprocessPipeProto(self, 2)
+ waiter = self._loop._new_future()
+ self._stderr = ReadUnixTransport.new(
+ self._loop, self.stderr_proto, None, waiter)
+ self._init_futs.append(waiter)
+ self._stderr._open(r)
+ self._stderr._init_protocol()
+ elif _stderr == subprocess_STDOUT:
+ if io[1] is None:
+ # shouldn't ever happen
+ raise RuntimeError('cannot apply subprocess.STDOUT')
+
+ io[2] = self._file_redirect_stdio(io[1])
+ elif _stderr == subprocess_DEVNULL:
+ io[2] = self._file_devnull()
+ else:
+ io[2] = self._file_redirect_stdio(_stderr)
+ else:
+ io[2] = self._file_redirect_stdio(2)
+
+ assert len(io) == 3
+ for idx in range(3):
+ iocnt = &self.iocnt[idx]
+ if io[idx] is not None:
+ iocnt.flags = uv.UV_INHERIT_FD
+ iocnt.data.fd = io[idx]
+ else:
+ iocnt.flags = uv.UV_IGNORE
+
+ cdef _call_connection_made(self, waiter):
+ try:
+ # we're always called in the right context, so just call the user's
+ self._protocol.connection_made(self)
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except BaseException as ex:
+ if waiter is not None and not waiter.cancelled():
+ waiter.set_exception(ex)
+ else:
+ raise
+ else:
+ if waiter is not None and not waiter.cancelled():
+ waiter.set_result(True)
+
+ self._stdio_ready = 1
+ if self._pending_calls:
+ pending_calls = self._pending_calls.copy()
+ self._pending_calls.clear()
+ for (type, fd, arg) in pending_calls:
+ if type == _CALL_PIPE_CONNECTION_LOST:
+ self._pipe_connection_lost(fd, arg)
+ elif type == _CALL_PIPE_DATA_RECEIVED:
+ self._pipe_data_received(fd, arg)
+ elif type == _CALL_PROCESS_EXITED:
+ self._loop.call_soon(self._protocol.process_exited)
+ elif type == _CALL_CONNECTION_LOST:
+ self._loop.call_soon(self._protocol.connection_lost, None)
+
+ cdef _try_finish(self):
+ if self._returncode is None or self._finished:
+ return
+
+ if ((self.stdin_proto is None or self.stdin_proto.disconnected) and
+ (self.stdout_proto is None or
+ self.stdout_proto.disconnected) and
+ (self.stderr_proto is None or
+ self.stderr_proto.disconnected)):
+
+ self._finished = 1
+
+ if self._stdio_ready:
+ # copy self.context for simplicity
+ self._loop.call_soon(self._protocol.connection_lost, None,
+ context=self.context)
+ else:
+ self._pending_calls.append((_CALL_CONNECTION_LOST, None, None))
+
+ def __stdio_inited(self, waiter, stdio_fut):
+ exc = stdio_fut.exception()
+ if exc is not None:
+ if waiter is None:
+ raise exc
+ else:
+ waiter.set_exception(exc)
+ else:
+ self._loop._call_soon_handle(
+ new_MethodHandle1(self._loop,
+ "UVProcessTransport._call_connection_made",
+ <method1_t>self._call_connection_made,
+ None, # means to copy the current context
+ self, waiter))
+
+ @staticmethod
+ cdef UVProcessTransport new(Loop loop, protocol, args, env,
+ cwd, start_new_session,
+ _stdin, _stdout, _stderr, pass_fds,
+ waiter,
+ debug_flags,
+ preexec_fn,
+ restore_signals):
+
+ cdef UVProcessTransport handle
+ handle = UVProcessTransport.__new__(UVProcessTransport)
+ handle._protocol = protocol
+ handle._init(loop, args, env, cwd, start_new_session,
+ __process_convert_fileno(_stdin),
+ __process_convert_fileno(_stdout),
+ __process_convert_fileno(_stderr),
+ pass_fds,
+ debug_flags,
+ preexec_fn,
+ restore_signals)
+
+ if handle._init_futs:
+ handle._stdio_ready = 0
+ init_fut = aio_gather(*handle._init_futs)
+ # add_done_callback will copy the current context and run the
+ # callback within the context
+ init_fut.add_done_callback(
+ ft_partial(handle.__stdio_inited, waiter))
+ else:
+ handle._stdio_ready = 1
+ loop._call_soon_handle(
+ new_MethodHandle1(loop,
+ "UVProcessTransport._call_connection_made",
+ <method1_t>handle._call_connection_made,
+ None, # means to copy the current context
+ handle, waiter))
+
+ return handle
+
+ def get_protocol(self):
+ return self._protocol
+
+ def set_protocol(self, protocol):
+ self._protocol = protocol
+
+ def get_pid(self):
+ return self._pid
+
+ def get_returncode(self):
+ return self._returncode
+
+ def get_pipe_transport(self, fd):
+ if fd == 0:
+ return self._stdin
+ elif fd == 1:
+ return self._stdout
+ elif fd == 2:
+ return self._stderr
+
+ def terminate(self):
+ self._check_proc()
+ self._kill(uv.SIGTERM)
+
+ def kill(self):
+ self._check_proc()
+ self._kill(uv.SIGKILL)
+
+ def send_signal(self, int signal):
+ self._check_proc()
+ self._kill(signal)
+
+ def is_closing(self):
+ return self._closed
+
+ def close(self):
+ if self._returncode is None:
+ self._kill(uv.SIGKILL)
+
+ if self._stdin is not None:
+ self._stdin.close()
+ if self._stdout is not None:
+ self._stdout.close()
+ if self._stderr is not None:
+ self._stderr.close()
+
+ if self._returncode is not None:
+ # The process is dead, just close the UV handle.
+ #
+ # (If "self._returncode is None", the process should have been
+ # killed already and we're just waiting for a SIGCHLD; after
+ # which the transport will be GC'ed and the uvhandle will be
+ # closed in UVHandle.__dealloc__.)
+ self._close()
+
+ def get_extra_info(self, name, default=None):
+ return default
+
+ def _wait(self):
+ fut = self._loop._new_future()
+ if self._returncode is not None:
+ fut.set_result(self._returncode)
+ return fut
+
+ self._exit_waiters.append(fut)
+ return fut
+
+
+class WriteSubprocessPipeProto(aio_BaseProtocol):
+
+ def __init__(self, proc, fd):
+ if UVLOOP_DEBUG:
+ if type(proc) is not UVProcessTransport:
+ raise TypeError
+ if not isinstance(fd, int):
+ raise TypeError
+ self.proc = proc
+ self.fd = fd
+ self.pipe = None
+ self.disconnected = False
+
+ def connection_made(self, transport):
+ self.pipe = transport
+
+ def __repr__(self):
+ return ('<%s fd=%s pipe=%r>'
+ % (self.__class__.__name__, self.fd, self.pipe))
+
+ def connection_lost(self, exc):
+ self.disconnected = True
+ (<UVProcessTransport>self.proc)._pipe_connection_lost(self.fd, exc)
+ self.proc = None
+
+ def pause_writing(self):
+ (<UVProcessTransport>self.proc)._protocol.pause_writing()
+
+ def resume_writing(self):
+ (<UVProcessTransport>self.proc)._protocol.resume_writing()
+
+
+class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
+ aio_Protocol):
+
+ def data_received(self, data):
+ (<UVProcessTransport>self.proc)._pipe_data_received(self.fd, data)
+
+
+cdef __process_convert_fileno(object obj):
+ if obj is None or isinstance(obj, int):
+ return obj
+
+ fileno = obj.fileno()
+ if not isinstance(fileno, int):
+ raise TypeError(
+ '{!r}.fileno() returned non-integer'.format(obj))
+ return fileno
+
+
+cdef void __uvprocess_on_exit_callback(
+ uv.uv_process_t *handle,
+ int64_t exit_status,
+ int term_signal,
+) noexcept with gil:
+
+ if __ensure_handle_data(<uv.uv_handle_t*>handle,
+ "UVProcess exit callback") == 0:
+ return
+
+ cdef UVProcess proc = <UVProcess> handle.data
+ try:
+ proc._on_exit(exit_status, term_signal)
+ except BaseException as ex:
+ proc._error(ex, False)
+
+
+cdef __socketpair():
+ cdef:
+ int fds[2]
+ int err
+
+ err = system.socketpair(uv.AF_UNIX, uv.SOCK_STREAM, 0, fds)
+ if err:
+ exc = convert_error(-err)
+ raise exc
+
+ os_set_inheritable(fds[0], False)
+ os_set_inheritable(fds[1], False)
+
+ return fds[0], fds[1]
+
+
+cdef void __uv_close_process_handle_cb(
+ uv.uv_handle_t* handle
+) noexcept with gil:
+ PyMem_RawFree(handle)