summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/orm/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/orm/util.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/orm/util.py2416
1 files changed, 0 insertions, 2416 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/orm/util.py b/venv/lib/python3.11/site-packages/sqlalchemy/orm/util.py
deleted file mode 100644
index 8e153e6..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/orm/util.py
+++ /dev/null
@@ -1,2416 +0,0 @@
-# orm/util.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: allow-untyped-defs, allow-untyped-calls
-
-from __future__ import annotations
-
-import enum
-import functools
-import re
-import types
-import typing
-from typing import AbstractSet
-from typing import Any
-from typing import Callable
-from typing import cast
-from typing import Dict
-from typing import FrozenSet
-from typing import Generic
-from typing import Iterable
-from typing import Iterator
-from typing import List
-from typing import Match
-from typing import Optional
-from typing import Sequence
-from typing import Tuple
-from typing import Type
-from typing import TYPE_CHECKING
-from typing import TypeVar
-from typing import Union
-import weakref
-
-from . import attributes # noqa
-from . import exc
-from ._typing import _O
-from ._typing import insp_is_aliased_class
-from ._typing import insp_is_mapper
-from ._typing import prop_is_relationship
-from .base import _class_to_mapper as _class_to_mapper
-from .base import _MappedAnnotationBase
-from .base import _never_set as _never_set # noqa: F401
-from .base import _none_set as _none_set # noqa: F401
-from .base import attribute_str as attribute_str # noqa: F401
-from .base import class_mapper as class_mapper
-from .base import DynamicMapped
-from .base import InspectionAttr as InspectionAttr
-from .base import instance_str as instance_str # noqa: F401
-from .base import Mapped
-from .base import object_mapper as object_mapper
-from .base import object_state as object_state # noqa: F401
-from .base import opt_manager_of_class
-from .base import ORMDescriptor
-from .base import state_attribute_str as state_attribute_str # noqa: F401
-from .base import state_class_str as state_class_str # noqa: F401
-from .base import state_str as state_str # noqa: F401
-from .base import WriteOnlyMapped
-from .interfaces import CriteriaOption
-from .interfaces import MapperProperty as MapperProperty
-from .interfaces import ORMColumnsClauseRole
-from .interfaces import ORMEntityColumnsClauseRole
-from .interfaces import ORMFromClauseRole
-from .path_registry import PathRegistry as PathRegistry
-from .. import event
-from .. import exc as sa_exc
-from .. import inspection
-from .. import sql
-from .. import util
-from ..engine.result import result_tuple
-from ..sql import coercions
-from ..sql import expression
-from ..sql import lambdas
-from ..sql import roles
-from ..sql import util as sql_util
-from ..sql import visitors
-from ..sql._typing import is_selectable
-from ..sql.annotation import SupportsCloneAnnotations
-from ..sql.base import ColumnCollection
-from ..sql.cache_key import HasCacheKey
-from ..sql.cache_key import MemoizedHasCacheKey
-from ..sql.elements import ColumnElement
-from ..sql.elements import KeyedColumnElement
-from ..sql.selectable import FromClause
-from ..util.langhelpers import MemoizedSlots
-from ..util.typing import de_stringify_annotation as _de_stringify_annotation
-from ..util.typing import (
- de_stringify_union_elements as _de_stringify_union_elements,
-)
-from ..util.typing import eval_name_only as _eval_name_only
-from ..util.typing import is_origin_of_cls
-from ..util.typing import Literal
-from ..util.typing import Protocol
-from ..util.typing import typing_get_origin
-
-if typing.TYPE_CHECKING:
- from ._typing import _EntityType
- from ._typing import _IdentityKeyType
- from ._typing import _InternalEntityType
- from ._typing import _ORMCOLEXPR
- from .context import _MapperEntity
- from .context import ORMCompileState
- from .mapper import Mapper
- from .path_registry import AbstractEntityRegistry
- from .query import Query
- from .relationships import RelationshipProperty
- from ..engine import Row
- from ..engine import RowMapping
- from ..sql._typing import _CE
- from ..sql._typing import _ColumnExpressionArgument
- from ..sql._typing import _EquivalentColumnMap
- from ..sql._typing import _FromClauseArgument
- from ..sql._typing import _OnClauseArgument
- from ..sql._typing import _PropagateAttrsType
- from ..sql.annotation import _SA
- from ..sql.base import ReadOnlyColumnCollection
- from ..sql.elements import BindParameter
- from ..sql.selectable import _ColumnsClauseElement
- from ..sql.selectable import Select
- from ..sql.selectable import Selectable
- from ..sql.visitors import anon_map
- from ..util.typing import _AnnotationScanType
- from ..util.typing import ArgsTypeProcotol
-
-_T = TypeVar("_T", bound=Any)
-
-all_cascades = frozenset(
- (
- "delete",
- "delete-orphan",
- "all",
- "merge",
- "expunge",
- "save-update",
- "refresh-expire",
- "none",
- )
-)
-
-
-_de_stringify_partial = functools.partial(
- functools.partial,
- locals_=util.immutabledict(
- {
- "Mapped": Mapped,
- "WriteOnlyMapped": WriteOnlyMapped,
- "DynamicMapped": DynamicMapped,
- }
- ),
-)
-
-# partial is practically useless as we have to write out the whole
-# function and maintain the signature anyway
-
-
-class _DeStringifyAnnotation(Protocol):
- def __call__(
- self,
- cls: Type[Any],
- annotation: _AnnotationScanType,
- originating_module: str,
- *,
- str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
- include_generic: bool = False,
- ) -> Type[Any]: ...
-
-
-de_stringify_annotation = cast(
- _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation)
-)
-
-
-class _DeStringifyUnionElements(Protocol):
- def __call__(
- self,
- cls: Type[Any],
- annotation: ArgsTypeProcotol,
- originating_module: str,
- *,
- str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
- ) -> Type[Any]: ...
-
-
-de_stringify_union_elements = cast(
- _DeStringifyUnionElements,
- _de_stringify_partial(_de_stringify_union_elements),
-)
-
-
-class _EvalNameOnly(Protocol):
- def __call__(self, name: str, module_name: str) -> Any: ...
-
-
-eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only))
-
-
-class CascadeOptions(FrozenSet[str]):
- """Keeps track of the options sent to
- :paramref:`.relationship.cascade`"""
-
- _add_w_all_cascades = all_cascades.difference(
- ["all", "none", "delete-orphan"]
- )
- _allowed_cascades = all_cascades
-
- _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]
-
- __slots__ = (
- "save_update",
- "delete",
- "refresh_expire",
- "merge",
- "expunge",
- "delete_orphan",
- )
-
- save_update: bool
- delete: bool
- refresh_expire: bool
- merge: bool
- expunge: bool
- delete_orphan: bool
-
- def __new__(
- cls, value_list: Optional[Union[Iterable[str], str]]
- ) -> CascadeOptions:
- if isinstance(value_list, str) or value_list is None:
- return cls.from_string(value_list) # type: ignore
- values = set(value_list)
- if values.difference(cls._allowed_cascades):
- raise sa_exc.ArgumentError(
- "Invalid cascade option(s): %s"
- % ", ".join(
- [
- repr(x)
- for x in sorted(
- values.difference(cls._allowed_cascades)
- )
- ]
- )
- )
-
- if "all" in values:
- values.update(cls._add_w_all_cascades)
- if "none" in values:
- values.clear()
- values.discard("all")
-
- self = super().__new__(cls, values)
- self.save_update = "save-update" in values
- self.delete = "delete" in values
- self.refresh_expire = "refresh-expire" in values
- self.merge = "merge" in values
- self.expunge = "expunge" in values
- self.delete_orphan = "delete-orphan" in values
-
- if self.delete_orphan and not self.delete:
- util.warn("The 'delete-orphan' cascade option requires 'delete'.")
- return self
-
- def __repr__(self):
- return "CascadeOptions(%r)" % (",".join([x for x in sorted(self)]))
-
- @classmethod
- def from_string(cls, arg):
- values = [c for c in re.split(r"\s*,\s*", arg or "") if c]
- return cls(values)
-
-
-def _validator_events(desc, key, validator, include_removes, include_backrefs):
- """Runs a validation method on an attribute value to be set or
- appended.
- """
-
- if not include_backrefs:
-
- def detect_is_backref(state, initiator):
- impl = state.manager[key].impl
- return initiator.impl is not impl
-
- if include_removes:
-
- def append(state, value, initiator):
- if initiator.op is not attributes.OP_BULK_REPLACE and (
- include_backrefs or not detect_is_backref(state, initiator)
- ):
- return validator(state.obj(), key, value, False)
- else:
- return value
-
- def bulk_set(state, values, initiator):
- if include_backrefs or not detect_is_backref(state, initiator):
- obj = state.obj()
- values[:] = [
- validator(obj, key, value, False) for value in values
- ]
-
- def set_(state, value, oldvalue, initiator):
- if include_backrefs or not detect_is_backref(state, initiator):
- return validator(state.obj(), key, value, False)
- else:
- return value
-
- def remove(state, value, initiator):
- if include_backrefs or not detect_is_backref(state, initiator):
- validator(state.obj(), key, value, True)
-
- else:
-
- def append(state, value, initiator):
- if initiator.op is not attributes.OP_BULK_REPLACE and (
- include_backrefs or not detect_is_backref(state, initiator)
- ):
- return validator(state.obj(), key, value)
- else:
- return value
-
- def bulk_set(state, values, initiator):
- if include_backrefs or not detect_is_backref(state, initiator):
- obj = state.obj()
- values[:] = [validator(obj, key, value) for value in values]
-
- def set_(state, value, oldvalue, initiator):
- if include_backrefs or not detect_is_backref(state, initiator):
- return validator(state.obj(), key, value)
- else:
- return value
-
- event.listen(desc, "append", append, raw=True, retval=True)
- event.listen(desc, "bulk_replace", bulk_set, raw=True)
- event.listen(desc, "set", set_, raw=True, retval=True)
- if include_removes:
- event.listen(desc, "remove", remove, raw=True, retval=True)
-
-
-def polymorphic_union(
- table_map, typecolname, aliasname="p_union", cast_nulls=True
-):
- """Create a ``UNION`` statement used by a polymorphic mapper.
-
- See :ref:`concrete_inheritance` for an example of how
- this is used.
-
- :param table_map: mapping of polymorphic identities to
- :class:`_schema.Table` objects.
- :param typecolname: string name of a "discriminator" column, which will be
- derived from the query, producing the polymorphic identity for
- each row. If ``None``, no polymorphic discriminator is generated.
- :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
- construct generated.
- :param cast_nulls: if True, non-existent columns, which are represented
- as labeled NULLs, will be passed into CAST. This is a legacy behavior
- that is problematic on some backends such as Oracle - in which case it
- can be set to False.
-
- """
-
- colnames: util.OrderedSet[str] = util.OrderedSet()
- colnamemaps = {}
- types = {}
- for key in table_map:
- table = table_map[key]
-
- table = coercions.expect(
- roles.StrictFromClauseRole, table, allow_select=True
- )
- table_map[key] = table
-
- m = {}
- for c in table.c:
- if c.key == typecolname:
- raise sa_exc.InvalidRequestError(
- "Polymorphic union can't use '%s' as the discriminator "
- "column due to mapped column %r; please apply the "
- "'typecolname' "
- "argument; this is available on "
- "ConcreteBase as '_concrete_discriminator_name'"
- % (typecolname, c)
- )
- colnames.add(c.key)
- m[c.key] = c
- types[c.key] = c.type
- colnamemaps[table] = m
-
- def col(name, table):
- try:
- return colnamemaps[table][name]
- except KeyError:
- if cast_nulls:
- return sql.cast(sql.null(), types[name]).label(name)
- else:
- return sql.type_coerce(sql.null(), types[name]).label(name)
-
- result = []
- for type_, table in table_map.items():
- if typecolname is not None:
- result.append(
- sql.select(
- *(
- [col(name, table) for name in colnames]
- + [
- sql.literal_column(
- sql_util._quote_ddl_expr(type_)
- ).label(typecolname)
- ]
- )
- ).select_from(table)
- )
- else:
- result.append(
- sql.select(
- *[col(name, table) for name in colnames]
- ).select_from(table)
- )
- return sql.union_all(*result).alias(aliasname)
-
-
-def identity_key(
- class_: Optional[Type[_T]] = None,
- ident: Union[Any, Tuple[Any, ...]] = None,
- *,
- instance: Optional[_T] = None,
- row: Optional[Union[Row[Any], RowMapping]] = None,
- identity_token: Optional[Any] = None,
-) -> _IdentityKeyType[_T]:
- r"""Generate "identity key" tuples, as are used as keys in the
- :attr:`.Session.identity_map` dictionary.
-
- This function has several call styles:
-
- * ``identity_key(class, ident, identity_token=token)``
-
- This form receives a mapped class and a primary key scalar or
- tuple as an argument.
-
- E.g.::
-
- >>> identity_key(MyClass, (1, 2))
- (<class '__main__.MyClass'>, (1, 2), None)
-
- :param class: mapped class (must be a positional argument)
- :param ident: primary key, may be a scalar or tuple argument.
- :param identity_token: optional identity token
-
- .. versionadded:: 1.2 added identity_token
-
-
- * ``identity_key(instance=instance)``
-
- This form will produce the identity key for a given instance. The
- instance need not be persistent, only that its primary key attributes
- are populated (else the key will contain ``None`` for those missing
- values).
-
- E.g.::
-
- >>> instance = MyClass(1, 2)
- >>> identity_key(instance=instance)
- (<class '__main__.MyClass'>, (1, 2), None)
-
- In this form, the given instance is ultimately run though
- :meth:`_orm.Mapper.identity_key_from_instance`, which will have the
- effect of performing a database check for the corresponding row
- if the object is expired.
-
- :param instance: object instance (must be given as a keyword arg)
-
- * ``identity_key(class, row=row, identity_token=token)``
-
- This form is similar to the class/tuple form, except is passed a
- database result row as a :class:`.Row` or :class:`.RowMapping` object.
-
- E.g.::
-
- >>> row = engine.execute(\
- text("select * from table where a=1 and b=2")\
- ).first()
- >>> identity_key(MyClass, row=row)
- (<class '__main__.MyClass'>, (1, 2), None)
-
- :param class: mapped class (must be a positional argument)
- :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult`
- (must be given as a keyword arg)
- :param identity_token: optional identity token
-
- .. versionadded:: 1.2 added identity_token
-
- """
- if class_ is not None:
- mapper = class_mapper(class_)
- if row is None:
- if ident is None:
- raise sa_exc.ArgumentError("ident or row is required")
- return mapper.identity_key_from_primary_key(
- tuple(util.to_list(ident)), identity_token=identity_token
- )
- else:
- return mapper.identity_key_from_row(
- row, identity_token=identity_token
- )
- elif instance is not None:
- mapper = object_mapper(instance)
- return mapper.identity_key_from_instance(instance)
- else:
- raise sa_exc.ArgumentError("class or instance is required")
-
-
-class _TraceAdaptRole(enum.Enum):
- """Enumeration of all the use cases for ORMAdapter.
-
- ORMAdapter remains one of the most complicated aspects of the ORM, as it is
- used for in-place adaption of column expressions to be applied to a SELECT,
- replacing :class:`.Table` and other objects that are mapped to classes with
- aliases of those tables in the case of joined eager loading, or in the case
- of polymorphic loading as used with concrete mappings or other custom "with
- polymorphic" parameters, with whole user-defined subqueries. The
- enumerations provide an overview of all the use cases used by ORMAdapter, a
- layer of formality as to the introduction of new ORMAdapter use cases (of
- which none are anticipated), as well as a means to trace the origins of a
- particular ORMAdapter within runtime debugging.
-
- SQLAlchemy 2.0 has greatly scaled back ORM features which relied heavily on
- open-ended statement adaption, including the ``Query.with_polymorphic()``
- method and the ``Query.select_from_entity()`` methods, favoring
- user-explicit aliasing schemes using the ``aliased()`` and
- ``with_polymorphic()`` standalone constructs; these still use adaption,
- however the adaption is applied in a narrower scope.
-
- """
-
- # aliased() use that is used to adapt individual attributes at query
- # construction time
- ALIASED_INSP = enum.auto()
-
- # joinedload cases; typically adapt an ON clause of a relationship
- # join
- JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto()
- JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto()
- JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto()
-
- # polymorphic cases - these are complex ones that replace FROM
- # clauses, replacing tables with subqueries
- MAPPER_POLYMORPHIC_ADAPTER = enum.auto()
- WITH_POLYMORPHIC_ADAPTER = enum.auto()
- WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto()
- DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto()
-
- # the from_statement() case, used only to adapt individual attributes
- # from a given statement to local ORM attributes at result fetching
- # time. assigned to ORMCompileState._from_obj_alias
- ADAPT_FROM_STATEMENT = enum.auto()
-
- # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case;
- # the query is placed inside of a subquery with the LIMIT/OFFSET/etc.,
- # joinedloads are then placed on the outside.
- # assigned to ORMCompileState.compound_eager_adapter
- COMPOUND_EAGER_STATEMENT = enum.auto()
-
- # the legacy Query._set_select_from() case.
- # this is needed for Query's set operations (i.e. UNION, etc. )
- # as well as "legacy from_self()", which while removed from 2.0 as
- # public API, is used for the Query.count() method. this one
- # still does full statement traversal
- # assigned to ORMCompileState._from_obj_alias
- LEGACY_SELECT_FROM_ALIAS = enum.auto()
-
-
-class ORMStatementAdapter(sql_util.ColumnAdapter):
- """ColumnAdapter which includes a role attribute."""
-
- __slots__ = ("role",)
-
- def __init__(
- self,
- role: _TraceAdaptRole,
- selectable: Selectable,
- *,
- equivalents: Optional[_EquivalentColumnMap] = None,
- adapt_required: bool = False,
- allow_label_resolve: bool = True,
- anonymize_labels: bool = False,
- adapt_on_names: bool = False,
- adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
- ):
- self.role = role
- super().__init__(
- selectable,
- equivalents=equivalents,
- adapt_required=adapt_required,
- allow_label_resolve=allow_label_resolve,
- anonymize_labels=anonymize_labels,
- adapt_on_names=adapt_on_names,
- adapt_from_selectables=adapt_from_selectables,
- )
-
-
-class ORMAdapter(sql_util.ColumnAdapter):
- """ColumnAdapter subclass which excludes adaptation of entities from
- non-matching mappers.
-
- """
-
- __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")
-
- is_aliased_class: bool
- aliased_insp: Optional[AliasedInsp[Any]]
-
- def __init__(
- self,
- role: _TraceAdaptRole,
- entity: _InternalEntityType[Any],
- *,
- equivalents: Optional[_EquivalentColumnMap] = None,
- adapt_required: bool = False,
- allow_label_resolve: bool = True,
- anonymize_labels: bool = False,
- selectable: Optional[Selectable] = None,
- limit_on_entity: bool = True,
- adapt_on_names: bool = False,
- adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
- ):
- self.role = role
- self.mapper = entity.mapper
- if selectable is None:
- selectable = entity.selectable
- if insp_is_aliased_class(entity):
- self.is_aliased_class = True
- self.aliased_insp = entity
- else:
- self.is_aliased_class = False
- self.aliased_insp = None
-
- super().__init__(
- selectable,
- equivalents,
- adapt_required=adapt_required,
- allow_label_resolve=allow_label_resolve,
- anonymize_labels=anonymize_labels,
- include_fn=self._include_fn if limit_on_entity else None,
- adapt_on_names=adapt_on_names,
- adapt_from_selectables=adapt_from_selectables,
- )
-
- def _include_fn(self, elem):
- entity = elem._annotations.get("parentmapper", None)
-
- return not entity or entity.isa(self.mapper) or self.mapper.isa(entity)
-
-
-class AliasedClass(
- inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
-):
- r"""Represents an "aliased" form of a mapped class for usage with Query.
-
- The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias`
- construct, this object mimics the mapped class using a
- ``__getattr__`` scheme and maintains a reference to a
- real :class:`~sqlalchemy.sql.expression.Alias` object.
-
- A primary purpose of :class:`.AliasedClass` is to serve as an alternate
- within a SQL statement generated by the ORM, such that an existing
- mapped entity can be used in multiple contexts. A simple example::
-
- # find all pairs of users with the same name
- user_alias = aliased(User)
- session.query(User, user_alias).\
- join((user_alias, User.id > user_alias.id)).\
- filter(User.name == user_alias.name)
-
- :class:`.AliasedClass` is also capable of mapping an existing mapped
- class to an entirely new selectable, provided this selectable is column-
- compatible with the existing mapped selectable, and it can also be
- configured in a mapping as the target of a :func:`_orm.relationship`.
- See the links below for examples.
-
- The :class:`.AliasedClass` object is constructed typically using the
- :func:`_orm.aliased` function. It also is produced with additional
- configuration when using the :func:`_orm.with_polymorphic` function.
-
- The resulting object is an instance of :class:`.AliasedClass`.
- This object implements an attribute scheme which produces the
- same attribute and method interface as the original mapped
- class, allowing :class:`.AliasedClass` to be compatible
- with any attribute technique which works on the original class,
- including hybrid attributes (see :ref:`hybrids_toplevel`).
-
- The :class:`.AliasedClass` can be inspected for its underlying
- :class:`_orm.Mapper`, aliased selectable, and other information
- using :func:`_sa.inspect`::
-
- from sqlalchemy import inspect
- my_alias = aliased(MyClass)
- insp = inspect(my_alias)
-
- The resulting inspection object is an instance of :class:`.AliasedInsp`.
-
-
- .. seealso::
-
- :func:`.aliased`
-
- :func:`.with_polymorphic`
-
- :ref:`relationship_aliased_class`
-
- :ref:`relationship_to_window_function`
-
-
- """
-
- __name__: str
-
- def __init__(
- self,
- mapped_class_or_ac: _EntityType[_O],
- alias: Optional[FromClause] = None,
- name: Optional[str] = None,
- flat: bool = False,
- adapt_on_names: bool = False,
- with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
- with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
- base_alias: Optional[AliasedInsp[Any]] = None,
- use_mapper_path: bool = False,
- represents_outer_join: bool = False,
- ):
- insp = cast(
- "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
- )
- mapper = insp.mapper
-
- nest_adapters = False
-
- if alias is None:
- if insp.is_aliased_class and insp.selectable._is_subquery:
- alias = insp.selectable.alias()
- else:
- alias = (
- mapper._with_polymorphic_selectable._anonymous_fromclause(
- name=name,
- flat=flat,
- )
- )
- elif insp.is_aliased_class:
- nest_adapters = True
-
- assert alias is not None
- self._aliased_insp = AliasedInsp(
- self,
- insp,
- alias,
- name,
- (
- with_polymorphic_mappers
- if with_polymorphic_mappers
- else mapper.with_polymorphic_mappers
- ),
- (
- with_polymorphic_discriminator
- if with_polymorphic_discriminator is not None
- else mapper.polymorphic_on
- ),
- base_alias,
- use_mapper_path,
- adapt_on_names,
- represents_outer_join,
- nest_adapters,
- )
-
- self.__name__ = f"aliased({mapper.class_.__name__})"
-
- @classmethod
- def _reconstitute_from_aliased_insp(
- cls, aliased_insp: AliasedInsp[_O]
- ) -> AliasedClass[_O]:
- obj = cls.__new__(cls)
- obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
- obj._aliased_insp = aliased_insp
-
- if aliased_insp._is_with_polymorphic:
- for sub_aliased_insp in aliased_insp._with_polymorphic_entities:
- if sub_aliased_insp is not aliased_insp:
- ent = AliasedClass._reconstitute_from_aliased_insp(
- sub_aliased_insp
- )
- setattr(obj, sub_aliased_insp.class_.__name__, ent)
-
- return obj
-
- def __getattr__(self, key: str) -> Any:
- try:
- _aliased_insp = self.__dict__["_aliased_insp"]
- except KeyError:
- raise AttributeError()
- else:
- target = _aliased_insp._target
- # maintain all getattr mechanics
- attr = getattr(target, key)
-
- # attribute is a method, that will be invoked against a
- # "self"; so just return a new method with the same function and
- # new self
- if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
- return types.MethodType(attr.__func__, self)
-
- # attribute is a descriptor, that will be invoked against a
- # "self"; so invoke the descriptor against this self
- if hasattr(attr, "__get__"):
- attr = attr.__get__(None, self)
-
- # attributes within the QueryableAttribute system will want this
- # to be invoked so the object can be adapted
- if hasattr(attr, "adapt_to_entity"):
- attr = attr.adapt_to_entity(_aliased_insp)
- setattr(self, key, attr)
-
- return attr
-
- def _get_from_serialized(
- self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
- ) -> Any:
- # this method is only used in terms of the
- # sqlalchemy.ext.serializer extension
- attr = getattr(mapped_class, key)
- if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
- return types.MethodType(attr.__func__, self)
-
- # attribute is a descriptor, that will be invoked against a
- # "self"; so invoke the descriptor against this self
- if hasattr(attr, "__get__"):
- attr = attr.__get__(None, self)
-
- # attributes within the QueryableAttribute system will want this
- # to be invoked so the object can be adapted
- if hasattr(attr, "adapt_to_entity"):
- aliased_insp._weak_entity = weakref.ref(self)
- attr = attr.adapt_to_entity(aliased_insp)
- setattr(self, key, attr)
-
- return attr
-
- def __repr__(self) -> str:
- return "<AliasedClass at 0x%x; %s>" % (
- id(self),
- self._aliased_insp._target.__name__,
- )
-
- def __str__(self) -> str:
- return str(self._aliased_insp)
-
-
-@inspection._self_inspects
-class AliasedInsp(
- ORMEntityColumnsClauseRole[_O],
- ORMFromClauseRole,
- HasCacheKey,
- InspectionAttr,
- MemoizedSlots,
- inspection.Inspectable["AliasedInsp[_O]"],
- Generic[_O],
-):
- """Provide an inspection interface for an
- :class:`.AliasedClass` object.
-
- The :class:`.AliasedInsp` object is returned
- given an :class:`.AliasedClass` using the
- :func:`_sa.inspect` function::
-
- from sqlalchemy import inspect
- from sqlalchemy.orm import aliased
-
- my_alias = aliased(MyMappedClass)
- insp = inspect(my_alias)
-
- Attributes on :class:`.AliasedInsp`
- include:
-
- * ``entity`` - the :class:`.AliasedClass` represented.
- * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class.
- * ``selectable`` - the :class:`_expression.Alias`
- construct which ultimately
- represents an aliased :class:`_schema.Table` or
- :class:`_expression.Select`
- construct.
- * ``name`` - the name of the alias. Also is used as the attribute
- name when returned in a result tuple from :class:`_query.Query`.
- * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper`
- objects
- indicating all those mappers expressed in the select construct
- for the :class:`.AliasedClass`.
- * ``polymorphic_on`` - an alternate column or SQL expression which
- will be used as the "discriminator" for a polymorphic load.
-
- .. seealso::
-
- :ref:`inspection_toplevel`
-
- """
-
- __slots__ = (
- "__weakref__",
- "_weak_entity",
- "mapper",
- "selectable",
- "name",
- "_adapt_on_names",
- "with_polymorphic_mappers",
- "polymorphic_on",
- "_use_mapper_path",
- "_base_alias",
- "represents_outer_join",
- "persist_selectable",
- "local_table",
- "_is_with_polymorphic",
- "_with_polymorphic_entities",
- "_adapter",
- "_target",
- "__clause_element__",
- "_memoized_values",
- "_all_column_expressions",
- "_nest_adapters",
- )
-
- _cache_key_traversal = [
- ("name", visitors.ExtendedInternalTraversal.dp_string),
- ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean),
- ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean),
- ("_target", visitors.ExtendedInternalTraversal.dp_inspectable),
- ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement),
- (
- "with_polymorphic_mappers",
- visitors.InternalTraversal.dp_has_cache_key_list,
- ),
- ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement),
- ]
-
- mapper: Mapper[_O]
- selectable: FromClause
- _adapter: ORMAdapter
- with_polymorphic_mappers: Sequence[Mapper[Any]]
- _with_polymorphic_entities: Sequence[AliasedInsp[Any]]
-
- _weak_entity: weakref.ref[AliasedClass[_O]]
- """the AliasedClass that refers to this AliasedInsp"""
-
- _target: Union[Type[_O], AliasedClass[_O]]
- """the thing referenced by the AliasedClass/AliasedInsp.
-
- In the vast majority of cases, this is the mapped class. However
- it may also be another AliasedClass (alias of alias).
-
- """
-
- def __init__(
- self,
- entity: AliasedClass[_O],
- inspected: _InternalEntityType[_O],
- selectable: FromClause,
- name: Optional[str],
- with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
- polymorphic_on: Optional[ColumnElement[Any]],
- _base_alias: Optional[AliasedInsp[Any]],
- _use_mapper_path: bool,
- adapt_on_names: bool,
- represents_outer_join: bool,
- nest_adapters: bool,
- ):
- mapped_class_or_ac = inspected.entity
- mapper = inspected.mapper
-
- self._weak_entity = weakref.ref(entity)
- self.mapper = mapper
- self.selectable = self.persist_selectable = self.local_table = (
- selectable
- )
- self.name = name
- self.polymorphic_on = polymorphic_on
- self._base_alias = weakref.ref(_base_alias or self)
- self._use_mapper_path = _use_mapper_path
- self.represents_outer_join = represents_outer_join
- self._nest_adapters = nest_adapters
-
- if with_polymorphic_mappers:
- self._is_with_polymorphic = True
- self.with_polymorphic_mappers = with_polymorphic_mappers
- self._with_polymorphic_entities = []
- for poly in self.with_polymorphic_mappers:
- if poly is not mapper:
- ent = AliasedClass(
- poly.class_,
- selectable,
- base_alias=self,
- adapt_on_names=adapt_on_names,
- use_mapper_path=_use_mapper_path,
- )
-
- setattr(self.entity, poly.class_.__name__, ent)
- self._with_polymorphic_entities.append(ent._aliased_insp)
-
- else:
- self._is_with_polymorphic = False
- self.with_polymorphic_mappers = [mapper]
-
- self._adapter = ORMAdapter(
- _TraceAdaptRole.ALIASED_INSP,
- mapper,
- selectable=selectable,
- equivalents=mapper._equivalent_columns,
- adapt_on_names=adapt_on_names,
- anonymize_labels=True,
- # make sure the adapter doesn't try to grab other tables that
- # are not even the thing we are mapping, such as embedded
- # selectables in subqueries or CTEs. See issue #6060
- adapt_from_selectables={
- m.selectable
- for m in self.with_polymorphic_mappers
- if not adapt_on_names
- },
- limit_on_entity=False,
- )
-
- if nest_adapters:
- # supports "aliased class of aliased class" use case
- assert isinstance(inspected, AliasedInsp)
- self._adapter = inspected._adapter.wrap(self._adapter)
-
- self._adapt_on_names = adapt_on_names
- self._target = mapped_class_or_ac
-
- @classmethod
- def _alias_factory(
- cls,
- element: Union[_EntityType[_O], FromClause],
- alias: Optional[FromClause] = None,
- name: Optional[str] = None,
- flat: bool = False,
- adapt_on_names: bool = False,
- ) -> Union[AliasedClass[_O], FromClause]:
- if isinstance(element, FromClause):
- if adapt_on_names:
- raise sa_exc.ArgumentError(
- "adapt_on_names only applies to ORM elements"
- )
- if name:
- return element.alias(name=name, flat=flat)
- else:
- return coercions.expect(
- roles.AnonymizedFromClauseRole, element, flat=flat
- )
- else:
- return AliasedClass(
- element,
- alias=alias,
- flat=flat,
- name=name,
- adapt_on_names=adapt_on_names,
- )
-
- @classmethod
- def _with_polymorphic_factory(
- cls,
- base: Union[Type[_O], Mapper[_O]],
- classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
- selectable: Union[Literal[False, None], FromClause] = False,
- flat: bool = False,
- polymorphic_on: Optional[ColumnElement[Any]] = None,
- aliased: bool = False,
- innerjoin: bool = False,
- adapt_on_names: bool = False,
- _use_mapper_path: bool = False,
- ) -> AliasedClass[_O]:
- primary_mapper = _class_to_mapper(base)
-
- if selectable not in (None, False) and flat:
- raise sa_exc.ArgumentError(
- "the 'flat' and 'selectable' arguments cannot be passed "
- "simultaneously to with_polymorphic()"
- )
-
- mappers, selectable = primary_mapper._with_polymorphic_args(
- classes, selectable, innerjoin=innerjoin
- )
- if aliased or flat:
- assert selectable is not None
- selectable = selectable._anonymous_fromclause(flat=flat)
-
- return AliasedClass(
- base,
- selectable,
- with_polymorphic_mappers=mappers,
- adapt_on_names=adapt_on_names,
- with_polymorphic_discriminator=polymorphic_on,
- use_mapper_path=_use_mapper_path,
- represents_outer_join=not innerjoin,
- )
-
- @property
- def entity(self) -> AliasedClass[_O]:
- # to eliminate reference cycles, the AliasedClass is held weakly.
- # this produces some situations where the AliasedClass gets lost,
- # particularly when one is created internally and only the AliasedInsp
- # is passed around.
- # to work around this case, we just generate a new one when we need
- # it, as it is a simple class with very little initial state on it.
- ent = self._weak_entity()
- if ent is None:
- ent = AliasedClass._reconstitute_from_aliased_insp(self)
- self._weak_entity = weakref.ref(ent)
- return ent
-
- is_aliased_class = True
- "always returns True"
-
- def _memoized_method___clause_element__(self) -> FromClause:
- return self.selectable._annotate(
- {
- "parentmapper": self.mapper,
- "parententity": self,
- "entity_namespace": self,
- }
- )._set_propagate_attrs(
- {"compile_state_plugin": "orm", "plugin_subject": self}
- )
-
- @property
- def entity_namespace(self) -> AliasedClass[_O]:
- return self.entity
-
- @property
- def class_(self) -> Type[_O]:
- """Return the mapped class ultimately represented by this
- :class:`.AliasedInsp`."""
- return self.mapper.class_
-
- @property
- def _path_registry(self) -> AbstractEntityRegistry:
- if self._use_mapper_path:
- return self.mapper._path_registry
- else:
- return PathRegistry.per_mapper(self)
-
- def __getstate__(self) -> Dict[str, Any]:
- return {
- "entity": self.entity,
- "mapper": self.mapper,
- "alias": self.selectable,
- "name": self.name,
- "adapt_on_names": self._adapt_on_names,
- "with_polymorphic_mappers": self.with_polymorphic_mappers,
- "with_polymorphic_discriminator": self.polymorphic_on,
- "base_alias": self._base_alias(),
- "use_mapper_path": self._use_mapper_path,
- "represents_outer_join": self.represents_outer_join,
- "nest_adapters": self._nest_adapters,
- }
-
- def __setstate__(self, state: Dict[str, Any]) -> None:
- self.__init__( # type: ignore
- state["entity"],
- state["mapper"],
- state["alias"],
- state["name"],
- state["with_polymorphic_mappers"],
- state["with_polymorphic_discriminator"],
- state["base_alias"],
- state["use_mapper_path"],
- state["adapt_on_names"],
- state["represents_outer_join"],
- state["nest_adapters"],
- )
-
- def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
- # assert self._is_with_polymorphic
- # assert other._is_with_polymorphic
-
- primary_mapper = other.mapper
-
- assert self.mapper is primary_mapper
-
- our_classes = util.to_set(
- mp.class_ for mp in self.with_polymorphic_mappers
- )
- new_classes = {mp.class_ for mp in other.with_polymorphic_mappers}
- if our_classes == new_classes:
- return other
- else:
- classes = our_classes.union(new_classes)
-
- mappers, selectable = primary_mapper._with_polymorphic_args(
- classes, None, innerjoin=not other.represents_outer_join
- )
- selectable = selectable._anonymous_fromclause(flat=True)
- return AliasedClass(
- primary_mapper,
- selectable,
- with_polymorphic_mappers=mappers,
- with_polymorphic_discriminator=other.polymorphic_on,
- use_mapper_path=other._use_mapper_path,
- represents_outer_join=other.represents_outer_join,
- )._aliased_insp
-
- def _adapt_element(
- self, expr: _ORMCOLEXPR, key: Optional[str] = None
- ) -> _ORMCOLEXPR:
- assert isinstance(expr, ColumnElement)
- d: Dict[str, Any] = {
- "parententity": self,
- "parentmapper": self.mapper,
- }
- if key:
- d["proxy_key"] = key
-
- # IMO mypy should see this one also as returning the same type
- # we put into it, but it's not
- return (
- self._adapter.traverse(expr)
- ._annotate(d)
- ._set_propagate_attrs(
- {"compile_state_plugin": "orm", "plugin_subject": self}
- )
- )
-
- if TYPE_CHECKING:
- # establish compatibility with the _ORMAdapterProto protocol,
- # which in turn is compatible with _CoreAdapterProto.
-
- def _orm_adapt_element(
- self,
- obj: _CE,
- key: Optional[str] = None,
- ) -> _CE: ...
-
- else:
- _orm_adapt_element = _adapt_element
-
- def _entity_for_mapper(self, mapper):
- self_poly = self.with_polymorphic_mappers
- if mapper in self_poly:
- if mapper is self.mapper:
- return self
- else:
- return getattr(
- self.entity, mapper.class_.__name__
- )._aliased_insp
- elif mapper.isa(self.mapper):
- return self
- else:
- assert False, "mapper %s doesn't correspond to %s" % (mapper, self)
-
- def _memoized_attr__get_clause(self):
- onclause, replacemap = self.mapper._get_clause
- return (
- self._adapter.traverse(onclause),
- {
- self._adapter.traverse(col): param
- for col, param in replacemap.items()
- },
- )
-
- def _memoized_attr__memoized_values(self):
- return {}
-
- def _memoized_attr__all_column_expressions(self):
- if self._is_with_polymorphic:
- cols_plus_keys = self.mapper._columns_plus_keys(
- [ent.mapper for ent in self._with_polymorphic_entities]
- )
- else:
- cols_plus_keys = self.mapper._columns_plus_keys()
-
- cols_plus_keys = [
- (key, self._adapt_element(col)) for key, col in cols_plus_keys
- ]
-
- return ColumnCollection(cols_plus_keys)
-
- def _memo(self, key, callable_, *args, **kw):
- if key in self._memoized_values:
- return self._memoized_values[key]
- else:
- self._memoized_values[key] = value = callable_(*args, **kw)
- return value
-
- def __repr__(self):
- if self.with_polymorphic_mappers:
- with_poly = "(%s)" % ", ".join(
- mp.class_.__name__ for mp in self.with_polymorphic_mappers
- )
- else:
- with_poly = ""
- return "<AliasedInsp at 0x%x; %s%s>" % (
- id(self),
- self.class_.__name__,
- with_poly,
- )
-
- def __str__(self):
- if self._is_with_polymorphic:
- return "with_polymorphic(%s, [%s])" % (
- self._target.__name__,
- ", ".join(
- mp.class_.__name__
- for mp in self.with_polymorphic_mappers
- if mp is not self.mapper
- ),
- )
- else:
- return "aliased(%s)" % (self._target.__name__,)
-
-
-class _WrapUserEntity:
- """A wrapper used within the loader_criteria lambda caller so that
- we can bypass declared_attr descriptors on unmapped mixins, which
- normally emit a warning for such use.
-
- might also be useful for other per-lambda instrumentations should
- the need arise.
-
- """
-
- __slots__ = ("subject",)
-
- def __init__(self, subject):
- self.subject = subject
-
- @util.preload_module("sqlalchemy.orm.decl_api")
- def __getattribute__(self, name):
- decl_api = util.preloaded.orm.decl_api
-
- subject = object.__getattribute__(self, "subject")
- if name in subject.__dict__ and isinstance(
- subject.__dict__[name], decl_api.declared_attr
- ):
- return subject.__dict__[name].fget(subject)
- else:
- return getattr(subject, name)
-
-
-class LoaderCriteriaOption(CriteriaOption):
- """Add additional WHERE criteria to the load for all occurrences of
- a particular entity.
-
- :class:`_orm.LoaderCriteriaOption` is invoked using the
- :func:`_orm.with_loader_criteria` function; see that function for
- details.
-
- .. versionadded:: 1.4
-
- """
-
- __slots__ = (
- "root_entity",
- "entity",
- "deferred_where_criteria",
- "where_criteria",
- "_where_crit_orig",
- "include_aliases",
- "propagate_to_loaders",
- )
-
- _traverse_internals = [
- ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
- ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
- ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
- ("include_aliases", visitors.InternalTraversal.dp_boolean),
- ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
- ]
-
- root_entity: Optional[Type[Any]]
- entity: Optional[_InternalEntityType[Any]]
- where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
- deferred_where_criteria: bool
- include_aliases: bool
- propagate_to_loaders: bool
-
- _where_crit_orig: Any
-
- def __init__(
- self,
- entity_or_base: _EntityType[Any],
- where_criteria: Union[
- _ColumnExpressionArgument[bool],
- Callable[[Any], _ColumnExpressionArgument[bool]],
- ],
- loader_only: bool = False,
- include_aliases: bool = False,
- propagate_to_loaders: bool = True,
- track_closure_variables: bool = True,
- ):
- entity = cast(
- "_InternalEntityType[Any]",
- inspection.inspect(entity_or_base, False),
- )
- if entity is None:
- self.root_entity = cast("Type[Any]", entity_or_base)
- self.entity = None
- else:
- self.root_entity = None
- self.entity = entity
-
- self._where_crit_orig = where_criteria
- if callable(where_criteria):
- if self.root_entity is not None:
- wrap_entity = self.root_entity
- else:
- assert entity is not None
- wrap_entity = entity.entity
-
- self.deferred_where_criteria = True
- self.where_criteria = lambdas.DeferredLambdaElement(
- where_criteria,
- roles.WhereHavingRole,
- lambda_args=(_WrapUserEntity(wrap_entity),),
- opts=lambdas.LambdaOptions(
- track_closure_variables=track_closure_variables
- ),
- )
- else:
- self.deferred_where_criteria = False
- self.where_criteria = coercions.expect(
- roles.WhereHavingRole, where_criteria
- )
-
- self.include_aliases = include_aliases
- self.propagate_to_loaders = propagate_to_loaders
-
- @classmethod
- def _unreduce(
- cls, entity, where_criteria, include_aliases, propagate_to_loaders
- ):
- return LoaderCriteriaOption(
- entity,
- where_criteria,
- include_aliases=include_aliases,
- propagate_to_loaders=propagate_to_loaders,
- )
-
- def __reduce__(self):
- return (
- LoaderCriteriaOption._unreduce,
- (
- self.entity.class_ if self.entity else self.root_entity,
- self._where_crit_orig,
- self.include_aliases,
- self.propagate_to_loaders,
- ),
- )
-
- def _all_mappers(self) -> Iterator[Mapper[Any]]:
- if self.entity:
- yield from self.entity.mapper.self_and_descendants
- else:
- assert self.root_entity
- stack = list(self.root_entity.__subclasses__())
- while stack:
- subclass = stack.pop(0)
- ent = cast(
- "_InternalEntityType[Any]",
- inspection.inspect(subclass, raiseerr=False),
- )
- if ent:
- yield from ent.mapper.self_and_descendants
- else:
- stack.extend(subclass.__subclasses__())
-
- def _should_include(self, compile_state: ORMCompileState) -> bool:
- if (
- compile_state.select_statement._annotations.get(
- "for_loader_criteria", None
- )
- is self
- ):
- return False
- return True
-
- def _resolve_where_criteria(
- self, ext_info: _InternalEntityType[Any]
- ) -> ColumnElement[bool]:
- if self.deferred_where_criteria:
- crit = cast(
- "ColumnElement[bool]",
- self.where_criteria._resolve_with_args(ext_info.entity),
- )
- else:
- crit = self.where_criteria # type: ignore
- assert isinstance(crit, ColumnElement)
- return sql_util._deep_annotate(
- crit,
- {"for_loader_criteria": self},
- detect_subquery_cols=True,
- ind_cols_on_fromclause=True,
- )
-
- def process_compile_state_replaced_entities(
- self,
- compile_state: ORMCompileState,
- mapper_entities: Iterable[_MapperEntity],
- ) -> None:
- self.process_compile_state(compile_state)
-
- def process_compile_state(self, compile_state: ORMCompileState) -> None:
- """Apply a modification to a given :class:`.CompileState`."""
-
- # if options to limit the criteria to immediate query only,
- # use compile_state.attributes instead
-
- self.get_global_criteria(compile_state.global_attributes)
-
- def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
- for mp in self._all_mappers():
- load_criteria = attributes.setdefault(
- ("additional_entity_criteria", mp), []
- )
-
- load_criteria.append(self)
-
-
-inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)
-
-
-@inspection._inspects(type)
-def _inspect_mc(
- class_: Type[_O],
-) -> Optional[Mapper[_O]]:
- try:
- class_manager = opt_manager_of_class(class_)
- if class_manager is None or not class_manager.is_mapped:
- return None
- mapper = class_manager.mapper
- except exc.NO_STATE:
- return None
- else:
- return mapper
-
-
-GenericAlias = type(List[Any])
-
-
-@inspection._inspects(GenericAlias)
-def _inspect_generic_alias(
- class_: Type[_O],
-) -> Optional[Mapper[_O]]:
- origin = cast("Type[_O]", typing_get_origin(class_))
- return _inspect_mc(origin)
-
-
-@inspection._self_inspects
-class Bundle(
- ORMColumnsClauseRole[_T],
- SupportsCloneAnnotations,
- MemoizedHasCacheKey,
- inspection.Inspectable["Bundle[_T]"],
- InspectionAttr,
-):
- """A grouping of SQL expressions that are returned by a :class:`.Query`
- under one namespace.
-
- The :class:`.Bundle` essentially allows nesting of the tuple-based
- results returned by a column-oriented :class:`_query.Query` object.
- It also
- is extensible via simple subclassing, where the primary capability
- to override is that of how the set of expressions should be returned,
- allowing post-processing as well as custom return types, without
- involving ORM identity-mapped classes.
-
- .. seealso::
-
- :ref:`bundles`
-
-
- """
-
- single_entity = False
- """If True, queries for a single Bundle will be returned as a single
- entity, rather than an element within a keyed tuple."""
-
- is_clause_element = False
-
- is_mapper = False
-
- is_aliased_class = False
-
- is_bundle = True
-
- _propagate_attrs: _PropagateAttrsType = util.immutabledict()
-
- proxy_set = util.EMPTY_SET # type: ignore
-
- exprs: List[_ColumnsClauseElement]
-
- def __init__(
- self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
- ):
- r"""Construct a new :class:`.Bundle`.
-
- e.g.::
-
- bn = Bundle("mybundle", MyClass.x, MyClass.y)
-
- for row in session.query(bn).filter(
- bn.c.x == 5).filter(bn.c.y == 4):
- print(row.mybundle.x, row.mybundle.y)
-
- :param name: name of the bundle.
- :param \*exprs: columns or SQL expressions comprising the bundle.
- :param single_entity=False: if True, rows for this :class:`.Bundle`
- can be returned as a "single entity" outside of any enclosing tuple
- in the same manner as a mapped entity.
-
- """
- self.name = self._label = name
- coerced_exprs = [
- coercions.expect(
- roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
- )
- for expr in exprs
- ]
- self.exprs = coerced_exprs
-
- self.c = self.columns = ColumnCollection(
- (getattr(col, "key", col._label), col)
- for col in [e._annotations.get("bundle", e) for e in coerced_exprs]
- ).as_readonly()
- self.single_entity = kw.pop("single_entity", self.single_entity)
-
- def _gen_cache_key(
- self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
- ) -> Tuple[Any, ...]:
- return (self.__class__, self.name, self.single_entity) + tuple(
- [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
- )
-
- @property
- def mapper(self) -> Optional[Mapper[Any]]:
- mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get(
- "parentmapper", None
- )
- return mp
-
- @property
- def entity(self) -> Optional[_InternalEntityType[Any]]:
- ie: Optional[_InternalEntityType[Any]] = self.exprs[
- 0
- ]._annotations.get("parententity", None)
- return ie
-
- @property
- def entity_namespace(
- self,
- ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
- return self.c
-
- columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
-
- """A namespace of SQL expressions referred to by this :class:`.Bundle`.
-
- e.g.::
-
- bn = Bundle("mybundle", MyClass.x, MyClass.y)
-
- q = sess.query(bn).filter(bn.c.x == 5)
-
- Nesting of bundles is also supported::
-
- b1 = Bundle("b1",
- Bundle('b2', MyClass.a, MyClass.b),
- Bundle('b3', MyClass.x, MyClass.y)
- )
-
- q = sess.query(b1).filter(
- b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)
-
- .. seealso::
-
- :attr:`.Bundle.c`
-
- """
-
- c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
- """An alias for :attr:`.Bundle.columns`."""
-
- def _clone(self):
- cloned = self.__class__.__new__(self.__class__)
- cloned.__dict__.update(self.__dict__)
- return cloned
-
- def __clause_element__(self):
- # ensure existing entity_namespace remains
- annotations = {"bundle": self, "entity_namespace": self}
- annotations.update(self._annotations)
-
- plugin_subject = self.exprs[0]._propagate_attrs.get(
- "plugin_subject", self.entity
- )
- return (
- expression.ClauseList(
- _literal_as_text_role=roles.ColumnsClauseRole,
- group=False,
- *[e._annotations.get("bundle", e) for e in self.exprs],
- )
- ._annotate(annotations)
- ._set_propagate_attrs(
- # the Bundle *must* use the orm plugin no matter what. the
- # subject can be None but it's much better if it's not.
- {
- "compile_state_plugin": "orm",
- "plugin_subject": plugin_subject,
- }
- )
- )
-
- @property
- def clauses(self):
- return self.__clause_element__().clauses
-
- def label(self, name):
- """Provide a copy of this :class:`.Bundle` passing a new label."""
-
- cloned = self._clone()
- cloned.name = name
- return cloned
-
- def create_row_processor(
- self,
- query: Select[Any],
- procs: Sequence[Callable[[Row[Any]], Any]],
- labels: Sequence[str],
- ) -> Callable[[Row[Any]], Any]:
- """Produce the "row processing" function for this :class:`.Bundle`.
-
- May be overridden by subclasses to provide custom behaviors when
- results are fetched. The method is passed the statement object and a
- set of "row processor" functions at query execution time; these
- processor functions when given a result row will return the individual
- attribute value, which can then be adapted into any kind of return data
- structure.
-
- The example below illustrates replacing the usual :class:`.Row`
- return structure with a straight Python dictionary::
-
- from sqlalchemy.orm import Bundle
-
- class DictBundle(Bundle):
- def create_row_processor(self, query, procs, labels):
- 'Override create_row_processor to return values as
- dictionaries'
-
- def proc(row):
- return dict(
- zip(labels, (proc(row) for proc in procs))
- )
- return proc
-
- A result from the above :class:`_orm.Bundle` will return dictionary
- values::
-
- bn = DictBundle('mybundle', MyClass.data1, MyClass.data2)
- for row in session.execute(select(bn)).where(bn.c.data1 == 'd1'):
- print(row.mybundle['data1'], row.mybundle['data2'])
-
- """
- keyed_tuple = result_tuple(labels, [() for l in labels])
-
- def proc(row: Row[Any]) -> Any:
- return keyed_tuple([proc(row) for proc in procs])
-
- return proc
-
-
-def _orm_annotate(element: _SA, exclude: Optional[Any] = None) -> _SA:
- """Deep copy the given ClauseElement, annotating each element with the
- "_orm_adapt" flag.
-
- Elements within the exclude collection will be cloned but not annotated.
-
- """
- return sql_util._deep_annotate(element, {"_orm_adapt": True}, exclude)
-
-
-def _orm_deannotate(element: _SA) -> _SA:
- """Remove annotations that link a column to a particular mapping.
-
- Note this doesn't affect "remote" and "foreign" annotations
- passed by the :func:`_orm.foreign` and :func:`_orm.remote`
- annotators.
-
- """
-
- return sql_util._deep_deannotate(
- element, values=("_orm_adapt", "parententity")
- )
-
-
-def _orm_full_deannotate(element: _SA) -> _SA:
- return sql_util._deep_deannotate(element)
-
-
-class _ORMJoin(expression.Join):
- """Extend Join to support ORM constructs as input."""
-
- __visit_name__ = expression.Join.__visit_name__
-
- inherit_cache = True
-
- def __init__(
- self,
- left: _FromClauseArgument,
- right: _FromClauseArgument,
- onclause: Optional[_OnClauseArgument] = None,
- isouter: bool = False,
- full: bool = False,
- _left_memo: Optional[Any] = None,
- _right_memo: Optional[Any] = None,
- _extra_criteria: Tuple[ColumnElement[bool], ...] = (),
- ):
- left_info = cast(
- "Union[FromClause, _InternalEntityType[Any]]",
- inspection.inspect(left),
- )
-
- right_info = cast(
- "Union[FromClause, _InternalEntityType[Any]]",
- inspection.inspect(right),
- )
- adapt_to = right_info.selectable
-
- # used by joined eager loader
- self._left_memo = _left_memo
- self._right_memo = _right_memo
-
- if isinstance(onclause, attributes.QueryableAttribute):
- if TYPE_CHECKING:
- assert isinstance(
- onclause.comparator, RelationshipProperty.Comparator
- )
- on_selectable = onclause.comparator._source_selectable()
- prop = onclause.property
- _extra_criteria += onclause._extra_criteria
- elif isinstance(onclause, MapperProperty):
- # used internally by joined eager loader...possibly not ideal
- prop = onclause
- on_selectable = prop.parent.selectable
- else:
- prop = None
- on_selectable = None
-
- left_selectable = left_info.selectable
- if prop:
- adapt_from: Optional[FromClause]
- if sql_util.clause_is_present(on_selectable, left_selectable):
- adapt_from = on_selectable
- else:
- assert isinstance(left_selectable, FromClause)
- adapt_from = left_selectable
-
- (
- pj,
- sj,
- source,
- dest,
- secondary,
- target_adapter,
- ) = prop._create_joins(
- source_selectable=adapt_from,
- dest_selectable=adapt_to,
- source_polymorphic=True,
- of_type_entity=right_info,
- alias_secondary=True,
- extra_criteria=_extra_criteria,
- )
-
- if sj is not None:
- if isouter:
- # note this is an inner join from secondary->right
- right = sql.join(secondary, right, sj)
- onclause = pj
- else:
- left = sql.join(left, secondary, pj, isouter)
- onclause = sj
- else:
- onclause = pj
-
- self._target_adapter = target_adapter
-
- # we don't use the normal coercions logic for _ORMJoin
- # (probably should), so do some gymnastics to get the entity.
- # logic here is for #8721, which was a major bug in 1.4
- # for almost two years, not reported/fixed until 1.4.43 (!)
- if is_selectable(left_info):
- parententity = left_selectable._annotations.get(
- "parententity", None
- )
- elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info):
- parententity = left_info
- else:
- parententity = None
-
- if parententity is not None:
- self._annotations = self._annotations.union(
- {"parententity": parententity}
- )
-
- augment_onclause = bool(_extra_criteria) and not prop
- expression.Join.__init__(self, left, right, onclause, isouter, full)
-
- assert self.onclause is not None
-
- if augment_onclause:
- self.onclause &= sql.and_(*_extra_criteria)
-
- if (
- not prop
- and getattr(right_info, "mapper", None)
- and right_info.mapper.single # type: ignore
- ):
- right_info = cast("_InternalEntityType[Any]", right_info)
- # if single inheritance target and we are using a manual
- # or implicit ON clause, augment it the same way we'd augment the
- # WHERE.
- single_crit = right_info.mapper._single_table_criterion
- if single_crit is not None:
- if insp_is_aliased_class(right_info):
- single_crit = right_info._adapter.traverse(single_crit)
- self.onclause = self.onclause & single_crit
-
- def _splice_into_center(self, other):
- """Splice a join into the center.
-
- Given join(a, b) and join(b, c), return join(a, b).join(c)
-
- """
- leftmost = other
- while isinstance(leftmost, sql.Join):
- leftmost = leftmost.left
-
- assert self.right is leftmost
-
- left = _ORMJoin(
- self.left,
- other.left,
- self.onclause,
- isouter=self.isouter,
- _left_memo=self._left_memo,
- _right_memo=other._left_memo,
- )
-
- return _ORMJoin(
- left,
- other.right,
- other.onclause,
- isouter=other.isouter,
- _right_memo=other._right_memo,
- )
-
- def join(
- self,
- right: _FromClauseArgument,
- onclause: Optional[_OnClauseArgument] = None,
- isouter: bool = False,
- full: bool = False,
- ) -> _ORMJoin:
- return _ORMJoin(self, right, onclause, full=full, isouter=isouter)
-
- def outerjoin(
- self,
- right: _FromClauseArgument,
- onclause: Optional[_OnClauseArgument] = None,
- full: bool = False,
- ) -> _ORMJoin:
- return _ORMJoin(self, right, onclause, isouter=True, full=full)
-
-
-def with_parent(
- instance: object,
- prop: attributes.QueryableAttribute[Any],
- from_entity: Optional[_EntityType[Any]] = None,
-) -> ColumnElement[bool]:
- """Create filtering criterion that relates this query's primary entity
- to the given related instance, using established
- :func:`_orm.relationship()`
- configuration.
-
- E.g.::
-
- stmt = select(Address).where(with_parent(some_user, User.addresses))
-
-
- The SQL rendered is the same as that rendered when a lazy loader
- would fire off from the given parent on that attribute, meaning
- that the appropriate state is taken from the parent object in
- Python without the need to render joins to the parent table
- in the rendered statement.
-
- The given property may also make use of :meth:`_orm.PropComparator.of_type`
- to indicate the left side of the criteria::
-
-
- a1 = aliased(Address)
- a2 = aliased(Address)
- stmt = select(a1, a2).where(
- with_parent(u1, User.addresses.of_type(a2))
- )
-
- The above use is equivalent to using the
- :func:`_orm.with_parent.from_entity` argument::
-
- a1 = aliased(Address)
- a2 = aliased(Address)
- stmt = select(a1, a2).where(
- with_parent(u1, User.addresses, from_entity=a2)
- )
-
- :param instance:
- An instance which has some :func:`_orm.relationship`.
-
- :param property:
- Class-bound attribute, which indicates
- what relationship from the instance should be used to reconcile the
- parent/child relationship.
-
- :param from_entity:
- Entity in which to consider as the left side. This defaults to the
- "zero" entity of the :class:`_query.Query` itself.
-
- .. versionadded:: 1.2
-
- """
- prop_t: RelationshipProperty[Any]
-
- if isinstance(prop, str):
- raise sa_exc.ArgumentError(
- "with_parent() accepts class-bound mapped attributes, not strings"
- )
- elif isinstance(prop, attributes.QueryableAttribute):
- if prop._of_type:
- from_entity = prop._of_type
- mapper_property = prop.property
- if mapper_property is None or not prop_is_relationship(
- mapper_property
- ):
- raise sa_exc.ArgumentError(
- f"Expected relationship property for with_parent(), "
- f"got {mapper_property}"
- )
- prop_t = mapper_property
- else:
- prop_t = prop
-
- return prop_t._with_parent(instance, from_entity=from_entity)
-
-
-def has_identity(object_: object) -> bool:
- """Return True if the given object has a database
- identity.
-
- This typically corresponds to the object being
- in either the persistent or detached state.
-
- .. seealso::
-
- :func:`.was_deleted`
-
- """
- state = attributes.instance_state(object_)
- return state.has_identity
-
-
-def was_deleted(object_: object) -> bool:
- """Return True if the given object was deleted
- within a session flush.
-
- This is regardless of whether or not the object is
- persistent or detached.
-
- .. seealso::
-
- :attr:`.InstanceState.was_deleted`
-
- """
-
- state = attributes.instance_state(object_)
- return state.was_deleted
-
-
-def _entity_corresponds_to(
- given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
-) -> bool:
- """determine if 'given' corresponds to 'entity', in terms
- of an entity passed to Query that would match the same entity
- being referred to elsewhere in the query.
-
- """
- if insp_is_aliased_class(entity):
- if insp_is_aliased_class(given):
- if entity._base_alias() is given._base_alias():
- return True
- return False
- elif insp_is_aliased_class(given):
- if given._use_mapper_path:
- return entity in given.with_polymorphic_mappers
- else:
- return entity is given
-
- assert insp_is_mapper(given)
- return entity.common_parent(given)
-
-
-def _entity_corresponds_to_use_path_impl(
- given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
-) -> bool:
- """determine if 'given' corresponds to 'entity', in terms
- of a path of loader options where a mapped attribute is taken to
- be a member of a parent entity.
-
- e.g.::
-
- someoption(A).someoption(A.b) # -> fn(A, A) -> True
- someoption(A).someoption(C.d) # -> fn(A, C) -> False
-
- a1 = aliased(A)
- someoption(a1).someoption(A.b) # -> fn(a1, A) -> False
- someoption(a1).someoption(a1.b) # -> fn(a1, a1) -> True
-
- wp = with_polymorphic(A, [A1, A2])
- someoption(wp).someoption(A1.foo) # -> fn(wp, A1) -> False
- someoption(wp).someoption(wp.A1.foo) # -> fn(wp, wp.A1) -> True
-
-
- """
- if insp_is_aliased_class(given):
- return (
- insp_is_aliased_class(entity)
- and not entity._use_mapper_path
- and (given is entity or entity in given._with_polymorphic_entities)
- )
- elif not insp_is_aliased_class(entity):
- return given.isa(entity.mapper)
- else:
- return (
- entity._use_mapper_path
- and given in entity.with_polymorphic_mappers
- )
-
-
-def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool:
- """determine if 'given' "is a" mapper, in terms of the given
- would load rows of type 'mapper'.
-
- """
- if given.is_aliased_class:
- return mapper in given.with_polymorphic_mappers or given.mapper.isa(
- mapper
- )
- elif given.with_polymorphic_mappers:
- return mapper in given.with_polymorphic_mappers
- else:
- return given.isa(mapper)
-
-
-def _getitem(iterable_query: Query[Any], item: Any) -> Any:
- """calculate __getitem__ in terms of an iterable query object
- that also has a slice() method.
-
- """
-
- def _no_negative_indexes():
- raise IndexError(
- "negative indexes are not accepted by SQL "
- "index / slice operators"
- )
-
- if isinstance(item, slice):
- start, stop, step = util.decode_slice(item)
-
- if (
- isinstance(stop, int)
- and isinstance(start, int)
- and stop - start <= 0
- ):
- return []
-
- elif (isinstance(start, int) and start < 0) or (
- isinstance(stop, int) and stop < 0
- ):
- _no_negative_indexes()
-
- res = iterable_query.slice(start, stop)
- if step is not None:
- return list(res)[None : None : item.step]
- else:
- return list(res)
- else:
- if item == -1:
- _no_negative_indexes()
- else:
- return list(iterable_query[item : item + 1])[0]
-
-
-def _is_mapped_annotation(
- raw_annotation: _AnnotationScanType,
- cls: Type[Any],
- originating_cls: Type[Any],
-) -> bool:
- try:
- annotated = de_stringify_annotation(
- cls, raw_annotation, originating_cls.__module__
- )
- except NameError:
- # in most cases, at least within our own tests, we can raise
- # here, which is more accurate as it prevents us from returning
- # false negatives. However, in the real world, try to avoid getting
- # involved with end-user annotations that have nothing to do with us.
- # see issue #8888 where we bypass using this function in the case
- # that we want to detect an unresolvable Mapped[] type.
- return False
- else:
- return is_origin_of_cls(annotated, _MappedAnnotationBase)
-
-
-class _CleanupError(Exception):
- pass
-
-
-def _cleanup_mapped_str_annotation(
- annotation: str, originating_module: str
-) -> str:
- # fix up an annotation that comes in as the form:
- # 'Mapped[List[Address]]' so that it instead looks like:
- # 'Mapped[List["Address"]]' , which will allow us to get
- # "Address" as a string
-
- # additionally, resolve symbols for these names since this is where
- # we'd have to do it
-
- inner: Optional[Match[str]]
-
- mm = re.match(r"^(.+?)\[(.+)\]$", annotation)
-
- if not mm:
- return annotation
-
- # ticket #8759. Resolve the Mapped name to a real symbol.
- # originally this just checked the name.
- try:
- obj = eval_name_only(mm.group(1), originating_module)
- except NameError as ne:
- raise _CleanupError(
- f'For annotation "{annotation}", could not resolve '
- f'container type "{mm.group(1)}". '
- "Please ensure this type is imported at the module level "
- "outside of TYPE_CHECKING blocks"
- ) from ne
-
- if obj is typing.ClassVar:
- real_symbol = "ClassVar"
- else:
- try:
- if issubclass(obj, _MappedAnnotationBase):
- real_symbol = obj.__name__
- else:
- return annotation
- except TypeError:
- # avoid isinstance(obj, type) check, just catch TypeError
- return annotation
-
- # note: if one of the codepaths above didn't define real_symbol and
- # then didn't return, real_symbol raises UnboundLocalError
- # which is actually a NameError, and the calling routines don't
- # notice this since they are catching NameError anyway. Just in case
- # this is being modified in the future, something to be aware of.
-
- stack = []
- inner = mm
- while True:
- stack.append(real_symbol if mm is inner else inner.group(1))
- g2 = inner.group(2)
- inner = re.match(r"^(.+?)\[(.+)\]$", g2)
- if inner is None:
- stack.append(g2)
- break
-
- # stacks we want to rewrite, that is, quote the last entry which
- # we think is a relationship class name:
- #
- # ['Mapped', 'List', 'Address']
- # ['Mapped', 'A']
- #
- # stacks we dont want to rewrite, which are generally MappedColumn
- # use cases:
- #
- # ['Mapped', "'Optional[Dict[str, str]]'"]
- # ['Mapped', 'dict[str, str] | None']
-
- if (
- # avoid already quoted symbols such as
- # ['Mapped', "'Optional[Dict[str, str]]'"]
- not re.match(r"""^["'].*["']$""", stack[-1])
- # avoid further generics like Dict[] such as
- # ['Mapped', 'dict[str, str] | None']
- and not re.match(r".*\[.*\]", stack[-1])
- ):
- stripchars = "\"' "
- stack[-1] = ", ".join(
- f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",")
- )
-
- annotation = "[".join(stack) + ("]" * (len(stack) - 1))
-
- return annotation
-
-
-def _extract_mapped_subtype(
- raw_annotation: Optional[_AnnotationScanType],
- cls: type,
- originating_module: str,
- key: str,
- attr_cls: Type[Any],
- required: bool,
- is_dataclass_field: bool,
- expect_mapped: bool = True,
- raiseerr: bool = True,
-) -> Optional[Tuple[Union[type, str], Optional[type]]]:
- """given an annotation, figure out if it's ``Mapped[something]`` and if
- so, return the ``something`` part.
-
- Includes error raise scenarios and other options.
-
- """
-
- if raw_annotation is None:
- if required:
- raise sa_exc.ArgumentError(
- f"Python typing annotation is required for attribute "
- f'"{cls.__name__}.{key}" when primary argument(s) for '
- f'"{attr_cls.__name__}" construct are None or not present'
- )
- return None
-
- try:
- annotated = de_stringify_annotation(
- cls,
- raw_annotation,
- originating_module,
- str_cleanup_fn=_cleanup_mapped_str_annotation,
- )
- except _CleanupError as ce:
- raise sa_exc.ArgumentError(
- f"Could not interpret annotation {raw_annotation}. "
- "Check that it uses names that are correctly imported at the "
- "module level. See chained stack trace for more hints."
- ) from ce
- except NameError as ne:
- if raiseerr and "Mapped[" in raw_annotation: # type: ignore
- raise sa_exc.ArgumentError(
- f"Could not interpret annotation {raw_annotation}. "
- "Check that it uses names that are correctly imported at the "
- "module level. See chained stack trace for more hints."
- ) from ne
-
- annotated = raw_annotation # type: ignore
-
- if is_dataclass_field:
- return annotated, None
- else:
- if not hasattr(annotated, "__origin__") or not is_origin_of_cls(
- annotated, _MappedAnnotationBase
- ):
- if expect_mapped:
- if not raiseerr:
- return None
-
- origin = getattr(annotated, "__origin__", None)
- if origin is typing.ClassVar:
- return None
-
- # check for other kind of ORM descriptor like AssociationProxy,
- # don't raise for that (issue #9957)
- elif isinstance(origin, type) and issubclass(
- origin, ORMDescriptor
- ):
- return None
-
- raise sa_exc.ArgumentError(
- f'Type annotation for "{cls.__name__}.{key}" '
- "can't be correctly interpreted for "
- "Annotated Declarative Table form. ORM annotations "
- "should normally make use of the ``Mapped[]`` generic "
- "type, or other ORM-compatible generic type, as a "
- "container for the actual type, which indicates the "
- "intent that the attribute is mapped. "
- "Class variables that are not intended to be mapped "
- "by the ORM should use ClassVar[]. "
- "To allow Annotated Declarative to disregard legacy "
- "annotations which don't use Mapped[] to pass, set "
- '"__allow_unmapped__ = True" on the class or a '
- "superclass this class.",
- code="zlpr",
- )
-
- else:
- return annotated, None
-
- if len(annotated.__args__) != 1:
- raise sa_exc.ArgumentError(
- "Expected sub-type for Mapped[] annotation"
- )
-
- return annotated.__args__[0], annotated.__origin__
-
-
-def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
- if hasattr(prop, "_mapper_property_name"):
- name = prop._mapper_property_name()
- else:
- name = None
- return util.clsname_as_plain_name(prop, name)