summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pyx
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/uvloop/handles/pipe.pyx')
-rw-r--r--venv/lib/python3.11/site-packages/uvloop/handles/pipe.pyx226
1 files changed, 226 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pyx
new file mode 100644
index 0000000..195576c
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pyx
@@ -0,0 +1,226 @@
+cdef __pipe_init_uv_handle(UVStream handle, Loop loop):
+ cdef int err
+
+ handle._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_pipe_t))
+ if handle._handle is NULL:
+ handle._abort_init()
+ raise MemoryError()
+
+ # Initialize pipe handle with ipc=0.
+ # ipc=1 means that libuv will use recvmsg/sendmsg
+ # instead of recv/send.
+ err = uv.uv_pipe_init(handle._loop.uvloop,
+ <uv.uv_pipe_t*>handle._handle,
+ 0)
+ # UV_HANDLE_READABLE allows calling uv_read_start() on this pipe
+ # even if it is O_WRONLY, see also #317, libuv/libuv#2058
+ handle._handle.flags |= uv.UV_INTERNAL_HANDLE_READABLE
+ if err < 0:
+ handle._abort_init()
+ raise convert_error(err)
+
+ handle._finish_init()
+
+
+cdef __pipe_open(UVStream handle, int fd):
+ cdef int err
+ err = uv.uv_pipe_open(<uv.uv_pipe_t *>handle._handle,
+ <uv.uv_file>fd)
+ if err < 0:
+ exc = convert_error(err)
+ raise exc
+
+
+cdef __pipe_get_socket(UVSocketHandle handle):
+ fileno = handle._fileno()
+ return PseudoSocket(uv.AF_UNIX, uv.SOCK_STREAM, 0, fileno)
+
+
+@cython.no_gc_clear
+cdef class UnixServer(UVStreamServer):
+
+ @staticmethod
+ cdef UnixServer new(Loop loop, object protocol_factory, Server server,
+ object backlog,
+ object ssl,
+ object ssl_handshake_timeout,
+ object ssl_shutdown_timeout):
+
+ cdef UnixServer handle
+ handle = UnixServer.__new__(UnixServer)
+ handle._init(loop, protocol_factory, server, backlog,
+ ssl, ssl_handshake_timeout, ssl_shutdown_timeout)
+ __pipe_init_uv_handle(<UVStream>handle, loop)
+ return handle
+
+ cdef _new_socket(self):
+ return __pipe_get_socket(<UVSocketHandle>self)
+
+ cdef _open(self, int sockfd):
+ self._ensure_alive()
+ __pipe_open(<UVStream>self, sockfd)
+ self._mark_as_open()
+
+ cdef bind(self, str path):
+ cdef int err
+ self._ensure_alive()
+ err = uv.uv_pipe_bind(<uv.uv_pipe_t *>self._handle,
+ path.encode())
+ if err < 0:
+ exc = convert_error(err)
+ self._fatal_error(exc, True)
+ return
+
+ self._mark_as_open()
+
+ cdef UVStream _make_new_transport(self, object protocol, object waiter,
+ object context):
+ cdef UnixTransport tr
+ tr = UnixTransport.new(self._loop, protocol, self._server, waiter,
+ context)
+ return <UVStream>tr
+
+
+@cython.no_gc_clear
+cdef class UnixTransport(UVStream):
+
+ @staticmethod
+ cdef UnixTransport new(Loop loop, object protocol, Server server,
+ object waiter, object context):
+
+ cdef UnixTransport handle
+ handle = UnixTransport.__new__(UnixTransport)
+ handle._init(loop, protocol, server, waiter, context)
+ __pipe_init_uv_handle(<UVStream>handle, loop)
+ return handle
+
+ cdef _new_socket(self):
+ return __pipe_get_socket(<UVSocketHandle>self)
+
+ cdef _open(self, int sockfd):
+ __pipe_open(<UVStream>self, sockfd)
+
+ cdef connect(self, char* addr):
+ cdef _PipeConnectRequest req
+ req = _PipeConnectRequest(self._loop, self)
+ req.connect(addr)
+
+
+@cython.no_gc_clear
+cdef class ReadUnixTransport(UVStream):
+
+ @staticmethod
+ cdef ReadUnixTransport new(Loop loop, object protocol, Server server,
+ object waiter):
+ cdef ReadUnixTransport handle
+ handle = ReadUnixTransport.__new__(ReadUnixTransport)
+ # This is only used in connect_read_pipe() and subprocess_shell/exec()
+ # directly, we could simply copy the current context.
+ handle._init(loop, protocol, server, waiter, Context_CopyCurrent())
+ __pipe_init_uv_handle(<UVStream>handle, loop)
+ return handle
+
+ cdef _new_socket(self):
+ return __pipe_get_socket(<UVSocketHandle>self)
+
+ cdef _open(self, int sockfd):
+ __pipe_open(<UVStream>self, sockfd)
+
+ def get_write_buffer_limits(self):
+ raise NotImplementedError
+
+ def set_write_buffer_limits(self, high=None, low=None):
+ raise NotImplementedError
+
+ def get_write_buffer_size(self):
+ raise NotImplementedError
+
+ def write(self, data):
+ raise NotImplementedError
+
+ def writelines(self, list_of_data):
+ raise NotImplementedError
+
+ def write_eof(self):
+ raise NotImplementedError
+
+ def can_write_eof(self):
+ raise NotImplementedError
+
+ def abort(self):
+ raise NotImplementedError
+
+
+@cython.no_gc_clear
+cdef class WriteUnixTransport(UVStream):
+
+ @staticmethod
+ cdef WriteUnixTransport new(Loop loop, object protocol, Server server,
+ object waiter):
+ cdef WriteUnixTransport handle
+ handle = WriteUnixTransport.__new__(WriteUnixTransport)
+
+ # We listen for read events on write-end of the pipe. When
+ # the read-end is close, the uv_stream_t.read callback will
+ # receive an error -- we want to silence that error, and just
+ # close the transport.
+ handle._close_on_read_error()
+
+ # This is only used in connect_write_pipe() and subprocess_shell/exec()
+ # directly, we could simply copy the current context.
+ handle._init(loop, protocol, server, waiter, Context_CopyCurrent())
+ __pipe_init_uv_handle(<UVStream>handle, loop)
+ return handle
+
+ cdef _new_socket(self):
+ return __pipe_get_socket(<UVSocketHandle>self)
+
+ cdef _open(self, int sockfd):
+ __pipe_open(<UVStream>self, sockfd)
+
+ def pause_reading(self):
+ raise NotImplementedError
+
+ def resume_reading(self):
+ raise NotImplementedError
+
+
+cdef class _PipeConnectRequest(UVRequest):
+ cdef:
+ UnixTransport transport
+ uv.uv_connect_t _req_data
+
+ def __cinit__(self, loop, transport):
+ self.request = <uv.uv_req_t*> &self._req_data
+ self.request.data = <void*>self
+ self.transport = transport
+
+ cdef connect(self, char* addr):
+ # uv_pipe_connect returns void
+ uv.uv_pipe_connect(<uv.uv_connect_t*>self.request,
+ <uv.uv_pipe_t*>self.transport._handle,
+ addr,
+ __pipe_connect_callback)
+
+cdef void __pipe_connect_callback(
+ uv.uv_connect_t* req,
+ int status,
+) noexcept with gil:
+ cdef:
+ _PipeConnectRequest wrapper
+ UnixTransport transport
+
+ wrapper = <_PipeConnectRequest> req.data
+ transport = wrapper.transport
+
+ if status < 0:
+ exc = convert_error(status)
+ else:
+ exc = None
+
+ try:
+ transport._on_connect(exc)
+ except BaseException as ex:
+ wrapper.transport._fatal_error(ex, False)
+ finally:
+ wrapper.on_done()