Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py')
 -rw-r--r--  venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py  1515
 1 file changed, 1515 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py
new file mode 100644
index 0000000..98d2027
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/sqlalchemy/pool/base.py
@@ -0,0 +1,1515 @@
+# pool/base.py
+# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+
+"""Base constructs for connection pools.
+
+"""
+
+from __future__ import annotations
+
+from collections import deque
+import dataclasses
+from enum import Enum
+import threading
+import time
+import typing
+from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Deque
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import Union
+import weakref
+
+from .. import event
+from .. import exc
+from .. import log
+from .. import util
+from ..util.typing import Literal
+from ..util.typing import Protocol
+
+if TYPE_CHECKING:
+ from ..engine.interfaces import DBAPIConnection
+ from ..engine.interfaces import DBAPICursor
+ from ..engine.interfaces import Dialect
+ from ..event import _DispatchCommon
+ from ..event import _ListenerFnType
+ from ..event import dispatcher
+ from ..sql._typing import _InfoType
+
+
+@dataclasses.dataclass(frozen=True)
+class PoolResetState:
+ """describes the state of a DBAPI connection as it is being passed to
+ the :meth:`.PoolEvents.reset` connection pool event.
+
+ .. versionadded:: 2.0.0b3
+
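+    For example, the reset state may be inspected within a custom
+    :meth:`.PoolEvents.reset` handler (a minimal sketch; ``my_pool`` is a
+    placeholder for any :class:`_pool.Pool` or :class:`_engine.Engine`
+    target)::
+
+        from sqlalchemy import event
+
+        @event.listens_for(my_pool, "reset")
+        def _custom_reset(dbapi_connection, connection_record, reset_state):
+            # skip custom work during termination-only / event-loop-less
+            # garbage collection cleanup
+            if reset_state.asyncio_safe and not reset_state.terminate_only:
+                dbapi_connection.rollback()
+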
+ """
+
+ __slots__ = ("transaction_was_reset", "terminate_only", "asyncio_safe")
+
+ transaction_was_reset: bool
+ """Indicates if the transaction on the DBAPI connection was already
+ essentially "reset" back by the :class:`.Connection` object.
+
+ This boolean is True if the :class:`.Connection` had transactional
+ state present upon it, which was then not closed using the
+ :meth:`.Connection.rollback` or :meth:`.Connection.commit` method;
+ instead, the transaction was closed inline within the
+    :meth:`.Connection.close` method, so it is guaranteed to no longer be
+    present by the time this event is reached.
+
+ """
+
+ terminate_only: bool
+ """indicates if the connection is to be immediately terminated and
+ not checked in to the pool.
+
+    This occurs for connections that were invalidated, as well as for
+    asyncio connections that were not cleanly handled by the calling code
+    and are instead being garbage collected. In the latter case,
+ operations can't be safely run on asyncio connections within garbage
+ collection as there is not necessarily an event loop present.
+
+ """
+
+ asyncio_safe: bool
+ """Indicates if the reset operation is occurring within a scope where
+ an enclosing event loop is expected to be present for asyncio applications.
+
+ Will be False in the case that the connection is being garbage collected.
+
+ """
+
+
+class ResetStyle(Enum):
+ """Describe options for "reset on return" behaviors."""
+
+ reset_rollback = 0
+ reset_commit = 1
+ reset_none = 2
+
+
+_ResetStyleArgType = Union[
+ ResetStyle,
+ Literal[True, None, False, "commit", "rollback"],
+]
+reset_rollback, reset_commit, reset_none = list(ResetStyle)
+
+
+class _ConnDialect:
+ """partial implementation of :class:`.Dialect`
+ which provides DBAPI connection methods.
+
+ When a :class:`_pool.Pool` is combined with an :class:`_engine.Engine`,
+ the :class:`_engine.Engine` replaces this with its own
+ :class:`.Dialect`.
+
+ """
+
+ is_async = False
+ has_terminate = False
+
+ def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None:
+ dbapi_connection.rollback()
+
+ def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None:
+ dbapi_connection.commit()
+
+ def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
+ dbapi_connection.close()
+
+ def do_close(self, dbapi_connection: DBAPIConnection) -> None:
+ dbapi_connection.close()
+
+ def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
+ raise NotImplementedError(
+ "The ping feature requires that a dialect is "
+ "passed to the connection pool."
+ )
+
+ def get_driver_connection(self, connection: DBAPIConnection) -> Any:
+ return connection
+
+
+class _AsyncConnDialect(_ConnDialect):
+ is_async = True
+
+
+class _CreatorFnType(Protocol):
+ def __call__(self) -> DBAPIConnection: ...
+
+
+class _CreatorWRecFnType(Protocol):
+ def __call__(self, rec: ConnectionPoolEntry) -> DBAPIConnection: ...
+
+
+class Pool(log.Identified, event.EventTarget):
+ """Abstract base class for connection pools."""
+
+ dispatch: dispatcher[Pool]
+ echo: log._EchoFlagType
+
+ _orig_logging_name: Optional[str]
+ _dialect: Union[_ConnDialect, Dialect] = _ConnDialect()
+ _creator_arg: Union[_CreatorFnType, _CreatorWRecFnType]
+ _invoke_creator: _CreatorWRecFnType
+ _invalidate_time: float
+
+ def __init__(
+ self,
+ creator: Union[_CreatorFnType, _CreatorWRecFnType],
+ recycle: int = -1,
+ echo: log._EchoFlagType = None,
+ logging_name: Optional[str] = None,
+ reset_on_return: _ResetStyleArgType = True,
+ events: Optional[List[Tuple[_ListenerFnType, str]]] = None,
+ dialect: Optional[Union[_ConnDialect, Dialect]] = None,
+ pre_ping: bool = False,
+ _dispatch: Optional[_DispatchCommon[Pool]] = None,
+ ):
+ """
+ Construct a Pool.
+
+        :param creator: a callable function that returns a DB-API
+          connection object. The callable is invoked with no arguments if
+          it accepts none; a callable that accepts a single positional
+          argument is instead passed the :class:`.ConnectionPoolEntry` in
+          use (see the example at the end of this docstring).
+
+        :param recycle: If set to a value other than -1, the number of
+          seconds after which a connection is recycled: upon checkout, if
+          this many seconds have passed since the connection was created,
+          it will be closed and replaced with a newly opened connection.
+          Defaults to -1.
+
+ :param logging_name: String identifier which will be used within
+ the "name" field of logging records generated within the
+ "sqlalchemy.pool" logger. Defaults to a hexstring of the object's
+ id.
+
+ :param echo: if True, the connection pool will log
+ informational output such as when connections are invalidated
+ as well as when connections are recycled to the default log handler,
+          which defaults to ``sys.stdout`` for output. If set to the string
+ ``"debug"``, the logging will include pool checkouts and checkins.
+
+ The :paramref:`_pool.Pool.echo` parameter can also be set from the
+ :func:`_sa.create_engine` call by using the
+ :paramref:`_sa.create_engine.echo_pool` parameter.
+
+ .. seealso::
+
+ :ref:`dbengine_logging` - further detail on how to configure
+ logging.
+
+ :param reset_on_return: Determine steps to take on
+ connections as they are returned to the pool, which were
+ not otherwise handled by a :class:`_engine.Connection`.
+ Available from :func:`_sa.create_engine` via the
+ :paramref:`_sa.create_engine.pool_reset_on_return` parameter.
+
+ :paramref:`_pool.Pool.reset_on_return` can have any of these values:
+
+ * ``"rollback"`` - call rollback() on the connection,
+ to release locks and transaction resources.
+ This is the default value. The vast majority
+ of use cases should leave this value set.
+ * ``"commit"`` - call commit() on the connection,
+ to release locks and transaction resources.
+ A commit here may be desirable for databases that
+ cache query plans if a commit is emitted,
+ such as Microsoft SQL Server. However, this
+ value is more dangerous than 'rollback' because
+ any data changes present on the transaction
+ are committed unconditionally.
+ * ``None`` - don't do anything on the connection.
+ This setting may be appropriate if the database / DBAPI
+ works in pure "autocommit" mode at all times, or if
+ a custom reset handler is established using the
+ :meth:`.PoolEvents.reset` event handler.
+
+ * ``True`` - same as 'rollback', this is here for
+ backwards compatibility.
+ * ``False`` - same as None, this is here for
+ backwards compatibility.
+
+ For further customization of reset on return, the
+ :meth:`.PoolEvents.reset` event hook may be used which can perform
+ any connection activity desired on reset.
+
+ .. seealso::
+
+ :ref:`pool_reset_on_return`
+
+ :meth:`.PoolEvents.reset`
+
+ :param events: a list of 2-tuples, each of the form
+ ``(callable, target)`` which will be passed to :func:`.event.listen`
+ upon construction. Provided here so that event listeners
+ can be assigned via :func:`_sa.create_engine` before dialect-level
+ listeners are applied.
+
+ :param dialect: a :class:`.Dialect` that will handle the job
+ of calling rollback(), close(), or commit() on DBAPI connections.
+ If omitted, a built-in "stub" dialect is used. Applications that
+ make use of :func:`_sa.create_engine` should not use this parameter
+ as it is handled by the engine creation strategy.
+
+ :param pre_ping: if True, the pool will emit a "ping" (typically
+ "SELECT 1", but is dialect-specific) on the connection
+ upon checkout, to test if the connection is alive or not. If not,
+ the connection is transparently re-connected and upon success, all
+ other pooled connections established prior to that timestamp are
+ invalidated. Requires that a dialect is passed as well to
+ interpret the disconnection error.
+
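+          For example, pre-ping is most commonly enabled through
+          :func:`_sa.create_engine` (a minimal sketch; the in-memory SQLite
+          URL is only a placeholder)::
+
+              from sqlalchemy import create_engine
+
+              engine = create_engine("sqlite://", pool_pre_ping=True)
+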
+ .. versionadded:: 1.2
+
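+        For example, constructing a pool implementation such as
+        :class:`.QueuePool` directly (a minimal sketch; the stdlib
+        ``sqlite3`` module stands in for any DBAPI)::
+
+            import sqlite3
+
+            from sqlalchemy.pool import QueuePool
+
+            def creator():
+                # legacy no-argument form; a creator accepting a single
+                # ConnectionPoolEntry argument is also supported
+                return sqlite3.connect(":memory:")
+
+            pool = QueuePool(
+                creator, recycle=3600, reset_on_return="rollback"
+            )
+
+            connection = pool.connect()  # a PoolProxiedConnection
+            connection.close()  # returns the connection to the pool
+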
+ """
+ if logging_name:
+ self.logging_name = self._orig_logging_name = logging_name
+ else:
+ self._orig_logging_name = None
+
+ log.instance_logger(self, echoflag=echo)
+ self._creator = creator
+ self._recycle = recycle
+ self._invalidate_time = 0
+ self._pre_ping = pre_ping
+ self._reset_on_return = util.parse_user_argument_for_enum(
+ reset_on_return,
+ {
+ ResetStyle.reset_rollback: ["rollback", True],
+ ResetStyle.reset_none: ["none", None, False],
+ ResetStyle.reset_commit: ["commit"],
+ },
+ "reset_on_return",
+ )
+
+ self.echo = echo
+
+ if _dispatch:
+ self.dispatch._update(_dispatch, only_propagate=False)
+ if dialect:
+ self._dialect = dialect
+ if events:
+ for fn, target in events:
+ event.listen(self, target, fn)
+
+ @util.hybridproperty
+ def _is_asyncio(self) -> bool:
+ return self._dialect.is_async
+
+ @property
+ def _creator(self) -> Union[_CreatorFnType, _CreatorWRecFnType]:
+ return self._creator_arg
+
+ @_creator.setter
+ def _creator(
+ self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
+ ) -> None:
+ self._creator_arg = creator
+
+ # mypy seems to get super confused assigning functions to
+ # attributes
+ self._invoke_creator = self._should_wrap_creator(creator)
+
+ @_creator.deleter
+ def _creator(self) -> None:
+ # needed for mock testing
+ del self._creator_arg
+ del self._invoke_creator
+
+ def _should_wrap_creator(
+ self, creator: Union[_CreatorFnType, _CreatorWRecFnType]
+ ) -> _CreatorWRecFnType:
+ """Detect if creator accepts a single argument, or is sent
+ as a legacy style no-arg function.
+
+ """
+
+ try:
+ argspec = util.get_callable_argspec(self._creator, no_self=True)
+ except TypeError:
+ creator_fn = cast(_CreatorFnType, creator)
+ return lambda rec: creator_fn()
+
+ if argspec.defaults is not None:
+ defaulted = len(argspec.defaults)
+ else:
+ defaulted = 0
+ positionals = len(argspec[0]) - defaulted
+
+ # look for the exact arg signature that DefaultStrategy
+ # sends us
+ if (argspec[0], argspec[3]) == (["connection_record"], (None,)):
+ return cast(_CreatorWRecFnType, creator)
+ # or just a single positional
+ elif positionals == 1:
+ return cast(_CreatorWRecFnType, creator)
+ # all other cases, just wrap and assume legacy "creator" callable
+ # thing
+ else:
+ creator_fn = cast(_CreatorFnType, creator)
+ return lambda rec: creator_fn()
+
+ def _close_connection(
+ self, connection: DBAPIConnection, *, terminate: bool = False
+ ) -> None:
+ self.logger.debug(
+ "%s connection %r",
+ "Hard-closing" if terminate else "Closing",
+ connection,
+ )
+ try:
+ if terminate:
+ self._dialect.do_terminate(connection)
+ else:
+ self._dialect.do_close(connection)
+ except BaseException as e:
+ self.logger.error(
+ f"Exception {'terminating' if terminate else 'closing'} "
+ f"connection %r",
+ connection,
+ exc_info=True,
+ )
+ if not isinstance(e, Exception):
+ raise
+
+ def _create_connection(self) -> ConnectionPoolEntry:
+ """Called by subclasses to create a new ConnectionRecord."""
+
+ return _ConnectionRecord(self)
+
+ def _invalidate(
+ self,
+ connection: PoolProxiedConnection,
+ exception: Optional[BaseException] = None,
+ _checkin: bool = True,
+ ) -> None:
+ """Mark all connections established within the generation
+ of the given connection as invalidated.
+
+ If this pool's last invalidate time is before when the given
+        connection was created, update the timestamp to the current time.
+        Otherwise, no action is performed.
+
+ Connections with a start time prior to this pool's invalidation
+ time will be recycled upon next checkout.
+ """
+ rec = getattr(connection, "_connection_record", None)
+ if not rec or self._invalidate_time < rec.starttime:
+ self._invalidate_time = time.time()
+ if _checkin and getattr(connection, "is_valid", False):
+ connection.invalidate(exception)
+
+ def recreate(self) -> Pool:
+ """Return a new :class:`_pool.Pool`, of the same class as this one
+ and configured with identical creation arguments.
+
+ This method is used in conjunction with :meth:`dispose`
+ to close out an entire :class:`_pool.Pool` and create a new one in
+ its place.
+
+ """
+
+ raise NotImplementedError()
+
+ def dispose(self) -> None:
+ """Dispose of this pool.
+
+ This method leaves the possibility of checked-out connections
+ remaining open, as it only affects connections that are
+ idle in the pool.
+
+ .. seealso::
+
+ :meth:`Pool.recreate`
+
+ """
+
+ raise NotImplementedError()
+
+ def connect(self) -> PoolProxiedConnection:
+ """Return a DBAPI connection from the pool.
+
+ The connection is instrumented such that when its
+ ``close()`` method is called, the connection will be returned to
+ the pool.
+
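+        E.g. (a sketch; ``pool`` is any :class:`_pool.Pool` instance)::
+
+            connection = pool.connect()
+            cursor_obj = connection.cursor()
+            cursor_obj.execute("select 1")
+            cursor_obj.close()
+            connection.close()  # returned to the pool, not actually closed
+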
+ """
+ return _ConnectionFairy._checkout(self)
+
+ def _return_conn(self, record: ConnectionPoolEntry) -> None:
+ """Given a _ConnectionRecord, return it to the :class:`_pool.Pool`.
+
+ This method is called when an instrumented DBAPI connection
+ has its ``close()`` method called.
+
+ """
+ self._do_return_conn(record)
+
+ def _do_get(self) -> ConnectionPoolEntry:
+ """Implementation for :meth:`get`, supplied by subclasses."""
+
+ raise NotImplementedError()
+
+ def _do_return_conn(self, record: ConnectionPoolEntry) -> None:
+ """Implementation for :meth:`return_conn`, supplied by subclasses."""
+
+ raise NotImplementedError()
+
+ def status(self) -> str:
+ raise NotImplementedError()
+
+
+class ManagesConnection:
+ """Common base for the two connection-management interfaces
+ :class:`.PoolProxiedConnection` and :class:`.ConnectionPoolEntry`.
+
+ These two objects are typically exposed in the public facing API
+ via the connection pool event hooks, documented at :class:`.PoolEvents`.
+
+ .. versionadded:: 2.0
+
+ """
+
+ __slots__ = ()
+
+ dbapi_connection: Optional[DBAPIConnection]
+ """A reference to the actual DBAPI connection being tracked.
+
+ This is a :pep:`249`-compliant object that for traditional sync-style
+ dialects is provided by the third-party
+ DBAPI implementation in use. For asyncio dialects, the implementation
+ is typically an adapter object provided by the SQLAlchemy dialect
+ itself; the underlying asyncio object is available via the
+ :attr:`.ManagesConnection.driver_connection` attribute.
+
+ SQLAlchemy's interface for the DBAPI connection is based on the
+ :class:`.DBAPIConnection` protocol object
+
+ .. seealso::
+
+ :attr:`.ManagesConnection.driver_connection`
+
+ :ref:`faq_dbapi_connection`
+
+ """
+
+ driver_connection: Optional[Any]
+ """The "driver level" connection object as used by the Python
+ DBAPI or database driver.
+
+ For traditional :pep:`249` DBAPI implementations, this object will
+ be the same object as that of
+ :attr:`.ManagesConnection.dbapi_connection`. For an asyncio database
+ driver, this will be the ultimate "connection" object used by that
+ driver, such as the ``asyncpg.Connection`` object which will not have
+ standard pep-249 methods.
+
+ .. versionadded:: 1.4.24
+
+ .. seealso::
+
+ :attr:`.ManagesConnection.dbapi_connection`
+
+ :ref:`faq_dbapi_connection`
+
+ """
+
+ @util.ro_memoized_property
+ def info(self) -> _InfoType:
+ """Info dictionary associated with the underlying DBAPI connection
+ referred to by this :class:`.ManagesConnection` instance, allowing
+ user-defined data to be associated with the connection.
+
+ The data in this dictionary is persistent for the lifespan
+ of the DBAPI connection itself, including across pool checkins
+ and checkouts. When the connection is invalidated
+ and replaced with a new one, this dictionary is cleared.
+
+ For a :class:`.PoolProxiedConnection` instance that's not associated
+ with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
+        attribute returns a dictionary that is local to that
+        :class:`.PoolProxiedConnection` instance. Therefore the
+ :attr:`.ManagesConnection.info` attribute will always provide a Python
+ dictionary.
+
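+        For example, user-defined data may be stored from within a
+        :meth:`.PoolEvents.connect` handler (a sketch; ``my_pool`` is a
+        placeholder target)::
+
+            import time
+
+            from sqlalchemy import event
+
+            @event.listens_for(my_pool, "connect")
+            def _on_connect(dbapi_connection, connection_record):
+                connection_record.info["connect_time"] = time.time()
+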
+ .. seealso::
+
+ :attr:`.ManagesConnection.record_info`
+
+
+ """
+ raise NotImplementedError()
+
+ @util.ro_memoized_property
+ def record_info(self) -> Optional[_InfoType]:
+ """Persistent info dictionary associated with this
+ :class:`.ManagesConnection`.
+
+ Unlike the :attr:`.ManagesConnection.info` dictionary, the lifespan
+ of this dictionary is that of the :class:`.ConnectionPoolEntry`
+ which owns it; therefore this dictionary will persist across
+ reconnects and connection invalidation for a particular entry
+ in the connection pool.
+
+ For a :class:`.PoolProxiedConnection` instance that's not associated
+ with a :class:`.ConnectionPoolEntry`, such as if it were detached, the
+ attribute returns None. Contrast to the :attr:`.ManagesConnection.info`
+ dictionary which is never None.
+
+
+ .. seealso::
+
+ :attr:`.ManagesConnection.info`
+
+ """
+ raise NotImplementedError()
+
+ def invalidate(
+ self, e: Optional[BaseException] = None, soft: bool = False
+ ) -> None:
+ """Mark the managed connection as invalidated.
+
+ :param e: an exception object indicating a reason for the invalidation.
+
+ :param soft: if True, the connection isn't closed; instead, this
+ connection will be recycled on next checkout.
+
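+        E.g. (a sketch; ``proxied`` is a checked-out
+        :class:`.PoolProxiedConnection`)::
+
+            try:
+                cursor_obj = proxied.cursor()
+                cursor_obj.execute("select 1")
+            except Exception as err:
+                # flag the underlying DBAPI connection as unusable
+                proxied.invalidate(err)
+                raise
+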
+ .. seealso::
+
+ :ref:`pool_connection_invalidation`
+
+
+ """
+ raise NotImplementedError()
+
+
+class ConnectionPoolEntry(ManagesConnection):
+ """Interface for the object that maintains an individual database
+ connection on behalf of a :class:`_pool.Pool` instance.
+
+ The :class:`.ConnectionPoolEntry` object represents the long term
+    maintenance of a particular connection for a pool, including expiring or
+ invalidating that connection to have it replaced with a new one, which will
+ continue to be maintained by that same :class:`.ConnectionPoolEntry`
+ instance. Compared to :class:`.PoolProxiedConnection`, which is the
+ short-term, per-checkout connection manager, this object lasts for the
+ lifespan of a particular "slot" within a connection pool.
+
+ The :class:`.ConnectionPoolEntry` object is mostly visible to public-facing
+ API code when it is delivered to connection pool event hooks, such as
+ :meth:`_events.PoolEvents.connect` and :meth:`_events.PoolEvents.checkout`.
+
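+    For example, as received by a checkout event handler (a sketch;
+    ``my_pool`` is a placeholder target)::
+
+        from sqlalchemy import event
+
+        @event.listens_for(my_pool, "checkout")
+        def _checkout(dbapi_connection, connection_record, connection_proxy):
+            # connection_record is the ConnectionPoolEntry for this pool slot
+            print("checked out; record in use:", connection_record.in_use)
+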
+ .. versionadded:: 2.0 :class:`.ConnectionPoolEntry` provides the public
+ facing interface for the :class:`._ConnectionRecord` internal class.
+
+ """
+
+ __slots__ = ()
+
+ @property
+ def in_use(self) -> bool:
+        """Return True if the connection is currently checked out."""
+
+ raise NotImplementedError()
+
+ def close(self) -> None:
+ """Close the DBAPI connection managed by this connection pool entry."""
+ raise NotImplementedError()
+
+
+class _ConnectionRecord(ConnectionPoolEntry):
+ """Maintains a position in a connection pool which references a pooled
+ connection.
+
+ This is an internal object used by the :class:`_pool.Pool` implementation
+ to provide context management to a DBAPI connection maintained by
+ that :class:`_pool.Pool`. The public facing interface for this class
+ is described by the :class:`.ConnectionPoolEntry` class. See that
+ class for public API details.
+
+ .. seealso::
+
+ :class:`.ConnectionPoolEntry`
+
+ :class:`.PoolProxiedConnection`
+
+ """
+
+ __slots__ = (
+ "__pool",
+ "fairy_ref",
+ "finalize_callback",
+ "fresh",
+ "starttime",
+ "dbapi_connection",
+ "__weakref__",
+ "__dict__",
+ )
+
+ finalize_callback: Deque[Callable[[DBAPIConnection], None]]
+ fresh: bool
+ fairy_ref: Optional[weakref.ref[_ConnectionFairy]]
+ starttime: float
+
+ def __init__(self, pool: Pool, connect: bool = True):
+ self.fresh = False
+ self.fairy_ref = None
+ self.starttime = 0
+ self.dbapi_connection = None
+
+ self.__pool = pool
+ if connect:
+ self.__connect()
+ self.finalize_callback = deque()
+
+ dbapi_connection: Optional[DBAPIConnection]
+
+ @property
+ def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501
+ if self.dbapi_connection is None:
+ return None
+ else:
+ return self.__pool._dialect.get_driver_connection(
+ self.dbapi_connection
+ )
+
+ @property
+ @util.deprecated(
+ "2.0",
+ "The _ConnectionRecord.connection attribute is deprecated; "
+ "please use 'driver_connection'",
+ )
+ def connection(self) -> Optional[DBAPIConnection]:
+ return self.dbapi_connection
+
+ _soft_invalidate_time: float = 0
+
+ @util.ro_memoized_property
+ def info(self) -> _InfoType:
+ return {}
+
+ @util.ro_memoized_property
+ def record_info(self) -> Optional[_InfoType]:
+ return {}
+
+ @classmethod
+ def checkout(cls, pool: Pool) -> _ConnectionFairy:
+ if TYPE_CHECKING:
+ rec = cast(_ConnectionRecord, pool._do_get())
+ else:
+ rec = pool._do_get()
+
+ try:
+ dbapi_connection = rec.get_connection()
+ except BaseException as err:
+ with util.safe_reraise():
+ rec._checkin_failed(err, _fairy_was_created=False)
+
+ # not reached, for code linters only
+ raise
+
+ echo = pool._should_log_debug()
+ fairy = _ConnectionFairy(pool, dbapi_connection, rec, echo)
+
+ rec.fairy_ref = ref = weakref.ref(
+ fairy,
+ lambda ref: (
+ _finalize_fairy(
+ None, rec, pool, ref, echo, transaction_was_reset=False
+ )
+ if _finalize_fairy is not None
+ else None
+ ),
+ )
+ _strong_ref_connection_records[ref] = rec
+ if echo:
+ pool.logger.debug(
+ "Connection %r checked out from pool", dbapi_connection
+ )
+ return fairy
+
+ def _checkin_failed(
+ self, err: BaseException, _fairy_was_created: bool = True
+ ) -> None:
+ self.invalidate(e=err)
+ self.checkin(
+ _fairy_was_created=_fairy_was_created,
+ )
+
+ def checkin(self, _fairy_was_created: bool = True) -> None:
+ if self.fairy_ref is None and _fairy_was_created:
+ # _fairy_was_created is False for the initial get connection phase;
+ # meaning there was no _ConnectionFairy and we must unconditionally
+ # do a checkin.
+ #
+ # otherwise, if fairy_was_created==True, if fairy_ref is None here
+ # that means we were checked in already, so this looks like
+ # a double checkin.
+ util.warn("Double checkin attempted on %s" % self)
+ return
+ self.fairy_ref = None
+ connection = self.dbapi_connection
+ pool = self.__pool
+ while self.finalize_callback:
+ finalizer = self.finalize_callback.pop()
+ if connection is not None:
+ finalizer(connection)
+ if pool.dispatch.checkin:
+ pool.dispatch.checkin(connection, self)
+
+ pool._return_conn(self)
+
+ @property
+ def in_use(self) -> bool:
+ return self.fairy_ref is not None
+
+ @property
+ def last_connect_time(self) -> float:
+ return self.starttime
+
+ def close(self) -> None:
+ if self.dbapi_connection is not None:
+ self.__close()
+
+ def invalidate(
+ self, e: Optional[BaseException] = None, soft: bool = False
+ ) -> None:
+ # already invalidated
+ if self.dbapi_connection is None:
+ return
+ if soft:
+ self.__pool.dispatch.soft_invalidate(
+ self.dbapi_connection, self, e
+ )
+ else:
+ self.__pool.dispatch.invalidate(self.dbapi_connection, self, e)
+ if e is not None:
+ self.__pool.logger.info(
+ "%sInvalidate connection %r (reason: %s:%s)",
+ "Soft " if soft else "",
+ self.dbapi_connection,
+ e.__class__.__name__,
+ e,
+ )
+ else:
+ self.__pool.logger.info(
+ "%sInvalidate connection %r",
+ "Soft " if soft else "",
+ self.dbapi_connection,
+ )
+
+ if soft:
+ self._soft_invalidate_time = time.time()
+ else:
+ self.__close(terminate=True)
+ self.dbapi_connection = None
+
+ def get_connection(self) -> DBAPIConnection:
+ recycle = False
+
+ # NOTE: the various comparisons here are assuming that measurable time
+ # passes between these state changes. however, time.time() is not
+ # guaranteed to have sub-second precision. comparisons of
+ # "invalidation time" to "starttime" should perhaps use >= so that the
+ # state change can take place assuming no measurable time has passed,
+ # however this does not guarantee correct behavior here as if time
+ # continues to not pass, it will try to reconnect repeatedly until
+ # these timestamps diverge, so in that sense using > is safer. Per
+ # https://stackoverflow.com/a/1938096/34549, Windows time.time() may be
+ # within 16 milliseconds accuracy, so unit tests for connection
+ # invalidation need a sleep of at least this long between initial start
+ # time and invalidation for the logic below to work reliably.
+
+ if self.dbapi_connection is None:
+ self.info.clear()
+ self.__connect()
+ elif (
+ self.__pool._recycle > -1
+ and time.time() - self.starttime > self.__pool._recycle
+ ):
+ self.__pool.logger.info(
+ "Connection %r exceeded timeout; recycling",
+ self.dbapi_connection,
+ )
+ recycle = True
+ elif self.__pool._invalidate_time > self.starttime:
+ self.__pool.logger.info(
+ "Connection %r invalidated due to pool invalidation; "
+ + "recycling",
+ self.dbapi_connection,
+ )
+ recycle = True
+ elif self._soft_invalidate_time > self.starttime:
+ self.__pool.logger.info(
+ "Connection %r invalidated due to local soft invalidation; "
+ + "recycling",
+ self.dbapi_connection,
+ )
+ recycle = True
+
+ if recycle:
+ self.__close(terminate=True)
+ self.info.clear()
+
+ self.__connect()
+
+ assert self.dbapi_connection is not None
+ return self.dbapi_connection
+
+ def _is_hard_or_soft_invalidated(self) -> bool:
+ return (
+ self.dbapi_connection is None
+ or self.__pool._invalidate_time > self.starttime
+ or (self._soft_invalidate_time > self.starttime)
+ )
+
+ def __close(self, *, terminate: bool = False) -> None:
+ self.finalize_callback.clear()
+ if self.__pool.dispatch.close:
+ self.__pool.dispatch.close(self.dbapi_connection, self)
+ assert self.dbapi_connection is not None
+ self.__pool._close_connection(
+ self.dbapi_connection, terminate=terminate
+ )
+ self.dbapi_connection = None
+
+ def __connect(self) -> None:
+ pool = self.__pool
+
+ # ensure any existing connection is removed, so that if
+ # creator fails, this attribute stays None
+ self.dbapi_connection = None
+ try:
+ self.starttime = time.time()
+ self.dbapi_connection = connection = pool._invoke_creator(self)
+ pool.logger.debug("Created new connection %r", connection)
+ self.fresh = True
+ except BaseException as e:
+ with util.safe_reraise():
+ pool.logger.debug("Error on connect(): %s", e)
+ else:
+ # in SQLAlchemy 1.4 the first_connect event is not used by
+ # the engine, so this will usually not be set
+ if pool.dispatch.first_connect:
+ pool.dispatch.first_connect.for_modify(
+ pool.dispatch
+ ).exec_once_unless_exception(self.dbapi_connection, self)
+
+ # init of the dialect now takes place within the connect
+ # event, so ensure a mutex is used on the first run
+ pool.dispatch.connect.for_modify(
+ pool.dispatch
+ )._exec_w_sync_on_first_run(self.dbapi_connection, self)
+
+
+def _finalize_fairy(
+ dbapi_connection: Optional[DBAPIConnection],
+ connection_record: Optional[_ConnectionRecord],
+ pool: Pool,
+ ref: Optional[
+ weakref.ref[_ConnectionFairy]
+ ], # this is None when called directly, not by the gc
+ echo: Optional[log._EchoFlagType],
+ transaction_was_reset: bool = False,
+ fairy: Optional[_ConnectionFairy] = None,
+) -> None:
+ """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
+ been garbage collected.
+
+ When using an async dialect no IO can happen here (without using
+ a dedicated thread), since this is called outside the greenlet
+    context and with an already running loop. In this case the function
+    will only log a message and emit a warning.
+ """
+
+ is_gc_cleanup = ref is not None
+
+ if is_gc_cleanup:
+ assert ref is not None
+ _strong_ref_connection_records.pop(ref, None)
+ assert connection_record is not None
+ if connection_record.fairy_ref is not ref:
+ return
+ assert dbapi_connection is None
+ dbapi_connection = connection_record.dbapi_connection
+
+ elif fairy:
+ _strong_ref_connection_records.pop(weakref.ref(fairy), None)
+
+    # null pool is not _is_asyncio but can also be used with async dialects
+ dont_restore_gced = pool._dialect.is_async
+
+ if dont_restore_gced:
+ detach = connection_record is None or is_gc_cleanup
+ can_manipulate_connection = not is_gc_cleanup
+ can_close_or_terminate_connection = (
+ not pool._dialect.is_async or pool._dialect.has_terminate
+ )
+ requires_terminate_for_close = (
+ pool._dialect.is_async and pool._dialect.has_terminate
+ )
+
+ else:
+ detach = connection_record is None
+ can_manipulate_connection = can_close_or_terminate_connection = True
+ requires_terminate_for_close = False
+
+ if dbapi_connection is not None:
+ if connection_record and echo:
+ pool.logger.debug(
+ "Connection %r being returned to pool", dbapi_connection
+ )
+
+ try:
+ if not fairy:
+ assert connection_record is not None
+ fairy = _ConnectionFairy(
+ pool,
+ dbapi_connection,
+ connection_record,
+ echo,
+ )
+ assert fairy.dbapi_connection is dbapi_connection
+
+ fairy._reset(
+ pool,
+ transaction_was_reset=transaction_was_reset,
+ terminate_only=detach,
+ asyncio_safe=can_manipulate_connection,
+ )
+
+ if detach:
+ if connection_record:
+ fairy._pool = pool
+ fairy.detach()
+
+ if can_close_or_terminate_connection:
+ if pool.dispatch.close_detached:
+ pool.dispatch.close_detached(dbapi_connection)
+
+ pool._close_connection(
+ dbapi_connection,
+ terminate=requires_terminate_for_close,
+ )
+
+ except BaseException as e:
+ pool.logger.error(
+ "Exception during reset or similar", exc_info=True
+ )
+ if connection_record:
+ connection_record.invalidate(e=e)
+ if not isinstance(e, Exception):
+ raise
+ finally:
+ if detach and is_gc_cleanup and dont_restore_gced:
+ message = (
+ "The garbage collector is trying to clean up "
+ f"non-checked-in connection {dbapi_connection!r}, "
+ f"""which will be {
+ 'dropped, as it cannot be safely terminated'
+ if not can_close_or_terminate_connection
+ else 'terminated'
+ }. """
+ "Please ensure that SQLAlchemy pooled connections are "
+ "returned to "
+ "the pool explicitly, either by calling ``close()`` "
+ "or by using appropriate context managers to manage "
+ "their lifecycle."
+ )
+ pool.logger.error(message)
+ util.warn(message)
+
+ if connection_record and connection_record.fairy_ref is not None:
+ connection_record.checkin()
+
+ # give gc some help. See
+ # test/engine/test_pool.py::PoolEventsTest::test_checkin_event_gc[True]
+ # which actually started failing when pytest warnings plugin was
+ # turned on, due to util.warn() above
+ if fairy is not None:
+ fairy.dbapi_connection = None # type: ignore
+ fairy._connection_record = None
+ del dbapi_connection
+ del connection_record
+ del fairy
+
+
+# a dictionary of the _ConnectionFairy weakrefs to _ConnectionRecord, so that
+# GC under pypy will call ConnectionFairy finalizers. linked directly to the
+# weakref that will empty itself when collected so that it should not create
+# any unmanaged memory references.
+_strong_ref_connection_records: Dict[
+ weakref.ref[_ConnectionFairy], _ConnectionRecord
+] = {}
+
+
+class PoolProxiedConnection(ManagesConnection):
+ """A connection-like adapter for a :pep:`249` DBAPI connection, which
+ includes additional methods specific to the :class:`.Pool` implementation.
+
+ :class:`.PoolProxiedConnection` is the public-facing interface for the
+ internal :class:`._ConnectionFairy` implementation object; users familiar
+ with :class:`._ConnectionFairy` can consider this object to be equivalent.
+
+    .. versionadded:: 2.0 :class:`.PoolProxiedConnection` provides the
+        public-facing interface for the :class:`._ConnectionFairy` internal
+        class.
+
+ """
+
+ __slots__ = ()
+
+ if typing.TYPE_CHECKING:
+
+ def commit(self) -> None: ...
+
+ def cursor(self) -> DBAPICursor: ...
+
+ def rollback(self) -> None: ...
+
+ @property
+ def is_valid(self) -> bool:
+ """Return True if this :class:`.PoolProxiedConnection` still refers
+ to an active DBAPI connection."""
+
+ raise NotImplementedError()
+
+ @property
+ def is_detached(self) -> bool:
+ """Return True if this :class:`.PoolProxiedConnection` is detached
+ from its pool."""
+
+ raise NotImplementedError()
+
+ def detach(self) -> None:
+ """Separate this connection from its Pool.
+
+ This means that the connection will no longer be returned to the
+ pool when closed, and will instead be literally closed. The
+ associated :class:`.ConnectionPoolEntry` is de-associated from this
+ DBAPI connection.
+
+ Note that any overall connection limiting constraints imposed by a
+ Pool implementation may be violated after a detach, as the detached
+ connection is removed from the pool's knowledge and control.
+
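+        E.g. (a sketch; ``pool`` is any :class:`_pool.Pool` instance)::
+
+            proxied = pool.connect()
+            proxied.detach()
+
+            # the DBAPI connection is no longer managed by the pool;
+            # close() now closes it for real instead of checking it in
+            proxied.close()
+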
+ """
+
+ raise NotImplementedError()
+
+ def close(self) -> None:
+ """Release this connection back to the pool.
+
+ The :meth:`.PoolProxiedConnection.close` method shadows the
+ :pep:`249` ``.close()`` method, altering its behavior to instead
+ :term:`release` the proxied connection back to the connection pool.
+
+ Upon release to the pool, whether the connection stays "opened" and
+ pooled in the Python process, versus actually closed out and removed
+ from the Python process, is based on the pool implementation in use and
+ its configuration and current state.
+
+ """
+ raise NotImplementedError()
+
+
+class _AdhocProxiedConnection(PoolProxiedConnection):
+ """provides the :class:`.PoolProxiedConnection` interface for cases where
+ the DBAPI connection is not actually proxied.
+
+ This is used by the engine internals to pass a consistent
+ :class:`.PoolProxiedConnection` object to consuming dialects in response to
+ pool events that may not always have the :class:`._ConnectionFairy`
+ available.
+
+ """
+
+ __slots__ = ("dbapi_connection", "_connection_record", "_is_valid")
+
+ dbapi_connection: DBAPIConnection
+ _connection_record: ConnectionPoolEntry
+
+ def __init__(
+ self,
+ dbapi_connection: DBAPIConnection,
+ connection_record: ConnectionPoolEntry,
+ ):
+ self.dbapi_connection = dbapi_connection
+ self._connection_record = connection_record
+ self._is_valid = True
+
+ @property
+ def driver_connection(self) -> Any: # type: ignore[override] # mypy#4125
+ return self._connection_record.driver_connection
+
+ @property
+ def connection(self) -> DBAPIConnection:
+ return self.dbapi_connection
+
+ @property
+ def is_valid(self) -> bool:
+ """Implement is_valid state attribute.
+
+        For the adhoc proxied connection, the connection is assumed to be
+        valid until :meth:`invalidate` is called, which simply flips this
+        flag.
+
+ """
+ return self._is_valid
+
+ def invalidate(
+ self, e: Optional[BaseException] = None, soft: bool = False
+ ) -> None:
+ self._is_valid = False
+
+ @util.ro_non_memoized_property
+ def record_info(self) -> Optional[_InfoType]:
+ return self._connection_record.record_info
+
+ def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
+ return self.dbapi_connection.cursor(*args, **kwargs)
+
+ def __getattr__(self, key: Any) -> Any:
+ return getattr(self.dbapi_connection, key)
+
+
+class _ConnectionFairy(PoolProxiedConnection):
+ """Proxies a DBAPI connection and provides return-on-dereference
+ support.
+
+ This is an internal object used by the :class:`_pool.Pool` implementation
+ to provide context management to a DBAPI connection delivered by
+ that :class:`_pool.Pool`. The public facing interface for this class
+ is described by the :class:`.PoolProxiedConnection` class. See that
+ class for public API details.
+
+ The name "fairy" is inspired by the fact that the
+ :class:`._ConnectionFairy` object's lifespan is transitory, as it lasts
+ only for the length of a specific DBAPI connection being checked out from
+ the pool, and additionally that as a transparent proxy, it is mostly
+ invisible.
+
+ .. seealso::
+
+ :class:`.PoolProxiedConnection`
+
+ :class:`.ConnectionPoolEntry`
+
+
+ """
+
+ __slots__ = (
+ "dbapi_connection",
+ "_connection_record",
+ "_echo",
+ "_pool",
+ "_counter",
+ "__weakref__",
+ "__dict__",
+ )
+
+ pool: Pool
+ dbapi_connection: DBAPIConnection
+ _echo: log._EchoFlagType
+
+ def __init__(
+ self,
+ pool: Pool,
+ dbapi_connection: DBAPIConnection,
+ connection_record: _ConnectionRecord,
+ echo: log._EchoFlagType,
+ ):
+ self._pool = pool
+ self._counter = 0
+ self.dbapi_connection = dbapi_connection
+ self._connection_record = connection_record
+ self._echo = echo
+
+ _connection_record: Optional[_ConnectionRecord]
+
+ @property
+ def driver_connection(self) -> Optional[Any]: # type: ignore[override] # mypy#4125 # noqa: E501
+ if self._connection_record is None:
+ return None
+ return self._connection_record.driver_connection
+
+ @property
+ @util.deprecated(
+ "2.0",
+ "The _ConnectionFairy.connection attribute is deprecated; "
+ "please use 'driver_connection'",
+ )
+ def connection(self) -> DBAPIConnection:
+ return self.dbapi_connection
+
+ @classmethod
+ def _checkout(
+ cls,
+ pool: Pool,
+ threadconns: Optional[threading.local] = None,
+ fairy: Optional[_ConnectionFairy] = None,
+ ) -> _ConnectionFairy:
+ if not fairy:
+ fairy = _ConnectionRecord.checkout(pool)
+
+ if threadconns is not None:
+ threadconns.current = weakref.ref(fairy)
+
+ assert (
+ fairy._connection_record is not None
+ ), "can't 'checkout' a detached connection fairy"
+ assert (
+ fairy.dbapi_connection is not None
+ ), "can't 'checkout' an invalidated connection fairy"
+
+ fairy._counter += 1
+ if (
+ not pool.dispatch.checkout and not pool._pre_ping
+ ) or fairy._counter != 1:
+ return fairy
+
+ # Pool listeners can trigger a reconnection on checkout, as well
+ # as the pre-pinger.
+        # up to three connection attempts occur here in total (the initial
+        # connect plus two reconnects); note that if the database is not
+        # accessible from a connection standpoint, those won't proceed here.
+
+ attempts = 2
+
+ while attempts > 0:
+ connection_is_fresh = fairy._connection_record.fresh
+ fairy._connection_record.fresh = False
+ try:
+ if pool._pre_ping:
+ if not connection_is_fresh:
+ if fairy._echo:
+ pool.logger.debug(
+ "Pool pre-ping on connection %s",
+ fairy.dbapi_connection,
+ )
+ result = pool._dialect._do_ping_w_event(
+ fairy.dbapi_connection
+ )
+ if not result:
+ if fairy._echo:
+ pool.logger.debug(
+ "Pool pre-ping on connection %s failed, "
+ "will invalidate pool",
+ fairy.dbapi_connection,
+ )
+ raise exc.InvalidatePoolError()
+ elif fairy._echo:
+ pool.logger.debug(
+ "Connection %s is fresh, skipping pre-ping",
+ fairy.dbapi_connection,
+ )
+
+ pool.dispatch.checkout(
+ fairy.dbapi_connection, fairy._connection_record, fairy
+ )
+ return fairy
+ except exc.DisconnectionError as e:
+ if e.invalidate_pool:
+ pool.logger.info(
+ "Disconnection detected on checkout, "
+ "invalidating all pooled connections prior to "
+ "current timestamp (reason: %r)",
+ e,
+ )
+ fairy._connection_record.invalidate(e)
+ pool._invalidate(fairy, e, _checkin=False)
+ else:
+ pool.logger.info(
+ "Disconnection detected on checkout, "
+ "invalidating individual connection %s (reason: %r)",
+ fairy.dbapi_connection,
+ e,
+ )
+ fairy._connection_record.invalidate(e)
+ try:
+ fairy.dbapi_connection = (
+ fairy._connection_record.get_connection()
+ )
+ except BaseException as err:
+ with util.safe_reraise():
+ fairy._connection_record._checkin_failed(
+ err,
+ _fairy_was_created=True,
+ )
+
+ # prevent _ConnectionFairy from being carried
+ # in the stack trace. Do this after the
+ # connection record has been checked in, so that
+ # if the del triggers a finalize fairy, it won't
+ # try to checkin a second time.
+ del fairy
+
+ # never called, this is for code linters
+ raise
+
+ attempts -= 1
+ except BaseException as be_outer:
+ with util.safe_reraise():
+ rec = fairy._connection_record
+ if rec is not None:
+ rec._checkin_failed(
+ be_outer,
+ _fairy_was_created=True,
+ )
+
+ # prevent _ConnectionFairy from being carried
+ # in the stack trace, see above
+ del fairy
+
+ # never called, this is for code linters
+ raise
+
+ pool.logger.info("Reconnection attempts exhausted on checkout")
+ fairy.invalidate()
+ raise exc.InvalidRequestError("This connection is closed")
+
+ def _checkout_existing(self) -> _ConnectionFairy:
+ return _ConnectionFairy._checkout(self._pool, fairy=self)
+
+ def _checkin(self, transaction_was_reset: bool = False) -> None:
+ _finalize_fairy(
+ self.dbapi_connection,
+ self._connection_record,
+ self._pool,
+ None,
+ self._echo,
+ transaction_was_reset=transaction_was_reset,
+ fairy=self,
+ )
+
+ def _close(self) -> None:
+ self._checkin()
+
+ def _reset(
+ self,
+ pool: Pool,
+ transaction_was_reset: bool,
+ terminate_only: bool,
+ asyncio_safe: bool,
+ ) -> None:
+ if pool.dispatch.reset:
+ pool.dispatch.reset(
+ self.dbapi_connection,
+ self._connection_record,
+ PoolResetState(
+ transaction_was_reset=transaction_was_reset,
+ terminate_only=terminate_only,
+ asyncio_safe=asyncio_safe,
+ ),
+ )
+
+ if not asyncio_safe:
+ return
+
+ if pool._reset_on_return is reset_rollback:
+ if transaction_was_reset:
+ if self._echo:
+ pool.logger.debug(
+ "Connection %s reset, transaction already reset",
+ self.dbapi_connection,
+ )
+ else:
+ if self._echo:
+ pool.logger.debug(
+ "Connection %s rollback-on-return",
+ self.dbapi_connection,
+ )
+ pool._dialect.do_rollback(self)
+ elif pool._reset_on_return is reset_commit:
+ if self._echo:
+ pool.logger.debug(
+ "Connection %s commit-on-return",
+ self.dbapi_connection,
+ )
+ pool._dialect.do_commit(self)
+
+ @property
+ def _logger(self) -> log._IdentifiedLoggerType:
+ return self._pool.logger
+
+ @property
+ def is_valid(self) -> bool:
+ return self.dbapi_connection is not None
+
+ @property
+ def is_detached(self) -> bool:
+ return self._connection_record is None
+
+ @util.ro_memoized_property
+ def info(self) -> _InfoType:
+ if self._connection_record is None:
+ return {}
+ else:
+ return self._connection_record.info
+
+ @util.ro_non_memoized_property
+ def record_info(self) -> Optional[_InfoType]:
+ if self._connection_record is None:
+ return None
+ else:
+ return self._connection_record.record_info
+
+ def invalidate(
+ self, e: Optional[BaseException] = None, soft: bool = False
+ ) -> None:
+ if self.dbapi_connection is None:
+ util.warn("Can't invalidate an already-closed connection.")
+ return
+ if self._connection_record:
+ self._connection_record.invalidate(e=e, soft=soft)
+ if not soft:
+ # prevent any rollback / reset actions etc. on
+ # the connection
+ self.dbapi_connection = None # type: ignore
+
+ # finalize
+ self._checkin()
+
+ def cursor(self, *args: Any, **kwargs: Any) -> DBAPICursor:
+ assert self.dbapi_connection is not None
+ return self.dbapi_connection.cursor(*args, **kwargs)
+
+ def __getattr__(self, key: str) -> Any:
+ return getattr(self.dbapi_connection, key)
+
+ def detach(self) -> None:
+ if self._connection_record is not None:
+ rec = self._connection_record
+ rec.fairy_ref = None
+ rec.dbapi_connection = None
+ # TODO: should this be _return_conn?
+ self._pool._do_return_conn(self._connection_record)
+
+ # can't get the descriptor assignment to work here
+ # in pylance. mypy is OK w/ it
+ self.info = self.info.copy() # type: ignore
+
+ self._connection_record = None
+
+ if self._pool.dispatch.detach:
+ self._pool.dispatch.detach(self.dbapi_connection, rec)
+
+ def close(self) -> None:
+ self._counter -= 1
+ if self._counter == 0:
+ self._checkin()
+
+ def _close_special(self, transaction_reset: bool = False) -> None:
+ self._counter -= 1
+ if self._counter == 0:
+ self._checkin(transaction_was_reset=transaction_reset)