@cython.no_gc_clear @cython.freelist(DEFAULT_FREELIST_SIZE) cdef class _UDPSendContext: # used to hold additional write request information for uv_write cdef: uv.uv_udp_send_t req uv.uv_buf_t uv_buf Py_buffer py_buf UDPTransport udp bint closed cdef close(self): if self.closed: return self.closed = 1 PyBuffer_Release(&self.py_buf) # void self.req.data = NULL self.uv_buf.base = NULL Py_DECREF(self) self.udp = None @staticmethod cdef _UDPSendContext new(UDPTransport udp, object data): cdef _UDPSendContext ctx ctx = _UDPSendContext.__new__(_UDPSendContext) ctx.udp = None ctx.closed = 1 ctx.req.data = ctx Py_INCREF(ctx) PyObject_GetBuffer(data, &ctx.py_buf, PyBUF_SIMPLE) ctx.uv_buf.base = ctx.py_buf.buf ctx.uv_buf.len = ctx.py_buf.len ctx.udp = udp ctx.closed = 0 return ctx def __dealloc__(self): if UVLOOP_DEBUG: if not self.closed: raise RuntimeError( 'open _UDPSendContext is being deallocated') self.udp = None @cython.no_gc_clear cdef class UDPTransport(UVBaseTransport): def __cinit__(self): self._family = uv.AF_UNSPEC self.__receiving = 0 self._address = None self.context = Context_CopyCurrent() cdef _init(self, Loop loop, unsigned int family): cdef int err self._start_init(loop) self._handle = PyMem_RawMalloc(sizeof(uv.uv_udp_t)) if self._handle is NULL: self._abort_init() raise MemoryError() err = uv.uv_udp_init_ex(loop.uvloop, self._handle, family) if err < 0: self._abort_init() raise convert_error(err) if family in (uv.AF_INET, uv.AF_INET6): self._family = family self._finish_init() cdef _set_address(self, system.addrinfo *addr): self._address = __convert_sockaddr_to_pyaddr(addr.ai_addr) cdef _connect(self, system.sockaddr* addr, size_t addr_len): cdef int err err = uv.uv_udp_connect(self._handle, addr) if err < 0: exc = convert_error(err) raise exc cdef open(self, int family, int sockfd): if family in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX): self._family = family else: raise ValueError( 'cannot open a UDP handle, invalid family {}'.format(family)) cdef int err err = uv.uv_udp_open(self._handle, sockfd) if err < 0: exc = convert_error(err) raise exc cdef _bind(self, system.sockaddr* addr): cdef: int err int flags = 0 self._ensure_alive() err = uv.uv_udp_bind(self._handle, addr, flags) if err < 0: exc = convert_error(err) raise exc cdef _set_broadcast(self, bint on): cdef int err self._ensure_alive() err = uv.uv_udp_set_broadcast(self._handle, on) if err < 0: exc = convert_error(err) raise exc cdef size_t _get_write_buffer_size(self): if self._handle is NULL: return 0 return (self._handle).send_queue_size cdef bint _is_reading(self): return self.__receiving cdef _start_reading(self): cdef int err if self.__receiving: return self._ensure_alive() err = uv.uv_udp_recv_start(self._handle, __loop_alloc_buffer, __uv_udp_on_receive) if err < 0: exc = convert_error(err) self._fatal_error(exc, True) return else: # UDPTransport must live until the read callback is called self.__receiving_started() cdef _stop_reading(self): cdef int err if not self.__receiving: return self._ensure_alive() err = uv.uv_udp_recv_stop(self._handle) if err < 0: exc = convert_error(err) self._fatal_error(exc, True) return else: self.__receiving_stopped() cdef inline __receiving_started(self): if self.__receiving: return self.__receiving = 1 Py_INCREF(self) cdef inline __receiving_stopped(self): if not self.__receiving: return self.__receiving = 0 Py_DECREF(self) cdef _new_socket(self): if self._family not in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX): raise RuntimeError( 'UDPTransport.family is undefined; ' 'cannot create python socket') fileno = self._fileno() return PseudoSocket(self._family, uv.SOCK_DGRAM, 0, fileno) cdef _send(self, object data, object addr): cdef: _UDPSendContext ctx system.sockaddr_storage saddr_st system.sockaddr *saddr Py_buffer try_pybuf uv.uv_buf_t try_uvbuf self._ensure_alive() if self._family not in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX): raise RuntimeError('UDPTransport.family is undefined; cannot send') if addr is None: saddr = NULL else: try: __convert_pyaddr_to_sockaddr(self._family, addr, &saddr_st) except (ValueError, TypeError): raise except Exception: raise ValueError( f'{addr!r}: socket family mismatch or ' f'a DNS lookup is required') saddr = (&saddr_st) if self._get_write_buffer_size() == 0: PyObject_GetBuffer(data, &try_pybuf, PyBUF_SIMPLE) try_uvbuf.base = try_pybuf.buf try_uvbuf.len = try_pybuf.len err = uv.uv_udp_try_send(self._handle, &try_uvbuf, 1, saddr) PyBuffer_Release(&try_pybuf) else: err = uv.UV_EAGAIN if err == uv.UV_EAGAIN: ctx = _UDPSendContext.new(self, data) err = uv.uv_udp_send(&ctx.req, self._handle, &ctx.uv_buf, 1, saddr, __uv_udp_on_send) if err < 0: ctx.close() exc = convert_error(err) self._fatal_error(exc, True) else: self._maybe_pause_protocol() else: if err < 0: exc = convert_error(err) self._fatal_error(exc, True) else: self._on_sent(None, self.context.copy()) cdef _on_receive(self, bytes data, object exc, object addr): if exc is None: run_in_context2( self.context, self._protocol.datagram_received, data, addr, ) else: run_in_context1(self.context, self._protocol.error_received, exc) cdef _on_sent(self, object exc, object context=None): if exc is not None: if isinstance(exc, OSError): if context is None: context = self.context run_in_context1(context, self._protocol.error_received, exc) else: self._fatal_error( exc, False, 'Fatal write error on datagram transport') self._maybe_resume_protocol() if not self._get_write_buffer_size(): if self._closing: self._schedule_call_connection_lost(None) # === Public API === def sendto(self, data, addr=None): if not data: # Replicating asyncio logic here. return if self._address: if addr not in (None, self._address): # Replicating asyncio logic here. raise ValueError( 'Invalid address: must be None or %s' % (self._address,)) # Instead of setting addr to self._address below like what asyncio # does, we depend on previous uv_udp_connect() to set the address addr = None if self._conn_lost: # Replicating asyncio logic here. if self._conn_lost >= LOG_THRESHOLD_FOR_CONNLOST_WRITES: aio_logger.warning('socket.send() raised exception.') self._conn_lost += 1 return self._send(data, addr) cdef void __uv_udp_on_receive( uv.uv_udp_t* handle, ssize_t nread, const uv.uv_buf_t* buf, const system.sockaddr* addr, unsigned flags ) noexcept with gil: if __ensure_handle_data(handle, "UDPTransport receive callback") == 0: return cdef: UDPTransport udp = handle.data Loop loop = udp._loop bytes data object pyaddr # It's OK to free the buffer early, since nothing will # be able to touch it until this method is done. __loop_free_buffer(loop) if udp._closed: # The handle was closed, there is no reason to # do any work now. udp.__receiving_stopped() # Just in case. return if addr is NULL and nread == 0: # From libuv docs: # addr: struct sockaddr* containing the address # of the sender. Can be NULL. Valid for the duration # of the callback only. # [...] # The receive callback will be called with # nread == 0 and addr == NULL when there is # nothing to read, and with nread == 0 and # addr != NULL when an empty UDP packet is # received. return if addr is NULL: pyaddr = None elif addr.sa_family == uv.AF_UNSPEC: # https://github.com/MagicStack/uvloop/issues/304 if system.PLATFORM_IS_LINUX: pyaddr = None else: pyaddr = '' else: try: pyaddr = __convert_sockaddr_to_pyaddr(addr) except BaseException as exc: udp._error(exc, False) return if nread < 0: exc = convert_error(nread) udp._on_receive(None, exc, pyaddr) return if nread == 0: data = b'' else: data = loop._recv_buffer[:nread] try: udp._on_receive(data, None, pyaddr) except BaseException as exc: udp._error(exc, False) cdef void __uv_udp_on_send( uv.uv_udp_send_t* req, int status, ) noexcept with gil: if req.data is NULL: # Shouldn't happen as: # - _UDPSendContext does an extra INCREF in its 'init()' # - _UDPSendContext holds a ref to the relevant UDPTransport aio_logger.error( 'UVStream.write callback called with NULL req.data, status=%r', status) return cdef: _UDPSendContext ctx = <_UDPSendContext> req.data UDPTransport udp = ctx.udp ctx.close() if status < 0: exc = convert_error(status) print(exc) else: exc = None try: udp._on_sent(exc) except BaseException as exc: udp._error(exc, False)