diff options
author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:17:55 -0400 |
---|---|---|
committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:17:55 -0400 |
commit | 12cf076118570eebbff08c6b3090e0d4798447a1 (patch) | |
tree | 3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/sqlalchemy/dialects/mysql/asyncmy.py | |
parent | c45662ff3923b34614ddcc8feb9195541166dcc5 (diff) |
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/dialects/mysql/asyncmy.py')
-rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/dialects/mysql/asyncmy.py | 337 |
1 files changed, 0 insertions, 337 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mysql/asyncmy.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mysql/asyncmy.py deleted file mode 100644 index 7360044..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mysql/asyncmy.py +++ /dev/null @@ -1,337 +0,0 @@ -# dialects/mysql/asyncmy.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors <see AUTHORS -# file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - -r""" -.. dialect:: mysql+asyncmy - :name: asyncmy - :dbapi: asyncmy - :connectstring: mysql+asyncmy://user:password@host:port/dbname[?key=value&key=value...] - :url: https://github.com/long2ice/asyncmy - -Using a special asyncio mediation layer, the asyncmy dialect is usable -as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` -extension package. - -This dialect should normally be used only with the -:func:`_asyncio.create_async_engine` engine creation function:: - - from sqlalchemy.ext.asyncio import create_async_engine - engine = create_async_engine("mysql+asyncmy://user:pass@hostname/dbname?charset=utf8mb4") - - -""" # noqa -from contextlib import asynccontextmanager - -from .pymysql import MySQLDialect_pymysql -from ... import pool -from ... import util -from ...engine import AdaptedConnection -from ...util.concurrency import asyncio -from ...util.concurrency import await_fallback -from ...util.concurrency import await_only - - -class AsyncAdapt_asyncmy_cursor: - # TODO: base on connectors/asyncio.py - # see #10415 - server_side = False - __slots__ = ( - "_adapt_connection", - "_connection", - "await_", - "_cursor", - "_rows", - ) - - def __init__(self, adapt_connection): - self._adapt_connection = adapt_connection - self._connection = adapt_connection._connection - self.await_ = adapt_connection.await_ - - cursor = self._connection.cursor() - - self._cursor = self.await_(cursor.__aenter__()) - self._rows = [] - - @property - def description(self): - return self._cursor.description - - @property - def rowcount(self): - return self._cursor.rowcount - - @property - def arraysize(self): - return self._cursor.arraysize - - @arraysize.setter - def arraysize(self, value): - self._cursor.arraysize = value - - @property - def lastrowid(self): - return self._cursor.lastrowid - - def close(self): - # note we aren't actually closing the cursor here, - # we are just letting GC do it. to allow this to be async - # we would need the Result to change how it does "Safe close cursor". - # MySQL "cursors" don't actually have state to be "closed" besides - # exhausting rows, which we already have done for sync cursor. - # another option would be to emulate aiosqlite dialect and assign - # cursor only if we are doing server side cursor operation. - self._rows[:] = [] - - def execute(self, operation, parameters=None): - return self.await_(self._execute_async(operation, parameters)) - - def executemany(self, operation, seq_of_parameters): - return self.await_( - self._executemany_async(operation, seq_of_parameters) - ) - - async def _execute_async(self, operation, parameters): - async with self._adapt_connection._mutex_and_adapt_errors(): - if parameters is None: - result = await self._cursor.execute(operation) - else: - result = await self._cursor.execute(operation, parameters) - - if not self.server_side: - # asyncmy has a "fake" async result, so we have to pull it out - # of that here since our default result is not async. - # we could just as easily grab "_rows" here and be done with it - # but this is safer. - self._rows = list(await self._cursor.fetchall()) - return result - - async def _executemany_async(self, operation, seq_of_parameters): - async with self._adapt_connection._mutex_and_adapt_errors(): - return await self._cursor.executemany(operation, seq_of_parameters) - - def setinputsizes(self, *inputsizes): - pass - - def __iter__(self): - while self._rows: - yield self._rows.pop(0) - - def fetchone(self): - if self._rows: - return self._rows.pop(0) - else: - return None - - def fetchmany(self, size=None): - if size is None: - size = self.arraysize - - retval = self._rows[0:size] - self._rows[:] = self._rows[size:] - return retval - - def fetchall(self): - retval = self._rows[:] - self._rows[:] = [] - return retval - - -class AsyncAdapt_asyncmy_ss_cursor(AsyncAdapt_asyncmy_cursor): - # TODO: base on connectors/asyncio.py - # see #10415 - __slots__ = () - server_side = True - - def __init__(self, adapt_connection): - self._adapt_connection = adapt_connection - self._connection = adapt_connection._connection - self.await_ = adapt_connection.await_ - - cursor = self._connection.cursor( - adapt_connection.dbapi.asyncmy.cursors.SSCursor - ) - - self._cursor = self.await_(cursor.__aenter__()) - - def close(self): - if self._cursor is not None: - self.await_(self._cursor.close()) - self._cursor = None - - def fetchone(self): - return self.await_(self._cursor.fetchone()) - - def fetchmany(self, size=None): - return self.await_(self._cursor.fetchmany(size=size)) - - def fetchall(self): - return self.await_(self._cursor.fetchall()) - - -class AsyncAdapt_asyncmy_connection(AdaptedConnection): - # TODO: base on connectors/asyncio.py - # see #10415 - await_ = staticmethod(await_only) - __slots__ = ("dbapi", "_execute_mutex") - - def __init__(self, dbapi, connection): - self.dbapi = dbapi - self._connection = connection - self._execute_mutex = asyncio.Lock() - - @asynccontextmanager - async def _mutex_and_adapt_errors(self): - async with self._execute_mutex: - try: - yield - except AttributeError: - raise self.dbapi.InternalError( - "network operation failed due to asyncmy attribute error" - ) - - def ping(self, reconnect): - assert not reconnect - return self.await_(self._do_ping()) - - async def _do_ping(self): - async with self._mutex_and_adapt_errors(): - return await self._connection.ping(False) - - def character_set_name(self): - return self._connection.character_set_name() - - def autocommit(self, value): - self.await_(self._connection.autocommit(value)) - - def cursor(self, server_side=False): - if server_side: - return AsyncAdapt_asyncmy_ss_cursor(self) - else: - return AsyncAdapt_asyncmy_cursor(self) - - def rollback(self): - self.await_(self._connection.rollback()) - - def commit(self): - self.await_(self._connection.commit()) - - def terminate(self): - # it's not awaitable. - self._connection.close() - - def close(self) -> None: - self.await_(self._connection.ensure_closed()) - - -class AsyncAdaptFallback_asyncmy_connection(AsyncAdapt_asyncmy_connection): - __slots__ = () - - await_ = staticmethod(await_fallback) - - -def _Binary(x): - """Return x as a binary type.""" - return bytes(x) - - -class AsyncAdapt_asyncmy_dbapi: - def __init__(self, asyncmy): - self.asyncmy = asyncmy - self.paramstyle = "format" - self._init_dbapi_attributes() - - def _init_dbapi_attributes(self): - for name in ( - "Warning", - "Error", - "InterfaceError", - "DataError", - "DatabaseError", - "OperationalError", - "InterfaceError", - "IntegrityError", - "ProgrammingError", - "InternalError", - "NotSupportedError", - ): - setattr(self, name, getattr(self.asyncmy.errors, name)) - - STRING = util.symbol("STRING") - NUMBER = util.symbol("NUMBER") - BINARY = util.symbol("BINARY") - DATETIME = util.symbol("DATETIME") - TIMESTAMP = util.symbol("TIMESTAMP") - Binary = staticmethod(_Binary) - - def connect(self, *arg, **kw): - async_fallback = kw.pop("async_fallback", False) - creator_fn = kw.pop("async_creator_fn", self.asyncmy.connect) - - if util.asbool(async_fallback): - return AsyncAdaptFallback_asyncmy_connection( - self, - await_fallback(creator_fn(*arg, **kw)), - ) - else: - return AsyncAdapt_asyncmy_connection( - self, - await_only(creator_fn(*arg, **kw)), - ) - - -class MySQLDialect_asyncmy(MySQLDialect_pymysql): - driver = "asyncmy" - supports_statement_cache = True - - supports_server_side_cursors = True - _sscursor = AsyncAdapt_asyncmy_ss_cursor - - is_async = True - has_terminate = True - - @classmethod - def import_dbapi(cls): - return AsyncAdapt_asyncmy_dbapi(__import__("asyncmy")) - - @classmethod - def get_pool_class(cls, url): - async_fallback = url.query.get("async_fallback", False) - - if util.asbool(async_fallback): - return pool.FallbackAsyncAdaptedQueuePool - else: - return pool.AsyncAdaptedQueuePool - - def do_terminate(self, dbapi_connection) -> None: - dbapi_connection.terminate() - - def create_connect_args(self, url): - return super().create_connect_args( - url, _translate_args=dict(username="user", database="db") - ) - - def is_disconnect(self, e, connection, cursor): - if super().is_disconnect(e, connection, cursor): - return True - else: - str_e = str(e).lower() - return ( - "not connected" in str_e or "network operation failed" in str_e - ) - - def _found_rows_client_flag(self): - from asyncmy.constants import CLIENT - - return CLIENT.FOUND_ROWS - - def get_driver_connection(self, connection): - return connection._connection - - -dialect = MySQLDialect_asyncmy |