summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/websockets/sync/messages.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/websockets/sync/messages.py')
-rw-r--r--venv/lib/python3.11/site-packages/websockets/sync/messages.py281
1 files changed, 281 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/websockets/sync/messages.py b/venv/lib/python3.11/site-packages/websockets/sync/messages.py
new file mode 100644
index 0000000..67a2231
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/sync/messages.py
@@ -0,0 +1,281 @@
+from __future__ import annotations
+
+import codecs
+import queue
+import threading
+from typing import Iterator, List, Optional, cast
+
+from ..frames import Frame, Opcode
+from ..typing import Data
+
+
+__all__ = ["Assembler"]
+
+UTF8Decoder = codecs.getincrementaldecoder("utf-8")
+
+
+class Assembler:
+ """
+ Assemble messages from frames.
+
+ """
+
+ def __init__(self) -> None:
+ # Serialize reads and writes -- except for reads via synchronization
+ # primitives provided by the threading and queue modules.
+ self.mutex = threading.Lock()
+
+ # We create a latch with two events to ensure proper interleaving of
+ # writing and reading messages.
+ # put() sets this event to tell get() that a message can be fetched.
+ self.message_complete = threading.Event()
+ # get() sets this event to let put() that the message was fetched.
+ self.message_fetched = threading.Event()
+
+ # This flag prevents concurrent calls to get() by user code.
+ self.get_in_progress = False
+ # This flag prevents concurrent calls to put() by library code.
+ self.put_in_progress = False
+
+ # Decoder for text frames, None for binary frames.
+ self.decoder: Optional[codecs.IncrementalDecoder] = None
+
+ # Buffer of frames belonging to the same message.
+ self.chunks: List[Data] = []
+
+ # When switching from "buffering" to "streaming", we use a thread-safe
+ # queue for transferring frames from the writing thread (library code)
+ # to the reading thread (user code). We're buffering when chunks_queue
+ # is None and streaming when it's a SimpleQueue. None is a sentinel
+ # value marking the end of the stream, superseding message_complete.
+
+ # Stream data from frames belonging to the same message.
+ # Remove quotes around type when dropping Python < 3.9.
+ self.chunks_queue: Optional["queue.SimpleQueue[Optional[Data]]"] = None
+
+ # This flag marks the end of the stream.
+ self.closed = False
+
+ def get(self, timeout: Optional[float] = None) -> Data:
+ """
+ Read the next message.
+
+ :meth:`get` returns a single :class:`str` or :class:`bytes`.
+
+ If the message is fragmented, :meth:`get` waits until the last frame is
+ received, then it reassembles the message and returns it. To receive
+ messages frame by frame, use :meth:`get_iter` instead.
+
+ Args:
+ timeout: If a timeout is provided and elapses before a complete
+ message is received, :meth:`get` raises :exc:`TimeoutError`.
+
+ Raises:
+ EOFError: If the stream of frames has ended.
+ RuntimeError: If two threads run :meth:`get` or :meth:``get_iter`
+ concurrently.
+
+ """
+ with self.mutex:
+ if self.closed:
+ raise EOFError("stream of frames ended")
+
+ if self.get_in_progress:
+ raise RuntimeError("get or get_iter is already running")
+
+ self.get_in_progress = True
+
+ # If the message_complete event isn't set yet, release the lock to
+ # allow put() to run and eventually set it.
+ # Locking with get_in_progress ensures only one thread can get here.
+ completed = self.message_complete.wait(timeout)
+
+ with self.mutex:
+ self.get_in_progress = False
+
+ # Waiting for a complete message timed out.
+ if not completed:
+ raise TimeoutError(f"timed out in {timeout:.1f}s")
+
+ # get() was unblocked by close() rather than put().
+ if self.closed:
+ raise EOFError("stream of frames ended")
+
+ assert self.message_complete.is_set()
+ self.message_complete.clear()
+
+ joiner: Data = b"" if self.decoder is None else ""
+ # mypy cannot figure out that chunks have the proper type.
+ message: Data = joiner.join(self.chunks) # type: ignore
+
+ assert not self.message_fetched.is_set()
+ self.message_fetched.set()
+
+ self.chunks = []
+ assert self.chunks_queue is None
+
+ return message
+
+ def get_iter(self) -> Iterator[Data]:
+ """
+ Stream the next message.
+
+ Iterating the return value of :meth:`get_iter` yields a :class:`str` or
+ :class:`bytes` for each frame in the message.
+
+ The iterator must be fully consumed before calling :meth:`get_iter` or
+ :meth:`get` again. Else, :exc:`RuntimeError` is raised.
+
+ This method only makes sense for fragmented messages. If messages aren't
+ fragmented, use :meth:`get` instead.
+
+ Raises:
+ EOFError: If the stream of frames has ended.
+ RuntimeError: If two threads run :meth:`get` or :meth:``get_iter`
+ concurrently.
+
+ """
+ with self.mutex:
+ if self.closed:
+ raise EOFError("stream of frames ended")
+
+ if self.get_in_progress:
+ raise RuntimeError("get or get_iter is already running")
+
+ chunks = self.chunks
+ self.chunks = []
+ self.chunks_queue = cast(
+ # Remove quotes around type when dropping Python < 3.9.
+ "queue.SimpleQueue[Optional[Data]]",
+ queue.SimpleQueue(),
+ )
+
+ # Sending None in chunk_queue supersedes setting message_complete
+ # when switching to "streaming". If message is already complete
+ # when the switch happens, put() didn't send None, so we have to.
+ if self.message_complete.is_set():
+ self.chunks_queue.put(None)
+
+ self.get_in_progress = True
+
+ # Locking with get_in_progress ensures only one thread can get here.
+ yield from chunks
+ while True:
+ chunk = self.chunks_queue.get()
+ if chunk is None:
+ break
+ yield chunk
+
+ with self.mutex:
+ self.get_in_progress = False
+
+ assert self.message_complete.is_set()
+ self.message_complete.clear()
+
+ # get_iter() was unblocked by close() rather than put().
+ if self.closed:
+ raise EOFError("stream of frames ended")
+
+ assert not self.message_fetched.is_set()
+ self.message_fetched.set()
+
+ assert self.chunks == []
+ self.chunks_queue = None
+
+ def put(self, frame: Frame) -> None:
+ """
+ Add ``frame`` to the next message.
+
+ When ``frame`` is the final frame in a message, :meth:`put` waits until
+ the message is fetched, either by calling :meth:`get` or by fully
+ consuming the return value of :meth:`get_iter`.
+
+ :meth:`put` assumes that the stream of frames respects the protocol. If
+ it doesn't, the behavior is undefined.
+
+ Raises:
+ EOFError: If the stream of frames has ended.
+ RuntimeError: If two threads run :meth:`put` concurrently.
+
+ """
+ with self.mutex:
+ if self.closed:
+ raise EOFError("stream of frames ended")
+
+ if self.put_in_progress:
+ raise RuntimeError("put is already running")
+
+ if frame.opcode is Opcode.TEXT:
+ self.decoder = UTF8Decoder(errors="strict")
+ elif frame.opcode is Opcode.BINARY:
+ self.decoder = None
+ elif frame.opcode is Opcode.CONT:
+ pass
+ else:
+ # Ignore control frames.
+ return
+
+ data: Data
+ if self.decoder is not None:
+ data = self.decoder.decode(frame.data, frame.fin)
+ else:
+ data = frame.data
+
+ if self.chunks_queue is None:
+ self.chunks.append(data)
+ else:
+ self.chunks_queue.put(data)
+
+ if not frame.fin:
+ return
+
+ # Message is complete. Wait until it's fetched to return.
+
+ assert not self.message_complete.is_set()
+ self.message_complete.set()
+
+ if self.chunks_queue is not None:
+ self.chunks_queue.put(None)
+
+ assert not self.message_fetched.is_set()
+
+ self.put_in_progress = True
+
+ # Release the lock to allow get() to run and eventually set the event.
+ self.message_fetched.wait()
+
+ with self.mutex:
+ self.put_in_progress = False
+
+ assert self.message_fetched.is_set()
+ self.message_fetched.clear()
+
+ # put() was unblocked by close() rather than get() or get_iter().
+ if self.closed:
+ raise EOFError("stream of frames ended")
+
+ self.decoder = None
+
+ def close(self) -> None:
+ """
+ End the stream of frames.
+
+ Callling :meth:`close` concurrently with :meth:`get`, :meth:`get_iter`,
+ or :meth:`put` is safe. They will raise :exc:`EOFError`.
+
+ """
+ with self.mutex:
+ if self.closed:
+ return
+
+ self.closed = True
+
+ # Unblock get or get_iter.
+ if self.get_in_progress:
+ self.message_complete.set()
+ if self.chunks_queue is not None:
+ self.chunks_queue.put(None)
+
+ # Unblock put().
+ if self.put_in_progress:
+ self.message_fetched.set()