diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/uvloop/loop.pyx')
-rw-r--r-- | venv/lib/python3.11/site-packages/uvloop/loop.pyx | 3403 |
1 files changed, 3403 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/uvloop/loop.pyx b/venv/lib/python3.11/site-packages/uvloop/loop.pyx new file mode 100644 index 0000000..334d8d5 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/loop.pyx @@ -0,0 +1,3403 @@ +# cython: language_level=3, embedsignature=True + +import asyncio +cimport cython + +from .includes.debug cimport UVLOOP_DEBUG +from .includes cimport uv +from .includes cimport system +from .includes.python cimport ( + PY_VERSION_HEX, + PyMem_RawMalloc, PyMem_RawFree, + PyMem_RawCalloc, PyMem_RawRealloc, + PyUnicode_EncodeFSDefault, + PyErr_SetInterrupt, + _Py_RestoreSignals, + Context_CopyCurrent, + Context_Enter, + Context_Exit, + PyMemoryView_FromMemory, PyBUF_WRITE, + PyMemoryView_FromObject, PyMemoryView_Check, + PyOS_AfterFork_Parent, PyOS_AfterFork_Child, + PyOS_BeforeFork, + PyUnicode_FromString +) +from .includes.flowcontrol cimport add_flowcontrol_defaults + +from libc.stdint cimport uint64_t +from libc.string cimport memset, strerror, memcpy +from libc cimport errno + +from cpython cimport PyObject +from cpython cimport PyErr_CheckSignals, PyErr_Occurred +from cpython cimport PyThread_get_thread_ident +from cpython cimport Py_INCREF, Py_DECREF, Py_XDECREF, Py_XINCREF +from cpython cimport ( + PyObject_GetBuffer, PyBuffer_Release, PyBUF_SIMPLE, + Py_buffer, PyBytes_AsString, PyBytes_CheckExact, + PyBytes_AsStringAndSize, + Py_SIZE, PyBytes_AS_STRING, PyBUF_WRITABLE +) +from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer + +from . import _noop + + +include "includes/consts.pxi" +include "includes/stdlib.pxi" + +include "errors.pyx" + +cdef: + int PY39 = PY_VERSION_HEX >= 0x03090000 + int PY311 = PY_VERSION_HEX >= 0x030b0000 + uint64_t MAX_SLEEP = 3600 * 24 * 365 * 100 + + +cdef _is_sock_stream(sock_type): + if SOCK_NONBLOCK == -1: + return sock_type == uv.SOCK_STREAM + else: + # Linux's socket.type is a bitmask that can include extra info + # about socket (like SOCK_NONBLOCK bit), therefore we can't do simple + # `sock_type == socket.SOCK_STREAM`, see + # https://github.com/torvalds/linux/blob/v4.13/include/linux/net.h#L77 + # for more details. + return (sock_type & 0xF) == uv.SOCK_STREAM + + +cdef _is_sock_dgram(sock_type): + if SOCK_NONBLOCK == -1: + return sock_type == uv.SOCK_DGRAM + else: + # Read the comment in `_is_sock_stream`. + return (sock_type & 0xF) == uv.SOCK_DGRAM + + +cdef isfuture(obj): + if aio_isfuture is None: + return isinstance(obj, aio_Future) + else: + return aio_isfuture(obj) + + +cdef inline socket_inc_io_ref(sock): + if isinstance(sock, socket_socket): + sock._io_refs += 1 + + +cdef inline socket_dec_io_ref(sock): + if isinstance(sock, socket_socket): + sock._decref_socketios() + + +cdef inline run_in_context(context, method): + # This method is internally used to workaround a reference issue that in + # certain circumstances, inlined context.run() will not hold a reference to + # the given method instance, which - if deallocated - will cause segfault. + # See also: edgedb/edgedb#2222 + Py_INCREF(method) + try: + return context.run(method) + finally: + Py_DECREF(method) + + +cdef inline run_in_context1(context, method, arg): + Py_INCREF(method) + try: + return context.run(method, arg) + finally: + Py_DECREF(method) + + +cdef inline run_in_context2(context, method, arg1, arg2): + Py_INCREF(method) + try: + return context.run(method, arg1, arg2) + finally: + Py_DECREF(method) + + +# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s +# *reuse_address* parameter +_unset = object() + + +@cython.no_gc_clear +cdef class Loop: + def __cinit__(self): + cdef int err + + # Install PyMem* memory allocators if they aren't installed yet. + __install_pymem() + + # Install pthread_atfork handlers + __install_atfork() + + self.uvloop = <uv.uv_loop_t*>PyMem_RawMalloc(sizeof(uv.uv_loop_t)) + if self.uvloop is NULL: + raise MemoryError() + + self.slow_callback_duration = 0.1 + + self._closed = 0 + self._debug = 0 + self._thread_id = 0 + self._running = 0 + self._stopping = 0 + + self._transports = weakref_WeakValueDictionary() + self._processes = set() + + # Used to keep a reference (and hence keep the fileobj alive) + # for as long as its registered by add_reader or add_writer. + # This is how the selector module and hence asyncio behaves. + self._fd_to_reader_fileobj = {} + self._fd_to_writer_fileobj = {} + + self._timers = set() + self._polls = {} + + self._recv_buffer_in_use = 0 + + err = uv.uv_loop_init(self.uvloop) + if err < 0: + raise convert_error(err) + self.uvloop.data = <void*> self + + self._init_debug_fields() + + self.active_process_handler = None + + self._last_error = None + + self._task_factory = None + self._exception_handler = None + self._default_executor = None + + self._queued_streams = set() + self._executing_streams = set() + self._ready = col_deque() + self._ready_len = 0 + + self.handler_async = UVAsync.new( + self, <method_t>self._on_wake, self) + + self.handler_idle = UVIdle.new( + self, + new_MethodHandle( + self, "loop._on_idle", <method_t>self._on_idle, None, self)) + + # Needed to call `UVStream._exec_write` for writes scheduled + # during `Protocol.data_received`. + self.handler_check__exec_writes = UVCheck.new( + self, + new_MethodHandle( + self, "loop._exec_queued_writes", + <method_t>self._exec_queued_writes, None, self)) + + self._signals = set() + self._ssock = self._csock = None + self._signal_handlers = {} + self._listening_signals = False + self._old_signal_wakeup_id = -1 + + self._coroutine_debug_set = False + + # A weak set of all asynchronous generators that are + # being iterated by the loop. + self._asyncgens = weakref_WeakSet() + + # Set to True when `loop.shutdown_asyncgens` is called. + self._asyncgens_shutdown_called = False + # Set to True when `loop.shutdown_default_executor` is called. + self._executor_shutdown_called = False + + self._servers = set() + + cdef inline _is_main_thread(self): + cdef uint64_t main_thread_id = system.MAIN_THREAD_ID + if system.MAIN_THREAD_ID_SET == 0: + main_thread_id = <uint64_t>threading_main_thread().ident + system.setMainThreadID(main_thread_id) + return main_thread_id == PyThread_get_thread_ident() + + def __init__(self): + self.set_debug( + sys_dev_mode or (not sys_ignore_environment + and bool(os_environ.get('PYTHONASYNCIODEBUG')))) + + def __dealloc__(self): + if self._running == 1: + raise RuntimeError('deallocating a running event loop!') + if self._closed == 0: + aio_logger.error("deallocating an open event loop") + return + PyMem_RawFree(self.uvloop) + self.uvloop = NULL + + cdef _init_debug_fields(self): + self._debug_cc = bool(UVLOOP_DEBUG) + + if UVLOOP_DEBUG: + self._debug_handles_current = col_Counter() + self._debug_handles_closed = col_Counter() + self._debug_handles_total = col_Counter() + else: + self._debug_handles_current = None + self._debug_handles_closed = None + self._debug_handles_total = None + + self._debug_uv_handles_total = 0 + self._debug_uv_handles_freed = 0 + + self._debug_stream_read_cb_total = 0 + self._debug_stream_read_eof_total = 0 + self._debug_stream_read_errors_total = 0 + self._debug_stream_read_cb_errors_total = 0 + self._debug_stream_read_eof_cb_errors_total = 0 + + self._debug_stream_shutdown_errors_total = 0 + self._debug_stream_listen_errors_total = 0 + + self._debug_stream_write_tries = 0 + self._debug_stream_write_errors_total = 0 + self._debug_stream_write_ctx_total = 0 + self._debug_stream_write_ctx_cnt = 0 + self._debug_stream_write_cb_errors_total = 0 + + self._debug_cb_handles_total = 0 + self._debug_cb_handles_count = 0 + + self._debug_cb_timer_handles_total = 0 + self._debug_cb_timer_handles_count = 0 + + self._poll_read_events_total = 0 + self._poll_read_cb_errors_total = 0 + self._poll_write_events_total = 0 + self._poll_write_cb_errors_total = 0 + + self._sock_try_write_total = 0 + + self._debug_exception_handler_cnt = 0 + + cdef _setup_or_resume_signals(self): + if not self._is_main_thread(): + return + + if self._listening_signals: + raise RuntimeError('signals handling has been already setup') + + if self._ssock is not None: + raise RuntimeError('self-pipe exists before loop run') + + # Create a self-pipe and call set_signal_wakeup_fd() with one + # of its ends. This is needed so that libuv knows that it needs + # to wakeup on ^C (no matter if the SIGINT handler is still the + # standard Python's one or or user set their own.) + + self._ssock, self._csock = socket_socketpair() + try: + self._ssock.setblocking(False) + self._csock.setblocking(False) + + fileno = self._csock.fileno() + + self._old_signal_wakeup_id = _set_signal_wakeup_fd(fileno) + except Exception: + # Out of all statements in the try block, only the + # "_set_signal_wakeup_fd()" call can fail, but it shouldn't, + # as we ensure that the current thread is the main thread. + # Still, if something goes horribly wrong we want to clean up + # the socket pair. + self._ssock.close() + self._csock.close() + self._ssock = None + self._csock = None + raise + + self._add_reader( + self._ssock, + new_MethodHandle( + self, + "Loop._read_from_self", + <method_t>self._read_from_self, + None, + self)) + + self._listening_signals = True + + cdef _pause_signals(self): + if not self._is_main_thread(): + if self._listening_signals: + raise RuntimeError( + 'cannot pause signals handling; no longer running in ' + 'the main thread') + else: + return + + if not self._listening_signals: + raise RuntimeError('signals handling has not been setup') + + self._listening_signals = False + + _set_signal_wakeup_fd(self._old_signal_wakeup_id) + + self._remove_reader(self._ssock) + self._ssock.close() + self._csock.close() + self._ssock = None + self._csock = None + + cdef _shutdown_signals(self): + if not self._is_main_thread(): + if self._signal_handlers: + aio_logger.warning( + 'cannot cleanup signal handlers: closing the event loop ' + 'in a non-main OS thread') + return + + if self._listening_signals: + raise RuntimeError( + 'cannot shutdown signals handling as it has not been paused') + + if self._ssock: + raise RuntimeError( + 'self-pipe was not cleaned up after loop was run') + + for sig in list(self._signal_handlers): + self.remove_signal_handler(sig) + + def __sighandler(self, signum, frame): + self._signals.add(signum) + + cdef inline _ceval_process_signals(self): + # Invoke CPython eval loop to let process signals. + PyErr_CheckSignals() + # Calling a pure-Python function will invoke + # _PyEval_EvalFrameDefault which will process + # pending signal callbacks. + _noop.noop() # Might raise ^C + + cdef _read_from_self(self): + cdef bytes sigdata + sigdata = b'' + while True: + try: + data = self._ssock.recv(65536) + if not data: + break + sigdata += data + except InterruptedError: + continue + except BlockingIOError: + break + if sigdata: + self._invoke_signals(sigdata) + + cdef _invoke_signals(self, bytes data): + cdef set sigs + + self._ceval_process_signals() + + sigs = self._signals.copy() + self._signals.clear() + for signum in data: + if not signum: + # ignore null bytes written by set_wakeup_fd() + continue + sigs.discard(signum) + self._handle_signal(signum) + + for signum in sigs: + # Since not all signals are registered by add_signal_handler() + # (for instance, we use the default SIGINT handler) not all + # signals will trigger loop.__sighandler() callback. Therefore + # we combine two datasources: one is self-pipe, one is data + # from __sighandler; this ensures that signals shouldn't be + # lost even if set_wakeup_fd() couldn't write to the self-pipe. + self._handle_signal(signum) + + cdef _handle_signal(self, sig): + cdef Handle handle + + try: + handle = <Handle>(self._signal_handlers[sig]) + except KeyError: + handle = None + + if handle is None: + self._ceval_process_signals() + return + + if handle._cancelled: + self.remove_signal_handler(sig) # Remove it properly. + else: + self._append_ready_handle(handle) + self.handler_async.send() + + cdef _on_wake(self): + if ((self._ready_len > 0 or self._stopping) and + not self.handler_idle.running): + self.handler_idle.start() + + cdef _on_idle(self): + cdef: + int i, ntodo + object popleft = self._ready.popleft + Handle handler + + ntodo = len(self._ready) + if self._debug: + for i from 0 <= i < ntodo: + handler = <Handle> popleft() + if handler._cancelled == 0: + try: + started = time_monotonic() + handler._run() + except BaseException as ex: + self._stop(ex) + return + else: + delta = time_monotonic() - started + if delta > self.slow_callback_duration: + aio_logger.warning( + 'Executing %s took %.3f seconds', + handler._format_handle(), delta) + + else: + for i from 0 <= i < ntodo: + handler = <Handle> popleft() + if handler._cancelled == 0: + try: + handler._run() + except BaseException as ex: + self._stop(ex) + return + + if len(self._queued_streams): + self._exec_queued_writes() + + self._ready_len = len(self._ready) + if self._ready_len == 0 and self.handler_idle.running: + self.handler_idle.stop() + + if self._stopping: + uv.uv_stop(self.uvloop) # void + + cdef _stop(self, exc): + if exc is not None: + self._last_error = exc + if self._stopping == 1: + return + self._stopping = 1 + if not self.handler_idle.running: + self.handler_idle.start() + + cdef __run(self, uv.uv_run_mode mode): + # Although every UVHandle holds a reference to the loop, + # we want to do everything to ensure that the loop will + # never deallocate during the run -- so we do some + # manual refs management. + Py_INCREF(self) + with nogil: + err = uv.uv_run(self.uvloop, mode) + Py_DECREF(self) + + if err < 0: + raise convert_error(err) + + cdef _run(self, uv.uv_run_mode mode): + cdef int err + + if self._closed == 1: + raise RuntimeError('unable to start the loop; it was closed') + + if self._running == 1: + raise RuntimeError('this event loop is already running.') + + if (aio_get_running_loop is not None and + aio_get_running_loop() is not None): + raise RuntimeError( + 'Cannot run the event loop while another loop is running') + + # reset _last_error + self._last_error = None + + self._thread_id = PyThread_get_thread_ident() + self._running = 1 + + self.handler_check__exec_writes.start() + self.handler_idle.start() + + self._setup_or_resume_signals() + + if aio_set_running_loop is not None: + aio_set_running_loop(self) + try: + self.__run(mode) + finally: + if aio_set_running_loop is not None: + aio_set_running_loop(None) + + self.handler_check__exec_writes.stop() + self.handler_idle.stop() + + self._pause_signals() + + self._thread_id = 0 + self._running = 0 + self._stopping = 0 + + if self._last_error is not None: + # The loop was stopped with an error with 'loop._stop(error)' call + raise self._last_error + + cdef _close(self): + cdef int err + + if self._running == 1: + raise RuntimeError("Cannot close a running event loop") + + if self._closed == 1: + return + + self._closed = 1 + + for cb_handle in self._ready: + cb_handle.cancel() + self._ready.clear() + self._ready_len = 0 + + if self._polls: + for poll_handle in self._polls.values(): + (<UVHandle>poll_handle)._close() + + self._polls.clear() + + if self._timers: + for timer_cbhandle in tuple(self._timers): + timer_cbhandle.cancel() + + # Close all remaining handles + self.handler_async._close() + self.handler_idle._close() + self.handler_check__exec_writes._close() + __close_all_handles(self) + self._shutdown_signals() + # During this run there should be no open handles, + # so it should finish right away + self.__run(uv.UV_RUN_DEFAULT) + + if self._fd_to_writer_fileobj: + for fileobj in self._fd_to_writer_fileobj.values(): + socket_dec_io_ref(fileobj) + self._fd_to_writer_fileobj.clear() + + if self._fd_to_reader_fileobj: + for fileobj in self._fd_to_reader_fileobj.values(): + socket_dec_io_ref(fileobj) + self._fd_to_reader_fileobj.clear() + + if self._timers: + raise RuntimeError( + f"new timers were queued during loop closing: {self._timers}") + + if self._polls: + raise RuntimeError( + f"new poll handles were queued during loop closing: " + f"{self._polls}") + + if self._ready: + raise RuntimeError( + f"new callbacks were queued during loop closing: " + f"{self._ready}") + + err = uv.uv_loop_close(self.uvloop) + if err < 0: + raise convert_error(err) + + self.handler_async = None + self.handler_idle = None + self.handler_check__exec_writes = None + + self._executor_shutdown_called = True + executor = self._default_executor + if executor is not None: + self._default_executor = None + executor.shutdown(wait=False) + + cdef uint64_t _time(self): + # asyncio doesn't have a time cache, neither should uvloop. + uv.uv_update_time(self.uvloop) # void + return uv.uv_now(self.uvloop) + + cdef inline _queue_write(self, UVStream stream): + self._queued_streams.add(stream) + if not self.handler_check__exec_writes.running: + self.handler_check__exec_writes.start() + + cdef _exec_queued_writes(self): + if len(self._queued_streams) == 0: + if self.handler_check__exec_writes.running: + self.handler_check__exec_writes.stop() + return + + cdef: + UVStream stream + + streams = self._queued_streams + self._queued_streams = self._executing_streams + self._executing_streams = streams + try: + for pystream in streams: + stream = <UVStream>pystream + stream._exec_write() + finally: + streams.clear() + + if self.handler_check__exec_writes.running: + if len(self._queued_streams) == 0: + self.handler_check__exec_writes.stop() + + cdef inline _call_soon(self, object callback, object args, object context): + cdef Handle handle + handle = new_Handle(self, callback, args, context) + self._call_soon_handle(handle) + return handle + + cdef inline _append_ready_handle(self, Handle handle): + self._check_closed() + self._ready.append(handle) + self._ready_len += 1 + + cdef inline _call_soon_handle(self, Handle handle): + self._append_ready_handle(handle) + if not self.handler_idle.running: + self.handler_idle.start() + + cdef _call_later(self, uint64_t delay, object callback, object args, + object context): + return TimerHandle(self, callback, args, delay, context) + + cdef void _handle_exception(self, object ex): + if isinstance(ex, Exception): + self.call_exception_handler({'exception': ex}) + else: + # BaseException + self._last_error = ex + # Exit ASAP + self._stop(None) + + cdef inline _check_signal(self, sig): + if not isinstance(sig, int): + raise TypeError('sig must be an int, not {!r}'.format(sig)) + + if not (1 <= sig < signal_NSIG): + raise ValueError( + 'sig {} out of range(1, {})'.format(sig, signal_NSIG)) + + cdef inline _check_closed(self): + if self._closed == 1: + raise RuntimeError('Event loop is closed') + + cdef inline _check_thread(self): + if self._thread_id == 0: + return + + cdef uint64_t thread_id + thread_id = <uint64_t>PyThread_get_thread_ident() + + if thread_id != self._thread_id: + raise RuntimeError( + "Non-thread-safe operation invoked on an event loop other " + "than the current one") + + cdef inline _new_future(self): + return aio_Future(loop=self) + + cdef _track_transport(self, UVBaseTransport transport): + self._transports[transport._fileno()] = transport + + cdef _track_process(self, UVProcess proc): + self._processes.add(proc) + + cdef _untrack_process(self, UVProcess proc): + self._processes.discard(proc) + + cdef _fileobj_to_fd(self, fileobj): + """Return a file descriptor from a file object. + + Parameters: + fileobj -- file object or file descriptor + + Returns: + corresponding file descriptor + + Raises: + ValueError if the object is invalid + """ + # Copy of the `selectors._fileobj_to_fd()` function. + if isinstance(fileobj, int): + fd = fileobj + else: + try: + fd = int(fileobj.fileno()) + except (AttributeError, TypeError, ValueError): + raise ValueError("Invalid file object: " + "{!r}".format(fileobj)) from None + if fd < 0: + raise ValueError("Invalid file descriptor: {}".format(fd)) + return fd + + cdef _ensure_fd_no_transport(self, fd): + cdef UVBaseTransport tr + try: + tr = <UVBaseTransport>(self._transports[fd]) + except KeyError: + pass + else: + if tr._is_alive(): + raise RuntimeError( + 'File descriptor {!r} is used by transport {!r}'.format( + fd, tr)) + + cdef _add_reader(self, fileobj, Handle handle): + cdef: + UVPoll poll + + self._check_closed() + fd = self._fileobj_to_fd(fileobj) + self._ensure_fd_no_transport(fd) + + try: + poll = <UVPoll>(self._polls[fd]) + except KeyError: + poll = UVPoll.new(self, fd) + self._polls[fd] = poll + + poll.start_reading(handle) + + old_fileobj = self._fd_to_reader_fileobj.pop(fd, None) + if old_fileobj is not None: + socket_dec_io_ref(old_fileobj) + + self._fd_to_reader_fileobj[fd] = fileobj + socket_inc_io_ref(fileobj) + + cdef _remove_reader(self, fileobj): + cdef: + UVPoll poll + + fd = self._fileobj_to_fd(fileobj) + self._ensure_fd_no_transport(fd) + + mapped_fileobj = self._fd_to_reader_fileobj.pop(fd, None) + if mapped_fileobj is not None: + socket_dec_io_ref(mapped_fileobj) + + if self._closed == 1: + return False + + try: + poll = <UVPoll>(self._polls[fd]) + except KeyError: + return False + + result = poll.stop_reading() + if not poll.is_active(): + del self._polls[fd] + poll._close() + + return result + + cdef _has_reader(self, fileobj): + cdef: + UVPoll poll + + self._check_closed() + fd = self._fileobj_to_fd(fileobj) + + try: + poll = <UVPoll>(self._polls[fd]) + except KeyError: + return False + + return poll.is_reading() + + cdef _add_writer(self, fileobj, Handle handle): + cdef: + UVPoll poll + + self._check_closed() + fd = self._fileobj_to_fd(fileobj) + self._ensure_fd_no_transport(fd) + + try: + poll = <UVPoll>(self._polls[fd]) + except KeyError: + poll = UVPoll.new(self, fd) + self._polls[fd] = poll + + poll.start_writing(handle) + + old_fileobj = self._fd_to_writer_fileobj.pop(fd, None) + if old_fileobj is not None: + socket_dec_io_ref(old_fileobj) + + self._fd_to_writer_fileobj[fd] = fileobj + socket_inc_io_ref(fileobj) + + cdef _remove_writer(self, fileobj): + cdef: + UVPoll poll + + fd = self._fileobj_to_fd(fileobj) + self._ensure_fd_no_transport(fd) + + mapped_fileobj = self._fd_to_writer_fileobj.pop(fd, None) + if mapped_fileobj is not None: + socket_dec_io_ref(mapped_fileobj) + + if self._closed == 1: + return False + + try: + poll = <UVPoll>(self._polls[fd]) + except KeyError: + return False + + result = poll.stop_writing() + if not poll.is_active(): + del self._polls[fd] + poll._close() + + return result + + cdef _has_writer(self, fileobj): + cdef: + UVPoll poll + + self._check_closed() + fd = self._fileobj_to_fd(fileobj) + + try: + poll = <UVPoll>(self._polls[fd]) + except KeyError: + return False + + return poll.is_writing() + + cdef _getaddrinfo(self, object host, object port, + int family, int type, + int proto, int flags, + int unpack): + + if isinstance(port, str): + port = port.encode() + elif isinstance(port, int): + port = str(port).encode() + if port is not None and not isinstance(port, bytes): + raise TypeError('port must be a str, bytes or int') + + if isinstance(host, str): + host = host.encode('idna') + if host is not None: + if not isinstance(host, bytes): + raise TypeError('host must be a str or bytes') + + fut = self._new_future() + + def callback(result): + if AddrInfo.isinstance(result): + try: + if unpack == 0: + data = result + else: + data = (<AddrInfo>result).unpack() + except (KeyboardInterrupt, SystemExit): + raise + except BaseException as ex: + if not fut.cancelled(): + fut.set_exception(ex) + else: + if not fut.cancelled(): + fut.set_result(data) + else: + if not fut.cancelled(): + fut.set_exception(result) + + AddrInfoRequest(self, host, port, family, type, proto, flags, callback) + return fut + + cdef _getnameinfo(self, system.sockaddr *addr, int flags): + cdef NameInfoRequest nr + fut = self._new_future() + + def callback(result): + if isinstance(result, tuple): + fut.set_result(result) + else: + fut.set_exception(result) + + nr = NameInfoRequest(self, callback) + nr.query(addr, flags) + return fut + + cdef _sock_recv(self, fut, sock, n): + if UVLOOP_DEBUG: + if fut.cancelled(): + # Shouldn't happen with _SyncSocketReaderFuture. + raise RuntimeError( + f'_sock_recv is called on a cancelled Future') + + if not self._has_reader(sock): + raise RuntimeError( + f'socket {sock!r} does not have a reader ' + f'in the _sock_recv callback') + + try: + data = sock.recv(n) + except (BlockingIOError, InterruptedError): + # No need to re-add the reader, let's just wait until + # the poll handler calls this callback again. + pass + except (KeyboardInterrupt, SystemExit): + raise + except BaseException as exc: + fut.set_exception(exc) + self._remove_reader(sock) + else: + fut.set_result(data) + self._remove_reader(sock) + + cdef _sock_recv_into(self, fut, sock, buf): + if UVLOOP_DEBUG: + if fut.cancelled(): + # Shouldn't happen with _SyncSocketReaderFuture. + raise RuntimeError( + f'_sock_recv_into is called on a cancelled Future') + + if not self._has_reader(sock): + raise RuntimeError( + f'socket {sock!r} does not have a reader ' + f'in the _sock_recv_into callback') + + try: + data = sock.recv_into(buf) + except (BlockingIOError, InterruptedError): + # No need to re-add the reader, let's just wait until + # the poll handler calls this callback again. + pass + except (KeyboardInterrupt, SystemExit): + raise + except BaseException as exc: + fut.set_exception(exc) + self._remove_reader(sock) + else: + fut.set_result(data) + self._remove_reader(sock) + + cdef _sock_sendall(self, fut, sock, data): + cdef: + Handle handle + int n + + if UVLOOP_DEBUG: + if fut.cancelled(): + # Shouldn't happen with _SyncSocketWriterFuture. + raise RuntimeError( + f'_sock_sendall is called on a cancelled Future') + + if not self._has_writer(sock): + raise RuntimeError( + f'socket {sock!r} does not have a writer ' + f'in the _sock_sendall callback') + + try: + n = sock.send(data) + except (BlockingIOError, InterruptedError): + # Try next time. + return + except (KeyboardInterrupt, SystemExit): + raise + except BaseException as exc: + fut.set_exception(exc) + self._remove_writer(sock) + return + + self._remove_writer(sock) + + if n == len(data): + fut.set_result(None) + else: + if n: + if not isinstance(data, memoryview): + data = memoryview(data) + data = data[n:] + + handle = new_MethodHandle3( + self, + "Loop._sock_sendall", + <method3_t>self._sock_sendall, + None, + self, + fut, sock, data) + + self._add_writer(sock, handle) + + cdef _sock_accept(self, fut, sock): + try: + conn, address = sock.accept() + conn.setblocking(False) + except (BlockingIOError, InterruptedError): + # There is an active reader for _sock_accept, so + # do nothing, it will be called again. + pass + except (KeyboardInterrupt, SystemExit): + raise + except BaseException as exc: + fut.set_exception(exc) + self._remove_reader(sock) + else: + fut.set_result((conn, address)) + self._remove_reader(sock) + + cdef _sock_connect(self, sock, address): + cdef: + Handle handle + + try: + sock.connect(address) + except (BlockingIOError, InterruptedError): + pass + else: + return + + fut = _SyncSocketWriterFuture(sock, self) + handle = new_MethodHandle3( + self, + "Loop._sock_connect", + <method3_t>self._sock_connect_cb, + None, + self, + fut, sock, address) + + self._add_writer(sock, handle) + return fut + + cdef _sock_connect_cb(self, fut, sock, address): + if UVLOOP_DEBUG: + if fut.cancelled(): + # Shouldn't happen with _SyncSocketWriterFuture. + raise RuntimeError( + f'_sock_connect_cb is called on a cancelled Future') + + if not self._has_writer(sock): + raise RuntimeError( + f'socket {sock!r} does not have a writer ' + f'in the _sock_connect_cb callback') + + try: + err = sock.getsockopt(uv.SOL_SOCKET, uv.SO_ERROR) + if err != 0: + # Jump to any except clause below. + raise OSError(err, 'Connect call failed %s' % (address,)) + except (BlockingIOError, InterruptedError): + # socket is still registered, the callback will be retried later + pass + except (KeyboardInterrupt, SystemExit): + raise + except BaseException as exc: + fut.set_exception(exc) + self._remove_writer(sock) + else: + fut.set_result(None) + self._remove_writer(sock) + + cdef _sock_set_reuseport(self, int fd): + cdef: + int err + int reuseport_flag = 1 + + err = system.setsockopt( + fd, + uv.SOL_SOCKET, + SO_REUSEPORT, + <char*>&reuseport_flag, + sizeof(reuseport_flag)) + + if err < 0: + raise convert_error(-errno.errno) + + cdef _set_coroutine_debug(self, bint enabled): + enabled = bool(enabled) + if self._coroutine_debug_set == enabled: + return + + if enabled: + self._coroutine_origin_tracking_saved_depth = ( + sys.get_coroutine_origin_tracking_depth()) + sys.set_coroutine_origin_tracking_depth( + DEBUG_STACK_DEPTH) + else: + sys.set_coroutine_origin_tracking_depth( + self._coroutine_origin_tracking_saved_depth) + + self._coroutine_debug_set = enabled + + def _get_backend_id(self): + """This method is used by uvloop tests and is not part of the API.""" + return uv.uv_backend_fd(self.uvloop) + + cdef _print_debug_info(self): + cdef: + int err + uv.uv_rusage_t rusage + + err = uv.uv_getrusage(&rusage) + if err < 0: + raise convert_error(err) + + # OS + + print('---- Process info: -----') + print('Process memory: {}'.format(rusage.ru_maxrss)) + print('Number of signals: {}'.format(rusage.ru_nsignals)) + print('') + + # Loop + + print('--- Loop debug info: ---') + print('Loop time: {}'.format(self.time())) + print('Errors logged: {}'.format( + self._debug_exception_handler_cnt)) + print() + print('Callback handles: {: <8} | {}'.format( + self._debug_cb_handles_count, + self._debug_cb_handles_total)) + print('Timer handles: {: <8} | {}'.format( + self._debug_cb_timer_handles_count, + self._debug_cb_timer_handles_total)) + print() + + print(' alive | closed |') + print('UVHandles python | libuv | total') + print(' objs | handles |') + print('-------------------------------+---------+---------') + for name in sorted(self._debug_handles_total): + print(' {: <18} {: >7} | {: >7} | {: >7}'.format( + name, + self._debug_handles_current[name], + self._debug_handles_closed[name], + self._debug_handles_total[name])) + print() + + print('uv_handle_t (current: {}; freed: {}; total: {})'.format( + self._debug_uv_handles_total - self._debug_uv_handles_freed, + self._debug_uv_handles_freed, + self._debug_uv_handles_total)) + print() + + print('--- Streams debug info: ---') + print('Write errors: {}'.format( + self._debug_stream_write_errors_total)) + print('Write without poll: {}'.format( + self._debug_stream_write_tries)) + print('Write contexts: {: <8} | {}'.format( + self._debug_stream_write_ctx_cnt, + self._debug_stream_write_ctx_total)) + print('Write failed callbacks: {}'.format( + self._debug_stream_write_cb_errors_total)) + print() + print('Read errors: {}'.format( + self._debug_stream_read_errors_total)) + print('Read callbacks: {}'.format( + self._debug_stream_read_cb_total)) + print('Read failed callbacks: {}'.format( + self._debug_stream_read_cb_errors_total)) + print('Read EOFs: {}'.format( + self._debug_stream_read_eof_total)) + print('Read EOF failed callbacks: {}'.format( + self._debug_stream_read_eof_cb_errors_total)) + print() + print('Listen errors: {}'.format( + self._debug_stream_listen_errors_total)) + print('Shutdown errors {}'.format( + self._debug_stream_shutdown_errors_total)) + print() + + print('--- Polls debug info: ---') + print('Read events: {}'.format( + self._poll_read_events_total)) + print('Read callbacks failed: {}'.format( + self._poll_read_cb_errors_total)) + print('Write events: {}'.format( + self._poll_write_events_total)) + print('Write callbacks failed: {}'.format( + self._poll_write_cb_errors_total)) + print() + + print('--- Sock ops successful on 1st try: ---') + print('Socket try-writes: {}'.format( + self._sock_try_write_total)) + + print(flush=True) + + property print_debug_info: + def __get__(self): + if UVLOOP_DEBUG: + return lambda: self._print_debug_info() + else: + raise AttributeError('print_debug_info') + + # Public API + + def __repr__(self): + return '<{}.{} running={} closed={} debug={}>'.format( + self.__class__.__module__, + self.__class__.__name__, + self.is_running(), + self.is_closed(), + self.get_debug() + ) + + def call_soon(self, callback, *args, context=None): + """Arrange for a callback to be called as soon as possible. + + This operates as a FIFO queue: callbacks are called in the + order in which they are registered. Each callback will be + called exactly once. + + Any positional arguments after the callback will be passed to + the callback when it is called. + """ + if self._debug == 1: + self._check_thread() + if args: + return self._call_soon(callback, args, context) + else: + return self._call_soon(callback, None, context) + + def call_soon_threadsafe(self, callback, *args, context=None): + """Like call_soon(), but thread-safe.""" + if not args: + args = None + cdef Handle handle = new_Handle(self, callback, args, context) + self._append_ready_handle(handle) # deque append is atomic + # libuv async handler is thread-safe while the idle handler is not - + # we only set the async handler here, which will start the idle handler + # in _on_wake() from the loop and eventually call the callback. + self.handler_async.send() + return handle + + def call_later(self, delay, callback, *args, context=None): + """Arrange for a callback to be called at a given time. + + Return a Handle: an opaque object with a cancel() method that + can be used to cancel the call. + + The delay can be an int or float, expressed in seconds. It is + always relative to the current time. + + Each callback will be called exactly once. If two callbacks + are scheduled for exactly the same time, it undefined which + will be called first. + + Any positional arguments after the callback will be passed to + the callback when it is called. + """ + cdef uint64_t when + + self._check_closed() + if self._debug == 1: + self._check_thread() + + if delay < 0: + delay = 0 + elif delay == py_inf or delay > MAX_SLEEP: + # ~100 years sounds like a good approximation of + # infinity for a Python application. + delay = MAX_SLEEP + + when = <uint64_t>round(delay * 1000) + if not args: + args = None + if when == 0: + return self._call_soon(callback, args, context) + else: + return self._call_later(when, callback, args, context) + + def call_at(self, when, callback, *args, context=None): + """Like call_later(), but uses an absolute time. + + Absolute time corresponds to the event loop's time() method. + """ + return self.call_later( + when - self.time(), callback, *args, context=context) + + def time(self): + """Return the time according to the event loop's clock. + + This is a float expressed in seconds since an epoch, but the + epoch, precision, accuracy and drift are unspecified and may + differ per event loop. + """ + return self._time() / 1000 + + def stop(self): + """Stop running the event loop. + + Every callback already scheduled will still run. This simply informs + run_forever to stop looping after a complete iteration. + """ + self._call_soon_handle( + new_MethodHandle1( + self, + "Loop._stop", + <method1_t>self._stop, + None, + self, + None)) + + def run_forever(self): + """Run the event loop until stop() is called.""" + self._check_closed() + mode = uv.UV_RUN_DEFAULT + if self._stopping: + # loop.stop() was called right before loop.run_forever(). + # This is how asyncio loop behaves. + mode = uv.UV_RUN_NOWAIT + self._set_coroutine_debug(self._debug) + old_agen_hooks = sys.get_asyncgen_hooks() + sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, + finalizer=self._asyncgen_finalizer_hook) + try: + self._run(mode) + finally: + self._set_coroutine_debug(False) + sys.set_asyncgen_hooks(*old_agen_hooks) + + def close(self): + """Close the event loop. + + The event loop must not be running. + + This is idempotent and irreversible. + + No other methods should be called after this one. + """ + self._close() + + def get_debug(self): + return bool(self._debug) + + def set_debug(self, enabled): + self._debug = bool(enabled) + if self.is_running(): + self.call_soon_threadsafe( + self._set_coroutine_debug, self, self._debug) + + def is_running(self): + """Return whether the event loop is currently running.""" + return bool(self._running) + + def is_closed(self): + """Returns True if the event loop was closed.""" + return bool(self._closed) + + def create_future(self): + """Create a Future object attached to the loop.""" + return self._new_future() + + def create_task(self, coro, *, name=None, context=None): + """Schedule a coroutine object. + + Return a task object. + + If name is not None, task.set_name(name) will be called if the task + object has the set_name attribute, true for default Task in CPython. + + An optional keyword-only context argument allows specifying a custom + contextvars.Context for the coro to run in. The current context copy is + created when no context is provided. + """ + self._check_closed() + if PY311: + if self._task_factory is None: + task = aio_Task(coro, loop=self, context=context) + else: + task = self._task_factory(self, coro, context=context) + else: + if context is None: + if self._task_factory is None: + task = aio_Task(coro, loop=self) + else: + task = self._task_factory(self, coro) + else: + if self._task_factory is None: + task = context.run(aio_Task, coro, self) + else: + task = context.run(self._task_factory, self, coro) + + # copied from asyncio.tasks._set_task_name (bpo-34270) + if name is not None: + try: + set_name = task.set_name + except AttributeError: + pass + else: + set_name(name) + + return task + + def set_task_factory(self, factory): + """Set a task factory that will be used by loop.create_task(). + + If factory is None the default task factory will be set. + + If factory is a callable, it should have a signature matching + '(loop, coro)', where 'loop' will be a reference to the active + event loop, 'coro' will be a coroutine object. The callable + must return a Future. + """ + if factory is not None and not callable(factory): + raise TypeError('task factory must be a callable or None') + self._task_factory = factory + + def get_task_factory(self): + """Return a task factory, or None if the default one is in use.""" + return self._task_factory + + def run_until_complete(self, future): + """Run until the Future is done. + + If the argument is a coroutine, it is wrapped in a Task. + + WARNING: It would be disastrous to call run_until_complete() + with the same coroutine twice -- it would wrap it in two + different Tasks and that can't be good. + + Return the Future's result, or raise its exception. + """ + self._check_closed() + + new_task = not isfuture(future) + future = aio_ensure_future(future, loop=self) + if new_task: + # An exception is raised if the future didn't complete, so there + # is no need to log the "destroy pending task" message + future._log_destroy_pending = False + + def done_cb(fut): + if not fut.cancelled(): + exc = fut.exception() + if isinstance(exc, (SystemExit, KeyboardInterrupt)): + # Issue #336: run_forever() already finished, + # no need to stop it. + return + self.stop() + + future.add_done_callback(done_cb) + try: + self.run_forever() + except BaseException: + if new_task and future.done() and not future.cancelled(): + # The coroutine raised a BaseException. Consume the exception + # to not log a warning, the caller doesn't have access to the + # local task. + future.exception() + raise + finally: + future.remove_done_callback(done_cb) + if not future.done(): + raise RuntimeError('Event loop stopped before Future completed.') + + return future.result() + + @cython.iterable_coroutine + async def getaddrinfo(self, object host, object port, *, + int family=0, int type=0, int proto=0, int flags=0): + + addr = __static_getaddrinfo_pyaddr(host, port, family, + type, proto, flags) + if addr is not None: + return [addr] + + return await self._getaddrinfo( + host, port, family, type, proto, flags, 1) + + @cython.iterable_coroutine + async def getnameinfo(self, sockaddr, int flags=0): + cdef: + AddrInfo ai_cnt + system.addrinfo *ai + system.sockaddr_in6 *sin6 + + if not isinstance(sockaddr, tuple): + raise TypeError('getnameinfo() argument 1 must be a tuple') + + sl = len(sockaddr) + + if sl < 2 or sl > 4: + raise ValueError('sockaddr must be a tuple of 2, 3 or 4 values') + + if sl > 2: + flowinfo = sockaddr[2] + if flowinfo < 0 or flowinfo > 0xfffff: + raise OverflowError( + 'getnameinfo(): flowinfo must be 0-1048575.') + else: + flowinfo = 0 + + if sl > 3: + scope_id = sockaddr[3] + if scope_id < 0 or scope_id > 2 ** 32: + raise OverflowError( + 'getsockaddrarg: scope_id must be unsigned 32 bit integer') + else: + scope_id = 0 + + ai_cnt = await self._getaddrinfo( + sockaddr[0], sockaddr[1], + uv.AF_UNSPEC, # family + uv.SOCK_DGRAM, # type + 0, # proto + uv.AI_NUMERICHOST, # flags + 0) # unpack + + ai = ai_cnt.data + + if ai.ai_next: + raise OSError("sockaddr resolved to multiple addresses") + + if ai.ai_family == uv.AF_INET: + if sl > 2: + raise OSError("IPv4 sockaddr must be 2 tuple") + elif ai.ai_family == uv.AF_INET6: + # Modify some fields in `ai` + sin6 = <system.sockaddr_in6*> ai.ai_addr + sin6.sin6_flowinfo = system.htonl(flowinfo) + sin6.sin6_scope_id = scope_id + + return await self._getnameinfo(ai.ai_addr, flags) + + @cython.iterable_coroutine + async def start_tls(self, transport, protocol, sslcontext, *, + server_side=False, + server_hostname=None, + ssl_handshake_timeout=None, + ssl_shutdown_timeout=None): + """Upgrade transport to TLS. + + Return a new transport that *protocol* should start using + immediately. + """ + if not isinstance(sslcontext, ssl_SSLContext): + raise TypeError( + f'sslcontext is expected to be an instance of ssl.SSLContext, ' + f'got {sslcontext!r}') + + if isinstance(transport, (TCPTransport, UnixTransport)): + context = (<UVStream>transport).context + elif isinstance(transport, _SSLProtocolTransport): + context = (<_SSLProtocolTransport>transport).context + else: + raise TypeError( + f'transport {transport!r} is not supported by start_tls()') + + waiter = self._new_future() + ssl_protocol = SSLProtocol( + self, protocol, sslcontext, waiter, + server_side, server_hostname, + ssl_handshake_timeout=ssl_handshake_timeout, + ssl_shutdown_timeout=ssl_shutdown_timeout, + call_connection_made=False) + + # Pause early so that "ssl_protocol.data_received()" doesn't + # have a chance to get called before "ssl_protocol.connection_made()". + transport.pause_reading() + + transport.set_protocol(ssl_protocol) + conmade_cb = self.call_soon(ssl_protocol.connection_made, transport, + context=context) + # transport.resume_reading() will use the right context + # (transport.context) to call e.g. data_received() + resume_cb = self.call_soon(transport.resume_reading) + app_transport = ssl_protocol._get_app_transport(context) + + try: + await waiter + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + app_transport.close() + conmade_cb.cancel() + resume_cb.cancel() + raise + + return app_transport + + @cython.iterable_coroutine + async def create_server(self, protocol_factory, host=None, port=None, + *, + int family=uv.AF_UNSPEC, + int flags=uv.AI_PASSIVE, + sock=None, + backlog=100, + ssl=None, + reuse_address=None, + reuse_port=None, + ssl_handshake_timeout=None, + ssl_shutdown_timeout=None, + start_serving=True): + """A coroutine which creates a TCP server bound to host and port. + + The return value is a Server object which can be used to stop + the service. + + If host is an empty string or None all interfaces are assumed + and a list of multiple sockets will be returned (most likely + one for IPv4 and another one for IPv6). The host parameter can also be + a sequence (e.g. list) of hosts to bind to. + + family can be set to either AF_INET or AF_INET6 to force the + socket to use IPv4 or IPv6. If not set it will be determined + from host (defaults to AF_UNSPEC). + + flags is a bitmask for getaddrinfo(). + + sock can optionally be specified in order to use a preexisting + socket object. + + backlog is the maximum number of queued connections passed to + listen() (defaults to 100). + + ssl can be set to an SSLContext to enable SSL over the + accepted connections. + + reuse_address tells the kernel to reuse a local socket in + TIME_WAIT state, without waiting for its natural timeout to + expire. If not specified will automatically be set to True on + UNIX. + + reuse_port tells the kernel to allow this endpoint to be bound to + the same port as other existing endpoints are bound to, so long as + they all set this flag when being created. This option is not + supported on Windows. + + ssl_handshake_timeout is the time in seconds that an SSL server + will wait for completion of the SSL handshake before aborting the + connection. Default is 60s. + + ssl_shutdown_timeout is the time in seconds that an SSL server + will wait for completion of the SSL shutdown before aborting the + connection. Default is 30s. + """ + cdef: + TCPServer tcp + system.addrinfo *addrinfo + Server server + + if sock is not None and sock.family == uv.AF_UNIX: + if host is not None or port is not None: + raise ValueError( + 'host/port and sock can not be specified at the same time') + return await self.create_unix_server( + protocol_factory, sock=sock, backlog=backlog, ssl=ssl, + start_serving=start_serving) + + server = Server(self) + + if ssl is not None: + if not isinstance(ssl, ssl_SSLContext): + raise TypeError('ssl argument must be an SSLContext or None') + else: + if ssl_handshake_timeout is not None: + raise ValueError( + 'ssl_handshake_timeout is only meaningful with ssl') + if ssl_shutdown_timeout is not None: + raise ValueError( + 'ssl_shutdown_timeout is only meaningful with ssl') + + if host is not None or port is not None: + if sock is not None: + raise ValueError( + 'host/port and sock can not be specified at the same time') + + if reuse_address is None: + reuse_address = os_name == 'posix' and sys_platform != 'cygwin' + reuse_port = bool(reuse_port) + if reuse_port and not has_SO_REUSEPORT: + raise ValueError( + 'reuse_port not supported by socket module') + + if host == '': + hosts = [None] + elif (isinstance(host, str) or not isinstance(host, col_Iterable)): + hosts = [host] + else: + hosts = host + + fs = [self._getaddrinfo(host, port, family, + uv.SOCK_STREAM, 0, flags, + 0) for host in hosts] + + infos = await aio_gather(*fs) + + completed = False + sock = None + try: + for info in infos: + addrinfo = (<AddrInfo>info).data + while addrinfo != NULL: + if addrinfo.ai_family == uv.AF_UNSPEC: + raise RuntimeError('AF_UNSPEC in DNS results') + + try: + sock = socket_socket(addrinfo.ai_family, + addrinfo.ai_socktype, + addrinfo.ai_protocol) + except socket_error: + # Assume it's a bad family/type/protocol + # combination. + if self._debug: + aio_logger.warning( + 'create_server() failed to create ' + 'socket.socket(%r, %r, %r)', + addrinfo.ai_family, + addrinfo.ai_socktype, + addrinfo.ai_protocol, exc_info=True) + addrinfo = addrinfo.ai_next + continue + + if reuse_address: + sock.setsockopt(uv.SOL_SOCKET, uv.SO_REUSEADDR, 1) + if reuse_port: + sock.setsockopt(uv.SOL_SOCKET, uv.SO_REUSEPORT, 1) + # Disable IPv4/IPv6 dual stack support (enabled by + # default on Linux) which makes a single socket + # listen on both address families. + if (addrinfo.ai_family == uv.AF_INET6 and + has_IPV6_V6ONLY): + sock.setsockopt(uv.IPPROTO_IPV6, IPV6_V6ONLY, 1) + + pyaddr = __convert_sockaddr_to_pyaddr(addrinfo.ai_addr) + try: + sock.bind(pyaddr) + except OSError as err: + raise OSError( + err.errno, 'error while attempting ' + 'to bind on address %r: %s' + % (pyaddr, err.strerror.lower())) from None + + tcp = TCPServer.new(self, protocol_factory, server, + uv.AF_UNSPEC, backlog, + ssl, ssl_handshake_timeout, + ssl_shutdown_timeout) + + try: + tcp._open(sock.fileno()) + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + tcp._close() + raise + + server._add_server(tcp) + sock.detach() + sock = None + + addrinfo = addrinfo.ai_next + + completed = True + finally: + if not completed: + if sock is not None: + sock.close() + server.close() + else: + if sock is None: + raise ValueError('Neither host/port nor sock were specified') + if not _is_sock_stream(sock.type): + raise ValueError( + 'A Stream Socket was expected, got {!r}'.format(sock)) + + # libuv will set the socket to non-blocking mode, but + # we want Python socket object to notice that. + sock.setblocking(False) + + tcp = TCPServer.new(self, protocol_factory, server, + uv.AF_UNSPEC, backlog, + ssl, ssl_handshake_timeout, + ssl_shutdown_timeout) + + try: + tcp._open(sock.fileno()) + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + tcp._close() + raise + + tcp._attach_fileobj(sock) + server._add_server(tcp) + + if start_serving: + server._start_serving() + + server._ref() + return server + + @cython.iterable_coroutine + async def create_connection(self, protocol_factory, host=None, port=None, + *, + ssl=None, + family=0, proto=0, flags=0, sock=None, + local_addr=None, server_hostname=None, + ssl_handshake_timeout=None, + ssl_shutdown_timeout=None): + """Connect to a TCP server. + + Create a streaming transport connection to a given Internet host and + port: socket family AF_INET or socket.AF_INET6 depending on host (or + family if specified), socket type SOCK_STREAM. protocol_factory must be + a callable returning a protocol instance. + + This method is a coroutine which will try to establish the connection + in the background. When successful, the coroutine returns a + (transport, protocol) pair. + """ + cdef: + AddrInfo ai_local = None + AddrInfo ai_remote + TCPTransport tr + + system.addrinfo *rai = NULL + system.addrinfo *lai = NULL + + system.addrinfo *rai_iter = NULL + system.addrinfo *lai_iter = NULL + + system.addrinfo rai_static + system.sockaddr_storage rai_addr_static + system.addrinfo lai_static + system.sockaddr_storage lai_addr_static + + object app_protocol + object app_transport + object protocol + object ssl_waiter + + if sock is not None and sock.family == uv.AF_UNIX: + if host is not None or port is not None: + raise ValueError( + 'host/port and sock can not be specified at the same time') + return await self.create_unix_connection( + protocol_factory, None, + sock=sock, ssl=ssl, server_hostname=server_hostname) + + app_protocol = protocol = protocol_factory() + ssl_waiter = None + context = Context_CopyCurrent() + if ssl: + if server_hostname is None: + if not host: + raise ValueError('You must set server_hostname ' + 'when using ssl without a host') + server_hostname = host + + ssl_waiter = self._new_future() + sslcontext = None if isinstance(ssl, bool) else ssl + protocol = SSLProtocol( + self, app_protocol, sslcontext, ssl_waiter, + False, server_hostname, + ssl_handshake_timeout=ssl_handshake_timeout, + ssl_shutdown_timeout=ssl_shutdown_timeout) + else: + if server_hostname is not None: + raise ValueError('server_hostname is only meaningful with ssl') + if ssl_handshake_timeout is not None: + raise ValueError( + 'ssl_handshake_timeout is only meaningful with ssl') + if ssl_shutdown_timeout is not None: + raise ValueError( + 'ssl_shutdown_timeout is only meaningful with ssl') + + if host is not None or port is not None: + if sock is not None: + raise ValueError( + 'host/port and sock can not be specified at the same time') + + fs = [] + f1 = f2 = None + + addr = __static_getaddrinfo( + host, port, family, uv.SOCK_STREAM, + proto, <system.sockaddr*>&rai_addr_static) + + if addr is None: + f1 = self._getaddrinfo( + host, port, family, + uv.SOCK_STREAM, proto, flags, + 0) # 0 == don't unpack + + fs.append(f1) + else: + rai_static.ai_addr = <system.sockaddr*>&rai_addr_static + rai_static.ai_next = NULL + rai = &rai_static + + if local_addr is not None: + if not isinstance(local_addr, (tuple, list)) or \ + len(local_addr) != 2: + raise ValueError( + 'local_addr must be a tuple of host and port') + + addr = __static_getaddrinfo( + local_addr[0], local_addr[1], + family, uv.SOCK_STREAM, + proto, <system.sockaddr*>&lai_addr_static) + if addr is None: + f2 = self._getaddrinfo( + local_addr[0], local_addr[1], family, + uv.SOCK_STREAM, proto, flags, + 0) # 0 == don't unpack + + fs.append(f2) + else: + lai_static.ai_addr = <system.sockaddr*>&lai_addr_static + lai_static.ai_next = NULL + lai = &lai_static + + if len(fs): + await aio_wait(fs) + + if rai is NULL: + ai_remote = f1.result() + if ai_remote.data is NULL: + raise OSError('getaddrinfo() returned empty list') + rai = ai_remote.data + + if lai is NULL and f2 is not None: + ai_local = f2.result() + if ai_local.data is NULL: + raise OSError( + 'getaddrinfo() returned empty list for local_addr') + lai = ai_local.data + + exceptions = [] + rai_iter = rai + while rai_iter is not NULL: + tr = None + try: + waiter = self._new_future() + tr = TCPTransport.new(self, protocol, None, waiter, + context) + + if lai is not NULL: + lai_iter = lai + while lai_iter is not NULL: + try: + tr.bind(lai_iter.ai_addr) + break + except OSError as exc: + exceptions.append(exc) + lai_iter = lai_iter.ai_next + else: + tr._close() + tr = None + + rai_iter = rai_iter.ai_next + continue + + tr.connect(rai_iter.ai_addr) + await waiter + + except OSError as exc: + if tr is not None: + tr._close() + tr = None + exceptions.append(exc) + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + if tr is not None: + tr._close() + tr = None + raise + else: + break + + rai_iter = rai_iter.ai_next + + else: + # If they all have the same str(), raise one. + model = str(exceptions[0]) + if all(str(exc) == model for exc in exceptions): + raise exceptions[0] + # Raise a combined exception so the user can see all + # the various error messages. + raise OSError('Multiple exceptions: {}'.format( + ', '.join(str(exc) for exc in exceptions))) + else: + if sock is None: + raise ValueError( + 'host and port was not specified and no sock specified') + if not _is_sock_stream(sock.type): + raise ValueError( + 'A Stream Socket was expected, got {!r}'.format(sock)) + + # libuv will set the socket to non-blocking mode, but + # we want Python socket object to notice that. + sock.setblocking(False) + + waiter = self._new_future() + tr = TCPTransport.new(self, protocol, None, waiter, context) + try: + # libuv will make socket non-blocking + tr._open(sock.fileno()) + tr._init_protocol() + await waiter + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + # It's OK to call `_close()` here, as opposed to + # `_force_close()` or `close()` as we want to terminate the + # transport immediately. The `waiter` can only be waken + # up in `Transport._call_connection_made()`, and calling + # `_close()` before it is fine. + tr._close() + raise + + tr._attach_fileobj(sock) + + if ssl: + app_transport = protocol._get_app_transport(context) + try: + await ssl_waiter + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + app_transport.close() + raise + return app_transport, app_protocol + else: + return tr, protocol + + @cython.iterable_coroutine + async def create_unix_server(self, protocol_factory, path=None, + *, backlog=100, sock=None, ssl=None, + ssl_handshake_timeout=None, + ssl_shutdown_timeout=None, + start_serving=True): + """A coroutine which creates a UNIX Domain Socket server. + + The return value is a Server object, which can be used to stop + the service. + + path is a str, representing a file systsem path to bind the + server socket to. + + sock can optionally be specified in order to use a preexisting + socket object. + + backlog is the maximum number of queued connections passed to + listen() (defaults to 100). + + ssl can be set to an SSLContext to enable SSL over the + accepted connections. + + ssl_handshake_timeout is the time in seconds that an SSL server + will wait for completion of the SSL handshake before aborting the + connection. Default is 60s. + + ssl_shutdown_timeout is the time in seconds that an SSL server + will wait for completion of the SSL shutdown before aborting the + connection. Default is 30s. + """ + cdef: + UnixServer pipe + Server server = Server(self) + + if ssl is not None: + if not isinstance(ssl, ssl_SSLContext): + raise TypeError('ssl argument must be an SSLContext or None') + else: + if ssl_handshake_timeout is not None: + raise ValueError( + 'ssl_handshake_timeout is only meaningful with ssl') + if ssl_shutdown_timeout is not None: + raise ValueError( + 'ssl_shutdown_timeout is only meaningful with ssl') + + if path is not None: + if sock is not None: + raise ValueError( + 'path and sock can not be specified at the same time') + orig_path = path + + path = os_fspath(path) + + if isinstance(path, str): + path = PyUnicode_EncodeFSDefault(path) + + # Check for abstract socket. + if path[0] != 0: + try: + if stat_S_ISSOCK(os_stat(path).st_mode): + os_remove(path) + except FileNotFoundError: + pass + except OSError as err: + # Directory may have permissions only to create socket. + aio_logger.error( + 'Unable to check or remove stale UNIX socket %r: %r', + orig_path, err) + + # We use Python sockets to create a UNIX server socket because + # when UNIX sockets are created by libuv, libuv removes the path + # they were bound to. This is different from asyncio, which + # doesn't cleanup the socket path. + sock = socket_socket(uv.AF_UNIX) + + try: + sock.bind(path) + except OSError as exc: + sock.close() + if exc.errno == errno.EADDRINUSE: + # Let's improve the error message by adding + # with what exact address it occurs. + msg = 'Address {!r} is already in use'.format(orig_path) + raise OSError(errno.EADDRINUSE, msg) from None + else: + raise + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + sock.close() + raise + + else: + if sock is None: + raise ValueError( + 'path was not specified, and no sock specified') + + if sock.family != uv.AF_UNIX or not _is_sock_stream(sock.type): + raise ValueError( + 'A UNIX Domain Stream Socket was expected, got {!r}' + .format(sock)) + + # libuv will set the socket to non-blocking mode, but + # we want Python socket object to notice that. + sock.setblocking(False) + + pipe = UnixServer.new( + self, protocol_factory, server, backlog, + ssl, ssl_handshake_timeout, ssl_shutdown_timeout) + + try: + pipe._open(sock.fileno()) + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + pipe._close() + sock.close() + raise + + pipe._attach_fileobj(sock) + server._add_server(pipe) + + if start_serving: + server._start_serving() + + return server + + @cython.iterable_coroutine + async def create_unix_connection(self, protocol_factory, path=None, *, + ssl=None, sock=None, + server_hostname=None, + ssl_handshake_timeout=None, + ssl_shutdown_timeout=None): + + cdef: + UnixTransport tr + object app_protocol + object app_transport + object protocol + object ssl_waiter + + app_protocol = protocol = protocol_factory() + ssl_waiter = None + context = Context_CopyCurrent() + if ssl: + if server_hostname is None: + raise ValueError('You must set server_hostname ' + 'when using ssl without a host') + + ssl_waiter = self._new_future() + sslcontext = None if isinstance(ssl, bool) else ssl + protocol = SSLProtocol( + self, app_protocol, sslcontext, ssl_waiter, + False, server_hostname, + ssl_handshake_timeout=ssl_handshake_timeout, + ssl_shutdown_timeout=ssl_shutdown_timeout) + else: + if server_hostname is not None: + raise ValueError('server_hostname is only meaningful with ssl') + if ssl_handshake_timeout is not None: + raise ValueError( + 'ssl_handshake_timeout is only meaningful with ssl') + if ssl_shutdown_timeout is not None: + raise ValueError( + 'ssl_shutdown_timeout is only meaningful with ssl') + + if path is not None: + if sock is not None: + raise ValueError( + 'path and sock can not be specified at the same time') + + path = os_fspath(path) + + if isinstance(path, str): + path = PyUnicode_EncodeFSDefault(path) + + waiter = self._new_future() + tr = UnixTransport.new(self, protocol, None, waiter, context) + tr.connect(path) + try: + await waiter + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + tr._close() + raise + + else: + if sock is None: + raise ValueError('no path and sock were specified') + + if sock.family != uv.AF_UNIX or not _is_sock_stream(sock.type): + raise ValueError( + 'A UNIX Domain Stream Socket was expected, got {!r}' + .format(sock)) + + # libuv will set the socket to non-blocking mode, but + # we want Python socket object to notice that. + sock.setblocking(False) + + waiter = self._new_future() + tr = UnixTransport.new(self, protocol, None, waiter, context) + try: + tr._open(sock.fileno()) + tr._init_protocol() + await waiter + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + tr._close() + raise + + tr._attach_fileobj(sock) + + if ssl: + app_transport = protocol._get_app_transport(Context_CopyCurrent()) + try: + await ssl_waiter + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + app_transport.close() + raise + return app_transport, app_protocol + else: + return tr, protocol + + def default_exception_handler(self, context): + """Default exception handler. + + This is called when an exception occurs and no exception + handler is set, and can be called by a custom exception + handler that wants to defer to the default behavior. + + The context parameter has the same meaning as in + `call_exception_handler()`. + """ + message = context.get('message') + if not message: + message = 'Unhandled exception in event loop' + + exception = context.get('exception') + if exception is not None: + exc_info = (type(exception), exception, exception.__traceback__) + else: + exc_info = False + + log_lines = [message] + for key in sorted(context): + if key in {'message', 'exception'}: + continue + value = context[key] + if key == 'source_traceback': + tb = ''.join(tb_format_list(value)) + value = 'Object created at (most recent call last):\n' + value += tb.rstrip() + else: + try: + value = repr(value) + except (KeyboardInterrupt, SystemExit): + raise + except BaseException as ex: + value = ('Exception in __repr__ {!r}; ' + 'value type: {!r}'.format(ex, type(value))) + log_lines.append('{}: {}'.format(key, value)) + + aio_logger.error('\n'.join(log_lines), exc_info=exc_info) + + def get_exception_handler(self): + """Return an exception handler, or None if the default one is in use. + """ + return self._exception_handler + + def set_exception_handler(self, handler): + """Set handler as the new event loop exception handler. + + If handler is None, the default exception handler will + be set. + + If handler is a callable object, it should have a + signature matching '(loop, context)', where 'loop' + will be a reference to the active event loop, 'context' + will be a dict object (see `call_exception_handler()` + documentation for details about context). + """ + if handler is not None and not callable(handler): + raise TypeError('A callable object or None is expected, ' + 'got {!r}'.format(handler)) + self._exception_handler = handler + + def call_exception_handler(self, context): + """Call the current event loop's exception handler. + + The context argument is a dict containing the following keys: + + - 'message': Error message; + - 'exception' (optional): Exception object; + - 'future' (optional): Future instance; + - 'handle' (optional): Handle instance; + - 'protocol' (optional): Protocol instance; + - 'transport' (optional): Transport instance; + - 'socket' (optional): Socket instance. + + New keys maybe introduced in the future. + + Note: do not overload this method in an event loop subclass. + For custom exception handling, use the + `set_exception_handler()` method. + """ + if UVLOOP_DEBUG: + self._debug_exception_handler_cnt += 1 + + if self._exception_handler is None: + try: + self.default_exception_handler(context) + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + # Second protection layer for unexpected errors + # in the default implementation, as well as for subclassed + # event loops with overloaded "default_exception_handler". + aio_logger.error('Exception in default exception handler', + exc_info=True) + else: + try: + self._exception_handler(self, context) + except (KeyboardInterrupt, SystemExit): + raise + except BaseException as exc: + # Exception in the user set custom exception handler. + try: + # Let's try default handler. + self.default_exception_handler({ + 'message': 'Unhandled error in exception handler', + 'exception': exc, + 'context': context, + }) + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + # Guard 'default_exception_handler' in case it is + # overloaded. + aio_logger.error('Exception in default exception handler ' + 'while handling an unexpected error ' + 'in custom exception handler', + exc_info=True) + + def add_reader(self, fileobj, callback, *args): + """Add a reader callback.""" + if len(args) == 0: + args = None + self._add_reader(fileobj, new_Handle(self, callback, args, None)) + + def remove_reader(self, fileobj): + """Remove a reader callback.""" + self._remove_reader(fileobj) + + def add_writer(self, fileobj, callback, *args): + """Add a writer callback..""" + if len(args) == 0: + args = None + self._add_writer(fileobj, new_Handle(self, callback, args, None)) + + def remove_writer(self, fileobj): + """Remove a writer callback.""" + self._remove_writer(fileobj) + + @cython.iterable_coroutine + async def sock_recv(self, sock, n): + """Receive data from the socket. + + The return value is a bytes object representing the data received. + The maximum amount of data to be received at once is specified by + nbytes. + + This method is a coroutine. + """ + cdef: + Handle handle + + if self._debug and sock.gettimeout() != 0: + raise ValueError("the socket must be non-blocking") + + fut = _SyncSocketReaderFuture(sock, self) + handle = new_MethodHandle3( + self, + "Loop._sock_recv", + <method3_t>self._sock_recv, + None, + self, + fut, sock, n) + + self._add_reader(sock, handle) + return await fut + + @cython.iterable_coroutine + async def sock_recv_into(self, sock, buf): + """Receive data from the socket. + + The received data is written into *buf* (a writable buffer). + The return value is the number of bytes written. + + This method is a coroutine. + """ + cdef: + Handle handle + + if self._debug and sock.gettimeout() != 0: + raise ValueError("the socket must be non-blocking") + + fut = _SyncSocketReaderFuture(sock, self) + handle = new_MethodHandle3( + self, + "Loop._sock_recv_into", + <method3_t>self._sock_recv_into, + None, + self, + fut, sock, buf) + + self._add_reader(sock, handle) + return await fut + + @cython.iterable_coroutine + async def sock_sendall(self, sock, data): + """Send data to the socket. + + The socket must be connected to a remote socket. This method continues + to send data from data until either all data has been sent or an + error occurs. None is returned on success. On error, an exception is + raised, and there is no way to determine how much data, if any, was + successfully processed by the receiving end of the connection. + + This method is a coroutine. + """ + cdef: + Handle handle + ssize_t n + + if self._debug and sock.gettimeout() != 0: + raise ValueError("the socket must be non-blocking") + + if not data: + return + + socket_inc_io_ref(sock) + try: + try: + n = sock.send(data) + except (BlockingIOError, InterruptedError): + pass + else: + if UVLOOP_DEBUG: + # This can be a partial success, i.e. only part + # of the data was sent + self._sock_try_write_total += 1 + + if n == len(data): + return + if not isinstance(data, memoryview): + data = memoryview(data) + data = data[n:] + + fut = _SyncSocketWriterFuture(sock, self) + handle = new_MethodHandle3( + self, + "Loop._sock_sendall", + <method3_t>self._sock_sendall, + None, + self, + fut, sock, data) + + self._add_writer(sock, handle) + return await fut + finally: + socket_dec_io_ref(sock) + + @cython.iterable_coroutine + async def sock_accept(self, sock): + """Accept a connection. + + The socket must be bound to an address and listening for connections. + The return value is a pair (conn, address) where conn is a new socket + object usable to send and receive data on the connection, and address + is the address bound to the socket on the other end of the connection. + + This method is a coroutine. + """ + cdef: + Handle handle + + if self._debug and sock.gettimeout() != 0: + raise ValueError("the socket must be non-blocking") + + fut = _SyncSocketReaderFuture(sock, self) + handle = new_MethodHandle2( + self, + "Loop._sock_accept", + <method2_t>self._sock_accept, + None, + self, + fut, sock) + + self._add_reader(sock, handle) + return await fut + + @cython.iterable_coroutine + async def sock_connect(self, sock, address): + """Connect to a remote socket at address. + + This method is a coroutine. + """ + if self._debug and sock.gettimeout() != 0: + raise ValueError("the socket must be non-blocking") + + socket_inc_io_ref(sock) + try: + if sock.family == uv.AF_UNIX: + fut = self._sock_connect(sock, address) + else: + addrs = await self.getaddrinfo( + *address[:2], family=sock.family) + + _, _, _, _, address = addrs[0] + fut = self._sock_connect(sock, address) + if fut is not None: + await fut + finally: + socket_dec_io_ref(sock) + + @cython.iterable_coroutine + async def sock_recvfrom(self, sock, bufsize): + raise NotImplementedError + + @cython.iterable_coroutine + async def sock_recvfrom_into(self, sock, buf, nbytes=0): + raise NotImplementedError + + @cython.iterable_coroutine + async def sock_sendto(self, sock, data, address): + raise NotImplementedError + + @cython.iterable_coroutine + async def connect_accepted_socket(self, protocol_factory, sock, *, + ssl=None, + ssl_handshake_timeout=None, + ssl_shutdown_timeout=None): + """Handle an accepted connection. + + This is used by servers that accept connections outside of + asyncio but that use asyncio to handle connections. + + This method is a coroutine. When completed, the coroutine + returns a (transport, protocol) pair. + """ + + cdef: + UVStream transport = None + + if ssl is not None: + if not isinstance(ssl, ssl_SSLContext): + raise TypeError('ssl argument must be an SSLContext or None') + else: + if ssl_handshake_timeout is not None: + raise ValueError( + 'ssl_handshake_timeout is only meaningful with ssl') + if ssl_shutdown_timeout is not None: + raise ValueError( + 'ssl_shutdown_timeout is only meaningful with ssl') + + if not _is_sock_stream(sock.type): + raise ValueError( + 'A Stream Socket was expected, got {!r}'.format(sock)) + + app_protocol = protocol_factory() + waiter = self._new_future() + transport_waiter = None + context = Context_CopyCurrent() + + if ssl is None: + protocol = app_protocol + transport_waiter = waiter + else: + protocol = SSLProtocol( + self, app_protocol, ssl, waiter, + server_side=True, + server_hostname=None, + ssl_handshake_timeout=ssl_handshake_timeout, + ssl_shutdown_timeout=ssl_shutdown_timeout) + transport_waiter = None + + if sock.family == uv.AF_UNIX: + transport = <UVStream>UnixTransport.new( + self, protocol, None, transport_waiter, context) + elif sock.family in (uv.AF_INET, uv.AF_INET6): + transport = <UVStream>TCPTransport.new( + self, protocol, None, transport_waiter, context) + + if transport is None: + raise ValueError( + 'invalid socket family, expected AF_UNIX, AF_INET or AF_INET6') + + transport._open(sock.fileno()) + transport._init_protocol() + transport._attach_fileobj(sock) + + if ssl: + app_transport = protocol._get_app_transport(context) + try: + await waiter + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + app_transport.close() + raise + return app_transport, protocol + else: + try: + await waiter + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + transport._close() + raise + return transport, protocol + + def run_in_executor(self, executor, func, *args): + if aio_iscoroutine(func) or aio_iscoroutinefunction(func): + raise TypeError("coroutines cannot be used with run_in_executor()") + + self._check_closed() + + if executor is None: + executor = self._default_executor + # Only check when the default executor is being used + self._check_default_executor() + if executor is None: + executor = cc_ThreadPoolExecutor() + self._default_executor = executor + + return aio_wrap_future(executor.submit(func, *args), loop=self) + + def set_default_executor(self, executor): + self._default_executor = executor + + @cython.iterable_coroutine + async def __subprocess_run(self, protocol_factory, args, + stdin=subprocess_PIPE, + stdout=subprocess_PIPE, + stderr=subprocess_PIPE, + universal_newlines=False, + shell=True, + bufsize=0, + preexec_fn=None, + close_fds=None, + cwd=None, + env=None, + startupinfo=None, + creationflags=0, + restore_signals=True, + start_new_session=False, + executable=None, + pass_fds=(), + # For tests only! Do not use in your code. Ever. + __uvloop_sleep_after_fork=False): + + # TODO: Implement close_fds (might not be very important in + # Python 3.5, since all FDs aren't inheritable by default.) + + cdef: + int debug_flags = 0 + + if universal_newlines: + raise ValueError("universal_newlines must be False") + if bufsize != 0: + raise ValueError("bufsize must be 0") + if startupinfo is not None: + raise ValueError('startupinfo is not supported') + if creationflags != 0: + raise ValueError('creationflags is not supported') + + if executable is not None: + args[0] = executable + + if __uvloop_sleep_after_fork: + debug_flags |= __PROCESS_DEBUG_SLEEP_AFTER_FORK + + waiter = self._new_future() + protocol = protocol_factory() + proc = UVProcessTransport.new(self, protocol, + args, env, cwd, start_new_session, + stdin, stdout, stderr, pass_fds, + waiter, + debug_flags, + preexec_fn, + restore_signals) + + try: + await waiter + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + proc.close() + raise + + return proc, protocol + + @cython.iterable_coroutine + async def subprocess_shell(self, protocol_factory, cmd, *, + shell=True, + **kwargs): + + if not shell: + raise ValueError("shell must be True") + + args = [cmd] + if shell: + args = [b'/bin/sh', b'-c'] + args + + return await self.__subprocess_run(protocol_factory, args, shell=True, + **kwargs) + + @cython.iterable_coroutine + async def subprocess_exec(self, protocol_factory, program, *args, + shell=False, **kwargs): + + if shell: + raise ValueError("shell must be False") + + args = list((program,) + args) + + return await self.__subprocess_run(protocol_factory, args, shell=False, + **kwargs) + + @cython.iterable_coroutine + async def connect_read_pipe(self, proto_factory, pipe): + """Register read pipe in event loop. Set the pipe to non-blocking mode. + + protocol_factory should instantiate object with Protocol interface. + pipe is a file-like object. + Return pair (transport, protocol), where transport supports the + ReadTransport interface.""" + cdef: + ReadUnixTransport transp + + waiter = self._new_future() + proto = proto_factory() + transp = ReadUnixTransport.new(self, proto, None, waiter) + transp._add_extra_info('pipe', pipe) + try: + transp._open(pipe.fileno()) + transp._init_protocol() + await waiter + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + transp._close() + raise + transp._attach_fileobj(pipe) + return transp, proto + + @cython.iterable_coroutine + async def connect_write_pipe(self, proto_factory, pipe): + """Register write pipe in event loop. + + protocol_factory should instantiate object with BaseProtocol interface. + Pipe is file-like object already switched to nonblocking. + Return pair (transport, protocol), where transport support + WriteTransport interface.""" + cdef: + WriteUnixTransport transp + + waiter = self._new_future() + proto = proto_factory() + transp = WriteUnixTransport.new(self, proto, None, waiter) + transp._add_extra_info('pipe', pipe) + try: + transp._open(pipe.fileno()) + transp._init_protocol() + await waiter + except (KeyboardInterrupt, SystemExit): + raise + except BaseException: + transp._close() + raise + transp._attach_fileobj(pipe) + return transp, proto + + def add_signal_handler(self, sig, callback, *args): + """Add a handler for a signal. UNIX only. + + Raise ValueError if the signal number is invalid or uncatchable. + Raise RuntimeError if there is a problem setting up the handler. + """ + cdef: + Handle h + + if not self._is_main_thread(): + raise ValueError( + 'add_signal_handler() can only be called from ' + 'the main thread') + + if (aio_iscoroutine(callback) + or aio_iscoroutinefunction(callback)): + raise TypeError( + "coroutines cannot be used with add_signal_handler()") + + if sig == uv.SIGCHLD: + if (hasattr(callback, '__self__') and + isinstance(callback.__self__, aio_AbstractChildWatcher)): + + warnings_warn( + "!!! asyncio is trying to install its ChildWatcher for " + "SIGCHLD signal !!!\n\nThis is probably because a uvloop " + "instance is used with asyncio.set_event_loop(). " + "The correct way to use uvloop is to install its policy: " + "`asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())`" + "\n\n", RuntimeWarning, source=self) + + # TODO: ideally we should always raise an error here, + # but that would be a backwards incompatible change, + # because we recommended using "asyncio.set_event_loop()" + # in our README. Need to start a deprecation period + # at some point to turn this warning into an error. + return + + raise RuntimeError( + 'cannot add a signal handler for SIGCHLD: it is used ' + 'by the event loop to track subprocesses') + + self._check_signal(sig) + self._check_closed() + + h = new_Handle(self, callback, args or None, None) + self._signal_handlers[sig] = h + + try: + # Register a dummy signal handler to ask Python to write the signal + # number in the wakeup file descriptor. + signal_signal(sig, self.__sighandler) + + # Set SA_RESTART to limit EINTR occurrences. + signal_siginterrupt(sig, False) + except OSError as exc: + del self._signal_handlers[sig] + if not self._signal_handlers: + try: + signal_set_wakeup_fd(-1) + except (ValueError, OSError) as nexc: + aio_logger.info('set_wakeup_fd(-1) failed: %s', nexc) + + if exc.errno == errno_EINVAL: + raise RuntimeError('sig {} cannot be caught'.format(sig)) + else: + raise + + def remove_signal_handler(self, sig): + """Remove a handler for a signal. UNIX only. + + Return True if a signal handler was removed, False if not. + """ + + if not self._is_main_thread(): + raise ValueError( + 'remove_signal_handler() can only be called from ' + 'the main thread') + + self._check_signal(sig) + + if not self._listening_signals: + return False + + try: + del self._signal_handlers[sig] + except KeyError: + return False + + if sig == uv.SIGINT: + handler = signal_default_int_handler + else: + handler = signal_SIG_DFL + + try: + signal_signal(sig, handler) + except OSError as exc: + if exc.errno == errno_EINVAL: + raise RuntimeError('sig {} cannot be caught'.format(sig)) + else: + raise + + return True + + @cython.iterable_coroutine + async def create_datagram_endpoint(self, protocol_factory, + local_addr=None, remote_addr=None, *, + family=0, proto=0, flags=0, + reuse_address=_unset, reuse_port=None, + allow_broadcast=None, sock=None): + """A coroutine which creates a datagram endpoint. + + This method will try to establish the endpoint in the background. + When successful, the coroutine returns a (transport, protocol) pair. + + protocol_factory must be a callable returning a protocol instance. + + socket family AF_INET or socket.AF_INET6 depending on host (or + family if specified), socket type SOCK_DGRAM. + + reuse_port tells the kernel to allow this endpoint to be bound to + the same port as other existing endpoints are bound to, so long as + they all set this flag when being created. This option is not + supported on Windows and some UNIX's. If the + :py:data:`~socket.SO_REUSEPORT` constant is not defined then this + capability is unsupported. + + allow_broadcast tells the kernel to allow this endpoint to send + messages to the broadcast address. + + sock can optionally be specified in order to use a preexisting + socket object. + """ + cdef: + UDPTransport udp = None + system.addrinfo * lai + system.addrinfo * rai + + if sock is not None: + if not _is_sock_dgram(sock.type): + raise ValueError( + 'A UDP Socket was expected, got {!r}'.format(sock)) + if (local_addr or remote_addr or + family or proto or flags or + reuse_port or allow_broadcast): + # show the problematic kwargs in exception msg + opts = dict(local_addr=local_addr, remote_addr=remote_addr, + family=family, proto=proto, flags=flags, + reuse_address=reuse_address, reuse_port=reuse_port, + allow_broadcast=allow_broadcast) + problems = ', '.join( + '{}={}'.format(k, v) for k, v in opts.items() if v) + raise ValueError( + 'socket modifier keyword arguments can not be used ' + 'when sock is specified. ({})'.format(problems)) + sock.setblocking(False) + udp = UDPTransport.__new__(UDPTransport) + udp._init(self, uv.AF_UNSPEC) + udp.open(sock.family, sock.fileno()) + udp._attach_fileobj(sock) + else: + if reuse_address is not _unset: + if reuse_address: + raise ValueError("Passing `reuse_address=True` is no " + "longer supported, as the usage of " + "SO_REUSEPORT in UDP poses a significant " + "security concern.") + else: + warnings_warn("The *reuse_address* parameter has been " + "deprecated as of 0.15.", DeprecationWarning, + stacklevel=2) + reuse_port = bool(reuse_port) + if reuse_port and not has_SO_REUSEPORT: + raise ValueError( + 'reuse_port not supported by socket module') + + lads = None + if local_addr is not None: + if (not isinstance(local_addr, (tuple, list)) or + len(local_addr) != 2): + raise TypeError( + 'local_addr must be a tuple of (host, port)') + lads = await self._getaddrinfo( + local_addr[0], local_addr[1], + family, uv.SOCK_DGRAM, proto, flags, + 0) + + rads = None + if remote_addr is not None: + if (not isinstance(remote_addr, (tuple, list)) or + len(remote_addr) != 2): + raise TypeError( + 'remote_addr must be a tuple of (host, port)') + rads = await self._getaddrinfo( + remote_addr[0], remote_addr[1], + family, uv.SOCK_DGRAM, proto, flags, + 0) + + excs = [] + if lads is None: + if rads is not None: + udp = UDPTransport.__new__(UDPTransport) + rai = (<AddrInfo>rads).data + udp._init(self, rai.ai_family) + udp._connect(rai.ai_addr, rai.ai_addrlen) + udp._set_address(rai) + else: + if family not in (uv.AF_INET, uv.AF_INET6): + raise ValueError('unexpected address family') + udp = UDPTransport.__new__(UDPTransport) + udp._init(self, family) + + if reuse_port: + self._sock_set_reuseport(udp._fileno()) + + else: + lai = (<AddrInfo>lads).data + while lai is not NULL: + try: + udp = UDPTransport.__new__(UDPTransport) + udp._init(self, lai.ai_family) + if reuse_port: + self._sock_set_reuseport(udp._fileno()) + udp._bind(lai.ai_addr) + except (KeyboardInterrupt, SystemExit): + raise + except BaseException as ex: + lai = lai.ai_next + excs.append(ex) + continue + else: + break + else: + ctx = None + if len(excs): + ctx = excs[0] + raise OSError('could not bind to local_addr {}'.format( + local_addr)) from ctx + + if rads is not None: + rai = (<AddrInfo>rads).data + while rai is not NULL: + if rai.ai_family != lai.ai_family: + rai = rai.ai_next + continue + if rai.ai_protocol != lai.ai_protocol: + rai = rai.ai_next + continue + udp._connect(rai.ai_addr, rai.ai_addrlen) + udp._set_address(rai) + break + else: + raise OSError( + 'could not bind to remote_addr {}'.format( + remote_addr)) + + if allow_broadcast: + udp._set_broadcast(1) + + protocol = protocol_factory() + waiter = self._new_future() + assert udp is not None + udp._set_protocol(protocol) + udp._set_waiter(waiter) + udp._init_protocol() + + await waiter + return udp, protocol + + def _monitor_fs(self, path: str, callback) -> asyncio.Handle: + cdef: + UVFSEvent fs_handle + char* c_str_path + + self._check_closed() + fs_handle = UVFSEvent.new(self, callback, None) + p_bytes = path.encode('UTF-8') + c_str_path = p_bytes + flags = 0 + fs_handle.start(c_str_path, flags) + return fs_handle + + def _check_default_executor(self): + if self._executor_shutdown_called: + raise RuntimeError('Executor shutdown has been called') + + def _asyncgen_finalizer_hook(self, agen): + self._asyncgens.discard(agen) + if not self.is_closed(): + self.call_soon_threadsafe(self.create_task, agen.aclose()) + + def _asyncgen_firstiter_hook(self, agen): + if self._asyncgens_shutdown_called: + warnings_warn( + "asynchronous generator {!r} was scheduled after " + "loop.shutdown_asyncgens() call".format(agen), + ResourceWarning, source=self) + + self._asyncgens.add(agen) + + @cython.iterable_coroutine + async def shutdown_asyncgens(self): + """Shutdown all active asynchronous generators.""" + self._asyncgens_shutdown_called = True + + if not len(self._asyncgens): + return + + closing_agens = list(self._asyncgens) + self._asyncgens.clear() + + shutdown_coro = aio_gather( + *[ag.aclose() for ag in closing_agens], + return_exceptions=True) + + results = await shutdown_coro + for result, agen in zip(results, closing_agens): + if isinstance(result, Exception): + self.call_exception_handler({ + 'message': 'an error occurred during closing of ' + 'asynchronous generator {!r}'.format(agen), + 'exception': result, + 'asyncgen': agen + }) + + @cython.iterable_coroutine + async def shutdown_default_executor(self, timeout=None): + """Schedule the shutdown of the default executor. + + The timeout parameter specifies the amount of time the executor will + be given to finish joining. The default value is None, which means + that the executor will be given an unlimited amount of time. + """ + self._executor_shutdown_called = True + if self._default_executor is None: + return + future = self.create_future() + thread = threading_Thread(target=self._do_shutdown, args=(future,)) + thread.start() + try: + await future + finally: + thread.join(timeout) + + if thread.is_alive(): + warnings_warn( + "The executor did not finishing joining " + f"its threads within {timeout} seconds.", + RuntimeWarning, + stacklevel=2 + ) + self._default_executor.shutdown(wait=False) + + def _do_shutdown(self, future): + try: + self._default_executor.shutdown(wait=True) + self.call_soon_threadsafe(future.set_result, None) + except Exception as ex: + self.call_soon_threadsafe(future.set_exception, ex) + + +# Expose pointer for integration with other C-extensions +def libuv_get_loop_t_ptr(loop): + return PyCapsule_New(<void *>(<Loop>loop).uvloop, NULL, NULL) + + +def libuv_get_version(): + return uv.uv_version() + + +def _testhelper_unwrap_capsuled_pointer(obj): + return <uint64_t>PyCapsule_GetPointer(obj, NULL) + + +cdef void __loop_alloc_buffer( + uv.uv_handle_t* uvhandle, + size_t suggested_size, + uv.uv_buf_t* buf +) noexcept with gil: + cdef: + Loop loop = (<UVHandle>uvhandle.data)._loop + + if loop._recv_buffer_in_use == 1: + buf.len = 0 + exc = RuntimeError('concurrent allocations') + loop._handle_exception(exc) + return + + loop._recv_buffer_in_use = 1 + buf.base = loop._recv_buffer + buf.len = sizeof(loop._recv_buffer) + + +cdef inline void __loop_free_buffer(Loop loop): + loop._recv_buffer_in_use = 0 + + +class _SyncSocketReaderFuture(aio_Future): + + def __init__(self, sock, loop): + aio_Future.__init__(self, loop=loop) + self.__sock = sock + self.__loop = loop + + def __remove_reader(self): + if self.__sock is not None and self.__sock.fileno() != -1: + self.__loop.remove_reader(self.__sock) + self.__sock = None + + if PY39: + def cancel(self, msg=None): + self.__remove_reader() + aio_Future.cancel(self, msg=msg) + + else: + def cancel(self): + self.__remove_reader() + aio_Future.cancel(self) + + +class _SyncSocketWriterFuture(aio_Future): + + def __init__(self, sock, loop): + aio_Future.__init__(self, loop=loop) + self.__sock = sock + self.__loop = loop + + def __remove_writer(self): + if self.__sock is not None and self.__sock.fileno() != -1: + self.__loop.remove_writer(self.__sock) + self.__sock = None + + if PY39: + def cancel(self, msg=None): + self.__remove_writer() + aio_Future.cancel(self, msg=msg) + + else: + def cancel(self): + self.__remove_writer() + aio_Future.cancel(self) + + +include "cbhandles.pyx" +include "pseudosock.pyx" +include "lru.pyx" + +include "handles/handle.pyx" +include "handles/async_.pyx" +include "handles/idle.pyx" +include "handles/check.pyx" +include "handles/timer.pyx" +include "handles/poll.pyx" +include "handles/basetransport.pyx" +include "handles/stream.pyx" +include "handles/streamserver.pyx" +include "handles/tcp.pyx" +include "handles/pipe.pyx" +include "handles/process.pyx" +include "handles/fsevent.pyx" + +include "request.pyx" +include "dns.pyx" +include "sslproto.pyx" + +include "handles/udp.pyx" + +include "server.pyx" + + +# Used in UVProcess +cdef vint __atfork_installed = 0 +cdef vint __forking = 0 +cdef Loop __forking_loop = None + + +cdef void __get_fork_handler() noexcept nogil: + with gil: + if (__forking and __forking_loop is not None and + __forking_loop.active_process_handler is not None): + __forking_loop.active_process_handler._after_fork() + +cdef __install_atfork(): + global __atfork_installed + + if __atfork_installed: + return + __atfork_installed = 1 + + cdef int err + + err = system.pthread_atfork(NULL, NULL, &system.handleAtFork) + if err: + __atfork_installed = 0 + raise convert_error(-err) + + +# Install PyMem* memory allocators +cdef vint __mem_installed = 0 +cdef __install_pymem(): + global __mem_installed + if __mem_installed: + return + __mem_installed = 1 + + cdef int err + err = uv.uv_replace_allocator(<uv.uv_malloc_func>PyMem_RawMalloc, + <uv.uv_realloc_func>PyMem_RawRealloc, + <uv.uv_calloc_func>PyMem_RawCalloc, + <uv.uv_free_func>PyMem_RawFree) + if err < 0: + __mem_installed = 0 + raise convert_error(err) + + +cdef _set_signal_wakeup_fd(fd): + if fd >= 0: + return signal_set_wakeup_fd(fd, warn_on_full_buffer=False) + else: + return signal_set_wakeup_fd(fd) + + +# Helpers for tests + +@cython.iterable_coroutine +async def _test_coroutine_1(): + return 42 |