summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
commit6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
treeb1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py
parent4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py1665
1 files changed, 1665 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py b/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py
new file mode 100644
index 0000000..4e2cb82
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py
@@ -0,0 +1,1665 @@
+# orm/loading.py
+# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+
+
+"""private module containing functions used to convert database
+rows into object instances and associated state.
+
+the functions here are called primarily by Query, Mapper,
+as well as some of the attribute loading strategies.
+
+"""
+
+from __future__ import annotations
+
+from typing import Any
+from typing import Dict
+from typing import Iterable
+from typing import List
+from typing import Mapping
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+
+from . import attributes
+from . import exc as orm_exc
+from . import path_registry
+from .base import _DEFER_FOR_STATE
+from .base import _RAISE_FOR_STATE
+from .base import _SET_DEFERRED_EXPIRED
+from .base import PassiveFlag
+from .context import FromStatement
+from .context import ORMCompileState
+from .context import QueryContext
+from .util import _none_set
+from .util import state_str
+from .. import exc as sa_exc
+from .. import util
+from ..engine import result_tuple
+from ..engine.result import ChunkedIteratorResult
+from ..engine.result import FrozenResult
+from ..engine.result import SimpleResultMetaData
+from ..sql import select
+from ..sql import util as sql_util
+from ..sql.selectable import ForUpdateArg
+from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
+from ..sql.selectable import SelectState
+from ..util import EMPTY_DICT
+
+if TYPE_CHECKING:
+ from ._typing import _IdentityKeyType
+ from .base import LoaderCallableStatus
+ from .interfaces import ORMOption
+ from .mapper import Mapper
+ from .query import Query
+ from .session import Session
+ from .state import InstanceState
+ from ..engine.cursor import CursorResult
+ from ..engine.interfaces import _ExecuteOptions
+ from ..engine.result import Result
+ from ..sql import Select
+
+_T = TypeVar("_T", bound=Any)
+_O = TypeVar("_O", bound=object)
+_new_runid = util.counter()
+
+
+_PopulatorDict = Dict[str, List[Tuple[str, Any]]]
+
+
+def instances(cursor: CursorResult[Any], context: QueryContext) -> Result[Any]:
+ """Return a :class:`.Result` given an ORM query context.
+
+ :param cursor: a :class:`.CursorResult`, generated by a statement
+ which came from :class:`.ORMCompileState`
+
+ :param context: a :class:`.QueryContext` object
+
+ :return: a :class:`.Result` object representing ORM results
+
+ .. versionchanged:: 1.4 The instances() function now uses
+ :class:`.Result` objects and has an all new interface.
+
+ """
+
+ context.runid = _new_runid()
+
+ if context.top_level_context:
+ is_top_level = False
+ context.post_load_paths = context.top_level_context.post_load_paths
+ else:
+ is_top_level = True
+ context.post_load_paths = {}
+
+ compile_state = context.compile_state
+ filtered = compile_state._has_mapper_entities
+ single_entity = (
+ not context.load_options._only_return_tuples
+ and len(compile_state._entities) == 1
+ and compile_state._entities[0].supports_single_entity
+ )
+
+ try:
+ (process, labels, extra) = list(
+ zip(
+ *[
+ query_entity.row_processor(context, cursor)
+ for query_entity in context.compile_state._entities
+ ]
+ )
+ )
+
+ if context.yield_per and (
+ context.loaders_require_buffering
+ or context.loaders_require_uniquing
+ ):
+ raise sa_exc.InvalidRequestError(
+ "Can't use yield_per with eager loaders that require uniquing "
+ "or row buffering, e.g. joinedload() against collections "
+ "or subqueryload(). Consider the selectinload() strategy "
+ "for better flexibility in loading objects."
+ )
+
+ except Exception:
+ with util.safe_reraise():
+ cursor.close()
+
+ def _no_unique(entry):
+ raise sa_exc.InvalidRequestError(
+ "Can't use the ORM yield_per feature in conjunction with unique()"
+ )
+
+ def _not_hashable(datatype, *, legacy=False, uncertain=False):
+ if not legacy:
+
+ def go(obj):
+ if uncertain:
+ try:
+ return hash(obj)
+ except:
+ pass
+
+ raise sa_exc.InvalidRequestError(
+ "Can't apply uniqueness to row tuple containing value of "
+ f"""type {datatype!r}; {
+ 'the values returned appear to be'
+ if uncertain
+ else 'this datatype produces'
+ } non-hashable values"""
+ )
+
+ return go
+ elif not uncertain:
+ return id
+ else:
+ _use_id = False
+
+ def go(obj):
+ nonlocal _use_id
+
+ if not _use_id:
+ try:
+ return hash(obj)
+ except:
+ pass
+
+ # in #10459, we considered using a warning here, however
+ # as legacy query uses result.unique() in all cases, this
+ # would lead to too many warning cases.
+ _use_id = True
+
+ return id(obj)
+
+ return go
+
+ unique_filters = [
+ (
+ _no_unique
+ if context.yield_per
+ else (
+ _not_hashable(
+ ent.column.type, # type: ignore
+ legacy=context.load_options._legacy_uniquing,
+ uncertain=ent._null_column_type,
+ )
+ if (
+ not ent.use_id_for_hash
+ and (ent._non_hashable_value or ent._null_column_type)
+ )
+ else id if ent.use_id_for_hash else None
+ )
+ )
+ for ent in context.compile_state._entities
+ ]
+
+ row_metadata = SimpleResultMetaData(
+ labels, extra, _unique_filters=unique_filters
+ )
+
+ def chunks(size): # type: ignore
+ while True:
+ yield_per = size
+
+ context.partials = {}
+
+ if yield_per:
+ fetch = cursor.fetchmany(yield_per)
+
+ if not fetch:
+ break
+ else:
+ fetch = cursor._raw_all_rows()
+
+ if single_entity:
+ proc = process[0]
+ rows = [proc(row) for row in fetch]
+ else:
+ rows = [
+ tuple([proc(row) for proc in process]) for row in fetch
+ ]
+
+ # if we are the originating load from a query, meaning we
+ # aren't being called as a result of a nested "post load",
+ # iterate through all the collected post loaders and fire them
+ # off. Previously this used to work recursively, however that
+ # prevented deeply nested structures from being loadable
+ if is_top_level:
+ if yield_per:
+ # if using yield per, memoize the state of the
+ # collection so that it can be restored
+ top_level_post_loads = list(
+ context.post_load_paths.items()
+ )
+
+ while context.post_load_paths:
+ post_loads = list(context.post_load_paths.items())
+ context.post_load_paths.clear()
+ for path, post_load in post_loads:
+ post_load.invoke(context, path)
+
+ if yield_per:
+ context.post_load_paths.clear()
+ context.post_load_paths.update(top_level_post_loads)
+
+ yield rows
+
+ if not yield_per:
+ break
+
+ if context.execution_options.get("prebuffer_rows", False):
+ # this is a bit of a hack at the moment.
+ # I would rather have some option in the result to pre-buffer
+ # internally.
+ _prebuffered = list(chunks(None))
+
+ def chunks(size):
+ return iter(_prebuffered)
+
+ result = ChunkedIteratorResult(
+ row_metadata,
+ chunks,
+ source_supports_scalars=single_entity,
+ raw=cursor,
+ dynamic_yield_per=cursor.context._is_server_side,
+ )
+
+ # filtered and single_entity are used to indicate to legacy Query that the
+ # query has ORM entities, so legacy deduping and scalars should be called
+ # on the result.
+ result._attributes = result._attributes.union(
+ dict(filtered=filtered, is_single_entity=single_entity)
+ )
+
+ # multi_row_eager_loaders OTOH is specific to joinedload.
+ if context.compile_state.multi_row_eager_loaders:
+
+ def require_unique(obj):
+ raise sa_exc.InvalidRequestError(
+ "The unique() method must be invoked on this Result, "
+ "as it contains results that include joined eager loads "
+ "against collections"
+ )
+
+ result._unique_filter_state = (None, require_unique)
+
+ if context.yield_per:
+ result.yield_per(context.yield_per)
+
+ return result
+
+
+@util.preload_module("sqlalchemy.orm.context")
+def merge_frozen_result(session, statement, frozen_result, load=True):
+ """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`,
+ returning a new :class:`_engine.Result` object with :term:`persistent`
+ objects.
+
+ See the section :ref:`do_orm_execute_re_executing` for an example.
+
+ .. seealso::
+
+ :ref:`do_orm_execute_re_executing`
+
+ :meth:`_engine.Result.freeze`
+
+ :class:`_engine.FrozenResult`
+
+ """
+ querycontext = util.preloaded.orm_context
+
+ if load:
+ # flush current contents if we expect to load data
+ session._autoflush()
+
+ ctx = querycontext.ORMSelectCompileState._create_entities_collection(
+ statement, legacy=False
+ )
+
+ autoflush = session.autoflush
+ try:
+ session.autoflush = False
+ mapped_entities = [
+ i
+ for i, e in enumerate(ctx._entities)
+ if isinstance(e, querycontext._MapperEntity)
+ ]
+ keys = [ent._label_name for ent in ctx._entities]
+
+ keyed_tuple = result_tuple(
+ keys, [ent._extra_entities for ent in ctx._entities]
+ )
+
+ result = []
+ for newrow in frozen_result.rewrite_rows():
+ for i in mapped_entities:
+ if newrow[i] is not None:
+ newrow[i] = session._merge(
+ attributes.instance_state(newrow[i]),
+ attributes.instance_dict(newrow[i]),
+ load=load,
+ _recursive={},
+ _resolve_conflict_map={},
+ )
+
+ result.append(keyed_tuple(newrow))
+
+ return frozen_result.with_new_rows(result)
+ finally:
+ session.autoflush = autoflush
+
+
+@util.became_legacy_20(
+ ":func:`_orm.merge_result`",
+ alternative="The function as well as the method on :class:`_orm.Query` "
+ "is superseded by the :func:`_orm.merge_frozen_result` function.",
+)
+@util.preload_module("sqlalchemy.orm.context")
+def merge_result(
+ query: Query[Any],
+ iterator: Union[FrozenResult, Iterable[Sequence[Any]], Iterable[object]],
+ load: bool = True,
+) -> Union[FrozenResult, Iterable[Any]]:
+ """Merge a result into the given :class:`.Query` object's Session.
+
+ See :meth:`_orm.Query.merge_result` for top-level documentation on this
+ function.
+
+ """
+
+ querycontext = util.preloaded.orm_context
+
+ session = query.session
+ if load:
+ # flush current contents if we expect to load data
+ session._autoflush()
+
+ # TODO: need test coverage and documentation for the FrozenResult
+ # use case.
+ if isinstance(iterator, FrozenResult):
+ frozen_result = iterator
+ iterator = iter(frozen_result.data)
+ else:
+ frozen_result = None
+
+ ctx = querycontext.ORMSelectCompileState._create_entities_collection(
+ query, legacy=True
+ )
+
+ autoflush = session.autoflush
+ try:
+ session.autoflush = False
+ single_entity = not frozen_result and len(ctx._entities) == 1
+
+ if single_entity:
+ if isinstance(ctx._entities[0], querycontext._MapperEntity):
+ result = [
+ session._merge(
+ attributes.instance_state(instance),
+ attributes.instance_dict(instance),
+ load=load,
+ _recursive={},
+ _resolve_conflict_map={},
+ )
+ for instance in iterator
+ ]
+ else:
+ result = list(iterator)
+ else:
+ mapped_entities = [
+ i
+ for i, e in enumerate(ctx._entities)
+ if isinstance(e, querycontext._MapperEntity)
+ ]
+ result = []
+ keys = [ent._label_name for ent in ctx._entities]
+
+ keyed_tuple = result_tuple(
+ keys, [ent._extra_entities for ent in ctx._entities]
+ )
+
+ for row in iterator:
+ newrow = list(row)
+ for i in mapped_entities:
+ if newrow[i] is not None:
+ newrow[i] = session._merge(
+ attributes.instance_state(newrow[i]),
+ attributes.instance_dict(newrow[i]),
+ load=load,
+ _recursive={},
+ _resolve_conflict_map={},
+ )
+ result.append(keyed_tuple(newrow))
+
+ if frozen_result:
+ return frozen_result.with_new_rows(result)
+ else:
+ return iter(result)
+ finally:
+ session.autoflush = autoflush
+
+
+def get_from_identity(
+ session: Session,
+ mapper: Mapper[_O],
+ key: _IdentityKeyType[_O],
+ passive: PassiveFlag,
+) -> Union[LoaderCallableStatus, Optional[_O]]:
+ """Look up the given key in the given session's identity map,
+ check the object for expired state if found.
+
+ """
+ instance = session.identity_map.get(key)
+ if instance is not None:
+ state = attributes.instance_state(instance)
+
+ if mapper.inherits and not state.mapper.isa(mapper):
+ return attributes.PASSIVE_CLASS_MISMATCH
+
+ # expired - ensure it still exists
+ if state.expired:
+ if not passive & attributes.SQL_OK:
+ # TODO: no coverage here
+ return attributes.PASSIVE_NO_RESULT
+ elif not passive & attributes.RELATED_OBJECT_OK:
+ # this mode is used within a flush and the instance's
+ # expired state will be checked soon enough, if necessary.
+ # also used by immediateloader for a mutually-dependent
+ # o2m->m2m load, :ticket:`6301`
+ return instance
+ try:
+ state._load_expired(state, passive)
+ except orm_exc.ObjectDeletedError:
+ session._remove_newly_deleted([state])
+ return None
+ return instance
+ else:
+ return None
+
+
+def load_on_ident(
+ session: Session,
+ statement: Union[Select, FromStatement],
+ key: Optional[_IdentityKeyType],
+ *,
+ load_options: Optional[Sequence[ORMOption]] = None,
+ refresh_state: Optional[InstanceState[Any]] = None,
+ with_for_update: Optional[ForUpdateArg] = None,
+ only_load_props: Optional[Iterable[str]] = None,
+ no_autoflush: bool = False,
+ bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
+ execution_options: _ExecuteOptions = util.EMPTY_DICT,
+ require_pk_cols: bool = False,
+ is_user_refresh: bool = False,
+):
+ """Load the given identity key from the database."""
+ if key is not None:
+ ident = key[1]
+ identity_token = key[2]
+ else:
+ ident = identity_token = None
+
+ return load_on_pk_identity(
+ session,
+ statement,
+ ident,
+ load_options=load_options,
+ refresh_state=refresh_state,
+ with_for_update=with_for_update,
+ only_load_props=only_load_props,
+ identity_token=identity_token,
+ no_autoflush=no_autoflush,
+ bind_arguments=bind_arguments,
+ execution_options=execution_options,
+ require_pk_cols=require_pk_cols,
+ is_user_refresh=is_user_refresh,
+ )
+
+
+def load_on_pk_identity(
+ session: Session,
+ statement: Union[Select, FromStatement],
+ primary_key_identity: Optional[Tuple[Any, ...]],
+ *,
+ load_options: Optional[Sequence[ORMOption]] = None,
+ refresh_state: Optional[InstanceState[Any]] = None,
+ with_for_update: Optional[ForUpdateArg] = None,
+ only_load_props: Optional[Iterable[str]] = None,
+ identity_token: Optional[Any] = None,
+ no_autoflush: bool = False,
+ bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
+ execution_options: _ExecuteOptions = util.EMPTY_DICT,
+ require_pk_cols: bool = False,
+ is_user_refresh: bool = False,
+):
+ """Load the given primary key identity from the database."""
+
+ query = statement
+ q = query._clone()
+
+ assert not q._is_lambda_element
+
+ if load_options is None:
+ load_options = QueryContext.default_load_options
+
+ if (
+ statement._compile_options
+ is SelectState.default_select_compile_options
+ ):
+ compile_options = ORMCompileState.default_compile_options
+ else:
+ compile_options = statement._compile_options
+
+ if primary_key_identity is not None:
+ mapper = query._propagate_attrs["plugin_subject"]
+
+ (_get_clause, _get_params) = mapper._get_clause
+
+ # None present in ident - turn those comparisons
+ # into "IS NULL"
+ if None in primary_key_identity:
+ nones = {
+ _get_params[col].key
+ for col, value in zip(mapper.primary_key, primary_key_identity)
+ if value is None
+ }
+
+ _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones)
+
+ if len(nones) == len(primary_key_identity):
+ util.warn(
+ "fully NULL primary key identity cannot load any "
+ "object. This condition may raise an error in a future "
+ "release."
+ )
+
+ q._where_criteria = (
+ sql_util._deep_annotate(_get_clause, {"_orm_adapt": True}),
+ )
+
+ params = {
+ _get_params[primary_key].key: id_val
+ for id_val, primary_key in zip(
+ primary_key_identity, mapper.primary_key
+ )
+ }
+ else:
+ params = None
+
+ if with_for_update is not None:
+ version_check = True
+ q._for_update_arg = with_for_update
+ elif query._for_update_arg is not None:
+ version_check = True
+ q._for_update_arg = query._for_update_arg
+ else:
+ version_check = False
+
+ if require_pk_cols and only_load_props:
+ if not refresh_state:
+ raise sa_exc.ArgumentError(
+ "refresh_state is required when require_pk_cols is present"
+ )
+
+ refresh_state_prokeys = refresh_state.mapper._primary_key_propkeys
+ has_changes = {
+ key
+ for key in refresh_state_prokeys.difference(only_load_props)
+ if refresh_state.attrs[key].history.has_changes()
+ }
+ if has_changes:
+ # raise if pending pk changes are present.
+ # technically, this could be limited to the case where we have
+ # relationships in the only_load_props collection to be refreshed
+ # also (and only ones that have a secondary eager loader, at that).
+ # however, the error is in place across the board so that behavior
+ # here is easier to predict. The use case it prevents is one
+ # of mutating PK attrs, leaving them unflushed,
+ # calling session.refresh(), and expecting those attrs to remain
+ # still unflushed. It seems likely someone doing all those
+ # things would be better off having the PK attributes flushed
+ # to the database before tinkering like that (session.refresh() is
+ # tinkering).
+ raise sa_exc.InvalidRequestError(
+ f"Please flush pending primary key changes on "
+ "attributes "
+ f"{has_changes} for mapper {refresh_state.mapper} before "
+ "proceeding with a refresh"
+ )
+
+ # overall, the ORM has no internal flow right now for "dont load the
+ # primary row of an object at all, but fire off
+ # selectinload/subqueryload/immediateload for some relationships".
+ # It would probably be a pretty big effort to add such a flow. So
+ # here, the case for #8703 is introduced; user asks to refresh some
+ # relationship attributes only which are
+ # selectinload/subqueryload/immediateload/ etc. (not joinedload).
+ # ORM complains there's no columns in the primary row to load.
+ # So here, we just add the PK cols if that
+ # case is detected, so that there is a SELECT emitted for the primary
+ # row.
+ #
+ # Let's just state right up front, for this one little case,
+ # the ORM here is adding a whole extra SELECT just to satisfy
+ # limitations in the internal flow. This is really not a thing
+ # SQLAlchemy finds itself doing like, ever, obviously, we are
+ # constantly working to *remove* SELECTs we don't need. We
+ # rationalize this for now based on 1. session.refresh() is not
+ # commonly used 2. session.refresh() with only relationship attrs is
+ # even less commonly used 3. the SELECT in question is very low
+ # latency.
+ #
+ # to add the flow to not include the SELECT, the quickest way
+ # might be to just manufacture a single-row result set to send off to
+ # instances(), but we'd have to weave that into context.py and all
+ # that. For 2.0.0, we have enough big changes to navigate for now.
+ #
+ mp = refresh_state.mapper._props
+ for p in only_load_props:
+ if mp[p]._is_relationship:
+ only_load_props = refresh_state_prokeys.union(only_load_props)
+ break
+
+ if refresh_state and refresh_state.load_options:
+ compile_options += {"_current_path": refresh_state.load_path.parent}
+ q = q.options(*refresh_state.load_options)
+
+ new_compile_options, load_options = _set_get_options(
+ compile_options,
+ load_options,
+ version_check=version_check,
+ only_load_props=only_load_props,
+ refresh_state=refresh_state,
+ identity_token=identity_token,
+ is_user_refresh=is_user_refresh,
+ )
+
+ q._compile_options = new_compile_options
+ q._order_by = None
+
+ if no_autoflush:
+ load_options += {"_autoflush": False}
+
+ execution_options = util.EMPTY_DICT.merge_with(
+ execution_options, {"_sa_orm_load_options": load_options}
+ )
+ result = (
+ session.execute(
+ q,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ )
+ .unique()
+ .scalars()
+ )
+
+ try:
+ return result.one()
+ except orm_exc.NoResultFound:
+ return None
+
+
+def _set_get_options(
+ compile_opt,
+ load_opt,
+ populate_existing=None,
+ version_check=None,
+ only_load_props=None,
+ refresh_state=None,
+ identity_token=None,
+ is_user_refresh=None,
+):
+ compile_options = {}
+ load_options = {}
+ if version_check:
+ load_options["_version_check"] = version_check
+ if populate_existing:
+ load_options["_populate_existing"] = populate_existing
+ if refresh_state:
+ load_options["_refresh_state"] = refresh_state
+ compile_options["_for_refresh_state"] = True
+ if only_load_props:
+ compile_options["_only_load_props"] = frozenset(only_load_props)
+ if identity_token:
+ load_options["_identity_token"] = identity_token
+
+ if is_user_refresh:
+ load_options["_is_user_refresh"] = is_user_refresh
+ if load_options:
+ load_opt += load_options
+ if compile_options:
+ compile_opt += compile_options
+
+ return compile_opt, load_opt
+
+
+def _setup_entity_query(
+ compile_state,
+ mapper,
+ query_entity,
+ path,
+ adapter,
+ column_collection,
+ with_polymorphic=None,
+ only_load_props=None,
+ polymorphic_discriminator=None,
+ **kw,
+):
+ if with_polymorphic:
+ poly_properties = mapper._iterate_polymorphic_properties(
+ with_polymorphic
+ )
+ else:
+ poly_properties = mapper._polymorphic_properties
+
+ quick_populators = {}
+
+ path.set(compile_state.attributes, "memoized_setups", quick_populators)
+
+ # for the lead entities in the path, e.g. not eager loads, and
+ # assuming a user-passed aliased class, e.g. not a from_self() or any
+ # implicit aliasing, don't add columns to the SELECT that aren't
+ # in the thing that's aliased.
+ check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class
+
+ for value in poly_properties:
+ if only_load_props and value.key not in only_load_props:
+ continue
+ value.setup(
+ compile_state,
+ query_entity,
+ path,
+ adapter,
+ only_load_props=only_load_props,
+ column_collection=column_collection,
+ memoized_populators=quick_populators,
+ check_for_adapt=check_for_adapt,
+ **kw,
+ )
+
+ if (
+ polymorphic_discriminator is not None
+ and polymorphic_discriminator is not mapper.polymorphic_on
+ ):
+ if adapter:
+ pd = adapter.columns[polymorphic_discriminator]
+ else:
+ pd = polymorphic_discriminator
+ column_collection.append(pd)
+
+
+def _warn_for_runid_changed(state):
+ util.warn(
+ "Loading context for %s has changed within a load/refresh "
+ "handler, suggesting a row refresh operation took place. If this "
+ "event handler is expected to be "
+ "emitting row refresh operations within an existing load or refresh "
+ "operation, set restore_load_context=True when establishing the "
+ "listener to ensure the context remains unchanged when the event "
+ "handler completes." % (state_str(state),)
+ )
+
+
+def _instance_processor(
+ query_entity,
+ mapper,
+ context,
+ result,
+ path,
+ adapter,
+ only_load_props=None,
+ refresh_state=None,
+ polymorphic_discriminator=None,
+ _polymorphic_from=None,
+):
+ """Produce a mapper level row processor callable
+ which processes rows into mapped instances."""
+
+ # note that this method, most of which exists in a closure
+ # called _instance(), resists being broken out, as
+ # attempts to do so tend to add significant function
+ # call overhead. _instance() is the most
+ # performance-critical section in the whole ORM.
+
+ identity_class = mapper._identity_class
+ compile_state = context.compile_state
+
+ # look for "row getter" functions that have been assigned along
+ # with the compile state that were cached from a previous load.
+ # these are operator.itemgetter() objects that each will extract a
+ # particular column from each row.
+
+ getter_key = ("getters", mapper)
+ getters = path.get(compile_state.attributes, getter_key, None)
+
+ if getters is None:
+ # no getters, so go through a list of attributes we are loading for,
+ # and the ones that are column based will have already put information
+ # for us in another collection "memoized_setups", which represents the
+ # output of the LoaderStrategy.setup_query() method. We can just as
+ # easily call LoaderStrategy.create_row_processor for each, but by
+ # getting it all at once from setup_query we save another method call
+ # per attribute.
+ props = mapper._prop_set
+ if only_load_props is not None:
+ props = props.intersection(
+ mapper._props[k] for k in only_load_props
+ )
+
+ quick_populators = path.get(
+ context.attributes, "memoized_setups", EMPTY_DICT
+ )
+
+ todo = []
+ cached_populators = {
+ "new": [],
+ "quick": [],
+ "deferred": [],
+ "expire": [],
+ "existing": [],
+ "eager": [],
+ }
+
+ if refresh_state is None:
+ # we can also get the "primary key" tuple getter function
+ pk_cols = mapper.primary_key
+
+ if adapter:
+ pk_cols = [adapter.columns[c] for c in pk_cols]
+ primary_key_getter = result._tuple_getter(pk_cols)
+ else:
+ primary_key_getter = None
+
+ getters = {
+ "cached_populators": cached_populators,
+ "todo": todo,
+ "primary_key_getter": primary_key_getter,
+ }
+ for prop in props:
+ if prop in quick_populators:
+ # this is an inlined path just for column-based attributes.
+ col = quick_populators[prop]
+ if col is _DEFER_FOR_STATE:
+ cached_populators["new"].append(
+ (prop.key, prop._deferred_column_loader)
+ )
+ elif col is _SET_DEFERRED_EXPIRED:
+ # note that in this path, we are no longer
+ # searching in the result to see if the column might
+ # be present in some unexpected way.
+ cached_populators["expire"].append((prop.key, False))
+ elif col is _RAISE_FOR_STATE:
+ cached_populators["new"].append(
+ (prop.key, prop._raise_column_loader)
+ )
+ else:
+ getter = None
+ if adapter:
+ # this logic had been removed for all 1.4 releases
+ # up until 1.4.18; the adapter here is particularly
+ # the compound eager adapter which isn't accommodated
+ # in the quick_populators right now. The "fallback"
+ # logic below instead took over in many more cases
+ # until issue #6596 was identified.
+
+ # note there is still an issue where this codepath
+ # produces no "getter" for cases where a joined-inh
+ # mapping includes a labeled column property, meaning
+ # KeyError is caught internally and we fall back to
+ # _getter(col), which works anyway. The adapter
+ # here for joined inh without any aliasing might not
+ # be useful. Tests which see this include
+ # test.orm.inheritance.test_basic ->
+ # EagerTargetingTest.test_adapt_stringency
+ # OptimizedLoadTest.test_column_expression_joined
+ # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501
+ #
+
+ adapted_col = adapter.columns[col]
+ if adapted_col is not None:
+ getter = result._getter(adapted_col, False)
+ if not getter:
+ getter = result._getter(col, False)
+ if getter:
+ cached_populators["quick"].append((prop.key, getter))
+ else:
+ # fall back to the ColumnProperty itself, which
+ # will iterate through all of its columns
+ # to see if one fits
+ prop.create_row_processor(
+ context,
+ query_entity,
+ path,
+ mapper,
+ result,
+ adapter,
+ cached_populators,
+ )
+ else:
+ # loader strategies like subqueryload, selectinload,
+ # joinedload, basically relationships, these need to interact
+ # with the context each time to work correctly.
+ todo.append(prop)
+
+ path.set(compile_state.attributes, getter_key, getters)
+
+ cached_populators = getters["cached_populators"]
+
+ populators = {key: list(value) for key, value in cached_populators.items()}
+ for prop in getters["todo"]:
+ prop.create_row_processor(
+ context, query_entity, path, mapper, result, adapter, populators
+ )
+
+ propagated_loader_options = context.propagated_loader_options
+ load_path = (
+ context.compile_state.current_path + path
+ if context.compile_state.current_path.path
+ else path
+ )
+
+ session_identity_map = context.session.identity_map
+
+ populate_existing = context.populate_existing or mapper.always_refresh
+ load_evt = bool(mapper.class_manager.dispatch.load)
+ refresh_evt = bool(mapper.class_manager.dispatch.refresh)
+ persistent_evt = bool(context.session.dispatch.loaded_as_persistent)
+ if persistent_evt:
+ loaded_as_persistent = context.session.dispatch.loaded_as_persistent
+ instance_state = attributes.instance_state
+ instance_dict = attributes.instance_dict
+ session_id = context.session.hash_key
+ runid = context.runid
+ identity_token = context.identity_token
+
+ version_check = context.version_check
+ if version_check:
+ version_id_col = mapper.version_id_col
+ if version_id_col is not None:
+ if adapter:
+ version_id_col = adapter.columns[version_id_col]
+ version_id_getter = result._getter(version_id_col)
+ else:
+ version_id_getter = None
+
+ if not refresh_state and _polymorphic_from is not None:
+ key = ("loader", path.path)
+
+ if key in context.attributes and context.attributes[key].strategy == (
+ ("selectinload_polymorphic", True),
+ ):
+ option_entities = context.attributes[key].local_opts["entities"]
+ else:
+ option_entities = None
+ selectin_load_via = mapper._should_selectin_load(
+ option_entities,
+ _polymorphic_from,
+ )
+
+ if selectin_load_via and selectin_load_via is not _polymorphic_from:
+ # only_load_props goes w/ refresh_state only, and in a refresh
+ # we are a single row query for the exact entity; polymorphic
+ # loading does not apply
+ assert only_load_props is None
+
+ callable_ = _load_subclass_via_in(
+ context,
+ path,
+ selectin_load_via,
+ _polymorphic_from,
+ option_entities,
+ )
+ PostLoad.callable_for_path(
+ context,
+ load_path,
+ selectin_load_via.mapper,
+ selectin_load_via,
+ callable_,
+ selectin_load_via,
+ )
+
+ post_load = PostLoad.for_context(context, load_path, only_load_props)
+
+ if refresh_state:
+ refresh_identity_key = refresh_state.key
+ if refresh_identity_key is None:
+ # super-rare condition; a refresh is being called
+ # on a non-instance-key instance; this is meant to only
+ # occur within a flush()
+ refresh_identity_key = mapper._identity_key_from_state(
+ refresh_state
+ )
+ else:
+ refresh_identity_key = None
+
+ primary_key_getter = getters["primary_key_getter"]
+
+ if mapper.allow_partial_pks:
+ is_not_primary_key = _none_set.issuperset
+ else:
+ is_not_primary_key = _none_set.intersection
+
+ def _instance(row):
+ # determine the state that we'll be populating
+ if refresh_identity_key:
+ # fixed state that we're refreshing
+ state = refresh_state
+ instance = state.obj()
+ dict_ = instance_dict(instance)
+ isnew = state.runid != runid
+ currentload = True
+ loaded_instance = False
+ else:
+ # look at the row, see if that identity is in the
+ # session, or we have to create a new one
+ identitykey = (
+ identity_class,
+ primary_key_getter(row),
+ identity_token,
+ )
+
+ instance = session_identity_map.get(identitykey)
+
+ if instance is not None:
+ # existing instance
+ state = instance_state(instance)
+ dict_ = instance_dict(instance)
+
+ isnew = state.runid != runid
+ currentload = not isnew
+ loaded_instance = False
+
+ if version_check and version_id_getter and not currentload:
+ _validate_version_id(
+ mapper, state, dict_, row, version_id_getter
+ )
+
+ else:
+ # create a new instance
+
+ # check for non-NULL values in the primary key columns,
+ # else no entity is returned for the row
+ if is_not_primary_key(identitykey[1]):
+ return None
+
+ isnew = True
+ currentload = True
+ loaded_instance = True
+
+ instance = mapper.class_manager.new_instance()
+
+ dict_ = instance_dict(instance)
+ state = instance_state(instance)
+ state.key = identitykey
+ state.identity_token = identity_token
+
+ # attach instance to session.
+ state.session_id = session_id
+ session_identity_map._add_unpresent(state, identitykey)
+
+ effective_populate_existing = populate_existing
+ if refresh_state is state:
+ effective_populate_existing = True
+
+ # populate. this looks at whether this state is new
+ # for this load or was existing, and whether or not this
+ # row is the first row with this identity.
+ if currentload or effective_populate_existing:
+ # full population routines. Objects here are either
+ # just created, or we are doing a populate_existing
+
+ # be conservative about setting load_path when populate_existing
+ # is in effect; want to maintain options from the original
+ # load. see test_expire->test_refresh_maintains_deferred_options
+ if isnew and (
+ propagated_loader_options or not effective_populate_existing
+ ):
+ state.load_options = propagated_loader_options
+ state.load_path = load_path
+
+ _populate_full(
+ context,
+ row,
+ state,
+ dict_,
+ isnew,
+ load_path,
+ loaded_instance,
+ effective_populate_existing,
+ populators,
+ )
+
+ if isnew:
+ # state.runid should be equal to context.runid / runid
+ # here, however for event checks we are being more conservative
+ # and checking against existing run id
+ # assert state.runid == runid
+
+ existing_runid = state.runid
+
+ if loaded_instance:
+ if load_evt:
+ state.manager.dispatch.load(state, context)
+ if state.runid != existing_runid:
+ _warn_for_runid_changed(state)
+ if persistent_evt:
+ loaded_as_persistent(context.session, state)
+ if state.runid != existing_runid:
+ _warn_for_runid_changed(state)
+ elif refresh_evt:
+ state.manager.dispatch.refresh(
+ state, context, only_load_props
+ )
+ if state.runid != runid:
+ _warn_for_runid_changed(state)
+
+ if effective_populate_existing or state.modified:
+ if refresh_state and only_load_props:
+ state._commit(dict_, only_load_props)
+ else:
+ state._commit_all(dict_, session_identity_map)
+
+ if post_load:
+ post_load.add_state(state, True)
+
+ else:
+ # partial population routines, for objects that were already
+ # in the Session, but a row matches them; apply eager loaders
+ # on existing objects, etc.
+ unloaded = state.unloaded
+ isnew = state not in context.partials
+
+ if not isnew or unloaded or populators["eager"]:
+ # state is having a partial set of its attributes
+ # refreshed. Populate those attributes,
+ # and add to the "context.partials" collection.
+
+ to_load = _populate_partial(
+ context,
+ row,
+ state,
+ dict_,
+ isnew,
+ load_path,
+ unloaded,
+ populators,
+ )
+
+ if isnew:
+ if refresh_evt:
+ existing_runid = state.runid
+ state.manager.dispatch.refresh(state, context, to_load)
+ if state.runid != existing_runid:
+ _warn_for_runid_changed(state)
+
+ state._commit(dict_, to_load)
+
+ if post_load and context.invoke_all_eagers:
+ post_load.add_state(state, False)
+
+ return instance
+
+ if mapper.polymorphic_map and not _polymorphic_from and not refresh_state:
+ # if we are doing polymorphic, dispatch to a different _instance()
+ # method specific to the subclass mapper
+ def ensure_no_pk(row):
+ identitykey = (
+ identity_class,
+ primary_key_getter(row),
+ identity_token,
+ )
+ if not is_not_primary_key(identitykey[1]):
+ return identitykey
+ else:
+ return None
+
+ _instance = _decorate_polymorphic_switch(
+ _instance,
+ context,
+ query_entity,
+ mapper,
+ result,
+ path,
+ polymorphic_discriminator,
+ adapter,
+ ensure_no_pk,
+ )
+
+ return _instance
+
+
+def _load_subclass_via_in(
+ context, path, entity, polymorphic_from, option_entities
+):
+ mapper = entity.mapper
+
+ # TODO: polymorphic_from seems to be a Mapper in all cases.
+ # this is likely not needed, but as we dont have typing in loading.py
+ # yet, err on the safe side
+ polymorphic_from_mapper = polymorphic_from.mapper
+ not_against_basemost = polymorphic_from_mapper.inherits is not None
+
+ zero_idx = len(mapper.base_mapper.primary_key) == 1
+
+ if entity.is_aliased_class or not_against_basemost:
+ q, enable_opt, disable_opt = mapper._subclass_load_via_in(
+ entity, polymorphic_from
+ )
+ else:
+ q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper
+
+ def do_load(context, path, states, load_only, effective_entity):
+ if not option_entities:
+ # filter out states for those that would have selectinloaded
+ # from another loader
+ # TODO: we are currently ignoring the case where the
+ # "selectin_polymorphic" option is used, as this is much more
+ # complex / specific / very uncommon API use
+ states = [
+ (s, v)
+ for s, v in states
+ if s.mapper._would_selectin_load_only_from_given_mapper(mapper)
+ ]
+
+ if not states:
+ return
+
+ orig_query = context.query
+
+ if path.parent:
+ enable_opt_lcl = enable_opt._prepend_path(path)
+ disable_opt_lcl = disable_opt._prepend_path(path)
+ else:
+ enable_opt_lcl = enable_opt
+ disable_opt_lcl = disable_opt
+ options = (
+ (enable_opt_lcl,) + orig_query._with_options + (disable_opt_lcl,)
+ )
+
+ q2 = q.options(*options)
+
+ q2._compile_options = context.compile_state.default_compile_options
+ q2._compile_options += {"_current_path": path.parent}
+
+ if context.populate_existing:
+ q2 = q2.execution_options(populate_existing=True)
+
+ context.session.execute(
+ q2,
+ dict(
+ primary_keys=[
+ state.key[1][0] if zero_idx else state.key[1]
+ for state, load_attrs in states
+ ]
+ ),
+ ).unique().scalars().all()
+
+ return do_load
+
+
+def _populate_full(
+ context,
+ row,
+ state,
+ dict_,
+ isnew,
+ load_path,
+ loaded_instance,
+ populate_existing,
+ populators,
+):
+ if isnew:
+ # first time we are seeing a row with this identity.
+ state.runid = context.runid
+
+ for key, getter in populators["quick"]:
+ dict_[key] = getter(row)
+ if populate_existing:
+ for key, set_callable in populators["expire"]:
+ dict_.pop(key, None)
+ if set_callable:
+ state.expired_attributes.add(key)
+ else:
+ for key, set_callable in populators["expire"]:
+ if set_callable:
+ state.expired_attributes.add(key)
+
+ for key, populator in populators["new"]:
+ populator(state, dict_, row)
+
+ elif load_path != state.load_path:
+ # new load path, e.g. object is present in more than one
+ # column position in a series of rows
+ state.load_path = load_path
+
+ # if we have data, and the data isn't in the dict, OK, let's put
+ # it in.
+ for key, getter in populators["quick"]:
+ if key not in dict_:
+ dict_[key] = getter(row)
+
+ # otherwise treat like an "already seen" row
+ for key, populator in populators["existing"]:
+ populator(state, dict_, row)
+ # TODO: allow "existing" populator to know this is
+ # a new path for the state:
+ # populator(state, dict_, row, new_path=True)
+
+ else:
+ # have already seen rows with this identity in this same path.
+ for key, populator in populators["existing"]:
+ populator(state, dict_, row)
+
+ # TODO: same path
+ # populator(state, dict_, row, new_path=False)
+
+
+def _populate_partial(
+ context, row, state, dict_, isnew, load_path, unloaded, populators
+):
+ if not isnew:
+ if unloaded:
+ # extra pass, see #8166
+ for key, getter in populators["quick"]:
+ if key in unloaded:
+ dict_[key] = getter(row)
+
+ to_load = context.partials[state]
+ for key, populator in populators["existing"]:
+ if key in to_load:
+ populator(state, dict_, row)
+ else:
+ to_load = unloaded
+ context.partials[state] = to_load
+
+ for key, getter in populators["quick"]:
+ if key in to_load:
+ dict_[key] = getter(row)
+ for key, set_callable in populators["expire"]:
+ if key in to_load:
+ dict_.pop(key, None)
+ if set_callable:
+ state.expired_attributes.add(key)
+ for key, populator in populators["new"]:
+ if key in to_load:
+ populator(state, dict_, row)
+
+ for key, populator in populators["eager"]:
+ if key not in unloaded:
+ populator(state, dict_, row)
+
+ return to_load
+
+
+def _validate_version_id(mapper, state, dict_, row, getter):
+ if mapper._get_state_attr_by_column(
+ state, dict_, mapper.version_id_col
+ ) != getter(row):
+ raise orm_exc.StaleDataError(
+ "Instance '%s' has version id '%s' which "
+ "does not match database-loaded version id '%s'."
+ % (
+ state_str(state),
+ mapper._get_state_attr_by_column(
+ state, dict_, mapper.version_id_col
+ ),
+ getter(row),
+ )
+ )
+
+
+def _decorate_polymorphic_switch(
+ instance_fn,
+ context,
+ query_entity,
+ mapper,
+ result,
+ path,
+ polymorphic_discriminator,
+ adapter,
+ ensure_no_pk,
+):
+ if polymorphic_discriminator is not None:
+ polymorphic_on = polymorphic_discriminator
+ else:
+ polymorphic_on = mapper.polymorphic_on
+ if polymorphic_on is None:
+ return instance_fn
+
+ if adapter:
+ polymorphic_on = adapter.columns[polymorphic_on]
+
+ def configure_subclass_mapper(discriminator):
+ try:
+ sub_mapper = mapper.polymorphic_map[discriminator]
+ except KeyError:
+ raise AssertionError(
+ "No such polymorphic_identity %r is defined" % discriminator
+ )
+ else:
+ if sub_mapper is mapper:
+ return None
+ elif not sub_mapper.isa(mapper):
+ return False
+
+ return _instance_processor(
+ query_entity,
+ sub_mapper,
+ context,
+ result,
+ path,
+ adapter,
+ _polymorphic_from=mapper,
+ )
+
+ polymorphic_instances = util.PopulateDict(configure_subclass_mapper)
+
+ getter = result._getter(polymorphic_on)
+
+ def polymorphic_instance(row):
+ discriminator = getter(row)
+ if discriminator is not None:
+ _instance = polymorphic_instances[discriminator]
+ if _instance:
+ return _instance(row)
+ elif _instance is False:
+ identitykey = ensure_no_pk(row)
+
+ if identitykey:
+ raise sa_exc.InvalidRequestError(
+ "Row with identity key %s can't be loaded into an "
+ "object; the polymorphic discriminator column '%s' "
+ "refers to %s, which is not a sub-mapper of "
+ "the requested %s"
+ % (
+ identitykey,
+ polymorphic_on,
+ mapper.polymorphic_map[discriminator],
+ mapper,
+ )
+ )
+ else:
+ return None
+ else:
+ return instance_fn(row)
+ else:
+ identitykey = ensure_no_pk(row)
+
+ if identitykey:
+ raise sa_exc.InvalidRequestError(
+ "Row with identity key %s can't be loaded into an "
+ "object; the polymorphic discriminator column '%s' is "
+ "NULL" % (identitykey, polymorphic_on)
+ )
+ else:
+ return None
+
+ return polymorphic_instance
+
+
+class PostLoad:
+ """Track loaders and states for "post load" operations."""
+
+ __slots__ = "loaders", "states", "load_keys"
+
+ def __init__(self):
+ self.loaders = {}
+ self.states = util.OrderedDict()
+ self.load_keys = None
+
+ def add_state(self, state, overwrite):
+ # the states for a polymorphic load here are all shared
+ # within a single PostLoad object among multiple subtypes.
+ # Filtering of callables on a per-subclass basis needs to be done at
+ # the invocation level
+ self.states[state] = overwrite
+
+ def invoke(self, context, path):
+ if not self.states:
+ return
+ path = path_registry.PathRegistry.coerce(path)
+ for (
+ effective_context,
+ token,
+ limit_to_mapper,
+ loader,
+ arg,
+ kw,
+ ) in self.loaders.values():
+ states = [
+ (state, overwrite)
+ for state, overwrite in self.states.items()
+ if state.manager.mapper.isa(limit_to_mapper)
+ ]
+ if states:
+ loader(
+ effective_context, path, states, self.load_keys, *arg, **kw
+ )
+ self.states.clear()
+
+ @classmethod
+ def for_context(cls, context, path, only_load_props):
+ pl = context.post_load_paths.get(path.path)
+ if pl is not None and only_load_props:
+ pl.load_keys = only_load_props
+ return pl
+
+ @classmethod
+ def path_exists(self, context, path, key):
+ return (
+ path.path in context.post_load_paths
+ and key in context.post_load_paths[path.path].loaders
+ )
+
+ @classmethod
+ def callable_for_path(
+ cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw
+ ):
+ if path.path in context.post_load_paths:
+ pl = context.post_load_paths[path.path]
+ else:
+ pl = context.post_load_paths[path.path] = PostLoad()
+ pl.loaders[token] = (
+ context,
+ token,
+ limit_to_mapper,
+ loader_callable,
+ arg,
+ kw,
+ )
+
+
+def load_scalar_attributes(mapper, state, attribute_names, passive):
+ """initiate a column-based attribute refresh operation."""
+
+ # assert mapper is _state_mapper(state)
+ session = state.session
+ if not session:
+ raise orm_exc.DetachedInstanceError(
+ "Instance %s is not bound to a Session; "
+ "attribute refresh operation cannot proceed" % (state_str(state))
+ )
+
+ no_autoflush = bool(passive & attributes.NO_AUTOFLUSH)
+
+ # in the case of inheritance, particularly concrete and abstract
+ # concrete inheritance, the class manager might have some keys
+ # of attributes on the superclass that we didn't actually map.
+ # These could be mapped as "concrete, don't load" or could be completely
+ # excluded from the mapping and we know nothing about them. Filter them
+ # here to prevent them from coming through.
+ if attribute_names:
+ attribute_names = attribute_names.intersection(mapper.attrs.keys())
+
+ if mapper.inherits and not mapper.concrete:
+ # load based on committed attributes in the object, formed into
+ # a truncated SELECT that only includes relevant tables. does not
+ # currently use state.key
+ statement = mapper._optimized_get_statement(state, attribute_names)
+ if statement is not None:
+ # undefer() isn't needed here because statement has the
+ # columns needed already, this implicitly undefers that column
+ stmt = FromStatement(mapper, statement)
+
+ return load_on_ident(
+ session,
+ stmt,
+ None,
+ only_load_props=attribute_names,
+ refresh_state=state,
+ no_autoflush=no_autoflush,
+ )
+
+ # normal load, use state.key as the identity to SELECT
+ has_key = bool(state.key)
+
+ if has_key:
+ identity_key = state.key
+ else:
+ # this codepath is rare - only valid when inside a flush, and the
+ # object is becoming persistent but hasn't yet been assigned
+ # an identity_key.
+ # check here to ensure we have the attrs we need.
+ pk_attrs = [
+ mapper._columntoproperty[col].key for col in mapper.primary_key
+ ]
+ if state.expired_attributes.intersection(pk_attrs):
+ raise sa_exc.InvalidRequestError(
+ "Instance %s cannot be refreshed - it's not "
+ " persistent and does not "
+ "contain a full primary key." % state_str(state)
+ )
+ identity_key = mapper._identity_key_from_state(state)
+
+ if (
+ _none_set.issubset(identity_key) and not mapper.allow_partial_pks
+ ) or _none_set.issuperset(identity_key):
+ util.warn_limited(
+ "Instance %s to be refreshed doesn't "
+ "contain a full primary key - can't be refreshed "
+ "(and shouldn't be expired, either).",
+ state_str(state),
+ )
+ return
+
+ result = load_on_ident(
+ session,
+ select(mapper).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL),
+ identity_key,
+ refresh_state=state,
+ only_load_props=attribute_names,
+ no_autoflush=no_autoflush,
+ )
+
+ # if instance is pending, a refresh operation
+ # may not complete (even if PK attributes are assigned)
+ if has_key and result is None:
+ raise orm_exc.ObjectDeletedError(state)