summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/h11/tests/helpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/h11/tests/helpers.py')
-rw-r--r--venv/lib/python3.11/site-packages/h11/tests/helpers.py101
1 files changed, 0 insertions, 101 deletions
diff --git a/venv/lib/python3.11/site-packages/h11/tests/helpers.py b/venv/lib/python3.11/site-packages/h11/tests/helpers.py
deleted file mode 100644
index 571be44..0000000
--- a/venv/lib/python3.11/site-packages/h11/tests/helpers.py
+++ /dev/null
@@ -1,101 +0,0 @@
-from typing import cast, List, Type, Union, ValuesView
-
-from .._connection import Connection, NEED_DATA, PAUSED
-from .._events import (
- ConnectionClosed,
- Data,
- EndOfMessage,
- Event,
- InformationalResponse,
- Request,
- Response,
-)
-from .._state import CLIENT, CLOSED, DONE, MUST_CLOSE, SERVER
-from .._util import Sentinel
-
-try:
- from typing import Literal
-except ImportError:
- from typing_extensions import Literal # type: ignore
-
-
-def get_all_events(conn: Connection) -> List[Event]:
- got_events = []
- while True:
- event = conn.next_event()
- if event in (NEED_DATA, PAUSED):
- break
- event = cast(Event, event)
- got_events.append(event)
- if type(event) is ConnectionClosed:
- break
- return got_events
-
-
-def receive_and_get(conn: Connection, data: bytes) -> List[Event]:
- conn.receive_data(data)
- return get_all_events(conn)
-
-
-# Merges adjacent Data events, converts payloads to bytestrings, and removes
-# chunk boundaries.
-def normalize_data_events(in_events: List[Event]) -> List[Event]:
- out_events: List[Event] = []
- for event in in_events:
- if type(event) is Data:
- event = Data(data=bytes(event.data), chunk_start=False, chunk_end=False)
- if out_events and type(out_events[-1]) is type(event) is Data:
- out_events[-1] = Data(
- data=out_events[-1].data + event.data,
- chunk_start=out_events[-1].chunk_start,
- chunk_end=out_events[-1].chunk_end,
- )
- else:
- out_events.append(event)
- return out_events
-
-
-# Given that we want to write tests that push some events through a Connection
-# and check that its state updates appropriately... we might as make a habit
-# of pushing them through two Connections with a fake network link in
-# between.
-class ConnectionPair:
- def __init__(self) -> None:
- self.conn = {CLIENT: Connection(CLIENT), SERVER: Connection(SERVER)}
- self.other = {CLIENT: SERVER, SERVER: CLIENT}
-
- @property
- def conns(self) -> ValuesView[Connection]:
- return self.conn.values()
-
- # expect="match" if expect=send_events; expect=[...] to say what expected
- def send(
- self,
- role: Type[Sentinel],
- send_events: Union[List[Event], Event],
- expect: Union[List[Event], Event, Literal["match"]] = "match",
- ) -> bytes:
- if not isinstance(send_events, list):
- send_events = [send_events]
- data = b""
- closed = False
- for send_event in send_events:
- new_data = self.conn[role].send(send_event)
- if new_data is None:
- closed = True
- else:
- data += new_data
- # send uses b"" to mean b"", and None to mean closed
- # receive uses b"" to mean closed, and None to mean "try again"
- # so we have to translate between the two conventions
- if data:
- self.conn[self.other[role]].receive_data(data)
- if closed:
- self.conn[self.other[role]].receive_data(b"")
- got_events = get_all_events(self.conn[self.other[role]])
- if expect == "match":
- expect = send_events
- if not isinstance(expect, list):
- expect = [expect]
- assert got_events == expect
- return data