summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/uvloop/loop.pyx
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
commit12cf076118570eebbff08c6b3090e0d4798447a1 (patch)
tree3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/uvloop/loop.pyx
parentc45662ff3923b34614ddcc8feb9195541166dcc5 (diff)
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/uvloop/loop.pyx')
-rw-r--r--venv/lib/python3.11/site-packages/uvloop/loop.pyx3403
1 files changed, 0 insertions, 3403 deletions
diff --git a/venv/lib/python3.11/site-packages/uvloop/loop.pyx b/venv/lib/python3.11/site-packages/uvloop/loop.pyx
deleted file mode 100644
index 334d8d5..0000000
--- a/venv/lib/python3.11/site-packages/uvloop/loop.pyx
+++ /dev/null
@@ -1,3403 +0,0 @@
-# cython: language_level=3, embedsignature=True
-
-import asyncio
-cimport cython
-
-from .includes.debug cimport UVLOOP_DEBUG
-from .includes cimport uv
-from .includes cimport system
-from .includes.python cimport (
- PY_VERSION_HEX,
- PyMem_RawMalloc, PyMem_RawFree,
- PyMem_RawCalloc, PyMem_RawRealloc,
- PyUnicode_EncodeFSDefault,
- PyErr_SetInterrupt,
- _Py_RestoreSignals,
- Context_CopyCurrent,
- Context_Enter,
- Context_Exit,
- PyMemoryView_FromMemory, PyBUF_WRITE,
- PyMemoryView_FromObject, PyMemoryView_Check,
- PyOS_AfterFork_Parent, PyOS_AfterFork_Child,
- PyOS_BeforeFork,
- PyUnicode_FromString
-)
-from .includes.flowcontrol cimport add_flowcontrol_defaults
-
-from libc.stdint cimport uint64_t
-from libc.string cimport memset, strerror, memcpy
-from libc cimport errno
-
-from cpython cimport PyObject
-from cpython cimport PyErr_CheckSignals, PyErr_Occurred
-from cpython cimport PyThread_get_thread_ident
-from cpython cimport Py_INCREF, Py_DECREF, Py_XDECREF, Py_XINCREF
-from cpython cimport (
- PyObject_GetBuffer, PyBuffer_Release, PyBUF_SIMPLE,
- Py_buffer, PyBytes_AsString, PyBytes_CheckExact,
- PyBytes_AsStringAndSize,
- Py_SIZE, PyBytes_AS_STRING, PyBUF_WRITABLE
-)
-from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer
-
-from . import _noop
-
-
-include "includes/consts.pxi"
-include "includes/stdlib.pxi"
-
-include "errors.pyx"
-
-cdef:
- int PY39 = PY_VERSION_HEX >= 0x03090000
- int PY311 = PY_VERSION_HEX >= 0x030b0000
- uint64_t MAX_SLEEP = 3600 * 24 * 365 * 100
-
-
-cdef _is_sock_stream(sock_type):
- if SOCK_NONBLOCK == -1:
- return sock_type == uv.SOCK_STREAM
- else:
- # Linux's socket.type is a bitmask that can include extra info
- # about socket (like SOCK_NONBLOCK bit), therefore we can't do simple
- # `sock_type == socket.SOCK_STREAM`, see
- # https://github.com/torvalds/linux/blob/v4.13/include/linux/net.h#L77
- # for more details.
- return (sock_type & 0xF) == uv.SOCK_STREAM
-
-
-cdef _is_sock_dgram(sock_type):
- if SOCK_NONBLOCK == -1:
- return sock_type == uv.SOCK_DGRAM
- else:
- # Read the comment in `_is_sock_stream`.
- return (sock_type & 0xF) == uv.SOCK_DGRAM
-
-
-cdef isfuture(obj):
- if aio_isfuture is None:
- return isinstance(obj, aio_Future)
- else:
- return aio_isfuture(obj)
-
-
-cdef inline socket_inc_io_ref(sock):
- if isinstance(sock, socket_socket):
- sock._io_refs += 1
-
-
-cdef inline socket_dec_io_ref(sock):
- if isinstance(sock, socket_socket):
- sock._decref_socketios()
-
-
-cdef inline run_in_context(context, method):
- # This method is internally used to workaround a reference issue that in
- # certain circumstances, inlined context.run() will not hold a reference to
- # the given method instance, which - if deallocated - will cause segfault.
- # See also: edgedb/edgedb#2222
- Py_INCREF(method)
- try:
- return context.run(method)
- finally:
- Py_DECREF(method)
-
-
-cdef inline run_in_context1(context, method, arg):
- Py_INCREF(method)
- try:
- return context.run(method, arg)
- finally:
- Py_DECREF(method)
-
-
-cdef inline run_in_context2(context, method, arg1, arg2):
- Py_INCREF(method)
- try:
- return context.run(method, arg1, arg2)
- finally:
- Py_DECREF(method)
-
-
-# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s
-# *reuse_address* parameter
-_unset = object()
-
-
-@cython.no_gc_clear
-cdef class Loop:
- def __cinit__(self):
- cdef int err
-
- # Install PyMem* memory allocators if they aren't installed yet.
- __install_pymem()
-
- # Install pthread_atfork handlers
- __install_atfork()
-
- self.uvloop = <uv.uv_loop_t*>PyMem_RawMalloc(sizeof(uv.uv_loop_t))
- if self.uvloop is NULL:
- raise MemoryError()
-
- self.slow_callback_duration = 0.1
-
- self._closed = 0
- self._debug = 0
- self._thread_id = 0
- self._running = 0
- self._stopping = 0
-
- self._transports = weakref_WeakValueDictionary()
- self._processes = set()
-
- # Used to keep a reference (and hence keep the fileobj alive)
- # for as long as its registered by add_reader or add_writer.
- # This is how the selector module and hence asyncio behaves.
- self._fd_to_reader_fileobj = {}
- self._fd_to_writer_fileobj = {}
-
- self._timers = set()
- self._polls = {}
-
- self._recv_buffer_in_use = 0
-
- err = uv.uv_loop_init(self.uvloop)
- if err < 0:
- raise convert_error(err)
- self.uvloop.data = <void*> self
-
- self._init_debug_fields()
-
- self.active_process_handler = None
-
- self._last_error = None
-
- self._task_factory = None
- self._exception_handler = None
- self._default_executor = None
-
- self._queued_streams = set()
- self._executing_streams = set()
- self._ready = col_deque()
- self._ready_len = 0
-
- self.handler_async = UVAsync.new(
- self, <method_t>self._on_wake, self)
-
- self.handler_idle = UVIdle.new(
- self,
- new_MethodHandle(
- self, "loop._on_idle", <method_t>self._on_idle, None, self))
-
- # Needed to call `UVStream._exec_write` for writes scheduled
- # during `Protocol.data_received`.
- self.handler_check__exec_writes = UVCheck.new(
- self,
- new_MethodHandle(
- self, "loop._exec_queued_writes",
- <method_t>self._exec_queued_writes, None, self))
-
- self._signals = set()
- self._ssock = self._csock = None
- self._signal_handlers = {}
- self._listening_signals = False
- self._old_signal_wakeup_id = -1
-
- self._coroutine_debug_set = False
-
- # A weak set of all asynchronous generators that are
- # being iterated by the loop.
- self._asyncgens = weakref_WeakSet()
-
- # Set to True when `loop.shutdown_asyncgens` is called.
- self._asyncgens_shutdown_called = False
- # Set to True when `loop.shutdown_default_executor` is called.
- self._executor_shutdown_called = False
-
- self._servers = set()
-
- cdef inline _is_main_thread(self):
- cdef uint64_t main_thread_id = system.MAIN_THREAD_ID
- if system.MAIN_THREAD_ID_SET == 0:
- main_thread_id = <uint64_t>threading_main_thread().ident
- system.setMainThreadID(main_thread_id)
- return main_thread_id == PyThread_get_thread_ident()
-
- def __init__(self):
- self.set_debug(
- sys_dev_mode or (not sys_ignore_environment
- and bool(os_environ.get('PYTHONASYNCIODEBUG'))))
-
- def __dealloc__(self):
- if self._running == 1:
- raise RuntimeError('deallocating a running event loop!')
- if self._closed == 0:
- aio_logger.error("deallocating an open event loop")
- return
- PyMem_RawFree(self.uvloop)
- self.uvloop = NULL
-
- cdef _init_debug_fields(self):
- self._debug_cc = bool(UVLOOP_DEBUG)
-
- if UVLOOP_DEBUG:
- self._debug_handles_current = col_Counter()
- self._debug_handles_closed = col_Counter()
- self._debug_handles_total = col_Counter()
- else:
- self._debug_handles_current = None
- self._debug_handles_closed = None
- self._debug_handles_total = None
-
- self._debug_uv_handles_total = 0
- self._debug_uv_handles_freed = 0
-
- self._debug_stream_read_cb_total = 0
- self._debug_stream_read_eof_total = 0
- self._debug_stream_read_errors_total = 0
- self._debug_stream_read_cb_errors_total = 0
- self._debug_stream_read_eof_cb_errors_total = 0
-
- self._debug_stream_shutdown_errors_total = 0
- self._debug_stream_listen_errors_total = 0
-
- self._debug_stream_write_tries = 0
- self._debug_stream_write_errors_total = 0
- self._debug_stream_write_ctx_total = 0
- self._debug_stream_write_ctx_cnt = 0
- self._debug_stream_write_cb_errors_total = 0
-
- self._debug_cb_handles_total = 0
- self._debug_cb_handles_count = 0
-
- self._debug_cb_timer_handles_total = 0
- self._debug_cb_timer_handles_count = 0
-
- self._poll_read_events_total = 0
- self._poll_read_cb_errors_total = 0
- self._poll_write_events_total = 0
- self._poll_write_cb_errors_total = 0
-
- self._sock_try_write_total = 0
-
- self._debug_exception_handler_cnt = 0
-
- cdef _setup_or_resume_signals(self):
- if not self._is_main_thread():
- return
-
- if self._listening_signals:
- raise RuntimeError('signals handling has been already setup')
-
- if self._ssock is not None:
- raise RuntimeError('self-pipe exists before loop run')
-
- # Create a self-pipe and call set_signal_wakeup_fd() with one
- # of its ends. This is needed so that libuv knows that it needs
- # to wakeup on ^C (no matter if the SIGINT handler is still the
- # standard Python's one or or user set their own.)
-
- self._ssock, self._csock = socket_socketpair()
- try:
- self._ssock.setblocking(False)
- self._csock.setblocking(False)
-
- fileno = self._csock.fileno()
-
- self._old_signal_wakeup_id = _set_signal_wakeup_fd(fileno)
- except Exception:
- # Out of all statements in the try block, only the
- # "_set_signal_wakeup_fd()" call can fail, but it shouldn't,
- # as we ensure that the current thread is the main thread.
- # Still, if something goes horribly wrong we want to clean up
- # the socket pair.
- self._ssock.close()
- self._csock.close()
- self._ssock = None
- self._csock = None
- raise
-
- self._add_reader(
- self._ssock,
- new_MethodHandle(
- self,
- "Loop._read_from_self",
- <method_t>self._read_from_self,
- None,
- self))
-
- self._listening_signals = True
-
- cdef _pause_signals(self):
- if not self._is_main_thread():
- if self._listening_signals:
- raise RuntimeError(
- 'cannot pause signals handling; no longer running in '
- 'the main thread')
- else:
- return
-
- if not self._listening_signals:
- raise RuntimeError('signals handling has not been setup')
-
- self._listening_signals = False
-
- _set_signal_wakeup_fd(self._old_signal_wakeup_id)
-
- self._remove_reader(self._ssock)
- self._ssock.close()
- self._csock.close()
- self._ssock = None
- self._csock = None
-
- cdef _shutdown_signals(self):
- if not self._is_main_thread():
- if self._signal_handlers:
- aio_logger.warning(
- 'cannot cleanup signal handlers: closing the event loop '
- 'in a non-main OS thread')
- return
-
- if self._listening_signals:
- raise RuntimeError(
- 'cannot shutdown signals handling as it has not been paused')
-
- if self._ssock:
- raise RuntimeError(
- 'self-pipe was not cleaned up after loop was run')
-
- for sig in list(self._signal_handlers):
- self.remove_signal_handler(sig)
-
- def __sighandler(self, signum, frame):
- self._signals.add(signum)
-
- cdef inline _ceval_process_signals(self):
- # Invoke CPython eval loop to let process signals.
- PyErr_CheckSignals()
- # Calling a pure-Python function will invoke
- # _PyEval_EvalFrameDefault which will process
- # pending signal callbacks.
- _noop.noop() # Might raise ^C
-
- cdef _read_from_self(self):
- cdef bytes sigdata
- sigdata = b''
- while True:
- try:
- data = self._ssock.recv(65536)
- if not data:
- break
- sigdata += data
- except InterruptedError:
- continue
- except BlockingIOError:
- break
- if sigdata:
- self._invoke_signals(sigdata)
-
- cdef _invoke_signals(self, bytes data):
- cdef set sigs
-
- self._ceval_process_signals()
-
- sigs = self._signals.copy()
- self._signals.clear()
- for signum in data:
- if not signum:
- # ignore null bytes written by set_wakeup_fd()
- continue
- sigs.discard(signum)
- self._handle_signal(signum)
-
- for signum in sigs:
- # Since not all signals are registered by add_signal_handler()
- # (for instance, we use the default SIGINT handler) not all
- # signals will trigger loop.__sighandler() callback. Therefore
- # we combine two datasources: one is self-pipe, one is data
- # from __sighandler; this ensures that signals shouldn't be
- # lost even if set_wakeup_fd() couldn't write to the self-pipe.
- self._handle_signal(signum)
-
- cdef _handle_signal(self, sig):
- cdef Handle handle
-
- try:
- handle = <Handle>(self._signal_handlers[sig])
- except KeyError:
- handle = None
-
- if handle is None:
- self._ceval_process_signals()
- return
-
- if handle._cancelled:
- self.remove_signal_handler(sig) # Remove it properly.
- else:
- self._append_ready_handle(handle)
- self.handler_async.send()
-
- cdef _on_wake(self):
- if ((self._ready_len > 0 or self._stopping) and
- not self.handler_idle.running):
- self.handler_idle.start()
-
- cdef _on_idle(self):
- cdef:
- int i, ntodo
- object popleft = self._ready.popleft
- Handle handler
-
- ntodo = len(self._ready)
- if self._debug:
- for i from 0 <= i < ntodo:
- handler = <Handle> popleft()
- if handler._cancelled == 0:
- try:
- started = time_monotonic()
- handler._run()
- except BaseException as ex:
- self._stop(ex)
- return
- else:
- delta = time_monotonic() - started
- if delta > self.slow_callback_duration:
- aio_logger.warning(
- 'Executing %s took %.3f seconds',
- handler._format_handle(), delta)
-
- else:
- for i from 0 <= i < ntodo:
- handler = <Handle> popleft()
- if handler._cancelled == 0:
- try:
- handler._run()
- except BaseException as ex:
- self._stop(ex)
- return
-
- if len(self._queued_streams):
- self._exec_queued_writes()
-
- self._ready_len = len(self._ready)
- if self._ready_len == 0 and self.handler_idle.running:
- self.handler_idle.stop()
-
- if self._stopping:
- uv.uv_stop(self.uvloop) # void
-
- cdef _stop(self, exc):
- if exc is not None:
- self._last_error = exc
- if self._stopping == 1:
- return
- self._stopping = 1
- if not self.handler_idle.running:
- self.handler_idle.start()
-
- cdef __run(self, uv.uv_run_mode mode):
- # Although every UVHandle holds a reference to the loop,
- # we want to do everything to ensure that the loop will
- # never deallocate during the run -- so we do some
- # manual refs management.
- Py_INCREF(self)
- with nogil:
- err = uv.uv_run(self.uvloop, mode)
- Py_DECREF(self)
-
- if err < 0:
- raise convert_error(err)
-
- cdef _run(self, uv.uv_run_mode mode):
- cdef int err
-
- if self._closed == 1:
- raise RuntimeError('unable to start the loop; it was closed')
-
- if self._running == 1:
- raise RuntimeError('this event loop is already running.')
-
- if (aio_get_running_loop is not None and
- aio_get_running_loop() is not None):
- raise RuntimeError(
- 'Cannot run the event loop while another loop is running')
-
- # reset _last_error
- self._last_error = None
-
- self._thread_id = PyThread_get_thread_ident()
- self._running = 1
-
- self.handler_check__exec_writes.start()
- self.handler_idle.start()
-
- self._setup_or_resume_signals()
-
- if aio_set_running_loop is not None:
- aio_set_running_loop(self)
- try:
- self.__run(mode)
- finally:
- if aio_set_running_loop is not None:
- aio_set_running_loop(None)
-
- self.handler_check__exec_writes.stop()
- self.handler_idle.stop()
-
- self._pause_signals()
-
- self._thread_id = 0
- self._running = 0
- self._stopping = 0
-
- if self._last_error is not None:
- # The loop was stopped with an error with 'loop._stop(error)' call
- raise self._last_error
-
- cdef _close(self):
- cdef int err
-
- if self._running == 1:
- raise RuntimeError("Cannot close a running event loop")
-
- if self._closed == 1:
- return
-
- self._closed = 1
-
- for cb_handle in self._ready:
- cb_handle.cancel()
- self._ready.clear()
- self._ready_len = 0
-
- if self._polls:
- for poll_handle in self._polls.values():
- (<UVHandle>poll_handle)._close()
-
- self._polls.clear()
-
- if self._timers:
- for timer_cbhandle in tuple(self._timers):
- timer_cbhandle.cancel()
-
- # Close all remaining handles
- self.handler_async._close()
- self.handler_idle._close()
- self.handler_check__exec_writes._close()
- __close_all_handles(self)
- self._shutdown_signals()
- # During this run there should be no open handles,
- # so it should finish right away
- self.__run(uv.UV_RUN_DEFAULT)
-
- if self._fd_to_writer_fileobj:
- for fileobj in self._fd_to_writer_fileobj.values():
- socket_dec_io_ref(fileobj)
- self._fd_to_writer_fileobj.clear()
-
- if self._fd_to_reader_fileobj:
- for fileobj in self._fd_to_reader_fileobj.values():
- socket_dec_io_ref(fileobj)
- self._fd_to_reader_fileobj.clear()
-
- if self._timers:
- raise RuntimeError(
- f"new timers were queued during loop closing: {self._timers}")
-
- if self._polls:
- raise RuntimeError(
- f"new poll handles were queued during loop closing: "
- f"{self._polls}")
-
- if self._ready:
- raise RuntimeError(
- f"new callbacks were queued during loop closing: "
- f"{self._ready}")
-
- err = uv.uv_loop_close(self.uvloop)
- if err < 0:
- raise convert_error(err)
-
- self.handler_async = None
- self.handler_idle = None
- self.handler_check__exec_writes = None
-
- self._executor_shutdown_called = True
- executor = self._default_executor
- if executor is not None:
- self._default_executor = None
- executor.shutdown(wait=False)
-
- cdef uint64_t _time(self):
- # asyncio doesn't have a time cache, neither should uvloop.
- uv.uv_update_time(self.uvloop) # void
- return uv.uv_now(self.uvloop)
-
- cdef inline _queue_write(self, UVStream stream):
- self._queued_streams.add(stream)
- if not self.handler_check__exec_writes.running:
- self.handler_check__exec_writes.start()
-
- cdef _exec_queued_writes(self):
- if len(self._queued_streams) == 0:
- if self.handler_check__exec_writes.running:
- self.handler_check__exec_writes.stop()
- return
-
- cdef:
- UVStream stream
-
- streams = self._queued_streams
- self._queued_streams = self._executing_streams
- self._executing_streams = streams
- try:
- for pystream in streams:
- stream = <UVStream>pystream
- stream._exec_write()
- finally:
- streams.clear()
-
- if self.handler_check__exec_writes.running:
- if len(self._queued_streams) == 0:
- self.handler_check__exec_writes.stop()
-
- cdef inline _call_soon(self, object callback, object args, object context):
- cdef Handle handle
- handle = new_Handle(self, callback, args, context)
- self._call_soon_handle(handle)
- return handle
-
- cdef inline _append_ready_handle(self, Handle handle):
- self._check_closed()
- self._ready.append(handle)
- self._ready_len += 1
-
- cdef inline _call_soon_handle(self, Handle handle):
- self._append_ready_handle(handle)
- if not self.handler_idle.running:
- self.handler_idle.start()
-
- cdef _call_later(self, uint64_t delay, object callback, object args,
- object context):
- return TimerHandle(self, callback, args, delay, context)
-
- cdef void _handle_exception(self, object ex):
- if isinstance(ex, Exception):
- self.call_exception_handler({'exception': ex})
- else:
- # BaseException
- self._last_error = ex
- # Exit ASAP
- self._stop(None)
-
- cdef inline _check_signal(self, sig):
- if not isinstance(sig, int):
- raise TypeError('sig must be an int, not {!r}'.format(sig))
-
- if not (1 <= sig < signal_NSIG):
- raise ValueError(
- 'sig {} out of range(1, {})'.format(sig, signal_NSIG))
-
- cdef inline _check_closed(self):
- if self._closed == 1:
- raise RuntimeError('Event loop is closed')
-
- cdef inline _check_thread(self):
- if self._thread_id == 0:
- return
-
- cdef uint64_t thread_id
- thread_id = <uint64_t>PyThread_get_thread_ident()
-
- if thread_id != self._thread_id:
- raise RuntimeError(
- "Non-thread-safe operation invoked on an event loop other "
- "than the current one")
-
- cdef inline _new_future(self):
- return aio_Future(loop=self)
-
- cdef _track_transport(self, UVBaseTransport transport):
- self._transports[transport._fileno()] = transport
-
- cdef _track_process(self, UVProcess proc):
- self._processes.add(proc)
-
- cdef _untrack_process(self, UVProcess proc):
- self._processes.discard(proc)
-
- cdef _fileobj_to_fd(self, fileobj):
- """Return a file descriptor from a file object.
-
- Parameters:
- fileobj -- file object or file descriptor
-
- Returns:
- corresponding file descriptor
-
- Raises:
- ValueError if the object is invalid
- """
- # Copy of the `selectors._fileobj_to_fd()` function.
- if isinstance(fileobj, int):
- fd = fileobj
- else:
- try:
- fd = int(fileobj.fileno())
- except (AttributeError, TypeError, ValueError):
- raise ValueError("Invalid file object: "
- "{!r}".format(fileobj)) from None
- if fd < 0:
- raise ValueError("Invalid file descriptor: {}".format(fd))
- return fd
-
- cdef _ensure_fd_no_transport(self, fd):
- cdef UVBaseTransport tr
- try:
- tr = <UVBaseTransport>(self._transports[fd])
- except KeyError:
- pass
- else:
- if tr._is_alive():
- raise RuntimeError(
- 'File descriptor {!r} is used by transport {!r}'.format(
- fd, tr))
-
- cdef _add_reader(self, fileobj, Handle handle):
- cdef:
- UVPoll poll
-
- self._check_closed()
- fd = self._fileobj_to_fd(fileobj)
- self._ensure_fd_no_transport(fd)
-
- try:
- poll = <UVPoll>(self._polls[fd])
- except KeyError:
- poll = UVPoll.new(self, fd)
- self._polls[fd] = poll
-
- poll.start_reading(handle)
-
- old_fileobj = self._fd_to_reader_fileobj.pop(fd, None)
- if old_fileobj is not None:
- socket_dec_io_ref(old_fileobj)
-
- self._fd_to_reader_fileobj[fd] = fileobj
- socket_inc_io_ref(fileobj)
-
- cdef _remove_reader(self, fileobj):
- cdef:
- UVPoll poll
-
- fd = self._fileobj_to_fd(fileobj)
- self._ensure_fd_no_transport(fd)
-
- mapped_fileobj = self._fd_to_reader_fileobj.pop(fd, None)
- if mapped_fileobj is not None:
- socket_dec_io_ref(mapped_fileobj)
-
- if self._closed == 1:
- return False
-
- try:
- poll = <UVPoll>(self._polls[fd])
- except KeyError:
- return False
-
- result = poll.stop_reading()
- if not poll.is_active():
- del self._polls[fd]
- poll._close()
-
- return result
-
- cdef _has_reader(self, fileobj):
- cdef:
- UVPoll poll
-
- self._check_closed()
- fd = self._fileobj_to_fd(fileobj)
-
- try:
- poll = <UVPoll>(self._polls[fd])
- except KeyError:
- return False
-
- return poll.is_reading()
-
- cdef _add_writer(self, fileobj, Handle handle):
- cdef:
- UVPoll poll
-
- self._check_closed()
- fd = self._fileobj_to_fd(fileobj)
- self._ensure_fd_no_transport(fd)
-
- try:
- poll = <UVPoll>(self._polls[fd])
- except KeyError:
- poll = UVPoll.new(self, fd)
- self._polls[fd] = poll
-
- poll.start_writing(handle)
-
- old_fileobj = self._fd_to_writer_fileobj.pop(fd, None)
- if old_fileobj is not None:
- socket_dec_io_ref(old_fileobj)
-
- self._fd_to_writer_fileobj[fd] = fileobj
- socket_inc_io_ref(fileobj)
-
- cdef _remove_writer(self, fileobj):
- cdef:
- UVPoll poll
-
- fd = self._fileobj_to_fd(fileobj)
- self._ensure_fd_no_transport(fd)
-
- mapped_fileobj = self._fd_to_writer_fileobj.pop(fd, None)
- if mapped_fileobj is not None:
- socket_dec_io_ref(mapped_fileobj)
-
- if self._closed == 1:
- return False
-
- try:
- poll = <UVPoll>(self._polls[fd])
- except KeyError:
- return False
-
- result = poll.stop_writing()
- if not poll.is_active():
- del self._polls[fd]
- poll._close()
-
- return result
-
- cdef _has_writer(self, fileobj):
- cdef:
- UVPoll poll
-
- self._check_closed()
- fd = self._fileobj_to_fd(fileobj)
-
- try:
- poll = <UVPoll>(self._polls[fd])
- except KeyError:
- return False
-
- return poll.is_writing()
-
- cdef _getaddrinfo(self, object host, object port,
- int family, int type,
- int proto, int flags,
- int unpack):
-
- if isinstance(port, str):
- port = port.encode()
- elif isinstance(port, int):
- port = str(port).encode()
- if port is not None and not isinstance(port, bytes):
- raise TypeError('port must be a str, bytes or int')
-
- if isinstance(host, str):
- host = host.encode('idna')
- if host is not None:
- if not isinstance(host, bytes):
- raise TypeError('host must be a str or bytes')
-
- fut = self._new_future()
-
- def callback(result):
- if AddrInfo.isinstance(result):
- try:
- if unpack == 0:
- data = result
- else:
- data = (<AddrInfo>result).unpack()
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException as ex:
- if not fut.cancelled():
- fut.set_exception(ex)
- else:
- if not fut.cancelled():
- fut.set_result(data)
- else:
- if not fut.cancelled():
- fut.set_exception(result)
-
- AddrInfoRequest(self, host, port, family, type, proto, flags, callback)
- return fut
-
- cdef _getnameinfo(self, system.sockaddr *addr, int flags):
- cdef NameInfoRequest nr
- fut = self._new_future()
-
- def callback(result):
- if isinstance(result, tuple):
- fut.set_result(result)
- else:
- fut.set_exception(result)
-
- nr = NameInfoRequest(self, callback)
- nr.query(addr, flags)
- return fut
-
- cdef _sock_recv(self, fut, sock, n):
- if UVLOOP_DEBUG:
- if fut.cancelled():
- # Shouldn't happen with _SyncSocketReaderFuture.
- raise RuntimeError(
- f'_sock_recv is called on a cancelled Future')
-
- if not self._has_reader(sock):
- raise RuntimeError(
- f'socket {sock!r} does not have a reader '
- f'in the _sock_recv callback')
-
- try:
- data = sock.recv(n)
- except (BlockingIOError, InterruptedError):
- # No need to re-add the reader, let's just wait until
- # the poll handler calls this callback again.
- pass
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException as exc:
- fut.set_exception(exc)
- self._remove_reader(sock)
- else:
- fut.set_result(data)
- self._remove_reader(sock)
-
- cdef _sock_recv_into(self, fut, sock, buf):
- if UVLOOP_DEBUG:
- if fut.cancelled():
- # Shouldn't happen with _SyncSocketReaderFuture.
- raise RuntimeError(
- f'_sock_recv_into is called on a cancelled Future')
-
- if not self._has_reader(sock):
- raise RuntimeError(
- f'socket {sock!r} does not have a reader '
- f'in the _sock_recv_into callback')
-
- try:
- data = sock.recv_into(buf)
- except (BlockingIOError, InterruptedError):
- # No need to re-add the reader, let's just wait until
- # the poll handler calls this callback again.
- pass
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException as exc:
- fut.set_exception(exc)
- self._remove_reader(sock)
- else:
- fut.set_result(data)
- self._remove_reader(sock)
-
- cdef _sock_sendall(self, fut, sock, data):
- cdef:
- Handle handle
- int n
-
- if UVLOOP_DEBUG:
- if fut.cancelled():
- # Shouldn't happen with _SyncSocketWriterFuture.
- raise RuntimeError(
- f'_sock_sendall is called on a cancelled Future')
-
- if not self._has_writer(sock):
- raise RuntimeError(
- f'socket {sock!r} does not have a writer '
- f'in the _sock_sendall callback')
-
- try:
- n = sock.send(data)
- except (BlockingIOError, InterruptedError):
- # Try next time.
- return
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException as exc:
- fut.set_exception(exc)
- self._remove_writer(sock)
- return
-
- self._remove_writer(sock)
-
- if n == len(data):
- fut.set_result(None)
- else:
- if n:
- if not isinstance(data, memoryview):
- data = memoryview(data)
- data = data[n:]
-
- handle = new_MethodHandle3(
- self,
- "Loop._sock_sendall",
- <method3_t>self._sock_sendall,
- None,
- self,
- fut, sock, data)
-
- self._add_writer(sock, handle)
-
- cdef _sock_accept(self, fut, sock):
- try:
- conn, address = sock.accept()
- conn.setblocking(False)
- except (BlockingIOError, InterruptedError):
- # There is an active reader for _sock_accept, so
- # do nothing, it will be called again.
- pass
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException as exc:
- fut.set_exception(exc)
- self._remove_reader(sock)
- else:
- fut.set_result((conn, address))
- self._remove_reader(sock)
-
- cdef _sock_connect(self, sock, address):
- cdef:
- Handle handle
-
- try:
- sock.connect(address)
- except (BlockingIOError, InterruptedError):
- pass
- else:
- return
-
- fut = _SyncSocketWriterFuture(sock, self)
- handle = new_MethodHandle3(
- self,
- "Loop._sock_connect",
- <method3_t>self._sock_connect_cb,
- None,
- self,
- fut, sock, address)
-
- self._add_writer(sock, handle)
- return fut
-
- cdef _sock_connect_cb(self, fut, sock, address):
- if UVLOOP_DEBUG:
- if fut.cancelled():
- # Shouldn't happen with _SyncSocketWriterFuture.
- raise RuntimeError(
- f'_sock_connect_cb is called on a cancelled Future')
-
- if not self._has_writer(sock):
- raise RuntimeError(
- f'socket {sock!r} does not have a writer '
- f'in the _sock_connect_cb callback')
-
- try:
- err = sock.getsockopt(uv.SOL_SOCKET, uv.SO_ERROR)
- if err != 0:
- # Jump to any except clause below.
- raise OSError(err, 'Connect call failed %s' % (address,))
- except (BlockingIOError, InterruptedError):
- # socket is still registered, the callback will be retried later
- pass
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException as exc:
- fut.set_exception(exc)
- self._remove_writer(sock)
- else:
- fut.set_result(None)
- self._remove_writer(sock)
-
- cdef _sock_set_reuseport(self, int fd):
- cdef:
- int err
- int reuseport_flag = 1
-
- err = system.setsockopt(
- fd,
- uv.SOL_SOCKET,
- SO_REUSEPORT,
- <char*>&reuseport_flag,
- sizeof(reuseport_flag))
-
- if err < 0:
- raise convert_error(-errno.errno)
-
- cdef _set_coroutine_debug(self, bint enabled):
- enabled = bool(enabled)
- if self._coroutine_debug_set == enabled:
- return
-
- if enabled:
- self._coroutine_origin_tracking_saved_depth = (
- sys.get_coroutine_origin_tracking_depth())
- sys.set_coroutine_origin_tracking_depth(
- DEBUG_STACK_DEPTH)
- else:
- sys.set_coroutine_origin_tracking_depth(
- self._coroutine_origin_tracking_saved_depth)
-
- self._coroutine_debug_set = enabled
-
- def _get_backend_id(self):
- """This method is used by uvloop tests and is not part of the API."""
- return uv.uv_backend_fd(self.uvloop)
-
- cdef _print_debug_info(self):
- cdef:
- int err
- uv.uv_rusage_t rusage
-
- err = uv.uv_getrusage(&rusage)
- if err < 0:
- raise convert_error(err)
-
- # OS
-
- print('---- Process info: -----')
- print('Process memory: {}'.format(rusage.ru_maxrss))
- print('Number of signals: {}'.format(rusage.ru_nsignals))
- print('')
-
- # Loop
-
- print('--- Loop debug info: ---')
- print('Loop time: {}'.format(self.time()))
- print('Errors logged: {}'.format(
- self._debug_exception_handler_cnt))
- print()
- print('Callback handles: {: <8} | {}'.format(
- self._debug_cb_handles_count,
- self._debug_cb_handles_total))
- print('Timer handles: {: <8} | {}'.format(
- self._debug_cb_timer_handles_count,
- self._debug_cb_timer_handles_total))
- print()
-
- print(' alive | closed |')
- print('UVHandles python | libuv | total')
- print(' objs | handles |')
- print('-------------------------------+---------+---------')
- for name in sorted(self._debug_handles_total):
- print(' {: <18} {: >7} | {: >7} | {: >7}'.format(
- name,
- self._debug_handles_current[name],
- self._debug_handles_closed[name],
- self._debug_handles_total[name]))
- print()
-
- print('uv_handle_t (current: {}; freed: {}; total: {})'.format(
- self._debug_uv_handles_total - self._debug_uv_handles_freed,
- self._debug_uv_handles_freed,
- self._debug_uv_handles_total))
- print()
-
- print('--- Streams debug info: ---')
- print('Write errors: {}'.format(
- self._debug_stream_write_errors_total))
- print('Write without poll: {}'.format(
- self._debug_stream_write_tries))
- print('Write contexts: {: <8} | {}'.format(
- self._debug_stream_write_ctx_cnt,
- self._debug_stream_write_ctx_total))
- print('Write failed callbacks: {}'.format(
- self._debug_stream_write_cb_errors_total))
- print()
- print('Read errors: {}'.format(
- self._debug_stream_read_errors_total))
- print('Read callbacks: {}'.format(
- self._debug_stream_read_cb_total))
- print('Read failed callbacks: {}'.format(
- self._debug_stream_read_cb_errors_total))
- print('Read EOFs: {}'.format(
- self._debug_stream_read_eof_total))
- print('Read EOF failed callbacks: {}'.format(
- self._debug_stream_read_eof_cb_errors_total))
- print()
- print('Listen errors: {}'.format(
- self._debug_stream_listen_errors_total))
- print('Shutdown errors {}'.format(
- self._debug_stream_shutdown_errors_total))
- print()
-
- print('--- Polls debug info: ---')
- print('Read events: {}'.format(
- self._poll_read_events_total))
- print('Read callbacks failed: {}'.format(
- self._poll_read_cb_errors_total))
- print('Write events: {}'.format(
- self._poll_write_events_total))
- print('Write callbacks failed: {}'.format(
- self._poll_write_cb_errors_total))
- print()
-
- print('--- Sock ops successful on 1st try: ---')
- print('Socket try-writes: {}'.format(
- self._sock_try_write_total))
-
- print(flush=True)
-
- property print_debug_info:
- def __get__(self):
- if UVLOOP_DEBUG:
- return lambda: self._print_debug_info()
- else:
- raise AttributeError('print_debug_info')
-
- # Public API
-
- def __repr__(self):
- return '<{}.{} running={} closed={} debug={}>'.format(
- self.__class__.__module__,
- self.__class__.__name__,
- self.is_running(),
- self.is_closed(),
- self.get_debug()
- )
-
- def call_soon(self, callback, *args, context=None):
- """Arrange for a callback to be called as soon as possible.
-
- This operates as a FIFO queue: callbacks are called in the
- order in which they are registered. Each callback will be
- called exactly once.
-
- Any positional arguments after the callback will be passed to
- the callback when it is called.
- """
- if self._debug == 1:
- self._check_thread()
- if args:
- return self._call_soon(callback, args, context)
- else:
- return self._call_soon(callback, None, context)
-
- def call_soon_threadsafe(self, callback, *args, context=None):
- """Like call_soon(), but thread-safe."""
- if not args:
- args = None
- cdef Handle handle = new_Handle(self, callback, args, context)
- self._append_ready_handle(handle) # deque append is atomic
- # libuv async handler is thread-safe while the idle handler is not -
- # we only set the async handler here, which will start the idle handler
- # in _on_wake() from the loop and eventually call the callback.
- self.handler_async.send()
- return handle
-
- def call_later(self, delay, callback, *args, context=None):
- """Arrange for a callback to be called at a given time.
-
- Return a Handle: an opaque object with a cancel() method that
- can be used to cancel the call.
-
- The delay can be an int or float, expressed in seconds. It is
- always relative to the current time.
-
- Each callback will be called exactly once. If two callbacks
- are scheduled for exactly the same time, it undefined which
- will be called first.
-
- Any positional arguments after the callback will be passed to
- the callback when it is called.
- """
- cdef uint64_t when
-
- self._check_closed()
- if self._debug == 1:
- self._check_thread()
-
- if delay < 0:
- delay = 0
- elif delay == py_inf or delay > MAX_SLEEP:
- # ~100 years sounds like a good approximation of
- # infinity for a Python application.
- delay = MAX_SLEEP
-
- when = <uint64_t>round(delay * 1000)
- if not args:
- args = None
- if when == 0:
- return self._call_soon(callback, args, context)
- else:
- return self._call_later(when, callback, args, context)
-
- def call_at(self, when, callback, *args, context=None):
- """Like call_later(), but uses an absolute time.
-
- Absolute time corresponds to the event loop's time() method.
- """
- return self.call_later(
- when - self.time(), callback, *args, context=context)
-
- def time(self):
- """Return the time according to the event loop's clock.
-
- This is a float expressed in seconds since an epoch, but the
- epoch, precision, accuracy and drift are unspecified and may
- differ per event loop.
- """
- return self._time() / 1000
-
- def stop(self):
- """Stop running the event loop.
-
- Every callback already scheduled will still run. This simply informs
- run_forever to stop looping after a complete iteration.
- """
- self._call_soon_handle(
- new_MethodHandle1(
- self,
- "Loop._stop",
- <method1_t>self._stop,
- None,
- self,
- None))
-
- def run_forever(self):
- """Run the event loop until stop() is called."""
- self._check_closed()
- mode = uv.UV_RUN_DEFAULT
- if self._stopping:
- # loop.stop() was called right before loop.run_forever().
- # This is how asyncio loop behaves.
- mode = uv.UV_RUN_NOWAIT
- self._set_coroutine_debug(self._debug)
- old_agen_hooks = sys.get_asyncgen_hooks()
- sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
- finalizer=self._asyncgen_finalizer_hook)
- try:
- self._run(mode)
- finally:
- self._set_coroutine_debug(False)
- sys.set_asyncgen_hooks(*old_agen_hooks)
-
- def close(self):
- """Close the event loop.
-
- The event loop must not be running.
-
- This is idempotent and irreversible.
-
- No other methods should be called after this one.
- """
- self._close()
-
- def get_debug(self):
- return bool(self._debug)
-
- def set_debug(self, enabled):
- self._debug = bool(enabled)
- if self.is_running():
- self.call_soon_threadsafe(
- self._set_coroutine_debug, self, self._debug)
-
- def is_running(self):
- """Return whether the event loop is currently running."""
- return bool(self._running)
-
- def is_closed(self):
- """Returns True if the event loop was closed."""
- return bool(self._closed)
-
- def create_future(self):
- """Create a Future object attached to the loop."""
- return self._new_future()
-
- def create_task(self, coro, *, name=None, context=None):
- """Schedule a coroutine object.
-
- Return a task object.
-
- If name is not None, task.set_name(name) will be called if the task
- object has the set_name attribute, true for default Task in CPython.
-
- An optional keyword-only context argument allows specifying a custom
- contextvars.Context for the coro to run in. The current context copy is
- created when no context is provided.
- """
- self._check_closed()
- if PY311:
- if self._task_factory is None:
- task = aio_Task(coro, loop=self, context=context)
- else:
- task = self._task_factory(self, coro, context=context)
- else:
- if context is None:
- if self._task_factory is None:
- task = aio_Task(coro, loop=self)
- else:
- task = self._task_factory(self, coro)
- else:
- if self._task_factory is None:
- task = context.run(aio_Task, coro, self)
- else:
- task = context.run(self._task_factory, self, coro)
-
- # copied from asyncio.tasks._set_task_name (bpo-34270)
- if name is not None:
- try:
- set_name = task.set_name
- except AttributeError:
- pass
- else:
- set_name(name)
-
- return task
-
- def set_task_factory(self, factory):
- """Set a task factory that will be used by loop.create_task().
-
- If factory is None the default task factory will be set.
-
- If factory is a callable, it should have a signature matching
- '(loop, coro)', where 'loop' will be a reference to the active
- event loop, 'coro' will be a coroutine object. The callable
- must return a Future.
- """
- if factory is not None and not callable(factory):
- raise TypeError('task factory must be a callable or None')
- self._task_factory = factory
-
- def get_task_factory(self):
- """Return a task factory, or None if the default one is in use."""
- return self._task_factory
-
- def run_until_complete(self, future):
- """Run until the Future is done.
-
- If the argument is a coroutine, it is wrapped in a Task.
-
- WARNING: It would be disastrous to call run_until_complete()
- with the same coroutine twice -- it would wrap it in two
- different Tasks and that can't be good.
-
- Return the Future's result, or raise its exception.
- """
- self._check_closed()
-
- new_task = not isfuture(future)
- future = aio_ensure_future(future, loop=self)
- if new_task:
- # An exception is raised if the future didn't complete, so there
- # is no need to log the "destroy pending task" message
- future._log_destroy_pending = False
-
- def done_cb(fut):
- if not fut.cancelled():
- exc = fut.exception()
- if isinstance(exc, (SystemExit, KeyboardInterrupt)):
- # Issue #336: run_forever() already finished,
- # no need to stop it.
- return
- self.stop()
-
- future.add_done_callback(done_cb)
- try:
- self.run_forever()
- except BaseException:
- if new_task and future.done() and not future.cancelled():
- # The coroutine raised a BaseException. Consume the exception
- # to not log a warning, the caller doesn't have access to the
- # local task.
- future.exception()
- raise
- finally:
- future.remove_done_callback(done_cb)
- if not future.done():
- raise RuntimeError('Event loop stopped before Future completed.')
-
- return future.result()
-
- @cython.iterable_coroutine
- async def getaddrinfo(self, object host, object port, *,
- int family=0, int type=0, int proto=0, int flags=0):
-
- addr = __static_getaddrinfo_pyaddr(host, port, family,
- type, proto, flags)
- if addr is not None:
- return [addr]
-
- return await self._getaddrinfo(
- host, port, family, type, proto, flags, 1)
-
- @cython.iterable_coroutine
- async def getnameinfo(self, sockaddr, int flags=0):
- cdef:
- AddrInfo ai_cnt
- system.addrinfo *ai
- system.sockaddr_in6 *sin6
-
- if not isinstance(sockaddr, tuple):
- raise TypeError('getnameinfo() argument 1 must be a tuple')
-
- sl = len(sockaddr)
-
- if sl < 2 or sl > 4:
- raise ValueError('sockaddr must be a tuple of 2, 3 or 4 values')
-
- if sl > 2:
- flowinfo = sockaddr[2]
- if flowinfo < 0 or flowinfo > 0xfffff:
- raise OverflowError(
- 'getnameinfo(): flowinfo must be 0-1048575.')
- else:
- flowinfo = 0
-
- if sl > 3:
- scope_id = sockaddr[3]
- if scope_id < 0 or scope_id > 2 ** 32:
- raise OverflowError(
- 'getsockaddrarg: scope_id must be unsigned 32 bit integer')
- else:
- scope_id = 0
-
- ai_cnt = await self._getaddrinfo(
- sockaddr[0], sockaddr[1],
- uv.AF_UNSPEC, # family
- uv.SOCK_DGRAM, # type
- 0, # proto
- uv.AI_NUMERICHOST, # flags
- 0) # unpack
-
- ai = ai_cnt.data
-
- if ai.ai_next:
- raise OSError("sockaddr resolved to multiple addresses")
-
- if ai.ai_family == uv.AF_INET:
- if sl > 2:
- raise OSError("IPv4 sockaddr must be 2 tuple")
- elif ai.ai_family == uv.AF_INET6:
- # Modify some fields in `ai`
- sin6 = <system.sockaddr_in6*> ai.ai_addr
- sin6.sin6_flowinfo = system.htonl(flowinfo)
- sin6.sin6_scope_id = scope_id
-
- return await self._getnameinfo(ai.ai_addr, flags)
-
- @cython.iterable_coroutine
- async def start_tls(self, transport, protocol, sslcontext, *,
- server_side=False,
- server_hostname=None,
- ssl_handshake_timeout=None,
- ssl_shutdown_timeout=None):
- """Upgrade transport to TLS.
-
- Return a new transport that *protocol* should start using
- immediately.
- """
- if not isinstance(sslcontext, ssl_SSLContext):
- raise TypeError(
- f'sslcontext is expected to be an instance of ssl.SSLContext, '
- f'got {sslcontext!r}')
-
- if isinstance(transport, (TCPTransport, UnixTransport)):
- context = (<UVStream>transport).context
- elif isinstance(transport, _SSLProtocolTransport):
- context = (<_SSLProtocolTransport>transport).context
- else:
- raise TypeError(
- f'transport {transport!r} is not supported by start_tls()')
-
- waiter = self._new_future()
- ssl_protocol = SSLProtocol(
- self, protocol, sslcontext, waiter,
- server_side, server_hostname,
- ssl_handshake_timeout=ssl_handshake_timeout,
- ssl_shutdown_timeout=ssl_shutdown_timeout,
- call_connection_made=False)
-
- # Pause early so that "ssl_protocol.data_received()" doesn't
- # have a chance to get called before "ssl_protocol.connection_made()".
- transport.pause_reading()
-
- transport.set_protocol(ssl_protocol)
- conmade_cb = self.call_soon(ssl_protocol.connection_made, transport,
- context=context)
- # transport.resume_reading() will use the right context
- # (transport.context) to call e.g. data_received()
- resume_cb = self.call_soon(transport.resume_reading)
- app_transport = ssl_protocol._get_app_transport(context)
-
- try:
- await waiter
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- app_transport.close()
- conmade_cb.cancel()
- resume_cb.cancel()
- raise
-
- return app_transport
-
- @cython.iterable_coroutine
- async def create_server(self, protocol_factory, host=None, port=None,
- *,
- int family=uv.AF_UNSPEC,
- int flags=uv.AI_PASSIVE,
- sock=None,
- backlog=100,
- ssl=None,
- reuse_address=None,
- reuse_port=None,
- ssl_handshake_timeout=None,
- ssl_shutdown_timeout=None,
- start_serving=True):
- """A coroutine which creates a TCP server bound to host and port.
-
- The return value is a Server object which can be used to stop
- the service.
-
- If host is an empty string or None all interfaces are assumed
- and a list of multiple sockets will be returned (most likely
- one for IPv4 and another one for IPv6). The host parameter can also be
- a sequence (e.g. list) of hosts to bind to.
-
- family can be set to either AF_INET or AF_INET6 to force the
- socket to use IPv4 or IPv6. If not set it will be determined
- from host (defaults to AF_UNSPEC).
-
- flags is a bitmask for getaddrinfo().
-
- sock can optionally be specified in order to use a preexisting
- socket object.
-
- backlog is the maximum number of queued connections passed to
- listen() (defaults to 100).
-
- ssl can be set to an SSLContext to enable SSL over the
- accepted connections.
-
- reuse_address tells the kernel to reuse a local socket in
- TIME_WAIT state, without waiting for its natural timeout to
- expire. If not specified will automatically be set to True on
- UNIX.
-
- reuse_port tells the kernel to allow this endpoint to be bound to
- the same port as other existing endpoints are bound to, so long as
- they all set this flag when being created. This option is not
- supported on Windows.
-
- ssl_handshake_timeout is the time in seconds that an SSL server
- will wait for completion of the SSL handshake before aborting the
- connection. Default is 60s.
-
- ssl_shutdown_timeout is the time in seconds that an SSL server
- will wait for completion of the SSL shutdown before aborting the
- connection. Default is 30s.
- """
- cdef:
- TCPServer tcp
- system.addrinfo *addrinfo
- Server server
-
- if sock is not None and sock.family == uv.AF_UNIX:
- if host is not None or port is not None:
- raise ValueError(
- 'host/port and sock can not be specified at the same time')
- return await self.create_unix_server(
- protocol_factory, sock=sock, backlog=backlog, ssl=ssl,
- start_serving=start_serving)
-
- server = Server(self)
-
- if ssl is not None:
- if not isinstance(ssl, ssl_SSLContext):
- raise TypeError('ssl argument must be an SSLContext or None')
- else:
- if ssl_handshake_timeout is not None:
- raise ValueError(
- 'ssl_handshake_timeout is only meaningful with ssl')
- if ssl_shutdown_timeout is not None:
- raise ValueError(
- 'ssl_shutdown_timeout is only meaningful with ssl')
-
- if host is not None or port is not None:
- if sock is not None:
- raise ValueError(
- 'host/port and sock can not be specified at the same time')
-
- if reuse_address is None:
- reuse_address = os_name == 'posix' and sys_platform != 'cygwin'
- reuse_port = bool(reuse_port)
- if reuse_port and not has_SO_REUSEPORT:
- raise ValueError(
- 'reuse_port not supported by socket module')
-
- if host == '':
- hosts = [None]
- elif (isinstance(host, str) or not isinstance(host, col_Iterable)):
- hosts = [host]
- else:
- hosts = host
-
- fs = [self._getaddrinfo(host, port, family,
- uv.SOCK_STREAM, 0, flags,
- 0) for host in hosts]
-
- infos = await aio_gather(*fs)
-
- completed = False
- sock = None
- try:
- for info in infos:
- addrinfo = (<AddrInfo>info).data
- while addrinfo != NULL:
- if addrinfo.ai_family == uv.AF_UNSPEC:
- raise RuntimeError('AF_UNSPEC in DNS results')
-
- try:
- sock = socket_socket(addrinfo.ai_family,
- addrinfo.ai_socktype,
- addrinfo.ai_protocol)
- except socket_error:
- # Assume it's a bad family/type/protocol
- # combination.
- if self._debug:
- aio_logger.warning(
- 'create_server() failed to create '
- 'socket.socket(%r, %r, %r)',
- addrinfo.ai_family,
- addrinfo.ai_socktype,
- addrinfo.ai_protocol, exc_info=True)
- addrinfo = addrinfo.ai_next
- continue
-
- if reuse_address:
- sock.setsockopt(uv.SOL_SOCKET, uv.SO_REUSEADDR, 1)
- if reuse_port:
- sock.setsockopt(uv.SOL_SOCKET, uv.SO_REUSEPORT, 1)
- # Disable IPv4/IPv6 dual stack support (enabled by
- # default on Linux) which makes a single socket
- # listen on both address families.
- if (addrinfo.ai_family == uv.AF_INET6 and
- has_IPV6_V6ONLY):
- sock.setsockopt(uv.IPPROTO_IPV6, IPV6_V6ONLY, 1)
-
- pyaddr = __convert_sockaddr_to_pyaddr(addrinfo.ai_addr)
- try:
- sock.bind(pyaddr)
- except OSError as err:
- raise OSError(
- err.errno, 'error while attempting '
- 'to bind on address %r: %s'
- % (pyaddr, err.strerror.lower())) from None
-
- tcp = TCPServer.new(self, protocol_factory, server,
- uv.AF_UNSPEC, backlog,
- ssl, ssl_handshake_timeout,
- ssl_shutdown_timeout)
-
- try:
- tcp._open(sock.fileno())
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- tcp._close()
- raise
-
- server._add_server(tcp)
- sock.detach()
- sock = None
-
- addrinfo = addrinfo.ai_next
-
- completed = True
- finally:
- if not completed:
- if sock is not None:
- sock.close()
- server.close()
- else:
- if sock is None:
- raise ValueError('Neither host/port nor sock were specified')
- if not _is_sock_stream(sock.type):
- raise ValueError(
- 'A Stream Socket was expected, got {!r}'.format(sock))
-
- # libuv will set the socket to non-blocking mode, but
- # we want Python socket object to notice that.
- sock.setblocking(False)
-
- tcp = TCPServer.new(self, protocol_factory, server,
- uv.AF_UNSPEC, backlog,
- ssl, ssl_handshake_timeout,
- ssl_shutdown_timeout)
-
- try:
- tcp._open(sock.fileno())
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- tcp._close()
- raise
-
- tcp._attach_fileobj(sock)
- server._add_server(tcp)
-
- if start_serving:
- server._start_serving()
-
- server._ref()
- return server
-
- @cython.iterable_coroutine
- async def create_connection(self, protocol_factory, host=None, port=None,
- *,
- ssl=None,
- family=0, proto=0, flags=0, sock=None,
- local_addr=None, server_hostname=None,
- ssl_handshake_timeout=None,
- ssl_shutdown_timeout=None):
- """Connect to a TCP server.
-
- Create a streaming transport connection to a given Internet host and
- port: socket family AF_INET or socket.AF_INET6 depending on host (or
- family if specified), socket type SOCK_STREAM. protocol_factory must be
- a callable returning a protocol instance.
-
- This method is a coroutine which will try to establish the connection
- in the background. When successful, the coroutine returns a
- (transport, protocol) pair.
- """
- cdef:
- AddrInfo ai_local = None
- AddrInfo ai_remote
- TCPTransport tr
-
- system.addrinfo *rai = NULL
- system.addrinfo *lai = NULL
-
- system.addrinfo *rai_iter = NULL
- system.addrinfo *lai_iter = NULL
-
- system.addrinfo rai_static
- system.sockaddr_storage rai_addr_static
- system.addrinfo lai_static
- system.sockaddr_storage lai_addr_static
-
- object app_protocol
- object app_transport
- object protocol
- object ssl_waiter
-
- if sock is not None and sock.family == uv.AF_UNIX:
- if host is not None or port is not None:
- raise ValueError(
- 'host/port and sock can not be specified at the same time')
- return await self.create_unix_connection(
- protocol_factory, None,
- sock=sock, ssl=ssl, server_hostname=server_hostname)
-
- app_protocol = protocol = protocol_factory()
- ssl_waiter = None
- context = Context_CopyCurrent()
- if ssl:
- if server_hostname is None:
- if not host:
- raise ValueError('You must set server_hostname '
- 'when using ssl without a host')
- server_hostname = host
-
- ssl_waiter = self._new_future()
- sslcontext = None if isinstance(ssl, bool) else ssl
- protocol = SSLProtocol(
- self, app_protocol, sslcontext, ssl_waiter,
- False, server_hostname,
- ssl_handshake_timeout=ssl_handshake_timeout,
- ssl_shutdown_timeout=ssl_shutdown_timeout)
- else:
- if server_hostname is not None:
- raise ValueError('server_hostname is only meaningful with ssl')
- if ssl_handshake_timeout is not None:
- raise ValueError(
- 'ssl_handshake_timeout is only meaningful with ssl')
- if ssl_shutdown_timeout is not None:
- raise ValueError(
- 'ssl_shutdown_timeout is only meaningful with ssl')
-
- if host is not None or port is not None:
- if sock is not None:
- raise ValueError(
- 'host/port and sock can not be specified at the same time')
-
- fs = []
- f1 = f2 = None
-
- addr = __static_getaddrinfo(
- host, port, family, uv.SOCK_STREAM,
- proto, <system.sockaddr*>&rai_addr_static)
-
- if addr is None:
- f1 = self._getaddrinfo(
- host, port, family,
- uv.SOCK_STREAM, proto, flags,
- 0) # 0 == don't unpack
-
- fs.append(f1)
- else:
- rai_static.ai_addr = <system.sockaddr*>&rai_addr_static
- rai_static.ai_next = NULL
- rai = &rai_static
-
- if local_addr is not None:
- if not isinstance(local_addr, (tuple, list)) or \
- len(local_addr) != 2:
- raise ValueError(
- 'local_addr must be a tuple of host and port')
-
- addr = __static_getaddrinfo(
- local_addr[0], local_addr[1],
- family, uv.SOCK_STREAM,
- proto, <system.sockaddr*>&lai_addr_static)
- if addr is None:
- f2 = self._getaddrinfo(
- local_addr[0], local_addr[1], family,
- uv.SOCK_STREAM, proto, flags,
- 0) # 0 == don't unpack
-
- fs.append(f2)
- else:
- lai_static.ai_addr = <system.sockaddr*>&lai_addr_static
- lai_static.ai_next = NULL
- lai = &lai_static
-
- if len(fs):
- await aio_wait(fs)
-
- if rai is NULL:
- ai_remote = f1.result()
- if ai_remote.data is NULL:
- raise OSError('getaddrinfo() returned empty list')
- rai = ai_remote.data
-
- if lai is NULL and f2 is not None:
- ai_local = f2.result()
- if ai_local.data is NULL:
- raise OSError(
- 'getaddrinfo() returned empty list for local_addr')
- lai = ai_local.data
-
- exceptions = []
- rai_iter = rai
- while rai_iter is not NULL:
- tr = None
- try:
- waiter = self._new_future()
- tr = TCPTransport.new(self, protocol, None, waiter,
- context)
-
- if lai is not NULL:
- lai_iter = lai
- while lai_iter is not NULL:
- try:
- tr.bind(lai_iter.ai_addr)
- break
- except OSError as exc:
- exceptions.append(exc)
- lai_iter = lai_iter.ai_next
- else:
- tr._close()
- tr = None
-
- rai_iter = rai_iter.ai_next
- continue
-
- tr.connect(rai_iter.ai_addr)
- await waiter
-
- except OSError as exc:
- if tr is not None:
- tr._close()
- tr = None
- exceptions.append(exc)
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- if tr is not None:
- tr._close()
- tr = None
- raise
- else:
- break
-
- rai_iter = rai_iter.ai_next
-
- else:
- # If they all have the same str(), raise one.
- model = str(exceptions[0])
- if all(str(exc) == model for exc in exceptions):
- raise exceptions[0]
- # Raise a combined exception so the user can see all
- # the various error messages.
- raise OSError('Multiple exceptions: {}'.format(
- ', '.join(str(exc) for exc in exceptions)))
- else:
- if sock is None:
- raise ValueError(
- 'host and port was not specified and no sock specified')
- if not _is_sock_stream(sock.type):
- raise ValueError(
- 'A Stream Socket was expected, got {!r}'.format(sock))
-
- # libuv will set the socket to non-blocking mode, but
- # we want Python socket object to notice that.
- sock.setblocking(False)
-
- waiter = self._new_future()
- tr = TCPTransport.new(self, protocol, None, waiter, context)
- try:
- # libuv will make socket non-blocking
- tr._open(sock.fileno())
- tr._init_protocol()
- await waiter
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- # It's OK to call `_close()` here, as opposed to
- # `_force_close()` or `close()` as we want to terminate the
- # transport immediately. The `waiter` can only be waken
- # up in `Transport._call_connection_made()`, and calling
- # `_close()` before it is fine.
- tr._close()
- raise
-
- tr._attach_fileobj(sock)
-
- if ssl:
- app_transport = protocol._get_app_transport(context)
- try:
- await ssl_waiter
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- app_transport.close()
- raise
- return app_transport, app_protocol
- else:
- return tr, protocol
-
- @cython.iterable_coroutine
- async def create_unix_server(self, protocol_factory, path=None,
- *, backlog=100, sock=None, ssl=None,
- ssl_handshake_timeout=None,
- ssl_shutdown_timeout=None,
- start_serving=True):
- """A coroutine which creates a UNIX Domain Socket server.
-
- The return value is a Server object, which can be used to stop
- the service.
-
- path is a str, representing a file systsem path to bind the
- server socket to.
-
- sock can optionally be specified in order to use a preexisting
- socket object.
-
- backlog is the maximum number of queued connections passed to
- listen() (defaults to 100).
-
- ssl can be set to an SSLContext to enable SSL over the
- accepted connections.
-
- ssl_handshake_timeout is the time in seconds that an SSL server
- will wait for completion of the SSL handshake before aborting the
- connection. Default is 60s.
-
- ssl_shutdown_timeout is the time in seconds that an SSL server
- will wait for completion of the SSL shutdown before aborting the
- connection. Default is 30s.
- """
- cdef:
- UnixServer pipe
- Server server = Server(self)
-
- if ssl is not None:
- if not isinstance(ssl, ssl_SSLContext):
- raise TypeError('ssl argument must be an SSLContext or None')
- else:
- if ssl_handshake_timeout is not None:
- raise ValueError(
- 'ssl_handshake_timeout is only meaningful with ssl')
- if ssl_shutdown_timeout is not None:
- raise ValueError(
- 'ssl_shutdown_timeout is only meaningful with ssl')
-
- if path is not None:
- if sock is not None:
- raise ValueError(
- 'path and sock can not be specified at the same time')
- orig_path = path
-
- path = os_fspath(path)
-
- if isinstance(path, str):
- path = PyUnicode_EncodeFSDefault(path)
-
- # Check for abstract socket.
- if path[0] != 0:
- try:
- if stat_S_ISSOCK(os_stat(path).st_mode):
- os_remove(path)
- except FileNotFoundError:
- pass
- except OSError as err:
- # Directory may have permissions only to create socket.
- aio_logger.error(
- 'Unable to check or remove stale UNIX socket %r: %r',
- orig_path, err)
-
- # We use Python sockets to create a UNIX server socket because
- # when UNIX sockets are created by libuv, libuv removes the path
- # they were bound to. This is different from asyncio, which
- # doesn't cleanup the socket path.
- sock = socket_socket(uv.AF_UNIX)
-
- try:
- sock.bind(path)
- except OSError as exc:
- sock.close()
- if exc.errno == errno.EADDRINUSE:
- # Let's improve the error message by adding
- # with what exact address it occurs.
- msg = 'Address {!r} is already in use'.format(orig_path)
- raise OSError(errno.EADDRINUSE, msg) from None
- else:
- raise
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- sock.close()
- raise
-
- else:
- if sock is None:
- raise ValueError(
- 'path was not specified, and no sock specified')
-
- if sock.family != uv.AF_UNIX or not _is_sock_stream(sock.type):
- raise ValueError(
- 'A UNIX Domain Stream Socket was expected, got {!r}'
- .format(sock))
-
- # libuv will set the socket to non-blocking mode, but
- # we want Python socket object to notice that.
- sock.setblocking(False)
-
- pipe = UnixServer.new(
- self, protocol_factory, server, backlog,
- ssl, ssl_handshake_timeout, ssl_shutdown_timeout)
-
- try:
- pipe._open(sock.fileno())
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- pipe._close()
- sock.close()
- raise
-
- pipe._attach_fileobj(sock)
- server._add_server(pipe)
-
- if start_serving:
- server._start_serving()
-
- return server
-
- @cython.iterable_coroutine
- async def create_unix_connection(self, protocol_factory, path=None, *,
- ssl=None, sock=None,
- server_hostname=None,
- ssl_handshake_timeout=None,
- ssl_shutdown_timeout=None):
-
- cdef:
- UnixTransport tr
- object app_protocol
- object app_transport
- object protocol
- object ssl_waiter
-
- app_protocol = protocol = protocol_factory()
- ssl_waiter = None
- context = Context_CopyCurrent()
- if ssl:
- if server_hostname is None:
- raise ValueError('You must set server_hostname '
- 'when using ssl without a host')
-
- ssl_waiter = self._new_future()
- sslcontext = None if isinstance(ssl, bool) else ssl
- protocol = SSLProtocol(
- self, app_protocol, sslcontext, ssl_waiter,
- False, server_hostname,
- ssl_handshake_timeout=ssl_handshake_timeout,
- ssl_shutdown_timeout=ssl_shutdown_timeout)
- else:
- if server_hostname is not None:
- raise ValueError('server_hostname is only meaningful with ssl')
- if ssl_handshake_timeout is not None:
- raise ValueError(
- 'ssl_handshake_timeout is only meaningful with ssl')
- if ssl_shutdown_timeout is not None:
- raise ValueError(
- 'ssl_shutdown_timeout is only meaningful with ssl')
-
- if path is not None:
- if sock is not None:
- raise ValueError(
- 'path and sock can not be specified at the same time')
-
- path = os_fspath(path)
-
- if isinstance(path, str):
- path = PyUnicode_EncodeFSDefault(path)
-
- waiter = self._new_future()
- tr = UnixTransport.new(self, protocol, None, waiter, context)
- tr.connect(path)
- try:
- await waiter
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- tr._close()
- raise
-
- else:
- if sock is None:
- raise ValueError('no path and sock were specified')
-
- if sock.family != uv.AF_UNIX or not _is_sock_stream(sock.type):
- raise ValueError(
- 'A UNIX Domain Stream Socket was expected, got {!r}'
- .format(sock))
-
- # libuv will set the socket to non-blocking mode, but
- # we want Python socket object to notice that.
- sock.setblocking(False)
-
- waiter = self._new_future()
- tr = UnixTransport.new(self, protocol, None, waiter, context)
- try:
- tr._open(sock.fileno())
- tr._init_protocol()
- await waiter
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- tr._close()
- raise
-
- tr._attach_fileobj(sock)
-
- if ssl:
- app_transport = protocol._get_app_transport(Context_CopyCurrent())
- try:
- await ssl_waiter
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- app_transport.close()
- raise
- return app_transport, app_protocol
- else:
- return tr, protocol
-
- def default_exception_handler(self, context):
- """Default exception handler.
-
- This is called when an exception occurs and no exception
- handler is set, and can be called by a custom exception
- handler that wants to defer to the default behavior.
-
- The context parameter has the same meaning as in
- `call_exception_handler()`.
- """
- message = context.get('message')
- if not message:
- message = 'Unhandled exception in event loop'
-
- exception = context.get('exception')
- if exception is not None:
- exc_info = (type(exception), exception, exception.__traceback__)
- else:
- exc_info = False
-
- log_lines = [message]
- for key in sorted(context):
- if key in {'message', 'exception'}:
- continue
- value = context[key]
- if key == 'source_traceback':
- tb = ''.join(tb_format_list(value))
- value = 'Object created at (most recent call last):\n'
- value += tb.rstrip()
- else:
- try:
- value = repr(value)
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException as ex:
- value = ('Exception in __repr__ {!r}; '
- 'value type: {!r}'.format(ex, type(value)))
- log_lines.append('{}: {}'.format(key, value))
-
- aio_logger.error('\n'.join(log_lines), exc_info=exc_info)
-
- def get_exception_handler(self):
- """Return an exception handler, or None if the default one is in use.
- """
- return self._exception_handler
-
- def set_exception_handler(self, handler):
- """Set handler as the new event loop exception handler.
-
- If handler is None, the default exception handler will
- be set.
-
- If handler is a callable object, it should have a
- signature matching '(loop, context)', where 'loop'
- will be a reference to the active event loop, 'context'
- will be a dict object (see `call_exception_handler()`
- documentation for details about context).
- """
- if handler is not None and not callable(handler):
- raise TypeError('A callable object or None is expected, '
- 'got {!r}'.format(handler))
- self._exception_handler = handler
-
- def call_exception_handler(self, context):
- """Call the current event loop's exception handler.
-
- The context argument is a dict containing the following keys:
-
- - 'message': Error message;
- - 'exception' (optional): Exception object;
- - 'future' (optional): Future instance;
- - 'handle' (optional): Handle instance;
- - 'protocol' (optional): Protocol instance;
- - 'transport' (optional): Transport instance;
- - 'socket' (optional): Socket instance.
-
- New keys maybe introduced in the future.
-
- Note: do not overload this method in an event loop subclass.
- For custom exception handling, use the
- `set_exception_handler()` method.
- """
- if UVLOOP_DEBUG:
- self._debug_exception_handler_cnt += 1
-
- if self._exception_handler is None:
- try:
- self.default_exception_handler(context)
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- # Second protection layer for unexpected errors
- # in the default implementation, as well as for subclassed
- # event loops with overloaded "default_exception_handler".
- aio_logger.error('Exception in default exception handler',
- exc_info=True)
- else:
- try:
- self._exception_handler(self, context)
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException as exc:
- # Exception in the user set custom exception handler.
- try:
- # Let's try default handler.
- self.default_exception_handler({
- 'message': 'Unhandled error in exception handler',
- 'exception': exc,
- 'context': context,
- })
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- # Guard 'default_exception_handler' in case it is
- # overloaded.
- aio_logger.error('Exception in default exception handler '
- 'while handling an unexpected error '
- 'in custom exception handler',
- exc_info=True)
-
- def add_reader(self, fileobj, callback, *args):
- """Add a reader callback."""
- if len(args) == 0:
- args = None
- self._add_reader(fileobj, new_Handle(self, callback, args, None))
-
- def remove_reader(self, fileobj):
- """Remove a reader callback."""
- self._remove_reader(fileobj)
-
- def add_writer(self, fileobj, callback, *args):
- """Add a writer callback.."""
- if len(args) == 0:
- args = None
- self._add_writer(fileobj, new_Handle(self, callback, args, None))
-
- def remove_writer(self, fileobj):
- """Remove a writer callback."""
- self._remove_writer(fileobj)
-
- @cython.iterable_coroutine
- async def sock_recv(self, sock, n):
- """Receive data from the socket.
-
- The return value is a bytes object representing the data received.
- The maximum amount of data to be received at once is specified by
- nbytes.
-
- This method is a coroutine.
- """
- cdef:
- Handle handle
-
- if self._debug and sock.gettimeout() != 0:
- raise ValueError("the socket must be non-blocking")
-
- fut = _SyncSocketReaderFuture(sock, self)
- handle = new_MethodHandle3(
- self,
- "Loop._sock_recv",
- <method3_t>self._sock_recv,
- None,
- self,
- fut, sock, n)
-
- self._add_reader(sock, handle)
- return await fut
-
- @cython.iterable_coroutine
- async def sock_recv_into(self, sock, buf):
- """Receive data from the socket.
-
- The received data is written into *buf* (a writable buffer).
- The return value is the number of bytes written.
-
- This method is a coroutine.
- """
- cdef:
- Handle handle
-
- if self._debug and sock.gettimeout() != 0:
- raise ValueError("the socket must be non-blocking")
-
- fut = _SyncSocketReaderFuture(sock, self)
- handle = new_MethodHandle3(
- self,
- "Loop._sock_recv_into",
- <method3_t>self._sock_recv_into,
- None,
- self,
- fut, sock, buf)
-
- self._add_reader(sock, handle)
- return await fut
-
- @cython.iterable_coroutine
- async def sock_sendall(self, sock, data):
- """Send data to the socket.
-
- The socket must be connected to a remote socket. This method continues
- to send data from data until either all data has been sent or an
- error occurs. None is returned on success. On error, an exception is
- raised, and there is no way to determine how much data, if any, was
- successfully processed by the receiving end of the connection.
-
- This method is a coroutine.
- """
- cdef:
- Handle handle
- ssize_t n
-
- if self._debug and sock.gettimeout() != 0:
- raise ValueError("the socket must be non-blocking")
-
- if not data:
- return
-
- socket_inc_io_ref(sock)
- try:
- try:
- n = sock.send(data)
- except (BlockingIOError, InterruptedError):
- pass
- else:
- if UVLOOP_DEBUG:
- # This can be a partial success, i.e. only part
- # of the data was sent
- self._sock_try_write_total += 1
-
- if n == len(data):
- return
- if not isinstance(data, memoryview):
- data = memoryview(data)
- data = data[n:]
-
- fut = _SyncSocketWriterFuture(sock, self)
- handle = new_MethodHandle3(
- self,
- "Loop._sock_sendall",
- <method3_t>self._sock_sendall,
- None,
- self,
- fut, sock, data)
-
- self._add_writer(sock, handle)
- return await fut
- finally:
- socket_dec_io_ref(sock)
-
- @cython.iterable_coroutine
- async def sock_accept(self, sock):
- """Accept a connection.
-
- The socket must be bound to an address and listening for connections.
- The return value is a pair (conn, address) where conn is a new socket
- object usable to send and receive data on the connection, and address
- is the address bound to the socket on the other end of the connection.
-
- This method is a coroutine.
- """
- cdef:
- Handle handle
-
- if self._debug and sock.gettimeout() != 0:
- raise ValueError("the socket must be non-blocking")
-
- fut = _SyncSocketReaderFuture(sock, self)
- handle = new_MethodHandle2(
- self,
- "Loop._sock_accept",
- <method2_t>self._sock_accept,
- None,
- self,
- fut, sock)
-
- self._add_reader(sock, handle)
- return await fut
-
- @cython.iterable_coroutine
- async def sock_connect(self, sock, address):
- """Connect to a remote socket at address.
-
- This method is a coroutine.
- """
- if self._debug and sock.gettimeout() != 0:
- raise ValueError("the socket must be non-blocking")
-
- socket_inc_io_ref(sock)
- try:
- if sock.family == uv.AF_UNIX:
- fut = self._sock_connect(sock, address)
- else:
- addrs = await self.getaddrinfo(
- *address[:2], family=sock.family)
-
- _, _, _, _, address = addrs[0]
- fut = self._sock_connect(sock, address)
- if fut is not None:
- await fut
- finally:
- socket_dec_io_ref(sock)
-
- @cython.iterable_coroutine
- async def sock_recvfrom(self, sock, bufsize):
- raise NotImplementedError
-
- @cython.iterable_coroutine
- async def sock_recvfrom_into(self, sock, buf, nbytes=0):
- raise NotImplementedError
-
- @cython.iterable_coroutine
- async def sock_sendto(self, sock, data, address):
- raise NotImplementedError
-
- @cython.iterable_coroutine
- async def connect_accepted_socket(self, protocol_factory, sock, *,
- ssl=None,
- ssl_handshake_timeout=None,
- ssl_shutdown_timeout=None):
- """Handle an accepted connection.
-
- This is used by servers that accept connections outside of
- asyncio but that use asyncio to handle connections.
-
- This method is a coroutine. When completed, the coroutine
- returns a (transport, protocol) pair.
- """
-
- cdef:
- UVStream transport = None
-
- if ssl is not None:
- if not isinstance(ssl, ssl_SSLContext):
- raise TypeError('ssl argument must be an SSLContext or None')
- else:
- if ssl_handshake_timeout is not None:
- raise ValueError(
- 'ssl_handshake_timeout is only meaningful with ssl')
- if ssl_shutdown_timeout is not None:
- raise ValueError(
- 'ssl_shutdown_timeout is only meaningful with ssl')
-
- if not _is_sock_stream(sock.type):
- raise ValueError(
- 'A Stream Socket was expected, got {!r}'.format(sock))
-
- app_protocol = protocol_factory()
- waiter = self._new_future()
- transport_waiter = None
- context = Context_CopyCurrent()
-
- if ssl is None:
- protocol = app_protocol
- transport_waiter = waiter
- else:
- protocol = SSLProtocol(
- self, app_protocol, ssl, waiter,
- server_side=True,
- server_hostname=None,
- ssl_handshake_timeout=ssl_handshake_timeout,
- ssl_shutdown_timeout=ssl_shutdown_timeout)
- transport_waiter = None
-
- if sock.family == uv.AF_UNIX:
- transport = <UVStream>UnixTransport.new(
- self, protocol, None, transport_waiter, context)
- elif sock.family in (uv.AF_INET, uv.AF_INET6):
- transport = <UVStream>TCPTransport.new(
- self, protocol, None, transport_waiter, context)
-
- if transport is None:
- raise ValueError(
- 'invalid socket family, expected AF_UNIX, AF_INET or AF_INET6')
-
- transport._open(sock.fileno())
- transport._init_protocol()
- transport._attach_fileobj(sock)
-
- if ssl:
- app_transport = protocol._get_app_transport(context)
- try:
- await waiter
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- app_transport.close()
- raise
- return app_transport, protocol
- else:
- try:
- await waiter
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- transport._close()
- raise
- return transport, protocol
-
- def run_in_executor(self, executor, func, *args):
- if aio_iscoroutine(func) or aio_iscoroutinefunction(func):
- raise TypeError("coroutines cannot be used with run_in_executor()")
-
- self._check_closed()
-
- if executor is None:
- executor = self._default_executor
- # Only check when the default executor is being used
- self._check_default_executor()
- if executor is None:
- executor = cc_ThreadPoolExecutor()
- self._default_executor = executor
-
- return aio_wrap_future(executor.submit(func, *args), loop=self)
-
- def set_default_executor(self, executor):
- self._default_executor = executor
-
- @cython.iterable_coroutine
- async def __subprocess_run(self, protocol_factory, args,
- stdin=subprocess_PIPE,
- stdout=subprocess_PIPE,
- stderr=subprocess_PIPE,
- universal_newlines=False,
- shell=True,
- bufsize=0,
- preexec_fn=None,
- close_fds=None,
- cwd=None,
- env=None,
- startupinfo=None,
- creationflags=0,
- restore_signals=True,
- start_new_session=False,
- executable=None,
- pass_fds=(),
- # For tests only! Do not use in your code. Ever.
- __uvloop_sleep_after_fork=False):
-
- # TODO: Implement close_fds (might not be very important in
- # Python 3.5, since all FDs aren't inheritable by default.)
-
- cdef:
- int debug_flags = 0
-
- if universal_newlines:
- raise ValueError("universal_newlines must be False")
- if bufsize != 0:
- raise ValueError("bufsize must be 0")
- if startupinfo is not None:
- raise ValueError('startupinfo is not supported')
- if creationflags != 0:
- raise ValueError('creationflags is not supported')
-
- if executable is not None:
- args[0] = executable
-
- if __uvloop_sleep_after_fork:
- debug_flags |= __PROCESS_DEBUG_SLEEP_AFTER_FORK
-
- waiter = self._new_future()
- protocol = protocol_factory()
- proc = UVProcessTransport.new(self, protocol,
- args, env, cwd, start_new_session,
- stdin, stdout, stderr, pass_fds,
- waiter,
- debug_flags,
- preexec_fn,
- restore_signals)
-
- try:
- await waiter
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- proc.close()
- raise
-
- return proc, protocol
-
- @cython.iterable_coroutine
- async def subprocess_shell(self, protocol_factory, cmd, *,
- shell=True,
- **kwargs):
-
- if not shell:
- raise ValueError("shell must be True")
-
- args = [cmd]
- if shell:
- args = [b'/bin/sh', b'-c'] + args
-
- return await self.__subprocess_run(protocol_factory, args, shell=True,
- **kwargs)
-
- @cython.iterable_coroutine
- async def subprocess_exec(self, protocol_factory, program, *args,
- shell=False, **kwargs):
-
- if shell:
- raise ValueError("shell must be False")
-
- args = list((program,) + args)
-
- return await self.__subprocess_run(protocol_factory, args, shell=False,
- **kwargs)
-
- @cython.iterable_coroutine
- async def connect_read_pipe(self, proto_factory, pipe):
- """Register read pipe in event loop. Set the pipe to non-blocking mode.
-
- protocol_factory should instantiate object with Protocol interface.
- pipe is a file-like object.
- Return pair (transport, protocol), where transport supports the
- ReadTransport interface."""
- cdef:
- ReadUnixTransport transp
-
- waiter = self._new_future()
- proto = proto_factory()
- transp = ReadUnixTransport.new(self, proto, None, waiter)
- transp._add_extra_info('pipe', pipe)
- try:
- transp._open(pipe.fileno())
- transp._init_protocol()
- await waiter
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- transp._close()
- raise
- transp._attach_fileobj(pipe)
- return transp, proto
-
- @cython.iterable_coroutine
- async def connect_write_pipe(self, proto_factory, pipe):
- """Register write pipe in event loop.
-
- protocol_factory should instantiate object with BaseProtocol interface.
- Pipe is file-like object already switched to nonblocking.
- Return pair (transport, protocol), where transport support
- WriteTransport interface."""
- cdef:
- WriteUnixTransport transp
-
- waiter = self._new_future()
- proto = proto_factory()
- transp = WriteUnixTransport.new(self, proto, None, waiter)
- transp._add_extra_info('pipe', pipe)
- try:
- transp._open(pipe.fileno())
- transp._init_protocol()
- await waiter
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException:
- transp._close()
- raise
- transp._attach_fileobj(pipe)
- return transp, proto
-
- def add_signal_handler(self, sig, callback, *args):
- """Add a handler for a signal. UNIX only.
-
- Raise ValueError if the signal number is invalid or uncatchable.
- Raise RuntimeError if there is a problem setting up the handler.
- """
- cdef:
- Handle h
-
- if not self._is_main_thread():
- raise ValueError(
- 'add_signal_handler() can only be called from '
- 'the main thread')
-
- if (aio_iscoroutine(callback)
- or aio_iscoroutinefunction(callback)):
- raise TypeError(
- "coroutines cannot be used with add_signal_handler()")
-
- if sig == uv.SIGCHLD:
- if (hasattr(callback, '__self__') and
- isinstance(callback.__self__, aio_AbstractChildWatcher)):
-
- warnings_warn(
- "!!! asyncio is trying to install its ChildWatcher for "
- "SIGCHLD signal !!!\n\nThis is probably because a uvloop "
- "instance is used with asyncio.set_event_loop(). "
- "The correct way to use uvloop is to install its policy: "
- "`asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())`"
- "\n\n", RuntimeWarning, source=self)
-
- # TODO: ideally we should always raise an error here,
- # but that would be a backwards incompatible change,
- # because we recommended using "asyncio.set_event_loop()"
- # in our README. Need to start a deprecation period
- # at some point to turn this warning into an error.
- return
-
- raise RuntimeError(
- 'cannot add a signal handler for SIGCHLD: it is used '
- 'by the event loop to track subprocesses')
-
- self._check_signal(sig)
- self._check_closed()
-
- h = new_Handle(self, callback, args or None, None)
- self._signal_handlers[sig] = h
-
- try:
- # Register a dummy signal handler to ask Python to write the signal
- # number in the wakeup file descriptor.
- signal_signal(sig, self.__sighandler)
-
- # Set SA_RESTART to limit EINTR occurrences.
- signal_siginterrupt(sig, False)
- except OSError as exc:
- del self._signal_handlers[sig]
- if not self._signal_handlers:
- try:
- signal_set_wakeup_fd(-1)
- except (ValueError, OSError) as nexc:
- aio_logger.info('set_wakeup_fd(-1) failed: %s', nexc)
-
- if exc.errno == errno_EINVAL:
- raise RuntimeError('sig {} cannot be caught'.format(sig))
- else:
- raise
-
- def remove_signal_handler(self, sig):
- """Remove a handler for a signal. UNIX only.
-
- Return True if a signal handler was removed, False if not.
- """
-
- if not self._is_main_thread():
- raise ValueError(
- 'remove_signal_handler() can only be called from '
- 'the main thread')
-
- self._check_signal(sig)
-
- if not self._listening_signals:
- return False
-
- try:
- del self._signal_handlers[sig]
- except KeyError:
- return False
-
- if sig == uv.SIGINT:
- handler = signal_default_int_handler
- else:
- handler = signal_SIG_DFL
-
- try:
- signal_signal(sig, handler)
- except OSError as exc:
- if exc.errno == errno_EINVAL:
- raise RuntimeError('sig {} cannot be caught'.format(sig))
- else:
- raise
-
- return True
-
- @cython.iterable_coroutine
- async def create_datagram_endpoint(self, protocol_factory,
- local_addr=None, remote_addr=None, *,
- family=0, proto=0, flags=0,
- reuse_address=_unset, reuse_port=None,
- allow_broadcast=None, sock=None):
- """A coroutine which creates a datagram endpoint.
-
- This method will try to establish the endpoint in the background.
- When successful, the coroutine returns a (transport, protocol) pair.
-
- protocol_factory must be a callable returning a protocol instance.
-
- socket family AF_INET or socket.AF_INET6 depending on host (or
- family if specified), socket type SOCK_DGRAM.
-
- reuse_port tells the kernel to allow this endpoint to be bound to
- the same port as other existing endpoints are bound to, so long as
- they all set this flag when being created. This option is not
- supported on Windows and some UNIX's. If the
- :py:data:`~socket.SO_REUSEPORT` constant is not defined then this
- capability is unsupported.
-
- allow_broadcast tells the kernel to allow this endpoint to send
- messages to the broadcast address.
-
- sock can optionally be specified in order to use a preexisting
- socket object.
- """
- cdef:
- UDPTransport udp = None
- system.addrinfo * lai
- system.addrinfo * rai
-
- if sock is not None:
- if not _is_sock_dgram(sock.type):
- raise ValueError(
- 'A UDP Socket was expected, got {!r}'.format(sock))
- if (local_addr or remote_addr or
- family or proto or flags or
- reuse_port or allow_broadcast):
- # show the problematic kwargs in exception msg
- opts = dict(local_addr=local_addr, remote_addr=remote_addr,
- family=family, proto=proto, flags=flags,
- reuse_address=reuse_address, reuse_port=reuse_port,
- allow_broadcast=allow_broadcast)
- problems = ', '.join(
- '{}={}'.format(k, v) for k, v in opts.items() if v)
- raise ValueError(
- 'socket modifier keyword arguments can not be used '
- 'when sock is specified. ({})'.format(problems))
- sock.setblocking(False)
- udp = UDPTransport.__new__(UDPTransport)
- udp._init(self, uv.AF_UNSPEC)
- udp.open(sock.family, sock.fileno())
- udp._attach_fileobj(sock)
- else:
- if reuse_address is not _unset:
- if reuse_address:
- raise ValueError("Passing `reuse_address=True` is no "
- "longer supported, as the usage of "
- "SO_REUSEPORT in UDP poses a significant "
- "security concern.")
- else:
- warnings_warn("The *reuse_address* parameter has been "
- "deprecated as of 0.15.", DeprecationWarning,
- stacklevel=2)
- reuse_port = bool(reuse_port)
- if reuse_port and not has_SO_REUSEPORT:
- raise ValueError(
- 'reuse_port not supported by socket module')
-
- lads = None
- if local_addr is not None:
- if (not isinstance(local_addr, (tuple, list)) or
- len(local_addr) != 2):
- raise TypeError(
- 'local_addr must be a tuple of (host, port)')
- lads = await self._getaddrinfo(
- local_addr[0], local_addr[1],
- family, uv.SOCK_DGRAM, proto, flags,
- 0)
-
- rads = None
- if remote_addr is not None:
- if (not isinstance(remote_addr, (tuple, list)) or
- len(remote_addr) != 2):
- raise TypeError(
- 'remote_addr must be a tuple of (host, port)')
- rads = await self._getaddrinfo(
- remote_addr[0], remote_addr[1],
- family, uv.SOCK_DGRAM, proto, flags,
- 0)
-
- excs = []
- if lads is None:
- if rads is not None:
- udp = UDPTransport.__new__(UDPTransport)
- rai = (<AddrInfo>rads).data
- udp._init(self, rai.ai_family)
- udp._connect(rai.ai_addr, rai.ai_addrlen)
- udp._set_address(rai)
- else:
- if family not in (uv.AF_INET, uv.AF_INET6):
- raise ValueError('unexpected address family')
- udp = UDPTransport.__new__(UDPTransport)
- udp._init(self, family)
-
- if reuse_port:
- self._sock_set_reuseport(udp._fileno())
-
- else:
- lai = (<AddrInfo>lads).data
- while lai is not NULL:
- try:
- udp = UDPTransport.__new__(UDPTransport)
- udp._init(self, lai.ai_family)
- if reuse_port:
- self._sock_set_reuseport(udp._fileno())
- udp._bind(lai.ai_addr)
- except (KeyboardInterrupt, SystemExit):
- raise
- except BaseException as ex:
- lai = lai.ai_next
- excs.append(ex)
- continue
- else:
- break
- else:
- ctx = None
- if len(excs):
- ctx = excs[0]
- raise OSError('could not bind to local_addr {}'.format(
- local_addr)) from ctx
-
- if rads is not None:
- rai = (<AddrInfo>rads).data
- while rai is not NULL:
- if rai.ai_family != lai.ai_family:
- rai = rai.ai_next
- continue
- if rai.ai_protocol != lai.ai_protocol:
- rai = rai.ai_next
- continue
- udp._connect(rai.ai_addr, rai.ai_addrlen)
- udp._set_address(rai)
- break
- else:
- raise OSError(
- 'could not bind to remote_addr {}'.format(
- remote_addr))
-
- if allow_broadcast:
- udp._set_broadcast(1)
-
- protocol = protocol_factory()
- waiter = self._new_future()
- assert udp is not None
- udp._set_protocol(protocol)
- udp._set_waiter(waiter)
- udp._init_protocol()
-
- await waiter
- return udp, protocol
-
- def _monitor_fs(self, path: str, callback) -> asyncio.Handle:
- cdef:
- UVFSEvent fs_handle
- char* c_str_path
-
- self._check_closed()
- fs_handle = UVFSEvent.new(self, callback, None)
- p_bytes = path.encode('UTF-8')
- c_str_path = p_bytes
- flags = 0
- fs_handle.start(c_str_path, flags)
- return fs_handle
-
- def _check_default_executor(self):
- if self._executor_shutdown_called:
- raise RuntimeError('Executor shutdown has been called')
-
- def _asyncgen_finalizer_hook(self, agen):
- self._asyncgens.discard(agen)
- if not self.is_closed():
- self.call_soon_threadsafe(self.create_task, agen.aclose())
-
- def _asyncgen_firstiter_hook(self, agen):
- if self._asyncgens_shutdown_called:
- warnings_warn(
- "asynchronous generator {!r} was scheduled after "
- "loop.shutdown_asyncgens() call".format(agen),
- ResourceWarning, source=self)
-
- self._asyncgens.add(agen)
-
- @cython.iterable_coroutine
- async def shutdown_asyncgens(self):
- """Shutdown all active asynchronous generators."""
- self._asyncgens_shutdown_called = True
-
- if not len(self._asyncgens):
- return
-
- closing_agens = list(self._asyncgens)
- self._asyncgens.clear()
-
- shutdown_coro = aio_gather(
- *[ag.aclose() for ag in closing_agens],
- return_exceptions=True)
-
- results = await shutdown_coro
- for result, agen in zip(results, closing_agens):
- if isinstance(result, Exception):
- self.call_exception_handler({
- 'message': 'an error occurred during closing of '
- 'asynchronous generator {!r}'.format(agen),
- 'exception': result,
- 'asyncgen': agen
- })
-
- @cython.iterable_coroutine
- async def shutdown_default_executor(self, timeout=None):
- """Schedule the shutdown of the default executor.
-
- The timeout parameter specifies the amount of time the executor will
- be given to finish joining. The default value is None, which means
- that the executor will be given an unlimited amount of time.
- """
- self._executor_shutdown_called = True
- if self._default_executor is None:
- return
- future = self.create_future()
- thread = threading_Thread(target=self._do_shutdown, args=(future,))
- thread.start()
- try:
- await future
- finally:
- thread.join(timeout)
-
- if thread.is_alive():
- warnings_warn(
- "The executor did not finishing joining "
- f"its threads within {timeout} seconds.",
- RuntimeWarning,
- stacklevel=2
- )
- self._default_executor.shutdown(wait=False)
-
- def _do_shutdown(self, future):
- try:
- self._default_executor.shutdown(wait=True)
- self.call_soon_threadsafe(future.set_result, None)
- except Exception as ex:
- self.call_soon_threadsafe(future.set_exception, ex)
-
-
-# Expose pointer for integration with other C-extensions
-def libuv_get_loop_t_ptr(loop):
- return PyCapsule_New(<void *>(<Loop>loop).uvloop, NULL, NULL)
-
-
-def libuv_get_version():
- return uv.uv_version()
-
-
-def _testhelper_unwrap_capsuled_pointer(obj):
- return <uint64_t>PyCapsule_GetPointer(obj, NULL)
-
-
-cdef void __loop_alloc_buffer(
- uv.uv_handle_t* uvhandle,
- size_t suggested_size,
- uv.uv_buf_t* buf
-) noexcept with gil:
- cdef:
- Loop loop = (<UVHandle>uvhandle.data)._loop
-
- if loop._recv_buffer_in_use == 1:
- buf.len = 0
- exc = RuntimeError('concurrent allocations')
- loop._handle_exception(exc)
- return
-
- loop._recv_buffer_in_use = 1
- buf.base = loop._recv_buffer
- buf.len = sizeof(loop._recv_buffer)
-
-
-cdef inline void __loop_free_buffer(Loop loop):
- loop._recv_buffer_in_use = 0
-
-
-class _SyncSocketReaderFuture(aio_Future):
-
- def __init__(self, sock, loop):
- aio_Future.__init__(self, loop=loop)
- self.__sock = sock
- self.__loop = loop
-
- def __remove_reader(self):
- if self.__sock is not None and self.__sock.fileno() != -1:
- self.__loop.remove_reader(self.__sock)
- self.__sock = None
-
- if PY39:
- def cancel(self, msg=None):
- self.__remove_reader()
- aio_Future.cancel(self, msg=msg)
-
- else:
- def cancel(self):
- self.__remove_reader()
- aio_Future.cancel(self)
-
-
-class _SyncSocketWriterFuture(aio_Future):
-
- def __init__(self, sock, loop):
- aio_Future.__init__(self, loop=loop)
- self.__sock = sock
- self.__loop = loop
-
- def __remove_writer(self):
- if self.__sock is not None and self.__sock.fileno() != -1:
- self.__loop.remove_writer(self.__sock)
- self.__sock = None
-
- if PY39:
- def cancel(self, msg=None):
- self.__remove_writer()
- aio_Future.cancel(self, msg=msg)
-
- else:
- def cancel(self):
- self.__remove_writer()
- aio_Future.cancel(self)
-
-
-include "cbhandles.pyx"
-include "pseudosock.pyx"
-include "lru.pyx"
-
-include "handles/handle.pyx"
-include "handles/async_.pyx"
-include "handles/idle.pyx"
-include "handles/check.pyx"
-include "handles/timer.pyx"
-include "handles/poll.pyx"
-include "handles/basetransport.pyx"
-include "handles/stream.pyx"
-include "handles/streamserver.pyx"
-include "handles/tcp.pyx"
-include "handles/pipe.pyx"
-include "handles/process.pyx"
-include "handles/fsevent.pyx"
-
-include "request.pyx"
-include "dns.pyx"
-include "sslproto.pyx"
-
-include "handles/udp.pyx"
-
-include "server.pyx"
-
-
-# Used in UVProcess
-cdef vint __atfork_installed = 0
-cdef vint __forking = 0
-cdef Loop __forking_loop = None
-
-
-cdef void __get_fork_handler() noexcept nogil:
- with gil:
- if (__forking and __forking_loop is not None and
- __forking_loop.active_process_handler is not None):
- __forking_loop.active_process_handler._after_fork()
-
-cdef __install_atfork():
- global __atfork_installed
-
- if __atfork_installed:
- return
- __atfork_installed = 1
-
- cdef int err
-
- err = system.pthread_atfork(NULL, NULL, &system.handleAtFork)
- if err:
- __atfork_installed = 0
- raise convert_error(-err)
-
-
-# Install PyMem* memory allocators
-cdef vint __mem_installed = 0
-cdef __install_pymem():
- global __mem_installed
- if __mem_installed:
- return
- __mem_installed = 1
-
- cdef int err
- err = uv.uv_replace_allocator(<uv.uv_malloc_func>PyMem_RawMalloc,
- <uv.uv_realloc_func>PyMem_RawRealloc,
- <uv.uv_calloc_func>PyMem_RawCalloc,
- <uv.uv_free_func>PyMem_RawFree)
- if err < 0:
- __mem_installed = 0
- raise convert_error(err)
-
-
-cdef _set_signal_wakeup_fd(fd):
- if fd >= 0:
- return signal_set_wakeup_fd(fd, warn_on_full_buffer=False)
- else:
- return signal_set_wakeup_fd(fd)
-
-
-# Helpers for tests
-
-@cython.iterable_coroutine
-async def _test_coroutine_1():
- return 42