diff options
| author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:17:55 -0400 | 
|---|---|---|
| committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:17:55 -0400 | 
| commit | 12cf076118570eebbff08c6b3090e0d4798447a1 (patch) | |
| tree | 3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/uvicorn/protocols | |
| parent | c45662ff3923b34614ddcc8feb9195541166dcc5 (diff) | |
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/uvicorn/protocols')
22 files changed, 0 insertions, 2044 deletions
diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/__init__.py b/venv/lib/python3.11/site-packages/uvicorn/protocols/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/__init__.py +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/uvicorn/protocols/__pycache__/__init__.cpython-311.pyc Binary files differdeleted file mode 100644 index 5ff273e..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/__pycache__/__init__.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/__pycache__/utils.cpython-311.pyc b/venv/lib/python3.11/site-packages/uvicorn/protocols/__pycache__/utils.cpython-311.pyc Binary files differdeleted file mode 100644 index da51e43..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/__pycache__/utils.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__init__.py b/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__init__.py +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__pycache__/__init__.cpython-311.pyc Binary files differdeleted file mode 100644 index c56186b..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__pycache__/__init__.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__pycache__/auto.cpython-311.pyc b/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__pycache__/auto.cpython-311.pyc Binary files differdeleted file mode 100644 index a598e8a..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__pycache__/auto.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__pycache__/flow_control.cpython-311.pyc b/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__pycache__/flow_control.cpython-311.pyc Binary files differdeleted file mode 100644 index a925cb4..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__pycache__/flow_control.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__pycache__/h11_impl.cpython-311.pyc b/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__pycache__/h11_impl.cpython-311.pyc Binary files differdeleted file mode 100644 index 2379bb6..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__pycache__/h11_impl.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__pycache__/httptools_impl.cpython-311.pyc b/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__pycache__/httptools_impl.cpython-311.pyc Binary files differdeleted file mode 100644 index ba76184..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/__pycache__/httptools_impl.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/auto.py b/venv/lib/python3.11/site-packages/uvicorn/protocols/http/auto.py deleted file mode 100644 index a14bec1..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/auto.py +++ /dev/null @@ -1,15 +0,0 @@ -from __future__ import annotations - -import asyncio - -AutoHTTPProtocol: type[asyncio.Protocol] -try: -    import httptools  # noqa -except ImportError:  # pragma: no cover -    from uvicorn.protocols.http.h11_impl import H11Protocol - -    AutoHTTPProtocol = H11Protocol -else:  # pragma: no cover -    from uvicorn.protocols.http.httptools_impl import HttpToolsProtocol - -    AutoHTTPProtocol = HttpToolsProtocol diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/flow_control.py b/venv/lib/python3.11/site-packages/uvicorn/protocols/http/flow_control.py deleted file mode 100644 index 893a26c..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/flow_control.py +++ /dev/null @@ -1,64 +0,0 @@ -import asyncio - -from uvicorn._types import ( -    ASGIReceiveCallable, -    ASGISendCallable, -    HTTPResponseBodyEvent, -    HTTPResponseStartEvent, -    Scope, -) - -CLOSE_HEADER = (b"connection", b"close") - -HIGH_WATER_LIMIT = 65536 - - -class FlowControl: -    def __init__(self, transport: asyncio.Transport) -> None: -        self._transport = transport -        self.read_paused = False -        self.write_paused = False -        self._is_writable_event = asyncio.Event() -        self._is_writable_event.set() - -    async def drain(self) -> None: -        await self._is_writable_event.wait() - -    def pause_reading(self) -> None: -        if not self.read_paused: -            self.read_paused = True -            self._transport.pause_reading() - -    def resume_reading(self) -> None: -        if self.read_paused: -            self.read_paused = False -            self._transport.resume_reading() - -    def pause_writing(self) -> None: -        if not self.write_paused: -            self.write_paused = True -            self._is_writable_event.clear() - -    def resume_writing(self) -> None: -        if self.write_paused: -            self.write_paused = False -            self._is_writable_event.set() - - -async def service_unavailable(scope: "Scope", receive: "ASGIReceiveCallable", send: "ASGISendCallable") -> None: -    response_start: "HTTPResponseStartEvent" = { -        "type": "http.response.start", -        "status": 503, -        "headers": [ -            (b"content-type", b"text/plain; charset=utf-8"), -            (b"connection", b"close"), -        ], -    } -    await send(response_start) - -    response_body: "HTTPResponseBodyEvent" = { -        "type": "http.response.body", -        "body": b"Service Unavailable", -        "more_body": False, -    } -    await send(response_body) diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/h11_impl.py b/venv/lib/python3.11/site-packages/uvicorn/protocols/http/h11_impl.py deleted file mode 100644 index d0f2b2a..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/h11_impl.py +++ /dev/null @@ -1,547 +0,0 @@ -from __future__ import annotations - -import asyncio -import http -import logging -from typing import Any, Callable, Literal, cast -from urllib.parse import unquote - -import h11 -from h11._connection import DEFAULT_MAX_INCOMPLETE_EVENT_SIZE - -from uvicorn._types import ( -    ASGI3Application, -    ASGIReceiveEvent, -    ASGISendEvent, -    HTTPRequestEvent, -    HTTPResponseBodyEvent, -    HTTPResponseStartEvent, -    HTTPScope, -) -from uvicorn.config import Config -from uvicorn.logging import TRACE_LOG_LEVEL -from uvicorn.protocols.http.flow_control import ( -    CLOSE_HEADER, -    HIGH_WATER_LIMIT, -    FlowControl, -    service_unavailable, -) -from uvicorn.protocols.utils import ( -    get_client_addr, -    get_local_addr, -    get_path_with_query_string, -    get_remote_addr, -    is_ssl, -) -from uvicorn.server import ServerState - - -def _get_status_phrase(status_code: int) -> bytes: -    try: -        return http.HTTPStatus(status_code).phrase.encode() -    except ValueError: -        return b"" - - -STATUS_PHRASES = {status_code: _get_status_phrase(status_code) for status_code in range(100, 600)} - - -class H11Protocol(asyncio.Protocol): -    def __init__( -        self, -        config: Config, -        server_state: ServerState, -        app_state: dict[str, Any], -        _loop: asyncio.AbstractEventLoop | None = None, -    ) -> None: -        if not config.loaded: -            config.load() - -        self.config = config -        self.app = config.loaded_app -        self.loop = _loop or asyncio.get_event_loop() -        self.logger = logging.getLogger("uvicorn.error") -        self.access_logger = logging.getLogger("uvicorn.access") -        self.access_log = self.access_logger.hasHandlers() -        self.conn = h11.Connection( -            h11.SERVER, -            config.h11_max_incomplete_event_size -            if config.h11_max_incomplete_event_size is not None -            else DEFAULT_MAX_INCOMPLETE_EVENT_SIZE, -        ) -        self.ws_protocol_class = config.ws_protocol_class -        self.root_path = config.root_path -        self.limit_concurrency = config.limit_concurrency -        self.app_state = app_state - -        # Timeouts -        self.timeout_keep_alive_task: asyncio.TimerHandle | None = None -        self.timeout_keep_alive = config.timeout_keep_alive - -        # Shared server state -        self.server_state = server_state -        self.connections = server_state.connections -        self.tasks = server_state.tasks - -        # Per-connection state -        self.transport: asyncio.Transport = None  # type: ignore[assignment] -        self.flow: FlowControl = None  # type: ignore[assignment] -        self.server: tuple[str, int] | None = None -        self.client: tuple[str, int] | None = None -        self.scheme: Literal["http", "https"] | None = None - -        # Per-request state -        self.scope: HTTPScope = None  # type: ignore[assignment] -        self.headers: list[tuple[bytes, bytes]] = None  # type: ignore[assignment] -        self.cycle: RequestResponseCycle = None  # type: ignore[assignment] - -    # Protocol interface -    def connection_made(  # type: ignore[override] -        self, transport: asyncio.Transport -    ) -> None: -        self.connections.add(self) - -        self.transport = transport -        self.flow = FlowControl(transport) -        self.server = get_local_addr(transport) -        self.client = get_remote_addr(transport) -        self.scheme = "https" if is_ssl(transport) else "http" - -        if self.logger.level <= TRACE_LOG_LEVEL: -            prefix = "%s:%d - " % self.client if self.client else "" -            self.logger.log(TRACE_LOG_LEVEL, "%sHTTP connection made", prefix) - -    def connection_lost(self, exc: Exception | None) -> None: -        self.connections.discard(self) - -        if self.logger.level <= TRACE_LOG_LEVEL: -            prefix = "%s:%d - " % self.client if self.client else "" -            self.logger.log(TRACE_LOG_LEVEL, "%sHTTP connection lost", prefix) - -        if self.cycle and not self.cycle.response_complete: -            self.cycle.disconnected = True -        if self.conn.our_state != h11.ERROR: -            event = h11.ConnectionClosed() -            try: -                self.conn.send(event) -            except h11.LocalProtocolError: -                # Premature client disconnect -                pass - -        if self.cycle is not None: -            self.cycle.message_event.set() -        if self.flow is not None: -            self.flow.resume_writing() -        if exc is None: -            self.transport.close() -            self._unset_keepalive_if_required() - -    def eof_received(self) -> None: -        pass - -    def _unset_keepalive_if_required(self) -> None: -        if self.timeout_keep_alive_task is not None: -            self.timeout_keep_alive_task.cancel() -            self.timeout_keep_alive_task = None - -    def _get_upgrade(self) -> bytes | None: -        connection = [] -        upgrade = None -        for name, value in self.headers: -            if name == b"connection": -                connection = [token.lower().strip() for token in value.split(b",")] -            if name == b"upgrade": -                upgrade = value.lower() -        if b"upgrade" in connection: -            return upgrade -        return None - -    def _should_upgrade_to_ws(self) -> bool: -        if self.ws_protocol_class is None: -            if self.config.ws == "auto": -                msg = "Unsupported upgrade request." -                self.logger.warning(msg) -                msg = "No supported WebSocket library detected. Please use \"pip install 'uvicorn[standard]'\", or install 'websockets' or 'wsproto' manually."  # noqa: E501 -                self.logger.warning(msg) -            return False -        return True - -    def data_received(self, data: bytes) -> None: -        self._unset_keepalive_if_required() - -        self.conn.receive_data(data) -        self.handle_events() - -    def handle_events(self) -> None: -        while True: -            try: -                event = self.conn.next_event() -            except h11.RemoteProtocolError: -                msg = "Invalid HTTP request received." -                self.logger.warning(msg) -                self.send_400_response(msg) -                return - -            if event is h11.NEED_DATA: -                break - -            elif event is h11.PAUSED: -                # This case can occur in HTTP pipelining, so we need to -                # stop reading any more data, and ensure that at the end -                # of the active request/response cycle we handle any -                # events that have been buffered up. -                self.flow.pause_reading() -                break - -            elif isinstance(event, h11.Request): -                self.headers = [(key.lower(), value) for key, value in event.headers] -                raw_path, _, query_string = event.target.partition(b"?") -                path = unquote(raw_path.decode("ascii")) -                full_path = self.root_path + path -                full_raw_path = self.root_path.encode("ascii") + raw_path -                self.scope = { -                    "type": "http", -                    "asgi": { -                        "version": self.config.asgi_version, -                        "spec_version": "2.4", -                    }, -                    "http_version": event.http_version.decode("ascii"), -                    "server": self.server, -                    "client": self.client, -                    "scheme": self.scheme,  # type: ignore[typeddict-item] -                    "method": event.method.decode("ascii"), -                    "root_path": self.root_path, -                    "path": full_path, -                    "raw_path": full_raw_path, -                    "query_string": query_string, -                    "headers": self.headers, -                    "state": self.app_state.copy(), -                } - -                upgrade = self._get_upgrade() -                if upgrade == b"websocket" and self._should_upgrade_to_ws(): -                    self.handle_websocket_upgrade(event) -                    return - -                # Handle 503 responses when 'limit_concurrency' is exceeded. -                if self.limit_concurrency is not None and ( -                    len(self.connections) >= self.limit_concurrency or len(self.tasks) >= self.limit_concurrency -                ): -                    app = service_unavailable -                    message = "Exceeded concurrency limit." -                    self.logger.warning(message) -                else: -                    app = self.app - -                # When starting to process a request, disable the keep-alive -                # timeout. Normally we disable this when receiving data from -                # client and set back when finishing processing its request. -                # However, for pipelined requests processing finishes after -                # already receiving the next request and thus the timer may -                # be set here, which we don't want. -                self._unset_keepalive_if_required() - -                self.cycle = RequestResponseCycle( -                    scope=self.scope, -                    conn=self.conn, -                    transport=self.transport, -                    flow=self.flow, -                    logger=self.logger, -                    access_logger=self.access_logger, -                    access_log=self.access_log, -                    default_headers=self.server_state.default_headers, -                    message_event=asyncio.Event(), -                    on_response=self.on_response_complete, -                ) -                task = self.loop.create_task(self.cycle.run_asgi(app)) -                task.add_done_callback(self.tasks.discard) -                self.tasks.add(task) - -            elif isinstance(event, h11.Data): -                if self.conn.our_state is h11.DONE: -                    continue -                self.cycle.body += event.data -                if len(self.cycle.body) > HIGH_WATER_LIMIT: -                    self.flow.pause_reading() -                self.cycle.message_event.set() - -            elif isinstance(event, h11.EndOfMessage): -                if self.conn.our_state is h11.DONE: -                    self.transport.resume_reading() -                    self.conn.start_next_cycle() -                    continue -                self.cycle.more_body = False -                self.cycle.message_event.set() - -    def handle_websocket_upgrade(self, event: h11.Request) -> None: -        if self.logger.level <= TRACE_LOG_LEVEL: -            prefix = "%s:%d - " % self.client if self.client else "" -            self.logger.log(TRACE_LOG_LEVEL, "%sUpgrading to WebSocket", prefix) - -        self.connections.discard(self) -        output = [event.method, b" ", event.target, b" HTTP/1.1\r\n"] -        for name, value in self.headers: -            output += [name, b": ", value, b"\r\n"] -        output.append(b"\r\n") -        protocol = self.ws_protocol_class(  # type: ignore[call-arg, misc] -            config=self.config, -            server_state=self.server_state, -            app_state=self.app_state, -        ) -        protocol.connection_made(self.transport) -        protocol.data_received(b"".join(output)) -        self.transport.set_protocol(protocol) - -    def send_400_response(self, msg: str) -> None: -        reason = STATUS_PHRASES[400] -        headers: list[tuple[bytes, bytes]] = [ -            (b"content-type", b"text/plain; charset=utf-8"), -            (b"connection", b"close"), -        ] -        event = h11.Response(status_code=400, headers=headers, reason=reason) -        output = self.conn.send(event) -        self.transport.write(output) - -        output = self.conn.send(event=h11.Data(data=msg.encode("ascii"))) -        self.transport.write(output) - -        output = self.conn.send(event=h11.EndOfMessage()) -        self.transport.write(output) - -        self.transport.close() - -    def on_response_complete(self) -> None: -        self.server_state.total_requests += 1 - -        if self.transport.is_closing(): -            return - -        # Set a short Keep-Alive timeout. -        self._unset_keepalive_if_required() - -        self.timeout_keep_alive_task = self.loop.call_later(self.timeout_keep_alive, self.timeout_keep_alive_handler) - -        # Unpause data reads if needed. -        self.flow.resume_reading() - -        # Unblock any pipelined events. -        if self.conn.our_state is h11.DONE and self.conn.their_state is h11.DONE: -            self.conn.start_next_cycle() -            self.handle_events() - -    def shutdown(self) -> None: -        """ -        Called by the server to commence a graceful shutdown. -        """ -        if self.cycle is None or self.cycle.response_complete: -            event = h11.ConnectionClosed() -            self.conn.send(event) -            self.transport.close() -        else: -            self.cycle.keep_alive = False - -    def pause_writing(self) -> None: -        """ -        Called by the transport when the write buffer exceeds the high water mark. -        """ -        self.flow.pause_writing() - -    def resume_writing(self) -> None: -        """ -        Called by the transport when the write buffer drops below the low water mark. -        """ -        self.flow.resume_writing() - -    def timeout_keep_alive_handler(self) -> None: -        """ -        Called on a keep-alive connection if no new data is received after a short -        delay. -        """ -        if not self.transport.is_closing(): -            event = h11.ConnectionClosed() -            self.conn.send(event) -            self.transport.close() - - -class RequestResponseCycle: -    def __init__( -        self, -        scope: HTTPScope, -        conn: h11.Connection, -        transport: asyncio.Transport, -        flow: FlowControl, -        logger: logging.Logger, -        access_logger: logging.Logger, -        access_log: bool, -        default_headers: list[tuple[bytes, bytes]], -        message_event: asyncio.Event, -        on_response: Callable[..., None], -    ) -> None: -        self.scope = scope -        self.conn = conn -        self.transport = transport -        self.flow = flow -        self.logger = logger -        self.access_logger = access_logger -        self.access_log = access_log -        self.default_headers = default_headers -        self.message_event = message_event -        self.on_response = on_response - -        # Connection state -        self.disconnected = False -        self.keep_alive = True -        self.waiting_for_100_continue = conn.they_are_waiting_for_100_continue - -        # Request state -        self.body = b"" -        self.more_body = True - -        # Response state -        self.response_started = False -        self.response_complete = False - -    # ASGI exception wrapper -    async def run_asgi(self, app: ASGI3Application) -> None: -        try: -            result = await app(  # type: ignore[func-returns-value] -                self.scope, self.receive, self.send -            ) -        except BaseException as exc: -            msg = "Exception in ASGI application\n" -            self.logger.error(msg, exc_info=exc) -            if not self.response_started: -                await self.send_500_response() -            else: -                self.transport.close() -        else: -            if result is not None: -                msg = "ASGI callable should return None, but returned '%s'." -                self.logger.error(msg, result) -                self.transport.close() -            elif not self.response_started and not self.disconnected: -                msg = "ASGI callable returned without starting response." -                self.logger.error(msg) -                await self.send_500_response() -            elif not self.response_complete and not self.disconnected: -                msg = "ASGI callable returned without completing response." -                self.logger.error(msg) -                self.transport.close() -        finally: -            self.on_response = lambda: None - -    async def send_500_response(self) -> None: -        response_start_event: HTTPResponseStartEvent = { -            "type": "http.response.start", -            "status": 500, -            "headers": [ -                (b"content-type", b"text/plain; charset=utf-8"), -                (b"connection", b"close"), -            ], -        } -        await self.send(response_start_event) -        response_body_event: HTTPResponseBodyEvent = { -            "type": "http.response.body", -            "body": b"Internal Server Error", -            "more_body": False, -        } -        await self.send(response_body_event) - -    # ASGI interface -    async def send(self, message: ASGISendEvent) -> None: -        message_type = message["type"] - -        if self.flow.write_paused and not self.disconnected: -            await self.flow.drain() - -        if self.disconnected: -            return - -        if not self.response_started: -            # Sending response status line and headers -            if message_type != "http.response.start": -                msg = "Expected ASGI message 'http.response.start', but got '%s'." -                raise RuntimeError(msg % message_type) -            message = cast("HTTPResponseStartEvent", message) - -            self.response_started = True -            self.waiting_for_100_continue = False - -            status = message["status"] -            headers = self.default_headers + list(message.get("headers", [])) - -            if CLOSE_HEADER in self.scope["headers"] and CLOSE_HEADER not in headers: -                headers = headers + [CLOSE_HEADER] - -            if self.access_log: -                self.access_logger.info( -                    '%s - "%s %s HTTP/%s" %d', -                    get_client_addr(self.scope), -                    self.scope["method"], -                    get_path_with_query_string(self.scope), -                    self.scope["http_version"], -                    status, -                ) - -            # Write response status line and headers -            reason = STATUS_PHRASES[status] -            response = h11.Response(status_code=status, headers=headers, reason=reason) -            output = self.conn.send(event=response) -            self.transport.write(output) - -        elif not self.response_complete: -            # Sending response body -            if message_type != "http.response.body": -                msg = "Expected ASGI message 'http.response.body', but got '%s'." -                raise RuntimeError(msg % message_type) -            message = cast("HTTPResponseBodyEvent", message) - -            body = message.get("body", b"") -            more_body = message.get("more_body", False) - -            # Write response body -            data = b"" if self.scope["method"] == "HEAD" else body -            output = self.conn.send(event=h11.Data(data=data)) -            self.transport.write(output) - -            # Handle response completion -            if not more_body: -                self.response_complete = True -                self.message_event.set() -                output = self.conn.send(event=h11.EndOfMessage()) -                self.transport.write(output) - -        else: -            # Response already sent -            msg = "Unexpected ASGI message '%s' sent, after response already completed." -            raise RuntimeError(msg % message_type) - -        if self.response_complete: -            if self.conn.our_state is h11.MUST_CLOSE or not self.keep_alive: -                self.conn.send(event=h11.ConnectionClosed()) -                self.transport.close() -            self.on_response() - -    async def receive(self) -> ASGIReceiveEvent: -        if self.waiting_for_100_continue and not self.transport.is_closing(): -            headers: list[tuple[str, str]] = [] -            event = h11.InformationalResponse(status_code=100, headers=headers, reason="Continue") -            output = self.conn.send(event=event) -            self.transport.write(output) -            self.waiting_for_100_continue = False - -        if not self.disconnected and not self.response_complete: -            self.flow.resume_reading() -            await self.message_event.wait() -            self.message_event.clear() - -        if self.disconnected or self.response_complete: -            return {"type": "http.disconnect"} - -        message: HTTPRequestEvent = { -            "type": "http.request", -            "body": self.body, -            "more_body": self.more_body, -        } -        self.body = b"" -        return message diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py b/venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py deleted file mode 100644 index 997c6bb..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py +++ /dev/null @@ -1,575 +0,0 @@ -from __future__ import annotations - -import asyncio -import http -import logging -import re -import urllib -from asyncio.events import TimerHandle -from collections import deque -from typing import Any, Callable, Literal, cast - -import httptools - -from uvicorn._types import ( -    ASGI3Application, -    ASGIReceiveEvent, -    ASGISendEvent, -    HTTPRequestEvent, -    HTTPResponseBodyEvent, -    HTTPResponseStartEvent, -    HTTPScope, -) -from uvicorn.config import Config -from uvicorn.logging import TRACE_LOG_LEVEL -from uvicorn.protocols.http.flow_control import ( -    CLOSE_HEADER, -    HIGH_WATER_LIMIT, -    FlowControl, -    service_unavailable, -) -from uvicorn.protocols.utils import ( -    get_client_addr, -    get_local_addr, -    get_path_with_query_string, -    get_remote_addr, -    is_ssl, -) -from uvicorn.server import ServerState - -HEADER_RE = re.compile(b'[\x00-\x1F\x7F()<>@,;:[]={} \t\\"]') -HEADER_VALUE_RE = re.compile(b"[\x00-\x1F\x7F]") - - -def _get_status_line(status_code: int) -> bytes: -    try: -        phrase = http.HTTPStatus(status_code).phrase.encode() -    except ValueError: -        phrase = b"" -    return b"".join([b"HTTP/1.1 ", str(status_code).encode(), b" ", phrase, b"\r\n"]) - - -STATUS_LINE = {status_code: _get_status_line(status_code) for status_code in range(100, 600)} - - -class HttpToolsProtocol(asyncio.Protocol): -    def __init__( -        self, -        config: Config, -        server_state: ServerState, -        app_state: dict[str, Any], -        _loop: asyncio.AbstractEventLoop | None = None, -    ) -> None: -        if not config.loaded: -            config.load() - -        self.config = config -        self.app = config.loaded_app -        self.loop = _loop or asyncio.get_event_loop() -        self.logger = logging.getLogger("uvicorn.error") -        self.access_logger = logging.getLogger("uvicorn.access") -        self.access_log = self.access_logger.hasHandlers() -        self.parser = httptools.HttpRequestParser(self) -        self.ws_protocol_class = config.ws_protocol_class -        self.root_path = config.root_path -        self.limit_concurrency = config.limit_concurrency -        self.app_state = app_state - -        # Timeouts -        self.timeout_keep_alive_task: TimerHandle | None = None -        self.timeout_keep_alive = config.timeout_keep_alive - -        # Global state -        self.server_state = server_state -        self.connections = server_state.connections -        self.tasks = server_state.tasks - -        # Per-connection state -        self.transport: asyncio.Transport = None  # type: ignore[assignment] -        self.flow: FlowControl = None  # type: ignore[assignment] -        self.server: tuple[str, int] | None = None -        self.client: tuple[str, int] | None = None -        self.scheme: Literal["http", "https"] | None = None -        self.pipeline: deque[tuple[RequestResponseCycle, ASGI3Application]] = deque() - -        # Per-request state -        self.scope: HTTPScope = None  # type: ignore[assignment] -        self.headers: list[tuple[bytes, bytes]] = None  # type: ignore[assignment] -        self.expect_100_continue = False -        self.cycle: RequestResponseCycle = None  # type: ignore[assignment] - -    # Protocol interface -    def connection_made(  # type: ignore[override] -        self, transport: asyncio.Transport -    ) -> None: -        self.connections.add(self) - -        self.transport = transport -        self.flow = FlowControl(transport) -        self.server = get_local_addr(transport) -        self.client = get_remote_addr(transport) -        self.scheme = "https" if is_ssl(transport) else "http" - -        if self.logger.level <= TRACE_LOG_LEVEL: -            prefix = "%s:%d - " % self.client if self.client else "" -            self.logger.log(TRACE_LOG_LEVEL, "%sHTTP connection made", prefix) - -    def connection_lost(self, exc: Exception | None) -> None: -        self.connections.discard(self) - -        if self.logger.level <= TRACE_LOG_LEVEL: -            prefix = "%s:%d - " % self.client if self.client else "" -            self.logger.log(TRACE_LOG_LEVEL, "%sHTTP connection lost", prefix) - -        if self.cycle and not self.cycle.response_complete: -            self.cycle.disconnected = True -        if self.cycle is not None: -            self.cycle.message_event.set() -        if self.flow is not None: -            self.flow.resume_writing() -        if exc is None: -            self.transport.close() -            self._unset_keepalive_if_required() - -        self.parser = None - -    def eof_received(self) -> None: -        pass - -    def _unset_keepalive_if_required(self) -> None: -        if self.timeout_keep_alive_task is not None: -            self.timeout_keep_alive_task.cancel() -            self.timeout_keep_alive_task = None - -    def _get_upgrade(self) -> bytes | None: -        connection = [] -        upgrade = None -        for name, value in self.headers: -            if name == b"connection": -                connection = [token.lower().strip() for token in value.split(b",")] -            if name == b"upgrade": -                upgrade = value.lower() -        if b"upgrade" in connection: -            return upgrade -        return None - -    def _should_upgrade_to_ws(self, upgrade: bytes | None) -> bool: -        if upgrade == b"websocket" and self.ws_protocol_class is not None: -            return True -        if self.config.ws == "auto": -            msg = "Unsupported upgrade request." -            self.logger.warning(msg) -            msg = "No supported WebSocket library detected. Please use \"pip install 'uvicorn[standard]'\", or install 'websockets' or 'wsproto' manually."  # noqa: E501 -            self.logger.warning(msg) -        return False - -    def _should_upgrade(self) -> bool: -        upgrade = self._get_upgrade() -        return self._should_upgrade_to_ws(upgrade) - -    def data_received(self, data: bytes) -> None: -        self._unset_keepalive_if_required() - -        try: -            self.parser.feed_data(data) -        except httptools.HttpParserError: -            msg = "Invalid HTTP request received." -            self.logger.warning(msg) -            self.send_400_response(msg) -            return -        except httptools.HttpParserUpgrade: -            upgrade = self._get_upgrade() -            if self._should_upgrade_to_ws(upgrade): -                self.handle_websocket_upgrade() - -    def handle_websocket_upgrade(self) -> None: -        if self.logger.level <= TRACE_LOG_LEVEL: -            prefix = "%s:%d - " % self.client if self.client else "" -            self.logger.log(TRACE_LOG_LEVEL, "%sUpgrading to WebSocket", prefix) - -        self.connections.discard(self) -        method = self.scope["method"].encode() -        output = [method, b" ", self.url, b" HTTP/1.1\r\n"] -        for name, value in self.scope["headers"]: -            output += [name, b": ", value, b"\r\n"] -        output.append(b"\r\n") -        protocol = self.ws_protocol_class(  # type: ignore[call-arg, misc] -            config=self.config, -            server_state=self.server_state, -            app_state=self.app_state, -        ) -        protocol.connection_made(self.transport) -        protocol.data_received(b"".join(output)) -        self.transport.set_protocol(protocol) - -    def send_400_response(self, msg: str) -> None: -        content = [STATUS_LINE[400]] -        for name, value in self.server_state.default_headers: -            content.extend([name, b": ", value, b"\r\n"]) -        content.extend( -            [ -                b"content-type: text/plain; charset=utf-8\r\n", -                b"content-length: " + str(len(msg)).encode("ascii") + b"\r\n", -                b"connection: close\r\n", -                b"\r\n", -                msg.encode("ascii"), -            ] -        ) -        self.transport.write(b"".join(content)) -        self.transport.close() - -    def on_message_begin(self) -> None: -        self.url = b"" -        self.expect_100_continue = False -        self.headers = [] -        self.scope = {  # type: ignore[typeddict-item] -            "type": "http", -            "asgi": {"version": self.config.asgi_version, "spec_version": "2.4"}, -            "http_version": "1.1", -            "server": self.server, -            "client": self.client, -            "scheme": self.scheme,  # type: ignore[typeddict-item] -            "root_path": self.root_path, -            "headers": self.headers, -            "state": self.app_state.copy(), -        } - -    # Parser callbacks -    def on_url(self, url: bytes) -> None: -        self.url += url - -    def on_header(self, name: bytes, value: bytes) -> None: -        name = name.lower() -        if name == b"expect" and value.lower() == b"100-continue": -            self.expect_100_continue = True -        self.headers.append((name, value)) - -    def on_headers_complete(self) -> None: -        http_version = self.parser.get_http_version() -        method = self.parser.get_method() -        self.scope["method"] = method.decode("ascii") -        if http_version != "1.1": -            self.scope["http_version"] = http_version -        if self.parser.should_upgrade() and self._should_upgrade(): -            return -        parsed_url = httptools.parse_url(self.url) -        raw_path = parsed_url.path -        path = raw_path.decode("ascii") -        if "%" in path: -            path = urllib.parse.unquote(path) -        full_path = self.root_path + path -        full_raw_path = self.root_path.encode("ascii") + raw_path -        self.scope["path"] = full_path -        self.scope["raw_path"] = full_raw_path -        self.scope["query_string"] = parsed_url.query or b"" - -        # Handle 503 responses when 'limit_concurrency' is exceeded. -        if self.limit_concurrency is not None and ( -            len(self.connections) >= self.limit_concurrency or len(self.tasks) >= self.limit_concurrency -        ): -            app = service_unavailable -            message = "Exceeded concurrency limit." -            self.logger.warning(message) -        else: -            app = self.app - -        existing_cycle = self.cycle -        self.cycle = RequestResponseCycle( -            scope=self.scope, -            transport=self.transport, -            flow=self.flow, -            logger=self.logger, -            access_logger=self.access_logger, -            access_log=self.access_log, -            default_headers=self.server_state.default_headers, -            message_event=asyncio.Event(), -            expect_100_continue=self.expect_100_continue, -            keep_alive=http_version != "1.0", -            on_response=self.on_response_complete, -        ) -        if existing_cycle is None or existing_cycle.response_complete: -            # Standard case - start processing the request. -            task = self.loop.create_task(self.cycle.run_asgi(app)) -            task.add_done_callback(self.tasks.discard) -            self.tasks.add(task) -        else: -            # Pipelined HTTP requests need to be queued up. -            self.flow.pause_reading() -            self.pipeline.appendleft((self.cycle, app)) - -    def on_body(self, body: bytes) -> None: -        if (self.parser.should_upgrade() and self._should_upgrade()) or self.cycle.response_complete: -            return -        self.cycle.body += body -        if len(self.cycle.body) > HIGH_WATER_LIMIT: -            self.flow.pause_reading() -        self.cycle.message_event.set() - -    def on_message_complete(self) -> None: -        if (self.parser.should_upgrade() and self._should_upgrade()) or self.cycle.response_complete: -            return -        self.cycle.more_body = False -        self.cycle.message_event.set() - -    def on_response_complete(self) -> None: -        # Callback for pipelined HTTP requests to be started. -        self.server_state.total_requests += 1 - -        if self.transport.is_closing(): -            return - -        self._unset_keepalive_if_required() - -        # Unpause data reads if needed. -        self.flow.resume_reading() - -        # Unblock any pipelined events. If there are none, arm the -        # Keep-Alive timeout instead. -        if self.pipeline: -            cycle, app = self.pipeline.pop() -            task = self.loop.create_task(cycle.run_asgi(app)) -            task.add_done_callback(self.tasks.discard) -            self.tasks.add(task) -        else: -            self.timeout_keep_alive_task = self.loop.call_later( -                self.timeout_keep_alive, self.timeout_keep_alive_handler -            ) - -    def shutdown(self) -> None: -        """ -        Called by the server to commence a graceful shutdown. -        """ -        if self.cycle is None or self.cycle.response_complete: -            self.transport.close() -        else: -            self.cycle.keep_alive = False - -    def pause_writing(self) -> None: -        """ -        Called by the transport when the write buffer exceeds the high water mark. -        """ -        self.flow.pause_writing() - -    def resume_writing(self) -> None: -        """ -        Called by the transport when the write buffer drops below the low water mark. -        """ -        self.flow.resume_writing() - -    def timeout_keep_alive_handler(self) -> None: -        """ -        Called on a keep-alive connection if no new data is received after a short -        delay. -        """ -        if not self.transport.is_closing(): -            self.transport.close() - - -class RequestResponseCycle: -    def __init__( -        self, -        scope: HTTPScope, -        transport: asyncio.Transport, -        flow: FlowControl, -        logger: logging.Logger, -        access_logger: logging.Logger, -        access_log: bool, -        default_headers: list[tuple[bytes, bytes]], -        message_event: asyncio.Event, -        expect_100_continue: bool, -        keep_alive: bool, -        on_response: Callable[..., None], -    ): -        self.scope = scope -        self.transport = transport -        self.flow = flow -        self.logger = logger -        self.access_logger = access_logger -        self.access_log = access_log -        self.default_headers = default_headers -        self.message_event = message_event -        self.on_response = on_response - -        # Connection state -        self.disconnected = False -        self.keep_alive = keep_alive -        self.waiting_for_100_continue = expect_100_continue - -        # Request state -        self.body = b"" -        self.more_body = True - -        # Response state -        self.response_started = False -        self.response_complete = False -        self.chunked_encoding: bool | None = None -        self.expected_content_length = 0 - -    # ASGI exception wrapper -    async def run_asgi(self, app: ASGI3Application) -> None: -        try: -            result = await app(  # type: ignore[func-returns-value] -                self.scope, self.receive, self.send -            ) -        except BaseException as exc: -            msg = "Exception in ASGI application\n" -            self.logger.error(msg, exc_info=exc) -            if not self.response_started: -                await self.send_500_response() -            else: -                self.transport.close() -        else: -            if result is not None: -                msg = "ASGI callable should return None, but returned '%s'." -                self.logger.error(msg, result) -                self.transport.close() -            elif not self.response_started and not self.disconnected: -                msg = "ASGI callable returned without starting response." -                self.logger.error(msg) -                await self.send_500_response() -            elif not self.response_complete and not self.disconnected: -                msg = "ASGI callable returned without completing response." -                self.logger.error(msg) -                self.transport.close() -        finally: -            self.on_response = lambda: None - -    async def send_500_response(self) -> None: -        response_start_event: HTTPResponseStartEvent = { -            "type": "http.response.start", -            "status": 500, -            "headers": [ -                (b"content-type", b"text/plain; charset=utf-8"), -                (b"connection", b"close"), -            ], -        } -        await self.send(response_start_event) -        response_body_event: HTTPResponseBodyEvent = { -            "type": "http.response.body", -            "body": b"Internal Server Error", -            "more_body": False, -        } -        await self.send(response_body_event) - -    # ASGI interface -    async def send(self, message: ASGISendEvent) -> None: -        message_type = message["type"] - -        if self.flow.write_paused and not self.disconnected: -            await self.flow.drain() - -        if self.disconnected: -            return - -        if not self.response_started: -            # Sending response status line and headers -            if message_type != "http.response.start": -                msg = "Expected ASGI message 'http.response.start', but got '%s'." -                raise RuntimeError(msg % message_type) -            message = cast("HTTPResponseStartEvent", message) - -            self.response_started = True -            self.waiting_for_100_continue = False - -            status_code = message["status"] -            headers = self.default_headers + list(message.get("headers", [])) - -            if CLOSE_HEADER in self.scope["headers"] and CLOSE_HEADER not in headers: -                headers = headers + [CLOSE_HEADER] - -            if self.access_log: -                self.access_logger.info( -                    '%s - "%s %s HTTP/%s" %d', -                    get_client_addr(self.scope), -                    self.scope["method"], -                    get_path_with_query_string(self.scope), -                    self.scope["http_version"], -                    status_code, -                ) - -            # Write response status line and headers -            content = [STATUS_LINE[status_code]] - -            for name, value in headers: -                if HEADER_RE.search(name): -                    raise RuntimeError("Invalid HTTP header name.") -                if HEADER_VALUE_RE.search(value): -                    raise RuntimeError("Invalid HTTP header value.") - -                name = name.lower() -                if name == b"content-length" and self.chunked_encoding is None: -                    self.expected_content_length = int(value.decode()) -                    self.chunked_encoding = False -                elif name == b"transfer-encoding" and value.lower() == b"chunked": -                    self.expected_content_length = 0 -                    self.chunked_encoding = True -                elif name == b"connection" and value.lower() == b"close": -                    self.keep_alive = False -                content.extend([name, b": ", value, b"\r\n"]) - -            if self.chunked_encoding is None and self.scope["method"] != "HEAD" and status_code not in (204, 304): -                # Neither content-length nor transfer-encoding specified -                self.chunked_encoding = True -                content.append(b"transfer-encoding: chunked\r\n") - -            content.append(b"\r\n") -            self.transport.write(b"".join(content)) - -        elif not self.response_complete: -            # Sending response body -            if message_type != "http.response.body": -                msg = "Expected ASGI message 'http.response.body', but got '%s'." -                raise RuntimeError(msg % message_type) - -            body = cast(bytes, message.get("body", b"")) -            more_body = message.get("more_body", False) - -            # Write response body -            if self.scope["method"] == "HEAD": -                self.expected_content_length = 0 -            elif self.chunked_encoding: -                if body: -                    content = [b"%x\r\n" % len(body), body, b"\r\n"] -                else: -                    content = [] -                if not more_body: -                    content.append(b"0\r\n\r\n") -                self.transport.write(b"".join(content)) -            else: -                num_bytes = len(body) -                if num_bytes > self.expected_content_length: -                    raise RuntimeError("Response content longer than Content-Length") -                else: -                    self.expected_content_length -= num_bytes -                self.transport.write(body) - -            # Handle response completion -            if not more_body: -                if self.expected_content_length != 0: -                    raise RuntimeError("Response content shorter than Content-Length") -                self.response_complete = True -                self.message_event.set() -                if not self.keep_alive: -                    self.transport.close() -                self.on_response() - -        else: -            # Response already sent -            msg = "Unexpected ASGI message '%s' sent, after response already completed." -            raise RuntimeError(msg % message_type) - -    async def receive(self) -> ASGIReceiveEvent: -        if self.waiting_for_100_continue and not self.transport.is_closing(): -            self.transport.write(b"HTTP/1.1 100 Continue\r\n\r\n") -            self.waiting_for_100_continue = False - -        if not self.disconnected and not self.response_complete: -            self.flow.resume_reading() -            await self.message_event.wait() -            self.message_event.clear() - -        if self.disconnected or self.response_complete: -            return {"type": "http.disconnect"} -        message: HTTPRequestEvent = {"type": "http.request", "body": self.body, "more_body": self.more_body} -        self.body = b"" -        return message diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/utils.py b/venv/lib/python3.11/site-packages/uvicorn/protocols/utils.py deleted file mode 100644 index 4e65806..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/utils.py +++ /dev/null @@ -1,57 +0,0 @@ -from __future__ import annotations - -import asyncio -import urllib.parse - -from uvicorn._types import WWWScope - - -class ClientDisconnected(IOError): -    ... - - -def get_remote_addr(transport: asyncio.Transport) -> tuple[str, int] | None: -    socket_info = transport.get_extra_info("socket") -    if socket_info is not None: -        try: -            info = socket_info.getpeername() -            return (str(info[0]), int(info[1])) if isinstance(info, tuple) else None -        except OSError:  # pragma: no cover -            # This case appears to inconsistently occur with uvloop -            # bound to a unix domain socket. -            return None - -    info = transport.get_extra_info("peername") -    if info is not None and isinstance(info, (list, tuple)) and len(info) == 2: -        return (str(info[0]), int(info[1])) -    return None - - -def get_local_addr(transport: asyncio.Transport) -> tuple[str, int] | None: -    socket_info = transport.get_extra_info("socket") -    if socket_info is not None: -        info = socket_info.getsockname() - -        return (str(info[0]), int(info[1])) if isinstance(info, tuple) else None -    info = transport.get_extra_info("sockname") -    if info is not None and isinstance(info, (list, tuple)) and len(info) == 2: -        return (str(info[0]), int(info[1])) -    return None - - -def is_ssl(transport: asyncio.Transport) -> bool: -    return bool(transport.get_extra_info("sslcontext")) - - -def get_client_addr(scope: WWWScope) -> str: -    client = scope.get("client") -    if not client: -        return "" -    return "%s:%d" % client - - -def get_path_with_query_string(scope: WWWScope) -> str: -    path_with_query_string = urllib.parse.quote(scope["path"]) -    if scope["query_string"]: -        path_with_query_string = "{}?{}".format(path_with_query_string, scope["query_string"].decode("ascii")) -    return path_with_query_string diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/__init__.py b/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/__init__.py +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/__pycache__/__init__.cpython-311.pyc Binary files differdeleted file mode 100644 index d216ab9..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/__pycache__/__init__.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/__pycache__/auto.cpython-311.pyc b/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/__pycache__/auto.cpython-311.pyc Binary files differdeleted file mode 100644 index e8c06fa..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/__pycache__/auto.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/__pycache__/websockets_impl.cpython-311.pyc b/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/__pycache__/websockets_impl.cpython-311.pyc Binary files differdeleted file mode 100644 index 334a441..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/__pycache__/websockets_impl.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/__pycache__/wsproto_impl.cpython-311.pyc b/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/__pycache__/wsproto_impl.cpython-311.pyc Binary files differdeleted file mode 100644 index 3b1911e..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/__pycache__/wsproto_impl.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/auto.py b/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/auto.py deleted file mode 100644 index 08fd136..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/auto.py +++ /dev/null @@ -1,21 +0,0 @@ -from __future__ import annotations - -import asyncio -import typing - -AutoWebSocketsProtocol: typing.Callable[..., asyncio.Protocol] | None -try: -    import websockets  # noqa -except ImportError:  # pragma: no cover -    try: -        import wsproto  # noqa -    except ImportError: -        AutoWebSocketsProtocol = None -    else: -        from uvicorn.protocols.websockets.wsproto_impl import WSProtocol - -        AutoWebSocketsProtocol = WSProtocol -else: -    from uvicorn.protocols.websockets.websockets_impl import WebSocketProtocol - -    AutoWebSocketsProtocol = WebSocketProtocol diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/websockets_impl.py b/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/websockets_impl.py deleted file mode 100644 index 6d098d5..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/websockets_impl.py +++ /dev/null @@ -1,388 +0,0 @@ -from __future__ import annotations - -import asyncio -import http -import logging -from typing import Any, Literal, Optional, Sequence, cast -from urllib.parse import unquote - -import websockets -from websockets.datastructures import Headers -from websockets.exceptions import ConnectionClosed -from websockets.extensions.permessage_deflate import ServerPerMessageDeflateFactory -from websockets.legacy.server import HTTPResponse -from websockets.server import WebSocketServerProtocol -from websockets.typing import Subprotocol - -from uvicorn._types import ( -    ASGISendEvent, -    WebSocketAcceptEvent, -    WebSocketCloseEvent, -    WebSocketConnectEvent, -    WebSocketDisconnectEvent, -    WebSocketReceiveEvent, -    WebSocketResponseBodyEvent, -    WebSocketResponseStartEvent, -    WebSocketScope, -    WebSocketSendEvent, -) -from uvicorn.config import Config -from uvicorn.logging import TRACE_LOG_LEVEL -from uvicorn.protocols.utils import ( -    ClientDisconnected, -    get_local_addr, -    get_path_with_query_string, -    get_remote_addr, -    is_ssl, -) -from uvicorn.server import ServerState - - -class Server: -    closing = False - -    def register(self, ws: WebSocketServerProtocol) -> None: -        pass - -    def unregister(self, ws: WebSocketServerProtocol) -> None: -        pass - -    def is_serving(self) -> bool: -        return not self.closing - - -class WebSocketProtocol(WebSocketServerProtocol): -    extra_headers: list[tuple[str, str]] - -    def __init__( -        self, -        config: Config, -        server_state: ServerState, -        app_state: dict[str, Any], -        _loop: asyncio.AbstractEventLoop | None = None, -    ): -        if not config.loaded: -            config.load() - -        self.config = config -        self.app = config.loaded_app -        self.loop = _loop or asyncio.get_event_loop() -        self.root_path = config.root_path -        self.app_state = app_state - -        # Shared server state -        self.connections = server_state.connections -        self.tasks = server_state.tasks - -        # Connection state -        self.transport: asyncio.Transport = None  # type: ignore[assignment] -        self.server: tuple[str, int] | None = None -        self.client: tuple[str, int] | None = None -        self.scheme: Literal["wss", "ws"] = None  # type: ignore[assignment] - -        # Connection events -        self.scope: WebSocketScope -        self.handshake_started_event = asyncio.Event() -        self.handshake_completed_event = asyncio.Event() -        self.closed_event = asyncio.Event() -        self.initial_response: HTTPResponse | None = None -        self.connect_sent = False -        self.lost_connection_before_handshake = False -        self.accepted_subprotocol: Subprotocol | None = None - -        self.ws_server: Server = Server()  # type: ignore[assignment] - -        extensions = [] -        if self.config.ws_per_message_deflate: -            extensions.append(ServerPerMessageDeflateFactory()) - -        super().__init__( -            ws_handler=self.ws_handler, -            ws_server=self.ws_server,  # type: ignore[arg-type] -            max_size=self.config.ws_max_size, -            max_queue=self.config.ws_max_queue, -            ping_interval=self.config.ws_ping_interval, -            ping_timeout=self.config.ws_ping_timeout, -            extensions=extensions, -            logger=logging.getLogger("uvicorn.error"), -        ) -        self.server_header = None -        self.extra_headers = [ -            (name.decode("latin-1"), value.decode("latin-1")) for name, value in server_state.default_headers -        ] - -    def connection_made(  # type: ignore[override] -        self, transport: asyncio.Transport -    ) -> None: -        self.connections.add(self) -        self.transport = transport -        self.server = get_local_addr(transport) -        self.client = get_remote_addr(transport) -        self.scheme = "wss" if is_ssl(transport) else "ws" - -        if self.logger.isEnabledFor(TRACE_LOG_LEVEL): -            prefix = "%s:%d - " % self.client if self.client else "" -            self.logger.log(TRACE_LOG_LEVEL, "%sWebSocket connection made", prefix) - -        super().connection_made(transport) - -    def connection_lost(self, exc: Exception | None) -> None: -        self.connections.remove(self) - -        if self.logger.isEnabledFor(TRACE_LOG_LEVEL): -            prefix = "%s:%d - " % self.client if self.client else "" -            self.logger.log(TRACE_LOG_LEVEL, "%sWebSocket connection lost", prefix) - -        self.lost_connection_before_handshake = not self.handshake_completed_event.is_set() -        self.handshake_completed_event.set() -        super().connection_lost(exc) -        if exc is None: -            self.transport.close() - -    def shutdown(self) -> None: -        self.ws_server.closing = True -        if self.handshake_completed_event.is_set(): -            self.fail_connection(1012) -        else: -            self.send_500_response() -        self.transport.close() - -    def on_task_complete(self, task: asyncio.Task) -> None: -        self.tasks.discard(task) - -    async def process_request(self, path: str, headers: Headers) -> HTTPResponse | None: -        """ -        This hook is called to determine if the websocket should return -        an HTTP response and close. - -        Our behavior here is to start the ASGI application, and then wait -        for either `accept` or `close` in order to determine if we should -        close the connection. -        """ -        path_portion, _, query_string = path.partition("?") - -        websockets.legacy.handshake.check_request(headers) - -        subprotocols = [] -        for header in headers.get_all("Sec-WebSocket-Protocol"): -            subprotocols.extend([token.strip() for token in header.split(",")]) - -        asgi_headers = [ -            (name.encode("ascii"), value.encode("ascii", errors="surrogateescape")) -            for name, value in headers.raw_items() -        ] -        path = unquote(path_portion) -        full_path = self.root_path + path -        full_raw_path = self.root_path.encode("ascii") + path_portion.encode("ascii") - -        self.scope = { -            "type": "websocket", -            "asgi": {"version": self.config.asgi_version, "spec_version": "2.4"}, -            "http_version": "1.1", -            "scheme": self.scheme, -            "server": self.server, -            "client": self.client, -            "root_path": self.root_path, -            "path": full_path, -            "raw_path": full_raw_path, -            "query_string": query_string.encode("ascii"), -            "headers": asgi_headers, -            "subprotocols": subprotocols, -            "state": self.app_state.copy(), -            "extensions": {"websocket.http.response": {}}, -        } -        task = self.loop.create_task(self.run_asgi()) -        task.add_done_callback(self.on_task_complete) -        self.tasks.add(task) -        await self.handshake_started_event.wait() -        return self.initial_response - -    def process_subprotocol( -        self, headers: Headers, available_subprotocols: Sequence[Subprotocol] | None -    ) -> Subprotocol | None: -        """ -        We override the standard 'process_subprotocol' behavior here so that -        we return whatever subprotocol is sent in the 'accept' message. -        """ -        return self.accepted_subprotocol - -    def send_500_response(self) -> None: -        msg = b"Internal Server Error" -        content = [ -            b"HTTP/1.1 500 Internal Server Error\r\n" b"content-type: text/plain; charset=utf-8\r\n", -            b"content-length: " + str(len(msg)).encode("ascii") + b"\r\n", -            b"connection: close\r\n", -            b"\r\n", -            msg, -        ] -        self.transport.write(b"".join(content)) -        # Allow handler task to terminate cleanly, as websockets doesn't cancel it by -        # itself (see https://github.com/encode/uvicorn/issues/920) -        self.handshake_started_event.set() - -    async def ws_handler(  # type: ignore[override] -        self, protocol: WebSocketServerProtocol, path: str -    ) -> Any: -        """ -        This is the main handler function for the 'websockets' implementation -        to call into. We just wait for close then return, and instead allow -        'send' and 'receive' events to drive the flow. -        """ -        self.handshake_completed_event.set() -        await self.wait_closed() - -    async def run_asgi(self) -> None: -        """ -        Wrapper around the ASGI callable, handling exceptions and unexpected -        termination states. -        """ -        try: -            result = await self.app(self.scope, self.asgi_receive, self.asgi_send) -        except ClientDisconnected: -            self.closed_event.set() -            self.transport.close() -        except BaseException as exc: -            self.closed_event.set() -            msg = "Exception in ASGI application\n" -            self.logger.error(msg, exc_info=exc) -            if not self.handshake_started_event.is_set(): -                self.send_500_response() -            else: -                await self.handshake_completed_event.wait() -            self.transport.close() -        else: -            self.closed_event.set() -            if not self.handshake_started_event.is_set(): -                msg = "ASGI callable returned without sending handshake." -                self.logger.error(msg) -                self.send_500_response() -                self.transport.close() -            elif result is not None: -                msg = "ASGI callable should return None, but returned '%s'." -                self.logger.error(msg, result) -                await self.handshake_completed_event.wait() -                self.transport.close() - -    async def asgi_send(self, message: ASGISendEvent) -> None: -        message_type = message["type"] - -        if not self.handshake_started_event.is_set(): -            if message_type == "websocket.accept": -                message = cast("WebSocketAcceptEvent", message) -                self.logger.info( -                    '%s - "WebSocket %s" [accepted]', -                    self.scope["client"], -                    get_path_with_query_string(self.scope), -                ) -                self.initial_response = None -                self.accepted_subprotocol = cast(Optional[Subprotocol], message.get("subprotocol")) -                if "headers" in message: -                    self.extra_headers.extend( -                        # ASGI spec requires bytes -                        # But for compatibility we need to convert it to strings -                        (name.decode("latin-1"), value.decode("latin-1")) -                        for name, value in message["headers"] -                    ) -                self.handshake_started_event.set() - -            elif message_type == "websocket.close": -                message = cast("WebSocketCloseEvent", message) -                self.logger.info( -                    '%s - "WebSocket %s" 403', -                    self.scope["client"], -                    get_path_with_query_string(self.scope), -                ) -                self.initial_response = (http.HTTPStatus.FORBIDDEN, [], b"") -                self.handshake_started_event.set() -                self.closed_event.set() - -            elif message_type == "websocket.http.response.start": -                message = cast("WebSocketResponseStartEvent", message) -                self.logger.info( -                    '%s - "WebSocket %s" %d', -                    self.scope["client"], -                    get_path_with_query_string(self.scope), -                    message["status"], -                ) -                # websockets requires the status to be an enum. look it up. -                status = http.HTTPStatus(message["status"]) -                headers = [ -                    (name.decode("latin-1"), value.decode("latin-1")) for name, value in message.get("headers", []) -                ] -                self.initial_response = (status, headers, b"") -                self.handshake_started_event.set() - -            else: -                msg = ( -                    "Expected ASGI message 'websocket.accept', 'websocket.close', " -                    "or 'websocket.http.response.start' but got '%s'." -                ) -                raise RuntimeError(msg % message_type) - -        elif not self.closed_event.is_set() and self.initial_response is None: -            await self.handshake_completed_event.wait() - -            try: -                if message_type == "websocket.send": -                    message = cast("WebSocketSendEvent", message) -                    bytes_data = message.get("bytes") -                    text_data = message.get("text") -                    data = text_data if bytes_data is None else bytes_data -                    await self.send(data)  # type: ignore[arg-type] - -                elif message_type == "websocket.close": -                    message = cast("WebSocketCloseEvent", message) -                    code = message.get("code", 1000) -                    reason = message.get("reason", "") or "" -                    await self.close(code, reason) -                    self.closed_event.set() - -                else: -                    msg = "Expected ASGI message 'websocket.send' or 'websocket.close'," " but got '%s'." -                    raise RuntimeError(msg % message_type) -            except ConnectionClosed as exc: -                raise ClientDisconnected from exc - -        elif self.initial_response is not None: -            if message_type == "websocket.http.response.body": -                message = cast("WebSocketResponseBodyEvent", message) -                body = self.initial_response[2] + message["body"] -                self.initial_response = self.initial_response[:2] + (body,) -                if not message.get("more_body", False): -                    self.closed_event.set() -            else: -                msg = "Expected ASGI message 'websocket.http.response.body' " "but got '%s'." -                raise RuntimeError(msg % message_type) - -        else: -            msg = "Unexpected ASGI message '%s', after sending 'websocket.close' " "or response already completed." -            raise RuntimeError(msg % message_type) - -    async def asgi_receive( -        self, -    ) -> WebSocketDisconnectEvent | WebSocketConnectEvent | WebSocketReceiveEvent: -        if not self.connect_sent: -            self.connect_sent = True -            return {"type": "websocket.connect"} - -        await self.handshake_completed_event.wait() - -        if self.lost_connection_before_handshake: -            # If the handshake failed or the app closed before handshake completion, -            # use 1006 Abnormal Closure. -            return {"type": "websocket.disconnect", "code": 1006} - -        if self.closed_event.is_set(): -            return {"type": "websocket.disconnect", "code": 1005} - -        try: -            data = await self.recv() -        except ConnectionClosed as exc: -            self.closed_event.set() -            if self.ws_server.closing: -                return {"type": "websocket.disconnect", "code": 1012} -            return {"type": "websocket.disconnect", "code": exc.code} - -        if isinstance(data, str): -            return {"type": "websocket.receive", "text": data} -        return {"type": "websocket.receive", "bytes": data} diff --git a/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/wsproto_impl.py b/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/wsproto_impl.py deleted file mode 100644 index c926252..0000000 --- a/venv/lib/python3.11/site-packages/uvicorn/protocols/websockets/wsproto_impl.py +++ /dev/null @@ -1,377 +0,0 @@ -from __future__ import annotations - -import asyncio -import logging -import typing -from typing import Literal -from urllib.parse import unquote - -import wsproto -from wsproto import ConnectionType, events -from wsproto.connection import ConnectionState -from wsproto.extensions import Extension, PerMessageDeflate -from wsproto.utilities import LocalProtocolError, RemoteProtocolError - -from uvicorn._types import ( -    ASGISendEvent, -    WebSocketAcceptEvent, -    WebSocketCloseEvent, -    WebSocketEvent, -    WebSocketResponseBodyEvent, -    WebSocketResponseStartEvent, -    WebSocketScope, -    WebSocketSendEvent, -) -from uvicorn.config import Config -from uvicorn.logging import TRACE_LOG_LEVEL -from uvicorn.protocols.utils import ( -    ClientDisconnected, -    get_local_addr, -    get_path_with_query_string, -    get_remote_addr, -    is_ssl, -) -from uvicorn.server import ServerState - - -class WSProtocol(asyncio.Protocol): -    def __init__( -        self, -        config: Config, -        server_state: ServerState, -        app_state: dict[str, typing.Any], -        _loop: asyncio.AbstractEventLoop | None = None, -    ) -> None: -        if not config.loaded: -            config.load() - -        self.config = config -        self.app = config.loaded_app -        self.loop = _loop or asyncio.get_event_loop() -        self.logger = logging.getLogger("uvicorn.error") -        self.root_path = config.root_path -        self.app_state = app_state - -        # Shared server state -        self.connections = server_state.connections -        self.tasks = server_state.tasks -        self.default_headers = server_state.default_headers - -        # Connection state -        self.transport: asyncio.Transport = None  # type: ignore[assignment] -        self.server: tuple[str, int] | None = None -        self.client: tuple[str, int] | None = None -        self.scheme: Literal["wss", "ws"] = None  # type: ignore[assignment] - -        # WebSocket state -        self.queue: asyncio.Queue[WebSocketEvent] = asyncio.Queue() -        self.handshake_complete = False -        self.close_sent = False - -        # Rejection state -        self.response_started = False - -        self.conn = wsproto.WSConnection(connection_type=ConnectionType.SERVER) - -        self.read_paused = False -        self.writable = asyncio.Event() -        self.writable.set() - -        # Buffers -        self.bytes = b"" -        self.text = "" - -    # Protocol interface - -    def connection_made(  # type: ignore[override] -        self, transport: asyncio.Transport -    ) -> None: -        self.connections.add(self) -        self.transport = transport -        self.server = get_local_addr(transport) -        self.client = get_remote_addr(transport) -        self.scheme = "wss" if is_ssl(transport) else "ws" - -        if self.logger.level <= TRACE_LOG_LEVEL: -            prefix = "%s:%d - " % self.client if self.client else "" -            self.logger.log(TRACE_LOG_LEVEL, "%sWebSocket connection made", prefix) - -    def connection_lost(self, exc: Exception | None) -> None: -        code = 1005 if self.handshake_complete else 1006 -        self.queue.put_nowait({"type": "websocket.disconnect", "code": code}) -        self.connections.remove(self) - -        if self.logger.level <= TRACE_LOG_LEVEL: -            prefix = "%s:%d - " % self.client if self.client else "" -            self.logger.log(TRACE_LOG_LEVEL, "%sWebSocket connection lost", prefix) - -        self.handshake_complete = True -        if exc is None: -            self.transport.close() - -    def eof_received(self) -> None: -        pass - -    def data_received(self, data: bytes) -> None: -        try: -            self.conn.receive_data(data) -        except RemoteProtocolError as err: -            # TODO: Remove `type: ignore` when wsproto fixes the type annotation. -            self.transport.write(self.conn.send(err.event_hint))  # type: ignore[arg-type]  # noqa: E501 -            self.transport.close() -        else: -            self.handle_events() - -    def handle_events(self) -> None: -        for event in self.conn.events(): -            if isinstance(event, events.Request): -                self.handle_connect(event) -            elif isinstance(event, events.TextMessage): -                self.handle_text(event) -            elif isinstance(event, events.BytesMessage): -                self.handle_bytes(event) -            elif isinstance(event, events.CloseConnection): -                self.handle_close(event) -            elif isinstance(event, events.Ping): -                self.handle_ping(event) - -    def pause_writing(self) -> None: -        """ -        Called by the transport when the write buffer exceeds the high water mark. -        """ -        self.writable.clear() - -    def resume_writing(self) -> None: -        """ -        Called by the transport when the write buffer drops below the low water mark. -        """ -        self.writable.set() - -    def shutdown(self) -> None: -        if self.handshake_complete: -            self.queue.put_nowait({"type": "websocket.disconnect", "code": 1012}) -            output = self.conn.send(wsproto.events.CloseConnection(code=1012)) -            self.transport.write(output) -        else: -            self.send_500_response() -        self.transport.close() - -    def on_task_complete(self, task: asyncio.Task) -> None: -        self.tasks.discard(task) - -    # Event handlers - -    def handle_connect(self, event: events.Request) -> None: -        headers = [(b"host", event.host.encode())] -        headers += [(key.lower(), value) for key, value in event.extra_headers] -        raw_path, _, query_string = event.target.partition("?") -        path = unquote(raw_path) -        full_path = self.root_path + path -        full_raw_path = self.root_path.encode("ascii") + raw_path.encode("ascii") -        self.scope: WebSocketScope = { -            "type": "websocket", -            "asgi": {"version": self.config.asgi_version, "spec_version": "2.4"}, -            "http_version": "1.1", -            "scheme": self.scheme, -            "server": self.server, -            "client": self.client, -            "root_path": self.root_path, -            "path": full_path, -            "raw_path": full_raw_path, -            "query_string": query_string.encode("ascii"), -            "headers": headers, -            "subprotocols": event.subprotocols, -            "state": self.app_state.copy(), -            "extensions": {"websocket.http.response": {}}, -        } -        self.queue.put_nowait({"type": "websocket.connect"}) -        task = self.loop.create_task(self.run_asgi()) -        task.add_done_callback(self.on_task_complete) -        self.tasks.add(task) - -    def handle_text(self, event: events.TextMessage) -> None: -        self.text += event.data -        if event.message_finished: -            self.queue.put_nowait({"type": "websocket.receive", "text": self.text}) -            self.text = "" -            if not self.read_paused: -                self.read_paused = True -                self.transport.pause_reading() - -    def handle_bytes(self, event: events.BytesMessage) -> None: -        self.bytes += event.data -        # todo: we may want to guard the size of self.bytes and self.text -        if event.message_finished: -            self.queue.put_nowait({"type": "websocket.receive", "bytes": self.bytes}) -            self.bytes = b"" -            if not self.read_paused: -                self.read_paused = True -                self.transport.pause_reading() - -    def handle_close(self, event: events.CloseConnection) -> None: -        if self.conn.state == ConnectionState.REMOTE_CLOSING: -            self.transport.write(self.conn.send(event.response())) -        self.queue.put_nowait({"type": "websocket.disconnect", "code": event.code}) -        self.transport.close() - -    def handle_ping(self, event: events.Ping) -> None: -        self.transport.write(self.conn.send(event.response())) - -    def send_500_response(self) -> None: -        if self.response_started or self.handshake_complete: -            return  # we cannot send responses anymore -        headers = [ -            (b"content-type", b"text/plain; charset=utf-8"), -            (b"connection", b"close"), -        ] -        output = self.conn.send(wsproto.events.RejectConnection(status_code=500, headers=headers, has_body=True)) -        output += self.conn.send(wsproto.events.RejectData(data=b"Internal Server Error")) -        self.transport.write(output) - -    async def run_asgi(self) -> None: -        try: -            result = await self.app(self.scope, self.receive, self.send) -        except ClientDisconnected: -            self.transport.close() -        except BaseException: -            self.logger.exception("Exception in ASGI application\n") -            self.send_500_response() -            self.transport.close() -        else: -            if not self.handshake_complete: -                msg = "ASGI callable returned without completing handshake." -                self.logger.error(msg) -                self.send_500_response() -                self.transport.close() -            elif result is not None: -                msg = "ASGI callable should return None, but returned '%s'." -                self.logger.error(msg, result) -                self.transport.close() - -    async def send(self, message: ASGISendEvent) -> None: -        await self.writable.wait() - -        message_type = message["type"] - -        if not self.handshake_complete: -            if message_type == "websocket.accept": -                message = typing.cast(WebSocketAcceptEvent, message) -                self.logger.info( -                    '%s - "WebSocket %s" [accepted]', -                    self.scope["client"], -                    get_path_with_query_string(self.scope), -                ) -                subprotocol = message.get("subprotocol") -                extra_headers = self.default_headers + list(message.get("headers", [])) -                extensions: list[Extension] = [] -                if self.config.ws_per_message_deflate: -                    extensions.append(PerMessageDeflate()) -                if not self.transport.is_closing(): -                    self.handshake_complete = True -                    output = self.conn.send( -                        wsproto.events.AcceptConnection( -                            subprotocol=subprotocol, -                            extensions=extensions, -                            extra_headers=extra_headers, -                        ) -                    ) -                    self.transport.write(output) - -            elif message_type == "websocket.close": -                self.queue.put_nowait({"type": "websocket.disconnect", "code": 1006}) -                self.logger.info( -                    '%s - "WebSocket %s" 403', -                    self.scope["client"], -                    get_path_with_query_string(self.scope), -                ) -                self.handshake_complete = True -                self.close_sent = True -                event = events.RejectConnection(status_code=403, headers=[]) -                output = self.conn.send(event) -                self.transport.write(output) -                self.transport.close() - -            elif message_type == "websocket.http.response.start": -                message = typing.cast(WebSocketResponseStartEvent, message) -                # ensure status code is in the valid range -                if not (100 <= message["status"] < 600): -                    msg = "Invalid HTTP status code '%d' in response." -                    raise RuntimeError(msg % message["status"]) -                self.logger.info( -                    '%s - "WebSocket %s" %d', -                    self.scope["client"], -                    get_path_with_query_string(self.scope), -                    message["status"], -                ) -                self.handshake_complete = True -                event = events.RejectConnection( -                    status_code=message["status"], -                    headers=list(message["headers"]), -                    has_body=True, -                ) -                output = self.conn.send(event) -                self.transport.write(output) -                self.response_started = True - -            else: -                msg = ( -                    "Expected ASGI message 'websocket.accept', 'websocket.close' " -                    "or 'websocket.http.response.start' " -                    "but got '%s'." -                ) -                raise RuntimeError(msg % message_type) - -        elif not self.close_sent and not self.response_started: -            try: -                if message_type == "websocket.send": -                    message = typing.cast(WebSocketSendEvent, message) -                    bytes_data = message.get("bytes") -                    text_data = message.get("text") -                    data = text_data if bytes_data is None else bytes_data -                    output = self.conn.send(wsproto.events.Message(data=data))  # type: ignore -                    if not self.transport.is_closing(): -                        self.transport.write(output) - -                elif message_type == "websocket.close": -                    message = typing.cast(WebSocketCloseEvent, message) -                    self.close_sent = True -                    code = message.get("code", 1000) -                    reason = message.get("reason", "") or "" -                    self.queue.put_nowait({"type": "websocket.disconnect", "code": code}) -                    output = self.conn.send(wsproto.events.CloseConnection(code=code, reason=reason)) -                    if not self.transport.is_closing(): -                        self.transport.write(output) -                        self.transport.close() - -                else: -                    msg = "Expected ASGI message 'websocket.send' or 'websocket.close'," " but got '%s'." -                    raise RuntimeError(msg % message_type) -            except LocalProtocolError as exc: -                raise ClientDisconnected from exc -        elif self.response_started: -            if message_type == "websocket.http.response.body": -                message = typing.cast("WebSocketResponseBodyEvent", message) -                body_finished = not message.get("more_body", False) -                reject_data = events.RejectData(data=message["body"], body_finished=body_finished) -                output = self.conn.send(reject_data) -                self.transport.write(output) - -                if body_finished: -                    self.queue.put_nowait({"type": "websocket.disconnect", "code": 1006}) -                    self.close_sent = True -                    self.transport.close() - -            else: -                msg = "Expected ASGI message 'websocket.http.response.body' " "but got '%s'." -                raise RuntimeError(msg % message_type) - -        else: -            msg = "Unexpected ASGI message '%s', after sending 'websocket.close'." -            raise RuntimeError(msg % message_type) - -    async def receive(self) -> WebSocketEvent: -        message = await self.queue.get() -        if self.read_paused and self.queue.empty(): -            self.read_paused = False -            self.transport.resume_reading() -        return message  | 
