summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/websockets/legacy
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
commit6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
treeb1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/websockets/legacy
parent4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/websockets/legacy')
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/__init__.py0
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/__init__.cpython-311.pycbin0 -> 201 bytes
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/async_timeout.cpython-311.pycbin0 -> 10608 bytes
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/auth.cpython-311.pycbin0 -> 8251 bytes
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/client.cpython-311.pycbin0 -> 28561 bytes
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/compatibility.cpython-311.pycbin0 -> 544 bytes
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/framing.cpython-311.pycbin0 -> 6960 bytes
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/handshake.cpython-311.pycbin0 -> 7863 bytes
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/http.cpython-311.pycbin0 -> 8261 bytes
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/protocol.cpython-311.pycbin0 -> 65525 bytes
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/server.cpython-311.pycbin0 -> 49251 bytes
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/async_timeout.py265
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/auth.py184
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/client.py705
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/compatibility.py12
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/framing.py176
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/handshake.py165
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/http.py201
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/protocol.py1645
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/server.py1185
20 files changed, 4538 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__init__.py b/venv/lib/python3.11/site-packages/websockets/legacy/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/__init__.py
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/__init__.cpython-311.pyc
new file mode 100644
index 0000000..5384f2b
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/__init__.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/async_timeout.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/async_timeout.cpython-311.pyc
new file mode 100644
index 0000000..f3e18ca
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/async_timeout.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/auth.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/auth.cpython-311.pyc
new file mode 100644
index 0000000..e572023
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/auth.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/client.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/client.cpython-311.pyc
new file mode 100644
index 0000000..8ab650d
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/client.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/compatibility.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/compatibility.cpython-311.pyc
new file mode 100644
index 0000000..e65d9ec
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/compatibility.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/framing.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/framing.cpython-311.pyc
new file mode 100644
index 0000000..47c4426
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/framing.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/handshake.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/handshake.cpython-311.pyc
new file mode 100644
index 0000000..0db3ae7
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/handshake.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/http.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/http.cpython-311.pyc
new file mode 100644
index 0000000..2199c73
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/http.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/protocol.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/protocol.cpython-311.pyc
new file mode 100644
index 0000000..002cb1f
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/protocol.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/server.cpython-311.pyc b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/server.cpython-311.pyc
new file mode 100644
index 0000000..6319b6e
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/__pycache__/server.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/async_timeout.py b/venv/lib/python3.11/site-packages/websockets/legacy/async_timeout.py
new file mode 100644
index 0000000..8264094
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/async_timeout.py
@@ -0,0 +1,265 @@
+# From https://github.com/aio-libs/async-timeout/blob/master/async_timeout/__init__.py
+# Licensed under the Apache License (Apache-2.0)
+
+import asyncio
+import enum
+import sys
+import warnings
+from types import TracebackType
+from typing import Optional, Type
+
+
+# From https://github.com/python/typing_extensions/blob/main/src/typing_extensions.py
+# Licensed under the Python Software Foundation License (PSF-2.0)
+
+if sys.version_info >= (3, 11):
+ from typing import final
+else:
+ # @final exists in 3.8+, but we backport it for all versions
+ # before 3.11 to keep support for the __final__ attribute.
+ # See https://bugs.python.org/issue46342
+ def final(f):
+ """This decorator can be used to indicate to type checkers that
+ the decorated method cannot be overridden, and decorated class
+ cannot be subclassed. For example:
+
+ class Base:
+ @final
+ def done(self) -> None:
+ ...
+ class Sub(Base):
+ def done(self) -> None: # Error reported by type checker
+ ...
+ @final
+ class Leaf:
+ ...
+ class Other(Leaf): # Error reported by type checker
+ ...
+
+ There is no runtime checking of these properties. The decorator
+ sets the ``__final__`` attribute to ``True`` on the decorated object
+ to allow runtime introspection.
+ """
+ try:
+ f.__final__ = True
+ except (AttributeError, TypeError):
+ # Skip the attribute silently if it is not writable.
+ # AttributeError happens if the object has __slots__ or a
+ # read-only property, TypeError if it's a builtin class.
+ pass
+ return f
+
+
+# End https://github.com/aio-libs/async-timeout/blob/master/async_timeout/__init__.py
+
+__version__ = "4.0.2"
+
+
+__all__ = ("timeout", "timeout_at", "Timeout")
+
+
+def timeout(delay: Optional[float]) -> "Timeout":
+ """timeout context manager.
+
+ Useful in cases when you want to apply timeout logic around block
+ of code or in cases when asyncio.wait_for is not suitable. For example:
+
+ >>> async with timeout(0.001):
+ ... async with aiohttp.get('https://github.com') as r:
+ ... await r.text()
+
+
+ delay - value in seconds or None to disable timeout logic
+ """
+ loop = asyncio.get_running_loop()
+ if delay is not None:
+ deadline = loop.time() + delay # type: Optional[float]
+ else:
+ deadline = None
+ return Timeout(deadline, loop)
+
+
+def timeout_at(deadline: Optional[float]) -> "Timeout":
+ """Schedule the timeout at absolute time.
+
+ deadline argument points on the time in the same clock system
+ as loop.time().
+
+ Please note: it is not POSIX time but a time with
+ undefined starting base, e.g. the time of the system power on.
+
+ >>> async with timeout_at(loop.time() + 10):
+ ... async with aiohttp.get('https://github.com') as r:
+ ... await r.text()
+
+
+ """
+ loop = asyncio.get_running_loop()
+ return Timeout(deadline, loop)
+
+
+class _State(enum.Enum):
+ INIT = "INIT"
+ ENTER = "ENTER"
+ TIMEOUT = "TIMEOUT"
+ EXIT = "EXIT"
+
+
+@final
+class Timeout:
+ # Internal class, please don't instantiate it directly
+ # Use timeout() and timeout_at() public factories instead.
+ #
+ # Implementation note: `async with timeout()` is preferred
+ # over `with timeout()`.
+ # While technically the Timeout class implementation
+ # doesn't need to be async at all,
+ # the `async with` statement explicitly points that
+ # the context manager should be used from async function context.
+ #
+ # This design allows to avoid many silly misusages.
+ #
+ # TimeoutError is raised immediately when scheduled
+ # if the deadline is passed.
+ # The purpose is to time out as soon as possible
+ # without waiting for the next await expression.
+
+ __slots__ = ("_deadline", "_loop", "_state", "_timeout_handler")
+
+ def __init__(
+ self, deadline: Optional[float], loop: asyncio.AbstractEventLoop
+ ) -> None:
+ self._loop = loop
+ self._state = _State.INIT
+
+ self._timeout_handler = None # type: Optional[asyncio.Handle]
+ if deadline is None:
+ self._deadline = None # type: Optional[float]
+ else:
+ self.update(deadline)
+
+ def __enter__(self) -> "Timeout":
+ warnings.warn(
+ "with timeout() is deprecated, use async with timeout() instead",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ self._do_enter()
+ return self
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> Optional[bool]:
+ self._do_exit(exc_type)
+ return None
+
+ async def __aenter__(self) -> "Timeout":
+ self._do_enter()
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> Optional[bool]:
+ self._do_exit(exc_type)
+ return None
+
+ @property
+ def expired(self) -> bool:
+ """Is timeout expired during execution?"""
+ return self._state == _State.TIMEOUT
+
+ @property
+ def deadline(self) -> Optional[float]:
+ return self._deadline
+
+ def reject(self) -> None:
+ """Reject scheduled timeout if any."""
+ # cancel is maybe better name but
+ # task.cancel() raises CancelledError in asyncio world.
+ if self._state not in (_State.INIT, _State.ENTER):
+ raise RuntimeError(f"invalid state {self._state.value}")
+ self._reject()
+
+ def _reject(self) -> None:
+ if self._timeout_handler is not None:
+ self._timeout_handler.cancel()
+ self._timeout_handler = None
+
+ def shift(self, delay: float) -> None:
+ """Advance timeout on delay seconds.
+
+ The delay can be negative.
+
+ Raise RuntimeError if shift is called when deadline is not scheduled
+ """
+ deadline = self._deadline
+ if deadline is None:
+ raise RuntimeError("cannot shift timeout if deadline is not scheduled")
+ self.update(deadline + delay)
+
+ def update(self, deadline: float) -> None:
+ """Set deadline to absolute value.
+
+ deadline argument points on the time in the same clock system
+ as loop.time().
+
+ If new deadline is in the past the timeout is raised immediately.
+
+ Please note: it is not POSIX time but a time with
+ undefined starting base, e.g. the time of the system power on.
+ """
+ if self._state == _State.EXIT:
+ raise RuntimeError("cannot reschedule after exit from context manager")
+ if self._state == _State.TIMEOUT:
+ raise RuntimeError("cannot reschedule expired timeout")
+ if self._timeout_handler is not None:
+ self._timeout_handler.cancel()
+ self._deadline = deadline
+ if self._state != _State.INIT:
+ self._reschedule()
+
+ def _reschedule(self) -> None:
+ assert self._state == _State.ENTER
+ deadline = self._deadline
+ if deadline is None:
+ return
+
+ now = self._loop.time()
+ if self._timeout_handler is not None:
+ self._timeout_handler.cancel()
+
+ task = asyncio.current_task()
+ if deadline <= now:
+ self._timeout_handler = self._loop.call_soon(self._on_timeout, task)
+ else:
+ self._timeout_handler = self._loop.call_at(deadline, self._on_timeout, task)
+
+ def _do_enter(self) -> None:
+ if self._state != _State.INIT:
+ raise RuntimeError(f"invalid state {self._state.value}")
+ self._state = _State.ENTER
+ self._reschedule()
+
+ def _do_exit(self, exc_type: Optional[Type[BaseException]]) -> None:
+ if exc_type is asyncio.CancelledError and self._state == _State.TIMEOUT:
+ self._timeout_handler = None
+ raise asyncio.TimeoutError
+ # timeout has not expired
+ self._state = _State.EXIT
+ self._reject()
+ return None
+
+ def _on_timeout(self, task: "asyncio.Task[None]") -> None:
+ task.cancel()
+ self._state = _State.TIMEOUT
+ # drop the reference early
+ self._timeout_handler = None
+
+
+# End https://github.com/aio-libs/async-timeout/blob/master/async_timeout/__init__.py
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/auth.py b/venv/lib/python3.11/site-packages/websockets/legacy/auth.py
new file mode 100644
index 0000000..d342583
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/auth.py
@@ -0,0 +1,184 @@
+from __future__ import annotations
+
+import functools
+import hmac
+import http
+from typing import Any, Awaitable, Callable, Iterable, Optional, Tuple, Union, cast
+
+from ..datastructures import Headers
+from ..exceptions import InvalidHeader
+from ..headers import build_www_authenticate_basic, parse_authorization_basic
+from .server import HTTPResponse, WebSocketServerProtocol
+
+
+__all__ = ["BasicAuthWebSocketServerProtocol", "basic_auth_protocol_factory"]
+
+Credentials = Tuple[str, str]
+
+
+def is_credentials(value: Any) -> bool:
+ try:
+ username, password = value
+ except (TypeError, ValueError):
+ return False
+ else:
+ return isinstance(username, str) and isinstance(password, str)
+
+
+class BasicAuthWebSocketServerProtocol(WebSocketServerProtocol):
+ """
+ WebSocket server protocol that enforces HTTP Basic Auth.
+
+ """
+
+ realm: str = ""
+ """
+ Scope of protection.
+
+ If provided, it should contain only ASCII characters because the
+ encoding of non-ASCII characters is undefined.
+ """
+
+ username: Optional[str] = None
+ """Username of the authenticated user."""
+
+ def __init__(
+ self,
+ *args: Any,
+ realm: Optional[str] = None,
+ check_credentials: Optional[Callable[[str, str], Awaitable[bool]]] = None,
+ **kwargs: Any,
+ ) -> None:
+ if realm is not None:
+ self.realm = realm # shadow class attribute
+ self._check_credentials = check_credentials
+ super().__init__(*args, **kwargs)
+
+ async def check_credentials(self, username: str, password: str) -> bool:
+ """
+ Check whether credentials are authorized.
+
+ This coroutine may be overridden in a subclass, for example to
+ authenticate against a database or an external service.
+
+ Args:
+ username: HTTP Basic Auth username.
+ password: HTTP Basic Auth password.
+
+ Returns:
+ bool: :obj:`True` if the handshake should continue;
+ :obj:`False` if it should fail with an HTTP 401 error.
+
+ """
+ if self._check_credentials is not None:
+ return await self._check_credentials(username, password)
+
+ return False
+
+ async def process_request(
+ self,
+ path: str,
+ request_headers: Headers,
+ ) -> Optional[HTTPResponse]:
+ """
+ Check HTTP Basic Auth and return an HTTP 401 response if needed.
+
+ """
+ try:
+ authorization = request_headers["Authorization"]
+ except KeyError:
+ return (
+ http.HTTPStatus.UNAUTHORIZED,
+ [("WWW-Authenticate", build_www_authenticate_basic(self.realm))],
+ b"Missing credentials\n",
+ )
+
+ try:
+ username, password = parse_authorization_basic(authorization)
+ except InvalidHeader:
+ return (
+ http.HTTPStatus.UNAUTHORIZED,
+ [("WWW-Authenticate", build_www_authenticate_basic(self.realm))],
+ b"Unsupported credentials\n",
+ )
+
+ if not await self.check_credentials(username, password):
+ return (
+ http.HTTPStatus.UNAUTHORIZED,
+ [("WWW-Authenticate", build_www_authenticate_basic(self.realm))],
+ b"Invalid credentials\n",
+ )
+
+ self.username = username
+
+ return await super().process_request(path, request_headers)
+
+
+def basic_auth_protocol_factory(
+ realm: Optional[str] = None,
+ credentials: Optional[Union[Credentials, Iterable[Credentials]]] = None,
+ check_credentials: Optional[Callable[[str, str], Awaitable[bool]]] = None,
+ create_protocol: Optional[Callable[..., BasicAuthWebSocketServerProtocol]] = None,
+) -> Callable[..., BasicAuthWebSocketServerProtocol]:
+ """
+ Protocol factory that enforces HTTP Basic Auth.
+
+ :func:`basic_auth_protocol_factory` is designed to integrate with
+ :func:`~websockets.server.serve` like this::
+
+ websockets.serve(
+ ...,
+ create_protocol=websockets.basic_auth_protocol_factory(
+ realm="my dev server",
+ credentials=("hello", "iloveyou"),
+ )
+ )
+
+ Args:
+ realm: Scope of protection. It should contain only ASCII characters
+ because the encoding of non-ASCII characters is undefined.
+ Refer to section 2.2 of :rfc:`7235` for details.
+ credentials: Hard coded authorized credentials. It can be a
+ ``(username, password)`` pair or a list of such pairs.
+ check_credentials: Coroutine that verifies credentials.
+ It receives ``username`` and ``password`` arguments
+ and returns a :class:`bool`. One of ``credentials`` or
+ ``check_credentials`` must be provided but not both.
+ create_protocol: Factory that creates the protocol. By default, this
+ is :class:`BasicAuthWebSocketServerProtocol`. It can be replaced
+ by a subclass.
+ Raises:
+ TypeError: If the ``credentials`` or ``check_credentials`` argument is
+ wrong.
+
+ """
+ if (credentials is None) == (check_credentials is None):
+ raise TypeError("provide either credentials or check_credentials")
+
+ if credentials is not None:
+ if is_credentials(credentials):
+ credentials_list = [cast(Credentials, credentials)]
+ elif isinstance(credentials, Iterable):
+ credentials_list = list(credentials)
+ if not all(is_credentials(item) for item in credentials_list):
+ raise TypeError(f"invalid credentials argument: {credentials}")
+ else:
+ raise TypeError(f"invalid credentials argument: {credentials}")
+
+ credentials_dict = dict(credentials_list)
+
+ async def check_credentials(username: str, password: str) -> bool:
+ try:
+ expected_password = credentials_dict[username]
+ except KeyError:
+ return False
+ return hmac.compare_digest(expected_password, password)
+
+ if create_protocol is None:
+ create_protocol = BasicAuthWebSocketServerProtocol
+
+ return functools.partial(
+ create_protocol,
+ realm=realm,
+ check_credentials=check_credentials,
+ )
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/client.py b/venv/lib/python3.11/site-packages/websockets/legacy/client.py
new file mode 100644
index 0000000..4862252
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/client.py
@@ -0,0 +1,705 @@
+from __future__ import annotations
+
+import asyncio
+import functools
+import logging
+import random
+import urllib.parse
+import warnings
+from types import TracebackType
+from typing import (
+ Any,
+ AsyncIterator,
+ Callable,
+ Generator,
+ List,
+ Optional,
+ Sequence,
+ Tuple,
+ Type,
+ cast,
+)
+
+from ..datastructures import Headers, HeadersLike
+from ..exceptions import (
+ InvalidHandshake,
+ InvalidHeader,
+ InvalidMessage,
+ InvalidStatusCode,
+ NegotiationError,
+ RedirectHandshake,
+ SecurityError,
+)
+from ..extensions import ClientExtensionFactory, Extension
+from ..extensions.permessage_deflate import enable_client_permessage_deflate
+from ..headers import (
+ build_authorization_basic,
+ build_extension,
+ build_host,
+ build_subprotocol,
+ parse_extension,
+ parse_subprotocol,
+ validate_subprotocols,
+)
+from ..http import USER_AGENT
+from ..typing import ExtensionHeader, LoggerLike, Origin, Subprotocol
+from ..uri import WebSocketURI, parse_uri
+from .compatibility import asyncio_timeout
+from .handshake import build_request, check_response
+from .http import read_response
+from .protocol import WebSocketCommonProtocol
+
+
+__all__ = ["connect", "unix_connect", "WebSocketClientProtocol"]
+
+
+class WebSocketClientProtocol(WebSocketCommonProtocol):
+ """
+ WebSocket client connection.
+
+ :class:`WebSocketClientProtocol` provides :meth:`recv` and :meth:`send`
+ coroutines for receiving and sending messages.
+
+ It supports asynchronous iteration to receive incoming messages::
+
+ async for message in websocket:
+ await process(message)
+
+ The iterator exits normally when the connection is closed with close code
+ 1000 (OK) or 1001 (going away) or without a close code. It raises
+ a :exc:`~websockets.exceptions.ConnectionClosedError` when the connection
+ is closed with any other code.
+
+ See :func:`connect` for the documentation of ``logger``, ``origin``,
+ ``extensions``, ``subprotocols``, ``extra_headers``, and
+ ``user_agent_header``.
+
+ See :class:`~websockets.legacy.protocol.WebSocketCommonProtocol` for the
+ documentation of ``ping_interval``, ``ping_timeout``, ``close_timeout``,
+ ``max_size``, ``max_queue``, ``read_limit``, and ``write_limit``.
+
+ """
+
+ is_client = True
+ side = "client"
+
+ def __init__(
+ self,
+ *,
+ logger: Optional[LoggerLike] = None,
+ origin: Optional[Origin] = None,
+ extensions: Optional[Sequence[ClientExtensionFactory]] = None,
+ subprotocols: Optional[Sequence[Subprotocol]] = None,
+ extra_headers: Optional[HeadersLike] = None,
+ user_agent_header: Optional[str] = USER_AGENT,
+ **kwargs: Any,
+ ) -> None:
+ if logger is None:
+ logger = logging.getLogger("websockets.client")
+ super().__init__(logger=logger, **kwargs)
+ self.origin = origin
+ self.available_extensions = extensions
+ self.available_subprotocols = subprotocols
+ self.extra_headers = extra_headers
+ self.user_agent_header = user_agent_header
+
+ def write_http_request(self, path: str, headers: Headers) -> None:
+ """
+ Write request line and headers to the HTTP request.
+
+ """
+ self.path = path
+ self.request_headers = headers
+
+ if self.debug:
+ self.logger.debug("> GET %s HTTP/1.1", path)
+ for key, value in headers.raw_items():
+ self.logger.debug("> %s: %s", key, value)
+
+ # Since the path and headers only contain ASCII characters,
+ # we can keep this simple.
+ request = f"GET {path} HTTP/1.1\r\n"
+ request += str(headers)
+
+ self.transport.write(request.encode())
+
+ async def read_http_response(self) -> Tuple[int, Headers]:
+ """
+ Read status line and headers from the HTTP response.
+
+ If the response contains a body, it may be read from ``self.reader``
+ after this coroutine returns.
+
+ Raises:
+ InvalidMessage: If the HTTP message is malformed or isn't an
+ HTTP/1.1 GET response.
+
+ """
+ try:
+ status_code, reason, headers = await read_response(self.reader)
+ except Exception as exc:
+ raise InvalidMessage("did not receive a valid HTTP response") from exc
+
+ if self.debug:
+ self.logger.debug("< HTTP/1.1 %d %s", status_code, reason)
+ for key, value in headers.raw_items():
+ self.logger.debug("< %s: %s", key, value)
+
+ self.response_headers = headers
+
+ return status_code, self.response_headers
+
+ @staticmethod
+ def process_extensions(
+ headers: Headers,
+ available_extensions: Optional[Sequence[ClientExtensionFactory]],
+ ) -> List[Extension]:
+ """
+ Handle the Sec-WebSocket-Extensions HTTP response header.
+
+ Check that each extension is supported, as well as its parameters.
+
+ Return the list of accepted extensions.
+
+ Raise :exc:`~websockets.exceptions.InvalidHandshake` to abort the
+ connection.
+
+ :rfc:`6455` leaves the rules up to the specification of each
+ :extension.
+
+ To provide this level of flexibility, for each extension accepted by
+ the server, we check for a match with each extension available in the
+ client configuration. If no match is found, an exception is raised.
+
+ If several variants of the same extension are accepted by the server,
+ it may be configured several times, which won't make sense in general.
+ Extensions must implement their own requirements. For this purpose,
+ the list of previously accepted extensions is provided.
+
+ Other requirements, for example related to mandatory extensions or the
+ order of extensions, may be implemented by overriding this method.
+
+ """
+ accepted_extensions: List[Extension] = []
+
+ header_values = headers.get_all("Sec-WebSocket-Extensions")
+
+ if header_values:
+ if available_extensions is None:
+ raise InvalidHandshake("no extensions supported")
+
+ parsed_header_values: List[ExtensionHeader] = sum(
+ [parse_extension(header_value) for header_value in header_values], []
+ )
+
+ for name, response_params in parsed_header_values:
+ for extension_factory in available_extensions:
+ # Skip non-matching extensions based on their name.
+ if extension_factory.name != name:
+ continue
+
+ # Skip non-matching extensions based on their params.
+ try:
+ extension = extension_factory.process_response_params(
+ response_params, accepted_extensions
+ )
+ except NegotiationError:
+ continue
+
+ # Add matching extension to the final list.
+ accepted_extensions.append(extension)
+
+ # Break out of the loop once we have a match.
+ break
+
+ # If we didn't break from the loop, no extension in our list
+ # matched what the server sent. Fail the connection.
+ else:
+ raise NegotiationError(
+ f"Unsupported extension: "
+ f"name = {name}, params = {response_params}"
+ )
+
+ return accepted_extensions
+
+ @staticmethod
+ def process_subprotocol(
+ headers: Headers, available_subprotocols: Optional[Sequence[Subprotocol]]
+ ) -> Optional[Subprotocol]:
+ """
+ Handle the Sec-WebSocket-Protocol HTTP response header.
+
+ Check that it contains exactly one supported subprotocol.
+
+ Return the selected subprotocol.
+
+ """
+ subprotocol: Optional[Subprotocol] = None
+
+ header_values = headers.get_all("Sec-WebSocket-Protocol")
+
+ if header_values:
+ if available_subprotocols is None:
+ raise InvalidHandshake("no subprotocols supported")
+
+ parsed_header_values: Sequence[Subprotocol] = sum(
+ [parse_subprotocol(header_value) for header_value in header_values], []
+ )
+
+ if len(parsed_header_values) > 1:
+ subprotocols = ", ".join(parsed_header_values)
+ raise InvalidHandshake(f"multiple subprotocols: {subprotocols}")
+
+ subprotocol = parsed_header_values[0]
+
+ if subprotocol not in available_subprotocols:
+ raise NegotiationError(f"unsupported subprotocol: {subprotocol}")
+
+ return subprotocol
+
+ async def handshake(
+ self,
+ wsuri: WebSocketURI,
+ origin: Optional[Origin] = None,
+ available_extensions: Optional[Sequence[ClientExtensionFactory]] = None,
+ available_subprotocols: Optional[Sequence[Subprotocol]] = None,
+ extra_headers: Optional[HeadersLike] = None,
+ ) -> None:
+ """
+ Perform the client side of the opening handshake.
+
+ Args:
+ wsuri: URI of the WebSocket server.
+ origin: Value of the ``Origin`` header.
+ extensions: List of supported extensions, in order in which they
+ should be negotiated and run.
+ subprotocols: List of supported subprotocols, in order of decreasing
+ preference.
+ extra_headers: Arbitrary HTTP headers to add to the handshake request.
+
+ Raises:
+ InvalidHandshake: If the handshake fails.
+
+ """
+ request_headers = Headers()
+
+ request_headers["Host"] = build_host(wsuri.host, wsuri.port, wsuri.secure)
+
+ if wsuri.user_info:
+ request_headers["Authorization"] = build_authorization_basic(
+ *wsuri.user_info
+ )
+
+ if origin is not None:
+ request_headers["Origin"] = origin
+
+ key = build_request(request_headers)
+
+ if available_extensions is not None:
+ extensions_header = build_extension(
+ [
+ (extension_factory.name, extension_factory.get_request_params())
+ for extension_factory in available_extensions
+ ]
+ )
+ request_headers["Sec-WebSocket-Extensions"] = extensions_header
+
+ if available_subprotocols is not None:
+ protocol_header = build_subprotocol(available_subprotocols)
+ request_headers["Sec-WebSocket-Protocol"] = protocol_header
+
+ if self.extra_headers is not None:
+ request_headers.update(self.extra_headers)
+
+ if self.user_agent_header is not None:
+ request_headers.setdefault("User-Agent", self.user_agent_header)
+
+ self.write_http_request(wsuri.resource_name, request_headers)
+
+ status_code, response_headers = await self.read_http_response()
+ if status_code in (301, 302, 303, 307, 308):
+ if "Location" not in response_headers:
+ raise InvalidHeader("Location")
+ raise RedirectHandshake(response_headers["Location"])
+ elif status_code != 101:
+ raise InvalidStatusCode(status_code, response_headers)
+
+ check_response(response_headers, key)
+
+ self.extensions = self.process_extensions(
+ response_headers, available_extensions
+ )
+
+ self.subprotocol = self.process_subprotocol(
+ response_headers, available_subprotocols
+ )
+
+ self.connection_open()
+
+
+class Connect:
+ """
+ Connect to the WebSocket server at ``uri``.
+
+ Awaiting :func:`connect` yields a :class:`WebSocketClientProtocol` which
+ can then be used to send and receive messages.
+
+ :func:`connect` can be used as a asynchronous context manager::
+
+ async with websockets.connect(...) as websocket:
+ ...
+
+ The connection is closed automatically when exiting the context.
+
+ :func:`connect` can be used as an infinite asynchronous iterator to
+ reconnect automatically on errors::
+
+ async for websocket in websockets.connect(...):
+ try:
+ ...
+ except websockets.ConnectionClosed:
+ continue
+
+ The connection is closed automatically after each iteration of the loop.
+
+ If an error occurs while establishing the connection, :func:`connect`
+ retries with exponential backoff. The backoff delay starts at three
+ seconds and increases up to one minute.
+
+ If an error occurs in the body of the loop, you can handle the exception
+ and :func:`connect` will reconnect with the next iteration; or you can
+ let the exception bubble up and break out of the loop. This lets you
+ decide which errors trigger a reconnection and which errors are fatal.
+
+ Args:
+ uri: URI of the WebSocket server.
+ create_protocol: Factory for the :class:`asyncio.Protocol` managing
+ the connection. It defaults to :class:`WebSocketClientProtocol`.
+ Set it to a wrapper or a subclass to customize connection handling.
+ logger: Logger for this client.
+ It defaults to ``logging.getLogger("websockets.client")``.
+ See the :doc:`logging guide <../../topics/logging>` for details.
+ compression: The "permessage-deflate" extension is enabled by default.
+ Set ``compression`` to :obj:`None` to disable it. See the
+ :doc:`compression guide <../../topics/compression>` for details.
+ origin: Value of the ``Origin`` header, for servers that require it.
+ extensions: List of supported extensions, in order in which they
+ should be negotiated and run.
+ subprotocols: List of supported subprotocols, in order of decreasing
+ preference.
+ extra_headers: Arbitrary HTTP headers to add to the handshake request.
+ user_agent_header: Value of the ``User-Agent`` request header.
+ It defaults to ``"Python/x.y.z websockets/X.Y"``.
+ Setting it to :obj:`None` removes the header.
+ open_timeout: Timeout for opening the connection in seconds.
+ :obj:`None` disables the timeout.
+
+ See :class:`~websockets.legacy.protocol.WebSocketCommonProtocol` for the
+ documentation of ``ping_interval``, ``ping_timeout``, ``close_timeout``,
+ ``max_size``, ``max_queue``, ``read_limit``, and ``write_limit``.
+
+ Any other keyword arguments are passed the event loop's
+ :meth:`~asyncio.loop.create_connection` method.
+
+ For example:
+
+ * You can set ``ssl`` to a :class:`~ssl.SSLContext` to enforce TLS
+ settings. When connecting to a ``wss://`` URI, if ``ssl`` isn't
+ provided, a TLS context is created
+ with :func:`~ssl.create_default_context`.
+
+ * You can set ``host`` and ``port`` to connect to a different host and
+ port from those found in ``uri``. This only changes the destination of
+ the TCP connection. The host name from ``uri`` is still used in the TLS
+ handshake for secure connections and in the ``Host`` header.
+
+ Raises:
+ InvalidURI: If ``uri`` isn't a valid WebSocket URI.
+ OSError: If the TCP connection fails.
+ InvalidHandshake: If the opening handshake fails.
+ ~asyncio.TimeoutError: If the opening handshake times out.
+
+ """
+
+ MAX_REDIRECTS_ALLOWED = 10
+
+ def __init__(
+ self,
+ uri: str,
+ *,
+ create_protocol: Optional[Callable[..., WebSocketClientProtocol]] = None,
+ logger: Optional[LoggerLike] = None,
+ compression: Optional[str] = "deflate",
+ origin: Optional[Origin] = None,
+ extensions: Optional[Sequence[ClientExtensionFactory]] = None,
+ subprotocols: Optional[Sequence[Subprotocol]] = None,
+ extra_headers: Optional[HeadersLike] = None,
+ user_agent_header: Optional[str] = USER_AGENT,
+ open_timeout: Optional[float] = 10,
+ ping_interval: Optional[float] = 20,
+ ping_timeout: Optional[float] = 20,
+ close_timeout: Optional[float] = None,
+ max_size: Optional[int] = 2**20,
+ max_queue: Optional[int] = 2**5,
+ read_limit: int = 2**16,
+ write_limit: int = 2**16,
+ **kwargs: Any,
+ ) -> None:
+ # Backwards compatibility: close_timeout used to be called timeout.
+ timeout: Optional[float] = kwargs.pop("timeout", None)
+ if timeout is None:
+ timeout = 10
+ else:
+ warnings.warn("rename timeout to close_timeout", DeprecationWarning)
+ # If both are specified, timeout is ignored.
+ if close_timeout is None:
+ close_timeout = timeout
+
+ # Backwards compatibility: create_protocol used to be called klass.
+ klass: Optional[Type[WebSocketClientProtocol]] = kwargs.pop("klass", None)
+ if klass is None:
+ klass = WebSocketClientProtocol
+ else:
+ warnings.warn("rename klass to create_protocol", DeprecationWarning)
+ # If both are specified, klass is ignored.
+ if create_protocol is None:
+ create_protocol = klass
+
+ # Backwards compatibility: recv() used to return None on closed connections
+ legacy_recv: bool = kwargs.pop("legacy_recv", False)
+
+ # Backwards compatibility: the loop parameter used to be supported.
+ _loop: Optional[asyncio.AbstractEventLoop] = kwargs.pop("loop", None)
+ if _loop is None:
+ loop = asyncio.get_event_loop()
+ else:
+ loop = _loop
+ warnings.warn("remove loop argument", DeprecationWarning)
+
+ wsuri = parse_uri(uri)
+ if wsuri.secure:
+ kwargs.setdefault("ssl", True)
+ elif kwargs.get("ssl") is not None:
+ raise ValueError(
+ "connect() received a ssl argument for a ws:// URI, "
+ "use a wss:// URI to enable TLS"
+ )
+
+ if compression == "deflate":
+ extensions = enable_client_permessage_deflate(extensions)
+ elif compression is not None:
+ raise ValueError(f"unsupported compression: {compression}")
+
+ if subprotocols is not None:
+ validate_subprotocols(subprotocols)
+
+ factory = functools.partial(
+ create_protocol,
+ logger=logger,
+ origin=origin,
+ extensions=extensions,
+ subprotocols=subprotocols,
+ extra_headers=extra_headers,
+ user_agent_header=user_agent_header,
+ ping_interval=ping_interval,
+ ping_timeout=ping_timeout,
+ close_timeout=close_timeout,
+ max_size=max_size,
+ max_queue=max_queue,
+ read_limit=read_limit,
+ write_limit=write_limit,
+ host=wsuri.host,
+ port=wsuri.port,
+ secure=wsuri.secure,
+ legacy_recv=legacy_recv,
+ loop=_loop,
+ )
+
+ if kwargs.pop("unix", False):
+ path: Optional[str] = kwargs.pop("path", None)
+ create_connection = functools.partial(
+ loop.create_unix_connection, factory, path, **kwargs
+ )
+ else:
+ host: Optional[str]
+ port: Optional[int]
+ if kwargs.get("sock") is None:
+ host, port = wsuri.host, wsuri.port
+ else:
+ # If sock is given, host and port shouldn't be specified.
+ host, port = None, None
+ if kwargs.get("ssl"):
+ kwargs.setdefault("server_hostname", wsuri.host)
+ # If host and port are given, override values from the URI.
+ host = kwargs.pop("host", host)
+ port = kwargs.pop("port", port)
+ create_connection = functools.partial(
+ loop.create_connection, factory, host, port, **kwargs
+ )
+
+ self.open_timeout = open_timeout
+ if logger is None:
+ logger = logging.getLogger("websockets.client")
+ self.logger = logger
+
+ # This is a coroutine function.
+ self._create_connection = create_connection
+ self._uri = uri
+ self._wsuri = wsuri
+
+ def handle_redirect(self, uri: str) -> None:
+ # Update the state of this instance to connect to a new URI.
+ old_uri = self._uri
+ old_wsuri = self._wsuri
+ new_uri = urllib.parse.urljoin(old_uri, uri)
+ new_wsuri = parse_uri(new_uri)
+
+ # Forbid TLS downgrade.
+ if old_wsuri.secure and not new_wsuri.secure:
+ raise SecurityError("redirect from WSS to WS")
+
+ same_origin = (
+ old_wsuri.host == new_wsuri.host and old_wsuri.port == new_wsuri.port
+ )
+
+ # Rewrite the host and port arguments for cross-origin redirects.
+ # This preserves connection overrides with the host and port
+ # arguments if the redirect points to the same host and port.
+ if not same_origin:
+ # Replace the host and port argument passed to the protocol factory.
+ factory = self._create_connection.args[0]
+ factory = functools.partial(
+ factory.func,
+ *factory.args,
+ **dict(factory.keywords, host=new_wsuri.host, port=new_wsuri.port),
+ )
+ # Replace the host and port argument passed to create_connection.
+ self._create_connection = functools.partial(
+ self._create_connection.func,
+ *(factory, new_wsuri.host, new_wsuri.port),
+ **self._create_connection.keywords,
+ )
+
+ # Set the new WebSocket URI. This suffices for same-origin redirects.
+ self._uri = new_uri
+ self._wsuri = new_wsuri
+
+ # async for ... in connect(...):
+
+ BACKOFF_MIN = 1.92
+ BACKOFF_MAX = 60.0
+ BACKOFF_FACTOR = 1.618
+ BACKOFF_INITIAL = 5
+
+ async def __aiter__(self) -> AsyncIterator[WebSocketClientProtocol]:
+ backoff_delay = self.BACKOFF_MIN
+ while True:
+ try:
+ async with self as protocol:
+ yield protocol
+ except Exception:
+ # Add a random initial delay between 0 and 5 seconds.
+ # See 7.2.3. Recovering from Abnormal Closure in RFC 6544.
+ if backoff_delay == self.BACKOFF_MIN:
+ initial_delay = random.random() * self.BACKOFF_INITIAL
+ self.logger.info(
+ "! connect failed; reconnecting in %.1f seconds",
+ initial_delay,
+ exc_info=True,
+ )
+ await asyncio.sleep(initial_delay)
+ else:
+ self.logger.info(
+ "! connect failed again; retrying in %d seconds",
+ int(backoff_delay),
+ exc_info=True,
+ )
+ await asyncio.sleep(int(backoff_delay))
+ # Increase delay with truncated exponential backoff.
+ backoff_delay = backoff_delay * self.BACKOFF_FACTOR
+ backoff_delay = min(backoff_delay, self.BACKOFF_MAX)
+ continue
+ else:
+ # Connection succeeded - reset backoff delay
+ backoff_delay = self.BACKOFF_MIN
+
+ # async with connect(...) as ...:
+
+ async def __aenter__(self) -> WebSocketClientProtocol:
+ return await self
+
+ async def __aexit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_value: Optional[BaseException],
+ traceback: Optional[TracebackType],
+ ) -> None:
+ await self.protocol.close()
+
+ # ... = await connect(...)
+
+ def __await__(self) -> Generator[Any, None, WebSocketClientProtocol]:
+ # Create a suitable iterator by calling __await__ on a coroutine.
+ return self.__await_impl_timeout__().__await__()
+
+ async def __await_impl_timeout__(self) -> WebSocketClientProtocol:
+ async with asyncio_timeout(self.open_timeout):
+ return await self.__await_impl__()
+
+ async def __await_impl__(self) -> WebSocketClientProtocol:
+ for redirects in range(self.MAX_REDIRECTS_ALLOWED):
+ _transport, _protocol = await self._create_connection()
+ protocol = cast(WebSocketClientProtocol, _protocol)
+ try:
+ await protocol.handshake(
+ self._wsuri,
+ origin=protocol.origin,
+ available_extensions=protocol.available_extensions,
+ available_subprotocols=protocol.available_subprotocols,
+ extra_headers=protocol.extra_headers,
+ )
+ except RedirectHandshake as exc:
+ protocol.fail_connection()
+ await protocol.wait_closed()
+ self.handle_redirect(exc.uri)
+ # Avoid leaking a connected socket when the handshake fails.
+ except (Exception, asyncio.CancelledError):
+ protocol.fail_connection()
+ await protocol.wait_closed()
+ raise
+ else:
+ self.protocol = protocol
+ return protocol
+ else:
+ raise SecurityError("too many redirects")
+
+ # ... = yield from connect(...) - remove when dropping Python < 3.10
+
+ __iter__ = __await__
+
+
+connect = Connect
+
+
+def unix_connect(
+ path: Optional[str] = None,
+ uri: str = "ws://localhost/",
+ **kwargs: Any,
+) -> Connect:
+ """
+ Similar to :func:`connect`, but for connecting to a Unix socket.
+
+ This function builds upon the event loop's
+ :meth:`~asyncio.loop.create_unix_connection` method.
+
+ It is only available on Unix.
+
+ It's mainly useful for debugging servers listening on Unix sockets.
+
+ Args:
+ path: File system path to the Unix socket.
+ uri: URI of the WebSocket server; the host is used in the TLS
+ handshake for secure connections and in the ``Host`` header.
+
+ """
+ return connect(uri=uri, path=path, unix=True, **kwargs)
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/compatibility.py b/venv/lib/python3.11/site-packages/websockets/legacy/compatibility.py
new file mode 100644
index 0000000..6bd01e7
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/compatibility.py
@@ -0,0 +1,12 @@
+from __future__ import annotations
+
+import sys
+
+
+__all__ = ["asyncio_timeout"]
+
+
+if sys.version_info[:2] >= (3, 11):
+ from asyncio import timeout as asyncio_timeout # noqa: F401
+else:
+ from .async_timeout import timeout as asyncio_timeout # noqa: F401
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/framing.py b/venv/lib/python3.11/site-packages/websockets/legacy/framing.py
new file mode 100644
index 0000000..b77b869
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/framing.py
@@ -0,0 +1,176 @@
+from __future__ import annotations
+
+import struct
+from typing import Any, Awaitable, Callable, NamedTuple, Optional, Sequence, Tuple
+
+from .. import extensions, frames
+from ..exceptions import PayloadTooBig, ProtocolError
+
+
+try:
+ from ..speedups import apply_mask
+except ImportError:
+ from ..utils import apply_mask
+
+
+class Frame(NamedTuple):
+ fin: bool
+ opcode: frames.Opcode
+ data: bytes
+ rsv1: bool = False
+ rsv2: bool = False
+ rsv3: bool = False
+
+ @property
+ def new_frame(self) -> frames.Frame:
+ return frames.Frame(
+ self.opcode,
+ self.data,
+ self.fin,
+ self.rsv1,
+ self.rsv2,
+ self.rsv3,
+ )
+
+ def __str__(self) -> str:
+ return str(self.new_frame)
+
+ def check(self) -> None:
+ return self.new_frame.check()
+
+ @classmethod
+ async def read(
+ cls,
+ reader: Callable[[int], Awaitable[bytes]],
+ *,
+ mask: bool,
+ max_size: Optional[int] = None,
+ extensions: Optional[Sequence[extensions.Extension]] = None,
+ ) -> Frame:
+ """
+ Read a WebSocket frame.
+
+ Args:
+ reader: Coroutine that reads exactly the requested number of
+ bytes, unless the end of file is reached.
+ mask: Whether the frame should be masked i.e. whether the read
+ happens on the server side.
+ max_size: Maximum payload size in bytes.
+ extensions: List of extensions, applied in reverse order.
+
+ Raises:
+ PayloadTooBig: If the frame exceeds ``max_size``.
+ ProtocolError: If the frame contains incorrect values.
+
+ """
+
+ # Read the header.
+ data = await reader(2)
+ head1, head2 = struct.unpack("!BB", data)
+
+ # While not Pythonic, this is marginally faster than calling bool().
+ fin = True if head1 & 0b10000000 else False
+ rsv1 = True if head1 & 0b01000000 else False
+ rsv2 = True if head1 & 0b00100000 else False
+ rsv3 = True if head1 & 0b00010000 else False
+
+ try:
+ opcode = frames.Opcode(head1 & 0b00001111)
+ except ValueError as exc:
+ raise ProtocolError("invalid opcode") from exc
+
+ if (True if head2 & 0b10000000 else False) != mask:
+ raise ProtocolError("incorrect masking")
+
+ length = head2 & 0b01111111
+ if length == 126:
+ data = await reader(2)
+ (length,) = struct.unpack("!H", data)
+ elif length == 127:
+ data = await reader(8)
+ (length,) = struct.unpack("!Q", data)
+ if max_size is not None and length > max_size:
+ raise PayloadTooBig(f"over size limit ({length} > {max_size} bytes)")
+ if mask:
+ mask_bits = await reader(4)
+
+ # Read the data.
+ data = await reader(length)
+ if mask:
+ data = apply_mask(data, mask_bits)
+
+ new_frame = frames.Frame(opcode, data, fin, rsv1, rsv2, rsv3)
+
+ if extensions is None:
+ extensions = []
+ for extension in reversed(extensions):
+ new_frame = extension.decode(new_frame, max_size=max_size)
+
+ new_frame.check()
+
+ return cls(
+ new_frame.fin,
+ new_frame.opcode,
+ new_frame.data,
+ new_frame.rsv1,
+ new_frame.rsv2,
+ new_frame.rsv3,
+ )
+
+ def write(
+ self,
+ write: Callable[[bytes], Any],
+ *,
+ mask: bool,
+ extensions: Optional[Sequence[extensions.Extension]] = None,
+ ) -> None:
+ """
+ Write a WebSocket frame.
+
+ Args:
+ frame: Frame to write.
+ write: Function that writes bytes.
+ mask: Whether the frame should be masked i.e. whether the write
+ happens on the client side.
+ extensions: List of extensions, applied in order.
+
+ Raises:
+ ProtocolError: If the frame contains incorrect values.
+
+ """
+ # The frame is written in a single call to write in order to prevent
+ # TCP fragmentation. See #68 for details. This also makes it safe to
+ # send frames concurrently from multiple coroutines.
+ write(self.new_frame.serialize(mask=mask, extensions=extensions))
+
+
+# Backwards compatibility with previously documented public APIs
+from ..frames import ( # noqa: E402, F401, I001
+ Close,
+ prepare_ctrl as encode_data,
+ prepare_data,
+)
+
+
+def parse_close(data: bytes) -> Tuple[int, str]:
+ """
+ Parse the payload from a close frame.
+
+ Returns:
+ Close code and reason.
+
+ Raises:
+ ProtocolError: If data is ill-formed.
+ UnicodeDecodeError: If the reason isn't valid UTF-8.
+
+ """
+ close = Close.parse(data)
+ return close.code, close.reason
+
+
+def serialize_close(code: int, reason: str) -> bytes:
+ """
+ Serialize the payload for a close frame.
+
+ """
+ return Close(code, reason).serialize()
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/handshake.py b/venv/lib/python3.11/site-packages/websockets/legacy/handshake.py
new file mode 100644
index 0000000..ad8faf0
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/handshake.py
@@ -0,0 +1,165 @@
+from __future__ import annotations
+
+import base64
+import binascii
+from typing import List
+
+from ..datastructures import Headers, MultipleValuesError
+from ..exceptions import InvalidHeader, InvalidHeaderValue, InvalidUpgrade
+from ..headers import parse_connection, parse_upgrade
+from ..typing import ConnectionOption, UpgradeProtocol
+from ..utils import accept_key as accept, generate_key
+
+
+__all__ = ["build_request", "check_request", "build_response", "check_response"]
+
+
+def build_request(headers: Headers) -> str:
+ """
+ Build a handshake request to send to the server.
+
+ Update request headers passed in argument.
+
+ Args:
+ headers: Handshake request headers.
+
+ Returns:
+ str: ``key`` that must be passed to :func:`check_response`.
+
+ """
+ key = generate_key()
+ headers["Upgrade"] = "websocket"
+ headers["Connection"] = "Upgrade"
+ headers["Sec-WebSocket-Key"] = key
+ headers["Sec-WebSocket-Version"] = "13"
+ return key
+
+
+def check_request(headers: Headers) -> str:
+ """
+ Check a handshake request received from the client.
+
+ This function doesn't verify that the request is an HTTP/1.1 or higher GET
+ request and doesn't perform ``Host`` and ``Origin`` checks. These controls
+ are usually performed earlier in the HTTP request handling code. They're
+ the responsibility of the caller.
+
+ Args:
+ headers: Handshake request headers.
+
+ Returns:
+ str: ``key`` that must be passed to :func:`build_response`.
+
+ Raises:
+ InvalidHandshake: If the handshake request is invalid.
+ Then, the server must return a 400 Bad Request error.
+
+ """
+ connection: List[ConnectionOption] = sum(
+ [parse_connection(value) for value in headers.get_all("Connection")], []
+ )
+
+ if not any(value.lower() == "upgrade" for value in connection):
+ raise InvalidUpgrade("Connection", ", ".join(connection))
+
+ upgrade: List[UpgradeProtocol] = sum(
+ [parse_upgrade(value) for value in headers.get_all("Upgrade")], []
+ )
+
+ # For compatibility with non-strict implementations, ignore case when
+ # checking the Upgrade header. The RFC always uses "websocket", except
+ # in section 11.2. (IANA registration) where it uses "WebSocket".
+ if not (len(upgrade) == 1 and upgrade[0].lower() == "websocket"):
+ raise InvalidUpgrade("Upgrade", ", ".join(upgrade))
+
+ try:
+ s_w_key = headers["Sec-WebSocket-Key"]
+ except KeyError as exc:
+ raise InvalidHeader("Sec-WebSocket-Key") from exc
+ except MultipleValuesError as exc:
+ raise InvalidHeader(
+ "Sec-WebSocket-Key", "more than one Sec-WebSocket-Key header found"
+ ) from exc
+
+ try:
+ raw_key = base64.b64decode(s_w_key.encode(), validate=True)
+ except binascii.Error as exc:
+ raise InvalidHeaderValue("Sec-WebSocket-Key", s_w_key) from exc
+ if len(raw_key) != 16:
+ raise InvalidHeaderValue("Sec-WebSocket-Key", s_w_key)
+
+ try:
+ s_w_version = headers["Sec-WebSocket-Version"]
+ except KeyError as exc:
+ raise InvalidHeader("Sec-WebSocket-Version") from exc
+ except MultipleValuesError as exc:
+ raise InvalidHeader(
+ "Sec-WebSocket-Version", "more than one Sec-WebSocket-Version header found"
+ ) from exc
+
+ if s_w_version != "13":
+ raise InvalidHeaderValue("Sec-WebSocket-Version", s_w_version)
+
+ return s_w_key
+
+
+def build_response(headers: Headers, key: str) -> None:
+ """
+ Build a handshake response to send to the client.
+
+ Update response headers passed in argument.
+
+ Args:
+ headers: Handshake response headers.
+ key: Returned by :func:`check_request`.
+
+ """
+ headers["Upgrade"] = "websocket"
+ headers["Connection"] = "Upgrade"
+ headers["Sec-WebSocket-Accept"] = accept(key)
+
+
+def check_response(headers: Headers, key: str) -> None:
+ """
+ Check a handshake response received from the server.
+
+ This function doesn't verify that the response is an HTTP/1.1 or higher
+ response with a 101 status code. These controls are the responsibility of
+ the caller.
+
+ Args:
+ headers: Handshake response headers.
+ key: Returned by :func:`build_request`.
+
+ Raises:
+ InvalidHandshake: If the handshake response is invalid.
+
+ """
+ connection: List[ConnectionOption] = sum(
+ [parse_connection(value) for value in headers.get_all("Connection")], []
+ )
+
+ if not any(value.lower() == "upgrade" for value in connection):
+ raise InvalidUpgrade("Connection", " ".join(connection))
+
+ upgrade: List[UpgradeProtocol] = sum(
+ [parse_upgrade(value) for value in headers.get_all("Upgrade")], []
+ )
+
+ # For compatibility with non-strict implementations, ignore case when
+ # checking the Upgrade header. The RFC always uses "websocket", except
+ # in section 11.2. (IANA registration) where it uses "WebSocket".
+ if not (len(upgrade) == 1 and upgrade[0].lower() == "websocket"):
+ raise InvalidUpgrade("Upgrade", ", ".join(upgrade))
+
+ try:
+ s_w_accept = headers["Sec-WebSocket-Accept"]
+ except KeyError as exc:
+ raise InvalidHeader("Sec-WebSocket-Accept") from exc
+ except MultipleValuesError as exc:
+ raise InvalidHeader(
+ "Sec-WebSocket-Accept", "more than one Sec-WebSocket-Accept header found"
+ ) from exc
+
+ if s_w_accept != accept(key):
+ raise InvalidHeaderValue("Sec-WebSocket-Accept", s_w_accept)
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/http.py b/venv/lib/python3.11/site-packages/websockets/legacy/http.py
new file mode 100644
index 0000000..2ac7f70
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/http.py
@@ -0,0 +1,201 @@
+from __future__ import annotations
+
+import asyncio
+import re
+from typing import Tuple
+
+from ..datastructures import Headers
+from ..exceptions import SecurityError
+
+
+__all__ = ["read_request", "read_response"]
+
+MAX_HEADERS = 128
+MAX_LINE = 8192
+
+
+def d(value: bytes) -> str:
+ """
+ Decode a bytestring for interpolating into an error message.
+
+ """
+ return value.decode(errors="backslashreplace")
+
+
+# See https://www.rfc-editor.org/rfc/rfc7230.html#appendix-B.
+
+# Regex for validating header names.
+
+_token_re = re.compile(rb"[-!#$%&\'*+.^_`|~0-9a-zA-Z]+")
+
+# Regex for validating header values.
+
+# We don't attempt to support obsolete line folding.
+
+# Include HTAB (\x09), SP (\x20), VCHAR (\x21-\x7e), obs-text (\x80-\xff).
+
+# The ABNF is complicated because it attempts to express that optional
+# whitespace is ignored. We strip whitespace and don't revalidate that.
+
+# See also https://www.rfc-editor.org/errata_search.php?rfc=7230&eid=4189
+
+_value_re = re.compile(rb"[\x09\x20-\x7e\x80-\xff]*")
+
+
+async def read_request(stream: asyncio.StreamReader) -> Tuple[str, Headers]:
+ """
+ Read an HTTP/1.1 GET request and return ``(path, headers)``.
+
+ ``path`` isn't URL-decoded or validated in any way.
+
+ ``path`` and ``headers`` are expected to contain only ASCII characters.
+ Other characters are represented with surrogate escapes.
+
+ :func:`read_request` doesn't attempt to read the request body because
+ WebSocket handshake requests don't have one. If the request contains a
+ body, it may be read from ``stream`` after this coroutine returns.
+
+ Args:
+ stream: Input to read the request from.
+
+ Raises:
+ EOFError: If the connection is closed without a full HTTP request.
+ SecurityError: If the request exceeds a security limit.
+ ValueError: If the request isn't well formatted.
+
+ """
+ # https://www.rfc-editor.org/rfc/rfc7230.html#section-3.1.1
+
+ # Parsing is simple because fixed values are expected for method and
+ # version and because path isn't checked. Since WebSocket software tends
+ # to implement HTTP/1.1 strictly, there's little need for lenient parsing.
+
+ try:
+ request_line = await read_line(stream)
+ except EOFError as exc:
+ raise EOFError("connection closed while reading HTTP request line") from exc
+
+ try:
+ method, raw_path, version = request_line.split(b" ", 2)
+ except ValueError: # not enough values to unpack (expected 3, got 1-2)
+ raise ValueError(f"invalid HTTP request line: {d(request_line)}") from None
+
+ if method != b"GET":
+ raise ValueError(f"unsupported HTTP method: {d(method)}")
+ if version != b"HTTP/1.1":
+ raise ValueError(f"unsupported HTTP version: {d(version)}")
+ path = raw_path.decode("ascii", "surrogateescape")
+
+ headers = await read_headers(stream)
+
+ return path, headers
+
+
+async def read_response(stream: asyncio.StreamReader) -> Tuple[int, str, Headers]:
+ """
+ Read an HTTP/1.1 response and return ``(status_code, reason, headers)``.
+
+ ``reason`` and ``headers`` are expected to contain only ASCII characters.
+ Other characters are represented with surrogate escapes.
+
+ :func:`read_request` doesn't attempt to read the response body because
+ WebSocket handshake responses don't have one. If the response contains a
+ body, it may be read from ``stream`` after this coroutine returns.
+
+ Args:
+ stream: Input to read the response from.
+
+ Raises:
+ EOFError: If the connection is closed without a full HTTP response.
+ SecurityError: If the response exceeds a security limit.
+ ValueError: If the response isn't well formatted.
+
+ """
+ # https://www.rfc-editor.org/rfc/rfc7230.html#section-3.1.2
+
+ # As in read_request, parsing is simple because a fixed value is expected
+ # for version, status_code is a 3-digit number, and reason can be ignored.
+
+ try:
+ status_line = await read_line(stream)
+ except EOFError as exc:
+ raise EOFError("connection closed while reading HTTP status line") from exc
+
+ try:
+ version, raw_status_code, raw_reason = status_line.split(b" ", 2)
+ except ValueError: # not enough values to unpack (expected 3, got 1-2)
+ raise ValueError(f"invalid HTTP status line: {d(status_line)}") from None
+
+ if version != b"HTTP/1.1":
+ raise ValueError(f"unsupported HTTP version: {d(version)}")
+ try:
+ status_code = int(raw_status_code)
+ except ValueError: # invalid literal for int() with base 10
+ raise ValueError(f"invalid HTTP status code: {d(raw_status_code)}") from None
+ if not 100 <= status_code < 1000:
+ raise ValueError(f"unsupported HTTP status code: {d(raw_status_code)}")
+ if not _value_re.fullmatch(raw_reason):
+ raise ValueError(f"invalid HTTP reason phrase: {d(raw_reason)}")
+ reason = raw_reason.decode()
+
+ headers = await read_headers(stream)
+
+ return status_code, reason, headers
+
+
+async def read_headers(stream: asyncio.StreamReader) -> Headers:
+ """
+ Read HTTP headers from ``stream``.
+
+ Non-ASCII characters are represented with surrogate escapes.
+
+ """
+ # https://www.rfc-editor.org/rfc/rfc7230.html#section-3.2
+
+ # We don't attempt to support obsolete line folding.
+
+ headers = Headers()
+ for _ in range(MAX_HEADERS + 1):
+ try:
+ line = await read_line(stream)
+ except EOFError as exc:
+ raise EOFError("connection closed while reading HTTP headers") from exc
+ if line == b"":
+ break
+
+ try:
+ raw_name, raw_value = line.split(b":", 1)
+ except ValueError: # not enough values to unpack (expected 2, got 1)
+ raise ValueError(f"invalid HTTP header line: {d(line)}") from None
+ if not _token_re.fullmatch(raw_name):
+ raise ValueError(f"invalid HTTP header name: {d(raw_name)}")
+ raw_value = raw_value.strip(b" \t")
+ if not _value_re.fullmatch(raw_value):
+ raise ValueError(f"invalid HTTP header value: {d(raw_value)}")
+
+ name = raw_name.decode("ascii") # guaranteed to be ASCII at this point
+ value = raw_value.decode("ascii", "surrogateescape")
+ headers[name] = value
+
+ else:
+ raise SecurityError("too many HTTP headers")
+
+ return headers
+
+
+async def read_line(stream: asyncio.StreamReader) -> bytes:
+ """
+ Read a single line from ``stream``.
+
+ CRLF is stripped from the return value.
+
+ """
+ # Security: this is bounded by the StreamReader's limit (default = 32 KiB).
+ line = await stream.readline()
+ # Security: this guarantees header values are small (hard-coded = 8 KiB)
+ if len(line) > MAX_LINE:
+ raise SecurityError("line too long")
+ # Not mandatory but safe - https://www.rfc-editor.org/rfc/rfc7230.html#section-3.5
+ if not line.endswith(b"\r\n"):
+ raise EOFError("line without CRLF")
+ return line[:-2]
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/protocol.py b/venv/lib/python3.11/site-packages/websockets/legacy/protocol.py
new file mode 100644
index 0000000..19cee0e
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/protocol.py
@@ -0,0 +1,1645 @@
+from __future__ import annotations
+
+import asyncio
+import codecs
+import collections
+import logging
+import random
+import ssl
+import struct
+import sys
+import time
+import uuid
+import warnings
+from typing import (
+ Any,
+ AsyncIterable,
+ AsyncIterator,
+ Awaitable,
+ Callable,
+ Deque,
+ Dict,
+ Iterable,
+ List,
+ Mapping,
+ Optional,
+ Tuple,
+ Union,
+ cast,
+)
+
+from ..datastructures import Headers
+from ..exceptions import (
+ ConnectionClosed,
+ ConnectionClosedError,
+ ConnectionClosedOK,
+ InvalidState,
+ PayloadTooBig,
+ ProtocolError,
+)
+from ..extensions import Extension
+from ..frames import (
+ OK_CLOSE_CODES,
+ OP_BINARY,
+ OP_CLOSE,
+ OP_CONT,
+ OP_PING,
+ OP_PONG,
+ OP_TEXT,
+ Close,
+ CloseCode,
+ Opcode,
+ prepare_ctrl,
+ prepare_data,
+)
+from ..protocol import State
+from ..typing import Data, LoggerLike, Subprotocol
+from .compatibility import asyncio_timeout
+from .framing import Frame
+
+
+__all__ = ["WebSocketCommonProtocol", "broadcast"]
+
+
+# In order to ensure consistency, the code always checks the current value of
+# WebSocketCommonProtocol.state before assigning a new value and never yields
+# between the check and the assignment.
+
+
+class WebSocketCommonProtocol(asyncio.Protocol):
+ """
+ WebSocket connection.
+
+ :class:`WebSocketCommonProtocol` provides APIs shared between WebSocket
+ servers and clients. You shouldn't use it directly. Instead, use
+ :class:`~websockets.client.WebSocketClientProtocol` or
+ :class:`~websockets.server.WebSocketServerProtocol`.
+
+ This documentation focuses on low-level details that aren't covered in the
+ documentation of :class:`~websockets.client.WebSocketClientProtocol` and
+ :class:`~websockets.server.WebSocketServerProtocol` for the sake of
+ simplicity.
+
+ Once the connection is open, a Ping_ frame is sent every ``ping_interval``
+ seconds. This serves as a keepalive. It helps keeping the connection open,
+ especially in the presence of proxies with short timeouts on inactive
+ connections. Set ``ping_interval`` to :obj:`None` to disable this behavior.
+
+ .. _Ping: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.5.2
+
+ If the corresponding Pong_ frame isn't received within ``ping_timeout``
+ seconds, the connection is considered unusable and is closed with code 1011.
+ This ensures that the remote endpoint remains responsive. Set
+ ``ping_timeout`` to :obj:`None` to disable this behavior.
+
+ .. _Pong: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.5.3
+
+ See the discussion of :doc:`timeouts <../../topics/timeouts>` for details.
+
+ The ``close_timeout`` parameter defines a maximum wait time for completing
+ the closing handshake and terminating the TCP connection. For legacy
+ reasons, :meth:`close` completes in at most ``5 * close_timeout`` seconds
+ for clients and ``4 * close_timeout`` for servers.
+
+ ``close_timeout`` is a parameter of the protocol because websockets usually
+ calls :meth:`close` implicitly upon exit:
+
+ * on the client side, when using :func:`~websockets.client.connect` as a
+ context manager;
+ * on the server side, when the connection handler terminates.
+
+ To apply a timeout to any other API, wrap it in :func:`~asyncio.timeout` or
+ :func:`~asyncio.wait_for`.
+
+ The ``max_size`` parameter enforces the maximum size for incoming messages
+ in bytes. The default value is 1 MiB. If a larger message is received,
+ :meth:`recv` will raise :exc:`~websockets.exceptions.ConnectionClosedError`
+ and the connection will be closed with code 1009.
+
+ The ``max_queue`` parameter sets the maximum length of the queue that
+ holds incoming messages. The default value is ``32``. Messages are added
+ to an in-memory queue when they're received; then :meth:`recv` pops from
+ that queue. In order to prevent excessive memory consumption when
+ messages are received faster than they can be processed, the queue must
+ be bounded. If the queue fills up, the protocol stops processing incoming
+ data until :meth:`recv` is called. In this situation, various receive
+ buffers (at least in :mod:`asyncio` and in the OS) will fill up, then the
+ TCP receive window will shrink, slowing down transmission to avoid packet
+ loss.
+
+ Since Python can use up to 4 bytes of memory to represent a single
+ character, each connection may use up to ``4 * max_size * max_queue``
+ bytes of memory to store incoming messages. By default, this is 128 MiB.
+ You may want to lower the limits, depending on your application's
+ requirements.
+
+ The ``read_limit`` argument sets the high-water limit of the buffer for
+ incoming bytes. The low-water limit is half the high-water limit. The
+ default value is 64 KiB, half of asyncio's default (based on the current
+ implementation of :class:`~asyncio.StreamReader`).
+
+ The ``write_limit`` argument sets the high-water limit of the buffer for
+ outgoing bytes. The low-water limit is a quarter of the high-water limit.
+ The default value is 64 KiB, equal to asyncio's default (based on the
+ current implementation of ``FlowControlMixin``).
+
+ See the discussion of :doc:`memory usage <../../topics/memory>` for details.
+
+ Args:
+ logger: Logger for this server.
+ It defaults to ``logging.getLogger("websockets.protocol")``.
+ See the :doc:`logging guide <../../topics/logging>` for details.
+ ping_interval: Delay between keepalive pings in seconds.
+ :obj:`None` disables keepalive pings.
+ ping_timeout: Timeout for keepalive pings in seconds.
+ :obj:`None` disables timeouts.
+ close_timeout: Timeout for closing the connection in seconds.
+ For legacy reasons, the actual timeout is 4 or 5 times larger.
+ max_size: Maximum size of incoming messages in bytes.
+ :obj:`None` disables the limit.
+ max_queue: Maximum number of incoming messages in receive buffer.
+ :obj:`None` disables the limit.
+ read_limit: High-water mark of read buffer in bytes.
+ write_limit: High-water mark of write buffer in bytes.
+
+ """
+
+ # There are only two differences between the client-side and server-side
+ # behavior: masking the payload and closing the underlying TCP connection.
+ # Set is_client = True/False and side = "client"/"server" to pick a side.
+ is_client: bool
+ side: str = "undefined"
+
+ def __init__(
+ self,
+ *,
+ logger: Optional[LoggerLike] = None,
+ ping_interval: Optional[float] = 20,
+ ping_timeout: Optional[float] = 20,
+ close_timeout: Optional[float] = None,
+ max_size: Optional[int] = 2**20,
+ max_queue: Optional[int] = 2**5,
+ read_limit: int = 2**16,
+ write_limit: int = 2**16,
+ # The following arguments are kept only for backwards compatibility.
+ host: Optional[str] = None,
+ port: Optional[int] = None,
+ secure: Optional[bool] = None,
+ legacy_recv: bool = False,
+ loop: Optional[asyncio.AbstractEventLoop] = None,
+ timeout: Optional[float] = None,
+ ) -> None:
+ if legacy_recv: # pragma: no cover
+ warnings.warn("legacy_recv is deprecated", DeprecationWarning)
+
+ # Backwards compatibility: close_timeout used to be called timeout.
+ if timeout is None:
+ timeout = 10
+ else:
+ warnings.warn("rename timeout to close_timeout", DeprecationWarning)
+ # If both are specified, timeout is ignored.
+ if close_timeout is None:
+ close_timeout = timeout
+
+ # Backwards compatibility: the loop parameter used to be supported.
+ if loop is None:
+ loop = asyncio.get_event_loop()
+ else:
+ warnings.warn("remove loop argument", DeprecationWarning)
+
+ self.ping_interval = ping_interval
+ self.ping_timeout = ping_timeout
+ self.close_timeout = close_timeout
+ self.max_size = max_size
+ self.max_queue = max_queue
+ self.read_limit = read_limit
+ self.write_limit = write_limit
+
+ # Unique identifier. For logs.
+ self.id: uuid.UUID = uuid.uuid4()
+ """Unique identifier of the connection. Useful in logs."""
+
+ # Logger or LoggerAdapter for this connection.
+ if logger is None:
+ logger = logging.getLogger("websockets.protocol")
+ self.logger: LoggerLike = logging.LoggerAdapter(logger, {"websocket": self})
+ """Logger for this connection."""
+
+ # Track if DEBUG is enabled. Shortcut logging calls if it isn't.
+ self.debug = logger.isEnabledFor(logging.DEBUG)
+
+ self.loop = loop
+
+ self._host = host
+ self._port = port
+ self._secure = secure
+ self.legacy_recv = legacy_recv
+
+ # Configure read buffer limits. The high-water limit is defined by
+ # ``self.read_limit``. The ``limit`` argument controls the line length
+ # limit and half the buffer limit of :class:`~asyncio.StreamReader`.
+ # That's why it must be set to half of ``self.read_limit``.
+ self.reader = asyncio.StreamReader(limit=read_limit // 2, loop=loop)
+
+ # Copied from asyncio.FlowControlMixin
+ self._paused = False
+ self._drain_waiter: Optional[asyncio.Future[None]] = None
+
+ self._drain_lock = asyncio.Lock()
+
+ # This class implements the data transfer and closing handshake, which
+ # are shared between the client-side and the server-side.
+ # Subclasses implement the opening handshake and, on success, execute
+ # :meth:`connection_open` to change the state to OPEN.
+ self.state = State.CONNECTING
+ if self.debug:
+ self.logger.debug("= connection is CONNECTING")
+
+ # HTTP protocol parameters.
+ self.path: str
+ """Path of the opening handshake request."""
+ self.request_headers: Headers
+ """Opening handshake request headers."""
+ self.response_headers: Headers
+ """Opening handshake response headers."""
+
+ # WebSocket protocol parameters.
+ self.extensions: List[Extension] = []
+ self.subprotocol: Optional[Subprotocol] = None
+ """Subprotocol, if one was negotiated."""
+
+ # Close code and reason, set when a close frame is sent or received.
+ self.close_rcvd: Optional[Close] = None
+ self.close_sent: Optional[Close] = None
+ self.close_rcvd_then_sent: Optional[bool] = None
+
+ # Completed when the connection state becomes CLOSED. Translates the
+ # :meth:`connection_lost` callback to a :class:`~asyncio.Future`
+ # that can be awaited. (Other :class:`~asyncio.Protocol` callbacks are
+ # translated by ``self.stream_reader``).
+ self.connection_lost_waiter: asyncio.Future[None] = loop.create_future()
+
+ # Queue of received messages.
+ self.messages: Deque[Data] = collections.deque()
+ self._pop_message_waiter: Optional[asyncio.Future[None]] = None
+ self._put_message_waiter: Optional[asyncio.Future[None]] = None
+
+ # Protect sending fragmented messages.
+ self._fragmented_message_waiter: Optional[asyncio.Future[None]] = None
+
+ # Mapping of ping IDs to pong waiters, in chronological order.
+ self.pings: Dict[bytes, Tuple[asyncio.Future[float], float]] = {}
+
+ self.latency: float = 0
+ """
+ Latency of the connection, in seconds.
+
+ This value is updated after sending a ping frame and receiving a
+ matching pong frame. Before the first ping, :attr:`latency` is ``0``.
+
+ By default, websockets enables a :ref:`keepalive <keepalive>` mechanism
+ that sends ping frames automatically at regular intervals. You can also
+ send ping frames and measure latency with :meth:`ping`.
+ """
+
+ # Task running the data transfer.
+ self.transfer_data_task: asyncio.Task[None]
+
+ # Exception that occurred during data transfer, if any.
+ self.transfer_data_exc: Optional[BaseException] = None
+
+ # Task sending keepalive pings.
+ self.keepalive_ping_task: asyncio.Task[None]
+
+ # Task closing the TCP connection.
+ self.close_connection_task: asyncio.Task[None]
+
+ # Copied from asyncio.FlowControlMixin
+ async def _drain_helper(self) -> None: # pragma: no cover
+ if self.connection_lost_waiter.done():
+ raise ConnectionResetError("Connection lost")
+ if not self._paused:
+ return
+ waiter = self._drain_waiter
+ assert waiter is None or waiter.cancelled()
+ waiter = self.loop.create_future()
+ self._drain_waiter = waiter
+ await waiter
+
+ # Copied from asyncio.StreamWriter
+ async def _drain(self) -> None: # pragma: no cover
+ if self.reader is not None:
+ exc = self.reader.exception()
+ if exc is not None:
+ raise exc
+ if self.transport is not None:
+ if self.transport.is_closing():
+ # Yield to the event loop so connection_lost() may be
+ # called. Without this, _drain_helper() would return
+ # immediately, and code that calls
+ # write(...); yield from drain()
+ # in a loop would never call connection_lost(), so it
+ # would not see an error when the socket is closed.
+ await asyncio.sleep(0)
+ await self._drain_helper()
+
+ def connection_open(self) -> None:
+ """
+ Callback when the WebSocket opening handshake completes.
+
+ Enter the OPEN state and start the data transfer phase.
+
+ """
+ # 4.1. The WebSocket Connection is Established.
+ assert self.state is State.CONNECTING
+ self.state = State.OPEN
+ if self.debug:
+ self.logger.debug("= connection is OPEN")
+ # Start the task that receives incoming WebSocket messages.
+ self.transfer_data_task = self.loop.create_task(self.transfer_data())
+ # Start the task that sends pings at regular intervals.
+ self.keepalive_ping_task = self.loop.create_task(self.keepalive_ping())
+ # Start the task that eventually closes the TCP connection.
+ self.close_connection_task = self.loop.create_task(self.close_connection())
+
+ @property
+ def host(self) -> Optional[str]:
+ alternative = "remote_address" if self.is_client else "local_address"
+ warnings.warn(f"use {alternative}[0] instead of host", DeprecationWarning)
+ return self._host
+
+ @property
+ def port(self) -> Optional[int]:
+ alternative = "remote_address" if self.is_client else "local_address"
+ warnings.warn(f"use {alternative}[1] instead of port", DeprecationWarning)
+ return self._port
+
+ @property
+ def secure(self) -> Optional[bool]:
+ warnings.warn("don't use secure", DeprecationWarning)
+ return self._secure
+
+ # Public API
+
+ @property
+ def local_address(self) -> Any:
+ """
+ Local address of the connection.
+
+ For IPv4 connections, this is a ``(host, port)`` tuple.
+
+ The format of the address depends on the address family;
+ see :meth:`~socket.socket.getsockname`.
+
+ :obj:`None` if the TCP connection isn't established yet.
+
+ """
+ try:
+ transport = self.transport
+ except AttributeError:
+ return None
+ else:
+ return transport.get_extra_info("sockname")
+
+ @property
+ def remote_address(self) -> Any:
+ """
+ Remote address of the connection.
+
+ For IPv4 connections, this is a ``(host, port)`` tuple.
+
+ The format of the address depends on the address family;
+ see :meth:`~socket.socket.getpeername`.
+
+ :obj:`None` if the TCP connection isn't established yet.
+
+ """
+ try:
+ transport = self.transport
+ except AttributeError:
+ return None
+ else:
+ return transport.get_extra_info("peername")
+
+ @property
+ def open(self) -> bool:
+ """
+ :obj:`True` when the connection is open; :obj:`False` otherwise.
+
+ This attribute may be used to detect disconnections. However, this
+ approach is discouraged per the EAFP_ principle. Instead, you should
+ handle :exc:`~websockets.exceptions.ConnectionClosed` exceptions.
+
+ .. _EAFP: https://docs.python.org/3/glossary.html#term-eafp
+
+ """
+ return self.state is State.OPEN and not self.transfer_data_task.done()
+
+ @property
+ def closed(self) -> bool:
+ """
+ :obj:`True` when the connection is closed; :obj:`False` otherwise.
+
+ Be aware that both :attr:`open` and :attr:`closed` are :obj:`False`
+ during the opening and closing sequences.
+
+ """
+ return self.state is State.CLOSED
+
+ @property
+ def close_code(self) -> Optional[int]:
+ """
+ WebSocket close code, defined in `section 7.1.5 of RFC 6455`_.
+
+ .. _section 7.1.5 of RFC 6455:
+ https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.5
+
+ :obj:`None` if the connection isn't closed yet.
+
+ """
+ if self.state is not State.CLOSED:
+ return None
+ elif self.close_rcvd is None:
+ return CloseCode.ABNORMAL_CLOSURE
+ else:
+ return self.close_rcvd.code
+
+ @property
+ def close_reason(self) -> Optional[str]:
+ """
+ WebSocket close reason, defined in `section 7.1.6 of RFC 6455`_.
+
+ .. _section 7.1.6 of RFC 6455:
+ https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.6
+
+ :obj:`None` if the connection isn't closed yet.
+
+ """
+ if self.state is not State.CLOSED:
+ return None
+ elif self.close_rcvd is None:
+ return ""
+ else:
+ return self.close_rcvd.reason
+
+ async def __aiter__(self) -> AsyncIterator[Data]:
+ """
+ Iterate on incoming messages.
+
+ The iterator exits normally when the connection is closed with the close
+ code 1000 (OK) or 1001 (going away) or without a close code.
+
+ It raises a :exc:`~websockets.exceptions.ConnectionClosedError`
+ exception when the connection is closed with any other code.
+
+ """
+ try:
+ while True:
+ yield await self.recv()
+ except ConnectionClosedOK:
+ return
+
+ async def recv(self) -> Data:
+ """
+ Receive the next message.
+
+ When the connection is closed, :meth:`recv` raises
+ :exc:`~websockets.exceptions.ConnectionClosed`. Specifically, it raises
+ :exc:`~websockets.exceptions.ConnectionClosedOK` after a normal
+ connection closure and
+ :exc:`~websockets.exceptions.ConnectionClosedError` after a protocol
+ error or a network failure. This is how you detect the end of the
+ message stream.
+
+ Canceling :meth:`recv` is safe. There's no risk of losing the next
+ message. The next invocation of :meth:`recv` will return it.
+
+ This makes it possible to enforce a timeout by wrapping :meth:`recv` in
+ :func:`~asyncio.timeout` or :func:`~asyncio.wait_for`.
+
+ Returns:
+ Data: A string (:class:`str`) for a Text_ frame. A bytestring
+ (:class:`bytes`) for a Binary_ frame.
+
+ .. _Text: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.6
+ .. _Binary: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.6
+
+ Raises:
+ ConnectionClosed: When the connection is closed.
+ RuntimeError: If two coroutines call :meth:`recv` concurrently.
+
+ """
+ if self._pop_message_waiter is not None:
+ raise RuntimeError(
+ "cannot call recv while another coroutine "
+ "is already waiting for the next message"
+ )
+
+ # Don't await self.ensure_open() here:
+ # - messages could be available in the queue even if the connection
+ # is closed;
+ # - messages could be received before the closing frame even if the
+ # connection is closing.
+
+ # Wait until there's a message in the queue (if necessary) or the
+ # connection is closed.
+ while len(self.messages) <= 0:
+ pop_message_waiter: asyncio.Future[None] = self.loop.create_future()
+ self._pop_message_waiter = pop_message_waiter
+ try:
+ # If asyncio.wait() is canceled, it doesn't cancel
+ # pop_message_waiter and self.transfer_data_task.
+ await asyncio.wait(
+ [pop_message_waiter, self.transfer_data_task],
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+ finally:
+ self._pop_message_waiter = None
+
+ # If asyncio.wait(...) exited because self.transfer_data_task
+ # completed before receiving a new message, raise a suitable
+ # exception (or return None if legacy_recv is enabled).
+ if not pop_message_waiter.done():
+ if self.legacy_recv:
+ return None # type: ignore
+ else:
+ # Wait until the connection is closed to raise
+ # ConnectionClosed with the correct code and reason.
+ await self.ensure_open()
+
+ # Pop a message from the queue.
+ message = self.messages.popleft()
+
+ # Notify transfer_data().
+ if self._put_message_waiter is not None:
+ self._put_message_waiter.set_result(None)
+ self._put_message_waiter = None
+
+ return message
+
+ async def send(
+ self,
+ message: Union[Data, Iterable[Data], AsyncIterable[Data]],
+ ) -> None:
+ """
+ Send a message.
+
+ A string (:class:`str`) is sent as a Text_ frame. A bytestring or
+ bytes-like object (:class:`bytes`, :class:`bytearray`, or
+ :class:`memoryview`) is sent as a Binary_ frame.
+
+ .. _Text: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.6
+ .. _Binary: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.6
+
+ :meth:`send` also accepts an iterable or an asynchronous iterable of
+ strings, bytestrings, or bytes-like objects to enable fragmentation_.
+ Each item is treated as a message fragment and sent in its own frame.
+ All items must be of the same type, or else :meth:`send` will raise a
+ :exc:`TypeError` and the connection will be closed.
+
+ .. _fragmentation: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.4
+
+ :meth:`send` rejects dict-like objects because this is often an error.
+ (If you want to send the keys of a dict-like object as fragments, call
+ its :meth:`~dict.keys` method and pass the result to :meth:`send`.)
+
+ Canceling :meth:`send` is discouraged. Instead, you should close the
+ connection with :meth:`close`. Indeed, there are only two situations
+ where :meth:`send` may yield control to the event loop and then get
+ canceled; in both cases, :meth:`close` has the same effect and is
+ more clear:
+
+ 1. The write buffer is full. If you don't want to wait until enough
+ data is sent, your only alternative is to close the connection.
+ :meth:`close` will likely time out then abort the TCP connection.
+ 2. ``message`` is an asynchronous iterator that yields control.
+ Stopping in the middle of a fragmented message will cause a
+ protocol error and the connection will be closed.
+
+ When the connection is closed, :meth:`send` raises
+ :exc:`~websockets.exceptions.ConnectionClosed`. Specifically, it
+ raises :exc:`~websockets.exceptions.ConnectionClosedOK` after a normal
+ connection closure and
+ :exc:`~websockets.exceptions.ConnectionClosedError` after a protocol
+ error or a network failure.
+
+ Args:
+ message (Union[Data, Iterable[Data], AsyncIterable[Data]): message
+ to send.
+
+ Raises:
+ ConnectionClosed: When the connection is closed.
+ TypeError: If ``message`` doesn't have a supported type.
+
+ """
+ await self.ensure_open()
+
+ # While sending a fragmented message, prevent sending other messages
+ # until all fragments are sent.
+ while self._fragmented_message_waiter is not None:
+ await asyncio.shield(self._fragmented_message_waiter)
+
+ # Unfragmented message -- this case must be handled first because
+ # strings and bytes-like objects are iterable.
+
+ if isinstance(message, (str, bytes, bytearray, memoryview)):
+ opcode, data = prepare_data(message)
+ await self.write_frame(True, opcode, data)
+
+ # Catch a common mistake -- passing a dict to send().
+
+ elif isinstance(message, Mapping):
+ raise TypeError("data is a dict-like object")
+
+ # Fragmented message -- regular iterator.
+
+ elif isinstance(message, Iterable):
+ # Work around https://github.com/python/mypy/issues/6227
+ message = cast(Iterable[Data], message)
+
+ iter_message = iter(message)
+ try:
+ fragment = next(iter_message)
+ except StopIteration:
+ return
+ opcode, data = prepare_data(fragment)
+
+ self._fragmented_message_waiter = asyncio.Future()
+ try:
+ # First fragment.
+ await self.write_frame(False, opcode, data)
+
+ # Other fragments.
+ for fragment in iter_message:
+ confirm_opcode, data = prepare_data(fragment)
+ if confirm_opcode != opcode:
+ raise TypeError("data contains inconsistent types")
+ await self.write_frame(False, OP_CONT, data)
+
+ # Final fragment.
+ await self.write_frame(True, OP_CONT, b"")
+
+ except (Exception, asyncio.CancelledError):
+ # We're half-way through a fragmented message and we can't
+ # complete it. This makes the connection unusable.
+ self.fail_connection(CloseCode.INTERNAL_ERROR)
+ raise
+
+ finally:
+ self._fragmented_message_waiter.set_result(None)
+ self._fragmented_message_waiter = None
+
+ # Fragmented message -- asynchronous iterator
+
+ elif isinstance(message, AsyncIterable):
+ # Implement aiter_message = aiter(message) without aiter
+ # Work around https://github.com/python/mypy/issues/5738
+ aiter_message = cast(
+ Callable[[AsyncIterable[Data]], AsyncIterator[Data]],
+ type(message).__aiter__,
+ )(message)
+ try:
+ # Implement fragment = anext(aiter_message) without anext
+ # Work around https://github.com/python/mypy/issues/5738
+ fragment = await cast(
+ Callable[[AsyncIterator[Data]], Awaitable[Data]],
+ type(aiter_message).__anext__,
+ )(aiter_message)
+ except StopAsyncIteration:
+ return
+ opcode, data = prepare_data(fragment)
+
+ self._fragmented_message_waiter = asyncio.Future()
+ try:
+ # First fragment.
+ await self.write_frame(False, opcode, data)
+
+ # Other fragments.
+ async for fragment in aiter_message:
+ confirm_opcode, data = prepare_data(fragment)
+ if confirm_opcode != opcode:
+ raise TypeError("data contains inconsistent types")
+ await self.write_frame(False, OP_CONT, data)
+
+ # Final fragment.
+ await self.write_frame(True, OP_CONT, b"")
+
+ except (Exception, asyncio.CancelledError):
+ # We're half-way through a fragmented message and we can't
+ # complete it. This makes the connection unusable.
+ self.fail_connection(CloseCode.INTERNAL_ERROR)
+ raise
+
+ finally:
+ self._fragmented_message_waiter.set_result(None)
+ self._fragmented_message_waiter = None
+
+ else:
+ raise TypeError("data must be str, bytes-like, or iterable")
+
+ async def close(
+ self,
+ code: int = CloseCode.NORMAL_CLOSURE,
+ reason: str = "",
+ ) -> None:
+ """
+ Perform the closing handshake.
+
+ :meth:`close` waits for the other end to complete the handshake and
+ for the TCP connection to terminate. As a consequence, there's no need
+ to await :meth:`wait_closed` after :meth:`close`.
+
+ :meth:`close` is idempotent: it doesn't do anything once the
+ connection is closed.
+
+ Wrapping :func:`close` in :func:`~asyncio.create_task` is safe, given
+ that errors during connection termination aren't particularly useful.
+
+ Canceling :meth:`close` is discouraged. If it takes too long, you can
+ set a shorter ``close_timeout``. If you don't want to wait, let the
+ Python process exit, then the OS will take care of closing the TCP
+ connection.
+
+ Args:
+ code: WebSocket close code.
+ reason: WebSocket close reason.
+
+ """
+ try:
+ async with asyncio_timeout(self.close_timeout):
+ await self.write_close_frame(Close(code, reason))
+ except asyncio.TimeoutError:
+ # If the close frame cannot be sent because the send buffers
+ # are full, the closing handshake won't complete anyway.
+ # Fail the connection to shut down faster.
+ self.fail_connection()
+
+ # If no close frame is received within the timeout, asyncio_timeout()
+ # cancels the data transfer task and raises TimeoutError.
+
+ # If close() is called multiple times concurrently and one of these
+ # calls hits the timeout, the data transfer task will be canceled.
+ # Other calls will receive a CancelledError here.
+
+ try:
+ # If close() is canceled during the wait, self.transfer_data_task
+ # is canceled before the timeout elapses.
+ async with asyncio_timeout(self.close_timeout):
+ await self.transfer_data_task
+ except (asyncio.TimeoutError, asyncio.CancelledError):
+ pass
+
+ # Wait for the close connection task to close the TCP connection.
+ await asyncio.shield(self.close_connection_task)
+
+ async def wait_closed(self) -> None:
+ """
+ Wait until the connection is closed.
+
+ This coroutine is identical to the :attr:`closed` attribute, except it
+ can be awaited.
+
+ This can make it easier to detect connection termination, regardless
+ of its cause, in tasks that interact with the WebSocket connection.
+
+ """
+ await asyncio.shield(self.connection_lost_waiter)
+
+ async def ping(self, data: Optional[Data] = None) -> Awaitable[None]:
+ """
+ Send a Ping_.
+
+ .. _Ping: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.5.2
+
+ A ping may serve as a keepalive, as a check that the remote endpoint
+ received all messages up to this point, or to measure :attr:`latency`.
+
+ Canceling :meth:`ping` is discouraged. If :meth:`ping` doesn't return
+ immediately, it means the write buffer is full. If you don't want to
+ wait, you should close the connection.
+
+ Canceling the :class:`~asyncio.Future` returned by :meth:`ping` has no
+ effect.
+
+ Args:
+ data (Optional[Data]): payload of the ping; a string will be
+ encoded to UTF-8; or :obj:`None` to generate a payload
+ containing four random bytes.
+
+ Returns:
+ ~asyncio.Future[float]: A future that will be completed when the
+ corresponding pong is received. You can ignore it if you don't
+ intend to wait. The result of the future is the latency of the
+ connection in seconds.
+
+ ::
+
+ pong_waiter = await ws.ping()
+ # only if you want to wait for the corresponding pong
+ latency = await pong_waiter
+
+ Raises:
+ ConnectionClosed: When the connection is closed.
+ RuntimeError: If another ping was sent with the same data and
+ the corresponding pong wasn't received yet.
+
+ """
+ await self.ensure_open()
+
+ if data is not None:
+ data = prepare_ctrl(data)
+
+ # Protect against duplicates if a payload is explicitly set.
+ if data in self.pings:
+ raise RuntimeError("already waiting for a pong with the same data")
+
+ # Generate a unique random payload otherwise.
+ while data is None or data in self.pings:
+ data = struct.pack("!I", random.getrandbits(32))
+
+ pong_waiter = self.loop.create_future()
+ # Resolution of time.monotonic() may be too low on Windows.
+ ping_timestamp = time.perf_counter()
+ self.pings[data] = (pong_waiter, ping_timestamp)
+
+ await self.write_frame(True, OP_PING, data)
+
+ return asyncio.shield(pong_waiter)
+
+ async def pong(self, data: Data = b"") -> None:
+ """
+ Send a Pong_.
+
+ .. _Pong: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.5.3
+
+ An unsolicited pong may serve as a unidirectional heartbeat.
+
+ Canceling :meth:`pong` is discouraged. If :meth:`pong` doesn't return
+ immediately, it means the write buffer is full. If you don't want to
+ wait, you should close the connection.
+
+ Args:
+ data (Data): Payload of the pong. A string will be encoded to
+ UTF-8.
+
+ Raises:
+ ConnectionClosed: When the connection is closed.
+
+ """
+ await self.ensure_open()
+
+ data = prepare_ctrl(data)
+
+ await self.write_frame(True, OP_PONG, data)
+
+ # Private methods - no guarantees.
+
+ def connection_closed_exc(self) -> ConnectionClosed:
+ exc: ConnectionClosed
+ if (
+ self.close_rcvd is not None
+ and self.close_rcvd.code in OK_CLOSE_CODES
+ and self.close_sent is not None
+ and self.close_sent.code in OK_CLOSE_CODES
+ ):
+ exc = ConnectionClosedOK(
+ self.close_rcvd,
+ self.close_sent,
+ self.close_rcvd_then_sent,
+ )
+ else:
+ exc = ConnectionClosedError(
+ self.close_rcvd,
+ self.close_sent,
+ self.close_rcvd_then_sent,
+ )
+ # Chain to the exception that terminated data transfer, if any.
+ exc.__cause__ = self.transfer_data_exc
+ return exc
+
+ async def ensure_open(self) -> None:
+ """
+ Check that the WebSocket connection is open.
+
+ Raise :exc:`~websockets.exceptions.ConnectionClosed` if it isn't.
+
+ """
+ # Handle cases from most common to least common for performance.
+ if self.state is State.OPEN:
+ # If self.transfer_data_task exited without a closing handshake,
+ # self.close_connection_task may be closing the connection, going
+ # straight from OPEN to CLOSED.
+ if self.transfer_data_task.done():
+ await asyncio.shield(self.close_connection_task)
+ raise self.connection_closed_exc()
+ else:
+ return
+
+ if self.state is State.CLOSED:
+ raise self.connection_closed_exc()
+
+ if self.state is State.CLOSING:
+ # If we started the closing handshake, wait for its completion to
+ # get the proper close code and reason. self.close_connection_task
+ # will complete within 4 or 5 * close_timeout after close(). The
+ # CLOSING state also occurs when failing the connection. In that
+ # case self.close_connection_task will complete even faster.
+ await asyncio.shield(self.close_connection_task)
+ raise self.connection_closed_exc()
+
+ # Control may only reach this point in buggy third-party subclasses.
+ assert self.state is State.CONNECTING
+ raise InvalidState("WebSocket connection isn't established yet")
+
+ async def transfer_data(self) -> None:
+ """
+ Read incoming messages and put them in a queue.
+
+ This coroutine runs in a task until the closing handshake is started.
+
+ """
+ try:
+ while True:
+ message = await self.read_message()
+
+ # Exit the loop when receiving a close frame.
+ if message is None:
+ break
+
+ # Wait until there's room in the queue (if necessary).
+ if self.max_queue is not None:
+ while len(self.messages) >= self.max_queue:
+ self._put_message_waiter = self.loop.create_future()
+ try:
+ await asyncio.shield(self._put_message_waiter)
+ finally:
+ self._put_message_waiter = None
+
+ # Put the message in the queue.
+ self.messages.append(message)
+
+ # Notify recv().
+ if self._pop_message_waiter is not None:
+ self._pop_message_waiter.set_result(None)
+ self._pop_message_waiter = None
+
+ except asyncio.CancelledError as exc:
+ self.transfer_data_exc = exc
+ # If fail_connection() cancels this task, avoid logging the error
+ # twice and failing the connection again.
+ raise
+
+ except ProtocolError as exc:
+ self.transfer_data_exc = exc
+ self.fail_connection(CloseCode.PROTOCOL_ERROR)
+
+ except (ConnectionError, TimeoutError, EOFError, ssl.SSLError) as exc:
+ # Reading data with self.reader.readexactly may raise:
+ # - most subclasses of ConnectionError if the TCP connection
+ # breaks, is reset, or is aborted;
+ # - TimeoutError if the TCP connection times out;
+ # - IncompleteReadError, a subclass of EOFError, if fewer
+ # bytes are available than requested;
+ # - ssl.SSLError if the other side infringes the TLS protocol.
+ self.transfer_data_exc = exc
+ self.fail_connection(CloseCode.ABNORMAL_CLOSURE)
+
+ except UnicodeDecodeError as exc:
+ self.transfer_data_exc = exc
+ self.fail_connection(CloseCode.INVALID_DATA)
+
+ except PayloadTooBig as exc:
+ self.transfer_data_exc = exc
+ self.fail_connection(CloseCode.MESSAGE_TOO_BIG)
+
+ except Exception as exc:
+ # This shouldn't happen often because exceptions expected under
+ # regular circumstances are handled above. If it does, consider
+ # catching and handling more exceptions.
+ self.logger.error("data transfer failed", exc_info=True)
+
+ self.transfer_data_exc = exc
+ self.fail_connection(CloseCode.INTERNAL_ERROR)
+
+ async def read_message(self) -> Optional[Data]:
+ """
+ Read a single message from the connection.
+
+ Re-assemble data frames if the message is fragmented.
+
+ Return :obj:`None` when the closing handshake is started.
+
+ """
+ frame = await self.read_data_frame(max_size=self.max_size)
+
+ # A close frame was received.
+ if frame is None:
+ return None
+
+ if frame.opcode == OP_TEXT:
+ text = True
+ elif frame.opcode == OP_BINARY:
+ text = False
+ else: # frame.opcode == OP_CONT
+ raise ProtocolError("unexpected opcode")
+
+ # Shortcut for the common case - no fragmentation
+ if frame.fin:
+ return frame.data.decode("utf-8") if text else frame.data
+
+ # 5.4. Fragmentation
+ fragments: List[Data] = []
+ max_size = self.max_size
+ if text:
+ decoder_factory = codecs.getincrementaldecoder("utf-8")
+ decoder = decoder_factory(errors="strict")
+ if max_size is None:
+
+ def append(frame: Frame) -> None:
+ nonlocal fragments
+ fragments.append(decoder.decode(frame.data, frame.fin))
+
+ else:
+
+ def append(frame: Frame) -> None:
+ nonlocal fragments, max_size
+ fragments.append(decoder.decode(frame.data, frame.fin))
+ assert isinstance(max_size, int)
+ max_size -= len(frame.data)
+
+ else:
+ if max_size is None:
+
+ def append(frame: Frame) -> None:
+ nonlocal fragments
+ fragments.append(frame.data)
+
+ else:
+
+ def append(frame: Frame) -> None:
+ nonlocal fragments, max_size
+ fragments.append(frame.data)
+ assert isinstance(max_size, int)
+ max_size -= len(frame.data)
+
+ append(frame)
+
+ while not frame.fin:
+ frame = await self.read_data_frame(max_size=max_size)
+ if frame is None:
+ raise ProtocolError("incomplete fragmented message")
+ if frame.opcode != OP_CONT:
+ raise ProtocolError("unexpected opcode")
+ append(frame)
+
+ return ("" if text else b"").join(fragments)
+
+ async def read_data_frame(self, max_size: Optional[int]) -> Optional[Frame]:
+ """
+ Read a single data frame from the connection.
+
+ Process control frames received before the next data frame.
+
+ Return :obj:`None` if a close frame is encountered before any data frame.
+
+ """
+ # 6.2. Receiving Data
+ while True:
+ frame = await self.read_frame(max_size)
+
+ # 5.5. Control Frames
+ if frame.opcode == OP_CLOSE:
+ # 7.1.5. The WebSocket Connection Close Code
+ # 7.1.6. The WebSocket Connection Close Reason
+ self.close_rcvd = Close.parse(frame.data)
+ if self.close_sent is not None:
+ self.close_rcvd_then_sent = False
+ try:
+ # Echo the original data instead of re-serializing it with
+ # Close.serialize() because that fails when the close frame
+ # is empty and Close.parse() synthesizes a 1005 close code.
+ await self.write_close_frame(self.close_rcvd, frame.data)
+ except ConnectionClosed:
+ # Connection closed before we could echo the close frame.
+ pass
+ return None
+
+ elif frame.opcode == OP_PING:
+ # Answer pings, unless connection is CLOSING.
+ if self.state is State.OPEN:
+ try:
+ await self.pong(frame.data)
+ except ConnectionClosed:
+ # Connection closed while draining write buffer.
+ pass
+
+ elif frame.opcode == OP_PONG:
+ if frame.data in self.pings:
+ pong_timestamp = time.perf_counter()
+ # Sending a pong for only the most recent ping is legal.
+ # Acknowledge all previous pings too in that case.
+ ping_id = None
+ ping_ids = []
+ for ping_id, (pong_waiter, ping_timestamp) in self.pings.items():
+ ping_ids.append(ping_id)
+ if not pong_waiter.done():
+ pong_waiter.set_result(pong_timestamp - ping_timestamp)
+ if ping_id == frame.data:
+ self.latency = pong_timestamp - ping_timestamp
+ break
+ else:
+ raise AssertionError("solicited pong not found in pings")
+ # Remove acknowledged pings from self.pings.
+ for ping_id in ping_ids:
+ del self.pings[ping_id]
+
+ # 5.6. Data Frames
+ else:
+ return frame
+
+ async def read_frame(self, max_size: Optional[int]) -> Frame:
+ """
+ Read a single frame from the connection.
+
+ """
+ frame = await Frame.read(
+ self.reader.readexactly,
+ mask=not self.is_client,
+ max_size=max_size,
+ extensions=self.extensions,
+ )
+ if self.debug:
+ self.logger.debug("< %s", frame)
+ return frame
+
+ def write_frame_sync(self, fin: bool, opcode: int, data: bytes) -> None:
+ frame = Frame(fin, Opcode(opcode), data)
+ if self.debug:
+ self.logger.debug("> %s", frame)
+ frame.write(
+ self.transport.write,
+ mask=self.is_client,
+ extensions=self.extensions,
+ )
+
+ async def drain(self) -> None:
+ try:
+ # drain() cannot be called concurrently by multiple coroutines:
+ # http://bugs.python.org/issue29930. Remove this lock when no
+ # version of Python where this bugs exists is supported anymore.
+ async with self._drain_lock:
+ # Handle flow control automatically.
+ await self._drain()
+ except ConnectionError:
+ # Terminate the connection if the socket died.
+ self.fail_connection()
+ # Wait until the connection is closed to raise ConnectionClosed
+ # with the correct code and reason.
+ await self.ensure_open()
+
+ async def write_frame(
+ self, fin: bool, opcode: int, data: bytes, *, _state: int = State.OPEN
+ ) -> None:
+ # Defensive assertion for protocol compliance.
+ if self.state is not _state: # pragma: no cover
+ raise InvalidState(
+ f"Cannot write to a WebSocket in the {self.state.name} state"
+ )
+ self.write_frame_sync(fin, opcode, data)
+ await self.drain()
+
+ async def write_close_frame(
+ self, close: Close, data: Optional[bytes] = None
+ ) -> None:
+ """
+ Write a close frame if and only if the connection state is OPEN.
+
+ This dedicated coroutine must be used for writing close frames to
+ ensure that at most one close frame is sent on a given connection.
+
+ """
+ # Test and set the connection state before sending the close frame to
+ # avoid sending two frames in case of concurrent calls.
+ if self.state is State.OPEN:
+ # 7.1.3. The WebSocket Closing Handshake is Started
+ self.state = State.CLOSING
+ if self.debug:
+ self.logger.debug("= connection is CLOSING")
+
+ self.close_sent = close
+ if self.close_rcvd is not None:
+ self.close_rcvd_then_sent = True
+ if data is None:
+ data = close.serialize()
+
+ # 7.1.2. Start the WebSocket Closing Handshake
+ await self.write_frame(True, OP_CLOSE, data, _state=State.CLOSING)
+
+ async def keepalive_ping(self) -> None:
+ """
+ Send a Ping frame and wait for a Pong frame at regular intervals.
+
+ This coroutine exits when the connection terminates and one of the
+ following happens:
+
+ - :meth:`ping` raises :exc:`ConnectionClosed`, or
+ - :meth:`close_connection` cancels :attr:`keepalive_ping_task`.
+
+ """
+ if self.ping_interval is None:
+ return
+
+ try:
+ while True:
+ await asyncio.sleep(self.ping_interval)
+
+ # ping() raises CancelledError if the connection is closed,
+ # when close_connection() cancels self.keepalive_ping_task.
+
+ # ping() raises ConnectionClosed if the connection is lost,
+ # when connection_lost() calls abort_pings().
+
+ self.logger.debug("% sending keepalive ping")
+ pong_waiter = await self.ping()
+
+ if self.ping_timeout is not None:
+ try:
+ async with asyncio_timeout(self.ping_timeout):
+ await pong_waiter
+ self.logger.debug("% received keepalive pong")
+ except asyncio.TimeoutError:
+ if self.debug:
+ self.logger.debug("! timed out waiting for keepalive pong")
+ self.fail_connection(
+ CloseCode.INTERNAL_ERROR,
+ "keepalive ping timeout",
+ )
+ break
+
+ except ConnectionClosed:
+ pass
+
+ except Exception:
+ self.logger.error("keepalive ping failed", exc_info=True)
+
+ async def close_connection(self) -> None:
+ """
+ 7.1.1. Close the WebSocket Connection
+
+ When the opening handshake succeeds, :meth:`connection_open` starts
+ this coroutine in a task. It waits for the data transfer phase to
+ complete then it closes the TCP connection cleanly.
+
+ When the opening handshake fails, :meth:`fail_connection` does the
+ same. There's no data transfer phase in that case.
+
+ """
+ try:
+ # Wait for the data transfer phase to complete.
+ if hasattr(self, "transfer_data_task"):
+ try:
+ await self.transfer_data_task
+ except asyncio.CancelledError:
+ pass
+
+ # Cancel the keepalive ping task.
+ if hasattr(self, "keepalive_ping_task"):
+ self.keepalive_ping_task.cancel()
+
+ # A client should wait for a TCP close from the server.
+ if self.is_client and hasattr(self, "transfer_data_task"):
+ if await self.wait_for_connection_lost():
+ return
+ if self.debug:
+ self.logger.debug("! timed out waiting for TCP close")
+
+ # Half-close the TCP connection if possible (when there's no TLS).
+ if self.transport.can_write_eof():
+ if self.debug:
+ self.logger.debug("x half-closing TCP connection")
+ # write_eof() doesn't document which exceptions it raises.
+ # "[Errno 107] Transport endpoint is not connected" happens
+ # but it isn't completely clear under which circumstances.
+ # uvloop can raise RuntimeError here.
+ try:
+ self.transport.write_eof()
+ except (OSError, RuntimeError): # pragma: no cover
+ pass
+
+ if await self.wait_for_connection_lost():
+ return
+ if self.debug:
+ self.logger.debug("! timed out waiting for TCP close")
+
+ finally:
+ # The try/finally ensures that the transport never remains open,
+ # even if this coroutine is canceled (for example).
+ await self.close_transport()
+
+ async def close_transport(self) -> None:
+ """
+ Close the TCP connection.
+
+ """
+ # If connection_lost() was called, the TCP connection is closed.
+ # However, if TLS is enabled, the transport still needs closing.
+ # Else asyncio complains: ResourceWarning: unclosed transport.
+ if self.connection_lost_waiter.done() and self.transport.is_closing():
+ return
+
+ # Close the TCP connection. Buffers are flushed asynchronously.
+ if self.debug:
+ self.logger.debug("x closing TCP connection")
+ self.transport.close()
+
+ if await self.wait_for_connection_lost():
+ return
+ if self.debug:
+ self.logger.debug("! timed out waiting for TCP close")
+
+ # Abort the TCP connection. Buffers are discarded.
+ if self.debug:
+ self.logger.debug("x aborting TCP connection")
+ # Due to a bug in coverage, this is erroneously reported as not covered.
+ self.transport.abort() # pragma: no cover
+
+ # connection_lost() is called quickly after aborting.
+ await self.wait_for_connection_lost()
+
+ async def wait_for_connection_lost(self) -> bool:
+ """
+ Wait until the TCP connection is closed or ``self.close_timeout`` elapses.
+
+ Return :obj:`True` if the connection is closed and :obj:`False`
+ otherwise.
+
+ """
+ if not self.connection_lost_waiter.done():
+ try:
+ async with asyncio_timeout(self.close_timeout):
+ await asyncio.shield(self.connection_lost_waiter)
+ except asyncio.TimeoutError:
+ pass
+ # Re-check self.connection_lost_waiter.done() synchronously because
+ # connection_lost() could run between the moment the timeout occurs
+ # and the moment this coroutine resumes running.
+ return self.connection_lost_waiter.done()
+
+ def fail_connection(
+ self,
+ code: int = CloseCode.ABNORMAL_CLOSURE,
+ reason: str = "",
+ ) -> None:
+ """
+ 7.1.7. Fail the WebSocket Connection
+
+ This requires:
+
+ 1. Stopping all processing of incoming data, which means cancelling
+ :attr:`transfer_data_task`. The close code will be 1006 unless a
+ close frame was received earlier.
+
+ 2. Sending a close frame with an appropriate code if the opening
+ handshake succeeded and the other side is likely to process it.
+
+ 3. Closing the connection. :meth:`close_connection` takes care of
+ this once :attr:`transfer_data_task` exits after being canceled.
+
+ (The specification describes these steps in the opposite order.)
+
+ """
+ if self.debug:
+ self.logger.debug("! failing connection with code %d", code)
+
+ # Cancel transfer_data_task if the opening handshake succeeded.
+ # cancel() is idempotent and ignored if the task is done already.
+ if hasattr(self, "transfer_data_task"):
+ self.transfer_data_task.cancel()
+
+ # Send a close frame when the state is OPEN (a close frame was already
+ # sent if it's CLOSING), except when failing the connection because of
+ # an error reading from or writing to the network.
+ # Don't send a close frame if the connection is broken.
+ if code != CloseCode.ABNORMAL_CLOSURE and self.state is State.OPEN:
+ close = Close(code, reason)
+
+ # Write the close frame without draining the write buffer.
+
+ # Keeping fail_connection() synchronous guarantees it can't
+ # get stuck and simplifies the implementation of the callers.
+ # Not drainig the write buffer is acceptable in this context.
+
+ # This duplicates a few lines of code from write_close_frame().
+
+ self.state = State.CLOSING
+ if self.debug:
+ self.logger.debug("= connection is CLOSING")
+
+ # If self.close_rcvd was set, the connection state would be
+ # CLOSING. Therefore self.close_rcvd isn't set and we don't
+ # have to set self.close_rcvd_then_sent.
+ assert self.close_rcvd is None
+ self.close_sent = close
+
+ self.write_frame_sync(True, OP_CLOSE, close.serialize())
+
+ # Start close_connection_task if the opening handshake didn't succeed.
+ if not hasattr(self, "close_connection_task"):
+ self.close_connection_task = self.loop.create_task(self.close_connection())
+
+ def abort_pings(self) -> None:
+ """
+ Raise ConnectionClosed in pending keepalive pings.
+
+ They'll never receive a pong once the connection is closed.
+
+ """
+ assert self.state is State.CLOSED
+ exc = self.connection_closed_exc()
+
+ for pong_waiter, _ping_timestamp in self.pings.values():
+ pong_waiter.set_exception(exc)
+ # If the exception is never retrieved, it will be logged when ping
+ # is garbage-collected. This is confusing for users.
+ # Given that ping is done (with an exception), canceling it does
+ # nothing, but it prevents logging the exception.
+ pong_waiter.cancel()
+
+ # asyncio.Protocol methods
+
+ def connection_made(self, transport: asyncio.BaseTransport) -> None:
+ """
+ Configure write buffer limits.
+
+ The high-water limit is defined by ``self.write_limit``.
+
+ The low-water limit currently defaults to ``self.write_limit // 4`` in
+ :meth:`~asyncio.WriteTransport.set_write_buffer_limits`, which should
+ be all right for reasonable use cases of this library.
+
+ This is the earliest point where we can get hold of the transport,
+ which means it's the best point for configuring it.
+
+ """
+ transport = cast(asyncio.Transport, transport)
+ transport.set_write_buffer_limits(self.write_limit)
+ self.transport = transport
+
+ # Copied from asyncio.StreamReaderProtocol
+ self.reader.set_transport(transport)
+
+ def connection_lost(self, exc: Optional[Exception]) -> None:
+ """
+ 7.1.4. The WebSocket Connection is Closed.
+
+ """
+ self.state = State.CLOSED
+ self.logger.debug("= connection is CLOSED")
+
+ self.abort_pings()
+
+ # If self.connection_lost_waiter isn't pending, that's a bug, because:
+ # - it's set only here in connection_lost() which is called only once;
+ # - it must never be canceled.
+ self.connection_lost_waiter.set_result(None)
+
+ if True: # pragma: no cover
+ # Copied from asyncio.StreamReaderProtocol
+ if self.reader is not None:
+ if exc is None:
+ self.reader.feed_eof()
+ else:
+ self.reader.set_exception(exc)
+
+ # Copied from asyncio.FlowControlMixin
+ # Wake up the writer if currently paused.
+ if not self._paused:
+ return
+ waiter = self._drain_waiter
+ if waiter is None:
+ return
+ self._drain_waiter = None
+ if waiter.done():
+ return
+ if exc is None:
+ waiter.set_result(None)
+ else:
+ waiter.set_exception(exc)
+
+ def pause_writing(self) -> None: # pragma: no cover
+ assert not self._paused
+ self._paused = True
+
+ def resume_writing(self) -> None: # pragma: no cover
+ assert self._paused
+ self._paused = False
+
+ waiter = self._drain_waiter
+ if waiter is not None:
+ self._drain_waiter = None
+ if not waiter.done():
+ waiter.set_result(None)
+
+ def data_received(self, data: bytes) -> None:
+ self.reader.feed_data(data)
+
+ def eof_received(self) -> None:
+ """
+ Close the transport after receiving EOF.
+
+ The WebSocket protocol has its own closing handshake: endpoints close
+ the TCP or TLS connection after sending and receiving a close frame.
+
+ As a consequence, they never need to write after receiving EOF, so
+ there's no reason to keep the transport open by returning :obj:`True`.
+
+ Besides, that doesn't work on TLS connections.
+
+ """
+ self.reader.feed_eof()
+
+
+def broadcast(
+ websockets: Iterable[WebSocketCommonProtocol],
+ message: Data,
+ raise_exceptions: bool = False,
+) -> None:
+ """
+ Broadcast a message to several WebSocket connections.
+
+ A string (:class:`str`) is sent as a Text_ frame. A bytestring or bytes-like
+ object (:class:`bytes`, :class:`bytearray`, or :class:`memoryview`) is sent
+ as a Binary_ frame.
+
+ .. _Text: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.6
+ .. _Binary: https://www.rfc-editor.org/rfc/rfc6455.html#section-5.6
+
+ :func:`broadcast` pushes the message synchronously to all connections even
+ if their write buffers are overflowing. There's no backpressure.
+
+ If you broadcast messages faster than a connection can handle them, messages
+ will pile up in its write buffer until the connection times out. Keep
+ ``ping_interval`` and ``ping_timeout`` low to prevent excessive memory usage
+ from slow connections.
+
+ Unlike :meth:`~websockets.server.WebSocketServerProtocol.send`,
+ :func:`broadcast` doesn't support sending fragmented messages. Indeed,
+ fragmentation is useful for sending large messages without buffering them in
+ memory, while :func:`broadcast` buffers one copy per connection as fast as
+ possible.
+
+ :func:`broadcast` skips connections that aren't open in order to avoid
+ errors on connections where the closing handshake is in progress.
+
+ :func:`broadcast` ignores failures to write the message on some connections.
+ It continues writing to other connections. On Python 3.11 and above, you
+ may set ``raise_exceptions`` to :obj:`True` to record failures and raise all
+ exceptions in a :pep:`654` :exc:`ExceptionGroup`.
+
+ Args:
+ websockets: WebSocket connections to which the message will be sent.
+ message: Message to send.
+ raise_exceptions: Whether to raise an exception in case of failures.
+
+ Raises:
+ TypeError: If ``message`` doesn't have a supported type.
+
+ """
+ if not isinstance(message, (str, bytes, bytearray, memoryview)):
+ raise TypeError("data must be str or bytes-like")
+
+ if raise_exceptions:
+ if sys.version_info[:2] < (3, 11): # pragma: no cover
+ raise ValueError("raise_exceptions requires at least Python 3.11")
+ exceptions = []
+
+ opcode, data = prepare_data(message)
+
+ for websocket in websockets:
+ if websocket.state is not State.OPEN:
+ continue
+
+ if websocket._fragmented_message_waiter is not None:
+ if raise_exceptions:
+ exception = RuntimeError("sending a fragmented message")
+ exceptions.append(exception)
+ else:
+ websocket.logger.warning(
+ "skipped broadcast: sending a fragmented message",
+ )
+
+ try:
+ websocket.write_frame_sync(True, opcode, data)
+ except Exception as write_exception:
+ if raise_exceptions:
+ exception = RuntimeError("failed to write message")
+ exception.__cause__ = write_exception
+ exceptions.append(exception)
+ else:
+ websocket.logger.warning(
+ "skipped broadcast: failed to write message",
+ exc_info=True,
+ )
+
+ if raise_exceptions:
+ raise ExceptionGroup("skipped broadcast", exceptions)
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/server.py b/venv/lib/python3.11/site-packages/websockets/legacy/server.py
new file mode 100644
index 0000000..7c24dd7
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/server.py
@@ -0,0 +1,1185 @@
+from __future__ import annotations
+
+import asyncio
+import email.utils
+import functools
+import http
+import inspect
+import logging
+import socket
+import warnings
+from types import TracebackType
+from typing import (
+ Any,
+ Awaitable,
+ Callable,
+ Generator,
+ Iterable,
+ List,
+ Optional,
+ Sequence,
+ Set,
+ Tuple,
+ Type,
+ Union,
+ cast,
+)
+
+from ..datastructures import Headers, HeadersLike, MultipleValuesError
+from ..exceptions import (
+ AbortHandshake,
+ InvalidHandshake,
+ InvalidHeader,
+ InvalidMessage,
+ InvalidOrigin,
+ InvalidUpgrade,
+ NegotiationError,
+)
+from ..extensions import Extension, ServerExtensionFactory
+from ..extensions.permessage_deflate import enable_server_permessage_deflate
+from ..headers import (
+ build_extension,
+ parse_extension,
+ parse_subprotocol,
+ validate_subprotocols,
+)
+from ..http import USER_AGENT
+from ..protocol import State
+from ..typing import ExtensionHeader, LoggerLike, Origin, StatusLike, Subprotocol
+from .compatibility import asyncio_timeout
+from .handshake import build_response, check_request
+from .http import read_request
+from .protocol import WebSocketCommonProtocol
+
+
+__all__ = ["serve", "unix_serve", "WebSocketServerProtocol", "WebSocketServer"]
+
+
+HeadersLikeOrCallable = Union[HeadersLike, Callable[[str, Headers], HeadersLike]]
+
+HTTPResponse = Tuple[StatusLike, HeadersLike, bytes]
+
+
+class WebSocketServerProtocol(WebSocketCommonProtocol):
+ """
+ WebSocket server connection.
+
+ :class:`WebSocketServerProtocol` provides :meth:`recv` and :meth:`send`
+ coroutines for receiving and sending messages.
+
+ It supports asynchronous iteration to receive messages::
+
+ async for message in websocket:
+ await process(message)
+
+ The iterator exits normally when the connection is closed with close code
+ 1000 (OK) or 1001 (going away) or without a close code. It raises
+ a :exc:`~websockets.exceptions.ConnectionClosedError` when the connection
+ is closed with any other code.
+
+ You may customize the opening handshake in a subclass by
+ overriding :meth:`process_request` or :meth:`select_subprotocol`.
+
+ Args:
+ ws_server: WebSocket server that created this connection.
+
+ See :func:`serve` for the documentation of ``ws_handler``, ``logger``, ``origins``,
+ ``extensions``, ``subprotocols``, ``extra_headers``, and ``server_header``.
+
+ See :class:`~websockets.legacy.protocol.WebSocketCommonProtocol` for the
+ documentation of ``ping_interval``, ``ping_timeout``, ``close_timeout``,
+ ``max_size``, ``max_queue``, ``read_limit``, and ``write_limit``.
+
+ """
+
+ is_client = False
+ side = "server"
+
+ def __init__(
+ self,
+ ws_handler: Union[
+ Callable[[WebSocketServerProtocol], Awaitable[Any]],
+ Callable[[WebSocketServerProtocol, str], Awaitable[Any]], # deprecated
+ ],
+ ws_server: WebSocketServer,
+ *,
+ logger: Optional[LoggerLike] = None,
+ origins: Optional[Sequence[Optional[Origin]]] = None,
+ extensions: Optional[Sequence[ServerExtensionFactory]] = None,
+ subprotocols: Optional[Sequence[Subprotocol]] = None,
+ extra_headers: Optional[HeadersLikeOrCallable] = None,
+ server_header: Optional[str] = USER_AGENT,
+ process_request: Optional[
+ Callable[[str, Headers], Awaitable[Optional[HTTPResponse]]]
+ ] = None,
+ select_subprotocol: Optional[
+ Callable[[Sequence[Subprotocol], Sequence[Subprotocol]], Subprotocol]
+ ] = None,
+ open_timeout: Optional[float] = 10,
+ **kwargs: Any,
+ ) -> None:
+ if logger is None:
+ logger = logging.getLogger("websockets.server")
+ super().__init__(logger=logger, **kwargs)
+ # For backwards compatibility with 6.0 or earlier.
+ if origins is not None and "" in origins:
+ warnings.warn("use None instead of '' in origins", DeprecationWarning)
+ origins = [None if origin == "" else origin for origin in origins]
+ # For backwards compatibility with 10.0 or earlier. Done here in
+ # addition to serve to trigger the deprecation warning on direct
+ # use of WebSocketServerProtocol.
+ self.ws_handler = remove_path_argument(ws_handler)
+ self.ws_server = ws_server
+ self.origins = origins
+ self.available_extensions = extensions
+ self.available_subprotocols = subprotocols
+ self.extra_headers = extra_headers
+ self.server_header = server_header
+ self._process_request = process_request
+ self._select_subprotocol = select_subprotocol
+ self.open_timeout = open_timeout
+
+ def connection_made(self, transport: asyncio.BaseTransport) -> None:
+ """
+ Register connection and initialize a task to handle it.
+
+ """
+ super().connection_made(transport)
+ # Register the connection with the server before creating the handler
+ # task. Registering at the beginning of the handler coroutine would
+ # create a race condition between the creation of the task, which
+ # schedules its execution, and the moment the handler starts running.
+ self.ws_server.register(self)
+ self.handler_task = self.loop.create_task(self.handler())
+
+ async def handler(self) -> None:
+ """
+ Handle the lifecycle of a WebSocket connection.
+
+ Since this method doesn't have a caller able to handle exceptions, it
+ attempts to log relevant ones and guarantees that the TCP connection is
+ closed before exiting.
+
+ """
+ try:
+ try:
+ async with asyncio_timeout(self.open_timeout):
+ await self.handshake(
+ origins=self.origins,
+ available_extensions=self.available_extensions,
+ available_subprotocols=self.available_subprotocols,
+ extra_headers=self.extra_headers,
+ )
+ except asyncio.TimeoutError: # pragma: no cover
+ raise
+ except ConnectionError:
+ raise
+ except Exception as exc:
+ if isinstance(exc, AbortHandshake):
+ status, headers, body = exc.status, exc.headers, exc.body
+ elif isinstance(exc, InvalidOrigin):
+ if self.debug:
+ self.logger.debug("! invalid origin", exc_info=True)
+ status, headers, body = (
+ http.HTTPStatus.FORBIDDEN,
+ Headers(),
+ f"Failed to open a WebSocket connection: {exc}.\n".encode(),
+ )
+ elif isinstance(exc, InvalidUpgrade):
+ if self.debug:
+ self.logger.debug("! invalid upgrade", exc_info=True)
+ status, headers, body = (
+ http.HTTPStatus.UPGRADE_REQUIRED,
+ Headers([("Upgrade", "websocket")]),
+ (
+ f"Failed to open a WebSocket connection: {exc}.\n"
+ f"\n"
+ f"You cannot access a WebSocket server directly "
+ f"with a browser. You need a WebSocket client.\n"
+ ).encode(),
+ )
+ elif isinstance(exc, InvalidHandshake):
+ if self.debug:
+ self.logger.debug("! invalid handshake", exc_info=True)
+ status, headers, body = (
+ http.HTTPStatus.BAD_REQUEST,
+ Headers(),
+ f"Failed to open a WebSocket connection: {exc}.\n".encode(),
+ )
+ else:
+ self.logger.error("opening handshake failed", exc_info=True)
+ status, headers, body = (
+ http.HTTPStatus.INTERNAL_SERVER_ERROR,
+ Headers(),
+ (
+ b"Failed to open a WebSocket connection.\n"
+ b"See server log for more information.\n"
+ ),
+ )
+
+ headers.setdefault("Date", email.utils.formatdate(usegmt=True))
+ if self.server_header is not None:
+ headers.setdefault("Server", self.server_header)
+
+ headers.setdefault("Content-Length", str(len(body)))
+ headers.setdefault("Content-Type", "text/plain")
+ headers.setdefault("Connection", "close")
+
+ self.write_http_response(status, headers, body)
+ self.logger.info(
+ "connection rejected (%d %s)", status.value, status.phrase
+ )
+ await self.close_transport()
+ return
+
+ try:
+ await self.ws_handler(self)
+ except Exception:
+ self.logger.error("connection handler failed", exc_info=True)
+ if not self.closed:
+ self.fail_connection(1011)
+ raise
+
+ try:
+ await self.close()
+ except ConnectionError:
+ raise
+ except Exception:
+ self.logger.error("closing handshake failed", exc_info=True)
+ raise
+
+ except Exception:
+ # Last-ditch attempt to avoid leaking connections on errors.
+ try:
+ self.transport.close()
+ except Exception: # pragma: no cover
+ pass
+
+ finally:
+ # Unregister the connection with the server when the handler task
+ # terminates. Registration is tied to the lifecycle of the handler
+ # task because the server waits for tasks attached to registered
+ # connections before terminating.
+ self.ws_server.unregister(self)
+ self.logger.info("connection closed")
+
+ async def read_http_request(self) -> Tuple[str, Headers]:
+ """
+ Read request line and headers from the HTTP request.
+
+ If the request contains a body, it may be read from ``self.reader``
+ after this coroutine returns.
+
+ Raises:
+ InvalidMessage: if the HTTP message is malformed or isn't an
+ HTTP/1.1 GET request.
+
+ """
+ try:
+ path, headers = await read_request(self.reader)
+ except asyncio.CancelledError: # pragma: no cover
+ raise
+ except Exception as exc:
+ raise InvalidMessage("did not receive a valid HTTP request") from exc
+
+ if self.debug:
+ self.logger.debug("< GET %s HTTP/1.1", path)
+ for key, value in headers.raw_items():
+ self.logger.debug("< %s: %s", key, value)
+
+ self.path = path
+ self.request_headers = headers
+
+ return path, headers
+
+ def write_http_response(
+ self, status: http.HTTPStatus, headers: Headers, body: Optional[bytes] = None
+ ) -> None:
+ """
+ Write status line and headers to the HTTP response.
+
+ This coroutine is also able to write a response body.
+
+ """
+ self.response_headers = headers
+
+ if self.debug:
+ self.logger.debug("> HTTP/1.1 %d %s", status.value, status.phrase)
+ for key, value in headers.raw_items():
+ self.logger.debug("> %s: %s", key, value)
+ if body is not None:
+ self.logger.debug("> [body] (%d bytes)", len(body))
+
+ # Since the status line and headers only contain ASCII characters,
+ # we can keep this simple.
+ response = f"HTTP/1.1 {status.value} {status.phrase}\r\n"
+ response += str(headers)
+
+ self.transport.write(response.encode())
+
+ if body is not None:
+ self.transport.write(body)
+
+ async def process_request(
+ self, path: str, request_headers: Headers
+ ) -> Optional[HTTPResponse]:
+ """
+ Intercept the HTTP request and return an HTTP response if appropriate.
+
+ You may override this method in a :class:`WebSocketServerProtocol`
+ subclass, for example:
+
+ * to return an HTTP 200 OK response on a given path; then a load
+ balancer can use this path for a health check;
+ * to authenticate the request and return an HTTP 401 Unauthorized or an
+ HTTP 403 Forbidden when authentication fails.
+
+ You may also override this method with the ``process_request``
+ argument of :func:`serve` and :class:`WebSocketServerProtocol`. This
+ is equivalent, except ``process_request`` won't have access to the
+ protocol instance, so it can't store information for later use.
+
+ :meth:`process_request` is expected to complete quickly. If it may run
+ for a long time, then it should await :meth:`wait_closed` and exit if
+ :meth:`wait_closed` completes, or else it could prevent the server
+ from shutting down.
+
+ Args:
+ path: request path, including optional query string.
+ request_headers: request headers.
+
+ Returns:
+ Optional[Tuple[StatusLike, HeadersLike, bytes]]: :obj:`None`
+ to continue the WebSocket handshake normally.
+
+ An HTTP response, represented by a 3-uple of the response status,
+ headers, and body, to abort the WebSocket handshake and return
+ that HTTP response instead.
+
+ """
+ if self._process_request is not None:
+ response = self._process_request(path, request_headers)
+ if isinstance(response, Awaitable):
+ return await response
+ else:
+ # For backwards compatibility with 7.0.
+ warnings.warn(
+ "declare process_request as a coroutine", DeprecationWarning
+ )
+ return response
+ return None
+
+ @staticmethod
+ def process_origin(
+ headers: Headers, origins: Optional[Sequence[Optional[Origin]]] = None
+ ) -> Optional[Origin]:
+ """
+ Handle the Origin HTTP request header.
+
+ Args:
+ headers: request headers.
+ origins: optional list of acceptable origins.
+
+ Raises:
+ InvalidOrigin: if the origin isn't acceptable.
+
+ """
+ # "The user agent MUST NOT include more than one Origin header field"
+ # per https://www.rfc-editor.org/rfc/rfc6454.html#section-7.3.
+ try:
+ origin = cast(Optional[Origin], headers.get("Origin"))
+ except MultipleValuesError as exc:
+ raise InvalidHeader("Origin", "more than one Origin header found") from exc
+ if origins is not None:
+ if origin not in origins:
+ raise InvalidOrigin(origin)
+ return origin
+
+ @staticmethod
+ def process_extensions(
+ headers: Headers,
+ available_extensions: Optional[Sequence[ServerExtensionFactory]],
+ ) -> Tuple[Optional[str], List[Extension]]:
+ """
+ Handle the Sec-WebSocket-Extensions HTTP request header.
+
+ Accept or reject each extension proposed in the client request.
+ Negotiate parameters for accepted extensions.
+
+ Return the Sec-WebSocket-Extensions HTTP response header and the list
+ of accepted extensions.
+
+ :rfc:`6455` leaves the rules up to the specification of each
+ :extension.
+
+ To provide this level of flexibility, for each extension proposed by
+ the client, we check for a match with each extension available in the
+ server configuration. If no match is found, the extension is ignored.
+
+ If several variants of the same extension are proposed by the client,
+ it may be accepted several times, which won't make sense in general.
+ Extensions must implement their own requirements. For this purpose,
+ the list of previously accepted extensions is provided.
+
+ This process doesn't allow the server to reorder extensions. It can
+ only select a subset of the extensions proposed by the client.
+
+ Other requirements, for example related to mandatory extensions or the
+ order of extensions, may be implemented by overriding this method.
+
+ Args:
+ headers: request headers.
+ extensions: optional list of supported extensions.
+
+ Raises:
+ InvalidHandshake: to abort the handshake with an HTTP 400 error.
+
+ """
+ response_header_value: Optional[str] = None
+
+ extension_headers: List[ExtensionHeader] = []
+ accepted_extensions: List[Extension] = []
+
+ header_values = headers.get_all("Sec-WebSocket-Extensions")
+
+ if header_values and available_extensions:
+ parsed_header_values: List[ExtensionHeader] = sum(
+ [parse_extension(header_value) for header_value in header_values], []
+ )
+
+ for name, request_params in parsed_header_values:
+ for ext_factory in available_extensions:
+ # Skip non-matching extensions based on their name.
+ if ext_factory.name != name:
+ continue
+
+ # Skip non-matching extensions based on their params.
+ try:
+ response_params, extension = ext_factory.process_request_params(
+ request_params, accepted_extensions
+ )
+ except NegotiationError:
+ continue
+
+ # Add matching extension to the final list.
+ extension_headers.append((name, response_params))
+ accepted_extensions.append(extension)
+
+ # Break out of the loop once we have a match.
+ break
+
+ # If we didn't break from the loop, no extension in our list
+ # matched what the client sent. The extension is declined.
+
+ # Serialize extension header.
+ if extension_headers:
+ response_header_value = build_extension(extension_headers)
+
+ return response_header_value, accepted_extensions
+
+ # Not @staticmethod because it calls self.select_subprotocol()
+ def process_subprotocol(
+ self, headers: Headers, available_subprotocols: Optional[Sequence[Subprotocol]]
+ ) -> Optional[Subprotocol]:
+ """
+ Handle the Sec-WebSocket-Protocol HTTP request header.
+
+ Return Sec-WebSocket-Protocol HTTP response header, which is the same
+ as the selected subprotocol.
+
+ Args:
+ headers: request headers.
+ available_subprotocols: optional list of supported subprotocols.
+
+ Raises:
+ InvalidHandshake: to abort the handshake with an HTTP 400 error.
+
+ """
+ subprotocol: Optional[Subprotocol] = None
+
+ header_values = headers.get_all("Sec-WebSocket-Protocol")
+
+ if header_values and available_subprotocols:
+ parsed_header_values: List[Subprotocol] = sum(
+ [parse_subprotocol(header_value) for header_value in header_values], []
+ )
+
+ subprotocol = self.select_subprotocol(
+ parsed_header_values, available_subprotocols
+ )
+
+ return subprotocol
+
+ def select_subprotocol(
+ self,
+ client_subprotocols: Sequence[Subprotocol],
+ server_subprotocols: Sequence[Subprotocol],
+ ) -> Optional[Subprotocol]:
+ """
+ Pick a subprotocol among those supported by the client and the server.
+
+ If several subprotocols are available, select the preferred subprotocol
+ by giving equal weight to the preferences of the client and the server.
+
+ If no subprotocol is available, proceed without a subprotocol.
+
+ You may provide a ``select_subprotocol`` argument to :func:`serve` or
+ :class:`WebSocketServerProtocol` to override this logic. For example,
+ you could reject the handshake if the client doesn't support a
+ particular subprotocol, rather than accept the handshake without that
+ subprotocol.
+
+ Args:
+ client_subprotocols: list of subprotocols offered by the client.
+ server_subprotocols: list of subprotocols available on the server.
+
+ Returns:
+ Optional[Subprotocol]: Selected subprotocol, if a common subprotocol
+ was found.
+
+ :obj:`None` to continue without a subprotocol.
+
+ """
+ if self._select_subprotocol is not None:
+ return self._select_subprotocol(client_subprotocols, server_subprotocols)
+
+ subprotocols = set(client_subprotocols) & set(server_subprotocols)
+ if not subprotocols:
+ return None
+ return sorted(
+ subprotocols,
+ key=lambda p: client_subprotocols.index(p) + server_subprotocols.index(p),
+ )[0]
+
+ async def handshake(
+ self,
+ origins: Optional[Sequence[Optional[Origin]]] = None,
+ available_extensions: Optional[Sequence[ServerExtensionFactory]] = None,
+ available_subprotocols: Optional[Sequence[Subprotocol]] = None,
+ extra_headers: Optional[HeadersLikeOrCallable] = None,
+ ) -> str:
+ """
+ Perform the server side of the opening handshake.
+
+ Args:
+ origins: list of acceptable values of the Origin HTTP header;
+ include :obj:`None` if the lack of an origin is acceptable.
+ extensions: list of supported extensions, in order in which they
+ should be tried.
+ subprotocols: list of supported subprotocols, in order of
+ decreasing preference.
+ extra_headers: arbitrary HTTP headers to add to the response when
+ the handshake succeeds.
+
+ Returns:
+ str: path of the URI of the request.
+
+ Raises:
+ InvalidHandshake: if the handshake fails.
+
+ """
+ path, request_headers = await self.read_http_request()
+
+ # Hook for customizing request handling, for example checking
+ # authentication or treating some paths as plain HTTP endpoints.
+ early_response_awaitable = self.process_request(path, request_headers)
+ if isinstance(early_response_awaitable, Awaitable):
+ early_response = await early_response_awaitable
+ else:
+ # For backwards compatibility with 7.0.
+ warnings.warn("declare process_request as a coroutine", DeprecationWarning)
+ early_response = early_response_awaitable
+
+ # The connection may drop while process_request is running.
+ if self.state is State.CLOSED:
+ # This subclass of ConnectionError is silently ignored in handler().
+ raise BrokenPipeError("connection closed during opening handshake")
+
+ # Change the response to a 503 error if the server is shutting down.
+ if not self.ws_server.is_serving():
+ early_response = (
+ http.HTTPStatus.SERVICE_UNAVAILABLE,
+ [],
+ b"Server is shutting down.\n",
+ )
+
+ if early_response is not None:
+ raise AbortHandshake(*early_response)
+
+ key = check_request(request_headers)
+
+ self.origin = self.process_origin(request_headers, origins)
+
+ extensions_header, self.extensions = self.process_extensions(
+ request_headers, available_extensions
+ )
+
+ protocol_header = self.subprotocol = self.process_subprotocol(
+ request_headers, available_subprotocols
+ )
+
+ response_headers = Headers()
+
+ build_response(response_headers, key)
+
+ if extensions_header is not None:
+ response_headers["Sec-WebSocket-Extensions"] = extensions_header
+
+ if protocol_header is not None:
+ response_headers["Sec-WebSocket-Protocol"] = protocol_header
+
+ if callable(extra_headers):
+ extra_headers = extra_headers(path, self.request_headers)
+ if extra_headers is not None:
+ response_headers.update(extra_headers)
+
+ response_headers.setdefault("Date", email.utils.formatdate(usegmt=True))
+ if self.server_header is not None:
+ response_headers.setdefault("Server", self.server_header)
+
+ self.write_http_response(http.HTTPStatus.SWITCHING_PROTOCOLS, response_headers)
+
+ self.logger.info("connection open")
+
+ self.connection_open()
+
+ return path
+
+
+class WebSocketServer:
+ """
+ WebSocket server returned by :func:`serve`.
+
+ This class provides the same interface as :class:`~asyncio.Server`,
+ notably the :meth:`~asyncio.Server.close`
+ and :meth:`~asyncio.Server.wait_closed` methods.
+
+ It keeps track of WebSocket connections in order to close them properly
+ when shutting down.
+
+ Args:
+ logger: Logger for this server.
+ It defaults to ``logging.getLogger("websockets.server")``.
+ See the :doc:`logging guide <../../topics/logging>` for details.
+
+ """
+
+ def __init__(self, logger: Optional[LoggerLike] = None):
+ if logger is None:
+ logger = logging.getLogger("websockets.server")
+ self.logger = logger
+
+ # Keep track of active connections.
+ self.websockets: Set[WebSocketServerProtocol] = set()
+
+ # Task responsible for closing the server and terminating connections.
+ self.close_task: Optional[asyncio.Task[None]] = None
+
+ # Completed when the server is closed and connections are terminated.
+ self.closed_waiter: asyncio.Future[None]
+
+ def wrap(self, server: asyncio.base_events.Server) -> None:
+ """
+ Attach to a given :class:`~asyncio.Server`.
+
+ Since :meth:`~asyncio.loop.create_server` doesn't support injecting a
+ custom ``Server`` class, the easiest solution that doesn't rely on
+ private :mod:`asyncio` APIs is to:
+
+ - instantiate a :class:`WebSocketServer`
+ - give the protocol factory a reference to that instance
+ - call :meth:`~asyncio.loop.create_server` with the factory
+ - attach the resulting :class:`~asyncio.Server` with this method
+
+ """
+ self.server = server
+ for sock in server.sockets:
+ if sock.family == socket.AF_INET:
+ name = "%s:%d" % sock.getsockname()
+ elif sock.family == socket.AF_INET6:
+ name = "[%s]:%d" % sock.getsockname()[:2]
+ elif sock.family == socket.AF_UNIX:
+ name = sock.getsockname()
+ # In the unlikely event that someone runs websockets over a
+ # protocol other than IP or Unix sockets, avoid crashing.
+ else: # pragma: no cover
+ name = str(sock.getsockname())
+ self.logger.info("server listening on %s", name)
+
+ # Initialized here because we need a reference to the event loop.
+ # This should be moved back to __init__ when dropping Python < 3.10.
+ self.closed_waiter = server.get_loop().create_future()
+
+ def register(self, protocol: WebSocketServerProtocol) -> None:
+ """
+ Register a connection with this server.
+
+ """
+ self.websockets.add(protocol)
+
+ def unregister(self, protocol: WebSocketServerProtocol) -> None:
+ """
+ Unregister a connection with this server.
+
+ """
+ self.websockets.remove(protocol)
+
+ def close(self, close_connections: bool = True) -> None:
+ """
+ Close the server.
+
+ * Close the underlying :class:`~asyncio.Server`.
+ * When ``close_connections`` is :obj:`True`, which is the default,
+ close existing connections. Specifically:
+
+ * Reject opening WebSocket connections with an HTTP 503 (service
+ unavailable) error. This happens when the server accepted the TCP
+ connection but didn't complete the opening handshake before closing.
+ * Close open WebSocket connections with close code 1001 (going away).
+
+ * Wait until all connection handlers terminate.
+
+ :meth:`close` is idempotent.
+
+ """
+ if self.close_task is None:
+ self.close_task = self.get_loop().create_task(
+ self._close(close_connections)
+ )
+
+ async def _close(self, close_connections: bool) -> None:
+ """
+ Implementation of :meth:`close`.
+
+ This calls :meth:`~asyncio.Server.close` on the underlying
+ :class:`~asyncio.Server` object to stop accepting new connections and
+ then closes open connections with close code 1001.
+
+ """
+ self.logger.info("server closing")
+
+ # Stop accepting new connections.
+ self.server.close()
+
+ # Wait until all accepted connections reach connection_made() and call
+ # register(). See https://bugs.python.org/issue34852 for details.
+ await asyncio.sleep(0)
+
+ if close_connections:
+ # Close OPEN connections with close code 1001. After server.close(),
+ # handshake() closes OPENING connections with an HTTP 503 error.
+ close_tasks = [
+ asyncio.create_task(websocket.close(1001))
+ for websocket in self.websockets
+ if websocket.state is not State.CONNECTING
+ ]
+ # asyncio.wait doesn't accept an empty first argument.
+ if close_tasks:
+ await asyncio.wait(close_tasks)
+
+ # Wait until all TCP connections are closed.
+ await self.server.wait_closed()
+
+ # Wait until all connection handlers terminate.
+ # asyncio.wait doesn't accept an empty first argument.
+ if self.websockets:
+ await asyncio.wait(
+ [websocket.handler_task for websocket in self.websockets]
+ )
+
+ # Tell wait_closed() to return.
+ self.closed_waiter.set_result(None)
+
+ self.logger.info("server closed")
+
+ async def wait_closed(self) -> None:
+ """
+ Wait until the server is closed.
+
+ When :meth:`wait_closed` returns, all TCP connections are closed and
+ all connection handlers have returned.
+
+ To ensure a fast shutdown, a connection handler should always be
+ awaiting at least one of:
+
+ * :meth:`~WebSocketServerProtocol.recv`: when the connection is closed,
+ it raises :exc:`~websockets.exceptions.ConnectionClosedOK`;
+ * :meth:`~WebSocketServerProtocol.wait_closed`: when the connection is
+ closed, it returns.
+
+ Then the connection handler is immediately notified of the shutdown;
+ it can clean up and exit.
+
+ """
+ await asyncio.shield(self.closed_waiter)
+
+ def get_loop(self) -> asyncio.AbstractEventLoop:
+ """
+ See :meth:`asyncio.Server.get_loop`.
+
+ """
+ return self.server.get_loop()
+
+ def is_serving(self) -> bool:
+ """
+ See :meth:`asyncio.Server.is_serving`.
+
+ """
+ return self.server.is_serving()
+
+ async def start_serving(self) -> None: # pragma: no cover
+ """
+ See :meth:`asyncio.Server.start_serving`.
+
+ Typical use::
+
+ server = await serve(..., start_serving=False)
+ # perform additional setup here...
+ # ... then start the server
+ await server.start_serving()
+
+ """
+ await self.server.start_serving()
+
+ async def serve_forever(self) -> None: # pragma: no cover
+ """
+ See :meth:`asyncio.Server.serve_forever`.
+
+ Typical use::
+
+ server = await serve(...)
+ # this coroutine doesn't return
+ # canceling it stops the server
+ await server.serve_forever()
+
+ This is an alternative to using :func:`serve` as an asynchronous context
+ manager. Shutdown is triggered by canceling :meth:`serve_forever`
+ instead of exiting a :func:`serve` context.
+
+ """
+ await self.server.serve_forever()
+
+ @property
+ def sockets(self) -> Iterable[socket.socket]:
+ """
+ See :attr:`asyncio.Server.sockets`.
+
+ """
+ return self.server.sockets
+
+ async def __aenter__(self) -> WebSocketServer: # pragma: no cover
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_value: Optional[BaseException],
+ traceback: Optional[TracebackType],
+ ) -> None: # pragma: no cover
+ self.close()
+ await self.wait_closed()
+
+
+class Serve:
+ """
+ Start a WebSocket server listening on ``host`` and ``port``.
+
+ Whenever a client connects, the server creates a
+ :class:`WebSocketServerProtocol`, performs the opening handshake, and
+ delegates to the connection handler, ``ws_handler``.
+
+ The handler receives the :class:`WebSocketServerProtocol` and uses it to
+ send and receive messages.
+
+ Once the handler completes, either normally or with an exception, the
+ server performs the closing handshake and closes the connection.
+
+ Awaiting :func:`serve` yields a :class:`WebSocketServer`. This object
+ provides a :meth:`~WebSocketServer.close` method to shut down the server::
+
+ stop = asyncio.Future() # set this future to exit the server
+
+ server = await serve(...)
+ await stop
+ await server.close()
+
+ :func:`serve` can be used as an asynchronous context manager. Then, the
+ server is shut down automatically when exiting the context::
+
+ stop = asyncio.Future() # set this future to exit the server
+
+ async with serve(...):
+ await stop
+
+ Args:
+ ws_handler: Connection handler. It receives the WebSocket connection,
+ which is a :class:`WebSocketServerProtocol`, in argument.
+ host: Network interfaces the server binds to.
+ See :meth:`~asyncio.loop.create_server` for details.
+ port: TCP port the server listens on.
+ See :meth:`~asyncio.loop.create_server` for details.
+ create_protocol: Factory for the :class:`asyncio.Protocol` managing
+ the connection. It defaults to :class:`WebSocketServerProtocol`.
+ Set it to a wrapper or a subclass to customize connection handling.
+ logger: Logger for this server.
+ It defaults to ``logging.getLogger("websockets.server")``.
+ See the :doc:`logging guide <../../topics/logging>` for details.
+ compression: The "permessage-deflate" extension is enabled by default.
+ Set ``compression`` to :obj:`None` to disable it. See the
+ :doc:`compression guide <../../topics/compression>` for details.
+ origins: Acceptable values of the ``Origin`` header, for defending
+ against Cross-Site WebSocket Hijacking attacks. Include :obj:`None`
+ in the list if the lack of an origin is acceptable.
+ extensions: List of supported extensions, in order in which they
+ should be negotiated and run.
+ subprotocols: List of supported subprotocols, in order of decreasing
+ preference.
+ extra_headers (Union[HeadersLike, Callable[[str, Headers], HeadersLike]]):
+ Arbitrary HTTP headers to add to the response. This can be
+ a :data:`~websockets.datastructures.HeadersLike` or a callable
+ taking the request path and headers in arguments and returning
+ a :data:`~websockets.datastructures.HeadersLike`.
+ server_header: Value of the ``Server`` response header.
+ It defaults to ``"Python/x.y.z websockets/X.Y"``.
+ Setting it to :obj:`None` removes the header.
+ process_request (Optional[Callable[[str, Headers], \
+ Awaitable[Optional[Tuple[StatusLike, HeadersLike, bytes]]]]]):
+ Intercept HTTP request before the opening handshake.
+ See :meth:`~WebSocketServerProtocol.process_request` for details.
+ select_subprotocol: Select a subprotocol supported by the client.
+ See :meth:`~WebSocketServerProtocol.select_subprotocol` for details.
+ open_timeout: Timeout for opening connections in seconds.
+ :obj:`None` disables the timeout.
+
+ See :class:`~websockets.legacy.protocol.WebSocketCommonProtocol` for the
+ documentation of ``ping_interval``, ``ping_timeout``, ``close_timeout``,
+ ``max_size``, ``max_queue``, ``read_limit``, and ``write_limit``.
+
+ Any other keyword arguments are passed the event loop's
+ :meth:`~asyncio.loop.create_server` method.
+
+ For example:
+
+ * You can set ``ssl`` to a :class:`~ssl.SSLContext` to enable TLS.
+
+ * You can set ``sock`` to a :obj:`~socket.socket` that you created
+ outside of websockets.
+
+ Returns:
+ WebSocketServer: WebSocket server.
+
+ """
+
+ def __init__(
+ self,
+ ws_handler: Union[
+ Callable[[WebSocketServerProtocol], Awaitable[Any]],
+ Callable[[WebSocketServerProtocol, str], Awaitable[Any]], # deprecated
+ ],
+ host: Optional[Union[str, Sequence[str]]] = None,
+ port: Optional[int] = None,
+ *,
+ create_protocol: Optional[Callable[..., WebSocketServerProtocol]] = None,
+ logger: Optional[LoggerLike] = None,
+ compression: Optional[str] = "deflate",
+ origins: Optional[Sequence[Optional[Origin]]] = None,
+ extensions: Optional[Sequence[ServerExtensionFactory]] = None,
+ subprotocols: Optional[Sequence[Subprotocol]] = None,
+ extra_headers: Optional[HeadersLikeOrCallable] = None,
+ server_header: Optional[str] = USER_AGENT,
+ process_request: Optional[
+ Callable[[str, Headers], Awaitable[Optional[HTTPResponse]]]
+ ] = None,
+ select_subprotocol: Optional[
+ Callable[[Sequence[Subprotocol], Sequence[Subprotocol]], Subprotocol]
+ ] = None,
+ open_timeout: Optional[float] = 10,
+ ping_interval: Optional[float] = 20,
+ ping_timeout: Optional[float] = 20,
+ close_timeout: Optional[float] = None,
+ max_size: Optional[int] = 2**20,
+ max_queue: Optional[int] = 2**5,
+ read_limit: int = 2**16,
+ write_limit: int = 2**16,
+ **kwargs: Any,
+ ) -> None:
+ # Backwards compatibility: close_timeout used to be called timeout.
+ timeout: Optional[float] = kwargs.pop("timeout", None)
+ if timeout is None:
+ timeout = 10
+ else:
+ warnings.warn("rename timeout to close_timeout", DeprecationWarning)
+ # If both are specified, timeout is ignored.
+ if close_timeout is None:
+ close_timeout = timeout
+
+ # Backwards compatibility: create_protocol used to be called klass.
+ klass: Optional[Type[WebSocketServerProtocol]] = kwargs.pop("klass", None)
+ if klass is None:
+ klass = WebSocketServerProtocol
+ else:
+ warnings.warn("rename klass to create_protocol", DeprecationWarning)
+ # If both are specified, klass is ignored.
+ if create_protocol is None:
+ create_protocol = klass
+
+ # Backwards compatibility: recv() used to return None on closed connections
+ legacy_recv: bool = kwargs.pop("legacy_recv", False)
+
+ # Backwards compatibility: the loop parameter used to be supported.
+ _loop: Optional[asyncio.AbstractEventLoop] = kwargs.pop("loop", None)
+ if _loop is None:
+ loop = asyncio.get_event_loop()
+ else:
+ loop = _loop
+ warnings.warn("remove loop argument", DeprecationWarning)
+
+ ws_server = WebSocketServer(logger=logger)
+
+ secure = kwargs.get("ssl") is not None
+
+ if compression == "deflate":
+ extensions = enable_server_permessage_deflate(extensions)
+ elif compression is not None:
+ raise ValueError(f"unsupported compression: {compression}")
+
+ if subprotocols is not None:
+ validate_subprotocols(subprotocols)
+
+ factory = functools.partial(
+ create_protocol,
+ # For backwards compatibility with 10.0 or earlier. Done here in
+ # addition to WebSocketServerProtocol to trigger the deprecation
+ # warning once per serve() call rather than once per connection.
+ remove_path_argument(ws_handler),
+ ws_server,
+ host=host,
+ port=port,
+ secure=secure,
+ open_timeout=open_timeout,
+ ping_interval=ping_interval,
+ ping_timeout=ping_timeout,
+ close_timeout=close_timeout,
+ max_size=max_size,
+ max_queue=max_queue,
+ read_limit=read_limit,
+ write_limit=write_limit,
+ loop=_loop,
+ legacy_recv=legacy_recv,
+ origins=origins,
+ extensions=extensions,
+ subprotocols=subprotocols,
+ extra_headers=extra_headers,
+ server_header=server_header,
+ process_request=process_request,
+ select_subprotocol=select_subprotocol,
+ logger=logger,
+ )
+
+ if kwargs.pop("unix", False):
+ path: Optional[str] = kwargs.pop("path", None)
+ # unix_serve(path) must not specify host and port parameters.
+ assert host is None and port is None
+ create_server = functools.partial(
+ loop.create_unix_server, factory, path, **kwargs
+ )
+ else:
+ create_server = functools.partial(
+ loop.create_server, factory, host, port, **kwargs
+ )
+
+ # This is a coroutine function.
+ self._create_server = create_server
+ self.ws_server = ws_server
+
+ # async with serve(...)
+
+ async def __aenter__(self) -> WebSocketServer:
+ return await self
+
+ async def __aexit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_value: Optional[BaseException],
+ traceback: Optional[TracebackType],
+ ) -> None:
+ self.ws_server.close()
+ await self.ws_server.wait_closed()
+
+ # await serve(...)
+
+ def __await__(self) -> Generator[Any, None, WebSocketServer]:
+ # Create a suitable iterator by calling __await__ on a coroutine.
+ return self.__await_impl__().__await__()
+
+ async def __await_impl__(self) -> WebSocketServer:
+ server = await self._create_server()
+ self.ws_server.wrap(server)
+ return self.ws_server
+
+ # yield from serve(...) - remove when dropping Python < 3.10
+
+ __iter__ = __await__
+
+
+serve = Serve
+
+
+def unix_serve(
+ ws_handler: Union[
+ Callable[[WebSocketServerProtocol], Awaitable[Any]],
+ Callable[[WebSocketServerProtocol, str], Awaitable[Any]], # deprecated
+ ],
+ path: Optional[str] = None,
+ **kwargs: Any,
+) -> Serve:
+ """
+ Start a WebSocket server listening on a Unix socket.
+
+ This function is identical to :func:`serve`, except the ``host`` and
+ ``port`` arguments are replaced by ``path``. It is only available on Unix.
+
+ Unrecognized keyword arguments are passed the event loop's
+ :meth:`~asyncio.loop.create_unix_server` method.
+
+ It's useful for deploying a server behind a reverse proxy such as nginx.
+
+ Args:
+ path: File system path to the Unix socket.
+
+ """
+ return serve(ws_handler, path=path, unix=True, **kwargs)
+
+
+def remove_path_argument(
+ ws_handler: Union[
+ Callable[[WebSocketServerProtocol], Awaitable[Any]],
+ Callable[[WebSocketServerProtocol, str], Awaitable[Any]],
+ ]
+) -> Callable[[WebSocketServerProtocol], Awaitable[Any]]:
+ try:
+ inspect.signature(ws_handler).bind(None)
+ except TypeError:
+ try:
+ inspect.signature(ws_handler).bind(None, "")
+ except TypeError: # pragma: no cover
+ # ws_handler accepts neither one nor two arguments; leave it alone.
+ pass
+ else:
+ # ws_handler accepts two arguments; activate backwards compatibility.
+
+ # Enable deprecation warning and announce deprecation in 11.0.
+ # warnings.warn("remove second argument of ws_handler", DeprecationWarning)
+
+ async def _ws_handler(websocket: WebSocketServerProtocol) -> Any:
+ return await cast(
+ Callable[[WebSocketServerProtocol, str], Awaitable[Any]],
+ ws_handler,
+ )(websocket, websocket.path)
+
+ return _ws_handler
+
+ return cast(
+ Callable[[WebSocketServerProtocol], Awaitable[Any]],
+ ws_handler,
+ )