summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/websockets/legacy/framing.py
blob: b77b869e3faad0518d1434d94761a9a85cd28c64 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
from __future__ import annotations

import struct
from typing import Any, Awaitable, Callable, NamedTuple, Optional, Sequence, Tuple

from .. import extensions, frames
from ..exceptions import PayloadTooBig, ProtocolError


try:
    from ..speedups import apply_mask
except ImportError:
    from ..utils import apply_mask


class Frame(NamedTuple):
    fin: bool
    opcode: frames.Opcode
    data: bytes
    rsv1: bool = False
    rsv2: bool = False
    rsv3: bool = False

    @property
    def new_frame(self) -> frames.Frame:
        return frames.Frame(
            self.opcode,
            self.data,
            self.fin,
            self.rsv1,
            self.rsv2,
            self.rsv3,
        )

    def __str__(self) -> str:
        return str(self.new_frame)

    def check(self) -> None:
        return self.new_frame.check()

    @classmethod
    async def read(
        cls,
        reader: Callable[[int], Awaitable[bytes]],
        *,
        mask: bool,
        max_size: Optional[int] = None,
        extensions: Optional[Sequence[extensions.Extension]] = None,
    ) -> Frame:
        """
        Read a WebSocket frame.

        Args:
            reader: Coroutine that reads exactly the requested number of
                bytes, unless the end of file is reached.
            mask: Whether the frame should be masked i.e. whether the read
                happens on the server side.
            max_size: Maximum payload size in bytes.
            extensions: List of extensions, applied in reverse order.

        Raises:
            PayloadTooBig: If the frame exceeds ``max_size``.
            ProtocolError: If the frame contains incorrect values.

        """

        # Read the header.
        data = await reader(2)
        head1, head2 = struct.unpack("!BB", data)

        # While not Pythonic, this is marginally faster than calling bool().
        fin = True if head1 & 0b10000000 else False
        rsv1 = True if head1 & 0b01000000 else False
        rsv2 = True if head1 & 0b00100000 else False
        rsv3 = True if head1 & 0b00010000 else False

        try:
            opcode = frames.Opcode(head1 & 0b00001111)
        except ValueError as exc:
            raise ProtocolError("invalid opcode") from exc

        if (True if head2 & 0b10000000 else False) != mask:
            raise ProtocolError("incorrect masking")

        length = head2 & 0b01111111
        if length == 126:
            data = await reader(2)
            (length,) = struct.unpack("!H", data)
        elif length == 127:
            data = await reader(8)
            (length,) = struct.unpack("!Q", data)
        if max_size is not None and length > max_size:
            raise PayloadTooBig(f"over size limit ({length} > {max_size} bytes)")
        if mask:
            mask_bits = await reader(4)

        # Read the data.
        data = await reader(length)
        if mask:
            data = apply_mask(data, mask_bits)

        new_frame = frames.Frame(opcode, data, fin, rsv1, rsv2, rsv3)

        if extensions is None:
            extensions = []
        for extension in reversed(extensions):
            new_frame = extension.decode(new_frame, max_size=max_size)

        new_frame.check()

        return cls(
            new_frame.fin,
            new_frame.opcode,
            new_frame.data,
            new_frame.rsv1,
            new_frame.rsv2,
            new_frame.rsv3,
        )

    def write(
        self,
        write: Callable[[bytes], Any],
        *,
        mask: bool,
        extensions: Optional[Sequence[extensions.Extension]] = None,
    ) -> None:
        """
        Write a WebSocket frame.

        Args:
            frame: Frame to write.
            write: Function that writes bytes.
            mask: Whether the frame should be masked i.e. whether the write
                happens on the client side.
            extensions: List of extensions, applied in order.

        Raises:
            ProtocolError: If the frame contains incorrect values.

        """
        # The frame is written in a single call to write in order to prevent
        # TCP fragmentation. See #68 for details. This also makes it safe to
        # send frames concurrently from multiple coroutines.
        write(self.new_frame.serialize(mask=mask, extensions=extensions))


# Backwards compatibility with previously documented public APIs
from ..frames import (  # noqa: E402, F401, I001
    Close,
    prepare_ctrl as encode_data,
    prepare_data,
)


def parse_close(data: bytes) -> Tuple[int, str]:
    """
    Parse the payload from a close frame.

    Returns:
        Close code and reason.

    Raises:
        ProtocolError: If data is ill-formed.
        UnicodeDecodeError: If the reason isn't valid UTF-8.

    """
    close = Close.parse(data)
    return close.code, close.reason


def serialize_close(code: int, reason: str) -> bytes:
    """
    Serialize the payload for a close frame.

    """
    return Close(code, reason).serialize()