summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py2343
1 files changed, 2343 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py b/venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py
new file mode 100644
index 0000000..90cafe4
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py
@@ -0,0 +1,2343 @@
+# engine/default.py
+# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: allow-untyped-defs, allow-untyped-calls
+
+"""Default implementations of per-dialect sqlalchemy.engine classes.
+
+These are semi-private implementation classes which are only of importance
+to database dialect authors; dialects will usually use the classes here
+as the base class for their own corresponding classes.
+
+"""
+
+from __future__ import annotations
+
+import functools
+import operator
+import random
+import re
+from time import perf_counter
+import typing
+from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Dict
+from typing import List
+from typing import Mapping
+from typing import MutableMapping
+from typing import MutableSequence
+from typing import Optional
+from typing import Sequence
+from typing import Set
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import Union
+import weakref
+
+from . import characteristics
+from . import cursor as _cursor
+from . import interfaces
+from .base import Connection
+from .interfaces import CacheStats
+from .interfaces import DBAPICursor
+from .interfaces import Dialect
+from .interfaces import ExecuteStyle
+from .interfaces import ExecutionContext
+from .reflection import ObjectKind
+from .reflection import ObjectScope
+from .. import event
+from .. import exc
+from .. import pool
+from .. import util
+from ..sql import compiler
+from ..sql import dml
+from ..sql import expression
+from ..sql import type_api
+from ..sql._typing import is_tuple_type
+from ..sql.base import _NoArg
+from ..sql.compiler import DDLCompiler
+from ..sql.compiler import InsertmanyvaluesSentinelOpts
+from ..sql.compiler import SQLCompiler
+from ..sql.elements import quoted_name
+from ..util.typing import Final
+from ..util.typing import Literal
+
+if typing.TYPE_CHECKING:
+ from types import ModuleType
+
+ from .base import Engine
+ from .cursor import ResultFetchStrategy
+ from .interfaces import _CoreMultiExecuteParams
+ from .interfaces import _CoreSingleExecuteParams
+ from .interfaces import _DBAPICursorDescription
+ from .interfaces import _DBAPIMultiExecuteParams
+ from .interfaces import _ExecuteOptions
+ from .interfaces import _MutableCoreSingleExecuteParams
+ from .interfaces import _ParamStyle
+ from .interfaces import DBAPIConnection
+ from .interfaces import IsolationLevel
+ from .row import Row
+ from .url import URL
+ from ..event import _ListenerFnType
+ from ..pool import Pool
+ from ..pool import PoolProxiedConnection
+ from ..sql import Executable
+ from ..sql.compiler import Compiled
+ from ..sql.compiler import Linting
+ from ..sql.compiler import ResultColumnsEntry
+ from ..sql.dml import DMLState
+ from ..sql.dml import UpdateBase
+ from ..sql.elements import BindParameter
+ from ..sql.schema import Column
+ from ..sql.type_api import _BindProcessorType
+ from ..sql.type_api import _ResultProcessorType
+ from ..sql.type_api import TypeEngine
+
+# When we're handed literal SQL, ensure it's a SELECT query
+SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)
+
+
+(
+ CACHE_HIT,
+ CACHE_MISS,
+ CACHING_DISABLED,
+ NO_CACHE_KEY,
+ NO_DIALECT_SUPPORT,
+) = list(CacheStats)
+
+
+class DefaultDialect(Dialect):
+ """Default implementation of Dialect"""
+
+ statement_compiler = compiler.SQLCompiler
+ ddl_compiler = compiler.DDLCompiler
+ type_compiler_cls = compiler.GenericTypeCompiler
+
+ preparer = compiler.IdentifierPreparer
+ supports_alter = True
+ supports_comments = False
+ supports_constraint_comments = False
+ inline_comments = False
+ supports_statement_cache = True
+
+ div_is_floordiv = True
+
+ bind_typing = interfaces.BindTyping.NONE
+
+ include_set_input_sizes: Optional[Set[Any]] = None
+ exclude_set_input_sizes: Optional[Set[Any]] = None
+
+ # the first value we'd get for an autoincrement column.
+ default_sequence_base = 1
+
+ # most DBAPIs happy with this for execute().
+ # not cx_oracle.
+ execute_sequence_format = tuple
+
+ supports_schemas = True
+ supports_views = True
+ supports_sequences = False
+ sequences_optional = False
+ preexecute_autoincrement_sequences = False
+ supports_identity_columns = False
+ postfetch_lastrowid = True
+ favor_returning_over_lastrowid = False
+ insert_null_pk_still_autoincrements = False
+ update_returning = False
+ delete_returning = False
+ update_returning_multifrom = False
+ delete_returning_multifrom = False
+ insert_returning = False
+
+ cte_follows_insert = False
+
+ supports_native_enum = False
+ supports_native_boolean = False
+ supports_native_uuid = False
+ returns_native_bytes = False
+
+ non_native_boolean_check_constraint = True
+
+ supports_simple_order_by_label = True
+
+ tuple_in_values = False
+
+ connection_characteristics = util.immutabledict(
+ {"isolation_level": characteristics.IsolationLevelCharacteristic()}
+ )
+
+ engine_config_types: Mapping[str, Any] = util.immutabledict(
+ {
+ "pool_timeout": util.asint,
+ "echo": util.bool_or_str("debug"),
+ "echo_pool": util.bool_or_str("debug"),
+ "pool_recycle": util.asint,
+ "pool_size": util.asint,
+ "max_overflow": util.asint,
+ "future": util.asbool,
+ }
+ )
+
+ # if the NUMERIC type
+ # returns decimal.Decimal.
+ # *not* the FLOAT type however.
+ supports_native_decimal = False
+
+ name = "default"
+
+ # length at which to truncate
+ # any identifier.
+ max_identifier_length = 9999
+ _user_defined_max_identifier_length: Optional[int] = None
+
+ isolation_level: Optional[str] = None
+
+ # sub-categories of max_identifier_length.
+ # currently these accommodate for MySQL which allows alias names
+ # of 255 but DDL names only of 64.
+ max_index_name_length: Optional[int] = None
+ max_constraint_name_length: Optional[int] = None
+
+ supports_sane_rowcount = True
+ supports_sane_multi_rowcount = True
+ colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
+ default_paramstyle = "named"
+
+ supports_default_values = False
+ """dialect supports INSERT... DEFAULT VALUES syntax"""
+
+ supports_default_metavalue = False
+ """dialect supports INSERT... VALUES (DEFAULT) syntax"""
+
+ default_metavalue_token = "DEFAULT"
+ """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
+ parenthesis."""
+
+ # not sure if this is a real thing but the compiler will deliver it
+ # if this is the only flag enabled.
+ supports_empty_insert = True
+ """dialect supports INSERT () VALUES ()"""
+
+ supports_multivalues_insert = False
+
+ use_insertmanyvalues: bool = False
+
+ use_insertmanyvalues_wo_returning: bool = False
+
+ insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
+ InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
+ )
+
+ insertmanyvalues_page_size: int = 1000
+ insertmanyvalues_max_parameters = 32700
+
+ supports_is_distinct_from = True
+
+ supports_server_side_cursors = False
+
+ server_side_cursors = False
+
+ # extra record-level locking features (#4860)
+ supports_for_update_of = False
+
+ server_version_info = None
+
+ default_schema_name: Optional[str] = None
+
+ # indicates symbol names are
+ # UPPERCASEd if they are case insensitive
+ # within the database.
+ # if this is True, the methods normalize_name()
+ # and denormalize_name() must be provided.
+ requires_name_normalize = False
+
+ is_async = False
+
+ has_terminate = False
+
+ # TODO: this is not to be part of 2.0. implement rudimentary binary
+ # literals for SQLite, PostgreSQL, MySQL only within
+ # _Binary.literal_processor
+ _legacy_binary_type_literal_encoding = "utf-8"
+
+ @util.deprecated_params(
+ empty_in_strategy=(
+ "1.4",
+ "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
+ "deprecated, and no longer has any effect. All IN expressions "
+ "are now rendered using "
+ 'the "expanding parameter" strategy which renders a set of bound'
+ 'expressions, or an "empty set" SELECT, at statement execution'
+ "time.",
+ ),
+ server_side_cursors=(
+ "1.4",
+ "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
+ "is deprecated and will be removed in a future release. Please "
+ "use the "
+ ":paramref:`_engine.Connection.execution_options.stream_results` "
+ "parameter.",
+ ),
+ )
+ def __init__(
+ self,
+ paramstyle: Optional[_ParamStyle] = None,
+ isolation_level: Optional[IsolationLevel] = None,
+ dbapi: Optional[ModuleType] = None,
+ implicit_returning: Literal[True] = True,
+ supports_native_boolean: Optional[bool] = None,
+ max_identifier_length: Optional[int] = None,
+ label_length: Optional[int] = None,
+ insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
+ use_insertmanyvalues: Optional[bool] = None,
+ # util.deprecated_params decorator cannot render the
+ # Linting.NO_LINTING constant
+ compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore
+ server_side_cursors: bool = False,
+ **kwargs: Any,
+ ):
+ if server_side_cursors:
+ if not self.supports_server_side_cursors:
+ raise exc.ArgumentError(
+ "Dialect %s does not support server side cursors" % self
+ )
+ else:
+ self.server_side_cursors = True
+
+ if getattr(self, "use_setinputsizes", False):
+ util.warn_deprecated(
+ "The dialect-level use_setinputsizes attribute is "
+ "deprecated. Please use "
+ "bind_typing = BindTyping.SETINPUTSIZES",
+ "2.0",
+ )
+ self.bind_typing = interfaces.BindTyping.SETINPUTSIZES
+
+ self.positional = False
+ self._ischema = None
+
+ self.dbapi = dbapi
+
+ if paramstyle is not None:
+ self.paramstyle = paramstyle
+ elif self.dbapi is not None:
+ self.paramstyle = self.dbapi.paramstyle
+ else:
+ self.paramstyle = self.default_paramstyle
+ self.positional = self.paramstyle in (
+ "qmark",
+ "format",
+ "numeric",
+ "numeric_dollar",
+ )
+ self.identifier_preparer = self.preparer(self)
+ self._on_connect_isolation_level = isolation_level
+
+ legacy_tt_callable = getattr(self, "type_compiler", None)
+ if legacy_tt_callable is not None:
+ tt_callable = cast(
+ Type[compiler.GenericTypeCompiler],
+ self.type_compiler,
+ )
+ else:
+ tt_callable = self.type_compiler_cls
+
+ self.type_compiler_instance = self.type_compiler = tt_callable(self)
+
+ if supports_native_boolean is not None:
+ self.supports_native_boolean = supports_native_boolean
+
+ self._user_defined_max_identifier_length = max_identifier_length
+ if self._user_defined_max_identifier_length:
+ self.max_identifier_length = (
+ self._user_defined_max_identifier_length
+ )
+ self.label_length = label_length
+ self.compiler_linting = compiler_linting
+
+ if use_insertmanyvalues is not None:
+ self.use_insertmanyvalues = use_insertmanyvalues
+
+ if insertmanyvalues_page_size is not _NoArg.NO_ARG:
+ self.insertmanyvalues_page_size = insertmanyvalues_page_size
+
+ @property
+ @util.deprecated(
+ "2.0",
+ "full_returning is deprecated, please use insert_returning, "
+ "update_returning, delete_returning",
+ )
+ def full_returning(self):
+ return (
+ self.insert_returning
+ and self.update_returning
+ and self.delete_returning
+ )
+
+ @util.memoized_property
+ def insert_executemany_returning(self):
+ """Default implementation for insert_executemany_returning, if not
+ otherwise overridden by the specific dialect.
+
+ The default dialect determines "insert_executemany_returning" is
+ available if the dialect in use has opted into using the
+ "use_insertmanyvalues" feature. If they haven't opted into that, then
+ this attribute is False, unless the dialect in question overrides this
+ and provides some other implementation (such as the Oracle dialect).
+
+ """
+ return self.insert_returning and self.use_insertmanyvalues
+
+ @util.memoized_property
+ def insert_executemany_returning_sort_by_parameter_order(self):
+ """Default implementation for
+ insert_executemany_returning_deterministic_order, if not otherwise
+ overridden by the specific dialect.
+
+ The default dialect determines "insert_executemany_returning" can have
+ deterministic order only if the dialect in use has opted into using the
+ "use_insertmanyvalues" feature, which implements deterministic ordering
+ using client side sentinel columns only by default. The
+ "insertmanyvalues" feature also features alternate forms that can
+ use server-generated PK values as "sentinels", but those are only
+ used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
+ bitflag enables those alternate SQL forms, which are disabled
+ by default.
+
+ If the dialect in use hasn't opted into that, then this attribute is
+ False, unless the dialect in question overrides this and provides some
+ other implementation (such as the Oracle dialect).
+
+ """
+ return self.insert_returning and self.use_insertmanyvalues
+
+ update_executemany_returning = False
+ delete_executemany_returning = False
+
+ @util.memoized_property
+ def loaded_dbapi(self) -> ModuleType:
+ if self.dbapi is None:
+ raise exc.InvalidRequestError(
+ f"Dialect {self} does not have a Python DBAPI established "
+ "and cannot be used for actual database interaction"
+ )
+ return self.dbapi
+
+ @util.memoized_property
+ def _bind_typing_render_casts(self):
+ return self.bind_typing is interfaces.BindTyping.RENDER_CASTS
+
+ def _ensure_has_table_connection(self, arg):
+ if not isinstance(arg, Connection):
+ raise exc.ArgumentError(
+ "The argument passed to Dialect.has_table() should be a "
+ "%s, got %s. "
+ "Additionally, the Dialect.has_table() method is for "
+ "internal dialect "
+ "use only; please use "
+ "``inspect(some_engine).has_table(<tablename>>)`` "
+ "for public API use." % (Connection, type(arg))
+ )
+
+ @util.memoized_property
+ def _supports_statement_cache(self):
+ ssc = self.__class__.__dict__.get("supports_statement_cache", None)
+ if ssc is None:
+ util.warn(
+ "Dialect %s:%s will not make use of SQL compilation caching "
+ "as it does not set the 'supports_statement_cache' attribute "
+ "to ``True``. This can have "
+ "significant performance implications including some "
+ "performance degradations in comparison to prior SQLAlchemy "
+ "versions. Dialect maintainers should seek to set this "
+ "attribute to True after appropriate development and testing "
+ "for SQLAlchemy 1.4 caching support. Alternatively, this "
+ "attribute may be set to False which will disable this "
+ "warning." % (self.name, self.driver),
+ code="cprf",
+ )
+
+ return bool(ssc)
+
+ @util.memoized_property
+ def _type_memos(self):
+ return weakref.WeakKeyDictionary()
+
+ @property
+ def dialect_description(self):
+ return self.name + "+" + self.driver
+
+ @property
+ def supports_sane_rowcount_returning(self):
+ """True if this dialect supports sane rowcount even if RETURNING is
+ in use.
+
+ For dialects that don't support RETURNING, this is synonymous with
+ ``supports_sane_rowcount``.
+
+ """
+ return self.supports_sane_rowcount
+
+ @classmethod
+ def get_pool_class(cls, url: URL) -> Type[Pool]:
+ return getattr(cls, "poolclass", pool.QueuePool)
+
+ def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
+ return self.get_pool_class(url)
+
+ @classmethod
+ def load_provisioning(cls):
+ package = ".".join(cls.__module__.split(".")[0:-1])
+ try:
+ __import__(package + ".provision")
+ except ImportError:
+ pass
+
+ def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
+ if self._on_connect_isolation_level is not None:
+
+ def builtin_connect(dbapi_conn, conn_rec):
+ self._assert_and_set_isolation_level(
+ dbapi_conn, self._on_connect_isolation_level
+ )
+
+ return builtin_connect
+ else:
+ return None
+
+ def initialize(self, connection):
+ try:
+ self.server_version_info = self._get_server_version_info(
+ connection
+ )
+ except NotImplementedError:
+ self.server_version_info = None
+ try:
+ self.default_schema_name = self._get_default_schema_name(
+ connection
+ )
+ except NotImplementedError:
+ self.default_schema_name = None
+
+ try:
+ self.default_isolation_level = self.get_default_isolation_level(
+ connection.connection.dbapi_connection
+ )
+ except NotImplementedError:
+ self.default_isolation_level = None
+
+ if not self._user_defined_max_identifier_length:
+ max_ident_length = self._check_max_identifier_length(connection)
+ if max_ident_length:
+ self.max_identifier_length = max_ident_length
+
+ if (
+ self.label_length
+ and self.label_length > self.max_identifier_length
+ ):
+ raise exc.ArgumentError(
+ "Label length of %d is greater than this dialect's"
+ " maximum identifier length of %d"
+ % (self.label_length, self.max_identifier_length)
+ )
+
+ def on_connect(self):
+ # inherits the docstring from interfaces.Dialect.on_connect
+ return None
+
+ def _check_max_identifier_length(self, connection):
+ """Perform a connection / server version specific check to determine
+ the max_identifier_length.
+
+ If the dialect's class level max_identifier_length should be used,
+ can return None.
+
+ .. versionadded:: 1.3.9
+
+ """
+ return None
+
+ def get_default_isolation_level(self, dbapi_conn):
+ """Given a DBAPI connection, return its isolation level, or
+ a default isolation level if one cannot be retrieved.
+
+ May be overridden by subclasses in order to provide a
+ "fallback" isolation level for databases that cannot reliably
+ retrieve the actual isolation level.
+
+ By default, calls the :meth:`_engine.Interfaces.get_isolation_level`
+ method, propagating any exceptions raised.
+
+ .. versionadded:: 1.3.22
+
+ """
+ return self.get_isolation_level(dbapi_conn)
+
+ def type_descriptor(self, typeobj):
+ """Provide a database-specific :class:`.TypeEngine` object, given
+ the generic object which comes from the types module.
+
+ This method looks for a dictionary called
+ ``colspecs`` as a class or instance-level variable,
+ and passes on to :func:`_types.adapt_type`.
+
+ """
+ return type_api.adapt_type(typeobj, self.colspecs)
+
+ def has_index(self, connection, table_name, index_name, schema=None, **kw):
+ if not self.has_table(connection, table_name, schema=schema, **kw):
+ return False
+ for idx in self.get_indexes(
+ connection, table_name, schema=schema, **kw
+ ):
+ if idx["name"] == index_name:
+ return True
+ else:
+ return False
+
+ def has_schema(
+ self, connection: Connection, schema_name: str, **kw: Any
+ ) -> bool:
+ return schema_name in self.get_schema_names(connection, **kw)
+
+ def validate_identifier(self, ident):
+ if len(ident) > self.max_identifier_length:
+ raise exc.IdentifierError(
+ "Identifier '%s' exceeds maximum length of %d characters"
+ % (ident, self.max_identifier_length)
+ )
+
+ def connect(self, *cargs, **cparams):
+ # inherits the docstring from interfaces.Dialect.connect
+ return self.loaded_dbapi.connect(*cargs, **cparams)
+
+ def create_connect_args(self, url):
+ # inherits the docstring from interfaces.Dialect.create_connect_args
+ opts = url.translate_connect_args()
+ opts.update(url.query)
+ return ([], opts)
+
+ def set_engine_execution_options(
+ self, engine: Engine, opts: Mapping[str, Any]
+ ) -> None:
+ supported_names = set(self.connection_characteristics).intersection(
+ opts
+ )
+ if supported_names:
+ characteristics: Mapping[str, Any] = util.immutabledict(
+ (name, opts[name]) for name in supported_names
+ )
+
+ @event.listens_for(engine, "engine_connect")
+ def set_connection_characteristics(connection):
+ self._set_connection_characteristics(
+ connection, characteristics
+ )
+
+ def set_connection_execution_options(
+ self, connection: Connection, opts: Mapping[str, Any]
+ ) -> None:
+ supported_names = set(self.connection_characteristics).intersection(
+ opts
+ )
+ if supported_names:
+ characteristics: Mapping[str, Any] = util.immutabledict(
+ (name, opts[name]) for name in supported_names
+ )
+ self._set_connection_characteristics(connection, characteristics)
+
+ def _set_connection_characteristics(self, connection, characteristics):
+ characteristic_values = [
+ (name, self.connection_characteristics[name], value)
+ for name, value in characteristics.items()
+ ]
+
+ if connection.in_transaction():
+ trans_objs = [
+ (name, obj)
+ for name, obj, value in characteristic_values
+ if obj.transactional
+ ]
+ if trans_objs:
+ raise exc.InvalidRequestError(
+ "This connection has already initialized a SQLAlchemy "
+ "Transaction() object via begin() or autobegin; "
+ "%s may not be altered unless rollback() or commit() "
+ "is called first."
+ % (", ".join(name for name, obj in trans_objs))
+ )
+
+ dbapi_connection = connection.connection.dbapi_connection
+ for name, characteristic, value in characteristic_values:
+ characteristic.set_characteristic(self, dbapi_connection, value)
+ connection.connection._connection_record.finalize_callback.append(
+ functools.partial(self._reset_characteristics, characteristics)
+ )
+
+ def _reset_characteristics(self, characteristics, dbapi_connection):
+ for characteristic_name in characteristics:
+ characteristic = self.connection_characteristics[
+ characteristic_name
+ ]
+ characteristic.reset_characteristic(self, dbapi_connection)
+
+ def do_begin(self, dbapi_connection):
+ pass
+
+ def do_rollback(self, dbapi_connection):
+ dbapi_connection.rollback()
+
+ def do_commit(self, dbapi_connection):
+ dbapi_connection.commit()
+
+ def do_terminate(self, dbapi_connection):
+ self.do_close(dbapi_connection)
+
+ def do_close(self, dbapi_connection):
+ dbapi_connection.close()
+
+ @util.memoized_property
+ def _dialect_specific_select_one(self):
+ return str(expression.select(1).compile(dialect=self))
+
+ def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
+ try:
+ return self.do_ping(dbapi_connection)
+ except self.loaded_dbapi.Error as err:
+ is_disconnect = self.is_disconnect(err, dbapi_connection, None)
+
+ if self._has_events:
+ try:
+ Connection._handle_dbapi_exception_noconnection(
+ err,
+ self,
+ is_disconnect=is_disconnect,
+ invalidate_pool_on_disconnect=False,
+ is_pre_ping=True,
+ )
+ except exc.StatementError as new_err:
+ is_disconnect = new_err.connection_invalidated
+
+ if is_disconnect:
+ return False
+ else:
+ raise
+
+ def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
+ cursor = None
+
+ cursor = dbapi_connection.cursor()
+ try:
+ cursor.execute(self._dialect_specific_select_one)
+ finally:
+ cursor.close()
+ return True
+
+ def create_xid(self):
+ """Create a random two-phase transaction ID.
+
+ This id will be passed to do_begin_twophase(), do_rollback_twophase(),
+ do_commit_twophase(). Its format is unspecified.
+ """
+
+ return "_sa_%032x" % random.randint(0, 2**128)
+
+ def do_savepoint(self, connection, name):
+ connection.execute(expression.SavepointClause(name))
+
+ def do_rollback_to_savepoint(self, connection, name):
+ connection.execute(expression.RollbackToSavepointClause(name))
+
+ def do_release_savepoint(self, connection, name):
+ connection.execute(expression.ReleaseSavepointClause(name))
+
+ def _deliver_insertmanyvalues_batches(
+ self, cursor, statement, parameters, generic_setinputsizes, context
+ ):
+ context = cast(DefaultExecutionContext, context)
+ compiled = cast(SQLCompiler, context.compiled)
+
+ _composite_sentinel_proc: Sequence[
+ Optional[_ResultProcessorType[Any]]
+ ] = ()
+ _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
+ _sentinel_proc_initialized: bool = False
+
+ compiled_parameters = context.compiled_parameters
+
+ imv = compiled._insertmanyvalues
+ assert imv is not None
+
+ is_returning: Final[bool] = bool(compiled.effective_returning)
+ batch_size = context.execution_options.get(
+ "insertmanyvalues_page_size", self.insertmanyvalues_page_size
+ )
+
+ if compiled.schema_translate_map:
+ schema_translate_map = context.execution_options.get(
+ "schema_translate_map", {}
+ )
+ else:
+ schema_translate_map = None
+
+ if is_returning:
+ result: Optional[List[Any]] = []
+ context._insertmanyvalues_rows = result
+
+ sort_by_parameter_order = imv.sort_by_parameter_order
+
+ else:
+ sort_by_parameter_order = False
+ result = None
+
+ for imv_batch in compiled._deliver_insertmanyvalues_batches(
+ statement,
+ parameters,
+ compiled_parameters,
+ generic_setinputsizes,
+ batch_size,
+ sort_by_parameter_order,
+ schema_translate_map,
+ ):
+ yield imv_batch
+
+ if is_returning:
+
+ rows = context.fetchall_for_returning(cursor)
+
+ # I would have thought "is_returning: Final[bool]"
+ # would have assured this but pylance thinks not
+ assert result is not None
+
+ if imv.num_sentinel_columns and not imv_batch.is_downgraded:
+ composite_sentinel = imv.num_sentinel_columns > 1
+ if imv.implicit_sentinel:
+ # for implicit sentinel, which is currently single-col
+ # integer autoincrement, do a simple sort.
+ assert not composite_sentinel
+ result.extend(
+ sorted(rows, key=operator.itemgetter(-1))
+ )
+ continue
+
+ # otherwise, create dictionaries to match up batches
+ # with parameters
+ assert imv.sentinel_param_keys
+ assert imv.sentinel_columns
+
+ _nsc = imv.num_sentinel_columns
+
+ if not _sentinel_proc_initialized:
+ if composite_sentinel:
+ _composite_sentinel_proc = [
+ col.type._cached_result_processor(
+ self, cursor_desc[1]
+ )
+ for col, cursor_desc in zip(
+ imv.sentinel_columns,
+ cursor.description[-_nsc:],
+ )
+ ]
+ else:
+ _scalar_sentinel_proc = (
+ imv.sentinel_columns[0]
+ ).type._cached_result_processor(
+ self, cursor.description[-1][1]
+ )
+ _sentinel_proc_initialized = True
+
+ rows_by_sentinel: Union[
+ Dict[Tuple[Any, ...], Any],
+ Dict[Any, Any],
+ ]
+ if composite_sentinel:
+ rows_by_sentinel = {
+ tuple(
+ (proc(val) if proc else val)
+ for val, proc in zip(
+ row[-_nsc:], _composite_sentinel_proc
+ )
+ ): row
+ for row in rows
+ }
+ elif _scalar_sentinel_proc:
+ rows_by_sentinel = {
+ _scalar_sentinel_proc(row[-1]): row for row in rows
+ }
+ else:
+ rows_by_sentinel = {row[-1]: row for row in rows}
+
+ if len(rows_by_sentinel) != len(imv_batch.batch):
+ # see test_insert_exec.py::
+ # IMVSentinelTest::test_sentinel_incorrect_rowcount
+ # for coverage / demonstration
+ raise exc.InvalidRequestError(
+ f"Sentinel-keyed result set did not produce "
+ f"correct number of rows {len(imv_batch.batch)}; "
+ "produced "
+ f"{len(rows_by_sentinel)}. Please ensure the "
+ "sentinel column is fully unique and populated in "
+ "all cases."
+ )
+
+ try:
+ ordered_rows = [
+ rows_by_sentinel[sentinel_keys]
+ for sentinel_keys in imv_batch.sentinel_values
+ ]
+ except KeyError as ke:
+ # see test_insert_exec.py::
+ # IMVSentinelTest::test_sentinel_cant_match_keys
+ # for coverage / demonstration
+ raise exc.InvalidRequestError(
+ f"Can't match sentinel values in result set to "
+ f"parameter sets; key {ke.args[0]!r} was not "
+ "found. "
+ "There may be a mismatch between the datatype "
+ "passed to the DBAPI driver vs. that which it "
+ "returns in a result row. Ensure the given "
+ "Python value matches the expected result type "
+ "*exactly*, taking care to not rely upon implicit "
+ "conversions which may occur such as when using "
+ "strings in place of UUID or integer values, etc. "
+ ) from ke
+
+ result.extend(ordered_rows)
+
+ else:
+ result.extend(rows)
+
+ def do_executemany(self, cursor, statement, parameters, context=None):
+ cursor.executemany(statement, parameters)
+
+ def do_execute(self, cursor, statement, parameters, context=None):
+ cursor.execute(statement, parameters)
+
+ def do_execute_no_params(self, cursor, statement, context=None):
+ cursor.execute(statement)
+
+ def is_disconnect(self, e, connection, cursor):
+ return False
+
+ @util.memoized_instancemethod
+ def _gen_allowed_isolation_levels(self, dbapi_conn):
+ try:
+ raw_levels = list(self.get_isolation_level_values(dbapi_conn))
+ except NotImplementedError:
+ return None
+ else:
+ normalized_levels = [
+ level.replace("_", " ").upper() for level in raw_levels
+ ]
+ if raw_levels != normalized_levels:
+ raise ValueError(
+ f"Dialect {self.name!r} get_isolation_level_values() "
+ f"method should return names as UPPERCASE using spaces, "
+ f"not underscores; got "
+ f"{sorted(set(raw_levels).difference(normalized_levels))}"
+ )
+ return tuple(normalized_levels)
+
+ def _assert_and_set_isolation_level(self, dbapi_conn, level):
+ level = level.replace("_", " ").upper()
+
+ _allowed_isolation_levels = self._gen_allowed_isolation_levels(
+ dbapi_conn
+ )
+ if (
+ _allowed_isolation_levels
+ and level not in _allowed_isolation_levels
+ ):
+ raise exc.ArgumentError(
+ f"Invalid value {level!r} for isolation_level. "
+ f"Valid isolation levels for {self.name!r} are "
+ f"{', '.join(_allowed_isolation_levels)}"
+ )
+
+ self.set_isolation_level(dbapi_conn, level)
+
+ def reset_isolation_level(self, dbapi_conn):
+ if self._on_connect_isolation_level is not None:
+ assert (
+ self._on_connect_isolation_level == "AUTOCOMMIT"
+ or self._on_connect_isolation_level
+ == self.default_isolation_level
+ )
+ self._assert_and_set_isolation_level(
+ dbapi_conn, self._on_connect_isolation_level
+ )
+ else:
+ assert self.default_isolation_level is not None
+ self._assert_and_set_isolation_level(
+ dbapi_conn,
+ self.default_isolation_level,
+ )
+
+ def normalize_name(self, name):
+ if name is None:
+ return None
+
+ name_lower = name.lower()
+ name_upper = name.upper()
+
+ if name_upper == name_lower:
+ # name has no upper/lower conversion, e.g. non-european characters.
+ # return unchanged
+ return name
+ elif name_upper == name and not (
+ self.identifier_preparer._requires_quotes
+ )(name_lower):
+ # name is all uppercase and doesn't require quoting; normalize
+ # to all lower case
+ return name_lower
+ elif name_lower == name:
+ # name is all lower case, which if denormalized means we need to
+ # force quoting on it
+ return quoted_name(name, quote=True)
+ else:
+ # name is mixed case, means it will be quoted in SQL when used
+ # later, no normalizes
+ return name
+
+ def denormalize_name(self, name):
+ if name is None:
+ return None
+
+ name_lower = name.lower()
+ name_upper = name.upper()
+
+ if name_upper == name_lower:
+ # name has no upper/lower conversion, e.g. non-european characters.
+ # return unchanged
+ return name
+ elif name_lower == name and not (
+ self.identifier_preparer._requires_quotes
+ )(name_lower):
+ name = name_upper
+ return name
+
+ def get_driver_connection(self, connection):
+ return connection
+
+ def _overrides_default(self, method):
+ return (
+ getattr(type(self), method).__code__
+ is not getattr(DefaultDialect, method).__code__
+ )
+
+ def _default_multi_reflect(
+ self,
+ single_tbl_method,
+ connection,
+ kind,
+ schema,
+ filter_names,
+ scope,
+ **kw,
+ ):
+ names_fns = []
+ temp_names_fns = []
+ if ObjectKind.TABLE in kind:
+ names_fns.append(self.get_table_names)
+ temp_names_fns.append(self.get_temp_table_names)
+ if ObjectKind.VIEW in kind:
+ names_fns.append(self.get_view_names)
+ temp_names_fns.append(self.get_temp_view_names)
+ if ObjectKind.MATERIALIZED_VIEW in kind:
+ names_fns.append(self.get_materialized_view_names)
+ # no temp materialized view at the moment
+ # temp_names_fns.append(self.get_temp_materialized_view_names)
+
+ unreflectable = kw.pop("unreflectable", {})
+
+ if (
+ filter_names
+ and scope is ObjectScope.ANY
+ and kind is ObjectKind.ANY
+ ):
+ # if names are given and no qualification on type of table
+ # (i.e. the Table(..., autoload) case), take the names as given,
+ # don't run names queries. If a table does not exit
+ # NoSuchTableError is raised and it's skipped
+
+ # this also suits the case for mssql where we can reflect
+ # individual temp tables but there's no temp_names_fn
+ names = filter_names
+ else:
+ names = []
+ name_kw = {"schema": schema, **kw}
+ fns = []
+ if ObjectScope.DEFAULT in scope:
+ fns.extend(names_fns)
+ if ObjectScope.TEMPORARY in scope:
+ fns.extend(temp_names_fns)
+
+ for fn in fns:
+ try:
+ names.extend(fn(connection, **name_kw))
+ except NotImplementedError:
+ pass
+
+ if filter_names:
+ filter_names = set(filter_names)
+
+ # iterate over all the tables/views and call the single table method
+ for table in names:
+ if not filter_names or table in filter_names:
+ key = (schema, table)
+ try:
+ yield (
+ key,
+ single_tbl_method(
+ connection, table, schema=schema, **kw
+ ),
+ )
+ except exc.UnreflectableTableError as err:
+ if key not in unreflectable:
+ unreflectable[key] = err
+ except exc.NoSuchTableError:
+ pass
+
+ def get_multi_table_options(self, connection, **kw):
+ return self._default_multi_reflect(
+ self.get_table_options, connection, **kw
+ )
+
+ def get_multi_columns(self, connection, **kw):
+ return self._default_multi_reflect(self.get_columns, connection, **kw)
+
+ def get_multi_pk_constraint(self, connection, **kw):
+ return self._default_multi_reflect(
+ self.get_pk_constraint, connection, **kw
+ )
+
+ def get_multi_foreign_keys(self, connection, **kw):
+ return self._default_multi_reflect(
+ self.get_foreign_keys, connection, **kw
+ )
+
+ def get_multi_indexes(self, connection, **kw):
+ return self._default_multi_reflect(self.get_indexes, connection, **kw)
+
+ def get_multi_unique_constraints(self, connection, **kw):
+ return self._default_multi_reflect(
+ self.get_unique_constraints, connection, **kw
+ )
+
+ def get_multi_check_constraints(self, connection, **kw):
+ return self._default_multi_reflect(
+ self.get_check_constraints, connection, **kw
+ )
+
+ def get_multi_table_comment(self, connection, **kw):
+ return self._default_multi_reflect(
+ self.get_table_comment, connection, **kw
+ )
+
+
+class StrCompileDialect(DefaultDialect):
+ statement_compiler = compiler.StrSQLCompiler
+ ddl_compiler = compiler.DDLCompiler
+ type_compiler_cls = compiler.StrSQLTypeCompiler
+ preparer = compiler.IdentifierPreparer
+
+ insert_returning = True
+ update_returning = True
+ delete_returning = True
+
+ supports_statement_cache = True
+
+ supports_identity_columns = True
+
+ supports_sequences = True
+ sequences_optional = True
+ preexecute_autoincrement_sequences = False
+
+ supports_native_boolean = True
+
+ supports_multivalues_insert = True
+ supports_simple_order_by_label = True
+
+
+class DefaultExecutionContext(ExecutionContext):
+ isinsert = False
+ isupdate = False
+ isdelete = False
+ is_crud = False
+ is_text = False
+ isddl = False
+
+ execute_style: ExecuteStyle = ExecuteStyle.EXECUTE
+
+ compiled: Optional[Compiled] = None
+ result_column_struct: Optional[
+ Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
+ ] = None
+ returned_default_rows: Optional[Sequence[Row[Any]]] = None
+
+ execution_options: _ExecuteOptions = util.EMPTY_DICT
+
+ cursor_fetch_strategy = _cursor._DEFAULT_FETCH
+
+ invoked_statement: Optional[Executable] = None
+
+ _is_implicit_returning = False
+ _is_explicit_returning = False
+ _is_supplemental_returning = False
+ _is_server_side = False
+
+ _soft_closed = False
+
+ _rowcount: Optional[int] = None
+
+ # a hook for SQLite's translation of
+ # result column names
+ # NOTE: pyhive is using this hook, can't remove it :(
+ _translate_colname: Optional[Callable[[str], str]] = None
+
+ _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
+ """used by set_input_sizes().
+
+ This collection comes from ``ExpandedState.parameter_expansion``.
+
+ """
+
+ cache_hit = NO_CACHE_KEY
+
+ root_connection: Connection
+ _dbapi_connection: PoolProxiedConnection
+ dialect: Dialect
+ unicode_statement: str
+ cursor: DBAPICursor
+ compiled_parameters: List[_MutableCoreSingleExecuteParams]
+ parameters: _DBAPIMultiExecuteParams
+ extracted_parameters: Optional[Sequence[BindParameter[Any]]]
+
+ _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)
+
+ _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
+ _num_sentinel_cols: int = 0
+
+ @classmethod
+ def _init_ddl(
+ cls,
+ dialect: Dialect,
+ connection: Connection,
+ dbapi_connection: PoolProxiedConnection,
+ execution_options: _ExecuteOptions,
+ compiled_ddl: DDLCompiler,
+ ) -> ExecutionContext:
+ """Initialize execution context for an ExecutableDDLElement
+ construct."""
+
+ self = cls.__new__(cls)
+ self.root_connection = connection
+ self._dbapi_connection = dbapi_connection
+ self.dialect = connection.dialect
+
+ self.compiled = compiled = compiled_ddl
+ self.isddl = True
+
+ self.execution_options = execution_options
+
+ self.unicode_statement = str(compiled)
+ if compiled.schema_translate_map:
+ schema_translate_map = self.execution_options.get(
+ "schema_translate_map", {}
+ )
+
+ rst = compiled.preparer._render_schema_translates
+ self.unicode_statement = rst(
+ self.unicode_statement, schema_translate_map
+ )
+
+ self.statement = self.unicode_statement
+
+ self.cursor = self.create_cursor()
+ self.compiled_parameters = []
+
+ if dialect.positional:
+ self.parameters = [dialect.execute_sequence_format()]
+ else:
+ self.parameters = [self._empty_dict_params]
+
+ return self
+
+ @classmethod
+ def _init_compiled(
+ cls,
+ dialect: Dialect,
+ connection: Connection,
+ dbapi_connection: PoolProxiedConnection,
+ execution_options: _ExecuteOptions,
+ compiled: SQLCompiler,
+ parameters: _CoreMultiExecuteParams,
+ invoked_statement: Executable,
+ extracted_parameters: Optional[Sequence[BindParameter[Any]]],
+ cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
+ ) -> ExecutionContext:
+ """Initialize execution context for a Compiled construct."""
+
+ self = cls.__new__(cls)
+ self.root_connection = connection
+ self._dbapi_connection = dbapi_connection
+ self.dialect = connection.dialect
+ self.extracted_parameters = extracted_parameters
+ self.invoked_statement = invoked_statement
+ self.compiled = compiled
+ self.cache_hit = cache_hit
+
+ self.execution_options = execution_options
+
+ self.result_column_struct = (
+ compiled._result_columns,
+ compiled._ordered_columns,
+ compiled._textual_ordered_columns,
+ compiled._ad_hoc_textual,
+ compiled._loose_column_name_matching,
+ )
+
+ self.isinsert = ii = compiled.isinsert
+ self.isupdate = iu = compiled.isupdate
+ self.isdelete = id_ = compiled.isdelete
+ self.is_text = compiled.isplaintext
+
+ if ii or iu or id_:
+ dml_statement = compiled.compile_state.statement # type: ignore
+ if TYPE_CHECKING:
+ assert isinstance(dml_statement, UpdateBase)
+ self.is_crud = True
+ self._is_explicit_returning = ier = bool(dml_statement._returning)
+ self._is_implicit_returning = iir = bool(
+ compiled.implicit_returning
+ )
+ if iir and dml_statement._supplemental_returning:
+ self._is_supplemental_returning = True
+
+ # dont mix implicit and explicit returning
+ assert not (iir and ier)
+
+ if (ier or iir) and compiled.for_executemany:
+ if ii and not self.dialect.insert_executemany_returning:
+ raise exc.InvalidRequestError(
+ f"Dialect {self.dialect.dialect_description} with "
+ f"current server capabilities does not support "
+ "INSERT..RETURNING when executemany is used"
+ )
+ elif (
+ ii
+ and dml_statement._sort_by_parameter_order
+ and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501
+ ):
+ raise exc.InvalidRequestError(
+ f"Dialect {self.dialect.dialect_description} with "
+ f"current server capabilities does not support "
+ "INSERT..RETURNING with deterministic row ordering "
+ "when executemany is used"
+ )
+ elif (
+ ii
+ and self.dialect.use_insertmanyvalues
+ and not compiled._insertmanyvalues
+ ):
+ raise exc.InvalidRequestError(
+ 'Statement does not have "insertmanyvalues" '
+ "enabled, can't use INSERT..RETURNING with "
+ "executemany in this case."
+ )
+ elif iu and not self.dialect.update_executemany_returning:
+ raise exc.InvalidRequestError(
+ f"Dialect {self.dialect.dialect_description} with "
+ f"current server capabilities does not support "
+ "UPDATE..RETURNING when executemany is used"
+ )
+ elif id_ and not self.dialect.delete_executemany_returning:
+ raise exc.InvalidRequestError(
+ f"Dialect {self.dialect.dialect_description} with "
+ f"current server capabilities does not support "
+ "DELETE..RETURNING when executemany is used"
+ )
+
+ if not parameters:
+ self.compiled_parameters = [
+ compiled.construct_params(
+ extracted_parameters=extracted_parameters,
+ escape_names=False,
+ )
+ ]
+ else:
+ self.compiled_parameters = [
+ compiled.construct_params(
+ m,
+ escape_names=False,
+ _group_number=grp,
+ extracted_parameters=extracted_parameters,
+ )
+ for grp, m in enumerate(parameters)
+ ]
+
+ if len(parameters) > 1:
+ if self.isinsert and compiled._insertmanyvalues:
+ self.execute_style = ExecuteStyle.INSERTMANYVALUES
+
+ imv = compiled._insertmanyvalues
+ if imv.sentinel_columns is not None:
+ self._num_sentinel_cols = imv.num_sentinel_columns
+ else:
+ self.execute_style = ExecuteStyle.EXECUTEMANY
+
+ self.unicode_statement = compiled.string
+
+ self.cursor = self.create_cursor()
+
+ if self.compiled.insert_prefetch or self.compiled.update_prefetch:
+ self._process_execute_defaults()
+
+ processors = compiled._bind_processors
+
+ flattened_processors: Mapping[
+ str, _BindProcessorType[Any]
+ ] = processors # type: ignore[assignment]
+
+ if compiled.literal_execute_params or compiled.post_compile_params:
+ if self.executemany:
+ raise exc.InvalidRequestError(
+ "'literal_execute' or 'expanding' parameters can't be "
+ "used with executemany()"
+ )
+
+ expanded_state = compiled._process_parameters_for_postcompile(
+ self.compiled_parameters[0]
+ )
+
+ # re-assign self.unicode_statement
+ self.unicode_statement = expanded_state.statement
+
+ self._expanded_parameters = expanded_state.parameter_expansion
+
+ flattened_processors = dict(processors) # type: ignore
+ flattened_processors.update(expanded_state.processors)
+ positiontup = expanded_state.positiontup
+ elif compiled.positional:
+ positiontup = self.compiled.positiontup
+ else:
+ positiontup = None
+
+ if compiled.schema_translate_map:
+ schema_translate_map = self.execution_options.get(
+ "schema_translate_map", {}
+ )
+ rst = compiled.preparer._render_schema_translates
+ self.unicode_statement = rst(
+ self.unicode_statement, schema_translate_map
+ )
+
+ # final self.unicode_statement is now assigned, encode if needed
+ # by dialect
+ self.statement = self.unicode_statement
+
+ # Convert the dictionary of bind parameter values
+ # into a dict or list to be sent to the DBAPI's
+ # execute() or executemany() method.
+
+ if compiled.positional:
+ core_positional_parameters: MutableSequence[Sequence[Any]] = []
+ assert positiontup is not None
+ for compiled_params in self.compiled_parameters:
+ l_param: List[Any] = [
+ (
+ flattened_processors[key](compiled_params[key])
+ if key in flattened_processors
+ else compiled_params[key]
+ )
+ for key in positiontup
+ ]
+ core_positional_parameters.append(
+ dialect.execute_sequence_format(l_param)
+ )
+
+ self.parameters = core_positional_parameters
+ else:
+ core_dict_parameters: MutableSequence[Dict[str, Any]] = []
+ escaped_names = compiled.escaped_bind_names
+
+ # note that currently, "expanded" parameters will be present
+ # in self.compiled_parameters in their quoted form. This is
+ # slightly inconsistent with the approach taken as of
+ # #8056 where self.compiled_parameters is meant to contain unquoted
+ # param names.
+ d_param: Dict[str, Any]
+ for compiled_params in self.compiled_parameters:
+ if escaped_names:
+ d_param = {
+ escaped_names.get(key, key): (
+ flattened_processors[key](compiled_params[key])
+ if key in flattened_processors
+ else compiled_params[key]
+ )
+ for key in compiled_params
+ }
+ else:
+ d_param = {
+ key: (
+ flattened_processors[key](compiled_params[key])
+ if key in flattened_processors
+ else compiled_params[key]
+ )
+ for key in compiled_params
+ }
+
+ core_dict_parameters.append(d_param)
+
+ self.parameters = core_dict_parameters
+
+ return self
+
+ @classmethod
+ def _init_statement(
+ cls,
+ dialect: Dialect,
+ connection: Connection,
+ dbapi_connection: PoolProxiedConnection,
+ execution_options: _ExecuteOptions,
+ statement: str,
+ parameters: _DBAPIMultiExecuteParams,
+ ) -> ExecutionContext:
+ """Initialize execution context for a string SQL statement."""
+
+ self = cls.__new__(cls)
+ self.root_connection = connection
+ self._dbapi_connection = dbapi_connection
+ self.dialect = connection.dialect
+ self.is_text = True
+
+ self.execution_options = execution_options
+
+ if not parameters:
+ if self.dialect.positional:
+ self.parameters = [dialect.execute_sequence_format()]
+ else:
+ self.parameters = [self._empty_dict_params]
+ elif isinstance(parameters[0], dialect.execute_sequence_format):
+ self.parameters = parameters
+ elif isinstance(parameters[0], dict):
+ self.parameters = parameters
+ else:
+ self.parameters = [
+ dialect.execute_sequence_format(p) for p in parameters
+ ]
+
+ if len(parameters) > 1:
+ self.execute_style = ExecuteStyle.EXECUTEMANY
+
+ self.statement = self.unicode_statement = statement
+
+ self.cursor = self.create_cursor()
+ return self
+
+ @classmethod
+ def _init_default(
+ cls,
+ dialect: Dialect,
+ connection: Connection,
+ dbapi_connection: PoolProxiedConnection,
+ execution_options: _ExecuteOptions,
+ ) -> ExecutionContext:
+ """Initialize execution context for a ColumnDefault construct."""
+
+ self = cls.__new__(cls)
+ self.root_connection = connection
+ self._dbapi_connection = dbapi_connection
+ self.dialect = connection.dialect
+
+ self.execution_options = execution_options
+
+ self.cursor = self.create_cursor()
+ return self
+
+ def _get_cache_stats(self) -> str:
+ if self.compiled is None:
+ return "raw sql"
+
+ now = perf_counter()
+
+ ch = self.cache_hit
+
+ gen_time = self.compiled._gen_time
+ assert gen_time is not None
+
+ if ch is NO_CACHE_KEY:
+ return "no key %.5fs" % (now - gen_time,)
+ elif ch is CACHE_HIT:
+ return "cached since %.4gs ago" % (now - gen_time,)
+ elif ch is CACHE_MISS:
+ return "generated in %.5fs" % (now - gen_time,)
+ elif ch is CACHING_DISABLED:
+ if "_cache_disable_reason" in self.execution_options:
+ return "caching disabled (%s) %.5fs " % (
+ self.execution_options["_cache_disable_reason"],
+ now - gen_time,
+ )
+ else:
+ return "caching disabled %.5fs" % (now - gen_time,)
+ elif ch is NO_DIALECT_SUPPORT:
+ return "dialect %s+%s does not support caching %.5fs" % (
+ self.dialect.name,
+ self.dialect.driver,
+ now - gen_time,
+ )
+ else:
+ return "unknown"
+
+ @property
+ def executemany(self):
+ return self.execute_style in (
+ ExecuteStyle.EXECUTEMANY,
+ ExecuteStyle.INSERTMANYVALUES,
+ )
+
+ @util.memoized_property
+ def identifier_preparer(self):
+ if self.compiled:
+ return self.compiled.preparer
+ elif "schema_translate_map" in self.execution_options:
+ return self.dialect.identifier_preparer._with_schema_translate(
+ self.execution_options["schema_translate_map"]
+ )
+ else:
+ return self.dialect.identifier_preparer
+
+ @util.memoized_property
+ def engine(self):
+ return self.root_connection.engine
+
+ @util.memoized_property
+ def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
+ if TYPE_CHECKING:
+ assert isinstance(self.compiled, SQLCompiler)
+ return self.compiled.postfetch
+
+ @util.memoized_property
+ def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
+ if TYPE_CHECKING:
+ assert isinstance(self.compiled, SQLCompiler)
+ if self.isinsert:
+ return self.compiled.insert_prefetch
+ elif self.isupdate:
+ return self.compiled.update_prefetch
+ else:
+ return ()
+
+ @util.memoized_property
+ def no_parameters(self):
+ return self.execution_options.get("no_parameters", False)
+
+ def _execute_scalar(self, stmt, type_, parameters=None):
+ """Execute a string statement on the current cursor, returning a
+ scalar result.
+
+ Used to fire off sequences, default phrases, and "select lastrowid"
+ types of statements individually or in the context of a parent INSERT
+ or UPDATE statement.
+
+ """
+
+ conn = self.root_connection
+
+ if "schema_translate_map" in self.execution_options:
+ schema_translate_map = self.execution_options.get(
+ "schema_translate_map", {}
+ )
+
+ rst = self.identifier_preparer._render_schema_translates
+ stmt = rst(stmt, schema_translate_map)
+
+ if not parameters:
+ if self.dialect.positional:
+ parameters = self.dialect.execute_sequence_format()
+ else:
+ parameters = {}
+
+ conn._cursor_execute(self.cursor, stmt, parameters, context=self)
+ row = self.cursor.fetchone()
+ if row is not None:
+ r = row[0]
+ else:
+ r = None
+ if type_ is not None:
+ # apply type post processors to the result
+ proc = type_._cached_result_processor(
+ self.dialect, self.cursor.description[0][1]
+ )
+ if proc:
+ return proc(r)
+ return r
+
+ @util.memoized_property
+ def connection(self):
+ return self.root_connection
+
+ def _use_server_side_cursor(self):
+ if not self.dialect.supports_server_side_cursors:
+ return False
+
+ if self.dialect.server_side_cursors:
+ # this is deprecated
+ use_server_side = self.execution_options.get(
+ "stream_results", True
+ ) and (
+ self.compiled
+ and isinstance(self.compiled.statement, expression.Selectable)
+ or (
+ (
+ not self.compiled
+ or isinstance(
+ self.compiled.statement, expression.TextClause
+ )
+ )
+ and self.unicode_statement
+ and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
+ )
+ )
+ else:
+ use_server_side = self.execution_options.get(
+ "stream_results", False
+ )
+
+ return use_server_side
+
+ def create_cursor(self):
+ if (
+ # inlining initial preference checks for SS cursors
+ self.dialect.supports_server_side_cursors
+ and (
+ self.execution_options.get("stream_results", False)
+ or (
+ self.dialect.server_side_cursors
+ and self._use_server_side_cursor()
+ )
+ )
+ ):
+ self._is_server_side = True
+ return self.create_server_side_cursor()
+ else:
+ self._is_server_side = False
+ return self.create_default_cursor()
+
+ def fetchall_for_returning(self, cursor):
+ return cursor.fetchall()
+
+ def create_default_cursor(self):
+ return self._dbapi_connection.cursor()
+
+ def create_server_side_cursor(self):
+ raise NotImplementedError()
+
+ def pre_exec(self):
+ pass
+
+ def get_out_parameter_values(self, names):
+ raise NotImplementedError(
+ "This dialect does not support OUT parameters"
+ )
+
+ def post_exec(self):
+ pass
+
+ def get_result_processor(self, type_, colname, coltype):
+ """Return a 'result processor' for a given type as present in
+ cursor.description.
+
+ This has a default implementation that dialects can override
+ for context-sensitive result type handling.
+
+ """
+ return type_._cached_result_processor(self.dialect, coltype)
+
+ def get_lastrowid(self):
+ """return self.cursor.lastrowid, or equivalent, after an INSERT.
+
+ This may involve calling special cursor functions, issuing a new SELECT
+ on the cursor (or a new one), or returning a stored value that was
+ calculated within post_exec().
+
+ This function will only be called for dialects which support "implicit"
+ primary key generation, keep preexecute_autoincrement_sequences set to
+ False, and when no explicit id value was bound to the statement.
+
+ The function is called once for an INSERT statement that would need to
+ return the last inserted primary key for those dialects that make use
+ of the lastrowid concept. In these cases, it is called directly after
+ :meth:`.ExecutionContext.post_exec`.
+
+ """
+ return self.cursor.lastrowid
+
+ def handle_dbapi_exception(self, e):
+ pass
+
+ @util.non_memoized_property
+ def rowcount(self) -> int:
+ if self._rowcount is not None:
+ return self._rowcount
+ else:
+ return self.cursor.rowcount
+
+ @property
+ def _has_rowcount(self):
+ return self._rowcount is not None
+
+ def supports_sane_rowcount(self):
+ return self.dialect.supports_sane_rowcount
+
+ def supports_sane_multi_rowcount(self):
+ return self.dialect.supports_sane_multi_rowcount
+
+ def _setup_result_proxy(self):
+ exec_opt = self.execution_options
+
+ if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
+ self._rowcount = self.cursor.rowcount
+
+ if self.is_crud or self.is_text:
+ result = self._setup_dml_or_text_result()
+ yp = sr = False
+ else:
+ yp = exec_opt.get("yield_per", None)
+ sr = self._is_server_side or exec_opt.get("stream_results", False)
+ strategy = self.cursor_fetch_strategy
+ if sr and strategy is _cursor._DEFAULT_FETCH:
+ strategy = _cursor.BufferedRowCursorFetchStrategy(
+ self.cursor, self.execution_options
+ )
+ cursor_description: _DBAPICursorDescription = (
+ strategy.alternate_cursor_description
+ or self.cursor.description
+ )
+ if cursor_description is None:
+ strategy = _cursor._NO_CURSOR_DQL
+
+ result = _cursor.CursorResult(self, strategy, cursor_description)
+
+ compiled = self.compiled
+
+ if (
+ compiled
+ and not self.isddl
+ and cast(SQLCompiler, compiled).has_out_parameters
+ ):
+ self._setup_out_parameters(result)
+
+ self._soft_closed = result._soft_closed
+
+ if yp:
+ result = result.yield_per(yp)
+
+ return result
+
+ def _setup_out_parameters(self, result):
+ compiled = cast(SQLCompiler, self.compiled)
+
+ out_bindparams = [
+ (param, name)
+ for param, name in compiled.bind_names.items()
+ if param.isoutparam
+ ]
+ out_parameters = {}
+
+ for bindparam, raw_value in zip(
+ [param for param, name in out_bindparams],
+ self.get_out_parameter_values(
+ [name for param, name in out_bindparams]
+ ),
+ ):
+ type_ = bindparam.type
+ impl_type = type_.dialect_impl(self.dialect)
+ dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
+ result_processor = impl_type.result_processor(
+ self.dialect, dbapi_type
+ )
+ if result_processor is not None:
+ raw_value = result_processor(raw_value)
+ out_parameters[bindparam.key] = raw_value
+
+ result.out_parameters = out_parameters
+
+ def _setup_dml_or_text_result(self):
+ compiled = cast(SQLCompiler, self.compiled)
+
+ strategy: ResultFetchStrategy = self.cursor_fetch_strategy
+
+ if self.isinsert:
+ if (
+ self.execute_style is ExecuteStyle.INSERTMANYVALUES
+ and compiled.effective_returning
+ ):
+ strategy = _cursor.FullyBufferedCursorFetchStrategy(
+ self.cursor,
+ initial_buffer=self._insertmanyvalues_rows,
+ # maintain alt cursor description if set by the
+ # dialect, e.g. mssql preserves it
+ alternate_description=(
+ strategy.alternate_cursor_description
+ ),
+ )
+
+ if compiled.postfetch_lastrowid:
+ self.inserted_primary_key_rows = (
+ self._setup_ins_pk_from_lastrowid()
+ )
+ # else if not self._is_implicit_returning,
+ # the default inserted_primary_key_rows accessor will
+ # return an "empty" primary key collection when accessed.
+
+ if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
+ strategy = _cursor.BufferedRowCursorFetchStrategy(
+ self.cursor, self.execution_options
+ )
+
+ if strategy is _cursor._NO_CURSOR_DML:
+ cursor_description = None
+ else:
+ cursor_description = (
+ strategy.alternate_cursor_description
+ or self.cursor.description
+ )
+
+ if cursor_description is None:
+ strategy = _cursor._NO_CURSOR_DML
+ elif self._num_sentinel_cols:
+ assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
+ # strip out the sentinel columns from cursor description
+ # a similar logic is done to the rows only in CursorResult
+ cursor_description = cursor_description[
+ 0 : -self._num_sentinel_cols
+ ]
+
+ result: _cursor.CursorResult[Any] = _cursor.CursorResult(
+ self, strategy, cursor_description
+ )
+
+ if self.isinsert:
+ if self._is_implicit_returning:
+ rows = result.all()
+
+ self.returned_default_rows = rows
+
+ self.inserted_primary_key_rows = (
+ self._setup_ins_pk_from_implicit_returning(result, rows)
+ )
+
+ # test that it has a cursor metadata that is accurate. the
+ # first row will have been fetched and current assumptions
+ # are that the result has only one row, until executemany()
+ # support is added here.
+ assert result._metadata.returns_rows
+
+ # Insert statement has both return_defaults() and
+ # returning(). rewind the result on the list of rows
+ # we just used.
+ if self._is_supplemental_returning:
+ result._rewind(rows)
+ else:
+ result._soft_close()
+ elif not self._is_explicit_returning:
+ result._soft_close()
+
+ # we assume here the result does not return any rows.
+ # *usually*, this will be true. However, some dialects
+ # such as that of MSSQL/pyodbc need to SELECT a post fetch
+ # function so this is not necessarily true.
+ # assert not result.returns_rows
+
+ elif self._is_implicit_returning:
+ rows = result.all()
+
+ if rows:
+ self.returned_default_rows = rows
+ self._rowcount = len(rows)
+
+ if self._is_supplemental_returning:
+ result._rewind(rows)
+ else:
+ result._soft_close()
+
+ # test that it has a cursor metadata that is accurate.
+ # the rows have all been fetched however.
+ assert result._metadata.returns_rows
+
+ elif not result._metadata.returns_rows:
+ # no results, get rowcount
+ # (which requires open cursor on some drivers)
+ if self._rowcount is None:
+ self._rowcount = self.cursor.rowcount
+ result._soft_close()
+ elif self.isupdate or self.isdelete:
+ if self._rowcount is None:
+ self._rowcount = self.cursor.rowcount
+ return result
+
+ @util.memoized_property
+ def inserted_primary_key_rows(self):
+ # if no specific "get primary key" strategy was set up
+ # during execution, return a "default" primary key based
+ # on what's in the compiled_parameters and nothing else.
+ return self._setup_ins_pk_from_empty()
+
+ def _setup_ins_pk_from_lastrowid(self):
+ getter = cast(
+ SQLCompiler, self.compiled
+ )._inserted_primary_key_from_lastrowid_getter
+ lastrowid = self.get_lastrowid()
+ return [getter(lastrowid, self.compiled_parameters[0])]
+
+ def _setup_ins_pk_from_empty(self):
+ getter = cast(
+ SQLCompiler, self.compiled
+ )._inserted_primary_key_from_lastrowid_getter
+ return [getter(None, param) for param in self.compiled_parameters]
+
+ def _setup_ins_pk_from_implicit_returning(self, result, rows):
+ if not rows:
+ return []
+
+ getter = cast(
+ SQLCompiler, self.compiled
+ )._inserted_primary_key_from_returning_getter
+ compiled_params = self.compiled_parameters
+
+ return [
+ getter(row, param) for row, param in zip(rows, compiled_params)
+ ]
+
+ def lastrow_has_defaults(self):
+ return (self.isinsert or self.isupdate) and bool(
+ cast(SQLCompiler, self.compiled).postfetch
+ )
+
+ def _prepare_set_input_sizes(
+ self,
+ ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
+ """Given a cursor and ClauseParameters, prepare arguments
+ in order to call the appropriate
+ style of ``setinputsizes()`` on the cursor, using DB-API types
+ from the bind parameter's ``TypeEngine`` objects.
+
+ This method only called by those dialects which set
+ the :attr:`.Dialect.bind_typing` attribute to
+ :attr:`.BindTyping.SETINPUTSIZES`. cx_Oracle is the only DBAPI
+ that requires setinputsizes(), pyodbc offers it as an option.
+
+ Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
+ for pg8000 and asyncpg, which has been changed to inline rendering
+ of casts.
+
+ """
+ if self.isddl or self.is_text:
+ return None
+
+ compiled = cast(SQLCompiler, self.compiled)
+
+ inputsizes = compiled._get_set_input_sizes_lookup()
+
+ if inputsizes is None:
+ return None
+
+ dialect = self.dialect
+
+ # all of the rest of this... cython?
+
+ if dialect._has_events:
+ inputsizes = dict(inputsizes)
+ dialect.dispatch.do_setinputsizes(
+ inputsizes, self.cursor, self.statement, self.parameters, self
+ )
+
+ if compiled.escaped_bind_names:
+ escaped_bind_names = compiled.escaped_bind_names
+ else:
+ escaped_bind_names = None
+
+ if dialect.positional:
+ items = [
+ (key, compiled.binds[key])
+ for key in compiled.positiontup or ()
+ ]
+ else:
+ items = [
+ (key, bindparam)
+ for bindparam, key in compiled.bind_names.items()
+ ]
+
+ generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
+ for key, bindparam in items:
+ if bindparam in compiled.literal_execute_params:
+ continue
+
+ if key in self._expanded_parameters:
+ if is_tuple_type(bindparam.type):
+ num = len(bindparam.type.types)
+ dbtypes = inputsizes[bindparam]
+ generic_inputsizes.extend(
+ (
+ (
+ escaped_bind_names.get(paramname, paramname)
+ if escaped_bind_names is not None
+ else paramname
+ ),
+ dbtypes[idx % num],
+ bindparam.type.types[idx % num],
+ )
+ for idx, paramname in enumerate(
+ self._expanded_parameters[key]
+ )
+ )
+ else:
+ dbtype = inputsizes.get(bindparam, None)
+ generic_inputsizes.extend(
+ (
+ (
+ escaped_bind_names.get(paramname, paramname)
+ if escaped_bind_names is not None
+ else paramname
+ ),
+ dbtype,
+ bindparam.type,
+ )
+ for paramname in self._expanded_parameters[key]
+ )
+ else:
+ dbtype = inputsizes.get(bindparam, None)
+
+ escaped_name = (
+ escaped_bind_names.get(key, key)
+ if escaped_bind_names is not None
+ else key
+ )
+
+ generic_inputsizes.append(
+ (escaped_name, dbtype, bindparam.type)
+ )
+
+ return generic_inputsizes
+
+ def _exec_default(self, column, default, type_):
+ if default.is_sequence:
+ return self.fire_sequence(default, type_)
+ elif default.is_callable:
+ # this codepath is not normally used as it's inlined
+ # into _process_execute_defaults
+ self.current_column = column
+ return default.arg(self)
+ elif default.is_clause_element:
+ return self._exec_default_clause_element(column, default, type_)
+ else:
+ # this codepath is not normally used as it's inlined
+ # into _process_execute_defaults
+ return default.arg
+
+ def _exec_default_clause_element(self, column, default, type_):
+ # execute a default that's a complete clause element. Here, we have
+ # to re-implement a miniature version of the compile->parameters->
+ # cursor.execute() sequence, since we don't want to modify the state
+ # of the connection / result in progress or create new connection/
+ # result objects etc.
+ # .. versionchanged:: 1.4
+
+ if not default._arg_is_typed:
+ default_arg = expression.type_coerce(default.arg, type_)
+ else:
+ default_arg = default.arg
+ compiled = expression.select(default_arg).compile(dialect=self.dialect)
+ compiled_params = compiled.construct_params()
+ processors = compiled._bind_processors
+ if compiled.positional:
+ parameters = self.dialect.execute_sequence_format(
+ [
+ (
+ processors[key](compiled_params[key]) # type: ignore
+ if key in processors
+ else compiled_params[key]
+ )
+ for key in compiled.positiontup or ()
+ ]
+ )
+ else:
+ parameters = {
+ key: (
+ processors[key](compiled_params[key]) # type: ignore
+ if key in processors
+ else compiled_params[key]
+ )
+ for key in compiled_params
+ }
+ return self._execute_scalar(
+ str(compiled), type_, parameters=parameters
+ )
+
+ current_parameters: Optional[_CoreSingleExecuteParams] = None
+ """A dictionary of parameters applied to the current row.
+
+ This attribute is only available in the context of a user-defined default
+ generation function, e.g. as described at :ref:`context_default_functions`.
+ It consists of a dictionary which includes entries for each column/value
+ pair that is to be part of the INSERT or UPDATE statement. The keys of the
+ dictionary will be the key value of each :class:`_schema.Column`,
+ which is usually
+ synonymous with the name.
+
+ Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
+ does not accommodate for the "multi-values" feature of the
+ :meth:`_expression.Insert.values` method. The
+ :meth:`.DefaultExecutionContext.get_current_parameters` method should be
+ preferred.
+
+ .. seealso::
+
+ :meth:`.DefaultExecutionContext.get_current_parameters`
+
+ :ref:`context_default_functions`
+
+ """
+
+ def get_current_parameters(self, isolate_multiinsert_groups=True):
+ """Return a dictionary of parameters applied to the current row.
+
+ This method can only be used in the context of a user-defined default
+ generation function, e.g. as described at
+ :ref:`context_default_functions`. When invoked, a dictionary is
+ returned which includes entries for each column/value pair that is part
+ of the INSERT or UPDATE statement. The keys of the dictionary will be
+ the key value of each :class:`_schema.Column`,
+ which is usually synonymous
+ with the name.
+
+ :param isolate_multiinsert_groups=True: indicates that multi-valued
+ INSERT constructs created using :meth:`_expression.Insert.values`
+ should be
+ handled by returning only the subset of parameters that are local
+ to the current column default invocation. When ``False``, the
+ raw parameters of the statement are returned including the
+ naming convention used in the case of multi-valued INSERT.
+
+ .. versionadded:: 1.2 added
+ :meth:`.DefaultExecutionContext.get_current_parameters`
+ which provides more functionality over the existing
+ :attr:`.DefaultExecutionContext.current_parameters`
+ attribute.
+
+ .. seealso::
+
+ :attr:`.DefaultExecutionContext.current_parameters`
+
+ :ref:`context_default_functions`
+
+ """
+ try:
+ parameters = self.current_parameters
+ column = self.current_column
+ except AttributeError:
+ raise exc.InvalidRequestError(
+ "get_current_parameters() can only be invoked in the "
+ "context of a Python side column default function"
+ )
+ else:
+ assert column is not None
+ assert parameters is not None
+ compile_state = cast(
+ "DMLState", cast(SQLCompiler, self.compiled).compile_state
+ )
+ assert compile_state is not None
+ if (
+ isolate_multiinsert_groups
+ and dml.isinsert(compile_state)
+ and compile_state._has_multi_parameters
+ ):
+ if column._is_multiparam_column:
+ index = column.index + 1
+ d = {column.original.key: parameters[column.key]}
+ else:
+ d = {column.key: parameters[column.key]}
+ index = 0
+ assert compile_state._dict_parameters is not None
+ keys = compile_state._dict_parameters.keys()
+ d.update(
+ (key, parameters["%s_m%d" % (key, index)]) for key in keys
+ )
+ return d
+ else:
+ return parameters
+
+ def get_insert_default(self, column):
+ if column.default is None:
+ return None
+ else:
+ return self._exec_default(column, column.default, column.type)
+
+ def get_update_default(self, column):
+ if column.onupdate is None:
+ return None
+ else:
+ return self._exec_default(column, column.onupdate, column.type)
+
+ def _process_execute_defaults(self):
+ compiled = cast(SQLCompiler, self.compiled)
+
+ key_getter = compiled._within_exec_param_key_getter
+
+ sentinel_counter = 0
+
+ if compiled.insert_prefetch:
+ prefetch_recs = [
+ (
+ c,
+ key_getter(c),
+ c._default_description_tuple,
+ self.get_insert_default,
+ )
+ for c in compiled.insert_prefetch
+ ]
+ elif compiled.update_prefetch:
+ prefetch_recs = [
+ (
+ c,
+ key_getter(c),
+ c._onupdate_description_tuple,
+ self.get_update_default,
+ )
+ for c in compiled.update_prefetch
+ ]
+ else:
+ prefetch_recs = []
+
+ for param in self.compiled_parameters:
+ self.current_parameters = param
+
+ for (
+ c,
+ param_key,
+ (arg, is_scalar, is_callable, is_sentinel),
+ fallback,
+ ) in prefetch_recs:
+ if is_sentinel:
+ param[param_key] = sentinel_counter
+ sentinel_counter += 1
+ elif is_scalar:
+ param[param_key] = arg
+ elif is_callable:
+ self.current_column = c
+ param[param_key] = arg(self)
+ else:
+ val = fallback(c)
+ if val is not None:
+ param[param_key] = val
+
+ del self.current_parameters
+
+
+DefaultDialect.execution_ctx_cls = DefaultExecutionContext