Diffstat (limited to 'venv/lib/python3.11/site-packages/h11/_receivebuffer.py')
-rw-r--r--    venv/lib/python3.11/site-packages/h11/_receivebuffer.py    153
1 file changed, 0 insertions, 153 deletions
diff --git a/venv/lib/python3.11/site-packages/h11/_receivebuffer.py b/venv/lib/python3.11/site-packages/h11/_receivebuffer.py
deleted file mode 100644
index e5c4e08..0000000
--- a/venv/lib/python3.11/site-packages/h11/_receivebuffer.py
+++ /dev/null
@@ -1,153 +0,0 @@
-import re
-import sys
-from typing import List, Optional, Union
-
-__all__ = ["ReceiveBuffer"]
-
-
-# Operations we want to support:
-# - find next \r\n or \r\n\r\n (\n or \n\n are also acceptable),
-# or wait until there is one
-# - read at-most-N bytes
-# Goals:
-# - on average, do this fast
-# - worst case, do this in O(n) where n is the number of bytes processed
-# Plan:
-# - store bytearray, offset, how far we've searched for a separator token
-# - use the how-far-we've-searched data to avoid rescanning
-# - while doing a stream of uninterrupted processing, advance offset instead
-# of constantly copying
-# WARNING:
-# - I haven't benchmarked or profiled any of this yet.
-#
-# Note that starting in Python 3.4, deleting the initial n bytes from a
-# bytearray is amortized O(n), thanks to some excellent work by Antoine
-# Martin:
-#
-# https://bugs.python.org/issue19087
-#
-# This means that if we only supported 3.4+, we could get rid of the code here
-# involving self._start and self.compress, because it's doing exactly the same
-# thing that bytearray now does internally.
-#
-# BUT unfortunately, we still support 2.7, and reading short segments out of a
-# long buffer MUST be O(bytes read) to avoid DoS issues, so we can't actually
-# delete this code. Yet:
-#
-# https://pythonclock.org/
-#
-# (Two things to double-check first though: make sure PyPy also has the
-# optimization, and benchmark to make sure it's a win, since we do have a
-# slightly clever thing where we delay calling compress() until we've
-# processed a whole event, which could in theory be slightly more efficient
-# than the internal bytearray support.)
-blank_line_regex = re.compile(b"\n\r?\n", re.MULTILINE)
-
-
-class ReceiveBuffer:
-    def __init__(self) -> None:
-        self._data = bytearray()
-        self._next_line_search = 0
-        self._multiple_lines_search = 0
-
-    def __iadd__(self, byteslike: Union[bytes, bytearray]) -> "ReceiveBuffer":
-        self._data += byteslike
-        return self
-
-    def __bool__(self) -> bool:
-        return bool(len(self))
-
-    def __len__(self) -> int:
-        return len(self._data)
-
-    # for @property unprocessed_data
-    def __bytes__(self) -> bytes:
-        return bytes(self._data)
-
-    def _extract(self, count: int) -> bytearray:
-        # Extract an initial slice of the data buffer and return it.
-        out = self._data[:count]
-        del self._data[:count]
-
-        self._next_line_search = 0
-        self._multiple_lines_search = 0
-
-        return out
-
-    def maybe_extract_at_most(self, count: int) -> Optional[bytearray]:
-        """
-        Extract at most `count` bytes from the buffer.
-        """
-        out = self._data[:count]
-        if not out:
-            return None
-
-        return self._extract(count)
-
-    def maybe_extract_next_line(self) -> Optional[bytearray]:
-        """
-        Extract the first line, if it is completed in the buffer.
-        """
-        # Only search in buffer space that we've not already looked at.
-        search_start_index = max(0, self._next_line_search - 1)
-        partial_idx = self._data.find(b"\r\n", search_start_index)
-
-        if partial_idx == -1:
-            self._next_line_search = len(self._data)
-            return None
-
-        # + 2 is to account for len(b"\r\n")
-        idx = partial_idx + 2
-
-        return self._extract(idx)
-
-    def maybe_extract_lines(self) -> Optional[List[bytearray]]:
-        """
-        Extract everything up to the first blank line, and return a list of lines.
-        """
-        # Handle the case where we have an immediate empty line.
-        if self._data[:1] == b"\n":
-            self._extract(1)
-            return []
-
-        if self._data[:2] == b"\r\n":
-            self._extract(2)
-            return []
-
-        # Only search in buffer space that we've not already looked at.
-        match = blank_line_regex.search(self._data, self._multiple_lines_search)
-        if match is None:
-            self._multiple_lines_search = max(0, len(self._data) - 2)
-            return None
-
-        # Truncate the buffer and return it.
-        idx = match.span(0)[-1]
-        out = self._extract(idx)
-        lines = out.split(b"\n")
-
-        for line in lines:
-            if line.endswith(b"\r"):
-                del line[-1]
-
-        assert lines[-2] == lines[-1] == b""
-
-        del lines[-2:]
-
-        return lines
-
-    # In theory we should wait for `\r\n` before starting to validate
-    # incoming data. However, it is useful to detect (very) invalid data
-    # early, since it might not contain `\r\n` at all (so only a timeout
-    # would get rid of it).
-    # This is not a 100% effective detection, but a cheap sanity check
-    # that allows an early abort in some useful cases. It is especially
-    # useful when the peer mixes up HTTP and HTTPS and sends us a TLS
-    # stream where we expected plain HTTP, since every TLS version so far
-    # starts its handshake with a 0x16 message type code.
-    def is_next_line_obviously_invalid_request_line(self) -> bool:
-        try:
-            # An HTTP request line must not contain non-printable characters
-            # and must not start with a space.
-            return self._data[0] < 0x21
-        except IndexError:
-            return False
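
For context, the sketch below is not part of this commit; it only illustrates how the deleted ReceiveBuffer is typically driven. Raw bytes from the wire are appended with `+=`, and the `maybe_extract_*` methods return data once a complete unit is available and `None` otherwise. The `read_headers` helper, the socket loop, and the error handling are assumptions made for illustration; only the ReceiveBuffer API comes from the file above.

# A minimal usage sketch, assuming a blocking socket `sock`; the helper
# name `read_headers` and its error handling are hypothetical.
from h11._receivebuffer import ReceiveBuffer

def read_headers(sock):
    buf = ReceiveBuffer()
    while True:
        # Cheap early abort for obvious garbage, e.g. a TLS record
        # (type byte 0x16) arriving where plain HTTP was expected.
        if buf.is_next_line_obviously_invalid_request_line():
            raise RuntimeError("not an HTTP request line")
        lines = buf.maybe_extract_lines()
        if lines is not None:
            # Everything up to and including the first blank line has been
            # consumed from the buffer; the blank line itself is dropped.
            return lines
        data = sock.recv(65536)
        if not data:
            raise RuntimeError("connection closed before end of headers")
        buf += data  # __iadd__ appends to the internal bytearray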