summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/orm/attributes.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/orm/attributes.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/orm/attributes.py2835
1 files changed, 0 insertions, 2835 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/orm/attributes.py b/venv/lib/python3.11/site-packages/sqlalchemy/orm/attributes.py
deleted file mode 100644
index 5b16ce3..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/orm/attributes.py
+++ /dev/null
@@ -1,2835 +0,0 @@
-# orm/attributes.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: allow-untyped-defs, allow-untyped-calls
-
-"""Defines instrumentation for class attributes and their interaction
-with instances.
-
-This module is usually not directly visible to user applications, but
-defines a large part of the ORM's interactivity.
-
-
-"""
-
-from __future__ import annotations
-
-import dataclasses
-import operator
-from typing import Any
-from typing import Callable
-from typing import cast
-from typing import ClassVar
-from typing import Dict
-from typing import Iterable
-from typing import List
-from typing import NamedTuple
-from typing import Optional
-from typing import overload
-from typing import Sequence
-from typing import Tuple
-from typing import Type
-from typing import TYPE_CHECKING
-from typing import TypeVar
-from typing import Union
-
-from . import collections
-from . import exc as orm_exc
-from . import interfaces
-from ._typing import insp_is_aliased_class
-from .base import _DeclarativeMapped
-from .base import ATTR_EMPTY
-from .base import ATTR_WAS_SET
-from .base import CALLABLES_OK
-from .base import DEFERRED_HISTORY_LOAD
-from .base import INCLUDE_PENDING_MUTATIONS # noqa
-from .base import INIT_OK
-from .base import instance_dict as instance_dict
-from .base import instance_state as instance_state
-from .base import instance_str
-from .base import LOAD_AGAINST_COMMITTED
-from .base import LoaderCallableStatus
-from .base import manager_of_class as manager_of_class
-from .base import Mapped as Mapped # noqa
-from .base import NEVER_SET # noqa
-from .base import NO_AUTOFLUSH
-from .base import NO_CHANGE # noqa
-from .base import NO_KEY
-from .base import NO_RAISE
-from .base import NO_VALUE
-from .base import NON_PERSISTENT_OK # noqa
-from .base import opt_manager_of_class as opt_manager_of_class
-from .base import PASSIVE_CLASS_MISMATCH # noqa
-from .base import PASSIVE_NO_FETCH
-from .base import PASSIVE_NO_FETCH_RELATED # noqa
-from .base import PASSIVE_NO_INITIALIZE
-from .base import PASSIVE_NO_RESULT
-from .base import PASSIVE_OFF
-from .base import PASSIVE_ONLY_PERSISTENT
-from .base import PASSIVE_RETURN_NO_VALUE
-from .base import PassiveFlag
-from .base import RELATED_OBJECT_OK # noqa
-from .base import SQL_OK # noqa
-from .base import SQLORMExpression
-from .base import state_str
-from .. import event
-from .. import exc
-from .. import inspection
-from .. import util
-from ..event import dispatcher
-from ..event import EventTarget
-from ..sql import base as sql_base
-from ..sql import cache_key
-from ..sql import coercions
-from ..sql import roles
-from ..sql import visitors
-from ..sql.cache_key import HasCacheKey
-from ..sql.visitors import _TraverseInternalsType
-from ..sql.visitors import InternalTraversal
-from ..util.typing import Literal
-from ..util.typing import Self
-from ..util.typing import TypeGuard
-
-if TYPE_CHECKING:
- from ._typing import _EntityType
- from ._typing import _ExternalEntityType
- from ._typing import _InstanceDict
- from ._typing import _InternalEntityType
- from ._typing import _LoaderCallable
- from ._typing import _O
- from .collections import _AdaptedCollectionProtocol
- from .collections import CollectionAdapter
- from .interfaces import MapperProperty
- from .relationships import RelationshipProperty
- from .state import InstanceState
- from .util import AliasedInsp
- from .writeonly import WriteOnlyAttributeImpl
- from ..event.base import _Dispatch
- from ..sql._typing import _ColumnExpressionArgument
- from ..sql._typing import _DMLColumnArgument
- from ..sql._typing import _InfoType
- from ..sql._typing import _PropagateAttrsType
- from ..sql.annotation import _AnnotationDict
- from ..sql.elements import ColumnElement
- from ..sql.elements import Label
- from ..sql.operators import OperatorType
- from ..sql.selectable import FromClause
-
-
-_T = TypeVar("_T")
-_T_co = TypeVar("_T_co", bound=Any, covariant=True)
-
-
-_AllPendingType = Sequence[
- Tuple[Optional["InstanceState[Any]"], Optional[object]]
-]
-
-
-_UNKNOWN_ATTR_KEY = object()
-
-
-@inspection._self_inspects
-class QueryableAttribute(
- _DeclarativeMapped[_T_co],
- SQLORMExpression[_T_co],
- interfaces.InspectionAttr,
- interfaces.PropComparator[_T_co],
- roles.JoinTargetRole,
- roles.OnClauseRole,
- sql_base.Immutable,
- cache_key.SlotsMemoizedHasCacheKey,
- util.MemoizedSlots,
- EventTarget,
-):
- """Base class for :term:`descriptor` objects that intercept
- attribute events on behalf of a :class:`.MapperProperty`
- object. The actual :class:`.MapperProperty` is accessible
- via the :attr:`.QueryableAttribute.property`
- attribute.
-
-
- .. seealso::
-
- :class:`.InstrumentedAttribute`
-
- :class:`.MapperProperty`
-
- :attr:`_orm.Mapper.all_orm_descriptors`
-
- :attr:`_orm.Mapper.attrs`
- """
-
- __slots__ = (
- "class_",
- "key",
- "impl",
- "comparator",
- "property",
- "parent",
- "expression",
- "_of_type",
- "_extra_criteria",
- "_slots_dispatch",
- "_propagate_attrs",
- "_doc",
- )
-
- is_attribute = True
-
- dispatch: dispatcher[QueryableAttribute[_T_co]]
-
- class_: _ExternalEntityType[Any]
- key: str
- parententity: _InternalEntityType[Any]
- impl: AttributeImpl
- comparator: interfaces.PropComparator[_T_co]
- _of_type: Optional[_InternalEntityType[Any]]
- _extra_criteria: Tuple[ColumnElement[bool], ...]
- _doc: Optional[str]
-
- # PropComparator has a __visit_name__ to participate within
- # traversals. Disambiguate the attribute vs. a comparator.
- __visit_name__ = "orm_instrumented_attribute"
-
- def __init__(
- self,
- class_: _ExternalEntityType[_O],
- key: str,
- parententity: _InternalEntityType[_O],
- comparator: interfaces.PropComparator[_T_co],
- impl: Optional[AttributeImpl] = None,
- of_type: Optional[_InternalEntityType[Any]] = None,
- extra_criteria: Tuple[ColumnElement[bool], ...] = (),
- ):
- self.class_ = class_
- self.key = key
-
- self._parententity = self.parent = parententity
-
- # this attribute is non-None after mappers are set up, however in the
- # interim class manager setup, there's a check for None to see if it
- # needs to be populated, so we assign None here leaving the attribute
- # in a temporarily not-type-correct state
- self.impl = impl # type: ignore
-
- assert comparator is not None
- self.comparator = comparator
- self._of_type = of_type
- self._extra_criteria = extra_criteria
- self._doc = None
-
- manager = opt_manager_of_class(class_)
- # manager is None in the case of AliasedClass
- if manager:
- # propagate existing event listeners from
- # immediate superclass
- for base in manager._bases:
- if key in base:
- self.dispatch._update(base[key].dispatch)
- if base[key].dispatch._active_history:
- self.dispatch._active_history = True # type: ignore
-
- _cache_key_traversal = [
- ("key", visitors.ExtendedInternalTraversal.dp_string),
- ("_parententity", visitors.ExtendedInternalTraversal.dp_multi),
- ("_of_type", visitors.ExtendedInternalTraversal.dp_multi),
- ("_extra_criteria", visitors.InternalTraversal.dp_clauseelement_list),
- ]
-
- def __reduce__(self) -> Any:
- # this method is only used in terms of the
- # sqlalchemy.ext.serializer extension
- return (
- _queryable_attribute_unreduce,
- (
- self.key,
- self._parententity.mapper.class_,
- self._parententity,
- self._parententity.entity,
- ),
- )
-
- @property
- def _impl_uses_objects(self) -> bool:
- return self.impl.uses_objects
-
- def get_history(
- self, instance: Any, passive: PassiveFlag = PASSIVE_OFF
- ) -> History:
- return self.impl.get_history(
- instance_state(instance), instance_dict(instance), passive
- )
-
- @property
- def info(self) -> _InfoType:
- """Return the 'info' dictionary for the underlying SQL element.
-
- The behavior here is as follows:
-
- * If the attribute is a column-mapped property, i.e.
- :class:`.ColumnProperty`, which is mapped directly
- to a schema-level :class:`_schema.Column` object, this attribute
- will return the :attr:`.SchemaItem.info` dictionary associated
- with the core-level :class:`_schema.Column` object.
-
- * If the attribute is a :class:`.ColumnProperty` but is mapped to
- any other kind of SQL expression other than a
- :class:`_schema.Column`,
- the attribute will refer to the :attr:`.MapperProperty.info`
- dictionary associated directly with the :class:`.ColumnProperty`,
- assuming the SQL expression itself does not have its own ``.info``
- attribute (which should be the case, unless a user-defined SQL
- construct has defined one).
-
- * If the attribute refers to any other kind of
- :class:`.MapperProperty`, including :class:`.Relationship`,
- the attribute will refer to the :attr:`.MapperProperty.info`
- dictionary associated with that :class:`.MapperProperty`.
-
- * To access the :attr:`.MapperProperty.info` dictionary of the
- :class:`.MapperProperty` unconditionally, including for a
- :class:`.ColumnProperty` that's associated directly with a
- :class:`_schema.Column`, the attribute can be referred to using
- :attr:`.QueryableAttribute.property` attribute, as
- ``MyClass.someattribute.property.info``.
-
- .. seealso::
-
- :attr:`.SchemaItem.info`
-
- :attr:`.MapperProperty.info`
-
- """
- return self.comparator.info
-
- parent: _InternalEntityType[Any]
- """Return an inspection instance representing the parent.
-
- This will be either an instance of :class:`_orm.Mapper`
- or :class:`.AliasedInsp`, depending upon the nature
- of the parent entity which this attribute is associated
- with.
-
- """
-
- expression: ColumnElement[_T_co]
- """The SQL expression object represented by this
- :class:`.QueryableAttribute`.
-
- This will typically be an instance of a :class:`_sql.ColumnElement`
- subclass representing a column expression.
-
- """
-
- def _memoized_attr_expression(self) -> ColumnElement[_T]:
- annotations: _AnnotationDict
-
- # applies only to Proxy() as used by hybrid.
- # currently is an exception to typing rather than feeding through
- # non-string keys.
- # ideally Proxy() would have a separate set of methods to deal
- # with this case.
- entity_namespace = self._entity_namespace
- assert isinstance(entity_namespace, HasCacheKey)
-
- if self.key is _UNKNOWN_ATTR_KEY:
- annotations = {"entity_namespace": entity_namespace}
- else:
- annotations = {
- "proxy_key": self.key,
- "proxy_owner": self._parententity,
- "entity_namespace": entity_namespace,
- }
-
- ce = self.comparator.__clause_element__()
- try:
- if TYPE_CHECKING:
- assert isinstance(ce, ColumnElement)
- anno = ce._annotate
- except AttributeError as ae:
- raise exc.InvalidRequestError(
- 'When interpreting attribute "%s" as a SQL expression, '
- "expected __clause_element__() to return "
- "a ClauseElement object, got: %r" % (self, ce)
- ) from ae
- else:
- return anno(annotations)
-
- def _memoized_attr__propagate_attrs(self) -> _PropagateAttrsType:
- # this suits the case in coercions where we don't actually
- # call ``__clause_element__()`` but still need to get
- # resolved._propagate_attrs. See #6558.
- return util.immutabledict(
- {
- "compile_state_plugin": "orm",
- "plugin_subject": self._parentmapper,
- }
- )
-
- @property
- def _entity_namespace(self) -> _InternalEntityType[Any]:
- return self._parententity
-
- @property
- def _annotations(self) -> _AnnotationDict:
- return self.__clause_element__()._annotations
-
- def __clause_element__(self) -> ColumnElement[_T_co]:
- return self.expression
-
- @property
- def _from_objects(self) -> List[FromClause]:
- return self.expression._from_objects
-
- def _bulk_update_tuples(
- self, value: Any
- ) -> Sequence[Tuple[_DMLColumnArgument, Any]]:
- """Return setter tuples for a bulk UPDATE."""
-
- return self.comparator._bulk_update_tuples(value)
-
- def adapt_to_entity(self, adapt_to_entity: AliasedInsp[Any]) -> Self:
- assert not self._of_type
- return self.__class__(
- adapt_to_entity.entity,
- self.key,
- impl=self.impl,
- comparator=self.comparator.adapt_to_entity(adapt_to_entity),
- parententity=adapt_to_entity,
- )
-
- def of_type(self, entity: _EntityType[Any]) -> QueryableAttribute[_T]:
- return QueryableAttribute(
- self.class_,
- self.key,
- self._parententity,
- impl=self.impl,
- comparator=self.comparator.of_type(entity),
- of_type=inspection.inspect(entity),
- extra_criteria=self._extra_criteria,
- )
-
- def and_(
- self, *clauses: _ColumnExpressionArgument[bool]
- ) -> QueryableAttribute[bool]:
- if TYPE_CHECKING:
- assert isinstance(self.comparator, RelationshipProperty.Comparator)
-
- exprs = tuple(
- coercions.expect(roles.WhereHavingRole, clause)
- for clause in util.coerce_generator_arg(clauses)
- )
-
- return QueryableAttribute(
- self.class_,
- self.key,
- self._parententity,
- impl=self.impl,
- comparator=self.comparator.and_(*exprs),
- of_type=self._of_type,
- extra_criteria=self._extra_criteria + exprs,
- )
-
- def _clone(self, **kw: Any) -> QueryableAttribute[_T]:
- return QueryableAttribute(
- self.class_,
- self.key,
- self._parententity,
- impl=self.impl,
- comparator=self.comparator,
- of_type=self._of_type,
- extra_criteria=self._extra_criteria,
- )
-
- def label(self, name: Optional[str]) -> Label[_T_co]:
- return self.__clause_element__().label(name)
-
- def operate(
- self, op: OperatorType, *other: Any, **kwargs: Any
- ) -> ColumnElement[Any]:
- return op(self.comparator, *other, **kwargs) # type: ignore[no-any-return] # noqa: E501
-
- def reverse_operate(
- self, op: OperatorType, other: Any, **kwargs: Any
- ) -> ColumnElement[Any]:
- return op(other, self.comparator, **kwargs) # type: ignore[no-any-return] # noqa: E501
-
- def hasparent(
- self, state: InstanceState[Any], optimistic: bool = False
- ) -> bool:
- return self.impl.hasparent(state, optimistic=optimistic) is not False
-
- def __getattr__(self, key: str) -> Any:
- try:
- return util.MemoizedSlots.__getattr__(self, key)
- except AttributeError:
- pass
-
- try:
- return getattr(self.comparator, key)
- except AttributeError as err:
- raise AttributeError(
- "Neither %r object nor %r object associated with %s "
- "has an attribute %r"
- % (
- type(self).__name__,
- type(self.comparator).__name__,
- self,
- key,
- )
- ) from err
-
- def __str__(self) -> str:
- return f"{self.class_.__name__}.{self.key}"
-
- def _memoized_attr_property(self) -> Optional[MapperProperty[Any]]:
- return self.comparator.property
-
-
-def _queryable_attribute_unreduce(
- key: str,
- mapped_class: Type[_O],
- parententity: _InternalEntityType[_O],
- entity: _ExternalEntityType[Any],
-) -> Any:
- # this method is only used in terms of the
- # sqlalchemy.ext.serializer extension
- if insp_is_aliased_class(parententity):
- return entity._get_from_serialized(key, mapped_class, parententity)
- else:
- return getattr(entity, key)
-
-
-class InstrumentedAttribute(QueryableAttribute[_T_co]):
- """Class bound instrumented attribute which adds basic
- :term:`descriptor` methods.
-
- See :class:`.QueryableAttribute` for a description of most features.
-
-
- """
-
- __slots__ = ()
-
- inherit_cache = True
- """:meta private:"""
-
- # hack to make __doc__ writeable on instances of
- # InstrumentedAttribute, while still keeping classlevel
- # __doc__ correct
-
- @util.rw_hybridproperty
- def __doc__(self) -> Optional[str]:
- return self._doc
-
- @__doc__.setter # type: ignore
- def __doc__(self, value: Optional[str]) -> None:
- self._doc = value
-
- @__doc__.classlevel # type: ignore
- def __doc__(cls) -> Optional[str]:
- return super().__doc__
-
- def __set__(self, instance: object, value: Any) -> None:
- self.impl.set(
- instance_state(instance), instance_dict(instance), value, None
- )
-
- def __delete__(self, instance: object) -> None:
- self.impl.delete(instance_state(instance), instance_dict(instance))
-
- @overload
- def __get__(
- self, instance: None, owner: Any
- ) -> InstrumentedAttribute[_T_co]: ...
-
- @overload
- def __get__(self, instance: object, owner: Any) -> _T_co: ...
-
- def __get__(
- self, instance: Optional[object], owner: Any
- ) -> Union[InstrumentedAttribute[_T_co], _T_co]:
- if instance is None:
- return self
-
- dict_ = instance_dict(instance)
- if self.impl.supports_population and self.key in dict_:
- return dict_[self.key] # type: ignore[no-any-return]
- else:
- try:
- state = instance_state(instance)
- except AttributeError as err:
- raise orm_exc.UnmappedInstanceError(instance) from err
- return self.impl.get(state, dict_) # type: ignore[no-any-return]
-
-
-@dataclasses.dataclass(frozen=True)
-class AdHocHasEntityNamespace(HasCacheKey):
- _traverse_internals: ClassVar[_TraverseInternalsType] = [
- ("_entity_namespace", InternalTraversal.dp_has_cache_key),
- ]
-
- # py37 compat, no slots=True on dataclass
- __slots__ = ("_entity_namespace",)
- _entity_namespace: _InternalEntityType[Any]
- is_mapper: ClassVar[bool] = False
- is_aliased_class: ClassVar[bool] = False
-
- @property
- def entity_namespace(self):
- return self._entity_namespace.entity_namespace
-
-
-def create_proxied_attribute(
- descriptor: Any,
-) -> Callable[..., QueryableAttribute[Any]]:
- """Create an QueryableAttribute / user descriptor hybrid.
-
- Returns a new QueryableAttribute type that delegates descriptor
- behavior and getattr() to the given descriptor.
- """
-
- # TODO: can move this to descriptor_props if the need for this
- # function is removed from ext/hybrid.py
-
- class Proxy(QueryableAttribute[Any]):
- """Presents the :class:`.QueryableAttribute` interface as a
- proxy on top of a Python descriptor / :class:`.PropComparator`
- combination.
-
- """
-
- _extra_criteria = ()
-
- # the attribute error catches inside of __getattr__ basically create a
- # singularity if you try putting slots on this too
- # __slots__ = ("descriptor", "original_property", "_comparator")
-
- def __init__(
- self,
- class_,
- key,
- descriptor,
- comparator,
- adapt_to_entity=None,
- doc=None,
- original_property=None,
- ):
- self.class_ = class_
- self.key = key
- self.descriptor = descriptor
- self.original_property = original_property
- self._comparator = comparator
- self._adapt_to_entity = adapt_to_entity
- self._doc = self.__doc__ = doc
-
- @property
- def _parententity(self):
- return inspection.inspect(self.class_, raiseerr=False)
-
- @property
- def parent(self):
- return inspection.inspect(self.class_, raiseerr=False)
-
- _is_internal_proxy = True
-
- _cache_key_traversal = [
- ("key", visitors.ExtendedInternalTraversal.dp_string),
- ("_parententity", visitors.ExtendedInternalTraversal.dp_multi),
- ]
-
- @property
- def _impl_uses_objects(self):
- return (
- self.original_property is not None
- and getattr(self.class_, self.key).impl.uses_objects
- )
-
- @property
- def _entity_namespace(self):
- if hasattr(self._comparator, "_parententity"):
- return self._comparator._parententity
- else:
- # used by hybrid attributes which try to remain
- # agnostic of any ORM concepts like mappers
- return AdHocHasEntityNamespace(self._parententity)
-
- @property
- def property(self):
- return self.comparator.property
-
- @util.memoized_property
- def comparator(self):
- if callable(self._comparator):
- self._comparator = self._comparator()
- if self._adapt_to_entity:
- self._comparator = self._comparator.adapt_to_entity(
- self._adapt_to_entity
- )
- return self._comparator
-
- def adapt_to_entity(self, adapt_to_entity):
- return self.__class__(
- adapt_to_entity.entity,
- self.key,
- self.descriptor,
- self._comparator,
- adapt_to_entity,
- )
-
- def _clone(self, **kw):
- return self.__class__(
- self.class_,
- self.key,
- self.descriptor,
- self._comparator,
- adapt_to_entity=self._adapt_to_entity,
- original_property=self.original_property,
- )
-
- def __get__(self, instance, owner):
- retval = self.descriptor.__get__(instance, owner)
- # detect if this is a plain Python @property, which just returns
- # itself for class level access. If so, then return us.
- # Otherwise, return the object returned by the descriptor.
- if retval is self.descriptor and instance is None:
- return self
- else:
- return retval
-
- def __str__(self) -> str:
- return f"{self.class_.__name__}.{self.key}"
-
- def __getattr__(self, attribute):
- """Delegate __getattr__ to the original descriptor and/or
- comparator."""
-
- # this is unfortunately very complicated, and is easily prone
- # to recursion overflows when implementations of related
- # __getattr__ schemes are changed
-
- try:
- return util.MemoizedSlots.__getattr__(self, attribute)
- except AttributeError:
- pass
-
- try:
- return getattr(descriptor, attribute)
- except AttributeError as err:
- if attribute == "comparator":
- raise AttributeError("comparator") from err
- try:
- # comparator itself might be unreachable
- comparator = self.comparator
- except AttributeError as err2:
- raise AttributeError(
- "Neither %r object nor unconfigured comparator "
- "object associated with %s has an attribute %r"
- % (type(descriptor).__name__, self, attribute)
- ) from err2
- else:
- try:
- return getattr(comparator, attribute)
- except AttributeError as err3:
- raise AttributeError(
- "Neither %r object nor %r object "
- "associated with %s has an attribute %r"
- % (
- type(descriptor).__name__,
- type(comparator).__name__,
- self,
- attribute,
- )
- ) from err3
-
- Proxy.__name__ = type(descriptor).__name__ + "Proxy"
-
- util.monkeypatch_proxied_specials(
- Proxy, type(descriptor), name="descriptor", from_instance=descriptor
- )
- return Proxy
-
-
-OP_REMOVE = util.symbol("REMOVE")
-OP_APPEND = util.symbol("APPEND")
-OP_REPLACE = util.symbol("REPLACE")
-OP_BULK_REPLACE = util.symbol("BULK_REPLACE")
-OP_MODIFIED = util.symbol("MODIFIED")
-
-
-class AttributeEventToken:
- """A token propagated throughout the course of a chain of attribute
- events.
-
- Serves as an indicator of the source of the event and also provides
- a means of controlling propagation across a chain of attribute
- operations.
-
- The :class:`.Event` object is sent as the ``initiator`` argument
- when dealing with events such as :meth:`.AttributeEvents.append`,
- :meth:`.AttributeEvents.set`,
- and :meth:`.AttributeEvents.remove`.
-
- The :class:`.Event` object is currently interpreted by the backref
- event handlers, and is used to control the propagation of operations
- across two mutually-dependent attributes.
-
- .. versionchanged:: 2.0 Changed the name from ``AttributeEvent``
- to ``AttributeEventToken``.
-
- :attribute impl: The :class:`.AttributeImpl` which is the current event
- initiator.
-
- :attribute op: The symbol :attr:`.OP_APPEND`, :attr:`.OP_REMOVE`,
- :attr:`.OP_REPLACE`, or :attr:`.OP_BULK_REPLACE`, indicating the
- source operation.
-
- """
-
- __slots__ = "impl", "op", "parent_token"
-
- def __init__(self, attribute_impl: AttributeImpl, op: util.symbol):
- self.impl = attribute_impl
- self.op = op
- self.parent_token = self.impl.parent_token
-
- def __eq__(self, other):
- return (
- isinstance(other, AttributeEventToken)
- and other.impl is self.impl
- and other.op == self.op
- )
-
- @property
- def key(self):
- return self.impl.key
-
- def hasparent(self, state):
- return self.impl.hasparent(state)
-
-
-AttributeEvent = AttributeEventToken # legacy
-Event = AttributeEventToken # legacy
-
-
-class AttributeImpl:
- """internal implementation for instrumented attributes."""
-
- collection: bool
- default_accepts_scalar_loader: bool
- uses_objects: bool
- supports_population: bool
- dynamic: bool
-
- _is_has_collection_adapter = False
-
- _replace_token: AttributeEventToken
- _remove_token: AttributeEventToken
- _append_token: AttributeEventToken
-
- def __init__(
- self,
- class_: _ExternalEntityType[_O],
- key: str,
- callable_: Optional[_LoaderCallable],
- dispatch: _Dispatch[QueryableAttribute[Any]],
- trackparent: bool = False,
- compare_function: Optional[Callable[..., bool]] = None,
- active_history: bool = False,
- parent_token: Optional[AttributeEventToken] = None,
- load_on_unexpire: bool = True,
- send_modified_events: bool = True,
- accepts_scalar_loader: Optional[bool] = None,
- **kwargs: Any,
- ):
- r"""Construct an AttributeImpl.
-
- :param \class_: associated class
-
- :param key: string name of the attribute
-
- :param \callable_:
- optional function which generates a callable based on a parent
- instance, which produces the "default" values for a scalar or
- collection attribute when it's first accessed, if not present
- already.
-
- :param trackparent:
- if True, attempt to track if an instance has a parent attached
- to it via this attribute.
-
- :param compare_function:
- a function that compares two values which are normally
- assignable to this attribute.
-
- :param active_history:
- indicates that get_history() should always return the "old" value,
- even if it means executing a lazy callable upon attribute change.
-
- :param parent_token:
- Usually references the MapperProperty, used as a key for
- the hasparent() function to identify an "owning" attribute.
- Allows multiple AttributeImpls to all match a single
- owner attribute.
-
- :param load_on_unexpire:
- if False, don't include this attribute in a load-on-expired
- operation, i.e. the "expired_attribute_loader" process.
- The attribute can still be in the "expired" list and be
- considered to be "expired". Previously, this flag was called
- "expire_missing" and is only used by a deferred column
- attribute.
-
- :param send_modified_events:
- if False, the InstanceState._modified_event method will have no
- effect; this means the attribute will never show up as changed in a
- history entry.
-
- """
- self.class_ = class_
- self.key = key
- self.callable_ = callable_
- self.dispatch = dispatch
- self.trackparent = trackparent
- self.parent_token = parent_token or self
- self.send_modified_events = send_modified_events
- if compare_function is None:
- self.is_equal = operator.eq
- else:
- self.is_equal = compare_function
-
- if accepts_scalar_loader is not None:
- self.accepts_scalar_loader = accepts_scalar_loader
- else:
- self.accepts_scalar_loader = self.default_accepts_scalar_loader
-
- _deferred_history = kwargs.pop("_deferred_history", False)
- self._deferred_history = _deferred_history
-
- if active_history:
- self.dispatch._active_history = True
-
- self.load_on_unexpire = load_on_unexpire
- self._modified_token = AttributeEventToken(self, OP_MODIFIED)
-
- __slots__ = (
- "class_",
- "key",
- "callable_",
- "dispatch",
- "trackparent",
- "parent_token",
- "send_modified_events",
- "is_equal",
- "load_on_unexpire",
- "_modified_token",
- "accepts_scalar_loader",
- "_deferred_history",
- )
-
- def __str__(self) -> str:
- return f"{self.class_.__name__}.{self.key}"
-
- def _get_active_history(self):
- """Backwards compat for impl.active_history"""
-
- return self.dispatch._active_history
-
- def _set_active_history(self, value):
- self.dispatch._active_history = value
-
- active_history = property(_get_active_history, _set_active_history)
-
- def hasparent(
- self, state: InstanceState[Any], optimistic: bool = False
- ) -> bool:
- """Return the boolean value of a `hasparent` flag attached to
- the given state.
-
- The `optimistic` flag determines what the default return value
- should be if no `hasparent` flag can be located.
-
- As this function is used to determine if an instance is an
- *orphan*, instances that were loaded from storage should be
- assumed to not be orphans, until a True/False value for this
- flag is set.
-
- An instance attribute that is loaded by a callable function
- will also not have a `hasparent` flag.
-
- """
- msg = "This AttributeImpl is not configured to track parents."
- assert self.trackparent, msg
-
- return (
- state.parents.get(id(self.parent_token), optimistic) is not False
- )
-
- def sethasparent(
- self,
- state: InstanceState[Any],
- parent_state: InstanceState[Any],
- value: bool,
- ) -> None:
- """Set a boolean flag on the given item corresponding to
- whether or not it is attached to a parent object via the
- attribute represented by this ``InstrumentedAttribute``.
-
- """
- msg = "This AttributeImpl is not configured to track parents."
- assert self.trackparent, msg
-
- id_ = id(self.parent_token)
- if value:
- state.parents[id_] = parent_state
- else:
- if id_ in state.parents:
- last_parent = state.parents[id_]
-
- if (
- last_parent is not False
- and last_parent.key != parent_state.key
- ):
- if last_parent.obj() is None:
- raise orm_exc.StaleDataError(
- "Removing state %s from parent "
- "state %s along attribute '%s', "
- "but the parent record "
- "has gone stale, can't be sure this "
- "is the most recent parent."
- % (
- state_str(state),
- state_str(parent_state),
- self.key,
- )
- )
-
- return
-
- state.parents[id_] = False
-
- def get_history(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- passive: PassiveFlag = PASSIVE_OFF,
- ) -> History:
- raise NotImplementedError()
-
- def get_all_pending(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- passive: PassiveFlag = PASSIVE_NO_INITIALIZE,
- ) -> _AllPendingType:
- """Return a list of tuples of (state, obj)
- for all objects in this attribute's current state
- + history.
-
- Only applies to object-based attributes.
-
- This is an inlining of existing functionality
- which roughly corresponds to:
-
- get_state_history(
- state,
- key,
- passive=PASSIVE_NO_INITIALIZE).sum()
-
- """
- raise NotImplementedError()
-
- def _default_value(
- self, state: InstanceState[Any], dict_: _InstanceDict
- ) -> Any:
- """Produce an empty value for an uninitialized scalar attribute."""
-
- assert self.key not in dict_, (
- "_default_value should only be invoked for an "
- "uninitialized or expired attribute"
- )
-
- value = None
- for fn in self.dispatch.init_scalar:
- ret = fn(state, value, dict_)
- if ret is not ATTR_EMPTY:
- value = ret
-
- return value
-
- def get(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- passive: PassiveFlag = PASSIVE_OFF,
- ) -> Any:
- """Retrieve a value from the given object.
- If a callable is assembled on this object's attribute, and
- passive is False, the callable will be executed and the
- resulting value will be set as the new value for this attribute.
- """
- if self.key in dict_:
- return dict_[self.key]
- else:
- # if history present, don't load
- key = self.key
- if (
- key not in state.committed_state
- or state.committed_state[key] is NO_VALUE
- ):
- if not passive & CALLABLES_OK:
- return PASSIVE_NO_RESULT
-
- value = self._fire_loader_callables(state, key, passive)
-
- if value is PASSIVE_NO_RESULT or value is NO_VALUE:
- return value
- elif value is ATTR_WAS_SET:
- try:
- return dict_[key]
- except KeyError as err:
- # TODO: no test coverage here.
- raise KeyError(
- "Deferred loader for attribute "
- "%r failed to populate "
- "correctly" % key
- ) from err
- elif value is not ATTR_EMPTY:
- return self.set_committed_value(state, dict_, value)
-
- if not passive & INIT_OK:
- return NO_VALUE
- else:
- return self._default_value(state, dict_)
-
- def _fire_loader_callables(
- self, state: InstanceState[Any], key: str, passive: PassiveFlag
- ) -> Any:
- if (
- self.accepts_scalar_loader
- and self.load_on_unexpire
- and key in state.expired_attributes
- ):
- return state._load_expired(state, passive)
- elif key in state.callables:
- callable_ = state.callables[key]
- return callable_(state, passive)
- elif self.callable_:
- return self.callable_(state, passive)
- else:
- return ATTR_EMPTY
-
- def append(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- value: Any,
- initiator: Optional[AttributeEventToken],
- passive: PassiveFlag = PASSIVE_OFF,
- ) -> None:
- self.set(state, dict_, value, initiator, passive=passive)
-
- def remove(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- value: Any,
- initiator: Optional[AttributeEventToken],
- passive: PassiveFlag = PASSIVE_OFF,
- ) -> None:
- self.set(
- state, dict_, None, initiator, passive=passive, check_old=value
- )
-
- def pop(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- value: Any,
- initiator: Optional[AttributeEventToken],
- passive: PassiveFlag = PASSIVE_OFF,
- ) -> None:
- self.set(
- state,
- dict_,
- None,
- initiator,
- passive=passive,
- check_old=value,
- pop=True,
- )
-
- def set(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- value: Any,
- initiator: Optional[AttributeEventToken] = None,
- passive: PassiveFlag = PASSIVE_OFF,
- check_old: Any = None,
- pop: bool = False,
- ) -> None:
- raise NotImplementedError()
-
- def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
- raise NotImplementedError()
-
- def get_committed_value(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- passive: PassiveFlag = PASSIVE_OFF,
- ) -> Any:
- """return the unchanged value of this attribute"""
-
- if self.key in state.committed_state:
- value = state.committed_state[self.key]
- if value is NO_VALUE:
- return None
- else:
- return value
- else:
- return self.get(state, dict_, passive=passive)
-
- def set_committed_value(self, state, dict_, value):
- """set an attribute value on the given instance and 'commit' it."""
-
- dict_[self.key] = value
- state._commit(dict_, [self.key])
- return value
-
-
-class ScalarAttributeImpl(AttributeImpl):
- """represents a scalar value-holding InstrumentedAttribute."""
-
- default_accepts_scalar_loader = True
- uses_objects = False
- supports_population = True
- collection = False
- dynamic = False
-
- __slots__ = "_replace_token", "_append_token", "_remove_token"
-
- def __init__(self, *arg, **kw):
- super().__init__(*arg, **kw)
- self._replace_token = self._append_token = AttributeEventToken(
- self, OP_REPLACE
- )
- self._remove_token = AttributeEventToken(self, OP_REMOVE)
-
- def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
- if self.dispatch._active_history:
- old = self.get(state, dict_, PASSIVE_RETURN_NO_VALUE)
- else:
- old = dict_.get(self.key, NO_VALUE)
-
- if self.dispatch.remove:
- self.fire_remove_event(state, dict_, old, self._remove_token)
- state._modified_event(dict_, self, old)
-
- existing = dict_.pop(self.key, NO_VALUE)
- if (
- existing is NO_VALUE
- and old is NO_VALUE
- and not state.expired
- and self.key not in state.expired_attributes
- ):
- raise AttributeError("%s object does not have a value" % self)
-
- def get_history(
- self,
- state: InstanceState[Any],
- dict_: Dict[str, Any],
- passive: PassiveFlag = PASSIVE_OFF,
- ) -> History:
- if self.key in dict_:
- return History.from_scalar_attribute(self, state, dict_[self.key])
- elif self.key in state.committed_state:
- return History.from_scalar_attribute(self, state, NO_VALUE)
- else:
- if passive & INIT_OK:
- passive ^= INIT_OK
- current = self.get(state, dict_, passive=passive)
- if current is PASSIVE_NO_RESULT:
- return HISTORY_BLANK
- else:
- return History.from_scalar_attribute(self, state, current)
-
- def set(
- self,
- state: InstanceState[Any],
- dict_: Dict[str, Any],
- value: Any,
- initiator: Optional[AttributeEventToken] = None,
- passive: PassiveFlag = PASSIVE_OFF,
- check_old: Optional[object] = None,
- pop: bool = False,
- ) -> None:
- if self.dispatch._active_history:
- old = self.get(state, dict_, PASSIVE_RETURN_NO_VALUE)
- else:
- old = dict_.get(self.key, NO_VALUE)
-
- if self.dispatch.set:
- value = self.fire_replace_event(
- state, dict_, value, old, initiator
- )
- state._modified_event(dict_, self, old)
- dict_[self.key] = value
-
- def fire_replace_event(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- value: _T,
- previous: Any,
- initiator: Optional[AttributeEventToken],
- ) -> _T:
- for fn in self.dispatch.set:
- value = fn(
- state, value, previous, initiator or self._replace_token
- )
- return value
-
- def fire_remove_event(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- value: Any,
- initiator: Optional[AttributeEventToken],
- ) -> None:
- for fn in self.dispatch.remove:
- fn(state, value, initiator or self._remove_token)
-
-
-class ScalarObjectAttributeImpl(ScalarAttributeImpl):
- """represents a scalar-holding InstrumentedAttribute,
- where the target object is also instrumented.
-
- Adds events to delete/set operations.
-
- """
-
- default_accepts_scalar_loader = False
- uses_objects = True
- supports_population = True
- collection = False
-
- __slots__ = ()
-
- def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
- if self.dispatch._active_history:
- old = self.get(
- state,
- dict_,
- passive=PASSIVE_ONLY_PERSISTENT
- | NO_AUTOFLUSH
- | LOAD_AGAINST_COMMITTED,
- )
- else:
- old = self.get(
- state,
- dict_,
- passive=PASSIVE_NO_FETCH ^ INIT_OK
- | LOAD_AGAINST_COMMITTED
- | NO_RAISE,
- )
-
- self.fire_remove_event(state, dict_, old, self._remove_token)
-
- existing = dict_.pop(self.key, NO_VALUE)
-
- # if the attribute is expired, we currently have no way to tell
- # that an object-attribute was expired vs. not loaded. So
- # for this test, we look to see if the object has a DB identity.
- if (
- existing is NO_VALUE
- and old is not PASSIVE_NO_RESULT
- and state.key is None
- ):
- raise AttributeError("%s object does not have a value" % self)
-
- def get_history(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- passive: PassiveFlag = PASSIVE_OFF,
- ) -> History:
- if self.key in dict_:
- current = dict_[self.key]
- else:
- if passive & INIT_OK:
- passive ^= INIT_OK
- current = self.get(state, dict_, passive=passive)
- if current is PASSIVE_NO_RESULT:
- return HISTORY_BLANK
-
- if not self._deferred_history:
- return History.from_object_attribute(self, state, current)
- else:
- original = state.committed_state.get(self.key, _NO_HISTORY)
- if original is PASSIVE_NO_RESULT:
- loader_passive = passive | (
- PASSIVE_ONLY_PERSISTENT
- | NO_AUTOFLUSH
- | LOAD_AGAINST_COMMITTED
- | NO_RAISE
- | DEFERRED_HISTORY_LOAD
- )
- original = self._fire_loader_callables(
- state, self.key, loader_passive
- )
- return History.from_object_attribute(
- self, state, current, original=original
- )
-
- def get_all_pending(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- passive: PassiveFlag = PASSIVE_NO_INITIALIZE,
- ) -> _AllPendingType:
- if self.key in dict_:
- current = dict_[self.key]
- elif passive & CALLABLES_OK:
- current = self.get(state, dict_, passive=passive)
- else:
- return []
-
- ret: _AllPendingType
-
- # can't use __hash__(), can't use __eq__() here
- if (
- current is not None
- and current is not PASSIVE_NO_RESULT
- and current is not NO_VALUE
- ):
- ret = [(instance_state(current), current)]
- else:
- ret = [(None, None)]
-
- if self.key in state.committed_state:
- original = state.committed_state[self.key]
- if (
- original is not None
- and original is not PASSIVE_NO_RESULT
- and original is not NO_VALUE
- and original is not current
- ):
- ret.append((instance_state(original), original))
- return ret
-
- def set(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- value: Any,
- initiator: Optional[AttributeEventToken] = None,
- passive: PassiveFlag = PASSIVE_OFF,
- check_old: Any = None,
- pop: bool = False,
- ) -> None:
- """Set a value on the given InstanceState."""
-
- if self.dispatch._active_history:
- old = self.get(
- state,
- dict_,
- passive=PASSIVE_ONLY_PERSISTENT
- | NO_AUTOFLUSH
- | LOAD_AGAINST_COMMITTED,
- )
- else:
- old = self.get(
- state,
- dict_,
- passive=PASSIVE_NO_FETCH ^ INIT_OK
- | LOAD_AGAINST_COMMITTED
- | NO_RAISE,
- )
-
- if (
- check_old is not None
- and old is not PASSIVE_NO_RESULT
- and check_old is not old
- ):
- if pop:
- return
- else:
- raise ValueError(
- "Object %s not associated with %s on attribute '%s'"
- % (instance_str(check_old), state_str(state), self.key)
- )
-
- value = self.fire_replace_event(state, dict_, value, old, initiator)
- dict_[self.key] = value
-
- def fire_remove_event(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- value: Any,
- initiator: Optional[AttributeEventToken],
- ) -> None:
- if self.trackparent and value not in (
- None,
- PASSIVE_NO_RESULT,
- NO_VALUE,
- ):
- self.sethasparent(instance_state(value), state, False)
-
- for fn in self.dispatch.remove:
- fn(state, value, initiator or self._remove_token)
-
- state._modified_event(dict_, self, value)
-
- def fire_replace_event(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- value: _T,
- previous: Any,
- initiator: Optional[AttributeEventToken],
- ) -> _T:
- if self.trackparent:
- if previous is not value and previous not in (
- None,
- PASSIVE_NO_RESULT,
- NO_VALUE,
- ):
- self.sethasparent(instance_state(previous), state, False)
-
- for fn in self.dispatch.set:
- value = fn(
- state, value, previous, initiator or self._replace_token
- )
-
- state._modified_event(dict_, self, previous)
-
- if self.trackparent:
- if value is not None:
- self.sethasparent(instance_state(value), state, True)
-
- return value
-
-
-class HasCollectionAdapter:
- __slots__ = ()
-
- collection: bool
- _is_has_collection_adapter = True
-
- def _dispose_previous_collection(
- self,
- state: InstanceState[Any],
- collection: _AdaptedCollectionProtocol,
- adapter: CollectionAdapter,
- fire_event: bool,
- ) -> None:
- raise NotImplementedError()
-
- @overload
- def get_collection(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- user_data: Literal[None] = ...,
- passive: Literal[PassiveFlag.PASSIVE_OFF] = ...,
- ) -> CollectionAdapter: ...
-
- @overload
- def get_collection(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- user_data: _AdaptedCollectionProtocol = ...,
- passive: PassiveFlag = ...,
- ) -> CollectionAdapter: ...
-
- @overload
- def get_collection(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- user_data: Optional[_AdaptedCollectionProtocol] = ...,
- passive: PassiveFlag = ...,
- ) -> Union[
- Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
- ]: ...
-
- def get_collection(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- user_data: Optional[_AdaptedCollectionProtocol] = None,
- passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
- ) -> Union[
- Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
- ]:
- raise NotImplementedError()
-
- def set(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- value: Any,
- initiator: Optional[AttributeEventToken] = None,
- passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
- check_old: Any = None,
- pop: bool = False,
- _adapt: bool = True,
- ) -> None:
- raise NotImplementedError()
-
-
-if TYPE_CHECKING:
-
- def _is_collection_attribute_impl(
- impl: AttributeImpl,
- ) -> TypeGuard[CollectionAttributeImpl]: ...
-
-else:
- _is_collection_attribute_impl = operator.attrgetter("collection")
-
-
-class CollectionAttributeImpl(HasCollectionAdapter, AttributeImpl):
- """A collection-holding attribute that instruments changes in membership.
-
- Only handles collections of instrumented objects.
-
- InstrumentedCollectionAttribute holds an arbitrary, user-specified
- container object (defaulting to a list) and brokers access to the
- CollectionAdapter, a "view" onto that object that presents consistent bag
- semantics to the orm layer independent of the user data implementation.
-
- """
-
- uses_objects = True
- collection = True
- default_accepts_scalar_loader = False
- supports_population = True
- dynamic = False
-
- _bulk_replace_token: AttributeEventToken
-
- __slots__ = (
- "copy",
- "collection_factory",
- "_append_token",
- "_remove_token",
- "_bulk_replace_token",
- "_duck_typed_as",
- )
-
- def __init__(
- self,
- class_,
- key,
- callable_,
- dispatch,
- typecallable=None,
- trackparent=False,
- copy_function=None,
- compare_function=None,
- **kwargs,
- ):
- super().__init__(
- class_,
- key,
- callable_,
- dispatch,
- trackparent=trackparent,
- compare_function=compare_function,
- **kwargs,
- )
-
- if copy_function is None:
- copy_function = self.__copy
- self.copy = copy_function
- self.collection_factory = typecallable
- self._append_token = AttributeEventToken(self, OP_APPEND)
- self._remove_token = AttributeEventToken(self, OP_REMOVE)
- self._bulk_replace_token = AttributeEventToken(self, OP_BULK_REPLACE)
- self._duck_typed_as = util.duck_type_collection(
- self.collection_factory()
- )
-
- if getattr(self.collection_factory, "_sa_linker", None):
-
- @event.listens_for(self, "init_collection")
- def link(target, collection, collection_adapter):
- collection._sa_linker(collection_adapter)
-
- @event.listens_for(self, "dispose_collection")
- def unlink(target, collection, collection_adapter):
- collection._sa_linker(None)
-
- def __copy(self, item):
- return [y for y in collections.collection_adapter(item)]
-
- def get_history(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- passive: PassiveFlag = PASSIVE_OFF,
- ) -> History:
- current = self.get(state, dict_, passive=passive)
-
- if current is PASSIVE_NO_RESULT:
- if (
- passive & PassiveFlag.INCLUDE_PENDING_MUTATIONS
- and self.key in state._pending_mutations
- ):
- pending = state._pending_mutations[self.key]
- return pending.merge_with_history(HISTORY_BLANK)
- else:
- return HISTORY_BLANK
- else:
- if passive & PassiveFlag.INCLUDE_PENDING_MUTATIONS:
- # this collection is loaded / present. should not be any
- # pending mutations
- assert self.key not in state._pending_mutations
-
- return History.from_collection(self, state, current)
-
- def get_all_pending(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- passive: PassiveFlag = PASSIVE_NO_INITIALIZE,
- ) -> _AllPendingType:
- # NOTE: passive is ignored here at the moment
-
- if self.key not in dict_:
- return []
-
- current = dict_[self.key]
- current = getattr(current, "_sa_adapter")
-
- if self.key in state.committed_state:
- original = state.committed_state[self.key]
- if original is not NO_VALUE:
- current_states = [
- ((c is not None) and instance_state(c) or None, c)
- for c in current
- ]
- original_states = [
- ((c is not None) and instance_state(c) or None, c)
- for c in original
- ]
-
- current_set = dict(current_states)
- original_set = dict(original_states)
-
- return (
- [
- (s, o)
- for s, o in current_states
- if s not in original_set
- ]
- + [(s, o) for s, o in current_states if s in original_set]
- + [
- (s, o)
- for s, o in original_states
- if s not in current_set
- ]
- )
-
- return [(instance_state(o), o) for o in current]
-
- def fire_append_event(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- value: _T,
- initiator: Optional[AttributeEventToken],
- key: Optional[Any],
- ) -> _T:
- for fn in self.dispatch.append:
- value = fn(state, value, initiator or self._append_token, key=key)
-
- state._modified_event(dict_, self, NO_VALUE, True)
-
- if self.trackparent and value is not None:
- self.sethasparent(instance_state(value), state, True)
-
- return value
-
- def fire_append_wo_mutation_event(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- value: _T,
- initiator: Optional[AttributeEventToken],
- key: Optional[Any],
- ) -> _T:
- for fn in self.dispatch.append_wo_mutation:
- value = fn(state, value, initiator or self._append_token, key=key)
-
- return value
-
- def fire_pre_remove_event(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- initiator: Optional[AttributeEventToken],
- key: Optional[Any],
- ) -> None:
- """A special event used for pop() operations.
-
- The "remove" event needs to have the item to be removed passed to
- it, which in the case of pop from a set, we don't have a way to access
- the item before the operation. the event is used for all pop()
- operations (even though set.pop is the one where it is really needed).
-
- """
- state._modified_event(dict_, self, NO_VALUE, True)
-
- def fire_remove_event(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- value: Any,
- initiator: Optional[AttributeEventToken],
- key: Optional[Any],
- ) -> None:
- if self.trackparent and value is not None:
- self.sethasparent(instance_state(value), state, False)
-
- for fn in self.dispatch.remove:
- fn(state, value, initiator or self._remove_token, key=key)
-
- state._modified_event(dict_, self, NO_VALUE, True)
-
- def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
- if self.key not in dict_:
- return
-
- state._modified_event(dict_, self, NO_VALUE, True)
-
- collection = self.get_collection(state, state.dict)
- collection.clear_with_event()
-
- # key is always present because we checked above. e.g.
- # del is a no-op if collection not present.
- del dict_[self.key]
-
- def _default_value(
- self, state: InstanceState[Any], dict_: _InstanceDict
- ) -> _AdaptedCollectionProtocol:
- """Produce an empty collection for an un-initialized attribute"""
-
- assert self.key not in dict_, (
- "_default_value should only be invoked for an "
- "uninitialized or expired attribute"
- )
-
- if self.key in state._empty_collections:
- return state._empty_collections[self.key]
-
- adapter, user_data = self._initialize_collection(state)
- adapter._set_empty(user_data)
- return user_data
-
- def _initialize_collection(
- self, state: InstanceState[Any]
- ) -> Tuple[CollectionAdapter, _AdaptedCollectionProtocol]:
- adapter, collection = state.manager.initialize_collection(
- self.key, state, self.collection_factory
- )
-
- self.dispatch.init_collection(state, collection, adapter)
-
- return adapter, collection
-
- def append(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- value: Any,
- initiator: Optional[AttributeEventToken],
- passive: PassiveFlag = PASSIVE_OFF,
- ) -> None:
- collection = self.get_collection(
- state, dict_, user_data=None, passive=passive
- )
- if collection is PASSIVE_NO_RESULT:
- value = self.fire_append_event(
- state, dict_, value, initiator, key=NO_KEY
- )
- assert (
- self.key not in dict_
- ), "Collection was loaded during event handling."
- state._get_pending_mutation(self.key).append(value)
- else:
- if TYPE_CHECKING:
- assert isinstance(collection, CollectionAdapter)
- collection.append_with_event(value, initiator)
-
- def remove(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- value: Any,
- initiator: Optional[AttributeEventToken],
- passive: PassiveFlag = PASSIVE_OFF,
- ) -> None:
- collection = self.get_collection(
- state, state.dict, user_data=None, passive=passive
- )
- if collection is PASSIVE_NO_RESULT:
- self.fire_remove_event(state, dict_, value, initiator, key=NO_KEY)
- assert (
- self.key not in dict_
- ), "Collection was loaded during event handling."
- state._get_pending_mutation(self.key).remove(value)
- else:
- if TYPE_CHECKING:
- assert isinstance(collection, CollectionAdapter)
- collection.remove_with_event(value, initiator)
-
- def pop(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- value: Any,
- initiator: Optional[AttributeEventToken],
- passive: PassiveFlag = PASSIVE_OFF,
- ) -> None:
- try:
- # TODO: better solution here would be to add
- # a "popper" role to collections.py to complement
- # "remover".
- self.remove(state, dict_, value, initiator, passive=passive)
- except (ValueError, KeyError, IndexError):
- pass
-
- def set(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- value: Any,
- initiator: Optional[AttributeEventToken] = None,
- passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
- check_old: Any = None,
- pop: bool = False,
- _adapt: bool = True,
- ) -> None:
- iterable = orig_iterable = value
- new_keys = None
-
- # pulling a new collection first so that an adaptation exception does
- # not trigger a lazy load of the old collection.
- new_collection, user_data = self._initialize_collection(state)
- if _adapt:
- if new_collection._converter is not None:
- iterable = new_collection._converter(iterable)
- else:
- setting_type = util.duck_type_collection(iterable)
- receiving_type = self._duck_typed_as
-
- if setting_type is not receiving_type:
- given = (
- iterable is None
- and "None"
- or iterable.__class__.__name__
- )
- wanted = self._duck_typed_as.__name__
- raise TypeError(
- "Incompatible collection type: %s is not %s-like"
- % (given, wanted)
- )
-
- # If the object is an adapted collection, return the (iterable)
- # adapter.
- if hasattr(iterable, "_sa_iterator"):
- iterable = iterable._sa_iterator()
- elif setting_type is dict:
- new_keys = list(iterable)
- iterable = iterable.values()
- else:
- iterable = iter(iterable)
- elif util.duck_type_collection(iterable) is dict:
- new_keys = list(value)
-
- new_values = list(iterable)
-
- evt = self._bulk_replace_token
-
- self.dispatch.bulk_replace(state, new_values, evt, keys=new_keys)
-
- # propagate NO_RAISE in passive through to the get() for the
- # existing object (ticket #8862)
- old = self.get(
- state,
- dict_,
- passive=PASSIVE_ONLY_PERSISTENT ^ (passive & PassiveFlag.NO_RAISE),
- )
- if old is PASSIVE_NO_RESULT:
- old = self._default_value(state, dict_)
- elif old is orig_iterable:
- # ignore re-assignment of the current collection, as happens
- # implicitly with in-place operators (foo.collection |= other)
- return
-
- # place a copy of "old" in state.committed_state
- state._modified_event(dict_, self, old, True)
-
- old_collection = old._sa_adapter
-
- dict_[self.key] = user_data
-
- collections.bulk_replace(
- new_values, old_collection, new_collection, initiator=evt
- )
-
- self._dispose_previous_collection(state, old, old_collection, True)
-
- def _dispose_previous_collection(
- self,
- state: InstanceState[Any],
- collection: _AdaptedCollectionProtocol,
- adapter: CollectionAdapter,
- fire_event: bool,
- ) -> None:
- del collection._sa_adapter
-
- # discarding old collection make sure it is not referenced in empty
- # collections.
- state._empty_collections.pop(self.key, None)
- if fire_event:
- self.dispatch.dispose_collection(state, collection, adapter)
-
- def _invalidate_collection(
- self, collection: _AdaptedCollectionProtocol
- ) -> None:
- adapter = getattr(collection, "_sa_adapter")
- adapter.invalidated = True
-
- def set_committed_value(
- self, state: InstanceState[Any], dict_: _InstanceDict, value: Any
- ) -> _AdaptedCollectionProtocol:
- """Set an attribute value on the given instance and 'commit' it."""
-
- collection, user_data = self._initialize_collection(state)
-
- if value:
- collection.append_multiple_without_event(value)
-
- state.dict[self.key] = user_data
-
- state._commit(dict_, [self.key])
-
- if self.key in state._pending_mutations:
- # pending items exist. issue a modified event,
- # add/remove new items.
- state._modified_event(dict_, self, user_data, True)
-
- pending = state._pending_mutations.pop(self.key)
- added = pending.added_items
- removed = pending.deleted_items
- for item in added:
- collection.append_without_event(item)
- for item in removed:
- collection.remove_without_event(item)
-
- return user_data
-
- @overload
- def get_collection(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- user_data: Literal[None] = ...,
- passive: Literal[PassiveFlag.PASSIVE_OFF] = ...,
- ) -> CollectionAdapter: ...
-
- @overload
- def get_collection(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- user_data: _AdaptedCollectionProtocol = ...,
- passive: PassiveFlag = ...,
- ) -> CollectionAdapter: ...
-
- @overload
- def get_collection(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- user_data: Optional[_AdaptedCollectionProtocol] = ...,
- passive: PassiveFlag = PASSIVE_OFF,
- ) -> Union[
- Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
- ]: ...
-
- def get_collection(
- self,
- state: InstanceState[Any],
- dict_: _InstanceDict,
- user_data: Optional[_AdaptedCollectionProtocol] = None,
- passive: PassiveFlag = PASSIVE_OFF,
- ) -> Union[
- Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
- ]:
- """Retrieve the CollectionAdapter associated with the given state.
-
- if user_data is None, retrieves it from the state using normal
- "get()" rules, which will fire lazy callables or return the "empty"
- collection value.
-
- """
- if user_data is None:
- fetch_user_data = self.get(state, dict_, passive=passive)
- if fetch_user_data is LoaderCallableStatus.PASSIVE_NO_RESULT:
- return fetch_user_data
- else:
- user_data = cast("_AdaptedCollectionProtocol", fetch_user_data)
-
- return user_data._sa_adapter
-
-
-def backref_listeners(
- attribute: QueryableAttribute[Any], key: str, uselist: bool
-) -> None:
- """Apply listeners to synchronize a two-way relationship."""
-
- # use easily recognizable names for stack traces.
-
- # in the sections marked "tokens to test for a recursive loop",
- # this is somewhat brittle and very performance-sensitive logic
- # that is specific to how we might arrive at each event. a marker
- # that can target us directly to arguments being invoked against
- # the impl might be simpler, but could interfere with other systems.
-
- parent_token = attribute.impl.parent_token
- parent_impl = attribute.impl
-
- def _acceptable_key_err(child_state, initiator, child_impl):
- raise ValueError(
- "Bidirectional attribute conflict detected: "
- 'Passing object %s to attribute "%s" '
- 'triggers a modify event on attribute "%s" '
- 'via the backref "%s".'
- % (
- state_str(child_state),
- initiator.parent_token,
- child_impl.parent_token,
- attribute.impl.parent_token,
- )
- )
-
- def emit_backref_from_scalar_set_event(
- state, child, oldchild, initiator, **kw
- ):
- if oldchild is child:
- return child
- if (
- oldchild is not None
- and oldchild is not PASSIVE_NO_RESULT
- and oldchild is not NO_VALUE
- ):
- # With lazy=None, there's no guarantee that the full collection is
- # present when updating via a backref.
- old_state, old_dict = (
- instance_state(oldchild),
- instance_dict(oldchild),
- )
- impl = old_state.manager[key].impl
-
- # tokens to test for a recursive loop.
- if not impl.collection and not impl.dynamic:
- check_recursive_token = impl._replace_token
- else:
- check_recursive_token = impl._remove_token
-
- if initiator is not check_recursive_token:
- impl.pop(
- old_state,
- old_dict,
- state.obj(),
- parent_impl._append_token,
- passive=PASSIVE_NO_FETCH,
- )
-
- if child is not None:
- child_state, child_dict = (
- instance_state(child),
- instance_dict(child),
- )
- child_impl = child_state.manager[key].impl
-
- if (
- initiator.parent_token is not parent_token
- and initiator.parent_token is not child_impl.parent_token
- ):
- _acceptable_key_err(state, initiator, child_impl)
-
- # tokens to test for a recursive loop.
- check_append_token = child_impl._append_token
- check_bulk_replace_token = (
- child_impl._bulk_replace_token
- if _is_collection_attribute_impl(child_impl)
- else None
- )
-
- if (
- initiator is not check_append_token
- and initiator is not check_bulk_replace_token
- ):
- child_impl.append(
- child_state,
- child_dict,
- state.obj(),
- initiator,
- passive=PASSIVE_NO_FETCH,
- )
- return child
-
- def emit_backref_from_collection_append_event(
- state, child, initiator, **kw
- ):
- if child is None:
- return
-
- child_state, child_dict = instance_state(child), instance_dict(child)
- child_impl = child_state.manager[key].impl
-
- if (
- initiator.parent_token is not parent_token
- and initiator.parent_token is not child_impl.parent_token
- ):
- _acceptable_key_err(state, initiator, child_impl)
-
- # tokens to test for a recursive loop.
- check_append_token = child_impl._append_token
- check_bulk_replace_token = (
- child_impl._bulk_replace_token
- if _is_collection_attribute_impl(child_impl)
- else None
- )
-
- if (
- initiator is not check_append_token
- and initiator is not check_bulk_replace_token
- ):
- child_impl.append(
- child_state,
- child_dict,
- state.obj(),
- initiator,
- passive=PASSIVE_NO_FETCH,
- )
- return child
-
- def emit_backref_from_collection_remove_event(
- state, child, initiator, **kw
- ):
- if (
- child is not None
- and child is not PASSIVE_NO_RESULT
- and child is not NO_VALUE
- ):
- child_state, child_dict = (
- instance_state(child),
- instance_dict(child),
- )
- child_impl = child_state.manager[key].impl
-
- check_replace_token: Optional[AttributeEventToken]
-
- # tokens to test for a recursive loop.
- if not child_impl.collection and not child_impl.dynamic:
- check_remove_token = child_impl._remove_token
- check_replace_token = child_impl._replace_token
- check_for_dupes_on_remove = uselist and not parent_impl.dynamic
- else:
- check_remove_token = child_impl._remove_token
- check_replace_token = (
- child_impl._bulk_replace_token
- if _is_collection_attribute_impl(child_impl)
- else None
- )
- check_for_dupes_on_remove = False
-
- if (
- initiator is not check_remove_token
- and initiator is not check_replace_token
- ):
- if not check_for_dupes_on_remove or not util.has_dupes(
- # when this event is called, the item is usually
- # present in the list, except for a pop() operation.
- state.dict[parent_impl.key],
- child,
- ):
- child_impl.pop(
- child_state,
- child_dict,
- state.obj(),
- initiator,
- passive=PASSIVE_NO_FETCH,
- )
-
- if uselist:
- event.listen(
- attribute,
- "append",
- emit_backref_from_collection_append_event,
- retval=True,
- raw=True,
- include_key=True,
- )
- else:
- event.listen(
- attribute,
- "set",
- emit_backref_from_scalar_set_event,
- retval=True,
- raw=True,
- include_key=True,
- )
- # TODO: need coverage in test/orm/ of remove event
- event.listen(
- attribute,
- "remove",
- emit_backref_from_collection_remove_event,
- retval=True,
- raw=True,
- include_key=True,
- )
-
-
-_NO_HISTORY = util.symbol("NO_HISTORY")
-_NO_STATE_SYMBOLS = frozenset([id(PASSIVE_NO_RESULT), id(NO_VALUE)])
-
-
-class History(NamedTuple):
- """A 3-tuple of added, unchanged and deleted values,
- representing the changes which have occurred on an instrumented
- attribute.
-
- The easiest way to get a :class:`.History` object for a particular
- attribute on an object is to use the :func:`_sa.inspect` function::
-
- from sqlalchemy import inspect
-
- hist = inspect(myobject).attrs.myattribute.history
-
- Each tuple member is an iterable sequence:
-
- * ``added`` - the collection of items added to the attribute (the first
- tuple element).
-
- * ``unchanged`` - the collection of items that have not changed on the
- attribute (the second tuple element).
-
- * ``deleted`` - the collection of items that have been removed from the
- attribute (the third tuple element).
-
- """
-
- added: Union[Tuple[()], List[Any]]
- unchanged: Union[Tuple[()], List[Any]]
- deleted: Union[Tuple[()], List[Any]]
-
- def __bool__(self) -> bool:
- return self != HISTORY_BLANK
-
- def empty(self) -> bool:
- """Return True if this :class:`.History` has no changes
- and no existing, unchanged state.
-
- """
-
- return not bool((self.added or self.deleted) or self.unchanged)
-
- def sum(self) -> Sequence[Any]:
- """Return a collection of added + unchanged + deleted."""
-
- return (
- (self.added or []) + (self.unchanged or []) + (self.deleted or [])
- )
-
- def non_deleted(self) -> Sequence[Any]:
- """Return a collection of added + unchanged."""
-
- return (self.added or []) + (self.unchanged or [])
-
- def non_added(self) -> Sequence[Any]:
- """Return a collection of unchanged + deleted."""
-
- return (self.unchanged or []) + (self.deleted or [])
-
- def has_changes(self) -> bool:
- """Return True if this :class:`.History` has changes."""
-
- return bool(self.added or self.deleted)
-
- def _merge(self, added: Iterable[Any], deleted: Iterable[Any]) -> History:
- return History(
- list(self.added) + list(added),
- self.unchanged,
- list(self.deleted) + list(deleted),
- )
-
- def as_state(self) -> History:
- return History(
- [
- (c is not None) and instance_state(c) or None
- for c in self.added
- ],
- [
- (c is not None) and instance_state(c) or None
- for c in self.unchanged
- ],
- [
- (c is not None) and instance_state(c) or None
- for c in self.deleted
- ],
- )
-
- @classmethod
- def from_scalar_attribute(
- cls,
- attribute: ScalarAttributeImpl,
- state: InstanceState[Any],
- current: Any,
- ) -> History:
- original = state.committed_state.get(attribute.key, _NO_HISTORY)
-
- deleted: Union[Tuple[()], List[Any]]
-
- if original is _NO_HISTORY:
- if current is NO_VALUE:
- return cls((), (), ())
- else:
- return cls((), [current], ())
- # don't let ClauseElement expressions here trip things up
- elif (
- current is not NO_VALUE
- and attribute.is_equal(current, original) is True
- ):
- return cls((), [current], ())
- else:
- # current convention on native scalars is to not
- # include information
- # about missing previous value in "deleted", but
- # we do include None, which helps in some primary
- # key situations
- if id(original) in _NO_STATE_SYMBOLS:
- deleted = ()
- # indicate a "del" operation occurred when we don't have
- # the previous value as: ([None], (), ())
- if id(current) in _NO_STATE_SYMBOLS:
- current = None
- else:
- deleted = [original]
- if current is NO_VALUE:
- return cls((), (), deleted)
- else:
- return cls([current], (), deleted)
-
- @classmethod
- def from_object_attribute(
- cls,
- attribute: ScalarObjectAttributeImpl,
- state: InstanceState[Any],
- current: Any,
- original: Any = _NO_HISTORY,
- ) -> History:
- deleted: Union[Tuple[()], List[Any]]
-
- if original is _NO_HISTORY:
- original = state.committed_state.get(attribute.key, _NO_HISTORY)
-
- if original is _NO_HISTORY:
- if current is NO_VALUE:
- return cls((), (), ())
- else:
- return cls((), [current], ())
- elif current is original and current is not NO_VALUE:
- return cls((), [current], ())
- else:
- # current convention on related objects is to not
- # include information
- # about missing previous value in "deleted", and
- # to also not include None - the dependency.py rules
- # ignore the None in any case.
- if id(original) in _NO_STATE_SYMBOLS or original is None:
- deleted = ()
- # indicate a "del" operation occurred when we don't have
- # the previous value as: ([None], (), ())
- if id(current) in _NO_STATE_SYMBOLS:
- current = None
- else:
- deleted = [original]
- if current is NO_VALUE:
- return cls((), (), deleted)
- else:
- return cls([current], (), deleted)
-
- @classmethod
- def from_collection(
- cls,
- attribute: CollectionAttributeImpl,
- state: InstanceState[Any],
- current: Any,
- ) -> History:
- original = state.committed_state.get(attribute.key, _NO_HISTORY)
- if current is NO_VALUE:
- return cls((), (), ())
-
- current = getattr(current, "_sa_adapter")
- if original is NO_VALUE:
- return cls(list(current), (), ())
- elif original is _NO_HISTORY:
- return cls((), list(current), ())
- else:
- current_states = [
- ((c is not None) and instance_state(c) or None, c)
- for c in current
- ]
- original_states = [
- ((c is not None) and instance_state(c) or None, c)
- for c in original
- ]
-
- current_set = dict(current_states)
- original_set = dict(original_states)
-
- return cls(
- [o for s, o in current_states if s not in original_set],
- [o for s, o in current_states if s in original_set],
- [o for s, o in original_states if s not in current_set],
- )
-
-
-HISTORY_BLANK = History((), (), ())
-
-
-def get_history(
- obj: object, key: str, passive: PassiveFlag = PASSIVE_OFF
-) -> History:
- """Return a :class:`.History` record for the given object
- and attribute key.
-
- This is the **pre-flush** history for a given attribute, which is
- reset each time the :class:`.Session` flushes changes to the
- current database transaction.
-
- .. note::
-
- Prefer to use the :attr:`.AttributeState.history` and
- :meth:`.AttributeState.load_history` accessors to retrieve the
- :class:`.History` for instance attributes.
-
-
- :param obj: an object whose class is instrumented by the
- attributes package.
-
- :param key: string attribute name.
-
- :param passive: indicates loading behavior for the attribute
- if the value is not already present. This is a
- bitflag attribute, which defaults to the symbol
- :attr:`.PASSIVE_OFF` indicating all necessary SQL
- should be emitted.
-
- .. seealso::
-
- :attr:`.AttributeState.history`
-
- :meth:`.AttributeState.load_history` - retrieve history
- using loader callables if the value is not locally present.
-
- """
-
- return get_state_history(instance_state(obj), key, passive)
-
-
-def get_state_history(
- state: InstanceState[Any], key: str, passive: PassiveFlag = PASSIVE_OFF
-) -> History:
- return state.get_history(key, passive)
-
-
-def has_parent(
- cls: Type[_O], obj: _O, key: str, optimistic: bool = False
-) -> bool:
- """TODO"""
- manager = manager_of_class(cls)
- state = instance_state(obj)
- return manager.has_parent(state, key, optimistic)
-
-
-def register_attribute(
- class_: Type[_O],
- key: str,
- *,
- comparator: interfaces.PropComparator[_T],
- parententity: _InternalEntityType[_O],
- doc: Optional[str] = None,
- **kw: Any,
-) -> InstrumentedAttribute[_T]:
- desc = register_descriptor(
- class_, key, comparator=comparator, parententity=parententity, doc=doc
- )
- register_attribute_impl(class_, key, **kw)
- return desc
-
-
-def register_attribute_impl(
- class_: Type[_O],
- key: str,
- uselist: bool = False,
- callable_: Optional[_LoaderCallable] = None,
- useobject: bool = False,
- impl_class: Optional[Type[AttributeImpl]] = None,
- backref: Optional[str] = None,
- **kw: Any,
-) -> QueryableAttribute[Any]:
- manager = manager_of_class(class_)
- if uselist:
- factory = kw.pop("typecallable", None)
- typecallable = manager.instrument_collection_class(
- key, factory or list
- )
- else:
- typecallable = kw.pop("typecallable", None)
-
- dispatch = cast(
- "_Dispatch[QueryableAttribute[Any]]", manager[key].dispatch
- ) # noqa: E501
-
- impl: AttributeImpl
-
- if impl_class:
- # TODO: this appears to be the WriteOnlyAttributeImpl /
- # DynamicAttributeImpl constructor which is hardcoded
- impl = cast("Type[WriteOnlyAttributeImpl]", impl_class)(
- class_, key, dispatch, **kw
- )
- elif uselist:
- impl = CollectionAttributeImpl(
- class_, key, callable_, dispatch, typecallable=typecallable, **kw
- )
- elif useobject:
- impl = ScalarObjectAttributeImpl(
- class_, key, callable_, dispatch, **kw
- )
- else:
- impl = ScalarAttributeImpl(class_, key, callable_, dispatch, **kw)
-
- manager[key].impl = impl
-
- if backref:
- backref_listeners(manager[key], backref, uselist)
-
- manager.post_configure_attribute(key)
- return manager[key]
-
-
-def register_descriptor(
- class_: Type[Any],
- key: str,
- *,
- comparator: interfaces.PropComparator[_T],
- parententity: _InternalEntityType[Any],
- doc: Optional[str] = None,
-) -> InstrumentedAttribute[_T]:
- manager = manager_of_class(class_)
-
- descriptor = InstrumentedAttribute(
- class_, key, comparator=comparator, parententity=parententity
- )
-
- descriptor.__doc__ = doc # type: ignore
-
- manager.instrument_attribute(key, descriptor)
- return descriptor
-
-
-def unregister_attribute(class_: Type[Any], key: str) -> None:
- manager_of_class(class_).uninstrument_attribute(key)
-
-
-def init_collection(obj: object, key: str) -> CollectionAdapter:
- """Initialize a collection attribute and return the collection adapter.
-
- This function is used to provide direct access to collection internals
- for a previously unloaded attribute. e.g.::
-
- collection_adapter = init_collection(someobject, 'elements')
- for elem in values:
- collection_adapter.append_without_event(elem)
-
- For an easier way to do the above, see
- :func:`~sqlalchemy.orm.attributes.set_committed_value`.
-
- :param obj: a mapped object
-
- :param key: string attribute name where the collection is located.
-
- """
- state = instance_state(obj)
- dict_ = state.dict
- return init_state_collection(state, dict_, key)
-
-
-def init_state_collection(
- state: InstanceState[Any], dict_: _InstanceDict, key: str
-) -> CollectionAdapter:
- """Initialize a collection attribute and return the collection adapter.
-
- Discards any existing collection which may be there.
-
- """
- attr = state.manager[key].impl
-
- if TYPE_CHECKING:
- assert isinstance(attr, HasCollectionAdapter)
-
- old = dict_.pop(key, None) # discard old collection
- if old is not None:
- old_collection = old._sa_adapter
- attr._dispose_previous_collection(state, old, old_collection, False)
-
- user_data = attr._default_value(state, dict_)
- adapter: CollectionAdapter = attr.get_collection(
- state, dict_, user_data, passive=PassiveFlag.PASSIVE_NO_FETCH
- )
- adapter._reset_empty()
-
- return adapter
-
-
-def set_committed_value(instance, key, value):
- """Set the value of an attribute with no history events.
-
- Cancels any previous history present. The value should be
- a scalar value for scalar-holding attributes, or
- an iterable for any collection-holding attribute.
-
- This is the same underlying method used when a lazy loader
- fires off and loads additional data from the database.
- In particular, this method can be used by application code
- which has loaded additional attributes or collections through
- separate queries, which can then be attached to an instance
- as though it were part of its original loaded state.
-
- """
- state, dict_ = instance_state(instance), instance_dict(instance)
- state.manager[key].impl.set_committed_value(state, dict_, value)
-
-
-def set_attribute(
- instance: object,
- key: str,
- value: Any,
- initiator: Optional[AttributeEventToken] = None,
-) -> None:
- """Set the value of an attribute, firing history events.
-
- This function may be used regardless of instrumentation
- applied directly to the class, i.e. no descriptors are required.
- Custom attribute management schemes will need to make usage
- of this method to establish attribute state as understood
- by SQLAlchemy.
-
- :param instance: the object that will be modified
-
- :param key: string name of the attribute
-
- :param value: value to assign
-
- :param initiator: an instance of :class:`.Event` that would have
- been propagated from a previous event listener. This argument
- is used when the :func:`.set_attribute` function is being used within
- an existing event listening function where an :class:`.Event` object
- is being supplied; the object may be used to track the origin of the
- chain of events.
-
- .. versionadded:: 1.2.3
-
- """
- state, dict_ = instance_state(instance), instance_dict(instance)
- state.manager[key].impl.set(state, dict_, value, initiator)
-
-
-def get_attribute(instance: object, key: str) -> Any:
- """Get the value of an attribute, firing any callables required.
-
- This function may be used regardless of instrumentation
- applied directly to the class, i.e. no descriptors are required.
- Custom attribute management schemes will need to make usage
- of this method to make usage of attribute state as understood
- by SQLAlchemy.
-
- """
- state, dict_ = instance_state(instance), instance_dict(instance)
- return state.manager[key].impl.get(state, dict_)
-
-
-def del_attribute(instance: object, key: str) -> None:
- """Delete the value of an attribute, firing history events.
-
- This function may be used regardless of instrumentation
- applied directly to the class, i.e. no descriptors are required.
- Custom attribute management schemes will need to make usage
- of this method to establish attribute state as understood
- by SQLAlchemy.
-
- """
- state, dict_ = instance_state(instance), instance_dict(instance)
- state.manager[key].impl.delete(state, dict_)
-
-
-def flag_modified(instance: object, key: str) -> None:
- """Mark an attribute on an instance as 'modified'.
-
- This sets the 'modified' flag on the instance and
- establishes an unconditional change event for the given attribute.
- The attribute must have a value present, else an
- :class:`.InvalidRequestError` is raised.
-
- To mark an object "dirty" without referring to any specific attribute
- so that it is considered within a flush, use the
- :func:`.attributes.flag_dirty` call.
-
- .. seealso::
-
- :func:`.attributes.flag_dirty`
-
- """
- state, dict_ = instance_state(instance), instance_dict(instance)
- impl = state.manager[key].impl
- impl.dispatch.modified(state, impl._modified_token)
- state._modified_event(dict_, impl, NO_VALUE, is_userland=True)
-
-
-def flag_dirty(instance: object) -> None:
- """Mark an instance as 'dirty' without any specific attribute mentioned.
-
- This is a special operation that will allow the object to travel through
- the flush process for interception by events such as
- :meth:`.SessionEvents.before_flush`. Note that no SQL will be emitted in
- the flush process for an object that has no changes, even if marked dirty
- via this method. However, a :meth:`.SessionEvents.before_flush` handler
- will be able to see the object in the :attr:`.Session.dirty` collection and
- may establish changes on it, which will then be included in the SQL
- emitted.
-
- .. versionadded:: 1.2
-
- .. seealso::
-
- :func:`.attributes.flag_modified`
-
- """
-
- state, dict_ = instance_state(instance), instance_dict(instance)
- state._modified_event(dict_, None, NO_VALUE, is_userland=True)