summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
commit12cf076118570eebbff08c6b3090e0d4798447a1 (patch)
tree3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx
parentc45662ff3923b34614ddcc8feb9195541166dcc5 (diff)
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx')
-rw-r--r--venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx1015
1 files changed, 0 insertions, 1015 deletions
diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx
deleted file mode 100644
index d4e02e3..0000000
--- a/venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx
+++ /dev/null
@@ -1,1015 +0,0 @@
-DEF __PREALLOCED_BUFS = 4
-
-
-@cython.no_gc_clear
-@cython.freelist(DEFAULT_FREELIST_SIZE)
-cdef class _StreamWriteContext:
- # used to hold additional write request information for uv_write
-
- cdef:
- uv.uv_write_t req
-
- list buffers
-
- uv.uv_buf_t uv_bufs_sml[__PREALLOCED_BUFS]
- Py_buffer py_bufs_sml[__PREALLOCED_BUFS]
- bint py_bufs_sml_inuse
-
- uv.uv_buf_t* uv_bufs
- Py_buffer* py_bufs
- size_t py_bufs_len
-
- uv.uv_buf_t* uv_bufs_start
- size_t uv_bufs_len
-
- UVStream stream
-
- bint closed
-
- cdef free_bufs(self):
- cdef size_t i
-
- if self.uv_bufs is not NULL:
- PyMem_RawFree(self.uv_bufs)
- self.uv_bufs = NULL
- if UVLOOP_DEBUG:
- if self.py_bufs_sml_inuse:
- raise RuntimeError(
- '_StreamWriteContext.close: uv_bufs != NULL and '
- 'py_bufs_sml_inuse is True')
-
- if self.py_bufs is not NULL:
- for i from 0 <= i < self.py_bufs_len:
- PyBuffer_Release(&self.py_bufs[i])
- PyMem_RawFree(self.py_bufs)
- self.py_bufs = NULL
- if UVLOOP_DEBUG:
- if self.py_bufs_sml_inuse:
- raise RuntimeError(
- '_StreamWriteContext.close: py_bufs != NULL and '
- 'py_bufs_sml_inuse is True')
-
- if self.py_bufs_sml_inuse:
- for i from 0 <= i < self.py_bufs_len:
- PyBuffer_Release(&self.py_bufs_sml[i])
- self.py_bufs_sml_inuse = 0
-
- self.py_bufs_len = 0
- self.buffers = None
-
- cdef close(self):
- if self.closed:
- return
- self.closed = 1
- self.free_bufs()
- Py_DECREF(self)
-
- cdef advance_uv_buf(self, size_t sent):
- # Advance the pointer to first uv_buf and the
- # pointer to first byte in that buffer.
- #
- # We do this after a "uv_try_write" call, which
- # sometimes sends only a portion of data.
- # We then call "advance_uv_buf" on the write
- # context, and reuse it in a "uv_write" call.
-
- cdef:
- uv.uv_buf_t* buf
- size_t idx
-
- for idx from 0 <= idx < self.uv_bufs_len:
- buf = &self.uv_bufs_start[idx]
- if buf.len > sent:
- buf.len -= sent
- buf.base = buf.base + sent
- self.uv_bufs_start = buf
- self.uv_bufs_len -= idx
- return
- else:
- sent -= self.uv_bufs_start[idx].len
-
- if UVLOOP_DEBUG:
- if sent < 0:
- raise RuntimeError('fatal: sent < 0 in advance_uv_buf')
-
- raise RuntimeError('fatal: Could not advance _StreamWriteContext')
-
- @staticmethod
- cdef _StreamWriteContext new(UVStream stream, list buffers):
- cdef:
- _StreamWriteContext ctx
- int uv_bufs_idx = 0
- size_t py_bufs_len = 0
- int i
-
- Py_buffer* p_pybufs
- uv.uv_buf_t* p_uvbufs
-
- ctx = _StreamWriteContext.__new__(_StreamWriteContext)
- ctx.stream = None
- ctx.closed = 1
- ctx.py_bufs_len = 0
- ctx.py_bufs_sml_inuse = 0
- ctx.uv_bufs = NULL
- ctx.py_bufs = NULL
- ctx.buffers = buffers
- ctx.stream = stream
-
- if len(buffers) <= __PREALLOCED_BUFS:
- # We've got a small number of buffers to write, don't
- # need to use malloc.
- ctx.py_bufs_sml_inuse = 1
- p_pybufs = <Py_buffer*>&ctx.py_bufs_sml
- p_uvbufs = <uv.uv_buf_t*>&ctx.uv_bufs_sml
-
- else:
- for buf in buffers:
- if UVLOOP_DEBUG:
- if not isinstance(buf, (bytes, bytearray, memoryview)):
- raise RuntimeError(
- 'invalid data in writebuf: an instance of '
- 'bytes, bytearray or memoryview was expected, '
- 'got {}'.format(type(buf)))
-
- if not PyBytes_CheckExact(buf):
- py_bufs_len += 1
-
- if py_bufs_len > 0:
- ctx.py_bufs = <Py_buffer*>PyMem_RawMalloc(
- py_bufs_len * sizeof(Py_buffer))
- if ctx.py_bufs is NULL:
- raise MemoryError()
-
- ctx.uv_bufs = <uv.uv_buf_t*>PyMem_RawMalloc(
- len(buffers) * sizeof(uv.uv_buf_t))
- if ctx.uv_bufs is NULL:
- raise MemoryError()
-
- p_pybufs = ctx.py_bufs
- p_uvbufs = ctx.uv_bufs
-
- py_bufs_len = 0
- for buf in buffers:
- if PyBytes_CheckExact(buf):
- # We can only use this hack for bytes since it's
- # immutable. For everything else it is only safe to
- # use buffer protocol.
- p_uvbufs[uv_bufs_idx].base = PyBytes_AS_STRING(buf)
- p_uvbufs[uv_bufs_idx].len = Py_SIZE(buf)
-
- else:
- try:
- PyObject_GetBuffer(
- buf, &p_pybufs[py_bufs_len], PyBUF_SIMPLE)
- except Exception:
- # This shouldn't ever happen, as `UVStream._buffer_write`
- # casts non-bytes objects to `memoryviews`.
- ctx.py_bufs_len = py_bufs_len
- ctx.free_bufs()
- raise
-
- p_uvbufs[uv_bufs_idx].base = <char*>p_pybufs[py_bufs_len].buf
- p_uvbufs[uv_bufs_idx].len = p_pybufs[py_bufs_len].len
-
- py_bufs_len += 1
-
- uv_bufs_idx += 1
-
- ctx.uv_bufs_start = p_uvbufs
- ctx.uv_bufs_len = uv_bufs_idx
-
- ctx.py_bufs_len = py_bufs_len
- ctx.req.data = <void*> ctx
-
- if UVLOOP_DEBUG:
- stream._loop._debug_stream_write_ctx_total += 1
- stream._loop._debug_stream_write_ctx_cnt += 1
-
- # Do incref after everything else is done.
- # Under no circumstances we want `ctx` to be GCed while
- # libuv is still working with `ctx.uv_bufs`.
- Py_INCREF(ctx)
- ctx.closed = 0
- return ctx
-
- def __dealloc__(self):
- if not self.closed:
- # Because we do an INCREF in _StreamWriteContext.new,
- # __dealloc__ shouldn't ever happen with `self.closed == 1`
- raise RuntimeError(
- 'open _StreamWriteContext is being deallocated')
-
- if UVLOOP_DEBUG:
- if self.stream is not None:
- self.stream._loop._debug_stream_write_ctx_cnt -= 1
- self.stream = None
-
-
-@cython.no_gc_clear
-cdef class UVStream(UVBaseTransport):
-
- def __cinit__(self):
- self.__shutting_down = 0
- self.__reading = 0
- self.__read_error_close = 0
- self.__buffered = 0
- self._eof = 0
- self._buffer = []
- self._buffer_size = 0
-
- self._protocol_get_buffer = None
- self._protocol_buffer_updated = None
-
- self._read_pybuf_acquired = False
-
- cdef _set_protocol(self, object protocol):
- if protocol is None:
- raise TypeError('protocol is required')
-
- UVBaseTransport._set_protocol(self, protocol)
-
- if (hasattr(protocol, 'get_buffer') and
- not isinstance(protocol, aio_Protocol)):
- try:
- self._protocol_get_buffer = protocol.get_buffer
- self._protocol_buffer_updated = protocol.buffer_updated
- self.__buffered = 1
- except AttributeError:
- pass
- else:
- self.__buffered = 0
-
- cdef _clear_protocol(self):
- UVBaseTransport._clear_protocol(self)
- self._protocol_get_buffer = None
- self._protocol_buffer_updated = None
- self.__buffered = 0
-
- cdef inline _shutdown(self):
- cdef int err
-
- if self.__shutting_down:
- return
- self.__shutting_down = 1
-
- self._ensure_alive()
-
- self._shutdown_req.data = <void*> self
- err = uv.uv_shutdown(&self._shutdown_req,
- <uv.uv_stream_t*> self._handle,
- __uv_stream_on_shutdown)
- if err < 0:
- exc = convert_error(err)
- self._fatal_error(exc, True)
- return
-
- cdef inline _accept(self, UVStream server):
- cdef int err
- self._ensure_alive()
-
- err = uv.uv_accept(<uv.uv_stream_t*>server._handle,
- <uv.uv_stream_t*>self._handle)
- if err < 0:
- exc = convert_error(err)
- self._fatal_error(exc, True)
- return
-
- self._on_accept()
-
- cdef inline _close_on_read_error(self):
- self.__read_error_close = 1
-
- cdef bint _is_reading(self):
- return self.__reading
-
- cdef _start_reading(self):
- cdef int err
-
- if self._closing:
- return
-
- self._ensure_alive()
-
- if self.__reading:
- return
-
- if self.__buffered:
- err = uv.uv_read_start(<uv.uv_stream_t*>self._handle,
- __uv_stream_buffered_alloc,
- __uv_stream_buffered_on_read)
- else:
- err = uv.uv_read_start(<uv.uv_stream_t*>self._handle,
- __loop_alloc_buffer,
- __uv_stream_on_read)
- if err < 0:
- exc = convert_error(err)
- self._fatal_error(exc, True)
- return
- else:
- # UVStream must live until the read callback is called
- self.__reading_started()
-
- cdef inline __reading_started(self):
- if self.__reading:
- return
- self.__reading = 1
- Py_INCREF(self)
-
- cdef inline __reading_stopped(self):
- if not self.__reading:
- return
- self.__reading = 0
- Py_DECREF(self)
-
- cdef _stop_reading(self):
- cdef int err
-
- if not self.__reading:
- return
-
- self._ensure_alive()
-
- # From libuv docs:
- # This function is idempotent and may be safely
- # called on a stopped stream.
- err = uv.uv_read_stop(<uv.uv_stream_t*>self._handle)
- if err < 0:
- exc = convert_error(err)
- self._fatal_error(exc, True)
- return
- else:
- self.__reading_stopped()
-
- cdef inline _try_write(self, object data):
- cdef:
- ssize_t written
- bint used_buf = 0
- Py_buffer py_buf
- void* buf
- size_t blen
- int saved_errno
- int fd
-
- if (<uv.uv_stream_t*>self._handle).write_queue_size != 0:
- raise RuntimeError(
- 'UVStream._try_write called with data in uv buffers')
-
- if PyBytes_CheckExact(data):
- # We can only use this hack for bytes since it's
- # immutable. For everything else it is only safe to
- # use buffer protocol.
- buf = <void*>PyBytes_AS_STRING(data)
- blen = Py_SIZE(data)
- else:
- PyObject_GetBuffer(data, &py_buf, PyBUF_SIMPLE)
- used_buf = 1
- buf = py_buf.buf
- blen = py_buf.len
-
- if blen == 0:
- # Empty data, do nothing.
- return 0
-
- fd = self._fileno()
- # Use `unistd.h/write` directly, it's faster than
- # uv_try_write -- less layers of code. The error
- # checking logic is copied from libuv.
- written = system.write(fd, buf, blen)
- while written == -1 and (
- errno.errno == errno.EINTR or
- (system.PLATFORM_IS_APPLE and
- errno.errno == errno.EPROTOTYPE)):
- # From libuv code (unix/stream.c):
- # Due to a possible kernel bug at least in OS X 10.10 "Yosemite",
- # EPROTOTYPE can be returned while trying to write to a socket
- # that is shutting down. If we retry the write, we should get
- # the expected EPIPE instead.
- written = system.write(fd, buf, blen)
- saved_errno = errno.errno
-
- if used_buf:
- PyBuffer_Release(&py_buf)
-
- if written < 0:
- if saved_errno == errno.EAGAIN or \
- saved_errno == system.EWOULDBLOCK:
- return -1
- else:
- exc = convert_error(-saved_errno)
- self._fatal_error(exc, True)
- return
-
- if UVLOOP_DEBUG:
- self._loop._debug_stream_write_tries += 1
-
- if <size_t>written == blen:
- return 0
-
- return written
-
- cdef inline _buffer_write(self, object data):
- cdef int dlen
-
- if not PyBytes_CheckExact(data):
- data = memoryview(data).cast('b')
-
- dlen = len(data)
- if not dlen:
- return
-
- self._buffer_size += dlen
- self._buffer.append(data)
-
- cdef inline _initiate_write(self):
- if (not self._protocol_paused and
- (<uv.uv_stream_t*>self._handle).write_queue_size == 0 and
- self._buffer_size > self._high_water):
- # Fast-path. If:
- # - the protocol isn't yet paused,
- # - there is no data in libuv buffers for this stream,
- # - the protocol will be paused if we continue to buffer data
- #
- # Then:
- # - Try to write all buffered data right now.
- all_sent = self._exec_write()
- if UVLOOP_DEBUG:
- if self._buffer_size != 0 or self._buffer != []:
- raise RuntimeError(
- '_buffer_size is not 0 after a successful _exec_write')
-
- # There is no need to call `_queue_write` anymore,
- # as `uv_write` should be called already.
-
- if not all_sent:
- # If not all of the data was sent successfully,
- # we might need to pause the protocol.
- self._maybe_pause_protocol()
-
- elif self._buffer_size > 0:
- self._maybe_pause_protocol()
- self._loop._queue_write(self)
-
- cdef inline _exec_write(self):
- cdef:
- int err
- int buf_len
- _StreamWriteContext ctx = None
-
- if self._closed:
- # If the handle is closed, just return, it's too
- # late to do anything.
- return
-
- buf_len = len(self._buffer)
- if not buf_len:
- return
-
- if (<uv.uv_stream_t*>self._handle).write_queue_size == 0:
- # libuv internal write buffers for this stream are empty.
- if buf_len == 1:
- # If we only have one piece of data to send, let's
- # use our fast implementation of try_write.
- data = self._buffer[0]
- sent = self._try_write(data)
-
- if sent is None:
- # A `self._fatal_error` was called.
- # It might not raise an exception under some
- # conditions.
- self._buffer_size = 0
- self._buffer.clear()
- if not self._closing:
- # This should never happen.
- raise RuntimeError(
- 'stream is open after UVStream._try_write '
- 'returned None')
- return
-
- if sent == 0:
- # All data was successfully written.
- self._buffer_size = 0
- self._buffer.clear()
- # on_write will call "maybe_resume_protocol".
- self._on_write()
- return True
-
- if sent > 0:
- if UVLOOP_DEBUG:
- if sent == len(data):
- raise RuntimeError(
- '_try_write sent all data and returned '
- 'non-zero')
-
- if PyBytes_CheckExact(data):
- # Cast bytes to memoryview to avoid copying
- # data that wasn't sent.
- data = memoryview(data)
- data = data[sent:]
-
- self._buffer_size -= sent
- self._buffer[0] = data
-
- # At this point it's either data was sent partially,
- # or an EAGAIN has happened.
-
- else:
- ctx = _StreamWriteContext.new(self, self._buffer)
-
- err = uv.uv_try_write(<uv.uv_stream_t*>self._handle,
- ctx.uv_bufs_start,
- ctx.uv_bufs_len)
-
- if err > 0:
- # Some data was successfully sent.
-
- if <size_t>err == self._buffer_size:
- # Everything was sent.
- ctx.close()
- self._buffer.clear()
- self._buffer_size = 0
- # on_write will call "maybe_resume_protocol".
- self._on_write()
- return True
-
- try:
- # Advance pointers to uv_bufs in `ctx`,
- # we will reuse it soon for a uv_write
- # call.
- ctx.advance_uv_buf(<ssize_t>err)
- except Exception as ex: # This should never happen.
- # Let's try to close the `ctx` anyways.
- ctx.close()
- self._fatal_error(ex, True)
- self._buffer.clear()
- self._buffer_size = 0
- return
-
- elif err != uv.UV_EAGAIN:
- ctx.close()
- exc = convert_error(err)
- self._fatal_error(exc, True)
- self._buffer.clear()
- self._buffer_size = 0
- return
-
- # fall through
-
- if ctx is None:
- ctx = _StreamWriteContext.new(self, self._buffer)
-
- err = uv.uv_write(&ctx.req,
- <uv.uv_stream_t*>self._handle,
- ctx.uv_bufs_start,
- ctx.uv_bufs_len,
- __uv_stream_on_write)
-
- self._buffer_size = 0
- # Can't use `_buffer.clear()` here: `ctx` holds a reference to
- # the `_buffer`.
- self._buffer = []
-
- if err < 0:
- # close write context
- ctx.close()
-
- exc = convert_error(err)
- self._fatal_error(exc, True)
- return
-
- self._maybe_resume_protocol()
-
- cdef size_t _get_write_buffer_size(self):
- if self._handle is NULL:
- return 0
- return ((<uv.uv_stream_t*>self._handle).write_queue_size +
- self._buffer_size)
-
- cdef _close(self):
- try:
- if self._read_pybuf_acquired:
- # Should never happen. libuv always calls uv_alloc/uv_read
- # in pairs.
- self._loop.call_exception_handler({
- 'transport': self,
- 'message': 'XXX: an allocated buffer in transport._close()'
- })
- self._read_pybuf_acquired = 0
- PyBuffer_Release(&self._read_pybuf)
-
- self._stop_reading()
- finally:
- UVSocketHandle._close(<UVHandle>self)
-
- cdef inline _on_accept(self):
- # Ultimately called by __uv_stream_on_listen.
- self._init_protocol()
-
- cdef inline _on_eof(self):
- # Any exception raised here will be caught in
- # __uv_stream_on_read.
-
- try:
- meth = self._protocol.eof_received
- except AttributeError:
- keep_open = False
- else:
- keep_open = run_in_context(self.context, meth)
-
- if keep_open:
- # We're keeping the connection open so the
- # protocol can write more, but we still can't
- # receive more, so remove the reader callback.
- self._stop_reading()
- else:
- self.close()
-
- cdef inline _on_write(self):
- self._maybe_resume_protocol()
- if not self._get_write_buffer_size():
- if self._closing:
- self._schedule_call_connection_lost(None)
- elif self._eof:
- self._shutdown()
-
- cdef inline _init(self, Loop loop, object protocol, Server server,
- object waiter, object context):
- self.context = context
- self._set_protocol(protocol)
- self._start_init(loop)
-
- if server is not None:
- self._set_server(server)
-
- if waiter is not None:
- self._set_waiter(waiter)
-
- cdef inline _on_connect(self, object exc):
- # Called from __tcp_connect_callback (tcp.pyx) and
- # __pipe_connect_callback (pipe.pyx).
- if exc is None:
- self._init_protocol()
- else:
- if self._waiter is None:
- self._fatal_error(exc, False, "connect failed")
- elif self._waiter.cancelled():
- # Connect call was cancelled; just close the transport
- # silently.
- self._close()
- elif self._waiter.done():
- self._fatal_error(exc, False, "connect failed")
- else:
- self._waiter.set_exception(exc)
- self._close()
-
- # === Public API ===
-
- def __repr__(self):
- return '<{} closed={} reading={} {:#x}>'.format(
- self.__class__.__name__,
- self._closed,
- self.__reading,
- id(self))
-
- def write(self, object buf):
- self._ensure_alive()
-
- if self._eof:
- raise RuntimeError('Cannot call write() after write_eof()')
- if not buf:
- return
- if self._conn_lost:
- self._conn_lost += 1
- return
- self._buffer_write(buf)
- self._initiate_write()
-
- def writelines(self, bufs):
- self._ensure_alive()
-
- if self._eof:
- raise RuntimeError('Cannot call writelines() after write_eof()')
- if self._conn_lost:
- self._conn_lost += 1
- return
- for buf in bufs:
- self._buffer_write(buf)
- self._initiate_write()
-
- def write_eof(self):
- self._ensure_alive()
-
- if self._eof:
- return
-
- self._eof = 1
- if not self._get_write_buffer_size():
- self._shutdown()
-
- def can_write_eof(self):
- return True
-
- def is_reading(self):
- return self._is_reading()
-
- def pause_reading(self):
- if self._closing or not self._is_reading():
- return
- self._stop_reading()
-
- def resume_reading(self):
- if self._is_reading() or self._closing:
- return
- self._start_reading()
-
-
-cdef void __uv_stream_on_shutdown(uv.uv_shutdown_t* req,
- int status) noexcept with gil:
-
- # callback for uv_shutdown
-
- if req.data is NULL:
- aio_logger.error(
- 'UVStream.shutdown callback called with NULL req.data, status=%r',
- status)
- return
-
- cdef UVStream stream = <UVStream> req.data
-
- if status < 0 and status != uv.UV_ECANCELED:
- # From libuv source code:
- # The ECANCELED error code is a lie, the shutdown(2) syscall is a
- # fait accompli at this point. Maybe we should revisit this in
- # v0.11. A possible reason for leaving it unchanged is that it
- # informs the callee that the handle has been destroyed.
-
- if UVLOOP_DEBUG:
- stream._loop._debug_stream_shutdown_errors_total += 1
-
- exc = convert_error(status)
- stream._fatal_error(
- exc, False, "error status in uv_stream_t.shutdown callback")
- return
-
-
-cdef inline bint __uv_stream_on_read_common(
- UVStream sc,
- Loop loop,
- ssize_t nread,
-):
- if sc._closed:
- # The stream was closed, there is no reason to
- # do any work now.
- sc.__reading_stopped() # Just in case.
- return True
-
- if nread == uv.UV_EOF:
- # From libuv docs:
- # The callee is responsible for stopping closing the stream
- # when an error happens by calling uv_read_stop() or uv_close().
- # Trying to read from the stream again is undefined.
- try:
- if UVLOOP_DEBUG:
- loop._debug_stream_read_eof_total += 1
-
- sc._stop_reading()
- sc._on_eof()
- except BaseException as ex:
- if UVLOOP_DEBUG:
- loop._debug_stream_read_eof_cb_errors_total += 1
-
- sc._fatal_error(ex, False)
- finally:
- return True
-
- if nread == 0:
- # From libuv docs:
- # nread might be 0, which does not indicate an error or EOF.
- # This is equivalent to EAGAIN or EWOULDBLOCK under read(2).
- return True
-
- if nread < 0:
- # From libuv docs:
- # The callee is responsible for stopping closing the stream
- # when an error happens by calling uv_read_stop() or uv_close().
- # Trying to read from the stream again is undefined.
- #
- # Therefore, we're closing the stream. Since "UVHandle._close()"
- # doesn't raise exceptions unless uvloop is built with DEBUG=1,
- # we don't need try...finally here.
-
- if UVLOOP_DEBUG:
- loop._debug_stream_read_errors_total += 1
-
- if sc.__read_error_close:
- # Used for getting notified when a pipe is closed.
- # See WriteUnixTransport for the explanation.
- sc._on_eof()
- return True
-
- exc = convert_error(nread)
- sc._fatal_error(
- exc, False, "error status in uv_stream_t.read callback")
- return True
-
- return False
-
-
-cdef inline void __uv_stream_on_read_impl(
- uv.uv_stream_t* stream,
- ssize_t nread,
- const uv.uv_buf_t* buf,
-):
- cdef:
- UVStream sc = <UVStream>stream.data
- Loop loop = sc._loop
-
- # It's OK to free the buffer early, since nothing will
- # be able to touch it until this method is done.
- __loop_free_buffer(loop)
-
- if __uv_stream_on_read_common(sc, loop, nread):
- return
-
- try:
- if UVLOOP_DEBUG:
- loop._debug_stream_read_cb_total += 1
-
- run_in_context1(
- sc.context,
- sc._protocol_data_received,
- loop._recv_buffer[:nread],
- )
- except BaseException as exc:
- if UVLOOP_DEBUG:
- loop._debug_stream_read_cb_errors_total += 1
-
- sc._fatal_error(exc, False)
-
-
-cdef inline void __uv_stream_on_write_impl(
- uv.uv_write_t* req,
- int status,
-):
- cdef:
- _StreamWriteContext ctx = <_StreamWriteContext> req.data
- UVStream stream = <UVStream>ctx.stream
-
- ctx.close()
-
- if stream._closed:
- # The stream was closed, there is nothing to do.
- # Even if there is an error, like EPIPE, there
- # is no reason to report it.
- return
-
- if status < 0:
- if UVLOOP_DEBUG:
- stream._loop._debug_stream_write_errors_total += 1
-
- exc = convert_error(status)
- stream._fatal_error(
- exc, False, "error status in uv_stream_t.write callback")
- return
-
- try:
- stream._on_write()
- except BaseException as exc:
- if UVLOOP_DEBUG:
- stream._loop._debug_stream_write_cb_errors_total += 1
-
- stream._fatal_error(exc, False)
-
-
-cdef void __uv_stream_on_read(
- uv.uv_stream_t* stream,
- ssize_t nread,
- const uv.uv_buf_t* buf,
-) noexcept with gil:
-
- if __ensure_handle_data(<uv.uv_handle_t*>stream,
- "UVStream read callback") == 0:
- return
-
- # Don't need try-finally, __uv_stream_on_read_impl is void
- __uv_stream_on_read_impl(stream, nread, buf)
-
-
-cdef void __uv_stream_on_write(
- uv.uv_write_t* req,
- int status,
-) noexcept with gil:
-
- if UVLOOP_DEBUG:
- if req.data is NULL:
- aio_logger.error(
- 'UVStream.write callback called with NULL req.data, status=%r',
- status)
- return
-
- # Don't need try-finally, __uv_stream_on_write_impl is void
- __uv_stream_on_write_impl(req, status)
-
-
-cdef void __uv_stream_buffered_alloc(
- uv.uv_handle_t* stream,
- size_t suggested_size,
- uv.uv_buf_t* uvbuf,
-) noexcept with gil:
-
- if __ensure_handle_data(<uv.uv_handle_t*>stream,
- "UVStream alloc buffer callback") == 0:
- return
-
- cdef:
- UVStream sc = <UVStream>stream.data
- Loop loop = sc._loop
- Py_buffer* pybuf = &sc._read_pybuf
- int got_buf = 0
-
- if sc._read_pybuf_acquired:
- uvbuf.len = 0
- uvbuf.base = NULL
- return
-
- sc._read_pybuf_acquired = 0
- try:
- buf = run_in_context1(
- sc.context,
- sc._protocol_get_buffer,
- suggested_size,
- )
- PyObject_GetBuffer(buf, pybuf, PyBUF_WRITABLE)
- got_buf = 1
- except BaseException as exc:
- # Can't call 'sc._fatal_error' or 'sc._close', libuv will SF.
- # We'll do it later in __uv_stream_buffered_on_read when we
- # receive UV_ENOBUFS.
- uvbuf.len = 0
- uvbuf.base = NULL
- return
-
- if not pybuf.len:
- uvbuf.len = 0
- uvbuf.base = NULL
- if got_buf:
- PyBuffer_Release(pybuf)
- return
-
- sc._read_pybuf_acquired = 1
- uvbuf.base = <char*>pybuf.buf
- uvbuf.len = pybuf.len
-
-
-cdef void __uv_stream_buffered_on_read(
- uv.uv_stream_t* stream,
- ssize_t nread,
- const uv.uv_buf_t* buf,
-) noexcept with gil:
-
- if __ensure_handle_data(<uv.uv_handle_t*>stream,
- "UVStream buffered read callback") == 0:
- return
-
- cdef:
- UVStream sc = <UVStream>stream.data
- Loop loop = sc._loop
- Py_buffer* pybuf = &sc._read_pybuf
-
- if nread == uv.UV_ENOBUFS:
- sc._fatal_error(
- RuntimeError(
- 'unhandled error (or an empty buffer) in get_buffer()'),
- False)
- return
-
- try:
- if nread > 0 and not sc._read_pybuf_acquired:
- # From libuv docs:
- # nread is > 0 if there is data available or < 0 on error. When
- # we’ve reached EOF, nread will be set to UV_EOF. When
- # nread < 0, the buf parameter might not point to a valid
- # buffer; in that case buf.len and buf.base are both set to 0.
- raise RuntimeError(
- f'no python buffer is allocated in on_read; nread={nread}')
-
- if nread == 0:
- # From libuv docs:
- # nread might be 0, which does not indicate an error or EOF.
- # This is equivalent to EAGAIN or EWOULDBLOCK under read(2).
- return
-
- if __uv_stream_on_read_common(sc, loop, nread):
- return
-
- if UVLOOP_DEBUG:
- loop._debug_stream_read_cb_total += 1
-
- run_in_context1(sc.context, sc._protocol_buffer_updated, nread)
- except BaseException as exc:
- if UVLOOP_DEBUG:
- loop._debug_stream_read_cb_errors_total += 1
-
- sc._fatal_error(exc, False)
- finally:
- sc._read_pybuf_acquired = 0
- PyBuffer_Release(pybuf)