diff options
| author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:17:55 -0400 | 
|---|---|---|
| committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:17:55 -0400 | 
| commit | 12cf076118570eebbff08c6b3090e0d4798447a1 (patch) | |
| tree | 3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite | |
| parent | c45662ff3923b34614ddcc8feb9195541166dcc5 (diff) | |
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite')
16 files changed, 0 insertions, 4676 deletions
| diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__init__.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__init__.py deleted file mode 100644 index 45f088e..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__init__.py +++ /dev/null @@ -1,57 +0,0 @@ -# dialects/sqlite/__init__.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - - -from . import aiosqlite  # noqa -from . import base  # noqa -from . import pysqlcipher  # noqa -from . import pysqlite  # noqa -from .base import BLOB -from .base import BOOLEAN -from .base import CHAR -from .base import DATE -from .base import DATETIME -from .base import DECIMAL -from .base import FLOAT -from .base import INTEGER -from .base import JSON -from .base import NUMERIC -from .base import REAL -from .base import SMALLINT -from .base import TEXT -from .base import TIME -from .base import TIMESTAMP -from .base import VARCHAR -from .dml import Insert -from .dml import insert - -# default dialect -base.dialect = dialect = pysqlite.dialect - - -__all__ = ( -    "BLOB", -    "BOOLEAN", -    "CHAR", -    "DATE", -    "DATETIME", -    "DECIMAL", -    "FLOAT", -    "INTEGER", -    "JSON", -    "NUMERIC", -    "SMALLINT", -    "TEXT", -    "TIME", -    "TIMESTAMP", -    "VARCHAR", -    "REAL", -    "Insert", -    "insert", -    "dialect", -) diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/__init__.cpython-311.pycBinary files differ deleted file mode 100644 index e4a9b51..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/__init__.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/aiosqlite.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/aiosqlite.cpython-311.pycBinary files differ deleted file mode 100644 index 41466a4..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/aiosqlite.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/base.cpython-311.pycBinary files differ deleted file mode 100644 index e7f5c22..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/base.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/dml.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/dml.cpython-311.pycBinary files differ deleted file mode 100644 index eb0f448..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/dml.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/json.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/json.cpython-311.pycBinary files differ deleted file mode 100644 index ad4323c..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/json.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/provision.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/provision.cpython-311.pycBinary files differ deleted file mode 100644 index d139ba3..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/provision.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlcipher.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlcipher.cpython-311.pycBinary files differ deleted file mode 100644 index d26e7b3..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlcipher.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlite.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlite.cpython-311.pycBinary files differ deleted file mode 100644 index df08288..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlite.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py deleted file mode 100644 index 6c91563..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py +++ /dev/null @@ -1,396 +0,0 @@ -# dialects/sqlite/aiosqlite.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - - -r""" - -.. dialect:: sqlite+aiosqlite -    :name: aiosqlite -    :dbapi: aiosqlite -    :connectstring: sqlite+aiosqlite:///file_path -    :url: https://pypi.org/project/aiosqlite/ - -The aiosqlite dialect provides support for the SQLAlchemy asyncio interface -running on top of pysqlite. - -aiosqlite is a wrapper around pysqlite that uses a background thread for -each connection.   It does not actually use non-blocking IO, as SQLite -databases are not socket-based.  However it does provide a working asyncio -interface that's useful for testing and prototyping purposes. - -Using a special asyncio mediation layer, the aiosqlite dialect is usable -as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` -extension package. - -This dialect should normally be used only with the -:func:`_asyncio.create_async_engine` engine creation function:: - -    from sqlalchemy.ext.asyncio import create_async_engine -    engine = create_async_engine("sqlite+aiosqlite:///filename") - -The URL passes through all arguments to the ``pysqlite`` driver, so all -connection arguments are the same as they are for that of :ref:`pysqlite`. - -.. _aiosqlite_udfs: - -User-Defined Functions ----------------------- - -aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs) -in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`. - -.. _aiosqlite_serializable: - -Serializable isolation / Savepoints / Transactional DDL (asyncio version) -------------------------------------------------------------------------- - -Similarly to pysqlite, aiosqlite does not support SAVEPOINT feature. - -The solution is similar to :ref:`pysqlite_serializable`. This is achieved by the event listeners in async:: - -    from sqlalchemy import create_engine, event -    from sqlalchemy.ext.asyncio import create_async_engine - -    engine = create_async_engine("sqlite+aiosqlite:///myfile.db") - -    @event.listens_for(engine.sync_engine, "connect") -    def do_connect(dbapi_connection, connection_record): -        # disable aiosqlite's emitting of the BEGIN statement entirely. -        # also stops it from emitting COMMIT before any DDL. -        dbapi_connection.isolation_level = None - -    @event.listens_for(engine.sync_engine, "begin") -    def do_begin(conn): -        # emit our own BEGIN -        conn.exec_driver_sql("BEGIN") - -.. warning:: When using the above recipe, it is advised to not use the -   :paramref:`.Connection.execution_options.isolation_level` setting on -   :class:`_engine.Connection` and :func:`_sa.create_engine` -   with the SQLite driver, -   as this function necessarily will also alter the ".isolation_level" setting. - -"""  # noqa - -import asyncio -from functools import partial - -from .base import SQLiteExecutionContext -from .pysqlite import SQLiteDialect_pysqlite -from ... import pool -from ... import util -from ...engine import AdaptedConnection -from ...util.concurrency import await_fallback -from ...util.concurrency import await_only - - -class AsyncAdapt_aiosqlite_cursor: -    # TODO: base on connectors/asyncio.py -    # see #10415 - -    __slots__ = ( -        "_adapt_connection", -        "_connection", -        "description", -        "await_", -        "_rows", -        "arraysize", -        "rowcount", -        "lastrowid", -    ) - -    server_side = False - -    def __init__(self, adapt_connection): -        self._adapt_connection = adapt_connection -        self._connection = adapt_connection._connection -        self.await_ = adapt_connection.await_ -        self.arraysize = 1 -        self.rowcount = -1 -        self.description = None -        self._rows = [] - -    def close(self): -        self._rows[:] = [] - -    def execute(self, operation, parameters=None): -        try: -            _cursor = self.await_(self._connection.cursor()) - -            if parameters is None: -                self.await_(_cursor.execute(operation)) -            else: -                self.await_(_cursor.execute(operation, parameters)) - -            if _cursor.description: -                self.description = _cursor.description -                self.lastrowid = self.rowcount = -1 - -                if not self.server_side: -                    self._rows = self.await_(_cursor.fetchall()) -            else: -                self.description = None -                self.lastrowid = _cursor.lastrowid -                self.rowcount = _cursor.rowcount - -            if not self.server_side: -                self.await_(_cursor.close()) -            else: -                self._cursor = _cursor -        except Exception as error: -            self._adapt_connection._handle_exception(error) - -    def executemany(self, operation, seq_of_parameters): -        try: -            _cursor = self.await_(self._connection.cursor()) -            self.await_(_cursor.executemany(operation, seq_of_parameters)) -            self.description = None -            self.lastrowid = _cursor.lastrowid -            self.rowcount = _cursor.rowcount -            self.await_(_cursor.close()) -        except Exception as error: -            self._adapt_connection._handle_exception(error) - -    def setinputsizes(self, *inputsizes): -        pass - -    def __iter__(self): -        while self._rows: -            yield self._rows.pop(0) - -    def fetchone(self): -        if self._rows: -            return self._rows.pop(0) -        else: -            return None - -    def fetchmany(self, size=None): -        if size is None: -            size = self.arraysize - -        retval = self._rows[0:size] -        self._rows[:] = self._rows[size:] -        return retval - -    def fetchall(self): -        retval = self._rows[:] -        self._rows[:] = [] -        return retval - - -class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor): -    # TODO: base on connectors/asyncio.py -    # see #10415 -    __slots__ = "_cursor" - -    server_side = True - -    def __init__(self, *arg, **kw): -        super().__init__(*arg, **kw) -        self._cursor = None - -    def close(self): -        if self._cursor is not None: -            self.await_(self._cursor.close()) -            self._cursor = None - -    def fetchone(self): -        return self.await_(self._cursor.fetchone()) - -    def fetchmany(self, size=None): -        if size is None: -            size = self.arraysize -        return self.await_(self._cursor.fetchmany(size=size)) - -    def fetchall(self): -        return self.await_(self._cursor.fetchall()) - - -class AsyncAdapt_aiosqlite_connection(AdaptedConnection): -    await_ = staticmethod(await_only) -    __slots__ = ("dbapi",) - -    def __init__(self, dbapi, connection): -        self.dbapi = dbapi -        self._connection = connection - -    @property -    def isolation_level(self): -        return self._connection.isolation_level - -    @isolation_level.setter -    def isolation_level(self, value): -        # aiosqlite's isolation_level setter works outside the Thread -        # that it's supposed to, necessitating setting check_same_thread=False. -        # for improved stability, we instead invent our own awaitable version -        # using aiosqlite's async queue directly. - -        def set_iso(connection, value): -            connection.isolation_level = value - -        function = partial(set_iso, self._connection._conn, value) -        future = asyncio.get_event_loop().create_future() - -        self._connection._tx.put_nowait((future, function)) - -        try: -            return self.await_(future) -        except Exception as error: -            self._handle_exception(error) - -    def create_function(self, *args, **kw): -        try: -            self.await_(self._connection.create_function(*args, **kw)) -        except Exception as error: -            self._handle_exception(error) - -    def cursor(self, server_side=False): -        if server_side: -            return AsyncAdapt_aiosqlite_ss_cursor(self) -        else: -            return AsyncAdapt_aiosqlite_cursor(self) - -    def execute(self, *args, **kw): -        return self.await_(self._connection.execute(*args, **kw)) - -    def rollback(self): -        try: -            self.await_(self._connection.rollback()) -        except Exception as error: -            self._handle_exception(error) - -    def commit(self): -        try: -            self.await_(self._connection.commit()) -        except Exception as error: -            self._handle_exception(error) - -    def close(self): -        try: -            self.await_(self._connection.close()) -        except ValueError: -            # this is undocumented for aiosqlite, that ValueError -            # was raised if .close() was called more than once, which is -            # both not customary for DBAPI and is also not a DBAPI.Error -            # exception. This is now fixed in aiosqlite via my PR -            # https://github.com/omnilib/aiosqlite/pull/238, so we can be -            # assured this will not become some other kind of exception, -            # since it doesn't raise anymore. - -            pass -        except Exception as error: -            self._handle_exception(error) - -    def _handle_exception(self, error): -        if ( -            isinstance(error, ValueError) -            and error.args[0] == "no active connection" -        ): -            raise self.dbapi.sqlite.OperationalError( -                "no active connection" -            ) from error -        else: -            raise error - - -class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection): -    __slots__ = () - -    await_ = staticmethod(await_fallback) - - -class AsyncAdapt_aiosqlite_dbapi: -    def __init__(self, aiosqlite, sqlite): -        self.aiosqlite = aiosqlite -        self.sqlite = sqlite -        self.paramstyle = "qmark" -        self._init_dbapi_attributes() - -    def _init_dbapi_attributes(self): -        for name in ( -            "DatabaseError", -            "Error", -            "IntegrityError", -            "NotSupportedError", -            "OperationalError", -            "ProgrammingError", -            "sqlite_version", -            "sqlite_version_info", -        ): -            setattr(self, name, getattr(self.aiosqlite, name)) - -        for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"): -            setattr(self, name, getattr(self.sqlite, name)) - -        for name in ("Binary",): -            setattr(self, name, getattr(self.sqlite, name)) - -    def connect(self, *arg, **kw): -        async_fallback = kw.pop("async_fallback", False) - -        creator_fn = kw.pop("async_creator_fn", None) -        if creator_fn: -            connection = creator_fn(*arg, **kw) -        else: -            connection = self.aiosqlite.connect(*arg, **kw) -            # it's a Thread.   you'll thank us later -            connection.daemon = True - -        if util.asbool(async_fallback): -            return AsyncAdaptFallback_aiosqlite_connection( -                self, -                await_fallback(connection), -            ) -        else: -            return AsyncAdapt_aiosqlite_connection( -                self, -                await_only(connection), -            ) - - -class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext): -    def create_server_side_cursor(self): -        return self._dbapi_connection.cursor(server_side=True) - - -class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite): -    driver = "aiosqlite" -    supports_statement_cache = True - -    is_async = True - -    supports_server_side_cursors = True - -    execution_ctx_cls = SQLiteExecutionContext_aiosqlite - -    @classmethod -    def import_dbapi(cls): -        return AsyncAdapt_aiosqlite_dbapi( -            __import__("aiosqlite"), __import__("sqlite3") -        ) - -    @classmethod -    def get_pool_class(cls, url): -        if cls._is_url_file_db(url): -            return pool.NullPool -        else: -            return pool.StaticPool - -    def is_disconnect(self, e, connection, cursor): -        if isinstance( -            e, self.dbapi.OperationalError -        ) and "no active connection" in str(e): -            return True - -        return super().is_disconnect(e, connection, cursor) - -    def get_driver_connection(self, connection): -        return connection._connection - - -dialect = SQLiteDialect_aiosqlite diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/base.py deleted file mode 100644 index 6db8214..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/base.py +++ /dev/null @@ -1,2782 +0,0 @@ -# dialects/sqlite/base.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - - -r""" -.. dialect:: sqlite -    :name: SQLite -    :full_support: 3.36.0 -    :normal_support: 3.12+ -    :best_effort: 3.7.16+ - -.. _sqlite_datetime: - -Date and Time Types -------------------- - -SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does -not provide out of the box functionality for translating values between Python -`datetime` objects and a SQLite-supported format. SQLAlchemy's own -:class:`~sqlalchemy.types.DateTime` and related types provide date formatting -and parsing functionality when SQLite is used. The implementation classes are -:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`. -These types represent dates and times as ISO formatted strings, which also -nicely support ordering. There's no reliance on typical "libc" internals for -these functions so historical dates are fully supported. - -Ensuring Text affinity -^^^^^^^^^^^^^^^^^^^^^^ - -The DDL rendered for these types is the standard ``DATE``, ``TIME`` -and ``DATETIME`` indicators.    However, custom storage formats can also be -applied to these types.   When the -storage format is detected as containing no alpha characters, the DDL for -these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``, -so that the column continues to have textual affinity. - -.. seealso:: - -    `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ - -    in the SQLite documentation - -.. _sqlite_autoincrement: - -SQLite Auto Incrementing Behavior ----------------------------------- - -Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html - -Key concepts: - -* SQLite has an implicit "auto increment" feature that takes place for any -  non-composite primary-key column that is specifically created using -  "INTEGER PRIMARY KEY" for the type + primary key. - -* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not** -  equivalent to the implicit autoincrement feature; this keyword is not -  recommended for general use.  SQLAlchemy does not render this keyword -  unless a special SQLite-specific directive is used (see below).  However, -  it still requires that the column's type is named "INTEGER". - -Using the AUTOINCREMENT Keyword -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -To specifically render the AUTOINCREMENT keyword on the primary key column -when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table -construct:: - -    Table('sometable', metadata, -            Column('id', Integer, primary_key=True), -            sqlite_autoincrement=True) - -Allowing autoincrement behavior SQLAlchemy types other than Integer/INTEGER -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -SQLite's typing model is based on naming conventions.  Among other things, this -means that any type name which contains the substring ``"INT"`` will be -determined to be of "integer affinity".  A type named ``"BIGINT"``, -``"SPECIAL_INT"`` or even ``"XYZINTQPR"``, will be considered by SQLite to be -of "integer" affinity.  However, **the SQLite autoincrement feature, whether -implicitly or explicitly enabled, requires that the name of the column's type -is exactly the string "INTEGER"**.  Therefore, if an application uses a type -like :class:`.BigInteger` for a primary key, on SQLite this type will need to -be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE -TABLE`` statement in order for the autoincrement behavior to be available. - -One approach to achieve this is to use :class:`.Integer` on SQLite -only using :meth:`.TypeEngine.with_variant`:: - -    table = Table( -        "my_table", metadata, -        Column("id", BigInteger().with_variant(Integer, "sqlite"), primary_key=True) -    ) - -Another is to use a subclass of :class:`.BigInteger` that overrides its DDL -name to be ``INTEGER`` when compiled against SQLite:: - -    from sqlalchemy import BigInteger -    from sqlalchemy.ext.compiler import compiles - -    class SLBigInteger(BigInteger): -        pass - -    @compiles(SLBigInteger, 'sqlite') -    def bi_c(element, compiler, **kw): -        return "INTEGER" - -    @compiles(SLBigInteger) -    def bi_c(element, compiler, **kw): -        return compiler.visit_BIGINT(element, **kw) - - -    table = Table( -        "my_table", metadata, -        Column("id", SLBigInteger(), primary_key=True) -    ) - -.. seealso:: - -    :meth:`.TypeEngine.with_variant` - -    :ref:`sqlalchemy.ext.compiler_toplevel` - -    `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_ - -.. _sqlite_concurrency: - -Database Locking Behavior / Concurrency ---------------------------------------- - -SQLite is not designed for a high level of write concurrency. The database -itself, being a file, is locked completely during write operations within -transactions, meaning exactly one "connection" (in reality a file handle) -has exclusive access to the database during this period - all other -"connections" will be blocked during this time. - -The Python DBAPI specification also calls for a connection model that is -always in a transaction; there is no ``connection.begin()`` method, -only ``connection.commit()`` and ``connection.rollback()``, upon which a -new transaction is to be begun immediately.  This may seem to imply -that the SQLite driver would in theory allow only a single filehandle on a -particular database file at any time; however, there are several -factors both within SQLite itself as well as within the pysqlite driver -which loosen this restriction significantly. - -However, no matter what locking modes are used, SQLite will still always -lock the database file once a transaction is started and DML (e.g. INSERT, -UPDATE, DELETE) has at least been emitted, and this will block -other transactions at least at the point that they also attempt to emit DML. -By default, the length of time on this block is very short before it times out -with an error. - -This behavior becomes more critical when used in conjunction with the -SQLAlchemy ORM.  SQLAlchemy's :class:`.Session` object by default runs -within a transaction, and with its autoflush model, may emit DML preceding -any SELECT statement.   This may lead to a SQLite database that locks -more quickly than is expected.   The locking mode of SQLite and the pysqlite -driver can be manipulated to some degree, however it should be noted that -achieving a high degree of write-concurrency with SQLite is a losing battle. - -For more information on SQLite's lack of write concurrency by design, please -see -`Situations Where Another RDBMS May Work Better - High Concurrency -<https://www.sqlite.org/whentouse.html>`_ near the bottom of the page. - -The following subsections introduce areas that are impacted by SQLite's -file-based architecture and additionally will usually require workarounds to -work when using the pysqlite driver. - -.. _sqlite_isolation_level: - -Transaction Isolation Level / Autocommit ----------------------------------------- - -SQLite supports "transaction isolation" in a non-standard way, along two -axes.  One is that of the -`PRAGMA read_uncommitted <https://www.sqlite.org/pragma.html#pragma_read_uncommitted>`_ -instruction.   This setting can essentially switch SQLite between its -default mode of ``SERIALIZABLE`` isolation, and a "dirty read" isolation -mode normally referred to as ``READ UNCOMMITTED``. - -SQLAlchemy ties into this PRAGMA statement using the -:paramref:`_sa.create_engine.isolation_level` parameter of -:func:`_sa.create_engine`. -Valid values for this parameter when used with SQLite are ``"SERIALIZABLE"`` -and ``"READ UNCOMMITTED"`` corresponding to a value of 0 and 1, respectively. -SQLite defaults to ``SERIALIZABLE``, however its behavior is impacted by -the pysqlite driver's default behavior. - -When using the pysqlite driver, the ``"AUTOCOMMIT"`` isolation level is also -available, which will alter the pysqlite connection using the ``.isolation_level`` -attribute on the DBAPI connection and set it to None for the duration -of the setting. - -.. versionadded:: 1.3.16 added support for SQLite AUTOCOMMIT isolation level -   when using the pysqlite / sqlite3 SQLite driver. - - -The other axis along which SQLite's transactional locking is impacted is -via the nature of the ``BEGIN`` statement used.   The three varieties -are "deferred", "immediate", and "exclusive", as described at -`BEGIN TRANSACTION <https://sqlite.org/lang_transaction.html>`_.   A straight -``BEGIN`` statement uses the "deferred" mode, where the database file is -not locked until the first read or write operation, and read access remains -open to other transactions until the first write operation.  But again, -it is critical to note that the pysqlite driver interferes with this behavior -by *not even emitting BEGIN* until the first write operation. - -.. warning:: - -    SQLite's transactional scope is impacted by unresolved -    issues in the pysqlite driver, which defers BEGIN statements to a greater -    degree than is often feasible. See the section :ref:`pysqlite_serializable` -    or :ref:`aiosqlite_serializable` for techniques to work around this behavior. - -.. seealso:: - -    :ref:`dbapi_autocommit` - -INSERT/UPDATE/DELETE...RETURNING ---------------------------------- - -The SQLite dialect supports SQLite 3.35's  ``INSERT|UPDATE|DELETE..RETURNING`` -syntax.   ``INSERT..RETURNING`` may be used -automatically in some cases in order to fetch newly generated identifiers in -place of the traditional approach of using ``cursor.lastrowid``, however -``cursor.lastrowid`` is currently still preferred for simple single-statement -cases for its better performance. - -To specify an explicit ``RETURNING`` clause, use the -:meth:`._UpdateBase.returning` method on a per-statement basis:: - -    # INSERT..RETURNING -    result = connection.execute( -        table.insert(). -        values(name='foo'). -        returning(table.c.col1, table.c.col2) -    ) -    print(result.all()) - -    # UPDATE..RETURNING -    result = connection.execute( -        table.update(). -        where(table.c.name=='foo'). -        values(name='bar'). -        returning(table.c.col1, table.c.col2) -    ) -    print(result.all()) - -    # DELETE..RETURNING -    result = connection.execute( -        table.delete(). -        where(table.c.name=='foo'). -        returning(table.c.col1, table.c.col2) -    ) -    print(result.all()) - -.. versionadded:: 2.0  Added support for SQLite RETURNING - -SAVEPOINT Support ----------------------------- - -SQLite supports SAVEPOINTs, which only function once a transaction is -begun.   SQLAlchemy's SAVEPOINT support is available using the -:meth:`_engine.Connection.begin_nested` method at the Core level, and -:meth:`.Session.begin_nested` at the ORM level.   However, SAVEPOINTs -won't work at all with pysqlite unless workarounds are taken. - -.. warning:: - -    SQLite's SAVEPOINT feature is impacted by unresolved -    issues in the pysqlite and aiosqlite drivers, which defer BEGIN statements -    to a greater degree than is often feasible. See the sections -    :ref:`pysqlite_serializable` and :ref:`aiosqlite_serializable` -    for techniques to work around this behavior. - -Transactional DDL ----------------------------- - -The SQLite database supports transactional :term:`DDL` as well. -In this case, the pysqlite driver is not only failing to start transactions, -it also is ending any existing transaction when DDL is detected, so again, -workarounds are required. - -.. warning:: - -    SQLite's transactional DDL is impacted by unresolved issues -    in the pysqlite driver, which fails to emit BEGIN and additionally -    forces a COMMIT to cancel any transaction when DDL is encountered. -    See the section :ref:`pysqlite_serializable` -    for techniques to work around this behavior. - -.. _sqlite_foreign_keys: - -Foreign Key Support -------------------- - -SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables, -however by default these constraints have no effect on the operation of the -table. - -Constraint checking on SQLite has three prerequisites: - -* At least version 3.6.19 of SQLite must be in use -* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY -  or SQLITE_OMIT_TRIGGER symbols enabled. -* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all -  connections before use -- including the initial call to -  :meth:`sqlalchemy.schema.MetaData.create_all`. - -SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for -new connections through the usage of events:: - -    from sqlalchemy.engine import Engine -    from sqlalchemy import event - -    @event.listens_for(Engine, "connect") -    def set_sqlite_pragma(dbapi_connection, connection_record): -        cursor = dbapi_connection.cursor() -        cursor.execute("PRAGMA foreign_keys=ON") -        cursor.close() - -.. warning:: - -    When SQLite foreign keys are enabled, it is **not possible** -    to emit CREATE or DROP statements for tables that contain -    mutually-dependent foreign key constraints; -    to emit the DDL for these tables requires that ALTER TABLE be used to -    create or drop these constraints separately, for which SQLite has -    no support. - -.. seealso:: - -    `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_ -    - on the SQLite web site. - -    :ref:`event_toplevel` - SQLAlchemy event API. - -    :ref:`use_alter` - more information on SQLAlchemy's facilities for handling -     mutually-dependent foreign key constraints. - -.. _sqlite_on_conflict_ddl: - -ON CONFLICT support for constraints ------------------------------------ - -.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT" for -   SQLite, which occurs within a CREATE TABLE statement.  For "ON CONFLICT" as -   applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`. - -SQLite supports a non-standard DDL clause known as ON CONFLICT which can be applied -to primary key, unique, check, and not null constraints.   In DDL, it is -rendered either within the "CONSTRAINT" clause or within the column definition -itself depending on the location of the target constraint.    To render this -clause within DDL, the extension parameter ``sqlite_on_conflict`` can be -specified with a string conflict resolution algorithm within the -:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`, -:class:`.CheckConstraint` objects.  Within the :class:`_schema.Column` object, -there -are individual parameters ``sqlite_on_conflict_not_null``, -``sqlite_on_conflict_primary_key``, ``sqlite_on_conflict_unique`` which each -correspond to the three types of relevant constraint types that can be -indicated from a :class:`_schema.Column` object. - -.. seealso:: - -    `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite -    documentation - -.. versionadded:: 1.3 - - -The ``sqlite_on_conflict`` parameters accept a  string argument which is just -the resolution name to be chosen, which on SQLite can be one of ROLLBACK, -ABORT, FAIL, IGNORE, and REPLACE.   For example, to add a UNIQUE constraint -that specifies the IGNORE algorithm:: - -    some_table = Table( -        'some_table', metadata, -        Column('id', Integer, primary_key=True), -        Column('data', Integer), -        UniqueConstraint('id', 'data', sqlite_on_conflict='IGNORE') -    ) - -The above renders CREATE TABLE DDL as:: - -    CREATE TABLE some_table ( -        id INTEGER NOT NULL, -        data INTEGER, -        PRIMARY KEY (id), -        UNIQUE (id, data) ON CONFLICT IGNORE -    ) - - -When using the :paramref:`_schema.Column.unique` -flag to add a UNIQUE constraint -to a single column, the ``sqlite_on_conflict_unique`` parameter can -be added to the :class:`_schema.Column` as well, which will be added to the -UNIQUE constraint in the DDL:: - -    some_table = Table( -        'some_table', metadata, -        Column('id', Integer, primary_key=True), -        Column('data', Integer, unique=True, -               sqlite_on_conflict_unique='IGNORE') -    ) - -rendering:: - -    CREATE TABLE some_table ( -        id INTEGER NOT NULL, -        data INTEGER, -        PRIMARY KEY (id), -        UNIQUE (data) ON CONFLICT IGNORE -    ) - -To apply the FAIL algorithm for a NOT NULL constraint, -``sqlite_on_conflict_not_null`` is used:: - -    some_table = Table( -        'some_table', metadata, -        Column('id', Integer, primary_key=True), -        Column('data', Integer, nullable=False, -               sqlite_on_conflict_not_null='FAIL') -    ) - -this renders the column inline ON CONFLICT phrase:: - -    CREATE TABLE some_table ( -        id INTEGER NOT NULL, -        data INTEGER NOT NULL ON CONFLICT FAIL, -        PRIMARY KEY (id) -    ) - - -Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``:: - -    some_table = Table( -        'some_table', metadata, -        Column('id', Integer, primary_key=True, -               sqlite_on_conflict_primary_key='FAIL') -    ) - -SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict -resolution algorithm is applied to the constraint itself:: - -    CREATE TABLE some_table ( -        id INTEGER NOT NULL, -        PRIMARY KEY (id) ON CONFLICT FAIL -    ) - -.. _sqlite_on_conflict_insert: - -INSERT...ON CONFLICT (Upsert) ------------------------------------ - -.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT" for -   SQLite, which occurs within an INSERT statement.  For "ON CONFLICT" as -   applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`. - -From version 3.24.0 onwards, SQLite supports "upserts" (update or insert) -of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` -statement. A candidate row will only be inserted if that row does not violate -any unique or primary key constraints. In the case of a unique constraint violation, a -secondary action can occur which can be either "DO UPDATE", indicating that -the data in the target row should be updated, or "DO NOTHING", which indicates -to silently skip this row. - -Conflicts are determined using columns that are part of existing unique -constraints and indexes.  These constraints are identified by stating the -columns and conditions that comprise the indexes. - -SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific -:func:`_sqlite.insert()` function, which provides -the generative methods :meth:`_sqlite.Insert.on_conflict_do_update` -and :meth:`_sqlite.Insert.on_conflict_do_nothing`: - -.. sourcecode:: pycon+sql - -    >>> from sqlalchemy.dialects.sqlite import insert - -    >>> insert_stmt = insert(my_table).values( -    ...     id='some_existing_id', -    ...     data='inserted value') - -    >>> do_update_stmt = insert_stmt.on_conflict_do_update( -    ...     index_elements=['id'], -    ...     set_=dict(data='updated value') -    ... ) - -    >>> print(do_update_stmt) -    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) -    ON CONFLICT (id) DO UPDATE SET data = ?{stop} - -    >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing( -    ...     index_elements=['id'] -    ... ) - -    >>> print(do_nothing_stmt) -    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) -    ON CONFLICT (id) DO NOTHING - -.. versionadded:: 1.4 - -.. seealso:: - -    `Upsert -    <https://sqlite.org/lang_UPSERT.html>`_ -    - in the SQLite documentation. - - -Specifying the Target -^^^^^^^^^^^^^^^^^^^^^ - -Both methods supply the "target" of the conflict using column inference: - -* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument -  specifies a sequence containing string column names, :class:`_schema.Column` -  objects, and/or SQL expression elements, which would identify a unique index -  or unique constraint. - -* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` -  to infer an index, a partial index can be inferred by also specifying the -  :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter: - -  .. sourcecode:: pycon+sql - -        >>> stmt = insert(my_table).values(user_email='a@b.com', data='inserted data') - -        >>> do_update_stmt = stmt.on_conflict_do_update( -        ...     index_elements=[my_table.c.user_email], -        ...     index_where=my_table.c.user_email.like('%@gmail.com'), -        ...     set_=dict(data=stmt.excluded.data) -        ...     ) - -        >>> print(do_update_stmt) -        {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?) -        ON CONFLICT (user_email) -        WHERE user_email LIKE '%@gmail.com' -        DO UPDATE SET data = excluded.data - -The SET Clause -^^^^^^^^^^^^^^^ - -``ON CONFLICT...DO UPDATE`` is used to perform an update of the already -existing row, using any combination of new values as well as values -from the proposed insertion. These values are specified using the -:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter.  This -parameter accepts a dictionary which consists of direct values -for UPDATE: - -.. sourcecode:: pycon+sql - -    >>> stmt = insert(my_table).values(id='some_id', data='inserted value') - -    >>> do_update_stmt = stmt.on_conflict_do_update( -    ...     index_elements=['id'], -    ...     set_=dict(data='updated value') -    ... ) - -    >>> print(do_update_stmt) -    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) -    ON CONFLICT (id) DO UPDATE SET data = ? - -.. warning:: - -    The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take -    into account Python-side default UPDATE values or generation functions, -    e.g. those specified using :paramref:`_schema.Column.onupdate`. These -    values will not be exercised for an ON CONFLICT style of UPDATE, unless -    they are manually specified in the -    :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary. - -Updating using the Excluded INSERT Values -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -In order to refer to the proposed insertion row, the special alias -:attr:`~.sqlite.Insert.excluded` is available as an attribute on -the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix -on a column, that informs the DO UPDATE to update the row with the value that -would have been inserted had the constraint not failed: - -.. sourcecode:: pycon+sql - -    >>> stmt = insert(my_table).values( -    ...     id='some_id', -    ...     data='inserted value', -    ...     author='jlh' -    ... ) - -    >>> do_update_stmt = stmt.on_conflict_do_update( -    ...     index_elements=['id'], -    ...     set_=dict(data='updated value', author=stmt.excluded.author) -    ... ) - -    >>> print(do_update_stmt) -    {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?) -    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author - -Additional WHERE Criteria -^^^^^^^^^^^^^^^^^^^^^^^^^ - -The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts -a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where` -parameter, which will limit those rows which receive an UPDATE: - -.. sourcecode:: pycon+sql - -    >>> stmt = insert(my_table).values( -    ...     id='some_id', -    ...     data='inserted value', -    ...     author='jlh' -    ... ) - -    >>> on_update_stmt = stmt.on_conflict_do_update( -    ...     index_elements=['id'], -    ...     set_=dict(data='updated value', author=stmt.excluded.author), -    ...     where=(my_table.c.status == 2) -    ... ) -    >>> print(on_update_stmt) -    {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?) -    ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author -    WHERE my_table.status = ? - - -Skipping Rows with DO NOTHING -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -``ON CONFLICT`` may be used to skip inserting a row entirely -if any conflict with a unique constraint occurs; below this is illustrated -using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method: - -.. sourcecode:: pycon+sql - -    >>> stmt = insert(my_table).values(id='some_id', data='inserted value') -    >>> stmt = stmt.on_conflict_do_nothing(index_elements=['id']) -    >>> print(stmt) -    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING - - -If ``DO NOTHING`` is used without specifying any columns or constraint, -it has the effect of skipping the INSERT for any unique violation which -occurs: - -.. sourcecode:: pycon+sql - -    >>> stmt = insert(my_table).values(id='some_id', data='inserted value') -    >>> stmt = stmt.on_conflict_do_nothing() -    >>> print(stmt) -    {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING - -.. _sqlite_type_reflection: - -Type Reflection ---------------- - -SQLite types are unlike those of most other database backends, in that -the string name of the type usually does not correspond to a "type" in a -one-to-one fashion.  Instead, SQLite links per-column typing behavior -to one of five so-called "type affinities" based on a string matching -pattern for the type. - -SQLAlchemy's reflection process, when inspecting types, uses a simple -lookup table to link the keywords returned to provided SQLAlchemy types. -This lookup table is present within the SQLite dialect as it is for all -other dialects.  However, the SQLite dialect has a different "fallback" -routine for when a particular type name is not located in the lookup map; -it instead implements the SQLite "type affinity" scheme located at -https://www.sqlite.org/datatype3.html section 2.1. - -The provided typemap will make direct associations from an exact string -name match for the following types: - -:class:`_types.BIGINT`, :class:`_types.BLOB`, -:class:`_types.BOOLEAN`, :class:`_types.BOOLEAN`, -:class:`_types.CHAR`, :class:`_types.DATE`, -:class:`_types.DATETIME`, :class:`_types.FLOAT`, -:class:`_types.DECIMAL`, :class:`_types.FLOAT`, -:class:`_types.INTEGER`, :class:`_types.INTEGER`, -:class:`_types.NUMERIC`, :class:`_types.REAL`, -:class:`_types.SMALLINT`, :class:`_types.TEXT`, -:class:`_types.TIME`, :class:`_types.TIMESTAMP`, -:class:`_types.VARCHAR`, :class:`_types.NVARCHAR`, -:class:`_types.NCHAR` - -When a type name does not match one of the above types, the "type affinity" -lookup is used instead: - -* :class:`_types.INTEGER` is returned if the type name includes the -  string ``INT`` -* :class:`_types.TEXT` is returned if the type name includes the -  string ``CHAR``, ``CLOB`` or ``TEXT`` -* :class:`_types.NullType` is returned if the type name includes the -  string ``BLOB`` -* :class:`_types.REAL` is returned if the type name includes the string -  ``REAL``, ``FLOA`` or ``DOUB``. -* Otherwise, the :class:`_types.NUMERIC` type is used. - -.. _sqlite_partial_index: - -Partial Indexes ---------------- - -A partial index, e.g. one which uses a WHERE clause, can be specified -with the DDL system using the argument ``sqlite_where``:: - -    tbl = Table('testtbl', m, Column('data', Integer)) -    idx = Index('test_idx1', tbl.c.data, -                sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10)) - -The index will be rendered at create time as:: - -    CREATE INDEX test_idx1 ON testtbl (data) -    WHERE data > 5 AND data < 10 - -.. _sqlite_dotted_column_names: - -Dotted Column Names -------------------- - -Using table or column names that explicitly have periods in them is -**not recommended**.   While this is generally a bad idea for relational -databases in general, as the dot is a syntactically significant character, -the SQLite driver up until version **3.10.0** of SQLite has a bug which -requires that SQLAlchemy filter out these dots in result sets. - -The bug, entirely outside of SQLAlchemy, can be illustrated thusly:: - -    import sqlite3 - -    assert sqlite3.sqlite_version_info < (3, 10, 0), "bug is fixed in this version" - -    conn = sqlite3.connect(":memory:") -    cursor = conn.cursor() - -    cursor.execute("create table x (a integer, b integer)") -    cursor.execute("insert into x (a, b) values (1, 1)") -    cursor.execute("insert into x (a, b) values (2, 2)") - -    cursor.execute("select x.a, x.b from x") -    assert [c[0] for c in cursor.description] == ['a', 'b'] - -    cursor.execute(''' -        select x.a, x.b from x where a=1 -        union -        select x.a, x.b from x where a=2 -    ''') -    assert [c[0] for c in cursor.description] == ['a', 'b'], \ -        [c[0] for c in cursor.description] - -The second assertion fails:: - -    Traceback (most recent call last): -      File "test.py", line 19, in <module> -        [c[0] for c in cursor.description] -    AssertionError: ['x.a', 'x.b'] - -Where above, the driver incorrectly reports the names of the columns -including the name of the table, which is entirely inconsistent vs. -when the UNION is not present. - -SQLAlchemy relies upon column names being predictable in how they match -to the original statement, so the SQLAlchemy dialect has no choice but -to filter these out:: - - -    from sqlalchemy import create_engine - -    eng = create_engine("sqlite://") -    conn = eng.connect() - -    conn.exec_driver_sql("create table x (a integer, b integer)") -    conn.exec_driver_sql("insert into x (a, b) values (1, 1)") -    conn.exec_driver_sql("insert into x (a, b) values (2, 2)") - -    result = conn.exec_driver_sql("select x.a, x.b from x") -    assert result.keys() == ["a", "b"] - -    result = conn.exec_driver_sql(''' -        select x.a, x.b from x where a=1 -        union -        select x.a, x.b from x where a=2 -    ''') -    assert result.keys() == ["a", "b"] - -Note that above, even though SQLAlchemy filters out the dots, *both -names are still addressable*:: - -    >>> row = result.first() -    >>> row["a"] -    1 -    >>> row["x.a"] -    1 -    >>> row["b"] -    1 -    >>> row["x.b"] -    1 - -Therefore, the workaround applied by SQLAlchemy only impacts -:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In -the very specific case where an application is forced to use column names that -contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and -:meth:`.Row.keys()` is required to return these dotted names unmodified, -the ``sqlite_raw_colnames`` execution option may be provided, either on a -per-:class:`_engine.Connection` basis:: - -    result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql(''' -        select x.a, x.b from x where a=1 -        union -        select x.a, x.b from x where a=2 -    ''') -    assert result.keys() == ["x.a", "x.b"] - -or on a per-:class:`_engine.Engine` basis:: - -    engine = create_engine("sqlite://", execution_options={"sqlite_raw_colnames": True}) - -When using the per-:class:`_engine.Engine` execution option, note that -**Core and ORM queries that use UNION may not function properly**. - -SQLite-specific table options ------------------------------ - -One option for CREATE TABLE is supported directly by the SQLite -dialect in conjunction with the :class:`_schema.Table` construct: - -* ``WITHOUT ROWID``:: - -    Table("some_table", metadata, ..., sqlite_with_rowid=False) - -.. seealso:: - -    `SQLite CREATE TABLE options -    <https://www.sqlite.org/lang_createtable.html>`_ - - -.. _sqlite_include_internal: - -Reflecting internal schema tables ----------------------------------- - -Reflection methods that return lists of tables will omit so-called -"SQLite internal schema object" names, which are considered by SQLite -as any object name that is prefixed with ``sqlite_``.  An example of -such an object is the ``sqlite_sequence`` table that's generated when -the ``AUTOINCREMENT`` column parameter is used.   In order to return -these objects, the parameter ``sqlite_include_internal=True`` may be -passed to methods such as :meth:`_schema.MetaData.reflect` or -:meth:`.Inspector.get_table_names`. - -.. versionadded:: 2.0  Added the ``sqlite_include_internal=True`` parameter. -   Previously, these tables were not ignored by SQLAlchemy reflection -   methods. - -.. note:: - -    The ``sqlite_include_internal`` parameter does not refer to the -    "system" tables that are present in schemas such as ``sqlite_master``. - -.. seealso:: - -    `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite -    documentation. - -"""  # noqa -from __future__ import annotations - -import datetime -import numbers -import re -from typing import Optional - -from .json import JSON -from .json import JSONIndexType -from .json import JSONPathType -from ... import exc -from ... import schema as sa_schema -from ... import sql -from ... import text -from ... import types as sqltypes -from ... import util -from ...engine import default -from ...engine import processors -from ...engine import reflection -from ...engine.reflection import ReflectionDefaults -from ...sql import coercions -from ...sql import ColumnElement -from ...sql import compiler -from ...sql import elements -from ...sql import roles -from ...sql import schema -from ...types import BLOB  # noqa -from ...types import BOOLEAN  # noqa -from ...types import CHAR  # noqa -from ...types import DECIMAL  # noqa -from ...types import FLOAT  # noqa -from ...types import INTEGER  # noqa -from ...types import NUMERIC  # noqa -from ...types import REAL  # noqa -from ...types import SMALLINT  # noqa -from ...types import TEXT  # noqa -from ...types import TIMESTAMP  # noqa -from ...types import VARCHAR  # noqa - - -class _SQliteJson(JSON): -    def result_processor(self, dialect, coltype): -        default_processor = super().result_processor(dialect, coltype) - -        def process(value): -            try: -                return default_processor(value) -            except TypeError: -                if isinstance(value, numbers.Number): -                    return value -                else: -                    raise - -        return process - - -class _DateTimeMixin: -    _reg = None -    _storage_format = None - -    def __init__(self, storage_format=None, regexp=None, **kw): -        super().__init__(**kw) -        if regexp is not None: -            self._reg = re.compile(regexp) -        if storage_format is not None: -            self._storage_format = storage_format - -    @property -    def format_is_text_affinity(self): -        """return True if the storage format will automatically imply -        a TEXT affinity. - -        If the storage format contains no non-numeric characters, -        it will imply a NUMERIC storage format on SQLite; in this case, -        the type will generate its DDL as DATE_CHAR, DATETIME_CHAR, -        TIME_CHAR. - -        """ -        spec = self._storage_format % { -            "year": 0, -            "month": 0, -            "day": 0, -            "hour": 0, -            "minute": 0, -            "second": 0, -            "microsecond": 0, -        } -        return bool(re.search(r"[^0-9]", spec)) - -    def adapt(self, cls, **kw): -        if issubclass(cls, _DateTimeMixin): -            if self._storage_format: -                kw["storage_format"] = self._storage_format -            if self._reg: -                kw["regexp"] = self._reg -        return super().adapt(cls, **kw) - -    def literal_processor(self, dialect): -        bp = self.bind_processor(dialect) - -        def process(value): -            return "'%s'" % bp(value) - -        return process - - -class DATETIME(_DateTimeMixin, sqltypes.DateTime): -    r"""Represent a Python datetime object in SQLite using a string. - -    The default string storage format is:: - -        "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" - -    e.g.:: - -        2021-03-15 12:05:57.105542 - -    The incoming storage format is by default parsed using the -    Python ``datetime.fromisoformat()`` function. - -    .. versionchanged:: 2.0  ``datetime.fromisoformat()`` is used for default -       datetime string parsing. - -    The storage format can be customized to some degree using the -    ``storage_format`` and ``regexp`` parameters, such as:: - -        import re -        from sqlalchemy.dialects.sqlite import DATETIME - -        dt = DATETIME(storage_format="%(year)04d/%(month)02d/%(day)02d " -                                     "%(hour)02d:%(minute)02d:%(second)02d", -                      regexp=r"(\d+)/(\d+)/(\d+) (\d+)-(\d+)-(\d+)" -        ) - -    :param storage_format: format string which will be applied to the dict -     with keys year, month, day, hour, minute, second, and microsecond. - -    :param regexp: regular expression which will be applied to incoming result -     rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming -     strings. If the regexp contains named groups, the resulting match dict is -     applied to the Python datetime() constructor as keyword arguments. -     Otherwise, if positional groups are used, the datetime() constructor -     is called with positional arguments via -     ``*map(int, match_obj.groups(0))``. - -    """  # noqa - -    _storage_format = ( -        "%(year)04d-%(month)02d-%(day)02d " -        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" -    ) - -    def __init__(self, *args, **kwargs): -        truncate_microseconds = kwargs.pop("truncate_microseconds", False) -        super().__init__(*args, **kwargs) -        if truncate_microseconds: -            assert "storage_format" not in kwargs, ( -                "You can specify only " -                "one of truncate_microseconds or storage_format." -            ) -            assert "regexp" not in kwargs, ( -                "You can specify only one of " -                "truncate_microseconds or regexp." -            ) -            self._storage_format = ( -                "%(year)04d-%(month)02d-%(day)02d " -                "%(hour)02d:%(minute)02d:%(second)02d" -            ) - -    def bind_processor(self, dialect): -        datetime_datetime = datetime.datetime -        datetime_date = datetime.date -        format_ = self._storage_format - -        def process(value): -            if value is None: -                return None -            elif isinstance(value, datetime_datetime): -                return format_ % { -                    "year": value.year, -                    "month": value.month, -                    "day": value.day, -                    "hour": value.hour, -                    "minute": value.minute, -                    "second": value.second, -                    "microsecond": value.microsecond, -                } -            elif isinstance(value, datetime_date): -                return format_ % { -                    "year": value.year, -                    "month": value.month, -                    "day": value.day, -                    "hour": 0, -                    "minute": 0, -                    "second": 0, -                    "microsecond": 0, -                } -            else: -                raise TypeError( -                    "SQLite DateTime type only accepts Python " -                    "datetime and date objects as input." -                ) - -        return process - -    def result_processor(self, dialect, coltype): -        if self._reg: -            return processors.str_to_datetime_processor_factory( -                self._reg, datetime.datetime -            ) -        else: -            return processors.str_to_datetime - - -class DATE(_DateTimeMixin, sqltypes.Date): -    r"""Represent a Python date object in SQLite using a string. - -    The default string storage format is:: - -        "%(year)04d-%(month)02d-%(day)02d" - -    e.g.:: - -        2011-03-15 - -    The incoming storage format is by default parsed using the -    Python ``date.fromisoformat()`` function. - -    .. versionchanged:: 2.0  ``date.fromisoformat()`` is used for default -       date string parsing. - - -    The storage format can be customized to some degree using the -    ``storage_format`` and ``regexp`` parameters, such as:: - -        import re -        from sqlalchemy.dialects.sqlite import DATE - -        d = DATE( -                storage_format="%(month)02d/%(day)02d/%(year)04d", -                regexp=re.compile("(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)") -            ) - -    :param storage_format: format string which will be applied to the -     dict with keys year, month, and day. - -    :param regexp: regular expression which will be applied to -     incoming result rows, replacing the use of ``date.fromisoformat()`` to -     parse incoming strings. If the regexp contains named groups, the resulting -     match dict is applied to the Python date() constructor as keyword -     arguments. Otherwise, if positional groups are used, the date() -     constructor is called with positional arguments via -     ``*map(int, match_obj.groups(0))``. - -    """ - -    _storage_format = "%(year)04d-%(month)02d-%(day)02d" - -    def bind_processor(self, dialect): -        datetime_date = datetime.date -        format_ = self._storage_format - -        def process(value): -            if value is None: -                return None -            elif isinstance(value, datetime_date): -                return format_ % { -                    "year": value.year, -                    "month": value.month, -                    "day": value.day, -                } -            else: -                raise TypeError( -                    "SQLite Date type only accepts Python " -                    "date objects as input." -                ) - -        return process - -    def result_processor(self, dialect, coltype): -        if self._reg: -            return processors.str_to_datetime_processor_factory( -                self._reg, datetime.date -            ) -        else: -            return processors.str_to_date - - -class TIME(_DateTimeMixin, sqltypes.Time): -    r"""Represent a Python time object in SQLite using a string. - -    The default string storage format is:: - -        "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" - -    e.g.:: - -        12:05:57.10558 - -    The incoming storage format is by default parsed using the -    Python ``time.fromisoformat()`` function. - -    .. versionchanged:: 2.0  ``time.fromisoformat()`` is used for default -       time string parsing. - -    The storage format can be customized to some degree using the -    ``storage_format`` and ``regexp`` parameters, such as:: - -        import re -        from sqlalchemy.dialects.sqlite import TIME - -        t = TIME(storage_format="%(hour)02d-%(minute)02d-" -                                "%(second)02d-%(microsecond)06d", -                 regexp=re.compile("(\d+)-(\d+)-(\d+)-(?:-(\d+))?") -        ) - -    :param storage_format: format string which will be applied to the dict -     with keys hour, minute, second, and microsecond. - -    :param regexp: regular expression which will be applied to incoming result -     rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming -     strings. If the regexp contains named groups, the resulting match dict is -     applied to the Python time() constructor as keyword arguments. Otherwise, -     if positional groups are used, the time() constructor is called with -     positional arguments via ``*map(int, match_obj.groups(0))``. - -    """ - -    _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" - -    def __init__(self, *args, **kwargs): -        truncate_microseconds = kwargs.pop("truncate_microseconds", False) -        super().__init__(*args, **kwargs) -        if truncate_microseconds: -            assert "storage_format" not in kwargs, ( -                "You can specify only " -                "one of truncate_microseconds or storage_format." -            ) -            assert "regexp" not in kwargs, ( -                "You can specify only one of " -                "truncate_microseconds or regexp." -            ) -            self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d" - -    def bind_processor(self, dialect): -        datetime_time = datetime.time -        format_ = self._storage_format - -        def process(value): -            if value is None: -                return None -            elif isinstance(value, datetime_time): -                return format_ % { -                    "hour": value.hour, -                    "minute": value.minute, -                    "second": value.second, -                    "microsecond": value.microsecond, -                } -            else: -                raise TypeError( -                    "SQLite Time type only accepts Python " -                    "time objects as input." -                ) - -        return process - -    def result_processor(self, dialect, coltype): -        if self._reg: -            return processors.str_to_datetime_processor_factory( -                self._reg, datetime.time -            ) -        else: -            return processors.str_to_time - - -colspecs = { -    sqltypes.Date: DATE, -    sqltypes.DateTime: DATETIME, -    sqltypes.JSON: _SQliteJson, -    sqltypes.JSON.JSONIndexType: JSONIndexType, -    sqltypes.JSON.JSONPathType: JSONPathType, -    sqltypes.Time: TIME, -} - -ischema_names = { -    "BIGINT": sqltypes.BIGINT, -    "BLOB": sqltypes.BLOB, -    "BOOL": sqltypes.BOOLEAN, -    "BOOLEAN": sqltypes.BOOLEAN, -    "CHAR": sqltypes.CHAR, -    "DATE": sqltypes.DATE, -    "DATE_CHAR": sqltypes.DATE, -    "DATETIME": sqltypes.DATETIME, -    "DATETIME_CHAR": sqltypes.DATETIME, -    "DOUBLE": sqltypes.DOUBLE, -    "DECIMAL": sqltypes.DECIMAL, -    "FLOAT": sqltypes.FLOAT, -    "INT": sqltypes.INTEGER, -    "INTEGER": sqltypes.INTEGER, -    "JSON": JSON, -    "NUMERIC": sqltypes.NUMERIC, -    "REAL": sqltypes.REAL, -    "SMALLINT": sqltypes.SMALLINT, -    "TEXT": sqltypes.TEXT, -    "TIME": sqltypes.TIME, -    "TIME_CHAR": sqltypes.TIME, -    "TIMESTAMP": sqltypes.TIMESTAMP, -    "VARCHAR": sqltypes.VARCHAR, -    "NVARCHAR": sqltypes.NVARCHAR, -    "NCHAR": sqltypes.NCHAR, -} - - -class SQLiteCompiler(compiler.SQLCompiler): -    extract_map = util.update_copy( -        compiler.SQLCompiler.extract_map, -        { -            "month": "%m", -            "day": "%d", -            "year": "%Y", -            "second": "%S", -            "hour": "%H", -            "doy": "%j", -            "minute": "%M", -            "epoch": "%s", -            "dow": "%w", -            "week": "%W", -        }, -    ) - -    def visit_truediv_binary(self, binary, operator, **kw): -        return ( -            self.process(binary.left, **kw) -            + " / " -            + "(%s + 0.0)" % self.process(binary.right, **kw) -        ) - -    def visit_now_func(self, fn, **kw): -        return "CURRENT_TIMESTAMP" - -    def visit_localtimestamp_func(self, func, **kw): -        return 'DATETIME(CURRENT_TIMESTAMP, "localtime")' - -    def visit_true(self, expr, **kw): -        return "1" - -    def visit_false(self, expr, **kw): -        return "0" - -    def visit_char_length_func(self, fn, **kw): -        return "length%s" % self.function_argspec(fn) - -    def visit_aggregate_strings_func(self, fn, **kw): -        return "group_concat%s" % self.function_argspec(fn) - -    def visit_cast(self, cast, **kwargs): -        if self.dialect.supports_cast: -            return super().visit_cast(cast, **kwargs) -        else: -            return self.process(cast.clause, **kwargs) - -    def visit_extract(self, extract, **kw): -        try: -            return "CAST(STRFTIME('%s', %s) AS INTEGER)" % ( -                self.extract_map[extract.field], -                self.process(extract.expr, **kw), -            ) -        except KeyError as err: -            raise exc.CompileError( -                "%s is not a valid extract argument." % extract.field -            ) from err - -    def returning_clause( -        self, -        stmt, -        returning_cols, -        *, -        populate_result_map, -        **kw, -    ): -        kw["include_table"] = False -        return super().returning_clause( -            stmt, returning_cols, populate_result_map=populate_result_map, **kw -        ) - -    def limit_clause(self, select, **kw): -        text = "" -        if select._limit_clause is not None: -            text += "\n LIMIT " + self.process(select._limit_clause, **kw) -        if select._offset_clause is not None: -            if select._limit_clause is None: -                text += "\n LIMIT " + self.process(sql.literal(-1)) -            text += " OFFSET " + self.process(select._offset_clause, **kw) -        else: -            text += " OFFSET " + self.process(sql.literal(0), **kw) -        return text - -    def for_update_clause(self, select, **kw): -        # sqlite has no "FOR UPDATE" AFAICT -        return "" - -    def update_from_clause( -        self, update_stmt, from_table, extra_froms, from_hints, **kw -    ): -        kw["asfrom"] = True -        return "FROM " + ", ".join( -            t._compiler_dispatch(self, fromhints=from_hints, **kw) -            for t in extra_froms -        ) - -    def visit_is_distinct_from_binary(self, binary, operator, **kw): -        return "%s IS NOT %s" % ( -            self.process(binary.left), -            self.process(binary.right), -        ) - -    def visit_is_not_distinct_from_binary(self, binary, operator, **kw): -        return "%s IS %s" % ( -            self.process(binary.left), -            self.process(binary.right), -        ) - -    def visit_json_getitem_op_binary(self, binary, operator, **kw): -        if binary.type._type_affinity is sqltypes.JSON: -            expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))" -        else: -            expr = "JSON_EXTRACT(%s, %s)" - -        return expr % ( -            self.process(binary.left, **kw), -            self.process(binary.right, **kw), -        ) - -    def visit_json_path_getitem_op_binary(self, binary, operator, **kw): -        if binary.type._type_affinity is sqltypes.JSON: -            expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))" -        else: -            expr = "JSON_EXTRACT(%s, %s)" - -        return expr % ( -            self.process(binary.left, **kw), -            self.process(binary.right, **kw), -        ) - -    def visit_empty_set_op_expr(self, type_, expand_op, **kw): -        # slightly old SQLite versions don't seem to be able to handle -        # the empty set impl -        return self.visit_empty_set_expr(type_) - -    def visit_empty_set_expr(self, element_types, **kw): -        return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % ( -            ", ".join("1" for type_ in element_types or [INTEGER()]), -            ", ".join("1" for type_ in element_types or [INTEGER()]), -        ) - -    def visit_regexp_match_op_binary(self, binary, operator, **kw): -        return self._generate_generic_binary(binary, " REGEXP ", **kw) - -    def visit_not_regexp_match_op_binary(self, binary, operator, **kw): -        return self._generate_generic_binary(binary, " NOT REGEXP ", **kw) - -    def _on_conflict_target(self, clause, **kw): -        if clause.constraint_target is not None: -            target_text = "(%s)" % clause.constraint_target -        elif clause.inferred_target_elements is not None: -            target_text = "(%s)" % ", ".join( -                ( -                    self.preparer.quote(c) -                    if isinstance(c, str) -                    else self.process(c, include_table=False, use_schema=False) -                ) -                for c in clause.inferred_target_elements -            ) -            if clause.inferred_target_whereclause is not None: -                target_text += " WHERE %s" % self.process( -                    clause.inferred_target_whereclause, -                    include_table=False, -                    use_schema=False, -                    literal_binds=True, -                ) - -        else: -            target_text = "" - -        return target_text - -    def visit_on_conflict_do_nothing(self, on_conflict, **kw): -        target_text = self._on_conflict_target(on_conflict, **kw) - -        if target_text: -            return "ON CONFLICT %s DO NOTHING" % target_text -        else: -            return "ON CONFLICT DO NOTHING" - -    def visit_on_conflict_do_update(self, on_conflict, **kw): -        clause = on_conflict - -        target_text = self._on_conflict_target(on_conflict, **kw) - -        action_set_ops = [] - -        set_parameters = dict(clause.update_values_to_set) -        # create a list of column assignment clauses as tuples - -        insert_statement = self.stack[-1]["selectable"] -        cols = insert_statement.table.c -        for c in cols: -            col_key = c.key - -            if col_key in set_parameters: -                value = set_parameters.pop(col_key) -            elif c in set_parameters: -                value = set_parameters.pop(c) -            else: -                continue - -            if coercions._is_literal(value): -                value = elements.BindParameter(None, value, type_=c.type) - -            else: -                if ( -                    isinstance(value, elements.BindParameter) -                    and value.type._isnull -                ): -                    value = value._clone() -                    value.type = c.type -            value_text = self.process(value.self_group(), use_schema=False) - -            key_text = self.preparer.quote(c.name) -            action_set_ops.append("%s = %s" % (key_text, value_text)) - -        # check for names that don't match columns -        if set_parameters: -            util.warn( -                "Additional column names not matching " -                "any column keys in table '%s': %s" -                % ( -                    self.current_executable.table.name, -                    (", ".join("'%s'" % c for c in set_parameters)), -                ) -            ) -            for k, v in set_parameters.items(): -                key_text = ( -                    self.preparer.quote(k) -                    if isinstance(k, str) -                    else self.process(k, use_schema=False) -                ) -                value_text = self.process( -                    coercions.expect(roles.ExpressionElementRole, v), -                    use_schema=False, -                ) -                action_set_ops.append("%s = %s" % (key_text, value_text)) - -        action_text = ", ".join(action_set_ops) -        if clause.update_whereclause is not None: -            action_text += " WHERE %s" % self.process( -                clause.update_whereclause, include_table=True, use_schema=False -            ) - -        return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) - - -class SQLiteDDLCompiler(compiler.DDLCompiler): -    def get_column_specification(self, column, **kwargs): -        coltype = self.dialect.type_compiler_instance.process( -            column.type, type_expression=column -        ) -        colspec = self.preparer.format_column(column) + " " + coltype -        default = self.get_column_default_string(column) -        if default is not None: -            if isinstance(column.server_default.arg, ColumnElement): -                default = "(" + default + ")" -            colspec += " DEFAULT " + default - -        if not column.nullable: -            colspec += " NOT NULL" - -            on_conflict_clause = column.dialect_options["sqlite"][ -                "on_conflict_not_null" -            ] -            if on_conflict_clause is not None: -                colspec += " ON CONFLICT " + on_conflict_clause - -        if column.primary_key: -            if ( -                column.autoincrement is True -                and len(column.table.primary_key.columns) != 1 -            ): -                raise exc.CompileError( -                    "SQLite does not support autoincrement for " -                    "composite primary keys" -                ) - -            if ( -                column.table.dialect_options["sqlite"]["autoincrement"] -                and len(column.table.primary_key.columns) == 1 -                and issubclass(column.type._type_affinity, sqltypes.Integer) -                and not column.foreign_keys -            ): -                colspec += " PRIMARY KEY" - -                on_conflict_clause = column.dialect_options["sqlite"][ -                    "on_conflict_primary_key" -                ] -                if on_conflict_clause is not None: -                    colspec += " ON CONFLICT " + on_conflict_clause - -                colspec += " AUTOINCREMENT" - -        if column.computed is not None: -            colspec += " " + self.process(column.computed) - -        return colspec - -    def visit_primary_key_constraint(self, constraint, **kw): -        # for columns with sqlite_autoincrement=True, -        # the PRIMARY KEY constraint can only be inline -        # with the column itself. -        if len(constraint.columns) == 1: -            c = list(constraint)[0] -            if ( -                c.primary_key -                and c.table.dialect_options["sqlite"]["autoincrement"] -                and issubclass(c.type._type_affinity, sqltypes.Integer) -                and not c.foreign_keys -            ): -                return None - -        text = super().visit_primary_key_constraint(constraint) - -        on_conflict_clause = constraint.dialect_options["sqlite"][ -            "on_conflict" -        ] -        if on_conflict_clause is None and len(constraint.columns) == 1: -            on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][ -                "on_conflict_primary_key" -            ] - -        if on_conflict_clause is not None: -            text += " ON CONFLICT " + on_conflict_clause - -        return text - -    def visit_unique_constraint(self, constraint, **kw): -        text = super().visit_unique_constraint(constraint) - -        on_conflict_clause = constraint.dialect_options["sqlite"][ -            "on_conflict" -        ] -        if on_conflict_clause is None and len(constraint.columns) == 1: -            col1 = list(constraint)[0] -            if isinstance(col1, schema.SchemaItem): -                on_conflict_clause = list(constraint)[0].dialect_options[ -                    "sqlite" -                ]["on_conflict_unique"] - -        if on_conflict_clause is not None: -            text += " ON CONFLICT " + on_conflict_clause - -        return text - -    def visit_check_constraint(self, constraint, **kw): -        text = super().visit_check_constraint(constraint) - -        on_conflict_clause = constraint.dialect_options["sqlite"][ -            "on_conflict" -        ] - -        if on_conflict_clause is not None: -            text += " ON CONFLICT " + on_conflict_clause - -        return text - -    def visit_column_check_constraint(self, constraint, **kw): -        text = super().visit_column_check_constraint(constraint) - -        if constraint.dialect_options["sqlite"]["on_conflict"] is not None: -            raise exc.CompileError( -                "SQLite does not support on conflict clause for " -                "column check constraint" -            ) - -        return text - -    def visit_foreign_key_constraint(self, constraint, **kw): -        local_table = constraint.elements[0].parent.table -        remote_table = constraint.elements[0].column.table - -        if local_table.schema != remote_table.schema: -            return None -        else: -            return super().visit_foreign_key_constraint(constraint) - -    def define_constraint_remote_table(self, constraint, table, preparer): -        """Format the remote table clause of a CREATE CONSTRAINT clause.""" - -        return preparer.format_table(table, use_schema=False) - -    def visit_create_index( -        self, create, include_schema=False, include_table_schema=True, **kw -    ): -        index = create.element -        self._verify_index_table(index) -        preparer = self.preparer -        text = "CREATE " -        if index.unique: -            text += "UNIQUE " - -        text += "INDEX " - -        if create.if_not_exists: -            text += "IF NOT EXISTS " - -        text += "%s ON %s (%s)" % ( -            self._prepared_index_name(index, include_schema=True), -            preparer.format_table(index.table, use_schema=False), -            ", ".join( -                self.sql_compiler.process( -                    expr, include_table=False, literal_binds=True -                ) -                for expr in index.expressions -            ), -        ) - -        whereclause = index.dialect_options["sqlite"]["where"] -        if whereclause is not None: -            where_compiled = self.sql_compiler.process( -                whereclause, include_table=False, literal_binds=True -            ) -            text += " WHERE " + where_compiled - -        return text - -    def post_create_table(self, table): -        if table.dialect_options["sqlite"]["with_rowid"] is False: -            return "\n WITHOUT ROWID" -        return "" - - -class SQLiteTypeCompiler(compiler.GenericTypeCompiler): -    def visit_large_binary(self, type_, **kw): -        return self.visit_BLOB(type_) - -    def visit_DATETIME(self, type_, **kw): -        if ( -            not isinstance(type_, _DateTimeMixin) -            or type_.format_is_text_affinity -        ): -            return super().visit_DATETIME(type_) -        else: -            return "DATETIME_CHAR" - -    def visit_DATE(self, type_, **kw): -        if ( -            not isinstance(type_, _DateTimeMixin) -            or type_.format_is_text_affinity -        ): -            return super().visit_DATE(type_) -        else: -            return "DATE_CHAR" - -    def visit_TIME(self, type_, **kw): -        if ( -            not isinstance(type_, _DateTimeMixin) -            or type_.format_is_text_affinity -        ): -            return super().visit_TIME(type_) -        else: -            return "TIME_CHAR" - -    def visit_JSON(self, type_, **kw): -        # note this name provides NUMERIC affinity, not TEXT. -        # should not be an issue unless the JSON value consists of a single -        # numeric value.   JSONTEXT can be used if this case is required. -        return "JSON" - - -class SQLiteIdentifierPreparer(compiler.IdentifierPreparer): -    reserved_words = { -        "add", -        "after", -        "all", -        "alter", -        "analyze", -        "and", -        "as", -        "asc", -        "attach", -        "autoincrement", -        "before", -        "begin", -        "between", -        "by", -        "cascade", -        "case", -        "cast", -        "check", -        "collate", -        "column", -        "commit", -        "conflict", -        "constraint", -        "create", -        "cross", -        "current_date", -        "current_time", -        "current_timestamp", -        "database", -        "default", -        "deferrable", -        "deferred", -        "delete", -        "desc", -        "detach", -        "distinct", -        "drop", -        "each", -        "else", -        "end", -        "escape", -        "except", -        "exclusive", -        "exists", -        "explain", -        "false", -        "fail", -        "for", -        "foreign", -        "from", -        "full", -        "glob", -        "group", -        "having", -        "if", -        "ignore", -        "immediate", -        "in", -        "index", -        "indexed", -        "initially", -        "inner", -        "insert", -        "instead", -        "intersect", -        "into", -        "is", -        "isnull", -        "join", -        "key", -        "left", -        "like", -        "limit", -        "match", -        "natural", -        "not", -        "notnull", -        "null", -        "of", -        "offset", -        "on", -        "or", -        "order", -        "outer", -        "plan", -        "pragma", -        "primary", -        "query", -        "raise", -        "references", -        "reindex", -        "rename", -        "replace", -        "restrict", -        "right", -        "rollback", -        "row", -        "select", -        "set", -        "table", -        "temp", -        "temporary", -        "then", -        "to", -        "transaction", -        "trigger", -        "true", -        "union", -        "unique", -        "update", -        "using", -        "vacuum", -        "values", -        "view", -        "virtual", -        "when", -        "where", -    } - - -class SQLiteExecutionContext(default.DefaultExecutionContext): -    @util.memoized_property -    def _preserve_raw_colnames(self): -        return ( -            not self.dialect._broken_dotted_colnames -            or self.execution_options.get("sqlite_raw_colnames", False) -        ) - -    def _translate_colname(self, colname): -        # TODO: detect SQLite version 3.10.0 or greater; -        # see [ticket:3633] - -        # adjust for dotted column names.  SQLite -        # in the case of UNION may store col names as -        # "tablename.colname", or if using an attached database, -        # "database.tablename.colname", in cursor.description -        if not self._preserve_raw_colnames and "." in colname: -            return colname.split(".")[-1], colname -        else: -            return colname, None - - -class SQLiteDialect(default.DefaultDialect): -    name = "sqlite" -    supports_alter = False - -    # SQlite supports "DEFAULT VALUES" but *does not* support -    # "VALUES (DEFAULT)" -    supports_default_values = True -    supports_default_metavalue = False - -    # sqlite issue: -    # https://github.com/python/cpython/issues/93421 -    # note this parameter is no longer used by the ORM or default dialect -    # see #9414 -    supports_sane_rowcount_returning = False - -    supports_empty_insert = False -    supports_cast = True -    supports_multivalues_insert = True -    use_insertmanyvalues = True -    tuple_in_values = True -    supports_statement_cache = True -    insert_null_pk_still_autoincrements = True -    insert_returning = True -    update_returning = True -    update_returning_multifrom = True -    delete_returning = True -    update_returning_multifrom = True - -    supports_default_metavalue = True -    """dialect supports INSERT... VALUES (DEFAULT) syntax""" - -    default_metavalue_token = "NULL" -    """for INSERT... VALUES (DEFAULT) syntax, the token to put in the -    parenthesis.""" - -    default_paramstyle = "qmark" -    execution_ctx_cls = SQLiteExecutionContext -    statement_compiler = SQLiteCompiler -    ddl_compiler = SQLiteDDLCompiler -    type_compiler_cls = SQLiteTypeCompiler -    preparer = SQLiteIdentifierPreparer -    ischema_names = ischema_names -    colspecs = colspecs - -    construct_arguments = [ -        ( -            sa_schema.Table, -            { -                "autoincrement": False, -                "with_rowid": True, -            }, -        ), -        (sa_schema.Index, {"where": None}), -        ( -            sa_schema.Column, -            { -                "on_conflict_primary_key": None, -                "on_conflict_not_null": None, -                "on_conflict_unique": None, -            }, -        ), -        (sa_schema.Constraint, {"on_conflict": None}), -    ] - -    _broken_fk_pragma_quotes = False -    _broken_dotted_colnames = False - -    @util.deprecated_params( -        _json_serializer=( -            "1.3.7", -            "The _json_serializer argument to the SQLite dialect has " -            "been renamed to the correct name of json_serializer.  The old " -            "argument name will be removed in a future release.", -        ), -        _json_deserializer=( -            "1.3.7", -            "The _json_deserializer argument to the SQLite dialect has " -            "been renamed to the correct name of json_deserializer.  The old " -            "argument name will be removed in a future release.", -        ), -    ) -    def __init__( -        self, -        native_datetime=False, -        json_serializer=None, -        json_deserializer=None, -        _json_serializer=None, -        _json_deserializer=None, -        **kwargs, -    ): -        default.DefaultDialect.__init__(self, **kwargs) - -        if _json_serializer: -            json_serializer = _json_serializer -        if _json_deserializer: -            json_deserializer = _json_deserializer -        self._json_serializer = json_serializer -        self._json_deserializer = json_deserializer - -        # this flag used by pysqlite dialect, and perhaps others in the -        # future, to indicate the driver is handling date/timestamp -        # conversions (and perhaps datetime/time as well on some hypothetical -        # driver ?) -        self.native_datetime = native_datetime - -        if self.dbapi is not None: -            if self.dbapi.sqlite_version_info < (3, 7, 16): -                util.warn( -                    "SQLite version %s is older than 3.7.16, and will not " -                    "support right nested joins, as are sometimes used in " -                    "more complex ORM scenarios.  SQLAlchemy 1.4 and above " -                    "no longer tries to rewrite these joins." -                    % (self.dbapi.sqlite_version_info,) -                ) - -            # NOTE: python 3.7 on fedora for me has SQLite 3.34.1.  These -            # version checks are getting very stale. -            self._broken_dotted_colnames = self.dbapi.sqlite_version_info < ( -                3, -                10, -                0, -            ) -            self.supports_default_values = self.dbapi.sqlite_version_info >= ( -                3, -                3, -                8, -            ) -            self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3) -            self.supports_multivalues_insert = ( -                # https://www.sqlite.org/releaselog/3_7_11.html -                self.dbapi.sqlite_version_info -                >= (3, 7, 11) -            ) -            # see https://www.sqlalchemy.org/trac/ticket/2568 -            # as well as https://www.sqlite.org/src/info/600482d161 -            self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < ( -                3, -                6, -                14, -            ) - -            if self.dbapi.sqlite_version_info < (3, 35) or util.pypy: -                self.update_returning = self.delete_returning = ( -                    self.insert_returning -                ) = False - -            if self.dbapi.sqlite_version_info < (3, 32, 0): -                # https://www.sqlite.org/limits.html -                self.insertmanyvalues_max_parameters = 999 - -    _isolation_lookup = util.immutabledict( -        {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0} -    ) - -    def get_isolation_level_values(self, dbapi_connection): -        return list(self._isolation_lookup) - -    def set_isolation_level(self, dbapi_connection, level): -        isolation_level = self._isolation_lookup[level] - -        cursor = dbapi_connection.cursor() -        cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}") -        cursor.close() - -    def get_isolation_level(self, dbapi_connection): -        cursor = dbapi_connection.cursor() -        cursor.execute("PRAGMA read_uncommitted") -        res = cursor.fetchone() -        if res: -            value = res[0] -        else: -            # https://www.sqlite.org/changes.html#version_3_3_3 -            # "Optional READ UNCOMMITTED isolation (instead of the -            # default isolation level of SERIALIZABLE) and -            # table level locking when database connections -            # share a common cache."" -            # pre-SQLite 3.3.0 default to 0 -            value = 0 -        cursor.close() -        if value == 0: -            return "SERIALIZABLE" -        elif value == 1: -            return "READ UNCOMMITTED" -        else: -            assert False, "Unknown isolation level %s" % value - -    @reflection.cache -    def get_schema_names(self, connection, **kw): -        s = "PRAGMA database_list" -        dl = connection.exec_driver_sql(s) - -        return [db[1] for db in dl if db[1] != "temp"] - -    def _format_schema(self, schema, table_name): -        if schema is not None: -            qschema = self.identifier_preparer.quote_identifier(schema) -            name = f"{qschema}.{table_name}" -        else: -            name = table_name -        return name - -    def _sqlite_main_query( -        self, -        table: str, -        type_: str, -        schema: Optional[str], -        sqlite_include_internal: bool, -    ): -        main = self._format_schema(schema, table) -        if not sqlite_include_internal: -            filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'" -        else: -            filter_table = "" -        query = ( -            f"SELECT name FROM {main} " -            f"WHERE type='{type_}'{filter_table} " -            "ORDER BY name" -        ) -        return query - -    @reflection.cache -    def get_table_names( -        self, connection, schema=None, sqlite_include_internal=False, **kw -    ): -        query = self._sqlite_main_query( -            "sqlite_master", "table", schema, sqlite_include_internal -        ) -        names = connection.exec_driver_sql(query).scalars().all() -        return names - -    @reflection.cache -    def get_temp_table_names( -        self, connection, sqlite_include_internal=False, **kw -    ): -        query = self._sqlite_main_query( -            "sqlite_temp_master", "table", None, sqlite_include_internal -        ) -        names = connection.exec_driver_sql(query).scalars().all() -        return names - -    @reflection.cache -    def get_temp_view_names( -        self, connection, sqlite_include_internal=False, **kw -    ): -        query = self._sqlite_main_query( -            "sqlite_temp_master", "view", None, sqlite_include_internal -        ) -        names = connection.exec_driver_sql(query).scalars().all() -        return names - -    @reflection.cache -    def has_table(self, connection, table_name, schema=None, **kw): -        self._ensure_has_table_connection(connection) - -        if schema is not None and schema not in self.get_schema_names( -            connection, **kw -        ): -            return False - -        info = self._get_table_pragma( -            connection, "table_info", table_name, schema=schema -        ) -        return bool(info) - -    def _get_default_schema_name(self, connection): -        return "main" - -    @reflection.cache -    def get_view_names( -        self, connection, schema=None, sqlite_include_internal=False, **kw -    ): -        query = self._sqlite_main_query( -            "sqlite_master", "view", schema, sqlite_include_internal -        ) -        names = connection.exec_driver_sql(query).scalars().all() -        return names - -    @reflection.cache -    def get_view_definition(self, connection, view_name, schema=None, **kw): -        if schema is not None: -            qschema = self.identifier_preparer.quote_identifier(schema) -            master = f"{qschema}.sqlite_master" -            s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % ( -                master, -            ) -            rs = connection.exec_driver_sql(s, (view_name,)) -        else: -            try: -                s = ( -                    "SELECT sql FROM " -                    " (SELECT * FROM sqlite_master UNION ALL " -                    "  SELECT * FROM sqlite_temp_master) " -                    "WHERE name = ? " -                    "AND type='view'" -                ) -                rs = connection.exec_driver_sql(s, (view_name,)) -            except exc.DBAPIError: -                s = ( -                    "SELECT sql FROM sqlite_master WHERE name = ? " -                    "AND type='view'" -                ) -                rs = connection.exec_driver_sql(s, (view_name,)) - -        result = rs.fetchall() -        if result: -            return result[0].sql -        else: -            raise exc.NoSuchTableError( -                f"{schema}.{view_name}" if schema else view_name -            ) - -    @reflection.cache -    def get_columns(self, connection, table_name, schema=None, **kw): -        pragma = "table_info" -        # computed columns are threaded as hidden, they require table_xinfo -        if self.server_version_info >= (3, 31): -            pragma = "table_xinfo" -        info = self._get_table_pragma( -            connection, pragma, table_name, schema=schema -        ) -        columns = [] -        tablesql = None -        for row in info: -            name = row[1] -            type_ = row[2].upper() -            nullable = not row[3] -            default = row[4] -            primary_key = row[5] -            hidden = row[6] if pragma == "table_xinfo" else 0 - -            # hidden has value 0 for normal columns, 1 for hidden columns, -            # 2 for computed virtual columns and 3 for computed stored columns -            # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b -            if hidden == 1: -                continue - -            generated = bool(hidden) -            persisted = hidden == 3 - -            if tablesql is None and generated: -                tablesql = self._get_table_sql( -                    connection, table_name, schema, **kw -                ) - -            columns.append( -                self._get_column_info( -                    name, -                    type_, -                    nullable, -                    default, -                    primary_key, -                    generated, -                    persisted, -                    tablesql, -                ) -            ) -        if columns: -            return columns -        elif not self.has_table(connection, table_name, schema): -            raise exc.NoSuchTableError( -                f"{schema}.{table_name}" if schema else table_name -            ) -        else: -            return ReflectionDefaults.columns() - -    def _get_column_info( -        self, -        name, -        type_, -        nullable, -        default, -        primary_key, -        generated, -        persisted, -        tablesql, -    ): -        if generated: -            # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)" -            # somehow is "INTEGER GENERATED ALWAYS" -            type_ = re.sub("generated", "", type_, flags=re.IGNORECASE) -            type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip() - -        coltype = self._resolve_type_affinity(type_) - -        if default is not None: -            default = str(default) - -        colspec = { -            "name": name, -            "type": coltype, -            "nullable": nullable, -            "default": default, -            "primary_key": primary_key, -        } -        if generated: -            sqltext = "" -            if tablesql: -                pattern = r"[^,]*\s+AS\s+\(([^,]*)\)\s*(?:virtual|stored)?" -                match = re.search( -                    re.escape(name) + pattern, tablesql, re.IGNORECASE -                ) -                if match: -                    sqltext = match.group(1) -            colspec["computed"] = {"sqltext": sqltext, "persisted": persisted} -        return colspec - -    def _resolve_type_affinity(self, type_): -        """Return a data type from a reflected column, using affinity rules. - -        SQLite's goal for universal compatibility introduces some complexity -        during reflection, as a column's defined type might not actually be a -        type that SQLite understands - or indeed, my not be defined *at all*. -        Internally, SQLite handles this with a 'data type affinity' for each -        column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER', -        'REAL', or 'NONE' (raw bits). The algorithm that determines this is -        listed in https://www.sqlite.org/datatype3.html section 2.1. - -        This method allows SQLAlchemy to support that algorithm, while still -        providing access to smarter reflection utilities by recognizing -        column definitions that SQLite only supports through affinity (like -        DATE and DOUBLE). - -        """ -        match = re.match(r"([\w ]+)(\(.*?\))?", type_) -        if match: -            coltype = match.group(1) -            args = match.group(2) -        else: -            coltype = "" -            args = "" - -        if coltype in self.ischema_names: -            coltype = self.ischema_names[coltype] -        elif "INT" in coltype: -            coltype = sqltypes.INTEGER -        elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype: -            coltype = sqltypes.TEXT -        elif "BLOB" in coltype or not coltype: -            coltype = sqltypes.NullType -        elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype: -            coltype = sqltypes.REAL -        else: -            coltype = sqltypes.NUMERIC - -        if args is not None: -            args = re.findall(r"(\d+)", args) -            try: -                coltype = coltype(*[int(a) for a in args]) -            except TypeError: -                util.warn( -                    "Could not instantiate type %s with " -                    "reflected arguments %s; using no arguments." -                    % (coltype, args) -                ) -                coltype = coltype() -        else: -            coltype = coltype() - -        return coltype - -    @reflection.cache -    def get_pk_constraint(self, connection, table_name, schema=None, **kw): -        constraint_name = None -        table_data = self._get_table_sql(connection, table_name, schema=schema) -        if table_data: -            PK_PATTERN = r"CONSTRAINT (\w+) PRIMARY KEY" -            result = re.search(PK_PATTERN, table_data, re.I) -            constraint_name = result.group(1) if result else None - -        cols = self.get_columns(connection, table_name, schema, **kw) -        # consider only pk columns. This also avoids sorting the cached -        # value returned by get_columns -        cols = [col for col in cols if col.get("primary_key", 0) > 0] -        cols.sort(key=lambda col: col.get("primary_key")) -        pkeys = [col["name"] for col in cols] - -        if pkeys: -            return {"constrained_columns": pkeys, "name": constraint_name} -        else: -            return ReflectionDefaults.pk_constraint() - -    @reflection.cache -    def get_foreign_keys(self, connection, table_name, schema=None, **kw): -        # sqlite makes this *extremely difficult*. -        # First, use the pragma to get the actual FKs. -        pragma_fks = self._get_table_pragma( -            connection, "foreign_key_list", table_name, schema=schema -        ) - -        fks = {} - -        for row in pragma_fks: -            (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4]) - -            if not rcol: -                # no referred column, which means it was not named in the -                # original DDL.  The referred columns of the foreign key -                # constraint are therefore the primary key of the referred -                # table. -                try: -                    referred_pk = self.get_pk_constraint( -                        connection, rtbl, schema=schema, **kw -                    ) -                    referred_columns = referred_pk["constrained_columns"] -                except exc.NoSuchTableError: -                    # ignore not existing parents -                    referred_columns = [] -            else: -                # note we use this list only if this is the first column -                # in the constraint.  for subsequent columns we ignore the -                # list and append "rcol" if present. -                referred_columns = [] - -            if self._broken_fk_pragma_quotes: -                rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl) - -            if numerical_id in fks: -                fk = fks[numerical_id] -            else: -                fk = fks[numerical_id] = { -                    "name": None, -                    "constrained_columns": [], -                    "referred_schema": schema, -                    "referred_table": rtbl, -                    "referred_columns": referred_columns, -                    "options": {}, -                } -                fks[numerical_id] = fk - -            fk["constrained_columns"].append(lcol) - -            if rcol: -                fk["referred_columns"].append(rcol) - -        def fk_sig(constrained_columns, referred_table, referred_columns): -            return ( -                tuple(constrained_columns) -                + (referred_table,) -                + tuple(referred_columns) -            ) - -        # then, parse the actual SQL and attempt to find DDL that matches -        # the names as well.   SQLite saves the DDL in whatever format -        # it was typed in as, so need to be liberal here. - -        keys_by_signature = { -            fk_sig( -                fk["constrained_columns"], -                fk["referred_table"], -                fk["referred_columns"], -            ): fk -            for fk in fks.values() -        } - -        table_data = self._get_table_sql(connection, table_name, schema=schema) - -        def parse_fks(): -            if table_data is None: -                # system tables, etc. -                return - -            # note that we already have the FKs from PRAGMA above.  This whole -            # regexp thing is trying to locate additional detail about the -            # FKs, namely the name of the constraint and other options. -            # so parsing the columns is really about matching it up to what -            # we already have. -            FK_PATTERN = ( -                r"(?:CONSTRAINT (\w+) +)?" -                r"FOREIGN KEY *\( *(.+?) *\) +" -                r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *'  # noqa: E501 -                r"((?:ON (?:DELETE|UPDATE) " -                r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)" -                r"((?:NOT +)?DEFERRABLE)?" -                r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?" -            ) -            for match in re.finditer(FK_PATTERN, table_data, re.I): -                ( -                    constraint_name, -                    constrained_columns, -                    referred_quoted_name, -                    referred_name, -                    referred_columns, -                    onupdatedelete, -                    deferrable, -                    initially, -                ) = match.group(1, 2, 3, 4, 5, 6, 7, 8) -                constrained_columns = list( -                    self._find_cols_in_sig(constrained_columns) -                ) -                if not referred_columns: -                    referred_columns = constrained_columns -                else: -                    referred_columns = list( -                        self._find_cols_in_sig(referred_columns) -                    ) -                referred_name = referred_quoted_name or referred_name -                options = {} - -                for token in re.split(r" *\bON\b *", onupdatedelete.upper()): -                    if token.startswith("DELETE"): -                        ondelete = token[6:].strip() -                        if ondelete and ondelete != "NO ACTION": -                            options["ondelete"] = ondelete -                    elif token.startswith("UPDATE"): -                        onupdate = token[6:].strip() -                        if onupdate and onupdate != "NO ACTION": -                            options["onupdate"] = onupdate - -                if deferrable: -                    options["deferrable"] = "NOT" not in deferrable.upper() -                if initially: -                    options["initially"] = initially.upper() - -                yield ( -                    constraint_name, -                    constrained_columns, -                    referred_name, -                    referred_columns, -                    options, -                ) - -        fkeys = [] - -        for ( -            constraint_name, -            constrained_columns, -            referred_name, -            referred_columns, -            options, -        ) in parse_fks(): -            sig = fk_sig(constrained_columns, referred_name, referred_columns) -            if sig not in keys_by_signature: -                util.warn( -                    "WARNING: SQL-parsed foreign key constraint " -                    "'%s' could not be located in PRAGMA " -                    "foreign_keys for table %s" % (sig, table_name) -                ) -                continue -            key = keys_by_signature.pop(sig) -            key["name"] = constraint_name -            key["options"] = options -            fkeys.append(key) -        # assume the remainders are the unnamed, inline constraints, just -        # use them as is as it's extremely difficult to parse inline -        # constraints -        fkeys.extend(keys_by_signature.values()) -        if fkeys: -            return fkeys -        else: -            return ReflectionDefaults.foreign_keys() - -    def _find_cols_in_sig(self, sig): -        for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I): -            yield match.group(1) or match.group(2) - -    @reflection.cache -    def get_unique_constraints( -        self, connection, table_name, schema=None, **kw -    ): -        auto_index_by_sig = {} -        for idx in self.get_indexes( -            connection, -            table_name, -            schema=schema, -            include_auto_indexes=True, -            **kw, -        ): -            if not idx["name"].startswith("sqlite_autoindex"): -                continue -            sig = tuple(idx["column_names"]) -            auto_index_by_sig[sig] = idx - -        table_data = self._get_table_sql( -            connection, table_name, schema=schema, **kw -        ) -        unique_constraints = [] - -        def parse_uqs(): -            if table_data is None: -                return -            UNIQUE_PATTERN = r'(?:CONSTRAINT "?(.+?)"? +)?UNIQUE *\((.+?)\)' -            INLINE_UNIQUE_PATTERN = ( -                r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?) ' -                r"+[a-z0-9_ ]+? +UNIQUE" -            ) - -            for match in re.finditer(UNIQUE_PATTERN, table_data, re.I): -                name, cols = match.group(1, 2) -                yield name, list(self._find_cols_in_sig(cols)) - -            # we need to match inlines as well, as we seek to differentiate -            # a UNIQUE constraint from a UNIQUE INDEX, even though these -            # are kind of the same thing :) -            for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I): -                cols = list( -                    self._find_cols_in_sig(match.group(1) or match.group(2)) -                ) -                yield None, cols - -        for name, cols in parse_uqs(): -            sig = tuple(cols) -            if sig in auto_index_by_sig: -                auto_index_by_sig.pop(sig) -                parsed_constraint = {"name": name, "column_names": cols} -                unique_constraints.append(parsed_constraint) -        # NOTE: auto_index_by_sig might not be empty here, -        # the PRIMARY KEY may have an entry. -        if unique_constraints: -            return unique_constraints -        else: -            return ReflectionDefaults.unique_constraints() - -    @reflection.cache -    def get_check_constraints(self, connection, table_name, schema=None, **kw): -        table_data = self._get_table_sql( -            connection, table_name, schema=schema, **kw -        ) - -        CHECK_PATTERN = r"(?:CONSTRAINT (.+) +)?" r"CHECK *\( *(.+) *\),? *" -        cks = [] -        # NOTE: we aren't using re.S here because we actually are -        # taking advantage of each CHECK constraint being all on one -        # line in the table definition in order to delineate.  This -        # necessarily makes assumptions as to how the CREATE TABLE -        # was emitted. - -        for match in re.finditer(CHECK_PATTERN, table_data or "", re.I): -            name = match.group(1) - -            if name: -                name = re.sub(r'^"|"$', "", name) - -            cks.append({"sqltext": match.group(2), "name": name}) -        cks.sort(key=lambda d: d["name"] or "~")  # sort None as last -        if cks: -            return cks -        else: -            return ReflectionDefaults.check_constraints() - -    @reflection.cache -    def get_indexes(self, connection, table_name, schema=None, **kw): -        pragma_indexes = self._get_table_pragma( -            connection, "index_list", table_name, schema=schema -        ) -        indexes = [] - -        # regular expression to extract the filter predicate of a partial -        # index. this could fail to extract the predicate correctly on -        # indexes created like -        #   CREATE INDEX i ON t (col || ') where') WHERE col <> '' -        # but as this function does not support expression-based indexes -        # this case does not occur. -        partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE) - -        if schema: -            schema_expr = "%s." % self.identifier_preparer.quote_identifier( -                schema -            ) -        else: -            schema_expr = "" - -        include_auto_indexes = kw.pop("include_auto_indexes", False) -        for row in pragma_indexes: -            # ignore implicit primary key index. -            # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html -            if not include_auto_indexes and row[1].startswith( -                "sqlite_autoindex" -            ): -                continue -            indexes.append( -                dict( -                    name=row[1], -                    column_names=[], -                    unique=row[2], -                    dialect_options={}, -                ) -            ) - -            # check partial indexes -            if len(row) >= 5 and row[4]: -                s = ( -                    "SELECT sql FROM %(schema)ssqlite_master " -                    "WHERE name = ? " -                    "AND type = 'index'" % {"schema": schema_expr} -                ) -                rs = connection.exec_driver_sql(s, (row[1],)) -                index_sql = rs.scalar() -                predicate_match = partial_pred_re.search(index_sql) -                if predicate_match is None: -                    # unless the regex is broken this case shouldn't happen -                    # because we know this is a partial index, so the -                    # definition sql should match the regex -                    util.warn( -                        "Failed to look up filter predicate of " -                        "partial index %s" % row[1] -                    ) -                else: -                    predicate = predicate_match.group(1) -                    indexes[-1]["dialect_options"]["sqlite_where"] = text( -                        predicate -                    ) - -        # loop thru unique indexes to get the column names. -        for idx in list(indexes): -            pragma_index = self._get_table_pragma( -                connection, "index_info", idx["name"], schema=schema -            ) - -            for row in pragma_index: -                if row[2] is None: -                    util.warn( -                        "Skipped unsupported reflection of " -                        "expression-based index %s" % idx["name"] -                    ) -                    indexes.remove(idx) -                    break -                else: -                    idx["column_names"].append(row[2]) - -        indexes.sort(key=lambda d: d["name"] or "~")  # sort None as last -        if indexes: -            return indexes -        elif not self.has_table(connection, table_name, schema): -            raise exc.NoSuchTableError( -                f"{schema}.{table_name}" if schema else table_name -            ) -        else: -            return ReflectionDefaults.indexes() - -    def _is_sys_table(self, table_name): -        return table_name in { -            "sqlite_schema", -            "sqlite_master", -            "sqlite_temp_schema", -            "sqlite_temp_master", -        } - -    @reflection.cache -    def _get_table_sql(self, connection, table_name, schema=None, **kw): -        if schema: -            schema_expr = "%s." % ( -                self.identifier_preparer.quote_identifier(schema) -            ) -        else: -            schema_expr = "" -        try: -            s = ( -                "SELECT sql FROM " -                " (SELECT * FROM %(schema)ssqlite_master UNION ALL " -                "  SELECT * FROM %(schema)ssqlite_temp_master) " -                "WHERE name = ? " -                "AND type in ('table', 'view')" % {"schema": schema_expr} -            ) -            rs = connection.exec_driver_sql(s, (table_name,)) -        except exc.DBAPIError: -            s = ( -                "SELECT sql FROM %(schema)ssqlite_master " -                "WHERE name = ? " -                "AND type in ('table', 'view')" % {"schema": schema_expr} -            ) -            rs = connection.exec_driver_sql(s, (table_name,)) -        value = rs.scalar() -        if value is None and not self._is_sys_table(table_name): -            raise exc.NoSuchTableError(f"{schema_expr}{table_name}") -        return value - -    def _get_table_pragma(self, connection, pragma, table_name, schema=None): -        quote = self.identifier_preparer.quote_identifier -        if schema is not None: -            statements = [f"PRAGMA {quote(schema)}."] -        else: -            # because PRAGMA looks in all attached databases if no schema -            # given, need to specify "main" schema, however since we want -            # 'temp' tables in the same namespace as 'main', need to run -            # the PRAGMA twice -            statements = ["PRAGMA main.", "PRAGMA temp."] - -        qtable = quote(table_name) -        for statement in statements: -            statement = f"{statement}{pragma}({qtable})" -            cursor = connection.exec_driver_sql(statement) -            if not cursor._soft_closed: -                # work around SQLite issue whereby cursor.description -                # is blank when PRAGMA returns no rows: -                # https://www.sqlite.org/cvstrac/tktview?tn=1884 -                result = cursor.fetchall() -            else: -                result = [] -            if result: -                return result -        else: -            return [] diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/dml.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/dml.py deleted file mode 100644 index dcf5e44..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/dml.py +++ /dev/null @@ -1,240 +0,0 @@ -# dialects/sqlite/dml.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -from __future__ import annotations - -from typing import Any - -from .._typing import _OnConflictIndexElementsT -from .._typing import _OnConflictIndexWhereT -from .._typing import _OnConflictSetT -from .._typing import _OnConflictWhereT -from ... import util -from ...sql import coercions -from ...sql import roles -from ...sql._typing import _DMLTableArgument -from ...sql.base import _exclusive_against -from ...sql.base import _generative -from ...sql.base import ColumnCollection -from ...sql.base import ReadOnlyColumnCollection -from ...sql.dml import Insert as StandardInsert -from ...sql.elements import ClauseElement -from ...sql.elements import KeyedColumnElement -from ...sql.expression import alias -from ...util.typing import Self - -__all__ = ("Insert", "insert") - - -def insert(table: _DMLTableArgument) -> Insert: -    """Construct a sqlite-specific variant :class:`_sqlite.Insert` -    construct. - -    .. container:: inherited_member - -        The :func:`sqlalchemy.dialects.sqlite.insert` function creates -        a :class:`sqlalchemy.dialects.sqlite.Insert`.  This class is based -        on the dialect-agnostic :class:`_sql.Insert` construct which may -        be constructed using the :func:`_sql.insert` function in -        SQLAlchemy Core. - -    The :class:`_sqlite.Insert` construct includes additional methods -    :meth:`_sqlite.Insert.on_conflict_do_update`, -    :meth:`_sqlite.Insert.on_conflict_do_nothing`. - -    """ -    return Insert(table) - - -class Insert(StandardInsert): -    """SQLite-specific implementation of INSERT. - -    Adds methods for SQLite-specific syntaxes such as ON CONFLICT. - -    The :class:`_sqlite.Insert` object is created using the -    :func:`sqlalchemy.dialects.sqlite.insert` function. - -    .. versionadded:: 1.4 - -    .. seealso:: - -        :ref:`sqlite_on_conflict_insert` - -    """ - -    stringify_dialect = "sqlite" -    inherit_cache = False - -    @util.memoized_property -    def excluded( -        self, -    ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]: -        """Provide the ``excluded`` namespace for an ON CONFLICT statement - -        SQLite's ON CONFLICT clause allows reference to the row that would -        be inserted, known as ``excluded``.  This attribute provides -        all columns in this row to be referenceable. - -        .. tip::  The :attr:`_sqlite.Insert.excluded` attribute is an instance -            of :class:`_expression.ColumnCollection`, which provides an -            interface the same as that of the :attr:`_schema.Table.c` -            collection described at :ref:`metadata_tables_and_columns`. -            With this collection, ordinary names are accessible like attributes -            (e.g. ``stmt.excluded.some_column``), but special names and -            dictionary method names should be accessed using indexed access, -            such as ``stmt.excluded["column name"]`` or -            ``stmt.excluded["values"]``.  See the docstring for -            :class:`_expression.ColumnCollection` for further examples. - -        """ -        return alias(self.table, name="excluded").columns - -    _on_conflict_exclusive = _exclusive_against( -        "_post_values_clause", -        msgs={ -            "_post_values_clause": "This Insert construct already has " -            "an ON CONFLICT clause established" -        }, -    ) - -    @_generative -    @_on_conflict_exclusive -    def on_conflict_do_update( -        self, -        index_elements: _OnConflictIndexElementsT = None, -        index_where: _OnConflictIndexWhereT = None, -        set_: _OnConflictSetT = None, -        where: _OnConflictWhereT = None, -    ) -> Self: -        r""" -        Specifies a DO UPDATE SET action for ON CONFLICT clause. - -        :param index_elements: -         A sequence consisting of string column names, :class:`_schema.Column` -         objects, or other column expression objects that will be used -         to infer a target index or unique constraint. - -        :param index_where: -         Additional WHERE criterion that can be used to infer a -         conditional target index. - -        :param set\_: -         A dictionary or other mapping object -         where the keys are either names of columns in the target table, -         or :class:`_schema.Column` objects or other ORM-mapped columns -         matching that of the target table, and expressions or literals -         as values, specifying the ``SET`` actions to take. - -         .. versionadded:: 1.4 The -            :paramref:`_sqlite.Insert.on_conflict_do_update.set_` -            parameter supports :class:`_schema.Column` objects from the target -            :class:`_schema.Table` as keys. - -         .. warning:: This dictionary does **not** take into account -            Python-specified default UPDATE values or generation functions, -            e.g. those specified using :paramref:`_schema.Column.onupdate`. -            These values will not be exercised for an ON CONFLICT style of -            UPDATE, unless they are manually specified in the -            :paramref:`.Insert.on_conflict_do_update.set_` dictionary. - -        :param where: -         Optional argument. If present, can be a literal SQL -         string or an acceptable expression for a ``WHERE`` clause -         that restricts the rows affected by ``DO UPDATE SET``. Rows -         not meeting the ``WHERE`` condition will not be updated -         (effectively a ``DO NOTHING`` for those rows). - -        """ - -        self._post_values_clause = OnConflictDoUpdate( -            index_elements, index_where, set_, where -        ) -        return self - -    @_generative -    @_on_conflict_exclusive -    def on_conflict_do_nothing( -        self, -        index_elements: _OnConflictIndexElementsT = None, -        index_where: _OnConflictIndexWhereT = None, -    ) -> Self: -        """ -        Specifies a DO NOTHING action for ON CONFLICT clause. - -        :param index_elements: -         A sequence consisting of string column names, :class:`_schema.Column` -         objects, or other column expression objects that will be used -         to infer a target index or unique constraint. - -        :param index_where: -         Additional WHERE criterion that can be used to infer a -         conditional target index. - -        """ - -        self._post_values_clause = OnConflictDoNothing( -            index_elements, index_where -        ) -        return self - - -class OnConflictClause(ClauseElement): -    stringify_dialect = "sqlite" - -    constraint_target: None -    inferred_target_elements: _OnConflictIndexElementsT -    inferred_target_whereclause: _OnConflictIndexWhereT - -    def __init__( -        self, -        index_elements: _OnConflictIndexElementsT = None, -        index_where: _OnConflictIndexWhereT = None, -    ): -        if index_elements is not None: -            self.constraint_target = None -            self.inferred_target_elements = index_elements -            self.inferred_target_whereclause = index_where -        else: -            self.constraint_target = self.inferred_target_elements = ( -                self.inferred_target_whereclause -            ) = None - - -class OnConflictDoNothing(OnConflictClause): -    __visit_name__ = "on_conflict_do_nothing" - - -class OnConflictDoUpdate(OnConflictClause): -    __visit_name__ = "on_conflict_do_update" - -    def __init__( -        self, -        index_elements: _OnConflictIndexElementsT = None, -        index_where: _OnConflictIndexWhereT = None, -        set_: _OnConflictSetT = None, -        where: _OnConflictWhereT = None, -    ): -        super().__init__( -            index_elements=index_elements, -            index_where=index_where, -        ) - -        if isinstance(set_, dict): -            if not set_: -                raise ValueError("set parameter dictionary must not be empty") -        elif isinstance(set_, ColumnCollection): -            set_ = dict(set_) -        else: -            raise ValueError( -                "set parameter must be a non-empty dictionary " -                "or a ColumnCollection such as the `.c.` collection " -                "of a Table object" -            ) -        self.update_values_to_set = [ -            (coercions.expect(roles.DMLColumnRole, key), value) -            for key, value in set_.items() -        ] -        self.update_whereclause = where diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/json.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/json.py deleted file mode 100644 index ec29802..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/json.py +++ /dev/null @@ -1,92 +0,0 @@ -# dialects/sqlite/json.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - -from ... import types as sqltypes - - -class JSON(sqltypes.JSON): -    """SQLite JSON type. - -    SQLite supports JSON as of version 3.9 through its JSON1_ extension. Note -    that JSON1_ is a -    `loadable extension <https://www.sqlite.org/loadext.html>`_ and as such -    may not be available, or may require run-time loading. - -    :class:`_sqlite.JSON` is used automatically whenever the base -    :class:`_types.JSON` datatype is used against a SQLite backend. - -    .. seealso:: - -        :class:`_types.JSON` - main documentation for the generic -        cross-platform JSON datatype. - -    The :class:`_sqlite.JSON` type supports persistence of JSON values -    as well as the core index operations provided by :class:`_types.JSON` -    datatype, by adapting the operations to render the ``JSON_EXTRACT`` -    function wrapped in the ``JSON_QUOTE`` function at the database level. -    Extracted values are quoted in order to ensure that the results are -    always JSON string values. - - -    .. versionadded:: 1.3 - - -    .. _JSON1: https://www.sqlite.org/json1.html - -    """ - - -# Note: these objects currently match exactly those of MySQL, however since -# these are not generalizable to all JSON implementations, remain separately -# implemented for each dialect. -class _FormatTypeMixin: -    def _format_value(self, value): -        raise NotImplementedError() - -    def bind_processor(self, dialect): -        super_proc = self.string_bind_processor(dialect) - -        def process(value): -            value = self._format_value(value) -            if super_proc: -                value = super_proc(value) -            return value - -        return process - -    def literal_processor(self, dialect): -        super_proc = self.string_literal_processor(dialect) - -        def process(value): -            value = self._format_value(value) -            if super_proc: -                value = super_proc(value) -            return value - -        return process - - -class JSONIndexType(_FormatTypeMixin, sqltypes.JSON.JSONIndexType): -    def _format_value(self, value): -        if isinstance(value, int): -            value = "$[%s]" % value -        else: -            value = '$."%s"' % value -        return value - - -class JSONPathType(_FormatTypeMixin, sqltypes.JSON.JSONPathType): -    def _format_value(self, value): -        return "$%s" % ( -            "".join( -                [ -                    "[%s]" % elem if isinstance(elem, int) else '."%s"' % elem -                    for elem in value -                ] -            ) -        ) diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/provision.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/provision.py deleted file mode 100644 index f18568b..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/provision.py +++ /dev/null @@ -1,198 +0,0 @@ -# dialects/sqlite/provision.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - -import os -import re - -from ... import exc -from ...engine import url as sa_url -from ...testing.provision import create_db -from ...testing.provision import drop_db -from ...testing.provision import follower_url_from_main -from ...testing.provision import generate_driver_url -from ...testing.provision import log -from ...testing.provision import post_configure_engine -from ...testing.provision import run_reap_dbs -from ...testing.provision import stop_test_class_outside_fixtures -from ...testing.provision import temp_table_keyword_args -from ...testing.provision import upsert - - -# TODO: I can't get this to build dynamically with pytest-xdist procs -_drivernames = { -    "pysqlite", -    "aiosqlite", -    "pysqlcipher", -    "pysqlite_numeric", -    "pysqlite_dollar", -} - - -def _format_url(url, driver, ident): -    """given a sqlite url + desired driver + ident, make a canonical -    URL out of it - -    """ -    url = sa_url.make_url(url) - -    if driver is None: -        driver = url.get_driver_name() - -    filename = url.database - -    needs_enc = driver == "pysqlcipher" -    name_token = None - -    if filename and filename != ":memory:": -        assert "test_schema" not in filename -        tokens = re.split(r"[_\.]", filename) - -        new_filename = f"{driver}" - -        for token in tokens: -            if token in _drivernames: -                if driver is None: -                    driver = token -                continue -            elif token in ("db", "enc"): -                continue -            elif name_token is None: -                name_token = token.strip("_") - -        assert name_token, f"sqlite filename has no name token: {url.database}" - -        new_filename = f"{name_token}_{driver}" -        if ident: -            new_filename += f"_{ident}" -        new_filename += ".db" -        if needs_enc: -            new_filename += ".enc" -        url = url.set(database=new_filename) - -    if needs_enc: -        url = url.set(password="test") - -    url = url.set(drivername="sqlite+%s" % (driver,)) - -    return url - - -@generate_driver_url.for_db("sqlite") -def generate_driver_url(url, driver, query_str): -    url = _format_url(url, driver, None) - -    try: -        url.get_dialect() -    except exc.NoSuchModuleError: -        return None -    else: -        return url - - -@follower_url_from_main.for_db("sqlite") -def _sqlite_follower_url_from_main(url, ident): -    return _format_url(url, None, ident) - - -@post_configure_engine.for_db("sqlite") -def _sqlite_post_configure_engine(url, engine, follower_ident): -    from sqlalchemy import event - -    if follower_ident: -        attach_path = f"{follower_ident}_{engine.driver}_test_schema.db" -    else: -        attach_path = f"{engine.driver}_test_schema.db" - -    @event.listens_for(engine, "connect") -    def connect(dbapi_connection, connection_record): -        # use file DBs in all cases, memory acts kind of strangely -        # as an attached - -        # NOTE!  this has to be done *per connection*.  New sqlite connection, -        # as we get with say, QueuePool, the attaches are gone. -        # so schemes to delete those attached files have to be done at the -        # filesystem level and not rely upon what attachments are in a -        # particular SQLite connection -        dbapi_connection.execute( -            f'ATTACH DATABASE "{attach_path}" AS test_schema' -        ) - -    @event.listens_for(engine, "engine_disposed") -    def dispose(engine): -        """most databases should be dropped using -        stop_test_class_outside_fixtures - -        however a few tests like AttachedDBTest might not get triggered on -        that main hook - -        """ - -        if os.path.exists(attach_path): -            os.remove(attach_path) - -        filename = engine.url.database - -        if filename and filename != ":memory:" and os.path.exists(filename): -            os.remove(filename) - - -@create_db.for_db("sqlite") -def _sqlite_create_db(cfg, eng, ident): -    pass - - -@drop_db.for_db("sqlite") -def _sqlite_drop_db(cfg, eng, ident): -    _drop_dbs_w_ident(eng.url.database, eng.driver, ident) - - -def _drop_dbs_w_ident(databasename, driver, ident): -    for path in os.listdir("."): -        fname, ext = os.path.split(path) -        if ident in fname and ext in [".db", ".db.enc"]: -            log.info("deleting SQLite database file: %s", path) -            os.remove(path) - - -@stop_test_class_outside_fixtures.for_db("sqlite") -def stop_test_class_outside_fixtures(config, db, cls): -    db.dispose() - - -@temp_table_keyword_args.for_db("sqlite") -def _sqlite_temp_table_keyword_args(cfg, eng): -    return {"prefixes": ["TEMPORARY"]} - - -@run_reap_dbs.for_db("sqlite") -def _reap_sqlite_dbs(url, idents): -    log.info("db reaper connecting to %r", url) -    log.info("identifiers in file: %s", ", ".join(idents)) -    url = sa_url.make_url(url) -    for ident in idents: -        for drivername in _drivernames: -            _drop_dbs_w_ident(url.database, drivername, ident) - - -@upsert.for_db("sqlite") -def _upsert( -    cfg, table, returning, *, set_lambda=None, sort_by_parameter_order=False -): -    from sqlalchemy.dialects.sqlite import insert - -    stmt = insert(table) - -    if set_lambda: -        stmt = stmt.on_conflict_do_update(set_=set_lambda(stmt.excluded)) -    else: -        stmt = stmt.on_conflict_do_nothing() - -    stmt = stmt.returning( -        *returning, sort_by_parameter_order=sort_by_parameter_order -    ) -    return stmt diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlcipher.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlcipher.py deleted file mode 100644 index 388a4df..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlcipher.py +++ /dev/null @@ -1,155 +0,0 @@ -# dialects/sqlite/pysqlcipher.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - - -""" -.. dialect:: sqlite+pysqlcipher -    :name: pysqlcipher -    :dbapi: sqlcipher 3 or pysqlcipher -    :connectstring: sqlite+pysqlcipher://:passphrase@/file_path[?kdf_iter=<iter>] - -    Dialect for support of DBAPIs that make use of the -    `SQLCipher <https://www.zetetic.net/sqlcipher>`_ backend. - - -Driver ------- - -Current dialect selection logic is: - -* If the :paramref:`_sa.create_engine.module` parameter supplies a DBAPI module, -  that module is used. -* Otherwise for Python 3, choose https://pypi.org/project/sqlcipher3/ -* If not available, fall back to https://pypi.org/project/pysqlcipher3/ -* For Python 2, https://pypi.org/project/pysqlcipher/ is used. - -.. warning:: The ``pysqlcipher3`` and ``pysqlcipher`` DBAPI drivers are no -   longer maintained; the ``sqlcipher3`` driver as of this writing appears -   to be current.  For future compatibility, any pysqlcipher-compatible DBAPI -   may be used as follows:: - -        import sqlcipher_compatible_driver - -        from sqlalchemy import create_engine - -        e = create_engine( -            "sqlite+pysqlcipher://:password@/dbname.db", -            module=sqlcipher_compatible_driver -        ) - -These drivers make use of the SQLCipher engine. This system essentially -introduces new PRAGMA commands to SQLite which allows the setting of a -passphrase and other encryption parameters, allowing the database file to be -encrypted. - - -Connect Strings ---------------- - -The format of the connect string is in every way the same as that -of the :mod:`~sqlalchemy.dialects.sqlite.pysqlite` driver, except that the -"password" field is now accepted, which should contain a passphrase:: - -    e = create_engine('sqlite+pysqlcipher://:testing@/foo.db') - -For an absolute file path, two leading slashes should be used for the -database name:: - -    e = create_engine('sqlite+pysqlcipher://:testing@//path/to/foo.db') - -A selection of additional encryption-related pragmas supported by SQLCipher -as documented at https://www.zetetic.net/sqlcipher/sqlcipher-api/ can be passed -in the query string, and will result in that PRAGMA being called for each -new connection.  Currently, ``cipher``, ``kdf_iter`` -``cipher_page_size`` and ``cipher_use_hmac`` are supported:: - -    e = create_engine('sqlite+pysqlcipher://:testing@/foo.db?cipher=aes-256-cfb&kdf_iter=64000') - -.. warning:: Previous versions of sqlalchemy did not take into consideration -   the encryption-related pragmas passed in the url string, that were silently -   ignored. This may cause errors when opening files saved by a -   previous sqlalchemy version if the encryption options do not match. - - -Pooling Behavior ----------------- - -The driver makes a change to the default pool behavior of pysqlite -as described in :ref:`pysqlite_threading_pooling`.   The pysqlcipher driver -has been observed to be significantly slower on connection than the -pysqlite driver, most likely due to the encryption overhead, so the -dialect here defaults to using the :class:`.SingletonThreadPool` -implementation, -instead of the :class:`.NullPool` pool used by pysqlite.  As always, the pool -implementation is entirely configurable using the -:paramref:`_sa.create_engine.poolclass` parameter; the :class:`. -StaticPool` may -be more feasible for single-threaded use, or :class:`.NullPool` may be used -to prevent unencrypted connections from being held open for long periods of -time, at the expense of slower startup time for new connections. - - -"""  # noqa - -from .pysqlite import SQLiteDialect_pysqlite -from ... import pool - - -class SQLiteDialect_pysqlcipher(SQLiteDialect_pysqlite): -    driver = "pysqlcipher" -    supports_statement_cache = True - -    pragmas = ("kdf_iter", "cipher", "cipher_page_size", "cipher_use_hmac") - -    @classmethod -    def import_dbapi(cls): -        try: -            import sqlcipher3 as sqlcipher -        except ImportError: -            pass -        else: -            return sqlcipher - -        from pysqlcipher3 import dbapi2 as sqlcipher - -        return sqlcipher - -    @classmethod -    def get_pool_class(cls, url): -        return pool.SingletonThreadPool - -    def on_connect_url(self, url): -        super_on_connect = super().on_connect_url(url) - -        # pull the info we need from the URL early.  Even though URL -        # is immutable, we don't want any in-place changes to the URL -        # to affect things -        passphrase = url.password or "" -        url_query = dict(url.query) - -        def on_connect(conn): -            cursor = conn.cursor() -            cursor.execute('pragma key="%s"' % passphrase) -            for prag in self.pragmas: -                value = url_query.get(prag, None) -                if value is not None: -                    cursor.execute('pragma %s="%s"' % (prag, value)) -            cursor.close() - -            if super_on_connect: -                super_on_connect(conn) - -        return on_connect - -    def create_connect_args(self, url): -        plain_url = url._replace(password=None) -        plain_url = plain_url.difference_update_query(self.pragmas) -        return super().create_connect_args(plain_url) - - -dialect = SQLiteDialect_pysqlcipher diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlite.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlite.py deleted file mode 100644 index f39baf3..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlite.py +++ /dev/null @@ -1,756 +0,0 @@ -# dialects/sqlite/pysqlite.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - - -r""" -.. dialect:: sqlite+pysqlite -    :name: pysqlite -    :dbapi: sqlite3 -    :connectstring: sqlite+pysqlite:///file_path -    :url: https://docs.python.org/library/sqlite3.html - -    Note that ``pysqlite`` is the same driver as the ``sqlite3`` -    module included with the Python distribution. - -Driver ------- - -The ``sqlite3`` Python DBAPI is standard on all modern Python versions; -for cPython and Pypy, no additional installation is necessary. - - -Connect Strings ---------------- - -The file specification for the SQLite database is taken as the "database" -portion of the URL.  Note that the format of a SQLAlchemy url is:: - -    driver://user:pass@host/database - -This means that the actual filename to be used starts with the characters to -the **right** of the third slash.   So connecting to a relative filepath -looks like:: - -    # relative path -    e = create_engine('sqlite:///path/to/database.db') - -An absolute path, which is denoted by starting with a slash, means you -need **four** slashes:: - -    # absolute path -    e = create_engine('sqlite:////path/to/database.db') - -To use a Windows path, regular drive specifications and backslashes can be -used. Double backslashes are probably needed:: - -    # absolute path on Windows -    e = create_engine('sqlite:///C:\\path\\to\\database.db') - -To use sqlite ``:memory:`` database specify it as the filename using -``sqlite://:memory:``. It's also the default if no filepath is -present, specifying only ``sqlite://`` and nothing else:: - -    # in-memory database -    e = create_engine('sqlite://:memory:') -    # also in-memory database -    e2 = create_engine('sqlite://') - -.. _pysqlite_uri_connections: - -URI Connections -^^^^^^^^^^^^^^^ - -Modern versions of SQLite support an alternative system of connecting using a -`driver level URI <https://www.sqlite.org/uri.html>`_, which has the  advantage -that additional driver-level arguments can be passed including options such as -"read only".   The Python sqlite3 driver supports this mode under modern Python -3 versions.   The SQLAlchemy pysqlite driver supports this mode of use by -specifying "uri=true" in the URL query string.  The SQLite-level "URI" is kept -as the "database" portion of the SQLAlchemy url (that is, following a slash):: - -    e = create_engine("sqlite:///file:path/to/database?mode=ro&uri=true") - -.. note::  The "uri=true" parameter must appear in the **query string** -   of the URL.  It will not currently work as expected if it is only -   present in the :paramref:`_sa.create_engine.connect_args` -   parameter dictionary. - -The logic reconciles the simultaneous presence of SQLAlchemy's query string and -SQLite's query string by separating out the parameters that belong to the -Python sqlite3 driver vs. those that belong to the SQLite URI.  This is -achieved through the use of a fixed list of parameters known to be accepted by -the Python side of the driver.  For example, to include a URL that indicates -the Python sqlite3 "timeout" and "check_same_thread" parameters, along with the -SQLite "mode" and "nolock" parameters, they can all be passed together on the -query string:: - -    e = create_engine( -        "sqlite:///file:path/to/database?" -        "check_same_thread=true&timeout=10&mode=ro&nolock=1&uri=true" -    ) - -Above, the pysqlite / sqlite3 DBAPI would be passed arguments as:: - -    sqlite3.connect( -        "file:path/to/database?mode=ro&nolock=1", -        check_same_thread=True, timeout=10, uri=True -    ) - -Regarding future parameters added to either the Python or native drivers. new -parameter names added to the SQLite URI scheme should be automatically -accommodated by this scheme.  New parameter names added to the Python driver -side can be accommodated by specifying them in the -:paramref:`_sa.create_engine.connect_args` dictionary, -until dialect support is -added by SQLAlchemy.   For the less likely case that the native SQLite driver -adds a new parameter name that overlaps with one of the existing, known Python -driver parameters (such as "timeout" perhaps), SQLAlchemy's dialect would -require adjustment for the URL scheme to continue to support this. - -As is always the case for all SQLAlchemy dialects, the entire "URL" process -can be bypassed in :func:`_sa.create_engine` through the use of the -:paramref:`_sa.create_engine.creator` -parameter which allows for a custom callable -that creates a Python sqlite3 driver level connection directly. - -.. versionadded:: 1.3.9 - -.. seealso:: - -    `Uniform Resource Identifiers <https://www.sqlite.org/uri.html>`_ - in -    the SQLite documentation - -.. _pysqlite_regexp: - -Regular Expression Support ---------------------------- - -.. versionadded:: 1.4 - -Support for the :meth:`_sql.ColumnOperators.regexp_match` operator is provided -using Python's re.search_ function.  SQLite itself does not include a working -regular expression operator; instead, it includes a non-implemented placeholder -operator ``REGEXP`` that calls a user-defined function that must be provided. - -SQLAlchemy's implementation makes use of the pysqlite create_function_ hook -as follows:: - - -    def regexp(a, b): -        return re.search(a, b) is not None - -    sqlite_connection.create_function( -        "regexp", 2, regexp, -    ) - -There is currently no support for regular expression flags as a separate -argument, as these are not supported by SQLite's REGEXP operator, however these -may be included inline within the regular expression string.  See `Python regular expressions`_ for -details. - -.. seealso:: - -    `Python regular expressions`_: Documentation for Python's regular expression syntax. - -.. _create_function: https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.create_function - -.. _re.search: https://docs.python.org/3/library/re.html#re.search - -.. _Python regular expressions: https://docs.python.org/3/library/re.html#re.search - - - -Compatibility with sqlite3 "native" date and datetime types ------------------------------------------------------------ - -The pysqlite driver includes the sqlite3.PARSE_DECLTYPES and -sqlite3.PARSE_COLNAMES options, which have the effect of any column -or expression explicitly cast as "date" or "timestamp" will be converted -to a Python date or datetime object.  The date and datetime types provided -with the pysqlite dialect are not currently compatible with these options, -since they render the ISO date/datetime including microseconds, which -pysqlite's driver does not.   Additionally, SQLAlchemy does not at -this time automatically render the "cast" syntax required for the -freestanding functions "current_timestamp" and "current_date" to return -datetime/date types natively.   Unfortunately, pysqlite -does not provide the standard DBAPI types in ``cursor.description``, -leaving SQLAlchemy with no way to detect these types on the fly -without expensive per-row type checks. - -Keeping in mind that pysqlite's parsing option is not recommended, -nor should be necessary, for use with SQLAlchemy, usage of PARSE_DECLTYPES -can be forced if one configures "native_datetime=True" on create_engine():: - -    engine = create_engine('sqlite://', -        connect_args={'detect_types': -            sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES}, -        native_datetime=True -    ) - -With this flag enabled, the DATE and TIMESTAMP types (but note - not the -DATETIME or TIME types...confused yet ?) will not perform any bind parameter -or result processing. Execution of "func.current_date()" will return a string. -"func.current_timestamp()" is registered as returning a DATETIME type in -SQLAlchemy, so this function still receives SQLAlchemy-level result -processing. - -.. _pysqlite_threading_pooling: - -Threading/Pooling Behavior ---------------------------- - -The ``sqlite3`` DBAPI by default prohibits the use of a particular connection -in a thread which is not the one in which it was created.  As SQLite has -matured, it's behavior under multiple threads has improved, and even includes -options for memory only databases to be used in multiple threads. - -The thread prohibition is known as "check same thread" and may be controlled -using the ``sqlite3`` parameter ``check_same_thread``, which will disable or -enable this check. SQLAlchemy's default behavior here is to set -``check_same_thread`` to ``False`` automatically whenever a file-based database -is in use, to establish compatibility with the default pool class -:class:`.QueuePool`. - -The SQLAlchemy ``pysqlite`` DBAPI establishes the connection pool differently -based on the kind of SQLite database that's requested: - -* When a ``:memory:`` SQLite database is specified, the dialect by default -  will use :class:`.SingletonThreadPool`. This pool maintains a single -  connection per thread, so that all access to the engine within the current -  thread use the same ``:memory:`` database - other threads would access a -  different ``:memory:`` database.  The ``check_same_thread`` parameter -  defaults to ``True``. -* When a file-based database is specified, the dialect will use -  :class:`.QueuePool` as the source of connections.   at the same time, -  the ``check_same_thread`` flag is set to False by default unless overridden. - -  .. versionchanged:: 2.0 - -    SQLite file database engines now use :class:`.QueuePool` by default. -    Previously, :class:`.NullPool` were used.  The :class:`.NullPool` class -    may be used by specifying it via the -    :paramref:`_sa.create_engine.poolclass` parameter. - -Disabling Connection Pooling for File Databases -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -Pooling may be disabled for a file based database by specifying the -:class:`.NullPool` implementation for the :func:`_sa.create_engine.poolclass` -parameter:: - -    from sqlalchemy import NullPool -    engine = create_engine("sqlite:///myfile.db", poolclass=NullPool) - -It's been observed that the :class:`.NullPool` implementation incurs an -extremely small performance overhead for repeated checkouts due to the lack of -connection re-use implemented by :class:`.QueuePool`.  However, it still -may be beneficial to use this class if the application is experiencing -issues with files being locked. - -Using a Memory Database in Multiple Threads -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -To use a ``:memory:`` database in a multithreaded scenario, the same -connection object must be shared among threads, since the database exists -only within the scope of that connection.   The -:class:`.StaticPool` implementation will maintain a single connection -globally, and the ``check_same_thread`` flag can be passed to Pysqlite -as ``False``:: - -    from sqlalchemy.pool import StaticPool -    engine = create_engine('sqlite://', -                        connect_args={'check_same_thread':False}, -                        poolclass=StaticPool) - -Note that using a ``:memory:`` database in multiple threads requires a recent -version of SQLite. - -Using Temporary Tables with SQLite -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -Due to the way SQLite deals with temporary tables, if you wish to use a -temporary table in a file-based SQLite database across multiple checkouts -from the connection pool, such as when using an ORM :class:`.Session` where -the temporary table should continue to remain after :meth:`.Session.commit` or -:meth:`.Session.rollback` is called, a pool which maintains a single -connection must be used.   Use :class:`.SingletonThreadPool` if the scope is -only needed within the current thread, or :class:`.StaticPool` is scope is -needed within multiple threads for this case:: - -    # maintain the same connection per thread -    from sqlalchemy.pool import SingletonThreadPool -    engine = create_engine('sqlite:///mydb.db', -                        poolclass=SingletonThreadPool) - - -    # maintain the same connection across all threads -    from sqlalchemy.pool import StaticPool -    engine = create_engine('sqlite:///mydb.db', -                        poolclass=StaticPool) - -Note that :class:`.SingletonThreadPool` should be configured for the number -of threads that are to be used; beyond that number, connections will be -closed out in a non deterministic way. - - -Dealing with Mixed String / Binary Columns ------------------------------------------------------- - -The SQLite database is weakly typed, and as such it is possible when using -binary values, which in Python are represented as ``b'some string'``, that a -particular SQLite database can have data values within different rows where -some of them will be returned as a ``b''`` value by the Pysqlite driver, and -others will be returned as Python strings, e.g. ``''`` values.   This situation -is not known to occur if the SQLAlchemy :class:`.LargeBinary` datatype is used -consistently, however if a particular SQLite database has data that was -inserted using the Pysqlite driver directly, or when using the SQLAlchemy -:class:`.String` type which was later changed to :class:`.LargeBinary`, the -table will not be consistently readable because SQLAlchemy's -:class:`.LargeBinary` datatype does not handle strings so it has no way of -"encoding" a value that is in string format. - -To deal with a SQLite table that has mixed string / binary data in the -same column, use a custom type that will check each row individually:: - -    from sqlalchemy import String -    from sqlalchemy import TypeDecorator - -    class MixedBinary(TypeDecorator): -        impl = String -        cache_ok = True - -        def process_result_value(self, value, dialect): -            if isinstance(value, str): -                value = bytes(value, 'utf-8') -            elif value is not None: -                value = bytes(value) - -            return value - -Then use the above ``MixedBinary`` datatype in the place where -:class:`.LargeBinary` would normally be used. - -.. _pysqlite_serializable: - -Serializable isolation / Savepoints / Transactional DDL -------------------------------------------------------- - -In the section :ref:`sqlite_concurrency`, we refer to the pysqlite -driver's assortment of issues that prevent several features of SQLite -from working correctly.  The pysqlite DBAPI driver has several -long-standing bugs which impact the correctness of its transactional -behavior.   In its default mode of operation, SQLite features such as -SERIALIZABLE isolation, transactional DDL, and SAVEPOINT support are -non-functional, and in order to use these features, workarounds must -be taken. - -The issue is essentially that the driver attempts to second-guess the user's -intent, failing to start transactions and sometimes ending them prematurely, in -an effort to minimize the SQLite databases's file locking behavior, even -though SQLite itself uses "shared" locks for read-only activities. - -SQLAlchemy chooses to not alter this behavior by default, as it is the -long-expected behavior of the pysqlite driver; if and when the pysqlite -driver attempts to repair these issues, that will be more of a driver towards -defaults for SQLAlchemy. - -The good news is that with a few events, we can implement transactional -support fully, by disabling pysqlite's feature entirely and emitting BEGIN -ourselves. This is achieved using two event listeners:: - -    from sqlalchemy import create_engine, event - -    engine = create_engine("sqlite:///myfile.db") - -    @event.listens_for(engine, "connect") -    def do_connect(dbapi_connection, connection_record): -        # disable pysqlite's emitting of the BEGIN statement entirely. -        # also stops it from emitting COMMIT before any DDL. -        dbapi_connection.isolation_level = None - -    @event.listens_for(engine, "begin") -    def do_begin(conn): -        # emit our own BEGIN -        conn.exec_driver_sql("BEGIN") - -.. warning:: When using the above recipe, it is advised to not use the -   :paramref:`.Connection.execution_options.isolation_level` setting on -   :class:`_engine.Connection` and :func:`_sa.create_engine` -   with the SQLite driver, -   as this function necessarily will also alter the ".isolation_level" setting. - - -Above, we intercept a new pysqlite connection and disable any transactional -integration.   Then, at the point at which SQLAlchemy knows that transaction -scope is to begin, we emit ``"BEGIN"`` ourselves. - -When we take control of ``"BEGIN"``, we can also control directly SQLite's -locking modes, introduced at -`BEGIN TRANSACTION <https://sqlite.org/lang_transaction.html>`_, -by adding the desired locking mode to our ``"BEGIN"``:: - -    @event.listens_for(engine, "begin") -    def do_begin(conn): -        conn.exec_driver_sql("BEGIN EXCLUSIVE") - -.. seealso:: - -    `BEGIN TRANSACTION <https://sqlite.org/lang_transaction.html>`_ - -    on the SQLite site - -    `sqlite3 SELECT does not BEGIN a transaction <https://bugs.python.org/issue9924>`_ - -    on the Python bug tracker - -    `sqlite3 module breaks transactions and potentially corrupts data <https://bugs.python.org/issue10740>`_ - -    on the Python bug tracker - -.. _pysqlite_udfs: - -User-Defined Functions ----------------------- - -pysqlite supports a `create_function() <https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.create_function>`_ -method that allows us to create our own user-defined functions (UDFs) in Python and use them directly in SQLite queries. -These functions are registered with a specific DBAPI Connection. - -SQLAlchemy uses connection pooling with file-based SQLite databases, so we need to ensure that the UDF is attached to the -connection when it is created. That is accomplished with an event listener:: - -    from sqlalchemy import create_engine -    from sqlalchemy import event -    from sqlalchemy import text - - -    def udf(): -        return "udf-ok" - - -    engine = create_engine("sqlite:///./db_file") - - -    @event.listens_for(engine, "connect") -    def connect(conn, rec): -        conn.create_function("udf", 0, udf) - - -    for i in range(5): -        with engine.connect() as conn: -            print(conn.scalar(text("SELECT UDF()"))) - - -"""  # noqa - -import math -import os -import re - -from .base import DATE -from .base import DATETIME -from .base import SQLiteDialect -from ... import exc -from ... import pool -from ... import types as sqltypes -from ... import util - - -class _SQLite_pysqliteTimeStamp(DATETIME): -    def bind_processor(self, dialect): -        if dialect.native_datetime: -            return None -        else: -            return DATETIME.bind_processor(self, dialect) - -    def result_processor(self, dialect, coltype): -        if dialect.native_datetime: -            return None -        else: -            return DATETIME.result_processor(self, dialect, coltype) - - -class _SQLite_pysqliteDate(DATE): -    def bind_processor(self, dialect): -        if dialect.native_datetime: -            return None -        else: -            return DATE.bind_processor(self, dialect) - -    def result_processor(self, dialect, coltype): -        if dialect.native_datetime: -            return None -        else: -            return DATE.result_processor(self, dialect, coltype) - - -class SQLiteDialect_pysqlite(SQLiteDialect): -    default_paramstyle = "qmark" -    supports_statement_cache = True -    returns_native_bytes = True - -    colspecs = util.update_copy( -        SQLiteDialect.colspecs, -        { -            sqltypes.Date: _SQLite_pysqliteDate, -            sqltypes.TIMESTAMP: _SQLite_pysqliteTimeStamp, -        }, -    ) - -    description_encoding = None - -    driver = "pysqlite" - -    @classmethod -    def import_dbapi(cls): -        from sqlite3 import dbapi2 as sqlite - -        return sqlite - -    @classmethod -    def _is_url_file_db(cls, url): -        if (url.database and url.database != ":memory:") and ( -            url.query.get("mode", None) != "memory" -        ): -            return True -        else: -            return False - -    @classmethod -    def get_pool_class(cls, url): -        if cls._is_url_file_db(url): -            return pool.QueuePool -        else: -            return pool.SingletonThreadPool - -    def _get_server_version_info(self, connection): -        return self.dbapi.sqlite_version_info - -    _isolation_lookup = SQLiteDialect._isolation_lookup.union( -        { -            "AUTOCOMMIT": None, -        } -    ) - -    def set_isolation_level(self, dbapi_connection, level): -        if level == "AUTOCOMMIT": -            dbapi_connection.isolation_level = None -        else: -            dbapi_connection.isolation_level = "" -            return super().set_isolation_level(dbapi_connection, level) - -    def on_connect(self): -        def regexp(a, b): -            if b is None: -                return None -            return re.search(a, b) is not None - -        if util.py38 and self._get_server_version_info(None) >= (3, 9): -            # sqlite must be greater than 3.8.3 for deterministic=True -            # https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.create_function -            # the check is more conservative since there were still issues -            # with following 3.8 sqlite versions -            create_func_kw = {"deterministic": True} -        else: -            create_func_kw = {} - -        def set_regexp(dbapi_connection): -            dbapi_connection.create_function( -                "regexp", 2, regexp, **create_func_kw -            ) - -        def floor_func(dbapi_connection): -            # NOTE: floor is optionally present in sqlite 3.35+ , however -            # as it is normally non-present we deliver floor() unconditionally -            # for now. -            # https://www.sqlite.org/lang_mathfunc.html -            dbapi_connection.create_function( -                "floor", 1, math.floor, **create_func_kw -            ) - -        fns = [set_regexp, floor_func] - -        def connect(conn): -            for fn in fns: -                fn(conn) - -        return connect - -    def create_connect_args(self, url): -        if url.username or url.password or url.host or url.port: -            raise exc.ArgumentError( -                "Invalid SQLite URL: %s\n" -                "Valid SQLite URL forms are:\n" -                " sqlite:///:memory: (or, sqlite://)\n" -                " sqlite:///relative/path/to/file.db\n" -                " sqlite:////absolute/path/to/file.db" % (url,) -            ) - -        # theoretically, this list can be augmented, at least as far as -        # parameter names accepted by sqlite3/pysqlite, using -        # inspect.getfullargspec().  for the moment this seems like overkill -        # as these parameters don't change very often, and as always, -        # parameters passed to connect_args will always go to the -        # sqlite3/pysqlite driver. -        pysqlite_args = [ -            ("uri", bool), -            ("timeout", float), -            ("isolation_level", str), -            ("detect_types", int), -            ("check_same_thread", bool), -            ("cached_statements", int), -        ] -        opts = url.query -        pysqlite_opts = {} -        for key, type_ in pysqlite_args: -            util.coerce_kw_type(opts, key, type_, dest=pysqlite_opts) - -        if pysqlite_opts.get("uri", False): -            uri_opts = dict(opts) -            # here, we are actually separating the parameters that go to -            # sqlite3/pysqlite vs. those that go the SQLite URI.  What if -            # two names conflict?  again, this seems to be not the case right -            # now, and in the case that new names are added to -            # either side which overlap, again the sqlite3/pysqlite parameters -            # can be passed through connect_args instead of in the URL. -            # If SQLite native URIs add a parameter like "timeout" that -            # we already have listed here for the python driver, then we need -            # to adjust for that here. -            for key, type_ in pysqlite_args: -                uri_opts.pop(key, None) -            filename = url.database -            if uri_opts: -                # sorting of keys is for unit test support -                filename += "?" + ( -                    "&".join( -                        "%s=%s" % (key, uri_opts[key]) -                        for key in sorted(uri_opts) -                    ) -                ) -        else: -            filename = url.database or ":memory:" -            if filename != ":memory:": -                filename = os.path.abspath(filename) - -        pysqlite_opts.setdefault( -            "check_same_thread", not self._is_url_file_db(url) -        ) - -        return ([filename], pysqlite_opts) - -    def is_disconnect(self, e, connection, cursor): -        return isinstance( -            e, self.dbapi.ProgrammingError -        ) and "Cannot operate on a closed database." in str(e) - - -dialect = SQLiteDialect_pysqlite - - -class _SQLiteDialect_pysqlite_numeric(SQLiteDialect_pysqlite): -    """numeric dialect for testing only - -    internal use only.  This dialect is **NOT** supported by SQLAlchemy -    and may change at any time. - -    """ - -    supports_statement_cache = True -    default_paramstyle = "numeric" -    driver = "pysqlite_numeric" - -    _first_bind = ":1" -    _not_in_statement_regexp = None - -    def __init__(self, *arg, **kw): -        kw.setdefault("paramstyle", "numeric") -        super().__init__(*arg, **kw) - -    def create_connect_args(self, url): -        arg, opts = super().create_connect_args(url) -        opts["factory"] = self._fix_sqlite_issue_99953() -        return arg, opts - -    def _fix_sqlite_issue_99953(self): -        import sqlite3 - -        first_bind = self._first_bind -        if self._not_in_statement_regexp: -            nis = self._not_in_statement_regexp - -            def _test_sql(sql): -                m = nis.search(sql) -                assert not m, f"Found {nis.pattern!r} in {sql!r}" - -        else: - -            def _test_sql(sql): -                pass - -        def _numeric_param_as_dict(parameters): -            if parameters: -                assert isinstance(parameters, tuple) -                return { -                    str(idx): value for idx, value in enumerate(parameters, 1) -                } -            else: -                return () - -        class SQLiteFix99953Cursor(sqlite3.Cursor): -            def execute(self, sql, parameters=()): -                _test_sql(sql) -                if first_bind in sql: -                    parameters = _numeric_param_as_dict(parameters) -                return super().execute(sql, parameters) - -            def executemany(self, sql, parameters): -                _test_sql(sql) -                if first_bind in sql: -                    parameters = [ -                        _numeric_param_as_dict(p) for p in parameters -                    ] -                return super().executemany(sql, parameters) - -        class SQLiteFix99953Connection(sqlite3.Connection): -            def cursor(self, factory=None): -                if factory is None: -                    factory = SQLiteFix99953Cursor -                return super().cursor(factory=factory) - -            def execute(self, sql, parameters=()): -                _test_sql(sql) -                if first_bind in sql: -                    parameters = _numeric_param_as_dict(parameters) -                return super().execute(sql, parameters) - -            def executemany(self, sql, parameters): -                _test_sql(sql) -                if first_bind in sql: -                    parameters = [ -                        _numeric_param_as_dict(p) for p in parameters -                    ] -                return super().executemany(sql, parameters) - -        return SQLiteFix99953Connection - - -class _SQLiteDialect_pysqlite_dollar(_SQLiteDialect_pysqlite_numeric): -    """numeric dialect that uses $ for testing only - -    internal use only.  This dialect is **NOT** supported by SQLAlchemy -    and may change at any time. - -    """ - -    supports_statement_cache = True -    default_paramstyle = "numeric_dollar" -    driver = "pysqlite_dollar" - -    _first_bind = "$1" -    _not_in_statement_regexp = re.compile(r"[^\d]:\d+") - -    def __init__(self, *arg, **kw): -        kw.setdefault("paramstyle", "numeric_dollar") -        super().__init__(*arg, **kw) | 
