diff options
author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 |
---|---|---|
committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 |
commit | 6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch) | |
tree | b1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql | |
parent | 4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff) |
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql')
16 files changed, 5571 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__init__.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__init__.py new file mode 100644 index 0000000..19ab7c4 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__init__.py @@ -0,0 +1,88 @@ +# dialects/mssql/__init__.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + +from . import aioodbc # noqa +from . import base # noqa +from . import pymssql # noqa +from . import pyodbc # noqa +from .base import BIGINT +from .base import BINARY +from .base import BIT +from .base import CHAR +from .base import DATE +from .base import DATETIME +from .base import DATETIME2 +from .base import DATETIMEOFFSET +from .base import DECIMAL +from .base import DOUBLE_PRECISION +from .base import FLOAT +from .base import IMAGE +from .base import INTEGER +from .base import JSON +from .base import MONEY +from .base import NCHAR +from .base import NTEXT +from .base import NUMERIC +from .base import NVARCHAR +from .base import REAL +from .base import ROWVERSION +from .base import SMALLDATETIME +from .base import SMALLINT +from .base import SMALLMONEY +from .base import SQL_VARIANT +from .base import TEXT +from .base import TIME +from .base import TIMESTAMP +from .base import TINYINT +from .base import UNIQUEIDENTIFIER +from .base import VARBINARY +from .base import VARCHAR +from .base import XML +from ...sql import try_cast + + +base.dialect = dialect = pyodbc.dialect + + +__all__ = ( + "JSON", + "INTEGER", + "BIGINT", + "SMALLINT", + "TINYINT", + "VARCHAR", + "NVARCHAR", + "CHAR", + "NCHAR", + "TEXT", + "NTEXT", + "DECIMAL", + "NUMERIC", + "FLOAT", + "DATETIME", + "DATETIME2", + "DATETIMEOFFSET", + "DATE", + "DOUBLE_PRECISION", + "TIME", + "SMALLDATETIME", + "BINARY", + "VARBINARY", + "BIT", + "REAL", + "IMAGE", + "TIMESTAMP", + "ROWVERSION", + "MONEY", + "SMALLMONEY", + "UNIQUEIDENTIFIER", + "SQL_VARIANT", + "XML", + "dialect", + "try_cast", +) diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..225eebc --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/aioodbc.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/aioodbc.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..3a0585c --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/aioodbc.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/base.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..527fcdf --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/base.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/information_schema.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/information_schema.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..88bc790 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/information_schema.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/json.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/json.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..6312e58 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/json.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/provision.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/provision.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..fd72045 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/provision.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/pymssql.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/pymssql.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..84555dc --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/pymssql.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/pyodbc.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/pyodbc.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..19ecd43 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/__pycache__/pyodbc.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/aioodbc.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/aioodbc.py new file mode 100644 index 0000000..65945d9 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/aioodbc.py @@ -0,0 +1,64 @@ +# dialects/mssql/aioodbc.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors +r""" +.. dialect:: mssql+aioodbc + :name: aioodbc + :dbapi: aioodbc + :connectstring: mssql+aioodbc://<username>:<password>@<dsnname> + :url: https://pypi.org/project/aioodbc/ + + +Support for the SQL Server database in asyncio style, using the aioodbc +driver which itself is a thread-wrapper around pyodbc. + +.. versionadded:: 2.0.23 Added the mssql+aioodbc dialect which builds + on top of the pyodbc and general aio* dialect architecture. + +Using a special asyncio mediation layer, the aioodbc dialect is usable +as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>` +extension package. + +Most behaviors and caveats for this driver are the same as that of the +pyodbc dialect used on SQL Server; see :ref:`mssql_pyodbc` for general +background. + +This dialect should normally be used only with the +:func:`_asyncio.create_async_engine` engine creation function; connection +styles are otherwise equivalent to those documented in the pyodbc section:: + + from sqlalchemy.ext.asyncio import create_async_engine + engine = create_async_engine( + "mssql+aioodbc://scott:tiger@mssql2017:1433/test?" + "driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes" + ) + + + +""" + +from __future__ import annotations + +from .pyodbc import MSDialect_pyodbc +from .pyodbc import MSExecutionContext_pyodbc +from ...connectors.aioodbc import aiodbcConnector + + +class MSExecutionContext_aioodbc(MSExecutionContext_pyodbc): + def create_server_side_cursor(self): + return self._dbapi_connection.cursor(server_side=True) + + +class MSDialectAsync_aioodbc(aiodbcConnector, MSDialect_pyodbc): + driver = "aioodbc" + + supports_statement_cache = True + + execution_ctx_cls = MSExecutionContext_aioodbc + + +dialect = MSDialectAsync_aioodbc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/base.py new file mode 100644 index 0000000..872f858 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/base.py @@ -0,0 +1,4007 @@ +# dialects/mssql/base.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + +""" +.. dialect:: mssql + :name: Microsoft SQL Server + :full_support: 2017 + :normal_support: 2012+ + :best_effort: 2005+ + +.. _mssql_external_dialects: + +External Dialects +----------------- + +In addition to the above DBAPI layers with native SQLAlchemy support, there +are third-party dialects for other DBAPI layers that are compatible +with SQL Server. See the "External Dialects" list on the +:ref:`dialect_toplevel` page. + +.. _mssql_identity: + +Auto Increment Behavior / IDENTITY Columns +------------------------------------------ + +SQL Server provides so-called "auto incrementing" behavior using the +``IDENTITY`` construct, which can be placed on any single integer column in a +table. SQLAlchemy considers ``IDENTITY`` within its default "autoincrement" +behavior for an integer primary key column, described at +:paramref:`_schema.Column.autoincrement`. This means that by default, +the first integer primary key column in a :class:`_schema.Table` will be +considered to be the identity column - unless it is associated with a +:class:`.Sequence` - and will generate DDL as such:: + + from sqlalchemy import Table, MetaData, Column, Integer + + m = MetaData() + t = Table('t', m, + Column('id', Integer, primary_key=True), + Column('x', Integer)) + m.create_all(engine) + +The above example will generate DDL as: + +.. sourcecode:: sql + + CREATE TABLE t ( + id INTEGER NOT NULL IDENTITY, + x INTEGER NULL, + PRIMARY KEY (id) + ) + +For the case where this default generation of ``IDENTITY`` is not desired, +specify ``False`` for the :paramref:`_schema.Column.autoincrement` flag, +on the first integer primary key column:: + + m = MetaData() + t = Table('t', m, + Column('id', Integer, primary_key=True, autoincrement=False), + Column('x', Integer)) + m.create_all(engine) + +To add the ``IDENTITY`` keyword to a non-primary key column, specify +``True`` for the :paramref:`_schema.Column.autoincrement` flag on the desired +:class:`_schema.Column` object, and ensure that +:paramref:`_schema.Column.autoincrement` +is set to ``False`` on any integer primary key column:: + + m = MetaData() + t = Table('t', m, + Column('id', Integer, primary_key=True, autoincrement=False), + Column('x', Integer, autoincrement=True)) + m.create_all(engine) + +.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct + in a :class:`_schema.Column` to specify the start and increment + parameters of an IDENTITY. These replace + the use of the :class:`.Sequence` object in order to specify these values. + +.. deprecated:: 1.4 + + The ``mssql_identity_start`` and ``mssql_identity_increment`` parameters + to :class:`_schema.Column` are deprecated and should we replaced by + an :class:`_schema.Identity` object. Specifying both ways of configuring + an IDENTITY will result in a compile error. + These options are also no longer returned as part of the + ``dialect_options`` key in :meth:`_reflection.Inspector.get_columns`. + Use the information in the ``identity`` key instead. + +.. deprecated:: 1.3 + + The use of :class:`.Sequence` to specify IDENTITY characteristics is + deprecated and will be removed in a future release. Please use + the :class:`_schema.Identity` object parameters + :paramref:`_schema.Identity.start` and + :paramref:`_schema.Identity.increment`. + +.. versionchanged:: 1.4 Removed the ability to use a :class:`.Sequence` + object to modify IDENTITY characteristics. :class:`.Sequence` objects + now only manipulate true T-SQL SEQUENCE types. + +.. note:: + + There can only be one IDENTITY column on the table. When using + ``autoincrement=True`` to enable the IDENTITY keyword, SQLAlchemy does not + guard against multiple columns specifying the option simultaneously. The + SQL Server database will instead reject the ``CREATE TABLE`` statement. + +.. note:: + + An INSERT statement which attempts to provide a value for a column that is + marked with IDENTITY will be rejected by SQL Server. In order for the + value to be accepted, a session-level option "SET IDENTITY_INSERT" must be + enabled. The SQLAlchemy SQL Server dialect will perform this operation + automatically when using a core :class:`_expression.Insert` + construct; if the + execution specifies a value for the IDENTITY column, the "IDENTITY_INSERT" + option will be enabled for the span of that statement's invocation.However, + this scenario is not high performing and should not be relied upon for + normal use. If a table doesn't actually require IDENTITY behavior in its + integer primary key column, the keyword should be disabled when creating + the table by ensuring that ``autoincrement=False`` is set. + +Controlling "Start" and "Increment" +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Specific control over the "start" and "increment" values for +the ``IDENTITY`` generator are provided using the +:paramref:`_schema.Identity.start` and :paramref:`_schema.Identity.increment` +parameters passed to the :class:`_schema.Identity` object:: + + from sqlalchemy import Table, Integer, Column, Identity + + test = Table( + 'test', metadata, + Column( + 'id', + Integer, + primary_key=True, + Identity(start=100, increment=10) + ), + Column('name', String(20)) + ) + +The CREATE TABLE for the above :class:`_schema.Table` object would be: + +.. sourcecode:: sql + + CREATE TABLE test ( + id INTEGER NOT NULL IDENTITY(100,10) PRIMARY KEY, + name VARCHAR(20) NULL, + ) + +.. note:: + + The :class:`_schema.Identity` object supports many other parameter in + addition to ``start`` and ``increment``. These are not supported by + SQL Server and will be ignored when generating the CREATE TABLE ddl. + +.. versionchanged:: 1.3.19 The :class:`_schema.Identity` object is + now used to affect the + ``IDENTITY`` generator for a :class:`_schema.Column` under SQL Server. + Previously, the :class:`.Sequence` object was used. As SQL Server now + supports real sequences as a separate construct, :class:`.Sequence` will be + functional in the normal way starting from SQLAlchemy version 1.4. + + +Using IDENTITY with Non-Integer numeric types +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +SQL Server also allows ``IDENTITY`` to be used with ``NUMERIC`` columns. To +implement this pattern smoothly in SQLAlchemy, the primary datatype of the +column should remain as ``Integer``, however the underlying implementation +type deployed to the SQL Server database can be specified as ``Numeric`` using +:meth:`.TypeEngine.with_variant`:: + + from sqlalchemy import Column + from sqlalchemy import Integer + from sqlalchemy import Numeric + from sqlalchemy import String + from sqlalchemy.ext.declarative import declarative_base + + Base = declarative_base() + + class TestTable(Base): + __tablename__ = "test" + id = Column( + Integer().with_variant(Numeric(10, 0), "mssql"), + primary_key=True, + autoincrement=True, + ) + name = Column(String) + +In the above example, ``Integer().with_variant()`` provides clear usage +information that accurately describes the intent of the code. The general +restriction that ``autoincrement`` only applies to ``Integer`` is established +at the metadata level and not at the per-dialect level. + +When using the above pattern, the primary key identifier that comes back from +the insertion of a row, which is also the value that would be assigned to an +ORM object such as ``TestTable`` above, will be an instance of ``Decimal()`` +and not ``int`` when using SQL Server. The numeric return type of the +:class:`_types.Numeric` type can be changed to return floats by passing False +to :paramref:`_types.Numeric.asdecimal`. To normalize the return type of the +above ``Numeric(10, 0)`` to return Python ints (which also support "long" +integer values in Python 3), use :class:`_types.TypeDecorator` as follows:: + + from sqlalchemy import TypeDecorator + + class NumericAsInteger(TypeDecorator): + '''normalize floating point return values into ints''' + + impl = Numeric(10, 0, asdecimal=False) + cache_ok = True + + def process_result_value(self, value, dialect): + if value is not None: + value = int(value) + return value + + class TestTable(Base): + __tablename__ = "test" + id = Column( + Integer().with_variant(NumericAsInteger, "mssql"), + primary_key=True, + autoincrement=True, + ) + name = Column(String) + +.. _mssql_insert_behavior: + +INSERT behavior +^^^^^^^^^^^^^^^^ + +Handling of the ``IDENTITY`` column at INSERT time involves two key +techniques. The most common is being able to fetch the "last inserted value" +for a given ``IDENTITY`` column, a process which SQLAlchemy performs +implicitly in many cases, most importantly within the ORM. + +The process for fetching this value has several variants: + +* In the vast majority of cases, RETURNING is used in conjunction with INSERT + statements on SQL Server in order to get newly generated primary key values: + + .. sourcecode:: sql + + INSERT INTO t (x) OUTPUT inserted.id VALUES (?) + + As of SQLAlchemy 2.0, the :ref:`engine_insertmanyvalues` feature is also + used by default to optimize many-row INSERT statements; for SQL Server + the feature takes place for both RETURNING and-non RETURNING + INSERT statements. + + .. versionchanged:: 2.0.10 The :ref:`engine_insertmanyvalues` feature for + SQL Server was temporarily disabled for SQLAlchemy version 2.0.9 due to + issues with row ordering. As of 2.0.10 the feature is re-enabled, with + special case handling for the unit of work's requirement for RETURNING to + be ordered. + +* When RETURNING is not available or has been disabled via + ``implicit_returning=False``, either the ``scope_identity()`` function or + the ``@@identity`` variable is used; behavior varies by backend: + + * when using PyODBC, the phrase ``; select scope_identity()`` will be + appended to the end of the INSERT statement; a second result set will be + fetched in order to receive the value. Given a table as:: + + t = Table( + 't', + metadata, + Column('id', Integer, primary_key=True), + Column('x', Integer), + implicit_returning=False + ) + + an INSERT will look like: + + .. sourcecode:: sql + + INSERT INTO t (x) VALUES (?); select scope_identity() + + * Other dialects such as pymssql will call upon + ``SELECT scope_identity() AS lastrowid`` subsequent to an INSERT + statement. If the flag ``use_scope_identity=False`` is passed to + :func:`_sa.create_engine`, + the statement ``SELECT @@identity AS lastrowid`` + is used instead. + +A table that contains an ``IDENTITY`` column will prohibit an INSERT statement +that refers to the identity column explicitly. The SQLAlchemy dialect will +detect when an INSERT construct, created using a core +:func:`_expression.insert` +construct (not a plain string SQL), refers to the identity column, and +in this case will emit ``SET IDENTITY_INSERT ON`` prior to the insert +statement proceeding, and ``SET IDENTITY_INSERT OFF`` subsequent to the +execution. Given this example:: + + m = MetaData() + t = Table('t', m, Column('id', Integer, primary_key=True), + Column('x', Integer)) + m.create_all(engine) + + with engine.begin() as conn: + conn.execute(t.insert(), {'id': 1, 'x':1}, {'id':2, 'x':2}) + +The above column will be created with IDENTITY, however the INSERT statement +we emit is specifying explicit values. In the echo output we can see +how SQLAlchemy handles this: + +.. sourcecode:: sql + + CREATE TABLE t ( + id INTEGER NOT NULL IDENTITY(1,1), + x INTEGER NULL, + PRIMARY KEY (id) + ) + + COMMIT + SET IDENTITY_INSERT t ON + INSERT INTO t (id, x) VALUES (?, ?) + ((1, 1), (2, 2)) + SET IDENTITY_INSERT t OFF + COMMIT + + + +This is an auxiliary use case suitable for testing and bulk insert scenarios. + +SEQUENCE support +---------------- + +The :class:`.Sequence` object creates "real" sequences, i.e., +``CREATE SEQUENCE``: + +.. sourcecode:: pycon+sql + + >>> from sqlalchemy import Sequence + >>> from sqlalchemy.schema import CreateSequence + >>> from sqlalchemy.dialects import mssql + >>> print(CreateSequence(Sequence("my_seq", start=1)).compile(dialect=mssql.dialect())) + {printsql}CREATE SEQUENCE my_seq START WITH 1 + +For integer primary key generation, SQL Server's ``IDENTITY`` construct should +generally be preferred vs. sequence. + +.. tip:: + + The default start value for T-SQL is ``-2**63`` instead of 1 as + in most other SQL databases. Users should explicitly set the + :paramref:`.Sequence.start` to 1 if that's the expected default:: + + seq = Sequence("my_sequence", start=1) + +.. versionadded:: 1.4 added SQL Server support for :class:`.Sequence` + +.. versionchanged:: 2.0 The SQL Server dialect will no longer implicitly + render "START WITH 1" for ``CREATE SEQUENCE``, which was the behavior + first implemented in version 1.4. + +MAX on VARCHAR / NVARCHAR +------------------------- + +SQL Server supports the special string "MAX" within the +:class:`_types.VARCHAR` and :class:`_types.NVARCHAR` datatypes, +to indicate "maximum length possible". The dialect currently handles this as +a length of "None" in the base type, rather than supplying a +dialect-specific version of these types, so that a base type +specified such as ``VARCHAR(None)`` can assume "unlengthed" behavior on +more than one backend without using dialect-specific types. + +To build a SQL Server VARCHAR or NVARCHAR with MAX length, use None:: + + my_table = Table( + 'my_table', metadata, + Column('my_data', VARCHAR(None)), + Column('my_n_data', NVARCHAR(None)) + ) + + +Collation Support +----------------- + +Character collations are supported by the base string types, +specified by the string argument "collation":: + + from sqlalchemy import VARCHAR + Column('login', VARCHAR(32, collation='Latin1_General_CI_AS')) + +When such a column is associated with a :class:`_schema.Table`, the +CREATE TABLE statement for this column will yield:: + + login VARCHAR(32) COLLATE Latin1_General_CI_AS NULL + +LIMIT/OFFSET Support +-------------------- + +MSSQL has added support for LIMIT / OFFSET as of SQL Server 2012, via the +"OFFSET n ROWS" and "FETCH NEXT n ROWS" clauses. SQLAlchemy supports these +syntaxes automatically if SQL Server 2012 or greater is detected. + +.. versionchanged:: 1.4 support added for SQL Server "OFFSET n ROWS" and + "FETCH NEXT n ROWS" syntax. + +For statements that specify only LIMIT and no OFFSET, all versions of SQL +Server support the TOP keyword. This syntax is used for all SQL Server +versions when no OFFSET clause is present. A statement such as:: + + select(some_table).limit(5) + +will render similarly to:: + + SELECT TOP 5 col1, col2.. FROM table + +For versions of SQL Server prior to SQL Server 2012, a statement that uses +LIMIT and OFFSET, or just OFFSET alone, will be rendered using the +``ROW_NUMBER()`` window function. A statement such as:: + + select(some_table).order_by(some_table.c.col3).limit(5).offset(10) + +will render similarly to:: + + SELECT anon_1.col1, anon_1.col2 FROM (SELECT col1, col2, + ROW_NUMBER() OVER (ORDER BY col3) AS + mssql_rn FROM table WHERE t.x = :x_1) AS + anon_1 WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1 + +Note that when using LIMIT and/or OFFSET, whether using the older +or newer SQL Server syntaxes, the statement must have an ORDER BY as well, +else a :class:`.CompileError` is raised. + +.. _mssql_comment_support: + +DDL Comment Support +-------------------- + +Comment support, which includes DDL rendering for attributes such as +:paramref:`_schema.Table.comment` and :paramref:`_schema.Column.comment`, as +well as the ability to reflect these comments, is supported assuming a +supported version of SQL Server is in use. If a non-supported version such as +Azure Synapse is detected at first-connect time (based on the presence +of the ``fn_listextendedproperty`` SQL function), comment support including +rendering and table-comment reflection is disabled, as both features rely upon +SQL Server stored procedures and functions that are not available on all +backend types. + +To force comment support to be on or off, bypassing autodetection, set the +parameter ``supports_comments`` within :func:`_sa.create_engine`:: + + e = create_engine("mssql+pyodbc://u:p@dsn", supports_comments=False) + +.. versionadded:: 2.0 Added support for table and column comments for + the SQL Server dialect, including DDL generation and reflection. + +.. _mssql_isolation_level: + +Transaction Isolation Level +--------------------------- + +All SQL Server dialects support setting of transaction isolation level +both via a dialect-specific parameter +:paramref:`_sa.create_engine.isolation_level` +accepted by :func:`_sa.create_engine`, +as well as the :paramref:`.Connection.execution_options.isolation_level` +argument as passed to +:meth:`_engine.Connection.execution_options`. +This feature works by issuing the +command ``SET TRANSACTION ISOLATION LEVEL <level>`` for +each new connection. + +To set isolation level using :func:`_sa.create_engine`:: + + engine = create_engine( + "mssql+pyodbc://scott:tiger@ms_2008", + isolation_level="REPEATABLE READ" + ) + +To set using per-connection execution options:: + + connection = engine.connect() + connection = connection.execution_options( + isolation_level="READ COMMITTED" + ) + +Valid values for ``isolation_level`` include: + +* ``AUTOCOMMIT`` - pyodbc / pymssql-specific +* ``READ COMMITTED`` +* ``READ UNCOMMITTED`` +* ``REPEATABLE READ`` +* ``SERIALIZABLE`` +* ``SNAPSHOT`` - specific to SQL Server + +There are also more options for isolation level configurations, such as +"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply +different isolation level settings. See the discussion at +:ref:`dbapi_autocommit` for background. + +.. seealso:: + + :ref:`dbapi_autocommit` + +.. _mssql_reset_on_return: + +Temporary Table / Resource Reset for Connection Pooling +------------------------------------------------------- + +The :class:`.QueuePool` connection pool implementation used +by the SQLAlchemy :class:`.Engine` object includes +:ref:`reset on return <pool_reset_on_return>` behavior that will invoke +the DBAPI ``.rollback()`` method when connections are returned to the pool. +While this rollback will clear out the immediate state used by the previous +transaction, it does not cover a wider range of session-level state, including +temporary tables as well as other server state such as prepared statement +handles and statement caches. An undocumented SQL Server procedure known +as ``sp_reset_connection`` is known to be a workaround for this issue which +will reset most of the session state that builds up on a connection, including +temporary tables. + +To install ``sp_reset_connection`` as the means of performing reset-on-return, +the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated in the +example below. The :paramref:`_sa.create_engine.pool_reset_on_return` parameter +is set to ``None`` so that the custom scheme can replace the default behavior +completely. The custom hook implementation calls ``.rollback()`` in any case, +as it's usually important that the DBAPI's own tracking of commit/rollback +will remain consistent with the state of the transaction:: + + from sqlalchemy import create_engine + from sqlalchemy import event + + mssql_engine = create_engine( + "mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server", + + # disable default reset-on-return scheme + pool_reset_on_return=None, + ) + + + @event.listens_for(mssql_engine, "reset") + def _reset_mssql(dbapi_connection, connection_record, reset_state): + if not reset_state.terminate_only: + dbapi_connection.execute("{call sys.sp_reset_connection}") + + # so that the DBAPI itself knows that the connection has been + # reset + dbapi_connection.rollback() + +.. versionchanged:: 2.0.0b3 Added additional state arguments to + the :meth:`.PoolEvents.reset` event and additionally ensured the event + is invoked for all "reset" occurrences, so that it's appropriate + as a place for custom "reset" handlers. Previous schemes which + use the :meth:`.PoolEvents.checkin` handler remain usable as well. + +.. seealso:: + + :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation + +Nullability +----------- +MSSQL has support for three levels of column nullability. The default +nullability allows nulls and is explicit in the CREATE TABLE +construct:: + + name VARCHAR(20) NULL + +If ``nullable=None`` is specified then no specification is made. In +other words the database's configured default is used. This will +render:: + + name VARCHAR(20) + +If ``nullable`` is ``True`` or ``False`` then the column will be +``NULL`` or ``NOT NULL`` respectively. + +Date / Time Handling +-------------------- +DATE and TIME are supported. Bind parameters are converted +to datetime.datetime() objects as required by most MSSQL drivers, +and results are processed from strings if needed. +The DATE and TIME types are not available for MSSQL 2005 and +previous - if a server version below 2008 is detected, DDL +for these types will be issued as DATETIME. + +.. _mssql_large_type_deprecation: + +Large Text/Binary Type Deprecation +---------------------------------- + +Per +`SQL Server 2012/2014 Documentation <https://technet.microsoft.com/en-us/library/ms187993.aspx>`_, +the ``NTEXT``, ``TEXT`` and ``IMAGE`` datatypes are to be removed from SQL +Server in a future release. SQLAlchemy normally relates these types to the +:class:`.UnicodeText`, :class:`_expression.TextClause` and +:class:`.LargeBinary` datatypes. + +In order to accommodate this change, a new flag ``deprecate_large_types`` +is added to the dialect, which will be automatically set based on detection +of the server version in use, if not otherwise set by the user. The +behavior of this flag is as follows: + +* When this flag is ``True``, the :class:`.UnicodeText`, + :class:`_expression.TextClause` and + :class:`.LargeBinary` datatypes, when used to render DDL, will render the + types ``NVARCHAR(max)``, ``VARCHAR(max)``, and ``VARBINARY(max)``, + respectively. This is a new behavior as of the addition of this flag. + +* When this flag is ``False``, the :class:`.UnicodeText`, + :class:`_expression.TextClause` and + :class:`.LargeBinary` datatypes, when used to render DDL, will render the + types ``NTEXT``, ``TEXT``, and ``IMAGE``, + respectively. This is the long-standing behavior of these types. + +* The flag begins with the value ``None``, before a database connection is + established. If the dialect is used to render DDL without the flag being + set, it is interpreted the same as ``False``. + +* On first connection, the dialect detects if SQL Server version 2012 or + greater is in use; if the flag is still at ``None``, it sets it to ``True`` + or ``False`` based on whether 2012 or greater is detected. + +* The flag can be set to either ``True`` or ``False`` when the dialect + is created, typically via :func:`_sa.create_engine`:: + + eng = create_engine("mssql+pymssql://user:pass@host/db", + deprecate_large_types=True) + +* Complete control over whether the "old" or "new" types are rendered is + available in all SQLAlchemy versions by using the UPPERCASE type objects + instead: :class:`_types.NVARCHAR`, :class:`_types.VARCHAR`, + :class:`_types.VARBINARY`, :class:`_types.TEXT`, :class:`_mssql.NTEXT`, + :class:`_mssql.IMAGE` + will always remain fixed and always output exactly that + type. + +.. _multipart_schema_names: + +Multipart Schema Names +---------------------- + +SQL Server schemas sometimes require multiple parts to their "schema" +qualifier, that is, including the database name and owner name as separate +tokens, such as ``mydatabase.dbo.some_table``. These multipart names can be set +at once using the :paramref:`_schema.Table.schema` argument of +:class:`_schema.Table`:: + + Table( + "some_table", metadata, + Column("q", String(50)), + schema="mydatabase.dbo" + ) + +When performing operations such as table or component reflection, a schema +argument that contains a dot will be split into separate +"database" and "owner" components in order to correctly query the SQL +Server information schema tables, as these two values are stored separately. +Additionally, when rendering the schema name for DDL or SQL, the two +components will be quoted separately for case sensitive names and other +special characters. Given an argument as below:: + + Table( + "some_table", metadata, + Column("q", String(50)), + schema="MyDataBase.dbo" + ) + +The above schema would be rendered as ``[MyDataBase].dbo``, and also in +reflection, would be reflected using "dbo" as the owner and "MyDataBase" +as the database name. + +To control how the schema name is broken into database / owner, +specify brackets (which in SQL Server are quoting characters) in the name. +Below, the "owner" will be considered as ``MyDataBase.dbo`` and the +"database" will be None:: + + Table( + "some_table", metadata, + Column("q", String(50)), + schema="[MyDataBase.dbo]" + ) + +To individually specify both database and owner name with special characters +or embedded dots, use two sets of brackets:: + + Table( + "some_table", metadata, + Column("q", String(50)), + schema="[MyDataBase.Period].[MyOwner.Dot]" + ) + + +.. versionchanged:: 1.2 the SQL Server dialect now treats brackets as + identifier delimiters splitting the schema into separate database + and owner tokens, to allow dots within either name itself. + +.. _legacy_schema_rendering: + +Legacy Schema Mode +------------------ + +Very old versions of the MSSQL dialect introduced the behavior such that a +schema-qualified table would be auto-aliased when used in a +SELECT statement; given a table:: + + account_table = Table( + 'account', metadata, + Column('id', Integer, primary_key=True), + Column('info', String(100)), + schema="customer_schema" + ) + +this legacy mode of rendering would assume that "customer_schema.account" +would not be accepted by all parts of the SQL statement, as illustrated +below: + +.. sourcecode:: pycon+sql + + >>> eng = create_engine("mssql+pymssql://mydsn", legacy_schema_aliasing=True) + >>> print(account_table.select().compile(eng)) + {printsql}SELECT account_1.id, account_1.info + FROM customer_schema.account AS account_1 + +This mode of behavior is now off by default, as it appears to have served +no purpose; however in the case that legacy applications rely upon it, +it is available using the ``legacy_schema_aliasing`` argument to +:func:`_sa.create_engine` as illustrated above. + +.. deprecated:: 1.4 + + The ``legacy_schema_aliasing`` flag is now + deprecated and will be removed in a future release. + +.. _mssql_indexes: + +Clustered Index Support +----------------------- + +The MSSQL dialect supports clustered indexes (and primary keys) via the +``mssql_clustered`` option. This option is available to :class:`.Index`, +:class:`.UniqueConstraint`. and :class:`.PrimaryKeyConstraint`. +For indexes this option can be combined with the ``mssql_columnstore`` one +to create a clustered columnstore index. + +To generate a clustered index:: + + Index("my_index", table.c.x, mssql_clustered=True) + +which renders the index as ``CREATE CLUSTERED INDEX my_index ON table (x)``. + +To generate a clustered primary key use:: + + Table('my_table', metadata, + Column('x', ...), + Column('y', ...), + PrimaryKeyConstraint("x", "y", mssql_clustered=True)) + +which will render the table, for example, as:: + + CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL, + PRIMARY KEY CLUSTERED (x, y)) + +Similarly, we can generate a clustered unique constraint using:: + + Table('my_table', metadata, + Column('x', ...), + Column('y', ...), + PrimaryKeyConstraint("x"), + UniqueConstraint("y", mssql_clustered=True), + ) + +To explicitly request a non-clustered primary key (for example, when +a separate clustered index is desired), use:: + + Table('my_table', metadata, + Column('x', ...), + Column('y', ...), + PrimaryKeyConstraint("x", "y", mssql_clustered=False)) + +which will render the table, for example, as:: + + CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL, + PRIMARY KEY NONCLUSTERED (x, y)) + +Columnstore Index Support +------------------------- + +The MSSQL dialect supports columnstore indexes via the ``mssql_columnstore`` +option. This option is available to :class:`.Index`. It be combined with +the ``mssql_clustered`` option to create a clustered columnstore index. + +To generate a columnstore index:: + + Index("my_index", table.c.x, mssql_columnstore=True) + +which renders the index as ``CREATE COLUMNSTORE INDEX my_index ON table (x)``. + +To generate a clustered columnstore index provide no columns:: + + idx = Index("my_index", mssql_clustered=True, mssql_columnstore=True) + # required to associate the index with the table + table.append_constraint(idx) + +the above renders the index as +``CREATE CLUSTERED COLUMNSTORE INDEX my_index ON table``. + +.. versionadded:: 2.0.18 + +MSSQL-Specific Index Options +----------------------------- + +In addition to clustering, the MSSQL dialect supports other special options +for :class:`.Index`. + +INCLUDE +^^^^^^^ + +The ``mssql_include`` option renders INCLUDE(colname) for the given string +names:: + + Index("my_index", table.c.x, mssql_include=['y']) + +would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)`` + +.. _mssql_index_where: + +Filtered Indexes +^^^^^^^^^^^^^^^^ + +The ``mssql_where`` option renders WHERE(condition) for the given string +names:: + + Index("my_index", table.c.x, mssql_where=table.c.x > 10) + +would render the index as ``CREATE INDEX my_index ON table (x) WHERE x > 10``. + +.. versionadded:: 1.3.4 + +Index ordering +^^^^^^^^^^^^^^ + +Index ordering is available via functional expressions, such as:: + + Index("my_index", table.c.x.desc()) + +would render the index as ``CREATE INDEX my_index ON table (x DESC)`` + +.. seealso:: + + :ref:`schema_indexes_functional` + +Compatibility Levels +-------------------- +MSSQL supports the notion of setting compatibility levels at the +database level. This allows, for instance, to run a database that +is compatible with SQL2000 while running on a SQL2005 database +server. ``server_version_info`` will always return the database +server version information (in this case SQL2005) and not the +compatibility level information. Because of this, if running under +a backwards compatibility mode SQLAlchemy may attempt to use T-SQL +statements that are unable to be parsed by the database server. + +.. _mssql_triggers: + +Triggers +-------- + +SQLAlchemy by default uses OUTPUT INSERTED to get at newly +generated primary key values via IDENTITY columns or other +server side defaults. MS-SQL does not +allow the usage of OUTPUT INSERTED on tables that have triggers. +To disable the usage of OUTPUT INSERTED on a per-table basis, +specify ``implicit_returning=False`` for each :class:`_schema.Table` +which has triggers:: + + Table('mytable', metadata, + Column('id', Integer, primary_key=True), + # ..., + implicit_returning=False + ) + +Declarative form:: + + class MyClass(Base): + # ... + __table_args__ = {'implicit_returning':False} + + +.. _mssql_rowcount_versioning: + +Rowcount Support / ORM Versioning +--------------------------------- + +The SQL Server drivers may have limited ability to return the number +of rows updated from an UPDATE or DELETE statement. + +As of this writing, the PyODBC driver is not able to return a rowcount when +OUTPUT INSERTED is used. Previous versions of SQLAlchemy therefore had +limitations for features such as the "ORM Versioning" feature that relies upon +accurate rowcounts in order to match version numbers with matched rows. + +SQLAlchemy 2.0 now retrieves the "rowcount" manually for these particular use +cases based on counting the rows that arrived back within RETURNING; so while +the driver still has this limitation, the ORM Versioning feature is no longer +impacted by it. As of SQLAlchemy 2.0.5, ORM versioning has been fully +re-enabled for the pyodbc driver. + +.. versionchanged:: 2.0.5 ORM versioning support is restored for the pyodbc + driver. Previously, a warning would be emitted during ORM flush that + versioning was not supported. + + +Enabling Snapshot Isolation +--------------------------- + +SQL Server has a default transaction +isolation mode that locks entire tables, and causes even mildly concurrent +applications to have long held locks and frequent deadlocks. +Enabling snapshot isolation for the database as a whole is recommended +for modern levels of concurrency support. This is accomplished via the +following ALTER DATABASE commands executed at the SQL prompt:: + + ALTER DATABASE MyDatabase SET ALLOW_SNAPSHOT_ISOLATION ON + + ALTER DATABASE MyDatabase SET READ_COMMITTED_SNAPSHOT ON + +Background on SQL Server snapshot isolation is available at +https://msdn.microsoft.com/en-us/library/ms175095.aspx. + +""" # noqa + +from __future__ import annotations + +import codecs +import datetime +import operator +import re +from typing import overload +from typing import TYPE_CHECKING +from uuid import UUID as _python_UUID + +from . import information_schema as ischema +from .json import JSON +from .json import JSONIndexType +from .json import JSONPathType +from ... import exc +from ... import Identity +from ... import schema as sa_schema +from ... import Sequence +from ... import sql +from ... import text +from ... import util +from ...engine import cursor as _cursor +from ...engine import default +from ...engine import reflection +from ...engine.reflection import ReflectionDefaults +from ...sql import coercions +from ...sql import compiler +from ...sql import elements +from ...sql import expression +from ...sql import func +from ...sql import quoted_name +from ...sql import roles +from ...sql import sqltypes +from ...sql import try_cast as try_cast # noqa: F401 +from ...sql import util as sql_util +from ...sql._typing import is_sql_compiler +from ...sql.compiler import InsertmanyvaluesSentinelOpts +from ...sql.elements import TryCast as TryCast # noqa: F401 +from ...types import BIGINT +from ...types import BINARY +from ...types import CHAR +from ...types import DATE +from ...types import DATETIME +from ...types import DECIMAL +from ...types import FLOAT +from ...types import INTEGER +from ...types import NCHAR +from ...types import NUMERIC +from ...types import NVARCHAR +from ...types import SMALLINT +from ...types import TEXT +from ...types import VARCHAR +from ...util import update_wrapper +from ...util.typing import Literal + +if TYPE_CHECKING: + from ...sql.dml import DMLState + from ...sql.selectable import TableClause + +# https://sqlserverbuilds.blogspot.com/ +MS_2017_VERSION = (14,) +MS_2016_VERSION = (13,) +MS_2014_VERSION = (12,) +MS_2012_VERSION = (11,) +MS_2008_VERSION = (10,) +MS_2005_VERSION = (9,) +MS_2000_VERSION = (8,) + +RESERVED_WORDS = { + "add", + "all", + "alter", + "and", + "any", + "as", + "asc", + "authorization", + "backup", + "begin", + "between", + "break", + "browse", + "bulk", + "by", + "cascade", + "case", + "check", + "checkpoint", + "close", + "clustered", + "coalesce", + "collate", + "column", + "commit", + "compute", + "constraint", + "contains", + "containstable", + "continue", + "convert", + "create", + "cross", + "current", + "current_date", + "current_time", + "current_timestamp", + "current_user", + "cursor", + "database", + "dbcc", + "deallocate", + "declare", + "default", + "delete", + "deny", + "desc", + "disk", + "distinct", + "distributed", + "double", + "drop", + "dump", + "else", + "end", + "errlvl", + "escape", + "except", + "exec", + "execute", + "exists", + "exit", + "external", + "fetch", + "file", + "fillfactor", + "for", + "foreign", + "freetext", + "freetexttable", + "from", + "full", + "function", + "goto", + "grant", + "group", + "having", + "holdlock", + "identity", + "identity_insert", + "identitycol", + "if", + "in", + "index", + "inner", + "insert", + "intersect", + "into", + "is", + "join", + "key", + "kill", + "left", + "like", + "lineno", + "load", + "merge", + "national", + "nocheck", + "nonclustered", + "not", + "null", + "nullif", + "of", + "off", + "offsets", + "on", + "open", + "opendatasource", + "openquery", + "openrowset", + "openxml", + "option", + "or", + "order", + "outer", + "over", + "percent", + "pivot", + "plan", + "precision", + "primary", + "print", + "proc", + "procedure", + "public", + "raiserror", + "read", + "readtext", + "reconfigure", + "references", + "replication", + "restore", + "restrict", + "return", + "revert", + "revoke", + "right", + "rollback", + "rowcount", + "rowguidcol", + "rule", + "save", + "schema", + "securityaudit", + "select", + "session_user", + "set", + "setuser", + "shutdown", + "some", + "statistics", + "system_user", + "table", + "tablesample", + "textsize", + "then", + "to", + "top", + "tran", + "transaction", + "trigger", + "truncate", + "tsequal", + "union", + "unique", + "unpivot", + "update", + "updatetext", + "use", + "user", + "values", + "varying", + "view", + "waitfor", + "when", + "where", + "while", + "with", + "writetext", +} + + +class REAL(sqltypes.REAL): + """the SQL Server REAL datatype.""" + + def __init__(self, **kw): + # REAL is a synonym for FLOAT(24) on SQL server. + # it is only accepted as the word "REAL" in DDL, the numeric + # precision value is not allowed to be present + kw.setdefault("precision", 24) + super().__init__(**kw) + + +class DOUBLE_PRECISION(sqltypes.DOUBLE_PRECISION): + """the SQL Server DOUBLE PRECISION datatype. + + .. versionadded:: 2.0.11 + + """ + + def __init__(self, **kw): + # DOUBLE PRECISION is a synonym for FLOAT(53) on SQL server. + # it is only accepted as the word "DOUBLE PRECISION" in DDL, + # the numeric precision value is not allowed to be present + kw.setdefault("precision", 53) + super().__init__(**kw) + + +class TINYINT(sqltypes.Integer): + __visit_name__ = "TINYINT" + + +# MSSQL DATE/TIME types have varied behavior, sometimes returning +# strings. MSDate/TIME check for everything, and always +# filter bind parameters into datetime objects (required by pyodbc, +# not sure about other dialects). + + +class _MSDate(sqltypes.Date): + def bind_processor(self, dialect): + def process(value): + if type(value) == datetime.date: + return datetime.datetime(value.year, value.month, value.day) + else: + return value + + return process + + _reg = re.compile(r"(\d+)-(\d+)-(\d+)") + + def result_processor(self, dialect, coltype): + def process(value): + if isinstance(value, datetime.datetime): + return value.date() + elif isinstance(value, str): + m = self._reg.match(value) + if not m: + raise ValueError( + "could not parse %r as a date value" % (value,) + ) + return datetime.date(*[int(x or 0) for x in m.groups()]) + else: + return value + + return process + + +class TIME(sqltypes.TIME): + def __init__(self, precision=None, **kwargs): + self.precision = precision + super().__init__() + + __zero_date = datetime.date(1900, 1, 1) + + def bind_processor(self, dialect): + def process(value): + if isinstance(value, datetime.datetime): + value = datetime.datetime.combine( + self.__zero_date, value.time() + ) + elif isinstance(value, datetime.time): + """issue #5339 + per: https://github.com/mkleehammer/pyodbc/wiki/Tips-and-Tricks-by-Database-Platform#time-columns + pass TIME value as string + """ # noqa + value = str(value) + return value + + return process + + _reg = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d{0,6}))?") + + def result_processor(self, dialect, coltype): + def process(value): + if isinstance(value, datetime.datetime): + return value.time() + elif isinstance(value, str): + m = self._reg.match(value) + if not m: + raise ValueError( + "could not parse %r as a time value" % (value,) + ) + return datetime.time(*[int(x or 0) for x in m.groups()]) + else: + return value + + return process + + +_MSTime = TIME + + +class _BASETIMEIMPL(TIME): + __visit_name__ = "_BASETIMEIMPL" + + +class _DateTimeBase: + def bind_processor(self, dialect): + def process(value): + if type(value) == datetime.date: + return datetime.datetime(value.year, value.month, value.day) + else: + return value + + return process + + +class _MSDateTime(_DateTimeBase, sqltypes.DateTime): + pass + + +class SMALLDATETIME(_DateTimeBase, sqltypes.DateTime): + __visit_name__ = "SMALLDATETIME" + + +class DATETIME2(_DateTimeBase, sqltypes.DateTime): + __visit_name__ = "DATETIME2" + + def __init__(self, precision=None, **kw): + super().__init__(**kw) + self.precision = precision + + +class DATETIMEOFFSET(_DateTimeBase, sqltypes.DateTime): + __visit_name__ = "DATETIMEOFFSET" + + def __init__(self, precision=None, **kw): + super().__init__(**kw) + self.precision = precision + + +class _UnicodeLiteral: + def literal_processor(self, dialect): + def process(value): + value = value.replace("'", "''") + + if dialect.identifier_preparer._double_percents: + value = value.replace("%", "%%") + + return "N'%s'" % value + + return process + + +class _MSUnicode(_UnicodeLiteral, sqltypes.Unicode): + pass + + +class _MSUnicodeText(_UnicodeLiteral, sqltypes.UnicodeText): + pass + + +class TIMESTAMP(sqltypes._Binary): + """Implement the SQL Server TIMESTAMP type. + + Note this is **completely different** than the SQL Standard + TIMESTAMP type, which is not supported by SQL Server. It + is a read-only datatype that does not support INSERT of values. + + .. versionadded:: 1.2 + + .. seealso:: + + :class:`_mssql.ROWVERSION` + + """ + + __visit_name__ = "TIMESTAMP" + + # expected by _Binary to be present + length = None + + def __init__(self, convert_int=False): + """Construct a TIMESTAMP or ROWVERSION type. + + :param convert_int: if True, binary integer values will + be converted to integers on read. + + .. versionadded:: 1.2 + + """ + self.convert_int = convert_int + + def result_processor(self, dialect, coltype): + super_ = super().result_processor(dialect, coltype) + if self.convert_int: + + def process(value): + if super_: + value = super_(value) + if value is not None: + # https://stackoverflow.com/a/30403242/34549 + value = int(codecs.encode(value, "hex"), 16) + return value + + return process + else: + return super_ + + +class ROWVERSION(TIMESTAMP): + """Implement the SQL Server ROWVERSION type. + + The ROWVERSION datatype is a SQL Server synonym for the TIMESTAMP + datatype, however current SQL Server documentation suggests using + ROWVERSION for new datatypes going forward. + + The ROWVERSION datatype does **not** reflect (e.g. introspect) from the + database as itself; the returned datatype will be + :class:`_mssql.TIMESTAMP`. + + This is a read-only datatype that does not support INSERT of values. + + .. versionadded:: 1.2 + + .. seealso:: + + :class:`_mssql.TIMESTAMP` + + """ + + __visit_name__ = "ROWVERSION" + + +class NTEXT(sqltypes.UnicodeText): + """MSSQL NTEXT type, for variable-length unicode text up to 2^30 + characters.""" + + __visit_name__ = "NTEXT" + + +class VARBINARY(sqltypes.VARBINARY, sqltypes.LargeBinary): + """The MSSQL VARBINARY type. + + This type adds additional features to the core :class:`_types.VARBINARY` + type, including "deprecate_large_types" mode where + either ``VARBINARY(max)`` or IMAGE is rendered, as well as the SQL + Server ``FILESTREAM`` option. + + .. seealso:: + + :ref:`mssql_large_type_deprecation` + + """ + + __visit_name__ = "VARBINARY" + + def __init__(self, length=None, filestream=False): + """ + Construct a VARBINARY type. + + :param length: optional, a length for the column for use in + DDL statements, for those binary types that accept a length, + such as the MySQL BLOB type. + + :param filestream=False: if True, renders the ``FILESTREAM`` keyword + in the table definition. In this case ``length`` must be ``None`` + or ``'max'``. + + .. versionadded:: 1.4.31 + + """ + + self.filestream = filestream + if self.filestream and length not in (None, "max"): + raise ValueError( + "length must be None or 'max' when setting filestream" + ) + super().__init__(length=length) + + +class IMAGE(sqltypes.LargeBinary): + __visit_name__ = "IMAGE" + + +class XML(sqltypes.Text): + """MSSQL XML type. + + This is a placeholder type for reflection purposes that does not include + any Python-side datatype support. It also does not currently support + additional arguments, such as "CONTENT", "DOCUMENT", + "xml_schema_collection". + + """ + + __visit_name__ = "XML" + + +class BIT(sqltypes.Boolean): + """MSSQL BIT type. + + Both pyodbc and pymssql return values from BIT columns as + Python <class 'bool'> so just subclass Boolean. + + """ + + __visit_name__ = "BIT" + + +class MONEY(sqltypes.TypeEngine): + __visit_name__ = "MONEY" + + +class SMALLMONEY(sqltypes.TypeEngine): + __visit_name__ = "SMALLMONEY" + + +class MSUUid(sqltypes.Uuid): + def bind_processor(self, dialect): + if self.native_uuid: + # this is currently assuming pyodbc; might not work for + # some other mssql driver + return None + else: + if self.as_uuid: + + def process(value): + if value is not None: + value = value.hex + return value + + return process + else: + + def process(value): + if value is not None: + value = value.replace("-", "").replace("''", "'") + return value + + return process + + def literal_processor(self, dialect): + if self.native_uuid: + + def process(value): + return f"""'{str(value).replace("''", "'")}'""" + + return process + else: + if self.as_uuid: + + def process(value): + return f"""'{value.hex}'""" + + return process + else: + + def process(value): + return f"""'{ + value.replace("-", "").replace("'", "''") + }'""" + + return process + + +class UNIQUEIDENTIFIER(sqltypes.Uuid[sqltypes._UUID_RETURN]): + __visit_name__ = "UNIQUEIDENTIFIER" + + @overload + def __init__( + self: UNIQUEIDENTIFIER[_python_UUID], as_uuid: Literal[True] = ... + ): ... + + @overload + def __init__( + self: UNIQUEIDENTIFIER[str], as_uuid: Literal[False] = ... + ): ... + + def __init__(self, as_uuid: bool = True): + """Construct a :class:`_mssql.UNIQUEIDENTIFIER` type. + + + :param as_uuid=True: if True, values will be interpreted + as Python uuid objects, converting to/from string via the + DBAPI. + + .. versionchanged: 2.0 Added direct "uuid" support to the + :class:`_mssql.UNIQUEIDENTIFIER` datatype; uuid interpretation + defaults to ``True``. + + """ + self.as_uuid = as_uuid + self.native_uuid = True + + +class SQL_VARIANT(sqltypes.TypeEngine): + __visit_name__ = "SQL_VARIANT" + + +# old names. +MSDateTime = _MSDateTime +MSDate = _MSDate +MSReal = REAL +MSTinyInteger = TINYINT +MSTime = TIME +MSSmallDateTime = SMALLDATETIME +MSDateTime2 = DATETIME2 +MSDateTimeOffset = DATETIMEOFFSET +MSText = TEXT +MSNText = NTEXT +MSString = VARCHAR +MSNVarchar = NVARCHAR +MSChar = CHAR +MSNChar = NCHAR +MSBinary = BINARY +MSVarBinary = VARBINARY +MSImage = IMAGE +MSBit = BIT +MSMoney = MONEY +MSSmallMoney = SMALLMONEY +MSUniqueIdentifier = UNIQUEIDENTIFIER +MSVariant = SQL_VARIANT + +ischema_names = { + "int": INTEGER, + "bigint": BIGINT, + "smallint": SMALLINT, + "tinyint": TINYINT, + "varchar": VARCHAR, + "nvarchar": NVARCHAR, + "char": CHAR, + "nchar": NCHAR, + "text": TEXT, + "ntext": NTEXT, + "decimal": DECIMAL, + "numeric": NUMERIC, + "float": FLOAT, + "datetime": DATETIME, + "datetime2": DATETIME2, + "datetimeoffset": DATETIMEOFFSET, + "date": DATE, + "time": TIME, + "smalldatetime": SMALLDATETIME, + "binary": BINARY, + "varbinary": VARBINARY, + "bit": BIT, + "real": REAL, + "double precision": DOUBLE_PRECISION, + "image": IMAGE, + "xml": XML, + "timestamp": TIMESTAMP, + "money": MONEY, + "smallmoney": SMALLMONEY, + "uniqueidentifier": UNIQUEIDENTIFIER, + "sql_variant": SQL_VARIANT, +} + + +class MSTypeCompiler(compiler.GenericTypeCompiler): + def _extend(self, spec, type_, length=None): + """Extend a string-type declaration with standard SQL + COLLATE annotations. + + """ + + if getattr(type_, "collation", None): + collation = "COLLATE %s" % type_.collation + else: + collation = None + + if not length: + length = type_.length + + if length: + spec = spec + "(%s)" % length + + return " ".join([c for c in (spec, collation) if c is not None]) + + def visit_double(self, type_, **kw): + return self.visit_DOUBLE_PRECISION(type_, **kw) + + def visit_FLOAT(self, type_, **kw): + precision = getattr(type_, "precision", None) + if precision is None: + return "FLOAT" + else: + return "FLOAT(%(precision)s)" % {"precision": precision} + + def visit_TINYINT(self, type_, **kw): + return "TINYINT" + + def visit_TIME(self, type_, **kw): + precision = getattr(type_, "precision", None) + if precision is not None: + return "TIME(%s)" % precision + else: + return "TIME" + + def visit_TIMESTAMP(self, type_, **kw): + return "TIMESTAMP" + + def visit_ROWVERSION(self, type_, **kw): + return "ROWVERSION" + + def visit_datetime(self, type_, **kw): + if type_.timezone: + return self.visit_DATETIMEOFFSET(type_, **kw) + else: + return self.visit_DATETIME(type_, **kw) + + def visit_DATETIMEOFFSET(self, type_, **kw): + precision = getattr(type_, "precision", None) + if precision is not None: + return "DATETIMEOFFSET(%s)" % type_.precision + else: + return "DATETIMEOFFSET" + + def visit_DATETIME2(self, type_, **kw): + precision = getattr(type_, "precision", None) + if precision is not None: + return "DATETIME2(%s)" % precision + else: + return "DATETIME2" + + def visit_SMALLDATETIME(self, type_, **kw): + return "SMALLDATETIME" + + def visit_unicode(self, type_, **kw): + return self.visit_NVARCHAR(type_, **kw) + + def visit_text(self, type_, **kw): + if self.dialect.deprecate_large_types: + return self.visit_VARCHAR(type_, **kw) + else: + return self.visit_TEXT(type_, **kw) + + def visit_unicode_text(self, type_, **kw): + if self.dialect.deprecate_large_types: + return self.visit_NVARCHAR(type_, **kw) + else: + return self.visit_NTEXT(type_, **kw) + + def visit_NTEXT(self, type_, **kw): + return self._extend("NTEXT", type_) + + def visit_TEXT(self, type_, **kw): + return self._extend("TEXT", type_) + + def visit_VARCHAR(self, type_, **kw): + return self._extend("VARCHAR", type_, length=type_.length or "max") + + def visit_CHAR(self, type_, **kw): + return self._extend("CHAR", type_) + + def visit_NCHAR(self, type_, **kw): + return self._extend("NCHAR", type_) + + def visit_NVARCHAR(self, type_, **kw): + return self._extend("NVARCHAR", type_, length=type_.length or "max") + + def visit_date(self, type_, **kw): + if self.dialect.server_version_info < MS_2008_VERSION: + return self.visit_DATETIME(type_, **kw) + else: + return self.visit_DATE(type_, **kw) + + def visit__BASETIMEIMPL(self, type_, **kw): + return self.visit_time(type_, **kw) + + def visit_time(self, type_, **kw): + if self.dialect.server_version_info < MS_2008_VERSION: + return self.visit_DATETIME(type_, **kw) + else: + return self.visit_TIME(type_, **kw) + + def visit_large_binary(self, type_, **kw): + if self.dialect.deprecate_large_types: + return self.visit_VARBINARY(type_, **kw) + else: + return self.visit_IMAGE(type_, **kw) + + def visit_IMAGE(self, type_, **kw): + return "IMAGE" + + def visit_XML(self, type_, **kw): + return "XML" + + def visit_VARBINARY(self, type_, **kw): + text = self._extend("VARBINARY", type_, length=type_.length or "max") + if getattr(type_, "filestream", False): + text += " FILESTREAM" + return text + + def visit_boolean(self, type_, **kw): + return self.visit_BIT(type_) + + def visit_BIT(self, type_, **kw): + return "BIT" + + def visit_JSON(self, type_, **kw): + # this is a bit of a break with SQLAlchemy's convention of + # "UPPERCASE name goes to UPPERCASE type name with no modification" + return self._extend("NVARCHAR", type_, length="max") + + def visit_MONEY(self, type_, **kw): + return "MONEY" + + def visit_SMALLMONEY(self, type_, **kw): + return "SMALLMONEY" + + def visit_uuid(self, type_, **kw): + if type_.native_uuid: + return self.visit_UNIQUEIDENTIFIER(type_, **kw) + else: + return super().visit_uuid(type_, **kw) + + def visit_UNIQUEIDENTIFIER(self, type_, **kw): + return "UNIQUEIDENTIFIER" + + def visit_SQL_VARIANT(self, type_, **kw): + return "SQL_VARIANT" + + +class MSExecutionContext(default.DefaultExecutionContext): + _enable_identity_insert = False + _select_lastrowid = False + _lastrowid = None + + dialect: MSDialect + + def _opt_encode(self, statement): + if self.compiled and self.compiled.schema_translate_map: + rst = self.compiled.preparer._render_schema_translates + statement = rst(statement, self.compiled.schema_translate_map) + + return statement + + def pre_exec(self): + """Activate IDENTITY_INSERT if needed.""" + + if self.isinsert: + if TYPE_CHECKING: + assert is_sql_compiler(self.compiled) + assert isinstance(self.compiled.compile_state, DMLState) + assert isinstance( + self.compiled.compile_state.dml_table, TableClause + ) + + tbl = self.compiled.compile_state.dml_table + id_column = tbl._autoincrement_column + + if id_column is not None and ( + not isinstance(id_column.default, Sequence) + ): + insert_has_identity = True + compile_state = self.compiled.dml_compile_state + self._enable_identity_insert = ( + id_column.key in self.compiled_parameters[0] + ) or ( + compile_state._dict_parameters + and (id_column.key in compile_state._insert_col_keys) + ) + + else: + insert_has_identity = False + self._enable_identity_insert = False + + self._select_lastrowid = ( + not self.compiled.inline + and insert_has_identity + and not self.compiled.effective_returning + and not self._enable_identity_insert + and not self.executemany + ) + + if self._enable_identity_insert: + self.root_connection._cursor_execute( + self.cursor, + self._opt_encode( + "SET IDENTITY_INSERT %s ON" + % self.identifier_preparer.format_table(tbl) + ), + (), + self, + ) + + def post_exec(self): + """Disable IDENTITY_INSERT if enabled.""" + + conn = self.root_connection + + if self.isinsert or self.isupdate or self.isdelete: + self._rowcount = self.cursor.rowcount + + if self._select_lastrowid: + if self.dialect.use_scope_identity: + conn._cursor_execute( + self.cursor, + "SELECT scope_identity() AS lastrowid", + (), + self, + ) + else: + conn._cursor_execute( + self.cursor, "SELECT @@identity AS lastrowid", (), self + ) + # fetchall() ensures the cursor is consumed without closing it + row = self.cursor.fetchall()[0] + self._lastrowid = int(row[0]) + + self.cursor_fetch_strategy = _cursor._NO_CURSOR_DML + elif ( + self.compiled is not None + and is_sql_compiler(self.compiled) + and self.compiled.effective_returning + ): + self.cursor_fetch_strategy = ( + _cursor.FullyBufferedCursorFetchStrategy( + self.cursor, + self.cursor.description, + self.cursor.fetchall(), + ) + ) + + if self._enable_identity_insert: + if TYPE_CHECKING: + assert is_sql_compiler(self.compiled) + assert isinstance(self.compiled.compile_state, DMLState) + assert isinstance( + self.compiled.compile_state.dml_table, TableClause + ) + conn._cursor_execute( + self.cursor, + self._opt_encode( + "SET IDENTITY_INSERT %s OFF" + % self.identifier_preparer.format_table( + self.compiled.compile_state.dml_table + ) + ), + (), + self, + ) + + def get_lastrowid(self): + return self._lastrowid + + def handle_dbapi_exception(self, e): + if self._enable_identity_insert: + try: + self.cursor.execute( + self._opt_encode( + "SET IDENTITY_INSERT %s OFF" + % self.identifier_preparer.format_table( + self.compiled.compile_state.dml_table + ) + ) + ) + except Exception: + pass + + def fire_sequence(self, seq, type_): + return self._execute_scalar( + ( + "SELECT NEXT VALUE FOR %s" + % self.identifier_preparer.format_sequence(seq) + ), + type_, + ) + + def get_insert_default(self, column): + if ( + isinstance(column, sa_schema.Column) + and column is column.table._autoincrement_column + and isinstance(column.default, sa_schema.Sequence) + and column.default.optional + ): + return None + return super().get_insert_default(column) + + +class MSSQLCompiler(compiler.SQLCompiler): + returning_precedes_values = True + + extract_map = util.update_copy( + compiler.SQLCompiler.extract_map, + { + "doy": "dayofyear", + "dow": "weekday", + "milliseconds": "millisecond", + "microseconds": "microsecond", + }, + ) + + def __init__(self, *args, **kwargs): + self.tablealiases = {} + super().__init__(*args, **kwargs) + + def _with_legacy_schema_aliasing(fn): + def decorate(self, *arg, **kw): + if self.dialect.legacy_schema_aliasing: + return fn(self, *arg, **kw) + else: + super_ = getattr(super(MSSQLCompiler, self), fn.__name__) + return super_(*arg, **kw) + + return decorate + + def visit_now_func(self, fn, **kw): + return "CURRENT_TIMESTAMP" + + def visit_current_date_func(self, fn, **kw): + return "GETDATE()" + + def visit_length_func(self, fn, **kw): + return "LEN%s" % self.function_argspec(fn, **kw) + + def visit_char_length_func(self, fn, **kw): + return "LEN%s" % self.function_argspec(fn, **kw) + + def visit_aggregate_strings_func(self, fn, **kw): + expr = fn.clauses.clauses[0]._compiler_dispatch(self, **kw) + kw["literal_execute"] = True + delimeter = fn.clauses.clauses[1]._compiler_dispatch(self, **kw) + return f"string_agg({expr}, {delimeter})" + + def visit_concat_op_expression_clauselist( + self, clauselist, operator, **kw + ): + return " + ".join(self.process(elem, **kw) for elem in clauselist) + + def visit_concat_op_binary(self, binary, operator, **kw): + return "%s + %s" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + def visit_true(self, expr, **kw): + return "1" + + def visit_false(self, expr, **kw): + return "0" + + def visit_match_op_binary(self, binary, operator, **kw): + return "CONTAINS (%s, %s)" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + def get_select_precolumns(self, select, **kw): + """MS-SQL puts TOP, it's version of LIMIT here""" + + s = super().get_select_precolumns(select, **kw) + + if select._has_row_limiting_clause and self._use_top(select): + # ODBC drivers and possibly others + # don't support bind params in the SELECT clause on SQL Server. + # so have to use literal here. + kw["literal_execute"] = True + s += "TOP %s " % self.process( + self._get_limit_or_fetch(select), **kw + ) + if select._fetch_clause is not None: + if select._fetch_clause_options["percent"]: + s += "PERCENT " + if select._fetch_clause_options["with_ties"]: + s += "WITH TIES " + + return s + + def get_from_hint_text(self, table, text): + return text + + def get_crud_hint_text(self, table, text): + return text + + def _get_limit_or_fetch(self, select): + if select._fetch_clause is None: + return select._limit_clause + else: + return select._fetch_clause + + def _use_top(self, select): + return (select._offset_clause is None) and ( + select._simple_int_clause(select._limit_clause) + or ( + # limit can use TOP with is by itself. fetch only uses TOP + # when it needs to because of PERCENT and/or WITH TIES + # TODO: Why? shouldn't we use TOP always ? + select._simple_int_clause(select._fetch_clause) + and ( + select._fetch_clause_options["percent"] + or select._fetch_clause_options["with_ties"] + ) + ) + ) + + def limit_clause(self, cs, **kwargs): + return "" + + def _check_can_use_fetch_limit(self, select): + # to use ROW_NUMBER(), an ORDER BY is required. + # OFFSET are FETCH are options of the ORDER BY clause + if not select._order_by_clause.clauses: + raise exc.CompileError( + "MSSQL requires an order_by when " + "using an OFFSET or a non-simple " + "LIMIT clause" + ) + + if select._fetch_clause_options is not None and ( + select._fetch_clause_options["percent"] + or select._fetch_clause_options["with_ties"] + ): + raise exc.CompileError( + "MSSQL needs TOP to use PERCENT and/or WITH TIES. " + "Only simple fetch without offset can be used." + ) + + def _row_limit_clause(self, select, **kw): + """MSSQL 2012 supports OFFSET/FETCH operators + Use it instead subquery with row_number + + """ + + if self.dialect._supports_offset_fetch and not self._use_top(select): + self._check_can_use_fetch_limit(select) + + return self.fetch_clause( + select, + fetch_clause=self._get_limit_or_fetch(select), + require_offset=True, + **kw, + ) + + else: + return "" + + def visit_try_cast(self, element, **kw): + return "TRY_CAST (%s AS %s)" % ( + self.process(element.clause, **kw), + self.process(element.typeclause, **kw), + ) + + def translate_select_structure(self, select_stmt, **kwargs): + """Look for ``LIMIT`` and OFFSET in a select statement, and if + so tries to wrap it in a subquery with ``row_number()`` criterion. + MSSQL 2012 and above are excluded + + """ + select = select_stmt + + if ( + select._has_row_limiting_clause + and not self.dialect._supports_offset_fetch + and not self._use_top(select) + and not getattr(select, "_mssql_visit", None) + ): + self._check_can_use_fetch_limit(select) + + _order_by_clauses = [ + sql_util.unwrap_label_reference(elem) + for elem in select._order_by_clause.clauses + ] + + limit_clause = self._get_limit_or_fetch(select) + offset_clause = select._offset_clause + + select = select._generate() + select._mssql_visit = True + select = ( + select.add_columns( + sql.func.ROW_NUMBER() + .over(order_by=_order_by_clauses) + .label("mssql_rn") + ) + .order_by(None) + .alias() + ) + + mssql_rn = sql.column("mssql_rn") + limitselect = sql.select( + *[c for c in select.c if c.key != "mssql_rn"] + ) + if offset_clause is not None: + limitselect = limitselect.where(mssql_rn > offset_clause) + if limit_clause is not None: + limitselect = limitselect.where( + mssql_rn <= (limit_clause + offset_clause) + ) + else: + limitselect = limitselect.where(mssql_rn <= (limit_clause)) + return limitselect + else: + return select + + @_with_legacy_schema_aliasing + def visit_table(self, table, mssql_aliased=False, iscrud=False, **kwargs): + if mssql_aliased is table or iscrud: + return super().visit_table(table, **kwargs) + + # alias schema-qualified tables + alias = self._schema_aliased_table(table) + if alias is not None: + return self.process(alias, mssql_aliased=table, **kwargs) + else: + return super().visit_table(table, **kwargs) + + @_with_legacy_schema_aliasing + def visit_alias(self, alias, **kw): + # translate for schema-qualified table aliases + kw["mssql_aliased"] = alias.element + return super().visit_alias(alias, **kw) + + @_with_legacy_schema_aliasing + def visit_column(self, column, add_to_result_map=None, **kw): + if ( + column.table is not None + and (not self.isupdate and not self.isdelete) + or self.is_subquery() + ): + # translate for schema-qualified table aliases + t = self._schema_aliased_table(column.table) + if t is not None: + converted = elements._corresponding_column_or_error(t, column) + if add_to_result_map is not None: + add_to_result_map( + column.name, + column.name, + (column, column.name, column.key), + column.type, + ) + + return super().visit_column(converted, **kw) + + return super().visit_column( + column, add_to_result_map=add_to_result_map, **kw + ) + + def _schema_aliased_table(self, table): + if getattr(table, "schema", None) is not None: + if table not in self.tablealiases: + self.tablealiases[table] = table.alias() + return self.tablealiases[table] + else: + return None + + def visit_extract(self, extract, **kw): + field = self.extract_map.get(extract.field, extract.field) + return "DATEPART(%s, %s)" % (field, self.process(extract.expr, **kw)) + + def visit_savepoint(self, savepoint_stmt, **kw): + return "SAVE TRANSACTION %s" % self.preparer.format_savepoint( + savepoint_stmt + ) + + def visit_rollback_to_savepoint(self, savepoint_stmt, **kw): + return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint( + savepoint_stmt + ) + + def visit_binary(self, binary, **kwargs): + """Move bind parameters to the right-hand side of an operator, where + possible. + + """ + if ( + isinstance(binary.left, expression.BindParameter) + and binary.operator == operator.eq + and not isinstance(binary.right, expression.BindParameter) + ): + return self.process( + expression.BinaryExpression( + binary.right, binary.left, binary.operator + ), + **kwargs, + ) + return super().visit_binary(binary, **kwargs) + + def returning_clause( + self, stmt, returning_cols, *, populate_result_map, **kw + ): + # SQL server returning clause requires that the columns refer to + # the virtual table names "inserted" or "deleted". Here, we make + # a simple alias of our table with that name, and then adapt the + # columns we have from the list of RETURNING columns to that new name + # so that they render as "inserted.<colname>" / "deleted.<colname>". + + if stmt.is_insert or stmt.is_update: + target = stmt.table.alias("inserted") + elif stmt.is_delete: + target = stmt.table.alias("deleted") + else: + assert False, "expected Insert, Update or Delete statement" + + adapter = sql_util.ClauseAdapter(target) + + # adapter.traverse() takes a column from our target table and returns + # the one that is linked to the "inserted" / "deleted" tables. So in + # order to retrieve these values back from the result (e.g. like + # row[column]), tell the compiler to also add the original unadapted + # column to the result map. Before #4877, these were (unknowingly) + # falling back using string name matching in the result set which + # necessarily used an expensive KeyError in order to match. + + columns = [ + self._label_returning_column( + stmt, + adapter.traverse(column), + populate_result_map, + {"result_map_targets": (column,)}, + fallback_label_name=fallback_label_name, + column_is_repeated=repeated, + name=name, + proxy_name=proxy_name, + **kw, + ) + for ( + name, + proxy_name, + fallback_label_name, + column, + repeated, + ) in stmt._generate_columns_plus_names( + True, cols=expression._select_iterables(returning_cols) + ) + ] + + return "OUTPUT " + ", ".join(columns) + + def get_cte_preamble(self, recursive): + # SQL Server finds it too inconvenient to accept + # an entirely optional, SQL standard specified, + # "RECURSIVE" word with their "WITH", + # so here we go + return "WITH" + + def label_select_column(self, select, column, asfrom): + if isinstance(column, expression.Function): + return column.label(None) + else: + return super().label_select_column(select, column, asfrom) + + def for_update_clause(self, select, **kw): + # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which + # SQLAlchemy doesn't use + return "" + + def order_by_clause(self, select, **kw): + # MSSQL only allows ORDER BY in subqueries if there is a LIMIT: + # "The ORDER BY clause is invalid in views, inline functions, + # derived tables, subqueries, and common table expressions, + # unless TOP, OFFSET or FOR XML is also specified." + if ( + self.is_subquery() + and not self._use_top(select) + and ( + select._offset is None + or not self.dialect._supports_offset_fetch + ) + ): + # avoid processing the order by clause if we won't end up + # using it, because we don't want all the bind params tacked + # onto the positional list if that is what the dbapi requires + return "" + + order_by = self.process(select._order_by_clause, **kw) + + if order_by: + return " ORDER BY " + order_by + else: + return "" + + def update_from_clause( + self, update_stmt, from_table, extra_froms, from_hints, **kw + ): + """Render the UPDATE..FROM clause specific to MSSQL. + + In MSSQL, if the UPDATE statement involves an alias of the table to + be updated, then the table itself must be added to the FROM list as + well. Otherwise, it is optional. Here, we add it regardless. + + """ + return "FROM " + ", ".join( + t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw) + for t in [from_table] + extra_froms + ) + + def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw): + """If we have extra froms make sure we render any alias as hint.""" + ashint = False + if extra_froms: + ashint = True + return from_table._compiler_dispatch( + self, asfrom=True, iscrud=True, ashint=ashint, **kw + ) + + def delete_extra_from_clause( + self, delete_stmt, from_table, extra_froms, from_hints, **kw + ): + """Render the DELETE .. FROM clause specific to MSSQL. + + Yes, it has the FROM keyword twice. + + """ + return "FROM " + ", ".join( + t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw) + for t in [from_table] + extra_froms + ) + + def visit_empty_set_expr(self, type_, **kw): + return "SELECT 1 WHERE 1!=1" + + def visit_is_distinct_from_binary(self, binary, operator, **kw): + return "NOT EXISTS (SELECT %s INTERSECT SELECT %s)" % ( + self.process(binary.left), + self.process(binary.right), + ) + + def visit_is_not_distinct_from_binary(self, binary, operator, **kw): + return "EXISTS (SELECT %s INTERSECT SELECT %s)" % ( + self.process(binary.left), + self.process(binary.right), + ) + + def _render_json_extract_from_binary(self, binary, operator, **kw): + # note we are intentionally calling upon the process() calls in the + # order in which they appear in the SQL String as this is used + # by positional parameter rendering + + if binary.type._type_affinity is sqltypes.JSON: + return "JSON_QUERY(%s, %s)" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + # as with other dialects, start with an explicit test for NULL + case_expression = "CASE JSON_VALUE(%s, %s) WHEN NULL THEN NULL" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + if binary.type._type_affinity is sqltypes.Integer: + type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS INTEGER)" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + elif binary.type._type_affinity is sqltypes.Numeric: + type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS %s)" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ( + "FLOAT" + if isinstance(binary.type, sqltypes.Float) + else "NUMERIC(%s, %s)" + % (binary.type.precision, binary.type.scale) + ), + ) + elif binary.type._type_affinity is sqltypes.Boolean: + # the NULL handling is particularly weird with boolean, so + # explicitly return numeric (BIT) constants + type_expression = ( + "WHEN 'true' THEN 1 WHEN 'false' THEN 0 ELSE NULL" + ) + elif binary.type._type_affinity is sqltypes.String: + # TODO: does this comment (from mysql) apply to here, too? + # this fails with a JSON value that's a four byte unicode + # string. SQLite has the same problem at the moment + type_expression = "ELSE JSON_VALUE(%s, %s)" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + else: + # other affinity....this is not expected right now + type_expression = "ELSE JSON_QUERY(%s, %s)" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + return case_expression + " " + type_expression + " END" + + def visit_json_getitem_op_binary(self, binary, operator, **kw): + return self._render_json_extract_from_binary(binary, operator, **kw) + + def visit_json_path_getitem_op_binary(self, binary, operator, **kw): + return self._render_json_extract_from_binary(binary, operator, **kw) + + def visit_sequence(self, seq, **kw): + return "NEXT VALUE FOR %s" % self.preparer.format_sequence(seq) + + +class MSSQLStrictCompiler(MSSQLCompiler): + """A subclass of MSSQLCompiler which disables the usage of bind + parameters where not allowed natively by MS-SQL. + + A dialect may use this compiler on a platform where native + binds are used. + + """ + + ansi_bind_rules = True + + def visit_in_op_binary(self, binary, operator, **kw): + kw["literal_execute"] = True + return "%s IN %s" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + def visit_not_in_op_binary(self, binary, operator, **kw): + kw["literal_execute"] = True + return "%s NOT IN %s" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + def render_literal_value(self, value, type_): + """ + For date and datetime values, convert to a string + format acceptable to MSSQL. That seems to be the + so-called ODBC canonical date format which looks + like this: + + yyyy-mm-dd hh:mi:ss.mmm(24h) + + For other data types, call the base class implementation. + """ + # datetime and date are both subclasses of datetime.date + if issubclass(type(value), datetime.date): + # SQL Server wants single quotes around the date string. + return "'" + str(value) + "'" + else: + return super().render_literal_value(value, type_) + + +class MSDDLCompiler(compiler.DDLCompiler): + def get_column_specification(self, column, **kwargs): + colspec = self.preparer.format_column(column) + + # type is not accepted in a computed column + if column.computed is not None: + colspec += " " + self.process(column.computed) + else: + colspec += " " + self.dialect.type_compiler_instance.process( + column.type, type_expression=column + ) + + if column.nullable is not None: + if ( + not column.nullable + or column.primary_key + or isinstance(column.default, sa_schema.Sequence) + or column.autoincrement is True + or column.identity + ): + colspec += " NOT NULL" + elif column.computed is None: + # don't specify "NULL" for computed columns + colspec += " NULL" + + if column.table is None: + raise exc.CompileError( + "mssql requires Table-bound columns " + "in order to generate DDL" + ) + + d_opt = column.dialect_options["mssql"] + start = d_opt["identity_start"] + increment = d_opt["identity_increment"] + if start is not None or increment is not None: + if column.identity: + raise exc.CompileError( + "Cannot specify options 'mssql_identity_start' and/or " + "'mssql_identity_increment' while also using the " + "'Identity' construct." + ) + util.warn_deprecated( + "The dialect options 'mssql_identity_start' and " + "'mssql_identity_increment' are deprecated. " + "Use the 'Identity' object instead.", + "1.4", + ) + + if column.identity: + colspec += self.process(column.identity, **kwargs) + elif ( + column is column.table._autoincrement_column + or column.autoincrement is True + ) and ( + not isinstance(column.default, Sequence) or column.default.optional + ): + colspec += self.process(Identity(start=start, increment=increment)) + else: + default = self.get_column_default_string(column) + if default is not None: + colspec += " DEFAULT " + default + + return colspec + + def visit_create_index(self, create, include_schema=False, **kw): + index = create.element + self._verify_index_table(index) + preparer = self.preparer + text = "CREATE " + if index.unique: + text += "UNIQUE " + + # handle clustering option + clustered = index.dialect_options["mssql"]["clustered"] + if clustered is not None: + if clustered: + text += "CLUSTERED " + else: + text += "NONCLUSTERED " + + # handle columnstore option (has no negative value) + columnstore = index.dialect_options["mssql"]["columnstore"] + if columnstore: + text += "COLUMNSTORE " + + text += "INDEX %s ON %s" % ( + self._prepared_index_name(index, include_schema=include_schema), + preparer.format_table(index.table), + ) + + # in some case mssql allows indexes with no columns defined + if len(index.expressions) > 0: + text += " (%s)" % ", ".join( + self.sql_compiler.process( + expr, include_table=False, literal_binds=True + ) + for expr in index.expressions + ) + + # handle other included columns + if index.dialect_options["mssql"]["include"]: + inclusions = [ + index.table.c[col] if isinstance(col, str) else col + for col in index.dialect_options["mssql"]["include"] + ] + + text += " INCLUDE (%s)" % ", ".join( + [preparer.quote(c.name) for c in inclusions] + ) + + whereclause = index.dialect_options["mssql"]["where"] + + if whereclause is not None: + whereclause = coercions.expect( + roles.DDLExpressionRole, whereclause + ) + + where_compiled = self.sql_compiler.process( + whereclause, include_table=False, literal_binds=True + ) + text += " WHERE " + where_compiled + + return text + + def visit_drop_index(self, drop, **kw): + return "\nDROP INDEX %s ON %s" % ( + self._prepared_index_name(drop.element, include_schema=False), + self.preparer.format_table(drop.element.table), + ) + + def visit_primary_key_constraint(self, constraint, **kw): + if len(constraint) == 0: + return "" + text = "" + if constraint.name is not None: + text += "CONSTRAINT %s " % self.preparer.format_constraint( + constraint + ) + text += "PRIMARY KEY " + + clustered = constraint.dialect_options["mssql"]["clustered"] + if clustered is not None: + if clustered: + text += "CLUSTERED " + else: + text += "NONCLUSTERED " + + text += "(%s)" % ", ".join( + self.preparer.quote(c.name) for c in constraint + ) + text += self.define_constraint_deferrability(constraint) + return text + + def visit_unique_constraint(self, constraint, **kw): + if len(constraint) == 0: + return "" + text = "" + if constraint.name is not None: + formatted_name = self.preparer.format_constraint(constraint) + if formatted_name is not None: + text += "CONSTRAINT %s " % formatted_name + text += "UNIQUE %s" % self.define_unique_constraint_distinct( + constraint, **kw + ) + clustered = constraint.dialect_options["mssql"]["clustered"] + if clustered is not None: + if clustered: + text += "CLUSTERED " + else: + text += "NONCLUSTERED " + + text += "(%s)" % ", ".join( + self.preparer.quote(c.name) for c in constraint + ) + text += self.define_constraint_deferrability(constraint) + return text + + def visit_computed_column(self, generated, **kw): + text = "AS (%s)" % self.sql_compiler.process( + generated.sqltext, include_table=False, literal_binds=True + ) + # explicitly check for True|False since None means server default + if generated.persisted is True: + text += " PERSISTED" + return text + + def visit_set_table_comment(self, create, **kw): + schema = self.preparer.schema_for_object(create.element) + schema_name = schema if schema else self.dialect.default_schema_name + return ( + "execute sp_addextendedproperty 'MS_Description', " + "{}, 'schema', {}, 'table', {}".format( + self.sql_compiler.render_literal_value( + create.element.comment, sqltypes.NVARCHAR() + ), + self.preparer.quote_schema(schema_name), + self.preparer.format_table(create.element, use_schema=False), + ) + ) + + def visit_drop_table_comment(self, drop, **kw): + schema = self.preparer.schema_for_object(drop.element) + schema_name = schema if schema else self.dialect.default_schema_name + return ( + "execute sp_dropextendedproperty 'MS_Description', 'schema', " + "{}, 'table', {}".format( + self.preparer.quote_schema(schema_name), + self.preparer.format_table(drop.element, use_schema=False), + ) + ) + + def visit_set_column_comment(self, create, **kw): + schema = self.preparer.schema_for_object(create.element.table) + schema_name = schema if schema else self.dialect.default_schema_name + return ( + "execute sp_addextendedproperty 'MS_Description', " + "{}, 'schema', {}, 'table', {}, 'column', {}".format( + self.sql_compiler.render_literal_value( + create.element.comment, sqltypes.NVARCHAR() + ), + self.preparer.quote_schema(schema_name), + self.preparer.format_table( + create.element.table, use_schema=False + ), + self.preparer.format_column(create.element), + ) + ) + + def visit_drop_column_comment(self, drop, **kw): + schema = self.preparer.schema_for_object(drop.element.table) + schema_name = schema if schema else self.dialect.default_schema_name + return ( + "execute sp_dropextendedproperty 'MS_Description', 'schema', " + "{}, 'table', {}, 'column', {}".format( + self.preparer.quote_schema(schema_name), + self.preparer.format_table( + drop.element.table, use_schema=False + ), + self.preparer.format_column(drop.element), + ) + ) + + def visit_create_sequence(self, create, **kw): + prefix = None + if create.element.data_type is not None: + data_type = create.element.data_type + prefix = " AS %s" % self.type_compiler.process(data_type) + return super().visit_create_sequence(create, prefix=prefix, **kw) + + def visit_identity_column(self, identity, **kw): + text = " IDENTITY" + if identity.start is not None or identity.increment is not None: + start = 1 if identity.start is None else identity.start + increment = 1 if identity.increment is None else identity.increment + text += "(%s,%s)" % (start, increment) + return text + + +class MSIdentifierPreparer(compiler.IdentifierPreparer): + reserved_words = RESERVED_WORDS + + def __init__(self, dialect): + super().__init__( + dialect, + initial_quote="[", + final_quote="]", + quote_case_sensitive_collations=False, + ) + + def _escape_identifier(self, value): + return value.replace("]", "]]") + + def _unescape_identifier(self, value): + return value.replace("]]", "]") + + def quote_schema(self, schema, force=None): + """Prepare a quoted table and schema name.""" + + # need to re-implement the deprecation warning entirely + if force is not None: + # not using the util.deprecated_params() decorator in this + # case because of the additional function call overhead on this + # very performance-critical spot. + util.warn_deprecated( + "The IdentifierPreparer.quote_schema.force parameter is " + "deprecated and will be removed in a future release. This " + "flag has no effect on the behavior of the " + "IdentifierPreparer.quote method; please refer to " + "quoted_name().", + version="1.3", + ) + + dbname, owner = _schema_elements(schema) + if dbname: + result = "%s.%s" % (self.quote(dbname), self.quote(owner)) + elif owner: + result = self.quote(owner) + else: + result = "" + return result + + +def _db_plus_owner_listing(fn): + def wrap(dialect, connection, schema=None, **kw): + dbname, owner = _owner_plus_db(dialect, schema) + return _switch_db( + dbname, + connection, + fn, + dialect, + connection, + dbname, + owner, + schema, + **kw, + ) + + return update_wrapper(wrap, fn) + + +def _db_plus_owner(fn): + def wrap(dialect, connection, tablename, schema=None, **kw): + dbname, owner = _owner_plus_db(dialect, schema) + return _switch_db( + dbname, + connection, + fn, + dialect, + connection, + tablename, + dbname, + owner, + schema, + **kw, + ) + + return update_wrapper(wrap, fn) + + +def _switch_db(dbname, connection, fn, *arg, **kw): + if dbname: + current_db = connection.exec_driver_sql("select db_name()").scalar() + if current_db != dbname: + connection.exec_driver_sql( + "use %s" % connection.dialect.identifier_preparer.quote(dbname) + ) + try: + return fn(*arg, **kw) + finally: + if dbname and current_db != dbname: + connection.exec_driver_sql( + "use %s" + % connection.dialect.identifier_preparer.quote(current_db) + ) + + +def _owner_plus_db(dialect, schema): + if not schema: + return None, dialect.default_schema_name + else: + return _schema_elements(schema) + + +_memoized_schema = util.LRUCache() + + +def _schema_elements(schema): + if isinstance(schema, quoted_name) and schema.quote: + return None, schema + + if schema in _memoized_schema: + return _memoized_schema[schema] + + # tests for this function are in: + # test/dialect/mssql/test_reflection.py -> + # OwnerPlusDBTest.test_owner_database_pairs + # test/dialect/mssql/test_compiler.py -> test_force_schema_* + # test/dialect/mssql/test_compiler.py -> test_schema_many_tokens_* + # + + if schema.startswith("__[SCHEMA_"): + return None, schema + + push = [] + symbol = "" + bracket = False + has_brackets = False + for token in re.split(r"(\[|\]|\.)", schema): + if not token: + continue + if token == "[": + bracket = True + has_brackets = True + elif token == "]": + bracket = False + elif not bracket and token == ".": + if has_brackets: + push.append("[%s]" % symbol) + else: + push.append(symbol) + symbol = "" + has_brackets = False + else: + symbol += token + if symbol: + push.append(symbol) + if len(push) > 1: + dbname, owner = ".".join(push[0:-1]), push[-1] + + # test for internal brackets + if re.match(r".*\].*\[.*", dbname[1:-1]): + dbname = quoted_name(dbname, quote=False) + else: + dbname = dbname.lstrip("[").rstrip("]") + + elif len(push): + dbname, owner = None, push[0] + else: + dbname, owner = None, None + + _memoized_schema[schema] = dbname, owner + return dbname, owner + + +class MSDialect(default.DefaultDialect): + # will assume it's at least mssql2005 + name = "mssql" + supports_statement_cache = True + supports_default_values = True + supports_empty_insert = False + favor_returning_over_lastrowid = True + + returns_native_bytes = True + + supports_comments = True + supports_default_metavalue = False + """dialect supports INSERT... VALUES (DEFAULT) syntax - + SQL Server **does** support this, but **not** for the IDENTITY column, + so we can't turn this on. + + """ + + # supports_native_uuid is partial here, so we implement our + # own impl type + + execution_ctx_cls = MSExecutionContext + use_scope_identity = True + max_identifier_length = 128 + schema_name = "dbo" + + insert_returning = True + update_returning = True + delete_returning = True + update_returning_multifrom = True + delete_returning_multifrom = True + + colspecs = { + sqltypes.DateTime: _MSDateTime, + sqltypes.Date: _MSDate, + sqltypes.JSON: JSON, + sqltypes.JSON.JSONIndexType: JSONIndexType, + sqltypes.JSON.JSONPathType: JSONPathType, + sqltypes.Time: _BASETIMEIMPL, + sqltypes.Unicode: _MSUnicode, + sqltypes.UnicodeText: _MSUnicodeText, + DATETIMEOFFSET: DATETIMEOFFSET, + DATETIME2: DATETIME2, + SMALLDATETIME: SMALLDATETIME, + DATETIME: DATETIME, + sqltypes.Uuid: MSUUid, + } + + engine_config_types = default.DefaultDialect.engine_config_types.union( + {"legacy_schema_aliasing": util.asbool} + ) + + ischema_names = ischema_names + + supports_sequences = True + sequences_optional = True + # This is actually used for autoincrement, where itentity is used that + # starts with 1. + # for sequences T-SQL's actual default is -9223372036854775808 + default_sequence_base = 1 + + supports_native_boolean = False + non_native_boolean_check_constraint = False + supports_unicode_binds = True + postfetch_lastrowid = True + + # may be changed at server inspection time for older SQL server versions + supports_multivalues_insert = True + + use_insertmanyvalues = True + + # note pyodbc will set this to False if fast_executemany is set, + # as of SQLAlchemy 2.0.9 + use_insertmanyvalues_wo_returning = True + + insertmanyvalues_implicit_sentinel = ( + InsertmanyvaluesSentinelOpts.AUTOINCREMENT + | InsertmanyvaluesSentinelOpts.IDENTITY + | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT + ) + + # "The incoming request has too many parameters. The server supports a " + # "maximum of 2100 parameters." + # in fact you can have 2099 parameters. + insertmanyvalues_max_parameters = 2099 + + _supports_offset_fetch = False + _supports_nvarchar_max = False + + legacy_schema_aliasing = False + + server_version_info = () + + statement_compiler = MSSQLCompiler + ddl_compiler = MSDDLCompiler + type_compiler_cls = MSTypeCompiler + preparer = MSIdentifierPreparer + + construct_arguments = [ + (sa_schema.PrimaryKeyConstraint, {"clustered": None}), + (sa_schema.UniqueConstraint, {"clustered": None}), + ( + sa_schema.Index, + { + "clustered": None, + "include": None, + "where": None, + "columnstore": None, + }, + ), + ( + sa_schema.Column, + {"identity_start": None, "identity_increment": None}, + ), + ] + + def __init__( + self, + query_timeout=None, + use_scope_identity=True, + schema_name="dbo", + deprecate_large_types=None, + supports_comments=None, + json_serializer=None, + json_deserializer=None, + legacy_schema_aliasing=None, + ignore_no_transaction_on_rollback=False, + **opts, + ): + self.query_timeout = int(query_timeout or 0) + self.schema_name = schema_name + + self.use_scope_identity = use_scope_identity + self.deprecate_large_types = deprecate_large_types + self.ignore_no_transaction_on_rollback = ( + ignore_no_transaction_on_rollback + ) + self._user_defined_supports_comments = uds = supports_comments + if uds is not None: + self.supports_comments = uds + + if legacy_schema_aliasing is not None: + util.warn_deprecated( + "The legacy_schema_aliasing parameter is " + "deprecated and will be removed in a future release.", + "1.4", + ) + self.legacy_schema_aliasing = legacy_schema_aliasing + + super().__init__(**opts) + + self._json_serializer = json_serializer + self._json_deserializer = json_deserializer + + def do_savepoint(self, connection, name): + # give the DBAPI a push + connection.exec_driver_sql("IF @@TRANCOUNT = 0 BEGIN TRANSACTION") + super().do_savepoint(connection, name) + + def do_release_savepoint(self, connection, name): + # SQL Server does not support RELEASE SAVEPOINT + pass + + def do_rollback(self, dbapi_connection): + try: + super().do_rollback(dbapi_connection) + except self.dbapi.ProgrammingError as e: + if self.ignore_no_transaction_on_rollback and re.match( + r".*\b111214\b", str(e) + ): + util.warn( + "ProgrammingError 111214 " + "'No corresponding transaction found.' " + "has been suppressed via " + "ignore_no_transaction_on_rollback=True" + ) + else: + raise + + _isolation_lookup = { + "SERIALIZABLE", + "READ UNCOMMITTED", + "READ COMMITTED", + "REPEATABLE READ", + "SNAPSHOT", + } + + def get_isolation_level_values(self, dbapi_connection): + return list(self._isolation_lookup) + + def set_isolation_level(self, dbapi_connection, level): + cursor = dbapi_connection.cursor() + cursor.execute(f"SET TRANSACTION ISOLATION LEVEL {level}") + cursor.close() + if level == "SNAPSHOT": + dbapi_connection.commit() + + def get_isolation_level(self, dbapi_connection): + cursor = dbapi_connection.cursor() + view_name = "sys.system_views" + try: + cursor.execute( + ( + "SELECT name FROM {} WHERE name IN " + "('dm_exec_sessions', 'dm_pdw_nodes_exec_sessions')" + ).format(view_name) + ) + row = cursor.fetchone() + if not row: + raise NotImplementedError( + "Can't fetch isolation level on this particular " + "SQL Server version." + ) + + view_name = f"sys.{row[0]}" + + cursor.execute( + """ + SELECT CASE transaction_isolation_level + WHEN 0 THEN NULL + WHEN 1 THEN 'READ UNCOMMITTED' + WHEN 2 THEN 'READ COMMITTED' + WHEN 3 THEN 'REPEATABLE READ' + WHEN 4 THEN 'SERIALIZABLE' + WHEN 5 THEN 'SNAPSHOT' END + AS TRANSACTION_ISOLATION_LEVEL + FROM {} + where session_id = @@SPID + """.format( + view_name + ) + ) + except self.dbapi.Error as err: + raise NotImplementedError( + "Can't fetch isolation level; encountered error {} when " + 'attempting to query the "{}" view.'.format(err, view_name) + ) from err + else: + row = cursor.fetchone() + return row[0].upper() + finally: + cursor.close() + + def initialize(self, connection): + super().initialize(connection) + self._setup_version_attributes() + self._setup_supports_nvarchar_max(connection) + self._setup_supports_comments(connection) + + def _setup_version_attributes(self): + if self.server_version_info[0] not in list(range(8, 17)): + util.warn( + "Unrecognized server version info '%s'. Some SQL Server " + "features may not function properly." + % ".".join(str(x) for x in self.server_version_info) + ) + + if self.server_version_info >= MS_2008_VERSION: + self.supports_multivalues_insert = True + else: + self.supports_multivalues_insert = False + + if self.deprecate_large_types is None: + self.deprecate_large_types = ( + self.server_version_info >= MS_2012_VERSION + ) + + self._supports_offset_fetch = ( + self.server_version_info and self.server_version_info[0] >= 11 + ) + + def _setup_supports_nvarchar_max(self, connection): + try: + connection.scalar( + sql.text("SELECT CAST('test max support' AS NVARCHAR(max))") + ) + except exc.DBAPIError: + self._supports_nvarchar_max = False + else: + self._supports_nvarchar_max = True + + def _setup_supports_comments(self, connection): + if self._user_defined_supports_comments is not None: + return + + try: + connection.scalar( + sql.text( + "SELECT 1 FROM fn_listextendedproperty" + "(default, default, default, default, " + "default, default, default)" + ) + ) + except exc.DBAPIError: + self.supports_comments = False + else: + self.supports_comments = True + + def _get_default_schema_name(self, connection): + query = sql.text("SELECT schema_name()") + default_schema_name = connection.scalar(query) + if default_schema_name is not None: + # guard against the case where the default_schema_name is being + # fed back into a table reflection function. + return quoted_name(default_schema_name, quote=True) + else: + return self.schema_name + + @_db_plus_owner + def has_table(self, connection, tablename, dbname, owner, schema, **kw): + self._ensure_has_table_connection(connection) + + return self._internal_has_table(connection, tablename, owner, **kw) + + @reflection.cache + @_db_plus_owner + def has_sequence( + self, connection, sequencename, dbname, owner, schema, **kw + ): + sequences = ischema.sequences + + s = sql.select(sequences.c.sequence_name).where( + sequences.c.sequence_name == sequencename + ) + + if owner: + s = s.where(sequences.c.sequence_schema == owner) + + c = connection.execute(s) + + return c.first() is not None + + @reflection.cache + @_db_plus_owner_listing + def get_sequence_names(self, connection, dbname, owner, schema, **kw): + sequences = ischema.sequences + + s = sql.select(sequences.c.sequence_name) + if owner: + s = s.where(sequences.c.sequence_schema == owner) + + c = connection.execute(s) + + return [row[0] for row in c] + + @reflection.cache + def get_schema_names(self, connection, **kw): + s = sql.select(ischema.schemata.c.schema_name).order_by( + ischema.schemata.c.schema_name + ) + schema_names = [r[0] for r in connection.execute(s)] + return schema_names + + @reflection.cache + @_db_plus_owner_listing + def get_table_names(self, connection, dbname, owner, schema, **kw): + tables = ischema.tables + s = ( + sql.select(tables.c.table_name) + .where( + sql.and_( + tables.c.table_schema == owner, + tables.c.table_type == "BASE TABLE", + ) + ) + .order_by(tables.c.table_name) + ) + table_names = [r[0] for r in connection.execute(s)] + return table_names + + @reflection.cache + @_db_plus_owner_listing + def get_view_names(self, connection, dbname, owner, schema, **kw): + tables = ischema.tables + s = ( + sql.select(tables.c.table_name) + .where( + sql.and_( + tables.c.table_schema == owner, + tables.c.table_type == "VIEW", + ) + ) + .order_by(tables.c.table_name) + ) + view_names = [r[0] for r in connection.execute(s)] + return view_names + + @reflection.cache + def _internal_has_table(self, connection, tablename, owner, **kw): + if tablename.startswith("#"): # temporary table + # mssql does not support temporary views + # SQL Error [4103] [S0001]: "#v": Temporary views are not allowed + return bool( + connection.scalar( + # U filters on user tables only. + text("SELECT object_id(:table_name, 'U')"), + {"table_name": f"tempdb.dbo.[{tablename}]"}, + ) + ) + else: + tables = ischema.tables + + s = sql.select(tables.c.table_name).where( + sql.and_( + sql.or_( + tables.c.table_type == "BASE TABLE", + tables.c.table_type == "VIEW", + ), + tables.c.table_name == tablename, + ) + ) + + if owner: + s = s.where(tables.c.table_schema == owner) + + c = connection.execute(s) + + return c.first() is not None + + def _default_or_error(self, connection, tablename, owner, method, **kw): + # TODO: try to avoid having to run a separate query here + if self._internal_has_table(connection, tablename, owner, **kw): + return method() + else: + raise exc.NoSuchTableError(f"{owner}.{tablename}") + + @reflection.cache + @_db_plus_owner + def get_indexes(self, connection, tablename, dbname, owner, schema, **kw): + filter_definition = ( + "ind.filter_definition" + if self.server_version_info >= MS_2008_VERSION + else "NULL as filter_definition" + ) + rp = connection.execution_options(future_result=True).execute( + sql.text( + f""" +select + ind.index_id, + ind.is_unique, + ind.name, + ind.type, + {filter_definition} +from + sys.indexes as ind +join sys.tables as tab on + ind.object_id = tab.object_id +join sys.schemas as sch on + sch.schema_id = tab.schema_id +where + tab.name = :tabname + and sch.name = :schname + and ind.is_primary_key = 0 + and ind.type != 0 +order by + ind.name + """ + ) + .bindparams( + sql.bindparam("tabname", tablename, ischema.CoerceUnicode()), + sql.bindparam("schname", owner, ischema.CoerceUnicode()), + ) + .columns(name=sqltypes.Unicode()) + ) + indexes = {} + for row in rp.mappings(): + indexes[row["index_id"]] = current = { + "name": row["name"], + "unique": row["is_unique"] == 1, + "column_names": [], + "include_columns": [], + "dialect_options": {}, + } + + do = current["dialect_options"] + index_type = row["type"] + if index_type in {1, 2}: + do["mssql_clustered"] = index_type == 1 + if index_type in {5, 6}: + do["mssql_clustered"] = index_type == 5 + do["mssql_columnstore"] = True + if row["filter_definition"] is not None: + do["mssql_where"] = row["filter_definition"] + + rp = connection.execution_options(future_result=True).execute( + sql.text( + """ +select + ind_col.index_id, + col.name, + ind_col.is_included_column +from + sys.columns as col +join sys.tables as tab on + tab.object_id = col.object_id +join sys.index_columns as ind_col on + ind_col.column_id = col.column_id + and ind_col.object_id = tab.object_id +join sys.schemas as sch on + sch.schema_id = tab.schema_id +where + tab.name = :tabname + and sch.name = :schname + """ + ) + .bindparams( + sql.bindparam("tabname", tablename, ischema.CoerceUnicode()), + sql.bindparam("schname", owner, ischema.CoerceUnicode()), + ) + .columns(name=sqltypes.Unicode()) + ) + for row in rp.mappings(): + if row["index_id"] not in indexes: + continue + index_def = indexes[row["index_id"]] + is_colstore = index_def["dialect_options"].get("mssql_columnstore") + is_clustered = index_def["dialect_options"].get("mssql_clustered") + if not (is_colstore and is_clustered): + # a clustered columnstore index includes all columns but does + # not want them in the index definition + if row["is_included_column"] and not is_colstore: + # a noncludsted columnstore index reports that includes + # columns but requires that are listed as normal columns + index_def["include_columns"].append(row["name"]) + else: + index_def["column_names"].append(row["name"]) + for index_info in indexes.values(): + # NOTE: "root level" include_columns is legacy, now part of + # dialect_options (issue #7382) + index_info["dialect_options"]["mssql_include"] = index_info[ + "include_columns" + ] + + if indexes: + return list(indexes.values()) + else: + return self._default_or_error( + connection, tablename, owner, ReflectionDefaults.indexes, **kw + ) + + @reflection.cache + @_db_plus_owner + def get_view_definition( + self, connection, viewname, dbname, owner, schema, **kw + ): + view_def = connection.execute( + sql.text( + "select mod.definition " + "from sys.sql_modules as mod " + "join sys.views as views on mod.object_id = views.object_id " + "join sys.schemas as sch on views.schema_id = sch.schema_id " + "where views.name=:viewname and sch.name=:schname" + ).bindparams( + sql.bindparam("viewname", viewname, ischema.CoerceUnicode()), + sql.bindparam("schname", owner, ischema.CoerceUnicode()), + ) + ).scalar() + if view_def: + return view_def + else: + raise exc.NoSuchTableError(f"{owner}.{viewname}") + + @reflection.cache + def get_table_comment(self, connection, table_name, schema=None, **kw): + if not self.supports_comments: + raise NotImplementedError( + "Can't get table comments on current SQL Server version in use" + ) + + schema_name = schema if schema else self.default_schema_name + COMMENT_SQL = """ + SELECT cast(com.value as nvarchar(max)) + FROM fn_listextendedproperty('MS_Description', + 'schema', :schema, 'table', :table, NULL, NULL + ) as com; + """ + + comment = connection.execute( + sql.text(COMMENT_SQL).bindparams( + sql.bindparam("schema", schema_name, ischema.CoerceUnicode()), + sql.bindparam("table", table_name, ischema.CoerceUnicode()), + ) + ).scalar() + if comment: + return {"text": comment} + else: + return self._default_or_error( + connection, + table_name, + None, + ReflectionDefaults.table_comment, + **kw, + ) + + def _temp_table_name_like_pattern(self, tablename): + # LIKE uses '%' to match zero or more characters and '_' to match any + # single character. We want to match literal underscores, so T-SQL + # requires that we enclose them in square brackets. + return tablename + ( + ("[_][_][_]%") if not tablename.startswith("##") else "" + ) + + def _get_internal_temp_table_name(self, connection, tablename): + # it's likely that schema is always "dbo", but since we can + # get it here, let's get it. + # see https://stackoverflow.com/questions/8311959/ + # specifying-schema-for-temporary-tables + + try: + return connection.execute( + sql.text( + "select table_schema, table_name " + "from tempdb.information_schema.tables " + "where table_name like :p1" + ), + {"p1": self._temp_table_name_like_pattern(tablename)}, + ).one() + except exc.MultipleResultsFound as me: + raise exc.UnreflectableTableError( + "Found more than one temporary table named '%s' in tempdb " + "at this time. Cannot reliably resolve that name to its " + "internal table name." % tablename + ) from me + except exc.NoResultFound as ne: + raise exc.NoSuchTableError( + "Unable to find a temporary table named '%s' in tempdb." + % tablename + ) from ne + + @reflection.cache + @_db_plus_owner + def get_columns(self, connection, tablename, dbname, owner, schema, **kw): + is_temp_table = tablename.startswith("#") + if is_temp_table: + owner, tablename = self._get_internal_temp_table_name( + connection, tablename + ) + + columns = ischema.mssql_temp_table_columns + else: + columns = ischema.columns + + computed_cols = ischema.computed_columns + identity_cols = ischema.identity_columns + if owner: + whereclause = sql.and_( + columns.c.table_name == tablename, + columns.c.table_schema == owner, + ) + full_name = columns.c.table_schema + "." + columns.c.table_name + else: + whereclause = columns.c.table_name == tablename + full_name = columns.c.table_name + + if self._supports_nvarchar_max: + computed_definition = computed_cols.c.definition + else: + # tds_version 4.2 does not support NVARCHAR(MAX) + computed_definition = sql.cast( + computed_cols.c.definition, NVARCHAR(4000) + ) + + object_id = func.object_id(full_name) + + s = ( + sql.select( + columns.c.column_name, + columns.c.data_type, + columns.c.is_nullable, + columns.c.character_maximum_length, + columns.c.numeric_precision, + columns.c.numeric_scale, + columns.c.column_default, + columns.c.collation_name, + computed_definition, + computed_cols.c.is_persisted, + identity_cols.c.is_identity, + identity_cols.c.seed_value, + identity_cols.c.increment_value, + ischema.extended_properties.c.value.label("comment"), + ) + .select_from(columns) + .outerjoin( + computed_cols, + onclause=sql.and_( + computed_cols.c.object_id == object_id, + computed_cols.c.name + == columns.c.column_name.collate("DATABASE_DEFAULT"), + ), + ) + .outerjoin( + identity_cols, + onclause=sql.and_( + identity_cols.c.object_id == object_id, + identity_cols.c.name + == columns.c.column_name.collate("DATABASE_DEFAULT"), + ), + ) + .outerjoin( + ischema.extended_properties, + onclause=sql.and_( + ischema.extended_properties.c["class"] == 1, + ischema.extended_properties.c.major_id == object_id, + ischema.extended_properties.c.minor_id + == columns.c.ordinal_position, + ischema.extended_properties.c.name == "MS_Description", + ), + ) + .where(whereclause) + .order_by(columns.c.ordinal_position) + ) + + c = connection.execution_options(future_result=True).execute(s) + + cols = [] + for row in c.mappings(): + name = row[columns.c.column_name] + type_ = row[columns.c.data_type] + nullable = row[columns.c.is_nullable] == "YES" + charlen = row[columns.c.character_maximum_length] + numericprec = row[columns.c.numeric_precision] + numericscale = row[columns.c.numeric_scale] + default = row[columns.c.column_default] + collation = row[columns.c.collation_name] + definition = row[computed_definition] + is_persisted = row[computed_cols.c.is_persisted] + is_identity = row[identity_cols.c.is_identity] + identity_start = row[identity_cols.c.seed_value] + identity_increment = row[identity_cols.c.increment_value] + comment = row[ischema.extended_properties.c.value] + + coltype = self.ischema_names.get(type_, None) + + kwargs = {} + if coltype in ( + MSString, + MSChar, + MSNVarchar, + MSNChar, + MSText, + MSNText, + MSBinary, + MSVarBinary, + sqltypes.LargeBinary, + ): + if charlen == -1: + charlen = None + kwargs["length"] = charlen + if collation: + kwargs["collation"] = collation + + if coltype is None: + util.warn( + "Did not recognize type '%s' of column '%s'" + % (type_, name) + ) + coltype = sqltypes.NULLTYPE + else: + if issubclass(coltype, sqltypes.Numeric): + kwargs["precision"] = numericprec + + if not issubclass(coltype, sqltypes.Float): + kwargs["scale"] = numericscale + + coltype = coltype(**kwargs) + cdict = { + "name": name, + "type": coltype, + "nullable": nullable, + "default": default, + "autoincrement": is_identity is not None, + "comment": comment, + } + + if definition is not None and is_persisted is not None: + cdict["computed"] = { + "sqltext": definition, + "persisted": is_persisted, + } + + if is_identity is not None: + # identity_start and identity_increment are Decimal or None + if identity_start is None or identity_increment is None: + cdict["identity"] = {} + else: + if isinstance(coltype, sqltypes.BigInteger): + start = int(identity_start) + increment = int(identity_increment) + elif isinstance(coltype, sqltypes.Integer): + start = int(identity_start) + increment = int(identity_increment) + else: + start = identity_start + increment = identity_increment + + cdict["identity"] = { + "start": start, + "increment": increment, + } + + cols.append(cdict) + + if cols: + return cols + else: + return self._default_or_error( + connection, tablename, owner, ReflectionDefaults.columns, **kw + ) + + @reflection.cache + @_db_plus_owner + def get_pk_constraint( + self, connection, tablename, dbname, owner, schema, **kw + ): + pkeys = [] + TC = ischema.constraints + C = ischema.key_constraints.alias("C") + + # Primary key constraints + s = ( + sql.select( + C.c.column_name, + TC.c.constraint_type, + C.c.constraint_name, + func.objectproperty( + func.object_id( + C.c.table_schema + "." + C.c.constraint_name + ), + "CnstIsClustKey", + ).label("is_clustered"), + ) + .where( + sql.and_( + TC.c.constraint_name == C.c.constraint_name, + TC.c.table_schema == C.c.table_schema, + C.c.table_name == tablename, + C.c.table_schema == owner, + ), + ) + .order_by(TC.c.constraint_name, C.c.ordinal_position) + ) + c = connection.execution_options(future_result=True).execute(s) + constraint_name = None + is_clustered = None + for row in c.mappings(): + if "PRIMARY" in row[TC.c.constraint_type.name]: + pkeys.append(row["COLUMN_NAME"]) + if constraint_name is None: + constraint_name = row[C.c.constraint_name.name] + if is_clustered is None: + is_clustered = row["is_clustered"] + if pkeys: + return { + "constrained_columns": pkeys, + "name": constraint_name, + "dialect_options": {"mssql_clustered": is_clustered}, + } + else: + return self._default_or_error( + connection, + tablename, + owner, + ReflectionDefaults.pk_constraint, + **kw, + ) + + @reflection.cache + @_db_plus_owner + def get_foreign_keys( + self, connection, tablename, dbname, owner, schema, **kw + ): + # Foreign key constraints + s = ( + text( + """\ +WITH fk_info AS ( + SELECT + ischema_ref_con.constraint_schema, + ischema_ref_con.constraint_name, + ischema_key_col.ordinal_position, + ischema_key_col.table_schema, + ischema_key_col.table_name, + ischema_ref_con.unique_constraint_schema, + ischema_ref_con.unique_constraint_name, + ischema_ref_con.match_option, + ischema_ref_con.update_rule, + ischema_ref_con.delete_rule, + ischema_key_col.column_name AS constrained_column + FROM + INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ischema_ref_con + INNER JOIN + INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col ON + ischema_key_col.table_schema = ischema_ref_con.constraint_schema + AND ischema_key_col.constraint_name = + ischema_ref_con.constraint_name + WHERE ischema_key_col.table_name = :tablename + AND ischema_key_col.table_schema = :owner +), +constraint_info AS ( + SELECT + ischema_key_col.constraint_schema, + ischema_key_col.constraint_name, + ischema_key_col.ordinal_position, + ischema_key_col.table_schema, + ischema_key_col.table_name, + ischema_key_col.column_name + FROM + INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col +), +index_info AS ( + SELECT + sys.schemas.name AS index_schema, + sys.indexes.name AS index_name, + sys.index_columns.key_ordinal AS ordinal_position, + sys.schemas.name AS table_schema, + sys.objects.name AS table_name, + sys.columns.name AS column_name + FROM + sys.indexes + INNER JOIN + sys.objects ON + sys.objects.object_id = sys.indexes.object_id + INNER JOIN + sys.schemas ON + sys.schemas.schema_id = sys.objects.schema_id + INNER JOIN + sys.index_columns ON + sys.index_columns.object_id = sys.objects.object_id + AND sys.index_columns.index_id = sys.indexes.index_id + INNER JOIN + sys.columns ON + sys.columns.object_id = sys.indexes.object_id + AND sys.columns.column_id = sys.index_columns.column_id +) + SELECT + fk_info.constraint_schema, + fk_info.constraint_name, + fk_info.ordinal_position, + fk_info.constrained_column, + constraint_info.table_schema AS referred_table_schema, + constraint_info.table_name AS referred_table_name, + constraint_info.column_name AS referred_column, + fk_info.match_option, + fk_info.update_rule, + fk_info.delete_rule + FROM + fk_info INNER JOIN constraint_info ON + constraint_info.constraint_schema = + fk_info.unique_constraint_schema + AND constraint_info.constraint_name = + fk_info.unique_constraint_name + AND constraint_info.ordinal_position = fk_info.ordinal_position + UNION + SELECT + fk_info.constraint_schema, + fk_info.constraint_name, + fk_info.ordinal_position, + fk_info.constrained_column, + index_info.table_schema AS referred_table_schema, + index_info.table_name AS referred_table_name, + index_info.column_name AS referred_column, + fk_info.match_option, + fk_info.update_rule, + fk_info.delete_rule + FROM + fk_info INNER JOIN index_info ON + index_info.index_schema = fk_info.unique_constraint_schema + AND index_info.index_name = fk_info.unique_constraint_name + AND index_info.ordinal_position = fk_info.ordinal_position + + ORDER BY fk_info.constraint_schema, fk_info.constraint_name, + fk_info.ordinal_position +""" + ) + .bindparams( + sql.bindparam("tablename", tablename, ischema.CoerceUnicode()), + sql.bindparam("owner", owner, ischema.CoerceUnicode()), + ) + .columns( + constraint_schema=sqltypes.Unicode(), + constraint_name=sqltypes.Unicode(), + table_schema=sqltypes.Unicode(), + table_name=sqltypes.Unicode(), + constrained_column=sqltypes.Unicode(), + referred_table_schema=sqltypes.Unicode(), + referred_table_name=sqltypes.Unicode(), + referred_column=sqltypes.Unicode(), + ) + ) + + # group rows by constraint ID, to handle multi-column FKs + fkeys = [] + + def fkey_rec(): + return { + "name": None, + "constrained_columns": [], + "referred_schema": None, + "referred_table": None, + "referred_columns": [], + "options": {}, + } + + fkeys = util.defaultdict(fkey_rec) + + for r in connection.execute(s).all(): + ( + _, # constraint schema + rfknm, + _, # ordinal position + scol, + rschema, + rtbl, + rcol, + # TODO: we support match=<keyword> for foreign keys so + # we can support this also, PG has match=FULL for example + # but this seems to not be a valid value for SQL Server + _, # match rule + fkuprule, + fkdelrule, + ) = r + + rec = fkeys[rfknm] + rec["name"] = rfknm + + if fkuprule != "NO ACTION": + rec["options"]["onupdate"] = fkuprule + + if fkdelrule != "NO ACTION": + rec["options"]["ondelete"] = fkdelrule + + if not rec["referred_table"]: + rec["referred_table"] = rtbl + if schema is not None or owner != rschema: + if dbname: + rschema = dbname + "." + rschema + rec["referred_schema"] = rschema + + local_cols, remote_cols = ( + rec["constrained_columns"], + rec["referred_columns"], + ) + + local_cols.append(scol) + remote_cols.append(rcol) + + if fkeys: + return list(fkeys.values()) + else: + return self._default_or_error( + connection, + tablename, + owner, + ReflectionDefaults.foreign_keys, + **kw, + ) diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/information_schema.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/information_schema.py new file mode 100644 index 0000000..0c5f237 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/information_schema.py @@ -0,0 +1,254 @@ +# dialects/mssql/information_schema.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + +from ... import cast +from ... import Column +from ... import MetaData +from ... import Table +from ...ext.compiler import compiles +from ...sql import expression +from ...types import Boolean +from ...types import Integer +from ...types import Numeric +from ...types import NVARCHAR +from ...types import String +from ...types import TypeDecorator +from ...types import Unicode + + +ischema = MetaData() + + +class CoerceUnicode(TypeDecorator): + impl = Unicode + cache_ok = True + + def bind_expression(self, bindvalue): + return _cast_on_2005(bindvalue) + + +class _cast_on_2005(expression.ColumnElement): + def __init__(self, bindvalue): + self.bindvalue = bindvalue + + +@compiles(_cast_on_2005) +def _compile(element, compiler, **kw): + from . import base + + if ( + compiler.dialect.server_version_info is None + or compiler.dialect.server_version_info < base.MS_2005_VERSION + ): + return compiler.process(element.bindvalue, **kw) + else: + return compiler.process(cast(element.bindvalue, Unicode), **kw) + + +schemata = Table( + "SCHEMATA", + ischema, + Column("CATALOG_NAME", CoerceUnicode, key="catalog_name"), + Column("SCHEMA_NAME", CoerceUnicode, key="schema_name"), + Column("SCHEMA_OWNER", CoerceUnicode, key="schema_owner"), + schema="INFORMATION_SCHEMA", +) + +tables = Table( + "TABLES", + ischema, + Column("TABLE_CATALOG", CoerceUnicode, key="table_catalog"), + Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"), + Column("TABLE_NAME", CoerceUnicode, key="table_name"), + Column("TABLE_TYPE", CoerceUnicode, key="table_type"), + schema="INFORMATION_SCHEMA", +) + +columns = Table( + "COLUMNS", + ischema, + Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"), + Column("TABLE_NAME", CoerceUnicode, key="table_name"), + Column("COLUMN_NAME", CoerceUnicode, key="column_name"), + Column("IS_NULLABLE", Integer, key="is_nullable"), + Column("DATA_TYPE", String, key="data_type"), + Column("ORDINAL_POSITION", Integer, key="ordinal_position"), + Column( + "CHARACTER_MAXIMUM_LENGTH", Integer, key="character_maximum_length" + ), + Column("NUMERIC_PRECISION", Integer, key="numeric_precision"), + Column("NUMERIC_SCALE", Integer, key="numeric_scale"), + Column("COLUMN_DEFAULT", Integer, key="column_default"), + Column("COLLATION_NAME", String, key="collation_name"), + schema="INFORMATION_SCHEMA", +) + +mssql_temp_table_columns = Table( + "COLUMNS", + ischema, + Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"), + Column("TABLE_NAME", CoerceUnicode, key="table_name"), + Column("COLUMN_NAME", CoerceUnicode, key="column_name"), + Column("IS_NULLABLE", Integer, key="is_nullable"), + Column("DATA_TYPE", String, key="data_type"), + Column("ORDINAL_POSITION", Integer, key="ordinal_position"), + Column( + "CHARACTER_MAXIMUM_LENGTH", Integer, key="character_maximum_length" + ), + Column("NUMERIC_PRECISION", Integer, key="numeric_precision"), + Column("NUMERIC_SCALE", Integer, key="numeric_scale"), + Column("COLUMN_DEFAULT", Integer, key="column_default"), + Column("COLLATION_NAME", String, key="collation_name"), + schema="tempdb.INFORMATION_SCHEMA", +) + +constraints = Table( + "TABLE_CONSTRAINTS", + ischema, + Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"), + Column("TABLE_NAME", CoerceUnicode, key="table_name"), + Column("CONSTRAINT_NAME", CoerceUnicode, key="constraint_name"), + Column("CONSTRAINT_TYPE", CoerceUnicode, key="constraint_type"), + schema="INFORMATION_SCHEMA", +) + +column_constraints = Table( + "CONSTRAINT_COLUMN_USAGE", + ischema, + Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"), + Column("TABLE_NAME", CoerceUnicode, key="table_name"), + Column("COLUMN_NAME", CoerceUnicode, key="column_name"), + Column("CONSTRAINT_NAME", CoerceUnicode, key="constraint_name"), + schema="INFORMATION_SCHEMA", +) + +key_constraints = Table( + "KEY_COLUMN_USAGE", + ischema, + Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"), + Column("TABLE_NAME", CoerceUnicode, key="table_name"), + Column("COLUMN_NAME", CoerceUnicode, key="column_name"), + Column("CONSTRAINT_NAME", CoerceUnicode, key="constraint_name"), + Column("CONSTRAINT_SCHEMA", CoerceUnicode, key="constraint_schema"), + Column("ORDINAL_POSITION", Integer, key="ordinal_position"), + schema="INFORMATION_SCHEMA", +) + +ref_constraints = Table( + "REFERENTIAL_CONSTRAINTS", + ischema, + Column("CONSTRAINT_CATALOG", CoerceUnicode, key="constraint_catalog"), + Column("CONSTRAINT_SCHEMA", CoerceUnicode, key="constraint_schema"), + Column("CONSTRAINT_NAME", CoerceUnicode, key="constraint_name"), + # TODO: is CATLOG misspelled ? + Column( + "UNIQUE_CONSTRAINT_CATLOG", + CoerceUnicode, + key="unique_constraint_catalog", + ), + Column( + "UNIQUE_CONSTRAINT_SCHEMA", + CoerceUnicode, + key="unique_constraint_schema", + ), + Column( + "UNIQUE_CONSTRAINT_NAME", CoerceUnicode, key="unique_constraint_name" + ), + Column("MATCH_OPTION", String, key="match_option"), + Column("UPDATE_RULE", String, key="update_rule"), + Column("DELETE_RULE", String, key="delete_rule"), + schema="INFORMATION_SCHEMA", +) + +views = Table( + "VIEWS", + ischema, + Column("TABLE_CATALOG", CoerceUnicode, key="table_catalog"), + Column("TABLE_SCHEMA", CoerceUnicode, key="table_schema"), + Column("TABLE_NAME", CoerceUnicode, key="table_name"), + Column("VIEW_DEFINITION", CoerceUnicode, key="view_definition"), + Column("CHECK_OPTION", String, key="check_option"), + Column("IS_UPDATABLE", String, key="is_updatable"), + schema="INFORMATION_SCHEMA", +) + +computed_columns = Table( + "computed_columns", + ischema, + Column("object_id", Integer), + Column("name", CoerceUnicode), + Column("is_computed", Boolean), + Column("is_persisted", Boolean), + Column("definition", CoerceUnicode), + schema="sys", +) + +sequences = Table( + "SEQUENCES", + ischema, + Column("SEQUENCE_CATALOG", CoerceUnicode, key="sequence_catalog"), + Column("SEQUENCE_SCHEMA", CoerceUnicode, key="sequence_schema"), + Column("SEQUENCE_NAME", CoerceUnicode, key="sequence_name"), + schema="INFORMATION_SCHEMA", +) + + +class NumericSqlVariant(TypeDecorator): + r"""This type casts sql_variant columns in the identity_columns view + to numeric. This is required because: + + * pyodbc does not support sql_variant + * pymssql under python 2 return the byte representation of the number, + int 1 is returned as "\x01\x00\x00\x00". On python 3 it returns the + correct value as string. + """ + + impl = Unicode + cache_ok = True + + def column_expression(self, colexpr): + return cast(colexpr, Numeric(38, 0)) + + +identity_columns = Table( + "identity_columns", + ischema, + Column("object_id", Integer), + Column("name", CoerceUnicode), + Column("is_identity", Boolean), + Column("seed_value", NumericSqlVariant), + Column("increment_value", NumericSqlVariant), + Column("last_value", NumericSqlVariant), + Column("is_not_for_replication", Boolean), + schema="sys", +) + + +class NVarcharSqlVariant(TypeDecorator): + """This type casts sql_variant columns in the extended_properties view + to nvarchar. This is required because pyodbc does not support sql_variant + """ + + impl = Unicode + cache_ok = True + + def column_expression(self, colexpr): + return cast(colexpr, NVARCHAR) + + +extended_properties = Table( + "extended_properties", + ischema, + Column("class", Integer), # TINYINT + Column("class_desc", CoerceUnicode), + Column("major_id", Integer), + Column("minor_id", Integer), + Column("name", CoerceUnicode), + Column("value", NVarcharSqlVariant), + schema="sys", +) diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/json.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/json.py new file mode 100644 index 0000000..18bea09 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/json.py @@ -0,0 +1,133 @@ +# dialects/mssql/json.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + +from ... import types as sqltypes + +# technically, all the dialect-specific datatypes that don't have any special +# behaviors would be private with names like _MSJson. However, we haven't been +# doing this for mysql.JSON or sqlite.JSON which both have JSON / JSONIndexType +# / JSONPathType in their json.py files, so keep consistent with that +# sub-convention for now. A future change can update them all to be +# package-private at once. + + +class JSON(sqltypes.JSON): + """MSSQL JSON type. + + MSSQL supports JSON-formatted data as of SQL Server 2016. + + The :class:`_mssql.JSON` datatype at the DDL level will represent the + datatype as ``NVARCHAR(max)``, but provides for JSON-level comparison + functions as well as Python coercion behavior. + + :class:`_mssql.JSON` is used automatically whenever the base + :class:`_types.JSON` datatype is used against a SQL Server backend. + + .. seealso:: + + :class:`_types.JSON` - main documentation for the generic + cross-platform JSON datatype. + + The :class:`_mssql.JSON` type supports persistence of JSON values + as well as the core index operations provided by :class:`_types.JSON` + datatype, by adapting the operations to render the ``JSON_VALUE`` + or ``JSON_QUERY`` functions at the database level. + + The SQL Server :class:`_mssql.JSON` type necessarily makes use of the + ``JSON_QUERY`` and ``JSON_VALUE`` functions when querying for elements + of a JSON object. These two functions have a major restriction in that + they are **mutually exclusive** based on the type of object to be returned. + The ``JSON_QUERY`` function **only** returns a JSON dictionary or list, + but not an individual string, numeric, or boolean element; the + ``JSON_VALUE`` function **only** returns an individual string, numeric, + or boolean element. **both functions either return NULL or raise + an error if they are not used against the correct expected value**. + + To handle this awkward requirement, indexed access rules are as follows: + + 1. When extracting a sub element from a JSON that is itself a JSON + dictionary or list, the :meth:`_types.JSON.Comparator.as_json` accessor + should be used:: + + stmt = select( + data_table.c.data["some key"].as_json() + ).where( + data_table.c.data["some key"].as_json() == {"sub": "structure"} + ) + + 2. When extracting a sub element from a JSON that is a plain boolean, + string, integer, or float, use the appropriate method among + :meth:`_types.JSON.Comparator.as_boolean`, + :meth:`_types.JSON.Comparator.as_string`, + :meth:`_types.JSON.Comparator.as_integer`, + :meth:`_types.JSON.Comparator.as_float`:: + + stmt = select( + data_table.c.data["some key"].as_string() + ).where( + data_table.c.data["some key"].as_string() == "some string" + ) + + .. versionadded:: 1.4 + + + """ + + # note there was a result processor here that was looking for "number", + # but none of the tests seem to exercise it. + + +# Note: these objects currently match exactly those of MySQL, however since +# these are not generalizable to all JSON implementations, remain separately +# implemented for each dialect. +class _FormatTypeMixin: + def _format_value(self, value): + raise NotImplementedError() + + def bind_processor(self, dialect): + super_proc = self.string_bind_processor(dialect) + + def process(value): + value = self._format_value(value) + if super_proc: + value = super_proc(value) + return value + + return process + + def literal_processor(self, dialect): + super_proc = self.string_literal_processor(dialect) + + def process(value): + value = self._format_value(value) + if super_proc: + value = super_proc(value) + return value + + return process + + +class JSONIndexType(_FormatTypeMixin, sqltypes.JSON.JSONIndexType): + def _format_value(self, value): + if isinstance(value, int): + value = "$[%s]" % value + else: + value = '$."%s"' % value + return value + + +class JSONPathType(_FormatTypeMixin, sqltypes.JSON.JSONPathType): + def _format_value(self, value): + return "$%s" % ( + "".join( + [ + "[%s]" % elem if isinstance(elem, int) else '."%s"' % elem + for elem in value + ] + ) + ) diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/provision.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/provision.py new file mode 100644 index 0000000..143d386 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/provision.py @@ -0,0 +1,155 @@ +# dialects/mssql/provision.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + +from sqlalchemy import inspect +from sqlalchemy import Integer +from ... import create_engine +from ... import exc +from ...schema import Column +from ...schema import DropConstraint +from ...schema import ForeignKeyConstraint +from ...schema import MetaData +from ...schema import Table +from ...testing.provision import create_db +from ...testing.provision import drop_all_schema_objects_pre_tables +from ...testing.provision import drop_db +from ...testing.provision import generate_driver_url +from ...testing.provision import get_temp_table_name +from ...testing.provision import log +from ...testing.provision import normalize_sequence +from ...testing.provision import run_reap_dbs +from ...testing.provision import temp_table_keyword_args + + +@generate_driver_url.for_db("mssql") +def generate_driver_url(url, driver, query_str): + backend = url.get_backend_name() + + new_url = url.set(drivername="%s+%s" % (backend, driver)) + + if driver not in ("pyodbc", "aioodbc"): + new_url = new_url.set(query="") + + if driver == "aioodbc": + new_url = new_url.update_query_dict({"MARS_Connection": "Yes"}) + + if query_str: + new_url = new_url.update_query_string(query_str) + + try: + new_url.get_dialect() + except exc.NoSuchModuleError: + return None + else: + return new_url + + +@create_db.for_db("mssql") +def _mssql_create_db(cfg, eng, ident): + with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: + conn.exec_driver_sql("create database %s" % ident) + conn.exec_driver_sql( + "ALTER DATABASE %s SET ALLOW_SNAPSHOT_ISOLATION ON" % ident + ) + conn.exec_driver_sql( + "ALTER DATABASE %s SET READ_COMMITTED_SNAPSHOT ON" % ident + ) + conn.exec_driver_sql("use %s" % ident) + conn.exec_driver_sql("create schema test_schema") + conn.exec_driver_sql("create schema test_schema_2") + + +@drop_db.for_db("mssql") +def _mssql_drop_db(cfg, eng, ident): + with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: + _mssql_drop_ignore(conn, ident) + + +def _mssql_drop_ignore(conn, ident): + try: + # typically when this happens, we can't KILL the session anyway, + # so let the cleanup process drop the DBs + # for row in conn.exec_driver_sql( + # "select session_id from sys.dm_exec_sessions " + # "where database_id=db_id('%s')" % ident): + # log.info("killing SQL server session %s", row['session_id']) + # conn.exec_driver_sql("kill %s" % row['session_id']) + conn.exec_driver_sql("drop database %s" % ident) + log.info("Reaped db: %s", ident) + return True + except exc.DatabaseError as err: + log.warning("couldn't drop db: %s", err) + return False + + +@run_reap_dbs.for_db("mssql") +def _reap_mssql_dbs(url, idents): + log.info("db reaper connecting to %r", url) + eng = create_engine(url) + with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: + log.info("identifiers in file: %s", ", ".join(idents)) + + to_reap = conn.exec_driver_sql( + "select d.name from sys.databases as d where name " + "like 'TEST_%' and not exists (select session_id " + "from sys.dm_exec_sessions " + "where database_id=d.database_id)" + ) + all_names = {dbname.lower() for (dbname,) in to_reap} + to_drop = set() + for name in all_names: + if name in idents: + to_drop.add(name) + + dropped = total = 0 + for total, dbname in enumerate(to_drop, 1): + if _mssql_drop_ignore(conn, dbname): + dropped += 1 + log.info( + "Dropped %d out of %d stale databases detected", dropped, total + ) + + +@temp_table_keyword_args.for_db("mssql") +def _mssql_temp_table_keyword_args(cfg, eng): + return {} + + +@get_temp_table_name.for_db("mssql") +def _mssql_get_temp_table_name(cfg, eng, base_name): + return "##" + base_name + + +@drop_all_schema_objects_pre_tables.for_db("mssql") +def drop_all_schema_objects_pre_tables(cfg, eng): + with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: + inspector = inspect(conn) + for schema in (None, "dbo", cfg.test_schema, cfg.test_schema_2): + for tname in inspector.get_table_names(schema=schema): + tb = Table( + tname, + MetaData(), + Column("x", Integer), + Column("y", Integer), + schema=schema, + ) + for fk in inspect(conn).get_foreign_keys(tname, schema=schema): + conn.execute( + DropConstraint( + ForeignKeyConstraint( + [tb.c.x], [tb.c.y], name=fk["name"] + ) + ) + ) + + +@normalize_sequence.for_db("mssql") +def normalize_sequence(cfg, sequence): + if sequence.start is None: + sequence.start = 1 + return sequence diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/pymssql.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/pymssql.py new file mode 100644 index 0000000..ea1f9bd --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/pymssql.py @@ -0,0 +1,125 @@ +# dialects/mssql/pymssql.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + + +""" +.. dialect:: mssql+pymssql + :name: pymssql + :dbapi: pymssql + :connectstring: mssql+pymssql://<username>:<password>@<freetds_name>/?charset=utf8 + +pymssql is a Python module that provides a Python DBAPI interface around +`FreeTDS <https://www.freetds.org/>`_. + +.. versionchanged:: 2.0.5 + + pymssql was restored to SQLAlchemy's continuous integration testing + + +""" # noqa +import re + +from .base import MSDialect +from .base import MSIdentifierPreparer +from ... import types as sqltypes +from ... import util +from ...engine import processors + + +class _MSNumeric_pymssql(sqltypes.Numeric): + def result_processor(self, dialect, type_): + if not self.asdecimal: + return processors.to_float + else: + return sqltypes.Numeric.result_processor(self, dialect, type_) + + +class MSIdentifierPreparer_pymssql(MSIdentifierPreparer): + def __init__(self, dialect): + super().__init__(dialect) + # pymssql has the very unusual behavior that it uses pyformat + # yet does not require that percent signs be doubled + self._double_percents = False + + +class MSDialect_pymssql(MSDialect): + supports_statement_cache = True + supports_native_decimal = True + supports_native_uuid = True + driver = "pymssql" + + preparer = MSIdentifierPreparer_pymssql + + colspecs = util.update_copy( + MSDialect.colspecs, + {sqltypes.Numeric: _MSNumeric_pymssql, sqltypes.Float: sqltypes.Float}, + ) + + @classmethod + def import_dbapi(cls): + module = __import__("pymssql") + # pymmsql < 2.1.1 doesn't have a Binary method. we use string + client_ver = tuple(int(x) for x in module.__version__.split(".")) + if client_ver < (2, 1, 1): + # TODO: monkeypatching here is less than ideal + module.Binary = lambda x: x if hasattr(x, "decode") else str(x) + + if client_ver < (1,): + util.warn( + "The pymssql dialect expects at least " + "the 1.0 series of the pymssql DBAPI." + ) + return module + + def _get_server_version_info(self, connection): + vers = connection.exec_driver_sql("select @@version").scalar() + m = re.match(r"Microsoft .*? - (\d+)\.(\d+)\.(\d+)\.(\d+)", vers) + if m: + return tuple(int(x) for x in m.group(1, 2, 3, 4)) + else: + return None + + def create_connect_args(self, url): + opts = url.translate_connect_args(username="user") + opts.update(url.query) + port = opts.pop("port", None) + if port and "host" in opts: + opts["host"] = "%s:%s" % (opts["host"], port) + return ([], opts) + + def is_disconnect(self, e, connection, cursor): + for msg in ( + "Adaptive Server connection timed out", + "Net-Lib error during Connection reset by peer", + "message 20003", # connection timeout + "Error 10054", + "Not connected to any MS SQL server", + "Connection is closed", + "message 20006", # Write to the server failed + "message 20017", # Unexpected EOF from the server + "message 20047", # DBPROCESS is dead or not enabled + ): + if msg in str(e): + return True + else: + return False + + def get_isolation_level_values(self, dbapi_connection): + return super().get_isolation_level_values(dbapi_connection) + [ + "AUTOCOMMIT" + ] + + def set_isolation_level(self, dbapi_connection, level): + if level == "AUTOCOMMIT": + dbapi_connection.autocommit(True) + else: + dbapi_connection.autocommit(False) + super().set_isolation_level(dbapi_connection, level) + + +dialect = MSDialect_pymssql diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/pyodbc.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/pyodbc.py new file mode 100644 index 0000000..76ea046 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/pyodbc.py @@ -0,0 +1,745 @@ +# dialects/mssql/pyodbc.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + +r""" +.. dialect:: mssql+pyodbc + :name: PyODBC + :dbapi: pyodbc + :connectstring: mssql+pyodbc://<username>:<password>@<dsnname> + :url: https://pypi.org/project/pyodbc/ + +Connecting to PyODBC +-------------------- + +The URL here is to be translated to PyODBC connection strings, as +detailed in `ConnectionStrings <https://code.google.com/p/pyodbc/wiki/ConnectionStrings>`_. + +DSN Connections +^^^^^^^^^^^^^^^ + +A DSN connection in ODBC means that a pre-existing ODBC datasource is +configured on the client machine. The application then specifies the name +of this datasource, which encompasses details such as the specific ODBC driver +in use as well as the network address of the database. Assuming a datasource +is configured on the client, a basic DSN-based connection looks like:: + + engine = create_engine("mssql+pyodbc://scott:tiger@some_dsn") + +Which above, will pass the following connection string to PyODBC:: + + DSN=some_dsn;UID=scott;PWD=tiger + +If the username and password are omitted, the DSN form will also add +the ``Trusted_Connection=yes`` directive to the ODBC string. + +Hostname Connections +^^^^^^^^^^^^^^^^^^^^ + +Hostname-based connections are also supported by pyodbc. These are often +easier to use than a DSN and have the additional advantage that the specific +database name to connect towards may be specified locally in the URL, rather +than it being fixed as part of a datasource configuration. + +When using a hostname connection, the driver name must also be specified in the +query parameters of the URL. As these names usually have spaces in them, the +name must be URL encoded which means using plus signs for spaces:: + + engine = create_engine("mssql+pyodbc://scott:tiger@myhost:port/databasename?driver=ODBC+Driver+17+for+SQL+Server") + +The ``driver`` keyword is significant to the pyodbc dialect and must be +specified in lowercase. + +Any other names passed in the query string are passed through in the pyodbc +connect string, such as ``authentication``, ``TrustServerCertificate``, etc. +Multiple keyword arguments must be separated by an ampersand (``&``); these +will be translated to semicolons when the pyodbc connect string is generated +internally:: + + e = create_engine( + "mssql+pyodbc://scott:tiger@mssql2017:1433/test?" + "driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes" + "&authentication=ActiveDirectoryIntegrated" + ) + +The equivalent URL can be constructed using :class:`_sa.engine.URL`:: + + from sqlalchemy.engine import URL + connection_url = URL.create( + "mssql+pyodbc", + username="scott", + password="tiger", + host="mssql2017", + port=1433, + database="test", + query={ + "driver": "ODBC Driver 18 for SQL Server", + "TrustServerCertificate": "yes", + "authentication": "ActiveDirectoryIntegrated", + }, + ) + + +Pass through exact Pyodbc string +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +A PyODBC connection string can also be sent in pyodbc's format directly, as +specified in `the PyODBC documentation +<https://github.com/mkleehammer/pyodbc/wiki/Connecting-to-databases>`_, +using the parameter ``odbc_connect``. A :class:`_sa.engine.URL` object +can help make this easier:: + + from sqlalchemy.engine import URL + connection_string = "DRIVER={SQL Server Native Client 10.0};SERVER=dagger;DATABASE=test;UID=user;PWD=password" + connection_url = URL.create("mssql+pyodbc", query={"odbc_connect": connection_string}) + + engine = create_engine(connection_url) + +.. _mssql_pyodbc_access_tokens: + +Connecting to databases with access tokens +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Some database servers are set up to only accept access tokens for login. For +example, SQL Server allows the use of Azure Active Directory tokens to connect +to databases. This requires creating a credential object using the +``azure-identity`` library. More information about the authentication step can be +found in `Microsoft's documentation +<https://docs.microsoft.com/en-us/azure/developer/python/azure-sdk-authenticate?tabs=bash>`_. + +After getting an engine, the credentials need to be sent to ``pyodbc.connect`` +each time a connection is requested. One way to do this is to set up an event +listener on the engine that adds the credential token to the dialect's connect +call. This is discussed more generally in :ref:`engines_dynamic_tokens`. For +SQL Server in particular, this is passed as an ODBC connection attribute with +a data structure `described by Microsoft +<https://docs.microsoft.com/en-us/sql/connect/odbc/using-azure-active-directory#authenticating-with-an-access-token>`_. + +The following code snippet will create an engine that connects to an Azure SQL +database using Azure credentials:: + + import struct + from sqlalchemy import create_engine, event + from sqlalchemy.engine.url import URL + from azure import identity + + SQL_COPT_SS_ACCESS_TOKEN = 1256 # Connection option for access tokens, as defined in msodbcsql.h + TOKEN_URL = "https://database.windows.net/" # The token URL for any Azure SQL database + + connection_string = "mssql+pyodbc://@my-server.database.windows.net/myDb?driver=ODBC+Driver+17+for+SQL+Server" + + engine = create_engine(connection_string) + + azure_credentials = identity.DefaultAzureCredential() + + @event.listens_for(engine, "do_connect") + def provide_token(dialect, conn_rec, cargs, cparams): + # remove the "Trusted_Connection" parameter that SQLAlchemy adds + cargs[0] = cargs[0].replace(";Trusted_Connection=Yes", "") + + # create token credential + raw_token = azure_credentials.get_token(TOKEN_URL).token.encode("utf-16-le") + token_struct = struct.pack(f"<I{len(raw_token)}s", len(raw_token), raw_token) + + # apply it to keyword arguments + cparams["attrs_before"] = {SQL_COPT_SS_ACCESS_TOKEN: token_struct} + +.. tip:: + + The ``Trusted_Connection`` token is currently added by the SQLAlchemy + pyodbc dialect when no username or password is present. This needs + to be removed per Microsoft's + `documentation for Azure access tokens + <https://docs.microsoft.com/en-us/sql/connect/odbc/using-azure-active-directory#authenticating-with-an-access-token>`_, + stating that a connection string when using an access token must not contain + ``UID``, ``PWD``, ``Authentication`` or ``Trusted_Connection`` parameters. + +.. _azure_synapse_ignore_no_transaction_on_rollback: + +Avoiding transaction-related exceptions on Azure Synapse Analytics +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Azure Synapse Analytics has a significant difference in its transaction +handling compared to plain SQL Server; in some cases an error within a Synapse +transaction can cause it to be arbitrarily terminated on the server side, which +then causes the DBAPI ``.rollback()`` method (as well as ``.commit()``) to +fail. The issue prevents the usual DBAPI contract of allowing ``.rollback()`` +to pass silently if no transaction is present as the driver does not expect +this condition. The symptom of this failure is an exception with a message +resembling 'No corresponding transaction found. (111214)' when attempting to +emit a ``.rollback()`` after an operation had a failure of some kind. + +This specific case can be handled by passing ``ignore_no_transaction_on_rollback=True`` to +the SQL Server dialect via the :func:`_sa.create_engine` function as follows:: + + engine = create_engine(connection_url, ignore_no_transaction_on_rollback=True) + +Using the above parameter, the dialect will catch ``ProgrammingError`` +exceptions raised during ``connection.rollback()`` and emit a warning +if the error message contains code ``111214``, however will not raise +an exception. + +.. versionadded:: 1.4.40 Added the + ``ignore_no_transaction_on_rollback=True`` parameter. + +Enable autocommit for Azure SQL Data Warehouse (DW) connections +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Azure SQL Data Warehouse does not support transactions, +and that can cause problems with SQLAlchemy's "autobegin" (and implicit +commit/rollback) behavior. We can avoid these problems by enabling autocommit +at both the pyodbc and engine levels:: + + connection_url = sa.engine.URL.create( + "mssql+pyodbc", + username="scott", + password="tiger", + host="dw.azure.example.com", + database="mydb", + query={ + "driver": "ODBC Driver 17 for SQL Server", + "autocommit": "True", + }, + ) + + engine = create_engine(connection_url).execution_options( + isolation_level="AUTOCOMMIT" + ) + +Avoiding sending large string parameters as TEXT/NTEXT +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +By default, for historical reasons, Microsoft's ODBC drivers for SQL Server +send long string parameters (greater than 4000 SBCS characters or 2000 Unicode +characters) as TEXT/NTEXT values. TEXT and NTEXT have been deprecated for many +years and are starting to cause compatibility issues with newer versions of +SQL_Server/Azure. For example, see `this +issue <https://github.com/mkleehammer/pyodbc/issues/835>`_. + +Starting with ODBC Driver 18 for SQL Server we can override the legacy +behavior and pass long strings as varchar(max)/nvarchar(max) using the +``LongAsMax=Yes`` connection string parameter:: + + connection_url = sa.engine.URL.create( + "mssql+pyodbc", + username="scott", + password="tiger", + host="mssqlserver.example.com", + database="mydb", + query={ + "driver": "ODBC Driver 18 for SQL Server", + "LongAsMax": "Yes", + }, + ) + + +Pyodbc Pooling / connection close behavior +------------------------------------------ + +PyODBC uses internal `pooling +<https://github.com/mkleehammer/pyodbc/wiki/The-pyodbc-Module#pooling>`_ by +default, which means connections will be longer lived than they are within +SQLAlchemy itself. As SQLAlchemy has its own pooling behavior, it is often +preferable to disable this behavior. This behavior can only be disabled +globally at the PyODBC module level, **before** any connections are made:: + + import pyodbc + + pyodbc.pooling = False + + # don't use the engine before pooling is set to False + engine = create_engine("mssql+pyodbc://user:pass@dsn") + +If this variable is left at its default value of ``True``, **the application +will continue to maintain active database connections**, even when the +SQLAlchemy engine itself fully discards a connection or if the engine is +disposed. + +.. seealso:: + + `pooling <https://github.com/mkleehammer/pyodbc/wiki/The-pyodbc-Module#pooling>`_ - + in the PyODBC documentation. + +Driver / Unicode Support +------------------------- + +PyODBC works best with Microsoft ODBC drivers, particularly in the area +of Unicode support on both Python 2 and Python 3. + +Using the FreeTDS ODBC drivers on Linux or OSX with PyODBC is **not** +recommended; there have been historically many Unicode-related issues +in this area, including before Microsoft offered ODBC drivers for Linux +and OSX. Now that Microsoft offers drivers for all platforms, for +PyODBC support these are recommended. FreeTDS remains relevant for +non-ODBC drivers such as pymssql where it works very well. + + +Rowcount Support +---------------- + +Previous limitations with the SQLAlchemy ORM's "versioned rows" feature with +Pyodbc have been resolved as of SQLAlchemy 2.0.5. See the notes at +:ref:`mssql_rowcount_versioning`. + +.. _mssql_pyodbc_fastexecutemany: + +Fast Executemany Mode +--------------------- + +The PyODBC driver includes support for a "fast executemany" mode of execution +which greatly reduces round trips for a DBAPI ``executemany()`` call when using +Microsoft ODBC drivers, for **limited size batches that fit in memory**. The +feature is enabled by setting the attribute ``.fast_executemany`` on the DBAPI +cursor when an executemany call is to be used. The SQLAlchemy PyODBC SQL +Server dialect supports this parameter by passing the +``fast_executemany`` parameter to +:func:`_sa.create_engine` , when using the **Microsoft ODBC driver only**:: + + engine = create_engine( + "mssql+pyodbc://scott:tiger@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server", + fast_executemany=True) + +.. versionchanged:: 2.0.9 - the ``fast_executemany`` parameter now has its + intended effect of this PyODBC feature taking effect for all INSERT + statements that are executed with multiple parameter sets, which don't + include RETURNING. Previously, SQLAlchemy 2.0's :term:`insertmanyvalues` + feature would cause ``fast_executemany`` to not be used in most cases + even if specified. + +.. versionadded:: 1.3 + +.. seealso:: + + `fast executemany <https://github.com/mkleehammer/pyodbc/wiki/Features-beyond-the-DB-API#fast_executemany>`_ + - on github + +.. _mssql_pyodbc_setinputsizes: + +Setinputsizes Support +----------------------- + +As of version 2.0, the pyodbc ``cursor.setinputsizes()`` method is used for +all statement executions, except for ``cursor.executemany()`` calls when +fast_executemany=True where it is not supported (assuming +:ref:`insertmanyvalues <engine_insertmanyvalues>` is kept enabled, +"fastexecutemany" will not take place for INSERT statements in any case). + +The use of ``cursor.setinputsizes()`` can be disabled by passing +``use_setinputsizes=False`` to :func:`_sa.create_engine`. + +When ``use_setinputsizes`` is left at its default of ``True``, the +specific per-type symbols passed to ``cursor.setinputsizes()`` can be +programmatically customized using the :meth:`.DialectEvents.do_setinputsizes` +hook. See that method for usage examples. + +.. versionchanged:: 2.0 The mssql+pyodbc dialect now defaults to using + ``use_setinputsizes=True`` for all statement executions with the exception of + cursor.executemany() calls when fast_executemany=True. The behavior can + be turned off by passing ``use_setinputsizes=False`` to + :func:`_sa.create_engine`. + +""" # noqa + + +import datetime +import decimal +import re +import struct + +from .base import _MSDateTime +from .base import _MSUnicode +from .base import _MSUnicodeText +from .base import BINARY +from .base import DATETIMEOFFSET +from .base import MSDialect +from .base import MSExecutionContext +from .base import VARBINARY +from .json import JSON as _MSJson +from .json import JSONIndexType as _MSJsonIndexType +from .json import JSONPathType as _MSJsonPathType +from ... import exc +from ... import types as sqltypes +from ... import util +from ...connectors.pyodbc import PyODBCConnector +from ...engine import cursor as _cursor + + +class _ms_numeric_pyodbc: + """Turns Decimals with adjusted() < 0 or > 7 into strings. + + The routines here are needed for older pyodbc versions + as well as current mxODBC versions. + + """ + + def bind_processor(self, dialect): + super_process = super().bind_processor(dialect) + + if not dialect._need_decimal_fix: + return super_process + + def process(value): + if self.asdecimal and isinstance(value, decimal.Decimal): + adjusted = value.adjusted() + if adjusted < 0: + return self._small_dec_to_string(value) + elif adjusted > 7: + return self._large_dec_to_string(value) + + if super_process: + return super_process(value) + else: + return value + + return process + + # these routines needed for older versions of pyodbc. + # as of 2.1.8 this logic is integrated. + + def _small_dec_to_string(self, value): + return "%s0.%s%s" % ( + (value < 0 and "-" or ""), + "0" * (abs(value.adjusted()) - 1), + "".join([str(nint) for nint in value.as_tuple()[1]]), + ) + + def _large_dec_to_string(self, value): + _int = value.as_tuple()[1] + if "E" in str(value): + result = "%s%s%s" % ( + (value < 0 and "-" or ""), + "".join([str(s) for s in _int]), + "0" * (value.adjusted() - (len(_int) - 1)), + ) + else: + if (len(_int) - 1) > value.adjusted(): + result = "%s%s.%s" % ( + (value < 0 and "-" or ""), + "".join([str(s) for s in _int][0 : value.adjusted() + 1]), + "".join([str(s) for s in _int][value.adjusted() + 1 :]), + ) + else: + result = "%s%s" % ( + (value < 0 and "-" or ""), + "".join([str(s) for s in _int][0 : value.adjusted() + 1]), + ) + return result + + +class _MSNumeric_pyodbc(_ms_numeric_pyodbc, sqltypes.Numeric): + pass + + +class _MSFloat_pyodbc(_ms_numeric_pyodbc, sqltypes.Float): + pass + + +class _ms_binary_pyodbc: + """Wraps binary values in dialect-specific Binary wrapper. + If the value is null, return a pyodbc-specific BinaryNull + object to prevent pyODBC [and FreeTDS] from defaulting binary + NULL types to SQLWCHAR and causing implicit conversion errors. + """ + + def bind_processor(self, dialect): + if dialect.dbapi is None: + return None + + DBAPIBinary = dialect.dbapi.Binary + + def process(value): + if value is not None: + return DBAPIBinary(value) + else: + # pyodbc-specific + return dialect.dbapi.BinaryNull + + return process + + +class _ODBCDateTimeBindProcessor: + """Add bind processors to handle datetimeoffset behaviors""" + + has_tz = False + + def bind_processor(self, dialect): + def process(value): + if value is None: + return None + elif isinstance(value, str): + # if a string was passed directly, allow it through + return value + elif not value.tzinfo or (not self.timezone and not self.has_tz): + # for DateTime(timezone=False) + return value + else: + # for DATETIMEOFFSET or DateTime(timezone=True) + # + # Convert to string format required by T-SQL + dto_string = value.strftime("%Y-%m-%d %H:%M:%S.%f %z") + # offset needs a colon, e.g., -0700 -> -07:00 + # "UTC offset in the form (+-)HHMM[SS[.ffffff]]" + # backend currently rejects seconds / fractional seconds + dto_string = re.sub( + r"([\+\-]\d{2})([\d\.]+)$", r"\1:\2", dto_string + ) + return dto_string + + return process + + +class _ODBCDateTime(_ODBCDateTimeBindProcessor, _MSDateTime): + pass + + +class _ODBCDATETIMEOFFSET(_ODBCDateTimeBindProcessor, DATETIMEOFFSET): + has_tz = True + + +class _VARBINARY_pyodbc(_ms_binary_pyodbc, VARBINARY): + pass + + +class _BINARY_pyodbc(_ms_binary_pyodbc, BINARY): + pass + + +class _String_pyodbc(sqltypes.String): + def get_dbapi_type(self, dbapi): + if self.length in (None, "max") or self.length >= 2000: + return (dbapi.SQL_VARCHAR, 0, 0) + else: + return dbapi.SQL_VARCHAR + + +class _Unicode_pyodbc(_MSUnicode): + def get_dbapi_type(self, dbapi): + if self.length in (None, "max") or self.length >= 2000: + return (dbapi.SQL_WVARCHAR, 0, 0) + else: + return dbapi.SQL_WVARCHAR + + +class _UnicodeText_pyodbc(_MSUnicodeText): + def get_dbapi_type(self, dbapi): + if self.length in (None, "max") or self.length >= 2000: + return (dbapi.SQL_WVARCHAR, 0, 0) + else: + return dbapi.SQL_WVARCHAR + + +class _JSON_pyodbc(_MSJson): + def get_dbapi_type(self, dbapi): + return (dbapi.SQL_WVARCHAR, 0, 0) + + +class _JSONIndexType_pyodbc(_MSJsonIndexType): + def get_dbapi_type(self, dbapi): + return dbapi.SQL_WVARCHAR + + +class _JSONPathType_pyodbc(_MSJsonPathType): + def get_dbapi_type(self, dbapi): + return dbapi.SQL_WVARCHAR + + +class MSExecutionContext_pyodbc(MSExecutionContext): + _embedded_scope_identity = False + + def pre_exec(self): + """where appropriate, issue "select scope_identity()" in the same + statement. + + Background on why "scope_identity()" is preferable to "@@identity": + https://msdn.microsoft.com/en-us/library/ms190315.aspx + + Background on why we attempt to embed "scope_identity()" into the same + statement as the INSERT: + https://code.google.com/p/pyodbc/wiki/FAQs#How_do_I_retrieve_autogenerated/identity_values? + + """ + + super().pre_exec() + + # don't embed the scope_identity select into an + # "INSERT .. DEFAULT VALUES" + if ( + self._select_lastrowid + and self.dialect.use_scope_identity + and len(self.parameters[0]) + ): + self._embedded_scope_identity = True + + self.statement += "; select scope_identity()" + + def post_exec(self): + if self._embedded_scope_identity: + # Fetch the last inserted id from the manipulated statement + # We may have to skip over a number of result sets with + # no data (due to triggers, etc.) + while True: + try: + # fetchall() ensures the cursor is consumed + # without closing it (FreeTDS particularly) + rows = self.cursor.fetchall() + except self.dialect.dbapi.Error: + # no way around this - nextset() consumes the previous set + # so we need to just keep flipping + self.cursor.nextset() + else: + if not rows: + # async adapter drivers just return None here + self.cursor.nextset() + continue + row = rows[0] + break + + self._lastrowid = int(row[0]) + + self.cursor_fetch_strategy = _cursor._NO_CURSOR_DML + else: + super().post_exec() + + +class MSDialect_pyodbc(PyODBCConnector, MSDialect): + supports_statement_cache = True + + # note this parameter is no longer used by the ORM or default dialect + # see #9414 + supports_sane_rowcount_returning = False + + execution_ctx_cls = MSExecutionContext_pyodbc + + colspecs = util.update_copy( + MSDialect.colspecs, + { + sqltypes.Numeric: _MSNumeric_pyodbc, + sqltypes.Float: _MSFloat_pyodbc, + BINARY: _BINARY_pyodbc, + # support DateTime(timezone=True) + sqltypes.DateTime: _ODBCDateTime, + DATETIMEOFFSET: _ODBCDATETIMEOFFSET, + # SQL Server dialect has a VARBINARY that is just to support + # "deprecate_large_types" w/ VARBINARY(max), but also we must + # handle the usual SQL standard VARBINARY + VARBINARY: _VARBINARY_pyodbc, + sqltypes.VARBINARY: _VARBINARY_pyodbc, + sqltypes.LargeBinary: _VARBINARY_pyodbc, + sqltypes.String: _String_pyodbc, + sqltypes.Unicode: _Unicode_pyodbc, + sqltypes.UnicodeText: _UnicodeText_pyodbc, + sqltypes.JSON: _JSON_pyodbc, + sqltypes.JSON.JSONIndexType: _JSONIndexType_pyodbc, + sqltypes.JSON.JSONPathType: _JSONPathType_pyodbc, + # this excludes Enum from the string/VARCHAR thing for now + # it looks like Enum's adaptation doesn't really support the + # String type itself having a dialect-level impl + sqltypes.Enum: sqltypes.Enum, + }, + ) + + def __init__( + self, + fast_executemany=False, + use_setinputsizes=True, + **params, + ): + super().__init__(use_setinputsizes=use_setinputsizes, **params) + self.use_scope_identity = ( + self.use_scope_identity + and self.dbapi + and hasattr(self.dbapi.Cursor, "nextset") + ) + self._need_decimal_fix = self.dbapi and self._dbapi_version() < ( + 2, + 1, + 8, + ) + self.fast_executemany = fast_executemany + if fast_executemany: + self.use_insertmanyvalues_wo_returning = False + + def _get_server_version_info(self, connection): + try: + # "Version of the instance of SQL Server, in the form + # of 'major.minor.build.revision'" + raw = connection.exec_driver_sql( + "SELECT CAST(SERVERPROPERTY('ProductVersion') AS VARCHAR)" + ).scalar() + except exc.DBAPIError: + # SQL Server docs indicate this function isn't present prior to + # 2008. Before we had the VARCHAR cast above, pyodbc would also + # fail on this query. + return super()._get_server_version_info(connection) + else: + version = [] + r = re.compile(r"[.\-]") + for n in r.split(raw): + try: + version.append(int(n)) + except ValueError: + pass + return tuple(version) + + def on_connect(self): + super_ = super().on_connect() + + def on_connect(conn): + if super_ is not None: + super_(conn) + + self._setup_timestampoffset_type(conn) + + return on_connect + + def _setup_timestampoffset_type(self, connection): + # output converter function for datetimeoffset + def _handle_datetimeoffset(dto_value): + tup = struct.unpack("<6hI2h", dto_value) + return datetime.datetime( + tup[0], + tup[1], + tup[2], + tup[3], + tup[4], + tup[5], + tup[6] // 1000, + datetime.timezone( + datetime.timedelta(hours=tup[7], minutes=tup[8]) + ), + ) + + odbc_SQL_SS_TIMESTAMPOFFSET = -155 # as defined in SQLNCLI.h + connection.add_output_converter( + odbc_SQL_SS_TIMESTAMPOFFSET, _handle_datetimeoffset + ) + + def do_executemany(self, cursor, statement, parameters, context=None): + if self.fast_executemany: + cursor.fast_executemany = True + super().do_executemany(cursor, statement, parameters, context=context) + + def is_disconnect(self, e, connection, cursor): + if isinstance(e, self.dbapi.Error): + code = e.args[0] + if code in { + "08S01", + "01000", + "01002", + "08003", + "08007", + "08S02", + "08001", + "HYT00", + "HY010", + "10054", + }: + return True + return super().is_disconnect(e, connection, cursor) + + +dialect = MSDialect_pyodbc |