From 6d7ba58f880be618ade07f8ea080fe8c4bf8a896 Mon Sep 17 00:00:00 2001 From: cyfraeviolae Date: Wed, 3 Apr 2024 03:10:44 -0400 Subject: venv --- .../site-packages/sqlalchemy/engine/cursor.py | 2178 ++++++++++++++++++++ 1 file changed, 2178 insertions(+) create mode 100644 venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py') diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py b/venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py new file mode 100644 index 0000000..71767db --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py @@ -0,0 +1,2178 @@ +# engine/cursor.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: allow-untyped-defs, allow-untyped-calls + +"""Define cursor-specific result set constructs including +:class:`.CursorResult`.""" + + +from __future__ import annotations + +import collections +import functools +import operator +import typing +from typing import Any +from typing import cast +from typing import ClassVar +from typing import Dict +from typing import Iterator +from typing import List +from typing import Mapping +from typing import NoReturn +from typing import Optional +from typing import Sequence +from typing import Tuple +from typing import TYPE_CHECKING +from typing import TypeVar +from typing import Union + +from .result import IteratorResult +from .result import MergedResult +from .result import Result +from .result import ResultMetaData +from .result import SimpleResultMetaData +from .result import tuplegetter +from .row import Row +from .. import exc +from .. import util +from ..sql import elements +from ..sql import sqltypes +from ..sql import util as sql_util +from ..sql.base import _generative +from ..sql.compiler import ResultColumnsEntry +from ..sql.compiler import RM_NAME +from ..sql.compiler import RM_OBJECTS +from ..sql.compiler import RM_RENDERED_NAME +from ..sql.compiler import RM_TYPE +from ..sql.type_api import TypeEngine +from ..util import compat +from ..util.typing import Literal +from ..util.typing import Self + + +if typing.TYPE_CHECKING: + from .base import Connection + from .default import DefaultExecutionContext + from .interfaces import _DBAPICursorDescription + from .interfaces import DBAPICursor + from .interfaces import Dialect + from .interfaces import ExecutionContext + from .result import _KeyIndexType + from .result import _KeyMapRecType + from .result import _KeyMapType + from .result import _KeyType + from .result import _ProcessorsType + from .result import _TupleGetterType + from ..sql.type_api import _ResultProcessorType + + +_T = TypeVar("_T", bound=Any) + + +# metadata entry tuple indexes. +# using raw tuple is faster than namedtuple. +# these match up to the positions in +# _CursorKeyMapRecType +MD_INDEX: Literal[0] = 0 +"""integer index in cursor.description + +""" + +MD_RESULT_MAP_INDEX: Literal[1] = 1 +"""integer index in compiled._result_columns""" + +MD_OBJECTS: Literal[2] = 2 +"""other string keys and ColumnElement obj that can match. + +This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects + +""" + +MD_LOOKUP_KEY: Literal[3] = 3 +"""string key we usually expect for key-based lookup + +this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name +""" + + +MD_RENDERED_NAME: Literal[4] = 4 +"""name that is usually in cursor.description + +this comes from compiler.RENDERED_NAME / compiler.ResultColumnsEntry.keyname +""" + + +MD_PROCESSOR: Literal[5] = 5 +"""callable to process a result value into a row""" + +MD_UNTRANSLATED: Literal[6] = 6 +"""raw name from cursor.description""" + + +_CursorKeyMapRecType = Tuple[ + Optional[int], # MD_INDEX, None means the record is ambiguously named + int, # MD_RESULT_MAP_INDEX + List[Any], # MD_OBJECTS + str, # MD_LOOKUP_KEY + str, # MD_RENDERED_NAME + Optional["_ResultProcessorType[Any]"], # MD_PROCESSOR + Optional[str], # MD_UNTRANSLATED +] + +_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType] + +# same as _CursorKeyMapRecType except the MD_INDEX value is definitely +# not None +_NonAmbigCursorKeyMapRecType = Tuple[ + int, + int, + List[Any], + str, + str, + Optional["_ResultProcessorType[Any]"], + str, +] + + +class CursorResultMetaData(ResultMetaData): + """Result metadata for DBAPI cursors.""" + + __slots__ = ( + "_keymap", + "_processors", + "_keys", + "_keymap_by_result_column_idx", + "_tuplefilter", + "_translated_indexes", + "_safe_for_cache", + "_unpickled", + "_key_to_index", + # don't need _unique_filters support here for now. Can be added + # if a need arises. + ) + + _keymap: _CursorKeyMapType + _processors: _ProcessorsType + _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]] + _unpickled: bool + _safe_for_cache: bool + _translated_indexes: Optional[List[int]] + + returns_rows: ClassVar[bool] = True + + def _has_key(self, key: Any) -> bool: + return key in self._keymap + + def _for_freeze(self) -> ResultMetaData: + return SimpleResultMetaData( + self._keys, + extra=[self._keymap[key][MD_OBJECTS] for key in self._keys], + ) + + def _make_new_metadata( + self, + *, + unpickled: bool, + processors: _ProcessorsType, + keys: Sequence[str], + keymap: _KeyMapType, + tuplefilter: Optional[_TupleGetterType], + translated_indexes: Optional[List[int]], + safe_for_cache: bool, + keymap_by_result_column_idx: Any, + ) -> CursorResultMetaData: + new_obj = self.__class__.__new__(self.__class__) + new_obj._unpickled = unpickled + new_obj._processors = processors + new_obj._keys = keys + new_obj._keymap = keymap + new_obj._tuplefilter = tuplefilter + new_obj._translated_indexes = translated_indexes + new_obj._safe_for_cache = safe_for_cache + new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx + new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX) + return new_obj + + def _remove_processors(self) -> CursorResultMetaData: + assert not self._tuplefilter + return self._make_new_metadata( + unpickled=self._unpickled, + processors=[None] * len(self._processors), + tuplefilter=None, + translated_indexes=None, + keymap={ + key: value[0:5] + (None,) + value[6:] + for key, value in self._keymap.items() + }, + keys=self._keys, + safe_for_cache=self._safe_for_cache, + keymap_by_result_column_idx=self._keymap_by_result_column_idx, + ) + + def _splice_horizontally( + self, other: CursorResultMetaData + ) -> CursorResultMetaData: + assert not self._tuplefilter + + keymap = dict(self._keymap) + offset = len(self._keys) + keymap.update( + { + key: ( + # int index should be None for ambiguous key + ( + value[0] + offset + if value[0] is not None and key not in keymap + else None + ), + value[1] + offset, + *value[2:], + ) + for key, value in other._keymap.items() + } + ) + return self._make_new_metadata( + unpickled=self._unpickled, + processors=self._processors + other._processors, # type: ignore + tuplefilter=None, + translated_indexes=None, + keys=self._keys + other._keys, # type: ignore + keymap=keymap, + safe_for_cache=self._safe_for_cache, + keymap_by_result_column_idx={ + metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry + for metadata_entry in keymap.values() + }, + ) + + def _reduce(self, keys: Sequence[_KeyIndexType]) -> ResultMetaData: + recs = list(self._metadata_for_keys(keys)) + + indexes = [rec[MD_INDEX] for rec in recs] + new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs] + + if self._translated_indexes: + indexes = [self._translated_indexes[idx] for idx in indexes] + tup = tuplegetter(*indexes) + new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)] + + keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs} + # TODO: need unit test for: + # result = connection.execute("raw sql, no columns").scalars() + # without the "or ()" it's failing because MD_OBJECTS is None + keymap.update( + (e, new_rec) + for new_rec in new_recs + for e in new_rec[MD_OBJECTS] or () + ) + + return self._make_new_metadata( + unpickled=self._unpickled, + processors=self._processors, + keys=new_keys, + tuplefilter=tup, + translated_indexes=indexes, + keymap=keymap, # type: ignore[arg-type] + safe_for_cache=self._safe_for_cache, + keymap_by_result_column_idx=self._keymap_by_result_column_idx, + ) + + def _adapt_to_context(self, context: ExecutionContext) -> ResultMetaData: + """When using a cached Compiled construct that has a _result_map, + for a new statement that used the cached Compiled, we need to ensure + the keymap has the Column objects from our new statement as keys. + So here we rewrite keymap with new entries for the new columns + as matched to those of the cached statement. + + """ + + if not context.compiled or not context.compiled._result_columns: + return self + + compiled_statement = context.compiled.statement + invoked_statement = context.invoked_statement + + if TYPE_CHECKING: + assert isinstance(invoked_statement, elements.ClauseElement) + + if compiled_statement is invoked_statement: + return self + + assert invoked_statement is not None + + # this is the most common path for Core statements when + # caching is used. In ORM use, this codepath is not really used + # as the _result_disable_adapt_to_context execution option is + # set by the ORM. + + # make a copy and add the columns from the invoked statement + # to the result map. + + keymap_by_position = self._keymap_by_result_column_idx + + if keymap_by_position is None: + # first retrival from cache, this map will not be set up yet, + # initialize lazily + keymap_by_position = self._keymap_by_result_column_idx = { + metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry + for metadata_entry in self._keymap.values() + } + + assert not self._tuplefilter + return self._make_new_metadata( + keymap=compat.dict_union( + self._keymap, + { + new: keymap_by_position[idx] + for idx, new in enumerate( + invoked_statement._all_selected_columns + ) + if idx in keymap_by_position + }, + ), + unpickled=self._unpickled, + processors=self._processors, + tuplefilter=None, + translated_indexes=None, + keys=self._keys, + safe_for_cache=self._safe_for_cache, + keymap_by_result_column_idx=self._keymap_by_result_column_idx, + ) + + def __init__( + self, + parent: CursorResult[Any], + cursor_description: _DBAPICursorDescription, + ): + context = parent.context + self._tuplefilter = None + self._translated_indexes = None + self._safe_for_cache = self._unpickled = False + + if context.result_column_struct: + ( + result_columns, + cols_are_ordered, + textual_ordered, + ad_hoc_textual, + loose_column_name_matching, + ) = context.result_column_struct + num_ctx_cols = len(result_columns) + else: + result_columns = cols_are_ordered = ( # type: ignore + num_ctx_cols + ) = ad_hoc_textual = loose_column_name_matching = ( + textual_ordered + ) = False + + # merge cursor.description with the column info + # present in the compiled structure, if any + raw = self._merge_cursor_description( + context, + cursor_description, + result_columns, + num_ctx_cols, + cols_are_ordered, + textual_ordered, + ad_hoc_textual, + loose_column_name_matching, + ) + + # processors in key order which are used when building up + # a row + self._processors = [ + metadata_entry[MD_PROCESSOR] for metadata_entry in raw + ] + + # this is used when using this ResultMetaData in a Core-only cache + # retrieval context. it's initialized on first cache retrieval + # when the _result_disable_adapt_to_context execution option + # (which the ORM generally sets) is not set. + self._keymap_by_result_column_idx = None + + # for compiled SQL constructs, copy additional lookup keys into + # the key lookup map, such as Column objects, labels, + # column keys and other names + if num_ctx_cols: + # keymap by primary string... + by_key = { + metadata_entry[MD_LOOKUP_KEY]: metadata_entry + for metadata_entry in raw + } + + if len(by_key) != num_ctx_cols: + # if by-primary-string dictionary smaller than + # number of columns, assume we have dupes; (this check + # is also in place if string dictionary is bigger, as + # can occur when '*' was used as one of the compiled columns, + # which may or may not be suggestive of dupes), rewrite + # dupe records with "None" for index which results in + # ambiguous column exception when accessed. + # + # this is considered to be the less common case as it is not + # common to have dupe column keys in a SELECT statement. + # + # new in 1.4: get the complete set of all possible keys, + # strings, objects, whatever, that are dupes across two + # different records, first. + index_by_key: Dict[Any, Any] = {} + dupes = set() + for metadata_entry in raw: + for key in (metadata_entry[MD_RENDERED_NAME],) + ( + metadata_entry[MD_OBJECTS] or () + ): + idx = metadata_entry[MD_INDEX] + # if this key has been associated with more than one + # positional index, it's a dupe + if index_by_key.setdefault(key, idx) != idx: + dupes.add(key) + + # then put everything we have into the keymap excluding only + # those keys that are dupes. + self._keymap = { + obj_elem: metadata_entry + for metadata_entry in raw + if metadata_entry[MD_OBJECTS] + for obj_elem in metadata_entry[MD_OBJECTS] + if obj_elem not in dupes + } + + # then for the dupe keys, put the "ambiguous column" + # record into by_key. + by_key.update( + { + key: (None, None, [], key, key, None, None) + for key in dupes + } + ) + + else: + # no dupes - copy secondary elements from compiled + # columns into self._keymap. this is the most common + # codepath for Core / ORM statement executions before the + # result metadata is cached + self._keymap = { + obj_elem: metadata_entry + for metadata_entry in raw + if metadata_entry[MD_OBJECTS] + for obj_elem in metadata_entry[MD_OBJECTS] + } + # update keymap with primary string names taking + # precedence + self._keymap.update(by_key) + else: + # no compiled objects to map, just create keymap by primary string + self._keymap = { + metadata_entry[MD_LOOKUP_KEY]: metadata_entry + for metadata_entry in raw + } + + # update keymap with "translated" names. In SQLAlchemy this is a + # sqlite only thing, and in fact impacting only extremely old SQLite + # versions unlikely to be present in modern Python versions. + # however, the pyhive third party dialect is + # also using this hook, which means others still might use it as well. + # I dislike having this awkward hook here but as long as we need + # to use names in cursor.description in some cases we need to have + # some hook to accomplish this. + if not num_ctx_cols and context._translate_colname: + self._keymap.update( + { + metadata_entry[MD_UNTRANSLATED]: self._keymap[ + metadata_entry[MD_LOOKUP_KEY] + ] + for metadata_entry in raw + if metadata_entry[MD_UNTRANSLATED] + } + ) + + self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX) + + def _merge_cursor_description( + self, + context, + cursor_description, + result_columns, + num_ctx_cols, + cols_are_ordered, + textual_ordered, + ad_hoc_textual, + loose_column_name_matching, + ): + """Merge a cursor.description with compiled result column information. + + There are at least four separate strategies used here, selected + depending on the type of SQL construct used to start with. + + The most common case is that of the compiled SQL expression construct, + which generated the column names present in the raw SQL string and + which has the identical number of columns as were reported by + cursor.description. In this case, we assume a 1-1 positional mapping + between the entries in cursor.description and the compiled object. + This is also the most performant case as we disregard extracting / + decoding the column names present in cursor.description since we + already have the desired name we generated in the compiled SQL + construct. + + The next common case is that of the completely raw string SQL, + such as passed to connection.execute(). In this case we have no + compiled construct to work with, so we extract and decode the + names from cursor.description and index those as the primary + result row target keys. + + The remaining fairly common case is that of the textual SQL + that includes at least partial column information; this is when + we use a :class:`_expression.TextualSelect` construct. + This construct may have + unordered or ordered column information. In the ordered case, we + merge the cursor.description and the compiled construct's information + positionally, and warn if there are additional description names + present, however we still decode the names in cursor.description + as we don't have a guarantee that the names in the columns match + on these. In the unordered case, we match names in cursor.description + to that of the compiled construct based on name matching. + In both of these cases, the cursor.description names and the column + expression objects and names are indexed as result row target keys. + + The final case is much less common, where we have a compiled + non-textual SQL expression construct, but the number of columns + in cursor.description doesn't match what's in the compiled + construct. We make the guess here that there might be textual + column expressions in the compiled construct that themselves include + a comma in them causing them to split. We do the same name-matching + as with textual non-ordered columns. + + The name-matched system of merging is the same as that used by + SQLAlchemy for all cases up through the 0.9 series. Positional + matching for compiled SQL expressions was introduced in 1.0 as a + major performance feature, and positional matching for textual + :class:`_expression.TextualSelect` objects in 1.1. + As name matching is no longer + a common case, it was acceptable to factor it into smaller generator- + oriented methods that are easier to understand, but incur slightly + more performance overhead. + + """ + + if ( + num_ctx_cols + and cols_are_ordered + and not textual_ordered + and num_ctx_cols == len(cursor_description) + ): + self._keys = [elem[0] for elem in result_columns] + # pure positional 1-1 case; doesn't need to read + # the names from cursor.description + + # most common case for Core and ORM + + # this metadata is safe to cache because we are guaranteed + # to have the columns in the same order for new executions + self._safe_for_cache = True + return [ + ( + idx, + idx, + rmap_entry[RM_OBJECTS], + rmap_entry[RM_NAME], + rmap_entry[RM_RENDERED_NAME], + context.get_result_processor( + rmap_entry[RM_TYPE], + rmap_entry[RM_RENDERED_NAME], + cursor_description[idx][1], + ), + None, + ) + for idx, rmap_entry in enumerate(result_columns) + ] + else: + # name-based or text-positional cases, where we need + # to read cursor.description names + + if textual_ordered or ( + ad_hoc_textual and len(cursor_description) == num_ctx_cols + ): + self._safe_for_cache = True + # textual positional case + raw_iterator = self._merge_textual_cols_by_position( + context, cursor_description, result_columns + ) + elif num_ctx_cols: + # compiled SQL with a mismatch of description cols + # vs. compiled cols, or textual w/ unordered columns + # the order of columns can change if the query is + # against a "select *", so not safe to cache + self._safe_for_cache = False + raw_iterator = self._merge_cols_by_name( + context, + cursor_description, + result_columns, + loose_column_name_matching, + ) + else: + # no compiled SQL, just a raw string, order of columns + # can change for "select *" + self._safe_for_cache = False + raw_iterator = self._merge_cols_by_none( + context, cursor_description + ) + + return [ + ( + idx, + ridx, + obj, + cursor_colname, + cursor_colname, + context.get_result_processor( + mapped_type, cursor_colname, coltype + ), + untranslated, + ) + for ( + idx, + ridx, + cursor_colname, + mapped_type, + coltype, + obj, + untranslated, + ) in raw_iterator + ] + + def _colnames_from_description(self, context, cursor_description): + """Extract column names and data types from a cursor.description. + + Applies unicode decoding, column translation, "normalization", + and case sensitivity rules to the names based on the dialect. + + """ + + dialect = context.dialect + translate_colname = context._translate_colname + normalize_name = ( + dialect.normalize_name if dialect.requires_name_normalize else None + ) + untranslated = None + + self._keys = [] + + for idx, rec in enumerate(cursor_description): + colname = rec[0] + coltype = rec[1] + + if translate_colname: + colname, untranslated = translate_colname(colname) + + if normalize_name: + colname = normalize_name(colname) + + self._keys.append(colname) + + yield idx, colname, untranslated, coltype + + def _merge_textual_cols_by_position( + self, context, cursor_description, result_columns + ): + num_ctx_cols = len(result_columns) + + if num_ctx_cols > len(cursor_description): + util.warn( + "Number of columns in textual SQL (%d) is " + "smaller than number of columns requested (%d)" + % (num_ctx_cols, len(cursor_description)) + ) + seen = set() + for ( + idx, + colname, + untranslated, + coltype, + ) in self._colnames_from_description(context, cursor_description): + if idx < num_ctx_cols: + ctx_rec = result_columns[idx] + obj = ctx_rec[RM_OBJECTS] + ridx = idx + mapped_type = ctx_rec[RM_TYPE] + if obj[0] in seen: + raise exc.InvalidRequestError( + "Duplicate column expression requested " + "in textual SQL: %r" % obj[0] + ) + seen.add(obj[0]) + else: + mapped_type = sqltypes.NULLTYPE + obj = None + ridx = None + yield idx, ridx, colname, mapped_type, coltype, obj, untranslated + + def _merge_cols_by_name( + self, + context, + cursor_description, + result_columns, + loose_column_name_matching, + ): + match_map = self._create_description_match_map( + result_columns, loose_column_name_matching + ) + mapped_type: TypeEngine[Any] + + for ( + idx, + colname, + untranslated, + coltype, + ) in self._colnames_from_description(context, cursor_description): + try: + ctx_rec = match_map[colname] + except KeyError: + mapped_type = sqltypes.NULLTYPE + obj = None + result_columns_idx = None + else: + obj = ctx_rec[1] + mapped_type = ctx_rec[2] + result_columns_idx = ctx_rec[3] + yield ( + idx, + result_columns_idx, + colname, + mapped_type, + coltype, + obj, + untranslated, + ) + + @classmethod + def _create_description_match_map( + cls, + result_columns: List[ResultColumnsEntry], + loose_column_name_matching: bool = False, + ) -> Dict[ + Union[str, object], Tuple[str, Tuple[Any, ...], TypeEngine[Any], int] + ]: + """when matching cursor.description to a set of names that are present + in a Compiled object, as is the case with TextualSelect, get all the + names we expect might match those in cursor.description. + """ + + d: Dict[ + Union[str, object], + Tuple[str, Tuple[Any, ...], TypeEngine[Any], int], + ] = {} + for ridx, elem in enumerate(result_columns): + key = elem[RM_RENDERED_NAME] + if key in d: + # conflicting keyname - just add the column-linked objects + # to the existing record. if there is a duplicate column + # name in the cursor description, this will allow all of those + # objects to raise an ambiguous column error + e_name, e_obj, e_type, e_ridx = d[key] + d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type, ridx + else: + d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx) + + if loose_column_name_matching: + # when using a textual statement with an unordered set + # of columns that line up, we are expecting the user + # to be using label names in the SQL that match to the column + # expressions. Enable more liberal matching for this case; + # duplicate keys that are ambiguous will be fixed later. + for r_key in elem[RM_OBJECTS]: + d.setdefault( + r_key, + (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx), + ) + return d + + def _merge_cols_by_none(self, context, cursor_description): + for ( + idx, + colname, + untranslated, + coltype, + ) in self._colnames_from_description(context, cursor_description): + yield ( + idx, + None, + colname, + sqltypes.NULLTYPE, + coltype, + None, + untranslated, + ) + + if not TYPE_CHECKING: + + def _key_fallback( + self, key: Any, err: Optional[Exception], raiseerr: bool = True + ) -> Optional[NoReturn]: + if raiseerr: + if self._unpickled and isinstance(key, elements.ColumnElement): + raise exc.NoSuchColumnError( + "Row was unpickled; lookup by ColumnElement " + "is unsupported" + ) from err + else: + raise exc.NoSuchColumnError( + "Could not locate column in row for column '%s'" + % util.string_or_unprintable(key) + ) from err + else: + return None + + def _raise_for_ambiguous_column_name(self, rec): + raise exc.InvalidRequestError( + "Ambiguous column name '%s' in " + "result set column descriptions" % rec[MD_LOOKUP_KEY] + ) + + def _index_for_key(self, key: Any, raiseerr: bool = True) -> Optional[int]: + # TODO: can consider pre-loading ints and negative ints + # into _keymap - also no coverage here + if isinstance(key, int): + key = self._keys[key] + + try: + rec = self._keymap[key] + except KeyError as ke: + x = self._key_fallback(key, ke, raiseerr) + assert x is None + return None + + index = rec[0] + + if index is None: + self._raise_for_ambiguous_column_name(rec) + return index + + def _indexes_for_keys(self, keys): + try: + return [self._keymap[key][0] for key in keys] + except KeyError as ke: + # ensure it raises + CursorResultMetaData._key_fallback(self, ke.args[0], ke) + + def _metadata_for_keys( + self, keys: Sequence[Any] + ) -> Iterator[_NonAmbigCursorKeyMapRecType]: + for key in keys: + if int in key.__class__.__mro__: + key = self._keys[key] + + try: + rec = self._keymap[key] + except KeyError as ke: + # ensure it raises + CursorResultMetaData._key_fallback(self, ke.args[0], ke) + + index = rec[MD_INDEX] + + if index is None: + self._raise_for_ambiguous_column_name(rec) + + yield cast(_NonAmbigCursorKeyMapRecType, rec) + + def __getstate__(self): + # TODO: consider serializing this as SimpleResultMetaData + return { + "_keymap": { + key: ( + rec[MD_INDEX], + rec[MD_RESULT_MAP_INDEX], + [], + key, + rec[MD_RENDERED_NAME], + None, + None, + ) + for key, rec in self._keymap.items() + if isinstance(key, (str, int)) + }, + "_keys": self._keys, + "_translated_indexes": self._translated_indexes, + } + + def __setstate__(self, state): + self._processors = [None for _ in range(len(state["_keys"]))] + self._keymap = state["_keymap"] + self._keymap_by_result_column_idx = None + self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX) + self._keys = state["_keys"] + self._unpickled = True + if state["_translated_indexes"]: + self._translated_indexes = cast( + "List[int]", state["_translated_indexes"] + ) + self._tuplefilter = tuplegetter(*self._translated_indexes) + else: + self._translated_indexes = self._tuplefilter = None + + +class ResultFetchStrategy: + """Define a fetching strategy for a result object. + + + .. versionadded:: 1.4 + + """ + + __slots__ = () + + alternate_cursor_description: Optional[_DBAPICursorDescription] = None + + def soft_close( + self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor] + ) -> None: + raise NotImplementedError() + + def hard_close( + self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor] + ) -> None: + raise NotImplementedError() + + def yield_per( + self, + result: CursorResult[Any], + dbapi_cursor: Optional[DBAPICursor], + num: int, + ) -> None: + return + + def fetchone( + self, + result: CursorResult[Any], + dbapi_cursor: DBAPICursor, + hard_close: bool = False, + ) -> Any: + raise NotImplementedError() + + def fetchmany( + self, + result: CursorResult[Any], + dbapi_cursor: DBAPICursor, + size: Optional[int] = None, + ) -> Any: + raise NotImplementedError() + + def fetchall( + self, + result: CursorResult[Any], + dbapi_cursor: DBAPICursor, + ) -> Any: + raise NotImplementedError() + + def handle_exception( + self, + result: CursorResult[Any], + dbapi_cursor: Optional[DBAPICursor], + err: BaseException, + ) -> NoReturn: + raise err + + +class NoCursorFetchStrategy(ResultFetchStrategy): + """Cursor strategy for a result that has no open cursor. + + There are two varieties of this strategy, one for DQL and one for + DML (and also DDL), each of which represent a result that had a cursor + but no longer has one. + + """ + + __slots__ = () + + def soft_close(self, result, dbapi_cursor): + pass + + def hard_close(self, result, dbapi_cursor): + pass + + def fetchone(self, result, dbapi_cursor, hard_close=False): + return self._non_result(result, None) + + def fetchmany(self, result, dbapi_cursor, size=None): + return self._non_result(result, []) + + def fetchall(self, result, dbapi_cursor): + return self._non_result(result, []) + + def _non_result(self, result, default, err=None): + raise NotImplementedError() + + +class NoCursorDQLFetchStrategy(NoCursorFetchStrategy): + """Cursor strategy for a DQL result that has no open cursor. + + This is a result set that can return rows, i.e. for a SELECT, or for an + INSERT, UPDATE, DELETE that includes RETURNING. However it is in the state + where the cursor is closed and no rows remain available. The owning result + object may or may not be "hard closed", which determines if the fetch + methods send empty results or raise for closed result. + + """ + + __slots__ = () + + def _non_result(self, result, default, err=None): + if result.closed: + raise exc.ResourceClosedError( + "This result object is closed." + ) from err + else: + return default + + +_NO_CURSOR_DQL = NoCursorDQLFetchStrategy() + + +class NoCursorDMLFetchStrategy(NoCursorFetchStrategy): + """Cursor strategy for a DML result that has no open cursor. + + This is a result set that does not return rows, i.e. for an INSERT, + UPDATE, DELETE that does not include RETURNING. + + """ + + __slots__ = () + + def _non_result(self, result, default, err=None): + # we only expect to have a _NoResultMetaData() here right now. + assert not result._metadata.returns_rows + result._metadata._we_dont_return_rows(err) + + +_NO_CURSOR_DML = NoCursorDMLFetchStrategy() + + +class CursorFetchStrategy(ResultFetchStrategy): + """Call fetch methods from a DBAPI cursor. + + Alternate versions of this class may instead buffer the rows from + cursors or not use cursors at all. + + """ + + __slots__ = () + + def soft_close( + self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor] + ) -> None: + result.cursor_strategy = _NO_CURSOR_DQL + + def hard_close( + self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor] + ) -> None: + result.cursor_strategy = _NO_CURSOR_DQL + + def handle_exception( + self, + result: CursorResult[Any], + dbapi_cursor: Optional[DBAPICursor], + err: BaseException, + ) -> NoReturn: + result.connection._handle_dbapi_exception( + err, None, None, dbapi_cursor, result.context + ) + + def yield_per( + self, + result: CursorResult[Any], + dbapi_cursor: Optional[DBAPICursor], + num: int, + ) -> None: + result.cursor_strategy = BufferedRowCursorFetchStrategy( + dbapi_cursor, + {"max_row_buffer": num}, + initial_buffer=collections.deque(), + growth_factor=0, + ) + + def fetchone( + self, + result: CursorResult[Any], + dbapi_cursor: DBAPICursor, + hard_close: bool = False, + ) -> Any: + try: + row = dbapi_cursor.fetchone() + if row is None: + result._soft_close(hard=hard_close) + return row + except BaseException as e: + self.handle_exception(result, dbapi_cursor, e) + + def fetchmany( + self, + result: CursorResult[Any], + dbapi_cursor: DBAPICursor, + size: Optional[int] = None, + ) -> Any: + try: + if size is None: + l = dbapi_cursor.fetchmany() + else: + l = dbapi_cursor.fetchmany(size) + + if not l: + result._soft_close() + return l + except BaseException as e: + self.handle_exception(result, dbapi_cursor, e) + + def fetchall( + self, + result: CursorResult[Any], + dbapi_cursor: DBAPICursor, + ) -> Any: + try: + rows = dbapi_cursor.fetchall() + result._soft_close() + return rows + except BaseException as e: + self.handle_exception(result, dbapi_cursor, e) + + +_DEFAULT_FETCH = CursorFetchStrategy() + + +class BufferedRowCursorFetchStrategy(CursorFetchStrategy): + """A cursor fetch strategy with row buffering behavior. + + This strategy buffers the contents of a selection of rows + before ``fetchone()`` is called. This is to allow the results of + ``cursor.description`` to be available immediately, when + interfacing with a DB-API that requires rows to be consumed before + this information is available (currently psycopg2, when used with + server-side cursors). + + The pre-fetching behavior fetches only one row initially, and then + grows its buffer size by a fixed amount with each successive need + for additional rows up the ``max_row_buffer`` size, which defaults + to 1000:: + + with psycopg2_engine.connect() as conn: + + result = conn.execution_options( + stream_results=True, max_row_buffer=50 + ).execute(text("select * from table")) + + .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows. + + .. seealso:: + + :ref:`psycopg2_execution_options` + """ + + __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor") + + def __init__( + self, + dbapi_cursor, + execution_options, + growth_factor=5, + initial_buffer=None, + ): + self._max_row_buffer = execution_options.get("max_row_buffer", 1000) + + if initial_buffer is not None: + self._rowbuffer = initial_buffer + else: + self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1)) + self._growth_factor = growth_factor + + if growth_factor: + self._bufsize = min(self._max_row_buffer, self._growth_factor) + else: + self._bufsize = self._max_row_buffer + + @classmethod + def create(cls, result): + return BufferedRowCursorFetchStrategy( + result.cursor, + result.context.execution_options, + ) + + def _buffer_rows(self, result, dbapi_cursor): + """this is currently used only by fetchone().""" + + size = self._bufsize + try: + if size < 1: + new_rows = dbapi_cursor.fetchall() + else: + new_rows = dbapi_cursor.fetchmany(size) + except BaseException as e: + self.handle_exception(result, dbapi_cursor, e) + + if not new_rows: + return + self._rowbuffer = collections.deque(new_rows) + if self._growth_factor and size < self._max_row_buffer: + self._bufsize = min( + self._max_row_buffer, size * self._growth_factor + ) + + def yield_per(self, result, dbapi_cursor, num): + self._growth_factor = 0 + self._max_row_buffer = self._bufsize = num + + def soft_close(self, result, dbapi_cursor): + self._rowbuffer.clear() + super().soft_close(result, dbapi_cursor) + + def hard_close(self, result, dbapi_cursor): + self._rowbuffer.clear() + super().hard_close(result, dbapi_cursor) + + def fetchone(self, result, dbapi_cursor, hard_close=False): + if not self._rowbuffer: + self._buffer_rows(result, dbapi_cursor) + if not self._rowbuffer: + try: + result._soft_close(hard=hard_close) + except BaseException as e: + self.handle_exception(result, dbapi_cursor, e) + return None + return self._rowbuffer.popleft() + + def fetchmany(self, result, dbapi_cursor, size=None): + if size is None: + return self.fetchall(result, dbapi_cursor) + + buf = list(self._rowbuffer) + lb = len(buf) + if size > lb: + try: + new = dbapi_cursor.fetchmany(size - lb) + except BaseException as e: + self.handle_exception(result, dbapi_cursor, e) + else: + if not new: + result._soft_close() + else: + buf.extend(new) + + result = buf[0:size] + self._rowbuffer = collections.deque(buf[size:]) + return result + + def fetchall(self, result, dbapi_cursor): + try: + ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall()) + self._rowbuffer.clear() + result._soft_close() + return ret + except BaseException as e: + self.handle_exception(result, dbapi_cursor, e) + + +class FullyBufferedCursorFetchStrategy(CursorFetchStrategy): + """A cursor strategy that buffers rows fully upon creation. + + Used for operations where a result is to be delivered + after the database conversation can not be continued, + such as MSSQL INSERT...OUTPUT after an autocommit. + + """ + + __slots__ = ("_rowbuffer", "alternate_cursor_description") + + def __init__( + self, dbapi_cursor, alternate_description=None, initial_buffer=None + ): + self.alternate_cursor_description = alternate_description + if initial_buffer is not None: + self._rowbuffer = collections.deque(initial_buffer) + else: + self._rowbuffer = collections.deque(dbapi_cursor.fetchall()) + + def yield_per(self, result, dbapi_cursor, num): + pass + + def soft_close(self, result, dbapi_cursor): + self._rowbuffer.clear() + super().soft_close(result, dbapi_cursor) + + def hard_close(self, result, dbapi_cursor): + self._rowbuffer.clear() + super().hard_close(result, dbapi_cursor) + + def fetchone(self, result, dbapi_cursor, hard_close=False): + if self._rowbuffer: + return self._rowbuffer.popleft() + else: + result._soft_close(hard=hard_close) + return None + + def fetchmany(self, result, dbapi_cursor, size=None): + if size is None: + return self.fetchall(result, dbapi_cursor) + + buf = list(self._rowbuffer) + rows = buf[0:size] + self._rowbuffer = collections.deque(buf[size:]) + if not rows: + result._soft_close() + return rows + + def fetchall(self, result, dbapi_cursor): + ret = self._rowbuffer + self._rowbuffer = collections.deque() + result._soft_close() + return ret + + +class _NoResultMetaData(ResultMetaData): + __slots__ = () + + returns_rows = False + + def _we_dont_return_rows(self, err=None): + raise exc.ResourceClosedError( + "This result object does not return rows. " + "It has been closed automatically." + ) from err + + def _index_for_key(self, keys, raiseerr): + self._we_dont_return_rows() + + def _metadata_for_keys(self, key): + self._we_dont_return_rows() + + def _reduce(self, keys): + self._we_dont_return_rows() + + @property + def _keymap(self): + self._we_dont_return_rows() + + @property + def _key_to_index(self): + self._we_dont_return_rows() + + @property + def _processors(self): + self._we_dont_return_rows() + + @property + def keys(self): + self._we_dont_return_rows() + + +_NO_RESULT_METADATA = _NoResultMetaData() + + +def null_dml_result() -> IteratorResult[Any]: + it: IteratorResult[Any] = IteratorResult(_NoResultMetaData(), iter([])) + it._soft_close() + return it + + +class CursorResult(Result[_T]): + """A Result that is representing state from a DBAPI cursor. + + .. versionchanged:: 1.4 The :class:`.CursorResult`` + class replaces the previous :class:`.ResultProxy` interface. + This classes are based on the :class:`.Result` calling API + which provides an updated usage model and calling facade for + SQLAlchemy Core and SQLAlchemy ORM. + + Returns database rows via the :class:`.Row` class, which provides + additional API features and behaviors on top of the raw data returned by + the DBAPI. Through the use of filters such as the :meth:`.Result.scalars` + method, other kinds of objects may also be returned. + + .. seealso:: + + :ref:`tutorial_selecting_data` - introductory material for accessing + :class:`_engine.CursorResult` and :class:`.Row` objects. + + """ + + __slots__ = ( + "context", + "dialect", + "cursor", + "cursor_strategy", + "_echo", + "connection", + ) + + _metadata: Union[CursorResultMetaData, _NoResultMetaData] + _no_result_metadata = _NO_RESULT_METADATA + _soft_closed: bool = False + closed: bool = False + _is_cursor = True + + context: DefaultExecutionContext + dialect: Dialect + cursor_strategy: ResultFetchStrategy + connection: Connection + + def __init__( + self, + context: DefaultExecutionContext, + cursor_strategy: ResultFetchStrategy, + cursor_description: Optional[_DBAPICursorDescription], + ): + self.context = context + self.dialect = context.dialect + self.cursor = context.cursor + self.cursor_strategy = cursor_strategy + self.connection = context.root_connection + self._echo = echo = ( + self.connection._echo and context.engine._should_log_debug() + ) + + if cursor_description is not None: + # inline of Result._row_getter(), set up an initial row + # getter assuming no transformations will be called as this + # is the most common case + + metadata = self._init_metadata(context, cursor_description) + + _make_row: Any + _make_row = functools.partial( + Row, + metadata, + metadata._effective_processors, + metadata._key_to_index, + ) + + if context._num_sentinel_cols: + sentinel_filter = operator.itemgetter( + slice(-context._num_sentinel_cols) + ) + + def _sliced_row(raw_data): + return _make_row(sentinel_filter(raw_data)) + + sliced_row = _sliced_row + else: + sliced_row = _make_row + + if echo: + log = self.context.connection._log_debug + + def _log_row(row): + log("Row %r", sql_util._repr_row(row)) + return row + + self._row_logging_fn = _log_row + + def _make_row_2(row): + return _log_row(sliced_row(row)) + + make_row = _make_row_2 + else: + make_row = sliced_row + self._set_memoized_attribute("_row_getter", make_row) + + else: + assert context._num_sentinel_cols == 0 + self._metadata = self._no_result_metadata + + def _init_metadata(self, context, cursor_description): + if context.compiled: + compiled = context.compiled + + if compiled._cached_metadata: + metadata = compiled._cached_metadata + else: + metadata = CursorResultMetaData(self, cursor_description) + if metadata._safe_for_cache: + compiled._cached_metadata = metadata + + # result rewrite/ adapt step. this is to suit the case + # when we are invoked against a cached Compiled object, we want + # to rewrite the ResultMetaData to reflect the Column objects + # that are in our current SQL statement object, not the one + # that is associated with the cached Compiled object. + # the Compiled object may also tell us to not + # actually do this step; this is to support the ORM where + # it is to produce a new Result object in any case, and will + # be using the cached Column objects against this database result + # so we don't want to rewrite them. + # + # Basically this step suits the use case where the end user + # is using Core SQL expressions and is accessing columns in the + # result row using row._mapping[table.c.column]. + if ( + not context.execution_options.get( + "_result_disable_adapt_to_context", False + ) + and compiled._result_columns + and context.cache_hit is context.dialect.CACHE_HIT + and compiled.statement is not context.invoked_statement + ): + metadata = metadata._adapt_to_context(context) + + self._metadata = metadata + + else: + self._metadata = metadata = CursorResultMetaData( + self, cursor_description + ) + if self._echo: + context.connection._log_debug( + "Col %r", tuple(x[0] for x in cursor_description) + ) + return metadata + + def _soft_close(self, hard=False): + """Soft close this :class:`_engine.CursorResult`. + + This releases all DBAPI cursor resources, but leaves the + CursorResult "open" from a semantic perspective, meaning the + fetchXXX() methods will continue to return empty results. + + This method is called automatically when: + + * all result rows are exhausted using the fetchXXX() methods. + * cursor.description is None. + + This method is **not public**, but is documented in order to clarify + the "autoclose" process used. + + .. seealso:: + + :meth:`_engine.CursorResult.close` + + + """ + + if (not hard and self._soft_closed) or (hard and self.closed): + return + + if hard: + self.closed = True + self.cursor_strategy.hard_close(self, self.cursor) + else: + self.cursor_strategy.soft_close(self, self.cursor) + + if not self._soft_closed: + cursor = self.cursor + self.cursor = None # type: ignore + self.connection._safe_close_cursor(cursor) + self._soft_closed = True + + @property + def inserted_primary_key_rows(self): + """Return the value of + :attr:`_engine.CursorResult.inserted_primary_key` + as a row contained within a list; some dialects may support a + multiple row form as well. + + .. note:: As indicated below, in current SQLAlchemy versions this + accessor is only useful beyond what's already supplied by + :attr:`_engine.CursorResult.inserted_primary_key` when using the + :ref:`postgresql_psycopg2` dialect. Future versions hope to + generalize this feature to more dialects. + + This accessor is added to support dialects that offer the feature + that is currently implemented by the :ref:`psycopg2_executemany_mode` + feature, currently **only the psycopg2 dialect**, which provides + for many rows to be INSERTed at once while still retaining the + behavior of being able to return server-generated primary key values. + + * **When using the psycopg2 dialect, or other dialects that may support + "fast executemany" style inserts in upcoming releases** : When + invoking an INSERT statement while passing a list of rows as the + second argument to :meth:`_engine.Connection.execute`, this accessor + will then provide a list of rows, where each row contains the primary + key value for each row that was INSERTed. + + * **When using all other dialects / backends that don't yet support + this feature**: This accessor is only useful for **single row INSERT + statements**, and returns the same information as that of the + :attr:`_engine.CursorResult.inserted_primary_key` within a + single-element list. When an INSERT statement is executed in + conjunction with a list of rows to be INSERTed, the list will contain + one row per row inserted in the statement, however it will contain + ``None`` for any server-generated values. + + Future releases of SQLAlchemy will further generalize the + "fast execution helper" feature of psycopg2 to suit other dialects, + thus allowing this accessor to be of more general use. + + .. versionadded:: 1.4 + + .. seealso:: + + :attr:`_engine.CursorResult.inserted_primary_key` + + """ + if not self.context.compiled: + raise exc.InvalidRequestError( + "Statement is not a compiled expression construct." + ) + elif not self.context.isinsert: + raise exc.InvalidRequestError( + "Statement is not an insert() expression construct." + ) + elif self.context._is_explicit_returning: + raise exc.InvalidRequestError( + "Can't call inserted_primary_key " + "when returning() " + "is used." + ) + return self.context.inserted_primary_key_rows + + @property + def inserted_primary_key(self): + """Return the primary key for the row just inserted. + + The return value is a :class:`_result.Row` object representing + a named tuple of primary key values in the order in which the + primary key columns are configured in the source + :class:`_schema.Table`. + + .. versionchanged:: 1.4.8 - the + :attr:`_engine.CursorResult.inserted_primary_key` + value is now a named tuple via the :class:`_result.Row` class, + rather than a plain tuple. + + This accessor only applies to single row :func:`_expression.insert` + constructs which did not explicitly specify + :meth:`_expression.Insert.returning`. Support for multirow inserts, + while not yet available for most backends, would be accessed using + the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor. + + Note that primary key columns which specify a server_default clause, or + otherwise do not qualify as "autoincrement" columns (see the notes at + :class:`_schema.Column`), and were generated using the database-side + default, will appear in this list as ``None`` unless the backend + supports "returning" and the insert statement executed with the + "implicit returning" enabled. + + Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed + statement is not a compiled expression construct + or is not an insert() construct. + + """ + + if self.context.executemany: + raise exc.InvalidRequestError( + "This statement was an executemany call; if primary key " + "returning is supported, please " + "use .inserted_primary_key_rows." + ) + + ikp = self.inserted_primary_key_rows + if ikp: + return ikp[0] + else: + return None + + def last_updated_params(self): + """Return the collection of updated parameters from this + execution. + + Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed + statement is not a compiled expression construct + or is not an update() construct. + + """ + if not self.context.compiled: + raise exc.InvalidRequestError( + "Statement is not a compiled expression construct." + ) + elif not self.context.isupdate: + raise exc.InvalidRequestError( + "Statement is not an update() expression construct." + ) + elif self.context.executemany: + return self.context.compiled_parameters + else: + return self.context.compiled_parameters[0] + + def last_inserted_params(self): + """Return the collection of inserted parameters from this + execution. + + Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed + statement is not a compiled expression construct + or is not an insert() construct. + + """ + if not self.context.compiled: + raise exc.InvalidRequestError( + "Statement is not a compiled expression construct." + ) + elif not self.context.isinsert: + raise exc.InvalidRequestError( + "Statement is not an insert() expression construct." + ) + elif self.context.executemany: + return self.context.compiled_parameters + else: + return self.context.compiled_parameters[0] + + @property + def returned_defaults_rows(self): + """Return a list of rows each containing the values of default + columns that were fetched using + the :meth:`.ValuesBase.return_defaults` feature. + + The return value is a list of :class:`.Row` objects. + + .. versionadded:: 1.4 + + """ + return self.context.returned_default_rows + + def splice_horizontally(self, other): + """Return a new :class:`.CursorResult` that "horizontally splices" + together the rows of this :class:`.CursorResult` with that of another + :class:`.CursorResult`. + + .. tip:: This method is for the benefit of the SQLAlchemy ORM and is + not intended for general use. + + "horizontally splices" means that for each row in the first and second + result sets, a new row that concatenates the two rows together is + produced, which then becomes the new row. The incoming + :class:`.CursorResult` must have the identical number of rows. It is + typically expected that the two result sets come from the same sort + order as well, as the result rows are spliced together based on their + position in the result. + + The expected use case here is so that multiple INSERT..RETURNING + statements (which definitely need to be sorted) against different + tables can produce a single result that looks like a JOIN of those two + tables. + + E.g.:: + + r1 = connection.execute( + users.insert().returning( + users.c.user_name, + users.c.user_id, + sort_by_parameter_order=True + ), + user_values + ) + + r2 = connection.execute( + addresses.insert().returning( + addresses.c.address_id, + addresses.c.address, + addresses.c.user_id, + sort_by_parameter_order=True + ), + address_values + ) + + rows = r1.splice_horizontally(r2).all() + assert ( + rows == + [ + ("john", 1, 1, "foo@bar.com", 1), + ("jack", 2, 2, "bar@bat.com", 2), + ] + ) + + .. versionadded:: 2.0 + + .. seealso:: + + :meth:`.CursorResult.splice_vertically` + + + """ + + clone = self._generate() + total_rows = [ + tuple(r1) + tuple(r2) + for r1, r2 in zip( + list(self._raw_row_iterator()), + list(other._raw_row_iterator()), + ) + ] + + clone._metadata = clone._metadata._splice_horizontally(other._metadata) + + clone.cursor_strategy = FullyBufferedCursorFetchStrategy( + None, + initial_buffer=total_rows, + ) + clone._reset_memoizations() + return clone + + def splice_vertically(self, other): + """Return a new :class:`.CursorResult` that "vertically splices", + i.e. "extends", the rows of this :class:`.CursorResult` with that of + another :class:`.CursorResult`. + + .. tip:: This method is for the benefit of the SQLAlchemy ORM and is + not intended for general use. + + "vertically splices" means the rows of the given result are appended to + the rows of this cursor result. The incoming :class:`.CursorResult` + must have rows that represent the identical list of columns in the + identical order as they are in this :class:`.CursorResult`. + + .. versionadded:: 2.0 + + .. seealso:: + + :meth:`.CursorResult.splice_horizontally` + + """ + clone = self._generate() + total_rows = list(self._raw_row_iterator()) + list( + other._raw_row_iterator() + ) + + clone.cursor_strategy = FullyBufferedCursorFetchStrategy( + None, + initial_buffer=total_rows, + ) + clone._reset_memoizations() + return clone + + def _rewind(self, rows): + """rewind this result back to the given rowset. + + this is used internally for the case where an :class:`.Insert` + construct combines the use of + :meth:`.Insert.return_defaults` along with the + "supplemental columns" feature. + + """ + + if self._echo: + self.context.connection._log_debug( + "CursorResult rewound %d row(s)", len(rows) + ) + + # the rows given are expected to be Row objects, so we + # have to clear out processors which have already run on these + # rows + self._metadata = cast( + CursorResultMetaData, self._metadata + )._remove_processors() + + self.cursor_strategy = FullyBufferedCursorFetchStrategy( + None, + # TODO: if these are Row objects, can we save on not having to + # re-make new Row objects out of them a second time? is that + # what's actually happening right now? maybe look into this + initial_buffer=rows, + ) + self._reset_memoizations() + return self + + @property + def returned_defaults(self): + """Return the values of default columns that were fetched using + the :meth:`.ValuesBase.return_defaults` feature. + + The value is an instance of :class:`.Row`, or ``None`` + if :meth:`.ValuesBase.return_defaults` was not used or if the + backend does not support RETURNING. + + .. seealso:: + + :meth:`.ValuesBase.return_defaults` + + """ + + if self.context.executemany: + raise exc.InvalidRequestError( + "This statement was an executemany call; if return defaults " + "is supported, please use .returned_defaults_rows." + ) + + rows = self.context.returned_default_rows + if rows: + return rows[0] + else: + return None + + def lastrow_has_defaults(self): + """Return ``lastrow_has_defaults()`` from the underlying + :class:`.ExecutionContext`. + + See :class:`.ExecutionContext` for details. + + """ + + return self.context.lastrow_has_defaults() + + def postfetch_cols(self): + """Return ``postfetch_cols()`` from the underlying + :class:`.ExecutionContext`. + + See :class:`.ExecutionContext` for details. + + Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed + statement is not a compiled expression construct + or is not an insert() or update() construct. + + """ + + if not self.context.compiled: + raise exc.InvalidRequestError( + "Statement is not a compiled expression construct." + ) + elif not self.context.isinsert and not self.context.isupdate: + raise exc.InvalidRequestError( + "Statement is not an insert() or update() " + "expression construct." + ) + return self.context.postfetch_cols + + def prefetch_cols(self): + """Return ``prefetch_cols()`` from the underlying + :class:`.ExecutionContext`. + + See :class:`.ExecutionContext` for details. + + Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed + statement is not a compiled expression construct + or is not an insert() or update() construct. + + """ + + if not self.context.compiled: + raise exc.InvalidRequestError( + "Statement is not a compiled expression construct." + ) + elif not self.context.isinsert and not self.context.isupdate: + raise exc.InvalidRequestError( + "Statement is not an insert() or update() " + "expression construct." + ) + return self.context.prefetch_cols + + def supports_sane_rowcount(self): + """Return ``supports_sane_rowcount`` from the dialect. + + See :attr:`_engine.CursorResult.rowcount` for background. + + """ + + return self.dialect.supports_sane_rowcount + + def supports_sane_multi_rowcount(self): + """Return ``supports_sane_multi_rowcount`` from the dialect. + + See :attr:`_engine.CursorResult.rowcount` for background. + + """ + + return self.dialect.supports_sane_multi_rowcount + + @util.memoized_property + def rowcount(self) -> int: + """Return the 'rowcount' for this result. + + The primary purpose of 'rowcount' is to report the number of rows + matched by the WHERE criterion of an UPDATE or DELETE statement + executed once (i.e. for a single parameter set), which may then be + compared to the number of rows expected to be updated or deleted as a + means of asserting data integrity. + + This attribute is transferred from the ``cursor.rowcount`` attribute + of the DBAPI before the cursor is closed, to support DBAPIs that + don't make this value available after cursor close. Some DBAPIs may + offer meaningful values for other kinds of statements, such as INSERT + and SELECT statements as well. In order to retrieve ``cursor.rowcount`` + for these statements, set the + :paramref:`.Connection.execution_options.preserve_rowcount` + execution option to True, which will cause the ``cursor.rowcount`` + value to be unconditionally memoized before any results are returned + or the cursor is closed, regardless of statement type. + + For cases where the DBAPI does not support rowcount for a particular + kind of statement and/or execution, the returned value will be ``-1``, + which is delivered directly from the DBAPI and is part of :pep:`249`. + All DBAPIs should support rowcount for single-parameter-set + UPDATE and DELETE statements, however. + + .. note:: + + Notes regarding :attr:`_engine.CursorResult.rowcount`: + + + * This attribute returns the number of rows *matched*, + which is not necessarily the same as the number of rows + that were actually *modified*. For example, an UPDATE statement + may have no net change on a given row if the SET values + given are the same as those present in the row already. + Such a row would be matched but not modified. + On backends that feature both styles, such as MySQL, + rowcount is configured to return the match + count in all cases. + + * :attr:`_engine.CursorResult.rowcount` in the default case is + *only* useful in conjunction with an UPDATE or DELETE statement, + and only with a single set of parameters. For other kinds of + statements, SQLAlchemy will not attempt to pre-memoize the value + unless the + :paramref:`.Connection.execution_options.preserve_rowcount` + execution option is used. Note that contrary to :pep:`249`, many + DBAPIs do not support rowcount values for statements that are not + UPDATE or DELETE, particularly when rows are being returned which + are not fully pre-buffered. DBAPIs that dont support rowcount + for a particular kind of statement should return the value ``-1`` + for such statements. + + * :attr:`_engine.CursorResult.rowcount` may not be meaningful + when executing a single statement with multiple parameter sets + (i.e. an :term:`executemany`). Most DBAPIs do not sum "rowcount" + values across multiple parameter sets and will return ``-1`` + when accessed. + + * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support + a correct population of :attr:`_engine.CursorResult.rowcount` + when the :paramref:`.Connection.execution_options.preserve_rowcount` + execution option is set to True. + + * Statements that use RETURNING may not support rowcount, returning + a ``-1`` value instead. + + .. seealso:: + + :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial` + + :paramref:`.Connection.execution_options.preserve_rowcount` + + """ # noqa: E501 + try: + return self.context.rowcount + except BaseException as e: + self.cursor_strategy.handle_exception(self, self.cursor, e) + raise # not called + + @property + def lastrowid(self): + """Return the 'lastrowid' accessor on the DBAPI cursor. + + This is a DBAPI specific method and is only functional + for those backends which support it, for statements + where it is appropriate. It's behavior is not + consistent across backends. + + Usage of this method is normally unnecessary when + using insert() expression constructs; the + :attr:`~CursorResult.inserted_primary_key` attribute provides a + tuple of primary key values for a newly inserted row, + regardless of database backend. + + """ + try: + return self.context.get_lastrowid() + except BaseException as e: + self.cursor_strategy.handle_exception(self, self.cursor, e) + + @property + def returns_rows(self): + """True if this :class:`_engine.CursorResult` returns zero or more + rows. + + I.e. if it is legal to call the methods + :meth:`_engine.CursorResult.fetchone`, + :meth:`_engine.CursorResult.fetchmany` + :meth:`_engine.CursorResult.fetchall`. + + Overall, the value of :attr:`_engine.CursorResult.returns_rows` should + always be synonymous with whether or not the DBAPI cursor had a + ``.description`` attribute, indicating the presence of result columns, + noting that a cursor that returns zero rows still has a + ``.description`` if a row-returning statement was emitted. + + This attribute should be True for all results that are against + SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE + that use RETURNING. For INSERT/UPDATE/DELETE statements that were + not using RETURNING, the value will usually be False, however + there are some dialect-specific exceptions to this, such as when + using the MSSQL / pyodbc dialect a SELECT is emitted inline in + order to retrieve an inserted primary key value. + + + """ + return self._metadata.returns_rows + + @property + def is_insert(self): + """True if this :class:`_engine.CursorResult` is the result + of a executing an expression language compiled + :func:`_expression.insert` construct. + + When True, this implies that the + :attr:`inserted_primary_key` attribute is accessible, + assuming the statement did not include + a user defined "returning" construct. + + """ + return self.context.isinsert + + def _fetchiter_impl(self): + fetchone = self.cursor_strategy.fetchone + + while True: + row = fetchone(self, self.cursor) + if row is None: + break + yield row + + def _fetchone_impl(self, hard_close=False): + return self.cursor_strategy.fetchone(self, self.cursor, hard_close) + + def _fetchall_impl(self): + return self.cursor_strategy.fetchall(self, self.cursor) + + def _fetchmany_impl(self, size=None): + return self.cursor_strategy.fetchmany(self, self.cursor, size) + + def _raw_row_iterator(self): + return self._fetchiter_impl() + + def merge(self, *others: Result[Any]) -> MergedResult[Any]: + merged_result = super().merge(*others) + if self.context._has_rowcount: + merged_result.rowcount = sum( + cast("CursorResult[Any]", result).rowcount + for result in (self,) + others + ) + return merged_result + + def close(self) -> Any: + """Close this :class:`_engine.CursorResult`. + + This closes out the underlying DBAPI cursor corresponding to the + statement execution, if one is still present. Note that the DBAPI + cursor is automatically released when the :class:`_engine.CursorResult` + exhausts all available rows. :meth:`_engine.CursorResult.close` is + generally an optional method except in the case when discarding a + :class:`_engine.CursorResult` that still has additional rows pending + for fetch. + + After this method is called, it is no longer valid to call upon + the fetch methods, which will raise a :class:`.ResourceClosedError` + on subsequent use. + + .. seealso:: + + :ref:`connections_toplevel` + + """ + self._soft_close(hard=True) + + @_generative + def yield_per(self, num: int) -> Self: + self._yield_per = num + self.cursor_strategy.yield_per(self, self.cursor, num) + return self + + +ResultProxy = CursorResult -- cgit v1.2.3