diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/uvloop/handles/pipe.pyx')
-rw-r--r-- | venv/lib/python3.11/site-packages/uvloop/handles/pipe.pyx | 226 |
1 files changed, 226 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pyx new file mode 100644 index 0000000..195576c --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pyx @@ -0,0 +1,226 @@ +cdef __pipe_init_uv_handle(UVStream handle, Loop loop): + cdef int err + + handle._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_pipe_t)) + if handle._handle is NULL: + handle._abort_init() + raise MemoryError() + + # Initialize pipe handle with ipc=0. + # ipc=1 means that libuv will use recvmsg/sendmsg + # instead of recv/send. + err = uv.uv_pipe_init(handle._loop.uvloop, + <uv.uv_pipe_t*>handle._handle, + 0) + # UV_HANDLE_READABLE allows calling uv_read_start() on this pipe + # even if it is O_WRONLY, see also #317, libuv/libuv#2058 + handle._handle.flags |= uv.UV_INTERNAL_HANDLE_READABLE + if err < 0: + handle._abort_init() + raise convert_error(err) + + handle._finish_init() + + +cdef __pipe_open(UVStream handle, int fd): + cdef int err + err = uv.uv_pipe_open(<uv.uv_pipe_t *>handle._handle, + <uv.uv_file>fd) + if err < 0: + exc = convert_error(err) + raise exc + + +cdef __pipe_get_socket(UVSocketHandle handle): + fileno = handle._fileno() + return PseudoSocket(uv.AF_UNIX, uv.SOCK_STREAM, 0, fileno) + + +@cython.no_gc_clear +cdef class UnixServer(UVStreamServer): + + @staticmethod + cdef UnixServer new(Loop loop, object protocol_factory, Server server, + object backlog, + object ssl, + object ssl_handshake_timeout, + object ssl_shutdown_timeout): + + cdef UnixServer handle + handle = UnixServer.__new__(UnixServer) + handle._init(loop, protocol_factory, server, backlog, + ssl, ssl_handshake_timeout, ssl_shutdown_timeout) + __pipe_init_uv_handle(<UVStream>handle, loop) + return handle + + cdef _new_socket(self): + return __pipe_get_socket(<UVSocketHandle>self) + + cdef _open(self, int sockfd): + self._ensure_alive() + __pipe_open(<UVStream>self, sockfd) + self._mark_as_open() + + cdef bind(self, str path): + cdef int err + self._ensure_alive() + err = uv.uv_pipe_bind(<uv.uv_pipe_t *>self._handle, + path.encode()) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + + self._mark_as_open() + + cdef UVStream _make_new_transport(self, object protocol, object waiter, + object context): + cdef UnixTransport tr + tr = UnixTransport.new(self._loop, protocol, self._server, waiter, + context) + return <UVStream>tr + + +@cython.no_gc_clear +cdef class UnixTransport(UVStream): + + @staticmethod + cdef UnixTransport new(Loop loop, object protocol, Server server, + object waiter, object context): + + cdef UnixTransport handle + handle = UnixTransport.__new__(UnixTransport) + handle._init(loop, protocol, server, waiter, context) + __pipe_init_uv_handle(<UVStream>handle, loop) + return handle + + cdef _new_socket(self): + return __pipe_get_socket(<UVSocketHandle>self) + + cdef _open(self, int sockfd): + __pipe_open(<UVStream>self, sockfd) + + cdef connect(self, char* addr): + cdef _PipeConnectRequest req + req = _PipeConnectRequest(self._loop, self) + req.connect(addr) + + +@cython.no_gc_clear +cdef class ReadUnixTransport(UVStream): + + @staticmethod + cdef ReadUnixTransport new(Loop loop, object protocol, Server server, + object waiter): + cdef ReadUnixTransport handle + handle = ReadUnixTransport.__new__(ReadUnixTransport) + # This is only used in connect_read_pipe() and subprocess_shell/exec() + # directly, we could simply copy the current context. + handle._init(loop, protocol, server, waiter, Context_CopyCurrent()) + __pipe_init_uv_handle(<UVStream>handle, loop) + return handle + + cdef _new_socket(self): + return __pipe_get_socket(<UVSocketHandle>self) + + cdef _open(self, int sockfd): + __pipe_open(<UVStream>self, sockfd) + + def get_write_buffer_limits(self): + raise NotImplementedError + + def set_write_buffer_limits(self, high=None, low=None): + raise NotImplementedError + + def get_write_buffer_size(self): + raise NotImplementedError + + def write(self, data): + raise NotImplementedError + + def writelines(self, list_of_data): + raise NotImplementedError + + def write_eof(self): + raise NotImplementedError + + def can_write_eof(self): + raise NotImplementedError + + def abort(self): + raise NotImplementedError + + +@cython.no_gc_clear +cdef class WriteUnixTransport(UVStream): + + @staticmethod + cdef WriteUnixTransport new(Loop loop, object protocol, Server server, + object waiter): + cdef WriteUnixTransport handle + handle = WriteUnixTransport.__new__(WriteUnixTransport) + + # We listen for read events on write-end of the pipe. When + # the read-end is close, the uv_stream_t.read callback will + # receive an error -- we want to silence that error, and just + # close the transport. + handle._close_on_read_error() + + # This is only used in connect_write_pipe() and subprocess_shell/exec() + # directly, we could simply copy the current context. + handle._init(loop, protocol, server, waiter, Context_CopyCurrent()) + __pipe_init_uv_handle(<UVStream>handle, loop) + return handle + + cdef _new_socket(self): + return __pipe_get_socket(<UVSocketHandle>self) + + cdef _open(self, int sockfd): + __pipe_open(<UVStream>self, sockfd) + + def pause_reading(self): + raise NotImplementedError + + def resume_reading(self): + raise NotImplementedError + + +cdef class _PipeConnectRequest(UVRequest): + cdef: + UnixTransport transport + uv.uv_connect_t _req_data + + def __cinit__(self, loop, transport): + self.request = <uv.uv_req_t*> &self._req_data + self.request.data = <void*>self + self.transport = transport + + cdef connect(self, char* addr): + # uv_pipe_connect returns void + uv.uv_pipe_connect(<uv.uv_connect_t*>self.request, + <uv.uv_pipe_t*>self.transport._handle, + addr, + __pipe_connect_callback) + +cdef void __pipe_connect_callback( + uv.uv_connect_t* req, + int status, +) noexcept with gil: + cdef: + _PipeConnectRequest wrapper + UnixTransport transport + + wrapper = <_PipeConnectRequest> req.data + transport = wrapper.transport + + if status < 0: + exc = convert_error(status) + else: + exc = None + + try: + transport._on_connect(exc) + except BaseException as ex: + wrapper.transport._fatal_error(ex, False) + finally: + wrapper.on_done() |