diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite')
16 files changed, 0 insertions, 4676 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__init__.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__init__.py deleted file mode 100644 index 45f088e..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__init__.py +++ /dev/null @@ -1,57 +0,0 @@ -# dialects/sqlite/__init__.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - - -from . import aiosqlite # noqa -from . import base # noqa -from . import pysqlcipher # noqa -from . import pysqlite # noqa -from .base import BLOB -from .base import BOOLEAN -from .base import CHAR -from .base import DATE -from .base import DATETIME -from .base import DECIMAL -from .base import FLOAT -from .base import INTEGER -from .base import JSON -from .base import NUMERIC -from .base import REAL -from .base import SMALLINT -from .base import TEXT -from .base import TIME -from .base import TIMESTAMP -from .base import VARCHAR -from .dml import Insert -from .dml import insert - -# default dialect -base.dialect = dialect = pysqlite.dialect - - -__all__ = ( - "BLOB", - "BOOLEAN", - "CHAR", - "DATE", - "DATETIME", - "DECIMAL", - "FLOAT", - "INTEGER", - "JSON", - "NUMERIC", - "SMALLINT", - "TEXT", - "TIME", - "TIMESTAMP", - "VARCHAR", - "REAL", - "Insert", - "insert", - "dialect", -) diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/__init__.cpython-311.pyc Binary files differdeleted file mode 100644 index e4a9b51..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/__init__.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/aiosqlite.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/aiosqlite.cpython-311.pyc Binary files differdeleted file mode 100644 index 41466a4..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/aiosqlite.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/base.cpython-311.pyc Binary files differdeleted file mode 100644 index e7f5c22..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/base.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/dml.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/dml.cpython-311.pyc Binary files differdeleted file mode 100644 index eb0f448..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/dml.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/json.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/json.cpython-311.pyc Binary files differdeleted file mode 100644 index ad4323c..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/json.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/provision.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/provision.cpython-311.pyc Binary files differdeleted file mode 100644 index d139ba3..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/provision.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlcipher.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlcipher.cpython-311.pyc Binary files differdeleted file mode 100644 index d26e7b3..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlcipher.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlite.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlite.cpython-311.pyc Binary files differdeleted file mode 100644 index df08288..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlite.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py deleted file mode 100644 index 6c91563..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py +++ /dev/null @@ -1,396 +0,0 @@ -# dialects/sqlite/aiosqlite.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - - -r""" - -.. dialect:: sqlite+aiosqlite - :name: aiosqlite - :dbapi: aiosqlite - :connectstring: sqlite+aiosqlite:///file_path - :url: https://pypi.org/project/aiosqlite/ - -The aiosqlite dialect provides support for the SQLAlchemy asyncio interface -running on top of pysqlite. - -aiosqlite is a wrapper around pysqlite that uses a background thread for -each connection. It does not actually use non-blocking IO, as SQLite -databases are not socket-based. However it does provide a working asyncio -interface that's useful for testing and prototyping purposes. - -Using a special asyncio mediation layer, the aiosqlite dialect is usable -as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` -extension package. - -This dialect should normally be used only with the -:func:`_asyncio.create_async_engine` engine creation function:: - - from sqlalchemy.ext.asyncio import create_async_engine - engine = create_async_engine("sqlite+aiosqlite:///filename") - -The URL passes through all arguments to the ``pysqlite`` driver, so all -connection arguments are the same as they are for that of :ref:`pysqlite`. - -.. _aiosqlite_udfs: - -User-Defined Functions ----------------------- - -aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs) -in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`. - -.. _aiosqlite_serializable: - -Serializable isolation / Savepoints / Transactional DDL (asyncio version) -------------------------------------------------------------------------- - -Similarly to pysqlite, aiosqlite does not support SAVEPOINT feature. - -The solution is similar to :ref:`pysqlite_serializable`. This is achieved by the event listeners in async:: - - from sqlalchemy import create_engine, event - from sqlalchemy.ext.asyncio import create_async_engine - - engine = create_async_engine("sqlite+aiosqlite:///myfile.db") - - @event.listens_for(engine.sync_engine, "connect") - def do_connect(dbapi_connection, connection_record): - # disable aiosqlite's emitting of the BEGIN statement entirely. - # also stops it from emitting COMMIT before any DDL. - dbapi_connection.isolation_level = None - - @event.listens_for(engine.sync_engine, "begin") - def do_begin(conn): - # emit our own BEGIN - conn.exec_driver_sql("BEGIN") - -.. warning:: When using the above recipe, it is advised to not use the - :paramref:`.Connection.execution_options.isolation_level` setting on - :class:`_engine.Connection` and :func:`_sa.create_engine` - with the SQLite driver, - as this function necessarily will also alter the ".isolation_level" setting. - -""" # noqa - -import asyncio -from functools import partial - -from .base import SQLiteExecutionContext -from .pysqlite import SQLiteDialect_pysqlite -from ... import pool -from ... import util -from ...engine import AdaptedConnection -from ...util.concurrency import await_fallback -from ...util.concurrency import await_only - - -class AsyncAdapt_aiosqlite_cursor: - # TODO: base on connectors/asyncio.py - # see #10415 - - __slots__ = ( - "_adapt_connection", - "_connection", - "description", - "await_", - "_rows", - "arraysize", - "rowcount", - "lastrowid", - ) - - server_side = False - - def __init__(self, adapt_connection): - self._adapt_connection = adapt_connection - self._connection = adapt_connection._connection - self.await_ = adapt_connection.await_ - self.arraysize = 1 - self.rowcount = -1 - self.description = None - self._rows = [] - - def close(self): - self._rows[:] = [] - - def execute(self, operation, parameters=None): - try: - _cursor = self.await_(self._connection.cursor()) - - if parameters is None: - self.await_(_cursor.execute(operation)) - else: - self.await_(_cursor.execute(operation, parameters)) - - if _cursor.description: - self.description = _cursor.description - self.lastrowid = self.rowcount = -1 - - if not self.server_side: - self._rows = self.await_(_cursor.fetchall()) - else: - self.description = None - self.lastrowid = _cursor.lastrowid - self.rowcount = _cursor.rowcount - - if not self.server_side: - self.await_(_cursor.close()) - else: - self._cursor = _cursor - except Exception as error: - self._adapt_connection._handle_exception(error) - - def executemany(self, operation, seq_of_parameters): - try: - _cursor = self.await_(self._connection.cursor()) - self.await_(_cursor.executemany(operation, seq_of_parameters)) - self.description = None - self.lastrowid = _cursor.lastrowid - self.rowcount = _cursor.rowcount - self.await_(_cursor.close()) - except Exception as error: - self._adapt_connection._handle_exception(error) - - def setinputsizes(self, *inputsizes): - pass - - def __iter__(self): - while self._rows: - yield self._rows.pop(0) - - def fetchone(self): - if self._rows: - return self._rows.pop(0) - else: - return None - - def fetchmany(self, size=None): - if size is None: - size = self.arraysize - - retval = self._rows[0:size] - self._rows[:] = self._rows[size:] - return retval - - def fetchall(self): - retval = self._rows[:] - self._rows[:] = [] - return retval - - -class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor): - # TODO: base on connectors/asyncio.py - # see #10415 - __slots__ = "_cursor" - - server_side = True - - def __init__(self, *arg, **kw): - super().__init__(*arg, **kw) - self._cursor = None - - def close(self): - if self._cursor is not None: - self.await_(self._cursor.close()) - self._cursor = None - - def fetchone(self): - return self.await_(self._cursor.fetchone()) - - def fetchmany(self, size=None): - if size is None: - size = self.arraysize - return self.await_(self._cursor.fetchmany(size=size)) - - def fetchall(self): - return self.await_(self._cursor.fetchall()) - - -class AsyncAdapt_aiosqlite_connection(AdaptedConnection): - await_ = staticmethod(await_only) - __slots__ = ("dbapi",) - - def __init__(self, dbapi, connection): - self.dbapi = dbapi - self._connection = connection - - @property - def isolation_level(self): - return self._connection.isolation_level - - @isolation_level.setter - def isolation_level(self, value): - # aiosqlite's isolation_level setter works outside the Thread - # that it's supposed to, necessitating setting check_same_thread=False. - # for improved stability, we instead invent our own awaitable version - # using aiosqlite's async queue directly. - - def set_iso(connection, value): - connection.isolation_level = value - - function = partial(set_iso, self._connection._conn, value) - future = asyncio.get_event_loop().create_future() - - self._connection._tx.put_nowait((future, function)) - - try: - return self.await_(future) - except Exception as error: - self._handle_exception(error) - - def create_function(self, *args, **kw): - try: - self.await_(self._connection.create_function(*args, **kw)) - except Exception as error: - self._handle_exception(error) - - def cursor(self, server_side=False): - if server_side: - return AsyncAdapt_aiosqlite_ss_cursor(self) - else: - return AsyncAdapt_aiosqlite_cursor(self) - - def execute(self, *args, **kw): - return self.await_(self._connection.execute(*args, **kw)) - - def rollback(self): - try: - self.await_(self._connection.rollback()) - except Exception as error: - self._handle_exception(error) - - def commit(self): - try: - self.await_(self._connection.commit()) - except Exception as error: - self._handle_exception(error) - - def close(self): - try: - self.await_(self._connection.close()) - except ValueError: - # this is undocumented for aiosqlite, that ValueError - # was raised if .close() was called more than once, which is - # both not customary for DBAPI and is also not a DBAPI.Error - # exception. This is now fixed in aiosqlite via my PR - # https://github.com/omnilib/aiosqlite/pull/238, so we can be - # assured this will not become some other kind of exception, - # since it doesn't raise anymore. - - pass - except Exception as error: - self._handle_exception(error) - - def _handle_exception(self, error): - if ( - isinstance(error, ValueError) - and error.args[0] == "no active connection" - ): - raise self.dbapi.sqlite.OperationalError( - "no active connection" - ) from error - else: - raise error - - -class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection): - __slots__ = () - - await_ = staticmethod(await_fallback) - - -class AsyncAdapt_aiosqlite_dbapi: - def __init__(self, aiosqlite, sqlite): - self.aiosqlite = aiosqlite - self.sqlite = sqlite - self.paramstyle = "qmark" - self._init_dbapi_attributes() - - def _init_dbapi_attributes(self): - for name in ( - "DatabaseError", - "Error", - "IntegrityError", - "NotSupportedError", - "OperationalError", - "ProgrammingError", - "sqlite_version", - "sqlite_version_info", - ): - setattr(self, name, getattr(self.aiosqlite, name)) - - for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"): - setattr(self, name, getattr(self.sqlite, name)) - - for name in ("Binary",): - setattr(self, name, getattr(self.sqlite, name)) - - def connect(self, *arg, **kw): - async_fallback = kw.pop("async_fallback", False) - - creator_fn = kw.pop("async_creator_fn", None) - if creator_fn: - connection = creator_fn(*arg, **kw) - else: - connection = self.aiosqlite.connect(*arg, **kw) - # it's a Thread. you'll thank us later - connection.daemon = True - - if util.asbool(async_fallback): - return AsyncAdaptFallback_aiosqlite_connection( - self, - await_fallback(connection), - ) - else: - return AsyncAdapt_aiosqlite_connection( - self, - await_only(connection), - ) - - -class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext): - def create_server_side_cursor(self): - return self._dbapi_connection.cursor(server_side=True) - - -class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite): - driver = "aiosqlite" - supports_statement_cache = True - - is_async = True - - supports_server_side_cursors = True - - execution_ctx_cls = SQLiteExecutionContext_aiosqlite - - @classmethod - def import_dbapi(cls): - return AsyncAdapt_aiosqlite_dbapi( - __import__("aiosqlite"), __import__("sqlite3") - ) - - @classmethod - def get_pool_class(cls, url): - if cls._is_url_file_db(url): - return pool.NullPool - else: - return pool.StaticPool - - def is_disconnect(self, e, connection, cursor): - if isinstance( - e, self.dbapi.OperationalError - ) and "no active connection" in str(e): - return True - - return super().is_disconnect(e, connection, cursor) - - def get_driver_connection(self, connection): - return connection._connection - - -dialect = SQLiteDialect_aiosqlite diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/base.py deleted file mode 100644 index 6db8214..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/base.py +++ /dev/null @@ -1,2782 +0,0 @@ -# dialects/sqlite/base.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - - -r""" -.. dialect:: sqlite - :name: SQLite - :full_support: 3.36.0 - :normal_support: 3.12+ - :best_effort: 3.7.16+ - -.. _sqlite_datetime: - -Date and Time Types -------------------- - -SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does -not provide out of the box functionality for translating values between Python -`datetime` objects and a SQLite-supported format. SQLAlchemy's own -:class:`~sqlalchemy.types.DateTime` and related types provide date formatting -and parsing functionality when SQLite is used. The implementation classes are -:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`. -These types represent dates and times as ISO formatted strings, which also -nicely support ordering. There's no reliance on typical "libc" internals for -these functions so historical dates are fully supported. - -Ensuring Text affinity -^^^^^^^^^^^^^^^^^^^^^^ - -The DDL rendered for these types is the standard ``DATE``, ``TIME`` -and ``DATETIME`` indicators. However, custom storage formats can also be -applied to these types. When the -storage format is detected as containing no alpha characters, the DDL for -these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``, -so that the column continues to have textual affinity. - -.. seealso:: - - `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ - - in the SQLite documentation - -.. _sqlite_autoincrement: - -SQLite Auto Incrementing Behavior ----------------------------------- - -Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html - -Key concepts: - -* SQLite has an implicit "auto increment" feature that takes place for any - non-composite primary-key column that is specifically created using - "INTEGER PRIMARY KEY" for the type + primary key. - -* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not** - equivalent to the implicit autoincrement feature; this keyword is not - recommended for general use. SQLAlchemy does not render this keyword - unless a special SQLite-specific directive is used (see below). However, - it still requires that the column's type is named "INTEGER". - -Using the AUTOINCREMENT Keyword -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -To specifically render the AUTOINCREMENT keyword on the primary key column -when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table -construct:: - - Table('sometable', metadata, - Column('id', Integer, primary_key=True), - sqlite_autoincrement=True) - -Allowing autoincrement behavior SQLAlchemy types other than Integer/INTEGER -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -SQLite's typing model is based on naming conventions. Among other things, this -means that any type name which contains the substring ``"INT"`` will be -determined to be of "integer affinity". A type named ``"BIGINT"``, -``"SPECIAL_INT"`` or even ``"XYZINTQPR"``, will be considered by SQLite to be -of "integer" affinity. However, **the SQLite autoincrement feature, whether -implicitly or explicitly enabled, requires that the name of the column's type -is exactly the string "INTEGER"**. Therefore, if an application uses a type -like :class:`.BigInteger` for a primary key, on SQLite this type will need to -be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE -TABLE`` statement in order for the autoincrement behavior to be available. - -One approach to achieve this is to use :class:`.Integer` on SQLite -only using :meth:`.TypeEngine.with_variant`:: - - table = Table( - "my_table", metadata, - Column("id", BigInteger().with_variant(Integer, "sqlite"), primary_key=True) - ) - -Another is to use a subclass of :class:`.BigInteger` that overrides its DDL -name to be ``INTEGER`` when compiled against SQLite:: - - from sqlalchemy import BigInteger - from sqlalchemy.ext.compiler import compiles - - class SLBigInteger(BigInteger): - pass - - @compiles(SLBigInteger, 'sqlite') - def bi_c(element, compiler, **kw): - return "INTEGER" - - @compiles(SLBigInteger) - def bi_c(element, compiler, **kw): - return compiler.visit_BIGINT(element, **kw) - - - table = Table( - "my_table", metadata, - Column("id", SLBigInteger(), primary_key=True) - ) - -.. seealso:: - - :meth:`.TypeEngine.with_variant` - - :ref:`sqlalchemy.ext.compiler_toplevel` - - `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_ - -.. _sqlite_concurrency: - -Database Locking Behavior / Concurrency ---------------------------------------- - -SQLite is not designed for a high level of write concurrency. The database -itself, being a file, is locked completely during write operations within -transactions, meaning exactly one "connection" (in reality a file handle) -has exclusive access to the database during this period - all other -"connections" will be blocked during this time. - -The Python DBAPI specification also calls for a connection model that is -always in a transaction; there is no ``connection.begin()`` method, -only ``connection.commit()`` and ``connection.rollback()``, upon which a -new transaction is to be begun immediately. This may seem to imply -that the SQLite driver would in theory allow only a single filehandle on a -particular database file at any time; however, there are several -factors both within SQLite itself as well as within the pysqlite driver -which loosen this restriction significantly. - -However, no matter what locking modes are used, SQLite will still always -lock the database file once a transaction is started and DML (e.g. INSERT, -UPDATE, DELETE) has at least been emitted, and this will block -other transactions at least at the point that they also attempt to emit DML. -By default, the length of time on this block is very short before it times out -with an error. - -This behavior becomes more critical when used in conjunction with the -SQLAlchemy ORM. SQLAlchemy's :class:`.Session` object by default runs -within a transaction, and with its autoflush model, may emit DML preceding -any SELECT statement. This may lead to a SQLite database that locks -more quickly than is expected. The locking mode of SQLite and the pysqlite -driver can be manipulated to some degree, however it should be noted that -achieving a high degree of write-concurrency with SQLite is a losing battle. - -For more information on SQLite's lack of write concurrency by design, please -see -`Situations Where Another RDBMS May Work Better - High Concurrency -<https://www.sqlite.org/whentouse.html>`_ near the bottom of the page. - -The following subsections introduce areas that are impacted by SQLite's -file-based architecture and additionally will usually require workarounds to -work when using the pysqlite driver. - -.. _sqlite_isolation_level: - -Transaction Isolation Level / Autocommit ----------------------------------------- - -SQLite supports "transaction isolation" in a non-standard way, along two -axes. One is that of the -`PRAGMA read_uncommitted <https://www.sqlite.org/pragma.html#pragma_read_uncommitted>`_ -instruction. This setting can essentially switch SQLite between its -default mode of ``SERIALIZABLE`` isolation, and a "dirty read" isolation -mode normally referred to as ``READ UNCOMMITTED``. - -SQLAlchemy ties into this PRAGMA statement using the -:paramref:`_sa.create_engine.isolation_level` parameter of -:func:`_sa.create_engine`. -Valid values for this parameter when used with SQLite are ``"SERIALIZABLE"`` -and ``"READ UNCOMMITTED"`` corresponding to a value of 0 and 1, respectively. -SQLite defaults to ``SERIALIZABLE``, however its behavior is impacted by -the pysqlite driver's default behavior. - -When using the pysqlite driver, the ``"AUTOCOMMIT"`` isolation level is also -available, which will alter the pysqlite connection using the ``.isolation_level`` -attribute on the DBAPI connection and set it to None for the duration -of the setting. - -.. versionadded:: 1.3.16 added support for SQLite AUTOCOMMIT isolation level - when using the pysqlite / sqlite3 SQLite driver. - - -The other axis along which SQLite's transactional locking is impacted is -via the nature of the ``BEGIN`` statement used. The three varieties -are "deferred", "immediate", and "exclusive", as described at -`BEGIN TRANSACTION <https://sqlite.org/lang_transaction.html>`_. A straight -``BEGIN`` statement uses the "deferred" mode, where the database file is -not locked until the first read or write operation, and read access remains -open to other transactions until the first write operation. But again, -it is critical to note that the pysqlite driver interferes with this behavior -by *not even emitting BEGIN* until the first write operation. - -.. warning:: - - SQLite's transactional scope is impacted by unresolved - issues in the pysqlite driver, which defers BEGIN statements to a greater - degree than is often feasible. See the section :ref:`pysqlite_serializable` - or :ref:`aiosqlite_serializable` for techniques to work around this behavior. - -.. seealso:: - - :ref:`dbapi_autocommit` - -INSERT/UPDATE/DELETE...RETURNING ---------------------------------- - -The SQLite dialect supports SQLite 3.35's ``INSERT|UPDATE|DELETE..RETURNING`` -syntax. ``INSERT..RETURNING`` may be used -automatically in some cases in order to fetch newly generated identifiers in -place of the traditional approach of using ``cursor.lastrowid``, however -``cursor.lastrowid`` is currently still preferred for simple single-statement -cases for its better performance. - -To specify an explicit ``RETURNING`` clause, use the -:meth:`._UpdateBase.returning` method on a per-statement basis:: - - # INSERT..RETURNING - result = connection.execute( - table.insert(). - values(name='foo'). - returning(table.c.col1, table.c.col2) - ) - print(result.all()) - - # UPDATE..RETURNING - result = connection.execute( - table.update(). - where(table.c.name=='foo'). - values(name='bar'). - returning(table.c.col1, table.c.col2) - ) - print(result.all()) - - # DELETE..RETURNING - result = connection.execute( - table.delete(). - where(table.c.name=='foo'). - returning(table.c.col1, table.c.col2) - ) - print(result.all()) - -.. versionadded:: 2.0 Added support for SQLite RETURNING - -SAVEPOINT Support ----------------------------- - -SQLite supports SAVEPOINTs, which only function once a transaction is -begun. SQLAlchemy's SAVEPOINT support is available using the -:meth:`_engine.Connection.begin_nested` method at the Core level, and -:meth:`.Session.begin_nested` at the ORM level. However, SAVEPOINTs -won't work at all with pysqlite unless workarounds are taken. - -.. warning:: - - SQLite's SAVEPOINT feature is impacted by unresolved - issues in the pysqlite and aiosqlite drivers, which defer BEGIN statements - to a greater degree than is often feasible. See the sections - :ref:`pysqlite_serializable` and :ref:`aiosqlite_serializable` - for techniques to work around this behavior. - -Transactional DDL ----------------------------- - -The SQLite database supports transactional :term:`DDL` as well. -In this case, the pysqlite driver is not only failing to start transactions, -it also is ending any existing transaction when DDL is detected, so again, -workarounds are required. - -.. warning:: - - SQLite's transactional DDL is impacted by unresolved issues - in the pysqlite driver, which fails to emit BEGIN and additionally - forces a COMMIT to cancel any transaction when DDL is encountered. - See the section :ref:`pysqlite_serializable` - for techniques to work around this behavior. - -.. _sqlite_foreign_keys: - -Foreign Key Support -------------------- - -SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables, -however by default these constraints have no effect on the operation of the -table. - -Constraint checking on SQLite has three prerequisites: - -* At least version 3.6.19 of SQLite must be in use -* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY - or SQLITE_OMIT_TRIGGER symbols enabled. -* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all - connections before use -- including the initial call to - :meth:`sqlalchemy.schema.MetaData.create_all`. - -SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for -new connections through the usage of events:: - - from sqlalchemy.engine import Engine - from sqlalchemy import event - - @event.listens_for(Engine, "connect") - def set_sqlite_pragma(dbapi_connection, connection_record): - cursor = dbapi_connection.cursor() - cursor.execute("PRAGMA foreign_keys=ON") - cursor.close() - -.. warning:: - - When SQLite foreign keys are enabled, it is **not possible** - to emit CREATE or DROP statements for tables that contain - mutually-dependent foreign key constraints; - to emit the DDL for these tables requires that ALTER TABLE be used to - create or drop these constraints separately, for which SQLite has - no support. - -.. seealso:: - - `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_ - - on the SQLite web site. - - :ref:`event_toplevel` - SQLAlchemy event API. - - :ref:`use_alter` - more information on SQLAlchemy's facilities for handling - mutually-dependent foreign key constraints. - -.. _sqlite_on_conflict_ddl: - -ON CONFLICT support for constraints ------------------------------------ - -.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT" for - SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT" as - applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`. - -SQLite supports a non-standard DDL clause known as ON CONFLICT which can be applied -to primary key, unique, check, and not null constraints. In DDL, it is -rendered either within the "CONSTRAINT" clause or within the column definition -itself depending on the location of the target constraint. To render this -clause within DDL, the extension parameter ``sqlite_on_conflict`` can be -specified with a string conflict resolution algorithm within the -:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`, -:class:`.CheckConstraint` objects. Within the :class:`_schema.Column` object, -there -are individual parameters ``sqlite_on_conflict_not_null``, -``sqlite_on_conflict_primary_key``, ``sqlite_on_conflict_unique`` which each -correspond to the three types of relevant constraint types that can be -indicated from a :class:`_schema.Column` object. - -.. seealso:: - - `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite - documentation - -.. versionadded:: 1.3 - - -The ``sqlite_on_conflict`` parameters accept a string argument which is just -the resolution name to be chosen, which on SQLite can be one of ROLLBACK, -ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint -that specifies the IGNORE algorithm:: - - some_table = Table( - 'some_table', metadata, - Column('id', Integer, primary_key=True), - Column('data', Integer), - UniqueConstraint('id', 'data', sqlite_on_conflict='IGNORE') - ) - -The above renders CREATE TABLE DDL as:: - - CREATE TABLE some_table ( - id INTEGER NOT NULL, - data INTEGER, - PRIMARY KEY (id), - UNIQUE (id, data) ON CONFLICT IGNORE - ) - - -When using the :paramref:`_schema.Column.unique` -flag to add a UNIQUE constraint -to a single column, the ``sqlite_on_conflict_unique`` parameter can -be added to the :class:`_schema.Column` as well, which will be added to the -UNIQUE constraint in the DDL:: - - some_table = Table( - 'some_table', metadata, - Column('id', Integer, primary_key=True), - Column('data', Integer, unique=True, - sqlite_on_conflict_unique='IGNORE') - ) - -rendering:: - - CREATE TABLE some_table ( - id INTEGER NOT NULL, - data INTEGER, - PRIMARY KEY (id), - UNIQUE (data) ON CONFLICT IGNORE - ) - -To apply the FAIL algorithm for a NOT NULL constraint, -``sqlite_on_conflict_not_null`` is used:: - - some_table = Table( - 'some_table', metadata, - Column('id', Integer, primary_key=True), - Column('data', Integer, nullable=False, - sqlite_on_conflict_not_null='FAIL') - ) - -this renders the column inline ON CONFLICT phrase:: - - CREATE TABLE some_table ( - id INTEGER NOT NULL, - data INTEGER NOT NULL ON CONFLICT FAIL, - PRIMARY KEY (id) - ) - - -Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``:: - - some_table = Table( - 'some_table', metadata, - Column('id', Integer, primary_key=True, - sqlite_on_conflict_primary_key='FAIL') - ) - -SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict -resolution algorithm is applied to the constraint itself:: - - CREATE TABLE some_table ( - id INTEGER NOT NULL, - PRIMARY KEY (id) ON CONFLICT FAIL - ) - -.. _sqlite_on_conflict_insert: - -INSERT...ON CONFLICT (Upsert) ------------------------------------ - -.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT" for - SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as - applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`. - -From version 3.24.0 onwards, SQLite supports "upserts" (update or insert) -of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` -statement. A candidate row will only be inserted if that row does not violate -any unique or primary key constraints. In the case of a unique constraint violation, a -secondary action can occur which can be either "DO UPDATE", indicating that -the data in the target row should be updated, or "DO NOTHING", which indicates -to silently skip this row. - -Conflicts are determined using columns that are part of existing unique -constraints and indexes. These constraints are identified by stating the -columns and conditions that comprise the indexes. - -SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific -:func:`_sqlite.insert()` function, which provides -the generative methods :meth:`_sqlite.Insert.on_conflict_do_update` -and :meth:`_sqlite.Insert.on_conflict_do_nothing`: - -.. sourcecode:: pycon+sql - - >>> from sqlalchemy.dialects.sqlite import insert - - >>> insert_stmt = insert(my_table).values( - ... id='some_existing_id', - ... data='inserted value') - - >>> do_update_stmt = insert_stmt.on_conflict_do_update( - ... index_elements=['id'], - ... set_=dict(data='updated value') - ... ) - - >>> print(do_update_stmt) - {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) - ON CONFLICT (id) DO UPDATE SET data = ?{stop} - - >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing( - ... index_elements=['id'] - ... ) - - >>> print(do_nothing_stmt) - {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) - ON CONFLICT (id) DO NOTHING - -.. versionadded:: 1.4 - -.. seealso:: - - `Upsert - <https://sqlite.org/lang_UPSERT.html>`_ - - in the SQLite documentation. - - -Specifying the Target -^^^^^^^^^^^^^^^^^^^^^ - -Both methods supply the "target" of the conflict using column inference: - -* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument - specifies a sequence containing string column names, :class:`_schema.Column` - objects, and/or SQL expression elements, which would identify a unique index - or unique constraint. - -* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` - to infer an index, a partial index can be inferred by also specifying the - :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter: - - .. sourcecode:: pycon+sql - - >>> stmt = insert(my_table).values(user_email='a@b.com', data='inserted data') - - >>> do_update_stmt = stmt.on_conflict_do_update( - ... index_elements=[my_table.c.user_email], - ... index_where=my_table.c.user_email.like('%@gmail.com'), - ... set_=dict(data=stmt.excluded.data) - ... ) - - >>> print(do_update_stmt) - {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?) - ON CONFLICT (user_email) - WHERE user_email LIKE '%@gmail.com' - DO UPDATE SET data = excluded.data - -The SET Clause -^^^^^^^^^^^^^^^ - -``ON CONFLICT...DO UPDATE`` is used to perform an update of the already -existing row, using any combination of new values as well as values -from the proposed insertion. These values are specified using the -:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This -parameter accepts a dictionary which consists of direct values -for UPDATE: - -.. sourcecode:: pycon+sql - - >>> stmt = insert(my_table).values(id='some_id', data='inserted value') - - >>> do_update_stmt = stmt.on_conflict_do_update( - ... index_elements=['id'], - ... set_=dict(data='updated value') - ... ) - - >>> print(do_update_stmt) - {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) - ON CONFLICT (id) DO UPDATE SET data = ? - -.. warning:: - - The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take - into account Python-side default UPDATE values or generation functions, - e.g. those specified using :paramref:`_schema.Column.onupdate`. These - values will not be exercised for an ON CONFLICT style of UPDATE, unless - they are manually specified in the - :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary. - -Updating using the Excluded INSERT Values -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -In order to refer to the proposed insertion row, the special alias -:attr:`~.sqlite.Insert.excluded` is available as an attribute on -the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix -on a column, that informs the DO UPDATE to update the row with the value that -would have been inserted had the constraint not failed: - -.. sourcecode:: pycon+sql - - >>> stmt = insert(my_table).values( - ... id='some_id', - ... data='inserted value', - ... author='jlh' - ... ) - - >>> do_update_stmt = stmt.on_conflict_do_update( - ... index_elements=['id'], - ... set_=dict(data='updated value', author=stmt.excluded.author) - ... ) - - >>> print(do_update_stmt) - {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?) - ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author - -Additional WHERE Criteria -^^^^^^^^^^^^^^^^^^^^^^^^^ - -The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts -a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where` -parameter, which will limit those rows which receive an UPDATE: - -.. sourcecode:: pycon+sql - - >>> stmt = insert(my_table).values( - ... id='some_id', - ... data='inserted value', - ... author='jlh' - ... ) - - >>> on_update_stmt = stmt.on_conflict_do_update( - ... index_elements=['id'], - ... set_=dict(data='updated value', author=stmt.excluded.author), - ... where=(my_table.c.status == 2) - ... ) - >>> print(on_update_stmt) - {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?) - ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author - WHERE my_table.status = ? - - -Skipping Rows with DO NOTHING -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -``ON CONFLICT`` may be used to skip inserting a row entirely -if any conflict with a unique constraint occurs; below this is illustrated -using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method: - -.. sourcecode:: pycon+sql - - >>> stmt = insert(my_table).values(id='some_id', data='inserted value') - >>> stmt = stmt.on_conflict_do_nothing(index_elements=['id']) - >>> print(stmt) - {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING - - -If ``DO NOTHING`` is used without specifying any columns or constraint, -it has the effect of skipping the INSERT for any unique violation which -occurs: - -.. sourcecode:: pycon+sql - - >>> stmt = insert(my_table).values(id='some_id', data='inserted value') - >>> stmt = stmt.on_conflict_do_nothing() - >>> print(stmt) - {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING - -.. _sqlite_type_reflection: - -Type Reflection ---------------- - -SQLite types are unlike those of most other database backends, in that -the string name of the type usually does not correspond to a "type" in a -one-to-one fashion. Instead, SQLite links per-column typing behavior -to one of five so-called "type affinities" based on a string matching -pattern for the type. - -SQLAlchemy's reflection process, when inspecting types, uses a simple -lookup table to link the keywords returned to provided SQLAlchemy types. -This lookup table is present within the SQLite dialect as it is for all -other dialects. However, the SQLite dialect has a different "fallback" -routine for when a particular type name is not located in the lookup map; -it instead implements the SQLite "type affinity" scheme located at -https://www.sqlite.org/datatype3.html section 2.1. - -The provided typemap will make direct associations from an exact string -name match for the following types: - -:class:`_types.BIGINT`, :class:`_types.BLOB`, -:class:`_types.BOOLEAN`, :class:`_types.BOOLEAN`, -:class:`_types.CHAR`, :class:`_types.DATE`, -:class:`_types.DATETIME`, :class:`_types.FLOAT`, -:class:`_types.DECIMAL`, :class:`_types.FLOAT`, -:class:`_types.INTEGER`, :class:`_types.INTEGER`, -:class:`_types.NUMERIC`, :class:`_types.REAL`, -:class:`_types.SMALLINT`, :class:`_types.TEXT`, -:class:`_types.TIME`, :class:`_types.TIMESTAMP`, -:class:`_types.VARCHAR`, :class:`_types.NVARCHAR`, -:class:`_types.NCHAR` - -When a type name does not match one of the above types, the "type affinity" -lookup is used instead: - -* :class:`_types.INTEGER` is returned if the type name includes the - string ``INT`` -* :class:`_types.TEXT` is returned if the type name includes the - string ``CHAR``, ``CLOB`` or ``TEXT`` -* :class:`_types.NullType` is returned if the type name includes the - string ``BLOB`` -* :class:`_types.REAL` is returned if the type name includes the string - ``REAL``, ``FLOA`` or ``DOUB``. -* Otherwise, the :class:`_types.NUMERIC` type is used. - -.. _sqlite_partial_index: - -Partial Indexes ---------------- - -A partial index, e.g. one which uses a WHERE clause, can be specified -with the DDL system using the argument ``sqlite_where``:: - - tbl = Table('testtbl', m, Column('data', Integer)) - idx = Index('test_idx1', tbl.c.data, - sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10)) - -The index will be rendered at create time as:: - - CREATE INDEX test_idx1 ON testtbl (data) - WHERE data > 5 AND data < 10 - -.. _sqlite_dotted_column_names: - -Dotted Column Names -------------------- - -Using table or column names that explicitly have periods in them is -**not recommended**. While this is generally a bad idea for relational -databases in general, as the dot is a syntactically significant character, -the SQLite driver up until version **3.10.0** of SQLite has a bug which -requires that SQLAlchemy filter out these dots in result sets. - -The bug, entirely outside of SQLAlchemy, can be illustrated thusly:: - - import sqlite3 - - assert sqlite3.sqlite_version_info < (3, 10, 0), "bug is fixed in this version" - - conn = sqlite3.connect(":memory:") - cursor = conn.cursor() - - cursor.execute("create table x (a integer, b integer)") - cursor.execute("insert into x (a, b) values (1, 1)") - cursor.execute("insert into x (a, b) values (2, 2)") - - cursor.execute("select x.a, x.b from x") - assert [c[0] for c in cursor.description] == ['a', 'b'] - - cursor.execute(''' - select x.a, x.b from x where a=1 - union - select x.a, x.b from x where a=2 - ''') - assert [c[0] for c in cursor.description] == ['a', 'b'], \ - [c[0] for c in cursor.description] - -The second assertion fails:: - - Traceback (most recent call last): - File "test.py", line 19, in <module> - [c[0] for c in cursor.description] - AssertionError: ['x.a', 'x.b'] - -Where above, the driver incorrectly reports the names of the columns -including the name of the table, which is entirely inconsistent vs. -when the UNION is not present. - -SQLAlchemy relies upon column names being predictable in how they match -to the original statement, so the SQLAlchemy dialect has no choice but -to filter these out:: - - - from sqlalchemy import create_engine - - eng = create_engine("sqlite://") - conn = eng.connect() - - conn.exec_driver_sql("create table x (a integer, b integer)") - conn.exec_driver_sql("insert into x (a, b) values (1, 1)") - conn.exec_driver_sql("insert into x (a, b) values (2, 2)") - - result = conn.exec_driver_sql("select x.a, x.b from x") - assert result.keys() == ["a", "b"] - - result = conn.exec_driver_sql(''' - select x.a, x.b from x where a=1 - union - select x.a, x.b from x where a=2 - ''') - assert result.keys() == ["a", "b"] - -Note that above, even though SQLAlchemy filters out the dots, *both -names are still addressable*:: - - >>> row = result.first() - >>> row["a"] - 1 - >>> row["x.a"] - 1 - >>> row["b"] - 1 - >>> row["x.b"] - 1 - -Therefore, the workaround applied by SQLAlchemy only impacts -:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In -the very specific case where an application is forced to use column names that -contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and -:meth:`.Row.keys()` is required to return these dotted names unmodified, -the ``sqlite_raw_colnames`` execution option may be provided, either on a -per-:class:`_engine.Connection` basis:: - - result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql(''' - select x.a, x.b from x where a=1 - union - select x.a, x.b from x where a=2 - ''') - assert result.keys() == ["x.a", "x.b"] - -or on a per-:class:`_engine.Engine` basis:: - - engine = create_engine("sqlite://", execution_options={"sqlite_raw_colnames": True}) - -When using the per-:class:`_engine.Engine` execution option, note that -**Core and ORM queries that use UNION may not function properly**. - -SQLite-specific table options ------------------------------ - -One option for CREATE TABLE is supported directly by the SQLite -dialect in conjunction with the :class:`_schema.Table` construct: - -* ``WITHOUT ROWID``:: - - Table("some_table", metadata, ..., sqlite_with_rowid=False) - -.. seealso:: - - `SQLite CREATE TABLE options - <https://www.sqlite.org/lang_createtable.html>`_ - - -.. _sqlite_include_internal: - -Reflecting internal schema tables ----------------------------------- - -Reflection methods that return lists of tables will omit so-called -"SQLite internal schema object" names, which are considered by SQLite -as any object name that is prefixed with ``sqlite_``. An example of -such an object is the ``sqlite_sequence`` table that's generated when -the ``AUTOINCREMENT`` column parameter is used. In order to return -these objects, the parameter ``sqlite_include_internal=True`` may be -passed to methods such as :meth:`_schema.MetaData.reflect` or -:meth:`.Inspector.get_table_names`. - -.. versionadded:: 2.0 Added the ``sqlite_include_internal=True`` parameter. - Previously, these tables were not ignored by SQLAlchemy reflection - methods. - -.. note:: - - The ``sqlite_include_internal`` parameter does not refer to the - "system" tables that are present in schemas such as ``sqlite_master``. - -.. seealso:: - - `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite - documentation. - -""" # noqa -from __future__ import annotations - -import datetime -import numbers -import re -from typing import Optional - -from .json import JSON -from .json import JSONIndexType -from .json import JSONPathType -from ... import exc -from ... import schema as sa_schema -from ... import sql -from ... import text -from ... import types as sqltypes -from ... import util -from ...engine import default -from ...engine import processors -from ...engine import reflection -from ...engine.reflection import ReflectionDefaults -from ...sql import coercions -from ...sql import ColumnElement -from ...sql import compiler -from ...sql import elements -from ...sql import roles -from ...sql import schema -from ...types import BLOB # noqa -from ...types import BOOLEAN # noqa -from ...types import CHAR # noqa -from ...types import DECIMAL # noqa -from ...types import FLOAT # noqa -from ...types import INTEGER # noqa -from ...types import NUMERIC # noqa -from ...types import REAL # noqa -from ...types import SMALLINT # noqa -from ...types import TEXT # noqa -from ...types import TIMESTAMP # noqa -from ...types import VARCHAR # noqa - - -class _SQliteJson(JSON): - def result_processor(self, dialect, coltype): - default_processor = super().result_processor(dialect, coltype) - - def process(value): - try: - return default_processor(value) - except TypeError: - if isinstance(value, numbers.Number): - return value - else: - raise - - return process - - -class _DateTimeMixin: - _reg = None - _storage_format = None - - def __init__(self, storage_format=None, regexp=None, **kw): - super().__init__(**kw) - if regexp is not None: - self._reg = re.compile(regexp) - if storage_format is not None: - self._storage_format = storage_format - - @property - def format_is_text_affinity(self): - """return True if the storage format will automatically imply - a TEXT affinity. - - If the storage format contains no non-numeric characters, - it will imply a NUMERIC storage format on SQLite; in this case, - the type will generate its DDL as DATE_CHAR, DATETIME_CHAR, - TIME_CHAR. - - """ - spec = self._storage_format % { - "year": 0, - "month": 0, - "day": 0, - "hour": 0, - "minute": 0, - "second": 0, - "microsecond": 0, - } - return bool(re.search(r"[^0-9]", spec)) - - def adapt(self, cls, **kw): - if issubclass(cls, _DateTimeMixin): - if self._storage_format: - kw["storage_format"] = self._storage_format - if self._reg: - kw["regexp"] = self._reg - return super().adapt(cls, **kw) - - def literal_processor(self, dialect): - bp = self.bind_processor(dialect) - - def process(value): - return "'%s'" % bp(value) - - return process - - -class DATETIME(_DateTimeMixin, sqltypes.DateTime): - r"""Represent a Python datetime object in SQLite using a string. - - The default string storage format is:: - - "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" - - e.g.:: - - 2021-03-15 12:05:57.105542 - - The incoming storage format is by default parsed using the - Python ``datetime.fromisoformat()`` function. - - .. versionchanged:: 2.0 ``datetime.fromisoformat()`` is used for default - datetime string parsing. - - The storage format can be customized to some degree using the - ``storage_format`` and ``regexp`` parameters, such as:: - - import re - from sqlalchemy.dialects.sqlite import DATETIME - - dt = DATETIME(storage_format="%(year)04d/%(month)02d/%(day)02d " - "%(hour)02d:%(minute)02d:%(second)02d", - regexp=r"(\d+)/(\d+)/(\d+) (\d+)-(\d+)-(\d+)" - ) - - :param storage_format: format string which will be applied to the dict - with keys year, month, day, hour, minute, second, and microsecond. - - :param regexp: regular expression which will be applied to incoming result - rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming - strings. If the regexp contains named groups, the resulting match dict is - applied to the Python datetime() constructor as keyword arguments. - Otherwise, if positional groups are used, the datetime() constructor - is called with positional arguments via - ``*map(int, match_obj.groups(0))``. - - """ # noqa - - _storage_format = ( - "%(year)04d-%(month)02d-%(day)02d " - "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" - ) - - def __init__(self, *args, **kwargs): - truncate_microseconds = kwargs.pop("truncate_microseconds", False) - super().__init__(*args, **kwargs) - if truncate_microseconds: - assert "storage_format" not in kwargs, ( - "You can specify only " - "one of truncate_microseconds or storage_format." - ) - assert "regexp" not in kwargs, ( - "You can specify only one of " - "truncate_microseconds or regexp." - ) - self._storage_format = ( - "%(year)04d-%(month)02d-%(day)02d " - "%(hour)02d:%(minute)02d:%(second)02d" - ) - - def bind_processor(self, dialect): - datetime_datetime = datetime.datetime - datetime_date = datetime.date - format_ = self._storage_format - - def process(value): - if value is None: - return None - elif isinstance(value, datetime_datetime): - return format_ % { - "year": value.year, - "month": value.month, - "day": value.day, - "hour": value.hour, - "minute": value.minute, - "second": value.second, - "microsecond": value.microsecond, - } - elif isinstance(value, datetime_date): - return format_ % { - "year": value.year, - "month": value.month, - "day": value.day, - "hour": 0, - "minute": 0, - "second": 0, - "microsecond": 0, - } - else: - raise TypeError( - "SQLite DateTime type only accepts Python " - "datetime and date objects as input." - ) - - return process - - def result_processor(self, dialect, coltype): - if self._reg: - return processors.str_to_datetime_processor_factory( - self._reg, datetime.datetime - ) - else: - return processors.str_to_datetime - - -class DATE(_DateTimeMixin, sqltypes.Date): - r"""Represent a Python date object in SQLite using a string. - - The default string storage format is:: - - "%(year)04d-%(month)02d-%(day)02d" - - e.g.:: - - 2011-03-15 - - The incoming storage format is by default parsed using the - Python ``date.fromisoformat()`` function. - - .. versionchanged:: 2.0 ``date.fromisoformat()`` is used for default - date string parsing. - - - The storage format can be customized to some degree using the - ``storage_format`` and ``regexp`` parameters, such as:: - - import re - from sqlalchemy.dialects.sqlite import DATE - - d = DATE( - storage_format="%(month)02d/%(day)02d/%(year)04d", - regexp=re.compile("(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)") - ) - - :param storage_format: format string which will be applied to the - dict with keys year, month, and day. - - :param regexp: regular expression which will be applied to - incoming result rows, replacing the use of ``date.fromisoformat()`` to - parse incoming strings. If the regexp contains named groups, the resulting - match dict is applied to the Python date() constructor as keyword - arguments. Otherwise, if positional groups are used, the date() - constructor is called with positional arguments via - ``*map(int, match_obj.groups(0))``. - - """ - - _storage_format = "%(year)04d-%(month)02d-%(day)02d" - - def bind_processor(self, dialect): - datetime_date = datetime.date - format_ = self._storage_format - - def process(value): - if value is None: - return None - elif isinstance(value, datetime_date): - return format_ % { - "year": value.year, - "month": value.month, - "day": value.day, - } - else: - raise TypeError( - "SQLite Date type only accepts Python " - "date objects as input." - ) - - return process - - def result_processor(self, dialect, coltype): - if self._reg: - return processors.str_to_datetime_processor_factory( - self._reg, datetime.date - ) - else: - return processors.str_to_date - - -class TIME(_DateTimeMixin, sqltypes.Time): - r"""Represent a Python time object in SQLite using a string. - - The default string storage format is:: - - "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" - - e.g.:: - - 12:05:57.10558 - - The incoming storage format is by default parsed using the - Python ``time.fromisoformat()`` function. - - .. versionchanged:: 2.0 ``time.fromisoformat()`` is used for default - time string parsing. - - The storage format can be customized to some degree using the - ``storage_format`` and ``regexp`` parameters, such as:: - - import re - from sqlalchemy.dialects.sqlite import TIME - - t = TIME(storage_format="%(hour)02d-%(minute)02d-" - "%(second)02d-%(microsecond)06d", - regexp=re.compile("(\d+)-(\d+)-(\d+)-(?:-(\d+))?") - ) - - :param storage_format: format string which will be applied to the dict - with keys hour, minute, second, and microsecond. - - :param regexp: regular expression which will be applied to incoming result - rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming - strings. If the regexp contains named groups, the resulting match dict is - applied to the Python time() constructor as keyword arguments. Otherwise, - if positional groups are used, the time() constructor is called with - positional arguments via ``*map(int, match_obj.groups(0))``. - - """ - - _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d" - - def __init__(self, *args, **kwargs): - truncate_microseconds = kwargs.pop("truncate_microseconds", False) - super().__init__(*args, **kwargs) - if truncate_microseconds: - assert "storage_format" not in kwargs, ( - "You can specify only " - "one of truncate_microseconds or storage_format." - ) - assert "regexp" not in kwargs, ( - "You can specify only one of " - "truncate_microseconds or regexp." - ) - self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d" - - def bind_processor(self, dialect): - datetime_time = datetime.time - format_ = self._storage_format - - def process(value): - if value is None: - return None - elif isinstance(value, datetime_time): - return format_ % { - "hour": value.hour, - "minute": value.minute, - "second": value.second, - "microsecond": value.microsecond, - } - else: - raise TypeError( - "SQLite Time type only accepts Python " - "time objects as input." - ) - - return process - - def result_processor(self, dialect, coltype): - if self._reg: - return processors.str_to_datetime_processor_factory( - self._reg, datetime.time - ) - else: - return processors.str_to_time - - -colspecs = { - sqltypes.Date: DATE, - sqltypes.DateTime: DATETIME, - sqltypes.JSON: _SQliteJson, - sqltypes.JSON.JSONIndexType: JSONIndexType, - sqltypes.JSON.JSONPathType: JSONPathType, - sqltypes.Time: TIME, -} - -ischema_names = { - "BIGINT": sqltypes.BIGINT, - "BLOB": sqltypes.BLOB, - "BOOL": sqltypes.BOOLEAN, - "BOOLEAN": sqltypes.BOOLEAN, - "CHAR": sqltypes.CHAR, - "DATE": sqltypes.DATE, - "DATE_CHAR": sqltypes.DATE, - "DATETIME": sqltypes.DATETIME, - "DATETIME_CHAR": sqltypes.DATETIME, - "DOUBLE": sqltypes.DOUBLE, - "DECIMAL": sqltypes.DECIMAL, - "FLOAT": sqltypes.FLOAT, - "INT": sqltypes.INTEGER, - "INTEGER": sqltypes.INTEGER, - "JSON": JSON, - "NUMERIC": sqltypes.NUMERIC, - "REAL": sqltypes.REAL, - "SMALLINT": sqltypes.SMALLINT, - "TEXT": sqltypes.TEXT, - "TIME": sqltypes.TIME, - "TIME_CHAR": sqltypes.TIME, - "TIMESTAMP": sqltypes.TIMESTAMP, - "VARCHAR": sqltypes.VARCHAR, - "NVARCHAR": sqltypes.NVARCHAR, - "NCHAR": sqltypes.NCHAR, -} - - -class SQLiteCompiler(compiler.SQLCompiler): - extract_map = util.update_copy( - compiler.SQLCompiler.extract_map, - { - "month": "%m", - "day": "%d", - "year": "%Y", - "second": "%S", - "hour": "%H", - "doy": "%j", - "minute": "%M", - "epoch": "%s", - "dow": "%w", - "week": "%W", - }, - ) - - def visit_truediv_binary(self, binary, operator, **kw): - return ( - self.process(binary.left, **kw) - + " / " - + "(%s + 0.0)" % self.process(binary.right, **kw) - ) - - def visit_now_func(self, fn, **kw): - return "CURRENT_TIMESTAMP" - - def visit_localtimestamp_func(self, func, **kw): - return 'DATETIME(CURRENT_TIMESTAMP, "localtime")' - - def visit_true(self, expr, **kw): - return "1" - - def visit_false(self, expr, **kw): - return "0" - - def visit_char_length_func(self, fn, **kw): - return "length%s" % self.function_argspec(fn) - - def visit_aggregate_strings_func(self, fn, **kw): - return "group_concat%s" % self.function_argspec(fn) - - def visit_cast(self, cast, **kwargs): - if self.dialect.supports_cast: - return super().visit_cast(cast, **kwargs) - else: - return self.process(cast.clause, **kwargs) - - def visit_extract(self, extract, **kw): - try: - return "CAST(STRFTIME('%s', %s) AS INTEGER)" % ( - self.extract_map[extract.field], - self.process(extract.expr, **kw), - ) - except KeyError as err: - raise exc.CompileError( - "%s is not a valid extract argument." % extract.field - ) from err - - def returning_clause( - self, - stmt, - returning_cols, - *, - populate_result_map, - **kw, - ): - kw["include_table"] = False - return super().returning_clause( - stmt, returning_cols, populate_result_map=populate_result_map, **kw - ) - - def limit_clause(self, select, **kw): - text = "" - if select._limit_clause is not None: - text += "\n LIMIT " + self.process(select._limit_clause, **kw) - if select._offset_clause is not None: - if select._limit_clause is None: - text += "\n LIMIT " + self.process(sql.literal(-1)) - text += " OFFSET " + self.process(select._offset_clause, **kw) - else: - text += " OFFSET " + self.process(sql.literal(0), **kw) - return text - - def for_update_clause(self, select, **kw): - # sqlite has no "FOR UPDATE" AFAICT - return "" - - def update_from_clause( - self, update_stmt, from_table, extra_froms, from_hints, **kw - ): - kw["asfrom"] = True - return "FROM " + ", ".join( - t._compiler_dispatch(self, fromhints=from_hints, **kw) - for t in extra_froms - ) - - def visit_is_distinct_from_binary(self, binary, operator, **kw): - return "%s IS NOT %s" % ( - self.process(binary.left), - self.process(binary.right), - ) - - def visit_is_not_distinct_from_binary(self, binary, operator, **kw): - return "%s IS %s" % ( - self.process(binary.left), - self.process(binary.right), - ) - - def visit_json_getitem_op_binary(self, binary, operator, **kw): - if binary.type._type_affinity is sqltypes.JSON: - expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))" - else: - expr = "JSON_EXTRACT(%s, %s)" - - return expr % ( - self.process(binary.left, **kw), - self.process(binary.right, **kw), - ) - - def visit_json_path_getitem_op_binary(self, binary, operator, **kw): - if binary.type._type_affinity is sqltypes.JSON: - expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))" - else: - expr = "JSON_EXTRACT(%s, %s)" - - return expr % ( - self.process(binary.left, **kw), - self.process(binary.right, **kw), - ) - - def visit_empty_set_op_expr(self, type_, expand_op, **kw): - # slightly old SQLite versions don't seem to be able to handle - # the empty set impl - return self.visit_empty_set_expr(type_) - - def visit_empty_set_expr(self, element_types, **kw): - return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % ( - ", ".join("1" for type_ in element_types or [INTEGER()]), - ", ".join("1" for type_ in element_types or [INTEGER()]), - ) - - def visit_regexp_match_op_binary(self, binary, operator, **kw): - return self._generate_generic_binary(binary, " REGEXP ", **kw) - - def visit_not_regexp_match_op_binary(self, binary, operator, **kw): - return self._generate_generic_binary(binary, " NOT REGEXP ", **kw) - - def _on_conflict_target(self, clause, **kw): - if clause.constraint_target is not None: - target_text = "(%s)" % clause.constraint_target - elif clause.inferred_target_elements is not None: - target_text = "(%s)" % ", ".join( - ( - self.preparer.quote(c) - if isinstance(c, str) - else self.process(c, include_table=False, use_schema=False) - ) - for c in clause.inferred_target_elements - ) - if clause.inferred_target_whereclause is not None: - target_text += " WHERE %s" % self.process( - clause.inferred_target_whereclause, - include_table=False, - use_schema=False, - literal_binds=True, - ) - - else: - target_text = "" - - return target_text - - def visit_on_conflict_do_nothing(self, on_conflict, **kw): - target_text = self._on_conflict_target(on_conflict, **kw) - - if target_text: - return "ON CONFLICT %s DO NOTHING" % target_text - else: - return "ON CONFLICT DO NOTHING" - - def visit_on_conflict_do_update(self, on_conflict, **kw): - clause = on_conflict - - target_text = self._on_conflict_target(on_conflict, **kw) - - action_set_ops = [] - - set_parameters = dict(clause.update_values_to_set) - # create a list of column assignment clauses as tuples - - insert_statement = self.stack[-1]["selectable"] - cols = insert_statement.table.c - for c in cols: - col_key = c.key - - if col_key in set_parameters: - value = set_parameters.pop(col_key) - elif c in set_parameters: - value = set_parameters.pop(c) - else: - continue - - if coercions._is_literal(value): - value = elements.BindParameter(None, value, type_=c.type) - - else: - if ( - isinstance(value, elements.BindParameter) - and value.type._isnull - ): - value = value._clone() - value.type = c.type - value_text = self.process(value.self_group(), use_schema=False) - - key_text = self.preparer.quote(c.name) - action_set_ops.append("%s = %s" % (key_text, value_text)) - - # check for names that don't match columns - if set_parameters: - util.warn( - "Additional column names not matching " - "any column keys in table '%s': %s" - % ( - self.current_executable.table.name, - (", ".join("'%s'" % c for c in set_parameters)), - ) - ) - for k, v in set_parameters.items(): - key_text = ( - self.preparer.quote(k) - if isinstance(k, str) - else self.process(k, use_schema=False) - ) - value_text = self.process( - coercions.expect(roles.ExpressionElementRole, v), - use_schema=False, - ) - action_set_ops.append("%s = %s" % (key_text, value_text)) - - action_text = ", ".join(action_set_ops) - if clause.update_whereclause is not None: - action_text += " WHERE %s" % self.process( - clause.update_whereclause, include_table=True, use_schema=False - ) - - return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) - - -class SQLiteDDLCompiler(compiler.DDLCompiler): - def get_column_specification(self, column, **kwargs): - coltype = self.dialect.type_compiler_instance.process( - column.type, type_expression=column - ) - colspec = self.preparer.format_column(column) + " " + coltype - default = self.get_column_default_string(column) - if default is not None: - if isinstance(column.server_default.arg, ColumnElement): - default = "(" + default + ")" - colspec += " DEFAULT " + default - - if not column.nullable: - colspec += " NOT NULL" - - on_conflict_clause = column.dialect_options["sqlite"][ - "on_conflict_not_null" - ] - if on_conflict_clause is not None: - colspec += " ON CONFLICT " + on_conflict_clause - - if column.primary_key: - if ( - column.autoincrement is True - and len(column.table.primary_key.columns) != 1 - ): - raise exc.CompileError( - "SQLite does not support autoincrement for " - "composite primary keys" - ) - - if ( - column.table.dialect_options["sqlite"]["autoincrement"] - and len(column.table.primary_key.columns) == 1 - and issubclass(column.type._type_affinity, sqltypes.Integer) - and not column.foreign_keys - ): - colspec += " PRIMARY KEY" - - on_conflict_clause = column.dialect_options["sqlite"][ - "on_conflict_primary_key" - ] - if on_conflict_clause is not None: - colspec += " ON CONFLICT " + on_conflict_clause - - colspec += " AUTOINCREMENT" - - if column.computed is not None: - colspec += " " + self.process(column.computed) - - return colspec - - def visit_primary_key_constraint(self, constraint, **kw): - # for columns with sqlite_autoincrement=True, - # the PRIMARY KEY constraint can only be inline - # with the column itself. - if len(constraint.columns) == 1: - c = list(constraint)[0] - if ( - c.primary_key - and c.table.dialect_options["sqlite"]["autoincrement"] - and issubclass(c.type._type_affinity, sqltypes.Integer) - and not c.foreign_keys - ): - return None - - text = super().visit_primary_key_constraint(constraint) - - on_conflict_clause = constraint.dialect_options["sqlite"][ - "on_conflict" - ] - if on_conflict_clause is None and len(constraint.columns) == 1: - on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][ - "on_conflict_primary_key" - ] - - if on_conflict_clause is not None: - text += " ON CONFLICT " + on_conflict_clause - - return text - - def visit_unique_constraint(self, constraint, **kw): - text = super().visit_unique_constraint(constraint) - - on_conflict_clause = constraint.dialect_options["sqlite"][ - "on_conflict" - ] - if on_conflict_clause is None and len(constraint.columns) == 1: - col1 = list(constraint)[0] - if isinstance(col1, schema.SchemaItem): - on_conflict_clause = list(constraint)[0].dialect_options[ - "sqlite" - ]["on_conflict_unique"] - - if on_conflict_clause is not None: - text += " ON CONFLICT " + on_conflict_clause - - return text - - def visit_check_constraint(self, constraint, **kw): - text = super().visit_check_constraint(constraint) - - on_conflict_clause = constraint.dialect_options["sqlite"][ - "on_conflict" - ] - - if on_conflict_clause is not None: - text += " ON CONFLICT " + on_conflict_clause - - return text - - def visit_column_check_constraint(self, constraint, **kw): - text = super().visit_column_check_constraint(constraint) - - if constraint.dialect_options["sqlite"]["on_conflict"] is not None: - raise exc.CompileError( - "SQLite does not support on conflict clause for " - "column check constraint" - ) - - return text - - def visit_foreign_key_constraint(self, constraint, **kw): - local_table = constraint.elements[0].parent.table - remote_table = constraint.elements[0].column.table - - if local_table.schema != remote_table.schema: - return None - else: - return super().visit_foreign_key_constraint(constraint) - - def define_constraint_remote_table(self, constraint, table, preparer): - """Format the remote table clause of a CREATE CONSTRAINT clause.""" - - return preparer.format_table(table, use_schema=False) - - def visit_create_index( - self, create, include_schema=False, include_table_schema=True, **kw - ): - index = create.element - self._verify_index_table(index) - preparer = self.preparer - text = "CREATE " - if index.unique: - text += "UNIQUE " - - text += "INDEX " - - if create.if_not_exists: - text += "IF NOT EXISTS " - - text += "%s ON %s (%s)" % ( - self._prepared_index_name(index, include_schema=True), - preparer.format_table(index.table, use_schema=False), - ", ".join( - self.sql_compiler.process( - expr, include_table=False, literal_binds=True - ) - for expr in index.expressions - ), - ) - - whereclause = index.dialect_options["sqlite"]["where"] - if whereclause is not None: - where_compiled = self.sql_compiler.process( - whereclause, include_table=False, literal_binds=True - ) - text += " WHERE " + where_compiled - - return text - - def post_create_table(self, table): - if table.dialect_options["sqlite"]["with_rowid"] is False: - return "\n WITHOUT ROWID" - return "" - - -class SQLiteTypeCompiler(compiler.GenericTypeCompiler): - def visit_large_binary(self, type_, **kw): - return self.visit_BLOB(type_) - - def visit_DATETIME(self, type_, **kw): - if ( - not isinstance(type_, _DateTimeMixin) - or type_.format_is_text_affinity - ): - return super().visit_DATETIME(type_) - else: - return "DATETIME_CHAR" - - def visit_DATE(self, type_, **kw): - if ( - not isinstance(type_, _DateTimeMixin) - or type_.format_is_text_affinity - ): - return super().visit_DATE(type_) - else: - return "DATE_CHAR" - - def visit_TIME(self, type_, **kw): - if ( - not isinstance(type_, _DateTimeMixin) - or type_.format_is_text_affinity - ): - return super().visit_TIME(type_) - else: - return "TIME_CHAR" - - def visit_JSON(self, type_, **kw): - # note this name provides NUMERIC affinity, not TEXT. - # should not be an issue unless the JSON value consists of a single - # numeric value. JSONTEXT can be used if this case is required. - return "JSON" - - -class SQLiteIdentifierPreparer(compiler.IdentifierPreparer): - reserved_words = { - "add", - "after", - "all", - "alter", - "analyze", - "and", - "as", - "asc", - "attach", - "autoincrement", - "before", - "begin", - "between", - "by", - "cascade", - "case", - "cast", - "check", - "collate", - "column", - "commit", - "conflict", - "constraint", - "create", - "cross", - "current_date", - "current_time", - "current_timestamp", - "database", - "default", - "deferrable", - "deferred", - "delete", - "desc", - "detach", - "distinct", - "drop", - "each", - "else", - "end", - "escape", - "except", - "exclusive", - "exists", - "explain", - "false", - "fail", - "for", - "foreign", - "from", - "full", - "glob", - "group", - "having", - "if", - "ignore", - "immediate", - "in", - "index", - "indexed", - "initially", - "inner", - "insert", - "instead", - "intersect", - "into", - "is", - "isnull", - "join", - "key", - "left", - "like", - "limit", - "match", - "natural", - "not", - "notnull", - "null", - "of", - "offset", - "on", - "or", - "order", - "outer", - "plan", - "pragma", - "primary", - "query", - "raise", - "references", - "reindex", - "rename", - "replace", - "restrict", - "right", - "rollback", - "row", - "select", - "set", - "table", - "temp", - "temporary", - "then", - "to", - "transaction", - "trigger", - "true", - "union", - "unique", - "update", - "using", - "vacuum", - "values", - "view", - "virtual", - "when", - "where", - } - - -class SQLiteExecutionContext(default.DefaultExecutionContext): - @util.memoized_property - def _preserve_raw_colnames(self): - return ( - not self.dialect._broken_dotted_colnames - or self.execution_options.get("sqlite_raw_colnames", False) - ) - - def _translate_colname(self, colname): - # TODO: detect SQLite version 3.10.0 or greater; - # see [ticket:3633] - - # adjust for dotted column names. SQLite - # in the case of UNION may store col names as - # "tablename.colname", or if using an attached database, - # "database.tablename.colname", in cursor.description - if not self._preserve_raw_colnames and "." in colname: - return colname.split(".")[-1], colname - else: - return colname, None - - -class SQLiteDialect(default.DefaultDialect): - name = "sqlite" - supports_alter = False - - # SQlite supports "DEFAULT VALUES" but *does not* support - # "VALUES (DEFAULT)" - supports_default_values = True - supports_default_metavalue = False - - # sqlite issue: - # https://github.com/python/cpython/issues/93421 - # note this parameter is no longer used by the ORM or default dialect - # see #9414 - supports_sane_rowcount_returning = False - - supports_empty_insert = False - supports_cast = True - supports_multivalues_insert = True - use_insertmanyvalues = True - tuple_in_values = True - supports_statement_cache = True - insert_null_pk_still_autoincrements = True - insert_returning = True - update_returning = True - update_returning_multifrom = True - delete_returning = True - update_returning_multifrom = True - - supports_default_metavalue = True - """dialect supports INSERT... VALUES (DEFAULT) syntax""" - - default_metavalue_token = "NULL" - """for INSERT... VALUES (DEFAULT) syntax, the token to put in the - parenthesis.""" - - default_paramstyle = "qmark" - execution_ctx_cls = SQLiteExecutionContext - statement_compiler = SQLiteCompiler - ddl_compiler = SQLiteDDLCompiler - type_compiler_cls = SQLiteTypeCompiler - preparer = SQLiteIdentifierPreparer - ischema_names = ischema_names - colspecs = colspecs - - construct_arguments = [ - ( - sa_schema.Table, - { - "autoincrement": False, - "with_rowid": True, - }, - ), - (sa_schema.Index, {"where": None}), - ( - sa_schema.Column, - { - "on_conflict_primary_key": None, - "on_conflict_not_null": None, - "on_conflict_unique": None, - }, - ), - (sa_schema.Constraint, {"on_conflict": None}), - ] - - _broken_fk_pragma_quotes = False - _broken_dotted_colnames = False - - @util.deprecated_params( - _json_serializer=( - "1.3.7", - "The _json_serializer argument to the SQLite dialect has " - "been renamed to the correct name of json_serializer. The old " - "argument name will be removed in a future release.", - ), - _json_deserializer=( - "1.3.7", - "The _json_deserializer argument to the SQLite dialect has " - "been renamed to the correct name of json_deserializer. The old " - "argument name will be removed in a future release.", - ), - ) - def __init__( - self, - native_datetime=False, - json_serializer=None, - json_deserializer=None, - _json_serializer=None, - _json_deserializer=None, - **kwargs, - ): - default.DefaultDialect.__init__(self, **kwargs) - - if _json_serializer: - json_serializer = _json_serializer - if _json_deserializer: - json_deserializer = _json_deserializer - self._json_serializer = json_serializer - self._json_deserializer = json_deserializer - - # this flag used by pysqlite dialect, and perhaps others in the - # future, to indicate the driver is handling date/timestamp - # conversions (and perhaps datetime/time as well on some hypothetical - # driver ?) - self.native_datetime = native_datetime - - if self.dbapi is not None: - if self.dbapi.sqlite_version_info < (3, 7, 16): - util.warn( - "SQLite version %s is older than 3.7.16, and will not " - "support right nested joins, as are sometimes used in " - "more complex ORM scenarios. SQLAlchemy 1.4 and above " - "no longer tries to rewrite these joins." - % (self.dbapi.sqlite_version_info,) - ) - - # NOTE: python 3.7 on fedora for me has SQLite 3.34.1. These - # version checks are getting very stale. - self._broken_dotted_colnames = self.dbapi.sqlite_version_info < ( - 3, - 10, - 0, - ) - self.supports_default_values = self.dbapi.sqlite_version_info >= ( - 3, - 3, - 8, - ) - self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3) - self.supports_multivalues_insert = ( - # https://www.sqlite.org/releaselog/3_7_11.html - self.dbapi.sqlite_version_info - >= (3, 7, 11) - ) - # see https://www.sqlalchemy.org/trac/ticket/2568 - # as well as https://www.sqlite.org/src/info/600482d161 - self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < ( - 3, - 6, - 14, - ) - - if self.dbapi.sqlite_version_info < (3, 35) or util.pypy: - self.update_returning = self.delete_returning = ( - self.insert_returning - ) = False - - if self.dbapi.sqlite_version_info < (3, 32, 0): - # https://www.sqlite.org/limits.html - self.insertmanyvalues_max_parameters = 999 - - _isolation_lookup = util.immutabledict( - {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0} - ) - - def get_isolation_level_values(self, dbapi_connection): - return list(self._isolation_lookup) - - def set_isolation_level(self, dbapi_connection, level): - isolation_level = self._isolation_lookup[level] - - cursor = dbapi_connection.cursor() - cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}") - cursor.close() - - def get_isolation_level(self, dbapi_connection): - cursor = dbapi_connection.cursor() - cursor.execute("PRAGMA read_uncommitted") - res = cursor.fetchone() - if res: - value = res[0] - else: - # https://www.sqlite.org/changes.html#version_3_3_3 - # "Optional READ UNCOMMITTED isolation (instead of the - # default isolation level of SERIALIZABLE) and - # table level locking when database connections - # share a common cache."" - # pre-SQLite 3.3.0 default to 0 - value = 0 - cursor.close() - if value == 0: - return "SERIALIZABLE" - elif value == 1: - return "READ UNCOMMITTED" - else: - assert False, "Unknown isolation level %s" % value - - @reflection.cache - def get_schema_names(self, connection, **kw): - s = "PRAGMA database_list" - dl = connection.exec_driver_sql(s) - - return [db[1] for db in dl if db[1] != "temp"] - - def _format_schema(self, schema, table_name): - if schema is not None: - qschema = self.identifier_preparer.quote_identifier(schema) - name = f"{qschema}.{table_name}" - else: - name = table_name - return name - - def _sqlite_main_query( - self, - table: str, - type_: str, - schema: Optional[str], - sqlite_include_internal: bool, - ): - main = self._format_schema(schema, table) - if not sqlite_include_internal: - filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'" - else: - filter_table = "" - query = ( - f"SELECT name FROM {main} " - f"WHERE type='{type_}'{filter_table} " - "ORDER BY name" - ) - return query - - @reflection.cache - def get_table_names( - self, connection, schema=None, sqlite_include_internal=False, **kw - ): - query = self._sqlite_main_query( - "sqlite_master", "table", schema, sqlite_include_internal - ) - names = connection.exec_driver_sql(query).scalars().all() - return names - - @reflection.cache - def get_temp_table_names( - self, connection, sqlite_include_internal=False, **kw - ): - query = self._sqlite_main_query( - "sqlite_temp_master", "table", None, sqlite_include_internal - ) - names = connection.exec_driver_sql(query).scalars().all() - return names - - @reflection.cache - def get_temp_view_names( - self, connection, sqlite_include_internal=False, **kw - ): - query = self._sqlite_main_query( - "sqlite_temp_master", "view", None, sqlite_include_internal - ) - names = connection.exec_driver_sql(query).scalars().all() - return names - - @reflection.cache - def has_table(self, connection, table_name, schema=None, **kw): - self._ensure_has_table_connection(connection) - - if schema is not None and schema not in self.get_schema_names( - connection, **kw - ): - return False - - info = self._get_table_pragma( - connection, "table_info", table_name, schema=schema - ) - return bool(info) - - def _get_default_schema_name(self, connection): - return "main" - - @reflection.cache - def get_view_names( - self, connection, schema=None, sqlite_include_internal=False, **kw - ): - query = self._sqlite_main_query( - "sqlite_master", "view", schema, sqlite_include_internal - ) - names = connection.exec_driver_sql(query).scalars().all() - return names - - @reflection.cache - def get_view_definition(self, connection, view_name, schema=None, **kw): - if schema is not None: - qschema = self.identifier_preparer.quote_identifier(schema) - master = f"{qschema}.sqlite_master" - s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % ( - master, - ) - rs = connection.exec_driver_sql(s, (view_name,)) - else: - try: - s = ( - "SELECT sql FROM " - " (SELECT * FROM sqlite_master UNION ALL " - " SELECT * FROM sqlite_temp_master) " - "WHERE name = ? " - "AND type='view'" - ) - rs = connection.exec_driver_sql(s, (view_name,)) - except exc.DBAPIError: - s = ( - "SELECT sql FROM sqlite_master WHERE name = ? " - "AND type='view'" - ) - rs = connection.exec_driver_sql(s, (view_name,)) - - result = rs.fetchall() - if result: - return result[0].sql - else: - raise exc.NoSuchTableError( - f"{schema}.{view_name}" if schema else view_name - ) - - @reflection.cache - def get_columns(self, connection, table_name, schema=None, **kw): - pragma = "table_info" - # computed columns are threaded as hidden, they require table_xinfo - if self.server_version_info >= (3, 31): - pragma = "table_xinfo" - info = self._get_table_pragma( - connection, pragma, table_name, schema=schema - ) - columns = [] - tablesql = None - for row in info: - name = row[1] - type_ = row[2].upper() - nullable = not row[3] - default = row[4] - primary_key = row[5] - hidden = row[6] if pragma == "table_xinfo" else 0 - - # hidden has value 0 for normal columns, 1 for hidden columns, - # 2 for computed virtual columns and 3 for computed stored columns - # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b - if hidden == 1: - continue - - generated = bool(hidden) - persisted = hidden == 3 - - if tablesql is None and generated: - tablesql = self._get_table_sql( - connection, table_name, schema, **kw - ) - - columns.append( - self._get_column_info( - name, - type_, - nullable, - default, - primary_key, - generated, - persisted, - tablesql, - ) - ) - if columns: - return columns - elif not self.has_table(connection, table_name, schema): - raise exc.NoSuchTableError( - f"{schema}.{table_name}" if schema else table_name - ) - else: - return ReflectionDefaults.columns() - - def _get_column_info( - self, - name, - type_, - nullable, - default, - primary_key, - generated, - persisted, - tablesql, - ): - if generated: - # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)" - # somehow is "INTEGER GENERATED ALWAYS" - type_ = re.sub("generated", "", type_, flags=re.IGNORECASE) - type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip() - - coltype = self._resolve_type_affinity(type_) - - if default is not None: - default = str(default) - - colspec = { - "name": name, - "type": coltype, - "nullable": nullable, - "default": default, - "primary_key": primary_key, - } - if generated: - sqltext = "" - if tablesql: - pattern = r"[^,]*\s+AS\s+\(([^,]*)\)\s*(?:virtual|stored)?" - match = re.search( - re.escape(name) + pattern, tablesql, re.IGNORECASE - ) - if match: - sqltext = match.group(1) - colspec["computed"] = {"sqltext": sqltext, "persisted": persisted} - return colspec - - def _resolve_type_affinity(self, type_): - """Return a data type from a reflected column, using affinity rules. - - SQLite's goal for universal compatibility introduces some complexity - during reflection, as a column's defined type might not actually be a - type that SQLite understands - or indeed, my not be defined *at all*. - Internally, SQLite handles this with a 'data type affinity' for each - column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER', - 'REAL', or 'NONE' (raw bits). The algorithm that determines this is - listed in https://www.sqlite.org/datatype3.html section 2.1. - - This method allows SQLAlchemy to support that algorithm, while still - providing access to smarter reflection utilities by recognizing - column definitions that SQLite only supports through affinity (like - DATE and DOUBLE). - - """ - match = re.match(r"([\w ]+)(\(.*?\))?", type_) - if match: - coltype = match.group(1) - args = match.group(2) - else: - coltype = "" - args = "" - - if coltype in self.ischema_names: - coltype = self.ischema_names[coltype] - elif "INT" in coltype: - coltype = sqltypes.INTEGER - elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype: - coltype = sqltypes.TEXT - elif "BLOB" in coltype or not coltype: - coltype = sqltypes.NullType - elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype: - coltype = sqltypes.REAL - else: - coltype = sqltypes.NUMERIC - - if args is not None: - args = re.findall(r"(\d+)", args) - try: - coltype = coltype(*[int(a) for a in args]) - except TypeError: - util.warn( - "Could not instantiate type %s with " - "reflected arguments %s; using no arguments." - % (coltype, args) - ) - coltype = coltype() - else: - coltype = coltype() - - return coltype - - @reflection.cache - def get_pk_constraint(self, connection, table_name, schema=None, **kw): - constraint_name = None - table_data = self._get_table_sql(connection, table_name, schema=schema) - if table_data: - PK_PATTERN = r"CONSTRAINT (\w+) PRIMARY KEY" - result = re.search(PK_PATTERN, table_data, re.I) - constraint_name = result.group(1) if result else None - - cols = self.get_columns(connection, table_name, schema, **kw) - # consider only pk columns. This also avoids sorting the cached - # value returned by get_columns - cols = [col for col in cols if col.get("primary_key", 0) > 0] - cols.sort(key=lambda col: col.get("primary_key")) - pkeys = [col["name"] for col in cols] - - if pkeys: - return {"constrained_columns": pkeys, "name": constraint_name} - else: - return ReflectionDefaults.pk_constraint() - - @reflection.cache - def get_foreign_keys(self, connection, table_name, schema=None, **kw): - # sqlite makes this *extremely difficult*. - # First, use the pragma to get the actual FKs. - pragma_fks = self._get_table_pragma( - connection, "foreign_key_list", table_name, schema=schema - ) - - fks = {} - - for row in pragma_fks: - (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4]) - - if not rcol: - # no referred column, which means it was not named in the - # original DDL. The referred columns of the foreign key - # constraint are therefore the primary key of the referred - # table. - try: - referred_pk = self.get_pk_constraint( - connection, rtbl, schema=schema, **kw - ) - referred_columns = referred_pk["constrained_columns"] - except exc.NoSuchTableError: - # ignore not existing parents - referred_columns = [] - else: - # note we use this list only if this is the first column - # in the constraint. for subsequent columns we ignore the - # list and append "rcol" if present. - referred_columns = [] - - if self._broken_fk_pragma_quotes: - rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl) - - if numerical_id in fks: - fk = fks[numerical_id] - else: - fk = fks[numerical_id] = { - "name": None, - "constrained_columns": [], - "referred_schema": schema, - "referred_table": rtbl, - "referred_columns": referred_columns, - "options": {}, - } - fks[numerical_id] = fk - - fk["constrained_columns"].append(lcol) - - if rcol: - fk["referred_columns"].append(rcol) - - def fk_sig(constrained_columns, referred_table, referred_columns): - return ( - tuple(constrained_columns) - + (referred_table,) - + tuple(referred_columns) - ) - - # then, parse the actual SQL and attempt to find DDL that matches - # the names as well. SQLite saves the DDL in whatever format - # it was typed in as, so need to be liberal here. - - keys_by_signature = { - fk_sig( - fk["constrained_columns"], - fk["referred_table"], - fk["referred_columns"], - ): fk - for fk in fks.values() - } - - table_data = self._get_table_sql(connection, table_name, schema=schema) - - def parse_fks(): - if table_data is None: - # system tables, etc. - return - - # note that we already have the FKs from PRAGMA above. This whole - # regexp thing is trying to locate additional detail about the - # FKs, namely the name of the constraint and other options. - # so parsing the columns is really about matching it up to what - # we already have. - FK_PATTERN = ( - r"(?:CONSTRAINT (\w+) +)?" - r"FOREIGN KEY *\( *(.+?) *\) +" - r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *' # noqa: E501 - r"((?:ON (?:DELETE|UPDATE) " - r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)" - r"((?:NOT +)?DEFERRABLE)?" - r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?" - ) - for match in re.finditer(FK_PATTERN, table_data, re.I): - ( - constraint_name, - constrained_columns, - referred_quoted_name, - referred_name, - referred_columns, - onupdatedelete, - deferrable, - initially, - ) = match.group(1, 2, 3, 4, 5, 6, 7, 8) - constrained_columns = list( - self._find_cols_in_sig(constrained_columns) - ) - if not referred_columns: - referred_columns = constrained_columns - else: - referred_columns = list( - self._find_cols_in_sig(referred_columns) - ) - referred_name = referred_quoted_name or referred_name - options = {} - - for token in re.split(r" *\bON\b *", onupdatedelete.upper()): - if token.startswith("DELETE"): - ondelete = token[6:].strip() - if ondelete and ondelete != "NO ACTION": - options["ondelete"] = ondelete - elif token.startswith("UPDATE"): - onupdate = token[6:].strip() - if onupdate and onupdate != "NO ACTION": - options["onupdate"] = onupdate - - if deferrable: - options["deferrable"] = "NOT" not in deferrable.upper() - if initially: - options["initially"] = initially.upper() - - yield ( - constraint_name, - constrained_columns, - referred_name, - referred_columns, - options, - ) - - fkeys = [] - - for ( - constraint_name, - constrained_columns, - referred_name, - referred_columns, - options, - ) in parse_fks(): - sig = fk_sig(constrained_columns, referred_name, referred_columns) - if sig not in keys_by_signature: - util.warn( - "WARNING: SQL-parsed foreign key constraint " - "'%s' could not be located in PRAGMA " - "foreign_keys for table %s" % (sig, table_name) - ) - continue - key = keys_by_signature.pop(sig) - key["name"] = constraint_name - key["options"] = options - fkeys.append(key) - # assume the remainders are the unnamed, inline constraints, just - # use them as is as it's extremely difficult to parse inline - # constraints - fkeys.extend(keys_by_signature.values()) - if fkeys: - return fkeys - else: - return ReflectionDefaults.foreign_keys() - - def _find_cols_in_sig(self, sig): - for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I): - yield match.group(1) or match.group(2) - - @reflection.cache - def get_unique_constraints( - self, connection, table_name, schema=None, **kw - ): - auto_index_by_sig = {} - for idx in self.get_indexes( - connection, - table_name, - schema=schema, - include_auto_indexes=True, - **kw, - ): - if not idx["name"].startswith("sqlite_autoindex"): - continue - sig = tuple(idx["column_names"]) - auto_index_by_sig[sig] = idx - - table_data = self._get_table_sql( - connection, table_name, schema=schema, **kw - ) - unique_constraints = [] - - def parse_uqs(): - if table_data is None: - return - UNIQUE_PATTERN = r'(?:CONSTRAINT "?(.+?)"? +)?UNIQUE *\((.+?)\)' - INLINE_UNIQUE_PATTERN = ( - r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?) ' - r"+[a-z0-9_ ]+? +UNIQUE" - ) - - for match in re.finditer(UNIQUE_PATTERN, table_data, re.I): - name, cols = match.group(1, 2) - yield name, list(self._find_cols_in_sig(cols)) - - # we need to match inlines as well, as we seek to differentiate - # a UNIQUE constraint from a UNIQUE INDEX, even though these - # are kind of the same thing :) - for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I): - cols = list( - self._find_cols_in_sig(match.group(1) or match.group(2)) - ) - yield None, cols - - for name, cols in parse_uqs(): - sig = tuple(cols) - if sig in auto_index_by_sig: - auto_index_by_sig.pop(sig) - parsed_constraint = {"name": name, "column_names": cols} - unique_constraints.append(parsed_constraint) - # NOTE: auto_index_by_sig might not be empty here, - # the PRIMARY KEY may have an entry. - if unique_constraints: - return unique_constraints - else: - return ReflectionDefaults.unique_constraints() - - @reflection.cache - def get_check_constraints(self, connection, table_name, schema=None, **kw): - table_data = self._get_table_sql( - connection, table_name, schema=schema, **kw - ) - - CHECK_PATTERN = r"(?:CONSTRAINT (.+) +)?" r"CHECK *\( *(.+) *\),? *" - cks = [] - # NOTE: we aren't using re.S here because we actually are - # taking advantage of each CHECK constraint being all on one - # line in the table definition in order to delineate. This - # necessarily makes assumptions as to how the CREATE TABLE - # was emitted. - - for match in re.finditer(CHECK_PATTERN, table_data or "", re.I): - name = match.group(1) - - if name: - name = re.sub(r'^"|"$', "", name) - - cks.append({"sqltext": match.group(2), "name": name}) - cks.sort(key=lambda d: d["name"] or "~") # sort None as last - if cks: - return cks - else: - return ReflectionDefaults.check_constraints() - - @reflection.cache - def get_indexes(self, connection, table_name, schema=None, **kw): - pragma_indexes = self._get_table_pragma( - connection, "index_list", table_name, schema=schema - ) - indexes = [] - - # regular expression to extract the filter predicate of a partial - # index. this could fail to extract the predicate correctly on - # indexes created like - # CREATE INDEX i ON t (col || ') where') WHERE col <> '' - # but as this function does not support expression-based indexes - # this case does not occur. - partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE) - - if schema: - schema_expr = "%s." % self.identifier_preparer.quote_identifier( - schema - ) - else: - schema_expr = "" - - include_auto_indexes = kw.pop("include_auto_indexes", False) - for row in pragma_indexes: - # ignore implicit primary key index. - # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html - if not include_auto_indexes and row[1].startswith( - "sqlite_autoindex" - ): - continue - indexes.append( - dict( - name=row[1], - column_names=[], - unique=row[2], - dialect_options={}, - ) - ) - - # check partial indexes - if len(row) >= 5 and row[4]: - s = ( - "SELECT sql FROM %(schema)ssqlite_master " - "WHERE name = ? " - "AND type = 'index'" % {"schema": schema_expr} - ) - rs = connection.exec_driver_sql(s, (row[1],)) - index_sql = rs.scalar() - predicate_match = partial_pred_re.search(index_sql) - if predicate_match is None: - # unless the regex is broken this case shouldn't happen - # because we know this is a partial index, so the - # definition sql should match the regex - util.warn( - "Failed to look up filter predicate of " - "partial index %s" % row[1] - ) - else: - predicate = predicate_match.group(1) - indexes[-1]["dialect_options"]["sqlite_where"] = text( - predicate - ) - - # loop thru unique indexes to get the column names. - for idx in list(indexes): - pragma_index = self._get_table_pragma( - connection, "index_info", idx["name"], schema=schema - ) - - for row in pragma_index: - if row[2] is None: - util.warn( - "Skipped unsupported reflection of " - "expression-based index %s" % idx["name"] - ) - indexes.remove(idx) - break - else: - idx["column_names"].append(row[2]) - - indexes.sort(key=lambda d: d["name"] or "~") # sort None as last - if indexes: - return indexes - elif not self.has_table(connection, table_name, schema): - raise exc.NoSuchTableError( - f"{schema}.{table_name}" if schema else table_name - ) - else: - return ReflectionDefaults.indexes() - - def _is_sys_table(self, table_name): - return table_name in { - "sqlite_schema", - "sqlite_master", - "sqlite_temp_schema", - "sqlite_temp_master", - } - - @reflection.cache - def _get_table_sql(self, connection, table_name, schema=None, **kw): - if schema: - schema_expr = "%s." % ( - self.identifier_preparer.quote_identifier(schema) - ) - else: - schema_expr = "" - try: - s = ( - "SELECT sql FROM " - " (SELECT * FROM %(schema)ssqlite_master UNION ALL " - " SELECT * FROM %(schema)ssqlite_temp_master) " - "WHERE name = ? " - "AND type in ('table', 'view')" % {"schema": schema_expr} - ) - rs = connection.exec_driver_sql(s, (table_name,)) - except exc.DBAPIError: - s = ( - "SELECT sql FROM %(schema)ssqlite_master " - "WHERE name = ? " - "AND type in ('table', 'view')" % {"schema": schema_expr} - ) - rs = connection.exec_driver_sql(s, (table_name,)) - value = rs.scalar() - if value is None and not self._is_sys_table(table_name): - raise exc.NoSuchTableError(f"{schema_expr}{table_name}") - return value - - def _get_table_pragma(self, connection, pragma, table_name, schema=None): - quote = self.identifier_preparer.quote_identifier - if schema is not None: - statements = [f"PRAGMA {quote(schema)}."] - else: - # because PRAGMA looks in all attached databases if no schema - # given, need to specify "main" schema, however since we want - # 'temp' tables in the same namespace as 'main', need to run - # the PRAGMA twice - statements = ["PRAGMA main.", "PRAGMA temp."] - - qtable = quote(table_name) - for statement in statements: - statement = f"{statement}{pragma}({qtable})" - cursor = connection.exec_driver_sql(statement) - if not cursor._soft_closed: - # work around SQLite issue whereby cursor.description - # is blank when PRAGMA returns no rows: - # https://www.sqlite.org/cvstrac/tktview?tn=1884 - result = cursor.fetchall() - else: - result = [] - if result: - return result - else: - return [] diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/dml.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/dml.py deleted file mode 100644 index dcf5e44..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/dml.py +++ /dev/null @@ -1,240 +0,0 @@ -# dialects/sqlite/dml.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -from __future__ import annotations - -from typing import Any - -from .._typing import _OnConflictIndexElementsT -from .._typing import _OnConflictIndexWhereT -from .._typing import _OnConflictSetT -from .._typing import _OnConflictWhereT -from ... import util -from ...sql import coercions -from ...sql import roles -from ...sql._typing import _DMLTableArgument -from ...sql.base import _exclusive_against -from ...sql.base import _generative -from ...sql.base import ColumnCollection -from ...sql.base import ReadOnlyColumnCollection -from ...sql.dml import Insert as StandardInsert -from ...sql.elements import ClauseElement -from ...sql.elements import KeyedColumnElement -from ...sql.expression import alias -from ...util.typing import Self - -__all__ = ("Insert", "insert") - - -def insert(table: _DMLTableArgument) -> Insert: - """Construct a sqlite-specific variant :class:`_sqlite.Insert` - construct. - - .. container:: inherited_member - - The :func:`sqlalchemy.dialects.sqlite.insert` function creates - a :class:`sqlalchemy.dialects.sqlite.Insert`. This class is based - on the dialect-agnostic :class:`_sql.Insert` construct which may - be constructed using the :func:`_sql.insert` function in - SQLAlchemy Core. - - The :class:`_sqlite.Insert` construct includes additional methods - :meth:`_sqlite.Insert.on_conflict_do_update`, - :meth:`_sqlite.Insert.on_conflict_do_nothing`. - - """ - return Insert(table) - - -class Insert(StandardInsert): - """SQLite-specific implementation of INSERT. - - Adds methods for SQLite-specific syntaxes such as ON CONFLICT. - - The :class:`_sqlite.Insert` object is created using the - :func:`sqlalchemy.dialects.sqlite.insert` function. - - .. versionadded:: 1.4 - - .. seealso:: - - :ref:`sqlite_on_conflict_insert` - - """ - - stringify_dialect = "sqlite" - inherit_cache = False - - @util.memoized_property - def excluded( - self, - ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]: - """Provide the ``excluded`` namespace for an ON CONFLICT statement - - SQLite's ON CONFLICT clause allows reference to the row that would - be inserted, known as ``excluded``. This attribute provides - all columns in this row to be referenceable. - - .. tip:: The :attr:`_sqlite.Insert.excluded` attribute is an instance - of :class:`_expression.ColumnCollection`, which provides an - interface the same as that of the :attr:`_schema.Table.c` - collection described at :ref:`metadata_tables_and_columns`. - With this collection, ordinary names are accessible like attributes - (e.g. ``stmt.excluded.some_column``), but special names and - dictionary method names should be accessed using indexed access, - such as ``stmt.excluded["column name"]`` or - ``stmt.excluded["values"]``. See the docstring for - :class:`_expression.ColumnCollection` for further examples. - - """ - return alias(self.table, name="excluded").columns - - _on_conflict_exclusive = _exclusive_against( - "_post_values_clause", - msgs={ - "_post_values_clause": "This Insert construct already has " - "an ON CONFLICT clause established" - }, - ) - - @_generative - @_on_conflict_exclusive - def on_conflict_do_update( - self, - index_elements: _OnConflictIndexElementsT = None, - index_where: _OnConflictIndexWhereT = None, - set_: _OnConflictSetT = None, - where: _OnConflictWhereT = None, - ) -> Self: - r""" - Specifies a DO UPDATE SET action for ON CONFLICT clause. - - :param index_elements: - A sequence consisting of string column names, :class:`_schema.Column` - objects, or other column expression objects that will be used - to infer a target index or unique constraint. - - :param index_where: - Additional WHERE criterion that can be used to infer a - conditional target index. - - :param set\_: - A dictionary or other mapping object - where the keys are either names of columns in the target table, - or :class:`_schema.Column` objects or other ORM-mapped columns - matching that of the target table, and expressions or literals - as values, specifying the ``SET`` actions to take. - - .. versionadded:: 1.4 The - :paramref:`_sqlite.Insert.on_conflict_do_update.set_` - parameter supports :class:`_schema.Column` objects from the target - :class:`_schema.Table` as keys. - - .. warning:: This dictionary does **not** take into account - Python-specified default UPDATE values or generation functions, - e.g. those specified using :paramref:`_schema.Column.onupdate`. - These values will not be exercised for an ON CONFLICT style of - UPDATE, unless they are manually specified in the - :paramref:`.Insert.on_conflict_do_update.set_` dictionary. - - :param where: - Optional argument. If present, can be a literal SQL - string or an acceptable expression for a ``WHERE`` clause - that restricts the rows affected by ``DO UPDATE SET``. Rows - not meeting the ``WHERE`` condition will not be updated - (effectively a ``DO NOTHING`` for those rows). - - """ - - self._post_values_clause = OnConflictDoUpdate( - index_elements, index_where, set_, where - ) - return self - - @_generative - @_on_conflict_exclusive - def on_conflict_do_nothing( - self, - index_elements: _OnConflictIndexElementsT = None, - index_where: _OnConflictIndexWhereT = None, - ) -> Self: - """ - Specifies a DO NOTHING action for ON CONFLICT clause. - - :param index_elements: - A sequence consisting of string column names, :class:`_schema.Column` - objects, or other column expression objects that will be used - to infer a target index or unique constraint. - - :param index_where: - Additional WHERE criterion that can be used to infer a - conditional target index. - - """ - - self._post_values_clause = OnConflictDoNothing( - index_elements, index_where - ) - return self - - -class OnConflictClause(ClauseElement): - stringify_dialect = "sqlite" - - constraint_target: None - inferred_target_elements: _OnConflictIndexElementsT - inferred_target_whereclause: _OnConflictIndexWhereT - - def __init__( - self, - index_elements: _OnConflictIndexElementsT = None, - index_where: _OnConflictIndexWhereT = None, - ): - if index_elements is not None: - self.constraint_target = None - self.inferred_target_elements = index_elements - self.inferred_target_whereclause = index_where - else: - self.constraint_target = self.inferred_target_elements = ( - self.inferred_target_whereclause - ) = None - - -class OnConflictDoNothing(OnConflictClause): - __visit_name__ = "on_conflict_do_nothing" - - -class OnConflictDoUpdate(OnConflictClause): - __visit_name__ = "on_conflict_do_update" - - def __init__( - self, - index_elements: _OnConflictIndexElementsT = None, - index_where: _OnConflictIndexWhereT = None, - set_: _OnConflictSetT = None, - where: _OnConflictWhereT = None, - ): - super().__init__( - index_elements=index_elements, - index_where=index_where, - ) - - if isinstance(set_, dict): - if not set_: - raise ValueError("set parameter dictionary must not be empty") - elif isinstance(set_, ColumnCollection): - set_ = dict(set_) - else: - raise ValueError( - "set parameter must be a non-empty dictionary " - "or a ColumnCollection such as the `.c.` collection " - "of a Table object" - ) - self.update_values_to_set = [ - (coercions.expect(roles.DMLColumnRole, key), value) - for key, value in set_.items() - ] - self.update_whereclause = where diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/json.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/json.py deleted file mode 100644 index ec29802..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/json.py +++ /dev/null @@ -1,92 +0,0 @@ -# dialects/sqlite/json.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - -from ... import types as sqltypes - - -class JSON(sqltypes.JSON): - """SQLite JSON type. - - SQLite supports JSON as of version 3.9 through its JSON1_ extension. Note - that JSON1_ is a - `loadable extension <https://www.sqlite.org/loadext.html>`_ and as such - may not be available, or may require run-time loading. - - :class:`_sqlite.JSON` is used automatically whenever the base - :class:`_types.JSON` datatype is used against a SQLite backend. - - .. seealso:: - - :class:`_types.JSON` - main documentation for the generic - cross-platform JSON datatype. - - The :class:`_sqlite.JSON` type supports persistence of JSON values - as well as the core index operations provided by :class:`_types.JSON` - datatype, by adapting the operations to render the ``JSON_EXTRACT`` - function wrapped in the ``JSON_QUOTE`` function at the database level. - Extracted values are quoted in order to ensure that the results are - always JSON string values. - - - .. versionadded:: 1.3 - - - .. _JSON1: https://www.sqlite.org/json1.html - - """ - - -# Note: these objects currently match exactly those of MySQL, however since -# these are not generalizable to all JSON implementations, remain separately -# implemented for each dialect. -class _FormatTypeMixin: - def _format_value(self, value): - raise NotImplementedError() - - def bind_processor(self, dialect): - super_proc = self.string_bind_processor(dialect) - - def process(value): - value = self._format_value(value) - if super_proc: - value = super_proc(value) - return value - - return process - - def literal_processor(self, dialect): - super_proc = self.string_literal_processor(dialect) - - def process(value): - value = self._format_value(value) - if super_proc: - value = super_proc(value) - return value - - return process - - -class JSONIndexType(_FormatTypeMixin, sqltypes.JSON.JSONIndexType): - def _format_value(self, value): - if isinstance(value, int): - value = "$[%s]" % value - else: - value = '$."%s"' % value - return value - - -class JSONPathType(_FormatTypeMixin, sqltypes.JSON.JSONPathType): - def _format_value(self, value): - return "$%s" % ( - "".join( - [ - "[%s]" % elem if isinstance(elem, int) else '."%s"' % elem - for elem in value - ] - ) - ) diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/provision.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/provision.py deleted file mode 100644 index f18568b..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/provision.py +++ /dev/null @@ -1,198 +0,0 @@ -# dialects/sqlite/provision.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - -import os -import re - -from ... import exc -from ...engine import url as sa_url -from ...testing.provision import create_db -from ...testing.provision import drop_db -from ...testing.provision import follower_url_from_main -from ...testing.provision import generate_driver_url -from ...testing.provision import log -from ...testing.provision import post_configure_engine -from ...testing.provision import run_reap_dbs -from ...testing.provision import stop_test_class_outside_fixtures -from ...testing.provision import temp_table_keyword_args -from ...testing.provision import upsert - - -# TODO: I can't get this to build dynamically with pytest-xdist procs -_drivernames = { - "pysqlite", - "aiosqlite", - "pysqlcipher", - "pysqlite_numeric", - "pysqlite_dollar", -} - - -def _format_url(url, driver, ident): - """given a sqlite url + desired driver + ident, make a canonical - URL out of it - - """ - url = sa_url.make_url(url) - - if driver is None: - driver = url.get_driver_name() - - filename = url.database - - needs_enc = driver == "pysqlcipher" - name_token = None - - if filename and filename != ":memory:": - assert "test_schema" not in filename - tokens = re.split(r"[_\.]", filename) - - new_filename = f"{driver}" - - for token in tokens: - if token in _drivernames: - if driver is None: - driver = token - continue - elif token in ("db", "enc"): - continue - elif name_token is None: - name_token = token.strip("_") - - assert name_token, f"sqlite filename has no name token: {url.database}" - - new_filename = f"{name_token}_{driver}" - if ident: - new_filename += f"_{ident}" - new_filename += ".db" - if needs_enc: - new_filename += ".enc" - url = url.set(database=new_filename) - - if needs_enc: - url = url.set(password="test") - - url = url.set(drivername="sqlite+%s" % (driver,)) - - return url - - -@generate_driver_url.for_db("sqlite") -def generate_driver_url(url, driver, query_str): - url = _format_url(url, driver, None) - - try: - url.get_dialect() - except exc.NoSuchModuleError: - return None - else: - return url - - -@follower_url_from_main.for_db("sqlite") -def _sqlite_follower_url_from_main(url, ident): - return _format_url(url, None, ident) - - -@post_configure_engine.for_db("sqlite") -def _sqlite_post_configure_engine(url, engine, follower_ident): - from sqlalchemy import event - - if follower_ident: - attach_path = f"{follower_ident}_{engine.driver}_test_schema.db" - else: - attach_path = f"{engine.driver}_test_schema.db" - - @event.listens_for(engine, "connect") - def connect(dbapi_connection, connection_record): - # use file DBs in all cases, memory acts kind of strangely - # as an attached - - # NOTE! this has to be done *per connection*. New sqlite connection, - # as we get with say, QueuePool, the attaches are gone. - # so schemes to delete those attached files have to be done at the - # filesystem level and not rely upon what attachments are in a - # particular SQLite connection - dbapi_connection.execute( - f'ATTACH DATABASE "{attach_path}" AS test_schema' - ) - - @event.listens_for(engine, "engine_disposed") - def dispose(engine): - """most databases should be dropped using - stop_test_class_outside_fixtures - - however a few tests like AttachedDBTest might not get triggered on - that main hook - - """ - - if os.path.exists(attach_path): - os.remove(attach_path) - - filename = engine.url.database - - if filename and filename != ":memory:" and os.path.exists(filename): - os.remove(filename) - - -@create_db.for_db("sqlite") -def _sqlite_create_db(cfg, eng, ident): - pass - - -@drop_db.for_db("sqlite") -def _sqlite_drop_db(cfg, eng, ident): - _drop_dbs_w_ident(eng.url.database, eng.driver, ident) - - -def _drop_dbs_w_ident(databasename, driver, ident): - for path in os.listdir("."): - fname, ext = os.path.split(path) - if ident in fname and ext in [".db", ".db.enc"]: - log.info("deleting SQLite database file: %s", path) - os.remove(path) - - -@stop_test_class_outside_fixtures.for_db("sqlite") -def stop_test_class_outside_fixtures(config, db, cls): - db.dispose() - - -@temp_table_keyword_args.for_db("sqlite") -def _sqlite_temp_table_keyword_args(cfg, eng): - return {"prefixes": ["TEMPORARY"]} - - -@run_reap_dbs.for_db("sqlite") -def _reap_sqlite_dbs(url, idents): - log.info("db reaper connecting to %r", url) - log.info("identifiers in file: %s", ", ".join(idents)) - url = sa_url.make_url(url) - for ident in idents: - for drivername in _drivernames: - _drop_dbs_w_ident(url.database, drivername, ident) - - -@upsert.for_db("sqlite") -def _upsert( - cfg, table, returning, *, set_lambda=None, sort_by_parameter_order=False -): - from sqlalchemy.dialects.sqlite import insert - - stmt = insert(table) - - if set_lambda: - stmt = stmt.on_conflict_do_update(set_=set_lambda(stmt.excluded)) - else: - stmt = stmt.on_conflict_do_nothing() - - stmt = stmt.returning( - *returning, sort_by_parameter_order=sort_by_parameter_order - ) - return stmt diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlcipher.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlcipher.py deleted file mode 100644 index 388a4df..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlcipher.py +++ /dev/null @@ -1,155 +0,0 @@ -# dialects/sqlite/pysqlcipher.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - - -""" -.. dialect:: sqlite+pysqlcipher - :name: pysqlcipher - :dbapi: sqlcipher 3 or pysqlcipher - :connectstring: sqlite+pysqlcipher://:passphrase@/file_path[?kdf_iter=<iter>] - - Dialect for support of DBAPIs that make use of the - `SQLCipher <https://www.zetetic.net/sqlcipher>`_ backend. - - -Driver ------- - -Current dialect selection logic is: - -* If the :paramref:`_sa.create_engine.module` parameter supplies a DBAPI module, - that module is used. -* Otherwise for Python 3, choose https://pypi.org/project/sqlcipher3/ -* If not available, fall back to https://pypi.org/project/pysqlcipher3/ -* For Python 2, https://pypi.org/project/pysqlcipher/ is used. - -.. warning:: The ``pysqlcipher3`` and ``pysqlcipher`` DBAPI drivers are no - longer maintained; the ``sqlcipher3`` driver as of this writing appears - to be current. For future compatibility, any pysqlcipher-compatible DBAPI - may be used as follows:: - - import sqlcipher_compatible_driver - - from sqlalchemy import create_engine - - e = create_engine( - "sqlite+pysqlcipher://:password@/dbname.db", - module=sqlcipher_compatible_driver - ) - -These drivers make use of the SQLCipher engine. This system essentially -introduces new PRAGMA commands to SQLite which allows the setting of a -passphrase and other encryption parameters, allowing the database file to be -encrypted. - - -Connect Strings ---------------- - -The format of the connect string is in every way the same as that -of the :mod:`~sqlalchemy.dialects.sqlite.pysqlite` driver, except that the -"password" field is now accepted, which should contain a passphrase:: - - e = create_engine('sqlite+pysqlcipher://:testing@/foo.db') - -For an absolute file path, two leading slashes should be used for the -database name:: - - e = create_engine('sqlite+pysqlcipher://:testing@//path/to/foo.db') - -A selection of additional encryption-related pragmas supported by SQLCipher -as documented at https://www.zetetic.net/sqlcipher/sqlcipher-api/ can be passed -in the query string, and will result in that PRAGMA being called for each -new connection. Currently, ``cipher``, ``kdf_iter`` -``cipher_page_size`` and ``cipher_use_hmac`` are supported:: - - e = create_engine('sqlite+pysqlcipher://:testing@/foo.db?cipher=aes-256-cfb&kdf_iter=64000') - -.. warning:: Previous versions of sqlalchemy did not take into consideration - the encryption-related pragmas passed in the url string, that were silently - ignored. This may cause errors when opening files saved by a - previous sqlalchemy version if the encryption options do not match. - - -Pooling Behavior ----------------- - -The driver makes a change to the default pool behavior of pysqlite -as described in :ref:`pysqlite_threading_pooling`. The pysqlcipher driver -has been observed to be significantly slower on connection than the -pysqlite driver, most likely due to the encryption overhead, so the -dialect here defaults to using the :class:`.SingletonThreadPool` -implementation, -instead of the :class:`.NullPool` pool used by pysqlite. As always, the pool -implementation is entirely configurable using the -:paramref:`_sa.create_engine.poolclass` parameter; the :class:`. -StaticPool` may -be more feasible for single-threaded use, or :class:`.NullPool` may be used -to prevent unencrypted connections from being held open for long periods of -time, at the expense of slower startup time for new connections. - - -""" # noqa - -from .pysqlite import SQLiteDialect_pysqlite -from ... import pool - - -class SQLiteDialect_pysqlcipher(SQLiteDialect_pysqlite): - driver = "pysqlcipher" - supports_statement_cache = True - - pragmas = ("kdf_iter", "cipher", "cipher_page_size", "cipher_use_hmac") - - @classmethod - def import_dbapi(cls): - try: - import sqlcipher3 as sqlcipher - except ImportError: - pass - else: - return sqlcipher - - from pysqlcipher3 import dbapi2 as sqlcipher - - return sqlcipher - - @classmethod - def get_pool_class(cls, url): - return pool.SingletonThreadPool - - def on_connect_url(self, url): - super_on_connect = super().on_connect_url(url) - - # pull the info we need from the URL early. Even though URL - # is immutable, we don't want any in-place changes to the URL - # to affect things - passphrase = url.password or "" - url_query = dict(url.query) - - def on_connect(conn): - cursor = conn.cursor() - cursor.execute('pragma key="%s"' % passphrase) - for prag in self.pragmas: - value = url_query.get(prag, None) - if value is not None: - cursor.execute('pragma %s="%s"' % (prag, value)) - cursor.close() - - if super_on_connect: - super_on_connect(conn) - - return on_connect - - def create_connect_args(self, url): - plain_url = url._replace(password=None) - plain_url = plain_url.difference_update_query(self.pragmas) - return super().create_connect_args(plain_url) - - -dialect = SQLiteDialect_pysqlcipher diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlite.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlite.py deleted file mode 100644 index f39baf3..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlite.py +++ /dev/null @@ -1,756 +0,0 @@ -# dialects/sqlite/pysqlite.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - - -r""" -.. dialect:: sqlite+pysqlite - :name: pysqlite - :dbapi: sqlite3 - :connectstring: sqlite+pysqlite:///file_path - :url: https://docs.python.org/library/sqlite3.html - - Note that ``pysqlite`` is the same driver as the ``sqlite3`` - module included with the Python distribution. - -Driver ------- - -The ``sqlite3`` Python DBAPI is standard on all modern Python versions; -for cPython and Pypy, no additional installation is necessary. - - -Connect Strings ---------------- - -The file specification for the SQLite database is taken as the "database" -portion of the URL. Note that the format of a SQLAlchemy url is:: - - driver://user:pass@host/database - -This means that the actual filename to be used starts with the characters to -the **right** of the third slash. So connecting to a relative filepath -looks like:: - - # relative path - e = create_engine('sqlite:///path/to/database.db') - -An absolute path, which is denoted by starting with a slash, means you -need **four** slashes:: - - # absolute path - e = create_engine('sqlite:////path/to/database.db') - -To use a Windows path, regular drive specifications and backslashes can be -used. Double backslashes are probably needed:: - - # absolute path on Windows - e = create_engine('sqlite:///C:\\path\\to\\database.db') - -To use sqlite ``:memory:`` database specify it as the filename using -``sqlite://:memory:``. It's also the default if no filepath is -present, specifying only ``sqlite://`` and nothing else:: - - # in-memory database - e = create_engine('sqlite://:memory:') - # also in-memory database - e2 = create_engine('sqlite://') - -.. _pysqlite_uri_connections: - -URI Connections -^^^^^^^^^^^^^^^ - -Modern versions of SQLite support an alternative system of connecting using a -`driver level URI <https://www.sqlite.org/uri.html>`_, which has the advantage -that additional driver-level arguments can be passed including options such as -"read only". The Python sqlite3 driver supports this mode under modern Python -3 versions. The SQLAlchemy pysqlite driver supports this mode of use by -specifying "uri=true" in the URL query string. The SQLite-level "URI" is kept -as the "database" portion of the SQLAlchemy url (that is, following a slash):: - - e = create_engine("sqlite:///file:path/to/database?mode=ro&uri=true") - -.. note:: The "uri=true" parameter must appear in the **query string** - of the URL. It will not currently work as expected if it is only - present in the :paramref:`_sa.create_engine.connect_args` - parameter dictionary. - -The logic reconciles the simultaneous presence of SQLAlchemy's query string and -SQLite's query string by separating out the parameters that belong to the -Python sqlite3 driver vs. those that belong to the SQLite URI. This is -achieved through the use of a fixed list of parameters known to be accepted by -the Python side of the driver. For example, to include a URL that indicates -the Python sqlite3 "timeout" and "check_same_thread" parameters, along with the -SQLite "mode" and "nolock" parameters, they can all be passed together on the -query string:: - - e = create_engine( - "sqlite:///file:path/to/database?" - "check_same_thread=true&timeout=10&mode=ro&nolock=1&uri=true" - ) - -Above, the pysqlite / sqlite3 DBAPI would be passed arguments as:: - - sqlite3.connect( - "file:path/to/database?mode=ro&nolock=1", - check_same_thread=True, timeout=10, uri=True - ) - -Regarding future parameters added to either the Python or native drivers. new -parameter names added to the SQLite URI scheme should be automatically -accommodated by this scheme. New parameter names added to the Python driver -side can be accommodated by specifying them in the -:paramref:`_sa.create_engine.connect_args` dictionary, -until dialect support is -added by SQLAlchemy. For the less likely case that the native SQLite driver -adds a new parameter name that overlaps with one of the existing, known Python -driver parameters (such as "timeout" perhaps), SQLAlchemy's dialect would -require adjustment for the URL scheme to continue to support this. - -As is always the case for all SQLAlchemy dialects, the entire "URL" process -can be bypassed in :func:`_sa.create_engine` through the use of the -:paramref:`_sa.create_engine.creator` -parameter which allows for a custom callable -that creates a Python sqlite3 driver level connection directly. - -.. versionadded:: 1.3.9 - -.. seealso:: - - `Uniform Resource Identifiers <https://www.sqlite.org/uri.html>`_ - in - the SQLite documentation - -.. _pysqlite_regexp: - -Regular Expression Support ---------------------------- - -.. versionadded:: 1.4 - -Support for the :meth:`_sql.ColumnOperators.regexp_match` operator is provided -using Python's re.search_ function. SQLite itself does not include a working -regular expression operator; instead, it includes a non-implemented placeholder -operator ``REGEXP`` that calls a user-defined function that must be provided. - -SQLAlchemy's implementation makes use of the pysqlite create_function_ hook -as follows:: - - - def regexp(a, b): - return re.search(a, b) is not None - - sqlite_connection.create_function( - "regexp", 2, regexp, - ) - -There is currently no support for regular expression flags as a separate -argument, as these are not supported by SQLite's REGEXP operator, however these -may be included inline within the regular expression string. See `Python regular expressions`_ for -details. - -.. seealso:: - - `Python regular expressions`_: Documentation for Python's regular expression syntax. - -.. _create_function: https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.create_function - -.. _re.search: https://docs.python.org/3/library/re.html#re.search - -.. _Python regular expressions: https://docs.python.org/3/library/re.html#re.search - - - -Compatibility with sqlite3 "native" date and datetime types ------------------------------------------------------------ - -The pysqlite driver includes the sqlite3.PARSE_DECLTYPES and -sqlite3.PARSE_COLNAMES options, which have the effect of any column -or expression explicitly cast as "date" or "timestamp" will be converted -to a Python date or datetime object. The date and datetime types provided -with the pysqlite dialect are not currently compatible with these options, -since they render the ISO date/datetime including microseconds, which -pysqlite's driver does not. Additionally, SQLAlchemy does not at -this time automatically render the "cast" syntax required for the -freestanding functions "current_timestamp" and "current_date" to return -datetime/date types natively. Unfortunately, pysqlite -does not provide the standard DBAPI types in ``cursor.description``, -leaving SQLAlchemy with no way to detect these types on the fly -without expensive per-row type checks. - -Keeping in mind that pysqlite's parsing option is not recommended, -nor should be necessary, for use with SQLAlchemy, usage of PARSE_DECLTYPES -can be forced if one configures "native_datetime=True" on create_engine():: - - engine = create_engine('sqlite://', - connect_args={'detect_types': - sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES}, - native_datetime=True - ) - -With this flag enabled, the DATE and TIMESTAMP types (but note - not the -DATETIME or TIME types...confused yet ?) will not perform any bind parameter -or result processing. Execution of "func.current_date()" will return a string. -"func.current_timestamp()" is registered as returning a DATETIME type in -SQLAlchemy, so this function still receives SQLAlchemy-level result -processing. - -.. _pysqlite_threading_pooling: - -Threading/Pooling Behavior ---------------------------- - -The ``sqlite3`` DBAPI by default prohibits the use of a particular connection -in a thread which is not the one in which it was created. As SQLite has -matured, it's behavior under multiple threads has improved, and even includes -options for memory only databases to be used in multiple threads. - -The thread prohibition is known as "check same thread" and may be controlled -using the ``sqlite3`` parameter ``check_same_thread``, which will disable or -enable this check. SQLAlchemy's default behavior here is to set -``check_same_thread`` to ``False`` automatically whenever a file-based database -is in use, to establish compatibility with the default pool class -:class:`.QueuePool`. - -The SQLAlchemy ``pysqlite`` DBAPI establishes the connection pool differently -based on the kind of SQLite database that's requested: - -* When a ``:memory:`` SQLite database is specified, the dialect by default - will use :class:`.SingletonThreadPool`. This pool maintains a single - connection per thread, so that all access to the engine within the current - thread use the same ``:memory:`` database - other threads would access a - different ``:memory:`` database. The ``check_same_thread`` parameter - defaults to ``True``. -* When a file-based database is specified, the dialect will use - :class:`.QueuePool` as the source of connections. at the same time, - the ``check_same_thread`` flag is set to False by default unless overridden. - - .. versionchanged:: 2.0 - - SQLite file database engines now use :class:`.QueuePool` by default. - Previously, :class:`.NullPool` were used. The :class:`.NullPool` class - may be used by specifying it via the - :paramref:`_sa.create_engine.poolclass` parameter. - -Disabling Connection Pooling for File Databases -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -Pooling may be disabled for a file based database by specifying the -:class:`.NullPool` implementation for the :func:`_sa.create_engine.poolclass` -parameter:: - - from sqlalchemy import NullPool - engine = create_engine("sqlite:///myfile.db", poolclass=NullPool) - -It's been observed that the :class:`.NullPool` implementation incurs an -extremely small performance overhead for repeated checkouts due to the lack of -connection re-use implemented by :class:`.QueuePool`. However, it still -may be beneficial to use this class if the application is experiencing -issues with files being locked. - -Using a Memory Database in Multiple Threads -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -To use a ``:memory:`` database in a multithreaded scenario, the same -connection object must be shared among threads, since the database exists -only within the scope of that connection. The -:class:`.StaticPool` implementation will maintain a single connection -globally, and the ``check_same_thread`` flag can be passed to Pysqlite -as ``False``:: - - from sqlalchemy.pool import StaticPool - engine = create_engine('sqlite://', - connect_args={'check_same_thread':False}, - poolclass=StaticPool) - -Note that using a ``:memory:`` database in multiple threads requires a recent -version of SQLite. - -Using Temporary Tables with SQLite -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -Due to the way SQLite deals with temporary tables, if you wish to use a -temporary table in a file-based SQLite database across multiple checkouts -from the connection pool, such as when using an ORM :class:`.Session` where -the temporary table should continue to remain after :meth:`.Session.commit` or -:meth:`.Session.rollback` is called, a pool which maintains a single -connection must be used. Use :class:`.SingletonThreadPool` if the scope is -only needed within the current thread, or :class:`.StaticPool` is scope is -needed within multiple threads for this case:: - - # maintain the same connection per thread - from sqlalchemy.pool import SingletonThreadPool - engine = create_engine('sqlite:///mydb.db', - poolclass=SingletonThreadPool) - - - # maintain the same connection across all threads - from sqlalchemy.pool import StaticPool - engine = create_engine('sqlite:///mydb.db', - poolclass=StaticPool) - -Note that :class:`.SingletonThreadPool` should be configured for the number -of threads that are to be used; beyond that number, connections will be -closed out in a non deterministic way. - - -Dealing with Mixed String / Binary Columns ------------------------------------------------------- - -The SQLite database is weakly typed, and as such it is possible when using -binary values, which in Python are represented as ``b'some string'``, that a -particular SQLite database can have data values within different rows where -some of them will be returned as a ``b''`` value by the Pysqlite driver, and -others will be returned as Python strings, e.g. ``''`` values. This situation -is not known to occur if the SQLAlchemy :class:`.LargeBinary` datatype is used -consistently, however if a particular SQLite database has data that was -inserted using the Pysqlite driver directly, or when using the SQLAlchemy -:class:`.String` type which was later changed to :class:`.LargeBinary`, the -table will not be consistently readable because SQLAlchemy's -:class:`.LargeBinary` datatype does not handle strings so it has no way of -"encoding" a value that is in string format. - -To deal with a SQLite table that has mixed string / binary data in the -same column, use a custom type that will check each row individually:: - - from sqlalchemy import String - from sqlalchemy import TypeDecorator - - class MixedBinary(TypeDecorator): - impl = String - cache_ok = True - - def process_result_value(self, value, dialect): - if isinstance(value, str): - value = bytes(value, 'utf-8') - elif value is not None: - value = bytes(value) - - return value - -Then use the above ``MixedBinary`` datatype in the place where -:class:`.LargeBinary` would normally be used. - -.. _pysqlite_serializable: - -Serializable isolation / Savepoints / Transactional DDL -------------------------------------------------------- - -In the section :ref:`sqlite_concurrency`, we refer to the pysqlite -driver's assortment of issues that prevent several features of SQLite -from working correctly. The pysqlite DBAPI driver has several -long-standing bugs which impact the correctness of its transactional -behavior. In its default mode of operation, SQLite features such as -SERIALIZABLE isolation, transactional DDL, and SAVEPOINT support are -non-functional, and in order to use these features, workarounds must -be taken. - -The issue is essentially that the driver attempts to second-guess the user's -intent, failing to start transactions and sometimes ending them prematurely, in -an effort to minimize the SQLite databases's file locking behavior, even -though SQLite itself uses "shared" locks for read-only activities. - -SQLAlchemy chooses to not alter this behavior by default, as it is the -long-expected behavior of the pysqlite driver; if and when the pysqlite -driver attempts to repair these issues, that will be more of a driver towards -defaults for SQLAlchemy. - -The good news is that with a few events, we can implement transactional -support fully, by disabling pysqlite's feature entirely and emitting BEGIN -ourselves. This is achieved using two event listeners:: - - from sqlalchemy import create_engine, event - - engine = create_engine("sqlite:///myfile.db") - - @event.listens_for(engine, "connect") - def do_connect(dbapi_connection, connection_record): - # disable pysqlite's emitting of the BEGIN statement entirely. - # also stops it from emitting COMMIT before any DDL. - dbapi_connection.isolation_level = None - - @event.listens_for(engine, "begin") - def do_begin(conn): - # emit our own BEGIN - conn.exec_driver_sql("BEGIN") - -.. warning:: When using the above recipe, it is advised to not use the - :paramref:`.Connection.execution_options.isolation_level` setting on - :class:`_engine.Connection` and :func:`_sa.create_engine` - with the SQLite driver, - as this function necessarily will also alter the ".isolation_level" setting. - - -Above, we intercept a new pysqlite connection and disable any transactional -integration. Then, at the point at which SQLAlchemy knows that transaction -scope is to begin, we emit ``"BEGIN"`` ourselves. - -When we take control of ``"BEGIN"``, we can also control directly SQLite's -locking modes, introduced at -`BEGIN TRANSACTION <https://sqlite.org/lang_transaction.html>`_, -by adding the desired locking mode to our ``"BEGIN"``:: - - @event.listens_for(engine, "begin") - def do_begin(conn): - conn.exec_driver_sql("BEGIN EXCLUSIVE") - -.. seealso:: - - `BEGIN TRANSACTION <https://sqlite.org/lang_transaction.html>`_ - - on the SQLite site - - `sqlite3 SELECT does not BEGIN a transaction <https://bugs.python.org/issue9924>`_ - - on the Python bug tracker - - `sqlite3 module breaks transactions and potentially corrupts data <https://bugs.python.org/issue10740>`_ - - on the Python bug tracker - -.. _pysqlite_udfs: - -User-Defined Functions ----------------------- - -pysqlite supports a `create_function() <https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.create_function>`_ -method that allows us to create our own user-defined functions (UDFs) in Python and use them directly in SQLite queries. -These functions are registered with a specific DBAPI Connection. - -SQLAlchemy uses connection pooling with file-based SQLite databases, so we need to ensure that the UDF is attached to the -connection when it is created. That is accomplished with an event listener:: - - from sqlalchemy import create_engine - from sqlalchemy import event - from sqlalchemy import text - - - def udf(): - return "udf-ok" - - - engine = create_engine("sqlite:///./db_file") - - - @event.listens_for(engine, "connect") - def connect(conn, rec): - conn.create_function("udf", 0, udf) - - - for i in range(5): - with engine.connect() as conn: - print(conn.scalar(text("SELECT UDF()"))) - - -""" # noqa - -import math -import os -import re - -from .base import DATE -from .base import DATETIME -from .base import SQLiteDialect -from ... import exc -from ... import pool -from ... import types as sqltypes -from ... import util - - -class _SQLite_pysqliteTimeStamp(DATETIME): - def bind_processor(self, dialect): - if dialect.native_datetime: - return None - else: - return DATETIME.bind_processor(self, dialect) - - def result_processor(self, dialect, coltype): - if dialect.native_datetime: - return None - else: - return DATETIME.result_processor(self, dialect, coltype) - - -class _SQLite_pysqliteDate(DATE): - def bind_processor(self, dialect): - if dialect.native_datetime: - return None - else: - return DATE.bind_processor(self, dialect) - - def result_processor(self, dialect, coltype): - if dialect.native_datetime: - return None - else: - return DATE.result_processor(self, dialect, coltype) - - -class SQLiteDialect_pysqlite(SQLiteDialect): - default_paramstyle = "qmark" - supports_statement_cache = True - returns_native_bytes = True - - colspecs = util.update_copy( - SQLiteDialect.colspecs, - { - sqltypes.Date: _SQLite_pysqliteDate, - sqltypes.TIMESTAMP: _SQLite_pysqliteTimeStamp, - }, - ) - - description_encoding = None - - driver = "pysqlite" - - @classmethod - def import_dbapi(cls): - from sqlite3 import dbapi2 as sqlite - - return sqlite - - @classmethod - def _is_url_file_db(cls, url): - if (url.database and url.database != ":memory:") and ( - url.query.get("mode", None) != "memory" - ): - return True - else: - return False - - @classmethod - def get_pool_class(cls, url): - if cls._is_url_file_db(url): - return pool.QueuePool - else: - return pool.SingletonThreadPool - - def _get_server_version_info(self, connection): - return self.dbapi.sqlite_version_info - - _isolation_lookup = SQLiteDialect._isolation_lookup.union( - { - "AUTOCOMMIT": None, - } - ) - - def set_isolation_level(self, dbapi_connection, level): - if level == "AUTOCOMMIT": - dbapi_connection.isolation_level = None - else: - dbapi_connection.isolation_level = "" - return super().set_isolation_level(dbapi_connection, level) - - def on_connect(self): - def regexp(a, b): - if b is None: - return None - return re.search(a, b) is not None - - if util.py38 and self._get_server_version_info(None) >= (3, 9): - # sqlite must be greater than 3.8.3 for deterministic=True - # https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.create_function - # the check is more conservative since there were still issues - # with following 3.8 sqlite versions - create_func_kw = {"deterministic": True} - else: - create_func_kw = {} - - def set_regexp(dbapi_connection): - dbapi_connection.create_function( - "regexp", 2, regexp, **create_func_kw - ) - - def floor_func(dbapi_connection): - # NOTE: floor is optionally present in sqlite 3.35+ , however - # as it is normally non-present we deliver floor() unconditionally - # for now. - # https://www.sqlite.org/lang_mathfunc.html - dbapi_connection.create_function( - "floor", 1, math.floor, **create_func_kw - ) - - fns = [set_regexp, floor_func] - - def connect(conn): - for fn in fns: - fn(conn) - - return connect - - def create_connect_args(self, url): - if url.username or url.password or url.host or url.port: - raise exc.ArgumentError( - "Invalid SQLite URL: %s\n" - "Valid SQLite URL forms are:\n" - " sqlite:///:memory: (or, sqlite://)\n" - " sqlite:///relative/path/to/file.db\n" - " sqlite:////absolute/path/to/file.db" % (url,) - ) - - # theoretically, this list can be augmented, at least as far as - # parameter names accepted by sqlite3/pysqlite, using - # inspect.getfullargspec(). for the moment this seems like overkill - # as these parameters don't change very often, and as always, - # parameters passed to connect_args will always go to the - # sqlite3/pysqlite driver. - pysqlite_args = [ - ("uri", bool), - ("timeout", float), - ("isolation_level", str), - ("detect_types", int), - ("check_same_thread", bool), - ("cached_statements", int), - ] - opts = url.query - pysqlite_opts = {} - for key, type_ in pysqlite_args: - util.coerce_kw_type(opts, key, type_, dest=pysqlite_opts) - - if pysqlite_opts.get("uri", False): - uri_opts = dict(opts) - # here, we are actually separating the parameters that go to - # sqlite3/pysqlite vs. those that go the SQLite URI. What if - # two names conflict? again, this seems to be not the case right - # now, and in the case that new names are added to - # either side which overlap, again the sqlite3/pysqlite parameters - # can be passed through connect_args instead of in the URL. - # If SQLite native URIs add a parameter like "timeout" that - # we already have listed here for the python driver, then we need - # to adjust for that here. - for key, type_ in pysqlite_args: - uri_opts.pop(key, None) - filename = url.database - if uri_opts: - # sorting of keys is for unit test support - filename += "?" + ( - "&".join( - "%s=%s" % (key, uri_opts[key]) - for key in sorted(uri_opts) - ) - ) - else: - filename = url.database or ":memory:" - if filename != ":memory:": - filename = os.path.abspath(filename) - - pysqlite_opts.setdefault( - "check_same_thread", not self._is_url_file_db(url) - ) - - return ([filename], pysqlite_opts) - - def is_disconnect(self, e, connection, cursor): - return isinstance( - e, self.dbapi.ProgrammingError - ) and "Cannot operate on a closed database." in str(e) - - -dialect = SQLiteDialect_pysqlite - - -class _SQLiteDialect_pysqlite_numeric(SQLiteDialect_pysqlite): - """numeric dialect for testing only - - internal use only. This dialect is **NOT** supported by SQLAlchemy - and may change at any time. - - """ - - supports_statement_cache = True - default_paramstyle = "numeric" - driver = "pysqlite_numeric" - - _first_bind = ":1" - _not_in_statement_regexp = None - - def __init__(self, *arg, **kw): - kw.setdefault("paramstyle", "numeric") - super().__init__(*arg, **kw) - - def create_connect_args(self, url): - arg, opts = super().create_connect_args(url) - opts["factory"] = self._fix_sqlite_issue_99953() - return arg, opts - - def _fix_sqlite_issue_99953(self): - import sqlite3 - - first_bind = self._first_bind - if self._not_in_statement_regexp: - nis = self._not_in_statement_regexp - - def _test_sql(sql): - m = nis.search(sql) - assert not m, f"Found {nis.pattern!r} in {sql!r}" - - else: - - def _test_sql(sql): - pass - - def _numeric_param_as_dict(parameters): - if parameters: - assert isinstance(parameters, tuple) - return { - str(idx): value for idx, value in enumerate(parameters, 1) - } - else: - return () - - class SQLiteFix99953Cursor(sqlite3.Cursor): - def execute(self, sql, parameters=()): - _test_sql(sql) - if first_bind in sql: - parameters = _numeric_param_as_dict(parameters) - return super().execute(sql, parameters) - - def executemany(self, sql, parameters): - _test_sql(sql) - if first_bind in sql: - parameters = [ - _numeric_param_as_dict(p) for p in parameters - ] - return super().executemany(sql, parameters) - - class SQLiteFix99953Connection(sqlite3.Connection): - def cursor(self, factory=None): - if factory is None: - factory = SQLiteFix99953Cursor - return super().cursor(factory=factory) - - def execute(self, sql, parameters=()): - _test_sql(sql) - if first_bind in sql: - parameters = _numeric_param_as_dict(parameters) - return super().execute(sql, parameters) - - def executemany(self, sql, parameters): - _test_sql(sql) - if first_bind in sql: - parameters = [ - _numeric_param_as_dict(p) for p in parameters - ] - return super().executemany(sql, parameters) - - return SQLiteFix99953Connection - - -class _SQLiteDialect_pysqlite_dollar(_SQLiteDialect_pysqlite_numeric): - """numeric dialect that uses $ for testing only - - internal use only. This dialect is **NOT** supported by SQLAlchemy - and may change at any time. - - """ - - supports_statement_cache = True - default_paramstyle = "numeric_dollar" - driver = "pysqlite_dollar" - - _first_bind = "$1" - _not_in_statement_regexp = re.compile(r"[^\d]:\d+") - - def __init__(self, *arg, **kw): - kw.setdefault("paramstyle", "numeric_dollar") - super().__init__(*arg, **kw) |