diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/pool')
-rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/pool/__init__.py | 44 | ||||
-rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/pool/__pycache__/__init__.cpython-311.pyc | bin | 0 -> 1878 bytes | |||
-rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/pool/__pycache__/base.cpython-311.pyc | bin | 0 -> 59317 bytes | |||
-rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/pool/__pycache__/events.cpython-311.pyc | bin | 0 -> 14483 bytes | |||
-rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/pool/__pycache__/impl.cpython-311.pyc | bin | 0 -> 27550 bytes | |||
-rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py | 1515 | ||||
-rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/pool/events.py | 370 | ||||
-rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/pool/impl.py | 581 |
8 files changed, 2510 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/pool/__init__.py b/venv/lib/python3.11/site-packages/sqlalchemy/pool/__init__.py new file mode 100644 index 0000000..29fd652 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/pool/__init__.py @@ -0,0 +1,44 @@ +# pool/__init__.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + + +"""Connection pooling for DB-API connections. + +Provides a number of connection pool implementations for a variety of +usage scenarios and thread behavior requirements imposed by the +application, DB-API or database itself. + +Also provides a DB-API 2.0 connection proxying mechanism allowing +regular DB-API connect() methods to be transparently managed by a +SQLAlchemy connection pool. +""" + +from . import events +from .base import _AdhocProxiedConnection as _AdhocProxiedConnection +from .base import _ConnectionFairy as _ConnectionFairy +from .base import _ConnectionRecord +from .base import _CreatorFnType as _CreatorFnType +from .base import _CreatorWRecFnType as _CreatorWRecFnType +from .base import _finalize_fairy +from .base import _ResetStyleArgType as _ResetStyleArgType +from .base import ConnectionPoolEntry as ConnectionPoolEntry +from .base import ManagesConnection as ManagesConnection +from .base import Pool as Pool +from .base import PoolProxiedConnection as PoolProxiedConnection +from .base import PoolResetState as PoolResetState +from .base import reset_commit as reset_commit +from .base import reset_none as reset_none +from .base import reset_rollback as reset_rollback +from .impl import AssertionPool as AssertionPool +from .impl import AsyncAdaptedQueuePool as AsyncAdaptedQueuePool +from .impl import ( + FallbackAsyncAdaptedQueuePool as FallbackAsyncAdaptedQueuePool, +) +from .impl import NullPool as NullPool +from .impl import QueuePool as QueuePool +from .impl import SingletonThreadPool as SingletonThreadPool +from .impl import StaticPool as StaticPool diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/pool/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/pool/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..6bb2901 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/pool/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/pool/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/pool/__pycache__/base.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..14fc526 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/pool/__pycache__/base.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/pool/__pycache__/events.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/pool/__pycache__/events.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..0fb8362 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/pool/__pycache__/events.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/pool/__pycache__/impl.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/pool/__pycache__/impl.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..7939b43 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/pool/__pycache__/impl.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py new file mode 100644 index 0000000..98d2027 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py @@ -0,0 +1,1515 @@ +# pool/base.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + + +"""Base constructs for connection pools. + +""" + +from __future__ import annotations + +from collections import deque +import dataclasses +from enum import Enum +import threading +import time +import typing +from typing import Any +from typing import Callable +from typing import cast +from typing import Deque +from typing import Dict +from typing import List +from typing import Optional +from typing import Tuple +from typing import TYPE_CHECKING +from typing import Union +import weakref + +from .. import event +from .. import exc +from .. import log +from .. import util +from ..util.typing import Literal +from ..util.typing import Protocol + +if TYPE_CHECKING: + from ..engine.interfaces import DBAPIConnection + from ..engine.interfaces import DBAPICursor + from ..engine.interfaces import Dialect + from ..event import _DispatchCommon + from ..event import _ListenerFnType + from ..event import dispatcher + from ..sql._typing import _InfoType + + +@dataclasses.dataclass(frozen=True) +class PoolResetState: + """describes the state of a DBAPI connection as it is being passed to + the :meth:`.PoolEvents.reset` connection pool event. + + .. versionadded:: 2.0.0b3 + + """ + + __slots__ = ("transaction_was_reset", "terminate_only", "asyncio_safe") + + transaction_was_reset: bool + """Indicates if the transaction on the DBAPI connection was already + essentially "reset" back by the :class:`.Connection` object. + + This boolean is True if the :class:`.Connection` had transactional + state present upon it, which was then not closed using the + :meth:`.Connection.rollback` or :meth:`.Connection.commit` method; + instead, the transaction was closed inline within the + :meth:`.Connection.close` method so is guaranteed to remain non-present + when this event is reached. + + """ + + terminate_only: bool + """indicates if the connection is to be immediately terminated and + not checked in to the pool. + + This occurs for connections that were invalidated, as well as asyncio + connections that were not cleanly handled by the calling code that + are instead being garbage collected. In the latter case, + operations can't be safely run on asyncio connections within garbage + collection as there is not necessarily an event loop present. + + """ + + asyncio_safe: bool + """Indicates if the reset operation is occurring within a scope where + an enclosing event loop is expected to be present for asyncio applications. + + Will be False in the case that the connection is being garbage collected. + + """ + + +class ResetStyle(Enum): + """Describe options for "reset on return" behaviors.""" + + reset_rollback = 0 + reset_commit = 1 + reset_none = 2 + + +_ResetStyleArgType = Union[ + ResetStyle, + Literal[True, None, False, "commit", "rollback"], +] +reset_rollback, reset_commit, reset_none = list(ResetStyle) + + +class _ConnDialect: + """partial implementation of :class:`.Dialect` + which provides DBAPI connection methods. + + When a :class:`_pool.Pool` is combined with an :class:`_engine.Engine`, + the :class:`_engine.Engine` replaces this with its own + :class:`.Dialect`. + + """ + + is_async = False + has_terminate = False + + def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None: + dbapi_connection.rollback() + + def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None: + dbapi_connection.commit() + + def do_terminate(self, dbapi_connection: DBAPIConnection) -> None: + dbapi_connection.close() + + def do_close(self, dbapi_connection: DBAPIConnection) -> None: + dbapi_connection.close() + + def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool: + raise NotImplementedError( + "The ping feature requires that a dialect is " + "passed to the connection pool." + ) + + def get_driver_connection(self, connection: DBAPIConnection) -> Any: + return connection + + +class _AsyncConnDialect(_ConnDialect): + is_async = True + + +class _CreatorFnType(Protocol): + def __call__(self) -> DBAPIConnection: ... + + +class _CreatorWRecFnType(Protocol): + def __call__(self, rec: ConnectionPoolEntry) -> DBAPIConnection: ... + + +class Pool(log.Identified, event.EventTarget): + """Abstract base class for connection pools.""" + + dispatch: dispatcher[Pool] + echo: log._EchoFlagType + + _orig_logging_name: Optional[str] + _dialect: Union[_ConnDialect, Dialect] = _ConnDialect() + _creator_arg: Union[_CreatorFnType, _CreatorWRecFnType] + _invoke_creator: _CreatorWRecFnType + _invalidate_time: float + + def __init__( + self, + creator: Union[_CreatorFnType, _CreatorWRecFnType], + recycle: int = -1, + echo: log._EchoFlagType = None, + logging_name: Optional[str] = None, + reset_on_return: _ResetStyleArgType = True, + events: Optional[List[Tuple[_ListenerFnType, str]]] = None, + dialect: Optional[Union[_ConnDialect, Dialect]] = None, + pre_ping: bool = False, + _dispatch: Optional[_DispatchCommon[Pool]] = None, + ): + """ + Construct a Pool. + + :param creator: a callable function that returns a DB-API + connection object. The function will be called with + parameters. + + :param recycle: If set to a value other than -1, number of + seconds between connection recycling, which means upon + checkout, if this timeout is surpassed the connection will be + closed and replaced with a newly opened connection. Defaults to -1. + + :param logging_name: String identifier which will be used within + the "name" field of logging records generated within the + "sqlalchemy.pool" logger. Defaults to a hexstring of the object's + id. + + :param echo: if True, the connection pool will log + informational output such as when connections are invalidated + as well as when connections are recycled to the default log handler, + which defaults to ``sys.stdout`` for output.. If set to the string + ``"debug"``, the logging will include pool checkouts and checkins. + + The :paramref:`_pool.Pool.echo` parameter can also be set from the + :func:`_sa.create_engine` call by using the + :paramref:`_sa.create_engine.echo_pool` parameter. + + .. seealso:: + + :ref:`dbengine_logging` - further detail on how to configure + logging. + + :param reset_on_return: Determine steps to take on + connections as they are returned to the pool, which were + not otherwise handled by a :class:`_engine.Connection`. + Available from :func:`_sa.create_engine` via the + :paramref:`_sa.create_engine.pool_reset_on_return` parameter. + + :paramref:`_pool.Pool.reset_on_return` can have any of these values: + + * ``"rollback"`` - call rollback() on the connection, + to release locks and transaction resources. + This is the default value. The vast majority + of use cases should leave this value set. + * ``"commit"`` - call commit() on the connection, + to release locks and transaction resources. + A commit here may be desirable for databases that + cache query plans if a commit is emitted, + such as Microsoft SQL Server. However, this + value is more dangerous than 'rollback' because + any data changes present on the transaction + are committed unconditionally. + * ``None`` - don't do anything on the connection. + This setting may be appropriate if the database / DBAPI + works in pure "autocommit" mode at all times, or if + a custom reset handler is established using the + :meth:`.PoolEvents.reset` event handler. + + * ``True`` - same as 'rollback', this is here for + backwards compatibility. + * ``False`` - same as None, this is here for + backwards compatibility. + + For further customization of reset on return, the + :meth:`.PoolEvents.reset` event hook may be used which can perform + any connection activity desired on reset. + + .. seealso:: + + :ref:`pool_reset_on_return` + + :meth:`.PoolEvents.reset` + + :param events: a list of 2-tuples, each of the form + ``(callable, target)`` which will be passed to :func:`.event.listen` + upon construction. Provided here so that event listeners + can be assigned via :func:`_sa.create_engine` before dialect-level + listeners are applied. + + :param dialect: a :class:`.Dialect` that will handle the job + of calling rollback(), close(), or commit() on DBAPI connections. + If omitted, a built-in "stub" dialect is used. Applications that + make use of :func:`_sa.create_engine` should not use this parameter + as it is handled by the engine creation strategy. + + :param pre_ping: if True, the pool will emit a "ping" (typically + "SELECT 1", but is dialect-specific) on the connection + upon checkout, to test if the connection is alive or not. If not, + the connection is transparently re-connected and upon success, all + other pooled connections established prior to that timestamp are + invalidated. Requires that a dialect is passed as well to + interpret the disconnection error. + + .. versionadded:: 1.2 + + """ + if logging_name: + self.logging_name = self._orig_logging_name = logging_name + else: + self._orig_logging_name = None + + log.instance_logger(self, echoflag=echo) + self._creator = creator + self._recycle = recycle + self._invalidate_time = 0 + self._pre_ping = pre_ping + self._reset_on_return = util.parse_user_argument_for_enum( + reset_on_return, + { + ResetStyle.reset_rollback: ["rollback", True], + ResetStyle.reset_none: ["none", None, False], + ResetStyle.reset_commit: ["commit"], + }, + "reset_on_return", + ) + + self.echo = echo + + if _dispatch: + self.dispatch._update(_dispatch, only_propagate=False) + if dialect: + self._dialect = dialect + if events: + for fn, target in events: + event.listen(self, target, fn) + + @util.hybridproperty + def _is_asyncio(self) -> bool: + return self._dialect.is_async + + @property + def _creator(self) -> Union[_CreatorFnType, _CreatorWRecFnType]: + return self._creator_arg + + @_creator.setter + def _creator( + self, creator: Union[_CreatorFnType, _CreatorWRecFnType] + ) -> None: + self._creator_arg = creator + + # mypy seems to get super confused assigning functions to + # attributes + self._invoke_creator = self._should_wrap_creator(creator) + + @_creator.deleter + def _creator(self) -> None: + # needed for mock testing + del self._creator_arg + del self._invoke_creator + + def _should_wrap_creator( + self, creator: Union[_CreatorFnType, _CreatorWRecFnType] + ) -> _CreatorWRecFnType: + """Detect if creator accepts a single argument, or is sent + as a legacy style no-arg function. + + """ + + try: + argspec = util.get_callable_argspec(self._creator, no_self=True) + except TypeError: + creator_fn = cast(_CreatorFnType, creator) + return lambda rec: creator_fn() + + if argspec.defaults is not None: + defaulted = len(argspec.defaults) + else: + defaulted = 0 + positionals = len(argspec[0]) - defaulted + + # look for the exact arg signature that DefaultStrategy + # sends us + if (argspec[0], argspec[3]) == (["connection_record"], (None,)): + return cast(_CreatorWRecFnType, creator) + # or just a single positional + elif positionals == 1: + return cast(_CreatorWRecFnType, creator) + # all other cases, just wrap and assume legacy "creator" callable + # thing + else: + creator_fn = cast(_CreatorFnType, creator) + return lambda rec: creator_fn() + + def _close_connection( + self, connection: DBAPIConnection, *, terminate: bool = False + ) -> None: + self.logger.debug( + "%s connection %r", + "Hard-closing" if terminate else "Closing", + connection, + ) + try: + if terminate: + self._dialect.do_terminate(connection) + else: + self._dialect.do_close(connection) + except BaseException as e: + self.logger.error( + f"Exception {'terminating' if terminate else 'closing'} " + f"connection %r", + connection, + exc_info=True, + ) + if not isinstance(e, Exception): + raise + + def _create_connection(self) -> ConnectionPoolEntry: + """Called by subclasses to create a new ConnectionRecord.""" + + return _ConnectionRecord(self) + + def _invalidate( + self, + connection: PoolProxiedConnection, + exception: Optional[BaseException] = None, + _checkin: bool = True, + ) -> None: + """Mark all connections established within the generation + of the given connection as invalidated. + + If this pool's last invalidate time is before when the given + connection was created, update the timestamp til now. Otherwise, + no action is performed. + + Connections with a start time prior to this pool's invalidation + time will be recycled upon next checkout. + """ + rec = getattr(connection, "_connection_record", None) + if not rec or self._invalidate_time < rec.starttime: + self._invalidate_time = time.time() + if _checkin and getattr(connection, "is_valid", False): + connection.invalidate(exception) + + def recreate(self) -> Pool: + """Return a new :class:`_pool.Pool`, of the same class as this one + and configured with identical creation arguments. + + This method is used in conjunction with :meth:`dispose` + to close out an entire :class:`_pool.Pool` and create a new one in + its place. + + """ + + raise NotImplementedError() + + def dispose(self) -> None: + """Dispose of this pool. + + This method leaves the possibility of checked-out connections + remaining open, as it only affects connections that are + idle in the pool. + + .. seealso:: + + :meth:`Pool.recreate` + + """ + + raise NotImplementedError() + + def connect(self) -> PoolProxiedConnection: + """Return a DBAPI connection from the pool. + + The connection is instrumented such that when its + ``close()`` method is called, the connection will be returned to + the pool. + + """ + return _ConnectionFairy._checkout(self) + + def _return_conn(self, record: ConnectionPoolEntry) -> None: + """Given a _ConnectionRecord, return it to the :class:`_pool.Pool`. + + This method is called when an instrumented DBAPI connection + has its ``close()`` method called. + + """ + self._do_return_conn(record) + + def _do_get(self) -> ConnectionPoolEntry: + """Implementation for :meth:`get`, supplied by subclasses.""" + + raise NotImplementedError() + + def _do_return_conn(self, record: ConnectionPoolEntry) -> None: + """Implementation for :meth:`return_conn`, supplied by subclasses.""" + + raise NotImplementedError() + + def status(self) -> str: + raise NotImplementedError() + + +class ManagesConnection: + """Common base for the two connection-management interfaces + :class:`.PoolProxiedConnection` and :class:`.ConnectionPoolEntry`. + + These two objects are typically exposed in the public facing API + via the connection pool event hooks, documented at :class:`.PoolEvents`. + + .. versionadded:: 2.0 + + """ + + __slots__ = () + + dbapi_connection: Optional[DBAPIConnection] + """A reference to the actual DBAPI connection being tracked. + + This is a :pep:`249`-compliant object that for traditional sync-style + dialects is provided by the third-party + DBAPI implementation in use. For asyncio dialects, the implementation + is typically an adapter object provided by the SQLAlchemy dialect + itself; the underlying asyncio object is available via the + :attr:`.ManagesConnection.driver_connection` attribute. + + SQLAlchemy's interface for the DBAPI connection is based on the + :class:`.DBAPIConnection` protocol object + + .. seealso:: + + :attr:`.ManagesConnection.driver_connection` + + :ref:`faq_dbapi_connection` + + """ + + driver_connection: Optional[Any] + """The "driver level" connection object as used by the Python + DBAPI or database driver. + + For traditional :pep:`249` DBAPI implementations, this object will + be the same object as that of + :attr:`.ManagesConnection.dbapi_connection`. For an asyncio database + driver, this will be the ultimate "connection" object used by that + driver, such as the ``asyncpg.Connection`` object which will not have + standard pep-249 methods. + + .. versionadded:: 1.4.24 + + .. seealso:: + + :attr:`.ManagesConnection.dbapi_connection` + + :ref:`faq_dbapi_connection` + + """ + + @util.ro_memoized_property + def info(self) -> _InfoType: + """Info dictionary associated with the underlying DBAPI connection + referred to by this :class:`.ManagesConnection` instance, allowing + user-defined data to be associated with the connection. + + The data in this dictionary is persistent for the lifespan + of the DBAPI connection itself, including across pool checkins + and checkouts. When the connection is invalidated + and replaced with a new one, this dictionary is cleared. + + For a :class:`.PoolProxiedConnection` instance that's not associated + with a :class:`.ConnectionPoolEntry`, such as if it were detached, the + attribute returns a dictionary that is local to that + :class:`.ConnectionPoolEntry`. Therefore the + :attr:`.ManagesConnection.info` attribute will always provide a Python + dictionary. + + .. seealso:: + + :attr:`.ManagesConnection.record_info` + + + """ + raise NotImplementedError() + + @util.ro_memoized_property + def record_info(self) -> Optional[_InfoType]: + """Persistent info dictionary associated with this + :class:`.ManagesConnection`. + + Unlike the :attr:`.ManagesConnection.info` dictionary, the lifespan + of this dictionary is that of the :class:`.ConnectionPoolEntry` + which owns it; therefore this dictionary will persist across + reconnects and connection invalidation for a particular entry + in the connection pool. + + For a :class:`.PoolProxiedConnection` instance that's not associated + with a :class:`.ConnectionPoolEntry`, such as if it were detached, the + attribute returns None. Contrast to the :attr:`.ManagesConnection.info` + dictionary which is never None. + + + .. seealso:: + + :attr:`.ManagesConnection.info` + + """ + raise NotImplementedError() + + def invalidate( + self, e: Optional[BaseException] = None, soft: bool = False + ) -> None: + """Mark the managed connection as invalidated. + + :param e: an exception object indicating a reason for the invalidation. + + :param soft: if True, the connection isn't closed; instead, this + connection will be recycled on next checkout. + + .. seealso:: + + :ref:`pool_connection_invalidation` + + + """ + raise NotImplementedError() + + +class ConnectionPoolEntry(ManagesConnection): + """Interface for the object that maintains an individual database + connection on behalf of a :class:`_pool.Pool` instance. + + The :class:`.ConnectionPoolEntry` object represents the long term + maintainance of a particular connection for a pool, including expiring or + invalidating that connection to have it replaced with a new one, which will + continue to be maintained by that same :class:`.ConnectionPoolEntry` + instance. Compared to :class:`.PoolProxiedConnection`, which is the + short-term, per-checkout connection manager, this object lasts for the + lifespan of a particular "slot" within a connection pool. + + The :class:`.ConnectionPoolEntry` object is mostly visible to public-facing + API code when it is delivered to connection pool event hooks, such as + :meth:`_events.PoolEvents.connect` and :meth:`_events.PoolEvents.checkout`. + + .. versionadded:: 2.0 :class:`.ConnectionPoolEntry` provides the public + facing interface for the :class:`._ConnectionRecord` internal class. + + """ + + __slots__ = () + + @property + def in_use(self) -> bool: + """Return True the connection is currently checked out""" + + raise NotImplementedError() + + def close(self) -> None: + """Close the DBAPI connection managed by this connection pool entry.""" + raise NotImplementedError() + + +class _ConnectionRecord(ConnectionPoolEntry): + """Maintains a position in a connection pool which references a pooled + connection. + + This is an internal object used by the :class:`_pool.Pool` implementation + to provide context management to a DBAPI connection maintained by + that :class:`_pool.Pool`. The public facing interface for this class + is described by the :class:`.ConnectionPoolEntry` class. See that + class for public API details. + + .. seealso:: + + :class:`.ConnectionPoolEntry` + + :class:`.PoolProxiedConnection` + + """ + + __slots__ = ( + "__pool", + "fairy_ref", + "finalize_callback", + "fresh", + "starttime", + "dbapi_connection", + "__weakref__", + "__dict__", + ) + + finalize_callback: Deque[Callable[[DBAPIConnection], None]] + fresh: bool + fairy_ref: Optional[weakref.ref[_ConnectionFairy]] + starttime: float + + def __init__(self, pool: Pool, connect: bool = True): + self.fresh = False + self.fairy_ref = None + self.starttime = 0 + self.dbapi_connection = None + + self.__pool = pool + if connect: + self.__connect() + self.finalize_callback = deque() + + dbapi_connection: Optional[DBAPIConnection] + + @property + def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501 + if self.dbapi_connection is None: + return None + else: + return self.__pool._dialect.get_driver_connection( + self.dbapi_connection + ) + + @property + @util.deprecated( + "2.0", + "The _ConnectionRecord.connection attribute is deprecated; " + "please use 'driver_connection'", + ) + def connection(self) -> Optional[DBAPIConnection]: + return self.dbapi_connection + + _soft_invalidate_time: float = 0 + + @util.ro_memoized_property + def info(self) -> _InfoType: + return {} + + @util.ro_memoized_property + def record_info(self) -> Optional[_InfoType]: + return {} + + @classmethod + def checkout(cls, pool: Pool) -> _ConnectionFairy: + if TYPE_CHECKING: + rec = cast(_ConnectionRecord, pool._do_get()) + else: + rec = pool._do_get() + + try: + dbapi_connection = rec.get_connection() + except BaseException as err: + with util.safe_reraise(): + rec._checkin_failed(err, _fairy_was_created=False) + + # not reached, for code linters only + raise + + echo = pool._should_log_debug() + fairy = _ConnectionFairy(pool, dbapi_connection, rec, echo) + + rec.fairy_ref = ref = weakref.ref( + fairy, + lambda ref: ( + _finalize_fairy( + None, rec, pool, ref, echo, transaction_was_reset=False + ) + if _finalize_fairy is not None + else None + ), + ) + _strong_ref_connection_records[ref] = rec + if echo: + pool.logger.debug( + "Connection %r checked out from pool", dbapi_connection + ) + return fairy + + def _checkin_failed( + self, err: BaseException, _fairy_was_created: bool = True + ) -> None: + self.invalidate(e=err) + self.checkin( + _fairy_was_created=_fairy_was_created, + ) + + def checkin(self, _fairy_was_created: bool = True) -> None: + if self.fairy_ref is None and _fairy_was_created: + # _fairy_was_created is False for the initial get connection phase; + # meaning there was no _ConnectionFairy and we must unconditionally + # do a checkin. + # + # otherwise, if fairy_was_created==True, if fairy_ref is None here + # that means we were checked in already, so this looks like + # a double checkin. + util.warn("Double checkin attempted on %s" % self) + return + self.fairy_ref = None + connection = self.dbapi_connection + pool = self.__pool + while self.finalize_callback: + finalizer = self.finalize_callback.pop() + if connection is not None: + finalizer(connection) + if pool.dispatch.checkin: + pool.dispatch.checkin(connection, self) + + pool._return_conn(self) + + @property + def in_use(self) -> bool: + return self.fairy_ref is not None + + @property + def last_connect_time(self) -> float: + return self.starttime + + def close(self) -> None: + if self.dbapi_connection is not None: + self.__close() + + def invalidate( + self, e: Optional[BaseException] = None, soft: bool = False + ) -> None: + # already invalidated + if self.dbapi_connection is None: + return + if soft: + self.__pool.dispatch.soft_invalidate( + self.dbapi_connection, self, e + ) + else: + self.__pool.dispatch.invalidate(self.dbapi_connection, self, e) + if e is not None: + self.__pool.logger.info( + "%sInvalidate connection %r (reason: %s:%s)", + "Soft " if soft else "", + self.dbapi_connection, + e.__class__.__name__, + e, + ) + else: + self.__pool.logger.info( + "%sInvalidate connection %r", + "Soft " if soft else "", + self.dbapi_connection, + ) + + if soft: + self._soft_invalidate_time = time.time() + else: + self.__close(terminate=True) + self.dbapi_connection = None + + def get_connection(self) -> DBAPIConnection: + recycle = False + + # NOTE: the various comparisons here are assuming that measurable time + # passes between these state changes. however, time.time() is not + # guaranteed to have sub-second precision. comparisons of + # "invalidation time" to "starttime" should perhaps use >= so that the + # state change can take place assuming no measurable time has passed, + # however this does not guarantee correct behavior here as if time + # continues to not pass, it will try to reconnect repeatedly until + # these timestamps diverge, so in that sense using > is safer. Per + # https://stackoverflow.com/a/1938096/34549, Windows time.time() may be + # within 16 milliseconds accuracy, so unit tests for connection + # invalidation need a sleep of at least this long between initial start + # time and invalidation for the logic below to work reliably. + + if self.dbapi_connection is None: + self.info.clear() + self.__connect() + elif ( + self.__pool._recycle > -1 + and time.time() - self.starttime > self.__pool._recycle + ): + self.__pool.logger.info( + "Connection %r exceeded timeout; recycling", + self.dbapi_connection, + ) + recycle = True + elif self.__pool._invalidate_time > self.starttime: + self.__pool.logger.info( + "Connection %r invalidated due to pool invalidation; " + + "recycling", + self.dbapi_connection, + ) + recycle = True + elif self._soft_invalidate_time > self.starttime: + self.__pool.logger.info( + "Connection %r invalidated due to local soft invalidation; " + + "recycling", + self.dbapi_connection, + ) + recycle = True + + if recycle: + self.__close(terminate=True) + self.info.clear() + + self.__connect() + + assert self.dbapi_connection is not None + return self.dbapi_connection + + def _is_hard_or_soft_invalidated(self) -> bool: + return ( + self.dbapi_connection is None + or self.__pool._invalidate_time > self.starttime + or (self._soft_invalidate_time > self.starttime) + ) + + def __close(self, *, terminate: bool = False) -> None: + self.finalize_callback.clear() + if self.__pool.dispatch.close: + self.__pool.dispatch.close(self.dbapi_connection, self) + assert self.dbapi_connection is not None + self.__pool._close_connection( + self.dbapi_connection, terminate=terminate + ) + self.dbapi_connection = None + + def __connect(self) -> None: + pool = self.__pool + + # ensure any existing connection is removed, so that if + # creator fails, this attribute stays None + self.dbapi_connection = None + try: + self.starttime = time.time() + self.dbapi_connection = connection = pool._invoke_creator(self) + pool.logger.debug("Created new connection %r", connection) + self.fresh = True + except BaseException as e: + with util.safe_reraise(): + pool.logger.debug("Error on connect(): %s", e) + else: + # in SQLAlchemy 1.4 the first_connect event is not used by + # the engine, so this will usually not be set + if pool.dispatch.first_connect: + pool.dispatch.first_connect.for_modify( + pool.dispatch + ).exec_once_unless_exception(self.dbapi_connection, self) + + # init of the dialect now takes place within the connect + # event, so ensure a mutex is used on the first run + pool.dispatch.connect.for_modify( + pool.dispatch + )._exec_w_sync_on_first_run(self.dbapi_connection, self) + + +def _finalize_fairy( + dbapi_connection: Optional[DBAPIConnection], + connection_record: Optional[_ConnectionRecord], + pool: Pool, + ref: Optional[ + weakref.ref[_ConnectionFairy] + ], # this is None when called directly, not by the gc + echo: Optional[log._EchoFlagType], + transaction_was_reset: bool = False, + fairy: Optional[_ConnectionFairy] = None, +) -> None: + """Cleanup for a :class:`._ConnectionFairy` whether or not it's already + been garbage collected. + + When using an async dialect no IO can happen here (without using + a dedicated thread), since this is called outside the greenlet + context and with an already running loop. In this case function + will only log a message and raise a warning. + """ + + is_gc_cleanup = ref is not None + + if is_gc_cleanup: + assert ref is not None + _strong_ref_connection_records.pop(ref, None) + assert connection_record is not None + if connection_record.fairy_ref is not ref: + return + assert dbapi_connection is None + dbapi_connection = connection_record.dbapi_connection + + elif fairy: + _strong_ref_connection_records.pop(weakref.ref(fairy), None) + + # null pool is not _is_asyncio but can be used also with async dialects + dont_restore_gced = pool._dialect.is_async + + if dont_restore_gced: + detach = connection_record is None or is_gc_cleanup + can_manipulate_connection = not is_gc_cleanup + can_close_or_terminate_connection = ( + not pool._dialect.is_async or pool._dialect.has_terminate + ) + requires_terminate_for_close = ( + pool._dialect.is_async and pool._dialect.has_terminate + ) + + else: + detach = connection_record is None + can_manipulate_connection = can_close_or_terminate_connection = True + requires_terminate_for_close = False + + if dbapi_connection is not None: + if connection_record and echo: + pool.logger.debug( + "Connection %r being returned to pool", dbapi_connection + ) + + try: + if not fairy: + assert connection_record is not None + fairy = _ConnectionFairy( + pool, + dbapi_connection, + connection_record, + echo, + ) + assert fairy.dbapi_connection is dbapi_connection + + fairy._reset( + pool, + transaction_was_reset=transaction_was_reset, + terminate_only=detach, + asyncio_safe=can_manipulate_connection, + ) + + if detach: + if connection_record: + fairy._pool = pool + fairy.detach() + + if can_close_or_terminate_connection: + if pool.dispatch.close_detached: + pool.dispatch.close_detached(dbapi_connection) + + pool._close_connection( + dbapi_connection, + terminate=requires_terminate_for_close, + ) + + except BaseException as e: + pool.logger.error( + "Exception during reset or similar", exc_info=True + ) + if connection_record: + connection_record.invalidate(e=e) + if not isinstance(e, Exception): + raise + finally: + if detach and is_gc_cleanup and dont_restore_gced: + message = ( + "The garbage collector is trying to clean up " + f"non-checked-in connection {dbapi_connection!r}, " + f"""which will be { + 'dropped, as it cannot be safely terminated' + if not can_close_or_terminate_connection + else 'terminated' + }. """ + "Please ensure that SQLAlchemy pooled connections are " + "returned to " + "the pool explicitly, either by calling ``close()`` " + "or by using appropriate context managers to manage " + "their lifecycle." + ) + pool.logger.error(message) + util.warn(message) + + if connection_record and connection_record.fairy_ref is not None: + connection_record.checkin() + + # give gc some help. See + # test/engine/test_pool.py::PoolEventsTest::test_checkin_event_gc[True] + # which actually started failing when pytest warnings plugin was + # turned on, due to util.warn() above + if fairy is not None: + fairy.dbapi_connection = None # type: ignore + fairy._connection_record = None + del dbapi_connection + del connection_record + del fairy + + +# a dictionary of the _ConnectionFairy weakrefs to _ConnectionRecord, so that +# GC under pypy will call ConnectionFairy finalizers. linked directly to the +# weakref that will empty itself when collected so that it should not create +# any unmanaged memory references. +_strong_ref_connection_records: Dict[ + weakref.ref[_ConnectionFairy], _ConnectionRecord +] = {} + + +class PoolProxiedConnection(ManagesConnection): + """A connection-like adapter for a :pep:`249` DBAPI connection, which + includes additional methods specific to the :class:`.Pool` implementation. + + :class:`.PoolProxiedConnection` is the public-facing interface for the + internal :class:`._ConnectionFairy` implementation object; users familiar + with :class:`._ConnectionFairy` can consider this object to be equivalent. + + .. versionadded:: 2.0 :class:`.PoolProxiedConnection` provides the public- + facing interface for the :class:`._ConnectionFairy` internal class. + + """ + + __slots__ = () + + if typing.TYPE_CHECKING: + + def commit(self) -> None: ... + + def cursor(self) -> DBAPICursor: ... + + def rollback(self) -> None: ... + + @property + def is_valid(self) -> bool: + """Return True if this :class:`.PoolProxiedConnection` still refers + to an active DBAPI connection.""" + + raise NotImplementedError() + + @property + def is_detached(self) -> bool: + """Return True if this :class:`.PoolProxiedConnection` is detached + from its pool.""" + + raise NotImplementedError() + + def detach(self) -> None: + """Separate this connection from its Pool. + + This means that the connection will no longer be returned to the + pool when closed, and will instead be literally closed. The + associated :class:`.ConnectionPoolEntry` is de-associated from this + DBAPI connection. + + Note that any overall connection limiting constraints imposed by a + Pool implementation may be violated after a detach, as the detached + connection is removed from the pool's knowledge and control. + + """ + + raise NotImplementedError() + + def close(self) -> None: + """Release this connection back to the pool. + + The :meth:`.PoolProxiedConnection.close` method shadows the + :pep:`249` ``.close()`` method, altering its behavior to instead + :term:`release` the proxied connection back to the connection pool. + + Upon release to the pool, whether the connection stays "opened" and + pooled in the Python process, versus actually closed out and removed + from the Python process, is based on the pool implementation in use and + its configuration and current state. + + """ + raise NotImplementedError() + + +class _AdhocProxiedConnection(PoolProxiedConnection): + """provides the :class:`.PoolProxiedConnection` interface for cases where + the DBAPI connection is not actually proxied. + + This is used by the engine internals to pass a consistent + :class:`.PoolProxiedConnection` object to consuming dialects in response to + pool events that may not always have the :class:`._ConnectionFairy` + available. + + """ + + __slots__ = ("dbapi_connection", "_connection_record", "_is_valid") + + dbapi_connection: DBAPIConnection + _connection_record: ConnectionPoolEntry + + def __init__( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + ): + self.dbapi_connection = dbapi_connection + self._connection_record = connection_record + self._is_valid = True + + @property + def driver_connection(self) -> Any: # type: ignore[override] # mypy#4125 + return self._connection_record.driver_connection + + @property + def connection(self) -> DBAPIConnection: + return self.dbapi_connection + + @property + def is_valid(self) -> bool: + """Implement is_valid state attribute. + + for the adhoc proxied connection it's assumed the connection is valid + as there is no "invalidate" routine. + + """ + return self._is_valid + + def invalidate( + self, e: Optional[BaseException] = None, soft: bool = False + ) -> None: + self._is_valid = False + + @util.ro_non_memoized_property + def record_info(self) -> Optional[_InfoType]: + return self._connection_record.record_info + + def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor: + return self.dbapi_connection.cursor(*args, **kwargs) + + def __getattr__(self, key: Any) -> Any: + return getattr(self.dbapi_connection, key) + + +class _ConnectionFairy(PoolProxiedConnection): + """Proxies a DBAPI connection and provides return-on-dereference + support. + + This is an internal object used by the :class:`_pool.Pool` implementation + to provide context management to a DBAPI connection delivered by + that :class:`_pool.Pool`. The public facing interface for this class + is described by the :class:`.PoolProxiedConnection` class. See that + class for public API details. + + The name "fairy" is inspired by the fact that the + :class:`._ConnectionFairy` object's lifespan is transitory, as it lasts + only for the length of a specific DBAPI connection being checked out from + the pool, and additionally that as a transparent proxy, it is mostly + invisible. + + .. seealso:: + + :class:`.PoolProxiedConnection` + + :class:`.ConnectionPoolEntry` + + + """ + + __slots__ = ( + "dbapi_connection", + "_connection_record", + "_echo", + "_pool", + "_counter", + "__weakref__", + "__dict__", + ) + + pool: Pool + dbapi_connection: DBAPIConnection + _echo: log._EchoFlagType + + def __init__( + self, + pool: Pool, + dbapi_connection: DBAPIConnection, + connection_record: _ConnectionRecord, + echo: log._EchoFlagType, + ): + self._pool = pool + self._counter = 0 + self.dbapi_connection = dbapi_connection + self._connection_record = connection_record + self._echo = echo + + _connection_record: Optional[_ConnectionRecord] + + @property + def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501 + if self._connection_record is None: + return None + return self._connection_record.driver_connection + + @property + @util.deprecated( + "2.0", + "The _ConnectionFairy.connection attribute is deprecated; " + "please use 'driver_connection'", + ) + def connection(self) -> DBAPIConnection: + return self.dbapi_connection + + @classmethod + def _checkout( + cls, + pool: Pool, + threadconns: Optional[threading.local] = None, + fairy: Optional[_ConnectionFairy] = None, + ) -> _ConnectionFairy: + if not fairy: + fairy = _ConnectionRecord.checkout(pool) + + if threadconns is not None: + threadconns.current = weakref.ref(fairy) + + assert ( + fairy._connection_record is not None + ), "can't 'checkout' a detached connection fairy" + assert ( + fairy.dbapi_connection is not None + ), "can't 'checkout' an invalidated connection fairy" + + fairy._counter += 1 + if ( + not pool.dispatch.checkout and not pool._pre_ping + ) or fairy._counter != 1: + return fairy + + # Pool listeners can trigger a reconnection on checkout, as well + # as the pre-pinger. + # there are three attempts made here, but note that if the database + # is not accessible from a connection standpoint, those won't proceed + # here. + + attempts = 2 + + while attempts > 0: + connection_is_fresh = fairy._connection_record.fresh + fairy._connection_record.fresh = False + try: + if pool._pre_ping: + if not connection_is_fresh: + if fairy._echo: + pool.logger.debug( + "Pool pre-ping on connection %s", + fairy.dbapi_connection, + ) + result = pool._dialect._do_ping_w_event( + fairy.dbapi_connection + ) + if not result: + if fairy._echo: + pool.logger.debug( + "Pool pre-ping on connection %s failed, " + "will invalidate pool", + fairy.dbapi_connection, + ) + raise exc.InvalidatePoolError() + elif fairy._echo: + pool.logger.debug( + "Connection %s is fresh, skipping pre-ping", + fairy.dbapi_connection, + ) + + pool.dispatch.checkout( + fairy.dbapi_connection, fairy._connection_record, fairy + ) + return fairy + except exc.DisconnectionError as e: + if e.invalidate_pool: + pool.logger.info( + "Disconnection detected on checkout, " + "invalidating all pooled connections prior to " + "current timestamp (reason: %r)", + e, + ) + fairy._connection_record.invalidate(e) + pool._invalidate(fairy, e, _checkin=False) + else: + pool.logger.info( + "Disconnection detected on checkout, " + "invalidating individual connection %s (reason: %r)", + fairy.dbapi_connection, + e, + ) + fairy._connection_record.invalidate(e) + try: + fairy.dbapi_connection = ( + fairy._connection_record.get_connection() + ) + except BaseException as err: + with util.safe_reraise(): + fairy._connection_record._checkin_failed( + err, + _fairy_was_created=True, + ) + + # prevent _ConnectionFairy from being carried + # in the stack trace. Do this after the + # connection record has been checked in, so that + # if the del triggers a finalize fairy, it won't + # try to checkin a second time. + del fairy + + # never called, this is for code linters + raise + + attempts -= 1 + except BaseException as be_outer: + with util.safe_reraise(): + rec = fairy._connection_record + if rec is not None: + rec._checkin_failed( + be_outer, + _fairy_was_created=True, + ) + + # prevent _ConnectionFairy from being carried + # in the stack trace, see above + del fairy + + # never called, this is for code linters + raise + + pool.logger.info("Reconnection attempts exhausted on checkout") + fairy.invalidate() + raise exc.InvalidRequestError("This connection is closed") + + def _checkout_existing(self) -> _ConnectionFairy: + return _ConnectionFairy._checkout(self._pool, fairy=self) + + def _checkin(self, transaction_was_reset: bool = False) -> None: + _finalize_fairy( + self.dbapi_connection, + self._connection_record, + self._pool, + None, + self._echo, + transaction_was_reset=transaction_was_reset, + fairy=self, + ) + + def _close(self) -> None: + self._checkin() + + def _reset( + self, + pool: Pool, + transaction_was_reset: bool, + terminate_only: bool, + asyncio_safe: bool, + ) -> None: + if pool.dispatch.reset: + pool.dispatch.reset( + self.dbapi_connection, + self._connection_record, + PoolResetState( + transaction_was_reset=transaction_was_reset, + terminate_only=terminate_only, + asyncio_safe=asyncio_safe, + ), + ) + + if not asyncio_safe: + return + + if pool._reset_on_return is reset_rollback: + if transaction_was_reset: + if self._echo: + pool.logger.debug( + "Connection %s reset, transaction already reset", + self.dbapi_connection, + ) + else: + if self._echo: + pool.logger.debug( + "Connection %s rollback-on-return", + self.dbapi_connection, + ) + pool._dialect.do_rollback(self) + elif pool._reset_on_return is reset_commit: + if self._echo: + pool.logger.debug( + "Connection %s commit-on-return", + self.dbapi_connection, + ) + pool._dialect.do_commit(self) + + @property + def _logger(self) -> log._IdentifiedLoggerType: + return self._pool.logger + + @property + def is_valid(self) -> bool: + return self.dbapi_connection is not None + + @property + def is_detached(self) -> bool: + return self._connection_record is None + + @util.ro_memoized_property + def info(self) -> _InfoType: + if self._connection_record is None: + return {} + else: + return self._connection_record.info + + @util.ro_non_memoized_property + def record_info(self) -> Optional[_InfoType]: + if self._connection_record is None: + return None + else: + return self._connection_record.record_info + + def invalidate( + self, e: Optional[BaseException] = None, soft: bool = False + ) -> None: + if self.dbapi_connection is None: + util.warn("Can't invalidate an already-closed connection.") + return + if self._connection_record: + self._connection_record.invalidate(e=e, soft=soft) + if not soft: + # prevent any rollback / reset actions etc. on + # the connection + self.dbapi_connection = None # type: ignore + + # finalize + self._checkin() + + def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor: + assert self.dbapi_connection is not None + return self.dbapi_connection.cursor(*args, **kwargs) + + def __getattr__(self, key: str) -> Any: + return getattr(self.dbapi_connection, key) + + def detach(self) -> None: + if self._connection_record is not None: + rec = self._connection_record + rec.fairy_ref = None + rec.dbapi_connection = None + # TODO: should this be _return_conn? + self._pool._do_return_conn(self._connection_record) + + # can't get the descriptor assignment to work here + # in pylance. mypy is OK w/ it + self.info = self.info.copy() # type: ignore + + self._connection_record = None + + if self._pool.dispatch.detach: + self._pool.dispatch.detach(self.dbapi_connection, rec) + + def close(self) -> None: + self._counter -= 1 + if self._counter == 0: + self._checkin() + + def _close_special(self, transaction_reset: bool = False) -> None: + self._counter -= 1 + if self._counter == 0: + self._checkin(transaction_was_reset=transaction_reset) diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/pool/events.py b/venv/lib/python3.11/site-packages/sqlalchemy/pool/events.py new file mode 100644 index 0000000..4b4f4e4 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/pool/events.py @@ -0,0 +1,370 @@ +# pool/events.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +from __future__ import annotations + +import typing +from typing import Any +from typing import Optional +from typing import Type +from typing import Union + +from .base import ConnectionPoolEntry +from .base import Pool +from .base import PoolProxiedConnection +from .base import PoolResetState +from .. import event +from .. import util + +if typing.TYPE_CHECKING: + from ..engine import Engine + from ..engine.interfaces import DBAPIConnection + + +class PoolEvents(event.Events[Pool]): + """Available events for :class:`_pool.Pool`. + + The methods here define the name of an event as well + as the names of members that are passed to listener + functions. + + e.g.:: + + from sqlalchemy import event + + def my_on_checkout(dbapi_conn, connection_rec, connection_proxy): + "handle an on checkout event" + + event.listen(Pool, 'checkout', my_on_checkout) + + In addition to accepting the :class:`_pool.Pool` class and + :class:`_pool.Pool` instances, :class:`_events.PoolEvents` also accepts + :class:`_engine.Engine` objects and the :class:`_engine.Engine` class as + targets, which will be resolved to the ``.pool`` attribute of the + given engine or the :class:`_pool.Pool` class:: + + engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") + + # will associate with engine.pool + event.listen(engine, 'checkout', my_on_checkout) + + """ # noqa: E501 + + _target_class_doc = "SomeEngineOrPool" + _dispatch_target = Pool + + @util.preload_module("sqlalchemy.engine") + @classmethod + def _accept_with( + cls, + target: Union[Pool, Type[Pool], Engine, Type[Engine]], + identifier: str, + ) -> Optional[Union[Pool, Type[Pool]]]: + if not typing.TYPE_CHECKING: + Engine = util.preloaded.engine.Engine + + if isinstance(target, type): + if issubclass(target, Engine): + return Pool + else: + assert issubclass(target, Pool) + return target + elif isinstance(target, Engine): + return target.pool + elif isinstance(target, Pool): + return target + elif hasattr(target, "_no_async_engine_events"): + target._no_async_engine_events() + else: + return None + + @classmethod + def _listen( + cls, + event_key: event._EventKey[Pool], + **kw: Any, + ) -> None: + target = event_key.dispatch_target + + kw.setdefault("asyncio", target._is_asyncio) + + event_key.base_listen(**kw) + + def connect( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + ) -> None: + """Called at the moment a particular DBAPI connection is first + created for a given :class:`_pool.Pool`. + + This event allows one to capture the point directly after which + the DBAPI module-level ``.connect()`` method has been used in order + to produce a new DBAPI connection. + + :param dbapi_connection: a DBAPI connection. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. + + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. + + """ + + def first_connect( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + ) -> None: + """Called exactly once for the first time a DBAPI connection is + checked out from a particular :class:`_pool.Pool`. + + The rationale for :meth:`_events.PoolEvents.first_connect` + is to determine + information about a particular series of database connections based + on the settings used for all connections. Since a particular + :class:`_pool.Pool` + refers to a single "creator" function (which in terms + of a :class:`_engine.Engine` + refers to the URL and connection options used), + it is typically valid to make observations about a single connection + that can be safely assumed to be valid about all subsequent + connections, such as the database version, the server and client + encoding settings, collation settings, and many others. + + :param dbapi_connection: a DBAPI connection. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. + + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. + + """ + + def checkout( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + connection_proxy: PoolProxiedConnection, + ) -> None: + """Called when a connection is retrieved from the Pool. + + :param dbapi_connection: a DBAPI connection. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. + + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. + + :param connection_proxy: the :class:`.PoolProxiedConnection` object + which will proxy the public interface of the DBAPI connection for the + lifespan of the checkout. + + If you raise a :class:`~sqlalchemy.exc.DisconnectionError`, the current + connection will be disposed and a fresh connection retrieved. + Processing of all checkout listeners will abort and restart + using the new connection. + + .. seealso:: :meth:`_events.ConnectionEvents.engine_connect` + - a similar event + which occurs upon creation of a new :class:`_engine.Connection`. + + """ + + def checkin( + self, + dbapi_connection: Optional[DBAPIConnection], + connection_record: ConnectionPoolEntry, + ) -> None: + """Called when a connection returns to the pool. + + Note that the connection may be closed, and may be None if the + connection has been invalidated. ``checkin`` will not be called + for detached connections. (They do not return to the pool.) + + :param dbapi_connection: a DBAPI connection. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. + + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. + + """ + + @event._legacy_signature( + "2.0", + ["dbapi_connection", "connection_record"], + lambda dbapi_connection, connection_record, reset_state: ( + dbapi_connection, + connection_record, + ), + ) + def reset( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + reset_state: PoolResetState, + ) -> None: + """Called before the "reset" action occurs for a pooled connection. + + This event represents + when the ``rollback()`` method is called on the DBAPI connection + before it is returned to the pool or discarded. + A custom "reset" strategy may be implemented using this event hook, + which may also be combined with disabling the default "reset" + behavior using the :paramref:`_pool.Pool.reset_on_return` parameter. + + The primary difference between the :meth:`_events.PoolEvents.reset` and + :meth:`_events.PoolEvents.checkin` events are that + :meth:`_events.PoolEvents.reset` is called not just for pooled + connections that are being returned to the pool, but also for + connections that were detached using the + :meth:`_engine.Connection.detach` method as well as asyncio connections + that are being discarded due to garbage collection taking place on + connections before the connection was checked in. + + Note that the event **is not** invoked for connections that were + invalidated using :meth:`_engine.Connection.invalidate`. These + events may be intercepted using the :meth:`.PoolEvents.soft_invalidate` + and :meth:`.PoolEvents.invalidate` event hooks, and all "connection + close" events may be intercepted using :meth:`.PoolEvents.close`. + + The :meth:`_events.PoolEvents.reset` event is usually followed by the + :meth:`_events.PoolEvents.checkin` event, except in those + cases where the connection is discarded immediately after reset. + + :param dbapi_connection: a DBAPI connection. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. + + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. + + :param reset_state: :class:`.PoolResetState` instance which provides + information about the circumstances under which the connection + is being reset. + + .. versionadded:: 2.0 + + .. seealso:: + + :ref:`pool_reset_on_return` + + :meth:`_events.ConnectionEvents.rollback` + + :meth:`_events.ConnectionEvents.commit` + + """ + + def invalidate( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + exception: Optional[BaseException], + ) -> None: + """Called when a DBAPI connection is to be "invalidated". + + This event is called any time the + :meth:`.ConnectionPoolEntry.invalidate` method is invoked, either from + API usage or via "auto-invalidation", without the ``soft`` flag. + + The event occurs before a final attempt to call ``.close()`` on the + connection occurs. + + :param dbapi_connection: a DBAPI connection. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. + + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. + + :param exception: the exception object corresponding to the reason + for this invalidation, if any. May be ``None``. + + .. seealso:: + + :ref:`pool_connection_invalidation` + + """ + + def soft_invalidate( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + exception: Optional[BaseException], + ) -> None: + """Called when a DBAPI connection is to be "soft invalidated". + + This event is called any time the + :meth:`.ConnectionPoolEntry.invalidate` + method is invoked with the ``soft`` flag. + + Soft invalidation refers to when the connection record that tracks + this connection will force a reconnect after the current connection + is checked in. It does not actively close the dbapi_connection + at the point at which it is called. + + :param dbapi_connection: a DBAPI connection. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. + + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. + + :param exception: the exception object corresponding to the reason + for this invalidation, if any. May be ``None``. + + """ + + def close( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + ) -> None: + """Called when a DBAPI connection is closed. + + The event is emitted before the close occurs. + + The close of a connection can fail; typically this is because + the connection is already closed. If the close operation fails, + the connection is discarded. + + The :meth:`.close` event corresponds to a connection that's still + associated with the pool. To intercept close events for detached + connections use :meth:`.close_detached`. + + :param dbapi_connection: a DBAPI connection. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. + + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. + + """ + + def detach( + self, + dbapi_connection: DBAPIConnection, + connection_record: ConnectionPoolEntry, + ) -> None: + """Called when a DBAPI connection is "detached" from a pool. + + This event is emitted after the detach occurs. The connection + is no longer associated with the given connection record. + + :param dbapi_connection: a DBAPI connection. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. + + :param connection_record: the :class:`.ConnectionPoolEntry` managing + the DBAPI connection. + + """ + + def close_detached(self, dbapi_connection: DBAPIConnection) -> None: + """Called when a detached DBAPI connection is closed. + + The event is emitted before the close occurs. + + The close of a connection can fail; typically this is because + the connection is already closed. If the close operation fails, + the connection is discarded. + + :param dbapi_connection: a DBAPI connection. + The :attr:`.ConnectionPoolEntry.dbapi_connection` attribute. + + """ diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/pool/impl.py b/venv/lib/python3.11/site-packages/sqlalchemy/pool/impl.py new file mode 100644 index 0000000..157455c --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/pool/impl.py @@ -0,0 +1,581 @@ +# pool/impl.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + + +"""Pool implementation classes. + +""" +from __future__ import annotations + +import threading +import traceback +import typing +from typing import Any +from typing import cast +from typing import List +from typing import Optional +from typing import Set +from typing import Type +from typing import TYPE_CHECKING +from typing import Union +import weakref + +from .base import _AsyncConnDialect +from .base import _ConnectionFairy +from .base import _ConnectionRecord +from .base import _CreatorFnType +from .base import _CreatorWRecFnType +from .base import ConnectionPoolEntry +from .base import Pool +from .base import PoolProxiedConnection +from .. import exc +from .. import util +from ..util import chop_traceback +from ..util import queue as sqla_queue +from ..util.typing import Literal + +if typing.TYPE_CHECKING: + from ..engine.interfaces import DBAPIConnection + + +class QueuePool(Pool): + """A :class:`_pool.Pool` + that imposes a limit on the number of open connections. + + :class:`.QueuePool` is the default pooling implementation used for + all :class:`_engine.Engine` objects other than SQLite with a ``:memory:`` + database. + + The :class:`.QueuePool` class **is not compatible** with asyncio and + :func:`_asyncio.create_async_engine`. The + :class:`.AsyncAdaptedQueuePool` class is used automatically when + using :func:`_asyncio.create_async_engine`, if no other kind of pool + is specified. + + .. seealso:: + + :class:`.AsyncAdaptedQueuePool` + + """ + + _is_asyncio = False # type: ignore[assignment] + + _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = ( + sqla_queue.Queue + ) + + _pool: sqla_queue.QueueCommon[ConnectionPoolEntry] + + def __init__( + self, + creator: Union[_CreatorFnType, _CreatorWRecFnType], + pool_size: int = 5, + max_overflow: int = 10, + timeout: float = 30.0, + use_lifo: bool = False, + **kw: Any, + ): + r""" + Construct a QueuePool. + + :param creator: a callable function that returns a DB-API + connection object, same as that of :paramref:`_pool.Pool.creator`. + + :param pool_size: The size of the pool to be maintained, + defaults to 5. This is the largest number of connections that + will be kept persistently in the pool. Note that the pool + begins with no connections; once this number of connections + is requested, that number of connections will remain. + ``pool_size`` can be set to 0 to indicate no size limit; to + disable pooling, use a :class:`~sqlalchemy.pool.NullPool` + instead. + + :param max_overflow: The maximum overflow size of the + pool. When the number of checked-out connections reaches the + size set in pool_size, additional connections will be + returned up to this limit. When those additional connections + are returned to the pool, they are disconnected and + discarded. It follows then that the total number of + simultaneous connections the pool will allow is pool_size + + `max_overflow`, and the total number of "sleeping" + connections the pool will allow is pool_size. `max_overflow` + can be set to -1 to indicate no overflow limit; no limit + will be placed on the total number of concurrent + connections. Defaults to 10. + + :param timeout: The number of seconds to wait before giving up + on returning a connection. Defaults to 30.0. This can be a float + but is subject to the limitations of Python time functions which + may not be reliable in the tens of milliseconds. + + :param use_lifo: use LIFO (last-in-first-out) when retrieving + connections instead of FIFO (first-in-first-out). Using LIFO, a + server-side timeout scheme can reduce the number of connections used + during non-peak periods of use. When planning for server-side + timeouts, ensure that a recycle or pre-ping strategy is in use to + gracefully handle stale connections. + + .. versionadded:: 1.3 + + .. seealso:: + + :ref:`pool_use_lifo` + + :ref:`pool_disconnects` + + :param \**kw: Other keyword arguments including + :paramref:`_pool.Pool.recycle`, :paramref:`_pool.Pool.echo`, + :paramref:`_pool.Pool.reset_on_return` and others are passed to the + :class:`_pool.Pool` constructor. + + """ + + Pool.__init__(self, creator, **kw) + self._pool = self._queue_class(pool_size, use_lifo=use_lifo) + self._overflow = 0 - pool_size + self._max_overflow = -1 if pool_size == 0 else max_overflow + self._timeout = timeout + self._overflow_lock = threading.Lock() + + def _do_return_conn(self, record: ConnectionPoolEntry) -> None: + try: + self._pool.put(record, False) + except sqla_queue.Full: + try: + record.close() + finally: + self._dec_overflow() + + def _do_get(self) -> ConnectionPoolEntry: + use_overflow = self._max_overflow > -1 + + wait = use_overflow and self._overflow >= self._max_overflow + try: + return self._pool.get(wait, self._timeout) + except sqla_queue.Empty: + # don't do things inside of "except Empty", because when we say + # we timed out or can't connect and raise, Python 3 tells + # people the real error is queue.Empty which it isn't. + pass + if use_overflow and self._overflow >= self._max_overflow: + if not wait: + return self._do_get() + else: + raise exc.TimeoutError( + "QueuePool limit of size %d overflow %d reached, " + "connection timed out, timeout %0.2f" + % (self.size(), self.overflow(), self._timeout), + code="3o7r", + ) + + if self._inc_overflow(): + try: + return self._create_connection() + except: + with util.safe_reraise(): + self._dec_overflow() + raise + else: + return self._do_get() + + def _inc_overflow(self) -> bool: + if self._max_overflow == -1: + self._overflow += 1 + return True + with self._overflow_lock: + if self._overflow < self._max_overflow: + self._overflow += 1 + return True + else: + return False + + def _dec_overflow(self) -> Literal[True]: + if self._max_overflow == -1: + self._overflow -= 1 + return True + with self._overflow_lock: + self._overflow -= 1 + return True + + def recreate(self) -> QueuePool: + self.logger.info("Pool recreating") + return self.__class__( + self._creator, + pool_size=self._pool.maxsize, + max_overflow=self._max_overflow, + pre_ping=self._pre_ping, + use_lifo=self._pool.use_lifo, + timeout=self._timeout, + recycle=self._recycle, + echo=self.echo, + logging_name=self._orig_logging_name, + reset_on_return=self._reset_on_return, + _dispatch=self.dispatch, + dialect=self._dialect, + ) + + def dispose(self) -> None: + while True: + try: + conn = self._pool.get(False) + conn.close() + except sqla_queue.Empty: + break + + self._overflow = 0 - self.size() + self.logger.info("Pool disposed. %s", self.status()) + + def status(self) -> str: + return ( + "Pool size: %d Connections in pool: %d " + "Current Overflow: %d Current Checked out " + "connections: %d" + % ( + self.size(), + self.checkedin(), + self.overflow(), + self.checkedout(), + ) + ) + + def size(self) -> int: + return self._pool.maxsize + + def timeout(self) -> float: + return self._timeout + + def checkedin(self) -> int: + return self._pool.qsize() + + def overflow(self) -> int: + return self._overflow if self._pool.maxsize else 0 + + def checkedout(self) -> int: + return self._pool.maxsize - self._pool.qsize() + self._overflow + + +class AsyncAdaptedQueuePool(QueuePool): + """An asyncio-compatible version of :class:`.QueuePool`. + + This pool is used by default when using :class:`.AsyncEngine` engines that + were generated from :func:`_asyncio.create_async_engine`. It uses an + asyncio-compatible queue implementation that does not use + ``threading.Lock``. + + The arguments and operation of :class:`.AsyncAdaptedQueuePool` are + otherwise identical to that of :class:`.QueuePool`. + + """ + + _is_asyncio = True # type: ignore[assignment] + _queue_class: Type[sqla_queue.QueueCommon[ConnectionPoolEntry]] = ( + sqla_queue.AsyncAdaptedQueue + ) + + _dialect = _AsyncConnDialect() + + +class FallbackAsyncAdaptedQueuePool(AsyncAdaptedQueuePool): + _queue_class = sqla_queue.FallbackAsyncAdaptedQueue + + +class NullPool(Pool): + """A Pool which does not pool connections. + + Instead it literally opens and closes the underlying DB-API connection + per each connection open/close. + + Reconnect-related functions such as ``recycle`` and connection + invalidation are not supported by this Pool implementation, since + no connections are held persistently. + + The :class:`.NullPool` class **is compatible** with asyncio and + :func:`_asyncio.create_async_engine`. + + """ + + def status(self) -> str: + return "NullPool" + + def _do_return_conn(self, record: ConnectionPoolEntry) -> None: + record.close() + + def _do_get(self) -> ConnectionPoolEntry: + return self._create_connection() + + def recreate(self) -> NullPool: + self.logger.info("Pool recreating") + + return self.__class__( + self._creator, + recycle=self._recycle, + echo=self.echo, + logging_name=self._orig_logging_name, + reset_on_return=self._reset_on_return, + pre_ping=self._pre_ping, + _dispatch=self.dispatch, + dialect=self._dialect, + ) + + def dispose(self) -> None: + pass + + +class SingletonThreadPool(Pool): + """A Pool that maintains one connection per thread. + + Maintains one connection per each thread, never moving a connection to a + thread other than the one which it was created in. + + .. warning:: the :class:`.SingletonThreadPool` will call ``.close()`` + on arbitrary connections that exist beyond the size setting of + ``pool_size``, e.g. if more unique **thread identities** + than what ``pool_size`` states are used. This cleanup is + non-deterministic and not sensitive to whether or not the connections + linked to those thread identities are currently in use. + + :class:`.SingletonThreadPool` may be improved in a future release, + however in its current status it is generally used only for test + scenarios using a SQLite ``:memory:`` database and is not recommended + for production use. + + The :class:`.SingletonThreadPool` class **is not compatible** with asyncio + and :func:`_asyncio.create_async_engine`. + + + Options are the same as those of :class:`_pool.Pool`, as well as: + + :param pool_size: The number of threads in which to maintain connections + at once. Defaults to five. + + :class:`.SingletonThreadPool` is used by the SQLite dialect + automatically when a memory-based database is used. + See :ref:`sqlite_toplevel`. + + """ + + _is_asyncio = False # type: ignore[assignment] + + def __init__( + self, + creator: Union[_CreatorFnType, _CreatorWRecFnType], + pool_size: int = 5, + **kw: Any, + ): + Pool.__init__(self, creator, **kw) + self._conn = threading.local() + self._fairy = threading.local() + self._all_conns: Set[ConnectionPoolEntry] = set() + self.size = pool_size + + def recreate(self) -> SingletonThreadPool: + self.logger.info("Pool recreating") + return self.__class__( + self._creator, + pool_size=self.size, + recycle=self._recycle, + echo=self.echo, + pre_ping=self._pre_ping, + logging_name=self._orig_logging_name, + reset_on_return=self._reset_on_return, + _dispatch=self.dispatch, + dialect=self._dialect, + ) + + def dispose(self) -> None: + """Dispose of this pool.""" + + for conn in self._all_conns: + try: + conn.close() + except Exception: + # pysqlite won't even let you close a conn from a thread + # that didn't create it + pass + + self._all_conns.clear() + + def _cleanup(self) -> None: + while len(self._all_conns) >= self.size: + c = self._all_conns.pop() + c.close() + + def status(self) -> str: + return "SingletonThreadPool id:%d size: %d" % ( + id(self), + len(self._all_conns), + ) + + def _do_return_conn(self, record: ConnectionPoolEntry) -> None: + try: + del self._fairy.current + except AttributeError: + pass + + def _do_get(self) -> ConnectionPoolEntry: + try: + if TYPE_CHECKING: + c = cast(ConnectionPoolEntry, self._conn.current()) + else: + c = self._conn.current() + if c: + return c + except AttributeError: + pass + c = self._create_connection() + self._conn.current = weakref.ref(c) + if len(self._all_conns) >= self.size: + self._cleanup() + self._all_conns.add(c) + return c + + def connect(self) -> PoolProxiedConnection: + # vendored from Pool to include the now removed use_threadlocal + # behavior + try: + rec = cast(_ConnectionFairy, self._fairy.current()) + except AttributeError: + pass + else: + if rec is not None: + return rec._checkout_existing() + + return _ConnectionFairy._checkout(self, self._fairy) + + +class StaticPool(Pool): + """A Pool of exactly one connection, used for all requests. + + Reconnect-related functions such as ``recycle`` and connection + invalidation (which is also used to support auto-reconnect) are only + partially supported right now and may not yield good results. + + The :class:`.StaticPool` class **is compatible** with asyncio and + :func:`_asyncio.create_async_engine`. + + """ + + @util.memoized_property + def connection(self) -> _ConnectionRecord: + return _ConnectionRecord(self) + + def status(self) -> str: + return "StaticPool" + + def dispose(self) -> None: + if ( + "connection" in self.__dict__ + and self.connection.dbapi_connection is not None + ): + self.connection.close() + del self.__dict__["connection"] + + def recreate(self) -> StaticPool: + self.logger.info("Pool recreating") + return self.__class__( + creator=self._creator, + recycle=self._recycle, + reset_on_return=self._reset_on_return, + pre_ping=self._pre_ping, + echo=self.echo, + logging_name=self._orig_logging_name, + _dispatch=self.dispatch, + dialect=self._dialect, + ) + + def _transfer_from(self, other_static_pool: StaticPool) -> None: + # used by the test suite to make a new engine / pool without + # losing the state of an existing SQLite :memory: connection + def creator(rec: ConnectionPoolEntry) -> DBAPIConnection: + conn = other_static_pool.connection.dbapi_connection + assert conn is not None + return conn + + self._invoke_creator = creator + + def _create_connection(self) -> ConnectionPoolEntry: + raise NotImplementedError() + + def _do_return_conn(self, record: ConnectionPoolEntry) -> None: + pass + + def _do_get(self) -> ConnectionPoolEntry: + rec = self.connection + if rec._is_hard_or_soft_invalidated(): + del self.__dict__["connection"] + rec = self.connection + + return rec + + +class AssertionPool(Pool): + """A :class:`_pool.Pool` that allows at most one checked out connection at + any given time. + + This will raise an exception if more than one connection is checked out + at a time. Useful for debugging code that is using more connections + than desired. + + The :class:`.AssertionPool` class **is compatible** with asyncio and + :func:`_asyncio.create_async_engine`. + + """ + + _conn: Optional[ConnectionPoolEntry] + _checkout_traceback: Optional[List[str]] + + def __init__(self, *args: Any, **kw: Any): + self._conn = None + self._checked_out = False + self._store_traceback = kw.pop("store_traceback", True) + self._checkout_traceback = None + Pool.__init__(self, *args, **kw) + + def status(self) -> str: + return "AssertionPool" + + def _do_return_conn(self, record: ConnectionPoolEntry) -> None: + if not self._checked_out: + raise AssertionError("connection is not checked out") + self._checked_out = False + assert record is self._conn + + def dispose(self) -> None: + self._checked_out = False + if self._conn: + self._conn.close() + + def recreate(self) -> AssertionPool: + self.logger.info("Pool recreating") + return self.__class__( + self._creator, + echo=self.echo, + pre_ping=self._pre_ping, + recycle=self._recycle, + reset_on_return=self._reset_on_return, + logging_name=self._orig_logging_name, + _dispatch=self.dispatch, + dialect=self._dialect, + ) + + def _do_get(self) -> ConnectionPoolEntry: + if self._checked_out: + if self._checkout_traceback: + suffix = " at:\n%s" % "".join( + chop_traceback(self._checkout_traceback) + ) + else: + suffix = "" + raise AssertionError("connection is already checked out" + suffix) + + if not self._conn: + self._conn = self._create_connection() + + self._checked_out = True + if self._store_traceback: + self._checkout_traceback = traceback.format_stack() + return self._conn |