summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
commit6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
treeb1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py
parent4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py1466
1 files changed, 1466 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py
new file mode 100644
index 0000000..8fc8e96
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/engine.py
@@ -0,0 +1,1466 @@
+# ext/asyncio/engine.py
+# Copyright (C) 2020-2024 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+from __future__ import annotations
+
+import asyncio
+import contextlib
+from typing import Any
+from typing import AsyncIterator
+from typing import Callable
+from typing import Dict
+from typing import Generator
+from typing import NoReturn
+from typing import Optional
+from typing import overload
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+
+from . import exc as async_exc
+from .base import asyncstartablecontext
+from .base import GeneratorStartableContext
+from .base import ProxyComparable
+from .base import StartableContext
+from .result import _ensure_sync_result
+from .result import AsyncResult
+from .result import AsyncScalarResult
+from ... import exc
+from ... import inspection
+from ... import util
+from ...engine import Connection
+from ...engine import create_engine as _create_engine
+from ...engine import create_pool_from_url as _create_pool_from_url
+from ...engine import Engine
+from ...engine.base import NestedTransaction
+from ...engine.base import Transaction
+from ...exc import ArgumentError
+from ...util.concurrency import greenlet_spawn
+from ...util.typing import Concatenate
+from ...util.typing import ParamSpec
+
+if TYPE_CHECKING:
+ from ...engine.cursor import CursorResult
+ from ...engine.interfaces import _CoreAnyExecuteParams
+ from ...engine.interfaces import _CoreSingleExecuteParams
+ from ...engine.interfaces import _DBAPIAnyExecuteParams
+ from ...engine.interfaces import _ExecuteOptions
+ from ...engine.interfaces import CompiledCacheType
+ from ...engine.interfaces import CoreExecuteOptionsParameter
+ from ...engine.interfaces import Dialect
+ from ...engine.interfaces import IsolationLevel
+ from ...engine.interfaces import SchemaTranslateMapType
+ from ...engine.result import ScalarResult
+ from ...engine.url import URL
+ from ...pool import Pool
+ from ...pool import PoolProxiedConnection
+ from ...sql._typing import _InfoType
+ from ...sql.base import Executable
+ from ...sql.selectable import TypedReturnsRows
+
+_P = ParamSpec("_P")
+_T = TypeVar("_T", bound=Any)
+
+
+def create_async_engine(url: Union[str, URL], **kw: Any) -> AsyncEngine:
+ """Create a new async engine instance.
+
+ Arguments passed to :func:`_asyncio.create_async_engine` are mostly
+ identical to those passed to the :func:`_sa.create_engine` function.
+ The specified dialect must be an asyncio-compatible dialect
+ such as :ref:`dialect-postgresql-asyncpg`.
+
+ .. versionadded:: 1.4
+
+ :param async_creator: an async callable which returns a driver-level
+ asyncio connection. If given, the function should take no arguments,
+ and return a new asyncio connection from the underlying asyncio
+ database driver; the connection will be wrapped in the appropriate
+ structures to be used with the :class:`.AsyncEngine`. Note that the
+ parameters specified in the URL are not applied here, and the creator
+ function should use its own connection parameters.
+
+ This parameter is the asyncio equivalent of the
+ :paramref:`_sa.create_engine.creator` parameter of the
+ :func:`_sa.create_engine` function.
+
+ .. versionadded:: 2.0.16
+
+ """
+
+ if kw.get("server_side_cursors", False):
+ raise async_exc.AsyncMethodRequired(
+ "Can't set server_side_cursors for async engine globally; "
+ "use the connection.stream() method for an async "
+ "streaming result set"
+ )
+ kw["_is_async"] = True
+ async_creator = kw.pop("async_creator", None)
+ if async_creator:
+ if kw.get("creator", None):
+ raise ArgumentError(
+ "Can only specify one of 'async_creator' or 'creator', "
+ "not both."
+ )
+
+ def creator() -> Any:
+ # note that to send adapted arguments like
+ # prepared_statement_cache_size, user would use
+ # "creator" and emulate this form here
+ return sync_engine.dialect.dbapi.connect( # type: ignore
+ async_creator_fn=async_creator
+ )
+
+ kw["creator"] = creator
+ sync_engine = _create_engine(url, **kw)
+ return AsyncEngine(sync_engine)
+
+
+def async_engine_from_config(
+ configuration: Dict[str, Any], prefix: str = "sqlalchemy.", **kwargs: Any
+) -> AsyncEngine:
+ """Create a new AsyncEngine instance using a configuration dictionary.
+
+ This function is analogous to the :func:`_sa.engine_from_config` function
+ in SQLAlchemy Core, except that the requested dialect must be an
+ asyncio-compatible dialect such as :ref:`dialect-postgresql-asyncpg`.
+ The argument signature of the function is identical to that
+ of :func:`_sa.engine_from_config`.
+
+ .. versionadded:: 1.4.29
+
+ """
+ options = {
+ key[len(prefix) :]: value
+ for key, value in configuration.items()
+ if key.startswith(prefix)
+ }
+ options["_coerce_config"] = True
+ options.update(kwargs)
+ url = options.pop("url")
+ return create_async_engine(url, **options)
+
+
+def create_async_pool_from_url(url: Union[str, URL], **kwargs: Any) -> Pool:
+ """Create a new async engine instance.
+
+ Arguments passed to :func:`_asyncio.create_async_pool_from_url` are mostly
+ identical to those passed to the :func:`_sa.create_pool_from_url` function.
+ The specified dialect must be an asyncio-compatible dialect
+ such as :ref:`dialect-postgresql-asyncpg`.
+
+ .. versionadded:: 2.0.10
+
+ """
+ kwargs["_is_async"] = True
+ return _create_pool_from_url(url, **kwargs)
+
+
+class AsyncConnectable:
+ __slots__ = "_slots_dispatch", "__weakref__"
+
+ @classmethod
+ def _no_async_engine_events(cls) -> NoReturn:
+ raise NotImplementedError(
+ "asynchronous events are not implemented at this time. Apply "
+ "synchronous listeners to the AsyncEngine.sync_engine or "
+ "AsyncConnection.sync_connection attributes."
+ )
+
+
+@util.create_proxy_methods(
+ Connection,
+ ":class:`_engine.Connection`",
+ ":class:`_asyncio.AsyncConnection`",
+ classmethods=[],
+ methods=[],
+ attributes=[
+ "closed",
+ "invalidated",
+ "dialect",
+ "default_isolation_level",
+ ],
+)
+class AsyncConnection(
+ ProxyComparable[Connection],
+ StartableContext["AsyncConnection"],
+ AsyncConnectable,
+):
+ """An asyncio proxy for a :class:`_engine.Connection`.
+
+ :class:`_asyncio.AsyncConnection` is acquired using the
+ :meth:`_asyncio.AsyncEngine.connect`
+ method of :class:`_asyncio.AsyncEngine`::
+
+ from sqlalchemy.ext.asyncio import create_async_engine
+ engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")
+
+ async with engine.connect() as conn:
+ result = await conn.execute(select(table))
+
+ .. versionadded:: 1.4
+
+ """ # noqa
+
+ # AsyncConnection is a thin proxy; no state should be added here
+ # that is not retrievable from the "sync" engine / connection, e.g.
+ # current transaction, info, etc. It should be possible to
+ # create a new AsyncConnection that matches this one given only the
+ # "sync" elements.
+ __slots__ = (
+ "engine",
+ "sync_engine",
+ "sync_connection",
+ )
+
+ def __init__(
+ self,
+ async_engine: AsyncEngine,
+ sync_connection: Optional[Connection] = None,
+ ):
+ self.engine = async_engine
+ self.sync_engine = async_engine.sync_engine
+ self.sync_connection = self._assign_proxied(sync_connection)
+
+ sync_connection: Optional[Connection]
+ """Reference to the sync-style :class:`_engine.Connection` this
+ :class:`_asyncio.AsyncConnection` proxies requests towards.
+
+ This instance can be used as an event target.
+
+ .. seealso::
+
+ :ref:`asyncio_events`
+
+ """
+
+ sync_engine: Engine
+ """Reference to the sync-style :class:`_engine.Engine` this
+ :class:`_asyncio.AsyncConnection` is associated with via its underlying
+ :class:`_engine.Connection`.
+
+ This instance can be used as an event target.
+
+ .. seealso::
+
+ :ref:`asyncio_events`
+
+ """
+
+ @classmethod
+ def _regenerate_proxy_for_target(
+ cls, target: Connection
+ ) -> AsyncConnection:
+ return AsyncConnection(
+ AsyncEngine._retrieve_proxy_for_target(target.engine), target
+ )
+
+ async def start(
+ self, is_ctxmanager: bool = False # noqa: U100
+ ) -> AsyncConnection:
+ """Start this :class:`_asyncio.AsyncConnection` object's context
+ outside of using a Python ``with:`` block.
+
+ """
+ if self.sync_connection:
+ raise exc.InvalidRequestError("connection is already started")
+ self.sync_connection = self._assign_proxied(
+ await greenlet_spawn(self.sync_engine.connect)
+ )
+ return self
+
+ @property
+ def connection(self) -> NoReturn:
+ """Not implemented for async; call
+ :meth:`_asyncio.AsyncConnection.get_raw_connection`.
+ """
+ raise exc.InvalidRequestError(
+ "AsyncConnection.connection accessor is not implemented as the "
+ "attribute may need to reconnect on an invalidated connection. "
+ "Use the get_raw_connection() method."
+ )
+
+ async def get_raw_connection(self) -> PoolProxiedConnection:
+ """Return the pooled DBAPI-level connection in use by this
+ :class:`_asyncio.AsyncConnection`.
+
+ This is a SQLAlchemy connection-pool proxied connection
+ which then has the attribute
+ :attr:`_pool._ConnectionFairy.driver_connection` that refers to the
+ actual driver connection. Its
+ :attr:`_pool._ConnectionFairy.dbapi_connection` refers instead
+ to an :class:`_engine.AdaptedConnection` instance that
+ adapts the driver connection to the DBAPI protocol.
+
+ """
+
+ return await greenlet_spawn(getattr, self._proxied, "connection")
+
+ @util.ro_non_memoized_property
+ def info(self) -> _InfoType:
+ """Return the :attr:`_engine.Connection.info` dictionary of the
+ underlying :class:`_engine.Connection`.
+
+ This dictionary is freely writable for user-defined state to be
+ associated with the database connection.
+
+ This attribute is only available if the :class:`.AsyncConnection` is
+ currently connected. If the :attr:`.AsyncConnection.closed` attribute
+ is ``True``, then accessing this attribute will raise
+ :class:`.ResourceClosedError`.
+
+ .. versionadded:: 1.4.0b2
+
+ """
+ return self._proxied.info
+
+ @util.ro_non_memoized_property
+ def _proxied(self) -> Connection:
+ if not self.sync_connection:
+ self._raise_for_not_started()
+ return self.sync_connection
+
+ def begin(self) -> AsyncTransaction:
+ """Begin a transaction prior to autobegin occurring."""
+ assert self._proxied
+ return AsyncTransaction(self)
+
+ def begin_nested(self) -> AsyncTransaction:
+ """Begin a nested transaction and return a transaction handle."""
+ assert self._proxied
+ return AsyncTransaction(self, nested=True)
+
+ async def invalidate(
+ self, exception: Optional[BaseException] = None
+ ) -> None:
+ """Invalidate the underlying DBAPI connection associated with
+ this :class:`_engine.Connection`.
+
+ See the method :meth:`_engine.Connection.invalidate` for full
+ detail on this method.
+
+ """
+
+ return await greenlet_spawn(
+ self._proxied.invalidate, exception=exception
+ )
+
+ async def get_isolation_level(self) -> IsolationLevel:
+ return await greenlet_spawn(self._proxied.get_isolation_level)
+
+ def in_transaction(self) -> bool:
+ """Return True if a transaction is in progress."""
+
+ return self._proxied.in_transaction()
+
+ def in_nested_transaction(self) -> bool:
+ """Return True if a transaction is in progress.
+
+ .. versionadded:: 1.4.0b2
+
+ """
+ return self._proxied.in_nested_transaction()
+
+ def get_transaction(self) -> Optional[AsyncTransaction]:
+ """Return an :class:`.AsyncTransaction` representing the current
+ transaction, if any.
+
+ This makes use of the underlying synchronous connection's
+ :meth:`_engine.Connection.get_transaction` method to get the current
+ :class:`_engine.Transaction`, which is then proxied in a new
+ :class:`.AsyncTransaction` object.
+
+ .. versionadded:: 1.4.0b2
+
+ """
+
+ trans = self._proxied.get_transaction()
+ if trans is not None:
+ return AsyncTransaction._retrieve_proxy_for_target(trans)
+ else:
+ return None
+
+ def get_nested_transaction(self) -> Optional[AsyncTransaction]:
+ """Return an :class:`.AsyncTransaction` representing the current
+ nested (savepoint) transaction, if any.
+
+ This makes use of the underlying synchronous connection's
+ :meth:`_engine.Connection.get_nested_transaction` method to get the
+ current :class:`_engine.Transaction`, which is then proxied in a new
+ :class:`.AsyncTransaction` object.
+
+ .. versionadded:: 1.4.0b2
+
+ """
+
+ trans = self._proxied.get_nested_transaction()
+ if trans is not None:
+ return AsyncTransaction._retrieve_proxy_for_target(trans)
+ else:
+ return None
+
+ @overload
+ async def execution_options(
+ self,
+ *,
+ compiled_cache: Optional[CompiledCacheType] = ...,
+ logging_token: str = ...,
+ isolation_level: IsolationLevel = ...,
+ no_parameters: bool = False,
+ stream_results: bool = False,
+ max_row_buffer: int = ...,
+ yield_per: int = ...,
+ insertmanyvalues_page_size: int = ...,
+ schema_translate_map: Optional[SchemaTranslateMapType] = ...,
+ preserve_rowcount: bool = False,
+ **opt: Any,
+ ) -> AsyncConnection: ...
+
+ @overload
+ async def execution_options(self, **opt: Any) -> AsyncConnection: ...
+
+ async def execution_options(self, **opt: Any) -> AsyncConnection:
+ r"""Set non-SQL options for the connection which take effect
+ during execution.
+
+ This returns this :class:`_asyncio.AsyncConnection` object with
+ the new options added.
+
+ See :meth:`_engine.Connection.execution_options` for full details
+ on this method.
+
+ """
+
+ conn = self._proxied
+ c2 = await greenlet_spawn(conn.execution_options, **opt)
+ assert c2 is conn
+ return self
+
+ async def commit(self) -> None:
+ """Commit the transaction that is currently in progress.
+
+ This method commits the current transaction if one has been started.
+ If no transaction was started, the method has no effect, assuming
+ the connection is in a non-invalidated state.
+
+ A transaction is begun on a :class:`_engine.Connection` automatically
+ whenever a statement is first executed, or when the
+ :meth:`_engine.Connection.begin` method is called.
+
+ """
+ await greenlet_spawn(self._proxied.commit)
+
+ async def rollback(self) -> None:
+ """Roll back the transaction that is currently in progress.
+
+ This method rolls back the current transaction if one has been started.
+ If no transaction was started, the method has no effect. If a
+ transaction was started and the connection is in an invalidated state,
+ the transaction is cleared using this method.
+
+ A transaction is begun on a :class:`_engine.Connection` automatically
+ whenever a statement is first executed, or when the
+ :meth:`_engine.Connection.begin` method is called.
+
+
+ """
+ await greenlet_spawn(self._proxied.rollback)
+
+ async def close(self) -> None:
+ """Close this :class:`_asyncio.AsyncConnection`.
+
+ This has the effect of also rolling back the transaction if one
+ is in place.
+
+ """
+ await greenlet_spawn(self._proxied.close)
+
+ async def aclose(self) -> None:
+ """A synonym for :meth:`_asyncio.AsyncConnection.close`.
+
+ The :meth:`_asyncio.AsyncConnection.aclose` name is specifically
+ to support the Python standard library ``@contextlib.aclosing``
+ context manager function.
+
+ .. versionadded:: 2.0.20
+
+ """
+ await self.close()
+
+ async def exec_driver_sql(
+ self,
+ statement: str,
+ parameters: Optional[_DBAPIAnyExecuteParams] = None,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
+ ) -> CursorResult[Any]:
+ r"""Executes a driver-level SQL string and return buffered
+ :class:`_engine.Result`.
+
+ """
+
+ result = await greenlet_spawn(
+ self._proxied.exec_driver_sql,
+ statement,
+ parameters,
+ execution_options,
+ _require_await=True,
+ )
+
+ return await _ensure_sync_result(result, self.exec_driver_sql)
+
+ @overload
+ def stream(
+ self,
+ statement: TypedReturnsRows[_T],
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
+ ) -> GeneratorStartableContext[AsyncResult[_T]]: ...
+
+ @overload
+ def stream(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
+ ) -> GeneratorStartableContext[AsyncResult[Any]]: ...
+
+ @asyncstartablecontext
+ async def stream(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
+ ) -> AsyncIterator[AsyncResult[Any]]:
+ """Execute a statement and return an awaitable yielding a
+ :class:`_asyncio.AsyncResult` object.
+
+ E.g.::
+
+ result = await conn.stream(stmt):
+ async for row in result:
+ print(f"{row}")
+
+ The :meth:`.AsyncConnection.stream`
+ method supports optional context manager use against the
+ :class:`.AsyncResult` object, as in::
+
+ async with conn.stream(stmt) as result:
+ async for row in result:
+ print(f"{row}")
+
+ In the above pattern, the :meth:`.AsyncResult.close` method is
+ invoked unconditionally, even if the iterator is interrupted by an
+ exception throw. Context manager use remains optional, however,
+ and the function may be called in either an ``async with fn():`` or
+ ``await fn()`` style.
+
+ .. versionadded:: 2.0.0b3 added context manager support
+
+
+ :return: an awaitable object that will yield an
+ :class:`_asyncio.AsyncResult` object.
+
+ .. seealso::
+
+ :meth:`.AsyncConnection.stream_scalars`
+
+ """
+ if not self.dialect.supports_server_side_cursors:
+ raise exc.InvalidRequestError(
+ "Cant use `stream` or `stream_scalars` with the current "
+ "dialect since it does not support server side cursors."
+ )
+
+ result = await greenlet_spawn(
+ self._proxied.execute,
+ statement,
+ parameters,
+ execution_options=util.EMPTY_DICT.merge_with(
+ execution_options, {"stream_results": True}
+ ),
+ _require_await=True,
+ )
+ assert result.context._is_server_side
+ ar = AsyncResult(result)
+ try:
+ yield ar
+ except GeneratorExit:
+ pass
+ else:
+ task = asyncio.create_task(ar.close())
+ await asyncio.shield(task)
+
+ @overload
+ async def execute(
+ self,
+ statement: TypedReturnsRows[_T],
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
+ ) -> CursorResult[_T]: ...
+
+ @overload
+ async def execute(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
+ ) -> CursorResult[Any]: ...
+
+ async def execute(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
+ ) -> CursorResult[Any]:
+ r"""Executes a SQL statement construct and return a buffered
+ :class:`_engine.Result`.
+
+ :param object: The statement to be executed. This is always
+ an object that is in both the :class:`_expression.ClauseElement` and
+ :class:`_expression.Executable` hierarchies, including:
+
+ * :class:`_expression.Select`
+ * :class:`_expression.Insert`, :class:`_expression.Update`,
+ :class:`_expression.Delete`
+ * :class:`_expression.TextClause` and
+ :class:`_expression.TextualSelect`
+ * :class:`_schema.DDL` and objects which inherit from
+ :class:`_schema.ExecutableDDLElement`
+
+ :param parameters: parameters which will be bound into the statement.
+ This may be either a dictionary of parameter names to values,
+ or a mutable sequence (e.g. a list) of dictionaries. When a
+ list of dictionaries is passed, the underlying statement execution
+ will make use of the DBAPI ``cursor.executemany()`` method.
+ When a single dictionary is passed, the DBAPI ``cursor.execute()``
+ method will be used.
+
+ :param execution_options: optional dictionary of execution options,
+ which will be associated with the statement execution. This
+ dictionary can provide a subset of the options that are accepted
+ by :meth:`_engine.Connection.execution_options`.
+
+ :return: a :class:`_engine.Result` object.
+
+ """
+ result = await greenlet_spawn(
+ self._proxied.execute,
+ statement,
+ parameters,
+ execution_options=execution_options,
+ _require_await=True,
+ )
+ return await _ensure_sync_result(result, self.execute)
+
+ @overload
+ async def scalar(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
+ ) -> Optional[_T]: ...
+
+ @overload
+ async def scalar(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
+ ) -> Any: ...
+
+ async def scalar(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
+ ) -> Any:
+ r"""Executes a SQL statement construct and returns a scalar object.
+
+ This method is shorthand for invoking the
+ :meth:`_engine.Result.scalar` method after invoking the
+ :meth:`_engine.Connection.execute` method. Parameters are equivalent.
+
+ :return: a scalar Python value representing the first column of the
+ first row returned.
+
+ """
+ result = await self.execute(
+ statement, parameters, execution_options=execution_options
+ )
+ return result.scalar()
+
+ @overload
+ async def scalars(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
+ ) -> ScalarResult[_T]: ...
+
+ @overload
+ async def scalars(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
+ ) -> ScalarResult[Any]: ...
+
+ async def scalars(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreAnyExecuteParams] = None,
+ *,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
+ ) -> ScalarResult[Any]:
+ r"""Executes a SQL statement construct and returns a scalar objects.
+
+ This method is shorthand for invoking the
+ :meth:`_engine.Result.scalars` method after invoking the
+ :meth:`_engine.Connection.execute` method. Parameters are equivalent.
+
+ :return: a :class:`_engine.ScalarResult` object.
+
+ .. versionadded:: 1.4.24
+
+ """
+ result = await self.execute(
+ statement, parameters, execution_options=execution_options
+ )
+ return result.scalars()
+
+ @overload
+ def stream_scalars(
+ self,
+ statement: TypedReturnsRows[Tuple[_T]],
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
+ ) -> GeneratorStartableContext[AsyncScalarResult[_T]]: ...
+
+ @overload
+ def stream_scalars(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
+ ) -> GeneratorStartableContext[AsyncScalarResult[Any]]: ...
+
+ @asyncstartablecontext
+ async def stream_scalars(
+ self,
+ statement: Executable,
+ parameters: Optional[_CoreSingleExecuteParams] = None,
+ *,
+ execution_options: Optional[CoreExecuteOptionsParameter] = None,
+ ) -> AsyncIterator[AsyncScalarResult[Any]]:
+ r"""Execute a statement and return an awaitable yielding a
+ :class:`_asyncio.AsyncScalarResult` object.
+
+ E.g.::
+
+ result = await conn.stream_scalars(stmt)
+ async for scalar in result:
+ print(f"{scalar}")
+
+ This method is shorthand for invoking the
+ :meth:`_engine.AsyncResult.scalars` method after invoking the
+ :meth:`_engine.Connection.stream` method. Parameters are equivalent.
+
+ The :meth:`.AsyncConnection.stream_scalars`
+ method supports optional context manager use against the
+ :class:`.AsyncScalarResult` object, as in::
+
+ async with conn.stream_scalars(stmt) as result:
+ async for scalar in result:
+ print(f"{scalar}")
+
+ In the above pattern, the :meth:`.AsyncScalarResult.close` method is
+ invoked unconditionally, even if the iterator is interrupted by an
+ exception throw. Context manager use remains optional, however,
+ and the function may be called in either an ``async with fn():`` or
+ ``await fn()`` style.
+
+ .. versionadded:: 2.0.0b3 added context manager support
+
+ :return: an awaitable object that will yield an
+ :class:`_asyncio.AsyncScalarResult` object.
+
+ .. versionadded:: 1.4.24
+
+ .. seealso::
+
+ :meth:`.AsyncConnection.stream`
+
+ """
+
+ async with self.stream(
+ statement, parameters, execution_options=execution_options
+ ) as result:
+ yield result.scalars()
+
+ async def run_sync(
+ self,
+ fn: Callable[Concatenate[Connection, _P], _T],
+ *arg: _P.args,
+ **kw: _P.kwargs,
+ ) -> _T:
+ """Invoke the given synchronous (i.e. not async) callable,
+ passing a synchronous-style :class:`_engine.Connection` as the first
+ argument.
+
+ This method allows traditional synchronous SQLAlchemy functions to
+ run within the context of an asyncio application.
+
+ E.g.::
+
+ def do_something_with_core(conn: Connection, arg1: int, arg2: str) -> str:
+ '''A synchronous function that does not require awaiting
+
+ :param conn: a Core SQLAlchemy Connection, used synchronously
+
+ :return: an optional return value is supported
+
+ '''
+ conn.execute(
+ some_table.insert().values(int_col=arg1, str_col=arg2)
+ )
+ return "success"
+
+
+ async def do_something_async(async_engine: AsyncEngine) -> None:
+ '''an async function that uses awaiting'''
+
+ async with async_engine.begin() as async_conn:
+ # run do_something_with_core() with a sync-style
+ # Connection, proxied into an awaitable
+ return_code = await async_conn.run_sync(do_something_with_core, 5, "strval")
+ print(return_code)
+
+ This method maintains the asyncio event loop all the way through
+ to the database connection by running the given callable in a
+ specially instrumented greenlet.
+
+ The most rudimentary use of :meth:`.AsyncConnection.run_sync` is to
+ invoke methods such as :meth:`_schema.MetaData.create_all`, given
+ an :class:`.AsyncConnection` that needs to be provided to
+ :meth:`_schema.MetaData.create_all` as a :class:`_engine.Connection`
+ object::
+
+ # run metadata.create_all(conn) with a sync-style Connection,
+ # proxied into an awaitable
+ with async_engine.begin() as conn:
+ await conn.run_sync(metadata.create_all)
+
+ .. note::
+
+ The provided callable is invoked inline within the asyncio event
+ loop, and will block on traditional IO calls. IO within this
+ callable should only call into SQLAlchemy's asyncio database
+ APIs which will be properly adapted to the greenlet context.
+
+ .. seealso::
+
+ :meth:`.AsyncSession.run_sync`
+
+ :ref:`session_run_sync`
+
+ """ # noqa: E501
+
+ return await greenlet_spawn(
+ fn, self._proxied, *arg, _require_await=False, **kw
+ )
+
+ def __await__(self) -> Generator[Any, None, AsyncConnection]:
+ return self.start().__await__()
+
+ async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None:
+ task = asyncio.create_task(self.close())
+ await asyncio.shield(task)
+
+ # START PROXY METHODS AsyncConnection
+
+ # code within this block is **programmatically,
+ # statically generated** by tools/generate_proxy_methods.py
+
+ @property
+ def closed(self) -> Any:
+ r"""Return True if this connection is closed.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_engine.Connection` class
+ on behalf of the :class:`_asyncio.AsyncConnection` class.
+
+ """ # noqa: E501
+
+ return self._proxied.closed
+
+ @property
+ def invalidated(self) -> Any:
+ r"""Return True if this connection was invalidated.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_engine.Connection` class
+ on behalf of the :class:`_asyncio.AsyncConnection` class.
+
+ This does not indicate whether or not the connection was
+ invalidated at the pool level, however
+
+
+ """ # noqa: E501
+
+ return self._proxied.invalidated
+
+ @property
+ def dialect(self) -> Dialect:
+ r"""Proxy for the :attr:`_engine.Connection.dialect` attribute
+ on behalf of the :class:`_asyncio.AsyncConnection` class.
+
+ """ # noqa: E501
+
+ return self._proxied.dialect
+
+ @dialect.setter
+ def dialect(self, attr: Dialect) -> None:
+ self._proxied.dialect = attr
+
+ @property
+ def default_isolation_level(self) -> Any:
+ r"""The initial-connection time isolation level associated with the
+ :class:`_engine.Dialect` in use.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_engine.Connection` class
+ on behalf of the :class:`_asyncio.AsyncConnection` class.
+
+ This value is independent of the
+ :paramref:`.Connection.execution_options.isolation_level` and
+ :paramref:`.Engine.execution_options.isolation_level` execution
+ options, and is determined by the :class:`_engine.Dialect` when the
+ first connection is created, by performing a SQL query against the
+ database for the current isolation level before any additional commands
+ have been emitted.
+
+ Calling this accessor does not invoke any new SQL queries.
+
+ .. seealso::
+
+ :meth:`_engine.Connection.get_isolation_level`
+ - view current actual isolation level
+
+ :paramref:`_sa.create_engine.isolation_level`
+ - set per :class:`_engine.Engine` isolation level
+
+ :paramref:`.Connection.execution_options.isolation_level`
+ - set per :class:`_engine.Connection` isolation level
+
+
+ """ # noqa: E501
+
+ return self._proxied.default_isolation_level
+
+ # END PROXY METHODS AsyncConnection
+
+
+@util.create_proxy_methods(
+ Engine,
+ ":class:`_engine.Engine`",
+ ":class:`_asyncio.AsyncEngine`",
+ classmethods=[],
+ methods=[
+ "clear_compiled_cache",
+ "update_execution_options",
+ "get_execution_options",
+ ],
+ attributes=["url", "pool", "dialect", "engine", "name", "driver", "echo"],
+)
+class AsyncEngine(ProxyComparable[Engine], AsyncConnectable):
+ """An asyncio proxy for a :class:`_engine.Engine`.
+
+ :class:`_asyncio.AsyncEngine` is acquired using the
+ :func:`_asyncio.create_async_engine` function::
+
+ from sqlalchemy.ext.asyncio import create_async_engine
+ engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")
+
+ .. versionadded:: 1.4
+
+ """ # noqa
+
+ # AsyncEngine is a thin proxy; no state should be added here
+ # that is not retrievable from the "sync" engine / connection, e.g.
+ # current transaction, info, etc. It should be possible to
+ # create a new AsyncEngine that matches this one given only the
+ # "sync" elements.
+ __slots__ = "sync_engine"
+
+ _connection_cls: Type[AsyncConnection] = AsyncConnection
+
+ sync_engine: Engine
+ """Reference to the sync-style :class:`_engine.Engine` this
+ :class:`_asyncio.AsyncEngine` proxies requests towards.
+
+ This instance can be used as an event target.
+
+ .. seealso::
+
+ :ref:`asyncio_events`
+ """
+
+ def __init__(self, sync_engine: Engine):
+ if not sync_engine.dialect.is_async:
+ raise exc.InvalidRequestError(
+ "The asyncio extension requires an async driver to be used. "
+ f"The loaded {sync_engine.dialect.driver!r} is not async."
+ )
+ self.sync_engine = self._assign_proxied(sync_engine)
+
+ @util.ro_non_memoized_property
+ def _proxied(self) -> Engine:
+ return self.sync_engine
+
+ @classmethod
+ def _regenerate_proxy_for_target(cls, target: Engine) -> AsyncEngine:
+ return AsyncEngine(target)
+
+ @contextlib.asynccontextmanager
+ async def begin(self) -> AsyncIterator[AsyncConnection]:
+ """Return a context manager which when entered will deliver an
+ :class:`_asyncio.AsyncConnection` with an
+ :class:`_asyncio.AsyncTransaction` established.
+
+ E.g.::
+
+ async with async_engine.begin() as conn:
+ await conn.execute(
+ text("insert into table (x, y, z) values (1, 2, 3)")
+ )
+ await conn.execute(text("my_special_procedure(5)"))
+
+
+ """
+ conn = self.connect()
+
+ async with conn:
+ async with conn.begin():
+ yield conn
+
+ def connect(self) -> AsyncConnection:
+ """Return an :class:`_asyncio.AsyncConnection` object.
+
+ The :class:`_asyncio.AsyncConnection` will procure a database
+ connection from the underlying connection pool when it is entered
+ as an async context manager::
+
+ async with async_engine.connect() as conn:
+ result = await conn.execute(select(user_table))
+
+ The :class:`_asyncio.AsyncConnection` may also be started outside of a
+ context manager by invoking its :meth:`_asyncio.AsyncConnection.start`
+ method.
+
+ """
+
+ return self._connection_cls(self)
+
+ async def raw_connection(self) -> PoolProxiedConnection:
+ """Return a "raw" DBAPI connection from the connection pool.
+
+ .. seealso::
+
+ :ref:`dbapi_connections`
+
+ """
+ return await greenlet_spawn(self.sync_engine.raw_connection)
+
+ @overload
+ def execution_options(
+ self,
+ *,
+ compiled_cache: Optional[CompiledCacheType] = ...,
+ logging_token: str = ...,
+ isolation_level: IsolationLevel = ...,
+ insertmanyvalues_page_size: int = ...,
+ schema_translate_map: Optional[SchemaTranslateMapType] = ...,
+ **opt: Any,
+ ) -> AsyncEngine: ...
+
+ @overload
+ def execution_options(self, **opt: Any) -> AsyncEngine: ...
+
+ def execution_options(self, **opt: Any) -> AsyncEngine:
+ """Return a new :class:`_asyncio.AsyncEngine` that will provide
+ :class:`_asyncio.AsyncConnection` objects with the given execution
+ options.
+
+ Proxied from :meth:`_engine.Engine.execution_options`. See that
+ method for details.
+
+ """
+
+ return AsyncEngine(self.sync_engine.execution_options(**opt))
+
+ async def dispose(self, close: bool = True) -> None:
+ """Dispose of the connection pool used by this
+ :class:`_asyncio.AsyncEngine`.
+
+ :param close: if left at its default of ``True``, has the
+ effect of fully closing all **currently checked in**
+ database connections. Connections that are still checked out
+ will **not** be closed, however they will no longer be associated
+ with this :class:`_engine.Engine`,
+ so when they are closed individually, eventually the
+ :class:`_pool.Pool` which they are associated with will
+ be garbage collected and they will be closed out fully, if
+ not already closed on checkin.
+
+ If set to ``False``, the previous connection pool is de-referenced,
+ and otherwise not touched in any way.
+
+ .. seealso::
+
+ :meth:`_engine.Engine.dispose`
+
+ """
+
+ await greenlet_spawn(self.sync_engine.dispose, close=close)
+
+ # START PROXY METHODS AsyncEngine
+
+ # code within this block is **programmatically,
+ # statically generated** by tools/generate_proxy_methods.py
+
+ def clear_compiled_cache(self) -> None:
+ r"""Clear the compiled cache associated with the dialect.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_engine.Engine` class on
+ behalf of the :class:`_asyncio.AsyncEngine` class.
+
+ This applies **only** to the built-in cache that is established
+ via the :paramref:`_engine.create_engine.query_cache_size` parameter.
+ It will not impact any dictionary caches that were passed via the
+ :paramref:`.Connection.execution_options.compiled_cache` parameter.
+
+ .. versionadded:: 1.4
+
+
+ """ # noqa: E501
+
+ return self._proxied.clear_compiled_cache()
+
+ def update_execution_options(self, **opt: Any) -> None:
+ r"""Update the default execution_options dictionary
+ of this :class:`_engine.Engine`.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_engine.Engine` class on
+ behalf of the :class:`_asyncio.AsyncEngine` class.
+
+ The given keys/values in \**opt are added to the
+ default execution options that will be used for
+ all connections. The initial contents of this dictionary
+ can be sent via the ``execution_options`` parameter
+ to :func:`_sa.create_engine`.
+
+ .. seealso::
+
+ :meth:`_engine.Connection.execution_options`
+
+ :meth:`_engine.Engine.execution_options`
+
+
+ """ # noqa: E501
+
+ return self._proxied.update_execution_options(**opt)
+
+ def get_execution_options(self) -> _ExecuteOptions:
+ r"""Get the non-SQL options which will take effect during execution.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_engine.Engine` class on
+ behalf of the :class:`_asyncio.AsyncEngine` class.
+
+ .. versionadded: 1.3
+
+ .. seealso::
+
+ :meth:`_engine.Engine.execution_options`
+
+ """ # noqa: E501
+
+ return self._proxied.get_execution_options()
+
+ @property
+ def url(self) -> URL:
+ r"""Proxy for the :attr:`_engine.Engine.url` attribute
+ on behalf of the :class:`_asyncio.AsyncEngine` class.
+
+ """ # noqa: E501
+
+ return self._proxied.url
+
+ @url.setter
+ def url(self, attr: URL) -> None:
+ self._proxied.url = attr
+
+ @property
+ def pool(self) -> Pool:
+ r"""Proxy for the :attr:`_engine.Engine.pool` attribute
+ on behalf of the :class:`_asyncio.AsyncEngine` class.
+
+ """ # noqa: E501
+
+ return self._proxied.pool
+
+ @pool.setter
+ def pool(self, attr: Pool) -> None:
+ self._proxied.pool = attr
+
+ @property
+ def dialect(self) -> Dialect:
+ r"""Proxy for the :attr:`_engine.Engine.dialect` attribute
+ on behalf of the :class:`_asyncio.AsyncEngine` class.
+
+ """ # noqa: E501
+
+ return self._proxied.dialect
+
+ @dialect.setter
+ def dialect(self, attr: Dialect) -> None:
+ self._proxied.dialect = attr
+
+ @property
+ def engine(self) -> Any:
+ r"""Returns this :class:`.Engine`.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_engine.Engine` class
+ on behalf of the :class:`_asyncio.AsyncEngine` class.
+
+ Used for legacy schemes that accept :class:`.Connection` /
+ :class:`.Engine` objects within the same variable.
+
+
+ """ # noqa: E501
+
+ return self._proxied.engine
+
+ @property
+ def name(self) -> Any:
+ r"""String name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
+ in use by this :class:`Engine`.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_engine.Engine` class
+ on behalf of the :class:`_asyncio.AsyncEngine` class.
+
+
+ """ # noqa: E501
+
+ return self._proxied.name
+
+ @property
+ def driver(self) -> Any:
+ r"""Driver name of the :class:`~sqlalchemy.engine.interfaces.Dialect`
+ in use by this :class:`Engine`.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_engine.Engine` class
+ on behalf of the :class:`_asyncio.AsyncEngine` class.
+
+
+ """ # noqa: E501
+
+ return self._proxied.driver
+
+ @property
+ def echo(self) -> Any:
+ r"""When ``True``, enable log output for this element.
+
+ .. container:: class_bases
+
+ Proxied for the :class:`_engine.Engine` class
+ on behalf of the :class:`_asyncio.AsyncEngine` class.
+
+ This has the effect of setting the Python logging level for the namespace
+ of this element's class and object reference. A value of boolean ``True``
+ indicates that the loglevel ``logging.INFO`` will be set for the logger,
+ whereas the string value ``debug`` will set the loglevel to
+ ``logging.DEBUG``.
+
+ """ # noqa: E501
+
+ return self._proxied.echo
+
+ @echo.setter
+ def echo(self, attr: Any) -> None:
+ self._proxied.echo = attr
+
+ # END PROXY METHODS AsyncEngine
+
+
+class AsyncTransaction(
+ ProxyComparable[Transaction], StartableContext["AsyncTransaction"]
+):
+ """An asyncio proxy for a :class:`_engine.Transaction`."""
+
+ __slots__ = ("connection", "sync_transaction", "nested")
+
+ sync_transaction: Optional[Transaction]
+ connection: AsyncConnection
+ nested: bool
+
+ def __init__(self, connection: AsyncConnection, nested: bool = False):
+ self.connection = connection
+ self.sync_transaction = None
+ self.nested = nested
+
+ @classmethod
+ def _regenerate_proxy_for_target(
+ cls, target: Transaction
+ ) -> AsyncTransaction:
+ sync_connection = target.connection
+ sync_transaction = target
+ nested = isinstance(target, NestedTransaction)
+
+ async_connection = AsyncConnection._retrieve_proxy_for_target(
+ sync_connection
+ )
+ assert async_connection is not None
+
+ obj = cls.__new__(cls)
+ obj.connection = async_connection
+ obj.sync_transaction = obj._assign_proxied(sync_transaction)
+ obj.nested = nested
+ return obj
+
+ @util.ro_non_memoized_property
+ def _proxied(self) -> Transaction:
+ if not self.sync_transaction:
+ self._raise_for_not_started()
+ return self.sync_transaction
+
+ @property
+ def is_valid(self) -> bool:
+ return self._proxied.is_valid
+
+ @property
+ def is_active(self) -> bool:
+ return self._proxied.is_active
+
+ async def close(self) -> None:
+ """Close this :class:`.AsyncTransaction`.
+
+ If this transaction is the base transaction in a begin/commit
+ nesting, the transaction will rollback(). Otherwise, the
+ method returns.
+
+ This is used to cancel a Transaction without affecting the scope of
+ an enclosing transaction.
+
+ """
+ await greenlet_spawn(self._proxied.close)
+
+ async def rollback(self) -> None:
+ """Roll back this :class:`.AsyncTransaction`."""
+ await greenlet_spawn(self._proxied.rollback)
+
+ async def commit(self) -> None:
+ """Commit this :class:`.AsyncTransaction`."""
+
+ await greenlet_spawn(self._proxied.commit)
+
+ async def start(self, is_ctxmanager: bool = False) -> AsyncTransaction:
+ """Start this :class:`_asyncio.AsyncTransaction` object's context
+ outside of using a Python ``with:`` block.
+
+ """
+
+ self.sync_transaction = self._assign_proxied(
+ await greenlet_spawn(
+ self.connection._proxied.begin_nested
+ if self.nested
+ else self.connection._proxied.begin
+ )
+ )
+ if is_ctxmanager:
+ self.sync_transaction.__enter__()
+ return self
+
+ async def __aexit__(self, type_: Any, value: Any, traceback: Any) -> None:
+ await greenlet_spawn(self._proxied.__exit__, type_, value, traceback)
+
+
+@overload
+def _get_sync_engine_or_connection(async_engine: AsyncEngine) -> Engine: ...
+
+
+@overload
+def _get_sync_engine_or_connection(
+ async_engine: AsyncConnection,
+) -> Connection: ...
+
+
+def _get_sync_engine_or_connection(
+ async_engine: Union[AsyncEngine, AsyncConnection]
+) -> Union[Engine, Connection]:
+ if isinstance(async_engine, AsyncConnection):
+ return async_engine._proxied
+
+ try:
+ return async_engine.sync_engine
+ except AttributeError as e:
+ raise exc.ArgumentError(
+ "AsyncEngine expected, got %r" % async_engine
+ ) from e
+
+
+@inspection._inspects(AsyncConnection)
+def _no_insp_for_async_conn_yet(
+ subject: AsyncConnection, # noqa: U100
+) -> NoReturn:
+ raise exc.NoInspectionAvailable(
+ "Inspection on an AsyncConnection is currently not supported. "
+ "Please use ``run_sync`` to pass a callable where it's possible "
+ "to call ``inspect`` on the passed connection.",
+ code="xd3s",
+ )
+
+
+@inspection._inspects(AsyncEngine)
+def _no_insp_for_async_engine_xyet(
+ subject: AsyncEngine, # noqa: U100
+) -> NoReturn:
+ raise exc.NoInspectionAvailable(
+ "Inspection on an AsyncEngine is currently not supported. "
+ "Please obtain a connection then use ``conn.run_sync`` to pass a "
+ "callable where it's possible to call ``inspect`` on the "
+ "passed connection.",
+ code="xd3s",
+ )