From 6d7ba58f880be618ade07f8ea080fe8c4bf8a896 Mon Sep 17 00:00:00 2001 From: cyfraeviolae Date: Wed, 3 Apr 2024 03:10:44 -0400 Subject: venv --- .../site-packages/sqlalchemy/orm/loading.py | 1665 ++++++++++++++++++++ 1 file changed, 1665 insertions(+) create mode 100644 venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py') diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py b/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py new file mode 100644 index 0000000..4e2cb82 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py @@ -0,0 +1,1665 @@ +# orm/loading.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + + +"""private module containing functions used to convert database +rows into object instances and associated state. + +the functions here are called primarily by Query, Mapper, +as well as some of the attribute loading strategies. + +""" + +from __future__ import annotations + +from typing import Any +from typing import Dict +from typing import Iterable +from typing import List +from typing import Mapping +from typing import Optional +from typing import Sequence +from typing import Tuple +from typing import TYPE_CHECKING +from typing import TypeVar +from typing import Union + +from . import attributes +from . import exc as orm_exc +from . import path_registry +from .base import _DEFER_FOR_STATE +from .base import _RAISE_FOR_STATE +from .base import _SET_DEFERRED_EXPIRED +from .base import PassiveFlag +from .context import FromStatement +from .context import ORMCompileState +from .context import QueryContext +from .util import _none_set +from .util import state_str +from .. import exc as sa_exc +from .. import util +from ..engine import result_tuple +from ..engine.result import ChunkedIteratorResult +from ..engine.result import FrozenResult +from ..engine.result import SimpleResultMetaData +from ..sql import select +from ..sql import util as sql_util +from ..sql.selectable import ForUpdateArg +from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL +from ..sql.selectable import SelectState +from ..util import EMPTY_DICT + +if TYPE_CHECKING: + from ._typing import _IdentityKeyType + from .base import LoaderCallableStatus + from .interfaces import ORMOption + from .mapper import Mapper + from .query import Query + from .session import Session + from .state import InstanceState + from ..engine.cursor import CursorResult + from ..engine.interfaces import _ExecuteOptions + from ..engine.result import Result + from ..sql import Select + +_T = TypeVar("_T", bound=Any) +_O = TypeVar("_O", bound=object) +_new_runid = util.counter() + + +_PopulatorDict = Dict[str, List[Tuple[str, Any]]] + + +def instances(cursor: CursorResult[Any], context: QueryContext) -> Result[Any]: + """Return a :class:`.Result` given an ORM query context. + + :param cursor: a :class:`.CursorResult`, generated by a statement + which came from :class:`.ORMCompileState` + + :param context: a :class:`.QueryContext` object + + :return: a :class:`.Result` object representing ORM results + + .. versionchanged:: 1.4 The instances() function now uses + :class:`.Result` objects and has an all new interface. + + """ + + context.runid = _new_runid() + + if context.top_level_context: + is_top_level = False + context.post_load_paths = context.top_level_context.post_load_paths + else: + is_top_level = True + context.post_load_paths = {} + + compile_state = context.compile_state + filtered = compile_state._has_mapper_entities + single_entity = ( + not context.load_options._only_return_tuples + and len(compile_state._entities) == 1 + and compile_state._entities[0].supports_single_entity + ) + + try: + (process, labels, extra) = list( + zip( + *[ + query_entity.row_processor(context, cursor) + for query_entity in context.compile_state._entities + ] + ) + ) + + if context.yield_per and ( + context.loaders_require_buffering + or context.loaders_require_uniquing + ): + raise sa_exc.InvalidRequestError( + "Can't use yield_per with eager loaders that require uniquing " + "or row buffering, e.g. joinedload() against collections " + "or subqueryload(). Consider the selectinload() strategy " + "for better flexibility in loading objects." + ) + + except Exception: + with util.safe_reraise(): + cursor.close() + + def _no_unique(entry): + raise sa_exc.InvalidRequestError( + "Can't use the ORM yield_per feature in conjunction with unique()" + ) + + def _not_hashable(datatype, *, legacy=False, uncertain=False): + if not legacy: + + def go(obj): + if uncertain: + try: + return hash(obj) + except: + pass + + raise sa_exc.InvalidRequestError( + "Can't apply uniqueness to row tuple containing value of " + f"""type {datatype!r}; { + 'the values returned appear to be' + if uncertain + else 'this datatype produces' + } non-hashable values""" + ) + + return go + elif not uncertain: + return id + else: + _use_id = False + + def go(obj): + nonlocal _use_id + + if not _use_id: + try: + return hash(obj) + except: + pass + + # in #10459, we considered using a warning here, however + # as legacy query uses result.unique() in all cases, this + # would lead to too many warning cases. + _use_id = True + + return id(obj) + + return go + + unique_filters = [ + ( + _no_unique + if context.yield_per + else ( + _not_hashable( + ent.column.type, # type: ignore + legacy=context.load_options._legacy_uniquing, + uncertain=ent._null_column_type, + ) + if ( + not ent.use_id_for_hash + and (ent._non_hashable_value or ent._null_column_type) + ) + else id if ent.use_id_for_hash else None + ) + ) + for ent in context.compile_state._entities + ] + + row_metadata = SimpleResultMetaData( + labels, extra, _unique_filters=unique_filters + ) + + def chunks(size): # type: ignore + while True: + yield_per = size + + context.partials = {} + + if yield_per: + fetch = cursor.fetchmany(yield_per) + + if not fetch: + break + else: + fetch = cursor._raw_all_rows() + + if single_entity: + proc = process[0] + rows = [proc(row) for row in fetch] + else: + rows = [ + tuple([proc(row) for proc in process]) for row in fetch + ] + + # if we are the originating load from a query, meaning we + # aren't being called as a result of a nested "post load", + # iterate through all the collected post loaders and fire them + # off. Previously this used to work recursively, however that + # prevented deeply nested structures from being loadable + if is_top_level: + if yield_per: + # if using yield per, memoize the state of the + # collection so that it can be restored + top_level_post_loads = list( + context.post_load_paths.items() + ) + + while context.post_load_paths: + post_loads = list(context.post_load_paths.items()) + context.post_load_paths.clear() + for path, post_load in post_loads: + post_load.invoke(context, path) + + if yield_per: + context.post_load_paths.clear() + context.post_load_paths.update(top_level_post_loads) + + yield rows + + if not yield_per: + break + + if context.execution_options.get("prebuffer_rows", False): + # this is a bit of a hack at the moment. + # I would rather have some option in the result to pre-buffer + # internally. + _prebuffered = list(chunks(None)) + + def chunks(size): + return iter(_prebuffered) + + result = ChunkedIteratorResult( + row_metadata, + chunks, + source_supports_scalars=single_entity, + raw=cursor, + dynamic_yield_per=cursor.context._is_server_side, + ) + + # filtered and single_entity are used to indicate to legacy Query that the + # query has ORM entities, so legacy deduping and scalars should be called + # on the result. + result._attributes = result._attributes.union( + dict(filtered=filtered, is_single_entity=single_entity) + ) + + # multi_row_eager_loaders OTOH is specific to joinedload. + if context.compile_state.multi_row_eager_loaders: + + def require_unique(obj): + raise sa_exc.InvalidRequestError( + "The unique() method must be invoked on this Result, " + "as it contains results that include joined eager loads " + "against collections" + ) + + result._unique_filter_state = (None, require_unique) + + if context.yield_per: + result.yield_per(context.yield_per) + + return result + + +@util.preload_module("sqlalchemy.orm.context") +def merge_frozen_result(session, statement, frozen_result, load=True): + """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`, + returning a new :class:`_engine.Result` object with :term:`persistent` + objects. + + See the section :ref:`do_orm_execute_re_executing` for an example. + + .. seealso:: + + :ref:`do_orm_execute_re_executing` + + :meth:`_engine.Result.freeze` + + :class:`_engine.FrozenResult` + + """ + querycontext = util.preloaded.orm_context + + if load: + # flush current contents if we expect to load data + session._autoflush() + + ctx = querycontext.ORMSelectCompileState._create_entities_collection( + statement, legacy=False + ) + + autoflush = session.autoflush + try: + session.autoflush = False + mapped_entities = [ + i + for i, e in enumerate(ctx._entities) + if isinstance(e, querycontext._MapperEntity) + ] + keys = [ent._label_name for ent in ctx._entities] + + keyed_tuple = result_tuple( + keys, [ent._extra_entities for ent in ctx._entities] + ) + + result = [] + for newrow in frozen_result.rewrite_rows(): + for i in mapped_entities: + if newrow[i] is not None: + newrow[i] = session._merge( + attributes.instance_state(newrow[i]), + attributes.instance_dict(newrow[i]), + load=load, + _recursive={}, + _resolve_conflict_map={}, + ) + + result.append(keyed_tuple(newrow)) + + return frozen_result.with_new_rows(result) + finally: + session.autoflush = autoflush + + +@util.became_legacy_20( + ":func:`_orm.merge_result`", + alternative="The function as well as the method on :class:`_orm.Query` " + "is superseded by the :func:`_orm.merge_frozen_result` function.", +) +@util.preload_module("sqlalchemy.orm.context") +def merge_result( + query: Query[Any], + iterator: Union[FrozenResult, Iterable[Sequence[Any]], Iterable[object]], + load: bool = True, +) -> Union[FrozenResult, Iterable[Any]]: + """Merge a result into the given :class:`.Query` object's Session. + + See :meth:`_orm.Query.merge_result` for top-level documentation on this + function. + + """ + + querycontext = util.preloaded.orm_context + + session = query.session + if load: + # flush current contents if we expect to load data + session._autoflush() + + # TODO: need test coverage and documentation for the FrozenResult + # use case. + if isinstance(iterator, FrozenResult): + frozen_result = iterator + iterator = iter(frozen_result.data) + else: + frozen_result = None + + ctx = querycontext.ORMSelectCompileState._create_entities_collection( + query, legacy=True + ) + + autoflush = session.autoflush + try: + session.autoflush = False + single_entity = not frozen_result and len(ctx._entities) == 1 + + if single_entity: + if isinstance(ctx._entities[0], querycontext._MapperEntity): + result = [ + session._merge( + attributes.instance_state(instance), + attributes.instance_dict(instance), + load=load, + _recursive={}, + _resolve_conflict_map={}, + ) + for instance in iterator + ] + else: + result = list(iterator) + else: + mapped_entities = [ + i + for i, e in enumerate(ctx._entities) + if isinstance(e, querycontext._MapperEntity) + ] + result = [] + keys = [ent._label_name for ent in ctx._entities] + + keyed_tuple = result_tuple( + keys, [ent._extra_entities for ent in ctx._entities] + ) + + for row in iterator: + newrow = list(row) + for i in mapped_entities: + if newrow[i] is not None: + newrow[i] = session._merge( + attributes.instance_state(newrow[i]), + attributes.instance_dict(newrow[i]), + load=load, + _recursive={}, + _resolve_conflict_map={}, + ) + result.append(keyed_tuple(newrow)) + + if frozen_result: + return frozen_result.with_new_rows(result) + else: + return iter(result) + finally: + session.autoflush = autoflush + + +def get_from_identity( + session: Session, + mapper: Mapper[_O], + key: _IdentityKeyType[_O], + passive: PassiveFlag, +) -> Union[LoaderCallableStatus, Optional[_O]]: + """Look up the given key in the given session's identity map, + check the object for expired state if found. + + """ + instance = session.identity_map.get(key) + if instance is not None: + state = attributes.instance_state(instance) + + if mapper.inherits and not state.mapper.isa(mapper): + return attributes.PASSIVE_CLASS_MISMATCH + + # expired - ensure it still exists + if state.expired: + if not passive & attributes.SQL_OK: + # TODO: no coverage here + return attributes.PASSIVE_NO_RESULT + elif not passive & attributes.RELATED_OBJECT_OK: + # this mode is used within a flush and the instance's + # expired state will be checked soon enough, if necessary. + # also used by immediateloader for a mutually-dependent + # o2m->m2m load, :ticket:`6301` + return instance + try: + state._load_expired(state, passive) + except orm_exc.ObjectDeletedError: + session._remove_newly_deleted([state]) + return None + return instance + else: + return None + + +def load_on_ident( + session: Session, + statement: Union[Select, FromStatement], + key: Optional[_IdentityKeyType], + *, + load_options: Optional[Sequence[ORMOption]] = None, + refresh_state: Optional[InstanceState[Any]] = None, + with_for_update: Optional[ForUpdateArg] = None, + only_load_props: Optional[Iterable[str]] = None, + no_autoflush: bool = False, + bind_arguments: Mapping[str, Any] = util.EMPTY_DICT, + execution_options: _ExecuteOptions = util.EMPTY_DICT, + require_pk_cols: bool = False, + is_user_refresh: bool = False, +): + """Load the given identity key from the database.""" + if key is not None: + ident = key[1] + identity_token = key[2] + else: + ident = identity_token = None + + return load_on_pk_identity( + session, + statement, + ident, + load_options=load_options, + refresh_state=refresh_state, + with_for_update=with_for_update, + only_load_props=only_load_props, + identity_token=identity_token, + no_autoflush=no_autoflush, + bind_arguments=bind_arguments, + execution_options=execution_options, + require_pk_cols=require_pk_cols, + is_user_refresh=is_user_refresh, + ) + + +def load_on_pk_identity( + session: Session, + statement: Union[Select, FromStatement], + primary_key_identity: Optional[Tuple[Any, ...]], + *, + load_options: Optional[Sequence[ORMOption]] = None, + refresh_state: Optional[InstanceState[Any]] = None, + with_for_update: Optional[ForUpdateArg] = None, + only_load_props: Optional[Iterable[str]] = None, + identity_token: Optional[Any] = None, + no_autoflush: bool = False, + bind_arguments: Mapping[str, Any] = util.EMPTY_DICT, + execution_options: _ExecuteOptions = util.EMPTY_DICT, + require_pk_cols: bool = False, + is_user_refresh: bool = False, +): + """Load the given primary key identity from the database.""" + + query = statement + q = query._clone() + + assert not q._is_lambda_element + + if load_options is None: + load_options = QueryContext.default_load_options + + if ( + statement._compile_options + is SelectState.default_select_compile_options + ): + compile_options = ORMCompileState.default_compile_options + else: + compile_options = statement._compile_options + + if primary_key_identity is not None: + mapper = query._propagate_attrs["plugin_subject"] + + (_get_clause, _get_params) = mapper._get_clause + + # None present in ident - turn those comparisons + # into "IS NULL" + if None in primary_key_identity: + nones = { + _get_params[col].key + for col, value in zip(mapper.primary_key, primary_key_identity) + if value is None + } + + _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones) + + if len(nones) == len(primary_key_identity): + util.warn( + "fully NULL primary key identity cannot load any " + "object. This condition may raise an error in a future " + "release." + ) + + q._where_criteria = ( + sql_util._deep_annotate(_get_clause, {"_orm_adapt": True}), + ) + + params = { + _get_params[primary_key].key: id_val + for id_val, primary_key in zip( + primary_key_identity, mapper.primary_key + ) + } + else: + params = None + + if with_for_update is not None: + version_check = True + q._for_update_arg = with_for_update + elif query._for_update_arg is not None: + version_check = True + q._for_update_arg = query._for_update_arg + else: + version_check = False + + if require_pk_cols and only_load_props: + if not refresh_state: + raise sa_exc.ArgumentError( + "refresh_state is required when require_pk_cols is present" + ) + + refresh_state_prokeys = refresh_state.mapper._primary_key_propkeys + has_changes = { + key + for key in refresh_state_prokeys.difference(only_load_props) + if refresh_state.attrs[key].history.has_changes() + } + if has_changes: + # raise if pending pk changes are present. + # technically, this could be limited to the case where we have + # relationships in the only_load_props collection to be refreshed + # also (and only ones that have a secondary eager loader, at that). + # however, the error is in place across the board so that behavior + # here is easier to predict. The use case it prevents is one + # of mutating PK attrs, leaving them unflushed, + # calling session.refresh(), and expecting those attrs to remain + # still unflushed. It seems likely someone doing all those + # things would be better off having the PK attributes flushed + # to the database before tinkering like that (session.refresh() is + # tinkering). + raise sa_exc.InvalidRequestError( + f"Please flush pending primary key changes on " + "attributes " + f"{has_changes} for mapper {refresh_state.mapper} before " + "proceeding with a refresh" + ) + + # overall, the ORM has no internal flow right now for "dont load the + # primary row of an object at all, but fire off + # selectinload/subqueryload/immediateload for some relationships". + # It would probably be a pretty big effort to add such a flow. So + # here, the case for #8703 is introduced; user asks to refresh some + # relationship attributes only which are + # selectinload/subqueryload/immediateload/ etc. (not joinedload). + # ORM complains there's no columns in the primary row to load. + # So here, we just add the PK cols if that + # case is detected, so that there is a SELECT emitted for the primary + # row. + # + # Let's just state right up front, for this one little case, + # the ORM here is adding a whole extra SELECT just to satisfy + # limitations in the internal flow. This is really not a thing + # SQLAlchemy finds itself doing like, ever, obviously, we are + # constantly working to *remove* SELECTs we don't need. We + # rationalize this for now based on 1. session.refresh() is not + # commonly used 2. session.refresh() with only relationship attrs is + # even less commonly used 3. the SELECT in question is very low + # latency. + # + # to add the flow to not include the SELECT, the quickest way + # might be to just manufacture a single-row result set to send off to + # instances(), but we'd have to weave that into context.py and all + # that. For 2.0.0, we have enough big changes to navigate for now. + # + mp = refresh_state.mapper._props + for p in only_load_props: + if mp[p]._is_relationship: + only_load_props = refresh_state_prokeys.union(only_load_props) + break + + if refresh_state and refresh_state.load_options: + compile_options += {"_current_path": refresh_state.load_path.parent} + q = q.options(*refresh_state.load_options) + + new_compile_options, load_options = _set_get_options( + compile_options, + load_options, + version_check=version_check, + only_load_props=only_load_props, + refresh_state=refresh_state, + identity_token=identity_token, + is_user_refresh=is_user_refresh, + ) + + q._compile_options = new_compile_options + q._order_by = None + + if no_autoflush: + load_options += {"_autoflush": False} + + execution_options = util.EMPTY_DICT.merge_with( + execution_options, {"_sa_orm_load_options": load_options} + ) + result = ( + session.execute( + q, + params=params, + execution_options=execution_options, + bind_arguments=bind_arguments, + ) + .unique() + .scalars() + ) + + try: + return result.one() + except orm_exc.NoResultFound: + return None + + +def _set_get_options( + compile_opt, + load_opt, + populate_existing=None, + version_check=None, + only_load_props=None, + refresh_state=None, + identity_token=None, + is_user_refresh=None, +): + compile_options = {} + load_options = {} + if version_check: + load_options["_version_check"] = version_check + if populate_existing: + load_options["_populate_existing"] = populate_existing + if refresh_state: + load_options["_refresh_state"] = refresh_state + compile_options["_for_refresh_state"] = True + if only_load_props: + compile_options["_only_load_props"] = frozenset(only_load_props) + if identity_token: + load_options["_identity_token"] = identity_token + + if is_user_refresh: + load_options["_is_user_refresh"] = is_user_refresh + if load_options: + load_opt += load_options + if compile_options: + compile_opt += compile_options + + return compile_opt, load_opt + + +def _setup_entity_query( + compile_state, + mapper, + query_entity, + path, + adapter, + column_collection, + with_polymorphic=None, + only_load_props=None, + polymorphic_discriminator=None, + **kw, +): + if with_polymorphic: + poly_properties = mapper._iterate_polymorphic_properties( + with_polymorphic + ) + else: + poly_properties = mapper._polymorphic_properties + + quick_populators = {} + + path.set(compile_state.attributes, "memoized_setups", quick_populators) + + # for the lead entities in the path, e.g. not eager loads, and + # assuming a user-passed aliased class, e.g. not a from_self() or any + # implicit aliasing, don't add columns to the SELECT that aren't + # in the thing that's aliased. + check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class + + for value in poly_properties: + if only_load_props and value.key not in only_load_props: + continue + value.setup( + compile_state, + query_entity, + path, + adapter, + only_load_props=only_load_props, + column_collection=column_collection, + memoized_populators=quick_populators, + check_for_adapt=check_for_adapt, + **kw, + ) + + if ( + polymorphic_discriminator is not None + and polymorphic_discriminator is not mapper.polymorphic_on + ): + if adapter: + pd = adapter.columns[polymorphic_discriminator] + else: + pd = polymorphic_discriminator + column_collection.append(pd) + + +def _warn_for_runid_changed(state): + util.warn( + "Loading context for %s has changed within a load/refresh " + "handler, suggesting a row refresh operation took place. If this " + "event handler is expected to be " + "emitting row refresh operations within an existing load or refresh " + "operation, set restore_load_context=True when establishing the " + "listener to ensure the context remains unchanged when the event " + "handler completes." % (state_str(state),) + ) + + +def _instance_processor( + query_entity, + mapper, + context, + result, + path, + adapter, + only_load_props=None, + refresh_state=None, + polymorphic_discriminator=None, + _polymorphic_from=None, +): + """Produce a mapper level row processor callable + which processes rows into mapped instances.""" + + # note that this method, most of which exists in a closure + # called _instance(), resists being broken out, as + # attempts to do so tend to add significant function + # call overhead. _instance() is the most + # performance-critical section in the whole ORM. + + identity_class = mapper._identity_class + compile_state = context.compile_state + + # look for "row getter" functions that have been assigned along + # with the compile state that were cached from a previous load. + # these are operator.itemgetter() objects that each will extract a + # particular column from each row. + + getter_key = ("getters", mapper) + getters = path.get(compile_state.attributes, getter_key, None) + + if getters is None: + # no getters, so go through a list of attributes we are loading for, + # and the ones that are column based will have already put information + # for us in another collection "memoized_setups", which represents the + # output of the LoaderStrategy.setup_query() method. We can just as + # easily call LoaderStrategy.create_row_processor for each, but by + # getting it all at once from setup_query we save another method call + # per attribute. + props = mapper._prop_set + if only_load_props is not None: + props = props.intersection( + mapper._props[k] for k in only_load_props + ) + + quick_populators = path.get( + context.attributes, "memoized_setups", EMPTY_DICT + ) + + todo = [] + cached_populators = { + "new": [], + "quick": [], + "deferred": [], + "expire": [], + "existing": [], + "eager": [], + } + + if refresh_state is None: + # we can also get the "primary key" tuple getter function + pk_cols = mapper.primary_key + + if adapter: + pk_cols = [adapter.columns[c] for c in pk_cols] + primary_key_getter = result._tuple_getter(pk_cols) + else: + primary_key_getter = None + + getters = { + "cached_populators": cached_populators, + "todo": todo, + "primary_key_getter": primary_key_getter, + } + for prop in props: + if prop in quick_populators: + # this is an inlined path just for column-based attributes. + col = quick_populators[prop] + if col is _DEFER_FOR_STATE: + cached_populators["new"].append( + (prop.key, prop._deferred_column_loader) + ) + elif col is _SET_DEFERRED_EXPIRED: + # note that in this path, we are no longer + # searching in the result to see if the column might + # be present in some unexpected way. + cached_populators["expire"].append((prop.key, False)) + elif col is _RAISE_FOR_STATE: + cached_populators["new"].append( + (prop.key, prop._raise_column_loader) + ) + else: + getter = None + if adapter: + # this logic had been removed for all 1.4 releases + # up until 1.4.18; the adapter here is particularly + # the compound eager adapter which isn't accommodated + # in the quick_populators right now. The "fallback" + # logic below instead took over in many more cases + # until issue #6596 was identified. + + # note there is still an issue where this codepath + # produces no "getter" for cases where a joined-inh + # mapping includes a labeled column property, meaning + # KeyError is caught internally and we fall back to + # _getter(col), which works anyway. The adapter + # here for joined inh without any aliasing might not + # be useful. Tests which see this include + # test.orm.inheritance.test_basic -> + # EagerTargetingTest.test_adapt_stringency + # OptimizedLoadTest.test_column_expression_joined + # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501 + # + + adapted_col = adapter.columns[col] + if adapted_col is not None: + getter = result._getter(adapted_col, False) + if not getter: + getter = result._getter(col, False) + if getter: + cached_populators["quick"].append((prop.key, getter)) + else: + # fall back to the ColumnProperty itself, which + # will iterate through all of its columns + # to see if one fits + prop.create_row_processor( + context, + query_entity, + path, + mapper, + result, + adapter, + cached_populators, + ) + else: + # loader strategies like subqueryload, selectinload, + # joinedload, basically relationships, these need to interact + # with the context each time to work correctly. + todo.append(prop) + + path.set(compile_state.attributes, getter_key, getters) + + cached_populators = getters["cached_populators"] + + populators = {key: list(value) for key, value in cached_populators.items()} + for prop in getters["todo"]: + prop.create_row_processor( + context, query_entity, path, mapper, result, adapter, populators + ) + + propagated_loader_options = context.propagated_loader_options + load_path = ( + context.compile_state.current_path + path + if context.compile_state.current_path.path + else path + ) + + session_identity_map = context.session.identity_map + + populate_existing = context.populate_existing or mapper.always_refresh + load_evt = bool(mapper.class_manager.dispatch.load) + refresh_evt = bool(mapper.class_manager.dispatch.refresh) + persistent_evt = bool(context.session.dispatch.loaded_as_persistent) + if persistent_evt: + loaded_as_persistent = context.session.dispatch.loaded_as_persistent + instance_state = attributes.instance_state + instance_dict = attributes.instance_dict + session_id = context.session.hash_key + runid = context.runid + identity_token = context.identity_token + + version_check = context.version_check + if version_check: + version_id_col = mapper.version_id_col + if version_id_col is not None: + if adapter: + version_id_col = adapter.columns[version_id_col] + version_id_getter = result._getter(version_id_col) + else: + version_id_getter = None + + if not refresh_state and _polymorphic_from is not None: + key = ("loader", path.path) + + if key in context.attributes and context.attributes[key].strategy == ( + ("selectinload_polymorphic", True), + ): + option_entities = context.attributes[key].local_opts["entities"] + else: + option_entities = None + selectin_load_via = mapper._should_selectin_load( + option_entities, + _polymorphic_from, + ) + + if selectin_load_via and selectin_load_via is not _polymorphic_from: + # only_load_props goes w/ refresh_state only, and in a refresh + # we are a single row query for the exact entity; polymorphic + # loading does not apply + assert only_load_props is None + + callable_ = _load_subclass_via_in( + context, + path, + selectin_load_via, + _polymorphic_from, + option_entities, + ) + PostLoad.callable_for_path( + context, + load_path, + selectin_load_via.mapper, + selectin_load_via, + callable_, + selectin_load_via, + ) + + post_load = PostLoad.for_context(context, load_path, only_load_props) + + if refresh_state: + refresh_identity_key = refresh_state.key + if refresh_identity_key is None: + # super-rare condition; a refresh is being called + # on a non-instance-key instance; this is meant to only + # occur within a flush() + refresh_identity_key = mapper._identity_key_from_state( + refresh_state + ) + else: + refresh_identity_key = None + + primary_key_getter = getters["primary_key_getter"] + + if mapper.allow_partial_pks: + is_not_primary_key = _none_set.issuperset + else: + is_not_primary_key = _none_set.intersection + + def _instance(row): + # determine the state that we'll be populating + if refresh_identity_key: + # fixed state that we're refreshing + state = refresh_state + instance = state.obj() + dict_ = instance_dict(instance) + isnew = state.runid != runid + currentload = True + loaded_instance = False + else: + # look at the row, see if that identity is in the + # session, or we have to create a new one + identitykey = ( + identity_class, + primary_key_getter(row), + identity_token, + ) + + instance = session_identity_map.get(identitykey) + + if instance is not None: + # existing instance + state = instance_state(instance) + dict_ = instance_dict(instance) + + isnew = state.runid != runid + currentload = not isnew + loaded_instance = False + + if version_check and version_id_getter and not currentload: + _validate_version_id( + mapper, state, dict_, row, version_id_getter + ) + + else: + # create a new instance + + # check for non-NULL values in the primary key columns, + # else no entity is returned for the row + if is_not_primary_key(identitykey[1]): + return None + + isnew = True + currentload = True + loaded_instance = True + + instance = mapper.class_manager.new_instance() + + dict_ = instance_dict(instance) + state = instance_state(instance) + state.key = identitykey + state.identity_token = identity_token + + # attach instance to session. + state.session_id = session_id + session_identity_map._add_unpresent(state, identitykey) + + effective_populate_existing = populate_existing + if refresh_state is state: + effective_populate_existing = True + + # populate. this looks at whether this state is new + # for this load or was existing, and whether or not this + # row is the first row with this identity. + if currentload or effective_populate_existing: + # full population routines. Objects here are either + # just created, or we are doing a populate_existing + + # be conservative about setting load_path when populate_existing + # is in effect; want to maintain options from the original + # load. see test_expire->test_refresh_maintains_deferred_options + if isnew and ( + propagated_loader_options or not effective_populate_existing + ): + state.load_options = propagated_loader_options + state.load_path = load_path + + _populate_full( + context, + row, + state, + dict_, + isnew, + load_path, + loaded_instance, + effective_populate_existing, + populators, + ) + + if isnew: + # state.runid should be equal to context.runid / runid + # here, however for event checks we are being more conservative + # and checking against existing run id + # assert state.runid == runid + + existing_runid = state.runid + + if loaded_instance: + if load_evt: + state.manager.dispatch.load(state, context) + if state.runid != existing_runid: + _warn_for_runid_changed(state) + if persistent_evt: + loaded_as_persistent(context.session, state) + if state.runid != existing_runid: + _warn_for_runid_changed(state) + elif refresh_evt: + state.manager.dispatch.refresh( + state, context, only_load_props + ) + if state.runid != runid: + _warn_for_runid_changed(state) + + if effective_populate_existing or state.modified: + if refresh_state and only_load_props: + state._commit(dict_, only_load_props) + else: + state._commit_all(dict_, session_identity_map) + + if post_load: + post_load.add_state(state, True) + + else: + # partial population routines, for objects that were already + # in the Session, but a row matches them; apply eager loaders + # on existing objects, etc. + unloaded = state.unloaded + isnew = state not in context.partials + + if not isnew or unloaded or populators["eager"]: + # state is having a partial set of its attributes + # refreshed. Populate those attributes, + # and add to the "context.partials" collection. + + to_load = _populate_partial( + context, + row, + state, + dict_, + isnew, + load_path, + unloaded, + populators, + ) + + if isnew: + if refresh_evt: + existing_runid = state.runid + state.manager.dispatch.refresh(state, context, to_load) + if state.runid != existing_runid: + _warn_for_runid_changed(state) + + state._commit(dict_, to_load) + + if post_load and context.invoke_all_eagers: + post_load.add_state(state, False) + + return instance + + if mapper.polymorphic_map and not _polymorphic_from and not refresh_state: + # if we are doing polymorphic, dispatch to a different _instance() + # method specific to the subclass mapper + def ensure_no_pk(row): + identitykey = ( + identity_class, + primary_key_getter(row), + identity_token, + ) + if not is_not_primary_key(identitykey[1]): + return identitykey + else: + return None + + _instance = _decorate_polymorphic_switch( + _instance, + context, + query_entity, + mapper, + result, + path, + polymorphic_discriminator, + adapter, + ensure_no_pk, + ) + + return _instance + + +def _load_subclass_via_in( + context, path, entity, polymorphic_from, option_entities +): + mapper = entity.mapper + + # TODO: polymorphic_from seems to be a Mapper in all cases. + # this is likely not needed, but as we dont have typing in loading.py + # yet, err on the safe side + polymorphic_from_mapper = polymorphic_from.mapper + not_against_basemost = polymorphic_from_mapper.inherits is not None + + zero_idx = len(mapper.base_mapper.primary_key) == 1 + + if entity.is_aliased_class or not_against_basemost: + q, enable_opt, disable_opt = mapper._subclass_load_via_in( + entity, polymorphic_from + ) + else: + q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper + + def do_load(context, path, states, load_only, effective_entity): + if not option_entities: + # filter out states for those that would have selectinloaded + # from another loader + # TODO: we are currently ignoring the case where the + # "selectin_polymorphic" option is used, as this is much more + # complex / specific / very uncommon API use + states = [ + (s, v) + for s, v in states + if s.mapper._would_selectin_load_only_from_given_mapper(mapper) + ] + + if not states: + return + + orig_query = context.query + + if path.parent: + enable_opt_lcl = enable_opt._prepend_path(path) + disable_opt_lcl = disable_opt._prepend_path(path) + else: + enable_opt_lcl = enable_opt + disable_opt_lcl = disable_opt + options = ( + (enable_opt_lcl,) + orig_query._with_options + (disable_opt_lcl,) + ) + + q2 = q.options(*options) + + q2._compile_options = context.compile_state.default_compile_options + q2._compile_options += {"_current_path": path.parent} + + if context.populate_existing: + q2 = q2.execution_options(populate_existing=True) + + context.session.execute( + q2, + dict( + primary_keys=[ + state.key[1][0] if zero_idx else state.key[1] + for state, load_attrs in states + ] + ), + ).unique().scalars().all() + + return do_load + + +def _populate_full( + context, + row, + state, + dict_, + isnew, + load_path, + loaded_instance, + populate_existing, + populators, +): + if isnew: + # first time we are seeing a row with this identity. + state.runid = context.runid + + for key, getter in populators["quick"]: + dict_[key] = getter(row) + if populate_existing: + for key, set_callable in populators["expire"]: + dict_.pop(key, None) + if set_callable: + state.expired_attributes.add(key) + else: + for key, set_callable in populators["expire"]: + if set_callable: + state.expired_attributes.add(key) + + for key, populator in populators["new"]: + populator(state, dict_, row) + + elif load_path != state.load_path: + # new load path, e.g. object is present in more than one + # column position in a series of rows + state.load_path = load_path + + # if we have data, and the data isn't in the dict, OK, let's put + # it in. + for key, getter in populators["quick"]: + if key not in dict_: + dict_[key] = getter(row) + + # otherwise treat like an "already seen" row + for key, populator in populators["existing"]: + populator(state, dict_, row) + # TODO: allow "existing" populator to know this is + # a new path for the state: + # populator(state, dict_, row, new_path=True) + + else: + # have already seen rows with this identity in this same path. + for key, populator in populators["existing"]: + populator(state, dict_, row) + + # TODO: same path + # populator(state, dict_, row, new_path=False) + + +def _populate_partial( + context, row, state, dict_, isnew, load_path, unloaded, populators +): + if not isnew: + if unloaded: + # extra pass, see #8166 + for key, getter in populators["quick"]: + if key in unloaded: + dict_[key] = getter(row) + + to_load = context.partials[state] + for key, populator in populators["existing"]: + if key in to_load: + populator(state, dict_, row) + else: + to_load = unloaded + context.partials[state] = to_load + + for key, getter in populators["quick"]: + if key in to_load: + dict_[key] = getter(row) + for key, set_callable in populators["expire"]: + if key in to_load: + dict_.pop(key, None) + if set_callable: + state.expired_attributes.add(key) + for key, populator in populators["new"]: + if key in to_load: + populator(state, dict_, row) + + for key, populator in populators["eager"]: + if key not in unloaded: + populator(state, dict_, row) + + return to_load + + +def _validate_version_id(mapper, state, dict_, row, getter): + if mapper._get_state_attr_by_column( + state, dict_, mapper.version_id_col + ) != getter(row): + raise orm_exc.StaleDataError( + "Instance '%s' has version id '%s' which " + "does not match database-loaded version id '%s'." + % ( + state_str(state), + mapper._get_state_attr_by_column( + state, dict_, mapper.version_id_col + ), + getter(row), + ) + ) + + +def _decorate_polymorphic_switch( + instance_fn, + context, + query_entity, + mapper, + result, + path, + polymorphic_discriminator, + adapter, + ensure_no_pk, +): + if polymorphic_discriminator is not None: + polymorphic_on = polymorphic_discriminator + else: + polymorphic_on = mapper.polymorphic_on + if polymorphic_on is None: + return instance_fn + + if adapter: + polymorphic_on = adapter.columns[polymorphic_on] + + def configure_subclass_mapper(discriminator): + try: + sub_mapper = mapper.polymorphic_map[discriminator] + except KeyError: + raise AssertionError( + "No such polymorphic_identity %r is defined" % discriminator + ) + else: + if sub_mapper is mapper: + return None + elif not sub_mapper.isa(mapper): + return False + + return _instance_processor( + query_entity, + sub_mapper, + context, + result, + path, + adapter, + _polymorphic_from=mapper, + ) + + polymorphic_instances = util.PopulateDict(configure_subclass_mapper) + + getter = result._getter(polymorphic_on) + + def polymorphic_instance(row): + discriminator = getter(row) + if discriminator is not None: + _instance = polymorphic_instances[discriminator] + if _instance: + return _instance(row) + elif _instance is False: + identitykey = ensure_no_pk(row) + + if identitykey: + raise sa_exc.InvalidRequestError( + "Row with identity key %s can't be loaded into an " + "object; the polymorphic discriminator column '%s' " + "refers to %s, which is not a sub-mapper of " + "the requested %s" + % ( + identitykey, + polymorphic_on, + mapper.polymorphic_map[discriminator], + mapper, + ) + ) + else: + return None + else: + return instance_fn(row) + else: + identitykey = ensure_no_pk(row) + + if identitykey: + raise sa_exc.InvalidRequestError( + "Row with identity key %s can't be loaded into an " + "object; the polymorphic discriminator column '%s' is " + "NULL" % (identitykey, polymorphic_on) + ) + else: + return None + + return polymorphic_instance + + +class PostLoad: + """Track loaders and states for "post load" operations.""" + + __slots__ = "loaders", "states", "load_keys" + + def __init__(self): + self.loaders = {} + self.states = util.OrderedDict() + self.load_keys = None + + def add_state(self, state, overwrite): + # the states for a polymorphic load here are all shared + # within a single PostLoad object among multiple subtypes. + # Filtering of callables on a per-subclass basis needs to be done at + # the invocation level + self.states[state] = overwrite + + def invoke(self, context, path): + if not self.states: + return + path = path_registry.PathRegistry.coerce(path) + for ( + effective_context, + token, + limit_to_mapper, + loader, + arg, + kw, + ) in self.loaders.values(): + states = [ + (state, overwrite) + for state, overwrite in self.states.items() + if state.manager.mapper.isa(limit_to_mapper) + ] + if states: + loader( + effective_context, path, states, self.load_keys, *arg, **kw + ) + self.states.clear() + + @classmethod + def for_context(cls, context, path, only_load_props): + pl = context.post_load_paths.get(path.path) + if pl is not None and only_load_props: + pl.load_keys = only_load_props + return pl + + @classmethod + def path_exists(self, context, path, key): + return ( + path.path in context.post_load_paths + and key in context.post_load_paths[path.path].loaders + ) + + @classmethod + def callable_for_path( + cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw + ): + if path.path in context.post_load_paths: + pl = context.post_load_paths[path.path] + else: + pl = context.post_load_paths[path.path] = PostLoad() + pl.loaders[token] = ( + context, + token, + limit_to_mapper, + loader_callable, + arg, + kw, + ) + + +def load_scalar_attributes(mapper, state, attribute_names, passive): + """initiate a column-based attribute refresh operation.""" + + # assert mapper is _state_mapper(state) + session = state.session + if not session: + raise orm_exc.DetachedInstanceError( + "Instance %s is not bound to a Session; " + "attribute refresh operation cannot proceed" % (state_str(state)) + ) + + no_autoflush = bool(passive & attributes.NO_AUTOFLUSH) + + # in the case of inheritance, particularly concrete and abstract + # concrete inheritance, the class manager might have some keys + # of attributes on the superclass that we didn't actually map. + # These could be mapped as "concrete, don't load" or could be completely + # excluded from the mapping and we know nothing about them. Filter them + # here to prevent them from coming through. + if attribute_names: + attribute_names = attribute_names.intersection(mapper.attrs.keys()) + + if mapper.inherits and not mapper.concrete: + # load based on committed attributes in the object, formed into + # a truncated SELECT that only includes relevant tables. does not + # currently use state.key + statement = mapper._optimized_get_statement(state, attribute_names) + if statement is not None: + # undefer() isn't needed here because statement has the + # columns needed already, this implicitly undefers that column + stmt = FromStatement(mapper, statement) + + return load_on_ident( + session, + stmt, + None, + only_load_props=attribute_names, + refresh_state=state, + no_autoflush=no_autoflush, + ) + + # normal load, use state.key as the identity to SELECT + has_key = bool(state.key) + + if has_key: + identity_key = state.key + else: + # this codepath is rare - only valid when inside a flush, and the + # object is becoming persistent but hasn't yet been assigned + # an identity_key. + # check here to ensure we have the attrs we need. + pk_attrs = [ + mapper._columntoproperty[col].key for col in mapper.primary_key + ] + if state.expired_attributes.intersection(pk_attrs): + raise sa_exc.InvalidRequestError( + "Instance %s cannot be refreshed - it's not " + " persistent and does not " + "contain a full primary key." % state_str(state) + ) + identity_key = mapper._identity_key_from_state(state) + + if ( + _none_set.issubset(identity_key) and not mapper.allow_partial_pks + ) or _none_set.issuperset(identity_key): + util.warn_limited( + "Instance %s to be refreshed doesn't " + "contain a full primary key - can't be refreshed " + "(and shouldn't be expired, either).", + state_str(state), + ) + return + + result = load_on_ident( + session, + select(mapper).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL), + identity_key, + refresh_state=state, + only_load_props=attribute_names, + no_autoflush=no_autoflush, + ) + + # if instance is pending, a refresh operation + # may not complete (even if PK attributes are assigned) + if has_key and result is None: + raise orm_exc.ObjectDeletedError(state) -- cgit v1.2.3