summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
commit12cf076118570eebbff08c6b3090e0d4798447a1 (patch)
tree3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql
parentc45662ff3923b34614ddcc8feb9195541166dcc5 (diff)
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__init__.py88
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/__init__.cpython-311.pycbin2226 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/aioodbc.cpython-311.pycbin2591 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/base.cpython-311.pycbin157867 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/information_schema.cpython-311.pycbin9859 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/json.cpython-311.pycbin5828 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/provision.cpython-311.pycbin8399 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/pymssql.cpython-311.pycbin6678 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/pyodbc.cpython-311.pycbin33146 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/aioodbc.py64
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/base.py4007
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/information_schema.py254
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/json.py133
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/provision.py155
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/pymssql.py125
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/pyodbc.py745
16 files changed, 0 insertions, 5571 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__init__.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__init__.py
deleted file mode 100644
index 19ab7c4..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__init__.py
+++ /dev/null
@@ -1,88 +0,0 @@
-# dialects/mssql/__init__.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-from . import aioodbc # noqa
-from . import base # noqa
-from . import pymssql # noqa
-from . import pyodbc # noqa
-from .base import BIGINT
-from .base import BINARY
-from .base import BIT
-from .base import CHAR
-from .base import DATE
-from .base import DATETIME
-from .base import DATETIME2
-from .base import DATETIMEOFFSET
-from .base import DECIMAL
-from .base import DOUBLE_PRECISION
-from .base import FLOAT
-from .base import IMAGE
-from .base import INTEGER
-from .base import JSON
-from .base import MONEY
-from .base import NCHAR
-from .base import NTEXT
-from .base import NUMERIC
-from .base import NVARCHAR
-from .base import REAL
-from .base import ROWVERSION
-from .base import SMALLDATETIME
-from .base import SMALLINT
-from .base import SMALLMONEY
-from .base import SQL_VARIANT
-from .base import TEXT
-from .base import TIME
-from .base import TIMESTAMP
-from .base import TINYINT
-from .base import UNIQUEIDENTIFIER
-from .base import VARBINARY
-from .base import VARCHAR
-from .base import XML
-from ...sql import try_cast
-
-
-base.dialect = dialect = pyodbc.dialect
-
-
-__all__ = (
- "JSON",
- "INTEGER",
- "BIGINT",
- "SMALLINT",
- "TINYINT",
- "VARCHAR",
- "NVARCHAR",
- "CHAR",
- "NCHAR",
- "TEXT",
- "NTEXT",
- "DECIMAL",
- "NUMERIC",
- "FLOAT",
- "DATETIME",
- "DATETIME2",
- "DATETIMEOFFSET",
- "DATE",
- "DOUBLE_PRECISION",
- "TIME",
- "SMALLDATETIME",
- "BINARY",
- "VARBINARY",
- "BIT",
- "REAL",
- "IMAGE",
- "TIMESTAMP",
- "ROWVERSION",
- "MONEY",
- "SMALLMONEY",
- "UNIQUEIDENTIFIER",
- "SQL_VARIANT",
- "XML",
- "dialect",
- "try_cast",
-)
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/__init__.cpython-311.pyc
deleted file mode 100644
index 225eebc..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/__init__.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/aioodbc.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/aioodbc.cpython-311.pyc
deleted file mode 100644
index 3a0585c..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/aioodbc.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/base.cpython-311.pyc
deleted file mode 100644
index 527fcdf..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/base.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/information_schema.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/information_schema.cpython-311.pyc
deleted file mode 100644
index 88bc790..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/information_schema.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/json.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/json.cpython-311.pyc
deleted file mode 100644
index 6312e58..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/json.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/provision.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/provision.cpython-311.pyc
deleted file mode 100644
index fd72045..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/provision.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/pymssql.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/pymssql.cpython-311.pyc
deleted file mode 100644
index 84555dc..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/pymssql.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/pyodbc.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/pyodbc.cpython-311.pyc
deleted file mode 100644
index 19ecd43..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/pyodbc.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/aioodbc.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/aioodbc.py
deleted file mode 100644
index 65945d9..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/aioodbc.py
+++ /dev/null
@@ -1,64 +0,0 @@
-# dialects/mssql/aioodbc.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-r"""
-.. dialect:: mssql+aioodbc
- :name: aioodbc
- :dbapi: aioodbc
- :connectstring: mssql+aioodbc://<username>:<password>@<dsnname>
- :url: https://pypi.org/project/aioodbc/
-
-
-Support for the SQL Server database in asyncio style, using the aioodbc
-driver which itself is a thread-wrapper around pyodbc.
-
-.. versionadded:: 2.0.23 Added the mssql+aioodbc dialect which builds
- on top of the pyodbc and general aio* dialect architecture.
-
-Using a special asyncio mediation layer, the aioodbc dialect is usable
-as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
-extension package.
-
-Most behaviors and caveats for this driver are the same as that of the
-pyodbc dialect used on SQL Server; see :ref:`mssql_pyodbc` for general
-background.
-
-This dialect should normally be used only with the
-:func:`_asyncio.create_async_engine` engine creation function; connection
-styles are otherwise equivalent to those documented in the pyodbc section::
-
- from sqlalchemy.ext.asyncio import create_async_engine
- engine = create_async_engine(
- "mssql+aioodbc://scott:tiger@mssql2017:1433/test?"
- "driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes"
- )
-
-
-
-"""
-
-from __future__ import annotations
-
-from .pyodbc import MSDialect_pyodbc
-from .pyodbc import MSExecutionContext_pyodbc
-from ...connectors.aioodbc import aiodbcConnector
-
-
-class MSExecutionContext_aioodbc(MSExecutionContext_pyodbc):
- def create_server_side_cursor(self):
- return self._dbapi_connection.cursor(server_side=True)
-
-
-class MSDialectAsync_aioodbc(aiodbcConnector, MSDialect_pyodbc):
- driver = "aioodbc"
-
- supports_statement_cache = True
-
- execution_ctx_cls = MSExecutionContext_aioodbc
-
-
-dialect = MSDialectAsync_aioodbc
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/base.py
deleted file mode 100644
index 872f858..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/base.py
+++ /dev/null
@@ -1,4007 +0,0 @@
-# dialects/mssql/base.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-"""
-.. dialect:: mssql
- :name: Microsoft SQL Server
- :full_support: 2017
- :normal_support: 2012+
- :best_effort: 2005+
-
-.. _mssql_external_dialects:
-
-External Dialects
------------------
-
-In addition to the above DBAPI layers with native SQLAlchemy support, there
-are third-party dialects for other DBAPI layers that are compatible
-with SQL Server. See the "External Dialects" list on the
-:ref:`dialect_toplevel` page.
-
-.. _mssql_identity:
-
-Auto Increment Behavior / IDENTITY Columns
-------------------------------------------
-
-SQL Server provides so-called "auto incrementing" behavior using the
-``IDENTITY`` construct, which can be placed on any single integer column in a
-table. SQLAlchemy considers ``IDENTITY`` within its default "autoincrement"
-behavior for an integer primary key column, described at
-:paramref:`_schema.Column.autoincrement`. This means that by default,
-the first integer primary key column in a :class:`_schema.Table` will be
-considered to be the identity column - unless it is associated with a
-:class:`.Sequence` - and will generate DDL as such::
-
- from sqlalchemy import Table, MetaData, Column, Integer
-
- m = MetaData()
- t = Table('t', m,
- Column('id', Integer, primary_key=True),
- Column('x', Integer))
- m.create_all(engine)
-
-The above example will generate DDL as:
-
-.. sourcecode:: sql
-
- CREATE TABLE t (
- id INTEGER NOT NULL IDENTITY,
- x INTEGER NULL,
- PRIMARY KEY (id)
- )
-
-For the case where this default generation of ``IDENTITY`` is not desired,
-specify ``False`` for the :paramref:`_schema.Column.autoincrement` flag,
-on the first integer primary key column::
-
- m = MetaData()
- t = Table('t', m,
- Column('id', Integer, primary_key=True, autoincrement=False),
- Column('x', Integer))
- m.create_all(engine)
-
-To add the ``IDENTITY`` keyword to a non-primary key column, specify
-``True`` for the :paramref:`_schema.Column.autoincrement` flag on the desired
-:class:`_schema.Column` object, and ensure that
-:paramref:`_schema.Column.autoincrement`
-is set to ``False`` on any integer primary key column::
-
- m = MetaData()
- t = Table('t', m,
- Column('id', Integer, primary_key=True, autoincrement=False),
- Column('x', Integer, autoincrement=True))
- m.create_all(engine)
-
-.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
- in a :class:`_schema.Column` to specify the start and increment
- parameters of an IDENTITY. These replace
- the use of the :class:`.Sequence` object in order to specify these values.
-
-.. deprecated:: 1.4
-
- The ``mssql_identity_start`` and ``mssql_identity_increment`` parameters
- to :class:`_schema.Column` are deprecated and should we replaced by
- an :class:`_schema.Identity` object. Specifying both ways of configuring
- an IDENTITY will result in a compile error.
- These options are also no longer returned as part of the
- ``dialect_options`` key in :meth:`_reflection.Inspector.get_columns`.
- Use the information in the ``identity`` key instead.
-
-.. deprecated:: 1.3
-
- The use of :class:`.Sequence` to specify IDENTITY characteristics is
- deprecated and will be removed in a future release. Please use
- the :class:`_schema.Identity` object parameters
- :paramref:`_schema.Identity.start` and
- :paramref:`_schema.Identity.increment`.
-
-.. versionchanged:: 1.4 Removed the ability to use a :class:`.Sequence`
- object to modify IDENTITY characteristics. :class:`.Sequence` objects
- now only manipulate true T-SQL SEQUENCE types.
-
-.. note::
-
- There can only be one IDENTITY column on the table. When using
- ``autoincrement=True`` to enable the IDENTITY keyword, SQLAlchemy does not
- guard against multiple columns specifying the option simultaneously. The
- SQL Server database will instead reject the ``CREATE TABLE`` statement.
-
-.. note::
-
- An INSERT statement which attempts to provide a value for a column that is
- marked with IDENTITY will be rejected by SQL Server. In order for the
- value to be accepted, a session-level option "SET IDENTITY_INSERT" must be
- enabled. The SQLAlchemy SQL Server dialect will perform this operation
- automatically when using a core :class:`_expression.Insert`
- construct; if the
- execution specifies a value for the IDENTITY column, the "IDENTITY_INSERT"
- option will be enabled for the span of that statement's invocation.However,
- this scenario is not high performing and should not be relied upon for
- normal use. If a table doesn't actually require IDENTITY behavior in its
- integer primary key column, the keyword should be disabled when creating
- the table by ensuring that ``autoincrement=False`` is set.
-
-Controlling "Start" and "Increment"
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-Specific control over the "start" and "increment" values for
-the ``IDENTITY`` generator are provided using the
-:paramref:`_schema.Identity.start` and :paramref:`_schema.Identity.increment`
-parameters passed to the :class:`_schema.Identity` object::
-
- from sqlalchemy import Table, Integer, Column, Identity
-
- test = Table(
- 'test', metadata,
- Column(
- 'id',
- Integer,
- primary_key=True,
- Identity(start=100, increment=10)
- ),
- Column('name', String(20))
- )
-
-The CREATE TABLE for the above :class:`_schema.Table` object would be:
-
-.. sourcecode:: sql
-
- CREATE TABLE test (
- id INTEGER NOT NULL IDENTITY(100,10) PRIMARY KEY,
- name VARCHAR(20) NULL,
- )
-
-.. note::
-
- The :class:`_schema.Identity` object supports many other parameter in
- addition to ``start`` and ``increment``. These are not supported by
- SQL Server and will be ignored when generating the CREATE TABLE ddl.
-
-.. versionchanged:: 1.3.19 The :class:`_schema.Identity` object is
- now used to affect the
- ``IDENTITY`` generator for a :class:`_schema.Column` under SQL Server.
- Previously, the :class:`.Sequence` object was used. As SQL Server now
- supports real sequences as a separate construct, :class:`.Sequence` will be
- functional in the normal way starting from SQLAlchemy version 1.4.
-
-
-Using IDENTITY with Non-Integer numeric types
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-SQL Server also allows ``IDENTITY`` to be used with ``NUMERIC`` columns. To
-implement this pattern smoothly in SQLAlchemy, the primary datatype of the
-column should remain as ``Integer``, however the underlying implementation
-type deployed to the SQL Server database can be specified as ``Numeric`` using
-:meth:`.TypeEngine.with_variant`::
-
- from sqlalchemy import Column
- from sqlalchemy import Integer
- from sqlalchemy import Numeric
- from sqlalchemy import String
- from sqlalchemy.ext.declarative import declarative_base
-
- Base = declarative_base()
-
- class TestTable(Base):
- __tablename__ = "test"
- id = Column(
- Integer().with_variant(Numeric(10, 0), "mssql"),
- primary_key=True,
- autoincrement=True,
- )
- name = Column(String)
-
-In the above example, ``Integer().with_variant()`` provides clear usage
-information that accurately describes the intent of the code. The general
-restriction that ``autoincrement`` only applies to ``Integer`` is established
-at the metadata level and not at the per-dialect level.
-
-When using the above pattern, the primary key identifier that comes back from
-the insertion of a row, which is also the value that would be assigned to an
-ORM object such as ``TestTable`` above, will be an instance of ``Decimal()``
-and not ``int`` when using SQL Server. The numeric return type of the
-:class:`_types.Numeric` type can be changed to return floats by passing False
-to :paramref:`_types.Numeric.asdecimal`. To normalize the return type of the
-above ``Numeric(10, 0)`` to return Python ints (which also support "long"
-integer values in Python 3), use :class:`_types.TypeDecorator` as follows::
-
- from sqlalchemy import TypeDecorator
-
- class NumericAsInteger(TypeDecorator):
- '''normalize floating point return values into ints'''
-
- impl = Numeric(10, 0, asdecimal=False)
- cache_ok = True
-
- def process_result_value(self, value, dialect):
- if value is not None:
- value = int(value)
- return value
-
- class TestTable(Base):
- __tablename__ = "test"
- id = Column(
- Integer().with_variant(NumericAsInteger, "mssql"),
- primary_key=True,
- autoincrement=True,
- )
- name = Column(String)
-
-.. _mssql_insert_behavior:
-
-INSERT behavior
-^^^^^^^^^^^^^^^^
-
-Handling of the ``IDENTITY`` column at INSERT time involves two key
-techniques. The most common is being able to fetch the "last inserted value"
-for a given ``IDENTITY`` column, a process which SQLAlchemy performs
-implicitly in many cases, most importantly within the ORM.
-
-The process for fetching this value has several variants:
-
-* In the vast majority of cases, RETURNING is used in conjunction with INSERT
- statements on SQL Server in order to get newly generated primary key values:
-
- .. sourcecode:: sql
-
- INSERT INTO t (x) OUTPUT inserted.id VALUES (?)
-
- As of SQLAlchemy 2.0, the :ref:`engine_insertmanyvalues` feature is also
- used by default to optimize many-row INSERT statements; for SQL Server
- the feature takes place for both RETURNING and-non RETURNING
- INSERT statements.
-
- .. versionchanged:: 2.0.10 The :ref:`engine_insertmanyvalues` feature for
- SQL Server was temporarily disabled for SQLAlchemy version 2.0.9 due to
- issues with row ordering. As of 2.0.10 the feature is re-enabled, with
- special case handling for the unit of work's requirement for RETURNING to
- be ordered.
-
-* When RETURNING is not available or has been disabled via
- ``implicit_returning=False``, either the ``scope_identity()`` function or
- the ``@@identity`` variable is used; behavior varies by backend:
-
- * when using PyODBC, the phrase ``; select scope_identity()`` will be
- appended to the end of the INSERT statement; a second result set will be
- fetched in order to receive the value. Given a table as::
-
- t = Table(
- 't',
- metadata,
- Column('id', Integer, primary_key=True),
- Column('x', Integer),
- implicit_returning=False
- )
-
- an INSERT will look like:
-
- .. sourcecode:: sql
-
- INSERT INTO t (x) VALUES (?); select scope_identity()
-
- * Other dialects such as pymssql will call upon
- ``SELECT scope_identity() AS lastrowid`` subsequent to an INSERT
- statement. If the flag ``use_scope_identity=False`` is passed to
- :func:`_sa.create_engine`,
- the statement ``SELECT @@identity AS lastrowid``
- is used instead.
-
-A table that contains an ``IDENTITY`` column will prohibit an INSERT statement
-that refers to the identity column explicitly. The SQLAlchemy dialect will
-detect when an INSERT construct, created using a core
-:func:`_expression.insert`
-construct (not a plain string SQL), refers to the identity column, and
-in this case will emit ``SET IDENTITY_INSERT ON`` prior to the insert
-statement proceeding, and ``SET IDENTITY_INSERT OFF`` subsequent to the
-execution. Given this example::
-
- m = MetaData()
- t = Table('t', m, Column('id', Integer, primary_key=True),
- Column('x', Integer))
- m.create_all(engine)
-
- with engine.begin() as conn:
- conn.execute(t.insert(), {'id': 1, 'x':1}, {'id':2, 'x':2})
-
-The above column will be created with IDENTITY, however the INSERT statement
-we emit is specifying explicit values. In the echo output we can see
-how SQLAlchemy handles this:
-
-.. sourcecode:: sql
-
- CREATE TABLE t (
- id INTEGER NOT NULL IDENTITY(1,1),
- x INTEGER NULL,
- PRIMARY KEY (id)
- )
-
- COMMIT
- SET IDENTITY_INSERT t ON
- INSERT INTO t (id, x) VALUES (?, ?)
- ((1, 1), (2, 2))
- SET IDENTITY_INSERT t OFF
- COMMIT
-
-
-
-This is an auxiliary use case suitable for testing and bulk insert scenarios.
-
-SEQUENCE support
-----------------
-
-The :class:`.Sequence` object creates "real" sequences, i.e.,
-``CREATE SEQUENCE``:
-
-.. sourcecode:: pycon+sql
-
- >>> from sqlalchemy import Sequence
- >>> from sqlalchemy.schema import CreateSequence
- >>> from sqlalchemy.dialects import mssql
- >>> print(CreateSequence(Sequence("my_seq", start=1)).compile(dialect=mssql.dialect()))
- {printsql}CREATE SEQUENCE my_seq START WITH 1
-
-For integer primary key generation, SQL Server's ``IDENTITY`` construct should
-generally be preferred vs. sequence.
-
-.. tip::
-
- The default start value for T-SQL is ``-2**63`` instead of 1 as
- in most other SQL databases. Users should explicitly set the
- :paramref:`.Sequence.start` to 1 if that's the expected default::
-
- seq = Sequence("my_sequence", start=1)
-
-.. versionadded:: 1.4 added SQL Server support for :class:`.Sequence`
-
-.. versionchanged:: 2.0 The SQL Server dialect will no longer implicitly
- render "START WITH 1" for ``CREATE SEQUENCE``, which was the behavior
- first implemented in version 1.4.
-
-MAX on VARCHAR / NVARCHAR
--------------------------
-
-SQL Server supports the special string "MAX" within the
-:class:`_types.VARCHAR` and :class:`_types.NVARCHAR` datatypes,
-to indicate "maximum length possible". The dialect currently handles this as
-a length of "None" in the base type, rather than supplying a
-dialect-specific version of these types, so that a base type
-specified such as ``VARCHAR(None)`` can assume "unlengthed" behavior on
-more than one backend without using dialect-specific types.
-
-To build a SQL Server VARCHAR or NVARCHAR with MAX length, use None::
-
- my_table = Table(
- 'my_table', metadata,
- Column('my_data', VARCHAR(None)),
- Column('my_n_data', NVARCHAR(None))
- )
-
-
-Collation Support
------------------
-
-Character collations are supported by the base string types,
-specified by the string argument "collation"::
-
- from sqlalchemy import VARCHAR
- Column('login', VARCHAR(32, collation='Latin1_General_CI_AS'))
-
-When such a column is associated with a :class:`_schema.Table`, the
-CREATE TABLE statement for this column will yield::
-
- login VARCHAR(32) COLLATE Latin1_General_CI_AS NULL
-
-LIMIT/OFFSET Support
---------------------
-
-MSSQL has added support for LIMIT / OFFSET as of SQL Server 2012, via the
-"OFFSET n ROWS" and "FETCH NEXT n ROWS" clauses. SQLAlchemy supports these
-syntaxes automatically if SQL Server 2012 or greater is detected.
-
-.. versionchanged:: 1.4 support added for SQL Server "OFFSET n ROWS" and
- "FETCH NEXT n ROWS" syntax.
-
-For statements that specify only LIMIT and no OFFSET, all versions of SQL
-Server support the TOP keyword. This syntax is used for all SQL Server
-versions when no OFFSET clause is present. A statement such as::
-
- select(some_table).limit(5)
-
-will render similarly to::
-
- SELECT TOP 5 col1, col2.. FROM table
-
-For versions of SQL Server prior to SQL Server 2012, a statement that uses
-LIMIT and OFFSET, or just OFFSET alone, will be rendered using the
-``ROW_NUMBER()`` window function. A statement such as::
-
- select(some_table).order_by(some_table.c.col3).limit(5).offset(10)
-
-will render similarly to::
-
- SELECT anon_1.col1, anon_1.col2 FROM (SELECT col1, col2,
- ROW_NUMBER() OVER (ORDER BY col3) AS
- mssql_rn FROM table WHERE t.x = :x_1) AS
- anon_1 WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1
-
-Note that when using LIMIT and/or OFFSET, whether using the older
-or newer SQL Server syntaxes, the statement must have an ORDER BY as well,
-else a :class:`.CompileError` is raised.
-
-.. _mssql_comment_support:
-
-DDL Comment Support
---------------------
-
-Comment support, which includes DDL rendering for attributes such as
-:paramref:`_schema.Table.comment` and :paramref:`_schema.Column.comment`, as
-well as the ability to reflect these comments, is supported assuming a
-supported version of SQL Server is in use. If a non-supported version such as
-Azure Synapse is detected at first-connect time (based on the presence
-of the ``fn_listextendedproperty`` SQL function), comment support including
-rendering and table-comment reflection is disabled, as both features rely upon
-SQL Server stored procedures and functions that are not available on all
-backend types.
-
-To force comment support to be on or off, bypassing autodetection, set the
-parameter ``supports_comments`` within :func:`_sa.create_engine`::
-
- e = create_engine("mssql+pyodbc://u:p@dsn", supports_comments=False)
-
-.. versionadded:: 2.0 Added support for table and column comments for
- the SQL Server dialect, including DDL generation and reflection.
-
-.. _mssql_isolation_level:
-
-Transaction Isolation Level
----------------------------
-
-All SQL Server dialects support setting of transaction isolation level
-both via a dialect-specific parameter
-:paramref:`_sa.create_engine.isolation_level`
-accepted by :func:`_sa.create_engine`,
-as well as the :paramref:`.Connection.execution_options.isolation_level`
-argument as passed to
-:meth:`_engine.Connection.execution_options`.
-This feature works by issuing the
-command ``SET TRANSACTION ISOLATION LEVEL <level>`` for
-each new connection.
-
-To set isolation level using :func:`_sa.create_engine`::
-
- engine = create_engine(
- "mssql+pyodbc://scott:tiger@ms_2008",
- isolation_level="REPEATABLE READ"
- )
-
-To set using per-connection execution options::
-
- connection = engine.connect()
- connection = connection.execution_options(
- isolation_level="READ COMMITTED"
- )
-
-Valid values for ``isolation_level`` include:
-
-* ``AUTOCOMMIT`` - pyodbc / pymssql-specific
-* ``READ COMMITTED``
-* ``READ UNCOMMITTED``
-* ``REPEATABLE READ``
-* ``SERIALIZABLE``
-* ``SNAPSHOT`` - specific to SQL Server
-
-There are also more options for isolation level configurations, such as
-"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply
-different isolation level settings. See the discussion at
-:ref:`dbapi_autocommit` for background.
-
-.. seealso::
-
- :ref:`dbapi_autocommit`
-
-.. _mssql_reset_on_return:
-
-Temporary Table / Resource Reset for Connection Pooling
--------------------------------------------------------
-
-The :class:`.QueuePool` connection pool implementation used
-by the SQLAlchemy :class:`.Engine` object includes
-:ref:`reset on return <pool_reset_on_return>` behavior that will invoke
-the DBAPI ``.rollback()`` method when connections are returned to the pool.
-While this rollback will clear out the immediate state used by the previous
-transaction, it does not cover a wider range of session-level state, including
-temporary tables as well as other server state such as prepared statement
-handles and statement caches. An undocumented SQL Server procedure known
-as ``sp_reset_connection`` is known to be a workaround for this issue which
-will reset most of the session state that builds up on a connection, including
-temporary tables.
-
-To install ``sp_reset_connection`` as the means of performing reset-on-return,
-the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated in the
-example below. The :paramref:`_sa.create_engine.pool_reset_on_return` parameter
-is set to ``None`` so that the custom scheme can replace the default behavior
-completely. The custom hook implementation calls ``.rollback()`` in any case,
-as it's usually important that the DBAPI's own tracking of commit/rollback
-will remain consistent with the state of the transaction::
-
- from sqlalchemy import create_engine
- from sqlalchemy import event
-
- mssql_engine = create_engine(
- "mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server",
-
- # disable default reset-on-return scheme
- pool_reset_on_return=None,
- )
-
-
- @event.listens_for(mssql_engine, "reset")
- def _reset_mssql(dbapi_connection, connection_record, reset_state):
- if not reset_state.terminate_only:
- dbapi_connection.execute("{call sys.sp_reset_connection}")
-
- # so that the DBAPI itself knows that the connection has been
- # reset
- dbapi_connection.rollback()
-
-.. versionchanged:: 2.0.0b3 Added additional state arguments to
- the :meth:`.PoolEvents.reset` event and additionally ensured the event
- is invoked for all "reset" occurrences, so that it's appropriate
- as a place for custom "reset" handlers. Previous schemes which
- use the :meth:`.PoolEvents.checkin` handler remain usable as well.
-
-.. seealso::
-
- :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation
-
-Nullability
------------
-MSSQL has support for three levels of column nullability. The default
-nullability allows nulls and is explicit in the CREATE TABLE
-construct::
-
- name VARCHAR(20) NULL
-
-If ``nullable=None`` is specified then no specification is made. In
-other words the database's configured default is used. This will
-render::
-
- name VARCHAR(20)
-
-If ``nullable`` is ``True`` or ``False`` then the column will be
-``NULL`` or ``NOT NULL`` respectively.
-
-Date / Time Handling
---------------------
-DATE and TIME are supported. Bind parameters are converted
-to datetime.datetime() objects as required by most MSSQL drivers,
-and results are processed from strings if needed.
-The DATE and TIME types are not available for MSSQL 2005 and
-previous - if a server version below 2008 is detected, DDL
-for these types will be issued as DATETIME.
-
-.. _mssql_large_type_deprecation:
-
-Large Text/Binary Type Deprecation
-----------------------------------
-
-Per
-`SQL Server 2012/2014 Documentation <https://technet.microsoft.com/en-us/library/ms187993.aspx>`_,
-the ``NTEXT``, ``TEXT`` and ``IMAGE`` datatypes are to be removed from SQL
-Server in a future release. SQLAlchemy normally relates these types to the
-:class:`.UnicodeText`, :class:`_expression.TextClause` and
-:class:`.LargeBinary` datatypes.
-
-In order to accommodate this change, a new flag ``deprecate_large_types``
-is added to the dialect, which will be automatically set based on detection
-of the server version in use, if not otherwise set by the user. The
-behavior of this flag is as follows:
-
-* When this flag is ``True``, the :class:`.UnicodeText`,
- :class:`_expression.TextClause` and
- :class:`.LargeBinary` datatypes, when used to render DDL, will render the
- types ``NVARCHAR(max)``, ``VARCHAR(max)``, and ``VARBINARY(max)``,
- respectively. This is a new behavior as of the addition of this flag.
-
-* When this flag is ``False``, the :class:`.UnicodeText`,
- :class:`_expression.TextClause` and
- :class:`.LargeBinary` datatypes, when used to render DDL, will render the
- types ``NTEXT``, ``TEXT``, and ``IMAGE``,
- respectively. This is the long-standing behavior of these types.
-
-* The flag begins with the value ``None``, before a database connection is
- established. If the dialect is used to render DDL without the flag being
- set, it is interpreted the same as ``False``.
-
-* On first connection, the dialect detects if SQL Server version 2012 or
- greater is in use; if the flag is still at ``None``, it sets it to ``True``
- or ``False`` based on whether 2012 or greater is detected.
-
-* The flag can be set to either ``True`` or ``False`` when the dialect
- is created, typically via :func:`_sa.create_engine`::
-
- eng = create_engine("mssql+pymssql://user:pass@host/db",
- deprecate_large_types=True)
-
-* Complete control over whether the "old" or "new" types are rendered is
- available in all SQLAlchemy versions by using the UPPERCASE type objects
- instead: :class:`_types.NVARCHAR`, :class:`_types.VARCHAR`,
- :class:`_types.VARBINARY`, :class:`_types.TEXT`, :class:`_mssql.NTEXT`,
- :class:`_mssql.IMAGE`
- will always remain fixed and always output exactly that
- type.
-
-.. _multipart_schema_names:
-
-Multipart Schema Names
-----------------------
-
-SQL Server schemas sometimes require multiple parts to their "schema"
-qualifier, that is, including the database name and owner name as separate
-tokens, such as ``mydatabase.dbo.some_table``. These multipart names can be set
-at once using the :paramref:`_schema.Table.schema` argument of
-:class:`_schema.Table`::
-
- Table(
- "some_table", metadata,
- Column("q", String(50)),
- schema="mydatabase.dbo"
- )
-
-When performing operations such as table or component reflection, a schema
-argument that contains a dot will be split into separate
-"database" and "owner" components in order to correctly query the SQL
-Server information schema tables, as these two values are stored separately.
-Additionally, when rendering the schema name for DDL or SQL, the two
-components will be quoted separately for case sensitive names and other
-special characters. Given an argument as below::
-
- Table(
- "some_table", metadata,
- Column("q", String(50)),
- schema="MyDataBase.dbo"
- )
-
-The above schema would be rendered as ``[MyDataBase].dbo``, and also in
-reflection, would be reflected using "dbo" as the owner and "MyDataBase"
-as the database name.
-
-To control how the schema name is broken into database / owner,
-specify brackets (which in SQL Server are quoting characters) in the name.
-Below, the "owner" will be considered as ``MyDataBase.dbo`` and the
-"database" will be None::
-
- Table(
- "some_table", metadata,
- Column("q", String(50)),
- schema="[MyDataBase.dbo]"
- )
-
-To individually specify both database and owner name with special characters
-or embedded dots, use two sets of brackets::
-
- Table(
- "some_table", metadata,
- Column("q", String(50)),
- schema="[MyDataBase.Period].[MyOwner.Dot]"
- )
-
-
-.. versionchanged:: 1.2 the SQL Server dialect now treats brackets as
- identifier delimiters splitting the schema into separate database
- and owner tokens, to allow dots within either name itself.
-
-.. _legacy_schema_rendering:
-
-Legacy Schema Mode
-------------------
-
-Very old versions of the MSSQL dialect introduced the behavior such that a
-schema-qualified table would be auto-aliased when used in a
-SELECT statement; given a table::
-
- account_table = Table(
- 'account', metadata,
- Column('id', Integer, primary_key=True),
- Column('info', String(100)),
- schema="customer_schema"
- )
-
-this legacy mode of rendering would assume that "customer_schema.account"
-would not be accepted by all parts of the SQL statement, as illustrated
-below:
-
-.. sourcecode:: pycon+sql
-
- >>> eng = create_engine("mssql+pymssql://mydsn", legacy_schema_aliasing=True)
- >>> print(account_table.select().compile(eng))
- {printsql}SELECT account_1.id, account_1.info
- FROM customer_schema.account AS account_1
-
-This mode of behavior is now off by default, as it appears to have served
-no purpose; however in the case that legacy applications rely upon it,
-it is available using the ``legacy_schema_aliasing`` argument to
-:func:`_sa.create_engine` as illustrated above.
-
-.. deprecated:: 1.4
-
- The ``legacy_schema_aliasing`` flag is now
- deprecated and will be removed in a future release.
-
-.. _mssql_indexes:
-
-Clustered Index Support
------------------------
-
-The MSSQL dialect supports clustered indexes (and primary keys) via the
-``mssql_clustered`` option. This option is available to :class:`.Index`,
-:class:`.UniqueConstraint`. and :class:`.PrimaryKeyConstraint`.
-For indexes this option can be combined with the ``mssql_columnstore`` one
-to create a clustered columnstore index.
-
-To generate a clustered index::
-
- Index("my_index", table.c.x, mssql_clustered=True)
-
-which renders the index as ``CREATE CLUSTERED INDEX my_index ON table (x)``.
-
-To generate a clustered primary key use::
-
- Table('my_table', metadata,
- Column('x', ...),
- Column('y', ...),
- PrimaryKeyConstraint("x", "y", mssql_clustered=True))
-
-which will render the table, for example, as::
-
- CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL,
- PRIMARY KEY CLUSTERED (x, y))
-
-Similarly, we can generate a clustered unique constraint using::
-
- Table('my_table', metadata,
- Column('x', ...),
- Column('y', ...),
- PrimaryKeyConstraint("x"),
- UniqueConstraint("y", mssql_clustered=True),
- )
-
-To explicitly request a non-clustered primary key (for example, when
-a separate clustered index is desired), use::
-
- Table('my_table', metadata,
- Column('x', ...),
- Column('y', ...),
- PrimaryKeyConstraint("x", "y", mssql_clustered=False))
-
-which will render the table, for example, as::
-
- CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL,
- PRIMARY KEY NONCLUSTERED (x, y))
-
-Columnstore Index Support
--------------------------
-
-The MSSQL dialect supports columnstore indexes via the ``mssql_columnstore``
-option. This option is available to :class:`.Index`. It be combined with
-the ``mssql_clustered`` option to create a clustered columnstore index.
-
-To generate a columnstore index::
-
- Index("my_index", table.c.x, mssql_columnstore=True)
-
-which renders the index as ``CREATE COLUMNSTORE INDEX my_index ON table (x)``.
-
-To generate a clustered columnstore index provide no columns::
-
- idx = Index("my_index", mssql_clustered=True, mssql_columnstore=True)
- # required to associate the index with the table
- table.append_constraint(idx)
-
-the above renders the index as
-``CREATE CLUSTERED COLUMNSTORE INDEX my_index ON table``.
-
-.. versionadded:: 2.0.18
-
-MSSQL-Specific Index Options
------------------------------
-
-In addition to clustering, the MSSQL dialect supports other special options
-for :class:`.Index`.
-
-INCLUDE
-^^^^^^^
-
-The ``mssql_include`` option renders INCLUDE(colname) for the given string
-names::
-
- Index("my_index", table.c.x, mssql_include=['y'])
-
-would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``
-
-.. _mssql_index_where:
-
-Filtered Indexes
-^^^^^^^^^^^^^^^^
-
-The ``mssql_where`` option renders WHERE(condition) for the given string
-names::
-
- Index("my_index", table.c.x, mssql_where=table.c.x > 10)
-
-would render the index as ``CREATE INDEX my_index ON table (x) WHERE x > 10``.
-
-.. versionadded:: 1.3.4
-
-Index ordering
-^^^^^^^^^^^^^^
-
-Index ordering is available via functional expressions, such as::
-
- Index("my_index", table.c.x.desc())
-
-would render the index as ``CREATE INDEX my_index ON table (x DESC)``
-
-.. seealso::
-
- :ref:`schema_indexes_functional`
-
-Compatibility Levels
---------------------
-MSSQL supports the notion of setting compatibility levels at the
-database level. This allows, for instance, to run a database that
-is compatible with SQL2000 while running on a SQL2005 database
-server. ``server_version_info`` will always return the database
-server version information (in this case SQL2005) and not the
-compatibility level information. Because of this, if running under
-a backwards compatibility mode SQLAlchemy may attempt to use T-SQL
-statements that are unable to be parsed by the database server.
-
-.. _mssql_triggers:
-
-Triggers
---------
-
-SQLAlchemy by default uses OUTPUT INSERTED to get at newly
-generated primary key values via IDENTITY columns or other
-server side defaults. MS-SQL does not
-allow the usage of OUTPUT INSERTED on tables that have triggers.
-To disable the usage of OUTPUT INSERTED on a per-table basis,
-specify ``implicit_returning=False`` for each :class:`_schema.Table`
-which has triggers::
-
- Table('mytable', metadata,
- Column('id', Integer, primary_key=True),
- # ...,
- implicit_returning=False
- )
-
-Declarative form::
-
- class MyClass(Base):
- # ...
- __table_args__ = {'implicit_returning':False}
-
-
-.. _mssql_rowcount_versioning:
-
-Rowcount Support / ORM Versioning
----------------------------------
-
-The SQL Server drivers may have limited ability to return the number
-of rows updated from an UPDATE or DELETE statement.
-
-As of this writing, the PyODBC driver is not able to return a rowcount when
-OUTPUT INSERTED is used. Previous versions of SQLAlchemy therefore had
-limitations for features such as the "ORM Versioning" feature that relies upon
-accurate rowcounts in order to match version numbers with matched rows.
-
-SQLAlchemy 2.0 now retrieves the "rowcount" manually for these particular use
-cases based on counting the rows that arrived back within RETURNING; so while
-the driver still has this limitation, the ORM Versioning feature is no longer
-impacted by it. As of SQLAlchemy 2.0.5, ORM versioning has been fully
-re-enabled for the pyodbc driver.
-
-.. versionchanged:: 2.0.5 ORM versioning support is restored for the pyodbc
- driver. Previously, a warning would be emitted during ORM flush that
- versioning was not supported.
-
-
-Enabling Snapshot Isolation
----------------------------
-
-SQL Server has a default transaction
-isolation mode that locks entire tables, and causes even mildly concurrent
-applications to have long held locks and frequent deadlocks.
-Enabling snapshot isolation for the database as a whole is recommended
-for modern levels of concurrency support. This is accomplished via the
-following ALTER DATABASE commands executed at the SQL prompt::
-
- ALTER DATABASE MyDatabase SET ALLOW_SNAPSHOT_ISOLATION ON
-
- ALTER DATABASE MyDatabase SET READ_COMMITTED_SNAPSHOT ON
-
-Background on SQL Server snapshot isolation is available at
-https://msdn.microsoft.com/en-us/library/ms175095.aspx.
-
-""" # noqa
-
-from __future__ import annotations
-
-import codecs
-import datetime
-import operator
-import re
-from typing import overload
-from typing import TYPE_CHECKING
-from uuid import UUID as _python_UUID
-
-from . import information_schema as ischema
-from .json import JSON
-from .json import JSONIndexType
-from .json import JSONPathType
-from ... import exc
-from ... import Identity
-from ... import schema as sa_schema
-from ... import Sequence
-from ... import sql
-from ... import text
-from ... import util
-from ...engine import cursor as _cursor
-from ...engine import default
-from ...engine import reflection
-from ...engine.reflection import ReflectionDefaults
-from ...sql import coercions
-from ...sql import compiler
-from ...sql import elements
-from ...sql import expression
-from ...sql import func
-from ...sql import quoted_name
-from ...sql import roles
-from ...sql import sqltypes
-from ...sql import try_cast as try_cast # noqa: F401
-from ...sql import util as sql_util
-from ...sql._typing import is_sql_compiler
-from ...sql.compiler import InsertmanyvaluesSentinelOpts
-from ...sql.elements import TryCast as TryCast # noqa: F401
-from ...types import BIGINT
-from ...types import BINARY
-from ...types import CHAR
-from ...types import DATE
-from ...types import DATETIME
-from ...types import DECIMAL
-from ...types import FLOAT
-from ...types import INTEGER
-from ...types import NCHAR
-from ...types import NUMERIC
-from ...types import NVARCHAR
-from ...types import SMALLINT
-from ...types import TEXT
-from ...types import VARCHAR
-from ...util import update_wrapper
-from ...util.typing import Literal
-
-if TYPE_CHECKING:
- from ...sql.dml import DMLState
- from ...sql.selectable import TableClause
-
-# https://sqlserverbuilds.blogspot.com/
-MS_2017_VERSION = (14,)
-MS_2016_VERSION = (13,)
-MS_2014_VERSION = (12,)
-MS_2012_VERSION = (11,)
-MS_2008_VERSION = (10,)
-MS_2005_VERSION = (9,)
-MS_2000_VERSION = (8,)
-
-RESERVED_WORDS = {
- "add",
- "all",
- "alter",
- "and",
- "any",
- "as",
- "asc",
- "authorization",
- "backup",
- "begin",
- "between",
- "break",
- "browse",
- "bulk",
- "by",
- "cascade",
- "case",
- "check",
- "checkpoint",
- "close",
- "clustered",
- "coalesce",
- "collate",
- "column",
- "commit",
- "compute",
- "constraint",
- "contains",
- "containstable",
- "continue",
- "convert",
- "create",
- "cross",
- "current",
- "current_date",
- "current_time",
- "current_timestamp",
- "current_user",
- "cursor",
- "database",
- "dbcc",
- "deallocate",
- "declare",
- "default",
- "delete",
- "deny",
- "desc",
- "disk",
- "distinct",
- "distributed",
- "double",
- "drop",
- "dump",
- "else",
- "end",
- "errlvl",
- "escape",
- "except",
- "exec",
- "execute",
- "exists",
- "exit",
- "external",
- "fetch",
- "file",
- "fillfactor",
- "for",
- "foreign",
- "freetext",
- "freetexttable",
- "from",
- "full",
- "function",
- "goto",
- "grant",
- "group",
- "having",
- "holdlock",
- "identity",
- "identity_insert",
- "identitycol",
- "if",
- "in",
- "index",
- "inner",
- "insert",
- "intersect",
- "into",
- "is",
- "join",
- "key",
- "kill",
- "left",
- "like",
- "lineno",
- "load",
- "merge",
- "national",
- "nocheck",
- "nonclustered",
- "not",
- "null",
- "nullif",
- "of",
- "off",
- "offsets",
- "on",
- "open",
- "opendatasource",
- "openquery",
- "openrowset",
- "openxml",
- "option",
- "or",
- "order",
- "outer",
- "over",
- "percent",
- "pivot",
- "plan",
- "precision",
- "primary",
- "print",
- "proc",
- "procedure",
- "public",
- "raiserror",
- "read",
- "readtext",
- "reconfigure",
- "references",
- "replication",
- "restore",
- "restrict",
- "return",
- "revert",
- "revoke",
- "right",
- "rollback",
- "rowcount",
- "rowguidcol",
- "rule",
- "save",
- "schema",
- "securityaudit",
- "select",
- "session_user",
- "set",
- "setuser",
- "shutdown",
- "some",
- "statistics",
- "system_user",
- "table",
- "tablesample",
- "textsize",
- "then",
- "to",
- "top",
- "tran",
- "transaction",
- "trigger",
- "truncate",
- "tsequal",
- "union",
- "unique",
- "unpivot",
- "update",
- "updatetext",
- "use",
- "user",
- "values",
- "varying",
- "view",
- "waitfor",
- "when",
- "where",
- "while",
- "with",
- "writetext",
-}
-
-
-class REAL(sqltypes.REAL):
- """the SQL Server REAL datatype."""
-
- def __init__(self, **kw):
- # REAL is a synonym for FLOAT(24) on SQL server.
- # it is only accepted as the word "REAL" in DDL, the numeric
- # precision value is not allowed to be present
- kw.setdefault("precision", 24)
- super().__init__(**kw)
-
-
-class DOUBLE_PRECISION(sqltypes.DOUBLE_PRECISION):
- """the SQL Server DOUBLE PRECISION datatype.
-
- .. versionadded:: 2.0.11
-
- """
-
- def __init__(self, **kw):
- # DOUBLE PRECISION is a synonym for FLOAT(53) on SQL server.
- # it is only accepted as the word "DOUBLE PRECISION" in DDL,
- # the numeric precision value is not allowed to be present
- kw.setdefault("precision", 53)
- super().__init__(**kw)
-
-
-class TINYINT(sqltypes.Integer):
- __visit_name__ = "TINYINT"
-
-
-# MSSQL DATE/TIME types have varied behavior, sometimes returning
-# strings. MSDate/TIME check for everything, and always
-# filter bind parameters into datetime objects (required by pyodbc,
-# not sure about other dialects).
-
-
-class _MSDate(sqltypes.Date):
- def bind_processor(self, dialect):
- def process(value):
- if type(value) == datetime.date:
- return datetime.datetime(value.year, value.month, value.day)
- else:
- return value
-
- return process
-
- _reg = re.compile(r"(\d+)-(\d+)-(\d+)")
-
- def result_processor(self, dialect, coltype):
- def process(value):
- if isinstance(value, datetime.datetime):
- return value.date()
- elif isinstance(value, str):
- m = self._reg.match(value)
- if not m:
- raise ValueError(
- "could not parse %r as a date value" % (value,)
- )
- return datetime.date(*[int(x or 0) for x in m.groups()])
- else:
- return value
-
- return process
-
-
-class TIME(sqltypes.TIME):
- def __init__(self, precision=None, **kwargs):
- self.precision = precision
- super().__init__()
-
- __zero_date = datetime.date(1900, 1, 1)
-
- def bind_processor(self, dialect):
- def process(value):
- if isinstance(value, datetime.datetime):
- value = datetime.datetime.combine(
- self.__zero_date, value.time()
- )
- elif isinstance(value, datetime.time):
- """issue #5339
- per: https://github.com/mkleehammer/pyodbc/wiki/Tips-and-Tricks-by-Database-Platform#time-columns
- pass TIME value as string
- """ # noqa
- value = str(value)
- return value
-
- return process
-
- _reg = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d{0,6}))?")
-
- def result_processor(self, dialect, coltype):
- def process(value):
- if isinstance(value, datetime.datetime):
- return value.time()
- elif isinstance(value, str):
- m = self._reg.match(value)
- if not m:
- raise ValueError(
- "could not parse %r as a time value" % (value,)
- )
- return datetime.time(*[int(x or 0) for x in m.groups()])
- else:
- return value
-
- return process
-
-
-_MSTime = TIME
-
-
-class _BASETIMEIMPL(TIME):
- __visit_name__ = "_BASETIMEIMPL"
-
-
-class _DateTimeBase:
- def bind_processor(self, dialect):
- def process(value):
- if type(value) == datetime.date:
- return datetime.datetime(value.year, value.month, value.day)
- else:
- return value
-
- return process
-
-
-class _MSDateTime(_DateTimeBase, sqltypes.DateTime):
- pass
-
-
-class SMALLDATETIME(_DateTimeBase, sqltypes.DateTime):
- __visit_name__ = "SMALLDATETIME"
-
-
-class DATETIME2(_DateTimeBase, sqltypes.DateTime):
- __visit_name__ = "DATETIME2"
-
- def __init__(self, precision=None, **kw):
- super().__init__(**kw)
- self.precision = precision
-
-
-class DATETIMEOFFSET(_DateTimeBase, sqltypes.DateTime):
- __visit_name__ = "DATETIMEOFFSET"
-
- def __init__(self, precision=None, **kw):
- super().__init__(**kw)
- self.precision = precision
-
-
-class _UnicodeLiteral:
- def literal_processor(self, dialect):
- def process(value):
- value = value.replace("'", "''")
-
- if dialect.identifier_preparer._double_percents:
- value = value.replace("%", "%%")
-
- return "N'%s'" % value
-
- return process
-
-
-class _MSUnicode(_UnicodeLiteral, sqltypes.Unicode):
- pass
-
-
-class _MSUnicodeText(_UnicodeLiteral, sqltypes.UnicodeText):
- pass
-
-
-class TIMESTAMP(sqltypes._Binary):
- """Implement the SQL Server TIMESTAMP type.
-
- Note this is **completely different** than the SQL Standard
- TIMESTAMP type, which is not supported by SQL Server. It
- is a read-only datatype that does not support INSERT of values.
-
- .. versionadded:: 1.2
-
- .. seealso::
-
- :class:`_mssql.ROWVERSION`
-
- """
-
- __visit_name__ = "TIMESTAMP"
-
- # expected by _Binary to be present
- length = None
-
- def __init__(self, convert_int=False):
- """Construct a TIMESTAMP or ROWVERSION type.
-
- :param convert_int: if True, binary integer values will
- be converted to integers on read.
-
- .. versionadded:: 1.2
-
- """
- self.convert_int = convert_int
-
- def result_processor(self, dialect, coltype):
- super_ = super().result_processor(dialect, coltype)
- if self.convert_int:
-
- def process(value):
- if super_:
- value = super_(value)
- if value is not None:
- # https://stackoverflow.com/a/30403242/34549
- value = int(codecs.encode(value, "hex"), 16)
- return value
-
- return process
- else:
- return super_
-
-
-class ROWVERSION(TIMESTAMP):
- """Implement the SQL Server ROWVERSION type.
-
- The ROWVERSION datatype is a SQL Server synonym for the TIMESTAMP
- datatype, however current SQL Server documentation suggests using
- ROWVERSION for new datatypes going forward.
-
- The ROWVERSION datatype does **not** reflect (e.g. introspect) from the
- database as itself; the returned datatype will be
- :class:`_mssql.TIMESTAMP`.
-
- This is a read-only datatype that does not support INSERT of values.
-
- .. versionadded:: 1.2
-
- .. seealso::
-
- :class:`_mssql.TIMESTAMP`
-
- """
-
- __visit_name__ = "ROWVERSION"
-
-
-class NTEXT(sqltypes.UnicodeText):
- """MSSQL NTEXT type, for variable-length unicode text up to 2^30
- characters."""
-
- __visit_name__ = "NTEXT"
-
-
-class VARBINARY(sqltypes.VARBINARY, sqltypes.LargeBinary):
- """The MSSQL VARBINARY type.
-
- This type adds additional features to the core :class:`_types.VARBINARY`
- type, including "deprecate_large_types" mode where
- either ``VARBINARY(max)`` or IMAGE is rendered, as well as the SQL
- Server ``FILESTREAM`` option.
-
- .. seealso::
-
- :ref:`mssql_large_type_deprecation`
-
- """
-
- __visit_name__ = "VARBINARY"
-
- def __init__(self, length=None, filestream=False):
- """
- Construct a VARBINARY type.
-
- :param length: optional, a length for the column for use in
- DDL statements, for those binary types that accept a length,
- such as the MySQL BLOB type.
-
- :param filestream=False: if True, renders the ``FILESTREAM`` keyword
- in the table definition. In this case ``length`` must be ``None``
- or ``'max'``.
-
- .. versionadded:: 1.4.31
-
- """
-
- self.filestream = filestream
- if self.filestream and length not in (None, "max"):
- raise ValueError(
- "length must be None or 'max' when setting filestream"
- )
- super().__init__(length=length)
-
-
-class IMAGE(sqltypes.LargeBinary):
- __visit_name__ = "IMAGE"
-
-
-class XML(sqltypes.Text):
- """MSSQL XML type.
-
- This is a placeholder type for reflection purposes that does not include
- any Python-side datatype support. It also does not currently support
- additional arguments, such as "CONTENT", "DOCUMENT",
- "xml_schema_collection".
-
- """
-
- __visit_name__ = "XML"
-
-
-class BIT(sqltypes.Boolean):
- """MSSQL BIT type.
-
- Both pyodbc and pymssql return values from BIT columns as
- Python <class 'bool'> so just subclass Boolean.
-
- """
-
- __visit_name__ = "BIT"
-
-
-class MONEY(sqltypes.TypeEngine):
- __visit_name__ = "MONEY"
-
-
-class SMALLMONEY(sqltypes.TypeEngine):
- __visit_name__ = "SMALLMONEY"
-
-
-class MSUUid(sqltypes.Uuid):
- def bind_processor(self, dialect):
- if self.native_uuid:
- # this is currently assuming pyodbc; might not work for
- # some other mssql driver
- return None
- else:
- if self.as_uuid:
-
- def process(value):
- if value is not None:
- value = value.hex
- return value
-
- return process
- else:
-
- def process(value):
- if value is not None:
- value = value.replace("-", "").replace("''", "'")
- return value
-
- return process
-
- def literal_processor(self, dialect):
- if self.native_uuid:
-
- def process(value):
- return f"""'{str(value).replace("''", "'")}'"""
-
- return process
- else:
- if self.as_uuid:
-
- def process(value):
- return f"""'{value.hex}'"""
-
- return process
- else:
-
- def process(value):
- return f"""'{
- value.replace("-", "").replace("'", "''")
- }'"""
-
- return process
-
-
-class UNIQUEIDENTIFIER(sqltypes.Uuid[sqltypes._UUID_RETURN]):
- __visit_name__ = "UNIQUEIDENTIFIER"
-
- @overload
- def __init__(
- self: UNIQUEIDENTIFIER[_python_UUID], as_uuid: Literal[True] = ...
- ): ...
-
- @overload
- def __init__(
- self: UNIQUEIDENTIFIER[str], as_uuid: Literal[False] = ...
- ): ...
-
- def __init__(self, as_uuid: bool = True):
- """Construct a :class:`_mssql.UNIQUEIDENTIFIER` type.
-
-
- :param as_uuid=True: if True, values will be interpreted
- as Python uuid objects, converting to/from string via the
- DBAPI.
-
- .. versionchanged: 2.0 Added direct "uuid" support to the
- :class:`_mssql.UNIQUEIDENTIFIER` datatype; uuid interpretation
- defaults to ``True``.
-
- """
- self.as_uuid = as_uuid
- self.native_uuid = True
-
-
-class SQL_VARIANT(sqltypes.TypeEngine):
- __visit_name__ = "SQL_VARIANT"
-
-
-# old names.
-MSDateTime = _MSDateTime
-MSDate = _MSDate
-MSReal = REAL
-MSTinyInteger = TINYINT
-MSTime = TIME
-MSSmallDateTime = SMALLDATETIME
-MSDateTime2 = DATETIME2
-MSDateTimeOffset = DATETIMEOFFSET
-MSText = TEXT
-MSNText = NTEXT
-MSString = VARCHAR
-MSNVarchar = NVARCHAR
-MSChar = CHAR
-MSNChar = NCHAR
-MSBinary = BINARY
-MSVarBinary = VARBINARY
-MSImage = IMAGE
-MSBit = BIT
-MSMoney = MONEY
-MSSmallMoney = SMALLMONEY
-MSUniqueIdentifier = UNIQUEIDENTIFIER
-MSVariant = SQL_VARIANT
-
-ischema_names = {
- "int": INTEGER,
- "bigint": BIGINT,
- "smallint": SMALLINT,
- "tinyint": TINYINT,
- "varchar": VARCHAR,
- "nvarchar": NVARCHAR,
- "char": CHAR,
- "nchar": NCHAR,
- "text": TEXT,
- "ntext": NTEXT,
- "decimal": DECIMAL,
- "numeric": NUMERIC,
- "float": FLOAT,
- "datetime": DATETIME,
- "datetime2": DATETIME2,
- "datetimeoffset": DATETIMEOFFSET,
- "date": DATE,
- "time": TIME,
- "smalldatetime": SMALLDATETIME,
- "binary": BINARY,
- "varbinary": VARBINARY,
- "bit": BIT,
- "real": REAL,
- "double precision": DOUBLE_PRECISION,
- "image": IMAGE,
- "xml": XML,
- "timestamp": TIMESTAMP,
- "money": MONEY,
- "smallmoney": SMALLMONEY,
- "uniqueidentifier": UNIQUEIDENTIFIER,
- "sql_variant": SQL_VARIANT,
-}
-
-
-class MSTypeCompiler(compiler.GenericTypeCompiler):
- def _extend(self, spec, type_, length=None):
- """Extend a string-type declaration with standard SQL
- COLLATE annotations.
-
- """
-
- if getattr(type_, "collation", None):
- collation = "COLLATE %s" % type_.collation
- else:
- collation = None
-
- if not length:
- length = type_.length
-
- if length:
- spec = spec + "(%s)" % length
-
- return " ".join([c for c in (spec, collation) if c is not None])
-
- def visit_double(self, type_, **kw):
- return self.visit_DOUBLE_PRECISION(type_, **kw)
-
- def visit_FLOAT(self, type_, **kw):
- precision = getattr(type_, "precision", None)
- if precision is None:
- return "FLOAT"
- else:
- return "FLOAT(%(precision)s)" % {"precision": precision}
-
- def visit_TINYINT(self, type_, **kw):
- return "TINYINT"
-
- def visit_TIME(self, type_, **kw):
- precision = getattr(type_, "precision", None)
- if precision is not None:
- return "TIME(%s)" % precision
- else:
- return "TIME"
-
- def visit_TIMESTAMP(self, type_, **kw):
- return "TIMESTAMP"
-
- def visit_ROWVERSION(self, type_, **kw):
- return "ROWVERSION"
-
- def visit_datetime(self, type_, **kw):
- if type_.timezone:
- return self.visit_DATETIMEOFFSET(type_, **kw)
- else:
- return self.visit_DATETIME(type_, **kw)
-
- def visit_DATETIMEOFFSET(self, type_, **kw):
- precision = getattr(type_, "precision", None)
- if precision is not None:
- return "DATETIMEOFFSET(%s)" % type_.precision
- else:
- return "DATETIMEOFFSET"
-
- def visit_DATETIME2(self, type_, **kw):
- precision = getattr(type_, "precision", None)
- if precision is not None:
- return "DATETIME2(%s)" % precision
- else:
- return "DATETIME2"
-
- def visit_SMALLDATETIME(self, type_, **kw):
- return "SMALLDATETIME"
-
- def visit_unicode(self, type_, **kw):
- return self.visit_NVARCHAR(type_, **kw)
-
- def visit_text(self, type_, **kw):
- if self.dialect.deprecate_large_types:
- return self.visit_VARCHAR(type_, **kw)
- else:
- return self.visit_TEXT(type_, **kw)
-
- def visit_unicode_text(self, type_, **kw):
- if self.dialect.deprecate_large_types:
- return self.visit_NVARCHAR(type_, **kw)
- else:
- return self.visit_NTEXT(type_, **kw)
-
- def visit_NTEXT(self, type_, **kw):
- return self._extend("NTEXT", type_)
-
- def visit_TEXT(self, type_, **kw):
- return self._extend("TEXT", type_)
-
- def visit_VARCHAR(self, type_, **kw):
- return self._extend("VARCHAR", type_, length=type_.length or "max")
-
- def visit_CHAR(self, type_, **kw):
- return self._extend("CHAR", type_)
-
- def visit_NCHAR(self, type_, **kw):
- return self._extend("NCHAR", type_)
-
- def visit_NVARCHAR(self, type_, **kw):
- return self._extend("NVARCHAR", type_, length=type_.length or "max")
-
- def visit_date(self, type_, **kw):
- if self.dialect.server_version_info < MS_2008_VERSION:
- return self.visit_DATETIME(type_, **kw)
- else:
- return self.visit_DATE(type_, **kw)
-
- def visit__BASETIMEIMPL(self, type_, **kw):
- return self.visit_time(type_, **kw)
-
- def visit_time(self, type_, **kw):
- if self.dialect.server_version_info < MS_2008_VERSION:
- return self.visit_DATETIME(type_, **kw)
- else:
- return self.visit_TIME(type_, **kw)
-
- def visit_large_binary(self, type_, **kw):
- if self.dialect.deprecate_large_types:
- return self.visit_VARBINARY(type_, **kw)
- else:
- return self.visit_IMAGE(type_, **kw)
-
- def visit_IMAGE(self, type_, **kw):
- return "IMAGE"
-
- def visit_XML(self, type_, **kw):
- return "XML"
-
- def visit_VARBINARY(self, type_, **kw):
- text = self._extend("VARBINARY", type_, length=type_.length or "max")
- if getattr(type_, "filestream", False):
- text += " FILESTREAM"
- return text
-
- def visit_boolean(self, type_, **kw):
- return self.visit_BIT(type_)
-
- def visit_BIT(self, type_, **kw):
- return "BIT"
-
- def visit_JSON(self, type_, **kw):
- # this is a bit of a break with SQLAlchemy's convention of
- # "UPPERCASE name goes to UPPERCASE type name with no modification"
- return self._extend("NVARCHAR", type_, length="max")
-
- def visit_MONEY(self, type_, **kw):
- return "MONEY"
-
- def visit_SMALLMONEY(self, type_, **kw):
- return "SMALLMONEY"
-
- def visit_uuid(self, type_, **kw):
- if type_.native_uuid:
- return self.visit_UNIQUEIDENTIFIER(type_, **kw)
- else:
- return super().visit_uuid(type_, **kw)
-
- def visit_UNIQUEIDENTIFIER(self, type_, **kw):
- return "UNIQUEIDENTIFIER"
-
- def visit_SQL_VARIANT(self, type_, **kw):
- return "SQL_VARIANT"
-
-
-class MSExecutionContext(default.DefaultExecutionContext):
- _enable_identity_insert = False
- _select_lastrowid = False
- _lastrowid = None
-
- dialect: MSDialect
-
- def _opt_encode(self, statement):
- if self.compiled and self.compiled.schema_translate_map:
- rst = self.compiled.preparer._render_schema_translates
- statement = rst(statement, self.compiled.schema_translate_map)
-
- return statement
-
- def pre_exec(self):
- """Activate IDENTITY_INSERT if needed."""
-
- if self.isinsert:
- if TYPE_CHECKING:
- assert is_sql_compiler(self.compiled)
- assert isinstance(self.compiled.compile_state, DMLState)
- assert isinstance(
- self.compiled.compile_state.dml_table, TableClause
- )
-
- tbl = self.compiled.compile_state.dml_table
- id_column = tbl._autoincrement_column
-
- if id_column is not None and (
- not isinstance(id_column.default, Sequence)
- ):
- insert_has_identity = True
- compile_state = self.compiled.dml_compile_state
- self._enable_identity_insert = (
- id_column.key in self.compiled_parameters[0]
- ) or (
- compile_state._dict_parameters
- and (id_column.key in compile_state._insert_col_keys)
- )
-
- else:
- insert_has_identity = False
- self._enable_identity_insert = False
-
- self._select_lastrowid = (
- not self.compiled.inline
- and insert_has_identity
- and not self.compiled.effective_returning
- and not self._enable_identity_insert
- and not self.executemany
- )
-
- if self._enable_identity_insert:
- self.root_connection._cursor_execute(
- self.cursor,
- self._opt_encode(
- "SET IDENTITY_INSERT %s ON"
- % self.identifier_preparer.format_table(tbl)
- ),
- (),
- self,
- )
-
- def post_exec(self):
- """Disable IDENTITY_INSERT if enabled."""
-
- conn = self.root_connection
-
- if self.isinsert or self.isupdate or self.isdelete:
- self._rowcount = self.cursor.rowcount
-
- if self._select_lastrowid:
- if self.dialect.use_scope_identity:
- conn._cursor_execute(
- self.cursor,
- "SELECT scope_identity() AS lastrowid",
- (),
- self,
- )
- else:
- conn._cursor_execute(
- self.cursor, "SELECT @@identity AS lastrowid", (), self
- )
- # fetchall() ensures the cursor is consumed without closing it
- row = self.cursor.fetchall()[0]
- self._lastrowid = int(row[0])
-
- self.cursor_fetch_strategy = _cursor._NO_CURSOR_DML
- elif (
- self.compiled is not None
- and is_sql_compiler(self.compiled)
- and self.compiled.effective_returning
- ):
- self.cursor_fetch_strategy = (
- _cursor.FullyBufferedCursorFetchStrategy(
- self.cursor,
- self.cursor.description,
- self.cursor.fetchall(),
- )
- )
-
- if self._enable_identity_insert:
- if TYPE_CHECKING:
- assert is_sql_compiler(self.compiled)
- assert isinstance(self.compiled.compile_state, DMLState)
- assert isinstance(
- self.compiled.compile_state.dml_table, TableClause
- )
- conn._cursor_execute(
- self.cursor,
- self._opt_encode(
- "SET IDENTITY_INSERT %s OFF"
- % self.identifier_preparer.format_table(
- self.compiled.compile_state.dml_table
- )
- ),
- (),
- self,
- )
-
- def get_lastrowid(self):
- return self._lastrowid
-
- def handle_dbapi_exception(self, e):
- if self._enable_identity_insert:
- try:
- self.cursor.execute(
- self._opt_encode(
- "SET IDENTITY_INSERT %s OFF"
- % self.identifier_preparer.format_table(
- self.compiled.compile_state.dml_table
- )
- )
- )
- except Exception:
- pass
-
- def fire_sequence(self, seq, type_):
- return self._execute_scalar(
- (
- "SELECT NEXT VALUE FOR %s"
- % self.identifier_preparer.format_sequence(seq)
- ),
- type_,
- )
-
- def get_insert_default(self, column):
- if (
- isinstance(column, sa_schema.Column)
- and column is column.table._autoincrement_column
- and isinstance(column.default, sa_schema.Sequence)
- and column.default.optional
- ):
- return None
- return super().get_insert_default(column)
-
-
-class MSSQLCompiler(compiler.SQLCompiler):
- returning_precedes_values = True
-
- extract_map = util.update_copy(
- compiler.SQLCompiler.extract_map,
- {
- "doy": "dayofyear",
- "dow": "weekday",
- "milliseconds": "millisecond",
- "microseconds": "microsecond",
- },
- )
-
- def __init__(self, *args, **kwargs):
- self.tablealiases = {}
- super().__init__(*args, **kwargs)
-
- def _with_legacy_schema_aliasing(fn):
- def decorate(self, *arg, **kw):
- if self.dialect.legacy_schema_aliasing:
- return fn(self, *arg, **kw)
- else:
- super_ = getattr(super(MSSQLCompiler, self), fn.__name__)
- return super_(*arg, **kw)
-
- return decorate
-
- def visit_now_func(self, fn, **kw):
- return "CURRENT_TIMESTAMP"
-
- def visit_current_date_func(self, fn, **kw):
- return "GETDATE()"
-
- def visit_length_func(self, fn, **kw):
- return "LEN%s" % self.function_argspec(fn, **kw)
-
- def visit_char_length_func(self, fn, **kw):
- return "LEN%s" % self.function_argspec(fn, **kw)
-
- def visit_aggregate_strings_func(self, fn, **kw):
- expr = fn.clauses.clauses[0]._compiler_dispatch(self, **kw)
- kw["literal_execute"] = True
- delimeter = fn.clauses.clauses[1]._compiler_dispatch(self, **kw)
- return f"string_agg({expr}, {delimeter})"
-
- def visit_concat_op_expression_clauselist(
- self, clauselist, operator, **kw
- ):
- return " + ".join(self.process(elem, **kw) for elem in clauselist)
-
- def visit_concat_op_binary(self, binary, operator, **kw):
- return "%s + %s" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- )
-
- def visit_true(self, expr, **kw):
- return "1"
-
- def visit_false(self, expr, **kw):
- return "0"
-
- def visit_match_op_binary(self, binary, operator, **kw):
- return "CONTAINS (%s, %s)" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- )
-
- def get_select_precolumns(self, select, **kw):
- """MS-SQL puts TOP, it's version of LIMIT here"""
-
- s = super().get_select_precolumns(select, **kw)
-
- if select._has_row_limiting_clause and self._use_top(select):
- # ODBC drivers and possibly others
- # don't support bind params in the SELECT clause on SQL Server.
- # so have to use literal here.
- kw["literal_execute"] = True
- s += "TOP %s " % self.process(
- self._get_limit_or_fetch(select), **kw
- )
- if select._fetch_clause is not None:
- if select._fetch_clause_options["percent"]:
- s += "PERCENT "
- if select._fetch_clause_options["with_ties"]:
- s += "WITH TIES "
-
- return s
-
- def get_from_hint_text(self, table, text):
- return text
-
- def get_crud_hint_text(self, table, text):
- return text
-
- def _get_limit_or_fetch(self, select):
- if select._fetch_clause is None:
- return select._limit_clause
- else:
- return select._fetch_clause
-
- def _use_top(self, select):
- return (select._offset_clause is None) and (
- select._simple_int_clause(select._limit_clause)
- or (
- # limit can use TOP with is by itself. fetch only uses TOP
- # when it needs to because of PERCENT and/or WITH TIES
- # TODO: Why? shouldn't we use TOP always ?
- select._simple_int_clause(select._fetch_clause)
- and (
- select._fetch_clause_options["percent"]
- or select._fetch_clause_options["with_ties"]
- )
- )
- )
-
- def limit_clause(self, cs, **kwargs):
- return ""
-
- def _check_can_use_fetch_limit(self, select):
- # to use ROW_NUMBER(), an ORDER BY is required.
- # OFFSET are FETCH are options of the ORDER BY clause
- if not select._order_by_clause.clauses:
- raise exc.CompileError(
- "MSSQL requires an order_by when "
- "using an OFFSET or a non-simple "
- "LIMIT clause"
- )
-
- if select._fetch_clause_options is not None and (
- select._fetch_clause_options["percent"]
- or select._fetch_clause_options["with_ties"]
- ):
- raise exc.CompileError(
- "MSSQL needs TOP to use PERCENT and/or WITH TIES. "
- "Only simple fetch without offset can be used."
- )
-
- def _row_limit_clause(self, select, **kw):
- """MSSQL 2012 supports OFFSET/FETCH operators
- Use it instead subquery with row_number
-
- """
-
- if self.dialect._supports_offset_fetch and not self._use_top(select):
- self._check_can_use_fetch_limit(select)
-
- return self.fetch_clause(
- select,
- fetch_clause=self._get_limit_or_fetch(select),
- require_offset=True,
- **kw,
- )
-
- else:
- return ""
-
- def visit_try_cast(self, element, **kw):
- return "TRY_CAST (%s AS %s)" % (
- self.process(element.clause, **kw),
- self.process(element.typeclause, **kw),
- )
-
- def translate_select_structure(self, select_stmt, **kwargs):
- """Look for ``LIMIT`` and OFFSET in a select statement, and if
- so tries to wrap it in a subquery with ``row_number()`` criterion.
- MSSQL 2012 and above are excluded
-
- """
- select = select_stmt
-
- if (
- select._has_row_limiting_clause
- and not self.dialect._supports_offset_fetch
- and not self._use_top(select)
- and not getattr(select, "_mssql_visit", None)
- ):
- self._check_can_use_fetch_limit(select)
-
- _order_by_clauses = [
- sql_util.unwrap_label_reference(elem)
- for elem in select._order_by_clause.clauses
- ]
-
- limit_clause = self._get_limit_or_fetch(select)
- offset_clause = select._offset_clause
-
- select = select._generate()
- select._mssql_visit = True
- select = (
- select.add_columns(
- sql.func.ROW_NUMBER()
- .over(order_by=_order_by_clauses)
- .label("mssql_rn")
- )
- .order_by(None)
- .alias()
- )
-
- mssql_rn = sql.column("mssql_rn")
- limitselect = sql.select(
- *[c for c in select.c if c.key != "mssql_rn"]
- )
- if offset_clause is not None:
- limitselect = limitselect.where(mssql_rn > offset_clause)
- if limit_clause is not None:
- limitselect = limitselect.where(
- mssql_rn <= (limit_clause + offset_clause)
- )
- else:
- limitselect = limitselect.where(mssql_rn <= (limit_clause))
- return limitselect
- else:
- return select
-
- @_with_legacy_schema_aliasing
- def visit_table(self, table, mssql_aliased=False, iscrud=False, **kwargs):
- if mssql_aliased is table or iscrud:
- return super().visit_table(table, **kwargs)
-
- # alias schema-qualified tables
- alias = self._schema_aliased_table(table)
- if alias is not None:
- return self.process(alias, mssql_aliased=table, **kwargs)
- else:
- return super().visit_table(table, **kwargs)
-
- @_with_legacy_schema_aliasing
- def visit_alias(self, alias, **kw):
- # translate for schema-qualified table aliases
- kw["mssql_aliased"] = alias.element
- return super().visit_alias(alias, **kw)
-
- @_with_legacy_schema_aliasing
- def visit_column(self, column, add_to_result_map=None, **kw):
- if (
- column.table is not None
- and (not self.isupdate and not self.isdelete)
- or self.is_subquery()
- ):
- # translate for schema-qualified table aliases
- t = self._schema_aliased_table(column.table)
- if t is not None:
- converted = elements._corresponding_column_or_error(t, column)
- if add_to_result_map is not None:
- add_to_result_map(
- column.name,
- column.name,
- (column, column.name, column.key),
- column.type,
- )
-
- return super().visit_column(converted, **kw)
-
- return super().visit_column(
- column, add_to_result_map=add_to_result_map, **kw
- )
-
- def _schema_aliased_table(self, table):
- if getattr(table, "schema", None) is not None:
- if table not in self.tablealiases:
- self.tablealiases[table] = table.alias()
- return self.tablealiases[table]
- else:
- return None
-
- def visit_extract(self, extract, **kw):
- field = self.extract_map.get(extract.field, extract.field)
- return "DATEPART(%s, %s)" % (field, self.process(extract.expr, **kw))
-
- def visit_savepoint(self, savepoint_stmt, **kw):
- return "SAVE TRANSACTION %s" % self.preparer.format_savepoint(
- savepoint_stmt
- )
-
- def visit_rollback_to_savepoint(self, savepoint_stmt, **kw):
- return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint(
- savepoint_stmt
- )
-
- def visit_binary(self, binary, **kwargs):
- """Move bind parameters to the right-hand side of an operator, where
- possible.
-
- """
- if (
- isinstance(binary.left, expression.BindParameter)
- and binary.operator == operator.eq
- and not isinstance(binary.right, expression.BindParameter)
- ):
- return self.process(
- expression.BinaryExpression(
- binary.right, binary.left, binary.operator
- ),
- **kwargs,
- )
- return super().visit_binary(binary, **kwargs)
-
- def returning_clause(
- self, stmt, returning_cols, *, populate_result_map, **kw
- ):
- # SQL server returning clause requires that the columns refer to
- # the virtual table names "inserted" or "deleted". Here, we make
- # a simple alias of our table with that name, and then adapt the
- # columns we have from the list of RETURNING columns to that new name
- # so that they render as "inserted.<colname>" / "deleted.<colname>".
-
- if stmt.is_insert or stmt.is_update:
- target = stmt.table.alias("inserted")
- elif stmt.is_delete:
- target = stmt.table.alias("deleted")
- else:
- assert False, "expected Insert, Update or Delete statement"
-
- adapter = sql_util.ClauseAdapter(target)
-
- # adapter.traverse() takes a column from our target table and returns
- # the one that is linked to the "inserted" / "deleted" tables. So in
- # order to retrieve these values back from the result (e.g. like
- # row[column]), tell the compiler to also add the original unadapted
- # column to the result map. Before #4877, these were (unknowingly)
- # falling back using string name matching in the result set which
- # necessarily used an expensive KeyError in order to match.
-
- columns = [
- self._label_returning_column(
- stmt,
- adapter.traverse(column),
- populate_result_map,
- {"result_map_targets": (column,)},
- fallback_label_name=fallback_label_name,
- column_is_repeated=repeated,
- name=name,
- proxy_name=proxy_name,
- **kw,
- )
- for (
- name,
- proxy_name,
- fallback_label_name,
- column,
- repeated,
- ) in stmt._generate_columns_plus_names(
- True, cols=expression._select_iterables(returning_cols)
- )
- ]
-
- return "OUTPUT " + ", ".join(columns)
-
- def get_cte_preamble(self, recursive):
- # SQL Server finds it too inconvenient to accept
- # an entirely optional, SQL standard specified,
- # "RECURSIVE" word with their "WITH",
- # so here we go
- return "WITH"
-
- def label_select_column(self, select, column, asfrom):
- if isinstance(column, expression.Function):
- return column.label(None)
- else:
- return super().label_select_column(select, column, asfrom)
-
- def for_update_clause(self, select, **kw):
- # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which
- # SQLAlchemy doesn't use
- return ""
-
- def order_by_clause(self, select, **kw):
- # MSSQL only allows ORDER BY in subqueries if there is a LIMIT:
- # "The ORDER BY clause is invalid in views, inline functions,
- # derived tables, subqueries, and common table expressions,
- # unless TOP, OFFSET or FOR XML is also specified."
- if (
- self.is_subquery()
- and not self._use_top(select)
- and (
- select._offset is None
- or not self.dialect._supports_offset_fetch
- )
- ):
- # avoid processing the order by clause if we won't end up
- # using it, because we don't want all the bind params tacked
- # onto the positional list if that is what the dbapi requires
- return ""
-
- order_by = self.process(select._order_by_clause, **kw)
-
- if order_by:
- return " ORDER BY " + order_by
- else:
- return ""
-
- def update_from_clause(
- self, update_stmt, from_table, extra_froms, from_hints, **kw
- ):
- """Render the UPDATE..FROM clause specific to MSSQL.
-
- In MSSQL, if the UPDATE statement involves an alias of the table to
- be updated, then the table itself must be added to the FROM list as
- well. Otherwise, it is optional. Here, we add it regardless.
-
- """
- return "FROM " + ", ".join(
- t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw)
- for t in [from_table] + extra_froms
- )
-
- def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw):
- """If we have extra froms make sure we render any alias as hint."""
- ashint = False
- if extra_froms:
- ashint = True
- return from_table._compiler_dispatch(
- self, asfrom=True, iscrud=True, ashint=ashint, **kw
- )
-
- def delete_extra_from_clause(
- self, delete_stmt, from_table, extra_froms, from_hints, **kw
- ):
- """Render the DELETE .. FROM clause specific to MSSQL.
-
- Yes, it has the FROM keyword twice.
-
- """
- return "FROM " + ", ".join(
- t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw)
- for t in [from_table] + extra_froms
- )
-
- def visit_empty_set_expr(self, type_, **kw):
- return "SELECT 1 WHERE 1!=1"
-
- def visit_is_distinct_from_binary(self, binary, operator, **kw):
- return "NOT EXISTS (SELECT %s INTERSECT SELECT %s)" % (
- self.process(binary.left),
- self.process(binary.right),
- )
-
- def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
- return "EXISTS (SELECT %s INTERSECT SELECT %s)" % (
- self.process(binary.left),
- self.process(binary.right),
- )
-
- def _render_json_extract_from_binary(self, binary, operator, **kw):
- # note we are intentionally calling upon the process() calls in the
- # order in which they appear in the SQL String as this is used
- # by positional parameter rendering
-
- if binary.type._type_affinity is sqltypes.JSON:
- return "JSON_QUERY(%s, %s)" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- )
-
- # as with other dialects, start with an explicit test for NULL
- case_expression = "CASE JSON_VALUE(%s, %s) WHEN NULL THEN NULL" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- )
-
- if binary.type._type_affinity is sqltypes.Integer:
- type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS INTEGER)" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- )
- elif binary.type._type_affinity is sqltypes.Numeric:
- type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS %s)" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- (
- "FLOAT"
- if isinstance(binary.type, sqltypes.Float)
- else "NUMERIC(%s, %s)"
- % (binary.type.precision, binary.type.scale)
- ),
- )
- elif binary.type._type_affinity is sqltypes.Boolean:
- # the NULL handling is particularly weird with boolean, so
- # explicitly return numeric (BIT) constants
- type_expression = (
- "WHEN 'true' THEN 1 WHEN 'false' THEN 0 ELSE NULL"
- )
- elif binary.type._type_affinity is sqltypes.String:
- # TODO: does this comment (from mysql) apply to here, too?
- # this fails with a JSON value that's a four byte unicode
- # string. SQLite has the same problem at the moment
- type_expression = "ELSE JSON_VALUE(%s, %s)" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- )
- else:
- # other affinity....this is not expected right now
- type_expression = "ELSE JSON_QUERY(%s, %s)" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- )
-
- return case_expression + " " + type_expression + " END"
-
- def visit_json_getitem_op_binary(self, binary, operator, **kw):
- return self._render_json_extract_from_binary(binary, operator, **kw)
-
- def visit_json_path_getitem_op_binary(self, binary, operator, **kw):
- return self._render_json_extract_from_binary(binary, operator, **kw)
-
- def visit_sequence(self, seq, **kw):
- return "NEXT VALUE FOR %s" % self.preparer.format_sequence(seq)
-
-
-class MSSQLStrictCompiler(MSSQLCompiler):
- """A subclass of MSSQLCompiler which disables the usage of bind
- parameters where not allowed natively by MS-SQL.
-
- A dialect may use this compiler on a platform where native
- binds are used.
-
- """
-
- ansi_bind_rules = True
-
- def visit_in_op_binary(self, binary, operator, **kw):
- kw["literal_execute"] = True
- return "%s IN %s" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- )
-
- def visit_not_in_op_binary(self, binary, operator, **kw):
- kw["literal_execute"] = True
- return "%s NOT IN %s" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- )
-
- def render_literal_value(self, value, type_):
- """
- For date and datetime values, convert to a string
- format acceptable to MSSQL. That seems to be the
- so-called ODBC canonical date format which looks
- like this:
-
- yyyy-mm-dd hh:mi:ss.mmm(24h)
-
- For other data types, call the base class implementation.
- """
- # datetime and date are both subclasses of datetime.date
- if issubclass(type(value), datetime.date):
- # SQL Server wants single quotes around the date string.
- return "'" + str(value) + "'"
- else:
- return super().render_literal_value(value, type_)
-
-
-class MSDDLCompiler(compiler.DDLCompiler):
- def get_column_specification(self, column, **kwargs):
- colspec = self.preparer.format_column(column)
-
- # type is not accepted in a computed column
- if column.computed is not None:
- colspec += " " + self.process(column.computed)
- else:
- colspec += " " + self.dialect.type_compiler_instance.process(
- column.type, type_expression=column
- )
-
- if column.nullable is not None:
- if (
- not column.nullable
- or column.primary_key
- or isinstance(column.default, sa_schema.Sequence)
- or column.autoincrement is True
- or column.identity
- ):
- colspec += " NOT NULL"
- elif column.computed is None:
- # don't specify "NULL" for computed columns
- colspec += " NULL"
-
- if column.table is None:
- raise exc.CompileError(
- "mssql requires Table-bound columns "
- "in order to generate DDL"
- )
-
- d_opt = column.dialect_options["mssql"]
- start = d_opt["identity_start"]
- increment = d_opt["identity_increment"]
- if start is not None or increment is not None:
- if column.identity:
- raise exc.CompileError(
- "Cannot specify options 'mssql_identity_start' and/or "
- "'mssql_identity_increment' while also using the "
- "'Identity' construct."
- )
- util.warn_deprecated(
- "The dialect options 'mssql_identity_start' and "
- "'mssql_identity_increment' are deprecated. "
- "Use the 'Identity' object instead.",
- "1.4",
- )
-
- if column.identity:
- colspec += self.process(column.identity, **kwargs)
- elif (
- column is column.table._autoincrement_column
- or column.autoincrement is True
- ) and (
- not isinstance(column.default, Sequence) or column.default.optional
- ):
- colspec += self.process(Identity(start=start, increment=increment))
- else:
- default = self.get_column_default_string(column)
- if default is not None:
- colspec += " DEFAULT " + default
-
- return colspec
-
- def visit_create_index(self, create, include_schema=False, **kw):
- index = create.element
- self._verify_index_table(index)
- preparer = self.preparer
- text = "CREATE "
- if index.unique:
- text += "UNIQUE "
-
- # handle clustering option
- clustered = index.dialect_options["mssql"]["clustered"]
- if clustered is not None:
- if clustered:
- text += "CLUSTERED "
- else:
- text += "NONCLUSTERED "
-
- # handle columnstore option (has no negative value)
- columnstore = index.dialect_options["mssql"]["columnstore"]
- if columnstore:
- text += "COLUMNSTORE "
-
- text += "INDEX %s ON %s" % (
- self._prepared_index_name(index, include_schema=include_schema),
- preparer.format_table(index.table),
- )
-
- # in some case mssql allows indexes with no columns defined
- if len(index.expressions) > 0:
- text += " (%s)" % ", ".join(
- self.sql_compiler.process(
- expr, include_table=False, literal_binds=True
- )
- for expr in index.expressions
- )
-
- # handle other included columns
- if index.dialect_options["mssql"]["include"]:
- inclusions = [
- index.table.c[col] if isinstance(col, str) else col
- for col in index.dialect_options["mssql"]["include"]
- ]
-
- text += " INCLUDE (%s)" % ", ".join(
- [preparer.quote(c.name) for c in inclusions]
- )
-
- whereclause = index.dialect_options["mssql"]["where"]
-
- if whereclause is not None:
- whereclause = coercions.expect(
- roles.DDLExpressionRole, whereclause
- )
-
- where_compiled = self.sql_compiler.process(
- whereclause, include_table=False, literal_binds=True
- )
- text += " WHERE " + where_compiled
-
- return text
-
- def visit_drop_index(self, drop, **kw):
- return "\nDROP INDEX %s ON %s" % (
- self._prepared_index_name(drop.element, include_schema=False),
- self.preparer.format_table(drop.element.table),
- )
-
- def visit_primary_key_constraint(self, constraint, **kw):
- if len(constraint) == 0:
- return ""
- text = ""
- if constraint.name is not None:
- text += "CONSTRAINT %s " % self.preparer.format_constraint(
- constraint
- )
- text += "PRIMARY KEY "
-
- clustered = constraint.dialect_options["mssql"]["clustered"]
- if clustered is not None:
- if clustered:
- text += "CLUSTERED "
- else:
- text += "NONCLUSTERED "
-
- text += "(%s)" % ", ".join(
- self.preparer.quote(c.name) for c in constraint
- )
- text += self.define_constraint_deferrability(constraint)
- return text
-
- def visit_unique_constraint(self, constraint, **kw):
- if len(constraint) == 0:
- return ""
- text = ""
- if constraint.name is not None:
- formatted_name = self.preparer.format_constraint(constraint)
- if formatted_name is not None:
- text += "CONSTRAINT %s " % formatted_name
- text += "UNIQUE %s" % self.define_unique_constraint_distinct(
- constraint, **kw
- )
- clustered = constraint.dialect_options["mssql"]["clustered"]
- if clustered is not None:
- if clustered:
- text += "CLUSTERED "
- else:
- text += "NONCLUSTERED "
-
- text += "(%s)" % ", ".join(
- self.preparer.quote(c.name) for c in constraint
- )
- text += self.define_constraint_deferrability(constraint)
- return text
-
- def visit_computed_column(self, generated, **kw):
- text = "AS (%s)" % self.sql_compiler.process(
- generated.sqltext, include_table=False, literal_binds=True
- )
- # explicitly check for True|False since None means server default
- if generated.persisted is True:
- text += " PERSISTED"
- return text
-
- def visit_set_table_comment(self, create, **kw):
- schema = self.preparer.schema_for_object(create.element)
- schema_name = schema if schema else self.dialect.default_schema_name
- return (
- "execute sp_addextendedproperty 'MS_Description', "
- "{}, 'schema', {}, 'table', {}".format(
- self.sql_compiler.render_literal_value(
- create.element.comment, sqltypes.NVARCHAR()
- ),
- self.preparer.quote_schema(schema_name),
- self.preparer.format_table(create.element, use_schema=False),
- )
- )
-
- def visit_drop_table_comment(self, drop, **kw):
- schema = self.preparer.schema_for_object(drop.element)
- schema_name = schema if schema else self.dialect.default_schema_name
- return (
- "execute sp_dropextendedproperty 'MS_Description', 'schema', "
- "{}, 'table', {}".format(
- self.preparer.quote_schema(schema_name),
- self.preparer.format_table(drop.element, use_schema=False),
- )
- )
-
- def visit_set_column_comment(self, create, **kw):
- schema = self.preparer.schema_for_object(create.element.table)
- schema_name = schema if schema else self.dialect.default_schema_name
- return (
- "execute sp_addextendedproperty 'MS_Description', "
- "{}, 'schema', {}, 'table', {}, 'column', {}".format(
- self.sql_compiler.render_literal_value(
- create.element.comment, sqltypes.NVARCHAR()
- ),
- self.preparer.quote_schema(schema_name),
- self.preparer.format_table(
- create.element.table, use_schema=False
- ),
- self.preparer.format_column(create.element),
- )
- )
-
- def visit_drop_column_comment(self, drop, **kw):
- schema = self.preparer.schema_for_object(drop.element.table)
- schema_name = schema if schema else self.dialect.default_schema_name
- return (
- "execute sp_dropextendedproperty 'MS_Description', 'schema', "
- "{}, 'table', {}, 'column', {}".format(
- self.preparer.quote_schema(schema_name),
- self.preparer.format_table(
- drop.element.table, use_schema=False
- ),
- self.preparer.format_column(drop.element),
- )
- )
-
- def visit_create_sequence(self, create, **kw):
- prefix = None
- if create.element.data_type is not None:
- data_type = create.element.data_type
- prefix = " AS %s" % self.type_compiler.process(data_type)
- return super().visit_create_sequence(create, prefix=prefix, **kw)
-
- def visit_identity_column(self, identity, **kw):
- text = " IDENTITY"
- if identity.start is not None or identity.increment is not None:
- start = 1 if identity.start is None else identity.start
- increment = 1 if identity.increment is None else identity.increment
- text += "(%s,%s)" % (start, increment)
- return text
-
-
-class MSIdentifierPreparer(compiler.IdentifierPreparer):
- reserved_words = RESERVED_WORDS
-
- def __init__(self, dialect):
- super().__init__(
- dialect,
- initial_quote="[",
- final_quote="]",
- quote_case_sensitive_collations=False,
- )
-
- def _escape_identifier(self, value):
- return value.replace("]", "]]")
-
- def _unescape_identifier(self, value):
- return value.replace("]]", "]")
-
- def quote_schema(self, schema, force=None):
- """Prepare a quoted table and schema name."""
-
- # need to re-implement the deprecation warning entirely
- if force is not None:
- # not using the util.deprecated_params() decorator in this
- # case because of the additional function call overhead on this
- # very performance-critical spot.
- util.warn_deprecated(
- "The IdentifierPreparer.quote_schema.force parameter is "
- "deprecated and will be removed in a future release. This "
- "flag has no effect on the behavior of the "
- "IdentifierPreparer.quote method; please refer to "
- "quoted_name().",
- version="1.3",
- )
-
- dbname, owner = _schema_elements(schema)
- if dbname:
- result = "%s.%s" % (self.quote(dbname), self.quote(owner))
- elif owner:
- result = self.quote(owner)
- else:
- result = ""
- return result
-
-
-def _db_plus_owner_listing(fn):
- def wrap(dialect, connection, schema=None, **kw):
- dbname, owner = _owner_plus_db(dialect, schema)
- return _switch_db(
- dbname,
- connection,
- fn,
- dialect,
- connection,
- dbname,
- owner,
- schema,
- **kw,
- )
-
- return update_wrapper(wrap, fn)
-
-
-def _db_plus_owner(fn):
- def wrap(dialect, connection, tablename, schema=None, **kw):
- dbname, owner = _owner_plus_db(dialect, schema)
- return _switch_db(
- dbname,
- connection,
- fn,
- dialect,
- connection,
- tablename,
- dbname,
- owner,
- schema,
- **kw,
- )
-
- return update_wrapper(wrap, fn)
-
-
-def _switch_db(dbname, connection, fn, *arg, **kw):
- if dbname:
- current_db = connection.exec_driver_sql("select db_name()").scalar()
- if current_db != dbname:
- connection.exec_driver_sql(
- "use %s" % connection.dialect.identifier_preparer.quote(dbname)
- )
- try:
- return fn(*arg, **kw)
- finally:
- if dbname and current_db != dbname:
- connection.exec_driver_sql(
- "use %s"
- % connection.dialect.identifier_preparer.quote(current_db)
- )
-
-
-def _owner_plus_db(dialect, schema):
- if not schema:
- return None, dialect.default_schema_name
- else:
- return _schema_elements(schema)
-
-
-_memoized_schema = util.LRUCache()
-
-
-def _schema_elements(schema):
- if isinstance(schema, quoted_name) and schema.quote:
- return None, schema
-
- if schema in _memoized_schema:
- return _memoized_schema[schema]
-
- # tests for this function are in:
- # test/dialect/mssql/test_reflection.py ->
- # OwnerPlusDBTest.test_owner_database_pairs
- # test/dialect/mssql/test_compiler.py -> test_force_schema_*
- # test/dialect/mssql/test_compiler.py -> test_schema_many_tokens_*
- #
-
- if schema.startswith("__[SCHEMA_"):
- return None, schema
-
- push = []
- symbol = ""
- bracket = False
- has_brackets = False
- for token in re.split(r"(\[|\]|\.)", schema):
- if not token:
- continue
- if token == "[":
- bracket = True
- has_brackets = True
- elif token == "]":
- bracket = False
- elif not bracket and token == ".":
- if has_brackets:
- push.append("[%s]" % symbol)
- else:
- push.append(symbol)
- symbol = ""
- has_brackets = False
- else:
- symbol += token
- if symbol:
- push.append(symbol)
- if len(push) > 1:
- dbname, owner = ".".join(push[0:-1]), push[-1]
-
- # test for internal brackets
- if re.match(r".*\].*\[.*", dbname[1:-1]):
- dbname = quoted_name(dbname, quote=False)
- else:
- dbname = dbname.lstrip("[").rstrip("]")
-
- elif len(push):
- dbname, owner = None, push[0]
- else:
- dbname, owner = None, None
-
- _memoized_schema[schema] = dbname, owner
- return dbname, owner
-
-
-class MSDialect(default.DefaultDialect):
- # will assume it's at least mssql2005
- name = "mssql"
- supports_statement_cache = True
- supports_default_values = True
- supports_empty_insert = False
- favor_returning_over_lastrowid = True
-
- returns_native_bytes = True
-
- supports_comments = True
- supports_default_metavalue = False
- """dialect supports INSERT... VALUES (DEFAULT) syntax -
- SQL Server **does** support this, but **not** for the IDENTITY column,
- so we can't turn this on.
-
- """
-
- # supports_native_uuid is partial here, so we implement our
- # own impl type
-
- execution_ctx_cls = MSExecutionContext
- use_scope_identity = True
- max_identifier_length = 128
- schema_name = "dbo"
-
- insert_returning = True
- update_returning = True
- delete_returning = True
- update_returning_multifrom = True
- delete_returning_multifrom = True
-
- colspecs = {
- sqltypes.DateTime: _MSDateTime,
- sqltypes.Date: _MSDate,
- sqltypes.JSON: JSON,
- sqltypes.JSON.JSONIndexType: JSONIndexType,
- sqltypes.JSON.JSONPathType: JSONPathType,
- sqltypes.Time: _BASETIMEIMPL,
- sqltypes.Unicode: _MSUnicode,
- sqltypes.UnicodeText: _MSUnicodeText,
- DATETIMEOFFSET: DATETIMEOFFSET,
- DATETIME2: DATETIME2,
- SMALLDATETIME: SMALLDATETIME,
- DATETIME: DATETIME,
- sqltypes.Uuid: MSUUid,
- }
-
- engine_config_types = default.DefaultDialect.engine_config_types.union(
- {"legacy_schema_aliasing": util.asbool}
- )
-
- ischema_names = ischema_names
-
- supports_sequences = True
- sequences_optional = True
- # This is actually used for autoincrement, where itentity is used that
- # starts with 1.
- # for sequences T-SQL's actual default is -9223372036854775808
- default_sequence_base = 1
-
- supports_native_boolean = False
- non_native_boolean_check_constraint = False
- supports_unicode_binds = True
- postfetch_lastrowid = True
-
- # may be changed at server inspection time for older SQL server versions
- supports_multivalues_insert = True
-
- use_insertmanyvalues = True
-
- # note pyodbc will set this to False if fast_executemany is set,
- # as of SQLAlchemy 2.0.9
- use_insertmanyvalues_wo_returning = True
-
- insertmanyvalues_implicit_sentinel = (
- InsertmanyvaluesSentinelOpts.AUTOINCREMENT
- | InsertmanyvaluesSentinelOpts.IDENTITY
- | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
- )
-
- # "The incoming request has too many parameters. The server supports a "
- # "maximum of 2100 parameters."
- # in fact you can have 2099 parameters.
- insertmanyvalues_max_parameters = 2099
-
- _supports_offset_fetch = False
- _supports_nvarchar_max = False
-
- legacy_schema_aliasing = False
-
- server_version_info = ()
-
- statement_compiler = MSSQLCompiler
- ddl_compiler = MSDDLCompiler
- type_compiler_cls = MSTypeCompiler
- preparer = MSIdentifierPreparer
-
- construct_arguments = [
- (sa_schema.PrimaryKeyConstraint, {"clustered": None}),
- (sa_schema.UniqueConstraint, {"clustered": None}),
- (
- sa_schema.Index,
- {
- "clustered": None,
- "include": None,
- "where": None,
- "columnstore": None,
- },
- ),
- (
- sa_schema.Column,
- {"identity_start": None, "identity_increment": None},
- ),
- ]
-
- def __init__(
- self,
- query_timeout=None,
- use_scope_identity=True,
- schema_name="dbo",
- deprecate_large_types=None,
- supports_comments=None,
- json_serializer=None,
- json_deserializer=None,
- legacy_schema_aliasing=None,
- ignore_no_transaction_on_rollback=False,
- **opts,
- ):
- self.query_timeout = int(query_timeout or 0)
- self.schema_name = schema_name
-
- self.use_scope_identity = use_scope_identity
- self.deprecate_large_types = deprecate_large_types
- self.ignore_no_transaction_on_rollback = (
- ignore_no_transaction_on_rollback
- )
- self._user_defined_supports_comments = uds = supports_comments
- if uds is not None:
- self.supports_comments = uds
-
- if legacy_schema_aliasing is not None:
- util.warn_deprecated(
- "The legacy_schema_aliasing parameter is "
- "deprecated and will be removed in a future release.",
- "1.4",
- )
- self.legacy_schema_aliasing = legacy_schema_aliasing
-
- super().__init__(**opts)
-
- self._json_serializer = json_serializer
- self._json_deserializer = json_deserializer
-
- def do_savepoint(self, connection, name):
- # give the DBAPI a push
- connection.exec_driver_sql("IF @@TRANCOUNT = 0 BEGIN TRANSACTION")
- super().do_savepoint(connection, name)
-
- def do_release_savepoint(self, connection, name):
- # SQL Server does not support RELEASE SAVEPOINT
- pass
-
- def do_rollback(self, dbapi_connection):
- try:
- super().do_rollback(dbapi_connection)
- except self.dbapi.ProgrammingError as e:
- if self.ignore_no_transaction_on_rollback and re.match(
- r".*\b111214\b", str(e)
- ):
- util.warn(
- "ProgrammingError 111214 "
- "'No corresponding transaction found.' "
- "has been suppressed via "
- "ignore_no_transaction_on_rollback=True"
- )
- else:
- raise
-
- _isolation_lookup = {
- "SERIALIZABLE",
- "READ UNCOMMITTED",
- "READ COMMITTED",
- "REPEATABLE READ",
- "SNAPSHOT",
- }
-
- def get_isolation_level_values(self, dbapi_connection):
- return list(self._isolation_lookup)
-
- def set_isolation_level(self, dbapi_connection, level):
- cursor = dbapi_connection.cursor()
- cursor.execute(f"SET TRANSACTION ISOLATION LEVEL {level}")
- cursor.close()
- if level == "SNAPSHOT":
- dbapi_connection.commit()
-
- def get_isolation_level(self, dbapi_connection):
- cursor = dbapi_connection.cursor()
- view_name = "sys.system_views"
- try:
- cursor.execute(
- (
- "SELECT name FROM {} WHERE name IN "
- "('dm_exec_sessions', 'dm_pdw_nodes_exec_sessions')"
- ).format(view_name)
- )
- row = cursor.fetchone()
- if not row:
- raise NotImplementedError(
- "Can't fetch isolation level on this particular "
- "SQL Server version."
- )
-
- view_name = f"sys.{row[0]}"
-
- cursor.execute(
- """
- SELECT CASE transaction_isolation_level
- WHEN 0 THEN NULL
- WHEN 1 THEN 'READ UNCOMMITTED'
- WHEN 2 THEN 'READ COMMITTED'
- WHEN 3 THEN 'REPEATABLE READ'
- WHEN 4 THEN 'SERIALIZABLE'
- WHEN 5 THEN 'SNAPSHOT' END
- AS TRANSACTION_ISOLATION_LEVEL
- FROM {}
- where session_id = @@SPID
- """.format(
- view_name
- )
- )
- except self.dbapi.Error as err:
- raise NotImplementedError(
- "Can't fetch isolation level; encountered error {} when "
- 'attempting to query the "{}" view.'.format(err, view_name)
- ) from err
- else:
- row = cursor.fetchone()
- return row[0].upper()
- finally:
- cursor.close()
-
- def initialize(self, connection):
- super().initialize(connection)
- self._setup_version_attributes()
- self._setup_supports_nvarchar_max(connection)
- self._setup_supports_comments(connection)
-
- def _setup_version_attributes(self):
- if self.server_version_info[0] not in list(range(8, 17)):
- util.warn(
- "Unrecognized server version info '%s'. Some SQL Server "
- "features may not function properly."
- % ".".join(str(x) for x in self.server_version_info)
- )
-
- if self.server_version_info >= MS_2008_VERSION:
- self.supports_multivalues_insert = True
- else:
- self.supports_multivalues_insert = False
-
- if self.deprecate_large_types is None:
- self.deprecate_large_types = (
- self.server_version_info >= MS_2012_VERSION
- )
-
- self._supports_offset_fetch = (
- self.server_version_info and self.server_version_info[0] >= 11
- )
-
- def _setup_supports_nvarchar_max(self, connection):
- try:
- connection.scalar(
- sql.text("SELECT CAST('test max support' AS NVARCHAR(max))")
- )
- except exc.DBAPIError:
- self._supports_nvarchar_max = False
- else:
- self._supports_nvarchar_max = True
-
- def _setup_supports_comments(self, connection):
- if self._user_defined_supports_comments is not None:
- return
-
- try:
- connection.scalar(
- sql.text(
- "SELECT 1 FROM fn_listextendedproperty"
- "(default, default, default, default, "
- "default, default, default)"
- )
- )
- except exc.DBAPIError:
- self.supports_comments = False
- else:
- self.supports_comments = True
-
- def _get_default_schema_name(self, connection):
- query = sql.text("SELECT schema_name()")
- default_schema_name = connection.scalar(query)
- if default_schema_name is not None:
- # guard against the case where the default_schema_name is being
- # fed back into a table reflection function.
- return quoted_name(default_schema_name, quote=True)
- else:
- return self.schema_name
-
- @_db_plus_owner
- def has_table(self, connection, tablename, dbname, owner, schema, **kw):
- self._ensure_has_table_connection(connection)
-
- return self._internal_has_table(connection, tablename, owner, **kw)
-
- @reflection.cache
- @_db_plus_owner
- def has_sequence(
- self, connection, sequencename, dbname, owner, schema, **kw
- ):
- sequences = ischema.sequences
-
- s = sql.select(sequences.c.sequence_name).where(
- sequences.c.sequence_name == sequencename
- )
-
- if owner:
- s = s.where(sequences.c.sequence_schema == owner)
-
- c = connection.execute(s)
-
- return c.first() is not None
-
- @reflection.cache
- @_db_plus_owner_listing
- def get_sequence_names(self, connection, dbname, owner, schema, **kw):
- sequences = ischema.sequences
-
- s = sql.select(sequences.c.sequence_name)
- if owner:
- s = s.where(sequences.c.sequence_schema == owner)
-
- c = connection.execute(s)
-
- return [row[0] for row in c]
-
- @reflection.cache
- def get_schema_names(self, connection, **kw):
- s = sql.select(ischema.schemata.c.schema_name).order_by(
- ischema.schemata.c.schema_name
- )
- schema_names = [r[0] for r in connection.execute(s)]
- return schema_names
-
- @reflection.cache
- @_db_plus_owner_listing
- def get_table_names(self, connection, dbname, owner, schema, **kw):
- tables = ischema.tables
- s = (
- sql.select(tables.c.table_name)
- .where(
- sql.and_(
- tables.c.table_schema == owner,
- tables.c.table_type == "BASE TABLE",
- )
- )
- .order_by(tables.c.table_name)
- )
- table_names = [r[0] for r in connection.execute(s)]
- return table_names
-
- @reflection.cache
- @_db_plus_owner_listing
- def get_view_names(self, connection, dbname, owner, schema, **kw):
- tables = ischema.tables
- s = (
- sql.select(tables.c.table_name)
- .where(
- sql.and_(
- tables.c.table_schema == owner,
- tables.c.table_type == "VIEW",
- )
- )
- .order_by(tables.c.table_name)
- )
- view_names = [r[0] for r in connection.execute(s)]
- return view_names
-
- @reflection.cache
- def _internal_has_table(self, connection, tablename, owner, **kw):
- if tablename.startswith("#"): # temporary table
- # mssql does not support temporary views
- # SQL Error [4103] [S0001]: "#v": Temporary views are not allowed
- return bool(
- connection.scalar(
- # U filters on user tables only.
- text("SELECT object_id(:table_name, 'U')"),
- {"table_name": f"tempdb.dbo.[{tablename}]"},
- )
- )
- else:
- tables = ischema.tables
-
- s = sql.select(tables.c.table_name).where(
- sql.and_(
- sql.or_(
- tables.c.table_type == "BASE TABLE",
- tables.c.table_type == "VIEW",
- ),
- tables.c.table_name == tablename,
- )
- )
-
- if owner:
- s = s.where(tables.c.table_schema == owner)
-
- c = connection.execute(s)
-
- return c.first() is not None
-
- def _default_or_error(self, connection, tablename, owner, method, **kw):
- # TODO: try to avoid having to run a separate query here
- if self._internal_has_table(connection, tablename, owner, **kw):
- return method()
- else:
- raise exc.NoSuchTableError(f"{owner}.{tablename}")
-
- @reflection.cache
- @_db_plus_owner
- def get_indexes(self, connection, tablename, dbname, owner, schema, **kw):
- filter_definition = (
- "ind.filter_definition"
- if self.server_version_info >= MS_2008_VERSION
- else "NULL as filter_definition"
- )
- rp = connection.execution_options(future_result=True).execute(
- sql.text(
- f"""
-select
- ind.index_id,
- ind.is_unique,
- ind.name,
- ind.type,
- {filter_definition}
-from
- sys.indexes as ind
-join sys.tables as tab on
- ind.object_id = tab.object_id
-join sys.schemas as sch on
- sch.schema_id = tab.schema_id
-where
- tab.name = :tabname
- and sch.name = :schname
- and ind.is_primary_key = 0
- and ind.type != 0
-order by
- ind.name
- """
- )
- .bindparams(
- sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
- sql.bindparam("schname", owner, ischema.CoerceUnicode()),
- )
- .columns(name=sqltypes.Unicode())
- )
- indexes = {}
- for row in rp.mappings():
- indexes[row["index_id"]] = current = {
- "name": row["name"],
- "unique": row["is_unique"] == 1,
- "column_names": [],
- "include_columns": [],
- "dialect_options": {},
- }
-
- do = current["dialect_options"]
- index_type = row["type"]
- if index_type in {1, 2}:
- do["mssql_clustered"] = index_type == 1
- if index_type in {5, 6}:
- do["mssql_clustered"] = index_type == 5
- do["mssql_columnstore"] = True
- if row["filter_definition"] is not None:
- do["mssql_where"] = row["filter_definition"]
-
- rp = connection.execution_options(future_result=True).execute(
- sql.text(
- """
-select
- ind_col.index_id,
- col.name,
- ind_col.is_included_column
-from
- sys.columns as col
-join sys.tables as tab on
- tab.object_id = col.object_id
-join sys.index_columns as ind_col on
- ind_col.column_id = col.column_id
- and ind_col.object_id = tab.object_id
-join sys.schemas as sch on
- sch.schema_id = tab.schema_id
-where
- tab.name = :tabname
- and sch.name = :schname
- """
- )
- .bindparams(
- sql.bindparam("tabname", tablename, ischema.CoerceUnicode()),
- sql.bindparam("schname", owner, ischema.CoerceUnicode()),
- )
- .columns(name=sqltypes.Unicode())
- )
- for row in rp.mappings():
- if row["index_id"] not in indexes:
- continue
- index_def = indexes[row["index_id"]]
- is_colstore = index_def["dialect_options"].get("mssql_columnstore")
- is_clustered = index_def["dialect_options"].get("mssql_clustered")
- if not (is_colstore and is_clustered):
- # a clustered columnstore index includes all columns but does
- # not want them in the index definition
- if row["is_included_column"] and not is_colstore:
- # a noncludsted columnstore index reports that includes
- # columns but requires that are listed as normal columns
- index_def["include_columns"].append(row["name"])
- else:
- index_def["column_names"].append(row["name"])
- for index_info in indexes.values():
- # NOTE: "root level" include_columns is legacy, now part of
- # dialect_options (issue #7382)
- index_info["dialect_options"]["mssql_include"] = index_info[
- "include_columns"
- ]
-
- if indexes:
- return list(indexes.values())
- else:
- return self._default_or_error(
- connection, tablename, owner, ReflectionDefaults.indexes, **kw
- )
-
- @reflection.cache
- @_db_plus_owner
- def get_view_definition(
- self, connection, viewname, dbname, owner, schema, **kw
- ):
- view_def = connection.execute(
- sql.text(
- "select mod.definition "
- "from sys.sql_modules as mod "
- "join sys.views as views on mod.object_id = views.object_id "
- "join sys.schemas as sch on views.schema_id = sch.schema_id "
- "where views.name=:viewname and sch.name=:schname"
- ).bindparams(
- sql.bindparam("viewname", viewname, ischema.CoerceUnicode()),
- sql.bindparam("schname", owner, ischema.CoerceUnicode()),
- )
- ).scalar()
- if view_def:
- return view_def
- else:
- raise exc.NoSuchTableError(f"{owner}.{viewname}")
-
- @reflection.cache
- def get_table_comment(self, connection, table_name, schema=None, **kw):
- if not self.supports_comments:
- raise NotImplementedError(
- "Can't get table comments on current SQL Server version in use"
- )
-
- schema_name = schema if schema else self.default_schema_name
- COMMENT_SQL = """
- SELECT cast(com.value as nvarchar(max))
- FROM fn_listextendedproperty('MS_Description',
- 'schema', :schema, 'table', :table, NULL, NULL
- ) as com;
- """
-
- comment = connection.execute(
- sql.text(COMMENT_SQL).bindparams(
- sql.bindparam("schema", schema_name, ischema.CoerceUnicode()),
- sql.bindparam("table", table_name, ischema.CoerceUnicode()),
- )
- ).scalar()
- if comment:
- return {"text": comment}
- else:
- return self._default_or_error(
- connection,
- table_name,
- None,
- ReflectionDefaults.table_comment,
- **kw,
- )
-
- def _temp_table_name_like_pattern(self, tablename):
- # LIKE uses '%' to match zero or more characters and '_' to match any
- # single character. We want to match literal underscores, so T-SQL
- # requires that we enclose them in square brackets.
- return tablename + (
- ("[_][_][_]%") if not tablename.startswith("##") else ""
- )
-
- def _get_internal_temp_table_name(self, connection, tablename):
- # it's likely that schema is always "dbo", but since we can
- # get it here, let's get it.
- # see https://stackoverflow.com/questions/8311959/
- # specifying-schema-for-temporary-tables
-
- try:
- return connection.execute(
- sql.text(
- "select table_schema, table_name "
- "from tempdb.information_schema.tables "
- "where table_name like :p1"
- ),
- {"p1": self._temp_table_name_like_pattern(tablename)},
- ).one()
- except exc.MultipleResultsFound as me:
- raise exc.UnreflectableTableError(
- "Found more than one temporary table named '%s' in tempdb "
- "at this time. Cannot reliably resolve that name to its "
- "internal table name." % tablename
- ) from me
- except exc.NoResultFound as ne:
- raise exc.NoSuchTableError(
- "Unable to find a temporary table named '%s' in tempdb."
- % tablename
- ) from ne
-
- @reflection.cache
- @_db_plus_owner
- def get_columns(self, connection, tablename, dbname, owner, schema, **kw):
- is_temp_table = tablename.startswith("#")
- if is_temp_table:
- owner, tablename = self._get_internal_temp_table_name(
- connection, tablename
- )
-
- columns = ischema.mssql_temp_table_columns
- else:
- columns = ischema.columns
-
- computed_cols = ischema.computed_columns
- identity_cols = ischema.identity_columns
- if owner:
- whereclause = sql.and_(
- columns.c.table_name == tablename,
- columns.c.table_schema == owner,
- )
- full_name = columns.c.table_schema + "." + columns.c.table_name
- else:
- whereclause = columns.c.table_name == tablename
- full_name = columns.c.table_name
-
- if self._supports_nvarchar_max:
- computed_definition = computed_cols.c.definition
- else:
- # tds_version 4.2 does not support NVARCHAR(MAX)
- computed_definition = sql.cast(
- computed_cols.c.definition, NVARCHAR(4000)
- )
-
- object_id = func.object_id(full_name)
-
- s = (
- sql.select(
- columns.c.column_name,
- columns.c.data_type,
- columns.c.is_nullable,
- columns.c.character_maximum_length,
- columns.c.numeric_precision,
- columns.c.numeric_scale,
- columns.c.column_default,
- columns.c.collation_name,
- computed_definition,
- computed_cols.c.is_persisted,
- identity_cols.c.is_identity,
- identity_cols.c.seed_value,
- identity_cols.c.increment_value,
- ischema.extended_properties.c.value.label("comment"),
- )
- .select_from(columns)
- .outerjoin(
- computed_cols,
- onclause=sql.and_(
- computed_cols.c.object_id == object_id,
- computed_cols.c.name
- == columns.c.column_name.collate("DATABASE_DEFAULT"),
- ),
- )
- .outerjoin(
- identity_cols,
- onclause=sql.and_(
- identity_cols.c.object_id == object_id,
- identity_cols.c.name
- == columns.c.column_name.collate("DATABASE_DEFAULT"),
- ),
- )
- .outerjoin(
- ischema.extended_properties,
- onclause=sql.and_(
- ischema.extended_properties.c["class"] == 1,
- ischema.extended_properties.c.major_id == object_id,
- ischema.extended_properties.c.minor_id
- == columns.c.ordinal_position,
- ischema.extended_properties.c.name == "MS_Description",
- ),
- )
- .where(whereclause)
- .order_by(columns.c.ordinal_position)
- )
-
- c = connection.execution_options(future_result=True).execute(s)
-
- cols = []
- for row in c.mappings():
- name = row[columns.c.column_name]
- type_ = row[columns.c.data_type]
- nullable = row[columns.c.is_nullable] == "YES"
- charlen = row[columns.c.character_maximum_length]
- numericprec = row[columns.c.numeric_precision]
- numericscale = row[columns.c.numeric_scale]
- default = row[columns.c.column_default]
- collation = row[columns.c.collation_name]
- definition = row[computed_definition]
- is_persisted = row[computed_cols.c.is_persisted]
- is_identity = row[identity_cols.c.is_identity]
- identity_start = row[identity_cols.c.seed_value]
- identity_increment = row[identity_cols.c.increment_value]
- comment = row[ischema.extended_properties.c.value]
-
- coltype = self.ischema_names.get(type_, None)
-
- kwargs = {}
- if coltype in (
- MSString,
- MSChar,
- MSNVarchar,
- MSNChar,
- MSText,
- MSNText,
- MSBinary,
- MSVarBinary,
- sqltypes.LargeBinary,
- ):
- if charlen == -1:
- charlen = None
- kwargs["length"] = charlen
- if collation:
- kwargs["collation"] = collation
-
- if coltype is None:
- util.warn(
- "Did not recognize type '%s' of column '%s'"
- % (type_, name)
- )
- coltype = sqltypes.NULLTYPE
- else:
- if issubclass(coltype, sqltypes.Numeric):
- kwargs["precision"] = numericprec
-
- if not issubclass(coltype, sqltypes.Float):
- kwargs["scale"] = numericscale
-
- coltype = coltype(**kwargs)
- cdict = {
- "name": name,
- "type": coltype,
- "nullable": nullable,
- "default": default,
- "autoincrement": is_identity is not None,
- "comment": comment,
- }
-
- if definition is not None and is_persisted is not None:
- cdict["computed"] = {
- "sqltext": definition,
- "persisted": is_persisted,
- }
-
- if is_identity is not None:
- # identity_start and identity_increment are Decimal or None
- if identity_start is None or identity_increment is None:
- cdict["identity"] = {}
- else:
- if isinstance(coltype, sqltypes.BigInteger):
- start = int(identity_start)
- increment = int(identity_increment)
- elif isinstance(coltype, sqltypes.Integer):
- start = int(identity_start)
- increment = int(identity_increment)
- else:
- start = identity_start
- increment = identity_increment
-
- cdict["identity"] = {
- "start": start,
- "increment": increment,
- }
-
- cols.append(cdict)
-
- if cols:
- return cols
- else:
- return self._default_or_error(
- connection, tablename, owner, ReflectionDefaults.columns, **kw
- )
-
- @reflection.cache
- @_db_plus_owner
- def get_pk_constraint(
- self, connection, tablename, dbname, owner, schema, **kw
- ):
- pkeys = []
- TC = ischema.constraints
- C = ischema.key_constraints.alias("C")
-
- # Primary key constraints
- s = (
- sql.select(
- C.c.column_name,
- TC.c.constraint_type,
- C.c.constraint_name,
- func.objectproperty(
- func.object_id(
- C.c.table_schema + "." + C.c.constraint_name
- ),
- "CnstIsClustKey",
- ).label("is_clustered"),
- )
- .where(
- sql.and_(
- TC.c.constraint_name == C.c.constraint_name,
- TC.c.table_schema == C.c.table_schema,
- C.c.table_name == tablename,
- C.c.table_schema == owner,
- ),
- )
- .order_by(TC.c.constraint_name, C.c.ordinal_position)
- )
- c = connection.execution_options(future_result=True).execute(s)
- constraint_name = None
- is_clustered = None
- for row in c.mappings():
- if "PRIMARY" in row[TC.c.constraint_type.name]:
- pkeys.append(row["COLUMN_NAME"])
- if constraint_name is None:
- constraint_name = row[C.c.constraint_name.name]
- if is_clustered is None:
- is_clustered = row["is_clustered"]
- if pkeys:
- return {
- "constrained_columns": pkeys,
- "name": constraint_name,
- "dialect_options": {"mssql_clustered": is_clustered},
- }
- else:
- return self._default_or_error(
- connection,
- tablename,
- owner,
- ReflectionDefaults.pk_constraint,
- **kw,
- )
-
- @reflection.cache
- @_db_plus_owner
- def get_foreign_keys(
- self, connection, tablename, dbname, owner, schema, **kw
- ):
- # Foreign key constraints
- s = (
- text(
- """\
-WITH fk_info AS (
- SELECT
- ischema_ref_con.constraint_schema,
- ischema_ref_con.constraint_name,
- ischema_key_col.ordinal_position,
- ischema_key_col.table_schema,
- ischema_key_col.table_name,
- ischema_ref_con.unique_constraint_schema,
- ischema_ref_con.unique_constraint_name,
- ischema_ref_con.match_option,
- ischema_ref_con.update_rule,
- ischema_ref_con.delete_rule,
- ischema_key_col.column_name AS constrained_column
- FROM
- INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ischema_ref_con
- INNER JOIN
- INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col ON
- ischema_key_col.table_schema = ischema_ref_con.constraint_schema
- AND ischema_key_col.constraint_name =
- ischema_ref_con.constraint_name
- WHERE ischema_key_col.table_name = :tablename
- AND ischema_key_col.table_schema = :owner
-),
-constraint_info AS (
- SELECT
- ischema_key_col.constraint_schema,
- ischema_key_col.constraint_name,
- ischema_key_col.ordinal_position,
- ischema_key_col.table_schema,
- ischema_key_col.table_name,
- ischema_key_col.column_name
- FROM
- INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col
-),
-index_info AS (
- SELECT
- sys.schemas.name AS index_schema,
- sys.indexes.name AS index_name,
- sys.index_columns.key_ordinal AS ordinal_position,
- sys.schemas.name AS table_schema,
- sys.objects.name AS table_name,
- sys.columns.name AS column_name
- FROM
- sys.indexes
- INNER JOIN
- sys.objects ON
- sys.objects.object_id = sys.indexes.object_id
- INNER JOIN
- sys.schemas ON
- sys.schemas.schema_id = sys.objects.schema_id
- INNER JOIN
- sys.index_columns ON
- sys.index_columns.object_id = sys.objects.object_id
- AND sys.index_columns.index_id = sys.indexes.index_id
- INNER JOIN
- sys.columns ON
- sys.columns.object_id = sys.indexes.object_id
- AND sys.columns.column_id = sys.index_columns.column_id
-)
- SELECT
- fk_info.constraint_schema,
- fk_info.constraint_name,
- fk_info.ordinal_position,
- fk_info.constrained_column,
- constraint_info.table_schema AS referred_table_schema,
- constraint_info.table_name AS referred_table_name,
- constraint_info.column_name AS referred_column,
- fk_info.match_option,
- fk_info.update_rule,
- fk_info.delete_rule
- FROM
- fk_info INNER JOIN constraint_info ON
- constraint_info.constraint_schema =
- fk_info.unique_constraint_schema
- AND constraint_info.constraint_name =
- fk_info.unique_constraint_name
- AND constraint_info.ordinal_position = fk_info.ordinal_position
- UNION
- SELECT
- fk_info.constraint_schema,
- fk_info.constraint_name,
- fk_info.ordinal_position,
- fk_info.constrained_column,
- index_info.table_schema AS referred_table_schema,
- index_info.table_name AS referred_table_name,
- index_info.column_name AS referred_column,
- fk_info.match_option,
- fk_info.update_rule,
- fk_info.delete_rule
- FROM
- fk_info INNER JOIN index_info ON
- index_info.index_schema = fk_info.unique_constraint_schema
- AND index_info.index_name = fk_info.unique_constraint_name
- AND index_info.ordinal_position = fk_info.ordinal_position
-
- ORDER BY fk_info.constraint_schema, fk_info.constraint_name,
- fk_info.ordinal_position
-"""
- )
- .bindparams(
- sql.bindparam("tablename", tablename, ischema.CoerceUnicode()),
- sql.bindparam("owner", owner, ischema.CoerceUnicode()),
- )
- .columns(
- constraint_schema=sqltypes.Unicode(),
- constraint_name=sqltypes.Unicode(),
- table_schema=sqltypes.Unicode(),
- table_name=sqltypes.Unicode(),
- constrained_column=sqltypes.Unicode(),
- referred_table_schema=sqltypes.Unicode(),
- referred_table_name=sqltypes.Unicode(),
- referred_column=sqltypes.Unicode(),
- )
- )
-
- # group rows by constraint ID, to handle multi-column FKs
- fkeys = []
-
- def fkey_rec():
- return {
- "name": None,
- "constrained_columns": [],
- "referred_schema": None,
- "referred_table": None,
- "referred_columns": [],
- "options": {},
- }
-
- fkeys = util.defaultdict(fkey_rec)
-
- for r in connection.execute(s).all():
- (
- _, # constraint schema
- rfknm,
- _, # ordinal position
- scol,
- rschema,
- rtbl,
- rcol,
- # TODO: we support match=<keyword> for foreign keys so
- # we can support this also, PG has match=FULL for example
- # but this seems to not be a valid value for SQL Server
- _, # match rule
- fkuprule,
- fkdelrule,
- ) = r
-
- rec = fkeys[rfknm]
- rec["name"] = rfknm
-
- if fkuprule != "NO ACTION":
- rec["options"]["onupdate"] = fkuprule
-
- if fkdelrule != "NO ACTION":
- rec["options"]["ondelete"] = fkdelrule
-
- if not rec["referred_table"]:
- rec["referred_table"] = rtbl
- if schema is not None or owner != rschema:
- if dbname:
- rschema = dbname + "." + rschema
- rec["referred_schema"] = rschema
-
- local_cols, remote_cols = (
- rec["constrained_columns"],
- rec["referred_columns"],
- )
-
- local_cols.append(scol)
- remote_cols.append(rcol)
-
- if fkeys:
- return list(fkeys.values())
- else:
- return self._default_or_error(
- connection,
- tablename,
- owner,
- ReflectionDefaults.foreign_keys,
- **kw,
- )
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/information_schema.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/information_schema.py
deleted file mode 100644
index 0c5f237..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/information_schema.py
+++ /dev/null
@@ -1,254 +0,0 @@
-# dialects/mssql/information_schema.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-from ... import cast
-from ... import Column
-from ... import MetaData
-from ... import Table
-from ...ext.compiler import compiles
-from ...sql import expression
-from ...types import Boolean
-from ...types import Integer
-from ...types import Numeric
-from ...types import NVARCHAR
-from ...types import String
-from ...types import TypeDecorator
-from ...types import Unicode
-
-
-ischema = MetaData()
-
-
-class CoerceUnicode(TypeDecorator):
- impl = Unicode
- cache_ok = True
-
- def bind_expression(self, bindvalue):
- return _cast_on_2005(bindvalue)
-
-
-class _cast_on_2005(expression.ColumnElement):
- def __init__(self, bindvalue):
- self.bindvalue = bindvalue
-
-
-@compiles(_cast_on_2005)
-def _compile(element, compiler, **kw):
- from . import base
-
- if (
- compiler.dialect.server_version_info is None
- or compiler.dialect.server_version_info < base.MS_2005_VERSION
- ):
- return compiler.process(element.bindvalue, **kw)
- else:
- return compiler.process(cast(element.bindvalue, Unicode), **kw)
-
-
-schemata = Table(
- "SCHEMATA",
- ischema,
- Column("CATALOG_NAME", CoerceUnicode, key="catalog_name"),
- Column("SCHEMA_NAME", CoerceUnicode, key="schema_name"),
- Column("SCHEMA_OWNER", CoerceUnicode, key="schema_owner"),
- schema="INFORMATION_SCHEMA",
-)
-
-tables = Table(
- "TABLES",
- ischema,
- Column("TABLE_CATALOG", CoerceUnicode, key="table_catalog"),
- Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
- Column("TABLE_NAME", CoerceUnicode, key="table_name"),
- Column("TABLE_TYPE", CoerceUnicode, key="table_type"),
- schema="INFORMATION_SCHEMA",
-)
-
-columns = Table(
- "COLUMNS",
- ischema,
- Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
- Column("TABLE_NAME", CoerceUnicode, key="table_name"),
- Column("COLUMN_NAME", CoerceUnicode, key="column_name"),
- Column("IS_NULLABLE", Integer, key="is_nullable"),
- Column("DATA_TYPE", String, key="data_type"),
- Column("ORDINAL_POSITION", Integer, key="ordinal_position"),
- Column(
- "CHARACTER_MAXIMUM_LENGTH", Integer, key="character_maximum_length"
- ),
- Column("NUMERIC_PRECISION", Integer, key="numeric_precision"),
- Column("NUMERIC_SCALE", Integer, key="numeric_scale"),
- Column("COLUMN_DEFAULT", Integer, key="column_default"),
- Column("COLLATION_NAME", String, key="collation_name"),
- schema="INFORMATION_SCHEMA",
-)
-
-mssql_temp_table_columns = Table(
- "COLUMNS",
- ischema,
- Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
- Column("TABLE_NAME", CoerceUnicode, key="table_name"),
- Column("COLUMN_NAME", CoerceUnicode, key="column_name"),
- Column("IS_NULLABLE", Integer, key="is_nullable"),
- Column("DATA_TYPE", String, key="data_type"),
- Column("ORDINAL_POSITION", Integer, key="ordinal_position"),
- Column(
- "CHARACTER_MAXIMUM_LENGTH", Integer, key="character_maximum_length"
- ),
- Column("NUMERIC_PRECISION", Integer, key="numeric_precision"),
- Column("NUMERIC_SCALE", Integer, key="numeric_scale"),
- Column("COLUMN_DEFAULT", Integer, key="column_default"),
- Column("COLLATION_NAME", String, key="collation_name"),
- schema="tempdb.INFORMATION_SCHEMA",
-)
-
-constraints = Table(
- "TABLE_CONSTRAINTS",
- ischema,
- Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
- Column("TABLE_NAME", CoerceUnicode, key="table_name"),
- Column("CONSTRAINT_NAME", CoerceUnicode, key="constraint_name"),
- Column("CONSTRAINT_TYPE", CoerceUnicode, key="constraint_type"),
- schema="INFORMATION_SCHEMA",
-)
-
-column_constraints = Table(
- "CONSTRAINT_COLUMN_USAGE",
- ischema,
- Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
- Column("TABLE_NAME", CoerceUnicode, key="table_name"),
- Column("COLUMN_NAME", CoerceUnicode, key="column_name"),
- Column("CONSTRAINT_NAME", CoerceUnicode, key="constraint_name"),
- schema="INFORMATION_SCHEMA",
-)
-
-key_constraints = Table(
- "KEY_COLUMN_USAGE",
- ischema,
- Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
- Column("TABLE_NAME", CoerceUnicode, key="table_name"),
- Column("COLUMN_NAME", CoerceUnicode, key="column_name"),
- Column("CONSTRAINT_NAME", CoerceUnicode, key="constraint_name"),
- Column("CONSTRAINT_SCHEMA", CoerceUnicode, key="constraint_schema"),
- Column("ORDINAL_POSITION", Integer, key="ordinal_position"),
- schema="INFORMATION_SCHEMA",
-)
-
-ref_constraints = Table(
- "REFERENTIAL_CONSTRAINTS",
- ischema,
- Column("CONSTRAINT_CATALOG", CoerceUnicode, key="constraint_catalog"),
- Column("CONSTRAINT_SCHEMA", CoerceUnicode, key="constraint_schema"),
- Column("CONSTRAINT_NAME", CoerceUnicode, key="constraint_name"),
- # TODO: is CATLOG misspelled ?
- Column(
- "UNIQUE_CONSTRAINT_CATLOG",
- CoerceUnicode,
- key="unique_constraint_catalog",
- ),
- Column(
- "UNIQUE_CONSTRAINT_SCHEMA",
- CoerceUnicode,
- key="unique_constraint_schema",
- ),
- Column(
- "UNIQUE_CONSTRAINT_NAME", CoerceUnicode, key="unique_constraint_name"
- ),
- Column("MATCH_OPTION", String, key="match_option"),
- Column("UPDATE_RULE", String, key="update_rule"),
- Column("DELETE_RULE", String, key="delete_rule"),
- schema="INFORMATION_SCHEMA",
-)
-
-views = Table(
- "VIEWS",
- ischema,
- Column("TABLE_CATALOG", CoerceUnicode, key="table_catalog"),
- Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"),
- Column("TABLE_NAME", CoerceUnicode, key="table_name"),
- Column("VIEW_DEFINITION", CoerceUnicode, key="view_definition"),
- Column("CHECK_OPTION", String, key="check_option"),
- Column("IS_UPDATABLE", String, key="is_updatable"),
- schema="INFORMATION_SCHEMA",
-)
-
-computed_columns = Table(
- "computed_columns",
- ischema,
- Column("object_id", Integer),
- Column("name", CoerceUnicode),
- Column("is_computed", Boolean),
- Column("is_persisted", Boolean),
- Column("definition", CoerceUnicode),
- schema="sys",
-)
-
-sequences = Table(
- "SEQUENCES",
- ischema,
- Column("SEQUENCE_CATALOG", CoerceUnicode, key="sequence_catalog"),
- Column("SEQUENCE_SCHEMA", CoerceUnicode, key="sequence_schema"),
- Column("SEQUENCE_NAME", CoerceUnicode, key="sequence_name"),
- schema="INFORMATION_SCHEMA",
-)
-
-
-class NumericSqlVariant(TypeDecorator):
- r"""This type casts sql_variant columns in the identity_columns view
- to numeric. This is required because:
-
- * pyodbc does not support sql_variant
- * pymssql under python 2 return the byte representation of the number,
- int 1 is returned as "\x01\x00\x00\x00". On python 3 it returns the
- correct value as string.
- """
-
- impl = Unicode
- cache_ok = True
-
- def column_expression(self, colexpr):
- return cast(colexpr, Numeric(38, 0))
-
-
-identity_columns = Table(
- "identity_columns",
- ischema,
- Column("object_id", Integer),
- Column("name", CoerceUnicode),
- Column("is_identity", Boolean),
- Column("seed_value", NumericSqlVariant),
- Column("increment_value", NumericSqlVariant),
- Column("last_value", NumericSqlVariant),
- Column("is_not_for_replication", Boolean),
- schema="sys",
-)
-
-
-class NVarcharSqlVariant(TypeDecorator):
- """This type casts sql_variant columns in the extended_properties view
- to nvarchar. This is required because pyodbc does not support sql_variant
- """
-
- impl = Unicode
- cache_ok = True
-
- def column_expression(self, colexpr):
- return cast(colexpr, NVARCHAR)
-
-
-extended_properties = Table(
- "extended_properties",
- ischema,
- Column("class", Integer), # TINYINT
- Column("class_desc", CoerceUnicode),
- Column("major_id", Integer),
- Column("minor_id", Integer),
- Column("name", CoerceUnicode),
- Column("value", NVarcharSqlVariant),
- schema="sys",
-)
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/json.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/json.py
deleted file mode 100644
index 18bea09..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/json.py
+++ /dev/null
@@ -1,133 +0,0 @@
-# dialects/mssql/json.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-from ... import types as sqltypes
-
-# technically, all the dialect-specific datatypes that don't have any special
-# behaviors would be private with names like _MSJson. However, we haven't been
-# doing this for mysql.JSON or sqlite.JSON which both have JSON / JSONIndexType
-# / JSONPathType in their json.py files, so keep consistent with that
-# sub-convention for now. A future change can update them all to be
-# package-private at once.
-
-
-class JSON(sqltypes.JSON):
- """MSSQL JSON type.
-
- MSSQL supports JSON-formatted data as of SQL Server 2016.
-
- The :class:`_mssql.JSON` datatype at the DDL level will represent the
- datatype as ``NVARCHAR(max)``, but provides for JSON-level comparison
- functions as well as Python coercion behavior.
-
- :class:`_mssql.JSON` is used automatically whenever the base
- :class:`_types.JSON` datatype is used against a SQL Server backend.
-
- .. seealso::
-
- :class:`_types.JSON` - main documentation for the generic
- cross-platform JSON datatype.
-
- The :class:`_mssql.JSON` type supports persistence of JSON values
- as well as the core index operations provided by :class:`_types.JSON`
- datatype, by adapting the operations to render the ``JSON_VALUE``
- or ``JSON_QUERY`` functions at the database level.
-
- The SQL Server :class:`_mssql.JSON` type necessarily makes use of the
- ``JSON_QUERY`` and ``JSON_VALUE`` functions when querying for elements
- of a JSON object. These two functions have a major restriction in that
- they are **mutually exclusive** based on the type of object to be returned.
- The ``JSON_QUERY`` function **only** returns a JSON dictionary or list,
- but not an individual string, numeric, or boolean element; the
- ``JSON_VALUE`` function **only** returns an individual string, numeric,
- or boolean element. **both functions either return NULL or raise
- an error if they are not used against the correct expected value**.
-
- To handle this awkward requirement, indexed access rules are as follows:
-
- 1. When extracting a sub element from a JSON that is itself a JSON
- dictionary or list, the :meth:`_types.JSON.Comparator.as_json` accessor
- should be used::
-
- stmt = select(
- data_table.c.data["some key"].as_json()
- ).where(
- data_table.c.data["some key"].as_json() == {"sub": "structure"}
- )
-
- 2. When extracting a sub element from a JSON that is a plain boolean,
- string, integer, or float, use the appropriate method among
- :meth:`_types.JSON.Comparator.as_boolean`,
- :meth:`_types.JSON.Comparator.as_string`,
- :meth:`_types.JSON.Comparator.as_integer`,
- :meth:`_types.JSON.Comparator.as_float`::
-
- stmt = select(
- data_table.c.data["some key"].as_string()
- ).where(
- data_table.c.data["some key"].as_string() == "some string"
- )
-
- .. versionadded:: 1.4
-
-
- """
-
- # note there was a result processor here that was looking for "number",
- # but none of the tests seem to exercise it.
-
-
-# Note: these objects currently match exactly those of MySQL, however since
-# these are not generalizable to all JSON implementations, remain separately
-# implemented for each dialect.
-class _FormatTypeMixin:
- def _format_value(self, value):
- raise NotImplementedError()
-
- def bind_processor(self, dialect):
- super_proc = self.string_bind_processor(dialect)
-
- def process(value):
- value = self._format_value(value)
- if super_proc:
- value = super_proc(value)
- return value
-
- return process
-
- def literal_processor(self, dialect):
- super_proc = self.string_literal_processor(dialect)
-
- def process(value):
- value = self._format_value(value)
- if super_proc:
- value = super_proc(value)
- return value
-
- return process
-
-
-class JSONIndexType(_FormatTypeMixin, sqltypes.JSON.JSONIndexType):
- def _format_value(self, value):
- if isinstance(value, int):
- value = "$[%s]" % value
- else:
- value = '$."%s"' % value
- return value
-
-
-class JSONPathType(_FormatTypeMixin, sqltypes.JSON.JSONPathType):
- def _format_value(self, value):
- return "$%s" % (
- "".join(
- [
- "[%s]" % elem if isinstance(elem, int) else '."%s"' % elem
- for elem in value
- ]
- )
- )
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/provision.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/provision.py
deleted file mode 100644
index 143d386..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/provision.py
+++ /dev/null
@@ -1,155 +0,0 @@
-# dialects/mssql/provision.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-from sqlalchemy import inspect
-from sqlalchemy import Integer
-from ... import create_engine
-from ... import exc
-from ...schema import Column
-from ...schema import DropConstraint
-from ...schema import ForeignKeyConstraint
-from ...schema import MetaData
-from ...schema import Table
-from ...testing.provision import create_db
-from ...testing.provision import drop_all_schema_objects_pre_tables
-from ...testing.provision import drop_db
-from ...testing.provision import generate_driver_url
-from ...testing.provision import get_temp_table_name
-from ...testing.provision import log
-from ...testing.provision import normalize_sequence
-from ...testing.provision import run_reap_dbs
-from ...testing.provision import temp_table_keyword_args
-
-
-@generate_driver_url.for_db("mssql")
-def generate_driver_url(url, driver, query_str):
- backend = url.get_backend_name()
-
- new_url = url.set(drivername="%s+%s" % (backend, driver))
-
- if driver not in ("pyodbc", "aioodbc"):
- new_url = new_url.set(query="")
-
- if driver == "aioodbc":
- new_url = new_url.update_query_dict({"MARS_Connection": "Yes"})
-
- if query_str:
- new_url = new_url.update_query_string(query_str)
-
- try:
- new_url.get_dialect()
- except exc.NoSuchModuleError:
- return None
- else:
- return new_url
-
-
-@create_db.for_db("mssql")
-def _mssql_create_db(cfg, eng, ident):
- with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
- conn.exec_driver_sql("create database %s" % ident)
- conn.exec_driver_sql(
- "ALTER DATABASE %s SET ALLOW_SNAPSHOT_ISOLATION ON" % ident
- )
- conn.exec_driver_sql(
- "ALTER DATABASE %s SET READ_COMMITTED_SNAPSHOT ON" % ident
- )
- conn.exec_driver_sql("use %s" % ident)
- conn.exec_driver_sql("create schema test_schema")
- conn.exec_driver_sql("create schema test_schema_2")
-
-
-@drop_db.for_db("mssql")
-def _mssql_drop_db(cfg, eng, ident):
- with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
- _mssql_drop_ignore(conn, ident)
-
-
-def _mssql_drop_ignore(conn, ident):
- try:
- # typically when this happens, we can't KILL the session anyway,
- # so let the cleanup process drop the DBs
- # for row in conn.exec_driver_sql(
- # "select session_id from sys.dm_exec_sessions "
- # "where database_id=db_id('%s')" % ident):
- # log.info("killing SQL server session %s", row['session_id'])
- # conn.exec_driver_sql("kill %s" % row['session_id'])
- conn.exec_driver_sql("drop database %s" % ident)
- log.info("Reaped db: %s", ident)
- return True
- except exc.DatabaseError as err:
- log.warning("couldn't drop db: %s", err)
- return False
-
-
-@run_reap_dbs.for_db("mssql")
-def _reap_mssql_dbs(url, idents):
- log.info("db reaper connecting to %r", url)
- eng = create_engine(url)
- with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
- log.info("identifiers in file: %s", ", ".join(idents))
-
- to_reap = conn.exec_driver_sql(
- "select d.name from sys.databases as d where name "
- "like 'TEST_%' and not exists (select session_id "
- "from sys.dm_exec_sessions "
- "where database_id=d.database_id)"
- )
- all_names = {dbname.lower() for (dbname,) in to_reap}
- to_drop = set()
- for name in all_names:
- if name in idents:
- to_drop.add(name)
-
- dropped = total = 0
- for total, dbname in enumerate(to_drop, 1):
- if _mssql_drop_ignore(conn, dbname):
- dropped += 1
- log.info(
- "Dropped %d out of %d stale databases detected", dropped, total
- )
-
-
-@temp_table_keyword_args.for_db("mssql")
-def _mssql_temp_table_keyword_args(cfg, eng):
- return {}
-
-
-@get_temp_table_name.for_db("mssql")
-def _mssql_get_temp_table_name(cfg, eng, base_name):
- return "##" + base_name
-
-
-@drop_all_schema_objects_pre_tables.for_db("mssql")
-def drop_all_schema_objects_pre_tables(cfg, eng):
- with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
- inspector = inspect(conn)
- for schema in (None, "dbo", cfg.test_schema, cfg.test_schema_2):
- for tname in inspector.get_table_names(schema=schema):
- tb = Table(
- tname,
- MetaData(),
- Column("x", Integer),
- Column("y", Integer),
- schema=schema,
- )
- for fk in inspect(conn).get_foreign_keys(tname, schema=schema):
- conn.execute(
- DropConstraint(
- ForeignKeyConstraint(
- [tb.c.x], [tb.c.y], name=fk["name"]
- )
- )
- )
-
-
-@normalize_sequence.for_db("mssql")
-def normalize_sequence(cfg, sequence):
- if sequence.start is None:
- sequence.start = 1
- return sequence
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/pymssql.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/pymssql.py
deleted file mode 100644
index ea1f9bd..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/pymssql.py
+++ /dev/null
@@ -1,125 +0,0 @@
-# dialects/mssql/pymssql.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-
-"""
-.. dialect:: mssql+pymssql
- :name: pymssql
- :dbapi: pymssql
- :connectstring: mssql+pymssql://<username>:<password>@<freetds_name>/?charset=utf8
-
-pymssql is a Python module that provides a Python DBAPI interface around
-`FreeTDS <https://www.freetds.org/>`_.
-
-.. versionchanged:: 2.0.5
-
- pymssql was restored to SQLAlchemy's continuous integration testing
-
-
-""" # noqa
-import re
-
-from .base import MSDialect
-from .base import MSIdentifierPreparer
-from ... import types as sqltypes
-from ... import util
-from ...engine import processors
-
-
-class _MSNumeric_pymssql(sqltypes.Numeric):
- def result_processor(self, dialect, type_):
- if not self.asdecimal:
- return processors.to_float
- else:
- return sqltypes.Numeric.result_processor(self, dialect, type_)
-
-
-class MSIdentifierPreparer_pymssql(MSIdentifierPreparer):
- def __init__(self, dialect):
- super().__init__(dialect)
- # pymssql has the very unusual behavior that it uses pyformat
- # yet does not require that percent signs be doubled
- self._double_percents = False
-
-
-class MSDialect_pymssql(MSDialect):
- supports_statement_cache = True
- supports_native_decimal = True
- supports_native_uuid = True
- driver = "pymssql"
-
- preparer = MSIdentifierPreparer_pymssql
-
- colspecs = util.update_copy(
- MSDialect.colspecs,
- {sqltypes.Numeric: _MSNumeric_pymssql, sqltypes.Float: sqltypes.Float},
- )
-
- @classmethod
- def import_dbapi(cls):
- module = __import__("pymssql")
- # pymmsql < 2.1.1 doesn't have a Binary method. we use string
- client_ver = tuple(int(x) for x in module.__version__.split("."))
- if client_ver < (2, 1, 1):
- # TODO: monkeypatching here is less than ideal
- module.Binary = lambda x: x if hasattr(x, "decode") else str(x)
-
- if client_ver < (1,):
- util.warn(
- "The pymssql dialect expects at least "
- "the 1.0 series of the pymssql DBAPI."
- )
- return module
-
- def _get_server_version_info(self, connection):
- vers = connection.exec_driver_sql("select @@version").scalar()
- m = re.match(r"Microsoft .*? - (\d+)\.(\d+)\.(\d+)\.(\d+)", vers)
- if m:
- return tuple(int(x) for x in m.group(1, 2, 3, 4))
- else:
- return None
-
- def create_connect_args(self, url):
- opts = url.translate_connect_args(username="user")
- opts.update(url.query)
- port = opts.pop("port", None)
- if port and "host" in opts:
- opts["host"] = "%s:%s" % (opts["host"], port)
- return ([], opts)
-
- def is_disconnect(self, e, connection, cursor):
- for msg in (
- "Adaptive Server connection timed out",
- "Net-Lib error during Connection reset by peer",
- "message 20003", # connection timeout
- "Error 10054",
- "Not connected to any MS SQL server",
- "Connection is closed",
- "message 20006", # Write to the server failed
- "message 20017", # Unexpected EOF from the server
- "message 20047", # DBPROCESS is dead or not enabled
- ):
- if msg in str(e):
- return True
- else:
- return False
-
- def get_isolation_level_values(self, dbapi_connection):
- return super().get_isolation_level_values(dbapi_connection) + [
- "AUTOCOMMIT"
- ]
-
- def set_isolation_level(self, dbapi_connection, level):
- if level == "AUTOCOMMIT":
- dbapi_connection.autocommit(True)
- else:
- dbapi_connection.autocommit(False)
- super().set_isolation_level(dbapi_connection, level)
-
-
-dialect = MSDialect_pymssql
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/pyodbc.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/pyodbc.py
deleted file mode 100644
index 76ea046..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/pyodbc.py
+++ /dev/null
@@ -1,745 +0,0 @@
-# dialects/mssql/pyodbc.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-r"""
-.. dialect:: mssql+pyodbc
- :name: PyODBC
- :dbapi: pyodbc
- :connectstring: mssql+pyodbc://<username>:<password>@<dsnname>
- :url: https://pypi.org/project/pyodbc/
-
-Connecting to PyODBC
---------------------
-
-The URL here is to be translated to PyODBC connection strings, as
-detailed in `ConnectionStrings <https://code.google.com/p/pyodbc/wiki/ConnectionStrings>`_.
-
-DSN Connections
-^^^^^^^^^^^^^^^
-
-A DSN connection in ODBC means that a pre-existing ODBC datasource is
-configured on the client machine. The application then specifies the name
-of this datasource, which encompasses details such as the specific ODBC driver
-in use as well as the network address of the database. Assuming a datasource
-is configured on the client, a basic DSN-based connection looks like::
-
- engine = create_engine("mssql+pyodbc://scott:tiger@some_dsn")
-
-Which above, will pass the following connection string to PyODBC::
-
- DSN=some_dsn;UID=scott;PWD=tiger
-
-If the username and password are omitted, the DSN form will also add
-the ``Trusted_Connection=yes`` directive to the ODBC string.
-
-Hostname Connections
-^^^^^^^^^^^^^^^^^^^^
-
-Hostname-based connections are also supported by pyodbc. These are often
-easier to use than a DSN and have the additional advantage that the specific
-database name to connect towards may be specified locally in the URL, rather
-than it being fixed as part of a datasource configuration.
-
-When using a hostname connection, the driver name must also be specified in the
-query parameters of the URL. As these names usually have spaces in them, the
-name must be URL encoded which means using plus signs for spaces::
-
- engine = create_engine("mssql+pyodbc://scott:tiger@myhost:port/databasename?driver=ODBC+Driver+17+for+SQL+Server")
-
-The ``driver`` keyword is significant to the pyodbc dialect and must be
-specified in lowercase.
-
-Any other names passed in the query string are passed through in the pyodbc
-connect string, such as ``authentication``, ``TrustServerCertificate``, etc.
-Multiple keyword arguments must be separated by an ampersand (``&``); these
-will be translated to semicolons when the pyodbc connect string is generated
-internally::
-
- e = create_engine(
- "mssql+pyodbc://scott:tiger@mssql2017:1433/test?"
- "driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes"
- "&authentication=ActiveDirectoryIntegrated"
- )
-
-The equivalent URL can be constructed using :class:`_sa.engine.URL`::
-
- from sqlalchemy.engine import URL
- connection_url = URL.create(
- "mssql+pyodbc",
- username="scott",
- password="tiger",
- host="mssql2017",
- port=1433,
- database="test",
- query={
- "driver": "ODBC Driver 18 for SQL Server",
- "TrustServerCertificate": "yes",
- "authentication": "ActiveDirectoryIntegrated",
- },
- )
-
-
-Pass through exact Pyodbc string
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-A PyODBC connection string can also be sent in pyodbc's format directly, as
-specified in `the PyODBC documentation
-<https://github.com/mkleehammer/pyodbc/wiki/Connecting-to-databases>`_,
-using the parameter ``odbc_connect``. A :class:`_sa.engine.URL` object
-can help make this easier::
-
- from sqlalchemy.engine import URL
- connection_string = "DRIVER={SQL Server Native Client 10.0};SERVER=dagger;DATABASE=test;UID=user;PWD=password"
- connection_url = URL.create("mssql+pyodbc", query={"odbc_connect": connection_string})
-
- engine = create_engine(connection_url)
-
-.. _mssql_pyodbc_access_tokens:
-
-Connecting to databases with access tokens
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-Some database servers are set up to only accept access tokens for login. For
-example, SQL Server allows the use of Azure Active Directory tokens to connect
-to databases. This requires creating a credential object using the
-``azure-identity`` library. More information about the authentication step can be
-found in `Microsoft's documentation
-<https://docs.microsoft.com/en-us/azure/developer/python/azure-sdk-authenticate?tabs=bash>`_.
-
-After getting an engine, the credentials need to be sent to ``pyodbc.connect``
-each time a connection is requested. One way to do this is to set up an event
-listener on the engine that adds the credential token to the dialect's connect
-call. This is discussed more generally in :ref:`engines_dynamic_tokens`. For
-SQL Server in particular, this is passed as an ODBC connection attribute with
-a data structure `described by Microsoft
-<https://docs.microsoft.com/en-us/sql/connect/odbc/using-azure-active-directory#authenticating-with-an-access-token>`_.
-
-The following code snippet will create an engine that connects to an Azure SQL
-database using Azure credentials::
-
- import struct
- from sqlalchemy import create_engine, event
- from sqlalchemy.engine.url import URL
- from azure import identity
-
- SQL_COPT_SS_ACCESS_TOKEN = 1256 # Connection option for access tokens, as defined in msodbcsql.h
- TOKEN_URL = "https://database.windows.net/" # The token URL for any Azure SQL database
-
- connection_string = "mssql+pyodbc://@my-server.database.windows.net/myDb?driver=ODBC+Driver+17+for+SQL+Server"
-
- engine = create_engine(connection_string)
-
- azure_credentials = identity.DefaultAzureCredential()
-
- @event.listens_for(engine, "do_connect")
- def provide_token(dialect, conn_rec, cargs, cparams):
- # remove the "Trusted_Connection" parameter that SQLAlchemy adds
- cargs[0] = cargs[0].replace(";Trusted_Connection=Yes", "")
-
- # create token credential
- raw_token = azure_credentials.get_token(TOKEN_URL).token.encode("utf-16-le")
- token_struct = struct.pack(f"<I{len(raw_token)}s", len(raw_token), raw_token)
-
- # apply it to keyword arguments
- cparams["attrs_before"] = {SQL_COPT_SS_ACCESS_TOKEN: token_struct}
-
-.. tip::
-
- The ``Trusted_Connection`` token is currently added by the SQLAlchemy
- pyodbc dialect when no username or password is present. This needs
- to be removed per Microsoft's
- `documentation for Azure access tokens
- <https://docs.microsoft.com/en-us/sql/connect/odbc/using-azure-active-directory#authenticating-with-an-access-token>`_,
- stating that a connection string when using an access token must not contain
- ``UID``, ``PWD``, ``Authentication`` or ``Trusted_Connection`` parameters.
-
-.. _azure_synapse_ignore_no_transaction_on_rollback:
-
-Avoiding transaction-related exceptions on Azure Synapse Analytics
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-Azure Synapse Analytics has a significant difference in its transaction
-handling compared to plain SQL Server; in some cases an error within a Synapse
-transaction can cause it to be arbitrarily terminated on the server side, which
-then causes the DBAPI ``.rollback()`` method (as well as ``.commit()``) to
-fail. The issue prevents the usual DBAPI contract of allowing ``.rollback()``
-to pass silently if no transaction is present as the driver does not expect
-this condition. The symptom of this failure is an exception with a message
-resembling 'No corresponding transaction found. (111214)' when attempting to
-emit a ``.rollback()`` after an operation had a failure of some kind.
-
-This specific case can be handled by passing ``ignore_no_transaction_on_rollback=True`` to
-the SQL Server dialect via the :func:`_sa.create_engine` function as follows::
-
- engine = create_engine(connection_url, ignore_no_transaction_on_rollback=True)
-
-Using the above parameter, the dialect will catch ``ProgrammingError``
-exceptions raised during ``connection.rollback()`` and emit a warning
-if the error message contains code ``111214``, however will not raise
-an exception.
-
-.. versionadded:: 1.4.40 Added the
- ``ignore_no_transaction_on_rollback=True`` parameter.
-
-Enable autocommit for Azure SQL Data Warehouse (DW) connections
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-Azure SQL Data Warehouse does not support transactions,
-and that can cause problems with SQLAlchemy's "autobegin" (and implicit
-commit/rollback) behavior. We can avoid these problems by enabling autocommit
-at both the pyodbc and engine levels::
-
- connection_url = sa.engine.URL.create(
- "mssql+pyodbc",
- username="scott",
- password="tiger",
- host="dw.azure.example.com",
- database="mydb",
- query={
- "driver": "ODBC Driver 17 for SQL Server",
- "autocommit": "True",
- },
- )
-
- engine = create_engine(connection_url).execution_options(
- isolation_level="AUTOCOMMIT"
- )
-
-Avoiding sending large string parameters as TEXT/NTEXT
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-By default, for historical reasons, Microsoft's ODBC drivers for SQL Server
-send long string parameters (greater than 4000 SBCS characters or 2000 Unicode
-characters) as TEXT/NTEXT values. TEXT and NTEXT have been deprecated for many
-years and are starting to cause compatibility issues with newer versions of
-SQL_Server/Azure. For example, see `this
-issue <https://github.com/mkleehammer/pyodbc/issues/835>`_.
-
-Starting with ODBC Driver 18 for SQL Server we can override the legacy
-behavior and pass long strings as varchar(max)/nvarchar(max) using the
-``LongAsMax=Yes`` connection string parameter::
-
- connection_url = sa.engine.URL.create(
- "mssql+pyodbc",
- username="scott",
- password="tiger",
- host="mssqlserver.example.com",
- database="mydb",
- query={
- "driver": "ODBC Driver 18 for SQL Server",
- "LongAsMax": "Yes",
- },
- )
-
-
-Pyodbc Pooling / connection close behavior
-------------------------------------------
-
-PyODBC uses internal `pooling
-<https://github.com/mkleehammer/pyodbc/wiki/The-pyodbc-Module#pooling>`_ by
-default, which means connections will be longer lived than they are within
-SQLAlchemy itself. As SQLAlchemy has its own pooling behavior, it is often
-preferable to disable this behavior. This behavior can only be disabled
-globally at the PyODBC module level, **before** any connections are made::
-
- import pyodbc
-
- pyodbc.pooling = False
-
- # don't use the engine before pooling is set to False
- engine = create_engine("mssql+pyodbc://user:pass@dsn")
-
-If this variable is left at its default value of ``True``, **the application
-will continue to maintain active database connections**, even when the
-SQLAlchemy engine itself fully discards a connection or if the engine is
-disposed.
-
-.. seealso::
-
- `pooling <https://github.com/mkleehammer/pyodbc/wiki/The-pyodbc-Module#pooling>`_ -
- in the PyODBC documentation.
-
-Driver / Unicode Support
--------------------------
-
-PyODBC works best with Microsoft ODBC drivers, particularly in the area
-of Unicode support on both Python 2 and Python 3.
-
-Using the FreeTDS ODBC drivers on Linux or OSX with PyODBC is **not**
-recommended; there have been historically many Unicode-related issues
-in this area, including before Microsoft offered ODBC drivers for Linux
-and OSX. Now that Microsoft offers drivers for all platforms, for
-PyODBC support these are recommended. FreeTDS remains relevant for
-non-ODBC drivers such as pymssql where it works very well.
-
-
-Rowcount Support
-----------------
-
-Previous limitations with the SQLAlchemy ORM's "versioned rows" feature with
-Pyodbc have been resolved as of SQLAlchemy 2.0.5. See the notes at
-:ref:`mssql_rowcount_versioning`.
-
-.. _mssql_pyodbc_fastexecutemany:
-
-Fast Executemany Mode
----------------------
-
-The PyODBC driver includes support for a "fast executemany" mode of execution
-which greatly reduces round trips for a DBAPI ``executemany()`` call when using
-Microsoft ODBC drivers, for **limited size batches that fit in memory**. The
-feature is enabled by setting the attribute ``.fast_executemany`` on the DBAPI
-cursor when an executemany call is to be used. The SQLAlchemy PyODBC SQL
-Server dialect supports this parameter by passing the
-``fast_executemany`` parameter to
-:func:`_sa.create_engine` , when using the **Microsoft ODBC driver only**::
-
- engine = create_engine(
- "mssql+pyodbc://scott:tiger@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server",
- fast_executemany=True)
-
-.. versionchanged:: 2.0.9 - the ``fast_executemany`` parameter now has its
- intended effect of this PyODBC feature taking effect for all INSERT
- statements that are executed with multiple parameter sets, which don't
- include RETURNING. Previously, SQLAlchemy 2.0's :term:`insertmanyvalues`
- feature would cause ``fast_executemany`` to not be used in most cases
- even if specified.
-
-.. versionadded:: 1.3
-
-.. seealso::
-
- `fast executemany <https://github.com/mkleehammer/pyodbc/wiki/Features-beyond-the-DB-API#fast_executemany>`_
- - on github
-
-.. _mssql_pyodbc_setinputsizes:
-
-Setinputsizes Support
------------------------
-
-As of version 2.0, the pyodbc ``cursor.setinputsizes()`` method is used for
-all statement executions, except for ``cursor.executemany()`` calls when
-fast_executemany=True where it is not supported (assuming
-:ref:`insertmanyvalues <engine_insertmanyvalues>` is kept enabled,
-"fastexecutemany" will not take place for INSERT statements in any case).
-
-The use of ``cursor.setinputsizes()`` can be disabled by passing
-``use_setinputsizes=False`` to :func:`_sa.create_engine`.
-
-When ``use_setinputsizes`` is left at its default of ``True``, the
-specific per-type symbols passed to ``cursor.setinputsizes()`` can be
-programmatically customized using the :meth:`.DialectEvents.do_setinputsizes`
-hook. See that method for usage examples.
-
-.. versionchanged:: 2.0 The mssql+pyodbc dialect now defaults to using
- ``use_setinputsizes=True`` for all statement executions with the exception of
- cursor.executemany() calls when fast_executemany=True. The behavior can
- be turned off by passing ``use_setinputsizes=False`` to
- :func:`_sa.create_engine`.
-
-""" # noqa
-
-
-import datetime
-import decimal
-import re
-import struct
-
-from .base import _MSDateTime
-from .base import _MSUnicode
-from .base import _MSUnicodeText
-from .base import BINARY
-from .base import DATETIMEOFFSET
-from .base import MSDialect
-from .base import MSExecutionContext
-from .base import VARBINARY
-from .json import JSON as _MSJson
-from .json import JSONIndexType as _MSJsonIndexType
-from .json import JSONPathType as _MSJsonPathType
-from ... import exc
-from ... import types as sqltypes
-from ... import util
-from ...connectors.pyodbc import PyODBCConnector
-from ...engine import cursor as _cursor
-
-
-class _ms_numeric_pyodbc:
- """Turns Decimals with adjusted() < 0 or > 7 into strings.
-
- The routines here are needed for older pyodbc versions
- as well as current mxODBC versions.
-
- """
-
- def bind_processor(self, dialect):
- super_process = super().bind_processor(dialect)
-
- if not dialect._need_decimal_fix:
- return super_process
-
- def process(value):
- if self.asdecimal and isinstance(value, decimal.Decimal):
- adjusted = value.adjusted()
- if adjusted < 0:
- return self._small_dec_to_string(value)
- elif adjusted > 7:
- return self._large_dec_to_string(value)
-
- if super_process:
- return super_process(value)
- else:
- return value
-
- return process
-
- # these routines needed for older versions of pyodbc.
- # as of 2.1.8 this logic is integrated.
-
- def _small_dec_to_string(self, value):
- return "%s0.%s%s" % (
- (value < 0 and "-" or ""),
- "0" * (abs(value.adjusted()) - 1),
- "".join([str(nint) for nint in value.as_tuple()[1]]),
- )
-
- def _large_dec_to_string(self, value):
- _int = value.as_tuple()[1]
- if "E" in str(value):
- result = "%s%s%s" % (
- (value < 0 and "-" or ""),
- "".join([str(s) for s in _int]),
- "0" * (value.adjusted() - (len(_int) - 1)),
- )
- else:
- if (len(_int) - 1) > value.adjusted():
- result = "%s%s.%s" % (
- (value < 0 and "-" or ""),
- "".join([str(s) for s in _int][0 : value.adjusted() + 1]),
- "".join([str(s) for s in _int][value.adjusted() + 1 :]),
- )
- else:
- result = "%s%s" % (
- (value < 0 and "-" or ""),
- "".join([str(s) for s in _int][0 : value.adjusted() + 1]),
- )
- return result
-
-
-class _MSNumeric_pyodbc(_ms_numeric_pyodbc, sqltypes.Numeric):
- pass
-
-
-class _MSFloat_pyodbc(_ms_numeric_pyodbc, sqltypes.Float):
- pass
-
-
-class _ms_binary_pyodbc:
- """Wraps binary values in dialect-specific Binary wrapper.
- If the value is null, return a pyodbc-specific BinaryNull
- object to prevent pyODBC [and FreeTDS] from defaulting binary
- NULL types to SQLWCHAR and causing implicit conversion errors.
- """
-
- def bind_processor(self, dialect):
- if dialect.dbapi is None:
- return None
-
- DBAPIBinary = dialect.dbapi.Binary
-
- def process(value):
- if value is not None:
- return DBAPIBinary(value)
- else:
- # pyodbc-specific
- return dialect.dbapi.BinaryNull
-
- return process
-
-
-class _ODBCDateTimeBindProcessor:
- """Add bind processors to handle datetimeoffset behaviors"""
-
- has_tz = False
-
- def bind_processor(self, dialect):
- def process(value):
- if value is None:
- return None
- elif isinstance(value, str):
- # if a string was passed directly, allow it through
- return value
- elif not value.tzinfo or (not self.timezone and not self.has_tz):
- # for DateTime(timezone=False)
- return value
- else:
- # for DATETIMEOFFSET or DateTime(timezone=True)
- #
- # Convert to string format required by T-SQL
- dto_string = value.strftime("%Y-%m-%d %H:%M:%S.%f %z")
- # offset needs a colon, e.g., -0700 -> -07:00
- # "UTC offset in the form (+-)HHMM[SS[.ffffff]]"
- # backend currently rejects seconds / fractional seconds
- dto_string = re.sub(
- r"([\+\-]\d{2})([\d\.]+)$", r"\1:\2", dto_string
- )
- return dto_string
-
- return process
-
-
-class _ODBCDateTime(_ODBCDateTimeBindProcessor, _MSDateTime):
- pass
-
-
-class _ODBCDATETIMEOFFSET(_ODBCDateTimeBindProcessor, DATETIMEOFFSET):
- has_tz = True
-
-
-class _VARBINARY_pyodbc(_ms_binary_pyodbc, VARBINARY):
- pass
-
-
-class _BINARY_pyodbc(_ms_binary_pyodbc, BINARY):
- pass
-
-
-class _String_pyodbc(sqltypes.String):
- def get_dbapi_type(self, dbapi):
- if self.length in (None, "max") or self.length >= 2000:
- return (dbapi.SQL_VARCHAR, 0, 0)
- else:
- return dbapi.SQL_VARCHAR
-
-
-class _Unicode_pyodbc(_MSUnicode):
- def get_dbapi_type(self, dbapi):
- if self.length in (None, "max") or self.length >= 2000:
- return (dbapi.SQL_WVARCHAR, 0, 0)
- else:
- return dbapi.SQL_WVARCHAR
-
-
-class _UnicodeText_pyodbc(_MSUnicodeText):
- def get_dbapi_type(self, dbapi):
- if self.length in (None, "max") or self.length >= 2000:
- return (dbapi.SQL_WVARCHAR, 0, 0)
- else:
- return dbapi.SQL_WVARCHAR
-
-
-class _JSON_pyodbc(_MSJson):
- def get_dbapi_type(self, dbapi):
- return (dbapi.SQL_WVARCHAR, 0, 0)
-
-
-class _JSONIndexType_pyodbc(_MSJsonIndexType):
- def get_dbapi_type(self, dbapi):
- return dbapi.SQL_WVARCHAR
-
-
-class _JSONPathType_pyodbc(_MSJsonPathType):
- def get_dbapi_type(self, dbapi):
- return dbapi.SQL_WVARCHAR
-
-
-class MSExecutionContext_pyodbc(MSExecutionContext):
- _embedded_scope_identity = False
-
- def pre_exec(self):
- """where appropriate, issue "select scope_identity()" in the same
- statement.
-
- Background on why "scope_identity()" is preferable to "@@identity":
- https://msdn.microsoft.com/en-us/library/ms190315.aspx
-
- Background on why we attempt to embed "scope_identity()" into the same
- statement as the INSERT:
- https://code.google.com/p/pyodbc/wiki/FAQs#How_do_I_retrieve_autogenerated/identity_values?
-
- """
-
- super().pre_exec()
-
- # don't embed the scope_identity select into an
- # "INSERT .. DEFAULT VALUES"
- if (
- self._select_lastrowid
- and self.dialect.use_scope_identity
- and len(self.parameters[0])
- ):
- self._embedded_scope_identity = True
-
- self.statement += "; select scope_identity()"
-
- def post_exec(self):
- if self._embedded_scope_identity:
- # Fetch the last inserted id from the manipulated statement
- # We may have to skip over a number of result sets with
- # no data (due to triggers, etc.)
- while True:
- try:
- # fetchall() ensures the cursor is consumed
- # without closing it (FreeTDS particularly)
- rows = self.cursor.fetchall()
- except self.dialect.dbapi.Error:
- # no way around this - nextset() consumes the previous set
- # so we need to just keep flipping
- self.cursor.nextset()
- else:
- if not rows:
- # async adapter drivers just return None here
- self.cursor.nextset()
- continue
- row = rows[0]
- break
-
- self._lastrowid = int(row[0])
-
- self.cursor_fetch_strategy = _cursor._NO_CURSOR_DML
- else:
- super().post_exec()
-
-
-class MSDialect_pyodbc(PyODBCConnector, MSDialect):
- supports_statement_cache = True
-
- # note this parameter is no longer used by the ORM or default dialect
- # see #9414
- supports_sane_rowcount_returning = False
-
- execution_ctx_cls = MSExecutionContext_pyodbc
-
- colspecs = util.update_copy(
- MSDialect.colspecs,
- {
- sqltypes.Numeric: _MSNumeric_pyodbc,
- sqltypes.Float: _MSFloat_pyodbc,
- BINARY: _BINARY_pyodbc,
- # support DateTime(timezone=True)
- sqltypes.DateTime: _ODBCDateTime,
- DATETIMEOFFSET: _ODBCDATETIMEOFFSET,
- # SQL Server dialect has a VARBINARY that is just to support
- # "deprecate_large_types" w/ VARBINARY(max), but also we must
- # handle the usual SQL standard VARBINARY
- VARBINARY: _VARBINARY_pyodbc,
- sqltypes.VARBINARY: _VARBINARY_pyodbc,
- sqltypes.LargeBinary: _VARBINARY_pyodbc,
- sqltypes.String: _String_pyodbc,
- sqltypes.Unicode: _Unicode_pyodbc,
- sqltypes.UnicodeText: _UnicodeText_pyodbc,
- sqltypes.JSON: _JSON_pyodbc,
- sqltypes.JSON.JSONIndexType: _JSONIndexType_pyodbc,
- sqltypes.JSON.JSONPathType: _JSONPathType_pyodbc,
- # this excludes Enum from the string/VARCHAR thing for now
- # it looks like Enum's adaptation doesn't really support the
- # String type itself having a dialect-level impl
- sqltypes.Enum: sqltypes.Enum,
- },
- )
-
- def __init__(
- self,
- fast_executemany=False,
- use_setinputsizes=True,
- **params,
- ):
- super().__init__(use_setinputsizes=use_setinputsizes, **params)
- self.use_scope_identity = (
- self.use_scope_identity
- and self.dbapi
- and hasattr(self.dbapi.Cursor, "nextset")
- )
- self._need_decimal_fix = self.dbapi and self._dbapi_version() < (
- 2,
- 1,
- 8,
- )
- self.fast_executemany = fast_executemany
- if fast_executemany:
- self.use_insertmanyvalues_wo_returning = False
-
- def _get_server_version_info(self, connection):
- try:
- # "Version of the instance of SQL Server, in the form
- # of 'major.minor.build.revision'"
- raw = connection.exec_driver_sql(
- "SELECT CAST(SERVERPROPERTY('ProductVersion') AS VARCHAR)"
- ).scalar()
- except exc.DBAPIError:
- # SQL Server docs indicate this function isn't present prior to
- # 2008. Before we had the VARCHAR cast above, pyodbc would also
- # fail on this query.
- return super()._get_server_version_info(connection)
- else:
- version = []
- r = re.compile(r"[.\-]")
- for n in r.split(raw):
- try:
- version.append(int(n))
- except ValueError:
- pass
- return tuple(version)
-
- def on_connect(self):
- super_ = super().on_connect()
-
- def on_connect(conn):
- if super_ is not None:
- super_(conn)
-
- self._setup_timestampoffset_type(conn)
-
- return on_connect
-
- def _setup_timestampoffset_type(self, connection):
- # output converter function for datetimeoffset
- def _handle_datetimeoffset(dto_value):
- tup = struct.unpack("<6hI2h", dto_value)
- return datetime.datetime(
- tup[0],
- tup[1],
- tup[2],
- tup[3],
- tup[4],
- tup[5],
- tup[6] // 1000,
- datetime.timezone(
- datetime.timedelta(hours=tup[7], minutes=tup[8])
- ),
- )
-
- odbc_SQL_SS_TIMESTAMPOFFSET = -155 # as defined in SQLNCLI.h
- connection.add_output_converter(
- odbc_SQL_SS_TIMESTAMPOFFSET, _handle_datetimeoffset
- )
-
- def do_executemany(self, cursor, statement, parameters, context=None):
- if self.fast_executemany:
- cursor.fast_executemany = True
- super().do_executemany(cursor, statement, parameters, context=context)
-
- def is_disconnect(self, e, connection, cursor):
- if isinstance(e, self.dbapi.Error):
- code = e.args[0]
- if code in {
- "08S01",
- "01000",
- "01002",
- "08003",
- "08007",
- "08S02",
- "08001",
- "HYT00",
- "HY010",
- "10054",
- }:
- return True
- return super().is_disconnect(e, connection, cursor)
-
-
-dialect = MSDialect_pyodbc