From 6d7ba58f880be618ade07f8ea080fe8c4bf8a896 Mon Sep 17 00:00:00 2001 From: cyfraeviolae Date: Wed, 3 Apr 2024 03:10:44 -0400 Subject: venv --- .../site-packages/uvloop/handles/stream.pyx | 1015 ++++++++++++++++++++ 1 file changed, 1015 insertions(+) create mode 100644 venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx (limited to 'venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx') diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx new file mode 100644 index 0000000..d4e02e3 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx @@ -0,0 +1,1015 @@ +DEF __PREALLOCED_BUFS = 4 + + +@cython.no_gc_clear +@cython.freelist(DEFAULT_FREELIST_SIZE) +cdef class _StreamWriteContext: + # used to hold additional write request information for uv_write + + cdef: + uv.uv_write_t req + + list buffers + + uv.uv_buf_t uv_bufs_sml[__PREALLOCED_BUFS] + Py_buffer py_bufs_sml[__PREALLOCED_BUFS] + bint py_bufs_sml_inuse + + uv.uv_buf_t* uv_bufs + Py_buffer* py_bufs + size_t py_bufs_len + + uv.uv_buf_t* uv_bufs_start + size_t uv_bufs_len + + UVStream stream + + bint closed + + cdef free_bufs(self): + cdef size_t i + + if self.uv_bufs is not NULL: + PyMem_RawFree(self.uv_bufs) + self.uv_bufs = NULL + if UVLOOP_DEBUG: + if self.py_bufs_sml_inuse: + raise RuntimeError( + '_StreamWriteContext.close: uv_bufs != NULL and ' + 'py_bufs_sml_inuse is True') + + if self.py_bufs is not NULL: + for i from 0 <= i < self.py_bufs_len: + PyBuffer_Release(&self.py_bufs[i]) + PyMem_RawFree(self.py_bufs) + self.py_bufs = NULL + if UVLOOP_DEBUG: + if self.py_bufs_sml_inuse: + raise RuntimeError( + '_StreamWriteContext.close: py_bufs != NULL and ' + 'py_bufs_sml_inuse is True') + + if self.py_bufs_sml_inuse: + for i from 0 <= i < self.py_bufs_len: + PyBuffer_Release(&self.py_bufs_sml[i]) + self.py_bufs_sml_inuse = 0 + + self.py_bufs_len = 0 + self.buffers = None + + cdef close(self): + if self.closed: + return + self.closed = 1 + self.free_bufs() + Py_DECREF(self) + + cdef advance_uv_buf(self, size_t sent): + # Advance the pointer to first uv_buf and the + # pointer to first byte in that buffer. + # + # We do this after a "uv_try_write" call, which + # sometimes sends only a portion of data. + # We then call "advance_uv_buf" on the write + # context, and reuse it in a "uv_write" call. + + cdef: + uv.uv_buf_t* buf + size_t idx + + for idx from 0 <= idx < self.uv_bufs_len: + buf = &self.uv_bufs_start[idx] + if buf.len > sent: + buf.len -= sent + buf.base = buf.base + sent + self.uv_bufs_start = buf + self.uv_bufs_len -= idx + return + else: + sent -= self.uv_bufs_start[idx].len + + if UVLOOP_DEBUG: + if sent < 0: + raise RuntimeError('fatal: sent < 0 in advance_uv_buf') + + raise RuntimeError('fatal: Could not advance _StreamWriteContext') + + @staticmethod + cdef _StreamWriteContext new(UVStream stream, list buffers): + cdef: + _StreamWriteContext ctx + int uv_bufs_idx = 0 + size_t py_bufs_len = 0 + int i + + Py_buffer* p_pybufs + uv.uv_buf_t* p_uvbufs + + ctx = _StreamWriteContext.__new__(_StreamWriteContext) + ctx.stream = None + ctx.closed = 1 + ctx.py_bufs_len = 0 + ctx.py_bufs_sml_inuse = 0 + ctx.uv_bufs = NULL + ctx.py_bufs = NULL + ctx.buffers = buffers + ctx.stream = stream + + if len(buffers) <= __PREALLOCED_BUFS: + # We've got a small number of buffers to write, don't + # need to use malloc. + ctx.py_bufs_sml_inuse = 1 + p_pybufs = &ctx.py_bufs_sml + p_uvbufs = &ctx.uv_bufs_sml + + else: + for buf in buffers: + if UVLOOP_DEBUG: + if not isinstance(buf, (bytes, bytearray, memoryview)): + raise RuntimeError( + 'invalid data in writebuf: an instance of ' + 'bytes, bytearray or memoryview was expected, ' + 'got {}'.format(type(buf))) + + if not PyBytes_CheckExact(buf): + py_bufs_len += 1 + + if py_bufs_len > 0: + ctx.py_bufs = PyMem_RawMalloc( + py_bufs_len * sizeof(Py_buffer)) + if ctx.py_bufs is NULL: + raise MemoryError() + + ctx.uv_bufs = PyMem_RawMalloc( + len(buffers) * sizeof(uv.uv_buf_t)) + if ctx.uv_bufs is NULL: + raise MemoryError() + + p_pybufs = ctx.py_bufs + p_uvbufs = ctx.uv_bufs + + py_bufs_len = 0 + for buf in buffers: + if PyBytes_CheckExact(buf): + # We can only use this hack for bytes since it's + # immutable. For everything else it is only safe to + # use buffer protocol. + p_uvbufs[uv_bufs_idx].base = PyBytes_AS_STRING(buf) + p_uvbufs[uv_bufs_idx].len = Py_SIZE(buf) + + else: + try: + PyObject_GetBuffer( + buf, &p_pybufs[py_bufs_len], PyBUF_SIMPLE) + except Exception: + # This shouldn't ever happen, as `UVStream._buffer_write` + # casts non-bytes objects to `memoryviews`. + ctx.py_bufs_len = py_bufs_len + ctx.free_bufs() + raise + + p_uvbufs[uv_bufs_idx].base = p_pybufs[py_bufs_len].buf + p_uvbufs[uv_bufs_idx].len = p_pybufs[py_bufs_len].len + + py_bufs_len += 1 + + uv_bufs_idx += 1 + + ctx.uv_bufs_start = p_uvbufs + ctx.uv_bufs_len = uv_bufs_idx + + ctx.py_bufs_len = py_bufs_len + ctx.req.data = ctx + + if UVLOOP_DEBUG: + stream._loop._debug_stream_write_ctx_total += 1 + stream._loop._debug_stream_write_ctx_cnt += 1 + + # Do incref after everything else is done. + # Under no circumstances we want `ctx` to be GCed while + # libuv is still working with `ctx.uv_bufs`. + Py_INCREF(ctx) + ctx.closed = 0 + return ctx + + def __dealloc__(self): + if not self.closed: + # Because we do an INCREF in _StreamWriteContext.new, + # __dealloc__ shouldn't ever happen with `self.closed == 1` + raise RuntimeError( + 'open _StreamWriteContext is being deallocated') + + if UVLOOP_DEBUG: + if self.stream is not None: + self.stream._loop._debug_stream_write_ctx_cnt -= 1 + self.stream = None + + +@cython.no_gc_clear +cdef class UVStream(UVBaseTransport): + + def __cinit__(self): + self.__shutting_down = 0 + self.__reading = 0 + self.__read_error_close = 0 + self.__buffered = 0 + self._eof = 0 + self._buffer = [] + self._buffer_size = 0 + + self._protocol_get_buffer = None + self._protocol_buffer_updated = None + + self._read_pybuf_acquired = False + + cdef _set_protocol(self, object protocol): + if protocol is None: + raise TypeError('protocol is required') + + UVBaseTransport._set_protocol(self, protocol) + + if (hasattr(protocol, 'get_buffer') and + not isinstance(protocol, aio_Protocol)): + try: + self._protocol_get_buffer = protocol.get_buffer + self._protocol_buffer_updated = protocol.buffer_updated + self.__buffered = 1 + except AttributeError: + pass + else: + self.__buffered = 0 + + cdef _clear_protocol(self): + UVBaseTransport._clear_protocol(self) + self._protocol_get_buffer = None + self._protocol_buffer_updated = None + self.__buffered = 0 + + cdef inline _shutdown(self): + cdef int err + + if self.__shutting_down: + return + self.__shutting_down = 1 + + self._ensure_alive() + + self._shutdown_req.data = self + err = uv.uv_shutdown(&self._shutdown_req, + self._handle, + __uv_stream_on_shutdown) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + + cdef inline _accept(self, UVStream server): + cdef int err + self._ensure_alive() + + err = uv.uv_accept(server._handle, + self._handle) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + + self._on_accept() + + cdef inline _close_on_read_error(self): + self.__read_error_close = 1 + + cdef bint _is_reading(self): + return self.__reading + + cdef _start_reading(self): + cdef int err + + if self._closing: + return + + self._ensure_alive() + + if self.__reading: + return + + if self.__buffered: + err = uv.uv_read_start(self._handle, + __uv_stream_buffered_alloc, + __uv_stream_buffered_on_read) + else: + err = uv.uv_read_start(self._handle, + __loop_alloc_buffer, + __uv_stream_on_read) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + else: + # UVStream must live until the read callback is called + self.__reading_started() + + cdef inline __reading_started(self): + if self.__reading: + return + self.__reading = 1 + Py_INCREF(self) + + cdef inline __reading_stopped(self): + if not self.__reading: + return + self.__reading = 0 + Py_DECREF(self) + + cdef _stop_reading(self): + cdef int err + + if not self.__reading: + return + + self._ensure_alive() + + # From libuv docs: + # This function is idempotent and may be safely + # called on a stopped stream. + err = uv.uv_read_stop(self._handle) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + else: + self.__reading_stopped() + + cdef inline _try_write(self, object data): + cdef: + ssize_t written + bint used_buf = 0 + Py_buffer py_buf + void* buf + size_t blen + int saved_errno + int fd + + if (self._handle).write_queue_size != 0: + raise RuntimeError( + 'UVStream._try_write called with data in uv buffers') + + if PyBytes_CheckExact(data): + # We can only use this hack for bytes since it's + # immutable. For everything else it is only safe to + # use buffer protocol. + buf = PyBytes_AS_STRING(data) + blen = Py_SIZE(data) + else: + PyObject_GetBuffer(data, &py_buf, PyBUF_SIMPLE) + used_buf = 1 + buf = py_buf.buf + blen = py_buf.len + + if blen == 0: + # Empty data, do nothing. + return 0 + + fd = self._fileno() + # Use `unistd.h/write` directly, it's faster than + # uv_try_write -- less layers of code. The error + # checking logic is copied from libuv. + written = system.write(fd, buf, blen) + while written == -1 and ( + errno.errno == errno.EINTR or + (system.PLATFORM_IS_APPLE and + errno.errno == errno.EPROTOTYPE)): + # From libuv code (unix/stream.c): + # Due to a possible kernel bug at least in OS X 10.10 "Yosemite", + # EPROTOTYPE can be returned while trying to write to a socket + # that is shutting down. If we retry the write, we should get + # the expected EPIPE instead. + written = system.write(fd, buf, blen) + saved_errno = errno.errno + + if used_buf: + PyBuffer_Release(&py_buf) + + if written < 0: + if saved_errno == errno.EAGAIN or \ + saved_errno == system.EWOULDBLOCK: + return -1 + else: + exc = convert_error(-saved_errno) + self._fatal_error(exc, True) + return + + if UVLOOP_DEBUG: + self._loop._debug_stream_write_tries += 1 + + if written == blen: + return 0 + + return written + + cdef inline _buffer_write(self, object data): + cdef int dlen + + if not PyBytes_CheckExact(data): + data = memoryview(data).cast('b') + + dlen = len(data) + if not dlen: + return + + self._buffer_size += dlen + self._buffer.append(data) + + cdef inline _initiate_write(self): + if (not self._protocol_paused and + (self._handle).write_queue_size == 0 and + self._buffer_size > self._high_water): + # Fast-path. If: + # - the protocol isn't yet paused, + # - there is no data in libuv buffers for this stream, + # - the protocol will be paused if we continue to buffer data + # + # Then: + # - Try to write all buffered data right now. + all_sent = self._exec_write() + if UVLOOP_DEBUG: + if self._buffer_size != 0 or self._buffer != []: + raise RuntimeError( + '_buffer_size is not 0 after a successful _exec_write') + + # There is no need to call `_queue_write` anymore, + # as `uv_write` should be called already. + + if not all_sent: + # If not all of the data was sent successfully, + # we might need to pause the protocol. + self._maybe_pause_protocol() + + elif self._buffer_size > 0: + self._maybe_pause_protocol() + self._loop._queue_write(self) + + cdef inline _exec_write(self): + cdef: + int err + int buf_len + _StreamWriteContext ctx = None + + if self._closed: + # If the handle is closed, just return, it's too + # late to do anything. + return + + buf_len = len(self._buffer) + if not buf_len: + return + + if (self._handle).write_queue_size == 0: + # libuv internal write buffers for this stream are empty. + if buf_len == 1: + # If we only have one piece of data to send, let's + # use our fast implementation of try_write. + data = self._buffer[0] + sent = self._try_write(data) + + if sent is None: + # A `self._fatal_error` was called. + # It might not raise an exception under some + # conditions. + self._buffer_size = 0 + self._buffer.clear() + if not self._closing: + # This should never happen. + raise RuntimeError( + 'stream is open after UVStream._try_write ' + 'returned None') + return + + if sent == 0: + # All data was successfully written. + self._buffer_size = 0 + self._buffer.clear() + # on_write will call "maybe_resume_protocol". + self._on_write() + return True + + if sent > 0: + if UVLOOP_DEBUG: + if sent == len(data): + raise RuntimeError( + '_try_write sent all data and returned ' + 'non-zero') + + if PyBytes_CheckExact(data): + # Cast bytes to memoryview to avoid copying + # data that wasn't sent. + data = memoryview(data) + data = data[sent:] + + self._buffer_size -= sent + self._buffer[0] = data + + # At this point it's either data was sent partially, + # or an EAGAIN has happened. + + else: + ctx = _StreamWriteContext.new(self, self._buffer) + + err = uv.uv_try_write(self._handle, + ctx.uv_bufs_start, + ctx.uv_bufs_len) + + if err > 0: + # Some data was successfully sent. + + if err == self._buffer_size: + # Everything was sent. + ctx.close() + self._buffer.clear() + self._buffer_size = 0 + # on_write will call "maybe_resume_protocol". + self._on_write() + return True + + try: + # Advance pointers to uv_bufs in `ctx`, + # we will reuse it soon for a uv_write + # call. + ctx.advance_uv_buf(err) + except Exception as ex: # This should never happen. + # Let's try to close the `ctx` anyways. + ctx.close() + self._fatal_error(ex, True) + self._buffer.clear() + self._buffer_size = 0 + return + + elif err != uv.UV_EAGAIN: + ctx.close() + exc = convert_error(err) + self._fatal_error(exc, True) + self._buffer.clear() + self._buffer_size = 0 + return + + # fall through + + if ctx is None: + ctx = _StreamWriteContext.new(self, self._buffer) + + err = uv.uv_write(&ctx.req, + self._handle, + ctx.uv_bufs_start, + ctx.uv_bufs_len, + __uv_stream_on_write) + + self._buffer_size = 0 + # Can't use `_buffer.clear()` here: `ctx` holds a reference to + # the `_buffer`. + self._buffer = [] + + if err < 0: + # close write context + ctx.close() + + exc = convert_error(err) + self._fatal_error(exc, True) + return + + self._maybe_resume_protocol() + + cdef size_t _get_write_buffer_size(self): + if self._handle is NULL: + return 0 + return ((self._handle).write_queue_size + + self._buffer_size) + + cdef _close(self): + try: + if self._read_pybuf_acquired: + # Should never happen. libuv always calls uv_alloc/uv_read + # in pairs. + self._loop.call_exception_handler({ + 'transport': self, + 'message': 'XXX: an allocated buffer in transport._close()' + }) + self._read_pybuf_acquired = 0 + PyBuffer_Release(&self._read_pybuf) + + self._stop_reading() + finally: + UVSocketHandle._close(self) + + cdef inline _on_accept(self): + # Ultimately called by __uv_stream_on_listen. + self._init_protocol() + + cdef inline _on_eof(self): + # Any exception raised here will be caught in + # __uv_stream_on_read. + + try: + meth = self._protocol.eof_received + except AttributeError: + keep_open = False + else: + keep_open = run_in_context(self.context, meth) + + if keep_open: + # We're keeping the connection open so the + # protocol can write more, but we still can't + # receive more, so remove the reader callback. + self._stop_reading() + else: + self.close() + + cdef inline _on_write(self): + self._maybe_resume_protocol() + if not self._get_write_buffer_size(): + if self._closing: + self._schedule_call_connection_lost(None) + elif self._eof: + self._shutdown() + + cdef inline _init(self, Loop loop, object protocol, Server server, + object waiter, object context): + self.context = context + self._set_protocol(protocol) + self._start_init(loop) + + if server is not None: + self._set_server(server) + + if waiter is not None: + self._set_waiter(waiter) + + cdef inline _on_connect(self, object exc): + # Called from __tcp_connect_callback (tcp.pyx) and + # __pipe_connect_callback (pipe.pyx). + if exc is None: + self._init_protocol() + else: + if self._waiter is None: + self._fatal_error(exc, False, "connect failed") + elif self._waiter.cancelled(): + # Connect call was cancelled; just close the transport + # silently. + self._close() + elif self._waiter.done(): + self._fatal_error(exc, False, "connect failed") + else: + self._waiter.set_exception(exc) + self._close() + + # === Public API === + + def __repr__(self): + return '<{} closed={} reading={} {:#x}>'.format( + self.__class__.__name__, + self._closed, + self.__reading, + id(self)) + + def write(self, object buf): + self._ensure_alive() + + if self._eof: + raise RuntimeError('Cannot call write() after write_eof()') + if not buf: + return + if self._conn_lost: + self._conn_lost += 1 + return + self._buffer_write(buf) + self._initiate_write() + + def writelines(self, bufs): + self._ensure_alive() + + if self._eof: + raise RuntimeError('Cannot call writelines() after write_eof()') + if self._conn_lost: + self._conn_lost += 1 + return + for buf in bufs: + self._buffer_write(buf) + self._initiate_write() + + def write_eof(self): + self._ensure_alive() + + if self._eof: + return + + self._eof = 1 + if not self._get_write_buffer_size(): + self._shutdown() + + def can_write_eof(self): + return True + + def is_reading(self): + return self._is_reading() + + def pause_reading(self): + if self._closing or not self._is_reading(): + return + self._stop_reading() + + def resume_reading(self): + if self._is_reading() or self._closing: + return + self._start_reading() + + +cdef void __uv_stream_on_shutdown(uv.uv_shutdown_t* req, + int status) noexcept with gil: + + # callback for uv_shutdown + + if req.data is NULL: + aio_logger.error( + 'UVStream.shutdown callback called with NULL req.data, status=%r', + status) + return + + cdef UVStream stream = req.data + + if status < 0 and status != uv.UV_ECANCELED: + # From libuv source code: + # The ECANCELED error code is a lie, the shutdown(2) syscall is a + # fait accompli at this point. Maybe we should revisit this in + # v0.11. A possible reason for leaving it unchanged is that it + # informs the callee that the handle has been destroyed. + + if UVLOOP_DEBUG: + stream._loop._debug_stream_shutdown_errors_total += 1 + + exc = convert_error(status) + stream._fatal_error( + exc, False, "error status in uv_stream_t.shutdown callback") + return + + +cdef inline bint __uv_stream_on_read_common( + UVStream sc, + Loop loop, + ssize_t nread, +): + if sc._closed: + # The stream was closed, there is no reason to + # do any work now. + sc.__reading_stopped() # Just in case. + return True + + if nread == uv.UV_EOF: + # From libuv docs: + # The callee is responsible for stopping closing the stream + # when an error happens by calling uv_read_stop() or uv_close(). + # Trying to read from the stream again is undefined. + try: + if UVLOOP_DEBUG: + loop._debug_stream_read_eof_total += 1 + + sc._stop_reading() + sc._on_eof() + except BaseException as ex: + if UVLOOP_DEBUG: + loop._debug_stream_read_eof_cb_errors_total += 1 + + sc._fatal_error(ex, False) + finally: + return True + + if nread == 0: + # From libuv docs: + # nread might be 0, which does not indicate an error or EOF. + # This is equivalent to EAGAIN or EWOULDBLOCK under read(2). + return True + + if nread < 0: + # From libuv docs: + # The callee is responsible for stopping closing the stream + # when an error happens by calling uv_read_stop() or uv_close(). + # Trying to read from the stream again is undefined. + # + # Therefore, we're closing the stream. Since "UVHandle._close()" + # doesn't raise exceptions unless uvloop is built with DEBUG=1, + # we don't need try...finally here. + + if UVLOOP_DEBUG: + loop._debug_stream_read_errors_total += 1 + + if sc.__read_error_close: + # Used for getting notified when a pipe is closed. + # See WriteUnixTransport for the explanation. + sc._on_eof() + return True + + exc = convert_error(nread) + sc._fatal_error( + exc, False, "error status in uv_stream_t.read callback") + return True + + return False + + +cdef inline void __uv_stream_on_read_impl( + uv.uv_stream_t* stream, + ssize_t nread, + const uv.uv_buf_t* buf, +): + cdef: + UVStream sc = stream.data + Loop loop = sc._loop + + # It's OK to free the buffer early, since nothing will + # be able to touch it until this method is done. + __loop_free_buffer(loop) + + if __uv_stream_on_read_common(sc, loop, nread): + return + + try: + if UVLOOP_DEBUG: + loop._debug_stream_read_cb_total += 1 + + run_in_context1( + sc.context, + sc._protocol_data_received, + loop._recv_buffer[:nread], + ) + except BaseException as exc: + if UVLOOP_DEBUG: + loop._debug_stream_read_cb_errors_total += 1 + + sc._fatal_error(exc, False) + + +cdef inline void __uv_stream_on_write_impl( + uv.uv_write_t* req, + int status, +): + cdef: + _StreamWriteContext ctx = <_StreamWriteContext> req.data + UVStream stream = ctx.stream + + ctx.close() + + if stream._closed: + # The stream was closed, there is nothing to do. + # Even if there is an error, like EPIPE, there + # is no reason to report it. + return + + if status < 0: + if UVLOOP_DEBUG: + stream._loop._debug_stream_write_errors_total += 1 + + exc = convert_error(status) + stream._fatal_error( + exc, False, "error status in uv_stream_t.write callback") + return + + try: + stream._on_write() + except BaseException as exc: + if UVLOOP_DEBUG: + stream._loop._debug_stream_write_cb_errors_total += 1 + + stream._fatal_error(exc, False) + + +cdef void __uv_stream_on_read( + uv.uv_stream_t* stream, + ssize_t nread, + const uv.uv_buf_t* buf, +) noexcept with gil: + + if __ensure_handle_data(stream, + "UVStream read callback") == 0: + return + + # Don't need try-finally, __uv_stream_on_read_impl is void + __uv_stream_on_read_impl(stream, nread, buf) + + +cdef void __uv_stream_on_write( + uv.uv_write_t* req, + int status, +) noexcept with gil: + + if UVLOOP_DEBUG: + if req.data is NULL: + aio_logger.error( + 'UVStream.write callback called with NULL req.data, status=%r', + status) + return + + # Don't need try-finally, __uv_stream_on_write_impl is void + __uv_stream_on_write_impl(req, status) + + +cdef void __uv_stream_buffered_alloc( + uv.uv_handle_t* stream, + size_t suggested_size, + uv.uv_buf_t* uvbuf, +) noexcept with gil: + + if __ensure_handle_data(stream, + "UVStream alloc buffer callback") == 0: + return + + cdef: + UVStream sc = stream.data + Loop loop = sc._loop + Py_buffer* pybuf = &sc._read_pybuf + int got_buf = 0 + + if sc._read_pybuf_acquired: + uvbuf.len = 0 + uvbuf.base = NULL + return + + sc._read_pybuf_acquired = 0 + try: + buf = run_in_context1( + sc.context, + sc._protocol_get_buffer, + suggested_size, + ) + PyObject_GetBuffer(buf, pybuf, PyBUF_WRITABLE) + got_buf = 1 + except BaseException as exc: + # Can't call 'sc._fatal_error' or 'sc._close', libuv will SF. + # We'll do it later in __uv_stream_buffered_on_read when we + # receive UV_ENOBUFS. + uvbuf.len = 0 + uvbuf.base = NULL + return + + if not pybuf.len: + uvbuf.len = 0 + uvbuf.base = NULL + if got_buf: + PyBuffer_Release(pybuf) + return + + sc._read_pybuf_acquired = 1 + uvbuf.base = pybuf.buf + uvbuf.len = pybuf.len + + +cdef void __uv_stream_buffered_on_read( + uv.uv_stream_t* stream, + ssize_t nread, + const uv.uv_buf_t* buf, +) noexcept with gil: + + if __ensure_handle_data(stream, + "UVStream buffered read callback") == 0: + return + + cdef: + UVStream sc = stream.data + Loop loop = sc._loop + Py_buffer* pybuf = &sc._read_pybuf + + if nread == uv.UV_ENOBUFS: + sc._fatal_error( + RuntimeError( + 'unhandled error (or an empty buffer) in get_buffer()'), + False) + return + + try: + if nread > 0 and not sc._read_pybuf_acquired: + # From libuv docs: + # nread is > 0 if there is data available or < 0 on error. When + # we’ve reached EOF, nread will be set to UV_EOF. When + # nread < 0, the buf parameter might not point to a valid + # buffer; in that case buf.len and buf.base are both set to 0. + raise RuntimeError( + f'no python buffer is allocated in on_read; nread={nread}') + + if nread == 0: + # From libuv docs: + # nread might be 0, which does not indicate an error or EOF. + # This is equivalent to EAGAIN or EWOULDBLOCK under read(2). + return + + if __uv_stream_on_read_common(sc, loop, nread): + return + + if UVLOOP_DEBUG: + loop._debug_stream_read_cb_total += 1 + + run_in_context1(sc.context, sc._protocol_buffer_updated, nread) + except BaseException as exc: + if UVLOOP_DEBUG: + loop._debug_stream_read_cb_errors_total += 1 + + sc._fatal_error(exc, False) + finally: + sc._read_pybuf_acquired = 0 + PyBuffer_Release(pybuf) -- cgit v1.2.3