diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/websockets/legacy')
20 files changed, 0 insertions, 4538 deletions
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__init__.py b/venv/lib/python3.11/site-packages/websockets/legacy/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/__init__.py +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/__init__.cpython-311.pyc Binary files differdeleted file mode 100644 index 5384f2b..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/__init__.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/async_timeout.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/async_timeout.cpython-311.pyc Binary files differdeleted file mode 100644 index f3e18ca..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/async_timeout.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/auth.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/auth.cpython-311.pyc Binary files differdeleted file mode 100644 index e572023..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/auth.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/client.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/client.cpython-311.pyc Binary files differdeleted file mode 100644 index 8ab650d..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/client.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/compatibility.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/compatibility.cpython-311.pyc Binary files differdeleted file mode 100644 index e65d9ec..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/compatibility.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/framing.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/framing.cpython-311.pyc Binary files differdeleted file mode 100644 index 47c4426..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/framing.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/handshake.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/handshake.cpython-311.pyc Binary files differdeleted file mode 100644 index 0db3ae7..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/handshake.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/http.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/http.cpython-311.pyc Binary files differdeleted file mode 100644 index 2199c73..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/http.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/protocol.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/protocol.cpython-311.pyc Binary files differdeleted file mode 100644 index 002cb1f..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/protocol.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/server.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/server.cpython-311.pyc Binary files differdeleted file mode 100644 index 6319b6e..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/server.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/async_timeout.py b/venv/lib/python3.11/site-packages/websockets/legacy/async_timeout.py deleted file mode 100644 index 8264094..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/async_timeout.py +++ /dev/null @@ -1,265 +0,0 @@ -# From https://github.com/aio-libs/async-timeout/blob/master/async_timeout/__init__.py -# Licensed under the Apache License (Apache-2.0) - -import asyncio -import enum -import sys -import warnings -from types import TracebackType -from typing import Optional, Type - - -# From https://github.com/python/typing_extensions/blob/main/src/typing_extensions.py -# Licensed under the Python Software Foundation License (PSF-2.0) - -if sys.version_info >= (3, 11): - from typing import final -else: - # @final exists in 3.8+, but we backport it for all versions - # before 3.11 to keep support for the __final__ attribute. - # See https://bugs.python.org/issue46342 - def final(f): - """This decorator can be used to indicate to type checkers that - the decorated method cannot be overridden, and decorated class - cannot be subclassed. For example: - - class Base: - @final - def done(self) -> None: - ... - class Sub(Base): - def done(self) -> None: # Error reported by type checker - ... - @final - class Leaf: - ... - class Other(Leaf): # Error reported by type checker - ... - - There is no runtime checking of these properties. The decorator - sets the ``__final__`` attribute to ``True`` on the decorated object - to allow runtime introspection. - """ - try: - f.__final__ = True - except (AttributeError, TypeError): - # Skip the attribute silently if it is not writable. - # AttributeError happens if the object has __slots__ or a - # read-only property, TypeError if it's a builtin class. - pass - return f - - -# End https://github.com/aio-libs/async-timeout/blob/master/async_timeout/__init__.py - -__version__ = "4.0.2" - - -__all__ = ("timeout", "timeout_at", "Timeout") - - -def timeout(delay: Optional[float]) -> "Timeout": - """timeout context manager. - - Useful in cases when you want to apply timeout logic around block - of code or in cases when asyncio.wait_for is not suitable. For example: - - >>> async with timeout(0.001): - ... async with aiohttp.get('https://github.com') as r: - ... await r.text() - - - delay - value in seconds or None to disable timeout logic - """ - loop = asyncio.get_running_loop() - if delay is not None: - deadline = loop.time() + delay # type: Optional[float] - else: - deadline = None - return Timeout(deadline, loop) - - -def timeout_at(deadline: Optional[float]) -> "Timeout": - """Schedule the timeout at absolute time. - - deadline argument points on the time in the same clock system - as loop.time(). - - Please note: it is not POSIX time but a time with - undefined starting base, e.g. the time of the system power on. - - >>> async with timeout_at(loop.time() + 10): - ... async with aiohttp.get('https://github.com') as r: - ... await r.text() - - - """ - loop = asyncio.get_running_loop() - return Timeout(deadline, loop) - - -class _State(enum.Enum): - INIT = "INIT" - ENTER = "ENTER" - TIMEOUT = "TIMEOUT" - EXIT = "EXIT" - - -@final -class Timeout: - # Internal class, please don't instantiate it directly - # Use timeout() and timeout_at() public factories instead. - # - # Implementation note: `async with timeout()` is preferred - # over `with timeout()`. - # While technically the Timeout class implementation - # doesn't need to be async at all, - # the `async with` statement explicitly points that - # the context manager should be used from async function context. - # - # This design allows to avoid many silly misusages. - # - # TimeoutError is raised immediately when scheduled - # if the deadline is passed. - # The purpose is to time out as soon as possible - # without waiting for the next await expression. - - __slots__ = ("_deadline", "_loop", "_state", "_timeout_handler") - - def __init__( - self, deadline: Optional[float], loop: asyncio.AbstractEventLoop - ) -> None: - self._loop = loop - self._state = _State.INIT - - self._timeout_handler = None # type: Optional[asyncio.Handle] - if deadline is None: - self._deadline = None # type: Optional[float] - else: - self.update(deadline) - - def __enter__(self) -> "Timeout": - warnings.warn( - "with timeout() is deprecated, use async with timeout() instead", - DeprecationWarning, - stacklevel=2, - ) - self._do_enter() - return self - - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> Optional[bool]: - self._do_exit(exc_type) - return None - - async def __aenter__(self) -> "Timeout": - self._do_enter() - return self - - async def __aexit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> Optional[bool]: - self._do_exit(exc_type) - return None - - @property - def expired(self) -> bool: - """Is timeout expired during execution?""" - return self._state == _State.TIMEOUT - - @property - def deadline(self) -> Optional[float]: - return self._deadline - - def reject(self) -> None: - """Reject scheduled timeout if any.""" - # cancel is maybe better name but - # task.cancel() raises CancelledError in asyncio world. - if self._state not in (_State.INIT, _State.ENTER): - raise RuntimeError(f"invalid state {self._state.value}") - self._reject() - - def _reject(self) -> None: - if self._timeout_handler is not None: - self._timeout_handler.cancel() - self._timeout_handler = None - - def shift(self, delay: float) -> None: - """Advance timeout on delay seconds. - - The delay can be negative. - - Raise RuntimeError if shift is called when deadline is not scheduled - """ - deadline = self._deadline - if deadline is None: - raise RuntimeError("cannot shift timeout if deadline is not scheduled") - self.update(deadline + delay) - - def update(self, deadline: float) -> None: - """Set deadline to absolute value. - - deadline argument points on the time in the same clock system - as loop.time(). - - If new deadline is in the past the timeout is raised immediately. - - Please note: it is not POSIX time but a time with - undefined starting base, e.g. the time of the system power on. - """ - if self._state == _State.EXIT: - raise RuntimeError("cannot reschedule after exit from context manager") - if self._state == _State.TIMEOUT: - raise RuntimeError("cannot reschedule expired timeout") - if self._timeout_handler is not None: - self._timeout_handler.cancel() - self._deadline = deadline - if self._state != _State.INIT: - self._reschedule() - - def _reschedule(self) -> None: - assert self._state == _State.ENTER - deadline = self._deadline - if deadline is None: - return - - now = self._loop.time() - if self._timeout_handler is not None: - self._timeout_handler.cancel() - - task = asyncio.current_task() - if deadline <= now: - self._timeout_handler = self._loop.call_soon(self._on_timeout, task) - else: - self._timeout_handler = self._loop.call_at(deadline, self._on_timeout, task) - - def _do_enter(self) -> None: - if self._state != _State.INIT: - raise RuntimeError(f"invalid state {self._state.value}") - self._state = _State.ENTER - self._reschedule() - - def _do_exit(self, exc_type: Optional[Type[BaseException]]) -> None: - if exc_type is asyncio.CancelledError and self._state == _State.TIMEOUT: - self._timeout_handler = None - raise asyncio.TimeoutError - # timeout has not expired - self._state = _State.EXIT - self._reject() - return None - - def _on_timeout(self, task: "asyncio.Task[None]") -> None: - task.cancel() - self._state = _State.TIMEOUT - # drop the reference early - self._timeout_handler = None - - -# End https://github.com/aio-libs/async-timeout/blob/master/async_timeout/__init__.py diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/auth.py b/venv/lib/python3.11/site-packages/websockets/legacy/auth.py deleted file mode 100644 index d342583..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/auth.py +++ /dev/null @@ -1,184 +0,0 @@ -from __future__ import annotations - -import functools -import hmac -import http -from typing import Any, Awaitable, Callable, Iterable, Optional, Tuple, Union, cast - -from ..datastructures import Headers -from ..exceptions import InvalidHeader -from ..headers import build_www_authenticate_basic, parse_authorization_basic -from .server import HTTPResponse, WebSocketServerProtocol - - -__all__ = ["BasicAuthWebSocketServerProtocol", "basic_auth_protocol_factory"] - -Credentials = Tuple[str, str] - - -def is_credentials(value: Any) -> bool: - try: - username, password = value - except (TypeError, ValueError): - return False - else: - return isinstance(username, str) and isinstance(password, str) - - -class BasicAuthWebSocketServerProtocol(WebSocketServerProtocol): - """ - WebSocket server protocol that enforces HTTP Basic Auth. - - """ - - realm: str = "" - """ - Scope of protection. - - If provided, it should contain only ASCII characters because the - encoding of non-ASCII characters is undefined. - """ - - username: Optional[str] = None - """Username of the authenticated user.""" - - def __init__( - self, - *args: Any, - realm: Optional[str] = None, - check_credentials: Optional[Callable[[str, str], Awaitable[bool]]] = None, - **kwargs: Any, - ) -> None: - if realm is not None: - self.realm = realm # shadow class attribute - self._check_credentials = check_credentials - super().__init__(*args, **kwargs) - - async def check_credentials(self, username: str, password: str) -> bool: - """ - Check whether credentials are authorized. - - This coroutine may be overridden in a subclass, for example to - authenticate against a database or an external service. - - Args: - username: HTTP Basic Auth username. - password: HTTP Basic Auth password. - - Returns: - bool: :obj:`True` if the handshake should continue; - :obj:`False` if it should fail with an HTTP 401 error. - - """ - if self._check_credentials is not None: - return await self._check_credentials(username, password) - - return False - - async def process_request( - self, - path: str, - request_headers: Headers, - ) -> Optional[HTTPResponse]: - """ - Check HTTP Basic Auth and return an HTTP 401 response if needed. - - """ - try: - authorization = request_headers["Authorization"] - except KeyError: - return ( - http.HTTPStatus.UNAUTHORIZED, - [("WWW-Authenticate", build_www_authenticate_basic(self.realm))], - b"Missing credentials\n", - ) - - try: - username, password = parse_authorization_basic(authorization) - except InvalidHeader: - return ( - http.HTTPStatus.UNAUTHORIZED, - [("WWW-Authenticate", build_www_authenticate_basic(self.realm))], - b"Unsupported credentials\n", - ) - - if not await self.check_credentials(username, password): - return ( - http.HTTPStatus.UNAUTHORIZED, - [("WWW-Authenticate", build_www_authenticate_basic(self.realm))], - b"Invalid credentials\n", - ) - - self.username = username - - return await super().process_request(path, request_headers) - - -def basic_auth_protocol_factory( - realm: Optional[str] = None, - credentials: Optional[Union[Credentials, Iterable[Credentials]]] = None, - check_credentials: Optional[Callable[[str, str], Awaitable[bool]]] = None, - create_protocol: Optional[Callable[..., BasicAuthWebSocketServerProtocol]] = None, -) -> Callable[..., BasicAuthWebSocketServerProtocol]: - """ - Protocol factory that enforces HTTP Basic Auth. - - :func:`basic_auth_protocol_factory` is designed to integrate with - :func:`~websockets.server.serve` like this:: - - websockets.serve( - ..., - create_protocol=websockets.basic_auth_protocol_factory( - realm="my dev server", - credentials=("hello", "iloveyou"), - ) - ) - - Args: - realm: Scope of protection. It should contain only ASCII characters - because the encoding of non-ASCII characters is undefined. - Refer to section 2.2 of :rfc:`7235` for details. - credentials: Hard coded authorized credentials. It can be a - ``(username, password)`` pair or a list of such pairs. - check_credentials: Coroutine that verifies credentials. - It receives ``username`` and ``password`` arguments - and returns a :class:`bool`. One of ``credentials`` or - ``check_credentials`` must be provided but not both. - create_protocol: Factory that creates the protocol. By default, this - is :class:`BasicAuthWebSocketServerProtocol`. It can be replaced - by a subclass. - Raises: - TypeError: If the ``credentials`` or ``check_credentials`` argument is - wrong. - - """ - if (credentials is None) == (check_credentials is None): - raise TypeError("provide either credentials or check_credentials") - - if credentials is not None: - if is_credentials(credentials): - credentials_list = [cast(Credentials, credentials)] - elif isinstance(credentials, Iterable): - credentials_list = list(credentials) - if not all(is_credentials(item) for item in credentials_list): - raise TypeError(f"invalid credentials argument: {credentials}") - else: - raise TypeError(f"invalid credentials argument: {credentials}") - - credentials_dict = dict(credentials_list) - - async def check_credentials(username: str, password: str) -> bool: - try: - expected_password = credentials_dict[username] - except KeyError: - return False - return hmac.compare_digest(expected_password, password) - - if create_protocol is None: - create_protocol = BasicAuthWebSocketServerProtocol - - return functools.partial( - create_protocol, - realm=realm, - check_credentials=check_credentials, - ) diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/client.py b/venv/lib/python3.11/site-packages/websockets/legacy/client.py deleted file mode 100644 index 4862252..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/client.py +++ /dev/null @@ -1,705 +0,0 @@ -from __future__ import annotations - -import asyncio -import functools -import logging -import random -import urllib.parse -import warnings -from types import TracebackType -from typing import ( - Any, - AsyncIterator, - Callable, - Generator, - List, - Optional, - Sequence, - Tuple, - Type, - cast, -) - -from ..datastructures import Headers, HeadersLike -from ..exceptions import ( - InvalidHandshake, - InvalidHeader, - InvalidMessage, - InvalidStatusCode, - NegotiationError, - RedirectHandshake, - SecurityError, -) -from ..extensions import ClientExtensionFactory, Extension -from ..extensions.permessage_deflate import enable_client_permessage_deflate -from ..headers import ( - build_authorization_basic, - build_extension, - build_host, - build_subprotocol, - parse_extension, - parse_subprotocol, - validate_subprotocols, -) -from ..http import USER_AGENT -from ..typing import ExtensionHeader, LoggerLike, Origin, Subprotocol -from ..uri import WebSocketURI, parse_uri -from .compatibility import asyncio_timeout -from .handshake import build_request, check_response -from .http import read_response -from .protocol import WebSocketCommonProtocol - - -__all__ = ["connect", "unix_connect", "WebSocketClientProtocol"] - - -class WebSocketClientProtocol(WebSocketCommonProtocol): - """ - WebSocket client connection. - - :class:`WebSocketClientProtocol` provides :meth:`recv` and :meth:`send` - coroutines for receiving and sending messages. - - It supports asynchronous iteration to receive incoming messages:: - - async for message in websocket: - await process(message) - - The iterator exits normally when the connection is closed with close code - 1000 (OK) or 1001 (going away) or without a close code. It raises - a :exc:`~websockets.exceptions.ConnectionClosedError` when the connection - is closed with any other code. - - See :func:`connect` for the documentation of ``logger``, ``origin``, - ``extensions``, ``subprotocols``, ``extra_headers``, and - ``user_agent_header``. - - See :class:`~websockets.legacy.protocol.WebSocketCommonProtocol` for the - documentation of ``ping_interval``, ``ping_timeout``, ``close_timeout``, - ``max_size``, ``max_queue``, ``read_limit``, and ``write_limit``. - - """ - - is_client = True - side = "client" - - def __init__( - self, - *, - logger: Optional[LoggerLike] = None, - origin: Optional[Origin] = None, - extensions: Optional[Sequence[ClientExtensionFactory]] = None, - subprotocols: Optional[Sequence[Subprotocol]] = None, - extra_headers: Optional[HeadersLike] = None, - user_agent_header: Optional[str] = USER_AGENT, - **kwargs: Any, - ) -> None: - if logger is None: - logger = logging.getLogger("websockets.client") - super().__init__(logger=logger, **kwargs) - self.origin = origin - self.available_extensions = extensions - self.available_subprotocols = subprotocols - self.extra_headers = extra_headers - self.user_agent_header = user_agent_header - - def write_http_request(self, path: str, headers: Headers) -> None: - """ - Write request line and headers to the HTTP request. - - """ - self.path = path - self.request_headers = headers - - if self.debug: - self.logger.debug("> GET %s HTTP/1.1", path) - for key, value in headers.raw_items(): - self.logger.debug("> %s: %s", key, value) - - # Since the path and headers only contain ASCII characters, - # we can keep this simple. - request = f"GET {path} HTTP/1.1\r\n" - request += str(headers) - - self.transport.write(request.encode()) - - async def read_http_response(self) -> Tuple[int, Headers]: - """ - Read status line and headers from the HTTP response. - - If the response contains a body, it may be read from ``self.reader`` - after this coroutine returns. - - Raises: - InvalidMessage: If the HTTP message is malformed or isn't an - HTTP/1.1 GET response. - - """ - try: - status_code, reason, headers = await read_response(self.reader) - except Exception as exc: - raise InvalidMessage("did not receive a valid HTTP response") from exc - - if self.debug: - self.logger.debug("< HTTP/1.1 %d %s", status_code, reason) - for key, value in headers.raw_items(): - self.logger.debug("< %s: %s", key, value) - - self.response_headers = headers - - return status_code, self.response_headers - - @staticmethod - def process_extensions( - headers: Headers, - available_extensions: Optional[Sequence[ClientExtensionFactory]], - ) -> List[Extension]: - """ - Handle the Sec-WebSocket-Extensions HTTP response header. - - Check that each extension is supported, as well as its parameters. - - Return the list of accepted extensions. - - Raise :exc:`~websockets.exceptions.InvalidHandshake` to abort the - connection. - - :rfc:`6455` leaves the rules up to the specification of each - :extension. - - To provide this level of flexibility, for each extension accepted by - the server, we check for a match with each extension available in the - client configuration. If no match is found, an exception is raised. - - If several variants of the same extension are accepted by the server, - it may be configured several times, which won't make sense in general. - Extensions must implement their own requirements. For this purpose, - the list of previously accepted extensions is provided. - - Other requirements, for example related to mandatory extensions or the - order of extensions, may be implemented by overriding this method. - - """ - accepted_extensions: List[Extension] = [] - - header_values = headers.get_all("Sec-WebSocket-Extensions") - - if header_values: - if available_extensions is None: - raise InvalidHandshake("no extensions supported") - - parsed_header_values: List[ExtensionHeader] = sum( - [parse_extension(header_value) for header_value in header_values], [] - ) - - for name, response_params in parsed_header_values: - for extension_factory in available_extensions: - # Skip non-matching extensions based on their name. - if extension_factory.name != name: - continue - - # Skip non-matching extensions based on their params. - try: - extension = extension_factory.process_response_params( - response_params, accepted_extensions - ) - except NegotiationError: - continue - - # Add matching extension to the final list. - accepted_extensions.append(extension) - - # Break out of the loop once we have a match. - break - - # If we didn't break from the loop, no extension in our list - # matched what the server sent. Fail the connection. - else: - raise NegotiationError( - f"Unsupported extension: " - f"name = {name}, params = {response_params}" - ) - - return accepted_extensions - - @staticmethod - def process_subprotocol( - headers: Headers, available_subprotocols: Optional[Sequence[Subprotocol]] - ) -> Optional[Subprotocol]: - """ - Handle the Sec-WebSocket-Protocol HTTP response header. - - Check that it contains exactly one supported subprotocol. - - Return the selected subprotocol. - - """ - subprotocol: Optional[Subprotocol] = None - - header_values = headers.get_all("Sec-WebSocket-Protocol") - - if header_values: - if available_subprotocols is None: - raise InvalidHandshake("no subprotocols supported") - - parsed_header_values: Sequence[Subprotocol] = sum( - [parse_subprotocol(header_value) for header_value in header_values], [] - ) - - if len(parsed_header_values) > 1: - subprotocols = ", ".join(parsed_header_values) - raise InvalidHandshake(f"multiple subprotocols: {subprotocols}") - - subprotocol = parsed_header_values[0] - - if subprotocol not in available_subprotocols: - raise NegotiationError(f"unsupported subprotocol: {subprotocol}") - - return subprotocol - - async def handshake( - self, - wsuri: WebSocketURI, - origin: Optional[Origin] = None, - available_extensions: Optional[Sequence[ClientExtensionFactory]] = None, - available_subprotocols: Optional[Sequence[Subprotocol]] = None, - extra_headers: Optional[HeadersLike] = None, - ) -> None: - """ - Perform the client side of the opening handshake. - - Args: - wsuri: URI of the WebSocket server. - origin: Value of the ``Origin`` header. - extensions: List of supported extensions, in order in which they - should be negotiated and run. - subprotocols: List of supported subprotocols, in order of decreasing - preference. - extra_headers: Arbitrary HTTP headers to add to the handshake request. - - Raises: - InvalidHandshake: If the handshake fails. - - """ - request_headers = Headers() - - request_headers["Host"] = build_host(wsuri.host, wsuri.port, wsuri.secure) - - if wsuri.user_info: - request_headers["Authorization"] = build_authorization_basic( - *wsuri.user_info - ) - - if origin is not None: - request_headers["Origin"] = origin - - key = build_request(request_headers) - - if available_extensions is not None: - extensions_header = build_extension( - [ - (extension_factory.name, extension_factory.get_request_params()) - for extension_factory in available_extensions - ] - ) - request_headers["Sec-WebSocket-Extensions"] = extensions_header - - if available_subprotocols is not None: - protocol_header = build_subprotocol(available_subprotocols) - request_headers["Sec-WebSocket-Protocol"] = protocol_header - - if self.extra_headers is not None: - request_headers.update(self.extra_headers) - - if self.user_agent_header is not None: - request_headers.setdefault("User-Agent", self.user_agent_header) - - self.write_http_request(wsuri.resource_name, request_headers) - - status_code, response_headers = await self.read_http_response() - if status_code in (301, 302, 303, 307, 308): - if "Location" not in response_headers: - raise InvalidHeader("Location") - raise RedirectHandshake(response_headers["Location"]) - elif status_code != 101: - raise InvalidStatusCode(status_code, response_headers) - - check_response(response_headers, key) - - self.extensions = self.process_extensions( - response_headers, available_extensions - ) - - self.subprotocol = self.process_subprotocol( - response_headers, available_subprotocols - ) - - self.connection_open() - - -class Connect: - """ - Connect to the WebSocket server at ``uri``. - - Awaiting :func:`connect` yields a :class:`WebSocketClientProtocol` which - can then be used to send and receive messages. - - :func:`connect` can be used as a asynchronous context manager:: - - async with websockets.connect(...) as websocket: - ... - - The connection is closed automatically when exiting the context. - - :func:`connect` can be used as an infinite asynchronous iterator to - reconnect automatically on errors:: - - async for websocket in websockets.connect(...): - try: - ... - except websockets.ConnectionClosed: - continue - - The connection is closed automatically after each iteration of the loop. - - If an error occurs while establishing the connection, :func:`connect` - retries with exponential backoff. The backoff delay starts at three - seconds and increases up to one minute. - - If an error occurs in the body of the loop, you can handle the exception - and :func:`connect` will reconnect with the next iteration; or you can - let the exception bubble up and break out of the loop. This lets you - decide which errors trigger a reconnection and which errors are fatal. - - Args: - uri: URI of the WebSocket server. - create_protocol: Factory for the :class:`asyncio.Protocol` managing - the connection. It defaults to :class:`WebSocketClientProtocol`. - Set it to a wrapper or a subclass to customize connection handling. - logger: Logger for this client. - It defaults to ``logging.getLogger("websockets.client")``. - See the :doc:`logging guide <../../topics/logging>` for details. - compression: The "permessage-deflate" extension is enabled by default. - Set ``compression`` to :obj:`None` to disable it. See the - :doc:`compression guide <../../topics/compression>` for details. - origin: Value of the ``Origin`` header, for servers that require it. - extensions: List of supported extensions, in order in which they - should be negotiated and run. - subprotocols: List of supported subprotocols, in order of decreasing - preference. - extra_headers: Arbitrary HTTP headers to add to the handshake request. - user_agent_header: Value of the ``User-Agent`` request header. - It defaults to ``"Python/x.y.z websockets/X.Y"``. - Setting it to :obj:`None` removes the header. - open_timeout: Timeout for opening the connection in seconds. - :obj:`None` disables the timeout. - - See :class:`~websockets.legacy.protocol.WebSocketCommonProtocol` for the - documentation of ``ping_interval``, ``ping_timeout``, ``close_timeout``, - ``max_size``, ``max_queue``, ``read_limit``, and ``write_limit``. - - Any other keyword arguments are passed the event loop's - :meth:`~asyncio.loop.create_connection` method. - - For example: - - * You can set ``ssl`` to a :class:`~ssl.SSLContext` to enforce TLS - settings. When connecting to a ``wss://`` URI, if ``ssl`` isn't - provided, a TLS context is created - with :func:`~ssl.create_default_context`. - - * You can set ``host`` and ``port`` to connect to a different host and - port from those found in ``uri``. This only changes the destination of - the TCP connection. The host name from ``uri`` is still used in the TLS - handshake for secure connections and in the ``Host`` header. - - Raises: - InvalidURI: If ``uri`` isn't a valid WebSocket URI. - OSError: If the TCP connection fails. - InvalidHandshake: If the opening handshake fails. - ~asyncio.TimeoutError: If the opening handshake times out. - - """ - - MAX_REDIRECTS_ALLOWED = 10 - - def __init__( - self, - uri: str, - *, - create_protocol: Optional[Callable[..., WebSocketClientProtocol]] = None, - logger: Optional[LoggerLike] = None, - compression: Optional[str] = "deflate", - origin: Optional[Origin] = None, - extensions: Optional[Sequence[ClientExtensionFactory]] = None, - subprotocols: Optional[Sequence[Subprotocol]] = None, - extra_headers: Optional[HeadersLike] = None, - user_agent_header: Optional[str] = USER_AGENT, - open_timeout: Optional[float] = 10, - ping_interval: Optional[float] = 20, - ping_timeout: Optional[float] = 20, - close_timeout: Optional[float] = None, - max_size: Optional[int] = 2**20, - max_queue: Optional[int] = 2**5, - read_limit: int = 2**16, - write_limit: int = 2**16, - **kwargs: Any, - ) -> None: - # Backwards compatibility: close_timeout used to be called timeout. - timeout: Optional[float] = kwargs.pop("timeout", None) - if timeout is None: - timeout = 10 - else: - warnings.warn("rename timeout to close_timeout", DeprecationWarning) - # If both are specified, timeout is ignored. - if close_timeout is None: - close_timeout = timeout - - # Backwards compatibility: create_protocol used to be called klass. - klass: Optional[Type[WebSocketClientProtocol]] = kwargs.pop("klass", None) - if klass is None: - klass = WebSocketClientProtocol - else: - warnings.warn("rename klass to create_protocol", DeprecationWarning) - # If both are specified, klass is ignored. - if create_protocol is None: - create_protocol = klass - - # Backwards compatibility: recv() used to return None on closed connections - legacy_recv: bool = kwargs.pop("legacy_recv", False) - - # Backwards compatibility: the loop parameter used to be supported. - _loop: Optional[asyncio.AbstractEventLoop] = kwargs.pop("loop", None) - if _loop is None: - loop = asyncio.get_event_loop() - else: - loop = _loop - warnings.warn("remove loop argument", DeprecationWarning) - - wsuri = parse_uri(uri) - if wsuri.secure: - kwargs.setdefault("ssl", True) - elif kwargs.get("ssl") is not None: - raise ValueError( - "connect() received a ssl argument for a ws:// URI, " - "use a wss:// URI to enable TLS" - ) - - if compression == "deflate": - extensions = enable_client_permessage_deflate(extensions) - elif compression is not None: - raise ValueError(f"unsupported compression: {compression}") - - if subprotocols is not None: - validate_subprotocols(subprotocols) - - factory = functools.partial( - create_protocol, - logger=logger, - origin=origin, - extensions=extensions, - subprotocols=subprotocols, - extra_headers=extra_headers, - user_agent_header=user_agent_header, - ping_interval=ping_interval, - ping_timeout=ping_timeout, - close_timeout=close_timeout, - max_size=max_size, - max_queue=max_queue, - read_limit=read_limit, - write_limit=write_limit, - host=wsuri.host, - port=wsuri.port, - secure=wsuri.secure, - legacy_recv=legacy_recv, - loop=_loop, - ) - - if kwargs.pop("unix", False): - path: Optional[str] = kwargs.pop("path", None) - create_connection = functools.partial( - loop.create_unix_connection, factory, path, **kwargs - ) - else: - host: Optional[str] - port: Optional[int] - if kwargs.get("sock") is None: - host, port = wsuri.host, wsuri.port - else: - # If sock is given, host and port shouldn't be specified. - host, port = None, None - if kwargs.get("ssl"): - kwargs.setdefault("server_hostname", wsuri.host) - # If host and port are given, override values from the URI. - host = kwargs.pop("host", host) - port = kwargs.pop("port", port) - create_connection = functools.partial( - loop.create_connection, factory, host, port, **kwargs - ) - - self.open_timeout = open_timeout - if logger is None: - logger = logging.getLogger("websockets.client") - self.logger = logger - - # This is a coroutine function. - self._create_connection = create_connection - self._uri = uri - self._wsuri = wsuri - - def handle_redirect(self, uri: str) -> None: - # Update the state of this instance to connect to a new URI. - old_uri = self._uri - old_wsuri = self._wsuri - new_uri = urllib.parse.urljoin(old_uri, uri) - new_wsuri = parse_uri(new_uri) - - # Forbid TLS downgrade. - if old_wsuri.secure and not new_wsuri.secure: - raise SecurityError("redirect from WSS to WS") - - same_origin = ( - old_wsuri.host == new_wsuri.host and old_wsuri.port == new_wsuri.port - ) - - # Rewrite the host and port arguments for cross-origin redirects. - # This preserves connection overrides with the host and port - # arguments if the redirect points to the same host and port. - if not same_origin: - # Replace the host and port argument passed to the protocol factory. - factory = self._create_connection.args[0] - factory = functools.partial( - factory.func, - *factory.args, - **dict(factory.keywords, host=new_wsuri.host, port=new_wsuri.port), - ) - # Replace the host and port argument passed to create_connection. - self._create_connection = functools.partial( - self._create_connection.func, - *(factory, new_wsuri.host, new_wsuri.port), - **self._create_connection.keywords, - ) - - # Set the new WebSocket URI. This suffices for same-origin redirects. - self._uri = new_uri - self._wsuri = new_wsuri - - # async for ... in connect(...): - - BACKOFF_MIN = 1.92 - BACKOFF_MAX = 60.0 - BACKOFF_FACTOR = 1.618 - BACKOFF_INITIAL = 5 - - async def __aiter__(self) -> AsyncIterator[WebSocketClientProtocol]: - backoff_delay = self.BACKOFF_MIN - while True: - try: - async with self as protocol: - yield protocol - except Exception: - # Add a random initial delay between 0 and 5 seconds. - # See 7.2.3. Recovering from Abnormal Closure in RFC 6544. - if backoff_delay == self.BACKOFF_MIN: - initial_delay = random.random() * self.BACKOFF_INITIAL - self.logger.info( - "! connect failed; reconnecting in %.1f seconds", - initial_delay, - exc_info=True, - ) - await asyncio.sleep(initial_delay) - else: - self.logger.info( - "! connect failed again; retrying in %d seconds", - int(backoff_delay), - exc_info=True, - ) - await asyncio.sleep(int(backoff_delay)) - # Increase delay with truncated exponential backoff. - backoff_delay = backoff_delay * self.BACKOFF_FACTOR - backoff_delay = min(backoff_delay, self.BACKOFF_MAX) - continue - else: - # Connection succeeded - reset backoff delay - backoff_delay = self.BACKOFF_MIN - - # async with connect(...) as ...: - - async def __aenter__(self) -> WebSocketClientProtocol: - return await self - - async def __aexit__( - self, - exc_type: Optional[Type[BaseException]], - exc_value: Optional[BaseException], - traceback: Optional[TracebackType], - ) -> None: - await self.protocol.close() - - # ... = await connect(...) - - def __await__(self) -> Generator[Any, None, WebSocketClientProtocol]: - # Create a suitable iterator by calling __await__ on a coroutine. - return self.__await_impl_timeout__().__await__() - - async def __await_impl_timeout__(self) -> WebSocketClientProtocol: - async with asyncio_timeout(self.open_timeout): - return await self.__await_impl__() - - async def __await_impl__(self) -> WebSocketClientProtocol: - for redirects in range(self.MAX_REDIRECTS_ALLOWED): - _transport, _protocol = await self._create_connection() - protocol = cast(WebSocketClientProtocol, _protocol) - try: - await protocol.handshake( - self._wsuri, - origin=protocol.origin, - available_extensions=protocol.available_extensions, - available_subprotocols=protocol.available_subprotocols, - extra_headers=protocol.extra_headers, - ) - except RedirectHandshake as exc: - protocol.fail_connection() - await protocol.wait_closed() - self.handle_redirect(exc.uri) - # Avoid leaking a connected socket when the handshake fails. - except (Exception, asyncio.CancelledError): - protocol.fail_connection() - await protocol.wait_closed() - raise - else: - self.protocol = protocol - return protocol - else: - raise SecurityError("too many redirects") - - # ... = yield from connect(...) - remove when dropping Python < 3.10 - - __iter__ = __await__ - - -connect = Connect - - -def unix_connect( - path: Optional[str] = None, - uri: str = "ws://localhost/", - **kwargs: Any, -) -> Connect: - """ - Similar to :func:`connect`, but for connecting to a Unix socket. - - This function builds upon the event loop's - :meth:`~asyncio.loop.create_unix_connection` method. - - It is only available on Unix. - - It's mainly useful for debugging servers listening on Unix sockets. - - Args: - path: File system path to the Unix socket. - uri: URI of the WebSocket server; the host is used in the TLS - handshake for secure connections and in the ``Host`` header. - - """ - return connect(uri=uri, path=path, unix=True, **kwargs) diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/compatibility.py b/venv/lib/python3.11/site-packages/websockets/legacy/compatibility.py deleted file mode 100644 index 6bd01e7..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/compatibility.py +++ /dev/null @@ -1,12 +0,0 @@ -from __future__ import annotations - -import sys - - -__all__ = ["asyncio_timeout"] - - -if sys.version_info[:2] >= (3, 11): - from asyncio import timeout as asyncio_timeout # noqa: F401 -else: - from .async_timeout import timeout as asyncio_timeout # noqa: F401 diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/framing.py b/venv/lib/python3.11/site-packages/websockets/legacy/framing.py deleted file mode 100644 index b77b869..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/framing.py +++ /dev/null @@ -1,176 +0,0 @@ -from __future__ import annotations - -import struct -from typing import Any, Awaitable, Callable, NamedTuple, Optional, Sequence, Tuple - -from .. import extensions, frames -from ..exceptions import PayloadTooBig, ProtocolError - - -try: - from ..speedups import apply_mask -except ImportError: - from ..utils import apply_mask - - -class Frame(NamedTuple): - fin: bool - opcode: frames.Opcode - data: bytes - rsv1: bool = False - rsv2: bool = False - rsv3: bool = False - - @property - def new_frame(self) -> frames.Frame: - return frames.Frame( - self.opcode, - self.data, - self.fin, - self.rsv1, - self.rsv2, - self.rsv3, - ) - - def __str__(self) -> str: - return str(self.new_frame) - - def check(self) -> None: - return self.new_frame.check() - - @classmethod - async def read( - cls, - reader: Callable[[int], Awaitable[bytes]], - *, - mask: bool, - max_size: Optional[int] = None, - extensions: Optional[Sequence[extensions.Extension]] = None, - ) -> Frame: - """ - Read a WebSocket frame. - - Args: - reader: Coroutine that reads exactly the requested number of - bytes, unless the end of file is reached. - mask: Whether the frame should be masked i.e. whether the read - happens on the server side. - max_size: Maximum payload size in bytes. - extensions: List of extensions, applied in reverse order. - - Raises: - PayloadTooBig: If the frame exceeds ``max_size``. - ProtocolError: If the frame contains incorrect values. - - """ - - # Read the header. - data = await reader(2) - head1, head2 = struct.unpack("!BB", data) - - # While not Pythonic, this is marginally faster than calling bool(). - fin = True if head1 & 0b10000000 else False - rsv1 = True if head1 & 0b01000000 else False - rsv2 = True if head1 & 0b00100000 else False - rsv3 = True if head1 & 0b00010000 else False - - try: - opcode = frames.Opcode(head1 & 0b00001111) - except ValueError as exc: - raise ProtocolError("invalid opcode") from exc - - if (True if head2 & 0b10000000 else False) != mask: - raise ProtocolError("incorrect masking") - - length = head2 & 0b01111111 - if length == 126: - data = await reader(2) - (length,) = struct.unpack("!H", data) - elif length == 127: - data = await reader(8) - (length,) = struct.unpack("!Q", data) - if max_size is not None and length > max_size: - raise PayloadTooBig(f"over size limit ({length} > {max_size} bytes)") - if mask: - mask_bits = await reader(4) - - # Read the data. - data = await reader(length) - if mask: - data = apply_mask(data, mask_bits) - - new_frame = frames.Frame(opcode, data, fin, rsv1, rsv2, rsv3) - - if extensions is None: - extensions = [] - for extension in reversed(extensions): - new_frame = extension.decode(new_frame, max_size=max_size) - - new_frame.check() - - return cls( - new_frame.fin, - new_frame.opcode, - new_frame.data, - new_frame.rsv1, - new_frame.rsv2, - new_frame.rsv3, - ) - - def write( - self, - write: Callable[[bytes], Any], - *, - mask: bool, - extensions: Optional[Sequence[extensions.Extension]] = None, - ) -> None: - """ - Write a WebSocket frame. - - Args: - frame: Frame to write. - write: Function that writes bytes. - mask: Whether the frame should be masked i.e. whether the write - happens on the client side. - extensions: List of extensions, applied in order. - - Raises: - ProtocolError: If the frame contains incorrect values. - - """ - # The frame is written in a single call to write in order to prevent - # TCP fragmentation. See #68 for details. This also makes it safe to - # send frames concurrently from multiple coroutines. - write(self.new_frame.serialize(mask=mask, extensions=extensions)) - - -# Backwards compatibility with previously documented public APIs -from ..frames import ( # noqa: E402, F401, I001 - Close, - prepare_ctrl as encode_data, - prepare_data, -) - - -def parse_close(data: bytes) -> Tuple[int, str]: - """ - Parse the payload from a close frame. - - Returns: - Close code and reason. - - Raises: - ProtocolError: If data is ill-formed. - UnicodeDecodeError: If the reason isn't valid UTF-8. - - """ - close = Close.parse(data) - return close.code, close.reason - - -def serialize_close(code: int, reason: str) -> bytes: - """ - Serialize the payload for a close frame. - - """ - return Close(code, reason).serialize() diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/handshake.py b/venv/lib/python3.11/site-packages/websockets/legacy/handshake.py deleted file mode 100644 index ad8faf0..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/handshake.py +++ /dev/null @@ -1,165 +0,0 @@ -from __future__ import annotations - -import base64 -import binascii -from typing import List - -from ..datastructures import Headers, MultipleValuesError -from ..exceptions import InvalidHeader, InvalidHeaderValue, InvalidUpgrade -from ..headers import parse_connection, parse_upgrade -from ..typing import ConnectionOption, UpgradeProtocol -from ..utils import accept_key as accept, generate_key - - -__all__ = ["build_request", "check_request", "build_response", "check_response"] - - -def build_request(headers: Headers) -> str: - """ - Build a handshake request to send to the server. - - Update request headers passed in argument. - - Args: - headers: Handshake request headers. - - Returns: - str: ``key`` that must be passed to :func:`check_response`. - - """ - key = generate_key() - headers["Upgrade"] = "websocket" - headers["Connection"] = "Upgrade" - headers["Sec-WebSocket-Key"] = key - headers["Sec-WebSocket-Version"] = "13" - return key - - -def check_request(headers: Headers) -> str: - """ - Check a handshake request received from the client. - - This function doesn't verify that the request is an HTTP/1.1 or higher GET - request and doesn't perform ``Host`` and ``Origin`` checks. These controls - are usually performed earlier in the HTTP request handling code. They're - the responsibility of the caller. - - Args: - headers: Handshake request headers. - - Returns: - str: ``key`` that must be passed to :func:`build_response`. - - Raises: - InvalidHandshake: If the handshake request is invalid. - Then, the server must return a 400 Bad Request error. - - """ - connection: List[ConnectionOption] = sum( - [parse_connection(value) for value in headers.get_all("Connection")], [] - ) - - if not any(value.lower() == "upgrade" for value in connection): - raise InvalidUpgrade("Connection", ", ".join(connection)) - - upgrade: List[UpgradeProtocol] = sum( - [parse_upgrade(value) for value in headers.get_all("Upgrade")], [] - ) - - # For compatibility with non-strict implementations, ignore case when - # checking the Upgrade header. The RFC always uses "websocket", except - # in section 11.2. (IANA registration) where it uses "WebSocket". - if not (len(upgrade) == 1 and upgrade[0].lower() == "websocket"): - raise InvalidUpgrade("Upgrade", ", ".join(upgrade)) - - try: - s_w_key = headers["Sec-WebSocket-Key"] - except KeyError as exc: - raise InvalidHeader("Sec-WebSocket-Key") from exc - except MultipleValuesError as exc: - raise InvalidHeader( - "Sec-WebSocket-Key", "more than one Sec-WebSocket-Key header found" - ) from exc - - try: - raw_key = base64.b64decode(s_w_key.encode(), validate=True) - except binascii.Error as exc: - raise InvalidHeaderValue("Sec-WebSocket-Key", s_w_key) from exc - if len(raw_key) != 16: - raise InvalidHeaderValue("Sec-WebSocket-Key", s_w_key) - - try: - s_w_version = headers["Sec-WebSocket-Version"] - except KeyError as exc: - raise InvalidHeader("Sec-WebSocket-Version") from exc - except MultipleValuesError as exc: - raise InvalidHeader( - "Sec-WebSocket-Version", "more than one Sec-WebSocket-Version header found" - ) from exc - - if s_w_version != "13": - raise InvalidHeaderValue("Sec-WebSocket-Version", s_w_version) - - return s_w_key - - -def build_response(headers: Headers, key: str) -> None: - """ - Build a handshake response to send to the client. - - Update response headers passed in argument. - - Args: - headers: Handshake response headers. - key: Returned by :func:`check_request`. - - """ - headers["Upgrade"] = "websocket" - headers["Connection"] = "Upgrade" - headers["Sec-WebSocket-Accept"] = accept(key) - - -def check_response(headers: Headers, key: str) -> None: - """ - Check a handshake response received from the server. - - This function doesn't verify that the response is an HTTP/1.1 or higher - response with a 101 status code. These controls are the responsibility of - the caller. - - Args: - headers: Handshake response headers. - key: Returned by :func:`build_request`. - - Raises: - InvalidHandshake: If the handshake response is invalid. - - """ - connection: List[ConnectionOption] = sum( - [parse_connection(value) for value in headers.get_all("Connection")], [] - ) - - if not any(value.lower() == "upgrade" for value in connection): - raise InvalidUpgrade("Connection", " ".join(connection)) - - upgrade: List[UpgradeProtocol] = sum( - [parse_upgrade(value) for value in headers.get_all("Upgrade")], [] - ) - - # For compatibility with non-strict implementations, ignore case when - # checking the Upgrade header. The RFC always uses "websocket", except - # in section 11.2. (IANA registration) where it uses "WebSocket". - if not (len(upgrade) == 1 and upgrade[0].lower() == "websocket"): - raise InvalidUpgrade("Upgrade", ", ".join(upgrade)) - - try: - s_w_accept = headers["Sec-WebSocket-Accept"] - except KeyError as exc: - raise InvalidHeader("Sec-WebSocket-Accept") from exc - except MultipleValuesError as exc: - raise InvalidHeader( - "Sec-WebSocket-Accept", "more than one Sec-WebSocket-Accept header found" - ) from exc - - if s_w_accept != accept(key): - raise InvalidHeaderValue("Sec-WebSocket-Accept", s_w_accept) diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/http.py b/venv/lib/python3.11/site-packages/websockets/legacy/http.py deleted file mode 100644 index 2ac7f70..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/http.py +++ /dev/null @@ -1,201 +0,0 @@ -from __future__ import annotations - -import asyncio -import re -from typing import Tuple - -from ..datastructures import Headers -from ..exceptions import SecurityError - - -__all__ = ["read_request", "read_response"] - -MAX_HEADERS = 128 -MAX_LINE = 8192 - - -def d(value: bytes) -> str: - """ - Decode a bytestring for interpolating into an error message. - - """ - return value.decode(errors="backslashreplace") - - -# See https://www.rfc-editor.org/rfc/rfc7230.html#appendix-B. - -# Regex for validating header names. - -_token_re = re.compile(rb"[-!#$%&\'*+.^_`|~0-9a-zA-Z]+") - -# Regex for validating header values. - -# We don't attempt to support obsolete line folding. - -# Include HTAB (\x09), SP (\x20), VCHAR (\x21-\x7e), obs-text (\x80-\xff). - -# The ABNF is complicated because it attempts to express that optional -# whitespace is ignored. We strip whitespace and don't revalidate that. - -# See also https://www.rfc-editor.org/errata_search.php?rfc=7230&eid=4189 - -_value_re = re.compile(rb"[\x09\x20-\x7e\x80-\xff]*") - - -async def read_request(stream: asyncio.StreamReader) -> Tuple[str, Headers]: - """ - Read an HTTP/1.1 GET request and return ``(path, headers)``. - - ``path`` isn't URL-decoded or validated in any way. - - ``path`` and ``headers`` are expected to contain only ASCII characters. - Other characters are represented with surrogate escapes. - - :func:`read_request` doesn't attempt to read the request body because - WebSocket handshake requests don't have one. If the request contains a - body, it may be read from ``stream`` after this coroutine returns. - - Args: - stream: Input to read the request from. - - Raises: - EOFError: If the connection is closed without a full HTTP request. - SecurityError: If the request exceeds a security limit. - ValueError: If the request isn't well formatted. - - """ - # https://www.rfc-editor.org/rfc/rfc7230.html#section-3.1.1 - - # Parsing is simple because fixed values are expected for method and - # version and because path isn't checked. Since WebSocket software tends - # to implement HTTP/1.1 strictly, there's little need for lenient parsing. - - try: - request_line = await read_line(stream) - except EOFError as exc: - raise EOFError("connection closed while reading HTTP request line") from exc - - try: - method, raw_path, version = request_line.split(b" ", 2) - except ValueError: # not enough values to unpack (expected 3, got 1-2) - raise ValueError(f"invalid HTTP request line: {d(request_line)}") from None - - if method != b"GET": - raise ValueError(f"unsupported HTTP method: {d(method)}") - if version != b"HTTP/1.1": - raise ValueError(f"unsupported HTTP version: {d(version)}") - path = raw_path.decode("ascii", "surrogateescape") - - headers = await read_headers(stream) - - return path, headers - - -async def read_response(stream: asyncio.StreamReader) -> Tuple[int, str, Headers]: - """ - Read an HTTP/1.1 response and return ``(status_code, reason, headers)``. - - ``reason`` and ``headers`` are expected to contain only ASCII characters. - Other characters are represented with surrogate escapes. - - :func:`read_request` doesn't attempt to read the response body because - WebSocket handshake responses don't have one. If the response contains a - body, it may be read from ``stream`` after this coroutine returns. - - Args: - stream: Input to read the response from. - - Raises: - EOFError: If the connection is closed without a full HTTP response. - SecurityError: If the response exceeds a security limit. - ValueError: If the response isn't well formatted. - - """ - # https://www.rfc-editor.org/rfc/rfc7230.html#section-3.1.2 - - # As in read_request, parsing is simple because a fixed value is expected - # for version, status_code is a 3-digit number, and reason can be ignored. - - try: - status_line = await read_line(stream) - except EOFError as exc: - raise EOFError("connection closed while reading HTTP status line") from exc - - try: - version, raw_status_code, raw_reason = status_line.split(b" ", 2) - except ValueError: # not enough values to unpack (expected 3, got 1-2) - raise ValueError(f"invalid HTTP status line: {d(status_line)}") from None - - if version != b"HTTP/1.1": - raise ValueError(f"unsupported HTTP version: {d(version)}") - try: - status_code = int(raw_status_code) - except ValueError: # invalid literal for int() with base 10 - raise ValueError(f"invalid HTTP status code: {d(raw_status_code)}") from None - if not 100 <= status_code < 1000: - raise ValueError(f"unsupported HTTP status code: {d(raw_status_code)}") - if not _value_re.fullmatch(raw_reason): - raise ValueError(f"invalid HTTP reason phrase: {d(raw_reason)}") - reason = raw_reason.decode() - - headers = await read_headers(stream) - - return status_code, reason, headers - - -async def read_headers(stream: asyncio.StreamReader) -> Headers: - """ - Read HTTP headers from ``stream``. - - Non-ASCII characters are represented with surrogate escapes. - - """ - # https://www.rfc-editor.org/rfc/rfc7230.html#section-3.2 - - # We don't attempt to support obsolete line folding. - - headers = Headers() - for _ in range(MAX_HEADERS + 1): - try: - line = await read_line(stream) - except EOFError as exc: - raise EOFError("connection closed while reading HTTP headers") from exc - if line == b"": - break - - try: - raw_name, raw_value = line.split(b":", 1) - except ValueError: # not enough values to unpack (expected 2, got 1) - raise ValueError(f"invalid HTTP header line: {d(line)}") from None - if not _token_re.fullmatch(raw_name): - raise ValueError(f"invalid HTTP header name: {d(raw_name)}") - raw_value = raw_value.strip(b" \t") - if not _value_re.fullmatch(raw_value): - raise ValueError(f"invalid HTTP header value: {d(raw_value)}") - - name = raw_name.decode("ascii") # guaranteed to be ASCII at this point - value = raw_value.decode("ascii", "surrogateescape") - headers[name] = value - - else: - raise SecurityError("too many HTTP headers") - - return headers - - -async def read_line(stream: asyncio.StreamReader) -> bytes: - """ - Read a single line from ``stream``. - - CRLF is stripped from the return value. - - """ - # Security: this is bounded by the StreamReader's limit (default = 32Â KiB). - line = await stream.readline() - # Security: this guarantees header values are small (hard-coded = 8Â KiB) - if len(line) > MAX_LINE: - raise SecurityError("line too long") - # Not mandatory but safe - https://www.rfc-editor.org/rfc/rfc7230.html#section-3.5 - if not line.endswith(b"\r\n"): - raise EOFError("line without CRLF") - return line[:-2] diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/protocol.py b/venv/lib/python3.11/site-packages/websockets/legacy/protocol.py deleted file mode 100644 index 19cee0e..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/protocol.py +++ /dev/null @@ -1,1645 +0,0 @@ -from __future__ import annotations - -import asyncio -import codecs -import collections -import logging -import random -import ssl -import struct -import sys -import time -import uuid -import warnings -from typing import ( - Any, - AsyncIterable, - AsyncIterator, - Awaitable, - Callable, - Deque, - Dict, - Iterable, - List, - Mapping, - Optional, - Tuple, - Union, - cast, -) - -from ..datastructures import Headers -from ..exceptions import ( - ConnectionClosed, - ConnectionClosedError, - ConnectionClosedOK, - InvalidState, - PayloadTooBig, - ProtocolError, -) -from ..extensions import Extension -from ..frames import ( - OK_CLOSE_CODES, - OP_BINARY, - OP_CLOSE, - OP_CONT, - OP_PING, - OP_PONG, - OP_TEXT, - Close, - CloseCode, - Opcode, - prepare_ctrl, - prepare_data, -) -from ..protocol import State -from ..typing import Data, LoggerLike, Subprotocol -from .compatibility import asyncio_timeout -from .framing import Frame - - -__all__ = ["WebSocketCommonProtocol", "broadcast"] - - -# In order to ensure consistency, the code always checks the current value of -# WebSocketCommonProtocol.state before assigning a new value and never yields -# between the check and the assignment. - - -class WebSocketCommonProtocol(asyncio.Protocol): - """ - WebSocket connection. - - :class:`WebSocketCommonProtocol` provides APIs shared between WebSocket - servers and clients. You shouldn't use it directly. Instead, use - :class:`~websockets.client.WebSocketClientProtocol` or - :class:`~websockets.server.WebSocketServerProtocol`. - - This documentation focuses on low-level details that aren't covered in the - documentation of :class:`~websockets.client.WebSocketClientProtocol` and - :class:`~websockets.server.WebSocketServerProtocol` for the sake of - simplicity. - - Once the connection is open, a Ping_ frame is sent every ``ping_interval`` - seconds. This serves as a keepalive. It helps keeping the connection open, - especially in the presence of proxies with short timeouts on inactive - connections. Set ``ping_interval`` to :obj:`None` to disable this behavior. - - .. _Ping: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.5.2 - - If the corresponding Pong_ frame isn't received within ``ping_timeout`` - seconds, the connection is considered unusable and is closed with code 1011. - This ensures that the remote endpoint remains responsive. Set - ``ping_timeout`` to :obj:`None` to disable this behavior. - - .. _Pong: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.5.3 - - See the discussion of :doc:`timeouts <../../topics/timeouts>` for details. - - The ``close_timeout`` parameter defines a maximum wait time for completing - the closing handshake and terminating the TCP connection. For legacy - reasons, :meth:`close` completes in at most ``5 * close_timeout`` seconds - for clients and ``4 * close_timeout`` for servers. - - ``close_timeout`` is a parameter of the protocol because websockets usually - calls :meth:`close` implicitly upon exit: - - * on the client side, when using :func:`~websockets.client.connect` as a - context manager; - * on the server side, when the connection handler terminates. - - To apply a timeout to any other API, wrap it in :func:`~asyncio.timeout` or - :func:`~asyncio.wait_for`. - - The ``max_size`` parameter enforces the maximum size for incoming messages - in bytes. The default value is 1Â MiB. If a larger message is received, - :meth:`recv` will raise :exc:`~websockets.exceptions.ConnectionClosedError` - and the connection will be closed with code 1009. - - The ``max_queue`` parameter sets the maximum length of the queue that - holds incoming messages. The default value is ``32``. Messages are added - to an in-memory queue when they're received; then :meth:`recv` pops from - that queue. In order to prevent excessive memory consumption when - messages are received faster than they can be processed, the queue must - be bounded. If the queue fills up, the protocol stops processing incoming - data until :meth:`recv` is called. In this situation, various receive - buffers (at least in :mod:`asyncio` and in the OS) will fill up, then the - TCP receive window will shrink, slowing down transmission to avoid packet - loss. - - Since Python can use up to 4 bytes of memory to represent a single - character, each connection may use up to ``4 * max_size * max_queue`` - bytes of memory to store incoming messages. By default, this is 128Â MiB. - You may want to lower the limits, depending on your application's - requirements. - - The ``read_limit`` argument sets the high-water limit of the buffer for - incoming bytes. The low-water limit is half the high-water limit. The - default value is 64Â KiB, half of asyncio's default (based on the current - implementation of :class:`~asyncio.StreamReader`). - - The ``write_limit`` argument sets the high-water limit of the buffer for - outgoing bytes. The low-water limit is a quarter of the high-water limit. - The default value is 64Â KiB, equal to asyncio's default (based on the - current implementation of ``FlowControlMixin``). - - See the discussion of :doc:`memory usage <../../topics/memory>` for details. - - Args: - logger: Logger for this server. - It defaults to ``logging.getLogger("websockets.protocol")``. - See the :doc:`logging guide <../../topics/logging>` for details. - ping_interval: Delay between keepalive pings in seconds. - :obj:`None` disables keepalive pings. - ping_timeout: Timeout for keepalive pings in seconds. - :obj:`None` disables timeouts. - close_timeout: Timeout for closing the connection in seconds. - For legacy reasons, the actual timeout is 4 or 5 times larger. - max_size: Maximum size of incoming messages in bytes. - :obj:`None` disables the limit. - max_queue: Maximum number of incoming messages in receive buffer. - :obj:`None` disables the limit. - read_limit: High-water mark of read buffer in bytes. - write_limit: High-water mark of write buffer in bytes. - - """ - - # There are only two differences between the client-side and server-side - # behavior: masking the payload and closing the underlying TCP connection. - # Set is_client = True/False and side = "client"/"server" to pick a side. - is_client: bool - side: str = "undefined" - - def __init__( - self, - *, - logger: Optional[LoggerLike] = None, - ping_interval: Optional[float] = 20, - ping_timeout: Optional[float] = 20, - close_timeout: Optional[float] = None, - max_size: Optional[int] = 2**20, - max_queue: Optional[int] = 2**5, - read_limit: int = 2**16, - write_limit: int = 2**16, - # The following arguments are kept only for backwards compatibility. - host: Optional[str] = None, - port: Optional[int] = None, - secure: Optional[bool] = None, - legacy_recv: bool = False, - loop: Optional[asyncio.AbstractEventLoop] = None, - timeout: Optional[float] = None, - ) -> None: - if legacy_recv: # pragma: no cover - warnings.warn("legacy_recv is deprecated", DeprecationWarning) - - # Backwards compatibility: close_timeout used to be called timeout. - if timeout is None: - timeout = 10 - else: - warnings.warn("rename timeout to close_timeout", DeprecationWarning) - # If both are specified, timeout is ignored. - if close_timeout is None: - close_timeout = timeout - - # Backwards compatibility: the loop parameter used to be supported. - if loop is None: - loop = asyncio.get_event_loop() - else: - warnings.warn("remove loop argument", DeprecationWarning) - - self.ping_interval = ping_interval - self.ping_timeout = ping_timeout - self.close_timeout = close_timeout - self.max_size = max_size - self.max_queue = max_queue - self.read_limit = read_limit - self.write_limit = write_limit - - # Unique identifier. For logs. - self.id: uuid.UUID = uuid.uuid4() - """Unique identifier of the connection. Useful in logs.""" - - # Logger or LoggerAdapter for this connection. - if logger is None: - logger = logging.getLogger("websockets.protocol") - self.logger: LoggerLike = logging.LoggerAdapter(logger, {"websocket": self}) - """Logger for this connection.""" - - # Track if DEBUG is enabled. Shortcut logging calls if it isn't. - self.debug = logger.isEnabledFor(logging.DEBUG) - - self.loop = loop - - self._host = host - self._port = port - self._secure = secure - self.legacy_recv = legacy_recv - - # Configure read buffer limits. The high-water limit is defined by - # ``self.read_limit``. The ``limit`` argument controls the line length - # limit and half the buffer limit of :class:`~asyncio.StreamReader`. - # That's why it must be set to half of ``self.read_limit``. - self.reader = asyncio.StreamReader(limit=read_limit // 2, loop=loop) - - # Copied from asyncio.FlowControlMixin - self._paused = False - self._drain_waiter: Optional[asyncio.Future[None]] = None - - self._drain_lock = asyncio.Lock() - - # This class implements the data transfer and closing handshake, which - # are shared between the client-side and the server-side. - # Subclasses implement the opening handshake and, on success, execute - # :meth:`connection_open` to change the state to OPEN. - self.state = State.CONNECTING - if self.debug: - self.logger.debug("= connection is CONNECTING") - - # HTTP protocol parameters. - self.path: str - """Path of the opening handshake request.""" - self.request_headers: Headers - """Opening handshake request headers.""" - self.response_headers: Headers - """Opening handshake response headers.""" - - # WebSocket protocol parameters. - self.extensions: List[Extension] = [] - self.subprotocol: Optional[Subprotocol] = None - """Subprotocol, if one was negotiated.""" - - # Close code and reason, set when a close frame is sent or received. - self.close_rcvd: Optional[Close] = None - self.close_sent: Optional[Close] = None - self.close_rcvd_then_sent: Optional[bool] = None - - # Completed when the connection state becomes CLOSED. Translates the - # :meth:`connection_lost` callback to a :class:`~asyncio.Future` - # that can be awaited. (Other :class:`~asyncio.Protocol` callbacks are - # translated by ``self.stream_reader``). - self.connection_lost_waiter: asyncio.Future[None] = loop.create_future() - - # Queue of received messages. - self.messages: Deque[Data] = collections.deque() - self._pop_message_waiter: Optional[asyncio.Future[None]] = None - self._put_message_waiter: Optional[asyncio.Future[None]] = None - - # Protect sending fragmented messages. - self._fragmented_message_waiter: Optional[asyncio.Future[None]] = None - - # Mapping of ping IDs to pong waiters, in chronological order. - self.pings: Dict[bytes, Tuple[asyncio.Future[float], float]] = {} - - self.latency: float = 0 - """ - Latency of the connection, in seconds. - - This value is updated after sending a ping frame and receiving a - matching pong frame. Before the first ping, :attr:`latency` is ``0``. - - By default, websockets enables a :ref:`keepalive <keepalive>` mechanism - that sends ping frames automatically at regular intervals. You can also - send ping frames and measure latency with :meth:`ping`. - """ - - # Task running the data transfer. - self.transfer_data_task: asyncio.Task[None] - - # Exception that occurred during data transfer, if any. - self.transfer_data_exc: Optional[BaseException] = None - - # Task sending keepalive pings. - self.keepalive_ping_task: asyncio.Task[None] - - # Task closing the TCP connection. - self.close_connection_task: asyncio.Task[None] - - # Copied from asyncio.FlowControlMixin - async def _drain_helper(self) -> None: # pragma: no cover - if self.connection_lost_waiter.done(): - raise ConnectionResetError("Connection lost") - if not self._paused: - return - waiter = self._drain_waiter - assert waiter is None or waiter.cancelled() - waiter = self.loop.create_future() - self._drain_waiter = waiter - await waiter - - # Copied from asyncio.StreamWriter - async def _drain(self) -> None: # pragma: no cover - if self.reader is not None: - exc = self.reader.exception() - if exc is not None: - raise exc - if self.transport is not None: - if self.transport.is_closing(): - # Yield to the event loop so connection_lost() may be - # called. Without this, _drain_helper() would return - # immediately, and code that calls - # write(...); yield from drain() - # in a loop would never call connection_lost(), so it - # would not see an error when the socket is closed. - await asyncio.sleep(0) - await self._drain_helper() - - def connection_open(self) -> None: - """ - Callback when the WebSocket opening handshake completes. - - Enter the OPEN state and start the data transfer phase. - - """ - # 4.1. The WebSocket Connection is Established. - assert self.state is State.CONNECTING - self.state = State.OPEN - if self.debug: - self.logger.debug("= connection is OPEN") - # Start the task that receives incoming WebSocket messages. - self.transfer_data_task = self.loop.create_task(self.transfer_data()) - # Start the task that sends pings at regular intervals. - self.keepalive_ping_task = self.loop.create_task(self.keepalive_ping()) - # Start the task that eventually closes the TCP connection. - self.close_connection_task = self.loop.create_task(self.close_connection()) - - @property - def host(self) -> Optional[str]: - alternative = "remote_address" if self.is_client else "local_address" - warnings.warn(f"use {alternative}[0] instead of host", DeprecationWarning) - return self._host - - @property - def port(self) -> Optional[int]: - alternative = "remote_address" if self.is_client else "local_address" - warnings.warn(f"use {alternative}[1] instead of port", DeprecationWarning) - return self._port - - @property - def secure(self) -> Optional[bool]: - warnings.warn("don't use secure", DeprecationWarning) - return self._secure - - # Public API - - @property - def local_address(self) -> Any: - """ - Local address of the connection. - - For IPv4 connections, this is a ``(host, port)`` tuple. - - The format of the address depends on the address family; - see :meth:`~socket.socket.getsockname`. - - :obj:`None` if the TCP connection isn't established yet. - - """ - try: - transport = self.transport - except AttributeError: - return None - else: - return transport.get_extra_info("sockname") - - @property - def remote_address(self) -> Any: - """ - Remote address of the connection. - - For IPv4 connections, this is a ``(host, port)`` tuple. - - The format of the address depends on the address family; - see :meth:`~socket.socket.getpeername`. - - :obj:`None` if the TCP connection isn't established yet. - - """ - try: - transport = self.transport - except AttributeError: - return None - else: - return transport.get_extra_info("peername") - - @property - def open(self) -> bool: - """ - :obj:`True` when the connection is open; :obj:`False` otherwise. - - This attribute may be used to detect disconnections. However, this - approach is discouraged per the EAFP_ principle. Instead, you should - handle :exc:`~websockets.exceptions.ConnectionClosed` exceptions. - - .. _EAFP: https://docs.python.org/3/glossary.html#term-eafp - - """ - return self.state is State.OPEN and not self.transfer_data_task.done() - - @property - def closed(self) -> bool: - """ - :obj:`True` when the connection is closed; :obj:`False` otherwise. - - Be aware that both :attr:`open` and :attr:`closed` are :obj:`False` - during the opening and closing sequences. - - """ - return self.state is State.CLOSED - - @property - def close_code(self) -> Optional[int]: - """ - WebSocket close code, defined in `section 7.1.5 of RFC 6455`_. - - .. _section 7.1.5 of RFC 6455: - https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.5 - - :obj:`None` if the connection isn't closed yet. - - """ - if self.state is not State.CLOSED: - return None - elif self.close_rcvd is None: - return CloseCode.ABNORMAL_CLOSURE - else: - return self.close_rcvd.code - - @property - def close_reason(self) -> Optional[str]: - """ - WebSocket close reason, defined in `section 7.1.6 of RFC 6455`_. - - .. _section 7.1.6 of RFC 6455: - https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.6 - - :obj:`None` if the connection isn't closed yet. - - """ - if self.state is not State.CLOSED: - return None - elif self.close_rcvd is None: - return "" - else: - return self.close_rcvd.reason - - async def __aiter__(self) -> AsyncIterator[Data]: - """ - Iterate on incoming messages. - - The iterator exits normally when the connection is closed with the close - code 1000 (OK) or 1001 (going away) or without a close code. - - It raises a :exc:`~websockets.exceptions.ConnectionClosedError` - exception when the connection is closed with any other code. - - """ - try: - while True: - yield await self.recv() - except ConnectionClosedOK: - return - - async def recv(self) -> Data: - """ - Receive the next message. - - When the connection is closed, :meth:`recv` raises - :exc:`~websockets.exceptions.ConnectionClosed`. Specifically, it raises - :exc:`~websockets.exceptions.ConnectionClosedOK` after a normal - connection closure and - :exc:`~websockets.exceptions.ConnectionClosedError` after a protocol - error or a network failure. This is how you detect the end of the - message stream. - - Canceling :meth:`recv` is safe. There's no risk of losing the next - message. The next invocation of :meth:`recv` will return it. - - This makes it possible to enforce a timeout by wrapping :meth:`recv` in - :func:`~asyncio.timeout` or :func:`~asyncio.wait_for`. - - Returns: - Data: A string (:class:`str`) for a Text_ frame. A bytestring - (:class:`bytes`) for a Binary_ frame. - - .. _Text: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.6 - .. _Binary: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.6 - - Raises: - ConnectionClosed: When the connection is closed. - RuntimeError: If two coroutines call :meth:`recv` concurrently. - - """ - if self._pop_message_waiter is not None: - raise RuntimeError( - "cannot call recv while another coroutine " - "is already waiting for the next message" - ) - - # Don't await self.ensure_open() here: - # - messages could be available in the queue even if the connection - # is closed; - # - messages could be received before the closing frame even if the - # connection is closing. - - # Wait until there's a message in the queue (if necessary) or the - # connection is closed. - while len(self.messages) <= 0: - pop_message_waiter: asyncio.Future[None] = self.loop.create_future() - self._pop_message_waiter = pop_message_waiter - try: - # If asyncio.wait() is canceled, it doesn't cancel - # pop_message_waiter and self.transfer_data_task. - await asyncio.wait( - [pop_message_waiter, self.transfer_data_task], - return_when=asyncio.FIRST_COMPLETED, - ) - finally: - self._pop_message_waiter = None - - # If asyncio.wait(...) exited because self.transfer_data_task - # completed before receiving a new message, raise a suitable - # exception (or return None if legacy_recv is enabled). - if not pop_message_waiter.done(): - if self.legacy_recv: - return None # type: ignore - else: - # Wait until the connection is closed to raise - # ConnectionClosed with the correct code and reason. - await self.ensure_open() - - # Pop a message from the queue. - message = self.messages.popleft() - - # Notify transfer_data(). - if self._put_message_waiter is not None: - self._put_message_waiter.set_result(None) - self._put_message_waiter = None - - return message - - async def send( - self, - message: Union[Data, Iterable[Data], AsyncIterable[Data]], - ) -> None: - """ - Send a message. - - A string (:class:`str`) is sent as a Text_ frame. A bytestring or - bytes-like object (:class:`bytes`, :class:`bytearray`, or - :class:`memoryview`) is sent as a Binary_ frame. - - .. _Text: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.6 - .. _Binary: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.6 - - :meth:`send` also accepts an iterable or an asynchronous iterable of - strings, bytestrings, or bytes-like objects to enable fragmentation_. - Each item is treated as a message fragment and sent in its own frame. - All items must be of the same type, or else :meth:`send` will raise a - :exc:`TypeError` and the connection will be closed. - - .. _fragmentation: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.4 - - :meth:`send` rejects dict-like objects because this is often an error. - (If you want to send the keys of a dict-like object as fragments, call - its :meth:`~dict.keys` method and pass the result to :meth:`send`.) - - Canceling :meth:`send` is discouraged. Instead, you should close the - connection with :meth:`close`. Indeed, there are only two situations - where :meth:`send` may yield control to the event loop and then get - canceled; in both cases, :meth:`close` has the same effect and is - more clear: - - 1. The write buffer is full. If you don't want to wait until enough - data is sent, your only alternative is to close the connection. - :meth:`close` will likely time out then abort the TCP connection. - 2. ``message`` is an asynchronous iterator that yields control. - Stopping in the middle of a fragmented message will cause a - protocol error and the connection will be closed. - - When the connection is closed, :meth:`send` raises - :exc:`~websockets.exceptions.ConnectionClosed`. Specifically, it - raises :exc:`~websockets.exceptions.ConnectionClosedOK` after a normal - connection closure and - :exc:`~websockets.exceptions.ConnectionClosedError` after a protocol - error or a network failure. - - Args: - message (Union[Data, Iterable[Data], AsyncIterable[Data]): message - to send. - - Raises: - ConnectionClosed: When the connection is closed. - TypeError: If ``message`` doesn't have a supported type. - - """ - await self.ensure_open() - - # While sending a fragmented message, prevent sending other messages - # until all fragments are sent. - while self._fragmented_message_waiter is not None: - await asyncio.shield(self._fragmented_message_waiter) - - # Unfragmented message -- this case must be handled first because - # strings and bytes-like objects are iterable. - - if isinstance(message, (str, bytes, bytearray, memoryview)): - opcode, data = prepare_data(message) - await self.write_frame(True, opcode, data) - - # Catch a common mistake -- passing a dict to send(). - - elif isinstance(message, Mapping): - raise TypeError("data is a dict-like object") - - # Fragmented message -- regular iterator. - - elif isinstance(message, Iterable): - # Work around https://github.com/python/mypy/issues/6227 - message = cast(Iterable[Data], message) - - iter_message = iter(message) - try: - fragment = next(iter_message) - except StopIteration: - return - opcode, data = prepare_data(fragment) - - self._fragmented_message_waiter = asyncio.Future() - try: - # First fragment. - await self.write_frame(False, opcode, data) - - # Other fragments. - for fragment in iter_message: - confirm_opcode, data = prepare_data(fragment) - if confirm_opcode != opcode: - raise TypeError("data contains inconsistent types") - await self.write_frame(False, OP_CONT, data) - - # Final fragment. - await self.write_frame(True, OP_CONT, b"") - - except (Exception, asyncio.CancelledError): - # We're half-way through a fragmented message and we can't - # complete it. This makes the connection unusable. - self.fail_connection(CloseCode.INTERNAL_ERROR) - raise - - finally: - self._fragmented_message_waiter.set_result(None) - self._fragmented_message_waiter = None - - # Fragmented message -- asynchronous iterator - - elif isinstance(message, AsyncIterable): - # Implement aiter_message = aiter(message) without aiter - # Work around https://github.com/python/mypy/issues/5738 - aiter_message = cast( - Callable[[AsyncIterable[Data]], AsyncIterator[Data]], - type(message).__aiter__, - )(message) - try: - # Implement fragment = anext(aiter_message) without anext - # Work around https://github.com/python/mypy/issues/5738 - fragment = await cast( - Callable[[AsyncIterator[Data]], Awaitable[Data]], - type(aiter_message).__anext__, - )(aiter_message) - except StopAsyncIteration: - return - opcode, data = prepare_data(fragment) - - self._fragmented_message_waiter = asyncio.Future() - try: - # First fragment. - await self.write_frame(False, opcode, data) - - # Other fragments. - async for fragment in aiter_message: - confirm_opcode, data = prepare_data(fragment) - if confirm_opcode != opcode: - raise TypeError("data contains inconsistent types") - await self.write_frame(False, OP_CONT, data) - - # Final fragment. - await self.write_frame(True, OP_CONT, b"") - - except (Exception, asyncio.CancelledError): - # We're half-way through a fragmented message and we can't - # complete it. This makes the connection unusable. - self.fail_connection(CloseCode.INTERNAL_ERROR) - raise - - finally: - self._fragmented_message_waiter.set_result(None) - self._fragmented_message_waiter = None - - else: - raise TypeError("data must be str, bytes-like, or iterable") - - async def close( - self, - code: int = CloseCode.NORMAL_CLOSURE, - reason: str = "", - ) -> None: - """ - Perform the closing handshake. - - :meth:`close` waits for the other end to complete the handshake and - for the TCP connection to terminate. As a consequence, there's no need - to await :meth:`wait_closed` after :meth:`close`. - - :meth:`close` is idempotent: it doesn't do anything once the - connection is closed. - - Wrapping :func:`close` in :func:`~asyncio.create_task` is safe, given - that errors during connection termination aren't particularly useful. - - Canceling :meth:`close` is discouraged. If it takes too long, you can - set a shorter ``close_timeout``. If you don't want to wait, let the - Python process exit, then the OS will take care of closing the TCP - connection. - - Args: - code: WebSocket close code. - reason: WebSocket close reason. - - """ - try: - async with asyncio_timeout(self.close_timeout): - await self.write_close_frame(Close(code, reason)) - except asyncio.TimeoutError: - # If the close frame cannot be sent because the send buffers - # are full, the closing handshake won't complete anyway. - # Fail the connection to shut down faster. - self.fail_connection() - - # If no close frame is received within the timeout, asyncio_timeout() - # cancels the data transfer task and raises TimeoutError. - - # If close() is called multiple times concurrently and one of these - # calls hits the timeout, the data transfer task will be canceled. - # Other calls will receive a CancelledError here. - - try: - # If close() is canceled during the wait, self.transfer_data_task - # is canceled before the timeout elapses. - async with asyncio_timeout(self.close_timeout): - await self.transfer_data_task - except (asyncio.TimeoutError, asyncio.CancelledError): - pass - - # Wait for the close connection task to close the TCP connection. - await asyncio.shield(self.close_connection_task) - - async def wait_closed(self) -> None: - """ - Wait until the connection is closed. - - This coroutine is identical to the :attr:`closed` attribute, except it - can be awaited. - - This can make it easier to detect connection termination, regardless - of its cause, in tasks that interact with the WebSocket connection. - - """ - await asyncio.shield(self.connection_lost_waiter) - - async def ping(self, data: Optional[Data] = None) -> Awaitable[None]: - """ - Send a Ping_. - - .. _Ping: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.5.2 - - A ping may serve as a keepalive, as a check that the remote endpoint - received all messages up to this point, or to measure :attr:`latency`. - - Canceling :meth:`ping` is discouraged. If :meth:`ping` doesn't return - immediately, it means the write buffer is full. If you don't want to - wait, you should close the connection. - - Canceling the :class:`~asyncio.Future` returned by :meth:`ping` has no - effect. - - Args: - data (Optional[Data]): payload of the ping; a string will be - encoded to UTF-8; or :obj:`None` to generate a payload - containing four random bytes. - - Returns: - ~asyncio.Future[float]: A future that will be completed when the - corresponding pong is received. You can ignore it if you don't - intend to wait. The result of the future is the latency of the - connection in seconds. - - :: - - pong_waiter = await ws.ping() - # only if you want to wait for the corresponding pong - latency = await pong_waiter - - Raises: - ConnectionClosed: When the connection is closed. - RuntimeError: If another ping was sent with the same data and - the corresponding pong wasn't received yet. - - """ - await self.ensure_open() - - if data is not None: - data = prepare_ctrl(data) - - # Protect against duplicates if a payload is explicitly set. - if data in self.pings: - raise RuntimeError("already waiting for a pong with the same data") - - # Generate a unique random payload otherwise. - while data is None or data in self.pings: - data = struct.pack("!I", random.getrandbits(32)) - - pong_waiter = self.loop.create_future() - # Resolution of time.monotonic() may be too low on Windows. - ping_timestamp = time.perf_counter() - self.pings[data] = (pong_waiter, ping_timestamp) - - await self.write_frame(True, OP_PING, data) - - return asyncio.shield(pong_waiter) - - async def pong(self, data: Data = b"") -> None: - """ - Send a Pong_. - - .. _Pong: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.5.3 - - An unsolicited pong may serve as a unidirectional heartbeat. - - Canceling :meth:`pong` is discouraged. If :meth:`pong` doesn't return - immediately, it means the write buffer is full. If you don't want to - wait, you should close the connection. - - Args: - data (Data): Payload of the pong. A string will be encoded to - UTF-8. - - Raises: - ConnectionClosed: When the connection is closed. - - """ - await self.ensure_open() - - data = prepare_ctrl(data) - - await self.write_frame(True, OP_PONG, data) - - # Private methods - no guarantees. - - def connection_closed_exc(self) -> ConnectionClosed: - exc: ConnectionClosed - if ( - self.close_rcvd is not None - and self.close_rcvd.code in OK_CLOSE_CODES - and self.close_sent is not None - and self.close_sent.code in OK_CLOSE_CODES - ): - exc = ConnectionClosedOK( - self.close_rcvd, - self.close_sent, - self.close_rcvd_then_sent, - ) - else: - exc = ConnectionClosedError( - self.close_rcvd, - self.close_sent, - self.close_rcvd_then_sent, - ) - # Chain to the exception that terminated data transfer, if any. - exc.__cause__ = self.transfer_data_exc - return exc - - async def ensure_open(self) -> None: - """ - Check that the WebSocket connection is open. - - Raise :exc:`~websockets.exceptions.ConnectionClosed` if it isn't. - - """ - # Handle cases from most common to least common for performance. - if self.state is State.OPEN: - # If self.transfer_data_task exited without a closing handshake, - # self.close_connection_task may be closing the connection, going - # straight from OPEN to CLOSED. - if self.transfer_data_task.done(): - await asyncio.shield(self.close_connection_task) - raise self.connection_closed_exc() - else: - return - - if self.state is State.CLOSED: - raise self.connection_closed_exc() - - if self.state is State.CLOSING: - # If we started the closing handshake, wait for its completion to - # get the proper close code and reason. self.close_connection_task - # will complete within 4 or 5 * close_timeout after close(). The - # CLOSING state also occurs when failing the connection. In that - # case self.close_connection_task will complete even faster. - await asyncio.shield(self.close_connection_task) - raise self.connection_closed_exc() - - # Control may only reach this point in buggy third-party subclasses. - assert self.state is State.CONNECTING - raise InvalidState("WebSocket connection isn't established yet") - - async def transfer_data(self) -> None: - """ - Read incoming messages and put them in a queue. - - This coroutine runs in a task until the closing handshake is started. - - """ - try: - while True: - message = await self.read_message() - - # Exit the loop when receiving a close frame. - if message is None: - break - - # Wait until there's room in the queue (if necessary). - if self.max_queue is not None: - while len(self.messages) >= self.max_queue: - self._put_message_waiter = self.loop.create_future() - try: - await asyncio.shield(self._put_message_waiter) - finally: - self._put_message_waiter = None - - # Put the message in the queue. - self.messages.append(message) - - # Notify recv(). - if self._pop_message_waiter is not None: - self._pop_message_waiter.set_result(None) - self._pop_message_waiter = None - - except asyncio.CancelledError as exc: - self.transfer_data_exc = exc - # If fail_connection() cancels this task, avoid logging the error - # twice and failing the connection again. - raise - - except ProtocolError as exc: - self.transfer_data_exc = exc - self.fail_connection(CloseCode.PROTOCOL_ERROR) - - except (ConnectionError, TimeoutError, EOFError, ssl.SSLError) as exc: - # Reading data with self.reader.readexactly may raise: - # - most subclasses of ConnectionError if the TCP connection - # breaks, is reset, or is aborted; - # - TimeoutError if the TCP connection times out; - # - IncompleteReadError, a subclass of EOFError, if fewer - # bytes are available than requested; - # - ssl.SSLError if the other side infringes the TLS protocol. - self.transfer_data_exc = exc - self.fail_connection(CloseCode.ABNORMAL_CLOSURE) - - except UnicodeDecodeError as exc: - self.transfer_data_exc = exc - self.fail_connection(CloseCode.INVALID_DATA) - - except PayloadTooBig as exc: - self.transfer_data_exc = exc - self.fail_connection(CloseCode.MESSAGE_TOO_BIG) - - except Exception as exc: - # This shouldn't happen often because exceptions expected under - # regular circumstances are handled above. If it does, consider - # catching and handling more exceptions. - self.logger.error("data transfer failed", exc_info=True) - - self.transfer_data_exc = exc - self.fail_connection(CloseCode.INTERNAL_ERROR) - - async def read_message(self) -> Optional[Data]: - """ - Read a single message from the connection. - - Re-assemble data frames if the message is fragmented. - - Return :obj:`None` when the closing handshake is started. - - """ - frame = await self.read_data_frame(max_size=self.max_size) - - # A close frame was received. - if frame is None: - return None - - if frame.opcode == OP_TEXT: - text = True - elif frame.opcode == OP_BINARY: - text = False - else: # frame.opcode == OP_CONT - raise ProtocolError("unexpected opcode") - - # Shortcut for the common case - no fragmentation - if frame.fin: - return frame.data.decode("utf-8") if text else frame.data - - # 5.4. Fragmentation - fragments: List[Data] = [] - max_size = self.max_size - if text: - decoder_factory = codecs.getincrementaldecoder("utf-8") - decoder = decoder_factory(errors="strict") - if max_size is None: - - def append(frame: Frame) -> None: - nonlocal fragments - fragments.append(decoder.decode(frame.data, frame.fin)) - - else: - - def append(frame: Frame) -> None: - nonlocal fragments, max_size - fragments.append(decoder.decode(frame.data, frame.fin)) - assert isinstance(max_size, int) - max_size -= len(frame.data) - - else: - if max_size is None: - - def append(frame: Frame) -> None: - nonlocal fragments - fragments.append(frame.data) - - else: - - def append(frame: Frame) -> None: - nonlocal fragments, max_size - fragments.append(frame.data) - assert isinstance(max_size, int) - max_size -= len(frame.data) - - append(frame) - - while not frame.fin: - frame = await self.read_data_frame(max_size=max_size) - if frame is None: - raise ProtocolError("incomplete fragmented message") - if frame.opcode != OP_CONT: - raise ProtocolError("unexpected opcode") - append(frame) - - return ("" if text else b"").join(fragments) - - async def read_data_frame(self, max_size: Optional[int]) -> Optional[Frame]: - """ - Read a single data frame from the connection. - - Process control frames received before the next data frame. - - Return :obj:`None` if a close frame is encountered before any data frame. - - """ - # 6.2. Receiving Data - while True: - frame = await self.read_frame(max_size) - - # 5.5. Control Frames - if frame.opcode == OP_CLOSE: - # 7.1.5. The WebSocket Connection Close Code - # 7.1.6. The WebSocket Connection Close Reason - self.close_rcvd = Close.parse(frame.data) - if self.close_sent is not None: - self.close_rcvd_then_sent = False - try: - # Echo the original data instead of re-serializing it with - # Close.serialize() because that fails when the close frame - # is empty and Close.parse() synthesizes a 1005 close code. - await self.write_close_frame(self.close_rcvd, frame.data) - except ConnectionClosed: - # Connection closed before we could echo the close frame. - pass - return None - - elif frame.opcode == OP_PING: - # Answer pings, unless connection is CLOSING. - if self.state is State.OPEN: - try: - await self.pong(frame.data) - except ConnectionClosed: - # Connection closed while draining write buffer. - pass - - elif frame.opcode == OP_PONG: - if frame.data in self.pings: - pong_timestamp = time.perf_counter() - # Sending a pong for only the most recent ping is legal. - # Acknowledge all previous pings too in that case. - ping_id = None - ping_ids = [] - for ping_id, (pong_waiter, ping_timestamp) in self.pings.items(): - ping_ids.append(ping_id) - if not pong_waiter.done(): - pong_waiter.set_result(pong_timestamp - ping_timestamp) - if ping_id == frame.data: - self.latency = pong_timestamp - ping_timestamp - break - else: - raise AssertionError("solicited pong not found in pings") - # Remove acknowledged pings from self.pings. - for ping_id in ping_ids: - del self.pings[ping_id] - - # 5.6. Data Frames - else: - return frame - - async def read_frame(self, max_size: Optional[int]) -> Frame: - """ - Read a single frame from the connection. - - """ - frame = await Frame.read( - self.reader.readexactly, - mask=not self.is_client, - max_size=max_size, - extensions=self.extensions, - ) - if self.debug: - self.logger.debug("< %s", frame) - return frame - - def write_frame_sync(self, fin: bool, opcode: int, data: bytes) -> None: - frame = Frame(fin, Opcode(opcode), data) - if self.debug: - self.logger.debug("> %s", frame) - frame.write( - self.transport.write, - mask=self.is_client, - extensions=self.extensions, - ) - - async def drain(self) -> None: - try: - # drain() cannot be called concurrently by multiple coroutines: - # http://bugs.python.org/issue29930. Remove this lock when no - # version of Python where this bugs exists is supported anymore. - async with self._drain_lock: - # Handle flow control automatically. - await self._drain() - except ConnectionError: - # Terminate the connection if the socket died. - self.fail_connection() - # Wait until the connection is closed to raise ConnectionClosed - # with the correct code and reason. - await self.ensure_open() - - async def write_frame( - self, fin: bool, opcode: int, data: bytes, *, _state: int = State.OPEN - ) -> None: - # Defensive assertion for protocol compliance. - if self.state is not _state: # pragma: no cover - raise InvalidState( - f"Cannot write to a WebSocket in the {self.state.name} state" - ) - self.write_frame_sync(fin, opcode, data) - await self.drain() - - async def write_close_frame( - self, close: Close, data: Optional[bytes] = None - ) -> None: - """ - Write a close frame if and only if the connection state is OPEN. - - This dedicated coroutine must be used for writing close frames to - ensure that at most one close frame is sent on a given connection. - - """ - # Test and set the connection state before sending the close frame to - # avoid sending two frames in case of concurrent calls. - if self.state is State.OPEN: - # 7.1.3. The WebSocket Closing Handshake is Started - self.state = State.CLOSING - if self.debug: - self.logger.debug("= connection is CLOSING") - - self.close_sent = close - if self.close_rcvd is not None: - self.close_rcvd_then_sent = True - if data is None: - data = close.serialize() - - # 7.1.2. Start the WebSocket Closing Handshake - await self.write_frame(True, OP_CLOSE, data, _state=State.CLOSING) - - async def keepalive_ping(self) -> None: - """ - Send a Ping frame and wait for a Pong frame at regular intervals. - - This coroutine exits when the connection terminates and one of the - following happens: - - - :meth:`ping` raises :exc:`ConnectionClosed`, or - - :meth:`close_connection` cancels :attr:`keepalive_ping_task`. - - """ - if self.ping_interval is None: - return - - try: - while True: - await asyncio.sleep(self.ping_interval) - - # ping() raises CancelledError if the connection is closed, - # when close_connection() cancels self.keepalive_ping_task. - - # ping() raises ConnectionClosed if the connection is lost, - # when connection_lost() calls abort_pings(). - - self.logger.debug("% sending keepalive ping") - pong_waiter = await self.ping() - - if self.ping_timeout is not None: - try: - async with asyncio_timeout(self.ping_timeout): - await pong_waiter - self.logger.debug("% received keepalive pong") - except asyncio.TimeoutError: - if self.debug: - self.logger.debug("! timed out waiting for keepalive pong") - self.fail_connection( - CloseCode.INTERNAL_ERROR, - "keepalive ping timeout", - ) - break - - except ConnectionClosed: - pass - - except Exception: - self.logger.error("keepalive ping failed", exc_info=True) - - async def close_connection(self) -> None: - """ - 7.1.1. Close the WebSocket Connection - - When the opening handshake succeeds, :meth:`connection_open` starts - this coroutine in a task. It waits for the data transfer phase to - complete then it closes the TCP connection cleanly. - - When the opening handshake fails, :meth:`fail_connection` does the - same. There's no data transfer phase in that case. - - """ - try: - # Wait for the data transfer phase to complete. - if hasattr(self, "transfer_data_task"): - try: - await self.transfer_data_task - except asyncio.CancelledError: - pass - - # Cancel the keepalive ping task. - if hasattr(self, "keepalive_ping_task"): - self.keepalive_ping_task.cancel() - - # A client should wait for a TCP close from the server. - if self.is_client and hasattr(self, "transfer_data_task"): - if await self.wait_for_connection_lost(): - return - if self.debug: - self.logger.debug("! timed out waiting for TCP close") - - # Half-close the TCP connection if possible (when there's no TLS). - if self.transport.can_write_eof(): - if self.debug: - self.logger.debug("x half-closing TCP connection") - # write_eof() doesn't document which exceptions it raises. - # "[Errno 107] Transport endpoint is not connected" happens - # but it isn't completely clear under which circumstances. - # uvloop can raise RuntimeError here. - try: - self.transport.write_eof() - except (OSError, RuntimeError): # pragma: no cover - pass - - if await self.wait_for_connection_lost(): - return - if self.debug: - self.logger.debug("! timed out waiting for TCP close") - - finally: - # The try/finally ensures that the transport never remains open, - # even if this coroutine is canceled (for example). - await self.close_transport() - - async def close_transport(self) -> None: - """ - Close the TCP connection. - - """ - # If connection_lost() was called, the TCP connection is closed. - # However, if TLS is enabled, the transport still needs closing. - # Else asyncio complains: ResourceWarning: unclosed transport. - if self.connection_lost_waiter.done() and self.transport.is_closing(): - return - - # Close the TCP connection. Buffers are flushed asynchronously. - if self.debug: - self.logger.debug("x closing TCP connection") - self.transport.close() - - if await self.wait_for_connection_lost(): - return - if self.debug: - self.logger.debug("! timed out waiting for TCP close") - - # Abort the TCP connection. Buffers are discarded. - if self.debug: - self.logger.debug("x aborting TCP connection") - # Due to a bug in coverage, this is erroneously reported as not covered. - self.transport.abort() # pragma: no cover - - # connection_lost() is called quickly after aborting. - await self.wait_for_connection_lost() - - async def wait_for_connection_lost(self) -> bool: - """ - Wait until the TCP connection is closed or ``self.close_timeout`` elapses. - - Return :obj:`True` if the connection is closed and :obj:`False` - otherwise. - - """ - if not self.connection_lost_waiter.done(): - try: - async with asyncio_timeout(self.close_timeout): - await asyncio.shield(self.connection_lost_waiter) - except asyncio.TimeoutError: - pass - # Re-check self.connection_lost_waiter.done() synchronously because - # connection_lost() could run between the moment the timeout occurs - # and the moment this coroutine resumes running. - return self.connection_lost_waiter.done() - - def fail_connection( - self, - code: int = CloseCode.ABNORMAL_CLOSURE, - reason: str = "", - ) -> None: - """ - 7.1.7. Fail the WebSocket Connection - - This requires: - - 1. Stopping all processing of incoming data, which means cancelling - :attr:`transfer_data_task`. The close code will be 1006 unless a - close frame was received earlier. - - 2. Sending a close frame with an appropriate code if the opening - handshake succeeded and the other side is likely to process it. - - 3. Closing the connection. :meth:`close_connection` takes care of - this once :attr:`transfer_data_task` exits after being canceled. - - (The specification describes these steps in the opposite order.) - - """ - if self.debug: - self.logger.debug("! failing connection with code %d", code) - - # Cancel transfer_data_task if the opening handshake succeeded. - # cancel() is idempotent and ignored if the task is done already. - if hasattr(self, "transfer_data_task"): - self.transfer_data_task.cancel() - - # Send a close frame when the state is OPEN (a close frame was already - # sent if it's CLOSING), except when failing the connection because of - # an error reading from or writing to the network. - # Don't send a close frame if the connection is broken. - if code != CloseCode.ABNORMAL_CLOSURE and self.state is State.OPEN: - close = Close(code, reason) - - # Write the close frame without draining the write buffer. - - # Keeping fail_connection() synchronous guarantees it can't - # get stuck and simplifies the implementation of the callers. - # Not drainig the write buffer is acceptable in this context. - - # This duplicates a few lines of code from write_close_frame(). - - self.state = State.CLOSING - if self.debug: - self.logger.debug("= connection is CLOSING") - - # If self.close_rcvd was set, the connection state would be - # CLOSING. Therefore self.close_rcvd isn't set and we don't - # have to set self.close_rcvd_then_sent. - assert self.close_rcvd is None - self.close_sent = close - - self.write_frame_sync(True, OP_CLOSE, close.serialize()) - - # Start close_connection_task if the opening handshake didn't succeed. - if not hasattr(self, "close_connection_task"): - self.close_connection_task = self.loop.create_task(self.close_connection()) - - def abort_pings(self) -> None: - """ - Raise ConnectionClosed in pending keepalive pings. - - They'll never receive a pong once the connection is closed. - - """ - assert self.state is State.CLOSED - exc = self.connection_closed_exc() - - for pong_waiter, _ping_timestamp in self.pings.values(): - pong_waiter.set_exception(exc) - # If the exception is never retrieved, it will be logged when ping - # is garbage-collected. This is confusing for users. - # Given that ping is done (with an exception), canceling it does - # nothing, but it prevents logging the exception. - pong_waiter.cancel() - - # asyncio.Protocol methods - - def connection_made(self, transport: asyncio.BaseTransport) -> None: - """ - Configure write buffer limits. - - The high-water limit is defined by ``self.write_limit``. - - The low-water limit currently defaults to ``self.write_limit // 4`` in - :meth:`~asyncio.WriteTransport.set_write_buffer_limits`, which should - be all right for reasonable use cases of this library. - - This is the earliest point where we can get hold of the transport, - which means it's the best point for configuring it. - - """ - transport = cast(asyncio.Transport, transport) - transport.set_write_buffer_limits(self.write_limit) - self.transport = transport - - # Copied from asyncio.StreamReaderProtocol - self.reader.set_transport(transport) - - def connection_lost(self, exc: Optional[Exception]) -> None: - """ - 7.1.4. The WebSocket Connection is Closed. - - """ - self.state = State.CLOSED - self.logger.debug("= connection is CLOSED") - - self.abort_pings() - - # If self.connection_lost_waiter isn't pending, that's a bug, because: - # - it's set only here in connection_lost() which is called only once; - # - it must never be canceled. - self.connection_lost_waiter.set_result(None) - - if True: # pragma: no cover - # Copied from asyncio.StreamReaderProtocol - if self.reader is not None: - if exc is None: - self.reader.feed_eof() - else: - self.reader.set_exception(exc) - - # Copied from asyncio.FlowControlMixin - # Wake up the writer if currently paused. - if not self._paused: - return - waiter = self._drain_waiter - if waiter is None: - return - self._drain_waiter = None - if waiter.done(): - return - if exc is None: - waiter.set_result(None) - else: - waiter.set_exception(exc) - - def pause_writing(self) -> None: # pragma: no cover - assert not self._paused - self._paused = True - - def resume_writing(self) -> None: # pragma: no cover - assert self._paused - self._paused = False - - waiter = self._drain_waiter - if waiter is not None: - self._drain_waiter = None - if not waiter.done(): - waiter.set_result(None) - - def data_received(self, data: bytes) -> None: - self.reader.feed_data(data) - - def eof_received(self) -> None: - """ - Close the transport after receiving EOF. - - The WebSocket protocol has its own closing handshake: endpoints close - the TCP or TLS connection after sending and receiving a close frame. - - As a consequence, they never need to write after receiving EOF, so - there's no reason to keep the transport open by returning :obj:`True`. - - Besides, that doesn't work on TLS connections. - - """ - self.reader.feed_eof() - - -def broadcast( - websockets: Iterable[WebSocketCommonProtocol], - message: Data, - raise_exceptions: bool = False, -) -> None: - """ - Broadcast a message to several WebSocket connections. - - A string (:class:`str`) is sent as a Text_ frame. A bytestring or bytes-like - object (:class:`bytes`, :class:`bytearray`, or :class:`memoryview`) is sent - as a Binary_ frame. - - .. _Text: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.6 - .. _Binary: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.6 - - :func:`broadcast` pushes the message synchronously to all connections even - if their write buffers are overflowing. There's no backpressure. - - If you broadcast messages faster than a connection can handle them, messages - will pile up in its write buffer until the connection times out. Keep - ``ping_interval`` and ``ping_timeout`` low to prevent excessive memory usage - from slow connections. - - Unlike :meth:`~websockets.server.WebSocketServerProtocol.send`, - :func:`broadcast` doesn't support sending fragmented messages. Indeed, - fragmentation is useful for sending large messages without buffering them in - memory, while :func:`broadcast` buffers one copy per connection as fast as - possible. - - :func:`broadcast` skips connections that aren't open in order to avoid - errors on connections where the closing handshake is in progress. - - :func:`broadcast` ignores failures to write the message on some connections. - It continues writing to other connections. On Python 3.11 and above, you - may set ``raise_exceptions`` to :obj:`True` to record failures and raise all - exceptions in a :pep:`654` :exc:`ExceptionGroup`. - - Args: - websockets: WebSocket connections to which the message will be sent. - message: Message to send. - raise_exceptions: Whether to raise an exception in case of failures. - - Raises: - TypeError: If ``message`` doesn't have a supported type. - - """ - if not isinstance(message, (str, bytes, bytearray, memoryview)): - raise TypeError("data must be str or bytes-like") - - if raise_exceptions: - if sys.version_info[:2] < (3, 11): # pragma: no cover - raise ValueError("raise_exceptions requires at least Python 3.11") - exceptions = [] - - opcode, data = prepare_data(message) - - for websocket in websockets: - if websocket.state is not State.OPEN: - continue - - if websocket._fragmented_message_waiter is not None: - if raise_exceptions: - exception = RuntimeError("sending a fragmented message") - exceptions.append(exception) - else: - websocket.logger.warning( - "skipped broadcast: sending a fragmented message", - ) - - try: - websocket.write_frame_sync(True, opcode, data) - except Exception as write_exception: - if raise_exceptions: - exception = RuntimeError("failed to write message") - exception.__cause__ = write_exception - exceptions.append(exception) - else: - websocket.logger.warning( - "skipped broadcast: failed to write message", - exc_info=True, - ) - - if raise_exceptions: - raise ExceptionGroup("skipped broadcast", exceptions) diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/server.py b/venv/lib/python3.11/site-packages/websockets/legacy/server.py deleted file mode 100644 index 7c24dd7..0000000 --- a/venv/lib/python3.11/site-packages/websockets/legacy/server.py +++ /dev/null @@ -1,1185 +0,0 @@ -from __future__ import annotations - -import asyncio -import email.utils -import functools -import http -import inspect -import logging -import socket -import warnings -from types import TracebackType -from typing import ( - Any, - Awaitable, - Callable, - Generator, - Iterable, - List, - Optional, - Sequence, - Set, - Tuple, - Type, - Union, - cast, -) - -from ..datastructures import Headers, HeadersLike, MultipleValuesError -from ..exceptions import ( - AbortHandshake, - InvalidHandshake, - InvalidHeader, - InvalidMessage, - InvalidOrigin, - InvalidUpgrade, - NegotiationError, -) -from ..extensions import Extension, ServerExtensionFactory -from ..extensions.permessage_deflate import enable_server_permessage_deflate -from ..headers import ( - build_extension, - parse_extension, - parse_subprotocol, - validate_subprotocols, -) -from ..http import USER_AGENT -from ..protocol import State -from ..typing import ExtensionHeader, LoggerLike, Origin, StatusLike, Subprotocol -from .compatibility import asyncio_timeout -from .handshake import build_response, check_request -from .http import read_request -from .protocol import WebSocketCommonProtocol - - -__all__ = ["serve", "unix_serve", "WebSocketServerProtocol", "WebSocketServer"] - - -HeadersLikeOrCallable = Union[HeadersLike, Callable[[str, Headers], HeadersLike]] - -HTTPResponse = Tuple[StatusLike, HeadersLike, bytes] - - -class WebSocketServerProtocol(WebSocketCommonProtocol): - """ - WebSocket server connection. - - :class:`WebSocketServerProtocol` provides :meth:`recv` and :meth:`send` - coroutines for receiving and sending messages. - - It supports asynchronous iteration to receive messages:: - - async for message in websocket: - await process(message) - - The iterator exits normally when the connection is closed with close code - 1000 (OK) or 1001 (going away) or without a close code. It raises - a :exc:`~websockets.exceptions.ConnectionClosedError` when the connection - is closed with any other code. - - You may customize the opening handshake in a subclass by - overriding :meth:`process_request` or :meth:`select_subprotocol`. - - Args: - ws_server: WebSocket server that created this connection. - - See :func:`serve` for the documentation of ``ws_handler``, ``logger``, ``origins``, - ``extensions``, ``subprotocols``, ``extra_headers``, and ``server_header``. - - See :class:`~websockets.legacy.protocol.WebSocketCommonProtocol` for the - documentation of ``ping_interval``, ``ping_timeout``, ``close_timeout``, - ``max_size``, ``max_queue``, ``read_limit``, and ``write_limit``. - - """ - - is_client = False - side = "server" - - def __init__( - self, - ws_handler: Union[ - Callable[[WebSocketServerProtocol], Awaitable[Any]], - Callable[[WebSocketServerProtocol, str], Awaitable[Any]], # deprecated - ], - ws_server: WebSocketServer, - *, - logger: Optional[LoggerLike] = None, - origins: Optional[Sequence[Optional[Origin]]] = None, - extensions: Optional[Sequence[ServerExtensionFactory]] = None, - subprotocols: Optional[Sequence[Subprotocol]] = None, - extra_headers: Optional[HeadersLikeOrCallable] = None, - server_header: Optional[str] = USER_AGENT, - process_request: Optional[ - Callable[[str, Headers], Awaitable[Optional[HTTPResponse]]] - ] = None, - select_subprotocol: Optional[ - Callable[[Sequence[Subprotocol], Sequence[Subprotocol]], Subprotocol] - ] = None, - open_timeout: Optional[float] = 10, - **kwargs: Any, - ) -> None: - if logger is None: - logger = logging.getLogger("websockets.server") - super().__init__(logger=logger, **kwargs) - # For backwards compatibility with 6.0 or earlier. - if origins is not None and "" in origins: - warnings.warn("use None instead of '' in origins", DeprecationWarning) - origins = [None if origin == "" else origin for origin in origins] - # For backwards compatibility with 10.0 or earlier. Done here in - # addition to serve to trigger the deprecation warning on direct - # use of WebSocketServerProtocol. - self.ws_handler = remove_path_argument(ws_handler) - self.ws_server = ws_server - self.origins = origins - self.available_extensions = extensions - self.available_subprotocols = subprotocols - self.extra_headers = extra_headers - self.server_header = server_header - self._process_request = process_request - self._select_subprotocol = select_subprotocol - self.open_timeout = open_timeout - - def connection_made(self, transport: asyncio.BaseTransport) -> None: - """ - Register connection and initialize a task to handle it. - - """ - super().connection_made(transport) - # Register the connection with the server before creating the handler - # task. Registering at the beginning of the handler coroutine would - # create a race condition between the creation of the task, which - # schedules its execution, and the moment the handler starts running. - self.ws_server.register(self) - self.handler_task = self.loop.create_task(self.handler()) - - async def handler(self) -> None: - """ - Handle the lifecycle of a WebSocket connection. - - Since this method doesn't have a caller able to handle exceptions, it - attempts to log relevant ones and guarantees that the TCP connection is - closed before exiting. - - """ - try: - try: - async with asyncio_timeout(self.open_timeout): - await self.handshake( - origins=self.origins, - available_extensions=self.available_extensions, - available_subprotocols=self.available_subprotocols, - extra_headers=self.extra_headers, - ) - except asyncio.TimeoutError: # pragma: no cover - raise - except ConnectionError: - raise - except Exception as exc: - if isinstance(exc, AbortHandshake): - status, headers, body = exc.status, exc.headers, exc.body - elif isinstance(exc, InvalidOrigin): - if self.debug: - self.logger.debug("! invalid origin", exc_info=True) - status, headers, body = ( - http.HTTPStatus.FORBIDDEN, - Headers(), - f"Failed to open a WebSocket connection: {exc}.\n".encode(), - ) - elif isinstance(exc, InvalidUpgrade): - if self.debug: - self.logger.debug("! invalid upgrade", exc_info=True) - status, headers, body = ( - http.HTTPStatus.UPGRADE_REQUIRED, - Headers([("Upgrade", "websocket")]), - ( - f"Failed to open a WebSocket connection: {exc}.\n" - f"\n" - f"You cannot access a WebSocket server directly " - f"with a browser. You need a WebSocket client.\n" - ).encode(), - ) - elif isinstance(exc, InvalidHandshake): - if self.debug: - self.logger.debug("! invalid handshake", exc_info=True) - status, headers, body = ( - http.HTTPStatus.BAD_REQUEST, - Headers(), - f"Failed to open a WebSocket connection: {exc}.\n".encode(), - ) - else: - self.logger.error("opening handshake failed", exc_info=True) - status, headers, body = ( - http.HTTPStatus.INTERNAL_SERVER_ERROR, - Headers(), - ( - b"Failed to open a WebSocket connection.\n" - b"See server log for more information.\n" - ), - ) - - headers.setdefault("Date", email.utils.formatdate(usegmt=True)) - if self.server_header is not None: - headers.setdefault("Server", self.server_header) - - headers.setdefault("Content-Length", str(len(body))) - headers.setdefault("Content-Type", "text/plain") - headers.setdefault("Connection", "close") - - self.write_http_response(status, headers, body) - self.logger.info( - "connection rejected (%d %s)", status.value, status.phrase - ) - await self.close_transport() - return - - try: - await self.ws_handler(self) - except Exception: - self.logger.error("connection handler failed", exc_info=True) - if not self.closed: - self.fail_connection(1011) - raise - - try: - await self.close() - except ConnectionError: - raise - except Exception: - self.logger.error("closing handshake failed", exc_info=True) - raise - - except Exception: - # Last-ditch attempt to avoid leaking connections on errors. - try: - self.transport.close() - except Exception: # pragma: no cover - pass - - finally: - # Unregister the connection with the server when the handler task - # terminates. Registration is tied to the lifecycle of the handler - # task because the server waits for tasks attached to registered - # connections before terminating. - self.ws_server.unregister(self) - self.logger.info("connection closed") - - async def read_http_request(self) -> Tuple[str, Headers]: - """ - Read request line and headers from the HTTP request. - - If the request contains a body, it may be read from ``self.reader`` - after this coroutine returns. - - Raises: - InvalidMessage: if the HTTP message is malformed or isn't an - HTTP/1.1 GET request. - - """ - try: - path, headers = await read_request(self.reader) - except asyncio.CancelledError: # pragma: no cover - raise - except Exception as exc: - raise InvalidMessage("did not receive a valid HTTP request") from exc - - if self.debug: - self.logger.debug("< GET %s HTTP/1.1", path) - for key, value in headers.raw_items(): - self.logger.debug("< %s: %s", key, value) - - self.path = path - self.request_headers = headers - - return path, headers - - def write_http_response( - self, status: http.HTTPStatus, headers: Headers, body: Optional[bytes] = None - ) -> None: - """ - Write status line and headers to the HTTP response. - - This coroutine is also able to write a response body. - - """ - self.response_headers = headers - - if self.debug: - self.logger.debug("> HTTP/1.1 %d %s", status.value, status.phrase) - for key, value in headers.raw_items(): - self.logger.debug("> %s: %s", key, value) - if body is not None: - self.logger.debug("> [body] (%d bytes)", len(body)) - - # Since the status line and headers only contain ASCII characters, - # we can keep this simple. - response = f"HTTP/1.1 {status.value} {status.phrase}\r\n" - response += str(headers) - - self.transport.write(response.encode()) - - if body is not None: - self.transport.write(body) - - async def process_request( - self, path: str, request_headers: Headers - ) -> Optional[HTTPResponse]: - """ - Intercept the HTTP request and return an HTTP response if appropriate. - - You may override this method in a :class:`WebSocketServerProtocol` - subclass, for example: - - * to return an HTTP 200 OK response on a given path; then a load - balancer can use this path for a health check; - * to authenticate the request and return an HTTP 401 Unauthorized or an - HTTP 403 Forbidden when authentication fails. - - You may also override this method with the ``process_request`` - argument of :func:`serve` and :class:`WebSocketServerProtocol`. This - is equivalent, except ``process_request`` won't have access to the - protocol instance, so it can't store information for later use. - - :meth:`process_request` is expected to complete quickly. If it may run - for a long time, then it should await :meth:`wait_closed` and exit if - :meth:`wait_closed` completes, or else it could prevent the server - from shutting down. - - Args: - path: request path, including optional query string. - request_headers: request headers. - - Returns: - Optional[Tuple[StatusLike, HeadersLike, bytes]]: :obj:`None` - to continue the WebSocket handshake normally. - - An HTTP response, represented by a 3-uple of the response status, - headers, and body, to abort the WebSocket handshake and return - that HTTP response instead. - - """ - if self._process_request is not None: - response = self._process_request(path, request_headers) - if isinstance(response, Awaitable): - return await response - else: - # For backwards compatibility with 7.0. - warnings.warn( - "declare process_request as a coroutine", DeprecationWarning - ) - return response - return None - - @staticmethod - def process_origin( - headers: Headers, origins: Optional[Sequence[Optional[Origin]]] = None - ) -> Optional[Origin]: - """ - Handle the Origin HTTP request header. - - Args: - headers: request headers. - origins: optional list of acceptable origins. - - Raises: - InvalidOrigin: if the origin isn't acceptable. - - """ - # "The user agent MUST NOT include more than one Origin header field" - # per https://www.rfc-editor.org/rfc/rfc6454.html#section-7.3. - try: - origin = cast(Optional[Origin], headers.get("Origin")) - except MultipleValuesError as exc: - raise InvalidHeader("Origin", "more than one Origin header found") from exc - if origins is not None: - if origin not in origins: - raise InvalidOrigin(origin) - return origin - - @staticmethod - def process_extensions( - headers: Headers, - available_extensions: Optional[Sequence[ServerExtensionFactory]], - ) -> Tuple[Optional[str], List[Extension]]: - """ - Handle the Sec-WebSocket-Extensions HTTP request header. - - Accept or reject each extension proposed in the client request. - Negotiate parameters for accepted extensions. - - Return the Sec-WebSocket-Extensions HTTP response header and the list - of accepted extensions. - - :rfc:`6455` leaves the rules up to the specification of each - :extension. - - To provide this level of flexibility, for each extension proposed by - the client, we check for a match with each extension available in the - server configuration. If no match is found, the extension is ignored. - - If several variants of the same extension are proposed by the client, - it may be accepted several times, which won't make sense in general. - Extensions must implement their own requirements. For this purpose, - the list of previously accepted extensions is provided. - - This process doesn't allow the server to reorder extensions. It can - only select a subset of the extensions proposed by the client. - - Other requirements, for example related to mandatory extensions or the - order of extensions, may be implemented by overriding this method. - - Args: - headers: request headers. - extensions: optional list of supported extensions. - - Raises: - InvalidHandshake: to abort the handshake with an HTTP 400 error. - - """ - response_header_value: Optional[str] = None - - extension_headers: List[ExtensionHeader] = [] - accepted_extensions: List[Extension] = [] - - header_values = headers.get_all("Sec-WebSocket-Extensions") - - if header_values and available_extensions: - parsed_header_values: List[ExtensionHeader] = sum( - [parse_extension(header_value) for header_value in header_values], [] - ) - - for name, request_params in parsed_header_values: - for ext_factory in available_extensions: - # Skip non-matching extensions based on their name. - if ext_factory.name != name: - continue - - # Skip non-matching extensions based on their params. - try: - response_params, extension = ext_factory.process_request_params( - request_params, accepted_extensions - ) - except NegotiationError: - continue - - # Add matching extension to the final list. - extension_headers.append((name, response_params)) - accepted_extensions.append(extension) - - # Break out of the loop once we have a match. - break - - # If we didn't break from the loop, no extension in our list - # matched what the client sent. The extension is declined. - - # Serialize extension header. - if extension_headers: - response_header_value = build_extension(extension_headers) - - return response_header_value, accepted_extensions - - # Not @staticmethod because it calls self.select_subprotocol() - def process_subprotocol( - self, headers: Headers, available_subprotocols: Optional[Sequence[Subprotocol]] - ) -> Optional[Subprotocol]: - """ - Handle the Sec-WebSocket-Protocol HTTP request header. - - Return Sec-WebSocket-Protocol HTTP response header, which is the same - as the selected subprotocol. - - Args: - headers: request headers. - available_subprotocols: optional list of supported subprotocols. - - Raises: - InvalidHandshake: to abort the handshake with an HTTP 400 error. - - """ - subprotocol: Optional[Subprotocol] = None - - header_values = headers.get_all("Sec-WebSocket-Protocol") - - if header_values and available_subprotocols: - parsed_header_values: List[Subprotocol] = sum( - [parse_subprotocol(header_value) for header_value in header_values], [] - ) - - subprotocol = self.select_subprotocol( - parsed_header_values, available_subprotocols - ) - - return subprotocol - - def select_subprotocol( - self, - client_subprotocols: Sequence[Subprotocol], - server_subprotocols: Sequence[Subprotocol], - ) -> Optional[Subprotocol]: - """ - Pick a subprotocol among those supported by the client and the server. - - If several subprotocols are available, select the preferred subprotocol - by giving equal weight to the preferences of the client and the server. - - If no subprotocol is available, proceed without a subprotocol. - - You may provide a ``select_subprotocol`` argument to :func:`serve` or - :class:`WebSocketServerProtocol` to override this logic. For example, - you could reject the handshake if the client doesn't support a - particular subprotocol, rather than accept the handshake without that - subprotocol. - - Args: - client_subprotocols: list of subprotocols offered by the client. - server_subprotocols: list of subprotocols available on the server. - - Returns: - Optional[Subprotocol]: Selected subprotocol, if a common subprotocol - was found. - - :obj:`None` to continue without a subprotocol. - - """ - if self._select_subprotocol is not None: - return self._select_subprotocol(client_subprotocols, server_subprotocols) - - subprotocols = set(client_subprotocols) & set(server_subprotocols) - if not subprotocols: - return None - return sorted( - subprotocols, - key=lambda p: client_subprotocols.index(p) + server_subprotocols.index(p), - )[0] - - async def handshake( - self, - origins: Optional[Sequence[Optional[Origin]]] = None, - available_extensions: Optional[Sequence[ServerExtensionFactory]] = None, - available_subprotocols: Optional[Sequence[Subprotocol]] = None, - extra_headers: Optional[HeadersLikeOrCallable] = None, - ) -> str: - """ - Perform the server side of the opening handshake. - - Args: - origins: list of acceptable values of the Origin HTTP header; - include :obj:`None` if the lack of an origin is acceptable. - extensions: list of supported extensions, in order in which they - should be tried. - subprotocols: list of supported subprotocols, in order of - decreasing preference. - extra_headers: arbitrary HTTP headers to add to the response when - the handshake succeeds. - - Returns: - str: path of the URI of the request. - - Raises: - InvalidHandshake: if the handshake fails. - - """ - path, request_headers = await self.read_http_request() - - # Hook for customizing request handling, for example checking - # authentication or treating some paths as plain HTTP endpoints. - early_response_awaitable = self.process_request(path, request_headers) - if isinstance(early_response_awaitable, Awaitable): - early_response = await early_response_awaitable - else: - # For backwards compatibility with 7.0. - warnings.warn("declare process_request as a coroutine", DeprecationWarning) - early_response = early_response_awaitable - - # The connection may drop while process_request is running. - if self.state is State.CLOSED: - # This subclass of ConnectionError is silently ignored in handler(). - raise BrokenPipeError("connection closed during opening handshake") - - # Change the response to a 503 error if the server is shutting down. - if not self.ws_server.is_serving(): - early_response = ( - http.HTTPStatus.SERVICE_UNAVAILABLE, - [], - b"Server is shutting down.\n", - ) - - if early_response is not None: - raise AbortHandshake(*early_response) - - key = check_request(request_headers) - - self.origin = self.process_origin(request_headers, origins) - - extensions_header, self.extensions = self.process_extensions( - request_headers, available_extensions - ) - - protocol_header = self.subprotocol = self.process_subprotocol( - request_headers, available_subprotocols - ) - - response_headers = Headers() - - build_response(response_headers, key) - - if extensions_header is not None: - response_headers["Sec-WebSocket-Extensions"] = extensions_header - - if protocol_header is not None: - response_headers["Sec-WebSocket-Protocol"] = protocol_header - - if callable(extra_headers): - extra_headers = extra_headers(path, self.request_headers) - if extra_headers is not None: - response_headers.update(extra_headers) - - response_headers.setdefault("Date", email.utils.formatdate(usegmt=True)) - if self.server_header is not None: - response_headers.setdefault("Server", self.server_header) - - self.write_http_response(http.HTTPStatus.SWITCHING_PROTOCOLS, response_headers) - - self.logger.info("connection open") - - self.connection_open() - - return path - - -class WebSocketServer: - """ - WebSocket server returned by :func:`serve`. - - This class provides the same interface as :class:`~asyncio.Server`, - notably the :meth:`~asyncio.Server.close` - and :meth:`~asyncio.Server.wait_closed` methods. - - It keeps track of WebSocket connections in order to close them properly - when shutting down. - - Args: - logger: Logger for this server. - It defaults to ``logging.getLogger("websockets.server")``. - See the :doc:`logging guide <../../topics/logging>` for details. - - """ - - def __init__(self, logger: Optional[LoggerLike] = None): - if logger is None: - logger = logging.getLogger("websockets.server") - self.logger = logger - - # Keep track of active connections. - self.websockets: Set[WebSocketServerProtocol] = set() - - # Task responsible for closing the server and terminating connections. - self.close_task: Optional[asyncio.Task[None]] = None - - # Completed when the server is closed and connections are terminated. - self.closed_waiter: asyncio.Future[None] - - def wrap(self, server: asyncio.base_events.Server) -> None: - """ - Attach to a given :class:`~asyncio.Server`. - - Since :meth:`~asyncio.loop.create_server` doesn't support injecting a - custom ``Server`` class, the easiest solution that doesn't rely on - private :mod:`asyncio` APIs is to: - - - instantiate a :class:`WebSocketServer` - - give the protocol factory a reference to that instance - - call :meth:`~asyncio.loop.create_server` with the factory - - attach the resulting :class:`~asyncio.Server` with this method - - """ - self.server = server - for sock in server.sockets: - if sock.family == socket.AF_INET: - name = "%s:%d" % sock.getsockname() - elif sock.family == socket.AF_INET6: - name = "[%s]:%d" % sock.getsockname()[:2] - elif sock.family == socket.AF_UNIX: - name = sock.getsockname() - # In the unlikely event that someone runs websockets over a - # protocol other than IP or Unix sockets, avoid crashing. - else: # pragma: no cover - name = str(sock.getsockname()) - self.logger.info("server listening on %s", name) - - # Initialized here because we need a reference to the event loop. - # This should be moved back to __init__ when dropping Python < 3.10. - self.closed_waiter = server.get_loop().create_future() - - def register(self, protocol: WebSocketServerProtocol) -> None: - """ - Register a connection with this server. - - """ - self.websockets.add(protocol) - - def unregister(self, protocol: WebSocketServerProtocol) -> None: - """ - Unregister a connection with this server. - - """ - self.websockets.remove(protocol) - - def close(self, close_connections: bool = True) -> None: - """ - Close the server. - - * Close the underlying :class:`~asyncio.Server`. - * When ``close_connections`` is :obj:`True`, which is the default, - close existing connections. Specifically: - - * Reject opening WebSocket connections with an HTTP 503 (service - unavailable) error. This happens when the server accepted the TCP - connection but didn't complete the opening handshake before closing. - * Close open WebSocket connections with close code 1001 (going away). - - * Wait until all connection handlers terminate. - - :meth:`close` is idempotent. - - """ - if self.close_task is None: - self.close_task = self.get_loop().create_task( - self._close(close_connections) - ) - - async def _close(self, close_connections: bool) -> None: - """ - Implementation of :meth:`close`. - - This calls :meth:`~asyncio.Server.close` on the underlying - :class:`~asyncio.Server` object to stop accepting new connections and - then closes open connections with close code 1001. - - """ - self.logger.info("server closing") - - # Stop accepting new connections. - self.server.close() - - # Wait until all accepted connections reach connection_made() and call - # register(). See https://bugs.python.org/issue34852 for details. - await asyncio.sleep(0) - - if close_connections: - # Close OPEN connections with close code 1001. After server.close(), - # handshake() closes OPENING connections with an HTTP 503 error. - close_tasks = [ - asyncio.create_task(websocket.close(1001)) - for websocket in self.websockets - if websocket.state is not State.CONNECTING - ] - # asyncio.wait doesn't accept an empty first argument. - if close_tasks: - await asyncio.wait(close_tasks) - - # Wait until all TCP connections are closed. - await self.server.wait_closed() - - # Wait until all connection handlers terminate. - # asyncio.wait doesn't accept an empty first argument. - if self.websockets: - await asyncio.wait( - [websocket.handler_task for websocket in self.websockets] - ) - - # Tell wait_closed() to return. - self.closed_waiter.set_result(None) - - self.logger.info("server closed") - - async def wait_closed(self) -> None: - """ - Wait until the server is closed. - - When :meth:`wait_closed` returns, all TCP connections are closed and - all connection handlers have returned. - - To ensure a fast shutdown, a connection handler should always be - awaiting at least one of: - - * :meth:`~WebSocketServerProtocol.recv`: when the connection is closed, - it raises :exc:`~websockets.exceptions.ConnectionClosedOK`; - * :meth:`~WebSocketServerProtocol.wait_closed`: when the connection is - closed, it returns. - - Then the connection handler is immediately notified of the shutdown; - it can clean up and exit. - - """ - await asyncio.shield(self.closed_waiter) - - def get_loop(self) -> asyncio.AbstractEventLoop: - """ - See :meth:`asyncio.Server.get_loop`. - - """ - return self.server.get_loop() - - def is_serving(self) -> bool: - """ - See :meth:`asyncio.Server.is_serving`. - - """ - return self.server.is_serving() - - async def start_serving(self) -> None: # pragma: no cover - """ - See :meth:`asyncio.Server.start_serving`. - - Typical use:: - - server = await serve(..., start_serving=False) - # perform additional setup here... - # ... then start the server - await server.start_serving() - - """ - await self.server.start_serving() - - async def serve_forever(self) -> None: # pragma: no cover - """ - See :meth:`asyncio.Server.serve_forever`. - - Typical use:: - - server = await serve(...) - # this coroutine doesn't return - # canceling it stops the server - await server.serve_forever() - - This is an alternative to using :func:`serve` as an asynchronous context - manager. Shutdown is triggered by canceling :meth:`serve_forever` - instead of exiting a :func:`serve` context. - - """ - await self.server.serve_forever() - - @property - def sockets(self) -> Iterable[socket.socket]: - """ - See :attr:`asyncio.Server.sockets`. - - """ - return self.server.sockets - - async def __aenter__(self) -> WebSocketServer: # pragma: no cover - return self - - async def __aexit__( - self, - exc_type: Optional[Type[BaseException]], - exc_value: Optional[BaseException], - traceback: Optional[TracebackType], - ) -> None: # pragma: no cover - self.close() - await self.wait_closed() - - -class Serve: - """ - Start a WebSocket server listening on ``host`` and ``port``. - - Whenever a client connects, the server creates a - :class:`WebSocketServerProtocol`, performs the opening handshake, and - delegates to the connection handler, ``ws_handler``. - - The handler receives the :class:`WebSocketServerProtocol` and uses it to - send and receive messages. - - Once the handler completes, either normally or with an exception, the - server performs the closing handshake and closes the connection. - - Awaiting :func:`serve` yields a :class:`WebSocketServer`. This object - provides a :meth:`~WebSocketServer.close` method to shut down the server:: - - stop = asyncio.Future() # set this future to exit the server - - server = await serve(...) - await stop - await server.close() - - :func:`serve` can be used as an asynchronous context manager. Then, the - server is shut down automatically when exiting the context:: - - stop = asyncio.Future() # set this future to exit the server - - async with serve(...): - await stop - - Args: - ws_handler: Connection handler. It receives the WebSocket connection, - which is a :class:`WebSocketServerProtocol`, in argument. - host: Network interfaces the server binds to. - See :meth:`~asyncio.loop.create_server` for details. - port: TCP port the server listens on. - See :meth:`~asyncio.loop.create_server` for details. - create_protocol: Factory for the :class:`asyncio.Protocol` managing - the connection. It defaults to :class:`WebSocketServerProtocol`. - Set it to a wrapper or a subclass to customize connection handling. - logger: Logger for this server. - It defaults to ``logging.getLogger("websockets.server")``. - See the :doc:`logging guide <../../topics/logging>` for details. - compression: The "permessage-deflate" extension is enabled by default. - Set ``compression`` to :obj:`None` to disable it. See the - :doc:`compression guide <../../topics/compression>` for details. - origins: Acceptable values of the ``Origin`` header, for defending - against Cross-Site WebSocket Hijacking attacks. Include :obj:`None` - in the list if the lack of an origin is acceptable. - extensions: List of supported extensions, in order in which they - should be negotiated and run. - subprotocols: List of supported subprotocols, in order of decreasing - preference. - extra_headers (Union[HeadersLike, Callable[[str, Headers], HeadersLike]]): - Arbitrary HTTP headers to add to the response. This can be - a :data:`~websockets.datastructures.HeadersLike` or a callable - taking the request path and headers in arguments and returning - a :data:`~websockets.datastructures.HeadersLike`. - server_header: Value of the ``Server`` response header. - It defaults to ``"Python/x.y.z websockets/X.Y"``. - Setting it to :obj:`None` removes the header. - process_request (Optional[Callable[[str, Headers], \ - Awaitable[Optional[Tuple[StatusLike, HeadersLike, bytes]]]]]): - Intercept HTTP request before the opening handshake. - See :meth:`~WebSocketServerProtocol.process_request` for details. - select_subprotocol: Select a subprotocol supported by the client. - See :meth:`~WebSocketServerProtocol.select_subprotocol` for details. - open_timeout: Timeout for opening connections in seconds. - :obj:`None` disables the timeout. - - See :class:`~websockets.legacy.protocol.WebSocketCommonProtocol` for the - documentation of ``ping_interval``, ``ping_timeout``, ``close_timeout``, - ``max_size``, ``max_queue``, ``read_limit``, and ``write_limit``. - - Any other keyword arguments are passed the event loop's - :meth:`~asyncio.loop.create_server` method. - - For example: - - * You can set ``ssl`` to a :class:`~ssl.SSLContext` to enable TLS. - - * You can set ``sock`` to a :obj:`~socket.socket` that you created - outside of websockets. - - Returns: - WebSocketServer: WebSocket server. - - """ - - def __init__( - self, - ws_handler: Union[ - Callable[[WebSocketServerProtocol], Awaitable[Any]], - Callable[[WebSocketServerProtocol, str], Awaitable[Any]], # deprecated - ], - host: Optional[Union[str, Sequence[str]]] = None, - port: Optional[int] = None, - *, - create_protocol: Optional[Callable[..., WebSocketServerProtocol]] = None, - logger: Optional[LoggerLike] = None, - compression: Optional[str] = "deflate", - origins: Optional[Sequence[Optional[Origin]]] = None, - extensions: Optional[Sequence[ServerExtensionFactory]] = None, - subprotocols: Optional[Sequence[Subprotocol]] = None, - extra_headers: Optional[HeadersLikeOrCallable] = None, - server_header: Optional[str] = USER_AGENT, - process_request: Optional[ - Callable[[str, Headers], Awaitable[Optional[HTTPResponse]]] - ] = None, - select_subprotocol: Optional[ - Callable[[Sequence[Subprotocol], Sequence[Subprotocol]], Subprotocol] - ] = None, - open_timeout: Optional[float] = 10, - ping_interval: Optional[float] = 20, - ping_timeout: Optional[float] = 20, - close_timeout: Optional[float] = None, - max_size: Optional[int] = 2**20, - max_queue: Optional[int] = 2**5, - read_limit: int = 2**16, - write_limit: int = 2**16, - **kwargs: Any, - ) -> None: - # Backwards compatibility: close_timeout used to be called timeout. - timeout: Optional[float] = kwargs.pop("timeout", None) - if timeout is None: - timeout = 10 - else: - warnings.warn("rename timeout to close_timeout", DeprecationWarning) - # If both are specified, timeout is ignored. - if close_timeout is None: - close_timeout = timeout - - # Backwards compatibility: create_protocol used to be called klass. - klass: Optional[Type[WebSocketServerProtocol]] = kwargs.pop("klass", None) - if klass is None: - klass = WebSocketServerProtocol - else: - warnings.warn("rename klass to create_protocol", DeprecationWarning) - # If both are specified, klass is ignored. - if create_protocol is None: - create_protocol = klass - - # Backwards compatibility: recv() used to return None on closed connections - legacy_recv: bool = kwargs.pop("legacy_recv", False) - - # Backwards compatibility: the loop parameter used to be supported. - _loop: Optional[asyncio.AbstractEventLoop] = kwargs.pop("loop", None) - if _loop is None: - loop = asyncio.get_event_loop() - else: - loop = _loop - warnings.warn("remove loop argument", DeprecationWarning) - - ws_server = WebSocketServer(logger=logger) - - secure = kwargs.get("ssl") is not None - - if compression == "deflate": - extensions = enable_server_permessage_deflate(extensions) - elif compression is not None: - raise ValueError(f"unsupported compression: {compression}") - - if subprotocols is not None: - validate_subprotocols(subprotocols) - - factory = functools.partial( - create_protocol, - # For backwards compatibility with 10.0 or earlier. Done here in - # addition to WebSocketServerProtocol to trigger the deprecation - # warning once per serve() call rather than once per connection. - remove_path_argument(ws_handler), - ws_server, - host=host, - port=port, - secure=secure, - open_timeout=open_timeout, - ping_interval=ping_interval, - ping_timeout=ping_timeout, - close_timeout=close_timeout, - max_size=max_size, - max_queue=max_queue, - read_limit=read_limit, - write_limit=write_limit, - loop=_loop, - legacy_recv=legacy_recv, - origins=origins, - extensions=extensions, - subprotocols=subprotocols, - extra_headers=extra_headers, - server_header=server_header, - process_request=process_request, - select_subprotocol=select_subprotocol, - logger=logger, - ) - - if kwargs.pop("unix", False): - path: Optional[str] = kwargs.pop("path", None) - # unix_serve(path) must not specify host and port parameters. - assert host is None and port is None - create_server = functools.partial( - loop.create_unix_server, factory, path, **kwargs - ) - else: - create_server = functools.partial( - loop.create_server, factory, host, port, **kwargs - ) - - # This is a coroutine function. - self._create_server = create_server - self.ws_server = ws_server - - # async with serve(...) - - async def __aenter__(self) -> WebSocketServer: - return await self - - async def __aexit__( - self, - exc_type: Optional[Type[BaseException]], - exc_value: Optional[BaseException], - traceback: Optional[TracebackType], - ) -> None: - self.ws_server.close() - await self.ws_server.wait_closed() - - # await serve(...) - - def __await__(self) -> Generator[Any, None, WebSocketServer]: - # Create a suitable iterator by calling __await__ on a coroutine. - return self.__await_impl__().__await__() - - async def __await_impl__(self) -> WebSocketServer: - server = await self._create_server() - self.ws_server.wrap(server) - return self.ws_server - - # yield from serve(...) - remove when dropping Python < 3.10 - - __iter__ = __await__ - - -serve = Serve - - -def unix_serve( - ws_handler: Union[ - Callable[[WebSocketServerProtocol], Awaitable[Any]], - Callable[[WebSocketServerProtocol, str], Awaitable[Any]], # deprecated - ], - path: Optional[str] = None, - **kwargs: Any, -) -> Serve: - """ - Start a WebSocket server listening on a Unix socket. - - This function is identical to :func:`serve`, except the ``host`` and - ``port`` arguments are replaced by ``path``. It is only available on Unix. - - Unrecognized keyword arguments are passed the event loop's - :meth:`~asyncio.loop.create_unix_server` method. - - It's useful for deploying a server behind a reverse proxy such as nginx. - - Args: - path: File system path to the Unix socket. - - """ - return serve(ws_handler, path=path, unix=True, **kwargs) - - -def remove_path_argument( - ws_handler: Union[ - Callable[[WebSocketServerProtocol], Awaitable[Any]], - Callable[[WebSocketServerProtocol, str], Awaitable[Any]], - ] -) -> Callable[[WebSocketServerProtocol], Awaitable[Any]]: - try: - inspect.signature(ws_handler).bind(None) - except TypeError: - try: - inspect.signature(ws_handler).bind(None, "") - except TypeError: # pragma: no cover - # ws_handler accepts neither one nor two arguments; leave it alone. - pass - else: - # ws_handler accepts two arguments; activate backwards compatibility. - - # Enable deprecation warning and announce deprecation in 11.0. - # warnings.warn("remove second argument of ws_handler", DeprecationWarning) - - async def _ws_handler(websocket: WebSocketServerProtocol) -> Any: - return await cast( - Callable[[WebSocketServerProtocol, str], Awaitable[Any]], - ws_handler, - )(websocket, websocket.path) - - return _ws_handler - - return cast( - Callable[[WebSocketServerProtocol], Awaitable[Any]], - ws_handler, - ) |