summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py1936
1 files changed, 1936 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py
new file mode 100644
index 0000000..c5fe469
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py
@@ -0,0 +1,1936 @@
+# ext/asyncio/session.py
+# Copyright (C) 2020-2024 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+from __future__ import annotations
+
+import asyncio
+from typing import Any
+from typing import Awaitable
+from typing import Callable
+from typing import cast
+from typing import Dict
+from typing import Generic
+from typing import Iterable
+from typing import Iterator
+from typing import NoReturn
+from typing import Optional
+from typing import overload
+from typing import Sequence
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+
+from . import engine
+from .base import ReversibleProxy
+from .base import StartableContext
+from .result import _ensure_sync_result
+from .result import AsyncResult
+from .result import AsyncScalarResult
+from ... import util
+from ...orm import close_all_sessions as _sync_close_all_sessions
+from ...orm import object_session
+from ...orm import Session
+from ...orm import SessionTransaction
+from ...orm import state as _instance_state
+from ...util.concurrency import greenlet_spawn
+from ...util.typing import Concatenate
+from ...util.typing import ParamSpec
+
+
+if TYPE_CHECKING:
+ from .engine import AsyncConnection
+ from .engine import AsyncEngine
+ from ...engine import Connection
+ from ...engine import CursorResult
+ from ...engine import Engine
+ from ...engine import Result
+ from ...engine import Row
+ from ...engine import RowMapping
+ from ...engine import ScalarResult
+ from ...engine.interfaces import _CoreAnyExecuteParams
+ from ...engine.interfaces import CoreExecuteOptionsParameter
+ from ...event import dispatcher
+ from ...orm._typing import _IdentityKeyType
+ from ...orm._typing import _O
+ from ...orm._typing import OrmExecuteOptionsParameter
+ from ...orm.identity import IdentityMap
+ from ...orm.interfaces import ORMOption
+ from ...orm.session import _BindArguments
+ from ...orm.session import _EntityBindKey
+ from ...orm.session import _PKIdentityArgument
+ from ...orm.session import _SessionBind
+ from ...orm.session import _SessionBindKey
+ from ...sql._typing import _InfoType
+ from ...sql.base import Executable
+ from ...sql.dml import UpdateBase
+ from ...sql.elements import ClauseElement
+ from ...sql.selectable import ForUpdateParameter
+ from ...sql.selectable import TypedReturnsRows
+
+_AsyncSessionBind = Union["AsyncEngine", "AsyncConnection"]
+
+_P = ParamSpec("_P")
+_T = TypeVar("_T", bound=Any)
+
+
+_EXECUTE_OPTIONS = util.immutabledict({"prebuffer_rows": True})
+_STREAM_OPTIONS = util.immutabledict({"stream_results": True})
+
+
+class AsyncAttrs:
+ """Mixin class which provides an awaitable accessor for all attributes.
+
+ E.g.::
+
+ from __future__ import annotations
+
+ from typing import List
+
+ from sqlalchemy import ForeignKey
+ from sqlalchemy import func
+ from sqlalchemy.ext.asyncio import AsyncAttrs
+ from sqlalchemy.orm import DeclarativeBase
+ from sqlalchemy.orm import Mapped
+ from sqlalchemy.orm import mapped_column
+ from sqlalchemy.orm import relationship
+
+
+ class Base(AsyncAttrs, DeclarativeBase):
+ pass
+
+
+ class A(Base):
+ __tablename__ = "a"
+
+ id: Mapped[int] = mapped_column(primary_key=True)
+ data: Mapped[str]
+ bs: Mapped[List[B]] = relationship()
+
+
+ class B(Base):
+ __tablename__ = "b"
+ id: Mapped[int] = mapped_column(primary_key=True)
+ a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
+ data: Mapped[str]
+
+ In the above example, the :class:`_asyncio.AsyncAttrs` mixin is applied to
+ the declarative ``Base`` class where it takes effect for all subclasses.
+ This mixin adds a single new attribute
+ :attr:`_asyncio.AsyncAttrs.awaitable_attrs` to all classes, which will
+ yield the value of any attribute as an awaitable. This allows attributes
+ which may be subject to lazy loading or deferred / unexpiry loading to be
+ accessed such that IO can still be emitted::
+
+ a1 = (await async_session.scalars(select(A).where(A.id == 5))).one()
+
+ # use the lazy loader on ``a1.bs`` via the ``.awaitable_attrs``
+ # interface, so that it may be awaited
+ for b1 in await a1.awaitable_attrs.bs:
+ print(b1)
+
+ The :attr:`_asyncio.AsyncAttrs.awaitable_attrs` performs a call against the
+ attribute that is approximately equivalent to using the
+ :meth:`_asyncio.AsyncSession.run_sync` method, e.g.::
+
+ for b1 in await async_session.run_sync(lambda sess: a1.bs):
+ print(b1)
+
+ .. versionadded:: 2.0.13
+
+ .. seealso::
+
+ :ref:`asyncio_orm_avoid_lazyloads`
+
+ """
+
+ class _AsyncAttrGetitem:
+ __slots__ = "_instance"
+
+ def __init__(self, _instance: Any):
+ self._instance = _instance
+
+ def __getattr__(self, name: str) -> Awaitable[Any]:
+ return greenlet_spawn(getattr, self._instance, name)
+
+ @property
+ def awaitable_attrs(self) -> AsyncAttrs._AsyncAttrGetitem:
+ """provide a namespace of all attributes on this object wrapped
+ as awaitables.
+
+ e.g.::
+
+
+ a1 = (await async_session.scalars(select(A).where(A.id == 5))).one()
+
+ some_attribute = await a1.awaitable_attrs.some_deferred_attribute
+ some_collection = await a1.awaitable_attrs.some_collection
+
+ """ # noqa: E501
+
+ return AsyncAttrs._AsyncAttrGetitem(self)
+
+
+@util.create_proxy_methods(
+ Session,
+ ":class:`_orm.Session`",
+ ":class:`_asyncio.AsyncSession`",
+ classmethods=["object_session", "identity_key"],
+ methods=[
+ "__contains__",
+ "__iter__",
+ "add",
+ "add_all",
+ "expire",
+ "expire_all",
+ "expunge",
+ "expunge_all",
+ "is_modified",
+ "in_transaction",
+ "in_nested_transaction",
+ ],
+ attributes=[
+ "dirty",
+ "deleted",
+ "new",
+ "identity_map",
+ "is_active",
+ "autoflush",
+ "no_autoflush",
+ "info",
+ ],
+)
+class AsyncSession(ReversibleProxy[Session]):
+ """Asyncio version of :class:`_orm.Session`.
+
+ The :class:`_asyncio.AsyncSession` is a proxy for a traditional
+ :class:`_orm.Session` instance.
+
+ The :class:`_asyncio.AsyncSession` is **not safe for use in concurrent
+ tasks.**. See :ref:`session_faq_threadsafe` for background.
+
+ .. versionadded:: 1.4
+
+ To use an :class:`_asyncio.AsyncSession` with custom :class:`_orm.Session`
+ implementations, see the
+ :paramref:`_asyncio.AsyncSession.sync_session_class` parameter.
+
+
+ """
+
+ _is_asyncio = True
+
+ dispatch: dispatcher[Session]
+
+ def __init__(
+ self,
+ bind: Optional[_AsyncSessionBind] = None,
+ *,
+ binds: Optional[Dict[_SessionBindKey, _AsyncSessionBind]] = None,
+ sync_session_class: Optional[Type[Session]] = None,
+ **kw: Any,
+ ):
+ r"""Construct a new :class:`_asyncio.AsyncSession`.
+
+ All parameters other than ``sync_session_class`` are passed to the
+ ``sync_session_class`` callable directly to instantiate a new
+ :class:`_orm.Session`. Refer to :meth:`_orm.Session.__init__` for
+ parameter documentation.
+
+ :param sync_session_class:
+ A :class:`_orm.Session` subclass or other callable which will be used
+ to construct the :class:`_orm.Session` which will be proxied. This
+ parameter may be used to provide custom :class:`_orm.Session`
+ subclasses. Defaults to the
+ :attr:`_asyncio.AsyncSession.sync_session_class` class-level
+ attribute.
+
+ .. versionadded:: 1.4.24
+
+ """
+ sync_bind = sync_binds = None
+
+ if bind:
+ self.bind = bind
+ sync_bind = engine._get_sync_engine_or_connection(bind)
+
+ if binds:
+ self.binds = binds
+ sync_binds = {
+ key: engine._get_sync_engine_or_connection(b)
+ for key, b in binds.items()
+ }
+
+ if sync_session_class:
+ self.sync_session_class = sync_session_class
+
+ self.sync_session = self._proxied = self._assign_proxied(
+ self.sync_session_class(bind=sync_bind, binds=sync_binds, **kw)
+ )
+
+ sync_session_class: Type[Session] = Session
+ """The class or callable that provides the
+ underlying :class:`_orm.Session` instance for a particular
+ :class:`_asyncio.AsyncSession`.
+
+ At the class level, this attribute is the default value for the
+ :paramref:`_asyncio.AsyncSession.sync_session_class` parameter. Custom
+ subclasses of :class:`_asyncio.AsyncSession` can override this.
+
+ At the instance level, this attribute indicates the current class or
+ callable that was used to provide the :class:`_orm.Session` instance for
+ this :class:`_asyncio.AsyncSession` instance.
+
+ .. versionadded:: 1.4.24
+
+ """
+
+ sync_session: Session
+ """Reference to the underlying :class:`_orm.Session` this
+ :class:`_asyncio.AsyncSession` proxies requests towards.
+
+ This instance can be used as an event target.
+
+ .. seealso::
+
+ :ref:`asyncio_events`
+
+ """
+
+ @classmethod
+ def _no_async_engine_events(cls) -> NoReturn:
+ raise NotImplementedError(
+ "asynchronous events are not implemented at this time. Apply "
+ "synchronous listeners to the AsyncSession.sync_session."
+ )
+
+ async def refresh(
+ self,
+ instance: object,
+ attribute_names: Optional[Iterable[str]] = None,
+ with_for_update: ForUpdateParameter = None,
+ ) -> None:
+ """Expire and refresh the attributes on the given instance.
+
+ A query will be issued to the database and all attributes will be
+ refreshed with their current database value.
+
+ This is the async version of the :meth:`_orm.Session.refresh` method.
+ See that method for a complete description of all options.
+
+ .. seealso::
+
+ :meth:`_orm.Session.refresh` - main documentation for refresh
+
+ """
+
+ await greenlet_spawn(
+ self.sync_session.refresh,
+ instance,
+ attribute_names=attribute_names,
+ with_for_update=with_for_update,
+ )
+
+ async def run_sync(
+ self,
+ fn: Callable[Concatenate[Session, _P], _T],
+ *arg: _P.args,
+ **kw: _P.kwargs,
+ ) -> _T:
+ """Invoke the given synchronous (i.e. not async) callable,
+ passing a synchronous-style :class:`_orm.Session` as the first
+ argument.
+
+ This method allows traditional synchronous SQLAlchemy functions to
+ run within the context of an asyncio application.
+
+ E.g.::
+
+ def some_business_method(session: Session, param: str) -> str:
+ '''A synchronous function that does not require awaiting
+
+ :param session: a SQLAlchemy Session, used synchronously
+
+ :return: an optional return value is supported
+
+ '''
+ session.add(MyObject(param=param))
+ session.flush()
+ return "success"
+
+
+ async def do_something_async(async_engine: AsyncEngine) -> None:
+ '''an async function that uses awaiting'''
+
+ with AsyncSession(async_engine) as async_session:
+ # run some_business_method() with a sync-style
+ # Session, proxied into an awaitable
+ return_code = await async_session.run_sync(some_business_method, param="param1")
+ print(return_code)
+
+ This method maintains the asyncio event loop all the way through
+ to the database connection by running the given callable in a
+ specially instrumented greenlet.
+
+ .. tip::
+
+ The provided callable is invoked inline within the asyncio event
+ loop, and will block on traditional IO calls. IO within this
+ callable should only call into SQLAlchemy's asyncio database
+ APIs which will be properly adapted to the greenlet context.
+
+ .. seealso::
+
+ :class:`.AsyncAttrs` - a mixin for ORM mapped classes that provides
+ a similar feature more succinctly on a per-attribute basis
+
+ :meth:`.AsyncConnection.run_sync`
+
+ :ref:`session_run_sync`
+ """ # noqa: E501
+
+ return await greenlet_spawn(
+ fn, self.sync_session, *arg, _require_await=False, **kw
+ )
+
+ @overload
+ async def execute(
+ self,
+ statement: TypedReturnsRows[_T],
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ _parent_execute_state: Optional[Any] = None,
+ _add_event: Optional[Any] = None,
+ ) -> Result[_T]: ...
+
+ @overload
+ async def execute(
+ self,
+ statement: UpdateBase,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ _parent_execute_state: Optional[Any] = None,
+ _add_event: Optional[Any] = None,
+ ) -> CursorResult[Any]: ...
+
+ @overload
+ async def execute(
+ self,
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ _parent_execute_state: Optional[Any] = None,
+ _add_event: Optional[Any] = None,
+ ) -> Result[Any]: ...
+
+ async def execute(
+ self,
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> Result[Any]:
+ """Execute a statement and return a buffered
+ :class:`_engine.Result` object.
+
+ .. seealso::
+
+ :meth:`_orm.Session.execute` - main documentation for execute
+
+ """
+
+ if execution_options:
+ execution_options = util.immutabledict(execution_options).union(
+ _EXECUTE_OPTIONS
+ )
+ else:
+ execution_options = _EXECUTE_OPTIONS
+
+ result = await greenlet_spawn(
+ self.sync_session.execute,
+ statement,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ **kw,
+ )
+ return await _ensure_sync_result(result, self.execute)
+
+ @overload
+ async def scalar(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> Optional[_T]: ...
+
+ @overload
+ async def scalar(
+ self,
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> Any: ...
+
+ async def scalar(
+ self,
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> Any:
+ """Execute a statement and return a scalar result.
+
+ .. seealso::
+
+ :meth:`_orm.Session.scalar` - main documentation for scalar
+
+ """
+
+ if execution_options:
+ execution_options = util.immutabledict(execution_options).union(
+ _EXECUTE_OPTIONS
+ )
+ else:
+ execution_options = _EXECUTE_OPTIONS
+
+ return await greenlet_spawn(
+ self.sync_session.scalar,
+ statement,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ **kw,
+ )
+
+ @overload
+ async def scalars(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> ScalarResult[_T]: ...
+
+ @overload
+ async def scalars(
+ self,
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> ScalarResult[Any]: ...
+
+ async def scalars(
+ self,
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> ScalarResult[Any]:
+ """Execute a statement and return scalar results.
+
+ :return: a :class:`_result.ScalarResult` object
+
+ .. versionadded:: 1.4.24 Added :meth:`_asyncio.AsyncSession.scalars`
+
+ .. versionadded:: 1.4.26 Added
+ :meth:`_asyncio.async_scoped_session.scalars`
+
+ .. seealso::
+
+ :meth:`_orm.Session.scalars` - main documentation for scalars
+
+ :meth:`_asyncio.AsyncSession.stream_scalars` - streaming version
+
+ """
+
+ result = await self.execute(
+ statement,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ **kw,
+ )
+ return result.scalars()
+
+ async def get(
+ self,
+ entity: _EntityBindKey[_O],
+ ident: _PKIdentityArgument,
+ *,
+ options: Optional[Sequence[ORMOption]] = None,
+ populate_existing: bool = False,
+ with_for_update: ForUpdateParameter = None,
+ identity_token: Optional[Any] = None,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ ) -> Union[_O, None]:
+ """Return an instance based on the given primary key identifier,
+ or ``None`` if not found.
+
+ .. seealso::
+
+ :meth:`_orm.Session.get` - main documentation for get
+
+
+ """
+
+ return await greenlet_spawn(
+ cast("Callable[..., _O]", self.sync_session.get),
+ entity,
+ ident,
+ options=options,
+ populate_existing=populate_existing,
+ with_for_update=with_for_update,
+ identity_token=identity_token,
+ execution_options=execution_options,
+ )
+
+ async def get_one(
+ self,
+ entity: _EntityBindKey[_O],
+ ident: _PKIdentityArgument,
+ *,
+ options: Optional[Sequence[ORMOption]] = None,
+ populate_existing: bool = False,
+ with_for_update: ForUpdateParameter = None,
+ identity_token: Optional[Any] = None,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ ) -> _O:
+ """Return an instance based on the given primary key identifier,
+ or raise an exception if not found.
+
+ Raises ``sqlalchemy.orm.exc.NoResultFound`` if the query selects
+ no rows.
+
+ ..versionadded: 2.0.22
+
+ .. seealso::
+
+ :meth:`_orm.Session.get_one` - main documentation for get_one
+
+ """
+
+ return await greenlet_spawn(
+ cast("Callable[..., _O]", self.sync_session.get_one),
+ entity,
+ ident,
+ options=options,
+ populate_existing=populate_existing,
+ with_for_update=with_for_update,
+ identity_token=identity_token,
+ execution_options=execution_options,
+ )
+
+ @overload
+ async def stream(
+ self,
+ statement: TypedReturnsRows[_T],
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncResult[_T]: ...
+
+ @overload
+ async def stream(
+ self,
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncResult[Any]: ...
+
+ async def stream(
+ self,
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncResult[Any]:
+ """Execute a statement and return a streaming
+ :class:`_asyncio.AsyncResult` object.
+
+ """
+
+ if execution_options:
+ execution_options = util.immutabledict(execution_options).union(
+ _STREAM_OPTIONS
+ )
+ else:
+ execution_options = _STREAM_OPTIONS
+
+ result = await greenlet_spawn(
+ self.sync_session.execute,
+ statement,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ **kw,
+ )
+ return AsyncResult(result)
+
+ @overload
+ async def stream_scalars(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncScalarResult[_T]: ...
+
+ @overload
+ async def stream_scalars(
+ self,
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncScalarResult[Any]: ...
+
+ async def stream_scalars(
+ self,
+ statement: Executable,
+ params: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT,
+ bind_arguments: Optional[_BindArguments] = None,
+ **kw: Any,
+ ) -> AsyncScalarResult[Any]:
+ """Execute a statement and return a stream of scalar results.
+
+ :return: an :class:`_asyncio.AsyncScalarResult` object
+
+ .. versionadded:: 1.4.24
+
+ .. seealso::
+
+ :meth:`_orm.Session.scalars` - main documentation for scalars
+
+ :meth:`_asyncio.AsyncSession.scalars` - non streaming version
+
+ """
+
+ result = await self.stream(
+ statement,
+ params=params,
+ execution_options=execution_options,
+ bind_arguments=bind_arguments,
+ **kw,
+ )
+ return result.scalars()
+
+ async def delete(self, instance: object) -> None:
+ """Mark an instance as deleted.
+
+ The database delete operation occurs upon ``flush()``.
+
+ As this operation may need to cascade along unloaded relationships,
+ it is awaitable to allow for those queries to take place.
+
+ .. seealso::
+
+ :meth:`_orm.Session.delete` - main documentation for delete
+
+ """
+ await greenlet_spawn(self.sync_session.delete, instance)
+
+ async def merge(
+ self,
+ instance: _O,
+ *,
+ load: bool = True,
+ options: Optional[Sequence[ORMOption]] = None,
+ ) -> _O:
+ """Copy the state of a given instance into a corresponding instance
+ within this :class:`_asyncio.AsyncSession`.
+
+ .. seealso::
+
+ :meth:`_orm.Session.merge` - main documentation for merge
+
+ """
+ return await greenlet_spawn(
+ self.sync_session.merge, instance, load=load, options=options
+ )
+
+ async def flush(self, objects: Optional[Sequence[Any]] = None) -> None:
+ """Flush all the object changes to the database.
+
+ .. seealso::
+
+ :meth:`_orm.Session.flush` - main documentation for flush
+
+ """
+ await greenlet_spawn(self.sync_session.flush, objects=objects)
+
+ def get_transaction(self) -> Optional[AsyncSessionTransaction]:
+ """Return the current root transaction in progress, if any.
+
+ :return: an :class:`_asyncio.AsyncSessionTransaction` object, or
+ ``None``.
+
+ .. versionadded:: 1.4.18
+
+ """
+ trans = self.sync_session.get_transaction()
+ if trans is not None:
+ return AsyncSessionTransaction._retrieve_proxy_for_target(trans)
+ else:
+ return None
+
+ def get_nested_transaction(self) -> Optional[AsyncSessionTransaction]:
+ """Return the current nested transaction in progress, if any.
+
+ :return: an :class:`_asyncio.AsyncSessionTransaction` object, or
+ ``None``.
+
+ .. versionadded:: 1.4.18
+
+ """
+
+ trans = self.sync_session.get_nested_transaction()
+ if trans is not None:
+ return AsyncSessionTransaction._retrieve_proxy_for_target(trans)
+ else:
+ return None
+
+ def get_bind(
+ self,
+ mapper: Optional[_EntityBindKey[_O]] = None,
+ clause: Optional[ClauseElement] = None,
+ bind: Optional[_SessionBind] = None,
+ **kw: Any,
+ ) -> Union[Engine, Connection]:
+ """Return a "bind" to which the synchronous proxied :class:`_orm.Session`
+ is bound.
+
+ Unlike the :meth:`_orm.Session.get_bind` method, this method is
+ currently **not** used by this :class:`.AsyncSession` in any way
+ in order to resolve engines for requests.
+
+ .. note::
+
+ This method proxies directly to the :meth:`_orm.Session.get_bind`
+ method, however is currently **not** useful as an override target,
+ in contrast to that of the :meth:`_orm.Session.get_bind` method.
+ The example below illustrates how to implement custom
+ :meth:`_orm.Session.get_bind` schemes that work with
+ :class:`.AsyncSession` and :class:`.AsyncEngine`.
+
+ The pattern introduced at :ref:`session_custom_partitioning`
+ illustrates how to apply a custom bind-lookup scheme to a
+ :class:`_orm.Session` given a set of :class:`_engine.Engine` objects.
+ To apply a corresponding :meth:`_orm.Session.get_bind` implementation
+ for use with a :class:`.AsyncSession` and :class:`.AsyncEngine`
+ objects, continue to subclass :class:`_orm.Session` and apply it to
+ :class:`.AsyncSession` using
+ :paramref:`.AsyncSession.sync_session_class`. The inner method must
+ continue to return :class:`_engine.Engine` instances, which can be
+ acquired from a :class:`_asyncio.AsyncEngine` using the
+ :attr:`_asyncio.AsyncEngine.sync_engine` attribute::
+
+ # using example from "Custom Vertical Partitioning"
+
+
+ import random
+
+ from sqlalchemy.ext.asyncio import AsyncSession
+ from sqlalchemy.ext.asyncio import create_async_engine
+ from sqlalchemy.ext.asyncio import async_sessionmaker
+ from sqlalchemy.orm import Session
+
+ # construct async engines w/ async drivers
+ engines = {
+ 'leader':create_async_engine("sqlite+aiosqlite:///leader.db"),
+ 'other':create_async_engine("sqlite+aiosqlite:///other.db"),
+ 'follower1':create_async_engine("sqlite+aiosqlite:///follower1.db"),
+ 'follower2':create_async_engine("sqlite+aiosqlite:///follower2.db"),
+ }
+
+ class RoutingSession(Session):
+ def get_bind(self, mapper=None, clause=None, **kw):
+ # within get_bind(), return sync engines
+ if mapper and issubclass(mapper.class_, MyOtherClass):
+ return engines['other'].sync_engine
+ elif self._flushing or isinstance(clause, (Update, Delete)):
+ return engines['leader'].sync_engine
+ else:
+ return engines[
+ random.choice(['follower1','follower2'])
+ ].sync_engine
+
+ # apply to AsyncSession using sync_session_class
+ AsyncSessionMaker = async_sessionmaker(
+ sync_session_class=RoutingSession
+ )
+
+ The :meth:`_orm.Session.get_bind` method is called in a non-asyncio,
+ implicitly non-blocking context in the same manner as ORM event hooks
+ and functions that are invoked via :meth:`.AsyncSession.run_sync`, so
+ routines that wish to run SQL commands inside of
+ :meth:`_orm.Session.get_bind` can continue to do so using
+ blocking-style code, which will be translated to implicitly async calls
+ at the point of invoking IO on the database drivers.
+
+ """ # noqa: E501
+
+ return self.sync_session.get_bind(
+ mapper=mapper, clause=clause, bind=bind, **kw
+ )
+
+ async def connection(
+ self,
+ bind_arguments: Optional[_BindArguments] = None,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
+ **kw: Any,
+ ) -> AsyncConnection:
+ r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to
+ this :class:`.Session` object's transactional state.
+
+ This method may also be used to establish execution options for the
+ database connection used by the current transaction.
+
+ .. versionadded:: 1.4.24 Added \**kw arguments which are passed
+ through to the underlying :meth:`_orm.Session.connection` method.
+
+ .. seealso::
+
+ :meth:`_orm.Session.connection` - main documentation for
+ "connection"
+
+ """
+
+ sync_connection = await greenlet_spawn(
+ self.sync_session.connection,
+ bind_arguments=bind_arguments,
+ execution_options=execution_options,
+ **kw,
+ )
+ return engine.AsyncConnection._retrieve_proxy_for_target(
+ sync_connection
+ )
+
+ def begin(self) -> AsyncSessionTransaction:
+ """Return an :class:`_asyncio.AsyncSessionTransaction` object.
+
+ The underlying :class:`_orm.Session` will perform the
+ "begin" action when the :class:`_asyncio.AsyncSessionTransaction`
+ object is entered::
+
+ async with async_session.begin():
+ # .. ORM transaction is begun
+
+ Note that database IO will not normally occur when the session-level
+ transaction is begun, as database transactions begin on an
+ on-demand basis. However, the begin block is async to accommodate
+ for a :meth:`_orm.SessionEvents.after_transaction_create`
+ event hook that may perform IO.
+
+ For a general description of ORM begin, see
+ :meth:`_orm.Session.begin`.
+
+ """
+
+ return AsyncSessionTransaction(self)
+
+ def begin_nested(self) -> AsyncSessionTransaction:
+ """Return an :class:`_asyncio.AsyncSessionTransaction` object
+ which will begin a "nested" transaction, e.g. SAVEPOINT.
+
+ Behavior is the same as that of :meth:`_asyncio.AsyncSession.begin`.
+
+ For a general description of ORM begin nested, see
+ :meth:`_orm.Session.begin_nested`.
+
+ .. seealso::
+
+ :ref:`aiosqlite_serializable` - special workarounds required
+ with the SQLite asyncio driver in order for SAVEPOINT to work
+ correctly.
+
+ """
+
+ return AsyncSessionTransaction(self, nested=True)
+
+ async def rollback(self) -> None:
+ """Rollback the current transaction in progress.
+
+ .. seealso::
+
+ :meth:`_orm.Session.rollback` - main documentation for
+ "rollback"
+ """
+ await greenlet_spawn(self.sync_session.rollback)
+
+ async def commit(self) -> None:
+ """Commit the current transaction in progress.
+
+ .. seealso::
+
+ :meth:`_orm.Session.commit` - main documentation for
+ "commit"
+ """
+ await greenlet_spawn(self.sync_session.commit)
+
+ async def close(self) -> None:
+ """Close out the transactional resources and ORM objects used by this
+ :class:`_asyncio.AsyncSession`.
+
+ .. seealso::
+
+ :meth:`_orm.Session.close` - main documentation for
+ "close"
+
+ :ref:`session_closing` - detail on the semantics of
+ :meth:`_asyncio.AsyncSession.close` and
+ :meth:`_asyncio.AsyncSession.reset`.
+
+ """
+ await greenlet_spawn(self.sync_session.close)
+
+ async def reset(self) -> None:
+ """Close out the transactional resources and ORM objects used by this
+ :class:`_orm.Session`, resetting the session to its initial state.
+
+ .. versionadded:: 2.0.22
+
+ .. seealso::
+
+ :meth:`_orm.Session.reset` - main documentation for
+ "reset"
+
+ :ref:`session_closing` - detail on the semantics of
+ :meth:`_asyncio.AsyncSession.close` and
+ :meth:`_asyncio.AsyncSession.reset`.
+
+ """
+ await greenlet_spawn(self.sync_session.reset)
+
+ async def aclose(self) -> None:
+ """A synonym for :meth:`_asyncio.AsyncSession.close`.
+
+ The :meth:`_asyncio.AsyncSession.aclose` name is specifically
+ to support the Python standard library ``@contextlib.aclosing``
+ context manager function.
+
+ .. versionadded:: 2.0.20
+
+ """
+ await self.close()
+
+ async def invalidate(self) -> None:
+ """Close this Session, using connection invalidation.
+
+ For a complete description, see :meth:`_orm.Session.invalidate`.
+ """
+ await greenlet_spawn(self.sync_session.invalidate)
+
+ @classmethod
+ @util.deprecated(
+ "2.0",
+ "The :meth:`.AsyncSession.close_all` method is deprecated and will be "
+ "removed in a future release. Please refer to "
+ ":func:`_asyncio.close_all_sessions`.",
+ )
+ async def close_all(cls) -> None:
+ """Close all :class:`_asyncio.AsyncSession` sessions."""
+ await close_all_sessions()
+
+ async def __aenter__(self: _AS) -> _AS:
+ return self
+
+ async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None:
+ task = asyncio.create_task(self.close())
+ await asyncio.shield(task)
+
+ def _maker_context_manager(self: _AS) -> _AsyncSessionContextManager[_AS]:
+ return _AsyncSessionContextManager(self)
+
+ # START PROXY METHODS AsyncSession
+
+ # code within this block is **programmatically,
+ # statically generated** by tools/generate_proxy_methods.py
+
+ def __contains__(self, instance: object) -> bool:
+ r"""Return True if the instance is associated with this session.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class on
+ behalf of the :class:`_asyncio.AsyncSession` class.
+
+ The instance may be pending or persistent within the Session for a
+ result of True.
+
+
+ """ # noqa: E501
+
+ return self._proxied.__contains__(instance)
+
+ def __iter__(self) -> Iterator[object]:
+ r"""Iterate over all pending or persistent instances within this
+ Session.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class on
+ behalf of the :class:`_asyncio.AsyncSession` class.
+
+
+ """ # noqa: E501
+
+ return self._proxied.__iter__()
+
+ def add(self, instance: object, _warn: bool = True) -> None:
+ r"""Place an object into this :class:`_orm.Session`.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class on
+ behalf of the :class:`_asyncio.AsyncSession` class.
+
+ Objects that are in the :term:`transient` state when passed to the
+ :meth:`_orm.Session.add` method will move to the
+ :term:`pending` state, until the next flush, at which point they
+ will move to the :term:`persistent` state.
+
+ Objects that are in the :term:`detached` state when passed to the
+ :meth:`_orm.Session.add` method will move to the :term:`persistent`
+ state directly.
+
+ If the transaction used by the :class:`_orm.Session` is rolled back,
+ objects which were transient when they were passed to
+ :meth:`_orm.Session.add` will be moved back to the
+ :term:`transient` state, and will no longer be present within this
+ :class:`_orm.Session`.
+
+ .. seealso::
+
+ :meth:`_orm.Session.add_all`
+
+ :ref:`session_adding` - at :ref:`session_basics`
+
+
+ """ # noqa: E501
+
+ return self._proxied.add(instance, _warn=_warn)
+
+ def add_all(self, instances: Iterable[object]) -> None:
+ r"""Add the given collection of instances to this :class:`_orm.Session`.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class on
+ behalf of the :class:`_asyncio.AsyncSession` class.
+
+ See the documentation for :meth:`_orm.Session.add` for a general
+ behavioral description.
+
+ .. seealso::
+
+ :meth:`_orm.Session.add`
+
+ :ref:`session_adding` - at :ref:`session_basics`
+
+
+ """ # noqa: E501
+
+ return self._proxied.add_all(instances)
+
+ def expire(
+ self, instance: object, attribute_names: Optional[Iterable[str]] = None
+ ) -> None:
+ r"""Expire the attributes on an instance.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class on
+ behalf of the :class:`_asyncio.AsyncSession` class.
+
+ Marks the attributes of an instance as out of date. When an expired
+ attribute is next accessed, a query will be issued to the
+ :class:`.Session` object's current transactional context in order to
+ load all expired attributes for the given instance. Note that
+ a highly isolated transaction will return the same values as were
+ previously read in that same transaction, regardless of changes
+ in database state outside of that transaction.
+
+ To expire all objects in the :class:`.Session` simultaneously,
+ use :meth:`Session.expire_all`.
+
+ The :class:`.Session` object's default behavior is to
+ expire all state whenever the :meth:`Session.rollback`
+ or :meth:`Session.commit` methods are called, so that new
+ state can be loaded for the new transaction. For this reason,
+ calling :meth:`Session.expire` only makes sense for the specific
+ case that a non-ORM SQL statement was emitted in the current
+ transaction.
+
+ :param instance: The instance to be refreshed.
+ :param attribute_names: optional list of string attribute names
+ indicating a subset of attributes to be expired.
+
+ .. seealso::
+
+ :ref:`session_expire` - introductory material
+
+ :meth:`.Session.expire`
+
+ :meth:`.Session.refresh`
+
+ :meth:`_orm.Query.populate_existing`
+
+
+ """ # noqa: E501
+
+ return self._proxied.expire(instance, attribute_names=attribute_names)
+
+ def expire_all(self) -> None:
+ r"""Expires all persistent instances within this Session.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class on
+ behalf of the :class:`_asyncio.AsyncSession` class.
+
+ When any attributes on a persistent instance is next accessed,
+ a query will be issued using the
+ :class:`.Session` object's current transactional context in order to
+ load all expired attributes for the given instance. Note that
+ a highly isolated transaction will return the same values as were
+ previously read in that same transaction, regardless of changes
+ in database state outside of that transaction.
+
+ To expire individual objects and individual attributes
+ on those objects, use :meth:`Session.expire`.
+
+ The :class:`.Session` object's default behavior is to
+ expire all state whenever the :meth:`Session.rollback`
+ or :meth:`Session.commit` methods are called, so that new
+ state can be loaded for the new transaction. For this reason,
+ calling :meth:`Session.expire_all` is not usually needed,
+ assuming the transaction is isolated.
+
+ .. seealso::
+
+ :ref:`session_expire` - introductory material
+
+ :meth:`.Session.expire`
+
+ :meth:`.Session.refresh`
+
+ :meth:`_orm.Query.populate_existing`
+
+
+ """ # noqa: E501
+
+ return self._proxied.expire_all()
+
+ def expunge(self, instance: object) -> None:
+ r"""Remove the `instance` from this ``Session``.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class on
+ behalf of the :class:`_asyncio.AsyncSession` class.
+
+ This will free all internal references to the instance. Cascading
+ will be applied according to the *expunge* cascade rule.
+
+
+ """ # noqa: E501
+
+ return self._proxied.expunge(instance)
+
+ def expunge_all(self) -> None:
+ r"""Remove all object instances from this ``Session``.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class on
+ behalf of the :class:`_asyncio.AsyncSession` class.
+
+ This is equivalent to calling ``expunge(obj)`` on all objects in this
+ ``Session``.
+
+
+ """ # noqa: E501
+
+ return self._proxied.expunge_all()
+
+ def is_modified(
+ self, instance: object, include_collections: bool = True
+ ) -> bool:
+ r"""Return ``True`` if the given instance has locally
+ modified attributes.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class on
+ behalf of the :class:`_asyncio.AsyncSession` class.
+
+ This method retrieves the history for each instrumented
+ attribute on the instance and performs a comparison of the current
+ value to its previously committed value, if any.
+
+ It is in effect a more expensive and accurate
+ version of checking for the given instance in the
+ :attr:`.Session.dirty` collection; a full test for
+ each attribute's net "dirty" status is performed.
+
+ E.g.::
+
+ return session.is_modified(someobject)
+
+ A few caveats to this method apply:
+
+ * Instances present in the :attr:`.Session.dirty` collection may
+ report ``False`` when tested with this method. This is because
+ the object may have received change events via attribute mutation,
+ thus placing it in :attr:`.Session.dirty`, but ultimately the state
+ is the same as that loaded from the database, resulting in no net
+ change here.
+ * Scalar attributes may not have recorded the previously set
+ value when a new value was applied, if the attribute was not loaded,
+ or was expired, at the time the new value was received - in these
+ cases, the attribute is assumed to have a change, even if there is
+ ultimately no net change against its database value. SQLAlchemy in
+ most cases does not need the "old" value when a set event occurs, so
+ it skips the expense of a SQL call if the old value isn't present,
+ based on the assumption that an UPDATE of the scalar value is
+ usually needed, and in those few cases where it isn't, is less
+ expensive on average than issuing a defensive SELECT.
+
+ The "old" value is fetched unconditionally upon set only if the
+ attribute container has the ``active_history`` flag set to ``True``.
+ This flag is set typically for primary key attributes and scalar
+ object references that are not a simple many-to-one. To set this
+ flag for any arbitrary mapped column, use the ``active_history``
+ argument with :func:`.column_property`.
+
+ :param instance: mapped instance to be tested for pending changes.
+ :param include_collections: Indicates if multivalued collections
+ should be included in the operation. Setting this to ``False`` is a
+ way to detect only local-column based properties (i.e. scalar columns
+ or many-to-one foreign keys) that would result in an UPDATE for this
+ instance upon flush.
+
+
+ """ # noqa: E501
+
+ return self._proxied.is_modified(
+ instance, include_collections=include_collections
+ )
+
+ def in_transaction(self) -> bool:
+ r"""Return True if this :class:`_orm.Session` has begun a transaction.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class on
+ behalf of the :class:`_asyncio.AsyncSession` class.
+
+ .. versionadded:: 1.4
+
+ .. seealso::
+
+ :attr:`_orm.Session.is_active`
+
+
+
+ """ # noqa: E501
+
+ return self._proxied.in_transaction()
+
+ def in_nested_transaction(self) -> bool:
+ r"""Return True if this :class:`_orm.Session` has begun a nested
+ transaction, e.g. SAVEPOINT.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class on
+ behalf of the :class:`_asyncio.AsyncSession` class.
+
+ .. versionadded:: 1.4
+
+
+ """ # noqa: E501
+
+ return self._proxied.in_nested_transaction()
+
+ @property
+ def dirty(self) -> Any:
+ r"""The set of all persistent instances considered dirty.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class
+ on behalf of the :class:`_asyncio.AsyncSession` class.
+
+ E.g.::
+
+ some_mapped_object in session.dirty
+
+ Instances are considered dirty when they were modified but not
+ deleted.
+
+ Note that this 'dirty' calculation is 'optimistic'; most
+ attribute-setting or collection modification operations will
+ mark an instance as 'dirty' and place it in this set, even if
+ there is no net change to the attribute's value. At flush
+ time, the value of each attribute is compared to its
+ previously saved value, and if there's no net change, no SQL
+ operation will occur (this is a more expensive operation so
+ it's only done at flush time).
+
+ To check if an instance has actionable net changes to its
+ attributes, use the :meth:`.Session.is_modified` method.
+
+
+ """ # noqa: E501
+
+ return self._proxied.dirty
+
+ @property
+ def deleted(self) -> Any:
+ r"""The set of all instances marked as 'deleted' within this ``Session``
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class
+ on behalf of the :class:`_asyncio.AsyncSession` class.
+
+ """ # noqa: E501
+
+ return self._proxied.deleted
+
+ @property
+ def new(self) -> Any:
+ r"""The set of all instances marked as 'new' within this ``Session``.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class
+ on behalf of the :class:`_asyncio.AsyncSession` class.
+
+ """ # noqa: E501
+
+ return self._proxied.new
+
+ @property
+ def identity_map(self) -> IdentityMap:
+ r"""Proxy for the :attr:`_orm.Session.identity_map` attribute
+ on behalf of the :class:`_asyncio.AsyncSession` class.
+
+ """ # noqa: E501
+
+ return self._proxied.identity_map
+
+ @identity_map.setter
+ def identity_map(self, attr: IdentityMap) -> None:
+ self._proxied.identity_map = attr
+
+ @property
+ def is_active(self) -> Any:
+ r"""True if this :class:`.Session` not in "partial rollback" state.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class
+ on behalf of the :class:`_asyncio.AsyncSession` class.
+
+ .. versionchanged:: 1.4 The :class:`_orm.Session` no longer begins
+ a new transaction immediately, so this attribute will be False
+ when the :class:`_orm.Session` is first instantiated.
+
+ "partial rollback" state typically indicates that the flush process
+ of the :class:`_orm.Session` has failed, and that the
+ :meth:`_orm.Session.rollback` method must be emitted in order to
+ fully roll back the transaction.
+
+ If this :class:`_orm.Session` is not in a transaction at all, the
+ :class:`_orm.Session` will autobegin when it is first used, so in this
+ case :attr:`_orm.Session.is_active` will return True.
+
+ Otherwise, if this :class:`_orm.Session` is within a transaction,
+ and that transaction has not been rolled back internally, the
+ :attr:`_orm.Session.is_active` will also return True.
+
+ .. seealso::
+
+ :ref:`faq_session_rollback`
+
+ :meth:`_orm.Session.in_transaction`
+
+
+ """ # noqa: E501
+
+ return self._proxied.is_active
+
+ @property
+ def autoflush(self) -> bool:
+ r"""Proxy for the :attr:`_orm.Session.autoflush` attribute
+ on behalf of the :class:`_asyncio.AsyncSession` class.
+
+ """ # noqa: E501
+
+ return self._proxied.autoflush
+
+ @autoflush.setter
+ def autoflush(self, attr: bool) -> None:
+ self._proxied.autoflush = attr
+
+ @property
+ def no_autoflush(self) -> Any:
+ r"""Return a context manager that disables autoflush.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class
+ on behalf of the :class:`_asyncio.AsyncSession` class.
+
+ e.g.::
+
+ with session.no_autoflush:
+
+ some_object = SomeClass()
+ session.add(some_object)
+ # won't autoflush
+ some_object.related_thing = session.query(SomeRelated).first()
+
+ Operations that proceed within the ``with:`` block
+ will not be subject to flushes occurring upon query
+ access. This is useful when initializing a series
+ of objects which involve existing database queries,
+ where the uncompleted object should not yet be flushed.
+
+
+ """ # noqa: E501
+
+ return self._proxied.no_autoflush
+
+ @property
+ def info(self) -> Any:
+ r"""A user-modifiable dictionary.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class
+ on behalf of the :class:`_asyncio.AsyncSession` class.
+
+ The initial value of this dictionary can be populated using the
+ ``info`` argument to the :class:`.Session` constructor or
+ :class:`.sessionmaker` constructor or factory methods. The dictionary
+ here is always local to this :class:`.Session` and can be modified
+ independently of all other :class:`.Session` objects.
+
+
+ """ # noqa: E501
+
+ return self._proxied.info
+
+ @classmethod
+ def object_session(cls, instance: object) -> Optional[Session]:
+ r"""Return the :class:`.Session` to which an object belongs.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class on
+ behalf of the :class:`_asyncio.AsyncSession` class.
+
+ This is an alias of :func:`.object_session`.
+
+
+ """ # noqa: E501
+
+ return Session.object_session(instance)
+
+ @classmethod
+ def identity_key(
+ cls,
+ class_: Optional[Type[Any]] = None,
+ ident: Union[Any, Tuple[Any, ...]] = None,
+ *,
+ instance: Optional[Any] = None,
+ row: Optional[Union[Row[Any], RowMapping]] = None,
+ identity_token: Optional[Any] = None,
+ ) -> _IdentityKeyType[Any]:
+ r"""Return an identity key.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_orm.Session` class on
+ behalf of the :class:`_asyncio.AsyncSession` class.
+
+ This is an alias of :func:`.util.identity_key`.
+
+
+ """ # noqa: E501
+
+ return Session.identity_key(
+ class_=class_,
+ ident=ident,
+ instance=instance,
+ row=row,
+ identity_token=identity_token,
+ )
+
+ # END PROXY METHODS AsyncSession
+
+
+_AS = TypeVar("_AS", bound="AsyncSession")
+
+
+class async_sessionmaker(Generic[_AS]):
+ """A configurable :class:`.AsyncSession` factory.
+
+ The :class:`.async_sessionmaker` factory works in the same way as the
+ :class:`.sessionmaker` factory, to generate new :class:`.AsyncSession`
+ objects when called, creating them given
+ the configurational arguments established here.
+
+ e.g.::
+
+ from sqlalchemy.ext.asyncio import create_async_engine
+ from sqlalchemy.ext.asyncio import AsyncSession
+ from sqlalchemy.ext.asyncio import async_sessionmaker
+
+ async def run_some_sql(async_session: async_sessionmaker[AsyncSession]) -> None:
+ async with async_session() as session:
+ session.add(SomeObject(data="object"))
+ session.add(SomeOtherObject(name="other object"))
+ await session.commit()
+
+ async def main() -> None:
+ # an AsyncEngine, which the AsyncSession will use for connection
+ # resources
+ engine = create_async_engine('postgresql+asyncpg://scott:tiger@localhost/')
+
+ # create a reusable factory for new AsyncSession instances
+ async_session = async_sessionmaker(engine)
+
+ await run_some_sql(async_session)
+
+ await engine.dispose()
+
+ The :class:`.async_sessionmaker` is useful so that different parts
+ of a program can create new :class:`.AsyncSession` objects with a
+ fixed configuration established up front. Note that :class:`.AsyncSession`
+ objects may also be instantiated directly when not using
+ :class:`.async_sessionmaker`.
+
+ .. versionadded:: 2.0 :class:`.async_sessionmaker` provides a
+ :class:`.sessionmaker` class that's dedicated to the
+ :class:`.AsyncSession` object, including pep-484 typing support.
+
+ .. seealso::
+
+ :ref:`asyncio_orm` - shows example use
+
+ :class:`.sessionmaker` - general overview of the
+ :class:`.sessionmaker` architecture
+
+
+ :ref:`session_getting` - introductory text on creating
+ sessions using :class:`.sessionmaker`.
+
+ """ # noqa E501
+
+ class_: Type[_AS]
+
+ @overload
+ def __init__(
+ self,
+ bind: Optional[_AsyncSessionBind] = ...,
+ *,
+ class_: Type[_AS],
+ autoflush: bool = ...,
+ expire_on_commit: bool = ...,
+ info: Optional[_InfoType] = ...,
+ **kw: Any,
+ ): ...
+
+ @overload
+ def __init__(
+ self: "async_sessionmaker[AsyncSession]",
+ bind: Optional[_AsyncSessionBind] = ...,
+ *,
+ autoflush: bool = ...,
+ expire_on_commit: bool = ...,
+ info: Optional[_InfoType] = ...,
+ **kw: Any,
+ ): ...
+
+ def __init__(
+ self,
+ bind: Optional[_AsyncSessionBind] = None,
+ *,
+ class_: Type[_AS] = AsyncSession, # type: ignore
+ autoflush: bool = True,
+ expire_on_commit: bool = True,
+ info: Optional[_InfoType] = None,
+ **kw: Any,
+ ):
+ r"""Construct a new :class:`.async_sessionmaker`.
+
+ All arguments here except for ``class_`` correspond to arguments
+ accepted by :class:`.Session` directly. See the
+ :meth:`.AsyncSession.__init__` docstring for more details on
+ parameters.
+
+
+ """
+ kw["bind"] = bind
+ kw["autoflush"] = autoflush
+ kw["expire_on_commit"] = expire_on_commit
+ if info is not None:
+ kw["info"] = info
+ self.kw = kw
+ self.class_ = class_
+
+ def begin(self) -> _AsyncSessionContextManager[_AS]:
+ """Produce a context manager that both provides a new
+ :class:`_orm.AsyncSession` as well as a transaction that commits.
+
+
+ e.g.::
+
+ async def main():
+ Session = async_sessionmaker(some_engine)
+
+ async with Session.begin() as session:
+ session.add(some_object)
+
+ # commits transaction, closes session
+
+
+ """
+
+ session = self()
+ return session._maker_context_manager()
+
+ def __call__(self, **local_kw: Any) -> _AS:
+ """Produce a new :class:`.AsyncSession` object using the configuration
+ established in this :class:`.async_sessionmaker`.
+
+ In Python, the ``__call__`` method is invoked on an object when
+ it is "called" in the same way as a function::
+
+ AsyncSession = async_sessionmaker(async_engine, expire_on_commit=False)
+ session = AsyncSession() # invokes sessionmaker.__call__()
+
+ """ # noqa E501
+ for k, v in self.kw.items():
+ if k == "info" and "info" in local_kw:
+ d = v.copy()
+ d.update(local_kw["info"])
+ local_kw["info"] = d
+ else:
+ local_kw.setdefault(k, v)
+ return self.class_(**local_kw)
+
+ def configure(self, **new_kw: Any) -> None:
+ """(Re)configure the arguments for this async_sessionmaker.
+
+ e.g.::
+
+ AsyncSession = async_sessionmaker(some_engine)
+
+ AsyncSession.configure(bind=create_async_engine('sqlite+aiosqlite://'))
+ """ # noqa E501
+
+ self.kw.update(new_kw)
+
+ def __repr__(self) -> str:
+ return "%s(class_=%r, %s)" % (
+ self.__class__.__name__,
+ self.class_.__name__,
+ ", ".join("%s=%r" % (k, v) for k, v in self.kw.items()),
+ )
+
+
+class _AsyncSessionContextManager(Generic[_AS]):
+ __slots__ = ("async_session", "trans")
+
+ async_session: _AS
+ trans: AsyncSessionTransaction
+
+ def __init__(self, async_session: _AS):
+ self.async_session = async_session
+
+ async def __aenter__(self) -> _AS:
+ self.trans = self.async_session.begin()
+ await self.trans.__aenter__()
+ return self.async_session
+
+ async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None:
+ async def go() -> None:
+ await self.trans.__aexit__(type_, value, traceback)
+ await self.async_session.__aexit__(type_, value, traceback)
+
+ task = asyncio.create_task(go())
+ await asyncio.shield(task)
+
+
+class AsyncSessionTransaction(
+ ReversibleProxy[SessionTransaction],
+ StartableContext["AsyncSessionTransaction"],
+):
+ """A wrapper for the ORM :class:`_orm.SessionTransaction` object.
+
+ This object is provided so that a transaction-holding object
+ for the :meth:`_asyncio.AsyncSession.begin` may be returned.
+
+ The object supports both explicit calls to
+ :meth:`_asyncio.AsyncSessionTransaction.commit` and
+ :meth:`_asyncio.AsyncSessionTransaction.rollback`, as well as use as an
+ async context manager.
+
+
+ .. versionadded:: 1.4
+
+ """
+
+ __slots__ = ("session", "sync_transaction", "nested")
+
+ session: AsyncSession
+ sync_transaction: Optional[SessionTransaction]
+
+ def __init__(self, session: AsyncSession, nested: bool = False):
+ self.session = session
+ self.nested = nested
+ self.sync_transaction = None
+
+ @property
+ def is_active(self) -> bool:
+ return (
+ self._sync_transaction() is not None
+ and self._sync_transaction().is_active
+ )
+
+ def _sync_transaction(self) -> SessionTransaction:
+ if not self.sync_transaction:
+ self._raise_for_not_started()
+ return self.sync_transaction
+
+ async def rollback(self) -> None:
+ """Roll back this :class:`_asyncio.AsyncTransaction`."""
+ await greenlet_spawn(self._sync_transaction().rollback)
+
+ async def commit(self) -> None:
+ """Commit this :class:`_asyncio.AsyncTransaction`."""
+
+ await greenlet_spawn(self._sync_transaction().commit)
+
+ async def start(
+ self, is_ctxmanager: bool = False
+ ) -> AsyncSessionTransaction:
+ self.sync_transaction = self._assign_proxied(
+ await greenlet_spawn(
+ self.session.sync_session.begin_nested # type: ignore
+ if self.nested
+ else self.session.sync_session.begin
+ )
+ )
+ if is_ctxmanager:
+ self.sync_transaction.__enter__()
+ return self
+
+ async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None:
+ await greenlet_spawn(
+ self._sync_transaction().__exit__, type_, value, traceback
+ )
+
+
+def async_object_session(instance: object) -> Optional[AsyncSession]:
+ """Return the :class:`_asyncio.AsyncSession` to which the given instance
+ belongs.
+
+ This function makes use of the sync-API function
+ :class:`_orm.object_session` to retrieve the :class:`_orm.Session` which
+ refers to the given instance, and from there links it to the original
+ :class:`_asyncio.AsyncSession`.
+
+ If the :class:`_asyncio.AsyncSession` has been garbage collected, the
+ return value is ``None``.
+
+ This functionality is also available from the
+ :attr:`_orm.InstanceState.async_session` accessor.
+
+ :param instance: an ORM mapped instance
+ :return: an :class:`_asyncio.AsyncSession` object, or ``None``.
+
+ .. versionadded:: 1.4.18
+
+ """
+
+ session = object_session(instance)
+ if session is not None:
+ return async_session(session)
+ else:
+ return None
+
+
+def async_session(session: Session) -> Optional[AsyncSession]:
+ """Return the :class:`_asyncio.AsyncSession` which is proxying the given
+ :class:`_orm.Session` object, if any.
+
+ :param session: a :class:`_orm.Session` instance.
+ :return: a :class:`_asyncio.AsyncSession` instance, or ``None``.
+
+ .. versionadded:: 1.4.18
+
+ """
+ return AsyncSession._retrieve_proxy_for_target(session, regenerate=False)
+
+
+async def close_all_sessions() -> None:
+ """Close all :class:`_asyncio.AsyncSession` sessions.
+
+ .. versionadded:: 2.0.23
+
+ .. seealso::
+
+ :func:`.session.close_all_sessions`
+
+ """
+ await greenlet_spawn(_sync_close_all_sessions)
+
+
+_instance_state._async_provider = async_session # type: ignore