diff options
author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:17:55 -0400 |
---|---|---|
committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:17:55 -0400 |
commit | 12cf076118570eebbff08c6b3090e0d4798447a1 (patch) | |
tree | 3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py | |
parent | c45662ff3923b34614ddcc8feb9195541166dcc5 (diff) |
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py')
-rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py | 1665 |
1 files changed, 0 insertions, 1665 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py b/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py deleted file mode 100644 index 4e2cb82..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py +++ /dev/null @@ -1,1665 +0,0 @@ -# orm/loading.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - - -"""private module containing functions used to convert database -rows into object instances and associated state. - -the functions here are called primarily by Query, Mapper, -as well as some of the attribute loading strategies. - -""" - -from __future__ import annotations - -from typing import Any -from typing import Dict -from typing import Iterable -from typing import List -from typing import Mapping -from typing import Optional -from typing import Sequence -from typing import Tuple -from typing import TYPE_CHECKING -from typing import TypeVar -from typing import Union - -from . import attributes -from . import exc as orm_exc -from . import path_registry -from .base import _DEFER_FOR_STATE -from .base import _RAISE_FOR_STATE -from .base import _SET_DEFERRED_EXPIRED -from .base import PassiveFlag -from .context import FromStatement -from .context import ORMCompileState -from .context import QueryContext -from .util import _none_set -from .util import state_str -from .. import exc as sa_exc -from .. import util -from ..engine import result_tuple -from ..engine.result import ChunkedIteratorResult -from ..engine.result import FrozenResult -from ..engine.result import SimpleResultMetaData -from ..sql import select -from ..sql import util as sql_util -from ..sql.selectable import ForUpdateArg -from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL -from ..sql.selectable import SelectState -from ..util import EMPTY_DICT - -if TYPE_CHECKING: - from ._typing import _IdentityKeyType - from .base import LoaderCallableStatus - from .interfaces import ORMOption - from .mapper import Mapper - from .query import Query - from .session import Session - from .state import InstanceState - from ..engine.cursor import CursorResult - from ..engine.interfaces import _ExecuteOptions - from ..engine.result import Result - from ..sql import Select - -_T = TypeVar("_T", bound=Any) -_O = TypeVar("_O", bound=object) -_new_runid = util.counter() - - -_PopulatorDict = Dict[str, List[Tuple[str, Any]]] - - -def instances(cursor: CursorResult[Any], context: QueryContext) -> Result[Any]: - """Return a :class:`.Result` given an ORM query context. - - :param cursor: a :class:`.CursorResult`, generated by a statement - which came from :class:`.ORMCompileState` - - :param context: a :class:`.QueryContext` object - - :return: a :class:`.Result` object representing ORM results - - .. versionchanged:: 1.4 The instances() function now uses - :class:`.Result` objects and has an all new interface. - - """ - - context.runid = _new_runid() - - if context.top_level_context: - is_top_level = False - context.post_load_paths = context.top_level_context.post_load_paths - else: - is_top_level = True - context.post_load_paths = {} - - compile_state = context.compile_state - filtered = compile_state._has_mapper_entities - single_entity = ( - not context.load_options._only_return_tuples - and len(compile_state._entities) == 1 - and compile_state._entities[0].supports_single_entity - ) - - try: - (process, labels, extra) = list( - zip( - *[ - query_entity.row_processor(context, cursor) - for query_entity in context.compile_state._entities - ] - ) - ) - - if context.yield_per and ( - context.loaders_require_buffering - or context.loaders_require_uniquing - ): - raise sa_exc.InvalidRequestError( - "Can't use yield_per with eager loaders that require uniquing " - "or row buffering, e.g. joinedload() against collections " - "or subqueryload(). Consider the selectinload() strategy " - "for better flexibility in loading objects." - ) - - except Exception: - with util.safe_reraise(): - cursor.close() - - def _no_unique(entry): - raise sa_exc.InvalidRequestError( - "Can't use the ORM yield_per feature in conjunction with unique()" - ) - - def _not_hashable(datatype, *, legacy=False, uncertain=False): - if not legacy: - - def go(obj): - if uncertain: - try: - return hash(obj) - except: - pass - - raise sa_exc.InvalidRequestError( - "Can't apply uniqueness to row tuple containing value of " - f"""type {datatype!r}; { - 'the values returned appear to be' - if uncertain - else 'this datatype produces' - } non-hashable values""" - ) - - return go - elif not uncertain: - return id - else: - _use_id = False - - def go(obj): - nonlocal _use_id - - if not _use_id: - try: - return hash(obj) - except: - pass - - # in #10459, we considered using a warning here, however - # as legacy query uses result.unique() in all cases, this - # would lead to too many warning cases. - _use_id = True - - return id(obj) - - return go - - unique_filters = [ - ( - _no_unique - if context.yield_per - else ( - _not_hashable( - ent.column.type, # type: ignore - legacy=context.load_options._legacy_uniquing, - uncertain=ent._null_column_type, - ) - if ( - not ent.use_id_for_hash - and (ent._non_hashable_value or ent._null_column_type) - ) - else id if ent.use_id_for_hash else None - ) - ) - for ent in context.compile_state._entities - ] - - row_metadata = SimpleResultMetaData( - labels, extra, _unique_filters=unique_filters - ) - - def chunks(size): # type: ignore - while True: - yield_per = size - - context.partials = {} - - if yield_per: - fetch = cursor.fetchmany(yield_per) - - if not fetch: - break - else: - fetch = cursor._raw_all_rows() - - if single_entity: - proc = process[0] - rows = [proc(row) for row in fetch] - else: - rows = [ - tuple([proc(row) for proc in process]) for row in fetch - ] - - # if we are the originating load from a query, meaning we - # aren't being called as a result of a nested "post load", - # iterate through all the collected post loaders and fire them - # off. Previously this used to work recursively, however that - # prevented deeply nested structures from being loadable - if is_top_level: - if yield_per: - # if using yield per, memoize the state of the - # collection so that it can be restored - top_level_post_loads = list( - context.post_load_paths.items() - ) - - while context.post_load_paths: - post_loads = list(context.post_load_paths.items()) - context.post_load_paths.clear() - for path, post_load in post_loads: - post_load.invoke(context, path) - - if yield_per: - context.post_load_paths.clear() - context.post_load_paths.update(top_level_post_loads) - - yield rows - - if not yield_per: - break - - if context.execution_options.get("prebuffer_rows", False): - # this is a bit of a hack at the moment. - # I would rather have some option in the result to pre-buffer - # internally. - _prebuffered = list(chunks(None)) - - def chunks(size): - return iter(_prebuffered) - - result = ChunkedIteratorResult( - row_metadata, - chunks, - source_supports_scalars=single_entity, - raw=cursor, - dynamic_yield_per=cursor.context._is_server_side, - ) - - # filtered and single_entity are used to indicate to legacy Query that the - # query has ORM entities, so legacy deduping and scalars should be called - # on the result. - result._attributes = result._attributes.union( - dict(filtered=filtered, is_single_entity=single_entity) - ) - - # multi_row_eager_loaders OTOH is specific to joinedload. - if context.compile_state.multi_row_eager_loaders: - - def require_unique(obj): - raise sa_exc.InvalidRequestError( - "The unique() method must be invoked on this Result, " - "as it contains results that include joined eager loads " - "against collections" - ) - - result._unique_filter_state = (None, require_unique) - - if context.yield_per: - result.yield_per(context.yield_per) - - return result - - -@util.preload_module("sqlalchemy.orm.context") -def merge_frozen_result(session, statement, frozen_result, load=True): - """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`, - returning a new :class:`_engine.Result` object with :term:`persistent` - objects. - - See the section :ref:`do_orm_execute_re_executing` for an example. - - .. seealso:: - - :ref:`do_orm_execute_re_executing` - - :meth:`_engine.Result.freeze` - - :class:`_engine.FrozenResult` - - """ - querycontext = util.preloaded.orm_context - - if load: - # flush current contents if we expect to load data - session._autoflush() - - ctx = querycontext.ORMSelectCompileState._create_entities_collection( - statement, legacy=False - ) - - autoflush = session.autoflush - try: - session.autoflush = False - mapped_entities = [ - i - for i, e in enumerate(ctx._entities) - if isinstance(e, querycontext._MapperEntity) - ] - keys = [ent._label_name for ent in ctx._entities] - - keyed_tuple = result_tuple( - keys, [ent._extra_entities for ent in ctx._entities] - ) - - result = [] - for newrow in frozen_result.rewrite_rows(): - for i in mapped_entities: - if newrow[i] is not None: - newrow[i] = session._merge( - attributes.instance_state(newrow[i]), - attributes.instance_dict(newrow[i]), - load=load, - _recursive={}, - _resolve_conflict_map={}, - ) - - result.append(keyed_tuple(newrow)) - - return frozen_result.with_new_rows(result) - finally: - session.autoflush = autoflush - - -@util.became_legacy_20( - ":func:`_orm.merge_result`", - alternative="The function as well as the method on :class:`_orm.Query` " - "is superseded by the :func:`_orm.merge_frozen_result` function.", -) -@util.preload_module("sqlalchemy.orm.context") -def merge_result( - query: Query[Any], - iterator: Union[FrozenResult, Iterable[Sequence[Any]], Iterable[object]], - load: bool = True, -) -> Union[FrozenResult, Iterable[Any]]: - """Merge a result into the given :class:`.Query` object's Session. - - See :meth:`_orm.Query.merge_result` for top-level documentation on this - function. - - """ - - querycontext = util.preloaded.orm_context - - session = query.session - if load: - # flush current contents if we expect to load data - session._autoflush() - - # TODO: need test coverage and documentation for the FrozenResult - # use case. - if isinstance(iterator, FrozenResult): - frozen_result = iterator - iterator = iter(frozen_result.data) - else: - frozen_result = None - - ctx = querycontext.ORMSelectCompileState._create_entities_collection( - query, legacy=True - ) - - autoflush = session.autoflush - try: - session.autoflush = False - single_entity = not frozen_result and len(ctx._entities) == 1 - - if single_entity: - if isinstance(ctx._entities[0], querycontext._MapperEntity): - result = [ - session._merge( - attributes.instance_state(instance), - attributes.instance_dict(instance), - load=load, - _recursive={}, - _resolve_conflict_map={}, - ) - for instance in iterator - ] - else: - result = list(iterator) - else: - mapped_entities = [ - i - for i, e in enumerate(ctx._entities) - if isinstance(e, querycontext._MapperEntity) - ] - result = [] - keys = [ent._label_name for ent in ctx._entities] - - keyed_tuple = result_tuple( - keys, [ent._extra_entities for ent in ctx._entities] - ) - - for row in iterator: - newrow = list(row) - for i in mapped_entities: - if newrow[i] is not None: - newrow[i] = session._merge( - attributes.instance_state(newrow[i]), - attributes.instance_dict(newrow[i]), - load=load, - _recursive={}, - _resolve_conflict_map={}, - ) - result.append(keyed_tuple(newrow)) - - if frozen_result: - return frozen_result.with_new_rows(result) - else: - return iter(result) - finally: - session.autoflush = autoflush - - -def get_from_identity( - session: Session, - mapper: Mapper[_O], - key: _IdentityKeyType[_O], - passive: PassiveFlag, -) -> Union[LoaderCallableStatus, Optional[_O]]: - """Look up the given key in the given session's identity map, - check the object for expired state if found. - - """ - instance = session.identity_map.get(key) - if instance is not None: - state = attributes.instance_state(instance) - - if mapper.inherits and not state.mapper.isa(mapper): - return attributes.PASSIVE_CLASS_MISMATCH - - # expired - ensure it still exists - if state.expired: - if not passive & attributes.SQL_OK: - # TODO: no coverage here - return attributes.PASSIVE_NO_RESULT - elif not passive & attributes.RELATED_OBJECT_OK: - # this mode is used within a flush and the instance's - # expired state will be checked soon enough, if necessary. - # also used by immediateloader for a mutually-dependent - # o2m->m2m load, :ticket:`6301` - return instance - try: - state._load_expired(state, passive) - except orm_exc.ObjectDeletedError: - session._remove_newly_deleted([state]) - return None - return instance - else: - return None - - -def load_on_ident( - session: Session, - statement: Union[Select, FromStatement], - key: Optional[_IdentityKeyType], - *, - load_options: Optional[Sequence[ORMOption]] = None, - refresh_state: Optional[InstanceState[Any]] = None, - with_for_update: Optional[ForUpdateArg] = None, - only_load_props: Optional[Iterable[str]] = None, - no_autoflush: bool = False, - bind_arguments: Mapping[str, Any] = util.EMPTY_DICT, - execution_options: _ExecuteOptions = util.EMPTY_DICT, - require_pk_cols: bool = False, - is_user_refresh: bool = False, -): - """Load the given identity key from the database.""" - if key is not None: - ident = key[1] - identity_token = key[2] - else: - ident = identity_token = None - - return load_on_pk_identity( - session, - statement, - ident, - load_options=load_options, - refresh_state=refresh_state, - with_for_update=with_for_update, - only_load_props=only_load_props, - identity_token=identity_token, - no_autoflush=no_autoflush, - bind_arguments=bind_arguments, - execution_options=execution_options, - require_pk_cols=require_pk_cols, - is_user_refresh=is_user_refresh, - ) - - -def load_on_pk_identity( - session: Session, - statement: Union[Select, FromStatement], - primary_key_identity: Optional[Tuple[Any, ...]], - *, - load_options: Optional[Sequence[ORMOption]] = None, - refresh_state: Optional[InstanceState[Any]] = None, - with_for_update: Optional[ForUpdateArg] = None, - only_load_props: Optional[Iterable[str]] = None, - identity_token: Optional[Any] = None, - no_autoflush: bool = False, - bind_arguments: Mapping[str, Any] = util.EMPTY_DICT, - execution_options: _ExecuteOptions = util.EMPTY_DICT, - require_pk_cols: bool = False, - is_user_refresh: bool = False, -): - """Load the given primary key identity from the database.""" - - query = statement - q = query._clone() - - assert not q._is_lambda_element - - if load_options is None: - load_options = QueryContext.default_load_options - - if ( - statement._compile_options - is SelectState.default_select_compile_options - ): - compile_options = ORMCompileState.default_compile_options - else: - compile_options = statement._compile_options - - if primary_key_identity is not None: - mapper = query._propagate_attrs["plugin_subject"] - - (_get_clause, _get_params) = mapper._get_clause - - # None present in ident - turn those comparisons - # into "IS NULL" - if None in primary_key_identity: - nones = { - _get_params[col].key - for col, value in zip(mapper.primary_key, primary_key_identity) - if value is None - } - - _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones) - - if len(nones) == len(primary_key_identity): - util.warn( - "fully NULL primary key identity cannot load any " - "object. This condition may raise an error in a future " - "release." - ) - - q._where_criteria = ( - sql_util._deep_annotate(_get_clause, {"_orm_adapt": True}), - ) - - params = { - _get_params[primary_key].key: id_val - for id_val, primary_key in zip( - primary_key_identity, mapper.primary_key - ) - } - else: - params = None - - if with_for_update is not None: - version_check = True - q._for_update_arg = with_for_update - elif query._for_update_arg is not None: - version_check = True - q._for_update_arg = query._for_update_arg - else: - version_check = False - - if require_pk_cols and only_load_props: - if not refresh_state: - raise sa_exc.ArgumentError( - "refresh_state is required when require_pk_cols is present" - ) - - refresh_state_prokeys = refresh_state.mapper._primary_key_propkeys - has_changes = { - key - for key in refresh_state_prokeys.difference(only_load_props) - if refresh_state.attrs[key].history.has_changes() - } - if has_changes: - # raise if pending pk changes are present. - # technically, this could be limited to the case where we have - # relationships in the only_load_props collection to be refreshed - # also (and only ones that have a secondary eager loader, at that). - # however, the error is in place across the board so that behavior - # here is easier to predict. The use case it prevents is one - # of mutating PK attrs, leaving them unflushed, - # calling session.refresh(), and expecting those attrs to remain - # still unflushed. It seems likely someone doing all those - # things would be better off having the PK attributes flushed - # to the database before tinkering like that (session.refresh() is - # tinkering). - raise sa_exc.InvalidRequestError( - f"Please flush pending primary key changes on " - "attributes " - f"{has_changes} for mapper {refresh_state.mapper} before " - "proceeding with a refresh" - ) - - # overall, the ORM has no internal flow right now for "dont load the - # primary row of an object at all, but fire off - # selectinload/subqueryload/immediateload for some relationships". - # It would probably be a pretty big effort to add such a flow. So - # here, the case for #8703 is introduced; user asks to refresh some - # relationship attributes only which are - # selectinload/subqueryload/immediateload/ etc. (not joinedload). - # ORM complains there's no columns in the primary row to load. - # So here, we just add the PK cols if that - # case is detected, so that there is a SELECT emitted for the primary - # row. - # - # Let's just state right up front, for this one little case, - # the ORM here is adding a whole extra SELECT just to satisfy - # limitations in the internal flow. This is really not a thing - # SQLAlchemy finds itself doing like, ever, obviously, we are - # constantly working to *remove* SELECTs we don't need. We - # rationalize this for now based on 1. session.refresh() is not - # commonly used 2. session.refresh() with only relationship attrs is - # even less commonly used 3. the SELECT in question is very low - # latency. - # - # to add the flow to not include the SELECT, the quickest way - # might be to just manufacture a single-row result set to send off to - # instances(), but we'd have to weave that into context.py and all - # that. For 2.0.0, we have enough big changes to navigate for now. - # - mp = refresh_state.mapper._props - for p in only_load_props: - if mp[p]._is_relationship: - only_load_props = refresh_state_prokeys.union(only_load_props) - break - - if refresh_state and refresh_state.load_options: - compile_options += {"_current_path": refresh_state.load_path.parent} - q = q.options(*refresh_state.load_options) - - new_compile_options, load_options = _set_get_options( - compile_options, - load_options, - version_check=version_check, - only_load_props=only_load_props, - refresh_state=refresh_state, - identity_token=identity_token, - is_user_refresh=is_user_refresh, - ) - - q._compile_options = new_compile_options - q._order_by = None - - if no_autoflush: - load_options += {"_autoflush": False} - - execution_options = util.EMPTY_DICT.merge_with( - execution_options, {"_sa_orm_load_options": load_options} - ) - result = ( - session.execute( - q, - params=params, - execution_options=execution_options, - bind_arguments=bind_arguments, - ) - .unique() - .scalars() - ) - - try: - return result.one() - except orm_exc.NoResultFound: - return None - - -def _set_get_options( - compile_opt, - load_opt, - populate_existing=None, - version_check=None, - only_load_props=None, - refresh_state=None, - identity_token=None, - is_user_refresh=None, -): - compile_options = {} - load_options = {} - if version_check: - load_options["_version_check"] = version_check - if populate_existing: - load_options["_populate_existing"] = populate_existing - if refresh_state: - load_options["_refresh_state"] = refresh_state - compile_options["_for_refresh_state"] = True - if only_load_props: - compile_options["_only_load_props"] = frozenset(only_load_props) - if identity_token: - load_options["_identity_token"] = identity_token - - if is_user_refresh: - load_options["_is_user_refresh"] = is_user_refresh - if load_options: - load_opt += load_options - if compile_options: - compile_opt += compile_options - - return compile_opt, load_opt - - -def _setup_entity_query( - compile_state, - mapper, - query_entity, - path, - adapter, - column_collection, - with_polymorphic=None, - only_load_props=None, - polymorphic_discriminator=None, - **kw, -): - if with_polymorphic: - poly_properties = mapper._iterate_polymorphic_properties( - with_polymorphic - ) - else: - poly_properties = mapper._polymorphic_properties - - quick_populators = {} - - path.set(compile_state.attributes, "memoized_setups", quick_populators) - - # for the lead entities in the path, e.g. not eager loads, and - # assuming a user-passed aliased class, e.g. not a from_self() or any - # implicit aliasing, don't add columns to the SELECT that aren't - # in the thing that's aliased. - check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class - - for value in poly_properties: - if only_load_props and value.key not in only_load_props: - continue - value.setup( - compile_state, - query_entity, - path, - adapter, - only_load_props=only_load_props, - column_collection=column_collection, - memoized_populators=quick_populators, - check_for_adapt=check_for_adapt, - **kw, - ) - - if ( - polymorphic_discriminator is not None - and polymorphic_discriminator is not mapper.polymorphic_on - ): - if adapter: - pd = adapter.columns[polymorphic_discriminator] - else: - pd = polymorphic_discriminator - column_collection.append(pd) - - -def _warn_for_runid_changed(state): - util.warn( - "Loading context for %s has changed within a load/refresh " - "handler, suggesting a row refresh operation took place. If this " - "event handler is expected to be " - "emitting row refresh operations within an existing load or refresh " - "operation, set restore_load_context=True when establishing the " - "listener to ensure the context remains unchanged when the event " - "handler completes." % (state_str(state),) - ) - - -def _instance_processor( - query_entity, - mapper, - context, - result, - path, - adapter, - only_load_props=None, - refresh_state=None, - polymorphic_discriminator=None, - _polymorphic_from=None, -): - """Produce a mapper level row processor callable - which processes rows into mapped instances.""" - - # note that this method, most of which exists in a closure - # called _instance(), resists being broken out, as - # attempts to do so tend to add significant function - # call overhead. _instance() is the most - # performance-critical section in the whole ORM. - - identity_class = mapper._identity_class - compile_state = context.compile_state - - # look for "row getter" functions that have been assigned along - # with the compile state that were cached from a previous load. - # these are operator.itemgetter() objects that each will extract a - # particular column from each row. - - getter_key = ("getters", mapper) - getters = path.get(compile_state.attributes, getter_key, None) - - if getters is None: - # no getters, so go through a list of attributes we are loading for, - # and the ones that are column based will have already put information - # for us in another collection "memoized_setups", which represents the - # output of the LoaderStrategy.setup_query() method. We can just as - # easily call LoaderStrategy.create_row_processor for each, but by - # getting it all at once from setup_query we save another method call - # per attribute. - props = mapper._prop_set - if only_load_props is not None: - props = props.intersection( - mapper._props[k] for k in only_load_props - ) - - quick_populators = path.get( - context.attributes, "memoized_setups", EMPTY_DICT - ) - - todo = [] - cached_populators = { - "new": [], - "quick": [], - "deferred": [], - "expire": [], - "existing": [], - "eager": [], - } - - if refresh_state is None: - # we can also get the "primary key" tuple getter function - pk_cols = mapper.primary_key - - if adapter: - pk_cols = [adapter.columns[c] for c in pk_cols] - primary_key_getter = result._tuple_getter(pk_cols) - else: - primary_key_getter = None - - getters = { - "cached_populators": cached_populators, - "todo": todo, - "primary_key_getter": primary_key_getter, - } - for prop in props: - if prop in quick_populators: - # this is an inlined path just for column-based attributes. - col = quick_populators[prop] - if col is _DEFER_FOR_STATE: - cached_populators["new"].append( - (prop.key, prop._deferred_column_loader) - ) - elif col is _SET_DEFERRED_EXPIRED: - # note that in this path, we are no longer - # searching in the result to see if the column might - # be present in some unexpected way. - cached_populators["expire"].append((prop.key, False)) - elif col is _RAISE_FOR_STATE: - cached_populators["new"].append( - (prop.key, prop._raise_column_loader) - ) - else: - getter = None - if adapter: - # this logic had been removed for all 1.4 releases - # up until 1.4.18; the adapter here is particularly - # the compound eager adapter which isn't accommodated - # in the quick_populators right now. The "fallback" - # logic below instead took over in many more cases - # until issue #6596 was identified. - - # note there is still an issue where this codepath - # produces no "getter" for cases where a joined-inh - # mapping includes a labeled column property, meaning - # KeyError is caught internally and we fall back to - # _getter(col), which works anyway. The adapter - # here for joined inh without any aliasing might not - # be useful. Tests which see this include - # test.orm.inheritance.test_basic -> - # EagerTargetingTest.test_adapt_stringency - # OptimizedLoadTest.test_column_expression_joined - # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501 - # - - adapted_col = adapter.columns[col] - if adapted_col is not None: - getter = result._getter(adapted_col, False) - if not getter: - getter = result._getter(col, False) - if getter: - cached_populators["quick"].append((prop.key, getter)) - else: - # fall back to the ColumnProperty itself, which - # will iterate through all of its columns - # to see if one fits - prop.create_row_processor( - context, - query_entity, - path, - mapper, - result, - adapter, - cached_populators, - ) - else: - # loader strategies like subqueryload, selectinload, - # joinedload, basically relationships, these need to interact - # with the context each time to work correctly. - todo.append(prop) - - path.set(compile_state.attributes, getter_key, getters) - - cached_populators = getters["cached_populators"] - - populators = {key: list(value) for key, value in cached_populators.items()} - for prop in getters["todo"]: - prop.create_row_processor( - context, query_entity, path, mapper, result, adapter, populators - ) - - propagated_loader_options = context.propagated_loader_options - load_path = ( - context.compile_state.current_path + path - if context.compile_state.current_path.path - else path - ) - - session_identity_map = context.session.identity_map - - populate_existing = context.populate_existing or mapper.always_refresh - load_evt = bool(mapper.class_manager.dispatch.load) - refresh_evt = bool(mapper.class_manager.dispatch.refresh) - persistent_evt = bool(context.session.dispatch.loaded_as_persistent) - if persistent_evt: - loaded_as_persistent = context.session.dispatch.loaded_as_persistent - instance_state = attributes.instance_state - instance_dict = attributes.instance_dict - session_id = context.session.hash_key - runid = context.runid - identity_token = context.identity_token - - version_check = context.version_check - if version_check: - version_id_col = mapper.version_id_col - if version_id_col is not None: - if adapter: - version_id_col = adapter.columns[version_id_col] - version_id_getter = result._getter(version_id_col) - else: - version_id_getter = None - - if not refresh_state and _polymorphic_from is not None: - key = ("loader", path.path) - - if key in context.attributes and context.attributes[key].strategy == ( - ("selectinload_polymorphic", True), - ): - option_entities = context.attributes[key].local_opts["entities"] - else: - option_entities = None - selectin_load_via = mapper._should_selectin_load( - option_entities, - _polymorphic_from, - ) - - if selectin_load_via and selectin_load_via is not _polymorphic_from: - # only_load_props goes w/ refresh_state only, and in a refresh - # we are a single row query for the exact entity; polymorphic - # loading does not apply - assert only_load_props is None - - callable_ = _load_subclass_via_in( - context, - path, - selectin_load_via, - _polymorphic_from, - option_entities, - ) - PostLoad.callable_for_path( - context, - load_path, - selectin_load_via.mapper, - selectin_load_via, - callable_, - selectin_load_via, - ) - - post_load = PostLoad.for_context(context, load_path, only_load_props) - - if refresh_state: - refresh_identity_key = refresh_state.key - if refresh_identity_key is None: - # super-rare condition; a refresh is being called - # on a non-instance-key instance; this is meant to only - # occur within a flush() - refresh_identity_key = mapper._identity_key_from_state( - refresh_state - ) - else: - refresh_identity_key = None - - primary_key_getter = getters["primary_key_getter"] - - if mapper.allow_partial_pks: - is_not_primary_key = _none_set.issuperset - else: - is_not_primary_key = _none_set.intersection - - def _instance(row): - # determine the state that we'll be populating - if refresh_identity_key: - # fixed state that we're refreshing - state = refresh_state - instance = state.obj() - dict_ = instance_dict(instance) - isnew = state.runid != runid - currentload = True - loaded_instance = False - else: - # look at the row, see if that identity is in the - # session, or we have to create a new one - identitykey = ( - identity_class, - primary_key_getter(row), - identity_token, - ) - - instance = session_identity_map.get(identitykey) - - if instance is not None: - # existing instance - state = instance_state(instance) - dict_ = instance_dict(instance) - - isnew = state.runid != runid - currentload = not isnew - loaded_instance = False - - if version_check and version_id_getter and not currentload: - _validate_version_id( - mapper, state, dict_, row, version_id_getter - ) - - else: - # create a new instance - - # check for non-NULL values in the primary key columns, - # else no entity is returned for the row - if is_not_primary_key(identitykey[1]): - return None - - isnew = True - currentload = True - loaded_instance = True - - instance = mapper.class_manager.new_instance() - - dict_ = instance_dict(instance) - state = instance_state(instance) - state.key = identitykey - state.identity_token = identity_token - - # attach instance to session. - state.session_id = session_id - session_identity_map._add_unpresent(state, identitykey) - - effective_populate_existing = populate_existing - if refresh_state is state: - effective_populate_existing = True - - # populate. this looks at whether this state is new - # for this load or was existing, and whether or not this - # row is the first row with this identity. - if currentload or effective_populate_existing: - # full population routines. Objects here are either - # just created, or we are doing a populate_existing - - # be conservative about setting load_path when populate_existing - # is in effect; want to maintain options from the original - # load. see test_expire->test_refresh_maintains_deferred_options - if isnew and ( - propagated_loader_options or not effective_populate_existing - ): - state.load_options = propagated_loader_options - state.load_path = load_path - - _populate_full( - context, - row, - state, - dict_, - isnew, - load_path, - loaded_instance, - effective_populate_existing, - populators, - ) - - if isnew: - # state.runid should be equal to context.runid / runid - # here, however for event checks we are being more conservative - # and checking against existing run id - # assert state.runid == runid - - existing_runid = state.runid - - if loaded_instance: - if load_evt: - state.manager.dispatch.load(state, context) - if state.runid != existing_runid: - _warn_for_runid_changed(state) - if persistent_evt: - loaded_as_persistent(context.session, state) - if state.runid != existing_runid: - _warn_for_runid_changed(state) - elif refresh_evt: - state.manager.dispatch.refresh( - state, context, only_load_props - ) - if state.runid != runid: - _warn_for_runid_changed(state) - - if effective_populate_existing or state.modified: - if refresh_state and only_load_props: - state._commit(dict_, only_load_props) - else: - state._commit_all(dict_, session_identity_map) - - if post_load: - post_load.add_state(state, True) - - else: - # partial population routines, for objects that were already - # in the Session, but a row matches them; apply eager loaders - # on existing objects, etc. - unloaded = state.unloaded - isnew = state not in context.partials - - if not isnew or unloaded or populators["eager"]: - # state is having a partial set of its attributes - # refreshed. Populate those attributes, - # and add to the "context.partials" collection. - - to_load = _populate_partial( - context, - row, - state, - dict_, - isnew, - load_path, - unloaded, - populators, - ) - - if isnew: - if refresh_evt: - existing_runid = state.runid - state.manager.dispatch.refresh(state, context, to_load) - if state.runid != existing_runid: - _warn_for_runid_changed(state) - - state._commit(dict_, to_load) - - if post_load and context.invoke_all_eagers: - post_load.add_state(state, False) - - return instance - - if mapper.polymorphic_map and not _polymorphic_from and not refresh_state: - # if we are doing polymorphic, dispatch to a different _instance() - # method specific to the subclass mapper - def ensure_no_pk(row): - identitykey = ( - identity_class, - primary_key_getter(row), - identity_token, - ) - if not is_not_primary_key(identitykey[1]): - return identitykey - else: - return None - - _instance = _decorate_polymorphic_switch( - _instance, - context, - query_entity, - mapper, - result, - path, - polymorphic_discriminator, - adapter, - ensure_no_pk, - ) - - return _instance - - -def _load_subclass_via_in( - context, path, entity, polymorphic_from, option_entities -): - mapper = entity.mapper - - # TODO: polymorphic_from seems to be a Mapper in all cases. - # this is likely not needed, but as we dont have typing in loading.py - # yet, err on the safe side - polymorphic_from_mapper = polymorphic_from.mapper - not_against_basemost = polymorphic_from_mapper.inherits is not None - - zero_idx = len(mapper.base_mapper.primary_key) == 1 - - if entity.is_aliased_class or not_against_basemost: - q, enable_opt, disable_opt = mapper._subclass_load_via_in( - entity, polymorphic_from - ) - else: - q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper - - def do_load(context, path, states, load_only, effective_entity): - if not option_entities: - # filter out states for those that would have selectinloaded - # from another loader - # TODO: we are currently ignoring the case where the - # "selectin_polymorphic" option is used, as this is much more - # complex / specific / very uncommon API use - states = [ - (s, v) - for s, v in states - if s.mapper._would_selectin_load_only_from_given_mapper(mapper) - ] - - if not states: - return - - orig_query = context.query - - if path.parent: - enable_opt_lcl = enable_opt._prepend_path(path) - disable_opt_lcl = disable_opt._prepend_path(path) - else: - enable_opt_lcl = enable_opt - disable_opt_lcl = disable_opt - options = ( - (enable_opt_lcl,) + orig_query._with_options + (disable_opt_lcl,) - ) - - q2 = q.options(*options) - - q2._compile_options = context.compile_state.default_compile_options - q2._compile_options += {"_current_path": path.parent} - - if context.populate_existing: - q2 = q2.execution_options(populate_existing=True) - - context.session.execute( - q2, - dict( - primary_keys=[ - state.key[1][0] if zero_idx else state.key[1] - for state, load_attrs in states - ] - ), - ).unique().scalars().all() - - return do_load - - -def _populate_full( - context, - row, - state, - dict_, - isnew, - load_path, - loaded_instance, - populate_existing, - populators, -): - if isnew: - # first time we are seeing a row with this identity. - state.runid = context.runid - - for key, getter in populators["quick"]: - dict_[key] = getter(row) - if populate_existing: - for key, set_callable in populators["expire"]: - dict_.pop(key, None) - if set_callable: - state.expired_attributes.add(key) - else: - for key, set_callable in populators["expire"]: - if set_callable: - state.expired_attributes.add(key) - - for key, populator in populators["new"]: - populator(state, dict_, row) - - elif load_path != state.load_path: - # new load path, e.g. object is present in more than one - # column position in a series of rows - state.load_path = load_path - - # if we have data, and the data isn't in the dict, OK, let's put - # it in. - for key, getter in populators["quick"]: - if key not in dict_: - dict_[key] = getter(row) - - # otherwise treat like an "already seen" row - for key, populator in populators["existing"]: - populator(state, dict_, row) - # TODO: allow "existing" populator to know this is - # a new path for the state: - # populator(state, dict_, row, new_path=True) - - else: - # have already seen rows with this identity in this same path. - for key, populator in populators["existing"]: - populator(state, dict_, row) - - # TODO: same path - # populator(state, dict_, row, new_path=False) - - -def _populate_partial( - context, row, state, dict_, isnew, load_path, unloaded, populators -): - if not isnew: - if unloaded: - # extra pass, see #8166 - for key, getter in populators["quick"]: - if key in unloaded: - dict_[key] = getter(row) - - to_load = context.partials[state] - for key, populator in populators["existing"]: - if key in to_load: - populator(state, dict_, row) - else: - to_load = unloaded - context.partials[state] = to_load - - for key, getter in populators["quick"]: - if key in to_load: - dict_[key] = getter(row) - for key, set_callable in populators["expire"]: - if key in to_load: - dict_.pop(key, None) - if set_callable: - state.expired_attributes.add(key) - for key, populator in populators["new"]: - if key in to_load: - populator(state, dict_, row) - - for key, populator in populators["eager"]: - if key not in unloaded: - populator(state, dict_, row) - - return to_load - - -def _validate_version_id(mapper, state, dict_, row, getter): - if mapper._get_state_attr_by_column( - state, dict_, mapper.version_id_col - ) != getter(row): - raise orm_exc.StaleDataError( - "Instance '%s' has version id '%s' which " - "does not match database-loaded version id '%s'." - % ( - state_str(state), - mapper._get_state_attr_by_column( - state, dict_, mapper.version_id_col - ), - getter(row), - ) - ) - - -def _decorate_polymorphic_switch( - instance_fn, - context, - query_entity, - mapper, - result, - path, - polymorphic_discriminator, - adapter, - ensure_no_pk, -): - if polymorphic_discriminator is not None: - polymorphic_on = polymorphic_discriminator - else: - polymorphic_on = mapper.polymorphic_on - if polymorphic_on is None: - return instance_fn - - if adapter: - polymorphic_on = adapter.columns[polymorphic_on] - - def configure_subclass_mapper(discriminator): - try: - sub_mapper = mapper.polymorphic_map[discriminator] - except KeyError: - raise AssertionError( - "No such polymorphic_identity %r is defined" % discriminator - ) - else: - if sub_mapper is mapper: - return None - elif not sub_mapper.isa(mapper): - return False - - return _instance_processor( - query_entity, - sub_mapper, - context, - result, - path, - adapter, - _polymorphic_from=mapper, - ) - - polymorphic_instances = util.PopulateDict(configure_subclass_mapper) - - getter = result._getter(polymorphic_on) - - def polymorphic_instance(row): - discriminator = getter(row) - if discriminator is not None: - _instance = polymorphic_instances[discriminator] - if _instance: - return _instance(row) - elif _instance is False: - identitykey = ensure_no_pk(row) - - if identitykey: - raise sa_exc.InvalidRequestError( - "Row with identity key %s can't be loaded into an " - "object; the polymorphic discriminator column '%s' " - "refers to %s, which is not a sub-mapper of " - "the requested %s" - % ( - identitykey, - polymorphic_on, - mapper.polymorphic_map[discriminator], - mapper, - ) - ) - else: - return None - else: - return instance_fn(row) - else: - identitykey = ensure_no_pk(row) - - if identitykey: - raise sa_exc.InvalidRequestError( - "Row with identity key %s can't be loaded into an " - "object; the polymorphic discriminator column '%s' is " - "NULL" % (identitykey, polymorphic_on) - ) - else: - return None - - return polymorphic_instance - - -class PostLoad: - """Track loaders and states for "post load" operations.""" - - __slots__ = "loaders", "states", "load_keys" - - def __init__(self): - self.loaders = {} - self.states = util.OrderedDict() - self.load_keys = None - - def add_state(self, state, overwrite): - # the states for a polymorphic load here are all shared - # within a single PostLoad object among multiple subtypes. - # Filtering of callables on a per-subclass basis needs to be done at - # the invocation level - self.states[state] = overwrite - - def invoke(self, context, path): - if not self.states: - return - path = path_registry.PathRegistry.coerce(path) - for ( - effective_context, - token, - limit_to_mapper, - loader, - arg, - kw, - ) in self.loaders.values(): - states = [ - (state, overwrite) - for state, overwrite in self.states.items() - if state.manager.mapper.isa(limit_to_mapper) - ] - if states: - loader( - effective_context, path, states, self.load_keys, *arg, **kw - ) - self.states.clear() - - @classmethod - def for_context(cls, context, path, only_load_props): - pl = context.post_load_paths.get(path.path) - if pl is not None and only_load_props: - pl.load_keys = only_load_props - return pl - - @classmethod - def path_exists(self, context, path, key): - return ( - path.path in context.post_load_paths - and key in context.post_load_paths[path.path].loaders - ) - - @classmethod - def callable_for_path( - cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw - ): - if path.path in context.post_load_paths: - pl = context.post_load_paths[path.path] - else: - pl = context.post_load_paths[path.path] = PostLoad() - pl.loaders[token] = ( - context, - token, - limit_to_mapper, - loader_callable, - arg, - kw, - ) - - -def load_scalar_attributes(mapper, state, attribute_names, passive): - """initiate a column-based attribute refresh operation.""" - - # assert mapper is _state_mapper(state) - session = state.session - if not session: - raise orm_exc.DetachedInstanceError( - "Instance %s is not bound to a Session; " - "attribute refresh operation cannot proceed" % (state_str(state)) - ) - - no_autoflush = bool(passive & attributes.NO_AUTOFLUSH) - - # in the case of inheritance, particularly concrete and abstract - # concrete inheritance, the class manager might have some keys - # of attributes on the superclass that we didn't actually map. - # These could be mapped as "concrete, don't load" or could be completely - # excluded from the mapping and we know nothing about them. Filter them - # here to prevent them from coming through. - if attribute_names: - attribute_names = attribute_names.intersection(mapper.attrs.keys()) - - if mapper.inherits and not mapper.concrete: - # load based on committed attributes in the object, formed into - # a truncated SELECT that only includes relevant tables. does not - # currently use state.key - statement = mapper._optimized_get_statement(state, attribute_names) - if statement is not None: - # undefer() isn't needed here because statement has the - # columns needed already, this implicitly undefers that column - stmt = FromStatement(mapper, statement) - - return load_on_ident( - session, - stmt, - None, - only_load_props=attribute_names, - refresh_state=state, - no_autoflush=no_autoflush, - ) - - # normal load, use state.key as the identity to SELECT - has_key = bool(state.key) - - if has_key: - identity_key = state.key - else: - # this codepath is rare - only valid when inside a flush, and the - # object is becoming persistent but hasn't yet been assigned - # an identity_key. - # check here to ensure we have the attrs we need. - pk_attrs = [ - mapper._columntoproperty[col].key for col in mapper.primary_key - ] - if state.expired_attributes.intersection(pk_attrs): - raise sa_exc.InvalidRequestError( - "Instance %s cannot be refreshed - it's not " - " persistent and does not " - "contain a full primary key." % state_str(state) - ) - identity_key = mapper._identity_key_from_state(state) - - if ( - _none_set.issubset(identity_key) and not mapper.allow_partial_pks - ) or _none_set.issuperset(identity_key): - util.warn_limited( - "Instance %s to be refreshed doesn't " - "contain a full primary key - can't be refreshed " - "(and shouldn't be expired, either).", - state_str(state), - ) - return - - result = load_on_ident( - session, - select(mapper).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL), - identity_key, - refresh_state=state, - only_load_props=attribute_names, - no_autoflush=no_autoflush, - ) - - # if instance is pending, a refresh operation - # may not complete (even if PK attributes are assigned) - if has_key and result is None: - raise orm_exc.ObjectDeletedError(state) |