diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/websockets/protocol.py')
-rw-r--r-- | venv/lib/python3.11/site-packages/websockets/protocol.py | 708 |
1 files changed, 708 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/websockets/protocol.py b/venv/lib/python3.11/site-packages/websockets/protocol.py new file mode 100644 index 0000000..765e6b9 --- /dev/null +++ b/venv/lib/python3.11/site-packages/websockets/protocol.py @@ -0,0 +1,708 @@ +from __future__ import annotations + +import enum +import logging +import uuid +from typing import Generator, List, Optional, Type, Union + +from .exceptions import ( + ConnectionClosed, + ConnectionClosedError, + ConnectionClosedOK, + InvalidState, + PayloadTooBig, + ProtocolError, +) +from .extensions import Extension +from .frames import ( + OK_CLOSE_CODES, + OP_BINARY, + OP_CLOSE, + OP_CONT, + OP_PING, + OP_PONG, + OP_TEXT, + Close, + CloseCode, + Frame, +) +from .http11 import Request, Response +from .streams import StreamReader +from .typing import LoggerLike, Origin, Subprotocol + + +__all__ = [ + "Protocol", + "Side", + "State", + "SEND_EOF", +] + +Event = Union[Request, Response, Frame] +"""Events that :meth:`~Protocol.events_received` may return.""" + + +class Side(enum.IntEnum): + """A WebSocket connection is either a server or a client.""" + + SERVER, CLIENT = range(2) + + +SERVER = Side.SERVER +CLIENT = Side.CLIENT + + +class State(enum.IntEnum): + """A WebSocket connection is in one of these four states.""" + + CONNECTING, OPEN, CLOSING, CLOSED = range(4) + + +CONNECTING = State.CONNECTING +OPEN = State.OPEN +CLOSING = State.CLOSING +CLOSED = State.CLOSED + + +SEND_EOF = b"" +"""Sentinel signaling that the TCP connection must be half-closed.""" + + +class Protocol: + """ + Sans-I/O implementation of a WebSocket connection. + + Args: + side: :attr:`~Side.CLIENT` or :attr:`~Side.SERVER`. + state: initial state of the WebSocket connection. + max_size: maximum size of incoming messages in bytes; + :obj:`None` disables the limit. + logger: logger for this connection; depending on ``side``, + defaults to ``logging.getLogger("websockets.client")`` + or ``logging.getLogger("websockets.server")``; + see the :doc:`logging guide <../../topics/logging>` for details. + + """ + + def __init__( + self, + side: Side, + *, + state: State = OPEN, + max_size: Optional[int] = 2**20, + logger: Optional[LoggerLike] = None, + ) -> None: + # Unique identifier. For logs. + self.id: uuid.UUID = uuid.uuid4() + """Unique identifier of the connection. Useful in logs.""" + + # Logger or LoggerAdapter for this connection. + if logger is None: + logger = logging.getLogger(f"websockets.{side.name.lower()}") + self.logger: LoggerLike = logger + """Logger for this connection.""" + + # Track if DEBUG is enabled. Shortcut logging calls if it isn't. + self.debug = logger.isEnabledFor(logging.DEBUG) + + # Connection side. CLIENT or SERVER. + self.side = side + + # Connection state. Initially OPEN because subclasses handle CONNECTING. + self.state = state + + # Maximum size of incoming messages in bytes. + self.max_size = max_size + + # Current size of incoming message in bytes. Only set while reading a + # fragmented message i.e. a data frames with the FIN bit not set. + self.cur_size: Optional[int] = None + + # True while sending a fragmented message i.e. a data frames with the + # FIN bit not set. + self.expect_continuation_frame = False + + # WebSocket protocol parameters. + self.origin: Optional[Origin] = None + self.extensions: List[Extension] = [] + self.subprotocol: Optional[Subprotocol] = None + + # Close code and reason, set when a close frame is sent or received. + self.close_rcvd: Optional[Close] = None + self.close_sent: Optional[Close] = None + self.close_rcvd_then_sent: Optional[bool] = None + + # Track if an exception happened during the handshake. + self.handshake_exc: Optional[Exception] = None + """ + Exception to raise if the opening handshake failed. + + :obj:`None` if the opening handshake succeeded. + + """ + + # Track if send_eof() was called. + self.eof_sent = False + + # Parser state. + self.reader = StreamReader() + self.events: List[Event] = [] + self.writes: List[bytes] = [] + self.parser = self.parse() + next(self.parser) # start coroutine + self.parser_exc: Optional[Exception] = None + + @property + def state(self) -> State: + """ + WebSocket connection state. + + Defined in 4.1, 4.2, 7.1.3, and 7.1.4 of :rfc:`6455`. + + """ + return self._state + + @state.setter + def state(self, state: State) -> None: + if self.debug: + self.logger.debug("= connection is %s", state.name) + self._state = state + + @property + def close_code(self) -> Optional[int]: + """ + `WebSocket close code`_. + + .. _WebSocket close code: + https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.5 + + :obj:`None` if the connection isn't closed yet. + + """ + if self.state is not CLOSED: + return None + elif self.close_rcvd is None: + return CloseCode.ABNORMAL_CLOSURE + else: + return self.close_rcvd.code + + @property + def close_reason(self) -> Optional[str]: + """ + `WebSocket close reason`_. + + .. _WebSocket close reason: + https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.6 + + :obj:`None` if the connection isn't closed yet. + + """ + if self.state is not CLOSED: + return None + elif self.close_rcvd is None: + return "" + else: + return self.close_rcvd.reason + + @property + def close_exc(self) -> ConnectionClosed: + """ + Exception to raise when trying to interact with a closed connection. + + Don't raise this exception while the connection :attr:`state` + is :attr:`~websockets.protocol.State.CLOSING`; wait until + it's :attr:`~websockets.protocol.State.CLOSED`. + + Indeed, the exception includes the close code and reason, which are + known only once the connection is closed. + + Raises: + AssertionError: if the connection isn't closed yet. + + """ + assert self.state is CLOSED, "connection isn't closed yet" + exc_type: Type[ConnectionClosed] + if ( + self.close_rcvd is not None + and self.close_sent is not None + and self.close_rcvd.code in OK_CLOSE_CODES + and self.close_sent.code in OK_CLOSE_CODES + ): + exc_type = ConnectionClosedOK + else: + exc_type = ConnectionClosedError + exc: ConnectionClosed = exc_type( + self.close_rcvd, + self.close_sent, + self.close_rcvd_then_sent, + ) + # Chain to the exception raised in the parser, if any. + exc.__cause__ = self.parser_exc + return exc + + # Public methods for receiving data. + + def receive_data(self, data: bytes) -> None: + """ + Receive data from the network. + + After calling this method: + + - You must call :meth:`data_to_send` and send this data to the network. + - You should call :meth:`events_received` and process resulting events. + + Raises: + EOFError: if :meth:`receive_eof` was called earlier. + + """ + self.reader.feed_data(data) + next(self.parser) + + def receive_eof(self) -> None: + """ + Receive the end of the data stream from the network. + + After calling this method: + + - You must call :meth:`data_to_send` and send this data to the network; + it will return ``[b""]``, signaling the end of the stream, or ``[]``. + - You aren't expected to call :meth:`events_received`; it won't return + any new events. + + Raises: + EOFError: if :meth:`receive_eof` was called earlier. + + """ + self.reader.feed_eof() + next(self.parser) + + # Public methods for sending events. + + def send_continuation(self, data: bytes, fin: bool) -> None: + """ + Send a `Continuation frame`_. + + .. _Continuation frame: + https://datatracker.ietf.org/doc/html/rfc6455#section-5.6 + + Parameters: + data: payload containing the same kind of data + as the initial frame. + fin: FIN bit; set it to :obj:`True` if this is the last frame + of a fragmented message and to :obj:`False` otherwise. + + Raises: + ProtocolError: if a fragmented message isn't in progress. + + """ + if not self.expect_continuation_frame: + raise ProtocolError("unexpected continuation frame") + self.expect_continuation_frame = not fin + self.send_frame(Frame(OP_CONT, data, fin)) + + def send_text(self, data: bytes, fin: bool = True) -> None: + """ + Send a `Text frame`_. + + .. _Text frame: + https://datatracker.ietf.org/doc/html/rfc6455#section-5.6 + + Parameters: + data: payload containing text encoded with UTF-8. + fin: FIN bit; set it to :obj:`False` if this is the first frame of + a fragmented message. + + Raises: + ProtocolError: if a fragmented message is in progress. + + """ + if self.expect_continuation_frame: + raise ProtocolError("expected a continuation frame") + self.expect_continuation_frame = not fin + self.send_frame(Frame(OP_TEXT, data, fin)) + + def send_binary(self, data: bytes, fin: bool = True) -> None: + """ + Send a `Binary frame`_. + + .. _Binary frame: + https://datatracker.ietf.org/doc/html/rfc6455#section-5.6 + + Parameters: + data: payload containing arbitrary binary data. + fin: FIN bit; set it to :obj:`False` if this is the first frame of + a fragmented message. + + Raises: + ProtocolError: if a fragmented message is in progress. + + """ + if self.expect_continuation_frame: + raise ProtocolError("expected a continuation frame") + self.expect_continuation_frame = not fin + self.send_frame(Frame(OP_BINARY, data, fin)) + + def send_close(self, code: Optional[int] = None, reason: str = "") -> None: + """ + Send a `Close frame`_. + + .. _Close frame: + https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.1 + + Parameters: + code: close code. + reason: close reason. + + Raises: + ProtocolError: if a fragmented message is being sent, if the code + isn't valid, or if a reason is provided without a code + + """ + if self.expect_continuation_frame: + raise ProtocolError("expected a continuation frame") + if code is None: + if reason != "": + raise ProtocolError("cannot send a reason without a code") + close = Close(CloseCode.NO_STATUS_RCVD, "") + data = b"" + else: + close = Close(code, reason) + data = close.serialize() + # send_frame() guarantees that self.state is OPEN at this point. + # 7.1.3. The WebSocket Closing Handshake is Started + self.send_frame(Frame(OP_CLOSE, data)) + self.close_sent = close + self.state = CLOSING + + def send_ping(self, data: bytes) -> None: + """ + Send a `Ping frame`_. + + .. _Ping frame: + https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.2 + + Parameters: + data: payload containing arbitrary binary data. + + """ + self.send_frame(Frame(OP_PING, data)) + + def send_pong(self, data: bytes) -> None: + """ + Send a `Pong frame`_. + + .. _Pong frame: + https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.3 + + Parameters: + data: payload containing arbitrary binary data. + + """ + self.send_frame(Frame(OP_PONG, data)) + + def fail(self, code: int, reason: str = "") -> None: + """ + `Fail the WebSocket connection`_. + + .. _Fail the WebSocket connection: + https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.7 + + Parameters: + code: close code + reason: close reason + + Raises: + ProtocolError: if the code isn't valid. + """ + # 7.1.7. Fail the WebSocket Connection + + # Send a close frame when the state is OPEN (a close frame was already + # sent if it's CLOSING), except when failing the connection because + # of an error reading from or writing to the network. + if self.state is OPEN: + if code != CloseCode.ABNORMAL_CLOSURE: + close = Close(code, reason) + data = close.serialize() + self.send_frame(Frame(OP_CLOSE, data)) + self.close_sent = close + self.state = CLOSING + + # When failing the connection, a server closes the TCP connection + # without waiting for the client to complete the handshake, while a + # client waits for the server to close the TCP connection, possibly + # after sending a close frame that the client will ignore. + if self.side is SERVER and not self.eof_sent: + self.send_eof() + + # 7.1.7. Fail the WebSocket Connection "An endpoint MUST NOT continue + # to attempt to process data(including a responding Close frame) from + # the remote endpoint after being instructed to _Fail the WebSocket + # Connection_." + self.parser = self.discard() + next(self.parser) # start coroutine + + # Public method for getting incoming events after receiving data. + + def events_received(self) -> List[Event]: + """ + Fetch events generated from data received from the network. + + Call this method immediately after any of the ``receive_*()`` methods. + + Process resulting events, likely by passing them to the application. + + Returns: + List[Event]: Events read from the connection. + """ + events, self.events = self.events, [] + return events + + # Public method for getting outgoing data after receiving data or sending events. + + def data_to_send(self) -> List[bytes]: + """ + Obtain data to send to the network. + + Call this method immediately after any of the ``receive_*()``, + ``send_*()``, or :meth:`fail` methods. + + Write resulting data to the connection. + + The empty bytestring :data:`~websockets.protocol.SEND_EOF` signals + the end of the data stream. When you receive it, half-close the TCP + connection. + + Returns: + List[bytes]: Data to write to the connection. + + """ + writes, self.writes = self.writes, [] + return writes + + def close_expected(self) -> bool: + """ + Tell if the TCP connection is expected to close soon. + + Call this method immediately after any of the ``receive_*()``, + ``send_close()``, or :meth:`fail` methods. + + If it returns :obj:`True`, schedule closing the TCP connection after a + short timeout if the other side hasn't already closed it. + + Returns: + bool: Whether the TCP connection is expected to close soon. + + """ + # We expect a TCP close if and only if we sent a close frame: + # * Normal closure: once we send a close frame, we expect a TCP close: + # server waits for client to complete the TCP closing handshake; + # client waits for server to initiate the TCP closing handshake. + # * Abnormal closure: we always send a close frame and the same logic + # applies, except on EOFError where we don't send a close frame + # because we already received the TCP close, so we don't expect it. + # We already got a TCP Close if and only if the state is CLOSED. + return self.state is CLOSING or self.handshake_exc is not None + + # Private methods for receiving data. + + def parse(self) -> Generator[None, None, None]: + """ + Parse incoming data into frames. + + :meth:`receive_data` and :meth:`receive_eof` run this generator + coroutine until it needs more data or reaches EOF. + + :meth:`parse` never raises an exception. Instead, it sets the + :attr:`parser_exc` and yields control. + + """ + try: + while True: + if (yield from self.reader.at_eof()): + if self.debug: + self.logger.debug("< EOF") + # If the WebSocket connection is closed cleanly, with a + # closing handhshake, recv_frame() substitutes parse() + # with discard(). This branch is reached only when the + # connection isn't closed cleanly. + raise EOFError("unexpected end of stream") + + if self.max_size is None: + max_size = None + elif self.cur_size is None: + max_size = self.max_size + else: + max_size = self.max_size - self.cur_size + + # During a normal closure, execution ends here on the next + # iteration of the loop after receiving a close frame. At + # this point, recv_frame() replaced parse() by discard(). + frame = yield from Frame.parse( + self.reader.read_exact, + mask=self.side is SERVER, + max_size=max_size, + extensions=self.extensions, + ) + + if self.debug: + self.logger.debug("< %s", frame) + + self.recv_frame(frame) + + except ProtocolError as exc: + self.fail(CloseCode.PROTOCOL_ERROR, str(exc)) + self.parser_exc = exc + + except EOFError as exc: + self.fail(CloseCode.ABNORMAL_CLOSURE, str(exc)) + self.parser_exc = exc + + except UnicodeDecodeError as exc: + self.fail(CloseCode.INVALID_DATA, f"{exc.reason} at position {exc.start}") + self.parser_exc = exc + + except PayloadTooBig as exc: + self.fail(CloseCode.MESSAGE_TOO_BIG, str(exc)) + self.parser_exc = exc + + except Exception as exc: + self.logger.error("parser failed", exc_info=True) + # Don't include exception details, which may be security-sensitive. + self.fail(CloseCode.INTERNAL_ERROR) + self.parser_exc = exc + + # During an abnormal closure, execution ends here after catching an + # exception. At this point, fail() replaced parse() by discard(). + yield + raise AssertionError("parse() shouldn't step after error") + + def discard(self) -> Generator[None, None, None]: + """ + Discard incoming data. + + This coroutine replaces :meth:`parse`: + + - after receiving a close frame, during a normal closure (1.4); + - after sending a close frame, during an abnormal closure (7.1.7). + + """ + # The server close the TCP connection in the same circumstances where + # discard() replaces parse(). The client closes the connection later, + # after the server closes the connection or a timeout elapses. + # (The latter case cannot be handled in this Sans-I/O layer.) + assert (self.side is SERVER) == (self.eof_sent) + while not (yield from self.reader.at_eof()): + self.reader.discard() + if self.debug: + self.logger.debug("< EOF") + # A server closes the TCP connection immediately, while a client + # waits for the server to close the TCP connection. + if self.side is CLIENT: + self.send_eof() + self.state = CLOSED + # If discard() completes normally, execution ends here. + yield + # Once the reader reaches EOF, its feed_data/eof() methods raise an + # error, so our receive_data/eof() methods don't step the generator. + raise AssertionError("discard() shouldn't step after EOF") + + def recv_frame(self, frame: Frame) -> None: + """ + Process an incoming frame. + + """ + if frame.opcode is OP_TEXT or frame.opcode is OP_BINARY: + if self.cur_size is not None: + raise ProtocolError("expected a continuation frame") + if frame.fin: + self.cur_size = None + else: + self.cur_size = len(frame.data) + + elif frame.opcode is OP_CONT: + if self.cur_size is None: + raise ProtocolError("unexpected continuation frame") + if frame.fin: + self.cur_size = None + else: + self.cur_size += len(frame.data) + + elif frame.opcode is OP_PING: + # 5.5.2. Ping: "Upon receipt of a Ping frame, an endpoint MUST + # send a Pong frame in response" + pong_frame = Frame(OP_PONG, frame.data) + self.send_frame(pong_frame) + + elif frame.opcode is OP_PONG: + # 5.5.3 Pong: "A response to an unsolicited Pong frame is not + # expected." + pass + + elif frame.opcode is OP_CLOSE: + # 7.1.5. The WebSocket Connection Close Code + # 7.1.6. The WebSocket Connection Close Reason + self.close_rcvd = Close.parse(frame.data) + if self.state is CLOSING: + assert self.close_sent is not None + self.close_rcvd_then_sent = False + + if self.cur_size is not None: + raise ProtocolError("incomplete fragmented message") + + # 5.5.1 Close: "If an endpoint receives a Close frame and did + # not previously send a Close frame, the endpoint MUST send a + # Close frame in response. (When sending a Close frame in + # response, the endpoint typically echos the status code it + # received.)" + + if self.state is OPEN: + # Echo the original data instead of re-serializing it with + # Close.serialize() because that fails when the close frame + # is empty and Close.parse() synthesizes a 1005 close code. + # The rest is identical to send_close(). + self.send_frame(Frame(OP_CLOSE, frame.data)) + self.close_sent = self.close_rcvd + self.close_rcvd_then_sent = True + self.state = CLOSING + + # 7.1.2. Start the WebSocket Closing Handshake: "Once an + # endpoint has both sent and received a Close control frame, + # that endpoint SHOULD _Close the WebSocket Connection_" + + # A server closes the TCP connection immediately, while a client + # waits for the server to close the TCP connection. + if self.side is SERVER: + self.send_eof() + + # 1.4. Closing Handshake: "after receiving a control frame + # indicating the connection should be closed, a peer discards + # any further data received." + self.parser = self.discard() + next(self.parser) # start coroutine + + else: + # This can't happen because Frame.parse() validates opcodes. + raise AssertionError(f"unexpected opcode: {frame.opcode:02x}") + + self.events.append(frame) + + # Private methods for sending events. + + def send_frame(self, frame: Frame) -> None: + if self.state is not OPEN: + raise InvalidState( + f"cannot write to a WebSocket in the {self.state.name} state" + ) + + if self.debug: + self.logger.debug("> %s", frame) + self.writes.append( + frame.serialize(mask=self.side is CLIENT, extensions=self.extensions) + ) + + def send_eof(self) -> None: + assert not self.eof_sent + self.eof_sent = True + if self.debug: + self.logger.debug("> EOF") + self.writes.append(SEND_EOF) |