summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/connectors/asyncio.py
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
commit6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
treeb1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/sqlalchemy/connectors/asyncio.py
parent4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/connectors/asyncio.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/connectors/asyncio.py208
1 files changed, 208 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/connectors/asyncio.py b/venv/lib/python3.11/site-packages/sqlalchemy/connectors/asyncio.py
new file mode 100644
index 0000000..0b44f23
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/sqlalchemy/connectors/asyncio.py
@@ -0,0 +1,208 @@
+# connectors/asyncio.py
+# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+
+"""generic asyncio-adapted versions of DBAPI connection and cursor"""
+
+from __future__ import annotations
+
+import collections
+import itertools
+
+from ..engine import AdaptedConnection
+from ..util.concurrency import asyncio
+from ..util.concurrency import await_fallback
+from ..util.concurrency import await_only
+
+
+class AsyncAdapt_dbapi_cursor:
+ server_side = False
+ __slots__ = (
+ "_adapt_connection",
+ "_connection",
+ "await_",
+ "_cursor",
+ "_rows",
+ )
+
+ def __init__(self, adapt_connection):
+ self._adapt_connection = adapt_connection
+ self._connection = adapt_connection._connection
+ self.await_ = adapt_connection.await_
+
+ cursor = self._connection.cursor()
+ self._cursor = self._aenter_cursor(cursor)
+
+ self._rows = collections.deque()
+
+ def _aenter_cursor(self, cursor):
+ return self.await_(cursor.__aenter__())
+
+ @property
+ def description(self):
+ return self._cursor.description
+
+ @property
+ def rowcount(self):
+ return self._cursor.rowcount
+
+ @property
+ def arraysize(self):
+ return self._cursor.arraysize
+
+ @arraysize.setter
+ def arraysize(self, value):
+ self._cursor.arraysize = value
+
+ @property
+ def lastrowid(self):
+ return self._cursor.lastrowid
+
+ def close(self):
+ # note we aren't actually closing the cursor here,
+ # we are just letting GC do it. see notes in aiomysql dialect
+ self._rows.clear()
+
+ def execute(self, operation, parameters=None):
+ return self.await_(self._execute_async(operation, parameters))
+
+ def executemany(self, operation, seq_of_parameters):
+ return self.await_(
+ self._executemany_async(operation, seq_of_parameters)
+ )
+
+ async def _execute_async(self, operation, parameters):
+ async with self._adapt_connection._execute_mutex:
+ result = await self._cursor.execute(operation, parameters or ())
+
+ if self._cursor.description and not self.server_side:
+ self._rows = collections.deque(await self._cursor.fetchall())
+ return result
+
+ async def _executemany_async(self, operation, seq_of_parameters):
+ async with self._adapt_connection._execute_mutex:
+ return await self._cursor.executemany(operation, seq_of_parameters)
+
+ def nextset(self):
+ self.await_(self._cursor.nextset())
+ if self._cursor.description and not self.server_side:
+ self._rows = collections.deque(
+ self.await_(self._cursor.fetchall())
+ )
+
+ def setinputsizes(self, *inputsizes):
+ # NOTE: this is overrridden in aioodbc due to
+ # see https://github.com/aio-libs/aioodbc/issues/451
+ # right now
+
+ return self.await_(self._cursor.setinputsizes(*inputsizes))
+
+ def __iter__(self):
+ while self._rows:
+ yield self._rows.popleft()
+
+ def fetchone(self):
+ if self._rows:
+ return self._rows.popleft()
+ else:
+ return None
+
+ def fetchmany(self, size=None):
+ if size is None:
+ size = self.arraysize
+
+ rr = iter(self._rows)
+ retval = list(itertools.islice(rr, 0, size))
+ self._rows = collections.deque(rr)
+ return retval
+
+ def fetchall(self):
+ retval = list(self._rows)
+ self._rows.clear()
+ return retval
+
+
+class AsyncAdapt_dbapi_ss_cursor(AsyncAdapt_dbapi_cursor):
+ __slots__ = ()
+ server_side = True
+
+ def __init__(self, adapt_connection):
+ self._adapt_connection = adapt_connection
+ self._connection = adapt_connection._connection
+ self.await_ = adapt_connection.await_
+
+ cursor = self._connection.cursor()
+
+ self._cursor = self.await_(cursor.__aenter__())
+
+ def close(self):
+ if self._cursor is not None:
+ self.await_(self._cursor.close())
+ self._cursor = None
+
+ def fetchone(self):
+ return self.await_(self._cursor.fetchone())
+
+ def fetchmany(self, size=None):
+ return self.await_(self._cursor.fetchmany(size=size))
+
+ def fetchall(self):
+ return self.await_(self._cursor.fetchall())
+
+
+class AsyncAdapt_dbapi_connection(AdaptedConnection):
+ _cursor_cls = AsyncAdapt_dbapi_cursor
+ _ss_cursor_cls = AsyncAdapt_dbapi_ss_cursor
+
+ await_ = staticmethod(await_only)
+ __slots__ = ("dbapi", "_execute_mutex")
+
+ def __init__(self, dbapi, connection):
+ self.dbapi = dbapi
+ self._connection = connection
+ self._execute_mutex = asyncio.Lock()
+
+ def ping(self, reconnect):
+ return self.await_(self._connection.ping(reconnect))
+
+ def add_output_converter(self, *arg, **kw):
+ self._connection.add_output_converter(*arg, **kw)
+
+ def character_set_name(self):
+ return self._connection.character_set_name()
+
+ @property
+ def autocommit(self):
+ return self._connection.autocommit
+
+ @autocommit.setter
+ def autocommit(self, value):
+ # https://github.com/aio-libs/aioodbc/issues/448
+ # self._connection.autocommit = value
+
+ self._connection._conn.autocommit = value
+
+ def cursor(self, server_side=False):
+ if server_side:
+ return self._ss_cursor_cls(self)
+ else:
+ return self._cursor_cls(self)
+
+ def rollback(self):
+ self.await_(self._connection.rollback())
+
+ def commit(self):
+ self.await_(self._connection.commit())
+
+ def close(self):
+ self.await_(self._connection.close())
+
+
+class AsyncAdaptFallback_dbapi_connection(AsyncAdapt_dbapi_connection):
+ __slots__ = ()
+
+ await_ = staticmethod(await_fallback)