From 12cf076118570eebbff08c6b3090e0d4798447a1 Mon Sep 17 00:00:00 2001 From: cyfraeviolae Date: Wed, 3 Apr 2024 03:17:55 -0400 Subject: no venv --- .../site-packages/sqlalchemy/orm/util.py | 2416 -------------------- 1 file changed, 2416 deletions(-) delete mode 100644 venv/lib/python3.11/site-packages/sqlalchemy/orm/util.py (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/orm/util.py') diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/orm/util.py b/venv/lib/python3.11/site-packages/sqlalchemy/orm/util.py deleted file mode 100644 index 8e153e6..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/orm/util.py +++ /dev/null @@ -1,2416 +0,0 @@ -# orm/util.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: allow-untyped-defs, allow-untyped-calls - -from __future__ import annotations - -import enum -import functools -import re -import types -import typing -from typing import AbstractSet -from typing import Any -from typing import Callable -from typing import cast -from typing import Dict -from typing import FrozenSet -from typing import Generic -from typing import Iterable -from typing import Iterator -from typing import List -from typing import Match -from typing import Optional -from typing import Sequence -from typing import Tuple -from typing import Type -from typing import TYPE_CHECKING -from typing import TypeVar -from typing import Union -import weakref - -from . import attributes # noqa -from . import exc -from ._typing import _O -from ._typing import insp_is_aliased_class -from ._typing import insp_is_mapper -from ._typing import prop_is_relationship -from .base import _class_to_mapper as _class_to_mapper -from .base import _MappedAnnotationBase -from .base import _never_set as _never_set # noqa: F401 -from .base import _none_set as _none_set # noqa: F401 -from .base import attribute_str as attribute_str # noqa: F401 -from .base import class_mapper as class_mapper -from .base import DynamicMapped -from .base import InspectionAttr as InspectionAttr -from .base import instance_str as instance_str # noqa: F401 -from .base import Mapped -from .base import object_mapper as object_mapper -from .base import object_state as object_state # noqa: F401 -from .base import opt_manager_of_class -from .base import ORMDescriptor -from .base import state_attribute_str as state_attribute_str # noqa: F401 -from .base import state_class_str as state_class_str # noqa: F401 -from .base import state_str as state_str # noqa: F401 -from .base import WriteOnlyMapped -from .interfaces import CriteriaOption -from .interfaces import MapperProperty as MapperProperty -from .interfaces import ORMColumnsClauseRole -from .interfaces import ORMEntityColumnsClauseRole -from .interfaces import ORMFromClauseRole -from .path_registry import PathRegistry as PathRegistry -from .. import event -from .. import exc as sa_exc -from .. import inspection -from .. import sql -from .. import util -from ..engine.result import result_tuple -from ..sql import coercions -from ..sql import expression -from ..sql import lambdas -from ..sql import roles -from ..sql import util as sql_util -from ..sql import visitors -from ..sql._typing import is_selectable -from ..sql.annotation import SupportsCloneAnnotations -from ..sql.base import ColumnCollection -from ..sql.cache_key import HasCacheKey -from ..sql.cache_key import MemoizedHasCacheKey -from ..sql.elements import ColumnElement -from ..sql.elements import KeyedColumnElement -from ..sql.selectable import FromClause -from ..util.langhelpers import MemoizedSlots -from ..util.typing import de_stringify_annotation as _de_stringify_annotation -from ..util.typing import ( - de_stringify_union_elements as _de_stringify_union_elements, -) -from ..util.typing import eval_name_only as _eval_name_only -from ..util.typing import is_origin_of_cls -from ..util.typing import Literal -from ..util.typing import Protocol -from ..util.typing import typing_get_origin - -if typing.TYPE_CHECKING: - from ._typing import _EntityType - from ._typing import _IdentityKeyType - from ._typing import _InternalEntityType - from ._typing import _ORMCOLEXPR - from .context import _MapperEntity - from .context import ORMCompileState - from .mapper import Mapper - from .path_registry import AbstractEntityRegistry - from .query import Query - from .relationships import RelationshipProperty - from ..engine import Row - from ..engine import RowMapping - from ..sql._typing import _CE - from ..sql._typing import _ColumnExpressionArgument - from ..sql._typing import _EquivalentColumnMap - from ..sql._typing import _FromClauseArgument - from ..sql._typing import _OnClauseArgument - from ..sql._typing import _PropagateAttrsType - from ..sql.annotation import _SA - from ..sql.base import ReadOnlyColumnCollection - from ..sql.elements import BindParameter - from ..sql.selectable import _ColumnsClauseElement - from ..sql.selectable import Select - from ..sql.selectable import Selectable - from ..sql.visitors import anon_map - from ..util.typing import _AnnotationScanType - from ..util.typing import ArgsTypeProcotol - -_T = TypeVar("_T", bound=Any) - -all_cascades = frozenset( - ( - "delete", - "delete-orphan", - "all", - "merge", - "expunge", - "save-update", - "refresh-expire", - "none", - ) -) - - -_de_stringify_partial = functools.partial( - functools.partial, - locals_=util.immutabledict( - { - "Mapped": Mapped, - "WriteOnlyMapped": WriteOnlyMapped, - "DynamicMapped": DynamicMapped, - } - ), -) - -# partial is practically useless as we have to write out the whole -# function and maintain the signature anyway - - -class _DeStringifyAnnotation(Protocol): - def __call__( - self, - cls: Type[Any], - annotation: _AnnotationScanType, - originating_module: str, - *, - str_cleanup_fn: Optional[Callable[[str, str], str]] = None, - include_generic: bool = False, - ) -> Type[Any]: ... - - -de_stringify_annotation = cast( - _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation) -) - - -class _DeStringifyUnionElements(Protocol): - def __call__( - self, - cls: Type[Any], - annotation: ArgsTypeProcotol, - originating_module: str, - *, - str_cleanup_fn: Optional[Callable[[str, str], str]] = None, - ) -> Type[Any]: ... - - -de_stringify_union_elements = cast( - _DeStringifyUnionElements, - _de_stringify_partial(_de_stringify_union_elements), -) - - -class _EvalNameOnly(Protocol): - def __call__(self, name: str, module_name: str) -> Any: ... - - -eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only)) - - -class CascadeOptions(FrozenSet[str]): - """Keeps track of the options sent to - :paramref:`.relationship.cascade`""" - - _add_w_all_cascades = all_cascades.difference( - ["all", "none", "delete-orphan"] - ) - _allowed_cascades = all_cascades - - _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"] - - __slots__ = ( - "save_update", - "delete", - "refresh_expire", - "merge", - "expunge", - "delete_orphan", - ) - - save_update: bool - delete: bool - refresh_expire: bool - merge: bool - expunge: bool - delete_orphan: bool - - def __new__( - cls, value_list: Optional[Union[Iterable[str], str]] - ) -> CascadeOptions: - if isinstance(value_list, str) or value_list is None: - return cls.from_string(value_list) # type: ignore - values = set(value_list) - if values.difference(cls._allowed_cascades): - raise sa_exc.ArgumentError( - "Invalid cascade option(s): %s" - % ", ".join( - [ - repr(x) - for x in sorted( - values.difference(cls._allowed_cascades) - ) - ] - ) - ) - - if "all" in values: - values.update(cls._add_w_all_cascades) - if "none" in values: - values.clear() - values.discard("all") - - self = super().__new__(cls, values) - self.save_update = "save-update" in values - self.delete = "delete" in values - self.refresh_expire = "refresh-expire" in values - self.merge = "merge" in values - self.expunge = "expunge" in values - self.delete_orphan = "delete-orphan" in values - - if self.delete_orphan and not self.delete: - util.warn("The 'delete-orphan' cascade option requires 'delete'.") - return self - - def __repr__(self): - return "CascadeOptions(%r)" % (",".join([x for x in sorted(self)])) - - @classmethod - def from_string(cls, arg): - values = [c for c in re.split(r"\s*,\s*", arg or "") if c] - return cls(values) - - -def _validator_events(desc, key, validator, include_removes, include_backrefs): - """Runs a validation method on an attribute value to be set or - appended. - """ - - if not include_backrefs: - - def detect_is_backref(state, initiator): - impl = state.manager[key].impl - return initiator.impl is not impl - - if include_removes: - - def append(state, value, initiator): - if initiator.op is not attributes.OP_BULK_REPLACE and ( - include_backrefs or not detect_is_backref(state, initiator) - ): - return validator(state.obj(), key, value, False) - else: - return value - - def bulk_set(state, values, initiator): - if include_backrefs or not detect_is_backref(state, initiator): - obj = state.obj() - values[:] = [ - validator(obj, key, value, False) for value in values - ] - - def set_(state, value, oldvalue, initiator): - if include_backrefs or not detect_is_backref(state, initiator): - return validator(state.obj(), key, value, False) - else: - return value - - def remove(state, value, initiator): - if include_backrefs or not detect_is_backref(state, initiator): - validator(state.obj(), key, value, True) - - else: - - def append(state, value, initiator): - if initiator.op is not attributes.OP_BULK_REPLACE and ( - include_backrefs or not detect_is_backref(state, initiator) - ): - return validator(state.obj(), key, value) - else: - return value - - def bulk_set(state, values, initiator): - if include_backrefs or not detect_is_backref(state, initiator): - obj = state.obj() - values[:] = [validator(obj, key, value) for value in values] - - def set_(state, value, oldvalue, initiator): - if include_backrefs or not detect_is_backref(state, initiator): - return validator(state.obj(), key, value) - else: - return value - - event.listen(desc, "append", append, raw=True, retval=True) - event.listen(desc, "bulk_replace", bulk_set, raw=True) - event.listen(desc, "set", set_, raw=True, retval=True) - if include_removes: - event.listen(desc, "remove", remove, raw=True, retval=True) - - -def polymorphic_union( - table_map, typecolname, aliasname="p_union", cast_nulls=True -): - """Create a ``UNION`` statement used by a polymorphic mapper. - - See :ref:`concrete_inheritance` for an example of how - this is used. - - :param table_map: mapping of polymorphic identities to - :class:`_schema.Table` objects. - :param typecolname: string name of a "discriminator" column, which will be - derived from the query, producing the polymorphic identity for - each row. If ``None``, no polymorphic discriminator is generated. - :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()` - construct generated. - :param cast_nulls: if True, non-existent columns, which are represented - as labeled NULLs, will be passed into CAST. This is a legacy behavior - that is problematic on some backends such as Oracle - in which case it - can be set to False. - - """ - - colnames: util.OrderedSet[str] = util.OrderedSet() - colnamemaps = {} - types = {} - for key in table_map: - table = table_map[key] - - table = coercions.expect( - roles.StrictFromClauseRole, table, allow_select=True - ) - table_map[key] = table - - m = {} - for c in table.c: - if c.key == typecolname: - raise sa_exc.InvalidRequestError( - "Polymorphic union can't use '%s' as the discriminator " - "column due to mapped column %r; please apply the " - "'typecolname' " - "argument; this is available on " - "ConcreteBase as '_concrete_discriminator_name'" - % (typecolname, c) - ) - colnames.add(c.key) - m[c.key] = c - types[c.key] = c.type - colnamemaps[table] = m - - def col(name, table): - try: - return colnamemaps[table][name] - except KeyError: - if cast_nulls: - return sql.cast(sql.null(), types[name]).label(name) - else: - return sql.type_coerce(sql.null(), types[name]).label(name) - - result = [] - for type_, table in table_map.items(): - if typecolname is not None: - result.append( - sql.select( - *( - [col(name, table) for name in colnames] - + [ - sql.literal_column( - sql_util._quote_ddl_expr(type_) - ).label(typecolname) - ] - ) - ).select_from(table) - ) - else: - result.append( - sql.select( - *[col(name, table) for name in colnames] - ).select_from(table) - ) - return sql.union_all(*result).alias(aliasname) - - -def identity_key( - class_: Optional[Type[_T]] = None, - ident: Union[Any, Tuple[Any, ...]] = None, - *, - instance: Optional[_T] = None, - row: Optional[Union[Row[Any], RowMapping]] = None, - identity_token: Optional[Any] = None, -) -> _IdentityKeyType[_T]: - r"""Generate "identity key" tuples, as are used as keys in the - :attr:`.Session.identity_map` dictionary. - - This function has several call styles: - - * ``identity_key(class, ident, identity_token=token)`` - - This form receives a mapped class and a primary key scalar or - tuple as an argument. - - E.g.:: - - >>> identity_key(MyClass, (1, 2)) - (, (1, 2), None) - - :param class: mapped class (must be a positional argument) - :param ident: primary key, may be a scalar or tuple argument. - :param identity_token: optional identity token - - .. versionadded:: 1.2 added identity_token - - - * ``identity_key(instance=instance)`` - - This form will produce the identity key for a given instance. The - instance need not be persistent, only that its primary key attributes - are populated (else the key will contain ``None`` for those missing - values). - - E.g.:: - - >>> instance = MyClass(1, 2) - >>> identity_key(instance=instance) - (, (1, 2), None) - - In this form, the given instance is ultimately run though - :meth:`_orm.Mapper.identity_key_from_instance`, which will have the - effect of performing a database check for the corresponding row - if the object is expired. - - :param instance: object instance (must be given as a keyword arg) - - * ``identity_key(class, row=row, identity_token=token)`` - - This form is similar to the class/tuple form, except is passed a - database result row as a :class:`.Row` or :class:`.RowMapping` object. - - E.g.:: - - >>> row = engine.execute(\ - text("select * from table where a=1 and b=2")\ - ).first() - >>> identity_key(MyClass, row=row) - (, (1, 2), None) - - :param class: mapped class (must be a positional argument) - :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult` - (must be given as a keyword arg) - :param identity_token: optional identity token - - .. versionadded:: 1.2 added identity_token - - """ - if class_ is not None: - mapper = class_mapper(class_) - if row is None: - if ident is None: - raise sa_exc.ArgumentError("ident or row is required") - return mapper.identity_key_from_primary_key( - tuple(util.to_list(ident)), identity_token=identity_token - ) - else: - return mapper.identity_key_from_row( - row, identity_token=identity_token - ) - elif instance is not None: - mapper = object_mapper(instance) - return mapper.identity_key_from_instance(instance) - else: - raise sa_exc.ArgumentError("class or instance is required") - - -class _TraceAdaptRole(enum.Enum): - """Enumeration of all the use cases for ORMAdapter. - - ORMAdapter remains one of the most complicated aspects of the ORM, as it is - used for in-place adaption of column expressions to be applied to a SELECT, - replacing :class:`.Table` and other objects that are mapped to classes with - aliases of those tables in the case of joined eager loading, or in the case - of polymorphic loading as used with concrete mappings or other custom "with - polymorphic" parameters, with whole user-defined subqueries. The - enumerations provide an overview of all the use cases used by ORMAdapter, a - layer of formality as to the introduction of new ORMAdapter use cases (of - which none are anticipated), as well as a means to trace the origins of a - particular ORMAdapter within runtime debugging. - - SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on - open-ended statement adaption, including the ``Query.with_polymorphic()`` - method and the ``Query.select_from_entity()`` methods, favoring - user-explicit aliasing schemes using the ``aliased()`` and - ``with_polymorphic()`` standalone constructs; these still use adaption, - however the adaption is applied in a narrower scope. - - """ - - # aliased() use that is used to adapt individual attributes at query - # construction time - ALIASED_INSP = enum.auto() - - # joinedload cases; typically adapt an ON clause of a relationship - # join - JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto() - JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto() - JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto() - - # polymorphic cases - these are complex ones that replace FROM - # clauses, replacing tables with subqueries - MAPPER_POLYMORPHIC_ADAPTER = enum.auto() - WITH_POLYMORPHIC_ADAPTER = enum.auto() - WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto() - DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto() - - # the from_statement() case, used only to adapt individual attributes - # from a given statement to local ORM attributes at result fetching - # time. assigned to ORMCompileState._from_obj_alias - ADAPT_FROM_STATEMENT = enum.auto() - - # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case; - # the query is placed inside of a subquery with the LIMIT/OFFSET/etc., - # joinedloads are then placed on the outside. - # assigned to ORMCompileState.compound_eager_adapter - COMPOUND_EAGER_STATEMENT = enum.auto() - - # the legacy Query._set_select_from() case. - # this is needed for Query's set operations (i.e. UNION, etc. ) - # as well as "legacy from_self()", which while removed from 2.0 as - # public API, is used for the Query.count() method. this one - # still does full statement traversal - # assigned to ORMCompileState._from_obj_alias - LEGACY_SELECT_FROM_ALIAS = enum.auto() - - -class ORMStatementAdapter(sql_util.ColumnAdapter): - """ColumnAdapter which includes a role attribute.""" - - __slots__ = ("role",) - - def __init__( - self, - role: _TraceAdaptRole, - selectable: Selectable, - *, - equivalents: Optional[_EquivalentColumnMap] = None, - adapt_required: bool = False, - allow_label_resolve: bool = True, - anonymize_labels: bool = False, - adapt_on_names: bool = False, - adapt_from_selectables: Optional[AbstractSet[FromClause]] = None, - ): - self.role = role - super().__init__( - selectable, - equivalents=equivalents, - adapt_required=adapt_required, - allow_label_resolve=allow_label_resolve, - anonymize_labels=anonymize_labels, - adapt_on_names=adapt_on_names, - adapt_from_selectables=adapt_from_selectables, - ) - - -class ORMAdapter(sql_util.ColumnAdapter): - """ColumnAdapter subclass which excludes adaptation of entities from - non-matching mappers. - - """ - - __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp") - - is_aliased_class: bool - aliased_insp: Optional[AliasedInsp[Any]] - - def __init__( - self, - role: _TraceAdaptRole, - entity: _InternalEntityType[Any], - *, - equivalents: Optional[_EquivalentColumnMap] = None, - adapt_required: bool = False, - allow_label_resolve: bool = True, - anonymize_labels: bool = False, - selectable: Optional[Selectable] = None, - limit_on_entity: bool = True, - adapt_on_names: bool = False, - adapt_from_selectables: Optional[AbstractSet[FromClause]] = None, - ): - self.role = role - self.mapper = entity.mapper - if selectable is None: - selectable = entity.selectable - if insp_is_aliased_class(entity): - self.is_aliased_class = True - self.aliased_insp = entity - else: - self.is_aliased_class = False - self.aliased_insp = None - - super().__init__( - selectable, - equivalents, - adapt_required=adapt_required, - allow_label_resolve=allow_label_resolve, - anonymize_labels=anonymize_labels, - include_fn=self._include_fn if limit_on_entity else None, - adapt_on_names=adapt_on_names, - adapt_from_selectables=adapt_from_selectables, - ) - - def _include_fn(self, elem): - entity = elem._annotations.get("parentmapper", None) - - return not entity or entity.isa(self.mapper) or self.mapper.isa(entity) - - -class AliasedClass( - inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O] -): - r"""Represents an "aliased" form of a mapped class for usage with Query. - - The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias` - construct, this object mimics the mapped class using a - ``__getattr__`` scheme and maintains a reference to a - real :class:`~sqlalchemy.sql.expression.Alias` object. - - A primary purpose of :class:`.AliasedClass` is to serve as an alternate - within a SQL statement generated by the ORM, such that an existing - mapped entity can be used in multiple contexts. A simple example:: - - # find all pairs of users with the same name - user_alias = aliased(User) - session.query(User, user_alias).\ - join((user_alias, User.id > user_alias.id)).\ - filter(User.name == user_alias.name) - - :class:`.AliasedClass` is also capable of mapping an existing mapped - class to an entirely new selectable, provided this selectable is column- - compatible with the existing mapped selectable, and it can also be - configured in a mapping as the target of a :func:`_orm.relationship`. - See the links below for examples. - - The :class:`.AliasedClass` object is constructed typically using the - :func:`_orm.aliased` function. It also is produced with additional - configuration when using the :func:`_orm.with_polymorphic` function. - - The resulting object is an instance of :class:`.AliasedClass`. - This object implements an attribute scheme which produces the - same attribute and method interface as the original mapped - class, allowing :class:`.AliasedClass` to be compatible - with any attribute technique which works on the original class, - including hybrid attributes (see :ref:`hybrids_toplevel`). - - The :class:`.AliasedClass` can be inspected for its underlying - :class:`_orm.Mapper`, aliased selectable, and other information - using :func:`_sa.inspect`:: - - from sqlalchemy import inspect - my_alias = aliased(MyClass) - insp = inspect(my_alias) - - The resulting inspection object is an instance of :class:`.AliasedInsp`. - - - .. seealso:: - - :func:`.aliased` - - :func:`.with_polymorphic` - - :ref:`relationship_aliased_class` - - :ref:`relationship_to_window_function` - - - """ - - __name__: str - - def __init__( - self, - mapped_class_or_ac: _EntityType[_O], - alias: Optional[FromClause] = None, - name: Optional[str] = None, - flat: bool = False, - adapt_on_names: bool = False, - with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None, - with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None, - base_alias: Optional[AliasedInsp[Any]] = None, - use_mapper_path: bool = False, - represents_outer_join: bool = False, - ): - insp = cast( - "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac) - ) - mapper = insp.mapper - - nest_adapters = False - - if alias is None: - if insp.is_aliased_class and insp.selectable._is_subquery: - alias = insp.selectable.alias() - else: - alias = ( - mapper._with_polymorphic_selectable._anonymous_fromclause( - name=name, - flat=flat, - ) - ) - elif insp.is_aliased_class: - nest_adapters = True - - assert alias is not None - self._aliased_insp = AliasedInsp( - self, - insp, - alias, - name, - ( - with_polymorphic_mappers - if with_polymorphic_mappers - else mapper.with_polymorphic_mappers - ), - ( - with_polymorphic_discriminator - if with_polymorphic_discriminator is not None - else mapper.polymorphic_on - ), - base_alias, - use_mapper_path, - adapt_on_names, - represents_outer_join, - nest_adapters, - ) - - self.__name__ = f"aliased({mapper.class_.__name__})" - - @classmethod - def _reconstitute_from_aliased_insp( - cls, aliased_insp: AliasedInsp[_O] - ) -> AliasedClass[_O]: - obj = cls.__new__(cls) - obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})" - obj._aliased_insp = aliased_insp - - if aliased_insp._is_with_polymorphic: - for sub_aliased_insp in aliased_insp._with_polymorphic_entities: - if sub_aliased_insp is not aliased_insp: - ent = AliasedClass._reconstitute_from_aliased_insp( - sub_aliased_insp - ) - setattr(obj, sub_aliased_insp.class_.__name__, ent) - - return obj - - def __getattr__(self, key: str) -> Any: - try: - _aliased_insp = self.__dict__["_aliased_insp"] - except KeyError: - raise AttributeError() - else: - target = _aliased_insp._target - # maintain all getattr mechanics - attr = getattr(target, key) - - # attribute is a method, that will be invoked against a - # "self"; so just return a new method with the same function and - # new self - if hasattr(attr, "__call__") and hasattr(attr, "__self__"): - return types.MethodType(attr.__func__, self) - - # attribute is a descriptor, that will be invoked against a - # "self"; so invoke the descriptor against this self - if hasattr(attr, "__get__"): - attr = attr.__get__(None, self) - - # attributes within the QueryableAttribute system will want this - # to be invoked so the object can be adapted - if hasattr(attr, "adapt_to_entity"): - attr = attr.adapt_to_entity(_aliased_insp) - setattr(self, key, attr) - - return attr - - def _get_from_serialized( - self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O] - ) -> Any: - # this method is only used in terms of the - # sqlalchemy.ext.serializer extension - attr = getattr(mapped_class, key) - if hasattr(attr, "__call__") and hasattr(attr, "__self__"): - return types.MethodType(attr.__func__, self) - - # attribute is a descriptor, that will be invoked against a - # "self"; so invoke the descriptor against this self - if hasattr(attr, "__get__"): - attr = attr.__get__(None, self) - - # attributes within the QueryableAttribute system will want this - # to be invoked so the object can be adapted - if hasattr(attr, "adapt_to_entity"): - aliased_insp._weak_entity = weakref.ref(self) - attr = attr.adapt_to_entity(aliased_insp) - setattr(self, key, attr) - - return attr - - def __repr__(self) -> str: - return "" % ( - id(self), - self._aliased_insp._target.__name__, - ) - - def __str__(self) -> str: - return str(self._aliased_insp) - - -@inspection._self_inspects -class AliasedInsp( - ORMEntityColumnsClauseRole[_O], - ORMFromClauseRole, - HasCacheKey, - InspectionAttr, - MemoizedSlots, - inspection.Inspectable["AliasedInsp[_O]"], - Generic[_O], -): - """Provide an inspection interface for an - :class:`.AliasedClass` object. - - The :class:`.AliasedInsp` object is returned - given an :class:`.AliasedClass` using the - :func:`_sa.inspect` function:: - - from sqlalchemy import inspect - from sqlalchemy.orm import aliased - - my_alias = aliased(MyMappedClass) - insp = inspect(my_alias) - - Attributes on :class:`.AliasedInsp` - include: - - * ``entity`` - the :class:`.AliasedClass` represented. - * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class. - * ``selectable`` - the :class:`_expression.Alias` - construct which ultimately - represents an aliased :class:`_schema.Table` or - :class:`_expression.Select` - construct. - * ``name`` - the name of the alias. Also is used as the attribute - name when returned in a result tuple from :class:`_query.Query`. - * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper` - objects - indicating all those mappers expressed in the select construct - for the :class:`.AliasedClass`. - * ``polymorphic_on`` - an alternate column or SQL expression which - will be used as the "discriminator" for a polymorphic load. - - .. seealso:: - - :ref:`inspection_toplevel` - - """ - - __slots__ = ( - "__weakref__", - "_weak_entity", - "mapper", - "selectable", - "name", - "_adapt_on_names", - "with_polymorphic_mappers", - "polymorphic_on", - "_use_mapper_path", - "_base_alias", - "represents_outer_join", - "persist_selectable", - "local_table", - "_is_with_polymorphic", - "_with_polymorphic_entities", - "_adapter", - "_target", - "__clause_element__", - "_memoized_values", - "_all_column_expressions", - "_nest_adapters", - ) - - _cache_key_traversal = [ - ("name", visitors.ExtendedInternalTraversal.dp_string), - ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean), - ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean), - ("_target", visitors.ExtendedInternalTraversal.dp_inspectable), - ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement), - ( - "with_polymorphic_mappers", - visitors.InternalTraversal.dp_has_cache_key_list, - ), - ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement), - ] - - mapper: Mapper[_O] - selectable: FromClause - _adapter: ORMAdapter - with_polymorphic_mappers: Sequence[Mapper[Any]] - _with_polymorphic_entities: Sequence[AliasedInsp[Any]] - - _weak_entity: weakref.ref[AliasedClass[_O]] - """the AliasedClass that refers to this AliasedInsp""" - - _target: Union[Type[_O], AliasedClass[_O]] - """the thing referenced by the AliasedClass/AliasedInsp. - - In the vast majority of cases, this is the mapped class. However - it may also be another AliasedClass (alias of alias). - - """ - - def __init__( - self, - entity: AliasedClass[_O], - inspected: _InternalEntityType[_O], - selectable: FromClause, - name: Optional[str], - with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]], - polymorphic_on: Optional[ColumnElement[Any]], - _base_alias: Optional[AliasedInsp[Any]], - _use_mapper_path: bool, - adapt_on_names: bool, - represents_outer_join: bool, - nest_adapters: bool, - ): - mapped_class_or_ac = inspected.entity - mapper = inspected.mapper - - self._weak_entity = weakref.ref(entity) - self.mapper = mapper - self.selectable = self.persist_selectable = self.local_table = ( - selectable - ) - self.name = name - self.polymorphic_on = polymorphic_on - self._base_alias = weakref.ref(_base_alias or self) - self._use_mapper_path = _use_mapper_path - self.represents_outer_join = represents_outer_join - self._nest_adapters = nest_adapters - - if with_polymorphic_mappers: - self._is_with_polymorphic = True - self.with_polymorphic_mappers = with_polymorphic_mappers - self._with_polymorphic_entities = [] - for poly in self.with_polymorphic_mappers: - if poly is not mapper: - ent = AliasedClass( - poly.class_, - selectable, - base_alias=self, - adapt_on_names=adapt_on_names, - use_mapper_path=_use_mapper_path, - ) - - setattr(self.entity, poly.class_.__name__, ent) - self._with_polymorphic_entities.append(ent._aliased_insp) - - else: - self._is_with_polymorphic = False - self.with_polymorphic_mappers = [mapper] - - self._adapter = ORMAdapter( - _TraceAdaptRole.ALIASED_INSP, - mapper, - selectable=selectable, - equivalents=mapper._equivalent_columns, - adapt_on_names=adapt_on_names, - anonymize_labels=True, - # make sure the adapter doesn't try to grab other tables that - # are not even the thing we are mapping, such as embedded - # selectables in subqueries or CTEs. See issue #6060 - adapt_from_selectables={ - m.selectable - for m in self.with_polymorphic_mappers - if not adapt_on_names - }, - limit_on_entity=False, - ) - - if nest_adapters: - # supports "aliased class of aliased class" use case - assert isinstance(inspected, AliasedInsp) - self._adapter = inspected._adapter.wrap(self._adapter) - - self._adapt_on_names = adapt_on_names - self._target = mapped_class_or_ac - - @classmethod - def _alias_factory( - cls, - element: Union[_EntityType[_O], FromClause], - alias: Optional[FromClause] = None, - name: Optional[str] = None, - flat: bool = False, - adapt_on_names: bool = False, - ) -> Union[AliasedClass[_O], FromClause]: - if isinstance(element, FromClause): - if adapt_on_names: - raise sa_exc.ArgumentError( - "adapt_on_names only applies to ORM elements" - ) - if name: - return element.alias(name=name, flat=flat) - else: - return coercions.expect( - roles.AnonymizedFromClauseRole, element, flat=flat - ) - else: - return AliasedClass( - element, - alias=alias, - flat=flat, - name=name, - adapt_on_names=adapt_on_names, - ) - - @classmethod - def _with_polymorphic_factory( - cls, - base: Union[Type[_O], Mapper[_O]], - classes: Union[Literal["*"], Iterable[_EntityType[Any]]], - selectable: Union[Literal[False, None], FromClause] = False, - flat: bool = False, - polymorphic_on: Optional[ColumnElement[Any]] = None, - aliased: bool = False, - innerjoin: bool = False, - adapt_on_names: bool = False, - _use_mapper_path: bool = False, - ) -> AliasedClass[_O]: - primary_mapper = _class_to_mapper(base) - - if selectable not in (None, False) and flat: - raise sa_exc.ArgumentError( - "the 'flat' and 'selectable' arguments cannot be passed " - "simultaneously to with_polymorphic()" - ) - - mappers, selectable = primary_mapper._with_polymorphic_args( - classes, selectable, innerjoin=innerjoin - ) - if aliased or flat: - assert selectable is not None - selectable = selectable._anonymous_fromclause(flat=flat) - - return AliasedClass( - base, - selectable, - with_polymorphic_mappers=mappers, - adapt_on_names=adapt_on_names, - with_polymorphic_discriminator=polymorphic_on, - use_mapper_path=_use_mapper_path, - represents_outer_join=not innerjoin, - ) - - @property - def entity(self) -> AliasedClass[_O]: - # to eliminate reference cycles, the AliasedClass is held weakly. - # this produces some situations where the AliasedClass gets lost, - # particularly when one is created internally and only the AliasedInsp - # is passed around. - # to work around this case, we just generate a new one when we need - # it, as it is a simple class with very little initial state on it. - ent = self._weak_entity() - if ent is None: - ent = AliasedClass._reconstitute_from_aliased_insp(self) - self._weak_entity = weakref.ref(ent) - return ent - - is_aliased_class = True - "always returns True" - - def _memoized_method___clause_element__(self) -> FromClause: - return self.selectable._annotate( - { - "parentmapper": self.mapper, - "parententity": self, - "entity_namespace": self, - } - )._set_propagate_attrs( - {"compile_state_plugin": "orm", "plugin_subject": self} - ) - - @property - def entity_namespace(self) -> AliasedClass[_O]: - return self.entity - - @property - def class_(self) -> Type[_O]: - """Return the mapped class ultimately represented by this - :class:`.AliasedInsp`.""" - return self.mapper.class_ - - @property - def _path_registry(self) -> AbstractEntityRegistry: - if self._use_mapper_path: - return self.mapper._path_registry - else: - return PathRegistry.per_mapper(self) - - def __getstate__(self) -> Dict[str, Any]: - return { - "entity": self.entity, - "mapper": self.mapper, - "alias": self.selectable, - "name": self.name, - "adapt_on_names": self._adapt_on_names, - "with_polymorphic_mappers": self.with_polymorphic_mappers, - "with_polymorphic_discriminator": self.polymorphic_on, - "base_alias": self._base_alias(), - "use_mapper_path": self._use_mapper_path, - "represents_outer_join": self.represents_outer_join, - "nest_adapters": self._nest_adapters, - } - - def __setstate__(self, state: Dict[str, Any]) -> None: - self.__init__( # type: ignore - state["entity"], - state["mapper"], - state["alias"], - state["name"], - state["with_polymorphic_mappers"], - state["with_polymorphic_discriminator"], - state["base_alias"], - state["use_mapper_path"], - state["adapt_on_names"], - state["represents_outer_join"], - state["nest_adapters"], - ) - - def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]: - # assert self._is_with_polymorphic - # assert other._is_with_polymorphic - - primary_mapper = other.mapper - - assert self.mapper is primary_mapper - - our_classes = util.to_set( - mp.class_ for mp in self.with_polymorphic_mappers - ) - new_classes = {mp.class_ for mp in other.with_polymorphic_mappers} - if our_classes == new_classes: - return other - else: - classes = our_classes.union(new_classes) - - mappers, selectable = primary_mapper._with_polymorphic_args( - classes, None, innerjoin=not other.represents_outer_join - ) - selectable = selectable._anonymous_fromclause(flat=True) - return AliasedClass( - primary_mapper, - selectable, - with_polymorphic_mappers=mappers, - with_polymorphic_discriminator=other.polymorphic_on, - use_mapper_path=other._use_mapper_path, - represents_outer_join=other.represents_outer_join, - )._aliased_insp - - def _adapt_element( - self, expr: _ORMCOLEXPR, key: Optional[str] = None - ) -> _ORMCOLEXPR: - assert isinstance(expr, ColumnElement) - d: Dict[str, Any] = { - "parententity": self, - "parentmapper": self.mapper, - } - if key: - d["proxy_key"] = key - - # IMO mypy should see this one also as returning the same type - # we put into it, but it's not - return ( - self._adapter.traverse(expr) - ._annotate(d) - ._set_propagate_attrs( - {"compile_state_plugin": "orm", "plugin_subject": self} - ) - ) - - if TYPE_CHECKING: - # establish compatibility with the _ORMAdapterProto protocol, - # which in turn is compatible with _CoreAdapterProto. - - def _orm_adapt_element( - self, - obj: _CE, - key: Optional[str] = None, - ) -> _CE: ... - - else: - _orm_adapt_element = _adapt_element - - def _entity_for_mapper(self, mapper): - self_poly = self.with_polymorphic_mappers - if mapper in self_poly: - if mapper is self.mapper: - return self - else: - return getattr( - self.entity, mapper.class_.__name__ - )._aliased_insp - elif mapper.isa(self.mapper): - return self - else: - assert False, "mapper %s doesn't correspond to %s" % (mapper, self) - - def _memoized_attr__get_clause(self): - onclause, replacemap = self.mapper._get_clause - return ( - self._adapter.traverse(onclause), - { - self._adapter.traverse(col): param - for col, param in replacemap.items() - }, - ) - - def _memoized_attr__memoized_values(self): - return {} - - def _memoized_attr__all_column_expressions(self): - if self._is_with_polymorphic: - cols_plus_keys = self.mapper._columns_plus_keys( - [ent.mapper for ent in self._with_polymorphic_entities] - ) - else: - cols_plus_keys = self.mapper._columns_plus_keys() - - cols_plus_keys = [ - (key, self._adapt_element(col)) for key, col in cols_plus_keys - ] - - return ColumnCollection(cols_plus_keys) - - def _memo(self, key, callable_, *args, **kw): - if key in self._memoized_values: - return self._memoized_values[key] - else: - self._memoized_values[key] = value = callable_(*args, **kw) - return value - - def __repr__(self): - if self.with_polymorphic_mappers: - with_poly = "(%s)" % ", ".join( - mp.class_.__name__ for mp in self.with_polymorphic_mappers - ) - else: - with_poly = "" - return "" % ( - id(self), - self.class_.__name__, - with_poly, - ) - - def __str__(self): - if self._is_with_polymorphic: - return "with_polymorphic(%s, [%s])" % ( - self._target.__name__, - ", ".join( - mp.class_.__name__ - for mp in self.with_polymorphic_mappers - if mp is not self.mapper - ), - ) - else: - return "aliased(%s)" % (self._target.__name__,) - - -class _WrapUserEntity: - """A wrapper used within the loader_criteria lambda caller so that - we can bypass declared_attr descriptors on unmapped mixins, which - normally emit a warning for such use. - - might also be useful for other per-lambda instrumentations should - the need arise. - - """ - - __slots__ = ("subject",) - - def __init__(self, subject): - self.subject = subject - - @util.preload_module("sqlalchemy.orm.decl_api") - def __getattribute__(self, name): - decl_api = util.preloaded.orm.decl_api - - subject = object.__getattribute__(self, "subject") - if name in subject.__dict__ and isinstance( - subject.__dict__[name], decl_api.declared_attr - ): - return subject.__dict__[name].fget(subject) - else: - return getattr(subject, name) - - -class LoaderCriteriaOption(CriteriaOption): - """Add additional WHERE criteria to the load for all occurrences of - a particular entity. - - :class:`_orm.LoaderCriteriaOption` is invoked using the - :func:`_orm.with_loader_criteria` function; see that function for - details. - - .. versionadded:: 1.4 - - """ - - __slots__ = ( - "root_entity", - "entity", - "deferred_where_criteria", - "where_criteria", - "_where_crit_orig", - "include_aliases", - "propagate_to_loaders", - ) - - _traverse_internals = [ - ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj), - ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key), - ("where_criteria", visitors.InternalTraversal.dp_clauseelement), - ("include_aliases", visitors.InternalTraversal.dp_boolean), - ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean), - ] - - root_entity: Optional[Type[Any]] - entity: Optional[_InternalEntityType[Any]] - where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement] - deferred_where_criteria: bool - include_aliases: bool - propagate_to_loaders: bool - - _where_crit_orig: Any - - def __init__( - self, - entity_or_base: _EntityType[Any], - where_criteria: Union[ - _ColumnExpressionArgument[bool], - Callable[[Any], _ColumnExpressionArgument[bool]], - ], - loader_only: bool = False, - include_aliases: bool = False, - propagate_to_loaders: bool = True, - track_closure_variables: bool = True, - ): - entity = cast( - "_InternalEntityType[Any]", - inspection.inspect(entity_or_base, False), - ) - if entity is None: - self.root_entity = cast("Type[Any]", entity_or_base) - self.entity = None - else: - self.root_entity = None - self.entity = entity - - self._where_crit_orig = where_criteria - if callable(where_criteria): - if self.root_entity is not None: - wrap_entity = self.root_entity - else: - assert entity is not None - wrap_entity = entity.entity - - self.deferred_where_criteria = True - self.where_criteria = lambdas.DeferredLambdaElement( - where_criteria, - roles.WhereHavingRole, - lambda_args=(_WrapUserEntity(wrap_entity),), - opts=lambdas.LambdaOptions( - track_closure_variables=track_closure_variables - ), - ) - else: - self.deferred_where_criteria = False - self.where_criteria = coercions.expect( - roles.WhereHavingRole, where_criteria - ) - - self.include_aliases = include_aliases - self.propagate_to_loaders = propagate_to_loaders - - @classmethod - def _unreduce( - cls, entity, where_criteria, include_aliases, propagate_to_loaders - ): - return LoaderCriteriaOption( - entity, - where_criteria, - include_aliases=include_aliases, - propagate_to_loaders=propagate_to_loaders, - ) - - def __reduce__(self): - return ( - LoaderCriteriaOption._unreduce, - ( - self.entity.class_ if self.entity else self.root_entity, - self._where_crit_orig, - self.include_aliases, - self.propagate_to_loaders, - ), - ) - - def _all_mappers(self) -> Iterator[Mapper[Any]]: - if self.entity: - yield from self.entity.mapper.self_and_descendants - else: - assert self.root_entity - stack = list(self.root_entity.__subclasses__()) - while stack: - subclass = stack.pop(0) - ent = cast( - "_InternalEntityType[Any]", - inspection.inspect(subclass, raiseerr=False), - ) - if ent: - yield from ent.mapper.self_and_descendants - else: - stack.extend(subclass.__subclasses__()) - - def _should_include(self, compile_state: ORMCompileState) -> bool: - if ( - compile_state.select_statement._annotations.get( - "for_loader_criteria", None - ) - is self - ): - return False - return True - - def _resolve_where_criteria( - self, ext_info: _InternalEntityType[Any] - ) -> ColumnElement[bool]: - if self.deferred_where_criteria: - crit = cast( - "ColumnElement[bool]", - self.where_criteria._resolve_with_args(ext_info.entity), - ) - else: - crit = self.where_criteria # type: ignore - assert isinstance(crit, ColumnElement) - return sql_util._deep_annotate( - crit, - {"for_loader_criteria": self}, - detect_subquery_cols=True, - ind_cols_on_fromclause=True, - ) - - def process_compile_state_replaced_entities( - self, - compile_state: ORMCompileState, - mapper_entities: Iterable[_MapperEntity], - ) -> None: - self.process_compile_state(compile_state) - - def process_compile_state(self, compile_state: ORMCompileState) -> None: - """Apply a modification to a given :class:`.CompileState`.""" - - # if options to limit the criteria to immediate query only, - # use compile_state.attributes instead - - self.get_global_criteria(compile_state.global_attributes) - - def get_global_criteria(self, attributes: Dict[Any, Any]) -> None: - for mp in self._all_mappers(): - load_criteria = attributes.setdefault( - ("additional_entity_criteria", mp), [] - ) - - load_criteria.append(self) - - -inspection._inspects(AliasedClass)(lambda target: target._aliased_insp) - - -@inspection._inspects(type) -def _inspect_mc( - class_: Type[_O], -) -> Optional[Mapper[_O]]: - try: - class_manager = opt_manager_of_class(class_) - if class_manager is None or not class_manager.is_mapped: - return None - mapper = class_manager.mapper - except exc.NO_STATE: - return None - else: - return mapper - - -GenericAlias = type(List[Any]) - - -@inspection._inspects(GenericAlias) -def _inspect_generic_alias( - class_: Type[_O], -) -> Optional[Mapper[_O]]: - origin = cast("Type[_O]", typing_get_origin(class_)) - return _inspect_mc(origin) - - -@inspection._self_inspects -class Bundle( - ORMColumnsClauseRole[_T], - SupportsCloneAnnotations, - MemoizedHasCacheKey, - inspection.Inspectable["Bundle[_T]"], - InspectionAttr, -): - """A grouping of SQL expressions that are returned by a :class:`.Query` - under one namespace. - - The :class:`.Bundle` essentially allows nesting of the tuple-based - results returned by a column-oriented :class:`_query.Query` object. - It also - is extensible via simple subclassing, where the primary capability - to override is that of how the set of expressions should be returned, - allowing post-processing as well as custom return types, without - involving ORM identity-mapped classes. - - .. seealso:: - - :ref:`bundles` - - - """ - - single_entity = False - """If True, queries for a single Bundle will be returned as a single - entity, rather than an element within a keyed tuple.""" - - is_clause_element = False - - is_mapper = False - - is_aliased_class = False - - is_bundle = True - - _propagate_attrs: _PropagateAttrsType = util.immutabledict() - - proxy_set = util.EMPTY_SET # type: ignore - - exprs: List[_ColumnsClauseElement] - - def __init__( - self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any - ): - r"""Construct a new :class:`.Bundle`. - - e.g.:: - - bn = Bundle("mybundle", MyClass.x, MyClass.y) - - for row in session.query(bn).filter( - bn.c.x == 5).filter(bn.c.y == 4): - print(row.mybundle.x, row.mybundle.y) - - :param name: name of the bundle. - :param \*exprs: columns or SQL expressions comprising the bundle. - :param single_entity=False: if True, rows for this :class:`.Bundle` - can be returned as a "single entity" outside of any enclosing tuple - in the same manner as a mapped entity. - - """ - self.name = self._label = name - coerced_exprs = [ - coercions.expect( - roles.ColumnsClauseRole, expr, apply_propagate_attrs=self - ) - for expr in exprs - ] - self.exprs = coerced_exprs - - self.c = self.columns = ColumnCollection( - (getattr(col, "key", col._label), col) - for col in [e._annotations.get("bundle", e) for e in coerced_exprs] - ).as_readonly() - self.single_entity = kw.pop("single_entity", self.single_entity) - - def _gen_cache_key( - self, anon_map: anon_map, bindparams: List[BindParameter[Any]] - ) -> Tuple[Any, ...]: - return (self.__class__, self.name, self.single_entity) + tuple( - [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs] - ) - - @property - def mapper(self) -> Optional[Mapper[Any]]: - mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get( - "parentmapper", None - ) - return mp - - @property - def entity(self) -> Optional[_InternalEntityType[Any]]: - ie: Optional[_InternalEntityType[Any]] = self.exprs[ - 0 - ]._annotations.get("parententity", None) - return ie - - @property - def entity_namespace( - self, - ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]: - return self.c - - columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]] - - """A namespace of SQL expressions referred to by this :class:`.Bundle`. - - e.g.:: - - bn = Bundle("mybundle", MyClass.x, MyClass.y) - - q = sess.query(bn).filter(bn.c.x == 5) - - Nesting of bundles is also supported:: - - b1 = Bundle("b1", - Bundle('b2', MyClass.a, MyClass.b), - Bundle('b3', MyClass.x, MyClass.y) - ) - - q = sess.query(b1).filter( - b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9) - - .. seealso:: - - :attr:`.Bundle.c` - - """ - - c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]] - """An alias for :attr:`.Bundle.columns`.""" - - def _clone(self): - cloned = self.__class__.__new__(self.__class__) - cloned.__dict__.update(self.__dict__) - return cloned - - def __clause_element__(self): - # ensure existing entity_namespace remains - annotations = {"bundle": self, "entity_namespace": self} - annotations.update(self._annotations) - - plugin_subject = self.exprs[0]._propagate_attrs.get( - "plugin_subject", self.entity - ) - return ( - expression.ClauseList( - _literal_as_text_role=roles.ColumnsClauseRole, - group=False, - *[e._annotations.get("bundle", e) for e in self.exprs], - ) - ._annotate(annotations) - ._set_propagate_attrs( - # the Bundle *must* use the orm plugin no matter what. the - # subject can be None but it's much better if it's not. - { - "compile_state_plugin": "orm", - "plugin_subject": plugin_subject, - } - ) - ) - - @property - def clauses(self): - return self.__clause_element__().clauses - - def label(self, name): - """Provide a copy of this :class:`.Bundle` passing a new label.""" - - cloned = self._clone() - cloned.name = name - return cloned - - def create_row_processor( - self, - query: Select[Any], - procs: Sequence[Callable[[Row[Any]], Any]], - labels: Sequence[str], - ) -> Callable[[Row[Any]], Any]: - """Produce the "row processing" function for this :class:`.Bundle`. - - May be overridden by subclasses to provide custom behaviors when - results are fetched. The method is passed the statement object and a - set of "row processor" functions at query execution time; these - processor functions when given a result row will return the individual - attribute value, which can then be adapted into any kind of return data - structure. - - The example below illustrates replacing the usual :class:`.Row` - return structure with a straight Python dictionary:: - - from sqlalchemy.orm import Bundle - - class DictBundle(Bundle): - def create_row_processor(self, query, procs, labels): - 'Override create_row_processor to return values as - dictionaries' - - def proc(row): - return dict( - zip(labels, (proc(row) for proc in procs)) - ) - return proc - - A result from the above :class:`_orm.Bundle` will return dictionary - values:: - - bn = DictBundle('mybundle', MyClass.data1, MyClass.data2) - for row in session.execute(select(bn)).where(bn.c.data1 == 'd1'): - print(row.mybundle['data1'], row.mybundle['data2']) - - """ - keyed_tuple = result_tuple(labels, [() for l in labels]) - - def proc(row: Row[Any]) -> Any: - return keyed_tuple([proc(row) for proc in procs]) - - return proc - - -def _orm_annotate(element: _SA, exclude: Optional[Any] = None) -> _SA: - """Deep copy the given ClauseElement, annotating each element with the - "_orm_adapt" flag. - - Elements within the exclude collection will be cloned but not annotated. - - """ - return sql_util._deep_annotate(element, {"_orm_adapt": True}, exclude) - - -def _orm_deannotate(element: _SA) -> _SA: - """Remove annotations that link a column to a particular mapping. - - Note this doesn't affect "remote" and "foreign" annotations - passed by the :func:`_orm.foreign` and :func:`_orm.remote` - annotators. - - """ - - return sql_util._deep_deannotate( - element, values=("_orm_adapt", "parententity") - ) - - -def _orm_full_deannotate(element: _SA) -> _SA: - return sql_util._deep_deannotate(element) - - -class _ORMJoin(expression.Join): - """Extend Join to support ORM constructs as input.""" - - __visit_name__ = expression.Join.__visit_name__ - - inherit_cache = True - - def __init__( - self, - left: _FromClauseArgument, - right: _FromClauseArgument, - onclause: Optional[_OnClauseArgument] = None, - isouter: bool = False, - full: bool = False, - _left_memo: Optional[Any] = None, - _right_memo: Optional[Any] = None, - _extra_criteria: Tuple[ColumnElement[bool], ...] = (), - ): - left_info = cast( - "Union[FromClause, _InternalEntityType[Any]]", - inspection.inspect(left), - ) - - right_info = cast( - "Union[FromClause, _InternalEntityType[Any]]", - inspection.inspect(right), - ) - adapt_to = right_info.selectable - - # used by joined eager loader - self._left_memo = _left_memo - self._right_memo = _right_memo - - if isinstance(onclause, attributes.QueryableAttribute): - if TYPE_CHECKING: - assert isinstance( - onclause.comparator, RelationshipProperty.Comparator - ) - on_selectable = onclause.comparator._source_selectable() - prop = onclause.property - _extra_criteria += onclause._extra_criteria - elif isinstance(onclause, MapperProperty): - # used internally by joined eager loader...possibly not ideal - prop = onclause - on_selectable = prop.parent.selectable - else: - prop = None - on_selectable = None - - left_selectable = left_info.selectable - if prop: - adapt_from: Optional[FromClause] - if sql_util.clause_is_present(on_selectable, left_selectable): - adapt_from = on_selectable - else: - assert isinstance(left_selectable, FromClause) - adapt_from = left_selectable - - ( - pj, - sj, - source, - dest, - secondary, - target_adapter, - ) = prop._create_joins( - source_selectable=adapt_from, - dest_selectable=adapt_to, - source_polymorphic=True, - of_type_entity=right_info, - alias_secondary=True, - extra_criteria=_extra_criteria, - ) - - if sj is not None: - if isouter: - # note this is an inner join from secondary->right - right = sql.join(secondary, right, sj) - onclause = pj - else: - left = sql.join(left, secondary, pj, isouter) - onclause = sj - else: - onclause = pj - - self._target_adapter = target_adapter - - # we don't use the normal coercions logic for _ORMJoin - # (probably should), so do some gymnastics to get the entity. - # logic here is for #8721, which was a major bug in 1.4 - # for almost two years, not reported/fixed until 1.4.43 (!) - if is_selectable(left_info): - parententity = left_selectable._annotations.get( - "parententity", None - ) - elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info): - parententity = left_info - else: - parententity = None - - if parententity is not None: - self._annotations = self._annotations.union( - {"parententity": parententity} - ) - - augment_onclause = bool(_extra_criteria) and not prop - expression.Join.__init__(self, left, right, onclause, isouter, full) - - assert self.onclause is not None - - if augment_onclause: - self.onclause &= sql.and_(*_extra_criteria) - - if ( - not prop - and getattr(right_info, "mapper", None) - and right_info.mapper.single # type: ignore - ): - right_info = cast("_InternalEntityType[Any]", right_info) - # if single inheritance target and we are using a manual - # or implicit ON clause, augment it the same way we'd augment the - # WHERE. - single_crit = right_info.mapper._single_table_criterion - if single_crit is not None: - if insp_is_aliased_class(right_info): - single_crit = right_info._adapter.traverse(single_crit) - self.onclause = self.onclause & single_crit - - def _splice_into_center(self, other): - """Splice a join into the center. - - Given join(a, b) and join(b, c), return join(a, b).join(c) - - """ - leftmost = other - while isinstance(leftmost, sql.Join): - leftmost = leftmost.left - - assert self.right is leftmost - - left = _ORMJoin( - self.left, - other.left, - self.onclause, - isouter=self.isouter, - _left_memo=self._left_memo, - _right_memo=other._left_memo, - ) - - return _ORMJoin( - left, - other.right, - other.onclause, - isouter=other.isouter, - _right_memo=other._right_memo, - ) - - def join( - self, - right: _FromClauseArgument, - onclause: Optional[_OnClauseArgument] = None, - isouter: bool = False, - full: bool = False, - ) -> _ORMJoin: - return _ORMJoin(self, right, onclause, full=full, isouter=isouter) - - def outerjoin( - self, - right: _FromClauseArgument, - onclause: Optional[_OnClauseArgument] = None, - full: bool = False, - ) -> _ORMJoin: - return _ORMJoin(self, right, onclause, isouter=True, full=full) - - -def with_parent( - instance: object, - prop: attributes.QueryableAttribute[Any], - from_entity: Optional[_EntityType[Any]] = None, -) -> ColumnElement[bool]: - """Create filtering criterion that relates this query's primary entity - to the given related instance, using established - :func:`_orm.relationship()` - configuration. - - E.g.:: - - stmt = select(Address).where(with_parent(some_user, User.addresses)) - - - The SQL rendered is the same as that rendered when a lazy loader - would fire off from the given parent on that attribute, meaning - that the appropriate state is taken from the parent object in - Python without the need to render joins to the parent table - in the rendered statement. - - The given property may also make use of :meth:`_orm.PropComparator.of_type` - to indicate the left side of the criteria:: - - - a1 = aliased(Address) - a2 = aliased(Address) - stmt = select(a1, a2).where( - with_parent(u1, User.addresses.of_type(a2)) - ) - - The above use is equivalent to using the - :func:`_orm.with_parent.from_entity` argument:: - - a1 = aliased(Address) - a2 = aliased(Address) - stmt = select(a1, a2).where( - with_parent(u1, User.addresses, from_entity=a2) - ) - - :param instance: - An instance which has some :func:`_orm.relationship`. - - :param property: - Class-bound attribute, which indicates - what relationship from the instance should be used to reconcile the - parent/child relationship. - - :param from_entity: - Entity in which to consider as the left side. This defaults to the - "zero" entity of the :class:`_query.Query` itself. - - .. versionadded:: 1.2 - - """ - prop_t: RelationshipProperty[Any] - - if isinstance(prop, str): - raise sa_exc.ArgumentError( - "with_parent() accepts class-bound mapped attributes, not strings" - ) - elif isinstance(prop, attributes.QueryableAttribute): - if prop._of_type: - from_entity = prop._of_type - mapper_property = prop.property - if mapper_property is None or not prop_is_relationship( - mapper_property - ): - raise sa_exc.ArgumentError( - f"Expected relationship property for with_parent(), " - f"got {mapper_property}" - ) - prop_t = mapper_property - else: - prop_t = prop - - return prop_t._with_parent(instance, from_entity=from_entity) - - -def has_identity(object_: object) -> bool: - """Return True if the given object has a database - identity. - - This typically corresponds to the object being - in either the persistent or detached state. - - .. seealso:: - - :func:`.was_deleted` - - """ - state = attributes.instance_state(object_) - return state.has_identity - - -def was_deleted(object_: object) -> bool: - """Return True if the given object was deleted - within a session flush. - - This is regardless of whether or not the object is - persistent or detached. - - .. seealso:: - - :attr:`.InstanceState.was_deleted` - - """ - - state = attributes.instance_state(object_) - return state.was_deleted - - -def _entity_corresponds_to( - given: _InternalEntityType[Any], entity: _InternalEntityType[Any] -) -> bool: - """determine if 'given' corresponds to 'entity', in terms - of an entity passed to Query that would match the same entity - being referred to elsewhere in the query. - - """ - if insp_is_aliased_class(entity): - if insp_is_aliased_class(given): - if entity._base_alias() is given._base_alias(): - return True - return False - elif insp_is_aliased_class(given): - if given._use_mapper_path: - return entity in given.with_polymorphic_mappers - else: - return entity is given - - assert insp_is_mapper(given) - return entity.common_parent(given) - - -def _entity_corresponds_to_use_path_impl( - given: _InternalEntityType[Any], entity: _InternalEntityType[Any] -) -> bool: - """determine if 'given' corresponds to 'entity', in terms - of a path of loader options where a mapped attribute is taken to - be a member of a parent entity. - - e.g.:: - - someoption(A).someoption(A.b) # -> fn(A, A) -> True - someoption(A).someoption(C.d) # -> fn(A, C) -> False - - a1 = aliased(A) - someoption(a1).someoption(A.b) # -> fn(a1, A) -> False - someoption(a1).someoption(a1.b) # -> fn(a1, a1) -> True - - wp = with_polymorphic(A, [A1, A2]) - someoption(wp).someoption(A1.foo) # -> fn(wp, A1) -> False - someoption(wp).someoption(wp.A1.foo) # -> fn(wp, wp.A1) -> True - - - """ - if insp_is_aliased_class(given): - return ( - insp_is_aliased_class(entity) - and not entity._use_mapper_path - and (given is entity or entity in given._with_polymorphic_entities) - ) - elif not insp_is_aliased_class(entity): - return given.isa(entity.mapper) - else: - return ( - entity._use_mapper_path - and given in entity.with_polymorphic_mappers - ) - - -def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool: - """determine if 'given' "is a" mapper, in terms of the given - would load rows of type 'mapper'. - - """ - if given.is_aliased_class: - return mapper in given.with_polymorphic_mappers or given.mapper.isa( - mapper - ) - elif given.with_polymorphic_mappers: - return mapper in given.with_polymorphic_mappers - else: - return given.isa(mapper) - - -def _getitem(iterable_query: Query[Any], item: Any) -> Any: - """calculate __getitem__ in terms of an iterable query object - that also has a slice() method. - - """ - - def _no_negative_indexes(): - raise IndexError( - "negative indexes are not accepted by SQL " - "index / slice operators" - ) - - if isinstance(item, slice): - start, stop, step = util.decode_slice(item) - - if ( - isinstance(stop, int) - and isinstance(start, int) - and stop - start <= 0 - ): - return [] - - elif (isinstance(start, int) and start < 0) or ( - isinstance(stop, int) and stop < 0 - ): - _no_negative_indexes() - - res = iterable_query.slice(start, stop) - if step is not None: - return list(res)[None : None : item.step] - else: - return list(res) - else: - if item == -1: - _no_negative_indexes() - else: - return list(iterable_query[item : item + 1])[0] - - -def _is_mapped_annotation( - raw_annotation: _AnnotationScanType, - cls: Type[Any], - originating_cls: Type[Any], -) -> bool: - try: - annotated = de_stringify_annotation( - cls, raw_annotation, originating_cls.__module__ - ) - except NameError: - # in most cases, at least within our own tests, we can raise - # here, which is more accurate as it prevents us from returning - # false negatives. However, in the real world, try to avoid getting - # involved with end-user annotations that have nothing to do with us. - # see issue #8888 where we bypass using this function in the case - # that we want to detect an unresolvable Mapped[] type. - return False - else: - return is_origin_of_cls(annotated, _MappedAnnotationBase) - - -class _CleanupError(Exception): - pass - - -def _cleanup_mapped_str_annotation( - annotation: str, originating_module: str -) -> str: - # fix up an annotation that comes in as the form: - # 'Mapped[List[Address]]' so that it instead looks like: - # 'Mapped[List["Address"]]' , which will allow us to get - # "Address" as a string - - # additionally, resolve symbols for these names since this is where - # we'd have to do it - - inner: Optional[Match[str]] - - mm = re.match(r"^(.+?)\[(.+)\]$", annotation) - - if not mm: - return annotation - - # ticket #8759. Resolve the Mapped name to a real symbol. - # originally this just checked the name. - try: - obj = eval_name_only(mm.group(1), originating_module) - except NameError as ne: - raise _CleanupError( - f'For annotation "{annotation}", could not resolve ' - f'container type "{mm.group(1)}". ' - "Please ensure this type is imported at the module level " - "outside of TYPE_CHECKING blocks" - ) from ne - - if obj is typing.ClassVar: - real_symbol = "ClassVar" - else: - try: - if issubclass(obj, _MappedAnnotationBase): - real_symbol = obj.__name__ - else: - return annotation - except TypeError: - # avoid isinstance(obj, type) check, just catch TypeError - return annotation - - # note: if one of the codepaths above didn't define real_symbol and - # then didn't return, real_symbol raises UnboundLocalError - # which is actually a NameError, and the calling routines don't - # notice this since they are catching NameError anyway. Just in case - # this is being modified in the future, something to be aware of. - - stack = [] - inner = mm - while True: - stack.append(real_symbol if mm is inner else inner.group(1)) - g2 = inner.group(2) - inner = re.match(r"^(.+?)\[(.+)\]$", g2) - if inner is None: - stack.append(g2) - break - - # stacks we want to rewrite, that is, quote the last entry which - # we think is a relationship class name: - # - # ['Mapped', 'List', 'Address'] - # ['Mapped', 'A'] - # - # stacks we dont want to rewrite, which are generally MappedColumn - # use cases: - # - # ['Mapped', "'Optional[Dict[str, str]]'"] - # ['Mapped', 'dict[str, str] | None'] - - if ( - # avoid already quoted symbols such as - # ['Mapped', "'Optional[Dict[str, str]]'"] - not re.match(r"""^["'].*["']$""", stack[-1]) - # avoid further generics like Dict[] such as - # ['Mapped', 'dict[str, str] | None'] - and not re.match(r".*\[.*\]", stack[-1]) - ): - stripchars = "\"' " - stack[-1] = ", ".join( - f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",") - ) - - annotation = "[".join(stack) + ("]" * (len(stack) - 1)) - - return annotation - - -def _extract_mapped_subtype( - raw_annotation: Optional[_AnnotationScanType], - cls: type, - originating_module: str, - key: str, - attr_cls: Type[Any], - required: bool, - is_dataclass_field: bool, - expect_mapped: bool = True, - raiseerr: bool = True, -) -> Optional[Tuple[Union[type, str], Optional[type]]]: - """given an annotation, figure out if it's ``Mapped[something]`` and if - so, return the ``something`` part. - - Includes error raise scenarios and other options. - - """ - - if raw_annotation is None: - if required: - raise sa_exc.ArgumentError( - f"Python typing annotation is required for attribute " - f'"{cls.__name__}.{key}" when primary argument(s) for ' - f'"{attr_cls.__name__}" construct are None or not present' - ) - return None - - try: - annotated = de_stringify_annotation( - cls, - raw_annotation, - originating_module, - str_cleanup_fn=_cleanup_mapped_str_annotation, - ) - except _CleanupError as ce: - raise sa_exc.ArgumentError( - f"Could not interpret annotation {raw_annotation}. " - "Check that it uses names that are correctly imported at the " - "module level. See chained stack trace for more hints." - ) from ce - except NameError as ne: - if raiseerr and "Mapped[" in raw_annotation: # type: ignore - raise sa_exc.ArgumentError( - f"Could not interpret annotation {raw_annotation}. " - "Check that it uses names that are correctly imported at the " - "module level. See chained stack trace for more hints." - ) from ne - - annotated = raw_annotation # type: ignore - - if is_dataclass_field: - return annotated, None - else: - if not hasattr(annotated, "__origin__") or not is_origin_of_cls( - annotated, _MappedAnnotationBase - ): - if expect_mapped: - if not raiseerr: - return None - - origin = getattr(annotated, "__origin__", None) - if origin is typing.ClassVar: - return None - - # check for other kind of ORM descriptor like AssociationProxy, - # don't raise for that (issue #9957) - elif isinstance(origin, type) and issubclass( - origin, ORMDescriptor - ): - return None - - raise sa_exc.ArgumentError( - f'Type annotation for "{cls.__name__}.{key}" ' - "can't be correctly interpreted for " - "Annotated Declarative Table form. ORM annotations " - "should normally make use of the ``Mapped[]`` generic " - "type, or other ORM-compatible generic type, as a " - "container for the actual type, which indicates the " - "intent that the attribute is mapped. " - "Class variables that are not intended to be mapped " - "by the ORM should use ClassVar[]. " - "To allow Annotated Declarative to disregard legacy " - "annotations which don't use Mapped[] to pass, set " - '"__allow_unmapped__ = True" on the class or a ' - "superclass this class.", - code="zlpr", - ) - - else: - return annotated, None - - if len(annotated.__args__) != 1: - raise sa_exc.ArgumentError( - "Expected sub-type for Mapped[] annotation" - ) - - return annotated.__args__[0], annotated.__origin__ - - -def _mapper_property_as_plain_name(prop: Type[Any]) -> str: - if hasattr(prop, "_mapper_property_name"): - name = prop._mapper_property_name() - else: - name = None - return util.clsname_as_plain_name(prop, name) -- cgit v1.2.3