summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/orm/writeonly.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/orm/writeonly.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/orm/writeonly.py678
1 files changed, 678 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/orm/writeonly.py b/venv/lib/python3.11/site-packages/sqlalchemy/orm/writeonly.py
new file mode 100644
index 0000000..5680cc7
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/sqlalchemy/orm/writeonly.py
@@ -0,0 +1,678 @@
+# orm/writeonly.py
+# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+"""Write-only collection API.
+
+This is an alternate mapped attribute style that only supports single-item
+collection mutation operations. To read the collection, a select()
+object must be executed each time.
+
+.. versionadded:: 2.0
+
+
+"""
+
+from __future__ import annotations
+
+from typing import Any
+from typing import Collection
+from typing import Dict
+from typing import Generic
+from typing import Iterable
+from typing import Iterator
+from typing import List
+from typing import NoReturn
+from typing import Optional
+from typing import overload
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+
+from sqlalchemy.sql import bindparam
+from . import attributes
+from . import interfaces
+from . import relationships
+from . import strategies
+from .base import NEVER_SET
+from .base import object_mapper
+from .base import PassiveFlag
+from .base import RelationshipDirection
+from .. import exc
+from .. import inspect
+from .. import log
+from .. import util
+from ..sql import delete
+from ..sql import insert
+from ..sql import select
+from ..sql import update
+from ..sql.dml import Delete
+from ..sql.dml import Insert
+from ..sql.dml import Update
+from ..util.typing import Literal
+
+if TYPE_CHECKING:
+ from . import QueryableAttribute
+ from ._typing import _InstanceDict
+ from .attributes import AttributeEventToken
+ from .base import LoaderCallableStatus
+ from .collections import _AdaptedCollectionProtocol
+ from .collections import CollectionAdapter
+ from .mapper import Mapper
+ from .relationships import _RelationshipOrderByArg
+ from .state import InstanceState
+ from .util import AliasedClass
+ from ..event import _Dispatch
+ from ..sql.selectable import FromClause
+ from ..sql.selectable import Select
+
+_T = TypeVar("_T", bound=Any)
+
+
+class WriteOnlyHistory(Generic[_T]):
+ """Overrides AttributeHistory to receive append/remove events directly."""
+
+ unchanged_items: util.OrderedIdentitySet
+ added_items: util.OrderedIdentitySet
+ deleted_items: util.OrderedIdentitySet
+ _reconcile_collection: bool
+
+ def __init__(
+ self,
+ attr: WriteOnlyAttributeImpl,
+ state: InstanceState[_T],
+ passive: PassiveFlag,
+ apply_to: Optional[WriteOnlyHistory[_T]] = None,
+ ) -> None:
+ if apply_to:
+ if passive & PassiveFlag.SQL_OK:
+ raise exc.InvalidRequestError(
+ f"Attribute {attr} can't load the existing state from the "
+ "database for this operation; full iteration is not "
+ "permitted. If this is a delete operation, configure "
+ f"passive_deletes=True on the {attr} relationship in "
+ "order to resolve this error."
+ )
+
+ self.unchanged_items = apply_to.unchanged_items
+ self.added_items = apply_to.added_items
+ self.deleted_items = apply_to.deleted_items
+ self._reconcile_collection = apply_to._reconcile_collection
+ else:
+ self.deleted_items = util.OrderedIdentitySet()
+ self.added_items = util.OrderedIdentitySet()
+ self.unchanged_items = util.OrderedIdentitySet()
+ self._reconcile_collection = False
+
+ @property
+ def added_plus_unchanged(self) -> List[_T]:
+ return list(self.added_items.union(self.unchanged_items))
+
+ @property
+ def all_items(self) -> List[_T]:
+ return list(
+ self.added_items.union(self.unchanged_items).union(
+ self.deleted_items
+ )
+ )
+
+ def as_history(self) -> attributes.History:
+ if self._reconcile_collection:
+ added = self.added_items.difference(self.unchanged_items)
+ deleted = self.deleted_items.intersection(self.unchanged_items)
+ unchanged = self.unchanged_items.difference(deleted)
+ else:
+ added, unchanged, deleted = (
+ self.added_items,
+ self.unchanged_items,
+ self.deleted_items,
+ )
+ return attributes.History(list(added), list(unchanged), list(deleted))
+
+ def indexed(self, index: Union[int, slice]) -> Union[List[_T], _T]:
+ return list(self.added_items)[index]
+
+ def add_added(self, value: _T) -> None:
+ self.added_items.add(value)
+
+ def add_removed(self, value: _T) -> None:
+ if value in self.added_items:
+ self.added_items.remove(value)
+ else:
+ self.deleted_items.add(value)
+
+
+class WriteOnlyAttributeImpl(
+ attributes.HasCollectionAdapter, attributes.AttributeImpl
+):
+ uses_objects: bool = True
+ default_accepts_scalar_loader: bool = False
+ supports_population: bool = False
+ _supports_dynamic_iteration: bool = False
+ collection: bool = False
+ dynamic: bool = True
+ order_by: _RelationshipOrderByArg = ()
+ collection_history_cls: Type[WriteOnlyHistory[Any]] = WriteOnlyHistory
+
+ query_class: Type[WriteOnlyCollection[Any]]
+
+ def __init__(
+ self,
+ class_: Union[Type[Any], AliasedClass[Any]],
+ key: str,
+ dispatch: _Dispatch[QueryableAttribute[Any]],
+ target_mapper: Mapper[_T],
+ order_by: _RelationshipOrderByArg,
+ **kw: Any,
+ ):
+ super().__init__(class_, key, None, dispatch, **kw)
+ self.target_mapper = target_mapper
+ self.query_class = WriteOnlyCollection
+ if order_by:
+ self.order_by = tuple(order_by)
+
+ def get(
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
+ ) -> Union[util.OrderedIdentitySet, WriteOnlyCollection[Any]]:
+ if not passive & PassiveFlag.SQL_OK:
+ return self._get_collection_history(
+ state, PassiveFlag.PASSIVE_NO_INITIALIZE
+ ).added_items
+ else:
+ return self.query_class(self, state)
+
+ @overload
+ def get_collection(
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ user_data: Literal[None] = ...,
+ passive: Literal[PassiveFlag.PASSIVE_OFF] = ...,
+ ) -> CollectionAdapter: ...
+
+ @overload
+ def get_collection(
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ user_data: _AdaptedCollectionProtocol = ...,
+ passive: PassiveFlag = ...,
+ ) -> CollectionAdapter: ...
+
+ @overload
+ def get_collection(
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ user_data: Optional[_AdaptedCollectionProtocol] = ...,
+ passive: PassiveFlag = ...,
+ ) -> Union[
+ Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
+ ]: ...
+
+ def get_collection(
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ user_data: Optional[_AdaptedCollectionProtocol] = None,
+ passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
+ ) -> Union[
+ Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
+ ]:
+ data: Collection[Any]
+ if not passive & PassiveFlag.SQL_OK:
+ data = self._get_collection_history(state, passive).added_items
+ else:
+ history = self._get_collection_history(state, passive)
+ data = history.added_plus_unchanged
+ return DynamicCollectionAdapter(data) # type: ignore[return-value]
+
+ @util.memoized_property
+ def _append_token( # type:ignore[override]
+ self,
+ ) -> attributes.AttributeEventToken:
+ return attributes.AttributeEventToken(self, attributes.OP_APPEND)
+
+ @util.memoized_property
+ def _remove_token( # type:ignore[override]
+ self,
+ ) -> attributes.AttributeEventToken:
+ return attributes.AttributeEventToken(self, attributes.OP_REMOVE)
+
+ def fire_append_event(
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ value: Any,
+ initiator: Optional[AttributeEventToken],
+ collection_history: Optional[WriteOnlyHistory[Any]] = None,
+ ) -> None:
+ if collection_history is None:
+ collection_history = self._modified_event(state, dict_)
+
+ collection_history.add_added(value)
+
+ for fn in self.dispatch.append:
+ value = fn(state, value, initiator or self._append_token)
+
+ if self.trackparent and value is not None:
+ self.sethasparent(attributes.instance_state(value), state, True)
+
+ def fire_remove_event(
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ value: Any,
+ initiator: Optional[AttributeEventToken],
+ collection_history: Optional[WriteOnlyHistory[Any]] = None,
+ ) -> None:
+ if collection_history is None:
+ collection_history = self._modified_event(state, dict_)
+
+ collection_history.add_removed(value)
+
+ if self.trackparent and value is not None:
+ self.sethasparent(attributes.instance_state(value), state, False)
+
+ for fn in self.dispatch.remove:
+ fn(state, value, initiator or self._remove_token)
+
+ def _modified_event(
+ self, state: InstanceState[Any], dict_: _InstanceDict
+ ) -> WriteOnlyHistory[Any]:
+ if self.key not in state.committed_state:
+ state.committed_state[self.key] = self.collection_history_cls(
+ self, state, PassiveFlag.PASSIVE_NO_FETCH
+ )
+
+ state._modified_event(dict_, self, NEVER_SET)
+
+ # this is a hack to allow the entities.ComparableEntity fixture
+ # to work
+ dict_[self.key] = True
+ return state.committed_state[self.key] # type: ignore[no-any-return]
+
+ def set(
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ value: Any,
+ initiator: Optional[AttributeEventToken] = None,
+ passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
+ check_old: Any = None,
+ pop: bool = False,
+ _adapt: bool = True,
+ ) -> None:
+ if initiator and initiator.parent_token is self.parent_token:
+ return
+
+ if pop and value is None:
+ return
+
+ iterable = value
+ new_values = list(iterable)
+ if state.has_identity:
+ if not self._supports_dynamic_iteration:
+ raise exc.InvalidRequestError(
+ f'Collection "{self}" does not support implicit '
+ "iteration; collection replacement operations "
+ "can't be used"
+ )
+ old_collection = util.IdentitySet(
+ self.get(state, dict_, passive=passive)
+ )
+
+ collection_history = self._modified_event(state, dict_)
+ if not state.has_identity:
+ old_collection = collection_history.added_items
+ else:
+ old_collection = old_collection.union(
+ collection_history.added_items
+ )
+
+ constants = old_collection.intersection(new_values)
+ additions = util.IdentitySet(new_values).difference(constants)
+ removals = old_collection.difference(constants)
+
+ for member in new_values:
+ if member in additions:
+ self.fire_append_event(
+ state,
+ dict_,
+ member,
+ None,
+ collection_history=collection_history,
+ )
+
+ for member in removals:
+ self.fire_remove_event(
+ state,
+ dict_,
+ member,
+ None,
+ collection_history=collection_history,
+ )
+
+ def delete(self, *args: Any, **kwargs: Any) -> NoReturn:
+ raise NotImplementedError()
+
+ def set_committed_value(
+ self, state: InstanceState[Any], dict_: _InstanceDict, value: Any
+ ) -> NoReturn:
+ raise NotImplementedError(
+ "Dynamic attributes don't support collection population."
+ )
+
+ def get_history(
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
+ ) -> attributes.History:
+ c = self._get_collection_history(state, passive)
+ return c.as_history()
+
+ def get_all_pending(
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ passive: PassiveFlag = PassiveFlag.PASSIVE_NO_INITIALIZE,
+ ) -> List[Tuple[InstanceState[Any], Any]]:
+ c = self._get_collection_history(state, passive)
+ return [(attributes.instance_state(x), x) for x in c.all_items]
+
+ def _get_collection_history(
+ self, state: InstanceState[Any], passive: PassiveFlag
+ ) -> WriteOnlyHistory[Any]:
+ c: WriteOnlyHistory[Any]
+ if self.key in state.committed_state:
+ c = state.committed_state[self.key]
+ else:
+ c = self.collection_history_cls(
+ self, state, PassiveFlag.PASSIVE_NO_FETCH
+ )
+
+ if state.has_identity and (passive & PassiveFlag.INIT_OK):
+ return self.collection_history_cls(
+ self, state, passive, apply_to=c
+ )
+ else:
+ return c
+
+ def append(
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ value: Any,
+ initiator: Optional[AttributeEventToken],
+ passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
+ ) -> None:
+ if initiator is not self:
+ self.fire_append_event(state, dict_, value, initiator)
+
+ def remove(
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ value: Any,
+ initiator: Optional[AttributeEventToken],
+ passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
+ ) -> None:
+ if initiator is not self:
+ self.fire_remove_event(state, dict_, value, initiator)
+
+ def pop(
+ self,
+ state: InstanceState[Any],
+ dict_: _InstanceDict,
+ value: Any,
+ initiator: Optional[AttributeEventToken],
+ passive: PassiveFlag = PassiveFlag.PASSIVE_NO_FETCH,
+ ) -> None:
+ self.remove(state, dict_, value, initiator, passive=passive)
+
+
+@log.class_logger
+@relationships.RelationshipProperty.strategy_for(lazy="write_only")
+class WriteOnlyLoader(strategies.AbstractRelationshipLoader, log.Identified):
+ impl_class = WriteOnlyAttributeImpl
+
+ def init_class_attribute(self, mapper: Mapper[Any]) -> None:
+ self.is_class_level = True
+ if not self.uselist or self.parent_property.direction not in (
+ interfaces.ONETOMANY,
+ interfaces.MANYTOMANY,
+ ):
+ raise exc.InvalidRequestError(
+ "On relationship %s, 'dynamic' loaders cannot be used with "
+ "many-to-one/one-to-one relationships and/or "
+ "uselist=False." % self.parent_property
+ )
+
+ strategies._register_attribute( # type: ignore[no-untyped-call]
+ self.parent_property,
+ mapper,
+ useobject=True,
+ impl_class=self.impl_class,
+ target_mapper=self.parent_property.mapper,
+ order_by=self.parent_property.order_by,
+ query_class=self.parent_property.query_class,
+ )
+
+
+class DynamicCollectionAdapter:
+ """simplified CollectionAdapter for internal API consistency"""
+
+ data: Collection[Any]
+
+ def __init__(self, data: Collection[Any]):
+ self.data = data
+
+ def __iter__(self) -> Iterator[Any]:
+ return iter(self.data)
+
+ def _reset_empty(self) -> None:
+ pass
+
+ def __len__(self) -> int:
+ return len(self.data)
+
+ def __bool__(self) -> bool:
+ return True
+
+
+class AbstractCollectionWriter(Generic[_T]):
+ """Virtual collection which includes append/remove methods that synchronize
+ into the attribute event system.
+
+ """
+
+ if not TYPE_CHECKING:
+ __slots__ = ()
+
+ instance: _T
+ _from_obj: Tuple[FromClause, ...]
+
+ def __init__(self, attr: WriteOnlyAttributeImpl, state: InstanceState[_T]):
+ instance = state.obj()
+ if TYPE_CHECKING:
+ assert instance
+ self.instance = instance
+ self.attr = attr
+
+ mapper = object_mapper(instance)
+ prop = mapper._props[self.attr.key]
+
+ if prop.secondary is not None:
+ # this is a hack right now. The Query only knows how to
+ # make subsequent joins() without a given left-hand side
+ # from self._from_obj[0]. We need to ensure prop.secondary
+ # is in the FROM. So we purposely put the mapper selectable
+ # in _from_obj[0] to ensure a user-defined join() later on
+ # doesn't fail, and secondary is then in _from_obj[1].
+
+ # note also, we are using the official ORM-annotated selectable
+ # from __clause_element__(), see #7868
+ self._from_obj = (prop.mapper.__clause_element__(), prop.secondary)
+ else:
+ self._from_obj = ()
+
+ self._where_criteria = (
+ prop._with_parent(instance, alias_secondary=False),
+ )
+
+ if self.attr.order_by:
+ self._order_by_clauses = self.attr.order_by
+ else:
+ self._order_by_clauses = ()
+
+ def _add_all_impl(self, iterator: Iterable[_T]) -> None:
+ for item in iterator:
+ self.attr.append(
+ attributes.instance_state(self.instance),
+ attributes.instance_dict(self.instance),
+ item,
+ None,
+ )
+
+ def _remove_impl(self, item: _T) -> None:
+ self.attr.remove(
+ attributes.instance_state(self.instance),
+ attributes.instance_dict(self.instance),
+ item,
+ None,
+ )
+
+
+class WriteOnlyCollection(AbstractCollectionWriter[_T]):
+ """Write-only collection which can synchronize changes into the
+ attribute event system.
+
+ The :class:`.WriteOnlyCollection` is used in a mapping by
+ using the ``"write_only"`` lazy loading strategy with
+ :func:`_orm.relationship`. For background on this configuration,
+ see :ref:`write_only_relationship`.
+
+ .. versionadded:: 2.0
+
+ .. seealso::
+
+ :ref:`write_only_relationship`
+
+ """
+
+ __slots__ = (
+ "instance",
+ "attr",
+ "_where_criteria",
+ "_from_obj",
+ "_order_by_clauses",
+ )
+
+ def __iter__(self) -> NoReturn:
+ raise TypeError(
+ "WriteOnly collections don't support iteration in-place; "
+ "to query for collection items, use the select() method to "
+ "produce a SQL statement and execute it with session.scalars()."
+ )
+
+ def select(self) -> Select[Tuple[_T]]:
+ """Produce a :class:`_sql.Select` construct that represents the
+ rows within this instance-local :class:`_orm.WriteOnlyCollection`.
+
+ """
+ stmt = select(self.attr.target_mapper).where(*self._where_criteria)
+ if self._from_obj:
+ stmt = stmt.select_from(*self._from_obj)
+ if self._order_by_clauses:
+ stmt = stmt.order_by(*self._order_by_clauses)
+ return stmt
+
+ def insert(self) -> Insert:
+ """For one-to-many collections, produce a :class:`_dml.Insert` which
+ will insert new rows in terms of this this instance-local
+ :class:`_orm.WriteOnlyCollection`.
+
+ This construct is only supported for a :class:`_orm.Relationship`
+ that does **not** include the :paramref:`_orm.relationship.secondary`
+ parameter. For relationships that refer to a many-to-many table,
+ use ordinary bulk insert techniques to produce new objects, then
+ use :meth:`_orm.AbstractCollectionWriter.add_all` to associate them
+ with the collection.
+
+
+ """
+
+ state = inspect(self.instance)
+ mapper = state.mapper
+ prop = mapper._props[self.attr.key]
+
+ if prop.direction is not RelationshipDirection.ONETOMANY:
+ raise exc.InvalidRequestError(
+ "Write only bulk INSERT only supported for one-to-many "
+ "collections; for many-to-many, use a separate bulk "
+ "INSERT along with add_all()."
+ )
+
+ dict_: Dict[str, Any] = {}
+
+ for l, r in prop.synchronize_pairs:
+ fn = prop._get_attr_w_warn_on_none(
+ mapper,
+ state,
+ state.dict,
+ l,
+ )
+
+ dict_[r.key] = bindparam(None, callable_=fn)
+
+ return insert(self.attr.target_mapper).values(**dict_)
+
+ def update(self) -> Update:
+ """Produce a :class:`_dml.Update` which will refer to rows in terms
+ of this instance-local :class:`_orm.WriteOnlyCollection`.
+
+ """
+ return update(self.attr.target_mapper).where(*self._where_criteria)
+
+ def delete(self) -> Delete:
+ """Produce a :class:`_dml.Delete` which will refer to rows in terms
+ of this instance-local :class:`_orm.WriteOnlyCollection`.
+
+ """
+ return delete(self.attr.target_mapper).where(*self._where_criteria)
+
+ def add_all(self, iterator: Iterable[_T]) -> None:
+ """Add an iterable of items to this :class:`_orm.WriteOnlyCollection`.
+
+ The given items will be persisted to the database in terms of
+ the parent instance's collection on the next flush.
+
+ """
+ self._add_all_impl(iterator)
+
+ def add(self, item: _T) -> None:
+ """Add an item to this :class:`_orm.WriteOnlyCollection`.
+
+ The given item will be persisted to the database in terms of
+ the parent instance's collection on the next flush.
+
+ """
+ self._add_all_impl([item])
+
+ def remove(self, item: _T) -> None:
+ """Remove an item from this :class:`_orm.WriteOnlyCollection`.
+
+ The given item will be removed from the parent instance's collection on
+ the next flush.
+
+ """
+ self._remove_impl(item)