summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
commit12cf076118570eebbff08c6b3090e0d4798447a1 (patch)
tree3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py
parentc45662ff3923b34614ddcc8feb9195541166dcc5 (diff)
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py1665
1 files changed, 0 insertions, 1665 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py b/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py
deleted file mode 100644
index 4e2cb82..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py
+++ /dev/null
@@ -1,1665 +0,0 @@
-# orm/loading.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-
-"""private module containing functions used to convert database
-rows into object instances and associated state.
-
-the functions here are called primarily by Query, Mapper,
-as well as some of the attribute loading strategies.
-
-"""
-
-from __future__ import annotations
-
-from typing import Any
-from typing import Dict
-from typing import Iterable
-from typing import List
-from typing import Mapping
-from typing import Optional
-from typing import Sequence
-from typing import Tuple
-from typing import TYPE_CHECKING
-from typing import TypeVar
-from typing import Union
-
-from . import attributes
-from . import exc as orm_exc
-from . import path_registry
-from .base import _DEFER_FOR_STATE
-from .base import _RAISE_FOR_STATE
-from .base import _SET_DEFERRED_EXPIRED
-from .base import PassiveFlag
-from .context import FromStatement
-from .context import ORMCompileState
-from .context import QueryContext
-from .util import _none_set
-from .util import state_str
-from .. import exc as sa_exc
-from .. import util
-from ..engine import result_tuple
-from ..engine.result import ChunkedIteratorResult
-from ..engine.result import FrozenResult
-from ..engine.result import SimpleResultMetaData
-from ..sql import select
-from ..sql import util as sql_util
-from ..sql.selectable import ForUpdateArg
-from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
-from ..sql.selectable import SelectState
-from ..util import EMPTY_DICT
-
-if TYPE_CHECKING:
- from ._typing import _IdentityKeyType
- from .base import LoaderCallableStatus
- from .interfaces import ORMOption
- from .mapper import Mapper
- from .query import Query
- from .session import Session
- from .state import InstanceState
- from ..engine.cursor import CursorResult
- from ..engine.interfaces import _ExecuteOptions
- from ..engine.result import Result
- from ..sql import Select
-
-_T = TypeVar("_T", bound=Any)
-_O = TypeVar("_O", bound=object)
-_new_runid = util.counter()
-
-
-_PopulatorDict = Dict[str, List[Tuple[str, Any]]]
-
-
-def instances(cursor: CursorResult[Any], context: QueryContext) -> Result[Any]:
- """Return a :class:`.Result` given an ORM query context.
-
- :param cursor: a :class:`.CursorResult`, generated by a statement
- which came from :class:`.ORMCompileState`
-
- :param context: a :class:`.QueryContext` object
-
- :return: a :class:`.Result` object representing ORM results
-
- .. versionchanged:: 1.4 The instances() function now uses
- :class:`.Result` objects and has an all new interface.
-
- """
-
- context.runid = _new_runid()
-
- if context.top_level_context:
- is_top_level = False
- context.post_load_paths = context.top_level_context.post_load_paths
- else:
- is_top_level = True
- context.post_load_paths = {}
-
- compile_state = context.compile_state
- filtered = compile_state._has_mapper_entities
- single_entity = (
- not context.load_options._only_return_tuples
- and len(compile_state._entities) == 1
- and compile_state._entities[0].supports_single_entity
- )
-
- try:
- (process, labels, extra) = list(
- zip(
- *[
- query_entity.row_processor(context, cursor)
- for query_entity in context.compile_state._entities
- ]
- )
- )
-
- if context.yield_per and (
- context.loaders_require_buffering
- or context.loaders_require_uniquing
- ):
- raise sa_exc.InvalidRequestError(
- "Can't use yield_per with eager loaders that require uniquing "
- "or row buffering, e.g. joinedload() against collections "
- "or subqueryload(). Consider the selectinload() strategy "
- "for better flexibility in loading objects."
- )
-
- except Exception:
- with util.safe_reraise():
- cursor.close()
-
- def _no_unique(entry):
- raise sa_exc.InvalidRequestError(
- "Can't use the ORM yield_per feature in conjunction with unique()"
- )
-
- def _not_hashable(datatype, *, legacy=False, uncertain=False):
- if not legacy:
-
- def go(obj):
- if uncertain:
- try:
- return hash(obj)
- except:
- pass
-
- raise sa_exc.InvalidRequestError(
- "Can't apply uniqueness to row tuple containing value of "
- f"""type {datatype!r}; {
- 'the values returned appear to be'
- if uncertain
- else 'this datatype produces'
- } non-hashable values"""
- )
-
- return go
- elif not uncertain:
- return id
- else:
- _use_id = False
-
- def go(obj):
- nonlocal _use_id
-
- if not _use_id:
- try:
- return hash(obj)
- except:
- pass
-
- # in #10459, we considered using a warning here, however
- # as legacy query uses result.unique() in all cases, this
- # would lead to too many warning cases.
- _use_id = True
-
- return id(obj)
-
- return go
-
- unique_filters = [
- (
- _no_unique
- if context.yield_per
- else (
- _not_hashable(
- ent.column.type, # type: ignore
- legacy=context.load_options._legacy_uniquing,
- uncertain=ent._null_column_type,
- )
- if (
- not ent.use_id_for_hash
- and (ent._non_hashable_value or ent._null_column_type)
- )
- else id if ent.use_id_for_hash else None
- )
- )
- for ent in context.compile_state._entities
- ]
-
- row_metadata = SimpleResultMetaData(
- labels, extra, _unique_filters=unique_filters
- )
-
- def chunks(size): # type: ignore
- while True:
- yield_per = size
-
- context.partials = {}
-
- if yield_per:
- fetch = cursor.fetchmany(yield_per)
-
- if not fetch:
- break
- else:
- fetch = cursor._raw_all_rows()
-
- if single_entity:
- proc = process[0]
- rows = [proc(row) for row in fetch]
- else:
- rows = [
- tuple([proc(row) for proc in process]) for row in fetch
- ]
-
- # if we are the originating load from a query, meaning we
- # aren't being called as a result of a nested "post load",
- # iterate through all the collected post loaders and fire them
- # off. Previously this used to work recursively, however that
- # prevented deeply nested structures from being loadable
- if is_top_level:
- if yield_per:
- # if using yield per, memoize the state of the
- # collection so that it can be restored
- top_level_post_loads = list(
- context.post_load_paths.items()
- )
-
- while context.post_load_paths:
- post_loads = list(context.post_load_paths.items())
- context.post_load_paths.clear()
- for path, post_load in post_loads:
- post_load.invoke(context, path)
-
- if yield_per:
- context.post_load_paths.clear()
- context.post_load_paths.update(top_level_post_loads)
-
- yield rows
-
- if not yield_per:
- break
-
- if context.execution_options.get("prebuffer_rows", False):
- # this is a bit of a hack at the moment.
- # I would rather have some option in the result to pre-buffer
- # internally.
- _prebuffered = list(chunks(None))
-
- def chunks(size):
- return iter(_prebuffered)
-
- result = ChunkedIteratorResult(
- row_metadata,
- chunks,
- source_supports_scalars=single_entity,
- raw=cursor,
- dynamic_yield_per=cursor.context._is_server_side,
- )
-
- # filtered and single_entity are used to indicate to legacy Query that the
- # query has ORM entities, so legacy deduping and scalars should be called
- # on the result.
- result._attributes = result._attributes.union(
- dict(filtered=filtered, is_single_entity=single_entity)
- )
-
- # multi_row_eager_loaders OTOH is specific to joinedload.
- if context.compile_state.multi_row_eager_loaders:
-
- def require_unique(obj):
- raise sa_exc.InvalidRequestError(
- "The unique() method must be invoked on this Result, "
- "as it contains results that include joined eager loads "
- "against collections"
- )
-
- result._unique_filter_state = (None, require_unique)
-
- if context.yield_per:
- result.yield_per(context.yield_per)
-
- return result
-
-
-@util.preload_module("sqlalchemy.orm.context")
-def merge_frozen_result(session, statement, frozen_result, load=True):
- """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`,
- returning a new :class:`_engine.Result` object with :term:`persistent`
- objects.
-
- See the section :ref:`do_orm_execute_re_executing` for an example.
-
- .. seealso::
-
- :ref:`do_orm_execute_re_executing`
-
- :meth:`_engine.Result.freeze`
-
- :class:`_engine.FrozenResult`
-
- """
- querycontext = util.preloaded.orm_context
-
- if load:
- # flush current contents if we expect to load data
- session._autoflush()
-
- ctx = querycontext.ORMSelectCompileState._create_entities_collection(
- statement, legacy=False
- )
-
- autoflush = session.autoflush
- try:
- session.autoflush = False
- mapped_entities = [
- i
- for i, e in enumerate(ctx._entities)
- if isinstance(e, querycontext._MapperEntity)
- ]
- keys = [ent._label_name for ent in ctx._entities]
-
- keyed_tuple = result_tuple(
- keys, [ent._extra_entities for ent in ctx._entities]
- )
-
- result = []
- for newrow in frozen_result.rewrite_rows():
- for i in mapped_entities:
- if newrow[i] is not None:
- newrow[i] = session._merge(
- attributes.instance_state(newrow[i]),
- attributes.instance_dict(newrow[i]),
- load=load,
- _recursive={},
- _resolve_conflict_map={},
- )
-
- result.append(keyed_tuple(newrow))
-
- return frozen_result.with_new_rows(result)
- finally:
- session.autoflush = autoflush
-
-
-@util.became_legacy_20(
- ":func:`_orm.merge_result`",
- alternative="The function as well as the method on :class:`_orm.Query` "
- "is superseded by the :func:`_orm.merge_frozen_result` function.",
-)
-@util.preload_module("sqlalchemy.orm.context")
-def merge_result(
- query: Query[Any],
- iterator: Union[FrozenResult, Iterable[Sequence[Any]], Iterable[object]],
- load: bool = True,
-) -> Union[FrozenResult, Iterable[Any]]:
- """Merge a result into the given :class:`.Query` object's Session.
-
- See :meth:`_orm.Query.merge_result` for top-level documentation on this
- function.
-
- """
-
- querycontext = util.preloaded.orm_context
-
- session = query.session
- if load:
- # flush current contents if we expect to load data
- session._autoflush()
-
- # TODO: need test coverage and documentation for the FrozenResult
- # use case.
- if isinstance(iterator, FrozenResult):
- frozen_result = iterator
- iterator = iter(frozen_result.data)
- else:
- frozen_result = None
-
- ctx = querycontext.ORMSelectCompileState._create_entities_collection(
- query, legacy=True
- )
-
- autoflush = session.autoflush
- try:
- session.autoflush = False
- single_entity = not frozen_result and len(ctx._entities) == 1
-
- if single_entity:
- if isinstance(ctx._entities[0], querycontext._MapperEntity):
- result = [
- session._merge(
- attributes.instance_state(instance),
- attributes.instance_dict(instance),
- load=load,
- _recursive={},
- _resolve_conflict_map={},
- )
- for instance in iterator
- ]
- else:
- result = list(iterator)
- else:
- mapped_entities = [
- i
- for i, e in enumerate(ctx._entities)
- if isinstance(e, querycontext._MapperEntity)
- ]
- result = []
- keys = [ent._label_name for ent in ctx._entities]
-
- keyed_tuple = result_tuple(
- keys, [ent._extra_entities for ent in ctx._entities]
- )
-
- for row in iterator:
- newrow = list(row)
- for i in mapped_entities:
- if newrow[i] is not None:
- newrow[i] = session._merge(
- attributes.instance_state(newrow[i]),
- attributes.instance_dict(newrow[i]),
- load=load,
- _recursive={},
- _resolve_conflict_map={},
- )
- result.append(keyed_tuple(newrow))
-
- if frozen_result:
- return frozen_result.with_new_rows(result)
- else:
- return iter(result)
- finally:
- session.autoflush = autoflush
-
-
-def get_from_identity(
- session: Session,
- mapper: Mapper[_O],
- key: _IdentityKeyType[_O],
- passive: PassiveFlag,
-) -> Union[LoaderCallableStatus, Optional[_O]]:
- """Look up the given key in the given session's identity map,
- check the object for expired state if found.
-
- """
- instance = session.identity_map.get(key)
- if instance is not None:
- state = attributes.instance_state(instance)
-
- if mapper.inherits and not state.mapper.isa(mapper):
- return attributes.PASSIVE_CLASS_MISMATCH
-
- # expired - ensure it still exists
- if state.expired:
- if not passive & attributes.SQL_OK:
- # TODO: no coverage here
- return attributes.PASSIVE_NO_RESULT
- elif not passive & attributes.RELATED_OBJECT_OK:
- # this mode is used within a flush and the instance's
- # expired state will be checked soon enough, if necessary.
- # also used by immediateloader for a mutually-dependent
- # o2m->m2m load, :ticket:`6301`
- return instance
- try:
- state._load_expired(state, passive)
- except orm_exc.ObjectDeletedError:
- session._remove_newly_deleted([state])
- return None
- return instance
- else:
- return None
-
-
-def load_on_ident(
- session: Session,
- statement: Union[Select, FromStatement],
- key: Optional[_IdentityKeyType],
- *,
- load_options: Optional[Sequence[ORMOption]] = None,
- refresh_state: Optional[InstanceState[Any]] = None,
- with_for_update: Optional[ForUpdateArg] = None,
- only_load_props: Optional[Iterable[str]] = None,
- no_autoflush: bool = False,
- bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
- execution_options: _ExecuteOptions = util.EMPTY_DICT,
- require_pk_cols: bool = False,
- is_user_refresh: bool = False,
-):
- """Load the given identity key from the database."""
- if key is not None:
- ident = key[1]
- identity_token = key[2]
- else:
- ident = identity_token = None
-
- return load_on_pk_identity(
- session,
- statement,
- ident,
- load_options=load_options,
- refresh_state=refresh_state,
- with_for_update=with_for_update,
- only_load_props=only_load_props,
- identity_token=identity_token,
- no_autoflush=no_autoflush,
- bind_arguments=bind_arguments,
- execution_options=execution_options,
- require_pk_cols=require_pk_cols,
- is_user_refresh=is_user_refresh,
- )
-
-
-def load_on_pk_identity(
- session: Session,
- statement: Union[Select, FromStatement],
- primary_key_identity: Optional[Tuple[Any, ...]],
- *,
- load_options: Optional[Sequence[ORMOption]] = None,
- refresh_state: Optional[InstanceState[Any]] = None,
- with_for_update: Optional[ForUpdateArg] = None,
- only_load_props: Optional[Iterable[str]] = None,
- identity_token: Optional[Any] = None,
- no_autoflush: bool = False,
- bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
- execution_options: _ExecuteOptions = util.EMPTY_DICT,
- require_pk_cols: bool = False,
- is_user_refresh: bool = False,
-):
- """Load the given primary key identity from the database."""
-
- query = statement
- q = query._clone()
-
- assert not q._is_lambda_element
-
- if load_options is None:
- load_options = QueryContext.default_load_options
-
- if (
- statement._compile_options
- is SelectState.default_select_compile_options
- ):
- compile_options = ORMCompileState.default_compile_options
- else:
- compile_options = statement._compile_options
-
- if primary_key_identity is not None:
- mapper = query._propagate_attrs["plugin_subject"]
-
- (_get_clause, _get_params) = mapper._get_clause
-
- # None present in ident - turn those comparisons
- # into "IS NULL"
- if None in primary_key_identity:
- nones = {
- _get_params[col].key
- for col, value in zip(mapper.primary_key, primary_key_identity)
- if value is None
- }
-
- _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones)
-
- if len(nones) == len(primary_key_identity):
- util.warn(
- "fully NULL primary key identity cannot load any "
- "object. This condition may raise an error in a future "
- "release."
- )
-
- q._where_criteria = (
- sql_util._deep_annotate(_get_clause, {"_orm_adapt": True}),
- )
-
- params = {
- _get_params[primary_key].key: id_val
- for id_val, primary_key in zip(
- primary_key_identity, mapper.primary_key
- )
- }
- else:
- params = None
-
- if with_for_update is not None:
- version_check = True
- q._for_update_arg = with_for_update
- elif query._for_update_arg is not None:
- version_check = True
- q._for_update_arg = query._for_update_arg
- else:
- version_check = False
-
- if require_pk_cols and only_load_props:
- if not refresh_state:
- raise sa_exc.ArgumentError(
- "refresh_state is required when require_pk_cols is present"
- )
-
- refresh_state_prokeys = refresh_state.mapper._primary_key_propkeys
- has_changes = {
- key
- for key in refresh_state_prokeys.difference(only_load_props)
- if refresh_state.attrs[key].history.has_changes()
- }
- if has_changes:
- # raise if pending pk changes are present.
- # technically, this could be limited to the case where we have
- # relationships in the only_load_props collection to be refreshed
- # also (and only ones that have a secondary eager loader, at that).
- # however, the error is in place across the board so that behavior
- # here is easier to predict. The use case it prevents is one
- # of mutating PK attrs, leaving them unflushed,
- # calling session.refresh(), and expecting those attrs to remain
- # still unflushed. It seems likely someone doing all those
- # things would be better off having the PK attributes flushed
- # to the database before tinkering like that (session.refresh() is
- # tinkering).
- raise sa_exc.InvalidRequestError(
- f"Please flush pending primary key changes on "
- "attributes "
- f"{has_changes} for mapper {refresh_state.mapper} before "
- "proceeding with a refresh"
- )
-
- # overall, the ORM has no internal flow right now for "dont load the
- # primary row of an object at all, but fire off
- # selectinload/subqueryload/immediateload for some relationships".
- # It would probably be a pretty big effort to add such a flow. So
- # here, the case for #8703 is introduced; user asks to refresh some
- # relationship attributes only which are
- # selectinload/subqueryload/immediateload/ etc. (not joinedload).
- # ORM complains there's no columns in the primary row to load.
- # So here, we just add the PK cols if that
- # case is detected, so that there is a SELECT emitted for the primary
- # row.
- #
- # Let's just state right up front, for this one little case,
- # the ORM here is adding a whole extra SELECT just to satisfy
- # limitations in the internal flow. This is really not a thing
- # SQLAlchemy finds itself doing like, ever, obviously, we are
- # constantly working to *remove* SELECTs we don't need. We
- # rationalize this for now based on 1. session.refresh() is not
- # commonly used 2. session.refresh() with only relationship attrs is
- # even less commonly used 3. the SELECT in question is very low
- # latency.
- #
- # to add the flow to not include the SELECT, the quickest way
- # might be to just manufacture a single-row result set to send off to
- # instances(), but we'd have to weave that into context.py and all
- # that. For 2.0.0, we have enough big changes to navigate for now.
- #
- mp = refresh_state.mapper._props
- for p in only_load_props:
- if mp[p]._is_relationship:
- only_load_props = refresh_state_prokeys.union(only_load_props)
- break
-
- if refresh_state and refresh_state.load_options:
- compile_options += {"_current_path": refresh_state.load_path.parent}
- q = q.options(*refresh_state.load_options)
-
- new_compile_options, load_options = _set_get_options(
- compile_options,
- load_options,
- version_check=version_check,
- only_load_props=only_load_props,
- refresh_state=refresh_state,
- identity_token=identity_token,
- is_user_refresh=is_user_refresh,
- )
-
- q._compile_options = new_compile_options
- q._order_by = None
-
- if no_autoflush:
- load_options += {"_autoflush": False}
-
- execution_options = util.EMPTY_DICT.merge_with(
- execution_options, {"_sa_orm_load_options": load_options}
- )
- result = (
- session.execute(
- q,
- params=params,
- execution_options=execution_options,
- bind_arguments=bind_arguments,
- )
- .unique()
- .scalars()
- )
-
- try:
- return result.one()
- except orm_exc.NoResultFound:
- return None
-
-
-def _set_get_options(
- compile_opt,
- load_opt,
- populate_existing=None,
- version_check=None,
- only_load_props=None,
- refresh_state=None,
- identity_token=None,
- is_user_refresh=None,
-):
- compile_options = {}
- load_options = {}
- if version_check:
- load_options["_version_check"] = version_check
- if populate_existing:
- load_options["_populate_existing"] = populate_existing
- if refresh_state:
- load_options["_refresh_state"] = refresh_state
- compile_options["_for_refresh_state"] = True
- if only_load_props:
- compile_options["_only_load_props"] = frozenset(only_load_props)
- if identity_token:
- load_options["_identity_token"] = identity_token
-
- if is_user_refresh:
- load_options["_is_user_refresh"] = is_user_refresh
- if load_options:
- load_opt += load_options
- if compile_options:
- compile_opt += compile_options
-
- return compile_opt, load_opt
-
-
-def _setup_entity_query(
- compile_state,
- mapper,
- query_entity,
- path,
- adapter,
- column_collection,
- with_polymorphic=None,
- only_load_props=None,
- polymorphic_discriminator=None,
- **kw,
-):
- if with_polymorphic:
- poly_properties = mapper._iterate_polymorphic_properties(
- with_polymorphic
- )
- else:
- poly_properties = mapper._polymorphic_properties
-
- quick_populators = {}
-
- path.set(compile_state.attributes, "memoized_setups", quick_populators)
-
- # for the lead entities in the path, e.g. not eager loads, and
- # assuming a user-passed aliased class, e.g. not a from_self() or any
- # implicit aliasing, don't add columns to the SELECT that aren't
- # in the thing that's aliased.
- check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class
-
- for value in poly_properties:
- if only_load_props and value.key not in only_load_props:
- continue
- value.setup(
- compile_state,
- query_entity,
- path,
- adapter,
- only_load_props=only_load_props,
- column_collection=column_collection,
- memoized_populators=quick_populators,
- check_for_adapt=check_for_adapt,
- **kw,
- )
-
- if (
- polymorphic_discriminator is not None
- and polymorphic_discriminator is not mapper.polymorphic_on
- ):
- if adapter:
- pd = adapter.columns[polymorphic_discriminator]
- else:
- pd = polymorphic_discriminator
- column_collection.append(pd)
-
-
-def _warn_for_runid_changed(state):
- util.warn(
- "Loading context for %s has changed within a load/refresh "
- "handler, suggesting a row refresh operation took place. If this "
- "event handler is expected to be "
- "emitting row refresh operations within an existing load or refresh "
- "operation, set restore_load_context=True when establishing the "
- "listener to ensure the context remains unchanged when the event "
- "handler completes." % (state_str(state),)
- )
-
-
-def _instance_processor(
- query_entity,
- mapper,
- context,
- result,
- path,
- adapter,
- only_load_props=None,
- refresh_state=None,
- polymorphic_discriminator=None,
- _polymorphic_from=None,
-):
- """Produce a mapper level row processor callable
- which processes rows into mapped instances."""
-
- # note that this method, most of which exists in a closure
- # called _instance(), resists being broken out, as
- # attempts to do so tend to add significant function
- # call overhead. _instance() is the most
- # performance-critical section in the whole ORM.
-
- identity_class = mapper._identity_class
- compile_state = context.compile_state
-
- # look for "row getter" functions that have been assigned along
- # with the compile state that were cached from a previous load.
- # these are operator.itemgetter() objects that each will extract a
- # particular column from each row.
-
- getter_key = ("getters", mapper)
- getters = path.get(compile_state.attributes, getter_key, None)
-
- if getters is None:
- # no getters, so go through a list of attributes we are loading for,
- # and the ones that are column based will have already put information
- # for us in another collection "memoized_setups", which represents the
- # output of the LoaderStrategy.setup_query() method. We can just as
- # easily call LoaderStrategy.create_row_processor for each, but by
- # getting it all at once from setup_query we save another method call
- # per attribute.
- props = mapper._prop_set
- if only_load_props is not None:
- props = props.intersection(
- mapper._props[k] for k in only_load_props
- )
-
- quick_populators = path.get(
- context.attributes, "memoized_setups", EMPTY_DICT
- )
-
- todo = []
- cached_populators = {
- "new": [],
- "quick": [],
- "deferred": [],
- "expire": [],
- "existing": [],
- "eager": [],
- }
-
- if refresh_state is None:
- # we can also get the "primary key" tuple getter function
- pk_cols = mapper.primary_key
-
- if adapter:
- pk_cols = [adapter.columns[c] for c in pk_cols]
- primary_key_getter = result._tuple_getter(pk_cols)
- else:
- primary_key_getter = None
-
- getters = {
- "cached_populators": cached_populators,
- "todo": todo,
- "primary_key_getter": primary_key_getter,
- }
- for prop in props:
- if prop in quick_populators:
- # this is an inlined path just for column-based attributes.
- col = quick_populators[prop]
- if col is _DEFER_FOR_STATE:
- cached_populators["new"].append(
- (prop.key, prop._deferred_column_loader)
- )
- elif col is _SET_DEFERRED_EXPIRED:
- # note that in this path, we are no longer
- # searching in the result to see if the column might
- # be present in some unexpected way.
- cached_populators["expire"].append((prop.key, False))
- elif col is _RAISE_FOR_STATE:
- cached_populators["new"].append(
- (prop.key, prop._raise_column_loader)
- )
- else:
- getter = None
- if adapter:
- # this logic had been removed for all 1.4 releases
- # up until 1.4.18; the adapter here is particularly
- # the compound eager adapter which isn't accommodated
- # in the quick_populators right now. The "fallback"
- # logic below instead took over in many more cases
- # until issue #6596 was identified.
-
- # note there is still an issue where this codepath
- # produces no "getter" for cases where a joined-inh
- # mapping includes a labeled column property, meaning
- # KeyError is caught internally and we fall back to
- # _getter(col), which works anyway. The adapter
- # here for joined inh without any aliasing might not
- # be useful. Tests which see this include
- # test.orm.inheritance.test_basic ->
- # EagerTargetingTest.test_adapt_stringency
- # OptimizedLoadTest.test_column_expression_joined
- # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop # noqa: E501
- #
-
- adapted_col = adapter.columns[col]
- if adapted_col is not None:
- getter = result._getter(adapted_col, False)
- if not getter:
- getter = result._getter(col, False)
- if getter:
- cached_populators["quick"].append((prop.key, getter))
- else:
- # fall back to the ColumnProperty itself, which
- # will iterate through all of its columns
- # to see if one fits
- prop.create_row_processor(
- context,
- query_entity,
- path,
- mapper,
- result,
- adapter,
- cached_populators,
- )
- else:
- # loader strategies like subqueryload, selectinload,
- # joinedload, basically relationships, these need to interact
- # with the context each time to work correctly.
- todo.append(prop)
-
- path.set(compile_state.attributes, getter_key, getters)
-
- cached_populators = getters["cached_populators"]
-
- populators = {key: list(value) for key, value in cached_populators.items()}
- for prop in getters["todo"]:
- prop.create_row_processor(
- context, query_entity, path, mapper, result, adapter, populators
- )
-
- propagated_loader_options = context.propagated_loader_options
- load_path = (
- context.compile_state.current_path + path
- if context.compile_state.current_path.path
- else path
- )
-
- session_identity_map = context.session.identity_map
-
- populate_existing = context.populate_existing or mapper.always_refresh
- load_evt = bool(mapper.class_manager.dispatch.load)
- refresh_evt = bool(mapper.class_manager.dispatch.refresh)
- persistent_evt = bool(context.session.dispatch.loaded_as_persistent)
- if persistent_evt:
- loaded_as_persistent = context.session.dispatch.loaded_as_persistent
- instance_state = attributes.instance_state
- instance_dict = attributes.instance_dict
- session_id = context.session.hash_key
- runid = context.runid
- identity_token = context.identity_token
-
- version_check = context.version_check
- if version_check:
- version_id_col = mapper.version_id_col
- if version_id_col is not None:
- if adapter:
- version_id_col = adapter.columns[version_id_col]
- version_id_getter = result._getter(version_id_col)
- else:
- version_id_getter = None
-
- if not refresh_state and _polymorphic_from is not None:
- key = ("loader", path.path)
-
- if key in context.attributes and context.attributes[key].strategy == (
- ("selectinload_polymorphic", True),
- ):
- option_entities = context.attributes[key].local_opts["entities"]
- else:
- option_entities = None
- selectin_load_via = mapper._should_selectin_load(
- option_entities,
- _polymorphic_from,
- )
-
- if selectin_load_via and selectin_load_via is not _polymorphic_from:
- # only_load_props goes w/ refresh_state only, and in a refresh
- # we are a single row query for the exact entity; polymorphic
- # loading does not apply
- assert only_load_props is None
-
- callable_ = _load_subclass_via_in(
- context,
- path,
- selectin_load_via,
- _polymorphic_from,
- option_entities,
- )
- PostLoad.callable_for_path(
- context,
- load_path,
- selectin_load_via.mapper,
- selectin_load_via,
- callable_,
- selectin_load_via,
- )
-
- post_load = PostLoad.for_context(context, load_path, only_load_props)
-
- if refresh_state:
- refresh_identity_key = refresh_state.key
- if refresh_identity_key is None:
- # super-rare condition; a refresh is being called
- # on a non-instance-key instance; this is meant to only
- # occur within a flush()
- refresh_identity_key = mapper._identity_key_from_state(
- refresh_state
- )
- else:
- refresh_identity_key = None
-
- primary_key_getter = getters["primary_key_getter"]
-
- if mapper.allow_partial_pks:
- is_not_primary_key = _none_set.issuperset
- else:
- is_not_primary_key = _none_set.intersection
-
- def _instance(row):
- # determine the state that we'll be populating
- if refresh_identity_key:
- # fixed state that we're refreshing
- state = refresh_state
- instance = state.obj()
- dict_ = instance_dict(instance)
- isnew = state.runid != runid
- currentload = True
- loaded_instance = False
- else:
- # look at the row, see if that identity is in the
- # session, or we have to create a new one
- identitykey = (
- identity_class,
- primary_key_getter(row),
- identity_token,
- )
-
- instance = session_identity_map.get(identitykey)
-
- if instance is not None:
- # existing instance
- state = instance_state(instance)
- dict_ = instance_dict(instance)
-
- isnew = state.runid != runid
- currentload = not isnew
- loaded_instance = False
-
- if version_check and version_id_getter and not currentload:
- _validate_version_id(
- mapper, state, dict_, row, version_id_getter
- )
-
- else:
- # create a new instance
-
- # check for non-NULL values in the primary key columns,
- # else no entity is returned for the row
- if is_not_primary_key(identitykey[1]):
- return None
-
- isnew = True
- currentload = True
- loaded_instance = True
-
- instance = mapper.class_manager.new_instance()
-
- dict_ = instance_dict(instance)
- state = instance_state(instance)
- state.key = identitykey
- state.identity_token = identity_token
-
- # attach instance to session.
- state.session_id = session_id
- session_identity_map._add_unpresent(state, identitykey)
-
- effective_populate_existing = populate_existing
- if refresh_state is state:
- effective_populate_existing = True
-
- # populate. this looks at whether this state is new
- # for this load or was existing, and whether or not this
- # row is the first row with this identity.
- if currentload or effective_populate_existing:
- # full population routines. Objects here are either
- # just created, or we are doing a populate_existing
-
- # be conservative about setting load_path when populate_existing
- # is in effect; want to maintain options from the original
- # load. see test_expire->test_refresh_maintains_deferred_options
- if isnew and (
- propagated_loader_options or not effective_populate_existing
- ):
- state.load_options = propagated_loader_options
- state.load_path = load_path
-
- _populate_full(
- context,
- row,
- state,
- dict_,
- isnew,
- load_path,
- loaded_instance,
- effective_populate_existing,
- populators,
- )
-
- if isnew:
- # state.runid should be equal to context.runid / runid
- # here, however for event checks we are being more conservative
- # and checking against existing run id
- # assert state.runid == runid
-
- existing_runid = state.runid
-
- if loaded_instance:
- if load_evt:
- state.manager.dispatch.load(state, context)
- if state.runid != existing_runid:
- _warn_for_runid_changed(state)
- if persistent_evt:
- loaded_as_persistent(context.session, state)
- if state.runid != existing_runid:
- _warn_for_runid_changed(state)
- elif refresh_evt:
- state.manager.dispatch.refresh(
- state, context, only_load_props
- )
- if state.runid != runid:
- _warn_for_runid_changed(state)
-
- if effective_populate_existing or state.modified:
- if refresh_state and only_load_props:
- state._commit(dict_, only_load_props)
- else:
- state._commit_all(dict_, session_identity_map)
-
- if post_load:
- post_load.add_state(state, True)
-
- else:
- # partial population routines, for objects that were already
- # in the Session, but a row matches them; apply eager loaders
- # on existing objects, etc.
- unloaded = state.unloaded
- isnew = state not in context.partials
-
- if not isnew or unloaded or populators["eager"]:
- # state is having a partial set of its attributes
- # refreshed. Populate those attributes,
- # and add to the "context.partials" collection.
-
- to_load = _populate_partial(
- context,
- row,
- state,
- dict_,
- isnew,
- load_path,
- unloaded,
- populators,
- )
-
- if isnew:
- if refresh_evt:
- existing_runid = state.runid
- state.manager.dispatch.refresh(state, context, to_load)
- if state.runid != existing_runid:
- _warn_for_runid_changed(state)
-
- state._commit(dict_, to_load)
-
- if post_load and context.invoke_all_eagers:
- post_load.add_state(state, False)
-
- return instance
-
- if mapper.polymorphic_map and not _polymorphic_from and not refresh_state:
- # if we are doing polymorphic, dispatch to a different _instance()
- # method specific to the subclass mapper
- def ensure_no_pk(row):
- identitykey = (
- identity_class,
- primary_key_getter(row),
- identity_token,
- )
- if not is_not_primary_key(identitykey[1]):
- return identitykey
- else:
- return None
-
- _instance = _decorate_polymorphic_switch(
- _instance,
- context,
- query_entity,
- mapper,
- result,
- path,
- polymorphic_discriminator,
- adapter,
- ensure_no_pk,
- )
-
- return _instance
-
-
-def _load_subclass_via_in(
- context, path, entity, polymorphic_from, option_entities
-):
- mapper = entity.mapper
-
- # TODO: polymorphic_from seems to be a Mapper in all cases.
- # this is likely not needed, but as we dont have typing in loading.py
- # yet, err on the safe side
- polymorphic_from_mapper = polymorphic_from.mapper
- not_against_basemost = polymorphic_from_mapper.inherits is not None
-
- zero_idx = len(mapper.base_mapper.primary_key) == 1
-
- if entity.is_aliased_class or not_against_basemost:
- q, enable_opt, disable_opt = mapper._subclass_load_via_in(
- entity, polymorphic_from
- )
- else:
- q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper
-
- def do_load(context, path, states, load_only, effective_entity):
- if not option_entities:
- # filter out states for those that would have selectinloaded
- # from another loader
- # TODO: we are currently ignoring the case where the
- # "selectin_polymorphic" option is used, as this is much more
- # complex / specific / very uncommon API use
- states = [
- (s, v)
- for s, v in states
- if s.mapper._would_selectin_load_only_from_given_mapper(mapper)
- ]
-
- if not states:
- return
-
- orig_query = context.query
-
- if path.parent:
- enable_opt_lcl = enable_opt._prepend_path(path)
- disable_opt_lcl = disable_opt._prepend_path(path)
- else:
- enable_opt_lcl = enable_opt
- disable_opt_lcl = disable_opt
- options = (
- (enable_opt_lcl,) + orig_query._with_options + (disable_opt_lcl,)
- )
-
- q2 = q.options(*options)
-
- q2._compile_options = context.compile_state.default_compile_options
- q2._compile_options += {"_current_path": path.parent}
-
- if context.populate_existing:
- q2 = q2.execution_options(populate_existing=True)
-
- context.session.execute(
- q2,
- dict(
- primary_keys=[
- state.key[1][0] if zero_idx else state.key[1]
- for state, load_attrs in states
- ]
- ),
- ).unique().scalars().all()
-
- return do_load
-
-
-def _populate_full(
- context,
- row,
- state,
- dict_,
- isnew,
- load_path,
- loaded_instance,
- populate_existing,
- populators,
-):
- if isnew:
- # first time we are seeing a row with this identity.
- state.runid = context.runid
-
- for key, getter in populators["quick"]:
- dict_[key] = getter(row)
- if populate_existing:
- for key, set_callable in populators["expire"]:
- dict_.pop(key, None)
- if set_callable:
- state.expired_attributes.add(key)
- else:
- for key, set_callable in populators["expire"]:
- if set_callable:
- state.expired_attributes.add(key)
-
- for key, populator in populators["new"]:
- populator(state, dict_, row)
-
- elif load_path != state.load_path:
- # new load path, e.g. object is present in more than one
- # column position in a series of rows
- state.load_path = load_path
-
- # if we have data, and the data isn't in the dict, OK, let's put
- # it in.
- for key, getter in populators["quick"]:
- if key not in dict_:
- dict_[key] = getter(row)
-
- # otherwise treat like an "already seen" row
- for key, populator in populators["existing"]:
- populator(state, dict_, row)
- # TODO: allow "existing" populator to know this is
- # a new path for the state:
- # populator(state, dict_, row, new_path=True)
-
- else:
- # have already seen rows with this identity in this same path.
- for key, populator in populators["existing"]:
- populator(state, dict_, row)
-
- # TODO: same path
- # populator(state, dict_, row, new_path=False)
-
-
-def _populate_partial(
- context, row, state, dict_, isnew, load_path, unloaded, populators
-):
- if not isnew:
- if unloaded:
- # extra pass, see #8166
- for key, getter in populators["quick"]:
- if key in unloaded:
- dict_[key] = getter(row)
-
- to_load = context.partials[state]
- for key, populator in populators["existing"]:
- if key in to_load:
- populator(state, dict_, row)
- else:
- to_load = unloaded
- context.partials[state] = to_load
-
- for key, getter in populators["quick"]:
- if key in to_load:
- dict_[key] = getter(row)
- for key, set_callable in populators["expire"]:
- if key in to_load:
- dict_.pop(key, None)
- if set_callable:
- state.expired_attributes.add(key)
- for key, populator in populators["new"]:
- if key in to_load:
- populator(state, dict_, row)
-
- for key, populator in populators["eager"]:
- if key not in unloaded:
- populator(state, dict_, row)
-
- return to_load
-
-
-def _validate_version_id(mapper, state, dict_, row, getter):
- if mapper._get_state_attr_by_column(
- state, dict_, mapper.version_id_col
- ) != getter(row):
- raise orm_exc.StaleDataError(
- "Instance '%s' has version id '%s' which "
- "does not match database-loaded version id '%s'."
- % (
- state_str(state),
- mapper._get_state_attr_by_column(
- state, dict_, mapper.version_id_col
- ),
- getter(row),
- )
- )
-
-
-def _decorate_polymorphic_switch(
- instance_fn,
- context,
- query_entity,
- mapper,
- result,
- path,
- polymorphic_discriminator,
- adapter,
- ensure_no_pk,
-):
- if polymorphic_discriminator is not None:
- polymorphic_on = polymorphic_discriminator
- else:
- polymorphic_on = mapper.polymorphic_on
- if polymorphic_on is None:
- return instance_fn
-
- if adapter:
- polymorphic_on = adapter.columns[polymorphic_on]
-
- def configure_subclass_mapper(discriminator):
- try:
- sub_mapper = mapper.polymorphic_map[discriminator]
- except KeyError:
- raise AssertionError(
- "No such polymorphic_identity %r is defined" % discriminator
- )
- else:
- if sub_mapper is mapper:
- return None
- elif not sub_mapper.isa(mapper):
- return False
-
- return _instance_processor(
- query_entity,
- sub_mapper,
- context,
- result,
- path,
- adapter,
- _polymorphic_from=mapper,
- )
-
- polymorphic_instances = util.PopulateDict(configure_subclass_mapper)
-
- getter = result._getter(polymorphic_on)
-
- def polymorphic_instance(row):
- discriminator = getter(row)
- if discriminator is not None:
- _instance = polymorphic_instances[discriminator]
- if _instance:
- return _instance(row)
- elif _instance is False:
- identitykey = ensure_no_pk(row)
-
- if identitykey:
- raise sa_exc.InvalidRequestError(
- "Row with identity key %s can't be loaded into an "
- "object; the polymorphic discriminator column '%s' "
- "refers to %s, which is not a sub-mapper of "
- "the requested %s"
- % (
- identitykey,
- polymorphic_on,
- mapper.polymorphic_map[discriminator],
- mapper,
- )
- )
- else:
- return None
- else:
- return instance_fn(row)
- else:
- identitykey = ensure_no_pk(row)
-
- if identitykey:
- raise sa_exc.InvalidRequestError(
- "Row with identity key %s can't be loaded into an "
- "object; the polymorphic discriminator column '%s' is "
- "NULL" % (identitykey, polymorphic_on)
- )
- else:
- return None
-
- return polymorphic_instance
-
-
-class PostLoad:
- """Track loaders and states for "post load" operations."""
-
- __slots__ = "loaders", "states", "load_keys"
-
- def __init__(self):
- self.loaders = {}
- self.states = util.OrderedDict()
- self.load_keys = None
-
- def add_state(self, state, overwrite):
- # the states for a polymorphic load here are all shared
- # within a single PostLoad object among multiple subtypes.
- # Filtering of callables on a per-subclass basis needs to be done at
- # the invocation level
- self.states[state] = overwrite
-
- def invoke(self, context, path):
- if not self.states:
- return
- path = path_registry.PathRegistry.coerce(path)
- for (
- effective_context,
- token,
- limit_to_mapper,
- loader,
- arg,
- kw,
- ) in self.loaders.values():
- states = [
- (state, overwrite)
- for state, overwrite in self.states.items()
- if state.manager.mapper.isa(limit_to_mapper)
- ]
- if states:
- loader(
- effective_context, path, states, self.load_keys, *arg, **kw
- )
- self.states.clear()
-
- @classmethod
- def for_context(cls, context, path, only_load_props):
- pl = context.post_load_paths.get(path.path)
- if pl is not None and only_load_props:
- pl.load_keys = only_load_props
- return pl
-
- @classmethod
- def path_exists(self, context, path, key):
- return (
- path.path in context.post_load_paths
- and key in context.post_load_paths[path.path].loaders
- )
-
- @classmethod
- def callable_for_path(
- cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw
- ):
- if path.path in context.post_load_paths:
- pl = context.post_load_paths[path.path]
- else:
- pl = context.post_load_paths[path.path] = PostLoad()
- pl.loaders[token] = (
- context,
- token,
- limit_to_mapper,
- loader_callable,
- arg,
- kw,
- )
-
-
-def load_scalar_attributes(mapper, state, attribute_names, passive):
- """initiate a column-based attribute refresh operation."""
-
- # assert mapper is _state_mapper(state)
- session = state.session
- if not session:
- raise orm_exc.DetachedInstanceError(
- "Instance %s is not bound to a Session; "
- "attribute refresh operation cannot proceed" % (state_str(state))
- )
-
- no_autoflush = bool(passive & attributes.NO_AUTOFLUSH)
-
- # in the case of inheritance, particularly concrete and abstract
- # concrete inheritance, the class manager might have some keys
- # of attributes on the superclass that we didn't actually map.
- # These could be mapped as "concrete, don't load" or could be completely
- # excluded from the mapping and we know nothing about them. Filter them
- # here to prevent them from coming through.
- if attribute_names:
- attribute_names = attribute_names.intersection(mapper.attrs.keys())
-
- if mapper.inherits and not mapper.concrete:
- # load based on committed attributes in the object, formed into
- # a truncated SELECT that only includes relevant tables. does not
- # currently use state.key
- statement = mapper._optimized_get_statement(state, attribute_names)
- if statement is not None:
- # undefer() isn't needed here because statement has the
- # columns needed already, this implicitly undefers that column
- stmt = FromStatement(mapper, statement)
-
- return load_on_ident(
- session,
- stmt,
- None,
- only_load_props=attribute_names,
- refresh_state=state,
- no_autoflush=no_autoflush,
- )
-
- # normal load, use state.key as the identity to SELECT
- has_key = bool(state.key)
-
- if has_key:
- identity_key = state.key
- else:
- # this codepath is rare - only valid when inside a flush, and the
- # object is becoming persistent but hasn't yet been assigned
- # an identity_key.
- # check here to ensure we have the attrs we need.
- pk_attrs = [
- mapper._columntoproperty[col].key for col in mapper.primary_key
- ]
- if state.expired_attributes.intersection(pk_attrs):
- raise sa_exc.InvalidRequestError(
- "Instance %s cannot be refreshed - it's not "
- " persistent and does not "
- "contain a full primary key." % state_str(state)
- )
- identity_key = mapper._identity_key_from_state(state)
-
- if (
- _none_set.issubset(identity_key) and not mapper.allow_partial_pks
- ) or _none_set.issuperset(identity_key):
- util.warn_limited(
- "Instance %s to be refreshed doesn't "
- "contain a full primary key - can't be refreshed "
- "(and shouldn't be expired, either).",
- state_str(state),
- )
- return
-
- result = load_on_ident(
- session,
- select(mapper).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL),
- identity_key,
- refresh_state=state,
- only_load_props=attribute_names,
- no_autoflush=no_autoflush,
- )
-
- # if instance is pending, a refresh operation
- # may not complete (even if PK attributes are assigned)
- if has_key and result is None:
- raise orm_exc.ObjectDeletedError(state)