summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/anyio/streams/buffered.py
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
commit12cf076118570eebbff08c6b3090e0d4798447a1 (patch)
tree3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/anyio/streams/buffered.py
parentc45662ff3923b34614ddcc8feb9195541166dcc5 (diff)
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/anyio/streams/buffered.py')
-rw-r--r--venv/lib/python3.11/site-packages/anyio/streams/buffered.py119
1 files changed, 0 insertions, 119 deletions
diff --git a/venv/lib/python3.11/site-packages/anyio/streams/buffered.py b/venv/lib/python3.11/site-packages/anyio/streams/buffered.py
deleted file mode 100644
index f5d5e83..0000000
--- a/venv/lib/python3.11/site-packages/anyio/streams/buffered.py
+++ /dev/null
@@ -1,119 +0,0 @@
-from __future__ import annotations
-
-from collections.abc import Callable, Mapping
-from dataclasses import dataclass, field
-from typing import Any
-
-from .. import ClosedResourceError, DelimiterNotFound, EndOfStream, IncompleteRead
-from ..abc import AnyByteReceiveStream, ByteReceiveStream
-
-
-@dataclass(eq=False)
-class BufferedByteReceiveStream(ByteReceiveStream):
- """
- Wraps any bytes-based receive stream and uses a buffer to provide sophisticated
- receiving capabilities in the form of a byte stream.
- """
-
- receive_stream: AnyByteReceiveStream
- _buffer: bytearray = field(init=False, default_factory=bytearray)
- _closed: bool = field(init=False, default=False)
-
- async def aclose(self) -> None:
- await self.receive_stream.aclose()
- self._closed = True
-
- @property
- def buffer(self) -> bytes:
- """The bytes currently in the buffer."""
- return bytes(self._buffer)
-
- @property
- def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
- return self.receive_stream.extra_attributes
-
- async def receive(self, max_bytes: int = 65536) -> bytes:
- if self._closed:
- raise ClosedResourceError
-
- if self._buffer:
- chunk = bytes(self._buffer[:max_bytes])
- del self._buffer[:max_bytes]
- return chunk
- elif isinstance(self.receive_stream, ByteReceiveStream):
- return await self.receive_stream.receive(max_bytes)
- else:
- # With a bytes-oriented object stream, we need to handle any surplus bytes
- # we get from the receive() call
- chunk = await self.receive_stream.receive()
- if len(chunk) > max_bytes:
- # Save the surplus bytes in the buffer
- self._buffer.extend(chunk[max_bytes:])
- return chunk[:max_bytes]
- else:
- return chunk
-
- async def receive_exactly(self, nbytes: int) -> bytes:
- """
- Read exactly the given amount of bytes from the stream.
-
- :param nbytes: the number of bytes to read
- :return: the bytes read
- :raises ~anyio.IncompleteRead: if the stream was closed before the requested
- amount of bytes could be read from the stream
-
- """
- while True:
- remaining = nbytes - len(self._buffer)
- if remaining <= 0:
- retval = self._buffer[:nbytes]
- del self._buffer[:nbytes]
- return bytes(retval)
-
- try:
- if isinstance(self.receive_stream, ByteReceiveStream):
- chunk = await self.receive_stream.receive(remaining)
- else:
- chunk = await self.receive_stream.receive()
- except EndOfStream as exc:
- raise IncompleteRead from exc
-
- self._buffer.extend(chunk)
-
- async def receive_until(self, delimiter: bytes, max_bytes: int) -> bytes:
- """
- Read from the stream until the delimiter is found or max_bytes have been read.
-
- :param delimiter: the marker to look for in the stream
- :param max_bytes: maximum number of bytes that will be read before raising
- :exc:`~anyio.DelimiterNotFound`
- :return: the bytes read (not including the delimiter)
- :raises ~anyio.IncompleteRead: if the stream was closed before the delimiter
- was found
- :raises ~anyio.DelimiterNotFound: if the delimiter is not found within the
- bytes read up to the maximum allowed
-
- """
- delimiter_size = len(delimiter)
- offset = 0
- while True:
- # Check if the delimiter can be found in the current buffer
- index = self._buffer.find(delimiter, offset)
- if index >= 0:
- found = self._buffer[:index]
- del self._buffer[: index + len(delimiter) :]
- return bytes(found)
-
- # Check if the buffer is already at or over the limit
- if len(self._buffer) >= max_bytes:
- raise DelimiterNotFound(max_bytes)
-
- # Read more data into the buffer from the socket
- try:
- data = await self.receive_stream.receive()
- except EndOfStream as exc:
- raise IncompleteRead from exc
-
- # Move the offset forward and add the new data to the buffer
- offset = max(len(self._buffer) - delimiter_size + 1, 0)
- self._buffer.extend(data)