From 12cf076118570eebbff08c6b3090e0d4798447a1 Mon Sep 17 00:00:00 2001 From: cyfraeviolae Date: Wed, 3 Apr 2024 03:17:55 -0400 Subject: no venv --- .../site-packages/sqlalchemy/engine/cursor.py | 2178 -------------------- 1 file changed, 2178 deletions(-) delete mode 100644 venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py') diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py b/venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py deleted file mode 100644 index 71767db..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/engine/cursor.py +++ /dev/null @@ -1,2178 +0,0 @@ -# engine/cursor.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: allow-untyped-defs, allow-untyped-calls - -"""Define cursor-specific result set constructs including -:class:`.CursorResult`.""" - - -from __future__ import annotations - -import collections -import functools -import operator -import typing -from typing import Any -from typing import cast -from typing import ClassVar -from typing import Dict -from typing import Iterator -from typing import List -from typing import Mapping -from typing import NoReturn -from typing import Optional -from typing import Sequence -from typing import Tuple -from typing import TYPE_CHECKING -from typing import TypeVar -from typing import Union - -from .result import IteratorResult -from .result import MergedResult -from .result import Result -from .result import ResultMetaData -from .result import SimpleResultMetaData -from .result import tuplegetter -from .row import Row -from .. import exc -from .. import util -from ..sql import elements -from ..sql import sqltypes -from ..sql import util as sql_util -from ..sql.base import _generative -from ..sql.compiler import ResultColumnsEntry -from ..sql.compiler import RM_NAME -from ..sql.compiler import RM_OBJECTS -from ..sql.compiler import RM_RENDERED_NAME -from ..sql.compiler import RM_TYPE -from ..sql.type_api import TypeEngine -from ..util import compat -from ..util.typing import Literal -from ..util.typing import Self - - -if typing.TYPE_CHECKING: - from .base import Connection - from .default import DefaultExecutionContext - from .interfaces import _DBAPICursorDescription - from .interfaces import DBAPICursor - from .interfaces import Dialect - from .interfaces import ExecutionContext - from .result import _KeyIndexType - from .result import _KeyMapRecType - from .result import _KeyMapType - from .result import _KeyType - from .result import _ProcessorsType - from .result import _TupleGetterType - from ..sql.type_api import _ResultProcessorType - - -_T = TypeVar("_T", bound=Any) - - -# metadata entry tuple indexes. -# using raw tuple is faster than namedtuple. -# these match up to the positions in -# _CursorKeyMapRecType -MD_INDEX: Literal[0] = 0 -"""integer index in cursor.description - -""" - -MD_RESULT_MAP_INDEX: Literal[1] = 1 -"""integer index in compiled._result_columns""" - -MD_OBJECTS: Literal[2] = 2 -"""other string keys and ColumnElement obj that can match. - -This comes from compiler.RM_OBJECTS / compiler.ResultColumnsEntry.objects - -""" - -MD_LOOKUP_KEY: Literal[3] = 3 -"""string key we usually expect for key-based lookup - -this comes from compiler.RM_NAME / compiler.ResultColumnsEntry.name -""" - - -MD_RENDERED_NAME: Literal[4] = 4 -"""name that is usually in cursor.description - -this comes from compiler.RENDERED_NAME / compiler.ResultColumnsEntry.keyname -""" - - -MD_PROCESSOR: Literal[5] = 5 -"""callable to process a result value into a row""" - -MD_UNTRANSLATED: Literal[6] = 6 -"""raw name from cursor.description""" - - -_CursorKeyMapRecType = Tuple[ - Optional[int], # MD_INDEX, None means the record is ambiguously named - int, # MD_RESULT_MAP_INDEX - List[Any], # MD_OBJECTS - str, # MD_LOOKUP_KEY - str, # MD_RENDERED_NAME - Optional["_ResultProcessorType[Any]"], # MD_PROCESSOR - Optional[str], # MD_UNTRANSLATED -] - -_CursorKeyMapType = Mapping["_KeyType", _CursorKeyMapRecType] - -# same as _CursorKeyMapRecType except the MD_INDEX value is definitely -# not None -_NonAmbigCursorKeyMapRecType = Tuple[ - int, - int, - List[Any], - str, - str, - Optional["_ResultProcessorType[Any]"], - str, -] - - -class CursorResultMetaData(ResultMetaData): - """Result metadata for DBAPI cursors.""" - - __slots__ = ( - "_keymap", - "_processors", - "_keys", - "_keymap_by_result_column_idx", - "_tuplefilter", - "_translated_indexes", - "_safe_for_cache", - "_unpickled", - "_key_to_index", - # don't need _unique_filters support here for now. Can be added - # if a need arises. - ) - - _keymap: _CursorKeyMapType - _processors: _ProcessorsType - _keymap_by_result_column_idx: Optional[Dict[int, _KeyMapRecType]] - _unpickled: bool - _safe_for_cache: bool - _translated_indexes: Optional[List[int]] - - returns_rows: ClassVar[bool] = True - - def _has_key(self, key: Any) -> bool: - return key in self._keymap - - def _for_freeze(self) -> ResultMetaData: - return SimpleResultMetaData( - self._keys, - extra=[self._keymap[key][MD_OBJECTS] for key in self._keys], - ) - - def _make_new_metadata( - self, - *, - unpickled: bool, - processors: _ProcessorsType, - keys: Sequence[str], - keymap: _KeyMapType, - tuplefilter: Optional[_TupleGetterType], - translated_indexes: Optional[List[int]], - safe_for_cache: bool, - keymap_by_result_column_idx: Any, - ) -> CursorResultMetaData: - new_obj = self.__class__.__new__(self.__class__) - new_obj._unpickled = unpickled - new_obj._processors = processors - new_obj._keys = keys - new_obj._keymap = keymap - new_obj._tuplefilter = tuplefilter - new_obj._translated_indexes = translated_indexes - new_obj._safe_for_cache = safe_for_cache - new_obj._keymap_by_result_column_idx = keymap_by_result_column_idx - new_obj._key_to_index = self._make_key_to_index(keymap, MD_INDEX) - return new_obj - - def _remove_processors(self) -> CursorResultMetaData: - assert not self._tuplefilter - return self._make_new_metadata( - unpickled=self._unpickled, - processors=[None] * len(self._processors), - tuplefilter=None, - translated_indexes=None, - keymap={ - key: value[0:5] + (None,) + value[6:] - for key, value in self._keymap.items() - }, - keys=self._keys, - safe_for_cache=self._safe_for_cache, - keymap_by_result_column_idx=self._keymap_by_result_column_idx, - ) - - def _splice_horizontally( - self, other: CursorResultMetaData - ) -> CursorResultMetaData: - assert not self._tuplefilter - - keymap = dict(self._keymap) - offset = len(self._keys) - keymap.update( - { - key: ( - # int index should be None for ambiguous key - ( - value[0] + offset - if value[0] is not None and key not in keymap - else None - ), - value[1] + offset, - *value[2:], - ) - for key, value in other._keymap.items() - } - ) - return self._make_new_metadata( - unpickled=self._unpickled, - processors=self._processors + other._processors, # type: ignore - tuplefilter=None, - translated_indexes=None, - keys=self._keys + other._keys, # type: ignore - keymap=keymap, - safe_for_cache=self._safe_for_cache, - keymap_by_result_column_idx={ - metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry - for metadata_entry in keymap.values() - }, - ) - - def _reduce(self, keys: Sequence[_KeyIndexType]) -> ResultMetaData: - recs = list(self._metadata_for_keys(keys)) - - indexes = [rec[MD_INDEX] for rec in recs] - new_keys: List[str] = [rec[MD_LOOKUP_KEY] for rec in recs] - - if self._translated_indexes: - indexes = [self._translated_indexes[idx] for idx in indexes] - tup = tuplegetter(*indexes) - new_recs = [(index,) + rec[1:] for index, rec in enumerate(recs)] - - keymap = {rec[MD_LOOKUP_KEY]: rec for rec in new_recs} - # TODO: need unit test for: - # result = connection.execute("raw sql, no columns").scalars() - # without the "or ()" it's failing because MD_OBJECTS is None - keymap.update( - (e, new_rec) - for new_rec in new_recs - for e in new_rec[MD_OBJECTS] or () - ) - - return self._make_new_metadata( - unpickled=self._unpickled, - processors=self._processors, - keys=new_keys, - tuplefilter=tup, - translated_indexes=indexes, - keymap=keymap, # type: ignore[arg-type] - safe_for_cache=self._safe_for_cache, - keymap_by_result_column_idx=self._keymap_by_result_column_idx, - ) - - def _adapt_to_context(self, context: ExecutionContext) -> ResultMetaData: - """When using a cached Compiled construct that has a _result_map, - for a new statement that used the cached Compiled, we need to ensure - the keymap has the Column objects from our new statement as keys. - So here we rewrite keymap with new entries for the new columns - as matched to those of the cached statement. - - """ - - if not context.compiled or not context.compiled._result_columns: - return self - - compiled_statement = context.compiled.statement - invoked_statement = context.invoked_statement - - if TYPE_CHECKING: - assert isinstance(invoked_statement, elements.ClauseElement) - - if compiled_statement is invoked_statement: - return self - - assert invoked_statement is not None - - # this is the most common path for Core statements when - # caching is used. In ORM use, this codepath is not really used - # as the _result_disable_adapt_to_context execution option is - # set by the ORM. - - # make a copy and add the columns from the invoked statement - # to the result map. - - keymap_by_position = self._keymap_by_result_column_idx - - if keymap_by_position is None: - # first retrival from cache, this map will not be set up yet, - # initialize lazily - keymap_by_position = self._keymap_by_result_column_idx = { - metadata_entry[MD_RESULT_MAP_INDEX]: metadata_entry - for metadata_entry in self._keymap.values() - } - - assert not self._tuplefilter - return self._make_new_metadata( - keymap=compat.dict_union( - self._keymap, - { - new: keymap_by_position[idx] - for idx, new in enumerate( - invoked_statement._all_selected_columns - ) - if idx in keymap_by_position - }, - ), - unpickled=self._unpickled, - processors=self._processors, - tuplefilter=None, - translated_indexes=None, - keys=self._keys, - safe_for_cache=self._safe_for_cache, - keymap_by_result_column_idx=self._keymap_by_result_column_idx, - ) - - def __init__( - self, - parent: CursorResult[Any], - cursor_description: _DBAPICursorDescription, - ): - context = parent.context - self._tuplefilter = None - self._translated_indexes = None - self._safe_for_cache = self._unpickled = False - - if context.result_column_struct: - ( - result_columns, - cols_are_ordered, - textual_ordered, - ad_hoc_textual, - loose_column_name_matching, - ) = context.result_column_struct - num_ctx_cols = len(result_columns) - else: - result_columns = cols_are_ordered = ( # type: ignore - num_ctx_cols - ) = ad_hoc_textual = loose_column_name_matching = ( - textual_ordered - ) = False - - # merge cursor.description with the column info - # present in the compiled structure, if any - raw = self._merge_cursor_description( - context, - cursor_description, - result_columns, - num_ctx_cols, - cols_are_ordered, - textual_ordered, - ad_hoc_textual, - loose_column_name_matching, - ) - - # processors in key order which are used when building up - # a row - self._processors = [ - metadata_entry[MD_PROCESSOR] for metadata_entry in raw - ] - - # this is used when using this ResultMetaData in a Core-only cache - # retrieval context. it's initialized on first cache retrieval - # when the _result_disable_adapt_to_context execution option - # (which the ORM generally sets) is not set. - self._keymap_by_result_column_idx = None - - # for compiled SQL constructs, copy additional lookup keys into - # the key lookup map, such as Column objects, labels, - # column keys and other names - if num_ctx_cols: - # keymap by primary string... - by_key = { - metadata_entry[MD_LOOKUP_KEY]: metadata_entry - for metadata_entry in raw - } - - if len(by_key) != num_ctx_cols: - # if by-primary-string dictionary smaller than - # number of columns, assume we have dupes; (this check - # is also in place if string dictionary is bigger, as - # can occur when '*' was used as one of the compiled columns, - # which may or may not be suggestive of dupes), rewrite - # dupe records with "None" for index which results in - # ambiguous column exception when accessed. - # - # this is considered to be the less common case as it is not - # common to have dupe column keys in a SELECT statement. - # - # new in 1.4: get the complete set of all possible keys, - # strings, objects, whatever, that are dupes across two - # different records, first. - index_by_key: Dict[Any, Any] = {} - dupes = set() - for metadata_entry in raw: - for key in (metadata_entry[MD_RENDERED_NAME],) + ( - metadata_entry[MD_OBJECTS] or () - ): - idx = metadata_entry[MD_INDEX] - # if this key has been associated with more than one - # positional index, it's a dupe - if index_by_key.setdefault(key, idx) != idx: - dupes.add(key) - - # then put everything we have into the keymap excluding only - # those keys that are dupes. - self._keymap = { - obj_elem: metadata_entry - for metadata_entry in raw - if metadata_entry[MD_OBJECTS] - for obj_elem in metadata_entry[MD_OBJECTS] - if obj_elem not in dupes - } - - # then for the dupe keys, put the "ambiguous column" - # record into by_key. - by_key.update( - { - key: (None, None, [], key, key, None, None) - for key in dupes - } - ) - - else: - # no dupes - copy secondary elements from compiled - # columns into self._keymap. this is the most common - # codepath for Core / ORM statement executions before the - # result metadata is cached - self._keymap = { - obj_elem: metadata_entry - for metadata_entry in raw - if metadata_entry[MD_OBJECTS] - for obj_elem in metadata_entry[MD_OBJECTS] - } - # update keymap with primary string names taking - # precedence - self._keymap.update(by_key) - else: - # no compiled objects to map, just create keymap by primary string - self._keymap = { - metadata_entry[MD_LOOKUP_KEY]: metadata_entry - for metadata_entry in raw - } - - # update keymap with "translated" names. In SQLAlchemy this is a - # sqlite only thing, and in fact impacting only extremely old SQLite - # versions unlikely to be present in modern Python versions. - # however, the pyhive third party dialect is - # also using this hook, which means others still might use it as well. - # I dislike having this awkward hook here but as long as we need - # to use names in cursor.description in some cases we need to have - # some hook to accomplish this. - if not num_ctx_cols and context._translate_colname: - self._keymap.update( - { - metadata_entry[MD_UNTRANSLATED]: self._keymap[ - metadata_entry[MD_LOOKUP_KEY] - ] - for metadata_entry in raw - if metadata_entry[MD_UNTRANSLATED] - } - ) - - self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX) - - def _merge_cursor_description( - self, - context, - cursor_description, - result_columns, - num_ctx_cols, - cols_are_ordered, - textual_ordered, - ad_hoc_textual, - loose_column_name_matching, - ): - """Merge a cursor.description with compiled result column information. - - There are at least four separate strategies used here, selected - depending on the type of SQL construct used to start with. - - The most common case is that of the compiled SQL expression construct, - which generated the column names present in the raw SQL string and - which has the identical number of columns as were reported by - cursor.description. In this case, we assume a 1-1 positional mapping - between the entries in cursor.description and the compiled object. - This is also the most performant case as we disregard extracting / - decoding the column names present in cursor.description since we - already have the desired name we generated in the compiled SQL - construct. - - The next common case is that of the completely raw string SQL, - such as passed to connection.execute(). In this case we have no - compiled construct to work with, so we extract and decode the - names from cursor.description and index those as the primary - result row target keys. - - The remaining fairly common case is that of the textual SQL - that includes at least partial column information; this is when - we use a :class:`_expression.TextualSelect` construct. - This construct may have - unordered or ordered column information. In the ordered case, we - merge the cursor.description and the compiled construct's information - positionally, and warn if there are additional description names - present, however we still decode the names in cursor.description - as we don't have a guarantee that the names in the columns match - on these. In the unordered case, we match names in cursor.description - to that of the compiled construct based on name matching. - In both of these cases, the cursor.description names and the column - expression objects and names are indexed as result row target keys. - - The final case is much less common, where we have a compiled - non-textual SQL expression construct, but the number of columns - in cursor.description doesn't match what's in the compiled - construct. We make the guess here that there might be textual - column expressions in the compiled construct that themselves include - a comma in them causing them to split. We do the same name-matching - as with textual non-ordered columns. - - The name-matched system of merging is the same as that used by - SQLAlchemy for all cases up through the 0.9 series. Positional - matching for compiled SQL expressions was introduced in 1.0 as a - major performance feature, and positional matching for textual - :class:`_expression.TextualSelect` objects in 1.1. - As name matching is no longer - a common case, it was acceptable to factor it into smaller generator- - oriented methods that are easier to understand, but incur slightly - more performance overhead. - - """ - - if ( - num_ctx_cols - and cols_are_ordered - and not textual_ordered - and num_ctx_cols == len(cursor_description) - ): - self._keys = [elem[0] for elem in result_columns] - # pure positional 1-1 case; doesn't need to read - # the names from cursor.description - - # most common case for Core and ORM - - # this metadata is safe to cache because we are guaranteed - # to have the columns in the same order for new executions - self._safe_for_cache = True - return [ - ( - idx, - idx, - rmap_entry[RM_OBJECTS], - rmap_entry[RM_NAME], - rmap_entry[RM_RENDERED_NAME], - context.get_result_processor( - rmap_entry[RM_TYPE], - rmap_entry[RM_RENDERED_NAME], - cursor_description[idx][1], - ), - None, - ) - for idx, rmap_entry in enumerate(result_columns) - ] - else: - # name-based or text-positional cases, where we need - # to read cursor.description names - - if textual_ordered or ( - ad_hoc_textual and len(cursor_description) == num_ctx_cols - ): - self._safe_for_cache = True - # textual positional case - raw_iterator = self._merge_textual_cols_by_position( - context, cursor_description, result_columns - ) - elif num_ctx_cols: - # compiled SQL with a mismatch of description cols - # vs. compiled cols, or textual w/ unordered columns - # the order of columns can change if the query is - # against a "select *", so not safe to cache - self._safe_for_cache = False - raw_iterator = self._merge_cols_by_name( - context, - cursor_description, - result_columns, - loose_column_name_matching, - ) - else: - # no compiled SQL, just a raw string, order of columns - # can change for "select *" - self._safe_for_cache = False - raw_iterator = self._merge_cols_by_none( - context, cursor_description - ) - - return [ - ( - idx, - ridx, - obj, - cursor_colname, - cursor_colname, - context.get_result_processor( - mapped_type, cursor_colname, coltype - ), - untranslated, - ) - for ( - idx, - ridx, - cursor_colname, - mapped_type, - coltype, - obj, - untranslated, - ) in raw_iterator - ] - - def _colnames_from_description(self, context, cursor_description): - """Extract column names and data types from a cursor.description. - - Applies unicode decoding, column translation, "normalization", - and case sensitivity rules to the names based on the dialect. - - """ - - dialect = context.dialect - translate_colname = context._translate_colname - normalize_name = ( - dialect.normalize_name if dialect.requires_name_normalize else None - ) - untranslated = None - - self._keys = [] - - for idx, rec in enumerate(cursor_description): - colname = rec[0] - coltype = rec[1] - - if translate_colname: - colname, untranslated = translate_colname(colname) - - if normalize_name: - colname = normalize_name(colname) - - self._keys.append(colname) - - yield idx, colname, untranslated, coltype - - def _merge_textual_cols_by_position( - self, context, cursor_description, result_columns - ): - num_ctx_cols = len(result_columns) - - if num_ctx_cols > len(cursor_description): - util.warn( - "Number of columns in textual SQL (%d) is " - "smaller than number of columns requested (%d)" - % (num_ctx_cols, len(cursor_description)) - ) - seen = set() - for ( - idx, - colname, - untranslated, - coltype, - ) in self._colnames_from_description(context, cursor_description): - if idx < num_ctx_cols: - ctx_rec = result_columns[idx] - obj = ctx_rec[RM_OBJECTS] - ridx = idx - mapped_type = ctx_rec[RM_TYPE] - if obj[0] in seen: - raise exc.InvalidRequestError( - "Duplicate column expression requested " - "in textual SQL: %r" % obj[0] - ) - seen.add(obj[0]) - else: - mapped_type = sqltypes.NULLTYPE - obj = None - ridx = None - yield idx, ridx, colname, mapped_type, coltype, obj, untranslated - - def _merge_cols_by_name( - self, - context, - cursor_description, - result_columns, - loose_column_name_matching, - ): - match_map = self._create_description_match_map( - result_columns, loose_column_name_matching - ) - mapped_type: TypeEngine[Any] - - for ( - idx, - colname, - untranslated, - coltype, - ) in self._colnames_from_description(context, cursor_description): - try: - ctx_rec = match_map[colname] - except KeyError: - mapped_type = sqltypes.NULLTYPE - obj = None - result_columns_idx = None - else: - obj = ctx_rec[1] - mapped_type = ctx_rec[2] - result_columns_idx = ctx_rec[3] - yield ( - idx, - result_columns_idx, - colname, - mapped_type, - coltype, - obj, - untranslated, - ) - - @classmethod - def _create_description_match_map( - cls, - result_columns: List[ResultColumnsEntry], - loose_column_name_matching: bool = False, - ) -> Dict[ - Union[str, object], Tuple[str, Tuple[Any, ...], TypeEngine[Any], int] - ]: - """when matching cursor.description to a set of names that are present - in a Compiled object, as is the case with TextualSelect, get all the - names we expect might match those in cursor.description. - """ - - d: Dict[ - Union[str, object], - Tuple[str, Tuple[Any, ...], TypeEngine[Any], int], - ] = {} - for ridx, elem in enumerate(result_columns): - key = elem[RM_RENDERED_NAME] - if key in d: - # conflicting keyname - just add the column-linked objects - # to the existing record. if there is a duplicate column - # name in the cursor description, this will allow all of those - # objects to raise an ambiguous column error - e_name, e_obj, e_type, e_ridx = d[key] - d[key] = e_name, e_obj + elem[RM_OBJECTS], e_type, ridx - else: - d[key] = (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx) - - if loose_column_name_matching: - # when using a textual statement with an unordered set - # of columns that line up, we are expecting the user - # to be using label names in the SQL that match to the column - # expressions. Enable more liberal matching for this case; - # duplicate keys that are ambiguous will be fixed later. - for r_key in elem[RM_OBJECTS]: - d.setdefault( - r_key, - (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE], ridx), - ) - return d - - def _merge_cols_by_none(self, context, cursor_description): - for ( - idx, - colname, - untranslated, - coltype, - ) in self._colnames_from_description(context, cursor_description): - yield ( - idx, - None, - colname, - sqltypes.NULLTYPE, - coltype, - None, - untranslated, - ) - - if not TYPE_CHECKING: - - def _key_fallback( - self, key: Any, err: Optional[Exception], raiseerr: bool = True - ) -> Optional[NoReturn]: - if raiseerr: - if self._unpickled and isinstance(key, elements.ColumnElement): - raise exc.NoSuchColumnError( - "Row was unpickled; lookup by ColumnElement " - "is unsupported" - ) from err - else: - raise exc.NoSuchColumnError( - "Could not locate column in row for column '%s'" - % util.string_or_unprintable(key) - ) from err - else: - return None - - def _raise_for_ambiguous_column_name(self, rec): - raise exc.InvalidRequestError( - "Ambiguous column name '%s' in " - "result set column descriptions" % rec[MD_LOOKUP_KEY] - ) - - def _index_for_key(self, key: Any, raiseerr: bool = True) -> Optional[int]: - # TODO: can consider pre-loading ints and negative ints - # into _keymap - also no coverage here - if isinstance(key, int): - key = self._keys[key] - - try: - rec = self._keymap[key] - except KeyError as ke: - x = self._key_fallback(key, ke, raiseerr) - assert x is None - return None - - index = rec[0] - - if index is None: - self._raise_for_ambiguous_column_name(rec) - return index - - def _indexes_for_keys(self, keys): - try: - return [self._keymap[key][0] for key in keys] - except KeyError as ke: - # ensure it raises - CursorResultMetaData._key_fallback(self, ke.args[0], ke) - - def _metadata_for_keys( - self, keys: Sequence[Any] - ) -> Iterator[_NonAmbigCursorKeyMapRecType]: - for key in keys: - if int in key.__class__.__mro__: - key = self._keys[key] - - try: - rec = self._keymap[key] - except KeyError as ke: - # ensure it raises - CursorResultMetaData._key_fallback(self, ke.args[0], ke) - - index = rec[MD_INDEX] - - if index is None: - self._raise_for_ambiguous_column_name(rec) - - yield cast(_NonAmbigCursorKeyMapRecType, rec) - - def __getstate__(self): - # TODO: consider serializing this as SimpleResultMetaData - return { - "_keymap": { - key: ( - rec[MD_INDEX], - rec[MD_RESULT_MAP_INDEX], - [], - key, - rec[MD_RENDERED_NAME], - None, - None, - ) - for key, rec in self._keymap.items() - if isinstance(key, (str, int)) - }, - "_keys": self._keys, - "_translated_indexes": self._translated_indexes, - } - - def __setstate__(self, state): - self._processors = [None for _ in range(len(state["_keys"]))] - self._keymap = state["_keymap"] - self._keymap_by_result_column_idx = None - self._key_to_index = self._make_key_to_index(self._keymap, MD_INDEX) - self._keys = state["_keys"] - self._unpickled = True - if state["_translated_indexes"]: - self._translated_indexes = cast( - "List[int]", state["_translated_indexes"] - ) - self._tuplefilter = tuplegetter(*self._translated_indexes) - else: - self._translated_indexes = self._tuplefilter = None - - -class ResultFetchStrategy: - """Define a fetching strategy for a result object. - - - .. versionadded:: 1.4 - - """ - - __slots__ = () - - alternate_cursor_description: Optional[_DBAPICursorDescription] = None - - def soft_close( - self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor] - ) -> None: - raise NotImplementedError() - - def hard_close( - self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor] - ) -> None: - raise NotImplementedError() - - def yield_per( - self, - result: CursorResult[Any], - dbapi_cursor: Optional[DBAPICursor], - num: int, - ) -> None: - return - - def fetchone( - self, - result: CursorResult[Any], - dbapi_cursor: DBAPICursor, - hard_close: bool = False, - ) -> Any: - raise NotImplementedError() - - def fetchmany( - self, - result: CursorResult[Any], - dbapi_cursor: DBAPICursor, - size: Optional[int] = None, - ) -> Any: - raise NotImplementedError() - - def fetchall( - self, - result: CursorResult[Any], - dbapi_cursor: DBAPICursor, - ) -> Any: - raise NotImplementedError() - - def handle_exception( - self, - result: CursorResult[Any], - dbapi_cursor: Optional[DBAPICursor], - err: BaseException, - ) -> NoReturn: - raise err - - -class NoCursorFetchStrategy(ResultFetchStrategy): - """Cursor strategy for a result that has no open cursor. - - There are two varieties of this strategy, one for DQL and one for - DML (and also DDL), each of which represent a result that had a cursor - but no longer has one. - - """ - - __slots__ = () - - def soft_close(self, result, dbapi_cursor): - pass - - def hard_close(self, result, dbapi_cursor): - pass - - def fetchone(self, result, dbapi_cursor, hard_close=False): - return self._non_result(result, None) - - def fetchmany(self, result, dbapi_cursor, size=None): - return self._non_result(result, []) - - def fetchall(self, result, dbapi_cursor): - return self._non_result(result, []) - - def _non_result(self, result, default, err=None): - raise NotImplementedError() - - -class NoCursorDQLFetchStrategy(NoCursorFetchStrategy): - """Cursor strategy for a DQL result that has no open cursor. - - This is a result set that can return rows, i.e. for a SELECT, or for an - INSERT, UPDATE, DELETE that includes RETURNING. However it is in the state - where the cursor is closed and no rows remain available. The owning result - object may or may not be "hard closed", which determines if the fetch - methods send empty results or raise for closed result. - - """ - - __slots__ = () - - def _non_result(self, result, default, err=None): - if result.closed: - raise exc.ResourceClosedError( - "This result object is closed." - ) from err - else: - return default - - -_NO_CURSOR_DQL = NoCursorDQLFetchStrategy() - - -class NoCursorDMLFetchStrategy(NoCursorFetchStrategy): - """Cursor strategy for a DML result that has no open cursor. - - This is a result set that does not return rows, i.e. for an INSERT, - UPDATE, DELETE that does not include RETURNING. - - """ - - __slots__ = () - - def _non_result(self, result, default, err=None): - # we only expect to have a _NoResultMetaData() here right now. - assert not result._metadata.returns_rows - result._metadata._we_dont_return_rows(err) - - -_NO_CURSOR_DML = NoCursorDMLFetchStrategy() - - -class CursorFetchStrategy(ResultFetchStrategy): - """Call fetch methods from a DBAPI cursor. - - Alternate versions of this class may instead buffer the rows from - cursors or not use cursors at all. - - """ - - __slots__ = () - - def soft_close( - self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor] - ) -> None: - result.cursor_strategy = _NO_CURSOR_DQL - - def hard_close( - self, result: CursorResult[Any], dbapi_cursor: Optional[DBAPICursor] - ) -> None: - result.cursor_strategy = _NO_CURSOR_DQL - - def handle_exception( - self, - result: CursorResult[Any], - dbapi_cursor: Optional[DBAPICursor], - err: BaseException, - ) -> NoReturn: - result.connection._handle_dbapi_exception( - err, None, None, dbapi_cursor, result.context - ) - - def yield_per( - self, - result: CursorResult[Any], - dbapi_cursor: Optional[DBAPICursor], - num: int, - ) -> None: - result.cursor_strategy = BufferedRowCursorFetchStrategy( - dbapi_cursor, - {"max_row_buffer": num}, - initial_buffer=collections.deque(), - growth_factor=0, - ) - - def fetchone( - self, - result: CursorResult[Any], - dbapi_cursor: DBAPICursor, - hard_close: bool = False, - ) -> Any: - try: - row = dbapi_cursor.fetchone() - if row is None: - result._soft_close(hard=hard_close) - return row - except BaseException as e: - self.handle_exception(result, dbapi_cursor, e) - - def fetchmany( - self, - result: CursorResult[Any], - dbapi_cursor: DBAPICursor, - size: Optional[int] = None, - ) -> Any: - try: - if size is None: - l = dbapi_cursor.fetchmany() - else: - l = dbapi_cursor.fetchmany(size) - - if not l: - result._soft_close() - return l - except BaseException as e: - self.handle_exception(result, dbapi_cursor, e) - - def fetchall( - self, - result: CursorResult[Any], - dbapi_cursor: DBAPICursor, - ) -> Any: - try: - rows = dbapi_cursor.fetchall() - result._soft_close() - return rows - except BaseException as e: - self.handle_exception(result, dbapi_cursor, e) - - -_DEFAULT_FETCH = CursorFetchStrategy() - - -class BufferedRowCursorFetchStrategy(CursorFetchStrategy): - """A cursor fetch strategy with row buffering behavior. - - This strategy buffers the contents of a selection of rows - before ``fetchone()`` is called. This is to allow the results of - ``cursor.description`` to be available immediately, when - interfacing with a DB-API that requires rows to be consumed before - this information is available (currently psycopg2, when used with - server-side cursors). - - The pre-fetching behavior fetches only one row initially, and then - grows its buffer size by a fixed amount with each successive need - for additional rows up the ``max_row_buffer`` size, which defaults - to 1000:: - - with psycopg2_engine.connect() as conn: - - result = conn.execution_options( - stream_results=True, max_row_buffer=50 - ).execute(text("select * from table")) - - .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows. - - .. seealso:: - - :ref:`psycopg2_execution_options` - """ - - __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize", "_growth_factor") - - def __init__( - self, - dbapi_cursor, - execution_options, - growth_factor=5, - initial_buffer=None, - ): - self._max_row_buffer = execution_options.get("max_row_buffer", 1000) - - if initial_buffer is not None: - self._rowbuffer = initial_buffer - else: - self._rowbuffer = collections.deque(dbapi_cursor.fetchmany(1)) - self._growth_factor = growth_factor - - if growth_factor: - self._bufsize = min(self._max_row_buffer, self._growth_factor) - else: - self._bufsize = self._max_row_buffer - - @classmethod - def create(cls, result): - return BufferedRowCursorFetchStrategy( - result.cursor, - result.context.execution_options, - ) - - def _buffer_rows(self, result, dbapi_cursor): - """this is currently used only by fetchone().""" - - size = self._bufsize - try: - if size < 1: - new_rows = dbapi_cursor.fetchall() - else: - new_rows = dbapi_cursor.fetchmany(size) - except BaseException as e: - self.handle_exception(result, dbapi_cursor, e) - - if not new_rows: - return - self._rowbuffer = collections.deque(new_rows) - if self._growth_factor and size < self._max_row_buffer: - self._bufsize = min( - self._max_row_buffer, size * self._growth_factor - ) - - def yield_per(self, result, dbapi_cursor, num): - self._growth_factor = 0 - self._max_row_buffer = self._bufsize = num - - def soft_close(self, result, dbapi_cursor): - self._rowbuffer.clear() - super().soft_close(result, dbapi_cursor) - - def hard_close(self, result, dbapi_cursor): - self._rowbuffer.clear() - super().hard_close(result, dbapi_cursor) - - def fetchone(self, result, dbapi_cursor, hard_close=False): - if not self._rowbuffer: - self._buffer_rows(result, dbapi_cursor) - if not self._rowbuffer: - try: - result._soft_close(hard=hard_close) - except BaseException as e: - self.handle_exception(result, dbapi_cursor, e) - return None - return self._rowbuffer.popleft() - - def fetchmany(self, result, dbapi_cursor, size=None): - if size is None: - return self.fetchall(result, dbapi_cursor) - - buf = list(self._rowbuffer) - lb = len(buf) - if size > lb: - try: - new = dbapi_cursor.fetchmany(size - lb) - except BaseException as e: - self.handle_exception(result, dbapi_cursor, e) - else: - if not new: - result._soft_close() - else: - buf.extend(new) - - result = buf[0:size] - self._rowbuffer = collections.deque(buf[size:]) - return result - - def fetchall(self, result, dbapi_cursor): - try: - ret = list(self._rowbuffer) + list(dbapi_cursor.fetchall()) - self._rowbuffer.clear() - result._soft_close() - return ret - except BaseException as e: - self.handle_exception(result, dbapi_cursor, e) - - -class FullyBufferedCursorFetchStrategy(CursorFetchStrategy): - """A cursor strategy that buffers rows fully upon creation. - - Used for operations where a result is to be delivered - after the database conversation can not be continued, - such as MSSQL INSERT...OUTPUT after an autocommit. - - """ - - __slots__ = ("_rowbuffer", "alternate_cursor_description") - - def __init__( - self, dbapi_cursor, alternate_description=None, initial_buffer=None - ): - self.alternate_cursor_description = alternate_description - if initial_buffer is not None: - self._rowbuffer = collections.deque(initial_buffer) - else: - self._rowbuffer = collections.deque(dbapi_cursor.fetchall()) - - def yield_per(self, result, dbapi_cursor, num): - pass - - def soft_close(self, result, dbapi_cursor): - self._rowbuffer.clear() - super().soft_close(result, dbapi_cursor) - - def hard_close(self, result, dbapi_cursor): - self._rowbuffer.clear() - super().hard_close(result, dbapi_cursor) - - def fetchone(self, result, dbapi_cursor, hard_close=False): - if self._rowbuffer: - return self._rowbuffer.popleft() - else: - result._soft_close(hard=hard_close) - return None - - def fetchmany(self, result, dbapi_cursor, size=None): - if size is None: - return self.fetchall(result, dbapi_cursor) - - buf = list(self._rowbuffer) - rows = buf[0:size] - self._rowbuffer = collections.deque(buf[size:]) - if not rows: - result._soft_close() - return rows - - def fetchall(self, result, dbapi_cursor): - ret = self._rowbuffer - self._rowbuffer = collections.deque() - result._soft_close() - return ret - - -class _NoResultMetaData(ResultMetaData): - __slots__ = () - - returns_rows = False - - def _we_dont_return_rows(self, err=None): - raise exc.ResourceClosedError( - "This result object does not return rows. " - "It has been closed automatically." - ) from err - - def _index_for_key(self, keys, raiseerr): - self._we_dont_return_rows() - - def _metadata_for_keys(self, key): - self._we_dont_return_rows() - - def _reduce(self, keys): - self._we_dont_return_rows() - - @property - def _keymap(self): - self._we_dont_return_rows() - - @property - def _key_to_index(self): - self._we_dont_return_rows() - - @property - def _processors(self): - self._we_dont_return_rows() - - @property - def keys(self): - self._we_dont_return_rows() - - -_NO_RESULT_METADATA = _NoResultMetaData() - - -def null_dml_result() -> IteratorResult[Any]: - it: IteratorResult[Any] = IteratorResult(_NoResultMetaData(), iter([])) - it._soft_close() - return it - - -class CursorResult(Result[_T]): - """A Result that is representing state from a DBAPI cursor. - - .. versionchanged:: 1.4 The :class:`.CursorResult`` - class replaces the previous :class:`.ResultProxy` interface. - This classes are based on the :class:`.Result` calling API - which provides an updated usage model and calling facade for - SQLAlchemy Core and SQLAlchemy ORM. - - Returns database rows via the :class:`.Row` class, which provides - additional API features and behaviors on top of the raw data returned by - the DBAPI. Through the use of filters such as the :meth:`.Result.scalars` - method, other kinds of objects may also be returned. - - .. seealso:: - - :ref:`tutorial_selecting_data` - introductory material for accessing - :class:`_engine.CursorResult` and :class:`.Row` objects. - - """ - - __slots__ = ( - "context", - "dialect", - "cursor", - "cursor_strategy", - "_echo", - "connection", - ) - - _metadata: Union[CursorResultMetaData, _NoResultMetaData] - _no_result_metadata = _NO_RESULT_METADATA - _soft_closed: bool = False - closed: bool = False - _is_cursor = True - - context: DefaultExecutionContext - dialect: Dialect - cursor_strategy: ResultFetchStrategy - connection: Connection - - def __init__( - self, - context: DefaultExecutionContext, - cursor_strategy: ResultFetchStrategy, - cursor_description: Optional[_DBAPICursorDescription], - ): - self.context = context - self.dialect = context.dialect - self.cursor = context.cursor - self.cursor_strategy = cursor_strategy - self.connection = context.root_connection - self._echo = echo = ( - self.connection._echo and context.engine._should_log_debug() - ) - - if cursor_description is not None: - # inline of Result._row_getter(), set up an initial row - # getter assuming no transformations will be called as this - # is the most common case - - metadata = self._init_metadata(context, cursor_description) - - _make_row: Any - _make_row = functools.partial( - Row, - metadata, - metadata._effective_processors, - metadata._key_to_index, - ) - - if context._num_sentinel_cols: - sentinel_filter = operator.itemgetter( - slice(-context._num_sentinel_cols) - ) - - def _sliced_row(raw_data): - return _make_row(sentinel_filter(raw_data)) - - sliced_row = _sliced_row - else: - sliced_row = _make_row - - if echo: - log = self.context.connection._log_debug - - def _log_row(row): - log("Row %r", sql_util._repr_row(row)) - return row - - self._row_logging_fn = _log_row - - def _make_row_2(row): - return _log_row(sliced_row(row)) - - make_row = _make_row_2 - else: - make_row = sliced_row - self._set_memoized_attribute("_row_getter", make_row) - - else: - assert context._num_sentinel_cols == 0 - self._metadata = self._no_result_metadata - - def _init_metadata(self, context, cursor_description): - if context.compiled: - compiled = context.compiled - - if compiled._cached_metadata: - metadata = compiled._cached_metadata - else: - metadata = CursorResultMetaData(self, cursor_description) - if metadata._safe_for_cache: - compiled._cached_metadata = metadata - - # result rewrite/ adapt step. this is to suit the case - # when we are invoked against a cached Compiled object, we want - # to rewrite the ResultMetaData to reflect the Column objects - # that are in our current SQL statement object, not the one - # that is associated with the cached Compiled object. - # the Compiled object may also tell us to not - # actually do this step; this is to support the ORM where - # it is to produce a new Result object in any case, and will - # be using the cached Column objects against this database result - # so we don't want to rewrite them. - # - # Basically this step suits the use case where the end user - # is using Core SQL expressions and is accessing columns in the - # result row using row._mapping[table.c.column]. - if ( - not context.execution_options.get( - "_result_disable_adapt_to_context", False - ) - and compiled._result_columns - and context.cache_hit is context.dialect.CACHE_HIT - and compiled.statement is not context.invoked_statement - ): - metadata = metadata._adapt_to_context(context) - - self._metadata = metadata - - else: - self._metadata = metadata = CursorResultMetaData( - self, cursor_description - ) - if self._echo: - context.connection._log_debug( - "Col %r", tuple(x[0] for x in cursor_description) - ) - return metadata - - def _soft_close(self, hard=False): - """Soft close this :class:`_engine.CursorResult`. - - This releases all DBAPI cursor resources, but leaves the - CursorResult "open" from a semantic perspective, meaning the - fetchXXX() methods will continue to return empty results. - - This method is called automatically when: - - * all result rows are exhausted using the fetchXXX() methods. - * cursor.description is None. - - This method is **not public**, but is documented in order to clarify - the "autoclose" process used. - - .. seealso:: - - :meth:`_engine.CursorResult.close` - - - """ - - if (not hard and self._soft_closed) or (hard and self.closed): - return - - if hard: - self.closed = True - self.cursor_strategy.hard_close(self, self.cursor) - else: - self.cursor_strategy.soft_close(self, self.cursor) - - if not self._soft_closed: - cursor = self.cursor - self.cursor = None # type: ignore - self.connection._safe_close_cursor(cursor) - self._soft_closed = True - - @property - def inserted_primary_key_rows(self): - """Return the value of - :attr:`_engine.CursorResult.inserted_primary_key` - as a row contained within a list; some dialects may support a - multiple row form as well. - - .. note:: As indicated below, in current SQLAlchemy versions this - accessor is only useful beyond what's already supplied by - :attr:`_engine.CursorResult.inserted_primary_key` when using the - :ref:`postgresql_psycopg2` dialect. Future versions hope to - generalize this feature to more dialects. - - This accessor is added to support dialects that offer the feature - that is currently implemented by the :ref:`psycopg2_executemany_mode` - feature, currently **only the psycopg2 dialect**, which provides - for many rows to be INSERTed at once while still retaining the - behavior of being able to return server-generated primary key values. - - * **When using the psycopg2 dialect, or other dialects that may support - "fast executemany" style inserts in upcoming releases** : When - invoking an INSERT statement while passing a list of rows as the - second argument to :meth:`_engine.Connection.execute`, this accessor - will then provide a list of rows, where each row contains the primary - key value for each row that was INSERTed. - - * **When using all other dialects / backends that don't yet support - this feature**: This accessor is only useful for **single row INSERT - statements**, and returns the same information as that of the - :attr:`_engine.CursorResult.inserted_primary_key` within a - single-element list. When an INSERT statement is executed in - conjunction with a list of rows to be INSERTed, the list will contain - one row per row inserted in the statement, however it will contain - ``None`` for any server-generated values. - - Future releases of SQLAlchemy will further generalize the - "fast execution helper" feature of psycopg2 to suit other dialects, - thus allowing this accessor to be of more general use. - - .. versionadded:: 1.4 - - .. seealso:: - - :attr:`_engine.CursorResult.inserted_primary_key` - - """ - if not self.context.compiled: - raise exc.InvalidRequestError( - "Statement is not a compiled expression construct." - ) - elif not self.context.isinsert: - raise exc.InvalidRequestError( - "Statement is not an insert() expression construct." - ) - elif self.context._is_explicit_returning: - raise exc.InvalidRequestError( - "Can't call inserted_primary_key " - "when returning() " - "is used." - ) - return self.context.inserted_primary_key_rows - - @property - def inserted_primary_key(self): - """Return the primary key for the row just inserted. - - The return value is a :class:`_result.Row` object representing - a named tuple of primary key values in the order in which the - primary key columns are configured in the source - :class:`_schema.Table`. - - .. versionchanged:: 1.4.8 - the - :attr:`_engine.CursorResult.inserted_primary_key` - value is now a named tuple via the :class:`_result.Row` class, - rather than a plain tuple. - - This accessor only applies to single row :func:`_expression.insert` - constructs which did not explicitly specify - :meth:`_expression.Insert.returning`. Support for multirow inserts, - while not yet available for most backends, would be accessed using - the :attr:`_engine.CursorResult.inserted_primary_key_rows` accessor. - - Note that primary key columns which specify a server_default clause, or - otherwise do not qualify as "autoincrement" columns (see the notes at - :class:`_schema.Column`), and were generated using the database-side - default, will appear in this list as ``None`` unless the backend - supports "returning" and the insert statement executed with the - "implicit returning" enabled. - - Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed - statement is not a compiled expression construct - or is not an insert() construct. - - """ - - if self.context.executemany: - raise exc.InvalidRequestError( - "This statement was an executemany call; if primary key " - "returning is supported, please " - "use .inserted_primary_key_rows." - ) - - ikp = self.inserted_primary_key_rows - if ikp: - return ikp[0] - else: - return None - - def last_updated_params(self): - """Return the collection of updated parameters from this - execution. - - Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed - statement is not a compiled expression construct - or is not an update() construct. - - """ - if not self.context.compiled: - raise exc.InvalidRequestError( - "Statement is not a compiled expression construct." - ) - elif not self.context.isupdate: - raise exc.InvalidRequestError( - "Statement is not an update() expression construct." - ) - elif self.context.executemany: - return self.context.compiled_parameters - else: - return self.context.compiled_parameters[0] - - def last_inserted_params(self): - """Return the collection of inserted parameters from this - execution. - - Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed - statement is not a compiled expression construct - or is not an insert() construct. - - """ - if not self.context.compiled: - raise exc.InvalidRequestError( - "Statement is not a compiled expression construct." - ) - elif not self.context.isinsert: - raise exc.InvalidRequestError( - "Statement is not an insert() expression construct." - ) - elif self.context.executemany: - return self.context.compiled_parameters - else: - return self.context.compiled_parameters[0] - - @property - def returned_defaults_rows(self): - """Return a list of rows each containing the values of default - columns that were fetched using - the :meth:`.ValuesBase.return_defaults` feature. - - The return value is a list of :class:`.Row` objects. - - .. versionadded:: 1.4 - - """ - return self.context.returned_default_rows - - def splice_horizontally(self, other): - """Return a new :class:`.CursorResult` that "horizontally splices" - together the rows of this :class:`.CursorResult` with that of another - :class:`.CursorResult`. - - .. tip:: This method is for the benefit of the SQLAlchemy ORM and is - not intended for general use. - - "horizontally splices" means that for each row in the first and second - result sets, a new row that concatenates the two rows together is - produced, which then becomes the new row. The incoming - :class:`.CursorResult` must have the identical number of rows. It is - typically expected that the two result sets come from the same sort - order as well, as the result rows are spliced together based on their - position in the result. - - The expected use case here is so that multiple INSERT..RETURNING - statements (which definitely need to be sorted) against different - tables can produce a single result that looks like a JOIN of those two - tables. - - E.g.:: - - r1 = connection.execute( - users.insert().returning( - users.c.user_name, - users.c.user_id, - sort_by_parameter_order=True - ), - user_values - ) - - r2 = connection.execute( - addresses.insert().returning( - addresses.c.address_id, - addresses.c.address, - addresses.c.user_id, - sort_by_parameter_order=True - ), - address_values - ) - - rows = r1.splice_horizontally(r2).all() - assert ( - rows == - [ - ("john", 1, 1, "foo@bar.com", 1), - ("jack", 2, 2, "bar@bat.com", 2), - ] - ) - - .. versionadded:: 2.0 - - .. seealso:: - - :meth:`.CursorResult.splice_vertically` - - - """ - - clone = self._generate() - total_rows = [ - tuple(r1) + tuple(r2) - for r1, r2 in zip( - list(self._raw_row_iterator()), - list(other._raw_row_iterator()), - ) - ] - - clone._metadata = clone._metadata._splice_horizontally(other._metadata) - - clone.cursor_strategy = FullyBufferedCursorFetchStrategy( - None, - initial_buffer=total_rows, - ) - clone._reset_memoizations() - return clone - - def splice_vertically(self, other): - """Return a new :class:`.CursorResult` that "vertically splices", - i.e. "extends", the rows of this :class:`.CursorResult` with that of - another :class:`.CursorResult`. - - .. tip:: This method is for the benefit of the SQLAlchemy ORM and is - not intended for general use. - - "vertically splices" means the rows of the given result are appended to - the rows of this cursor result. The incoming :class:`.CursorResult` - must have rows that represent the identical list of columns in the - identical order as they are in this :class:`.CursorResult`. - - .. versionadded:: 2.0 - - .. seealso:: - - :meth:`.CursorResult.splice_horizontally` - - """ - clone = self._generate() - total_rows = list(self._raw_row_iterator()) + list( - other._raw_row_iterator() - ) - - clone.cursor_strategy = FullyBufferedCursorFetchStrategy( - None, - initial_buffer=total_rows, - ) - clone._reset_memoizations() - return clone - - def _rewind(self, rows): - """rewind this result back to the given rowset. - - this is used internally for the case where an :class:`.Insert` - construct combines the use of - :meth:`.Insert.return_defaults` along with the - "supplemental columns" feature. - - """ - - if self._echo: - self.context.connection._log_debug( - "CursorResult rewound %d row(s)", len(rows) - ) - - # the rows given are expected to be Row objects, so we - # have to clear out processors which have already run on these - # rows - self._metadata = cast( - CursorResultMetaData, self._metadata - )._remove_processors() - - self.cursor_strategy = FullyBufferedCursorFetchStrategy( - None, - # TODO: if these are Row objects, can we save on not having to - # re-make new Row objects out of them a second time? is that - # what's actually happening right now? maybe look into this - initial_buffer=rows, - ) - self._reset_memoizations() - return self - - @property - def returned_defaults(self): - """Return the values of default columns that were fetched using - the :meth:`.ValuesBase.return_defaults` feature. - - The value is an instance of :class:`.Row`, or ``None`` - if :meth:`.ValuesBase.return_defaults` was not used or if the - backend does not support RETURNING. - - .. seealso:: - - :meth:`.ValuesBase.return_defaults` - - """ - - if self.context.executemany: - raise exc.InvalidRequestError( - "This statement was an executemany call; if return defaults " - "is supported, please use .returned_defaults_rows." - ) - - rows = self.context.returned_default_rows - if rows: - return rows[0] - else: - return None - - def lastrow_has_defaults(self): - """Return ``lastrow_has_defaults()`` from the underlying - :class:`.ExecutionContext`. - - See :class:`.ExecutionContext` for details. - - """ - - return self.context.lastrow_has_defaults() - - def postfetch_cols(self): - """Return ``postfetch_cols()`` from the underlying - :class:`.ExecutionContext`. - - See :class:`.ExecutionContext` for details. - - Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed - statement is not a compiled expression construct - or is not an insert() or update() construct. - - """ - - if not self.context.compiled: - raise exc.InvalidRequestError( - "Statement is not a compiled expression construct." - ) - elif not self.context.isinsert and not self.context.isupdate: - raise exc.InvalidRequestError( - "Statement is not an insert() or update() " - "expression construct." - ) - return self.context.postfetch_cols - - def prefetch_cols(self): - """Return ``prefetch_cols()`` from the underlying - :class:`.ExecutionContext`. - - See :class:`.ExecutionContext` for details. - - Raises :class:`~sqlalchemy.exc.InvalidRequestError` if the executed - statement is not a compiled expression construct - or is not an insert() or update() construct. - - """ - - if not self.context.compiled: - raise exc.InvalidRequestError( - "Statement is not a compiled expression construct." - ) - elif not self.context.isinsert and not self.context.isupdate: - raise exc.InvalidRequestError( - "Statement is not an insert() or update() " - "expression construct." - ) - return self.context.prefetch_cols - - def supports_sane_rowcount(self): - """Return ``supports_sane_rowcount`` from the dialect. - - See :attr:`_engine.CursorResult.rowcount` for background. - - """ - - return self.dialect.supports_sane_rowcount - - def supports_sane_multi_rowcount(self): - """Return ``supports_sane_multi_rowcount`` from the dialect. - - See :attr:`_engine.CursorResult.rowcount` for background. - - """ - - return self.dialect.supports_sane_multi_rowcount - - @util.memoized_property - def rowcount(self) -> int: - """Return the 'rowcount' for this result. - - The primary purpose of 'rowcount' is to report the number of rows - matched by the WHERE criterion of an UPDATE or DELETE statement - executed once (i.e. for a single parameter set), which may then be - compared to the number of rows expected to be updated or deleted as a - means of asserting data integrity. - - This attribute is transferred from the ``cursor.rowcount`` attribute - of the DBAPI before the cursor is closed, to support DBAPIs that - don't make this value available after cursor close. Some DBAPIs may - offer meaningful values for other kinds of statements, such as INSERT - and SELECT statements as well. In order to retrieve ``cursor.rowcount`` - for these statements, set the - :paramref:`.Connection.execution_options.preserve_rowcount` - execution option to True, which will cause the ``cursor.rowcount`` - value to be unconditionally memoized before any results are returned - or the cursor is closed, regardless of statement type. - - For cases where the DBAPI does not support rowcount for a particular - kind of statement and/or execution, the returned value will be ``-1``, - which is delivered directly from the DBAPI and is part of :pep:`249`. - All DBAPIs should support rowcount for single-parameter-set - UPDATE and DELETE statements, however. - - .. note:: - - Notes regarding :attr:`_engine.CursorResult.rowcount`: - - - * This attribute returns the number of rows *matched*, - which is not necessarily the same as the number of rows - that were actually *modified*. For example, an UPDATE statement - may have no net change on a given row if the SET values - given are the same as those present in the row already. - Such a row would be matched but not modified. - On backends that feature both styles, such as MySQL, - rowcount is configured to return the match - count in all cases. - - * :attr:`_engine.CursorResult.rowcount` in the default case is - *only* useful in conjunction with an UPDATE or DELETE statement, - and only with a single set of parameters. For other kinds of - statements, SQLAlchemy will not attempt to pre-memoize the value - unless the - :paramref:`.Connection.execution_options.preserve_rowcount` - execution option is used. Note that contrary to :pep:`249`, many - DBAPIs do not support rowcount values for statements that are not - UPDATE or DELETE, particularly when rows are being returned which - are not fully pre-buffered. DBAPIs that dont support rowcount - for a particular kind of statement should return the value ``-1`` - for such statements. - - * :attr:`_engine.CursorResult.rowcount` may not be meaningful - when executing a single statement with multiple parameter sets - (i.e. an :term:`executemany`). Most DBAPIs do not sum "rowcount" - values across multiple parameter sets and will return ``-1`` - when accessed. - - * SQLAlchemy's :ref:`engine_insertmanyvalues` feature does support - a correct population of :attr:`_engine.CursorResult.rowcount` - when the :paramref:`.Connection.execution_options.preserve_rowcount` - execution option is set to True. - - * Statements that use RETURNING may not support rowcount, returning - a ``-1`` value instead. - - .. seealso:: - - :ref:`tutorial_update_delete_rowcount` - in the :ref:`unified_tutorial` - - :paramref:`.Connection.execution_options.preserve_rowcount` - - """ # noqa: E501 - try: - return self.context.rowcount - except BaseException as e: - self.cursor_strategy.handle_exception(self, self.cursor, e) - raise # not called - - @property - def lastrowid(self): - """Return the 'lastrowid' accessor on the DBAPI cursor. - - This is a DBAPI specific method and is only functional - for those backends which support it, for statements - where it is appropriate. It's behavior is not - consistent across backends. - - Usage of this method is normally unnecessary when - using insert() expression constructs; the - :attr:`~CursorResult.inserted_primary_key` attribute provides a - tuple of primary key values for a newly inserted row, - regardless of database backend. - - """ - try: - return self.context.get_lastrowid() - except BaseException as e: - self.cursor_strategy.handle_exception(self, self.cursor, e) - - @property - def returns_rows(self): - """True if this :class:`_engine.CursorResult` returns zero or more - rows. - - I.e. if it is legal to call the methods - :meth:`_engine.CursorResult.fetchone`, - :meth:`_engine.CursorResult.fetchmany` - :meth:`_engine.CursorResult.fetchall`. - - Overall, the value of :attr:`_engine.CursorResult.returns_rows` should - always be synonymous with whether or not the DBAPI cursor had a - ``.description`` attribute, indicating the presence of result columns, - noting that a cursor that returns zero rows still has a - ``.description`` if a row-returning statement was emitted. - - This attribute should be True for all results that are against - SELECT statements, as well as for DML statements INSERT/UPDATE/DELETE - that use RETURNING. For INSERT/UPDATE/DELETE statements that were - not using RETURNING, the value will usually be False, however - there are some dialect-specific exceptions to this, such as when - using the MSSQL / pyodbc dialect a SELECT is emitted inline in - order to retrieve an inserted primary key value. - - - """ - return self._metadata.returns_rows - - @property - def is_insert(self): - """True if this :class:`_engine.CursorResult` is the result - of a executing an expression language compiled - :func:`_expression.insert` construct. - - When True, this implies that the - :attr:`inserted_primary_key` attribute is accessible, - assuming the statement did not include - a user defined "returning" construct. - - """ - return self.context.isinsert - - def _fetchiter_impl(self): - fetchone = self.cursor_strategy.fetchone - - while True: - row = fetchone(self, self.cursor) - if row is None: - break - yield row - - def _fetchone_impl(self, hard_close=False): - return self.cursor_strategy.fetchone(self, self.cursor, hard_close) - - def _fetchall_impl(self): - return self.cursor_strategy.fetchall(self, self.cursor) - - def _fetchmany_impl(self, size=None): - return self.cursor_strategy.fetchmany(self, self.cursor, size) - - def _raw_row_iterator(self): - return self._fetchiter_impl() - - def merge(self, *others: Result[Any]) -> MergedResult[Any]: - merged_result = super().merge(*others) - if self.context._has_rowcount: - merged_result.rowcount = sum( - cast("CursorResult[Any]", result).rowcount - for result in (self,) + others - ) - return merged_result - - def close(self) -> Any: - """Close this :class:`_engine.CursorResult`. - - This closes out the underlying DBAPI cursor corresponding to the - statement execution, if one is still present. Note that the DBAPI - cursor is automatically released when the :class:`_engine.CursorResult` - exhausts all available rows. :meth:`_engine.CursorResult.close` is - generally an optional method except in the case when discarding a - :class:`_engine.CursorResult` that still has additional rows pending - for fetch. - - After this method is called, it is no longer valid to call upon - the fetch methods, which will raise a :class:`.ResourceClosedError` - on subsequent use. - - .. seealso:: - - :ref:`connections_toplevel` - - """ - self._soft_close(hard=True) - - @_generative - def yield_per(self, num: int) -> Self: - self._yield_per = num - self.cursor_strategy.yield_per(self, self.cursor, num) - return self - - -ResultProxy = CursorResult -- cgit v1.2.3