summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/websockets/legacy/async_timeout.py
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
commit6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
treeb1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/websockets/legacy/async_timeout.py
parent4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/websockets/legacy/async_timeout.py')
-rw-r--r--venv/lib/python3.11/site-packages/websockets/legacy/async_timeout.py265
1 files changed, 265 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/websockets/legacy/async_timeout.py b/venv/lib/python3.11/site-packages/websockets/legacy/async_timeout.py
new file mode 100644
index 0000000..8264094
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/websockets/legacy/async_timeout.py
@@ -0,0 +1,265 @@
+# From https://github.com/aio-libs/async-timeout/blob/master/async_timeout/__init__.py
+# Licensed under the Apache License (Apache-2.0)
+
+import asyncio
+import enum
+import sys
+import warnings
+from types import TracebackType
+from typing import Optional, Type
+
+
+# From https://github.com/python/typing_extensions/blob/main/src/typing_extensions.py
+# Licensed under the Python Software Foundation License (PSF-2.0)
+
+if sys.version_info >= (3, 11):
+ from typing import final
+else:
+ # @final exists in 3.8+, but we backport it for all versions
+ # before 3.11 to keep support for the __final__ attribute.
+ # See https://bugs.python.org/issue46342
+ def final(f):
+ """This decorator can be used to indicate to type checkers that
+ the decorated method cannot be overridden, and decorated class
+ cannot be subclassed. For example:
+
+ class Base:
+ @final
+ def done(self) -> None:
+ ...
+ class Sub(Base):
+ def done(self) -> None: # Error reported by type checker
+ ...
+ @final
+ class Leaf:
+ ...
+ class Other(Leaf): # Error reported by type checker
+ ...
+
+ There is no runtime checking of these properties. The decorator
+ sets the ``__final__`` attribute to ``True`` on the decorated object
+ to allow runtime introspection.
+ """
+ try:
+ f.__final__ = True
+ except (AttributeError, TypeError):
+ # Skip the attribute silently if it is not writable.
+ # AttributeError happens if the object has __slots__ or a
+ # read-only property, TypeError if it's a builtin class.
+ pass
+ return f
+
+
+# End https://github.com/aio-libs/async-timeout/blob/master/async_timeout/__init__.py
+
+__version__ = "4.0.2"
+
+
+__all__ = ("timeout", "timeout_at", "Timeout")
+
+
+def timeout(delay: Optional[float]) -> "Timeout":
+ """timeout context manager.
+
+ Useful in cases when you want to apply timeout logic around block
+ of code or in cases when asyncio.wait_for is not suitable. For example:
+
+ >>> async with timeout(0.001):
+ ... async with aiohttp.get('https://github.com') as r:
+ ... await r.text()
+
+
+ delay - value in seconds or None to disable timeout logic
+ """
+ loop = asyncio.get_running_loop()
+ if delay is not None:
+ deadline = loop.time() + delay # type: Optional[float]
+ else:
+ deadline = None
+ return Timeout(deadline, loop)
+
+
+def timeout_at(deadline: Optional[float]) -> "Timeout":
+ """Schedule the timeout at absolute time.
+
+ deadline argument points on the time in the same clock system
+ as loop.time().
+
+ Please note: it is not POSIX time but a time with
+ undefined starting base, e.g. the time of the system power on.
+
+ >>> async with timeout_at(loop.time() + 10):
+ ... async with aiohttp.get('https://github.com') as r:
+ ... await r.text()
+
+
+ """
+ loop = asyncio.get_running_loop()
+ return Timeout(deadline, loop)
+
+
+class _State(enum.Enum):
+ INIT = "INIT"
+ ENTER = "ENTER"
+ TIMEOUT = "TIMEOUT"
+ EXIT = "EXIT"
+
+
+@final
+class Timeout:
+ # Internal class, please don't instantiate it directly
+ # Use timeout() and timeout_at() public factories instead.
+ #
+ # Implementation note: `async with timeout()` is preferred
+ # over `with timeout()`.
+ # While technically the Timeout class implementation
+ # doesn't need to be async at all,
+ # the `async with` statement explicitly points that
+ # the context manager should be used from async function context.
+ #
+ # This design allows to avoid many silly misusages.
+ #
+ # TimeoutError is raised immediately when scheduled
+ # if the deadline is passed.
+ # The purpose is to time out as soon as possible
+ # without waiting for the next await expression.
+
+ __slots__ = ("_deadline", "_loop", "_state", "_timeout_handler")
+
+ def __init__(
+ self, deadline: Optional[float], loop: asyncio.AbstractEventLoop
+ ) -> None:
+ self._loop = loop
+ self._state = _State.INIT
+
+ self._timeout_handler = None # type: Optional[asyncio.Handle]
+ if deadline is None:
+ self._deadline = None # type: Optional[float]
+ else:
+ self.update(deadline)
+
+ def __enter__(self) -> "Timeout":
+ warnings.warn(
+ "with timeout() is deprecated, use async with timeout() instead",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ self._do_enter()
+ return self
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> Optional[bool]:
+ self._do_exit(exc_type)
+ return None
+
+ async def __aenter__(self) -> "Timeout":
+ self._do_enter()
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> Optional[bool]:
+ self._do_exit(exc_type)
+ return None
+
+ @property
+ def expired(self) -> bool:
+ """Is timeout expired during execution?"""
+ return self._state == _State.TIMEOUT
+
+ @property
+ def deadline(self) -> Optional[float]:
+ return self._deadline
+
+ def reject(self) -> None:
+ """Reject scheduled timeout if any."""
+ # cancel is maybe better name but
+ # task.cancel() raises CancelledError in asyncio world.
+ if self._state not in (_State.INIT, _State.ENTER):
+ raise RuntimeError(f"invalid state {self._state.value}")
+ self._reject()
+
+ def _reject(self) -> None:
+ if self._timeout_handler is not None:
+ self._timeout_handler.cancel()
+ self._timeout_handler = None
+
+ def shift(self, delay: float) -> None:
+ """Advance timeout on delay seconds.
+
+ The delay can be negative.
+
+ Raise RuntimeError if shift is called when deadline is not scheduled
+ """
+ deadline = self._deadline
+ if deadline is None:
+ raise RuntimeError("cannot shift timeout if deadline is not scheduled")
+ self.update(deadline + delay)
+
+ def update(self, deadline: float) -> None:
+ """Set deadline to absolute value.
+
+ deadline argument points on the time in the same clock system
+ as loop.time().
+
+ If new deadline is in the past the timeout is raised immediately.
+
+ Please note: it is not POSIX time but a time with
+ undefined starting base, e.g. the time of the system power on.
+ """
+ if self._state == _State.EXIT:
+ raise RuntimeError("cannot reschedule after exit from context manager")
+ if self._state == _State.TIMEOUT:
+ raise RuntimeError("cannot reschedule expired timeout")
+ if self._timeout_handler is not None:
+ self._timeout_handler.cancel()
+ self._deadline = deadline
+ if self._state != _State.INIT:
+ self._reschedule()
+
+ def _reschedule(self) -> None:
+ assert self._state == _State.ENTER
+ deadline = self._deadline
+ if deadline is None:
+ return
+
+ now = self._loop.time()
+ if self._timeout_handler is not None:
+ self._timeout_handler.cancel()
+
+ task = asyncio.current_task()
+ if deadline <= now:
+ self._timeout_handler = self._loop.call_soon(self._on_timeout, task)
+ else:
+ self._timeout_handler = self._loop.call_at(deadline, self._on_timeout, task)
+
+ def _do_enter(self) -> None:
+ if self._state != _State.INIT:
+ raise RuntimeError(f"invalid state {self._state.value}")
+ self._state = _State.ENTER
+ self._reschedule()
+
+ def _do_exit(self, exc_type: Optional[Type[BaseException]]) -> None:
+ if exc_type is asyncio.CancelledError and self._state == _State.TIMEOUT:
+ self._timeout_handler = None
+ raise asyncio.TimeoutError
+ # timeout has not expired
+ self._state = _State.EXIT
+ self._reject()
+ return None
+
+ def _on_timeout(self, task: "asyncio.Task[None]") -> None:
+ task.cancel()
+ self._state = _State.TIMEOUT
+ # drop the reference early
+ self._timeout_handler = None
+
+
+# End https://github.com/aio-libs/async-timeout/blob/master/async_timeout/__init__.py