summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/httpx/_decoders.py
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
commit12cf076118570eebbff08c6b3090e0d4798447a1 (patch)
tree3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/httpx/_decoders.py
parentc45662ff3923b34614ddcc8feb9195541166dcc5 (diff)
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/httpx/_decoders.py')
-rw-r--r--venv/lib/python3.11/site-packages/httpx/_decoders.py329
1 files changed, 0 insertions, 329 deletions
diff --git a/venv/lib/python3.11/site-packages/httpx/_decoders.py b/venv/lib/python3.11/site-packages/httpx/_decoders.py
deleted file mode 100644
index 31c72c7..0000000
--- a/venv/lib/python3.11/site-packages/httpx/_decoders.py
+++ /dev/null
@@ -1,329 +0,0 @@
-"""
-Handlers for Content-Encoding.
-
-See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
-"""
-from __future__ import annotations
-
-import codecs
-import io
-import typing
-import zlib
-
-from ._compat import brotli
-from ._exceptions import DecodingError
-
-
-class ContentDecoder:
- def decode(self, data: bytes) -> bytes:
- raise NotImplementedError() # pragma: no cover
-
- def flush(self) -> bytes:
- raise NotImplementedError() # pragma: no cover
-
-
-class IdentityDecoder(ContentDecoder):
- """
- Handle unencoded data.
- """
-
- def decode(self, data: bytes) -> bytes:
- return data
-
- def flush(self) -> bytes:
- return b""
-
-
-class DeflateDecoder(ContentDecoder):
- """
- Handle 'deflate' decoding.
-
- See: https://stackoverflow.com/questions/1838699
- """
-
- def __init__(self) -> None:
- self.first_attempt = True
- self.decompressor = zlib.decompressobj()
-
- def decode(self, data: bytes) -> bytes:
- was_first_attempt = self.first_attempt
- self.first_attempt = False
- try:
- return self.decompressor.decompress(data)
- except zlib.error as exc:
- if was_first_attempt:
- self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
- return self.decode(data)
- raise DecodingError(str(exc)) from exc
-
- def flush(self) -> bytes:
- try:
- return self.decompressor.flush()
- except zlib.error as exc: # pragma: no cover
- raise DecodingError(str(exc)) from exc
-
-
-class GZipDecoder(ContentDecoder):
- """
- Handle 'gzip' decoding.
-
- See: https://stackoverflow.com/questions/1838699
- """
-
- def __init__(self) -> None:
- self.decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)
-
- def decode(self, data: bytes) -> bytes:
- try:
- return self.decompressor.decompress(data)
- except zlib.error as exc:
- raise DecodingError(str(exc)) from exc
-
- def flush(self) -> bytes:
- try:
- return self.decompressor.flush()
- except zlib.error as exc: # pragma: no cover
- raise DecodingError(str(exc)) from exc
-
-
-class BrotliDecoder(ContentDecoder):
- """
- Handle 'brotli' decoding.
-
- Requires `pip install brotlipy`. See: https://brotlipy.readthedocs.io/
- or `pip install brotli`. See https://github.com/google/brotli
- Supports both 'brotlipy' and 'Brotli' packages since they share an import
- name. The top branches are for 'brotlipy' and bottom branches for 'Brotli'
- """
-
- def __init__(self) -> None:
- if brotli is None: # pragma: no cover
- raise ImportError(
- "Using 'BrotliDecoder', but neither of the 'brotlicffi' or 'brotli' "
- "packages have been installed. "
- "Make sure to install httpx using `pip install httpx[brotli]`."
- ) from None
-
- self.decompressor = brotli.Decompressor()
- self.seen_data = False
- self._decompress: typing.Callable[[bytes], bytes]
- if hasattr(self.decompressor, "decompress"):
- # The 'brotlicffi' package.
- self._decompress = self.decompressor.decompress # pragma: no cover
- else:
- # The 'brotli' package.
- self._decompress = self.decompressor.process # pragma: no cover
-
- def decode(self, data: bytes) -> bytes:
- if not data:
- return b""
- self.seen_data = True
- try:
- return self._decompress(data)
- except brotli.error as exc:
- raise DecodingError(str(exc)) from exc
-
- def flush(self) -> bytes:
- if not self.seen_data:
- return b""
- try:
- if hasattr(self.decompressor, "finish"):
- # Only available in the 'brotlicffi' package.
-
- # As the decompressor decompresses eagerly, this
- # will never actually emit any data. However, it will potentially throw
- # errors if a truncated or damaged data stream has been used.
- self.decompressor.finish() # pragma: no cover
- return b""
- except brotli.error as exc: # pragma: no cover
- raise DecodingError(str(exc)) from exc
-
-
-class MultiDecoder(ContentDecoder):
- """
- Handle the case where multiple encodings have been applied.
- """
-
- def __init__(self, children: typing.Sequence[ContentDecoder]) -> None:
- """
- 'children' should be a sequence of decoders in the order in which
- each was applied.
- """
- # Note that we reverse the order for decoding.
- self.children = list(reversed(children))
-
- def decode(self, data: bytes) -> bytes:
- for child in self.children:
- data = child.decode(data)
- return data
-
- def flush(self) -> bytes:
- data = b""
- for child in self.children:
- data = child.decode(data) + child.flush()
- return data
-
-
-class ByteChunker:
- """
- Handles returning byte content in fixed-size chunks.
- """
-
- def __init__(self, chunk_size: int | None = None) -> None:
- self._buffer = io.BytesIO()
- self._chunk_size = chunk_size
-
- def decode(self, content: bytes) -> list[bytes]:
- if self._chunk_size is None:
- return [content] if content else []
-
- self._buffer.write(content)
- if self._buffer.tell() >= self._chunk_size:
- value = self._buffer.getvalue()
- chunks = [
- value[i : i + self._chunk_size]
- for i in range(0, len(value), self._chunk_size)
- ]
- if len(chunks[-1]) == self._chunk_size:
- self._buffer.seek(0)
- self._buffer.truncate()
- return chunks
- else:
- self._buffer.seek(0)
- self._buffer.write(chunks[-1])
- self._buffer.truncate()
- return chunks[:-1]
- else:
- return []
-
- def flush(self) -> list[bytes]:
- value = self._buffer.getvalue()
- self._buffer.seek(0)
- self._buffer.truncate()
- return [value] if value else []
-
-
-class TextChunker:
- """
- Handles returning text content in fixed-size chunks.
- """
-
- def __init__(self, chunk_size: int | None = None) -> None:
- self._buffer = io.StringIO()
- self._chunk_size = chunk_size
-
- def decode(self, content: str) -> list[str]:
- if self._chunk_size is None:
- return [content] if content else []
-
- self._buffer.write(content)
- if self._buffer.tell() >= self._chunk_size:
- value = self._buffer.getvalue()
- chunks = [
- value[i : i + self._chunk_size]
- for i in range(0, len(value), self._chunk_size)
- ]
- if len(chunks[-1]) == self._chunk_size:
- self._buffer.seek(0)
- self._buffer.truncate()
- return chunks
- else:
- self._buffer.seek(0)
- self._buffer.write(chunks[-1])
- self._buffer.truncate()
- return chunks[:-1]
- else:
- return []
-
- def flush(self) -> list[str]:
- value = self._buffer.getvalue()
- self._buffer.seek(0)
- self._buffer.truncate()
- return [value] if value else []
-
-
-class TextDecoder:
- """
- Handles incrementally decoding bytes into text
- """
-
- def __init__(self, encoding: str = "utf-8") -> None:
- self.decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
-
- def decode(self, data: bytes) -> str:
- return self.decoder.decode(data)
-
- def flush(self) -> str:
- return self.decoder.decode(b"", True)
-
-
-class LineDecoder:
- """
- Handles incrementally reading lines from text.
-
- Has the same behaviour as the stdllib splitlines,
- but handling the input iteratively.
- """
-
- def __init__(self) -> None:
- self.buffer: list[str] = []
- self.trailing_cr: bool = False
-
- def decode(self, text: str) -> list[str]:
- # See https://docs.python.org/3/library/stdtypes.html#str.splitlines
- NEWLINE_CHARS = "\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029"
-
- # We always push a trailing `\r` into the next decode iteration.
- if self.trailing_cr:
- text = "\r" + text
- self.trailing_cr = False
- if text.endswith("\r"):
- self.trailing_cr = True
- text = text[:-1]
-
- if not text:
- # NOTE: the edge case input of empty text doesn't occur in practice,
- # because other httpx internals filter out this value
- return [] # pragma: no cover
-
- trailing_newline = text[-1] in NEWLINE_CHARS
- lines = text.splitlines()
-
- if len(lines) == 1 and not trailing_newline:
- # No new lines, buffer the input and continue.
- self.buffer.append(lines[0])
- return []
-
- if self.buffer:
- # Include any existing buffer in the first portion of the
- # splitlines result.
- lines = ["".join(self.buffer) + lines[0]] + lines[1:]
- self.buffer = []
-
- if not trailing_newline:
- # If the last segment of splitlines is not newline terminated,
- # then drop it from our output and start a new buffer.
- self.buffer = [lines.pop()]
-
- return lines
-
- def flush(self) -> list[str]:
- if not self.buffer and not self.trailing_cr:
- return []
-
- lines = ["".join(self.buffer)]
- self.buffer = []
- self.trailing_cr = False
- return lines
-
-
-SUPPORTED_DECODERS = {
- "identity": IdentityDecoder,
- "gzip": GZipDecoder,
- "deflate": DeflateDecoder,
- "br": BrotliDecoder,
-}
-
-
-if brotli is None:
- SUPPORTED_DECODERS.pop("br") # pragma: no cover