summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/orm/strategies.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/orm/strategies.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/orm/strategies.py3344
1 files changed, 3344 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/orm/strategies.py b/venv/lib/python3.11/site-packages/sqlalchemy/orm/strategies.py
new file mode 100644
index 0000000..20c3b9c
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/sqlalchemy/orm/strategies.py
@@ -0,0 +1,3344 @@
+# orm/strategies.py
+# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+
+
+"""sqlalchemy.orm.interfaces.LoaderStrategy
+ implementations, and related MapperOptions."""
+
+from __future__ import annotations
+
+import collections
+import itertools
+from typing import Any
+from typing import Dict
+from typing import Tuple
+from typing import TYPE_CHECKING
+
+from . import attributes
+from . import exc as orm_exc
+from . import interfaces
+from . import loading
+from . import path_registry
+from . import properties
+from . import query
+from . import relationships
+from . import unitofwork
+from . import util as orm_util
+from .base import _DEFER_FOR_STATE
+from .base import _RAISE_FOR_STATE
+from .base import _SET_DEFERRED_EXPIRED
+from .base import ATTR_WAS_SET
+from .base import LoaderCallableStatus
+from .base import PASSIVE_OFF
+from .base import PassiveFlag
+from .context import _column_descriptions
+from .context import ORMCompileState
+from .context import ORMSelectCompileState
+from .context import QueryContext
+from .interfaces import LoaderStrategy
+from .interfaces import StrategizedProperty
+from .session import _state_session
+from .state import InstanceState
+from .strategy_options import Load
+from .util import _none_set
+from .util import AliasedClass
+from .. import event
+from .. import exc as sa_exc
+from .. import inspect
+from .. import log
+from .. import sql
+from .. import util
+from ..sql import util as sql_util
+from ..sql import visitors
+from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
+from ..sql.selectable import Select
+
+if TYPE_CHECKING:
+ from .relationships import RelationshipProperty
+ from ..sql.elements import ColumnElement
+
+
+def _register_attribute(
+ prop,
+ mapper,
+ useobject,
+ compare_function=None,
+ typecallable=None,
+ callable_=None,
+ proxy_property=None,
+ active_history=False,
+ impl_class=None,
+ **kw,
+):
+ listen_hooks = []
+
+ uselist = useobject and prop.uselist
+
+ if useobject and prop.single_parent:
+ listen_hooks.append(single_parent_validator)
+
+ if prop.key in prop.parent.validators:
+ fn, opts = prop.parent.validators[prop.key]
+ listen_hooks.append(
+ lambda desc, prop: orm_util._validator_events(
+ desc, prop.key, fn, **opts
+ )
+ )
+
+ if useobject:
+ listen_hooks.append(unitofwork.track_cascade_events)
+
+ # need to assemble backref listeners
+ # after the singleparentvalidator, mapper validator
+ if useobject:
+ backref = prop.back_populates
+ if backref and prop._effective_sync_backref:
+ listen_hooks.append(
+ lambda desc, prop: attributes.backref_listeners(
+ desc, backref, uselist
+ )
+ )
+
+ # a single MapperProperty is shared down a class inheritance
+ # hierarchy, so we set up attribute instrumentation and backref event
+ # for each mapper down the hierarchy.
+
+ # typically, "mapper" is the same as prop.parent, due to the way
+ # the configure_mappers() process runs, however this is not strongly
+ # enforced, and in the case of a second configure_mappers() run the
+ # mapper here might not be prop.parent; also, a subclass mapper may
+ # be called here before a superclass mapper. That is, can't depend
+ # on mappers not already being set up so we have to check each one.
+
+ for m in mapper.self_and_descendants:
+ if prop is m._props.get(
+ prop.key
+ ) and not m.class_manager._attr_has_impl(prop.key):
+ desc = attributes.register_attribute_impl(
+ m.class_,
+ prop.key,
+ parent_token=prop,
+ uselist=uselist,
+ compare_function=compare_function,
+ useobject=useobject,
+ trackparent=useobject
+ and (
+ prop.single_parent
+ or prop.direction is interfaces.ONETOMANY
+ ),
+ typecallable=typecallable,
+ callable_=callable_,
+ active_history=active_history,
+ impl_class=impl_class,
+ send_modified_events=not useobject or not prop.viewonly,
+ doc=prop.doc,
+ **kw,
+ )
+
+ for hook in listen_hooks:
+ hook(desc, prop)
+
+
+@properties.ColumnProperty.strategy_for(instrument=False, deferred=False)
+class UninstrumentedColumnLoader(LoaderStrategy):
+ """Represent a non-instrumented MapperProperty.
+
+ The polymorphic_on argument of mapper() often results in this,
+ if the argument is against the with_polymorphic selectable.
+
+ """
+
+ __slots__ = ("columns",)
+
+ def __init__(self, parent, strategy_key):
+ super().__init__(parent, strategy_key)
+ self.columns = self.parent_property.columns
+
+ def setup_query(
+ self,
+ compile_state,
+ query_entity,
+ path,
+ loadopt,
+ adapter,
+ column_collection=None,
+ **kwargs,
+ ):
+ for c in self.columns:
+ if adapter:
+ c = adapter.columns[c]
+ compile_state._append_dedupe_col_collection(c, column_collection)
+
+ def create_row_processor(
+ self,
+ context,
+ query_entity,
+ path,
+ loadopt,
+ mapper,
+ result,
+ adapter,
+ populators,
+ ):
+ pass
+
+
+@log.class_logger
+@properties.ColumnProperty.strategy_for(instrument=True, deferred=False)
+class ColumnLoader(LoaderStrategy):
+ """Provide loading behavior for a :class:`.ColumnProperty`."""
+
+ __slots__ = "columns", "is_composite"
+
+ def __init__(self, parent, strategy_key):
+ super().__init__(parent, strategy_key)
+ self.columns = self.parent_property.columns
+ self.is_composite = hasattr(self.parent_property, "composite_class")
+
+ def setup_query(
+ self,
+ compile_state,
+ query_entity,
+ path,
+ loadopt,
+ adapter,
+ column_collection,
+ memoized_populators,
+ check_for_adapt=False,
+ **kwargs,
+ ):
+ for c in self.columns:
+ if adapter:
+ if check_for_adapt:
+ c = adapter.adapt_check_present(c)
+ if c is None:
+ return
+ else:
+ c = adapter.columns[c]
+
+ compile_state._append_dedupe_col_collection(c, column_collection)
+
+ fetch = self.columns[0]
+ if adapter:
+ fetch = adapter.columns[fetch]
+ if fetch is None:
+ # None happens here only for dml bulk_persistence cases
+ # when context.DMLReturningColFilter is used
+ return
+
+ memoized_populators[self.parent_property] = fetch
+
+ def init_class_attribute(self, mapper):
+ self.is_class_level = True
+ coltype = self.columns[0].type
+ # TODO: check all columns ? check for foreign key as well?
+ active_history = (
+ self.parent_property.active_history
+ or self.columns[0].primary_key
+ or (
+ mapper.version_id_col is not None
+ and mapper._columntoproperty.get(mapper.version_id_col, None)
+ is self.parent_property
+ )
+ )
+
+ _register_attribute(
+ self.parent_property,
+ mapper,
+ useobject=False,
+ compare_function=coltype.compare_values,
+ active_history=active_history,
+ )
+
+ def create_row_processor(
+ self,
+ context,
+ query_entity,
+ path,
+ loadopt,
+ mapper,
+ result,
+ adapter,
+ populators,
+ ):
+ # look through list of columns represented here
+ # to see which, if any, is present in the row.
+
+ for col in self.columns:
+ if adapter:
+ col = adapter.columns[col]
+ getter = result._getter(col, False)
+ if getter:
+ populators["quick"].append((self.key, getter))
+ break
+ else:
+ populators["expire"].append((self.key, True))
+
+
+@log.class_logger
+@properties.ColumnProperty.strategy_for(query_expression=True)
+class ExpressionColumnLoader(ColumnLoader):
+ def __init__(self, parent, strategy_key):
+ super().__init__(parent, strategy_key)
+
+ # compare to the "default" expression that is mapped in
+ # the column. If it's sql.null, we don't need to render
+ # unless an expr is passed in the options.
+ null = sql.null().label(None)
+ self._have_default_expression = any(
+ not c.compare(null) for c in self.parent_property.columns
+ )
+
+ def setup_query(
+ self,
+ compile_state,
+ query_entity,
+ path,
+ loadopt,
+ adapter,
+ column_collection,
+ memoized_populators,
+ **kwargs,
+ ):
+ columns = None
+ if loadopt and loadopt._extra_criteria:
+ columns = loadopt._extra_criteria
+
+ elif self._have_default_expression:
+ columns = self.parent_property.columns
+
+ if columns is None:
+ return
+
+ for c in columns:
+ if adapter:
+ c = adapter.columns[c]
+ compile_state._append_dedupe_col_collection(c, column_collection)
+
+ fetch = columns[0]
+ if adapter:
+ fetch = adapter.columns[fetch]
+ if fetch is None:
+ # None is not expected to be the result of any
+ # adapter implementation here, however there may be theoretical
+ # usages of returning() with context.DMLReturningColFilter
+ return
+
+ memoized_populators[self.parent_property] = fetch
+
+ def create_row_processor(
+ self,
+ context,
+ query_entity,
+ path,
+ loadopt,
+ mapper,
+ result,
+ adapter,
+ populators,
+ ):
+ # look through list of columns represented here
+ # to see which, if any, is present in the row.
+ if loadopt and loadopt._extra_criteria:
+ columns = loadopt._extra_criteria
+
+ for col in columns:
+ if adapter:
+ col = adapter.columns[col]
+ getter = result._getter(col, False)
+ if getter:
+ populators["quick"].append((self.key, getter))
+ break
+ else:
+ populators["expire"].append((self.key, True))
+
+ def init_class_attribute(self, mapper):
+ self.is_class_level = True
+
+ _register_attribute(
+ self.parent_property,
+ mapper,
+ useobject=False,
+ compare_function=self.columns[0].type.compare_values,
+ accepts_scalar_loader=False,
+ )
+
+
+@log.class_logger
+@properties.ColumnProperty.strategy_for(deferred=True, instrument=True)
+@properties.ColumnProperty.strategy_for(
+ deferred=True, instrument=True, raiseload=True
+)
+@properties.ColumnProperty.strategy_for(do_nothing=True)
+class DeferredColumnLoader(LoaderStrategy):
+ """Provide loading behavior for a deferred :class:`.ColumnProperty`."""
+
+ __slots__ = "columns", "group", "raiseload"
+
+ def __init__(self, parent, strategy_key):
+ super().__init__(parent, strategy_key)
+ if hasattr(self.parent_property, "composite_class"):
+ raise NotImplementedError(
+ "Deferred loading for composite types not implemented yet"
+ )
+ self.raiseload = self.strategy_opts.get("raiseload", False)
+ self.columns = self.parent_property.columns
+ self.group = self.parent_property.group
+
+ def create_row_processor(
+ self,
+ context,
+ query_entity,
+ path,
+ loadopt,
+ mapper,
+ result,
+ adapter,
+ populators,
+ ):
+ # for a DeferredColumnLoader, this method is only used during a
+ # "row processor only" query; see test_deferred.py ->
+ # tests with "rowproc_only" in their name. As of the 1.0 series,
+ # loading._instance_processor doesn't use a "row processing" function
+ # to populate columns, instead it uses data in the "populators"
+ # dictionary. Normally, the DeferredColumnLoader.setup_query()
+ # sets up that data in the "memoized_populators" dictionary
+ # and "create_row_processor()" here is never invoked.
+
+ if (
+ context.refresh_state
+ and context.query._compile_options._only_load_props
+ and self.key in context.query._compile_options._only_load_props
+ ):
+ self.parent_property._get_strategy(
+ (("deferred", False), ("instrument", True))
+ ).create_row_processor(
+ context,
+ query_entity,
+ path,
+ loadopt,
+ mapper,
+ result,
+ adapter,
+ populators,
+ )
+
+ elif not self.is_class_level:
+ if self.raiseload:
+ set_deferred_for_local_state = (
+ self.parent_property._raise_column_loader
+ )
+ else:
+ set_deferred_for_local_state = (
+ self.parent_property._deferred_column_loader
+ )
+ populators["new"].append((self.key, set_deferred_for_local_state))
+ else:
+ populators["expire"].append((self.key, False))
+
+ def init_class_attribute(self, mapper):
+ self.is_class_level = True
+
+ _register_attribute(
+ self.parent_property,
+ mapper,
+ useobject=False,
+ compare_function=self.columns[0].type.compare_values,
+ callable_=self._load_for_state,
+ load_on_unexpire=False,
+ )
+
+ def setup_query(
+ self,
+ compile_state,
+ query_entity,
+ path,
+ loadopt,
+ adapter,
+ column_collection,
+ memoized_populators,
+ only_load_props=None,
+ **kw,
+ ):
+ if (
+ (
+ compile_state.compile_options._render_for_subquery
+ and self.parent_property._renders_in_subqueries
+ )
+ or (
+ loadopt
+ and set(self.columns).intersection(
+ self.parent._should_undefer_in_wildcard
+ )
+ )
+ or (
+ loadopt
+ and self.group
+ and loadopt.local_opts.get(
+ "undefer_group_%s" % self.group, False
+ )
+ )
+ or (only_load_props and self.key in only_load_props)
+ ):
+ self.parent_property._get_strategy(
+ (("deferred", False), ("instrument", True))
+ ).setup_query(
+ compile_state,
+ query_entity,
+ path,
+ loadopt,
+ adapter,
+ column_collection,
+ memoized_populators,
+ **kw,
+ )
+ elif self.is_class_level:
+ memoized_populators[self.parent_property] = _SET_DEFERRED_EXPIRED
+ elif not self.raiseload:
+ memoized_populators[self.parent_property] = _DEFER_FOR_STATE
+ else:
+ memoized_populators[self.parent_property] = _RAISE_FOR_STATE
+
+ def _load_for_state(self, state, passive):
+ if not state.key:
+ return LoaderCallableStatus.ATTR_EMPTY
+
+ if not passive & PassiveFlag.SQL_OK:
+ return LoaderCallableStatus.PASSIVE_NO_RESULT
+
+ localparent = state.manager.mapper
+
+ if self.group:
+ toload = [
+ p.key
+ for p in localparent.iterate_properties
+ if isinstance(p, StrategizedProperty)
+ and isinstance(p.strategy, DeferredColumnLoader)
+ and p.group == self.group
+ ]
+ else:
+ toload = [self.key]
+
+ # narrow the keys down to just those which have no history
+ group = [k for k in toload if k in state.unmodified]
+
+ session = _state_session(state)
+ if session is None:
+ raise orm_exc.DetachedInstanceError(
+ "Parent instance %s is not bound to a Session; "
+ "deferred load operation of attribute '%s' cannot proceed"
+ % (orm_util.state_str(state), self.key)
+ )
+
+ if self.raiseload:
+ self._invoke_raise_load(state, passive, "raise")
+
+ loading.load_scalar_attributes(
+ state.mapper, state, set(group), PASSIVE_OFF
+ )
+
+ return LoaderCallableStatus.ATTR_WAS_SET
+
+ def _invoke_raise_load(self, state, passive, lazy):
+ raise sa_exc.InvalidRequestError(
+ "'%s' is not available due to raiseload=True" % (self,)
+ )
+
+
+class LoadDeferredColumns:
+ """serializable loader object used by DeferredColumnLoader"""
+
+ def __init__(self, key: str, raiseload: bool = False):
+ self.key = key
+ self.raiseload = raiseload
+
+ def __call__(self, state, passive=attributes.PASSIVE_OFF):
+ key = self.key
+
+ localparent = state.manager.mapper
+ prop = localparent._props[key]
+ if self.raiseload:
+ strategy_key = (
+ ("deferred", True),
+ ("instrument", True),
+ ("raiseload", True),
+ )
+ else:
+ strategy_key = (("deferred", True), ("instrument", True))
+ strategy = prop._get_strategy(strategy_key)
+ return strategy._load_for_state(state, passive)
+
+
+class AbstractRelationshipLoader(LoaderStrategy):
+ """LoaderStratgies which deal with related objects."""
+
+ __slots__ = "mapper", "target", "uselist", "entity"
+
+ def __init__(self, parent, strategy_key):
+ super().__init__(parent, strategy_key)
+ self.mapper = self.parent_property.mapper
+ self.entity = self.parent_property.entity
+ self.target = self.parent_property.target
+ self.uselist = self.parent_property.uselist
+
+ def _immediateload_create_row_processor(
+ self,
+ context,
+ query_entity,
+ path,
+ loadopt,
+ mapper,
+ result,
+ adapter,
+ populators,
+ ):
+ return self.parent_property._get_strategy(
+ (("lazy", "immediate"),)
+ ).create_row_processor(
+ context,
+ query_entity,
+ path,
+ loadopt,
+ mapper,
+ result,
+ adapter,
+ populators,
+ )
+
+
+@log.class_logger
+@relationships.RelationshipProperty.strategy_for(do_nothing=True)
+class DoNothingLoader(LoaderStrategy):
+ """Relationship loader that makes no change to the object's state.
+
+ Compared to NoLoader, this loader does not initialize the
+ collection/attribute to empty/none; the usual default LazyLoader will
+ take effect.
+
+ """
+
+
+@log.class_logger
+@relationships.RelationshipProperty.strategy_for(lazy="noload")
+@relationships.RelationshipProperty.strategy_for(lazy=None)
+class NoLoader(AbstractRelationshipLoader):
+ """Provide loading behavior for a :class:`.Relationship`
+ with "lazy=None".
+
+ """
+
+ __slots__ = ()
+
+ def init_class_attribute(self, mapper):
+ self.is_class_level = True
+
+ _register_attribute(
+ self.parent_property,
+ mapper,
+ useobject=True,
+ typecallable=self.parent_property.collection_class,
+ )
+
+ def create_row_processor(
+ self,
+ context,
+ query_entity,
+ path,
+ loadopt,
+ mapper,
+ result,
+ adapter,
+ populators,
+ ):
+ def invoke_no_load(state, dict_, row):
+ if self.uselist:
+ attributes.init_state_collection(state, dict_, self.key)
+ else:
+ dict_[self.key] = None
+
+ populators["new"].append((self.key, invoke_no_load))
+
+
+@log.class_logger
+@relationships.RelationshipProperty.strategy_for(lazy=True)
+@relationships.RelationshipProperty.strategy_for(lazy="select")
+@relationships.RelationshipProperty.strategy_for(lazy="raise")
+@relationships.RelationshipProperty.strategy_for(lazy="raise_on_sql")
+@relationships.RelationshipProperty.strategy_for(lazy="baked_select")
+class LazyLoader(
+ AbstractRelationshipLoader, util.MemoizedSlots, log.Identified
+):
+ """Provide loading behavior for a :class:`.Relationship`
+ with "lazy=True", that is loads when first accessed.
+
+ """
+
+ __slots__ = (
+ "_lazywhere",
+ "_rev_lazywhere",
+ "_lazyload_reverse_option",
+ "_order_by",
+ "use_get",
+ "is_aliased_class",
+ "_bind_to_col",
+ "_equated_columns",
+ "_rev_bind_to_col",
+ "_rev_equated_columns",
+ "_simple_lazy_clause",
+ "_raise_always",
+ "_raise_on_sql",
+ )
+
+ _lazywhere: ColumnElement[bool]
+ _bind_to_col: Dict[str, ColumnElement[Any]]
+ _rev_lazywhere: ColumnElement[bool]
+ _rev_bind_to_col: Dict[str, ColumnElement[Any]]
+
+ parent_property: RelationshipProperty[Any]
+
+ def __init__(
+ self, parent: RelationshipProperty[Any], strategy_key: Tuple[Any, ...]
+ ):
+ super().__init__(parent, strategy_key)
+ self._raise_always = self.strategy_opts["lazy"] == "raise"
+ self._raise_on_sql = self.strategy_opts["lazy"] == "raise_on_sql"
+
+ self.is_aliased_class = inspect(self.entity).is_aliased_class
+
+ join_condition = self.parent_property._join_condition
+ (
+ self._lazywhere,
+ self._bind_to_col,
+ self._equated_columns,
+ ) = join_condition.create_lazy_clause()
+
+ (
+ self._rev_lazywhere,
+ self._rev_bind_to_col,
+ self._rev_equated_columns,
+ ) = join_condition.create_lazy_clause(reverse_direction=True)
+
+ if self.parent_property.order_by:
+ self._order_by = [
+ sql_util._deep_annotate(elem, {"_orm_adapt": True})
+ for elem in util.to_list(self.parent_property.order_by)
+ ]
+ else:
+ self._order_by = None
+
+ self.logger.info("%s lazy loading clause %s", self, self._lazywhere)
+
+ # determine if our "lazywhere" clause is the same as the mapper's
+ # get() clause. then we can just use mapper.get()
+ #
+ # TODO: the "not self.uselist" can be taken out entirely; a m2o
+ # load that populates for a list (very unusual, but is possible with
+ # the API) can still set for "None" and the attribute system will
+ # populate as an empty list.
+ self.use_get = (
+ not self.is_aliased_class
+ and not self.uselist
+ and self.entity._get_clause[0].compare(
+ self._lazywhere,
+ use_proxies=True,
+ compare_keys=False,
+ equivalents=self.mapper._equivalent_columns,
+ )
+ )
+
+ if self.use_get:
+ for col in list(self._equated_columns):
+ if col in self.mapper._equivalent_columns:
+ for c in self.mapper._equivalent_columns[col]:
+ self._equated_columns[c] = self._equated_columns[col]
+
+ self.logger.info(
+ "%s will use Session.get() to optimize instance loads", self
+ )
+
+ def init_class_attribute(self, mapper):
+ self.is_class_level = True
+
+ _legacy_inactive_history_style = (
+ self.parent_property._legacy_inactive_history_style
+ )
+
+ if self.parent_property.active_history:
+ active_history = True
+ _deferred_history = False
+
+ elif (
+ self.parent_property.direction is not interfaces.MANYTOONE
+ or not self.use_get
+ ):
+ if _legacy_inactive_history_style:
+ active_history = True
+ _deferred_history = False
+ else:
+ active_history = False
+ _deferred_history = True
+ else:
+ active_history = _deferred_history = False
+
+ _register_attribute(
+ self.parent_property,
+ mapper,
+ useobject=True,
+ callable_=self._load_for_state,
+ typecallable=self.parent_property.collection_class,
+ active_history=active_history,
+ _deferred_history=_deferred_history,
+ )
+
+ def _memoized_attr__simple_lazy_clause(self):
+ lazywhere = sql_util._deep_annotate(
+ self._lazywhere, {"_orm_adapt": True}
+ )
+
+ criterion, bind_to_col = (lazywhere, self._bind_to_col)
+
+ params = []
+
+ def visit_bindparam(bindparam):
+ bindparam.unique = False
+
+ visitors.traverse(criterion, {}, {"bindparam": visit_bindparam})
+
+ def visit_bindparam(bindparam):
+ if bindparam._identifying_key in bind_to_col:
+ params.append(
+ (
+ bindparam.key,
+ bind_to_col[bindparam._identifying_key],
+ None,
+ )
+ )
+ elif bindparam.callable is None:
+ params.append((bindparam.key, None, bindparam.value))
+
+ criterion = visitors.cloned_traverse(
+ criterion, {}, {"bindparam": visit_bindparam}
+ )
+
+ return criterion, params
+
+ def _generate_lazy_clause(self, state, passive):
+ criterion, param_keys = self._simple_lazy_clause
+
+ if state is None:
+ return sql_util.adapt_criterion_to_null(
+ criterion, [key for key, ident, value in param_keys]
+ )
+
+ mapper = self.parent_property.parent
+
+ o = state.obj() # strong ref
+ dict_ = attributes.instance_dict(o)
+
+ if passive & PassiveFlag.INIT_OK:
+ passive ^= PassiveFlag.INIT_OK
+
+ params = {}
+ for key, ident, value in param_keys:
+ if ident is not None:
+ if passive and passive & PassiveFlag.LOAD_AGAINST_COMMITTED:
+ value = mapper._get_committed_state_attr_by_column(
+ state, dict_, ident, passive
+ )
+ else:
+ value = mapper._get_state_attr_by_column(
+ state, dict_, ident, passive
+ )
+
+ params[key] = value
+
+ return criterion, params
+
+ def _invoke_raise_load(self, state, passive, lazy):
+ raise sa_exc.InvalidRequestError(
+ "'%s' is not available due to lazy='%s'" % (self, lazy)
+ )
+
+ def _load_for_state(
+ self,
+ state,
+ passive,
+ loadopt=None,
+ extra_criteria=(),
+ extra_options=(),
+ alternate_effective_path=None,
+ execution_options=util.EMPTY_DICT,
+ ):
+ if not state.key and (
+ (
+ not self.parent_property.load_on_pending
+ and not state._load_pending
+ )
+ or not state.session_id
+ ):
+ return LoaderCallableStatus.ATTR_EMPTY
+
+ pending = not state.key
+ primary_key_identity = None
+
+ use_get = self.use_get and (not loadopt or not loadopt._extra_criteria)
+
+ if (not passive & PassiveFlag.SQL_OK and not use_get) or (
+ not passive & attributes.NON_PERSISTENT_OK and pending
+ ):
+ return LoaderCallableStatus.PASSIVE_NO_RESULT
+
+ if (
+ # we were given lazy="raise"
+ self._raise_always
+ # the no_raise history-related flag was not passed
+ and not passive & PassiveFlag.NO_RAISE
+ and (
+ # if we are use_get and related_object_ok is disabled,
+ # which means we are at most looking in the identity map
+ # for history purposes or otherwise returning
+ # PASSIVE_NO_RESULT, don't raise. This is also a
+ # history-related flag
+ not use_get
+ or passive & PassiveFlag.RELATED_OBJECT_OK
+ )
+ ):
+ self._invoke_raise_load(state, passive, "raise")
+
+ session = _state_session(state)
+ if not session:
+ if passive & PassiveFlag.NO_RAISE:
+ return LoaderCallableStatus.PASSIVE_NO_RESULT
+
+ raise orm_exc.DetachedInstanceError(
+ "Parent instance %s is not bound to a Session; "
+ "lazy load operation of attribute '%s' cannot proceed"
+ % (orm_util.state_str(state), self.key)
+ )
+
+ # if we have a simple primary key load, check the
+ # identity map without generating a Query at all
+ if use_get:
+ primary_key_identity = self._get_ident_for_use_get(
+ session, state, passive
+ )
+ if LoaderCallableStatus.PASSIVE_NO_RESULT in primary_key_identity:
+ return LoaderCallableStatus.PASSIVE_NO_RESULT
+ elif LoaderCallableStatus.NEVER_SET in primary_key_identity:
+ return LoaderCallableStatus.NEVER_SET
+
+ if _none_set.issuperset(primary_key_identity):
+ return None
+
+ if (
+ self.key in state.dict
+ and not passive & PassiveFlag.DEFERRED_HISTORY_LOAD
+ ):
+ return LoaderCallableStatus.ATTR_WAS_SET
+
+ # look for this identity in the identity map. Delegate to the
+ # Query class in use, as it may have special rules for how it
+ # does this, including how it decides what the correct
+ # identity_token would be for this identity.
+
+ instance = session._identity_lookup(
+ self.entity,
+ primary_key_identity,
+ passive=passive,
+ lazy_loaded_from=state,
+ )
+
+ if instance is not None:
+ if instance is LoaderCallableStatus.PASSIVE_CLASS_MISMATCH:
+ return None
+ else:
+ return instance
+ elif (
+ not passive & PassiveFlag.SQL_OK
+ or not passive & PassiveFlag.RELATED_OBJECT_OK
+ ):
+ return LoaderCallableStatus.PASSIVE_NO_RESULT
+
+ return self._emit_lazyload(
+ session,
+ state,
+ primary_key_identity,
+ passive,
+ loadopt,
+ extra_criteria,
+ extra_options,
+ alternate_effective_path,
+ execution_options,
+ )
+
+ def _get_ident_for_use_get(self, session, state, passive):
+ instance_mapper = state.manager.mapper
+
+ if passive & PassiveFlag.LOAD_AGAINST_COMMITTED:
+ get_attr = instance_mapper._get_committed_state_attr_by_column
+ else:
+ get_attr = instance_mapper._get_state_attr_by_column
+
+ dict_ = state.dict
+
+ return [
+ get_attr(state, dict_, self._equated_columns[pk], passive=passive)
+ for pk in self.mapper.primary_key
+ ]
+
+ @util.preload_module("sqlalchemy.orm.strategy_options")
+ def _emit_lazyload(
+ self,
+ session,
+ state,
+ primary_key_identity,
+ passive,
+ loadopt,
+ extra_criteria,
+ extra_options,
+ alternate_effective_path,
+ execution_options,
+ ):
+ strategy_options = util.preloaded.orm_strategy_options
+
+ clauseelement = self.entity.__clause_element__()
+ stmt = Select._create_raw_select(
+ _raw_columns=[clauseelement],
+ _propagate_attrs=clauseelement._propagate_attrs,
+ _label_style=LABEL_STYLE_TABLENAME_PLUS_COL,
+ _compile_options=ORMCompileState.default_compile_options,
+ )
+ load_options = QueryContext.default_load_options
+
+ load_options += {
+ "_invoke_all_eagers": False,
+ "_lazy_loaded_from": state,
+ }
+
+ if self.parent_property.secondary is not None:
+ stmt = stmt.select_from(
+ self.mapper, self.parent_property.secondary
+ )
+
+ pending = not state.key
+
+ # don't autoflush on pending
+ if pending or passive & attributes.NO_AUTOFLUSH:
+ stmt._execution_options = util.immutabledict({"autoflush": False})
+
+ use_get = self.use_get
+
+ if state.load_options or (loadopt and loadopt._extra_criteria):
+ if alternate_effective_path is None:
+ effective_path = state.load_path[self.parent_property]
+ else:
+ effective_path = alternate_effective_path[self.parent_property]
+
+ opts = state.load_options
+
+ if loadopt and loadopt._extra_criteria:
+ use_get = False
+ opts += (
+ orm_util.LoaderCriteriaOption(self.entity, extra_criteria),
+ )
+
+ stmt._with_options = opts
+ elif alternate_effective_path is None:
+ # this path is used if there are not already any options
+ # in the query, but an event may want to add them
+ effective_path = state.mapper._path_registry[self.parent_property]
+ else:
+ # added by immediateloader
+ effective_path = alternate_effective_path[self.parent_property]
+
+ if extra_options:
+ stmt._with_options += extra_options
+
+ stmt._compile_options += {"_current_path": effective_path}
+
+ if use_get:
+ if self._raise_on_sql and not passive & PassiveFlag.NO_RAISE:
+ self._invoke_raise_load(state, passive, "raise_on_sql")
+
+ return loading.load_on_pk_identity(
+ session,
+ stmt,
+ primary_key_identity,
+ load_options=load_options,
+ execution_options=execution_options,
+ )
+
+ if self._order_by:
+ stmt._order_by_clauses = self._order_by
+
+ def _lazyload_reverse(compile_context):
+ for rev in self.parent_property._reverse_property:
+ # reverse props that are MANYTOONE are loading *this*
+ # object from get(), so don't need to eager out to those.
+ if (
+ rev.direction is interfaces.MANYTOONE
+ and rev._use_get
+ and not isinstance(rev.strategy, LazyLoader)
+ ):
+ strategy_options.Load._construct_for_existing_path(
+ compile_context.compile_options._current_path[
+ rev.parent
+ ]
+ ).lazyload(rev).process_compile_state(compile_context)
+
+ stmt._with_context_options += (
+ (_lazyload_reverse, self.parent_property),
+ )
+
+ lazy_clause, params = self._generate_lazy_clause(state, passive)
+
+ if execution_options:
+ execution_options = util.EMPTY_DICT.merge_with(
+ execution_options,
+ {
+ "_sa_orm_load_options": load_options,
+ },
+ )
+ else:
+ execution_options = {
+ "_sa_orm_load_options": load_options,
+ }
+
+ if (
+ self.key in state.dict
+ and not passive & PassiveFlag.DEFERRED_HISTORY_LOAD
+ ):
+ return LoaderCallableStatus.ATTR_WAS_SET
+
+ if pending:
+ if util.has_intersection(orm_util._none_set, params.values()):
+ return None
+
+ elif util.has_intersection(orm_util._never_set, params.values()):
+ return None
+
+ if self._raise_on_sql and not passive & PassiveFlag.NO_RAISE:
+ self._invoke_raise_load(state, passive, "raise_on_sql")
+
+ stmt._where_criteria = (lazy_clause,)
+
+ result = session.execute(
+ stmt, params, execution_options=execution_options
+ )
+
+ result = result.unique().scalars().all()
+
+ if self.uselist:
+ return result
+ else:
+ l = len(result)
+ if l:
+ if l > 1:
+ util.warn(
+ "Multiple rows returned with "
+ "uselist=False for lazily-loaded attribute '%s' "
+ % self.parent_property
+ )
+
+ return result[0]
+ else:
+ return None
+
+ def create_row_processor(
+ self,
+ context,
+ query_entity,
+ path,
+ loadopt,
+ mapper,
+ result,
+ adapter,
+ populators,
+ ):
+ key = self.key
+
+ if (
+ context.load_options._is_user_refresh
+ and context.query._compile_options._only_load_props
+ and self.key in context.query._compile_options._only_load_props
+ ):
+ return self._immediateload_create_row_processor(
+ context,
+ query_entity,
+ path,
+ loadopt,
+ mapper,
+ result,
+ adapter,
+ populators,
+ )
+
+ if not self.is_class_level or (loadopt and loadopt._extra_criteria):
+ # we are not the primary manager for this attribute
+ # on this class - set up a
+ # per-instance lazyloader, which will override the
+ # class-level behavior.
+ # this currently only happens when using a
+ # "lazyload" option on a "no load"
+ # attribute - "eager" attributes always have a
+ # class-level lazyloader installed.
+ set_lazy_callable = (
+ InstanceState._instance_level_callable_processor
+ )(
+ mapper.class_manager,
+ LoadLazyAttribute(
+ key,
+ self,
+ loadopt,
+ (
+ loadopt._generate_extra_criteria(context)
+ if loadopt._extra_criteria
+ else None
+ ),
+ ),
+ key,
+ )
+
+ populators["new"].append((self.key, set_lazy_callable))
+ elif context.populate_existing or mapper.always_refresh:
+
+ def reset_for_lazy_callable(state, dict_, row):
+ # we are the primary manager for this attribute on
+ # this class - reset its
+ # per-instance attribute state, so that the class-level
+ # lazy loader is
+ # executed when next referenced on this instance.
+ # this is needed in
+ # populate_existing() types of scenarios to reset
+ # any existing state.
+ state._reset(dict_, key)
+
+ populators["new"].append((self.key, reset_for_lazy_callable))
+
+
+class LoadLazyAttribute:
+ """semi-serializable loader object used by LazyLoader
+
+ Historically, this object would be carried along with instances that
+ needed to run lazyloaders, so it had to be serializable to support
+ cached instances.
+
+ this is no longer a general requirement, and the case where this object
+ is used is exactly the case where we can't really serialize easily,
+ which is when extra criteria in the loader option is present.
+
+ We can't reliably serialize that as it refers to mapped entities and
+ AliasedClass objects that are local to the current process, which would
+ need to be matched up on deserialize e.g. the sqlalchemy.ext.serializer
+ approach.
+
+ """
+
+ def __init__(self, key, initiating_strategy, loadopt, extra_criteria):
+ self.key = key
+ self.strategy_key = initiating_strategy.strategy_key
+ self.loadopt = loadopt
+ self.extra_criteria = extra_criteria
+
+ def __getstate__(self):
+ if self.extra_criteria is not None:
+ util.warn(
+ "Can't reliably serialize a lazyload() option that "
+ "contains additional criteria; please use eager loading "
+ "for this case"
+ )
+ return {
+ "key": self.key,
+ "strategy_key": self.strategy_key,
+ "loadopt": self.loadopt,
+ "extra_criteria": (),
+ }
+
+ def __call__(self, state, passive=attributes.PASSIVE_OFF):
+ key = self.key
+ instance_mapper = state.manager.mapper
+ prop = instance_mapper._props[key]
+ strategy = prop._strategies[self.strategy_key]
+
+ return strategy._load_for_state(
+ state,
+ passive,
+ loadopt=self.loadopt,
+ extra_criteria=self.extra_criteria,
+ )
+
+
+class PostLoader(AbstractRelationshipLoader):
+ """A relationship loader that emits a second SELECT statement."""
+
+ __slots__ = ()
+
+ def _setup_for_recursion(self, context, path, loadopt, join_depth=None):
+ effective_path = (
+ context.compile_state.current_path or orm_util.PathRegistry.root
+ ) + path
+
+ top_level_context = context._get_top_level_context()
+ execution_options = util.immutabledict(
+ {"sa_top_level_orm_context": top_level_context}
+ )
+
+ if loadopt:
+ recursion_depth = loadopt.local_opts.get("recursion_depth", None)
+ unlimited_recursion = recursion_depth == -1
+ else:
+ recursion_depth = None
+ unlimited_recursion = False
+
+ if recursion_depth is not None:
+ if not self.parent_property._is_self_referential:
+ raise sa_exc.InvalidRequestError(
+ f"recursion_depth option on relationship "
+ f"{self.parent_property} not valid for "
+ "non-self-referential relationship"
+ )
+ recursion_depth = context.execution_options.get(
+ f"_recursion_depth_{id(self)}", recursion_depth
+ )
+
+ if not unlimited_recursion and recursion_depth < 0:
+ return (
+ effective_path,
+ False,
+ execution_options,
+ recursion_depth,
+ )
+
+ if not unlimited_recursion:
+ execution_options = execution_options.union(
+ {
+ f"_recursion_depth_{id(self)}": recursion_depth - 1,
+ }
+ )
+
+ if loading.PostLoad.path_exists(
+ context, effective_path, self.parent_property
+ ):
+ return effective_path, False, execution_options, recursion_depth
+
+ path_w_prop = path[self.parent_property]
+ effective_path_w_prop = effective_path[self.parent_property]
+
+ if not path_w_prop.contains(context.attributes, "loader"):
+ if join_depth:
+ if effective_path_w_prop.length / 2 > join_depth:
+ return (
+ effective_path,
+ False,
+ execution_options,
+ recursion_depth,
+ )
+ elif effective_path_w_prop.contains_mapper(self.mapper):
+ return (
+ effective_path,
+ False,
+ execution_options,
+ recursion_depth,
+ )
+
+ return effective_path, True, execution_options, recursion_depth
+
+
+@relationships.RelationshipProperty.strategy_for(lazy="immediate")
+class ImmediateLoader(PostLoader):
+ __slots__ = ("join_depth",)
+
+ def __init__(self, parent, strategy_key):
+ super().__init__(parent, strategy_key)
+ self.join_depth = self.parent_property.join_depth
+
+ def init_class_attribute(self, mapper):
+ self.parent_property._get_strategy(
+ (("lazy", "select"),)
+ ).init_class_attribute(mapper)
+
+ def create_row_processor(
+ self,
+ context,
+ query_entity,
+ path,
+ loadopt,
+ mapper,
+ result,
+ adapter,
+ populators,
+ ):
+ (
+ effective_path,
+ run_loader,
+ execution_options,
+ recursion_depth,
+ ) = self._setup_for_recursion(context, path, loadopt, self.join_depth)
+ if not run_loader:
+ # this will not emit SQL and will only emit for a many-to-one
+ # "use get" load. the "_RELATED" part means it may return
+ # instance even if its expired, since this is a mutually-recursive
+ # load operation.
+ flags = attributes.PASSIVE_NO_FETCH_RELATED | PassiveFlag.NO_RAISE
+ else:
+ flags = attributes.PASSIVE_OFF | PassiveFlag.NO_RAISE
+
+ loading.PostLoad.callable_for_path(
+ context,
+ effective_path,
+ self.parent,
+ self.parent_property,
+ self._load_for_path,
+ loadopt,
+ flags,
+ recursion_depth,
+ execution_options,
+ )
+
+ def _load_for_path(
+ self,
+ context,
+ path,
+ states,
+ load_only,
+ loadopt,
+ flags,
+ recursion_depth,
+ execution_options,
+ ):
+ if recursion_depth:
+ new_opt = Load(loadopt.path.entity)
+ new_opt.context = (
+ loadopt,
+ loadopt._recurse(),
+ )
+ alternate_effective_path = path._truncate_recursive()
+ extra_options = (new_opt,)
+ else:
+ new_opt = None
+ alternate_effective_path = path
+ extra_options = ()
+
+ key = self.key
+ lazyloader = self.parent_property._get_strategy((("lazy", "select"),))
+ for state, overwrite in states:
+ dict_ = state.dict
+
+ if overwrite or key not in dict_:
+ value = lazyloader._load_for_state(
+ state,
+ flags,
+ extra_options=extra_options,
+ alternate_effective_path=alternate_effective_path,
+ execution_options=execution_options,
+ )
+ if value not in (
+ ATTR_WAS_SET,
+ LoaderCallableStatus.PASSIVE_NO_RESULT,
+ ):
+ state.get_impl(key).set_committed_value(
+ state, dict_, value
+ )
+
+
+@log.class_logger
+@relationships.RelationshipProperty.strategy_for(lazy="subquery")
+class SubqueryLoader(PostLoader):
+ __slots__ = ("join_depth",)
+
+ def __init__(self, parent, strategy_key):
+ super().__init__(parent, strategy_key)
+ self.join_depth = self.parent_property.join_depth
+
+ def init_class_attribute(self, mapper):
+ self.parent_property._get_strategy(
+ (("lazy", "select"),)
+ ).init_class_attribute(mapper)
+
+ def _get_leftmost(
+ self,
+ orig_query_entity_index,
+ subq_path,
+ current_compile_state,
+ is_root,
+ ):
+ given_subq_path = subq_path
+ subq_path = subq_path.path
+ subq_mapper = orm_util._class_to_mapper(subq_path[0])
+
+ # determine attributes of the leftmost mapper
+ if (
+ self.parent.isa(subq_mapper)
+ and self.parent_property is subq_path[1]
+ ):
+ leftmost_mapper, leftmost_prop = self.parent, self.parent_property
+ else:
+ leftmost_mapper, leftmost_prop = subq_mapper, subq_path[1]
+
+ if is_root:
+ # the subq_path is also coming from cached state, so when we start
+ # building up this path, it has to also be converted to be in terms
+ # of the current state. this is for the specific case of the entity
+ # is an AliasedClass against a subquery that's not otherwise going
+ # to adapt
+ new_subq_path = current_compile_state._entities[
+ orig_query_entity_index
+ ].entity_zero._path_registry[leftmost_prop]
+ additional = len(subq_path) - len(new_subq_path)
+ if additional:
+ new_subq_path += path_registry.PathRegistry.coerce(
+ subq_path[-additional:]
+ )
+ else:
+ new_subq_path = given_subq_path
+
+ leftmost_cols = leftmost_prop.local_columns
+
+ leftmost_attr = [
+ getattr(
+ new_subq_path.path[0].entity,
+ leftmost_mapper._columntoproperty[c].key,
+ )
+ for c in leftmost_cols
+ ]
+
+ return leftmost_mapper, leftmost_attr, leftmost_prop, new_subq_path
+
+ def _generate_from_original_query(
+ self,
+ orig_compile_state,
+ orig_query,
+ leftmost_mapper,
+ leftmost_attr,
+ leftmost_relationship,
+ orig_entity,
+ ):
+ # reformat the original query
+ # to look only for significant columns
+ q = orig_query._clone().correlate(None)
+
+ # LEGACY: make a Query back from the select() !!
+ # This suits at least two legacy cases:
+ # 1. applications which expect before_compile() to be called
+ # below when we run .subquery() on this query (Keystone)
+ # 2. applications which are doing subqueryload with complex
+ # from_self() queries, as query.subquery() / .statement
+ # has to do the full compile context for multiply-nested
+ # from_self() (Neutron) - see test_subqload_from_self
+ # for demo.
+ q2 = query.Query.__new__(query.Query)
+ q2.__dict__.update(q.__dict__)
+ q = q2
+
+ # set the query's "FROM" list explicitly to what the
+ # FROM list would be in any case, as we will be limiting
+ # the columns in the SELECT list which may no longer include
+ # all entities mentioned in things like WHERE, JOIN, etc.
+ if not q._from_obj:
+ q._enable_assertions = False
+ q.select_from.non_generative(
+ q,
+ *{
+ ent["entity"]
+ for ent in _column_descriptions(
+ orig_query, compile_state=orig_compile_state
+ )
+ if ent["entity"] is not None
+ },
+ )
+
+ # select from the identity columns of the outer (specifically, these
+ # are the 'local_cols' of the property). This will remove other
+ # columns from the query that might suggest the right entity which is
+ # why we do set select_from above. The attributes we have are
+ # coerced and adapted using the original query's adapter, which is
+ # needed only for the case of adapting a subclass column to
+ # that of a polymorphic selectable, e.g. we have
+ # Engineer.primary_language and the entity is Person. All other
+ # adaptations, e.g. from_self, select_entity_from(), will occur
+ # within the new query when it compiles, as the compile_state we are
+ # using here is only a partial one. If the subqueryload is from a
+ # with_polymorphic() or other aliased() object, left_attr will already
+ # be the correct attributes so no adaptation is needed.
+ target_cols = orig_compile_state._adapt_col_list(
+ [
+ sql.coercions.expect(sql.roles.ColumnsClauseRole, o)
+ for o in leftmost_attr
+ ],
+ orig_compile_state._get_current_adapter(),
+ )
+ q._raw_columns = target_cols
+
+ distinct_target_key = leftmost_relationship.distinct_target_key
+
+ if distinct_target_key is True:
+ q._distinct = True
+ elif distinct_target_key is None:
+ # if target_cols refer to a non-primary key or only
+ # part of a composite primary key, set the q as distinct
+ for t in {c.table for c in target_cols}:
+ if not set(target_cols).issuperset(t.primary_key):
+ q._distinct = True
+ break
+
+ # don't need ORDER BY if no limit/offset
+ if not q._has_row_limiting_clause:
+ q._order_by_clauses = ()
+
+ if q._distinct is True and q._order_by_clauses:
+ # the logic to automatically add the order by columns to the query
+ # when distinct is True is deprecated in the query
+ to_add = sql_util.expand_column_list_from_order_by(
+ target_cols, q._order_by_clauses
+ )
+ if to_add:
+ q._set_entities(target_cols + to_add)
+
+ # the original query now becomes a subquery
+ # which we'll join onto.
+ # LEGACY: as "q" is a Query, the before_compile() event is invoked
+ # here.
+ embed_q = q.set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL).subquery()
+ left_alias = orm_util.AliasedClass(
+ leftmost_mapper, embed_q, use_mapper_path=True
+ )
+ return left_alias
+
+ def _prep_for_joins(self, left_alias, subq_path):
+ # figure out what's being joined. a.k.a. the fun part
+ to_join = []
+ pairs = list(subq_path.pairs())
+
+ for i, (mapper, prop) in enumerate(pairs):
+ if i > 0:
+ # look at the previous mapper in the chain -
+ # if it is as or more specific than this prop's
+ # mapper, use that instead.
+ # note we have an assumption here that
+ # the non-first element is always going to be a mapper,
+ # not an AliasedClass
+
+ prev_mapper = pairs[i - 1][1].mapper
+ to_append = prev_mapper if prev_mapper.isa(mapper) else mapper
+ else:
+ to_append = mapper
+
+ to_join.append((to_append, prop.key))
+
+ # determine the immediate parent class we are joining from,
+ # which needs to be aliased.
+
+ if len(to_join) < 2:
+ # in the case of a one level eager load, this is the
+ # leftmost "left_alias".
+ parent_alias = left_alias
+ else:
+ info = inspect(to_join[-1][0])
+ if info.is_aliased_class:
+ parent_alias = info.entity
+ else:
+ # alias a plain mapper as we may be
+ # joining multiple times
+ parent_alias = orm_util.AliasedClass(
+ info.entity, use_mapper_path=True
+ )
+
+ local_cols = self.parent_property.local_columns
+
+ local_attr = [
+ getattr(parent_alias, self.parent._columntoproperty[c].key)
+ for c in local_cols
+ ]
+ return to_join, local_attr, parent_alias
+
+ def _apply_joins(
+ self, q, to_join, left_alias, parent_alias, effective_entity
+ ):
+ ltj = len(to_join)
+ if ltj == 1:
+ to_join = [
+ getattr(left_alias, to_join[0][1]).of_type(effective_entity)
+ ]
+ elif ltj == 2:
+ to_join = [
+ getattr(left_alias, to_join[0][1]).of_type(parent_alias),
+ getattr(parent_alias, to_join[-1][1]).of_type(
+ effective_entity
+ ),
+ ]
+ elif ltj > 2:
+ middle = [
+ (
+ (
+ orm_util.AliasedClass(item[0])
+ if not inspect(item[0]).is_aliased_class
+ else item[0].entity
+ ),
+ item[1],
+ )
+ for item in to_join[1:-1]
+ ]
+ inner = []
+
+ while middle:
+ item = middle.pop(0)
+ attr = getattr(item[0], item[1])
+ if middle:
+ attr = attr.of_type(middle[0][0])
+ else:
+ attr = attr.of_type(parent_alias)
+
+ inner.append(attr)
+
+ to_join = (
+ [getattr(left_alias, to_join[0][1]).of_type(inner[0].parent)]
+ + inner
+ + [
+ getattr(parent_alias, to_join[-1][1]).of_type(
+ effective_entity
+ )
+ ]
+ )
+
+ for attr in to_join:
+ q = q.join(attr)
+
+ return q
+
+ def _setup_options(
+ self,
+ context,
+ q,
+ subq_path,
+ rewritten_path,
+ orig_query,
+ effective_entity,
+ loadopt,
+ ):
+ # note that because the subqueryload object
+ # does not re-use the cached query, instead always making
+ # use of the current invoked query, while we have two queries
+ # here (orig and context.query), they are both non-cached
+ # queries and we can transfer the options as is without
+ # adjusting for new criteria. Some work on #6881 / #6889
+ # brought this into question.
+ new_options = orig_query._with_options
+
+ if loadopt and loadopt._extra_criteria:
+ new_options += (
+ orm_util.LoaderCriteriaOption(
+ self.entity,
+ loadopt._generate_extra_criteria(context),
+ ),
+ )
+
+ # propagate loader options etc. to the new query.
+ # these will fire relative to subq_path.
+ q = q._with_current_path(rewritten_path)
+ q = q.options(*new_options)
+
+ return q
+
+ def _setup_outermost_orderby(self, q):
+ if self.parent_property.order_by:
+
+ def _setup_outermost_orderby(compile_context):
+ compile_context.eager_order_by += tuple(
+ util.to_list(self.parent_property.order_by)
+ )
+
+ q = q._add_context_option(
+ _setup_outermost_orderby, self.parent_property
+ )
+
+ return q
+
+ class _SubqCollections:
+ """Given a :class:`_query.Query` used to emit the "subquery load",
+ provide a load interface that executes the query at the
+ first moment a value is needed.
+
+ """
+
+ __slots__ = (
+ "session",
+ "execution_options",
+ "load_options",
+ "params",
+ "subq",
+ "_data",
+ )
+
+ def __init__(self, context, subq):
+ # avoid creating a cycle by storing context
+ # even though that's preferable
+ self.session = context.session
+ self.execution_options = context.execution_options
+ self.load_options = context.load_options
+ self.params = context.params or {}
+ self.subq = subq
+ self._data = None
+
+ def get(self, key, default):
+ if self._data is None:
+ self._load()
+ return self._data.get(key, default)
+
+ def _load(self):
+ self._data = collections.defaultdict(list)
+
+ q = self.subq
+ assert q.session is None
+
+ q = q.with_session(self.session)
+
+ if self.load_options._populate_existing:
+ q = q.populate_existing()
+ # to work with baked query, the parameters may have been
+ # updated since this query was created, so take these into account
+
+ rows = list(q.params(self.params))
+ for k, v in itertools.groupby(rows, lambda x: x[1:]):
+ self._data[k].extend(vv[0] for vv in v)
+
+ def loader(self, state, dict_, row):
+ if self._data is None:
+ self._load()
+
+ def _setup_query_from_rowproc(
+ self,
+ context,
+ query_entity,
+ path,
+ entity,
+ loadopt,
+ adapter,
+ ):
+ compile_state = context.compile_state
+ if (
+ not compile_state.compile_options._enable_eagerloads
+ or compile_state.compile_options._for_refresh_state
+ ):
+ return
+
+ orig_query_entity_index = compile_state._entities.index(query_entity)
+ context.loaders_require_buffering = True
+
+ path = path[self.parent_property]
+
+ # build up a path indicating the path from the leftmost
+ # entity to the thing we're subquery loading.
+ with_poly_entity = path.get(
+ compile_state.attributes, "path_with_polymorphic", None
+ )
+ if with_poly_entity is not None:
+ effective_entity = with_poly_entity
+ else:
+ effective_entity = self.entity
+
+ subq_path, rewritten_path = context.query._execution_options.get(
+ ("subquery_paths", None),
+ (orm_util.PathRegistry.root, orm_util.PathRegistry.root),
+ )
+ is_root = subq_path is orm_util.PathRegistry.root
+ subq_path = subq_path + path
+ rewritten_path = rewritten_path + path
+
+ # use the current query being invoked, not the compile state
+ # one. this is so that we get the current parameters. however,
+ # it means we can't use the existing compile state, we have to make
+ # a new one. other approaches include possibly using the
+ # compiled query but swapping the params, seems only marginally
+ # less time spent but more complicated
+ orig_query = context.query._execution_options.get(
+ ("orig_query", SubqueryLoader), context.query
+ )
+
+ # make a new compile_state for the query that's probably cached, but
+ # we're sort of undoing a bit of that caching :(
+ compile_state_cls = ORMCompileState._get_plugin_class_for_plugin(
+ orig_query, "orm"
+ )
+
+ if orig_query._is_lambda_element:
+ if context.load_options._lazy_loaded_from is None:
+ util.warn(
+ 'subqueryloader for "%s" must invoke lambda callable '
+ "at %r in "
+ "order to produce a new query, decreasing the efficiency "
+ "of caching for this statement. Consider using "
+ "selectinload() for more effective full-lambda caching"
+ % (self, orig_query)
+ )
+ orig_query = orig_query._resolved
+
+ # this is the more "quick" version, however it's not clear how
+ # much of this we need. in particular I can't get a test to
+ # fail if the "set_base_alias" is missing and not sure why that is.
+ orig_compile_state = compile_state_cls._create_entities_collection(
+ orig_query, legacy=False
+ )
+
+ (
+ leftmost_mapper,
+ leftmost_attr,
+ leftmost_relationship,
+ rewritten_path,
+ ) = self._get_leftmost(
+ orig_query_entity_index,
+ rewritten_path,
+ orig_compile_state,
+ is_root,
+ )
+
+ # generate a new Query from the original, then
+ # produce a subquery from it.
+ left_alias = self._generate_from_original_query(
+ orig_compile_state,
+ orig_query,
+ leftmost_mapper,
+ leftmost_attr,
+ leftmost_relationship,
+ entity,
+ )
+
+ # generate another Query that will join the
+ # left alias to the target relationships.
+ # basically doing a longhand
+ # "from_self()". (from_self() itself not quite industrial
+ # strength enough for all contingencies...but very close)
+
+ q = query.Query(effective_entity)
+
+ q._execution_options = context.query._execution_options.merge_with(
+ context.execution_options,
+ {
+ ("orig_query", SubqueryLoader): orig_query,
+ ("subquery_paths", None): (subq_path, rewritten_path),
+ },
+ )
+
+ q = q._set_enable_single_crit(False)
+ to_join, local_attr, parent_alias = self._prep_for_joins(
+ left_alias, subq_path
+ )
+
+ q = q.add_columns(*local_attr)
+ q = self._apply_joins(
+ q, to_join, left_alias, parent_alias, effective_entity
+ )
+
+ q = self._setup_options(
+ context,
+ q,
+ subq_path,
+ rewritten_path,
+ orig_query,
+ effective_entity,
+ loadopt,
+ )
+ q = self._setup_outermost_orderby(q)
+
+ return q
+
+ def create_row_processor(
+ self,
+ context,
+ query_entity,
+ path,
+ loadopt,
+ mapper,
+ result,
+ adapter,
+ populators,
+ ):
+ if context.refresh_state:
+ return self._immediateload_create_row_processor(
+ context,
+ query_entity,
+ path,
+ loadopt,
+ mapper,
+ result,
+ adapter,
+ populators,
+ )
+
+ _, run_loader, _, _ = self._setup_for_recursion(
+ context, path, loadopt, self.join_depth
+ )
+ if not run_loader:
+ return
+
+ if not isinstance(context.compile_state, ORMSelectCompileState):
+ # issue 7505 - subqueryload() in 1.3 and previous would silently
+ # degrade for from_statement() without warning. this behavior
+ # is restored here
+ return
+
+ if not self.parent.class_manager[self.key].impl.supports_population:
+ raise sa_exc.InvalidRequestError(
+ "'%s' does not support object "
+ "population - eager loading cannot be applied." % self
+ )
+
+ # a little dance here as the "path" is still something that only
+ # semi-tracks the exact series of things we are loading, still not
+ # telling us about with_polymorphic() and stuff like that when it's at
+ # the root.. the initial MapperEntity is more accurate for this case.
+ if len(path) == 1:
+ if not orm_util._entity_isa(query_entity.entity_zero, self.parent):
+ return
+ elif not orm_util._entity_isa(path[-1], self.parent):
+ return
+
+ subq = self._setup_query_from_rowproc(
+ context,
+ query_entity,
+ path,
+ path[-1],
+ loadopt,
+ adapter,
+ )
+
+ if subq is None:
+ return
+
+ assert subq.session is None
+
+ path = path[self.parent_property]
+
+ local_cols = self.parent_property.local_columns
+
+ # cache the loaded collections in the context
+ # so that inheriting mappers don't re-load when they
+ # call upon create_row_processor again
+ collections = path.get(context.attributes, "collections")
+ if collections is None:
+ collections = self._SubqCollections(context, subq)
+ path.set(context.attributes, "collections", collections)
+
+ if adapter:
+ local_cols = [adapter.columns[c] for c in local_cols]
+
+ if self.uselist:
+ self._create_collection_loader(
+ context, result, collections, local_cols, populators
+ )
+ else:
+ self._create_scalar_loader(
+ context, result, collections, local_cols, populators
+ )
+
+ def _create_collection_loader(
+ self, context, result, collections, local_cols, populators
+ ):
+ tuple_getter = result._tuple_getter(local_cols)
+
+ def load_collection_from_subq(state, dict_, row):
+ collection = collections.get(tuple_getter(row), ())
+ state.get_impl(self.key).set_committed_value(
+ state, dict_, collection
+ )
+
+ def load_collection_from_subq_existing_row(state, dict_, row):
+ if self.key not in dict_:
+ load_collection_from_subq(state, dict_, row)
+
+ populators["new"].append((self.key, load_collection_from_subq))
+ populators["existing"].append(
+ (self.key, load_collection_from_subq_existing_row)
+ )
+
+ if context.invoke_all_eagers:
+ populators["eager"].append((self.key, collections.loader))
+
+ def _create_scalar_loader(
+ self, context, result, collections, local_cols, populators
+ ):
+ tuple_getter = result._tuple_getter(local_cols)
+
+ def load_scalar_from_subq(state, dict_, row):
+ collection = collections.get(tuple_getter(row), (None,))
+ if len(collection) > 1:
+ util.warn(
+ "Multiple rows returned with "
+ "uselist=False for eagerly-loaded attribute '%s' " % self
+ )
+
+ scalar = collection[0]
+ state.get_impl(self.key).set_committed_value(state, dict_, scalar)
+
+ def load_scalar_from_subq_existing_row(state, dict_, row):
+ if self.key not in dict_:
+ load_scalar_from_subq(state, dict_, row)
+
+ populators["new"].append((self.key, load_scalar_from_subq))
+ populators["existing"].append(
+ (self.key, load_scalar_from_subq_existing_row)
+ )
+ if context.invoke_all_eagers:
+ populators["eager"].append((self.key, collections.loader))
+
+
+@log.class_logger
+@relationships.RelationshipProperty.strategy_for(lazy="joined")
+@relationships.RelationshipProperty.strategy_for(lazy=False)
+class JoinedLoader(AbstractRelationshipLoader):
+ """Provide loading behavior for a :class:`.Relationship`
+ using joined eager loading.
+
+ """
+
+ __slots__ = "join_depth"
+
+ def __init__(self, parent, strategy_key):
+ super().__init__(parent, strategy_key)
+ self.join_depth = self.parent_property.join_depth
+
+ def init_class_attribute(self, mapper):
+ self.parent_property._get_strategy(
+ (("lazy", "select"),)
+ ).init_class_attribute(mapper)
+
+ def setup_query(
+ self,
+ compile_state,
+ query_entity,
+ path,
+ loadopt,
+ adapter,
+ column_collection=None,
+ parentmapper=None,
+ chained_from_outerjoin=False,
+ **kwargs,
+ ):
+ """Add a left outer join to the statement that's being constructed."""
+
+ if not compile_state.compile_options._enable_eagerloads:
+ return
+ elif self.uselist:
+ compile_state.multi_row_eager_loaders = True
+
+ path = path[self.parent_property]
+
+ with_polymorphic = None
+
+ user_defined_adapter = (
+ self._init_user_defined_eager_proc(
+ loadopt, compile_state, compile_state.attributes
+ )
+ if loadopt
+ else False
+ )
+
+ if user_defined_adapter is not False:
+ # setup an adapter but dont create any JOIN, assume it's already
+ # in the query
+ (
+ clauses,
+ adapter,
+ add_to_collection,
+ ) = self._setup_query_on_user_defined_adapter(
+ compile_state,
+ query_entity,
+ path,
+ adapter,
+ user_defined_adapter,
+ )
+
+ # don't do "wrap" for multi-row, we want to wrap
+ # limited/distinct SELECT,
+ # because we want to put the JOIN on the outside.
+
+ else:
+ # if not via query option, check for
+ # a cycle
+ if not path.contains(compile_state.attributes, "loader"):
+ if self.join_depth:
+ if path.length / 2 > self.join_depth:
+ return
+ elif path.contains_mapper(self.mapper):
+ return
+
+ # add the JOIN and create an adapter
+ (
+ clauses,
+ adapter,
+ add_to_collection,
+ chained_from_outerjoin,
+ ) = self._generate_row_adapter(
+ compile_state,
+ query_entity,
+ path,
+ loadopt,
+ adapter,
+ column_collection,
+ parentmapper,
+ chained_from_outerjoin,
+ )
+
+ # for multi-row, we want to wrap limited/distinct SELECT,
+ # because we want to put the JOIN on the outside.
+ compile_state.eager_adding_joins = True
+
+ with_poly_entity = path.get(
+ compile_state.attributes, "path_with_polymorphic", None
+ )
+ if with_poly_entity is not None:
+ with_polymorphic = inspect(
+ with_poly_entity
+ ).with_polymorphic_mappers
+ else:
+ with_polymorphic = None
+
+ path = path[self.entity]
+
+ loading._setup_entity_query(
+ compile_state,
+ self.mapper,
+ query_entity,
+ path,
+ clauses,
+ add_to_collection,
+ with_polymorphic=with_polymorphic,
+ parentmapper=self.mapper,
+ chained_from_outerjoin=chained_from_outerjoin,
+ )
+
+ has_nones = util.NONE_SET.intersection(compile_state.secondary_columns)
+
+ if has_nones:
+ if with_poly_entity is not None:
+ raise sa_exc.InvalidRequestError(
+ "Detected unaliased columns when generating joined "
+ "load. Make sure to use aliased=True or flat=True "
+ "when using joined loading with with_polymorphic()."
+ )
+ else:
+ compile_state.secondary_columns = [
+ c for c in compile_state.secondary_columns if c is not None
+ ]
+
+ def _init_user_defined_eager_proc(
+ self, loadopt, compile_state, target_attributes
+ ):
+ # check if the opt applies at all
+ if "eager_from_alias" not in loadopt.local_opts:
+ # nope
+ return False
+
+ path = loadopt.path.parent
+
+ # the option applies. check if the "user_defined_eager_row_processor"
+ # has been built up.
+ adapter = path.get(
+ compile_state.attributes, "user_defined_eager_row_processor", False
+ )
+ if adapter is not False:
+ # just return it
+ return adapter
+
+ # otherwise figure it out.
+ alias = loadopt.local_opts["eager_from_alias"]
+ root_mapper, prop = path[-2:]
+
+ if alias is not None:
+ if isinstance(alias, str):
+ alias = prop.target.alias(alias)
+ adapter = orm_util.ORMAdapter(
+ orm_util._TraceAdaptRole.JOINEDLOAD_USER_DEFINED_ALIAS,
+ prop.mapper,
+ selectable=alias,
+ equivalents=prop.mapper._equivalent_columns,
+ limit_on_entity=False,
+ )
+ else:
+ if path.contains(
+ compile_state.attributes, "path_with_polymorphic"
+ ):
+ with_poly_entity = path.get(
+ compile_state.attributes, "path_with_polymorphic"
+ )
+ adapter = orm_util.ORMAdapter(
+ orm_util._TraceAdaptRole.JOINEDLOAD_PATH_WITH_POLYMORPHIC,
+ with_poly_entity,
+ equivalents=prop.mapper._equivalent_columns,
+ )
+ else:
+ adapter = compile_state._polymorphic_adapters.get(
+ prop.mapper, None
+ )
+ path.set(
+ target_attributes,
+ "user_defined_eager_row_processor",
+ adapter,
+ )
+
+ return adapter
+
+ def _setup_query_on_user_defined_adapter(
+ self, context, entity, path, adapter, user_defined_adapter
+ ):
+ # apply some more wrapping to the "user defined adapter"
+ # if we are setting up the query for SQL render.
+ adapter = entity._get_entity_clauses(context)
+
+ if adapter and user_defined_adapter:
+ user_defined_adapter = user_defined_adapter.wrap(adapter)
+ path.set(
+ context.attributes,
+ "user_defined_eager_row_processor",
+ user_defined_adapter,
+ )
+ elif adapter:
+ user_defined_adapter = adapter
+ path.set(
+ context.attributes,
+ "user_defined_eager_row_processor",
+ user_defined_adapter,
+ )
+
+ add_to_collection = context.primary_columns
+ return user_defined_adapter, adapter, add_to_collection
+
+ def _generate_row_adapter(
+ self,
+ compile_state,
+ entity,
+ path,
+ loadopt,
+ adapter,
+ column_collection,
+ parentmapper,
+ chained_from_outerjoin,
+ ):
+ with_poly_entity = path.get(
+ compile_state.attributes, "path_with_polymorphic", None
+ )
+ if with_poly_entity:
+ to_adapt = with_poly_entity
+ else:
+ insp = inspect(self.entity)
+ if insp.is_aliased_class:
+ alt_selectable = insp.selectable
+ else:
+ alt_selectable = None
+
+ to_adapt = orm_util.AliasedClass(
+ self.mapper,
+ alias=(
+ alt_selectable._anonymous_fromclause(flat=True)
+ if alt_selectable is not None
+ else None
+ ),
+ flat=True,
+ use_mapper_path=True,
+ )
+
+ to_adapt_insp = inspect(to_adapt)
+
+ clauses = to_adapt_insp._memo(
+ ("joinedloader_ormadapter", self),
+ orm_util.ORMAdapter,
+ orm_util._TraceAdaptRole.JOINEDLOAD_MEMOIZED_ADAPTER,
+ to_adapt_insp,
+ equivalents=self.mapper._equivalent_columns,
+ adapt_required=True,
+ allow_label_resolve=False,
+ anonymize_labels=True,
+ )
+
+ assert clauses.is_aliased_class
+
+ innerjoin = (
+ loadopt.local_opts.get("innerjoin", self.parent_property.innerjoin)
+ if loadopt is not None
+ else self.parent_property.innerjoin
+ )
+
+ if not innerjoin:
+ # if this is an outer join, all non-nested eager joins from
+ # this path must also be outer joins
+ chained_from_outerjoin = True
+
+ compile_state.create_eager_joins.append(
+ (
+ self._create_eager_join,
+ entity,
+ path,
+ adapter,
+ parentmapper,
+ clauses,
+ innerjoin,
+ chained_from_outerjoin,
+ loadopt._extra_criteria if loadopt else (),
+ )
+ )
+
+ add_to_collection = compile_state.secondary_columns
+ path.set(compile_state.attributes, "eager_row_processor", clauses)
+
+ return clauses, adapter, add_to_collection, chained_from_outerjoin
+
+ def _create_eager_join(
+ self,
+ compile_state,
+ query_entity,
+ path,
+ adapter,
+ parentmapper,
+ clauses,
+ innerjoin,
+ chained_from_outerjoin,
+ extra_criteria,
+ ):
+ if parentmapper is None:
+ localparent = query_entity.mapper
+ else:
+ localparent = parentmapper
+
+ # whether or not the Query will wrap the selectable in a subquery,
+ # and then attach eager load joins to that (i.e., in the case of
+ # LIMIT/OFFSET etc.)
+ should_nest_selectable = (
+ compile_state.multi_row_eager_loaders
+ and compile_state._should_nest_selectable
+ )
+
+ query_entity_key = None
+
+ if (
+ query_entity not in compile_state.eager_joins
+ and not should_nest_selectable
+ and compile_state.from_clauses
+ ):
+ indexes = sql_util.find_left_clause_that_matches_given(
+ compile_state.from_clauses, query_entity.selectable
+ )
+
+ if len(indexes) > 1:
+ # for the eager load case, I can't reproduce this right
+ # now. For query.join() I can.
+ raise sa_exc.InvalidRequestError(
+ "Can't identify which query entity in which to joined "
+ "eager load from. Please use an exact match when "
+ "specifying the join path."
+ )
+
+ if indexes:
+ clause = compile_state.from_clauses[indexes[0]]
+ # join to an existing FROM clause on the query.
+ # key it to its list index in the eager_joins dict.
+ # Query._compile_context will adapt as needed and
+ # append to the FROM clause of the select().
+ query_entity_key, default_towrap = indexes[0], clause
+
+ if query_entity_key is None:
+ query_entity_key, default_towrap = (
+ query_entity,
+ query_entity.selectable,
+ )
+
+ towrap = compile_state.eager_joins.setdefault(
+ query_entity_key, default_towrap
+ )
+
+ if adapter:
+ if getattr(adapter, "is_aliased_class", False):
+ # joining from an adapted entity. The adapted entity
+ # might be a "with_polymorphic", so resolve that to our
+ # specific mapper's entity before looking for our attribute
+ # name on it.
+ efm = adapter.aliased_insp._entity_for_mapper(
+ localparent
+ if localparent.isa(self.parent)
+ else self.parent
+ )
+
+ # look for our attribute on the adapted entity, else fall back
+ # to our straight property
+ onclause = getattr(efm.entity, self.key, self.parent_property)
+ else:
+ onclause = getattr(
+ orm_util.AliasedClass(
+ self.parent, adapter.selectable, use_mapper_path=True
+ ),
+ self.key,
+ self.parent_property,
+ )
+
+ else:
+ onclause = self.parent_property
+
+ assert clauses.is_aliased_class
+
+ attach_on_outside = (
+ not chained_from_outerjoin
+ or not innerjoin
+ or innerjoin == "unnested"
+ or query_entity.entity_zero.represents_outer_join
+ )
+
+ extra_join_criteria = extra_criteria
+ additional_entity_criteria = compile_state.global_attributes.get(
+ ("additional_entity_criteria", self.mapper), ()
+ )
+ if additional_entity_criteria:
+ extra_join_criteria += tuple(
+ ae._resolve_where_criteria(self.mapper)
+ for ae in additional_entity_criteria
+ if ae.propagate_to_loaders
+ )
+
+ if attach_on_outside:
+ # this is the "classic" eager join case.
+ eagerjoin = orm_util._ORMJoin(
+ towrap,
+ clauses.aliased_insp,
+ onclause,
+ isouter=not innerjoin
+ or query_entity.entity_zero.represents_outer_join
+ or (chained_from_outerjoin and isinstance(towrap, sql.Join)),
+ _left_memo=self.parent,
+ _right_memo=self.mapper,
+ _extra_criteria=extra_join_criteria,
+ )
+ else:
+ # all other cases are innerjoin=='nested' approach
+ eagerjoin = self._splice_nested_inner_join(
+ path, towrap, clauses, onclause, extra_join_criteria
+ )
+
+ compile_state.eager_joins[query_entity_key] = eagerjoin
+
+ # send a hint to the Query as to where it may "splice" this join
+ eagerjoin.stop_on = query_entity.selectable
+
+ if not parentmapper:
+ # for parentclause that is the non-eager end of the join,
+ # ensure all the parent cols in the primaryjoin are actually
+ # in the
+ # columns clause (i.e. are not deferred), so that aliasing applied
+ # by the Query propagates those columns outward.
+ # This has the effect
+ # of "undefering" those columns.
+ for col in sql_util._find_columns(
+ self.parent_property.primaryjoin
+ ):
+ if localparent.persist_selectable.c.contains_column(col):
+ if adapter:
+ col = adapter.columns[col]
+ compile_state._append_dedupe_col_collection(
+ col, compile_state.primary_columns
+ )
+
+ if self.parent_property.order_by:
+ compile_state.eager_order_by += tuple(
+ (eagerjoin._target_adapter.copy_and_process)(
+ util.to_list(self.parent_property.order_by)
+ )
+ )
+
+ def _splice_nested_inner_join(
+ self, path, join_obj, clauses, onclause, extra_criteria, splicing=False
+ ):
+ # recursive fn to splice a nested join into an existing one.
+ # splicing=False means this is the outermost call, and it
+ # should return a value. splicing=<from object> is the recursive
+ # form, where it can return None to indicate the end of the recursion
+
+ if splicing is False:
+ # first call is always handed a join object
+ # from the outside
+ assert isinstance(join_obj, orm_util._ORMJoin)
+ elif isinstance(join_obj, sql.selectable.FromGrouping):
+ return self._splice_nested_inner_join(
+ path,
+ join_obj.element,
+ clauses,
+ onclause,
+ extra_criteria,
+ splicing,
+ )
+ elif not isinstance(join_obj, orm_util._ORMJoin):
+ if path[-2].isa(splicing):
+ return orm_util._ORMJoin(
+ join_obj,
+ clauses.aliased_insp,
+ onclause,
+ isouter=False,
+ _left_memo=splicing,
+ _right_memo=path[-1].mapper,
+ _extra_criteria=extra_criteria,
+ )
+ else:
+ return None
+
+ target_join = self._splice_nested_inner_join(
+ path,
+ join_obj.right,
+ clauses,
+ onclause,
+ extra_criteria,
+ join_obj._right_memo,
+ )
+ if target_join is None:
+ right_splice = False
+ target_join = self._splice_nested_inner_join(
+ path,
+ join_obj.left,
+ clauses,
+ onclause,
+ extra_criteria,
+ join_obj._left_memo,
+ )
+ if target_join is None:
+ # should only return None when recursively called,
+ # e.g. splicing refers to a from obj
+ assert (
+ splicing is not False
+ ), "assertion failed attempting to produce joined eager loads"
+ return None
+ else:
+ right_splice = True
+
+ if right_splice:
+ # for a right splice, attempt to flatten out
+ # a JOIN b JOIN c JOIN .. to avoid needless
+ # parenthesis nesting
+ if not join_obj.isouter and not target_join.isouter:
+ eagerjoin = join_obj._splice_into_center(target_join)
+ else:
+ eagerjoin = orm_util._ORMJoin(
+ join_obj.left,
+ target_join,
+ join_obj.onclause,
+ isouter=join_obj.isouter,
+ _left_memo=join_obj._left_memo,
+ )
+ else:
+ eagerjoin = orm_util._ORMJoin(
+ target_join,
+ join_obj.right,
+ join_obj.onclause,
+ isouter=join_obj.isouter,
+ _right_memo=join_obj._right_memo,
+ )
+
+ eagerjoin._target_adapter = target_join._target_adapter
+ return eagerjoin
+
+ def _create_eager_adapter(self, context, result, adapter, path, loadopt):
+ compile_state = context.compile_state
+
+ user_defined_adapter = (
+ self._init_user_defined_eager_proc(
+ loadopt, compile_state, context.attributes
+ )
+ if loadopt
+ else False
+ )
+
+ if user_defined_adapter is not False:
+ decorator = user_defined_adapter
+ # user defined eagerloads are part of the "primary"
+ # portion of the load.
+ # the adapters applied to the Query should be honored.
+ if compile_state.compound_eager_adapter and decorator:
+ decorator = decorator.wrap(
+ compile_state.compound_eager_adapter
+ )
+ elif compile_state.compound_eager_adapter:
+ decorator = compile_state.compound_eager_adapter
+ else:
+ decorator = path.get(
+ compile_state.attributes, "eager_row_processor"
+ )
+ if decorator is None:
+ return False
+
+ if self.mapper._result_has_identity_key(result, decorator):
+ return decorator
+ else:
+ # no identity key - don't return a row
+ # processor, will cause a degrade to lazy
+ return False
+
+ def create_row_processor(
+ self,
+ context,
+ query_entity,
+ path,
+ loadopt,
+ mapper,
+ result,
+ adapter,
+ populators,
+ ):
+ if not self.parent.class_manager[self.key].impl.supports_population:
+ raise sa_exc.InvalidRequestError(
+ "'%s' does not support object "
+ "population - eager loading cannot be applied." % self
+ )
+
+ if self.uselist:
+ context.loaders_require_uniquing = True
+
+ our_path = path[self.parent_property]
+
+ eager_adapter = self._create_eager_adapter(
+ context, result, adapter, our_path, loadopt
+ )
+
+ if eager_adapter is not False:
+ key = self.key
+
+ _instance = loading._instance_processor(
+ query_entity,
+ self.mapper,
+ context,
+ result,
+ our_path[self.entity],
+ eager_adapter,
+ )
+
+ if not self.uselist:
+ self._create_scalar_loader(context, key, _instance, populators)
+ else:
+ self._create_collection_loader(
+ context, key, _instance, populators
+ )
+ else:
+ self.parent_property._get_strategy(
+ (("lazy", "select"),)
+ ).create_row_processor(
+ context,
+ query_entity,
+ path,
+ loadopt,
+ mapper,
+ result,
+ adapter,
+ populators,
+ )
+
+ def _create_collection_loader(self, context, key, _instance, populators):
+ def load_collection_from_joined_new_row(state, dict_, row):
+ # note this must unconditionally clear out any existing collection.
+ # an existing collection would be present only in the case of
+ # populate_existing().
+ collection = attributes.init_state_collection(state, dict_, key)
+ result_list = util.UniqueAppender(
+ collection, "append_without_event"
+ )
+ context.attributes[(state, key)] = result_list
+ inst = _instance(row)
+ if inst is not None:
+ result_list.append(inst)
+
+ def load_collection_from_joined_existing_row(state, dict_, row):
+ if (state, key) in context.attributes:
+ result_list = context.attributes[(state, key)]
+ else:
+ # appender_key can be absent from context.attributes
+ # with isnew=False when self-referential eager loading
+ # is used; the same instance may be present in two
+ # distinct sets of result columns
+ collection = attributes.init_state_collection(
+ state, dict_, key
+ )
+ result_list = util.UniqueAppender(
+ collection, "append_without_event"
+ )
+ context.attributes[(state, key)] = result_list
+ inst = _instance(row)
+ if inst is not None:
+ result_list.append(inst)
+
+ def load_collection_from_joined_exec(state, dict_, row):
+ _instance(row)
+
+ populators["new"].append(
+ (self.key, load_collection_from_joined_new_row)
+ )
+ populators["existing"].append(
+ (self.key, load_collection_from_joined_existing_row)
+ )
+ if context.invoke_all_eagers:
+ populators["eager"].append(
+ (self.key, load_collection_from_joined_exec)
+ )
+
+ def _create_scalar_loader(self, context, key, _instance, populators):
+ def load_scalar_from_joined_new_row(state, dict_, row):
+ # set a scalar object instance directly on the parent
+ # object, bypassing InstrumentedAttribute event handlers.
+ dict_[key] = _instance(row)
+
+ def load_scalar_from_joined_existing_row(state, dict_, row):
+ # call _instance on the row, even though the object has
+ # been created, so that we further descend into properties
+ existing = _instance(row)
+
+ # conflicting value already loaded, this shouldn't happen
+ if key in dict_:
+ if existing is not dict_[key]:
+ util.warn(
+ "Multiple rows returned with "
+ "uselist=False for eagerly-loaded attribute '%s' "
+ % self
+ )
+ else:
+ # this case is when one row has multiple loads of the
+ # same entity (e.g. via aliasing), one has an attribute
+ # that the other doesn't.
+ dict_[key] = existing
+
+ def load_scalar_from_joined_exec(state, dict_, row):
+ _instance(row)
+
+ populators["new"].append((self.key, load_scalar_from_joined_new_row))
+ populators["existing"].append(
+ (self.key, load_scalar_from_joined_existing_row)
+ )
+ if context.invoke_all_eagers:
+ populators["eager"].append(
+ (self.key, load_scalar_from_joined_exec)
+ )
+
+
+@log.class_logger
+@relationships.RelationshipProperty.strategy_for(lazy="selectin")
+class SelectInLoader(PostLoader, util.MemoizedSlots):
+ __slots__ = (
+ "join_depth",
+ "omit_join",
+ "_parent_alias",
+ "_query_info",
+ "_fallback_query_info",
+ )
+
+ query_info = collections.namedtuple(
+ "queryinfo",
+ [
+ "load_only_child",
+ "load_with_join",
+ "in_expr",
+ "pk_cols",
+ "zero_idx",
+ "child_lookup_cols",
+ ],
+ )
+
+ _chunksize = 500
+
+ def __init__(self, parent, strategy_key):
+ super().__init__(parent, strategy_key)
+ self.join_depth = self.parent_property.join_depth
+ is_m2o = self.parent_property.direction is interfaces.MANYTOONE
+
+ if self.parent_property.omit_join is not None:
+ self.omit_join = self.parent_property.omit_join
+ else:
+ lazyloader = self.parent_property._get_strategy(
+ (("lazy", "select"),)
+ )
+ if is_m2o:
+ self.omit_join = lazyloader.use_get
+ else:
+ self.omit_join = self.parent._get_clause[0].compare(
+ lazyloader._rev_lazywhere,
+ use_proxies=True,
+ compare_keys=False,
+ equivalents=self.parent._equivalent_columns,
+ )
+
+ if self.omit_join:
+ if is_m2o:
+ self._query_info = self._init_for_omit_join_m2o()
+ self._fallback_query_info = self._init_for_join()
+ else:
+ self._query_info = self._init_for_omit_join()
+ else:
+ self._query_info = self._init_for_join()
+
+ def _init_for_omit_join(self):
+ pk_to_fk = dict(
+ self.parent_property._join_condition.local_remote_pairs
+ )
+ pk_to_fk.update(
+ (equiv, pk_to_fk[k])
+ for k in list(pk_to_fk)
+ for equiv in self.parent._equivalent_columns.get(k, ())
+ )
+
+ pk_cols = fk_cols = [
+ pk_to_fk[col] for col in self.parent.primary_key if col in pk_to_fk
+ ]
+ if len(fk_cols) > 1:
+ in_expr = sql.tuple_(*fk_cols)
+ zero_idx = False
+ else:
+ in_expr = fk_cols[0]
+ zero_idx = True
+
+ return self.query_info(False, False, in_expr, pk_cols, zero_idx, None)
+
+ def _init_for_omit_join_m2o(self):
+ pk_cols = self.mapper.primary_key
+ if len(pk_cols) > 1:
+ in_expr = sql.tuple_(*pk_cols)
+ zero_idx = False
+ else:
+ in_expr = pk_cols[0]
+ zero_idx = True
+
+ lazyloader = self.parent_property._get_strategy((("lazy", "select"),))
+ lookup_cols = [lazyloader._equated_columns[pk] for pk in pk_cols]
+
+ return self.query_info(
+ True, False, in_expr, pk_cols, zero_idx, lookup_cols
+ )
+
+ def _init_for_join(self):
+ self._parent_alias = AliasedClass(self.parent.class_)
+ pa_insp = inspect(self._parent_alias)
+ pk_cols = [
+ pa_insp._adapt_element(col) for col in self.parent.primary_key
+ ]
+ if len(pk_cols) > 1:
+ in_expr = sql.tuple_(*pk_cols)
+ zero_idx = False
+ else:
+ in_expr = pk_cols[0]
+ zero_idx = True
+ return self.query_info(False, True, in_expr, pk_cols, zero_idx, None)
+
+ def init_class_attribute(self, mapper):
+ self.parent_property._get_strategy(
+ (("lazy", "select"),)
+ ).init_class_attribute(mapper)
+
+ def create_row_processor(
+ self,
+ context,
+ query_entity,
+ path,
+ loadopt,
+ mapper,
+ result,
+ adapter,
+ populators,
+ ):
+ if context.refresh_state:
+ return self._immediateload_create_row_processor(
+ context,
+ query_entity,
+ path,
+ loadopt,
+ mapper,
+ result,
+ adapter,
+ populators,
+ )
+
+ (
+ effective_path,
+ run_loader,
+ execution_options,
+ recursion_depth,
+ ) = self._setup_for_recursion(
+ context, path, loadopt, join_depth=self.join_depth
+ )
+
+ if not run_loader:
+ return
+
+ if not self.parent.class_manager[self.key].impl.supports_population:
+ raise sa_exc.InvalidRequestError(
+ "'%s' does not support object "
+ "population - eager loading cannot be applied." % self
+ )
+
+ # a little dance here as the "path" is still something that only
+ # semi-tracks the exact series of things we are loading, still not
+ # telling us about with_polymorphic() and stuff like that when it's at
+ # the root.. the initial MapperEntity is more accurate for this case.
+ if len(path) == 1:
+ if not orm_util._entity_isa(query_entity.entity_zero, self.parent):
+ return
+ elif not orm_util._entity_isa(path[-1], self.parent):
+ return
+
+ selectin_path = effective_path
+
+ path_w_prop = path[self.parent_property]
+
+ # build up a path indicating the path from the leftmost
+ # entity to the thing we're subquery loading.
+ with_poly_entity = path_w_prop.get(
+ context.attributes, "path_with_polymorphic", None
+ )
+ if with_poly_entity is not None:
+ effective_entity = inspect(with_poly_entity)
+ else:
+ effective_entity = self.entity
+
+ loading.PostLoad.callable_for_path(
+ context,
+ selectin_path,
+ self.parent,
+ self.parent_property,
+ self._load_for_path,
+ effective_entity,
+ loadopt,
+ recursion_depth,
+ execution_options,
+ )
+
+ def _load_for_path(
+ self,
+ context,
+ path,
+ states,
+ load_only,
+ effective_entity,
+ loadopt,
+ recursion_depth,
+ execution_options,
+ ):
+ if load_only and self.key not in load_only:
+ return
+
+ query_info = self._query_info
+
+ if query_info.load_only_child:
+ our_states = collections.defaultdict(list)
+ none_states = []
+
+ mapper = self.parent
+
+ for state, overwrite in states:
+ state_dict = state.dict
+ related_ident = tuple(
+ mapper._get_state_attr_by_column(
+ state,
+ state_dict,
+ lk,
+ passive=attributes.PASSIVE_NO_FETCH,
+ )
+ for lk in query_info.child_lookup_cols
+ )
+ # if the loaded parent objects do not have the foreign key
+ # to the related item loaded, then degrade into the joined
+ # version of selectinload
+ if LoaderCallableStatus.PASSIVE_NO_RESULT in related_ident:
+ query_info = self._fallback_query_info
+ break
+
+ # organize states into lists keyed to particular foreign
+ # key values.
+ if None not in related_ident:
+ our_states[related_ident].append(
+ (state, state_dict, overwrite)
+ )
+ else:
+ # For FK values that have None, add them to a
+ # separate collection that will be populated separately
+ none_states.append((state, state_dict, overwrite))
+
+ # note the above conditional may have changed query_info
+ if not query_info.load_only_child:
+ our_states = [
+ (state.key[1], state, state.dict, overwrite)
+ for state, overwrite in states
+ ]
+
+ pk_cols = query_info.pk_cols
+ in_expr = query_info.in_expr
+
+ if not query_info.load_with_join:
+ # in "omit join" mode, the primary key column and the
+ # "in" expression are in terms of the related entity. So
+ # if the related entity is polymorphic or otherwise aliased,
+ # we need to adapt our "pk_cols" and "in_expr" to that
+ # entity. in non-"omit join" mode, these are against the
+ # parent entity and do not need adaption.
+ if effective_entity.is_aliased_class:
+ pk_cols = [
+ effective_entity._adapt_element(col) for col in pk_cols
+ ]
+ in_expr = effective_entity._adapt_element(in_expr)
+
+ bundle_ent = orm_util.Bundle("pk", *pk_cols)
+ bundle_sql = bundle_ent.__clause_element__()
+
+ entity_sql = effective_entity.__clause_element__()
+ q = Select._create_raw_select(
+ _raw_columns=[bundle_sql, entity_sql],
+ _label_style=LABEL_STYLE_TABLENAME_PLUS_COL,
+ _compile_options=ORMCompileState.default_compile_options,
+ _propagate_attrs={
+ "compile_state_plugin": "orm",
+ "plugin_subject": effective_entity,
+ },
+ )
+
+ if not query_info.load_with_join:
+ # the Bundle we have in the "omit_join" case is against raw, non
+ # annotated columns, so to ensure the Query knows its primary
+ # entity, we add it explicitly. If we made the Bundle against
+ # annotated columns, we hit a performance issue in this specific
+ # case, which is detailed in issue #4347.
+ q = q.select_from(effective_entity)
+ else:
+ # in the non-omit_join case, the Bundle is against the annotated/
+ # mapped column of the parent entity, but the #4347 issue does not
+ # occur in this case.
+ q = q.select_from(self._parent_alias).join(
+ getattr(self._parent_alias, self.parent_property.key).of_type(
+ effective_entity
+ )
+ )
+
+ q = q.filter(in_expr.in_(sql.bindparam("primary_keys")))
+
+ # a test which exercises what these comments talk about is
+ # test_selectin_relations.py -> test_twolevel_selectin_w_polymorphic
+ #
+ # effective_entity above is given to us in terms of the cached
+ # statement, namely this one:
+ orig_query = context.compile_state.select_statement
+
+ # the actual statement that was requested is this one:
+ # context_query = context.query
+ #
+ # that's not the cached one, however. So while it is of the identical
+ # structure, if it has entities like AliasedInsp, which we get from
+ # aliased() or with_polymorphic(), the AliasedInsp will likely be a
+ # different object identity each time, and will not match up
+ # hashing-wise to the corresponding AliasedInsp that's in the
+ # cached query, meaning it won't match on paths and loader lookups
+ # and loaders like this one will be skipped if it is used in options.
+ #
+ # as it turns out, standard loader options like selectinload(),
+ # lazyload() that have a path need
+ # to come from the cached query so that the AliasedInsp etc. objects
+ # that are in the query line up with the object that's in the path
+ # of the strategy object. however other options like
+ # with_loader_criteria() that doesn't have a path (has a fixed entity)
+ # and needs to have access to the latest closure state in order to
+ # be correct, we need to use the uncached one.
+ #
+ # as of #8399 we let the loader option itself figure out what it
+ # wants to do given cached and uncached version of itself.
+
+ effective_path = path[self.parent_property]
+
+ if orig_query is context.query:
+ new_options = orig_query._with_options
+ else:
+ cached_options = orig_query._with_options
+ uncached_options = context.query._with_options
+
+ # propagate compile state options from the original query,
+ # updating their "extra_criteria" as necessary.
+ # note this will create a different cache key than
+ # "orig" options if extra_criteria is present, because the copy
+ # of extra_criteria will have different boundparam than that of
+ # the QueryableAttribute in the path
+ new_options = [
+ orig_opt._adapt_cached_option_to_uncached_option(
+ context, uncached_opt
+ )
+ for orig_opt, uncached_opt in zip(
+ cached_options, uncached_options
+ )
+ ]
+
+ if loadopt and loadopt._extra_criteria:
+ new_options += (
+ orm_util.LoaderCriteriaOption(
+ effective_entity,
+ loadopt._generate_extra_criteria(context),
+ ),
+ )
+
+ if recursion_depth is not None:
+ effective_path = effective_path._truncate_recursive()
+
+ q = q.options(*new_options)
+
+ q = q._update_compile_options({"_current_path": effective_path})
+ if context.populate_existing:
+ q = q.execution_options(populate_existing=True)
+
+ if self.parent_property.order_by:
+ if not query_info.load_with_join:
+ eager_order_by = self.parent_property.order_by
+ if effective_entity.is_aliased_class:
+ eager_order_by = [
+ effective_entity._adapt_element(elem)
+ for elem in eager_order_by
+ ]
+ q = q.order_by(*eager_order_by)
+ else:
+
+ def _setup_outermost_orderby(compile_context):
+ compile_context.eager_order_by += tuple(
+ util.to_list(self.parent_property.order_by)
+ )
+
+ q = q._add_context_option(
+ _setup_outermost_orderby, self.parent_property
+ )
+
+ if query_info.load_only_child:
+ self._load_via_child(
+ our_states,
+ none_states,
+ query_info,
+ q,
+ context,
+ execution_options,
+ )
+ else:
+ self._load_via_parent(
+ our_states, query_info, q, context, execution_options
+ )
+
+ def _load_via_child(
+ self,
+ our_states,
+ none_states,
+ query_info,
+ q,
+ context,
+ execution_options,
+ ):
+ uselist = self.uselist
+
+ # this sort is really for the benefit of the unit tests
+ our_keys = sorted(our_states)
+ while our_keys:
+ chunk = our_keys[0 : self._chunksize]
+ our_keys = our_keys[self._chunksize :]
+ data = {
+ k: v
+ for k, v in context.session.execute(
+ q,
+ params={
+ "primary_keys": [
+ key[0] if query_info.zero_idx else key
+ for key in chunk
+ ]
+ },
+ execution_options=execution_options,
+ ).unique()
+ }
+
+ for key in chunk:
+ # for a real foreign key and no concurrent changes to the
+ # DB while running this method, "key" is always present in
+ # data. However, for primaryjoins without real foreign keys
+ # a non-None primaryjoin condition may still refer to no
+ # related object.
+ related_obj = data.get(key, None)
+ for state, dict_, overwrite in our_states[key]:
+ if not overwrite and self.key in dict_:
+ continue
+
+ state.get_impl(self.key).set_committed_value(
+ state,
+ dict_,
+ related_obj if not uselist else [related_obj],
+ )
+ # populate none states with empty value / collection
+ for state, dict_, overwrite in none_states:
+ if not overwrite and self.key in dict_:
+ continue
+
+ # note it's OK if this is a uselist=True attribute, the empty
+ # collection will be populated
+ state.get_impl(self.key).set_committed_value(state, dict_, None)
+
+ def _load_via_parent(
+ self, our_states, query_info, q, context, execution_options
+ ):
+ uselist = self.uselist
+ _empty_result = () if uselist else None
+
+ while our_states:
+ chunk = our_states[0 : self._chunksize]
+ our_states = our_states[self._chunksize :]
+
+ primary_keys = [
+ key[0] if query_info.zero_idx else key
+ for key, state, state_dict, overwrite in chunk
+ ]
+
+ data = collections.defaultdict(list)
+ for k, v in itertools.groupby(
+ context.session.execute(
+ q,
+ params={"primary_keys": primary_keys},
+ execution_options=execution_options,
+ ).unique(),
+ lambda x: x[0],
+ ):
+ data[k].extend(vv[1] for vv in v)
+
+ for key, state, state_dict, overwrite in chunk:
+ if not overwrite and self.key in state_dict:
+ continue
+
+ collection = data.get(key, _empty_result)
+
+ if not uselist and collection:
+ if len(collection) > 1:
+ util.warn(
+ "Multiple rows returned with "
+ "uselist=False for eagerly-loaded "
+ "attribute '%s' " % self
+ )
+ state.get_impl(self.key).set_committed_value(
+ state, state_dict, collection[0]
+ )
+ else:
+ # note that empty tuple set on uselist=False sets the
+ # value to None
+ state.get_impl(self.key).set_committed_value(
+ state, state_dict, collection
+ )
+
+
+def single_parent_validator(desc, prop):
+ def _do_check(state, value, oldvalue, initiator):
+ if value is not None and initiator.key == prop.key:
+ hasparent = initiator.hasparent(attributes.instance_state(value))
+ if hasparent and oldvalue is not value:
+ raise sa_exc.InvalidRequestError(
+ "Instance %s is already associated with an instance "
+ "of %s via its %s attribute, and is only allowed a "
+ "single parent."
+ % (orm_util.instance_str(value), state.class_, prop),
+ code="bbf1",
+ )
+ return value
+
+ def append(state, value, initiator):
+ return _do_check(state, value, None, initiator)
+
+ def set_(state, value, oldvalue, initiator):
+ return _do_check(state, value, oldvalue, initiator)
+
+ event.listen(
+ desc, "append", append, raw=True, retval=True, active_history=True
+ )
+ event.listen(desc, "set", set_, raw=True, retval=True, active_history=True)