summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/uvloop/handles/udp.pyx
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
commit6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
treeb1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/uvloop/handles/udp.pyx
parent4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/uvloop/handles/udp.pyx')
-rw-r--r--venv/lib/python3.11/site-packages/uvloop/handles/udp.pyx409
1 files changed, 409 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/udp.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/udp.pyx
new file mode 100644
index 0000000..bbe60d5
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/uvloop/handles/udp.pyx
@@ -0,0 +1,409 @@
+@cython.no_gc_clear
+@cython.freelist(DEFAULT_FREELIST_SIZE)
+cdef class _UDPSendContext:
+ # used to hold additional write request information for uv_write
+
+ cdef:
+ uv.uv_udp_send_t req
+
+ uv.uv_buf_t uv_buf
+ Py_buffer py_buf
+
+ UDPTransport udp
+
+ bint closed
+
+ cdef close(self):
+ if self.closed:
+ return
+
+ self.closed = 1
+ PyBuffer_Release(&self.py_buf) # void
+ self.req.data = NULL
+ self.uv_buf.base = NULL
+ Py_DECREF(self)
+ self.udp = None
+
+ @staticmethod
+ cdef _UDPSendContext new(UDPTransport udp, object data):
+ cdef _UDPSendContext ctx
+ ctx = _UDPSendContext.__new__(_UDPSendContext)
+ ctx.udp = None
+ ctx.closed = 1
+
+ ctx.req.data = <void*> ctx
+ Py_INCREF(ctx)
+
+ PyObject_GetBuffer(data, &ctx.py_buf, PyBUF_SIMPLE)
+ ctx.uv_buf.base = <char*>ctx.py_buf.buf
+ ctx.uv_buf.len = ctx.py_buf.len
+ ctx.udp = udp
+
+ ctx.closed = 0
+ return ctx
+
+ def __dealloc__(self):
+ if UVLOOP_DEBUG:
+ if not self.closed:
+ raise RuntimeError(
+ 'open _UDPSendContext is being deallocated')
+ self.udp = None
+
+
+@cython.no_gc_clear
+cdef class UDPTransport(UVBaseTransport):
+ def __cinit__(self):
+ self._family = uv.AF_UNSPEC
+ self.__receiving = 0
+ self._address = None
+ self.context = Context_CopyCurrent()
+
+ cdef _init(self, Loop loop, unsigned int family):
+ cdef int err
+
+ self._start_init(loop)
+
+ self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_udp_t))
+ if self._handle is NULL:
+ self._abort_init()
+ raise MemoryError()
+
+ err = uv.uv_udp_init_ex(loop.uvloop,
+ <uv.uv_udp_t*>self._handle,
+ family)
+ if err < 0:
+ self._abort_init()
+ raise convert_error(err)
+
+ if family in (uv.AF_INET, uv.AF_INET6):
+ self._family = family
+
+ self._finish_init()
+
+ cdef _set_address(self, system.addrinfo *addr):
+ self._address = __convert_sockaddr_to_pyaddr(addr.ai_addr)
+
+ cdef _connect(self, system.sockaddr* addr, size_t addr_len):
+ cdef int err
+ err = uv.uv_udp_connect(<uv.uv_udp_t*>self._handle, addr)
+ if err < 0:
+ exc = convert_error(err)
+ raise exc
+
+ cdef open(self, int family, int sockfd):
+ if family in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX):
+ self._family = family
+ else:
+ raise ValueError(
+ 'cannot open a UDP handle, invalid family {}'.format(family))
+
+ cdef int err
+ err = uv.uv_udp_open(<uv.uv_udp_t*>self._handle,
+ <uv.uv_os_sock_t>sockfd)
+
+ if err < 0:
+ exc = convert_error(err)
+ raise exc
+
+ cdef _bind(self, system.sockaddr* addr):
+ cdef:
+ int err
+ int flags = 0
+
+ self._ensure_alive()
+
+ err = uv.uv_udp_bind(<uv.uv_udp_t*>self._handle, addr, flags)
+ if err < 0:
+ exc = convert_error(err)
+ raise exc
+
+ cdef _set_broadcast(self, bint on):
+ cdef int err
+
+ self._ensure_alive()
+
+ err = uv.uv_udp_set_broadcast(<uv.uv_udp_t*>self._handle, on)
+ if err < 0:
+ exc = convert_error(err)
+ raise exc
+
+ cdef size_t _get_write_buffer_size(self):
+ if self._handle is NULL:
+ return 0
+ return (<uv.uv_udp_t*>self._handle).send_queue_size
+
+ cdef bint _is_reading(self):
+ return self.__receiving
+
+ cdef _start_reading(self):
+ cdef int err
+
+ if self.__receiving:
+ return
+
+ self._ensure_alive()
+
+ err = uv.uv_udp_recv_start(<uv.uv_udp_t*>self._handle,
+ __loop_alloc_buffer,
+ __uv_udp_on_receive)
+
+ if err < 0:
+ exc = convert_error(err)
+ self._fatal_error(exc, True)
+ return
+ else:
+ # UDPTransport must live until the read callback is called
+ self.__receiving_started()
+
+ cdef _stop_reading(self):
+ cdef int err
+
+ if not self.__receiving:
+ return
+
+ self._ensure_alive()
+
+ err = uv.uv_udp_recv_stop(<uv.uv_udp_t*>self._handle)
+ if err < 0:
+ exc = convert_error(err)
+ self._fatal_error(exc, True)
+ return
+ else:
+ self.__receiving_stopped()
+
+ cdef inline __receiving_started(self):
+ if self.__receiving:
+ return
+ self.__receiving = 1
+ Py_INCREF(self)
+
+ cdef inline __receiving_stopped(self):
+ if not self.__receiving:
+ return
+ self.__receiving = 0
+ Py_DECREF(self)
+
+ cdef _new_socket(self):
+ if self._family not in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX):
+ raise RuntimeError(
+ 'UDPTransport.family is undefined; '
+ 'cannot create python socket')
+
+ fileno = self._fileno()
+ return PseudoSocket(self._family, uv.SOCK_DGRAM, 0, fileno)
+
+ cdef _send(self, object data, object addr):
+ cdef:
+ _UDPSendContext ctx
+ system.sockaddr_storage saddr_st
+ system.sockaddr *saddr
+ Py_buffer try_pybuf
+ uv.uv_buf_t try_uvbuf
+
+ self._ensure_alive()
+
+ if self._family not in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX):
+ raise RuntimeError('UDPTransport.family is undefined; cannot send')
+
+ if addr is None:
+ saddr = NULL
+ else:
+ try:
+ __convert_pyaddr_to_sockaddr(self._family, addr,
+ <system.sockaddr*>&saddr_st)
+ except (ValueError, TypeError):
+ raise
+ except Exception:
+ raise ValueError(
+ f'{addr!r}: socket family mismatch or '
+ f'a DNS lookup is required')
+ saddr = <system.sockaddr*>(&saddr_st)
+
+ if self._get_write_buffer_size() == 0:
+ PyObject_GetBuffer(data, &try_pybuf, PyBUF_SIMPLE)
+ try_uvbuf.base = <char*>try_pybuf.buf
+ try_uvbuf.len = try_pybuf.len
+ err = uv.uv_udp_try_send(<uv.uv_udp_t*>self._handle,
+ &try_uvbuf,
+ 1,
+ saddr)
+ PyBuffer_Release(&try_pybuf)
+ else:
+ err = uv.UV_EAGAIN
+
+ if err == uv.UV_EAGAIN:
+ ctx = _UDPSendContext.new(self, data)
+ err = uv.uv_udp_send(&ctx.req,
+ <uv.uv_udp_t*>self._handle,
+ &ctx.uv_buf,
+ 1,
+ saddr,
+ __uv_udp_on_send)
+
+ if err < 0:
+ ctx.close()
+
+ exc = convert_error(err)
+ self._fatal_error(exc, True)
+ else:
+ self._maybe_pause_protocol()
+
+ else:
+ if err < 0:
+ exc = convert_error(err)
+ self._fatal_error(exc, True)
+ else:
+ self._on_sent(None, self.context.copy())
+
+ cdef _on_receive(self, bytes data, object exc, object addr):
+ if exc is None:
+ run_in_context2(
+ self.context, self._protocol.datagram_received, data, addr,
+ )
+ else:
+ run_in_context1(self.context, self._protocol.error_received, exc)
+
+ cdef _on_sent(self, object exc, object context=None):
+ if exc is not None:
+ if isinstance(exc, OSError):
+ if context is None:
+ context = self.context
+ run_in_context1(context, self._protocol.error_received, exc)
+ else:
+ self._fatal_error(
+ exc, False, 'Fatal write error on datagram transport')
+
+ self._maybe_resume_protocol()
+ if not self._get_write_buffer_size():
+ if self._closing:
+ self._schedule_call_connection_lost(None)
+
+ # === Public API ===
+
+ def sendto(self, data, addr=None):
+ if not data:
+ # Replicating asyncio logic here.
+ return
+
+ if self._address:
+ if addr not in (None, self._address):
+ # Replicating asyncio logic here.
+ raise ValueError(
+ 'Invalid address: must be None or %s' % (self._address,))
+
+ # Instead of setting addr to self._address below like what asyncio
+ # does, we depend on previous uv_udp_connect() to set the address
+ addr = None
+
+ if self._conn_lost:
+ # Replicating asyncio logic here.
+ if self._conn_lost >= LOG_THRESHOLD_FOR_CONNLOST_WRITES:
+ aio_logger.warning('socket.send() raised exception.')
+ self._conn_lost += 1
+ return
+
+ self._send(data, addr)
+
+
+cdef void __uv_udp_on_receive(
+ uv.uv_udp_t* handle,
+ ssize_t nread,
+ const uv.uv_buf_t* buf,
+ const system.sockaddr* addr,
+ unsigned flags
+) noexcept with gil:
+
+ if __ensure_handle_data(<uv.uv_handle_t*>handle,
+ "UDPTransport receive callback") == 0:
+ return
+
+ cdef:
+ UDPTransport udp = <UDPTransport>handle.data
+ Loop loop = udp._loop
+ bytes data
+ object pyaddr
+
+ # It's OK to free the buffer early, since nothing will
+ # be able to touch it until this method is done.
+ __loop_free_buffer(loop)
+
+ if udp._closed:
+ # The handle was closed, there is no reason to
+ # do any work now.
+ udp.__receiving_stopped() # Just in case.
+ return
+
+ if addr is NULL and nread == 0:
+ # From libuv docs:
+ # addr: struct sockaddr* containing the address
+ # of the sender. Can be NULL. Valid for the duration
+ # of the callback only.
+ # [...]
+ # The receive callback will be called with
+ # nread == 0 and addr == NULL when there is
+ # nothing to read, and with nread == 0 and
+ # addr != NULL when an empty UDP packet is
+ # received.
+ return
+
+ if addr is NULL:
+ pyaddr = None
+ elif addr.sa_family == uv.AF_UNSPEC:
+ # https://github.com/MagicStack/uvloop/issues/304
+ if system.PLATFORM_IS_LINUX:
+ pyaddr = None
+ else:
+ pyaddr = ''
+ else:
+ try:
+ pyaddr = __convert_sockaddr_to_pyaddr(addr)
+ except BaseException as exc:
+ udp._error(exc, False)
+ return
+
+ if nread < 0:
+ exc = convert_error(nread)
+ udp._on_receive(None, exc, pyaddr)
+ return
+
+ if nread == 0:
+ data = b''
+ else:
+ data = loop._recv_buffer[:nread]
+
+ try:
+ udp._on_receive(data, None, pyaddr)
+ except BaseException as exc:
+ udp._error(exc, False)
+
+
+cdef void __uv_udp_on_send(
+ uv.uv_udp_send_t* req,
+ int status,
+) noexcept with gil:
+
+ if req.data is NULL:
+ # Shouldn't happen as:
+ # - _UDPSendContext does an extra INCREF in its 'init()'
+ # - _UDPSendContext holds a ref to the relevant UDPTransport
+ aio_logger.error(
+ 'UVStream.write callback called with NULL req.data, status=%r',
+ status)
+ return
+
+ cdef:
+ _UDPSendContext ctx = <_UDPSendContext> req.data
+ UDPTransport udp = <UDPTransport>ctx.udp
+
+ ctx.close()
+
+ if status < 0:
+ exc = convert_error(status)
+ print(exc)
+ else:
+ exc = None
+
+ try:
+ udp._on_sent(exc)
+ except BaseException as exc:
+ udp._error(exc, False)