diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py')
-rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py | 3377 |
1 files changed, 0 insertions, 3377 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py deleted file mode 100644 index 403ec45..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py +++ /dev/null @@ -1,3377 +0,0 @@ -# engine/base.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`. - -""" -from __future__ import annotations - -import contextlib -import sys -import typing -from typing import Any -from typing import Callable -from typing import cast -from typing import Iterable -from typing import Iterator -from typing import List -from typing import Mapping -from typing import NoReturn -from typing import Optional -from typing import overload -from typing import Tuple -from typing import Type -from typing import TypeVar -from typing import Union - -from .interfaces import BindTyping -from .interfaces import ConnectionEventsTarget -from .interfaces import DBAPICursor -from .interfaces import ExceptionContext -from .interfaces import ExecuteStyle -from .interfaces import ExecutionContext -from .interfaces import IsolationLevel -from .util import _distill_params_20 -from .util import _distill_raw_params -from .util import TransactionalContext -from .. import exc -from .. import inspection -from .. import log -from .. import util -from ..sql import compiler -from ..sql import util as sql_util - -if typing.TYPE_CHECKING: - from . import CursorResult - from . import ScalarResult - from .interfaces import _AnyExecuteParams - from .interfaces import _AnyMultiExecuteParams - from .interfaces import _CoreAnyExecuteParams - from .interfaces import _CoreMultiExecuteParams - from .interfaces import _CoreSingleExecuteParams - from .interfaces import _DBAPIAnyExecuteParams - from .interfaces import _DBAPISingleExecuteParams - from .interfaces import _ExecuteOptions - from .interfaces import CompiledCacheType - from .interfaces import CoreExecuteOptionsParameter - from .interfaces import Dialect - from .interfaces import SchemaTranslateMapType - from .reflection import Inspector # noqa - from .url import URL - from ..event import dispatcher - from ..log import _EchoFlagType - from ..pool import _ConnectionFairy - from ..pool import Pool - from ..pool import PoolProxiedConnection - from ..sql import Executable - from ..sql._typing import _InfoType - from ..sql.compiler import Compiled - from ..sql.ddl import ExecutableDDLElement - from ..sql.ddl import SchemaDropper - from ..sql.ddl import SchemaGenerator - from ..sql.functions import FunctionElement - from ..sql.schema import DefaultGenerator - from ..sql.schema import HasSchemaAttr - from ..sql.schema import SchemaItem - from ..sql.selectable import TypedReturnsRows - - -_T = TypeVar("_T", bound=Any) -_EMPTY_EXECUTION_OPTS: _ExecuteOptions = util.EMPTY_DICT -NO_OPTIONS: Mapping[str, Any] = util.EMPTY_DICT - - -class Connection(ConnectionEventsTarget, inspection.Inspectable["Inspector"]): - """Provides high-level functionality for a wrapped DB-API connection. - - The :class:`_engine.Connection` object is procured by calling the - :meth:`_engine.Engine.connect` method of the :class:`_engine.Engine` - object, and provides services for execution of SQL statements as well - as transaction control. - - The Connection object is **not** thread-safe. While a Connection can be - shared among threads using properly synchronized access, it is still - possible that the underlying DBAPI connection may not support shared - access between threads. Check the DBAPI documentation for details. - - The Connection object represents a single DBAPI connection checked out - from the connection pool. In this state, the connection pool has no - affect upon the connection, including its expiration or timeout state. - For the connection pool to properly manage connections, connections - should be returned to the connection pool (i.e. ``connection.close()``) - whenever the connection is not in use. - - .. index:: - single: thread safety; Connection - - """ - - dialect: Dialect - dispatch: dispatcher[ConnectionEventsTarget] - - _sqla_logger_namespace = "sqlalchemy.engine.Connection" - - # used by sqlalchemy.engine.util.TransactionalContext - _trans_context_manager: Optional[TransactionalContext] = None - - # legacy as of 2.0, should be eventually deprecated and - # removed. was used in the "pre_ping" recipe that's been in the docs - # a long time - should_close_with_result = False - - _dbapi_connection: Optional[PoolProxiedConnection] - - _execution_options: _ExecuteOptions - - _transaction: Optional[RootTransaction] - _nested_transaction: Optional[NestedTransaction] - - def __init__( - self, - engine: Engine, - connection: Optional[PoolProxiedConnection] = None, - _has_events: Optional[bool] = None, - _allow_revalidate: bool = True, - _allow_autobegin: bool = True, - ): - """Construct a new Connection.""" - self.engine = engine - self.dialect = dialect = engine.dialect - - if connection is None: - try: - self._dbapi_connection = engine.raw_connection() - except dialect.loaded_dbapi.Error as err: - Connection._handle_dbapi_exception_noconnection( - err, dialect, engine - ) - raise - else: - self._dbapi_connection = connection - - self._transaction = self._nested_transaction = None - self.__savepoint_seq = 0 - self.__in_begin = False - - self.__can_reconnect = _allow_revalidate - self._allow_autobegin = _allow_autobegin - self._echo = self.engine._should_log_info() - - if _has_events is None: - # if _has_events is sent explicitly as False, - # then don't join the dispatch of the engine; we don't - # want to handle any of the engine's events in that case. - self.dispatch = self.dispatch._join(engine.dispatch) - self._has_events = _has_events or ( - _has_events is None and engine._has_events - ) - - self._execution_options = engine._execution_options - - if self._has_events or self.engine._has_events: - self.dispatch.engine_connect(self) - - @util.memoized_property - def _message_formatter(self) -> Any: - if "logging_token" in self._execution_options: - token = self._execution_options["logging_token"] - return lambda msg: "[%s] %s" % (token, msg) - else: - return None - - def _log_info(self, message: str, *arg: Any, **kw: Any) -> None: - fmt = self._message_formatter - - if fmt: - message = fmt(message) - - if log.STACKLEVEL: - kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET - - self.engine.logger.info(message, *arg, **kw) - - def _log_debug(self, message: str, *arg: Any, **kw: Any) -> None: - fmt = self._message_formatter - - if fmt: - message = fmt(message) - - if log.STACKLEVEL: - kw["stacklevel"] = 1 + log.STACKLEVEL_OFFSET - - self.engine.logger.debug(message, *arg, **kw) - - @property - def _schema_translate_map(self) -> Optional[SchemaTranslateMapType]: - schema_translate_map: Optional[SchemaTranslateMapType] = ( - self._execution_options.get("schema_translate_map", None) - ) - - return schema_translate_map - - def schema_for_object(self, obj: HasSchemaAttr) -> Optional[str]: - """Return the schema name for the given schema item taking into - account current schema translate map. - - """ - - name = obj.schema - schema_translate_map: Optional[SchemaTranslateMapType] = ( - self._execution_options.get("schema_translate_map", None) - ) - - if ( - schema_translate_map - and name in schema_translate_map - and obj._use_schema_map - ): - return schema_translate_map[name] - else: - return name - - def __enter__(self) -> Connection: - return self - - def __exit__(self, type_: Any, value: Any, traceback: Any) -> None: - self.close() - - @overload - def execution_options( - self, - *, - compiled_cache: Optional[CompiledCacheType] = ..., - logging_token: str = ..., - isolation_level: IsolationLevel = ..., - no_parameters: bool = False, - stream_results: bool = False, - max_row_buffer: int = ..., - yield_per: int = ..., - insertmanyvalues_page_size: int = ..., - schema_translate_map: Optional[SchemaTranslateMapType] = ..., - preserve_rowcount: bool = False, - **opt: Any, - ) -> Connection: ... - - @overload - def execution_options(self, **opt: Any) -> Connection: ... - - def execution_options(self, **opt: Any) -> Connection: - r"""Set non-SQL options for the connection which take effect - during execution. - - This method modifies this :class:`_engine.Connection` **in-place**; - the return value is the same :class:`_engine.Connection` object - upon which the method is called. Note that this is in contrast - to the behavior of the ``execution_options`` methods on other - objects such as :meth:`_engine.Engine.execution_options` and - :meth:`_sql.Executable.execution_options`. The rationale is that many - such execution options necessarily modify the state of the base - DBAPI connection in any case so there is no feasible means of - keeping the effect of such an option localized to a "sub" connection. - - .. versionchanged:: 2.0 The :meth:`_engine.Connection.execution_options` - method, in contrast to other objects with this method, modifies - the connection in-place without creating copy of it. - - As discussed elsewhere, the :meth:`_engine.Connection.execution_options` - method accepts any arbitrary parameters including user defined names. - All parameters given are consumable in a number of ways including - by using the :meth:`_engine.Connection.get_execution_options` method. - See the examples at :meth:`_sql.Executable.execution_options` - and :meth:`_engine.Engine.execution_options`. - - The keywords that are currently recognized by SQLAlchemy itself - include all those listed under :meth:`.Executable.execution_options`, - as well as others that are specific to :class:`_engine.Connection`. - - :param compiled_cache: Available on: :class:`_engine.Connection`, - :class:`_engine.Engine`. - - A dictionary where :class:`.Compiled` objects - will be cached when the :class:`_engine.Connection` - compiles a clause - expression into a :class:`.Compiled` object. This dictionary will - supersede the statement cache that may be configured on the - :class:`_engine.Engine` itself. If set to None, caching - is disabled, even if the engine has a configured cache size. - - Note that the ORM makes use of its own "compiled" caches for - some operations, including flush operations. The caching - used by the ORM internally supersedes a cache dictionary - specified here. - - :param logging_token: Available on: :class:`_engine.Connection`, - :class:`_engine.Engine`, :class:`_sql.Executable`. - - Adds the specified string token surrounded by brackets in log - messages logged by the connection, i.e. the logging that's enabled - either via the :paramref:`_sa.create_engine.echo` flag or via the - ``logging.getLogger("sqlalchemy.engine")`` logger. This allows a - per-connection or per-sub-engine token to be available which is - useful for debugging concurrent connection scenarios. - - .. versionadded:: 1.4.0b2 - - .. seealso:: - - :ref:`dbengine_logging_tokens` - usage example - - :paramref:`_sa.create_engine.logging_name` - adds a name to the - name used by the Python logger object itself. - - :param isolation_level: Available on: :class:`_engine.Connection`, - :class:`_engine.Engine`. - - Set the transaction isolation level for the lifespan of this - :class:`_engine.Connection` object. - Valid values include those string - values accepted by the :paramref:`_sa.create_engine.isolation_level` - parameter passed to :func:`_sa.create_engine`. These levels are - semi-database specific; see individual dialect documentation for - valid levels. - - The isolation level option applies the isolation level by emitting - statements on the DBAPI connection, and **necessarily affects the - original Connection object overall**. The isolation level will remain - at the given setting until explicitly changed, or when the DBAPI - connection itself is :term:`released` to the connection pool, i.e. the - :meth:`_engine.Connection.close` method is called, at which time an - event handler will emit additional statements on the DBAPI connection - in order to revert the isolation level change. - - .. note:: The ``isolation_level`` execution option may only be - established before the :meth:`_engine.Connection.begin` method is - called, as well as before any SQL statements are emitted which - would otherwise trigger "autobegin", or directly after a call to - :meth:`_engine.Connection.commit` or - :meth:`_engine.Connection.rollback`. A database cannot change the - isolation level on a transaction in progress. - - .. note:: The ``isolation_level`` execution option is implicitly - reset if the :class:`_engine.Connection` is invalidated, e.g. via - the :meth:`_engine.Connection.invalidate` method, or if a - disconnection error occurs. The new connection produced after the - invalidation will **not** have the selected isolation level - re-applied to it automatically. - - .. seealso:: - - :ref:`dbapi_autocommit` - - :meth:`_engine.Connection.get_isolation_level` - - view current actual level - - :param no_parameters: Available on: :class:`_engine.Connection`, - :class:`_sql.Executable`. - - When ``True``, if the final parameter - list or dictionary is totally empty, will invoke the - statement on the cursor as ``cursor.execute(statement)``, - not passing the parameter collection at all. - Some DBAPIs such as psycopg2 and mysql-python consider - percent signs as significant only when parameters are - present; this option allows code to generate SQL - containing percent signs (and possibly other characters) - that is neutral regarding whether it's executed by the DBAPI - or piped into a script that's later invoked by - command line tools. - - :param stream_results: Available on: :class:`_engine.Connection`, - :class:`_sql.Executable`. - - Indicate to the dialect that results should be - "streamed" and not pre-buffered, if possible. For backends - such as PostgreSQL, MySQL and MariaDB, this indicates the use of - a "server side cursor" as opposed to a client side cursor. - Other backends such as that of Oracle may already use server - side cursors by default. - - The usage of - :paramref:`_engine.Connection.execution_options.stream_results` is - usually combined with setting a fixed number of rows to to be fetched - in batches, to allow for efficient iteration of database rows while - at the same time not loading all result rows into memory at once; - this can be configured on a :class:`_engine.Result` object using the - :meth:`_engine.Result.yield_per` method, after execution has - returned a new :class:`_engine.Result`. If - :meth:`_engine.Result.yield_per` is not used, - the :paramref:`_engine.Connection.execution_options.stream_results` - mode of operation will instead use a dynamically sized buffer - which buffers sets of rows at a time, growing on each batch - based on a fixed growth size up until a limit which may - be configured using the - :paramref:`_engine.Connection.execution_options.max_row_buffer` - parameter. - - When using the ORM to fetch ORM mapped objects from a result, - :meth:`_engine.Result.yield_per` should always be used with - :paramref:`_engine.Connection.execution_options.stream_results`, - so that the ORM does not fetch all rows into new ORM objects at once. - - For typical use, the - :paramref:`_engine.Connection.execution_options.yield_per` execution - option should be preferred, which sets up both - :paramref:`_engine.Connection.execution_options.stream_results` and - :meth:`_engine.Result.yield_per` at once. This option is supported - both at a core level by :class:`_engine.Connection` as well as by the - ORM :class:`_engine.Session`; the latter is described at - :ref:`orm_queryguide_yield_per`. - - .. seealso:: - - :ref:`engine_stream_results` - background on - :paramref:`_engine.Connection.execution_options.stream_results` - - :paramref:`_engine.Connection.execution_options.max_row_buffer` - - :paramref:`_engine.Connection.execution_options.yield_per` - - :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` - describing the ORM version of ``yield_per`` - - :param max_row_buffer: Available on: :class:`_engine.Connection`, - :class:`_sql.Executable`. Sets a maximum - buffer size to use when the - :paramref:`_engine.Connection.execution_options.stream_results` - execution option is used on a backend that supports server side - cursors. The default value if not specified is 1000. - - .. seealso:: - - :paramref:`_engine.Connection.execution_options.stream_results` - - :ref:`engine_stream_results` - - - :param yield_per: Available on: :class:`_engine.Connection`, - :class:`_sql.Executable`. Integer value applied which will - set the :paramref:`_engine.Connection.execution_options.stream_results` - execution option and invoke :meth:`_engine.Result.yield_per` - automatically at once. Allows equivalent functionality as - is present when using this parameter with the ORM. - - .. versionadded:: 1.4.40 - - .. seealso:: - - :ref:`engine_stream_results` - background and examples - on using server side cursors with Core. - - :ref:`orm_queryguide_yield_per` - in the :ref:`queryguide_toplevel` - describing the ORM version of ``yield_per`` - - :param insertmanyvalues_page_size: Available on: :class:`_engine.Connection`, - :class:`_engine.Engine`. Number of rows to format into an - INSERT statement when the statement uses "insertmanyvalues" mode, - which is a paged form of bulk insert that is used for many backends - when using :term:`executemany` execution typically in conjunction - with RETURNING. Defaults to 1000. May also be modified on a - per-engine basis using the - :paramref:`_sa.create_engine.insertmanyvalues_page_size` parameter. - - .. versionadded:: 2.0 - - .. seealso:: - - :ref:`engine_insertmanyvalues` - - :param schema_translate_map: Available on: :class:`_engine.Connection`, - :class:`_engine.Engine`, :class:`_sql.Executable`. - - A dictionary mapping schema names to schema names, that will be - applied to the :paramref:`_schema.Table.schema` element of each - :class:`_schema.Table` - encountered when SQL or DDL expression elements - are compiled into strings; the resulting schema name will be - converted based on presence in the map of the original name. - - .. seealso:: - - :ref:`schema_translating` - - :param preserve_rowcount: Boolean; when True, the ``cursor.rowcount`` - attribute will be unconditionally memoized within the result and - made available via the :attr:`.CursorResult.rowcount` attribute. - Normally, this attribute is only preserved for UPDATE and DELETE - statements. Using this option, the DBAPIs rowcount value can - be accessed for other kinds of statements such as INSERT and SELECT, - to the degree that the DBAPI supports these statements. See - :attr:`.CursorResult.rowcount` for notes regarding the behavior - of this attribute. - - .. versionadded:: 2.0.28 - - .. seealso:: - - :meth:`_engine.Engine.execution_options` - - :meth:`.Executable.execution_options` - - :meth:`_engine.Connection.get_execution_options` - - :ref:`orm_queryguide_execution_options` - documentation on all - ORM-specific execution options - - """ # noqa - if self._has_events or self.engine._has_events: - self.dispatch.set_connection_execution_options(self, opt) - self._execution_options = self._execution_options.union(opt) - self.dialect.set_connection_execution_options(self, opt) - return self - - def get_execution_options(self) -> _ExecuteOptions: - """Get the non-SQL options which will take effect during execution. - - .. versionadded:: 1.3 - - .. seealso:: - - :meth:`_engine.Connection.execution_options` - """ - return self._execution_options - - @property - def _still_open_and_dbapi_connection_is_valid(self) -> bool: - pool_proxied_connection = self._dbapi_connection - return ( - pool_proxied_connection is not None - and pool_proxied_connection.is_valid - ) - - @property - def closed(self) -> bool: - """Return True if this connection is closed.""" - - return self._dbapi_connection is None and not self.__can_reconnect - - @property - def invalidated(self) -> bool: - """Return True if this connection was invalidated. - - This does not indicate whether or not the connection was - invalidated at the pool level, however - - """ - - # prior to 1.4, "invalid" was stored as a state independent of - # "closed", meaning an invalidated connection could be "closed", - # the _dbapi_connection would be None and closed=True, yet the - # "invalid" flag would stay True. This meant that there were - # three separate states (open/valid, closed/valid, closed/invalid) - # when there is really no reason for that; a connection that's - # "closed" does not need to be "invalid". So the state is now - # represented by the two facts alone. - - pool_proxied_connection = self._dbapi_connection - return pool_proxied_connection is None and self.__can_reconnect - - @property - def connection(self) -> PoolProxiedConnection: - """The underlying DB-API connection managed by this Connection. - - This is a SQLAlchemy connection-pool proxied connection - which then has the attribute - :attr:`_pool._ConnectionFairy.dbapi_connection` that refers to the - actual driver connection. - - .. seealso:: - - - :ref:`dbapi_connections` - - """ - - if self._dbapi_connection is None: - try: - return self._revalidate_connection() - except (exc.PendingRollbackError, exc.ResourceClosedError): - raise - except BaseException as e: - self._handle_dbapi_exception(e, None, None, None, None) - else: - return self._dbapi_connection - - def get_isolation_level(self) -> IsolationLevel: - """Return the current **actual** isolation level that's present on - the database within the scope of this connection. - - This attribute will perform a live SQL operation against the database - in order to procure the current isolation level, so the value returned - is the actual level on the underlying DBAPI connection regardless of - how this state was set. This will be one of the four actual isolation - modes ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``, - ``SERIALIZABLE``. It will **not** include the ``AUTOCOMMIT`` isolation - level setting. Third party dialects may also feature additional - isolation level settings. - - .. note:: This method **will not report** on the ``AUTOCOMMIT`` - isolation level, which is a separate :term:`dbapi` setting that's - independent of **actual** isolation level. When ``AUTOCOMMIT`` is - in use, the database connection still has a "traditional" isolation - mode in effect, that is typically one of the four values - ``READ UNCOMMITTED``, ``READ COMMITTED``, ``REPEATABLE READ``, - ``SERIALIZABLE``. - - Compare to the :attr:`_engine.Connection.default_isolation_level` - accessor which returns the isolation level that is present on the - database at initial connection time. - - .. seealso:: - - :attr:`_engine.Connection.default_isolation_level` - - view default level - - :paramref:`_sa.create_engine.isolation_level` - - set per :class:`_engine.Engine` isolation level - - :paramref:`.Connection.execution_options.isolation_level` - - set per :class:`_engine.Connection` isolation level - - """ - dbapi_connection = self.connection.dbapi_connection - assert dbapi_connection is not None - try: - return self.dialect.get_isolation_level(dbapi_connection) - except BaseException as e: - self._handle_dbapi_exception(e, None, None, None, None) - - @property - def default_isolation_level(self) -> Optional[IsolationLevel]: - """The initial-connection time isolation level associated with the - :class:`_engine.Dialect` in use. - - This value is independent of the - :paramref:`.Connection.execution_options.isolation_level` and - :paramref:`.Engine.execution_options.isolation_level` execution - options, and is determined by the :class:`_engine.Dialect` when the - first connection is created, by performing a SQL query against the - database for the current isolation level before any additional commands - have been emitted. - - Calling this accessor does not invoke any new SQL queries. - - .. seealso:: - - :meth:`_engine.Connection.get_isolation_level` - - view current actual isolation level - - :paramref:`_sa.create_engine.isolation_level` - - set per :class:`_engine.Engine` isolation level - - :paramref:`.Connection.execution_options.isolation_level` - - set per :class:`_engine.Connection` isolation level - - """ - return self.dialect.default_isolation_level - - def _invalid_transaction(self) -> NoReturn: - raise exc.PendingRollbackError( - "Can't reconnect until invalid %stransaction is rolled " - "back. Please rollback() fully before proceeding" - % ("savepoint " if self._nested_transaction is not None else ""), - code="8s2b", - ) - - def _revalidate_connection(self) -> PoolProxiedConnection: - if self.__can_reconnect and self.invalidated: - if self._transaction is not None: - self._invalid_transaction() - self._dbapi_connection = self.engine.raw_connection() - return self._dbapi_connection - raise exc.ResourceClosedError("This Connection is closed") - - @property - def info(self) -> _InfoType: - """Info dictionary associated with the underlying DBAPI connection - referred to by this :class:`_engine.Connection`, allowing user-defined - data to be associated with the connection. - - The data here will follow along with the DBAPI connection including - after it is returned to the connection pool and used again - in subsequent instances of :class:`_engine.Connection`. - - """ - - return self.connection.info - - def invalidate(self, exception: Optional[BaseException] = None) -> None: - """Invalidate the underlying DBAPI connection associated with - this :class:`_engine.Connection`. - - An attempt will be made to close the underlying DBAPI connection - immediately; however if this operation fails, the error is logged - but not raised. The connection is then discarded whether or not - close() succeeded. - - Upon the next use (where "use" typically means using the - :meth:`_engine.Connection.execute` method or similar), - this :class:`_engine.Connection` will attempt to - procure a new DBAPI connection using the services of the - :class:`_pool.Pool` as a source of connectivity (e.g. - a "reconnection"). - - If a transaction was in progress (e.g. the - :meth:`_engine.Connection.begin` method has been called) when - :meth:`_engine.Connection.invalidate` method is called, at the DBAPI - level all state associated with this transaction is lost, as - the DBAPI connection is closed. The :class:`_engine.Connection` - will not allow a reconnection to proceed until the - :class:`.Transaction` object is ended, by calling the - :meth:`.Transaction.rollback` method; until that point, any attempt at - continuing to use the :class:`_engine.Connection` will raise an - :class:`~sqlalchemy.exc.InvalidRequestError`. - This is to prevent applications from accidentally - continuing an ongoing transactional operations despite the - fact that the transaction has been lost due to an - invalidation. - - The :meth:`_engine.Connection.invalidate` method, - just like auto-invalidation, - will at the connection pool level invoke the - :meth:`_events.PoolEvents.invalidate` event. - - :param exception: an optional ``Exception`` instance that's the - reason for the invalidation. is passed along to event handlers - and logging functions. - - .. seealso:: - - :ref:`pool_connection_invalidation` - - """ - - if self.invalidated: - return - - if self.closed: - raise exc.ResourceClosedError("This Connection is closed") - - if self._still_open_and_dbapi_connection_is_valid: - pool_proxied_connection = self._dbapi_connection - assert pool_proxied_connection is not None - pool_proxied_connection.invalidate(exception) - - self._dbapi_connection = None - - def detach(self) -> None: - """Detach the underlying DB-API connection from its connection pool. - - E.g.:: - - with engine.connect() as conn: - conn.detach() - conn.execute(text("SET search_path TO schema1, schema2")) - - # work with connection - - # connection is fully closed (since we used "with:", can - # also call .close()) - - This :class:`_engine.Connection` instance will remain usable. - When closed - (or exited from a context manager context as above), - the DB-API connection will be literally closed and not - returned to its originating pool. - - This method can be used to insulate the rest of an application - from a modified state on a connection (such as a transaction - isolation level or similar). - - """ - - if self.closed: - raise exc.ResourceClosedError("This Connection is closed") - - pool_proxied_connection = self._dbapi_connection - if pool_proxied_connection is None: - raise exc.InvalidRequestError( - "Can't detach an invalidated Connection" - ) - pool_proxied_connection.detach() - - def _autobegin(self) -> None: - if self._allow_autobegin and not self.__in_begin: - self.begin() - - def begin(self) -> RootTransaction: - """Begin a transaction prior to autobegin occurring. - - E.g.:: - - with engine.connect() as conn: - with conn.begin() as trans: - conn.execute(table.insert(), {"username": "sandy"}) - - - The returned object is an instance of :class:`_engine.RootTransaction`. - This object represents the "scope" of the transaction, - which completes when either the :meth:`_engine.Transaction.rollback` - or :meth:`_engine.Transaction.commit` method is called; the object - also works as a context manager as illustrated above. - - The :meth:`_engine.Connection.begin` method begins a - transaction that normally will be begun in any case when the connection - is first used to execute a statement. The reason this method might be - used would be to invoke the :meth:`_events.ConnectionEvents.begin` - event at a specific time, or to organize code within the scope of a - connection checkout in terms of context managed blocks, such as:: - - with engine.connect() as conn: - with conn.begin(): - conn.execute(...) - conn.execute(...) - - with conn.begin(): - conn.execute(...) - conn.execute(...) - - The above code is not fundamentally any different in its behavior than - the following code which does not use - :meth:`_engine.Connection.begin`; the below style is known - as "commit as you go" style:: - - with engine.connect() as conn: - conn.execute(...) - conn.execute(...) - conn.commit() - - conn.execute(...) - conn.execute(...) - conn.commit() - - From a database point of view, the :meth:`_engine.Connection.begin` - method does not emit any SQL or change the state of the underlying - DBAPI connection in any way; the Python DBAPI does not have any - concept of explicit transaction begin. - - .. seealso:: - - :ref:`tutorial_working_with_transactions` - in the - :ref:`unified_tutorial` - - :meth:`_engine.Connection.begin_nested` - use a SAVEPOINT - - :meth:`_engine.Connection.begin_twophase` - - use a two phase /XID transaction - - :meth:`_engine.Engine.begin` - context manager available from - :class:`_engine.Engine` - - """ - if self._transaction is None: - self._transaction = RootTransaction(self) - return self._transaction - else: - raise exc.InvalidRequestError( - "This connection has already initialized a SQLAlchemy " - "Transaction() object via begin() or autobegin; can't " - "call begin() here unless rollback() or commit() " - "is called first." - ) - - def begin_nested(self) -> NestedTransaction: - """Begin a nested transaction (i.e. SAVEPOINT) and return a transaction - handle that controls the scope of the SAVEPOINT. - - E.g.:: - - with engine.begin() as connection: - with connection.begin_nested(): - connection.execute(table.insert(), {"username": "sandy"}) - - The returned object is an instance of - :class:`_engine.NestedTransaction`, which includes transactional - methods :meth:`_engine.NestedTransaction.commit` and - :meth:`_engine.NestedTransaction.rollback`; for a nested transaction, - these methods correspond to the operations "RELEASE SAVEPOINT <name>" - and "ROLLBACK TO SAVEPOINT <name>". The name of the savepoint is local - to the :class:`_engine.NestedTransaction` object and is generated - automatically. Like any other :class:`_engine.Transaction`, the - :class:`_engine.NestedTransaction` may be used as a context manager as - illustrated above which will "release" or "rollback" corresponding to - if the operation within the block were successful or raised an - exception. - - Nested transactions require SAVEPOINT support in the underlying - database, else the behavior is undefined. SAVEPOINT is commonly used to - run operations within a transaction that may fail, while continuing the - outer transaction. E.g.:: - - from sqlalchemy import exc - - with engine.begin() as connection: - trans = connection.begin_nested() - try: - connection.execute(table.insert(), {"username": "sandy"}) - trans.commit() - except exc.IntegrityError: # catch for duplicate username - trans.rollback() # rollback to savepoint - - # outer transaction continues - connection.execute( ... ) - - If :meth:`_engine.Connection.begin_nested` is called without first - calling :meth:`_engine.Connection.begin` or - :meth:`_engine.Engine.begin`, the :class:`_engine.Connection` object - will "autobegin" the outer transaction first. This outer transaction - may be committed using "commit-as-you-go" style, e.g.:: - - with engine.connect() as connection: # begin() wasn't called - - with connection.begin_nested(): will auto-"begin()" first - connection.execute( ... ) - # savepoint is released - - connection.execute( ... ) - - # explicitly commit outer transaction - connection.commit() - - # can continue working with connection here - - .. versionchanged:: 2.0 - - :meth:`_engine.Connection.begin_nested` will now participate - in the connection "autobegin" behavior that is new as of - 2.0 / "future" style connections in 1.4. - - .. seealso:: - - :meth:`_engine.Connection.begin` - - :ref:`session_begin_nested` - ORM support for SAVEPOINT - - """ - if self._transaction is None: - self._autobegin() - - return NestedTransaction(self) - - def begin_twophase(self, xid: Optional[Any] = None) -> TwoPhaseTransaction: - """Begin a two-phase or XA transaction and return a transaction - handle. - - The returned object is an instance of :class:`.TwoPhaseTransaction`, - which in addition to the methods provided by - :class:`.Transaction`, also provides a - :meth:`~.TwoPhaseTransaction.prepare` method. - - :param xid: the two phase transaction id. If not supplied, a - random id will be generated. - - .. seealso:: - - :meth:`_engine.Connection.begin` - - :meth:`_engine.Connection.begin_twophase` - - """ - - if self._transaction is not None: - raise exc.InvalidRequestError( - "Cannot start a two phase transaction when a transaction " - "is already in progress." - ) - if xid is None: - xid = self.engine.dialect.create_xid() - return TwoPhaseTransaction(self, xid) - - def commit(self) -> None: - """Commit the transaction that is currently in progress. - - This method commits the current transaction if one has been started. - If no transaction was started, the method has no effect, assuming - the connection is in a non-invalidated state. - - A transaction is begun on a :class:`_engine.Connection` automatically - whenever a statement is first executed, or when the - :meth:`_engine.Connection.begin` method is called. - - .. note:: The :meth:`_engine.Connection.commit` method only acts upon - the primary database transaction that is linked to the - :class:`_engine.Connection` object. It does not operate upon a - SAVEPOINT that would have been invoked from the - :meth:`_engine.Connection.begin_nested` method; for control of a - SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the - :class:`_engine.NestedTransaction` that is returned by the - :meth:`_engine.Connection.begin_nested` method itself. - - - """ - if self._transaction: - self._transaction.commit() - - def rollback(self) -> None: - """Roll back the transaction that is currently in progress. - - This method rolls back the current transaction if one has been started. - If no transaction was started, the method has no effect. If a - transaction was started and the connection is in an invalidated state, - the transaction is cleared using this method. - - A transaction is begun on a :class:`_engine.Connection` automatically - whenever a statement is first executed, or when the - :meth:`_engine.Connection.begin` method is called. - - .. note:: The :meth:`_engine.Connection.rollback` method only acts - upon the primary database transaction that is linked to the - :class:`_engine.Connection` object. It does not operate upon a - SAVEPOINT that would have been invoked from the - :meth:`_engine.Connection.begin_nested` method; for control of a - SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the - :class:`_engine.NestedTransaction` that is returned by the - :meth:`_engine.Connection.begin_nested` method itself. - - - """ - if self._transaction: - self._transaction.rollback() - - def recover_twophase(self) -> List[Any]: - return self.engine.dialect.do_recover_twophase(self) - - def rollback_prepared(self, xid: Any, recover: bool = False) -> None: - self.engine.dialect.do_rollback_twophase(self, xid, recover=recover) - - def commit_prepared(self, xid: Any, recover: bool = False) -> None: - self.engine.dialect.do_commit_twophase(self, xid, recover=recover) - - def in_transaction(self) -> bool: - """Return True if a transaction is in progress.""" - return self._transaction is not None and self._transaction.is_active - - def in_nested_transaction(self) -> bool: - """Return True if a transaction is in progress.""" - return ( - self._nested_transaction is not None - and self._nested_transaction.is_active - ) - - def _is_autocommit_isolation(self) -> bool: - opt_iso = self._execution_options.get("isolation_level", None) - return bool( - opt_iso == "AUTOCOMMIT" - or ( - opt_iso is None - and self.engine.dialect._on_connect_isolation_level - == "AUTOCOMMIT" - ) - ) - - def _get_required_transaction(self) -> RootTransaction: - trans = self._transaction - if trans is None: - raise exc.InvalidRequestError("connection is not in a transaction") - return trans - - def _get_required_nested_transaction(self) -> NestedTransaction: - trans = self._nested_transaction - if trans is None: - raise exc.InvalidRequestError( - "connection is not in a nested transaction" - ) - return trans - - def get_transaction(self) -> Optional[RootTransaction]: - """Return the current root transaction in progress, if any. - - .. versionadded:: 1.4 - - """ - - return self._transaction - - def get_nested_transaction(self) -> Optional[NestedTransaction]: - """Return the current nested transaction in progress, if any. - - .. versionadded:: 1.4 - - """ - return self._nested_transaction - - def _begin_impl(self, transaction: RootTransaction) -> None: - if self._echo: - if self._is_autocommit_isolation(): - self._log_info( - "BEGIN (implicit; DBAPI should not BEGIN due to " - "autocommit mode)" - ) - else: - self._log_info("BEGIN (implicit)") - - self.__in_begin = True - - if self._has_events or self.engine._has_events: - self.dispatch.begin(self) - - try: - self.engine.dialect.do_begin(self.connection) - except BaseException as e: - self._handle_dbapi_exception(e, None, None, None, None) - finally: - self.__in_begin = False - - def _rollback_impl(self) -> None: - if self._has_events or self.engine._has_events: - self.dispatch.rollback(self) - - if self._still_open_and_dbapi_connection_is_valid: - if self._echo: - if self._is_autocommit_isolation(): - self._log_info( - "ROLLBACK using DBAPI connection.rollback(), " - "DBAPI should ignore due to autocommit mode" - ) - else: - self._log_info("ROLLBACK") - try: - self.engine.dialect.do_rollback(self.connection) - except BaseException as e: - self._handle_dbapi_exception(e, None, None, None, None) - - def _commit_impl(self) -> None: - if self._has_events or self.engine._has_events: - self.dispatch.commit(self) - - if self._echo: - if self._is_autocommit_isolation(): - self._log_info( - "COMMIT using DBAPI connection.commit(), " - "DBAPI should ignore due to autocommit mode" - ) - else: - self._log_info("COMMIT") - try: - self.engine.dialect.do_commit(self.connection) - except BaseException as e: - self._handle_dbapi_exception(e, None, None, None, None) - - def _savepoint_impl(self, name: Optional[str] = None) -> str: - if self._has_events or self.engine._has_events: - self.dispatch.savepoint(self, name) - - if name is None: - self.__savepoint_seq += 1 - name = "sa_savepoint_%s" % self.__savepoint_seq - self.engine.dialect.do_savepoint(self, name) - return name - - def _rollback_to_savepoint_impl(self, name: str) -> None: - if self._has_events or self.engine._has_events: - self.dispatch.rollback_savepoint(self, name, None) - - if self._still_open_and_dbapi_connection_is_valid: - self.engine.dialect.do_rollback_to_savepoint(self, name) - - def _release_savepoint_impl(self, name: str) -> None: - if self._has_events or self.engine._has_events: - self.dispatch.release_savepoint(self, name, None) - - self.engine.dialect.do_release_savepoint(self, name) - - def _begin_twophase_impl(self, transaction: TwoPhaseTransaction) -> None: - if self._echo: - self._log_info("BEGIN TWOPHASE (implicit)") - if self._has_events or self.engine._has_events: - self.dispatch.begin_twophase(self, transaction.xid) - - self.__in_begin = True - try: - self.engine.dialect.do_begin_twophase(self, transaction.xid) - except BaseException as e: - self._handle_dbapi_exception(e, None, None, None, None) - finally: - self.__in_begin = False - - def _prepare_twophase_impl(self, xid: Any) -> None: - if self._has_events or self.engine._has_events: - self.dispatch.prepare_twophase(self, xid) - - assert isinstance(self._transaction, TwoPhaseTransaction) - try: - self.engine.dialect.do_prepare_twophase(self, xid) - except BaseException as e: - self._handle_dbapi_exception(e, None, None, None, None) - - def _rollback_twophase_impl(self, xid: Any, is_prepared: bool) -> None: - if self._has_events or self.engine._has_events: - self.dispatch.rollback_twophase(self, xid, is_prepared) - - if self._still_open_and_dbapi_connection_is_valid: - assert isinstance(self._transaction, TwoPhaseTransaction) - try: - self.engine.dialect.do_rollback_twophase( - self, xid, is_prepared - ) - except BaseException as e: - self._handle_dbapi_exception(e, None, None, None, None) - - def _commit_twophase_impl(self, xid: Any, is_prepared: bool) -> None: - if self._has_events or self.engine._has_events: - self.dispatch.commit_twophase(self, xid, is_prepared) - - assert isinstance(self._transaction, TwoPhaseTransaction) - try: - self.engine.dialect.do_commit_twophase(self, xid, is_prepared) - except BaseException as e: - self._handle_dbapi_exception(e, None, None, None, None) - - def close(self) -> None: - """Close this :class:`_engine.Connection`. - - This results in a release of the underlying database - resources, that is, the DBAPI connection referenced - internally. The DBAPI connection is typically restored - back to the connection-holding :class:`_pool.Pool` referenced - by the :class:`_engine.Engine` that produced this - :class:`_engine.Connection`. Any transactional state present on - the DBAPI connection is also unconditionally released via - the DBAPI connection's ``rollback()`` method, regardless - of any :class:`.Transaction` object that may be - outstanding with regards to this :class:`_engine.Connection`. - - This has the effect of also calling :meth:`_engine.Connection.rollback` - if any transaction is in place. - - After :meth:`_engine.Connection.close` is called, the - :class:`_engine.Connection` is permanently in a closed state, - and will allow no further operations. - - """ - - if self._transaction: - self._transaction.close() - skip_reset = True - else: - skip_reset = False - - if self._dbapi_connection is not None: - conn = self._dbapi_connection - - # as we just closed the transaction, close the connection - # pool connection without doing an additional reset - if skip_reset: - cast("_ConnectionFairy", conn)._close_special( - transaction_reset=True - ) - else: - conn.close() - - # There is a slight chance that conn.close() may have - # triggered an invalidation here in which case - # _dbapi_connection would already be None, however usually - # it will be non-None here and in a "closed" state. - self._dbapi_connection = None - self.__can_reconnect = False - - @overload - def scalar( - self, - statement: TypedReturnsRows[Tuple[_T]], - parameters: Optional[_CoreSingleExecuteParams] = None, - *, - execution_options: Optional[CoreExecuteOptionsParameter] = None, - ) -> Optional[_T]: ... - - @overload - def scalar( - self, - statement: Executable, - parameters: Optional[_CoreSingleExecuteParams] = None, - *, - execution_options: Optional[CoreExecuteOptionsParameter] = None, - ) -> Any: ... - - def scalar( - self, - statement: Executable, - parameters: Optional[_CoreSingleExecuteParams] = None, - *, - execution_options: Optional[CoreExecuteOptionsParameter] = None, - ) -> Any: - r"""Executes a SQL statement construct and returns a scalar object. - - This method is shorthand for invoking the - :meth:`_engine.Result.scalar` method after invoking the - :meth:`_engine.Connection.execute` method. Parameters are equivalent. - - :return: a scalar Python value representing the first column of the - first row returned. - - """ - distilled_parameters = _distill_params_20(parameters) - try: - meth = statement._execute_on_scalar - except AttributeError as err: - raise exc.ObjectNotExecutableError(statement) from err - else: - return meth( - self, - distilled_parameters, - execution_options or NO_OPTIONS, - ) - - @overload - def scalars( - self, - statement: TypedReturnsRows[Tuple[_T]], - parameters: Optional[_CoreAnyExecuteParams] = None, - *, - execution_options: Optional[CoreExecuteOptionsParameter] = None, - ) -> ScalarResult[_T]: ... - - @overload - def scalars( - self, - statement: Executable, - parameters: Optional[_CoreAnyExecuteParams] = None, - *, - execution_options: Optional[CoreExecuteOptionsParameter] = None, - ) -> ScalarResult[Any]: ... - - def scalars( - self, - statement: Executable, - parameters: Optional[_CoreAnyExecuteParams] = None, - *, - execution_options: Optional[CoreExecuteOptionsParameter] = None, - ) -> ScalarResult[Any]: - """Executes and returns a scalar result set, which yields scalar values - from the first column of each row. - - This method is equivalent to calling :meth:`_engine.Connection.execute` - to receive a :class:`_result.Result` object, then invoking the - :meth:`_result.Result.scalars` method to produce a - :class:`_result.ScalarResult` instance. - - :return: a :class:`_result.ScalarResult` - - .. versionadded:: 1.4.24 - - """ - - return self.execute( - statement, parameters, execution_options=execution_options - ).scalars() - - @overload - def execute( - self, - statement: TypedReturnsRows[_T], - parameters: Optional[_CoreAnyExecuteParams] = None, - *, - execution_options: Optional[CoreExecuteOptionsParameter] = None, - ) -> CursorResult[_T]: ... - - @overload - def execute( - self, - statement: Executable, - parameters: Optional[_CoreAnyExecuteParams] = None, - *, - execution_options: Optional[CoreExecuteOptionsParameter] = None, - ) -> CursorResult[Any]: ... - - def execute( - self, - statement: Executable, - parameters: Optional[_CoreAnyExecuteParams] = None, - *, - execution_options: Optional[CoreExecuteOptionsParameter] = None, - ) -> CursorResult[Any]: - r"""Executes a SQL statement construct and returns a - :class:`_engine.CursorResult`. - - :param statement: The statement to be executed. This is always - an object that is in both the :class:`_expression.ClauseElement` and - :class:`_expression.Executable` hierarchies, including: - - * :class:`_expression.Select` - * :class:`_expression.Insert`, :class:`_expression.Update`, - :class:`_expression.Delete` - * :class:`_expression.TextClause` and - :class:`_expression.TextualSelect` - * :class:`_schema.DDL` and objects which inherit from - :class:`_schema.ExecutableDDLElement` - - :param parameters: parameters which will be bound into the statement. - This may be either a dictionary of parameter names to values, - or a mutable sequence (e.g. a list) of dictionaries. When a - list of dictionaries is passed, the underlying statement execution - will make use of the DBAPI ``cursor.executemany()`` method. - When a single dictionary is passed, the DBAPI ``cursor.execute()`` - method will be used. - - :param execution_options: optional dictionary of execution options, - which will be associated with the statement execution. This - dictionary can provide a subset of the options that are accepted - by :meth:`_engine.Connection.execution_options`. - - :return: a :class:`_engine.Result` object. - - """ - distilled_parameters = _distill_params_20(parameters) - try: - meth = statement._execute_on_connection - except AttributeError as err: - raise exc.ObjectNotExecutableError(statement) from err - else: - return meth( - self, - distilled_parameters, - execution_options or NO_OPTIONS, - ) - - def _execute_function( - self, - func: FunctionElement[Any], - distilled_parameters: _CoreMultiExecuteParams, - execution_options: CoreExecuteOptionsParameter, - ) -> CursorResult[Any]: - """Execute a sql.FunctionElement object.""" - - return self._execute_clauseelement( - func.select(), distilled_parameters, execution_options - ) - - def _execute_default( - self, - default: DefaultGenerator, - distilled_parameters: _CoreMultiExecuteParams, - execution_options: CoreExecuteOptionsParameter, - ) -> Any: - """Execute a schema.ColumnDefault object.""" - - execution_options = self._execution_options.merge_with( - execution_options - ) - - event_multiparams: Optional[_CoreMultiExecuteParams] - event_params: Optional[_CoreAnyExecuteParams] - - # note for event handlers, the "distilled parameters" which is always - # a list of dicts is broken out into separate "multiparams" and - # "params" collections, which allows the handler to distinguish - # between an executemany and execute style set of parameters. - if self._has_events or self.engine._has_events: - ( - default, - distilled_parameters, - event_multiparams, - event_params, - ) = self._invoke_before_exec_event( - default, distilled_parameters, execution_options - ) - else: - event_multiparams = event_params = None - - try: - conn = self._dbapi_connection - if conn is None: - conn = self._revalidate_connection() - - dialect = self.dialect - ctx = dialect.execution_ctx_cls._init_default( - dialect, self, conn, execution_options - ) - except (exc.PendingRollbackError, exc.ResourceClosedError): - raise - except BaseException as e: - self._handle_dbapi_exception(e, None, None, None, None) - - ret = ctx._exec_default(None, default, None) - - if self._has_events or self.engine._has_events: - self.dispatch.after_execute( - self, - default, - event_multiparams, - event_params, - execution_options, - ret, - ) - - return ret - - def _execute_ddl( - self, - ddl: ExecutableDDLElement, - distilled_parameters: _CoreMultiExecuteParams, - execution_options: CoreExecuteOptionsParameter, - ) -> CursorResult[Any]: - """Execute a schema.DDL object.""" - - exec_opts = ddl._execution_options.merge_with( - self._execution_options, execution_options - ) - - event_multiparams: Optional[_CoreMultiExecuteParams] - event_params: Optional[_CoreSingleExecuteParams] - - if self._has_events or self.engine._has_events: - ( - ddl, - distilled_parameters, - event_multiparams, - event_params, - ) = self._invoke_before_exec_event( - ddl, distilled_parameters, exec_opts - ) - else: - event_multiparams = event_params = None - - schema_translate_map = exec_opts.get("schema_translate_map", None) - - dialect = self.dialect - - compiled = ddl.compile( - dialect=dialect, schema_translate_map=schema_translate_map - ) - ret = self._execute_context( - dialect, - dialect.execution_ctx_cls._init_ddl, - compiled, - None, - exec_opts, - compiled, - ) - if self._has_events or self.engine._has_events: - self.dispatch.after_execute( - self, - ddl, - event_multiparams, - event_params, - exec_opts, - ret, - ) - return ret - - def _invoke_before_exec_event( - self, - elem: Any, - distilled_params: _CoreMultiExecuteParams, - execution_options: _ExecuteOptions, - ) -> Tuple[ - Any, - _CoreMultiExecuteParams, - _CoreMultiExecuteParams, - _CoreSingleExecuteParams, - ]: - event_multiparams: _CoreMultiExecuteParams - event_params: _CoreSingleExecuteParams - - if len(distilled_params) == 1: - event_multiparams, event_params = [], distilled_params[0] - else: - event_multiparams, event_params = distilled_params, {} - - for fn in self.dispatch.before_execute: - elem, event_multiparams, event_params = fn( - self, - elem, - event_multiparams, - event_params, - execution_options, - ) - - if event_multiparams: - distilled_params = list(event_multiparams) - if event_params: - raise exc.InvalidRequestError( - "Event handler can't return non-empty multiparams " - "and params at the same time" - ) - elif event_params: - distilled_params = [event_params] - else: - distilled_params = [] - - return elem, distilled_params, event_multiparams, event_params - - def _execute_clauseelement( - self, - elem: Executable, - distilled_parameters: _CoreMultiExecuteParams, - execution_options: CoreExecuteOptionsParameter, - ) -> CursorResult[Any]: - """Execute a sql.ClauseElement object.""" - - execution_options = elem._execution_options.merge_with( - self._execution_options, execution_options - ) - - has_events = self._has_events or self.engine._has_events - if has_events: - ( - elem, - distilled_parameters, - event_multiparams, - event_params, - ) = self._invoke_before_exec_event( - elem, distilled_parameters, execution_options - ) - - if distilled_parameters: - # ensure we don't retain a link to the view object for keys() - # which links to the values, which we don't want to cache - keys = sorted(distilled_parameters[0]) - for_executemany = len(distilled_parameters) > 1 - else: - keys = [] - for_executemany = False - - dialect = self.dialect - - schema_translate_map = execution_options.get( - "schema_translate_map", None - ) - - compiled_cache: Optional[CompiledCacheType] = execution_options.get( - "compiled_cache", self.engine._compiled_cache - ) - - compiled_sql, extracted_params, cache_hit = elem._compile_w_cache( - dialect=dialect, - compiled_cache=compiled_cache, - column_keys=keys, - for_executemany=for_executemany, - schema_translate_map=schema_translate_map, - linting=self.dialect.compiler_linting | compiler.WARN_LINTING, - ) - ret = self._execute_context( - dialect, - dialect.execution_ctx_cls._init_compiled, - compiled_sql, - distilled_parameters, - execution_options, - compiled_sql, - distilled_parameters, - elem, - extracted_params, - cache_hit=cache_hit, - ) - if has_events: - self.dispatch.after_execute( - self, - elem, - event_multiparams, - event_params, - execution_options, - ret, - ) - return ret - - def _execute_compiled( - self, - compiled: Compiled, - distilled_parameters: _CoreMultiExecuteParams, - execution_options: CoreExecuteOptionsParameter = _EMPTY_EXECUTION_OPTS, - ) -> CursorResult[Any]: - """Execute a sql.Compiled object. - - TODO: why do we have this? likely deprecate or remove - - """ - - execution_options = compiled.execution_options.merge_with( - self._execution_options, execution_options - ) - - if self._has_events or self.engine._has_events: - ( - compiled, - distilled_parameters, - event_multiparams, - event_params, - ) = self._invoke_before_exec_event( - compiled, distilled_parameters, execution_options - ) - - dialect = self.dialect - - ret = self._execute_context( - dialect, - dialect.execution_ctx_cls._init_compiled, - compiled, - distilled_parameters, - execution_options, - compiled, - distilled_parameters, - None, - None, - ) - if self._has_events or self.engine._has_events: - self.dispatch.after_execute( - self, - compiled, - event_multiparams, - event_params, - execution_options, - ret, - ) - return ret - - def exec_driver_sql( - self, - statement: str, - parameters: Optional[_DBAPIAnyExecuteParams] = None, - execution_options: Optional[CoreExecuteOptionsParameter] = None, - ) -> CursorResult[Any]: - r"""Executes a string SQL statement on the DBAPI cursor directly, - without any SQL compilation steps. - - This can be used to pass any string directly to the - ``cursor.execute()`` method of the DBAPI in use. - - :param statement: The statement str to be executed. Bound parameters - must use the underlying DBAPI's paramstyle, such as "qmark", - "pyformat", "format", etc. - - :param parameters: represent bound parameter values to be used in the - execution. The format is one of: a dictionary of named parameters, - a tuple of positional parameters, or a list containing either - dictionaries or tuples for multiple-execute support. - - :return: a :class:`_engine.CursorResult`. - - E.g. multiple dictionaries:: - - - conn.exec_driver_sql( - "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)", - [{"id":1, "value":"v1"}, {"id":2, "value":"v2"}] - ) - - Single dictionary:: - - conn.exec_driver_sql( - "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)", - dict(id=1, value="v1") - ) - - Single tuple:: - - conn.exec_driver_sql( - "INSERT INTO table (id, value) VALUES (?, ?)", - (1, 'v1') - ) - - .. note:: The :meth:`_engine.Connection.exec_driver_sql` method does - not participate in the - :meth:`_events.ConnectionEvents.before_execute` and - :meth:`_events.ConnectionEvents.after_execute` events. To - intercept calls to :meth:`_engine.Connection.exec_driver_sql`, use - :meth:`_events.ConnectionEvents.before_cursor_execute` and - :meth:`_events.ConnectionEvents.after_cursor_execute`. - - .. seealso:: - - :pep:`249` - - """ - - distilled_parameters = _distill_raw_params(parameters) - - execution_options = self._execution_options.merge_with( - execution_options - ) - - dialect = self.dialect - ret = self._execute_context( - dialect, - dialect.execution_ctx_cls._init_statement, - statement, - None, - execution_options, - statement, - distilled_parameters, - ) - - return ret - - def _execute_context( - self, - dialect: Dialect, - constructor: Callable[..., ExecutionContext], - statement: Union[str, Compiled], - parameters: Optional[_AnyMultiExecuteParams], - execution_options: _ExecuteOptions, - *args: Any, - **kw: Any, - ) -> CursorResult[Any]: - """Create an :class:`.ExecutionContext` and execute, returning - a :class:`_engine.CursorResult`.""" - - if execution_options: - yp = execution_options.get("yield_per", None) - if yp: - execution_options = execution_options.union( - {"stream_results": True, "max_row_buffer": yp} - ) - try: - conn = self._dbapi_connection - if conn is None: - conn = self._revalidate_connection() - - context = constructor( - dialect, self, conn, execution_options, *args, **kw - ) - except (exc.PendingRollbackError, exc.ResourceClosedError): - raise - except BaseException as e: - self._handle_dbapi_exception( - e, str(statement), parameters, None, None - ) - - if ( - self._transaction - and not self._transaction.is_active - or ( - self._nested_transaction - and not self._nested_transaction.is_active - ) - ): - self._invalid_transaction() - - elif self._trans_context_manager: - TransactionalContext._trans_ctx_check(self) - - if self._transaction is None: - self._autobegin() - - context.pre_exec() - - if context.execute_style is ExecuteStyle.INSERTMANYVALUES: - return self._exec_insertmany_context(dialect, context) - else: - return self._exec_single_context( - dialect, context, statement, parameters - ) - - def _exec_single_context( - self, - dialect: Dialect, - context: ExecutionContext, - statement: Union[str, Compiled], - parameters: Optional[_AnyMultiExecuteParams], - ) -> CursorResult[Any]: - """continue the _execute_context() method for a single DBAPI - cursor.execute() or cursor.executemany() call. - - """ - if dialect.bind_typing is BindTyping.SETINPUTSIZES: - generic_setinputsizes = context._prepare_set_input_sizes() - - if generic_setinputsizes: - try: - dialect.do_set_input_sizes( - context.cursor, generic_setinputsizes, context - ) - except BaseException as e: - self._handle_dbapi_exception( - e, str(statement), parameters, None, context - ) - - cursor, str_statement, parameters = ( - context.cursor, - context.statement, - context.parameters, - ) - - effective_parameters: Optional[_AnyExecuteParams] - - if not context.executemany: - effective_parameters = parameters[0] - else: - effective_parameters = parameters - - if self._has_events or self.engine._has_events: - for fn in self.dispatch.before_cursor_execute: - str_statement, effective_parameters = fn( - self, - cursor, - str_statement, - effective_parameters, - context, - context.executemany, - ) - - if self._echo: - self._log_info(str_statement) - - stats = context._get_cache_stats() - - if not self.engine.hide_parameters: - self._log_info( - "[%s] %r", - stats, - sql_util._repr_params( - effective_parameters, - batches=10, - ismulti=context.executemany, - ), - ) - else: - self._log_info( - "[%s] [SQL parameters hidden due to hide_parameters=True]", - stats, - ) - - evt_handled: bool = False - try: - if context.execute_style is ExecuteStyle.EXECUTEMANY: - effective_parameters = cast( - "_CoreMultiExecuteParams", effective_parameters - ) - if self.dialect._has_events: - for fn in self.dialect.dispatch.do_executemany: - if fn( - cursor, - str_statement, - effective_parameters, - context, - ): - evt_handled = True - break - if not evt_handled: - self.dialect.do_executemany( - cursor, - str_statement, - effective_parameters, - context, - ) - elif not effective_parameters and context.no_parameters: - if self.dialect._has_events: - for fn in self.dialect.dispatch.do_execute_no_params: - if fn(cursor, str_statement, context): - evt_handled = True - break - if not evt_handled: - self.dialect.do_execute_no_params( - cursor, str_statement, context - ) - else: - effective_parameters = cast( - "_CoreSingleExecuteParams", effective_parameters - ) - if self.dialect._has_events: - for fn in self.dialect.dispatch.do_execute: - if fn( - cursor, - str_statement, - effective_parameters, - context, - ): - evt_handled = True - break - if not evt_handled: - self.dialect.do_execute( - cursor, str_statement, effective_parameters, context - ) - - if self._has_events or self.engine._has_events: - self.dispatch.after_cursor_execute( - self, - cursor, - str_statement, - effective_parameters, - context, - context.executemany, - ) - - context.post_exec() - - result = context._setup_result_proxy() - - except BaseException as e: - self._handle_dbapi_exception( - e, str_statement, effective_parameters, cursor, context - ) - - return result - - def _exec_insertmany_context( - self, - dialect: Dialect, - context: ExecutionContext, - ) -> CursorResult[Any]: - """continue the _execute_context() method for an "insertmanyvalues" - operation, which will invoke DBAPI - cursor.execute() one or more times with individual log and - event hook calls. - - """ - - if dialect.bind_typing is BindTyping.SETINPUTSIZES: - generic_setinputsizes = context._prepare_set_input_sizes() - else: - generic_setinputsizes = None - - cursor, str_statement, parameters = ( - context.cursor, - context.statement, - context.parameters, - ) - - effective_parameters = parameters - - engine_events = self._has_events or self.engine._has_events - if self.dialect._has_events: - do_execute_dispatch: Iterable[Any] = ( - self.dialect.dispatch.do_execute - ) - else: - do_execute_dispatch = () - - if self._echo: - stats = context._get_cache_stats() + " (insertmanyvalues)" - - preserve_rowcount = context.execution_options.get( - "preserve_rowcount", False - ) - rowcount = 0 - - for imv_batch in dialect._deliver_insertmanyvalues_batches( - cursor, - str_statement, - effective_parameters, - generic_setinputsizes, - context, - ): - if imv_batch.processed_setinputsizes: - try: - dialect.do_set_input_sizes( - context.cursor, - imv_batch.processed_setinputsizes, - context, - ) - except BaseException as e: - self._handle_dbapi_exception( - e, - sql_util._long_statement(imv_batch.replaced_statement), - imv_batch.replaced_parameters, - None, - context, - ) - - sub_stmt = imv_batch.replaced_statement - sub_params = imv_batch.replaced_parameters - - if engine_events: - for fn in self.dispatch.before_cursor_execute: - sub_stmt, sub_params = fn( - self, - cursor, - sub_stmt, - sub_params, - context, - True, - ) - - if self._echo: - self._log_info(sql_util._long_statement(sub_stmt)) - - imv_stats = f""" {imv_batch.batchnum}/{ - imv_batch.total_batches - } ({ - 'ordered' - if imv_batch.rows_sorted else 'unordered' - }{ - '; batch not supported' - if imv_batch.is_downgraded - else '' - })""" - - if imv_batch.batchnum == 1: - stats += imv_stats - else: - stats = f"insertmanyvalues{imv_stats}" - - if not self.engine.hide_parameters: - self._log_info( - "[%s] %r", - stats, - sql_util._repr_params( - sub_params, - batches=10, - ismulti=False, - ), - ) - else: - self._log_info( - "[%s] [SQL parameters hidden due to " - "hide_parameters=True]", - stats, - ) - - try: - for fn in do_execute_dispatch: - if fn( - cursor, - sub_stmt, - sub_params, - context, - ): - break - else: - dialect.do_execute( - cursor, - sub_stmt, - sub_params, - context, - ) - - except BaseException as e: - self._handle_dbapi_exception( - e, - sql_util._long_statement(sub_stmt), - sub_params, - cursor, - context, - is_sub_exec=True, - ) - - if engine_events: - self.dispatch.after_cursor_execute( - self, - cursor, - str_statement, - effective_parameters, - context, - context.executemany, - ) - - if preserve_rowcount: - rowcount += imv_batch.current_batch_size - - try: - context.post_exec() - - if preserve_rowcount: - context._rowcount = rowcount # type: ignore[attr-defined] - - result = context._setup_result_proxy() - - except BaseException as e: - self._handle_dbapi_exception( - e, str_statement, effective_parameters, cursor, context - ) - - return result - - def _cursor_execute( - self, - cursor: DBAPICursor, - statement: str, - parameters: _DBAPISingleExecuteParams, - context: Optional[ExecutionContext] = None, - ) -> None: - """Execute a statement + params on the given cursor. - - Adds appropriate logging and exception handling. - - This method is used by DefaultDialect for special-case - executions, such as for sequences and column defaults. - The path of statement execution in the majority of cases - terminates at _execute_context(). - - """ - if self._has_events or self.engine._has_events: - for fn in self.dispatch.before_cursor_execute: - statement, parameters = fn( - self, cursor, statement, parameters, context, False - ) - - if self._echo: - self._log_info(statement) - self._log_info("[raw sql] %r", parameters) - try: - for fn in ( - () - if not self.dialect._has_events - else self.dialect.dispatch.do_execute - ): - if fn(cursor, statement, parameters, context): - break - else: - self.dialect.do_execute(cursor, statement, parameters, context) - except BaseException as e: - self._handle_dbapi_exception( - e, statement, parameters, cursor, context - ) - - if self._has_events or self.engine._has_events: - self.dispatch.after_cursor_execute( - self, cursor, statement, parameters, context, False - ) - - def _safe_close_cursor(self, cursor: DBAPICursor) -> None: - """Close the given cursor, catching exceptions - and turning into log warnings. - - """ - try: - cursor.close() - except Exception: - # log the error through the connection pool's logger. - self.engine.pool.logger.error( - "Error closing cursor", exc_info=True - ) - - _reentrant_error = False - _is_disconnect = False - - def _handle_dbapi_exception( - self, - e: BaseException, - statement: Optional[str], - parameters: Optional[_AnyExecuteParams], - cursor: Optional[DBAPICursor], - context: Optional[ExecutionContext], - is_sub_exec: bool = False, - ) -> NoReturn: - exc_info = sys.exc_info() - - is_exit_exception = util.is_exit_exception(e) - - if not self._is_disconnect: - self._is_disconnect = ( - isinstance(e, self.dialect.loaded_dbapi.Error) - and not self.closed - and self.dialect.is_disconnect( - e, - self._dbapi_connection if not self.invalidated else None, - cursor, - ) - ) or (is_exit_exception and not self.closed) - - invalidate_pool_on_disconnect = not is_exit_exception - - ismulti: bool = ( - not is_sub_exec and context.executemany - if context is not None - else False - ) - if self._reentrant_error: - raise exc.DBAPIError.instance( - statement, - parameters, - e, - self.dialect.loaded_dbapi.Error, - hide_parameters=self.engine.hide_parameters, - dialect=self.dialect, - ismulti=ismulti, - ).with_traceback(exc_info[2]) from e - self._reentrant_error = True - try: - # non-DBAPI error - if we already got a context, - # or there's no string statement, don't wrap it - should_wrap = isinstance(e, self.dialect.loaded_dbapi.Error) or ( - statement is not None - and context is None - and not is_exit_exception - ) - - if should_wrap: - sqlalchemy_exception = exc.DBAPIError.instance( - statement, - parameters, - cast(Exception, e), - self.dialect.loaded_dbapi.Error, - hide_parameters=self.engine.hide_parameters, - connection_invalidated=self._is_disconnect, - dialect=self.dialect, - ismulti=ismulti, - ) - else: - sqlalchemy_exception = None - - newraise = None - - if (self.dialect._has_events) and not self._execution_options.get( - "skip_user_error_events", False - ): - ctx = ExceptionContextImpl( - e, - sqlalchemy_exception, - self.engine, - self.dialect, - self, - cursor, - statement, - parameters, - context, - self._is_disconnect, - invalidate_pool_on_disconnect, - False, - ) - - for fn in self.dialect.dispatch.handle_error: - try: - # handler returns an exception; - # call next handler in a chain - per_fn = fn(ctx) - if per_fn is not None: - ctx.chained_exception = newraise = per_fn - except Exception as _raised: - # handler raises an exception - stop processing - newraise = _raised - break - - if self._is_disconnect != ctx.is_disconnect: - self._is_disconnect = ctx.is_disconnect - if sqlalchemy_exception: - sqlalchemy_exception.connection_invalidated = ( - ctx.is_disconnect - ) - - # set up potentially user-defined value for - # invalidate pool. - invalidate_pool_on_disconnect = ( - ctx.invalidate_pool_on_disconnect - ) - - if should_wrap and context: - context.handle_dbapi_exception(e) - - if not self._is_disconnect: - if cursor: - self._safe_close_cursor(cursor) - # "autorollback" was mostly relevant in 1.x series. - # It's very unlikely to reach here, as the connection - # does autobegin so when we are here, we are usually - # in an explicit / semi-explicit transaction. - # however we have a test which manufactures this - # scenario in any case using an event handler. - # test/engine/test_execute.py-> test_actual_autorollback - if not self.in_transaction(): - self._rollback_impl() - - if newraise: - raise newraise.with_traceback(exc_info[2]) from e - elif should_wrap: - assert sqlalchemy_exception is not None - raise sqlalchemy_exception.with_traceback(exc_info[2]) from e - else: - assert exc_info[1] is not None - raise exc_info[1].with_traceback(exc_info[2]) - finally: - del self._reentrant_error - if self._is_disconnect: - del self._is_disconnect - if not self.invalidated: - dbapi_conn_wrapper = self._dbapi_connection - assert dbapi_conn_wrapper is not None - if invalidate_pool_on_disconnect: - self.engine.pool._invalidate(dbapi_conn_wrapper, e) - self.invalidate(e) - - @classmethod - def _handle_dbapi_exception_noconnection( - cls, - e: BaseException, - dialect: Dialect, - engine: Optional[Engine] = None, - is_disconnect: Optional[bool] = None, - invalidate_pool_on_disconnect: bool = True, - is_pre_ping: bool = False, - ) -> NoReturn: - exc_info = sys.exc_info() - - if is_disconnect is None: - is_disconnect = isinstance( - e, dialect.loaded_dbapi.Error - ) and dialect.is_disconnect(e, None, None) - - should_wrap = isinstance(e, dialect.loaded_dbapi.Error) - - if should_wrap: - sqlalchemy_exception = exc.DBAPIError.instance( - None, - None, - cast(Exception, e), - dialect.loaded_dbapi.Error, - hide_parameters=( - engine.hide_parameters if engine is not None else False - ), - connection_invalidated=is_disconnect, - dialect=dialect, - ) - else: - sqlalchemy_exception = None - - newraise = None - - if dialect._has_events: - ctx = ExceptionContextImpl( - e, - sqlalchemy_exception, - engine, - dialect, - None, - None, - None, - None, - None, - is_disconnect, - invalidate_pool_on_disconnect, - is_pre_ping, - ) - for fn in dialect.dispatch.handle_error: - try: - # handler returns an exception; - # call next handler in a chain - per_fn = fn(ctx) - if per_fn is not None: - ctx.chained_exception = newraise = per_fn - except Exception as _raised: - # handler raises an exception - stop processing - newraise = _raised - break - - if sqlalchemy_exception and is_disconnect != ctx.is_disconnect: - sqlalchemy_exception.connection_invalidated = is_disconnect = ( - ctx.is_disconnect - ) - - if newraise: - raise newraise.with_traceback(exc_info[2]) from e - elif should_wrap: - assert sqlalchemy_exception is not None - raise sqlalchemy_exception.with_traceback(exc_info[2]) from e - else: - assert exc_info[1] is not None - raise exc_info[1].with_traceback(exc_info[2]) - - def _run_ddl_visitor( - self, - visitorcallable: Type[Union[SchemaGenerator, SchemaDropper]], - element: SchemaItem, - **kwargs: Any, - ) -> None: - """run a DDL visitor. - - This method is only here so that the MockConnection can change the - options given to the visitor so that "checkfirst" is skipped. - - """ - visitorcallable(self.dialect, self, **kwargs).traverse_single(element) - - -class ExceptionContextImpl(ExceptionContext): - """Implement the :class:`.ExceptionContext` interface.""" - - __slots__ = ( - "connection", - "engine", - "dialect", - "cursor", - "statement", - "parameters", - "original_exception", - "sqlalchemy_exception", - "chained_exception", - "execution_context", - "is_disconnect", - "invalidate_pool_on_disconnect", - "is_pre_ping", - ) - - def __init__( - self, - exception: BaseException, - sqlalchemy_exception: Optional[exc.StatementError], - engine: Optional[Engine], - dialect: Dialect, - connection: Optional[Connection], - cursor: Optional[DBAPICursor], - statement: Optional[str], - parameters: Optional[_DBAPIAnyExecuteParams], - context: Optional[ExecutionContext], - is_disconnect: bool, - invalidate_pool_on_disconnect: bool, - is_pre_ping: bool, - ): - self.engine = engine - self.dialect = dialect - self.connection = connection - self.sqlalchemy_exception = sqlalchemy_exception - self.original_exception = exception - self.execution_context = context - self.statement = statement - self.parameters = parameters - self.is_disconnect = is_disconnect - self.invalidate_pool_on_disconnect = invalidate_pool_on_disconnect - self.is_pre_ping = is_pre_ping - - -class Transaction(TransactionalContext): - """Represent a database transaction in progress. - - The :class:`.Transaction` object is procured by - calling the :meth:`_engine.Connection.begin` method of - :class:`_engine.Connection`:: - - from sqlalchemy import create_engine - engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") - connection = engine.connect() - trans = connection.begin() - connection.execute(text("insert into x (a, b) values (1, 2)")) - trans.commit() - - The object provides :meth:`.rollback` and :meth:`.commit` - methods in order to control transaction boundaries. It - also implements a context manager interface so that - the Python ``with`` statement can be used with the - :meth:`_engine.Connection.begin` method:: - - with connection.begin(): - connection.execute(text("insert into x (a, b) values (1, 2)")) - - The Transaction object is **not** threadsafe. - - .. seealso:: - - :meth:`_engine.Connection.begin` - - :meth:`_engine.Connection.begin_twophase` - - :meth:`_engine.Connection.begin_nested` - - .. index:: - single: thread safety; Transaction - """ # noqa - - __slots__ = () - - _is_root: bool = False - is_active: bool - connection: Connection - - def __init__(self, connection: Connection): - raise NotImplementedError() - - @property - def _deactivated_from_connection(self) -> bool: - """True if this transaction is totally deactivated from the connection - and therefore can no longer affect its state. - - """ - raise NotImplementedError() - - def _do_close(self) -> None: - raise NotImplementedError() - - def _do_rollback(self) -> None: - raise NotImplementedError() - - def _do_commit(self) -> None: - raise NotImplementedError() - - @property - def is_valid(self) -> bool: - return self.is_active and not self.connection.invalidated - - def close(self) -> None: - """Close this :class:`.Transaction`. - - If this transaction is the base transaction in a begin/commit - nesting, the transaction will rollback(). Otherwise, the - method returns. - - This is used to cancel a Transaction without affecting the scope of - an enclosing transaction. - - """ - try: - self._do_close() - finally: - assert not self.is_active - - def rollback(self) -> None: - """Roll back this :class:`.Transaction`. - - The implementation of this may vary based on the type of transaction in - use: - - * For a simple database transaction (e.g. :class:`.RootTransaction`), - it corresponds to a ROLLBACK. - - * For a :class:`.NestedTransaction`, it corresponds to a - "ROLLBACK TO SAVEPOINT" operation. - - * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two - phase transactions may be used. - - - """ - try: - self._do_rollback() - finally: - assert not self.is_active - - def commit(self) -> None: - """Commit this :class:`.Transaction`. - - The implementation of this may vary based on the type of transaction in - use: - - * For a simple database transaction (e.g. :class:`.RootTransaction`), - it corresponds to a COMMIT. - - * For a :class:`.NestedTransaction`, it corresponds to a - "RELEASE SAVEPOINT" operation. - - * For a :class:`.TwoPhaseTransaction`, DBAPI-specific methods for two - phase transactions may be used. - - """ - try: - self._do_commit() - finally: - assert not self.is_active - - def _get_subject(self) -> Connection: - return self.connection - - def _transaction_is_active(self) -> bool: - return self.is_active - - def _transaction_is_closed(self) -> bool: - return not self._deactivated_from_connection - - def _rollback_can_be_called(self) -> bool: - # for RootTransaction / NestedTransaction, it's safe to call - # rollback() even if the transaction is deactive and no warnings - # will be emitted. tested in - # test_transaction.py -> test_no_rollback_in_deactive(?:_savepoint)? - return True - - -class RootTransaction(Transaction): - """Represent the "root" transaction on a :class:`_engine.Connection`. - - This corresponds to the current "BEGIN/COMMIT/ROLLBACK" that's occurring - for the :class:`_engine.Connection`. The :class:`_engine.RootTransaction` - is created by calling upon the :meth:`_engine.Connection.begin` method, and - remains associated with the :class:`_engine.Connection` throughout its - active span. The current :class:`_engine.RootTransaction` in use is - accessible via the :attr:`_engine.Connection.get_transaction` method of - :class:`_engine.Connection`. - - In :term:`2.0 style` use, the :class:`_engine.Connection` also employs - "autobegin" behavior that will create a new - :class:`_engine.RootTransaction` whenever a connection in a - non-transactional state is used to emit commands on the DBAPI connection. - The scope of the :class:`_engine.RootTransaction` in 2.0 style - use can be controlled using the :meth:`_engine.Connection.commit` and - :meth:`_engine.Connection.rollback` methods. - - - """ - - _is_root = True - - __slots__ = ("connection", "is_active") - - def __init__(self, connection: Connection): - assert connection._transaction is None - if connection._trans_context_manager: - TransactionalContext._trans_ctx_check(connection) - self.connection = connection - self._connection_begin_impl() - connection._transaction = self - - self.is_active = True - - def _deactivate_from_connection(self) -> None: - if self.is_active: - assert self.connection._transaction is self - self.is_active = False - - elif self.connection._transaction is not self: - util.warn("transaction already deassociated from connection") - - @property - def _deactivated_from_connection(self) -> bool: - return self.connection._transaction is not self - - def _connection_begin_impl(self) -> None: - self.connection._begin_impl(self) - - def _connection_rollback_impl(self) -> None: - self.connection._rollback_impl() - - def _connection_commit_impl(self) -> None: - self.connection._commit_impl() - - def _close_impl(self, try_deactivate: bool = False) -> None: - try: - if self.is_active: - self._connection_rollback_impl() - - if self.connection._nested_transaction: - self.connection._nested_transaction._cancel() - finally: - if self.is_active or try_deactivate: - self._deactivate_from_connection() - if self.connection._transaction is self: - self.connection._transaction = None - - assert not self.is_active - assert self.connection._transaction is not self - - def _do_close(self) -> None: - self._close_impl() - - def _do_rollback(self) -> None: - self._close_impl(try_deactivate=True) - - def _do_commit(self) -> None: - if self.is_active: - assert self.connection._transaction is self - - try: - self._connection_commit_impl() - finally: - # whether or not commit succeeds, cancel any - # nested transactions, make this transaction "inactive" - # and remove it as a reset agent - if self.connection._nested_transaction: - self.connection._nested_transaction._cancel() - - self._deactivate_from_connection() - - # ...however only remove as the connection's current transaction - # if commit succeeded. otherwise it stays on so that a rollback - # needs to occur. - self.connection._transaction = None - else: - if self.connection._transaction is self: - self.connection._invalid_transaction() - else: - raise exc.InvalidRequestError("This transaction is inactive") - - assert not self.is_active - assert self.connection._transaction is not self - - -class NestedTransaction(Transaction): - """Represent a 'nested', or SAVEPOINT transaction. - - The :class:`.NestedTransaction` object is created by calling the - :meth:`_engine.Connection.begin_nested` method of - :class:`_engine.Connection`. - - When using :class:`.NestedTransaction`, the semantics of "begin" / - "commit" / "rollback" are as follows: - - * the "begin" operation corresponds to the "BEGIN SAVEPOINT" command, where - the savepoint is given an explicit name that is part of the state - of this object. - - * The :meth:`.NestedTransaction.commit` method corresponds to a - "RELEASE SAVEPOINT" operation, using the savepoint identifier associated - with this :class:`.NestedTransaction`. - - * The :meth:`.NestedTransaction.rollback` method corresponds to a - "ROLLBACK TO SAVEPOINT" operation, using the savepoint identifier - associated with this :class:`.NestedTransaction`. - - The rationale for mimicking the semantics of an outer transaction in - terms of savepoints so that code may deal with a "savepoint" transaction - and an "outer" transaction in an agnostic way. - - .. seealso:: - - :ref:`session_begin_nested` - ORM version of the SAVEPOINT API. - - """ - - __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested") - - _savepoint: str - - def __init__(self, connection: Connection): - assert connection._transaction is not None - if connection._trans_context_manager: - TransactionalContext._trans_ctx_check(connection) - self.connection = connection - self._savepoint = self.connection._savepoint_impl() - self.is_active = True - self._previous_nested = connection._nested_transaction - connection._nested_transaction = self - - def _deactivate_from_connection(self, warn: bool = True) -> None: - if self.connection._nested_transaction is self: - self.connection._nested_transaction = self._previous_nested - elif warn: - util.warn( - "nested transaction already deassociated from connection" - ) - - @property - def _deactivated_from_connection(self) -> bool: - return self.connection._nested_transaction is not self - - def _cancel(self) -> None: - # called by RootTransaction when the outer transaction is - # committed, rolled back, or closed to cancel all savepoints - # without any action being taken - self.is_active = False - self._deactivate_from_connection() - if self._previous_nested: - self._previous_nested._cancel() - - def _close_impl( - self, deactivate_from_connection: bool, warn_already_deactive: bool - ) -> None: - try: - if ( - self.is_active - and self.connection._transaction - and self.connection._transaction.is_active - ): - self.connection._rollback_to_savepoint_impl(self._savepoint) - finally: - self.is_active = False - - if deactivate_from_connection: - self._deactivate_from_connection(warn=warn_already_deactive) - - assert not self.is_active - if deactivate_from_connection: - assert self.connection._nested_transaction is not self - - def _do_close(self) -> None: - self._close_impl(True, False) - - def _do_rollback(self) -> None: - self._close_impl(True, True) - - def _do_commit(self) -> None: - if self.is_active: - try: - self.connection._release_savepoint_impl(self._savepoint) - finally: - # nested trans becomes inactive on failed release - # unconditionally. this prevents it from trying to - # emit SQL when it rolls back. - self.is_active = False - - # but only de-associate from connection if it succeeded - self._deactivate_from_connection() - else: - if self.connection._nested_transaction is self: - self.connection._invalid_transaction() - else: - raise exc.InvalidRequestError( - "This nested transaction is inactive" - ) - - -class TwoPhaseTransaction(RootTransaction): - """Represent a two-phase transaction. - - A new :class:`.TwoPhaseTransaction` object may be procured - using the :meth:`_engine.Connection.begin_twophase` method. - - The interface is the same as that of :class:`.Transaction` - with the addition of the :meth:`prepare` method. - - """ - - __slots__ = ("xid", "_is_prepared") - - xid: Any - - def __init__(self, connection: Connection, xid: Any): - self._is_prepared = False - self.xid = xid - super().__init__(connection) - - def prepare(self) -> None: - """Prepare this :class:`.TwoPhaseTransaction`. - - After a PREPARE, the transaction can be committed. - - """ - if not self.is_active: - raise exc.InvalidRequestError("This transaction is inactive") - self.connection._prepare_twophase_impl(self.xid) - self._is_prepared = True - - def _connection_begin_impl(self) -> None: - self.connection._begin_twophase_impl(self) - - def _connection_rollback_impl(self) -> None: - self.connection._rollback_twophase_impl(self.xid, self._is_prepared) - - def _connection_commit_impl(self) -> None: - self.connection._commit_twophase_impl(self.xid, self._is_prepared) - - -class Engine( - ConnectionEventsTarget, log.Identified, inspection.Inspectable["Inspector"] -): - """ - Connects a :class:`~sqlalchemy.pool.Pool` and - :class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a - source of database connectivity and behavior. - - An :class:`_engine.Engine` object is instantiated publicly using the - :func:`~sqlalchemy.create_engine` function. - - .. seealso:: - - :doc:`/core/engines` - - :ref:`connections_toplevel` - - """ - - dispatch: dispatcher[ConnectionEventsTarget] - - _compiled_cache: Optional[CompiledCacheType] - - _execution_options: _ExecuteOptions = _EMPTY_EXECUTION_OPTS - _has_events: bool = False - _connection_cls: Type[Connection] = Connection - _sqla_logger_namespace: str = "sqlalchemy.engine.Engine" - _is_future: bool = False - - _schema_translate_map: Optional[SchemaTranslateMapType] = None - _option_cls: Type[OptionEngine] - - dialect: Dialect - pool: Pool - url: URL - hide_parameters: bool - - def __init__( - self, - pool: Pool, - dialect: Dialect, - url: URL, - logging_name: Optional[str] = None, - echo: Optional[_EchoFlagType] = None, - query_cache_size: int = 500, - execution_options: Optional[Mapping[str, Any]] = None, - hide_parameters: bool = False, - ): - self.pool = pool - self.url = url - self.dialect = dialect - if logging_name: - self.logging_name = logging_name - self.echo = echo - self.hide_parameters = hide_parameters - if query_cache_size != 0: - self._compiled_cache = util.LRUCache( - query_cache_size, size_alert=self._lru_size_alert - ) - else: - self._compiled_cache = None - log.instance_logger(self, echoflag=echo) - if execution_options: - self.update_execution_options(**execution_options) - - def _lru_size_alert(self, cache: util.LRUCache[Any, Any]) -> None: - if self._should_log_info(): - self.logger.info( - "Compiled cache size pruning from %d items to %d. " - "Increase cache size to reduce the frequency of pruning.", - len(cache), - cache.capacity, - ) - - @property - def engine(self) -> Engine: - """Returns this :class:`.Engine`. - - Used for legacy schemes that accept :class:`.Connection` / - :class:`.Engine` objects within the same variable. - - """ - return self - - def clear_compiled_cache(self) -> None: - """Clear the compiled cache associated with the dialect. - - This applies **only** to the built-in cache that is established - via the :paramref:`_engine.create_engine.query_cache_size` parameter. - It will not impact any dictionary caches that were passed via the - :paramref:`.Connection.execution_options.compiled_cache` parameter. - - .. versionadded:: 1.4 - - """ - if self._compiled_cache: - self._compiled_cache.clear() - - def update_execution_options(self, **opt: Any) -> None: - r"""Update the default execution_options dictionary - of this :class:`_engine.Engine`. - - The given keys/values in \**opt are added to the - default execution options that will be used for - all connections. The initial contents of this dictionary - can be sent via the ``execution_options`` parameter - to :func:`_sa.create_engine`. - - .. seealso:: - - :meth:`_engine.Connection.execution_options` - - :meth:`_engine.Engine.execution_options` - - """ - self.dispatch.set_engine_execution_options(self, opt) - self._execution_options = self._execution_options.union(opt) - self.dialect.set_engine_execution_options(self, opt) - - @overload - def execution_options( - self, - *, - compiled_cache: Optional[CompiledCacheType] = ..., - logging_token: str = ..., - isolation_level: IsolationLevel = ..., - insertmanyvalues_page_size: int = ..., - schema_translate_map: Optional[SchemaTranslateMapType] = ..., - **opt: Any, - ) -> OptionEngine: ... - - @overload - def execution_options(self, **opt: Any) -> OptionEngine: ... - - def execution_options(self, **opt: Any) -> OptionEngine: - """Return a new :class:`_engine.Engine` that will provide - :class:`_engine.Connection` objects with the given execution options. - - The returned :class:`_engine.Engine` remains related to the original - :class:`_engine.Engine` in that it shares the same connection pool and - other state: - - * The :class:`_pool.Pool` used by the new :class:`_engine.Engine` - is the - same instance. The :meth:`_engine.Engine.dispose` - method will replace - the connection pool instance for the parent engine as well - as this one. - * Event listeners are "cascaded" - meaning, the new - :class:`_engine.Engine` - inherits the events of the parent, and new events can be associated - with the new :class:`_engine.Engine` individually. - * The logging configuration and logging_name is copied from the parent - :class:`_engine.Engine`. - - The intent of the :meth:`_engine.Engine.execution_options` method is - to implement schemes where multiple :class:`_engine.Engine` - objects refer to the same connection pool, but are differentiated - by options that affect some execution-level behavior for each - engine. One such example is breaking into separate "reader" and - "writer" :class:`_engine.Engine` instances, where one - :class:`_engine.Engine` - has a lower :term:`isolation level` setting configured or is even - transaction-disabled using "autocommit". An example of this - configuration is at :ref:`dbapi_autocommit_multiple`. - - Another example is one that - uses a custom option ``shard_id`` which is consumed by an event - to change the current schema on a database connection:: - - from sqlalchemy import event - from sqlalchemy.engine import Engine - - primary_engine = create_engine("mysql+mysqldb://") - shard1 = primary_engine.execution_options(shard_id="shard1") - shard2 = primary_engine.execution_options(shard_id="shard2") - - shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"} - - @event.listens_for(Engine, "before_cursor_execute") - def _switch_shard(conn, cursor, stmt, - params, context, executemany): - shard_id = conn.get_execution_options().get('shard_id', "default") - current_shard = conn.info.get("current_shard", None) - - if current_shard != shard_id: - cursor.execute("use %s" % shards[shard_id]) - conn.info["current_shard"] = shard_id - - The above recipe illustrates two :class:`_engine.Engine` objects that - will each serve as factories for :class:`_engine.Connection` objects - that have pre-established "shard_id" execution options present. A - :meth:`_events.ConnectionEvents.before_cursor_execute` event handler - then interprets this execution option to emit a MySQL ``use`` statement - to switch databases before a statement execution, while at the same - time keeping track of which database we've established using the - :attr:`_engine.Connection.info` dictionary. - - .. seealso:: - - :meth:`_engine.Connection.execution_options` - - update execution options - on a :class:`_engine.Connection` object. - - :meth:`_engine.Engine.update_execution_options` - - update the execution - options for a given :class:`_engine.Engine` in place. - - :meth:`_engine.Engine.get_execution_options` - - - """ # noqa: E501 - return self._option_cls(self, opt) - - def get_execution_options(self) -> _ExecuteOptions: - """Get the non-SQL options which will take effect during execution. - - .. versionadded: 1.3 - - .. seealso:: - - :meth:`_engine.Engine.execution_options` - """ - return self._execution_options - - @property - def name(self) -> str: - """String name of the :class:`~sqlalchemy.engine.interfaces.Dialect` - in use by this :class:`Engine`. - - """ - - return self.dialect.name - - @property - def driver(self) -> str: - """Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect` - in use by this :class:`Engine`. - - """ - - return self.dialect.driver - - echo = log.echo_property() - - def __repr__(self) -> str: - return "Engine(%r)" % (self.url,) - - def dispose(self, close: bool = True) -> None: - """Dispose of the connection pool used by this - :class:`_engine.Engine`. - - A new connection pool is created immediately after the old one has been - disposed. The previous connection pool is disposed either actively, by - closing out all currently checked-in connections in that pool, or - passively, by losing references to it but otherwise not closing any - connections. The latter strategy is more appropriate for an initializer - in a forked Python process. - - :param close: if left at its default of ``True``, has the - effect of fully closing all **currently checked in** - database connections. Connections that are still checked out - will **not** be closed, however they will no longer be associated - with this :class:`_engine.Engine`, - so when they are closed individually, eventually the - :class:`_pool.Pool` which they are associated with will - be garbage collected and they will be closed out fully, if - not already closed on checkin. - - If set to ``False``, the previous connection pool is de-referenced, - and otherwise not touched in any way. - - .. versionadded:: 1.4.33 Added the :paramref:`.Engine.dispose.close` - parameter to allow the replacement of a connection pool in a child - process without interfering with the connections used by the parent - process. - - - .. seealso:: - - :ref:`engine_disposal` - - :ref:`pooling_multiprocessing` - - """ - if close: - self.pool.dispose() - self.pool = self.pool.recreate() - self.dispatch.engine_disposed(self) - - @contextlib.contextmanager - def _optional_conn_ctx_manager( - self, connection: Optional[Connection] = None - ) -> Iterator[Connection]: - if connection is None: - with self.connect() as conn: - yield conn - else: - yield connection - - @contextlib.contextmanager - def begin(self) -> Iterator[Connection]: - """Return a context manager delivering a :class:`_engine.Connection` - with a :class:`.Transaction` established. - - E.g.:: - - with engine.begin() as conn: - conn.execute( - text("insert into table (x, y, z) values (1, 2, 3)") - ) - conn.execute(text("my_special_procedure(5)")) - - Upon successful operation, the :class:`.Transaction` - is committed. If an error is raised, the :class:`.Transaction` - is rolled back. - - .. seealso:: - - :meth:`_engine.Engine.connect` - procure a - :class:`_engine.Connection` from - an :class:`_engine.Engine`. - - :meth:`_engine.Connection.begin` - start a :class:`.Transaction` - for a particular :class:`_engine.Connection`. - - """ - with self.connect() as conn: - with conn.begin(): - yield conn - - def _run_ddl_visitor( - self, - visitorcallable: Type[Union[SchemaGenerator, SchemaDropper]], - element: SchemaItem, - **kwargs: Any, - ) -> None: - with self.begin() as conn: - conn._run_ddl_visitor(visitorcallable, element, **kwargs) - - def connect(self) -> Connection: - """Return a new :class:`_engine.Connection` object. - - The :class:`_engine.Connection` acts as a Python context manager, so - the typical use of this method looks like:: - - with engine.connect() as connection: - connection.execute(text("insert into table values ('foo')")) - connection.commit() - - Where above, after the block is completed, the connection is "closed" - and its underlying DBAPI resources are returned to the connection pool. - This also has the effect of rolling back any transaction that - was explicitly begun or was begun via autobegin, and will - emit the :meth:`_events.ConnectionEvents.rollback` event if one was - started and is still in progress. - - .. seealso:: - - :meth:`_engine.Engine.begin` - - """ - - return self._connection_cls(self) - - def raw_connection(self) -> PoolProxiedConnection: - """Return a "raw" DBAPI connection from the connection pool. - - The returned object is a proxied version of the DBAPI - connection object used by the underlying driver in use. - The object will have all the same behavior as the real DBAPI - connection, except that its ``close()`` method will result in the - connection being returned to the pool, rather than being closed - for real. - - This method provides direct DBAPI connection access for - special situations when the API provided by - :class:`_engine.Connection` - is not needed. When a :class:`_engine.Connection` object is already - present, the DBAPI connection is available using - the :attr:`_engine.Connection.connection` accessor. - - .. seealso:: - - :ref:`dbapi_connections` - - """ - return self.pool.connect() - - -class OptionEngineMixin(log.Identified): - _sa_propagate_class_events = False - - dispatch: dispatcher[ConnectionEventsTarget] - _compiled_cache: Optional[CompiledCacheType] - dialect: Dialect - pool: Pool - url: URL - hide_parameters: bool - echo: log.echo_property - - def __init__( - self, proxied: Engine, execution_options: CoreExecuteOptionsParameter - ): - self._proxied = proxied - self.url = proxied.url - self.dialect = proxied.dialect - self.logging_name = proxied.logging_name - self.echo = proxied.echo - self._compiled_cache = proxied._compiled_cache - self.hide_parameters = proxied.hide_parameters - log.instance_logger(self, echoflag=self.echo) - - # note: this will propagate events that are assigned to the parent - # engine after this OptionEngine is created. Since we share - # the events of the parent we also disallow class-level events - # to apply to the OptionEngine class directly. - # - # the other way this can work would be to transfer existing - # events only, using: - # self.dispatch._update(proxied.dispatch) - # - # that might be more appropriate however it would be a behavioral - # change for logic that assigns events to the parent engine and - # would like it to take effect for the already-created sub-engine. - self.dispatch = self.dispatch._join(proxied.dispatch) - - self._execution_options = proxied._execution_options - self.update_execution_options(**execution_options) - - def update_execution_options(self, **opt: Any) -> None: - raise NotImplementedError() - - if not typing.TYPE_CHECKING: - # https://github.com/python/typing/discussions/1095 - - @property - def pool(self) -> Pool: - return self._proxied.pool - - @pool.setter - def pool(self, pool: Pool) -> None: - self._proxied.pool = pool - - @property - def _has_events(self) -> bool: - return self._proxied._has_events or self.__dict__.get( - "_has_events", False - ) - - @_has_events.setter - def _has_events(self, value: bool) -> None: - self.__dict__["_has_events"] = value - - -class OptionEngine(OptionEngineMixin, Engine): - def update_execution_options(self, **opt: Any) -> None: - Engine.update_execution_options(self, **opt) - - -Engine._option_cls = OptionEngine |