diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio')
14 files changed, 6302 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__init__.py b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__init__.py new file mode 100644 index 0000000..78c707b --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__init__.py @@ -0,0 +1,25 @@ +# ext/asyncio/__init__.py +# Copyright (C) 2020-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +from .engine import async_engine_from_config as async_engine_from_config +from .engine import AsyncConnection as AsyncConnection +from .engine import AsyncEngine as AsyncEngine +from .engine import AsyncTransaction as AsyncTransaction +from .engine import create_async_engine as create_async_engine +from .engine import create_async_pool_from_url as create_async_pool_from_url +from .result import AsyncMappingResult as AsyncMappingResult +from .result import AsyncResult as AsyncResult +from .result import AsyncScalarResult as AsyncScalarResult +from .result import AsyncTupleResult as AsyncTupleResult +from .scoping import async_scoped_session as async_scoped_session +from .session import async_object_session as async_object_session +from .session import async_session as async_session +from .session import async_sessionmaker as async_sessionmaker +from .session import AsyncAttrs as AsyncAttrs +from .session import AsyncSession as AsyncSession +from .session import AsyncSessionTransaction as AsyncSessionTransaction +from .session import close_all_sessions as close_all_sessions diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..a647d42 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/base.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..785ef03 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/base.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/engine.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/engine.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..4326d1c --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/engine.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/exc.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/exc.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..5a71fac --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/exc.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/result.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/result.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..c6ae583 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/result.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/scoping.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/scoping.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..8839d42 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/scoping.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/session.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/session.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..0c267a0 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/__pycache__/session.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/base.py new file mode 100644 index 0000000..9899364 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/base.py @@ -0,0 +1,279 @@ +# ext/asyncio/base.py +# Copyright (C) 2020-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +from __future__ import annotations + +import abc +import functools +from typing import Any +from typing import AsyncGenerator +from typing import AsyncIterator +from typing import Awaitable +from typing import Callable +from typing import ClassVar +from typing import Dict +from typing import Generator +from typing import Generic +from typing import NoReturn +from typing import Optional +from typing import overload +from typing import Tuple +from typing import TypeVar +import weakref + +from . import exc as async_exc +from ... import util +from ...util.typing import Literal +from ...util.typing import Self + +_T = TypeVar("_T", bound=Any) +_T_co = TypeVar("_T_co", bound=Any, covariant=True) + + +_PT = TypeVar("_PT", bound=Any) + + +class ReversibleProxy(Generic[_PT]): + _proxy_objects: ClassVar[ + Dict[weakref.ref[Any], weakref.ref[ReversibleProxy[Any]]] + ] = {} + __slots__ = ("__weakref__",) + + @overload + def _assign_proxied(self, target: _PT) -> _PT: ... + + @overload + def _assign_proxied(self, target: None) -> None: ... + + def _assign_proxied(self, target: Optional[_PT]) -> Optional[_PT]: + if target is not None: + target_ref: weakref.ref[_PT] = weakref.ref( + target, ReversibleProxy._target_gced + ) + proxy_ref = weakref.ref( + self, + functools.partial(ReversibleProxy._target_gced, target_ref), + ) + ReversibleProxy._proxy_objects[target_ref] = proxy_ref + + return target + + @classmethod + def _target_gced( + cls, + ref: weakref.ref[_PT], + proxy_ref: Optional[weakref.ref[Self]] = None, # noqa: U100 + ) -> None: + cls._proxy_objects.pop(ref, None) + + @classmethod + def _regenerate_proxy_for_target(cls, target: _PT) -> Self: + raise NotImplementedError() + + @overload + @classmethod + def _retrieve_proxy_for_target( + cls, + target: _PT, + regenerate: Literal[True] = ..., + ) -> Self: ... + + @overload + @classmethod + def _retrieve_proxy_for_target( + cls, target: _PT, regenerate: bool = True + ) -> Optional[Self]: ... + + @classmethod + def _retrieve_proxy_for_target( + cls, target: _PT, regenerate: bool = True + ) -> Optional[Self]: + try: + proxy_ref = cls._proxy_objects[weakref.ref(target)] + except KeyError: + pass + else: + proxy = proxy_ref() + if proxy is not None: + return proxy # type: ignore + + if regenerate: + return cls._regenerate_proxy_for_target(target) + else: + return None + + +class StartableContext(Awaitable[_T_co], abc.ABC): + __slots__ = () + + @abc.abstractmethod + async def start(self, is_ctxmanager: bool = False) -> _T_co: + raise NotImplementedError() + + def __await__(self) -> Generator[Any, Any, _T_co]: + return self.start().__await__() + + async def __aenter__(self) -> _T_co: + return await self.start(is_ctxmanager=True) + + @abc.abstractmethod + async def __aexit__( + self, type_: Any, value: Any, traceback: Any + ) -> Optional[bool]: + pass + + def _raise_for_not_started(self) -> NoReturn: + raise async_exc.AsyncContextNotStarted( + "%s context has not been started and object has not been awaited." + % (self.__class__.__name__) + ) + + +class GeneratorStartableContext(StartableContext[_T_co]): + __slots__ = ("gen",) + + gen: AsyncGenerator[_T_co, Any] + + def __init__( + self, + func: Callable[..., AsyncIterator[_T_co]], + args: Tuple[Any, ...], + kwds: Dict[str, Any], + ): + self.gen = func(*args, **kwds) # type: ignore + + async def start(self, is_ctxmanager: bool = False) -> _T_co: + try: + start_value = await util.anext_(self.gen) + except StopAsyncIteration: + raise RuntimeError("generator didn't yield") from None + + # if not a context manager, then interrupt the generator, don't + # let it complete. this step is technically not needed, as the + # generator will close in any case at gc time. not clear if having + # this here is a good idea or not (though it helps for clarity IMO) + if not is_ctxmanager: + await self.gen.aclose() + + return start_value + + async def __aexit__( + self, typ: Any, value: Any, traceback: Any + ) -> Optional[bool]: + # vendored from contextlib.py + if typ is None: + try: + await util.anext_(self.gen) + except StopAsyncIteration: + return False + else: + raise RuntimeError("generator didn't stop") + else: + if value is None: + # Need to force instantiation so we can reliably + # tell if we get the same exception back + value = typ() + try: + await self.gen.athrow(value) + except StopAsyncIteration as exc: + # Suppress StopIteration *unless* it's the same exception that + # was passed to throw(). This prevents a StopIteration + # raised inside the "with" statement from being suppressed. + return exc is not value + except RuntimeError as exc: + # Don't re-raise the passed in exception. (issue27122) + if exc is value: + return False + # Avoid suppressing if a Stop(Async)Iteration exception + # was passed to athrow() and later wrapped into a RuntimeError + # (see PEP 479 for sync generators; async generators also + # have this behavior). But do this only if the exception + # wrapped + # by the RuntimeError is actully Stop(Async)Iteration (see + # issue29692). + if ( + isinstance(value, (StopIteration, StopAsyncIteration)) + and exc.__cause__ is value + ): + return False + raise + except BaseException as exc: + # only re-raise if it's *not* the exception that was + # passed to throw(), because __exit__() must not raise + # an exception unless __exit__() itself failed. But throw() + # has to raise the exception to signal propagation, so this + # fixes the impedance mismatch between the throw() protocol + # and the __exit__() protocol. + if exc is not value: + raise + return False + raise RuntimeError("generator didn't stop after athrow()") + + +def asyncstartablecontext( + func: Callable[..., AsyncIterator[_T_co]] +) -> Callable[..., GeneratorStartableContext[_T_co]]: + """@asyncstartablecontext decorator. + + the decorated function can be called either as ``async with fn()``, **or** + ``await fn()``. This is decidedly different from what + ``@contextlib.asynccontextmanager`` supports, and the usage pattern + is different as well. + + Typical usage:: + + @asyncstartablecontext + async def some_async_generator(<arguments>): + <setup> + try: + yield <value> + except GeneratorExit: + # return value was awaited, no context manager is present + # and caller will .close() the resource explicitly + pass + else: + <context manager cleanup> + + + Above, ``GeneratorExit`` is caught if the function were used as an + ``await``. In this case, it's essential that the cleanup does **not** + occur, so there should not be a ``finally`` block. + + If ``GeneratorExit`` is not invoked, this means we're in ``__aexit__`` + and we were invoked as a context manager, and cleanup should proceed. + + + """ + + @functools.wraps(func) + def helper(*args: Any, **kwds: Any) -> GeneratorStartableContext[_T_co]: + return GeneratorStartableContext(func, args, kwds) + + return helper + + +class ProxyComparable(ReversibleProxy[_PT]): + __slots__ = () + + @util.ro_non_memoized_property + def _proxied(self) -> _PT: + raise NotImplementedError() + + def __hash__(self) -> int: + return id(self) + + def __eq__(self, other: Any) -> bool: + return ( + isinstance(other, self.__class__) + and self._proxied == other._proxied + ) + + def __ne__(self, other: Any) -> bool: + return ( + not isinstance(other, self.__class__) + or self._proxied != other._proxied + ) diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py new file mode 100644 index 0000000..8fc8e96 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py @@ -0,0 +1,1466 @@ +# ext/asyncio/engine.py +# Copyright (C) 2020-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +from __future__ import annotations + +import asyncio +import contextlib +from typing import Any +from typing import AsyncIterator +from typing import Callable +from typing import Dict +from typing import Generator +from typing import NoReturn +from typing import Optional +from typing import overload +from typing import Tuple +from typing import Type +from typing import TYPE_CHECKING +from typing import TypeVar +from typing import Union + +from . import exc as async_exc +from .base import asyncstartablecontext +from .base import GeneratorStartableContext +from .base import ProxyComparable +from .base import StartableContext +from .result import _ensure_sync_result +from .result import AsyncResult +from .result import AsyncScalarResult +from ... import exc +from ... import inspection +from ... import util +from ...engine import Connection +from ...engine import create_engine as _create_engine +from ...engine import create_pool_from_url as _create_pool_from_url +from ...engine import Engine +from ...engine.base import NestedTransaction +from ...engine.base import Transaction +from ...exc import ArgumentError +from ...util.concurrency import greenlet_spawn +from ...util.typing import Concatenate +from ...util.typing import ParamSpec + +if TYPE_CHECKING: + from ...engine.cursor import CursorResult + from ...engine.interfaces import _CoreAnyExecuteParams + from ...engine.interfaces import _CoreSingleExecuteParams + from ...engine.interfaces import _DBAPIAnyExecuteParams + from ...engine.interfaces import _ExecuteOptions + from ...engine.interfaces import CompiledCacheType + from ...engine.interfaces import CoreExecuteOptionsParameter + from ...engine.interfaces import Dialect + from ...engine.interfaces import IsolationLevel + from ...engine.interfaces import SchemaTranslateMapType + from ...engine.result import ScalarResult + from ...engine.url import URL + from ...pool import Pool + from ...pool import PoolProxiedConnection + from ...sql._typing import _InfoType + from ...sql.base import Executable + from ...sql.selectable import TypedReturnsRows + +_P = ParamSpec("_P") +_T = TypeVar("_T", bound=Any) + + +def create_async_engine(url: Union[str, URL], **kw: Any) -> AsyncEngine: + """Create a new async engine instance. + + Arguments passed to :func:`_asyncio.create_async_engine` are mostly + identical to those passed to the :func:`_sa.create_engine` function. + The specified dialect must be an asyncio-compatible dialect + such as :ref:`dialect-postgresql-asyncpg`. + + .. versionadded:: 1.4 + + :param async_creator: an async callable which returns a driver-level + asyncio connection. If given, the function should take no arguments, + and return a new asyncio connection from the underlying asyncio + database driver; the connection will be wrapped in the appropriate + structures to be used with the :class:`.AsyncEngine`. Note that the + parameters specified in the URL are not applied here, and the creator + function should use its own connection parameters. + + This parameter is the asyncio equivalent of the + :paramref:`_sa.create_engine.creator` parameter of the + :func:`_sa.create_engine` function. + + .. versionadded:: 2.0.16 + + """ + + if kw.get("server_side_cursors", False): + raise async_exc.AsyncMethodRequired( + "Can't set server_side_cursors for async engine globally; " + "use the connection.stream() method for an async " + "streaming result set" + ) + kw["_is_async"] = True + async_creator = kw.pop("async_creator", None) + if async_creator: + if kw.get("creator", None): + raise ArgumentError( + "Can only specify one of 'async_creator' or 'creator', " + "not both." + ) + + def creator() -> Any: + # note that to send adapted arguments like + # prepared_statement_cache_size, user would use + # "creator" and emulate this form here + return sync_engine.dialect.dbapi.connect( # type: ignore + async_creator_fn=async_creator + ) + + kw["creator"] = creator + sync_engine = _create_engine(url, **kw) + return AsyncEngine(sync_engine) + + +def async_engine_from_config( + configuration: Dict[str, Any], prefix: str = "sqlalchemy.", **kwargs: Any +) -> AsyncEngine: + """Create a new AsyncEngine instance using a configuration dictionary. + + This function is analogous to the :func:`_sa.engine_from_config` function + in SQLAlchemy Core, except that the requested dialect must be an + asyncio-compatible dialect such as :ref:`dialect-postgresql-asyncpg`. + The argument signature of the function is identical to that + of :func:`_sa.engine_from_config`. + + .. versionadded:: 1.4.29 + + """ + options = { + key[len(prefix) :]: value + for key, value in configuration.items() + if key.startswith(prefix) + } + options["_coerce_config"] = True + options.update(kwargs) + url = options.pop("url") + return create_async_engine(url, **options) + + +def create_async_pool_from_url(url: Union[str, URL], **kwargs: Any) -> Pool: + """Create a new async engine instance. + + Arguments passed to :func:`_asyncio.create_async_pool_from_url` are mostly + identical to those passed to the :func:`_sa.create_pool_from_url` function. + The specified dialect must be an asyncio-compatible dialect + such as :ref:`dialect-postgresql-asyncpg`. + + .. versionadded:: 2.0.10 + + """ + kwargs["_is_async"] = True + return _create_pool_from_url(url, **kwargs) + + +class AsyncConnectable: + __slots__ = "_slots_dispatch", "__weakref__" + + @classmethod + def _no_async_engine_events(cls) -> NoReturn: + raise NotImplementedError( + "asynchronous events are not implemented at this time. Apply " + "synchronous listeners to the AsyncEngine.sync_engine or " + "AsyncConnection.sync_connection attributes." + ) + + +@util.create_proxy_methods( + Connection, + ":class:`_engine.Connection`", + ":class:`_asyncio.AsyncConnection`", + classmethods=[], + methods=[], + attributes=[ + "closed", + "invalidated", + "dialect", + "default_isolation_level", + ], +) +class AsyncConnection( + ProxyComparable[Connection], + StartableContext["AsyncConnection"], + AsyncConnectable, +): + """An asyncio proxy for a :class:`_engine.Connection`. + + :class:`_asyncio.AsyncConnection` is acquired using the + :meth:`_asyncio.AsyncEngine.connect` + method of :class:`_asyncio.AsyncEngine`:: + + from sqlalchemy.ext.asyncio import create_async_engine + engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname") + + async with engine.connect() as conn: + result = await conn.execute(select(table)) + + .. versionadded:: 1.4 + + """ # noqa + + # AsyncConnection is a thin proxy; no state should be added here + # that is not retrievable from the "sync" engine / connection, e.g. + # current transaction, info, etc. It should be possible to + # create a new AsyncConnection that matches this one given only the + # "sync" elements. + __slots__ = ( + "engine", + "sync_engine", + "sync_connection", + ) + + def __init__( + self, + async_engine: AsyncEngine, + sync_connection: Optional[Connection] = None, + ): + self.engine = async_engine + self.sync_engine = async_engine.sync_engine + self.sync_connection = self._assign_proxied(sync_connection) + + sync_connection: Optional[Connection] + """Reference to the sync-style :class:`_engine.Connection` this + :class:`_asyncio.AsyncConnection` proxies requests towards. + + This instance can be used as an event target. + + .. seealso:: + + :ref:`asyncio_events` + + """ + + sync_engine: Engine + """Reference to the sync-style :class:`_engine.Engine` this + :class:`_asyncio.AsyncConnection` is associated with via its underlying + :class:`_engine.Connection`. + + This instance can be used as an event target. + + .. seealso:: + + :ref:`asyncio_events` + + """ + + @classmethod + def _regenerate_proxy_for_target( + cls, target: Connection + ) -> AsyncConnection: + return AsyncConnection( + AsyncEngine._retrieve_proxy_for_target(target.engine), target + ) + + async def start( + self, is_ctxmanager: bool = False # noqa: U100 + ) -> AsyncConnection: + """Start this :class:`_asyncio.AsyncConnection` object's context + outside of using a Python ``with:`` block. + + """ + if self.sync_connection: + raise exc.InvalidRequestError("connection is already started") + self.sync_connection = self._assign_proxied( + await greenlet_spawn(self.sync_engine.connect) + ) + return self + + @property + def connection(self) -> NoReturn: + """Not implemented for async; call + :meth:`_asyncio.AsyncConnection.get_raw_connection`. + """ + raise exc.InvalidRequestError( + "AsyncConnection.connection accessor is not implemented as the " + "attribute may need to reconnect on an invalidated connection. " + "Use the get_raw_connection() method." + ) + + async def get_raw_connection(self) -> PoolProxiedConnection: + """Return the pooled DBAPI-level connection in use by this + :class:`_asyncio.AsyncConnection`. + + This is a SQLAlchemy connection-pool proxied connection + which then has the attribute + :attr:`_pool._ConnectionFairy.driver_connection` that refers to the + actual driver connection. Its + :attr:`_pool._ConnectionFairy.dbapi_connection` refers instead + to an :class:`_engine.AdaptedConnection` instance that + adapts the driver connection to the DBAPI protocol. + + """ + + return await greenlet_spawn(getattr, self._proxied, "connection") + + @util.ro_non_memoized_property + def info(self) -> _InfoType: + """Return the :attr:`_engine.Connection.info` dictionary of the + underlying :class:`_engine.Connection`. + + This dictionary is freely writable for user-defined state to be + associated with the database connection. + + This attribute is only available if the :class:`.AsyncConnection` is + currently connected. If the :attr:`.AsyncConnection.closed` attribute + is ``True``, then accessing this attribute will raise + :class:`.ResourceClosedError`. + + .. versionadded:: 1.4.0b2 + + """ + return self._proxied.info + + @util.ro_non_memoized_property + def _proxied(self) -> Connection: + if not self.sync_connection: + self._raise_for_not_started() + return self.sync_connection + + def begin(self) -> AsyncTransaction: + """Begin a transaction prior to autobegin occurring.""" + assert self._proxied + return AsyncTransaction(self) + + def begin_nested(self) -> AsyncTransaction: + """Begin a nested transaction and return a transaction handle.""" + assert self._proxied + return AsyncTransaction(self, nested=True) + + async def invalidate( + self, exception: Optional[BaseException] = None + ) -> None: + """Invalidate the underlying DBAPI connection associated with + this :class:`_engine.Connection`. + + See the method :meth:`_engine.Connection.invalidate` for full + detail on this method. + + """ + + return await greenlet_spawn( + self._proxied.invalidate, exception=exception + ) + + async def get_isolation_level(self) -> IsolationLevel: + return await greenlet_spawn(self._proxied.get_isolation_level) + + def in_transaction(self) -> bool: + """Return True if a transaction is in progress.""" + + return self._proxied.in_transaction() + + def in_nested_transaction(self) -> bool: + """Return True if a transaction is in progress. + + .. versionadded:: 1.4.0b2 + + """ + return self._proxied.in_nested_transaction() + + def get_transaction(self) -> Optional[AsyncTransaction]: + """Return an :class:`.AsyncTransaction` representing the current + transaction, if any. + + This makes use of the underlying synchronous connection's + :meth:`_engine.Connection.get_transaction` method to get the current + :class:`_engine.Transaction`, which is then proxied in a new + :class:`.AsyncTransaction` object. + + .. versionadded:: 1.4.0b2 + + """ + + trans = self._proxied.get_transaction() + if trans is not None: + return AsyncTransaction._retrieve_proxy_for_target(trans) + else: + return None + + def get_nested_transaction(self) -> Optional[AsyncTransaction]: + """Return an :class:`.AsyncTransaction` representing the current + nested (savepoint) transaction, if any. + + This makes use of the underlying synchronous connection's + :meth:`_engine.Connection.get_nested_transaction` method to get the + current :class:`_engine.Transaction`, which is then proxied in a new + :class:`.AsyncTransaction` object. + + .. versionadded:: 1.4.0b2 + + """ + + trans = self._proxied.get_nested_transaction() + if trans is not None: + return AsyncTransaction._retrieve_proxy_for_target(trans) + else: + return None + + @overload + async def execution_options( + self, + *, + compiled_cache: Optional[CompiledCacheType] = ..., + logging_token: str = ..., + isolation_level: IsolationLevel = ..., + no_parameters: bool = False, + stream_results: bool = False, + max_row_buffer: int = ..., + yield_per: int = ..., + insertmanyvalues_page_size: int = ..., + schema_translate_map: Optional[SchemaTranslateMapType] = ..., + preserve_rowcount: bool = False, + **opt: Any, + ) -> AsyncConnection: ... + + @overload + async def execution_options(self, **opt: Any) -> AsyncConnection: ... + + async def execution_options(self, **opt: Any) -> AsyncConnection: + r"""Set non-SQL options for the connection which take effect + during execution. + + This returns this :class:`_asyncio.AsyncConnection` object with + the new options added. + + See :meth:`_engine.Connection.execution_options` for full details + on this method. + + """ + + conn = self._proxied + c2 = await greenlet_spawn(conn.execution_options, **opt) + assert c2 is conn + return self + + async def commit(self) -> None: + """Commit the transaction that is currently in progress. + + This method commits the current transaction if one has been started. + If no transaction was started, the method has no effect, assuming + the connection is in a non-invalidated state. + + A transaction is begun on a :class:`_engine.Connection` automatically + whenever a statement is first executed, or when the + :meth:`_engine.Connection.begin` method is called. + + """ + await greenlet_spawn(self._proxied.commit) + + async def rollback(self) -> None: + """Roll back the transaction that is currently in progress. + + This method rolls back the current transaction if one has been started. + If no transaction was started, the method has no effect. If a + transaction was started and the connection is in an invalidated state, + the transaction is cleared using this method. + + A transaction is begun on a :class:`_engine.Connection` automatically + whenever a statement is first executed, or when the + :meth:`_engine.Connection.begin` method is called. + + + """ + await greenlet_spawn(self._proxied.rollback) + + async def close(self) -> None: + """Close this :class:`_asyncio.AsyncConnection`. + + This has the effect of also rolling back the transaction if one + is in place. + + """ + await greenlet_spawn(self._proxied.close) + + async def aclose(self) -> None: + """A synonym for :meth:`_asyncio.AsyncConnection.close`. + + The :meth:`_asyncio.AsyncConnection.aclose` name is specifically + to support the Python standard library ``@contextlib.aclosing`` + context manager function. + + .. versionadded:: 2.0.20 + + """ + await self.close() + + async def exec_driver_sql( + self, + statement: str, + parameters: Optional[_DBAPIAnyExecuteParams] = None, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + ) -> CursorResult[Any]: + r"""Executes a driver-level SQL string and return buffered + :class:`_engine.Result`. + + """ + + result = await greenlet_spawn( + self._proxied.exec_driver_sql, + statement, + parameters, + execution_options, + _require_await=True, + ) + + return await _ensure_sync_result(result, self.exec_driver_sql) + + @overload + def stream( + self, + statement: TypedReturnsRows[_T], + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + ) -> GeneratorStartableContext[AsyncResult[_T]]: ... + + @overload + def stream( + self, + statement: Executable, + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + ) -> GeneratorStartableContext[AsyncResult[Any]]: ... + + @asyncstartablecontext + async def stream( + self, + statement: Executable, + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + ) -> AsyncIterator[AsyncResult[Any]]: + """Execute a statement and return an awaitable yielding a + :class:`_asyncio.AsyncResult` object. + + E.g.:: + + result = await conn.stream(stmt): + async for row in result: + print(f"{row}") + + The :meth:`.AsyncConnection.stream` + method supports optional context manager use against the + :class:`.AsyncResult` object, as in:: + + async with conn.stream(stmt) as result: + async for row in result: + print(f"{row}") + + In the above pattern, the :meth:`.AsyncResult.close` method is + invoked unconditionally, even if the iterator is interrupted by an + exception throw. Context manager use remains optional, however, + and the function may be called in either an ``async with fn():`` or + ``await fn()`` style. + + .. versionadded:: 2.0.0b3 added context manager support + + + :return: an awaitable object that will yield an + :class:`_asyncio.AsyncResult` object. + + .. seealso:: + + :meth:`.AsyncConnection.stream_scalars` + + """ + if not self.dialect.supports_server_side_cursors: + raise exc.InvalidRequestError( + "Cant use `stream` or `stream_scalars` with the current " + "dialect since it does not support server side cursors." + ) + + result = await greenlet_spawn( + self._proxied.execute, + statement, + parameters, + execution_options=util.EMPTY_DICT.merge_with( + execution_options, {"stream_results": True} + ), + _require_await=True, + ) + assert result.context._is_server_side + ar = AsyncResult(result) + try: + yield ar + except GeneratorExit: + pass + else: + task = asyncio.create_task(ar.close()) + await asyncio.shield(task) + + @overload + async def execute( + self, + statement: TypedReturnsRows[_T], + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + ) -> CursorResult[_T]: ... + + @overload + async def execute( + self, + statement: Executable, + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + ) -> CursorResult[Any]: ... + + async def execute( + self, + statement: Executable, + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + ) -> CursorResult[Any]: + r"""Executes a SQL statement construct and return a buffered + :class:`_engine.Result`. + + :param object: The statement to be executed. This is always + an object that is in both the :class:`_expression.ClauseElement` and + :class:`_expression.Executable` hierarchies, including: + + * :class:`_expression.Select` + * :class:`_expression.Insert`, :class:`_expression.Update`, + :class:`_expression.Delete` + * :class:`_expression.TextClause` and + :class:`_expression.TextualSelect` + * :class:`_schema.DDL` and objects which inherit from + :class:`_schema.ExecutableDDLElement` + + :param parameters: parameters which will be bound into the statement. + This may be either a dictionary of parameter names to values, + or a mutable sequence (e.g. a list) of dictionaries. When a + list of dictionaries is passed, the underlying statement execution + will make use of the DBAPI ``cursor.executemany()`` method. + When a single dictionary is passed, the DBAPI ``cursor.execute()`` + method will be used. + + :param execution_options: optional dictionary of execution options, + which will be associated with the statement execution. This + dictionary can provide a subset of the options that are accepted + by :meth:`_engine.Connection.execution_options`. + + :return: a :class:`_engine.Result` object. + + """ + result = await greenlet_spawn( + self._proxied.execute, + statement, + parameters, + execution_options=execution_options, + _require_await=True, + ) + return await _ensure_sync_result(result, self.execute) + + @overload + async def scalar( + self, + statement: TypedReturnsRows[Tuple[_T]], + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + ) -> Optional[_T]: ... + + @overload + async def scalar( + self, + statement: Executable, + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + ) -> Any: ... + + async def scalar( + self, + statement: Executable, + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + ) -> Any: + r"""Executes a SQL statement construct and returns a scalar object. + + This method is shorthand for invoking the + :meth:`_engine.Result.scalar` method after invoking the + :meth:`_engine.Connection.execute` method. Parameters are equivalent. + + :return: a scalar Python value representing the first column of the + first row returned. + + """ + result = await self.execute( + statement, parameters, execution_options=execution_options + ) + return result.scalar() + + @overload + async def scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + ) -> ScalarResult[_T]: ... + + @overload + async def scalars( + self, + statement: Executable, + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + ) -> ScalarResult[Any]: ... + + async def scalars( + self, + statement: Executable, + parameters: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + ) -> ScalarResult[Any]: + r"""Executes a SQL statement construct and returns a scalar objects. + + This method is shorthand for invoking the + :meth:`_engine.Result.scalars` method after invoking the + :meth:`_engine.Connection.execute` method. Parameters are equivalent. + + :return: a :class:`_engine.ScalarResult` object. + + .. versionadded:: 1.4.24 + + """ + result = await self.execute( + statement, parameters, execution_options=execution_options + ) + return result.scalars() + + @overload + def stream_scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + ) -> GeneratorStartableContext[AsyncScalarResult[_T]]: ... + + @overload + def stream_scalars( + self, + statement: Executable, + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + ) -> GeneratorStartableContext[AsyncScalarResult[Any]]: ... + + @asyncstartablecontext + async def stream_scalars( + self, + statement: Executable, + parameters: Optional[_CoreSingleExecuteParams] = None, + *, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + ) -> AsyncIterator[AsyncScalarResult[Any]]: + r"""Execute a statement and return an awaitable yielding a + :class:`_asyncio.AsyncScalarResult` object. + + E.g.:: + + result = await conn.stream_scalars(stmt) + async for scalar in result: + print(f"{scalar}") + + This method is shorthand for invoking the + :meth:`_engine.AsyncResult.scalars` method after invoking the + :meth:`_engine.Connection.stream` method. Parameters are equivalent. + + The :meth:`.AsyncConnection.stream_scalars` + method supports optional context manager use against the + :class:`.AsyncScalarResult` object, as in:: + + async with conn.stream_scalars(stmt) as result: + async for scalar in result: + print(f"{scalar}") + + In the above pattern, the :meth:`.AsyncScalarResult.close` method is + invoked unconditionally, even if the iterator is interrupted by an + exception throw. Context manager use remains optional, however, + and the function may be called in either an ``async with fn():`` or + ``await fn()`` style. + + .. versionadded:: 2.0.0b3 added context manager support + + :return: an awaitable object that will yield an + :class:`_asyncio.AsyncScalarResult` object. + + .. versionadded:: 1.4.24 + + .. seealso:: + + :meth:`.AsyncConnection.stream` + + """ + + async with self.stream( + statement, parameters, execution_options=execution_options + ) as result: + yield result.scalars() + + async def run_sync( + self, + fn: Callable[Concatenate[Connection, _P], _T], + *arg: _P.args, + **kw: _P.kwargs, + ) -> _T: + """Invoke the given synchronous (i.e. not async) callable, + passing a synchronous-style :class:`_engine.Connection` as the first + argument. + + This method allows traditional synchronous SQLAlchemy functions to + run within the context of an asyncio application. + + E.g.:: + + def do_something_with_core(conn: Connection, arg1: int, arg2: str) -> str: + '''A synchronous function that does not require awaiting + + :param conn: a Core SQLAlchemy Connection, used synchronously + + :return: an optional return value is supported + + ''' + conn.execute( + some_table.insert().values(int_col=arg1, str_col=arg2) + ) + return "success" + + + async def do_something_async(async_engine: AsyncEngine) -> None: + '''an async function that uses awaiting''' + + async with async_engine.begin() as async_conn: + # run do_something_with_core() with a sync-style + # Connection, proxied into an awaitable + return_code = await async_conn.run_sync(do_something_with_core, 5, "strval") + print(return_code) + + This method maintains the asyncio event loop all the way through + to the database connection by running the given callable in a + specially instrumented greenlet. + + The most rudimentary use of :meth:`.AsyncConnection.run_sync` is to + invoke methods such as :meth:`_schema.MetaData.create_all`, given + an :class:`.AsyncConnection` that needs to be provided to + :meth:`_schema.MetaData.create_all` as a :class:`_engine.Connection` + object:: + + # run metadata.create_all(conn) with a sync-style Connection, + # proxied into an awaitable + with async_engine.begin() as conn: + await conn.run_sync(metadata.create_all) + + .. note:: + + The provided callable is invoked inline within the asyncio event + loop, and will block on traditional IO calls. IO within this + callable should only call into SQLAlchemy's asyncio database + APIs which will be properly adapted to the greenlet context. + + .. seealso:: + + :meth:`.AsyncSession.run_sync` + + :ref:`session_run_sync` + + """ # noqa: E501 + + return await greenlet_spawn( + fn, self._proxied, *arg, _require_await=False, **kw + ) + + def __await__(self) -> Generator[Any, None, AsyncConnection]: + return self.start().__await__() + + async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None: + task = asyncio.create_task(self.close()) + await asyncio.shield(task) + + # START PROXY METHODS AsyncConnection + + # code within this block is **programmatically, + # statically generated** by tools/generate_proxy_methods.py + + @property + def closed(self) -> Any: + r"""Return True if this connection is closed. + + .. container:: class_bases + + Proxied for the :class:`_engine.Connection` class + on behalf of the :class:`_asyncio.AsyncConnection` class. + + """ # noqa: E501 + + return self._proxied.closed + + @property + def invalidated(self) -> Any: + r"""Return True if this connection was invalidated. + + .. container:: class_bases + + Proxied for the :class:`_engine.Connection` class + on behalf of the :class:`_asyncio.AsyncConnection` class. + + This does not indicate whether or not the connection was + invalidated at the pool level, however + + + """ # noqa: E501 + + return self._proxied.invalidated + + @property + def dialect(self) -> Dialect: + r"""Proxy for the :attr:`_engine.Connection.dialect` attribute + on behalf of the :class:`_asyncio.AsyncConnection` class. + + """ # noqa: E501 + + return self._proxied.dialect + + @dialect.setter + def dialect(self, attr: Dialect) -> None: + self._proxied.dialect = attr + + @property + def default_isolation_level(self) -> Any: + r"""The initial-connection time isolation level associated with the + :class:`_engine.Dialect` in use. + + .. container:: class_bases + + Proxied for the :class:`_engine.Connection` class + on behalf of the :class:`_asyncio.AsyncConnection` class. + + This value is independent of the + :paramref:`.Connection.execution_options.isolation_level` and + :paramref:`.Engine.execution_options.isolation_level` execution + options, and is determined by the :class:`_engine.Dialect` when the + first connection is created, by performing a SQL query against the + database for the current isolation level before any additional commands + have been emitted. + + Calling this accessor does not invoke any new SQL queries. + + .. seealso:: + + :meth:`_engine.Connection.get_isolation_level` + - view current actual isolation level + + :paramref:`_sa.create_engine.isolation_level` + - set per :class:`_engine.Engine` isolation level + + :paramref:`.Connection.execution_options.isolation_level` + - set per :class:`_engine.Connection` isolation level + + + """ # noqa: E501 + + return self._proxied.default_isolation_level + + # END PROXY METHODS AsyncConnection + + +@util.create_proxy_methods( + Engine, + ":class:`_engine.Engine`", + ":class:`_asyncio.AsyncEngine`", + classmethods=[], + methods=[ + "clear_compiled_cache", + "update_execution_options", + "get_execution_options", + ], + attributes=["url", "pool", "dialect", "engine", "name", "driver", "echo"], +) +class AsyncEngine(ProxyComparable[Engine], AsyncConnectable): + """An asyncio proxy for a :class:`_engine.Engine`. + + :class:`_asyncio.AsyncEngine` is acquired using the + :func:`_asyncio.create_async_engine` function:: + + from sqlalchemy.ext.asyncio import create_async_engine + engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname") + + .. versionadded:: 1.4 + + """ # noqa + + # AsyncEngine is a thin proxy; no state should be added here + # that is not retrievable from the "sync" engine / connection, e.g. + # current transaction, info, etc. It should be possible to + # create a new AsyncEngine that matches this one given only the + # "sync" elements. + __slots__ = "sync_engine" + + _connection_cls: Type[AsyncConnection] = AsyncConnection + + sync_engine: Engine + """Reference to the sync-style :class:`_engine.Engine` this + :class:`_asyncio.AsyncEngine` proxies requests towards. + + This instance can be used as an event target. + + .. seealso:: + + :ref:`asyncio_events` + """ + + def __init__(self, sync_engine: Engine): + if not sync_engine.dialect.is_async: + raise exc.InvalidRequestError( + "The asyncio extension requires an async driver to be used. " + f"The loaded {sync_engine.dialect.driver!r} is not async." + ) + self.sync_engine = self._assign_proxied(sync_engine) + + @util.ro_non_memoized_property + def _proxied(self) -> Engine: + return self.sync_engine + + @classmethod + def _regenerate_proxy_for_target(cls, target: Engine) -> AsyncEngine: + return AsyncEngine(target) + + @contextlib.asynccontextmanager + async def begin(self) -> AsyncIterator[AsyncConnection]: + """Return a context manager which when entered will deliver an + :class:`_asyncio.AsyncConnection` with an + :class:`_asyncio.AsyncTransaction` established. + + E.g.:: + + async with async_engine.begin() as conn: + await conn.execute( + text("insert into table (x, y, z) values (1, 2, 3)") + ) + await conn.execute(text("my_special_procedure(5)")) + + + """ + conn = self.connect() + + async with conn: + async with conn.begin(): + yield conn + + def connect(self) -> AsyncConnection: + """Return an :class:`_asyncio.AsyncConnection` object. + + The :class:`_asyncio.AsyncConnection` will procure a database + connection from the underlying connection pool when it is entered + as an async context manager:: + + async with async_engine.connect() as conn: + result = await conn.execute(select(user_table)) + + The :class:`_asyncio.AsyncConnection` may also be started outside of a + context manager by invoking its :meth:`_asyncio.AsyncConnection.start` + method. + + """ + + return self._connection_cls(self) + + async def raw_connection(self) -> PoolProxiedConnection: + """Return a "raw" DBAPI connection from the connection pool. + + .. seealso:: + + :ref:`dbapi_connections` + + """ + return await greenlet_spawn(self.sync_engine.raw_connection) + + @overload + def execution_options( + self, + *, + compiled_cache: Optional[CompiledCacheType] = ..., + logging_token: str = ..., + isolation_level: IsolationLevel = ..., + insertmanyvalues_page_size: int = ..., + schema_translate_map: Optional[SchemaTranslateMapType] = ..., + **opt: Any, + ) -> AsyncEngine: ... + + @overload + def execution_options(self, **opt: Any) -> AsyncEngine: ... + + def execution_options(self, **opt: Any) -> AsyncEngine: + """Return a new :class:`_asyncio.AsyncEngine` that will provide + :class:`_asyncio.AsyncConnection` objects with the given execution + options. + + Proxied from :meth:`_engine.Engine.execution_options`. See that + method for details. + + """ + + return AsyncEngine(self.sync_engine.execution_options(**opt)) + + async def dispose(self, close: bool = True) -> None: + """Dispose of the connection pool used by this + :class:`_asyncio.AsyncEngine`. + + :param close: if left at its default of ``True``, has the + effect of fully closing all **currently checked in** + database connections. Connections that are still checked out + will **not** be closed, however they will no longer be associated + with this :class:`_engine.Engine`, + so when they are closed individually, eventually the + :class:`_pool.Pool` which they are associated with will + be garbage collected and they will be closed out fully, if + not already closed on checkin. + + If set to ``False``, the previous connection pool is de-referenced, + and otherwise not touched in any way. + + .. seealso:: + + :meth:`_engine.Engine.dispose` + + """ + + await greenlet_spawn(self.sync_engine.dispose, close=close) + + # START PROXY METHODS AsyncEngine + + # code within this block is **programmatically, + # statically generated** by tools/generate_proxy_methods.py + + def clear_compiled_cache(self) -> None: + r"""Clear the compiled cache associated with the dialect. + + .. container:: class_bases + + Proxied for the :class:`_engine.Engine` class on + behalf of the :class:`_asyncio.AsyncEngine` class. + + This applies **only** to the built-in cache that is established + via the :paramref:`_engine.create_engine.query_cache_size` parameter. + It will not impact any dictionary caches that were passed via the + :paramref:`.Connection.execution_options.compiled_cache` parameter. + + .. versionadded:: 1.4 + + + """ # noqa: E501 + + return self._proxied.clear_compiled_cache() + + def update_execution_options(self, **opt: Any) -> None: + r"""Update the default execution_options dictionary + of this :class:`_engine.Engine`. + + .. container:: class_bases + + Proxied for the :class:`_engine.Engine` class on + behalf of the :class:`_asyncio.AsyncEngine` class. + + The given keys/values in \**opt are added to the + default execution options that will be used for + all connections. The initial contents of this dictionary + can be sent via the ``execution_options`` parameter + to :func:`_sa.create_engine`. + + .. seealso:: + + :meth:`_engine.Connection.execution_options` + + :meth:`_engine.Engine.execution_options` + + + """ # noqa: E501 + + return self._proxied.update_execution_options(**opt) + + def get_execution_options(self) -> _ExecuteOptions: + r"""Get the non-SQL options which will take effect during execution. + + .. container:: class_bases + + Proxied for the :class:`_engine.Engine` class on + behalf of the :class:`_asyncio.AsyncEngine` class. + + .. versionadded: 1.3 + + .. seealso:: + + :meth:`_engine.Engine.execution_options` + + """ # noqa: E501 + + return self._proxied.get_execution_options() + + @property + def url(self) -> URL: + r"""Proxy for the :attr:`_engine.Engine.url` attribute + on behalf of the :class:`_asyncio.AsyncEngine` class. + + """ # noqa: E501 + + return self._proxied.url + + @url.setter + def url(self, attr: URL) -> None: + self._proxied.url = attr + + @property + def pool(self) -> Pool: + r"""Proxy for the :attr:`_engine.Engine.pool` attribute + on behalf of the :class:`_asyncio.AsyncEngine` class. + + """ # noqa: E501 + + return self._proxied.pool + + @pool.setter + def pool(self, attr: Pool) -> None: + self._proxied.pool = attr + + @property + def dialect(self) -> Dialect: + r"""Proxy for the :attr:`_engine.Engine.dialect` attribute + on behalf of the :class:`_asyncio.AsyncEngine` class. + + """ # noqa: E501 + + return self._proxied.dialect + + @dialect.setter + def dialect(self, attr: Dialect) -> None: + self._proxied.dialect = attr + + @property + def engine(self) -> Any: + r"""Returns this :class:`.Engine`. + + .. container:: class_bases + + Proxied for the :class:`_engine.Engine` class + on behalf of the :class:`_asyncio.AsyncEngine` class. + + Used for legacy schemes that accept :class:`.Connection` / + :class:`.Engine` objects within the same variable. + + + """ # noqa: E501 + + return self._proxied.engine + + @property + def name(self) -> Any: + r"""String name of the :class:`~sqlalchemy.engine.interfaces.Dialect` + in use by this :class:`Engine`. + + .. container:: class_bases + + Proxied for the :class:`_engine.Engine` class + on behalf of the :class:`_asyncio.AsyncEngine` class. + + + """ # noqa: E501 + + return self._proxied.name + + @property + def driver(self) -> Any: + r"""Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect` + in use by this :class:`Engine`. + + .. container:: class_bases + + Proxied for the :class:`_engine.Engine` class + on behalf of the :class:`_asyncio.AsyncEngine` class. + + + """ # noqa: E501 + + return self._proxied.driver + + @property + def echo(self) -> Any: + r"""When ``True``, enable log output for this element. + + .. container:: class_bases + + Proxied for the :class:`_engine.Engine` class + on behalf of the :class:`_asyncio.AsyncEngine` class. + + This has the effect of setting the Python logging level for the namespace + of this element's class and object reference. A value of boolean ``True`` + indicates that the loglevel ``logging.INFO`` will be set for the logger, + whereas the string value ``debug`` will set the loglevel to + ``logging.DEBUG``. + + """ # noqa: E501 + + return self._proxied.echo + + @echo.setter + def echo(self, attr: Any) -> None: + self._proxied.echo = attr + + # END PROXY METHODS AsyncEngine + + +class AsyncTransaction( + ProxyComparable[Transaction], StartableContext["AsyncTransaction"] +): + """An asyncio proxy for a :class:`_engine.Transaction`.""" + + __slots__ = ("connection", "sync_transaction", "nested") + + sync_transaction: Optional[Transaction] + connection: AsyncConnection + nested: bool + + def __init__(self, connection: AsyncConnection, nested: bool = False): + self.connection = connection + self.sync_transaction = None + self.nested = nested + + @classmethod + def _regenerate_proxy_for_target( + cls, target: Transaction + ) -> AsyncTransaction: + sync_connection = target.connection + sync_transaction = target + nested = isinstance(target, NestedTransaction) + + async_connection = AsyncConnection._retrieve_proxy_for_target( + sync_connection + ) + assert async_connection is not None + + obj = cls.__new__(cls) + obj.connection = async_connection + obj.sync_transaction = obj._assign_proxied(sync_transaction) + obj.nested = nested + return obj + + @util.ro_non_memoized_property + def _proxied(self) -> Transaction: + if not self.sync_transaction: + self._raise_for_not_started() + return self.sync_transaction + + @property + def is_valid(self) -> bool: + return self._proxied.is_valid + + @property + def is_active(self) -> bool: + return self._proxied.is_active + + async def close(self) -> None: + """Close this :class:`.AsyncTransaction`. + + If this transaction is the base transaction in a begin/commit + nesting, the transaction will rollback(). Otherwise, the + method returns. + + This is used to cancel a Transaction without affecting the scope of + an enclosing transaction. + + """ + await greenlet_spawn(self._proxied.close) + + async def rollback(self) -> None: + """Roll back this :class:`.AsyncTransaction`.""" + await greenlet_spawn(self._proxied.rollback) + + async def commit(self) -> None: + """Commit this :class:`.AsyncTransaction`.""" + + await greenlet_spawn(self._proxied.commit) + + async def start(self, is_ctxmanager: bool = False) -> AsyncTransaction: + """Start this :class:`_asyncio.AsyncTransaction` object's context + outside of using a Python ``with:`` block. + + """ + + self.sync_transaction = self._assign_proxied( + await greenlet_spawn( + self.connection._proxied.begin_nested + if self.nested + else self.connection._proxied.begin + ) + ) + if is_ctxmanager: + self.sync_transaction.__enter__() + return self + + async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None: + await greenlet_spawn(self._proxied.__exit__, type_, value, traceback) + + +@overload +def _get_sync_engine_or_connection(async_engine: AsyncEngine) -> Engine: ... + + +@overload +def _get_sync_engine_or_connection( + async_engine: AsyncConnection, +) -> Connection: ... + + +def _get_sync_engine_or_connection( + async_engine: Union[AsyncEngine, AsyncConnection] +) -> Union[Engine, Connection]: + if isinstance(async_engine, AsyncConnection): + return async_engine._proxied + + try: + return async_engine.sync_engine + except AttributeError as e: + raise exc.ArgumentError( + "AsyncEngine expected, got %r" % async_engine + ) from e + + +@inspection._inspects(AsyncConnection) +def _no_insp_for_async_conn_yet( + subject: AsyncConnection, # noqa: U100 +) -> NoReturn: + raise exc.NoInspectionAvailable( + "Inspection on an AsyncConnection is currently not supported. " + "Please use ``run_sync`` to pass a callable where it's possible " + "to call ``inspect`` on the passed connection.", + code="xd3s", + ) + + +@inspection._inspects(AsyncEngine) +def _no_insp_for_async_engine_xyet( + subject: AsyncEngine, # noqa: U100 +) -> NoReturn: + raise exc.NoInspectionAvailable( + "Inspection on an AsyncEngine is currently not supported. " + "Please obtain a connection then use ``conn.run_sync`` to pass a " + "callable where it's possible to call ``inspect`` on the " + "passed connection.", + code="xd3s", + ) diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/exc.py b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/exc.py new file mode 100644 index 0000000..1cf6f36 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/exc.py @@ -0,0 +1,21 @@ +# ext/asyncio/exc.py +# Copyright (C) 2020-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +from ... import exc + + +class AsyncMethodRequired(exc.InvalidRequestError): + """an API can't be used because its result would not be + compatible with async""" + + +class AsyncContextNotStarted(exc.InvalidRequestError): + """a startable context manager has not been started.""" + + +class AsyncContextAlreadyStarted(exc.InvalidRequestError): + """a startable context manager is already started.""" diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/result.py b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/result.py new file mode 100644 index 0000000..7dcbe32 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/result.py @@ -0,0 +1,961 @@ +# ext/asyncio/result.py +# Copyright (C) 2020-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +from __future__ import annotations + +import operator +from typing import Any +from typing import AsyncIterator +from typing import Optional +from typing import overload +from typing import Sequence +from typing import Tuple +from typing import TYPE_CHECKING +from typing import TypeVar + +from . import exc as async_exc +from ... import util +from ...engine import Result +from ...engine.result import _NO_ROW +from ...engine.result import _R +from ...engine.result import _WithKeys +from ...engine.result import FilterResult +from ...engine.result import FrozenResult +from ...engine.result import ResultMetaData +from ...engine.row import Row +from ...engine.row import RowMapping +from ...sql.base import _generative +from ...util.concurrency import greenlet_spawn +from ...util.typing import Literal +from ...util.typing import Self + +if TYPE_CHECKING: + from ...engine import CursorResult + from ...engine.result import _KeyIndexType + from ...engine.result import _UniqueFilterType + +_T = TypeVar("_T", bound=Any) +_TP = TypeVar("_TP", bound=Tuple[Any, ...]) + + +class AsyncCommon(FilterResult[_R]): + __slots__ = () + + _real_result: Result[Any] + _metadata: ResultMetaData + + async def close(self) -> None: # type: ignore[override] + """Close this result.""" + + await greenlet_spawn(self._real_result.close) + + @property + def closed(self) -> bool: + """proxies the .closed attribute of the underlying result object, + if any, else raises ``AttributeError``. + + .. versionadded:: 2.0.0b3 + + """ + return self._real_result.closed + + +class AsyncResult(_WithKeys, AsyncCommon[Row[_TP]]): + """An asyncio wrapper around a :class:`_result.Result` object. + + The :class:`_asyncio.AsyncResult` only applies to statement executions that + use a server-side cursor. It is returned only from the + :meth:`_asyncio.AsyncConnection.stream` and + :meth:`_asyncio.AsyncSession.stream` methods. + + .. note:: As is the case with :class:`_engine.Result`, this object is + used for ORM results returned by :meth:`_asyncio.AsyncSession.execute`, + which can yield instances of ORM mapped objects either individually or + within tuple-like rows. Note that these result objects do not + deduplicate instances or rows automatically as is the case with the + legacy :class:`_orm.Query` object. For in-Python de-duplication of + instances or rows, use the :meth:`_asyncio.AsyncResult.unique` modifier + method. + + .. versionadded:: 1.4 + + """ + + __slots__ = () + + _real_result: Result[_TP] + + def __init__(self, real_result: Result[_TP]): + self._real_result = real_result + + self._metadata = real_result._metadata + self._unique_filter_state = real_result._unique_filter_state + self._post_creational_filter = None + + # BaseCursorResult pre-generates the "_row_getter". Use that + # if available rather than building a second one + if "_row_getter" in real_result.__dict__: + self._set_memoized_attribute( + "_row_getter", real_result.__dict__["_row_getter"] + ) + + @property + def t(self) -> AsyncTupleResult[_TP]: + """Apply a "typed tuple" typing filter to returned rows. + + The :attr:`_asyncio.AsyncResult.t` attribute is a synonym for + calling the :meth:`_asyncio.AsyncResult.tuples` method. + + .. versionadded:: 2.0 + + """ + return self # type: ignore + + def tuples(self) -> AsyncTupleResult[_TP]: + """Apply a "typed tuple" typing filter to returned rows. + + This method returns the same :class:`_asyncio.AsyncResult` object + at runtime, + however annotates as returning a :class:`_asyncio.AsyncTupleResult` + object that will indicate to :pep:`484` typing tools that plain typed + ``Tuple`` instances are returned rather than rows. This allows + tuple unpacking and ``__getitem__`` access of :class:`_engine.Row` + objects to by typed, for those cases where the statement invoked + itself included typing information. + + .. versionadded:: 2.0 + + :return: the :class:`_result.AsyncTupleResult` type at typing time. + + .. seealso:: + + :attr:`_asyncio.AsyncResult.t` - shorter synonym + + :attr:`_engine.Row.t` - :class:`_engine.Row` version + + """ + + return self # type: ignore + + @_generative + def unique(self, strategy: Optional[_UniqueFilterType] = None) -> Self: + """Apply unique filtering to the objects returned by this + :class:`_asyncio.AsyncResult`. + + Refer to :meth:`_engine.Result.unique` in the synchronous + SQLAlchemy API for a complete behavioral description. + + """ + self._unique_filter_state = (set(), strategy) + return self + + def columns(self, *col_expressions: _KeyIndexType) -> Self: + r"""Establish the columns that should be returned in each row. + + Refer to :meth:`_engine.Result.columns` in the synchronous + SQLAlchemy API for a complete behavioral description. + + """ + return self._column_slices(col_expressions) + + async def partitions( + self, size: Optional[int] = None + ) -> AsyncIterator[Sequence[Row[_TP]]]: + """Iterate through sub-lists of rows of the size given. + + An async iterator is returned:: + + async def scroll_results(connection): + result = await connection.stream(select(users_table)) + + async for partition in result.partitions(100): + print("list of rows: %s" % partition) + + Refer to :meth:`_engine.Result.partitions` in the synchronous + SQLAlchemy API for a complete behavioral description. + + """ + + getter = self._manyrow_getter + + while True: + partition = await greenlet_spawn(getter, self, size) + if partition: + yield partition + else: + break + + async def fetchall(self) -> Sequence[Row[_TP]]: + """A synonym for the :meth:`_asyncio.AsyncResult.all` method. + + .. versionadded:: 2.0 + + """ + + return await greenlet_spawn(self._allrows) + + async def fetchone(self) -> Optional[Row[_TP]]: + """Fetch one row. + + When all rows are exhausted, returns None. + + This method is provided for backwards compatibility with + SQLAlchemy 1.x.x. + + To fetch the first row of a result only, use the + :meth:`_asyncio.AsyncResult.first` method. To iterate through all + rows, iterate the :class:`_asyncio.AsyncResult` object directly. + + :return: a :class:`_engine.Row` object if no filters are applied, + or ``None`` if no rows remain. + + """ + row = await greenlet_spawn(self._onerow_getter, self) + if row is _NO_ROW: + return None + else: + return row + + async def fetchmany( + self, size: Optional[int] = None + ) -> Sequence[Row[_TP]]: + """Fetch many rows. + + When all rows are exhausted, returns an empty list. + + This method is provided for backwards compatibility with + SQLAlchemy 1.x.x. + + To fetch rows in groups, use the + :meth:`._asyncio.AsyncResult.partitions` method. + + :return: a list of :class:`_engine.Row` objects. + + .. seealso:: + + :meth:`_asyncio.AsyncResult.partitions` + + """ + + return await greenlet_spawn(self._manyrow_getter, self, size) + + async def all(self) -> Sequence[Row[_TP]]: + """Return all rows in a list. + + Closes the result set after invocation. Subsequent invocations + will return an empty list. + + :return: a list of :class:`_engine.Row` objects. + + """ + + return await greenlet_spawn(self._allrows) + + def __aiter__(self) -> AsyncResult[_TP]: + return self + + async def __anext__(self) -> Row[_TP]: + row = await greenlet_spawn(self._onerow_getter, self) + if row is _NO_ROW: + raise StopAsyncIteration() + else: + return row + + async def first(self) -> Optional[Row[_TP]]: + """Fetch the first row or ``None`` if no row is present. + + Closes the result set and discards remaining rows. + + .. note:: This method returns one **row**, e.g. tuple, by default. + To return exactly one single scalar value, that is, the first + column of the first row, use the + :meth:`_asyncio.AsyncResult.scalar` method, + or combine :meth:`_asyncio.AsyncResult.scalars` and + :meth:`_asyncio.AsyncResult.first`. + + Additionally, in contrast to the behavior of the legacy ORM + :meth:`_orm.Query.first` method, **no limit is applied** to the + SQL query which was invoked to produce this + :class:`_asyncio.AsyncResult`; + for a DBAPI driver that buffers results in memory before yielding + rows, all rows will be sent to the Python process and all but + the first row will be discarded. + + .. seealso:: + + :ref:`migration_20_unify_select` + + :return: a :class:`_engine.Row` object, or None + if no rows remain. + + .. seealso:: + + :meth:`_asyncio.AsyncResult.scalar` + + :meth:`_asyncio.AsyncResult.one` + + """ + return await greenlet_spawn(self._only_one_row, False, False, False) + + async def one_or_none(self) -> Optional[Row[_TP]]: + """Return at most one result or raise an exception. + + Returns ``None`` if the result has no rows. + Raises :class:`.MultipleResultsFound` + if multiple rows are returned. + + .. versionadded:: 1.4 + + :return: The first :class:`_engine.Row` or ``None`` if no row + is available. + + :raises: :class:`.MultipleResultsFound` + + .. seealso:: + + :meth:`_asyncio.AsyncResult.first` + + :meth:`_asyncio.AsyncResult.one` + + """ + return await greenlet_spawn(self._only_one_row, True, False, False) + + @overload + async def scalar_one(self: AsyncResult[Tuple[_T]]) -> _T: ... + + @overload + async def scalar_one(self) -> Any: ... + + async def scalar_one(self) -> Any: + """Return exactly one scalar result or raise an exception. + + This is equivalent to calling :meth:`_asyncio.AsyncResult.scalars` and + then :meth:`_asyncio.AsyncResult.one`. + + .. seealso:: + + :meth:`_asyncio.AsyncResult.one` + + :meth:`_asyncio.AsyncResult.scalars` + + """ + return await greenlet_spawn(self._only_one_row, True, True, True) + + @overload + async def scalar_one_or_none( + self: AsyncResult[Tuple[_T]], + ) -> Optional[_T]: ... + + @overload + async def scalar_one_or_none(self) -> Optional[Any]: ... + + async def scalar_one_or_none(self) -> Optional[Any]: + """Return exactly one scalar result or ``None``. + + This is equivalent to calling :meth:`_asyncio.AsyncResult.scalars` and + then :meth:`_asyncio.AsyncResult.one_or_none`. + + .. seealso:: + + :meth:`_asyncio.AsyncResult.one_or_none` + + :meth:`_asyncio.AsyncResult.scalars` + + """ + return await greenlet_spawn(self._only_one_row, True, False, True) + + async def one(self) -> Row[_TP]: + """Return exactly one row or raise an exception. + + Raises :class:`.NoResultFound` if the result returns no + rows, or :class:`.MultipleResultsFound` if multiple rows + would be returned. + + .. note:: This method returns one **row**, e.g. tuple, by default. + To return exactly one single scalar value, that is, the first + column of the first row, use the + :meth:`_asyncio.AsyncResult.scalar_one` method, or combine + :meth:`_asyncio.AsyncResult.scalars` and + :meth:`_asyncio.AsyncResult.one`. + + .. versionadded:: 1.4 + + :return: The first :class:`_engine.Row`. + + :raises: :class:`.MultipleResultsFound`, :class:`.NoResultFound` + + .. seealso:: + + :meth:`_asyncio.AsyncResult.first` + + :meth:`_asyncio.AsyncResult.one_or_none` + + :meth:`_asyncio.AsyncResult.scalar_one` + + """ + return await greenlet_spawn(self._only_one_row, True, True, False) + + @overload + async def scalar(self: AsyncResult[Tuple[_T]]) -> Optional[_T]: ... + + @overload + async def scalar(self) -> Any: ... + + async def scalar(self) -> Any: + """Fetch the first column of the first row, and close the result set. + + Returns ``None`` if there are no rows to fetch. + + No validation is performed to test if additional rows remain. + + After calling this method, the object is fully closed, + e.g. the :meth:`_engine.CursorResult.close` + method will have been called. + + :return: a Python scalar value, or ``None`` if no rows remain. + + """ + return await greenlet_spawn(self._only_one_row, False, False, True) + + async def freeze(self) -> FrozenResult[_TP]: + """Return a callable object that will produce copies of this + :class:`_asyncio.AsyncResult` when invoked. + + The callable object returned is an instance of + :class:`_engine.FrozenResult`. + + This is used for result set caching. The method must be called + on the result when it has been unconsumed, and calling the method + will consume the result fully. When the :class:`_engine.FrozenResult` + is retrieved from a cache, it can be called any number of times where + it will produce a new :class:`_engine.Result` object each time + against its stored set of rows. + + .. seealso:: + + :ref:`do_orm_execute_re_executing` - example usage within the + ORM to implement a result-set cache. + + """ + + return await greenlet_spawn(FrozenResult, self) + + @overload + def scalars( + self: AsyncResult[Tuple[_T]], index: Literal[0] + ) -> AsyncScalarResult[_T]: ... + + @overload + def scalars(self: AsyncResult[Tuple[_T]]) -> AsyncScalarResult[_T]: ... + + @overload + def scalars(self, index: _KeyIndexType = 0) -> AsyncScalarResult[Any]: ... + + def scalars(self, index: _KeyIndexType = 0) -> AsyncScalarResult[Any]: + """Return an :class:`_asyncio.AsyncScalarResult` filtering object which + will return single elements rather than :class:`_row.Row` objects. + + Refer to :meth:`_result.Result.scalars` in the synchronous + SQLAlchemy API for a complete behavioral description. + + :param index: integer or row key indicating the column to be fetched + from each row, defaults to ``0`` indicating the first column. + + :return: a new :class:`_asyncio.AsyncScalarResult` filtering object + referring to this :class:`_asyncio.AsyncResult` object. + + """ + return AsyncScalarResult(self._real_result, index) + + def mappings(self) -> AsyncMappingResult: + """Apply a mappings filter to returned rows, returning an instance of + :class:`_asyncio.AsyncMappingResult`. + + When this filter is applied, fetching rows will return + :class:`_engine.RowMapping` objects instead of :class:`_engine.Row` + objects. + + :return: a new :class:`_asyncio.AsyncMappingResult` filtering object + referring to the underlying :class:`_result.Result` object. + + """ + + return AsyncMappingResult(self._real_result) + + +class AsyncScalarResult(AsyncCommon[_R]): + """A wrapper for a :class:`_asyncio.AsyncResult` that returns scalar values + rather than :class:`_row.Row` values. + + The :class:`_asyncio.AsyncScalarResult` object is acquired by calling the + :meth:`_asyncio.AsyncResult.scalars` method. + + Refer to the :class:`_result.ScalarResult` object in the synchronous + SQLAlchemy API for a complete behavioral description. + + .. versionadded:: 1.4 + + """ + + __slots__ = () + + _generate_rows = False + + def __init__(self, real_result: Result[Any], index: _KeyIndexType): + self._real_result = real_result + + if real_result._source_supports_scalars: + self._metadata = real_result._metadata + self._post_creational_filter = None + else: + self._metadata = real_result._metadata._reduce([index]) + self._post_creational_filter = operator.itemgetter(0) + + self._unique_filter_state = real_result._unique_filter_state + + def unique( + self, + strategy: Optional[_UniqueFilterType] = None, + ) -> Self: + """Apply unique filtering to the objects returned by this + :class:`_asyncio.AsyncScalarResult`. + + See :meth:`_asyncio.AsyncResult.unique` for usage details. + + """ + self._unique_filter_state = (set(), strategy) + return self + + async def partitions( + self, size: Optional[int] = None + ) -> AsyncIterator[Sequence[_R]]: + """Iterate through sub-lists of elements of the size given. + + Equivalent to :meth:`_asyncio.AsyncResult.partitions` except that + scalar values, rather than :class:`_engine.Row` objects, + are returned. + + """ + + getter = self._manyrow_getter + + while True: + partition = await greenlet_spawn(getter, self, size) + if partition: + yield partition + else: + break + + async def fetchall(self) -> Sequence[_R]: + """A synonym for the :meth:`_asyncio.AsyncScalarResult.all` method.""" + + return await greenlet_spawn(self._allrows) + + async def fetchmany(self, size: Optional[int] = None) -> Sequence[_R]: + """Fetch many objects. + + Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that + scalar values, rather than :class:`_engine.Row` objects, + are returned. + + """ + return await greenlet_spawn(self._manyrow_getter, self, size) + + async def all(self) -> Sequence[_R]: + """Return all scalar values in a list. + + Equivalent to :meth:`_asyncio.AsyncResult.all` except that + scalar values, rather than :class:`_engine.Row` objects, + are returned. + + """ + return await greenlet_spawn(self._allrows) + + def __aiter__(self) -> AsyncScalarResult[_R]: + return self + + async def __anext__(self) -> _R: + row = await greenlet_spawn(self._onerow_getter, self) + if row is _NO_ROW: + raise StopAsyncIteration() + else: + return row + + async def first(self) -> Optional[_R]: + """Fetch the first object or ``None`` if no object is present. + + Equivalent to :meth:`_asyncio.AsyncResult.first` except that + scalar values, rather than :class:`_engine.Row` objects, + are returned. + + """ + return await greenlet_spawn(self._only_one_row, False, False, False) + + async def one_or_none(self) -> Optional[_R]: + """Return at most one object or raise an exception. + + Equivalent to :meth:`_asyncio.AsyncResult.one_or_none` except that + scalar values, rather than :class:`_engine.Row` objects, + are returned. + + """ + return await greenlet_spawn(self._only_one_row, True, False, False) + + async def one(self) -> _R: + """Return exactly one object or raise an exception. + + Equivalent to :meth:`_asyncio.AsyncResult.one` except that + scalar values, rather than :class:`_engine.Row` objects, + are returned. + + """ + return await greenlet_spawn(self._only_one_row, True, True, False) + + +class AsyncMappingResult(_WithKeys, AsyncCommon[RowMapping]): + """A wrapper for a :class:`_asyncio.AsyncResult` that returns dictionary + values rather than :class:`_engine.Row` values. + + The :class:`_asyncio.AsyncMappingResult` object is acquired by calling the + :meth:`_asyncio.AsyncResult.mappings` method. + + Refer to the :class:`_result.MappingResult` object in the synchronous + SQLAlchemy API for a complete behavioral description. + + .. versionadded:: 1.4 + + """ + + __slots__ = () + + _generate_rows = True + + _post_creational_filter = operator.attrgetter("_mapping") + + def __init__(self, result: Result[Any]): + self._real_result = result + self._unique_filter_state = result._unique_filter_state + self._metadata = result._metadata + if result._source_supports_scalars: + self._metadata = self._metadata._reduce([0]) + + def unique( + self, + strategy: Optional[_UniqueFilterType] = None, + ) -> Self: + """Apply unique filtering to the objects returned by this + :class:`_asyncio.AsyncMappingResult`. + + See :meth:`_asyncio.AsyncResult.unique` for usage details. + + """ + self._unique_filter_state = (set(), strategy) + return self + + def columns(self, *col_expressions: _KeyIndexType) -> Self: + r"""Establish the columns that should be returned in each row.""" + return self._column_slices(col_expressions) + + async def partitions( + self, size: Optional[int] = None + ) -> AsyncIterator[Sequence[RowMapping]]: + """Iterate through sub-lists of elements of the size given. + + Equivalent to :meth:`_asyncio.AsyncResult.partitions` except that + :class:`_engine.RowMapping` values, rather than :class:`_engine.Row` + objects, are returned. + + """ + + getter = self._manyrow_getter + + while True: + partition = await greenlet_spawn(getter, self, size) + if partition: + yield partition + else: + break + + async def fetchall(self) -> Sequence[RowMapping]: + """A synonym for the :meth:`_asyncio.AsyncMappingResult.all` method.""" + + return await greenlet_spawn(self._allrows) + + async def fetchone(self) -> Optional[RowMapping]: + """Fetch one object. + + Equivalent to :meth:`_asyncio.AsyncResult.fetchone` except that + :class:`_engine.RowMapping` values, rather than :class:`_engine.Row` + objects, are returned. + + """ + + row = await greenlet_spawn(self._onerow_getter, self) + if row is _NO_ROW: + return None + else: + return row + + async def fetchmany( + self, size: Optional[int] = None + ) -> Sequence[RowMapping]: + """Fetch many rows. + + Equivalent to :meth:`_asyncio.AsyncResult.fetchmany` except that + :class:`_engine.RowMapping` values, rather than :class:`_engine.Row` + objects, are returned. + + """ + + return await greenlet_spawn(self._manyrow_getter, self, size) + + async def all(self) -> Sequence[RowMapping]: + """Return all rows in a list. + + Equivalent to :meth:`_asyncio.AsyncResult.all` except that + :class:`_engine.RowMapping` values, rather than :class:`_engine.Row` + objects, are returned. + + """ + + return await greenlet_spawn(self._allrows) + + def __aiter__(self) -> AsyncMappingResult: + return self + + async def __anext__(self) -> RowMapping: + row = await greenlet_spawn(self._onerow_getter, self) + if row is _NO_ROW: + raise StopAsyncIteration() + else: + return row + + async def first(self) -> Optional[RowMapping]: + """Fetch the first object or ``None`` if no object is present. + + Equivalent to :meth:`_asyncio.AsyncResult.first` except that + :class:`_engine.RowMapping` values, rather than :class:`_engine.Row` + objects, are returned. + + """ + return await greenlet_spawn(self._only_one_row, False, False, False) + + async def one_or_none(self) -> Optional[RowMapping]: + """Return at most one object or raise an exception. + + Equivalent to :meth:`_asyncio.AsyncResult.one_or_none` except that + :class:`_engine.RowMapping` values, rather than :class:`_engine.Row` + objects, are returned. + + """ + return await greenlet_spawn(self._only_one_row, True, False, False) + + async def one(self) -> RowMapping: + """Return exactly one object or raise an exception. + + Equivalent to :meth:`_asyncio.AsyncResult.one` except that + :class:`_engine.RowMapping` values, rather than :class:`_engine.Row` + objects, are returned. + + """ + return await greenlet_spawn(self._only_one_row, True, True, False) + + +class AsyncTupleResult(AsyncCommon[_R], util.TypingOnly): + """A :class:`_asyncio.AsyncResult` that's typed as returning plain + Python tuples instead of rows. + + Since :class:`_engine.Row` acts like a tuple in every way already, + this class is a typing only class, regular :class:`_asyncio.AsyncResult` is + still used at runtime. + + """ + + __slots__ = () + + if TYPE_CHECKING: + + async def partitions( + self, size: Optional[int] = None + ) -> AsyncIterator[Sequence[_R]]: + """Iterate through sub-lists of elements of the size given. + + Equivalent to :meth:`_result.Result.partitions` except that + tuple values, rather than :class:`_engine.Row` objects, + are returned. + + """ + ... + + async def fetchone(self) -> Optional[_R]: + """Fetch one tuple. + + Equivalent to :meth:`_result.Result.fetchone` except that + tuple values, rather than :class:`_engine.Row` + objects, are returned. + + """ + ... + + async def fetchall(self) -> Sequence[_R]: + """A synonym for the :meth:`_engine.ScalarResult.all` method.""" + ... + + async def fetchmany(self, size: Optional[int] = None) -> Sequence[_R]: + """Fetch many objects. + + Equivalent to :meth:`_result.Result.fetchmany` except that + tuple values, rather than :class:`_engine.Row` objects, + are returned. + + """ + ... + + async def all(self) -> Sequence[_R]: # noqa: A001 + """Return all scalar values in a list. + + Equivalent to :meth:`_result.Result.all` except that + tuple values, rather than :class:`_engine.Row` objects, + are returned. + + """ + ... + + async def __aiter__(self) -> AsyncIterator[_R]: ... + + async def __anext__(self) -> _R: ... + + async def first(self) -> Optional[_R]: + """Fetch the first object or ``None`` if no object is present. + + Equivalent to :meth:`_result.Result.first` except that + tuple values, rather than :class:`_engine.Row` objects, + are returned. + + + """ + ... + + async def one_or_none(self) -> Optional[_R]: + """Return at most one object or raise an exception. + + Equivalent to :meth:`_result.Result.one_or_none` except that + tuple values, rather than :class:`_engine.Row` objects, + are returned. + + """ + ... + + async def one(self) -> _R: + """Return exactly one object or raise an exception. + + Equivalent to :meth:`_result.Result.one` except that + tuple values, rather than :class:`_engine.Row` objects, + are returned. + + """ + ... + + @overload + async def scalar_one(self: AsyncTupleResult[Tuple[_T]]) -> _T: ... + + @overload + async def scalar_one(self) -> Any: ... + + async def scalar_one(self) -> Any: + """Return exactly one scalar result or raise an exception. + + This is equivalent to calling :meth:`_engine.Result.scalars` + and then :meth:`_engine.Result.one`. + + .. seealso:: + + :meth:`_engine.Result.one` + + :meth:`_engine.Result.scalars` + + """ + ... + + @overload + async def scalar_one_or_none( + self: AsyncTupleResult[Tuple[_T]], + ) -> Optional[_T]: ... + + @overload + async def scalar_one_or_none(self) -> Optional[Any]: ... + + async def scalar_one_or_none(self) -> Optional[Any]: + """Return exactly one or no scalar result. + + This is equivalent to calling :meth:`_engine.Result.scalars` + and then :meth:`_engine.Result.one_or_none`. + + .. seealso:: + + :meth:`_engine.Result.one_or_none` + + :meth:`_engine.Result.scalars` + + """ + ... + + @overload + async def scalar( + self: AsyncTupleResult[Tuple[_T]], + ) -> Optional[_T]: ... + + @overload + async def scalar(self) -> Any: ... + + async def scalar(self) -> Any: + """Fetch the first column of the first row, and close the result + set. + + Returns ``None`` if there are no rows to fetch. + + No validation is performed to test if additional rows remain. + + After calling this method, the object is fully closed, + e.g. the :meth:`_engine.CursorResult.close` + method will have been called. + + :return: a Python scalar value , or ``None`` if no rows remain. + + """ + ... + + +_RT = TypeVar("_RT", bound="Result[Any]") + + +async def _ensure_sync_result(result: _RT, calling_method: Any) -> _RT: + cursor_result: CursorResult[Any] + + try: + is_cursor = result._is_cursor + except AttributeError: + # legacy execute(DefaultGenerator) case + return result + + if not is_cursor: + cursor_result = getattr(result, "raw", None) # type: ignore + else: + cursor_result = result # type: ignore + if cursor_result and cursor_result.context._is_server_side: + await greenlet_spawn(cursor_result.close) + raise async_exc.AsyncMethodRequired( + "Can't use the %s.%s() method with a " + "server-side cursor. " + "Use the %s.stream() method for an async " + "streaming result set." + % ( + calling_method.__self__.__class__.__name__, + calling_method.__name__, + calling_method.__self__.__class__.__name__, + ) + ) + return result diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/scoping.py b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/scoping.py new file mode 100644 index 0000000..e879a16 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/scoping.py @@ -0,0 +1,1614 @@ +# ext/asyncio/scoping.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php + +from __future__ import annotations + +from typing import Any +from typing import Callable +from typing import Generic +from typing import Iterable +from typing import Iterator +from typing import Optional +from typing import overload +from typing import Sequence +from typing import Tuple +from typing import Type +from typing import TYPE_CHECKING +from typing import TypeVar +from typing import Union + +from .session import _AS +from .session import async_sessionmaker +from .session import AsyncSession +from ... import exc as sa_exc +from ... import util +from ...orm.session import Session +from ...util import create_proxy_methods +from ...util import ScopedRegistry +from ...util import warn +from ...util import warn_deprecated + +if TYPE_CHECKING: + from .engine import AsyncConnection + from .result import AsyncResult + from .result import AsyncScalarResult + from .session import AsyncSessionTransaction + from ...engine import Connection + from ...engine import CursorResult + from ...engine import Engine + from ...engine import Result + from ...engine import Row + from ...engine import RowMapping + from ...engine.interfaces import _CoreAnyExecuteParams + from ...engine.interfaces import CoreExecuteOptionsParameter + from ...engine.result import ScalarResult + from ...orm._typing import _IdentityKeyType + from ...orm._typing import _O + from ...orm._typing import OrmExecuteOptionsParameter + from ...orm.interfaces import ORMOption + from ...orm.session import _BindArguments + from ...orm.session import _EntityBindKey + from ...orm.session import _PKIdentityArgument + from ...orm.session import _SessionBind + from ...sql.base import Executable + from ...sql.dml import UpdateBase + from ...sql.elements import ClauseElement + from ...sql.selectable import ForUpdateParameter + from ...sql.selectable import TypedReturnsRows + +_T = TypeVar("_T", bound=Any) + + +@create_proxy_methods( + AsyncSession, + ":class:`_asyncio.AsyncSession`", + ":class:`_asyncio.scoping.async_scoped_session`", + classmethods=["close_all", "object_session", "identity_key"], + methods=[ + "__contains__", + "__iter__", + "aclose", + "add", + "add_all", + "begin", + "begin_nested", + "close", + "reset", + "commit", + "connection", + "delete", + "execute", + "expire", + "expire_all", + "expunge", + "expunge_all", + "flush", + "get_bind", + "is_modified", + "invalidate", + "merge", + "refresh", + "rollback", + "scalar", + "scalars", + "get", + "get_one", + "stream", + "stream_scalars", + ], + attributes=[ + "bind", + "dirty", + "deleted", + "new", + "identity_map", + "is_active", + "autoflush", + "no_autoflush", + "info", + ], + use_intermediate_variable=["get"], +) +class async_scoped_session(Generic[_AS]): + """Provides scoped management of :class:`.AsyncSession` objects. + + See the section :ref:`asyncio_scoped_session` for usage details. + + .. versionadded:: 1.4.19 + + + """ + + _support_async = True + + session_factory: async_sessionmaker[_AS] + """The `session_factory` provided to `__init__` is stored in this + attribute and may be accessed at a later time. This can be useful when + a new non-scoped :class:`.AsyncSession` is needed.""" + + registry: ScopedRegistry[_AS] + + def __init__( + self, + session_factory: async_sessionmaker[_AS], + scopefunc: Callable[[], Any], + ): + """Construct a new :class:`_asyncio.async_scoped_session`. + + :param session_factory: a factory to create new :class:`_asyncio.AsyncSession` + instances. This is usually, but not necessarily, an instance + of :class:`_asyncio.async_sessionmaker`. + + :param scopefunc: function which defines + the current scope. A function such as ``asyncio.current_task`` + may be useful here. + + """ # noqa: E501 + + self.session_factory = session_factory + self.registry = ScopedRegistry(session_factory, scopefunc) + + @property + def _proxied(self) -> _AS: + return self.registry() + + def __call__(self, **kw: Any) -> _AS: + r"""Return the current :class:`.AsyncSession`, creating it + using the :attr:`.scoped_session.session_factory` if not present. + + :param \**kw: Keyword arguments will be passed to the + :attr:`.scoped_session.session_factory` callable, if an existing + :class:`.AsyncSession` is not present. If the + :class:`.AsyncSession` is present + and keyword arguments have been passed, + :exc:`~sqlalchemy.exc.InvalidRequestError` is raised. + + """ + if kw: + if self.registry.has(): + raise sa_exc.InvalidRequestError( + "Scoped session is already present; " + "no new arguments may be specified." + ) + else: + sess = self.session_factory(**kw) + self.registry.set(sess) + else: + sess = self.registry() + if not self._support_async and sess._is_asyncio: + warn_deprecated( + "Using `scoped_session` with asyncio is deprecated and " + "will raise an error in a future version. " + "Please use `async_scoped_session` instead.", + "1.4.23", + ) + return sess + + def configure(self, **kwargs: Any) -> None: + """reconfigure the :class:`.sessionmaker` used by this + :class:`.scoped_session`. + + See :meth:`.sessionmaker.configure`. + + """ + + if self.registry.has(): + warn( + "At least one scoped session is already present. " + " configure() can not affect sessions that have " + "already been created." + ) + + self.session_factory.configure(**kwargs) + + async def remove(self) -> None: + """Dispose of the current :class:`.AsyncSession`, if present. + + Different from scoped_session's remove method, this method would use + await to wait for the close method of AsyncSession. + + """ + + if self.registry.has(): + await self.registry().close() + self.registry.clear() + + # START PROXY METHODS async_scoped_session + + # code within this block is **programmatically, + # statically generated** by tools/generate_proxy_methods.py + + def __contains__(self, instance: object) -> bool: + r"""Return True if the instance is associated with this session. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + The instance may be pending or persistent within the Session for a + result of True. + + + + """ # noqa: E501 + + return self._proxied.__contains__(instance) + + def __iter__(self) -> Iterator[object]: + r"""Iterate over all pending or persistent instances within this + Session. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + + + """ # noqa: E501 + + return self._proxied.__iter__() + + async def aclose(self) -> None: + r"""A synonym for :meth:`_asyncio.AsyncSession.close`. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + The :meth:`_asyncio.AsyncSession.aclose` name is specifically + to support the Python standard library ``@contextlib.aclosing`` + context manager function. + + .. versionadded:: 2.0.20 + + + """ # noqa: E501 + + return await self._proxied.aclose() + + def add(self, instance: object, _warn: bool = True) -> None: + r"""Place an object into this :class:`_orm.Session`. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + Objects that are in the :term:`transient` state when passed to the + :meth:`_orm.Session.add` method will move to the + :term:`pending` state, until the next flush, at which point they + will move to the :term:`persistent` state. + + Objects that are in the :term:`detached` state when passed to the + :meth:`_orm.Session.add` method will move to the :term:`persistent` + state directly. + + If the transaction used by the :class:`_orm.Session` is rolled back, + objects which were transient when they were passed to + :meth:`_orm.Session.add` will be moved back to the + :term:`transient` state, and will no longer be present within this + :class:`_orm.Session`. + + .. seealso:: + + :meth:`_orm.Session.add_all` + + :ref:`session_adding` - at :ref:`session_basics` + + + + """ # noqa: E501 + + return self._proxied.add(instance, _warn=_warn) + + def add_all(self, instances: Iterable[object]) -> None: + r"""Add the given collection of instances to this :class:`_orm.Session`. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + See the documentation for :meth:`_orm.Session.add` for a general + behavioral description. + + .. seealso:: + + :meth:`_orm.Session.add` + + :ref:`session_adding` - at :ref:`session_basics` + + + + """ # noqa: E501 + + return self._proxied.add_all(instances) + + def begin(self) -> AsyncSessionTransaction: + r"""Return an :class:`_asyncio.AsyncSessionTransaction` object. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + The underlying :class:`_orm.Session` will perform the + "begin" action when the :class:`_asyncio.AsyncSessionTransaction` + object is entered:: + + async with async_session.begin(): + # .. ORM transaction is begun + + Note that database IO will not normally occur when the session-level + transaction is begun, as database transactions begin on an + on-demand basis. However, the begin block is async to accommodate + for a :meth:`_orm.SessionEvents.after_transaction_create` + event hook that may perform IO. + + For a general description of ORM begin, see + :meth:`_orm.Session.begin`. + + + """ # noqa: E501 + + return self._proxied.begin() + + def begin_nested(self) -> AsyncSessionTransaction: + r"""Return an :class:`_asyncio.AsyncSessionTransaction` object + which will begin a "nested" transaction, e.g. SAVEPOINT. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + Behavior is the same as that of :meth:`_asyncio.AsyncSession.begin`. + + For a general description of ORM begin nested, see + :meth:`_orm.Session.begin_nested`. + + .. seealso:: + + :ref:`aiosqlite_serializable` - special workarounds required + with the SQLite asyncio driver in order for SAVEPOINT to work + correctly. + + + """ # noqa: E501 + + return self._proxied.begin_nested() + + async def close(self) -> None: + r"""Close out the transactional resources and ORM objects used by this + :class:`_asyncio.AsyncSession`. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. seealso:: + + :meth:`_orm.Session.close` - main documentation for + "close" + + :ref:`session_closing` - detail on the semantics of + :meth:`_asyncio.AsyncSession.close` and + :meth:`_asyncio.AsyncSession.reset`. + + + """ # noqa: E501 + + return await self._proxied.close() + + async def reset(self) -> None: + r"""Close out the transactional resources and ORM objects used by this + :class:`_orm.Session`, resetting the session to its initial state. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. versionadded:: 2.0.22 + + .. seealso:: + + :meth:`_orm.Session.reset` - main documentation for + "reset" + + :ref:`session_closing` - detail on the semantics of + :meth:`_asyncio.AsyncSession.close` and + :meth:`_asyncio.AsyncSession.reset`. + + + """ # noqa: E501 + + return await self._proxied.reset() + + async def commit(self) -> None: + r"""Commit the current transaction in progress. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. seealso:: + + :meth:`_orm.Session.commit` - main documentation for + "commit" + + """ # noqa: E501 + + return await self._proxied.commit() + + async def connection( + self, + bind_arguments: Optional[_BindArguments] = None, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + **kw: Any, + ) -> AsyncConnection: + r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to + this :class:`.Session` object's transactional state. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + This method may also be used to establish execution options for the + database connection used by the current transaction. + + .. versionadded:: 1.4.24 Added \**kw arguments which are passed + through to the underlying :meth:`_orm.Session.connection` method. + + .. seealso:: + + :meth:`_orm.Session.connection` - main documentation for + "connection" + + + """ # noqa: E501 + + return await self._proxied.connection( + bind_arguments=bind_arguments, + execution_options=execution_options, + **kw, + ) + + async def delete(self, instance: object) -> None: + r"""Mark an instance as deleted. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + The database delete operation occurs upon ``flush()``. + + As this operation may need to cascade along unloaded relationships, + it is awaitable to allow for those queries to take place. + + .. seealso:: + + :meth:`_orm.Session.delete` - main documentation for delete + + + """ # noqa: E501 + + return await self._proxied.delete(instance) + + @overload + async def execute( + self, + statement: TypedReturnsRows[_T], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> Result[_T]: ... + + @overload + async def execute( + self, + statement: UpdateBase, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> CursorResult[Any]: ... + + @overload + async def execute( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> Result[Any]: ... + + async def execute( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Result[Any]: + r"""Execute a statement and return a buffered + :class:`_engine.Result` object. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. seealso:: + + :meth:`_orm.Session.execute` - main documentation for execute + + + """ # noqa: E501 + + return await self._proxied.execute( + statement, + params=params, + execution_options=execution_options, + bind_arguments=bind_arguments, + **kw, + ) + + def expire( + self, instance: object, attribute_names: Optional[Iterable[str]] = None + ) -> None: + r"""Expire the attributes on an instance. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + Marks the attributes of an instance as out of date. When an expired + attribute is next accessed, a query will be issued to the + :class:`.Session` object's current transactional context in order to + load all expired attributes for the given instance. Note that + a highly isolated transaction will return the same values as were + previously read in that same transaction, regardless of changes + in database state outside of that transaction. + + To expire all objects in the :class:`.Session` simultaneously, + use :meth:`Session.expire_all`. + + The :class:`.Session` object's default behavior is to + expire all state whenever the :meth:`Session.rollback` + or :meth:`Session.commit` methods are called, so that new + state can be loaded for the new transaction. For this reason, + calling :meth:`Session.expire` only makes sense for the specific + case that a non-ORM SQL statement was emitted in the current + transaction. + + :param instance: The instance to be refreshed. + :param attribute_names: optional list of string attribute names + indicating a subset of attributes to be expired. + + .. seealso:: + + :ref:`session_expire` - introductory material + + :meth:`.Session.expire` + + :meth:`.Session.refresh` + + :meth:`_orm.Query.populate_existing` + + + + """ # noqa: E501 + + return self._proxied.expire(instance, attribute_names=attribute_names) + + def expire_all(self) -> None: + r"""Expires all persistent instances within this Session. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + When any attributes on a persistent instance is next accessed, + a query will be issued using the + :class:`.Session` object's current transactional context in order to + load all expired attributes for the given instance. Note that + a highly isolated transaction will return the same values as were + previously read in that same transaction, regardless of changes + in database state outside of that transaction. + + To expire individual objects and individual attributes + on those objects, use :meth:`Session.expire`. + + The :class:`.Session` object's default behavior is to + expire all state whenever the :meth:`Session.rollback` + or :meth:`Session.commit` methods are called, so that new + state can be loaded for the new transaction. For this reason, + calling :meth:`Session.expire_all` is not usually needed, + assuming the transaction is isolated. + + .. seealso:: + + :ref:`session_expire` - introductory material + + :meth:`.Session.expire` + + :meth:`.Session.refresh` + + :meth:`_orm.Query.populate_existing` + + + + """ # noqa: E501 + + return self._proxied.expire_all() + + def expunge(self, instance: object) -> None: + r"""Remove the `instance` from this ``Session``. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + This will free all internal references to the instance. Cascading + will be applied according to the *expunge* cascade rule. + + + + """ # noqa: E501 + + return self._proxied.expunge(instance) + + def expunge_all(self) -> None: + r"""Remove all object instances from this ``Session``. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + This is equivalent to calling ``expunge(obj)`` on all objects in this + ``Session``. + + + + """ # noqa: E501 + + return self._proxied.expunge_all() + + async def flush(self, objects: Optional[Sequence[Any]] = None) -> None: + r"""Flush all the object changes to the database. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. seealso:: + + :meth:`_orm.Session.flush` - main documentation for flush + + + """ # noqa: E501 + + return await self._proxied.flush(objects=objects) + + def get_bind( + self, + mapper: Optional[_EntityBindKey[_O]] = None, + clause: Optional[ClauseElement] = None, + bind: Optional[_SessionBind] = None, + **kw: Any, + ) -> Union[Engine, Connection]: + r"""Return a "bind" to which the synchronous proxied :class:`_orm.Session` + is bound. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + Unlike the :meth:`_orm.Session.get_bind` method, this method is + currently **not** used by this :class:`.AsyncSession` in any way + in order to resolve engines for requests. + + .. note:: + + This method proxies directly to the :meth:`_orm.Session.get_bind` + method, however is currently **not** useful as an override target, + in contrast to that of the :meth:`_orm.Session.get_bind` method. + The example below illustrates how to implement custom + :meth:`_orm.Session.get_bind` schemes that work with + :class:`.AsyncSession` and :class:`.AsyncEngine`. + + The pattern introduced at :ref:`session_custom_partitioning` + illustrates how to apply a custom bind-lookup scheme to a + :class:`_orm.Session` given a set of :class:`_engine.Engine` objects. + To apply a corresponding :meth:`_orm.Session.get_bind` implementation + for use with a :class:`.AsyncSession` and :class:`.AsyncEngine` + objects, continue to subclass :class:`_orm.Session` and apply it to + :class:`.AsyncSession` using + :paramref:`.AsyncSession.sync_session_class`. The inner method must + continue to return :class:`_engine.Engine` instances, which can be + acquired from a :class:`_asyncio.AsyncEngine` using the + :attr:`_asyncio.AsyncEngine.sync_engine` attribute:: + + # using example from "Custom Vertical Partitioning" + + + import random + + from sqlalchemy.ext.asyncio import AsyncSession + from sqlalchemy.ext.asyncio import create_async_engine + from sqlalchemy.ext.asyncio import async_sessionmaker + from sqlalchemy.orm import Session + + # construct async engines w/ async drivers + engines = { + 'leader':create_async_engine("sqlite+aiosqlite:///leader.db"), + 'other':create_async_engine("sqlite+aiosqlite:///other.db"), + 'follower1':create_async_engine("sqlite+aiosqlite:///follower1.db"), + 'follower2':create_async_engine("sqlite+aiosqlite:///follower2.db"), + } + + class RoutingSession(Session): + def get_bind(self, mapper=None, clause=None, **kw): + # within get_bind(), return sync engines + if mapper and issubclass(mapper.class_, MyOtherClass): + return engines['other'].sync_engine + elif self._flushing or isinstance(clause, (Update, Delete)): + return engines['leader'].sync_engine + else: + return engines[ + random.choice(['follower1','follower2']) + ].sync_engine + + # apply to AsyncSession using sync_session_class + AsyncSessionMaker = async_sessionmaker( + sync_session_class=RoutingSession + ) + + The :meth:`_orm.Session.get_bind` method is called in a non-asyncio, + implicitly non-blocking context in the same manner as ORM event hooks + and functions that are invoked via :meth:`.AsyncSession.run_sync`, so + routines that wish to run SQL commands inside of + :meth:`_orm.Session.get_bind` can continue to do so using + blocking-style code, which will be translated to implicitly async calls + at the point of invoking IO on the database drivers. + + + """ # noqa: E501 + + return self._proxied.get_bind( + mapper=mapper, clause=clause, bind=bind, **kw + ) + + def is_modified( + self, instance: object, include_collections: bool = True + ) -> bool: + r"""Return ``True`` if the given instance has locally + modified attributes. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + This method retrieves the history for each instrumented + attribute on the instance and performs a comparison of the current + value to its previously committed value, if any. + + It is in effect a more expensive and accurate + version of checking for the given instance in the + :attr:`.Session.dirty` collection; a full test for + each attribute's net "dirty" status is performed. + + E.g.:: + + return session.is_modified(someobject) + + A few caveats to this method apply: + + * Instances present in the :attr:`.Session.dirty` collection may + report ``False`` when tested with this method. This is because + the object may have received change events via attribute mutation, + thus placing it in :attr:`.Session.dirty`, but ultimately the state + is the same as that loaded from the database, resulting in no net + change here. + * Scalar attributes may not have recorded the previously set + value when a new value was applied, if the attribute was not loaded, + or was expired, at the time the new value was received - in these + cases, the attribute is assumed to have a change, even if there is + ultimately no net change against its database value. SQLAlchemy in + most cases does not need the "old" value when a set event occurs, so + it skips the expense of a SQL call if the old value isn't present, + based on the assumption that an UPDATE of the scalar value is + usually needed, and in those few cases where it isn't, is less + expensive on average than issuing a defensive SELECT. + + The "old" value is fetched unconditionally upon set only if the + attribute container has the ``active_history`` flag set to ``True``. + This flag is set typically for primary key attributes and scalar + object references that are not a simple many-to-one. To set this + flag for any arbitrary mapped column, use the ``active_history`` + argument with :func:`.column_property`. + + :param instance: mapped instance to be tested for pending changes. + :param include_collections: Indicates if multivalued collections + should be included in the operation. Setting this to ``False`` is a + way to detect only local-column based properties (i.e. scalar columns + or many-to-one foreign keys) that would result in an UPDATE for this + instance upon flush. + + + + """ # noqa: E501 + + return self._proxied.is_modified( + instance, include_collections=include_collections + ) + + async def invalidate(self) -> None: + r"""Close this Session, using connection invalidation. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + For a complete description, see :meth:`_orm.Session.invalidate`. + + """ # noqa: E501 + + return await self._proxied.invalidate() + + async def merge( + self, + instance: _O, + *, + load: bool = True, + options: Optional[Sequence[ORMOption]] = None, + ) -> _O: + r"""Copy the state of a given instance into a corresponding instance + within this :class:`_asyncio.AsyncSession`. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. seealso:: + + :meth:`_orm.Session.merge` - main documentation for merge + + + """ # noqa: E501 + + return await self._proxied.merge(instance, load=load, options=options) + + async def refresh( + self, + instance: object, + attribute_names: Optional[Iterable[str]] = None, + with_for_update: ForUpdateParameter = None, + ) -> None: + r"""Expire and refresh the attributes on the given instance. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + A query will be issued to the database and all attributes will be + refreshed with their current database value. + + This is the async version of the :meth:`_orm.Session.refresh` method. + See that method for a complete description of all options. + + .. seealso:: + + :meth:`_orm.Session.refresh` - main documentation for refresh + + + """ # noqa: E501 + + return await self._proxied.refresh( + instance, + attribute_names=attribute_names, + with_for_update=with_for_update, + ) + + async def rollback(self) -> None: + r"""Rollback the current transaction in progress. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. seealso:: + + :meth:`_orm.Session.rollback` - main documentation for + "rollback" + + """ # noqa: E501 + + return await self._proxied.rollback() + + @overload + async def scalar( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Optional[_T]: ... + + @overload + async def scalar( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Any: ... + + async def scalar( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Any: + r"""Execute a statement and return a scalar result. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. seealso:: + + :meth:`_orm.Session.scalar` - main documentation for scalar + + + """ # noqa: E501 + + return await self._proxied.scalar( + statement, + params=params, + execution_options=execution_options, + bind_arguments=bind_arguments, + **kw, + ) + + @overload + async def scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[_T]: ... + + @overload + async def scalars( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[Any]: ... + + async def scalars( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[Any]: + r"""Execute a statement and return scalar results. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + :return: a :class:`_result.ScalarResult` object + + .. versionadded:: 1.4.24 Added :meth:`_asyncio.AsyncSession.scalars` + + .. versionadded:: 1.4.26 Added + :meth:`_asyncio.async_scoped_session.scalars` + + .. seealso:: + + :meth:`_orm.Session.scalars` - main documentation for scalars + + :meth:`_asyncio.AsyncSession.stream_scalars` - streaming version + + + """ # noqa: E501 + + return await self._proxied.scalars( + statement, + params=params, + execution_options=execution_options, + bind_arguments=bind_arguments, + **kw, + ) + + async def get( + self, + entity: _EntityBindKey[_O], + ident: _PKIdentityArgument, + *, + options: Optional[Sequence[ORMOption]] = None, + populate_existing: bool = False, + with_for_update: ForUpdateParameter = None, + identity_token: Optional[Any] = None, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + ) -> Union[_O, None]: + r"""Return an instance based on the given primary key identifier, + or ``None`` if not found. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. seealso:: + + :meth:`_orm.Session.get` - main documentation for get + + + + """ # noqa: E501 + + result = await self._proxied.get( + entity, + ident, + options=options, + populate_existing=populate_existing, + with_for_update=with_for_update, + identity_token=identity_token, + execution_options=execution_options, + ) + return result + + async def get_one( + self, + entity: _EntityBindKey[_O], + ident: _PKIdentityArgument, + *, + options: Optional[Sequence[ORMOption]] = None, + populate_existing: bool = False, + with_for_update: ForUpdateParameter = None, + identity_token: Optional[Any] = None, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + ) -> _O: + r"""Return an instance based on the given primary key identifier, + or raise an exception if not found. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + Raises ``sqlalchemy.orm.exc.NoResultFound`` if the query selects + no rows. + + ..versionadded: 2.0.22 + + .. seealso:: + + :meth:`_orm.Session.get_one` - main documentation for get_one + + + """ # noqa: E501 + + return await self._proxied.get_one( + entity, + ident, + options=options, + populate_existing=populate_existing, + with_for_update=with_for_update, + identity_token=identity_token, + execution_options=execution_options, + ) + + @overload + async def stream( + self, + statement: TypedReturnsRows[_T], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncResult[_T]: ... + + @overload + async def stream( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncResult[Any]: ... + + async def stream( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncResult[Any]: + r"""Execute a statement and return a streaming + :class:`_asyncio.AsyncResult` object. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + + """ # noqa: E501 + + return await self._proxied.stream( + statement, + params=params, + execution_options=execution_options, + bind_arguments=bind_arguments, + **kw, + ) + + @overload + async def stream_scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncScalarResult[_T]: ... + + @overload + async def stream_scalars( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncScalarResult[Any]: ... + + async def stream_scalars( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncScalarResult[Any]: + r"""Execute a statement and return a stream of scalar results. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + :return: an :class:`_asyncio.AsyncScalarResult` object + + .. versionadded:: 1.4.24 + + .. seealso:: + + :meth:`_orm.Session.scalars` - main documentation for scalars + + :meth:`_asyncio.AsyncSession.scalars` - non streaming version + + + """ # noqa: E501 + + return await self._proxied.stream_scalars( + statement, + params=params, + execution_options=execution_options, + bind_arguments=bind_arguments, + **kw, + ) + + @property + def bind(self) -> Any: + r"""Proxy for the :attr:`_asyncio.AsyncSession.bind` attribute + on behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + """ # noqa: E501 + + return self._proxied.bind + + @bind.setter + def bind(self, attr: Any) -> None: + self._proxied.bind = attr + + @property + def dirty(self) -> Any: + r"""The set of all persistent instances considered dirty. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class + on behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class + on behalf of the :class:`_asyncio.AsyncSession` class. + + E.g.:: + + some_mapped_object in session.dirty + + Instances are considered dirty when they were modified but not + deleted. + + Note that this 'dirty' calculation is 'optimistic'; most + attribute-setting or collection modification operations will + mark an instance as 'dirty' and place it in this set, even if + there is no net change to the attribute's value. At flush + time, the value of each attribute is compared to its + previously saved value, and if there's no net change, no SQL + operation will occur (this is a more expensive operation so + it's only done at flush time). + + To check if an instance has actionable net changes to its + attributes, use the :meth:`.Session.is_modified` method. + + + + """ # noqa: E501 + + return self._proxied.dirty + + @property + def deleted(self) -> Any: + r"""The set of all instances marked as 'deleted' within this ``Session`` + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class + on behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class + on behalf of the :class:`_asyncio.AsyncSession` class. + + + """ # noqa: E501 + + return self._proxied.deleted + + @property + def new(self) -> Any: + r"""The set of all instances marked as 'new' within this ``Session``. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class + on behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class + on behalf of the :class:`_asyncio.AsyncSession` class. + + + """ # noqa: E501 + + return self._proxied.new + + @property + def identity_map(self) -> Any: + r"""Proxy for the :attr:`_orm.Session.identity_map` attribute + on behalf of the :class:`_asyncio.AsyncSession` class. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class + on behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + + """ # noqa: E501 + + return self._proxied.identity_map + + @identity_map.setter + def identity_map(self, attr: Any) -> None: + self._proxied.identity_map = attr + + @property + def is_active(self) -> Any: + r"""True if this :class:`.Session` not in "partial rollback" state. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class + on behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class + on behalf of the :class:`_asyncio.AsyncSession` class. + + .. versionchanged:: 1.4 The :class:`_orm.Session` no longer begins + a new transaction immediately, so this attribute will be False + when the :class:`_orm.Session` is first instantiated. + + "partial rollback" state typically indicates that the flush process + of the :class:`_orm.Session` has failed, and that the + :meth:`_orm.Session.rollback` method must be emitted in order to + fully roll back the transaction. + + If this :class:`_orm.Session` is not in a transaction at all, the + :class:`_orm.Session` will autobegin when it is first used, so in this + case :attr:`_orm.Session.is_active` will return True. + + Otherwise, if this :class:`_orm.Session` is within a transaction, + and that transaction has not been rolled back internally, the + :attr:`_orm.Session.is_active` will also return True. + + .. seealso:: + + :ref:`faq_session_rollback` + + :meth:`_orm.Session.in_transaction` + + + + """ # noqa: E501 + + return self._proxied.is_active + + @property + def autoflush(self) -> Any: + r"""Proxy for the :attr:`_orm.Session.autoflush` attribute + on behalf of the :class:`_asyncio.AsyncSession` class. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class + on behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + + """ # noqa: E501 + + return self._proxied.autoflush + + @autoflush.setter + def autoflush(self, attr: Any) -> None: + self._proxied.autoflush = attr + + @property + def no_autoflush(self) -> Any: + r"""Return a context manager that disables autoflush. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class + on behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class + on behalf of the :class:`_asyncio.AsyncSession` class. + + e.g.:: + + with session.no_autoflush: + + some_object = SomeClass() + session.add(some_object) + # won't autoflush + some_object.related_thing = session.query(SomeRelated).first() + + Operations that proceed within the ``with:`` block + will not be subject to flushes occurring upon query + access. This is useful when initializing a series + of objects which involve existing database queries, + where the uncompleted object should not yet be flushed. + + + + """ # noqa: E501 + + return self._proxied.no_autoflush + + @property + def info(self) -> Any: + r"""A user-modifiable dictionary. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class + on behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class + on behalf of the :class:`_asyncio.AsyncSession` class. + + The initial value of this dictionary can be populated using the + ``info`` argument to the :class:`.Session` constructor or + :class:`.sessionmaker` constructor or factory methods. The dictionary + here is always local to this :class:`.Session` and can be modified + independently of all other :class:`.Session` objects. + + + + """ # noqa: E501 + + return self._proxied.info + + @classmethod + async def close_all(cls) -> None: + r"""Close all :class:`_asyncio.AsyncSession` sessions. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. deprecated:: 2.0 The :meth:`.AsyncSession.close_all` method is deprecated and will be removed in a future release. Please refer to :func:`_asyncio.close_all_sessions`. + + """ # noqa: E501 + + return await AsyncSession.close_all() + + @classmethod + def object_session(cls, instance: object) -> Optional[Session]: + r"""Return the :class:`.Session` to which an object belongs. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + This is an alias of :func:`.object_session`. + + + + """ # noqa: E501 + + return AsyncSession.object_session(instance) + + @classmethod + def identity_key( + cls, + class_: Optional[Type[Any]] = None, + ident: Union[Any, Tuple[Any, ...]] = None, + *, + instance: Optional[Any] = None, + row: Optional[Union[Row[Any], RowMapping]] = None, + identity_token: Optional[Any] = None, + ) -> _IdentityKeyType[Any]: + r"""Return an identity key. + + .. container:: class_bases + + Proxied for the :class:`_asyncio.AsyncSession` class on + behalf of the :class:`_asyncio.scoping.async_scoped_session` class. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + This is an alias of :func:`.util.identity_key`. + + + + """ # noqa: E501 + + return AsyncSession.identity_key( + class_=class_, + ident=ident, + instance=instance, + row=row, + identity_token=identity_token, + ) + + # END PROXY METHODS async_scoped_session diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py new file mode 100644 index 0000000..c5fe469 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py @@ -0,0 +1,1936 @@ +# ext/asyncio/session.py +# Copyright (C) 2020-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +from __future__ import annotations + +import asyncio +from typing import Any +from typing import Awaitable +from typing import Callable +from typing import cast +from typing import Dict +from typing import Generic +from typing import Iterable +from typing import Iterator +from typing import NoReturn +from typing import Optional +from typing import overload +from typing import Sequence +from typing import Tuple +from typing import Type +from typing import TYPE_CHECKING +from typing import TypeVar +from typing import Union + +from . import engine +from .base import ReversibleProxy +from .base import StartableContext +from .result import _ensure_sync_result +from .result import AsyncResult +from .result import AsyncScalarResult +from ... import util +from ...orm import close_all_sessions as _sync_close_all_sessions +from ...orm import object_session +from ...orm import Session +from ...orm import SessionTransaction +from ...orm import state as _instance_state +from ...util.concurrency import greenlet_spawn +from ...util.typing import Concatenate +from ...util.typing import ParamSpec + + +if TYPE_CHECKING: + from .engine import AsyncConnection + from .engine import AsyncEngine + from ...engine import Connection + from ...engine import CursorResult + from ...engine import Engine + from ...engine import Result + from ...engine import Row + from ...engine import RowMapping + from ...engine import ScalarResult + from ...engine.interfaces import _CoreAnyExecuteParams + from ...engine.interfaces import CoreExecuteOptionsParameter + from ...event import dispatcher + from ...orm._typing import _IdentityKeyType + from ...orm._typing import _O + from ...orm._typing import OrmExecuteOptionsParameter + from ...orm.identity import IdentityMap + from ...orm.interfaces import ORMOption + from ...orm.session import _BindArguments + from ...orm.session import _EntityBindKey + from ...orm.session import _PKIdentityArgument + from ...orm.session import _SessionBind + from ...orm.session import _SessionBindKey + from ...sql._typing import _InfoType + from ...sql.base import Executable + from ...sql.dml import UpdateBase + from ...sql.elements import ClauseElement + from ...sql.selectable import ForUpdateParameter + from ...sql.selectable import TypedReturnsRows + +_AsyncSessionBind = Union["AsyncEngine", "AsyncConnection"] + +_P = ParamSpec("_P") +_T = TypeVar("_T", bound=Any) + + +_EXECUTE_OPTIONS = util.immutabledict({"prebuffer_rows": True}) +_STREAM_OPTIONS = util.immutabledict({"stream_results": True}) + + +class AsyncAttrs: + """Mixin class which provides an awaitable accessor for all attributes. + + E.g.:: + + from __future__ import annotations + + from typing import List + + from sqlalchemy import ForeignKey + from sqlalchemy import func + from sqlalchemy.ext.asyncio import AsyncAttrs + from sqlalchemy.orm import DeclarativeBase + from sqlalchemy.orm import Mapped + from sqlalchemy.orm import mapped_column + from sqlalchemy.orm import relationship + + + class Base(AsyncAttrs, DeclarativeBase): + pass + + + class A(Base): + __tablename__ = "a" + + id: Mapped[int] = mapped_column(primary_key=True) + data: Mapped[str] + bs: Mapped[List[B]] = relationship() + + + class B(Base): + __tablename__ = "b" + id: Mapped[int] = mapped_column(primary_key=True) + a_id: Mapped[int] = mapped_column(ForeignKey("a.id")) + data: Mapped[str] + + In the above example, the :class:`_asyncio.AsyncAttrs` mixin is applied to + the declarative ``Base`` class where it takes effect for all subclasses. + This mixin adds a single new attribute + :attr:`_asyncio.AsyncAttrs.awaitable_attrs` to all classes, which will + yield the value of any attribute as an awaitable. This allows attributes + which may be subject to lazy loading or deferred / unexpiry loading to be + accessed such that IO can still be emitted:: + + a1 = (await async_session.scalars(select(A).where(A.id == 5))).one() + + # use the lazy loader on ``a1.bs`` via the ``.awaitable_attrs`` + # interface, so that it may be awaited + for b1 in await a1.awaitable_attrs.bs: + print(b1) + + The :attr:`_asyncio.AsyncAttrs.awaitable_attrs` performs a call against the + attribute that is approximately equivalent to using the + :meth:`_asyncio.AsyncSession.run_sync` method, e.g.:: + + for b1 in await async_session.run_sync(lambda sess: a1.bs): + print(b1) + + .. versionadded:: 2.0.13 + + .. seealso:: + + :ref:`asyncio_orm_avoid_lazyloads` + + """ + + class _AsyncAttrGetitem: + __slots__ = "_instance" + + def __init__(self, _instance: Any): + self._instance = _instance + + def __getattr__(self, name: str) -> Awaitable[Any]: + return greenlet_spawn(getattr, self._instance, name) + + @property + def awaitable_attrs(self) -> AsyncAttrs._AsyncAttrGetitem: + """provide a namespace of all attributes on this object wrapped + as awaitables. + + e.g.:: + + + a1 = (await async_session.scalars(select(A).where(A.id == 5))).one() + + some_attribute = await a1.awaitable_attrs.some_deferred_attribute + some_collection = await a1.awaitable_attrs.some_collection + + """ # noqa: E501 + + return AsyncAttrs._AsyncAttrGetitem(self) + + +@util.create_proxy_methods( + Session, + ":class:`_orm.Session`", + ":class:`_asyncio.AsyncSession`", + classmethods=["object_session", "identity_key"], + methods=[ + "__contains__", + "__iter__", + "add", + "add_all", + "expire", + "expire_all", + "expunge", + "expunge_all", + "is_modified", + "in_transaction", + "in_nested_transaction", + ], + attributes=[ + "dirty", + "deleted", + "new", + "identity_map", + "is_active", + "autoflush", + "no_autoflush", + "info", + ], +) +class AsyncSession(ReversibleProxy[Session]): + """Asyncio version of :class:`_orm.Session`. + + The :class:`_asyncio.AsyncSession` is a proxy for a traditional + :class:`_orm.Session` instance. + + The :class:`_asyncio.AsyncSession` is **not safe for use in concurrent + tasks.**. See :ref:`session_faq_threadsafe` for background. + + .. versionadded:: 1.4 + + To use an :class:`_asyncio.AsyncSession` with custom :class:`_orm.Session` + implementations, see the + :paramref:`_asyncio.AsyncSession.sync_session_class` parameter. + + + """ + + _is_asyncio = True + + dispatch: dispatcher[Session] + + def __init__( + self, + bind: Optional[_AsyncSessionBind] = None, + *, + binds: Optional[Dict[_SessionBindKey, _AsyncSessionBind]] = None, + sync_session_class: Optional[Type[Session]] = None, + **kw: Any, + ): + r"""Construct a new :class:`_asyncio.AsyncSession`. + + All parameters other than ``sync_session_class`` are passed to the + ``sync_session_class`` callable directly to instantiate a new + :class:`_orm.Session`. Refer to :meth:`_orm.Session.__init__` for + parameter documentation. + + :param sync_session_class: + A :class:`_orm.Session` subclass or other callable which will be used + to construct the :class:`_orm.Session` which will be proxied. This + parameter may be used to provide custom :class:`_orm.Session` + subclasses. Defaults to the + :attr:`_asyncio.AsyncSession.sync_session_class` class-level + attribute. + + .. versionadded:: 1.4.24 + + """ + sync_bind = sync_binds = None + + if bind: + self.bind = bind + sync_bind = engine._get_sync_engine_or_connection(bind) + + if binds: + self.binds = binds + sync_binds = { + key: engine._get_sync_engine_or_connection(b) + for key, b in binds.items() + } + + if sync_session_class: + self.sync_session_class = sync_session_class + + self.sync_session = self._proxied = self._assign_proxied( + self.sync_session_class(bind=sync_bind, binds=sync_binds, **kw) + ) + + sync_session_class: Type[Session] = Session + """The class or callable that provides the + underlying :class:`_orm.Session` instance for a particular + :class:`_asyncio.AsyncSession`. + + At the class level, this attribute is the default value for the + :paramref:`_asyncio.AsyncSession.sync_session_class` parameter. Custom + subclasses of :class:`_asyncio.AsyncSession` can override this. + + At the instance level, this attribute indicates the current class or + callable that was used to provide the :class:`_orm.Session` instance for + this :class:`_asyncio.AsyncSession` instance. + + .. versionadded:: 1.4.24 + + """ + + sync_session: Session + """Reference to the underlying :class:`_orm.Session` this + :class:`_asyncio.AsyncSession` proxies requests towards. + + This instance can be used as an event target. + + .. seealso:: + + :ref:`asyncio_events` + + """ + + @classmethod + def _no_async_engine_events(cls) -> NoReturn: + raise NotImplementedError( + "asynchronous events are not implemented at this time. Apply " + "synchronous listeners to the AsyncSession.sync_session." + ) + + async def refresh( + self, + instance: object, + attribute_names: Optional[Iterable[str]] = None, + with_for_update: ForUpdateParameter = None, + ) -> None: + """Expire and refresh the attributes on the given instance. + + A query will be issued to the database and all attributes will be + refreshed with their current database value. + + This is the async version of the :meth:`_orm.Session.refresh` method. + See that method for a complete description of all options. + + .. seealso:: + + :meth:`_orm.Session.refresh` - main documentation for refresh + + """ + + await greenlet_spawn( + self.sync_session.refresh, + instance, + attribute_names=attribute_names, + with_for_update=with_for_update, + ) + + async def run_sync( + self, + fn: Callable[Concatenate[Session, _P], _T], + *arg: _P.args, + **kw: _P.kwargs, + ) -> _T: + """Invoke the given synchronous (i.e. not async) callable, + passing a synchronous-style :class:`_orm.Session` as the first + argument. + + This method allows traditional synchronous SQLAlchemy functions to + run within the context of an asyncio application. + + E.g.:: + + def some_business_method(session: Session, param: str) -> str: + '''A synchronous function that does not require awaiting + + :param session: a SQLAlchemy Session, used synchronously + + :return: an optional return value is supported + + ''' + session.add(MyObject(param=param)) + session.flush() + return "success" + + + async def do_something_async(async_engine: AsyncEngine) -> None: + '''an async function that uses awaiting''' + + with AsyncSession(async_engine) as async_session: + # run some_business_method() with a sync-style + # Session, proxied into an awaitable + return_code = await async_session.run_sync(some_business_method, param="param1") + print(return_code) + + This method maintains the asyncio event loop all the way through + to the database connection by running the given callable in a + specially instrumented greenlet. + + .. tip:: + + The provided callable is invoked inline within the asyncio event + loop, and will block on traditional IO calls. IO within this + callable should only call into SQLAlchemy's asyncio database + APIs which will be properly adapted to the greenlet context. + + .. seealso:: + + :class:`.AsyncAttrs` - a mixin for ORM mapped classes that provides + a similar feature more succinctly on a per-attribute basis + + :meth:`.AsyncConnection.run_sync` + + :ref:`session_run_sync` + """ # noqa: E501 + + return await greenlet_spawn( + fn, self.sync_session, *arg, _require_await=False, **kw + ) + + @overload + async def execute( + self, + statement: TypedReturnsRows[_T], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> Result[_T]: ... + + @overload + async def execute( + self, + statement: UpdateBase, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> CursorResult[Any]: ... + + @overload + async def execute( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + _parent_execute_state: Optional[Any] = None, + _add_event: Optional[Any] = None, + ) -> Result[Any]: ... + + async def execute( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Result[Any]: + """Execute a statement and return a buffered + :class:`_engine.Result` object. + + .. seealso:: + + :meth:`_orm.Session.execute` - main documentation for execute + + """ + + if execution_options: + execution_options = util.immutabledict(execution_options).union( + _EXECUTE_OPTIONS + ) + else: + execution_options = _EXECUTE_OPTIONS + + result = await greenlet_spawn( + self.sync_session.execute, + statement, + params=params, + execution_options=execution_options, + bind_arguments=bind_arguments, + **kw, + ) + return await _ensure_sync_result(result, self.execute) + + @overload + async def scalar( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Optional[_T]: ... + + @overload + async def scalar( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Any: ... + + async def scalar( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> Any: + """Execute a statement and return a scalar result. + + .. seealso:: + + :meth:`_orm.Session.scalar` - main documentation for scalar + + """ + + if execution_options: + execution_options = util.immutabledict(execution_options).union( + _EXECUTE_OPTIONS + ) + else: + execution_options = _EXECUTE_OPTIONS + + return await greenlet_spawn( + self.sync_session.scalar, + statement, + params=params, + execution_options=execution_options, + bind_arguments=bind_arguments, + **kw, + ) + + @overload + async def scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[_T]: ... + + @overload + async def scalars( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[Any]: ... + + async def scalars( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> ScalarResult[Any]: + """Execute a statement and return scalar results. + + :return: a :class:`_result.ScalarResult` object + + .. versionadded:: 1.4.24 Added :meth:`_asyncio.AsyncSession.scalars` + + .. versionadded:: 1.4.26 Added + :meth:`_asyncio.async_scoped_session.scalars` + + .. seealso:: + + :meth:`_orm.Session.scalars` - main documentation for scalars + + :meth:`_asyncio.AsyncSession.stream_scalars` - streaming version + + """ + + result = await self.execute( + statement, + params=params, + execution_options=execution_options, + bind_arguments=bind_arguments, + **kw, + ) + return result.scalars() + + async def get( + self, + entity: _EntityBindKey[_O], + ident: _PKIdentityArgument, + *, + options: Optional[Sequence[ORMOption]] = None, + populate_existing: bool = False, + with_for_update: ForUpdateParameter = None, + identity_token: Optional[Any] = None, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + ) -> Union[_O, None]: + """Return an instance based on the given primary key identifier, + or ``None`` if not found. + + .. seealso:: + + :meth:`_orm.Session.get` - main documentation for get + + + """ + + return await greenlet_spawn( + cast("Callable[..., _O]", self.sync_session.get), + entity, + ident, + options=options, + populate_existing=populate_existing, + with_for_update=with_for_update, + identity_token=identity_token, + execution_options=execution_options, + ) + + async def get_one( + self, + entity: _EntityBindKey[_O], + ident: _PKIdentityArgument, + *, + options: Optional[Sequence[ORMOption]] = None, + populate_existing: bool = False, + with_for_update: ForUpdateParameter = None, + identity_token: Optional[Any] = None, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + ) -> _O: + """Return an instance based on the given primary key identifier, + or raise an exception if not found. + + Raises ``sqlalchemy.orm.exc.NoResultFound`` if the query selects + no rows. + + ..versionadded: 2.0.22 + + .. seealso:: + + :meth:`_orm.Session.get_one` - main documentation for get_one + + """ + + return await greenlet_spawn( + cast("Callable[..., _O]", self.sync_session.get_one), + entity, + ident, + options=options, + populate_existing=populate_existing, + with_for_update=with_for_update, + identity_token=identity_token, + execution_options=execution_options, + ) + + @overload + async def stream( + self, + statement: TypedReturnsRows[_T], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncResult[_T]: ... + + @overload + async def stream( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncResult[Any]: ... + + async def stream( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncResult[Any]: + """Execute a statement and return a streaming + :class:`_asyncio.AsyncResult` object. + + """ + + if execution_options: + execution_options = util.immutabledict(execution_options).union( + _STREAM_OPTIONS + ) + else: + execution_options = _STREAM_OPTIONS + + result = await greenlet_spawn( + self.sync_session.execute, + statement, + params=params, + execution_options=execution_options, + bind_arguments=bind_arguments, + **kw, + ) + return AsyncResult(result) + + @overload + async def stream_scalars( + self, + statement: TypedReturnsRows[Tuple[_T]], + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncScalarResult[_T]: ... + + @overload + async def stream_scalars( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncScalarResult[Any]: ... + + async def stream_scalars( + self, + statement: Executable, + params: Optional[_CoreAnyExecuteParams] = None, + *, + execution_options: OrmExecuteOptionsParameter = util.EMPTY_DICT, + bind_arguments: Optional[_BindArguments] = None, + **kw: Any, + ) -> AsyncScalarResult[Any]: + """Execute a statement and return a stream of scalar results. + + :return: an :class:`_asyncio.AsyncScalarResult` object + + .. versionadded:: 1.4.24 + + .. seealso:: + + :meth:`_orm.Session.scalars` - main documentation for scalars + + :meth:`_asyncio.AsyncSession.scalars` - non streaming version + + """ + + result = await self.stream( + statement, + params=params, + execution_options=execution_options, + bind_arguments=bind_arguments, + **kw, + ) + return result.scalars() + + async def delete(self, instance: object) -> None: + """Mark an instance as deleted. + + The database delete operation occurs upon ``flush()``. + + As this operation may need to cascade along unloaded relationships, + it is awaitable to allow for those queries to take place. + + .. seealso:: + + :meth:`_orm.Session.delete` - main documentation for delete + + """ + await greenlet_spawn(self.sync_session.delete, instance) + + async def merge( + self, + instance: _O, + *, + load: bool = True, + options: Optional[Sequence[ORMOption]] = None, + ) -> _O: + """Copy the state of a given instance into a corresponding instance + within this :class:`_asyncio.AsyncSession`. + + .. seealso:: + + :meth:`_orm.Session.merge` - main documentation for merge + + """ + return await greenlet_spawn( + self.sync_session.merge, instance, load=load, options=options + ) + + async def flush(self, objects: Optional[Sequence[Any]] = None) -> None: + """Flush all the object changes to the database. + + .. seealso:: + + :meth:`_orm.Session.flush` - main documentation for flush + + """ + await greenlet_spawn(self.sync_session.flush, objects=objects) + + def get_transaction(self) -> Optional[AsyncSessionTransaction]: + """Return the current root transaction in progress, if any. + + :return: an :class:`_asyncio.AsyncSessionTransaction` object, or + ``None``. + + .. versionadded:: 1.4.18 + + """ + trans = self.sync_session.get_transaction() + if trans is not None: + return AsyncSessionTransaction._retrieve_proxy_for_target(trans) + else: + return None + + def get_nested_transaction(self) -> Optional[AsyncSessionTransaction]: + """Return the current nested transaction in progress, if any. + + :return: an :class:`_asyncio.AsyncSessionTransaction` object, or + ``None``. + + .. versionadded:: 1.4.18 + + """ + + trans = self.sync_session.get_nested_transaction() + if trans is not None: + return AsyncSessionTransaction._retrieve_proxy_for_target(trans) + else: + return None + + def get_bind( + self, + mapper: Optional[_EntityBindKey[_O]] = None, + clause: Optional[ClauseElement] = None, + bind: Optional[_SessionBind] = None, + **kw: Any, + ) -> Union[Engine, Connection]: + """Return a "bind" to which the synchronous proxied :class:`_orm.Session` + is bound. + + Unlike the :meth:`_orm.Session.get_bind` method, this method is + currently **not** used by this :class:`.AsyncSession` in any way + in order to resolve engines for requests. + + .. note:: + + This method proxies directly to the :meth:`_orm.Session.get_bind` + method, however is currently **not** useful as an override target, + in contrast to that of the :meth:`_orm.Session.get_bind` method. + The example below illustrates how to implement custom + :meth:`_orm.Session.get_bind` schemes that work with + :class:`.AsyncSession` and :class:`.AsyncEngine`. + + The pattern introduced at :ref:`session_custom_partitioning` + illustrates how to apply a custom bind-lookup scheme to a + :class:`_orm.Session` given a set of :class:`_engine.Engine` objects. + To apply a corresponding :meth:`_orm.Session.get_bind` implementation + for use with a :class:`.AsyncSession` and :class:`.AsyncEngine` + objects, continue to subclass :class:`_orm.Session` and apply it to + :class:`.AsyncSession` using + :paramref:`.AsyncSession.sync_session_class`. The inner method must + continue to return :class:`_engine.Engine` instances, which can be + acquired from a :class:`_asyncio.AsyncEngine` using the + :attr:`_asyncio.AsyncEngine.sync_engine` attribute:: + + # using example from "Custom Vertical Partitioning" + + + import random + + from sqlalchemy.ext.asyncio import AsyncSession + from sqlalchemy.ext.asyncio import create_async_engine + from sqlalchemy.ext.asyncio import async_sessionmaker + from sqlalchemy.orm import Session + + # construct async engines w/ async drivers + engines = { + 'leader':create_async_engine("sqlite+aiosqlite:///leader.db"), + 'other':create_async_engine("sqlite+aiosqlite:///other.db"), + 'follower1':create_async_engine("sqlite+aiosqlite:///follower1.db"), + 'follower2':create_async_engine("sqlite+aiosqlite:///follower2.db"), + } + + class RoutingSession(Session): + def get_bind(self, mapper=None, clause=None, **kw): + # within get_bind(), return sync engines + if mapper and issubclass(mapper.class_, MyOtherClass): + return engines['other'].sync_engine + elif self._flushing or isinstance(clause, (Update, Delete)): + return engines['leader'].sync_engine + else: + return engines[ + random.choice(['follower1','follower2']) + ].sync_engine + + # apply to AsyncSession using sync_session_class + AsyncSessionMaker = async_sessionmaker( + sync_session_class=RoutingSession + ) + + The :meth:`_orm.Session.get_bind` method is called in a non-asyncio, + implicitly non-blocking context in the same manner as ORM event hooks + and functions that are invoked via :meth:`.AsyncSession.run_sync`, so + routines that wish to run SQL commands inside of + :meth:`_orm.Session.get_bind` can continue to do so using + blocking-style code, which will be translated to implicitly async calls + at the point of invoking IO on the database drivers. + + """ # noqa: E501 + + return self.sync_session.get_bind( + mapper=mapper, clause=clause, bind=bind, **kw + ) + + async def connection( + self, + bind_arguments: Optional[_BindArguments] = None, + execution_options: Optional[CoreExecuteOptionsParameter] = None, + **kw: Any, + ) -> AsyncConnection: + r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to + this :class:`.Session` object's transactional state. + + This method may also be used to establish execution options for the + database connection used by the current transaction. + + .. versionadded:: 1.4.24 Added \**kw arguments which are passed + through to the underlying :meth:`_orm.Session.connection` method. + + .. seealso:: + + :meth:`_orm.Session.connection` - main documentation for + "connection" + + """ + + sync_connection = await greenlet_spawn( + self.sync_session.connection, + bind_arguments=bind_arguments, + execution_options=execution_options, + **kw, + ) + return engine.AsyncConnection._retrieve_proxy_for_target( + sync_connection + ) + + def begin(self) -> AsyncSessionTransaction: + """Return an :class:`_asyncio.AsyncSessionTransaction` object. + + The underlying :class:`_orm.Session` will perform the + "begin" action when the :class:`_asyncio.AsyncSessionTransaction` + object is entered:: + + async with async_session.begin(): + # .. ORM transaction is begun + + Note that database IO will not normally occur when the session-level + transaction is begun, as database transactions begin on an + on-demand basis. However, the begin block is async to accommodate + for a :meth:`_orm.SessionEvents.after_transaction_create` + event hook that may perform IO. + + For a general description of ORM begin, see + :meth:`_orm.Session.begin`. + + """ + + return AsyncSessionTransaction(self) + + def begin_nested(self) -> AsyncSessionTransaction: + """Return an :class:`_asyncio.AsyncSessionTransaction` object + which will begin a "nested" transaction, e.g. SAVEPOINT. + + Behavior is the same as that of :meth:`_asyncio.AsyncSession.begin`. + + For a general description of ORM begin nested, see + :meth:`_orm.Session.begin_nested`. + + .. seealso:: + + :ref:`aiosqlite_serializable` - special workarounds required + with the SQLite asyncio driver in order for SAVEPOINT to work + correctly. + + """ + + return AsyncSessionTransaction(self, nested=True) + + async def rollback(self) -> None: + """Rollback the current transaction in progress. + + .. seealso:: + + :meth:`_orm.Session.rollback` - main documentation for + "rollback" + """ + await greenlet_spawn(self.sync_session.rollback) + + async def commit(self) -> None: + """Commit the current transaction in progress. + + .. seealso:: + + :meth:`_orm.Session.commit` - main documentation for + "commit" + """ + await greenlet_spawn(self.sync_session.commit) + + async def close(self) -> None: + """Close out the transactional resources and ORM objects used by this + :class:`_asyncio.AsyncSession`. + + .. seealso:: + + :meth:`_orm.Session.close` - main documentation for + "close" + + :ref:`session_closing` - detail on the semantics of + :meth:`_asyncio.AsyncSession.close` and + :meth:`_asyncio.AsyncSession.reset`. + + """ + await greenlet_spawn(self.sync_session.close) + + async def reset(self) -> None: + """Close out the transactional resources and ORM objects used by this + :class:`_orm.Session`, resetting the session to its initial state. + + .. versionadded:: 2.0.22 + + .. seealso:: + + :meth:`_orm.Session.reset` - main documentation for + "reset" + + :ref:`session_closing` - detail on the semantics of + :meth:`_asyncio.AsyncSession.close` and + :meth:`_asyncio.AsyncSession.reset`. + + """ + await greenlet_spawn(self.sync_session.reset) + + async def aclose(self) -> None: + """A synonym for :meth:`_asyncio.AsyncSession.close`. + + The :meth:`_asyncio.AsyncSession.aclose` name is specifically + to support the Python standard library ``@contextlib.aclosing`` + context manager function. + + .. versionadded:: 2.0.20 + + """ + await self.close() + + async def invalidate(self) -> None: + """Close this Session, using connection invalidation. + + For a complete description, see :meth:`_orm.Session.invalidate`. + """ + await greenlet_spawn(self.sync_session.invalidate) + + @classmethod + @util.deprecated( + "2.0", + "The :meth:`.AsyncSession.close_all` method is deprecated and will be " + "removed in a future release. Please refer to " + ":func:`_asyncio.close_all_sessions`.", + ) + async def close_all(cls) -> None: + """Close all :class:`_asyncio.AsyncSession` sessions.""" + await close_all_sessions() + + async def __aenter__(self: _AS) -> _AS: + return self + + async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None: + task = asyncio.create_task(self.close()) + await asyncio.shield(task) + + def _maker_context_manager(self: _AS) -> _AsyncSessionContextManager[_AS]: + return _AsyncSessionContextManager(self) + + # START PROXY METHODS AsyncSession + + # code within this block is **programmatically, + # statically generated** by tools/generate_proxy_methods.py + + def __contains__(self, instance: object) -> bool: + r"""Return True if the instance is associated with this session. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + The instance may be pending or persistent within the Session for a + result of True. + + + """ # noqa: E501 + + return self._proxied.__contains__(instance) + + def __iter__(self) -> Iterator[object]: + r"""Iterate over all pending or persistent instances within this + Session. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + + """ # noqa: E501 + + return self._proxied.__iter__() + + def add(self, instance: object, _warn: bool = True) -> None: + r"""Place an object into this :class:`_orm.Session`. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + Objects that are in the :term:`transient` state when passed to the + :meth:`_orm.Session.add` method will move to the + :term:`pending` state, until the next flush, at which point they + will move to the :term:`persistent` state. + + Objects that are in the :term:`detached` state when passed to the + :meth:`_orm.Session.add` method will move to the :term:`persistent` + state directly. + + If the transaction used by the :class:`_orm.Session` is rolled back, + objects which were transient when they were passed to + :meth:`_orm.Session.add` will be moved back to the + :term:`transient` state, and will no longer be present within this + :class:`_orm.Session`. + + .. seealso:: + + :meth:`_orm.Session.add_all` + + :ref:`session_adding` - at :ref:`session_basics` + + + """ # noqa: E501 + + return self._proxied.add(instance, _warn=_warn) + + def add_all(self, instances: Iterable[object]) -> None: + r"""Add the given collection of instances to this :class:`_orm.Session`. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + See the documentation for :meth:`_orm.Session.add` for a general + behavioral description. + + .. seealso:: + + :meth:`_orm.Session.add` + + :ref:`session_adding` - at :ref:`session_basics` + + + """ # noqa: E501 + + return self._proxied.add_all(instances) + + def expire( + self, instance: object, attribute_names: Optional[Iterable[str]] = None + ) -> None: + r"""Expire the attributes on an instance. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + Marks the attributes of an instance as out of date. When an expired + attribute is next accessed, a query will be issued to the + :class:`.Session` object's current transactional context in order to + load all expired attributes for the given instance. Note that + a highly isolated transaction will return the same values as were + previously read in that same transaction, regardless of changes + in database state outside of that transaction. + + To expire all objects in the :class:`.Session` simultaneously, + use :meth:`Session.expire_all`. + + The :class:`.Session` object's default behavior is to + expire all state whenever the :meth:`Session.rollback` + or :meth:`Session.commit` methods are called, so that new + state can be loaded for the new transaction. For this reason, + calling :meth:`Session.expire` only makes sense for the specific + case that a non-ORM SQL statement was emitted in the current + transaction. + + :param instance: The instance to be refreshed. + :param attribute_names: optional list of string attribute names + indicating a subset of attributes to be expired. + + .. seealso:: + + :ref:`session_expire` - introductory material + + :meth:`.Session.expire` + + :meth:`.Session.refresh` + + :meth:`_orm.Query.populate_existing` + + + """ # noqa: E501 + + return self._proxied.expire(instance, attribute_names=attribute_names) + + def expire_all(self) -> None: + r"""Expires all persistent instances within this Session. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + When any attributes on a persistent instance is next accessed, + a query will be issued using the + :class:`.Session` object's current transactional context in order to + load all expired attributes for the given instance. Note that + a highly isolated transaction will return the same values as were + previously read in that same transaction, regardless of changes + in database state outside of that transaction. + + To expire individual objects and individual attributes + on those objects, use :meth:`Session.expire`. + + The :class:`.Session` object's default behavior is to + expire all state whenever the :meth:`Session.rollback` + or :meth:`Session.commit` methods are called, so that new + state can be loaded for the new transaction. For this reason, + calling :meth:`Session.expire_all` is not usually needed, + assuming the transaction is isolated. + + .. seealso:: + + :ref:`session_expire` - introductory material + + :meth:`.Session.expire` + + :meth:`.Session.refresh` + + :meth:`_orm.Query.populate_existing` + + + """ # noqa: E501 + + return self._proxied.expire_all() + + def expunge(self, instance: object) -> None: + r"""Remove the `instance` from this ``Session``. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + This will free all internal references to the instance. Cascading + will be applied according to the *expunge* cascade rule. + + + """ # noqa: E501 + + return self._proxied.expunge(instance) + + def expunge_all(self) -> None: + r"""Remove all object instances from this ``Session``. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + This is equivalent to calling ``expunge(obj)`` on all objects in this + ``Session``. + + + """ # noqa: E501 + + return self._proxied.expunge_all() + + def is_modified( + self, instance: object, include_collections: bool = True + ) -> bool: + r"""Return ``True`` if the given instance has locally + modified attributes. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + This method retrieves the history for each instrumented + attribute on the instance and performs a comparison of the current + value to its previously committed value, if any. + + It is in effect a more expensive and accurate + version of checking for the given instance in the + :attr:`.Session.dirty` collection; a full test for + each attribute's net "dirty" status is performed. + + E.g.:: + + return session.is_modified(someobject) + + A few caveats to this method apply: + + * Instances present in the :attr:`.Session.dirty` collection may + report ``False`` when tested with this method. This is because + the object may have received change events via attribute mutation, + thus placing it in :attr:`.Session.dirty`, but ultimately the state + is the same as that loaded from the database, resulting in no net + change here. + * Scalar attributes may not have recorded the previously set + value when a new value was applied, if the attribute was not loaded, + or was expired, at the time the new value was received - in these + cases, the attribute is assumed to have a change, even if there is + ultimately no net change against its database value. SQLAlchemy in + most cases does not need the "old" value when a set event occurs, so + it skips the expense of a SQL call if the old value isn't present, + based on the assumption that an UPDATE of the scalar value is + usually needed, and in those few cases where it isn't, is less + expensive on average than issuing a defensive SELECT. + + The "old" value is fetched unconditionally upon set only if the + attribute container has the ``active_history`` flag set to ``True``. + This flag is set typically for primary key attributes and scalar + object references that are not a simple many-to-one. To set this + flag for any arbitrary mapped column, use the ``active_history`` + argument with :func:`.column_property`. + + :param instance: mapped instance to be tested for pending changes. + :param include_collections: Indicates if multivalued collections + should be included in the operation. Setting this to ``False`` is a + way to detect only local-column based properties (i.e. scalar columns + or many-to-one foreign keys) that would result in an UPDATE for this + instance upon flush. + + + """ # noqa: E501 + + return self._proxied.is_modified( + instance, include_collections=include_collections + ) + + def in_transaction(self) -> bool: + r"""Return True if this :class:`_orm.Session` has begun a transaction. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + .. versionadded:: 1.4 + + .. seealso:: + + :attr:`_orm.Session.is_active` + + + + """ # noqa: E501 + + return self._proxied.in_transaction() + + def in_nested_transaction(self) -> bool: + r"""Return True if this :class:`_orm.Session` has begun a nested + transaction, e.g. SAVEPOINT. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + .. versionadded:: 1.4 + + + """ # noqa: E501 + + return self._proxied.in_nested_transaction() + + @property + def dirty(self) -> Any: + r"""The set of all persistent instances considered dirty. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class + on behalf of the :class:`_asyncio.AsyncSession` class. + + E.g.:: + + some_mapped_object in session.dirty + + Instances are considered dirty when they were modified but not + deleted. + + Note that this 'dirty' calculation is 'optimistic'; most + attribute-setting or collection modification operations will + mark an instance as 'dirty' and place it in this set, even if + there is no net change to the attribute's value. At flush + time, the value of each attribute is compared to its + previously saved value, and if there's no net change, no SQL + operation will occur (this is a more expensive operation so + it's only done at flush time). + + To check if an instance has actionable net changes to its + attributes, use the :meth:`.Session.is_modified` method. + + + """ # noqa: E501 + + return self._proxied.dirty + + @property + def deleted(self) -> Any: + r"""The set of all instances marked as 'deleted' within this ``Session`` + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class + on behalf of the :class:`_asyncio.AsyncSession` class. + + """ # noqa: E501 + + return self._proxied.deleted + + @property + def new(self) -> Any: + r"""The set of all instances marked as 'new' within this ``Session``. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class + on behalf of the :class:`_asyncio.AsyncSession` class. + + """ # noqa: E501 + + return self._proxied.new + + @property + def identity_map(self) -> IdentityMap: + r"""Proxy for the :attr:`_orm.Session.identity_map` attribute + on behalf of the :class:`_asyncio.AsyncSession` class. + + """ # noqa: E501 + + return self._proxied.identity_map + + @identity_map.setter + def identity_map(self, attr: IdentityMap) -> None: + self._proxied.identity_map = attr + + @property + def is_active(self) -> Any: + r"""True if this :class:`.Session` not in "partial rollback" state. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class + on behalf of the :class:`_asyncio.AsyncSession` class. + + .. versionchanged:: 1.4 The :class:`_orm.Session` no longer begins + a new transaction immediately, so this attribute will be False + when the :class:`_orm.Session` is first instantiated. + + "partial rollback" state typically indicates that the flush process + of the :class:`_orm.Session` has failed, and that the + :meth:`_orm.Session.rollback` method must be emitted in order to + fully roll back the transaction. + + If this :class:`_orm.Session` is not in a transaction at all, the + :class:`_orm.Session` will autobegin when it is first used, so in this + case :attr:`_orm.Session.is_active` will return True. + + Otherwise, if this :class:`_orm.Session` is within a transaction, + and that transaction has not been rolled back internally, the + :attr:`_orm.Session.is_active` will also return True. + + .. seealso:: + + :ref:`faq_session_rollback` + + :meth:`_orm.Session.in_transaction` + + + """ # noqa: E501 + + return self._proxied.is_active + + @property + def autoflush(self) -> bool: + r"""Proxy for the :attr:`_orm.Session.autoflush` attribute + on behalf of the :class:`_asyncio.AsyncSession` class. + + """ # noqa: E501 + + return self._proxied.autoflush + + @autoflush.setter + def autoflush(self, attr: bool) -> None: + self._proxied.autoflush = attr + + @property + def no_autoflush(self) -> Any: + r"""Return a context manager that disables autoflush. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class + on behalf of the :class:`_asyncio.AsyncSession` class. + + e.g.:: + + with session.no_autoflush: + + some_object = SomeClass() + session.add(some_object) + # won't autoflush + some_object.related_thing = session.query(SomeRelated).first() + + Operations that proceed within the ``with:`` block + will not be subject to flushes occurring upon query + access. This is useful when initializing a series + of objects which involve existing database queries, + where the uncompleted object should not yet be flushed. + + + """ # noqa: E501 + + return self._proxied.no_autoflush + + @property + def info(self) -> Any: + r"""A user-modifiable dictionary. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class + on behalf of the :class:`_asyncio.AsyncSession` class. + + The initial value of this dictionary can be populated using the + ``info`` argument to the :class:`.Session` constructor or + :class:`.sessionmaker` constructor or factory methods. The dictionary + here is always local to this :class:`.Session` and can be modified + independently of all other :class:`.Session` objects. + + + """ # noqa: E501 + + return self._proxied.info + + @classmethod + def object_session(cls, instance: object) -> Optional[Session]: + r"""Return the :class:`.Session` to which an object belongs. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + This is an alias of :func:`.object_session`. + + + """ # noqa: E501 + + return Session.object_session(instance) + + @classmethod + def identity_key( + cls, + class_: Optional[Type[Any]] = None, + ident: Union[Any, Tuple[Any, ...]] = None, + *, + instance: Optional[Any] = None, + row: Optional[Union[Row[Any], RowMapping]] = None, + identity_token: Optional[Any] = None, + ) -> _IdentityKeyType[Any]: + r"""Return an identity key. + + .. container:: class_bases + + Proxied for the :class:`_orm.Session` class on + behalf of the :class:`_asyncio.AsyncSession` class. + + This is an alias of :func:`.util.identity_key`. + + + """ # noqa: E501 + + return Session.identity_key( + class_=class_, + ident=ident, + instance=instance, + row=row, + identity_token=identity_token, + ) + + # END PROXY METHODS AsyncSession + + +_AS = TypeVar("_AS", bound="AsyncSession") + + +class async_sessionmaker(Generic[_AS]): + """A configurable :class:`.AsyncSession` factory. + + The :class:`.async_sessionmaker` factory works in the same way as the + :class:`.sessionmaker` factory, to generate new :class:`.AsyncSession` + objects when called, creating them given + the configurational arguments established here. + + e.g.:: + + from sqlalchemy.ext.asyncio import create_async_engine + from sqlalchemy.ext.asyncio import AsyncSession + from sqlalchemy.ext.asyncio import async_sessionmaker + + async def run_some_sql(async_session: async_sessionmaker[AsyncSession]) -> None: + async with async_session() as session: + session.add(SomeObject(data="object")) + session.add(SomeOtherObject(name="other object")) + await session.commit() + + async def main() -> None: + # an AsyncEngine, which the AsyncSession will use for connection + # resources + engine = create_async_engine('postgresql+asyncpg://scott:tiger@localhost/') + + # create a reusable factory for new AsyncSession instances + async_session = async_sessionmaker(engine) + + await run_some_sql(async_session) + + await engine.dispose() + + The :class:`.async_sessionmaker` is useful so that different parts + of a program can create new :class:`.AsyncSession` objects with a + fixed configuration established up front. Note that :class:`.AsyncSession` + objects may also be instantiated directly when not using + :class:`.async_sessionmaker`. + + .. versionadded:: 2.0 :class:`.async_sessionmaker` provides a + :class:`.sessionmaker` class that's dedicated to the + :class:`.AsyncSession` object, including pep-484 typing support. + + .. seealso:: + + :ref:`asyncio_orm` - shows example use + + :class:`.sessionmaker` - general overview of the + :class:`.sessionmaker` architecture + + + :ref:`session_getting` - introductory text on creating + sessions using :class:`.sessionmaker`. + + """ # noqa E501 + + class_: Type[_AS] + + @overload + def __init__( + self, + bind: Optional[_AsyncSessionBind] = ..., + *, + class_: Type[_AS], + autoflush: bool = ..., + expire_on_commit: bool = ..., + info: Optional[_InfoType] = ..., + **kw: Any, + ): ... + + @overload + def __init__( + self: "async_sessionmaker[AsyncSession]", + bind: Optional[_AsyncSessionBind] = ..., + *, + autoflush: bool = ..., + expire_on_commit: bool = ..., + info: Optional[_InfoType] = ..., + **kw: Any, + ): ... + + def __init__( + self, + bind: Optional[_AsyncSessionBind] = None, + *, + class_: Type[_AS] = AsyncSession, # type: ignore + autoflush: bool = True, + expire_on_commit: bool = True, + info: Optional[_InfoType] = None, + **kw: Any, + ): + r"""Construct a new :class:`.async_sessionmaker`. + + All arguments here except for ``class_`` correspond to arguments + accepted by :class:`.Session` directly. See the + :meth:`.AsyncSession.__init__` docstring for more details on + parameters. + + + """ + kw["bind"] = bind + kw["autoflush"] = autoflush + kw["expire_on_commit"] = expire_on_commit + if info is not None: + kw["info"] = info + self.kw = kw + self.class_ = class_ + + def begin(self) -> _AsyncSessionContextManager[_AS]: + """Produce a context manager that both provides a new + :class:`_orm.AsyncSession` as well as a transaction that commits. + + + e.g.:: + + async def main(): + Session = async_sessionmaker(some_engine) + + async with Session.begin() as session: + session.add(some_object) + + # commits transaction, closes session + + + """ + + session = self() + return session._maker_context_manager() + + def __call__(self, **local_kw: Any) -> _AS: + """Produce a new :class:`.AsyncSession` object using the configuration + established in this :class:`.async_sessionmaker`. + + In Python, the ``__call__`` method is invoked on an object when + it is "called" in the same way as a function:: + + AsyncSession = async_sessionmaker(async_engine, expire_on_commit=False) + session = AsyncSession() # invokes sessionmaker.__call__() + + """ # noqa E501 + for k, v in self.kw.items(): + if k == "info" and "info" in local_kw: + d = v.copy() + d.update(local_kw["info"]) + local_kw["info"] = d + else: + local_kw.setdefault(k, v) + return self.class_(**local_kw) + + def configure(self, **new_kw: Any) -> None: + """(Re)configure the arguments for this async_sessionmaker. + + e.g.:: + + AsyncSession = async_sessionmaker(some_engine) + + AsyncSession.configure(bind=create_async_engine('sqlite+aiosqlite://')) + """ # noqa E501 + + self.kw.update(new_kw) + + def __repr__(self) -> str: + return "%s(class_=%r, %s)" % ( + self.__class__.__name__, + self.class_.__name__, + ", ".join("%s=%r" % (k, v) for k, v in self.kw.items()), + ) + + +class _AsyncSessionContextManager(Generic[_AS]): + __slots__ = ("async_session", "trans") + + async_session: _AS + trans: AsyncSessionTransaction + + def __init__(self, async_session: _AS): + self.async_session = async_session + + async def __aenter__(self) -> _AS: + self.trans = self.async_session.begin() + await self.trans.__aenter__() + return self.async_session + + async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None: + async def go() -> None: + await self.trans.__aexit__(type_, value, traceback) + await self.async_session.__aexit__(type_, value, traceback) + + task = asyncio.create_task(go()) + await asyncio.shield(task) + + +class AsyncSessionTransaction( + ReversibleProxy[SessionTransaction], + StartableContext["AsyncSessionTransaction"], +): + """A wrapper for the ORM :class:`_orm.SessionTransaction` object. + + This object is provided so that a transaction-holding object + for the :meth:`_asyncio.AsyncSession.begin` may be returned. + + The object supports both explicit calls to + :meth:`_asyncio.AsyncSessionTransaction.commit` and + :meth:`_asyncio.AsyncSessionTransaction.rollback`, as well as use as an + async context manager. + + + .. versionadded:: 1.4 + + """ + + __slots__ = ("session", "sync_transaction", "nested") + + session: AsyncSession + sync_transaction: Optional[SessionTransaction] + + def __init__(self, session: AsyncSession, nested: bool = False): + self.session = session + self.nested = nested + self.sync_transaction = None + + @property + def is_active(self) -> bool: + return ( + self._sync_transaction() is not None + and self._sync_transaction().is_active + ) + + def _sync_transaction(self) -> SessionTransaction: + if not self.sync_transaction: + self._raise_for_not_started() + return self.sync_transaction + + async def rollback(self) -> None: + """Roll back this :class:`_asyncio.AsyncTransaction`.""" + await greenlet_spawn(self._sync_transaction().rollback) + + async def commit(self) -> None: + """Commit this :class:`_asyncio.AsyncTransaction`.""" + + await greenlet_spawn(self._sync_transaction().commit) + + async def start( + self, is_ctxmanager: bool = False + ) -> AsyncSessionTransaction: + self.sync_transaction = self._assign_proxied( + await greenlet_spawn( + self.session.sync_session.begin_nested # type: ignore + if self.nested + else self.session.sync_session.begin + ) + ) + if is_ctxmanager: + self.sync_transaction.__enter__() + return self + + async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None: + await greenlet_spawn( + self._sync_transaction().__exit__, type_, value, traceback + ) + + +def async_object_session(instance: object) -> Optional[AsyncSession]: + """Return the :class:`_asyncio.AsyncSession` to which the given instance + belongs. + + This function makes use of the sync-API function + :class:`_orm.object_session` to retrieve the :class:`_orm.Session` which + refers to the given instance, and from there links it to the original + :class:`_asyncio.AsyncSession`. + + If the :class:`_asyncio.AsyncSession` has been garbage collected, the + return value is ``None``. + + This functionality is also available from the + :attr:`_orm.InstanceState.async_session` accessor. + + :param instance: an ORM mapped instance + :return: an :class:`_asyncio.AsyncSession` object, or ``None``. + + .. versionadded:: 1.4.18 + + """ + + session = object_session(instance) + if session is not None: + return async_session(session) + else: + return None + + +def async_session(session: Session) -> Optional[AsyncSession]: + """Return the :class:`_asyncio.AsyncSession` which is proxying the given + :class:`_orm.Session` object, if any. + + :param session: a :class:`_orm.Session` instance. + :return: a :class:`_asyncio.AsyncSession` instance, or ``None``. + + .. versionadded:: 1.4.18 + + """ + return AsyncSession._retrieve_proxy_for_target(session, regenerate=False) + + +async def close_all_sessions() -> None: + """Close all :class:`_asyncio.AsyncSession` sessions. + + .. versionadded:: 2.0.23 + + .. seealso:: + + :func:`.session.close_all_sessions` + + """ + await greenlet_spawn(_sync_close_all_sessions) + + +_instance_state._async_provider = async_session # type: ignore |