diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/util/queue.py')
-rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/util/queue.py | 322 |
1 files changed, 0 insertions, 322 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/util/queue.py b/venv/lib/python3.11/site-packages/sqlalchemy/util/queue.py deleted file mode 100644 index 99a68a3..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/util/queue.py +++ /dev/null @@ -1,322 +0,0 @@ -# util/queue.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: allow-untyped-defs, allow-untyped-calls - -"""An adaptation of Py2.3/2.4's Queue module which supports reentrant -behavior, using RLock instead of Lock for its mutex object. The -Queue object is used exclusively by the sqlalchemy.pool.QueuePool -class. - -This is to support the connection pool's usage of weakref callbacks to return -connections to the underlying Queue, which can in extremely -rare cases be invoked within the ``get()`` method of the Queue itself, -producing a ``put()`` inside the ``get()`` and therefore a reentrant -condition. - -""" -from __future__ import annotations - -import asyncio -from collections import deque -import threading -from time import time as _time -import typing -from typing import Any -from typing import Awaitable -from typing import Deque -from typing import Generic -from typing import Optional -from typing import TypeVar - -from .concurrency import await_fallback -from .concurrency import await_only -from .langhelpers import memoized_property - - -_T = TypeVar("_T", bound=Any) -__all__ = ["Empty", "Full", "Queue"] - - -class Empty(Exception): - "Exception raised by Queue.get(block=0)/get_nowait()." - - pass - - -class Full(Exception): - "Exception raised by Queue.put(block=0)/put_nowait()." - - pass - - -class QueueCommon(Generic[_T]): - maxsize: int - use_lifo: bool - - def __init__(self, maxsize: int = 0, use_lifo: bool = False): ... - - def empty(self) -> bool: - raise NotImplementedError() - - def full(self) -> bool: - raise NotImplementedError() - - def qsize(self) -> int: - raise NotImplementedError() - - def put_nowait(self, item: _T) -> None: - raise NotImplementedError() - - def put( - self, item: _T, block: bool = True, timeout: Optional[float] = None - ) -> None: - raise NotImplementedError() - - def get_nowait(self) -> _T: - raise NotImplementedError() - - def get(self, block: bool = True, timeout: Optional[float] = None) -> _T: - raise NotImplementedError() - - -class Queue(QueueCommon[_T]): - queue: Deque[_T] - - def __init__(self, maxsize: int = 0, use_lifo: bool = False): - """Initialize a queue object with a given maximum size. - - If `maxsize` is <= 0, the queue size is infinite. - - If `use_lifo` is True, this Queue acts like a Stack (LIFO). - """ - - self._init(maxsize) - # mutex must be held whenever the queue is mutating. All methods - # that acquire mutex must release it before returning. mutex - # is shared between the two conditions, so acquiring and - # releasing the conditions also acquires and releases mutex. - self.mutex = threading.RLock() - # Notify not_empty whenever an item is added to the queue; a - # thread waiting to get is notified then. - self.not_empty = threading.Condition(self.mutex) - # Notify not_full whenever an item is removed from the queue; - # a thread waiting to put is notified then. - self.not_full = threading.Condition(self.mutex) - # If this queue uses LIFO or FIFO - self.use_lifo = use_lifo - - def qsize(self) -> int: - """Return the approximate size of the queue (not reliable!).""" - - with self.mutex: - return self._qsize() - - def empty(self) -> bool: - """Return True if the queue is empty, False otherwise (not - reliable!).""" - - with self.mutex: - return self._empty() - - def full(self) -> bool: - """Return True if the queue is full, False otherwise (not - reliable!).""" - - with self.mutex: - return self._full() - - def put( - self, item: _T, block: bool = True, timeout: Optional[float] = None - ) -> None: - """Put an item into the queue. - - If optional args `block` is True and `timeout` is None (the - default), block if necessary until a free slot is - available. If `timeout` is a positive number, it blocks at - most `timeout` seconds and raises the ``Full`` exception if no - free slot was available within that time. Otherwise (`block` - is false), put an item on the queue if a free slot is - immediately available, else raise the ``Full`` exception - (`timeout` is ignored in that case). - """ - - with self.not_full: - if not block: - if self._full(): - raise Full - elif timeout is None: - while self._full(): - self.not_full.wait() - else: - if timeout < 0: - raise ValueError("'timeout' must be a positive number") - endtime = _time() + timeout - while self._full(): - remaining = endtime - _time() - if remaining <= 0.0: - raise Full - self.not_full.wait(remaining) - self._put(item) - self.not_empty.notify() - - def put_nowait(self, item: _T) -> None: - """Put an item into the queue without blocking. - - Only enqueue the item if a free slot is immediately available. - Otherwise raise the ``Full`` exception. - """ - return self.put(item, False) - - def get(self, block: bool = True, timeout: Optional[float] = None) -> _T: - """Remove and return an item from the queue. - - If optional args `block` is True and `timeout` is None (the - default), block if necessary until an item is available. If - `timeout` is a positive number, it blocks at most `timeout` - seconds and raises the ``Empty`` exception if no item was - available within that time. Otherwise (`block` is false), - return an item if one is immediately available, else raise the - ``Empty`` exception (`timeout` is ignored in that case). - - """ - with self.not_empty: - if not block: - if self._empty(): - raise Empty - elif timeout is None: - while self._empty(): - self.not_empty.wait() - else: - if timeout < 0: - raise ValueError("'timeout' must be a positive number") - endtime = _time() + timeout - while self._empty(): - remaining = endtime - _time() - if remaining <= 0.0: - raise Empty - self.not_empty.wait(remaining) - item = self._get() - self.not_full.notify() - return item - - def get_nowait(self) -> _T: - """Remove and return an item from the queue without blocking. - - Only get an item if one is immediately available. Otherwise - raise the ``Empty`` exception. - """ - - return self.get(False) - - def _init(self, maxsize: int) -> None: - self.maxsize = maxsize - self.queue = deque() - - def _qsize(self) -> int: - return len(self.queue) - - def _empty(self) -> bool: - return not self.queue - - def _full(self) -> bool: - return self.maxsize > 0 and len(self.queue) == self.maxsize - - def _put(self, item: _T) -> None: - self.queue.append(item) - - def _get(self) -> _T: - if self.use_lifo: - # LIFO - return self.queue.pop() - else: - # FIFO - return self.queue.popleft() - - -class AsyncAdaptedQueue(QueueCommon[_T]): - if typing.TYPE_CHECKING: - - @staticmethod - def await_(coroutine: Awaitable[Any]) -> _T: ... - - else: - await_ = staticmethod(await_only) - - def __init__(self, maxsize: int = 0, use_lifo: bool = False): - self.use_lifo = use_lifo - self.maxsize = maxsize - - def empty(self) -> bool: - return self._queue.empty() - - def full(self): - return self._queue.full() - - def qsize(self): - return self._queue.qsize() - - @memoized_property - def _queue(self) -> asyncio.Queue[_T]: - # Delay creation of the queue until it is first used, to avoid - # binding it to a possibly wrong event loop. - # By delaying the creation of the pool we accommodate the common - # usage pattern of instantiating the engine at module level, where a - # different event loop is in present compared to when the application - # is actually run. - - queue: asyncio.Queue[_T] - - if self.use_lifo: - queue = asyncio.LifoQueue(maxsize=self.maxsize) - else: - queue = asyncio.Queue(maxsize=self.maxsize) - return queue - - def put_nowait(self, item: _T) -> None: - try: - self._queue.put_nowait(item) - except asyncio.QueueFull as err: - raise Full() from err - - def put( - self, item: _T, block: bool = True, timeout: Optional[float] = None - ) -> None: - if not block: - return self.put_nowait(item) - - try: - if timeout is not None: - self.await_(asyncio.wait_for(self._queue.put(item), timeout)) - else: - self.await_(self._queue.put(item)) - except (asyncio.QueueFull, asyncio.TimeoutError) as err: - raise Full() from err - - def get_nowait(self) -> _T: - try: - return self._queue.get_nowait() - except asyncio.QueueEmpty as err: - raise Empty() from err - - def get(self, block: bool = True, timeout: Optional[float] = None) -> _T: - if not block: - return self.get_nowait() - - try: - if timeout is not None: - return self.await_( - asyncio.wait_for(self._queue.get(), timeout) - ) - else: - return self.await_(self._queue.get()) - except (asyncio.QueueEmpty, asyncio.TimeoutError) as err: - raise Empty() from err - - -class FallbackAsyncAdaptedQueue(AsyncAdaptedQueue[_T]): - if not typing.TYPE_CHECKING: - await_ = staticmethod(await_fallback) |