diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/uvloop/handles')
28 files changed, 4579 insertions, 0 deletions
| diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/async_.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/async_.pxd new file mode 100644 index 0000000..5f0d820 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/async_.pxd @@ -0,0 +1,11 @@ +cdef class UVAsync(UVHandle): +    cdef: +        method_t callback +        object ctx + +    cdef _init(self, Loop loop, method_t callback, object ctx) + +    cdef send(self) + +    @staticmethod +    cdef UVAsync new(Loop loop, method_t callback, object ctx) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/async_.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/async_.pyx new file mode 100644 index 0000000..5c740cf --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/async_.pyx @@ -0,0 +1,56 @@ +@cython.no_gc_clear +cdef class UVAsync(UVHandle): +    cdef _init(self, Loop loop, method_t callback, object ctx): +        cdef int err + +        self._start_init(loop) + +        self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_async_t)) +        if self._handle is NULL: +            self._abort_init() +            raise MemoryError() + +        err = uv.uv_async_init(self._loop.uvloop, +                               <uv.uv_async_t*>self._handle, +                               __uvasync_callback) +        if err < 0: +            self._abort_init() +            raise convert_error(err) + +        self._finish_init() + +        self.callback = callback +        self.ctx = ctx + +    cdef send(self): +        cdef int err + +        self._ensure_alive() + +        err = uv.uv_async_send(<uv.uv_async_t*>self._handle) +        if err < 0: +            exc = convert_error(err) +            self._fatal_error(exc, True) +            return + +    @staticmethod +    cdef UVAsync new(Loop loop, method_t callback, object ctx): +        cdef UVAsync handle +        handle = UVAsync.__new__(UVAsync) +        handle._init(loop, callback, ctx) +        return handle + + +cdef void __uvasync_callback( +    uv.uv_async_t* handle, +) noexcept with gil: +    if __ensure_handle_data(<uv.uv_handle_t*>handle, "UVAsync callback") == 0: +        return + +    cdef: +        UVAsync async_ = <UVAsync> handle.data +        method_t cb = async_.callback +    try: +        cb(async_.ctx) +    except BaseException as ex: +        async_._error(ex, False) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/basetransport.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/basetransport.pxd new file mode 100644 index 0000000..ba356a7 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/basetransport.pxd @@ -0,0 +1,54 @@ +cdef class UVBaseTransport(UVSocketHandle): + +    cdef: +        readonly bint _closing + +        bint _protocol_connected +        bint _protocol_paused +        object _protocol_data_received +        size_t _high_water +        size_t _low_water + +        object _protocol +        Server _server +        object _waiter + +        dict _extra_info + +        uint32_t _conn_lost + +        object __weakref__ + +    # All "inline" methods are final + +    cdef inline _maybe_pause_protocol(self) +    cdef inline _maybe_resume_protocol(self) + +    cdef inline _schedule_call_connection_made(self) +    cdef inline _schedule_call_connection_lost(self, exc) + +    cdef _wakeup_waiter(self) +    cdef _call_connection_made(self) +    cdef _call_connection_lost(self, exc) + +    # Overloads of UVHandle methods: +    cdef _fatal_error(self, exc, throw, reason=?) +    cdef _close(self) + +    cdef inline _set_server(self, Server server) +    cdef inline _set_waiter(self, object waiter) + +    cdef _set_protocol(self, object protocol) +    cdef _clear_protocol(self) + +    cdef inline _init_protocol(self) +    cdef inline _add_extra_info(self, str name, object obj) + +    # === overloads === + +    cdef _new_socket(self) +    cdef size_t _get_write_buffer_size(self) + +    cdef bint _is_reading(self) +    cdef _start_reading(self) +    cdef _stop_reading(self) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/basetransport.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/basetransport.pyx new file mode 100644 index 0000000..28b3079 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/basetransport.pyx @@ -0,0 +1,293 @@ +cdef class UVBaseTransport(UVSocketHandle): + +    def __cinit__(self): +        # Flow control +        self._high_water = FLOW_CONTROL_HIGH_WATER * 1024 +        self._low_water = FLOW_CONTROL_HIGH_WATER // 4 + +        self._protocol = None +        self._protocol_connected = 0 +        self._protocol_paused = 0 +        self._protocol_data_received = None + +        self._server = None +        self._waiter = None +        self._extra_info = None + +        self._conn_lost = 0 + +        self._closing = 0 + +    cdef size_t _get_write_buffer_size(self): +        return 0 + +    cdef inline _schedule_call_connection_made(self): +        self._loop._call_soon_handle( +            new_MethodHandle(self._loop, +                             "UVTransport._call_connection_made", +                             <method_t>self._call_connection_made, +                             self.context, +                             self)) + +    cdef inline _schedule_call_connection_lost(self, exc): +        self._loop._call_soon_handle( +            new_MethodHandle1(self._loop, +                              "UVTransport._call_connection_lost", +                              <method1_t>self._call_connection_lost, +                              self.context, +                              self, exc)) + +    cdef _fatal_error(self, exc, throw, reason=None): +        # Overload UVHandle._fatal_error + +        self._force_close(exc) + +        if not isinstance(exc, OSError): + +            if throw or self._loop is None: +                raise exc + +            msg = f'Fatal error on transport {self.__class__.__name__}' +            if reason is not None: +                msg = f'{msg} ({reason})' + +            self._loop.call_exception_handler({ +                'message': msg, +                'exception': exc, +                'transport': self, +                'protocol': self._protocol, +            }) + +    cdef inline _maybe_pause_protocol(self): +        cdef: +            size_t size = self._get_write_buffer_size() + +        if size <= self._high_water: +            return + +        if not self._protocol_paused: +            self._protocol_paused = 1 +            try: +                # _maybe_pause_protocol() is always triggered from user-calls, +                # so we must copy the context to avoid entering context twice +                run_in_context( +                    self.context.copy(), self._protocol.pause_writing, +                ) +            except (KeyboardInterrupt, SystemExit): +                raise +            except BaseException as exc: +                self._loop.call_exception_handler({ +                    'message': 'protocol.pause_writing() failed', +                    'exception': exc, +                    'transport': self, +                    'protocol': self._protocol, +                }) + +    cdef inline _maybe_resume_protocol(self): +        cdef: +            size_t size = self._get_write_buffer_size() + +        if self._protocol_paused and size <= self._low_water: +            self._protocol_paused = 0 +            try: +                # We're copying the context to avoid entering context twice, +                # even though it's not always necessary to copy - it's easier +                # to copy here than passing down a copied context. +                run_in_context( +                    self.context.copy(), self._protocol.resume_writing, +                ) +            except (KeyboardInterrupt, SystemExit): +                raise +            except BaseException as exc: +                self._loop.call_exception_handler({ +                    'message': 'protocol.resume_writing() failed', +                    'exception': exc, +                    'transport': self, +                    'protocol': self._protocol, +                }) + +    cdef _wakeup_waiter(self): +        if self._waiter is not None: +            if not self._waiter.cancelled(): +                if not self._is_alive(): +                    self._waiter.set_exception( +                        RuntimeError( +                            'closed Transport handle and unset waiter')) +                else: +                    self._waiter.set_result(True) +            self._waiter = None + +    cdef _call_connection_made(self): +        if self._protocol is None: +            raise RuntimeError( +                'protocol is not set, cannot call connection_made()') + +        # We use `_is_alive()` and not `_closing`, because we call +        # `transport._close()` in `loop.create_connection()` if an +        # exception happens during `await waiter`. +        if not self._is_alive(): +            # A connection waiter can be cancelled between +            # 'await loop.create_connection()' and +            # `_schedule_call_connection_made` and +            # the actual `_call_connection_made`. +            self._wakeup_waiter() +            return + +        # Set _protocol_connected to 1 before calling "connection_made": +        # if transport is aborted or closed, "connection_lost" will +        # still be scheduled. +        self._protocol_connected = 1 + +        try: +            self._protocol.connection_made(self) +        except BaseException: +            self._wakeup_waiter() +            raise + +        if not self._is_alive(): +            # This might happen when "transport.abort()" is called +            # from "Protocol.connection_made". +            self._wakeup_waiter() +            return + +        self._start_reading() +        self._wakeup_waiter() + +    cdef _call_connection_lost(self, exc): +        if self._waiter is not None: +            if not self._waiter.done(): +                self._waiter.set_exception(exc) +            self._waiter = None + +        if self._closed: +            # The handle is closed -- likely, _call_connection_lost +            # was already called before. +            return + +        try: +            if self._protocol_connected: +                self._protocol.connection_lost(exc) +        finally: +            self._clear_protocol() + +            self._close() + +            server = self._server +            if server is not None: +                (<Server>server)._detach() +                self._server = None + +    cdef inline _set_server(self, Server server): +        self._server = server +        (<Server>server)._attach() + +    cdef inline _set_waiter(self, object waiter): +        if waiter is not None and not isfuture(waiter): +            raise TypeError( +                f'invalid waiter object {waiter!r}, expected asyncio.Future') + +        self._waiter = waiter + +    cdef _set_protocol(self, object protocol): +        self._protocol = protocol +        # Store a reference to the bound method directly +        try: +            self._protocol_data_received = protocol.data_received +        except AttributeError: +            pass + +    cdef _clear_protocol(self): +        self._protocol = None +        self._protocol_data_received = None + +    cdef inline _init_protocol(self): +        self._loop._track_transport(self) +        if self._protocol is None: +            raise RuntimeError('invalid _init_protocol call') +        self._schedule_call_connection_made() + +    cdef inline _add_extra_info(self, str name, object obj): +        if self._extra_info is None: +            self._extra_info = {} +        self._extra_info[name] = obj + +    cdef bint _is_reading(self): +        raise NotImplementedError + +    cdef _start_reading(self): +        raise NotImplementedError + +    cdef _stop_reading(self): +        raise NotImplementedError + +    # === Public API === + +    property _paused: +        # Used by SSLProto.  Might be removed in the future. +        def __get__(self): +            return bool(not self._is_reading()) + +    def get_protocol(self): +        return self._protocol + +    def set_protocol(self, protocol): +        self._set_protocol(protocol) +        if self._is_reading(): +            self._stop_reading() +            self._start_reading() + +    def _force_close(self, exc): +        # Used by SSLProto.  Might be removed in the future. +        if self._conn_lost or self._closed: +            return +        if not self._closing: +            self._closing = 1 +            self._stop_reading() +        self._conn_lost += 1 +        self._schedule_call_connection_lost(exc) + +    def abort(self): +        self._force_close(None) + +    def close(self): +        if self._closing or self._closed: +            return + +        self._closing = 1 +        self._stop_reading() + +        if not self._get_write_buffer_size(): +            # The write buffer is empty +            self._conn_lost += 1 +            self._schedule_call_connection_lost(None) + +    def is_closing(self): +        return self._closing + +    def get_write_buffer_size(self): +        return self._get_write_buffer_size() + +    def set_write_buffer_limits(self, high=None, low=None): +        self._ensure_alive() + +        self._high_water, self._low_water = add_flowcontrol_defaults( +            high, low, FLOW_CONTROL_HIGH_WATER) + +        self._maybe_pause_protocol() + +    def get_write_buffer_limits(self): +        return (self._low_water, self._high_water) + +    def get_extra_info(self, name, default=None): +        if self._extra_info is not None and name in self._extra_info: +            return self._extra_info[name] +        if name == 'socket': +            return self._get_socket() +        if name == 'sockname': +            return self._get_socket().getsockname() +        if name == 'peername': +            try: +                return self._get_socket().getpeername() +            except socket_error: +                return default +        return default diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/check.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/check.pxd new file mode 100644 index 0000000..86cfd8f --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/check.pxd @@ -0,0 +1,14 @@ +cdef class UVCheck(UVHandle): +    cdef: +        Handle h +        bint running + +    # All "inline" methods are final + +    cdef _init(self, Loop loop, Handle h) + +    cdef inline stop(self) +    cdef inline start(self) + +    @staticmethod +    cdef UVCheck new(Loop loop, Handle h) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/check.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/check.pyx new file mode 100644 index 0000000..1a61c4e --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/check.pyx @@ -0,0 +1,72 @@ +@cython.no_gc_clear +cdef class UVCheck(UVHandle): +    cdef _init(self, Loop loop, Handle h): +        cdef int err + +        self._start_init(loop) + +        self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_check_t)) +        if self._handle is NULL: +            self._abort_init() +            raise MemoryError() + +        err = uv.uv_check_init(self._loop.uvloop, <uv.uv_check_t*>self._handle) +        if err < 0: +            self._abort_init() +            raise convert_error(err) + +        self._finish_init() + +        self.h = h +        self.running = 0 + +    cdef inline stop(self): +        cdef int err + +        if not self._is_alive(): +            self.running = 0 +            return + +        if self.running == 1: +            err = uv.uv_check_stop(<uv.uv_check_t*>self._handle) +            self.running = 0 +            if err < 0: +                exc = convert_error(err) +                self._fatal_error(exc, True) +                return + +    cdef inline start(self): +        cdef int err + +        self._ensure_alive() + +        if self.running == 0: +            err = uv.uv_check_start(<uv.uv_check_t*>self._handle, +                                    cb_check_callback) +            if err < 0: +                exc = convert_error(err) +                self._fatal_error(exc, True) +                return +            self.running = 1 + +    @staticmethod +    cdef UVCheck new(Loop loop, Handle h): +        cdef UVCheck handle +        handle = UVCheck.__new__(UVCheck) +        handle._init(loop, h) +        return handle + + +cdef void cb_check_callback( +    uv.uv_check_t* handle, +) noexcept with gil: +    if __ensure_handle_data(<uv.uv_handle_t*>handle, "UVCheck callback") == 0: +        return + +    cdef: +        UVCheck check = <UVCheck> handle.data +        Handle h = check.h +    try: +        h._run() +    except BaseException as ex: +        check._error(ex, False) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/fsevent.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/fsevent.pxd new file mode 100644 index 0000000..3a32428 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/fsevent.pxd @@ -0,0 +1,12 @@ +cdef class UVFSEvent(UVHandle): +    cdef: +        object callback +        bint running + +    cdef _init(self, Loop loop, object callback, object context) +    cdef _close(self) +    cdef start(self, char* path, int flags) +    cdef stop(self) + +    @staticmethod +    cdef UVFSEvent new(Loop loop, object callback, object context) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/fsevent.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/fsevent.pyx new file mode 100644 index 0000000..6ed6433 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/fsevent.pyx @@ -0,0 +1,116 @@ +import enum + + +class FileSystemEvent(enum.IntEnum): +    RENAME = uv.UV_RENAME +    CHANGE = uv.UV_CHANGE +    RENAME_CHANGE = RENAME | CHANGE + + +@cython.no_gc_clear +cdef class UVFSEvent(UVHandle): +    cdef _init(self, Loop loop, object callback, object context): +        cdef int err + +        self._start_init(loop) + +        self._handle = <uv.uv_handle_t*>PyMem_RawMalloc( +            sizeof(uv.uv_fs_event_t) +        ) +        if self._handle is NULL: +            self._abort_init() +            raise MemoryError() + +        err = uv.uv_fs_event_init( +            self._loop.uvloop, <uv.uv_fs_event_t*>self._handle +        ) +        if err < 0: +            self._abort_init() +            raise convert_error(err) + +        self._finish_init() + +        self.running = 0 +        self.callback = callback +        if context is None: +            context = Context_CopyCurrent() +        self.context = context + +    cdef start(self, char* path, int flags): +        cdef int err + +        self._ensure_alive() + +        if self.running == 0: +            err = uv.uv_fs_event_start( +                <uv.uv_fs_event_t*>self._handle, +                __uvfsevent_callback, +                path, +                flags, +            ) +            if err < 0: +                exc = convert_error(err) +                self._fatal_error(exc, True) +                return +            self.running = 1 + +    cdef stop(self): +        cdef int err + +        if not self._is_alive(): +            self.running = 0 +            return + +        if self.running == 1: +            err = uv.uv_fs_event_stop(<uv.uv_fs_event_t*>self._handle) +            self.running = 0 +            if err < 0: +                exc = convert_error(err) +                self._fatal_error(exc, True) +                return + +    cdef _close(self): +        try: +            self.stop() +        finally: +            UVHandle._close(<UVHandle>self) + +    def cancel(self): +        self._close() + +    def cancelled(self): +        return self.running == 0 + +    @staticmethod +    cdef UVFSEvent new(Loop loop, object callback, object context): +        cdef UVFSEvent handle +        handle = UVFSEvent.__new__(UVFSEvent) +        handle._init(loop, callback, context) +        return handle + + +cdef void __uvfsevent_callback( +    uv.uv_fs_event_t* handle, +    const char *filename, +    int events, +    int status, +) noexcept with gil: +    if __ensure_handle_data( +        <uv.uv_handle_t*>handle, "UVFSEvent callback" +    ) == 0: +        return + +    cdef: +        UVFSEvent fs_event = <UVFSEvent> handle.data +        Handle h + +    try: +        h = new_Handle( +            fs_event._loop, +            fs_event.callback, +            (filename, FileSystemEvent(events)), +            fs_event.context, +        ) +        h._run() +    except BaseException as ex: +        fs_event._error(ex, False) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/handle.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/handle.pxd new file mode 100644 index 0000000..5af1c14 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/handle.pxd @@ -0,0 +1,48 @@ +cdef class UVHandle: +    cdef: +        uv.uv_handle_t *_handle +        Loop _loop +        readonly _source_traceback +        bint _closed +        bint _inited +        object context + +        # Added to enable current UDPTransport implementation, +        # which doesn't use libuv handles. +        bint _has_handle + +    # All "inline" methods are final + +    cdef inline _start_init(self, Loop loop) +    cdef inline _abort_init(self) +    cdef inline _finish_init(self) + +    cdef inline bint _is_alive(self) +    cdef inline _ensure_alive(self) + +    cdef _error(self, exc, throw) +    cdef _fatal_error(self, exc, throw, reason=?) + +    cdef _warn_unclosed(self) + +    cdef _free(self) +    cdef _close(self) + + +cdef class UVSocketHandle(UVHandle): +    cdef: +        # Points to a Python file-object that should be closed +        # when the transport is closing.  Used by pipes.  This +        # should probably be refactored somehow. +        object _fileobj +        object __cached_socket + +    # All "inline" methods are final + +    cdef _fileno(self) + +    cdef _new_socket(self) +    cdef inline _get_socket(self) +    cdef inline _attach_fileobj(self, object file) + +    cdef _open(self, int sockfd) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/handle.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/handle.pyx new file mode 100644 index 0000000..6efe375 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/handle.pyx @@ -0,0 +1,395 @@ +cdef class UVHandle: +    """A base class for all libuv handles. + +    Automatically manages memory deallocation and closing. + +    Important: + +       1. call "_ensure_alive()" before calling any libuv functions on +          your handles. + +       2. call "__ensure_handle_data" in *all* libuv handle callbacks. +    """ + +    def __cinit__(self): +        self._closed = 0 +        self._inited = 0 +        self._has_handle = 1 +        self._handle = NULL +        self._loop = None +        self._source_traceback = None + +    def __init__(self): +        raise TypeError( +            '{} is not supposed to be instantiated from Python'.format( +                self.__class__.__name__)) + +    def __dealloc__(self): +        if UVLOOP_DEBUG: +            if self._loop is not None: +                if self._inited: +                    self._loop._debug_handles_current.subtract([ +                        self.__class__.__name__]) +            else: +                # No "@cython.no_gc_clear" decorator on this UVHandle +                raise RuntimeError( +                    '{} without @no_gc_clear; loop was set to None by GC' +                    .format(self.__class__.__name__)) + +        if self._handle is NULL: +            return + +        # -> When we're at this point, something is wrong <- + +        if self._handle.loop is NULL: +            # The handle wasn't initialized with "uv_{handle}_init" +            self._closed = 1 +            self._free() +            raise RuntimeError( +                '{} is open in __dealloc__ with loop set to NULL' +                .format(self.__class__.__name__)) + +        if self._closed: +            # So _handle is not NULL and self._closed == 1? +            raise RuntimeError( +                '{}.__dealloc__: _handle is NULL, _closed == 1'.format( +                    self.__class__.__name__)) + +        # The handle is dealloced while open.  Let's try to close it. +        # Situations when this is possible include unhandled exceptions, +        # errors during Handle.__cinit__/__init__ etc. +        if self._inited: +            self._handle.data = NULL +            uv.uv_close(self._handle, __uv_close_handle_cb)  # void; no errors +            self._handle = NULL +            self._warn_unclosed() +        else: +            # The handle was allocated, but not initialized +            self._closed = 1 +            self._free() + +    cdef _free(self): +        if self._handle == NULL: +            return + +        if UVLOOP_DEBUG and self._inited: +            self._loop._debug_uv_handles_freed += 1 + +        PyMem_RawFree(self._handle) +        self._handle = NULL + +    cdef _warn_unclosed(self): +        if self._source_traceback is not None: +            try: +                tb = ''.join(tb_format_list(self._source_traceback)) +                tb = 'object created at (most recent call last):\n{}'.format( +                    tb.rstrip()) +            except Exception as ex: +                msg = ( +                    'unclosed resource {!r}; could not serialize ' +                    'debug traceback: {}: {}' +                ).format(self, type(ex).__name__, ex) +            else: +                msg = 'unclosed resource {!r}; {}'.format(self, tb) +        else: +            msg = 'unclosed resource {!r}'.format(self) +        warnings_warn(msg, ResourceWarning) + +    cdef inline _abort_init(self): +        if self._handle is not NULL: +            self._free() + +        try: +            if UVLOOP_DEBUG: +                name = self.__class__.__name__ +                if self._inited: +                    raise RuntimeError( +                        '_abort_init: {}._inited is set'.format(name)) +                if self._closed: +                    raise RuntimeError( +                        '_abort_init: {}._closed is set'.format(name)) +        finally: +            self._closed = 1 + +    cdef inline _finish_init(self): +        self._inited = 1 +        if self._has_handle == 1: +            self._handle.data = <void*>self +        if self._loop._debug: +            self._source_traceback = extract_stack() +        if UVLOOP_DEBUG: +            cls_name = self.__class__.__name__ +            self._loop._debug_uv_handles_total += 1 +            self._loop._debug_handles_total.update([cls_name]) +            self._loop._debug_handles_current.update([cls_name]) + +    cdef inline _start_init(self, Loop loop): +        if UVLOOP_DEBUG: +            if self._loop is not None: +                raise RuntimeError( +                    '{}._start_init can only be called once'.format( +                        self.__class__.__name__)) + +        self._loop = loop + +    cdef inline bint _is_alive(self): +        cdef bint res +        res = self._closed != 1 and self._inited == 1 +        if UVLOOP_DEBUG: +            if res and self._has_handle == 1: +                name = self.__class__.__name__ +                if self._handle is NULL: +                    raise RuntimeError( +                        '{} is alive, but _handle is NULL'.format(name)) +                if self._loop is None: +                    raise RuntimeError( +                        '{} is alive, but _loop is None'.format(name)) +                if self._handle.loop is not self._loop.uvloop: +                    raise RuntimeError( +                        '{} is alive, but _handle.loop is not ' +                        'initialized'.format(name)) +                if self._handle.data is not <void*>self: +                    raise RuntimeError( +                        '{} is alive, but _handle.data is not ' +                        'initialized'.format(name)) +        return res + +    cdef inline _ensure_alive(self): +        if not self._is_alive(): +            raise RuntimeError( +                'unable to perform operation on {!r}; ' +                'the handler is closed'.format(self)) + +    cdef _fatal_error(self, exc, throw, reason=None): +        # Fatal error means an error that was returned by the +        # underlying libuv handle function.  We usually can't +        # recover from that, hence we just close the handle. +        self._close() + +        if throw or self._loop is None: +            raise exc +        else: +            self._loop._handle_exception(exc) + +    cdef _error(self, exc, throw): +        # A non-fatal error is usually an error that was caught +        # by the handler, but was originated in the client code +        # (not in libuv).  In this case we either want to simply +        # raise or log it. +        if throw or self._loop is None: +            raise exc +        else: +            self._loop._handle_exception(exc) + +    cdef _close(self): +        if self._closed == 1: +            return + +        self._closed = 1 + +        if self._handle is NULL: +            return + +        if UVLOOP_DEBUG: +            if self._handle.data is NULL: +                raise RuntimeError( +                    '{}._close: _handle.data is NULL'.format( +                        self.__class__.__name__)) + +            if <object>self._handle.data is not self: +                raise RuntimeError( +                    '{}._close: _handle.data is not UVHandle/self'.format( +                        self.__class__.__name__)) + +            if uv.uv_is_closing(self._handle): +                raise RuntimeError( +                    '{}._close: uv_is_closing() is true'.format( +                        self.__class__.__name__)) + +        # We want the handle wrapper (UVHandle) to stay alive until +        # the closing callback fires. +        Py_INCREF(self) +        uv.uv_close(self._handle, __uv_close_handle_cb)  # void; no errors + +    def __repr__(self): +        return '<{} closed={} {:#x}>'.format( +            self.__class__.__name__, +            self._closed, +            id(self)) + + +cdef class UVSocketHandle(UVHandle): + +    def __cinit__(self): +        self._fileobj = None +        self.__cached_socket = None + +    cdef _fileno(self): +        cdef: +            int fd +            int err + +        self._ensure_alive() +        err = uv.uv_fileno(self._handle, <uv.uv_os_fd_t*>&fd) +        if err < 0: +            raise convert_error(err) + +        return fd + +    cdef _new_socket(self): +        raise NotImplementedError + +    cdef inline _get_socket(self): +        if self.__cached_socket is not None: +            return self.__cached_socket + +        if not self._is_alive(): +            return None + +        self.__cached_socket = self._new_socket() +        if UVLOOP_DEBUG: +            # We don't "dup" for the "__cached_socket". +            assert self.__cached_socket.fileno() == self._fileno() +        return self.__cached_socket + +    cdef inline _attach_fileobj(self, object file): +        # When we create a TCP/PIPE/etc connection/server based on +        # a Python file object, we need to close the file object when +        # the uv handle is closed. +        socket_inc_io_ref(file) +        self._fileobj = file + +    cdef _close(self): +        if self.__cached_socket is not None: +            (<PseudoSocket>self.__cached_socket)._fd = -1 + +        UVHandle._close(self) + +        try: +            # This code will only run for transports created from +            # Python sockets, i.e. with `loop.create_server(sock=sock)` etc. +            if self._fileobj is not None: +                if isinstance(self._fileobj, socket_socket): +                    # Detaching the socket object is the ideal solution: +                    # * libuv will actually close the FD; +                    # * detach() call will reset FD for the Python socket +                    #   object, which means that it won't be closed 2nd time +                    #   when the socket object is GCed. +                    # +                    # No need to call `socket_dec_io_ref()`, as +                    # `socket.detach()` ignores `socket._io_refs`. +                    self._fileobj.detach() +                else: +                    try: +                        # `socket.close()` will raise an EBADF because libuv +                        # has already closed the underlying FD. +                        self._fileobj.close() +                    except OSError as ex: +                        if ex.errno != errno_EBADF: +                            raise +        except Exception as ex: +            self._loop.call_exception_handler({ +                'exception': ex, +                'transport': self, +                'message': f'could not close attached file object ' +                           f'{self._fileobj!r}', +            }) +        finally: +            self._fileobj = None + +    cdef _open(self, int sockfd): +        raise NotImplementedError + + +cdef inline bint __ensure_handle_data(uv.uv_handle_t* handle, +                                      const char* handle_ctx): + +    cdef Loop loop + +    if UVLOOP_DEBUG: +        if handle.loop is NULL: +            raise RuntimeError( +                'handle.loop is NULL in __ensure_handle_data') + +        if handle.loop.data is NULL: +            raise RuntimeError( +                'handle.loop.data is NULL in __ensure_handle_data') + +    if handle.data is NULL: +        loop = <Loop>handle.loop.data +        loop.call_exception_handler({ +            'message': '{} called with handle.data == NULL'.format( +                handle_ctx.decode('latin-1')) +        }) +        return 0 + +    if handle.data is NULL: +        # The underlying UVHandle object was GCed with an open uv_handle_t. +        loop = <Loop>handle.loop.data +        loop.call_exception_handler({ +            'message': '{} called after destroying the UVHandle'.format( +                handle_ctx.decode('latin-1')) +        }) +        return 0 + +    return 1 + + +cdef void __uv_close_handle_cb(uv.uv_handle_t* handle) noexcept with gil: +    cdef UVHandle h + +    if handle.data is NULL: +        # The original UVHandle is long dead. Just free the mem of +        # the uv_handle_t* handler. + +        if UVLOOP_DEBUG: +            if handle.loop == NULL or handle.loop.data == NULL: +                raise RuntimeError( +                    '__uv_close_handle_cb: handle.loop is invalid') +            (<Loop>handle.loop.data)._debug_uv_handles_freed += 1 + +        PyMem_RawFree(handle) +    else: +        h = <UVHandle>handle.data +        try: +            if UVLOOP_DEBUG: +                if not h._has_handle: +                    raise RuntimeError( +                        'has_handle=0 in __uv_close_handle_cb') +                h._loop._debug_handles_closed.update([ +                    h.__class__.__name__]) +            h._free() +        finally: +            Py_DECREF(h)  # Was INCREFed in UVHandle._close + + +cdef void __close_all_handles(Loop loop): +    uv.uv_walk(loop.uvloop, +               __uv_walk_close_all_handles_cb, +               <void*>loop)  # void + + +cdef void __uv_walk_close_all_handles_cb( +    uv.uv_handle_t* handle, +    void* arg, +) noexcept with gil: + +    cdef: +        Loop loop = <Loop>arg +        UVHandle h + +    if uv.uv_is_closing(handle): +        # The handle is closed or is closing. +        return + +    if handle.data is NULL: +        # This shouldn't happen. Ever. +        loop.call_exception_handler({ +            'message': 'handle.data is NULL in __close_all_handles_cb' +        }) +        return + +    h = <UVHandle>handle.data +    if not h._closed: +        h._warn_unclosed() +        h._close() diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/idle.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/idle.pxd new file mode 100644 index 0000000..cf7b19f --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/idle.pxd @@ -0,0 +1,14 @@ +cdef class UVIdle(UVHandle): +    cdef: +        Handle h +        bint running + +    # All "inline" methods are final + +    cdef _init(self, Loop loop, Handle h) + +    cdef inline stop(self) +    cdef inline start(self) + +    @staticmethod +    cdef UVIdle new(Loop loop, Handle h) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/idle.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/idle.pyx new file mode 100644 index 0000000..91c641f --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/idle.pyx @@ -0,0 +1,72 @@ +@cython.no_gc_clear +cdef class UVIdle(UVHandle): +    cdef _init(self, Loop loop, Handle h): +        cdef int err + +        self._start_init(loop) + +        self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_idle_t)) +        if self._handle is NULL: +            self._abort_init() +            raise MemoryError() + +        err = uv.uv_idle_init(self._loop.uvloop, <uv.uv_idle_t*>self._handle) +        if err < 0: +            self._abort_init() +            raise convert_error(err) + +        self._finish_init() + +        self.h = h +        self.running = 0 + +    cdef inline stop(self): +        cdef int err + +        if not self._is_alive(): +            self.running = 0 +            return + +        if self.running == 1: +            err = uv.uv_idle_stop(<uv.uv_idle_t*>self._handle) +            self.running = 0 +            if err < 0: +                exc = convert_error(err) +                self._fatal_error(exc, True) +                return + +    cdef inline start(self): +        cdef int err + +        self._ensure_alive() + +        if self.running == 0: +            err = uv.uv_idle_start(<uv.uv_idle_t*>self._handle, +                                   cb_idle_callback) +            if err < 0: +                exc = convert_error(err) +                self._fatal_error(exc, True) +                return +            self.running = 1 + +    @staticmethod +    cdef UVIdle new(Loop loop, Handle h): +        cdef UVIdle handle +        handle = UVIdle.__new__(UVIdle) +        handle._init(loop, h) +        return handle + + +cdef void cb_idle_callback( +    uv.uv_idle_t* handle, +) noexcept with gil: +    if __ensure_handle_data(<uv.uv_handle_t*>handle, "UVIdle callback") == 0: +        return + +    cdef: +        UVIdle idle = <UVIdle> handle.data +        Handle h = idle.h +    try: +        h._run() +    except BaseException as ex: +        idle._error(ex, False) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pxd new file mode 100644 index 0000000..56fc265 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pxd @@ -0,0 +1,33 @@ +cdef class UnixServer(UVStreamServer): + +    cdef bind(self, str path) + +    @staticmethod +    cdef UnixServer new(Loop loop, object protocol_factory, Server server, +                        object backlog, +                        object ssl, +                        object ssl_handshake_timeout, +                        object ssl_shutdown_timeout) + + +cdef class UnixTransport(UVStream): + +    @staticmethod +    cdef UnixTransport new(Loop loop, object protocol, Server server, +                           object waiter, object context) + +    cdef connect(self, char* addr) + + +cdef class ReadUnixTransport(UVStream): + +    @staticmethod +    cdef ReadUnixTransport new(Loop loop, object protocol, Server server, +                               object waiter) + + +cdef class WriteUnixTransport(UVStream): + +    @staticmethod +    cdef WriteUnixTransport new(Loop loop, object protocol, Server server, +                                object waiter) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pyx new file mode 100644 index 0000000..195576c --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/pipe.pyx @@ -0,0 +1,226 @@ +cdef __pipe_init_uv_handle(UVStream handle, Loop loop): +    cdef int err + +    handle._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_pipe_t)) +    if handle._handle is NULL: +        handle._abort_init() +        raise MemoryError() + +    # Initialize pipe handle with ipc=0. +    # ipc=1 means that libuv will use recvmsg/sendmsg +    # instead of recv/send. +    err = uv.uv_pipe_init(handle._loop.uvloop, +                          <uv.uv_pipe_t*>handle._handle, +                          0) +    # UV_HANDLE_READABLE allows calling uv_read_start() on this pipe +    # even if it is O_WRONLY, see also #317, libuv/libuv#2058 +    handle._handle.flags |= uv.UV_INTERNAL_HANDLE_READABLE +    if err < 0: +        handle._abort_init() +        raise convert_error(err) + +    handle._finish_init() + + +cdef __pipe_open(UVStream handle, int fd): +    cdef int err +    err = uv.uv_pipe_open(<uv.uv_pipe_t *>handle._handle, +                          <uv.uv_file>fd) +    if err < 0: +        exc = convert_error(err) +        raise exc + + +cdef __pipe_get_socket(UVSocketHandle handle): +    fileno = handle._fileno() +    return PseudoSocket(uv.AF_UNIX, uv.SOCK_STREAM, 0, fileno) + + +@cython.no_gc_clear +cdef class UnixServer(UVStreamServer): + +    @staticmethod +    cdef UnixServer new(Loop loop, object protocol_factory, Server server, +                        object backlog, +                        object ssl, +                        object ssl_handshake_timeout, +                        object ssl_shutdown_timeout): + +        cdef UnixServer handle +        handle = UnixServer.__new__(UnixServer) +        handle._init(loop, protocol_factory, server, backlog, +                     ssl, ssl_handshake_timeout, ssl_shutdown_timeout) +        __pipe_init_uv_handle(<UVStream>handle, loop) +        return handle + +    cdef _new_socket(self): +        return __pipe_get_socket(<UVSocketHandle>self) + +    cdef _open(self, int sockfd): +        self._ensure_alive() +        __pipe_open(<UVStream>self, sockfd) +        self._mark_as_open() + +    cdef bind(self, str path): +        cdef int err +        self._ensure_alive() +        err = uv.uv_pipe_bind(<uv.uv_pipe_t *>self._handle, +                              path.encode()) +        if err < 0: +            exc = convert_error(err) +            self._fatal_error(exc, True) +            return + +        self._mark_as_open() + +    cdef UVStream _make_new_transport(self, object protocol, object waiter, +                                      object context): +        cdef UnixTransport tr +        tr = UnixTransport.new(self._loop, protocol, self._server, waiter, +                               context) +        return <UVStream>tr + + +@cython.no_gc_clear +cdef class UnixTransport(UVStream): + +    @staticmethod +    cdef UnixTransport new(Loop loop, object protocol, Server server, +                           object waiter, object context): + +        cdef UnixTransport handle +        handle = UnixTransport.__new__(UnixTransport) +        handle._init(loop, protocol, server, waiter, context) +        __pipe_init_uv_handle(<UVStream>handle, loop) +        return handle + +    cdef _new_socket(self): +        return __pipe_get_socket(<UVSocketHandle>self) + +    cdef _open(self, int sockfd): +        __pipe_open(<UVStream>self, sockfd) + +    cdef connect(self, char* addr): +        cdef _PipeConnectRequest req +        req = _PipeConnectRequest(self._loop, self) +        req.connect(addr) + + +@cython.no_gc_clear +cdef class ReadUnixTransport(UVStream): + +    @staticmethod +    cdef ReadUnixTransport new(Loop loop, object protocol, Server server, +                               object waiter): +        cdef ReadUnixTransport handle +        handle = ReadUnixTransport.__new__(ReadUnixTransport) +        # This is only used in connect_read_pipe() and subprocess_shell/exec() +        # directly, we could simply copy the current context. +        handle._init(loop, protocol, server, waiter, Context_CopyCurrent()) +        __pipe_init_uv_handle(<UVStream>handle, loop) +        return handle + +    cdef _new_socket(self): +        return __pipe_get_socket(<UVSocketHandle>self) + +    cdef _open(self, int sockfd): +        __pipe_open(<UVStream>self, sockfd) + +    def get_write_buffer_limits(self): +        raise NotImplementedError + +    def set_write_buffer_limits(self, high=None, low=None): +        raise NotImplementedError + +    def get_write_buffer_size(self): +        raise NotImplementedError + +    def write(self, data): +        raise NotImplementedError + +    def writelines(self, list_of_data): +        raise NotImplementedError + +    def write_eof(self): +        raise NotImplementedError + +    def can_write_eof(self): +        raise NotImplementedError + +    def abort(self): +        raise NotImplementedError + + +@cython.no_gc_clear +cdef class WriteUnixTransport(UVStream): + +    @staticmethod +    cdef WriteUnixTransport new(Loop loop, object protocol, Server server, +                                object waiter): +        cdef WriteUnixTransport handle +        handle = WriteUnixTransport.__new__(WriteUnixTransport) + +        # We listen for read events on write-end of the pipe. When +        # the read-end is close, the uv_stream_t.read callback will +        # receive an error -- we want to silence that error, and just +        # close the transport. +        handle._close_on_read_error() + +        # This is only used in connect_write_pipe() and subprocess_shell/exec() +        # directly, we could simply copy the current context. +        handle._init(loop, protocol, server, waiter, Context_CopyCurrent()) +        __pipe_init_uv_handle(<UVStream>handle, loop) +        return handle + +    cdef _new_socket(self): +        return __pipe_get_socket(<UVSocketHandle>self) + +    cdef _open(self, int sockfd): +        __pipe_open(<UVStream>self, sockfd) + +    def pause_reading(self): +        raise NotImplementedError + +    def resume_reading(self): +        raise NotImplementedError + + +cdef class _PipeConnectRequest(UVRequest): +    cdef: +        UnixTransport transport +        uv.uv_connect_t _req_data + +    def __cinit__(self, loop, transport): +        self.request = <uv.uv_req_t*> &self._req_data +        self.request.data = <void*>self +        self.transport = transport + +    cdef connect(self, char* addr): +        # uv_pipe_connect returns void +        uv.uv_pipe_connect(<uv.uv_connect_t*>self.request, +                           <uv.uv_pipe_t*>self.transport._handle, +                           addr, +                           __pipe_connect_callback) + +cdef void __pipe_connect_callback( +    uv.uv_connect_t* req, +    int status, +) noexcept with gil: +    cdef: +        _PipeConnectRequest wrapper +        UnixTransport transport + +    wrapper = <_PipeConnectRequest> req.data +    transport = wrapper.transport + +    if status < 0: +        exc = convert_error(status) +    else: +        exc = None + +    try: +        transport._on_connect(exc) +    except BaseException as ex: +        wrapper.transport._fatal_error(ex, False) +    finally: +        wrapper.on_done() diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/poll.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/poll.pxd new file mode 100644 index 0000000..d07030b --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/poll.pxd @@ -0,0 +1,25 @@ +cdef class UVPoll(UVHandle): +    cdef: +        int fd +        Handle reading_handle +        Handle writing_handle + +    cdef _init(self, Loop loop, int fd) +    cdef _close(self) + +    cdef inline _poll_start(self, int flags) +    cdef inline _poll_stop(self) + +    cdef int is_active(self) + +    cdef is_reading(self) +    cdef is_writing(self) + +    cdef start_reading(self, Handle callback) +    cdef start_writing(self, Handle callback) +    cdef stop_reading(self) +    cdef stop_writing(self) +    cdef stop(self) + +    @staticmethod +    cdef UVPoll new(Loop loop, int fd) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/poll.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/poll.pyx new file mode 100644 index 0000000..fca5981 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/poll.pyx @@ -0,0 +1,233 @@ +@cython.no_gc_clear +cdef class UVPoll(UVHandle): +    cdef _init(self, Loop loop, int fd): +        cdef int err + +        self._start_init(loop) + +        self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_poll_t)) +        if self._handle is NULL: +            self._abort_init() +            raise MemoryError() + +        err = uv.uv_poll_init(self._loop.uvloop, +                              <uv.uv_poll_t *>self._handle, fd) +        if err < 0: +            self._abort_init() +            raise convert_error(err) + +        self._finish_init() + +        self.fd = fd +        self.reading_handle = None +        self.writing_handle = None + +    @staticmethod +    cdef UVPoll new(Loop loop, int fd): +        cdef UVPoll handle +        handle = UVPoll.__new__(UVPoll) +        handle._init(loop, fd) +        return handle + +    cdef int is_active(self): +        return (self.reading_handle is not None or +                self.writing_handle is not None) + +    cdef inline _poll_start(self, int flags): +        cdef int err + +        self._ensure_alive() + +        err = uv.uv_poll_start( +            <uv.uv_poll_t*>self._handle, +            flags, +            __on_uvpoll_event) + +        if err < 0: +            exc = convert_error(err) +            self._fatal_error(exc, True) +            return + +    cdef inline _poll_stop(self): +        cdef int err + +        if not self._is_alive(): +            return + +        err = uv.uv_poll_stop(<uv.uv_poll_t*>self._handle) +        if err < 0: +            exc = convert_error(err) +            self._fatal_error(exc, True) +            return + +        cdef: +            int backend_id +            system.epoll_event dummy_event + +        if system.PLATFORM_IS_LINUX: +            # libuv doesn't remove the FD from epoll immediately +            # after uv_poll_stop or uv_poll_close, causing hard +            # to debug issue with dup-ed file descriptors causing +            # CPU burn in epoll/epoll_ctl: +            #    https://github.com/MagicStack/uvloop/issues/61 +            # +            # It's safe though to manually call epoll_ctl here, +            # after calling uv_poll_stop. + +            backend_id = uv.uv_backend_fd(self._loop.uvloop) +            if backend_id != -1: +                memset(&dummy_event, 0, sizeof(dummy_event)) +                system.epoll_ctl( +                    backend_id, +                    system.EPOLL_CTL_DEL, +                    self.fd, +                    &dummy_event)  # ignore errors + +    cdef is_reading(self): +        return self._is_alive() and self.reading_handle is not None + +    cdef is_writing(self): +        return self._is_alive() and self.writing_handle is not None + +    cdef start_reading(self, Handle callback): +        cdef: +            int mask = 0 + +        if self.reading_handle is None: +            # not reading right now, setup the handle + +            mask = uv.UV_READABLE +            if self.writing_handle is not None: +                # are we writing right now? +                mask |= uv.UV_WRITABLE + +            self._poll_start(mask) +        else: +            self.reading_handle._cancel() + +        self.reading_handle = callback + +    cdef start_writing(self, Handle callback): +        cdef: +            int mask = 0 + +        if self.writing_handle is None: +            # not writing right now, setup the handle + +            mask = uv.UV_WRITABLE +            if self.reading_handle is not None: +                # are we reading right now? +                mask |= uv.UV_READABLE + +            self._poll_start(mask) +        else: +            self.writing_handle._cancel() + +        self.writing_handle = callback + +    cdef stop_reading(self): +        if self.reading_handle is None: +            return False + +        self.reading_handle._cancel() +        self.reading_handle = None + +        if self.writing_handle is None: +            self.stop() +        else: +            self._poll_start(uv.UV_WRITABLE) + +        return True + +    cdef stop_writing(self): +        if self.writing_handle is None: +            return False + +        self.writing_handle._cancel() +        self.writing_handle = None + +        if self.reading_handle is None: +            self.stop() +        else: +            self._poll_start(uv.UV_READABLE) + +        return True + +    cdef stop(self): +        if self.reading_handle is not None: +            self.reading_handle._cancel() +            self.reading_handle = None + +        if self.writing_handle is not None: +            self.writing_handle._cancel() +            self.writing_handle = None + +        self._poll_stop() + +    cdef _close(self): +        if self.is_active(): +            self.stop() + +        UVHandle._close(<UVHandle>self) + +    cdef _fatal_error(self, exc, throw, reason=None): +        try: +            if self.reading_handle is not None: +                try: +                    self.reading_handle._run() +                except BaseException as ex: +                    self._loop._handle_exception(ex) +                self.reading_handle = None + +            if self.writing_handle is not None: +                try: +                    self.writing_handle._run() +                except BaseException as ex: +                    self._loop._handle_exception(ex) +                self.writing_handle = None + +        finally: +            self._close() + + +cdef void __on_uvpoll_event( +    uv.uv_poll_t* handle, +    int status, +    int events, +) noexcept with gil: + +    if __ensure_handle_data(<uv.uv_handle_t*>handle, "UVPoll callback") == 0: +        return + +    cdef: +        UVPoll poll = <UVPoll> handle.data + +    if status < 0: +        exc = convert_error(status) +        poll._fatal_error(exc, False) +        return + +    if ((events & (uv.UV_READABLE | uv.UV_DISCONNECT)) and +            poll.reading_handle is not None): + +        try: +            if UVLOOP_DEBUG: +                poll._loop._poll_read_events_total += 1 +            poll.reading_handle._run() +        except BaseException as ex: +            if UVLOOP_DEBUG: +                poll._loop._poll_read_cb_errors_total += 1 +            poll._error(ex, False) +            # continue code execution + +    if ((events & (uv.UV_WRITABLE | uv.UV_DISCONNECT)) and +            poll.writing_handle is not None): + +        try: +            if UVLOOP_DEBUG: +                poll._loop._poll_write_events_total += 1 +            poll.writing_handle._run() +        except BaseException as ex: +            if UVLOOP_DEBUG: +                poll._loop._poll_write_cb_errors_total += 1 +            poll._error(ex, False) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/process.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/process.pxd new file mode 100644 index 0000000..970abcf --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/process.pxd @@ -0,0 +1,80 @@ +cdef class UVProcess(UVHandle): +    cdef: +        object _returncode +        object _pid + +        object _errpipe_read +        object _errpipe_write +        object _preexec_fn +        bint _restore_signals + +        list _fds_to_close + +        # Attributes used to compose uv_process_options_t: +        uv.uv_process_options_t options +        uv.uv_stdio_container_t[3] iocnt +        list __env +        char **uv_opt_env +        list __args +        char **uv_opt_args +        char *uv_opt_file +        bytes __cwd + +    cdef _close_process_handle(self) + +    cdef _init(self, Loop loop, list args, dict env, cwd, +               start_new_session, +               _stdin, _stdout, _stderr, pass_fds, +               debug_flags, preexec_fn, restore_signals) + +    cdef _after_fork(self) + +    cdef char** __to_cstring_array(self, list arr) +    cdef _init_args(self, list args) +    cdef _init_env(self, dict env) +    cdef _init_files(self, _stdin, _stdout, _stderr) +    cdef _init_options(self, list args, dict env, cwd, start_new_session, +                       _stdin, _stdout, _stderr, bint force_fork) + +    cdef _close_after_spawn(self, int fd) + +    cdef _on_exit(self, int64_t exit_status, int term_signal) +    cdef _kill(self, int signum) + + +cdef class UVProcessTransport(UVProcess): +    cdef: +        list _exit_waiters +        list _init_futs +        bint _stdio_ready +        list _pending_calls +        object _protocol +        bint _finished + +        WriteUnixTransport _stdin +        ReadUnixTransport _stdout +        ReadUnixTransport _stderr + +        object stdin_proto +        object stdout_proto +        object stderr_proto + +    cdef _file_redirect_stdio(self, int fd) +    cdef _file_devnull(self) +    cdef _file_inpipe(self) +    cdef _file_outpipe(self) + +    cdef _check_proc(self) +    cdef _pipe_connection_lost(self, int fd, exc) +    cdef _pipe_data_received(self, int fd, data) + +    cdef _call_connection_made(self, waiter) +    cdef _try_finish(self) + +    @staticmethod +    cdef UVProcessTransport new(Loop loop, protocol, args, env, cwd, +                                start_new_session, +                                _stdin, _stdout, _stderr, pass_fds, +                                waiter, +                                debug_flags, +                                preexec_fn, restore_signals) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx new file mode 100644 index 0000000..63b982a --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/process.pyx @@ -0,0 +1,792 @@ +@cython.no_gc_clear +cdef class UVProcess(UVHandle): +    """Abstract class; wrapper over uv_process_t handle.""" + +    def __cinit__(self): +        self.uv_opt_env = NULL +        self.uv_opt_args = NULL +        self._returncode = None +        self._pid = None +        self._fds_to_close = list() +        self._preexec_fn = None +        self._restore_signals = True +        self.context = Context_CopyCurrent() + +    cdef _close_process_handle(self): +        # XXX: This is a workaround for a libuv bug: +        # - https://github.com/libuv/libuv/issues/1933 +        # - https://github.com/libuv/libuv/pull/551 +        if self._handle is NULL: +            return +        self._handle.data = NULL +        uv.uv_close(self._handle, __uv_close_process_handle_cb) +        self._handle = NULL  # close callback will free() the memory + +    cdef _init(self, Loop loop, list args, dict env, +               cwd, start_new_session, +               _stdin, _stdout, _stderr,  # std* can be defined as macros in C +               pass_fds, debug_flags, preexec_fn, restore_signals): + +        global __forking +        global __forking_loop +        global __forkHandler + +        cdef int err + +        self._start_init(loop) + +        self._handle = <uv.uv_handle_t*>PyMem_RawMalloc( +            sizeof(uv.uv_process_t)) +        if self._handle is NULL: +            self._abort_init() +            raise MemoryError() + +        # Too early to call _finish_init, but still a lot of work to do. +        # Let's set handle.data to NULL, so in case something goes wrong, +        # callbacks have a chance to avoid casting *something* into UVHandle. +        self._handle.data = NULL + +        force_fork = False +        if system.PLATFORM_IS_APPLE and not ( +            preexec_fn is None +            and not pass_fds +        ): +            # see _execute_child() in CPython/subprocess.py +            force_fork = True + +        try: +            self._init_options(args, env, cwd, start_new_session, +                               _stdin, _stdout, _stderr, force_fork) + +            restore_inheritable = set() +            if pass_fds: +                for fd in pass_fds: +                    if not os_get_inheritable(fd): +                        restore_inheritable.add(fd) +                        os_set_inheritable(fd, True) +        except Exception: +            self._abort_init() +            raise + +        if __forking or loop.active_process_handler is not None: +            # Our pthread_atfork handlers won't work correctly when +            # another loop is forking in another thread (even though +            # GIL should help us to avoid that.) +            self._abort_init() +            raise RuntimeError( +                'Racing with another loop to spawn a process.') + +        self._errpipe_read, self._errpipe_write = os_pipe() +        fds_to_close = self._fds_to_close +        self._fds_to_close = None +        fds_to_close.append(self._errpipe_read) +        # add the write pipe last so we can close it early +        fds_to_close.append(self._errpipe_write) +        try: +            os_set_inheritable(self._errpipe_write, True) + +            self._preexec_fn = preexec_fn +            self._restore_signals = restore_signals + +            loop.active_process_handler = self +            __forking = 1 +            __forking_loop = loop +            system.setForkHandler(<system.OnForkHandler>&__get_fork_handler) + +            PyOS_BeforeFork() + +            err = uv.uv_spawn(loop.uvloop, +                              <uv.uv_process_t*>self._handle, +                              &self.options) + +            __forking = 0 +            __forking_loop = None +            system.resetForkHandler() +            loop.active_process_handler = None + +            PyOS_AfterFork_Parent() + +            if err < 0: +                self._close_process_handle() +                self._abort_init() +                raise convert_error(err) + +            self._finish_init() + +            # close the write pipe early +            os_close(fds_to_close.pop()) + +            if preexec_fn is not None: +                errpipe_data = bytearray() +                while True: +                    # XXX: This is a blocking code that has to be +                    # rewritten (using loop.connect_read_pipe() or +                    # otherwise.) +                    part = os_read(self._errpipe_read, 50000) +                    errpipe_data += part +                    if not part or len(errpipe_data) > 50000: +                        break + +        finally: +            while fds_to_close: +                os_close(fds_to_close.pop()) + +            for fd in restore_inheritable: +                os_set_inheritable(fd, False) + +        # asyncio caches the PID in BaseSubprocessTransport, +        # so that the transport knows what the PID was even +        # after the process is finished. +        self._pid = (<uv.uv_process_t*>self._handle).pid + +        # Track the process handle (create a strong ref to it) +        # to guarantee that __dealloc__ doesn't happen in an +        # uncontrolled fashion.  We want to wait until the process +        # exits and libuv calls __uvprocess_on_exit_callback, +        # which will call `UVProcess._close()`, which will, in turn, +        # untrack this handle. +        self._loop._track_process(self) + +        if debug_flags & __PROCESS_DEBUG_SLEEP_AFTER_FORK: +            time_sleep(1) + +        if preexec_fn is not None and errpipe_data: +            # preexec_fn has raised an exception.  The child +            # process must be dead now. +            try: +                exc_name, exc_msg = errpipe_data.split(b':', 1) +                exc_name = exc_name.decode() +                exc_msg = exc_msg.decode() +            except Exception: +                self._close() +                raise subprocess_SubprocessError( +                    'Bad exception data from child: {!r}'.format( +                        errpipe_data)) +            exc_cls = getattr(__builtins__, exc_name, +                              subprocess_SubprocessError) + +            exc = subprocess_SubprocessError( +                'Exception occurred in preexec_fn.') +            exc.__cause__ = exc_cls(exc_msg) +            self._close() +            raise exc + +    cdef _after_fork(self): +        # See CPython/_posixsubprocess.c for details +        cdef int err + +        if self._restore_signals: +            _Py_RestoreSignals() + +        PyOS_AfterFork_Child() + +        err = uv.uv_loop_fork(self._loop.uvloop) +        if err < 0: +            raise convert_error(err) + +        if self._preexec_fn is not None: +            try: +                gc_disable() +                self._preexec_fn() +            except BaseException as ex: +                try: +                    with open(self._errpipe_write, 'wb') as f: +                        f.write(str(ex.__class__.__name__).encode()) +                        f.write(b':') +                        f.write(str(ex.args[0]).encode()) +                finally: +                    system._exit(255) +                    return +            else: +                os_close(self._errpipe_write) +        else: +            os_close(self._errpipe_write) + +    cdef _close_after_spawn(self, int fd): +        if self._fds_to_close is None: +            raise RuntimeError( +                'UVProcess._close_after_spawn called after uv_spawn') +        self._fds_to_close.append(fd) + +    def __dealloc__(self): +        if self.uv_opt_env is not NULL: +            PyMem_RawFree(self.uv_opt_env) +            self.uv_opt_env = NULL + +        if self.uv_opt_args is not NULL: +            PyMem_RawFree(self.uv_opt_args) +            self.uv_opt_args = NULL + +    cdef char** __to_cstring_array(self, list arr): +        cdef: +            int i +            ssize_t arr_len = len(arr) +            bytes el + +            char **ret + +        ret = <char **>PyMem_RawMalloc((arr_len + 1) * sizeof(char *)) +        if ret is NULL: +            raise MemoryError() + +        for i in range(arr_len): +            el = arr[i] +            # NB: PyBytes_AsString doesn't copy the data; +            # we have to be careful when the "arr" is GCed, +            # and it shouldn't be ever mutated. +            ret[i] = PyBytes_AsString(el) + +        ret[arr_len] = NULL +        return ret + +    cdef _init_options(self, list args, dict env, cwd, start_new_session, +                       _stdin, _stdout, _stderr, bint force_fork): + +        memset(&self.options, 0, sizeof(uv.uv_process_options_t)) + +        self._init_env(env) +        self.options.env = self.uv_opt_env + +        self._init_args(args) +        self.options.file = self.uv_opt_file +        self.options.args = self.uv_opt_args + +        if start_new_session: +            self.options.flags |= uv.UV_PROCESS_DETACHED + +        if force_fork: +            # This is a hack to work around the change in libuv 1.44: +            #    > macos: use posix_spawn instead of fork +            # where Python subprocess options like preexec_fn are +            # crippled. CPython only uses posix_spawn under a pretty +            # strict list of conditions (see subprocess.py), and falls +            # back to using fork() otherwise. We'd like to simulate such +            # behavior with libuv, but unfortunately libuv doesn't +            # provide explicit API to choose such implementation detail. +            # Based on current (libuv 1.46) behavior, setting +            # UV_PROCESS_SETUID or UV_PROCESS_SETGID would reliably make +            # libuv fallback to use fork, so let's just use it for now. +            self.options.flags |= uv.UV_PROCESS_SETUID +            self.options.uid = uv.getuid() + +        if cwd is not None: +            cwd = os_fspath(cwd) + +            if isinstance(cwd, str): +                cwd = PyUnicode_EncodeFSDefault(cwd) +            if not isinstance(cwd, bytes): +                raise ValueError('cwd must be a str or bytes object') + +            self.__cwd = cwd +            self.options.cwd = PyBytes_AsString(self.__cwd) + +        self.options.exit_cb = &__uvprocess_on_exit_callback + +        self._init_files(_stdin, _stdout, _stderr) + +    cdef _init_args(self, list args): +        cdef: +            bytes path +            int an = len(args) + +        if an < 1: +            raise ValueError('cannot spawn a process: args are empty') + +        self.__args = args.copy() +        for i in range(an): +            arg = os_fspath(args[i]) +            if isinstance(arg, str): +                self.__args[i] = PyUnicode_EncodeFSDefault(arg) +            elif not isinstance(arg, bytes): +                raise TypeError('all args must be str or bytes') + +        path = self.__args[0] +        self.uv_opt_file = PyBytes_AsString(path) +        self.uv_opt_args = self.__to_cstring_array(self.__args) + +    cdef _init_env(self, dict env): +        if env is not None: +            self.__env = list() +            for key in env: +                val = env[key] + +                if isinstance(key, str): +                    key = PyUnicode_EncodeFSDefault(key) +                elif not isinstance(key, bytes): +                    raise TypeError( +                        'all environment vars must be bytes or str') + +                if isinstance(val, str): +                    val = PyUnicode_EncodeFSDefault(val) +                elif not isinstance(val, bytes): +                    raise TypeError( +                        'all environment values must be bytes or str') + +                self.__env.append(key + b'=' + val) + +            self.uv_opt_env = self.__to_cstring_array(self.__env) +        else: +            self.__env = None + +    cdef _init_files(self, _stdin, _stdout, _stderr): +        self.options.stdio_count = 0 + +    cdef _kill(self, int signum): +        cdef int err +        self._ensure_alive() +        err = uv.uv_process_kill(<uv.uv_process_t*>self._handle, signum) +        if err < 0: +            raise convert_error(err) + +    cdef _on_exit(self, int64_t exit_status, int term_signal): +        if term_signal: +            # From Python docs: +            #    A negative value -N indicates that the child was +            #    terminated by signal N (POSIX only). +            self._returncode = -term_signal +        else: +            self._returncode = exit_status + +        self._close() + +    cdef _close(self): +        try: +            if self._loop is not None: +                self._loop._untrack_process(self) +        finally: +            UVHandle._close(self) + + +DEF _CALL_PIPE_DATA_RECEIVED = 0 +DEF _CALL_PIPE_CONNECTION_LOST = 1 +DEF _CALL_PROCESS_EXITED = 2 +DEF _CALL_CONNECTION_LOST = 3 + + +@cython.no_gc_clear +cdef class UVProcessTransport(UVProcess): +    def __cinit__(self): +        self._exit_waiters = [] +        self._protocol = None + +        self._init_futs = [] +        self._pending_calls = [] +        self._stdio_ready = 0 + +        self._stdin = self._stdout = self._stderr = None +        self.stdin_proto = self.stdout_proto = self.stderr_proto = None + +        self._finished = 0 + +    cdef _on_exit(self, int64_t exit_status, int term_signal): +        UVProcess._on_exit(self, exit_status, term_signal) + +        if self._stdio_ready: +            self._loop.call_soon(self._protocol.process_exited, +                                 context=self.context) +        else: +            self._pending_calls.append((_CALL_PROCESS_EXITED, None, None)) + +        self._try_finish() + +        for waiter in self._exit_waiters: +            if not waiter.cancelled(): +                waiter.set_result(self._returncode) +        self._exit_waiters.clear() + +        self._close() + +    cdef _check_proc(self): +        if not self._is_alive() or self._returncode is not None: +            raise ProcessLookupError() + +    cdef _pipe_connection_lost(self, int fd, exc): +        if self._stdio_ready: +            self._loop.call_soon(self._protocol.pipe_connection_lost, fd, exc, +                                 context=self.context) +            self._try_finish() +        else: +            self._pending_calls.append((_CALL_PIPE_CONNECTION_LOST, fd, exc)) + +    cdef _pipe_data_received(self, int fd, data): +        if self._stdio_ready: +            self._loop.call_soon(self._protocol.pipe_data_received, fd, data, +                                 context=self.context) +        else: +            self._pending_calls.append((_CALL_PIPE_DATA_RECEIVED, fd, data)) + +    cdef _file_redirect_stdio(self, int fd): +        fd = os_dup(fd) +        os_set_inheritable(fd, True) +        self._close_after_spawn(fd) +        return fd + +    cdef _file_devnull(self): +        dn = os_open(os_devnull, os_O_RDWR) +        os_set_inheritable(dn, True) +        self._close_after_spawn(dn) +        return dn + +    cdef _file_outpipe(self): +        r, w = __socketpair() +        os_set_inheritable(w, True) +        self._close_after_spawn(w) +        return r, w + +    cdef _file_inpipe(self): +        r, w = __socketpair() +        os_set_inheritable(r, True) +        self._close_after_spawn(r) +        return r, w + +    cdef _init_files(self, _stdin, _stdout, _stderr): +        cdef uv.uv_stdio_container_t *iocnt + +        UVProcess._init_files(self, _stdin, _stdout, _stderr) + +        io = [None, None, None] + +        self.options.stdio_count = 3 +        self.options.stdio = self.iocnt + +        if _stdin is not None: +            if _stdin == subprocess_PIPE: +                r, w = self._file_inpipe() +                io[0] = r + +                self.stdin_proto = WriteSubprocessPipeProto(self, 0) +                waiter = self._loop._new_future() +                self._stdin = WriteUnixTransport.new( +                    self._loop, self.stdin_proto, None, waiter) +                self._init_futs.append(waiter) +                self._stdin._open(w) +                self._stdin._init_protocol() +            elif _stdin == subprocess_DEVNULL: +                io[0] = self._file_devnull() +            elif _stdout == subprocess_STDOUT: +                raise ValueError( +                    'subprocess.STDOUT is supported only by stderr parameter') +            else: +                io[0] = self._file_redirect_stdio(_stdin) +        else: +            io[0] = self._file_redirect_stdio(0) + +        if _stdout is not None: +            if _stdout == subprocess_PIPE: +                # We can't use UV_CREATE_PIPE here, since 'stderr' might be +                # set to 'subprocess.STDOUT', and there is no way to +                # emulate that functionality with libuv high-level +                # streams API. Therefore, we create pipes for stdout and +                # stderr manually. + +                r, w = self._file_outpipe() +                io[1] = w + +                self.stdout_proto = ReadSubprocessPipeProto(self, 1) +                waiter = self._loop._new_future() +                self._stdout = ReadUnixTransport.new( +                    self._loop, self.stdout_proto, None, waiter) +                self._init_futs.append(waiter) +                self._stdout._open(r) +                self._stdout._init_protocol() +            elif _stdout == subprocess_DEVNULL: +                io[1] = self._file_devnull() +            elif _stdout == subprocess_STDOUT: +                raise ValueError( +                    'subprocess.STDOUT is supported only by stderr parameter') +            else: +                io[1] = self._file_redirect_stdio(_stdout) +        else: +            io[1] = self._file_redirect_stdio(1) + +        if _stderr is not None: +            if _stderr == subprocess_PIPE: +                r, w = self._file_outpipe() +                io[2] = w + +                self.stderr_proto = ReadSubprocessPipeProto(self, 2) +                waiter = self._loop._new_future() +                self._stderr = ReadUnixTransport.new( +                    self._loop, self.stderr_proto, None, waiter) +                self._init_futs.append(waiter) +                self._stderr._open(r) +                self._stderr._init_protocol() +            elif _stderr == subprocess_STDOUT: +                if io[1] is None: +                    # shouldn't ever happen +                    raise RuntimeError('cannot apply subprocess.STDOUT') + +                io[2] = self._file_redirect_stdio(io[1]) +            elif _stderr == subprocess_DEVNULL: +                io[2] = self._file_devnull() +            else: +                io[2] = self._file_redirect_stdio(_stderr) +        else: +            io[2] = self._file_redirect_stdio(2) + +        assert len(io) == 3 +        for idx in range(3): +            iocnt = &self.iocnt[idx] +            if io[idx] is not None: +                iocnt.flags = uv.UV_INHERIT_FD +                iocnt.data.fd = io[idx] +            else: +                iocnt.flags = uv.UV_IGNORE + +    cdef _call_connection_made(self, waiter): +        try: +            # we're always called in the right context, so just call the user's +            self._protocol.connection_made(self) +        except (KeyboardInterrupt, SystemExit): +            raise +        except BaseException as ex: +            if waiter is not None and not waiter.cancelled(): +                waiter.set_exception(ex) +            else: +                raise +        else: +            if waiter is not None and not waiter.cancelled(): +                waiter.set_result(True) + +        self._stdio_ready = 1 +        if self._pending_calls: +            pending_calls = self._pending_calls.copy() +            self._pending_calls.clear() +            for (type, fd, arg) in pending_calls: +                if type == _CALL_PIPE_CONNECTION_LOST: +                    self._pipe_connection_lost(fd, arg) +                elif type == _CALL_PIPE_DATA_RECEIVED: +                    self._pipe_data_received(fd, arg) +                elif type == _CALL_PROCESS_EXITED: +                    self._loop.call_soon(self._protocol.process_exited) +                elif type == _CALL_CONNECTION_LOST: +                    self._loop.call_soon(self._protocol.connection_lost, None) + +    cdef _try_finish(self): +        if self._returncode is None or self._finished: +            return + +        if ((self.stdin_proto is None or self.stdin_proto.disconnected) and +                (self.stdout_proto is None or +                    self.stdout_proto.disconnected) and +                (self.stderr_proto is None or +                    self.stderr_proto.disconnected)): + +            self._finished = 1 + +            if self._stdio_ready: +                # copy self.context for simplicity +                self._loop.call_soon(self._protocol.connection_lost, None, +                                     context=self.context) +            else: +                self._pending_calls.append((_CALL_CONNECTION_LOST, None, None)) + +    def __stdio_inited(self, waiter, stdio_fut): +        exc = stdio_fut.exception() +        if exc is not None: +            if waiter is None: +                raise exc +            else: +                waiter.set_exception(exc) +        else: +            self._loop._call_soon_handle( +                new_MethodHandle1(self._loop, +                                  "UVProcessTransport._call_connection_made", +                                  <method1_t>self._call_connection_made, +                                  None,  # means to copy the current context +                                  self, waiter)) + +    @staticmethod +    cdef UVProcessTransport new(Loop loop, protocol, args, env, +                                cwd, start_new_session, +                                _stdin, _stdout, _stderr, pass_fds, +                                waiter, +                                debug_flags, +                                preexec_fn, +                                restore_signals): + +        cdef UVProcessTransport handle +        handle = UVProcessTransport.__new__(UVProcessTransport) +        handle._protocol = protocol +        handle._init(loop, args, env, cwd, start_new_session, +                     __process_convert_fileno(_stdin), +                     __process_convert_fileno(_stdout), +                     __process_convert_fileno(_stderr), +                     pass_fds, +                     debug_flags, +                     preexec_fn, +                     restore_signals) + +        if handle._init_futs: +            handle._stdio_ready = 0 +            init_fut = aio_gather(*handle._init_futs) +            # add_done_callback will copy the current context and run the +            # callback within the context +            init_fut.add_done_callback( +                ft_partial(handle.__stdio_inited, waiter)) +        else: +            handle._stdio_ready = 1 +            loop._call_soon_handle( +                new_MethodHandle1(loop, +                                  "UVProcessTransport._call_connection_made", +                                  <method1_t>handle._call_connection_made, +                                  None,  # means to copy the current context +                                  handle, waiter)) + +        return handle + +    def get_protocol(self): +        return self._protocol + +    def set_protocol(self, protocol): +        self._protocol = protocol + +    def get_pid(self): +        return self._pid + +    def get_returncode(self): +        return self._returncode + +    def get_pipe_transport(self, fd): +        if fd == 0: +            return self._stdin +        elif fd == 1: +            return self._stdout +        elif fd == 2: +            return self._stderr + +    def terminate(self): +        self._check_proc() +        self._kill(uv.SIGTERM) + +    def kill(self): +        self._check_proc() +        self._kill(uv.SIGKILL) + +    def send_signal(self, int signal): +        self._check_proc() +        self._kill(signal) + +    def is_closing(self): +        return self._closed + +    def close(self): +        if self._returncode is None: +            self._kill(uv.SIGKILL) + +        if self._stdin is not None: +            self._stdin.close() +        if self._stdout is not None: +            self._stdout.close() +        if self._stderr is not None: +            self._stderr.close() + +        if self._returncode is not None: +            # The process is dead, just close the UV handle. +            # +            # (If "self._returncode is None", the process should have been +            # killed already and we're just waiting for a SIGCHLD; after +            # which the transport will be GC'ed and the uvhandle will be +            # closed in UVHandle.__dealloc__.) +            self._close() + +    def get_extra_info(self, name, default=None): +        return default + +    def _wait(self): +        fut = self._loop._new_future() +        if self._returncode is not None: +            fut.set_result(self._returncode) +            return fut + +        self._exit_waiters.append(fut) +        return fut + + +class WriteSubprocessPipeProto(aio_BaseProtocol): + +    def __init__(self, proc, fd): +        if UVLOOP_DEBUG: +            if type(proc) is not UVProcessTransport: +                raise TypeError +            if not isinstance(fd, int): +                raise TypeError +        self.proc = proc +        self.fd = fd +        self.pipe = None +        self.disconnected = False + +    def connection_made(self, transport): +        self.pipe = transport + +    def __repr__(self): +        return ('<%s fd=%s pipe=%r>' +                % (self.__class__.__name__, self.fd, self.pipe)) + +    def connection_lost(self, exc): +        self.disconnected = True +        (<UVProcessTransport>self.proc)._pipe_connection_lost(self.fd, exc) +        self.proc = None + +    def pause_writing(self): +        (<UVProcessTransport>self.proc)._protocol.pause_writing() + +    def resume_writing(self): +        (<UVProcessTransport>self.proc)._protocol.resume_writing() + + +class ReadSubprocessPipeProto(WriteSubprocessPipeProto, +                              aio_Protocol): + +    def data_received(self, data): +        (<UVProcessTransport>self.proc)._pipe_data_received(self.fd, data) + + +cdef __process_convert_fileno(object obj): +    if obj is None or isinstance(obj, int): +        return obj + +    fileno = obj.fileno() +    if not isinstance(fileno, int): +        raise TypeError( +            '{!r}.fileno() returned non-integer'.format(obj)) +    return fileno + + +cdef void __uvprocess_on_exit_callback( +    uv.uv_process_t *handle, +    int64_t exit_status, +    int term_signal, +) noexcept with gil: + +    if __ensure_handle_data(<uv.uv_handle_t*>handle, +                            "UVProcess exit callback") == 0: +        return + +    cdef UVProcess proc = <UVProcess> handle.data +    try: +        proc._on_exit(exit_status, term_signal) +    except BaseException as ex: +        proc._error(ex, False) + + +cdef __socketpair(): +    cdef: +        int fds[2] +        int err + +    err = system.socketpair(uv.AF_UNIX, uv.SOCK_STREAM, 0, fds) +    if err: +        exc = convert_error(-err) +        raise exc + +    os_set_inheritable(fds[0], False) +    os_set_inheritable(fds[1], False) + +    return fds[0], fds[1] + + +cdef void __uv_close_process_handle_cb( +    uv.uv_handle_t* handle +) noexcept with gil: +    PyMem_RawFree(handle) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/stream.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/stream.pxd new file mode 100644 index 0000000..8ca8743 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/stream.pxd @@ -0,0 +1,50 @@ +cdef class UVStream(UVBaseTransport): +    cdef: +        uv.uv_shutdown_t _shutdown_req +        bint __shutting_down +        bint __reading +        bint __read_error_close + +        bint __buffered +        object _protocol_get_buffer +        object _protocol_buffer_updated + +        bint _eof +        list _buffer +        size_t _buffer_size + +        Py_buffer _read_pybuf +        bint _read_pybuf_acquired + +    # All "inline" methods are final + +    cdef inline _init(self, Loop loop, object protocol, Server server, +                      object waiter, object context) + + +    cdef inline _shutdown(self) +    cdef inline _accept(self, UVStream server) + +    cdef inline _close_on_read_error(self) + +    cdef inline __reading_started(self) +    cdef inline __reading_stopped(self) + +    # The user API write() and writelines() firstly call _buffer_write() to +    # buffer up user data chunks, potentially multiple times in writelines(), +    # and then call _initiate_write() to start writing either immediately or in +    # the next iteration (loop._queue_write()). +    cdef inline _buffer_write(self, object data) +    cdef inline _initiate_write(self) + +    # _exec_write() is the method that does the actual send, and _try_write() +    # is a fast-path used in _exec_write() to send a single chunk. +    cdef inline _exec_write(self) +    cdef inline _try_write(self, object data) + +    cdef _close(self) + +    cdef inline _on_accept(self) +    cdef inline _on_eof(self) +    cdef inline _on_write(self) +    cdef inline _on_connect(self, object exc) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx new file mode 100644 index 0000000..d4e02e3 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/stream.pyx @@ -0,0 +1,1015 @@ +DEF __PREALLOCED_BUFS = 4 + + +@cython.no_gc_clear +@cython.freelist(DEFAULT_FREELIST_SIZE) +cdef class _StreamWriteContext: +    # used to hold additional write request information for uv_write + +    cdef: +        uv.uv_write_t   req + +        list            buffers + +        uv.uv_buf_t     uv_bufs_sml[__PREALLOCED_BUFS] +        Py_buffer       py_bufs_sml[__PREALLOCED_BUFS] +        bint            py_bufs_sml_inuse + +        uv.uv_buf_t*    uv_bufs +        Py_buffer*      py_bufs +        size_t          py_bufs_len + +        uv.uv_buf_t*    uv_bufs_start +        size_t          uv_bufs_len + +        UVStream        stream + +        bint            closed + +    cdef free_bufs(self): +        cdef size_t i + +        if self.uv_bufs is not NULL: +            PyMem_RawFree(self.uv_bufs) +            self.uv_bufs = NULL +            if UVLOOP_DEBUG: +                if self.py_bufs_sml_inuse: +                    raise RuntimeError( +                        '_StreamWriteContext.close: uv_bufs != NULL and ' +                        'py_bufs_sml_inuse is True') + +        if self.py_bufs is not NULL: +            for i from 0 <= i < self.py_bufs_len: +                PyBuffer_Release(&self.py_bufs[i]) +            PyMem_RawFree(self.py_bufs) +            self.py_bufs = NULL +            if UVLOOP_DEBUG: +                if self.py_bufs_sml_inuse: +                    raise RuntimeError( +                        '_StreamWriteContext.close: py_bufs != NULL and ' +                        'py_bufs_sml_inuse is True') + +        if self.py_bufs_sml_inuse: +            for i from 0 <= i < self.py_bufs_len: +                PyBuffer_Release(&self.py_bufs_sml[i]) +            self.py_bufs_sml_inuse = 0 + +        self.py_bufs_len = 0 +        self.buffers = None + +    cdef close(self): +        if self.closed: +            return +        self.closed = 1 +        self.free_bufs() +        Py_DECREF(self) + +    cdef advance_uv_buf(self, size_t sent): +        # Advance the pointer to first uv_buf and the +        # pointer to first byte in that buffer. +        # +        # We do this after a "uv_try_write" call, which +        # sometimes sends only a portion of data. +        # We then call "advance_uv_buf" on the write +        # context, and reuse it in a "uv_write" call. + +        cdef: +            uv.uv_buf_t* buf +            size_t idx + +        for idx from 0 <= idx < self.uv_bufs_len: +            buf = &self.uv_bufs_start[idx] +            if buf.len > sent: +                buf.len -= sent +                buf.base = buf.base + sent +                self.uv_bufs_start = buf +                self.uv_bufs_len -= idx +                return +            else: +                sent -= self.uv_bufs_start[idx].len + +            if UVLOOP_DEBUG: +                if sent < 0: +                    raise RuntimeError('fatal: sent < 0 in advance_uv_buf') + +        raise RuntimeError('fatal: Could not advance _StreamWriteContext') + +    @staticmethod +    cdef _StreamWriteContext new(UVStream stream, list buffers): +        cdef: +            _StreamWriteContext ctx +            int uv_bufs_idx = 0 +            size_t py_bufs_len = 0 +            int i + +            Py_buffer* p_pybufs +            uv.uv_buf_t* p_uvbufs + +        ctx = _StreamWriteContext.__new__(_StreamWriteContext) +        ctx.stream = None +        ctx.closed = 1 +        ctx.py_bufs_len = 0 +        ctx.py_bufs_sml_inuse = 0 +        ctx.uv_bufs = NULL +        ctx.py_bufs = NULL +        ctx.buffers = buffers +        ctx.stream = stream + +        if len(buffers) <= __PREALLOCED_BUFS: +            # We've got a small number of buffers to write, don't +            # need to use malloc. +            ctx.py_bufs_sml_inuse = 1 +            p_pybufs = <Py_buffer*>&ctx.py_bufs_sml +            p_uvbufs = <uv.uv_buf_t*>&ctx.uv_bufs_sml + +        else: +            for buf in buffers: +                if UVLOOP_DEBUG: +                    if not isinstance(buf, (bytes, bytearray, memoryview)): +                        raise RuntimeError( +                            'invalid data in writebuf: an instance of ' +                            'bytes, bytearray or memoryview was expected, ' +                            'got {}'.format(type(buf))) + +                if not PyBytes_CheckExact(buf): +                    py_bufs_len += 1 + +            if py_bufs_len > 0: +                ctx.py_bufs = <Py_buffer*>PyMem_RawMalloc( +                    py_bufs_len * sizeof(Py_buffer)) +                if ctx.py_bufs is NULL: +                    raise MemoryError() + +            ctx.uv_bufs = <uv.uv_buf_t*>PyMem_RawMalloc( +                len(buffers) * sizeof(uv.uv_buf_t)) +            if ctx.uv_bufs is NULL: +                raise MemoryError() + +            p_pybufs = ctx.py_bufs +            p_uvbufs = ctx.uv_bufs + +        py_bufs_len = 0 +        for buf in buffers: +            if PyBytes_CheckExact(buf): +                # We can only use this hack for bytes since it's +                # immutable.  For everything else it is only safe to +                # use buffer protocol. +                p_uvbufs[uv_bufs_idx].base = PyBytes_AS_STRING(buf) +                p_uvbufs[uv_bufs_idx].len = Py_SIZE(buf) + +            else: +                try: +                    PyObject_GetBuffer( +                        buf, &p_pybufs[py_bufs_len], PyBUF_SIMPLE) +                except Exception: +                    # This shouldn't ever happen, as `UVStream._buffer_write` +                    # casts non-bytes objects to `memoryviews`. +                    ctx.py_bufs_len = py_bufs_len +                    ctx.free_bufs() +                    raise + +                p_uvbufs[uv_bufs_idx].base = <char*>p_pybufs[py_bufs_len].buf +                p_uvbufs[uv_bufs_idx].len = p_pybufs[py_bufs_len].len + +                py_bufs_len += 1 + +            uv_bufs_idx += 1 + +        ctx.uv_bufs_start = p_uvbufs +        ctx.uv_bufs_len = uv_bufs_idx + +        ctx.py_bufs_len = py_bufs_len +        ctx.req.data = <void*> ctx + +        if UVLOOP_DEBUG: +            stream._loop._debug_stream_write_ctx_total += 1 +            stream._loop._debug_stream_write_ctx_cnt += 1 + +        # Do incref after everything else is done. +        # Under no circumstances we want `ctx` to be GCed while +        # libuv is still working with `ctx.uv_bufs`. +        Py_INCREF(ctx) +        ctx.closed = 0 +        return ctx + +    def __dealloc__(self): +        if not self.closed: +            # Because we do an INCREF in _StreamWriteContext.new, +            # __dealloc__ shouldn't ever happen with `self.closed == 1` +            raise RuntimeError( +                'open _StreamWriteContext is being deallocated') + +        if UVLOOP_DEBUG: +            if self.stream is not None: +                self.stream._loop._debug_stream_write_ctx_cnt -= 1 +                self.stream = None + + +@cython.no_gc_clear +cdef class UVStream(UVBaseTransport): + +    def __cinit__(self): +        self.__shutting_down = 0 +        self.__reading = 0 +        self.__read_error_close = 0 +        self.__buffered = 0 +        self._eof = 0 +        self._buffer = [] +        self._buffer_size = 0 + +        self._protocol_get_buffer = None +        self._protocol_buffer_updated = None + +        self._read_pybuf_acquired = False + +    cdef _set_protocol(self, object protocol): +        if protocol is None: +            raise TypeError('protocol is required') + +        UVBaseTransport._set_protocol(self, protocol) + +        if (hasattr(protocol, 'get_buffer') and +                not isinstance(protocol, aio_Protocol)): +            try: +                self._protocol_get_buffer = protocol.get_buffer +                self._protocol_buffer_updated = protocol.buffer_updated +                self.__buffered = 1 +            except AttributeError: +                pass +        else: +            self.__buffered = 0 + +    cdef _clear_protocol(self): +        UVBaseTransport._clear_protocol(self) +        self._protocol_get_buffer = None +        self._protocol_buffer_updated = None +        self.__buffered = 0 + +    cdef inline _shutdown(self): +        cdef int err + +        if self.__shutting_down: +            return +        self.__shutting_down = 1 + +        self._ensure_alive() + +        self._shutdown_req.data = <void*> self +        err = uv.uv_shutdown(&self._shutdown_req, +                             <uv.uv_stream_t*> self._handle, +                             __uv_stream_on_shutdown) +        if err < 0: +            exc = convert_error(err) +            self._fatal_error(exc, True) +            return + +    cdef inline _accept(self, UVStream server): +        cdef int err +        self._ensure_alive() + +        err = uv.uv_accept(<uv.uv_stream_t*>server._handle, +                           <uv.uv_stream_t*>self._handle) +        if err < 0: +            exc = convert_error(err) +            self._fatal_error(exc, True) +            return + +        self._on_accept() + +    cdef inline _close_on_read_error(self): +        self.__read_error_close = 1 + +    cdef bint _is_reading(self): +        return self.__reading + +    cdef _start_reading(self): +        cdef int err + +        if self._closing: +            return + +        self._ensure_alive() + +        if self.__reading: +            return + +        if self.__buffered: +            err = uv.uv_read_start(<uv.uv_stream_t*>self._handle, +                                   __uv_stream_buffered_alloc, +                                   __uv_stream_buffered_on_read) +        else: +            err = uv.uv_read_start(<uv.uv_stream_t*>self._handle, +                                   __loop_alloc_buffer, +                                   __uv_stream_on_read) +        if err < 0: +            exc = convert_error(err) +            self._fatal_error(exc, True) +            return +        else: +            # UVStream must live until the read callback is called +            self.__reading_started() + +    cdef inline __reading_started(self): +        if self.__reading: +            return +        self.__reading = 1 +        Py_INCREF(self) + +    cdef inline __reading_stopped(self): +        if not self.__reading: +            return +        self.__reading = 0 +        Py_DECREF(self) + +    cdef _stop_reading(self): +        cdef int err + +        if not self.__reading: +            return + +        self._ensure_alive() + +        # From libuv docs: +        #    This function is idempotent and may be safely +        #    called on a stopped stream. +        err = uv.uv_read_stop(<uv.uv_stream_t*>self._handle) +        if err < 0: +            exc = convert_error(err) +            self._fatal_error(exc, True) +            return +        else: +            self.__reading_stopped() + +    cdef inline _try_write(self, object data): +        cdef: +            ssize_t written +            bint used_buf = 0 +            Py_buffer py_buf +            void* buf +            size_t blen +            int saved_errno +            int fd + +        if (<uv.uv_stream_t*>self._handle).write_queue_size != 0: +            raise RuntimeError( +                'UVStream._try_write called with data in uv buffers') + +        if PyBytes_CheckExact(data): +            # We can only use this hack for bytes since it's +            # immutable.  For everything else it is only safe to +            # use buffer protocol. +            buf = <void*>PyBytes_AS_STRING(data) +            blen = Py_SIZE(data) +        else: +            PyObject_GetBuffer(data, &py_buf, PyBUF_SIMPLE) +            used_buf = 1 +            buf = py_buf.buf +            blen = py_buf.len + +        if blen == 0: +            # Empty data, do nothing. +            return 0 + +        fd = self._fileno() +        # Use `unistd.h/write` directly, it's faster than +        # uv_try_write -- less layers of code.  The error +        # checking logic is copied from libuv. +        written = system.write(fd, buf, blen) +        while written == -1 and ( +                errno.errno == errno.EINTR or +                (system.PLATFORM_IS_APPLE and +                    errno.errno == errno.EPROTOTYPE)): +            # From libuv code (unix/stream.c): +            #   Due to a possible kernel bug at least in OS X 10.10 "Yosemite", +            #   EPROTOTYPE can be returned while trying to write to a socket +            #   that is shutting down. If we retry the write, we should get +            #   the expected EPIPE instead. +            written = system.write(fd, buf, blen) +        saved_errno = errno.errno + +        if used_buf: +            PyBuffer_Release(&py_buf) + +        if written < 0: +            if saved_errno == errno.EAGAIN or \ +                    saved_errno == system.EWOULDBLOCK: +                return -1 +            else: +                exc = convert_error(-saved_errno) +                self._fatal_error(exc, True) +                return + +        if UVLOOP_DEBUG: +            self._loop._debug_stream_write_tries += 1 + +        if <size_t>written == blen: +            return 0 + +        return written + +    cdef inline _buffer_write(self, object data): +        cdef int dlen + +        if not PyBytes_CheckExact(data): +            data = memoryview(data).cast('b') + +        dlen = len(data) +        if not dlen: +            return + +        self._buffer_size += dlen +        self._buffer.append(data) + +    cdef inline _initiate_write(self): +        if (not self._protocol_paused and +                (<uv.uv_stream_t*>self._handle).write_queue_size == 0 and +                self._buffer_size > self._high_water): +            # Fast-path.  If: +            #   - the protocol isn't yet paused, +            #   - there is no data in libuv buffers for this stream, +            #   - the protocol will be paused if we continue to buffer data +            # +            # Then: +            #   - Try to write all buffered data right now. +            all_sent = self._exec_write() +            if UVLOOP_DEBUG: +                if self._buffer_size != 0 or self._buffer != []: +                    raise RuntimeError( +                        '_buffer_size is not 0 after a successful _exec_write') + +            # There is no need to call `_queue_write` anymore, +            # as `uv_write` should be called already. + +            if not all_sent: +                # If not all of the data was sent successfully, +                # we might need to pause the protocol. +                self._maybe_pause_protocol() + +        elif self._buffer_size > 0: +            self._maybe_pause_protocol() +            self._loop._queue_write(self) + +    cdef inline _exec_write(self): +        cdef: +            int err +            int buf_len +            _StreamWriteContext ctx = None + +        if self._closed: +            # If the handle is closed, just return, it's too +            # late to do anything. +            return + +        buf_len = len(self._buffer) +        if not buf_len: +            return + +        if (<uv.uv_stream_t*>self._handle).write_queue_size == 0: +            # libuv internal write buffers for this stream are empty. +            if buf_len == 1: +                # If we only have one piece of data to send, let's +                # use our fast implementation of try_write. +                data = self._buffer[0] +                sent = self._try_write(data) + +                if sent is None: +                    # A `self._fatal_error` was called. +                    # It might not raise an exception under some +                    # conditions. +                    self._buffer_size = 0 +                    self._buffer.clear() +                    if not self._closing: +                        # This should never happen. +                        raise RuntimeError( +                            'stream is open after UVStream._try_write ' +                            'returned None') +                    return + +                if sent == 0: +                    # All data was successfully written. +                    self._buffer_size = 0 +                    self._buffer.clear() +                    # on_write will call "maybe_resume_protocol". +                    self._on_write() +                    return True + +                if sent > 0: +                    if UVLOOP_DEBUG: +                        if sent == len(data): +                            raise RuntimeError( +                                '_try_write sent all data and returned ' +                                'non-zero') + +                    if PyBytes_CheckExact(data): +                        # Cast bytes to memoryview to avoid copying +                        # data that wasn't sent. +                        data = memoryview(data) +                    data = data[sent:] + +                    self._buffer_size -= sent +                    self._buffer[0] = data + +                # At this point it's either data was sent partially, +                # or an EAGAIN has happened. + +            else: +                ctx = _StreamWriteContext.new(self, self._buffer) + +                err = uv.uv_try_write(<uv.uv_stream_t*>self._handle, +                                      ctx.uv_bufs_start, +                                      ctx.uv_bufs_len) + +                if err > 0: +                    # Some data was successfully sent. + +                    if <size_t>err == self._buffer_size: +                        # Everything was sent. +                        ctx.close() +                        self._buffer.clear() +                        self._buffer_size = 0 +                        # on_write will call "maybe_resume_protocol". +                        self._on_write() +                        return True + +                    try: +                        # Advance pointers to uv_bufs in `ctx`, +                        # we will reuse it soon for a uv_write +                        # call. +                        ctx.advance_uv_buf(<ssize_t>err) +                    except Exception as ex:  # This should never happen. +                        # Let's try to close the `ctx` anyways. +                        ctx.close() +                        self._fatal_error(ex, True) +                        self._buffer.clear() +                        self._buffer_size = 0 +                        return + +                elif err != uv.UV_EAGAIN: +                    ctx.close() +                    exc = convert_error(err) +                    self._fatal_error(exc, True) +                    self._buffer.clear() +                    self._buffer_size = 0 +                    return + +                # fall through + +        if ctx is None: +            ctx = _StreamWriteContext.new(self, self._buffer) + +        err = uv.uv_write(&ctx.req, +                          <uv.uv_stream_t*>self._handle, +                          ctx.uv_bufs_start, +                          ctx.uv_bufs_len, +                          __uv_stream_on_write) + +        self._buffer_size = 0 +        # Can't use `_buffer.clear()` here: `ctx` holds a reference to +        # the `_buffer`. +        self._buffer = [] + +        if err < 0: +            # close write context +            ctx.close() + +            exc = convert_error(err) +            self._fatal_error(exc, True) +            return + +        self._maybe_resume_protocol() + +    cdef size_t _get_write_buffer_size(self): +        if self._handle is NULL: +            return 0 +        return ((<uv.uv_stream_t*>self._handle).write_queue_size + +                self._buffer_size) + +    cdef _close(self): +        try: +            if self._read_pybuf_acquired: +                # Should never happen. libuv always calls uv_alloc/uv_read +                # in pairs. +                self._loop.call_exception_handler({ +                    'transport': self, +                    'message': 'XXX: an allocated buffer in transport._close()' +                }) +                self._read_pybuf_acquired = 0 +                PyBuffer_Release(&self._read_pybuf) + +            self._stop_reading() +        finally: +            UVSocketHandle._close(<UVHandle>self) + +    cdef inline _on_accept(self): +        # Ultimately called by __uv_stream_on_listen. +        self._init_protocol() + +    cdef inline _on_eof(self): +        # Any exception raised here will be caught in +        # __uv_stream_on_read. + +        try: +            meth = self._protocol.eof_received +        except AttributeError: +            keep_open = False +        else: +            keep_open = run_in_context(self.context, meth) + +        if keep_open: +            # We're keeping the connection open so the +            # protocol can write more, but we still can't +            # receive more, so remove the reader callback. +            self._stop_reading() +        else: +            self.close() + +    cdef inline _on_write(self): +        self._maybe_resume_protocol() +        if not self._get_write_buffer_size(): +            if self._closing: +                self._schedule_call_connection_lost(None) +            elif self._eof: +                self._shutdown() + +    cdef inline _init(self, Loop loop, object protocol, Server server, +                      object waiter, object context): +        self.context = context +        self._set_protocol(protocol) +        self._start_init(loop) + +        if server is not None: +            self._set_server(server) + +        if waiter is not None: +            self._set_waiter(waiter) + +    cdef inline _on_connect(self, object exc): +        # Called from __tcp_connect_callback (tcp.pyx) and +        # __pipe_connect_callback (pipe.pyx). +        if exc is None: +            self._init_protocol() +        else: +            if self._waiter is None: +                self._fatal_error(exc, False, "connect failed") +            elif self._waiter.cancelled(): +                # Connect call was cancelled; just close the transport +                # silently. +                self._close() +            elif self._waiter.done(): +                self._fatal_error(exc, False, "connect failed") +            else: +                self._waiter.set_exception(exc) +                self._close() + +    # === Public API === + +    def __repr__(self): +        return '<{} closed={} reading={} {:#x}>'.format( +            self.__class__.__name__, +            self._closed, +            self.__reading, +            id(self)) + +    def write(self, object buf): +        self._ensure_alive() + +        if self._eof: +            raise RuntimeError('Cannot call write() after write_eof()') +        if not buf: +            return +        if self._conn_lost: +            self._conn_lost += 1 +            return +        self._buffer_write(buf) +        self._initiate_write() + +    def writelines(self, bufs): +        self._ensure_alive() + +        if self._eof: +            raise RuntimeError('Cannot call writelines() after write_eof()') +        if self._conn_lost: +            self._conn_lost += 1 +            return +        for buf in bufs: +            self._buffer_write(buf) +        self._initiate_write() + +    def write_eof(self): +        self._ensure_alive() + +        if self._eof: +            return + +        self._eof = 1 +        if not self._get_write_buffer_size(): +            self._shutdown() + +    def can_write_eof(self): +        return True + +    def is_reading(self): +        return self._is_reading() + +    def pause_reading(self): +        if self._closing or not self._is_reading(): +            return +        self._stop_reading() + +    def resume_reading(self): +        if self._is_reading() or self._closing: +            return +        self._start_reading() + + +cdef void __uv_stream_on_shutdown(uv.uv_shutdown_t* req, +                                  int status) noexcept with gil: + +    # callback for uv_shutdown + +    if req.data is NULL: +        aio_logger.error( +            'UVStream.shutdown callback called with NULL req.data, status=%r', +            status) +        return + +    cdef UVStream stream = <UVStream> req.data + +    if status < 0 and status != uv.UV_ECANCELED: +        # From libuv source code: +        #     The ECANCELED error code is a lie, the shutdown(2) syscall is a +        #     fait accompli at this point. Maybe we should revisit this in +        #     v0.11.  A possible reason for leaving it unchanged is that it +        #     informs the callee that the handle has been destroyed. + +        if UVLOOP_DEBUG: +            stream._loop._debug_stream_shutdown_errors_total += 1 + +        exc = convert_error(status) +        stream._fatal_error( +            exc, False, "error status in uv_stream_t.shutdown callback") +        return + + +cdef inline bint __uv_stream_on_read_common( +    UVStream sc, +    Loop loop, +    ssize_t nread, +): +    if sc._closed: +        # The stream was closed, there is no reason to +        # do any work now. +        sc.__reading_stopped()  # Just in case. +        return True + +    if nread == uv.UV_EOF: +        # From libuv docs: +        #     The callee is responsible for stopping closing the stream +        #     when an error happens by calling uv_read_stop() or uv_close(). +        #     Trying to read from the stream again is undefined. +        try: +            if UVLOOP_DEBUG: +                loop._debug_stream_read_eof_total += 1 + +            sc._stop_reading() +            sc._on_eof() +        except BaseException as ex: +            if UVLOOP_DEBUG: +                loop._debug_stream_read_eof_cb_errors_total += 1 + +            sc._fatal_error(ex, False) +        finally: +            return True + +    if nread == 0: +        # From libuv docs: +        #     nread might be 0, which does not indicate an error or EOF. +        #     This is equivalent to EAGAIN or EWOULDBLOCK under read(2). +        return True + +    if nread < 0: +        # From libuv docs: +        #     The callee is responsible for stopping closing the stream +        #     when an error happens by calling uv_read_stop() or uv_close(). +        #     Trying to read from the stream again is undefined. +        # +        # Therefore, we're closing the stream.  Since "UVHandle._close()" +        # doesn't raise exceptions unless uvloop is built with DEBUG=1, +        # we don't need try...finally here. + +        if UVLOOP_DEBUG: +            loop._debug_stream_read_errors_total += 1 + +        if sc.__read_error_close: +            # Used for getting notified when a pipe is closed. +            # See WriteUnixTransport for the explanation. +            sc._on_eof() +            return True + +        exc = convert_error(nread) +        sc._fatal_error( +            exc, False, "error status in uv_stream_t.read callback") +        return True + +    return False + + +cdef inline void __uv_stream_on_read_impl( +    uv.uv_stream_t* stream, +    ssize_t nread, +    const uv.uv_buf_t* buf, +): +    cdef: +        UVStream sc = <UVStream>stream.data +        Loop loop = sc._loop + +    # It's OK to free the buffer early, since nothing will +    # be able to touch it until this method is done. +    __loop_free_buffer(loop) + +    if __uv_stream_on_read_common(sc, loop, nread): +        return + +    try: +        if UVLOOP_DEBUG: +            loop._debug_stream_read_cb_total += 1 + +        run_in_context1( +            sc.context, +            sc._protocol_data_received, +            loop._recv_buffer[:nread], +        ) +    except BaseException as exc: +        if UVLOOP_DEBUG: +            loop._debug_stream_read_cb_errors_total += 1 + +        sc._fatal_error(exc, False) + + +cdef inline void __uv_stream_on_write_impl( +    uv.uv_write_t* req, +    int status, +): +    cdef: +        _StreamWriteContext ctx = <_StreamWriteContext> req.data +        UVStream stream = <UVStream>ctx.stream + +    ctx.close() + +    if stream._closed: +        # The stream was closed, there is nothing to do. +        # Even if there is an error, like EPIPE, there +        # is no reason to report it. +        return + +    if status < 0: +        if UVLOOP_DEBUG: +            stream._loop._debug_stream_write_errors_total += 1 + +        exc = convert_error(status) +        stream._fatal_error( +            exc, False, "error status in uv_stream_t.write callback") +        return + +    try: +        stream._on_write() +    except BaseException as exc: +        if UVLOOP_DEBUG: +            stream._loop._debug_stream_write_cb_errors_total += 1 + +        stream._fatal_error(exc, False) + + +cdef void __uv_stream_on_read( +    uv.uv_stream_t* stream, +    ssize_t nread, +    const uv.uv_buf_t* buf, +) noexcept with gil: + +    if __ensure_handle_data(<uv.uv_handle_t*>stream, +                            "UVStream read callback") == 0: +        return + +    # Don't need try-finally, __uv_stream_on_read_impl is void +    __uv_stream_on_read_impl(stream, nread, buf) + + +cdef void __uv_stream_on_write( +    uv.uv_write_t* req, +    int status, +) noexcept with gil: + +    if UVLOOP_DEBUG: +        if req.data is NULL: +            aio_logger.error( +                'UVStream.write callback called with NULL req.data, status=%r', +                status) +            return + +    # Don't need try-finally, __uv_stream_on_write_impl is void +    __uv_stream_on_write_impl(req, status) + + +cdef void __uv_stream_buffered_alloc( +    uv.uv_handle_t* stream, +    size_t suggested_size, +    uv.uv_buf_t* uvbuf, +) noexcept with gil: + +    if __ensure_handle_data(<uv.uv_handle_t*>stream, +                            "UVStream alloc buffer callback") == 0: +        return + +    cdef: +        UVStream sc = <UVStream>stream.data +        Loop loop = sc._loop +        Py_buffer* pybuf = &sc._read_pybuf +        int got_buf = 0 + +    if sc._read_pybuf_acquired: +        uvbuf.len = 0 +        uvbuf.base = NULL +        return + +    sc._read_pybuf_acquired = 0 +    try: +        buf = run_in_context1( +            sc.context, +            sc._protocol_get_buffer, +            suggested_size, +        ) +        PyObject_GetBuffer(buf, pybuf, PyBUF_WRITABLE) +        got_buf = 1 +    except BaseException as exc: +        # Can't call 'sc._fatal_error' or 'sc._close', libuv will SF. +        # We'll do it later in __uv_stream_buffered_on_read when we +        # receive UV_ENOBUFS. +        uvbuf.len = 0 +        uvbuf.base = NULL +        return + +    if not pybuf.len: +        uvbuf.len = 0 +        uvbuf.base = NULL +        if got_buf: +            PyBuffer_Release(pybuf) +        return + +    sc._read_pybuf_acquired = 1 +    uvbuf.base = <char*>pybuf.buf +    uvbuf.len = pybuf.len + + +cdef void __uv_stream_buffered_on_read( +    uv.uv_stream_t* stream, +    ssize_t nread, +    const uv.uv_buf_t* buf, +) noexcept with gil: + +    if __ensure_handle_data(<uv.uv_handle_t*>stream, +                            "UVStream buffered read callback") == 0: +        return + +    cdef: +        UVStream sc = <UVStream>stream.data +        Loop loop = sc._loop +        Py_buffer* pybuf = &sc._read_pybuf + +    if nread == uv.UV_ENOBUFS: +        sc._fatal_error( +            RuntimeError( +                'unhandled error (or an empty buffer) in get_buffer()'), +            False) +        return + +    try: +        if nread > 0 and not sc._read_pybuf_acquired: +            # From libuv docs: +            #     nread is > 0 if there is data available or < 0 on error. When +            #     we’ve reached EOF, nread will be set to UV_EOF. When +            #     nread < 0, the buf parameter might not point to a valid +            #     buffer; in that case buf.len and buf.base are both set to 0. +            raise RuntimeError( +                f'no python buffer is allocated in on_read; nread={nread}') + +        if nread == 0: +            # From libuv docs: +            #     nread might be 0, which does not indicate an error or EOF. +            #     This is equivalent to EAGAIN or EWOULDBLOCK under read(2). +            return + +        if __uv_stream_on_read_common(sc, loop, nread): +            return + +        if UVLOOP_DEBUG: +            loop._debug_stream_read_cb_total += 1 + +        run_in_context1(sc.context, sc._protocol_buffer_updated, nread) +    except BaseException as exc: +        if UVLOOP_DEBUG: +            loop._debug_stream_read_cb_errors_total += 1 + +        sc._fatal_error(exc, False) +    finally: +        sc._read_pybuf_acquired = 0 +        PyBuffer_Release(pybuf) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/streamserver.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/streamserver.pxd new file mode 100644 index 0000000..a004efd --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/streamserver.pxd @@ -0,0 +1,26 @@ +cdef class UVStreamServer(UVSocketHandle): +    cdef: +        int backlog +        object ssl +        object ssl_handshake_timeout +        object ssl_shutdown_timeout +        object protocol_factory +        bint opened +        Server _server + +    # All "inline" methods are final + +    cdef inline _init(self, Loop loop, object protocol_factory, +                      Server server, +                      object backlog, +                      object ssl, +                      object ssl_handshake_timeout, +                      object ssl_shutdown_timeout) + +    cdef inline _mark_as_open(self) + +    cdef inline listen(self) +    cdef inline _on_listen(self) + +    cdef UVStream _make_new_transport(self, object protocol, object waiter, +                                      object context) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/streamserver.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/streamserver.pyx new file mode 100644 index 0000000..9993317 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/streamserver.pyx @@ -0,0 +1,150 @@ +@cython.no_gc_clear +cdef class UVStreamServer(UVSocketHandle): + +    def __cinit__(self): +        self.opened = 0 +        self._server = None +        self.ssl = None +        self.ssl_handshake_timeout = None +        self.ssl_shutdown_timeout = None +        self.protocol_factory = None + +    cdef inline _init(self, Loop loop, object protocol_factory, +                      Server server, +                      object backlog, +                      object ssl, +                      object ssl_handshake_timeout, +                      object ssl_shutdown_timeout): + +        if not isinstance(backlog, int): +            # Don't allow floats +            raise TypeError('integer argument expected, got {}'.format( +                type(backlog).__name__)) + +        if ssl is not None: +            if not isinstance(ssl, ssl_SSLContext): +                raise TypeError( +                    'ssl is expected to be None or an instance of ' +                    'ssl.SSLContext, got {!r}'.format(ssl)) +        else: +            if ssl_handshake_timeout is not None: +                raise ValueError( +                    'ssl_handshake_timeout is only meaningful with ssl') +            if ssl_shutdown_timeout is not None: +                raise ValueError( +                    'ssl_shutdown_timeout is only meaningful with ssl') + +        self.backlog = backlog +        self.ssl = ssl +        self.ssl_handshake_timeout = ssl_handshake_timeout +        self.ssl_shutdown_timeout = ssl_shutdown_timeout + +        self._start_init(loop) +        self.protocol_factory = protocol_factory +        self._server = server + +    cdef inline listen(self): +        cdef int err +        self._ensure_alive() + +        if self.protocol_factory is None: +            raise RuntimeError('unable to listen(); no protocol_factory') + +        if self.opened != 1: +            raise RuntimeError('unopened TCPServer') + +        self.context = Context_CopyCurrent() + +        err = uv.uv_listen(<uv.uv_stream_t*> self._handle, +                           self.backlog, +                           __uv_streamserver_on_listen) +        if err < 0: +            exc = convert_error(err) +            self._fatal_error(exc, True) +            return + +    cdef inline _on_listen(self): +        cdef UVStream client + +        protocol = run_in_context(self.context, self.protocol_factory) + +        if self.ssl is None: +            client = self._make_new_transport(protocol, None, self.context) + +        else: +            waiter = self._loop._new_future() + +            ssl_protocol = SSLProtocol( +                self._loop, protocol, self.ssl, +                waiter, +                server_side=True, +                server_hostname=None, +                ssl_handshake_timeout=self.ssl_handshake_timeout, +                ssl_shutdown_timeout=self.ssl_shutdown_timeout) + +            client = self._make_new_transport(ssl_protocol, None, self.context) + +            waiter.add_done_callback( +                ft_partial(self.__on_ssl_connected, client)) + +        client._accept(<UVStream>self) + +    cdef _fatal_error(self, exc, throw, reason=None): +        # Overload UVHandle._fatal_error + +        self._close() + +        if not isinstance(exc, OSError): + +            if throw or self._loop is None: +                raise exc + +            msg = f'Fatal error on server {self.__class__.__name__}' +            if reason is not None: +                msg = f'{msg} ({reason})' + +            self._loop.call_exception_handler({ +                'message': msg, +                'exception': exc, +            }) + +    cdef inline _mark_as_open(self): +        self.opened = 1 + +    cdef UVStream _make_new_transport(self, object protocol, object waiter, +                                      object context): +        raise NotImplementedError + +    def __on_ssl_connected(self, transport, fut): +        exc = fut.exception() +        if exc is not None: +            transport._force_close(exc) + + +cdef void __uv_streamserver_on_listen( +    uv.uv_stream_t* handle, +    int status, +) noexcept with gil: + +    # callback for uv_listen + +    if __ensure_handle_data(<uv.uv_handle_t*>handle, +                            "UVStream listen callback") == 0: +        return + +    cdef: +        UVStreamServer stream = <UVStreamServer> handle.data + +    if status < 0: +        if UVLOOP_DEBUG: +            stream._loop._debug_stream_listen_errors_total += 1 + +        exc = convert_error(status) +        stream._fatal_error( +            exc, False, "error status in uv_stream_t.listen callback") +        return + +    try: +        stream._on_listen() +    except BaseException as exc: +        stream._error(exc, False) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/tcp.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/tcp.pxd new file mode 100644 index 0000000..8d388ef --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/tcp.pxd @@ -0,0 +1,26 @@ +cdef class TCPServer(UVStreamServer): +    cdef bind(self, system.sockaddr* addr, unsigned int flags=*) + +    @staticmethod +    cdef TCPServer new(Loop loop, object protocol_factory, Server server, +                       unsigned int flags, +                       object backlog, +                       object ssl, +                       object ssl_handshake_timeout, +                       object ssl_shutdown_timeout) + + +cdef class TCPTransport(UVStream): +    cdef: +        bint __peername_set +        bint __sockname_set +        system.sockaddr_storage __peername +        system.sockaddr_storage __sockname + +    cdef bind(self, system.sockaddr* addr, unsigned int flags=*) +    cdef connect(self, system.sockaddr* addr) +    cdef _set_nodelay(self) + +    @staticmethod +    cdef TCPTransport new(Loop loop, object protocol, Server server, +                          object waiter, object context) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/tcp.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/tcp.pyx new file mode 100644 index 0000000..d5fe827 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/tcp.pyx @@ -0,0 +1,228 @@ +cdef __tcp_init_uv_handle(UVStream handle, Loop loop, unsigned int flags): +    cdef int err + +    handle._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_tcp_t)) +    if handle._handle is NULL: +        handle._abort_init() +        raise MemoryError() + +    err = uv.uv_tcp_init_ex(handle._loop.uvloop, +                            <uv.uv_tcp_t*>handle._handle, +                            flags) +    if err < 0: +        handle._abort_init() +        raise convert_error(err) + +    handle._finish_init() + + +cdef __tcp_bind(UVStream handle, system.sockaddr* addr, unsigned int flags): +    cdef int err +    err = uv.uv_tcp_bind(<uv.uv_tcp_t *>handle._handle, +                         addr, flags) +    if err < 0: +        exc = convert_error(err) +        raise exc + + +cdef __tcp_open(UVStream handle, int sockfd): +    cdef int err +    err = uv.uv_tcp_open(<uv.uv_tcp_t *>handle._handle, +                         <uv.uv_os_sock_t>sockfd) +    if err < 0: +        exc = convert_error(err) +        raise exc + + +cdef __tcp_get_socket(UVSocketHandle handle): +    cdef: +        int buf_len = sizeof(system.sockaddr_storage) +        int fileno +        int err +        system.sockaddr_storage buf + +    fileno = handle._fileno() + +    err = uv.uv_tcp_getsockname(<uv.uv_tcp_t*>handle._handle, +                                <system.sockaddr*>&buf, +                                &buf_len) +    if err < 0: +        raise convert_error(err) + +    return PseudoSocket(buf.ss_family, uv.SOCK_STREAM, 0, fileno) + + +@cython.no_gc_clear +cdef class TCPServer(UVStreamServer): + +    @staticmethod +    cdef TCPServer new(Loop loop, object protocol_factory, Server server, +                       unsigned int flags, +                       object backlog, +                       object ssl, +                       object ssl_handshake_timeout, +                       object ssl_shutdown_timeout): + +        cdef TCPServer handle +        handle = TCPServer.__new__(TCPServer) +        handle._init(loop, protocol_factory, server, backlog, +                     ssl, ssl_handshake_timeout, ssl_shutdown_timeout) +        __tcp_init_uv_handle(<UVStream>handle, loop, flags) +        return handle + +    cdef _new_socket(self): +        return __tcp_get_socket(<UVSocketHandle>self) + +    cdef _open(self, int sockfd): +        self._ensure_alive() +        try: +            __tcp_open(<UVStream>self, sockfd) +        except Exception as exc: +            self._fatal_error(exc, True) +        else: +            self._mark_as_open() + +    cdef bind(self, system.sockaddr* addr, unsigned int flags=0): +        self._ensure_alive() +        try: +            __tcp_bind(<UVStream>self, addr, flags) +        except Exception as exc: +            self._fatal_error(exc, True) +        else: +            self._mark_as_open() + +    cdef UVStream _make_new_transport(self, object protocol, object waiter, +                                      object context): +        cdef TCPTransport tr +        tr = TCPTransport.new(self._loop, protocol, self._server, waiter, +                              context) +        return <UVStream>tr + + +@cython.no_gc_clear +cdef class TCPTransport(UVStream): + +    @staticmethod +    cdef TCPTransport new(Loop loop, object protocol, Server server, +                          object waiter, object context): + +        cdef TCPTransport handle +        handle = TCPTransport.__new__(TCPTransport) +        handle._init(loop, protocol, server, waiter, context) +        __tcp_init_uv_handle(<UVStream>handle, loop, uv.AF_UNSPEC) +        handle.__peername_set = 0 +        handle.__sockname_set = 0 +        handle._set_nodelay() +        return handle + +    cdef _set_nodelay(self): +        cdef int err +        self._ensure_alive() +        err = uv.uv_tcp_nodelay(<uv.uv_tcp_t*>self._handle, 1) +        if err < 0: +            raise convert_error(err) + +    cdef _call_connection_made(self): +        # asyncio saves peername & sockname when transports are instantiated, +        # so that they're accessible even after the transport is closed. +        # We are doing the same thing here, except that we create Python +        # objects lazily, on request in get_extra_info() + +        cdef: +            int err +            int buf_len + +        buf_len = sizeof(system.sockaddr_storage) +        err = uv.uv_tcp_getsockname(<uv.uv_tcp_t*>self._handle, +                                    <system.sockaddr*>&self.__sockname, +                                    &buf_len) +        if err >= 0: +            # Ignore errors, this is an optional thing. +            # If something serious is going on, the transport +            # will crash later (in roughly the same way how +            # an asyncio transport would.) +            self.__sockname_set = 1 + +        buf_len = sizeof(system.sockaddr_storage) +        err = uv.uv_tcp_getpeername(<uv.uv_tcp_t*>self._handle, +                                    <system.sockaddr*>&self.__peername, +                                    &buf_len) +        if err >= 0: +            # Same as few lines above -- we don't really care +            # about error case here. +            self.__peername_set = 1 + +        UVBaseTransport._call_connection_made(self) + +    def get_extra_info(self, name, default=None): +        if name == 'sockname': +            if self.__sockname_set: +                return __convert_sockaddr_to_pyaddr( +                    <system.sockaddr*>&self.__sockname) +        elif name == 'peername': +            if self.__peername_set: +                return __convert_sockaddr_to_pyaddr( +                    <system.sockaddr*>&self.__peername) +        return super().get_extra_info(name, default) + +    cdef _new_socket(self): +        return __tcp_get_socket(<UVSocketHandle>self) + +    cdef bind(self, system.sockaddr* addr, unsigned int flags=0): +        self._ensure_alive() +        __tcp_bind(<UVStream>self, addr, flags) + +    cdef _open(self, int sockfd): +        self._ensure_alive() +        __tcp_open(<UVStream>self, sockfd) + +    cdef connect(self, system.sockaddr* addr): +        cdef _TCPConnectRequest req +        req = _TCPConnectRequest(self._loop, self) +        req.connect(addr) + + +cdef class _TCPConnectRequest(UVRequest): +    cdef: +        TCPTransport transport +        uv.uv_connect_t _req_data + +    def __cinit__(self, loop, transport): +        self.request = <uv.uv_req_t*>&self._req_data +        self.request.data = <void*>self +        self.transport = transport + +    cdef connect(self, system.sockaddr* addr): +        cdef int err +        err = uv.uv_tcp_connect(<uv.uv_connect_t*>self.request, +                                <uv.uv_tcp_t*>self.transport._handle, +                                addr, +                                __tcp_connect_callback) +        if err < 0: +            exc = convert_error(err) +            self.on_done() +            raise exc + + +cdef void __tcp_connect_callback( +    uv.uv_connect_t* req, +    int status, +) noexcept with gil: +    cdef: +        _TCPConnectRequest wrapper +        TCPTransport transport + +    wrapper = <_TCPConnectRequest> req.data +    transport = wrapper.transport + +    if status < 0: +        exc = convert_error(status) +    else: +        exc = None + +    try: +        transport._on_connect(exc) +    except BaseException as ex: +        wrapper.transport._fatal_error(ex, False) +    finally: +        wrapper.on_done() diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/timer.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/timer.pxd new file mode 100644 index 0000000..fda23b6 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/timer.pxd @@ -0,0 +1,18 @@ +cdef class UVTimer(UVHandle): +    cdef: +        method_t callback +        object ctx +        bint running +        uint64_t timeout +        uint64_t start_t + +    cdef _init(self, Loop loop, method_t callback, object ctx, +               uint64_t timeout) + +    cdef stop(self) +    cdef start(self) +    cdef get_when(self) + +    @staticmethod +    cdef UVTimer new(Loop loop, method_t callback, object ctx, +                     uint64_t timeout) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/timer.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/timer.pyx new file mode 100644 index 0000000..86d46ef --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/timer.pyx @@ -0,0 +1,89 @@ +@cython.no_gc_clear +cdef class UVTimer(UVHandle): +    cdef _init(self, Loop loop, method_t callback, object ctx, +               uint64_t timeout): + +        cdef int err + +        self._start_init(loop) + +        self._handle = <uv.uv_handle_t*> PyMem_RawMalloc(sizeof(uv.uv_timer_t)) +        if self._handle is NULL: +            self._abort_init() +            raise MemoryError() + +        err = uv.uv_timer_init(self._loop.uvloop, <uv.uv_timer_t*>self._handle) +        if err < 0: +            self._abort_init() +            raise convert_error(err) + +        self._finish_init() + +        self.callback = callback +        self.ctx = ctx +        self.running = 0 +        self.timeout = timeout +        self.start_t = 0 + +    cdef stop(self): +        cdef int err + +        if not self._is_alive(): +            self.running = 0 +            return + +        if self.running == 1: +            err = uv.uv_timer_stop(<uv.uv_timer_t*>self._handle) +            self.running = 0 +            if err < 0: +                exc = convert_error(err) +                self._fatal_error(exc, True) +                return + +    cdef start(self): +        cdef int err + +        self._ensure_alive() + +        if self.running == 0: +            # Update libuv internal time. +            uv.uv_update_time(self._loop.uvloop)  # void +            self.start_t = uv.uv_now(self._loop.uvloop) + +            err = uv.uv_timer_start(<uv.uv_timer_t*>self._handle, +                                    __uvtimer_callback, +                                    self.timeout, 0) +            if err < 0: +                exc = convert_error(err) +                self._fatal_error(exc, True) +                return +            self.running = 1 + +    cdef get_when(self): +        return self.start_t + self.timeout + +    @staticmethod +    cdef UVTimer new(Loop loop, method_t callback, object ctx, +                     uint64_t timeout): + +        cdef UVTimer handle +        handle = UVTimer.__new__(UVTimer) +        handle._init(loop, callback, ctx, timeout) +        return handle + + +cdef void __uvtimer_callback( +    uv.uv_timer_t* handle, +) noexcept with gil: +    if __ensure_handle_data(<uv.uv_handle_t*>handle, "UVTimer callback") == 0: +        return + +    cdef: +        UVTimer timer = <UVTimer> handle.data +        method_t cb = timer.callback + +    timer.running = 0 +    try: +        cb(timer.ctx) +    except BaseException as ex: +        timer._error(ex, False) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/udp.pxd b/venv/lib/python3.11/site-packages/uvloop/handles/udp.pxd new file mode 100644 index 0000000..daa9a1b --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/udp.pxd @@ -0,0 +1,22 @@ +cdef class UDPTransport(UVBaseTransport): +    cdef: +        bint __receiving +        int _family +        object _address + +    cdef _init(self, Loop loop, unsigned int family) +    cdef _set_address(self, system.addrinfo *addr) + +    cdef _connect(self, system.sockaddr* addr, size_t addr_len) + +    cdef _bind(self, system.sockaddr* addr) +    cdef open(self, int family, int sockfd) +    cdef _set_broadcast(self, bint on) + +    cdef inline __receiving_started(self) +    cdef inline __receiving_stopped(self) + +    cdef _send(self, object data, object addr) + +    cdef _on_receive(self, bytes data, object exc, object addr) +    cdef _on_sent(self, object exc, object context=*) diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/udp.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/udp.pyx new file mode 100644 index 0000000..bbe60d5 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/udp.pyx @@ -0,0 +1,409 @@ +@cython.no_gc_clear +@cython.freelist(DEFAULT_FREELIST_SIZE) +cdef class _UDPSendContext: +    # used to hold additional write request information for uv_write + +    cdef: +        uv.uv_udp_send_t   req + +        uv.uv_buf_t     uv_buf +        Py_buffer       py_buf + +        UDPTransport    udp + +        bint            closed + +    cdef close(self): +        if self.closed: +            return + +        self.closed = 1 +        PyBuffer_Release(&self.py_buf)  # void +        self.req.data = NULL +        self.uv_buf.base = NULL +        Py_DECREF(self) +        self.udp = None + +    @staticmethod +    cdef _UDPSendContext new(UDPTransport udp, object data): +        cdef _UDPSendContext ctx +        ctx = _UDPSendContext.__new__(_UDPSendContext) +        ctx.udp = None +        ctx.closed = 1 + +        ctx.req.data = <void*> ctx +        Py_INCREF(ctx) + +        PyObject_GetBuffer(data, &ctx.py_buf, PyBUF_SIMPLE) +        ctx.uv_buf.base = <char*>ctx.py_buf.buf +        ctx.uv_buf.len = ctx.py_buf.len +        ctx.udp = udp + +        ctx.closed = 0 +        return ctx + +    def __dealloc__(self): +        if UVLOOP_DEBUG: +            if not self.closed: +                raise RuntimeError( +                    'open _UDPSendContext is being deallocated') +        self.udp = None + + +@cython.no_gc_clear +cdef class UDPTransport(UVBaseTransport): +    def __cinit__(self): +        self._family = uv.AF_UNSPEC +        self.__receiving = 0 +        self._address = None +        self.context = Context_CopyCurrent() + +    cdef _init(self, Loop loop, unsigned int family): +        cdef int err + +        self._start_init(loop) + +        self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_udp_t)) +        if self._handle is NULL: +            self._abort_init() +            raise MemoryError() + +        err = uv.uv_udp_init_ex(loop.uvloop, +                                <uv.uv_udp_t*>self._handle, +                                family) +        if err < 0: +            self._abort_init() +            raise convert_error(err) + +        if family in (uv.AF_INET, uv.AF_INET6): +            self._family = family + +        self._finish_init() + +    cdef _set_address(self, system.addrinfo *addr): +        self._address = __convert_sockaddr_to_pyaddr(addr.ai_addr) + +    cdef _connect(self, system.sockaddr* addr, size_t addr_len): +        cdef int err +        err = uv.uv_udp_connect(<uv.uv_udp_t*>self._handle, addr) +        if err < 0: +            exc = convert_error(err) +            raise exc + +    cdef open(self, int family, int sockfd): +        if family in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX): +            self._family = family +        else: +            raise ValueError( +                'cannot open a UDP handle, invalid family {}'.format(family)) + +        cdef int err +        err = uv.uv_udp_open(<uv.uv_udp_t*>self._handle, +                             <uv.uv_os_sock_t>sockfd) + +        if err < 0: +            exc = convert_error(err) +            raise exc + +    cdef _bind(self, system.sockaddr* addr): +        cdef: +            int err +            int flags = 0 + +        self._ensure_alive() + +        err = uv.uv_udp_bind(<uv.uv_udp_t*>self._handle, addr, flags) +        if err < 0: +            exc = convert_error(err) +            raise exc + +    cdef _set_broadcast(self, bint on): +        cdef int err + +        self._ensure_alive() + +        err = uv.uv_udp_set_broadcast(<uv.uv_udp_t*>self._handle, on) +        if err < 0: +            exc = convert_error(err) +            raise exc + +    cdef size_t _get_write_buffer_size(self): +        if self._handle is NULL: +            return 0 +        return (<uv.uv_udp_t*>self._handle).send_queue_size + +    cdef bint _is_reading(self): +        return self.__receiving + +    cdef _start_reading(self): +        cdef int err + +        if self.__receiving: +            return + +        self._ensure_alive() + +        err = uv.uv_udp_recv_start(<uv.uv_udp_t*>self._handle, +                                   __loop_alloc_buffer, +                                   __uv_udp_on_receive) + +        if err < 0: +            exc = convert_error(err) +            self._fatal_error(exc, True) +            return +        else: +            # UDPTransport must live until the read callback is called +            self.__receiving_started() + +    cdef _stop_reading(self): +        cdef int err + +        if not self.__receiving: +            return + +        self._ensure_alive() + +        err = uv.uv_udp_recv_stop(<uv.uv_udp_t*>self._handle) +        if err < 0: +            exc = convert_error(err) +            self._fatal_error(exc, True) +            return +        else: +            self.__receiving_stopped() + +    cdef inline __receiving_started(self): +        if self.__receiving: +            return +        self.__receiving = 1 +        Py_INCREF(self) + +    cdef inline __receiving_stopped(self): +        if not self.__receiving: +            return +        self.__receiving = 0 +        Py_DECREF(self) + +    cdef _new_socket(self): +        if self._family not in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX): +            raise RuntimeError( +                'UDPTransport.family is undefined; ' +                'cannot create python socket') + +        fileno = self._fileno() +        return PseudoSocket(self._family, uv.SOCK_DGRAM, 0, fileno) + +    cdef _send(self, object data, object addr): +        cdef: +            _UDPSendContext ctx +            system.sockaddr_storage saddr_st +            system.sockaddr *saddr +            Py_buffer       try_pybuf +            uv.uv_buf_t     try_uvbuf + +        self._ensure_alive() + +        if self._family not in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX): +            raise RuntimeError('UDPTransport.family is undefined; cannot send') + +        if addr is None: +            saddr = NULL +        else: +            try: +                __convert_pyaddr_to_sockaddr(self._family, addr, +                                             <system.sockaddr*>&saddr_st) +            except (ValueError, TypeError): +                raise +            except Exception: +                raise ValueError( +                    f'{addr!r}: socket family mismatch or ' +                    f'a DNS lookup is required') +            saddr = <system.sockaddr*>(&saddr_st) + +        if self._get_write_buffer_size() == 0: +            PyObject_GetBuffer(data, &try_pybuf, PyBUF_SIMPLE) +            try_uvbuf.base = <char*>try_pybuf.buf +            try_uvbuf.len = try_pybuf.len +            err = uv.uv_udp_try_send(<uv.uv_udp_t*>self._handle, +                                     &try_uvbuf, +                                     1, +                                     saddr) +            PyBuffer_Release(&try_pybuf) +        else: +            err = uv.UV_EAGAIN + +        if err == uv.UV_EAGAIN: +            ctx = _UDPSendContext.new(self, data) +            err = uv.uv_udp_send(&ctx.req, +                                 <uv.uv_udp_t*>self._handle, +                                 &ctx.uv_buf, +                                 1, +                                 saddr, +                                 __uv_udp_on_send) + +            if err < 0: +                ctx.close() + +                exc = convert_error(err) +                self._fatal_error(exc, True) +            else: +                self._maybe_pause_protocol() + +        else: +            if err < 0: +                exc = convert_error(err) +                self._fatal_error(exc, True) +            else: +                self._on_sent(None, self.context.copy()) + +    cdef _on_receive(self, bytes data, object exc, object addr): +        if exc is None: +            run_in_context2( +                self.context, self._protocol.datagram_received, data, addr, +            ) +        else: +            run_in_context1(self.context, self._protocol.error_received, exc) + +    cdef _on_sent(self, object exc, object context=None): +        if exc is not None: +            if isinstance(exc, OSError): +                if context is None: +                    context = self.context +                run_in_context1(context, self._protocol.error_received, exc) +            else: +                self._fatal_error( +                    exc, False, 'Fatal write error on datagram transport') + +        self._maybe_resume_protocol() +        if not self._get_write_buffer_size(): +            if self._closing: +                self._schedule_call_connection_lost(None) + +    # === Public API === + +    def sendto(self, data, addr=None): +        if not data: +            # Replicating asyncio logic here. +            return + +        if self._address: +            if addr not in (None, self._address): +                # Replicating asyncio logic here. +                raise ValueError( +                    'Invalid address: must be None or %s' % (self._address,)) + +            # Instead of setting addr to self._address below like what asyncio +            # does, we depend on previous uv_udp_connect() to set the address +            addr = None + +        if self._conn_lost: +            # Replicating asyncio logic here. +            if self._conn_lost >= LOG_THRESHOLD_FOR_CONNLOST_WRITES: +                aio_logger.warning('socket.send() raised exception.') +            self._conn_lost += 1 +            return + +        self._send(data, addr) + + +cdef void __uv_udp_on_receive( +    uv.uv_udp_t* handle, +    ssize_t nread, +    const uv.uv_buf_t* buf, +    const system.sockaddr* addr, +    unsigned flags +) noexcept with gil: + +    if __ensure_handle_data(<uv.uv_handle_t*>handle, +                            "UDPTransport receive callback") == 0: +        return + +    cdef: +        UDPTransport udp = <UDPTransport>handle.data +        Loop loop = udp._loop +        bytes data +        object pyaddr + +    # It's OK to free the buffer early, since nothing will +    # be able to touch it until this method is done. +    __loop_free_buffer(loop) + +    if udp._closed: +        # The handle was closed, there is no reason to +        # do any work now. +        udp.__receiving_stopped()  # Just in case. +        return + +    if addr is NULL and nread == 0: +        # From libuv docs: +        #      addr: struct sockaddr* containing the address +        #      of the sender. Can be NULL. Valid for the duration +        #      of the callback only. +        #      [...] +        #      The receive callback will be called with +        #      nread == 0 and addr == NULL when there is +        #      nothing to read, and with nread == 0 and +        #      addr != NULL when an empty UDP packet is +        #      received. +        return + +    if addr is NULL: +        pyaddr = None +    elif addr.sa_family == uv.AF_UNSPEC: +        # https://github.com/MagicStack/uvloop/issues/304 +        if system.PLATFORM_IS_LINUX: +            pyaddr = None +        else: +            pyaddr = '' +    else: +        try: +            pyaddr = __convert_sockaddr_to_pyaddr(addr) +        except BaseException as exc: +            udp._error(exc, False) +            return + +    if nread < 0: +        exc = convert_error(nread) +        udp._on_receive(None, exc, pyaddr) +        return + +    if nread == 0: +        data = b'' +    else: +        data = loop._recv_buffer[:nread] + +    try: +        udp._on_receive(data, None, pyaddr) +    except BaseException as exc: +        udp._error(exc, False) + + +cdef void __uv_udp_on_send( +    uv.uv_udp_send_t* req, +    int status, +) noexcept with gil: + +    if req.data is NULL: +        # Shouldn't happen as: +        #    - _UDPSendContext does an extra INCREF in its 'init()' +        #    - _UDPSendContext holds a ref to the relevant UDPTransport +        aio_logger.error( +            'UVStream.write callback called with NULL req.data, status=%r', +            status) +        return + +    cdef: +        _UDPSendContext ctx = <_UDPSendContext> req.data +        UDPTransport udp = <UDPTransport>ctx.udp + +    ctx.close() + +    if status < 0: +        exc = convert_error(status) +        print(exc) +    else: +        exc = None + +    try: +        udp._on_sent(exc) +    except BaseException as exc: +        udp._error(exc, False) | 
