summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/websockets/streams.py
blob: f861d4bd25ecc5b5a4ac5ebc22a251712fb4e68e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from __future__ import annotations

from typing import Generator


class StreamReader:
    """
    Generator-based stream reader.

    This class doesn't support concurrent calls to :meth:`read_line`,
    :meth:`read_exact`, or :meth:`read_to_eof`. Make sure calls are
    serialized.

    """

    def __init__(self) -> None:
        self.buffer = bytearray()
        self.eof = False

    def read_line(self, m: int) -> Generator[None, None, bytes]:
        """
        Read a LF-terminated line from the stream.

        This is a generator-based coroutine.

        The return value includes the LF character.

        Args:
            m: maximum number bytes to read; this is a security limit.

        Raises:
            EOFError: if the stream ends without a LF.
            RuntimeError: if the stream ends in more than ``m`` bytes.

        """
        n = 0  # number of bytes to read
        p = 0  # number of bytes without a newline
        while True:
            n = self.buffer.find(b"\n", p) + 1
            if n > 0:
                break
            p = len(self.buffer)
            if p > m:
                raise RuntimeError(f"read {p} bytes, expected no more than {m} bytes")
            if self.eof:
                raise EOFError(f"stream ends after {p} bytes, before end of line")
            yield
        if n > m:
            raise RuntimeError(f"read {n} bytes, expected no more than {m} bytes")
        r = self.buffer[:n]
        del self.buffer[:n]
        return r

    def read_exact(self, n: int) -> Generator[None, None, bytes]:
        """
        Read a given number of bytes from the stream.

        This is a generator-based coroutine.

        Args:
            n: how many bytes to read.

        Raises:
            EOFError: if the stream ends in less than ``n`` bytes.

        """
        assert n >= 0
        while len(self.buffer) < n:
            if self.eof:
                p = len(self.buffer)
                raise EOFError(f"stream ends after {p} bytes, expected {n} bytes")
            yield
        r = self.buffer[:n]
        del self.buffer[:n]
        return r

    def read_to_eof(self, m: int) -> Generator[None, None, bytes]:
        """
        Read all bytes from the stream.

        This is a generator-based coroutine.

        Args:
            m: maximum number bytes to read; this is a security limit.

        Raises:
            RuntimeError: if the stream ends in more than ``m`` bytes.

        """
        while not self.eof:
            p = len(self.buffer)
            if p > m:
                raise RuntimeError(f"read {p} bytes, expected no more than {m} bytes")
            yield
        r = self.buffer[:]
        del self.buffer[:]
        return r

    def at_eof(self) -> Generator[None, None, bool]:
        """
        Tell whether the stream has ended and all data was read.

        This is a generator-based coroutine.

        """
        while True:
            if self.buffer:
                return False
            if self.eof:
                return True
            # When all data was read but the stream hasn't ended, we can't
            # tell if until either feed_data() or feed_eof() is called.
            yield

    def feed_data(self, data: bytes) -> None:
        """
        Write data to the stream.

        :meth:`feed_data` cannot be called after :meth:`feed_eof`.

        Args:
            data: data to write.

        Raises:
            EOFError: if the stream has ended.

        """
        if self.eof:
            raise EOFError("stream ended")
        self.buffer += data

    def feed_eof(self) -> None:
        """
        End the stream.

        :meth:`feed_eof` cannot be called more than once.

        Raises:
            EOFError: if the stream has ended.

        """
        if self.eof:
            raise EOFError("stream ended")
        self.eof = True

    def discard(self) -> None:
        """
        Discard all buffered data, but don't end the stream.

        """
        del self.buffer[:]