summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py2343
1 files changed, 0 insertions, 2343 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py b/venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py
deleted file mode 100644
index 90cafe4..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py
+++ /dev/null
@@ -1,2343 +0,0 @@
-# engine/default.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: allow-untyped-defs, allow-untyped-calls
-
-"""Default implementations of per-dialect sqlalchemy.engine classes.
-
-These are semi-private implementation classes which are only of importance
-to database dialect authors; dialects will usually use the classes here
-as the base class for their own corresponding classes.
-
-"""
-
-from __future__ import annotations
-
-import functools
-import operator
-import random
-import re
-from time import perf_counter
-import typing
-from typing import Any
-from typing import Callable
-from typing import cast
-from typing import Dict
-from typing import List
-from typing import Mapping
-from typing import MutableMapping
-from typing import MutableSequence
-from typing import Optional
-from typing import Sequence
-from typing import Set
-from typing import Tuple
-from typing import Type
-from typing import TYPE_CHECKING
-from typing import Union
-import weakref
-
-from . import characteristics
-from . import cursor as _cursor
-from . import interfaces
-from .base import Connection
-from .interfaces import CacheStats
-from .interfaces import DBAPICursor
-from .interfaces import Dialect
-from .interfaces import ExecuteStyle
-from .interfaces import ExecutionContext
-from .reflection import ObjectKind
-from .reflection import ObjectScope
-from .. import event
-from .. import exc
-from .. import pool
-from .. import util
-from ..sql import compiler
-from ..sql import dml
-from ..sql import expression
-from ..sql import type_api
-from ..sql._typing import is_tuple_type
-from ..sql.base import _NoArg
-from ..sql.compiler import DDLCompiler
-from ..sql.compiler import InsertmanyvaluesSentinelOpts
-from ..sql.compiler import SQLCompiler
-from ..sql.elements import quoted_name
-from ..util.typing import Final
-from ..util.typing import Literal
-
-if typing.TYPE_CHECKING:
- from types import ModuleType
-
- from .base import Engine
- from .cursor import ResultFetchStrategy
- from .interfaces import _CoreMultiExecuteParams
- from .interfaces import _CoreSingleExecuteParams
- from .interfaces import _DBAPICursorDescription
- from .interfaces import _DBAPIMultiExecuteParams
- from .interfaces import _ExecuteOptions
- from .interfaces import _MutableCoreSingleExecuteParams
- from .interfaces import _ParamStyle
- from .interfaces import DBAPIConnection
- from .interfaces import IsolationLevel
- from .row import Row
- from .url import URL
- from ..event import _ListenerFnType
- from ..pool import Pool
- from ..pool import PoolProxiedConnection
- from ..sql import Executable
- from ..sql.compiler import Compiled
- from ..sql.compiler import Linting
- from ..sql.compiler import ResultColumnsEntry
- from ..sql.dml import DMLState
- from ..sql.dml import UpdateBase
- from ..sql.elements import BindParameter
- from ..sql.schema import Column
- from ..sql.type_api import _BindProcessorType
- from ..sql.type_api import _ResultProcessorType
- from ..sql.type_api import TypeEngine
-
-# When we're handed literal SQL, ensure it's a SELECT query
-SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)
-
-
-(
- CACHE_HIT,
- CACHE_MISS,
- CACHING_DISABLED,
- NO_CACHE_KEY,
- NO_DIALECT_SUPPORT,
-) = list(CacheStats)
-
-
-class DefaultDialect(Dialect):
- """Default implementation of Dialect"""
-
- statement_compiler = compiler.SQLCompiler
- ddl_compiler = compiler.DDLCompiler
- type_compiler_cls = compiler.GenericTypeCompiler
-
- preparer = compiler.IdentifierPreparer
- supports_alter = True
- supports_comments = False
- supports_constraint_comments = False
- inline_comments = False
- supports_statement_cache = True
-
- div_is_floordiv = True
-
- bind_typing = interfaces.BindTyping.NONE
-
- include_set_input_sizes: Optional[Set[Any]] = None
- exclude_set_input_sizes: Optional[Set[Any]] = None
-
- # the first value we'd get for an autoincrement column.
- default_sequence_base = 1
-
- # most DBAPIs happy with this for execute().
- # not cx_oracle.
- execute_sequence_format = tuple
-
- supports_schemas = True
- supports_views = True
- supports_sequences = False
- sequences_optional = False
- preexecute_autoincrement_sequences = False
- supports_identity_columns = False
- postfetch_lastrowid = True
- favor_returning_over_lastrowid = False
- insert_null_pk_still_autoincrements = False
- update_returning = False
- delete_returning = False
- update_returning_multifrom = False
- delete_returning_multifrom = False
- insert_returning = False
-
- cte_follows_insert = False
-
- supports_native_enum = False
- supports_native_boolean = False
- supports_native_uuid = False
- returns_native_bytes = False
-
- non_native_boolean_check_constraint = True
-
- supports_simple_order_by_label = True
-
- tuple_in_values = False
-
- connection_characteristics = util.immutabledict(
- {"isolation_level": characteristics.IsolationLevelCharacteristic()}
- )
-
- engine_config_types: Mapping[str, Any] = util.immutabledict(
- {
- "pool_timeout": util.asint,
- "echo": util.bool_or_str("debug"),
- "echo_pool": util.bool_or_str("debug"),
- "pool_recycle": util.asint,
- "pool_size": util.asint,
- "max_overflow": util.asint,
- "future": util.asbool,
- }
- )
-
- # if the NUMERIC type
- # returns decimal.Decimal.
- # *not* the FLOAT type however.
- supports_native_decimal = False
-
- name = "default"
-
- # length at which to truncate
- # any identifier.
- max_identifier_length = 9999
- _user_defined_max_identifier_length: Optional[int] = None
-
- isolation_level: Optional[str] = None
-
- # sub-categories of max_identifier_length.
- # currently these accommodate for MySQL which allows alias names
- # of 255 but DDL names only of 64.
- max_index_name_length: Optional[int] = None
- max_constraint_name_length: Optional[int] = None
-
- supports_sane_rowcount = True
- supports_sane_multi_rowcount = True
- colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
- default_paramstyle = "named"
-
- supports_default_values = False
- """dialect supports INSERT... DEFAULT VALUES syntax"""
-
- supports_default_metavalue = False
- """dialect supports INSERT... VALUES (DEFAULT) syntax"""
-
- default_metavalue_token = "DEFAULT"
- """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
- parenthesis."""
-
- # not sure if this is a real thing but the compiler will deliver it
- # if this is the only flag enabled.
- supports_empty_insert = True
- """dialect supports INSERT () VALUES ()"""
-
- supports_multivalues_insert = False
-
- use_insertmanyvalues: bool = False
-
- use_insertmanyvalues_wo_returning: bool = False
-
- insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
- InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
- )
-
- insertmanyvalues_page_size: int = 1000
- insertmanyvalues_max_parameters = 32700
-
- supports_is_distinct_from = True
-
- supports_server_side_cursors = False
-
- server_side_cursors = False
-
- # extra record-level locking features (#4860)
- supports_for_update_of = False
-
- server_version_info = None
-
- default_schema_name: Optional[str] = None
-
- # indicates symbol names are
- # UPPERCASEd if they are case insensitive
- # within the database.
- # if this is True, the methods normalize_name()
- # and denormalize_name() must be provided.
- requires_name_normalize = False
-
- is_async = False
-
- has_terminate = False
-
- # TODO: this is not to be part of 2.0. implement rudimentary binary
- # literals for SQLite, PostgreSQL, MySQL only within
- # _Binary.literal_processor
- _legacy_binary_type_literal_encoding = "utf-8"
-
- @util.deprecated_params(
- empty_in_strategy=(
- "1.4",
- "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
- "deprecated, and no longer has any effect. All IN expressions "
- "are now rendered using "
- 'the "expanding parameter" strategy which renders a set of bound'
- 'expressions, or an "empty set" SELECT, at statement execution'
- "time.",
- ),
- server_side_cursors=(
- "1.4",
- "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
- "is deprecated and will be removed in a future release. Please "
- "use the "
- ":paramref:`_engine.Connection.execution_options.stream_results` "
- "parameter.",
- ),
- )
- def __init__(
- self,
- paramstyle: Optional[_ParamStyle] = None,
- isolation_level: Optional[IsolationLevel] = None,
- dbapi: Optional[ModuleType] = None,
- implicit_returning: Literal[True] = True,
- supports_native_boolean: Optional[bool] = None,
- max_identifier_length: Optional[int] = None,
- label_length: Optional[int] = None,
- insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
- use_insertmanyvalues: Optional[bool] = None,
- # util.deprecated_params decorator cannot render the
- # Linting.NO_LINTING constant
- compiler_linting: Linting = int(compiler.NO_LINTING), # type: ignore
- server_side_cursors: bool = False,
- **kwargs: Any,
- ):
- if server_side_cursors:
- if not self.supports_server_side_cursors:
- raise exc.ArgumentError(
- "Dialect %s does not support server side cursors" % self
- )
- else:
- self.server_side_cursors = True
-
- if getattr(self, "use_setinputsizes", False):
- util.warn_deprecated(
- "The dialect-level use_setinputsizes attribute is "
- "deprecated. Please use "
- "bind_typing = BindTyping.SETINPUTSIZES",
- "2.0",
- )
- self.bind_typing = interfaces.BindTyping.SETINPUTSIZES
-
- self.positional = False
- self._ischema = None
-
- self.dbapi = dbapi
-
- if paramstyle is not None:
- self.paramstyle = paramstyle
- elif self.dbapi is not None:
- self.paramstyle = self.dbapi.paramstyle
- else:
- self.paramstyle = self.default_paramstyle
- self.positional = self.paramstyle in (
- "qmark",
- "format",
- "numeric",
- "numeric_dollar",
- )
- self.identifier_preparer = self.preparer(self)
- self._on_connect_isolation_level = isolation_level
-
- legacy_tt_callable = getattr(self, "type_compiler", None)
- if legacy_tt_callable is not None:
- tt_callable = cast(
- Type[compiler.GenericTypeCompiler],
- self.type_compiler,
- )
- else:
- tt_callable = self.type_compiler_cls
-
- self.type_compiler_instance = self.type_compiler = tt_callable(self)
-
- if supports_native_boolean is not None:
- self.supports_native_boolean = supports_native_boolean
-
- self._user_defined_max_identifier_length = max_identifier_length
- if self._user_defined_max_identifier_length:
- self.max_identifier_length = (
- self._user_defined_max_identifier_length
- )
- self.label_length = label_length
- self.compiler_linting = compiler_linting
-
- if use_insertmanyvalues is not None:
- self.use_insertmanyvalues = use_insertmanyvalues
-
- if insertmanyvalues_page_size is not _NoArg.NO_ARG:
- self.insertmanyvalues_page_size = insertmanyvalues_page_size
-
- @property
- @util.deprecated(
- "2.0",
- "full_returning is deprecated, please use insert_returning, "
- "update_returning, delete_returning",
- )
- def full_returning(self):
- return (
- self.insert_returning
- and self.update_returning
- and self.delete_returning
- )
-
- @util.memoized_property
- def insert_executemany_returning(self):
- """Default implementation for insert_executemany_returning, if not
- otherwise overridden by the specific dialect.
-
- The default dialect determines "insert_executemany_returning" is
- available if the dialect in use has opted into using the
- "use_insertmanyvalues" feature. If they haven't opted into that, then
- this attribute is False, unless the dialect in question overrides this
- and provides some other implementation (such as the Oracle dialect).
-
- """
- return self.insert_returning and self.use_insertmanyvalues
-
- @util.memoized_property
- def insert_executemany_returning_sort_by_parameter_order(self):
- """Default implementation for
- insert_executemany_returning_deterministic_order, if not otherwise
- overridden by the specific dialect.
-
- The default dialect determines "insert_executemany_returning" can have
- deterministic order only if the dialect in use has opted into using the
- "use_insertmanyvalues" feature, which implements deterministic ordering
- using client side sentinel columns only by default. The
- "insertmanyvalues" feature also features alternate forms that can
- use server-generated PK values as "sentinels", but those are only
- used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
- bitflag enables those alternate SQL forms, which are disabled
- by default.
-
- If the dialect in use hasn't opted into that, then this attribute is
- False, unless the dialect in question overrides this and provides some
- other implementation (such as the Oracle dialect).
-
- """
- return self.insert_returning and self.use_insertmanyvalues
-
- update_executemany_returning = False
- delete_executemany_returning = False
-
- @util.memoized_property
- def loaded_dbapi(self) -> ModuleType:
- if self.dbapi is None:
- raise exc.InvalidRequestError(
- f"Dialect {self} does not have a Python DBAPI established "
- "and cannot be used for actual database interaction"
- )
- return self.dbapi
-
- @util.memoized_property
- def _bind_typing_render_casts(self):
- return self.bind_typing is interfaces.BindTyping.RENDER_CASTS
-
- def _ensure_has_table_connection(self, arg):
- if not isinstance(arg, Connection):
- raise exc.ArgumentError(
- "The argument passed to Dialect.has_table() should be a "
- "%s, got %s. "
- "Additionally, the Dialect.has_table() method is for "
- "internal dialect "
- "use only; please use "
- "``inspect(some_engine).has_table(<tablename>>)`` "
- "for public API use." % (Connection, type(arg))
- )
-
- @util.memoized_property
- def _supports_statement_cache(self):
- ssc = self.__class__.__dict__.get("supports_statement_cache", None)
- if ssc is None:
- util.warn(
- "Dialect %s:%s will not make use of SQL compilation caching "
- "as it does not set the 'supports_statement_cache' attribute "
- "to ``True``. This can have "
- "significant performance implications including some "
- "performance degradations in comparison to prior SQLAlchemy "
- "versions. Dialect maintainers should seek to set this "
- "attribute to True after appropriate development and testing "
- "for SQLAlchemy 1.4 caching support. Alternatively, this "
- "attribute may be set to False which will disable this "
- "warning." % (self.name, self.driver),
- code="cprf",
- )
-
- return bool(ssc)
-
- @util.memoized_property
- def _type_memos(self):
- return weakref.WeakKeyDictionary()
-
- @property
- def dialect_description(self):
- return self.name + "+" + self.driver
-
- @property
- def supports_sane_rowcount_returning(self):
- """True if this dialect supports sane rowcount even if RETURNING is
- in use.
-
- For dialects that don't support RETURNING, this is synonymous with
- ``supports_sane_rowcount``.
-
- """
- return self.supports_sane_rowcount
-
- @classmethod
- def get_pool_class(cls, url: URL) -> Type[Pool]:
- return getattr(cls, "poolclass", pool.QueuePool)
-
- def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
- return self.get_pool_class(url)
-
- @classmethod
- def load_provisioning(cls):
- package = ".".join(cls.__module__.split(".")[0:-1])
- try:
- __import__(package + ".provision")
- except ImportError:
- pass
-
- def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
- if self._on_connect_isolation_level is not None:
-
- def builtin_connect(dbapi_conn, conn_rec):
- self._assert_and_set_isolation_level(
- dbapi_conn, self._on_connect_isolation_level
- )
-
- return builtin_connect
- else:
- return None
-
- def initialize(self, connection):
- try:
- self.server_version_info = self._get_server_version_info(
- connection
- )
- except NotImplementedError:
- self.server_version_info = None
- try:
- self.default_schema_name = self._get_default_schema_name(
- connection
- )
- except NotImplementedError:
- self.default_schema_name = None
-
- try:
- self.default_isolation_level = self.get_default_isolation_level(
- connection.connection.dbapi_connection
- )
- except NotImplementedError:
- self.default_isolation_level = None
-
- if not self._user_defined_max_identifier_length:
- max_ident_length = self._check_max_identifier_length(connection)
- if max_ident_length:
- self.max_identifier_length = max_ident_length
-
- if (
- self.label_length
- and self.label_length > self.max_identifier_length
- ):
- raise exc.ArgumentError(
- "Label length of %d is greater than this dialect's"
- " maximum identifier length of %d"
- % (self.label_length, self.max_identifier_length)
- )
-
- def on_connect(self):
- # inherits the docstring from interfaces.Dialect.on_connect
- return None
-
- def _check_max_identifier_length(self, connection):
- """Perform a connection / server version specific check to determine
- the max_identifier_length.
-
- If the dialect's class level max_identifier_length should be used,
- can return None.
-
- .. versionadded:: 1.3.9
-
- """
- return None
-
- def get_default_isolation_level(self, dbapi_conn):
- """Given a DBAPI connection, return its isolation level, or
- a default isolation level if one cannot be retrieved.
-
- May be overridden by subclasses in order to provide a
- "fallback" isolation level for databases that cannot reliably
- retrieve the actual isolation level.
-
- By default, calls the :meth:`_engine.Interfaces.get_isolation_level`
- method, propagating any exceptions raised.
-
- .. versionadded:: 1.3.22
-
- """
- return self.get_isolation_level(dbapi_conn)
-
- def type_descriptor(self, typeobj):
- """Provide a database-specific :class:`.TypeEngine` object, given
- the generic object which comes from the types module.
-
- This method looks for a dictionary called
- ``colspecs`` as a class or instance-level variable,
- and passes on to :func:`_types.adapt_type`.
-
- """
- return type_api.adapt_type(typeobj, self.colspecs)
-
- def has_index(self, connection, table_name, index_name, schema=None, **kw):
- if not self.has_table(connection, table_name, schema=schema, **kw):
- return False
- for idx in self.get_indexes(
- connection, table_name, schema=schema, **kw
- ):
- if idx["name"] == index_name:
- return True
- else:
- return False
-
- def has_schema(
- self, connection: Connection, schema_name: str, **kw: Any
- ) -> bool:
- return schema_name in self.get_schema_names(connection, **kw)
-
- def validate_identifier(self, ident):
- if len(ident) > self.max_identifier_length:
- raise exc.IdentifierError(
- "Identifier '%s' exceeds maximum length of %d characters"
- % (ident, self.max_identifier_length)
- )
-
- def connect(self, *cargs, **cparams):
- # inherits the docstring from interfaces.Dialect.connect
- return self.loaded_dbapi.connect(*cargs, **cparams)
-
- def create_connect_args(self, url):
- # inherits the docstring from interfaces.Dialect.create_connect_args
- opts = url.translate_connect_args()
- opts.update(url.query)
- return ([], opts)
-
- def set_engine_execution_options(
- self, engine: Engine, opts: Mapping[str, Any]
- ) -> None:
- supported_names = set(self.connection_characteristics).intersection(
- opts
- )
- if supported_names:
- characteristics: Mapping[str, Any] = util.immutabledict(
- (name, opts[name]) for name in supported_names
- )
-
- @event.listens_for(engine, "engine_connect")
- def set_connection_characteristics(connection):
- self._set_connection_characteristics(
- connection, characteristics
- )
-
- def set_connection_execution_options(
- self, connection: Connection, opts: Mapping[str, Any]
- ) -> None:
- supported_names = set(self.connection_characteristics).intersection(
- opts
- )
- if supported_names:
- characteristics: Mapping[str, Any] = util.immutabledict(
- (name, opts[name]) for name in supported_names
- )
- self._set_connection_characteristics(connection, characteristics)
-
- def _set_connection_characteristics(self, connection, characteristics):
- characteristic_values = [
- (name, self.connection_characteristics[name], value)
- for name, value in characteristics.items()
- ]
-
- if connection.in_transaction():
- trans_objs = [
- (name, obj)
- for name, obj, value in characteristic_values
- if obj.transactional
- ]
- if trans_objs:
- raise exc.InvalidRequestError(
- "This connection has already initialized a SQLAlchemy "
- "Transaction() object via begin() or autobegin; "
- "%s may not be altered unless rollback() or commit() "
- "is called first."
- % (", ".join(name for name, obj in trans_objs))
- )
-
- dbapi_connection = connection.connection.dbapi_connection
- for name, characteristic, value in characteristic_values:
- characteristic.set_characteristic(self, dbapi_connection, value)
- connection.connection._connection_record.finalize_callback.append(
- functools.partial(self._reset_characteristics, characteristics)
- )
-
- def _reset_characteristics(self, characteristics, dbapi_connection):
- for characteristic_name in characteristics:
- characteristic = self.connection_characteristics[
- characteristic_name
- ]
- characteristic.reset_characteristic(self, dbapi_connection)
-
- def do_begin(self, dbapi_connection):
- pass
-
- def do_rollback(self, dbapi_connection):
- dbapi_connection.rollback()
-
- def do_commit(self, dbapi_connection):
- dbapi_connection.commit()
-
- def do_terminate(self, dbapi_connection):
- self.do_close(dbapi_connection)
-
- def do_close(self, dbapi_connection):
- dbapi_connection.close()
-
- @util.memoized_property
- def _dialect_specific_select_one(self):
- return str(expression.select(1).compile(dialect=self))
-
- def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
- try:
- return self.do_ping(dbapi_connection)
- except self.loaded_dbapi.Error as err:
- is_disconnect = self.is_disconnect(err, dbapi_connection, None)
-
- if self._has_events:
- try:
- Connection._handle_dbapi_exception_noconnection(
- err,
- self,
- is_disconnect=is_disconnect,
- invalidate_pool_on_disconnect=False,
- is_pre_ping=True,
- )
- except exc.StatementError as new_err:
- is_disconnect = new_err.connection_invalidated
-
- if is_disconnect:
- return False
- else:
- raise
-
- def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
- cursor = None
-
- cursor = dbapi_connection.cursor()
- try:
- cursor.execute(self._dialect_specific_select_one)
- finally:
- cursor.close()
- return True
-
- def create_xid(self):
- """Create a random two-phase transaction ID.
-
- This id will be passed to do_begin_twophase(), do_rollback_twophase(),
- do_commit_twophase(). Its format is unspecified.
- """
-
- return "_sa_%032x" % random.randint(0, 2**128)
-
- def do_savepoint(self, connection, name):
- connection.execute(expression.SavepointClause(name))
-
- def do_rollback_to_savepoint(self, connection, name):
- connection.execute(expression.RollbackToSavepointClause(name))
-
- def do_release_savepoint(self, connection, name):
- connection.execute(expression.ReleaseSavepointClause(name))
-
- def _deliver_insertmanyvalues_batches(
- self, cursor, statement, parameters, generic_setinputsizes, context
- ):
- context = cast(DefaultExecutionContext, context)
- compiled = cast(SQLCompiler, context.compiled)
-
- _composite_sentinel_proc: Sequence[
- Optional[_ResultProcessorType[Any]]
- ] = ()
- _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
- _sentinel_proc_initialized: bool = False
-
- compiled_parameters = context.compiled_parameters
-
- imv = compiled._insertmanyvalues
- assert imv is not None
-
- is_returning: Final[bool] = bool(compiled.effective_returning)
- batch_size = context.execution_options.get(
- "insertmanyvalues_page_size", self.insertmanyvalues_page_size
- )
-
- if compiled.schema_translate_map:
- schema_translate_map = context.execution_options.get(
- "schema_translate_map", {}
- )
- else:
- schema_translate_map = None
-
- if is_returning:
- result: Optional[List[Any]] = []
- context._insertmanyvalues_rows = result
-
- sort_by_parameter_order = imv.sort_by_parameter_order
-
- else:
- sort_by_parameter_order = False
- result = None
-
- for imv_batch in compiled._deliver_insertmanyvalues_batches(
- statement,
- parameters,
- compiled_parameters,
- generic_setinputsizes,
- batch_size,
- sort_by_parameter_order,
- schema_translate_map,
- ):
- yield imv_batch
-
- if is_returning:
-
- rows = context.fetchall_for_returning(cursor)
-
- # I would have thought "is_returning: Final[bool]"
- # would have assured this but pylance thinks not
- assert result is not None
-
- if imv.num_sentinel_columns and not imv_batch.is_downgraded:
- composite_sentinel = imv.num_sentinel_columns > 1
- if imv.implicit_sentinel:
- # for implicit sentinel, which is currently single-col
- # integer autoincrement, do a simple sort.
- assert not composite_sentinel
- result.extend(
- sorted(rows, key=operator.itemgetter(-1))
- )
- continue
-
- # otherwise, create dictionaries to match up batches
- # with parameters
- assert imv.sentinel_param_keys
- assert imv.sentinel_columns
-
- _nsc = imv.num_sentinel_columns
-
- if not _sentinel_proc_initialized:
- if composite_sentinel:
- _composite_sentinel_proc = [
- col.type._cached_result_processor(
- self, cursor_desc[1]
- )
- for col, cursor_desc in zip(
- imv.sentinel_columns,
- cursor.description[-_nsc:],
- )
- ]
- else:
- _scalar_sentinel_proc = (
- imv.sentinel_columns[0]
- ).type._cached_result_processor(
- self, cursor.description[-1][1]
- )
- _sentinel_proc_initialized = True
-
- rows_by_sentinel: Union[
- Dict[Tuple[Any, ...], Any],
- Dict[Any, Any],
- ]
- if composite_sentinel:
- rows_by_sentinel = {
- tuple(
- (proc(val) if proc else val)
- for val, proc in zip(
- row[-_nsc:], _composite_sentinel_proc
- )
- ): row
- for row in rows
- }
- elif _scalar_sentinel_proc:
- rows_by_sentinel = {
- _scalar_sentinel_proc(row[-1]): row for row in rows
- }
- else:
- rows_by_sentinel = {row[-1]: row for row in rows}
-
- if len(rows_by_sentinel) != len(imv_batch.batch):
- # see test_insert_exec.py::
- # IMVSentinelTest::test_sentinel_incorrect_rowcount
- # for coverage / demonstration
- raise exc.InvalidRequestError(
- f"Sentinel-keyed result set did not produce "
- f"correct number of rows {len(imv_batch.batch)}; "
- "produced "
- f"{len(rows_by_sentinel)}. Please ensure the "
- "sentinel column is fully unique and populated in "
- "all cases."
- )
-
- try:
- ordered_rows = [
- rows_by_sentinel[sentinel_keys]
- for sentinel_keys in imv_batch.sentinel_values
- ]
- except KeyError as ke:
- # see test_insert_exec.py::
- # IMVSentinelTest::test_sentinel_cant_match_keys
- # for coverage / demonstration
- raise exc.InvalidRequestError(
- f"Can't match sentinel values in result set to "
- f"parameter sets; key {ke.args[0]!r} was not "
- "found. "
- "There may be a mismatch between the datatype "
- "passed to the DBAPI driver vs. that which it "
- "returns in a result row. Ensure the given "
- "Python value matches the expected result type "
- "*exactly*, taking care to not rely upon implicit "
- "conversions which may occur such as when using "
- "strings in place of UUID or integer values, etc. "
- ) from ke
-
- result.extend(ordered_rows)
-
- else:
- result.extend(rows)
-
- def do_executemany(self, cursor, statement, parameters, context=None):
- cursor.executemany(statement, parameters)
-
- def do_execute(self, cursor, statement, parameters, context=None):
- cursor.execute(statement, parameters)
-
- def do_execute_no_params(self, cursor, statement, context=None):
- cursor.execute(statement)
-
- def is_disconnect(self, e, connection, cursor):
- return False
-
- @util.memoized_instancemethod
- def _gen_allowed_isolation_levels(self, dbapi_conn):
- try:
- raw_levels = list(self.get_isolation_level_values(dbapi_conn))
- except NotImplementedError:
- return None
- else:
- normalized_levels = [
- level.replace("_", " ").upper() for level in raw_levels
- ]
- if raw_levels != normalized_levels:
- raise ValueError(
- f"Dialect {self.name!r} get_isolation_level_values() "
- f"method should return names as UPPERCASE using spaces, "
- f"not underscores; got "
- f"{sorted(set(raw_levels).difference(normalized_levels))}"
- )
- return tuple(normalized_levels)
-
- def _assert_and_set_isolation_level(self, dbapi_conn, level):
- level = level.replace("_", " ").upper()
-
- _allowed_isolation_levels = self._gen_allowed_isolation_levels(
- dbapi_conn
- )
- if (
- _allowed_isolation_levels
- and level not in _allowed_isolation_levels
- ):
- raise exc.ArgumentError(
- f"Invalid value {level!r} for isolation_level. "
- f"Valid isolation levels for {self.name!r} are "
- f"{', '.join(_allowed_isolation_levels)}"
- )
-
- self.set_isolation_level(dbapi_conn, level)
-
- def reset_isolation_level(self, dbapi_conn):
- if self._on_connect_isolation_level is not None:
- assert (
- self._on_connect_isolation_level == "AUTOCOMMIT"
- or self._on_connect_isolation_level
- == self.default_isolation_level
- )
- self._assert_and_set_isolation_level(
- dbapi_conn, self._on_connect_isolation_level
- )
- else:
- assert self.default_isolation_level is not None
- self._assert_and_set_isolation_level(
- dbapi_conn,
- self.default_isolation_level,
- )
-
- def normalize_name(self, name):
- if name is None:
- return None
-
- name_lower = name.lower()
- name_upper = name.upper()
-
- if name_upper == name_lower:
- # name has no upper/lower conversion, e.g. non-european characters.
- # return unchanged
- return name
- elif name_upper == name and not (
- self.identifier_preparer._requires_quotes
- )(name_lower):
- # name is all uppercase and doesn't require quoting; normalize
- # to all lower case
- return name_lower
- elif name_lower == name:
- # name is all lower case, which if denormalized means we need to
- # force quoting on it
- return quoted_name(name, quote=True)
- else:
- # name is mixed case, means it will be quoted in SQL when used
- # later, no normalizes
- return name
-
- def denormalize_name(self, name):
- if name is None:
- return None
-
- name_lower = name.lower()
- name_upper = name.upper()
-
- if name_upper == name_lower:
- # name has no upper/lower conversion, e.g. non-european characters.
- # return unchanged
- return name
- elif name_lower == name and not (
- self.identifier_preparer._requires_quotes
- )(name_lower):
- name = name_upper
- return name
-
- def get_driver_connection(self, connection):
- return connection
-
- def _overrides_default(self, method):
- return (
- getattr(type(self), method).__code__
- is not getattr(DefaultDialect, method).__code__
- )
-
- def _default_multi_reflect(
- self,
- single_tbl_method,
- connection,
- kind,
- schema,
- filter_names,
- scope,
- **kw,
- ):
- names_fns = []
- temp_names_fns = []
- if ObjectKind.TABLE in kind:
- names_fns.append(self.get_table_names)
- temp_names_fns.append(self.get_temp_table_names)
- if ObjectKind.VIEW in kind:
- names_fns.append(self.get_view_names)
- temp_names_fns.append(self.get_temp_view_names)
- if ObjectKind.MATERIALIZED_VIEW in kind:
- names_fns.append(self.get_materialized_view_names)
- # no temp materialized view at the moment
- # temp_names_fns.append(self.get_temp_materialized_view_names)
-
- unreflectable = kw.pop("unreflectable", {})
-
- if (
- filter_names
- and scope is ObjectScope.ANY
- and kind is ObjectKind.ANY
- ):
- # if names are given and no qualification on type of table
- # (i.e. the Table(..., autoload) case), take the names as given,
- # don't run names queries. If a table does not exit
- # NoSuchTableError is raised and it's skipped
-
- # this also suits the case for mssql where we can reflect
- # individual temp tables but there's no temp_names_fn
- names = filter_names
- else:
- names = []
- name_kw = {"schema": schema, **kw}
- fns = []
- if ObjectScope.DEFAULT in scope:
- fns.extend(names_fns)
- if ObjectScope.TEMPORARY in scope:
- fns.extend(temp_names_fns)
-
- for fn in fns:
- try:
- names.extend(fn(connection, **name_kw))
- except NotImplementedError:
- pass
-
- if filter_names:
- filter_names = set(filter_names)
-
- # iterate over all the tables/views and call the single table method
- for table in names:
- if not filter_names or table in filter_names:
- key = (schema, table)
- try:
- yield (
- key,
- single_tbl_method(
- connection, table, schema=schema, **kw
- ),
- )
- except exc.UnreflectableTableError as err:
- if key not in unreflectable:
- unreflectable[key] = err
- except exc.NoSuchTableError:
- pass
-
- def get_multi_table_options(self, connection, **kw):
- return self._default_multi_reflect(
- self.get_table_options, connection, **kw
- )
-
- def get_multi_columns(self, connection, **kw):
- return self._default_multi_reflect(self.get_columns, connection, **kw)
-
- def get_multi_pk_constraint(self, connection, **kw):
- return self._default_multi_reflect(
- self.get_pk_constraint, connection, **kw
- )
-
- def get_multi_foreign_keys(self, connection, **kw):
- return self._default_multi_reflect(
- self.get_foreign_keys, connection, **kw
- )
-
- def get_multi_indexes(self, connection, **kw):
- return self._default_multi_reflect(self.get_indexes, connection, **kw)
-
- def get_multi_unique_constraints(self, connection, **kw):
- return self._default_multi_reflect(
- self.get_unique_constraints, connection, **kw
- )
-
- def get_multi_check_constraints(self, connection, **kw):
- return self._default_multi_reflect(
- self.get_check_constraints, connection, **kw
- )
-
- def get_multi_table_comment(self, connection, **kw):
- return self._default_multi_reflect(
- self.get_table_comment, connection, **kw
- )
-
-
-class StrCompileDialect(DefaultDialect):
- statement_compiler = compiler.StrSQLCompiler
- ddl_compiler = compiler.DDLCompiler
- type_compiler_cls = compiler.StrSQLTypeCompiler
- preparer = compiler.IdentifierPreparer
-
- insert_returning = True
- update_returning = True
- delete_returning = True
-
- supports_statement_cache = True
-
- supports_identity_columns = True
-
- supports_sequences = True
- sequences_optional = True
- preexecute_autoincrement_sequences = False
-
- supports_native_boolean = True
-
- supports_multivalues_insert = True
- supports_simple_order_by_label = True
-
-
-class DefaultExecutionContext(ExecutionContext):
- isinsert = False
- isupdate = False
- isdelete = False
- is_crud = False
- is_text = False
- isddl = False
-
- execute_style: ExecuteStyle = ExecuteStyle.EXECUTE
-
- compiled: Optional[Compiled] = None
- result_column_struct: Optional[
- Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
- ] = None
- returned_default_rows: Optional[Sequence[Row[Any]]] = None
-
- execution_options: _ExecuteOptions = util.EMPTY_DICT
-
- cursor_fetch_strategy = _cursor._DEFAULT_FETCH
-
- invoked_statement: Optional[Executable] = None
-
- _is_implicit_returning = False
- _is_explicit_returning = False
- _is_supplemental_returning = False
- _is_server_side = False
-
- _soft_closed = False
-
- _rowcount: Optional[int] = None
-
- # a hook for SQLite's translation of
- # result column names
- # NOTE: pyhive is using this hook, can't remove it :(
- _translate_colname: Optional[Callable[[str], str]] = None
-
- _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
- """used by set_input_sizes().
-
- This collection comes from ``ExpandedState.parameter_expansion``.
-
- """
-
- cache_hit = NO_CACHE_KEY
-
- root_connection: Connection
- _dbapi_connection: PoolProxiedConnection
- dialect: Dialect
- unicode_statement: str
- cursor: DBAPICursor
- compiled_parameters: List[_MutableCoreSingleExecuteParams]
- parameters: _DBAPIMultiExecuteParams
- extracted_parameters: Optional[Sequence[BindParameter[Any]]]
-
- _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)
-
- _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
- _num_sentinel_cols: int = 0
-
- @classmethod
- def _init_ddl(
- cls,
- dialect: Dialect,
- connection: Connection,
- dbapi_connection: PoolProxiedConnection,
- execution_options: _ExecuteOptions,
- compiled_ddl: DDLCompiler,
- ) -> ExecutionContext:
- """Initialize execution context for an ExecutableDDLElement
- construct."""
-
- self = cls.__new__(cls)
- self.root_connection = connection
- self._dbapi_connection = dbapi_connection
- self.dialect = connection.dialect
-
- self.compiled = compiled = compiled_ddl
- self.isddl = True
-
- self.execution_options = execution_options
-
- self.unicode_statement = str(compiled)
- if compiled.schema_translate_map:
- schema_translate_map = self.execution_options.get(
- "schema_translate_map", {}
- )
-
- rst = compiled.preparer._render_schema_translates
- self.unicode_statement = rst(
- self.unicode_statement, schema_translate_map
- )
-
- self.statement = self.unicode_statement
-
- self.cursor = self.create_cursor()
- self.compiled_parameters = []
-
- if dialect.positional:
- self.parameters = [dialect.execute_sequence_format()]
- else:
- self.parameters = [self._empty_dict_params]
-
- return self
-
- @classmethod
- def _init_compiled(
- cls,
- dialect: Dialect,
- connection: Connection,
- dbapi_connection: PoolProxiedConnection,
- execution_options: _ExecuteOptions,
- compiled: SQLCompiler,
- parameters: _CoreMultiExecuteParams,
- invoked_statement: Executable,
- extracted_parameters: Optional[Sequence[BindParameter[Any]]],
- cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
- ) -> ExecutionContext:
- """Initialize execution context for a Compiled construct."""
-
- self = cls.__new__(cls)
- self.root_connection = connection
- self._dbapi_connection = dbapi_connection
- self.dialect = connection.dialect
- self.extracted_parameters = extracted_parameters
- self.invoked_statement = invoked_statement
- self.compiled = compiled
- self.cache_hit = cache_hit
-
- self.execution_options = execution_options
-
- self.result_column_struct = (
- compiled._result_columns,
- compiled._ordered_columns,
- compiled._textual_ordered_columns,
- compiled._ad_hoc_textual,
- compiled._loose_column_name_matching,
- )
-
- self.isinsert = ii = compiled.isinsert
- self.isupdate = iu = compiled.isupdate
- self.isdelete = id_ = compiled.isdelete
- self.is_text = compiled.isplaintext
-
- if ii or iu or id_:
- dml_statement = compiled.compile_state.statement # type: ignore
- if TYPE_CHECKING:
- assert isinstance(dml_statement, UpdateBase)
- self.is_crud = True
- self._is_explicit_returning = ier = bool(dml_statement._returning)
- self._is_implicit_returning = iir = bool(
- compiled.implicit_returning
- )
- if iir and dml_statement._supplemental_returning:
- self._is_supplemental_returning = True
-
- # dont mix implicit and explicit returning
- assert not (iir and ier)
-
- if (ier or iir) and compiled.for_executemany:
- if ii and not self.dialect.insert_executemany_returning:
- raise exc.InvalidRequestError(
- f"Dialect {self.dialect.dialect_description} with "
- f"current server capabilities does not support "
- "INSERT..RETURNING when executemany is used"
- )
- elif (
- ii
- and dml_statement._sort_by_parameter_order
- and not self.dialect.insert_executemany_returning_sort_by_parameter_order # noqa: E501
- ):
- raise exc.InvalidRequestError(
- f"Dialect {self.dialect.dialect_description} with "
- f"current server capabilities does not support "
- "INSERT..RETURNING with deterministic row ordering "
- "when executemany is used"
- )
- elif (
- ii
- and self.dialect.use_insertmanyvalues
- and not compiled._insertmanyvalues
- ):
- raise exc.InvalidRequestError(
- 'Statement does not have "insertmanyvalues" '
- "enabled, can't use INSERT..RETURNING with "
- "executemany in this case."
- )
- elif iu and not self.dialect.update_executemany_returning:
- raise exc.InvalidRequestError(
- f"Dialect {self.dialect.dialect_description} with "
- f"current server capabilities does not support "
- "UPDATE..RETURNING when executemany is used"
- )
- elif id_ and not self.dialect.delete_executemany_returning:
- raise exc.InvalidRequestError(
- f"Dialect {self.dialect.dialect_description} with "
- f"current server capabilities does not support "
- "DELETE..RETURNING when executemany is used"
- )
-
- if not parameters:
- self.compiled_parameters = [
- compiled.construct_params(
- extracted_parameters=extracted_parameters,
- escape_names=False,
- )
- ]
- else:
- self.compiled_parameters = [
- compiled.construct_params(
- m,
- escape_names=False,
- _group_number=grp,
- extracted_parameters=extracted_parameters,
- )
- for grp, m in enumerate(parameters)
- ]
-
- if len(parameters) > 1:
- if self.isinsert and compiled._insertmanyvalues:
- self.execute_style = ExecuteStyle.INSERTMANYVALUES
-
- imv = compiled._insertmanyvalues
- if imv.sentinel_columns is not None:
- self._num_sentinel_cols = imv.num_sentinel_columns
- else:
- self.execute_style = ExecuteStyle.EXECUTEMANY
-
- self.unicode_statement = compiled.string
-
- self.cursor = self.create_cursor()
-
- if self.compiled.insert_prefetch or self.compiled.update_prefetch:
- self._process_execute_defaults()
-
- processors = compiled._bind_processors
-
- flattened_processors: Mapping[
- str, _BindProcessorType[Any]
- ] = processors # type: ignore[assignment]
-
- if compiled.literal_execute_params or compiled.post_compile_params:
- if self.executemany:
- raise exc.InvalidRequestError(
- "'literal_execute' or 'expanding' parameters can't be "
- "used with executemany()"
- )
-
- expanded_state = compiled._process_parameters_for_postcompile(
- self.compiled_parameters[0]
- )
-
- # re-assign self.unicode_statement
- self.unicode_statement = expanded_state.statement
-
- self._expanded_parameters = expanded_state.parameter_expansion
-
- flattened_processors = dict(processors) # type: ignore
- flattened_processors.update(expanded_state.processors)
- positiontup = expanded_state.positiontup
- elif compiled.positional:
- positiontup = self.compiled.positiontup
- else:
- positiontup = None
-
- if compiled.schema_translate_map:
- schema_translate_map = self.execution_options.get(
- "schema_translate_map", {}
- )
- rst = compiled.preparer._render_schema_translates
- self.unicode_statement = rst(
- self.unicode_statement, schema_translate_map
- )
-
- # final self.unicode_statement is now assigned, encode if needed
- # by dialect
- self.statement = self.unicode_statement
-
- # Convert the dictionary of bind parameter values
- # into a dict or list to be sent to the DBAPI's
- # execute() or executemany() method.
-
- if compiled.positional:
- core_positional_parameters: MutableSequence[Sequence[Any]] = []
- assert positiontup is not None
- for compiled_params in self.compiled_parameters:
- l_param: List[Any] = [
- (
- flattened_processors[key](compiled_params[key])
- if key in flattened_processors
- else compiled_params[key]
- )
- for key in positiontup
- ]
- core_positional_parameters.append(
- dialect.execute_sequence_format(l_param)
- )
-
- self.parameters = core_positional_parameters
- else:
- core_dict_parameters: MutableSequence[Dict[str, Any]] = []
- escaped_names = compiled.escaped_bind_names
-
- # note that currently, "expanded" parameters will be present
- # in self.compiled_parameters in their quoted form. This is
- # slightly inconsistent with the approach taken as of
- # #8056 where self.compiled_parameters is meant to contain unquoted
- # param names.
- d_param: Dict[str, Any]
- for compiled_params in self.compiled_parameters:
- if escaped_names:
- d_param = {
- escaped_names.get(key, key): (
- flattened_processors[key](compiled_params[key])
- if key in flattened_processors
- else compiled_params[key]
- )
- for key in compiled_params
- }
- else:
- d_param = {
- key: (
- flattened_processors[key](compiled_params[key])
- if key in flattened_processors
- else compiled_params[key]
- )
- for key in compiled_params
- }
-
- core_dict_parameters.append(d_param)
-
- self.parameters = core_dict_parameters
-
- return self
-
- @classmethod
- def _init_statement(
- cls,
- dialect: Dialect,
- connection: Connection,
- dbapi_connection: PoolProxiedConnection,
- execution_options: _ExecuteOptions,
- statement: str,
- parameters: _DBAPIMultiExecuteParams,
- ) -> ExecutionContext:
- """Initialize execution context for a string SQL statement."""
-
- self = cls.__new__(cls)
- self.root_connection = connection
- self._dbapi_connection = dbapi_connection
- self.dialect = connection.dialect
- self.is_text = True
-
- self.execution_options = execution_options
-
- if not parameters:
- if self.dialect.positional:
- self.parameters = [dialect.execute_sequence_format()]
- else:
- self.parameters = [self._empty_dict_params]
- elif isinstance(parameters[0], dialect.execute_sequence_format):
- self.parameters = parameters
- elif isinstance(parameters[0], dict):
- self.parameters = parameters
- else:
- self.parameters = [
- dialect.execute_sequence_format(p) for p in parameters
- ]
-
- if len(parameters) > 1:
- self.execute_style = ExecuteStyle.EXECUTEMANY
-
- self.statement = self.unicode_statement = statement
-
- self.cursor = self.create_cursor()
- return self
-
- @classmethod
- def _init_default(
- cls,
- dialect: Dialect,
- connection: Connection,
- dbapi_connection: PoolProxiedConnection,
- execution_options: _ExecuteOptions,
- ) -> ExecutionContext:
- """Initialize execution context for a ColumnDefault construct."""
-
- self = cls.__new__(cls)
- self.root_connection = connection
- self._dbapi_connection = dbapi_connection
- self.dialect = connection.dialect
-
- self.execution_options = execution_options
-
- self.cursor = self.create_cursor()
- return self
-
- def _get_cache_stats(self) -> str:
- if self.compiled is None:
- return "raw sql"
-
- now = perf_counter()
-
- ch = self.cache_hit
-
- gen_time = self.compiled._gen_time
- assert gen_time is not None
-
- if ch is NO_CACHE_KEY:
- return "no key %.5fs" % (now - gen_time,)
- elif ch is CACHE_HIT:
- return "cached since %.4gs ago" % (now - gen_time,)
- elif ch is CACHE_MISS:
- return "generated in %.5fs" % (now - gen_time,)
- elif ch is CACHING_DISABLED:
- if "_cache_disable_reason" in self.execution_options:
- return "caching disabled (%s) %.5fs " % (
- self.execution_options["_cache_disable_reason"],
- now - gen_time,
- )
- else:
- return "caching disabled %.5fs" % (now - gen_time,)
- elif ch is NO_DIALECT_SUPPORT:
- return "dialect %s+%s does not support caching %.5fs" % (
- self.dialect.name,
- self.dialect.driver,
- now - gen_time,
- )
- else:
- return "unknown"
-
- @property
- def executemany(self):
- return self.execute_style in (
- ExecuteStyle.EXECUTEMANY,
- ExecuteStyle.INSERTMANYVALUES,
- )
-
- @util.memoized_property
- def identifier_preparer(self):
- if self.compiled:
- return self.compiled.preparer
- elif "schema_translate_map" in self.execution_options:
- return self.dialect.identifier_preparer._with_schema_translate(
- self.execution_options["schema_translate_map"]
- )
- else:
- return self.dialect.identifier_preparer
-
- @util.memoized_property
- def engine(self):
- return self.root_connection.engine
-
- @util.memoized_property
- def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
- if TYPE_CHECKING:
- assert isinstance(self.compiled, SQLCompiler)
- return self.compiled.postfetch
-
- @util.memoized_property
- def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
- if TYPE_CHECKING:
- assert isinstance(self.compiled, SQLCompiler)
- if self.isinsert:
- return self.compiled.insert_prefetch
- elif self.isupdate:
- return self.compiled.update_prefetch
- else:
- return ()
-
- @util.memoized_property
- def no_parameters(self):
- return self.execution_options.get("no_parameters", False)
-
- def _execute_scalar(self, stmt, type_, parameters=None):
- """Execute a string statement on the current cursor, returning a
- scalar result.
-
- Used to fire off sequences, default phrases, and "select lastrowid"
- types of statements individually or in the context of a parent INSERT
- or UPDATE statement.
-
- """
-
- conn = self.root_connection
-
- if "schema_translate_map" in self.execution_options:
- schema_translate_map = self.execution_options.get(
- "schema_translate_map", {}
- )
-
- rst = self.identifier_preparer._render_schema_translates
- stmt = rst(stmt, schema_translate_map)
-
- if not parameters:
- if self.dialect.positional:
- parameters = self.dialect.execute_sequence_format()
- else:
- parameters = {}
-
- conn._cursor_execute(self.cursor, stmt, parameters, context=self)
- row = self.cursor.fetchone()
- if row is not None:
- r = row[0]
- else:
- r = None
- if type_ is not None:
- # apply type post processors to the result
- proc = type_._cached_result_processor(
- self.dialect, self.cursor.description[0][1]
- )
- if proc:
- return proc(r)
- return r
-
- @util.memoized_property
- def connection(self):
- return self.root_connection
-
- def _use_server_side_cursor(self):
- if not self.dialect.supports_server_side_cursors:
- return False
-
- if self.dialect.server_side_cursors:
- # this is deprecated
- use_server_side = self.execution_options.get(
- "stream_results", True
- ) and (
- self.compiled
- and isinstance(self.compiled.statement, expression.Selectable)
- or (
- (
- not self.compiled
- or isinstance(
- self.compiled.statement, expression.TextClause
- )
- )
- and self.unicode_statement
- and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
- )
- )
- else:
- use_server_side = self.execution_options.get(
- "stream_results", False
- )
-
- return use_server_side
-
- def create_cursor(self):
- if (
- # inlining initial preference checks for SS cursors
- self.dialect.supports_server_side_cursors
- and (
- self.execution_options.get("stream_results", False)
- or (
- self.dialect.server_side_cursors
- and self._use_server_side_cursor()
- )
- )
- ):
- self._is_server_side = True
- return self.create_server_side_cursor()
- else:
- self._is_server_side = False
- return self.create_default_cursor()
-
- def fetchall_for_returning(self, cursor):
- return cursor.fetchall()
-
- def create_default_cursor(self):
- return self._dbapi_connection.cursor()
-
- def create_server_side_cursor(self):
- raise NotImplementedError()
-
- def pre_exec(self):
- pass
-
- def get_out_parameter_values(self, names):
- raise NotImplementedError(
- "This dialect does not support OUT parameters"
- )
-
- def post_exec(self):
- pass
-
- def get_result_processor(self, type_, colname, coltype):
- """Return a 'result processor' for a given type as present in
- cursor.description.
-
- This has a default implementation that dialects can override
- for context-sensitive result type handling.
-
- """
- return type_._cached_result_processor(self.dialect, coltype)
-
- def get_lastrowid(self):
- """return self.cursor.lastrowid, or equivalent, after an INSERT.
-
- This may involve calling special cursor functions, issuing a new SELECT
- on the cursor (or a new one), or returning a stored value that was
- calculated within post_exec().
-
- This function will only be called for dialects which support "implicit"
- primary key generation, keep preexecute_autoincrement_sequences set to
- False, and when no explicit id value was bound to the statement.
-
- The function is called once for an INSERT statement that would need to
- return the last inserted primary key for those dialects that make use
- of the lastrowid concept. In these cases, it is called directly after
- :meth:`.ExecutionContext.post_exec`.
-
- """
- return self.cursor.lastrowid
-
- def handle_dbapi_exception(self, e):
- pass
-
- @util.non_memoized_property
- def rowcount(self) -> int:
- if self._rowcount is not None:
- return self._rowcount
- else:
- return self.cursor.rowcount
-
- @property
- def _has_rowcount(self):
- return self._rowcount is not None
-
- def supports_sane_rowcount(self):
- return self.dialect.supports_sane_rowcount
-
- def supports_sane_multi_rowcount(self):
- return self.dialect.supports_sane_multi_rowcount
-
- def _setup_result_proxy(self):
- exec_opt = self.execution_options
-
- if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
- self._rowcount = self.cursor.rowcount
-
- if self.is_crud or self.is_text:
- result = self._setup_dml_or_text_result()
- yp = sr = False
- else:
- yp = exec_opt.get("yield_per", None)
- sr = self._is_server_side or exec_opt.get("stream_results", False)
- strategy = self.cursor_fetch_strategy
- if sr and strategy is _cursor._DEFAULT_FETCH:
- strategy = _cursor.BufferedRowCursorFetchStrategy(
- self.cursor, self.execution_options
- )
- cursor_description: _DBAPICursorDescription = (
- strategy.alternate_cursor_description
- or self.cursor.description
- )
- if cursor_description is None:
- strategy = _cursor._NO_CURSOR_DQL
-
- result = _cursor.CursorResult(self, strategy, cursor_description)
-
- compiled = self.compiled
-
- if (
- compiled
- and not self.isddl
- and cast(SQLCompiler, compiled).has_out_parameters
- ):
- self._setup_out_parameters(result)
-
- self._soft_closed = result._soft_closed
-
- if yp:
- result = result.yield_per(yp)
-
- return result
-
- def _setup_out_parameters(self, result):
- compiled = cast(SQLCompiler, self.compiled)
-
- out_bindparams = [
- (param, name)
- for param, name in compiled.bind_names.items()
- if param.isoutparam
- ]
- out_parameters = {}
-
- for bindparam, raw_value in zip(
- [param for param, name in out_bindparams],
- self.get_out_parameter_values(
- [name for param, name in out_bindparams]
- ),
- ):
- type_ = bindparam.type
- impl_type = type_.dialect_impl(self.dialect)
- dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
- result_processor = impl_type.result_processor(
- self.dialect, dbapi_type
- )
- if result_processor is not None:
- raw_value = result_processor(raw_value)
- out_parameters[bindparam.key] = raw_value
-
- result.out_parameters = out_parameters
-
- def _setup_dml_or_text_result(self):
- compiled = cast(SQLCompiler, self.compiled)
-
- strategy: ResultFetchStrategy = self.cursor_fetch_strategy
-
- if self.isinsert:
- if (
- self.execute_style is ExecuteStyle.INSERTMANYVALUES
- and compiled.effective_returning
- ):
- strategy = _cursor.FullyBufferedCursorFetchStrategy(
- self.cursor,
- initial_buffer=self._insertmanyvalues_rows,
- # maintain alt cursor description if set by the
- # dialect, e.g. mssql preserves it
- alternate_description=(
- strategy.alternate_cursor_description
- ),
- )
-
- if compiled.postfetch_lastrowid:
- self.inserted_primary_key_rows = (
- self._setup_ins_pk_from_lastrowid()
- )
- # else if not self._is_implicit_returning,
- # the default inserted_primary_key_rows accessor will
- # return an "empty" primary key collection when accessed.
-
- if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
- strategy = _cursor.BufferedRowCursorFetchStrategy(
- self.cursor, self.execution_options
- )
-
- if strategy is _cursor._NO_CURSOR_DML:
- cursor_description = None
- else:
- cursor_description = (
- strategy.alternate_cursor_description
- or self.cursor.description
- )
-
- if cursor_description is None:
- strategy = _cursor._NO_CURSOR_DML
- elif self._num_sentinel_cols:
- assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
- # strip out the sentinel columns from cursor description
- # a similar logic is done to the rows only in CursorResult
- cursor_description = cursor_description[
- 0 : -self._num_sentinel_cols
- ]
-
- result: _cursor.CursorResult[Any] = _cursor.CursorResult(
- self, strategy, cursor_description
- )
-
- if self.isinsert:
- if self._is_implicit_returning:
- rows = result.all()
-
- self.returned_default_rows = rows
-
- self.inserted_primary_key_rows = (
- self._setup_ins_pk_from_implicit_returning(result, rows)
- )
-
- # test that it has a cursor metadata that is accurate. the
- # first row will have been fetched and current assumptions
- # are that the result has only one row, until executemany()
- # support is added here.
- assert result._metadata.returns_rows
-
- # Insert statement has both return_defaults() and
- # returning(). rewind the result on the list of rows
- # we just used.
- if self._is_supplemental_returning:
- result._rewind(rows)
- else:
- result._soft_close()
- elif not self._is_explicit_returning:
- result._soft_close()
-
- # we assume here the result does not return any rows.
- # *usually*, this will be true. However, some dialects
- # such as that of MSSQL/pyodbc need to SELECT a post fetch
- # function so this is not necessarily true.
- # assert not result.returns_rows
-
- elif self._is_implicit_returning:
- rows = result.all()
-
- if rows:
- self.returned_default_rows = rows
- self._rowcount = len(rows)
-
- if self._is_supplemental_returning:
- result._rewind(rows)
- else:
- result._soft_close()
-
- # test that it has a cursor metadata that is accurate.
- # the rows have all been fetched however.
- assert result._metadata.returns_rows
-
- elif not result._metadata.returns_rows:
- # no results, get rowcount
- # (which requires open cursor on some drivers)
- if self._rowcount is None:
- self._rowcount = self.cursor.rowcount
- result._soft_close()
- elif self.isupdate or self.isdelete:
- if self._rowcount is None:
- self._rowcount = self.cursor.rowcount
- return result
-
- @util.memoized_property
- def inserted_primary_key_rows(self):
- # if no specific "get primary key" strategy was set up
- # during execution, return a "default" primary key based
- # on what's in the compiled_parameters and nothing else.
- return self._setup_ins_pk_from_empty()
-
- def _setup_ins_pk_from_lastrowid(self):
- getter = cast(
- SQLCompiler, self.compiled
- )._inserted_primary_key_from_lastrowid_getter
- lastrowid = self.get_lastrowid()
- return [getter(lastrowid, self.compiled_parameters[0])]
-
- def _setup_ins_pk_from_empty(self):
- getter = cast(
- SQLCompiler, self.compiled
- )._inserted_primary_key_from_lastrowid_getter
- return [getter(None, param) for param in self.compiled_parameters]
-
- def _setup_ins_pk_from_implicit_returning(self, result, rows):
- if not rows:
- return []
-
- getter = cast(
- SQLCompiler, self.compiled
- )._inserted_primary_key_from_returning_getter
- compiled_params = self.compiled_parameters
-
- return [
- getter(row, param) for row, param in zip(rows, compiled_params)
- ]
-
- def lastrow_has_defaults(self):
- return (self.isinsert or self.isupdate) and bool(
- cast(SQLCompiler, self.compiled).postfetch
- )
-
- def _prepare_set_input_sizes(
- self,
- ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
- """Given a cursor and ClauseParameters, prepare arguments
- in order to call the appropriate
- style of ``setinputsizes()`` on the cursor, using DB-API types
- from the bind parameter's ``TypeEngine`` objects.
-
- This method only called by those dialects which set
- the :attr:`.Dialect.bind_typing` attribute to
- :attr:`.BindTyping.SETINPUTSIZES`. cx_Oracle is the only DBAPI
- that requires setinputsizes(), pyodbc offers it as an option.
-
- Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
- for pg8000 and asyncpg, which has been changed to inline rendering
- of casts.
-
- """
- if self.isddl or self.is_text:
- return None
-
- compiled = cast(SQLCompiler, self.compiled)
-
- inputsizes = compiled._get_set_input_sizes_lookup()
-
- if inputsizes is None:
- return None
-
- dialect = self.dialect
-
- # all of the rest of this... cython?
-
- if dialect._has_events:
- inputsizes = dict(inputsizes)
- dialect.dispatch.do_setinputsizes(
- inputsizes, self.cursor, self.statement, self.parameters, self
- )
-
- if compiled.escaped_bind_names:
- escaped_bind_names = compiled.escaped_bind_names
- else:
- escaped_bind_names = None
-
- if dialect.positional:
- items = [
- (key, compiled.binds[key])
- for key in compiled.positiontup or ()
- ]
- else:
- items = [
- (key, bindparam)
- for bindparam, key in compiled.bind_names.items()
- ]
-
- generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
- for key, bindparam in items:
- if bindparam in compiled.literal_execute_params:
- continue
-
- if key in self._expanded_parameters:
- if is_tuple_type(bindparam.type):
- num = len(bindparam.type.types)
- dbtypes = inputsizes[bindparam]
- generic_inputsizes.extend(
- (
- (
- escaped_bind_names.get(paramname, paramname)
- if escaped_bind_names is not None
- else paramname
- ),
- dbtypes[idx % num],
- bindparam.type.types[idx % num],
- )
- for idx, paramname in enumerate(
- self._expanded_parameters[key]
- )
- )
- else:
- dbtype = inputsizes.get(bindparam, None)
- generic_inputsizes.extend(
- (
- (
- escaped_bind_names.get(paramname, paramname)
- if escaped_bind_names is not None
- else paramname
- ),
- dbtype,
- bindparam.type,
- )
- for paramname in self._expanded_parameters[key]
- )
- else:
- dbtype = inputsizes.get(bindparam, None)
-
- escaped_name = (
- escaped_bind_names.get(key, key)
- if escaped_bind_names is not None
- else key
- )
-
- generic_inputsizes.append(
- (escaped_name, dbtype, bindparam.type)
- )
-
- return generic_inputsizes
-
- def _exec_default(self, column, default, type_):
- if default.is_sequence:
- return self.fire_sequence(default, type_)
- elif default.is_callable:
- # this codepath is not normally used as it's inlined
- # into _process_execute_defaults
- self.current_column = column
- return default.arg(self)
- elif default.is_clause_element:
- return self._exec_default_clause_element(column, default, type_)
- else:
- # this codepath is not normally used as it's inlined
- # into _process_execute_defaults
- return default.arg
-
- def _exec_default_clause_element(self, column, default, type_):
- # execute a default that's a complete clause element. Here, we have
- # to re-implement a miniature version of the compile->parameters->
- # cursor.execute() sequence, since we don't want to modify the state
- # of the connection / result in progress or create new connection/
- # result objects etc.
- # .. versionchanged:: 1.4
-
- if not default._arg_is_typed:
- default_arg = expression.type_coerce(default.arg, type_)
- else:
- default_arg = default.arg
- compiled = expression.select(default_arg).compile(dialect=self.dialect)
- compiled_params = compiled.construct_params()
- processors = compiled._bind_processors
- if compiled.positional:
- parameters = self.dialect.execute_sequence_format(
- [
- (
- processors[key](compiled_params[key]) # type: ignore
- if key in processors
- else compiled_params[key]
- )
- for key in compiled.positiontup or ()
- ]
- )
- else:
- parameters = {
- key: (
- processors[key](compiled_params[key]) # type: ignore
- if key in processors
- else compiled_params[key]
- )
- for key in compiled_params
- }
- return self._execute_scalar(
- str(compiled), type_, parameters=parameters
- )
-
- current_parameters: Optional[_CoreSingleExecuteParams] = None
- """A dictionary of parameters applied to the current row.
-
- This attribute is only available in the context of a user-defined default
- generation function, e.g. as described at :ref:`context_default_functions`.
- It consists of a dictionary which includes entries for each column/value
- pair that is to be part of the INSERT or UPDATE statement. The keys of the
- dictionary will be the key value of each :class:`_schema.Column`,
- which is usually
- synonymous with the name.
-
- Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
- does not accommodate for the "multi-values" feature of the
- :meth:`_expression.Insert.values` method. The
- :meth:`.DefaultExecutionContext.get_current_parameters` method should be
- preferred.
-
- .. seealso::
-
- :meth:`.DefaultExecutionContext.get_current_parameters`
-
- :ref:`context_default_functions`
-
- """
-
- def get_current_parameters(self, isolate_multiinsert_groups=True):
- """Return a dictionary of parameters applied to the current row.
-
- This method can only be used in the context of a user-defined default
- generation function, e.g. as described at
- :ref:`context_default_functions`. When invoked, a dictionary is
- returned which includes entries for each column/value pair that is part
- of the INSERT or UPDATE statement. The keys of the dictionary will be
- the key value of each :class:`_schema.Column`,
- which is usually synonymous
- with the name.
-
- :param isolate_multiinsert_groups=True: indicates that multi-valued
- INSERT constructs created using :meth:`_expression.Insert.values`
- should be
- handled by returning only the subset of parameters that are local
- to the current column default invocation. When ``False``, the
- raw parameters of the statement are returned including the
- naming convention used in the case of multi-valued INSERT.
-
- .. versionadded:: 1.2 added
- :meth:`.DefaultExecutionContext.get_current_parameters`
- which provides more functionality over the existing
- :attr:`.DefaultExecutionContext.current_parameters`
- attribute.
-
- .. seealso::
-
- :attr:`.DefaultExecutionContext.current_parameters`
-
- :ref:`context_default_functions`
-
- """
- try:
- parameters = self.current_parameters
- column = self.current_column
- except AttributeError:
- raise exc.InvalidRequestError(
- "get_current_parameters() can only be invoked in the "
- "context of a Python side column default function"
- )
- else:
- assert column is not None
- assert parameters is not None
- compile_state = cast(
- "DMLState", cast(SQLCompiler, self.compiled).compile_state
- )
- assert compile_state is not None
- if (
- isolate_multiinsert_groups
- and dml.isinsert(compile_state)
- and compile_state._has_multi_parameters
- ):
- if column._is_multiparam_column:
- index = column.index + 1
- d = {column.original.key: parameters[column.key]}
- else:
- d = {column.key: parameters[column.key]}
- index = 0
- assert compile_state._dict_parameters is not None
- keys = compile_state._dict_parameters.keys()
- d.update(
- (key, parameters["%s_m%d" % (key, index)]) for key in keys
- )
- return d
- else:
- return parameters
-
- def get_insert_default(self, column):
- if column.default is None:
- return None
- else:
- return self._exec_default(column, column.default, column.type)
-
- def get_update_default(self, column):
- if column.onupdate is None:
- return None
- else:
- return self._exec_default(column, column.onupdate, column.type)
-
- def _process_execute_defaults(self):
- compiled = cast(SQLCompiler, self.compiled)
-
- key_getter = compiled._within_exec_param_key_getter
-
- sentinel_counter = 0
-
- if compiled.insert_prefetch:
- prefetch_recs = [
- (
- c,
- key_getter(c),
- c._default_description_tuple,
- self.get_insert_default,
- )
- for c in compiled.insert_prefetch
- ]
- elif compiled.update_prefetch:
- prefetch_recs = [
- (
- c,
- key_getter(c),
- c._onupdate_description_tuple,
- self.get_update_default,
- )
- for c in compiled.update_prefetch
- ]
- else:
- prefetch_recs = []
-
- for param in self.compiled_parameters:
- self.current_parameters = param
-
- for (
- c,
- param_key,
- (arg, is_scalar, is_callable, is_sentinel),
- fallback,
- ) in prefetch_recs:
- if is_sentinel:
- param[param_key] = sentinel_counter
- sentinel_counter += 1
- elif is_scalar:
- param[param_key] = arg
- elif is_callable:
- self.current_column = c
- param[param_key] = arg(self)
- else:
- val = fallback(c)
- if val is not None:
- param[param_key] = val
-
- del self.current_parameters
-
-
-DefaultDialect.execution_ctx_cls = DefaultExecutionContext