diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/uvloop/handles/basetransport.pyx')
-rw-r--r-- | venv/lib/python3.11/site-packages/uvloop/handles/basetransport.pyx | 293 |
1 files changed, 293 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/uvloop/handles/basetransport.pyx b/venv/lib/python3.11/site-packages/uvloop/handles/basetransport.pyx new file mode 100644 index 0000000..28b3079 --- /dev/null +++ b/venv/lib/python3.11/site-packages/uvloop/handles/basetransport.pyx @@ -0,0 +1,293 @@ +cdef class UVBaseTransport(UVSocketHandle): + + def __cinit__(self): + # Flow control + self._high_water = FLOW_CONTROL_HIGH_WATER * 1024 + self._low_water = FLOW_CONTROL_HIGH_WATER // 4 + + self._protocol = None + self._protocol_connected = 0 + self._protocol_paused = 0 + self._protocol_data_received = None + + self._server = None + self._waiter = None + self._extra_info = None + + self._conn_lost = 0 + + self._closing = 0 + + cdef size_t _get_write_buffer_size(self): + return 0 + + cdef inline _schedule_call_connection_made(self): + self._loop._call_soon_handle( + new_MethodHandle(self._loop, + "UVTransport._call_connection_made", + <method_t>self._call_connection_made, + self.context, + self)) + + cdef inline _schedule_call_connection_lost(self, exc): + self._loop._call_soon_handle( + new_MethodHandle1(self._loop, + "UVTransport._call_connection_lost", + <method1_t>self._call_connection_lost, + self.context, + self, exc)) + + cdef _fatal_error(self, exc, throw, reason=None): + # Overload UVHandle._fatal_error + + self._force_close(exc) + + if not isinstance(exc, OSError): + + if throw or self._loop is None: + raise exc + + msg = f'Fatal error on transport {self.__class__.__name__}' + if reason is not None: + msg = f'{msg} ({reason})' + + self._loop.call_exception_handler({ + 'message': msg, + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) + + cdef inline _maybe_pause_protocol(self): + cdef: + size_t size = self._get_write_buffer_size() + + if size <= self._high_water: + return + + if not self._protocol_paused: + self._protocol_paused = 1 + try: + # _maybe_pause_protocol() is always triggered from user-calls, + # so we must copy the context to avoid entering context twice + run_in_context( + self.context.copy(), self._protocol.pause_writing, + ) + except (KeyboardInterrupt, SystemExit): + raise + except BaseException as exc: + self._loop.call_exception_handler({ + 'message': 'protocol.pause_writing() failed', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) + + cdef inline _maybe_resume_protocol(self): + cdef: + size_t size = self._get_write_buffer_size() + + if self._protocol_paused and size <= self._low_water: + self._protocol_paused = 0 + try: + # We're copying the context to avoid entering context twice, + # even though it's not always necessary to copy - it's easier + # to copy here than passing down a copied context. + run_in_context( + self.context.copy(), self._protocol.resume_writing, + ) + except (KeyboardInterrupt, SystemExit): + raise + except BaseException as exc: + self._loop.call_exception_handler({ + 'message': 'protocol.resume_writing() failed', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) + + cdef _wakeup_waiter(self): + if self._waiter is not None: + if not self._waiter.cancelled(): + if not self._is_alive(): + self._waiter.set_exception( + RuntimeError( + 'closed Transport handle and unset waiter')) + else: + self._waiter.set_result(True) + self._waiter = None + + cdef _call_connection_made(self): + if self._protocol is None: + raise RuntimeError( + 'protocol is not set, cannot call connection_made()') + + # We use `_is_alive()` and not `_closing`, because we call + # `transport._close()` in `loop.create_connection()` if an + # exception happens during `await waiter`. + if not self._is_alive(): + # A connection waiter can be cancelled between + # 'await loop.create_connection()' and + # `_schedule_call_connection_made` and + # the actual `_call_connection_made`. + self._wakeup_waiter() + return + + # Set _protocol_connected to 1 before calling "connection_made": + # if transport is aborted or closed, "connection_lost" will + # still be scheduled. + self._protocol_connected = 1 + + try: + self._protocol.connection_made(self) + except BaseException: + self._wakeup_waiter() + raise + + if not self._is_alive(): + # This might happen when "transport.abort()" is called + # from "Protocol.connection_made". + self._wakeup_waiter() + return + + self._start_reading() + self._wakeup_waiter() + + cdef _call_connection_lost(self, exc): + if self._waiter is not None: + if not self._waiter.done(): + self._waiter.set_exception(exc) + self._waiter = None + + if self._closed: + # The handle is closed -- likely, _call_connection_lost + # was already called before. + return + + try: + if self._protocol_connected: + self._protocol.connection_lost(exc) + finally: + self._clear_protocol() + + self._close() + + server = self._server + if server is not None: + (<Server>server)._detach() + self._server = None + + cdef inline _set_server(self, Server server): + self._server = server + (<Server>server)._attach() + + cdef inline _set_waiter(self, object waiter): + if waiter is not None and not isfuture(waiter): + raise TypeError( + f'invalid waiter object {waiter!r}, expected asyncio.Future') + + self._waiter = waiter + + cdef _set_protocol(self, object protocol): + self._protocol = protocol + # Store a reference to the bound method directly + try: + self._protocol_data_received = protocol.data_received + except AttributeError: + pass + + cdef _clear_protocol(self): + self._protocol = None + self._protocol_data_received = None + + cdef inline _init_protocol(self): + self._loop._track_transport(self) + if self._protocol is None: + raise RuntimeError('invalid _init_protocol call') + self._schedule_call_connection_made() + + cdef inline _add_extra_info(self, str name, object obj): + if self._extra_info is None: + self._extra_info = {} + self._extra_info[name] = obj + + cdef bint _is_reading(self): + raise NotImplementedError + + cdef _start_reading(self): + raise NotImplementedError + + cdef _stop_reading(self): + raise NotImplementedError + + # === Public API === + + property _paused: + # Used by SSLProto. Might be removed in the future. + def __get__(self): + return bool(not self._is_reading()) + + def get_protocol(self): + return self._protocol + + def set_protocol(self, protocol): + self._set_protocol(protocol) + if self._is_reading(): + self._stop_reading() + self._start_reading() + + def _force_close(self, exc): + # Used by SSLProto. Might be removed in the future. + if self._conn_lost or self._closed: + return + if not self._closing: + self._closing = 1 + self._stop_reading() + self._conn_lost += 1 + self._schedule_call_connection_lost(exc) + + def abort(self): + self._force_close(None) + + def close(self): + if self._closing or self._closed: + return + + self._closing = 1 + self._stop_reading() + + if not self._get_write_buffer_size(): + # The write buffer is empty + self._conn_lost += 1 + self._schedule_call_connection_lost(None) + + def is_closing(self): + return self._closing + + def get_write_buffer_size(self): + return self._get_write_buffer_size() + + def set_write_buffer_limits(self, high=None, low=None): + self._ensure_alive() + + self._high_water, self._low_water = add_flowcontrol_defaults( + high, low, FLOW_CONTROL_HIGH_WATER) + + self._maybe_pause_protocol() + + def get_write_buffer_limits(self): + return (self._low_water, self._high_water) + + def get_extra_info(self, name, default=None): + if self._extra_info is not None and name in self._extra_info: + return self._extra_info[name] + if name == 'socket': + return self._get_socket() + if name == 'sockname': + return self._get_socket().getsockname() + if name == 'peername': + try: + return self._get_socket().getpeername() + except socket_error: + return default + return default |