summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/h11/tests
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
commit6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
treeb1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/h11/tests
parent4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/h11/tests')
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/__init__.py0
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/__pycache__/__init__.cpython-311.pycbin0 -> 193 bytes
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/__pycache__/helpers.cpython-311.pycbin0 -> 5113 bytes
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_against_stdlib_http.cpython-311.pycbin0 -> 8277 bytes
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_connection.cpython-311.pycbin0 -> 68029 bytes
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_events.cpython-311.pycbin0 -> 6829 bytes
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_headers.cpython-311.pycbin0 -> 9606 bytes
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_helpers.cpython-311.pycbin0 -> 1529 bytes
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_io.cpython-311.pycbin0 -> 25542 bytes
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_receivebuffer.cpython-311.pycbin0 -> 4608 bytes
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_state.cpython-311.pycbin0 -> 15210 bytes
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_util.cpython-311.pycbin0 -> 7816 bytes
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/data/test-file1
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/helpers.py101
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/test_against_stdlib_http.py115
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/test_connection.py1122
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/test_events.py150
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/test_headers.py157
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/test_helpers.py32
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/test_io.py572
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/test_receivebuffer.py135
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/test_state.py271
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/test_util.py112
23 files changed, 2768 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/h11/tests/__init__.py b/venv/lib/python3.11/site-packages/h11/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/__init__.py
diff --git a/venv/lib/python3.11/site-packages/h11/tests/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/__init__.cpython-311.pyc
new file mode 100644
index 0000000..b45464e
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/__init__.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/h11/tests/__pycache__/helpers.cpython-311.pyc b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/helpers.cpython-311.pyc
new file mode 100644
index 0000000..d8e415d
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/helpers.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_against_stdlib_http.cpython-311.pyc b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_against_stdlib_http.cpython-311.pyc
new file mode 100644
index 0000000..564b601
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_against_stdlib_http.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_connection.cpython-311.pyc b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_connection.cpython-311.pyc
new file mode 100644
index 0000000..c9f269e
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_connection.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_events.cpython-311.pyc b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_events.cpython-311.pyc
new file mode 100644
index 0000000..90bb491
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_events.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_headers.cpython-311.pyc b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_headers.cpython-311.pyc
new file mode 100644
index 0000000..5c15287
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_headers.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_helpers.cpython-311.pyc b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_helpers.cpython-311.pyc
new file mode 100644
index 0000000..dcefc5a
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_helpers.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_io.cpython-311.pyc b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_io.cpython-311.pyc
new file mode 100644
index 0000000..9abf567
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_io.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_receivebuffer.cpython-311.pyc b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_receivebuffer.cpython-311.pyc
new file mode 100644
index 0000000..8f1f6db
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_receivebuffer.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_state.cpython-311.pyc b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_state.cpython-311.pyc
new file mode 100644
index 0000000..1115678
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_state.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_util.cpython-311.pyc b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_util.cpython-311.pyc
new file mode 100644
index 0000000..b3bfd20
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/__pycache__/test_util.cpython-311.pyc
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/h11/tests/data/test-file b/venv/lib/python3.11/site-packages/h11/tests/data/test-file
new file mode 100644
index 0000000..d0be0a6
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/data/test-file
@@ -0,0 +1 @@
+92b12bc045050b55b848d37167a1a63947c364579889ce1d39788e45e9fac9e5
diff --git a/venv/lib/python3.11/site-packages/h11/tests/helpers.py b/venv/lib/python3.11/site-packages/h11/tests/helpers.py
new file mode 100644
index 0000000..571be44
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/helpers.py
@@ -0,0 +1,101 @@
+from typing import cast, List, Type, Union, ValuesView
+
+from .._connection import Connection, NEED_DATA, PAUSED
+from .._events import (
+ ConnectionClosed,
+ Data,
+ EndOfMessage,
+ Event,
+ InformationalResponse,
+ Request,
+ Response,
+)
+from .._state import CLIENT, CLOSED, DONE, MUST_CLOSE, SERVER
+from .._util import Sentinel
+
+try:
+ from typing import Literal
+except ImportError:
+ from typing_extensions import Literal # type: ignore
+
+
+def get_all_events(conn: Connection) -> List[Event]:
+ got_events = []
+ while True:
+ event = conn.next_event()
+ if event in (NEED_DATA, PAUSED):
+ break
+ event = cast(Event, event)
+ got_events.append(event)
+ if type(event) is ConnectionClosed:
+ break
+ return got_events
+
+
+def receive_and_get(conn: Connection, data: bytes) -> List[Event]:
+ conn.receive_data(data)
+ return get_all_events(conn)
+
+
+# Merges adjacent Data events, converts payloads to bytestrings, and removes
+# chunk boundaries.
+def normalize_data_events(in_events: List[Event]) -> List[Event]:
+ out_events: List[Event] = []
+ for event in in_events:
+ if type(event) is Data:
+ event = Data(data=bytes(event.data), chunk_start=False, chunk_end=False)
+ if out_events and type(out_events[-1]) is type(event) is Data:
+ out_events[-1] = Data(
+ data=out_events[-1].data + event.data,
+ chunk_start=out_events[-1].chunk_start,
+ chunk_end=out_events[-1].chunk_end,
+ )
+ else:
+ out_events.append(event)
+ return out_events
+
+
+# Given that we want to write tests that push some events through a Connection
+# and check that its state updates appropriately... we might as make a habit
+# of pushing them through two Connections with a fake network link in
+# between.
+class ConnectionPair:
+ def __init__(self) -> None:
+ self.conn = {CLIENT: Connection(CLIENT), SERVER: Connection(SERVER)}
+ self.other = {CLIENT: SERVER, SERVER: CLIENT}
+
+ @property
+ def conns(self) -> ValuesView[Connection]:
+ return self.conn.values()
+
+ # expect="match" if expect=send_events; expect=[...] to say what expected
+ def send(
+ self,
+ role: Type[Sentinel],
+ send_events: Union[List[Event], Event],
+ expect: Union[List[Event], Event, Literal["match"]] = "match",
+ ) -> bytes:
+ if not isinstance(send_events, list):
+ send_events = [send_events]
+ data = b""
+ closed = False
+ for send_event in send_events:
+ new_data = self.conn[role].send(send_event)
+ if new_data is None:
+ closed = True
+ else:
+ data += new_data
+ # send uses b"" to mean b"", and None to mean closed
+ # receive uses b"" to mean closed, and None to mean "try again"
+ # so we have to translate between the two conventions
+ if data:
+ self.conn[self.other[role]].receive_data(data)
+ if closed:
+ self.conn[self.other[role]].receive_data(b"")
+ got_events = get_all_events(self.conn[self.other[role]])
+ if expect == "match":
+ expect = send_events
+ if not isinstance(expect, list):
+ expect = [expect]
+ assert got_events == expect
+ return data
diff --git a/venv/lib/python3.11/site-packages/h11/tests/test_against_stdlib_http.py b/venv/lib/python3.11/site-packages/h11/tests/test_against_stdlib_http.py
new file mode 100644
index 0000000..d2ee131
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/test_against_stdlib_http.py
@@ -0,0 +1,115 @@
+import json
+import os.path
+import socket
+import socketserver
+import threading
+from contextlib import closing, contextmanager
+from http.server import SimpleHTTPRequestHandler
+from typing import Callable, Generator
+from urllib.request import urlopen
+
+import h11
+
+
+@contextmanager
+def socket_server(
+ handler: Callable[..., socketserver.BaseRequestHandler]
+) -> Generator[socketserver.TCPServer, None, None]:
+ httpd = socketserver.TCPServer(("127.0.0.1", 0), handler)
+ thread = threading.Thread(
+ target=httpd.serve_forever, kwargs={"poll_interval": 0.01}
+ )
+ thread.daemon = True
+ try:
+ thread.start()
+ yield httpd
+ finally:
+ httpd.shutdown()
+
+
+test_file_path = os.path.join(os.path.dirname(__file__), "data/test-file")
+with open(test_file_path, "rb") as f:
+ test_file_data = f.read()
+
+
+class SingleMindedRequestHandler(SimpleHTTPRequestHandler):
+ def translate_path(self, path: str) -> str:
+ return test_file_path
+
+
+def test_h11_as_client() -> None:
+ with socket_server(SingleMindedRequestHandler) as httpd:
+ with closing(socket.create_connection(httpd.server_address)) as s:
+ c = h11.Connection(h11.CLIENT)
+
+ s.sendall(
+ c.send( # type: ignore[arg-type]
+ h11.Request(
+ method="GET", target="/foo", headers=[("Host", "localhost")]
+ )
+ )
+ )
+ s.sendall(c.send(h11.EndOfMessage())) # type: ignore[arg-type]
+
+ data = bytearray()
+ while True:
+ event = c.next_event()
+ print(event)
+ if event is h11.NEED_DATA:
+ # Use a small read buffer to make things more challenging
+ # and exercise more paths :-)
+ c.receive_data(s.recv(10))
+ continue
+ if type(event) is h11.Response:
+ assert event.status_code == 200
+ if type(event) is h11.Data:
+ data += event.data
+ if type(event) is h11.EndOfMessage:
+ break
+ assert bytes(data) == test_file_data
+
+
+class H11RequestHandler(socketserver.BaseRequestHandler):
+ def handle(self) -> None:
+ with closing(self.request) as s:
+ c = h11.Connection(h11.SERVER)
+ request = None
+ while True:
+ event = c.next_event()
+ if event is h11.NEED_DATA:
+ # Use a small read buffer to make things more challenging
+ # and exercise more paths :-)
+ c.receive_data(s.recv(10))
+ continue
+ if type(event) is h11.Request:
+ request = event
+ if type(event) is h11.EndOfMessage:
+ break
+ assert request is not None
+ info = json.dumps(
+ {
+ "method": request.method.decode("ascii"),
+ "target": request.target.decode("ascii"),
+ "headers": {
+ name.decode("ascii"): value.decode("ascii")
+ for (name, value) in request.headers
+ },
+ }
+ )
+ s.sendall(c.send(h11.Response(status_code=200, headers=[]))) # type: ignore[arg-type]
+ s.sendall(c.send(h11.Data(data=info.encode("ascii"))))
+ s.sendall(c.send(h11.EndOfMessage()))
+
+
+def test_h11_as_server() -> None:
+ with socket_server(H11RequestHandler) as httpd:
+ host, port = httpd.server_address
+ url = "http://{}:{}/some-path".format(host, port)
+ with closing(urlopen(url)) as f:
+ assert f.getcode() == 200
+ data = f.read()
+ info = json.loads(data.decode("ascii"))
+ print(info)
+ assert info["method"] == "GET"
+ assert info["target"] == "/some-path"
+ assert "urllib" in info["headers"]["user-agent"]
diff --git a/venv/lib/python3.11/site-packages/h11/tests/test_connection.py b/venv/lib/python3.11/site-packages/h11/tests/test_connection.py
new file mode 100644
index 0000000..73a27b9
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/test_connection.py
@@ -0,0 +1,1122 @@
+from typing import Any, cast, Dict, List, Optional, Tuple, Type
+
+import pytest
+
+from .._connection import _body_framing, _keep_alive, Connection, NEED_DATA, PAUSED
+from .._events import (
+ ConnectionClosed,
+ Data,
+ EndOfMessage,
+ Event,
+ InformationalResponse,
+ Request,
+ Response,
+)
+from .._state import (
+ CLIENT,
+ CLOSED,
+ DONE,
+ ERROR,
+ IDLE,
+ MIGHT_SWITCH_PROTOCOL,
+ MUST_CLOSE,
+ SEND_BODY,
+ SEND_RESPONSE,
+ SERVER,
+ SWITCHED_PROTOCOL,
+)
+from .._util import LocalProtocolError, RemoteProtocolError, Sentinel
+from .helpers import ConnectionPair, get_all_events, receive_and_get
+
+
+def test__keep_alive() -> None:
+ assert _keep_alive(
+ Request(method="GET", target="/", headers=[("Host", "Example.com")])
+ )
+ assert not _keep_alive(
+ Request(
+ method="GET",
+ target="/",
+ headers=[("Host", "Example.com"), ("Connection", "close")],
+ )
+ )
+ assert not _keep_alive(
+ Request(
+ method="GET",
+ target="/",
+ headers=[("Host", "Example.com"), ("Connection", "a, b, cLOse, foo")],
+ )
+ )
+ assert not _keep_alive(
+ Request(method="GET", target="/", headers=[], http_version="1.0") # type: ignore[arg-type]
+ )
+
+ assert _keep_alive(Response(status_code=200, headers=[])) # type: ignore[arg-type]
+ assert not _keep_alive(Response(status_code=200, headers=[("Connection", "close")]))
+ assert not _keep_alive(
+ Response(status_code=200, headers=[("Connection", "a, b, cLOse, foo")])
+ )
+ assert not _keep_alive(Response(status_code=200, headers=[], http_version="1.0")) # type: ignore[arg-type]
+
+
+def test__body_framing() -> None:
+ def headers(cl: Optional[int], te: bool) -> List[Tuple[str, str]]:
+ headers = []
+ if cl is not None:
+ headers.append(("Content-Length", str(cl)))
+ if te:
+ headers.append(("Transfer-Encoding", "chunked"))
+ return headers
+
+ def resp(
+ status_code: int = 200, cl: Optional[int] = None, te: bool = False
+ ) -> Response:
+ return Response(status_code=status_code, headers=headers(cl, te))
+
+ def req(cl: Optional[int] = None, te: bool = False) -> Request:
+ h = headers(cl, te)
+ h += [("Host", "example.com")]
+ return Request(method="GET", target="/", headers=h)
+
+ # Special cases where the headers are ignored:
+ for kwargs in [{}, {"cl": 100}, {"te": True}, {"cl": 100, "te": True}]:
+ kwargs = cast(Dict[str, Any], kwargs)
+ for meth, r in [
+ (b"HEAD", resp(**kwargs)),
+ (b"GET", resp(status_code=204, **kwargs)),
+ (b"GET", resp(status_code=304, **kwargs)),
+ ]:
+ assert _body_framing(meth, r) == ("content-length", (0,))
+
+ # Transfer-encoding
+ for kwargs in [{"te": True}, {"cl": 100, "te": True}]:
+ kwargs = cast(Dict[str, Any], kwargs)
+ for meth, r in [(None, req(**kwargs)), (b"GET", resp(**kwargs))]: # type: ignore
+ assert _body_framing(meth, r) == ("chunked", ())
+
+ # Content-Length
+ for meth, r in [(None, req(cl=100)), (b"GET", resp(cl=100))]: # type: ignore
+ assert _body_framing(meth, r) == ("content-length", (100,))
+
+ # No headers
+ assert _body_framing(None, req()) == ("content-length", (0,)) # type: ignore
+ assert _body_framing(b"GET", resp()) == ("http/1.0", ())
+
+
+def test_Connection_basics_and_content_length() -> None:
+ with pytest.raises(ValueError):
+ Connection("CLIENT") # type: ignore
+
+ p = ConnectionPair()
+ assert p.conn[CLIENT].our_role is CLIENT
+ assert p.conn[CLIENT].their_role is SERVER
+ assert p.conn[SERVER].our_role is SERVER
+ assert p.conn[SERVER].their_role is CLIENT
+
+ data = p.send(
+ CLIENT,
+ Request(
+ method="GET",
+ target="/",
+ headers=[("Host", "example.com"), ("Content-Length", "10")],
+ ),
+ )
+ assert data == (
+ b"GET / HTTP/1.1\r\n" b"Host: example.com\r\n" b"Content-Length: 10\r\n\r\n"
+ )
+
+ for conn in p.conns:
+ assert conn.states == {CLIENT: SEND_BODY, SERVER: SEND_RESPONSE}
+ assert p.conn[CLIENT].our_state is SEND_BODY
+ assert p.conn[CLIENT].their_state is SEND_RESPONSE
+ assert p.conn[SERVER].our_state is SEND_RESPONSE
+ assert p.conn[SERVER].their_state is SEND_BODY
+
+ assert p.conn[CLIENT].their_http_version is None
+ assert p.conn[SERVER].their_http_version == b"1.1"
+
+ data = p.send(SERVER, InformationalResponse(status_code=100, headers=[])) # type: ignore[arg-type]
+ assert data == b"HTTP/1.1 100 \r\n\r\n"
+
+ data = p.send(SERVER, Response(status_code=200, headers=[("Content-Length", "11")]))
+ assert data == b"HTTP/1.1 200 \r\nContent-Length: 11\r\n\r\n"
+
+ for conn in p.conns:
+ assert conn.states == {CLIENT: SEND_BODY, SERVER: SEND_BODY}
+
+ assert p.conn[CLIENT].their_http_version == b"1.1"
+ assert p.conn[SERVER].their_http_version == b"1.1"
+
+ data = p.send(CLIENT, Data(data=b"12345"))
+ assert data == b"12345"
+ data = p.send(
+ CLIENT, Data(data=b"67890"), expect=[Data(data=b"67890"), EndOfMessage()]
+ )
+ assert data == b"67890"
+ data = p.send(CLIENT, EndOfMessage(), expect=[])
+ assert data == b""
+
+ for conn in p.conns:
+ assert conn.states == {CLIENT: DONE, SERVER: SEND_BODY}
+
+ data = p.send(SERVER, Data(data=b"1234567890"))
+ assert data == b"1234567890"
+ data = p.send(SERVER, Data(data=b"1"), expect=[Data(data=b"1"), EndOfMessage()])
+ assert data == b"1"
+ data = p.send(SERVER, EndOfMessage(), expect=[])
+ assert data == b""
+
+ for conn in p.conns:
+ assert conn.states == {CLIENT: DONE, SERVER: DONE}
+
+
+def test_chunked() -> None:
+ p = ConnectionPair()
+
+ p.send(
+ CLIENT,
+ Request(
+ method="GET",
+ target="/",
+ headers=[("Host", "example.com"), ("Transfer-Encoding", "chunked")],
+ ),
+ )
+ data = p.send(CLIENT, Data(data=b"1234567890", chunk_start=True, chunk_end=True))
+ assert data == b"a\r\n1234567890\r\n"
+ data = p.send(CLIENT, Data(data=b"abcde", chunk_start=True, chunk_end=True))
+ assert data == b"5\r\nabcde\r\n"
+ data = p.send(CLIENT, Data(data=b""), expect=[])
+ assert data == b""
+ data = p.send(CLIENT, EndOfMessage(headers=[("hello", "there")]))
+ assert data == b"0\r\nhello: there\r\n\r\n"
+
+ p.send(
+ SERVER, Response(status_code=200, headers=[("Transfer-Encoding", "chunked")])
+ )
+ p.send(SERVER, Data(data=b"54321", chunk_start=True, chunk_end=True))
+ p.send(SERVER, Data(data=b"12345", chunk_start=True, chunk_end=True))
+ p.send(SERVER, EndOfMessage())
+
+ for conn in p.conns:
+ assert conn.states == {CLIENT: DONE, SERVER: DONE}
+
+
+def test_chunk_boundaries() -> None:
+ conn = Connection(our_role=SERVER)
+
+ request = (
+ b"POST / HTTP/1.1\r\n"
+ b"Host: example.com\r\n"
+ b"Transfer-Encoding: chunked\r\n"
+ b"\r\n"
+ )
+ conn.receive_data(request)
+ assert conn.next_event() == Request(
+ method="POST",
+ target="/",
+ headers=[("Host", "example.com"), ("Transfer-Encoding", "chunked")],
+ )
+ assert conn.next_event() is NEED_DATA
+
+ conn.receive_data(b"5\r\nhello\r\n")
+ assert conn.next_event() == Data(data=b"hello", chunk_start=True, chunk_end=True)
+
+ conn.receive_data(b"5\r\nhel")
+ assert conn.next_event() == Data(data=b"hel", chunk_start=True, chunk_end=False)
+
+ conn.receive_data(b"l")
+ assert conn.next_event() == Data(data=b"l", chunk_start=False, chunk_end=False)
+
+ conn.receive_data(b"o\r\n")
+ assert conn.next_event() == Data(data=b"o", chunk_start=False, chunk_end=True)
+
+ conn.receive_data(b"5\r\nhello")
+ assert conn.next_event() == Data(data=b"hello", chunk_start=True, chunk_end=True)
+
+ conn.receive_data(b"\r\n")
+ assert conn.next_event() == NEED_DATA
+
+ conn.receive_data(b"0\r\n\r\n")
+ assert conn.next_event() == EndOfMessage()
+
+
+def test_client_talking_to_http10_server() -> None:
+ c = Connection(CLIENT)
+ c.send(Request(method="GET", target="/", headers=[("Host", "example.com")]))
+ c.send(EndOfMessage())
+ assert c.our_state is DONE
+ # No content-length, so Http10 framing for body
+ assert receive_and_get(c, b"HTTP/1.0 200 OK\r\n\r\n") == [
+ Response(status_code=200, headers=[], http_version="1.0", reason=b"OK") # type: ignore[arg-type]
+ ]
+ assert c.our_state is MUST_CLOSE
+ assert receive_and_get(c, b"12345") == [Data(data=b"12345")]
+ assert receive_and_get(c, b"67890") == [Data(data=b"67890")]
+ assert receive_and_get(c, b"") == [EndOfMessage(), ConnectionClosed()]
+ assert c.their_state is CLOSED
+
+
+def test_server_talking_to_http10_client() -> None:
+ c = Connection(SERVER)
+ # No content-length, so no body
+ # NB: no host header
+ assert receive_and_get(c, b"GET / HTTP/1.0\r\n\r\n") == [
+ Request(method="GET", target="/", headers=[], http_version="1.0"), # type: ignore[arg-type]
+ EndOfMessage(),
+ ]
+ assert c.their_state is MUST_CLOSE
+
+ # We automatically Connection: close back at them
+ assert (
+ c.send(Response(status_code=200, headers=[])) # type: ignore[arg-type]
+ == b"HTTP/1.1 200 \r\nConnection: close\r\n\r\n"
+ )
+
+ assert c.send(Data(data=b"12345")) == b"12345"
+ assert c.send(EndOfMessage()) == b""
+ assert c.our_state is MUST_CLOSE
+
+ # Check that it works if they do send Content-Length
+ c = Connection(SERVER)
+ # NB: no host header
+ assert receive_and_get(c, b"POST / HTTP/1.0\r\nContent-Length: 10\r\n\r\n1") == [
+ Request(
+ method="POST",
+ target="/",
+ headers=[("Content-Length", "10")],
+ http_version="1.0",
+ ),
+ Data(data=b"1"),
+ ]
+ assert receive_and_get(c, b"234567890") == [Data(data=b"234567890"), EndOfMessage()]
+ assert c.their_state is MUST_CLOSE
+ assert receive_and_get(c, b"") == [ConnectionClosed()]
+
+
+def test_automatic_transfer_encoding_in_response() -> None:
+ # Check that in responses, the user can specify either Transfer-Encoding:
+ # chunked or no framing at all, and in both cases we automatically select
+ # the right option depending on whether the peer speaks HTTP/1.0 or
+ # HTTP/1.1
+ for user_headers in [
+ [("Transfer-Encoding", "chunked")],
+ [],
+ # In fact, this even works if Content-Length is set,
+ # because if both are set then Transfer-Encoding wins
+ [("Transfer-Encoding", "chunked"), ("Content-Length", "100")],
+ ]:
+ user_headers = cast(List[Tuple[str, str]], user_headers)
+ p = ConnectionPair()
+ p.send(
+ CLIENT,
+ [
+ Request(method="GET", target="/", headers=[("Host", "example.com")]),
+ EndOfMessage(),
+ ],
+ )
+ # When speaking to HTTP/1.1 client, all of the above cases get
+ # normalized to Transfer-Encoding: chunked
+ p.send(
+ SERVER,
+ Response(status_code=200, headers=user_headers),
+ expect=Response(
+ status_code=200, headers=[("Transfer-Encoding", "chunked")]
+ ),
+ )
+
+ # When speaking to HTTP/1.0 client, all of the above cases get
+ # normalized to no-framing-headers
+ c = Connection(SERVER)
+ receive_and_get(c, b"GET / HTTP/1.0\r\n\r\n")
+ assert (
+ c.send(Response(status_code=200, headers=user_headers))
+ == b"HTTP/1.1 200 \r\nConnection: close\r\n\r\n"
+ )
+ assert c.send(Data(data=b"12345")) == b"12345"
+
+
+def test_automagic_connection_close_handling() -> None:
+ p = ConnectionPair()
+ # If the user explicitly sets Connection: close, then we notice and
+ # respect it
+ p.send(
+ CLIENT,
+ [
+ Request(
+ method="GET",
+ target="/",
+ headers=[("Host", "example.com"), ("Connection", "close")],
+ ),
+ EndOfMessage(),
+ ],
+ )
+ for conn in p.conns:
+ assert conn.states[CLIENT] is MUST_CLOSE
+ # And if the client sets it, the server automatically echoes it back
+ p.send(
+ SERVER,
+ # no header here...
+ [Response(status_code=204, headers=[]), EndOfMessage()], # type: ignore[arg-type]
+ # ...but oh look, it arrived anyway
+ expect=[
+ Response(status_code=204, headers=[("connection", "close")]),
+ EndOfMessage(),
+ ],
+ )
+ for conn in p.conns:
+ assert conn.states == {CLIENT: MUST_CLOSE, SERVER: MUST_CLOSE}
+
+
+def test_100_continue() -> None:
+ def setup() -> ConnectionPair:
+ p = ConnectionPair()
+ p.send(
+ CLIENT,
+ Request(
+ method="GET",
+ target="/",
+ headers=[
+ ("Host", "example.com"),
+ ("Content-Length", "100"),
+ ("Expect", "100-continue"),
+ ],
+ ),
+ )
+ for conn in p.conns:
+ assert conn.client_is_waiting_for_100_continue
+ assert not p.conn[CLIENT].they_are_waiting_for_100_continue
+ assert p.conn[SERVER].they_are_waiting_for_100_continue
+ return p
+
+ # Disabled by 100 Continue
+ p = setup()
+ p.send(SERVER, InformationalResponse(status_code=100, headers=[])) # type: ignore[arg-type]
+ for conn in p.conns:
+ assert not conn.client_is_waiting_for_100_continue
+ assert not conn.they_are_waiting_for_100_continue
+
+ # Disabled by a real response
+ p = setup()
+ p.send(
+ SERVER, Response(status_code=200, headers=[("Transfer-Encoding", "chunked")])
+ )
+ for conn in p.conns:
+ assert not conn.client_is_waiting_for_100_continue
+ assert not conn.they_are_waiting_for_100_continue
+
+ # Disabled by the client going ahead and sending stuff anyway
+ p = setup()
+ p.send(CLIENT, Data(data=b"12345"))
+ for conn in p.conns:
+ assert not conn.client_is_waiting_for_100_continue
+ assert not conn.they_are_waiting_for_100_continue
+
+
+def test_max_incomplete_event_size_countermeasure() -> None:
+ # Infinitely long headers are definitely not okay
+ c = Connection(SERVER)
+ c.receive_data(b"GET / HTTP/1.0\r\nEndless: ")
+ assert c.next_event() is NEED_DATA
+ with pytest.raises(RemoteProtocolError):
+ while True:
+ c.receive_data(b"a" * 1024)
+ c.next_event()
+
+ # Checking that the same header is accepted / rejected depending on the
+ # max_incomplete_event_size setting:
+ c = Connection(SERVER, max_incomplete_event_size=5000)
+ c.receive_data(b"GET / HTTP/1.0\r\nBig: ")
+ c.receive_data(b"a" * 4000)
+ c.receive_data(b"\r\n\r\n")
+ assert get_all_events(c) == [
+ Request(
+ method="GET", target="/", http_version="1.0", headers=[("big", "a" * 4000)]
+ ),
+ EndOfMessage(),
+ ]
+
+ c = Connection(SERVER, max_incomplete_event_size=4000)
+ c.receive_data(b"GET / HTTP/1.0\r\nBig: ")
+ c.receive_data(b"a" * 4000)
+ with pytest.raises(RemoteProtocolError):
+ c.next_event()
+
+ # Temporarily exceeding the size limit is fine, as long as its done with
+ # complete events:
+ c = Connection(SERVER, max_incomplete_event_size=5000)
+ c.receive_data(b"GET / HTTP/1.0\r\nContent-Length: 10000")
+ c.receive_data(b"\r\n\r\n" + b"a" * 10000)
+ assert get_all_events(c) == [
+ Request(
+ method="GET",
+ target="/",
+ http_version="1.0",
+ headers=[("Content-Length", "10000")],
+ ),
+ Data(data=b"a" * 10000),
+ EndOfMessage(),
+ ]
+
+ c = Connection(SERVER, max_incomplete_event_size=100)
+ # Two pipelined requests to create a way-too-big receive buffer... but
+ # it's fine because we're not checking
+ c.receive_data(
+ b"GET /1 HTTP/1.1\r\nHost: a\r\n\r\n"
+ b"GET /2 HTTP/1.1\r\nHost: b\r\n\r\n" + b"X" * 1000
+ )
+ assert get_all_events(c) == [
+ Request(method="GET", target="/1", headers=[("host", "a")]),
+ EndOfMessage(),
+ ]
+ # Even more data comes in, still no problem
+ c.receive_data(b"X" * 1000)
+ # We can respond and reuse to get the second pipelined request
+ c.send(Response(status_code=200, headers=[])) # type: ignore[arg-type]
+ c.send(EndOfMessage())
+ c.start_next_cycle()
+ assert get_all_events(c) == [
+ Request(method="GET", target="/2", headers=[("host", "b")]),
+ EndOfMessage(),
+ ]
+ # But once we unpause and try to read the next message, and find that it's
+ # incomplete and the buffer is *still* way too large, then *that's* a
+ # problem:
+ c.send(Response(status_code=200, headers=[])) # type: ignore[arg-type]
+ c.send(EndOfMessage())
+ c.start_next_cycle()
+ with pytest.raises(RemoteProtocolError):
+ c.next_event()
+
+
+def test_reuse_simple() -> None:
+ p = ConnectionPair()
+ p.send(
+ CLIENT,
+ [Request(method="GET", target="/", headers=[("Host", "a")]), EndOfMessage()],
+ )
+ p.send(
+ SERVER,
+ [
+ Response(status_code=200, headers=[(b"transfer-encoding", b"chunked")]),
+ EndOfMessage(),
+ ],
+ )
+ for conn in p.conns:
+ assert conn.states == {CLIENT: DONE, SERVER: DONE}
+ conn.start_next_cycle()
+
+ p.send(
+ CLIENT,
+ [
+ Request(method="DELETE", target="/foo", headers=[("Host", "a")]),
+ EndOfMessage(),
+ ],
+ )
+ p.send(
+ SERVER,
+ [
+ Response(status_code=404, headers=[(b"transfer-encoding", b"chunked")]),
+ EndOfMessage(),
+ ],
+ )
+
+
+def test_pipelining() -> None:
+ # Client doesn't support pipelining, so we have to do this by hand
+ c = Connection(SERVER)
+ assert c.next_event() is NEED_DATA
+ # 3 requests all bunched up
+ c.receive_data(
+ b"GET /1 HTTP/1.1\r\nHost: a.com\r\nContent-Length: 5\r\n\r\n"
+ b"12345"
+ b"GET /2 HTTP/1.1\r\nHost: a.com\r\nContent-Length: 5\r\n\r\n"
+ b"67890"
+ b"GET /3 HTTP/1.1\r\nHost: a.com\r\n\r\n"
+ )
+ assert get_all_events(c) == [
+ Request(
+ method="GET",
+ target="/1",
+ headers=[("Host", "a.com"), ("Content-Length", "5")],
+ ),
+ Data(data=b"12345"),
+ EndOfMessage(),
+ ]
+ assert c.their_state is DONE
+ assert c.our_state is SEND_RESPONSE
+
+ assert c.next_event() is PAUSED
+
+ c.send(Response(status_code=200, headers=[])) # type: ignore[arg-type]
+ c.send(EndOfMessage())
+ assert c.their_state is DONE
+ assert c.our_state is DONE
+
+ c.start_next_cycle()
+
+ assert get_all_events(c) == [
+ Request(
+ method="GET",
+ target="/2",
+ headers=[("Host", "a.com"), ("Content-Length", "5")],
+ ),
+ Data(data=b"67890"),
+ EndOfMessage(),
+ ]
+ assert c.next_event() is PAUSED
+ c.send(Response(status_code=200, headers=[])) # type: ignore[arg-type]
+ c.send(EndOfMessage())
+ c.start_next_cycle()
+
+ assert get_all_events(c) == [
+ Request(method="GET", target="/3", headers=[("Host", "a.com")]),
+ EndOfMessage(),
+ ]
+ # Doesn't pause this time, no trailing data
+ assert c.next_event() is NEED_DATA
+ c.send(Response(status_code=200, headers=[])) # type: ignore[arg-type]
+ c.send(EndOfMessage())
+
+ # Arrival of more data triggers pause
+ assert c.next_event() is NEED_DATA
+ c.receive_data(b"SADF")
+ assert c.next_event() is PAUSED
+ assert c.trailing_data == (b"SADF", False)
+ # If EOF arrives while paused, we don't see that either:
+ c.receive_data(b"")
+ assert c.trailing_data == (b"SADF", True)
+ assert c.next_event() is PAUSED
+ c.receive_data(b"")
+ assert c.next_event() is PAUSED
+ # Can't call receive_data with non-empty buf after closing it
+ with pytest.raises(RuntimeError):
+ c.receive_data(b"FDSA")
+
+
+def test_protocol_switch() -> None:
+ for (req, deny, accept) in [
+ (
+ Request(
+ method="CONNECT",
+ target="example.com:443",
+ headers=[("Host", "foo"), ("Content-Length", "1")],
+ ),
+ Response(status_code=404, headers=[(b"transfer-encoding", b"chunked")]),
+ Response(status_code=200, headers=[(b"transfer-encoding", b"chunked")]),
+ ),
+ (
+ Request(
+ method="GET",
+ target="/",
+ headers=[("Host", "foo"), ("Content-Length", "1"), ("Upgrade", "a, b")],
+ ),
+ Response(status_code=200, headers=[(b"transfer-encoding", b"chunked")]),
+ InformationalResponse(status_code=101, headers=[("Upgrade", "a")]),
+ ),
+ (
+ Request(
+ method="CONNECT",
+ target="example.com:443",
+ headers=[("Host", "foo"), ("Content-Length", "1"), ("Upgrade", "a, b")],
+ ),
+ Response(status_code=404, headers=[(b"transfer-encoding", b"chunked")]),
+ # Accept CONNECT, not upgrade
+ Response(status_code=200, headers=[(b"transfer-encoding", b"chunked")]),
+ ),
+ (
+ Request(
+ method="CONNECT",
+ target="example.com:443",
+ headers=[("Host", "foo"), ("Content-Length", "1"), ("Upgrade", "a, b")],
+ ),
+ Response(status_code=404, headers=[(b"transfer-encoding", b"chunked")]),
+ # Accept Upgrade, not CONNECT
+ InformationalResponse(status_code=101, headers=[("Upgrade", "b")]),
+ ),
+ ]:
+
+ def setup() -> ConnectionPair:
+ p = ConnectionPair()
+ p.send(CLIENT, req)
+ # No switch-related state change stuff yet; the client has to
+ # finish the request before that kicks in
+ for conn in p.conns:
+ assert conn.states[CLIENT] is SEND_BODY
+ p.send(CLIENT, [Data(data=b"1"), EndOfMessage()])
+ for conn in p.conns:
+ assert conn.states[CLIENT] is MIGHT_SWITCH_PROTOCOL
+ assert p.conn[SERVER].next_event() is PAUSED
+ return p
+
+ # Test deny case
+ p = setup()
+ p.send(SERVER, deny)
+ for conn in p.conns:
+ assert conn.states == {CLIENT: DONE, SERVER: SEND_BODY}
+ p.send(SERVER, EndOfMessage())
+ # Check that re-use is still allowed after a denial
+ for conn in p.conns:
+ conn.start_next_cycle()
+
+ # Test accept case
+ p = setup()
+ p.send(SERVER, accept)
+ for conn in p.conns:
+ assert conn.states == {CLIENT: SWITCHED_PROTOCOL, SERVER: SWITCHED_PROTOCOL}
+ conn.receive_data(b"123")
+ assert conn.next_event() is PAUSED
+ conn.receive_data(b"456")
+ assert conn.next_event() is PAUSED
+ assert conn.trailing_data == (b"123456", False)
+
+ # Pausing in might-switch, then recovery
+ # (weird artificial case where the trailing data actually is valid
+ # HTTP for some reason, because this makes it easier to test the state
+ # logic)
+ p = setup()
+ sc = p.conn[SERVER]
+ sc.receive_data(b"GET / HTTP/1.0\r\n\r\n")
+ assert sc.next_event() is PAUSED
+ assert sc.trailing_data == (b"GET / HTTP/1.0\r\n\r\n", False)
+ sc.send(deny)
+ assert sc.next_event() is PAUSED
+ sc.send(EndOfMessage())
+ sc.start_next_cycle()
+ assert get_all_events(sc) == [
+ Request(method="GET", target="/", headers=[], http_version="1.0"), # type: ignore[arg-type]
+ EndOfMessage(),
+ ]
+
+ # When we're DONE, have no trailing data, and the connection gets
+ # closed, we report ConnectionClosed(). When we're in might-switch or
+ # switched, we don't.
+ p = setup()
+ sc = p.conn[SERVER]
+ sc.receive_data(b"")
+ assert sc.next_event() is PAUSED
+ assert sc.trailing_data == (b"", True)
+ p.send(SERVER, accept)
+ assert sc.next_event() is PAUSED
+
+ p = setup()
+ sc = p.conn[SERVER]
+ sc.receive_data(b"")
+ assert sc.next_event() is PAUSED
+ sc.send(deny)
+ assert sc.next_event() == ConnectionClosed()
+
+ # You can't send after switching protocols, or while waiting for a
+ # protocol switch
+ p = setup()
+ with pytest.raises(LocalProtocolError):
+ p.conn[CLIENT].send(
+ Request(method="GET", target="/", headers=[("Host", "a")])
+ )
+ p = setup()
+ p.send(SERVER, accept)
+ with pytest.raises(LocalProtocolError):
+ p.conn[SERVER].send(Data(data=b"123"))
+
+
+def test_close_simple() -> None:
+ # Just immediately closing a new connection without anything having
+ # happened yet.
+ for (who_shot_first, who_shot_second) in [(CLIENT, SERVER), (SERVER, CLIENT)]:
+
+ def setup() -> ConnectionPair:
+ p = ConnectionPair()
+ p.send(who_shot_first, ConnectionClosed())
+ for conn in p.conns:
+ assert conn.states == {
+ who_shot_first: CLOSED,
+ who_shot_second: MUST_CLOSE,
+ }
+ return p
+
+ # You can keep putting b"" into a closed connection, and you keep
+ # getting ConnectionClosed() out:
+ p = setup()
+ assert p.conn[who_shot_second].next_event() == ConnectionClosed()
+ assert p.conn[who_shot_second].next_event() == ConnectionClosed()
+ p.conn[who_shot_second].receive_data(b"")
+ assert p.conn[who_shot_second].next_event() == ConnectionClosed()
+ # Second party can close...
+ p = setup()
+ p.send(who_shot_second, ConnectionClosed())
+ for conn in p.conns:
+ assert conn.our_state is CLOSED
+ assert conn.their_state is CLOSED
+ # But trying to receive new data on a closed connection is a
+ # RuntimeError (not ProtocolError, because the problem here isn't
+ # violation of HTTP, it's violation of physics)
+ p = setup()
+ with pytest.raises(RuntimeError):
+ p.conn[who_shot_second].receive_data(b"123")
+ # And receiving new data on a MUST_CLOSE connection is a ProtocolError
+ p = setup()
+ p.conn[who_shot_first].receive_data(b"GET")
+ with pytest.raises(RemoteProtocolError):
+ p.conn[who_shot_first].next_event()
+
+
+def test_close_different_states() -> None:
+ req = [
+ Request(method="GET", target="/foo", headers=[("Host", "a")]),
+ EndOfMessage(),
+ ]
+ resp = [
+ Response(status_code=200, headers=[(b"transfer-encoding", b"chunked")]),
+ EndOfMessage(),
+ ]
+
+ # Client before request
+ p = ConnectionPair()
+ p.send(CLIENT, ConnectionClosed())
+ for conn in p.conns:
+ assert conn.states == {CLIENT: CLOSED, SERVER: MUST_CLOSE}
+
+ # Client after request
+ p = ConnectionPair()
+ p.send(CLIENT, req)
+ p.send(CLIENT, ConnectionClosed())
+ for conn in p.conns:
+ assert conn.states == {CLIENT: CLOSED, SERVER: SEND_RESPONSE}
+
+ # Server after request -> not allowed
+ p = ConnectionPair()
+ p.send(CLIENT, req)
+ with pytest.raises(LocalProtocolError):
+ p.conn[SERVER].send(ConnectionClosed())
+ p.conn[CLIENT].receive_data(b"")
+ with pytest.raises(RemoteProtocolError):
+ p.conn[CLIENT].next_event()
+
+ # Server after response
+ p = ConnectionPair()
+ p.send(CLIENT, req)
+ p.send(SERVER, resp)
+ p.send(SERVER, ConnectionClosed())
+ for conn in p.conns:
+ assert conn.states == {CLIENT: MUST_CLOSE, SERVER: CLOSED}
+
+ # Both after closing (ConnectionClosed() is idempotent)
+ p = ConnectionPair()
+ p.send(CLIENT, req)
+ p.send(SERVER, resp)
+ p.send(CLIENT, ConnectionClosed())
+ p.send(SERVER, ConnectionClosed())
+ p.send(CLIENT, ConnectionClosed())
+ p.send(SERVER, ConnectionClosed())
+
+ # In the middle of sending -> not allowed
+ p = ConnectionPair()
+ p.send(
+ CLIENT,
+ Request(
+ method="GET", target="/", headers=[("Host", "a"), ("Content-Length", "10")]
+ ),
+ )
+ with pytest.raises(LocalProtocolError):
+ p.conn[CLIENT].send(ConnectionClosed())
+ p.conn[SERVER].receive_data(b"")
+ with pytest.raises(RemoteProtocolError):
+ p.conn[SERVER].next_event()
+
+
+# Receive several requests and then client shuts down their side of the
+# connection; we can respond to each
+def test_pipelined_close() -> None:
+ c = Connection(SERVER)
+ # 2 requests then a close
+ c.receive_data(
+ b"GET /1 HTTP/1.1\r\nHost: a.com\r\nContent-Length: 5\r\n\r\n"
+ b"12345"
+ b"GET /2 HTTP/1.1\r\nHost: a.com\r\nContent-Length: 5\r\n\r\n"
+ b"67890"
+ )
+ c.receive_data(b"")
+ assert get_all_events(c) == [
+ Request(
+ method="GET",
+ target="/1",
+ headers=[("host", "a.com"), ("content-length", "5")],
+ ),
+ Data(data=b"12345"),
+ EndOfMessage(),
+ ]
+ assert c.states[CLIENT] is DONE
+ c.send(Response(status_code=200, headers=[])) # type: ignore[arg-type]
+ c.send(EndOfMessage())
+ assert c.states[SERVER] is DONE
+ c.start_next_cycle()
+ assert get_all_events(c) == [
+ Request(
+ method="GET",
+ target="/2",
+ headers=[("host", "a.com"), ("content-length", "5")],
+ ),
+ Data(data=b"67890"),
+ EndOfMessage(),
+ ConnectionClosed(),
+ ]
+ assert c.states == {CLIENT: CLOSED, SERVER: SEND_RESPONSE}
+ c.send(Response(status_code=200, headers=[])) # type: ignore[arg-type]
+ c.send(EndOfMessage())
+ assert c.states == {CLIENT: CLOSED, SERVER: MUST_CLOSE}
+ c.send(ConnectionClosed())
+ assert c.states == {CLIENT: CLOSED, SERVER: CLOSED}
+
+
+def test_sendfile() -> None:
+ class SendfilePlaceholder:
+ def __len__(self) -> int:
+ return 10
+
+ placeholder = SendfilePlaceholder()
+
+ def setup(
+ header: Tuple[str, str], http_version: str
+ ) -> Tuple[Connection, Optional[List[bytes]]]:
+ c = Connection(SERVER)
+ receive_and_get(
+ c, "GET / HTTP/{}\r\nHost: a\r\n\r\n".format(http_version).encode("ascii")
+ )
+ headers = []
+ if header:
+ headers.append(header)
+ c.send(Response(status_code=200, headers=headers))
+ return c, c.send_with_data_passthrough(Data(data=placeholder)) # type: ignore
+
+ c, data = setup(("Content-Length", "10"), "1.1")
+ assert data == [placeholder] # type: ignore
+ # Raises an error if the connection object doesn't think we've sent
+ # exactly 10 bytes
+ c.send(EndOfMessage())
+
+ _, data = setup(("Transfer-Encoding", "chunked"), "1.1")
+ assert placeholder in data # type: ignore
+ data[data.index(placeholder)] = b"x" * 10 # type: ignore
+ assert b"".join(data) == b"a\r\nxxxxxxxxxx\r\n" # type: ignore
+
+ c, data = setup(None, "1.0") # type: ignore
+ assert data == [placeholder] # type: ignore
+ assert c.our_state is SEND_BODY
+
+
+def test_errors() -> None:
+ # After a receive error, you can't receive
+ for role in [CLIENT, SERVER]:
+ c = Connection(our_role=role)
+ c.receive_data(b"gibberish\r\n\r\n")
+ with pytest.raises(RemoteProtocolError):
+ c.next_event()
+ # Now any attempt to receive continues to raise
+ assert c.their_state is ERROR
+ assert c.our_state is not ERROR
+ print(c._cstate.states)
+ with pytest.raises(RemoteProtocolError):
+ c.next_event()
+ # But we can still yell at the client for sending us gibberish
+ if role is SERVER:
+ assert (
+ c.send(Response(status_code=400, headers=[])) # type: ignore[arg-type]
+ == b"HTTP/1.1 400 \r\nConnection: close\r\n\r\n"
+ )
+
+ # After an error sending, you can no longer send
+ # (This is especially important for things like content-length errors,
+ # where there's complex internal state being modified)
+ def conn(role: Type[Sentinel]) -> Connection:
+ c = Connection(our_role=role)
+ if role is SERVER:
+ # Put it into the state where it *could* send a response...
+ receive_and_get(c, b"GET / HTTP/1.0\r\n\r\n")
+ assert c.our_state is SEND_RESPONSE
+ return c
+
+ for role in [CLIENT, SERVER]:
+ if role is CLIENT:
+ # This HTTP/1.0 request won't be detected as bad until after we go
+ # through the state machine and hit the writing code
+ good = Request(method="GET", target="/", headers=[("Host", "example.com")])
+ bad = Request(
+ method="GET",
+ target="/",
+ headers=[("Host", "example.com")],
+ http_version="1.0",
+ )
+ elif role is SERVER:
+ good = Response(status_code=200, headers=[]) # type: ignore[arg-type,assignment]
+ bad = Response(status_code=200, headers=[], http_version="1.0") # type: ignore[arg-type,assignment]
+ # Make sure 'good' actually is good
+ c = conn(role)
+ c.send(good)
+ assert c.our_state is not ERROR
+ # Do that again, but this time sending 'bad' first
+ c = conn(role)
+ with pytest.raises(LocalProtocolError):
+ c.send(bad)
+ assert c.our_state is ERROR
+ assert c.their_state is not ERROR
+ # Now 'good' is not so good
+ with pytest.raises(LocalProtocolError):
+ c.send(good)
+
+ # And check send_failed() too
+ c = conn(role)
+ c.send_failed()
+ assert c.our_state is ERROR
+ assert c.their_state is not ERROR
+ # This is idempotent
+ c.send_failed()
+ assert c.our_state is ERROR
+ assert c.their_state is not ERROR
+
+
+def test_idle_receive_nothing() -> None:
+ # At one point this incorrectly raised an error
+ for role in [CLIENT, SERVER]:
+ c = Connection(role)
+ assert c.next_event() is NEED_DATA
+
+
+def test_connection_drop() -> None:
+ c = Connection(SERVER)
+ c.receive_data(b"GET /")
+ assert c.next_event() is NEED_DATA
+ c.receive_data(b"")
+ with pytest.raises(RemoteProtocolError):
+ c.next_event()
+
+
+def test_408_request_timeout() -> None:
+ # Should be able to send this spontaneously as a server without seeing
+ # anything from client
+ p = ConnectionPair()
+ p.send(SERVER, Response(status_code=408, headers=[(b"connection", b"close")]))
+
+
+# This used to raise IndexError
+def test_empty_request() -> None:
+ c = Connection(SERVER)
+ c.receive_data(b"\r\n")
+ with pytest.raises(RemoteProtocolError):
+ c.next_event()
+
+
+# This used to raise IndexError
+def test_empty_response() -> None:
+ c = Connection(CLIENT)
+ c.send(Request(method="GET", target="/", headers=[("Host", "a")]))
+ c.receive_data(b"\r\n")
+ with pytest.raises(RemoteProtocolError):
+ c.next_event()
+
+
+@pytest.mark.parametrize(
+ "data",
+ [
+ b"\x00",
+ b"\x20",
+ b"\x16\x03\x01\x00\xa5", # Typical start of a TLS Client Hello
+ ],
+)
+def test_early_detection_of_invalid_request(data: bytes) -> None:
+ c = Connection(SERVER)
+ # Early detection should occur before even receiving a `\r\n`
+ c.receive_data(data)
+ with pytest.raises(RemoteProtocolError):
+ c.next_event()
+
+
+@pytest.mark.parametrize(
+ "data",
+ [
+ b"\x00",
+ b"\x20",
+ b"\x16\x03\x03\x00\x31", # Typical start of a TLS Server Hello
+ ],
+)
+def test_early_detection_of_invalid_response(data: bytes) -> None:
+ c = Connection(CLIENT)
+ # Early detection should occur before even receiving a `\r\n`
+ c.receive_data(data)
+ with pytest.raises(RemoteProtocolError):
+ c.next_event()
+
+
+# This used to give different headers for HEAD and GET.
+# The correct way to handle HEAD is to put whatever headers we *would* have
+# put if it were a GET -- even though we know that for HEAD, those headers
+# will be ignored.
+def test_HEAD_framing_headers() -> None:
+ def setup(method: bytes, http_version: bytes) -> Connection:
+ c = Connection(SERVER)
+ c.receive_data(
+ method + b" / HTTP/" + http_version + b"\r\n" + b"Host: example.com\r\n\r\n"
+ )
+ assert type(c.next_event()) is Request
+ assert type(c.next_event()) is EndOfMessage
+ return c
+
+ for method in [b"GET", b"HEAD"]:
+ # No Content-Length, HTTP/1.1 peer, should use chunked
+ c = setup(method, b"1.1")
+ assert (
+ c.send(Response(status_code=200, headers=[])) == b"HTTP/1.1 200 \r\n" # type: ignore[arg-type]
+ b"Transfer-Encoding: chunked\r\n\r\n"
+ )
+
+ # No Content-Length, HTTP/1.0 peer, frame with connection: close
+ c = setup(method, b"1.0")
+ assert (
+ c.send(Response(status_code=200, headers=[])) == b"HTTP/1.1 200 \r\n" # type: ignore[arg-type]
+ b"Connection: close\r\n\r\n"
+ )
+
+ # Content-Length + Transfer-Encoding, TE wins
+ c = setup(method, b"1.1")
+ assert (
+ c.send(
+ Response(
+ status_code=200,
+ headers=[
+ ("Content-Length", "100"),
+ ("Transfer-Encoding", "chunked"),
+ ],
+ )
+ )
+ == b"HTTP/1.1 200 \r\n"
+ b"Transfer-Encoding: chunked\r\n\r\n"
+ )
+
+
+def test_special_exceptions_for_lost_connection_in_message_body() -> None:
+ c = Connection(SERVER)
+ c.receive_data(
+ b"POST / HTTP/1.1\r\n" b"Host: example.com\r\n" b"Content-Length: 100\r\n\r\n"
+ )
+ assert type(c.next_event()) is Request
+ assert c.next_event() is NEED_DATA
+ c.receive_data(b"12345")
+ assert c.next_event() == Data(data=b"12345")
+ c.receive_data(b"")
+ with pytest.raises(RemoteProtocolError) as excinfo:
+ c.next_event()
+ assert "received 5 bytes" in str(excinfo.value)
+ assert "expected 100" in str(excinfo.value)
+
+ c = Connection(SERVER)
+ c.receive_data(
+ b"POST / HTTP/1.1\r\n"
+ b"Host: example.com\r\n"
+ b"Transfer-Encoding: chunked\r\n\r\n"
+ )
+ assert type(c.next_event()) is Request
+ assert c.next_event() is NEED_DATA
+ c.receive_data(b"8\r\n012345")
+ assert c.next_event().data == b"012345" # type: ignore
+ c.receive_data(b"")
+ with pytest.raises(RemoteProtocolError) as excinfo:
+ c.next_event()
+ assert "incomplete chunked read" in str(excinfo.value)
diff --git a/venv/lib/python3.11/site-packages/h11/tests/test_events.py b/venv/lib/python3.11/site-packages/h11/tests/test_events.py
new file mode 100644
index 0000000..bc6c313
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/test_events.py
@@ -0,0 +1,150 @@
+from http import HTTPStatus
+
+import pytest
+
+from .. import _events
+from .._events import (
+ ConnectionClosed,
+ Data,
+ EndOfMessage,
+ Event,
+ InformationalResponse,
+ Request,
+ Response,
+)
+from .._util import LocalProtocolError
+
+
+def test_events() -> None:
+ with pytest.raises(LocalProtocolError):
+ # Missing Host:
+ req = Request(
+ method="GET", target="/", headers=[("a", "b")], http_version="1.1"
+ )
+ # But this is okay (HTTP/1.0)
+ req = Request(method="GET", target="/", headers=[("a", "b")], http_version="1.0")
+ # fields are normalized
+ assert req.method == b"GET"
+ assert req.target == b"/"
+ assert req.headers == [(b"a", b"b")]
+ assert req.http_version == b"1.0"
+
+ # This is also okay -- has a Host (with weird capitalization, which is ok)
+ req = Request(
+ method="GET",
+ target="/",
+ headers=[("a", "b"), ("hOSt", "example.com")],
+ http_version="1.1",
+ )
+ # we normalize header capitalization
+ assert req.headers == [(b"a", b"b"), (b"host", b"example.com")]
+
+ # Multiple host is bad too
+ with pytest.raises(LocalProtocolError):
+ req = Request(
+ method="GET",
+ target="/",
+ headers=[("Host", "a"), ("Host", "a")],
+ http_version="1.1",
+ )
+ # Even for HTTP/1.0
+ with pytest.raises(LocalProtocolError):
+ req = Request(
+ method="GET",
+ target="/",
+ headers=[("Host", "a"), ("Host", "a")],
+ http_version="1.0",
+ )
+
+ # Header values are validated
+ for bad_char in "\x00\r\n\f\v":
+ with pytest.raises(LocalProtocolError):
+ req = Request(
+ method="GET",
+ target="/",
+ headers=[("Host", "a"), ("Foo", "asd" + bad_char)],
+ http_version="1.0",
+ )
+
+ # But for compatibility we allow non-whitespace control characters, even
+ # though they're forbidden by the spec.
+ Request(
+ method="GET",
+ target="/",
+ headers=[("Host", "a"), ("Foo", "asd\x01\x02\x7f")],
+ http_version="1.0",
+ )
+
+ # Request target is validated
+ for bad_byte in b"\x00\x20\x7f\xee":
+ target = bytearray(b"/")
+ target.append(bad_byte)
+ with pytest.raises(LocalProtocolError):
+ Request(
+ method="GET", target=target, headers=[("Host", "a")], http_version="1.1"
+ )
+
+ # Request method is validated
+ with pytest.raises(LocalProtocolError):
+ Request(
+ method="GET / HTTP/1.1",
+ target=target,
+ headers=[("Host", "a")],
+ http_version="1.1",
+ )
+
+ ir = InformationalResponse(status_code=100, headers=[("Host", "a")])
+ assert ir.status_code == 100
+ assert ir.headers == [(b"host", b"a")]
+ assert ir.http_version == b"1.1"
+
+ with pytest.raises(LocalProtocolError):
+ InformationalResponse(status_code=200, headers=[("Host", "a")])
+
+ resp = Response(status_code=204, headers=[], http_version="1.0") # type: ignore[arg-type]
+ assert resp.status_code == 204
+ assert resp.headers == []
+ assert resp.http_version == b"1.0"
+
+ with pytest.raises(LocalProtocolError):
+ resp = Response(status_code=100, headers=[], http_version="1.0") # type: ignore[arg-type]
+
+ with pytest.raises(LocalProtocolError):
+ Response(status_code="100", headers=[], http_version="1.0") # type: ignore[arg-type]
+
+ with pytest.raises(LocalProtocolError):
+ InformationalResponse(status_code=b"100", headers=[], http_version="1.0") # type: ignore[arg-type]
+
+ d = Data(data=b"asdf")
+ assert d.data == b"asdf"
+
+ eom = EndOfMessage()
+ assert eom.headers == []
+
+ cc = ConnectionClosed()
+ assert repr(cc) == "ConnectionClosed()"
+
+
+def test_intenum_status_code() -> None:
+ # https://github.com/python-hyper/h11/issues/72
+
+ r = Response(status_code=HTTPStatus.OK, headers=[], http_version="1.0") # type: ignore[arg-type]
+ assert r.status_code == HTTPStatus.OK
+ assert type(r.status_code) is not type(HTTPStatus.OK)
+ assert type(r.status_code) is int
+
+
+def test_header_casing() -> None:
+ r = Request(
+ method="GET",
+ target="/",
+ headers=[("Host", "example.org"), ("Connection", "keep-alive")],
+ http_version="1.1",
+ )
+ assert len(r.headers) == 2
+ assert r.headers[0] == (b"host", b"example.org")
+ assert r.headers == [(b"host", b"example.org"), (b"connection", b"keep-alive")]
+ assert r.headers.raw_items() == [
+ (b"Host", b"example.org"),
+ (b"Connection", b"keep-alive"),
+ ]
diff --git a/venv/lib/python3.11/site-packages/h11/tests/test_headers.py b/venv/lib/python3.11/site-packages/h11/tests/test_headers.py
new file mode 100644
index 0000000..ba53d08
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/test_headers.py
@@ -0,0 +1,157 @@
+import pytest
+
+from .._events import Request
+from .._headers import (
+ get_comma_header,
+ has_expect_100_continue,
+ Headers,
+ normalize_and_validate,
+ set_comma_header,
+)
+from .._util import LocalProtocolError
+
+
+def test_normalize_and_validate() -> None:
+ assert normalize_and_validate([("foo", "bar")]) == [(b"foo", b"bar")]
+ assert normalize_and_validate([(b"foo", b"bar")]) == [(b"foo", b"bar")]
+
+ # no leading/trailing whitespace in names
+ with pytest.raises(LocalProtocolError):
+ normalize_and_validate([(b"foo ", "bar")])
+ with pytest.raises(LocalProtocolError):
+ normalize_and_validate([(b" foo", "bar")])
+
+ # no weird characters in names
+ with pytest.raises(LocalProtocolError) as excinfo:
+ normalize_and_validate([(b"foo bar", b"baz")])
+ assert "foo bar" in str(excinfo.value)
+ with pytest.raises(LocalProtocolError):
+ normalize_and_validate([(b"foo\x00bar", b"baz")])
+ # Not even 8-bit characters:
+ with pytest.raises(LocalProtocolError):
+ normalize_and_validate([(b"foo\xffbar", b"baz")])
+ # And not even the control characters we allow in values:
+ with pytest.raises(LocalProtocolError):
+ normalize_and_validate([(b"foo\x01bar", b"baz")])
+
+ # no return or NUL characters in values
+ with pytest.raises(LocalProtocolError) as excinfo:
+ normalize_and_validate([("foo", "bar\rbaz")])
+ assert "bar\\rbaz" in str(excinfo.value)
+ with pytest.raises(LocalProtocolError):
+ normalize_and_validate([("foo", "bar\nbaz")])
+ with pytest.raises(LocalProtocolError):
+ normalize_and_validate([("foo", "bar\x00baz")])
+ # no leading/trailing whitespace
+ with pytest.raises(LocalProtocolError):
+ normalize_and_validate([("foo", "barbaz ")])
+ with pytest.raises(LocalProtocolError):
+ normalize_and_validate([("foo", " barbaz")])
+ with pytest.raises(LocalProtocolError):
+ normalize_and_validate([("foo", "barbaz\t")])
+ with pytest.raises(LocalProtocolError):
+ normalize_and_validate([("foo", "\tbarbaz")])
+
+ # content-length
+ assert normalize_and_validate([("Content-Length", "1")]) == [
+ (b"content-length", b"1")
+ ]
+ with pytest.raises(LocalProtocolError):
+ normalize_and_validate([("Content-Length", "asdf")])
+ with pytest.raises(LocalProtocolError):
+ normalize_and_validate([("Content-Length", "1x")])
+ with pytest.raises(LocalProtocolError):
+ normalize_and_validate([("Content-Length", "1"), ("Content-Length", "2")])
+ assert normalize_and_validate(
+ [("Content-Length", "0"), ("Content-Length", "0")]
+ ) == [(b"content-length", b"0")]
+ assert normalize_and_validate([("Content-Length", "0 , 0")]) == [
+ (b"content-length", b"0")
+ ]
+ with pytest.raises(LocalProtocolError):
+ normalize_and_validate(
+ [("Content-Length", "1"), ("Content-Length", "1"), ("Content-Length", "2")]
+ )
+ with pytest.raises(LocalProtocolError):
+ normalize_and_validate([("Content-Length", "1 , 1,2")])
+
+ # transfer-encoding
+ assert normalize_and_validate([("Transfer-Encoding", "chunked")]) == [
+ (b"transfer-encoding", b"chunked")
+ ]
+ assert normalize_and_validate([("Transfer-Encoding", "cHuNkEd")]) == [
+ (b"transfer-encoding", b"chunked")
+ ]
+ with pytest.raises(LocalProtocolError) as excinfo:
+ normalize_and_validate([("Transfer-Encoding", "gzip")])
+ assert excinfo.value.error_status_hint == 501 # Not Implemented
+ with pytest.raises(LocalProtocolError) as excinfo:
+ normalize_and_validate(
+ [("Transfer-Encoding", "chunked"), ("Transfer-Encoding", "gzip")]
+ )
+ assert excinfo.value.error_status_hint == 501 # Not Implemented
+
+
+def test_get_set_comma_header() -> None:
+ headers = normalize_and_validate(
+ [
+ ("Connection", "close"),
+ ("whatever", "something"),
+ ("connectiON", "fOo,, , BAR"),
+ ]
+ )
+
+ assert get_comma_header(headers, b"connection") == [b"close", b"foo", b"bar"]
+
+ headers = set_comma_header(headers, b"newthing", ["a", "b"]) # type: ignore
+
+ with pytest.raises(LocalProtocolError):
+ set_comma_header(headers, b"newthing", [" a", "b"]) # type: ignore
+
+ assert headers == [
+ (b"connection", b"close"),
+ (b"whatever", b"something"),
+ (b"connection", b"fOo,, , BAR"),
+ (b"newthing", b"a"),
+ (b"newthing", b"b"),
+ ]
+
+ headers = set_comma_header(headers, b"whatever", ["different thing"]) # type: ignore
+
+ assert headers == [
+ (b"connection", b"close"),
+ (b"connection", b"fOo,, , BAR"),
+ (b"newthing", b"a"),
+ (b"newthing", b"b"),
+ (b"whatever", b"different thing"),
+ ]
+
+
+def test_has_100_continue() -> None:
+ assert has_expect_100_continue(
+ Request(
+ method="GET",
+ target="/",
+ headers=[("Host", "example.com"), ("Expect", "100-continue")],
+ )
+ )
+ assert not has_expect_100_continue(
+ Request(method="GET", target="/", headers=[("Host", "example.com")])
+ )
+ # Case insensitive
+ assert has_expect_100_continue(
+ Request(
+ method="GET",
+ target="/",
+ headers=[("Host", "example.com"), ("Expect", "100-Continue")],
+ )
+ )
+ # Doesn't work in HTTP/1.0
+ assert not has_expect_100_continue(
+ Request(
+ method="GET",
+ target="/",
+ headers=[("Host", "example.com"), ("Expect", "100-continue")],
+ http_version="1.0",
+ )
+ )
diff --git a/venv/lib/python3.11/site-packages/h11/tests/test_helpers.py b/venv/lib/python3.11/site-packages/h11/tests/test_helpers.py
new file mode 100644
index 0000000..c329c76
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/test_helpers.py
@@ -0,0 +1,32 @@
+from .._events import (
+ ConnectionClosed,
+ Data,
+ EndOfMessage,
+ Event,
+ InformationalResponse,
+ Request,
+ Response,
+)
+from .helpers import normalize_data_events
+
+
+def test_normalize_data_events() -> None:
+ assert normalize_data_events(
+ [
+ Data(data=bytearray(b"1")),
+ Data(data=b"2"),
+ Response(status_code=200, headers=[]), # type: ignore[arg-type]
+ Data(data=b"3"),
+ Data(data=b"4"),
+ EndOfMessage(),
+ Data(data=b"5"),
+ Data(data=b"6"),
+ Data(data=b"7"),
+ ]
+ ) == [
+ Data(data=b"12"),
+ Response(status_code=200, headers=[]), # type: ignore[arg-type]
+ Data(data=b"34"),
+ EndOfMessage(),
+ Data(data=b"567"),
+ ]
diff --git a/venv/lib/python3.11/site-packages/h11/tests/test_io.py b/venv/lib/python3.11/site-packages/h11/tests/test_io.py
new file mode 100644
index 0000000..2b47c0e
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/test_io.py
@@ -0,0 +1,572 @@
+from typing import Any, Callable, Generator, List
+
+import pytest
+
+from .._events import (
+ ConnectionClosed,
+ Data,
+ EndOfMessage,
+ Event,
+ InformationalResponse,
+ Request,
+ Response,
+)
+from .._headers import Headers, normalize_and_validate
+from .._readers import (
+ _obsolete_line_fold,
+ ChunkedReader,
+ ContentLengthReader,
+ Http10Reader,
+ READERS,
+)
+from .._receivebuffer import ReceiveBuffer
+from .._state import (
+ CLIENT,
+ CLOSED,
+ DONE,
+ IDLE,
+ MIGHT_SWITCH_PROTOCOL,
+ MUST_CLOSE,
+ SEND_BODY,
+ SEND_RESPONSE,
+ SERVER,
+ SWITCHED_PROTOCOL,
+)
+from .._util import LocalProtocolError
+from .._writers import (
+ ChunkedWriter,
+ ContentLengthWriter,
+ Http10Writer,
+ write_any_response,
+ write_headers,
+ write_request,
+ WRITERS,
+)
+from .helpers import normalize_data_events
+
+SIMPLE_CASES = [
+ (
+ (CLIENT, IDLE),
+ Request(
+ method="GET",
+ target="/a",
+ headers=[("Host", "foo"), ("Connection", "close")],
+ ),
+ b"GET /a HTTP/1.1\r\nHost: foo\r\nConnection: close\r\n\r\n",
+ ),
+ (
+ (SERVER, SEND_RESPONSE),
+ Response(status_code=200, headers=[("Connection", "close")], reason=b"OK"),
+ b"HTTP/1.1 200 OK\r\nConnection: close\r\n\r\n",
+ ),
+ (
+ (SERVER, SEND_RESPONSE),
+ Response(status_code=200, headers=[], reason=b"OK"), # type: ignore[arg-type]
+ b"HTTP/1.1 200 OK\r\n\r\n",
+ ),
+ (
+ (SERVER, SEND_RESPONSE),
+ InformationalResponse(
+ status_code=101, headers=[("Upgrade", "websocket")], reason=b"Upgrade"
+ ),
+ b"HTTP/1.1 101 Upgrade\r\nUpgrade: websocket\r\n\r\n",
+ ),
+ (
+ (SERVER, SEND_RESPONSE),
+ InformationalResponse(status_code=101, headers=[], reason=b"Upgrade"), # type: ignore[arg-type]
+ b"HTTP/1.1 101 Upgrade\r\n\r\n",
+ ),
+]
+
+
+def dowrite(writer: Callable[..., None], obj: Any) -> bytes:
+ got_list: List[bytes] = []
+ writer(obj, got_list.append)
+ return b"".join(got_list)
+
+
+def tw(writer: Any, obj: Any, expected: Any) -> None:
+ got = dowrite(writer, obj)
+ assert got == expected
+
+
+def makebuf(data: bytes) -> ReceiveBuffer:
+ buf = ReceiveBuffer()
+ buf += data
+ return buf
+
+
+def tr(reader: Any, data: bytes, expected: Any) -> None:
+ def check(got: Any) -> None:
+ assert got == expected
+ # Headers should always be returned as bytes, not e.g. bytearray
+ # https://github.com/python-hyper/wsproto/pull/54#issuecomment-377709478
+ for name, value in getattr(got, "headers", []):
+ assert type(name) is bytes
+ assert type(value) is bytes
+
+ # Simple: consume whole thing
+ buf = makebuf(data)
+ check(reader(buf))
+ assert not buf
+
+ # Incrementally growing buffer
+ buf = ReceiveBuffer()
+ for i in range(len(data)):
+ assert reader(buf) is None
+ buf += data[i : i + 1]
+ check(reader(buf))
+
+ # Trailing data
+ buf = makebuf(data)
+ buf += b"trailing"
+ check(reader(buf))
+ assert bytes(buf) == b"trailing"
+
+
+def test_writers_simple() -> None:
+ for ((role, state), event, binary) in SIMPLE_CASES:
+ tw(WRITERS[role, state], event, binary)
+
+
+def test_readers_simple() -> None:
+ for ((role, state), event, binary) in SIMPLE_CASES:
+ tr(READERS[role, state], binary, event)
+
+
+def test_writers_unusual() -> None:
+ # Simple test of the write_headers utility routine
+ tw(
+ write_headers,
+ normalize_and_validate([("foo", "bar"), ("baz", "quux")]),
+ b"foo: bar\r\nbaz: quux\r\n\r\n",
+ )
+ tw(write_headers, Headers([]), b"\r\n")
+
+ # We understand HTTP/1.0, but we don't speak it
+ with pytest.raises(LocalProtocolError):
+ tw(
+ write_request,
+ Request(
+ method="GET",
+ target="/",
+ headers=[("Host", "foo"), ("Connection", "close")],
+ http_version="1.0",
+ ),
+ None,
+ )
+ with pytest.raises(LocalProtocolError):
+ tw(
+ write_any_response,
+ Response(
+ status_code=200, headers=[("Connection", "close")], http_version="1.0"
+ ),
+ None,
+ )
+
+
+def test_readers_unusual() -> None:
+ # Reading HTTP/1.0
+ tr(
+ READERS[CLIENT, IDLE],
+ b"HEAD /foo HTTP/1.0\r\nSome: header\r\n\r\n",
+ Request(
+ method="HEAD",
+ target="/foo",
+ headers=[("Some", "header")],
+ http_version="1.0",
+ ),
+ )
+
+ # check no-headers, since it's only legal with HTTP/1.0
+ tr(
+ READERS[CLIENT, IDLE],
+ b"HEAD /foo HTTP/1.0\r\n\r\n",
+ Request(method="HEAD", target="/foo", headers=[], http_version="1.0"), # type: ignore[arg-type]
+ )
+
+ tr(
+ READERS[SERVER, SEND_RESPONSE],
+ b"HTTP/1.0 200 OK\r\nSome: header\r\n\r\n",
+ Response(
+ status_code=200,
+ headers=[("Some", "header")],
+ http_version="1.0",
+ reason=b"OK",
+ ),
+ )
+
+ # single-character header values (actually disallowed by the ABNF in RFC
+ # 7230 -- this is a bug in the standard that we originally copied...)
+ tr(
+ READERS[SERVER, SEND_RESPONSE],
+ b"HTTP/1.0 200 OK\r\n" b"Foo: a a a a a \r\n\r\n",
+ Response(
+ status_code=200,
+ headers=[("Foo", "a a a a a")],
+ http_version="1.0",
+ reason=b"OK",
+ ),
+ )
+
+ # Empty headers -- also legal
+ tr(
+ READERS[SERVER, SEND_RESPONSE],
+ b"HTTP/1.0 200 OK\r\n" b"Foo:\r\n\r\n",
+ Response(
+ status_code=200, headers=[("Foo", "")], http_version="1.0", reason=b"OK"
+ ),
+ )
+
+ tr(
+ READERS[SERVER, SEND_RESPONSE],
+ b"HTTP/1.0 200 OK\r\n" b"Foo: \t \t \r\n\r\n",
+ Response(
+ status_code=200, headers=[("Foo", "")], http_version="1.0", reason=b"OK"
+ ),
+ )
+
+ # Tolerate broken servers that leave off the response code
+ tr(
+ READERS[SERVER, SEND_RESPONSE],
+ b"HTTP/1.0 200\r\n" b"Foo: bar\r\n\r\n",
+ Response(
+ status_code=200, headers=[("Foo", "bar")], http_version="1.0", reason=b""
+ ),
+ )
+
+ # Tolerate headers line endings (\r\n and \n)
+ # \n\r\b between headers and body
+ tr(
+ READERS[SERVER, SEND_RESPONSE],
+ b"HTTP/1.1 200 OK\r\nSomeHeader: val\n\r\n",
+ Response(
+ status_code=200,
+ headers=[("SomeHeader", "val")],
+ http_version="1.1",
+ reason="OK",
+ ),
+ )
+
+ # delimited only with \n
+ tr(
+ READERS[SERVER, SEND_RESPONSE],
+ b"HTTP/1.1 200 OK\nSomeHeader1: val1\nSomeHeader2: val2\n\n",
+ Response(
+ status_code=200,
+ headers=[("SomeHeader1", "val1"), ("SomeHeader2", "val2")],
+ http_version="1.1",
+ reason="OK",
+ ),
+ )
+
+ # mixed \r\n and \n
+ tr(
+ READERS[SERVER, SEND_RESPONSE],
+ b"HTTP/1.1 200 OK\r\nSomeHeader1: val1\nSomeHeader2: val2\n\r\n",
+ Response(
+ status_code=200,
+ headers=[("SomeHeader1", "val1"), ("SomeHeader2", "val2")],
+ http_version="1.1",
+ reason="OK",
+ ),
+ )
+
+ # obsolete line folding
+ tr(
+ READERS[CLIENT, IDLE],
+ b"HEAD /foo HTTP/1.1\r\n"
+ b"Host: example.com\r\n"
+ b"Some: multi-line\r\n"
+ b" header\r\n"
+ b"\tnonsense\r\n"
+ b" \t \t\tI guess\r\n"
+ b"Connection: close\r\n"
+ b"More-nonsense: in the\r\n"
+ b" last header \r\n\r\n",
+ Request(
+ method="HEAD",
+ target="/foo",
+ headers=[
+ ("Host", "example.com"),
+ ("Some", "multi-line header nonsense I guess"),
+ ("Connection", "close"),
+ ("More-nonsense", "in the last header"),
+ ],
+ ),
+ )
+
+ with pytest.raises(LocalProtocolError):
+ tr(
+ READERS[CLIENT, IDLE],
+ b"HEAD /foo HTTP/1.1\r\n" b" folded: line\r\n\r\n",
+ None,
+ )
+
+ with pytest.raises(LocalProtocolError):
+ tr(
+ READERS[CLIENT, IDLE],
+ b"HEAD /foo HTTP/1.1\r\n" b"foo : line\r\n\r\n",
+ None,
+ )
+ with pytest.raises(LocalProtocolError):
+ tr(
+ READERS[CLIENT, IDLE],
+ b"HEAD /foo HTTP/1.1\r\n" b"foo\t: line\r\n\r\n",
+ None,
+ )
+ with pytest.raises(LocalProtocolError):
+ tr(
+ READERS[CLIENT, IDLE],
+ b"HEAD /foo HTTP/1.1\r\n" b"foo\t: line\r\n\r\n",
+ None,
+ )
+ with pytest.raises(LocalProtocolError):
+ tr(READERS[CLIENT, IDLE], b"HEAD /foo HTTP/1.1\r\n" b": line\r\n\r\n", None)
+
+
+def test__obsolete_line_fold_bytes() -> None:
+ # _obsolete_line_fold has a defensive cast to bytearray, which is
+ # necessary to protect against O(n^2) behavior in case anyone ever passes
+ # in regular bytestrings... but right now we never pass in regular
+ # bytestrings. so this test just exists to get some coverage on that
+ # defensive cast.
+ assert list(_obsolete_line_fold([b"aaa", b"bbb", b" ccc", b"ddd"])) == [
+ b"aaa",
+ bytearray(b"bbb ccc"),
+ b"ddd",
+ ]
+
+
+def _run_reader_iter(
+ reader: Any, buf: bytes, do_eof: bool
+) -> Generator[Any, None, None]:
+ while True:
+ event = reader(buf)
+ if event is None:
+ break
+ yield event
+ # body readers have undefined behavior after returning EndOfMessage,
+ # because this changes the state so they don't get called again
+ if type(event) is EndOfMessage:
+ break
+ if do_eof:
+ assert not buf
+ yield reader.read_eof()
+
+
+def _run_reader(*args: Any) -> List[Event]:
+ events = list(_run_reader_iter(*args))
+ return normalize_data_events(events)
+
+
+def t_body_reader(thunk: Any, data: bytes, expected: Any, do_eof: bool = False) -> None:
+ # Simple: consume whole thing
+ print("Test 1")
+ buf = makebuf(data)
+ assert _run_reader(thunk(), buf, do_eof) == expected
+
+ # Incrementally growing buffer
+ print("Test 2")
+ reader = thunk()
+ buf = ReceiveBuffer()
+ events = []
+ for i in range(len(data)):
+ events += _run_reader(reader, buf, False)
+ buf += data[i : i + 1]
+ events += _run_reader(reader, buf, do_eof)
+ assert normalize_data_events(events) == expected
+
+ is_complete = any(type(event) is EndOfMessage for event in expected)
+ if is_complete and not do_eof:
+ buf = makebuf(data + b"trailing")
+ assert _run_reader(thunk(), buf, False) == expected
+
+
+def test_ContentLengthReader() -> None:
+ t_body_reader(lambda: ContentLengthReader(0), b"", [EndOfMessage()])
+
+ t_body_reader(
+ lambda: ContentLengthReader(10),
+ b"0123456789",
+ [Data(data=b"0123456789"), EndOfMessage()],
+ )
+
+
+def test_Http10Reader() -> None:
+ t_body_reader(Http10Reader, b"", [EndOfMessage()], do_eof=True)
+ t_body_reader(Http10Reader, b"asdf", [Data(data=b"asdf")], do_eof=False)
+ t_body_reader(
+ Http10Reader, b"asdf", [Data(data=b"asdf"), EndOfMessage()], do_eof=True
+ )
+
+
+def test_ChunkedReader() -> None:
+ t_body_reader(ChunkedReader, b"0\r\n\r\n", [EndOfMessage()])
+
+ t_body_reader(
+ ChunkedReader,
+ b"0\r\nSome: header\r\n\r\n",
+ [EndOfMessage(headers=[("Some", "header")])],
+ )
+
+ t_body_reader(
+ ChunkedReader,
+ b"5\r\n01234\r\n"
+ + b"10\r\n0123456789abcdef\r\n"
+ + b"0\r\n"
+ + b"Some: header\r\n\r\n",
+ [
+ Data(data=b"012340123456789abcdef"),
+ EndOfMessage(headers=[("Some", "header")]),
+ ],
+ )
+
+ t_body_reader(
+ ChunkedReader,
+ b"5\r\n01234\r\n" + b"10\r\n0123456789abcdef\r\n" + b"0\r\n\r\n",
+ [Data(data=b"012340123456789abcdef"), EndOfMessage()],
+ )
+
+ # handles upper and lowercase hex
+ t_body_reader(
+ ChunkedReader,
+ b"aA\r\n" + b"x" * 0xAA + b"\r\n" + b"0\r\n\r\n",
+ [Data(data=b"x" * 0xAA), EndOfMessage()],
+ )
+
+ # refuses arbitrarily long chunk integers
+ with pytest.raises(LocalProtocolError):
+ # Technically this is legal HTTP/1.1, but we refuse to process chunk
+ # sizes that don't fit into 20 characters of hex
+ t_body_reader(ChunkedReader, b"9" * 100 + b"\r\nxxx", [Data(data=b"xxx")])
+
+ # refuses garbage in the chunk count
+ with pytest.raises(LocalProtocolError):
+ t_body_reader(ChunkedReader, b"10\x00\r\nxxx", None)
+
+ # handles (and discards) "chunk extensions" omg wtf
+ t_body_reader(
+ ChunkedReader,
+ b"5; hello=there\r\n"
+ + b"xxxxx"
+ + b"\r\n"
+ + b'0; random="junk"; some=more; canbe=lonnnnngg\r\n\r\n',
+ [Data(data=b"xxxxx"), EndOfMessage()],
+ )
+
+ t_body_reader(
+ ChunkedReader,
+ b"5 \r\n01234\r\n" + b"0\r\n\r\n",
+ [Data(data=b"01234"), EndOfMessage()],
+ )
+
+
+def test_ContentLengthWriter() -> None:
+ w = ContentLengthWriter(5)
+ assert dowrite(w, Data(data=b"123")) == b"123"
+ assert dowrite(w, Data(data=b"45")) == b"45"
+ assert dowrite(w, EndOfMessage()) == b""
+
+ w = ContentLengthWriter(5)
+ with pytest.raises(LocalProtocolError):
+ dowrite(w, Data(data=b"123456"))
+
+ w = ContentLengthWriter(5)
+ dowrite(w, Data(data=b"123"))
+ with pytest.raises(LocalProtocolError):
+ dowrite(w, Data(data=b"456"))
+
+ w = ContentLengthWriter(5)
+ dowrite(w, Data(data=b"123"))
+ with pytest.raises(LocalProtocolError):
+ dowrite(w, EndOfMessage())
+
+ w = ContentLengthWriter(5)
+ dowrite(w, Data(data=b"123")) == b"123"
+ dowrite(w, Data(data=b"45")) == b"45"
+ with pytest.raises(LocalProtocolError):
+ dowrite(w, EndOfMessage(headers=[("Etag", "asdf")]))
+
+
+def test_ChunkedWriter() -> None:
+ w = ChunkedWriter()
+ assert dowrite(w, Data(data=b"aaa")) == b"3\r\naaa\r\n"
+ assert dowrite(w, Data(data=b"a" * 20)) == b"14\r\n" + b"a" * 20 + b"\r\n"
+
+ assert dowrite(w, Data(data=b"")) == b""
+
+ assert dowrite(w, EndOfMessage()) == b"0\r\n\r\n"
+
+ assert (
+ dowrite(w, EndOfMessage(headers=[("Etag", "asdf"), ("a", "b")]))
+ == b"0\r\nEtag: asdf\r\na: b\r\n\r\n"
+ )
+
+
+def test_Http10Writer() -> None:
+ w = Http10Writer()
+ assert dowrite(w, Data(data=b"1234")) == b"1234"
+ assert dowrite(w, EndOfMessage()) == b""
+
+ with pytest.raises(LocalProtocolError):
+ dowrite(w, EndOfMessage(headers=[("Etag", "asdf")]))
+
+
+def test_reject_garbage_after_request_line() -> None:
+ with pytest.raises(LocalProtocolError):
+ tr(READERS[SERVER, SEND_RESPONSE], b"HTTP/1.0 200 OK\x00xxxx\r\n\r\n", None)
+
+
+def test_reject_garbage_after_response_line() -> None:
+ with pytest.raises(LocalProtocolError):
+ tr(
+ READERS[CLIENT, IDLE],
+ b"HEAD /foo HTTP/1.1 xxxxxx\r\n" b"Host: a\r\n\r\n",
+ None,
+ )
+
+
+def test_reject_garbage_in_header_line() -> None:
+ with pytest.raises(LocalProtocolError):
+ tr(
+ READERS[CLIENT, IDLE],
+ b"HEAD /foo HTTP/1.1\r\n" b"Host: foo\x00bar\r\n\r\n",
+ None,
+ )
+
+
+def test_reject_non_vchar_in_path() -> None:
+ for bad_char in b"\x00\x20\x7f\xee":
+ message = bytearray(b"HEAD /")
+ message.append(bad_char)
+ message.extend(b" HTTP/1.1\r\nHost: foobar\r\n\r\n")
+ with pytest.raises(LocalProtocolError):
+ tr(READERS[CLIENT, IDLE], message, None)
+
+
+# https://github.com/python-hyper/h11/issues/57
+def test_allow_some_garbage_in_cookies() -> None:
+ tr(
+ READERS[CLIENT, IDLE],
+ b"HEAD /foo HTTP/1.1\r\n"
+ b"Host: foo\r\n"
+ b"Set-Cookie: ___utmvafIumyLc=kUd\x01UpAt; path=/; Max-Age=900\r\n"
+ b"\r\n",
+ Request(
+ method="HEAD",
+ target="/foo",
+ headers=[
+ ("Host", "foo"),
+ ("Set-Cookie", "___utmvafIumyLc=kUd\x01UpAt; path=/; Max-Age=900"),
+ ],
+ ),
+ )
+
+
+def test_host_comes_first() -> None:
+ tw(
+ write_headers,
+ normalize_and_validate([("foo", "bar"), ("Host", "example.com")]),
+ b"Host: example.com\r\nfoo: bar\r\n\r\n",
+ )
diff --git a/venv/lib/python3.11/site-packages/h11/tests/test_receivebuffer.py b/venv/lib/python3.11/site-packages/h11/tests/test_receivebuffer.py
new file mode 100644
index 0000000..21a3870
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/test_receivebuffer.py
@@ -0,0 +1,135 @@
+import re
+from typing import Tuple
+
+import pytest
+
+from .._receivebuffer import ReceiveBuffer
+
+
+def test_receivebuffer() -> None:
+ b = ReceiveBuffer()
+ assert not b
+ assert len(b) == 0
+ assert bytes(b) == b""
+
+ b += b"123"
+ assert b
+ assert len(b) == 3
+ assert bytes(b) == b"123"
+
+ assert bytes(b) == b"123"
+
+ assert b.maybe_extract_at_most(2) == b"12"
+ assert b
+ assert len(b) == 1
+ assert bytes(b) == b"3"
+
+ assert bytes(b) == b"3"
+
+ assert b.maybe_extract_at_most(10) == b"3"
+ assert bytes(b) == b""
+
+ assert b.maybe_extract_at_most(10) is None
+ assert not b
+
+ ################################################################
+ # maybe_extract_until_next
+ ################################################################
+
+ b += b"123\n456\r\n789\r\n"
+
+ assert b.maybe_extract_next_line() == b"123\n456\r\n"
+ assert bytes(b) == b"789\r\n"
+
+ assert b.maybe_extract_next_line() == b"789\r\n"
+ assert bytes(b) == b""
+
+ b += b"12\r"
+ assert b.maybe_extract_next_line() is None
+ assert bytes(b) == b"12\r"
+
+ b += b"345\n\r"
+ assert b.maybe_extract_next_line() is None
+ assert bytes(b) == b"12\r345\n\r"
+
+ # here we stopped at the middle of b"\r\n" delimiter
+
+ b += b"\n6789aaa123\r\n"
+ assert b.maybe_extract_next_line() == b"12\r345\n\r\n"
+ assert b.maybe_extract_next_line() == b"6789aaa123\r\n"
+ assert b.maybe_extract_next_line() is None
+ assert bytes(b) == b""
+
+ ################################################################
+ # maybe_extract_lines
+ ################################################################
+
+ b += b"123\r\na: b\r\nfoo:bar\r\n\r\ntrailing"
+ lines = b.maybe_extract_lines()
+ assert lines == [b"123", b"a: b", b"foo:bar"]
+ assert bytes(b) == b"trailing"
+
+ assert b.maybe_extract_lines() is None
+
+ b += b"\r\n\r"
+ assert b.maybe_extract_lines() is None
+
+ assert b.maybe_extract_at_most(100) == b"trailing\r\n\r"
+ assert not b
+
+ # Empty body case (as happens at the end of chunked encoding if there are
+ # no trailing headers, e.g.)
+ b += b"\r\ntrailing"
+ assert b.maybe_extract_lines() == []
+ assert bytes(b) == b"trailing"
+
+
+@pytest.mark.parametrize(
+ "data",
+ [
+ pytest.param(
+ (
+ b"HTTP/1.1 200 OK\r\n",
+ b"Content-type: text/plain\r\n",
+ b"Connection: close\r\n",
+ b"\r\n",
+ b"Some body",
+ ),
+ id="with_crlf_delimiter",
+ ),
+ pytest.param(
+ (
+ b"HTTP/1.1 200 OK\n",
+ b"Content-type: text/plain\n",
+ b"Connection: close\n",
+ b"\n",
+ b"Some body",
+ ),
+ id="with_lf_only_delimiter",
+ ),
+ pytest.param(
+ (
+ b"HTTP/1.1 200 OK\n",
+ b"Content-type: text/plain\r\n",
+ b"Connection: close\n",
+ b"\n",
+ b"Some body",
+ ),
+ id="with_mixed_crlf_and_lf",
+ ),
+ ],
+)
+def test_receivebuffer_for_invalid_delimiter(data: Tuple[bytes]) -> None:
+ b = ReceiveBuffer()
+
+ for line in data:
+ b += line
+
+ lines = b.maybe_extract_lines()
+
+ assert lines == [
+ b"HTTP/1.1 200 OK",
+ b"Content-type: text/plain",
+ b"Connection: close",
+ ]
+ assert bytes(b) == b"Some body"
diff --git a/venv/lib/python3.11/site-packages/h11/tests/test_state.py b/venv/lib/python3.11/site-packages/h11/tests/test_state.py
new file mode 100644
index 0000000..bc974e6
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/test_state.py
@@ -0,0 +1,271 @@
+import pytest
+
+from .._events import (
+ ConnectionClosed,
+ Data,
+ EndOfMessage,
+ Event,
+ InformationalResponse,
+ Request,
+ Response,
+)
+from .._state import (
+ _SWITCH_CONNECT,
+ _SWITCH_UPGRADE,
+ CLIENT,
+ CLOSED,
+ ConnectionState,
+ DONE,
+ IDLE,
+ MIGHT_SWITCH_PROTOCOL,
+ MUST_CLOSE,
+ SEND_BODY,
+ SEND_RESPONSE,
+ SERVER,
+ SWITCHED_PROTOCOL,
+)
+from .._util import LocalProtocolError
+
+
+def test_ConnectionState() -> None:
+ cs = ConnectionState()
+
+ # Basic event-triggered transitions
+
+ assert cs.states == {CLIENT: IDLE, SERVER: IDLE}
+
+ cs.process_event(CLIENT, Request)
+ # The SERVER-Request special case:
+ assert cs.states == {CLIENT: SEND_BODY, SERVER: SEND_RESPONSE}
+
+ # Illegal transitions raise an error and nothing happens
+ with pytest.raises(LocalProtocolError):
+ cs.process_event(CLIENT, Request)
+ assert cs.states == {CLIENT: SEND_BODY, SERVER: SEND_RESPONSE}
+
+ cs.process_event(SERVER, InformationalResponse)
+ assert cs.states == {CLIENT: SEND_BODY, SERVER: SEND_RESPONSE}
+
+ cs.process_event(SERVER, Response)
+ assert cs.states == {CLIENT: SEND_BODY, SERVER: SEND_BODY}
+
+ cs.process_event(CLIENT, EndOfMessage)
+ cs.process_event(SERVER, EndOfMessage)
+ assert cs.states == {CLIENT: DONE, SERVER: DONE}
+
+ # State-triggered transition
+
+ cs.process_event(SERVER, ConnectionClosed)
+ assert cs.states == {CLIENT: MUST_CLOSE, SERVER: CLOSED}
+
+
+def test_ConnectionState_keep_alive() -> None:
+ # keep_alive = False
+ cs = ConnectionState()
+ cs.process_event(CLIENT, Request)
+ cs.process_keep_alive_disabled()
+ cs.process_event(CLIENT, EndOfMessage)
+ assert cs.states == {CLIENT: MUST_CLOSE, SERVER: SEND_RESPONSE}
+
+ cs.process_event(SERVER, Response)
+ cs.process_event(SERVER, EndOfMessage)
+ assert cs.states == {CLIENT: MUST_CLOSE, SERVER: MUST_CLOSE}
+
+
+def test_ConnectionState_keep_alive_in_DONE() -> None:
+ # Check that if keep_alive is disabled when the CLIENT is already in DONE,
+ # then this is sufficient to immediately trigger the DONE -> MUST_CLOSE
+ # transition
+ cs = ConnectionState()
+ cs.process_event(CLIENT, Request)
+ cs.process_event(CLIENT, EndOfMessage)
+ assert cs.states[CLIENT] is DONE
+ cs.process_keep_alive_disabled()
+ assert cs.states[CLIENT] is MUST_CLOSE
+
+
+def test_ConnectionState_switch_denied() -> None:
+ for switch_type in (_SWITCH_CONNECT, _SWITCH_UPGRADE):
+ for deny_early in (True, False):
+ cs = ConnectionState()
+ cs.process_client_switch_proposal(switch_type)
+ cs.process_event(CLIENT, Request)
+ cs.process_event(CLIENT, Data)
+ assert cs.states == {CLIENT: SEND_BODY, SERVER: SEND_RESPONSE}
+
+ assert switch_type in cs.pending_switch_proposals
+
+ if deny_early:
+ # before client reaches DONE
+ cs.process_event(SERVER, Response)
+ assert not cs.pending_switch_proposals
+
+ cs.process_event(CLIENT, EndOfMessage)
+
+ if deny_early:
+ assert cs.states == {CLIENT: DONE, SERVER: SEND_BODY}
+ else:
+ assert cs.states == {
+ CLIENT: MIGHT_SWITCH_PROTOCOL,
+ SERVER: SEND_RESPONSE,
+ }
+
+ cs.process_event(SERVER, InformationalResponse)
+ assert cs.states == {
+ CLIENT: MIGHT_SWITCH_PROTOCOL,
+ SERVER: SEND_RESPONSE,
+ }
+
+ cs.process_event(SERVER, Response)
+ assert cs.states == {CLIENT: DONE, SERVER: SEND_BODY}
+ assert not cs.pending_switch_proposals
+
+
+_response_type_for_switch = {
+ _SWITCH_UPGRADE: InformationalResponse,
+ _SWITCH_CONNECT: Response,
+ None: Response,
+}
+
+
+def test_ConnectionState_protocol_switch_accepted() -> None:
+ for switch_event in [_SWITCH_UPGRADE, _SWITCH_CONNECT]:
+ cs = ConnectionState()
+ cs.process_client_switch_proposal(switch_event)
+ cs.process_event(CLIENT, Request)
+ cs.process_event(CLIENT, Data)
+ assert cs.states == {CLIENT: SEND_BODY, SERVER: SEND_RESPONSE}
+
+ cs.process_event(CLIENT, EndOfMessage)
+ assert cs.states == {CLIENT: MIGHT_SWITCH_PROTOCOL, SERVER: SEND_RESPONSE}
+
+ cs.process_event(SERVER, InformationalResponse)
+ assert cs.states == {CLIENT: MIGHT_SWITCH_PROTOCOL, SERVER: SEND_RESPONSE}
+
+ cs.process_event(SERVER, _response_type_for_switch[switch_event], switch_event)
+ assert cs.states == {CLIENT: SWITCHED_PROTOCOL, SERVER: SWITCHED_PROTOCOL}
+
+
+def test_ConnectionState_double_protocol_switch() -> None:
+ # CONNECT + Upgrade is legal! Very silly, but legal. So we support
+ # it. Because sometimes doing the silly thing is easier than not.
+ for server_switch in [None, _SWITCH_UPGRADE, _SWITCH_CONNECT]:
+ cs = ConnectionState()
+ cs.process_client_switch_proposal(_SWITCH_UPGRADE)
+ cs.process_client_switch_proposal(_SWITCH_CONNECT)
+ cs.process_event(CLIENT, Request)
+ cs.process_event(CLIENT, EndOfMessage)
+ assert cs.states == {CLIENT: MIGHT_SWITCH_PROTOCOL, SERVER: SEND_RESPONSE}
+ cs.process_event(
+ SERVER, _response_type_for_switch[server_switch], server_switch
+ )
+ if server_switch is None:
+ assert cs.states == {CLIENT: DONE, SERVER: SEND_BODY}
+ else:
+ assert cs.states == {CLIENT: SWITCHED_PROTOCOL, SERVER: SWITCHED_PROTOCOL}
+
+
+def test_ConnectionState_inconsistent_protocol_switch() -> None:
+ for client_switches, server_switch in [
+ ([], _SWITCH_CONNECT),
+ ([], _SWITCH_UPGRADE),
+ ([_SWITCH_UPGRADE], _SWITCH_CONNECT),
+ ([_SWITCH_CONNECT], _SWITCH_UPGRADE),
+ ]:
+ cs = ConnectionState()
+ for client_switch in client_switches: # type: ignore[attr-defined]
+ cs.process_client_switch_proposal(client_switch)
+ cs.process_event(CLIENT, Request)
+ with pytest.raises(LocalProtocolError):
+ cs.process_event(SERVER, Response, server_switch)
+
+
+def test_ConnectionState_keepalive_protocol_switch_interaction() -> None:
+ # keep_alive=False + pending_switch_proposals
+ cs = ConnectionState()
+ cs.process_client_switch_proposal(_SWITCH_UPGRADE)
+ cs.process_event(CLIENT, Request)
+ cs.process_keep_alive_disabled()
+ cs.process_event(CLIENT, Data)
+ assert cs.states == {CLIENT: SEND_BODY, SERVER: SEND_RESPONSE}
+
+ # the protocol switch "wins"
+ cs.process_event(CLIENT, EndOfMessage)
+ assert cs.states == {CLIENT: MIGHT_SWITCH_PROTOCOL, SERVER: SEND_RESPONSE}
+
+ # but when the server denies the request, keep_alive comes back into play
+ cs.process_event(SERVER, Response)
+ assert cs.states == {CLIENT: MUST_CLOSE, SERVER: SEND_BODY}
+
+
+def test_ConnectionState_reuse() -> None:
+ cs = ConnectionState()
+
+ with pytest.raises(LocalProtocolError):
+ cs.start_next_cycle()
+
+ cs.process_event(CLIENT, Request)
+ cs.process_event(CLIENT, EndOfMessage)
+
+ with pytest.raises(LocalProtocolError):
+ cs.start_next_cycle()
+
+ cs.process_event(SERVER, Response)
+ cs.process_event(SERVER, EndOfMessage)
+
+ cs.start_next_cycle()
+ assert cs.states == {CLIENT: IDLE, SERVER: IDLE}
+
+ # No keepalive
+
+ cs.process_event(CLIENT, Request)
+ cs.process_keep_alive_disabled()
+ cs.process_event(CLIENT, EndOfMessage)
+ cs.process_event(SERVER, Response)
+ cs.process_event(SERVER, EndOfMessage)
+
+ with pytest.raises(LocalProtocolError):
+ cs.start_next_cycle()
+
+ # One side closed
+
+ cs = ConnectionState()
+ cs.process_event(CLIENT, Request)
+ cs.process_event(CLIENT, EndOfMessage)
+ cs.process_event(CLIENT, ConnectionClosed)
+ cs.process_event(SERVER, Response)
+ cs.process_event(SERVER, EndOfMessage)
+
+ with pytest.raises(LocalProtocolError):
+ cs.start_next_cycle()
+
+ # Succesful protocol switch
+
+ cs = ConnectionState()
+ cs.process_client_switch_proposal(_SWITCH_UPGRADE)
+ cs.process_event(CLIENT, Request)
+ cs.process_event(CLIENT, EndOfMessage)
+ cs.process_event(SERVER, InformationalResponse, _SWITCH_UPGRADE)
+
+ with pytest.raises(LocalProtocolError):
+ cs.start_next_cycle()
+
+ # Failed protocol switch
+
+ cs = ConnectionState()
+ cs.process_client_switch_proposal(_SWITCH_UPGRADE)
+ cs.process_event(CLIENT, Request)
+ cs.process_event(CLIENT, EndOfMessage)
+ cs.process_event(SERVER, Response)
+ cs.process_event(SERVER, EndOfMessage)
+
+ cs.start_next_cycle()
+ assert cs.states == {CLIENT: IDLE, SERVER: IDLE}
+
+
+def test_server_request_is_illegal() -> None:
+ # There used to be a bug in how we handled the Request special case that
+ # made this allowed...
+ cs = ConnectionState()
+ with pytest.raises(LocalProtocolError):
+ cs.process_event(SERVER, Request)
diff --git a/venv/lib/python3.11/site-packages/h11/tests/test_util.py b/venv/lib/python3.11/site-packages/h11/tests/test_util.py
new file mode 100644
index 0000000..79bc095
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/h11/tests/test_util.py
@@ -0,0 +1,112 @@
+import re
+import sys
+import traceback
+from typing import NoReturn
+
+import pytest
+
+from .._util import (
+ bytesify,
+ LocalProtocolError,
+ ProtocolError,
+ RemoteProtocolError,
+ Sentinel,
+ validate,
+)
+
+
+def test_ProtocolError() -> None:
+ with pytest.raises(TypeError):
+ ProtocolError("abstract base class")
+
+
+def test_LocalProtocolError() -> None:
+ try:
+ raise LocalProtocolError("foo")
+ except LocalProtocolError as e:
+ assert str(e) == "foo"
+ assert e.error_status_hint == 400
+
+ try:
+ raise LocalProtocolError("foo", error_status_hint=418)
+ except LocalProtocolError as e:
+ assert str(e) == "foo"
+ assert e.error_status_hint == 418
+
+ def thunk() -> NoReturn:
+ raise LocalProtocolError("a", error_status_hint=420)
+
+ try:
+ try:
+ thunk()
+ except LocalProtocolError as exc1:
+ orig_traceback = "".join(traceback.format_tb(sys.exc_info()[2]))
+ exc1._reraise_as_remote_protocol_error()
+ except RemoteProtocolError as exc2:
+ assert type(exc2) is RemoteProtocolError
+ assert exc2.args == ("a",)
+ assert exc2.error_status_hint == 420
+ new_traceback = "".join(traceback.format_tb(sys.exc_info()[2]))
+ assert new_traceback.endswith(orig_traceback)
+
+
+def test_validate() -> None:
+ my_re = re.compile(rb"(?P<group1>[0-9]+)\.(?P<group2>[0-9]+)")
+ with pytest.raises(LocalProtocolError):
+ validate(my_re, b"0.")
+
+ groups = validate(my_re, b"0.1")
+ assert groups == {"group1": b"0", "group2": b"1"}
+
+ # successful partial matches are an error - must match whole string
+ with pytest.raises(LocalProtocolError):
+ validate(my_re, b"0.1xx")
+ with pytest.raises(LocalProtocolError):
+ validate(my_re, b"0.1\n")
+
+
+def test_validate_formatting() -> None:
+ my_re = re.compile(rb"foo")
+
+ with pytest.raises(LocalProtocolError) as excinfo:
+ validate(my_re, b"", "oops")
+ assert "oops" in str(excinfo.value)
+
+ with pytest.raises(LocalProtocolError) as excinfo:
+ validate(my_re, b"", "oops {}")
+ assert "oops {}" in str(excinfo.value)
+
+ with pytest.raises(LocalProtocolError) as excinfo:
+ validate(my_re, b"", "oops {} xx", 10)
+ assert "oops 10 xx" in str(excinfo.value)
+
+
+def test_make_sentinel() -> None:
+ class S(Sentinel, metaclass=Sentinel):
+ pass
+
+ assert repr(S) == "S"
+ assert S == S
+ assert type(S).__name__ == "S"
+ assert S in {S}
+ assert type(S) is S
+
+ class S2(Sentinel, metaclass=Sentinel):
+ pass
+
+ assert repr(S2) == "S2"
+ assert S != S2
+ assert S not in {S2}
+ assert type(S) is not type(S2)
+
+
+def test_bytesify() -> None:
+ assert bytesify(b"123") == b"123"
+ assert bytesify(bytearray(b"123")) == b"123"
+ assert bytesify("123") == b"123"
+
+ with pytest.raises(UnicodeEncodeError):
+ bytesify("\u1234")
+
+ with pytest.raises(TypeError):
+ bytesify(10)