summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
commit12cf076118570eebbff08c6b3090e0d4798447a1 (patch)
tree3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite
parentc45662ff3923b34614ddcc8feb9195541166dcc5 (diff)
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__init__.py57
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/__init__.cpython-311.pycbin1381 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/aiosqlite.cpython-311.pycbin19317 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/base.cpython-311.pycbin104751 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/dml.cpython-311.pycbin10200 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/json.cpython-311.pycbin4319 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/provision.cpython-311.pycbin7990 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlcipher.cpython-311.pycbin6604 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlite.cpython-311.pycbin33807 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py396
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/base.py2782
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/dml.py240
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/json.py92
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/provision.py198
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlcipher.py155
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlite.py756
16 files changed, 0 insertions, 4676 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__init__.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__init__.py
deleted file mode 100644
index 45f088e..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__init__.py
+++ /dev/null
@@ -1,57 +0,0 @@
-# dialects/sqlite/__init__.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-
-from . import aiosqlite # noqa
-from . import base # noqa
-from . import pysqlcipher # noqa
-from . import pysqlite # noqa
-from .base import BLOB
-from .base import BOOLEAN
-from .base import CHAR
-from .base import DATE
-from .base import DATETIME
-from .base import DECIMAL
-from .base import FLOAT
-from .base import INTEGER
-from .base import JSON
-from .base import NUMERIC
-from .base import REAL
-from .base import SMALLINT
-from .base import TEXT
-from .base import TIME
-from .base import TIMESTAMP
-from .base import VARCHAR
-from .dml import Insert
-from .dml import insert
-
-# default dialect
-base.dialect = dialect = pysqlite.dialect
-
-
-__all__ = (
- "BLOB",
- "BOOLEAN",
- "CHAR",
- "DATE",
- "DATETIME",
- "DECIMAL",
- "FLOAT",
- "INTEGER",
- "JSON",
- "NUMERIC",
- "SMALLINT",
- "TEXT",
- "TIME",
- "TIMESTAMP",
- "VARCHAR",
- "REAL",
- "Insert",
- "insert",
- "dialect",
-)
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/__init__.cpython-311.pyc
deleted file mode 100644
index e4a9b51..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/__init__.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/aiosqlite.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/aiosqlite.cpython-311.pyc
deleted file mode 100644
index 41466a4..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/aiosqlite.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/base.cpython-311.pyc
deleted file mode 100644
index e7f5c22..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/base.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/dml.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/dml.cpython-311.pyc
deleted file mode 100644
index eb0f448..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/dml.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/json.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/json.cpython-311.pyc
deleted file mode 100644
index ad4323c..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/json.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/provision.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/provision.cpython-311.pyc
deleted file mode 100644
index d139ba3..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/provision.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlcipher.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlcipher.cpython-311.pyc
deleted file mode 100644
index d26e7b3..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlcipher.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlite.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlite.cpython-311.pyc
deleted file mode 100644
index df08288..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/__pycache__/pysqlite.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py
deleted file mode 100644
index 6c91563..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py
+++ /dev/null
@@ -1,396 +0,0 @@
-# dialects/sqlite/aiosqlite.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-
-r"""
-
-.. dialect:: sqlite+aiosqlite
- :name: aiosqlite
- :dbapi: aiosqlite
- :connectstring: sqlite+aiosqlite:///file_path
- :url: https://pypi.org/project/aiosqlite/
-
-The aiosqlite dialect provides support for the SQLAlchemy asyncio interface
-running on top of pysqlite.
-
-aiosqlite is a wrapper around pysqlite that uses a background thread for
-each connection. It does not actually use non-blocking IO, as SQLite
-databases are not socket-based. However it does provide a working asyncio
-interface that's useful for testing and prototyping purposes.
-
-Using a special asyncio mediation layer, the aiosqlite dialect is usable
-as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
-extension package.
-
-This dialect should normally be used only with the
-:func:`_asyncio.create_async_engine` engine creation function::
-
- from sqlalchemy.ext.asyncio import create_async_engine
- engine = create_async_engine("sqlite+aiosqlite:///filename")
-
-The URL passes through all arguments to the ``pysqlite`` driver, so all
-connection arguments are the same as they are for that of :ref:`pysqlite`.
-
-.. _aiosqlite_udfs:
-
-User-Defined Functions
-----------------------
-
-aiosqlite extends pysqlite to support async, so we can create our own user-defined functions (UDFs)
-in Python and use them directly in SQLite queries as described here: :ref:`pysqlite_udfs`.
-
-.. _aiosqlite_serializable:
-
-Serializable isolation / Savepoints / Transactional DDL (asyncio version)
--------------------------------------------------------------------------
-
-Similarly to pysqlite, aiosqlite does not support SAVEPOINT feature.
-
-The solution is similar to :ref:`pysqlite_serializable`. This is achieved by the event listeners in async::
-
- from sqlalchemy import create_engine, event
- from sqlalchemy.ext.asyncio import create_async_engine
-
- engine = create_async_engine("sqlite+aiosqlite:///myfile.db")
-
- @event.listens_for(engine.sync_engine, "connect")
- def do_connect(dbapi_connection, connection_record):
- # disable aiosqlite's emitting of the BEGIN statement entirely.
- # also stops it from emitting COMMIT before any DDL.
- dbapi_connection.isolation_level = None
-
- @event.listens_for(engine.sync_engine, "begin")
- def do_begin(conn):
- # emit our own BEGIN
- conn.exec_driver_sql("BEGIN")
-
-.. warning:: When using the above recipe, it is advised to not use the
- :paramref:`.Connection.execution_options.isolation_level` setting on
- :class:`_engine.Connection` and :func:`_sa.create_engine`
- with the SQLite driver,
- as this function necessarily will also alter the ".isolation_level" setting.
-
-""" # noqa
-
-import asyncio
-from functools import partial
-
-from .base import SQLiteExecutionContext
-from .pysqlite import SQLiteDialect_pysqlite
-from ... import pool
-from ... import util
-from ...engine import AdaptedConnection
-from ...util.concurrency import await_fallback
-from ...util.concurrency import await_only
-
-
-class AsyncAdapt_aiosqlite_cursor:
- # TODO: base on connectors/asyncio.py
- # see #10415
-
- __slots__ = (
- "_adapt_connection",
- "_connection",
- "description",
- "await_",
- "_rows",
- "arraysize",
- "rowcount",
- "lastrowid",
- )
-
- server_side = False
-
- def __init__(self, adapt_connection):
- self._adapt_connection = adapt_connection
- self._connection = adapt_connection._connection
- self.await_ = adapt_connection.await_
- self.arraysize = 1
- self.rowcount = -1
- self.description = None
- self._rows = []
-
- def close(self):
- self._rows[:] = []
-
- def execute(self, operation, parameters=None):
- try:
- _cursor = self.await_(self._connection.cursor())
-
- if parameters is None:
- self.await_(_cursor.execute(operation))
- else:
- self.await_(_cursor.execute(operation, parameters))
-
- if _cursor.description:
- self.description = _cursor.description
- self.lastrowid = self.rowcount = -1
-
- if not self.server_side:
- self._rows = self.await_(_cursor.fetchall())
- else:
- self.description = None
- self.lastrowid = _cursor.lastrowid
- self.rowcount = _cursor.rowcount
-
- if not self.server_side:
- self.await_(_cursor.close())
- else:
- self._cursor = _cursor
- except Exception as error:
- self._adapt_connection._handle_exception(error)
-
- def executemany(self, operation, seq_of_parameters):
- try:
- _cursor = self.await_(self._connection.cursor())
- self.await_(_cursor.executemany(operation, seq_of_parameters))
- self.description = None
- self.lastrowid = _cursor.lastrowid
- self.rowcount = _cursor.rowcount
- self.await_(_cursor.close())
- except Exception as error:
- self._adapt_connection._handle_exception(error)
-
- def setinputsizes(self, *inputsizes):
- pass
-
- def __iter__(self):
- while self._rows:
- yield self._rows.pop(0)
-
- def fetchone(self):
- if self._rows:
- return self._rows.pop(0)
- else:
- return None
-
- def fetchmany(self, size=None):
- if size is None:
- size = self.arraysize
-
- retval = self._rows[0:size]
- self._rows[:] = self._rows[size:]
- return retval
-
- def fetchall(self):
- retval = self._rows[:]
- self._rows[:] = []
- return retval
-
-
-class AsyncAdapt_aiosqlite_ss_cursor(AsyncAdapt_aiosqlite_cursor):
- # TODO: base on connectors/asyncio.py
- # see #10415
- __slots__ = "_cursor"
-
- server_side = True
-
- def __init__(self, *arg, **kw):
- super().__init__(*arg, **kw)
- self._cursor = None
-
- def close(self):
- if self._cursor is not None:
- self.await_(self._cursor.close())
- self._cursor = None
-
- def fetchone(self):
- return self.await_(self._cursor.fetchone())
-
- def fetchmany(self, size=None):
- if size is None:
- size = self.arraysize
- return self.await_(self._cursor.fetchmany(size=size))
-
- def fetchall(self):
- return self.await_(self._cursor.fetchall())
-
-
-class AsyncAdapt_aiosqlite_connection(AdaptedConnection):
- await_ = staticmethod(await_only)
- __slots__ = ("dbapi",)
-
- def __init__(self, dbapi, connection):
- self.dbapi = dbapi
- self._connection = connection
-
- @property
- def isolation_level(self):
- return self._connection.isolation_level
-
- @isolation_level.setter
- def isolation_level(self, value):
- # aiosqlite's isolation_level setter works outside the Thread
- # that it's supposed to, necessitating setting check_same_thread=False.
- # for improved stability, we instead invent our own awaitable version
- # using aiosqlite's async queue directly.
-
- def set_iso(connection, value):
- connection.isolation_level = value
-
- function = partial(set_iso, self._connection._conn, value)
- future = asyncio.get_event_loop().create_future()
-
- self._connection._tx.put_nowait((future, function))
-
- try:
- return self.await_(future)
- except Exception as error:
- self._handle_exception(error)
-
- def create_function(self, *args, **kw):
- try:
- self.await_(self._connection.create_function(*args, **kw))
- except Exception as error:
- self._handle_exception(error)
-
- def cursor(self, server_side=False):
- if server_side:
- return AsyncAdapt_aiosqlite_ss_cursor(self)
- else:
- return AsyncAdapt_aiosqlite_cursor(self)
-
- def execute(self, *args, **kw):
- return self.await_(self._connection.execute(*args, **kw))
-
- def rollback(self):
- try:
- self.await_(self._connection.rollback())
- except Exception as error:
- self._handle_exception(error)
-
- def commit(self):
- try:
- self.await_(self._connection.commit())
- except Exception as error:
- self._handle_exception(error)
-
- def close(self):
- try:
- self.await_(self._connection.close())
- except ValueError:
- # this is undocumented for aiosqlite, that ValueError
- # was raised if .close() was called more than once, which is
- # both not customary for DBAPI and is also not a DBAPI.Error
- # exception. This is now fixed in aiosqlite via my PR
- # https://github.com/omnilib/aiosqlite/pull/238, so we can be
- # assured this will not become some other kind of exception,
- # since it doesn't raise anymore.
-
- pass
- except Exception as error:
- self._handle_exception(error)
-
- def _handle_exception(self, error):
- if (
- isinstance(error, ValueError)
- and error.args[0] == "no active connection"
- ):
- raise self.dbapi.sqlite.OperationalError(
- "no active connection"
- ) from error
- else:
- raise error
-
-
-class AsyncAdaptFallback_aiosqlite_connection(AsyncAdapt_aiosqlite_connection):
- __slots__ = ()
-
- await_ = staticmethod(await_fallback)
-
-
-class AsyncAdapt_aiosqlite_dbapi:
- def __init__(self, aiosqlite, sqlite):
- self.aiosqlite = aiosqlite
- self.sqlite = sqlite
- self.paramstyle = "qmark"
- self._init_dbapi_attributes()
-
- def _init_dbapi_attributes(self):
- for name in (
- "DatabaseError",
- "Error",
- "IntegrityError",
- "NotSupportedError",
- "OperationalError",
- "ProgrammingError",
- "sqlite_version",
- "sqlite_version_info",
- ):
- setattr(self, name, getattr(self.aiosqlite, name))
-
- for name in ("PARSE_COLNAMES", "PARSE_DECLTYPES"):
- setattr(self, name, getattr(self.sqlite, name))
-
- for name in ("Binary",):
- setattr(self, name, getattr(self.sqlite, name))
-
- def connect(self, *arg, **kw):
- async_fallback = kw.pop("async_fallback", False)
-
- creator_fn = kw.pop("async_creator_fn", None)
- if creator_fn:
- connection = creator_fn(*arg, **kw)
- else:
- connection = self.aiosqlite.connect(*arg, **kw)
- # it's a Thread. you'll thank us later
- connection.daemon = True
-
- if util.asbool(async_fallback):
- return AsyncAdaptFallback_aiosqlite_connection(
- self,
- await_fallback(connection),
- )
- else:
- return AsyncAdapt_aiosqlite_connection(
- self,
- await_only(connection),
- )
-
-
-class SQLiteExecutionContext_aiosqlite(SQLiteExecutionContext):
- def create_server_side_cursor(self):
- return self._dbapi_connection.cursor(server_side=True)
-
-
-class SQLiteDialect_aiosqlite(SQLiteDialect_pysqlite):
- driver = "aiosqlite"
- supports_statement_cache = True
-
- is_async = True
-
- supports_server_side_cursors = True
-
- execution_ctx_cls = SQLiteExecutionContext_aiosqlite
-
- @classmethod
- def import_dbapi(cls):
- return AsyncAdapt_aiosqlite_dbapi(
- __import__("aiosqlite"), __import__("sqlite3")
- )
-
- @classmethod
- def get_pool_class(cls, url):
- if cls._is_url_file_db(url):
- return pool.NullPool
- else:
- return pool.StaticPool
-
- def is_disconnect(self, e, connection, cursor):
- if isinstance(
- e, self.dbapi.OperationalError
- ) and "no active connection" in str(e):
- return True
-
- return super().is_disconnect(e, connection, cursor)
-
- def get_driver_connection(self, connection):
- return connection._connection
-
-
-dialect = SQLiteDialect_aiosqlite
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/base.py
deleted file mode 100644
index 6db8214..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/base.py
+++ /dev/null
@@ -1,2782 +0,0 @@
-# dialects/sqlite/base.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-
-r"""
-.. dialect:: sqlite
- :name: SQLite
- :full_support: 3.36.0
- :normal_support: 3.12+
- :best_effort: 3.7.16+
-
-.. _sqlite_datetime:
-
-Date and Time Types
--------------------
-
-SQLite does not have built-in DATE, TIME, or DATETIME types, and pysqlite does
-not provide out of the box functionality for translating values between Python
-`datetime` objects and a SQLite-supported format. SQLAlchemy's own
-:class:`~sqlalchemy.types.DateTime` and related types provide date formatting
-and parsing functionality when SQLite is used. The implementation classes are
-:class:`_sqlite.DATETIME`, :class:`_sqlite.DATE` and :class:`_sqlite.TIME`.
-These types represent dates and times as ISO formatted strings, which also
-nicely support ordering. There's no reliance on typical "libc" internals for
-these functions so historical dates are fully supported.
-
-Ensuring Text affinity
-^^^^^^^^^^^^^^^^^^^^^^
-
-The DDL rendered for these types is the standard ``DATE``, ``TIME``
-and ``DATETIME`` indicators. However, custom storage formats can also be
-applied to these types. When the
-storage format is detected as containing no alpha characters, the DDL for
-these types is rendered as ``DATE_CHAR``, ``TIME_CHAR``, and ``DATETIME_CHAR``,
-so that the column continues to have textual affinity.
-
-.. seealso::
-
- `Type Affinity <https://www.sqlite.org/datatype3.html#affinity>`_ -
- in the SQLite documentation
-
-.. _sqlite_autoincrement:
-
-SQLite Auto Incrementing Behavior
-----------------------------------
-
-Background on SQLite's autoincrement is at: https://sqlite.org/autoinc.html
-
-Key concepts:
-
-* SQLite has an implicit "auto increment" feature that takes place for any
- non-composite primary-key column that is specifically created using
- "INTEGER PRIMARY KEY" for the type + primary key.
-
-* SQLite also has an explicit "AUTOINCREMENT" keyword, that is **not**
- equivalent to the implicit autoincrement feature; this keyword is not
- recommended for general use. SQLAlchemy does not render this keyword
- unless a special SQLite-specific directive is used (see below). However,
- it still requires that the column's type is named "INTEGER".
-
-Using the AUTOINCREMENT Keyword
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-To specifically render the AUTOINCREMENT keyword on the primary key column
-when rendering DDL, add the flag ``sqlite_autoincrement=True`` to the Table
-construct::
-
- Table('sometable', metadata,
- Column('id', Integer, primary_key=True),
- sqlite_autoincrement=True)
-
-Allowing autoincrement behavior SQLAlchemy types other than Integer/INTEGER
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-SQLite's typing model is based on naming conventions. Among other things, this
-means that any type name which contains the substring ``"INT"`` will be
-determined to be of "integer affinity". A type named ``"BIGINT"``,
-``"SPECIAL_INT"`` or even ``"XYZINTQPR"``, will be considered by SQLite to be
-of "integer" affinity. However, **the SQLite autoincrement feature, whether
-implicitly or explicitly enabled, requires that the name of the column's type
-is exactly the string "INTEGER"**. Therefore, if an application uses a type
-like :class:`.BigInteger` for a primary key, on SQLite this type will need to
-be rendered as the name ``"INTEGER"`` when emitting the initial ``CREATE
-TABLE`` statement in order for the autoincrement behavior to be available.
-
-One approach to achieve this is to use :class:`.Integer` on SQLite
-only using :meth:`.TypeEngine.with_variant`::
-
- table = Table(
- "my_table", metadata,
- Column("id", BigInteger().with_variant(Integer, "sqlite"), primary_key=True)
- )
-
-Another is to use a subclass of :class:`.BigInteger` that overrides its DDL
-name to be ``INTEGER`` when compiled against SQLite::
-
- from sqlalchemy import BigInteger
- from sqlalchemy.ext.compiler import compiles
-
- class SLBigInteger(BigInteger):
- pass
-
- @compiles(SLBigInteger, 'sqlite')
- def bi_c(element, compiler, **kw):
- return "INTEGER"
-
- @compiles(SLBigInteger)
- def bi_c(element, compiler, **kw):
- return compiler.visit_BIGINT(element, **kw)
-
-
- table = Table(
- "my_table", metadata,
- Column("id", SLBigInteger(), primary_key=True)
- )
-
-.. seealso::
-
- :meth:`.TypeEngine.with_variant`
-
- :ref:`sqlalchemy.ext.compiler_toplevel`
-
- `Datatypes In SQLite Version 3 <https://sqlite.org/datatype3.html>`_
-
-.. _sqlite_concurrency:
-
-Database Locking Behavior / Concurrency
----------------------------------------
-
-SQLite is not designed for a high level of write concurrency. The database
-itself, being a file, is locked completely during write operations within
-transactions, meaning exactly one "connection" (in reality a file handle)
-has exclusive access to the database during this period - all other
-"connections" will be blocked during this time.
-
-The Python DBAPI specification also calls for a connection model that is
-always in a transaction; there is no ``connection.begin()`` method,
-only ``connection.commit()`` and ``connection.rollback()``, upon which a
-new transaction is to be begun immediately. This may seem to imply
-that the SQLite driver would in theory allow only a single filehandle on a
-particular database file at any time; however, there are several
-factors both within SQLite itself as well as within the pysqlite driver
-which loosen this restriction significantly.
-
-However, no matter what locking modes are used, SQLite will still always
-lock the database file once a transaction is started and DML (e.g. INSERT,
-UPDATE, DELETE) has at least been emitted, and this will block
-other transactions at least at the point that they also attempt to emit DML.
-By default, the length of time on this block is very short before it times out
-with an error.
-
-This behavior becomes more critical when used in conjunction with the
-SQLAlchemy ORM. SQLAlchemy's :class:`.Session` object by default runs
-within a transaction, and with its autoflush model, may emit DML preceding
-any SELECT statement. This may lead to a SQLite database that locks
-more quickly than is expected. The locking mode of SQLite and the pysqlite
-driver can be manipulated to some degree, however it should be noted that
-achieving a high degree of write-concurrency with SQLite is a losing battle.
-
-For more information on SQLite's lack of write concurrency by design, please
-see
-`Situations Where Another RDBMS May Work Better - High Concurrency
-<https://www.sqlite.org/whentouse.html>`_ near the bottom of the page.
-
-The following subsections introduce areas that are impacted by SQLite's
-file-based architecture and additionally will usually require workarounds to
-work when using the pysqlite driver.
-
-.. _sqlite_isolation_level:
-
-Transaction Isolation Level / Autocommit
-----------------------------------------
-
-SQLite supports "transaction isolation" in a non-standard way, along two
-axes. One is that of the
-`PRAGMA read_uncommitted <https://www.sqlite.org/pragma.html#pragma_read_uncommitted>`_
-instruction. This setting can essentially switch SQLite between its
-default mode of ``SERIALIZABLE`` isolation, and a "dirty read" isolation
-mode normally referred to as ``READ UNCOMMITTED``.
-
-SQLAlchemy ties into this PRAGMA statement using the
-:paramref:`_sa.create_engine.isolation_level` parameter of
-:func:`_sa.create_engine`.
-Valid values for this parameter when used with SQLite are ``"SERIALIZABLE"``
-and ``"READ UNCOMMITTED"`` corresponding to a value of 0 and 1, respectively.
-SQLite defaults to ``SERIALIZABLE``, however its behavior is impacted by
-the pysqlite driver's default behavior.
-
-When using the pysqlite driver, the ``"AUTOCOMMIT"`` isolation level is also
-available, which will alter the pysqlite connection using the ``.isolation_level``
-attribute on the DBAPI connection and set it to None for the duration
-of the setting.
-
-.. versionadded:: 1.3.16 added support for SQLite AUTOCOMMIT isolation level
- when using the pysqlite / sqlite3 SQLite driver.
-
-
-The other axis along which SQLite's transactional locking is impacted is
-via the nature of the ``BEGIN`` statement used. The three varieties
-are "deferred", "immediate", and "exclusive", as described at
-`BEGIN TRANSACTION <https://sqlite.org/lang_transaction.html>`_. A straight
-``BEGIN`` statement uses the "deferred" mode, where the database file is
-not locked until the first read or write operation, and read access remains
-open to other transactions until the first write operation. But again,
-it is critical to note that the pysqlite driver interferes with this behavior
-by *not even emitting BEGIN* until the first write operation.
-
-.. warning::
-
- SQLite's transactional scope is impacted by unresolved
- issues in the pysqlite driver, which defers BEGIN statements to a greater
- degree than is often feasible. See the section :ref:`pysqlite_serializable`
- or :ref:`aiosqlite_serializable` for techniques to work around this behavior.
-
-.. seealso::
-
- :ref:`dbapi_autocommit`
-
-INSERT/UPDATE/DELETE...RETURNING
----------------------------------
-
-The SQLite dialect supports SQLite 3.35's ``INSERT|UPDATE|DELETE..RETURNING``
-syntax. ``INSERT..RETURNING`` may be used
-automatically in some cases in order to fetch newly generated identifiers in
-place of the traditional approach of using ``cursor.lastrowid``, however
-``cursor.lastrowid`` is currently still preferred for simple single-statement
-cases for its better performance.
-
-To specify an explicit ``RETURNING`` clause, use the
-:meth:`._UpdateBase.returning` method on a per-statement basis::
-
- # INSERT..RETURNING
- result = connection.execute(
- table.insert().
- values(name='foo').
- returning(table.c.col1, table.c.col2)
- )
- print(result.all())
-
- # UPDATE..RETURNING
- result = connection.execute(
- table.update().
- where(table.c.name=='foo').
- values(name='bar').
- returning(table.c.col1, table.c.col2)
- )
- print(result.all())
-
- # DELETE..RETURNING
- result = connection.execute(
- table.delete().
- where(table.c.name=='foo').
- returning(table.c.col1, table.c.col2)
- )
- print(result.all())
-
-.. versionadded:: 2.0 Added support for SQLite RETURNING
-
-SAVEPOINT Support
-----------------------------
-
-SQLite supports SAVEPOINTs, which only function once a transaction is
-begun. SQLAlchemy's SAVEPOINT support is available using the
-:meth:`_engine.Connection.begin_nested` method at the Core level, and
-:meth:`.Session.begin_nested` at the ORM level. However, SAVEPOINTs
-won't work at all with pysqlite unless workarounds are taken.
-
-.. warning::
-
- SQLite's SAVEPOINT feature is impacted by unresolved
- issues in the pysqlite and aiosqlite drivers, which defer BEGIN statements
- to a greater degree than is often feasible. See the sections
- :ref:`pysqlite_serializable` and :ref:`aiosqlite_serializable`
- for techniques to work around this behavior.
-
-Transactional DDL
-----------------------------
-
-The SQLite database supports transactional :term:`DDL` as well.
-In this case, the pysqlite driver is not only failing to start transactions,
-it also is ending any existing transaction when DDL is detected, so again,
-workarounds are required.
-
-.. warning::
-
- SQLite's transactional DDL is impacted by unresolved issues
- in the pysqlite driver, which fails to emit BEGIN and additionally
- forces a COMMIT to cancel any transaction when DDL is encountered.
- See the section :ref:`pysqlite_serializable`
- for techniques to work around this behavior.
-
-.. _sqlite_foreign_keys:
-
-Foreign Key Support
--------------------
-
-SQLite supports FOREIGN KEY syntax when emitting CREATE statements for tables,
-however by default these constraints have no effect on the operation of the
-table.
-
-Constraint checking on SQLite has three prerequisites:
-
-* At least version 3.6.19 of SQLite must be in use
-* The SQLite library must be compiled *without* the SQLITE_OMIT_FOREIGN_KEY
- or SQLITE_OMIT_TRIGGER symbols enabled.
-* The ``PRAGMA foreign_keys = ON`` statement must be emitted on all
- connections before use -- including the initial call to
- :meth:`sqlalchemy.schema.MetaData.create_all`.
-
-SQLAlchemy allows for the ``PRAGMA`` statement to be emitted automatically for
-new connections through the usage of events::
-
- from sqlalchemy.engine import Engine
- from sqlalchemy import event
-
- @event.listens_for(Engine, "connect")
- def set_sqlite_pragma(dbapi_connection, connection_record):
- cursor = dbapi_connection.cursor()
- cursor.execute("PRAGMA foreign_keys=ON")
- cursor.close()
-
-.. warning::
-
- When SQLite foreign keys are enabled, it is **not possible**
- to emit CREATE or DROP statements for tables that contain
- mutually-dependent foreign key constraints;
- to emit the DDL for these tables requires that ALTER TABLE be used to
- create or drop these constraints separately, for which SQLite has
- no support.
-
-.. seealso::
-
- `SQLite Foreign Key Support <https://www.sqlite.org/foreignkeys.html>`_
- - on the SQLite web site.
-
- :ref:`event_toplevel` - SQLAlchemy event API.
-
- :ref:`use_alter` - more information on SQLAlchemy's facilities for handling
- mutually-dependent foreign key constraints.
-
-.. _sqlite_on_conflict_ddl:
-
-ON CONFLICT support for constraints
------------------------------------
-
-.. seealso:: This section describes the :term:`DDL` version of "ON CONFLICT" for
- SQLite, which occurs within a CREATE TABLE statement. For "ON CONFLICT" as
- applied to an INSERT statement, see :ref:`sqlite_on_conflict_insert`.
-
-SQLite supports a non-standard DDL clause known as ON CONFLICT which can be applied
-to primary key, unique, check, and not null constraints. In DDL, it is
-rendered either within the "CONSTRAINT" clause or within the column definition
-itself depending on the location of the target constraint. To render this
-clause within DDL, the extension parameter ``sqlite_on_conflict`` can be
-specified with a string conflict resolution algorithm within the
-:class:`.PrimaryKeyConstraint`, :class:`.UniqueConstraint`,
-:class:`.CheckConstraint` objects. Within the :class:`_schema.Column` object,
-there
-are individual parameters ``sqlite_on_conflict_not_null``,
-``sqlite_on_conflict_primary_key``, ``sqlite_on_conflict_unique`` which each
-correspond to the three types of relevant constraint types that can be
-indicated from a :class:`_schema.Column` object.
-
-.. seealso::
-
- `ON CONFLICT <https://www.sqlite.org/lang_conflict.html>`_ - in the SQLite
- documentation
-
-.. versionadded:: 1.3
-
-
-The ``sqlite_on_conflict`` parameters accept a string argument which is just
-the resolution name to be chosen, which on SQLite can be one of ROLLBACK,
-ABORT, FAIL, IGNORE, and REPLACE. For example, to add a UNIQUE constraint
-that specifies the IGNORE algorithm::
-
- some_table = Table(
- 'some_table', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', Integer),
- UniqueConstraint('id', 'data', sqlite_on_conflict='IGNORE')
- )
-
-The above renders CREATE TABLE DDL as::
-
- CREATE TABLE some_table (
- id INTEGER NOT NULL,
- data INTEGER,
- PRIMARY KEY (id),
- UNIQUE (id, data) ON CONFLICT IGNORE
- )
-
-
-When using the :paramref:`_schema.Column.unique`
-flag to add a UNIQUE constraint
-to a single column, the ``sqlite_on_conflict_unique`` parameter can
-be added to the :class:`_schema.Column` as well, which will be added to the
-UNIQUE constraint in the DDL::
-
- some_table = Table(
- 'some_table', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', Integer, unique=True,
- sqlite_on_conflict_unique='IGNORE')
- )
-
-rendering::
-
- CREATE TABLE some_table (
- id INTEGER NOT NULL,
- data INTEGER,
- PRIMARY KEY (id),
- UNIQUE (data) ON CONFLICT IGNORE
- )
-
-To apply the FAIL algorithm for a NOT NULL constraint,
-``sqlite_on_conflict_not_null`` is used::
-
- some_table = Table(
- 'some_table', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', Integer, nullable=False,
- sqlite_on_conflict_not_null='FAIL')
- )
-
-this renders the column inline ON CONFLICT phrase::
-
- CREATE TABLE some_table (
- id INTEGER NOT NULL,
- data INTEGER NOT NULL ON CONFLICT FAIL,
- PRIMARY KEY (id)
- )
-
-
-Similarly, for an inline primary key, use ``sqlite_on_conflict_primary_key``::
-
- some_table = Table(
- 'some_table', metadata,
- Column('id', Integer, primary_key=True,
- sqlite_on_conflict_primary_key='FAIL')
- )
-
-SQLAlchemy renders the PRIMARY KEY constraint separately, so the conflict
-resolution algorithm is applied to the constraint itself::
-
- CREATE TABLE some_table (
- id INTEGER NOT NULL,
- PRIMARY KEY (id) ON CONFLICT FAIL
- )
-
-.. _sqlite_on_conflict_insert:
-
-INSERT...ON CONFLICT (Upsert)
------------------------------------
-
-.. seealso:: This section describes the :term:`DML` version of "ON CONFLICT" for
- SQLite, which occurs within an INSERT statement. For "ON CONFLICT" as
- applied to a CREATE TABLE statement, see :ref:`sqlite_on_conflict_ddl`.
-
-From version 3.24.0 onwards, SQLite supports "upserts" (update or insert)
-of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT``
-statement. A candidate row will only be inserted if that row does not violate
-any unique or primary key constraints. In the case of a unique constraint violation, a
-secondary action can occur which can be either "DO UPDATE", indicating that
-the data in the target row should be updated, or "DO NOTHING", which indicates
-to silently skip this row.
-
-Conflicts are determined using columns that are part of existing unique
-constraints and indexes. These constraints are identified by stating the
-columns and conditions that comprise the indexes.
-
-SQLAlchemy provides ``ON CONFLICT`` support via the SQLite-specific
-:func:`_sqlite.insert()` function, which provides
-the generative methods :meth:`_sqlite.Insert.on_conflict_do_update`
-and :meth:`_sqlite.Insert.on_conflict_do_nothing`:
-
-.. sourcecode:: pycon+sql
-
- >>> from sqlalchemy.dialects.sqlite import insert
-
- >>> insert_stmt = insert(my_table).values(
- ... id='some_existing_id',
- ... data='inserted value')
-
- >>> do_update_stmt = insert_stmt.on_conflict_do_update(
- ... index_elements=['id'],
- ... set_=dict(data='updated value')
- ... )
-
- >>> print(do_update_stmt)
- {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
- ON CONFLICT (id) DO UPDATE SET data = ?{stop}
-
- >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
- ... index_elements=['id']
- ... )
-
- >>> print(do_nothing_stmt)
- {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
- ON CONFLICT (id) DO NOTHING
-
-.. versionadded:: 1.4
-
-.. seealso::
-
- `Upsert
- <https://sqlite.org/lang_UPSERT.html>`_
- - in the SQLite documentation.
-
-
-Specifying the Target
-^^^^^^^^^^^^^^^^^^^^^
-
-Both methods supply the "target" of the conflict using column inference:
-
-* The :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements` argument
- specifies a sequence containing string column names, :class:`_schema.Column`
- objects, and/or SQL expression elements, which would identify a unique index
- or unique constraint.
-
-* When using :paramref:`_sqlite.Insert.on_conflict_do_update.index_elements`
- to infer an index, a partial index can be inferred by also specifying the
- :paramref:`_sqlite.Insert.on_conflict_do_update.index_where` parameter:
-
- .. sourcecode:: pycon+sql
-
- >>> stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
-
- >>> do_update_stmt = stmt.on_conflict_do_update(
- ... index_elements=[my_table.c.user_email],
- ... index_where=my_table.c.user_email.like('%@gmail.com'),
- ... set_=dict(data=stmt.excluded.data)
- ... )
-
- >>> print(do_update_stmt)
- {printsql}INSERT INTO my_table (data, user_email) VALUES (?, ?)
- ON CONFLICT (user_email)
- WHERE user_email LIKE '%@gmail.com'
- DO UPDATE SET data = excluded.data
-
-The SET Clause
-^^^^^^^^^^^^^^^
-
-``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
-existing row, using any combination of new values as well as values
-from the proposed insertion. These values are specified using the
-:paramref:`_sqlite.Insert.on_conflict_do_update.set_` parameter. This
-parameter accepts a dictionary which consists of direct values
-for UPDATE:
-
-.. sourcecode:: pycon+sql
-
- >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
-
- >>> do_update_stmt = stmt.on_conflict_do_update(
- ... index_elements=['id'],
- ... set_=dict(data='updated value')
- ... )
-
- >>> print(do_update_stmt)
- {printsql}INSERT INTO my_table (id, data) VALUES (?, ?)
- ON CONFLICT (id) DO UPDATE SET data = ?
-
-.. warning::
-
- The :meth:`_sqlite.Insert.on_conflict_do_update` method does **not** take
- into account Python-side default UPDATE values or generation functions,
- e.g. those specified using :paramref:`_schema.Column.onupdate`. These
- values will not be exercised for an ON CONFLICT style of UPDATE, unless
- they are manually specified in the
- :paramref:`_sqlite.Insert.on_conflict_do_update.set_` dictionary.
-
-Updating using the Excluded INSERT Values
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-In order to refer to the proposed insertion row, the special alias
-:attr:`~.sqlite.Insert.excluded` is available as an attribute on
-the :class:`_sqlite.Insert` object; this object creates an "excluded." prefix
-on a column, that informs the DO UPDATE to update the row with the value that
-would have been inserted had the constraint not failed:
-
-.. sourcecode:: pycon+sql
-
- >>> stmt = insert(my_table).values(
- ... id='some_id',
- ... data='inserted value',
- ... author='jlh'
- ... )
-
- >>> do_update_stmt = stmt.on_conflict_do_update(
- ... index_elements=['id'],
- ... set_=dict(data='updated value', author=stmt.excluded.author)
- ... )
-
- >>> print(do_update_stmt)
- {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
- ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
-
-Additional WHERE Criteria
-^^^^^^^^^^^^^^^^^^^^^^^^^
-
-The :meth:`_sqlite.Insert.on_conflict_do_update` method also accepts
-a WHERE clause using the :paramref:`_sqlite.Insert.on_conflict_do_update.where`
-parameter, which will limit those rows which receive an UPDATE:
-
-.. sourcecode:: pycon+sql
-
- >>> stmt = insert(my_table).values(
- ... id='some_id',
- ... data='inserted value',
- ... author='jlh'
- ... )
-
- >>> on_update_stmt = stmt.on_conflict_do_update(
- ... index_elements=['id'],
- ... set_=dict(data='updated value', author=stmt.excluded.author),
- ... where=(my_table.c.status == 2)
- ... )
- >>> print(on_update_stmt)
- {printsql}INSERT INTO my_table (id, data, author) VALUES (?, ?, ?)
- ON CONFLICT (id) DO UPDATE SET data = ?, author = excluded.author
- WHERE my_table.status = ?
-
-
-Skipping Rows with DO NOTHING
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-``ON CONFLICT`` may be used to skip inserting a row entirely
-if any conflict with a unique constraint occurs; below this is illustrated
-using the :meth:`_sqlite.Insert.on_conflict_do_nothing` method:
-
-.. sourcecode:: pycon+sql
-
- >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
- >>> stmt = stmt.on_conflict_do_nothing(index_elements=['id'])
- >>> print(stmt)
- {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT (id) DO NOTHING
-
-
-If ``DO NOTHING`` is used without specifying any columns or constraint,
-it has the effect of skipping the INSERT for any unique violation which
-occurs:
-
-.. sourcecode:: pycon+sql
-
- >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
- >>> stmt = stmt.on_conflict_do_nothing()
- >>> print(stmt)
- {printsql}INSERT INTO my_table (id, data) VALUES (?, ?) ON CONFLICT DO NOTHING
-
-.. _sqlite_type_reflection:
-
-Type Reflection
----------------
-
-SQLite types are unlike those of most other database backends, in that
-the string name of the type usually does not correspond to a "type" in a
-one-to-one fashion. Instead, SQLite links per-column typing behavior
-to one of five so-called "type affinities" based on a string matching
-pattern for the type.
-
-SQLAlchemy's reflection process, when inspecting types, uses a simple
-lookup table to link the keywords returned to provided SQLAlchemy types.
-This lookup table is present within the SQLite dialect as it is for all
-other dialects. However, the SQLite dialect has a different "fallback"
-routine for when a particular type name is not located in the lookup map;
-it instead implements the SQLite "type affinity" scheme located at
-https://www.sqlite.org/datatype3.html section 2.1.
-
-The provided typemap will make direct associations from an exact string
-name match for the following types:
-
-:class:`_types.BIGINT`, :class:`_types.BLOB`,
-:class:`_types.BOOLEAN`, :class:`_types.BOOLEAN`,
-:class:`_types.CHAR`, :class:`_types.DATE`,
-:class:`_types.DATETIME`, :class:`_types.FLOAT`,
-:class:`_types.DECIMAL`, :class:`_types.FLOAT`,
-:class:`_types.INTEGER`, :class:`_types.INTEGER`,
-:class:`_types.NUMERIC`, :class:`_types.REAL`,
-:class:`_types.SMALLINT`, :class:`_types.TEXT`,
-:class:`_types.TIME`, :class:`_types.TIMESTAMP`,
-:class:`_types.VARCHAR`, :class:`_types.NVARCHAR`,
-:class:`_types.NCHAR`
-
-When a type name does not match one of the above types, the "type affinity"
-lookup is used instead:
-
-* :class:`_types.INTEGER` is returned if the type name includes the
- string ``INT``
-* :class:`_types.TEXT` is returned if the type name includes the
- string ``CHAR``, ``CLOB`` or ``TEXT``
-* :class:`_types.NullType` is returned if the type name includes the
- string ``BLOB``
-* :class:`_types.REAL` is returned if the type name includes the string
- ``REAL``, ``FLOA`` or ``DOUB``.
-* Otherwise, the :class:`_types.NUMERIC` type is used.
-
-.. _sqlite_partial_index:
-
-Partial Indexes
----------------
-
-A partial index, e.g. one which uses a WHERE clause, can be specified
-with the DDL system using the argument ``sqlite_where``::
-
- tbl = Table('testtbl', m, Column('data', Integer))
- idx = Index('test_idx1', tbl.c.data,
- sqlite_where=and_(tbl.c.data > 5, tbl.c.data < 10))
-
-The index will be rendered at create time as::
-
- CREATE INDEX test_idx1 ON testtbl (data)
- WHERE data > 5 AND data < 10
-
-.. _sqlite_dotted_column_names:
-
-Dotted Column Names
--------------------
-
-Using table or column names that explicitly have periods in them is
-**not recommended**. While this is generally a bad idea for relational
-databases in general, as the dot is a syntactically significant character,
-the SQLite driver up until version **3.10.0** of SQLite has a bug which
-requires that SQLAlchemy filter out these dots in result sets.
-
-The bug, entirely outside of SQLAlchemy, can be illustrated thusly::
-
- import sqlite3
-
- assert sqlite3.sqlite_version_info < (3, 10, 0), "bug is fixed in this version"
-
- conn = sqlite3.connect(":memory:")
- cursor = conn.cursor()
-
- cursor.execute("create table x (a integer, b integer)")
- cursor.execute("insert into x (a, b) values (1, 1)")
- cursor.execute("insert into x (a, b) values (2, 2)")
-
- cursor.execute("select x.a, x.b from x")
- assert [c[0] for c in cursor.description] == ['a', 'b']
-
- cursor.execute('''
- select x.a, x.b from x where a=1
- union
- select x.a, x.b from x where a=2
- ''')
- assert [c[0] for c in cursor.description] == ['a', 'b'], \
- [c[0] for c in cursor.description]
-
-The second assertion fails::
-
- Traceback (most recent call last):
- File "test.py", line 19, in <module>
- [c[0] for c in cursor.description]
- AssertionError: ['x.a', 'x.b']
-
-Where above, the driver incorrectly reports the names of the columns
-including the name of the table, which is entirely inconsistent vs.
-when the UNION is not present.
-
-SQLAlchemy relies upon column names being predictable in how they match
-to the original statement, so the SQLAlchemy dialect has no choice but
-to filter these out::
-
-
- from sqlalchemy import create_engine
-
- eng = create_engine("sqlite://")
- conn = eng.connect()
-
- conn.exec_driver_sql("create table x (a integer, b integer)")
- conn.exec_driver_sql("insert into x (a, b) values (1, 1)")
- conn.exec_driver_sql("insert into x (a, b) values (2, 2)")
-
- result = conn.exec_driver_sql("select x.a, x.b from x")
- assert result.keys() == ["a", "b"]
-
- result = conn.exec_driver_sql('''
- select x.a, x.b from x where a=1
- union
- select x.a, x.b from x where a=2
- ''')
- assert result.keys() == ["a", "b"]
-
-Note that above, even though SQLAlchemy filters out the dots, *both
-names are still addressable*::
-
- >>> row = result.first()
- >>> row["a"]
- 1
- >>> row["x.a"]
- 1
- >>> row["b"]
- 1
- >>> row["x.b"]
- 1
-
-Therefore, the workaround applied by SQLAlchemy only impacts
-:meth:`_engine.CursorResult.keys` and :meth:`.Row.keys()` in the public API. In
-the very specific case where an application is forced to use column names that
-contain dots, and the functionality of :meth:`_engine.CursorResult.keys` and
-:meth:`.Row.keys()` is required to return these dotted names unmodified,
-the ``sqlite_raw_colnames`` execution option may be provided, either on a
-per-:class:`_engine.Connection` basis::
-
- result = conn.execution_options(sqlite_raw_colnames=True).exec_driver_sql('''
- select x.a, x.b from x where a=1
- union
- select x.a, x.b from x where a=2
- ''')
- assert result.keys() == ["x.a", "x.b"]
-
-or on a per-:class:`_engine.Engine` basis::
-
- engine = create_engine("sqlite://", execution_options={"sqlite_raw_colnames": True})
-
-When using the per-:class:`_engine.Engine` execution option, note that
-**Core and ORM queries that use UNION may not function properly**.
-
-SQLite-specific table options
------------------------------
-
-One option for CREATE TABLE is supported directly by the SQLite
-dialect in conjunction with the :class:`_schema.Table` construct:
-
-* ``WITHOUT ROWID``::
-
- Table("some_table", metadata, ..., sqlite_with_rowid=False)
-
-.. seealso::
-
- `SQLite CREATE TABLE options
- <https://www.sqlite.org/lang_createtable.html>`_
-
-
-.. _sqlite_include_internal:
-
-Reflecting internal schema tables
-----------------------------------
-
-Reflection methods that return lists of tables will omit so-called
-"SQLite internal schema object" names, which are considered by SQLite
-as any object name that is prefixed with ``sqlite_``. An example of
-such an object is the ``sqlite_sequence`` table that's generated when
-the ``AUTOINCREMENT`` column parameter is used. In order to return
-these objects, the parameter ``sqlite_include_internal=True`` may be
-passed to methods such as :meth:`_schema.MetaData.reflect` or
-:meth:`.Inspector.get_table_names`.
-
-.. versionadded:: 2.0 Added the ``sqlite_include_internal=True`` parameter.
- Previously, these tables were not ignored by SQLAlchemy reflection
- methods.
-
-.. note::
-
- The ``sqlite_include_internal`` parameter does not refer to the
- "system" tables that are present in schemas such as ``sqlite_master``.
-
-.. seealso::
-
- `SQLite Internal Schema Objects <https://www.sqlite.org/fileformat2.html#intschema>`_ - in the SQLite
- documentation.
-
-""" # noqa
-from __future__ import annotations
-
-import datetime
-import numbers
-import re
-from typing import Optional
-
-from .json import JSON
-from .json import JSONIndexType
-from .json import JSONPathType
-from ... import exc
-from ... import schema as sa_schema
-from ... import sql
-from ... import text
-from ... import types as sqltypes
-from ... import util
-from ...engine import default
-from ...engine import processors
-from ...engine import reflection
-from ...engine.reflection import ReflectionDefaults
-from ...sql import coercions
-from ...sql import ColumnElement
-from ...sql import compiler
-from ...sql import elements
-from ...sql import roles
-from ...sql import schema
-from ...types import BLOB # noqa
-from ...types import BOOLEAN # noqa
-from ...types import CHAR # noqa
-from ...types import DECIMAL # noqa
-from ...types import FLOAT # noqa
-from ...types import INTEGER # noqa
-from ...types import NUMERIC # noqa
-from ...types import REAL # noqa
-from ...types import SMALLINT # noqa
-from ...types import TEXT # noqa
-from ...types import TIMESTAMP # noqa
-from ...types import VARCHAR # noqa
-
-
-class _SQliteJson(JSON):
- def result_processor(self, dialect, coltype):
- default_processor = super().result_processor(dialect, coltype)
-
- def process(value):
- try:
- return default_processor(value)
- except TypeError:
- if isinstance(value, numbers.Number):
- return value
- else:
- raise
-
- return process
-
-
-class _DateTimeMixin:
- _reg = None
- _storage_format = None
-
- def __init__(self, storage_format=None, regexp=None, **kw):
- super().__init__(**kw)
- if regexp is not None:
- self._reg = re.compile(regexp)
- if storage_format is not None:
- self._storage_format = storage_format
-
- @property
- def format_is_text_affinity(self):
- """return True if the storage format will automatically imply
- a TEXT affinity.
-
- If the storage format contains no non-numeric characters,
- it will imply a NUMERIC storage format on SQLite; in this case,
- the type will generate its DDL as DATE_CHAR, DATETIME_CHAR,
- TIME_CHAR.
-
- """
- spec = self._storage_format % {
- "year": 0,
- "month": 0,
- "day": 0,
- "hour": 0,
- "minute": 0,
- "second": 0,
- "microsecond": 0,
- }
- return bool(re.search(r"[^0-9]", spec))
-
- def adapt(self, cls, **kw):
- if issubclass(cls, _DateTimeMixin):
- if self._storage_format:
- kw["storage_format"] = self._storage_format
- if self._reg:
- kw["regexp"] = self._reg
- return super().adapt(cls, **kw)
-
- def literal_processor(self, dialect):
- bp = self.bind_processor(dialect)
-
- def process(value):
- return "'%s'" % bp(value)
-
- return process
-
-
-class DATETIME(_DateTimeMixin, sqltypes.DateTime):
- r"""Represent a Python datetime object in SQLite using a string.
-
- The default string storage format is::
-
- "%(year)04d-%(month)02d-%(day)02d %(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
-
- e.g.::
-
- 2021-03-15 12:05:57.105542
-
- The incoming storage format is by default parsed using the
- Python ``datetime.fromisoformat()`` function.
-
- .. versionchanged:: 2.0 ``datetime.fromisoformat()`` is used for default
- datetime string parsing.
-
- The storage format can be customized to some degree using the
- ``storage_format`` and ``regexp`` parameters, such as::
-
- import re
- from sqlalchemy.dialects.sqlite import DATETIME
-
- dt = DATETIME(storage_format="%(year)04d/%(month)02d/%(day)02d "
- "%(hour)02d:%(minute)02d:%(second)02d",
- regexp=r"(\d+)/(\d+)/(\d+) (\d+)-(\d+)-(\d+)"
- )
-
- :param storage_format: format string which will be applied to the dict
- with keys year, month, day, hour, minute, second, and microsecond.
-
- :param regexp: regular expression which will be applied to incoming result
- rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming
- strings. If the regexp contains named groups, the resulting match dict is
- applied to the Python datetime() constructor as keyword arguments.
- Otherwise, if positional groups are used, the datetime() constructor
- is called with positional arguments via
- ``*map(int, match_obj.groups(0))``.
-
- """ # noqa
-
- _storage_format = (
- "%(year)04d-%(month)02d-%(day)02d "
- "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
- )
-
- def __init__(self, *args, **kwargs):
- truncate_microseconds = kwargs.pop("truncate_microseconds", False)
- super().__init__(*args, **kwargs)
- if truncate_microseconds:
- assert "storage_format" not in kwargs, (
- "You can specify only "
- "one of truncate_microseconds or storage_format."
- )
- assert "regexp" not in kwargs, (
- "You can specify only one of "
- "truncate_microseconds or regexp."
- )
- self._storage_format = (
- "%(year)04d-%(month)02d-%(day)02d "
- "%(hour)02d:%(minute)02d:%(second)02d"
- )
-
- def bind_processor(self, dialect):
- datetime_datetime = datetime.datetime
- datetime_date = datetime.date
- format_ = self._storage_format
-
- def process(value):
- if value is None:
- return None
- elif isinstance(value, datetime_datetime):
- return format_ % {
- "year": value.year,
- "month": value.month,
- "day": value.day,
- "hour": value.hour,
- "minute": value.minute,
- "second": value.second,
- "microsecond": value.microsecond,
- }
- elif isinstance(value, datetime_date):
- return format_ % {
- "year": value.year,
- "month": value.month,
- "day": value.day,
- "hour": 0,
- "minute": 0,
- "second": 0,
- "microsecond": 0,
- }
- else:
- raise TypeError(
- "SQLite DateTime type only accepts Python "
- "datetime and date objects as input."
- )
-
- return process
-
- def result_processor(self, dialect, coltype):
- if self._reg:
- return processors.str_to_datetime_processor_factory(
- self._reg, datetime.datetime
- )
- else:
- return processors.str_to_datetime
-
-
-class DATE(_DateTimeMixin, sqltypes.Date):
- r"""Represent a Python date object in SQLite using a string.
-
- The default string storage format is::
-
- "%(year)04d-%(month)02d-%(day)02d"
-
- e.g.::
-
- 2011-03-15
-
- The incoming storage format is by default parsed using the
- Python ``date.fromisoformat()`` function.
-
- .. versionchanged:: 2.0 ``date.fromisoformat()`` is used for default
- date string parsing.
-
-
- The storage format can be customized to some degree using the
- ``storage_format`` and ``regexp`` parameters, such as::
-
- import re
- from sqlalchemy.dialects.sqlite import DATE
-
- d = DATE(
- storage_format="%(month)02d/%(day)02d/%(year)04d",
- regexp=re.compile("(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)")
- )
-
- :param storage_format: format string which will be applied to the
- dict with keys year, month, and day.
-
- :param regexp: regular expression which will be applied to
- incoming result rows, replacing the use of ``date.fromisoformat()`` to
- parse incoming strings. If the regexp contains named groups, the resulting
- match dict is applied to the Python date() constructor as keyword
- arguments. Otherwise, if positional groups are used, the date()
- constructor is called with positional arguments via
- ``*map(int, match_obj.groups(0))``.
-
- """
-
- _storage_format = "%(year)04d-%(month)02d-%(day)02d"
-
- def bind_processor(self, dialect):
- datetime_date = datetime.date
- format_ = self._storage_format
-
- def process(value):
- if value is None:
- return None
- elif isinstance(value, datetime_date):
- return format_ % {
- "year": value.year,
- "month": value.month,
- "day": value.day,
- }
- else:
- raise TypeError(
- "SQLite Date type only accepts Python "
- "date objects as input."
- )
-
- return process
-
- def result_processor(self, dialect, coltype):
- if self._reg:
- return processors.str_to_datetime_processor_factory(
- self._reg, datetime.date
- )
- else:
- return processors.str_to_date
-
-
-class TIME(_DateTimeMixin, sqltypes.Time):
- r"""Represent a Python time object in SQLite using a string.
-
- The default string storage format is::
-
- "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
-
- e.g.::
-
- 12:05:57.10558
-
- The incoming storage format is by default parsed using the
- Python ``time.fromisoformat()`` function.
-
- .. versionchanged:: 2.0 ``time.fromisoformat()`` is used for default
- time string parsing.
-
- The storage format can be customized to some degree using the
- ``storage_format`` and ``regexp`` parameters, such as::
-
- import re
- from sqlalchemy.dialects.sqlite import TIME
-
- t = TIME(storage_format="%(hour)02d-%(minute)02d-"
- "%(second)02d-%(microsecond)06d",
- regexp=re.compile("(\d+)-(\d+)-(\d+)-(?:-(\d+))?")
- )
-
- :param storage_format: format string which will be applied to the dict
- with keys hour, minute, second, and microsecond.
-
- :param regexp: regular expression which will be applied to incoming result
- rows, replacing the use of ``datetime.fromisoformat()`` to parse incoming
- strings. If the regexp contains named groups, the resulting match dict is
- applied to the Python time() constructor as keyword arguments. Otherwise,
- if positional groups are used, the time() constructor is called with
- positional arguments via ``*map(int, match_obj.groups(0))``.
-
- """
-
- _storage_format = "%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
-
- def __init__(self, *args, **kwargs):
- truncate_microseconds = kwargs.pop("truncate_microseconds", False)
- super().__init__(*args, **kwargs)
- if truncate_microseconds:
- assert "storage_format" not in kwargs, (
- "You can specify only "
- "one of truncate_microseconds or storage_format."
- )
- assert "regexp" not in kwargs, (
- "You can specify only one of "
- "truncate_microseconds or regexp."
- )
- self._storage_format = "%(hour)02d:%(minute)02d:%(second)02d"
-
- def bind_processor(self, dialect):
- datetime_time = datetime.time
- format_ = self._storage_format
-
- def process(value):
- if value is None:
- return None
- elif isinstance(value, datetime_time):
- return format_ % {
- "hour": value.hour,
- "minute": value.minute,
- "second": value.second,
- "microsecond": value.microsecond,
- }
- else:
- raise TypeError(
- "SQLite Time type only accepts Python "
- "time objects as input."
- )
-
- return process
-
- def result_processor(self, dialect, coltype):
- if self._reg:
- return processors.str_to_datetime_processor_factory(
- self._reg, datetime.time
- )
- else:
- return processors.str_to_time
-
-
-colspecs = {
- sqltypes.Date: DATE,
- sqltypes.DateTime: DATETIME,
- sqltypes.JSON: _SQliteJson,
- sqltypes.JSON.JSONIndexType: JSONIndexType,
- sqltypes.JSON.JSONPathType: JSONPathType,
- sqltypes.Time: TIME,
-}
-
-ischema_names = {
- "BIGINT": sqltypes.BIGINT,
- "BLOB": sqltypes.BLOB,
- "BOOL": sqltypes.BOOLEAN,
- "BOOLEAN": sqltypes.BOOLEAN,
- "CHAR": sqltypes.CHAR,
- "DATE": sqltypes.DATE,
- "DATE_CHAR": sqltypes.DATE,
- "DATETIME": sqltypes.DATETIME,
- "DATETIME_CHAR": sqltypes.DATETIME,
- "DOUBLE": sqltypes.DOUBLE,
- "DECIMAL": sqltypes.DECIMAL,
- "FLOAT": sqltypes.FLOAT,
- "INT": sqltypes.INTEGER,
- "INTEGER": sqltypes.INTEGER,
- "JSON": JSON,
- "NUMERIC": sqltypes.NUMERIC,
- "REAL": sqltypes.REAL,
- "SMALLINT": sqltypes.SMALLINT,
- "TEXT": sqltypes.TEXT,
- "TIME": sqltypes.TIME,
- "TIME_CHAR": sqltypes.TIME,
- "TIMESTAMP": sqltypes.TIMESTAMP,
- "VARCHAR": sqltypes.VARCHAR,
- "NVARCHAR": sqltypes.NVARCHAR,
- "NCHAR": sqltypes.NCHAR,
-}
-
-
-class SQLiteCompiler(compiler.SQLCompiler):
- extract_map = util.update_copy(
- compiler.SQLCompiler.extract_map,
- {
- "month": "%m",
- "day": "%d",
- "year": "%Y",
- "second": "%S",
- "hour": "%H",
- "doy": "%j",
- "minute": "%M",
- "epoch": "%s",
- "dow": "%w",
- "week": "%W",
- },
- )
-
- def visit_truediv_binary(self, binary, operator, **kw):
- return (
- self.process(binary.left, **kw)
- + " / "
- + "(%s + 0.0)" % self.process(binary.right, **kw)
- )
-
- def visit_now_func(self, fn, **kw):
- return "CURRENT_TIMESTAMP"
-
- def visit_localtimestamp_func(self, func, **kw):
- return 'DATETIME(CURRENT_TIMESTAMP, "localtime")'
-
- def visit_true(self, expr, **kw):
- return "1"
-
- def visit_false(self, expr, **kw):
- return "0"
-
- def visit_char_length_func(self, fn, **kw):
- return "length%s" % self.function_argspec(fn)
-
- def visit_aggregate_strings_func(self, fn, **kw):
- return "group_concat%s" % self.function_argspec(fn)
-
- def visit_cast(self, cast, **kwargs):
- if self.dialect.supports_cast:
- return super().visit_cast(cast, **kwargs)
- else:
- return self.process(cast.clause, **kwargs)
-
- def visit_extract(self, extract, **kw):
- try:
- return "CAST(STRFTIME('%s', %s) AS INTEGER)" % (
- self.extract_map[extract.field],
- self.process(extract.expr, **kw),
- )
- except KeyError as err:
- raise exc.CompileError(
- "%s is not a valid extract argument." % extract.field
- ) from err
-
- def returning_clause(
- self,
- stmt,
- returning_cols,
- *,
- populate_result_map,
- **kw,
- ):
- kw["include_table"] = False
- return super().returning_clause(
- stmt, returning_cols, populate_result_map=populate_result_map, **kw
- )
-
- def limit_clause(self, select, **kw):
- text = ""
- if select._limit_clause is not None:
- text += "\n LIMIT " + self.process(select._limit_clause, **kw)
- if select._offset_clause is not None:
- if select._limit_clause is None:
- text += "\n LIMIT " + self.process(sql.literal(-1))
- text += " OFFSET " + self.process(select._offset_clause, **kw)
- else:
- text += " OFFSET " + self.process(sql.literal(0), **kw)
- return text
-
- def for_update_clause(self, select, **kw):
- # sqlite has no "FOR UPDATE" AFAICT
- return ""
-
- def update_from_clause(
- self, update_stmt, from_table, extra_froms, from_hints, **kw
- ):
- kw["asfrom"] = True
- return "FROM " + ", ".join(
- t._compiler_dispatch(self, fromhints=from_hints, **kw)
- for t in extra_froms
- )
-
- def visit_is_distinct_from_binary(self, binary, operator, **kw):
- return "%s IS NOT %s" % (
- self.process(binary.left),
- self.process(binary.right),
- )
-
- def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
- return "%s IS %s" % (
- self.process(binary.left),
- self.process(binary.right),
- )
-
- def visit_json_getitem_op_binary(self, binary, operator, **kw):
- if binary.type._type_affinity is sqltypes.JSON:
- expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
- else:
- expr = "JSON_EXTRACT(%s, %s)"
-
- return expr % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- )
-
- def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
- if binary.type._type_affinity is sqltypes.JSON:
- expr = "JSON_QUOTE(JSON_EXTRACT(%s, %s))"
- else:
- expr = "JSON_EXTRACT(%s, %s)"
-
- return expr % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- )
-
- def visit_empty_set_op_expr(self, type_, expand_op, **kw):
- # slightly old SQLite versions don't seem to be able to handle
- # the empty set impl
- return self.visit_empty_set_expr(type_)
-
- def visit_empty_set_expr(self, element_types, **kw):
- return "SELECT %s FROM (SELECT %s) WHERE 1!=1" % (
- ", ".join("1" for type_ in element_types or [INTEGER()]),
- ", ".join("1" for type_ in element_types or [INTEGER()]),
- )
-
- def visit_regexp_match_op_binary(self, binary, operator, **kw):
- return self._generate_generic_binary(binary, " REGEXP ", **kw)
-
- def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
- return self._generate_generic_binary(binary, " NOT REGEXP ", **kw)
-
- def _on_conflict_target(self, clause, **kw):
- if clause.constraint_target is not None:
- target_text = "(%s)" % clause.constraint_target
- elif clause.inferred_target_elements is not None:
- target_text = "(%s)" % ", ".join(
- (
- self.preparer.quote(c)
- if isinstance(c, str)
- else self.process(c, include_table=False, use_schema=False)
- )
- for c in clause.inferred_target_elements
- )
- if clause.inferred_target_whereclause is not None:
- target_text += " WHERE %s" % self.process(
- clause.inferred_target_whereclause,
- include_table=False,
- use_schema=False,
- literal_binds=True,
- )
-
- else:
- target_text = ""
-
- return target_text
-
- def visit_on_conflict_do_nothing(self, on_conflict, **kw):
- target_text = self._on_conflict_target(on_conflict, **kw)
-
- if target_text:
- return "ON CONFLICT %s DO NOTHING" % target_text
- else:
- return "ON CONFLICT DO NOTHING"
-
- def visit_on_conflict_do_update(self, on_conflict, **kw):
- clause = on_conflict
-
- target_text = self._on_conflict_target(on_conflict, **kw)
-
- action_set_ops = []
-
- set_parameters = dict(clause.update_values_to_set)
- # create a list of column assignment clauses as tuples
-
- insert_statement = self.stack[-1]["selectable"]
- cols = insert_statement.table.c
- for c in cols:
- col_key = c.key
-
- if col_key in set_parameters:
- value = set_parameters.pop(col_key)
- elif c in set_parameters:
- value = set_parameters.pop(c)
- else:
- continue
-
- if coercions._is_literal(value):
- value = elements.BindParameter(None, value, type_=c.type)
-
- else:
- if (
- isinstance(value, elements.BindParameter)
- and value.type._isnull
- ):
- value = value._clone()
- value.type = c.type
- value_text = self.process(value.self_group(), use_schema=False)
-
- key_text = self.preparer.quote(c.name)
- action_set_ops.append("%s = %s" % (key_text, value_text))
-
- # check for names that don't match columns
- if set_parameters:
- util.warn(
- "Additional column names not matching "
- "any column keys in table '%s': %s"
- % (
- self.current_executable.table.name,
- (", ".join("'%s'" % c for c in set_parameters)),
- )
- )
- for k, v in set_parameters.items():
- key_text = (
- self.preparer.quote(k)
- if isinstance(k, str)
- else self.process(k, use_schema=False)
- )
- value_text = self.process(
- coercions.expect(roles.ExpressionElementRole, v),
- use_schema=False,
- )
- action_set_ops.append("%s = %s" % (key_text, value_text))
-
- action_text = ", ".join(action_set_ops)
- if clause.update_whereclause is not None:
- action_text += " WHERE %s" % self.process(
- clause.update_whereclause, include_table=True, use_schema=False
- )
-
- return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)
-
-
-class SQLiteDDLCompiler(compiler.DDLCompiler):
- def get_column_specification(self, column, **kwargs):
- coltype = self.dialect.type_compiler_instance.process(
- column.type, type_expression=column
- )
- colspec = self.preparer.format_column(column) + " " + coltype
- default = self.get_column_default_string(column)
- if default is not None:
- if isinstance(column.server_default.arg, ColumnElement):
- default = "(" + default + ")"
- colspec += " DEFAULT " + default
-
- if not column.nullable:
- colspec += " NOT NULL"
-
- on_conflict_clause = column.dialect_options["sqlite"][
- "on_conflict_not_null"
- ]
- if on_conflict_clause is not None:
- colspec += " ON CONFLICT " + on_conflict_clause
-
- if column.primary_key:
- if (
- column.autoincrement is True
- and len(column.table.primary_key.columns) != 1
- ):
- raise exc.CompileError(
- "SQLite does not support autoincrement for "
- "composite primary keys"
- )
-
- if (
- column.table.dialect_options["sqlite"]["autoincrement"]
- and len(column.table.primary_key.columns) == 1
- and issubclass(column.type._type_affinity, sqltypes.Integer)
- and not column.foreign_keys
- ):
- colspec += " PRIMARY KEY"
-
- on_conflict_clause = column.dialect_options["sqlite"][
- "on_conflict_primary_key"
- ]
- if on_conflict_clause is not None:
- colspec += " ON CONFLICT " + on_conflict_clause
-
- colspec += " AUTOINCREMENT"
-
- if column.computed is not None:
- colspec += " " + self.process(column.computed)
-
- return colspec
-
- def visit_primary_key_constraint(self, constraint, **kw):
- # for columns with sqlite_autoincrement=True,
- # the PRIMARY KEY constraint can only be inline
- # with the column itself.
- if len(constraint.columns) == 1:
- c = list(constraint)[0]
- if (
- c.primary_key
- and c.table.dialect_options["sqlite"]["autoincrement"]
- and issubclass(c.type._type_affinity, sqltypes.Integer)
- and not c.foreign_keys
- ):
- return None
-
- text = super().visit_primary_key_constraint(constraint)
-
- on_conflict_clause = constraint.dialect_options["sqlite"][
- "on_conflict"
- ]
- if on_conflict_clause is None and len(constraint.columns) == 1:
- on_conflict_clause = list(constraint)[0].dialect_options["sqlite"][
- "on_conflict_primary_key"
- ]
-
- if on_conflict_clause is not None:
- text += " ON CONFLICT " + on_conflict_clause
-
- return text
-
- def visit_unique_constraint(self, constraint, **kw):
- text = super().visit_unique_constraint(constraint)
-
- on_conflict_clause = constraint.dialect_options["sqlite"][
- "on_conflict"
- ]
- if on_conflict_clause is None and len(constraint.columns) == 1:
- col1 = list(constraint)[0]
- if isinstance(col1, schema.SchemaItem):
- on_conflict_clause = list(constraint)[0].dialect_options[
- "sqlite"
- ]["on_conflict_unique"]
-
- if on_conflict_clause is not None:
- text += " ON CONFLICT " + on_conflict_clause
-
- return text
-
- def visit_check_constraint(self, constraint, **kw):
- text = super().visit_check_constraint(constraint)
-
- on_conflict_clause = constraint.dialect_options["sqlite"][
- "on_conflict"
- ]
-
- if on_conflict_clause is not None:
- text += " ON CONFLICT " + on_conflict_clause
-
- return text
-
- def visit_column_check_constraint(self, constraint, **kw):
- text = super().visit_column_check_constraint(constraint)
-
- if constraint.dialect_options["sqlite"]["on_conflict"] is not None:
- raise exc.CompileError(
- "SQLite does not support on conflict clause for "
- "column check constraint"
- )
-
- return text
-
- def visit_foreign_key_constraint(self, constraint, **kw):
- local_table = constraint.elements[0].parent.table
- remote_table = constraint.elements[0].column.table
-
- if local_table.schema != remote_table.schema:
- return None
- else:
- return super().visit_foreign_key_constraint(constraint)
-
- def define_constraint_remote_table(self, constraint, table, preparer):
- """Format the remote table clause of a CREATE CONSTRAINT clause."""
-
- return preparer.format_table(table, use_schema=False)
-
- def visit_create_index(
- self, create, include_schema=False, include_table_schema=True, **kw
- ):
- index = create.element
- self._verify_index_table(index)
- preparer = self.preparer
- text = "CREATE "
- if index.unique:
- text += "UNIQUE "
-
- text += "INDEX "
-
- if create.if_not_exists:
- text += "IF NOT EXISTS "
-
- text += "%s ON %s (%s)" % (
- self._prepared_index_name(index, include_schema=True),
- preparer.format_table(index.table, use_schema=False),
- ", ".join(
- self.sql_compiler.process(
- expr, include_table=False, literal_binds=True
- )
- for expr in index.expressions
- ),
- )
-
- whereclause = index.dialect_options["sqlite"]["where"]
- if whereclause is not None:
- where_compiled = self.sql_compiler.process(
- whereclause, include_table=False, literal_binds=True
- )
- text += " WHERE " + where_compiled
-
- return text
-
- def post_create_table(self, table):
- if table.dialect_options["sqlite"]["with_rowid"] is False:
- return "\n WITHOUT ROWID"
- return ""
-
-
-class SQLiteTypeCompiler(compiler.GenericTypeCompiler):
- def visit_large_binary(self, type_, **kw):
- return self.visit_BLOB(type_)
-
- def visit_DATETIME(self, type_, **kw):
- if (
- not isinstance(type_, _DateTimeMixin)
- or type_.format_is_text_affinity
- ):
- return super().visit_DATETIME(type_)
- else:
- return "DATETIME_CHAR"
-
- def visit_DATE(self, type_, **kw):
- if (
- not isinstance(type_, _DateTimeMixin)
- or type_.format_is_text_affinity
- ):
- return super().visit_DATE(type_)
- else:
- return "DATE_CHAR"
-
- def visit_TIME(self, type_, **kw):
- if (
- not isinstance(type_, _DateTimeMixin)
- or type_.format_is_text_affinity
- ):
- return super().visit_TIME(type_)
- else:
- return "TIME_CHAR"
-
- def visit_JSON(self, type_, **kw):
- # note this name provides NUMERIC affinity, not TEXT.
- # should not be an issue unless the JSON value consists of a single
- # numeric value. JSONTEXT can be used if this case is required.
- return "JSON"
-
-
-class SQLiteIdentifierPreparer(compiler.IdentifierPreparer):
- reserved_words = {
- "add",
- "after",
- "all",
- "alter",
- "analyze",
- "and",
- "as",
- "asc",
- "attach",
- "autoincrement",
- "before",
- "begin",
- "between",
- "by",
- "cascade",
- "case",
- "cast",
- "check",
- "collate",
- "column",
- "commit",
- "conflict",
- "constraint",
- "create",
- "cross",
- "current_date",
- "current_time",
- "current_timestamp",
- "database",
- "default",
- "deferrable",
- "deferred",
- "delete",
- "desc",
- "detach",
- "distinct",
- "drop",
- "each",
- "else",
- "end",
- "escape",
- "except",
- "exclusive",
- "exists",
- "explain",
- "false",
- "fail",
- "for",
- "foreign",
- "from",
- "full",
- "glob",
- "group",
- "having",
- "if",
- "ignore",
- "immediate",
- "in",
- "index",
- "indexed",
- "initially",
- "inner",
- "insert",
- "instead",
- "intersect",
- "into",
- "is",
- "isnull",
- "join",
- "key",
- "left",
- "like",
- "limit",
- "match",
- "natural",
- "not",
- "notnull",
- "null",
- "of",
- "offset",
- "on",
- "or",
- "order",
- "outer",
- "plan",
- "pragma",
- "primary",
- "query",
- "raise",
- "references",
- "reindex",
- "rename",
- "replace",
- "restrict",
- "right",
- "rollback",
- "row",
- "select",
- "set",
- "table",
- "temp",
- "temporary",
- "then",
- "to",
- "transaction",
- "trigger",
- "true",
- "union",
- "unique",
- "update",
- "using",
- "vacuum",
- "values",
- "view",
- "virtual",
- "when",
- "where",
- }
-
-
-class SQLiteExecutionContext(default.DefaultExecutionContext):
- @util.memoized_property
- def _preserve_raw_colnames(self):
- return (
- not self.dialect._broken_dotted_colnames
- or self.execution_options.get("sqlite_raw_colnames", False)
- )
-
- def _translate_colname(self, colname):
- # TODO: detect SQLite version 3.10.0 or greater;
- # see [ticket:3633]
-
- # adjust for dotted column names. SQLite
- # in the case of UNION may store col names as
- # "tablename.colname", or if using an attached database,
- # "database.tablename.colname", in cursor.description
- if not self._preserve_raw_colnames and "." in colname:
- return colname.split(".")[-1], colname
- else:
- return colname, None
-
-
-class SQLiteDialect(default.DefaultDialect):
- name = "sqlite"
- supports_alter = False
-
- # SQlite supports "DEFAULT VALUES" but *does not* support
- # "VALUES (DEFAULT)"
- supports_default_values = True
- supports_default_metavalue = False
-
- # sqlite issue:
- # https://github.com/python/cpython/issues/93421
- # note this parameter is no longer used by the ORM or default dialect
- # see #9414
- supports_sane_rowcount_returning = False
-
- supports_empty_insert = False
- supports_cast = True
- supports_multivalues_insert = True
- use_insertmanyvalues = True
- tuple_in_values = True
- supports_statement_cache = True
- insert_null_pk_still_autoincrements = True
- insert_returning = True
- update_returning = True
- update_returning_multifrom = True
- delete_returning = True
- update_returning_multifrom = True
-
- supports_default_metavalue = True
- """dialect supports INSERT... VALUES (DEFAULT) syntax"""
-
- default_metavalue_token = "NULL"
- """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
- parenthesis."""
-
- default_paramstyle = "qmark"
- execution_ctx_cls = SQLiteExecutionContext
- statement_compiler = SQLiteCompiler
- ddl_compiler = SQLiteDDLCompiler
- type_compiler_cls = SQLiteTypeCompiler
- preparer = SQLiteIdentifierPreparer
- ischema_names = ischema_names
- colspecs = colspecs
-
- construct_arguments = [
- (
- sa_schema.Table,
- {
- "autoincrement": False,
- "with_rowid": True,
- },
- ),
- (sa_schema.Index, {"where": None}),
- (
- sa_schema.Column,
- {
- "on_conflict_primary_key": None,
- "on_conflict_not_null": None,
- "on_conflict_unique": None,
- },
- ),
- (sa_schema.Constraint, {"on_conflict": None}),
- ]
-
- _broken_fk_pragma_quotes = False
- _broken_dotted_colnames = False
-
- @util.deprecated_params(
- _json_serializer=(
- "1.3.7",
- "The _json_serializer argument to the SQLite dialect has "
- "been renamed to the correct name of json_serializer. The old "
- "argument name will be removed in a future release.",
- ),
- _json_deserializer=(
- "1.3.7",
- "The _json_deserializer argument to the SQLite dialect has "
- "been renamed to the correct name of json_deserializer. The old "
- "argument name will be removed in a future release.",
- ),
- )
- def __init__(
- self,
- native_datetime=False,
- json_serializer=None,
- json_deserializer=None,
- _json_serializer=None,
- _json_deserializer=None,
- **kwargs,
- ):
- default.DefaultDialect.__init__(self, **kwargs)
-
- if _json_serializer:
- json_serializer = _json_serializer
- if _json_deserializer:
- json_deserializer = _json_deserializer
- self._json_serializer = json_serializer
- self._json_deserializer = json_deserializer
-
- # this flag used by pysqlite dialect, and perhaps others in the
- # future, to indicate the driver is handling date/timestamp
- # conversions (and perhaps datetime/time as well on some hypothetical
- # driver ?)
- self.native_datetime = native_datetime
-
- if self.dbapi is not None:
- if self.dbapi.sqlite_version_info < (3, 7, 16):
- util.warn(
- "SQLite version %s is older than 3.7.16, and will not "
- "support right nested joins, as are sometimes used in "
- "more complex ORM scenarios. SQLAlchemy 1.4 and above "
- "no longer tries to rewrite these joins."
- % (self.dbapi.sqlite_version_info,)
- )
-
- # NOTE: python 3.7 on fedora for me has SQLite 3.34.1. These
- # version checks are getting very stale.
- self._broken_dotted_colnames = self.dbapi.sqlite_version_info < (
- 3,
- 10,
- 0,
- )
- self.supports_default_values = self.dbapi.sqlite_version_info >= (
- 3,
- 3,
- 8,
- )
- self.supports_cast = self.dbapi.sqlite_version_info >= (3, 2, 3)
- self.supports_multivalues_insert = (
- # https://www.sqlite.org/releaselog/3_7_11.html
- self.dbapi.sqlite_version_info
- >= (3, 7, 11)
- )
- # see https://www.sqlalchemy.org/trac/ticket/2568
- # as well as https://www.sqlite.org/src/info/600482d161
- self._broken_fk_pragma_quotes = self.dbapi.sqlite_version_info < (
- 3,
- 6,
- 14,
- )
-
- if self.dbapi.sqlite_version_info < (3, 35) or util.pypy:
- self.update_returning = self.delete_returning = (
- self.insert_returning
- ) = False
-
- if self.dbapi.sqlite_version_info < (3, 32, 0):
- # https://www.sqlite.org/limits.html
- self.insertmanyvalues_max_parameters = 999
-
- _isolation_lookup = util.immutabledict(
- {"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}
- )
-
- def get_isolation_level_values(self, dbapi_connection):
- return list(self._isolation_lookup)
-
- def set_isolation_level(self, dbapi_connection, level):
- isolation_level = self._isolation_lookup[level]
-
- cursor = dbapi_connection.cursor()
- cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}")
- cursor.close()
-
- def get_isolation_level(self, dbapi_connection):
- cursor = dbapi_connection.cursor()
- cursor.execute("PRAGMA read_uncommitted")
- res = cursor.fetchone()
- if res:
- value = res[0]
- else:
- # https://www.sqlite.org/changes.html#version_3_3_3
- # "Optional READ UNCOMMITTED isolation (instead of the
- # default isolation level of SERIALIZABLE) and
- # table level locking when database connections
- # share a common cache.""
- # pre-SQLite 3.3.0 default to 0
- value = 0
- cursor.close()
- if value == 0:
- return "SERIALIZABLE"
- elif value == 1:
- return "READ UNCOMMITTED"
- else:
- assert False, "Unknown isolation level %s" % value
-
- @reflection.cache
- def get_schema_names(self, connection, **kw):
- s = "PRAGMA database_list"
- dl = connection.exec_driver_sql(s)
-
- return [db[1] for db in dl if db[1] != "temp"]
-
- def _format_schema(self, schema, table_name):
- if schema is not None:
- qschema = self.identifier_preparer.quote_identifier(schema)
- name = f"{qschema}.{table_name}"
- else:
- name = table_name
- return name
-
- def _sqlite_main_query(
- self,
- table: str,
- type_: str,
- schema: Optional[str],
- sqlite_include_internal: bool,
- ):
- main = self._format_schema(schema, table)
- if not sqlite_include_internal:
- filter_table = " AND name NOT LIKE 'sqlite~_%' ESCAPE '~'"
- else:
- filter_table = ""
- query = (
- f"SELECT name FROM {main} "
- f"WHERE type='{type_}'{filter_table} "
- "ORDER BY name"
- )
- return query
-
- @reflection.cache
- def get_table_names(
- self, connection, schema=None, sqlite_include_internal=False, **kw
- ):
- query = self._sqlite_main_query(
- "sqlite_master", "table", schema, sqlite_include_internal
- )
- names = connection.exec_driver_sql(query).scalars().all()
- return names
-
- @reflection.cache
- def get_temp_table_names(
- self, connection, sqlite_include_internal=False, **kw
- ):
- query = self._sqlite_main_query(
- "sqlite_temp_master", "table", None, sqlite_include_internal
- )
- names = connection.exec_driver_sql(query).scalars().all()
- return names
-
- @reflection.cache
- def get_temp_view_names(
- self, connection, sqlite_include_internal=False, **kw
- ):
- query = self._sqlite_main_query(
- "sqlite_temp_master", "view", None, sqlite_include_internal
- )
- names = connection.exec_driver_sql(query).scalars().all()
- return names
-
- @reflection.cache
- def has_table(self, connection, table_name, schema=None, **kw):
- self._ensure_has_table_connection(connection)
-
- if schema is not None and schema not in self.get_schema_names(
- connection, **kw
- ):
- return False
-
- info = self._get_table_pragma(
- connection, "table_info", table_name, schema=schema
- )
- return bool(info)
-
- def _get_default_schema_name(self, connection):
- return "main"
-
- @reflection.cache
- def get_view_names(
- self, connection, schema=None, sqlite_include_internal=False, **kw
- ):
- query = self._sqlite_main_query(
- "sqlite_master", "view", schema, sqlite_include_internal
- )
- names = connection.exec_driver_sql(query).scalars().all()
- return names
-
- @reflection.cache
- def get_view_definition(self, connection, view_name, schema=None, **kw):
- if schema is not None:
- qschema = self.identifier_preparer.quote_identifier(schema)
- master = f"{qschema}.sqlite_master"
- s = ("SELECT sql FROM %s WHERE name = ? AND type='view'") % (
- master,
- )
- rs = connection.exec_driver_sql(s, (view_name,))
- else:
- try:
- s = (
- "SELECT sql FROM "
- " (SELECT * FROM sqlite_master UNION ALL "
- " SELECT * FROM sqlite_temp_master) "
- "WHERE name = ? "
- "AND type='view'"
- )
- rs = connection.exec_driver_sql(s, (view_name,))
- except exc.DBAPIError:
- s = (
- "SELECT sql FROM sqlite_master WHERE name = ? "
- "AND type='view'"
- )
- rs = connection.exec_driver_sql(s, (view_name,))
-
- result = rs.fetchall()
- if result:
- return result[0].sql
- else:
- raise exc.NoSuchTableError(
- f"{schema}.{view_name}" if schema else view_name
- )
-
- @reflection.cache
- def get_columns(self, connection, table_name, schema=None, **kw):
- pragma = "table_info"
- # computed columns are threaded as hidden, they require table_xinfo
- if self.server_version_info >= (3, 31):
- pragma = "table_xinfo"
- info = self._get_table_pragma(
- connection, pragma, table_name, schema=schema
- )
- columns = []
- tablesql = None
- for row in info:
- name = row[1]
- type_ = row[2].upper()
- nullable = not row[3]
- default = row[4]
- primary_key = row[5]
- hidden = row[6] if pragma == "table_xinfo" else 0
-
- # hidden has value 0 for normal columns, 1 for hidden columns,
- # 2 for computed virtual columns and 3 for computed stored columns
- # https://www.sqlite.org/src/info/069351b85f9a706f60d3e98fbc8aaf40c374356b967c0464aede30ead3d9d18b
- if hidden == 1:
- continue
-
- generated = bool(hidden)
- persisted = hidden == 3
-
- if tablesql is None and generated:
- tablesql = self._get_table_sql(
- connection, table_name, schema, **kw
- )
-
- columns.append(
- self._get_column_info(
- name,
- type_,
- nullable,
- default,
- primary_key,
- generated,
- persisted,
- tablesql,
- )
- )
- if columns:
- return columns
- elif not self.has_table(connection, table_name, schema):
- raise exc.NoSuchTableError(
- f"{schema}.{table_name}" if schema else table_name
- )
- else:
- return ReflectionDefaults.columns()
-
- def _get_column_info(
- self,
- name,
- type_,
- nullable,
- default,
- primary_key,
- generated,
- persisted,
- tablesql,
- ):
- if generated:
- # the type of a column "cc INTEGER GENERATED ALWAYS AS (1 + 42)"
- # somehow is "INTEGER GENERATED ALWAYS"
- type_ = re.sub("generated", "", type_, flags=re.IGNORECASE)
- type_ = re.sub("always", "", type_, flags=re.IGNORECASE).strip()
-
- coltype = self._resolve_type_affinity(type_)
-
- if default is not None:
- default = str(default)
-
- colspec = {
- "name": name,
- "type": coltype,
- "nullable": nullable,
- "default": default,
- "primary_key": primary_key,
- }
- if generated:
- sqltext = ""
- if tablesql:
- pattern = r"[^,]*\s+AS\s+\(([^,]*)\)\s*(?:virtual|stored)?"
- match = re.search(
- re.escape(name) + pattern, tablesql, re.IGNORECASE
- )
- if match:
- sqltext = match.group(1)
- colspec["computed"] = {"sqltext": sqltext, "persisted": persisted}
- return colspec
-
- def _resolve_type_affinity(self, type_):
- """Return a data type from a reflected column, using affinity rules.
-
- SQLite's goal for universal compatibility introduces some complexity
- during reflection, as a column's defined type might not actually be a
- type that SQLite understands - or indeed, my not be defined *at all*.
- Internally, SQLite handles this with a 'data type affinity' for each
- column definition, mapping to one of 'TEXT', 'NUMERIC', 'INTEGER',
- 'REAL', or 'NONE' (raw bits). The algorithm that determines this is
- listed in https://www.sqlite.org/datatype3.html section 2.1.
-
- This method allows SQLAlchemy to support that algorithm, while still
- providing access to smarter reflection utilities by recognizing
- column definitions that SQLite only supports through affinity (like
- DATE and DOUBLE).
-
- """
- match = re.match(r"([\w ]+)(\(.*?\))?", type_)
- if match:
- coltype = match.group(1)
- args = match.group(2)
- else:
- coltype = ""
- args = ""
-
- if coltype in self.ischema_names:
- coltype = self.ischema_names[coltype]
- elif "INT" in coltype:
- coltype = sqltypes.INTEGER
- elif "CHAR" in coltype or "CLOB" in coltype or "TEXT" in coltype:
- coltype = sqltypes.TEXT
- elif "BLOB" in coltype or not coltype:
- coltype = sqltypes.NullType
- elif "REAL" in coltype or "FLOA" in coltype or "DOUB" in coltype:
- coltype = sqltypes.REAL
- else:
- coltype = sqltypes.NUMERIC
-
- if args is not None:
- args = re.findall(r"(\d+)", args)
- try:
- coltype = coltype(*[int(a) for a in args])
- except TypeError:
- util.warn(
- "Could not instantiate type %s with "
- "reflected arguments %s; using no arguments."
- % (coltype, args)
- )
- coltype = coltype()
- else:
- coltype = coltype()
-
- return coltype
-
- @reflection.cache
- def get_pk_constraint(self, connection, table_name, schema=None, **kw):
- constraint_name = None
- table_data = self._get_table_sql(connection, table_name, schema=schema)
- if table_data:
- PK_PATTERN = r"CONSTRAINT (\w+) PRIMARY KEY"
- result = re.search(PK_PATTERN, table_data, re.I)
- constraint_name = result.group(1) if result else None
-
- cols = self.get_columns(connection, table_name, schema, **kw)
- # consider only pk columns. This also avoids sorting the cached
- # value returned by get_columns
- cols = [col for col in cols if col.get("primary_key", 0) > 0]
- cols.sort(key=lambda col: col.get("primary_key"))
- pkeys = [col["name"] for col in cols]
-
- if pkeys:
- return {"constrained_columns": pkeys, "name": constraint_name}
- else:
- return ReflectionDefaults.pk_constraint()
-
- @reflection.cache
- def get_foreign_keys(self, connection, table_name, schema=None, **kw):
- # sqlite makes this *extremely difficult*.
- # First, use the pragma to get the actual FKs.
- pragma_fks = self._get_table_pragma(
- connection, "foreign_key_list", table_name, schema=schema
- )
-
- fks = {}
-
- for row in pragma_fks:
- (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4])
-
- if not rcol:
- # no referred column, which means it was not named in the
- # original DDL. The referred columns of the foreign key
- # constraint are therefore the primary key of the referred
- # table.
- try:
- referred_pk = self.get_pk_constraint(
- connection, rtbl, schema=schema, **kw
- )
- referred_columns = referred_pk["constrained_columns"]
- except exc.NoSuchTableError:
- # ignore not existing parents
- referred_columns = []
- else:
- # note we use this list only if this is the first column
- # in the constraint. for subsequent columns we ignore the
- # list and append "rcol" if present.
- referred_columns = []
-
- if self._broken_fk_pragma_quotes:
- rtbl = re.sub(r"^[\"\[`\']|[\"\]`\']$", "", rtbl)
-
- if numerical_id in fks:
- fk = fks[numerical_id]
- else:
- fk = fks[numerical_id] = {
- "name": None,
- "constrained_columns": [],
- "referred_schema": schema,
- "referred_table": rtbl,
- "referred_columns": referred_columns,
- "options": {},
- }
- fks[numerical_id] = fk
-
- fk["constrained_columns"].append(lcol)
-
- if rcol:
- fk["referred_columns"].append(rcol)
-
- def fk_sig(constrained_columns, referred_table, referred_columns):
- return (
- tuple(constrained_columns)
- + (referred_table,)
- + tuple(referred_columns)
- )
-
- # then, parse the actual SQL and attempt to find DDL that matches
- # the names as well. SQLite saves the DDL in whatever format
- # it was typed in as, so need to be liberal here.
-
- keys_by_signature = {
- fk_sig(
- fk["constrained_columns"],
- fk["referred_table"],
- fk["referred_columns"],
- ): fk
- for fk in fks.values()
- }
-
- table_data = self._get_table_sql(connection, table_name, schema=schema)
-
- def parse_fks():
- if table_data is None:
- # system tables, etc.
- return
-
- # note that we already have the FKs from PRAGMA above. This whole
- # regexp thing is trying to locate additional detail about the
- # FKs, namely the name of the constraint and other options.
- # so parsing the columns is really about matching it up to what
- # we already have.
- FK_PATTERN = (
- r"(?:CONSTRAINT (\w+) +)?"
- r"FOREIGN KEY *\( *(.+?) *\) +"
- r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\( *((?:(?:"[^"]+"|[a-z0-9_]+) *(?:, *)?)+)\) *' # noqa: E501
- r"((?:ON (?:DELETE|UPDATE) "
- r"(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)"
- r"((?:NOT +)?DEFERRABLE)?"
- r"(?: +INITIALLY +(DEFERRED|IMMEDIATE))?"
- )
- for match in re.finditer(FK_PATTERN, table_data, re.I):
- (
- constraint_name,
- constrained_columns,
- referred_quoted_name,
- referred_name,
- referred_columns,
- onupdatedelete,
- deferrable,
- initially,
- ) = match.group(1, 2, 3, 4, 5, 6, 7, 8)
- constrained_columns = list(
- self._find_cols_in_sig(constrained_columns)
- )
- if not referred_columns:
- referred_columns = constrained_columns
- else:
- referred_columns = list(
- self._find_cols_in_sig(referred_columns)
- )
- referred_name = referred_quoted_name or referred_name
- options = {}
-
- for token in re.split(r" *\bON\b *", onupdatedelete.upper()):
- if token.startswith("DELETE"):
- ondelete = token[6:].strip()
- if ondelete and ondelete != "NO ACTION":
- options["ondelete"] = ondelete
- elif token.startswith("UPDATE"):
- onupdate = token[6:].strip()
- if onupdate and onupdate != "NO ACTION":
- options["onupdate"] = onupdate
-
- if deferrable:
- options["deferrable"] = "NOT" not in deferrable.upper()
- if initially:
- options["initially"] = initially.upper()
-
- yield (
- constraint_name,
- constrained_columns,
- referred_name,
- referred_columns,
- options,
- )
-
- fkeys = []
-
- for (
- constraint_name,
- constrained_columns,
- referred_name,
- referred_columns,
- options,
- ) in parse_fks():
- sig = fk_sig(constrained_columns, referred_name, referred_columns)
- if sig not in keys_by_signature:
- util.warn(
- "WARNING: SQL-parsed foreign key constraint "
- "'%s' could not be located in PRAGMA "
- "foreign_keys for table %s" % (sig, table_name)
- )
- continue
- key = keys_by_signature.pop(sig)
- key["name"] = constraint_name
- key["options"] = options
- fkeys.append(key)
- # assume the remainders are the unnamed, inline constraints, just
- # use them as is as it's extremely difficult to parse inline
- # constraints
- fkeys.extend(keys_by_signature.values())
- if fkeys:
- return fkeys
- else:
- return ReflectionDefaults.foreign_keys()
-
- def _find_cols_in_sig(self, sig):
- for match in re.finditer(r'(?:"(.+?)")|([a-z0-9_]+)', sig, re.I):
- yield match.group(1) or match.group(2)
-
- @reflection.cache
- def get_unique_constraints(
- self, connection, table_name, schema=None, **kw
- ):
- auto_index_by_sig = {}
- for idx in self.get_indexes(
- connection,
- table_name,
- schema=schema,
- include_auto_indexes=True,
- **kw,
- ):
- if not idx["name"].startswith("sqlite_autoindex"):
- continue
- sig = tuple(idx["column_names"])
- auto_index_by_sig[sig] = idx
-
- table_data = self._get_table_sql(
- connection, table_name, schema=schema, **kw
- )
- unique_constraints = []
-
- def parse_uqs():
- if table_data is None:
- return
- UNIQUE_PATTERN = r'(?:CONSTRAINT "?(.+?)"? +)?UNIQUE *\((.+?)\)'
- INLINE_UNIQUE_PATTERN = (
- r'(?:(".+?")|(?:[\[`])?([a-z0-9_]+)(?:[\]`])?) '
- r"+[a-z0-9_ ]+? +UNIQUE"
- )
-
- for match in re.finditer(UNIQUE_PATTERN, table_data, re.I):
- name, cols = match.group(1, 2)
- yield name, list(self._find_cols_in_sig(cols))
-
- # we need to match inlines as well, as we seek to differentiate
- # a UNIQUE constraint from a UNIQUE INDEX, even though these
- # are kind of the same thing :)
- for match in re.finditer(INLINE_UNIQUE_PATTERN, table_data, re.I):
- cols = list(
- self._find_cols_in_sig(match.group(1) or match.group(2))
- )
- yield None, cols
-
- for name, cols in parse_uqs():
- sig = tuple(cols)
- if sig in auto_index_by_sig:
- auto_index_by_sig.pop(sig)
- parsed_constraint = {"name": name, "column_names": cols}
- unique_constraints.append(parsed_constraint)
- # NOTE: auto_index_by_sig might not be empty here,
- # the PRIMARY KEY may have an entry.
- if unique_constraints:
- return unique_constraints
- else:
- return ReflectionDefaults.unique_constraints()
-
- @reflection.cache
- def get_check_constraints(self, connection, table_name, schema=None, **kw):
- table_data = self._get_table_sql(
- connection, table_name, schema=schema, **kw
- )
-
- CHECK_PATTERN = r"(?:CONSTRAINT (.+) +)?" r"CHECK *\( *(.+) *\),? *"
- cks = []
- # NOTE: we aren't using re.S here because we actually are
- # taking advantage of each CHECK constraint being all on one
- # line in the table definition in order to delineate. This
- # necessarily makes assumptions as to how the CREATE TABLE
- # was emitted.
-
- for match in re.finditer(CHECK_PATTERN, table_data or "", re.I):
- name = match.group(1)
-
- if name:
- name = re.sub(r'^"|"$', "", name)
-
- cks.append({"sqltext": match.group(2), "name": name})
- cks.sort(key=lambda d: d["name"] or "~") # sort None as last
- if cks:
- return cks
- else:
- return ReflectionDefaults.check_constraints()
-
- @reflection.cache
- def get_indexes(self, connection, table_name, schema=None, **kw):
- pragma_indexes = self._get_table_pragma(
- connection, "index_list", table_name, schema=schema
- )
- indexes = []
-
- # regular expression to extract the filter predicate of a partial
- # index. this could fail to extract the predicate correctly on
- # indexes created like
- # CREATE INDEX i ON t (col || ') where') WHERE col <> ''
- # but as this function does not support expression-based indexes
- # this case does not occur.
- partial_pred_re = re.compile(r"\)\s+where\s+(.+)", re.IGNORECASE)
-
- if schema:
- schema_expr = "%s." % self.identifier_preparer.quote_identifier(
- schema
- )
- else:
- schema_expr = ""
-
- include_auto_indexes = kw.pop("include_auto_indexes", False)
- for row in pragma_indexes:
- # ignore implicit primary key index.
- # https://www.mail-archive.com/sqlite-users@sqlite.org/msg30517.html
- if not include_auto_indexes and row[1].startswith(
- "sqlite_autoindex"
- ):
- continue
- indexes.append(
- dict(
- name=row[1],
- column_names=[],
- unique=row[2],
- dialect_options={},
- )
- )
-
- # check partial indexes
- if len(row) >= 5 and row[4]:
- s = (
- "SELECT sql FROM %(schema)ssqlite_master "
- "WHERE name = ? "
- "AND type = 'index'" % {"schema": schema_expr}
- )
- rs = connection.exec_driver_sql(s, (row[1],))
- index_sql = rs.scalar()
- predicate_match = partial_pred_re.search(index_sql)
- if predicate_match is None:
- # unless the regex is broken this case shouldn't happen
- # because we know this is a partial index, so the
- # definition sql should match the regex
- util.warn(
- "Failed to look up filter predicate of "
- "partial index %s" % row[1]
- )
- else:
- predicate = predicate_match.group(1)
- indexes[-1]["dialect_options"]["sqlite_where"] = text(
- predicate
- )
-
- # loop thru unique indexes to get the column names.
- for idx in list(indexes):
- pragma_index = self._get_table_pragma(
- connection, "index_info", idx["name"], schema=schema
- )
-
- for row in pragma_index:
- if row[2] is None:
- util.warn(
- "Skipped unsupported reflection of "
- "expression-based index %s" % idx["name"]
- )
- indexes.remove(idx)
- break
- else:
- idx["column_names"].append(row[2])
-
- indexes.sort(key=lambda d: d["name"] or "~") # sort None as last
- if indexes:
- return indexes
- elif not self.has_table(connection, table_name, schema):
- raise exc.NoSuchTableError(
- f"{schema}.{table_name}" if schema else table_name
- )
- else:
- return ReflectionDefaults.indexes()
-
- def _is_sys_table(self, table_name):
- return table_name in {
- "sqlite_schema",
- "sqlite_master",
- "sqlite_temp_schema",
- "sqlite_temp_master",
- }
-
- @reflection.cache
- def _get_table_sql(self, connection, table_name, schema=None, **kw):
- if schema:
- schema_expr = "%s." % (
- self.identifier_preparer.quote_identifier(schema)
- )
- else:
- schema_expr = ""
- try:
- s = (
- "SELECT sql FROM "
- " (SELECT * FROM %(schema)ssqlite_master UNION ALL "
- " SELECT * FROM %(schema)ssqlite_temp_master) "
- "WHERE name = ? "
- "AND type in ('table', 'view')" % {"schema": schema_expr}
- )
- rs = connection.exec_driver_sql(s, (table_name,))
- except exc.DBAPIError:
- s = (
- "SELECT sql FROM %(schema)ssqlite_master "
- "WHERE name = ? "
- "AND type in ('table', 'view')" % {"schema": schema_expr}
- )
- rs = connection.exec_driver_sql(s, (table_name,))
- value = rs.scalar()
- if value is None and not self._is_sys_table(table_name):
- raise exc.NoSuchTableError(f"{schema_expr}{table_name}")
- return value
-
- def _get_table_pragma(self, connection, pragma, table_name, schema=None):
- quote = self.identifier_preparer.quote_identifier
- if schema is not None:
- statements = [f"PRAGMA {quote(schema)}."]
- else:
- # because PRAGMA looks in all attached databases if no schema
- # given, need to specify "main" schema, however since we want
- # 'temp' tables in the same namespace as 'main', need to run
- # the PRAGMA twice
- statements = ["PRAGMA main.", "PRAGMA temp."]
-
- qtable = quote(table_name)
- for statement in statements:
- statement = f"{statement}{pragma}({qtable})"
- cursor = connection.exec_driver_sql(statement)
- if not cursor._soft_closed:
- # work around SQLite issue whereby cursor.description
- # is blank when PRAGMA returns no rows:
- # https://www.sqlite.org/cvstrac/tktview?tn=1884
- result = cursor.fetchall()
- else:
- result = []
- if result:
- return result
- else:
- return []
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/dml.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/dml.py
deleted file mode 100644
index dcf5e44..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/dml.py
+++ /dev/null
@@ -1,240 +0,0 @@
-# dialects/sqlite/dml.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-from __future__ import annotations
-
-from typing import Any
-
-from .._typing import _OnConflictIndexElementsT
-from .._typing import _OnConflictIndexWhereT
-from .._typing import _OnConflictSetT
-from .._typing import _OnConflictWhereT
-from ... import util
-from ...sql import coercions
-from ...sql import roles
-from ...sql._typing import _DMLTableArgument
-from ...sql.base import _exclusive_against
-from ...sql.base import _generative
-from ...sql.base import ColumnCollection
-from ...sql.base import ReadOnlyColumnCollection
-from ...sql.dml import Insert as StandardInsert
-from ...sql.elements import ClauseElement
-from ...sql.elements import KeyedColumnElement
-from ...sql.expression import alias
-from ...util.typing import Self
-
-__all__ = ("Insert", "insert")
-
-
-def insert(table: _DMLTableArgument) -> Insert:
- """Construct a sqlite-specific variant :class:`_sqlite.Insert`
- construct.
-
- .. container:: inherited_member
-
- The :func:`sqlalchemy.dialects.sqlite.insert` function creates
- a :class:`sqlalchemy.dialects.sqlite.Insert`. This class is based
- on the dialect-agnostic :class:`_sql.Insert` construct which may
- be constructed using the :func:`_sql.insert` function in
- SQLAlchemy Core.
-
- The :class:`_sqlite.Insert` construct includes additional methods
- :meth:`_sqlite.Insert.on_conflict_do_update`,
- :meth:`_sqlite.Insert.on_conflict_do_nothing`.
-
- """
- return Insert(table)
-
-
-class Insert(StandardInsert):
- """SQLite-specific implementation of INSERT.
-
- Adds methods for SQLite-specific syntaxes such as ON CONFLICT.
-
- The :class:`_sqlite.Insert` object is created using the
- :func:`sqlalchemy.dialects.sqlite.insert` function.
-
- .. versionadded:: 1.4
-
- .. seealso::
-
- :ref:`sqlite_on_conflict_insert`
-
- """
-
- stringify_dialect = "sqlite"
- inherit_cache = False
-
- @util.memoized_property
- def excluded(
- self,
- ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
- """Provide the ``excluded`` namespace for an ON CONFLICT statement
-
- SQLite's ON CONFLICT clause allows reference to the row that would
- be inserted, known as ``excluded``. This attribute provides
- all columns in this row to be referenceable.
-
- .. tip:: The :attr:`_sqlite.Insert.excluded` attribute is an instance
- of :class:`_expression.ColumnCollection`, which provides an
- interface the same as that of the :attr:`_schema.Table.c`
- collection described at :ref:`metadata_tables_and_columns`.
- With this collection, ordinary names are accessible like attributes
- (e.g. ``stmt.excluded.some_column``), but special names and
- dictionary method names should be accessed using indexed access,
- such as ``stmt.excluded["column name"]`` or
- ``stmt.excluded["values"]``. See the docstring for
- :class:`_expression.ColumnCollection` for further examples.
-
- """
- return alias(self.table, name="excluded").columns
-
- _on_conflict_exclusive = _exclusive_against(
- "_post_values_clause",
- msgs={
- "_post_values_clause": "This Insert construct already has "
- "an ON CONFLICT clause established"
- },
- )
-
- @_generative
- @_on_conflict_exclusive
- def on_conflict_do_update(
- self,
- index_elements: _OnConflictIndexElementsT = None,
- index_where: _OnConflictIndexWhereT = None,
- set_: _OnConflictSetT = None,
- where: _OnConflictWhereT = None,
- ) -> Self:
- r"""
- Specifies a DO UPDATE SET action for ON CONFLICT clause.
-
- :param index_elements:
- A sequence consisting of string column names, :class:`_schema.Column`
- objects, or other column expression objects that will be used
- to infer a target index or unique constraint.
-
- :param index_where:
- Additional WHERE criterion that can be used to infer a
- conditional target index.
-
- :param set\_:
- A dictionary or other mapping object
- where the keys are either names of columns in the target table,
- or :class:`_schema.Column` objects or other ORM-mapped columns
- matching that of the target table, and expressions or literals
- as values, specifying the ``SET`` actions to take.
-
- .. versionadded:: 1.4 The
- :paramref:`_sqlite.Insert.on_conflict_do_update.set_`
- parameter supports :class:`_schema.Column` objects from the target
- :class:`_schema.Table` as keys.
-
- .. warning:: This dictionary does **not** take into account
- Python-specified default UPDATE values or generation functions,
- e.g. those specified using :paramref:`_schema.Column.onupdate`.
- These values will not be exercised for an ON CONFLICT style of
- UPDATE, unless they are manually specified in the
- :paramref:`.Insert.on_conflict_do_update.set_` dictionary.
-
- :param where:
- Optional argument. If present, can be a literal SQL
- string or an acceptable expression for a ``WHERE`` clause
- that restricts the rows affected by ``DO UPDATE SET``. Rows
- not meeting the ``WHERE`` condition will not be updated
- (effectively a ``DO NOTHING`` for those rows).
-
- """
-
- self._post_values_clause = OnConflictDoUpdate(
- index_elements, index_where, set_, where
- )
- return self
-
- @_generative
- @_on_conflict_exclusive
- def on_conflict_do_nothing(
- self,
- index_elements: _OnConflictIndexElementsT = None,
- index_where: _OnConflictIndexWhereT = None,
- ) -> Self:
- """
- Specifies a DO NOTHING action for ON CONFLICT clause.
-
- :param index_elements:
- A sequence consisting of string column names, :class:`_schema.Column`
- objects, or other column expression objects that will be used
- to infer a target index or unique constraint.
-
- :param index_where:
- Additional WHERE criterion that can be used to infer a
- conditional target index.
-
- """
-
- self._post_values_clause = OnConflictDoNothing(
- index_elements, index_where
- )
- return self
-
-
-class OnConflictClause(ClauseElement):
- stringify_dialect = "sqlite"
-
- constraint_target: None
- inferred_target_elements: _OnConflictIndexElementsT
- inferred_target_whereclause: _OnConflictIndexWhereT
-
- def __init__(
- self,
- index_elements: _OnConflictIndexElementsT = None,
- index_where: _OnConflictIndexWhereT = None,
- ):
- if index_elements is not None:
- self.constraint_target = None
- self.inferred_target_elements = index_elements
- self.inferred_target_whereclause = index_where
- else:
- self.constraint_target = self.inferred_target_elements = (
- self.inferred_target_whereclause
- ) = None
-
-
-class OnConflictDoNothing(OnConflictClause):
- __visit_name__ = "on_conflict_do_nothing"
-
-
-class OnConflictDoUpdate(OnConflictClause):
- __visit_name__ = "on_conflict_do_update"
-
- def __init__(
- self,
- index_elements: _OnConflictIndexElementsT = None,
- index_where: _OnConflictIndexWhereT = None,
- set_: _OnConflictSetT = None,
- where: _OnConflictWhereT = None,
- ):
- super().__init__(
- index_elements=index_elements,
- index_where=index_where,
- )
-
- if isinstance(set_, dict):
- if not set_:
- raise ValueError("set parameter dictionary must not be empty")
- elif isinstance(set_, ColumnCollection):
- set_ = dict(set_)
- else:
- raise ValueError(
- "set parameter must be a non-empty dictionary "
- "or a ColumnCollection such as the `.c.` collection "
- "of a Table object"
- )
- self.update_values_to_set = [
- (coercions.expect(roles.DMLColumnRole, key), value)
- for key, value in set_.items()
- ]
- self.update_whereclause = where
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/json.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/json.py
deleted file mode 100644
index ec29802..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/json.py
+++ /dev/null
@@ -1,92 +0,0 @@
-# dialects/sqlite/json.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-from ... import types as sqltypes
-
-
-class JSON(sqltypes.JSON):
- """SQLite JSON type.
-
- SQLite supports JSON as of version 3.9 through its JSON1_ extension. Note
- that JSON1_ is a
- `loadable extension <https://www.sqlite.org/loadext.html>`_ and as such
- may not be available, or may require run-time loading.
-
- :class:`_sqlite.JSON` is used automatically whenever the base
- :class:`_types.JSON` datatype is used against a SQLite backend.
-
- .. seealso::
-
- :class:`_types.JSON` - main documentation for the generic
- cross-platform JSON datatype.
-
- The :class:`_sqlite.JSON` type supports persistence of JSON values
- as well as the core index operations provided by :class:`_types.JSON`
- datatype, by adapting the operations to render the ``JSON_EXTRACT``
- function wrapped in the ``JSON_QUOTE`` function at the database level.
- Extracted values are quoted in order to ensure that the results are
- always JSON string values.
-
-
- .. versionadded:: 1.3
-
-
- .. _JSON1: https://www.sqlite.org/json1.html
-
- """
-
-
-# Note: these objects currently match exactly those of MySQL, however since
-# these are not generalizable to all JSON implementations, remain separately
-# implemented for each dialect.
-class _FormatTypeMixin:
- def _format_value(self, value):
- raise NotImplementedError()
-
- def bind_processor(self, dialect):
- super_proc = self.string_bind_processor(dialect)
-
- def process(value):
- value = self._format_value(value)
- if super_proc:
- value = super_proc(value)
- return value
-
- return process
-
- def literal_processor(self, dialect):
- super_proc = self.string_literal_processor(dialect)
-
- def process(value):
- value = self._format_value(value)
- if super_proc:
- value = super_proc(value)
- return value
-
- return process
-
-
-class JSONIndexType(_FormatTypeMixin, sqltypes.JSON.JSONIndexType):
- def _format_value(self, value):
- if isinstance(value, int):
- value = "$[%s]" % value
- else:
- value = '$."%s"' % value
- return value
-
-
-class JSONPathType(_FormatTypeMixin, sqltypes.JSON.JSONPathType):
- def _format_value(self, value):
- return "$%s" % (
- "".join(
- [
- "[%s]" % elem if isinstance(elem, int) else '."%s"' % elem
- for elem in value
- ]
- )
- )
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/provision.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/provision.py
deleted file mode 100644
index f18568b..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/provision.py
+++ /dev/null
@@ -1,198 +0,0 @@
-# dialects/sqlite/provision.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-import os
-import re
-
-from ... import exc
-from ...engine import url as sa_url
-from ...testing.provision import create_db
-from ...testing.provision import drop_db
-from ...testing.provision import follower_url_from_main
-from ...testing.provision import generate_driver_url
-from ...testing.provision import log
-from ...testing.provision import post_configure_engine
-from ...testing.provision import run_reap_dbs
-from ...testing.provision import stop_test_class_outside_fixtures
-from ...testing.provision import temp_table_keyword_args
-from ...testing.provision import upsert
-
-
-# TODO: I can't get this to build dynamically with pytest-xdist procs
-_drivernames = {
- "pysqlite",
- "aiosqlite",
- "pysqlcipher",
- "pysqlite_numeric",
- "pysqlite_dollar",
-}
-
-
-def _format_url(url, driver, ident):
- """given a sqlite url + desired driver + ident, make a canonical
- URL out of it
-
- """
- url = sa_url.make_url(url)
-
- if driver is None:
- driver = url.get_driver_name()
-
- filename = url.database
-
- needs_enc = driver == "pysqlcipher"
- name_token = None
-
- if filename and filename != ":memory:":
- assert "test_schema" not in filename
- tokens = re.split(r"[_\.]", filename)
-
- new_filename = f"{driver}"
-
- for token in tokens:
- if token in _drivernames:
- if driver is None:
- driver = token
- continue
- elif token in ("db", "enc"):
- continue
- elif name_token is None:
- name_token = token.strip("_")
-
- assert name_token, f"sqlite filename has no name token: {url.database}"
-
- new_filename = f"{name_token}_{driver}"
- if ident:
- new_filename += f"_{ident}"
- new_filename += ".db"
- if needs_enc:
- new_filename += ".enc"
- url = url.set(database=new_filename)
-
- if needs_enc:
- url = url.set(password="test")
-
- url = url.set(drivername="sqlite+%s" % (driver,))
-
- return url
-
-
-@generate_driver_url.for_db("sqlite")
-def generate_driver_url(url, driver, query_str):
- url = _format_url(url, driver, None)
-
- try:
- url.get_dialect()
- except exc.NoSuchModuleError:
- return None
- else:
- return url
-
-
-@follower_url_from_main.for_db("sqlite")
-def _sqlite_follower_url_from_main(url, ident):
- return _format_url(url, None, ident)
-
-
-@post_configure_engine.for_db("sqlite")
-def _sqlite_post_configure_engine(url, engine, follower_ident):
- from sqlalchemy import event
-
- if follower_ident:
- attach_path = f"{follower_ident}_{engine.driver}_test_schema.db"
- else:
- attach_path = f"{engine.driver}_test_schema.db"
-
- @event.listens_for(engine, "connect")
- def connect(dbapi_connection, connection_record):
- # use file DBs in all cases, memory acts kind of strangely
- # as an attached
-
- # NOTE! this has to be done *per connection*. New sqlite connection,
- # as we get with say, QueuePool, the attaches are gone.
- # so schemes to delete those attached files have to be done at the
- # filesystem level and not rely upon what attachments are in a
- # particular SQLite connection
- dbapi_connection.execute(
- f'ATTACH DATABASE "{attach_path}" AS test_schema'
- )
-
- @event.listens_for(engine, "engine_disposed")
- def dispose(engine):
- """most databases should be dropped using
- stop_test_class_outside_fixtures
-
- however a few tests like AttachedDBTest might not get triggered on
- that main hook
-
- """
-
- if os.path.exists(attach_path):
- os.remove(attach_path)
-
- filename = engine.url.database
-
- if filename and filename != ":memory:" and os.path.exists(filename):
- os.remove(filename)
-
-
-@create_db.for_db("sqlite")
-def _sqlite_create_db(cfg, eng, ident):
- pass
-
-
-@drop_db.for_db("sqlite")
-def _sqlite_drop_db(cfg, eng, ident):
- _drop_dbs_w_ident(eng.url.database, eng.driver, ident)
-
-
-def _drop_dbs_w_ident(databasename, driver, ident):
- for path in os.listdir("."):
- fname, ext = os.path.split(path)
- if ident in fname and ext in [".db", ".db.enc"]:
- log.info("deleting SQLite database file: %s", path)
- os.remove(path)
-
-
-@stop_test_class_outside_fixtures.for_db("sqlite")
-def stop_test_class_outside_fixtures(config, db, cls):
- db.dispose()
-
-
-@temp_table_keyword_args.for_db("sqlite")
-def _sqlite_temp_table_keyword_args(cfg, eng):
- return {"prefixes": ["TEMPORARY"]}
-
-
-@run_reap_dbs.for_db("sqlite")
-def _reap_sqlite_dbs(url, idents):
- log.info("db reaper connecting to %r", url)
- log.info("identifiers in file: %s", ", ".join(idents))
- url = sa_url.make_url(url)
- for ident in idents:
- for drivername in _drivernames:
- _drop_dbs_w_ident(url.database, drivername, ident)
-
-
-@upsert.for_db("sqlite")
-def _upsert(
- cfg, table, returning, *, set_lambda=None, sort_by_parameter_order=False
-):
- from sqlalchemy.dialects.sqlite import insert
-
- stmt = insert(table)
-
- if set_lambda:
- stmt = stmt.on_conflict_do_update(set_=set_lambda(stmt.excluded))
- else:
- stmt = stmt.on_conflict_do_nothing()
-
- stmt = stmt.returning(
- *returning, sort_by_parameter_order=sort_by_parameter_order
- )
- return stmt
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlcipher.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlcipher.py
deleted file mode 100644
index 388a4df..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlcipher.py
+++ /dev/null
@@ -1,155 +0,0 @@
-# dialects/sqlite/pysqlcipher.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-
-"""
-.. dialect:: sqlite+pysqlcipher
- :name: pysqlcipher
- :dbapi: sqlcipher 3 or pysqlcipher
- :connectstring: sqlite+pysqlcipher://:passphrase@/file_path[?kdf_iter=<iter>]
-
- Dialect for support of DBAPIs that make use of the
- `SQLCipher <https://www.zetetic.net/sqlcipher>`_ backend.
-
-
-Driver
-------
-
-Current dialect selection logic is:
-
-* If the :paramref:`_sa.create_engine.module` parameter supplies a DBAPI module,
- that module is used.
-* Otherwise for Python 3, choose https://pypi.org/project/sqlcipher3/
-* If not available, fall back to https://pypi.org/project/pysqlcipher3/
-* For Python 2, https://pypi.org/project/pysqlcipher/ is used.
-
-.. warning:: The ``pysqlcipher3`` and ``pysqlcipher`` DBAPI drivers are no
- longer maintained; the ``sqlcipher3`` driver as of this writing appears
- to be current. For future compatibility, any pysqlcipher-compatible DBAPI
- may be used as follows::
-
- import sqlcipher_compatible_driver
-
- from sqlalchemy import create_engine
-
- e = create_engine(
- "sqlite+pysqlcipher://:password@/dbname.db",
- module=sqlcipher_compatible_driver
- )
-
-These drivers make use of the SQLCipher engine. This system essentially
-introduces new PRAGMA commands to SQLite which allows the setting of a
-passphrase and other encryption parameters, allowing the database file to be
-encrypted.
-
-
-Connect Strings
----------------
-
-The format of the connect string is in every way the same as that
-of the :mod:`~sqlalchemy.dialects.sqlite.pysqlite` driver, except that the
-"password" field is now accepted, which should contain a passphrase::
-
- e = create_engine('sqlite+pysqlcipher://:testing@/foo.db')
-
-For an absolute file path, two leading slashes should be used for the
-database name::
-
- e = create_engine('sqlite+pysqlcipher://:testing@//path/to/foo.db')
-
-A selection of additional encryption-related pragmas supported by SQLCipher
-as documented at https://www.zetetic.net/sqlcipher/sqlcipher-api/ can be passed
-in the query string, and will result in that PRAGMA being called for each
-new connection. Currently, ``cipher``, ``kdf_iter``
-``cipher_page_size`` and ``cipher_use_hmac`` are supported::
-
- e = create_engine('sqlite+pysqlcipher://:testing@/foo.db?cipher=aes-256-cfb&kdf_iter=64000')
-
-.. warning:: Previous versions of sqlalchemy did not take into consideration
- the encryption-related pragmas passed in the url string, that were silently
- ignored. This may cause errors when opening files saved by a
- previous sqlalchemy version if the encryption options do not match.
-
-
-Pooling Behavior
-----------------
-
-The driver makes a change to the default pool behavior of pysqlite
-as described in :ref:`pysqlite_threading_pooling`. The pysqlcipher driver
-has been observed to be significantly slower on connection than the
-pysqlite driver, most likely due to the encryption overhead, so the
-dialect here defaults to using the :class:`.SingletonThreadPool`
-implementation,
-instead of the :class:`.NullPool` pool used by pysqlite. As always, the pool
-implementation is entirely configurable using the
-:paramref:`_sa.create_engine.poolclass` parameter; the :class:`.
-StaticPool` may
-be more feasible for single-threaded use, or :class:`.NullPool` may be used
-to prevent unencrypted connections from being held open for long periods of
-time, at the expense of slower startup time for new connections.
-
-
-""" # noqa
-
-from .pysqlite import SQLiteDialect_pysqlite
-from ... import pool
-
-
-class SQLiteDialect_pysqlcipher(SQLiteDialect_pysqlite):
- driver = "pysqlcipher"
- supports_statement_cache = True
-
- pragmas = ("kdf_iter", "cipher", "cipher_page_size", "cipher_use_hmac")
-
- @classmethod
- def import_dbapi(cls):
- try:
- import sqlcipher3 as sqlcipher
- except ImportError:
- pass
- else:
- return sqlcipher
-
- from pysqlcipher3 import dbapi2 as sqlcipher
-
- return sqlcipher
-
- @classmethod
- def get_pool_class(cls, url):
- return pool.SingletonThreadPool
-
- def on_connect_url(self, url):
- super_on_connect = super().on_connect_url(url)
-
- # pull the info we need from the URL early. Even though URL
- # is immutable, we don't want any in-place changes to the URL
- # to affect things
- passphrase = url.password or ""
- url_query = dict(url.query)
-
- def on_connect(conn):
- cursor = conn.cursor()
- cursor.execute('pragma key="%s"' % passphrase)
- for prag in self.pragmas:
- value = url_query.get(prag, None)
- if value is not None:
- cursor.execute('pragma %s="%s"' % (prag, value))
- cursor.close()
-
- if super_on_connect:
- super_on_connect(conn)
-
- return on_connect
-
- def create_connect_args(self, url):
- plain_url = url._replace(password=None)
- plain_url = plain_url.difference_update_query(self.pragmas)
- return super().create_connect_args(plain_url)
-
-
-dialect = SQLiteDialect_pysqlcipher
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlite.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlite.py
deleted file mode 100644
index f39baf3..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/pysqlite.py
+++ /dev/null
@@ -1,756 +0,0 @@
-# dialects/sqlite/pysqlite.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-
-r"""
-.. dialect:: sqlite+pysqlite
- :name: pysqlite
- :dbapi: sqlite3
- :connectstring: sqlite+pysqlite:///file_path
- :url: https://docs.python.org/library/sqlite3.html
-
- Note that ``pysqlite`` is the same driver as the ``sqlite3``
- module included with the Python distribution.
-
-Driver
-------
-
-The ``sqlite3`` Python DBAPI is standard on all modern Python versions;
-for cPython and Pypy, no additional installation is necessary.
-
-
-Connect Strings
----------------
-
-The file specification for the SQLite database is taken as the "database"
-portion of the URL. Note that the format of a SQLAlchemy url is::
-
- driver://user:pass@host/database
-
-This means that the actual filename to be used starts with the characters to
-the **right** of the third slash. So connecting to a relative filepath
-looks like::
-
- # relative path
- e = create_engine('sqlite:///path/to/database.db')
-
-An absolute path, which is denoted by starting with a slash, means you
-need **four** slashes::
-
- # absolute path
- e = create_engine('sqlite:////path/to/database.db')
-
-To use a Windows path, regular drive specifications and backslashes can be
-used. Double backslashes are probably needed::
-
- # absolute path on Windows
- e = create_engine('sqlite:///C:\\path\\to\\database.db')
-
-To use sqlite ``:memory:`` database specify it as the filename using
-``sqlite://:memory:``. It's also the default if no filepath is
-present, specifying only ``sqlite://`` and nothing else::
-
- # in-memory database
- e = create_engine('sqlite://:memory:')
- # also in-memory database
- e2 = create_engine('sqlite://')
-
-.. _pysqlite_uri_connections:
-
-URI Connections
-^^^^^^^^^^^^^^^
-
-Modern versions of SQLite support an alternative system of connecting using a
-`driver level URI <https://www.sqlite.org/uri.html>`_, which has the advantage
-that additional driver-level arguments can be passed including options such as
-"read only". The Python sqlite3 driver supports this mode under modern Python
-3 versions. The SQLAlchemy pysqlite driver supports this mode of use by
-specifying "uri=true" in the URL query string. The SQLite-level "URI" is kept
-as the "database" portion of the SQLAlchemy url (that is, following a slash)::
-
- e = create_engine("sqlite:///file:path/to/database?mode=ro&uri=true")
-
-.. note:: The "uri=true" parameter must appear in the **query string**
- of the URL. It will not currently work as expected if it is only
- present in the :paramref:`_sa.create_engine.connect_args`
- parameter dictionary.
-
-The logic reconciles the simultaneous presence of SQLAlchemy's query string and
-SQLite's query string by separating out the parameters that belong to the
-Python sqlite3 driver vs. those that belong to the SQLite URI. This is
-achieved through the use of a fixed list of parameters known to be accepted by
-the Python side of the driver. For example, to include a URL that indicates
-the Python sqlite3 "timeout" and "check_same_thread" parameters, along with the
-SQLite "mode" and "nolock" parameters, they can all be passed together on the
-query string::
-
- e = create_engine(
- "sqlite:///file:path/to/database?"
- "check_same_thread=true&timeout=10&mode=ro&nolock=1&uri=true"
- )
-
-Above, the pysqlite / sqlite3 DBAPI would be passed arguments as::
-
- sqlite3.connect(
- "file:path/to/database?mode=ro&nolock=1",
- check_same_thread=True, timeout=10, uri=True
- )
-
-Regarding future parameters added to either the Python or native drivers. new
-parameter names added to the SQLite URI scheme should be automatically
-accommodated by this scheme. New parameter names added to the Python driver
-side can be accommodated by specifying them in the
-:paramref:`_sa.create_engine.connect_args` dictionary,
-until dialect support is
-added by SQLAlchemy. For the less likely case that the native SQLite driver
-adds a new parameter name that overlaps with one of the existing, known Python
-driver parameters (such as "timeout" perhaps), SQLAlchemy's dialect would
-require adjustment for the URL scheme to continue to support this.
-
-As is always the case for all SQLAlchemy dialects, the entire "URL" process
-can be bypassed in :func:`_sa.create_engine` through the use of the
-:paramref:`_sa.create_engine.creator`
-parameter which allows for a custom callable
-that creates a Python sqlite3 driver level connection directly.
-
-.. versionadded:: 1.3.9
-
-.. seealso::
-
- `Uniform Resource Identifiers <https://www.sqlite.org/uri.html>`_ - in
- the SQLite documentation
-
-.. _pysqlite_regexp:
-
-Regular Expression Support
----------------------------
-
-.. versionadded:: 1.4
-
-Support for the :meth:`_sql.ColumnOperators.regexp_match` operator is provided
-using Python's re.search_ function. SQLite itself does not include a working
-regular expression operator; instead, it includes a non-implemented placeholder
-operator ``REGEXP`` that calls a user-defined function that must be provided.
-
-SQLAlchemy's implementation makes use of the pysqlite create_function_ hook
-as follows::
-
-
- def regexp(a, b):
- return re.search(a, b) is not None
-
- sqlite_connection.create_function(
- "regexp", 2, regexp,
- )
-
-There is currently no support for regular expression flags as a separate
-argument, as these are not supported by SQLite's REGEXP operator, however these
-may be included inline within the regular expression string. See `Python regular expressions`_ for
-details.
-
-.. seealso::
-
- `Python regular expressions`_: Documentation for Python's regular expression syntax.
-
-.. _create_function: https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.create_function
-
-.. _re.search: https://docs.python.org/3/library/re.html#re.search
-
-.. _Python regular expressions: https://docs.python.org/3/library/re.html#re.search
-
-
-
-Compatibility with sqlite3 "native" date and datetime types
------------------------------------------------------------
-
-The pysqlite driver includes the sqlite3.PARSE_DECLTYPES and
-sqlite3.PARSE_COLNAMES options, which have the effect of any column
-or expression explicitly cast as "date" or "timestamp" will be converted
-to a Python date or datetime object. The date and datetime types provided
-with the pysqlite dialect are not currently compatible with these options,
-since they render the ISO date/datetime including microseconds, which
-pysqlite's driver does not. Additionally, SQLAlchemy does not at
-this time automatically render the "cast" syntax required for the
-freestanding functions "current_timestamp" and "current_date" to return
-datetime/date types natively. Unfortunately, pysqlite
-does not provide the standard DBAPI types in ``cursor.description``,
-leaving SQLAlchemy with no way to detect these types on the fly
-without expensive per-row type checks.
-
-Keeping in mind that pysqlite's parsing option is not recommended,
-nor should be necessary, for use with SQLAlchemy, usage of PARSE_DECLTYPES
-can be forced if one configures "native_datetime=True" on create_engine()::
-
- engine = create_engine('sqlite://',
- connect_args={'detect_types':
- sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES},
- native_datetime=True
- )
-
-With this flag enabled, the DATE and TIMESTAMP types (but note - not the
-DATETIME or TIME types...confused yet ?) will not perform any bind parameter
-or result processing. Execution of "func.current_date()" will return a string.
-"func.current_timestamp()" is registered as returning a DATETIME type in
-SQLAlchemy, so this function still receives SQLAlchemy-level result
-processing.
-
-.. _pysqlite_threading_pooling:
-
-Threading/Pooling Behavior
----------------------------
-
-The ``sqlite3`` DBAPI by default prohibits the use of a particular connection
-in a thread which is not the one in which it was created. As SQLite has
-matured, it's behavior under multiple threads has improved, and even includes
-options for memory only databases to be used in multiple threads.
-
-The thread prohibition is known as "check same thread" and may be controlled
-using the ``sqlite3`` parameter ``check_same_thread``, which will disable or
-enable this check. SQLAlchemy's default behavior here is to set
-``check_same_thread`` to ``False`` automatically whenever a file-based database
-is in use, to establish compatibility with the default pool class
-:class:`.QueuePool`.
-
-The SQLAlchemy ``pysqlite`` DBAPI establishes the connection pool differently
-based on the kind of SQLite database that's requested:
-
-* When a ``:memory:`` SQLite database is specified, the dialect by default
- will use :class:`.SingletonThreadPool`. This pool maintains a single
- connection per thread, so that all access to the engine within the current
- thread use the same ``:memory:`` database - other threads would access a
- different ``:memory:`` database. The ``check_same_thread`` parameter
- defaults to ``True``.
-* When a file-based database is specified, the dialect will use
- :class:`.QueuePool` as the source of connections. at the same time,
- the ``check_same_thread`` flag is set to False by default unless overridden.
-
- .. versionchanged:: 2.0
-
- SQLite file database engines now use :class:`.QueuePool` by default.
- Previously, :class:`.NullPool` were used. The :class:`.NullPool` class
- may be used by specifying it via the
- :paramref:`_sa.create_engine.poolclass` parameter.
-
-Disabling Connection Pooling for File Databases
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-Pooling may be disabled for a file based database by specifying the
-:class:`.NullPool` implementation for the :func:`_sa.create_engine.poolclass`
-parameter::
-
- from sqlalchemy import NullPool
- engine = create_engine("sqlite:///myfile.db", poolclass=NullPool)
-
-It's been observed that the :class:`.NullPool` implementation incurs an
-extremely small performance overhead for repeated checkouts due to the lack of
-connection re-use implemented by :class:`.QueuePool`. However, it still
-may be beneficial to use this class if the application is experiencing
-issues with files being locked.
-
-Using a Memory Database in Multiple Threads
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-To use a ``:memory:`` database in a multithreaded scenario, the same
-connection object must be shared among threads, since the database exists
-only within the scope of that connection. The
-:class:`.StaticPool` implementation will maintain a single connection
-globally, and the ``check_same_thread`` flag can be passed to Pysqlite
-as ``False``::
-
- from sqlalchemy.pool import StaticPool
- engine = create_engine('sqlite://',
- connect_args={'check_same_thread':False},
- poolclass=StaticPool)
-
-Note that using a ``:memory:`` database in multiple threads requires a recent
-version of SQLite.
-
-Using Temporary Tables with SQLite
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-Due to the way SQLite deals with temporary tables, if you wish to use a
-temporary table in a file-based SQLite database across multiple checkouts
-from the connection pool, such as when using an ORM :class:`.Session` where
-the temporary table should continue to remain after :meth:`.Session.commit` or
-:meth:`.Session.rollback` is called, a pool which maintains a single
-connection must be used. Use :class:`.SingletonThreadPool` if the scope is
-only needed within the current thread, or :class:`.StaticPool` is scope is
-needed within multiple threads for this case::
-
- # maintain the same connection per thread
- from sqlalchemy.pool import SingletonThreadPool
- engine = create_engine('sqlite:///mydb.db',
- poolclass=SingletonThreadPool)
-
-
- # maintain the same connection across all threads
- from sqlalchemy.pool import StaticPool
- engine = create_engine('sqlite:///mydb.db',
- poolclass=StaticPool)
-
-Note that :class:`.SingletonThreadPool` should be configured for the number
-of threads that are to be used; beyond that number, connections will be
-closed out in a non deterministic way.
-
-
-Dealing with Mixed String / Binary Columns
-------------------------------------------------------
-
-The SQLite database is weakly typed, and as such it is possible when using
-binary values, which in Python are represented as ``b'some string'``, that a
-particular SQLite database can have data values within different rows where
-some of them will be returned as a ``b''`` value by the Pysqlite driver, and
-others will be returned as Python strings, e.g. ``''`` values. This situation
-is not known to occur if the SQLAlchemy :class:`.LargeBinary` datatype is used
-consistently, however if a particular SQLite database has data that was
-inserted using the Pysqlite driver directly, or when using the SQLAlchemy
-:class:`.String` type which was later changed to :class:`.LargeBinary`, the
-table will not be consistently readable because SQLAlchemy's
-:class:`.LargeBinary` datatype does not handle strings so it has no way of
-"encoding" a value that is in string format.
-
-To deal with a SQLite table that has mixed string / binary data in the
-same column, use a custom type that will check each row individually::
-
- from sqlalchemy import String
- from sqlalchemy import TypeDecorator
-
- class MixedBinary(TypeDecorator):
- impl = String
- cache_ok = True
-
- def process_result_value(self, value, dialect):
- if isinstance(value, str):
- value = bytes(value, 'utf-8')
- elif value is not None:
- value = bytes(value)
-
- return value
-
-Then use the above ``MixedBinary`` datatype in the place where
-:class:`.LargeBinary` would normally be used.
-
-.. _pysqlite_serializable:
-
-Serializable isolation / Savepoints / Transactional DDL
--------------------------------------------------------
-
-In the section :ref:`sqlite_concurrency`, we refer to the pysqlite
-driver's assortment of issues that prevent several features of SQLite
-from working correctly. The pysqlite DBAPI driver has several
-long-standing bugs which impact the correctness of its transactional
-behavior. In its default mode of operation, SQLite features such as
-SERIALIZABLE isolation, transactional DDL, and SAVEPOINT support are
-non-functional, and in order to use these features, workarounds must
-be taken.
-
-The issue is essentially that the driver attempts to second-guess the user's
-intent, failing to start transactions and sometimes ending them prematurely, in
-an effort to minimize the SQLite databases's file locking behavior, even
-though SQLite itself uses "shared" locks for read-only activities.
-
-SQLAlchemy chooses to not alter this behavior by default, as it is the
-long-expected behavior of the pysqlite driver; if and when the pysqlite
-driver attempts to repair these issues, that will be more of a driver towards
-defaults for SQLAlchemy.
-
-The good news is that with a few events, we can implement transactional
-support fully, by disabling pysqlite's feature entirely and emitting BEGIN
-ourselves. This is achieved using two event listeners::
-
- from sqlalchemy import create_engine, event
-
- engine = create_engine("sqlite:///myfile.db")
-
- @event.listens_for(engine, "connect")
- def do_connect(dbapi_connection, connection_record):
- # disable pysqlite's emitting of the BEGIN statement entirely.
- # also stops it from emitting COMMIT before any DDL.
- dbapi_connection.isolation_level = None
-
- @event.listens_for(engine, "begin")
- def do_begin(conn):
- # emit our own BEGIN
- conn.exec_driver_sql("BEGIN")
-
-.. warning:: When using the above recipe, it is advised to not use the
- :paramref:`.Connection.execution_options.isolation_level` setting on
- :class:`_engine.Connection` and :func:`_sa.create_engine`
- with the SQLite driver,
- as this function necessarily will also alter the ".isolation_level" setting.
-
-
-Above, we intercept a new pysqlite connection and disable any transactional
-integration. Then, at the point at which SQLAlchemy knows that transaction
-scope is to begin, we emit ``"BEGIN"`` ourselves.
-
-When we take control of ``"BEGIN"``, we can also control directly SQLite's
-locking modes, introduced at
-`BEGIN TRANSACTION <https://sqlite.org/lang_transaction.html>`_,
-by adding the desired locking mode to our ``"BEGIN"``::
-
- @event.listens_for(engine, "begin")
- def do_begin(conn):
- conn.exec_driver_sql("BEGIN EXCLUSIVE")
-
-.. seealso::
-
- `BEGIN TRANSACTION <https://sqlite.org/lang_transaction.html>`_ -
- on the SQLite site
-
- `sqlite3 SELECT does not BEGIN a transaction <https://bugs.python.org/issue9924>`_ -
- on the Python bug tracker
-
- `sqlite3 module breaks transactions and potentially corrupts data <https://bugs.python.org/issue10740>`_ -
- on the Python bug tracker
-
-.. _pysqlite_udfs:
-
-User-Defined Functions
-----------------------
-
-pysqlite supports a `create_function() <https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.create_function>`_
-method that allows us to create our own user-defined functions (UDFs) in Python and use them directly in SQLite queries.
-These functions are registered with a specific DBAPI Connection.
-
-SQLAlchemy uses connection pooling with file-based SQLite databases, so we need to ensure that the UDF is attached to the
-connection when it is created. That is accomplished with an event listener::
-
- from sqlalchemy import create_engine
- from sqlalchemy import event
- from sqlalchemy import text
-
-
- def udf():
- return "udf-ok"
-
-
- engine = create_engine("sqlite:///./db_file")
-
-
- @event.listens_for(engine, "connect")
- def connect(conn, rec):
- conn.create_function("udf", 0, udf)
-
-
- for i in range(5):
- with engine.connect() as conn:
- print(conn.scalar(text("SELECT UDF()")))
-
-
-""" # noqa
-
-import math
-import os
-import re
-
-from .base import DATE
-from .base import DATETIME
-from .base import SQLiteDialect
-from ... import exc
-from ... import pool
-from ... import types as sqltypes
-from ... import util
-
-
-class _SQLite_pysqliteTimeStamp(DATETIME):
- def bind_processor(self, dialect):
- if dialect.native_datetime:
- return None
- else:
- return DATETIME.bind_processor(self, dialect)
-
- def result_processor(self, dialect, coltype):
- if dialect.native_datetime:
- return None
- else:
- return DATETIME.result_processor(self, dialect, coltype)
-
-
-class _SQLite_pysqliteDate(DATE):
- def bind_processor(self, dialect):
- if dialect.native_datetime:
- return None
- else:
- return DATE.bind_processor(self, dialect)
-
- def result_processor(self, dialect, coltype):
- if dialect.native_datetime:
- return None
- else:
- return DATE.result_processor(self, dialect, coltype)
-
-
-class SQLiteDialect_pysqlite(SQLiteDialect):
- default_paramstyle = "qmark"
- supports_statement_cache = True
- returns_native_bytes = True
-
- colspecs = util.update_copy(
- SQLiteDialect.colspecs,
- {
- sqltypes.Date: _SQLite_pysqliteDate,
- sqltypes.TIMESTAMP: _SQLite_pysqliteTimeStamp,
- },
- )
-
- description_encoding = None
-
- driver = "pysqlite"
-
- @classmethod
- def import_dbapi(cls):
- from sqlite3 import dbapi2 as sqlite
-
- return sqlite
-
- @classmethod
- def _is_url_file_db(cls, url):
- if (url.database and url.database != ":memory:") and (
- url.query.get("mode", None) != "memory"
- ):
- return True
- else:
- return False
-
- @classmethod
- def get_pool_class(cls, url):
- if cls._is_url_file_db(url):
- return pool.QueuePool
- else:
- return pool.SingletonThreadPool
-
- def _get_server_version_info(self, connection):
- return self.dbapi.sqlite_version_info
-
- _isolation_lookup = SQLiteDialect._isolation_lookup.union(
- {
- "AUTOCOMMIT": None,
- }
- )
-
- def set_isolation_level(self, dbapi_connection, level):
- if level == "AUTOCOMMIT":
- dbapi_connection.isolation_level = None
- else:
- dbapi_connection.isolation_level = ""
- return super().set_isolation_level(dbapi_connection, level)
-
- def on_connect(self):
- def regexp(a, b):
- if b is None:
- return None
- return re.search(a, b) is not None
-
- if util.py38 and self._get_server_version_info(None) >= (3, 9):
- # sqlite must be greater than 3.8.3 for deterministic=True
- # https://docs.python.org/3/library/sqlite3.html#sqlite3.Connection.create_function
- # the check is more conservative since there were still issues
- # with following 3.8 sqlite versions
- create_func_kw = {"deterministic": True}
- else:
- create_func_kw = {}
-
- def set_regexp(dbapi_connection):
- dbapi_connection.create_function(
- "regexp", 2, regexp, **create_func_kw
- )
-
- def floor_func(dbapi_connection):
- # NOTE: floor is optionally present in sqlite 3.35+ , however
- # as it is normally non-present we deliver floor() unconditionally
- # for now.
- # https://www.sqlite.org/lang_mathfunc.html
- dbapi_connection.create_function(
- "floor", 1, math.floor, **create_func_kw
- )
-
- fns = [set_regexp, floor_func]
-
- def connect(conn):
- for fn in fns:
- fn(conn)
-
- return connect
-
- def create_connect_args(self, url):
- if url.username or url.password or url.host or url.port:
- raise exc.ArgumentError(
- "Invalid SQLite URL: %s\n"
- "Valid SQLite URL forms are:\n"
- " sqlite:///:memory: (or, sqlite://)\n"
- " sqlite:///relative/path/to/file.db\n"
- " sqlite:////absolute/path/to/file.db" % (url,)
- )
-
- # theoretically, this list can be augmented, at least as far as
- # parameter names accepted by sqlite3/pysqlite, using
- # inspect.getfullargspec(). for the moment this seems like overkill
- # as these parameters don't change very often, and as always,
- # parameters passed to connect_args will always go to the
- # sqlite3/pysqlite driver.
- pysqlite_args = [
- ("uri", bool),
- ("timeout", float),
- ("isolation_level", str),
- ("detect_types", int),
- ("check_same_thread", bool),
- ("cached_statements", int),
- ]
- opts = url.query
- pysqlite_opts = {}
- for key, type_ in pysqlite_args:
- util.coerce_kw_type(opts, key, type_, dest=pysqlite_opts)
-
- if pysqlite_opts.get("uri", False):
- uri_opts = dict(opts)
- # here, we are actually separating the parameters that go to
- # sqlite3/pysqlite vs. those that go the SQLite URI. What if
- # two names conflict? again, this seems to be not the case right
- # now, and in the case that new names are added to
- # either side which overlap, again the sqlite3/pysqlite parameters
- # can be passed through connect_args instead of in the URL.
- # If SQLite native URIs add a parameter like "timeout" that
- # we already have listed here for the python driver, then we need
- # to adjust for that here.
- for key, type_ in pysqlite_args:
- uri_opts.pop(key, None)
- filename = url.database
- if uri_opts:
- # sorting of keys is for unit test support
- filename += "?" + (
- "&".join(
- "%s=%s" % (key, uri_opts[key])
- for key in sorted(uri_opts)
- )
- )
- else:
- filename = url.database or ":memory:"
- if filename != ":memory:":
- filename = os.path.abspath(filename)
-
- pysqlite_opts.setdefault(
- "check_same_thread", not self._is_url_file_db(url)
- )
-
- return ([filename], pysqlite_opts)
-
- def is_disconnect(self, e, connection, cursor):
- return isinstance(
- e, self.dbapi.ProgrammingError
- ) and "Cannot operate on a closed database." in str(e)
-
-
-dialect = SQLiteDialect_pysqlite
-
-
-class _SQLiteDialect_pysqlite_numeric(SQLiteDialect_pysqlite):
- """numeric dialect for testing only
-
- internal use only. This dialect is **NOT** supported by SQLAlchemy
- and may change at any time.
-
- """
-
- supports_statement_cache = True
- default_paramstyle = "numeric"
- driver = "pysqlite_numeric"
-
- _first_bind = ":1"
- _not_in_statement_regexp = None
-
- def __init__(self, *arg, **kw):
- kw.setdefault("paramstyle", "numeric")
- super().__init__(*arg, **kw)
-
- def create_connect_args(self, url):
- arg, opts = super().create_connect_args(url)
- opts["factory"] = self._fix_sqlite_issue_99953()
- return arg, opts
-
- def _fix_sqlite_issue_99953(self):
- import sqlite3
-
- first_bind = self._first_bind
- if self._not_in_statement_regexp:
- nis = self._not_in_statement_regexp
-
- def _test_sql(sql):
- m = nis.search(sql)
- assert not m, f"Found {nis.pattern!r} in {sql!r}"
-
- else:
-
- def _test_sql(sql):
- pass
-
- def _numeric_param_as_dict(parameters):
- if parameters:
- assert isinstance(parameters, tuple)
- return {
- str(idx): value for idx, value in enumerate(parameters, 1)
- }
- else:
- return ()
-
- class SQLiteFix99953Cursor(sqlite3.Cursor):
- def execute(self, sql, parameters=()):
- _test_sql(sql)
- if first_bind in sql:
- parameters = _numeric_param_as_dict(parameters)
- return super().execute(sql, parameters)
-
- def executemany(self, sql, parameters):
- _test_sql(sql)
- if first_bind in sql:
- parameters = [
- _numeric_param_as_dict(p) for p in parameters
- ]
- return super().executemany(sql, parameters)
-
- class SQLiteFix99953Connection(sqlite3.Connection):
- def cursor(self, factory=None):
- if factory is None:
- factory = SQLiteFix99953Cursor
- return super().cursor(factory=factory)
-
- def execute(self, sql, parameters=()):
- _test_sql(sql)
- if first_bind in sql:
- parameters = _numeric_param_as_dict(parameters)
- return super().execute(sql, parameters)
-
- def executemany(self, sql, parameters):
- _test_sql(sql)
- if first_bind in sql:
- parameters = [
- _numeric_param_as_dict(p) for p in parameters
- ]
- return super().executemany(sql, parameters)
-
- return SQLiteFix99953Connection
-
-
-class _SQLiteDialect_pysqlite_dollar(_SQLiteDialect_pysqlite_numeric):
- """numeric dialect that uses $ for testing only
-
- internal use only. This dialect is **NOT** supported by SQLAlchemy
- and may change at any time.
-
- """
-
- supports_statement_cache = True
- default_paramstyle = "numeric_dollar"
- driver = "pysqlite_dollar"
-
- _first_bind = "$1"
- _not_in_statement_regexp = re.compile(r"[^\d]:\d+")
-
- def __init__(self, *arg, **kw):
- kw.setdefault("paramstyle", "numeric_dollar")
- super().__init__(*arg, **kw)