# connectors/asyncio.py # Copyright (C) 2005-2024 the SQLAlchemy authors and contributors # # # This module is part of SQLAlchemy and is released under # the MIT License: https://www.opensource.org/licenses/mit-license.php # mypy: ignore-errors """generic asyncio-adapted versions of DBAPI connection and cursor""" from __future__ import annotations import collections import itertools from ..engine import AdaptedConnection from ..util.concurrency import asyncio from ..util.concurrency import await_fallback from ..util.concurrency import await_only class AsyncAdapt_dbapi_cursor: server_side = False __slots__ = ( "_adapt_connection", "_connection", "await_", "_cursor", "_rows", ) def __init__(self, adapt_connection): self._adapt_connection = adapt_connection self._connection = adapt_connection._connection self.await_ = adapt_connection.await_ cursor = self._connection.cursor() self._cursor = self._aenter_cursor(cursor) self._rows = collections.deque() def _aenter_cursor(self, cursor): return self.await_(cursor.__aenter__()) @property def description(self): return self._cursor.description @property def rowcount(self): return self._cursor.rowcount @property def arraysize(self): return self._cursor.arraysize @arraysize.setter def arraysize(self, value): self._cursor.arraysize = value @property def lastrowid(self): return self._cursor.lastrowid def close(self): # note we aren't actually closing the cursor here, # we are just letting GC do it. see notes in aiomysql dialect self._rows.clear() def execute(self, operation, parameters=None): return self.await_(self._execute_async(operation, parameters)) def executemany(self, operation, seq_of_parameters): return self.await_( self._executemany_async(operation, seq_of_parameters) ) async def _execute_async(self, operation, parameters): async with self._adapt_connection._execute_mutex: result = await self._cursor.execute(operation, parameters or ()) if self._cursor.description and not self.server_side: self._rows = collections.deque(await self._cursor.fetchall()) return result async def _executemany_async(self, operation, seq_of_parameters): async with self._adapt_connection._execute_mutex: return await self._cursor.executemany(operation, seq_of_parameters) def nextset(self): self.await_(self._cursor.nextset()) if self._cursor.description and not self.server_side: self._rows = collections.deque( self.await_(self._cursor.fetchall()) ) def setinputsizes(self, *inputsizes): # NOTE: this is overrridden in aioodbc due to # see https://github.com/aio-libs/aioodbc/issues/451 # right now return self.await_(self._cursor.setinputsizes(*inputsizes)) def __iter__(self): while self._rows: yield self._rows.popleft() def fetchone(self): if self._rows: return self._rows.popleft() else: return None def fetchmany(self, size=None): if size is None: size = self.arraysize rr = iter(self._rows) retval = list(itertools.islice(rr, 0, size)) self._rows = collections.deque(rr) return retval def fetchall(self): retval = list(self._rows) self._rows.clear() return retval class AsyncAdapt_dbapi_ss_cursor(AsyncAdapt_dbapi_cursor): __slots__ = () server_side = True def __init__(self, adapt_connection): self._adapt_connection = adapt_connection self._connection = adapt_connection._connection self.await_ = adapt_connection.await_ cursor = self._connection.cursor() self._cursor = self.await_(cursor.__aenter__()) def close(self): if self._cursor is not None: self.await_(self._cursor.close()) self._cursor = None def fetchone(self): return self.await_(self._cursor.fetchone()) def fetchmany(self, size=None): return self.await_(self._cursor.fetchmany(size=size)) def fetchall(self): return self.await_(self._cursor.fetchall()) class AsyncAdapt_dbapi_connection(AdaptedConnection): _cursor_cls = AsyncAdapt_dbapi_cursor _ss_cursor_cls = AsyncAdapt_dbapi_ss_cursor await_ = staticmethod(await_only) __slots__ = ("dbapi", "_execute_mutex") def __init__(self, dbapi, connection): self.dbapi = dbapi self._connection = connection self._execute_mutex = asyncio.Lock() def ping(self, reconnect): return self.await_(self._connection.ping(reconnect)) def add_output_converter(self, *arg, **kw): self._connection.add_output_converter(*arg, **kw) def character_set_name(self): return self._connection.character_set_name() @property def autocommit(self): return self._connection.autocommit @autocommit.setter def autocommit(self, value): # https://github.com/aio-libs/aioodbc/issues/448 # self._connection.autocommit = value self._connection._conn.autocommit = value def cursor(self, server_side=False): if server_side: return self._ss_cursor_cls(self) else: return self._cursor_cls(self) def rollback(self): self.await_(self._connection.rollback()) def commit(self): self.await_(self._connection.commit()) def close(self): self.await_(self._connection.close()) class AsyncAdaptFallback_dbapi_connection(AsyncAdapt_dbapi_connection): __slots__ = () await_ = staticmethod(await_fallback)