summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
commit6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
treeb1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py
parent4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py2178
1 files changed, 2178 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py b/venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py
new file mode 100644
index 0000000..71767db
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py
@@ -0,0 +1,2178 @@
+# engine/cursor.py
+# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: allow-untyped-defs, allow-untyped-calls
+
+"""Define cursor-specific result set constructs including
+:class:`.CursorResult`."""
+
+
+from __future__ import annotations
+
+import collections
+import functools
+import operator
+import typing
+from typing import Any
+from typing import cast
+from typing import ClassVar
+from typing import Dict
+from typing import Iterator
+from typing import List
+from typing import Mapping
+from typing import NoReturn
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+
+from .result import IteratorResult
+from .result import MergedResult
+from .result import Result
+from .result import ResultMetaData
+from .result import SimpleResultMetaData
+from .result import tuplegetter
+from .row import Row
+from .. import exc
+from .. import util
+from ..sql import elements
+from ..sql import sqltypes
+from ..sql import util as sql_util
+from ..sql.base import _generative
+from ..sql.compiler import ResultColumnsEntry
+from ..sql.compiler import RM_NAME
+from ..sql.compiler import RM_OBJECTS
+from ..sql.compiler import RM_RENDERED_NAME
+from ..sql.compiler import RM_TYPE
+from ..sql.type_api import TypeEngine
+from ..util import compat
+from ..util.typing import Literal
+from ..util.typing import Self
+
+
+if typing.TYPE_CHECKING:
+ from .base import Connection
+ from .default import DefaultExecutionContext
+ from .interfaces import _DBAPICursorDescription
+ from .interfaces import DBAPICursor
+ from .interfaces import Dialect
+ from .interfaces import ExecutionContext
+ from .result import _KeyIndexType
+ from .result import _KeyMapRecType
+ from .result import _KeyMapType
+ from .result import _KeyType
+ from .result import _ProcessorsType
+ from .result import _TupleGetterType
+ from ..sql.type_api import _ResultProcessorType
+
+
+_T = TypeVar("_T", bound=Any)
+
+
+# metadata entry tuple indexes.
+# using raw tuple is faster than namedtuple.
+# these match up to the positions in
+# _CursorKeyMapRecType
+MD_INDEX: Literal[0] = 0
+"""integer index in cursor.description
+
+"""
+
+MD_RESULT_MAP_INDEX: Literal[1] = 1
+"""integer index in compiled._result_columns"""
+
+MD_OBJECTS: Literal[2] = 2
+"""other string keys and ColumnElement obj that can match.
+
+This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects
+
+"""
+
+MD_LOOKUP_KEY: Literal[3] = 3
+"""string key we usually expect for key-based lookup
+
+this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name
+"""
+
+
+MD_RENDERED_NAME: Literal[4] = 4
+"""name that is usually in cursor.description
+
+this comes from compiler.RENDERED_NAME / compiler.ResultColumnsEntry.keyname
+"""
+
+
+MD_PROCESSOR: Literal[5] = 5
+"""callable to process a result value into a row"""
+
+MD_UNTRANSLATED: Literal[6] = 6
+"""raw name from cursor.description"""
+
+
+_CursorKeyMapRecType = Tuple[
+ Optional[int], # MD_INDEX, None means the record is ambiguously named
+ int, # MD_RESULT_MAP_INDEX
+ List[Any], # MD_OBJECTS
+ str, # MD_LOOKUP_KEY
+ str, # MD_RENDERED_NAME
+ Optional["_ResultProcessorType[Any]"], # MD_PROCESSOR
+ Optional[str], # MD_UNTRANSLATED
+]
+
+_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType]
+
+# same as _CursorKeyMapRecType except the MD_INDEX value is definitely
+# not None
+_NonAmbigCursorKeyMapRecType = Tuple[
+ int,
+ int,
+ List[Any],
+ str,
+ str,
+ Optional["_ResultProcessorType[Any]"],
+ str,
+]
+
+
+class CursorResultMetaData(ResultMetaData):
+ """Result metadata for DBAPI cursors."""
+
+ __slots__ = (
+ "_keymap",
+ "_processors",
+ "_keys",
+ "_keymap_by_result_column_idx",
+ "_tuplefilter",
+ "_translated_indexes",
+ "_safe_for_cache",
+ "_unpickled",
+ "_key_to_index",
+ # don't need _unique_filters support here for now. Can be added
+ # if a need arises.
+ )
+
+ _keymap: _CursorKeyMapType
+ _processors: _ProcessorsType
+ _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]]
+ _unpickled: bool
+ _safe_for_cache: bool
+ _translated_indexes: Optional[List[int]]
+
+ returns_rows: ClassVar[bool] = True
+
+ def _has_key(self, key: Any) -> bool:
+ return key in self._keymap
+
+ def _for_freeze(self) -> ResultMetaData:
+ return SimpleResultMetaData(
+ self._keys,
+ extra=[self._keymap[key][MD_OBJECTS] for key in self._keys],
+ )
+
+ def _make_new_metadata(
+ self,
+ *,
+ unpickled: bool,
+ processors: _ProcessorsType,
+ keys: Sequence[str],
+ keymap: _KeyMapType,
+ tuplefilter: Optional[_TupleGetterType],
+ translated_indexes: Optional[List[int]],
+ safe_for_cache: bool,
+ keymap_by_result_column_idx: Any,
+ ) -> CursorResultMetaData:
+ new_obj = self.__class__.__new__(self.__class__)
+ new_obj._unpickled = unpickled
+ new_obj._processors = processors
+ new_obj._keys = keys
+ new_obj._keymap = keymap
+ new_obj._tuplefilter = tuplefilter
+ new_obj._translated_indexes = translated_indexes
+ new_obj._safe_for_cache = safe_for_cache
+ new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx
+ new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX)
+ return new_obj
+
+ def _remove_processors(self) -> CursorResultMetaData:
+ assert not self._tuplefilter
+ return self._make_new_metadata(
+ unpickled=self._unpickled,
+ processors=[None] * len(self._processors),
+ tuplefilter=None,
+ translated_indexes=None,
+ keymap={
+ key: value[0:5] + (None,) + value[6:]
+ for key, value in self._keymap.items()
+ },
+ keys=self._keys,
+ safe_for_cache=self._safe_for_cache,
+ keymap_by_result_column_idx=self._keymap_by_result_column_idx,
+ )
+
+ def _splice_horizontally(
+ self, other: CursorResultMetaData
+ ) -> CursorResultMetaData:
+ assert not self._tuplefilter
+
+ keymap = dict(self._keymap)
+ offset = len(self._keys)
+ keymap.update(
+ {
+ key: (
+ # int index should be None for ambiguous key
+ (
+ value[0] + offset
+ if value[0] is not None and key not in keymap
+ else None
+ ),
+ value[1] + offset,
+ *value[2:],
+ )
+ for key, value in other._keymap.items()
+ }
+ )
+ return self._make_new_metadata(
+ unpickled=self._unpickled,
+ processors=self._processors + other._processors, # type: ignore
+ tuplefilter=None,
+ translated_indexes=None,
+ keys=self._keys + other._keys, # type: ignore
+ keymap=keymap,
+ safe_for_cache=self._safe_for_cache,
+ keymap_by_result_column_idx={
+ metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
+ for metadata_entry in keymap.values()
+ },
+ )
+
+ def _reduce(self, keys: Sequence[_KeyIndexType]) -> ResultMetaData:
+ recs = list(self._metadata_for_keys(keys))
+
+ indexes = [rec[MD_INDEX] for rec in recs]
+ new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs]
+
+ if self._translated_indexes:
+ indexes = [self._translated_indexes[idx] for idx in indexes]
+ tup = tuplegetter(*indexes)
+ new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)]
+
+ keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs}
+ # TODO: need unit test for:
+ # result = connection.execute("raw sql, no columns").scalars()
+ # without the "or ()" it's failing because MD_OBJECTS is None
+ keymap.update(
+ (e, new_rec)
+ for new_rec in new_recs
+ for e in new_rec[MD_OBJECTS] or ()
+ )
+
+ return self._make_new_metadata(
+ unpickled=self._unpickled,
+ processors=self._processors,
+ keys=new_keys,
+ tuplefilter=tup,
+ translated_indexes=indexes,
+ keymap=keymap, # type: ignore[arg-type]
+ safe_for_cache=self._safe_for_cache,
+ keymap_by_result_column_idx=self._keymap_by_result_column_idx,
+ )
+
+ def _adapt_to_context(self, context: ExecutionContext) -> ResultMetaData:
+ """When using a cached Compiled construct that has a _result_map,
+ for a new statement that used the cached Compiled, we need to ensure
+ the keymap has the Column objects from our new statement as keys.
+ So here we rewrite keymap with new entries for the new columns
+ as matched to those of the cached statement.
+
+ """
+
+ if not context.compiled or not context.compiled._result_columns:
+ return self
+
+ compiled_statement = context.compiled.statement
+ invoked_statement = context.invoked_statement
+
+ if TYPE_CHECKING:
+ assert isinstance(invoked_statement, elements.ClauseElement)
+
+ if compiled_statement is invoked_statement:
+ return self
+
+ assert invoked_statement is not None
+
+ # this is the most common path for Core statements when
+ # caching is used. In ORM use, this codepath is not really used
+ # as the _result_disable_adapt_to_context execution option is
+ # set by the ORM.
+
+ # make a copy and add the columns from the invoked statement
+ # to the result map.
+
+ keymap_by_position = self._keymap_by_result_column_idx
+
+ if keymap_by_position is None:
+ # first retrival from cache, this map will not be set up yet,
+ # initialize lazily
+ keymap_by_position = self._keymap_by_result_column_idx = {
+ metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry
+ for metadata_entry in self._keymap.values()
+ }
+
+ assert not self._tuplefilter
+ return self._make_new_metadata(
+ keymap=compat.dict_union(
+ self._keymap,
+ {
+ new: keymap_by_position[idx]
+ for idx, new in enumerate(
+ invoked_statement._all_selected_columns
+ )
+ if idx in keymap_by_position
+ },
+ ),
+ unpickled=self._unpickled,
+ processors=self._processors,
+ tuplefilter=None,
+ translated_indexes=None,
+ keys=self._keys,
+ safe_for_cache=self._safe_for_cache,
+ keymap_by_result_column_idx=self._keymap_by_result_column_idx,
+ )
+
+ def __init__(
+ self,
+ parent: CursorResult[Any],
+ cursor_description: _DBAPICursorDescription,
+ ):
+ context = parent.context
+ self._tuplefilter = None
+ self._translated_indexes = None
+ self._safe_for_cache = self._unpickled = False
+
+ if context.result_column_struct:
+ (
+ result_columns,
+ cols_are_ordered,
+ textual_ordered,
+ ad_hoc_textual,
+ loose_column_name_matching,
+ ) = context.result_column_struct
+ num_ctx_cols = len(result_columns)
+ else:
+ result_columns = cols_are_ordered = ( # type: ignore
+ num_ctx_cols
+ ) = ad_hoc_textual = loose_column_name_matching = (
+ textual_ordered
+ ) = False
+
+ # merge cursor.description with the column info
+ # present in the compiled structure, if any
+ raw = self._merge_cursor_description(
+ context,
+ cursor_description,
+ result_columns,
+ num_ctx_cols,
+ cols_are_ordered,
+ textual_ordered,
+ ad_hoc_textual,
+ loose_column_name_matching,
+ )
+
+ # processors in key order which are used when building up
+ # a row
+ self._processors = [
+ metadata_entry[MD_PROCESSOR] for metadata_entry in raw
+ ]
+
+ # this is used when using this ResultMetaData in a Core-only cache
+ # retrieval context. it's initialized on first cache retrieval
+ # when the _result_disable_adapt_to_context execution option
+ # (which the ORM generally sets) is not set.
+ self._keymap_by_result_column_idx = None
+
+ # for compiled SQL constructs, copy additional lookup keys into
+ # the key lookup map, such as Column objects, labels,
+ # column keys and other names
+ if num_ctx_cols:
+ # keymap by primary string...
+ by_key = {
+ metadata_entry[MD_LOOKUP_KEY]: metadata_entry
+ for metadata_entry in raw
+ }
+
+ if len(by_key) != num_ctx_cols:
+ # if by-primary-string dictionary smaller than
+ # number of columns, assume we have dupes; (this check
+ # is also in place if string dictionary is bigger, as
+ # can occur when '*' was used as one of the compiled columns,
+ # which may or may not be suggestive of dupes), rewrite
+ # dupe records with "None" for index which results in
+ # ambiguous column exception when accessed.
+ #
+ # this is considered to be the less common case as it is not
+ # common to have dupe column keys in a SELECT statement.
+ #
+ # new in 1.4: get the complete set of all possible keys,
+ # strings, objects, whatever, that are dupes across two
+ # different records, first.
+ index_by_key: Dict[Any, Any] = {}
+ dupes = set()
+ for metadata_entry in raw:
+ for key in (metadata_entry[MD_RENDERED_NAME],) + (
+ metadata_entry[MD_OBJECTS] or ()
+ ):
+ idx = metadata_entry[MD_INDEX]
+ # if this key has been associated with more than one
+ # positional index, it's a dupe
+ if index_by_key.setdefault(key, idx) != idx:
+ dupes.add(key)
+
+ # then put everything we have into the keymap excluding only
+ # those keys that are dupes.
+ self._keymap = {
+ obj_elem: metadata_entry
+ for metadata_entry in raw
+ if metadata_entry[MD_OBJECTS]
+ for obj_elem in metadata_entry[MD_OBJECTS]
+ if obj_elem not in dupes
+ }
+
+ # then for the dupe keys, put the "ambiguous column"
+ # record into by_key.
+ by_key.update(
+ {
+ key: (None, None, [], key, key, None, None)
+ for key in dupes
+ }
+ )
+
+ else:
+ # no dupes - copy secondary elements from compiled
+ # columns into self._keymap. this is the most common
+ # codepath for Core / ORM statement executions before the
+ # result metadata is cached
+ self._keymap = {
+ obj_elem: metadata_entry
+ for metadata_entry in raw
+ if metadata_entry[MD_OBJECTS]
+ for obj_elem in metadata_entry[MD_OBJECTS]
+ }
+ # update keymap with primary string names taking
+ # precedence
+ self._keymap.update(by_key)
+ else:
+ # no compiled objects to map, just create keymap by primary string
+ self._keymap = {
+ metadata_entry[MD_LOOKUP_KEY]: metadata_entry
+ for metadata_entry in raw
+ }
+
+ # update keymap with "translated" names. In SQLAlchemy this is a
+ # sqlite only thing, and in fact impacting only extremely old SQLite
+ # versions unlikely to be present in modern Python versions.
+ # however, the pyhive third party dialect is
+ # also using this hook, which means others still might use it as well.
+ # I dislike having this awkward hook here but as long as we need
+ # to use names in cursor.description in some cases we need to have
+ # some hook to accomplish this.
+ if not num_ctx_cols and context._translate_colname:
+ self._keymap.update(
+ {
+ metadata_entry[MD_UNTRANSLATED]: self._keymap[
+ metadata_entry[MD_LOOKUP_KEY]
+ ]
+ for metadata_entry in raw
+ if metadata_entry[MD_UNTRANSLATED]
+ }
+ )
+
+ self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
+
+ def _merge_cursor_description(
+ self,
+ context,
+ cursor_description,
+ result_columns,
+ num_ctx_cols,
+ cols_are_ordered,
+ textual_ordered,
+ ad_hoc_textual,
+ loose_column_name_matching,
+ ):
+ """Merge a cursor.description with compiled result column information.
+
+ There are at least four separate strategies used here, selected
+ depending on the type of SQL construct used to start with.
+
+ The most common case is that of the compiled SQL expression construct,
+ which generated the column names present in the raw SQL string and
+ which has the identical number of columns as were reported by
+ cursor.description. In this case, we assume a 1-1 positional mapping
+ between the entries in cursor.description and the compiled object.
+ This is also the most performant case as we disregard extracting /
+ decoding the column names present in cursor.description since we
+ already have the desired name we generated in the compiled SQL
+ construct.
+
+ The next common case is that of the completely raw string SQL,
+ such as passed to connection.execute(). In this case we have no
+ compiled construct to work with, so we extract and decode the
+ names from cursor.description and index those as the primary
+ result row target keys.
+
+ The remaining fairly common case is that of the textual SQL
+ that includes at least partial column information; this is when
+ we use a :class:`_expression.TextualSelect` construct.
+ This construct may have
+ unordered or ordered column information. In the ordered case, we
+ merge the cursor.description and the compiled construct's information
+ positionally, and warn if there are additional description names
+ present, however we still decode the names in cursor.description
+ as we don't have a guarantee that the names in the columns match
+ on these. In the unordered case, we match names in cursor.description
+ to that of the compiled construct based on name matching.
+ In both of these cases, the cursor.description names and the column
+ expression objects and names are indexed as result row target keys.
+
+ The final case is much less common, where we have a compiled
+ non-textual SQL expression construct, but the number of columns
+ in cursor.description doesn't match what's in the compiled
+ construct. We make the guess here that there might be textual
+ column expressions in the compiled construct that themselves include
+ a comma in them causing them to split. We do the same name-matching
+ as with textual non-ordered columns.
+
+ The name-matched system of merging is the same as that used by
+ SQLAlchemy for all cases up through the 0.9 series. Positional
+ matching for compiled SQL expressions was introduced in 1.0 as a
+ major performance feature, and positional matching for textual
+ :class:`_expression.TextualSelect` objects in 1.1.
+ As name matching is no longer
+ a common case, it was acceptable to factor it into smaller generator-
+ oriented methods that are easier to understand, but incur slightly
+ more performance overhead.
+
+ """
+
+ if (
+ num_ctx_cols
+ and cols_are_ordered
+ and not textual_ordered
+ and num_ctx_cols == len(cursor_description)
+ ):
+ self._keys = [elem[0] for elem in result_columns]
+ # pure positional 1-1 case; doesn't need to read
+ # the names from cursor.description
+
+ # most common case for Core and ORM
+
+ # this metadata is safe to cache because we are guaranteed
+ # to have the columns in the same order for new executions
+ self._safe_for_cache = True
+ return [
+ (
+ idx,
+ idx,
+ rmap_entry[RM_OBJECTS],
+ rmap_entry[RM_NAME],
+ rmap_entry[RM_RENDERED_NAME],
+ context.get_result_processor(
+ rmap_entry[RM_TYPE],
+ rmap_entry[RM_RENDERED_NAME],
+ cursor_description[idx][1],
+ ),
+ None,
+ )
+ for idx, rmap_entry in enumerate(result_columns)
+ ]
+ else:
+ # name-based or text-positional cases, where we need
+ # to read cursor.description names
+
+ if textual_ordered or (
+ ad_hoc_textual and len(cursor_description) == num_ctx_cols
+ ):
+ self._safe_for_cache = True
+ # textual positional case
+ raw_iterator = self._merge_textual_cols_by_position(
+ context, cursor_description, result_columns
+ )
+ elif num_ctx_cols:
+ # compiled SQL with a mismatch of description cols
+ # vs. compiled cols, or textual w/ unordered columns
+ # the order of columns can change if the query is
+ # against a "select *", so not safe to cache
+ self._safe_for_cache = False
+ raw_iterator = self._merge_cols_by_name(
+ context,
+ cursor_description,
+ result_columns,
+ loose_column_name_matching,
+ )
+ else:
+ # no compiled SQL, just a raw string, order of columns
+ # can change for "select *"
+ self._safe_for_cache = False
+ raw_iterator = self._merge_cols_by_none(
+ context, cursor_description
+ )
+
+ return [
+ (
+ idx,
+ ridx,
+ obj,
+ cursor_colname,
+ cursor_colname,
+ context.get_result_processor(
+ mapped_type, cursor_colname, coltype
+ ),
+ untranslated,
+ )
+ for (
+ idx,
+ ridx,
+ cursor_colname,
+ mapped_type,
+ coltype,
+ obj,
+ untranslated,
+ ) in raw_iterator
+ ]
+
+ def _colnames_from_description(self, context, cursor_description):
+ """Extract column names and data types from a cursor.description.
+
+ Applies unicode decoding, column translation, "normalization",
+ and case sensitivity rules to the names based on the dialect.
+
+ """
+
+ dialect = context.dialect
+ translate_colname = context._translate_colname
+ normalize_name = (
+ dialect.normalize_name if dialect.requires_name_normalize else None
+ )
+ untranslated = None
+
+ self._keys = []
+
+ for idx, rec in enumerate(cursor_description):
+ colname = rec[0]
+ coltype = rec[1]
+
+ if translate_colname:
+ colname, untranslated = translate_colname(colname)
+
+ if normalize_name:
+ colname = normalize_name(colname)
+
+ self._keys.append(colname)
+
+ yield idx, colname, untranslated, coltype
+
+ def _merge_textual_cols_by_position(
+ self, context, cursor_description, result_columns
+ ):
+ num_ctx_cols = len(result_columns)
+
+ if num_ctx_cols > len(cursor_description):
+ util.warn(
+ "Number of columns in textual SQL (%d) is "
+ "smaller than number of columns requested (%d)"
+ % (num_ctx_cols, len(cursor_description))
+ )
+ seen = set()
+ for (
+ idx,
+ colname,
+ untranslated,
+ coltype,
+ ) in self._colnames_from_description(context, cursor_description):
+ if idx < num_ctx_cols:
+ ctx_rec = result_columns[idx]
+ obj = ctx_rec[RM_OBJECTS]
+ ridx = idx
+ mapped_type = ctx_rec[RM_TYPE]
+ if obj[0] in seen:
+ raise exc.InvalidRequestError(
+ "Duplicate column expression requested "
+ "in textual SQL: %r" % obj[0]
+ )
+ seen.add(obj[0])
+ else:
+ mapped_type = sqltypes.NULLTYPE
+ obj = None
+ ridx = None
+ yield idx, ridx, colname, mapped_type, coltype, obj, untranslated
+
+ def _merge_cols_by_name(
+ self,
+ context,
+ cursor_description,
+ result_columns,
+ loose_column_name_matching,
+ ):
+ match_map = self._create_description_match_map(
+ result_columns, loose_column_name_matching
+ )
+ mapped_type: TypeEngine[Any]
+
+ for (
+ idx,
+ colname,
+ untranslated,
+ coltype,
+ ) in self._colnames_from_description(context, cursor_description):
+ try:
+ ctx_rec = match_map[colname]
+ except KeyError:
+ mapped_type = sqltypes.NULLTYPE
+ obj = None
+ result_columns_idx = None
+ else:
+ obj = ctx_rec[1]
+ mapped_type = ctx_rec[2]
+ result_columns_idx = ctx_rec[3]
+ yield (
+ idx,
+ result_columns_idx,
+ colname,
+ mapped_type,
+ coltype,
+ obj,
+ untranslated,
+ )
+
+ @classmethod
+ def _create_description_match_map(
+ cls,
+ result_columns: List[ResultColumnsEntry],
+ loose_column_name_matching: bool = False,
+ ) -> Dict[
+ Union[str, object], Tuple[str, Tuple[Any, ...], TypeEngine[Any], int]
+ ]:
+ """when matching cursor.description to a set of names that are present
+ in a Compiled object, as is the case with TextualSelect, get all the
+ names we expect might match those in cursor.description.
+ """
+
+ d: Dict[
+ Union[str, object],
+ Tuple[str, Tuple[Any, ...], TypeEngine[Any], int],
+ ] = {}
+ for ridx, elem in enumerate(result_columns):
+ key = elem[RM_RENDERED_NAME]
+ if key in d:
+ # conflicting keyname - just add the column-linked objects
+ # to the existing record. if there is a duplicate column
+ # name in the cursor description, this will allow all of those
+ # objects to raise an ambiguous column error
+ e_name, e_obj, e_type, e_ridx = d[key]
+ d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type, ridx
+ else:
+ d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx)
+
+ if loose_column_name_matching:
+ # when using a textual statement with an unordered set
+ # of columns that line up, we are expecting the user
+ # to be using label names in the SQL that match to the column
+ # expressions. Enable more liberal matching for this case;
+ # duplicate keys that are ambiguous will be fixed later.
+ for r_key in elem[RM_OBJECTS]:
+ d.setdefault(
+ r_key,
+ (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx),
+ )
+ return d
+
+ def _merge_cols_by_none(self, context, cursor_description):
+ for (
+ idx,
+ colname,
+ untranslated,
+ coltype,
+ ) in self._colnames_from_description(context, cursor_description):
+ yield (
+ idx,
+ None,
+ colname,
+ sqltypes.NULLTYPE,
+ coltype,
+ None,
+ untranslated,
+ )
+
+ if not TYPE_CHECKING:
+
+ def _key_fallback(
+ self, key: Any, err: Optional[Exception], raiseerr: bool = True
+ ) -> Optional[NoReturn]:
+ if raiseerr:
+ if self._unpickled and isinstance(key, elements.ColumnElement):
+ raise exc.NoSuchColumnError(
+ "Row was unpickled; lookup by ColumnElement "
+ "is unsupported"
+ ) from err
+ else:
+ raise exc.NoSuchColumnError(
+ "Could not locate column in row for column '%s'"
+ % util.string_or_unprintable(key)
+ ) from err
+ else:
+ return None
+
+ def _raise_for_ambiguous_column_name(self, rec):
+ raise exc.InvalidRequestError(
+ "Ambiguous column name '%s' in "
+ "result set column descriptions" % rec[MD_LOOKUP_KEY]
+ )
+
+ def _index_for_key(self, key: Any, raiseerr: bool = True) -> Optional[int]:
+ # TODO: can consider pre-loading ints and negative ints
+ # into _keymap - also no coverage here
+ if isinstance(key, int):
+ key = self._keys[key]
+
+ try:
+ rec = self._keymap[key]
+ except KeyError as ke:
+ x = self._key_fallback(key, ke, raiseerr)
+ assert x is None
+ return None
+
+ index = rec[0]
+
+ if index is None:
+ self._raise_for_ambiguous_column_name(rec)
+ return index
+
+ def _indexes_for_keys(self, keys):
+ try:
+ return [self._keymap[key][0] for key in keys]
+ except KeyError as ke:
+ # ensure it raises
+ CursorResultMetaData._key_fallback(self, ke.args[0], ke)
+
+ def _metadata_for_keys(
+ self, keys: Sequence[Any]
+ ) -> Iterator[_NonAmbigCursorKeyMapRecType]:
+ for key in keys:
+ if int in key.__class__.__mro__:
+ key = self._keys[key]
+
+ try:
+ rec = self._keymap[key]
+ except KeyError as ke:
+ # ensure it raises
+ CursorResultMetaData._key_fallback(self, ke.args[0], ke)
+
+ index = rec[MD_INDEX]
+
+ if index is None:
+ self._raise_for_ambiguous_column_name(rec)
+
+ yield cast(_NonAmbigCursorKeyMapRecType, rec)
+
+ def __getstate__(self):
+ # TODO: consider serializing this as SimpleResultMetaData
+ return {
+ "_keymap": {
+ key: (
+ rec[MD_INDEX],
+ rec[MD_RESULT_MAP_INDEX],
+ [],
+ key,
+ rec[MD_RENDERED_NAME],
+ None,
+ None,
+ )
+ for key, rec in self._keymap.items()
+ if isinstance(key, (str, int))
+ },
+ "_keys": self._keys,
+ "_translated_indexes": self._translated_indexes,
+ }
+
+ def __setstate__(self, state):
+ self._processors = [None for _ in range(len(state["_keys"]))]
+ self._keymap = state["_keymap"]
+ self._keymap_by_result_column_idx = None
+ self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX)
+ self._keys = state["_keys"]
+ self._unpickled = True
+ if state["_translated_indexes"]:
+ self._translated_indexes = cast(
+ "List[int]", state["_translated_indexes"]
+ )
+ self._tuplefilter = tuplegetter(*self._translated_indexes)
+ else:
+ self._translated_indexes = self._tuplefilter = None
+
+
+class ResultFetchStrategy:
+ """Define a fetching strategy for a result object.
+
+
+ .. versionadded:: 1.4
+
+ """
+
+ __slots__ = ()
+
+ alternate_cursor_description: Optional[_DBAPICursorDescription] = None
+
+ def soft_close(
+ self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
+ ) -> None:
+ raise NotImplementedError()
+
+ def hard_close(
+ self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
+ ) -> None:
+ raise NotImplementedError()
+
+ def yield_per(
+ self,
+ result: CursorResult[Any],
+ dbapi_cursor: Optional[DBAPICursor],
+ num: int,
+ ) -> None:
+ return
+
+ def fetchone(
+ self,
+ result: CursorResult[Any],
+ dbapi_cursor: DBAPICursor,
+ hard_close: bool = False,
+ ) -> Any:
+ raise NotImplementedError()
+
+ def fetchmany(
+ self,
+ result: CursorResult[Any],
+ dbapi_cursor: DBAPICursor,
+ size: Optional[int] = None,
+ ) -> Any:
+ raise NotImplementedError()
+
+ def fetchall(
+ self,
+ result: CursorResult[Any],
+ dbapi_cursor: DBAPICursor,
+ ) -> Any:
+ raise NotImplementedError()
+
+ def handle_exception(
+ self,
+ result: CursorResult[Any],
+ dbapi_cursor: Optional[DBAPICursor],
+ err: BaseException,
+ ) -> NoReturn:
+ raise err
+
+
+class NoCursorFetchStrategy(ResultFetchStrategy):
+ """Cursor strategy for a result that has no open cursor.
+
+ There are two varieties of this strategy, one for DQL and one for
+ DML (and also DDL), each of which represent a result that had a cursor
+ but no longer has one.
+
+ """
+
+ __slots__ = ()
+
+ def soft_close(self, result, dbapi_cursor):
+ pass
+
+ def hard_close(self, result, dbapi_cursor):
+ pass
+
+ def fetchone(self, result, dbapi_cursor, hard_close=False):
+ return self._non_result(result, None)
+
+ def fetchmany(self, result, dbapi_cursor, size=None):
+ return self._non_result(result, [])
+
+ def fetchall(self, result, dbapi_cursor):
+ return self._non_result(result, [])
+
+ def _non_result(self, result, default, err=None):
+ raise NotImplementedError()
+
+
+class NoCursorDQLFetchStrategy(NoCursorFetchStrategy):
+ """Cursor strategy for a DQL result that has no open cursor.
+
+ This is a result set that can return rows, i.e. for a SELECT, or for an
+ INSERT, UPDATE, DELETE that includes RETURNING. However it is in the state
+ where the cursor is closed and no rows remain available. The owning result
+ object may or may not be "hard closed", which determines if the fetch
+ methods send empty results or raise for closed result.
+
+ """
+
+ __slots__ = ()
+
+ def _non_result(self, result, default, err=None):
+ if result.closed:
+ raise exc.ResourceClosedError(
+ "This result object is closed."
+ ) from err
+ else:
+ return default
+
+
+_NO_CURSOR_DQL = NoCursorDQLFetchStrategy()
+
+
+class NoCursorDMLFetchStrategy(NoCursorFetchStrategy):
+ """Cursor strategy for a DML result that has no open cursor.
+
+ This is a result set that does not return rows, i.e. for an INSERT,
+ UPDATE, DELETE that does not include RETURNING.
+
+ """
+
+ __slots__ = ()
+
+ def _non_result(self, result, default, err=None):
+ # we only expect to have a _NoResultMetaData() here right now.
+ assert not result._metadata.returns_rows
+ result._metadata._we_dont_return_rows(err)
+
+
+_NO_CURSOR_DML = NoCursorDMLFetchStrategy()
+
+
+class CursorFetchStrategy(ResultFetchStrategy):
+ """Call fetch methods from a DBAPI cursor.
+
+ Alternate versions of this class may instead buffer the rows from
+ cursors or not use cursors at all.
+
+ """
+
+ __slots__ = ()
+
+ def soft_close(
+ self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
+ ) -> None:
+ result.cursor_strategy = _NO_CURSOR_DQL
+
+ def hard_close(
+ self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor]
+ ) -> None:
+ result.cursor_strategy = _NO_CURSOR_DQL
+
+ def handle_exception(
+ self,
+ result: CursorResult[Any],
+ dbapi_cursor: Optional[DBAPICursor],
+ err: BaseException,
+ ) -> NoReturn:
+ result.connection._handle_dbapi_exception(
+ err, None, None, dbapi_cursor, result.context
+ )
+
+ def yield_per(
+ self,
+ result: CursorResult[Any],
+ dbapi_cursor: Optional[DBAPICursor],
+ num: int,
+ ) -> None:
+ result.cursor_strategy = BufferedRowCursorFetchStrategy(
+ dbapi_cursor,
+ {"max_row_buffer": num},
+ initial_buffer=collections.deque(),
+ growth_factor=0,
+ )
+
+ def fetchone(
+ self,
+ result: CursorResult[Any],
+ dbapi_cursor: DBAPICursor,
+ hard_close: bool = False,
+ ) -> Any:
+ try:
+ row = dbapi_cursor.fetchone()
+ if row is None:
+ result._soft_close(hard=hard_close)
+ return row
+ except BaseException as e:
+ self.handle_exception(result, dbapi_cursor, e)
+
+ def fetchmany(
+ self,
+ result: CursorResult[Any],
+ dbapi_cursor: DBAPICursor,
+ size: Optional[int] = None,
+ ) -> Any:
+ try:
+ if size is None:
+ l = dbapi_cursor.fetchmany()
+ else:
+ l = dbapi_cursor.fetchmany(size)
+
+ if not l:
+ result._soft_close()
+ return l
+ except BaseException as e:
+ self.handle_exception(result, dbapi_cursor, e)
+
+ def fetchall(
+ self,
+ result: CursorResult[Any],
+ dbapi_cursor: DBAPICursor,
+ ) -> Any:
+ try:
+ rows = dbapi_cursor.fetchall()
+ result._soft_close()
+ return rows
+ except BaseException as e:
+ self.handle_exception(result, dbapi_cursor, e)
+
+
+_DEFAULT_FETCH = CursorFetchStrategy()
+
+
+class BufferedRowCursorFetchStrategy(CursorFetchStrategy):
+ """A cursor fetch strategy with row buffering behavior.
+
+ This strategy buffers the contents of a selection of rows
+ before ``fetchone()`` is called. This is to allow the results of
+ ``cursor.description`` to be available immediately, when
+ interfacing with a DB-API that requires rows to be consumed before
+ this information is available (currently psycopg2, when used with
+ server-side cursors).
+
+ The pre-fetching behavior fetches only one row initially, and then
+ grows its buffer size by a fixed amount with each successive need
+ for additional rows up the ``max_row_buffer`` size, which defaults
+ to 1000::
+
+ with psycopg2_engine.connect() as conn:
+
+ result = conn.execution_options(
+ stream_results=True, max_row_buffer=50
+ ).execute(text("select * from table"))
+
+ .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.
+
+ .. seealso::
+
+ :ref:`psycopg2_execution_options`
+ """
+
+ __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor")
+
+ def __init__(
+ self,
+ dbapi_cursor,
+ execution_options,
+ growth_factor=5,
+ initial_buffer=None,
+ ):
+ self._max_row_buffer = execution_options.get("max_row_buffer", 1000)
+
+ if initial_buffer is not None:
+ self._rowbuffer = initial_buffer
+ else:
+ self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1))
+ self._growth_factor = growth_factor
+
+ if growth_factor:
+ self._bufsize = min(self._max_row_buffer, self._growth_factor)
+ else:
+ self._bufsize = self._max_row_buffer
+
+ @classmethod
+ def create(cls, result):
+ return BufferedRowCursorFetchStrategy(
+ result.cursor,
+ result.context.execution_options,
+ )
+
+ def _buffer_rows(self, result, dbapi_cursor):
+ """this is currently used only by fetchone()."""
+
+ size = self._bufsize
+ try:
+ if size < 1:
+ new_rows = dbapi_cursor.fetchall()
+ else:
+ new_rows = dbapi_cursor.fetchmany(size)
+ except BaseException as e:
+ self.handle_exception(result, dbapi_cursor, e)
+
+ if not new_rows:
+ return
+ self._rowbuffer = collections.deque(new_rows)
+ if self._growth_factor and size < self._max_row_buffer:
+ self._bufsize = min(
+ self._max_row_buffer, size * self._growth_factor
+ )
+
+ def yield_per(self, result, dbapi_cursor, num):
+ self._growth_factor = 0
+ self._max_row_buffer = self._bufsize = num
+
+ def soft_close(self, result, dbapi_cursor):
+ self._rowbuffer.clear()
+ super().soft_close(result, dbapi_cursor)
+
+ def hard_close(self, result, dbapi_cursor):
+ self._rowbuffer.clear()
+ super().hard_close(result, dbapi_cursor)
+
+ def fetchone(self, result, dbapi_cursor, hard_close=False):
+ if not self._rowbuffer:
+ self._buffer_rows(result, dbapi_cursor)
+ if not self._rowbuffer:
+ try:
+ result._soft_close(hard=hard_close)
+ except BaseException as e:
+ self.handle_exception(result, dbapi_cursor, e)
+ return None
+ return self._rowbuffer.popleft()
+
+ def fetchmany(self, result, dbapi_cursor, size=None):
+ if size is None:
+ return self.fetchall(result, dbapi_cursor)
+
+ buf = list(self._rowbuffer)
+ lb = len(buf)
+ if size > lb:
+ try:
+ new = dbapi_cursor.fetchmany(size - lb)
+ except BaseException as e:
+ self.handle_exception(result, dbapi_cursor, e)
+ else:
+ if not new:
+ result._soft_close()
+ else:
+ buf.extend(new)
+
+ result = buf[0:size]
+ self._rowbuffer = collections.deque(buf[size:])
+ return result
+
+ def fetchall(self, result, dbapi_cursor):
+ try:
+ ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall())
+ self._rowbuffer.clear()
+ result._soft_close()
+ return ret
+ except BaseException as e:
+ self.handle_exception(result, dbapi_cursor, e)
+
+
+class FullyBufferedCursorFetchStrategy(CursorFetchStrategy):
+ """A cursor strategy that buffers rows fully upon creation.
+
+ Used for operations where a result is to be delivered
+ after the database conversation can not be continued,
+ such as MSSQL INSERT...OUTPUT after an autocommit.
+
+ """
+
+ __slots__ = ("_rowbuffer", "alternate_cursor_description")
+
+ def __init__(
+ self, dbapi_cursor, alternate_description=None, initial_buffer=None
+ ):
+ self.alternate_cursor_description = alternate_description
+ if initial_buffer is not None:
+ self._rowbuffer = collections.deque(initial_buffer)
+ else:
+ self._rowbuffer = collections.deque(dbapi_cursor.fetchall())
+
+ def yield_per(self, result, dbapi_cursor, num):
+ pass
+
+ def soft_close(self, result, dbapi_cursor):
+ self._rowbuffer.clear()
+ super().soft_close(result, dbapi_cursor)
+
+ def hard_close(self, result, dbapi_cursor):
+ self._rowbuffer.clear()
+ super().hard_close(result, dbapi_cursor)
+
+ def fetchone(self, result, dbapi_cursor, hard_close=False):
+ if self._rowbuffer:
+ return self._rowbuffer.popleft()
+ else:
+ result._soft_close(hard=hard_close)
+ return None
+
+ def fetchmany(self, result, dbapi_cursor, size=None):
+ if size is None:
+ return self.fetchall(result, dbapi_cursor)
+
+ buf = list(self._rowbuffer)
+ rows = buf[0:size]
+ self._rowbuffer = collections.deque(buf[size:])
+ if not rows:
+ result._soft_close()
+ return rows
+
+ def fetchall(self, result, dbapi_cursor):
+ ret = self._rowbuffer
+ self._rowbuffer = collections.deque()
+ result._soft_close()
+ return ret
+
+
+class _NoResultMetaData(ResultMetaData):
+ __slots__ = ()
+
+ returns_rows = False
+
+ def _we_dont_return_rows(self, err=None):
+ raise exc.ResourceClosedError(
+ "This result object does not return rows. "
+ "It has been closed automatically."
+ ) from err
+
+ def _index_for_key(self, keys, raiseerr):
+ self._we_dont_return_rows()
+
+ def _metadata_for_keys(self, key):
+ self._we_dont_return_rows()
+
+ def _reduce(self, keys):
+ self._we_dont_return_rows()
+
+ @property
+ def _keymap(self):
+ self._we_dont_return_rows()
+
+ @property
+ def _key_to_index(self):
+ self._we_dont_return_rows()
+
+ @property
+ def _processors(self):
+ self._we_dont_return_rows()
+
+ @property
+ def keys(self):
+ self._we_dont_return_rows()
+
+
+_NO_RESULT_METADATA = _NoResultMetaData()
+
+
+def null_dml_result() -> IteratorResult[Any]:
+ it: IteratorResult[Any] = IteratorResult(_NoResultMetaData(), iter([]))
+ it._soft_close()
+ return it
+
+
+class CursorResult(Result[_T]):
+ """A Result that is representing state from a DBAPI cursor.
+
+ .. versionchanged:: 1.4 The :class:`.CursorResult``
+ class replaces the previous :class:`.ResultProxy` interface.
+ This classes are based on the :class:`.Result` calling API
+ which provides an updated usage model and calling facade for
+ SQLAlchemy Core and SQLAlchemy ORM.
+
+ Returns database rows via the :class:`.Row` class, which provides
+ additional API features and behaviors on top of the raw data returned by
+ the DBAPI. Through the use of filters such as the :meth:`.Result.scalars`
+ method, other kinds of objects may also be returned.
+
+ .. seealso::
+
+ :ref:`tutorial_selecting_data` - introductory material for accessing
+ :class:`_engine.CursorResult` and :class:`.Row` objects.
+
+ """
+
+ __slots__ = (
+ "context",
+ "dialect",
+ "cursor",
+ "cursor_strategy",
+ "_echo",
+ "connection",
+ )
+
+ _metadata: Union[CursorResultMetaData, _NoResultMetaData]
+ _no_result_metadata = _NO_RESULT_METADATA
+ _soft_closed: bool = False
+ closed: bool = False
+ _is_cursor = True
+
+ context: DefaultExecutionContext
+ dialect: Dialect
+ cursor_strategy: ResultFetchStrategy
+ connection: Connection
+
+ def __init__(
+ self,
+ context: DefaultExecutionContext,
+ cursor_strategy: ResultFetchStrategy,
+ cursor_description: Optional[_DBAPICursorDescription],
+ ):
+ self.context = context
+ self.dialect = context.dialect
+ self.cursor = context.cursor
+ self.cursor_strategy = cursor_strategy
+ self.connection = context.root_connection
+ self._echo = echo = (
+ self.connection._echo and context.engine._should_log_debug()
+ )
+
+ if cursor_description is not None:
+ # inline of Result._row_getter(), set up an initial row
+ # getter assuming no transformations will be called as this
+ # is the most common case
+
+ metadata = self._init_metadata(context, cursor_description)
+
+ _make_row: Any
+ _make_row = functools.partial(
+ Row,
+ metadata,
+ metadata._effective_processors,
+ metadata._key_to_index,
+ )
+
+ if context._num_sentinel_cols:
+ sentinel_filter = operator.itemgetter(
+ slice(-context._num_sentinel_cols)
+ )
+
+ def _sliced_row(raw_data):
+ return _make_row(sentinel_filter(raw_data))
+
+ sliced_row = _sliced_row
+ else:
+ sliced_row = _make_row
+
+ if echo:
+ log = self.context.connection._log_debug
+
+ def _log_row(row):
+ log("Row %r", sql_util._repr_row(row))
+ return row
+
+ self._row_logging_fn = _log_row
+
+ def _make_row_2(row):
+ return _log_row(sliced_row(row))
+
+ make_row = _make_row_2
+ else:
+ make_row = sliced_row
+ self._set_memoized_attribute("_row_getter", make_row)
+
+ else:
+ assert context._num_sentinel_cols == 0
+ self._metadata = self._no_result_metadata
+
+ def _init_metadata(self, context, cursor_description):
+ if context.compiled:
+ compiled = context.compiled
+
+ if compiled._cached_metadata:
+ metadata = compiled._cached_metadata
+ else:
+ metadata = CursorResultMetaData(self, cursor_description)
+ if metadata._safe_for_cache:
+ compiled._cached_metadata = metadata
+
+ # result rewrite/ adapt step. this is to suit the case
+ # when we are invoked against a cached Compiled object, we want
+ # to rewrite the ResultMetaData to reflect the Column objects
+ # that are in our current SQL statement object, not the one
+ # that is associated with the cached Compiled object.
+ # the Compiled object may also tell us to not
+ # actually do this step; this is to support the ORM where
+ # it is to produce a new Result object in any case, and will
+ # be using the cached Column objects against this database result
+ # so we don't want to rewrite them.
+ #
+ # Basically this step suits the use case where the end user
+ # is using Core SQL expressions and is accessing columns in the
+ # result row using row._mapping[table.c.column].
+ if (
+ not context.execution_options.get(
+ "_result_disable_adapt_to_context", False
+ )
+ and compiled._result_columns
+ and context.cache_hit is context.dialect.CACHE_HIT
+ and compiled.statement is not context.invoked_statement
+ ):
+ metadata = metadata._adapt_to_context(context)
+
+ self._metadata = metadata
+
+ else:
+ self._metadata = metadata = CursorResultMetaData(
+ self, cursor_description
+ )
+ if self._echo:
+ context.connection._log_debug(
+ "Col %r", tuple(x[0] for x in cursor_description)
+ )
+ return metadata
+
+ def _soft_close(self, hard=False):
+ """Soft close this :class:`_engine.CursorResult`.
+
+ This releases all DBAPI cursor resources, but leaves the
+ CursorResult "open" from a semantic perspective, meaning the
+ fetchXXX() methods will continue to return empty results.
+
+ This method is called automatically when:
+
+ * all result rows are exhausted using the fetchXXX() methods.
+ * cursor.description is None.
+
+ This method is **not public**, but is documented in order to clarify
+ the "autoclose" process used.
+
+ .. seealso::
+
+ :meth:`_engine.CursorResult.close`
+
+
+ """
+
+ if (not hard and self._soft_closed) or (hard and self.closed):
+ return
+
+ if hard:
+ self.closed = True
+ self.cursor_strategy.hard_close(self, self.cursor)
+ else:
+ self.cursor_strategy.soft_close(self, self.cursor)
+
+ if not self._soft_closed:
+ cursor = self.cursor
+ self.cursor = None # type: ignore
+ self.connection._safe_close_cursor(cursor)
+ self._soft_closed = True
+
+ @property
+ def inserted_primary_key_rows(self):
+ """Return the value of
+ :attr:`_engine.CursorResult.inserted_primary_key`
+ as a row contained within a list; some dialects may support a
+ multiple row form as well.
+
+ .. note:: As indicated below, in current SQLAlchemy versions this
+ accessor is only useful beyond what's already supplied by
+ :attr:`_engine.CursorResult.inserted_primary_key` when using the
+ :ref:`postgresql_psycopg2` dialect. Future versions hope to
+ generalize this feature to more dialects.
+
+ This accessor is added to support dialects that offer the feature
+ that is currently implemented by the :ref:`psycopg2_executemany_mode`
+ feature, currently **only the psycopg2 dialect**, which provides
+ for many rows to be INSERTed at once while still retaining the
+ behavior of being able to return server-generated primary key values.
+
+ * **When using the psycopg2 dialect, or other dialects that may support
+ "fast executemany" style inserts in upcoming releases** : When
+ invoking an INSERT statement while passing a list of rows as the
+ second argument to :meth:`_engine.Connection.execute`, this accessor
+ will then provide a list of rows, where each row contains the primary
+ key value for each row that was INSERTed.
+
+ * **When using all other dialects / backends that don't yet support
+ this feature**: This accessor is only useful for **single row INSERT
+ statements**, and returns the same information as that of the
+ :attr:`_engine.CursorResult.inserted_primary_key` within a
+ single-element list. When an INSERT statement is executed in
+ conjunction with a list of rows to be INSERTed, the list will contain
+ one row per row inserted in the statement, however it will contain
+ ``None`` for any server-generated values.
+
+ Future releases of SQLAlchemy will further generalize the
+ "fast execution helper" feature of psycopg2 to suit other dialects,
+ thus allowing this accessor to be of more general use.
+
+ .. versionadded:: 1.4
+
+ .. seealso::
+
+ :attr:`_engine.CursorResult.inserted_primary_key`
+
+ """
+ if not self.context.compiled:
+ raise exc.InvalidRequestError(
+ "Statement is not a compiled expression construct."
+ )
+ elif not self.context.isinsert:
+ raise exc.InvalidRequestError(
+ "Statement is not an insert() expression construct."
+ )
+ elif self.context._is_explicit_returning:
+ raise exc.InvalidRequestError(
+ "Can't call inserted_primary_key "
+ "when returning() "
+ "is used."
+ )
+ return self.context.inserted_primary_key_rows
+
+ @property
+ def inserted_primary_key(self):
+ """Return the primary key for the row just inserted.
+
+ The return value is a :class:`_result.Row` object representing
+ a named tuple of primary key values in the order in which the
+ primary key columns are configured in the source
+ :class:`_schema.Table`.
+
+ .. versionchanged:: 1.4.8 - the
+ :attr:`_engine.CursorResult.inserted_primary_key`
+ value is now a named tuple via the :class:`_result.Row` class,
+ rather than a plain tuple.
+
+ This accessor only applies to single row :func:`_expression.insert`
+ constructs which did not explicitly specify
+ :meth:`_expression.Insert.returning`. Support for multirow inserts,
+ while not yet available for most backends, would be accessed using
+ the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor.
+
+ Note that primary key columns which specify a server_default clause, or
+ otherwise do not qualify as "autoincrement" columns (see the notes at
+ :class:`_schema.Column`), and were generated using the database-side
+ default, will appear in this list as ``None`` unless the backend
+ supports "returning" and the insert statement executed with the
+ "implicit returning" enabled.
+
+ Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
+ statement is not a compiled expression construct
+ or is not an insert() construct.
+
+ """
+
+ if self.context.executemany:
+ raise exc.InvalidRequestError(
+ "This statement was an executemany call; if primary key "
+ "returning is supported, please "
+ "use .inserted_primary_key_rows."
+ )
+
+ ikp = self.inserted_primary_key_rows
+ if ikp:
+ return ikp[0]
+ else:
+ return None
+
+ def last_updated_params(self):
+ """Return the collection of updated parameters from this
+ execution.
+
+ Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
+ statement is not a compiled expression construct
+ or is not an update() construct.
+
+ """
+ if not self.context.compiled:
+ raise exc.InvalidRequestError(
+ "Statement is not a compiled expression construct."
+ )
+ elif not self.context.isupdate:
+ raise exc.InvalidRequestError(
+ "Statement is not an update() expression construct."
+ )
+ elif self.context.executemany:
+ return self.context.compiled_parameters
+ else:
+ return self.context.compiled_parameters[0]
+
+ def last_inserted_params(self):
+ """Return the collection of inserted parameters from this
+ execution.
+
+ Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
+ statement is not a compiled expression construct
+ or is not an insert() construct.
+
+ """
+ if not self.context.compiled:
+ raise exc.InvalidRequestError(
+ "Statement is not a compiled expression construct."
+ )
+ elif not self.context.isinsert:
+ raise exc.InvalidRequestError(
+ "Statement is not an insert() expression construct."
+ )
+ elif self.context.executemany:
+ return self.context.compiled_parameters
+ else:
+ return self.context.compiled_parameters[0]
+
+ @property
+ def returned_defaults_rows(self):
+ """Return a list of rows each containing the values of default
+ columns that were fetched using
+ the :meth:`.ValuesBase.return_defaults` feature.
+
+ The return value is a list of :class:`.Row` objects.
+
+ .. versionadded:: 1.4
+
+ """
+ return self.context.returned_default_rows
+
+ def splice_horizontally(self, other):
+ """Return a new :class:`.CursorResult` that "horizontally splices"
+ together the rows of this :class:`.CursorResult` with that of another
+ :class:`.CursorResult`.
+
+ .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
+ not intended for general use.
+
+ "horizontally splices" means that for each row in the first and second
+ result sets, a new row that concatenates the two rows together is
+ produced, which then becomes the new row. The incoming
+ :class:`.CursorResult` must have the identical number of rows. It is
+ typically expected that the two result sets come from the same sort
+ order as well, as the result rows are spliced together based on their
+ position in the result.
+
+ The expected use case here is so that multiple INSERT..RETURNING
+ statements (which definitely need to be sorted) against different
+ tables can produce a single result that looks like a JOIN of those two
+ tables.
+
+ E.g.::
+
+ r1 = connection.execute(
+ users.insert().returning(
+ users.c.user_name,
+ users.c.user_id,
+ sort_by_parameter_order=True
+ ),
+ user_values
+ )
+
+ r2 = connection.execute(
+ addresses.insert().returning(
+ addresses.c.address_id,
+ addresses.c.address,
+ addresses.c.user_id,
+ sort_by_parameter_order=True
+ ),
+ address_values
+ )
+
+ rows = r1.splice_horizontally(r2).all()
+ assert (
+ rows ==
+ [
+ ("john", 1, 1, "foo@bar.com", 1),
+ ("jack", 2, 2, "bar@bat.com", 2),
+ ]
+ )
+
+ .. versionadded:: 2.0
+
+ .. seealso::
+
+ :meth:`.CursorResult.splice_vertically`
+
+
+ """
+
+ clone = self._generate()
+ total_rows = [
+ tuple(r1) + tuple(r2)
+ for r1, r2 in zip(
+ list(self._raw_row_iterator()),
+ list(other._raw_row_iterator()),
+ )
+ ]
+
+ clone._metadata = clone._metadata._splice_horizontally(other._metadata)
+
+ clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
+ None,
+ initial_buffer=total_rows,
+ )
+ clone._reset_memoizations()
+ return clone
+
+ def splice_vertically(self, other):
+ """Return a new :class:`.CursorResult` that "vertically splices",
+ i.e. "extends", the rows of this :class:`.CursorResult` with that of
+ another :class:`.CursorResult`.
+
+ .. tip:: This method is for the benefit of the SQLAlchemy ORM and is
+ not intended for general use.
+
+ "vertically splices" means the rows of the given result are appended to
+ the rows of this cursor result. The incoming :class:`.CursorResult`
+ must have rows that represent the identical list of columns in the
+ identical order as they are in this :class:`.CursorResult`.
+
+ .. versionadded:: 2.0
+
+ .. seealso::
+
+ :meth:`.CursorResult.splice_horizontally`
+
+ """
+ clone = self._generate()
+ total_rows = list(self._raw_row_iterator()) + list(
+ other._raw_row_iterator()
+ )
+
+ clone.cursor_strategy = FullyBufferedCursorFetchStrategy(
+ None,
+ initial_buffer=total_rows,
+ )
+ clone._reset_memoizations()
+ return clone
+
+ def _rewind(self, rows):
+ """rewind this result back to the given rowset.
+
+ this is used internally for the case where an :class:`.Insert`
+ construct combines the use of
+ :meth:`.Insert.return_defaults` along with the
+ "supplemental columns" feature.
+
+ """
+
+ if self._echo:
+ self.context.connection._log_debug(
+ "CursorResult rewound %d row(s)", len(rows)
+ )
+
+ # the rows given are expected to be Row objects, so we
+ # have to clear out processors which have already run on these
+ # rows
+ self._metadata = cast(
+ CursorResultMetaData, self._metadata
+ )._remove_processors()
+
+ self.cursor_strategy = FullyBufferedCursorFetchStrategy(
+ None,
+ # TODO: if these are Row objects, can we save on not having to
+ # re-make new Row objects out of them a second time? is that
+ # what's actually happening right now? maybe look into this
+ initial_buffer=rows,
+ )
+ self._reset_memoizations()
+ return self
+
+ @property
+ def returned_defaults(self):
+ """Return the values of default columns that were fetched using
+ the :meth:`.ValuesBase.return_defaults` feature.
+
+ The value is an instance of :class:`.Row`, or ``None``
+ if :meth:`.ValuesBase.return_defaults` was not used or if the
+ backend does not support RETURNING.
+
+ .. seealso::
+
+ :meth:`.ValuesBase.return_defaults`
+
+ """
+
+ if self.context.executemany:
+ raise exc.InvalidRequestError(
+ "This statement was an executemany call; if return defaults "
+ "is supported, please use .returned_defaults_rows."
+ )
+
+ rows = self.context.returned_default_rows
+ if rows:
+ return rows[0]
+ else:
+ return None
+
+ def lastrow_has_defaults(self):
+ """Return ``lastrow_has_defaults()`` from the underlying
+ :class:`.ExecutionContext`.
+
+ See :class:`.ExecutionContext` for details.
+
+ """
+
+ return self.context.lastrow_has_defaults()
+
+ def postfetch_cols(self):
+ """Return ``postfetch_cols()`` from the underlying
+ :class:`.ExecutionContext`.
+
+ See :class:`.ExecutionContext` for details.
+
+ Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
+ statement is not a compiled expression construct
+ or is not an insert() or update() construct.
+
+ """
+
+ if not self.context.compiled:
+ raise exc.InvalidRequestError(
+ "Statement is not a compiled expression construct."
+ )
+ elif not self.context.isinsert and not self.context.isupdate:
+ raise exc.InvalidRequestError(
+ "Statement is not an insert() or update() "
+ "expression construct."
+ )
+ return self.context.postfetch_cols
+
+ def prefetch_cols(self):
+ """Return ``prefetch_cols()`` from the underlying
+ :class:`.ExecutionContext`.
+
+ See :class:`.ExecutionContext` for details.
+
+ Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed
+ statement is not a compiled expression construct
+ or is not an insert() or update() construct.
+
+ """
+
+ if not self.context.compiled:
+ raise exc.InvalidRequestError(
+ "Statement is not a compiled expression construct."
+ )
+ elif not self.context.isinsert and not self.context.isupdate:
+ raise exc.InvalidRequestError(
+ "Statement is not an insert() or update() "
+ "expression construct."
+ )
+ return self.context.prefetch_cols
+
+ def supports_sane_rowcount(self):
+ """Return ``supports_sane_rowcount`` from the dialect.
+
+ See :attr:`_engine.CursorResult.rowcount` for background.
+
+ """
+
+ return self.dialect.supports_sane_rowcount
+
+ def supports_sane_multi_rowcount(self):
+ """Return ``supports_sane_multi_rowcount`` from the dialect.
+
+ See :attr:`_engine.CursorResult.rowcount` for background.
+
+ """
+
+ return self.dialect.supports_sane_multi_rowcount
+
+ @util.memoized_property
+ def rowcount(self) -> int:
+ """Return the 'rowcount' for this result.
+
+ The primary purpose of 'rowcount' is to report the number of rows
+ matched by the WHERE criterion of an UPDATE or DELETE statement
+ executed once (i.e. for a single parameter set), which may then be
+ compared to the number of rows expected to be updated or deleted as a
+ means of asserting data integrity.
+
+ This attribute is transferred from the ``cursor.rowcount`` attribute
+ of the DBAPI before the cursor is closed, to support DBAPIs that
+ don't make this value available after cursor close. Some DBAPIs may
+ offer meaningful values for other kinds of statements, such as INSERT
+ and SELECT statements as well. In order to retrieve ``cursor.rowcount``
+ for these statements, set the
+ :paramref:`.Connection.execution_options.preserve_rowcount`
+ execution option to True, which will cause the ``cursor.rowcount``
+ value to be unconditionally memoized before any results are returned
+ or the cursor is closed, regardless of statement type.
+
+ For cases where the DBAPI does not support rowcount for a particular
+ kind of statement and/or execution, the returned value will be ``-1``,
+ which is delivered directly from the DBAPI and is part of :pep:`249`.
+ All DBAPIs should support rowcount for single-parameter-set
+ UPDATE and DELETE statements, however.
+
+ .. note::
+
+ Notes regarding :attr:`_engine.CursorResult.rowcount`:
+
+
+ * This attribute returns the number of rows *matched*,
+ which is not necessarily the same as the number of rows
+ that were actually *modified*. For example, an UPDATE statement
+ may have no net change on a given row if the SET values
+ given are the same as those present in the row already.
+ Such a row would be matched but not modified.
+ On backends that feature both styles, such as MySQL,
+ rowcount is configured to return the match
+ count in all cases.
+
+ * :attr:`_engine.CursorResult.rowcount` in the default case is
+ *only* useful in conjunction with an UPDATE or DELETE statement,
+ and only with a single set of parameters. For other kinds of
+ statements, SQLAlchemy will not attempt to pre-memoize the value
+ unless the
+ :paramref:`.Connection.execution_options.preserve_rowcount`
+ execution option is used. Note that contrary to :pep:`249`, many
+ DBAPIs do not support rowcount values for statements that are not
+ UPDATE or DELETE, particularly when rows are being returned which
+ are not fully pre-buffered. DBAPIs that dont support rowcount
+ for a particular kind of statement should return the value ``-1``
+ for such statements.
+
+ * :attr:`_engine.CursorResult.rowcount` may not be meaningful
+ when executing a single statement with multiple parameter sets
+ (i.e. an :term:`executemany`). Most DBAPIs do not sum "rowcount"
+ values across multiple parameter sets and will return ``-1``
+ when accessed.
+
+ * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support
+ a correct population of :attr:`_engine.CursorResult.rowcount`
+ when the :paramref:`.Connection.execution_options.preserve_rowcount`
+ execution option is set to True.
+
+ * Statements that use RETURNING may not support rowcount, returning
+ a ``-1`` value instead.
+
+ .. seealso::
+
+ :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial`
+
+ :paramref:`.Connection.execution_options.preserve_rowcount`
+
+ """ # noqa: E501
+ try:
+ return self.context.rowcount
+ except BaseException as e:
+ self.cursor_strategy.handle_exception(self, self.cursor, e)
+ raise # not called
+
+ @property
+ def lastrowid(self):
+ """Return the 'lastrowid' accessor on the DBAPI cursor.
+
+ This is a DBAPI specific method and is only functional
+ for those backends which support it, for statements
+ where it is appropriate. It's behavior is not
+ consistent across backends.
+
+ Usage of this method is normally unnecessary when
+ using insert() expression constructs; the
+ :attr:`~CursorResult.inserted_primary_key` attribute provides a
+ tuple of primary key values for a newly inserted row,
+ regardless of database backend.
+
+ """
+ try:
+ return self.context.get_lastrowid()
+ except BaseException as e:
+ self.cursor_strategy.handle_exception(self, self.cursor, e)
+
+ @property
+ def returns_rows(self):
+ """True if this :class:`_engine.CursorResult` returns zero or more
+ rows.
+
+ I.e. if it is legal to call the methods
+ :meth:`_engine.CursorResult.fetchone`,
+ :meth:`_engine.CursorResult.fetchmany`
+ :meth:`_engine.CursorResult.fetchall`.
+
+ Overall, the value of :attr:`_engine.CursorResult.returns_rows` should
+ always be synonymous with whether or not the DBAPI cursor had a
+ ``.description`` attribute, indicating the presence of result columns,
+ noting that a cursor that returns zero rows still has a
+ ``.description`` if a row-returning statement was emitted.
+
+ This attribute should be True for all results that are against
+ SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE
+ that use RETURNING. For INSERT/UPDATE/DELETE statements that were
+ not using RETURNING, the value will usually be False, however
+ there are some dialect-specific exceptions to this, such as when
+ using the MSSQL / pyodbc dialect a SELECT is emitted inline in
+ order to retrieve an inserted primary key value.
+
+
+ """
+ return self._metadata.returns_rows
+
+ @property
+ def is_insert(self):
+ """True if this :class:`_engine.CursorResult` is the result
+ of a executing an expression language compiled
+ :func:`_expression.insert` construct.
+
+ When True, this implies that the
+ :attr:`inserted_primary_key` attribute is accessible,
+ assuming the statement did not include
+ a user defined "returning" construct.
+
+ """
+ return self.context.isinsert
+
+ def _fetchiter_impl(self):
+ fetchone = self.cursor_strategy.fetchone
+
+ while True:
+ row = fetchone(self, self.cursor)
+ if row is None:
+ break
+ yield row
+
+ def _fetchone_impl(self, hard_close=False):
+ return self.cursor_strategy.fetchone(self, self.cursor, hard_close)
+
+ def _fetchall_impl(self):
+ return self.cursor_strategy.fetchall(self, self.cursor)
+
+ def _fetchmany_impl(self, size=None):
+ return self.cursor_strategy.fetchmany(self, self.cursor, size)
+
+ def _raw_row_iterator(self):
+ return self._fetchiter_impl()
+
+ def merge(self, *others: Result[Any]) -> MergedResult[Any]:
+ merged_result = super().merge(*others)
+ if self.context._has_rowcount:
+ merged_result.rowcount = sum(
+ cast("CursorResult[Any]", result).rowcount
+ for result in (self,) + others
+ )
+ return merged_result
+
+ def close(self) -> Any:
+ """Close this :class:`_engine.CursorResult`.
+
+ This closes out the underlying DBAPI cursor corresponding to the
+ statement execution, if one is still present. Note that the DBAPI
+ cursor is automatically released when the :class:`_engine.CursorResult`
+ exhausts all available rows. :meth:`_engine.CursorResult.close` is
+ generally an optional method except in the case when discarding a
+ :class:`_engine.CursorResult` that still has additional rows pending
+ for fetch.
+
+ After this method is called, it is no longer valid to call upon
+ the fetch methods, which will raise a :class:`.ResourceClosedError`
+ on subsequent use.
+
+ .. seealso::
+
+ :ref:`connections_toplevel`
+
+ """
+ self._soft_close(hard=True)
+
+ @_generative
+ def yield_per(self, num: int) -> Self:
+ self._yield_per = num
+ self.cursor_strategy.yield_per(self, self.cursor, num)
+ return self
+
+
+ResultProxy = CursorResult