diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py')
-rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py | 5238 |
1 files changed, 5238 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py b/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py new file mode 100644 index 0000000..3eba5aa --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py @@ -0,0 +1,5238 @@ +# orm/session.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +"""Provides the Session class and related utilities.""" + +from __future__ import annotations + +import contextlib +from enum import Enum +import itertools +import sys +import typing +from typing import Any +from typing import Callable +from typing import cast +from typing import Dict +from typing import Generic +from typing import Iterable +from typing import Iterator +from typing import List +from typing import NoReturn +from typing import Optional +from typing import overload +from typing import Sequence +from typing import Set +from typing import Tuple +from typing import Type +from typing import TYPE_CHECKING +from typing import TypeVar +from typing import Union +import weakref + +from . import attributes +from . import bulk_persistence +from . import context +from . import descriptor_props +from . import exc +from . import identity +from . import loading +from . import query +from . import state as statelib +from ._typing import _O +from ._typing import insp_is_mapper +from ._typing import is_composite_class +from ._typing import is_orm_option +from ._typing import is_user_defined_option +from .base import _class_to_mapper +from .base import _none_set +from .base import _state_mapper +from .base import instance_str +from .base import LoaderCallableStatus +from .base import object_mapper +from .base import object_state +from .base import PassiveFlag +from .base import state_str +from .context import FromStatement +from .context import ORMCompileState +from .identity import IdentityMap +from .query import Query +from .state import InstanceState +from .state_changes import _StateChange +from .state_changes import _StateChangeState +from .state_changes import _StateChangeStates +from .unitofwork import UOWTransaction +from .. import engine +from .. import exc as sa_exc +from .. import sql +from .. import util +from ..engine import Connection +from ..engine import Engine +from ..engine.util import TransactionalContext +from ..event import dispatcher +from ..event import EventTarget +from ..inspection import inspect +from ..inspection import Inspectable +from ..sql import coercions +from ..sql import dml +from ..sql import roles +from ..sql import Select +from ..sql import TableClause +from ..sql import visitors +from ..sql.base import _NoArg +from ..sql.base import CompileState +from ..sql.schema import Table +from ..sql.selectable import ForUpdateArg +from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL +from ..util import IdentitySet +from ..util.typing import Literal +from ..util.typing import Protocol + +if typing.TYPE_CHECKING: + from ._typing import _EntityType + from ._typing import _IdentityKeyType + from ._typing import _InstanceDict + from ._typing import OrmExecuteOptionsParameter + from .interfaces import ORMOption + from .interfaces import UserDefinedOption + from .mapper import Mapper + from .path_registry import PathRegistry + from .query import RowReturningQuery + from ..engine import CursorResult + from ..engine import Result + from ..engine import Row + from ..engine import RowMapping + from ..engine.base import Transaction + from ..engine.base import TwoPhaseTransaction + from ..engine.interfaces import _CoreAnyExecuteParams + from ..engine.interfaces import _CoreSingleExecuteParams + from ..engine.interfaces import _ExecuteOptions + from ..engine.interfaces import CoreExecuteOptionsParameter + from ..engine.result import ScalarResult + from ..event import _InstanceLevelDispatch + from ..sql._typing import _ColumnsClauseArgument + from ..sql._typing import _InfoType + from ..sql._typing import _T0 + from ..sql._typing import _T1 + from ..sql._typing import _T2 + from ..sql._typing import _T3 + from ..sql._typing import _T4 + from ..sql._typing import _T5 + from ..sql._typing import _T6 + from ..sql._typing import _T7 + from ..sql._typing import _TypedColumnClauseArgument as _TCCA + from ..sql.base import Executable + from ..sql.base import ExecutableOption + from ..sql.dml import UpdateBase + from ..sql.elements import ClauseElement + from ..sql.roles import TypedColumnsClauseRole + from ..sql.selectable import ForUpdateParameter + from ..sql.selectable import TypedReturnsRows + +_T = TypeVar("_T", bound=Any) + +__all__ = [ + "Session", + "SessionTransaction", + "sessionmaker", + "ORMExecuteState", + "close_all_sessions", + "make_transient", + "make_transient_to_detached", + "object_session", +] + +_sessions: weakref.WeakValueDictionary[int, Session] = ( + weakref.WeakValueDictionary() +) +"""Weak-referencing dictionary of :class:`.Session` objects. +""" + +statelib._sessions = _sessions + +_PKIdentityArgument = Union[Any, Tuple[Any, ...]] + +_BindArguments = Dict[str, Any] + +_EntityBindKey = Union[Type[_O], "Mapper[_O]"] +_SessionBindKey = Union[Type[Any], "Mapper[Any]", "TableClause", str] +_SessionBind = Union["Engine", "Connection"] + +JoinTransactionMode = Literal[ + "conditional_savepoint", + "rollback_only", + "control_fully", + "create_savepoint", +] + + +class _ConnectionCallableProto(Protocol): + """a callable that returns a :class:`.Connection` given an instance. + + This callable, when present on a :class:`.Session`, is called only from the + ORM's persistence mechanism (i.e. the unit of work flush process) to allow + for connection-per-instance schemes (i.e. horizontal sharding) to be used + as persistence time. + + This callable is not present on a plain :class:`.Session`, however + is established when using the horizontal sharding extension. + + """ + + def __call__( + self, + mapper: Optional[Mapper[Any]] = None, + instance: Optional[object] = None, + **kw: Any, + ) -> Connection: ... + + +def _state_session(state: InstanceState[Any]) -> Optional[Session]: + """Given an :class:`.InstanceState`, return the :class:`.Session` + associated, if any. + """ + return state.session + + +class _SessionClassMethods: + """Class-level methods for :class:`.Session`, :class:`.sessionmaker`.""" + + @classmethod + @util.deprecated( + "1.3", + "The :meth:`.Session.close_all` method is deprecated and will be " + "removed in a future release. Please refer to " + ":func:`.session.close_all_sessions`.", + ) + def close_all(cls) -> None: + """Close *all* sessions in memory.""" + + close_all_sessions() + + @classmethod + @util.preload_module("sqlalchemy.orm.util") + def identity_key( + cls, + class_: Optional[Type[Any]] = None, + ident: Union[Any, Tuple[Any, ...]] = None, + *, + instance: Optional[Any] = None, + row: Optional[Union[Row[Any], RowMapping]] = None, + identity_token: Optional[Any] = None, + ) -> _IdentityKeyType[Any]: + """Return an identity key. + + This is an alias of :func:`.util.identity_key`. + + """ + return util.preloaded.orm_util.identity_key( + class_, + ident, + instance=instance, + row=row, + identity_token=identity_token, + ) + + @classmethod + def object_session(cls, instance: object) -> Optional[Session]: + """Return the :class:`.Session` to which an object belongs. + + This is an alias of :func:`.object_session`. + + """ + + return object_session(instance) + + +class SessionTransactionState(_StateChangeState): + ACTIVE = 1 + PREPARED = 2 + COMMITTED = 3 + DEACTIVE = 4 + CLOSED = 5 + PROVISIONING_CONNECTION = 6 + + +# backwards compatibility +ACTIVE, PREPARED, COMMITTED, DEACTIVE, CLOSED, PROVISIONING_CONNECTION = tuple( + SessionTransactionState +) + + +class ORMExecuteState(util.MemoizedSlots): + """Represents a call to the :meth:`_orm.Session.execute` method, as passed + to the :meth:`.SessionEvents.do_orm_execute` event hook. + + .. versionadded:: 1.4 + + .. seealso:: + + :ref:`session_execute_events` - top level documentation on how + to use :meth:`_orm.SessionEvents.do_orm_execute` + + """ + + __slots__ = ( + "session", + "statement", + "parameters", + "execution_options", + "local_execution_options", + "bind_arguments", + "identity_token", + "_compile_state_cls", + "_starting_event_idx", + "_events_todo", + "_update_execution_options", + ) + + session: Session + """The :class:`_orm.Session` in use.""" + + statement: Executable + """The SQL statement being invoked. + + For an ORM selection as would + be retrieved from :class:`_orm.Query`, this is an instance of + :class:`_sql.select` that was generated from the ORM query. + """ + + parameters: Optional[_CoreAnyExecuteParams] + """Dictionary of parameters that was passed to + :meth:`_orm.Session.execute`.""" + + execution_options: _ExecuteOptions + """The complete dictionary of current execution options. + + This is a merge of the statement level options with the + locally passed execution options. + + .. seealso:: + + :attr:`_orm.ORMExecuteState.local_execution_options` + + :meth:`_sql.Executable.execution_options` + + :ref:`orm_queryguide_execution_options` + + """ + + local_execution_options: _ExecuteOptions + """Dictionary view of the execution options passed to the + :meth:`.Session.execute` method. + + This does not include options that may be associated with the statement + being invoked. + + .. seealso:: + + :attr:`_orm.ORMExecuteState.execution_options` + + """ + + bind_arguments: _BindArguments + """The dictionary passed as the + :paramref:`_orm.Session.execute.bind_arguments` dictionary. + + This dictionary may be used by extensions to :class:`_orm.Session` to pass + arguments that will assist in determining amongst a set of database + connections which one should be used to invoke this statement. + + """ + + _compile_state_cls: Optional[Type[ORMCompileState]] + _starting_event_idx: int + _events_todo: List[Any] + _update_execution_options: Optional[_ExecuteOptions] + + def __init__( + self, + session: Session, + statement: Executable, + parameters: Optional[_CoreAnyExecuteParams], + execution_options: _ExecuteOptions, + bind_arguments: _BindArguments, + compile_state_cls: Optional[Type[ORMCompileState]], + events_todo: List[_InstanceLevelDispatch[Session]], + ): + """Construct a new :class:`_orm.ORMExecuteState`. + + this object is constructed internally. + + """ + self.session = session + self.statement = statement + self.parameters = parameters + self.local_execution_options = execution_options + self.execution_options = statement._execution_options.union( + execution_options + ) + self.bind_arguments = bind_arguments + self._compile_state_cls = compile_state_cls + self._events_todo = list(events_todo) + + def _remaining_events(self) -> List[_InstanceLevelDispatch[Session]]: + return self._events_todo[self._starting_event_idx + 1 :] + + def invoke_statement( + self, + statement: Optional[Executable] = None, + params: Optional[_CoreAnyExecuteParams] = None, + execution_options: Optional[OrmExecuteOptionsParameter] = None, + bind_arguments: Optional[_BindArguments] = None, + ) -> Result[Any]: + """Execute the statement represented by this + :class:`.ORMExecuteState`, without re-invoking events that have + already proceeded. + + This method essentially performs a re-entrant execution of the current + statement for which the :meth:`.SessionEvents.do_orm_execute` event is + being currently invoked. The use case for this is for event handlers + that want to override how the ultimate + :class:`_engine.Result` object is returned, such as for schemes that + retrieve results from an offline cache or which concatenate results + from multiple executions. + + When the :class:`_engine.Result` object is returned by the actual + handler function within :meth:`_orm.SessionEvents.do_orm_execute` and + is propagated to the calling + :meth:`_orm.Session.execute` method, the remainder of the + :meth:`_orm.Session.execute` method is preempted and the + :class:`_engine.Result` object is returned to the caller of + :meth:`_orm.Session.execute` immediately. + + :param statement: optional statement to be invoked, in place of the + statement currently represented by :attr:`.ORMExecuteState.statement`. + + :param params: optional dictionary of parameters or list of parameters + which will be merged into the existing + :attr:`.ORMExecuteState.parameters` of this :class:`.ORMExecuteState`. + + .. versionchanged:: 2.0 a list of parameter dictionaries is accepted + for executemany executions. + + :param execution_options: optional dictionary of execution options + will be merged into the existing + :attr:`.ORMExecuteState.execution_options` of this + :class:`.ORMExecuteState`. + + :param bind_arguments: optional dictionary of bind_arguments + which will be merged amongst the current + :attr:`.ORMExecuteState.bind_arguments` + of this :class:`.ORMExecuteState`. + + :return: a :class:`_engine.Result` object with ORM-level results. + + .. seealso:: + + :ref:`do_orm_execute_re_executing` - background and examples on the + appropriate usage of :meth:`_orm.ORMExecuteState.invoke_statement`. + + + """ + + if statement is None: + statement = self.statement + + _bind_arguments = dict(self.bind_arguments) + if bind_arguments: + _bind_arguments.update(bind_arguments) + _bind_arguments["_sa_skip_events"] = True + + _params: Optional[_CoreAnyExecuteParams] + if params: + if self.is_executemany: + _params = [] + exec_many_parameters = cast( + "List[Dict[str, Any]]", self.parameters + ) + for _existing_params, _new_params in itertools.zip_longest( + exec_many_parameters, + cast("List[Dict[str, Any]]", params), + ): + if _existing_params is None or _new_params is None: + raise sa_exc.InvalidRequestError( + f"Can't apply executemany parameters to " + f"statement; number of parameter sets passed to " + f"Session.execute() ({len(exec_many_parameters)}) " + f"does not match number of parameter sets given " + f"to ORMExecuteState.invoke_statement() " + f"({len(params)})" + ) + _existing_params = dict(_existing_params) + _existing_params.update(_new_params) + _params.append(_existing_params) + else: + _params = dict(cast("Dict[str, Any]", self.parameters)) + _params.update(cast("Dict[str, Any]", params)) + else: + _params = self.parameters + + _execution_options = self.local_execution_options + if execution_options: + _execution_options = _execution_options.union(execution_options) + + return self.session._execute_internal( + statement, + _params, + execution_options=_execution_options, + bind_arguments=_bind_arguments, + _parent_execute_state=self, + ) + + @property + def bind_mapper(self) -> Optional[Mapper[Any]]: + """Return the :class:`_orm.Mapper` that is the primary "bind" mapper. + + For an :class:`_orm.ORMExecuteState` object invoking an ORM + statement, that is, the :attr:`_orm.ORMExecuteState.is_orm_statement` + attribute is ``True``, this attribute will return the + :class:`_orm.Mapper` that is considered to be the "primary" mapper + of the statement. The term "bind mapper" refers to the fact that + a :class:`_orm.Session` object may be "bound" to multiple + :class:`_engine.Engine` objects keyed to mapped classes, and the + "bind mapper" determines which of those :class:`_engine.Engine` objects + would be selected. + + For a statement that is invoked against a single mapped class, + :attr:`_orm.ORMExecuteState.bind_mapper` is intended to be a reliable + way of getting this mapper. + + .. versionadded:: 1.4.0b2 + + .. seealso:: + + :attr:`_orm.ORMExecuteState.all_mappers` + + + """ + mp: Optional[Mapper[Any]] = self.bind_arguments.get("mapper", None) + return mp + + @property + def all_mappers(self) -> Sequence[Mapper[Any]]: + """Return a sequence of all :class:`_orm.Mapper` objects that are + involved at the top level of this statement. + + By "top level" we mean those :class:`_orm.Mapper` objects that would + be represented in the result set rows for a :func:`_sql.select` + query, or for a :func:`_dml.update` or :func:`_dml.delete` query, + the mapper that is the main subject of the UPDATE or DELETE. + + .. versionadded:: 1.4.0b2 + + .. seealso:: + + :attr:`_orm.ORMExecuteState.bind_mapper` + + + + """ + if not self.is_orm_statement: + return [] + elif isinstance(self.statement, (Select, FromStatement)): + result = [] + seen = set() + for d in self.statement.column_descriptions: + ent = d["entity"] + if ent: + insp = inspect(ent, raiseerr=False) + if insp and insp.mapper and insp.mapper not in seen: + seen.add(insp.mapper) + result.append(insp.mapper) + return result + elif self.statement.is_dml and self.bind_mapper: + return [self.bind_mapper] + else: + return [] + + @property + def is_orm_statement(self) -> bool: + """return True if the operation is an ORM statement. + + This indicates that the select(), insert(), update(), or delete() + being invoked contains ORM entities as subjects. For a statement + that does not have ORM entities and instead refers only to + :class:`.Table` metadata, it is invoked as a Core SQL statement + and no ORM-level automation takes place. + + """ + return self._compile_state_cls is not None + + @property + def is_executemany(self) -> bool: + """return True if the parameters are a multi-element list of + dictionaries with more than one dictionary. + + .. versionadded:: 2.0 + + """ + return isinstance(self.parameters, list) + + @property + def is_select(self) -> bool: + """return True if this is a SELECT operation.""" + return self.statement.is_select + + @property + def is_insert(self) -> bool: + """return True if this is an INSERT operation.""" + return self.statement.is_dml and self.statement.is_insert + + @property + def is_update(self) -> bool: + """return True if this is an UPDATE operation.""" + return self.statement.is_dml and self.statement.is_update + + @property + def is_delete(self) -> bool: + """return True if this is a DELETE operation.""" + return self.statement.is_dml and self.statement.is_delete + + @property + def _is_crud(self) -> bool: + return isinstance(self.statement, (dml.Update, dml.Delete)) + + def update_execution_options(self, **opts: Any) -> None: + """Update the local execution options with new values.""" + self.local_execution_options = self.local_execution_options.union(opts) + + def _orm_compile_options( + self, + ) -> Optional[ + Union[ + context.ORMCompileState.default_compile_options, + Type[context.ORMCompileState.default_compile_options], + ] + ]: + if not self.is_select: + return None + try: + opts = self.statement._compile_options + except AttributeError: + return None + + if opts is not None and opts.isinstance( + context.ORMCompileState.default_compile_options + ): + return opts # type: ignore + else: + return None + + @property + def lazy_loaded_from(self) -> Optional[InstanceState[Any]]: + """An :class:`.InstanceState` that is using this statement execution + for a lazy load operation. + + The primary rationale for this attribute is to support the horizontal + sharding extension, where it is available within specific query + execution time hooks created by this extension. To that end, the + attribute is only intended to be meaningful at **query execution + time**, and importantly not any time prior to that, including query + compilation time. + + """ + return self.load_options._lazy_loaded_from + + @property + def loader_strategy_path(self) -> Optional[PathRegistry]: + """Return the :class:`.PathRegistry` for the current load path. + + This object represents the "path" in a query along relationships + when a particular object or collection is being loaded. + + """ + opts = self._orm_compile_options() + if opts is not None: + return opts._current_path + else: + return None + + @property + def is_column_load(self) -> bool: + """Return True if the operation is refreshing column-oriented + attributes on an existing ORM object. + + This occurs during operations such as :meth:`_orm.Session.refresh`, + as well as when an attribute deferred by :func:`_orm.defer` is + being loaded, or an attribute that was expired either directly + by :meth:`_orm.Session.expire` or via a commit operation is being + loaded. + + Handlers will very likely not want to add any options to queries + when such an operation is occurring as the query should be a straight + primary key fetch which should not have any additional WHERE criteria, + and loader options travelling with the instance + will have already been added to the query. + + .. versionadded:: 1.4.0b2 + + .. seealso:: + + :attr:`_orm.ORMExecuteState.is_relationship_load` + + """ + opts = self._orm_compile_options() + return opts is not None and opts._for_refresh_state + + @property + def is_relationship_load(self) -> bool: + """Return True if this load is loading objects on behalf of a + relationship. + + This means, the loader in effect is either a LazyLoader, + SelectInLoader, SubqueryLoader, or similar, and the entire + SELECT statement being emitted is on behalf of a relationship + load. + + Handlers will very likely not want to add any options to queries + when such an operation is occurring, as loader options are already + capable of being propagated to relationship loaders and should + be already present. + + .. seealso:: + + :attr:`_orm.ORMExecuteState.is_column_load` + + """ + opts = self._orm_compile_options() + if opts is None: + return False + path = self.loader_strategy_path + return path is not None and not path.is_root + + @property + def load_options( + self, + ) -> Union[ + context.QueryContext.default_load_options, + Type[context.QueryContext.default_load_options], + ]: + """Return the load_options that will be used for this execution.""" + + if not self.is_select: + raise sa_exc.InvalidRequestError( + "This ORM execution is not against a SELECT statement " + "so there are no load options." + ) + + lo: Union[ + context.QueryContext.default_load_options, + Type[context.QueryContext.default_load_options], + ] = self.execution_options.get( + "_sa_orm_load_options", context.QueryContext.default_load_options + ) + return lo + + @property + def update_delete_options( + self, + ) -> Union[ + bulk_persistence.BulkUDCompileState.default_update_options, + Type[bulk_persistence.BulkUDCompileState.default_update_options], + ]: + """Return the update_delete_options that will be used for this + execution.""" + + if not self._is_crud: + raise sa_exc.InvalidRequestError( + "This ORM execution is not against an UPDATE or DELETE " + "statement so there are no update options." + ) + uo: Union[ + bulk_persistence.BulkUDCompileState.default_update_options, + Type[bulk_persistence.BulkUDCompileState.default_update_options], + ] = self.execution_options.get( + "_sa_orm_update_options", + bulk_persistence.BulkUDCompileState.default_update_options, + ) + return uo + + @property + def _non_compile_orm_options(self) -> Sequence[ORMOption]: + return [ + opt + for opt in self.statement._with_options + if is_orm_option(opt) and not opt._is_compile_state + ] + + @property + def user_defined_options(self) -> Sequence[UserDefinedOption]: + """The sequence of :class:`.UserDefinedOptions` that have been + associated with the statement being invoked. + + """ + return [ + opt + for opt in self.statement._with_options + if is_user_defined_option(opt) + ] + + +class SessionTransactionOrigin(Enum): + """indicates the origin of a :class:`.SessionTransaction`. + + This enumeration is present on the + :attr:`.SessionTransaction.origin` attribute of any + :class:`.SessionTransaction` object. + + .. versionadded:: 2.0 + + """ + + AUTOBEGIN = 0 + """transaction were started by autobegin""" + + BEGIN = 1 + """transaction were started by calling :meth:`_orm.Session.begin`""" + + BEGIN_NESTED = 2 + """tranaction were started by :meth:`_orm.Session.begin_nested`""" + + SUBTRANSACTION = 3 + """transaction is an internal "subtransaction" """ + + +class SessionTransaction(_StateChange, TransactionalContext): + """A :class:`.Session`-level transaction. + + :class:`.SessionTransaction` is produced from the + :meth:`_orm.Session.begin` + and :meth:`_orm.Session.begin_nested` methods. It's largely an internal + object that in modern use provides a context manager for session + transactions. + + Documentation on interacting with :class:`_orm.SessionTransaction` is + at: :ref:`unitofwork_transaction`. + + + .. versionchanged:: 1.4 The scoping and API methods to work with the + :class:`_orm.SessionTransaction` object directly have been simplified. + + .. seealso:: + + :ref:`unitofwork_transaction` + + :meth:`.Session.begin` + + :meth:`.Session.begin_nested` + + :meth:`.Session.rollback` + + :meth:`.Session.commit` + + :meth:`.Session.in_transaction` + + :meth:`.Session.in_nested_transaction` + + :meth:`.Session.get_transaction` + + :meth:`.Session.get_nested_transaction` + + + """ + + _rollback_exception: Optional[BaseException] = None + + _connections: Dict[ + Union[Engine, Connection], Tuple[Connection, Transaction, bool, bool] + ] + session: Session + _parent: Optional[SessionTransaction] + + _state: SessionTransactionState + + _new: weakref.WeakKeyDictionary[InstanceState[Any], object] + _deleted: weakref.WeakKeyDictionary[InstanceState[Any], object] + _dirty: weakref.WeakKeyDictionary[InstanceState[Any], object] + _key_switches: weakref.WeakKeyDictionary[ + InstanceState[Any], Tuple[Any, Any] + ] + + origin: SessionTransactionOrigin + """Origin of this :class:`_orm.SessionTransaction`. + + Refers to a :class:`.SessionTransactionOrigin` instance which is an + enumeration indicating the source event that led to constructing + this :class:`_orm.SessionTransaction`. + + .. versionadded:: 2.0 + + """ + + nested: bool = False + """Indicates if this is a nested, or SAVEPOINT, transaction. + + When :attr:`.SessionTransaction.nested` is True, it is expected + that :attr:`.SessionTransaction.parent` will be present as well, + linking to the enclosing :class:`.SessionTransaction`. + + .. seealso:: + + :attr:`.SessionTransaction.origin` + + """ + + def __init__( + self, + session: Session, + origin: SessionTransactionOrigin, + parent: Optional[SessionTransaction] = None, + ): + TransactionalContext._trans_ctx_check(session) + + self.session = session + self._connections = {} + self._parent = parent + self.nested = nested = origin is SessionTransactionOrigin.BEGIN_NESTED + self.origin = origin + + if session._close_state is _SessionCloseState.CLOSED: + raise sa_exc.InvalidRequestError( + "This Session has been permanently closed and is unable " + "to handle any more transaction requests." + ) + + if nested: + if not parent: + raise sa_exc.InvalidRequestError( + "Can't start a SAVEPOINT transaction when no existing " + "transaction is in progress" + ) + + self._previous_nested_transaction = session._nested_transaction + elif origin is SessionTransactionOrigin.SUBTRANSACTION: + assert parent is not None + else: + assert parent is None + + self._state = SessionTransactionState.ACTIVE + + self._take_snapshot() + + # make sure transaction is assigned before we call the + # dispatch + self.session._transaction = self + + self.session.dispatch.after_transaction_create(self.session, self) + + def _raise_for_prerequisite_state( + self, operation_name: str, state: _StateChangeState + ) -> NoReturn: + if state is SessionTransactionState.DEACTIVE: + if self._rollback_exception: + raise sa_exc.PendingRollbackError( + "This Session's transaction has been rolled back " + "due to a previous exception during flush." + " To begin a new transaction with this Session, " + "first issue Session.rollback()." + f" Original exception was: {self._rollback_exception}", + code="7s2a", + ) + else: + raise sa_exc.InvalidRequestError( + "This session is in 'inactive' state, due to the " + "SQL transaction being rolled back; no further SQL " + "can be emitted within this transaction." + ) + elif state is SessionTransactionState.CLOSED: + raise sa_exc.ResourceClosedError("This transaction is closed") + elif state is SessionTransactionState.PROVISIONING_CONNECTION: + raise sa_exc.InvalidRequestError( + "This session is provisioning a new connection; concurrent " + "operations are not permitted", + code="isce", + ) + else: + raise sa_exc.InvalidRequestError( + f"This session is in '{state.name.lower()}' state; no " + "further SQL can be emitted within this transaction." + ) + + @property + def parent(self) -> Optional[SessionTransaction]: + """The parent :class:`.SessionTransaction` of this + :class:`.SessionTransaction`. + + If this attribute is ``None``, indicates this + :class:`.SessionTransaction` is at the top of the stack, and + corresponds to a real "COMMIT"/"ROLLBACK" + block. If non-``None``, then this is either a "subtransaction" + (an internal marker object used by the flush process) or a + "nested" / SAVEPOINT transaction. If the + :attr:`.SessionTransaction.nested` attribute is ``True``, then + this is a SAVEPOINT, and if ``False``, indicates this a subtransaction. + + """ + return self._parent + + @property + def is_active(self) -> bool: + return ( + self.session is not None + and self._state is SessionTransactionState.ACTIVE + ) + + @property + def _is_transaction_boundary(self) -> bool: + return self.nested or not self._parent + + @_StateChange.declare_states( + (SessionTransactionState.ACTIVE,), _StateChangeStates.NO_CHANGE + ) + def connection( + self, + bindkey: Optional[Mapper[Any]], + execution_options: Optional[_ExecuteOptions] = None, + **kwargs: Any, + ) -> Connection: + bind = self.session.get_bind(bindkey, **kwargs) + return self._connection_for_bind(bind, execution_options) + + @_StateChange.declare_states( + (SessionTransactionState.ACTIVE,), _StateChangeStates.NO_CHANGE + ) + def _begin(self, nested: bool = False) -> SessionTransaction: + return SessionTransaction( + self.session, + ( + SessionTransactionOrigin.BEGIN_NESTED + if nested + else SessionTransactionOrigin.SUBTRANSACTION + ), + self, + ) + + def _iterate_self_and_parents( + self, upto: Optional[SessionTransaction] = None + ) -> Iterable[SessionTransaction]: + current = self + result: Tuple[SessionTransaction, ...] = () + while current: + result += (current,) + if current._parent is upto: + break + elif current._parent is None: + raise sa_exc.InvalidRequestError( + "Transaction %s is not on the active transaction list" + % (upto) + ) + else: + current = current._parent + + return result + + def _take_snapshot(self) -> None: + if not self._is_transaction_boundary: + parent = self._parent + assert parent is not None + self._new = parent._new + self._deleted = parent._deleted + self._dirty = parent._dirty + self._key_switches = parent._key_switches + return + + is_begin = self.origin in ( + SessionTransactionOrigin.BEGIN, + SessionTransactionOrigin.AUTOBEGIN, + ) + if not is_begin and not self.session._flushing: + self.session.flush() + + self._new = weakref.WeakKeyDictionary() + self._deleted = weakref.WeakKeyDictionary() + self._dirty = weakref.WeakKeyDictionary() + self._key_switches = weakref.WeakKeyDictionary() + + def _restore_snapshot(self, dirty_only: bool = False) -> None: + """Restore the restoration state taken before a transaction began. + + Corresponds to a rollback. + + """ + assert self._is_transaction_boundary + + to_expunge = set(self._new).union(self.session._new) + self.session._expunge_states(to_expunge, to_transient=True) + + for s, (oldkey, newkey) in self._key_switches.items(): + # we probably can do this conditionally based on + # if we expunged or not, but safe_discard does that anyway + self.session.identity_map.safe_discard(s) + + # restore the old key + s.key = oldkey + + # now restore the object, but only if we didn't expunge + if s not in to_expunge: + self.session.identity_map.replace(s) + + for s in set(self._deleted).union(self.session._deleted): + self.session._update_impl(s, revert_deletion=True) + + assert not self.session._deleted + + for s in self.session.identity_map.all_states(): + if not dirty_only or s.modified or s in self._dirty: + s._expire(s.dict, self.session.identity_map._modified) + + def _remove_snapshot(self) -> None: + """Remove the restoration state taken before a transaction began. + + Corresponds to a commit. + + """ + assert self._is_transaction_boundary + + if not self.nested and self.session.expire_on_commit: + for s in self.session.identity_map.all_states(): + s._expire(s.dict, self.session.identity_map._modified) + + statelib.InstanceState._detach_states( + list(self._deleted), self.session + ) + self._deleted.clear() + elif self.nested: + parent = self._parent + assert parent is not None + parent._new.update(self._new) + parent._dirty.update(self._dirty) + parent._deleted.update(self._deleted) + parent._key_switches.update(self._key_switches) + + @_StateChange.declare_states( + (SessionTransactionState.ACTIVE,), _StateChangeStates.NO_CHANGE + ) + def _connection_for_bind( + self, + bind: _SessionBind, + execution_options: Optional[CoreExecuteOptionsParameter], + ) -> Connection: + if bind in self._connections: + if execution_options: + util.warn( + "Connection is already established for the " + "given bind; execution_options ignored" + ) + return self._connections[bind][0] + + self._state = SessionTransactionState.PROVISIONING_CONNECTION + + local_connect = False + should_commit = True + + try: + if self._parent: + conn = self._parent._connection_for_bind( + bind, execution_options + ) + if not self.nested: + return conn + else: + if isinstance(bind, engine.Connection): + conn = bind + if conn.engine in self._connections: + raise sa_exc.InvalidRequestError( + "Session already has a Connection associated " + "for the given Connection's Engine" + ) + else: + conn = bind.connect() + local_connect = True + + try: + if execution_options: + conn = conn.execution_options(**execution_options) + + transaction: Transaction + if self.session.twophase and self._parent is None: + # TODO: shouldn't we only be here if not + # conn.in_transaction() ? + # if twophase is set and conn.in_transaction(), validate + # that it is in fact twophase. + transaction = conn.begin_twophase() + elif self.nested: + transaction = conn.begin_nested() + elif conn.in_transaction(): + join_transaction_mode = self.session.join_transaction_mode + + if join_transaction_mode == "conditional_savepoint": + if conn.in_nested_transaction(): + join_transaction_mode = "create_savepoint" + else: + join_transaction_mode = "rollback_only" + + if join_transaction_mode in ( + "control_fully", + "rollback_only", + ): + if conn.in_nested_transaction(): + transaction = ( + conn._get_required_nested_transaction() + ) + else: + transaction = conn._get_required_transaction() + if join_transaction_mode == "rollback_only": + should_commit = False + elif join_transaction_mode == "create_savepoint": + transaction = conn.begin_nested() + else: + assert False, join_transaction_mode + else: + transaction = conn.begin() + except: + # connection will not not be associated with this Session; + # close it immediately so that it isn't closed under GC + if local_connect: + conn.close() + raise + else: + bind_is_connection = isinstance(bind, engine.Connection) + + self._connections[conn] = self._connections[conn.engine] = ( + conn, + transaction, + should_commit, + not bind_is_connection, + ) + self.session.dispatch.after_begin(self.session, self, conn) + return conn + finally: + self._state = SessionTransactionState.ACTIVE + + def prepare(self) -> None: + if self._parent is not None or not self.session.twophase: + raise sa_exc.InvalidRequestError( + "'twophase' mode not enabled, or not root transaction; " + "can't prepare." + ) + self._prepare_impl() + + @_StateChange.declare_states( + (SessionTransactionState.ACTIVE,), SessionTransactionState.PREPARED + ) + def _prepare_impl(self) -> None: + if self._parent is None or self.nested: + self.session.dispatch.before_commit(self.session) + + stx = self.session._transaction + assert stx is not None + if stx is not self: + for subtransaction in stx._iterate_self_and_parents(upto=self): + subtransaction.commit() + + if not self.session._flushing: + for _flush_guard in range(100): + if self.session._is_clean(): + break + self.session.flush() + else: + raise exc.FlushError( + "Over 100 subsequent flushes have occurred within " + "session.commit() - is an after_flush() hook " + "creating new objects?" + ) + + if self._parent is None and self.session.twophase: + try: + for t in set(self._connections.values()): + cast("TwoPhaseTransaction", t[1]).prepare() + except: + with util.safe_reraise(): + self.rollback() + + self._state = SessionTransactionState.PREPARED + + @_StateChange.declare_states( + (SessionTransactionState.ACTIVE, SessionTransactionState.PREPARED), + SessionTransactionState.CLOSED, + ) + def commit(self, _to_root: bool = False) -> None: + if self._state is not SessionTransactionState.PREPARED: + with self._expect_state(SessionTransactionState.PREPARED): + self._prepare_impl() + + if self._parent is None or self.nested: + for conn, trans, should_commit, autoclose in set( + self._connections.values() + ): + if should_commit: + trans.commit() + + self._state = SessionTransactionState.COMMITTED + self.session.dispatch.after_commit(self.session) + + self._remove_snapshot() + + with self._expect_state(SessionTransactionState.CLOSED): + self.close() + + if _to_root and self._parent: + self._parent.commit(_to_root=True) + + @_StateChange.declare_states( + ( + SessionTransactionState.ACTIVE, + SessionTransactionState.DEACTIVE, + SessionTransactionState.PREPARED, + ), + SessionTransactionState.CLOSED, + ) + def rollback( + self, _capture_exception: bool = False, _to_root: bool = False + ) -> None: + stx = self.session._transaction + assert stx is not None + if stx is not self: + for subtransaction in stx._iterate_self_and_parents(upto=self): + subtransaction.close() + + boundary = self + rollback_err = None + if self._state in ( + SessionTransactionState.ACTIVE, + SessionTransactionState.PREPARED, + ): + for transaction in self._iterate_self_and_parents(): + if transaction._parent is None or transaction.nested: + try: + for t in set(transaction._connections.values()): + t[1].rollback() + + transaction._state = SessionTransactionState.DEACTIVE + self.session.dispatch.after_rollback(self.session) + except: + rollback_err = sys.exc_info() + finally: + transaction._state = SessionTransactionState.DEACTIVE + transaction._restore_snapshot( + dirty_only=transaction.nested + ) + boundary = transaction + break + else: + transaction._state = SessionTransactionState.DEACTIVE + + sess = self.session + + if not rollback_err and not sess._is_clean(): + # if items were added, deleted, or mutated + # here, we need to re-restore the snapshot + util.warn( + "Session's state has been changed on " + "a non-active transaction - this state " + "will be discarded." + ) + boundary._restore_snapshot(dirty_only=boundary.nested) + + with self._expect_state(SessionTransactionState.CLOSED): + self.close() + + if self._parent and _capture_exception: + self._parent._rollback_exception = sys.exc_info()[1] + + if rollback_err and rollback_err[1]: + raise rollback_err[1].with_traceback(rollback_err[2]) + + sess.dispatch.after_soft_rollback(sess, self) + + if _to_root and self._parent: + self._parent.rollback(_to_root=True) + + @_StateChange.declare_states( + _StateChangeStates.ANY, SessionTransactionState.CLOSED + ) + def close(self, invalidate: bool = False) -> None: + if self.nested: + self.session._nested_transaction = ( + self._previous_nested_transaction + ) + + self.session._transaction = self._parent + + for connection, transaction, should_commit, autoclose in set( + self._connections.values() + ): + if invalidate and self._parent is None: + connection.invalidate() + if should_commit and transaction.is_active: + transaction.close() + if autoclose and self._parent is None: + connection.close() + + self._state = SessionTransactionState.CLOSED + sess = self.session + + # TODO: these two None sets were historically after the + # event hook below, and in 2.0 I changed it this way for some reason, + # and I remember there being a reason, but not what it was. + # Why do we need to get rid of them at all? test_memusage::CycleTest + # passes with these commented out. + # self.session = None # type: ignore + # self._connections = None # type: ignore + + sess.dispatch.after_transaction_end(sess, self) + + def _get_subject(self) -> Session: + return self.session + + def _transaction_is_active(self) -> bool: + return self._state is SessionTransactionState.ACTIVE + + def _transaction_is_closed(self) -> bool: + return self._state is SessionTransactionState.CLOSED + + def _rollback_can_be_called(self) -> bool: + return self._state not in (COMMITTED, CLOSED) + + +class _SessionCloseState(Enum): + ACTIVE = 1 + CLOSED = 2 + CLOSE_IS_RESET = 3 + + +class Session(_SessionClassMethods, EventTarget): + """Manages persistence operations for ORM-mapped objects. + + The :class:`_orm.Session` is **not safe for use in concurrent threads.**. + See :ref:`session_faq_threadsafe` for background. + + The Session's usage paradigm is described at :doc:`/orm/session`. + + + """ + + _is_asyncio = False + + dispatch: dispatcher[Session] + + identity_map: IdentityMap + """A mapping of object identities to objects themselves. + + Iterating through ``Session.identity_map.values()`` provides + access to the full set of persistent objects (i.e., those + that have row identity) currently in the session. + + .. seealso:: + + :func:`.identity_key` - helper function to produce the keys used + in this dictionary. + + """ + + _new: Dict[InstanceState[Any], Any] + _deleted: Dict[InstanceState[Any], Any] + bind: Optional[Union[Engine, Connection]] + __binds: Dict[_SessionBindKey, _SessionBind] + _flushing: bool + _warn_on_events: bool + _transaction: Optional[SessionTransaction] + _nested_transaction: Optional[SessionTransaction] + hash_key: int + autoflush: bool + expire_on_commit: bool + enable_baked_queries: bool + twophase: bool + join_transaction_mode: JoinTransactionMode + _query_cls: Type[Query[Any]] + _close_state: _SessionCloseState + + def __init__( + self, + bind: Optional[_SessionBind] = None, + *, + autoflush: bool = True, + future: Literal[True] = True, + expire_on_commit: bool = True, + autobegin: bool = True, + twophase: bool = False, + binds: Optional[Dict[_SessionBindKey, _SessionBind]] = None, + enable_baked_queries: bool = True, + info: Optional[_InfoType] = None, + query_cls: Optional[Type[Query[Any]]] = None, + autocommit: Literal[False] = False, + join_transaction_mode: JoinTransactionMode = "conditional_savepoint", + close_resets_only: Union[bool, _NoArg] = _NoArg.NO_ARG, + ): + r"""Construct a new :class:`_orm.Session`. + + See also the :class:`.sessionmaker` function which is used to + generate a :class:`.Session`-producing callable with a given + set of arguments. + + :param autoflush: When ``True``, all query operations will issue a + :meth:`~.Session.flush` call to this ``Session`` before proceeding. + This is a convenience feature so that :meth:`~.Session.flush` need + not be called repeatedly in order for database queries to retrieve + results. + + .. seealso:: + + :ref:`session_flushing` - additional background on autoflush + + :param autobegin: Automatically start transactions (i.e. equivalent to + invoking :meth:`_orm.Session.begin`) when database access is + requested by an operation. Defaults to ``True``. Set to + ``False`` to prevent a :class:`_orm.Session` from implicitly + beginning transactions after construction, as well as after any of + the :meth:`_orm.Session.rollback`, :meth:`_orm.Session.commit`, + or :meth:`_orm.Session.close` methods are called. + + .. versionadded:: 2.0 + + .. seealso:: + + :ref:`session_autobegin_disable` + + :param bind: An optional :class:`_engine.Engine` or + :class:`_engine.Connection` to + which this ``Session`` should be bound. When specified, all SQL + operations performed by this session will execute via this + connectable. + + :param binds: A dictionary which may specify any number of + :class:`_engine.Engine` or :class:`_engine.Connection` + objects as the source of + connectivity for SQL operations on a per-entity basis. The keys + of the dictionary consist of any series of mapped classes, + arbitrary Python classes that are bases for mapped classes, + :class:`_schema.Table` objects and :class:`_orm.Mapper` objects. + The + values of the dictionary are then instances of + :class:`_engine.Engine` + or less commonly :class:`_engine.Connection` objects. + Operations which + proceed relative to a particular mapped class will consult this + dictionary for the closest matching entity in order to determine + which :class:`_engine.Engine` should be used for a particular SQL + operation. The complete heuristics for resolution are + described at :meth:`.Session.get_bind`. Usage looks like:: + + Session = sessionmaker(binds={ + SomeMappedClass: create_engine('postgresql+psycopg2://engine1'), + SomeDeclarativeBase: create_engine('postgresql+psycopg2://engine2'), + some_mapper: create_engine('postgresql+psycopg2://engine3'), + some_table: create_engine('postgresql+psycopg2://engine4'), + }) + + .. seealso:: + + :ref:`session_partitioning` + + :meth:`.Session.bind_mapper` + + :meth:`.Session.bind_table` + + :meth:`.Session.get_bind` + + + :param \class_: Specify an alternate class other than + ``sqlalchemy.orm.session.Session`` which should be used by the + returned class. This is the only argument that is local to the + :class:`.sessionmaker` function, and is not sent directly to the + constructor for ``Session``. + + :param enable_baked_queries: legacy; defaults to ``True``. + A parameter consumed + by the :mod:`sqlalchemy.ext.baked` extension to determine if + "baked queries" should be cached, as is the normal operation + of this extension. When set to ``False``, caching as used by + this particular extension is disabled. + + .. versionchanged:: 1.4 The ``sqlalchemy.ext.baked`` extension is + legacy and is not used by any of SQLAlchemy's internals. This + flag therefore only affects applications that are making explicit + use of this extension within their own code. + + :param expire_on_commit: Defaults to ``True``. When ``True``, all + instances will be fully expired after each :meth:`~.commit`, + so that all attribute/object access subsequent to a completed + transaction will load from the most recent database state. + + .. seealso:: + + :ref:`session_committing` + + :param future: Deprecated; this flag is always True. + + .. seealso:: + + :ref:`migration_20_toplevel` + + :param info: optional dictionary of arbitrary data to be associated + with this :class:`.Session`. Is available via the + :attr:`.Session.info` attribute. Note the dictionary is copied at + construction time so that modifications to the per- + :class:`.Session` dictionary will be local to that + :class:`.Session`. + + :param query_cls: Class which should be used to create new Query + objects, as returned by the :meth:`~.Session.query` method. + Defaults to :class:`_query.Query`. + + :param twophase: When ``True``, all transactions will be started as + a "two phase" transaction, i.e. using the "two phase" semantics + of the database in use along with an XID. During a + :meth:`~.commit`, after :meth:`~.flush` has been issued for all + attached databases, the :meth:`~.TwoPhaseTransaction.prepare` + method on each database's :class:`.TwoPhaseTransaction` will be + called. This allows each database to roll back the entire + transaction, before each transaction is committed. + + :param autocommit: the "autocommit" keyword is present for backwards + compatibility but must remain at its default value of ``False``. + + :param join_transaction_mode: Describes the transactional behavior to + take when a given bind is a :class:`_engine.Connection` that + has already begun a transaction outside the scope of this + :class:`_orm.Session`; in other words the + :meth:`_engine.Connection.in_transaction()` method returns True. + + The following behaviors only take effect when the :class:`_orm.Session` + **actually makes use of the connection given**; that is, a method + such as :meth:`_orm.Session.execute`, :meth:`_orm.Session.connection`, + etc. are actually invoked: + + * ``"conditional_savepoint"`` - this is the default. if the given + :class:`_engine.Connection` is begun within a transaction but + does not have a SAVEPOINT, then ``"rollback_only"`` is used. + If the :class:`_engine.Connection` is additionally within + a SAVEPOINT, in other words + :meth:`_engine.Connection.in_nested_transaction()` method returns + True, then ``"create_savepoint"`` is used. + + ``"conditional_savepoint"`` behavior attempts to make use of + savepoints in order to keep the state of the existing transaction + unchanged, but only if there is already a savepoint in progress; + otherwise, it is not assumed that the backend in use has adequate + support for SAVEPOINT, as availability of this feature varies. + ``"conditional_savepoint"`` also seeks to establish approximate + backwards compatibility with previous :class:`_orm.Session` + behavior, for applications that are not setting a specific mode. It + is recommended that one of the explicit settings be used. + + * ``"create_savepoint"`` - the :class:`_orm.Session` will use + :meth:`_engine.Connection.begin_nested()` in all cases to create + its own transaction. This transaction by its nature rides + "on top" of any existing transaction that's opened on the given + :class:`_engine.Connection`; if the underlying database and + the driver in use has full, non-broken support for SAVEPOINT, the + external transaction will remain unaffected throughout the + lifespan of the :class:`_orm.Session`. + + The ``"create_savepoint"`` mode is the most useful for integrating + a :class:`_orm.Session` into a test suite where an externally + initiated transaction should remain unaffected; however, it relies + on proper SAVEPOINT support from the underlying driver and + database. + + .. tip:: When using SQLite, the SQLite driver included through + Python 3.11 does not handle SAVEPOINTs correctly in all cases + without workarounds. See the sections + :ref:`pysqlite_serializable` and :ref:`aiosqlite_serializable` + for details on current workarounds. + + * ``"control_fully"`` - the :class:`_orm.Session` will take + control of the given transaction as its own; + :meth:`_orm.Session.commit` will call ``.commit()`` on the + transaction, :meth:`_orm.Session.rollback` will call + ``.rollback()`` on the transaction, :meth:`_orm.Session.close` will + call ``.rollback`` on the transaction. + + .. tip:: This mode of use is equivalent to how SQLAlchemy 1.4 would + handle a :class:`_engine.Connection` given with an existing + SAVEPOINT (i.e. :meth:`_engine.Connection.begin_nested`); the + :class:`_orm.Session` would take full control of the existing + SAVEPOINT. + + * ``"rollback_only"`` - the :class:`_orm.Session` will take control + of the given transaction for ``.rollback()`` calls only; + ``.commit()`` calls will not be propagated to the given + transaction. ``.close()`` calls will have no effect on the + given transaction. + + .. tip:: This mode of use is equivalent to how SQLAlchemy 1.4 would + handle a :class:`_engine.Connection` given with an existing + regular database transaction (i.e. + :meth:`_engine.Connection.begin`); the :class:`_orm.Session` + would propagate :meth:`_orm.Session.rollback` calls to the + underlying transaction, but not :meth:`_orm.Session.commit` or + :meth:`_orm.Session.close` calls. + + .. versionadded:: 2.0.0rc1 + + :param close_resets_only: Defaults to ``True``. Determines if + the session should reset itself after calling ``.close()`` + or should pass in a no longer usable state, disabling re-use. + + .. versionadded:: 2.0.22 added flag ``close_resets_only``. + A future SQLAlchemy version may change the default value of + this flag to ``False``. + + .. seealso:: + + :ref:`session_closing` - Detail on the semantics of + :meth:`_orm.Session.close` and :meth:`_orm.Session.reset`. + + """ # noqa + + # considering allowing the "autocommit" keyword to still be accepted + # as long as it's False, so that external test suites, oslo.db etc + # continue to function as the argument appears to be passed in lots + # of cases including in our own test suite + if autocommit: + raise sa_exc.ArgumentError( + "autocommit=True is no longer supported" + ) + self.identity_map = identity.WeakInstanceDict() + + if not future: + raise sa_exc.ArgumentError( + "The 'future' parameter passed to " + "Session() may only be set to True." + ) + + self._new = {} # InstanceState->object, strong refs object + self._deleted = {} # same + self.bind = bind + self.__binds = {} + self._flushing = False + self._warn_on_events = False + self._transaction = None + self._nested_transaction = None + self.hash_key = _new_sessionid() + self.autobegin = autobegin + self.autoflush = autoflush + self.expire_on_commit = expire_on_commit + self.enable_baked_queries = enable_baked_queries + + # the idea is that at some point NO_ARG will warn that in the future + # the default will switch to close_resets_only=False. + if close_resets_only or close_resets_only is _NoArg.NO_ARG: + self._close_state = _SessionCloseState.CLOSE_IS_RESET + else: + self._close_state = _SessionCloseState.ACTIVE + if ( + join_transaction_mode + and join_transaction_mode + not in JoinTransactionMode.__args__ # type: ignore + ): + raise sa_exc.ArgumentError( + f"invalid selection for join_transaction_mode: " + f'"{join_transaction_mode}"' + ) + self.join_transaction_mode = join_transaction_mode + + self.twophase = twophase + self._query_cls = query_cls if query_cls else query.Query + if info: + self.info.update(info) + + if binds is not None: + for key, bind in binds.items(): + self._add_bind(key, bind) + + _sessions[self.hash_key] = self + + # used by sqlalchemy.engine.util.TransactionalContext + _trans_context_manager: Optional[TransactionalContext] = None + + connection_callable: Optional[_ConnectionCallableProto] = None + + def __enter__(self: _S) -> _S: + return self + + def __exit__(self, type_: Any, value: Any, traceback: Any) -> None: + self.close() + + @contextlib.contextmanager + def _maker_context_manager(self: _S) -> Iterator[_S]: + with self: + with self.begin(): + yield self + + def in_transaction(self) -> bool: + """Return True if this :class:`_orm.Session` has begun a transaction. + + .. versionadded:: 1.4 + + .. seealso:: + + :attr:`_orm.Session.is_active` + + + """ + return self._transaction is not None + + def in_nested_transaction(self) -> bool: + """Return True if this :class:`_orm.Session` has begun a nested + transaction, e.g. SAVEPOINT. + + .. versionadded:: 1.4 + + """ + return self._nested_transaction is not None + + def get_transaction(self) -> Optional[SessionTransaction]: + """Return the current root transaction in progress, if any. + + .. versionadded:: 1.4 + + """ + trans = self._transaction + while trans is not None and trans._parent is not None: + trans = trans._parent + return trans + + def get_nested_transaction(self) -> Optional[SessionTransaction]: + """Return the current nested transaction in progress, if any. + + .. versionadded:: 1.4 + + """ + + return self._nested_transaction + + @util.memoized_property + def info(self) -> _InfoType: + """A user-modifiable dictionary. + + The initial value of this dictionary can be populated using the + ``info`` argument to the :class:`.Session` constructor or + :class:`.sessionmaker` constructor or factory methods. The dictionary + here is always local to this :class:`.Session` and can be modified + independently of all other :class:`.Session` objects. + + """ + return {} + + def _autobegin_t(self, begin: bool = False) -> SessionTransaction: + if self._transaction is None: + if not begin and not self.autobegin: + raise sa_exc.InvalidRequestError( + "Autobegin is disabled on this Session; please call " + "session.begin() to start a new transaction" + ) + trans = SessionTransaction( + self, + ( + SessionTransactionOrigin.BEGIN + if begin + else SessionTransactionOrigin.AUTOBEGIN + ), + ) + assert self._transaction is trans + return trans + + return self._transaction + + def begin(self, nested: bool = False) -> SessionTransaction: + """Begin a transaction, or nested transaction, + on this :class:`.Session`, if one is not already begun. + + The :class:`_orm.Session` object features **autobegin** behavior, + so that normally it is not necessary to call the + :meth:`_orm.Session.begin` + method explicitly. However, it may be used in order to control + the scope of when the transactional state is begun. + + When used to begin the outermost transaction, an error is raised + if this :class:`.Session` is already inside of a transaction. + + :param nested: if True, begins a SAVEPOINT transaction and is + equivalent to calling :meth:`~.Session.begin_nested`. For + documentation on SAVEPOINT transactions, please see + :ref:`session_begin_nested`. + + :return: the :class:`.SessionTransaction` object. Note that + :class:`.SessionTransaction` + acts as a Python context manager, allowing :meth:`.Session.begin` + to be used in a "with" block. See :ref:`session_explicit_begin` for + an example. + + .. seealso:: + + :ref:`session_autobegin` + + :ref:`unitofwork_transaction` + + :meth:`.Session.begin_nested` + + + """ + + trans = self._transaction + if trans is None: + trans = self._autobegin_t(begin=True) + + if not nested: + return trans + + assert trans is not None + + if nested: + trans = trans._begin(nested=nested) + assert self._transaction is trans + self._nested_transaction = trans + else: + raise sa_exc.InvalidRequestError( + "A transaction is already begun on this Session." + ) + + return trans # needed for __enter__/__exit__ hook + + def begin_nested(self) -> SessionTransaction: + """Begin a "nested" transaction on this Session, e.g. SAVEPOINT. + + The target database(s) and associated drivers must support SQL + SAVEPOINT for this method to function correctly. + + For documentation on SAVEPOINT + transactions, please see :ref:`session_begin_nested`. + + :return: the :class:`.SessionTransaction` object. Note that + :class:`.SessionTransaction` acts as a context manager, allowing + :meth:`.Session.begin_nested` to be used in a "with" block. + See :ref:`session_begin_nested` for a usage example. + + .. seealso:: + + :ref:`session_begin_nested` + + :ref:`pysqlite_serializable` - special workarounds required + with the SQLite driver in order for SAVEPOINT to work + correctly. For asyncio use cases, see the section + :ref:`aiosqlite_serializable`. + + """ + return self.begin(nested=True) + + def rollback(self) -> None: + """Rollback the current transaction in progress. + + If no transaction is in progress, this method is a pass-through. + + The method always rolls back + the topmost database transaction, discarding any nested + transactions that may be in progress. + + .. seealso:: + + :ref:`session_rollback` + + :ref:`unitofwork_transaction` + + """ + if self._transaction is None: + pass + else: + self._transaction.rollback(_to_root=True) + + def commit(self) -> None: + """Flush pending changes and commit the current transaction. + + When the COMMIT operation is complete, all objects are fully + :term:`expired`, erasing their internal contents, which will be + automatically re-loaded when the objects are next accessed. In the + interim, these objects are in an expired state and will not function if + they are :term:`detached` from the :class:`.Session`. Additionally, + this re-load operation is not supported when using asyncio-oriented + APIs. The :paramref:`.Session.expire_on_commit` parameter may be used + to disable this behavior. + + When there is no transaction in place for the :class:`.Session`, + indicating that no operations were invoked on this :class:`.Session` + since the previous call to :meth:`.Session.commit`, the method will + begin and commit an internal-only "logical" transaction, that does not + normally affect the database unless pending flush changes were + detected, but will still invoke event handlers and object expiration + rules. + + The outermost database transaction is committed unconditionally, + automatically releasing any SAVEPOINTs in effect. + + .. seealso:: + + :ref:`session_committing` + + :ref:`unitofwork_transaction` + + :ref:`asyncio_orm_avoid_lazyloads` + + """ + trans = self._transaction + if trans is None: + trans = self._autobegin_t() + + trans.commit(_to_root=True) + + def prepare(self) -> None: + """Prepare the current transaction in progress for two phase commit. + + If no transaction is in progress, this method raises an + :exc:`~sqlalchemy.exc.InvalidRequestError`. + + Only root transactions of two phase sessions can be prepared. If the + current transaction is not such, an + :exc:`~sqlalchemy.exc.InvalidRequestError` is raised. + + """ + trans = self._transaction + if trans is None: + trans = self._autobegin_t() + + trans.prepare() + + def connection( + self, + bind_arguments: Optional[_BindArguments] = None, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + ) -> Connection: + r"""Return a :class:`_engine.Connection` object corresponding to this + :class:`.Session` object's transactional state. + + Either the :class:`_engine.Connection` corresponding to the current + transaction is returned, or if no transaction is in progress, a new + one is begun and the :class:`_engine.Connection` + returned (note that no + transactional state is established with the DBAPI until the first + SQL statement is emitted). + + Ambiguity in multi-bind or unbound :class:`.Session` objects can be + resolved through any of the optional keyword arguments. This + ultimately makes usage of the :meth:`.get_bind` method for resolution. + + :param bind_arguments: dictionary of bind arguments. May include + "mapper", "bind", "clause", other custom arguments that are passed + to :meth:`.Session.get_bind`. + + :param execution_options: a dictionary of execution options that will + be passed to :meth:`_engine.Connection.execution_options`, **when the + connection is first procured only**. If the connection is already + present within the :class:`.Session`, a warning is emitted and + the arguments are ignored. + + .. seealso:: + + :ref:`session_transaction_isolation` + + """ + + if bind_arguments: + bind = bind_arguments.pop("bind", None) + + if bind is None: + bind = self.get_bind(**bind_arguments) + else: + bind = self.get_bind() + + return self._connection_for_bind( + bind, + execution_options=execution_options, + ) + + def _connection_for_bind( + self, + engine: _SessionBind, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + **kw: Any, + ) -> Connection: + TransactionalContext._trans_ctx_check(self) + + trans = self._transaction + if trans is None: + trans = self._autobegin_t() + return trans._connection_for_bind(engine, execution_options) + + @overload + def _execute_internal( + self, + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + _scalar_result: Literal[True] = ..., + ) -> Any: ... + + @overload + def _execute_internal( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + _scalar_result: bool = ..., + ) -> Result[Any]: ... + + def _execute_internal( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + _scalar_result: bool = False, + ) -> Any: + statement = coercions.expect(roles.StatementRole, statement) + + if not bind_arguments: + bind_arguments = {} + else: + bind_arguments = dict(bind_arguments) + + if ( + statement._propagate_attrs.get("compile_state_plugin", None) + == "orm" + ): + compile_state_cls = CompileState._get_plugin_class_for_plugin( + statement, "orm" + ) + if TYPE_CHECKING: + assert isinstance( + compile_state_cls, context.AbstractORMCompileState + ) + else: + compile_state_cls = None + bind_arguments.setdefault("clause", statement) + + execution_options = util.coerce_to_immutabledict(execution_options) + + if _parent_execute_state: + events_todo = _parent_execute_state._remaining_events() + else: + events_todo = self.dispatch.do_orm_execute + if _add_event: + events_todo = list(events_todo) + [_add_event] + + if events_todo: + if compile_state_cls is not None: + # for event handlers, do the orm_pre_session_exec + # pass ahead of the event handlers, so that things like + # .load_options, .update_delete_options etc. are populated. + # is_pre_event=True allows the hook to hold off on things + # it doesn't want to do twice, including autoflush as well + # as "pre fetch" for DML, etc. + ( + statement, + execution_options, + ) = compile_state_cls.orm_pre_session_exec( + self, + statement, + params, + execution_options, + bind_arguments, + True, + ) + + orm_exec_state = ORMExecuteState( + self, + statement, + params, + execution_options, + bind_arguments, + compile_state_cls, + events_todo, + ) + for idx, fn in enumerate(events_todo): + orm_exec_state._starting_event_idx = idx + fn_result: Optional[Result[Any]] = fn(orm_exec_state) + if fn_result: + if _scalar_result: + return fn_result.scalar() + else: + return fn_result + + statement = orm_exec_state.statement + execution_options = orm_exec_state.local_execution_options + + if compile_state_cls is not None: + # now run orm_pre_session_exec() "for real". if there were + # event hooks, this will re-run the steps that interpret + # new execution_options into load_options / update_delete_options, + # which we assume the event hook might have updated. + # autoflush will also be invoked in this step if enabled. + ( + statement, + execution_options, + ) = compile_state_cls.orm_pre_session_exec( + self, + statement, + params, + execution_options, + bind_arguments, + False, + ) + + bind = self.get_bind(**bind_arguments) + + conn = self._connection_for_bind(bind) + + if _scalar_result and not compile_state_cls: + if TYPE_CHECKING: + params = cast(_CoreSingleExecuteParams, params) + return conn.scalar( + statement, params or {}, execution_options=execution_options + ) + + if compile_state_cls: + result: Result[Any] = compile_state_cls.orm_execute_statement( + self, + statement, + params or {}, + execution_options, + bind_arguments, + conn, + ) + else: + result = conn.execute( + statement, params or {}, execution_options=execution_options + ) + + if _scalar_result: + return result.scalar() + else: + return result + + @overload + def execute( + self, + statement: TypedReturnsRows[_T], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> Result[_T]: ... + + @overload + def execute( + self, + statement: UpdateBase, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> CursorResult[Any]: ... + + @overload + def execute( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> Result[Any]: ... + + def execute( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> Result[Any]: + r"""Execute a SQL expression construct. + + Returns a :class:`_engine.Result` object representing + results of the statement execution. + + E.g.:: + + from sqlalchemy import select + result = session.execute( + select(User).where(User.id == 5) + ) + + The API contract of :meth:`_orm.Session.execute` is similar to that + of :meth:`_engine.Connection.execute`, the :term:`2.0 style` version + of :class:`_engine.Connection`. + + .. versionchanged:: 1.4 the :meth:`_orm.Session.execute` method is + now the primary point of ORM statement execution when using + :term:`2.0 style` ORM usage. + + :param statement: + An executable statement (i.e. an :class:`.Executable` expression + such as :func:`_expression.select`). + + :param params: + Optional dictionary, or list of dictionaries, containing + bound parameter values. If a single dictionary, single-row + execution occurs; if a list of dictionaries, an + "executemany" will be invoked. The keys in each dictionary + must correspond to parameter names present in the statement. + + :param execution_options: optional dictionary of execution options, + which will be associated with the statement execution. This + dictionary can provide a subset of the options that are accepted + by :meth:`_engine.Connection.execution_options`, and may also + provide additional options understood only in an ORM context. + + .. seealso:: + + :ref:`orm_queryguide_execution_options` - ORM-specific execution + options + + :param bind_arguments: dictionary of additional arguments to determine + the bind. May include "mapper", "bind", or other custom arguments. + Contents of this dictionary are passed to the + :meth:`.Session.get_bind` method. + + :return: a :class:`_engine.Result` object. + + + """ + return self._execute_internal( + statement, + params, + execution_options=execution_options, + bind_arguments=bind_arguments, + _parent_execute_state=_parent_execute_state, + _add_event=_add_event, + ) + + @overload + def scalar( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Optional[_T]: ... + + @overload + def scalar( + self, + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Any: ... + + def scalar( + self, + statement: Executable, + params: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Any: + """Execute a statement and return a scalar result. + + Usage and parameters are the same as that of + :meth:`_orm.Session.execute`; the return result is a scalar Python + value. + + """ + + return self._execute_internal( + statement, + params, + execution_options=execution_options, + bind_arguments=bind_arguments, + _scalar_result=True, + **kw, + ) + + @overload + def scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[_T]: ... + + @overload + def scalars( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[Any]: ... + + def scalars( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[Any]: + """Execute a statement and return the results as scalars. + + Usage and parameters are the same as that of + :meth:`_orm.Session.execute`; the return result is a + :class:`_result.ScalarResult` filtering object which + will return single elements rather than :class:`_row.Row` objects. + + :return: a :class:`_result.ScalarResult` object + + .. versionadded:: 1.4.24 Added :meth:`_orm.Session.scalars` + + .. versionadded:: 1.4.26 Added :meth:`_orm.scoped_session.scalars` + + .. seealso:: + + :ref:`orm_queryguide_select_orm_entities` - contrasts the behavior + of :meth:`_orm.Session.execute` to :meth:`_orm.Session.scalars` + + """ + + return self._execute_internal( + statement, + params=params, + execution_options=execution_options, + bind_arguments=bind_arguments, + _scalar_result=False, # mypy appreciates this + **kw, + ).scalars() + + def close(self) -> None: + """Close out the transactional resources and ORM objects used by this + :class:`_orm.Session`. + + This expunges all ORM objects associated with this + :class:`_orm.Session`, ends any transaction in progress and + :term:`releases` any :class:`_engine.Connection` objects which this + :class:`_orm.Session` itself has checked out from associated + :class:`_engine.Engine` objects. The operation then leaves the + :class:`_orm.Session` in a state which it may be used again. + + .. tip:: + + In the default running mode the :meth:`_orm.Session.close` + method **does not prevent the Session from being used again**. + The :class:`_orm.Session` itself does not actually have a + distinct "closed" state; it merely means + the :class:`_orm.Session` will release all database connections + and ORM objects. + + Setting the parameter :paramref:`_orm.Session.close_resets_only` + to ``False`` will instead make the ``close`` final, meaning that + any further action on the session will be forbidden. + + .. versionchanged:: 1.4 The :meth:`.Session.close` method does not + immediately create a new :class:`.SessionTransaction` object; + instead, the new :class:`.SessionTransaction` is created only if + the :class:`.Session` is used again for a database operation. + + .. seealso:: + + :ref:`session_closing` - detail on the semantics of + :meth:`_orm.Session.close` and :meth:`_orm.Session.reset`. + + :meth:`_orm.Session.reset` - a similar method that behaves like + ``close()`` with the parameter + :paramref:`_orm.Session.close_resets_only` set to ``True``. + + """ + self._close_impl(invalidate=False) + + def reset(self) -> None: + """Close out the transactional resources and ORM objects used by this + :class:`_orm.Session`, resetting the session to its initial state. + + This method provides for same "reset-only" behavior that the + :meth:`_orm.Session.close` method has provided historically, where the + state of the :class:`_orm.Session` is reset as though the object were + brand new, and ready to be used again. + This method may then be useful for :class:`_orm.Session` objects + which set :paramref:`_orm.Session.close_resets_only` to ``False``, + so that "reset only" behavior is still available. + + .. versionadded:: 2.0.22 + + .. seealso:: + + :ref:`session_closing` - detail on the semantics of + :meth:`_orm.Session.close` and :meth:`_orm.Session.reset`. + + :meth:`_orm.Session.close` - a similar method will additionally + prevent re-use of the Session when the parameter + :paramref:`_orm.Session.close_resets_only` is set to ``False``. + """ + self._close_impl(invalidate=False, is_reset=True) + + def invalidate(self) -> None: + """Close this Session, using connection invalidation. + + This is a variant of :meth:`.Session.close` that will additionally + ensure that the :meth:`_engine.Connection.invalidate` + method will be called on each :class:`_engine.Connection` object + that is currently in use for a transaction (typically there is only + one connection unless the :class:`_orm.Session` is used with + multiple engines). + + This can be called when the database is known to be in a state where + the connections are no longer safe to be used. + + Below illustrates a scenario when using `gevent + <https://www.gevent.org/>`_, which can produce ``Timeout`` exceptions + that may mean the underlying connection should be discarded:: + + import gevent + + try: + sess = Session() + sess.add(User()) + sess.commit() + except gevent.Timeout: + sess.invalidate() + raise + except: + sess.rollback() + raise + + The method additionally does everything that :meth:`_orm.Session.close` + does, including that all ORM objects are expunged. + + """ + self._close_impl(invalidate=True) + + def _close_impl(self, invalidate: bool, is_reset: bool = False) -> None: + if not is_reset and self._close_state is _SessionCloseState.ACTIVE: + self._close_state = _SessionCloseState.CLOSED + self.expunge_all() + if self._transaction is not None: + for transaction in self._transaction._iterate_self_and_parents(): + transaction.close(invalidate) + + def expunge_all(self) -> None: + """Remove all object instances from this ``Session``. + + This is equivalent to calling ``expunge(obj)`` on all objects in this + ``Session``. + + """ + + all_states = self.identity_map.all_states() + list(self._new) + self.identity_map._kill() + self.identity_map = identity.WeakInstanceDict() + self._new = {} + self._deleted = {} + + statelib.InstanceState._detach_states(all_states, self) + + def _add_bind(self, key: _SessionBindKey, bind: _SessionBind) -> None: + try: + insp = inspect(key) + except sa_exc.NoInspectionAvailable as err: + if not isinstance(key, type): + raise sa_exc.ArgumentError( + "Not an acceptable bind target: %s" % key + ) from err + else: + self.__binds[key] = bind + else: + if TYPE_CHECKING: + assert isinstance(insp, Inspectable) + + if isinstance(insp, TableClause): + self.__binds[insp] = bind + elif insp_is_mapper(insp): + self.__binds[insp.class_] = bind + for _selectable in insp._all_tables: + self.__binds[_selectable] = bind + else: + raise sa_exc.ArgumentError( + "Not an acceptable bind target: %s" % key + ) + + def bind_mapper( + self, mapper: _EntityBindKey[_O], bind: _SessionBind + ) -> None: + """Associate a :class:`_orm.Mapper` or arbitrary Python class with a + "bind", e.g. an :class:`_engine.Engine` or + :class:`_engine.Connection`. + + The given entity is added to a lookup used by the + :meth:`.Session.get_bind` method. + + :param mapper: a :class:`_orm.Mapper` object, + or an instance of a mapped + class, or any Python class that is the base of a set of mapped + classes. + + :param bind: an :class:`_engine.Engine` or :class:`_engine.Connection` + object. + + .. seealso:: + + :ref:`session_partitioning` + + :paramref:`.Session.binds` + + :meth:`.Session.bind_table` + + + """ + self._add_bind(mapper, bind) + + def bind_table(self, table: TableClause, bind: _SessionBind) -> None: + """Associate a :class:`_schema.Table` with a "bind", e.g. an + :class:`_engine.Engine` + or :class:`_engine.Connection`. + + The given :class:`_schema.Table` is added to a lookup used by the + :meth:`.Session.get_bind` method. + + :param table: a :class:`_schema.Table` object, + which is typically the target + of an ORM mapping, or is present within a selectable that is + mapped. + + :param bind: an :class:`_engine.Engine` or :class:`_engine.Connection` + object. + + .. seealso:: + + :ref:`session_partitioning` + + :paramref:`.Session.binds` + + :meth:`.Session.bind_mapper` + + + """ + self._add_bind(table, bind) + + def get_bind( + self, + mapper: Optional[_EntityBindKey[_O]] = None, + *, + clause: Optional[ClauseElement] = None, + bind: Optional[_SessionBind] = None, + _sa_skip_events: Optional[bool] = None, + _sa_skip_for_implicit_returning: bool = False, + **kw: Any, + ) -> Union[Engine, Connection]: + """Return a "bind" to which this :class:`.Session` is bound. + + The "bind" is usually an instance of :class:`_engine.Engine`, + except in the case where the :class:`.Session` has been + explicitly bound directly to a :class:`_engine.Connection`. + + For a multiply-bound or unbound :class:`.Session`, the + ``mapper`` or ``clause`` arguments are used to determine the + appropriate bind to return. + + Note that the "mapper" argument is usually present + when :meth:`.Session.get_bind` is called via an ORM + operation such as a :meth:`.Session.query`, each + individual INSERT/UPDATE/DELETE operation within a + :meth:`.Session.flush`, call, etc. + + The order of resolution is: + + 1. if mapper given and :paramref:`.Session.binds` is present, + locate a bind based first on the mapper in use, then + on the mapped class in use, then on any base classes that are + present in the ``__mro__`` of the mapped class, from more specific + superclasses to more general. + 2. if clause given and ``Session.binds`` is present, + locate a bind based on :class:`_schema.Table` objects + found in the given clause present in ``Session.binds``. + 3. if ``Session.binds`` is present, return that. + 4. if clause given, attempt to return a bind + linked to the :class:`_schema.MetaData` ultimately + associated with the clause. + 5. if mapper given, attempt to return a bind + linked to the :class:`_schema.MetaData` ultimately + associated with the :class:`_schema.Table` or other + selectable to which the mapper is mapped. + 6. No bind can be found, :exc:`~sqlalchemy.exc.UnboundExecutionError` + is raised. + + Note that the :meth:`.Session.get_bind` method can be overridden on + a user-defined subclass of :class:`.Session` to provide any kind + of bind resolution scheme. See the example at + :ref:`session_custom_partitioning`. + + :param mapper: + Optional mapped class or corresponding :class:`_orm.Mapper` instance. + The bind can be derived from a :class:`_orm.Mapper` first by + consulting the "binds" map associated with this :class:`.Session`, + and secondly by consulting the :class:`_schema.MetaData` associated + with the :class:`_schema.Table` to which the :class:`_orm.Mapper` is + mapped for a bind. + + :param clause: + A :class:`_expression.ClauseElement` (i.e. + :func:`_expression.select`, + :func:`_expression.text`, + etc.). If the ``mapper`` argument is not present or could not + produce a bind, the given expression construct will be searched + for a bound element, typically a :class:`_schema.Table` + associated with + bound :class:`_schema.MetaData`. + + .. seealso:: + + :ref:`session_partitioning` + + :paramref:`.Session.binds` + + :meth:`.Session.bind_mapper` + + :meth:`.Session.bind_table` + + """ + + # this function is documented as a subclassing hook, so we have + # to call this method even if the return is simple + if bind: + return bind + elif not self.__binds and self.bind: + # simplest and most common case, we have a bind and no + # per-mapper/table binds, we're done + return self.bind + + # we don't have self.bind and either have self.__binds + # or we don't have self.__binds (which is legacy). Look at the + # mapper and the clause + if mapper is None and clause is None: + if self.bind: + return self.bind + else: + raise sa_exc.UnboundExecutionError( + "This session is not bound to a single Engine or " + "Connection, and no context was provided to locate " + "a binding." + ) + + # look more closely at the mapper. + if mapper is not None: + try: + inspected_mapper = inspect(mapper) + except sa_exc.NoInspectionAvailable as err: + if isinstance(mapper, type): + raise exc.UnmappedClassError(mapper) from err + else: + raise + else: + inspected_mapper = None + + # match up the mapper or clause in the __binds + if self.__binds: + # matching mappers and selectables to entries in the + # binds dictionary; supported use case. + if inspected_mapper: + for cls in inspected_mapper.class_.__mro__: + if cls in self.__binds: + return self.__binds[cls] + if clause is None: + clause = inspected_mapper.persist_selectable + + if clause is not None: + plugin_subject = clause._propagate_attrs.get( + "plugin_subject", None + ) + + if plugin_subject is not None: + for cls in plugin_subject.mapper.class_.__mro__: + if cls in self.__binds: + return self.__binds[cls] + + for obj in visitors.iterate(clause): + if obj in self.__binds: + if TYPE_CHECKING: + assert isinstance(obj, Table) + return self.__binds[obj] + + # none of the __binds matched, but we have a fallback bind. + # return that + if self.bind: + return self.bind + + context = [] + if inspected_mapper is not None: + context.append(f"mapper {inspected_mapper}") + if clause is not None: + context.append("SQL expression") + + raise sa_exc.UnboundExecutionError( + f"Could not locate a bind configured on " + f'{", ".join(context)} or this Session.' + ) + + @overload + def query(self, _entity: _EntityType[_O]) -> Query[_O]: ... + + @overload + def query( + self, _colexpr: TypedColumnsClauseRole[_T] + ) -> RowReturningQuery[Tuple[_T]]: ... + + # START OVERLOADED FUNCTIONS self.query RowReturningQuery 2-8 + + # code within this block is **programmatically, + # statically generated** by tools/generate_tuple_map_overloads.py + + @overload + def query( + self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1] + ) -> RowReturningQuery[Tuple[_T0, _T1]]: ... + + @overload + def query( + self, __ent0: _TCCA[_T0], __ent1: _TCCA[_T1], __ent2: _TCCA[_T2] + ) -> RowReturningQuery[Tuple[_T0, _T1, _T2]]: ... + + @overload + def query( + self, + __ent0: _TCCA[_T0], + __ent1: _TCCA[_T1], + __ent2: _TCCA[_T2], + __ent3: _TCCA[_T3], + ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3]]: ... + + @overload + def query( + self, + __ent0: _TCCA[_T0], + __ent1: _TCCA[_T1], + __ent2: _TCCA[_T2], + __ent3: _TCCA[_T3], + __ent4: _TCCA[_T4], + ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4]]: ... + + @overload + def query( + self, + __ent0: _TCCA[_T0], + __ent1: _TCCA[_T1], + __ent2: _TCCA[_T2], + __ent3: _TCCA[_T3], + __ent4: _TCCA[_T4], + __ent5: _TCCA[_T5], + ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4, _T5]]: ... + + @overload + def query( + self, + __ent0: _TCCA[_T0], + __ent1: _TCCA[_T1], + __ent2: _TCCA[_T2], + __ent3: _TCCA[_T3], + __ent4: _TCCA[_T4], + __ent5: _TCCA[_T5], + __ent6: _TCCA[_T6], + ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6]]: ... + + @overload + def query( + self, + __ent0: _TCCA[_T0], + __ent1: _TCCA[_T1], + __ent2: _TCCA[_T2], + __ent3: _TCCA[_T3], + __ent4: _TCCA[_T4], + __ent5: _TCCA[_T5], + __ent6: _TCCA[_T6], + __ent7: _TCCA[_T7], + ) -> RowReturningQuery[Tuple[_T0, _T1, _T2, _T3, _T4, _T5, _T6, _T7]]: ... + + # END OVERLOADED FUNCTIONS self.query + + @overload + def query( + self, *entities: _ColumnsClauseArgument[Any], **kwargs: Any + ) -> Query[Any]: ... + + def query( + self, *entities: _ColumnsClauseArgument[Any], **kwargs: Any + ) -> Query[Any]: + """Return a new :class:`_query.Query` object corresponding to this + :class:`_orm.Session`. + + Note that the :class:`_query.Query` object is legacy as of + SQLAlchemy 2.0; the :func:`_sql.select` construct is now used + to construct ORM queries. + + .. seealso:: + + :ref:`unified_tutorial` + + :ref:`queryguide_toplevel` + + :ref:`query_api_toplevel` - legacy API doc + + """ + + return self._query_cls(entities, self, **kwargs) + + def _identity_lookup( + self, + mapper: Mapper[_O], + primary_key_identity: Union[Any, Tuple[Any, ...]], + identity_token: Any = None, + passive: PassiveFlag = PassiveFlag.PASSIVE_OFF, + lazy_loaded_from: Optional[InstanceState[Any]] = None, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + ) -> Union[Optional[_O], LoaderCallableStatus]: + """Locate an object in the identity map. + + Given a primary key identity, constructs an identity key and then + looks in the session's identity map. If present, the object may + be run through unexpiration rules (e.g. load unloaded attributes, + check if was deleted). + + e.g.:: + + obj = session._identity_lookup(inspect(SomeClass), (1, )) + + :param mapper: mapper in use + :param primary_key_identity: the primary key we are searching for, as + a tuple. + :param identity_token: identity token that should be used to create + the identity key. Used as is, however overriding subclasses can + repurpose this in order to interpret the value in a special way, + such as if None then look among multiple target tokens. + :param passive: passive load flag passed to + :func:`.loading.get_from_identity`, which impacts the behavior if + the object is found; the object may be validated and/or unexpired + if the flag allows for SQL to be emitted. + :param lazy_loaded_from: an :class:`.InstanceState` that is + specifically asking for this identity as a related identity. Used + for sharding schemes where there is a correspondence between an object + and a related object being lazy-loaded (or otherwise + relationship-loaded). + + :return: None if the object is not found in the identity map, *or* + if the object was unexpired and found to have been deleted. + if passive flags disallow SQL and the object is expired, returns + PASSIVE_NO_RESULT. In all other cases the instance is returned. + + .. versionchanged:: 1.4.0 - the :meth:`.Session._identity_lookup` + method was moved from :class:`_query.Query` to + :class:`.Session`, to avoid having to instantiate the + :class:`_query.Query` object. + + + """ + + key = mapper.identity_key_from_primary_key( + primary_key_identity, identity_token=identity_token + ) + + # work around: https://github.com/python/typing/discussions/1143 + return_value = loading.get_from_identity(self, mapper, key, passive) + return return_value + + @util.non_memoized_property + @contextlib.contextmanager + def no_autoflush(self) -> Iterator[Session]: + """Return a context manager that disables autoflush. + + e.g.:: + + with session.no_autoflush: + + some_object = SomeClass() + session.add(some_object) + # won't autoflush + some_object.related_thing = session.query(SomeRelated).first() + + Operations that proceed within the ``with:`` block + will not be subject to flushes occurring upon query + access. This is useful when initializing a series + of objects which involve existing database queries, + where the uncompleted object should not yet be flushed. + + """ + autoflush = self.autoflush + self.autoflush = False + try: + yield self + finally: + self.autoflush = autoflush + + @util.langhelpers.tag_method_for_warnings( + "This warning originated from the Session 'autoflush' process, " + "which was invoked automatically in response to a user-initiated " + "operation.", + sa_exc.SAWarning, + ) + def _autoflush(self) -> None: + if self.autoflush and not self._flushing: + try: + self.flush() + except sa_exc.StatementError as e: + # note we are reraising StatementError as opposed to + # raising FlushError with "chaining" to remain compatible + # with code that catches StatementError, IntegrityError, + # etc. + e.add_detail( + "raised as a result of Query-invoked autoflush; " + "consider using a session.no_autoflush block if this " + "flush is occurring prematurely" + ) + raise e.with_traceback(sys.exc_info()[2]) + + def refresh( + self, + instance: object, + attribute_names: Optional[Iterable[str]] = None, + with_for_update: ForUpdateParameter = None, + ) -> None: + """Expire and refresh attributes on the given instance. + + The selected attributes will first be expired as they would when using + :meth:`_orm.Session.expire`; then a SELECT statement will be issued to + the database to refresh column-oriented attributes with the current + value available in the current transaction. + + :func:`_orm.relationship` oriented attributes will also be immediately + loaded if they were already eagerly loaded on the object, using the + same eager loading strategy that they were loaded with originally. + + .. versionadded:: 1.4 - the :meth:`_orm.Session.refresh` method + can also refresh eagerly loaded attributes. + + :func:`_orm.relationship` oriented attributes that would normally + load using the ``select`` (or "lazy") loader strategy will also + load **if they are named explicitly in the attribute_names + collection**, emitting a SELECT statement for the attribute using the + ``immediate`` loader strategy. If lazy-loaded relationships are not + named in :paramref:`_orm.Session.refresh.attribute_names`, then + they remain as "lazy loaded" attributes and are not implicitly + refreshed. + + .. versionchanged:: 2.0.4 The :meth:`_orm.Session.refresh` method + will now refresh lazy-loaded :func:`_orm.relationship` oriented + attributes for those which are named explicitly in the + :paramref:`_orm.Session.refresh.attribute_names` collection. + + .. tip:: + + While the :meth:`_orm.Session.refresh` method is capable of + refreshing both column and relationship oriented attributes, its + primary focus is on refreshing of local column-oriented attributes + on a single instance. For more open ended "refresh" functionality, + including the ability to refresh the attributes on many objects at + once while having explicit control over relationship loader + strategies, use the + :ref:`populate existing <orm_queryguide_populate_existing>` feature + instead. + + Note that a highly isolated transaction will return the same values as + were previously read in that same transaction, regardless of changes + in database state outside of that transaction. Refreshing + attributes usually only makes sense at the start of a transaction + where database rows have not yet been accessed. + + :param attribute_names: optional. An iterable collection of + string attribute names indicating a subset of attributes to + be refreshed. + + :param with_for_update: optional boolean ``True`` indicating FOR UPDATE + should be used, or may be a dictionary containing flags to + indicate a more specific set of FOR UPDATE flags for the SELECT; + flags should match the parameters of + :meth:`_query.Query.with_for_update`. + Supersedes the :paramref:`.Session.refresh.lockmode` parameter. + + .. seealso:: + + :ref:`session_expire` - introductory material + + :meth:`.Session.expire` + + :meth:`.Session.expire_all` + + :ref:`orm_queryguide_populate_existing` - allows any ORM query + to refresh objects as they would be loaded normally. + + """ + try: + state = attributes.instance_state(instance) + except exc.NO_STATE as err: + raise exc.UnmappedInstanceError(instance) from err + + self._expire_state(state, attribute_names) + + # this autoflush previously used to occur as a secondary effect + # of the load_on_ident below. Meaning we'd organize the SELECT + # based on current DB pks, then flush, then if pks changed in that + # flush, crash. this was unticketed but discovered as part of + # #8703. So here, autoflush up front, dont autoflush inside + # load_on_ident. + self._autoflush() + + if with_for_update == {}: + raise sa_exc.ArgumentError( + "with_for_update should be the boolean value " + "True, or a dictionary with options. " + "A blank dictionary is ambiguous." + ) + + with_for_update = ForUpdateArg._from_argument(with_for_update) + + stmt: Select[Any] = sql.select(object_mapper(instance)) + if ( + loading.load_on_ident( + self, + stmt, + state.key, + refresh_state=state, + with_for_update=with_for_update, + only_load_props=attribute_names, + require_pk_cols=True, + # technically unnecessary as we just did autoflush + # above, however removes the additional unnecessary + # call to _autoflush() + no_autoflush=True, + is_user_refresh=True, + ) + is None + ): + raise sa_exc.InvalidRequestError( + "Could not refresh instance '%s'" % instance_str(instance) + ) + + def expire_all(self) -> None: + """Expires all persistent instances within this Session. + + When any attributes on a persistent instance is next accessed, + a query will be issued using the + :class:`.Session` object's current transactional context in order to + load all expired attributes for the given instance. Note that + a highly isolated transaction will return the same values as were + previously read in that same transaction, regardless of changes + in database state outside of that transaction. + + To expire individual objects and individual attributes + on those objects, use :meth:`Session.expire`. + + The :class:`.Session` object's default behavior is to + expire all state whenever the :meth:`Session.rollback` + or :meth:`Session.commit` methods are called, so that new + state can be loaded for the new transaction. For this reason, + calling :meth:`Session.expire_all` is not usually needed, + assuming the transaction is isolated. + + .. seealso:: + + :ref:`session_expire` - introductory material + + :meth:`.Session.expire` + + :meth:`.Session.refresh` + + :meth:`_orm.Query.populate_existing` + + """ + for state in self.identity_map.all_states(): + state._expire(state.dict, self.identity_map._modified) + + def expire( + self, instance: object, attribute_names: Optional[Iterable[str]] = None + ) -> None: + """Expire the attributes on an instance. + + Marks the attributes of an instance as out of date. When an expired + attribute is next accessed, a query will be issued to the + :class:`.Session` object's current transactional context in order to + load all expired attributes for the given instance. Note that + a highly isolated transaction will return the same values as were + previously read in that same transaction, regardless of changes + in database state outside of that transaction. + + To expire all objects in the :class:`.Session` simultaneously, + use :meth:`Session.expire_all`. + + The :class:`.Session` object's default behavior is to + expire all state whenever the :meth:`Session.rollback` + or :meth:`Session.commit` methods are called, so that new + state can be loaded for the new transaction. For this reason, + calling :meth:`Session.expire` only makes sense for the specific + case that a non-ORM SQL statement was emitted in the current + transaction. + + :param instance: The instance to be refreshed. + :param attribute_names: optional list of string attribute names + indicating a subset of attributes to be expired. + + .. seealso:: + + :ref:`session_expire` - introductory material + + :meth:`.Session.expire` + + :meth:`.Session.refresh` + + :meth:`_orm.Query.populate_existing` + + """ + try: + state = attributes.instance_state(instance) + except exc.NO_STATE as err: + raise exc.UnmappedInstanceError(instance) from err + self._expire_state(state, attribute_names) + + def _expire_state( + self, + state: InstanceState[Any], + attribute_names: Optional[Iterable[str]], + ) -> None: + self._validate_persistent(state) + if attribute_names: + state._expire_attributes(state.dict, attribute_names) + else: + # pre-fetch the full cascade since the expire is going to + # remove associations + cascaded = list( + state.manager.mapper.cascade_iterator("refresh-expire", state) + ) + self._conditional_expire(state) + for o, m, st_, dct_ in cascaded: + self._conditional_expire(st_) + + def _conditional_expire( + self, state: InstanceState[Any], autoflush: Optional[bool] = None + ) -> None: + """Expire a state if persistent, else expunge if pending""" + + if state.key: + state._expire(state.dict, self.identity_map._modified) + elif state in self._new: + self._new.pop(state) + state._detach(self) + + def expunge(self, instance: object) -> None: + """Remove the `instance` from this ``Session``. + + This will free all internal references to the instance. Cascading + will be applied according to the *expunge* cascade rule. + + """ + try: + state = attributes.instance_state(instance) + except exc.NO_STATE as err: + raise exc.UnmappedInstanceError(instance) from err + if state.session_id is not self.hash_key: + raise sa_exc.InvalidRequestError( + "Instance %s is not present in this Session" % state_str(state) + ) + + cascaded = list( + state.manager.mapper.cascade_iterator("expunge", state) + ) + self._expunge_states([state] + [st_ for o, m, st_, dct_ in cascaded]) + + def _expunge_states( + self, states: Iterable[InstanceState[Any]], to_transient: bool = False + ) -> None: + for state in states: + if state in self._new: + self._new.pop(state) + elif self.identity_map.contains_state(state): + self.identity_map.safe_discard(state) + self._deleted.pop(state, None) + elif self._transaction: + # state is "detached" from being deleted, but still present + # in the transaction snapshot + self._transaction._deleted.pop(state, None) + statelib.InstanceState._detach_states( + states, self, to_transient=to_transient + ) + + def _register_persistent(self, states: Set[InstanceState[Any]]) -> None: + """Register all persistent objects from a flush. + + This is used both for pending objects moving to the persistent + state as well as already persistent objects. + + """ + + pending_to_persistent = self.dispatch.pending_to_persistent or None + for state in states: + mapper = _state_mapper(state) + + # prevent against last minute dereferences of the object + obj = state.obj() + if obj is not None: + instance_key = mapper._identity_key_from_state(state) + + if ( + _none_set.intersection(instance_key[1]) + and not mapper.allow_partial_pks + or _none_set.issuperset(instance_key[1]) + ): + raise exc.FlushError( + "Instance %s has a NULL identity key. If this is an " + "auto-generated value, check that the database table " + "allows generation of new primary key values, and " + "that the mapped Column object is configured to " + "expect these generated values. Ensure also that " + "this flush() is not occurring at an inappropriate " + "time, such as within a load() event." + % state_str(state) + ) + + if state.key is None: + state.key = instance_key + elif state.key != instance_key: + # primary key switch. use safe_discard() in case another + # state has already replaced this one in the identity + # map (see test/orm/test_naturalpks.py ReversePKsTest) + self.identity_map.safe_discard(state) + trans = self._transaction + assert trans is not None + if state in trans._key_switches: + orig_key = trans._key_switches[state][0] + else: + orig_key = state.key + trans._key_switches[state] = ( + orig_key, + instance_key, + ) + state.key = instance_key + + # there can be an existing state in the identity map + # that is replaced when the primary keys of two instances + # are swapped; see test/orm/test_naturalpks.py -> test_reverse + old = self.identity_map.replace(state) + if ( + old is not None + and mapper._identity_key_from_state(old) == instance_key + and old.obj() is not None + ): + util.warn( + "Identity map already had an identity for %s, " + "replacing it with newly flushed object. Are there " + "load operations occurring inside of an event handler " + "within the flush?" % (instance_key,) + ) + state._orphaned_outside_of_session = False + + statelib.InstanceState._commit_all_states( + ((state, state.dict) for state in states), self.identity_map + ) + + self._register_altered(states) + + if pending_to_persistent is not None: + for state in states.intersection(self._new): + pending_to_persistent(self, state) + + # remove from new last, might be the last strong ref + for state in set(states).intersection(self._new): + self._new.pop(state) + + def _register_altered(self, states: Iterable[InstanceState[Any]]) -> None: + if self._transaction: + for state in states: + if state in self._new: + self._transaction._new[state] = True + else: + self._transaction._dirty[state] = True + + def _remove_newly_deleted( + self, states: Iterable[InstanceState[Any]] + ) -> None: + persistent_to_deleted = self.dispatch.persistent_to_deleted or None + for state in states: + if self._transaction: + self._transaction._deleted[state] = True + + if persistent_to_deleted is not None: + # get a strong reference before we pop out of + # self._deleted + obj = state.obj() # noqa + + self.identity_map.safe_discard(state) + self._deleted.pop(state, None) + state._deleted = True + # can't call state._detach() here, because this state + # is still in the transaction snapshot and needs to be + # tracked as part of that + if persistent_to_deleted is not None: + persistent_to_deleted(self, state) + + def add(self, instance: object, _warn: bool = True) -> None: + """Place an object into this :class:`_orm.Session`. + + Objects that are in the :term:`transient` state when passed to the + :meth:`_orm.Session.add` method will move to the + :term:`pending` state, until the next flush, at which point they + will move to the :term:`persistent` state. + + Objects that are in the :term:`detached` state when passed to the + :meth:`_orm.Session.add` method will move to the :term:`persistent` + state directly. + + If the transaction used by the :class:`_orm.Session` is rolled back, + objects which were transient when they were passed to + :meth:`_orm.Session.add` will be moved back to the + :term:`transient` state, and will no longer be present within this + :class:`_orm.Session`. + + .. seealso:: + + :meth:`_orm.Session.add_all` + + :ref:`session_adding` - at :ref:`session_basics` + + """ + if _warn and self._warn_on_events: + self._flush_warning("Session.add()") + + try: + state = attributes.instance_state(instance) + except exc.NO_STATE as err: + raise exc.UnmappedInstanceError(instance) from err + + self._save_or_update_state(state) + + def add_all(self, instances: Iterable[object]) -> None: + """Add the given collection of instances to this :class:`_orm.Session`. + + See the documentation for :meth:`_orm.Session.add` for a general + behavioral description. + + .. seealso:: + + :meth:`_orm.Session.add` + + :ref:`session_adding` - at :ref:`session_basics` + + """ + + if self._warn_on_events: + self._flush_warning("Session.add_all()") + + for instance in instances: + self.add(instance, _warn=False) + + def _save_or_update_state(self, state: InstanceState[Any]) -> None: + state._orphaned_outside_of_session = False + self._save_or_update_impl(state) + + mapper = _state_mapper(state) + for o, m, st_, dct_ in mapper.cascade_iterator( + "save-update", state, halt_on=self._contains_state + ): + self._save_or_update_impl(st_) + + def delete(self, instance: object) -> None: + """Mark an instance as deleted. + + The object is assumed to be either :term:`persistent` or + :term:`detached` when passed; after the method is called, the + object will remain in the :term:`persistent` state until the next + flush proceeds. During this time, the object will also be a member + of the :attr:`_orm.Session.deleted` collection. + + When the next flush proceeds, the object will move to the + :term:`deleted` state, indicating a ``DELETE`` statement was emitted + for its row within the current transaction. When the transaction + is successfully committed, + the deleted object is moved to the :term:`detached` state and is + no longer present within this :class:`_orm.Session`. + + .. seealso:: + + :ref:`session_deleting` - at :ref:`session_basics` + + """ + if self._warn_on_events: + self._flush_warning("Session.delete()") + + try: + state = attributes.instance_state(instance) + except exc.NO_STATE as err: + raise exc.UnmappedInstanceError(instance) from err + + self._delete_impl(state, instance, head=True) + + def _delete_impl( + self, state: InstanceState[Any], obj: object, head: bool + ) -> None: + if state.key is None: + if head: + raise sa_exc.InvalidRequestError( + "Instance '%s' is not persisted" % state_str(state) + ) + else: + return + + to_attach = self._before_attach(state, obj) + + if state in self._deleted: + return + + self.identity_map.add(state) + + if to_attach: + self._after_attach(state, obj) + + if head: + # grab the cascades before adding the item to the deleted list + # so that autoflush does not delete the item + # the strong reference to the instance itself is significant here + cascade_states = list( + state.manager.mapper.cascade_iterator("delete", state) + ) + else: + cascade_states = None + + self._deleted[state] = obj + + if head: + if TYPE_CHECKING: + assert cascade_states is not None + for o, m, st_, dct_ in cascade_states: + self._delete_impl(st_, o, False) + + def get( + self, + entity: _EntityBindKey[_O], + ident: _PKIdentityArgument, + *, + options: Optional[Sequence[ORMOption]] = None, + populate_existing: bool = False, + with_for_update: ForUpdateParameter = None, + identity_token: Optional[Any] = None, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + ) -> Optional[_O]: + """Return an instance based on the given primary key identifier, + or ``None`` if not found. + + E.g.:: + + my_user = session.get(User, 5) + + some_object = session.get(VersionedFoo, (5, 10)) + + some_object = session.get( + VersionedFoo, + {"id": 5, "version_id": 10} + ) + + .. versionadded:: 1.4 Added :meth:`_orm.Session.get`, which is moved + from the now legacy :meth:`_orm.Query.get` method. + + :meth:`_orm.Session.get` is special in that it provides direct + access to the identity map of the :class:`.Session`. + If the given primary key identifier is present + in the local identity map, the object is returned + directly from this collection and no SQL is emitted, + unless the object has been marked fully expired. + If not present, + a SELECT is performed in order to locate the object. + + :meth:`_orm.Session.get` also will perform a check if + the object is present in the identity map and + marked as expired - a SELECT + is emitted to refresh the object as well as to + ensure that the row is still present. + If not, :class:`~sqlalchemy.orm.exc.ObjectDeletedError` is raised. + + :param entity: a mapped class or :class:`.Mapper` indicating the + type of entity to be loaded. + + :param ident: A scalar, tuple, or dictionary representing the + primary key. For a composite (e.g. multiple column) primary key, + a tuple or dictionary should be passed. + + For a single-column primary key, the scalar calling form is typically + the most expedient. If the primary key of a row is the value "5", + the call looks like:: + + my_object = session.get(SomeClass, 5) + + The tuple form contains primary key values typically in + the order in which they correspond to the mapped + :class:`_schema.Table` + object's primary key columns, or if the + :paramref:`_orm.Mapper.primary_key` configuration parameter were + used, in + the order used for that parameter. For example, if the primary key + of a row is represented by the integer + digits "5, 10" the call would look like:: + + my_object = session.get(SomeClass, (5, 10)) + + The dictionary form should include as keys the mapped attribute names + corresponding to each element of the primary key. If the mapped class + has the attributes ``id``, ``version_id`` as the attributes which + store the object's primary key value, the call would look like:: + + my_object = session.get(SomeClass, {"id": 5, "version_id": 10}) + + :param options: optional sequence of loader options which will be + applied to the query, if one is emitted. + + :param populate_existing: causes the method to unconditionally emit + a SQL query and refresh the object with the newly loaded data, + regardless of whether or not the object is already present. + + :param with_for_update: optional boolean ``True`` indicating FOR UPDATE + should be used, or may be a dictionary containing flags to + indicate a more specific set of FOR UPDATE flags for the SELECT; + flags should match the parameters of + :meth:`_query.Query.with_for_update`. + Supersedes the :paramref:`.Session.refresh.lockmode` parameter. + + :param execution_options: optional dictionary of execution options, + which will be associated with the query execution if one is emitted. + This dictionary can provide a subset of the options that are + accepted by :meth:`_engine.Connection.execution_options`, and may + also provide additional options understood only in an ORM context. + + .. versionadded:: 1.4.29 + + .. seealso:: + + :ref:`orm_queryguide_execution_options` - ORM-specific execution + options + + :param bind_arguments: dictionary of additional arguments to determine + the bind. May include "mapper", "bind", or other custom arguments. + Contents of this dictionary are passed to the + :meth:`.Session.get_bind` method. + + .. versionadded: 2.0.0rc1 + + :return: The object instance, or ``None``. + + """ + return self._get_impl( + entity, + ident, + loading.load_on_pk_identity, + options=options, + populate_existing=populate_existing, + with_for_update=with_for_update, + identity_token=identity_token, + execution_options=execution_options, + bind_arguments=bind_arguments, + ) + + def get_one( + self, + entity: _EntityBindKey[_O], + ident: _PKIdentityArgument, + *, + options: Optional[Sequence[ORMOption]] = None, + populate_existing: bool = False, + with_for_update: ForUpdateParameter = None, + identity_token: Optional[Any] = None, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + ) -> _O: + """Return exactly one instance based on the given primary key + identifier, or raise an exception if not found. + + Raises ``sqlalchemy.orm.exc.NoResultFound`` if the query + selects no rows. + + For a detailed documentation of the arguments see the + method :meth:`.Session.get`. + + .. versionadded:: 2.0.22 + + :return: The object instance. + + .. seealso:: + + :meth:`.Session.get` - equivalent method that instead + returns ``None`` if no row was found with the provided primary + key + + """ + + instance = self.get( + entity, + ident, + options=options, + populate_existing=populate_existing, + with_for_update=with_for_update, + identity_token=identity_token, + execution_options=execution_options, + bind_arguments=bind_arguments, + ) + + if instance is None: + raise sa_exc.NoResultFound( + "No row was found when one was required" + ) + + return instance + + def _get_impl( + self, + entity: _EntityBindKey[_O], + primary_key_identity: _PKIdentityArgument, + db_load_fn: Callable[..., _O], + *, + options: Optional[Sequence[ExecutableOption]] = None, + populate_existing: bool = False, + with_for_update: ForUpdateParameter = None, + identity_token: Optional[Any] = None, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + ) -> Optional[_O]: + # convert composite types to individual args + if ( + is_composite_class(primary_key_identity) + and type(primary_key_identity) + in descriptor_props._composite_getters + ): + getter = descriptor_props._composite_getters[ + type(primary_key_identity) + ] + primary_key_identity = getter(primary_key_identity) + + mapper: Optional[Mapper[_O]] = inspect(entity) + + if mapper is None or not mapper.is_mapper: + raise sa_exc.ArgumentError( + "Expected mapped class or mapper, got: %r" % entity + ) + + is_dict = isinstance(primary_key_identity, dict) + if not is_dict: + primary_key_identity = util.to_list( + primary_key_identity, default=[None] + ) + + if len(primary_key_identity) != len(mapper.primary_key): + raise sa_exc.InvalidRequestError( + "Incorrect number of values in identifier to formulate " + "primary key for session.get(); primary key columns " + "are %s" % ",".join("'%s'" % c for c in mapper.primary_key) + ) + + if is_dict: + pk_synonyms = mapper._pk_synonyms + + if pk_synonyms: + correct_keys = set(pk_synonyms).intersection( + primary_key_identity + ) + + if correct_keys: + primary_key_identity = dict(primary_key_identity) + for k in correct_keys: + primary_key_identity[pk_synonyms[k]] = ( + primary_key_identity[k] + ) + + try: + primary_key_identity = list( + primary_key_identity[prop.key] + for prop in mapper._identity_key_props + ) + + except KeyError as err: + raise sa_exc.InvalidRequestError( + "Incorrect names of values in identifier to formulate " + "primary key for session.get(); primary key attribute " + "names are %s (synonym names are also accepted)" + % ",".join( + "'%s'" % prop.key + for prop in mapper._identity_key_props + ) + ) from err + + if ( + not populate_existing + and not mapper.always_refresh + and with_for_update is None + ): + instance = self._identity_lookup( + mapper, + primary_key_identity, + identity_token=identity_token, + execution_options=execution_options, + bind_arguments=bind_arguments, + ) + + if instance is not None: + # reject calls for id in identity map but class + # mismatch. + if not isinstance(instance, mapper.class_): + return None + return instance + + # TODO: this was being tested before, but this is not possible + assert instance is not LoaderCallableStatus.PASSIVE_CLASS_MISMATCH + + # set_label_style() not strictly necessary, however this will ensure + # that tablename_colname style is used which at the moment is + # asserted in a lot of unit tests :) + + load_options = context.QueryContext.default_load_options + + if populate_existing: + load_options += {"_populate_existing": populate_existing} + statement = sql.select(mapper).set_label_style( + LABEL_STYLE_TABLENAME_PLUS_COL + ) + if with_for_update is not None: + statement._for_update_arg = ForUpdateArg._from_argument( + with_for_update + ) + + if options: + statement = statement.options(*options) + return db_load_fn( + self, + statement, + primary_key_identity, + load_options=load_options, + identity_token=identity_token, + execution_options=execution_options, + bind_arguments=bind_arguments, + ) + + def merge( + self, + instance: _O, + *, + load: bool = True, + options: Optional[Sequence[ORMOption]] = None, + ) -> _O: + """Copy the state of a given instance into a corresponding instance + within this :class:`.Session`. + + :meth:`.Session.merge` examines the primary key attributes of the + source instance, and attempts to reconcile it with an instance of the + same primary key in the session. If not found locally, it attempts + to load the object from the database based on primary key, and if + none can be located, creates a new instance. The state of each + attribute on the source instance is then copied to the target + instance. The resulting target instance is then returned by the + method; the original source instance is left unmodified, and + un-associated with the :class:`.Session` if not already. + + This operation cascades to associated instances if the association is + mapped with ``cascade="merge"``. + + See :ref:`unitofwork_merging` for a detailed discussion of merging. + + :param instance: Instance to be merged. + :param load: Boolean, when False, :meth:`.merge` switches into + a "high performance" mode which causes it to forego emitting history + events as well as all database access. This flag is used for + cases such as transferring graphs of objects into a :class:`.Session` + from a second level cache, or to transfer just-loaded objects + into the :class:`.Session` owned by a worker thread or process + without re-querying the database. + + The ``load=False`` use case adds the caveat that the given + object has to be in a "clean" state, that is, has no pending changes + to be flushed - even if the incoming object is detached from any + :class:`.Session`. This is so that when + the merge operation populates local attributes and + cascades to related objects and + collections, the values can be "stamped" onto the + target object as is, without generating any history or attribute + events, and without the need to reconcile the incoming data with + any existing related objects or collections that might not + be loaded. The resulting objects from ``load=False`` are always + produced as "clean", so it is only appropriate that the given objects + should be "clean" as well, else this suggests a mis-use of the + method. + :param options: optional sequence of loader options which will be + applied to the :meth:`_orm.Session.get` method when the merge + operation loads the existing version of the object from the database. + + .. versionadded:: 1.4.24 + + + .. seealso:: + + :func:`.make_transient_to_detached` - provides for an alternative + means of "merging" a single object into the :class:`.Session` + + """ + + if self._warn_on_events: + self._flush_warning("Session.merge()") + + _recursive: Dict[InstanceState[Any], object] = {} + _resolve_conflict_map: Dict[_IdentityKeyType[Any], object] = {} + + if load: + # flush current contents if we expect to load data + self._autoflush() + + object_mapper(instance) # verify mapped + autoflush = self.autoflush + try: + self.autoflush = False + return self._merge( + attributes.instance_state(instance), + attributes.instance_dict(instance), + load=load, + options=options, + _recursive=_recursive, + _resolve_conflict_map=_resolve_conflict_map, + ) + finally: + self.autoflush = autoflush + + def _merge( + self, + state: InstanceState[_O], + state_dict: _InstanceDict, + *, + options: Optional[Sequence[ORMOption]] = None, + load: bool, + _recursive: Dict[Any, object], + _resolve_conflict_map: Dict[_IdentityKeyType[Any], object], + ) -> _O: + mapper: Mapper[_O] = _state_mapper(state) + if state in _recursive: + return cast(_O, _recursive[state]) + + new_instance = False + key = state.key + + merged: Optional[_O] + + if key is None: + if state in self._new: + util.warn( + "Instance %s is already pending in this Session yet is " + "being merged again; this is probably not what you want " + "to do" % state_str(state) + ) + + if not load: + raise sa_exc.InvalidRequestError( + "merge() with load=False option does not support " + "objects transient (i.e. unpersisted) objects. flush() " + "all changes on mapped instances before merging with " + "load=False." + ) + key = mapper._identity_key_from_state(state) + key_is_persistent = LoaderCallableStatus.NEVER_SET not in key[ + 1 + ] and ( + not _none_set.intersection(key[1]) + or ( + mapper.allow_partial_pks + and not _none_set.issuperset(key[1]) + ) + ) + else: + key_is_persistent = True + + if key in self.identity_map: + try: + merged = self.identity_map[key] + except KeyError: + # object was GC'ed right as we checked for it + merged = None + else: + merged = None + + if merged is None: + if key_is_persistent and key in _resolve_conflict_map: + merged = cast(_O, _resolve_conflict_map[key]) + + elif not load: + if state.modified: + raise sa_exc.InvalidRequestError( + "merge() with load=False option does not support " + "objects marked as 'dirty'. flush() all changes on " + "mapped instances before merging with load=False." + ) + merged = mapper.class_manager.new_instance() + merged_state = attributes.instance_state(merged) + merged_state.key = key + self._update_impl(merged_state) + new_instance = True + + elif key_is_persistent: + merged = self.get( + mapper.class_, + key[1], + identity_token=key[2], + options=options, + ) + + if merged is None: + merged = mapper.class_manager.new_instance() + merged_state = attributes.instance_state(merged) + merged_dict = attributes.instance_dict(merged) + new_instance = True + self._save_or_update_state(merged_state) + else: + merged_state = attributes.instance_state(merged) + merged_dict = attributes.instance_dict(merged) + + _recursive[state] = merged + _resolve_conflict_map[key] = merged + + # check that we didn't just pull the exact same + # state out. + if state is not merged_state: + # version check if applicable + if mapper.version_id_col is not None: + existing_version = mapper._get_state_attr_by_column( + state, + state_dict, + mapper.version_id_col, + passive=PassiveFlag.PASSIVE_NO_INITIALIZE, + ) + + merged_version = mapper._get_state_attr_by_column( + merged_state, + merged_dict, + mapper.version_id_col, + passive=PassiveFlag.PASSIVE_NO_INITIALIZE, + ) + + if ( + existing_version + is not LoaderCallableStatus.PASSIVE_NO_RESULT + and merged_version + is not LoaderCallableStatus.PASSIVE_NO_RESULT + and existing_version != merged_version + ): + raise exc.StaleDataError( + "Version id '%s' on merged state %s " + "does not match existing version '%s'. " + "Leave the version attribute unset when " + "merging to update the most recent version." + % ( + existing_version, + state_str(merged_state), + merged_version, + ) + ) + + merged_state.load_path = state.load_path + merged_state.load_options = state.load_options + + # since we are copying load_options, we need to copy + # the callables_ that would have been generated by those + # load_options. + # assumes that the callables we put in state.callables_ + # are not instance-specific (which they should not be) + merged_state._copy_callables(state) + + for prop in mapper.iterate_properties: + prop.merge( + self, + state, + state_dict, + merged_state, + merged_dict, + load, + _recursive, + _resolve_conflict_map, + ) + + if not load: + # remove any history + merged_state._commit_all(merged_dict, self.identity_map) + merged_state.manager.dispatch._sa_event_merge_wo_load( + merged_state, None + ) + + if new_instance: + merged_state.manager.dispatch.load(merged_state, None) + + return merged + + def _validate_persistent(self, state: InstanceState[Any]) -> None: + if not self.identity_map.contains_state(state): + raise sa_exc.InvalidRequestError( + "Instance '%s' is not persistent within this Session" + % state_str(state) + ) + + def _save_impl(self, state: InstanceState[Any]) -> None: + if state.key is not None: + raise sa_exc.InvalidRequestError( + "Object '%s' already has an identity - " + "it can't be registered as pending" % state_str(state) + ) + + obj = state.obj() + to_attach = self._before_attach(state, obj) + if state not in self._new: + self._new[state] = obj + state.insert_order = len(self._new) + if to_attach: + self._after_attach(state, obj) + + def _update_impl( + self, state: InstanceState[Any], revert_deletion: bool = False + ) -> None: + if state.key is None: + raise sa_exc.InvalidRequestError( + "Instance '%s' is not persisted" % state_str(state) + ) + + if state._deleted: + if revert_deletion: + if not state._attached: + return + del state._deleted + else: + raise sa_exc.InvalidRequestError( + "Instance '%s' has been deleted. " + "Use the make_transient() " + "function to send this object back " + "to the transient state." % state_str(state) + ) + + obj = state.obj() + + # check for late gc + if obj is None: + return + + to_attach = self._before_attach(state, obj) + + self._deleted.pop(state, None) + if revert_deletion: + self.identity_map.replace(state) + else: + self.identity_map.add(state) + + if to_attach: + self._after_attach(state, obj) + elif revert_deletion: + self.dispatch.deleted_to_persistent(self, state) + + def _save_or_update_impl(self, state: InstanceState[Any]) -> None: + if state.key is None: + self._save_impl(state) + else: + self._update_impl(state) + + def enable_relationship_loading(self, obj: object) -> None: + """Associate an object with this :class:`.Session` for related + object loading. + + .. warning:: + + :meth:`.enable_relationship_loading` exists to serve special + use cases and is not recommended for general use. + + Accesses of attributes mapped with :func:`_orm.relationship` + will attempt to load a value from the database using this + :class:`.Session` as the source of connectivity. The values + will be loaded based on foreign key and primary key values + present on this object - if not present, then those relationships + will be unavailable. + + The object will be attached to this session, but will + **not** participate in any persistence operations; its state + for almost all purposes will remain either "transient" or + "detached", except for the case of relationship loading. + + Also note that backrefs will often not work as expected. + Altering a relationship-bound attribute on the target object + may not fire off a backref event, if the effective value + is what was already loaded from a foreign-key-holding value. + + The :meth:`.Session.enable_relationship_loading` method is + similar to the ``load_on_pending`` flag on :func:`_orm.relationship`. + Unlike that flag, :meth:`.Session.enable_relationship_loading` allows + an object to remain transient while still being able to load + related items. + + To make a transient object associated with a :class:`.Session` + via :meth:`.Session.enable_relationship_loading` pending, add + it to the :class:`.Session` using :meth:`.Session.add` normally. + If the object instead represents an existing identity in the database, + it should be merged using :meth:`.Session.merge`. + + :meth:`.Session.enable_relationship_loading` does not improve + behavior when the ORM is used normally - object references should be + constructed at the object level, not at the foreign key level, so + that they are present in an ordinary way before flush() + proceeds. This method is not intended for general use. + + .. seealso:: + + :paramref:`_orm.relationship.load_on_pending` - this flag + allows per-relationship loading of many-to-ones on items that + are pending. + + :func:`.make_transient_to_detached` - allows for an object to + be added to a :class:`.Session` without SQL emitted, which then + will unexpire attributes on access. + + """ + try: + state = attributes.instance_state(obj) + except exc.NO_STATE as err: + raise exc.UnmappedInstanceError(obj) from err + + to_attach = self._before_attach(state, obj) + state._load_pending = True + if to_attach: + self._after_attach(state, obj) + + def _before_attach(self, state: InstanceState[Any], obj: object) -> bool: + self._autobegin_t() + + if state.session_id == self.hash_key: + return False + + if state.session_id and state.session_id in _sessions: + raise sa_exc.InvalidRequestError( + "Object '%s' is already attached to session '%s' " + "(this is '%s')" + % (state_str(state), state.session_id, self.hash_key) + ) + + self.dispatch.before_attach(self, state) + + return True + + def _after_attach(self, state: InstanceState[Any], obj: object) -> None: + state.session_id = self.hash_key + if state.modified and state._strong_obj is None: + state._strong_obj = obj + self.dispatch.after_attach(self, state) + + if state.key: + self.dispatch.detached_to_persistent(self, state) + else: + self.dispatch.transient_to_pending(self, state) + + def __contains__(self, instance: object) -> bool: + """Return True if the instance is associated with this session. + + The instance may be pending or persistent within the Session for a + result of True. + + """ + try: + state = attributes.instance_state(instance) + except exc.NO_STATE as err: + raise exc.UnmappedInstanceError(instance) from err + return self._contains_state(state) + + def __iter__(self) -> Iterator[object]: + """Iterate over all pending or persistent instances within this + Session. + + """ + return iter( + list(self._new.values()) + list(self.identity_map.values()) + ) + + def _contains_state(self, state: InstanceState[Any]) -> bool: + return state in self._new or self.identity_map.contains_state(state) + + def flush(self, objects: Optional[Sequence[Any]] = None) -> None: + """Flush all the object changes to the database. + + Writes out all pending object creations, deletions and modifications + to the database as INSERTs, DELETEs, UPDATEs, etc. Operations are + automatically ordered by the Session's unit of work dependency + solver. + + Database operations will be issued in the current transactional + context and do not affect the state of the transaction, unless an + error occurs, in which case the entire transaction is rolled back. + You may flush() as often as you like within a transaction to move + changes from Python to the database's transaction buffer. + + :param objects: Optional; restricts the flush operation to operate + only on elements that are in the given collection. + + This feature is for an extremely narrow set of use cases where + particular objects may need to be operated upon before the + full flush() occurs. It is not intended for general use. + + """ + + if self._flushing: + raise sa_exc.InvalidRequestError("Session is already flushing") + + if self._is_clean(): + return + try: + self._flushing = True + self._flush(objects) + finally: + self._flushing = False + + def _flush_warning(self, method: Any) -> None: + util.warn( + "Usage of the '%s' operation is not currently supported " + "within the execution stage of the flush process. " + "Results may not be consistent. Consider using alternative " + "event listeners or connection-level operations instead." % method + ) + + def _is_clean(self) -> bool: + return ( + not self.identity_map.check_modified() + and not self._deleted + and not self._new + ) + + def _flush(self, objects: Optional[Sequence[object]] = None) -> None: + dirty = self._dirty_states + if not dirty and not self._deleted and not self._new: + self.identity_map._modified.clear() + return + + flush_context = UOWTransaction(self) + + if self.dispatch.before_flush: + self.dispatch.before_flush(self, flush_context, objects) + # re-establish "dirty states" in case the listeners + # added + dirty = self._dirty_states + + deleted = set(self._deleted) + new = set(self._new) + + dirty = set(dirty).difference(deleted) + + # create the set of all objects we want to operate upon + if objects: + # specific list passed in + objset = set() + for o in objects: + try: + state = attributes.instance_state(o) + + except exc.NO_STATE as err: + raise exc.UnmappedInstanceError(o) from err + objset.add(state) + else: + objset = None + + # store objects whose fate has been decided + processed = set() + + # put all saves/updates into the flush context. detect top-level + # orphans and throw them into deleted. + if objset: + proc = new.union(dirty).intersection(objset).difference(deleted) + else: + proc = new.union(dirty).difference(deleted) + + for state in proc: + is_orphan = _state_mapper(state)._is_orphan(state) + + is_persistent_orphan = is_orphan and state.has_identity + + if ( + is_orphan + and not is_persistent_orphan + and state._orphaned_outside_of_session + ): + self._expunge_states([state]) + else: + _reg = flush_context.register_object( + state, isdelete=is_persistent_orphan + ) + assert _reg, "Failed to add object to the flush context!" + processed.add(state) + + # put all remaining deletes into the flush context. + if objset: + proc = deleted.intersection(objset).difference(processed) + else: + proc = deleted.difference(processed) + for state in proc: + _reg = flush_context.register_object(state, isdelete=True) + assert _reg, "Failed to add object to the flush context!" + + if not flush_context.has_work: + return + + flush_context.transaction = transaction = self._autobegin_t()._begin() + try: + self._warn_on_events = True + try: + flush_context.execute() + finally: + self._warn_on_events = False + + self.dispatch.after_flush(self, flush_context) + + flush_context.finalize_flush_changes() + + if not objects and self.identity_map._modified: + len_ = len(self.identity_map._modified) + + statelib.InstanceState._commit_all_states( + [ + (state, state.dict) + for state in self.identity_map._modified + ], + instance_dict=self.identity_map, + ) + util.warn( + "Attribute history events accumulated on %d " + "previously clean instances " + "within inner-flush event handlers have been " + "reset, and will not result in database updates. " + "Consider using set_committed_value() within " + "inner-flush event handlers to avoid this warning." % len_ + ) + + # useful assertions: + # if not objects: + # assert not self.identity_map._modified + # else: + # assert self.identity_map._modified == \ + # self.identity_map._modified.difference(objects) + + self.dispatch.after_flush_postexec(self, flush_context) + + transaction.commit() + + except: + with util.safe_reraise(): + transaction.rollback(_capture_exception=True) + + def bulk_save_objects( + self, + objects: Iterable[object], + return_defaults: bool = False, + update_changed_only: bool = True, + preserve_order: bool = True, + ) -> None: + """Perform a bulk save of the given list of objects. + + .. legacy:: + + This method is a legacy feature as of the 2.0 series of + SQLAlchemy. For modern bulk INSERT and UPDATE, see + the sections :ref:`orm_queryguide_bulk_insert` and + :ref:`orm_queryguide_bulk_update`. + + For general INSERT and UPDATE of existing ORM mapped objects, + prefer standard :term:`unit of work` data management patterns, + introduced in the :ref:`unified_tutorial` at + :ref:`tutorial_orm_data_manipulation`. SQLAlchemy 2.0 + now uses :ref:`engine_insertmanyvalues` with modern dialects + which solves previous issues of bulk INSERT slowness. + + :param objects: a sequence of mapped object instances. The mapped + objects are persisted as is, and are **not** associated with the + :class:`.Session` afterwards. + + For each object, whether the object is sent as an INSERT or an + UPDATE is dependent on the same rules used by the :class:`.Session` + in traditional operation; if the object has the + :attr:`.InstanceState.key` + attribute set, then the object is assumed to be "detached" and + will result in an UPDATE. Otherwise, an INSERT is used. + + In the case of an UPDATE, statements are grouped based on which + attributes have changed, and are thus to be the subject of each + SET clause. If ``update_changed_only`` is False, then all + attributes present within each object are applied to the UPDATE + statement, which may help in allowing the statements to be grouped + together into a larger executemany(), and will also reduce the + overhead of checking history on attributes. + + :param return_defaults: when True, rows that are missing values which + generate defaults, namely integer primary key defaults and sequences, + will be inserted **one at a time**, so that the primary key value + is available. In particular this will allow joined-inheritance + and other multi-table mappings to insert correctly without the need + to provide primary key values ahead of time; however, + :paramref:`.Session.bulk_save_objects.return_defaults` **greatly + reduces the performance gains** of the method overall. It is strongly + advised to please use the standard :meth:`_orm.Session.add_all` + approach. + + :param update_changed_only: when True, UPDATE statements are rendered + based on those attributes in each state that have logged changes. + When False, all attributes present are rendered into the SET clause + with the exception of primary key attributes. + + :param preserve_order: when True, the order of inserts and updates + matches exactly the order in which the objects are given. When + False, common types of objects are grouped into inserts + and updates, to allow for more batching opportunities. + + .. seealso:: + + :doc:`queryguide/dml` + + :meth:`.Session.bulk_insert_mappings` + + :meth:`.Session.bulk_update_mappings` + + """ + + obj_states: Iterable[InstanceState[Any]] + + obj_states = (attributes.instance_state(obj) for obj in objects) + + if not preserve_order: + # the purpose of this sort is just so that common mappers + # and persistence states are grouped together, so that groupby + # will return a single group for a particular type of mapper. + # it's not trying to be deterministic beyond that. + obj_states = sorted( + obj_states, + key=lambda state: (id(state.mapper), state.key is not None), + ) + + def grouping_key( + state: InstanceState[_O], + ) -> Tuple[Mapper[_O], bool]: + return (state.mapper, state.key is not None) + + for (mapper, isupdate), states in itertools.groupby( + obj_states, grouping_key + ): + self._bulk_save_mappings( + mapper, + states, + isupdate, + True, + return_defaults, + update_changed_only, + False, + ) + + def bulk_insert_mappings( + self, + mapper: Mapper[Any], + mappings: Iterable[Dict[str, Any]], + return_defaults: bool = False, + render_nulls: bool = False, + ) -> None: + """Perform a bulk insert of the given list of mapping dictionaries. + + .. legacy:: + + This method is a legacy feature as of the 2.0 series of + SQLAlchemy. For modern bulk INSERT and UPDATE, see + the sections :ref:`orm_queryguide_bulk_insert` and + :ref:`orm_queryguide_bulk_update`. The 2.0 API shares + implementation details with this method and adds new features + as well. + + :param mapper: a mapped class, or the actual :class:`_orm.Mapper` + object, + representing the single kind of object represented within the mapping + list. + + :param mappings: a sequence of dictionaries, each one containing the + state of the mapped row to be inserted, in terms of the attribute + names on the mapped class. If the mapping refers to multiple tables, + such as a joined-inheritance mapping, each dictionary must contain all + keys to be populated into all tables. + + :param return_defaults: when True, the INSERT process will be altered + to ensure that newly generated primary key values will be fetched. + The rationale for this parameter is typically to enable + :ref:`Joined Table Inheritance <joined_inheritance>` mappings to + be bulk inserted. + + .. note:: for backends that don't support RETURNING, the + :paramref:`_orm.Session.bulk_insert_mappings.return_defaults` + parameter can significantly decrease performance as INSERT + statements can no longer be batched. See + :ref:`engine_insertmanyvalues` + for background on which backends are affected. + + :param render_nulls: When True, a value of ``None`` will result + in a NULL value being included in the INSERT statement, rather + than the column being omitted from the INSERT. This allows all + the rows being INSERTed to have the identical set of columns which + allows the full set of rows to be batched to the DBAPI. Normally, + each column-set that contains a different combination of NULL values + than the previous row must omit a different series of columns from + the rendered INSERT statement, which means it must be emitted as a + separate statement. By passing this flag, the full set of rows + are guaranteed to be batchable into one batch; the cost however is + that server-side defaults which are invoked by an omitted column will + be skipped, so care must be taken to ensure that these are not + necessary. + + .. warning:: + + When this flag is set, **server side default SQL values will + not be invoked** for those columns that are inserted as NULL; + the NULL value will be sent explicitly. Care must be taken + to ensure that no server-side default functions need to be + invoked for the operation as a whole. + + .. seealso:: + + :doc:`queryguide/dml` + + :meth:`.Session.bulk_save_objects` + + :meth:`.Session.bulk_update_mappings` + + """ + self._bulk_save_mappings( + mapper, + mappings, + False, + False, + return_defaults, + False, + render_nulls, + ) + + def bulk_update_mappings( + self, mapper: Mapper[Any], mappings: Iterable[Dict[str, Any]] + ) -> None: + """Perform a bulk update of the given list of mapping dictionaries. + + .. legacy:: + + This method is a legacy feature as of the 2.0 series of + SQLAlchemy. For modern bulk INSERT and UPDATE, see + the sections :ref:`orm_queryguide_bulk_insert` and + :ref:`orm_queryguide_bulk_update`. The 2.0 API shares + implementation details with this method and adds new features + as well. + + :param mapper: a mapped class, or the actual :class:`_orm.Mapper` + object, + representing the single kind of object represented within the mapping + list. + + :param mappings: a sequence of dictionaries, each one containing the + state of the mapped row to be updated, in terms of the attribute names + on the mapped class. If the mapping refers to multiple tables, such + as a joined-inheritance mapping, each dictionary may contain keys + corresponding to all tables. All those keys which are present and + are not part of the primary key are applied to the SET clause of the + UPDATE statement; the primary key values, which are required, are + applied to the WHERE clause. + + + .. seealso:: + + :doc:`queryguide/dml` + + :meth:`.Session.bulk_insert_mappings` + + :meth:`.Session.bulk_save_objects` + + """ + self._bulk_save_mappings( + mapper, mappings, True, False, False, False, False + ) + + def _bulk_save_mappings( + self, + mapper: Mapper[_O], + mappings: Union[Iterable[InstanceState[_O]], Iterable[Dict[str, Any]]], + isupdate: bool, + isstates: bool, + return_defaults: bool, + update_changed_only: bool, + render_nulls: bool, + ) -> None: + mapper = _class_to_mapper(mapper) + self._flushing = True + + transaction = self._autobegin_t()._begin() + try: + if isupdate: + bulk_persistence._bulk_update( + mapper, + mappings, + transaction, + isstates, + update_changed_only, + ) + else: + bulk_persistence._bulk_insert( + mapper, + mappings, + transaction, + isstates, + return_defaults, + render_nulls, + ) + transaction.commit() + + except: + with util.safe_reraise(): + transaction.rollback(_capture_exception=True) + finally: + self._flushing = False + + def is_modified( + self, instance: object, include_collections: bool = True + ) -> bool: + r"""Return ``True`` if the given instance has locally + modified attributes. + + This method retrieves the history for each instrumented + attribute on the instance and performs a comparison of the current + value to its previously committed value, if any. + + It is in effect a more expensive and accurate + version of checking for the given instance in the + :attr:`.Session.dirty` collection; a full test for + each attribute's net "dirty" status is performed. + + E.g.:: + + return session.is_modified(someobject) + + A few caveats to this method apply: + + * Instances present in the :attr:`.Session.dirty` collection may + report ``False`` when tested with this method. This is because + the object may have received change events via attribute mutation, + thus placing it in :attr:`.Session.dirty`, but ultimately the state + is the same as that loaded from the database, resulting in no net + change here. + * Scalar attributes may not have recorded the previously set + value when a new value was applied, if the attribute was not loaded, + or was expired, at the time the new value was received - in these + cases, the attribute is assumed to have a change, even if there is + ultimately no net change against its database value. SQLAlchemy in + most cases does not need the "old" value when a set event occurs, so + it skips the expense of a SQL call if the old value isn't present, + based on the assumption that an UPDATE of the scalar value is + usually needed, and in those few cases where it isn't, is less + expensive on average than issuing a defensive SELECT. + + The "old" value is fetched unconditionally upon set only if the + attribute container has the ``active_history`` flag set to ``True``. + This flag is set typically for primary key attributes and scalar + object references that are not a simple many-to-one. To set this + flag for any arbitrary mapped column, use the ``active_history`` + argument with :func:`.column_property`. + + :param instance: mapped instance to be tested for pending changes. + :param include_collections: Indicates if multivalued collections + should be included in the operation. Setting this to ``False`` is a + way to detect only local-column based properties (i.e. scalar columns + or many-to-one foreign keys) that would result in an UPDATE for this + instance upon flush. + + """ + state = object_state(instance) + + if not state.modified: + return False + + dict_ = state.dict + + for attr in state.manager.attributes: + if ( + not include_collections + and hasattr(attr.impl, "get_collection") + ) or not hasattr(attr.impl, "get_history"): + continue + + (added, unchanged, deleted) = attr.impl.get_history( + state, dict_, passive=PassiveFlag.NO_CHANGE + ) + + if added or deleted: + return True + else: + return False + + @property + def is_active(self) -> bool: + """True if this :class:`.Session` not in "partial rollback" state. + + .. versionchanged:: 1.4 The :class:`_orm.Session` no longer begins + a new transaction immediately, so this attribute will be False + when the :class:`_orm.Session` is first instantiated. + + "partial rollback" state typically indicates that the flush process + of the :class:`_orm.Session` has failed, and that the + :meth:`_orm.Session.rollback` method must be emitted in order to + fully roll back the transaction. + + If this :class:`_orm.Session` is not in a transaction at all, the + :class:`_orm.Session` will autobegin when it is first used, so in this + case :attr:`_orm.Session.is_active` will return True. + + Otherwise, if this :class:`_orm.Session` is within a transaction, + and that transaction has not been rolled back internally, the + :attr:`_orm.Session.is_active` will also return True. + + .. seealso:: + + :ref:`faq_session_rollback` + + :meth:`_orm.Session.in_transaction` + + """ + return self._transaction is None or self._transaction.is_active + + @property + def _dirty_states(self) -> Iterable[InstanceState[Any]]: + """The set of all persistent states considered dirty. + + This method returns all states that were modified including + those that were possibly deleted. + + """ + return self.identity_map._dirty_states() + + @property + def dirty(self) -> IdentitySet: + """The set of all persistent instances considered dirty. + + E.g.:: + + some_mapped_object in session.dirty + + Instances are considered dirty when they were modified but not + deleted. + + Note that this 'dirty' calculation is 'optimistic'; most + attribute-setting or collection modification operations will + mark an instance as 'dirty' and place it in this set, even if + there is no net change to the attribute's value. At flush + time, the value of each attribute is compared to its + previously saved value, and if there's no net change, no SQL + operation will occur (this is a more expensive operation so + it's only done at flush time). + + To check if an instance has actionable net changes to its + attributes, use the :meth:`.Session.is_modified` method. + + """ + return IdentitySet( + [ + state.obj() + for state in self._dirty_states + if state not in self._deleted + ] + ) + + @property + def deleted(self) -> IdentitySet: + "The set of all instances marked as 'deleted' within this ``Session``" + + return util.IdentitySet(list(self._deleted.values())) + + @property + def new(self) -> IdentitySet: + "The set of all instances marked as 'new' within this ``Session``." + + return util.IdentitySet(list(self._new.values())) + + +_S = TypeVar("_S", bound="Session") + + +class sessionmaker(_SessionClassMethods, Generic[_S]): + """A configurable :class:`.Session` factory. + + The :class:`.sessionmaker` factory generates new + :class:`.Session` objects when called, creating them given + the configurational arguments established here. + + e.g.:: + + from sqlalchemy import create_engine + from sqlalchemy.orm import sessionmaker + + # an Engine, which the Session will use for connection + # resources + engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/') + + Session = sessionmaker(engine) + + with Session() as session: + session.add(some_object) + session.add(some_other_object) + session.commit() + + Context manager use is optional; otherwise, the returned + :class:`_orm.Session` object may be closed explicitly via the + :meth:`_orm.Session.close` method. Using a + ``try:/finally:`` block is optional, however will ensure that the close + takes place even if there are database errors:: + + session = Session() + try: + session.add(some_object) + session.add(some_other_object) + session.commit() + finally: + session.close() + + :class:`.sessionmaker` acts as a factory for :class:`_orm.Session` + objects in the same way as an :class:`_engine.Engine` acts as a factory + for :class:`_engine.Connection` objects. In this way it also includes + a :meth:`_orm.sessionmaker.begin` method, that provides a context + manager which both begins and commits a transaction, as well as closes + out the :class:`_orm.Session` when complete, rolling back the transaction + if any errors occur:: + + Session = sessionmaker(engine) + + with Session.begin() as session: + session.add(some_object) + session.add(some_other_object) + # commits transaction, closes session + + .. versionadded:: 1.4 + + When calling upon :class:`_orm.sessionmaker` to construct a + :class:`_orm.Session`, keyword arguments may also be passed to the + method; these arguments will override that of the globally configured + parameters. Below we use a :class:`_orm.sessionmaker` bound to a certain + :class:`_engine.Engine` to produce a :class:`_orm.Session` that is instead + bound to a specific :class:`_engine.Connection` procured from that engine:: + + Session = sessionmaker(engine) + + # bind an individual session to a connection + + with engine.connect() as connection: + with Session(bind=connection) as session: + # work with session + + The class also includes a method :meth:`_orm.sessionmaker.configure`, which + can be used to specify additional keyword arguments to the factory, which + will take effect for subsequent :class:`.Session` objects generated. This + is usually used to associate one or more :class:`_engine.Engine` objects + with an existing + :class:`.sessionmaker` factory before it is first used:: + + # application starts, sessionmaker does not have + # an engine bound yet + Session = sessionmaker() + + # ... later, when an engine URL is read from a configuration + # file or other events allow the engine to be created + engine = create_engine('sqlite:///foo.db') + Session.configure(bind=engine) + + sess = Session() + # work with session + + .. seealso:: + + :ref:`session_getting` - introductory text on creating + sessions using :class:`.sessionmaker`. + + """ + + class_: Type[_S] + + @overload + def __init__( + self, + bind: Optional[_SessionBind] = ..., + *, + class_: Type[_S], + autoflush: bool = ..., + expire_on_commit: bool = ..., + info: Optional[_InfoType] = ..., + **kw: Any, + ): ... + + @overload + def __init__( + self: "sessionmaker[Session]", + bind: Optional[_SessionBind] = ..., + *, + autoflush: bool = ..., + expire_on_commit: bool = ..., + info: Optional[_InfoType] = ..., + **kw: Any, + ): ... + + def __init__( + self, + bind: Optional[_SessionBind] = None, + *, + class_: Type[_S] = Session, # type: ignore + autoflush: bool = True, + expire_on_commit: bool = True, + info: Optional[_InfoType] = None, + **kw: Any, + ): + r"""Construct a new :class:`.sessionmaker`. + + All arguments here except for ``class_`` correspond to arguments + accepted by :class:`.Session` directly. See the + :meth:`.Session.__init__` docstring for more details on parameters. + + :param bind: a :class:`_engine.Engine` or other :class:`.Connectable` + with + which newly created :class:`.Session` objects will be associated. + :param class\_: class to use in order to create new :class:`.Session` + objects. Defaults to :class:`.Session`. + :param autoflush: The autoflush setting to use with newly created + :class:`.Session` objects. + + .. seealso:: + + :ref:`session_flushing` - additional background on autoflush + + :param expire_on_commit=True: the + :paramref:`_orm.Session.expire_on_commit` setting to use + with newly created :class:`.Session` objects. + + :param info: optional dictionary of information that will be available + via :attr:`.Session.info`. Note this dictionary is *updated*, not + replaced, when the ``info`` parameter is specified to the specific + :class:`.Session` construction operation. + + :param \**kw: all other keyword arguments are passed to the + constructor of newly created :class:`.Session` objects. + + """ + kw["bind"] = bind + kw["autoflush"] = autoflush + kw["expire_on_commit"] = expire_on_commit + if info is not None: + kw["info"] = info + self.kw = kw + # make our own subclass of the given class, so that + # events can be associated with it specifically. + self.class_ = type(class_.__name__, (class_,), {}) + + def begin(self) -> contextlib.AbstractContextManager[_S]: + """Produce a context manager that both provides a new + :class:`_orm.Session` as well as a transaction that commits. + + + e.g.:: + + Session = sessionmaker(some_engine) + + with Session.begin() as session: + session.add(some_object) + + # commits transaction, closes session + + .. versionadded:: 1.4 + + + """ + + session = self() + return session._maker_context_manager() + + def __call__(self, **local_kw: Any) -> _S: + """Produce a new :class:`.Session` object using the configuration + established in this :class:`.sessionmaker`. + + In Python, the ``__call__`` method is invoked on an object when + it is "called" in the same way as a function:: + + Session = sessionmaker(some_engine) + session = Session() # invokes sessionmaker.__call__() + + """ + for k, v in self.kw.items(): + if k == "info" and "info" in local_kw: + d = v.copy() + d.update(local_kw["info"]) + local_kw["info"] = d + else: + local_kw.setdefault(k, v) + return self.class_(**local_kw) + + def configure(self, **new_kw: Any) -> None: + """(Re)configure the arguments for this sessionmaker. + + e.g.:: + + Session = sessionmaker() + + Session.configure(bind=create_engine('sqlite://')) + """ + self.kw.update(new_kw) + + def __repr__(self) -> str: + return "%s(class_=%r, %s)" % ( + self.__class__.__name__, + self.class_.__name__, + ", ".join("%s=%r" % (k, v) for k, v in self.kw.items()), + ) + + +def close_all_sessions() -> None: + """Close all sessions in memory. + + This function consults a global registry of all :class:`.Session` objects + and calls :meth:`.Session.close` on them, which resets them to a clean + state. + + This function is not for general use but may be useful for test suites + within the teardown scheme. + + .. versionadded:: 1.3 + + """ + + for sess in _sessions.values(): + sess.close() + + +def make_transient(instance: object) -> None: + """Alter the state of the given instance so that it is :term:`transient`. + + .. note:: + + :func:`.make_transient` is a special-case function for + advanced use cases only. + + The given mapped instance is assumed to be in the :term:`persistent` or + :term:`detached` state. The function will remove its association with any + :class:`.Session` as well as its :attr:`.InstanceState.identity`. The + effect is that the object will behave as though it were newly constructed, + except retaining any attribute / collection values that were loaded at the + time of the call. The :attr:`.InstanceState.deleted` flag is also reset + if this object had been deleted as a result of using + :meth:`.Session.delete`. + + .. warning:: + + :func:`.make_transient` does **not** "unexpire" or otherwise eagerly + load ORM-mapped attributes that are not currently loaded at the time + the function is called. This includes attributes which: + + * were expired via :meth:`.Session.expire` + + * were expired as the natural effect of committing a session + transaction, e.g. :meth:`.Session.commit` + + * are normally :term:`lazy loaded` but are not currently loaded + + * are "deferred" (see :ref:`orm_queryguide_column_deferral`) and are + not yet loaded + + * were not present in the query which loaded this object, such as that + which is common in joined table inheritance and other scenarios. + + After :func:`.make_transient` is called, unloaded attributes such + as those above will normally resolve to the value ``None`` when + accessed, or an empty collection for a collection-oriented attribute. + As the object is transient and un-associated with any database + identity, it will no longer retrieve these values. + + .. seealso:: + + :func:`.make_transient_to_detached` + + """ + state = attributes.instance_state(instance) + s = _state_session(state) + if s: + s._expunge_states([state]) + + # remove expired state + state.expired_attributes.clear() + + # remove deferred callables + if state.callables: + del state.callables + + if state.key: + del state.key + if state._deleted: + del state._deleted + + +def make_transient_to_detached(instance: object) -> None: + """Make the given transient instance :term:`detached`. + + .. note:: + + :func:`.make_transient_to_detached` is a special-case function for + advanced use cases only. + + All attribute history on the given instance + will be reset as though the instance were freshly loaded + from a query. Missing attributes will be marked as expired. + The primary key attributes of the object, which are required, will be made + into the "key" of the instance. + + The object can then be added to a session, or merged + possibly with the load=False flag, at which point it will look + as if it were loaded that way, without emitting SQL. + + This is a special use case function that differs from a normal + call to :meth:`.Session.merge` in that a given persistent state + can be manufactured without any SQL calls. + + .. seealso:: + + :func:`.make_transient` + + :meth:`.Session.enable_relationship_loading` + + """ + state = attributes.instance_state(instance) + if state.session_id or state.key: + raise sa_exc.InvalidRequestError("Given object must be transient") + state.key = state.mapper._identity_key_from_state(state) + if state._deleted: + del state._deleted + state._commit_all(state.dict) + state._expire_attributes(state.dict, state.unloaded) + + +def object_session(instance: object) -> Optional[Session]: + """Return the :class:`.Session` to which the given instance belongs. + + This is essentially the same as the :attr:`.InstanceState.session` + accessor. See that attribute for details. + + """ + + try: + state = attributes.instance_state(instance) + except exc.NO_STATE as err: + raise exc.UnmappedInstanceError(instance) from err + else: + return _state_session(state) + + +_new_sessionid = util.counter() |