diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/uvloop/loop.pyx')
-rw-r--r-- | venv/lib/python3.11/site-packages/uvloop/loop.pyx | 3403 |
1 files changed, 0 insertions, 3403 deletions
diff --git a/venv/lib/python3.11/site-packages/uvloop/loop.pyx b/venv/lib/python3.11/site-packages/uvloop/loop.pyx deleted file mode 100644 index 334d8d5..0000000 --- a/venv/lib/python3.11/site-packages/uvloop/loop.pyx +++ /dev/null @@ -1,3403 +0,0 @@ -# cython: language_level=3, embedsignature=True - -import asyncio -cimport cython - -from .includes.debug cimport UVLOOP_DEBUG -from .includes cimport uv -from .includes cimport system -from .includes.python cimport ( - PY_VERSION_HEX, - PyMem_RawMalloc, PyMem_RawFree, - PyMem_RawCalloc, PyMem_RawRealloc, - PyUnicode_EncodeFSDefault, - PyErr_SetInterrupt, - _Py_RestoreSignals, - Context_CopyCurrent, - Context_Enter, - Context_Exit, - PyMemoryView_FromMemory, PyBUF_WRITE, - PyMemoryView_FromObject, PyMemoryView_Check, - PyOS_AfterFork_Parent, PyOS_AfterFork_Child, - PyOS_BeforeFork, - PyUnicode_FromString -) -from .includes.flowcontrol cimport add_flowcontrol_defaults - -from libc.stdint cimport uint64_t -from libc.string cimport memset, strerror, memcpy -from libc cimport errno - -from cpython cimport PyObject -from cpython cimport PyErr_CheckSignals, PyErr_Occurred -from cpython cimport PyThread_get_thread_ident -from cpython cimport Py_INCREF, Py_DECREF, Py_XDECREF, Py_XINCREF -from cpython cimport ( - PyObject_GetBuffer, PyBuffer_Release, PyBUF_SIMPLE, - Py_buffer, PyBytes_AsString, PyBytes_CheckExact, - PyBytes_AsStringAndSize, - Py_SIZE, PyBytes_AS_STRING, PyBUF_WRITABLE -) -from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer - -from . import _noop - - -include "includes/consts.pxi" -include "includes/stdlib.pxi" - -include "errors.pyx" - -cdef: - int PY39 = PY_VERSION_HEX >= 0x03090000 - int PY311 = PY_VERSION_HEX >= 0x030b0000 - uint64_t MAX_SLEEP = 3600 * 24 * 365 * 100 - - -cdef _is_sock_stream(sock_type): - if SOCK_NONBLOCK == -1: - return sock_type == uv.SOCK_STREAM - else: - # Linux's socket.type is a bitmask that can include extra info - # about socket (like SOCK_NONBLOCK bit), therefore we can't do simple - # `sock_type == socket.SOCK_STREAM`, see - # https://github.com/torvalds/linux/blob/v4.13/include/linux/net.h#L77 - # for more details. - return (sock_type & 0xF) == uv.SOCK_STREAM - - -cdef _is_sock_dgram(sock_type): - if SOCK_NONBLOCK == -1: - return sock_type == uv.SOCK_DGRAM - else: - # Read the comment in `_is_sock_stream`. - return (sock_type & 0xF) == uv.SOCK_DGRAM - - -cdef isfuture(obj): - if aio_isfuture is None: - return isinstance(obj, aio_Future) - else: - return aio_isfuture(obj) - - -cdef inline socket_inc_io_ref(sock): - if isinstance(sock, socket_socket): - sock._io_refs += 1 - - -cdef inline socket_dec_io_ref(sock): - if isinstance(sock, socket_socket): - sock._decref_socketios() - - -cdef inline run_in_context(context, method): - # This method is internally used to workaround a reference issue that in - # certain circumstances, inlined context.run() will not hold a reference to - # the given method instance, which - if deallocated - will cause segfault. - # See also: edgedb/edgedb#2222 - Py_INCREF(method) - try: - return context.run(method) - finally: - Py_DECREF(method) - - -cdef inline run_in_context1(context, method, arg): - Py_INCREF(method) - try: - return context.run(method, arg) - finally: - Py_DECREF(method) - - -cdef inline run_in_context2(context, method, arg1, arg2): - Py_INCREF(method) - try: - return context.run(method, arg1, arg2) - finally: - Py_DECREF(method) - - -# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s -# *reuse_address* parameter -_unset = object() - - -@cython.no_gc_clear -cdef class Loop: - def __cinit__(self): - cdef int err - - # Install PyMem* memory allocators if they aren't installed yet. - __install_pymem() - - # Install pthread_atfork handlers - __install_atfork() - - self.uvloop = <uv.uv_loop_t*>PyMem_RawMalloc(sizeof(uv.uv_loop_t)) - if self.uvloop is NULL: - raise MemoryError() - - self.slow_callback_duration = 0.1 - - self._closed = 0 - self._debug = 0 - self._thread_id = 0 - self._running = 0 - self._stopping = 0 - - self._transports = weakref_WeakValueDictionary() - self._processes = set() - - # Used to keep a reference (and hence keep the fileobj alive) - # for as long as its registered by add_reader or add_writer. - # This is how the selector module and hence asyncio behaves. - self._fd_to_reader_fileobj = {} - self._fd_to_writer_fileobj = {} - - self._timers = set() - self._polls = {} - - self._recv_buffer_in_use = 0 - - err = uv.uv_loop_init(self.uvloop) - if err < 0: - raise convert_error(err) - self.uvloop.data = <void*> self - - self._init_debug_fields() - - self.active_process_handler = None - - self._last_error = None - - self._task_factory = None - self._exception_handler = None - self._default_executor = None - - self._queued_streams = set() - self._executing_streams = set() - self._ready = col_deque() - self._ready_len = 0 - - self.handler_async = UVAsync.new( - self, <method_t>self._on_wake, self) - - self.handler_idle = UVIdle.new( - self, - new_MethodHandle( - self, "loop._on_idle", <method_t>self._on_idle, None, self)) - - # Needed to call `UVStream._exec_write` for writes scheduled - # during `Protocol.data_received`. - self.handler_check__exec_writes = UVCheck.new( - self, - new_MethodHandle( - self, "loop._exec_queued_writes", - <method_t>self._exec_queued_writes, None, self)) - - self._signals = set() - self._ssock = self._csock = None - self._signal_handlers = {} - self._listening_signals = False - self._old_signal_wakeup_id = -1 - - self._coroutine_debug_set = False - - # A weak set of all asynchronous generators that are - # being iterated by the loop. - self._asyncgens = weakref_WeakSet() - - # Set to True when `loop.shutdown_asyncgens` is called. - self._asyncgens_shutdown_called = False - # Set to True when `loop.shutdown_default_executor` is called. - self._executor_shutdown_called = False - - self._servers = set() - - cdef inline _is_main_thread(self): - cdef uint64_t main_thread_id = system.MAIN_THREAD_ID - if system.MAIN_THREAD_ID_SET == 0: - main_thread_id = <uint64_t>threading_main_thread().ident - system.setMainThreadID(main_thread_id) - return main_thread_id == PyThread_get_thread_ident() - - def __init__(self): - self.set_debug( - sys_dev_mode or (not sys_ignore_environment - and bool(os_environ.get('PYTHONASYNCIODEBUG')))) - - def __dealloc__(self): - if self._running == 1: - raise RuntimeError('deallocating a running event loop!') - if self._closed == 0: - aio_logger.error("deallocating an open event loop") - return - PyMem_RawFree(self.uvloop) - self.uvloop = NULL - - cdef _init_debug_fields(self): - self._debug_cc = bool(UVLOOP_DEBUG) - - if UVLOOP_DEBUG: - self._debug_handles_current = col_Counter() - self._debug_handles_closed = col_Counter() - self._debug_handles_total = col_Counter() - else: - self._debug_handles_current = None - self._debug_handles_closed = None - self._debug_handles_total = None - - self._debug_uv_handles_total = 0 - self._debug_uv_handles_freed = 0 - - self._debug_stream_read_cb_total = 0 - self._debug_stream_read_eof_total = 0 - self._debug_stream_read_errors_total = 0 - self._debug_stream_read_cb_errors_total = 0 - self._debug_stream_read_eof_cb_errors_total = 0 - - self._debug_stream_shutdown_errors_total = 0 - self._debug_stream_listen_errors_total = 0 - - self._debug_stream_write_tries = 0 - self._debug_stream_write_errors_total = 0 - self._debug_stream_write_ctx_total = 0 - self._debug_stream_write_ctx_cnt = 0 - self._debug_stream_write_cb_errors_total = 0 - - self._debug_cb_handles_total = 0 - self._debug_cb_handles_count = 0 - - self._debug_cb_timer_handles_total = 0 - self._debug_cb_timer_handles_count = 0 - - self._poll_read_events_total = 0 - self._poll_read_cb_errors_total = 0 - self._poll_write_events_total = 0 - self._poll_write_cb_errors_total = 0 - - self._sock_try_write_total = 0 - - self._debug_exception_handler_cnt = 0 - - cdef _setup_or_resume_signals(self): - if not self._is_main_thread(): - return - - if self._listening_signals: - raise RuntimeError('signals handling has been already setup') - - if self._ssock is not None: - raise RuntimeError('self-pipe exists before loop run') - - # Create a self-pipe and call set_signal_wakeup_fd() with one - # of its ends. This is needed so that libuv knows that it needs - # to wakeup on ^C (no matter if the SIGINT handler is still the - # standard Python's one or or user set their own.) - - self._ssock, self._csock = socket_socketpair() - try: - self._ssock.setblocking(False) - self._csock.setblocking(False) - - fileno = self._csock.fileno() - - self._old_signal_wakeup_id = _set_signal_wakeup_fd(fileno) - except Exception: - # Out of all statements in the try block, only the - # "_set_signal_wakeup_fd()" call can fail, but it shouldn't, - # as we ensure that the current thread is the main thread. - # Still, if something goes horribly wrong we want to clean up - # the socket pair. - self._ssock.close() - self._csock.close() - self._ssock = None - self._csock = None - raise - - self._add_reader( - self._ssock, - new_MethodHandle( - self, - "Loop._read_from_self", - <method_t>self._read_from_self, - None, - self)) - - self._listening_signals = True - - cdef _pause_signals(self): - if not self._is_main_thread(): - if self._listening_signals: - raise RuntimeError( - 'cannot pause signals handling; no longer running in ' - 'the main thread') - else: - return - - if not self._listening_signals: - raise RuntimeError('signals handling has not been setup') - - self._listening_signals = False - - _set_signal_wakeup_fd(self._old_signal_wakeup_id) - - self._remove_reader(self._ssock) - self._ssock.close() - self._csock.close() - self._ssock = None - self._csock = None - - cdef _shutdown_signals(self): - if not self._is_main_thread(): - if self._signal_handlers: - aio_logger.warning( - 'cannot cleanup signal handlers: closing the event loop ' - 'in a non-main OS thread') - return - - if self._listening_signals: - raise RuntimeError( - 'cannot shutdown signals handling as it has not been paused') - - if self._ssock: - raise RuntimeError( - 'self-pipe was not cleaned up after loop was run') - - for sig in list(self._signal_handlers): - self.remove_signal_handler(sig) - - def __sighandler(self, signum, frame): - self._signals.add(signum) - - cdef inline _ceval_process_signals(self): - # Invoke CPython eval loop to let process signals. - PyErr_CheckSignals() - # Calling a pure-Python function will invoke - # _PyEval_EvalFrameDefault which will process - # pending signal callbacks. - _noop.noop() # Might raise ^C - - cdef _read_from_self(self): - cdef bytes sigdata - sigdata = b'' - while True: - try: - data = self._ssock.recv(65536) - if not data: - break - sigdata += data - except InterruptedError: - continue - except BlockingIOError: - break - if sigdata: - self._invoke_signals(sigdata) - - cdef _invoke_signals(self, bytes data): - cdef set sigs - - self._ceval_process_signals() - - sigs = self._signals.copy() - self._signals.clear() - for signum in data: - if not signum: - # ignore null bytes written by set_wakeup_fd() - continue - sigs.discard(signum) - self._handle_signal(signum) - - for signum in sigs: - # Since not all signals are registered by add_signal_handler() - # (for instance, we use the default SIGINT handler) not all - # signals will trigger loop.__sighandler() callback. Therefore - # we combine two datasources: one is self-pipe, one is data - # from __sighandler; this ensures that signals shouldn't be - # lost even if set_wakeup_fd() couldn't write to the self-pipe. - self._handle_signal(signum) - - cdef _handle_signal(self, sig): - cdef Handle handle - - try: - handle = <Handle>(self._signal_handlers[sig]) - except KeyError: - handle = None - - if handle is None: - self._ceval_process_signals() - return - - if handle._cancelled: - self.remove_signal_handler(sig) # Remove it properly. - else: - self._append_ready_handle(handle) - self.handler_async.send() - - cdef _on_wake(self): - if ((self._ready_len > 0 or self._stopping) and - not self.handler_idle.running): - self.handler_idle.start() - - cdef _on_idle(self): - cdef: - int i, ntodo - object popleft = self._ready.popleft - Handle handler - - ntodo = len(self._ready) - if self._debug: - for i from 0 <= i < ntodo: - handler = <Handle> popleft() - if handler._cancelled == 0: - try: - started = time_monotonic() - handler._run() - except BaseException as ex: - self._stop(ex) - return - else: - delta = time_monotonic() - started - if delta > self.slow_callback_duration: - aio_logger.warning( - 'Executing %s took %.3f seconds', - handler._format_handle(), delta) - - else: - for i from 0 <= i < ntodo: - handler = <Handle> popleft() - if handler._cancelled == 0: - try: - handler._run() - except BaseException as ex: - self._stop(ex) - return - - if len(self._queued_streams): - self._exec_queued_writes() - - self._ready_len = len(self._ready) - if self._ready_len == 0 and self.handler_idle.running: - self.handler_idle.stop() - - if self._stopping: - uv.uv_stop(self.uvloop) # void - - cdef _stop(self, exc): - if exc is not None: - self._last_error = exc - if self._stopping == 1: - return - self._stopping = 1 - if not self.handler_idle.running: - self.handler_idle.start() - - cdef __run(self, uv.uv_run_mode mode): - # Although every UVHandle holds a reference to the loop, - # we want to do everything to ensure that the loop will - # never deallocate during the run -- so we do some - # manual refs management. - Py_INCREF(self) - with nogil: - err = uv.uv_run(self.uvloop, mode) - Py_DECREF(self) - - if err < 0: - raise convert_error(err) - - cdef _run(self, uv.uv_run_mode mode): - cdef int err - - if self._closed == 1: - raise RuntimeError('unable to start the loop; it was closed') - - if self._running == 1: - raise RuntimeError('this event loop is already running.') - - if (aio_get_running_loop is not None and - aio_get_running_loop() is not None): - raise RuntimeError( - 'Cannot run the event loop while another loop is running') - - # reset _last_error - self._last_error = None - - self._thread_id = PyThread_get_thread_ident() - self._running = 1 - - self.handler_check__exec_writes.start() - self.handler_idle.start() - - self._setup_or_resume_signals() - - if aio_set_running_loop is not None: - aio_set_running_loop(self) - try: - self.__run(mode) - finally: - if aio_set_running_loop is not None: - aio_set_running_loop(None) - - self.handler_check__exec_writes.stop() - self.handler_idle.stop() - - self._pause_signals() - - self._thread_id = 0 - self._running = 0 - self._stopping = 0 - - if self._last_error is not None: - # The loop was stopped with an error with 'loop._stop(error)' call - raise self._last_error - - cdef _close(self): - cdef int err - - if self._running == 1: - raise RuntimeError("Cannot close a running event loop") - - if self._closed == 1: - return - - self._closed = 1 - - for cb_handle in self._ready: - cb_handle.cancel() - self._ready.clear() - self._ready_len = 0 - - if self._polls: - for poll_handle in self._polls.values(): - (<UVHandle>poll_handle)._close() - - self._polls.clear() - - if self._timers: - for timer_cbhandle in tuple(self._timers): - timer_cbhandle.cancel() - - # Close all remaining handles - self.handler_async._close() - self.handler_idle._close() - self.handler_check__exec_writes._close() - __close_all_handles(self) - self._shutdown_signals() - # During this run there should be no open handles, - # so it should finish right away - self.__run(uv.UV_RUN_DEFAULT) - - if self._fd_to_writer_fileobj: - for fileobj in self._fd_to_writer_fileobj.values(): - socket_dec_io_ref(fileobj) - self._fd_to_writer_fileobj.clear() - - if self._fd_to_reader_fileobj: - for fileobj in self._fd_to_reader_fileobj.values(): - socket_dec_io_ref(fileobj) - self._fd_to_reader_fileobj.clear() - - if self._timers: - raise RuntimeError( - f"new timers were queued during loop closing: {self._timers}") - - if self._polls: - raise RuntimeError( - f"new poll handles were queued during loop closing: " - f"{self._polls}") - - if self._ready: - raise RuntimeError( - f"new callbacks were queued during loop closing: " - f"{self._ready}") - - err = uv.uv_loop_close(self.uvloop) - if err < 0: - raise convert_error(err) - - self.handler_async = None - self.handler_idle = None - self.handler_check__exec_writes = None - - self._executor_shutdown_called = True - executor = self._default_executor - if executor is not None: - self._default_executor = None - executor.shutdown(wait=False) - - cdef uint64_t _time(self): - # asyncio doesn't have a time cache, neither should uvloop. - uv.uv_update_time(self.uvloop) # void - return uv.uv_now(self.uvloop) - - cdef inline _queue_write(self, UVStream stream): - self._queued_streams.add(stream) - if not self.handler_check__exec_writes.running: - self.handler_check__exec_writes.start() - - cdef _exec_queued_writes(self): - if len(self._queued_streams) == 0: - if self.handler_check__exec_writes.running: - self.handler_check__exec_writes.stop() - return - - cdef: - UVStream stream - - streams = self._queued_streams - self._queued_streams = self._executing_streams - self._executing_streams = streams - try: - for pystream in streams: - stream = <UVStream>pystream - stream._exec_write() - finally: - streams.clear() - - if self.handler_check__exec_writes.running: - if len(self._queued_streams) == 0: - self.handler_check__exec_writes.stop() - - cdef inline _call_soon(self, object callback, object args, object context): - cdef Handle handle - handle = new_Handle(self, callback, args, context) - self._call_soon_handle(handle) - return handle - - cdef inline _append_ready_handle(self, Handle handle): - self._check_closed() - self._ready.append(handle) - self._ready_len += 1 - - cdef inline _call_soon_handle(self, Handle handle): - self._append_ready_handle(handle) - if not self.handler_idle.running: - self.handler_idle.start() - - cdef _call_later(self, uint64_t delay, object callback, object args, - object context): - return TimerHandle(self, callback, args, delay, context) - - cdef void _handle_exception(self, object ex): - if isinstance(ex, Exception): - self.call_exception_handler({'exception': ex}) - else: - # BaseException - self._last_error = ex - # Exit ASAP - self._stop(None) - - cdef inline _check_signal(self, sig): - if not isinstance(sig, int): - raise TypeError('sig must be an int, not {!r}'.format(sig)) - - if not (1 <= sig < signal_NSIG): - raise ValueError( - 'sig {} out of range(1, {})'.format(sig, signal_NSIG)) - - cdef inline _check_closed(self): - if self._closed == 1: - raise RuntimeError('Event loop is closed') - - cdef inline _check_thread(self): - if self._thread_id == 0: - return - - cdef uint64_t thread_id - thread_id = <uint64_t>PyThread_get_thread_ident() - - if thread_id != self._thread_id: - raise RuntimeError( - "Non-thread-safe operation invoked on an event loop other " - "than the current one") - - cdef inline _new_future(self): - return aio_Future(loop=self) - - cdef _track_transport(self, UVBaseTransport transport): - self._transports[transport._fileno()] = transport - - cdef _track_process(self, UVProcess proc): - self._processes.add(proc) - - cdef _untrack_process(self, UVProcess proc): - self._processes.discard(proc) - - cdef _fileobj_to_fd(self, fileobj): - """Return a file descriptor from a file object. - - Parameters: - fileobj -- file object or file descriptor - - Returns: - corresponding file descriptor - - Raises: - ValueError if the object is invalid - """ - # Copy of the `selectors._fileobj_to_fd()` function. - if isinstance(fileobj, int): - fd = fileobj - else: - try: - fd = int(fileobj.fileno()) - except (AttributeError, TypeError, ValueError): - raise ValueError("Invalid file object: " - "{!r}".format(fileobj)) from None - if fd < 0: - raise ValueError("Invalid file descriptor: {}".format(fd)) - return fd - - cdef _ensure_fd_no_transport(self, fd): - cdef UVBaseTransport tr - try: - tr = <UVBaseTransport>(self._transports[fd]) - except KeyError: - pass - else: - if tr._is_alive(): - raise RuntimeError( - 'File descriptor {!r} is used by transport {!r}'.format( - fd, tr)) - - cdef _add_reader(self, fileobj, Handle handle): - cdef: - UVPoll poll - - self._check_closed() - fd = self._fileobj_to_fd(fileobj) - self._ensure_fd_no_transport(fd) - - try: - poll = <UVPoll>(self._polls[fd]) - except KeyError: - poll = UVPoll.new(self, fd) - self._polls[fd] = poll - - poll.start_reading(handle) - - old_fileobj = self._fd_to_reader_fileobj.pop(fd, None) - if old_fileobj is not None: - socket_dec_io_ref(old_fileobj) - - self._fd_to_reader_fileobj[fd] = fileobj - socket_inc_io_ref(fileobj) - - cdef _remove_reader(self, fileobj): - cdef: - UVPoll poll - - fd = self._fileobj_to_fd(fileobj) - self._ensure_fd_no_transport(fd) - - mapped_fileobj = self._fd_to_reader_fileobj.pop(fd, None) - if mapped_fileobj is not None: - socket_dec_io_ref(mapped_fileobj) - - if self._closed == 1: - return False - - try: - poll = <UVPoll>(self._polls[fd]) - except KeyError: - return False - - result = poll.stop_reading() - if not poll.is_active(): - del self._polls[fd] - poll._close() - - return result - - cdef _has_reader(self, fileobj): - cdef: - UVPoll poll - - self._check_closed() - fd = self._fileobj_to_fd(fileobj) - - try: - poll = <UVPoll>(self._polls[fd]) - except KeyError: - return False - - return poll.is_reading() - - cdef _add_writer(self, fileobj, Handle handle): - cdef: - UVPoll poll - - self._check_closed() - fd = self._fileobj_to_fd(fileobj) - self._ensure_fd_no_transport(fd) - - try: - poll = <UVPoll>(self._polls[fd]) - except KeyError: - poll = UVPoll.new(self, fd) - self._polls[fd] = poll - - poll.start_writing(handle) - - old_fileobj = self._fd_to_writer_fileobj.pop(fd, None) - if old_fileobj is not None: - socket_dec_io_ref(old_fileobj) - - self._fd_to_writer_fileobj[fd] = fileobj - socket_inc_io_ref(fileobj) - - cdef _remove_writer(self, fileobj): - cdef: - UVPoll poll - - fd = self._fileobj_to_fd(fileobj) - self._ensure_fd_no_transport(fd) - - mapped_fileobj = self._fd_to_writer_fileobj.pop(fd, None) - if mapped_fileobj is not None: - socket_dec_io_ref(mapped_fileobj) - - if self._closed == 1: - return False - - try: - poll = <UVPoll>(self._polls[fd]) - except KeyError: - return False - - result = poll.stop_writing() - if not poll.is_active(): - del self._polls[fd] - poll._close() - - return result - - cdef _has_writer(self, fileobj): - cdef: - UVPoll poll - - self._check_closed() - fd = self._fileobj_to_fd(fileobj) - - try: - poll = <UVPoll>(self._polls[fd]) - except KeyError: - return False - - return poll.is_writing() - - cdef _getaddrinfo(self, object host, object port, - int family, int type, - int proto, int flags, - int unpack): - - if isinstance(port, str): - port = port.encode() - elif isinstance(port, int): - port = str(port).encode() - if port is not None and not isinstance(port, bytes): - raise TypeError('port must be a str, bytes or int') - - if isinstance(host, str): - host = host.encode('idna') - if host is not None: - if not isinstance(host, bytes): - raise TypeError('host must be a str or bytes') - - fut = self._new_future() - - def callback(result): - if AddrInfo.isinstance(result): - try: - if unpack == 0: - data = result - else: - data = (<AddrInfo>result).unpack() - except (KeyboardInterrupt, SystemExit): - raise - except BaseException as ex: - if not fut.cancelled(): - fut.set_exception(ex) - else: - if not fut.cancelled(): - fut.set_result(data) - else: - if not fut.cancelled(): - fut.set_exception(result) - - AddrInfoRequest(self, host, port, family, type, proto, flags, callback) - return fut - - cdef _getnameinfo(self, system.sockaddr *addr, int flags): - cdef NameInfoRequest nr - fut = self._new_future() - - def callback(result): - if isinstance(result, tuple): - fut.set_result(result) - else: - fut.set_exception(result) - - nr = NameInfoRequest(self, callback) - nr.query(addr, flags) - return fut - - cdef _sock_recv(self, fut, sock, n): - if UVLOOP_DEBUG: - if fut.cancelled(): - # Shouldn't happen with _SyncSocketReaderFuture. - raise RuntimeError( - f'_sock_recv is called on a cancelled Future') - - if not self._has_reader(sock): - raise RuntimeError( - f'socket {sock!r} does not have a reader ' - f'in the _sock_recv callback') - - try: - data = sock.recv(n) - except (BlockingIOError, InterruptedError): - # No need to re-add the reader, let's just wait until - # the poll handler calls this callback again. - pass - except (KeyboardInterrupt, SystemExit): - raise - except BaseException as exc: - fut.set_exception(exc) - self._remove_reader(sock) - else: - fut.set_result(data) - self._remove_reader(sock) - - cdef _sock_recv_into(self, fut, sock, buf): - if UVLOOP_DEBUG: - if fut.cancelled(): - # Shouldn't happen with _SyncSocketReaderFuture. - raise RuntimeError( - f'_sock_recv_into is called on a cancelled Future') - - if not self._has_reader(sock): - raise RuntimeError( - f'socket {sock!r} does not have a reader ' - f'in the _sock_recv_into callback') - - try: - data = sock.recv_into(buf) - except (BlockingIOError, InterruptedError): - # No need to re-add the reader, let's just wait until - # the poll handler calls this callback again. - pass - except (KeyboardInterrupt, SystemExit): - raise - except BaseException as exc: - fut.set_exception(exc) - self._remove_reader(sock) - else: - fut.set_result(data) - self._remove_reader(sock) - - cdef _sock_sendall(self, fut, sock, data): - cdef: - Handle handle - int n - - if UVLOOP_DEBUG: - if fut.cancelled(): - # Shouldn't happen with _SyncSocketWriterFuture. - raise RuntimeError( - f'_sock_sendall is called on a cancelled Future') - - if not self._has_writer(sock): - raise RuntimeError( - f'socket {sock!r} does not have a writer ' - f'in the _sock_sendall callback') - - try: - n = sock.send(data) - except (BlockingIOError, InterruptedError): - # Try next time. - return - except (KeyboardInterrupt, SystemExit): - raise - except BaseException as exc: - fut.set_exception(exc) - self._remove_writer(sock) - return - - self._remove_writer(sock) - - if n == len(data): - fut.set_result(None) - else: - if n: - if not isinstance(data, memoryview): - data = memoryview(data) - data = data[n:] - - handle = new_MethodHandle3( - self, - "Loop._sock_sendall", - <method3_t>self._sock_sendall, - None, - self, - fut, sock, data) - - self._add_writer(sock, handle) - - cdef _sock_accept(self, fut, sock): - try: - conn, address = sock.accept() - conn.setblocking(False) - except (BlockingIOError, InterruptedError): - # There is an active reader for _sock_accept, so - # do nothing, it will be called again. - pass - except (KeyboardInterrupt, SystemExit): - raise - except BaseException as exc: - fut.set_exception(exc) - self._remove_reader(sock) - else: - fut.set_result((conn, address)) - self._remove_reader(sock) - - cdef _sock_connect(self, sock, address): - cdef: - Handle handle - - try: - sock.connect(address) - except (BlockingIOError, InterruptedError): - pass - else: - return - - fut = _SyncSocketWriterFuture(sock, self) - handle = new_MethodHandle3( - self, - "Loop._sock_connect", - <method3_t>self._sock_connect_cb, - None, - self, - fut, sock, address) - - self._add_writer(sock, handle) - return fut - - cdef _sock_connect_cb(self, fut, sock, address): - if UVLOOP_DEBUG: - if fut.cancelled(): - # Shouldn't happen with _SyncSocketWriterFuture. - raise RuntimeError( - f'_sock_connect_cb is called on a cancelled Future') - - if not self._has_writer(sock): - raise RuntimeError( - f'socket {sock!r} does not have a writer ' - f'in the _sock_connect_cb callback') - - try: - err = sock.getsockopt(uv.SOL_SOCKET, uv.SO_ERROR) - if err != 0: - # Jump to any except clause below. - raise OSError(err, 'Connect call failed %s' % (address,)) - except (BlockingIOError, InterruptedError): - # socket is still registered, the callback will be retried later - pass - except (KeyboardInterrupt, SystemExit): - raise - except BaseException as exc: - fut.set_exception(exc) - self._remove_writer(sock) - else: - fut.set_result(None) - self._remove_writer(sock) - - cdef _sock_set_reuseport(self, int fd): - cdef: - int err - int reuseport_flag = 1 - - err = system.setsockopt( - fd, - uv.SOL_SOCKET, - SO_REUSEPORT, - <char*>&reuseport_flag, - sizeof(reuseport_flag)) - - if err < 0: - raise convert_error(-errno.errno) - - cdef _set_coroutine_debug(self, bint enabled): - enabled = bool(enabled) - if self._coroutine_debug_set == enabled: - return - - if enabled: - self._coroutine_origin_tracking_saved_depth = ( - sys.get_coroutine_origin_tracking_depth()) - sys.set_coroutine_origin_tracking_depth( - DEBUG_STACK_DEPTH) - else: - sys.set_coroutine_origin_tracking_depth( - self._coroutine_origin_tracking_saved_depth) - - self._coroutine_debug_set = enabled - - def _get_backend_id(self): - """This method is used by uvloop tests and is not part of the API.""" - return uv.uv_backend_fd(self.uvloop) - - cdef _print_debug_info(self): - cdef: - int err - uv.uv_rusage_t rusage - - err = uv.uv_getrusage(&rusage) - if err < 0: - raise convert_error(err) - - # OS - - print('---- Process info: -----') - print('Process memory: {}'.format(rusage.ru_maxrss)) - print('Number of signals: {}'.format(rusage.ru_nsignals)) - print('') - - # Loop - - print('--- Loop debug info: ---') - print('Loop time: {}'.format(self.time())) - print('Errors logged: {}'.format( - self._debug_exception_handler_cnt)) - print() - print('Callback handles: {: <8} | {}'.format( - self._debug_cb_handles_count, - self._debug_cb_handles_total)) - print('Timer handles: {: <8} | {}'.format( - self._debug_cb_timer_handles_count, - self._debug_cb_timer_handles_total)) - print() - - print(' alive | closed |') - print('UVHandles python | libuv | total') - print(' objs | handles |') - print('-------------------------------+---------+---------') - for name in sorted(self._debug_handles_total): - print(' {: <18} {: >7} | {: >7} | {: >7}'.format( - name, - self._debug_handles_current[name], - self._debug_handles_closed[name], - self._debug_handles_total[name])) - print() - - print('uv_handle_t (current: {}; freed: {}; total: {})'.format( - self._debug_uv_handles_total - self._debug_uv_handles_freed, - self._debug_uv_handles_freed, - self._debug_uv_handles_total)) - print() - - print('--- Streams debug info: ---') - print('Write errors: {}'.format( - self._debug_stream_write_errors_total)) - print('Write without poll: {}'.format( - self._debug_stream_write_tries)) - print('Write contexts: {: <8} | {}'.format( - self._debug_stream_write_ctx_cnt, - self._debug_stream_write_ctx_total)) - print('Write failed callbacks: {}'.format( - self._debug_stream_write_cb_errors_total)) - print() - print('Read errors: {}'.format( - self._debug_stream_read_errors_total)) - print('Read callbacks: {}'.format( - self._debug_stream_read_cb_total)) - print('Read failed callbacks: {}'.format( - self._debug_stream_read_cb_errors_total)) - print('Read EOFs: {}'.format( - self._debug_stream_read_eof_total)) - print('Read EOF failed callbacks: {}'.format( - self._debug_stream_read_eof_cb_errors_total)) - print() - print('Listen errors: {}'.format( - self._debug_stream_listen_errors_total)) - print('Shutdown errors {}'.format( - self._debug_stream_shutdown_errors_total)) - print() - - print('--- Polls debug info: ---') - print('Read events: {}'.format( - self._poll_read_events_total)) - print('Read callbacks failed: {}'.format( - self._poll_read_cb_errors_total)) - print('Write events: {}'.format( - self._poll_write_events_total)) - print('Write callbacks failed: {}'.format( - self._poll_write_cb_errors_total)) - print() - - print('--- Sock ops successful on 1st try: ---') - print('Socket try-writes: {}'.format( - self._sock_try_write_total)) - - print(flush=True) - - property print_debug_info: - def __get__(self): - if UVLOOP_DEBUG: - return lambda: self._print_debug_info() - else: - raise AttributeError('print_debug_info') - - # Public API - - def __repr__(self): - return '<{}.{} running={} closed={} debug={}>'.format( - self.__class__.__module__, - self.__class__.__name__, - self.is_running(), - self.is_closed(), - self.get_debug() - ) - - def call_soon(self, callback, *args, context=None): - """Arrange for a callback to be called as soon as possible. - - This operates as a FIFO queue: callbacks are called in the - order in which they are registered. Each callback will be - called exactly once. - - Any positional arguments after the callback will be passed to - the callback when it is called. - """ - if self._debug == 1: - self._check_thread() - if args: - return self._call_soon(callback, args, context) - else: - return self._call_soon(callback, None, context) - - def call_soon_threadsafe(self, callback, *args, context=None): - """Like call_soon(), but thread-safe.""" - if not args: - args = None - cdef Handle handle = new_Handle(self, callback, args, context) - self._append_ready_handle(handle) # deque append is atomic - # libuv async handler is thread-safe while the idle handler is not - - # we only set the async handler here, which will start the idle handler - # in _on_wake() from the loop and eventually call the callback. - self.handler_async.send() - return handle - - def call_later(self, delay, callback, *args, context=None): - """Arrange for a callback to be called at a given time. - - Return a Handle: an opaque object with a cancel() method that - can be used to cancel the call. - - The delay can be an int or float, expressed in seconds. It is - always relative to the current time. - - Each callback will be called exactly once. If two callbacks - are scheduled for exactly the same time, it undefined which - will be called first. - - Any positional arguments after the callback will be passed to - the callback when it is called. - """ - cdef uint64_t when - - self._check_closed() - if self._debug == 1: - self._check_thread() - - if delay < 0: - delay = 0 - elif delay == py_inf or delay > MAX_SLEEP: - # ~100 years sounds like a good approximation of - # infinity for a Python application. - delay = MAX_SLEEP - - when = <uint64_t>round(delay * 1000) - if not args: - args = None - if when == 0: - return self._call_soon(callback, args, context) - else: - return self._call_later(when, callback, args, context) - - def call_at(self, when, callback, *args, context=None): - """Like call_later(), but uses an absolute time. - - Absolute time corresponds to the event loop's time() method. - """ - return self.call_later( - when - self.time(), callback, *args, context=context) - - def time(self): - """Return the time according to the event loop's clock. - - This is a float expressed in seconds since an epoch, but the - epoch, precision, accuracy and drift are unspecified and may - differ per event loop. - """ - return self._time() / 1000 - - def stop(self): - """Stop running the event loop. - - Every callback already scheduled will still run. This simply informs - run_forever to stop looping after a complete iteration. - """ - self._call_soon_handle( - new_MethodHandle1( - self, - "Loop._stop", - <method1_t>self._stop, - None, - self, - None)) - - def run_forever(self): - """Run the event loop until stop() is called.""" - self._check_closed() - mode = uv.UV_RUN_DEFAULT - if self._stopping: - # loop.stop() was called right before loop.run_forever(). - # This is how asyncio loop behaves. - mode = uv.UV_RUN_NOWAIT - self._set_coroutine_debug(self._debug) - old_agen_hooks = sys.get_asyncgen_hooks() - sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, - finalizer=self._asyncgen_finalizer_hook) - try: - self._run(mode) - finally: - self._set_coroutine_debug(False) - sys.set_asyncgen_hooks(*old_agen_hooks) - - def close(self): - """Close the event loop. - - The event loop must not be running. - - This is idempotent and irreversible. - - No other methods should be called after this one. - """ - self._close() - - def get_debug(self): - return bool(self._debug) - - def set_debug(self, enabled): - self._debug = bool(enabled) - if self.is_running(): - self.call_soon_threadsafe( - self._set_coroutine_debug, self, self._debug) - - def is_running(self): - """Return whether the event loop is currently running.""" - return bool(self._running) - - def is_closed(self): - """Returns True if the event loop was closed.""" - return bool(self._closed) - - def create_future(self): - """Create a Future object attached to the loop.""" - return self._new_future() - - def create_task(self, coro, *, name=None, context=None): - """Schedule a coroutine object. - - Return a task object. - - If name is not None, task.set_name(name) will be called if the task - object has the set_name attribute, true for default Task in CPython. - - An optional keyword-only context argument allows specifying a custom - contextvars.Context for the coro to run in. The current context copy is - created when no context is provided. - """ - self._check_closed() - if PY311: - if self._task_factory is None: - task = aio_Task(coro, loop=self, context=context) - else: - task = self._task_factory(self, coro, context=context) - else: - if context is None: - if self._task_factory is None: - task = aio_Task(coro, loop=self) - else: - task = self._task_factory(self, coro) - else: - if self._task_factory is None: - task = context.run(aio_Task, coro, self) - else: - task = context.run(self._task_factory, self, coro) - - # copied from asyncio.tasks._set_task_name (bpo-34270) - if name is not None: - try: - set_name = task.set_name - except AttributeError: - pass - else: - set_name(name) - - return task - - def set_task_factory(self, factory): - """Set a task factory that will be used by loop.create_task(). - - If factory is None the default task factory will be set. - - If factory is a callable, it should have a signature matching - '(loop, coro)', where 'loop' will be a reference to the active - event loop, 'coro' will be a coroutine object. The callable - must return a Future. - """ - if factory is not None and not callable(factory): - raise TypeError('task factory must be a callable or None') - self._task_factory = factory - - def get_task_factory(self): - """Return a task factory, or None if the default one is in use.""" - return self._task_factory - - def run_until_complete(self, future): - """Run until the Future is done. - - If the argument is a coroutine, it is wrapped in a Task. - - WARNING: It would be disastrous to call run_until_complete() - with the same coroutine twice -- it would wrap it in two - different Tasks and that can't be good. - - Return the Future's result, or raise its exception. - """ - self._check_closed() - - new_task = not isfuture(future) - future = aio_ensure_future(future, loop=self) - if new_task: - # An exception is raised if the future didn't complete, so there - # is no need to log the "destroy pending task" message - future._log_destroy_pending = False - - def done_cb(fut): - if not fut.cancelled(): - exc = fut.exception() - if isinstance(exc, (SystemExit, KeyboardInterrupt)): - # Issue #336: run_forever() already finished, - # no need to stop it. - return - self.stop() - - future.add_done_callback(done_cb) - try: - self.run_forever() - except BaseException: - if new_task and future.done() and not future.cancelled(): - # The coroutine raised a BaseException. Consume the exception - # to not log a warning, the caller doesn't have access to the - # local task. - future.exception() - raise - finally: - future.remove_done_callback(done_cb) - if not future.done(): - raise RuntimeError('Event loop stopped before Future completed.') - - return future.result() - - @cython.iterable_coroutine - async def getaddrinfo(self, object host, object port, *, - int family=0, int type=0, int proto=0, int flags=0): - - addr = __static_getaddrinfo_pyaddr(host, port, family, - type, proto, flags) - if addr is not None: - return [addr] - - return await self._getaddrinfo( - host, port, family, type, proto, flags, 1) - - @cython.iterable_coroutine - async def getnameinfo(self, sockaddr, int flags=0): - cdef: - AddrInfo ai_cnt - system.addrinfo *ai - system.sockaddr_in6 *sin6 - - if not isinstance(sockaddr, tuple): - raise TypeError('getnameinfo() argument 1 must be a tuple') - - sl = len(sockaddr) - - if sl < 2 or sl > 4: - raise ValueError('sockaddr must be a tuple of 2, 3 or 4 values') - - if sl > 2: - flowinfo = sockaddr[2] - if flowinfo < 0 or flowinfo > 0xfffff: - raise OverflowError( - 'getnameinfo(): flowinfo must be 0-1048575.') - else: - flowinfo = 0 - - if sl > 3: - scope_id = sockaddr[3] - if scope_id < 0 or scope_id > 2 ** 32: - raise OverflowError( - 'getsockaddrarg: scope_id must be unsigned 32 bit integer') - else: - scope_id = 0 - - ai_cnt = await self._getaddrinfo( - sockaddr[0], sockaddr[1], - uv.AF_UNSPEC, # family - uv.SOCK_DGRAM, # type - 0, # proto - uv.AI_NUMERICHOST, # flags - 0) # unpack - - ai = ai_cnt.data - - if ai.ai_next: - raise OSError("sockaddr resolved to multiple addresses") - - if ai.ai_family == uv.AF_INET: - if sl > 2: - raise OSError("IPv4 sockaddr must be 2 tuple") - elif ai.ai_family == uv.AF_INET6: - # Modify some fields in `ai` - sin6 = <system.sockaddr_in6*> ai.ai_addr - sin6.sin6_flowinfo = system.htonl(flowinfo) - sin6.sin6_scope_id = scope_id - - return await self._getnameinfo(ai.ai_addr, flags) - - @cython.iterable_coroutine - async def start_tls(self, transport, protocol, sslcontext, *, - server_side=False, - server_hostname=None, - ssl_handshake_timeout=None, - ssl_shutdown_timeout=None): - """Upgrade transport to TLS. - - Return a new transport that *protocol* should start using - immediately. - """ - if not isinstance(sslcontext, ssl_SSLContext): - raise TypeError( - f'sslcontext is expected to be an instance of ssl.SSLContext, ' - f'got {sslcontext!r}') - - if isinstance(transport, (TCPTransport, UnixTransport)): - context = (<UVStream>transport).context - elif isinstance(transport, _SSLProtocolTransport): - context = (<_SSLProtocolTransport>transport).context - else: - raise TypeError( - f'transport {transport!r} is not supported by start_tls()') - - waiter = self._new_future() - ssl_protocol = SSLProtocol( - self, protocol, sslcontext, waiter, - server_side, server_hostname, - ssl_handshake_timeout=ssl_handshake_timeout, - ssl_shutdown_timeout=ssl_shutdown_timeout, - call_connection_made=False) - - # Pause early so that "ssl_protocol.data_received()" doesn't - # have a chance to get called before "ssl_protocol.connection_made()". - transport.pause_reading() - - transport.set_protocol(ssl_protocol) - conmade_cb = self.call_soon(ssl_protocol.connection_made, transport, - context=context) - # transport.resume_reading() will use the right context - # (transport.context) to call e.g. data_received() - resume_cb = self.call_soon(transport.resume_reading) - app_transport = ssl_protocol._get_app_transport(context) - - try: - await waiter - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - app_transport.close() - conmade_cb.cancel() - resume_cb.cancel() - raise - - return app_transport - - @cython.iterable_coroutine - async def create_server(self, protocol_factory, host=None, port=None, - *, - int family=uv.AF_UNSPEC, - int flags=uv.AI_PASSIVE, - sock=None, - backlog=100, - ssl=None, - reuse_address=None, - reuse_port=None, - ssl_handshake_timeout=None, - ssl_shutdown_timeout=None, - start_serving=True): - """A coroutine which creates a TCP server bound to host and port. - - The return value is a Server object which can be used to stop - the service. - - If host is an empty string or None all interfaces are assumed - and a list of multiple sockets will be returned (most likely - one for IPv4 and another one for IPv6). The host parameter can also be - a sequence (e.g. list) of hosts to bind to. - - family can be set to either AF_INET or AF_INET6 to force the - socket to use IPv4 or IPv6. If not set it will be determined - from host (defaults to AF_UNSPEC). - - flags is a bitmask for getaddrinfo(). - - sock can optionally be specified in order to use a preexisting - socket object. - - backlog is the maximum number of queued connections passed to - listen() (defaults to 100). - - ssl can be set to an SSLContext to enable SSL over the - accepted connections. - - reuse_address tells the kernel to reuse a local socket in - TIME_WAIT state, without waiting for its natural timeout to - expire. If not specified will automatically be set to True on - UNIX. - - reuse_port tells the kernel to allow this endpoint to be bound to - the same port as other existing endpoints are bound to, so long as - they all set this flag when being created. This option is not - supported on Windows. - - ssl_handshake_timeout is the time in seconds that an SSL server - will wait for completion of the SSL handshake before aborting the - connection. Default is 60s. - - ssl_shutdown_timeout is the time in seconds that an SSL server - will wait for completion of the SSL shutdown before aborting the - connection. Default is 30s. - """ - cdef: - TCPServer tcp - system.addrinfo *addrinfo - Server server - - if sock is not None and sock.family == uv.AF_UNIX: - if host is not None or port is not None: - raise ValueError( - 'host/port and sock can not be specified at the same time') - return await self.create_unix_server( - protocol_factory, sock=sock, backlog=backlog, ssl=ssl, - start_serving=start_serving) - - server = Server(self) - - if ssl is not None: - if not isinstance(ssl, ssl_SSLContext): - raise TypeError('ssl argument must be an SSLContext or None') - else: - if ssl_handshake_timeout is not None: - raise ValueError( - 'ssl_handshake_timeout is only meaningful with ssl') - if ssl_shutdown_timeout is not None: - raise ValueError( - 'ssl_shutdown_timeout is only meaningful with ssl') - - if host is not None or port is not None: - if sock is not None: - raise ValueError( - 'host/port and sock can not be specified at the same time') - - if reuse_address is None: - reuse_address = os_name == 'posix' and sys_platform != 'cygwin' - reuse_port = bool(reuse_port) - if reuse_port and not has_SO_REUSEPORT: - raise ValueError( - 'reuse_port not supported by socket module') - - if host == '': - hosts = [None] - elif (isinstance(host, str) or not isinstance(host, col_Iterable)): - hosts = [host] - else: - hosts = host - - fs = [self._getaddrinfo(host, port, family, - uv.SOCK_STREAM, 0, flags, - 0) for host in hosts] - - infos = await aio_gather(*fs) - - completed = False - sock = None - try: - for info in infos: - addrinfo = (<AddrInfo>info).data - while addrinfo != NULL: - if addrinfo.ai_family == uv.AF_UNSPEC: - raise RuntimeError('AF_UNSPEC in DNS results') - - try: - sock = socket_socket(addrinfo.ai_family, - addrinfo.ai_socktype, - addrinfo.ai_protocol) - except socket_error: - # Assume it's a bad family/type/protocol - # combination. - if self._debug: - aio_logger.warning( - 'create_server() failed to create ' - 'socket.socket(%r, %r, %r)', - addrinfo.ai_family, - addrinfo.ai_socktype, - addrinfo.ai_protocol, exc_info=True) - addrinfo = addrinfo.ai_next - continue - - if reuse_address: - sock.setsockopt(uv.SOL_SOCKET, uv.SO_REUSEADDR, 1) - if reuse_port: - sock.setsockopt(uv.SOL_SOCKET, uv.SO_REUSEPORT, 1) - # Disable IPv4/IPv6 dual stack support (enabled by - # default on Linux) which makes a single socket - # listen on both address families. - if (addrinfo.ai_family == uv.AF_INET6 and - has_IPV6_V6ONLY): - sock.setsockopt(uv.IPPROTO_IPV6, IPV6_V6ONLY, 1) - - pyaddr = __convert_sockaddr_to_pyaddr(addrinfo.ai_addr) - try: - sock.bind(pyaddr) - except OSError as err: - raise OSError( - err.errno, 'error while attempting ' - 'to bind on address %r: %s' - % (pyaddr, err.strerror.lower())) from None - - tcp = TCPServer.new(self, protocol_factory, server, - uv.AF_UNSPEC, backlog, - ssl, ssl_handshake_timeout, - ssl_shutdown_timeout) - - try: - tcp._open(sock.fileno()) - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - tcp._close() - raise - - server._add_server(tcp) - sock.detach() - sock = None - - addrinfo = addrinfo.ai_next - - completed = True - finally: - if not completed: - if sock is not None: - sock.close() - server.close() - else: - if sock is None: - raise ValueError('Neither host/port nor sock were specified') - if not _is_sock_stream(sock.type): - raise ValueError( - 'A Stream Socket was expected, got {!r}'.format(sock)) - - # libuv will set the socket to non-blocking mode, but - # we want Python socket object to notice that. - sock.setblocking(False) - - tcp = TCPServer.new(self, protocol_factory, server, - uv.AF_UNSPEC, backlog, - ssl, ssl_handshake_timeout, - ssl_shutdown_timeout) - - try: - tcp._open(sock.fileno()) - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - tcp._close() - raise - - tcp._attach_fileobj(sock) - server._add_server(tcp) - - if start_serving: - server._start_serving() - - server._ref() - return server - - @cython.iterable_coroutine - async def create_connection(self, protocol_factory, host=None, port=None, - *, - ssl=None, - family=0, proto=0, flags=0, sock=None, - local_addr=None, server_hostname=None, - ssl_handshake_timeout=None, - ssl_shutdown_timeout=None): - """Connect to a TCP server. - - Create a streaming transport connection to a given Internet host and - port: socket family AF_INET or socket.AF_INET6 depending on host (or - family if specified), socket type SOCK_STREAM. protocol_factory must be - a callable returning a protocol instance. - - This method is a coroutine which will try to establish the connection - in the background. When successful, the coroutine returns a - (transport, protocol) pair. - """ - cdef: - AddrInfo ai_local = None - AddrInfo ai_remote - TCPTransport tr - - system.addrinfo *rai = NULL - system.addrinfo *lai = NULL - - system.addrinfo *rai_iter = NULL - system.addrinfo *lai_iter = NULL - - system.addrinfo rai_static - system.sockaddr_storage rai_addr_static - system.addrinfo lai_static - system.sockaddr_storage lai_addr_static - - object app_protocol - object app_transport - object protocol - object ssl_waiter - - if sock is not None and sock.family == uv.AF_UNIX: - if host is not None or port is not None: - raise ValueError( - 'host/port and sock can not be specified at the same time') - return await self.create_unix_connection( - protocol_factory, None, - sock=sock, ssl=ssl, server_hostname=server_hostname) - - app_protocol = protocol = protocol_factory() - ssl_waiter = None - context = Context_CopyCurrent() - if ssl: - if server_hostname is None: - if not host: - raise ValueError('You must set server_hostname ' - 'when using ssl without a host') - server_hostname = host - - ssl_waiter = self._new_future() - sslcontext = None if isinstance(ssl, bool) else ssl - protocol = SSLProtocol( - self, app_protocol, sslcontext, ssl_waiter, - False, server_hostname, - ssl_handshake_timeout=ssl_handshake_timeout, - ssl_shutdown_timeout=ssl_shutdown_timeout) - else: - if server_hostname is not None: - raise ValueError('server_hostname is only meaningful with ssl') - if ssl_handshake_timeout is not None: - raise ValueError( - 'ssl_handshake_timeout is only meaningful with ssl') - if ssl_shutdown_timeout is not None: - raise ValueError( - 'ssl_shutdown_timeout is only meaningful with ssl') - - if host is not None or port is not None: - if sock is not None: - raise ValueError( - 'host/port and sock can not be specified at the same time') - - fs = [] - f1 = f2 = None - - addr = __static_getaddrinfo( - host, port, family, uv.SOCK_STREAM, - proto, <system.sockaddr*>&rai_addr_static) - - if addr is None: - f1 = self._getaddrinfo( - host, port, family, - uv.SOCK_STREAM, proto, flags, - 0) # 0 == don't unpack - - fs.append(f1) - else: - rai_static.ai_addr = <system.sockaddr*>&rai_addr_static - rai_static.ai_next = NULL - rai = &rai_static - - if local_addr is not None: - if not isinstance(local_addr, (tuple, list)) or \ - len(local_addr) != 2: - raise ValueError( - 'local_addr must be a tuple of host and port') - - addr = __static_getaddrinfo( - local_addr[0], local_addr[1], - family, uv.SOCK_STREAM, - proto, <system.sockaddr*>&lai_addr_static) - if addr is None: - f2 = self._getaddrinfo( - local_addr[0], local_addr[1], family, - uv.SOCK_STREAM, proto, flags, - 0) # 0 == don't unpack - - fs.append(f2) - else: - lai_static.ai_addr = <system.sockaddr*>&lai_addr_static - lai_static.ai_next = NULL - lai = &lai_static - - if len(fs): - await aio_wait(fs) - - if rai is NULL: - ai_remote = f1.result() - if ai_remote.data is NULL: - raise OSError('getaddrinfo() returned empty list') - rai = ai_remote.data - - if lai is NULL and f2 is not None: - ai_local = f2.result() - if ai_local.data is NULL: - raise OSError( - 'getaddrinfo() returned empty list for local_addr') - lai = ai_local.data - - exceptions = [] - rai_iter = rai - while rai_iter is not NULL: - tr = None - try: - waiter = self._new_future() - tr = TCPTransport.new(self, protocol, None, waiter, - context) - - if lai is not NULL: - lai_iter = lai - while lai_iter is not NULL: - try: - tr.bind(lai_iter.ai_addr) - break - except OSError as exc: - exceptions.append(exc) - lai_iter = lai_iter.ai_next - else: - tr._close() - tr = None - - rai_iter = rai_iter.ai_next - continue - - tr.connect(rai_iter.ai_addr) - await waiter - - except OSError as exc: - if tr is not None: - tr._close() - tr = None - exceptions.append(exc) - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - if tr is not None: - tr._close() - tr = None - raise - else: - break - - rai_iter = rai_iter.ai_next - - else: - # If they all have the same str(), raise one. - model = str(exceptions[0]) - if all(str(exc) == model for exc in exceptions): - raise exceptions[0] - # Raise a combined exception so the user can see all - # the various error messages. - raise OSError('Multiple exceptions: {}'.format( - ', '.join(str(exc) for exc in exceptions))) - else: - if sock is None: - raise ValueError( - 'host and port was not specified and no sock specified') - if not _is_sock_stream(sock.type): - raise ValueError( - 'A Stream Socket was expected, got {!r}'.format(sock)) - - # libuv will set the socket to non-blocking mode, but - # we want Python socket object to notice that. - sock.setblocking(False) - - waiter = self._new_future() - tr = TCPTransport.new(self, protocol, None, waiter, context) - try: - # libuv will make socket non-blocking - tr._open(sock.fileno()) - tr._init_protocol() - await waiter - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - # It's OK to call `_close()` here, as opposed to - # `_force_close()` or `close()` as we want to terminate the - # transport immediately. The `waiter` can only be waken - # up in `Transport._call_connection_made()`, and calling - # `_close()` before it is fine. - tr._close() - raise - - tr._attach_fileobj(sock) - - if ssl: - app_transport = protocol._get_app_transport(context) - try: - await ssl_waiter - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - app_transport.close() - raise - return app_transport, app_protocol - else: - return tr, protocol - - @cython.iterable_coroutine - async def create_unix_server(self, protocol_factory, path=None, - *, backlog=100, sock=None, ssl=None, - ssl_handshake_timeout=None, - ssl_shutdown_timeout=None, - start_serving=True): - """A coroutine which creates a UNIX Domain Socket server. - - The return value is a Server object, which can be used to stop - the service. - - path is a str, representing a file systsem path to bind the - server socket to. - - sock can optionally be specified in order to use a preexisting - socket object. - - backlog is the maximum number of queued connections passed to - listen() (defaults to 100). - - ssl can be set to an SSLContext to enable SSL over the - accepted connections. - - ssl_handshake_timeout is the time in seconds that an SSL server - will wait for completion of the SSL handshake before aborting the - connection. Default is 60s. - - ssl_shutdown_timeout is the time in seconds that an SSL server - will wait for completion of the SSL shutdown before aborting the - connection. Default is 30s. - """ - cdef: - UnixServer pipe - Server server = Server(self) - - if ssl is not None: - if not isinstance(ssl, ssl_SSLContext): - raise TypeError('ssl argument must be an SSLContext or None') - else: - if ssl_handshake_timeout is not None: - raise ValueError( - 'ssl_handshake_timeout is only meaningful with ssl') - if ssl_shutdown_timeout is not None: - raise ValueError( - 'ssl_shutdown_timeout is only meaningful with ssl') - - if path is not None: - if sock is not None: - raise ValueError( - 'path and sock can not be specified at the same time') - orig_path = path - - path = os_fspath(path) - - if isinstance(path, str): - path = PyUnicode_EncodeFSDefault(path) - - # Check for abstract socket. - if path[0] != 0: - try: - if stat_S_ISSOCK(os_stat(path).st_mode): - os_remove(path) - except FileNotFoundError: - pass - except OSError as err: - # Directory may have permissions only to create socket. - aio_logger.error( - 'Unable to check or remove stale UNIX socket %r: %r', - orig_path, err) - - # We use Python sockets to create a UNIX server socket because - # when UNIX sockets are created by libuv, libuv removes the path - # they were bound to. This is different from asyncio, which - # doesn't cleanup the socket path. - sock = socket_socket(uv.AF_UNIX) - - try: - sock.bind(path) - except OSError as exc: - sock.close() - if exc.errno == errno.EADDRINUSE: - # Let's improve the error message by adding - # with what exact address it occurs. - msg = 'Address {!r} is already in use'.format(orig_path) - raise OSError(errno.EADDRINUSE, msg) from None - else: - raise - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - sock.close() - raise - - else: - if sock is None: - raise ValueError( - 'path was not specified, and no sock specified') - - if sock.family != uv.AF_UNIX or not _is_sock_stream(sock.type): - raise ValueError( - 'A UNIX Domain Stream Socket was expected, got {!r}' - .format(sock)) - - # libuv will set the socket to non-blocking mode, but - # we want Python socket object to notice that. - sock.setblocking(False) - - pipe = UnixServer.new( - self, protocol_factory, server, backlog, - ssl, ssl_handshake_timeout, ssl_shutdown_timeout) - - try: - pipe._open(sock.fileno()) - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - pipe._close() - sock.close() - raise - - pipe._attach_fileobj(sock) - server._add_server(pipe) - - if start_serving: - server._start_serving() - - return server - - @cython.iterable_coroutine - async def create_unix_connection(self, protocol_factory, path=None, *, - ssl=None, sock=None, - server_hostname=None, - ssl_handshake_timeout=None, - ssl_shutdown_timeout=None): - - cdef: - UnixTransport tr - object app_protocol - object app_transport - object protocol - object ssl_waiter - - app_protocol = protocol = protocol_factory() - ssl_waiter = None - context = Context_CopyCurrent() - if ssl: - if server_hostname is None: - raise ValueError('You must set server_hostname ' - 'when using ssl without a host') - - ssl_waiter = self._new_future() - sslcontext = None if isinstance(ssl, bool) else ssl - protocol = SSLProtocol( - self, app_protocol, sslcontext, ssl_waiter, - False, server_hostname, - ssl_handshake_timeout=ssl_handshake_timeout, - ssl_shutdown_timeout=ssl_shutdown_timeout) - else: - if server_hostname is not None: - raise ValueError('server_hostname is only meaningful with ssl') - if ssl_handshake_timeout is not None: - raise ValueError( - 'ssl_handshake_timeout is only meaningful with ssl') - if ssl_shutdown_timeout is not None: - raise ValueError( - 'ssl_shutdown_timeout is only meaningful with ssl') - - if path is not None: - if sock is not None: - raise ValueError( - 'path and sock can not be specified at the same time') - - path = os_fspath(path) - - if isinstance(path, str): - path = PyUnicode_EncodeFSDefault(path) - - waiter = self._new_future() - tr = UnixTransport.new(self, protocol, None, waiter, context) - tr.connect(path) - try: - await waiter - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - tr._close() - raise - - else: - if sock is None: - raise ValueError('no path and sock were specified') - - if sock.family != uv.AF_UNIX or not _is_sock_stream(sock.type): - raise ValueError( - 'A UNIX Domain Stream Socket was expected, got {!r}' - .format(sock)) - - # libuv will set the socket to non-blocking mode, but - # we want Python socket object to notice that. - sock.setblocking(False) - - waiter = self._new_future() - tr = UnixTransport.new(self, protocol, None, waiter, context) - try: - tr._open(sock.fileno()) - tr._init_protocol() - await waiter - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - tr._close() - raise - - tr._attach_fileobj(sock) - - if ssl: - app_transport = protocol._get_app_transport(Context_CopyCurrent()) - try: - await ssl_waiter - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - app_transport.close() - raise - return app_transport, app_protocol - else: - return tr, protocol - - def default_exception_handler(self, context): - """Default exception handler. - - This is called when an exception occurs and no exception - handler is set, and can be called by a custom exception - handler that wants to defer to the default behavior. - - The context parameter has the same meaning as in - `call_exception_handler()`. - """ - message = context.get('message') - if not message: - message = 'Unhandled exception in event loop' - - exception = context.get('exception') - if exception is not None: - exc_info = (type(exception), exception, exception.__traceback__) - else: - exc_info = False - - log_lines = [message] - for key in sorted(context): - if key in {'message', 'exception'}: - continue - value = context[key] - if key == 'source_traceback': - tb = ''.join(tb_format_list(value)) - value = 'Object created at (most recent call last):\n' - value += tb.rstrip() - else: - try: - value = repr(value) - except (KeyboardInterrupt, SystemExit): - raise - except BaseException as ex: - value = ('Exception in __repr__ {!r}; ' - 'value type: {!r}'.format(ex, type(value))) - log_lines.append('{}: {}'.format(key, value)) - - aio_logger.error('\n'.join(log_lines), exc_info=exc_info) - - def get_exception_handler(self): - """Return an exception handler, or None if the default one is in use. - """ - return self._exception_handler - - def set_exception_handler(self, handler): - """Set handler as the new event loop exception handler. - - If handler is None, the default exception handler will - be set. - - If handler is a callable object, it should have a - signature matching '(loop, context)', where 'loop' - will be a reference to the active event loop, 'context' - will be a dict object (see `call_exception_handler()` - documentation for details about context). - """ - if handler is not None and not callable(handler): - raise TypeError('A callable object or None is expected, ' - 'got {!r}'.format(handler)) - self._exception_handler = handler - - def call_exception_handler(self, context): - """Call the current event loop's exception handler. - - The context argument is a dict containing the following keys: - - - 'message': Error message; - - 'exception' (optional): Exception object; - - 'future' (optional): Future instance; - - 'handle' (optional): Handle instance; - - 'protocol' (optional): Protocol instance; - - 'transport' (optional): Transport instance; - - 'socket' (optional): Socket instance. - - New keys maybe introduced in the future. - - Note: do not overload this method in an event loop subclass. - For custom exception handling, use the - `set_exception_handler()` method. - """ - if UVLOOP_DEBUG: - self._debug_exception_handler_cnt += 1 - - if self._exception_handler is None: - try: - self.default_exception_handler(context) - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - # Second protection layer for unexpected errors - # in the default implementation, as well as for subclassed - # event loops with overloaded "default_exception_handler". - aio_logger.error('Exception in default exception handler', - exc_info=True) - else: - try: - self._exception_handler(self, context) - except (KeyboardInterrupt, SystemExit): - raise - except BaseException as exc: - # Exception in the user set custom exception handler. - try: - # Let's try default handler. - self.default_exception_handler({ - 'message': 'Unhandled error in exception handler', - 'exception': exc, - 'context': context, - }) - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - # Guard 'default_exception_handler' in case it is - # overloaded. - aio_logger.error('Exception in default exception handler ' - 'while handling an unexpected error ' - 'in custom exception handler', - exc_info=True) - - def add_reader(self, fileobj, callback, *args): - """Add a reader callback.""" - if len(args) == 0: - args = None - self._add_reader(fileobj, new_Handle(self, callback, args, None)) - - def remove_reader(self, fileobj): - """Remove a reader callback.""" - self._remove_reader(fileobj) - - def add_writer(self, fileobj, callback, *args): - """Add a writer callback..""" - if len(args) == 0: - args = None - self._add_writer(fileobj, new_Handle(self, callback, args, None)) - - def remove_writer(self, fileobj): - """Remove a writer callback.""" - self._remove_writer(fileobj) - - @cython.iterable_coroutine - async def sock_recv(self, sock, n): - """Receive data from the socket. - - The return value is a bytes object representing the data received. - The maximum amount of data to be received at once is specified by - nbytes. - - This method is a coroutine. - """ - cdef: - Handle handle - - if self._debug and sock.gettimeout() != 0: - raise ValueError("the socket must be non-blocking") - - fut = _SyncSocketReaderFuture(sock, self) - handle = new_MethodHandle3( - self, - "Loop._sock_recv", - <method3_t>self._sock_recv, - None, - self, - fut, sock, n) - - self._add_reader(sock, handle) - return await fut - - @cython.iterable_coroutine - async def sock_recv_into(self, sock, buf): - """Receive data from the socket. - - The received data is written into *buf* (a writable buffer). - The return value is the number of bytes written. - - This method is a coroutine. - """ - cdef: - Handle handle - - if self._debug and sock.gettimeout() != 0: - raise ValueError("the socket must be non-blocking") - - fut = _SyncSocketReaderFuture(sock, self) - handle = new_MethodHandle3( - self, - "Loop._sock_recv_into", - <method3_t>self._sock_recv_into, - None, - self, - fut, sock, buf) - - self._add_reader(sock, handle) - return await fut - - @cython.iterable_coroutine - async def sock_sendall(self, sock, data): - """Send data to the socket. - - The socket must be connected to a remote socket. This method continues - to send data from data until either all data has been sent or an - error occurs. None is returned on success. On error, an exception is - raised, and there is no way to determine how much data, if any, was - successfully processed by the receiving end of the connection. - - This method is a coroutine. - """ - cdef: - Handle handle - ssize_t n - - if self._debug and sock.gettimeout() != 0: - raise ValueError("the socket must be non-blocking") - - if not data: - return - - socket_inc_io_ref(sock) - try: - try: - n = sock.send(data) - except (BlockingIOError, InterruptedError): - pass - else: - if UVLOOP_DEBUG: - # This can be a partial success, i.e. only part - # of the data was sent - self._sock_try_write_total += 1 - - if n == len(data): - return - if not isinstance(data, memoryview): - data = memoryview(data) - data = data[n:] - - fut = _SyncSocketWriterFuture(sock, self) - handle = new_MethodHandle3( - self, - "Loop._sock_sendall", - <method3_t>self._sock_sendall, - None, - self, - fut, sock, data) - - self._add_writer(sock, handle) - return await fut - finally: - socket_dec_io_ref(sock) - - @cython.iterable_coroutine - async def sock_accept(self, sock): - """Accept a connection. - - The socket must be bound to an address and listening for connections. - The return value is a pair (conn, address) where conn is a new socket - object usable to send and receive data on the connection, and address - is the address bound to the socket on the other end of the connection. - - This method is a coroutine. - """ - cdef: - Handle handle - - if self._debug and sock.gettimeout() != 0: - raise ValueError("the socket must be non-blocking") - - fut = _SyncSocketReaderFuture(sock, self) - handle = new_MethodHandle2( - self, - "Loop._sock_accept", - <method2_t>self._sock_accept, - None, - self, - fut, sock) - - self._add_reader(sock, handle) - return await fut - - @cython.iterable_coroutine - async def sock_connect(self, sock, address): - """Connect to a remote socket at address. - - This method is a coroutine. - """ - if self._debug and sock.gettimeout() != 0: - raise ValueError("the socket must be non-blocking") - - socket_inc_io_ref(sock) - try: - if sock.family == uv.AF_UNIX: - fut = self._sock_connect(sock, address) - else: - addrs = await self.getaddrinfo( - *address[:2], family=sock.family) - - _, _, _, _, address = addrs[0] - fut = self._sock_connect(sock, address) - if fut is not None: - await fut - finally: - socket_dec_io_ref(sock) - - @cython.iterable_coroutine - async def sock_recvfrom(self, sock, bufsize): - raise NotImplementedError - - @cython.iterable_coroutine - async def sock_recvfrom_into(self, sock, buf, nbytes=0): - raise NotImplementedError - - @cython.iterable_coroutine - async def sock_sendto(self, sock, data, address): - raise NotImplementedError - - @cython.iterable_coroutine - async def connect_accepted_socket(self, protocol_factory, sock, *, - ssl=None, - ssl_handshake_timeout=None, - ssl_shutdown_timeout=None): - """Handle an accepted connection. - - This is used by servers that accept connections outside of - asyncio but that use asyncio to handle connections. - - This method is a coroutine. When completed, the coroutine - returns a (transport, protocol) pair. - """ - - cdef: - UVStream transport = None - - if ssl is not None: - if not isinstance(ssl, ssl_SSLContext): - raise TypeError('ssl argument must be an SSLContext or None') - else: - if ssl_handshake_timeout is not None: - raise ValueError( - 'ssl_handshake_timeout is only meaningful with ssl') - if ssl_shutdown_timeout is not None: - raise ValueError( - 'ssl_shutdown_timeout is only meaningful with ssl') - - if not _is_sock_stream(sock.type): - raise ValueError( - 'A Stream Socket was expected, got {!r}'.format(sock)) - - app_protocol = protocol_factory() - waiter = self._new_future() - transport_waiter = None - context = Context_CopyCurrent() - - if ssl is None: - protocol = app_protocol - transport_waiter = waiter - else: - protocol = SSLProtocol( - self, app_protocol, ssl, waiter, - server_side=True, - server_hostname=None, - ssl_handshake_timeout=ssl_handshake_timeout, - ssl_shutdown_timeout=ssl_shutdown_timeout) - transport_waiter = None - - if sock.family == uv.AF_UNIX: - transport = <UVStream>UnixTransport.new( - self, protocol, None, transport_waiter, context) - elif sock.family in (uv.AF_INET, uv.AF_INET6): - transport = <UVStream>TCPTransport.new( - self, protocol, None, transport_waiter, context) - - if transport is None: - raise ValueError( - 'invalid socket family, expected AF_UNIX, AF_INET or AF_INET6') - - transport._open(sock.fileno()) - transport._init_protocol() - transport._attach_fileobj(sock) - - if ssl: - app_transport = protocol._get_app_transport(context) - try: - await waiter - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - app_transport.close() - raise - return app_transport, protocol - else: - try: - await waiter - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - transport._close() - raise - return transport, protocol - - def run_in_executor(self, executor, func, *args): - if aio_iscoroutine(func) or aio_iscoroutinefunction(func): - raise TypeError("coroutines cannot be used with run_in_executor()") - - self._check_closed() - - if executor is None: - executor = self._default_executor - # Only check when the default executor is being used - self._check_default_executor() - if executor is None: - executor = cc_ThreadPoolExecutor() - self._default_executor = executor - - return aio_wrap_future(executor.submit(func, *args), loop=self) - - def set_default_executor(self, executor): - self._default_executor = executor - - @cython.iterable_coroutine - async def __subprocess_run(self, protocol_factory, args, - stdin=subprocess_PIPE, - stdout=subprocess_PIPE, - stderr=subprocess_PIPE, - universal_newlines=False, - shell=True, - bufsize=0, - preexec_fn=None, - close_fds=None, - cwd=None, - env=None, - startupinfo=None, - creationflags=0, - restore_signals=True, - start_new_session=False, - executable=None, - pass_fds=(), - # For tests only! Do not use in your code. Ever. - __uvloop_sleep_after_fork=False): - - # TODO: Implement close_fds (might not be very important in - # Python 3.5, since all FDs aren't inheritable by default.) - - cdef: - int debug_flags = 0 - - if universal_newlines: - raise ValueError("universal_newlines must be False") - if bufsize != 0: - raise ValueError("bufsize must be 0") - if startupinfo is not None: - raise ValueError('startupinfo is not supported') - if creationflags != 0: - raise ValueError('creationflags is not supported') - - if executable is not None: - args[0] = executable - - if __uvloop_sleep_after_fork: - debug_flags |= __PROCESS_DEBUG_SLEEP_AFTER_FORK - - waiter = self._new_future() - protocol = protocol_factory() - proc = UVProcessTransport.new(self, protocol, - args, env, cwd, start_new_session, - stdin, stdout, stderr, pass_fds, - waiter, - debug_flags, - preexec_fn, - restore_signals) - - try: - await waiter - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - proc.close() - raise - - return proc, protocol - - @cython.iterable_coroutine - async def subprocess_shell(self, protocol_factory, cmd, *, - shell=True, - **kwargs): - - if not shell: - raise ValueError("shell must be True") - - args = [cmd] - if shell: - args = [b'/bin/sh', b'-c'] + args - - return await self.__subprocess_run(protocol_factory, args, shell=True, - **kwargs) - - @cython.iterable_coroutine - async def subprocess_exec(self, protocol_factory, program, *args, - shell=False, **kwargs): - - if shell: - raise ValueError("shell must be False") - - args = list((program,) + args) - - return await self.__subprocess_run(protocol_factory, args, shell=False, - **kwargs) - - @cython.iterable_coroutine - async def connect_read_pipe(self, proto_factory, pipe): - """Register read pipe in event loop. Set the pipe to non-blocking mode. - - protocol_factory should instantiate object with Protocol interface. - pipe is a file-like object. - Return pair (transport, protocol), where transport supports the - ReadTransport interface.""" - cdef: - ReadUnixTransport transp - - waiter = self._new_future() - proto = proto_factory() - transp = ReadUnixTransport.new(self, proto, None, waiter) - transp._add_extra_info('pipe', pipe) - try: - transp._open(pipe.fileno()) - transp._init_protocol() - await waiter - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - transp._close() - raise - transp._attach_fileobj(pipe) - return transp, proto - - @cython.iterable_coroutine - async def connect_write_pipe(self, proto_factory, pipe): - """Register write pipe in event loop. - - protocol_factory should instantiate object with BaseProtocol interface. - Pipe is file-like object already switched to nonblocking. - Return pair (transport, protocol), where transport support - WriteTransport interface.""" - cdef: - WriteUnixTransport transp - - waiter = self._new_future() - proto = proto_factory() - transp = WriteUnixTransport.new(self, proto, None, waiter) - transp._add_extra_info('pipe', pipe) - try: - transp._open(pipe.fileno()) - transp._init_protocol() - await waiter - except (KeyboardInterrupt, SystemExit): - raise - except BaseException: - transp._close() - raise - transp._attach_fileobj(pipe) - return transp, proto - - def add_signal_handler(self, sig, callback, *args): - """Add a handler for a signal. UNIX only. - - Raise ValueError if the signal number is invalid or uncatchable. - Raise RuntimeError if there is a problem setting up the handler. - """ - cdef: - Handle h - - if not self._is_main_thread(): - raise ValueError( - 'add_signal_handler() can only be called from ' - 'the main thread') - - if (aio_iscoroutine(callback) - or aio_iscoroutinefunction(callback)): - raise TypeError( - "coroutines cannot be used with add_signal_handler()") - - if sig == uv.SIGCHLD: - if (hasattr(callback, '__self__') and - isinstance(callback.__self__, aio_AbstractChildWatcher)): - - warnings_warn( - "!!! asyncio is trying to install its ChildWatcher for " - "SIGCHLD signal !!!\n\nThis is probably because a uvloop " - "instance is used with asyncio.set_event_loop(). " - "The correct way to use uvloop is to install its policy: " - "`asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())`" - "\n\n", RuntimeWarning, source=self) - - # TODO: ideally we should always raise an error here, - # but that would be a backwards incompatible change, - # because we recommended using "asyncio.set_event_loop()" - # in our README. Need to start a deprecation period - # at some point to turn this warning into an error. - return - - raise RuntimeError( - 'cannot add a signal handler for SIGCHLD: it is used ' - 'by the event loop to track subprocesses') - - self._check_signal(sig) - self._check_closed() - - h = new_Handle(self, callback, args or None, None) - self._signal_handlers[sig] = h - - try: - # Register a dummy signal handler to ask Python to write the signal - # number in the wakeup file descriptor. - signal_signal(sig, self.__sighandler) - - # Set SA_RESTART to limit EINTR occurrences. - signal_siginterrupt(sig, False) - except OSError as exc: - del self._signal_handlers[sig] - if not self._signal_handlers: - try: - signal_set_wakeup_fd(-1) - except (ValueError, OSError) as nexc: - aio_logger.info('set_wakeup_fd(-1) failed: %s', nexc) - - if exc.errno == errno_EINVAL: - raise RuntimeError('sig {} cannot be caught'.format(sig)) - else: - raise - - def remove_signal_handler(self, sig): - """Remove a handler for a signal. UNIX only. - - Return True if a signal handler was removed, False if not. - """ - - if not self._is_main_thread(): - raise ValueError( - 'remove_signal_handler() can only be called from ' - 'the main thread') - - self._check_signal(sig) - - if not self._listening_signals: - return False - - try: - del self._signal_handlers[sig] - except KeyError: - return False - - if sig == uv.SIGINT: - handler = signal_default_int_handler - else: - handler = signal_SIG_DFL - - try: - signal_signal(sig, handler) - except OSError as exc: - if exc.errno == errno_EINVAL: - raise RuntimeError('sig {} cannot be caught'.format(sig)) - else: - raise - - return True - - @cython.iterable_coroutine - async def create_datagram_endpoint(self, protocol_factory, - local_addr=None, remote_addr=None, *, - family=0, proto=0, flags=0, - reuse_address=_unset, reuse_port=None, - allow_broadcast=None, sock=None): - """A coroutine which creates a datagram endpoint. - - This method will try to establish the endpoint in the background. - When successful, the coroutine returns a (transport, protocol) pair. - - protocol_factory must be a callable returning a protocol instance. - - socket family AF_INET or socket.AF_INET6 depending on host (or - family if specified), socket type SOCK_DGRAM. - - reuse_port tells the kernel to allow this endpoint to be bound to - the same port as other existing endpoints are bound to, so long as - they all set this flag when being created. This option is not - supported on Windows and some UNIX's. If the - :py:data:`~socket.SO_REUSEPORT` constant is not defined then this - capability is unsupported. - - allow_broadcast tells the kernel to allow this endpoint to send - messages to the broadcast address. - - sock can optionally be specified in order to use a preexisting - socket object. - """ - cdef: - UDPTransport udp = None - system.addrinfo * lai - system.addrinfo * rai - - if sock is not None: - if not _is_sock_dgram(sock.type): - raise ValueError( - 'A UDP Socket was expected, got {!r}'.format(sock)) - if (local_addr or remote_addr or - family or proto or flags or - reuse_port or allow_broadcast): - # show the problematic kwargs in exception msg - opts = dict(local_addr=local_addr, remote_addr=remote_addr, - family=family, proto=proto, flags=flags, - reuse_address=reuse_address, reuse_port=reuse_port, - allow_broadcast=allow_broadcast) - problems = ', '.join( - '{}={}'.format(k, v) for k, v in opts.items() if v) - raise ValueError( - 'socket modifier keyword arguments can not be used ' - 'when sock is specified. ({})'.format(problems)) - sock.setblocking(False) - udp = UDPTransport.__new__(UDPTransport) - udp._init(self, uv.AF_UNSPEC) - udp.open(sock.family, sock.fileno()) - udp._attach_fileobj(sock) - else: - if reuse_address is not _unset: - if reuse_address: - raise ValueError("Passing `reuse_address=True` is no " - "longer supported, as the usage of " - "SO_REUSEPORT in UDP poses a significant " - "security concern.") - else: - warnings_warn("The *reuse_address* parameter has been " - "deprecated as of 0.15.", DeprecationWarning, - stacklevel=2) - reuse_port = bool(reuse_port) - if reuse_port and not has_SO_REUSEPORT: - raise ValueError( - 'reuse_port not supported by socket module') - - lads = None - if local_addr is not None: - if (not isinstance(local_addr, (tuple, list)) or - len(local_addr) != 2): - raise TypeError( - 'local_addr must be a tuple of (host, port)') - lads = await self._getaddrinfo( - local_addr[0], local_addr[1], - family, uv.SOCK_DGRAM, proto, flags, - 0) - - rads = None - if remote_addr is not None: - if (not isinstance(remote_addr, (tuple, list)) or - len(remote_addr) != 2): - raise TypeError( - 'remote_addr must be a tuple of (host, port)') - rads = await self._getaddrinfo( - remote_addr[0], remote_addr[1], - family, uv.SOCK_DGRAM, proto, flags, - 0) - - excs = [] - if lads is None: - if rads is not None: - udp = UDPTransport.__new__(UDPTransport) - rai = (<AddrInfo>rads).data - udp._init(self, rai.ai_family) - udp._connect(rai.ai_addr, rai.ai_addrlen) - udp._set_address(rai) - else: - if family not in (uv.AF_INET, uv.AF_INET6): - raise ValueError('unexpected address family') - udp = UDPTransport.__new__(UDPTransport) - udp._init(self, family) - - if reuse_port: - self._sock_set_reuseport(udp._fileno()) - - else: - lai = (<AddrInfo>lads).data - while lai is not NULL: - try: - udp = UDPTransport.__new__(UDPTransport) - udp._init(self, lai.ai_family) - if reuse_port: - self._sock_set_reuseport(udp._fileno()) - udp._bind(lai.ai_addr) - except (KeyboardInterrupt, SystemExit): - raise - except BaseException as ex: - lai = lai.ai_next - excs.append(ex) - continue - else: - break - else: - ctx = None - if len(excs): - ctx = excs[0] - raise OSError('could not bind to local_addr {}'.format( - local_addr)) from ctx - - if rads is not None: - rai = (<AddrInfo>rads).data - while rai is not NULL: - if rai.ai_family != lai.ai_family: - rai = rai.ai_next - continue - if rai.ai_protocol != lai.ai_protocol: - rai = rai.ai_next - continue - udp._connect(rai.ai_addr, rai.ai_addrlen) - udp._set_address(rai) - break - else: - raise OSError( - 'could not bind to remote_addr {}'.format( - remote_addr)) - - if allow_broadcast: - udp._set_broadcast(1) - - protocol = protocol_factory() - waiter = self._new_future() - assert udp is not None - udp._set_protocol(protocol) - udp._set_waiter(waiter) - udp._init_protocol() - - await waiter - return udp, protocol - - def _monitor_fs(self, path: str, callback) -> asyncio.Handle: - cdef: - UVFSEvent fs_handle - char* c_str_path - - self._check_closed() - fs_handle = UVFSEvent.new(self, callback, None) - p_bytes = path.encode('UTF-8') - c_str_path = p_bytes - flags = 0 - fs_handle.start(c_str_path, flags) - return fs_handle - - def _check_default_executor(self): - if self._executor_shutdown_called: - raise RuntimeError('Executor shutdown has been called') - - def _asyncgen_finalizer_hook(self, agen): - self._asyncgens.discard(agen) - if not self.is_closed(): - self.call_soon_threadsafe(self.create_task, agen.aclose()) - - def _asyncgen_firstiter_hook(self, agen): - if self._asyncgens_shutdown_called: - warnings_warn( - "asynchronous generator {!r} was scheduled after " - "loop.shutdown_asyncgens() call".format(agen), - ResourceWarning, source=self) - - self._asyncgens.add(agen) - - @cython.iterable_coroutine - async def shutdown_asyncgens(self): - """Shutdown all active asynchronous generators.""" - self._asyncgens_shutdown_called = True - - if not len(self._asyncgens): - return - - closing_agens = list(self._asyncgens) - self._asyncgens.clear() - - shutdown_coro = aio_gather( - *[ag.aclose() for ag in closing_agens], - return_exceptions=True) - - results = await shutdown_coro - for result, agen in zip(results, closing_agens): - if isinstance(result, Exception): - self.call_exception_handler({ - 'message': 'an error occurred during closing of ' - 'asynchronous generator {!r}'.format(agen), - 'exception': result, - 'asyncgen': agen - }) - - @cython.iterable_coroutine - async def shutdown_default_executor(self, timeout=None): - """Schedule the shutdown of the default executor. - - The timeout parameter specifies the amount of time the executor will - be given to finish joining. The default value is None, which means - that the executor will be given an unlimited amount of time. - """ - self._executor_shutdown_called = True - if self._default_executor is None: - return - future = self.create_future() - thread = threading_Thread(target=self._do_shutdown, args=(future,)) - thread.start() - try: - await future - finally: - thread.join(timeout) - - if thread.is_alive(): - warnings_warn( - "The executor did not finishing joining " - f"its threads within {timeout} seconds.", - RuntimeWarning, - stacklevel=2 - ) - self._default_executor.shutdown(wait=False) - - def _do_shutdown(self, future): - try: - self._default_executor.shutdown(wait=True) - self.call_soon_threadsafe(future.set_result, None) - except Exception as ex: - self.call_soon_threadsafe(future.set_exception, ex) - - -# Expose pointer for integration with other C-extensions -def libuv_get_loop_t_ptr(loop): - return PyCapsule_New(<void *>(<Loop>loop).uvloop, NULL, NULL) - - -def libuv_get_version(): - return uv.uv_version() - - -def _testhelper_unwrap_capsuled_pointer(obj): - return <uint64_t>PyCapsule_GetPointer(obj, NULL) - - -cdef void __loop_alloc_buffer( - uv.uv_handle_t* uvhandle, - size_t suggested_size, - uv.uv_buf_t* buf -) noexcept with gil: - cdef: - Loop loop = (<UVHandle>uvhandle.data)._loop - - if loop._recv_buffer_in_use == 1: - buf.len = 0 - exc = RuntimeError('concurrent allocations') - loop._handle_exception(exc) - return - - loop._recv_buffer_in_use = 1 - buf.base = loop._recv_buffer - buf.len = sizeof(loop._recv_buffer) - - -cdef inline void __loop_free_buffer(Loop loop): - loop._recv_buffer_in_use = 0 - - -class _SyncSocketReaderFuture(aio_Future): - - def __init__(self, sock, loop): - aio_Future.__init__(self, loop=loop) - self.__sock = sock - self.__loop = loop - - def __remove_reader(self): - if self.__sock is not None and self.__sock.fileno() != -1: - self.__loop.remove_reader(self.__sock) - self.__sock = None - - if PY39: - def cancel(self, msg=None): - self.__remove_reader() - aio_Future.cancel(self, msg=msg) - - else: - def cancel(self): - self.__remove_reader() - aio_Future.cancel(self) - - -class _SyncSocketWriterFuture(aio_Future): - - def __init__(self, sock, loop): - aio_Future.__init__(self, loop=loop) - self.__sock = sock - self.__loop = loop - - def __remove_writer(self): - if self.__sock is not None and self.__sock.fileno() != -1: - self.__loop.remove_writer(self.__sock) - self.__sock = None - - if PY39: - def cancel(self, msg=None): - self.__remove_writer() - aio_Future.cancel(self, msg=msg) - - else: - def cancel(self): - self.__remove_writer() - aio_Future.cancel(self) - - -include "cbhandles.pyx" -include "pseudosock.pyx" -include "lru.pyx" - -include "handles/handle.pyx" -include "handles/async_.pyx" -include "handles/idle.pyx" -include "handles/check.pyx" -include "handles/timer.pyx" -include "handles/poll.pyx" -include "handles/basetransport.pyx" -include "handles/stream.pyx" -include "handles/streamserver.pyx" -include "handles/tcp.pyx" -include "handles/pipe.pyx" -include "handles/process.pyx" -include "handles/fsevent.pyx" - -include "request.pyx" -include "dns.pyx" -include "sslproto.pyx" - -include "handles/udp.pyx" - -include "server.pyx" - - -# Used in UVProcess -cdef vint __atfork_installed = 0 -cdef vint __forking = 0 -cdef Loop __forking_loop = None - - -cdef void __get_fork_handler() noexcept nogil: - with gil: - if (__forking and __forking_loop is not None and - __forking_loop.active_process_handler is not None): - __forking_loop.active_process_handler._after_fork() - -cdef __install_atfork(): - global __atfork_installed - - if __atfork_installed: - return - __atfork_installed = 1 - - cdef int err - - err = system.pthread_atfork(NULL, NULL, &system.handleAtFork) - if err: - __atfork_installed = 0 - raise convert_error(-err) - - -# Install PyMem* memory allocators -cdef vint __mem_installed = 0 -cdef __install_pymem(): - global __mem_installed - if __mem_installed: - return - __mem_installed = 1 - - cdef int err - err = uv.uv_replace_allocator(<uv.uv_malloc_func>PyMem_RawMalloc, - <uv.uv_realloc_func>PyMem_RawRealloc, - <uv.uv_calloc_func>PyMem_RawCalloc, - <uv.uv_free_func>PyMem_RawFree) - if err < 0: - __mem_installed = 0 - raise convert_error(err) - - -cdef _set_signal_wakeup_fd(fd): - if fd >= 0: - return signal_set_wakeup_fd(fd, warn_on_full_buffer=False) - else: - return signal_set_wakeup_fd(fd) - - -# Helpers for tests - -@cython.iterable_coroutine -async def _test_coroutine_1(): - return 42 |