summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/util/queue.py
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
commit6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
treeb1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/sqlalchemy/util/queue.py
parent4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/util/queue.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/util/queue.py322
1 files changed, 322 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/util/queue.py b/venv/lib/python3.11/site-packages/sqlalchemy/util/queue.py
new file mode 100644
index 0000000..99a68a3
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/sqlalchemy/util/queue.py
@@ -0,0 +1,322 @@
+# util/queue.py
+# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: allow-untyped-defs, allow-untyped-calls
+
+"""An adaptation of Py2.3/2.4's Queue module which supports reentrant
+behavior, using RLock instead of Lock for its mutex object. The
+Queue object is used exclusively by the sqlalchemy.pool.QueuePool
+class.
+
+This is to support the connection pool's usage of weakref callbacks to return
+connections to the underlying Queue, which can in extremely
+rare cases be invoked within the ``get()`` method of the Queue itself,
+producing a ``put()`` inside the ``get()`` and therefore a reentrant
+condition.
+
+"""
+from __future__ import annotations
+
+import asyncio
+from collections import deque
+import threading
+from time import time as _time
+import typing
+from typing import Any
+from typing import Awaitable
+from typing import Deque
+from typing import Generic
+from typing import Optional
+from typing import TypeVar
+
+from .concurrency import await_fallback
+from .concurrency import await_only
+from .langhelpers import memoized_property
+
+
+_T = TypeVar("_T", bound=Any)
+__all__ = ["Empty", "Full", "Queue"]
+
+
+class Empty(Exception):
+ "Exception raised by Queue.get(block=0)/get_nowait()."
+
+ pass
+
+
+class Full(Exception):
+ "Exception raised by Queue.put(block=0)/put_nowait()."
+
+ pass
+
+
+class QueueCommon(Generic[_T]):
+ maxsize: int
+ use_lifo: bool
+
+ def __init__(self, maxsize: int = 0, use_lifo: bool = False): ...
+
+ def empty(self) -> bool:
+ raise NotImplementedError()
+
+ def full(self) -> bool:
+ raise NotImplementedError()
+
+ def qsize(self) -> int:
+ raise NotImplementedError()
+
+ def put_nowait(self, item: _T) -> None:
+ raise NotImplementedError()
+
+ def put(
+ self, item: _T, block: bool = True, timeout: Optional[float] = None
+ ) -> None:
+ raise NotImplementedError()
+
+ def get_nowait(self) -> _T:
+ raise NotImplementedError()
+
+ def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
+ raise NotImplementedError()
+
+
+class Queue(QueueCommon[_T]):
+ queue: Deque[_T]
+
+ def __init__(self, maxsize: int = 0, use_lifo: bool = False):
+ """Initialize a queue object with a given maximum size.
+
+ If `maxsize` is <= 0, the queue size is infinite.
+
+ If `use_lifo` is True, this Queue acts like a Stack (LIFO).
+ """
+
+ self._init(maxsize)
+ # mutex must be held whenever the queue is mutating. All methods
+ # that acquire mutex must release it before returning. mutex
+ # is shared between the two conditions, so acquiring and
+ # releasing the conditions also acquires and releases mutex.
+ self.mutex = threading.RLock()
+ # Notify not_empty whenever an item is added to the queue; a
+ # thread waiting to get is notified then.
+ self.not_empty = threading.Condition(self.mutex)
+ # Notify not_full whenever an item is removed from the queue;
+ # a thread waiting to put is notified then.
+ self.not_full = threading.Condition(self.mutex)
+ # If this queue uses LIFO or FIFO
+ self.use_lifo = use_lifo
+
+ def qsize(self) -> int:
+ """Return the approximate size of the queue (not reliable!)."""
+
+ with self.mutex:
+ return self._qsize()
+
+ def empty(self) -> bool:
+ """Return True if the queue is empty, False otherwise (not
+ reliable!)."""
+
+ with self.mutex:
+ return self._empty()
+
+ def full(self) -> bool:
+ """Return True if the queue is full, False otherwise (not
+ reliable!)."""
+
+ with self.mutex:
+ return self._full()
+
+ def put(
+ self, item: _T, block: bool = True, timeout: Optional[float] = None
+ ) -> None:
+ """Put an item into the queue.
+
+ If optional args `block` is True and `timeout` is None (the
+ default), block if necessary until a free slot is
+ available. If `timeout` is a positive number, it blocks at
+ most `timeout` seconds and raises the ``Full`` exception if no
+ free slot was available within that time. Otherwise (`block`
+ is false), put an item on the queue if a free slot is
+ immediately available, else raise the ``Full`` exception
+ (`timeout` is ignored in that case).
+ """
+
+ with self.not_full:
+ if not block:
+ if self._full():
+ raise Full
+ elif timeout is None:
+ while self._full():
+ self.not_full.wait()
+ else:
+ if timeout < 0:
+ raise ValueError("'timeout' must be a positive number")
+ endtime = _time() + timeout
+ while self._full():
+ remaining = endtime - _time()
+ if remaining <= 0.0:
+ raise Full
+ self.not_full.wait(remaining)
+ self._put(item)
+ self.not_empty.notify()
+
+ def put_nowait(self, item: _T) -> None:
+ """Put an item into the queue without blocking.
+
+ Only enqueue the item if a free slot is immediately available.
+ Otherwise raise the ``Full`` exception.
+ """
+ return self.put(item, False)
+
+ def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
+ """Remove and return an item from the queue.
+
+ If optional args `block` is True and `timeout` is None (the
+ default), block if necessary until an item is available. If
+ `timeout` is a positive number, it blocks at most `timeout`
+ seconds and raises the ``Empty`` exception if no item was
+ available within that time. Otherwise (`block` is false),
+ return an item if one is immediately available, else raise the
+ ``Empty`` exception (`timeout` is ignored in that case).
+
+ """
+ with self.not_empty:
+ if not block:
+ if self._empty():
+ raise Empty
+ elif timeout is None:
+ while self._empty():
+ self.not_empty.wait()
+ else:
+ if timeout < 0:
+ raise ValueError("'timeout' must be a positive number")
+ endtime = _time() + timeout
+ while self._empty():
+ remaining = endtime - _time()
+ if remaining <= 0.0:
+ raise Empty
+ self.not_empty.wait(remaining)
+ item = self._get()
+ self.not_full.notify()
+ return item
+
+ def get_nowait(self) -> _T:
+ """Remove and return an item from the queue without blocking.
+
+ Only get an item if one is immediately available. Otherwise
+ raise the ``Empty`` exception.
+ """
+
+ return self.get(False)
+
+ def _init(self, maxsize: int) -> None:
+ self.maxsize = maxsize
+ self.queue = deque()
+
+ def _qsize(self) -> int:
+ return len(self.queue)
+
+ def _empty(self) -> bool:
+ return not self.queue
+
+ def _full(self) -> bool:
+ return self.maxsize > 0 and len(self.queue) == self.maxsize
+
+ def _put(self, item: _T) -> None:
+ self.queue.append(item)
+
+ def _get(self) -> _T:
+ if self.use_lifo:
+ # LIFO
+ return self.queue.pop()
+ else:
+ # FIFO
+ return self.queue.popleft()
+
+
+class AsyncAdaptedQueue(QueueCommon[_T]):
+ if typing.TYPE_CHECKING:
+
+ @staticmethod
+ def await_(coroutine: Awaitable[Any]) -> _T: ...
+
+ else:
+ await_ = staticmethod(await_only)
+
+ def __init__(self, maxsize: int = 0, use_lifo: bool = False):
+ self.use_lifo = use_lifo
+ self.maxsize = maxsize
+
+ def empty(self) -> bool:
+ return self._queue.empty()
+
+ def full(self):
+ return self._queue.full()
+
+ def qsize(self):
+ return self._queue.qsize()
+
+ @memoized_property
+ def _queue(self) -> asyncio.Queue[_T]:
+ # Delay creation of the queue until it is first used, to avoid
+ # binding it to a possibly wrong event loop.
+ # By delaying the creation of the pool we accommodate the common
+ # usage pattern of instantiating the engine at module level, where a
+ # different event loop is in present compared to when the application
+ # is actually run.
+
+ queue: asyncio.Queue[_T]
+
+ if self.use_lifo:
+ queue = asyncio.LifoQueue(maxsize=self.maxsize)
+ else:
+ queue = asyncio.Queue(maxsize=self.maxsize)
+ return queue
+
+ def put_nowait(self, item: _T) -> None:
+ try:
+ self._queue.put_nowait(item)
+ except asyncio.QueueFull as err:
+ raise Full() from err
+
+ def put(
+ self, item: _T, block: bool = True, timeout: Optional[float] = None
+ ) -> None:
+ if not block:
+ return self.put_nowait(item)
+
+ try:
+ if timeout is not None:
+ self.await_(asyncio.wait_for(self._queue.put(item), timeout))
+ else:
+ self.await_(self._queue.put(item))
+ except (asyncio.QueueFull, asyncio.TimeoutError) as err:
+ raise Full() from err
+
+ def get_nowait(self) -> _T:
+ try:
+ return self._queue.get_nowait()
+ except asyncio.QueueEmpty as err:
+ raise Empty() from err
+
+ def get(self, block: bool = True, timeout: Optional[float] = None) -> _T:
+ if not block:
+ return self.get_nowait()
+
+ try:
+ if timeout is not None:
+ return self.await_(
+ asyncio.wait_for(self._queue.get(), timeout)
+ )
+ else:
+ return self.await_(self._queue.get())
+ except (asyncio.QueueEmpty, asyncio.TimeoutError) as err:
+ raise Empty() from err
+
+
+class FallbackAsyncAdaptedQueue(AsyncAdaptedQueue[_T]):
+ if not typing.TYPE_CHECKING:
+ await_ = staticmethod(await_fallback)