diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/uvloop/handles')
28 files changed, 4579 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/async_.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/async_.pxd new file mode 100644 index 0000000..5f0d820 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/async_.pxd @@ -0,0 +1,11 @@ +cdef class UVAsync(UVHandle): + cdef: + method_t callback + object ctx + + cdef _init(self, Loop loop, method_t callback, object ctx) + + cdef send(self) + + @staticmethod + cdef UVAsync new(Loop loop, method_t callback, object ctx) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/async_.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/async_.pyx new file mode 100644 index 0000000..5c740cf --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/async_.pyx @@ -0,0 +1,56 @@ +@cython.no_gc_clear +cdef class UVAsync(UVHandle): + cdef _init(self, Loop loop, method_t callback, object ctx): + cdef int err + + self._start_init(loop) + + self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_async_t)) + if self._handle is NULL: + self._abort_init() + raise MemoryError() + + err = uv.uv_async_init(self._loop.uvloop, + <uv.uv_async_t*>self._handle, + __uvasync_callback) + if err < 0: + self._abort_init() + raise convert_error(err) + + self._finish_init() + + self.callback = callback + self.ctx = ctx + + cdef send(self): + cdef int err + + self._ensure_alive() + + err = uv.uv_async_send(<uv.uv_async_t*>self._handle) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + + @staticmethod + cdef UVAsync new(Loop loop, method_t callback, object ctx): + cdef UVAsync handle + handle = UVAsync.__new__(UVAsync) + handle._init(loop, callback, ctx) + return handle + + +cdef void __uvasync_callback( + uv.uv_async_t* handle, +) noexcept with gil: + if __ensure_handle_data(<uv.uv_handle_t*>handle, "UVAsync callback") == 0: + return + + cdef: + UVAsync async_ = <UVAsync> handle.data + method_t cb = async_.callback + try: + cb(async_.ctx) + except BaseException as ex: + async_._error(ex, False) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/basetransport.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/basetransport.pxd new file mode 100644 index 0000000..ba356a7 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/basetransport.pxd @@ -0,0 +1,54 @@ +cdef class UVBaseTransport(UVSocketHandle): + + cdef: + readonly bint _closing + + bint _protocol_connected + bint _protocol_paused + object _protocol_data_received + size_t _high_water + size_t _low_water + + object _protocol + Server _server + object _waiter + + dict _extra_info + + uint32_t _conn_lost + + object __weakref__ + + # All "inline" methods are final + + cdef inline _maybe_pause_protocol(self) + cdef inline _maybe_resume_protocol(self) + + cdef inline _schedule_call_connection_made(self) + cdef inline _schedule_call_connection_lost(self, exc) + + cdef _wakeup_waiter(self) + cdef _call_connection_made(self) + cdef _call_connection_lost(self, exc) + + # Overloads of UVHandle methods: + cdef _fatal_error(self, exc, throw, reason=?) + cdef _close(self) + + cdef inline _set_server(self, Server server) + cdef inline _set_waiter(self, object waiter) + + cdef _set_protocol(self, object protocol) + cdef _clear_protocol(self) + + cdef inline _init_protocol(self) + cdef inline _add_extra_info(self, str name, object obj) + + # === overloads === + + cdef _new_socket(self) + cdef size_t _get_write_buffer_size(self) + + cdef bint _is_reading(self) + cdef _start_reading(self) + cdef _stop_reading(self) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/basetransport.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/basetransport.pyx new file mode 100644 index 0000000..28b3079 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/basetransport.pyx @@ -0,0 +1,293 @@ +cdef class UVBaseTransport(UVSocketHandle): + + def __cinit__(self): + # Flow control + self._high_water = FLOW_CONTROL_HIGH_WATER * 1024 + self._low_water = FLOW_CONTROL_HIGH_WATER // 4 + + self._protocol = None + self._protocol_connected = 0 + self._protocol_paused = 0 + self._protocol_data_received = None + + self._server = None + self._waiter = None + self._extra_info = None + + self._conn_lost = 0 + + self._closing = 0 + + cdef size_t _get_write_buffer_size(self): + return 0 + + cdef inline _schedule_call_connection_made(self): + self._loop._call_soon_handle( + new_MethodHandle(self._loop, + "UVTransport._call_connection_made", + <method_t>self._call_connection_made, + self.context, + self)) + + cdef inline _schedule_call_connection_lost(self, exc): + self._loop._call_soon_handle( + new_MethodHandle1(self._loop, + "UVTransport._call_connection_lost", + <method1_t>self._call_connection_lost, + self.context, + self, exc)) + + cdef _fatal_error(self, exc, throw, reason=None): + # Overload UVHandle._fatal_error + + self._force_close(exc) + + if not isinstance(exc, OSError): + + if throw or self._loop is None: + raise exc + + msg = f'Fatal error on transport {self.__class__.__name__}' + if reason is not None: + msg = f'{msg} ({reason})' + + self._loop.call_exception_handler({ + 'message': msg, + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) + + cdef inline _maybe_pause_protocol(self): + cdef: + size_t size = self._get_write_buffer_size() + + if size <= self._high_water: + return + + if not self._protocol_paused: + self._protocol_paused = 1 + try: + # _maybe_pause_protocol() is always triggered from user-calls, + # so we must copy the context to avoid entering context twice + run_in_context( + self.context.copy(), self._protocol.pause_writing, + ) + except (KeyboardInterrupt, SystemExit): + raise + except BaseException as exc: + self._loop.call_exception_handler({ + 'message': 'protocol.pause_writing() failed', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) + + cdef inline _maybe_resume_protocol(self): + cdef: + size_t size = self._get_write_buffer_size() + + if self._protocol_paused and size <= self._low_water: + self._protocol_paused = 0 + try: + # We're copying the context to avoid entering context twice, + # even though it's not always necessary to copy - it's easier + # to copy here than passing down a copied context. + run_in_context( + self.context.copy(), self._protocol.resume_writing, + ) + except (KeyboardInterrupt, SystemExit): + raise + except BaseException as exc: + self._loop.call_exception_handler({ + 'message': 'protocol.resume_writing() failed', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) + + cdef _wakeup_waiter(self): + if self._waiter is not None: + if not self._waiter.cancelled(): + if not self._is_alive(): + self._waiter.set_exception( + RuntimeError( + 'closed Transport handle and unset waiter')) + else: + self._waiter.set_result(True) + self._waiter = None + + cdef _call_connection_made(self): + if self._protocol is None: + raise RuntimeError( + 'protocol is not set, cannot call connection_made()') + + # We use `_is_alive()` and not `_closing`, because we call + # `transport._close()` in `loop.create_connection()` if an + # exception happens during `await waiter`. + if not self._is_alive(): + # A connection waiter can be cancelled between + # 'await loop.create_connection()' and + # `_schedule_call_connection_made` and + # the actual `_call_connection_made`. + self._wakeup_waiter() + return + + # Set _protocol_connected to 1 before calling "connection_made": + # if transport is aborted or closed, "connection_lost" will + # still be scheduled. + self._protocol_connected = 1 + + try: + self._protocol.connection_made(self) + except BaseException: + self._wakeup_waiter() + raise + + if not self._is_alive(): + # This might happen when "transport.abort()" is called + # from "Protocol.connection_made". + self._wakeup_waiter() + return + + self._start_reading() + self._wakeup_waiter() + + cdef _call_connection_lost(self, exc): + if self._waiter is not None: + if not self._waiter.done(): + self._waiter.set_exception(exc) + self._waiter = None + + if self._closed: + # The handle is closed -- likely, _call_connection_lost + # was already called before. + return + + try: + if self._protocol_connected: + self._protocol.connection_lost(exc) + finally: + self._clear_protocol() + + self._close() + + server = self._server + if server is not None: + (<Server>server)._detach() + self._server = None + + cdef inline _set_server(self, Server server): + self._server = server + (<Server>server)._attach() + + cdef inline _set_waiter(self, object waiter): + if waiter is not None and not isfuture(waiter): + raise TypeError( + f'invalid waiter object {waiter!r}, expected asyncio.Future') + + self._waiter = waiter + + cdef _set_protocol(self, object protocol): + self._protocol = protocol + # Store a reference to the bound method directly + try: + self._protocol_data_received = protocol.data_received + except AttributeError: + pass + + cdef _clear_protocol(self): + self._protocol = None + self._protocol_data_received = None + + cdef inline _init_protocol(self): + self._loop._track_transport(self) + if self._protocol is None: + raise RuntimeError('invalid _init_protocol call') + self._schedule_call_connection_made() + + cdef inline _add_extra_info(self, str name, object obj): + if self._extra_info is None: + self._extra_info = {} + self._extra_info[name] = obj + + cdef bint _is_reading(self): + raise NotImplementedError + + cdef _start_reading(self): + raise NotImplementedError + + cdef _stop_reading(self): + raise NotImplementedError + + # === Public API === + + property _paused: + # Used by SSLProto. Might be removed in the future. + def __get__(self): + return bool(not self._is_reading()) + + def get_protocol(self): + return self._protocol + + def set_protocol(self, protocol): + self._set_protocol(protocol) + if self._is_reading(): + self._stop_reading() + self._start_reading() + + def _force_close(self, exc): + # Used by SSLProto. Might be removed in the future. + if self._conn_lost or self._closed: + return + if not self._closing: + self._closing = 1 + self._stop_reading() + self._conn_lost += 1 + self._schedule_call_connection_lost(exc) + + def abort(self): + self._force_close(None) + + def close(self): + if self._closing or self._closed: + return + + self._closing = 1 + self._stop_reading() + + if not self._get_write_buffer_size(): + # The write buffer is empty + self._conn_lost += 1 + self._schedule_call_connection_lost(None) + + def is_closing(self): + return self._closing + + def get_write_buffer_size(self): + return self._get_write_buffer_size() + + def set_write_buffer_limits(self, high=None, low=None): + self._ensure_alive() + + self._high_water, self._low_water = add_flowcontrol_defaults( + high, low, FLOW_CONTROL_HIGH_WATER) + + self._maybe_pause_protocol() + + def get_write_buffer_limits(self): + return (self._low_water, self._high_water) + + def get_extra_info(self, name, default=None): + if self._extra_info is not None and name in self._extra_info: + return self._extra_info[name] + if name == 'socket': + return self._get_socket() + if name == 'sockname': + return self._get_socket().getsockname() + if name == 'peername': + try: + return self._get_socket().getpeername() + except socket_error: + return default + return default diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/check.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/check.pxd new file mode 100644 index 0000000..86cfd8f --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/check.pxd @@ -0,0 +1,14 @@ +cdef class UVCheck(UVHandle): + cdef: + Handle h + bint running + + # All "inline" methods are final + + cdef _init(self, Loop loop, Handle h) + + cdef inline stop(self) + cdef inline start(self) + + @staticmethod + cdef UVCheck new(Loop loop, Handle h) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/check.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/check.pyx new file mode 100644 index 0000000..1a61c4e --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/check.pyx @@ -0,0 +1,72 @@ +@cython.no_gc_clear +cdef class UVCheck(UVHandle): + cdef _init(self, Loop loop, Handle h): + cdef int err + + self._start_init(loop) + + self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_check_t)) + if self._handle is NULL: + self._abort_init() + raise MemoryError() + + err = uv.uv_check_init(self._loop.uvloop, <uv.uv_check_t*>self._handle) + if err < 0: + self._abort_init() + raise convert_error(err) + + self._finish_init() + + self.h = h + self.running = 0 + + cdef inline stop(self): + cdef int err + + if not self._is_alive(): + self.running = 0 + return + + if self.running == 1: + err = uv.uv_check_stop(<uv.uv_check_t*>self._handle) + self.running = 0 + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + + cdef inline start(self): + cdef int err + + self._ensure_alive() + + if self.running == 0: + err = uv.uv_check_start(<uv.uv_check_t*>self._handle, + cb_check_callback) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + self.running = 1 + + @staticmethod + cdef UVCheck new(Loop loop, Handle h): + cdef UVCheck handle + handle = UVCheck.__new__(UVCheck) + handle._init(loop, h) + return handle + + +cdef void cb_check_callback( + uv.uv_check_t* handle, +) noexcept with gil: + if __ensure_handle_data(<uv.uv_handle_t*>handle, "UVCheck callback") == 0: + return + + cdef: + UVCheck check = <UVCheck> handle.data + Handle h = check.h + try: + h._run() + except BaseException as ex: + check._error(ex, False) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/fsevent.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/fsevent.pxd new file mode 100644 index 0000000..3a32428 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/fsevent.pxd @@ -0,0 +1,12 @@ +cdef class UVFSEvent(UVHandle): + cdef: + object callback + bint running + + cdef _init(self, Loop loop, object callback, object context) + cdef _close(self) + cdef start(self, char* path, int flags) + cdef stop(self) + + @staticmethod + cdef UVFSEvent new(Loop loop, object callback, object context) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/fsevent.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/fsevent.pyx new file mode 100644 index 0000000..6ed6433 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/fsevent.pyx @@ -0,0 +1,116 @@ +import enum + + +class FileSystemEvent(enum.IntEnum): + RENAME = uv.UV_RENAME + CHANGE = uv.UV_CHANGE + RENAME_CHANGE = RENAME | CHANGE + + +@cython.no_gc_clear +cdef class UVFSEvent(UVHandle): + cdef _init(self, Loop loop, object callback, object context): + cdef int err + + self._start_init(loop) + + self._handle = <uv.uv_handle_t*>PyMem_RawMalloc( + sizeof(uv.uv_fs_event_t) + ) + if self._handle is NULL: + self._abort_init() + raise MemoryError() + + err = uv.uv_fs_event_init( + self._loop.uvloop, <uv.uv_fs_event_t*>self._handle + ) + if err < 0: + self._abort_init() + raise convert_error(err) + + self._finish_init() + + self.running = 0 + self.callback = callback + if context is None: + context = Context_CopyCurrent() + self.context = context + + cdef start(self, char* path, int flags): + cdef int err + + self._ensure_alive() + + if self.running == 0: + err = uv.uv_fs_event_start( + <uv.uv_fs_event_t*>self._handle, + __uvfsevent_callback, + path, + flags, + ) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + self.running = 1 + + cdef stop(self): + cdef int err + + if not self._is_alive(): + self.running = 0 + return + + if self.running == 1: + err = uv.uv_fs_event_stop(<uv.uv_fs_event_t*>self._handle) + self.running = 0 + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + + cdef _close(self): + try: + self.stop() + finally: + UVHandle._close(<UVHandle>self) + + def cancel(self): + self._close() + + def cancelled(self): + return self.running == 0 + + @staticmethod + cdef UVFSEvent new(Loop loop, object callback, object context): + cdef UVFSEvent handle + handle = UVFSEvent.__new__(UVFSEvent) + handle._init(loop, callback, context) + return handle + + +cdef void __uvfsevent_callback( + uv.uv_fs_event_t* handle, + const char *filename, + int events, + int status, +) noexcept with gil: + if __ensure_handle_data( + <uv.uv_handle_t*>handle, "UVFSEvent callback" + ) == 0: + return + + cdef: + UVFSEvent fs_event = <UVFSEvent> handle.data + Handle h + + try: + h = new_Handle( + fs_event._loop, + fs_event.callback, + (filename, FileSystemEvent(events)), + fs_event.context, + ) + h._run() + except BaseException as ex: + fs_event._error(ex, False) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/handle.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/handle.pxd new file mode 100644 index 0000000..5af1c14 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/handle.pxd @@ -0,0 +1,48 @@ +cdef class UVHandle: + cdef: + uv.uv_handle_t *_handle + Loop _loop + readonly _source_traceback + bint _closed + bint _inited + object context + + # Added to enable current UDPTransport implementation, + # which doesn't use libuv handles. + bint _has_handle + + # All "inline" methods are final + + cdef inline _start_init(self, Loop loop) + cdef inline _abort_init(self) + cdef inline _finish_init(self) + + cdef inline bint _is_alive(self) + cdef inline _ensure_alive(self) + + cdef _error(self, exc, throw) + cdef _fatal_error(self, exc, throw, reason=?) + + cdef _warn_unclosed(self) + + cdef _free(self) + cdef _close(self) + + +cdef class UVSocketHandle(UVHandle): + cdef: + # Points to a Python file-object that should be closed + # when the transport is closing. Used by pipes. This + # should probably be refactored somehow. + object _fileobj + object __cached_socket + + # All "inline" methods are final + + cdef _fileno(self) + + cdef _new_socket(self) + cdef inline _get_socket(self) + cdef inline _attach_fileobj(self, object file) + + cdef _open(self, int sockfd) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/handle.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/handle.pyx new file mode 100644 index 0000000..6efe375 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/handle.pyx @@ -0,0 +1,395 @@ +cdef class UVHandle: + """A base class for all libuv handles. + + Automatically manages memory deallocation and closing. + + Important: + + 1. call "_ensure_alive()" before calling any libuv functions on + your handles. + + 2. call "__ensure_handle_data" in *all* libuv handle callbacks. + """ + + def __cinit__(self): + self._closed = 0 + self._inited = 0 + self._has_handle = 1 + self._handle = NULL + self._loop = None + self._source_traceback = None + + def __init__(self): + raise TypeError( + '{} is not supposed to be instantiated from Python'.format( + self.__class__.__name__)) + + def __dealloc__(self): + if UVLOOP_DEBUG: + if self._loop is not None: + if self._inited: + self._loop._debug_handles_current.subtract([ + self.__class__.__name__]) + else: + # No "@cython.no_gc_clear" decorator on this UVHandle + raise RuntimeError( + '{} without @no_gc_clear; loop was set to None by GC' + .format(self.__class__.__name__)) + + if self._handle is NULL: + return + + # -> When we're at this point, something is wrong <- + + if self._handle.loop is NULL: + # The handle wasn't initialized with "uv_{handle}_init" + self._closed = 1 + self._free() + raise RuntimeError( + '{} is open in __dealloc__ with loop set to NULL' + .format(self.__class__.__name__)) + + if self._closed: + # So _handle is not NULL and self._closed == 1? + raise RuntimeError( + '{}.__dealloc__: _handle is NULL, _closed == 1'.format( + self.__class__.__name__)) + + # The handle is dealloced while open. Let's try to close it. + # Situations when this is possible include unhandled exceptions, + # errors during Handle.__cinit__/__init__ etc. + if self._inited: + self._handle.data = NULL + uv.uv_close(self._handle, __uv_close_handle_cb) # void; no errors + self._handle = NULL + self._warn_unclosed() + else: + # The handle was allocated, but not initialized + self._closed = 1 + self._free() + + cdef _free(self): + if self._handle == NULL: + return + + if UVLOOP_DEBUG and self._inited: + self._loop._debug_uv_handles_freed += 1 + + PyMem_RawFree(self._handle) + self._handle = NULL + + cdef _warn_unclosed(self): + if self._source_traceback is not None: + try: + tb = ''.join(tb_format_list(self._source_traceback)) + tb = 'object created at (most recent call last):\n{}'.format( + tb.rstrip()) + except Exception as ex: + msg = ( + 'unclosed resource {!r}; could not serialize ' + 'debug traceback: {}: {}' + ).format(self, type(ex).__name__, ex) + else: + msg = 'unclosed resource {!r}; {}'.format(self, tb) + else: + msg = 'unclosed resource {!r}'.format(self) + warnings_warn(msg, ResourceWarning) + + cdef inline _abort_init(self): + if self._handle is not NULL: + self._free() + + try: + if UVLOOP_DEBUG: + name = self.__class__.__name__ + if self._inited: + raise RuntimeError( + '_abort_init: {}._inited is set'.format(name)) + if self._closed: + raise RuntimeError( + '_abort_init: {}._closed is set'.format(name)) + finally: + self._closed = 1 + + cdef inline _finish_init(self): + self._inited = 1 + if self._has_handle == 1: + self._handle.data = <void*>self + if self._loop._debug: + self._source_traceback = extract_stack() + if UVLOOP_DEBUG: + cls_name = self.__class__.__name__ + self._loop._debug_uv_handles_total += 1 + self._loop._debug_handles_total.update([cls_name]) + self._loop._debug_handles_current.update([cls_name]) + + cdef inline _start_init(self, Loop loop): + if UVLOOP_DEBUG: + if self._loop is not None: + raise RuntimeError( + '{}._start_init can only be called once'.format( + self.__class__.__name__)) + + self._loop = loop + + cdef inline bint _is_alive(self): + cdef bint res + res = self._closed != 1 and self._inited == 1 + if UVLOOP_DEBUG: + if res and self._has_handle == 1: + name = self.__class__.__name__ + if self._handle is NULL: + raise RuntimeError( + '{} is alive, but _handle is NULL'.format(name)) + if self._loop is None: + raise RuntimeError( + '{} is alive, but _loop is None'.format(name)) + if self._handle.loop is not self._loop.uvloop: + raise RuntimeError( + '{} is alive, but _handle.loop is not ' + 'initialized'.format(name)) + if self._handle.data is not <void*>self: + raise RuntimeError( + '{} is alive, but _handle.data is not ' + 'initialized'.format(name)) + return res + + cdef inline _ensure_alive(self): + if not self._is_alive(): + raise RuntimeError( + 'unable to perform operation on {!r}; ' + 'the handler is closed'.format(self)) + + cdef _fatal_error(self, exc, throw, reason=None): + # Fatal error means an error that was returned by the + # underlying libuv handle function. We usually can't + # recover from that, hence we just close the handle. + self._close() + + if throw or self._loop is None: + raise exc + else: + self._loop._handle_exception(exc) + + cdef _error(self, exc, throw): + # A non-fatal error is usually an error that was caught + # by the handler, but was originated in the client code + # (not in libuv). In this case we either want to simply + # raise or log it. + if throw or self._loop is None: + raise exc + else: + self._loop._handle_exception(exc) + + cdef _close(self): + if self._closed == 1: + return + + self._closed = 1 + + if self._handle is NULL: + return + + if UVLOOP_DEBUG: + if self._handle.data is NULL: + raise RuntimeError( + '{}._close: _handle.data is NULL'.format( + self.__class__.__name__)) + + if <object>self._handle.data is not self: + raise RuntimeError( + '{}._close: _handle.data is not UVHandle/self'.format( + self.__class__.__name__)) + + if uv.uv_is_closing(self._handle): + raise RuntimeError( + '{}._close: uv_is_closing() is true'.format( + self.__class__.__name__)) + + # We want the handle wrapper (UVHandle) to stay alive until + # the closing callback fires. + Py_INCREF(self) + uv.uv_close(self._handle, __uv_close_handle_cb) # void; no errors + + def __repr__(self): + return '<{} closed={} {:#x}>'.format( + self.__class__.__name__, + self._closed, + id(self)) + + +cdef class UVSocketHandle(UVHandle): + + def __cinit__(self): + self._fileobj = None + self.__cached_socket = None + + cdef _fileno(self): + cdef: + int fd + int err + + self._ensure_alive() + err = uv.uv_fileno(self._handle, <uv.uv_os_fd_t*>&fd) + if err < 0: + raise convert_error(err) + + return fd + + cdef _new_socket(self): + raise NotImplementedError + + cdef inline _get_socket(self): + if self.__cached_socket is not None: + return self.__cached_socket + + if not self._is_alive(): + return None + + self.__cached_socket = self._new_socket() + if UVLOOP_DEBUG: + # We don't "dup" for the "__cached_socket". + assert self.__cached_socket.fileno() == self._fileno() + return self.__cached_socket + + cdef inline _attach_fileobj(self, object file): + # When we create a TCP/PIPE/etc connection/server based on + # a Python file object, we need to close the file object when + # the uv handle is closed. + socket_inc_io_ref(file) + self._fileobj = file + + cdef _close(self): + if self.__cached_socket is not None: + (<PseudoSocket>self.__cached_socket)._fd = -1 + + UVHandle._close(self) + + try: + # This code will only run for transports created from + # Python sockets, i.e. with `loop.create_server(sock=sock)` etc. + if self._fileobj is not None: + if isinstance(self._fileobj, socket_socket): + # Detaching the socket object is the ideal solution: + # * libuv will actually close the FD; + # * detach() call will reset FD for the Python socket + # object, which means that it won't be closed 2nd time + # when the socket object is GCed. + # + # No need to call `socket_dec_io_ref()`, as + # `socket.detach()` ignores `socket._io_refs`. + self._fileobj.detach() + else: + try: + # `socket.close()` will raise an EBADF because libuv + # has already closed the underlying FD. + self._fileobj.close() + except OSError as ex: + if ex.errno != errno_EBADF: + raise + except Exception as ex: + self._loop.call_exception_handler({ + 'exception': ex, + 'transport': self, + 'message': f'could not close attached file object ' + f'{self._fileobj!r}', + }) + finally: + self._fileobj = None + + cdef _open(self, int sockfd): + raise NotImplementedError + + +cdef inline bint __ensure_handle_data(uv.uv_handle_t* handle, + const char* handle_ctx): + + cdef Loop loop + + if UVLOOP_DEBUG: + if handle.loop is NULL: + raise RuntimeError( + 'handle.loop is NULL in __ensure_handle_data') + + if handle.loop.data is NULL: + raise RuntimeError( + 'handle.loop.data is NULL in __ensure_handle_data') + + if handle.data is NULL: + loop = <Loop>handle.loop.data + loop.call_exception_handler({ + 'message': '{} called with handle.data == NULL'.format( + handle_ctx.decode('latin-1')) + }) + return 0 + + if handle.data is NULL: + # The underlying UVHandle object was GCed with an open uv_handle_t. + loop = <Loop>handle.loop.data + loop.call_exception_handler({ + 'message': '{} called after destroying the UVHandle'.format( + handle_ctx.decode('latin-1')) + }) + return 0 + + return 1 + + +cdef void __uv_close_handle_cb(uv.uv_handle_t* handle) noexcept with gil: + cdef UVHandle h + + if handle.data is NULL: + # The original UVHandle is long dead. Just free the mem of + # the uv_handle_t* handler. + + if UVLOOP_DEBUG: + if handle.loop == NULL or handle.loop.data == NULL: + raise RuntimeError( + '__uv_close_handle_cb: handle.loop is invalid') + (<Loop>handle.loop.data)._debug_uv_handles_freed += 1 + + PyMem_RawFree(handle) + else: + h = <UVHandle>handle.data + try: + if UVLOOP_DEBUG: + if not h._has_handle: + raise RuntimeError( + 'has_handle=0 in __uv_close_handle_cb') + h._loop._debug_handles_closed.update([ + h.__class__.__name__]) + h._free() + finally: + Py_DECREF(h) # Was INCREFed in UVHandle._close + + +cdef void __close_all_handles(Loop loop): + uv.uv_walk(loop.uvloop, + __uv_walk_close_all_handles_cb, + <void*>loop) # void + + +cdef void __uv_walk_close_all_handles_cb( + uv.uv_handle_t* handle, + void* arg, +) noexcept with gil: + + cdef: + Loop loop = <Loop>arg + UVHandle h + + if uv.uv_is_closing(handle): + # The handle is closed or is closing. + return + + if handle.data is NULL: + # This shouldn't happen. Ever. + loop.call_exception_handler({ + 'message': 'handle.data is NULL in __close_all_handles_cb' + }) + return + + h = <UVHandle>handle.data + if not h._closed: + h._warn_unclosed() + h._close() diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/idle.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/idle.pxd new file mode 100644 index 0000000..cf7b19f --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/idle.pxd @@ -0,0 +1,14 @@ +cdef class UVIdle(UVHandle): + cdef: + Handle h + bint running + + # All "inline" methods are final + + cdef _init(self, Loop loop, Handle h) + + cdef inline stop(self) + cdef inline start(self) + + @staticmethod + cdef UVIdle new(Loop loop, Handle h) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/idle.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/idle.pyx new file mode 100644 index 0000000..91c641f --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/idle.pyx @@ -0,0 +1,72 @@ +@cython.no_gc_clear +cdef class UVIdle(UVHandle): + cdef _init(self, Loop loop, Handle h): + cdef int err + + self._start_init(loop) + + self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_idle_t)) + if self._handle is NULL: + self._abort_init() + raise MemoryError() + + err = uv.uv_idle_init(self._loop.uvloop, <uv.uv_idle_t*>self._handle) + if err < 0: + self._abort_init() + raise convert_error(err) + + self._finish_init() + + self.h = h + self.running = 0 + + cdef inline stop(self): + cdef int err + + if not self._is_alive(): + self.running = 0 + return + + if self.running == 1: + err = uv.uv_idle_stop(<uv.uv_idle_t*>self._handle) + self.running = 0 + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + + cdef inline start(self): + cdef int err + + self._ensure_alive() + + if self.running == 0: + err = uv.uv_idle_start(<uv.uv_idle_t*>self._handle, + cb_idle_callback) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + self.running = 1 + + @staticmethod + cdef UVIdle new(Loop loop, Handle h): + cdef UVIdle handle + handle = UVIdle.__new__(UVIdle) + handle._init(loop, h) + return handle + + +cdef void cb_idle_callback( + uv.uv_idle_t* handle, +) noexcept with gil: + if __ensure_handle_data(<uv.uv_handle_t*>handle, "UVIdle callback") == 0: + return + + cdef: + UVIdle idle = <UVIdle> handle.data + Handle h = idle.h + try: + h._run() + except BaseException as ex: + idle._error(ex, False) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pxd new file mode 100644 index 0000000..56fc265 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pxd @@ -0,0 +1,33 @@ +cdef class UnixServer(UVStreamServer): + + cdef bind(self, str path) + + @staticmethod + cdef UnixServer new(Loop loop, object protocol_factory, Server server, + object backlog, + object ssl, + object ssl_handshake_timeout, + object ssl_shutdown_timeout) + + +cdef class UnixTransport(UVStream): + + @staticmethod + cdef UnixTransport new(Loop loop, object protocol, Server server, + object waiter, object context) + + cdef connect(self, char* addr) + + +cdef class ReadUnixTransport(UVStream): + + @staticmethod + cdef ReadUnixTransport new(Loop loop, object protocol, Server server, + object waiter) + + +cdef class WriteUnixTransport(UVStream): + + @staticmethod + cdef WriteUnixTransport new(Loop loop, object protocol, Server server, + object waiter) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pyx new file mode 100644 index 0000000..195576c --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pyx @@ -0,0 +1,226 @@ +cdef __pipe_init_uv_handle(UVStream handle, Loop loop): + cdef int err + + handle._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_pipe_t)) + if handle._handle is NULL: + handle._abort_init() + raise MemoryError() + + # Initialize pipe handle with ipc=0. + # ipc=1 means that libuv will use recvmsg/sendmsg + # instead of recv/send. + err = uv.uv_pipe_init(handle._loop.uvloop, + <uv.uv_pipe_t*>handle._handle, + 0) + # UV_HANDLE_READABLE allows calling uv_read_start() on this pipe + # even if it is O_WRONLY, see also #317, libuv/libuv#2058 + handle._handle.flags |= uv.UV_INTERNAL_HANDLE_READABLE + if err < 0: + handle._abort_init() + raise convert_error(err) + + handle._finish_init() + + +cdef __pipe_open(UVStream handle, int fd): + cdef int err + err = uv.uv_pipe_open(<uv.uv_pipe_t *>handle._handle, + <uv.uv_file>fd) + if err < 0: + exc = convert_error(err) + raise exc + + +cdef __pipe_get_socket(UVSocketHandle handle): + fileno = handle._fileno() + return PseudoSocket(uv.AF_UNIX, uv.SOCK_STREAM, 0, fileno) + + +@cython.no_gc_clear +cdef class UnixServer(UVStreamServer): + + @staticmethod + cdef UnixServer new(Loop loop, object protocol_factory, Server server, + object backlog, + object ssl, + object ssl_handshake_timeout, + object ssl_shutdown_timeout): + + cdef UnixServer handle + handle = UnixServer.__new__(UnixServer) + handle._init(loop, protocol_factory, server, backlog, + ssl, ssl_handshake_timeout, ssl_shutdown_timeout) + __pipe_init_uv_handle(<UVStream>handle, loop) + return handle + + cdef _new_socket(self): + return __pipe_get_socket(<UVSocketHandle>self) + + cdef _open(self, int sockfd): + self._ensure_alive() + __pipe_open(<UVStream>self, sockfd) + self._mark_as_open() + + cdef bind(self, str path): + cdef int err + self._ensure_alive() + err = uv.uv_pipe_bind(<uv.uv_pipe_t *>self._handle, + path.encode()) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + + self._mark_as_open() + + cdef UVStream _make_new_transport(self, object protocol, object waiter, + object context): + cdef UnixTransport tr + tr = UnixTransport.new(self._loop, protocol, self._server, waiter, + context) + return <UVStream>tr + + +@cython.no_gc_clear +cdef class UnixTransport(UVStream): + + @staticmethod + cdef UnixTransport new(Loop loop, object protocol, Server server, + object waiter, object context): + + cdef UnixTransport handle + handle = UnixTransport.__new__(UnixTransport) + handle._init(loop, protocol, server, waiter, context) + __pipe_init_uv_handle(<UVStream>handle, loop) + return handle + + cdef _new_socket(self): + return __pipe_get_socket(<UVSocketHandle>self) + + cdef _open(self, int sockfd): + __pipe_open(<UVStream>self, sockfd) + + cdef connect(self, char* addr): + cdef _PipeConnectRequest req + req = _PipeConnectRequest(self._loop, self) + req.connect(addr) + + +@cython.no_gc_clear +cdef class ReadUnixTransport(UVStream): + + @staticmethod + cdef ReadUnixTransport new(Loop loop, object protocol, Server server, + object waiter): + cdef ReadUnixTransport handle + handle = ReadUnixTransport.__new__(ReadUnixTransport) + # This is only used in connect_read_pipe() and subprocess_shell/exec() + # directly, we could simply copy the current context. + handle._init(loop, protocol, server, waiter, Context_CopyCurrent()) + __pipe_init_uv_handle(<UVStream>handle, loop) + return handle + + cdef _new_socket(self): + return __pipe_get_socket(<UVSocketHandle>self) + + cdef _open(self, int sockfd): + __pipe_open(<UVStream>self, sockfd) + + def get_write_buffer_limits(self): + raise NotImplementedError + + def set_write_buffer_limits(self, high=None, low=None): + raise NotImplementedError + + def get_write_buffer_size(self): + raise NotImplementedError + + def write(self, data): + raise NotImplementedError + + def writelines(self, list_of_data): + raise NotImplementedError + + def write_eof(self): + raise NotImplementedError + + def can_write_eof(self): + raise NotImplementedError + + def abort(self): + raise NotImplementedError + + +@cython.no_gc_clear +cdef class WriteUnixTransport(UVStream): + + @staticmethod + cdef WriteUnixTransport new(Loop loop, object protocol, Server server, + object waiter): + cdef WriteUnixTransport handle + handle = WriteUnixTransport.__new__(WriteUnixTransport) + + # We listen for read events on write-end of the pipe. When + # the read-end is close, the uv_stream_t.read callback will + # receive an error -- we want to silence that error, and just + # close the transport. + handle._close_on_read_error() + + # This is only used in connect_write_pipe() and subprocess_shell/exec() + # directly, we could simply copy the current context. + handle._init(loop, protocol, server, waiter, Context_CopyCurrent()) + __pipe_init_uv_handle(<UVStream>handle, loop) + return handle + + cdef _new_socket(self): + return __pipe_get_socket(<UVSocketHandle>self) + + cdef _open(self, int sockfd): + __pipe_open(<UVStream>self, sockfd) + + def pause_reading(self): + raise NotImplementedError + + def resume_reading(self): + raise NotImplementedError + + +cdef class _PipeConnectRequest(UVRequest): + cdef: + UnixTransport transport + uv.uv_connect_t _req_data + + def __cinit__(self, loop, transport): + self.request = <uv.uv_req_t*> &self._req_data + self.request.data = <void*>self + self.transport = transport + + cdef connect(self, char* addr): + # uv_pipe_connect returns void + uv.uv_pipe_connect(<uv.uv_connect_t*>self.request, + <uv.uv_pipe_t*>self.transport._handle, + addr, + __pipe_connect_callback) + +cdef void __pipe_connect_callback( + uv.uv_connect_t* req, + int status, +) noexcept with gil: + cdef: + _PipeConnectRequest wrapper + UnixTransport transport + + wrapper = <_PipeConnectRequest> req.data + transport = wrapper.transport + + if status < 0: + exc = convert_error(status) + else: + exc = None + + try: + transport._on_connect(exc) + except BaseException as ex: + wrapper.transport._fatal_error(ex, False) + finally: + wrapper.on_done() diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/poll.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/poll.pxd new file mode 100644 index 0000000..d07030b --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/poll.pxd @@ -0,0 +1,25 @@ +cdef class UVPoll(UVHandle): + cdef: + int fd + Handle reading_handle + Handle writing_handle + + cdef _init(self, Loop loop, int fd) + cdef _close(self) + + cdef inline _poll_start(self, int flags) + cdef inline _poll_stop(self) + + cdef int is_active(self) + + cdef is_reading(self) + cdef is_writing(self) + + cdef start_reading(self, Handle callback) + cdef start_writing(self, Handle callback) + cdef stop_reading(self) + cdef stop_writing(self) + cdef stop(self) + + @staticmethod + cdef UVPoll new(Loop loop, int fd) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/poll.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/poll.pyx new file mode 100644 index 0000000..fca5981 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/poll.pyx @@ -0,0 +1,233 @@ +@cython.no_gc_clear +cdef class UVPoll(UVHandle): + cdef _init(self, Loop loop, int fd): + cdef int err + + self._start_init(loop) + + self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_poll_t)) + if self._handle is NULL: + self._abort_init() + raise MemoryError() + + err = uv.uv_poll_init(self._loop.uvloop, + <uv.uv_poll_t *>self._handle, fd) + if err < 0: + self._abort_init() + raise convert_error(err) + + self._finish_init() + + self.fd = fd + self.reading_handle = None + self.writing_handle = None + + @staticmethod + cdef UVPoll new(Loop loop, int fd): + cdef UVPoll handle + handle = UVPoll.__new__(UVPoll) + handle._init(loop, fd) + return handle + + cdef int is_active(self): + return (self.reading_handle is not None or + self.writing_handle is not None) + + cdef inline _poll_start(self, int flags): + cdef int err + + self._ensure_alive() + + err = uv.uv_poll_start( + <uv.uv_poll_t*>self._handle, + flags, + __on_uvpoll_event) + + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + + cdef inline _poll_stop(self): + cdef int err + + if not self._is_alive(): + return + + err = uv.uv_poll_stop(<uv.uv_poll_t*>self._handle) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + + cdef: + int backend_id + system.epoll_event dummy_event + + if system.PLATFORM_IS_LINUX: + # libuv doesn't remove the FD from epoll immediately + # after uv_poll_stop or uv_poll_close, causing hard + # to debug issue with dup-ed file descriptors causing + # CPU burn in epoll/epoll_ctl: + # https://github.com/MagicStack/uvloop/issues/61 + # + # It's safe though to manually call epoll_ctl here, + # after calling uv_poll_stop. + + backend_id = uv.uv_backend_fd(self._loop.uvloop) + if backend_id != -1: + memset(&dummy_event, 0, sizeof(dummy_event)) + system.epoll_ctl( + backend_id, + system.EPOLL_CTL_DEL, + self.fd, + &dummy_event) # ignore errors + + cdef is_reading(self): + return self._is_alive() and self.reading_handle is not None + + cdef is_writing(self): + return self._is_alive() and self.writing_handle is not None + + cdef start_reading(self, Handle callback): + cdef: + int mask = 0 + + if self.reading_handle is None: + # not reading right now, setup the handle + + mask = uv.UV_READABLE + if self.writing_handle is not None: + # are we writing right now? + mask |= uv.UV_WRITABLE + + self._poll_start(mask) + else: + self.reading_handle._cancel() + + self.reading_handle = callback + + cdef start_writing(self, Handle callback): + cdef: + int mask = 0 + + if self.writing_handle is None: + # not writing right now, setup the handle + + mask = uv.UV_WRITABLE + if self.reading_handle is not None: + # are we reading right now? + mask |= uv.UV_READABLE + + self._poll_start(mask) + else: + self.writing_handle._cancel() + + self.writing_handle = callback + + cdef stop_reading(self): + if self.reading_handle is None: + return False + + self.reading_handle._cancel() + self.reading_handle = None + + if self.writing_handle is None: + self.stop() + else: + self._poll_start(uv.UV_WRITABLE) + + return True + + cdef stop_writing(self): + if self.writing_handle is None: + return False + + self.writing_handle._cancel() + self.writing_handle = None + + if self.reading_handle is None: + self.stop() + else: + self._poll_start(uv.UV_READABLE) + + return True + + cdef stop(self): + if self.reading_handle is not None: + self.reading_handle._cancel() + self.reading_handle = None + + if self.writing_handle is not None: + self.writing_handle._cancel() + self.writing_handle = None + + self._poll_stop() + + cdef _close(self): + if self.is_active(): + self.stop() + + UVHandle._close(<UVHandle>self) + + cdef _fatal_error(self, exc, throw, reason=None): + try: + if self.reading_handle is not None: + try: + self.reading_handle._run() + except BaseException as ex: + self._loop._handle_exception(ex) + self.reading_handle = None + + if self.writing_handle is not None: + try: + self.writing_handle._run() + except BaseException as ex: + self._loop._handle_exception(ex) + self.writing_handle = None + + finally: + self._close() + + +cdef void __on_uvpoll_event( + uv.uv_poll_t* handle, + int status, + int events, +) noexcept with gil: + + if __ensure_handle_data(<uv.uv_handle_t*>handle, "UVPoll callback") == 0: + return + + cdef: + UVPoll poll = <UVPoll> handle.data + + if status < 0: + exc = convert_error(status) + poll._fatal_error(exc, False) + return + + if ((events & (uv.UV_READABLE | uv.UV_DISCONNECT)) and + poll.reading_handle is not None): + + try: + if UVLOOP_DEBUG: + poll._loop._poll_read_events_total += 1 + poll.reading_handle._run() + except BaseException as ex: + if UVLOOP_DEBUG: + poll._loop._poll_read_cb_errors_total += 1 + poll._error(ex, False) + # continue code execution + + if ((events & (uv.UV_WRITABLE | uv.UV_DISCONNECT)) and + poll.writing_handle is not None): + + try: + if UVLOOP_DEBUG: + poll._loop._poll_write_events_total += 1 + poll.writing_handle._run() + except BaseException as ex: + if UVLOOP_DEBUG: + poll._loop._poll_write_cb_errors_total += 1 + poll._error(ex, False) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/process.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/process.pxd new file mode 100644 index 0000000..970abcf --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/process.pxd @@ -0,0 +1,80 @@ +cdef class UVProcess(UVHandle): + cdef: + object _returncode + object _pid + + object _errpipe_read + object _errpipe_write + object _preexec_fn + bint _restore_signals + + list _fds_to_close + + # Attributes used to compose uv_process_options_t: + uv.uv_process_options_t options + uv.uv_stdio_container_t[3] iocnt + list __env + char **uv_opt_env + list __args + char **uv_opt_args + char *uv_opt_file + bytes __cwd + + cdef _close_process_handle(self) + + cdef _init(self, Loop loop, list args, dict env, cwd, + start_new_session, + _stdin, _stdout, _stderr, pass_fds, + debug_flags, preexec_fn, restore_signals) + + cdef _after_fork(self) + + cdef char** __to_cstring_array(self, list arr) + cdef _init_args(self, list args) + cdef _init_env(self, dict env) + cdef _init_files(self, _stdin, _stdout, _stderr) + cdef _init_options(self, list args, dict env, cwd, start_new_session, + _stdin, _stdout, _stderr, bint force_fork) + + cdef _close_after_spawn(self, int fd) + + cdef _on_exit(self, int64_t exit_status, int term_signal) + cdef _kill(self, int signum) + + +cdef class UVProcessTransport(UVProcess): + cdef: + list _exit_waiters + list _init_futs + bint _stdio_ready + list _pending_calls + object _protocol + bint _finished + + WriteUnixTransport _stdin + ReadUnixTransport _stdout + ReadUnixTransport _stderr + + object stdin_proto + object stdout_proto + object stderr_proto + + cdef _file_redirect_stdio(self, int fd) + cdef _file_devnull(self) + cdef _file_inpipe(self) + cdef _file_outpipe(self) + + cdef _check_proc(self) + cdef _pipe_connection_lost(self, int fd, exc) + cdef _pipe_data_received(self, int fd, data) + + cdef _call_connection_made(self, waiter) + cdef _try_finish(self) + + @staticmethod + cdef UVProcessTransport new(Loop loop, protocol, args, env, cwd, + start_new_session, + _stdin, _stdout, _stderr, pass_fds, + waiter, + debug_flags, + preexec_fn, restore_signals) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx new file mode 100644 index 0000000..63b982a --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx @@ -0,0 +1,792 @@ +@cython.no_gc_clear +cdef class UVProcess(UVHandle): + """Abstract class; wrapper over uv_process_t handle.""" + + def __cinit__(self): + self.uv_opt_env = NULL + self.uv_opt_args = NULL + self._returncode = None + self._pid = None + self._fds_to_close = list() + self._preexec_fn = None + self._restore_signals = True + self.context = Context_CopyCurrent() + + cdef _close_process_handle(self): + # XXX: This is a workaround for a libuv bug: + # - https://github.com/libuv/libuv/issues/1933 + # - https://github.com/libuv/libuv/pull/551 + if self._handle is NULL: + return + self._handle.data = NULL + uv.uv_close(self._handle, __uv_close_process_handle_cb) + self._handle = NULL # close callback will free() the memory + + cdef _init(self, Loop loop, list args, dict env, + cwd, start_new_session, + _stdin, _stdout, _stderr, # std* can be defined as macros in C + pass_fds, debug_flags, preexec_fn, restore_signals): + + global __forking + global __forking_loop + global __forkHandler + + cdef int err + + self._start_init(loop) + + self._handle = <uv.uv_handle_t*>PyMem_RawMalloc( + sizeof(uv.uv_process_t)) + if self._handle is NULL: + self._abort_init() + raise MemoryError() + + # Too early to call _finish_init, but still a lot of work to do. + # Let's set handle.data to NULL, so in case something goes wrong, + # callbacks have a chance to avoid casting *something* into UVHandle. + self._handle.data = NULL + + force_fork = False + if system.PLATFORM_IS_APPLE and not ( + preexec_fn is None + and not pass_fds + ): + # see _execute_child() in CPython/subprocess.py + force_fork = True + + try: + self._init_options(args, env, cwd, start_new_session, + _stdin, _stdout, _stderr, force_fork) + + restore_inheritable = set() + if pass_fds: + for fd in pass_fds: + if not os_get_inheritable(fd): + restore_inheritable.add(fd) + os_set_inheritable(fd, True) + except Exception: + self._abort_init() + raise + + if __forking or loop.active_process_handler is not None: + # Our pthread_atfork handlers won't work correctly when + # another loop is forking in another thread (even though + # GIL should help us to avoid that.) + self._abort_init() + raise RuntimeError( + 'Racing with another loop to spawn a process.') + + self._errpipe_read, self._errpipe_write = os_pipe() + fds_to_close = self._fds_to_close + self._fds_to_close = None + fds_to_close.append(self._errpipe_read) + # add the write pipe last so we can close it early + fds_to_close.append(self._errpipe_write) + try: + os_set_inheritable(self._errpipe_write, True) + + self._preexec_fn = preexec_fn + self._restore_signals = restore_signals + + loop.active_process_handler = self + __forking = 1 + __forking_loop = loop + system.setForkHandler(<system.OnForkHandler>&__get_fork_handler) + + PyOS_BeforeFork() + + err = uv.uv_spawn(loop.uvloop, + <uv.uv_process_t*>self._handle, + &self.options) + + __forking = 0 + __forking_loop = None + system.resetForkHandler() + loop.active_process_handler = None + + PyOS_AfterFork_Parent() + + if err < 0: + self._close_process_handle() + self._abort_init() + raise convert_error(err) + + self._finish_init() + + # close the write pipe early + os_close(fds_to_close.pop()) + + if preexec_fn is not None: + errpipe_data = bytearray() + while True: + # XXX: This is a blocking code that has to be + # rewritten (using loop.connect_read_pipe() or + # otherwise.) + part = os_read(self._errpipe_read, 50000) + errpipe_data += part + if not part or len(errpipe_data) > 50000: + break + + finally: + while fds_to_close: + os_close(fds_to_close.pop()) + + for fd in restore_inheritable: + os_set_inheritable(fd, False) + + # asyncio caches the PID in BaseSubprocessTransport, + # so that the transport knows what the PID was even + # after the process is finished. + self._pid = (<uv.uv_process_t*>self._handle).pid + + # Track the process handle (create a strong ref to it) + # to guarantee that __dealloc__ doesn't happen in an + # uncontrolled fashion. We want to wait until the process + # exits and libuv calls __uvprocess_on_exit_callback, + # which will call `UVProcess._close()`, which will, in turn, + # untrack this handle. + self._loop._track_process(self) + + if debug_flags & __PROCESS_DEBUG_SLEEP_AFTER_FORK: + time_sleep(1) + + if preexec_fn is not None and errpipe_data: + # preexec_fn has raised an exception. The child + # process must be dead now. + try: + exc_name, exc_msg = errpipe_data.split(b':', 1) + exc_name = exc_name.decode() + exc_msg = exc_msg.decode() + except Exception: + self._close() + raise subprocess_SubprocessError( + 'Bad exception data from child: {!r}'.format( + errpipe_data)) + exc_cls = getattr(__builtins__, exc_name, + subprocess_SubprocessError) + + exc = subprocess_SubprocessError( + 'Exception occurred in preexec_fn.') + exc.__cause__ = exc_cls(exc_msg) + self._close() + raise exc + + cdef _after_fork(self): + # See CPython/_posixsubprocess.c for details + cdef int err + + if self._restore_signals: + _Py_RestoreSignals() + + PyOS_AfterFork_Child() + + err = uv.uv_loop_fork(self._loop.uvloop) + if err < 0: + raise convert_error(err) + + if self._preexec_fn is not None: + try: + gc_disable() + self._preexec_fn() + except BaseException as ex: + try: + with open(self._errpipe_write, 'wb') as f: + f.write(str(ex.__class__.__name__).encode()) + f.write(b':') + f.write(str(ex.args[0]).encode()) + finally: + system._exit(255) + return + else: + os_close(self._errpipe_write) + else: + os_close(self._errpipe_write) + + cdef _close_after_spawn(self, int fd): + if self._fds_to_close is None: + raise RuntimeError( + 'UVProcess._close_after_spawn called after uv_spawn') + self._fds_to_close.append(fd) + + def __dealloc__(self): + if self.uv_opt_env is not NULL: + PyMem_RawFree(self.uv_opt_env) + self.uv_opt_env = NULL + + if self.uv_opt_args is not NULL: + PyMem_RawFree(self.uv_opt_args) + self.uv_opt_args = NULL + + cdef char** __to_cstring_array(self, list arr): + cdef: + int i + ssize_t arr_len = len(arr) + bytes el + + char **ret + + ret = <char **>PyMem_RawMalloc((arr_len + 1) * sizeof(char *)) + if ret is NULL: + raise MemoryError() + + for i in range(arr_len): + el = arr[i] + # NB: PyBytes_AsString doesn't copy the data; + # we have to be careful when the "arr" is GCed, + # and it shouldn't be ever mutated. + ret[i] = PyBytes_AsString(el) + + ret[arr_len] = NULL + return ret + + cdef _init_options(self, list args, dict env, cwd, start_new_session, + _stdin, _stdout, _stderr, bint force_fork): + + memset(&self.options, 0, sizeof(uv.uv_process_options_t)) + + self._init_env(env) + self.options.env = self.uv_opt_env + + self._init_args(args) + self.options.file = self.uv_opt_file + self.options.args = self.uv_opt_args + + if start_new_session: + self.options.flags |= uv.UV_PROCESS_DETACHED + + if force_fork: + # This is a hack to work around the change in libuv 1.44: + # > macos: use posix_spawn instead of fork + # where Python subprocess options like preexec_fn are + # crippled. CPython only uses posix_spawn under a pretty + # strict list of conditions (see subprocess.py), and falls + # back to using fork() otherwise. We'd like to simulate such + # behavior with libuv, but unfortunately libuv doesn't + # provide explicit API to choose such implementation detail. + # Based on current (libuv 1.46) behavior, setting + # UV_PROCESS_SETUID or UV_PROCESS_SETGID would reliably make + # libuv fallback to use fork, so let's just use it for now. + self.options.flags |= uv.UV_PROCESS_SETUID + self.options.uid = uv.getuid() + + if cwd is not None: + cwd = os_fspath(cwd) + + if isinstance(cwd, str): + cwd = PyUnicode_EncodeFSDefault(cwd) + if not isinstance(cwd, bytes): + raise ValueError('cwd must be a str or bytes object') + + self.__cwd = cwd + self.options.cwd = PyBytes_AsString(self.__cwd) + + self.options.exit_cb = &__uvprocess_on_exit_callback + + self._init_files(_stdin, _stdout, _stderr) + + cdef _init_args(self, list args): + cdef: + bytes path + int an = len(args) + + if an < 1: + raise ValueError('cannot spawn a process: args are empty') + + self.__args = args.copy() + for i in range(an): + arg = os_fspath(args[i]) + if isinstance(arg, str): + self.__args[i] = PyUnicode_EncodeFSDefault(arg) + elif not isinstance(arg, bytes): + raise TypeError('all args must be str or bytes') + + path = self.__args[0] + self.uv_opt_file = PyBytes_AsString(path) + self.uv_opt_args = self.__to_cstring_array(self.__args) + + cdef _init_env(self, dict env): + if env is not None: + self.__env = list() + for key in env: + val = env[key] + + if isinstance(key, str): + key = PyUnicode_EncodeFSDefault(key) + elif not isinstance(key, bytes): + raise TypeError( + 'all environment vars must be bytes or str') + + if isinstance(val, str): + val = PyUnicode_EncodeFSDefault(val) + elif not isinstance(val, bytes): + raise TypeError( + 'all environment values must be bytes or str') + + self.__env.append(key + b'=' + val) + + self.uv_opt_env = self.__to_cstring_array(self.__env) + else: + self.__env = None + + cdef _init_files(self, _stdin, _stdout, _stderr): + self.options.stdio_count = 0 + + cdef _kill(self, int signum): + cdef int err + self._ensure_alive() + err = uv.uv_process_kill(<uv.uv_process_t*>self._handle, signum) + if err < 0: + raise convert_error(err) + + cdef _on_exit(self, int64_t exit_status, int term_signal): + if term_signal: + # From Python docs: + # A negative value -N indicates that the child was + # terminated by signal N (POSIX only). + self._returncode = -term_signal + else: + self._returncode = exit_status + + self._close() + + cdef _close(self): + try: + if self._loop is not None: + self._loop._untrack_process(self) + finally: + UVHandle._close(self) + + +DEF _CALL_PIPE_DATA_RECEIVED = 0 +DEF _CALL_PIPE_CONNECTION_LOST = 1 +DEF _CALL_PROCESS_EXITED = 2 +DEF _CALL_CONNECTION_LOST = 3 + + +@cython.no_gc_clear +cdef class UVProcessTransport(UVProcess): + def __cinit__(self): + self._exit_waiters = [] + self._protocol = None + + self._init_futs = [] + self._pending_calls = [] + self._stdio_ready = 0 + + self._stdin = self._stdout = self._stderr = None + self.stdin_proto = self.stdout_proto = self.stderr_proto = None + + self._finished = 0 + + cdef _on_exit(self, int64_t exit_status, int term_signal): + UVProcess._on_exit(self, exit_status, term_signal) + + if self._stdio_ready: + self._loop.call_soon(self._protocol.process_exited, + context=self.context) + else: + self._pending_calls.append((_CALL_PROCESS_EXITED, None, None)) + + self._try_finish() + + for waiter in self._exit_waiters: + if not waiter.cancelled(): + waiter.set_result(self._returncode) + self._exit_waiters.clear() + + self._close() + + cdef _check_proc(self): + if not self._is_alive() or self._returncode is not None: + raise ProcessLookupError() + + cdef _pipe_connection_lost(self, int fd, exc): + if self._stdio_ready: + self._loop.call_soon(self._protocol.pipe_connection_lost, fd, exc, + context=self.context) + self._try_finish() + else: + self._pending_calls.append((_CALL_PIPE_CONNECTION_LOST, fd, exc)) + + cdef _pipe_data_received(self, int fd, data): + if self._stdio_ready: + self._loop.call_soon(self._protocol.pipe_data_received, fd, data, + context=self.context) + else: + self._pending_calls.append((_CALL_PIPE_DATA_RECEIVED, fd, data)) + + cdef _file_redirect_stdio(self, int fd): + fd = os_dup(fd) + os_set_inheritable(fd, True) + self._close_after_spawn(fd) + return fd + + cdef _file_devnull(self): + dn = os_open(os_devnull, os_O_RDWR) + os_set_inheritable(dn, True) + self._close_after_spawn(dn) + return dn + + cdef _file_outpipe(self): + r, w = __socketpair() + os_set_inheritable(w, True) + self._close_after_spawn(w) + return r, w + + cdef _file_inpipe(self): + r, w = __socketpair() + os_set_inheritable(r, True) + self._close_after_spawn(r) + return r, w + + cdef _init_files(self, _stdin, _stdout, _stderr): + cdef uv.uv_stdio_container_t *iocnt + + UVProcess._init_files(self, _stdin, _stdout, _stderr) + + io = [None, None, None] + + self.options.stdio_count = 3 + self.options.stdio = self.iocnt + + if _stdin is not None: + if _stdin == subprocess_PIPE: + r, w = self._file_inpipe() + io[0] = r + + self.stdin_proto = WriteSubprocessPipeProto(self, 0) + waiter = self._loop._new_future() + self._stdin = WriteUnixTransport.new( + self._loop, self.stdin_proto, None, waiter) + self._init_futs.append(waiter) + self._stdin._open(w) + self._stdin._init_protocol() + elif _stdin == subprocess_DEVNULL: + io[0] = self._file_devnull() + elif _stdout == subprocess_STDOUT: + raise ValueError( + 'subprocess.STDOUT is supported only by stderr parameter') + else: + io[0] = self._file_redirect_stdio(_stdin) + else: + io[0] = self._file_redirect_stdio(0) + + if _stdout is not None: + if _stdout == subprocess_PIPE: + # We can't use UV_CREATE_PIPE here, since 'stderr' might be + # set to 'subprocess.STDOUT', and there is no way to + # emulate that functionality with libuv high-level + # streams API. Therefore, we create pipes for stdout and + # stderr manually. + + r, w = self._file_outpipe() + io[1] = w + + self.stdout_proto = ReadSubprocessPipeProto(self, 1) + waiter = self._loop._new_future() + self._stdout = ReadUnixTransport.new( + self._loop, self.stdout_proto, None, waiter) + self._init_futs.append(waiter) + self._stdout._open(r) + self._stdout._init_protocol() + elif _stdout == subprocess_DEVNULL: + io[1] = self._file_devnull() + elif _stdout == subprocess_STDOUT: + raise ValueError( + 'subprocess.STDOUT is supported only by stderr parameter') + else: + io[1] = self._file_redirect_stdio(_stdout) + else: + io[1] = self._file_redirect_stdio(1) + + if _stderr is not None: + if _stderr == subprocess_PIPE: + r, w = self._file_outpipe() + io[2] = w + + self.stderr_proto = ReadSubprocessPipeProto(self, 2) + waiter = self._loop._new_future() + self._stderr = ReadUnixTransport.new( + self._loop, self.stderr_proto, None, waiter) + self._init_futs.append(waiter) + self._stderr._open(r) + self._stderr._init_protocol() + elif _stderr == subprocess_STDOUT: + if io[1] is None: + # shouldn't ever happen + raise RuntimeError('cannot apply subprocess.STDOUT') + + io[2] = self._file_redirect_stdio(io[1]) + elif _stderr == subprocess_DEVNULL: + io[2] = self._file_devnull() + else: + io[2] = self._file_redirect_stdio(_stderr) + else: + io[2] = self._file_redirect_stdio(2) + + assert len(io) == 3 + for idx in range(3): + iocnt = &self.iocnt[idx] + if io[idx] is not None: + iocnt.flags = uv.UV_INHERIT_FD + iocnt.data.fd = io[idx] + else: + iocnt.flags = uv.UV_IGNORE + + cdef _call_connection_made(self, waiter): + try: + # we're always called in the right context, so just call the user's + self._protocol.connection_made(self) + except (KeyboardInterrupt, SystemExit): + raise + except BaseException as ex: + if waiter is not None and not waiter.cancelled(): + waiter.set_exception(ex) + else: + raise + else: + if waiter is not None and not waiter.cancelled(): + waiter.set_result(True) + + self._stdio_ready = 1 + if self._pending_calls: + pending_calls = self._pending_calls.copy() + self._pending_calls.clear() + for (type, fd, arg) in pending_calls: + if type == _CALL_PIPE_CONNECTION_LOST: + self._pipe_connection_lost(fd, arg) + elif type == _CALL_PIPE_DATA_RECEIVED: + self._pipe_data_received(fd, arg) + elif type == _CALL_PROCESS_EXITED: + self._loop.call_soon(self._protocol.process_exited) + elif type == _CALL_CONNECTION_LOST: + self._loop.call_soon(self._protocol.connection_lost, None) + + cdef _try_finish(self): + if self._returncode is None or self._finished: + return + + if ((self.stdin_proto is None or self.stdin_proto.disconnected) and + (self.stdout_proto is None or + self.stdout_proto.disconnected) and + (self.stderr_proto is None or + self.stderr_proto.disconnected)): + + self._finished = 1 + + if self._stdio_ready: + # copy self.context for simplicity + self._loop.call_soon(self._protocol.connection_lost, None, + context=self.context) + else: + self._pending_calls.append((_CALL_CONNECTION_LOST, None, None)) + + def __stdio_inited(self, waiter, stdio_fut): + exc = stdio_fut.exception() + if exc is not None: + if waiter is None: + raise exc + else: + waiter.set_exception(exc) + else: + self._loop._call_soon_handle( + new_MethodHandle1(self._loop, + "UVProcessTransport._call_connection_made", + <method1_t>self._call_connection_made, + None, # means to copy the current context + self, waiter)) + + @staticmethod + cdef UVProcessTransport new(Loop loop, protocol, args, env, + cwd, start_new_session, + _stdin, _stdout, _stderr, pass_fds, + waiter, + debug_flags, + preexec_fn, + restore_signals): + + cdef UVProcessTransport handle + handle = UVProcessTransport.__new__(UVProcessTransport) + handle._protocol = protocol + handle._init(loop, args, env, cwd, start_new_session, + __process_convert_fileno(_stdin), + __process_convert_fileno(_stdout), + __process_convert_fileno(_stderr), + pass_fds, + debug_flags, + preexec_fn, + restore_signals) + + if handle._init_futs: + handle._stdio_ready = 0 + init_fut = aio_gather(*handle._init_futs) + # add_done_callback will copy the current context and run the + # callback within the context + init_fut.add_done_callback( + ft_partial(handle.__stdio_inited, waiter)) + else: + handle._stdio_ready = 1 + loop._call_soon_handle( + new_MethodHandle1(loop, + "UVProcessTransport._call_connection_made", + <method1_t>handle._call_connection_made, + None, # means to copy the current context + handle, waiter)) + + return handle + + def get_protocol(self): + return self._protocol + + def set_protocol(self, protocol): + self._protocol = protocol + + def get_pid(self): + return self._pid + + def get_returncode(self): + return self._returncode + + def get_pipe_transport(self, fd): + if fd == 0: + return self._stdin + elif fd == 1: + return self._stdout + elif fd == 2: + return self._stderr + + def terminate(self): + self._check_proc() + self._kill(uv.SIGTERM) + + def kill(self): + self._check_proc() + self._kill(uv.SIGKILL) + + def send_signal(self, int signal): + self._check_proc() + self._kill(signal) + + def is_closing(self): + return self._closed + + def close(self): + if self._returncode is None: + self._kill(uv.SIGKILL) + + if self._stdin is not None: + self._stdin.close() + if self._stdout is not None: + self._stdout.close() + if self._stderr is not None: + self._stderr.close() + + if self._returncode is not None: + # The process is dead, just close the UV handle. + # + # (If "self._returncode is None", the process should have been + # killed already and we're just waiting for a SIGCHLD; after + # which the transport will be GC'ed and the uvhandle will be + # closed in UVHandle.__dealloc__.) + self._close() + + def get_extra_info(self, name, default=None): + return default + + def _wait(self): + fut = self._loop._new_future() + if self._returncode is not None: + fut.set_result(self._returncode) + return fut + + self._exit_waiters.append(fut) + return fut + + +class WriteSubprocessPipeProto(aio_BaseProtocol): + + def __init__(self, proc, fd): + if UVLOOP_DEBUG: + if type(proc) is not UVProcessTransport: + raise TypeError + if not isinstance(fd, int): + raise TypeError + self.proc = proc + self.fd = fd + self.pipe = None + self.disconnected = False + + def connection_made(self, transport): + self.pipe = transport + + def __repr__(self): + return ('<%s fd=%s pipe=%r>' + % (self.__class__.__name__, self.fd, self.pipe)) + + def connection_lost(self, exc): + self.disconnected = True + (<UVProcessTransport>self.proc)._pipe_connection_lost(self.fd, exc) + self.proc = None + + def pause_writing(self): + (<UVProcessTransport>self.proc)._protocol.pause_writing() + + def resume_writing(self): + (<UVProcessTransport>self.proc)._protocol.resume_writing() + + +class ReadSubprocessPipeProto(WriteSubprocessPipeProto, + aio_Protocol): + + def data_received(self, data): + (<UVProcessTransport>self.proc)._pipe_data_received(self.fd, data) + + +cdef __process_convert_fileno(object obj): + if obj is None or isinstance(obj, int): + return obj + + fileno = obj.fileno() + if not isinstance(fileno, int): + raise TypeError( + '{!r}.fileno() returned non-integer'.format(obj)) + return fileno + + +cdef void __uvprocess_on_exit_callback( + uv.uv_process_t *handle, + int64_t exit_status, + int term_signal, +) noexcept with gil: + + if __ensure_handle_data(<uv.uv_handle_t*>handle, + "UVProcess exit callback") == 0: + return + + cdef UVProcess proc = <UVProcess> handle.data + try: + proc._on_exit(exit_status, term_signal) + except BaseException as ex: + proc._error(ex, False) + + +cdef __socketpair(): + cdef: + int fds[2] + int err + + err = system.socketpair(uv.AF_UNIX, uv.SOCK_STREAM, 0, fds) + if err: + exc = convert_error(-err) + raise exc + + os_set_inheritable(fds[0], False) + os_set_inheritable(fds[1], False) + + return fds[0], fds[1] + + +cdef void __uv_close_process_handle_cb( + uv.uv_handle_t* handle +) noexcept with gil: + PyMem_RawFree(handle) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/stream.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/stream.pxd new file mode 100644 index 0000000..8ca8743 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/stream.pxd @@ -0,0 +1,50 @@ +cdef class UVStream(UVBaseTransport): + cdef: + uv.uv_shutdown_t _shutdown_req + bint __shutting_down + bint __reading + bint __read_error_close + + bint __buffered + object _protocol_get_buffer + object _protocol_buffer_updated + + bint _eof + list _buffer + size_t _buffer_size + + Py_buffer _read_pybuf + bint _read_pybuf_acquired + + # All "inline" methods are final + + cdef inline _init(self, Loop loop, object protocol, Server server, + object waiter, object context) + + + cdef inline _shutdown(self) + cdef inline _accept(self, UVStream server) + + cdef inline _close_on_read_error(self) + + cdef inline __reading_started(self) + cdef inline __reading_stopped(self) + + # The user API write() and writelines() firstly call _buffer_write() to + # buffer up user data chunks, potentially multiple times in writelines(), + # and then call _initiate_write() to start writing either immediately or in + # the next iteration (loop._queue_write()). + cdef inline _buffer_write(self, object data) + cdef inline _initiate_write(self) + + # _exec_write() is the method that does the actual send, and _try_write() + # is a fast-path used in _exec_write() to send a single chunk. + cdef inline _exec_write(self) + cdef inline _try_write(self, object data) + + cdef _close(self) + + cdef inline _on_accept(self) + cdef inline _on_eof(self) + cdef inline _on_write(self) + cdef inline _on_connect(self, object exc) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx new file mode 100644 index 0000000..d4e02e3 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx @@ -0,0 +1,1015 @@ +DEF __PREALLOCED_BUFS = 4 + + +@cython.no_gc_clear +@cython.freelist(DEFAULT_FREELIST_SIZE) +cdef class _StreamWriteContext: + # used to hold additional write request information for uv_write + + cdef: + uv.uv_write_t req + + list buffers + + uv.uv_buf_t uv_bufs_sml[__PREALLOCED_BUFS] + Py_buffer py_bufs_sml[__PREALLOCED_BUFS] + bint py_bufs_sml_inuse + + uv.uv_buf_t* uv_bufs + Py_buffer* py_bufs + size_t py_bufs_len + + uv.uv_buf_t* uv_bufs_start + size_t uv_bufs_len + + UVStream stream + + bint closed + + cdef free_bufs(self): + cdef size_t i + + if self.uv_bufs is not NULL: + PyMem_RawFree(self.uv_bufs) + self.uv_bufs = NULL + if UVLOOP_DEBUG: + if self.py_bufs_sml_inuse: + raise RuntimeError( + '_StreamWriteContext.close: uv_bufs != NULL and ' + 'py_bufs_sml_inuse is True') + + if self.py_bufs is not NULL: + for i from 0 <= i < self.py_bufs_len: + PyBuffer_Release(&self.py_bufs[i]) + PyMem_RawFree(self.py_bufs) + self.py_bufs = NULL + if UVLOOP_DEBUG: + if self.py_bufs_sml_inuse: + raise RuntimeError( + '_StreamWriteContext.close: py_bufs != NULL and ' + 'py_bufs_sml_inuse is True') + + if self.py_bufs_sml_inuse: + for i from 0 <= i < self.py_bufs_len: + PyBuffer_Release(&self.py_bufs_sml[i]) + self.py_bufs_sml_inuse = 0 + + self.py_bufs_len = 0 + self.buffers = None + + cdef close(self): + if self.closed: + return + self.closed = 1 + self.free_bufs() + Py_DECREF(self) + + cdef advance_uv_buf(self, size_t sent): + # Advance the pointer to first uv_buf and the + # pointer to first byte in that buffer. + # + # We do this after a "uv_try_write" call, which + # sometimes sends only a portion of data. + # We then call "advance_uv_buf" on the write + # context, and reuse it in a "uv_write" call. + + cdef: + uv.uv_buf_t* buf + size_t idx + + for idx from 0 <= idx < self.uv_bufs_len: + buf = &self.uv_bufs_start[idx] + if buf.len > sent: + buf.len -= sent + buf.base = buf.base + sent + self.uv_bufs_start = buf + self.uv_bufs_len -= idx + return + else: + sent -= self.uv_bufs_start[idx].len + + if UVLOOP_DEBUG: + if sent < 0: + raise RuntimeError('fatal: sent < 0 in advance_uv_buf') + + raise RuntimeError('fatal: Could not advance _StreamWriteContext') + + @staticmethod + cdef _StreamWriteContext new(UVStream stream, list buffers): + cdef: + _StreamWriteContext ctx + int uv_bufs_idx = 0 + size_t py_bufs_len = 0 + int i + + Py_buffer* p_pybufs + uv.uv_buf_t* p_uvbufs + + ctx = _StreamWriteContext.__new__(_StreamWriteContext) + ctx.stream = None + ctx.closed = 1 + ctx.py_bufs_len = 0 + ctx.py_bufs_sml_inuse = 0 + ctx.uv_bufs = NULL + ctx.py_bufs = NULL + ctx.buffers = buffers + ctx.stream = stream + + if len(buffers) <= __PREALLOCED_BUFS: + # We've got a small number of buffers to write, don't + # need to use malloc. + ctx.py_bufs_sml_inuse = 1 + p_pybufs = <Py_buffer*>&ctx.py_bufs_sml + p_uvbufs = <uv.uv_buf_t*>&ctx.uv_bufs_sml + + else: + for buf in buffers: + if UVLOOP_DEBUG: + if not isinstance(buf, (bytes, bytearray, memoryview)): + raise RuntimeError( + 'invalid data in writebuf: an instance of ' + 'bytes, bytearray or memoryview was expected, ' + 'got {}'.format(type(buf))) + + if not PyBytes_CheckExact(buf): + py_bufs_len += 1 + + if py_bufs_len > 0: + ctx.py_bufs = <Py_buffer*>PyMem_RawMalloc( + py_bufs_len * sizeof(Py_buffer)) + if ctx.py_bufs is NULL: + raise MemoryError() + + ctx.uv_bufs = <uv.uv_buf_t*>PyMem_RawMalloc( + len(buffers) * sizeof(uv.uv_buf_t)) + if ctx.uv_bufs is NULL: + raise MemoryError() + + p_pybufs = ctx.py_bufs + p_uvbufs = ctx.uv_bufs + + py_bufs_len = 0 + for buf in buffers: + if PyBytes_CheckExact(buf): + # We can only use this hack for bytes since it's + # immutable. For everything else it is only safe to + # use buffer protocol. + p_uvbufs[uv_bufs_idx].base = PyBytes_AS_STRING(buf) + p_uvbufs[uv_bufs_idx].len = Py_SIZE(buf) + + else: + try: + PyObject_GetBuffer( + buf, &p_pybufs[py_bufs_len], PyBUF_SIMPLE) + except Exception: + # This shouldn't ever happen, as `UVStream._buffer_write` + # casts non-bytes objects to `memoryviews`. + ctx.py_bufs_len = py_bufs_len + ctx.free_bufs() + raise + + p_uvbufs[uv_bufs_idx].base = <char*>p_pybufs[py_bufs_len].buf + p_uvbufs[uv_bufs_idx].len = p_pybufs[py_bufs_len].len + + py_bufs_len += 1 + + uv_bufs_idx += 1 + + ctx.uv_bufs_start = p_uvbufs + ctx.uv_bufs_len = uv_bufs_idx + + ctx.py_bufs_len = py_bufs_len + ctx.req.data = <void*> ctx + + if UVLOOP_DEBUG: + stream._loop._debug_stream_write_ctx_total += 1 + stream._loop._debug_stream_write_ctx_cnt += 1 + + # Do incref after everything else is done. + # Under no circumstances we want `ctx` to be GCed while + # libuv is still working with `ctx.uv_bufs`. + Py_INCREF(ctx) + ctx.closed = 0 + return ctx + + def __dealloc__(self): + if not self.closed: + # Because we do an INCREF in _StreamWriteContext.new, + # __dealloc__ shouldn't ever happen with `self.closed == 1` + raise RuntimeError( + 'open _StreamWriteContext is being deallocated') + + if UVLOOP_DEBUG: + if self.stream is not None: + self.stream._loop._debug_stream_write_ctx_cnt -= 1 + self.stream = None + + +@cython.no_gc_clear +cdef class UVStream(UVBaseTransport): + + def __cinit__(self): + self.__shutting_down = 0 + self.__reading = 0 + self.__read_error_close = 0 + self.__buffered = 0 + self._eof = 0 + self._buffer = [] + self._buffer_size = 0 + + self._protocol_get_buffer = None + self._protocol_buffer_updated = None + + self._read_pybuf_acquired = False + + cdef _set_protocol(self, object protocol): + if protocol is None: + raise TypeError('protocol is required') + + UVBaseTransport._set_protocol(self, protocol) + + if (hasattr(protocol, 'get_buffer') and + not isinstance(protocol, aio_Protocol)): + try: + self._protocol_get_buffer = protocol.get_buffer + self._protocol_buffer_updated = protocol.buffer_updated + self.__buffered = 1 + except AttributeError: + pass + else: + self.__buffered = 0 + + cdef _clear_protocol(self): + UVBaseTransport._clear_protocol(self) + self._protocol_get_buffer = None + self._protocol_buffer_updated = None + self.__buffered = 0 + + cdef inline _shutdown(self): + cdef int err + + if self.__shutting_down: + return + self.__shutting_down = 1 + + self._ensure_alive() + + self._shutdown_req.data = <void*> self + err = uv.uv_shutdown(&self._shutdown_req, + <uv.uv_stream_t*> self._handle, + __uv_stream_on_shutdown) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + + cdef inline _accept(self, UVStream server): + cdef int err + self._ensure_alive() + + err = uv.uv_accept(<uv.uv_stream_t*>server._handle, + <uv.uv_stream_t*>self._handle) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + + self._on_accept() + + cdef inline _close_on_read_error(self): + self.__read_error_close = 1 + + cdef bint _is_reading(self): + return self.__reading + + cdef _start_reading(self): + cdef int err + + if self._closing: + return + + self._ensure_alive() + + if self.__reading: + return + + if self.__buffered: + err = uv.uv_read_start(<uv.uv_stream_t*>self._handle, + __uv_stream_buffered_alloc, + __uv_stream_buffered_on_read) + else: + err = uv.uv_read_start(<uv.uv_stream_t*>self._handle, + __loop_alloc_buffer, + __uv_stream_on_read) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + else: + # UVStream must live until the read callback is called + self.__reading_started() + + cdef inline __reading_started(self): + if self.__reading: + return + self.__reading = 1 + Py_INCREF(self) + + cdef inline __reading_stopped(self): + if not self.__reading: + return + self.__reading = 0 + Py_DECREF(self) + + cdef _stop_reading(self): + cdef int err + + if not self.__reading: + return + + self._ensure_alive() + + # From libuv docs: + # This function is idempotent and may be safely + # called on a stopped stream. + err = uv.uv_read_stop(<uv.uv_stream_t*>self._handle) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + else: + self.__reading_stopped() + + cdef inline _try_write(self, object data): + cdef: + ssize_t written + bint used_buf = 0 + Py_buffer py_buf + void* buf + size_t blen + int saved_errno + int fd + + if (<uv.uv_stream_t*>self._handle).write_queue_size != 0: + raise RuntimeError( + 'UVStream._try_write called with data in uv buffers') + + if PyBytes_CheckExact(data): + # We can only use this hack for bytes since it's + # immutable. For everything else it is only safe to + # use buffer protocol. + buf = <void*>PyBytes_AS_STRING(data) + blen = Py_SIZE(data) + else: + PyObject_GetBuffer(data, &py_buf, PyBUF_SIMPLE) + used_buf = 1 + buf = py_buf.buf + blen = py_buf.len + + if blen == 0: + # Empty data, do nothing. + return 0 + + fd = self._fileno() + # Use `unistd.h/write` directly, it's faster than + # uv_try_write -- less layers of code. The error + # checking logic is copied from libuv. + written = system.write(fd, buf, blen) + while written == -1 and ( + errno.errno == errno.EINTR or + (system.PLATFORM_IS_APPLE and + errno.errno == errno.EPROTOTYPE)): + # From libuv code (unix/stream.c): + # Due to a possible kernel bug at least in OS X 10.10 "Yosemite", + # EPROTOTYPE can be returned while trying to write to a socket + # that is shutting down. If we retry the write, we should get + # the expected EPIPE instead. + written = system.write(fd, buf, blen) + saved_errno = errno.errno + + if used_buf: + PyBuffer_Release(&py_buf) + + if written < 0: + if saved_errno == errno.EAGAIN or \ + saved_errno == system.EWOULDBLOCK: + return -1 + else: + exc = convert_error(-saved_errno) + self._fatal_error(exc, True) + return + + if UVLOOP_DEBUG: + self._loop._debug_stream_write_tries += 1 + + if <size_t>written == blen: + return 0 + + return written + + cdef inline _buffer_write(self, object data): + cdef int dlen + + if not PyBytes_CheckExact(data): + data = memoryview(data).cast('b') + + dlen = len(data) + if not dlen: + return + + self._buffer_size += dlen + self._buffer.append(data) + + cdef inline _initiate_write(self): + if (not self._protocol_paused and + (<uv.uv_stream_t*>self._handle).write_queue_size == 0 and + self._buffer_size > self._high_water): + # Fast-path. If: + # - the protocol isn't yet paused, + # - there is no data in libuv buffers for this stream, + # - the protocol will be paused if we continue to buffer data + # + # Then: + # - Try to write all buffered data right now. + all_sent = self._exec_write() + if UVLOOP_DEBUG: + if self._buffer_size != 0 or self._buffer != []: + raise RuntimeError( + '_buffer_size is not 0 after a successful _exec_write') + + # There is no need to call `_queue_write` anymore, + # as `uv_write` should be called already. + + if not all_sent: + # If not all of the data was sent successfully, + # we might need to pause the protocol. + self._maybe_pause_protocol() + + elif self._buffer_size > 0: + self._maybe_pause_protocol() + self._loop._queue_write(self) + + cdef inline _exec_write(self): + cdef: + int err + int buf_len + _StreamWriteContext ctx = None + + if self._closed: + # If the handle is closed, just return, it's too + # late to do anything. + return + + buf_len = len(self._buffer) + if not buf_len: + return + + if (<uv.uv_stream_t*>self._handle).write_queue_size == 0: + # libuv internal write buffers for this stream are empty. + if buf_len == 1: + # If we only have one piece of data to send, let's + # use our fast implementation of try_write. + data = self._buffer[0] + sent = self._try_write(data) + + if sent is None: + # A `self._fatal_error` was called. + # It might not raise an exception under some + # conditions. + self._buffer_size = 0 + self._buffer.clear() + if not self._closing: + # This should never happen. + raise RuntimeError( + 'stream is open after UVStream._try_write ' + 'returned None') + return + + if sent == 0: + # All data was successfully written. + self._buffer_size = 0 + self._buffer.clear() + # on_write will call "maybe_resume_protocol". + self._on_write() + return True + + if sent > 0: + if UVLOOP_DEBUG: + if sent == len(data): + raise RuntimeError( + '_try_write sent all data and returned ' + 'non-zero') + + if PyBytes_CheckExact(data): + # Cast bytes to memoryview to avoid copying + # data that wasn't sent. + data = memoryview(data) + data = data[sent:] + + self._buffer_size -= sent + self._buffer[0] = data + + # At this point it's either data was sent partially, + # or an EAGAIN has happened. + + else: + ctx = _StreamWriteContext.new(self, self._buffer) + + err = uv.uv_try_write(<uv.uv_stream_t*>self._handle, + ctx.uv_bufs_start, + ctx.uv_bufs_len) + + if err > 0: + # Some data was successfully sent. + + if <size_t>err == self._buffer_size: + # Everything was sent. + ctx.close() + self._buffer.clear() + self._buffer_size = 0 + # on_write will call "maybe_resume_protocol". + self._on_write() + return True + + try: + # Advance pointers to uv_bufs in `ctx`, + # we will reuse it soon for a uv_write + # call. + ctx.advance_uv_buf(<ssize_t>err) + except Exception as ex: # This should never happen. + # Let's try to close the `ctx` anyways. + ctx.close() + self._fatal_error(ex, True) + self._buffer.clear() + self._buffer_size = 0 + return + + elif err != uv.UV_EAGAIN: + ctx.close() + exc = convert_error(err) + self._fatal_error(exc, True) + self._buffer.clear() + self._buffer_size = 0 + return + + # fall through + + if ctx is None: + ctx = _StreamWriteContext.new(self, self._buffer) + + err = uv.uv_write(&ctx.req, + <uv.uv_stream_t*>self._handle, + ctx.uv_bufs_start, + ctx.uv_bufs_len, + __uv_stream_on_write) + + self._buffer_size = 0 + # Can't use `_buffer.clear()` here: `ctx` holds a reference to + # the `_buffer`. + self._buffer = [] + + if err < 0: + # close write context + ctx.close() + + exc = convert_error(err) + self._fatal_error(exc, True) + return + + self._maybe_resume_protocol() + + cdef size_t _get_write_buffer_size(self): + if self._handle is NULL: + return 0 + return ((<uv.uv_stream_t*>self._handle).write_queue_size + + self._buffer_size) + + cdef _close(self): + try: + if self._read_pybuf_acquired: + # Should never happen. libuv always calls uv_alloc/uv_read + # in pairs. + self._loop.call_exception_handler({ + 'transport': self, + 'message': 'XXX: an allocated buffer in transport._close()' + }) + self._read_pybuf_acquired = 0 + PyBuffer_Release(&self._read_pybuf) + + self._stop_reading() + finally: + UVSocketHandle._close(<UVHandle>self) + + cdef inline _on_accept(self): + # Ultimately called by __uv_stream_on_listen. + self._init_protocol() + + cdef inline _on_eof(self): + # Any exception raised here will be caught in + # __uv_stream_on_read. + + try: + meth = self._protocol.eof_received + except AttributeError: + keep_open = False + else: + keep_open = run_in_context(self.context, meth) + + if keep_open: + # We're keeping the connection open so the + # protocol can write more, but we still can't + # receive more, so remove the reader callback. + self._stop_reading() + else: + self.close() + + cdef inline _on_write(self): + self._maybe_resume_protocol() + if not self._get_write_buffer_size(): + if self._closing: + self._schedule_call_connection_lost(None) + elif self._eof: + self._shutdown() + + cdef inline _init(self, Loop loop, object protocol, Server server, + object waiter, object context): + self.context = context + self._set_protocol(protocol) + self._start_init(loop) + + if server is not None: + self._set_server(server) + + if waiter is not None: + self._set_waiter(waiter) + + cdef inline _on_connect(self, object exc): + # Called from __tcp_connect_callback (tcp.pyx) and + # __pipe_connect_callback (pipe.pyx). + if exc is None: + self._init_protocol() + else: + if self._waiter is None: + self._fatal_error(exc, False, "connect failed") + elif self._waiter.cancelled(): + # Connect call was cancelled; just close the transport + # silently. + self._close() + elif self._waiter.done(): + self._fatal_error(exc, False, "connect failed") + else: + self._waiter.set_exception(exc) + self._close() + + # === Public API === + + def __repr__(self): + return '<{} closed={} reading={} {:#x}>'.format( + self.__class__.__name__, + self._closed, + self.__reading, + id(self)) + + def write(self, object buf): + self._ensure_alive() + + if self._eof: + raise RuntimeError('Cannot call write() after write_eof()') + if not buf: + return + if self._conn_lost: + self._conn_lost += 1 + return + self._buffer_write(buf) + self._initiate_write() + + def writelines(self, bufs): + self._ensure_alive() + + if self._eof: + raise RuntimeError('Cannot call writelines() after write_eof()') + if self._conn_lost: + self._conn_lost += 1 + return + for buf in bufs: + self._buffer_write(buf) + self._initiate_write() + + def write_eof(self): + self._ensure_alive() + + if self._eof: + return + + self._eof = 1 + if not self._get_write_buffer_size(): + self._shutdown() + + def can_write_eof(self): + return True + + def is_reading(self): + return self._is_reading() + + def pause_reading(self): + if self._closing or not self._is_reading(): + return + self._stop_reading() + + def resume_reading(self): + if self._is_reading() or self._closing: + return + self._start_reading() + + +cdef void __uv_stream_on_shutdown(uv.uv_shutdown_t* req, + int status) noexcept with gil: + + # callback for uv_shutdown + + if req.data is NULL: + aio_logger.error( + 'UVStream.shutdown callback called with NULL req.data, status=%r', + status) + return + + cdef UVStream stream = <UVStream> req.data + + if status < 0 and status != uv.UV_ECANCELED: + # From libuv source code: + # The ECANCELED error code is a lie, the shutdown(2) syscall is a + # fait accompli at this point. Maybe we should revisit this in + # v0.11. A possible reason for leaving it unchanged is that it + # informs the callee that the handle has been destroyed. + + if UVLOOP_DEBUG: + stream._loop._debug_stream_shutdown_errors_total += 1 + + exc = convert_error(status) + stream._fatal_error( + exc, False, "error status in uv_stream_t.shutdown callback") + return + + +cdef inline bint __uv_stream_on_read_common( + UVStream sc, + Loop loop, + ssize_t nread, +): + if sc._closed: + # The stream was closed, there is no reason to + # do any work now. + sc.__reading_stopped() # Just in case. + return True + + if nread == uv.UV_EOF: + # From libuv docs: + # The callee is responsible for stopping closing the stream + # when an error happens by calling uv_read_stop() or uv_close(). + # Trying to read from the stream again is undefined. + try: + if UVLOOP_DEBUG: + loop._debug_stream_read_eof_total += 1 + + sc._stop_reading() + sc._on_eof() + except BaseException as ex: + if UVLOOP_DEBUG: + loop._debug_stream_read_eof_cb_errors_total += 1 + + sc._fatal_error(ex, False) + finally: + return True + + if nread == 0: + # From libuv docs: + # nread might be 0, which does not indicate an error or EOF. + # This is equivalent to EAGAIN or EWOULDBLOCK under read(2). + return True + + if nread < 0: + # From libuv docs: + # The callee is responsible for stopping closing the stream + # when an error happens by calling uv_read_stop() or uv_close(). + # Trying to read from the stream again is undefined. + # + # Therefore, we're closing the stream. Since "UVHandle._close()" + # doesn't raise exceptions unless uvloop is built with DEBUG=1, + # we don't need try...finally here. + + if UVLOOP_DEBUG: + loop._debug_stream_read_errors_total += 1 + + if sc.__read_error_close: + # Used for getting notified when a pipe is closed. + # See WriteUnixTransport for the explanation. + sc._on_eof() + return True + + exc = convert_error(nread) + sc._fatal_error( + exc, False, "error status in uv_stream_t.read callback") + return True + + return False + + +cdef inline void __uv_stream_on_read_impl( + uv.uv_stream_t* stream, + ssize_t nread, + const uv.uv_buf_t* buf, +): + cdef: + UVStream sc = <UVStream>stream.data + Loop loop = sc._loop + + # It's OK to free the buffer early, since nothing will + # be able to touch it until this method is done. + __loop_free_buffer(loop) + + if __uv_stream_on_read_common(sc, loop, nread): + return + + try: + if UVLOOP_DEBUG: + loop._debug_stream_read_cb_total += 1 + + run_in_context1( + sc.context, + sc._protocol_data_received, + loop._recv_buffer[:nread], + ) + except BaseException as exc: + if UVLOOP_DEBUG: + loop._debug_stream_read_cb_errors_total += 1 + + sc._fatal_error(exc, False) + + +cdef inline void __uv_stream_on_write_impl( + uv.uv_write_t* req, + int status, +): + cdef: + _StreamWriteContext ctx = <_StreamWriteContext> req.data + UVStream stream = <UVStream>ctx.stream + + ctx.close() + + if stream._closed: + # The stream was closed, there is nothing to do. + # Even if there is an error, like EPIPE, there + # is no reason to report it. + return + + if status < 0: + if UVLOOP_DEBUG: + stream._loop._debug_stream_write_errors_total += 1 + + exc = convert_error(status) + stream._fatal_error( + exc, False, "error status in uv_stream_t.write callback") + return + + try: + stream._on_write() + except BaseException as exc: + if UVLOOP_DEBUG: + stream._loop._debug_stream_write_cb_errors_total += 1 + + stream._fatal_error(exc, False) + + +cdef void __uv_stream_on_read( + uv.uv_stream_t* stream, + ssize_t nread, + const uv.uv_buf_t* buf, +) noexcept with gil: + + if __ensure_handle_data(<uv.uv_handle_t*>stream, + "UVStream read callback") == 0: + return + + # Don't need try-finally, __uv_stream_on_read_impl is void + __uv_stream_on_read_impl(stream, nread, buf) + + +cdef void __uv_stream_on_write( + uv.uv_write_t* req, + int status, +) noexcept with gil: + + if UVLOOP_DEBUG: + if req.data is NULL: + aio_logger.error( + 'UVStream.write callback called with NULL req.data, status=%r', + status) + return + + # Don't need try-finally, __uv_stream_on_write_impl is void + __uv_stream_on_write_impl(req, status) + + +cdef void __uv_stream_buffered_alloc( + uv.uv_handle_t* stream, + size_t suggested_size, + uv.uv_buf_t* uvbuf, +) noexcept with gil: + + if __ensure_handle_data(<uv.uv_handle_t*>stream, + "UVStream alloc buffer callback") == 0: + return + + cdef: + UVStream sc = <UVStream>stream.data + Loop loop = sc._loop + Py_buffer* pybuf = &sc._read_pybuf + int got_buf = 0 + + if sc._read_pybuf_acquired: + uvbuf.len = 0 + uvbuf.base = NULL + return + + sc._read_pybuf_acquired = 0 + try: + buf = run_in_context1( + sc.context, + sc._protocol_get_buffer, + suggested_size, + ) + PyObject_GetBuffer(buf, pybuf, PyBUF_WRITABLE) + got_buf = 1 + except BaseException as exc: + # Can't call 'sc._fatal_error' or 'sc._close', libuv will SF. + # We'll do it later in __uv_stream_buffered_on_read when we + # receive UV_ENOBUFS. + uvbuf.len = 0 + uvbuf.base = NULL + return + + if not pybuf.len: + uvbuf.len = 0 + uvbuf.base = NULL + if got_buf: + PyBuffer_Release(pybuf) + return + + sc._read_pybuf_acquired = 1 + uvbuf.base = <char*>pybuf.buf + uvbuf.len = pybuf.len + + +cdef void __uv_stream_buffered_on_read( + uv.uv_stream_t* stream, + ssize_t nread, + const uv.uv_buf_t* buf, +) noexcept with gil: + + if __ensure_handle_data(<uv.uv_handle_t*>stream, + "UVStream buffered read callback") == 0: + return + + cdef: + UVStream sc = <UVStream>stream.data + Loop loop = sc._loop + Py_buffer* pybuf = &sc._read_pybuf + + if nread == uv.UV_ENOBUFS: + sc._fatal_error( + RuntimeError( + 'unhandled error (or an empty buffer) in get_buffer()'), + False) + return + + try: + if nread > 0 and not sc._read_pybuf_acquired: + # From libuv docs: + # nread is > 0 if there is data available or < 0 on error. When + # we’ve reached EOF, nread will be set to UV_EOF. When + # nread < 0, the buf parameter might not point to a valid + # buffer; in that case buf.len and buf.base are both set to 0. + raise RuntimeError( + f'no python buffer is allocated in on_read; nread={nread}') + + if nread == 0: + # From libuv docs: + # nread might be 0, which does not indicate an error or EOF. + # This is equivalent to EAGAIN or EWOULDBLOCK under read(2). + return + + if __uv_stream_on_read_common(sc, loop, nread): + return + + if UVLOOP_DEBUG: + loop._debug_stream_read_cb_total += 1 + + run_in_context1(sc.context, sc._protocol_buffer_updated, nread) + except BaseException as exc: + if UVLOOP_DEBUG: + loop._debug_stream_read_cb_errors_total += 1 + + sc._fatal_error(exc, False) + finally: + sc._read_pybuf_acquired = 0 + PyBuffer_Release(pybuf) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/streamserver.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/streamserver.pxd new file mode 100644 index 0000000..a004efd --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/streamserver.pxd @@ -0,0 +1,26 @@ +cdef class UVStreamServer(UVSocketHandle): + cdef: + int backlog + object ssl + object ssl_handshake_timeout + object ssl_shutdown_timeout + object protocol_factory + bint opened + Server _server + + # All "inline" methods are final + + cdef inline _init(self, Loop loop, object protocol_factory, + Server server, + object backlog, + object ssl, + object ssl_handshake_timeout, + object ssl_shutdown_timeout) + + cdef inline _mark_as_open(self) + + cdef inline listen(self) + cdef inline _on_listen(self) + + cdef UVStream _make_new_transport(self, object protocol, object waiter, + object context) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/streamserver.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/streamserver.pyx new file mode 100644 index 0000000..9993317 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/streamserver.pyx @@ -0,0 +1,150 @@ +@cython.no_gc_clear +cdef class UVStreamServer(UVSocketHandle): + + def __cinit__(self): + self.opened = 0 + self._server = None + self.ssl = None + self.ssl_handshake_timeout = None + self.ssl_shutdown_timeout = None + self.protocol_factory = None + + cdef inline _init(self, Loop loop, object protocol_factory, + Server server, + object backlog, + object ssl, + object ssl_handshake_timeout, + object ssl_shutdown_timeout): + + if not isinstance(backlog, int): + # Don't allow floats + raise TypeError('integer argument expected, got {}'.format( + type(backlog).__name__)) + + if ssl is not None: + if not isinstance(ssl, ssl_SSLContext): + raise TypeError( + 'ssl is expected to be None or an instance of ' + 'ssl.SSLContext, got {!r}'.format(ssl)) + else: + if ssl_handshake_timeout is not None: + raise ValueError( + 'ssl_handshake_timeout is only meaningful with ssl') + if ssl_shutdown_timeout is not None: + raise ValueError( + 'ssl_shutdown_timeout is only meaningful with ssl') + + self.backlog = backlog + self.ssl = ssl + self.ssl_handshake_timeout = ssl_handshake_timeout + self.ssl_shutdown_timeout = ssl_shutdown_timeout + + self._start_init(loop) + self.protocol_factory = protocol_factory + self._server = server + + cdef inline listen(self): + cdef int err + self._ensure_alive() + + if self.protocol_factory is None: + raise RuntimeError('unable to listen(); no protocol_factory') + + if self.opened != 1: + raise RuntimeError('unopened TCPServer') + + self.context = Context_CopyCurrent() + + err = uv.uv_listen(<uv.uv_stream_t*> self._handle, + self.backlog, + __uv_streamserver_on_listen) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + + cdef inline _on_listen(self): + cdef UVStream client + + protocol = run_in_context(self.context, self.protocol_factory) + + if self.ssl is None: + client = self._make_new_transport(protocol, None, self.context) + + else: + waiter = self._loop._new_future() + + ssl_protocol = SSLProtocol( + self._loop, protocol, self.ssl, + waiter, + server_side=True, + server_hostname=None, + ssl_handshake_timeout=self.ssl_handshake_timeout, + ssl_shutdown_timeout=self.ssl_shutdown_timeout) + + client = self._make_new_transport(ssl_protocol, None, self.context) + + waiter.add_done_callback( + ft_partial(self.__on_ssl_connected, client)) + + client._accept(<UVStream>self) + + cdef _fatal_error(self, exc, throw, reason=None): + # Overload UVHandle._fatal_error + + self._close() + + if not isinstance(exc, OSError): + + if throw or self._loop is None: + raise exc + + msg = f'Fatal error on server {self.__class__.__name__}' + if reason is not None: + msg = f'{msg} ({reason})' + + self._loop.call_exception_handler({ + 'message': msg, + 'exception': exc, + }) + + cdef inline _mark_as_open(self): + self.opened = 1 + + cdef UVStream _make_new_transport(self, object protocol, object waiter, + object context): + raise NotImplementedError + + def __on_ssl_connected(self, transport, fut): + exc = fut.exception() + if exc is not None: + transport._force_close(exc) + + +cdef void __uv_streamserver_on_listen( + uv.uv_stream_t* handle, + int status, +) noexcept with gil: + + # callback for uv_listen + + if __ensure_handle_data(<uv.uv_handle_t*>handle, + "UVStream listen callback") == 0: + return + + cdef: + UVStreamServer stream = <UVStreamServer> handle.data + + if status < 0: + if UVLOOP_DEBUG: + stream._loop._debug_stream_listen_errors_total += 1 + + exc = convert_error(status) + stream._fatal_error( + exc, False, "error status in uv_stream_t.listen callback") + return + + try: + stream._on_listen() + except BaseException as exc: + stream._error(exc, False) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/tcp.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/tcp.pxd new file mode 100644 index 0000000..8d388ef --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/tcp.pxd @@ -0,0 +1,26 @@ +cdef class TCPServer(UVStreamServer): + cdef bind(self, system.sockaddr* addr, unsigned int flags=*) + + @staticmethod + cdef TCPServer new(Loop loop, object protocol_factory, Server server, + unsigned int flags, + object backlog, + object ssl, + object ssl_handshake_timeout, + object ssl_shutdown_timeout) + + +cdef class TCPTransport(UVStream): + cdef: + bint __peername_set + bint __sockname_set + system.sockaddr_storage __peername + system.sockaddr_storage __sockname + + cdef bind(self, system.sockaddr* addr, unsigned int flags=*) + cdef connect(self, system.sockaddr* addr) + cdef _set_nodelay(self) + + @staticmethod + cdef TCPTransport new(Loop loop, object protocol, Server server, + object waiter, object context) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/tcp.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/tcp.pyx new file mode 100644 index 0000000..d5fe827 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/tcp.pyx @@ -0,0 +1,228 @@ +cdef __tcp_init_uv_handle(UVStream handle, Loop loop, unsigned int flags): + cdef int err + + handle._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_tcp_t)) + if handle._handle is NULL: + handle._abort_init() + raise MemoryError() + + err = uv.uv_tcp_init_ex(handle._loop.uvloop, + <uv.uv_tcp_t*>handle._handle, + flags) + if err < 0: + handle._abort_init() + raise convert_error(err) + + handle._finish_init() + + +cdef __tcp_bind(UVStream handle, system.sockaddr* addr, unsigned int flags): + cdef int err + err = uv.uv_tcp_bind(<uv.uv_tcp_t *>handle._handle, + addr, flags) + if err < 0: + exc = convert_error(err) + raise exc + + +cdef __tcp_open(UVStream handle, int sockfd): + cdef int err + err = uv.uv_tcp_open(<uv.uv_tcp_t *>handle._handle, + <uv.uv_os_sock_t>sockfd) + if err < 0: + exc = convert_error(err) + raise exc + + +cdef __tcp_get_socket(UVSocketHandle handle): + cdef: + int buf_len = sizeof(system.sockaddr_storage) + int fileno + int err + system.sockaddr_storage buf + + fileno = handle._fileno() + + err = uv.uv_tcp_getsockname(<uv.uv_tcp_t*>handle._handle, + <system.sockaddr*>&buf, + &buf_len) + if err < 0: + raise convert_error(err) + + return PseudoSocket(buf.ss_family, uv.SOCK_STREAM, 0, fileno) + + +@cython.no_gc_clear +cdef class TCPServer(UVStreamServer): + + @staticmethod + cdef TCPServer new(Loop loop, object protocol_factory, Server server, + unsigned int flags, + object backlog, + object ssl, + object ssl_handshake_timeout, + object ssl_shutdown_timeout): + + cdef TCPServer handle + handle = TCPServer.__new__(TCPServer) + handle._init(loop, protocol_factory, server, backlog, + ssl, ssl_handshake_timeout, ssl_shutdown_timeout) + __tcp_init_uv_handle(<UVStream>handle, loop, flags) + return handle + + cdef _new_socket(self): + return __tcp_get_socket(<UVSocketHandle>self) + + cdef _open(self, int sockfd): + self._ensure_alive() + try: + __tcp_open(<UVStream>self, sockfd) + except Exception as exc: + self._fatal_error(exc, True) + else: + self._mark_as_open() + + cdef bind(self, system.sockaddr* addr, unsigned int flags=0): + self._ensure_alive() + try: + __tcp_bind(<UVStream>self, addr, flags) + except Exception as exc: + self._fatal_error(exc, True) + else: + self._mark_as_open() + + cdef UVStream _make_new_transport(self, object protocol, object waiter, + object context): + cdef TCPTransport tr + tr = TCPTransport.new(self._loop, protocol, self._server, waiter, + context) + return <UVStream>tr + + +@cython.no_gc_clear +cdef class TCPTransport(UVStream): + + @staticmethod + cdef TCPTransport new(Loop loop, object protocol, Server server, + object waiter, object context): + + cdef TCPTransport handle + handle = TCPTransport.__new__(TCPTransport) + handle._init(loop, protocol, server, waiter, context) + __tcp_init_uv_handle(<UVStream>handle, loop, uv.AF_UNSPEC) + handle.__peername_set = 0 + handle.__sockname_set = 0 + handle._set_nodelay() + return handle + + cdef _set_nodelay(self): + cdef int err + self._ensure_alive() + err = uv.uv_tcp_nodelay(<uv.uv_tcp_t*>self._handle, 1) + if err < 0: + raise convert_error(err) + + cdef _call_connection_made(self): + # asyncio saves peername & sockname when transports are instantiated, + # so that they're accessible even after the transport is closed. + # We are doing the same thing here, except that we create Python + # objects lazily, on request in get_extra_info() + + cdef: + int err + int buf_len + + buf_len = sizeof(system.sockaddr_storage) + err = uv.uv_tcp_getsockname(<uv.uv_tcp_t*>self._handle, + <system.sockaddr*>&self.__sockname, + &buf_len) + if err >= 0: + # Ignore errors, this is an optional thing. + # If something serious is going on, the transport + # will crash later (in roughly the same way how + # an asyncio transport would.) + self.__sockname_set = 1 + + buf_len = sizeof(system.sockaddr_storage) + err = uv.uv_tcp_getpeername(<uv.uv_tcp_t*>self._handle, + <system.sockaddr*>&self.__peername, + &buf_len) + if err >= 0: + # Same as few lines above -- we don't really care + # about error case here. + self.__peername_set = 1 + + UVBaseTransport._call_connection_made(self) + + def get_extra_info(self, name, default=None): + if name == 'sockname': + if self.__sockname_set: + return __convert_sockaddr_to_pyaddr( + <system.sockaddr*>&self.__sockname) + elif name == 'peername': + if self.__peername_set: + return __convert_sockaddr_to_pyaddr( + <system.sockaddr*>&self.__peername) + return super().get_extra_info(name, default) + + cdef _new_socket(self): + return __tcp_get_socket(<UVSocketHandle>self) + + cdef bind(self, system.sockaddr* addr, unsigned int flags=0): + self._ensure_alive() + __tcp_bind(<UVStream>self, addr, flags) + + cdef _open(self, int sockfd): + self._ensure_alive() + __tcp_open(<UVStream>self, sockfd) + + cdef connect(self, system.sockaddr* addr): + cdef _TCPConnectRequest req + req = _TCPConnectRequest(self._loop, self) + req.connect(addr) + + +cdef class _TCPConnectRequest(UVRequest): + cdef: + TCPTransport transport + uv.uv_connect_t _req_data + + def __cinit__(self, loop, transport): + self.request = <uv.uv_req_t*>&self._req_data + self.request.data = <void*>self + self.transport = transport + + cdef connect(self, system.sockaddr* addr): + cdef int err + err = uv.uv_tcp_connect(<uv.uv_connect_t*>self.request, + <uv.uv_tcp_t*>self.transport._handle, + addr, + __tcp_connect_callback) + if err < 0: + exc = convert_error(err) + self.on_done() + raise exc + + +cdef void __tcp_connect_callback( + uv.uv_connect_t* req, + int status, +) noexcept with gil: + cdef: + _TCPConnectRequest wrapper + TCPTransport transport + + wrapper = <_TCPConnectRequest> req.data + transport = wrapper.transport + + if status < 0: + exc = convert_error(status) + else: + exc = None + + try: + transport._on_connect(exc) + except BaseException as ex: + wrapper.transport._fatal_error(ex, False) + finally: + wrapper.on_done() diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/timer.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/timer.pxd new file mode 100644 index 0000000..fda23b6 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/timer.pxd @@ -0,0 +1,18 @@ +cdef class UVTimer(UVHandle): + cdef: + method_t callback + object ctx + bint running + uint64_t timeout + uint64_t start_t + + cdef _init(self, Loop loop, method_t callback, object ctx, + uint64_t timeout) + + cdef stop(self) + cdef start(self) + cdef get_when(self) + + @staticmethod + cdef UVTimer new(Loop loop, method_t callback, object ctx, + uint64_t timeout) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/timer.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/timer.pyx new file mode 100644 index 0000000..86d46ef --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/timer.pyx @@ -0,0 +1,89 @@ +@cython.no_gc_clear +cdef class UVTimer(UVHandle): + cdef _init(self, Loop loop, method_t callback, object ctx, + uint64_t timeout): + + cdef int err + + self._start_init(loop) + + self._handle = <uv.uv_handle_t*> PyMem_RawMalloc(sizeof(uv.uv_timer_t)) + if self._handle is NULL: + self._abort_init() + raise MemoryError() + + err = uv.uv_timer_init(self._loop.uvloop, <uv.uv_timer_t*>self._handle) + if err < 0: + self._abort_init() + raise convert_error(err) + + self._finish_init() + + self.callback = callback + self.ctx = ctx + self.running = 0 + self.timeout = timeout + self.start_t = 0 + + cdef stop(self): + cdef int err + + if not self._is_alive(): + self.running = 0 + return + + if self.running == 1: + err = uv.uv_timer_stop(<uv.uv_timer_t*>self._handle) + self.running = 0 + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + + cdef start(self): + cdef int err + + self._ensure_alive() + + if self.running == 0: + # Update libuv internal time. + uv.uv_update_time(self._loop.uvloop) # void + self.start_t = uv.uv_now(self._loop.uvloop) + + err = uv.uv_timer_start(<uv.uv_timer_t*>self._handle, + __uvtimer_callback, + self.timeout, 0) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + self.running = 1 + + cdef get_when(self): + return self.start_t + self.timeout + + @staticmethod + cdef UVTimer new(Loop loop, method_t callback, object ctx, + uint64_t timeout): + + cdef UVTimer handle + handle = UVTimer.__new__(UVTimer) + handle._init(loop, callback, ctx, timeout) + return handle + + +cdef void __uvtimer_callback( + uv.uv_timer_t* handle, +) noexcept with gil: + if __ensure_handle_data(<uv.uv_handle_t*>handle, "UVTimer callback") == 0: + return + + cdef: + UVTimer timer = <UVTimer> handle.data + method_t cb = timer.callback + + timer.running = 0 + try: + cb(timer.ctx) + except BaseException as ex: + timer._error(ex, False) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/udp.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/udp.pxd new file mode 100644 index 0000000..daa9a1b --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/udp.pxd @@ -0,0 +1,22 @@ +cdef class UDPTransport(UVBaseTransport): + cdef: + bint __receiving + int _family + object _address + + cdef _init(self, Loop loop, unsigned int family) + cdef _set_address(self, system.addrinfo *addr) + + cdef _connect(self, system.sockaddr* addr, size_t addr_len) + + cdef _bind(self, system.sockaddr* addr) + cdef open(self, int family, int sockfd) + cdef _set_broadcast(self, bint on) + + cdef inline __receiving_started(self) + cdef inline __receiving_stopped(self) + + cdef _send(self, object data, object addr) + + cdef _on_receive(self, bytes data, object exc, object addr) + cdef _on_sent(self, object exc, object context=*) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/udp.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/udp.pyx new file mode 100644 index 0000000..bbe60d5 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/udp.pyx @@ -0,0 +1,409 @@ +@cython.no_gc_clear +@cython.freelist(DEFAULT_FREELIST_SIZE) +cdef class _UDPSendContext: + # used to hold additional write request information for uv_write + + cdef: + uv.uv_udp_send_t req + + uv.uv_buf_t uv_buf + Py_buffer py_buf + + UDPTransport udp + + bint closed + + cdef close(self): + if self.closed: + return + + self.closed = 1 + PyBuffer_Release(&self.py_buf) # void + self.req.data = NULL + self.uv_buf.base = NULL + Py_DECREF(self) + self.udp = None + + @staticmethod + cdef _UDPSendContext new(UDPTransport udp, object data): + cdef _UDPSendContext ctx + ctx = _UDPSendContext.__new__(_UDPSendContext) + ctx.udp = None + ctx.closed = 1 + + ctx.req.data = <void*> ctx + Py_INCREF(ctx) + + PyObject_GetBuffer(data, &ctx.py_buf, PyBUF_SIMPLE) + ctx.uv_buf.base = <char*>ctx.py_buf.buf + ctx.uv_buf.len = ctx.py_buf.len + ctx.udp = udp + + ctx.closed = 0 + return ctx + + def __dealloc__(self): + if UVLOOP_DEBUG: + if not self.closed: + raise RuntimeError( + 'open _UDPSendContext is being deallocated') + self.udp = None + + +@cython.no_gc_clear +cdef class UDPTransport(UVBaseTransport): + def __cinit__(self): + self._family = uv.AF_UNSPEC + self.__receiving = 0 + self._address = None + self.context = Context_CopyCurrent() + + cdef _init(self, Loop loop, unsigned int family): + cdef int err + + self._start_init(loop) + + self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_udp_t)) + if self._handle is NULL: + self._abort_init() + raise MemoryError() + + err = uv.uv_udp_init_ex(loop.uvloop, + <uv.uv_udp_t*>self._handle, + family) + if err < 0: + self._abort_init() + raise convert_error(err) + + if family in (uv.AF_INET, uv.AF_INET6): + self._family = family + + self._finish_init() + + cdef _set_address(self, system.addrinfo *addr): + self._address = __convert_sockaddr_to_pyaddr(addr.ai_addr) + + cdef _connect(self, system.sockaddr* addr, size_t addr_len): + cdef int err + err = uv.uv_udp_connect(<uv.uv_udp_t*>self._handle, addr) + if err < 0: + exc = convert_error(err) + raise exc + + cdef open(self, int family, int sockfd): + if family in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX): + self._family = family + else: + raise ValueError( + 'cannot open a UDP handle, invalid family {}'.format(family)) + + cdef int err + err = uv.uv_udp_open(<uv.uv_udp_t*>self._handle, + <uv.uv_os_sock_t>sockfd) + + if err < 0: + exc = convert_error(err) + raise exc + + cdef _bind(self, system.sockaddr* addr): + cdef: + int err + int flags = 0 + + self._ensure_alive() + + err = uv.uv_udp_bind(<uv.uv_udp_t*>self._handle, addr, flags) + if err < 0: + exc = convert_error(err) + raise exc + + cdef _set_broadcast(self, bint on): + cdef int err + + self._ensure_alive() + + err = uv.uv_udp_set_broadcast(<uv.uv_udp_t*>self._handle, on) + if err < 0: + exc = convert_error(err) + raise exc + + cdef size_t _get_write_buffer_size(self): + if self._handle is NULL: + return 0 + return (<uv.uv_udp_t*>self._handle).send_queue_size + + cdef bint _is_reading(self): + return self.__receiving + + cdef _start_reading(self): + cdef int err + + if self.__receiving: + return + + self._ensure_alive() + + err = uv.uv_udp_recv_start(<uv.uv_udp_t*>self._handle, + __loop_alloc_buffer, + __uv_udp_on_receive) + + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + else: + # UDPTransport must live until the read callback is called + self.__receiving_started() + + cdef _stop_reading(self): + cdef int err + + if not self.__receiving: + return + + self._ensure_alive() + + err = uv.uv_udp_recv_stop(<uv.uv_udp_t*>self._handle) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + else: + self.__receiving_stopped() + + cdef inline __receiving_started(self): + if self.__receiving: + return + self.__receiving = 1 + Py_INCREF(self) + + cdef inline __receiving_stopped(self): + if not self.__receiving: + return + self.__receiving = 0 + Py_DECREF(self) + + cdef _new_socket(self): + if self._family not in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX): + raise RuntimeError( + 'UDPTransport.family is undefined; ' + 'cannot create python socket') + + fileno = self._fileno() + return PseudoSocket(self._family, uv.SOCK_DGRAM, 0, fileno) + + cdef _send(self, object data, object addr): + cdef: + _UDPSendContext ctx + system.sockaddr_storage saddr_st + system.sockaddr *saddr + Py_buffer try_pybuf + uv.uv_buf_t try_uvbuf + + self._ensure_alive() + + if self._family not in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX): + raise RuntimeError('UDPTransport.family is undefined; cannot send') + + if addr is None: + saddr = NULL + else: + try: + __convert_pyaddr_to_sockaddr(self._family, addr, + <system.sockaddr*>&saddr_st) + except (ValueError, TypeError): + raise + except Exception: + raise ValueError( + f'{addr!r}: socket family mismatch or ' + f'a DNS lookup is required') + saddr = <system.sockaddr*>(&saddr_st) + + if self._get_write_buffer_size() == 0: + PyObject_GetBuffer(data, &try_pybuf, PyBUF_SIMPLE) + try_uvbuf.base = <char*>try_pybuf.buf + try_uvbuf.len = try_pybuf.len + err = uv.uv_udp_try_send(<uv.uv_udp_t*>self._handle, + &try_uvbuf, + 1, + saddr) + PyBuffer_Release(&try_pybuf) + else: + err = uv.UV_EAGAIN + + if err == uv.UV_EAGAIN: + ctx = _UDPSendContext.new(self, data) + err = uv.uv_udp_send(&ctx.req, + <uv.uv_udp_t*>self._handle, + &ctx.uv_buf, + 1, + saddr, + __uv_udp_on_send) + + if err < 0: + ctx.close() + + exc = convert_error(err) + self._fatal_error(exc, True) + else: + self._maybe_pause_protocol() + + else: + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + else: + self._on_sent(None, self.context.copy()) + + cdef _on_receive(self, bytes data, object exc, object addr): + if exc is None: + run_in_context2( + self.context, self._protocol.datagram_received, data, addr, + ) + else: + run_in_context1(self.context, self._protocol.error_received, exc) + + cdef _on_sent(self, object exc, object context=None): + if exc is not None: + if isinstance(exc, OSError): + if context is None: + context = self.context + run_in_context1(context, self._protocol.error_received, exc) + else: + self._fatal_error( + exc, False, 'Fatal write error on datagram transport') + + self._maybe_resume_protocol() + if not self._get_write_buffer_size(): + if self._closing: + self._schedule_call_connection_lost(None) + + # === Public API === + + def sendto(self, data, addr=None): + if not data: + # Replicating asyncio logic here. + return + + if self._address: + if addr not in (None, self._address): + # Replicating asyncio logic here. + raise ValueError( + 'Invalid address: must be None or %s' % (self._address,)) + + # Instead of setting addr to self._address below like what asyncio + # does, we depend on previous uv_udp_connect() to set the address + addr = None + + if self._conn_lost: + # Replicating asyncio logic here. + if self._conn_lost >= LOG_THRESHOLD_FOR_CONNLOST_WRITES: + aio_logger.warning('socket.send() raised exception.') + self._conn_lost += 1 + return + + self._send(data, addr) + + +cdef void __uv_udp_on_receive( + uv.uv_udp_t* handle, + ssize_t nread, + const uv.uv_buf_t* buf, + const system.sockaddr* addr, + unsigned flags +) noexcept with gil: + + if __ensure_handle_data(<uv.uv_handle_t*>handle, + "UDPTransport receive callback") == 0: + return + + cdef: + UDPTransport udp = <UDPTransport>handle.data + Loop loop = udp._loop + bytes data + object pyaddr + + # It's OK to free the buffer early, since nothing will + # be able to touch it until this method is done. + __loop_free_buffer(loop) + + if udp._closed: + # The handle was closed, there is no reason to + # do any work now. + udp.__receiving_stopped() # Just in case. + return + + if addr is NULL and nread == 0: + # From libuv docs: + # addr: struct sockaddr* containing the address + # of the sender. Can be NULL. Valid for the duration + # of the callback only. + # [...] + # The receive callback will be called with + # nread == 0 and addr == NULL when there is + # nothing to read, and with nread == 0 and + # addr != NULL when an empty UDP packet is + # received. + return + + if addr is NULL: + pyaddr = None + elif addr.sa_family == uv.AF_UNSPEC: + # https://github.com/MagicStack/uvloop/issues/304 + if system.PLATFORM_IS_LINUX: + pyaddr = None + else: + pyaddr = '' + else: + try: + pyaddr = __convert_sockaddr_to_pyaddr(addr) + except BaseException as exc: + udp._error(exc, False) + return + + if nread < 0: + exc = convert_error(nread) + udp._on_receive(None, exc, pyaddr) + return + + if nread == 0: + data = b'' + else: + data = loop._recv_buffer[:nread] + + try: + udp._on_receive(data, None, pyaddr) + except BaseException as exc: + udp._error(exc, False) + + +cdef void __uv_udp_on_send( + uv.uv_udp_send_t* req, + int status, +) noexcept with gil: + + if req.data is NULL: + # Shouldn't happen as: + # - _UDPSendContext does an extra INCREF in its 'init()' + # - _UDPSendContext holds a ref to the relevant UDPTransport + aio_logger.error( + 'UVStream.write callback called with NULL req.data, status=%r', + status) + return + + cdef: + _UDPSendContext ctx = <_UDPSendContext> req.data + UDPTransport udp = <UDPTransport>ctx.udp + + ctx.close() + + if status < 0: + exc = convert_error(status) + print(exc) + else: + exc = None + + try: + udp._on_sent(exc) + except BaseException as exc: + udp._error(exc, False) |