From 6d7ba58f880be618ade07f8ea080fe8c4bf8a896 Mon Sep 17 00:00:00 2001 From: cyfraeviolae Date: Wed, 3 Apr 2024 03:10:44 -0400 Subject: venv --- .../site-packages/sqlalchemy/orm/util.py | 2416 ++++++++++++++++++++ 1 file changed, 2416 insertions(+) create mode 100644 venv/lib/python3.11/site-packages/sqlalchemy/orm/util.py (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/orm/util.py') diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/orm/util.py b/venv/lib/python3.11/site-packages/sqlalchemy/orm/util.py new file mode 100644 index 0000000..8e153e6 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/orm/util.py @@ -0,0 +1,2416 @@ +# orm/util.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: allow-untyped-defs, allow-untyped-calls + +from __future__ import annotations + +import enum +import functools +import re +import types +import typing +from typing import AbstractSet +from typing import Any +from typing import Callable +from typing import cast +from typing import Dict +from typing import FrozenSet +from typing import Generic +from typing import Iterable +from typing import Iterator +from typing import List +from typing import Match +from typing import Optional +from typing import Sequence +from typing import Tuple +from typing import Type +from typing import TYPE_CHECKING +from typing import TypeVar +from typing import Union +import weakref + +from . import attributes # noqa +from . import exc +from ._typing import _O +from ._typing import insp_is_aliased_class +from ._typing import insp_is_mapper +from ._typing import prop_is_relationship +from .base import _class_to_mapper as _class_to_mapper +from .base import _MappedAnnotationBase +from .base import _never_set as _never_set # noqa: F401 +from .base import _none_set as _none_set # noqa: F401 +from .base import attribute_str as attribute_str # noqa: F401 +from .base import class_mapper as class_mapper +from .base import DynamicMapped +from .base import InspectionAttr as InspectionAttr +from .base import instance_str as instance_str # noqa: F401 +from .base import Mapped +from .base import object_mapper as object_mapper +from .base import object_state as object_state # noqa: F401 +from .base import opt_manager_of_class +from .base import ORMDescriptor +from .base import state_attribute_str as state_attribute_str # noqa: F401 +from .base import state_class_str as state_class_str # noqa: F401 +from .base import state_str as state_str # noqa: F401 +from .base import WriteOnlyMapped +from .interfaces import CriteriaOption +from .interfaces import MapperProperty as MapperProperty +from .interfaces import ORMColumnsClauseRole +from .interfaces import ORMEntityColumnsClauseRole +from .interfaces import ORMFromClauseRole +from .path_registry import PathRegistry as PathRegistry +from .. import event +from .. import exc as sa_exc +from .. import inspection +from .. import sql +from .. import util +from ..engine.result import result_tuple +from ..sql import coercions +from ..sql import expression +from ..sql import lambdas +from ..sql import roles +from ..sql import util as sql_util +from ..sql import visitors +from ..sql._typing import is_selectable +from ..sql.annotation import SupportsCloneAnnotations +from ..sql.base import ColumnCollection +from ..sql.cache_key import HasCacheKey +from ..sql.cache_key import MemoizedHasCacheKey +from ..sql.elements import ColumnElement +from ..sql.elements import KeyedColumnElement +from ..sql.selectable import FromClause +from ..util.langhelpers import MemoizedSlots +from ..util.typing import de_stringify_annotation as _de_stringify_annotation +from ..util.typing import ( + de_stringify_union_elements as _de_stringify_union_elements, +) +from ..util.typing import eval_name_only as _eval_name_only +from ..util.typing import is_origin_of_cls +from ..util.typing import Literal +from ..util.typing import Protocol +from ..util.typing import typing_get_origin + +if typing.TYPE_CHECKING: + from ._typing import _EntityType + from ._typing import _IdentityKeyType + from ._typing import _InternalEntityType + from ._typing import _ORMCOLEXPR + from .context import _MapperEntity + from .context import ORMCompileState + from .mapper import Mapper + from .path_registry import AbstractEntityRegistry + from .query import Query + from .relationships import RelationshipProperty + from ..engine import Row + from ..engine import RowMapping + from ..sql._typing import _CE + from ..sql._typing import _ColumnExpressionArgument + from ..sql._typing import _EquivalentColumnMap + from ..sql._typing import _FromClauseArgument + from ..sql._typing import _OnClauseArgument + from ..sql._typing import _PropagateAttrsType + from ..sql.annotation import _SA + from ..sql.base import ReadOnlyColumnCollection + from ..sql.elements import BindParameter + from ..sql.selectable import _ColumnsClauseElement + from ..sql.selectable import Select + from ..sql.selectable import Selectable + from ..sql.visitors import anon_map + from ..util.typing import _AnnotationScanType + from ..util.typing import ArgsTypeProcotol + +_T = TypeVar("_T", bound=Any) + +all_cascades = frozenset( + ( + "delete", + "delete-orphan", + "all", + "merge", + "expunge", + "save-update", + "refresh-expire", + "none", + ) +) + + +_de_stringify_partial = functools.partial( + functools.partial, + locals_=util.immutabledict( + { + "Mapped": Mapped, + "WriteOnlyMapped": WriteOnlyMapped, + "DynamicMapped": DynamicMapped, + } + ), +) + +# partial is practically useless as we have to write out the whole +# function and maintain the signature anyway + + +class _DeStringifyAnnotation(Protocol): + def __call__( + self, + cls: Type[Any], + annotation: _AnnotationScanType, + originating_module: str, + *, + str_cleanup_fn: Optional[Callable[[str, str], str]] = None, + include_generic: bool = False, + ) -> Type[Any]: ... + + +de_stringify_annotation = cast( + _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation) +) + + +class _DeStringifyUnionElements(Protocol): + def __call__( + self, + cls: Type[Any], + annotation: ArgsTypeProcotol, + originating_module: str, + *, + str_cleanup_fn: Optional[Callable[[str, str], str]] = None, + ) -> Type[Any]: ... + + +de_stringify_union_elements = cast( + _DeStringifyUnionElements, + _de_stringify_partial(_de_stringify_union_elements), +) + + +class _EvalNameOnly(Protocol): + def __call__(self, name: str, module_name: str) -> Any: ... + + +eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only)) + + +class CascadeOptions(FrozenSet[str]): + """Keeps track of the options sent to + :paramref:`.relationship.cascade`""" + + _add_w_all_cascades = all_cascades.difference( + ["all", "none", "delete-orphan"] + ) + _allowed_cascades = all_cascades + + _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"] + + __slots__ = ( + "save_update", + "delete", + "refresh_expire", + "merge", + "expunge", + "delete_orphan", + ) + + save_update: bool + delete: bool + refresh_expire: bool + merge: bool + expunge: bool + delete_orphan: bool + + def __new__( + cls, value_list: Optional[Union[Iterable[str], str]] + ) -> CascadeOptions: + if isinstance(value_list, str) or value_list is None: + return cls.from_string(value_list) # type: ignore + values = set(value_list) + if values.difference(cls._allowed_cascades): + raise sa_exc.ArgumentError( + "Invalid cascade option(s): %s" + % ", ".join( + [ + repr(x) + for x in sorted( + values.difference(cls._allowed_cascades) + ) + ] + ) + ) + + if "all" in values: + values.update(cls._add_w_all_cascades) + if "none" in values: + values.clear() + values.discard("all") + + self = super().__new__(cls, values) + self.save_update = "save-update" in values + self.delete = "delete" in values + self.refresh_expire = "refresh-expire" in values + self.merge = "merge" in values + self.expunge = "expunge" in values + self.delete_orphan = "delete-orphan" in values + + if self.delete_orphan and not self.delete: + util.warn("The 'delete-orphan' cascade option requires 'delete'.") + return self + + def __repr__(self): + return "CascadeOptions(%r)" % (",".join([x for x in sorted(self)])) + + @classmethod + def from_string(cls, arg): + values = [c for c in re.split(r"\s*,\s*", arg or "") if c] + return cls(values) + + +def _validator_events(desc, key, validator, include_removes, include_backrefs): + """Runs a validation method on an attribute value to be set or + appended. + """ + + if not include_backrefs: + + def detect_is_backref(state, initiator): + impl = state.manager[key].impl + return initiator.impl is not impl + + if include_removes: + + def append(state, value, initiator): + if initiator.op is not attributes.OP_BULK_REPLACE and ( + include_backrefs or not detect_is_backref(state, initiator) + ): + return validator(state.obj(), key, value, False) + else: + return value + + def bulk_set(state, values, initiator): + if include_backrefs or not detect_is_backref(state, initiator): + obj = state.obj() + values[:] = [ + validator(obj, key, value, False) for value in values + ] + + def set_(state, value, oldvalue, initiator): + if include_backrefs or not detect_is_backref(state, initiator): + return validator(state.obj(), key, value, False) + else: + return value + + def remove(state, value, initiator): + if include_backrefs or not detect_is_backref(state, initiator): + validator(state.obj(), key, value, True) + + else: + + def append(state, value, initiator): + if initiator.op is not attributes.OP_BULK_REPLACE and ( + include_backrefs or not detect_is_backref(state, initiator) + ): + return validator(state.obj(), key, value) + else: + return value + + def bulk_set(state, values, initiator): + if include_backrefs or not detect_is_backref(state, initiator): + obj = state.obj() + values[:] = [validator(obj, key, value) for value in values] + + def set_(state, value, oldvalue, initiator): + if include_backrefs or not detect_is_backref(state, initiator): + return validator(state.obj(), key, value) + else: + return value + + event.listen(desc, "append", append, raw=True, retval=True) + event.listen(desc, "bulk_replace", bulk_set, raw=True) + event.listen(desc, "set", set_, raw=True, retval=True) + if include_removes: + event.listen(desc, "remove", remove, raw=True, retval=True) + + +def polymorphic_union( + table_map, typecolname, aliasname="p_union", cast_nulls=True +): + """Create a ``UNION`` statement used by a polymorphic mapper. + + See :ref:`concrete_inheritance` for an example of how + this is used. + + :param table_map: mapping of polymorphic identities to + :class:`_schema.Table` objects. + :param typecolname: string name of a "discriminator" column, which will be + derived from the query, producing the polymorphic identity for + each row. If ``None``, no polymorphic discriminator is generated. + :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()` + construct generated. + :param cast_nulls: if True, non-existent columns, which are represented + as labeled NULLs, will be passed into CAST. This is a legacy behavior + that is problematic on some backends such as Oracle - in which case it + can be set to False. + + """ + + colnames: util.OrderedSet[str] = util.OrderedSet() + colnamemaps = {} + types = {} + for key in table_map: + table = table_map[key] + + table = coercions.expect( + roles.StrictFromClauseRole, table, allow_select=True + ) + table_map[key] = table + + m = {} + for c in table.c: + if c.key == typecolname: + raise sa_exc.InvalidRequestError( + "Polymorphic union can't use '%s' as the discriminator " + "column due to mapped column %r; please apply the " + "'typecolname' " + "argument; this is available on " + "ConcreteBase as '_concrete_discriminator_name'" + % (typecolname, c) + ) + colnames.add(c.key) + m[c.key] = c + types[c.key] = c.type + colnamemaps[table] = m + + def col(name, table): + try: + return colnamemaps[table][name] + except KeyError: + if cast_nulls: + return sql.cast(sql.null(), types[name]).label(name) + else: + return sql.type_coerce(sql.null(), types[name]).label(name) + + result = [] + for type_, table in table_map.items(): + if typecolname is not None: + result.append( + sql.select( + *( + [col(name, table) for name in colnames] + + [ + sql.literal_column( + sql_util._quote_ddl_expr(type_) + ).label(typecolname) + ] + ) + ).select_from(table) + ) + else: + result.append( + sql.select( + *[col(name, table) for name in colnames] + ).select_from(table) + ) + return sql.union_all(*result).alias(aliasname) + + +def identity_key( + class_: Optional[Type[_T]] = None, + ident: Union[Any, Tuple[Any, ...]] = None, + *, + instance: Optional[_T] = None, + row: Optional[Union[Row[Any], RowMapping]] = None, + identity_token: Optional[Any] = None, +) -> _IdentityKeyType[_T]: + r"""Generate "identity key" tuples, as are used as keys in the + :attr:`.Session.identity_map` dictionary. + + This function has several call styles: + + * ``identity_key(class, ident, identity_token=token)`` + + This form receives a mapped class and a primary key scalar or + tuple as an argument. + + E.g.:: + + >>> identity_key(MyClass, (1, 2)) + (, (1, 2), None) + + :param class: mapped class (must be a positional argument) + :param ident: primary key, may be a scalar or tuple argument. + :param identity_token: optional identity token + + .. versionadded:: 1.2 added identity_token + + + * ``identity_key(instance=instance)`` + + This form will produce the identity key for a given instance. The + instance need not be persistent, only that its primary key attributes + are populated (else the key will contain ``None`` for those missing + values). + + E.g.:: + + >>> instance = MyClass(1, 2) + >>> identity_key(instance=instance) + (, (1, 2), None) + + In this form, the given instance is ultimately run though + :meth:`_orm.Mapper.identity_key_from_instance`, which will have the + effect of performing a database check for the corresponding row + if the object is expired. + + :param instance: object instance (must be given as a keyword arg) + + * ``identity_key(class, row=row, identity_token=token)`` + + This form is similar to the class/tuple form, except is passed a + database result row as a :class:`.Row` or :class:`.RowMapping` object. + + E.g.:: + + >>> row = engine.execute(\ + text("select * from table where a=1 and b=2")\ + ).first() + >>> identity_key(MyClass, row=row) + (, (1, 2), None) + + :param class: mapped class (must be a positional argument) + :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult` + (must be given as a keyword arg) + :param identity_token: optional identity token + + .. versionadded:: 1.2 added identity_token + + """ + if class_ is not None: + mapper = class_mapper(class_) + if row is None: + if ident is None: + raise sa_exc.ArgumentError("ident or row is required") + return mapper.identity_key_from_primary_key( + tuple(util.to_list(ident)), identity_token=identity_token + ) + else: + return mapper.identity_key_from_row( + row, identity_token=identity_token + ) + elif instance is not None: + mapper = object_mapper(instance) + return mapper.identity_key_from_instance(instance) + else: + raise sa_exc.ArgumentError("class or instance is required") + + +class _TraceAdaptRole(enum.Enum): + """Enumeration of all the use cases for ORMAdapter. + + ORMAdapter remains one of the most complicated aspects of the ORM, as it is + used for in-place adaption of column expressions to be applied to a SELECT, + replacing :class:`.Table` and other objects that are mapped to classes with + aliases of those tables in the case of joined eager loading, or in the case + of polymorphic loading as used with concrete mappings or other custom "with + polymorphic" parameters, with whole user-defined subqueries. The + enumerations provide an overview of all the use cases used by ORMAdapter, a + layer of formality as to the introduction of new ORMAdapter use cases (of + which none are anticipated), as well as a means to trace the origins of a + particular ORMAdapter within runtime debugging. + + SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on + open-ended statement adaption, including the ``Query.with_polymorphic()`` + method and the ``Query.select_from_entity()`` methods, favoring + user-explicit aliasing schemes using the ``aliased()`` and + ``with_polymorphic()`` standalone constructs; these still use adaption, + however the adaption is applied in a narrower scope. + + """ + + # aliased() use that is used to adapt individual attributes at query + # construction time + ALIASED_INSP = enum.auto() + + # joinedload cases; typically adapt an ON clause of a relationship + # join + JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto() + JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto() + JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto() + + # polymorphic cases - these are complex ones that replace FROM + # clauses, replacing tables with subqueries + MAPPER_POLYMORPHIC_ADAPTER = enum.auto() + WITH_POLYMORPHIC_ADAPTER = enum.auto() + WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto() + DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto() + + # the from_statement() case, used only to adapt individual attributes + # from a given statement to local ORM attributes at result fetching + # time. assigned to ORMCompileState._from_obj_alias + ADAPT_FROM_STATEMENT = enum.auto() + + # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case; + # the query is placed inside of a subquery with the LIMIT/OFFSET/etc., + # joinedloads are then placed on the outside. + # assigned to ORMCompileState.compound_eager_adapter + COMPOUND_EAGER_STATEMENT = enum.auto() + + # the legacy Query._set_select_from() case. + # this is needed for Query's set operations (i.e. UNION, etc. ) + # as well as "legacy from_self()", which while removed from 2.0 as + # public API, is used for the Query.count() method. this one + # still does full statement traversal + # assigned to ORMCompileState._from_obj_alias + LEGACY_SELECT_FROM_ALIAS = enum.auto() + + +class ORMStatementAdapter(sql_util.ColumnAdapter): + """ColumnAdapter which includes a role attribute.""" + + __slots__ = ("role",) + + def __init__( + self, + role: _TraceAdaptRole, + selectable: Selectable, + *, + equivalents: Optional[_EquivalentColumnMap] = None, + adapt_required: bool = False, + allow_label_resolve: bool = True, + anonymize_labels: bool = False, + adapt_on_names: bool = False, + adapt_from_selectables: Optional[AbstractSet[FromClause]] = None, + ): + self.role = role + super().__init__( + selectable, + equivalents=equivalents, + adapt_required=adapt_required, + allow_label_resolve=allow_label_resolve, + anonymize_labels=anonymize_labels, + adapt_on_names=adapt_on_names, + adapt_from_selectables=adapt_from_selectables, + ) + + +class ORMAdapter(sql_util.ColumnAdapter): + """ColumnAdapter subclass which excludes adaptation of entities from + non-matching mappers. + + """ + + __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp") + + is_aliased_class: bool + aliased_insp: Optional[AliasedInsp[Any]] + + def __init__( + self, + role: _TraceAdaptRole, + entity: _InternalEntityType[Any], + *, + equivalents: Optional[_EquivalentColumnMap] = None, + adapt_required: bool = False, + allow_label_resolve: bool = True, + anonymize_labels: bool = False, + selectable: Optional[Selectable] = None, + limit_on_entity: bool = True, + adapt_on_names: bool = False, + adapt_from_selectables: Optional[AbstractSet[FromClause]] = None, + ): + self.role = role + self.mapper = entity.mapper + if selectable is None: + selectable = entity.selectable + if insp_is_aliased_class(entity): + self.is_aliased_class = True + self.aliased_insp = entity + else: + self.is_aliased_class = False + self.aliased_insp = None + + super().__init__( + selectable, + equivalents, + adapt_required=adapt_required, + allow_label_resolve=allow_label_resolve, + anonymize_labels=anonymize_labels, + include_fn=self._include_fn if limit_on_entity else None, + adapt_on_names=adapt_on_names, + adapt_from_selectables=adapt_from_selectables, + ) + + def _include_fn(self, elem): + entity = elem._annotations.get("parentmapper", None) + + return not entity or entity.isa(self.mapper) or self.mapper.isa(entity) + + +class AliasedClass( + inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O] +): + r"""Represents an "aliased" form of a mapped class for usage with Query. + + The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias` + construct, this object mimics the mapped class using a + ``__getattr__`` scheme and maintains a reference to a + real :class:`~sqlalchemy.sql.expression.Alias` object. + + A primary purpose of :class:`.AliasedClass` is to serve as an alternate + within a SQL statement generated by the ORM, such that an existing + mapped entity can be used in multiple contexts. A simple example:: + + # find all pairs of users with the same name + user_alias = aliased(User) + session.query(User, user_alias).\ + join((user_alias, User.id > user_alias.id)).\ + filter(User.name == user_alias.name) + + :class:`.AliasedClass` is also capable of mapping an existing mapped + class to an entirely new selectable, provided this selectable is column- + compatible with the existing mapped selectable, and it can also be + configured in a mapping as the target of a :func:`_orm.relationship`. + See the links below for examples. + + The :class:`.AliasedClass` object is constructed typically using the + :func:`_orm.aliased` function. It also is produced with additional + configuration when using the :func:`_orm.with_polymorphic` function. + + The resulting object is an instance of :class:`.AliasedClass`. + This object implements an attribute scheme which produces the + same attribute and method interface as the original mapped + class, allowing :class:`.AliasedClass` to be compatible + with any attribute technique which works on the original class, + including hybrid attributes (see :ref:`hybrids_toplevel`). + + The :class:`.AliasedClass` can be inspected for its underlying + :class:`_orm.Mapper`, aliased selectable, and other information + using :func:`_sa.inspect`:: + + from sqlalchemy import inspect + my_alias = aliased(MyClass) + insp = inspect(my_alias) + + The resulting inspection object is an instance of :class:`.AliasedInsp`. + + + .. seealso:: + + :func:`.aliased` + + :func:`.with_polymorphic` + + :ref:`relationship_aliased_class` + + :ref:`relationship_to_window_function` + + + """ + + __name__: str + + def __init__( + self, + mapped_class_or_ac: _EntityType[_O], + alias: Optional[FromClause] = None, + name: Optional[str] = None, + flat: bool = False, + adapt_on_names: bool = False, + with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None, + with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None, + base_alias: Optional[AliasedInsp[Any]] = None, + use_mapper_path: bool = False, + represents_outer_join: bool = False, + ): + insp = cast( + "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac) + ) + mapper = insp.mapper + + nest_adapters = False + + if alias is None: + if insp.is_aliased_class and insp.selectable._is_subquery: + alias = insp.selectable.alias() + else: + alias = ( + mapper._with_polymorphic_selectable._anonymous_fromclause( + name=name, + flat=flat, + ) + ) + elif insp.is_aliased_class: + nest_adapters = True + + assert alias is not None + self._aliased_insp = AliasedInsp( + self, + insp, + alias, + name, + ( + with_polymorphic_mappers + if with_polymorphic_mappers + else mapper.with_polymorphic_mappers + ), + ( + with_polymorphic_discriminator + if with_polymorphic_discriminator is not None + else mapper.polymorphic_on + ), + base_alias, + use_mapper_path, + adapt_on_names, + represents_outer_join, + nest_adapters, + ) + + self.__name__ = f"aliased({mapper.class_.__name__})" + + @classmethod + def _reconstitute_from_aliased_insp( + cls, aliased_insp: AliasedInsp[_O] + ) -> AliasedClass[_O]: + obj = cls.__new__(cls) + obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})" + obj._aliased_insp = aliased_insp + + if aliased_insp._is_with_polymorphic: + for sub_aliased_insp in aliased_insp._with_polymorphic_entities: + if sub_aliased_insp is not aliased_insp: + ent = AliasedClass._reconstitute_from_aliased_insp( + sub_aliased_insp + ) + setattr(obj, sub_aliased_insp.class_.__name__, ent) + + return obj + + def __getattr__(self, key: str) -> Any: + try: + _aliased_insp = self.__dict__["_aliased_insp"] + except KeyError: + raise AttributeError() + else: + target = _aliased_insp._target + # maintain all getattr mechanics + attr = getattr(target, key) + + # attribute is a method, that will be invoked against a + # "self"; so just return a new method with the same function and + # new self + if hasattr(attr, "__call__") and hasattr(attr, "__self__"): + return types.MethodType(attr.__func__, self) + + # attribute is a descriptor, that will be invoked against a + # "self"; so invoke the descriptor against this self + if hasattr(attr, "__get__"): + attr = attr.__get__(None, self) + + # attributes within the QueryableAttribute system will want this + # to be invoked so the object can be adapted + if hasattr(attr, "adapt_to_entity"): + attr = attr.adapt_to_entity(_aliased_insp) + setattr(self, key, attr) + + return attr + + def _get_from_serialized( + self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O] + ) -> Any: + # this method is only used in terms of the + # sqlalchemy.ext.serializer extension + attr = getattr(mapped_class, key) + if hasattr(attr, "__call__") and hasattr(attr, "__self__"): + return types.MethodType(attr.__func__, self) + + # attribute is a descriptor, that will be invoked against a + # "self"; so invoke the descriptor against this self + if hasattr(attr, "__get__"): + attr = attr.__get__(None, self) + + # attributes within the QueryableAttribute system will want this + # to be invoked so the object can be adapted + if hasattr(attr, "adapt_to_entity"): + aliased_insp._weak_entity = weakref.ref(self) + attr = attr.adapt_to_entity(aliased_insp) + setattr(self, key, attr) + + return attr + + def __repr__(self) -> str: + return "" % ( + id(self), + self._aliased_insp._target.__name__, + ) + + def __str__(self) -> str: + return str(self._aliased_insp) + + +@inspection._self_inspects +class AliasedInsp( + ORMEntityColumnsClauseRole[_O], + ORMFromClauseRole, + HasCacheKey, + InspectionAttr, + MemoizedSlots, + inspection.Inspectable["AliasedInsp[_O]"], + Generic[_O], +): + """Provide an inspection interface for an + :class:`.AliasedClass` object. + + The :class:`.AliasedInsp` object is returned + given an :class:`.AliasedClass` using the + :func:`_sa.inspect` function:: + + from sqlalchemy import inspect + from sqlalchemy.orm import aliased + + my_alias = aliased(MyMappedClass) + insp = inspect(my_alias) + + Attributes on :class:`.AliasedInsp` + include: + + * ``entity`` - the :class:`.AliasedClass` represented. + * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class. + * ``selectable`` - the :class:`_expression.Alias` + construct which ultimately + represents an aliased :class:`_schema.Table` or + :class:`_expression.Select` + construct. + * ``name`` - the name of the alias. Also is used as the attribute + name when returned in a result tuple from :class:`_query.Query`. + * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper` + objects + indicating all those mappers expressed in the select construct + for the :class:`.AliasedClass`. + * ``polymorphic_on`` - an alternate column or SQL expression which + will be used as the "discriminator" for a polymorphic load. + + .. seealso:: + + :ref:`inspection_toplevel` + + """ + + __slots__ = ( + "__weakref__", + "_weak_entity", + "mapper", + "selectable", + "name", + "_adapt_on_names", + "with_polymorphic_mappers", + "polymorphic_on", + "_use_mapper_path", + "_base_alias", + "represents_outer_join", + "persist_selectable", + "local_table", + "_is_with_polymorphic", + "_with_polymorphic_entities", + "_adapter", + "_target", + "__clause_element__", + "_memoized_values", + "_all_column_expressions", + "_nest_adapters", + ) + + _cache_key_traversal = [ + ("name", visitors.ExtendedInternalTraversal.dp_string), + ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean), + ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean), + ("_target", visitors.ExtendedInternalTraversal.dp_inspectable), + ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement), + ( + "with_polymorphic_mappers", + visitors.InternalTraversal.dp_has_cache_key_list, + ), + ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement), + ] + + mapper: Mapper[_O] + selectable: FromClause + _adapter: ORMAdapter + with_polymorphic_mappers: Sequence[Mapper[Any]] + _with_polymorphic_entities: Sequence[AliasedInsp[Any]] + + _weak_entity: weakref.ref[AliasedClass[_O]] + """the AliasedClass that refers to this AliasedInsp""" + + _target: Union[Type[_O], AliasedClass[_O]] + """the thing referenced by the AliasedClass/AliasedInsp. + + In the vast majority of cases, this is the mapped class. However + it may also be another AliasedClass (alias of alias). + + """ + + def __init__( + self, + entity: AliasedClass[_O], + inspected: _InternalEntityType[_O], + selectable: FromClause, + name: Optional[str], + with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]], + polymorphic_on: Optional[ColumnElement[Any]], + _base_alias: Optional[AliasedInsp[Any]], + _use_mapper_path: bool, + adapt_on_names: bool, + represents_outer_join: bool, + nest_adapters: bool, + ): + mapped_class_or_ac = inspected.entity + mapper = inspected.mapper + + self._weak_entity = weakref.ref(entity) + self.mapper = mapper + self.selectable = self.persist_selectable = self.local_table = ( + selectable + ) + self.name = name + self.polymorphic_on = polymorphic_on + self._base_alias = weakref.ref(_base_alias or self) + self._use_mapper_path = _use_mapper_path + self.represents_outer_join = represents_outer_join + self._nest_adapters = nest_adapters + + if with_polymorphic_mappers: + self._is_with_polymorphic = True + self.with_polymorphic_mappers = with_polymorphic_mappers + self._with_polymorphic_entities = [] + for poly in self.with_polymorphic_mappers: + if poly is not mapper: + ent = AliasedClass( + poly.class_, + selectable, + base_alias=self, + adapt_on_names=adapt_on_names, + use_mapper_path=_use_mapper_path, + ) + + setattr(self.entity, poly.class_.__name__, ent) + self._with_polymorphic_entities.append(ent._aliased_insp) + + else: + self._is_with_polymorphic = False + self.with_polymorphic_mappers = [mapper] + + self._adapter = ORMAdapter( + _TraceAdaptRole.ALIASED_INSP, + mapper, + selectable=selectable, + equivalents=mapper._equivalent_columns, + adapt_on_names=adapt_on_names, + anonymize_labels=True, + # make sure the adapter doesn't try to grab other tables that + # are not even the thing we are mapping, such as embedded + # selectables in subqueries or CTEs. See issue #6060 + adapt_from_selectables={ + m.selectable + for m in self.with_polymorphic_mappers + if not adapt_on_names + }, + limit_on_entity=False, + ) + + if nest_adapters: + # supports "aliased class of aliased class" use case + assert isinstance(inspected, AliasedInsp) + self._adapter = inspected._adapter.wrap(self._adapter) + + self._adapt_on_names = adapt_on_names + self._target = mapped_class_or_ac + + @classmethod + def _alias_factory( + cls, + element: Union[_EntityType[_O], FromClause], + alias: Optional[FromClause] = None, + name: Optional[str] = None, + flat: bool = False, + adapt_on_names: bool = False, + ) -> Union[AliasedClass[_O], FromClause]: + if isinstance(element, FromClause): + if adapt_on_names: + raise sa_exc.ArgumentError( + "adapt_on_names only applies to ORM elements" + ) + if name: + return element.alias(name=name, flat=flat) + else: + return coercions.expect( + roles.AnonymizedFromClauseRole, element, flat=flat + ) + else: + return AliasedClass( + element, + alias=alias, + flat=flat, + name=name, + adapt_on_names=adapt_on_names, + ) + + @classmethod + def _with_polymorphic_factory( + cls, + base: Union[Type[_O], Mapper[_O]], + classes: Union[Literal["*"], Iterable[_EntityType[Any]]], + selectable: Union[Literal[False, None], FromClause] = False, + flat: bool = False, + polymorphic_on: Optional[ColumnElement[Any]] = None, + aliased: bool = False, + innerjoin: bool = False, + adapt_on_names: bool = False, + _use_mapper_path: bool = False, + ) -> AliasedClass[_O]: + primary_mapper = _class_to_mapper(base) + + if selectable not in (None, False) and flat: + raise sa_exc.ArgumentError( + "the 'flat' and 'selectable' arguments cannot be passed " + "simultaneously to with_polymorphic()" + ) + + mappers, selectable = primary_mapper._with_polymorphic_args( + classes, selectable, innerjoin=innerjoin + ) + if aliased or flat: + assert selectable is not None + selectable = selectable._anonymous_fromclause(flat=flat) + + return AliasedClass( + base, + selectable, + with_polymorphic_mappers=mappers, + adapt_on_names=adapt_on_names, + with_polymorphic_discriminator=polymorphic_on, + use_mapper_path=_use_mapper_path, + represents_outer_join=not innerjoin, + ) + + @property + def entity(self) -> AliasedClass[_O]: + # to eliminate reference cycles, the AliasedClass is held weakly. + # this produces some situations where the AliasedClass gets lost, + # particularly when one is created internally and only the AliasedInsp + # is passed around. + # to work around this case, we just generate a new one when we need + # it, as it is a simple class with very little initial state on it. + ent = self._weak_entity() + if ent is None: + ent = AliasedClass._reconstitute_from_aliased_insp(self) + self._weak_entity = weakref.ref(ent) + return ent + + is_aliased_class = True + "always returns True" + + def _memoized_method___clause_element__(self) -> FromClause: + return self.selectable._annotate( + { + "parentmapper": self.mapper, + "parententity": self, + "entity_namespace": self, + } + )._set_propagate_attrs( + {"compile_state_plugin": "orm", "plugin_subject": self} + ) + + @property + def entity_namespace(self) -> AliasedClass[_O]: + return self.entity + + @property + def class_(self) -> Type[_O]: + """Return the mapped class ultimately represented by this + :class:`.AliasedInsp`.""" + return self.mapper.class_ + + @property + def _path_registry(self) -> AbstractEntityRegistry: + if self._use_mapper_path: + return self.mapper._path_registry + else: + return PathRegistry.per_mapper(self) + + def __getstate__(self) -> Dict[str, Any]: + return { + "entity": self.entity, + "mapper": self.mapper, + "alias": self.selectable, + "name": self.name, + "adapt_on_names": self._adapt_on_names, + "with_polymorphic_mappers": self.with_polymorphic_mappers, + "with_polymorphic_discriminator": self.polymorphic_on, + "base_alias": self._base_alias(), + "use_mapper_path": self._use_mapper_path, + "represents_outer_join": self.represents_outer_join, + "nest_adapters": self._nest_adapters, + } + + def __setstate__(self, state: Dict[str, Any]) -> None: + self.__init__( # type: ignore + state["entity"], + state["mapper"], + state["alias"], + state["name"], + state["with_polymorphic_mappers"], + state["with_polymorphic_discriminator"], + state["base_alias"], + state["use_mapper_path"], + state["adapt_on_names"], + state["represents_outer_join"], + state["nest_adapters"], + ) + + def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]: + # assert self._is_with_polymorphic + # assert other._is_with_polymorphic + + primary_mapper = other.mapper + + assert self.mapper is primary_mapper + + our_classes = util.to_set( + mp.class_ for mp in self.with_polymorphic_mappers + ) + new_classes = {mp.class_ for mp in other.with_polymorphic_mappers} + if our_classes == new_classes: + return other + else: + classes = our_classes.union(new_classes) + + mappers, selectable = primary_mapper._with_polymorphic_args( + classes, None, innerjoin=not other.represents_outer_join + ) + selectable = selectable._anonymous_fromclause(flat=True) + return AliasedClass( + primary_mapper, + selectable, + with_polymorphic_mappers=mappers, + with_polymorphic_discriminator=other.polymorphic_on, + use_mapper_path=other._use_mapper_path, + represents_outer_join=other.represents_outer_join, + )._aliased_insp + + def _adapt_element( + self, expr: _ORMCOLEXPR, key: Optional[str] = None + ) -> _ORMCOLEXPR: + assert isinstance(expr, ColumnElement) + d: Dict[str, Any] = { + "parententity": self, + "parentmapper": self.mapper, + } + if key: + d["proxy_key"] = key + + # IMO mypy should see this one also as returning the same type + # we put into it, but it's not + return ( + self._adapter.traverse(expr) + ._annotate(d) + ._set_propagate_attrs( + {"compile_state_plugin": "orm", "plugin_subject": self} + ) + ) + + if TYPE_CHECKING: + # establish compatibility with the _ORMAdapterProto protocol, + # which in turn is compatible with _CoreAdapterProto. + + def _orm_adapt_element( + self, + obj: _CE, + key: Optional[str] = None, + ) -> _CE: ... + + else: + _orm_adapt_element = _adapt_element + + def _entity_for_mapper(self, mapper): + self_poly = self.with_polymorphic_mappers + if mapper in self_poly: + if mapper is self.mapper: + return self + else: + return getattr( + self.entity, mapper.class_.__name__ + )._aliased_insp + elif mapper.isa(self.mapper): + return self + else: + assert False, "mapper %s doesn't correspond to %s" % (mapper, self) + + def _memoized_attr__get_clause(self): + onclause, replacemap = self.mapper._get_clause + return ( + self._adapter.traverse(onclause), + { + self._adapter.traverse(col): param + for col, param in replacemap.items() + }, + ) + + def _memoized_attr__memoized_values(self): + return {} + + def _memoized_attr__all_column_expressions(self): + if self._is_with_polymorphic: + cols_plus_keys = self.mapper._columns_plus_keys( + [ent.mapper for ent in self._with_polymorphic_entities] + ) + else: + cols_plus_keys = self.mapper._columns_plus_keys() + + cols_plus_keys = [ + (key, self._adapt_element(col)) for key, col in cols_plus_keys + ] + + return ColumnCollection(cols_plus_keys) + + def _memo(self, key, callable_, *args, **kw): + if key in self._memoized_values: + return self._memoized_values[key] + else: + self._memoized_values[key] = value = callable_(*args, **kw) + return value + + def __repr__(self): + if self.with_polymorphic_mappers: + with_poly = "(%s)" % ", ".join( + mp.class_.__name__ for mp in self.with_polymorphic_mappers + ) + else: + with_poly = "" + return "" % ( + id(self), + self.class_.__name__, + with_poly, + ) + + def __str__(self): + if self._is_with_polymorphic: + return "with_polymorphic(%s, [%s])" % ( + self._target.__name__, + ", ".join( + mp.class_.__name__ + for mp in self.with_polymorphic_mappers + if mp is not self.mapper + ), + ) + else: + return "aliased(%s)" % (self._target.__name__,) + + +class _WrapUserEntity: + """A wrapper used within the loader_criteria lambda caller so that + we can bypass declared_attr descriptors on unmapped mixins, which + normally emit a warning for such use. + + might also be useful for other per-lambda instrumentations should + the need arise. + + """ + + __slots__ = ("subject",) + + def __init__(self, subject): + self.subject = subject + + @util.preload_module("sqlalchemy.orm.decl_api") + def __getattribute__(self, name): + decl_api = util.preloaded.orm.decl_api + + subject = object.__getattribute__(self, "subject") + if name in subject.__dict__ and isinstance( + subject.__dict__[name], decl_api.declared_attr + ): + return subject.__dict__[name].fget(subject) + else: + return getattr(subject, name) + + +class LoaderCriteriaOption(CriteriaOption): + """Add additional WHERE criteria to the load for all occurrences of + a particular entity. + + :class:`_orm.LoaderCriteriaOption` is invoked using the + :func:`_orm.with_loader_criteria` function; see that function for + details. + + .. versionadded:: 1.4 + + """ + + __slots__ = ( + "root_entity", + "entity", + "deferred_where_criteria", + "where_criteria", + "_where_crit_orig", + "include_aliases", + "propagate_to_loaders", + ) + + _traverse_internals = [ + ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj), + ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key), + ("where_criteria", visitors.InternalTraversal.dp_clauseelement), + ("include_aliases", visitors.InternalTraversal.dp_boolean), + ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean), + ] + + root_entity: Optional[Type[Any]] + entity: Optional[_InternalEntityType[Any]] + where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement] + deferred_where_criteria: bool + include_aliases: bool + propagate_to_loaders: bool + + _where_crit_orig: Any + + def __init__( + self, + entity_or_base: _EntityType[Any], + where_criteria: Union[ + _ColumnExpressionArgument[bool], + Callable[[Any], _ColumnExpressionArgument[bool]], + ], + loader_only: bool = False, + include_aliases: bool = False, + propagate_to_loaders: bool = True, + track_closure_variables: bool = True, + ): + entity = cast( + "_InternalEntityType[Any]", + inspection.inspect(entity_or_base, False), + ) + if entity is None: + self.root_entity = cast("Type[Any]", entity_or_base) + self.entity = None + else: + self.root_entity = None + self.entity = entity + + self._where_crit_orig = where_criteria + if callable(where_criteria): + if self.root_entity is not None: + wrap_entity = self.root_entity + else: + assert entity is not None + wrap_entity = entity.entity + + self.deferred_where_criteria = True + self.where_criteria = lambdas.DeferredLambdaElement( + where_criteria, + roles.WhereHavingRole, + lambda_args=(_WrapUserEntity(wrap_entity),), + opts=lambdas.LambdaOptions( + track_closure_variables=track_closure_variables + ), + ) + else: + self.deferred_where_criteria = False + self.where_criteria = coercions.expect( + roles.WhereHavingRole, where_criteria + ) + + self.include_aliases = include_aliases + self.propagate_to_loaders = propagate_to_loaders + + @classmethod + def _unreduce( + cls, entity, where_criteria, include_aliases, propagate_to_loaders + ): + return LoaderCriteriaOption( + entity, + where_criteria, + include_aliases=include_aliases, + propagate_to_loaders=propagate_to_loaders, + ) + + def __reduce__(self): + return ( + LoaderCriteriaOption._unreduce, + ( + self.entity.class_ if self.entity else self.root_entity, + self._where_crit_orig, + self.include_aliases, + self.propagate_to_loaders, + ), + ) + + def _all_mappers(self) -> Iterator[Mapper[Any]]: + if self.entity: + yield from self.entity.mapper.self_and_descendants + else: + assert self.root_entity + stack = list(self.root_entity.__subclasses__()) + while stack: + subclass = stack.pop(0) + ent = cast( + "_InternalEntityType[Any]", + inspection.inspect(subclass, raiseerr=False), + ) + if ent: + yield from ent.mapper.self_and_descendants + else: + stack.extend(subclass.__subclasses__()) + + def _should_include(self, compile_state: ORMCompileState) -> bool: + if ( + compile_state.select_statement._annotations.get( + "for_loader_criteria", None + ) + is self + ): + return False + return True + + def _resolve_where_criteria( + self, ext_info: _InternalEntityType[Any] + ) -> ColumnElement[bool]: + if self.deferred_where_criteria: + crit = cast( + "ColumnElement[bool]", + self.where_criteria._resolve_with_args(ext_info.entity), + ) + else: + crit = self.where_criteria # type: ignore + assert isinstance(crit, ColumnElement) + return sql_util._deep_annotate( + crit, + {"for_loader_criteria": self}, + detect_subquery_cols=True, + ind_cols_on_fromclause=True, + ) + + def process_compile_state_replaced_entities( + self, + compile_state: ORMCompileState, + mapper_entities: Iterable[_MapperEntity], + ) -> None: + self.process_compile_state(compile_state) + + def process_compile_state(self, compile_state: ORMCompileState) -> None: + """Apply a modification to a given :class:`.CompileState`.""" + + # if options to limit the criteria to immediate query only, + # use compile_state.attributes instead + + self.get_global_criteria(compile_state.global_attributes) + + def get_global_criteria(self, attributes: Dict[Any, Any]) -> None: + for mp in self._all_mappers(): + load_criteria = attributes.setdefault( + ("additional_entity_criteria", mp), [] + ) + + load_criteria.append(self) + + +inspection._inspects(AliasedClass)(lambda target: target._aliased_insp) + + +@inspection._inspects(type) +def _inspect_mc( + class_: Type[_O], +) -> Optional[Mapper[_O]]: + try: + class_manager = opt_manager_of_class(class_) + if class_manager is None or not class_manager.is_mapped: + return None + mapper = class_manager.mapper + except exc.NO_STATE: + return None + else: + return mapper + + +GenericAlias = type(List[Any]) + + +@inspection._inspects(GenericAlias) +def _inspect_generic_alias( + class_: Type[_O], +) -> Optional[Mapper[_O]]: + origin = cast("Type[_O]", typing_get_origin(class_)) + return _inspect_mc(origin) + + +@inspection._self_inspects +class Bundle( + ORMColumnsClauseRole[_T], + SupportsCloneAnnotations, + MemoizedHasCacheKey, + inspection.Inspectable["Bundle[_T]"], + InspectionAttr, +): + """A grouping of SQL expressions that are returned by a :class:`.Query` + under one namespace. + + The :class:`.Bundle` essentially allows nesting of the tuple-based + results returned by a column-oriented :class:`_query.Query` object. + It also + is extensible via simple subclassing, where the primary capability + to override is that of how the set of expressions should be returned, + allowing post-processing as well as custom return types, without + involving ORM identity-mapped classes. + + .. seealso:: + + :ref:`bundles` + + + """ + + single_entity = False + """If True, queries for a single Bundle will be returned as a single + entity, rather than an element within a keyed tuple.""" + + is_clause_element = False + + is_mapper = False + + is_aliased_class = False + + is_bundle = True + + _propagate_attrs: _PropagateAttrsType = util.immutabledict() + + proxy_set = util.EMPTY_SET # type: ignore + + exprs: List[_ColumnsClauseElement] + + def __init__( + self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any + ): + r"""Construct a new :class:`.Bundle`. + + e.g.:: + + bn = Bundle("mybundle", MyClass.x, MyClass.y) + + for row in session.query(bn).filter( + bn.c.x == 5).filter(bn.c.y == 4): + print(row.mybundle.x, row.mybundle.y) + + :param name: name of the bundle. + :param \*exprs: columns or SQL expressions comprising the bundle. + :param single_entity=False: if True, rows for this :class:`.Bundle` + can be returned as a "single entity" outside of any enclosing tuple + in the same manner as a mapped entity. + + """ + self.name = self._label = name + coerced_exprs = [ + coercions.expect( + roles.ColumnsClauseRole, expr, apply_propagate_attrs=self + ) + for expr in exprs + ] + self.exprs = coerced_exprs + + self.c = self.columns = ColumnCollection( + (getattr(col, "key", col._label), col) + for col in [e._annotations.get("bundle", e) for e in coerced_exprs] + ).as_readonly() + self.single_entity = kw.pop("single_entity", self.single_entity) + + def _gen_cache_key( + self, anon_map: anon_map, bindparams: List[BindParameter[Any]] + ) -> Tuple[Any, ...]: + return (self.__class__, self.name, self.single_entity) + tuple( + [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs] + ) + + @property + def mapper(self) -> Optional[Mapper[Any]]: + mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get( + "parentmapper", None + ) + return mp + + @property + def entity(self) -> Optional[_InternalEntityType[Any]]: + ie: Optional[_InternalEntityType[Any]] = self.exprs[ + 0 + ]._annotations.get("parententity", None) + return ie + + @property + def entity_namespace( + self, + ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]: + return self.c + + columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]] + + """A namespace of SQL expressions referred to by this :class:`.Bundle`. + + e.g.:: + + bn = Bundle("mybundle", MyClass.x, MyClass.y) + + q = sess.query(bn).filter(bn.c.x == 5) + + Nesting of bundles is also supported:: + + b1 = Bundle("b1", + Bundle('b2', MyClass.a, MyClass.b), + Bundle('b3', MyClass.x, MyClass.y) + ) + + q = sess.query(b1).filter( + b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9) + + .. seealso:: + + :attr:`.Bundle.c` + + """ + + c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]] + """An alias for :attr:`.Bundle.columns`.""" + + def _clone(self): + cloned = self.__class__.__new__(self.__class__) + cloned.__dict__.update(self.__dict__) + return cloned + + def __clause_element__(self): + # ensure existing entity_namespace remains + annotations = {"bundle": self, "entity_namespace": self} + annotations.update(self._annotations) + + plugin_subject = self.exprs[0]._propagate_attrs.get( + "plugin_subject", self.entity + ) + return ( + expression.ClauseList( + _literal_as_text_role=roles.ColumnsClauseRole, + group=False, + *[e._annotations.get("bundle", e) for e in self.exprs], + ) + ._annotate(annotations) + ._set_propagate_attrs( + # the Bundle *must* use the orm plugin no matter what. the + # subject can be None but it's much better if it's not. + { + "compile_state_plugin": "orm", + "plugin_subject": plugin_subject, + } + ) + ) + + @property + def clauses(self): + return self.__clause_element__().clauses + + def label(self, name): + """Provide a copy of this :class:`.Bundle` passing a new label.""" + + cloned = self._clone() + cloned.name = name + return cloned + + def create_row_processor( + self, + query: Select[Any], + procs: Sequence[Callable[[Row[Any]], Any]], + labels: Sequence[str], + ) -> Callable[[Row[Any]], Any]: + """Produce the "row processing" function for this :class:`.Bundle`. + + May be overridden by subclasses to provide custom behaviors when + results are fetched. The method is passed the statement object and a + set of "row processor" functions at query execution time; these + processor functions when given a result row will return the individual + attribute value, which can then be adapted into any kind of return data + structure. + + The example below illustrates replacing the usual :class:`.Row` + return structure with a straight Python dictionary:: + + from sqlalchemy.orm import Bundle + + class DictBundle(Bundle): + def create_row_processor(self, query, procs, labels): + 'Override create_row_processor to return values as + dictionaries' + + def proc(row): + return dict( + zip(labels, (proc(row) for proc in procs)) + ) + return proc + + A result from the above :class:`_orm.Bundle` will return dictionary + values:: + + bn = DictBundle('mybundle', MyClass.data1, MyClass.data2) + for row in session.execute(select(bn)).where(bn.c.data1 == 'd1'): + print(row.mybundle['data1'], row.mybundle['data2']) + + """ + keyed_tuple = result_tuple(labels, [() for l in labels]) + + def proc(row: Row[Any]) -> Any: + return keyed_tuple([proc(row) for proc in procs]) + + return proc + + +def _orm_annotate(element: _SA, exclude: Optional[Any] = None) -> _SA: + """Deep copy the given ClauseElement, annotating each element with the + "_orm_adapt" flag. + + Elements within the exclude collection will be cloned but not annotated. + + """ + return sql_util._deep_annotate(element, {"_orm_adapt": True}, exclude) + + +def _orm_deannotate(element: _SA) -> _SA: + """Remove annotations that link a column to a particular mapping. + + Note this doesn't affect "remote" and "foreign" annotations + passed by the :func:`_orm.foreign` and :func:`_orm.remote` + annotators. + + """ + + return sql_util._deep_deannotate( + element, values=("_orm_adapt", "parententity") + ) + + +def _orm_full_deannotate(element: _SA) -> _SA: + return sql_util._deep_deannotate(element) + + +class _ORMJoin(expression.Join): + """Extend Join to support ORM constructs as input.""" + + __visit_name__ = expression.Join.__visit_name__ + + inherit_cache = True + + def __init__( + self, + left: _FromClauseArgument, + right: _FromClauseArgument, + onclause: Optional[_OnClauseArgument] = None, + isouter: bool = False, + full: bool = False, + _left_memo: Optional[Any] = None, + _right_memo: Optional[Any] = None, + _extra_criteria: Tuple[ColumnElement[bool], ...] = (), + ): + left_info = cast( + "Union[FromClause, _InternalEntityType[Any]]", + inspection.inspect(left), + ) + + right_info = cast( + "Union[FromClause, _InternalEntityType[Any]]", + inspection.inspect(right), + ) + adapt_to = right_info.selectable + + # used by joined eager loader + self._left_memo = _left_memo + self._right_memo = _right_memo + + if isinstance(onclause, attributes.QueryableAttribute): + if TYPE_CHECKING: + assert isinstance( + onclause.comparator, RelationshipProperty.Comparator + ) + on_selectable = onclause.comparator._source_selectable() + prop = onclause.property + _extra_criteria += onclause._extra_criteria + elif isinstance(onclause, MapperProperty): + # used internally by joined eager loader...possibly not ideal + prop = onclause + on_selectable = prop.parent.selectable + else: + prop = None + on_selectable = None + + left_selectable = left_info.selectable + if prop: + adapt_from: Optional[FromClause] + if sql_util.clause_is_present(on_selectable, left_selectable): + adapt_from = on_selectable + else: + assert isinstance(left_selectable, FromClause) + adapt_from = left_selectable + + ( + pj, + sj, + source, + dest, + secondary, + target_adapter, + ) = prop._create_joins( + source_selectable=adapt_from, + dest_selectable=adapt_to, + source_polymorphic=True, + of_type_entity=right_info, + alias_secondary=True, + extra_criteria=_extra_criteria, + ) + + if sj is not None: + if isouter: + # note this is an inner join from secondary->right + right = sql.join(secondary, right, sj) + onclause = pj + else: + left = sql.join(left, secondary, pj, isouter) + onclause = sj + else: + onclause = pj + + self._target_adapter = target_adapter + + # we don't use the normal coercions logic for _ORMJoin + # (probably should), so do some gymnastics to get the entity. + # logic here is for #8721, which was a major bug in 1.4 + # for almost two years, not reported/fixed until 1.4.43 (!) + if is_selectable(left_info): + parententity = left_selectable._annotations.get( + "parententity", None + ) + elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info): + parententity = left_info + else: + parententity = None + + if parententity is not None: + self._annotations = self._annotations.union( + {"parententity": parententity} + ) + + augment_onclause = bool(_extra_criteria) and not prop + expression.Join.__init__(self, left, right, onclause, isouter, full) + + assert self.onclause is not None + + if augment_onclause: + self.onclause &= sql.and_(*_extra_criteria) + + if ( + not prop + and getattr(right_info, "mapper", None) + and right_info.mapper.single # type: ignore + ): + right_info = cast("_InternalEntityType[Any]", right_info) + # if single inheritance target and we are using a manual + # or implicit ON clause, augment it the same way we'd augment the + # WHERE. + single_crit = right_info.mapper._single_table_criterion + if single_crit is not None: + if insp_is_aliased_class(right_info): + single_crit = right_info._adapter.traverse(single_crit) + self.onclause = self.onclause & single_crit + + def _splice_into_center(self, other): + """Splice a join into the center. + + Given join(a, b) and join(b, c), return join(a, b).join(c) + + """ + leftmost = other + while isinstance(leftmost, sql.Join): + leftmost = leftmost.left + + assert self.right is leftmost + + left = _ORMJoin( + self.left, + other.left, + self.onclause, + isouter=self.isouter, + _left_memo=self._left_memo, + _right_memo=other._left_memo, + ) + + return _ORMJoin( + left, + other.right, + other.onclause, + isouter=other.isouter, + _right_memo=other._right_memo, + ) + + def join( + self, + right: _FromClauseArgument, + onclause: Optional[_OnClauseArgument] = None, + isouter: bool = False, + full: bool = False, + ) -> _ORMJoin: + return _ORMJoin(self, right, onclause, full=full, isouter=isouter) + + def outerjoin( + self, + right: _FromClauseArgument, + onclause: Optional[_OnClauseArgument] = None, + full: bool = False, + ) -> _ORMJoin: + return _ORMJoin(self, right, onclause, isouter=True, full=full) + + +def with_parent( + instance: object, + prop: attributes.QueryableAttribute[Any], + from_entity: Optional[_EntityType[Any]] = None, +) -> ColumnElement[bool]: + """Create filtering criterion that relates this query's primary entity + to the given related instance, using established + :func:`_orm.relationship()` + configuration. + + E.g.:: + + stmt = select(Address).where(with_parent(some_user, User.addresses)) + + + The SQL rendered is the same as that rendered when a lazy loader + would fire off from the given parent on that attribute, meaning + that the appropriate state is taken from the parent object in + Python without the need to render joins to the parent table + in the rendered statement. + + The given property may also make use of :meth:`_orm.PropComparator.of_type` + to indicate the left side of the criteria:: + + + a1 = aliased(Address) + a2 = aliased(Address) + stmt = select(a1, a2).where( + with_parent(u1, User.addresses.of_type(a2)) + ) + + The above use is equivalent to using the + :func:`_orm.with_parent.from_entity` argument:: + + a1 = aliased(Address) + a2 = aliased(Address) + stmt = select(a1, a2).where( + with_parent(u1, User.addresses, from_entity=a2) + ) + + :param instance: + An instance which has some :func:`_orm.relationship`. + + :param property: + Class-bound attribute, which indicates + what relationship from the instance should be used to reconcile the + parent/child relationship. + + :param from_entity: + Entity in which to consider as the left side. This defaults to the + "zero" entity of the :class:`_query.Query` itself. + + .. versionadded:: 1.2 + + """ + prop_t: RelationshipProperty[Any] + + if isinstance(prop, str): + raise sa_exc.ArgumentError( + "with_parent() accepts class-bound mapped attributes, not strings" + ) + elif isinstance(prop, attributes.QueryableAttribute): + if prop._of_type: + from_entity = prop._of_type + mapper_property = prop.property + if mapper_property is None or not prop_is_relationship( + mapper_property + ): + raise sa_exc.ArgumentError( + f"Expected relationship property for with_parent(), " + f"got {mapper_property}" + ) + prop_t = mapper_property + else: + prop_t = prop + + return prop_t._with_parent(instance, from_entity=from_entity) + + +def has_identity(object_: object) -> bool: + """Return True if the given object has a database + identity. + + This typically corresponds to the object being + in either the persistent or detached state. + + .. seealso:: + + :func:`.was_deleted` + + """ + state = attributes.instance_state(object_) + return state.has_identity + + +def was_deleted(object_: object) -> bool: + """Return True if the given object was deleted + within a session flush. + + This is regardless of whether or not the object is + persistent or detached. + + .. seealso:: + + :attr:`.InstanceState.was_deleted` + + """ + + state = attributes.instance_state(object_) + return state.was_deleted + + +def _entity_corresponds_to( + given: _InternalEntityType[Any], entity: _InternalEntityType[Any] +) -> bool: + """determine if 'given' corresponds to 'entity', in terms + of an entity passed to Query that would match the same entity + being referred to elsewhere in the query. + + """ + if insp_is_aliased_class(entity): + if insp_is_aliased_class(given): + if entity._base_alias() is given._base_alias(): + return True + return False + elif insp_is_aliased_class(given): + if given._use_mapper_path: + return entity in given.with_polymorphic_mappers + else: + return entity is given + + assert insp_is_mapper(given) + return entity.common_parent(given) + + +def _entity_corresponds_to_use_path_impl( + given: _InternalEntityType[Any], entity: _InternalEntityType[Any] +) -> bool: + """determine if 'given' corresponds to 'entity', in terms + of a path of loader options where a mapped attribute is taken to + be a member of a parent entity. + + e.g.:: + + someoption(A).someoption(A.b) # -> fn(A, A) -> True + someoption(A).someoption(C.d) # -> fn(A, C) -> False + + a1 = aliased(A) + someoption(a1).someoption(A.b) # -> fn(a1, A) -> False + someoption(a1).someoption(a1.b) # -> fn(a1, a1) -> True + + wp = with_polymorphic(A, [A1, A2]) + someoption(wp).someoption(A1.foo) # -> fn(wp, A1) -> False + someoption(wp).someoption(wp.A1.foo) # -> fn(wp, wp.A1) -> True + + + """ + if insp_is_aliased_class(given): + return ( + insp_is_aliased_class(entity) + and not entity._use_mapper_path + and (given is entity or entity in given._with_polymorphic_entities) + ) + elif not insp_is_aliased_class(entity): + return given.isa(entity.mapper) + else: + return ( + entity._use_mapper_path + and given in entity.with_polymorphic_mappers + ) + + +def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool: + """determine if 'given' "is a" mapper, in terms of the given + would load rows of type 'mapper'. + + """ + if given.is_aliased_class: + return mapper in given.with_polymorphic_mappers or given.mapper.isa( + mapper + ) + elif given.with_polymorphic_mappers: + return mapper in given.with_polymorphic_mappers + else: + return given.isa(mapper) + + +def _getitem(iterable_query: Query[Any], item: Any) -> Any: + """calculate __getitem__ in terms of an iterable query object + that also has a slice() method. + + """ + + def _no_negative_indexes(): + raise IndexError( + "negative indexes are not accepted by SQL " + "index / slice operators" + ) + + if isinstance(item, slice): + start, stop, step = util.decode_slice(item) + + if ( + isinstance(stop, int) + and isinstance(start, int) + and stop - start <= 0 + ): + return [] + + elif (isinstance(start, int) and start < 0) or ( + isinstance(stop, int) and stop < 0 + ): + _no_negative_indexes() + + res = iterable_query.slice(start, stop) + if step is not None: + return list(res)[None : None : item.step] + else: + return list(res) + else: + if item == -1: + _no_negative_indexes() + else: + return list(iterable_query[item : item + 1])[0] + + +def _is_mapped_annotation( + raw_annotation: _AnnotationScanType, + cls: Type[Any], + originating_cls: Type[Any], +) -> bool: + try: + annotated = de_stringify_annotation( + cls, raw_annotation, originating_cls.__module__ + ) + except NameError: + # in most cases, at least within our own tests, we can raise + # here, which is more accurate as it prevents us from returning + # false negatives. However, in the real world, try to avoid getting + # involved with end-user annotations that have nothing to do with us. + # see issue #8888 where we bypass using this function in the case + # that we want to detect an unresolvable Mapped[] type. + return False + else: + return is_origin_of_cls(annotated, _MappedAnnotationBase) + + +class _CleanupError(Exception): + pass + + +def _cleanup_mapped_str_annotation( + annotation: str, originating_module: str +) -> str: + # fix up an annotation that comes in as the form: + # 'Mapped[List[Address]]' so that it instead looks like: + # 'Mapped[List["Address"]]' , which will allow us to get + # "Address" as a string + + # additionally, resolve symbols for these names since this is where + # we'd have to do it + + inner: Optional[Match[str]] + + mm = re.match(r"^(.+?)\[(.+)\]$", annotation) + + if not mm: + return annotation + + # ticket #8759. Resolve the Mapped name to a real symbol. + # originally this just checked the name. + try: + obj = eval_name_only(mm.group(1), originating_module) + except NameError as ne: + raise _CleanupError( + f'For annotation "{annotation}", could not resolve ' + f'container type "{mm.group(1)}". ' + "Please ensure this type is imported at the module level " + "outside of TYPE_CHECKING blocks" + ) from ne + + if obj is typing.ClassVar: + real_symbol = "ClassVar" + else: + try: + if issubclass(obj, _MappedAnnotationBase): + real_symbol = obj.__name__ + else: + return annotation + except TypeError: + # avoid isinstance(obj, type) check, just catch TypeError + return annotation + + # note: if one of the codepaths above didn't define real_symbol and + # then didn't return, real_symbol raises UnboundLocalError + # which is actually a NameError, and the calling routines don't + # notice this since they are catching NameError anyway. Just in case + # this is being modified in the future, something to be aware of. + + stack = [] + inner = mm + while True: + stack.append(real_symbol if mm is inner else inner.group(1)) + g2 = inner.group(2) + inner = re.match(r"^(.+?)\[(.+)\]$", g2) + if inner is None: + stack.append(g2) + break + + # stacks we want to rewrite, that is, quote the last entry which + # we think is a relationship class name: + # + # ['Mapped', 'List', 'Address'] + # ['Mapped', 'A'] + # + # stacks we dont want to rewrite, which are generally MappedColumn + # use cases: + # + # ['Mapped', "'Optional[Dict[str, str]]'"] + # ['Mapped', 'dict[str, str] | None'] + + if ( + # avoid already quoted symbols such as + # ['Mapped', "'Optional[Dict[str, str]]'"] + not re.match(r"""^["'].*["']$""", stack[-1]) + # avoid further generics like Dict[] such as + # ['Mapped', 'dict[str, str] | None'] + and not re.match(r".*\[.*\]", stack[-1]) + ): + stripchars = "\"' " + stack[-1] = ", ".join( + f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",") + ) + + annotation = "[".join(stack) + ("]" * (len(stack) - 1)) + + return annotation + + +def _extract_mapped_subtype( + raw_annotation: Optional[_AnnotationScanType], + cls: type, + originating_module: str, + key: str, + attr_cls: Type[Any], + required: bool, + is_dataclass_field: bool, + expect_mapped: bool = True, + raiseerr: bool = True, +) -> Optional[Tuple[Union[type, str], Optional[type]]]: + """given an annotation, figure out if it's ``Mapped[something]`` and if + so, return the ``something`` part. + + Includes error raise scenarios and other options. + + """ + + if raw_annotation is None: + if required: + raise sa_exc.ArgumentError( + f"Python typing annotation is required for attribute " + f'"{cls.__name__}.{key}" when primary argument(s) for ' + f'"{attr_cls.__name__}" construct are None or not present' + ) + return None + + try: + annotated = de_stringify_annotation( + cls, + raw_annotation, + originating_module, + str_cleanup_fn=_cleanup_mapped_str_annotation, + ) + except _CleanupError as ce: + raise sa_exc.ArgumentError( + f"Could not interpret annotation {raw_annotation}. " + "Check that it uses names that are correctly imported at the " + "module level. See chained stack trace for more hints." + ) from ce + except NameError as ne: + if raiseerr and "Mapped[" in raw_annotation: # type: ignore + raise sa_exc.ArgumentError( + f"Could not interpret annotation {raw_annotation}. " + "Check that it uses names that are correctly imported at the " + "module level. See chained stack trace for more hints." + ) from ne + + annotated = raw_annotation # type: ignore + + if is_dataclass_field: + return annotated, None + else: + if not hasattr(annotated, "__origin__") or not is_origin_of_cls( + annotated, _MappedAnnotationBase + ): + if expect_mapped: + if not raiseerr: + return None + + origin = getattr(annotated, "__origin__", None) + if origin is typing.ClassVar: + return None + + # check for other kind of ORM descriptor like AssociationProxy, + # don't raise for that (issue #9957) + elif isinstance(origin, type) and issubclass( + origin, ORMDescriptor + ): + return None + + raise sa_exc.ArgumentError( + f'Type annotation for "{cls.__name__}.{key}" ' + "can't be correctly interpreted for " + "Annotated Declarative Table form. ORM annotations " + "should normally make use of the ``Mapped[]`` generic " + "type, or other ORM-compatible generic type, as a " + "container for the actual type, which indicates the " + "intent that the attribute is mapped. " + "Class variables that are not intended to be mapped " + "by the ORM should use ClassVar[]. " + "To allow Annotated Declarative to disregard legacy " + "annotations which don't use Mapped[] to pass, set " + '"__allow_unmapped__ = True" on the class or a ' + "superclass this class.", + code="zlpr", + ) + + else: + return annotated, None + + if len(annotated.__args__) != 1: + raise sa_exc.ArgumentError( + "Expected sub-type for Mapped[] annotation" + ) + + return annotated.__args__[0], annotated.__origin__ + + +def _mapper_property_as_plain_name(prop: Type[Any]) -> str: + if hasattr(prop, "_mapper_property_name"): + name = prop._mapper_property_name() + else: + name = None + return util.clsname_as_plain_name(prop, name) -- cgit v1.2.3