summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/orm/context.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/orm/context.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/orm/context.py3243
1 files changed, 0 insertions, 3243 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/orm/context.py b/venv/lib/python3.11/site-packages/sqlalchemy/orm/context.py
deleted file mode 100644
index 3056016..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/orm/context.py
+++ /dev/null
@@ -1,3243 +0,0 @@
-# orm/context.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-from __future__ import annotations
-
-import itertools
-from typing import Any
-from typing import cast
-from typing import Dict
-from typing import Iterable
-from typing import List
-from typing import Optional
-from typing import Set
-from typing import Tuple
-from typing import Type
-from typing import TYPE_CHECKING
-from typing import TypeVar
-from typing import Union
-
-from . import attributes
-from . import interfaces
-from . import loading
-from .base import _is_aliased_class
-from .interfaces import ORMColumnDescription
-from .interfaces import ORMColumnsClauseRole
-from .path_registry import PathRegistry
-from .util import _entity_corresponds_to
-from .util import _ORMJoin
-from .util import _TraceAdaptRole
-from .util import AliasedClass
-from .util import Bundle
-from .util import ORMAdapter
-from .util import ORMStatementAdapter
-from .. import exc as sa_exc
-from .. import future
-from .. import inspect
-from .. import sql
-from .. import util
-from ..sql import coercions
-from ..sql import expression
-from ..sql import roles
-from ..sql import util as sql_util
-from ..sql import visitors
-from ..sql._typing import _TP
-from ..sql._typing import is_dml
-from ..sql._typing import is_insert_update
-from ..sql._typing import is_select_base
-from ..sql.base import _select_iterables
-from ..sql.base import CacheableOptions
-from ..sql.base import CompileState
-from ..sql.base import Executable
-from ..sql.base import Generative
-from ..sql.base import Options
-from ..sql.dml import UpdateBase
-from ..sql.elements import GroupedElement
-from ..sql.elements import TextClause
-from ..sql.selectable import CompoundSelectState
-from ..sql.selectable import LABEL_STYLE_DISAMBIGUATE_ONLY
-from ..sql.selectable import LABEL_STYLE_NONE
-from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
-from ..sql.selectable import Select
-from ..sql.selectable import SelectLabelStyle
-from ..sql.selectable import SelectState
-from ..sql.selectable import TypedReturnsRows
-from ..sql.visitors import InternalTraversal
-
-if TYPE_CHECKING:
- from ._typing import _InternalEntityType
- from ._typing import OrmExecuteOptionsParameter
- from .loading import PostLoad
- from .mapper import Mapper
- from .query import Query
- from .session import _BindArguments
- from .session import Session
- from ..engine import Result
- from ..engine.interfaces import _CoreSingleExecuteParams
- from ..sql._typing import _ColumnsClauseArgument
- from ..sql.compiler import SQLCompiler
- from ..sql.dml import _DMLTableElement
- from ..sql.elements import ColumnElement
- from ..sql.selectable import _JoinTargetElement
- from ..sql.selectable import _LabelConventionCallable
- from ..sql.selectable import _SetupJoinsElement
- from ..sql.selectable import ExecutableReturnsRows
- from ..sql.selectable import SelectBase
- from ..sql.type_api import TypeEngine
-
-_T = TypeVar("_T", bound=Any)
-_path_registry = PathRegistry.root
-
-_EMPTY_DICT = util.immutabledict()
-
-
-LABEL_STYLE_LEGACY_ORM = SelectLabelStyle.LABEL_STYLE_LEGACY_ORM
-
-
-class QueryContext:
- __slots__ = (
- "top_level_context",
- "compile_state",
- "query",
- "params",
- "load_options",
- "bind_arguments",
- "execution_options",
- "session",
- "autoflush",
- "populate_existing",
- "invoke_all_eagers",
- "version_check",
- "refresh_state",
- "create_eager_joins",
- "propagated_loader_options",
- "attributes",
- "runid",
- "partials",
- "post_load_paths",
- "identity_token",
- "yield_per",
- "loaders_require_buffering",
- "loaders_require_uniquing",
- )
-
- runid: int
- post_load_paths: Dict[PathRegistry, PostLoad]
- compile_state: ORMCompileState
-
- class default_load_options(Options):
- _only_return_tuples = False
- _populate_existing = False
- _version_check = False
- _invoke_all_eagers = True
- _autoflush = True
- _identity_token = None
- _yield_per = None
- _refresh_state = None
- _lazy_loaded_from = None
- _legacy_uniquing = False
- _sa_top_level_orm_context = None
- _is_user_refresh = False
-
- def __init__(
- self,
- compile_state: CompileState,
- statement: Union[Select[Any], FromStatement[Any]],
- params: _CoreSingleExecuteParams,
- session: Session,
- load_options: Union[
- Type[QueryContext.default_load_options],
- QueryContext.default_load_options,
- ],
- execution_options: Optional[OrmExecuteOptionsParameter] = None,
- bind_arguments: Optional[_BindArguments] = None,
- ):
- self.load_options = load_options
- self.execution_options = execution_options or _EMPTY_DICT
- self.bind_arguments = bind_arguments or _EMPTY_DICT
- self.compile_state = compile_state
- self.query = statement
- self.session = session
- self.loaders_require_buffering = False
- self.loaders_require_uniquing = False
- self.params = params
- self.top_level_context = load_options._sa_top_level_orm_context
-
- cached_options = compile_state.select_statement._with_options
- uncached_options = statement._with_options
-
- # see issue #7447 , #8399 for some background
- # propagated loader options will be present on loaded InstanceState
- # objects under state.load_options and are typically used by
- # LazyLoader to apply options to the SELECT statement it emits.
- # For compile state options (i.e. loader strategy options), these
- # need to line up with the ".load_path" attribute which in
- # loader.py is pulled from context.compile_state.current_path.
- # so, this means these options have to be the ones from the
- # *cached* statement that's travelling with compile_state, not the
- # *current* statement which won't match up for an ad-hoc
- # AliasedClass
- self.propagated_loader_options = tuple(
- opt._adapt_cached_option_to_uncached_option(self, uncached_opt)
- for opt, uncached_opt in zip(cached_options, uncached_options)
- if opt.propagate_to_loaders
- )
-
- self.attributes = dict(compile_state.attributes)
-
- self.autoflush = load_options._autoflush
- self.populate_existing = load_options._populate_existing
- self.invoke_all_eagers = load_options._invoke_all_eagers
- self.version_check = load_options._version_check
- self.refresh_state = load_options._refresh_state
- self.yield_per = load_options._yield_per
- self.identity_token = load_options._identity_token
-
- def _get_top_level_context(self) -> QueryContext:
- return self.top_level_context or self
-
-
-_orm_load_exec_options = util.immutabledict(
- {"_result_disable_adapt_to_context": True}
-)
-
-
-class AbstractORMCompileState(CompileState):
- is_dml_returning = False
-
- def _init_global_attributes(
- self, statement, compiler, *, toplevel, process_criteria_for_toplevel
- ):
- self.attributes = {}
-
- if compiler is None:
- # this is the legacy / testing only ORM _compile_state() use case.
- # there is no need to apply criteria options for this.
- self.global_attributes = ga = {}
- assert toplevel
- return
- else:
- self.global_attributes = ga = compiler._global_attributes
-
- if toplevel:
- ga["toplevel_orm"] = True
-
- if process_criteria_for_toplevel:
- for opt in statement._with_options:
- if opt._is_criteria_option:
- opt.process_compile_state(self)
-
- return
- elif ga.get("toplevel_orm", False):
- return
-
- stack_0 = compiler.stack[0]
-
- try:
- toplevel_stmt = stack_0["selectable"]
- except KeyError:
- pass
- else:
- for opt in toplevel_stmt._with_options:
- if opt._is_compile_state and opt._is_criteria_option:
- opt.process_compile_state(self)
-
- ga["toplevel_orm"] = True
-
- @classmethod
- def create_for_statement(
- cls,
- statement: Union[Select, FromStatement],
- compiler: Optional[SQLCompiler],
- **kw: Any,
- ) -> AbstractORMCompileState:
- """Create a context for a statement given a :class:`.Compiler`.
-
- This method is always invoked in the context of SQLCompiler.process().
-
- For a Select object, this would be invoked from
- SQLCompiler.visit_select(). For the special FromStatement object used
- by Query to indicate "Query.from_statement()", this is called by
- FromStatement._compiler_dispatch() that would be called by
- SQLCompiler.process().
- """
- return super().create_for_statement(statement, compiler, **kw)
-
- @classmethod
- def orm_pre_session_exec(
- cls,
- session,
- statement,
- params,
- execution_options,
- bind_arguments,
- is_pre_event,
- ):
- raise NotImplementedError()
-
- @classmethod
- def orm_execute_statement(
- cls,
- session,
- statement,
- params,
- execution_options,
- bind_arguments,
- conn,
- ) -> Result:
- result = conn.execute(
- statement, params or {}, execution_options=execution_options
- )
- return cls.orm_setup_cursor_result(
- session,
- statement,
- params,
- execution_options,
- bind_arguments,
- result,
- )
-
- @classmethod
- def orm_setup_cursor_result(
- cls,
- session,
- statement,
- params,
- execution_options,
- bind_arguments,
- result,
- ):
- raise NotImplementedError()
-
-
-class AutoflushOnlyORMCompileState(AbstractORMCompileState):
- """ORM compile state that is a passthrough, except for autoflush."""
-
- @classmethod
- def orm_pre_session_exec(
- cls,
- session,
- statement,
- params,
- execution_options,
- bind_arguments,
- is_pre_event,
- ):
- # consume result-level load_options. These may have been set up
- # in an ORMExecuteState hook
- (
- load_options,
- execution_options,
- ) = QueryContext.default_load_options.from_execution_options(
- "_sa_orm_load_options",
- {
- "autoflush",
- },
- execution_options,
- statement._execution_options,
- )
-
- if not is_pre_event and load_options._autoflush:
- session._autoflush()
-
- return statement, execution_options
-
- @classmethod
- def orm_setup_cursor_result(
- cls,
- session,
- statement,
- params,
- execution_options,
- bind_arguments,
- result,
- ):
- return result
-
-
-class ORMCompileState(AbstractORMCompileState):
- class default_compile_options(CacheableOptions):
- _cache_key_traversal = [
- ("_use_legacy_query_style", InternalTraversal.dp_boolean),
- ("_for_statement", InternalTraversal.dp_boolean),
- ("_bake_ok", InternalTraversal.dp_boolean),
- ("_current_path", InternalTraversal.dp_has_cache_key),
- ("_enable_single_crit", InternalTraversal.dp_boolean),
- ("_enable_eagerloads", InternalTraversal.dp_boolean),
- ("_only_load_props", InternalTraversal.dp_plain_obj),
- ("_set_base_alias", InternalTraversal.dp_boolean),
- ("_for_refresh_state", InternalTraversal.dp_boolean),
- ("_render_for_subquery", InternalTraversal.dp_boolean),
- ("_is_star", InternalTraversal.dp_boolean),
- ]
-
- # set to True by default from Query._statement_20(), to indicate
- # the rendered query should look like a legacy ORM query. right
- # now this basically indicates we should use tablename_columnname
- # style labels. Generally indicates the statement originated
- # from a Query object.
- _use_legacy_query_style = False
-
- # set *only* when we are coming from the Query.statement
- # accessor, or a Query-level equivalent such as
- # query.subquery(). this supersedes "toplevel".
- _for_statement = False
-
- _bake_ok = True
- _current_path = _path_registry
- _enable_single_crit = True
- _enable_eagerloads = True
- _only_load_props = None
- _set_base_alias = False
- _for_refresh_state = False
- _render_for_subquery = False
- _is_star = False
-
- attributes: Dict[Any, Any]
- global_attributes: Dict[Any, Any]
-
- statement: Union[Select[Any], FromStatement[Any]]
- select_statement: Union[Select[Any], FromStatement[Any]]
- _entities: List[_QueryEntity]
- _polymorphic_adapters: Dict[_InternalEntityType, ORMAdapter]
- compile_options: Union[
- Type[default_compile_options], default_compile_options
- ]
- _primary_entity: Optional[_QueryEntity]
- use_legacy_query_style: bool
- _label_convention: _LabelConventionCallable
- primary_columns: List[ColumnElement[Any]]
- secondary_columns: List[ColumnElement[Any]]
- dedupe_columns: Set[ColumnElement[Any]]
- create_eager_joins: List[
- # TODO: this structure is set up by JoinedLoader
- Tuple[Any, ...]
- ]
- current_path: PathRegistry = _path_registry
- _has_mapper_entities = False
-
- def __init__(self, *arg, **kw):
- raise NotImplementedError()
-
- if TYPE_CHECKING:
-
- @classmethod
- def create_for_statement(
- cls,
- statement: Union[Select, FromStatement],
- compiler: Optional[SQLCompiler],
- **kw: Any,
- ) -> ORMCompileState: ...
-
- def _append_dedupe_col_collection(self, obj, col_collection):
- dedupe = self.dedupe_columns
- if obj not in dedupe:
- dedupe.add(obj)
- col_collection.append(obj)
-
- @classmethod
- def _column_naming_convention(
- cls, label_style: SelectLabelStyle, legacy: bool
- ) -> _LabelConventionCallable:
- if legacy:
-
- def name(col, col_name=None):
- if col_name:
- return col_name
- else:
- return getattr(col, "key")
-
- return name
- else:
- return SelectState._column_naming_convention(label_style)
-
- @classmethod
- def get_column_descriptions(cls, statement):
- return _column_descriptions(statement)
-
- @classmethod
- def orm_pre_session_exec(
- cls,
- session,
- statement,
- params,
- execution_options,
- bind_arguments,
- is_pre_event,
- ):
- # consume result-level load_options. These may have been set up
- # in an ORMExecuteState hook
- (
- load_options,
- execution_options,
- ) = QueryContext.default_load_options.from_execution_options(
- "_sa_orm_load_options",
- {
- "populate_existing",
- "autoflush",
- "yield_per",
- "identity_token",
- "sa_top_level_orm_context",
- },
- execution_options,
- statement._execution_options,
- )
-
- # default execution options for ORM results:
- # 1. _result_disable_adapt_to_context=True
- # this will disable the ResultSetMetadata._adapt_to_context()
- # step which we don't need, as we have result processors cached
- # against the original SELECT statement before caching.
-
- if "sa_top_level_orm_context" in execution_options:
- ctx = execution_options["sa_top_level_orm_context"]
- execution_options = ctx.query._execution_options.merge_with(
- ctx.execution_options, execution_options
- )
-
- if not execution_options:
- execution_options = _orm_load_exec_options
- else:
- execution_options = execution_options.union(_orm_load_exec_options)
-
- # would have been placed here by legacy Query only
- if load_options._yield_per:
- execution_options = execution_options.union(
- {"yield_per": load_options._yield_per}
- )
-
- if (
- getattr(statement._compile_options, "_current_path", None)
- and len(statement._compile_options._current_path) > 10
- and execution_options.get("compiled_cache", True) is not None
- ):
- execution_options: util.immutabledict[str, Any] = (
- execution_options.union(
- {
- "compiled_cache": None,
- "_cache_disable_reason": "excess depth for "
- "ORM loader options",
- }
- )
- )
-
- bind_arguments["clause"] = statement
-
- # new in 1.4 - the coercions system is leveraged to allow the
- # "subject" mapper of a statement be propagated to the top
- # as the statement is built. "subject" mapper is the generally
- # standard object used as an identifier for multi-database schemes.
-
- # we are here based on the fact that _propagate_attrs contains
- # "compile_state_plugin": "orm". The "plugin_subject"
- # needs to be present as well.
-
- try:
- plugin_subject = statement._propagate_attrs["plugin_subject"]
- except KeyError:
- assert False, "statement had 'orm' plugin but no plugin_subject"
- else:
- if plugin_subject:
- bind_arguments["mapper"] = plugin_subject.mapper
-
- if not is_pre_event and load_options._autoflush:
- session._autoflush()
-
- return statement, execution_options
-
- @classmethod
- def orm_setup_cursor_result(
- cls,
- session,
- statement,
- params,
- execution_options,
- bind_arguments,
- result,
- ):
- execution_context = result.context
- compile_state = execution_context.compiled.compile_state
-
- # cover edge case where ORM entities used in legacy select
- # were passed to session.execute:
- # session.execute(legacy_select([User.id, User.name]))
- # see test_query->test_legacy_tuple_old_select
-
- load_options = execution_options.get(
- "_sa_orm_load_options", QueryContext.default_load_options
- )
-
- if compile_state.compile_options._is_star:
- return result
-
- querycontext = QueryContext(
- compile_state,
- statement,
- params,
- session,
- load_options,
- execution_options,
- bind_arguments,
- )
- return loading.instances(result, querycontext)
-
- @property
- def _lead_mapper_entities(self):
- """return all _MapperEntity objects in the lead entities collection.
-
- Does **not** include entities that have been replaced by
- with_entities(), with_only_columns()
-
- """
- return [
- ent for ent in self._entities if isinstance(ent, _MapperEntity)
- ]
-
- def _create_with_polymorphic_adapter(self, ext_info, selectable):
- """given MapperEntity or ORMColumnEntity, setup polymorphic loading
- if called for by the Mapper.
-
- As of #8168 in 2.0.0rc1, polymorphic adapters, which greatly increase
- the complexity of the query creation process, are not used at all
- except in the quasi-legacy cases of with_polymorphic referring to an
- alias and/or subquery. This would apply to concrete polymorphic
- loading, and joined inheritance where a subquery is
- passed to with_polymorphic (which is completely unnecessary in modern
- use).
-
- """
- if (
- not ext_info.is_aliased_class
- and ext_info.mapper.persist_selectable
- not in self._polymorphic_adapters
- ):
- for mp in ext_info.mapper.iterate_to_root():
- self._mapper_loads_polymorphically_with(
- mp,
- ORMAdapter(
- _TraceAdaptRole.WITH_POLYMORPHIC_ADAPTER,
- mp,
- equivalents=mp._equivalent_columns,
- selectable=selectable,
- ),
- )
-
- def _mapper_loads_polymorphically_with(self, mapper, adapter):
- for m2 in mapper._with_polymorphic_mappers or [mapper]:
- self._polymorphic_adapters[m2] = adapter
-
- for m in m2.iterate_to_root():
- self._polymorphic_adapters[m.local_table] = adapter
-
- @classmethod
- def _create_entities_collection(cls, query, legacy):
- raise NotImplementedError(
- "this method only works for ORMSelectCompileState"
- )
-
-
-class DMLReturningColFilter:
- """an adapter used for the DML RETURNING case.
-
- Has a subset of the interface used by
- :class:`.ORMAdapter` and is used for :class:`._QueryEntity`
- instances to set up their columns as used in RETURNING for a
- DML statement.
-
- """
-
- __slots__ = ("mapper", "columns", "__weakref__")
-
- def __init__(self, target_mapper, immediate_dml_mapper):
- if (
- immediate_dml_mapper is not None
- and target_mapper.local_table
- is not immediate_dml_mapper.local_table
- ):
- # joined inh, or in theory other kinds of multi-table mappings
- self.mapper = immediate_dml_mapper
- else:
- # single inh, normal mappings, etc.
- self.mapper = target_mapper
- self.columns = self.columns = util.WeakPopulateDict(
- self.adapt_check_present # type: ignore
- )
-
- def __call__(self, col, as_filter):
- for cc in sql_util._find_columns(col):
- c2 = self.adapt_check_present(cc)
- if c2 is not None:
- return col
- else:
- return None
-
- def adapt_check_present(self, col):
- mapper = self.mapper
- prop = mapper._columntoproperty.get(col, None)
- if prop is None:
- return None
- return mapper.local_table.c.corresponding_column(col)
-
-
-@sql.base.CompileState.plugin_for("orm", "orm_from_statement")
-class ORMFromStatementCompileState(ORMCompileState):
- _from_obj_alias = None
- _has_mapper_entities = False
-
- statement_container: FromStatement
- requested_statement: Union[SelectBase, TextClause, UpdateBase]
- dml_table: Optional[_DMLTableElement] = None
-
- _has_orm_entities = False
- multi_row_eager_loaders = False
- eager_adding_joins = False
- compound_eager_adapter = None
-
- extra_criteria_entities = _EMPTY_DICT
- eager_joins = _EMPTY_DICT
-
- @classmethod
- def create_for_statement(
- cls,
- statement_container: Union[Select, FromStatement],
- compiler: Optional[SQLCompiler],
- **kw: Any,
- ) -> ORMFromStatementCompileState:
- assert isinstance(statement_container, FromStatement)
-
- if compiler is not None and compiler.stack:
- raise sa_exc.CompileError(
- "The ORM FromStatement construct only supports being "
- "invoked as the topmost statement, as it is only intended to "
- "define how result rows should be returned."
- )
-
- self = cls.__new__(cls)
- self._primary_entity = None
-
- self.use_legacy_query_style = (
- statement_container._compile_options._use_legacy_query_style
- )
- self.statement_container = self.select_statement = statement_container
- self.requested_statement = statement = statement_container.element
-
- if statement.is_dml:
- self.dml_table = statement.table
- self.is_dml_returning = True
-
- self._entities = []
- self._polymorphic_adapters = {}
-
- self.compile_options = statement_container._compile_options
-
- if (
- self.use_legacy_query_style
- and isinstance(statement, expression.SelectBase)
- and not statement._is_textual
- and not statement.is_dml
- and statement._label_style is LABEL_STYLE_NONE
- ):
- self.statement = statement.set_label_style(
- LABEL_STYLE_TABLENAME_PLUS_COL
- )
- else:
- self.statement = statement
-
- self._label_convention = self._column_naming_convention(
- (
- statement._label_style
- if not statement._is_textual and not statement.is_dml
- else LABEL_STYLE_NONE
- ),
- self.use_legacy_query_style,
- )
-
- _QueryEntity.to_compile_state(
- self,
- statement_container._raw_columns,
- self._entities,
- is_current_entities=True,
- )
-
- self.current_path = statement_container._compile_options._current_path
-
- self._init_global_attributes(
- statement_container,
- compiler,
- process_criteria_for_toplevel=False,
- toplevel=True,
- )
-
- if statement_container._with_options:
- for opt in statement_container._with_options:
- if opt._is_compile_state:
- opt.process_compile_state(self)
-
- if statement_container._with_context_options:
- for fn, key in statement_container._with_context_options:
- fn(self)
-
- self.primary_columns = []
- self.secondary_columns = []
- self.dedupe_columns = set()
- self.create_eager_joins = []
- self._fallback_from_clauses = []
-
- self.order_by = None
-
- if isinstance(self.statement, expression.TextClause):
- # TextClause has no "column" objects at all. for this case,
- # we generate columns from our _QueryEntity objects, then
- # flip on all the "please match no matter what" parameters.
- self.extra_criteria_entities = {}
-
- for entity in self._entities:
- entity.setup_compile_state(self)
-
- compiler._ordered_columns = compiler._textual_ordered_columns = (
- False
- )
-
- # enable looser result column matching. this is shown to be
- # needed by test_query.py::TextTest
- compiler._loose_column_name_matching = True
-
- for c in self.primary_columns:
- compiler.process(
- c,
- within_columns_clause=True,
- add_to_result_map=compiler._add_to_result_map,
- )
- else:
- # for everyone else, Select, Insert, Update, TextualSelect, they
- # have column objects already. After much
- # experimentation here, the best approach seems to be, use
- # those columns completely, don't interfere with the compiler
- # at all; just in ORM land, use an adapter to convert from
- # our ORM columns to whatever columns are in the statement,
- # before we look in the result row. Adapt on names
- # to accept cases such as issue #9217, however also allow
- # this to be overridden for cases such as #9273.
- self._from_obj_alias = ORMStatementAdapter(
- _TraceAdaptRole.ADAPT_FROM_STATEMENT,
- self.statement,
- adapt_on_names=statement_container._adapt_on_names,
- )
-
- return self
-
- def _adapt_col_list(self, cols, current_adapter):
- return cols
-
- def _get_current_adapter(self):
- return None
-
- def setup_dml_returning_compile_state(self, dml_mapper):
- """used by BulkORMInsert (and Update / Delete?) to set up a handler
- for RETURNING to return ORM objects and expressions
-
- """
- target_mapper = self.statement._propagate_attrs.get(
- "plugin_subject", None
- )
- adapter = DMLReturningColFilter(target_mapper, dml_mapper)
-
- if self.compile_options._is_star and (len(self._entities) != 1):
- raise sa_exc.CompileError(
- "Can't generate ORM query that includes multiple expressions "
- "at the same time as '*'; query for '*' alone if present"
- )
-
- for entity in self._entities:
- entity.setup_dml_returning_compile_state(self, adapter)
-
-
-class FromStatement(GroupedElement, Generative, TypedReturnsRows[_TP]):
- """Core construct that represents a load of ORM objects from various
- :class:`.ReturnsRows` and other classes including:
-
- :class:`.Select`, :class:`.TextClause`, :class:`.TextualSelect`,
- :class:`.CompoundSelect`, :class`.Insert`, :class:`.Update`,
- and in theory, :class:`.Delete`.
-
- """
-
- __visit_name__ = "orm_from_statement"
-
- _compile_options = ORMFromStatementCompileState.default_compile_options
-
- _compile_state_factory = ORMFromStatementCompileState.create_for_statement
-
- _for_update_arg = None
-
- element: Union[ExecutableReturnsRows, TextClause]
-
- _adapt_on_names: bool
-
- _traverse_internals = [
- ("_raw_columns", InternalTraversal.dp_clauseelement_list),
- ("element", InternalTraversal.dp_clauseelement),
- ] + Executable._executable_traverse_internals
-
- _cache_key_traversal = _traverse_internals + [
- ("_compile_options", InternalTraversal.dp_has_cache_key)
- ]
-
- def __init__(
- self,
- entities: Iterable[_ColumnsClauseArgument[Any]],
- element: Union[ExecutableReturnsRows, TextClause],
- _adapt_on_names: bool = True,
- ):
- self._raw_columns = [
- coercions.expect(
- roles.ColumnsClauseRole,
- ent,
- apply_propagate_attrs=self,
- post_inspect=True,
- )
- for ent in util.to_list(entities)
- ]
- self.element = element
- self.is_dml = element.is_dml
- self._label_style = (
- element._label_style if is_select_base(element) else None
- )
- self._adapt_on_names = _adapt_on_names
-
- def _compiler_dispatch(self, compiler, **kw):
- """provide a fixed _compiler_dispatch method.
-
- This is roughly similar to using the sqlalchemy.ext.compiler
- ``@compiles`` extension.
-
- """
-
- compile_state = self._compile_state_factory(self, compiler, **kw)
-
- toplevel = not compiler.stack
-
- if toplevel:
- compiler.compile_state = compile_state
-
- return compiler.process(compile_state.statement, **kw)
-
- @property
- def column_descriptions(self):
- """Return a :term:`plugin-enabled` 'column descriptions' structure
- referring to the columns which are SELECTed by this statement.
-
- See the section :ref:`queryguide_inspection` for an overview
- of this feature.
-
- .. seealso::
-
- :ref:`queryguide_inspection` - ORM background
-
- """
- meth = cast(
- ORMSelectCompileState, SelectState.get_plugin_class(self)
- ).get_column_descriptions
- return meth(self)
-
- def _ensure_disambiguated_names(self):
- return self
-
- def get_children(self, **kw):
- yield from itertools.chain.from_iterable(
- element._from_objects for element in self._raw_columns
- )
- yield from super().get_children(**kw)
-
- @property
- def _all_selected_columns(self):
- return self.element._all_selected_columns
-
- @property
- def _return_defaults(self):
- return self.element._return_defaults if is_dml(self.element) else None
-
- @property
- def _returning(self):
- return self.element._returning if is_dml(self.element) else None
-
- @property
- def _inline(self):
- return self.element._inline if is_insert_update(self.element) else None
-
-
-@sql.base.CompileState.plugin_for("orm", "compound_select")
-class CompoundSelectCompileState(
- AutoflushOnlyORMCompileState, CompoundSelectState
-):
- pass
-
-
-@sql.base.CompileState.plugin_for("orm", "select")
-class ORMSelectCompileState(ORMCompileState, SelectState):
- _already_joined_edges = ()
-
- _memoized_entities = _EMPTY_DICT
-
- _from_obj_alias = None
- _has_mapper_entities = False
-
- _has_orm_entities = False
- multi_row_eager_loaders = False
- eager_adding_joins = False
- compound_eager_adapter = None
-
- correlate = None
- correlate_except = None
- _where_criteria = ()
- _having_criteria = ()
-
- @classmethod
- def create_for_statement(
- cls,
- statement: Union[Select, FromStatement],
- compiler: Optional[SQLCompiler],
- **kw: Any,
- ) -> ORMSelectCompileState:
- """compiler hook, we arrive here from compiler.visit_select() only."""
-
- self = cls.__new__(cls)
-
- if compiler is not None:
- toplevel = not compiler.stack
- else:
- toplevel = True
-
- select_statement = statement
-
- # if we are a select() that was never a legacy Query, we won't
- # have ORM level compile options.
- statement._compile_options = cls.default_compile_options.safe_merge(
- statement._compile_options
- )
-
- if select_statement._execution_options:
- # execution options should not impact the compilation of a
- # query, and at the moment subqueryloader is putting some things
- # in here that we explicitly don't want stuck in a cache.
- self.select_statement = select_statement._clone()
- self.select_statement._execution_options = util.immutabledict()
- else:
- self.select_statement = select_statement
-
- # indicates this select() came from Query.statement
- self.for_statement = select_statement._compile_options._for_statement
-
- # generally if we are from Query or directly from a select()
- self.use_legacy_query_style = (
- select_statement._compile_options._use_legacy_query_style
- )
-
- self._entities = []
- self._primary_entity = None
- self._polymorphic_adapters = {}
-
- self.compile_options = select_statement._compile_options
-
- if not toplevel:
- # for subqueries, turn off eagerloads and set
- # "render_for_subquery".
- self.compile_options += {
- "_enable_eagerloads": False,
- "_render_for_subquery": True,
- }
-
- # determine label style. we can make different decisions here.
- # at the moment, trying to see if we can always use DISAMBIGUATE_ONLY
- # rather than LABEL_STYLE_NONE, and if we can use disambiguate style
- # for new style ORM selects too.
- if (
- self.use_legacy_query_style
- and self.select_statement._label_style is LABEL_STYLE_LEGACY_ORM
- ):
- if not self.for_statement:
- self.label_style = LABEL_STYLE_TABLENAME_PLUS_COL
- else:
- self.label_style = LABEL_STYLE_DISAMBIGUATE_ONLY
- else:
- self.label_style = self.select_statement._label_style
-
- if select_statement._memoized_select_entities:
- self._memoized_entities = {
- memoized_entities: _QueryEntity.to_compile_state(
- self,
- memoized_entities._raw_columns,
- [],
- is_current_entities=False,
- )
- for memoized_entities in (
- select_statement._memoized_select_entities
- )
- }
-
- # label_convention is stateful and will yield deduping keys if it
- # sees the same key twice. therefore it's important that it is not
- # invoked for the above "memoized" entities that aren't actually
- # in the columns clause
- self._label_convention = self._column_naming_convention(
- statement._label_style, self.use_legacy_query_style
- )
-
- _QueryEntity.to_compile_state(
- self,
- select_statement._raw_columns,
- self._entities,
- is_current_entities=True,
- )
-
- self.current_path = select_statement._compile_options._current_path
-
- self.eager_order_by = ()
-
- self._init_global_attributes(
- select_statement,
- compiler,
- toplevel=toplevel,
- process_criteria_for_toplevel=False,
- )
-
- if toplevel and (
- select_statement._with_options
- or select_statement._memoized_select_entities
- ):
- for (
- memoized_entities
- ) in select_statement._memoized_select_entities:
- for opt in memoized_entities._with_options:
- if opt._is_compile_state:
- opt.process_compile_state_replaced_entities(
- self,
- [
- ent
- for ent in self._memoized_entities[
- memoized_entities
- ]
- if isinstance(ent, _MapperEntity)
- ],
- )
-
- for opt in self.select_statement._with_options:
- if opt._is_compile_state:
- opt.process_compile_state(self)
-
- # uncomment to print out the context.attributes structure
- # after it's been set up above
- # self._dump_option_struct()
-
- if select_statement._with_context_options:
- for fn, key in select_statement._with_context_options:
- fn(self)
-
- self.primary_columns = []
- self.secondary_columns = []
- self.dedupe_columns = set()
- self.eager_joins = {}
- self.extra_criteria_entities = {}
- self.create_eager_joins = []
- self._fallback_from_clauses = []
-
- # normalize the FROM clauses early by themselves, as this makes
- # it an easier job when we need to assemble a JOIN onto these,
- # for select.join() as well as joinedload(). As of 1.4 there are now
- # potentially more complex sets of FROM objects here as the use
- # of lambda statements for lazyload, load_on_pk etc. uses more
- # cloning of the select() construct. See #6495
- self.from_clauses = self._normalize_froms(
- info.selectable for info in select_statement._from_obj
- )
-
- # this is a fairly arbitrary break into a second method,
- # so it might be nicer to break up create_for_statement()
- # and _setup_for_generate into three or four logical sections
- self._setup_for_generate()
-
- SelectState.__init__(self, self.statement, compiler, **kw)
- return self
-
- def _dump_option_struct(self):
- print("\n---------------------------------------------------\n")
- print(f"current path: {self.current_path}")
- for key in self.attributes:
- if isinstance(key, tuple) and key[0] == "loader":
- print(f"\nLoader: {PathRegistry.coerce(key[1])}")
- print(f" {self.attributes[key]}")
- print(f" {self.attributes[key].__dict__}")
- elif isinstance(key, tuple) and key[0] == "path_with_polymorphic":
- print(f"\nWith Polymorphic: {PathRegistry.coerce(key[1])}")
- print(f" {self.attributes[key]}")
-
- def _setup_for_generate(self):
- query = self.select_statement
-
- self.statement = None
- self._join_entities = ()
-
- if self.compile_options._set_base_alias:
- # legacy Query only
- self._set_select_from_alias()
-
- for memoized_entities in query._memoized_select_entities:
- if memoized_entities._setup_joins:
- self._join(
- memoized_entities._setup_joins,
- self._memoized_entities[memoized_entities],
- )
-
- if query._setup_joins:
- self._join(query._setup_joins, self._entities)
-
- current_adapter = self._get_current_adapter()
-
- if query._where_criteria:
- self._where_criteria = query._where_criteria
-
- if current_adapter:
- self._where_criteria = tuple(
- current_adapter(crit, True)
- for crit in self._where_criteria
- )
-
- # TODO: some complexity with order_by here was due to mapper.order_by.
- # now that this is removed we can hopefully make order_by /
- # group_by act identically to how they are in Core select.
- self.order_by = (
- self._adapt_col_list(query._order_by_clauses, current_adapter)
- if current_adapter and query._order_by_clauses not in (None, False)
- else query._order_by_clauses
- )
-
- if query._having_criteria:
- self._having_criteria = tuple(
- current_adapter(crit, True) if current_adapter else crit
- for crit in query._having_criteria
- )
-
- self.group_by = (
- self._adapt_col_list(
- util.flatten_iterator(query._group_by_clauses), current_adapter
- )
- if current_adapter and query._group_by_clauses not in (None, False)
- else query._group_by_clauses or None
- )
-
- if self.eager_order_by:
- adapter = self.from_clauses[0]._target_adapter
- self.eager_order_by = adapter.copy_and_process(self.eager_order_by)
-
- if query._distinct_on:
- self.distinct_on = self._adapt_col_list(
- query._distinct_on, current_adapter
- )
- else:
- self.distinct_on = ()
-
- self.distinct = query._distinct
-
- if query._correlate:
- # ORM mapped entities that are mapped to joins can be passed
- # to .correlate, so here they are broken into their component
- # tables.
- self.correlate = tuple(
- util.flatten_iterator(
- sql_util.surface_selectables(s) if s is not None else None
- for s in query._correlate
- )
- )
- elif query._correlate_except is not None:
- self.correlate_except = tuple(
- util.flatten_iterator(
- sql_util.surface_selectables(s) if s is not None else None
- for s in query._correlate_except
- )
- )
- elif not query._auto_correlate:
- self.correlate = (None,)
-
- # PART II
-
- self._for_update_arg = query._for_update_arg
-
- if self.compile_options._is_star and (len(self._entities) != 1):
- raise sa_exc.CompileError(
- "Can't generate ORM query that includes multiple expressions "
- "at the same time as '*'; query for '*' alone if present"
- )
- for entity in self._entities:
- entity.setup_compile_state(self)
-
- for rec in self.create_eager_joins:
- strategy = rec[0]
- strategy(self, *rec[1:])
-
- # else "load from discrete FROMs" mode,
- # i.e. when each _MappedEntity has its own FROM
-
- if self.compile_options._enable_single_crit:
- self._adjust_for_extra_criteria()
-
- if not self.primary_columns:
- if self.compile_options._only_load_props:
- assert False, "no columns were included in _only_load_props"
-
- raise sa_exc.InvalidRequestError(
- "Query contains no columns with which to SELECT from."
- )
-
- if not self.from_clauses:
- self.from_clauses = list(self._fallback_from_clauses)
-
- if self.order_by is False:
- self.order_by = None
-
- if (
- self.multi_row_eager_loaders
- and self.eager_adding_joins
- and self._should_nest_selectable
- ):
- self.statement = self._compound_eager_statement()
- else:
- self.statement = self._simple_statement()
-
- if self.for_statement:
- ezero = self._mapper_zero()
- if ezero is not None:
- # TODO: this goes away once we get rid of the deep entity
- # thing
- self.statement = self.statement._annotate(
- {"deepentity": ezero}
- )
-
- @classmethod
- def _create_entities_collection(cls, query, legacy):
- """Creates a partial ORMSelectCompileState that includes
- the full collection of _MapperEntity and other _QueryEntity objects.
-
- Supports a few remaining use cases that are pre-compilation
- but still need to gather some of the column / adaption information.
-
- """
- self = cls.__new__(cls)
-
- self._entities = []
- self._primary_entity = None
- self._polymorphic_adapters = {}
-
- self._label_convention = self._column_naming_convention(
- query._label_style, legacy
- )
-
- # entities will also set up polymorphic adapters for mappers
- # that have with_polymorphic configured
- _QueryEntity.to_compile_state(
- self, query._raw_columns, self._entities, is_current_entities=True
- )
- return self
-
- @classmethod
- def determine_last_joined_entity(cls, statement):
- setup_joins = statement._setup_joins
-
- return _determine_last_joined_entity(setup_joins, None)
-
- @classmethod
- def all_selected_columns(cls, statement):
- for element in statement._raw_columns:
- if (
- element.is_selectable
- and "entity_namespace" in element._annotations
- ):
- ens = element._annotations["entity_namespace"]
- if not ens.is_mapper and not ens.is_aliased_class:
- yield from _select_iterables([element])
- else:
- yield from _select_iterables(ens._all_column_expressions)
- else:
- yield from _select_iterables([element])
-
- @classmethod
- def get_columns_clause_froms(cls, statement):
- return cls._normalize_froms(
- itertools.chain.from_iterable(
- (
- element._from_objects
- if "parententity" not in element._annotations
- else [
- element._annotations[
- "parententity"
- ].__clause_element__()
- ]
- )
- for element in statement._raw_columns
- )
- )
-
- @classmethod
- def from_statement(cls, statement, from_statement):
- from_statement = coercions.expect(
- roles.ReturnsRowsRole,
- from_statement,
- apply_propagate_attrs=statement,
- )
-
- stmt = FromStatement(statement._raw_columns, from_statement)
-
- stmt.__dict__.update(
- _with_options=statement._with_options,
- _with_context_options=statement._with_context_options,
- _execution_options=statement._execution_options,
- _propagate_attrs=statement._propagate_attrs,
- )
- return stmt
-
- def _set_select_from_alias(self):
- """used only for legacy Query cases"""
-
- query = self.select_statement # query
-
- assert self.compile_options._set_base_alias
- assert len(query._from_obj) == 1
-
- adapter = self._get_select_from_alias_from_obj(query._from_obj[0])
- if adapter:
- self.compile_options += {"_enable_single_crit": False}
- self._from_obj_alias = adapter
-
- def _get_select_from_alias_from_obj(self, from_obj):
- """used only for legacy Query cases"""
-
- info = from_obj
-
- if "parententity" in info._annotations:
- info = info._annotations["parententity"]
-
- if hasattr(info, "mapper"):
- if not info.is_aliased_class:
- raise sa_exc.ArgumentError(
- "A selectable (FromClause) instance is "
- "expected when the base alias is being set."
- )
- else:
- return info._adapter
-
- elif isinstance(info.selectable, sql.selectable.AliasedReturnsRows):
- equivs = self._all_equivs()
- assert info is info.selectable
- return ORMStatementAdapter(
- _TraceAdaptRole.LEGACY_SELECT_FROM_ALIAS,
- info.selectable,
- equivalents=equivs,
- )
- else:
- return None
-
- def _mapper_zero(self):
- """return the Mapper associated with the first QueryEntity."""
- return self._entities[0].mapper
-
- def _entity_zero(self):
- """Return the 'entity' (mapper or AliasedClass) associated
- with the first QueryEntity, or alternatively the 'select from'
- entity if specified."""
-
- for ent in self.from_clauses:
- if "parententity" in ent._annotations:
- return ent._annotations["parententity"]
- for qent in self._entities:
- if qent.entity_zero:
- return qent.entity_zero
-
- return None
-
- def _only_full_mapper_zero(self, methname):
- if self._entities != [self._primary_entity]:
- raise sa_exc.InvalidRequestError(
- "%s() can only be used against "
- "a single mapped class." % methname
- )
- return self._primary_entity.entity_zero
-
- def _only_entity_zero(self, rationale=None):
- if len(self._entities) > 1:
- raise sa_exc.InvalidRequestError(
- rationale
- or "This operation requires a Query "
- "against a single mapper."
- )
- return self._entity_zero()
-
- def _all_equivs(self):
- equivs = {}
-
- for memoized_entities in self._memoized_entities.values():
- for ent in [
- ent
- for ent in memoized_entities
- if isinstance(ent, _MapperEntity)
- ]:
- equivs.update(ent.mapper._equivalent_columns)
-
- for ent in [
- ent for ent in self._entities if isinstance(ent, _MapperEntity)
- ]:
- equivs.update(ent.mapper._equivalent_columns)
- return equivs
-
- def _compound_eager_statement(self):
- # for eager joins present and LIMIT/OFFSET/DISTINCT,
- # wrap the query inside a select,
- # then append eager joins onto that
-
- if self.order_by:
- # the default coercion for ORDER BY is now the OrderByRole,
- # which adds an additional post coercion to ByOfRole in that
- # elements are converted into label references. For the
- # eager load / subquery wrapping case, we need to un-coerce
- # the original expressions outside of the label references
- # in order to have them render.
- unwrapped_order_by = [
- (
- elem.element
- if isinstance(elem, sql.elements._label_reference)
- else elem
- )
- for elem in self.order_by
- ]
-
- order_by_col_expr = sql_util.expand_column_list_from_order_by(
- self.primary_columns, unwrapped_order_by
- )
- else:
- order_by_col_expr = []
- unwrapped_order_by = None
-
- # put FOR UPDATE on the inner query, where MySQL will honor it,
- # as well as if it has an OF so PostgreSQL can use it.
- inner = self._select_statement(
- self.primary_columns
- + [c for c in order_by_col_expr if c not in self.dedupe_columns],
- self.from_clauses,
- self._where_criteria,
- self._having_criteria,
- self.label_style,
- self.order_by,
- for_update=self._for_update_arg,
- hints=self.select_statement._hints,
- statement_hints=self.select_statement._statement_hints,
- correlate=self.correlate,
- correlate_except=self.correlate_except,
- **self._select_args,
- )
-
- inner = inner.alias()
-
- equivs = self._all_equivs()
-
- self.compound_eager_adapter = ORMStatementAdapter(
- _TraceAdaptRole.COMPOUND_EAGER_STATEMENT, inner, equivalents=equivs
- )
-
- statement = future.select(
- *([inner] + self.secondary_columns) # use_labels=self.labels
- )
- statement._label_style = self.label_style
-
- # Oracle however does not allow FOR UPDATE on the subquery,
- # and the Oracle dialect ignores it, plus for PostgreSQL, MySQL
- # we expect that all elements of the row are locked, so also put it
- # on the outside (except in the case of PG when OF is used)
- if (
- self._for_update_arg is not None
- and self._for_update_arg.of is None
- ):
- statement._for_update_arg = self._for_update_arg
-
- from_clause = inner
- for eager_join in self.eager_joins.values():
- # EagerLoader places a 'stop_on' attribute on the join,
- # giving us a marker as to where the "splice point" of
- # the join should be
- from_clause = sql_util.splice_joins(
- from_clause, eager_join, eager_join.stop_on
- )
-
- statement.select_from.non_generative(statement, from_clause)
-
- if unwrapped_order_by:
- statement.order_by.non_generative(
- statement,
- *self.compound_eager_adapter.copy_and_process(
- unwrapped_order_by
- ),
- )
-
- statement.order_by.non_generative(statement, *self.eager_order_by)
- return statement
-
- def _simple_statement(self):
- statement = self._select_statement(
- self.primary_columns + self.secondary_columns,
- tuple(self.from_clauses) + tuple(self.eager_joins.values()),
- self._where_criteria,
- self._having_criteria,
- self.label_style,
- self.order_by,
- for_update=self._for_update_arg,
- hints=self.select_statement._hints,
- statement_hints=self.select_statement._statement_hints,
- correlate=self.correlate,
- correlate_except=self.correlate_except,
- **self._select_args,
- )
-
- if self.eager_order_by:
- statement.order_by.non_generative(statement, *self.eager_order_by)
- return statement
-
- def _select_statement(
- self,
- raw_columns,
- from_obj,
- where_criteria,
- having_criteria,
- label_style,
- order_by,
- for_update,
- hints,
- statement_hints,
- correlate,
- correlate_except,
- limit_clause,
- offset_clause,
- fetch_clause,
- fetch_clause_options,
- distinct,
- distinct_on,
- prefixes,
- suffixes,
- group_by,
- independent_ctes,
- independent_ctes_opts,
- ):
- statement = Select._create_raw_select(
- _raw_columns=raw_columns,
- _from_obj=from_obj,
- _label_style=label_style,
- )
-
- if where_criteria:
- statement._where_criteria = where_criteria
- if having_criteria:
- statement._having_criteria = having_criteria
-
- if order_by:
- statement._order_by_clauses += tuple(order_by)
-
- if distinct_on:
- statement.distinct.non_generative(statement, *distinct_on)
- elif distinct:
- statement.distinct.non_generative(statement)
-
- if group_by:
- statement._group_by_clauses += tuple(group_by)
-
- statement._limit_clause = limit_clause
- statement._offset_clause = offset_clause
- statement._fetch_clause = fetch_clause
- statement._fetch_clause_options = fetch_clause_options
- statement._independent_ctes = independent_ctes
- statement._independent_ctes_opts = independent_ctes_opts
-
- if prefixes:
- statement._prefixes = prefixes
-
- if suffixes:
- statement._suffixes = suffixes
-
- statement._for_update_arg = for_update
-
- if hints:
- statement._hints = hints
- if statement_hints:
- statement._statement_hints = statement_hints
-
- if correlate:
- statement.correlate.non_generative(statement, *correlate)
-
- if correlate_except is not None:
- statement.correlate_except.non_generative(
- statement, *correlate_except
- )
-
- return statement
-
- def _adapt_polymorphic_element(self, element):
- if "parententity" in element._annotations:
- search = element._annotations["parententity"]
- alias = self._polymorphic_adapters.get(search, None)
- if alias:
- return alias.adapt_clause(element)
-
- if isinstance(element, expression.FromClause):
- search = element
- elif hasattr(element, "table"):
- search = element.table
- else:
- return None
-
- alias = self._polymorphic_adapters.get(search, None)
- if alias:
- return alias.adapt_clause(element)
-
- def _adapt_col_list(self, cols, current_adapter):
- if current_adapter:
- return [current_adapter(o, True) for o in cols]
- else:
- return cols
-
- def _get_current_adapter(self):
- adapters = []
-
- if self._from_obj_alias:
- # used for legacy going forward for query set_ops, e.g.
- # union(), union_all(), etc.
- # 1.4 and previously, also used for from_self(),
- # select_entity_from()
- #
- # for the "from obj" alias, apply extra rule to the
- # 'ORM only' check, if this query were generated from a
- # subquery of itself, i.e. _from_selectable(), apply adaption
- # to all SQL constructs.
- adapters.append(
- (
- True,
- self._from_obj_alias.replace,
- )
- )
-
- # this was *hopefully* the only adapter we were going to need
- # going forward...however, we unfortunately need _from_obj_alias
- # for query.union(), which we can't drop
- if self._polymorphic_adapters:
- adapters.append((False, self._adapt_polymorphic_element))
-
- if not adapters:
- return None
-
- def _adapt_clause(clause, as_filter):
- # do we adapt all expression elements or only those
- # tagged as 'ORM' constructs ?
-
- def replace(elem):
- is_orm_adapt = (
- "_orm_adapt" in elem._annotations
- or "parententity" in elem._annotations
- )
- for always_adapt, adapter in adapters:
- if is_orm_adapt or always_adapt:
- e = adapter(elem)
- if e is not None:
- return e
-
- return visitors.replacement_traverse(clause, {}, replace)
-
- return _adapt_clause
-
- def _join(self, args, entities_collection):
- for right, onclause, from_, flags in args:
- isouter = flags["isouter"]
- full = flags["full"]
-
- right = inspect(right)
- if onclause is not None:
- onclause = inspect(onclause)
-
- if isinstance(right, interfaces.PropComparator):
- if onclause is not None:
- raise sa_exc.InvalidRequestError(
- "No 'on clause' argument may be passed when joining "
- "to a relationship path as a target"
- )
-
- onclause = right
- right = None
- elif "parententity" in right._annotations:
- right = right._annotations["parententity"]
-
- if onclause is None:
- if not right.is_selectable and not hasattr(right, "mapper"):
- raise sa_exc.ArgumentError(
- "Expected mapped entity or "
- "selectable/table as join target"
- )
-
- of_type = None
-
- if isinstance(onclause, interfaces.PropComparator):
- # descriptor/property given (or determined); this tells us
- # explicitly what the expected "left" side of the join is.
-
- of_type = getattr(onclause, "_of_type", None)
-
- if right is None:
- if of_type:
- right = of_type
- else:
- right = onclause.property
-
- try:
- right = right.entity
- except AttributeError as err:
- raise sa_exc.ArgumentError(
- "Join target %s does not refer to a "
- "mapped entity" % right
- ) from err
-
- left = onclause._parententity
-
- prop = onclause.property
- if not isinstance(onclause, attributes.QueryableAttribute):
- onclause = prop
-
- # check for this path already present. don't render in that
- # case.
- if (left, right, prop.key) in self._already_joined_edges:
- continue
-
- if from_ is not None:
- if (
- from_ is not left
- and from_._annotations.get("parententity", None)
- is not left
- ):
- raise sa_exc.InvalidRequestError(
- "explicit from clause %s does not match left side "
- "of relationship attribute %s"
- % (
- from_._annotations.get("parententity", from_),
- onclause,
- )
- )
- elif from_ is not None:
- prop = None
- left = from_
- else:
- # no descriptor/property given; we will need to figure out
- # what the effective "left" side is
- prop = left = None
-
- # figure out the final "left" and "right" sides and create an
- # ORMJoin to add to our _from_obj tuple
- self._join_left_to_right(
- entities_collection,
- left,
- right,
- onclause,
- prop,
- isouter,
- full,
- )
-
- def _join_left_to_right(
- self,
- entities_collection,
- left,
- right,
- onclause,
- prop,
- outerjoin,
- full,
- ):
- """given raw "left", "right", "onclause" parameters consumed from
- a particular key within _join(), add a real ORMJoin object to
- our _from_obj list (or augment an existing one)
-
- """
-
- if left is None:
- # left not given (e.g. no relationship object/name specified)
- # figure out the best "left" side based on our existing froms /
- # entities
- assert prop is None
- (
- left,
- replace_from_obj_index,
- use_entity_index,
- ) = self._join_determine_implicit_left_side(
- entities_collection, left, right, onclause
- )
- else:
- # left is given via a relationship/name, or as explicit left side.
- # Determine where in our
- # "froms" list it should be spliced/appended as well as what
- # existing entity it corresponds to.
- (
- replace_from_obj_index,
- use_entity_index,
- ) = self._join_place_explicit_left_side(entities_collection, left)
-
- if left is right:
- raise sa_exc.InvalidRequestError(
- "Can't construct a join from %s to %s, they "
- "are the same entity" % (left, right)
- )
-
- # the right side as given often needs to be adapted. additionally
- # a lot of things can be wrong with it. handle all that and
- # get back the new effective "right" side
- r_info, right, onclause = self._join_check_and_adapt_right_side(
- left, right, onclause, prop
- )
-
- if not r_info.is_selectable:
- extra_criteria = self._get_extra_criteria(r_info)
- else:
- extra_criteria = ()
-
- if replace_from_obj_index is not None:
- # splice into an existing element in the
- # self._from_obj list
- left_clause = self.from_clauses[replace_from_obj_index]
-
- self.from_clauses = (
- self.from_clauses[:replace_from_obj_index]
- + [
- _ORMJoin(
- left_clause,
- right,
- onclause,
- isouter=outerjoin,
- full=full,
- _extra_criteria=extra_criteria,
- )
- ]
- + self.from_clauses[replace_from_obj_index + 1 :]
- )
- else:
- # add a new element to the self._from_obj list
- if use_entity_index is not None:
- # make use of _MapperEntity selectable, which is usually
- # entity_zero.selectable, but if with_polymorphic() were used
- # might be distinct
- assert isinstance(
- entities_collection[use_entity_index], _MapperEntity
- )
- left_clause = entities_collection[use_entity_index].selectable
- else:
- left_clause = left
-
- self.from_clauses = self.from_clauses + [
- _ORMJoin(
- left_clause,
- r_info,
- onclause,
- isouter=outerjoin,
- full=full,
- _extra_criteria=extra_criteria,
- )
- ]
-
- def _join_determine_implicit_left_side(
- self, entities_collection, left, right, onclause
- ):
- """When join conditions don't express the left side explicitly,
- determine if an existing FROM or entity in this query
- can serve as the left hand side.
-
- """
-
- # when we are here, it means join() was called without an ORM-
- # specific way of telling us what the "left" side is, e.g.:
- #
- # join(RightEntity)
- #
- # or
- #
- # join(RightEntity, RightEntity.foo == LeftEntity.bar)
- #
-
- r_info = inspect(right)
-
- replace_from_obj_index = use_entity_index = None
-
- if self.from_clauses:
- # we have a list of FROMs already. So by definition this
- # join has to connect to one of those FROMs.
-
- indexes = sql_util.find_left_clause_to_join_from(
- self.from_clauses, r_info.selectable, onclause
- )
-
- if len(indexes) == 1:
- replace_from_obj_index = indexes[0]
- left = self.from_clauses[replace_from_obj_index]
- elif len(indexes) > 1:
- raise sa_exc.InvalidRequestError(
- "Can't determine which FROM clause to join "
- "from, there are multiple FROMS which can "
- "join to this entity. Please use the .select_from() "
- "method to establish an explicit left side, as well as "
- "providing an explicit ON clause if not present already "
- "to help resolve the ambiguity."
- )
- else:
- raise sa_exc.InvalidRequestError(
- "Don't know how to join to %r. "
- "Please use the .select_from() "
- "method to establish an explicit left side, as well as "
- "providing an explicit ON clause if not present already "
- "to help resolve the ambiguity." % (right,)
- )
-
- elif entities_collection:
- # we have no explicit FROMs, so the implicit left has to
- # come from our list of entities.
-
- potential = {}
- for entity_index, ent in enumerate(entities_collection):
- entity = ent.entity_zero_or_selectable
- if entity is None:
- continue
- ent_info = inspect(entity)
- if ent_info is r_info: # left and right are the same, skip
- continue
-
- # by using a dictionary with the selectables as keys this
- # de-duplicates those selectables as occurs when the query is
- # against a series of columns from the same selectable
- if isinstance(ent, _MapperEntity):
- potential[ent.selectable] = (entity_index, entity)
- else:
- potential[ent_info.selectable] = (None, entity)
-
- all_clauses = list(potential.keys())
- indexes = sql_util.find_left_clause_to_join_from(
- all_clauses, r_info.selectable, onclause
- )
-
- if len(indexes) == 1:
- use_entity_index, left = potential[all_clauses[indexes[0]]]
- elif len(indexes) > 1:
- raise sa_exc.InvalidRequestError(
- "Can't determine which FROM clause to join "
- "from, there are multiple FROMS which can "
- "join to this entity. Please use the .select_from() "
- "method to establish an explicit left side, as well as "
- "providing an explicit ON clause if not present already "
- "to help resolve the ambiguity."
- )
- else:
- raise sa_exc.InvalidRequestError(
- "Don't know how to join to %r. "
- "Please use the .select_from() "
- "method to establish an explicit left side, as well as "
- "providing an explicit ON clause if not present already "
- "to help resolve the ambiguity." % (right,)
- )
- else:
- raise sa_exc.InvalidRequestError(
- "No entities to join from; please use "
- "select_from() to establish the left "
- "entity/selectable of this join"
- )
-
- return left, replace_from_obj_index, use_entity_index
-
- def _join_place_explicit_left_side(self, entities_collection, left):
- """When join conditions express a left side explicitly, determine
- where in our existing list of FROM clauses we should join towards,
- or if we need to make a new join, and if so is it from one of our
- existing entities.
-
- """
-
- # when we are here, it means join() was called with an indicator
- # as to an exact left side, which means a path to a
- # Relationship was given, e.g.:
- #
- # join(RightEntity, LeftEntity.right)
- #
- # or
- #
- # join(LeftEntity.right)
- #
- # as well as string forms:
- #
- # join(RightEntity, "right")
- #
- # etc.
- #
-
- replace_from_obj_index = use_entity_index = None
-
- l_info = inspect(left)
- if self.from_clauses:
- indexes = sql_util.find_left_clause_that_matches_given(
- self.from_clauses, l_info.selectable
- )
-
- if len(indexes) > 1:
- raise sa_exc.InvalidRequestError(
- "Can't identify which entity in which to assign the "
- "left side of this join. Please use a more specific "
- "ON clause."
- )
-
- # have an index, means the left side is already present in
- # an existing FROM in the self._from_obj tuple
- if indexes:
- replace_from_obj_index = indexes[0]
-
- # no index, means we need to add a new element to the
- # self._from_obj tuple
-
- # no from element present, so we will have to add to the
- # self._from_obj tuple. Determine if this left side matches up
- # with existing mapper entities, in which case we want to apply the
- # aliasing / adaptation rules present on that entity if any
- if (
- replace_from_obj_index is None
- and entities_collection
- and hasattr(l_info, "mapper")
- ):
- for idx, ent in enumerate(entities_collection):
- # TODO: should we be checking for multiple mapper entities
- # matching?
- if isinstance(ent, _MapperEntity) and ent.corresponds_to(left):
- use_entity_index = idx
- break
-
- return replace_from_obj_index, use_entity_index
-
- def _join_check_and_adapt_right_side(self, left, right, onclause, prop):
- """transform the "right" side of the join as well as the onclause
- according to polymorphic mapping translations, aliasing on the query
- or on the join, special cases where the right and left side have
- overlapping tables.
-
- """
-
- l_info = inspect(left)
- r_info = inspect(right)
-
- overlap = False
-
- right_mapper = getattr(r_info, "mapper", None)
- # if the target is a joined inheritance mapping,
- # be more liberal about auto-aliasing.
- if right_mapper and (
- right_mapper.with_polymorphic
- or isinstance(right_mapper.persist_selectable, expression.Join)
- ):
- for from_obj in self.from_clauses or [l_info.selectable]:
- if sql_util.selectables_overlap(
- l_info.selectable, from_obj
- ) and sql_util.selectables_overlap(
- from_obj, r_info.selectable
- ):
- overlap = True
- break
-
- if overlap and l_info.selectable is r_info.selectable:
- raise sa_exc.InvalidRequestError(
- "Can't join table/selectable '%s' to itself"
- % l_info.selectable
- )
-
- right_mapper, right_selectable, right_is_aliased = (
- getattr(r_info, "mapper", None),
- r_info.selectable,
- getattr(r_info, "is_aliased_class", False),
- )
-
- if (
- right_mapper
- and prop
- and not right_mapper.common_parent(prop.mapper)
- ):
- raise sa_exc.InvalidRequestError(
- "Join target %s does not correspond to "
- "the right side of join condition %s" % (right, onclause)
- )
-
- # _join_entities is used as a hint for single-table inheritance
- # purposes at the moment
- if hasattr(r_info, "mapper"):
- self._join_entities += (r_info,)
-
- need_adapter = False
-
- # test for joining to an unmapped selectable as the target
- if r_info.is_clause_element:
- if prop:
- right_mapper = prop.mapper
-
- if right_selectable._is_lateral:
- # orm_only is disabled to suit the case where we have to
- # adapt an explicit correlate(Entity) - the select() loses
- # the ORM-ness in this case right now, ideally it would not
- current_adapter = self._get_current_adapter()
- if current_adapter is not None:
- # TODO: we had orm_only=False here before, removing
- # it didn't break things. if we identify the rationale,
- # may need to apply "_orm_only" annotation here.
- right = current_adapter(right, True)
-
- elif prop:
- # joining to selectable with a mapper property given
- # as the ON clause
-
- if not right_selectable.is_derived_from(
- right_mapper.persist_selectable
- ):
- raise sa_exc.InvalidRequestError(
- "Selectable '%s' is not derived from '%s'"
- % (
- right_selectable.description,
- right_mapper.persist_selectable.description,
- )
- )
-
- # if the destination selectable is a plain select(),
- # turn it into an alias().
- if isinstance(right_selectable, expression.SelectBase):
- right_selectable = coercions.expect(
- roles.FromClauseRole, right_selectable
- )
- need_adapter = True
-
- # make the right hand side target into an ORM entity
- right = AliasedClass(right_mapper, right_selectable)
-
- util.warn_deprecated(
- "An alias is being generated automatically against "
- "joined entity %s for raw clauseelement, which is "
- "deprecated and will be removed in a later release. "
- "Use the aliased() "
- "construct explicitly, see the linked example."
- % right_mapper,
- "1.4",
- code="xaj1",
- )
-
- # test for overlap:
- # orm/inheritance/relationships.py
- # SelfReferentialM2MTest
- aliased_entity = right_mapper and not right_is_aliased and overlap
-
- if not need_adapter and aliased_entity:
- # there are a few places in the ORM that automatic aliasing
- # is still desirable, and can't be automatic with a Core
- # only approach. For illustrations of "overlaps" see
- # test/orm/inheritance/test_relationships.py. There are also
- # general overlap cases with many-to-many tables where automatic
- # aliasing is desirable.
- right = AliasedClass(right, flat=True)
- need_adapter = True
-
- util.warn(
- "An alias is being generated automatically against "
- "joined entity %s due to overlapping tables. This is a "
- "legacy pattern which may be "
- "deprecated in a later release. Use the "
- "aliased(<entity>, flat=True) "
- "construct explicitly, see the linked example." % right_mapper,
- code="xaj2",
- )
-
- if need_adapter:
- # if need_adapter is True, we are in a deprecated case and
- # a warning has been emitted.
- assert right_mapper
-
- adapter = ORMAdapter(
- _TraceAdaptRole.DEPRECATED_JOIN_ADAPT_RIGHT_SIDE,
- inspect(right),
- equivalents=right_mapper._equivalent_columns,
- )
-
- # if an alias() on the right side was generated,
- # which is intended to wrap a the right side in a subquery,
- # ensure that columns retrieved from this target in the result
- # set are also adapted.
- self._mapper_loads_polymorphically_with(right_mapper, adapter)
- elif (
- not r_info.is_clause_element
- and not right_is_aliased
- and right_mapper._has_aliased_polymorphic_fromclause
- ):
- # for the case where the target mapper has a with_polymorphic
- # set up, ensure an adapter is set up for criteria that works
- # against this mapper. Previously, this logic used to
- # use the "create_aliases or aliased_entity" case to generate
- # an aliased() object, but this creates an alias that isn't
- # strictly necessary.
- # see test/orm/test_core_compilation.py
- # ::RelNaturalAliasedJoinsTest::test_straight
- # and similar
- self._mapper_loads_polymorphically_with(
- right_mapper,
- ORMAdapter(
- _TraceAdaptRole.WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN,
- right_mapper,
- selectable=right_mapper.selectable,
- equivalents=right_mapper._equivalent_columns,
- ),
- )
- # if the onclause is a ClauseElement, adapt it with any
- # adapters that are in place right now
- if isinstance(onclause, expression.ClauseElement):
- current_adapter = self._get_current_adapter()
- if current_adapter:
- onclause = current_adapter(onclause, True)
-
- # if joining on a MapperProperty path,
- # track the path to prevent redundant joins
- if prop:
- self._already_joined_edges += ((left, right, prop.key),)
-
- return inspect(right), right, onclause
-
- @property
- def _select_args(self):
- return {
- "limit_clause": self.select_statement._limit_clause,
- "offset_clause": self.select_statement._offset_clause,
- "distinct": self.distinct,
- "distinct_on": self.distinct_on,
- "prefixes": self.select_statement._prefixes,
- "suffixes": self.select_statement._suffixes,
- "group_by": self.group_by or None,
- "fetch_clause": self.select_statement._fetch_clause,
- "fetch_clause_options": (
- self.select_statement._fetch_clause_options
- ),
- "independent_ctes": self.select_statement._independent_ctes,
- "independent_ctes_opts": (
- self.select_statement._independent_ctes_opts
- ),
- }
-
- @property
- def _should_nest_selectable(self):
- kwargs = self._select_args
- return (
- kwargs.get("limit_clause") is not None
- or kwargs.get("offset_clause") is not None
- or kwargs.get("distinct", False)
- or kwargs.get("distinct_on", ())
- or kwargs.get("group_by", False)
- )
-
- def _get_extra_criteria(self, ext_info):
- if (
- "additional_entity_criteria",
- ext_info.mapper,
- ) in self.global_attributes:
- return tuple(
- ae._resolve_where_criteria(ext_info)
- for ae in self.global_attributes[
- ("additional_entity_criteria", ext_info.mapper)
- ]
- if (ae.include_aliases or ae.entity is ext_info)
- and ae._should_include(self)
- )
- else:
- return ()
-
- def _adjust_for_extra_criteria(self):
- """Apply extra criteria filtering.
-
- For all distinct single-table-inheritance mappers represented in
- the columns clause of this query, as well as the "select from entity",
- add criterion to the WHERE
- clause of the given QueryContext such that only the appropriate
- subtypes are selected from the total results.
-
- Additionally, add WHERE criteria originating from LoaderCriteriaOptions
- associated with the global context.
-
- """
-
- for fromclause in self.from_clauses:
- ext_info = fromclause._annotations.get("parententity", None)
-
- if (
- ext_info
- and (
- ext_info.mapper._single_table_criterion is not None
- or ("additional_entity_criteria", ext_info.mapper)
- in self.global_attributes
- )
- and ext_info not in self.extra_criteria_entities
- ):
- self.extra_criteria_entities[ext_info] = (
- ext_info,
- ext_info._adapter if ext_info.is_aliased_class else None,
- )
-
- search = set(self.extra_criteria_entities.values())
-
- for ext_info, adapter in search:
- if ext_info in self._join_entities:
- continue
-
- single_crit = ext_info.mapper._single_table_criterion
-
- if self.compile_options._for_refresh_state:
- additional_entity_criteria = []
- else:
- additional_entity_criteria = self._get_extra_criteria(ext_info)
-
- if single_crit is not None:
- additional_entity_criteria += (single_crit,)
-
- current_adapter = self._get_current_adapter()
- for crit in additional_entity_criteria:
- if adapter:
- crit = adapter.traverse(crit)
-
- if current_adapter:
- crit = sql_util._deep_annotate(crit, {"_orm_adapt": True})
- crit = current_adapter(crit, False)
- self._where_criteria += (crit,)
-
-
-def _column_descriptions(
- query_or_select_stmt: Union[Query, Select, FromStatement],
- compile_state: Optional[ORMSelectCompileState] = None,
- legacy: bool = False,
-) -> List[ORMColumnDescription]:
- if compile_state is None:
- compile_state = ORMSelectCompileState._create_entities_collection(
- query_or_select_stmt, legacy=legacy
- )
- ctx = compile_state
- d = [
- {
- "name": ent._label_name,
- "type": ent.type,
- "aliased": getattr(insp_ent, "is_aliased_class", False),
- "expr": ent.expr,
- "entity": (
- getattr(insp_ent, "entity", None)
- if ent.entity_zero is not None
- and not insp_ent.is_clause_element
- else None
- ),
- }
- for ent, insp_ent in [
- (_ent, _ent.entity_zero) for _ent in ctx._entities
- ]
- ]
- return d
-
-
-def _legacy_filter_by_entity_zero(
- query_or_augmented_select: Union[Query[Any], Select[Any]]
-) -> Optional[_InternalEntityType[Any]]:
- self = query_or_augmented_select
- if self._setup_joins:
- _last_joined_entity = self._last_joined_entity
- if _last_joined_entity is not None:
- return _last_joined_entity
-
- if self._from_obj and "parententity" in self._from_obj[0]._annotations:
- return self._from_obj[0]._annotations["parententity"]
-
- return _entity_from_pre_ent_zero(self)
-
-
-def _entity_from_pre_ent_zero(
- query_or_augmented_select: Union[Query[Any], Select[Any]]
-) -> Optional[_InternalEntityType[Any]]:
- self = query_or_augmented_select
- if not self._raw_columns:
- return None
-
- ent = self._raw_columns[0]
-
- if "parententity" in ent._annotations:
- return ent._annotations["parententity"]
- elif isinstance(ent, ORMColumnsClauseRole):
- return ent.entity
- elif "bundle" in ent._annotations:
- return ent._annotations["bundle"]
- else:
- return ent
-
-
-def _determine_last_joined_entity(
- setup_joins: Tuple[_SetupJoinsElement, ...],
- entity_zero: Optional[_InternalEntityType[Any]] = None,
-) -> Optional[Union[_InternalEntityType[Any], _JoinTargetElement]]:
- if not setup_joins:
- return None
-
- (target, onclause, from_, flags) = setup_joins[-1]
-
- if isinstance(
- target,
- attributes.QueryableAttribute,
- ):
- return target.entity
- else:
- return target
-
-
-class _QueryEntity:
- """represent an entity column returned within a Query result."""
-
- __slots__ = ()
-
- supports_single_entity: bool
-
- _non_hashable_value = False
- _null_column_type = False
- use_id_for_hash = False
-
- _label_name: Optional[str]
- type: Union[Type[Any], TypeEngine[Any]]
- expr: Union[_InternalEntityType, ColumnElement[Any]]
- entity_zero: Optional[_InternalEntityType]
-
- def setup_compile_state(self, compile_state: ORMCompileState) -> None:
- raise NotImplementedError()
-
- def setup_dml_returning_compile_state(
- self,
- compile_state: ORMCompileState,
- adapter: DMLReturningColFilter,
- ) -> None:
- raise NotImplementedError()
-
- def row_processor(self, context, result):
- raise NotImplementedError()
-
- @classmethod
- def to_compile_state(
- cls, compile_state, entities, entities_collection, is_current_entities
- ):
- for idx, entity in enumerate(entities):
- if entity._is_lambda_element:
- if entity._is_sequence:
- cls.to_compile_state(
- compile_state,
- entity._resolved,
- entities_collection,
- is_current_entities,
- )
- continue
- else:
- entity = entity._resolved
-
- if entity.is_clause_element:
- if entity.is_selectable:
- if "parententity" in entity._annotations:
- _MapperEntity(
- compile_state,
- entity,
- entities_collection,
- is_current_entities,
- )
- else:
- _ColumnEntity._for_columns(
- compile_state,
- entity._select_iterable,
- entities_collection,
- idx,
- is_current_entities,
- )
- else:
- if entity._annotations.get("bundle", False):
- _BundleEntity(
- compile_state,
- entity,
- entities_collection,
- is_current_entities,
- )
- elif entity._is_clause_list:
- # this is legacy only - test_composites.py
- # test_query_cols_legacy
- _ColumnEntity._for_columns(
- compile_state,
- entity._select_iterable,
- entities_collection,
- idx,
- is_current_entities,
- )
- else:
- _ColumnEntity._for_columns(
- compile_state,
- [entity],
- entities_collection,
- idx,
- is_current_entities,
- )
- elif entity.is_bundle:
- _BundleEntity(compile_state, entity, entities_collection)
-
- return entities_collection
-
-
-class _MapperEntity(_QueryEntity):
- """mapper/class/AliasedClass entity"""
-
- __slots__ = (
- "expr",
- "mapper",
- "entity_zero",
- "is_aliased_class",
- "path",
- "_extra_entities",
- "_label_name",
- "_with_polymorphic_mappers",
- "selectable",
- "_polymorphic_discriminator",
- )
-
- expr: _InternalEntityType
- mapper: Mapper[Any]
- entity_zero: _InternalEntityType
- is_aliased_class: bool
- path: PathRegistry
- _label_name: str
-
- def __init__(
- self, compile_state, entity, entities_collection, is_current_entities
- ):
- entities_collection.append(self)
- if is_current_entities:
- if compile_state._primary_entity is None:
- compile_state._primary_entity = self
- compile_state._has_mapper_entities = True
- compile_state._has_orm_entities = True
-
- entity = entity._annotations["parententity"]
- entity._post_inspect
- ext_info = self.entity_zero = entity
- entity = ext_info.entity
-
- self.expr = entity
- self.mapper = mapper = ext_info.mapper
-
- self._extra_entities = (self.expr,)
-
- if ext_info.is_aliased_class:
- self._label_name = ext_info.name
- else:
- self._label_name = mapper.class_.__name__
-
- self.is_aliased_class = ext_info.is_aliased_class
- self.path = ext_info._path_registry
-
- self.selectable = ext_info.selectable
- self._with_polymorphic_mappers = ext_info.with_polymorphic_mappers
- self._polymorphic_discriminator = ext_info.polymorphic_on
-
- if mapper._should_select_with_poly_adapter:
- compile_state._create_with_polymorphic_adapter(
- ext_info, self.selectable
- )
-
- supports_single_entity = True
-
- _non_hashable_value = True
- use_id_for_hash = True
-
- @property
- def type(self):
- return self.mapper.class_
-
- @property
- def entity_zero_or_selectable(self):
- return self.entity_zero
-
- def corresponds_to(self, entity):
- return _entity_corresponds_to(self.entity_zero, entity)
-
- def _get_entity_clauses(self, compile_state):
- adapter = None
-
- if not self.is_aliased_class:
- if compile_state._polymorphic_adapters:
- adapter = compile_state._polymorphic_adapters.get(
- self.mapper, None
- )
- else:
- adapter = self.entity_zero._adapter
-
- if adapter:
- if compile_state._from_obj_alias:
- ret = adapter.wrap(compile_state._from_obj_alias)
- else:
- ret = adapter
- else:
- ret = compile_state._from_obj_alias
-
- return ret
-
- def row_processor(self, context, result):
- compile_state = context.compile_state
- adapter = self._get_entity_clauses(compile_state)
-
- if compile_state.compound_eager_adapter and adapter:
- adapter = adapter.wrap(compile_state.compound_eager_adapter)
- elif not adapter:
- adapter = compile_state.compound_eager_adapter
-
- if compile_state._primary_entity is self:
- only_load_props = compile_state.compile_options._only_load_props
- refresh_state = context.refresh_state
- else:
- only_load_props = refresh_state = None
-
- _instance = loading._instance_processor(
- self,
- self.mapper,
- context,
- result,
- self.path,
- adapter,
- only_load_props=only_load_props,
- refresh_state=refresh_state,
- polymorphic_discriminator=self._polymorphic_discriminator,
- )
-
- return _instance, self._label_name, self._extra_entities
-
- def setup_dml_returning_compile_state(
- self,
- compile_state: ORMCompileState,
- adapter: DMLReturningColFilter,
- ) -> None:
- loading._setup_entity_query(
- compile_state,
- self.mapper,
- self,
- self.path,
- adapter,
- compile_state.primary_columns,
- with_polymorphic=self._with_polymorphic_mappers,
- only_load_props=compile_state.compile_options._only_load_props,
- polymorphic_discriminator=self._polymorphic_discriminator,
- )
-
- def setup_compile_state(self, compile_state):
- adapter = self._get_entity_clauses(compile_state)
-
- single_table_crit = self.mapper._single_table_criterion
- if (
- single_table_crit is not None
- or ("additional_entity_criteria", self.mapper)
- in compile_state.global_attributes
- ):
- ext_info = self.entity_zero
- compile_state.extra_criteria_entities[ext_info] = (
- ext_info,
- ext_info._adapter if ext_info.is_aliased_class else None,
- )
-
- loading._setup_entity_query(
- compile_state,
- self.mapper,
- self,
- self.path,
- adapter,
- compile_state.primary_columns,
- with_polymorphic=self._with_polymorphic_mappers,
- only_load_props=compile_state.compile_options._only_load_props,
- polymorphic_discriminator=self._polymorphic_discriminator,
- )
- compile_state._fallback_from_clauses.append(self.selectable)
-
-
-class _BundleEntity(_QueryEntity):
- _extra_entities = ()
-
- __slots__ = (
- "bundle",
- "expr",
- "type",
- "_label_name",
- "_entities",
- "supports_single_entity",
- )
-
- _entities: List[_QueryEntity]
- bundle: Bundle
- type: Type[Any]
- _label_name: str
- supports_single_entity: bool
- expr: Bundle
-
- def __init__(
- self,
- compile_state,
- expr,
- entities_collection,
- is_current_entities,
- setup_entities=True,
- parent_bundle=None,
- ):
- compile_state._has_orm_entities = True
-
- expr = expr._annotations["bundle"]
- if parent_bundle:
- parent_bundle._entities.append(self)
- else:
- entities_collection.append(self)
-
- if isinstance(
- expr, (attributes.QueryableAttribute, interfaces.PropComparator)
- ):
- bundle = expr.__clause_element__()
- else:
- bundle = expr
-
- self.bundle = self.expr = bundle
- self.type = type(bundle)
- self._label_name = bundle.name
- self._entities = []
-
- if setup_entities:
- for expr in bundle.exprs:
- if "bundle" in expr._annotations:
- _BundleEntity(
- compile_state,
- expr,
- entities_collection,
- is_current_entities,
- parent_bundle=self,
- )
- elif isinstance(expr, Bundle):
- _BundleEntity(
- compile_state,
- expr,
- entities_collection,
- is_current_entities,
- parent_bundle=self,
- )
- else:
- _ORMColumnEntity._for_columns(
- compile_state,
- [expr],
- entities_collection,
- None,
- is_current_entities,
- parent_bundle=self,
- )
-
- self.supports_single_entity = self.bundle.single_entity
-
- @property
- def mapper(self):
- ezero = self.entity_zero
- if ezero is not None:
- return ezero.mapper
- else:
- return None
-
- @property
- def entity_zero(self):
- for ent in self._entities:
- ezero = ent.entity_zero
- if ezero is not None:
- return ezero
- else:
- return None
-
- def corresponds_to(self, entity):
- # TODO: we might be able to implement this but for now
- # we are working around it
- return False
-
- @property
- def entity_zero_or_selectable(self):
- for ent in self._entities:
- ezero = ent.entity_zero_or_selectable
- if ezero is not None:
- return ezero
- else:
- return None
-
- def setup_compile_state(self, compile_state):
- for ent in self._entities:
- ent.setup_compile_state(compile_state)
-
- def setup_dml_returning_compile_state(
- self,
- compile_state: ORMCompileState,
- adapter: DMLReturningColFilter,
- ) -> None:
- return self.setup_compile_state(compile_state)
-
- def row_processor(self, context, result):
- procs, labels, extra = zip(
- *[ent.row_processor(context, result) for ent in self._entities]
- )
-
- proc = self.bundle.create_row_processor(context.query, procs, labels)
-
- return proc, self._label_name, self._extra_entities
-
-
-class _ColumnEntity(_QueryEntity):
- __slots__ = (
- "_fetch_column",
- "_row_processor",
- "raw_column_index",
- "translate_raw_column",
- )
-
- @classmethod
- def _for_columns(
- cls,
- compile_state,
- columns,
- entities_collection,
- raw_column_index,
- is_current_entities,
- parent_bundle=None,
- ):
- for column in columns:
- annotations = column._annotations
- if "parententity" in annotations:
- _entity = annotations["parententity"]
- else:
- _entity = sql_util.extract_first_column_annotation(
- column, "parententity"
- )
-
- if _entity:
- if "identity_token" in column._annotations:
- _IdentityTokenEntity(
- compile_state,
- column,
- entities_collection,
- _entity,
- raw_column_index,
- is_current_entities,
- parent_bundle=parent_bundle,
- )
- else:
- _ORMColumnEntity(
- compile_state,
- column,
- entities_collection,
- _entity,
- raw_column_index,
- is_current_entities,
- parent_bundle=parent_bundle,
- )
- else:
- _RawColumnEntity(
- compile_state,
- column,
- entities_collection,
- raw_column_index,
- is_current_entities,
- parent_bundle=parent_bundle,
- )
-
- @property
- def type(self):
- return self.column.type
-
- @property
- def _non_hashable_value(self):
- return not self.column.type.hashable
-
- @property
- def _null_column_type(self):
- return self.column.type._isnull
-
- def row_processor(self, context, result):
- compile_state = context.compile_state
-
- # the resulting callable is entirely cacheable so just return
- # it if we already made one
- if self._row_processor is not None:
- getter, label_name, extra_entities = self._row_processor
- if self.translate_raw_column:
- extra_entities += (
- context.query._raw_columns[self.raw_column_index],
- )
-
- return getter, label_name, extra_entities
-
- # retrieve the column that would have been set up in
- # setup_compile_state, to avoid doing redundant work
- if self._fetch_column is not None:
- column = self._fetch_column
- else:
- # fetch_column will be None when we are doing a from_statement
- # and setup_compile_state may not have been called.
- column = self.column
-
- # previously, the RawColumnEntity didn't look for from_obj_alias
- # however I can't think of a case where we would be here and
- # we'd want to ignore it if this is the from_statement use case.
- # it's not really a use case to have raw columns + from_statement
- if compile_state._from_obj_alias:
- column = compile_state._from_obj_alias.columns[column]
-
- if column._annotations:
- # annotated columns perform more slowly in compiler and
- # result due to the __eq__() method, so use deannotated
- column = column._deannotate()
-
- if compile_state.compound_eager_adapter:
- column = compile_state.compound_eager_adapter.columns[column]
-
- getter = result._getter(column)
- ret = getter, self._label_name, self._extra_entities
- self._row_processor = ret
-
- if self.translate_raw_column:
- extra_entities = self._extra_entities + (
- context.query._raw_columns[self.raw_column_index],
- )
- return getter, self._label_name, extra_entities
- else:
- return ret
-
-
-class _RawColumnEntity(_ColumnEntity):
- entity_zero = None
- mapper = None
- supports_single_entity = False
-
- __slots__ = (
- "expr",
- "column",
- "_label_name",
- "entity_zero_or_selectable",
- "_extra_entities",
- )
-
- def __init__(
- self,
- compile_state,
- column,
- entities_collection,
- raw_column_index,
- is_current_entities,
- parent_bundle=None,
- ):
- self.expr = column
- self.raw_column_index = raw_column_index
- self.translate_raw_column = raw_column_index is not None
-
- if column._is_star:
- compile_state.compile_options += {"_is_star": True}
-
- if not is_current_entities or column._is_text_clause:
- self._label_name = None
- else:
- self._label_name = compile_state._label_convention(column)
-
- if parent_bundle:
- parent_bundle._entities.append(self)
- else:
- entities_collection.append(self)
-
- self.column = column
- self.entity_zero_or_selectable = (
- self.column._from_objects[0] if self.column._from_objects else None
- )
- self._extra_entities = (self.expr, self.column)
- self._fetch_column = self._row_processor = None
-
- def corresponds_to(self, entity):
- return False
-
- def setup_dml_returning_compile_state(
- self,
- compile_state: ORMCompileState,
- adapter: DMLReturningColFilter,
- ) -> None:
- return self.setup_compile_state(compile_state)
-
- def setup_compile_state(self, compile_state):
- current_adapter = compile_state._get_current_adapter()
- if current_adapter:
- column = current_adapter(self.column, False)
- if column is None:
- return
- else:
- column = self.column
-
- if column._annotations:
- # annotated columns perform more slowly in compiler and
- # result due to the __eq__() method, so use deannotated
- column = column._deannotate()
-
- compile_state.dedupe_columns.add(column)
- compile_state.primary_columns.append(column)
- self._fetch_column = column
-
-
-class _ORMColumnEntity(_ColumnEntity):
- """Column/expression based entity."""
-
- supports_single_entity = False
-
- __slots__ = (
- "expr",
- "mapper",
- "column",
- "_label_name",
- "entity_zero_or_selectable",
- "entity_zero",
- "_extra_entities",
- )
-
- def __init__(
- self,
- compile_state,
- column,
- entities_collection,
- parententity,
- raw_column_index,
- is_current_entities,
- parent_bundle=None,
- ):
- annotations = column._annotations
-
- _entity = parententity
-
- # an AliasedClass won't have proxy_key in the annotations for
- # a column if it was acquired using the class' adapter directly,
- # such as using AliasedInsp._adapt_element(). this occurs
- # within internal loaders.
-
- orm_key = annotations.get("proxy_key", None)
- proxy_owner = annotations.get("proxy_owner", _entity)
- if orm_key:
- self.expr = getattr(proxy_owner.entity, orm_key)
- self.translate_raw_column = False
- else:
- # if orm_key is not present, that means this is an ad-hoc
- # SQL ColumnElement, like a CASE() or other expression.
- # include this column position from the invoked statement
- # in the ORM-level ResultSetMetaData on each execute, so that
- # it can be targeted by identity after caching
- self.expr = column
- self.translate_raw_column = raw_column_index is not None
-
- self.raw_column_index = raw_column_index
-
- if is_current_entities:
- self._label_name = compile_state._label_convention(
- column, col_name=orm_key
- )
- else:
- self._label_name = None
-
- _entity._post_inspect
- self.entity_zero = self.entity_zero_or_selectable = ezero = _entity
- self.mapper = mapper = _entity.mapper
-
- if parent_bundle:
- parent_bundle._entities.append(self)
- else:
- entities_collection.append(self)
-
- compile_state._has_orm_entities = True
-
- self.column = column
-
- self._fetch_column = self._row_processor = None
-
- self._extra_entities = (self.expr, self.column)
-
- if mapper._should_select_with_poly_adapter:
- compile_state._create_with_polymorphic_adapter(
- ezero, ezero.selectable
- )
-
- def corresponds_to(self, entity):
- if _is_aliased_class(entity):
- # TODO: polymorphic subclasses ?
- return entity is self.entity_zero
- else:
- return not _is_aliased_class(
- self.entity_zero
- ) and entity.common_parent(self.entity_zero)
-
- def setup_dml_returning_compile_state(
- self,
- compile_state: ORMCompileState,
- adapter: DMLReturningColFilter,
- ) -> None:
- self._fetch_column = self.column
- column = adapter(self.column, False)
- if column is not None:
- compile_state.dedupe_columns.add(column)
- compile_state.primary_columns.append(column)
-
- def setup_compile_state(self, compile_state):
- current_adapter = compile_state._get_current_adapter()
- if current_adapter:
- column = current_adapter(self.column, False)
- if column is None:
- assert compile_state.is_dml_returning
- self._fetch_column = self.column
- return
- else:
- column = self.column
-
- ezero = self.entity_zero
-
- single_table_crit = self.mapper._single_table_criterion
- if (
- single_table_crit is not None
- or ("additional_entity_criteria", self.mapper)
- in compile_state.global_attributes
- ):
- compile_state.extra_criteria_entities[ezero] = (
- ezero,
- ezero._adapter if ezero.is_aliased_class else None,
- )
-
- if column._annotations and not column._expression_label:
- # annotated columns perform more slowly in compiler and
- # result due to the __eq__() method, so use deannotated
- column = column._deannotate()
-
- # use entity_zero as the from if we have it. this is necessary
- # for polymorphic scenarios where our FROM is based on ORM entity,
- # not the FROM of the column. but also, don't use it if our column
- # doesn't actually have any FROMs that line up, such as when its
- # a scalar subquery.
- if set(self.column._from_objects).intersection(
- ezero.selectable._from_objects
- ):
- compile_state._fallback_from_clauses.append(ezero.selectable)
-
- compile_state.dedupe_columns.add(column)
- compile_state.primary_columns.append(column)
- self._fetch_column = column
-
-
-class _IdentityTokenEntity(_ORMColumnEntity):
- translate_raw_column = False
-
- def setup_compile_state(self, compile_state):
- pass
-
- def row_processor(self, context, result):
- def getter(row):
- return context.load_options._identity_token
-
- return getter, self._label_name, self._extra_entities