author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400
committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400
commit | 6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
tree | b1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/base.py
parent | 4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/base.py')
-rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/base.py | 4007 |
1 file changed, 4007 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/base.py new file mode 100644 index 0000000..872f858 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/mssql/base.py @@ -0,0 +1,4007 @@ +# dialects/mssql/base.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + +""" +.. dialect:: mssql + :name: Microsoft SQL Server + :full_support: 2017 + :normal_support: 2012+ + :best_effort: 2005+ + +.. _mssql_external_dialects: + +External Dialects +----------------- + +In addition to the above DBAPI layers with native SQLAlchemy support, there +are third-party dialects for other DBAPI layers that are compatible +with SQL Server. See the "External Dialects" list on the +:ref:`dialect_toplevel` page. + +.. _mssql_identity: + +Auto Increment Behavior / IDENTITY Columns +------------------------------------------ + +SQL Server provides so-called "auto incrementing" behavior using the +``IDENTITY`` construct, which can be placed on any single integer column in a +table. SQLAlchemy considers ``IDENTITY`` within its default "autoincrement" +behavior for an integer primary key column, described at +:paramref:`_schema.Column.autoincrement`. This means that by default, +the first integer primary key column in a :class:`_schema.Table` will be +considered to be the identity column - unless it is associated with a +:class:`.Sequence` - and will generate DDL as such:: + + from sqlalchemy import Table, MetaData, Column, Integer + + m = MetaData() + t = Table('t', m, + Column('id', Integer, primary_key=True), + Column('x', Integer)) + m.create_all(engine) + +The above example will generate DDL as: + +.. sourcecode:: sql + + CREATE TABLE t ( + id INTEGER NOT NULL IDENTITY, + x INTEGER NULL, + PRIMARY KEY (id) + ) + +For the case where this default generation of ``IDENTITY`` is not desired, +specify ``False`` for the :paramref:`_schema.Column.autoincrement` flag, +on the first integer primary key column:: + + m = MetaData() + t = Table('t', m, + Column('id', Integer, primary_key=True, autoincrement=False), + Column('x', Integer)) + m.create_all(engine) + +To add the ``IDENTITY`` keyword to a non-primary key column, specify +``True`` for the :paramref:`_schema.Column.autoincrement` flag on the desired +:class:`_schema.Column` object, and ensure that +:paramref:`_schema.Column.autoincrement` +is set to ``False`` on any integer primary key column:: + + m = MetaData() + t = Table('t', m, + Column('id', Integer, primary_key=True, autoincrement=False), + Column('x', Integer, autoincrement=True)) + m.create_all(engine) + +.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct + in a :class:`_schema.Column` to specify the start and increment + parameters of an IDENTITY. These replace + the use of the :class:`.Sequence` object in order to specify these values. + +.. deprecated:: 1.4 + + The ``mssql_identity_start`` and ``mssql_identity_increment`` parameters + to :class:`_schema.Column` are deprecated and should be replaced by + an :class:`_schema.Identity` object. Specifying both ways of configuring + an IDENTITY will result in a compile error. + These options are also no longer returned as part of the + ``dialect_options`` key in :meth:`_reflection.Inspector.get_columns`.
+ Use the information in the ``identity`` key instead. + +.. deprecated:: 1.3 + + The use of :class:`.Sequence` to specify IDENTITY characteristics is + deprecated and will be removed in a future release. Please use + the :class:`_schema.Identity` object parameters + :paramref:`_schema.Identity.start` and + :paramref:`_schema.Identity.increment`. + +.. versionchanged:: 1.4 Removed the ability to use a :class:`.Sequence` + object to modify IDENTITY characteristics. :class:`.Sequence` objects + now only manipulate true T-SQL SEQUENCE types. + +.. note:: + + There can only be one IDENTITY column on the table. When using + ``autoincrement=True`` to enable the IDENTITY keyword, SQLAlchemy does not + guard against multiple columns specifying the option simultaneously. The + SQL Server database will instead reject the ``CREATE TABLE`` statement. + +.. note:: + + An INSERT statement which attempts to provide a value for a column that is + marked with IDENTITY will be rejected by SQL Server. In order for the + value to be accepted, a session-level option "SET IDENTITY_INSERT" must be + enabled. The SQLAlchemy SQL Server dialect will perform this operation + automatically when using a core :class:`_expression.Insert` + construct; if the + execution specifies a value for the IDENTITY column, the "IDENTITY_INSERT" + option will be enabled for the span of that statement's invocation. However, + this scenario is not high performing and should not be relied upon for + normal use. If a table doesn't actually require IDENTITY behavior in its + integer primary key column, the keyword should be disabled when creating + the table by ensuring that ``autoincrement=False`` is set. + +Controlling "Start" and "Increment" +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Specific control over the "start" and "increment" values for +the ``IDENTITY`` generator is provided using the +:paramref:`_schema.Identity.start` and :paramref:`_schema.Identity.increment` +parameters passed to the :class:`_schema.Identity` object:: + + from sqlalchemy import Table, Integer, Column, Identity, String + + test = Table( + 'test', metadata, + Column( + 'id', + Integer, + Identity(start=100, increment=10), + primary_key=True + ), + Column('name', String(20)) + ) + +The CREATE TABLE for the above :class:`_schema.Table` object would be: + +.. sourcecode:: sql + + CREATE TABLE test ( + id INTEGER NOT NULL IDENTITY(100,10) PRIMARY KEY, + name VARCHAR(20) NULL + ) + +.. note:: + + The :class:`_schema.Identity` object supports many other parameters in + addition to ``start`` and ``increment``. These are not supported by + SQL Server and will be ignored when generating the CREATE TABLE ddl. + +.. versionchanged:: 1.3.19 The :class:`_schema.Identity` object is + now used to affect the + ``IDENTITY`` generator for a :class:`_schema.Column` under SQL Server. + Previously, the :class:`.Sequence` object was used. As SQL Server now + supports real sequences as a separate construct, :class:`.Sequence` will be + functional in the normal way starting from SQLAlchemy version 1.4. + + +Using IDENTITY with Non-Integer numeric types +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +SQL Server also allows ``IDENTITY`` to be used with ``NUMERIC`` columns.
To +implement this pattern smoothly in SQLAlchemy, the primary datatype of the +column should remain as ``Integer``, however the underlying implementation +type deployed to the SQL Server database can be specified as ``Numeric`` using +:meth:`.TypeEngine.with_variant`:: + + from sqlalchemy import Column + from sqlalchemy import Integer + from sqlalchemy import Numeric + from sqlalchemy import String + from sqlalchemy.ext.declarative import declarative_base + + Base = declarative_base() + + class TestTable(Base): + __tablename__ = "test" + id = Column( + Integer().with_variant(Numeric(10, 0), "mssql"), + primary_key=True, + autoincrement=True, + ) + name = Column(String) + +In the above example, ``Integer().with_variant()`` provides clear usage +information that accurately describes the intent of the code. The general +restriction that ``autoincrement`` only applies to ``Integer`` is established +at the metadata level and not at the per-dialect level. + +When using the above pattern, the primary key identifier that comes back from +the insertion of a row, which is also the value that would be assigned to an +ORM object such as ``TestTable`` above, will be an instance of ``Decimal()`` +and not ``int`` when using SQL Server. The numeric return type of the +:class:`_types.Numeric` type can be changed to return floats by passing False +to :paramref:`_types.Numeric.asdecimal`. To normalize the return type of the +above ``Numeric(10, 0)`` to return Python ints (which also support "long" +integer values in Python 3), use :class:`_types.TypeDecorator` as follows:: + + from sqlalchemy import TypeDecorator + + class NumericAsInteger(TypeDecorator): + '''normalize floating point return values into ints''' + + impl = Numeric(10, 0, asdecimal=False) + cache_ok = True + + def process_result_value(self, value, dialect): + if value is not None: + value = int(value) + return value + + class TestTable(Base): + __tablename__ = "test" + id = Column( + Integer().with_variant(NumericAsInteger, "mssql"), + primary_key=True, + autoincrement=True, + ) + name = Column(String) + +.. _mssql_insert_behavior: + +INSERT behavior +^^^^^^^^^^^^^^^^ + +Handling of the ``IDENTITY`` column at INSERT time involves two key +techniques. The most common is being able to fetch the "last inserted value" +for a given ``IDENTITY`` column, a process which SQLAlchemy performs +implicitly in many cases, most importantly within the ORM. + +The process for fetching this value has several variants: + +* In the vast majority of cases, RETURNING is used in conjunction with INSERT + statements on SQL Server in order to get newly generated primary key values: + + .. sourcecode:: sql + + INSERT INTO t (x) OUTPUT inserted.id VALUES (?) + + As of SQLAlchemy 2.0, the :ref:`engine_insertmanyvalues` feature is also + used by default to optimize many-row INSERT statements; for SQL Server + the feature takes place for both RETURNING and-non RETURNING + INSERT statements. + + .. versionchanged:: 2.0.10 The :ref:`engine_insertmanyvalues` feature for + SQL Server was temporarily disabled for SQLAlchemy version 2.0.9 due to + issues with row ordering. As of 2.0.10 the feature is re-enabled, with + special case handling for the unit of work's requirement for RETURNING to + be ordered. 
+ +* When RETURNING is not available or has been disabled via + ``implicit_returning=False``, either the ``scope_identity()`` function or + the ``@@identity`` variable is used; behavior varies by backend: + + * when using PyODBC, the phrase ``; select scope_identity()`` will be + appended to the end of the INSERT statement; a second result set will be + fetched in order to receive the value. Given a table as:: + + t = Table( + 't', + metadata, + Column('id', Integer, primary_key=True), + Column('x', Integer), + implicit_returning=False + ) + + an INSERT will look like: + + .. sourcecode:: sql + + INSERT INTO t (x) VALUES (?); select scope_identity() + + * Other dialects such as pymssql will call upon + ``SELECT scope_identity() AS lastrowid`` subsequent to an INSERT + statement. If the flag ``use_scope_identity=False`` is passed to + :func:`_sa.create_engine`, + the statement ``SELECT @@identity AS lastrowid`` + is used instead. + +A table that contains an ``IDENTITY`` column will prohibit an INSERT statement +that refers to the identity column explicitly. The SQLAlchemy dialect will +detect when an INSERT construct, created using a core +:func:`_expression.insert` +construct (not a plain string SQL), refers to the identity column, and +in this case will emit ``SET IDENTITY_INSERT ON`` prior to the insert +statement proceeding, and ``SET IDENTITY_INSERT OFF`` subsequent to the +execution. Given this example:: + + m = MetaData() + t = Table('t', m, Column('id', Integer, primary_key=True), + Column('x', Integer)) + m.create_all(engine) + + with engine.begin() as conn: + conn.execute(t.insert(), {'id': 1, 'x':1}, {'id':2, 'x':2}) + +The above column will be created with IDENTITY, however the INSERT statement +we emit is specifying explicit values. In the echo output we can see +how SQLAlchemy handles this: + +.. sourcecode:: sql + + CREATE TABLE t ( + id INTEGER NOT NULL IDENTITY(1,1), + x INTEGER NULL, + PRIMARY KEY (id) + ) + + COMMIT + SET IDENTITY_INSERT t ON + INSERT INTO t (id, x) VALUES (?, ?) + ((1, 1), (2, 2)) + SET IDENTITY_INSERT t OFF + COMMIT + + + +This is an auxiliary use case suitable for testing and bulk insert scenarios. + +SEQUENCE support +---------------- + +The :class:`.Sequence` object creates "real" sequences, i.e., +``CREATE SEQUENCE``: + +.. sourcecode:: pycon+sql + + >>> from sqlalchemy import Sequence + >>> from sqlalchemy.schema import CreateSequence + >>> from sqlalchemy.dialects import mssql + >>> print(CreateSequence(Sequence("my_seq", start=1)).compile(dialect=mssql.dialect())) + {printsql}CREATE SEQUENCE my_seq START WITH 1 + +For integer primary key generation, SQL Server's ``IDENTITY`` construct should +generally be preferred vs. sequence. + +.. tip:: + + The default start value for T-SQL is ``-2**63`` instead of 1 as + in most other SQL databases. Users should explicitly set the + :paramref:`.Sequence.start` to 1 if that's the expected default:: + + seq = Sequence("my_sequence", start=1) + +.. versionadded:: 1.4 added SQL Server support for :class:`.Sequence` + +.. versionchanged:: 2.0 The SQL Server dialect will no longer implicitly + render "START WITH 1" for ``CREATE SEQUENCE``, which was the behavior + first implemented in version 1.4. + +MAX on VARCHAR / NVARCHAR +------------------------- + +SQL Server supports the special string "MAX" within the +:class:`_types.VARCHAR` and :class:`_types.NVARCHAR` datatypes, +to indicate "maximum length possible". 
The dialect currently handles this as +a length of "None" in the base type, rather than supplying a +dialect-specific version of these types, so that a base type +specified such as ``VARCHAR(None)`` can assume "unlengthed" behavior on +more than one backend without using dialect-specific types. + +To build a SQL Server VARCHAR or NVARCHAR with MAX length, use None:: + + my_table = Table( + 'my_table', metadata, + Column('my_data', VARCHAR(None)), + Column('my_n_data', NVARCHAR(None)) + ) + + +Collation Support +----------------- + +Character collations are supported by the base string types, +specified by the string argument "collation":: + + from sqlalchemy import VARCHAR + Column('login', VARCHAR(32, collation='Latin1_General_CI_AS')) + +When such a column is associated with a :class:`_schema.Table`, the +CREATE TABLE statement for this column will yield:: + + login VARCHAR(32) COLLATE Latin1_General_CI_AS NULL + +LIMIT/OFFSET Support +-------------------- + +MSSQL has added support for LIMIT / OFFSET as of SQL Server 2012, via the +"OFFSET n ROWS" and "FETCH NEXT n ROWS" clauses. SQLAlchemy supports these +syntaxes automatically if SQL Server 2012 or greater is detected. + +.. versionchanged:: 1.4 support added for SQL Server "OFFSET n ROWS" and + "FETCH NEXT n ROWS" syntax. + +For statements that specify only LIMIT and no OFFSET, all versions of SQL +Server support the TOP keyword. This syntax is used for all SQL Server +versions when no OFFSET clause is present. A statement such as:: + + select(some_table).limit(5) + +will render similarly to:: + + SELECT TOP 5 col1, col2.. FROM table + +For versions of SQL Server prior to SQL Server 2012, a statement that uses +LIMIT and OFFSET, or just OFFSET alone, will be rendered using the +``ROW_NUMBER()`` window function. A statement such as:: + + select(some_table).order_by(some_table.c.col3).limit(5).offset(10) + +will render similarly to:: + + SELECT anon_1.col1, anon_1.col2 FROM (SELECT col1, col2, + ROW_NUMBER() OVER (ORDER BY col3) AS + mssql_rn FROM table WHERE t.x = :x_1) AS + anon_1 WHERE mssql_rn > :param_1 AND mssql_rn <= :param_2 + :param_1 + +Note that when using LIMIT and/or OFFSET, whether using the older +or newer SQL Server syntaxes, the statement must have an ORDER BY as well, +else a :class:`.CompileError` is raised. + +.. _mssql_comment_support: + +DDL Comment Support +-------------------- + +Comment support, which includes DDL rendering for attributes such as +:paramref:`_schema.Table.comment` and :paramref:`_schema.Column.comment`, as +well as the ability to reflect these comments, is supported assuming a +supported version of SQL Server is in use. If a non-supported version such as +Azure Synapse is detected at first-connect time (based on the presence +of the ``fn_listextendedproperty`` SQL function), comment support including +rendering and table-comment reflection is disabled, as both features rely upon +SQL Server stored procedures and functions that are not available on all +backend types. + +To force comment support to be on or off, bypassing autodetection, set the +parameter ``supports_comments`` within :func:`_sa.create_engine`:: + + e = create_engine("mssql+pyodbc://u:p@dsn", supports_comments=False) + +.. versionadded:: 2.0 Added support for table and column comments for + the SQL Server dialect, including DDL generation and reflection. + +.. 
_mssql_isolation_level: + +Transaction Isolation Level +--------------------------- + +All SQL Server dialects support setting of transaction isolation level +both via a dialect-specific parameter +:paramref:`_sa.create_engine.isolation_level` +accepted by :func:`_sa.create_engine`, +as well as the :paramref:`.Connection.execution_options.isolation_level` +argument as passed to +:meth:`_engine.Connection.execution_options`. +This feature works by issuing the +command ``SET TRANSACTION ISOLATION LEVEL <level>`` for +each new connection. + +To set isolation level using :func:`_sa.create_engine`:: + + engine = create_engine( + "mssql+pyodbc://scott:tiger@ms_2008", + isolation_level="REPEATABLE READ" + ) + +To set using per-connection execution options:: + + connection = engine.connect() + connection = connection.execution_options( + isolation_level="READ COMMITTED" + ) + +Valid values for ``isolation_level`` include: + +* ``AUTOCOMMIT`` - pyodbc / pymssql-specific +* ``READ COMMITTED`` +* ``READ UNCOMMITTED`` +* ``REPEATABLE READ`` +* ``SERIALIZABLE`` +* ``SNAPSHOT`` - specific to SQL Server + +There are also more options for isolation level configurations, such as +"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply +different isolation level settings. See the discussion at +:ref:`dbapi_autocommit` for background. + +.. seealso:: + + :ref:`dbapi_autocommit` + +.. _mssql_reset_on_return: + +Temporary Table / Resource Reset for Connection Pooling +------------------------------------------------------- + +The :class:`.QueuePool` connection pool implementation used +by the SQLAlchemy :class:`.Engine` object includes +:ref:`reset on return <pool_reset_on_return>` behavior that will invoke +the DBAPI ``.rollback()`` method when connections are returned to the pool. +While this rollback will clear out the immediate state used by the previous +transaction, it does not cover a wider range of session-level state, including +temporary tables as well as other server state such as prepared statement +handles and statement caches. An undocumented SQL Server procedure known +as ``sp_reset_connection`` is known to be a workaround for this issue which +will reset most of the session state that builds up on a connection, including +temporary tables. + +To install ``sp_reset_connection`` as the means of performing reset-on-return, +the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated in the +example below. The :paramref:`_sa.create_engine.pool_reset_on_return` parameter +is set to ``None`` so that the custom scheme can replace the default behavior +completely. The custom hook implementation calls ``.rollback()`` in any case, +as it's usually important that the DBAPI's own tracking of commit/rollback +will remain consistent with the state of the transaction:: + + from sqlalchemy import create_engine + from sqlalchemy import event + + mssql_engine = create_engine( + "mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server", + + # disable default reset-on-return scheme + pool_reset_on_return=None, + ) + + + @event.listens_for(mssql_engine, "reset") + def _reset_mssql(dbapi_connection, connection_record, reset_state): + if not reset_state.terminate_only: + dbapi_connection.execute("{call sys.sp_reset_connection}") + + # so that the DBAPI itself knows that the connection has been + # reset + dbapi_connection.rollback() + +.. 
versionchanged:: 2.0.0b3 Added additional state arguments to + the :meth:`.PoolEvents.reset` event and additionally ensured the event + is invoked for all "reset" occurrences, so that it's appropriate + as a place for custom "reset" handlers. Previous schemes which + use the :meth:`.PoolEvents.checkin` handler remain usable as well. + +.. seealso:: + + :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation + +Nullability +----------- +MSSQL has support for three levels of column nullability. The default +nullability allows nulls and is explicit in the CREATE TABLE +construct:: + + name VARCHAR(20) NULL + +If ``nullable=None`` is specified then no specification is made. In +other words the database's configured default is used. This will +render:: + + name VARCHAR(20) + +If ``nullable`` is ``True`` or ``False`` then the column will be +``NULL`` or ``NOT NULL`` respectively. + +Date / Time Handling +-------------------- +DATE and TIME are supported. Bind parameters are converted +to datetime.datetime() objects as required by most MSSQL drivers, +and results are processed from strings if needed. +The DATE and TIME types are not available for MSSQL 2005 and +previous - if a server version below 2008 is detected, DDL +for these types will be issued as DATETIME. + +.. _mssql_large_type_deprecation: + +Large Text/Binary Type Deprecation +---------------------------------- + +Per +`SQL Server 2012/2014 Documentation <https://technet.microsoft.com/en-us/library/ms187993.aspx>`_, +the ``NTEXT``, ``TEXT`` and ``IMAGE`` datatypes are to be removed from SQL +Server in a future release. SQLAlchemy normally relates these types to the +:class:`.UnicodeText`, :class:`_expression.TextClause` and +:class:`.LargeBinary` datatypes. + +In order to accommodate this change, a new flag ``deprecate_large_types`` +is added to the dialect, which will be automatically set based on detection +of the server version in use, if not otherwise set by the user. The +behavior of this flag is as follows: + +* When this flag is ``True``, the :class:`.UnicodeText`, + :class:`_expression.TextClause` and + :class:`.LargeBinary` datatypes, when used to render DDL, will render the + types ``NVARCHAR(max)``, ``VARCHAR(max)``, and ``VARBINARY(max)``, + respectively. This is a new behavior as of the addition of this flag. + +* When this flag is ``False``, the :class:`.UnicodeText`, + :class:`_expression.TextClause` and + :class:`.LargeBinary` datatypes, when used to render DDL, will render the + types ``NTEXT``, ``TEXT``, and ``IMAGE``, + respectively. This is the long-standing behavior of these types. + +* The flag begins with the value ``None``, before a database connection is + established. If the dialect is used to render DDL without the flag being + set, it is interpreted the same as ``False``. + +* On first connection, the dialect detects if SQL Server version 2012 or + greater is in use; if the flag is still at ``None``, it sets it to ``True`` + or ``False`` based on whether 2012 or greater is detected. 
+ +* The flag can be set to either ``True`` or ``False`` when the dialect + is created, typically via :func:`_sa.create_engine`:: + + eng = create_engine("mssql+pymssql://user:pass@host/db", + deprecate_large_types=True) + +* Complete control over whether the "old" or "new" types are rendered is + available in all SQLAlchemy versions by using the UPPERCASE type objects + instead: :class:`_types.NVARCHAR`, :class:`_types.VARCHAR`, + :class:`_types.VARBINARY`, :class:`_types.TEXT`, :class:`_mssql.NTEXT`, + :class:`_mssql.IMAGE` + will always remain fixed and always output exactly that + type. + +.. _multipart_schema_names: + +Multipart Schema Names +---------------------- + +SQL Server schemas sometimes require multiple parts to their "schema" +qualifier, that is, including the database name and owner name as separate +tokens, such as ``mydatabase.dbo.some_table``. These multipart names can be set +at once using the :paramref:`_schema.Table.schema` argument of +:class:`_schema.Table`:: + + Table( + "some_table", metadata, + Column("q", String(50)), + schema="mydatabase.dbo" + ) + +When performing operations such as table or component reflection, a schema +argument that contains a dot will be split into separate +"database" and "owner" components in order to correctly query the SQL +Server information schema tables, as these two values are stored separately. +Additionally, when rendering the schema name for DDL or SQL, the two +components will be quoted separately for case sensitive names and other +special characters. Given an argument as below:: + + Table( + "some_table", metadata, + Column("q", String(50)), + schema="MyDataBase.dbo" + ) + +The above schema would be rendered as ``[MyDataBase].dbo``, and also in +reflection, would be reflected using "dbo" as the owner and "MyDataBase" +as the database name. + +To control how the schema name is broken into database / owner, +specify brackets (which in SQL Server are quoting characters) in the name. +Below, the "owner" will be considered as ``MyDataBase.dbo`` and the +"database" will be None:: + + Table( + "some_table", metadata, + Column("q", String(50)), + schema="[MyDataBase.dbo]" + ) + +To individually specify both database and owner name with special characters +or embedded dots, use two sets of brackets:: + + Table( + "some_table", metadata, + Column("q", String(50)), + schema="[MyDataBase.Period].[MyOwner.Dot]" + ) + + +.. versionchanged:: 1.2 the SQL Server dialect now treats brackets as + identifier delimiters splitting the schema into separate database + and owner tokens, to allow dots within either name itself. + +.. _legacy_schema_rendering: + +Legacy Schema Mode +------------------ + +Very old versions of the MSSQL dialect introduced the behavior such that a +schema-qualified table would be auto-aliased when used in a +SELECT statement; given a table:: + + account_table = Table( + 'account', metadata, + Column('id', Integer, primary_key=True), + Column('info', String(100)), + schema="customer_schema" + ) + +this legacy mode of rendering would assume that "customer_schema.account" +would not be accepted by all parts of the SQL statement, as illustrated +below: + +.. 
sourcecode:: pycon+sql + + >>> eng = create_engine("mssql+pymssql://mydsn", legacy_schema_aliasing=True) + >>> print(account_table.select().compile(eng)) + {printsql}SELECT account_1.id, account_1.info + FROM customer_schema.account AS account_1 + +This mode of behavior is now off by default, as it appears to have served +no purpose; however in the case that legacy applications rely upon it, +it is available using the ``legacy_schema_aliasing`` argument to +:func:`_sa.create_engine` as illustrated above. + +.. deprecated:: 1.4 + + The ``legacy_schema_aliasing`` flag is now + deprecated and will be removed in a future release. + +.. _mssql_indexes: + +Clustered Index Support +----------------------- + +The MSSQL dialect supports clustered indexes (and primary keys) via the +``mssql_clustered`` option. This option is available to :class:`.Index`, +:class:`.UniqueConstraint`, and :class:`.PrimaryKeyConstraint`. +For indexes this option can be combined with the ``mssql_columnstore`` one +to create a clustered columnstore index. + +To generate a clustered index:: + + Index("my_index", table.c.x, mssql_clustered=True) + +which renders the index as ``CREATE CLUSTERED INDEX my_index ON table (x)``. + +To generate a clustered primary key use:: + + Table('my_table', metadata, + Column('x', ...), + Column('y', ...), + PrimaryKeyConstraint("x", "y", mssql_clustered=True)) + +which will render the table, for example, as:: + + CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL, + PRIMARY KEY CLUSTERED (x, y)) + +Similarly, we can generate a clustered unique constraint using:: + + Table('my_table', metadata, + Column('x', ...), + Column('y', ...), + PrimaryKeyConstraint("x"), + UniqueConstraint("y", mssql_clustered=True), + ) + +To explicitly request a non-clustered primary key (for example, when +a separate clustered index is desired), use:: + + Table('my_table', metadata, + Column('x', ...), + Column('y', ...), + PrimaryKeyConstraint("x", "y", mssql_clustered=False)) + +which will render the table, for example, as:: + + CREATE TABLE my_table (x INTEGER NOT NULL, y INTEGER NOT NULL, + PRIMARY KEY NONCLUSTERED (x, y)) + +Columnstore Index Support +------------------------- + +The MSSQL dialect supports columnstore indexes via the ``mssql_columnstore`` +option. This option is available to :class:`.Index`. It can be combined with +the ``mssql_clustered`` option to create a clustered columnstore index. + +To generate a columnstore index:: + + Index("my_index", table.c.x, mssql_columnstore=True) + +which renders the index as ``CREATE COLUMNSTORE INDEX my_index ON table (x)``. + +To generate a clustered columnstore index, provide no columns:: + + idx = Index("my_index", mssql_clustered=True, mssql_columnstore=True) + # required to associate the index with the table + table.append_constraint(idx) + +the above renders the index as +``CREATE CLUSTERED COLUMNSTORE INDEX my_index ON table``. + +.. versionadded:: 2.0.18 + +MSSQL-Specific Index Options +----------------------------- + +In addition to clustering, the MSSQL dialect supports other special options +for :class:`.Index`. + +INCLUDE +^^^^^^^ + +The ``mssql_include`` option renders INCLUDE(colname) for the given string +names:: + + Index("my_index", table.c.x, mssql_include=['y']) + +would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)`` + +..
_mssql_index_where: + +Filtered Indexes +^^^^^^^^^^^^^^^^ + +The ``mssql_where`` option renders WHERE(condition) for the given string +names:: + + Index("my_index", table.c.x, mssql_where=table.c.x > 10) + +would render the index as ``CREATE INDEX my_index ON table (x) WHERE x > 10``. + +.. versionadded:: 1.3.4 + +Index ordering +^^^^^^^^^^^^^^ + +Index ordering is available via functional expressions, such as:: + + Index("my_index", table.c.x.desc()) + +would render the index as ``CREATE INDEX my_index ON table (x DESC)`` + +.. seealso:: + + :ref:`schema_indexes_functional` + +Compatibility Levels +-------------------- +MSSQL supports the notion of setting compatibility levels at the +database level. This allows, for instance, to run a database that +is compatible with SQL2000 while running on a SQL2005 database +server. ``server_version_info`` will always return the database +server version information (in this case SQL2005) and not the +compatibility level information. Because of this, if running under +a backwards compatibility mode SQLAlchemy may attempt to use T-SQL +statements that are unable to be parsed by the database server. + +.. _mssql_triggers: + +Triggers +-------- + +SQLAlchemy by default uses OUTPUT INSERTED to get at newly +generated primary key values via IDENTITY columns or other +server side defaults. MS-SQL does not +allow the usage of OUTPUT INSERTED on tables that have triggers. +To disable the usage of OUTPUT INSERTED on a per-table basis, +specify ``implicit_returning=False`` for each :class:`_schema.Table` +which has triggers:: + + Table('mytable', metadata, + Column('id', Integer, primary_key=True), + # ..., + implicit_returning=False + ) + +Declarative form:: + + class MyClass(Base): + # ... + __table_args__ = {'implicit_returning':False} + + +.. _mssql_rowcount_versioning: + +Rowcount Support / ORM Versioning +--------------------------------- + +The SQL Server drivers may have limited ability to return the number +of rows updated from an UPDATE or DELETE statement. + +As of this writing, the PyODBC driver is not able to return a rowcount when +OUTPUT INSERTED is used. Previous versions of SQLAlchemy therefore had +limitations for features such as the "ORM Versioning" feature that relies upon +accurate rowcounts in order to match version numbers with matched rows. + +SQLAlchemy 2.0 now retrieves the "rowcount" manually for these particular use +cases based on counting the rows that arrived back within RETURNING; so while +the driver still has this limitation, the ORM Versioning feature is no longer +impacted by it. As of SQLAlchemy 2.0.5, ORM versioning has been fully +re-enabled for the pyodbc driver. + +.. versionchanged:: 2.0.5 ORM versioning support is restored for the pyodbc + driver. Previously, a warning would be emitted during ORM flush that + versioning was not supported. + + +Enabling Snapshot Isolation +--------------------------- + +SQL Server has a default transaction +isolation mode that locks entire tables, and causes even mildly concurrent +applications to have long held locks and frequent deadlocks. +Enabling snapshot isolation for the database as a whole is recommended +for modern levels of concurrency support. This is accomplished via the +following ALTER DATABASE commands executed at the SQL prompt:: + + ALTER DATABASE MyDatabase SET ALLOW_SNAPSHOT_ISOLATION ON + + ALTER DATABASE MyDatabase SET READ_COMMITTED_SNAPSHOT ON + +Background on SQL Server snapshot isolation is available at +https://msdn.microsoft.com/en-us/library/ms175095.aspx. 
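Once snapshot isolation has been enabled on the database, an :class:`_engine.Engine` or individual :class:`_engine.Connection` can opt in to it through the ``isolation_level`` setting described at :ref:`mssql_isolation_level` above. The following is a minimal sketch only, reusing the example connection URL from that section; the ``some_table`` name is a placeholder::

    from sqlalchemy import create_engine, text

    # assumes ALLOW_SNAPSHOT_ISOLATION / READ_COMMITTED_SNAPSHOT have
    # already been turned on for the database as shown above
    engine = create_engine(
        "mssql+pyodbc://scott:tiger@ms_2008",
        isolation_level="SNAPSHOT",
    )

    with engine.connect() as conn:
        # reads within this transaction see a consistent snapshot of the
        # data and do not block behind concurrent writers
        count = conn.execute(text("SELECT COUNT(*) FROM some_table")).scalar()
        print(count)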
+ +""" # noqa + +from __future__ import annotations + +import codecs +import datetime +import operator +import re +from typing import overload +from typing import TYPE_CHECKING +from uuid import UUID as _python_UUID + +from . import information_schema as ischema +from .json import JSON +from .json import JSONIndexType +from .json import JSONPathType +from ... import exc +from ... import Identity +from ... import schema as sa_schema +from ... import Sequence +from ... import sql +from ... import text +from ... import util +from ...engine import cursor as _cursor +from ...engine import default +from ...engine import reflection +from ...engine.reflection import ReflectionDefaults +from ...sql import coercions +from ...sql import compiler +from ...sql import elements +from ...sql import expression +from ...sql import func +from ...sql import quoted_name +from ...sql import roles +from ...sql import sqltypes +from ...sql import try_cast as try_cast # noqa: F401 +from ...sql import util as sql_util +from ...sql._typing import is_sql_compiler +from ...sql.compiler import InsertmanyvaluesSentinelOpts +from ...sql.elements import TryCast as TryCast # noqa: F401 +from ...types import BIGINT +from ...types import BINARY +from ...types import CHAR +from ...types import DATE +from ...types import DATETIME +from ...types import DECIMAL +from ...types import FLOAT +from ...types import INTEGER +from ...types import NCHAR +from ...types import NUMERIC +from ...types import NVARCHAR +from ...types import SMALLINT +from ...types import TEXT +from ...types import VARCHAR +from ...util import update_wrapper +from ...util.typing import Literal + +if TYPE_CHECKING: + from ...sql.dml import DMLState + from ...sql.selectable import TableClause + +# https://sqlserverbuilds.blogspot.com/ +MS_2017_VERSION = (14,) +MS_2016_VERSION = (13,) +MS_2014_VERSION = (12,) +MS_2012_VERSION = (11,) +MS_2008_VERSION = (10,) +MS_2005_VERSION = (9,) +MS_2000_VERSION = (8,) + +RESERVED_WORDS = { + "add", + "all", + "alter", + "and", + "any", + "as", + "asc", + "authorization", + "backup", + "begin", + "between", + "break", + "browse", + "bulk", + "by", + "cascade", + "case", + "check", + "checkpoint", + "close", + "clustered", + "coalesce", + "collate", + "column", + "commit", + "compute", + "constraint", + "contains", + "containstable", + "continue", + "convert", + "create", + "cross", + "current", + "current_date", + "current_time", + "current_timestamp", + "current_user", + "cursor", + "database", + "dbcc", + "deallocate", + "declare", + "default", + "delete", + "deny", + "desc", + "disk", + "distinct", + "distributed", + "double", + "drop", + "dump", + "else", + "end", + "errlvl", + "escape", + "except", + "exec", + "execute", + "exists", + "exit", + "external", + "fetch", + "file", + "fillfactor", + "for", + "foreign", + "freetext", + "freetexttable", + "from", + "full", + "function", + "goto", + "grant", + "group", + "having", + "holdlock", + "identity", + "identity_insert", + "identitycol", + "if", + "in", + "index", + "inner", + "insert", + "intersect", + "into", + "is", + "join", + "key", + "kill", + "left", + "like", + "lineno", + "load", + "merge", + "national", + "nocheck", + "nonclustered", + "not", + "null", + "nullif", + "of", + "off", + "offsets", + "on", + "open", + "opendatasource", + "openquery", + "openrowset", + "openxml", + "option", + "or", + "order", + "outer", + "over", + "percent", + "pivot", + "plan", + "precision", + "primary", + "print", + "proc", + "procedure", + "public", + "raiserror", + "read", + 
"readtext", + "reconfigure", + "references", + "replication", + "restore", + "restrict", + "return", + "revert", + "revoke", + "right", + "rollback", + "rowcount", + "rowguidcol", + "rule", + "save", + "schema", + "securityaudit", + "select", + "session_user", + "set", + "setuser", + "shutdown", + "some", + "statistics", + "system_user", + "table", + "tablesample", + "textsize", + "then", + "to", + "top", + "tran", + "transaction", + "trigger", + "truncate", + "tsequal", + "union", + "unique", + "unpivot", + "update", + "updatetext", + "use", + "user", + "values", + "varying", + "view", + "waitfor", + "when", + "where", + "while", + "with", + "writetext", +} + + +class REAL(sqltypes.REAL): + """the SQL Server REAL datatype.""" + + def __init__(self, **kw): + # REAL is a synonym for FLOAT(24) on SQL server. + # it is only accepted as the word "REAL" in DDL, the numeric + # precision value is not allowed to be present + kw.setdefault("precision", 24) + super().__init__(**kw) + + +class DOUBLE_PRECISION(sqltypes.DOUBLE_PRECISION): + """the SQL Server DOUBLE PRECISION datatype. + + .. versionadded:: 2.0.11 + + """ + + def __init__(self, **kw): + # DOUBLE PRECISION is a synonym for FLOAT(53) on SQL server. + # it is only accepted as the word "DOUBLE PRECISION" in DDL, + # the numeric precision value is not allowed to be present + kw.setdefault("precision", 53) + super().__init__(**kw) + + +class TINYINT(sqltypes.Integer): + __visit_name__ = "TINYINT" + + +# MSSQL DATE/TIME types have varied behavior, sometimes returning +# strings. MSDate/TIME check for everything, and always +# filter bind parameters into datetime objects (required by pyodbc, +# not sure about other dialects). + + +class _MSDate(sqltypes.Date): + def bind_processor(self, dialect): + def process(value): + if type(value) == datetime.date: + return datetime.datetime(value.year, value.month, value.day) + else: + return value + + return process + + _reg = re.compile(r"(\d+)-(\d+)-(\d+)") + + def result_processor(self, dialect, coltype): + def process(value): + if isinstance(value, datetime.datetime): + return value.date() + elif isinstance(value, str): + m = self._reg.match(value) + if not m: + raise ValueError( + "could not parse %r as a date value" % (value,) + ) + return datetime.date(*[int(x or 0) for x in m.groups()]) + else: + return value + + return process + + +class TIME(sqltypes.TIME): + def __init__(self, precision=None, **kwargs): + self.precision = precision + super().__init__() + + __zero_date = datetime.date(1900, 1, 1) + + def bind_processor(self, dialect): + def process(value): + if isinstance(value, datetime.datetime): + value = datetime.datetime.combine( + self.__zero_date, value.time() + ) + elif isinstance(value, datetime.time): + """issue #5339 + per: https://github.com/mkleehammer/pyodbc/wiki/Tips-and-Tricks-by-Database-Platform#time-columns + pass TIME value as string + """ # noqa + value = str(value) + return value + + return process + + _reg = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d{0,6}))?") + + def result_processor(self, dialect, coltype): + def process(value): + if isinstance(value, datetime.datetime): + return value.time() + elif isinstance(value, str): + m = self._reg.match(value) + if not m: + raise ValueError( + "could not parse %r as a time value" % (value,) + ) + return datetime.time(*[int(x or 0) for x in m.groups()]) + else: + return value + + return process + + +_MSTime = TIME + + +class _BASETIMEIMPL(TIME): + __visit_name__ = "_BASETIMEIMPL" + + +class _DateTimeBase: + def bind_processor(self, 
dialect): + def process(value): + if type(value) == datetime.date: + return datetime.datetime(value.year, value.month, value.day) + else: + return value + + return process + + +class _MSDateTime(_DateTimeBase, sqltypes.DateTime): + pass + + +class SMALLDATETIME(_DateTimeBase, sqltypes.DateTime): + __visit_name__ = "SMALLDATETIME" + + +class DATETIME2(_DateTimeBase, sqltypes.DateTime): + __visit_name__ = "DATETIME2" + + def __init__(self, precision=None, **kw): + super().__init__(**kw) + self.precision = precision + + +class DATETIMEOFFSET(_DateTimeBase, sqltypes.DateTime): + __visit_name__ = "DATETIMEOFFSET" + + def __init__(self, precision=None, **kw): + super().__init__(**kw) + self.precision = precision + + +class _UnicodeLiteral: + def literal_processor(self, dialect): + def process(value): + value = value.replace("'", "''") + + if dialect.identifier_preparer._double_percents: + value = value.replace("%", "%%") + + return "N'%s'" % value + + return process + + +class _MSUnicode(_UnicodeLiteral, sqltypes.Unicode): + pass + + +class _MSUnicodeText(_UnicodeLiteral, sqltypes.UnicodeText): + pass + + +class TIMESTAMP(sqltypes._Binary): + """Implement the SQL Server TIMESTAMP type. + + Note this is **completely different** than the SQL Standard + TIMESTAMP type, which is not supported by SQL Server. It + is a read-only datatype that does not support INSERT of values. + + .. versionadded:: 1.2 + + .. seealso:: + + :class:`_mssql.ROWVERSION` + + """ + + __visit_name__ = "TIMESTAMP" + + # expected by _Binary to be present + length = None + + def __init__(self, convert_int=False): + """Construct a TIMESTAMP or ROWVERSION type. + + :param convert_int: if True, binary integer values will + be converted to integers on read. + + .. versionadded:: 1.2 + + """ + self.convert_int = convert_int + + def result_processor(self, dialect, coltype): + super_ = super().result_processor(dialect, coltype) + if self.convert_int: + + def process(value): + if super_: + value = super_(value) + if value is not None: + # https://stackoverflow.com/a/30403242/34549 + value = int(codecs.encode(value, "hex"), 16) + return value + + return process + else: + return super_ + + +class ROWVERSION(TIMESTAMP): + """Implement the SQL Server ROWVERSION type. + + The ROWVERSION datatype is a SQL Server synonym for the TIMESTAMP + datatype, however current SQL Server documentation suggests using + ROWVERSION for new datatypes going forward. + + The ROWVERSION datatype does **not** reflect (e.g. introspect) from the + database as itself; the returned datatype will be + :class:`_mssql.TIMESTAMP`. + + This is a read-only datatype that does not support INSERT of values. + + .. versionadded:: 1.2 + + .. seealso:: + + :class:`_mssql.TIMESTAMP` + + """ + + __visit_name__ = "ROWVERSION" + + +class NTEXT(sqltypes.UnicodeText): + """MSSQL NTEXT type, for variable-length unicode text up to 2^30 + characters.""" + + __visit_name__ = "NTEXT" + + +class VARBINARY(sqltypes.VARBINARY, sqltypes.LargeBinary): + """The MSSQL VARBINARY type. + + This type adds additional features to the core :class:`_types.VARBINARY` + type, including "deprecate_large_types" mode where + either ``VARBINARY(max)`` or IMAGE is rendered, as well as the SQL + Server ``FILESTREAM`` option. + + .. seealso:: + + :ref:`mssql_large_type_deprecation` + + """ + + __visit_name__ = "VARBINARY" + + def __init__(self, length=None, filestream=False): + """ + Construct a VARBINARY type. 
+ + :param length: optional, a length for the column for use in + DDL statements, for those binary types that accept a length, + such as the MySQL BLOB type. + + :param filestream=False: if True, renders the ``FILESTREAM`` keyword + in the table definition. In this case ``length`` must be ``None`` + or ``'max'``. + + .. versionadded:: 1.4.31 + + """ + + self.filestream = filestream + if self.filestream and length not in (None, "max"): + raise ValueError( + "length must be None or 'max' when setting filestream" + ) + super().__init__(length=length) + + +class IMAGE(sqltypes.LargeBinary): + __visit_name__ = "IMAGE" + + +class XML(sqltypes.Text): + """MSSQL XML type. + + This is a placeholder type for reflection purposes that does not include + any Python-side datatype support. It also does not currently support + additional arguments, such as "CONTENT", "DOCUMENT", + "xml_schema_collection". + + """ + + __visit_name__ = "XML" + + +class BIT(sqltypes.Boolean): + """MSSQL BIT type. + + Both pyodbc and pymssql return values from BIT columns as + Python <class 'bool'> so just subclass Boolean. + + """ + + __visit_name__ = "BIT" + + +class MONEY(sqltypes.TypeEngine): + __visit_name__ = "MONEY" + + +class SMALLMONEY(sqltypes.TypeEngine): + __visit_name__ = "SMALLMONEY" + + +class MSUUid(sqltypes.Uuid): + def bind_processor(self, dialect): + if self.native_uuid: + # this is currently assuming pyodbc; might not work for + # some other mssql driver + return None + else: + if self.as_uuid: + + def process(value): + if value is not None: + value = value.hex + return value + + return process + else: + + def process(value): + if value is not None: + value = value.replace("-", "").replace("''", "'") + return value + + return process + + def literal_processor(self, dialect): + if self.native_uuid: + + def process(value): + return f"""'{str(value).replace("''", "'")}'""" + + return process + else: + if self.as_uuid: + + def process(value): + return f"""'{value.hex}'""" + + return process + else: + + def process(value): + return f"""'{ + value.replace("-", "").replace("'", "''") + }'""" + + return process + + +class UNIQUEIDENTIFIER(sqltypes.Uuid[sqltypes._UUID_RETURN]): + __visit_name__ = "UNIQUEIDENTIFIER" + + @overload + def __init__( + self: UNIQUEIDENTIFIER[_python_UUID], as_uuid: Literal[True] = ... + ): ... + + @overload + def __init__( + self: UNIQUEIDENTIFIER[str], as_uuid: Literal[False] = ... + ): ... + + def __init__(self, as_uuid: bool = True): + """Construct a :class:`_mssql.UNIQUEIDENTIFIER` type. + + + :param as_uuid=True: if True, values will be interpreted + as Python uuid objects, converting to/from string via the + DBAPI. + + .. versionchanged: 2.0 Added direct "uuid" support to the + :class:`_mssql.UNIQUEIDENTIFIER` datatype; uuid interpretation + defaults to ``True``. + + """ + self.as_uuid = as_uuid + self.native_uuid = True + + +class SQL_VARIANT(sqltypes.TypeEngine): + __visit_name__ = "SQL_VARIANT" + + +# old names. 
+MSDateTime = _MSDateTime +MSDate = _MSDate +MSReal = REAL +MSTinyInteger = TINYINT +MSTime = TIME +MSSmallDateTime = SMALLDATETIME +MSDateTime2 = DATETIME2 +MSDateTimeOffset = DATETIMEOFFSET +MSText = TEXT +MSNText = NTEXT +MSString = VARCHAR +MSNVarchar = NVARCHAR +MSChar = CHAR +MSNChar = NCHAR +MSBinary = BINARY +MSVarBinary = VARBINARY +MSImage = IMAGE +MSBit = BIT +MSMoney = MONEY +MSSmallMoney = SMALLMONEY +MSUniqueIdentifier = UNIQUEIDENTIFIER +MSVariant = SQL_VARIANT + +ischema_names = { + "int": INTEGER, + "bigint": BIGINT, + "smallint": SMALLINT, + "tinyint": TINYINT, + "varchar": VARCHAR, + "nvarchar": NVARCHAR, + "char": CHAR, + "nchar": NCHAR, + "text": TEXT, + "ntext": NTEXT, + "decimal": DECIMAL, + "numeric": NUMERIC, + "float": FLOAT, + "datetime": DATETIME, + "datetime2": DATETIME2, + "datetimeoffset": DATETIMEOFFSET, + "date": DATE, + "time": TIME, + "smalldatetime": SMALLDATETIME, + "binary": BINARY, + "varbinary": VARBINARY, + "bit": BIT, + "real": REAL, + "double precision": DOUBLE_PRECISION, + "image": IMAGE, + "xml": XML, + "timestamp": TIMESTAMP, + "money": MONEY, + "smallmoney": SMALLMONEY, + "uniqueidentifier": UNIQUEIDENTIFIER, + "sql_variant": SQL_VARIANT, +} + + +class MSTypeCompiler(compiler.GenericTypeCompiler): + def _extend(self, spec, type_, length=None): + """Extend a string-type declaration with standard SQL + COLLATE annotations. + + """ + + if getattr(type_, "collation", None): + collation = "COLLATE %s" % type_.collation + else: + collation = None + + if not length: + length = type_.length + + if length: + spec = spec + "(%s)" % length + + return " ".join([c for c in (spec, collation) if c is not None]) + + def visit_double(self, type_, **kw): + return self.visit_DOUBLE_PRECISION(type_, **kw) + + def visit_FLOAT(self, type_, **kw): + precision = getattr(type_, "precision", None) + if precision is None: + return "FLOAT" + else: + return "FLOAT(%(precision)s)" % {"precision": precision} + + def visit_TINYINT(self, type_, **kw): + return "TINYINT" + + def visit_TIME(self, type_, **kw): + precision = getattr(type_, "precision", None) + if precision is not None: + return "TIME(%s)" % precision + else: + return "TIME" + + def visit_TIMESTAMP(self, type_, **kw): + return "TIMESTAMP" + + def visit_ROWVERSION(self, type_, **kw): + return "ROWVERSION" + + def visit_datetime(self, type_, **kw): + if type_.timezone: + return self.visit_DATETIMEOFFSET(type_, **kw) + else: + return self.visit_DATETIME(type_, **kw) + + def visit_DATETIMEOFFSET(self, type_, **kw): + precision = getattr(type_, "precision", None) + if precision is not None: + return "DATETIMEOFFSET(%s)" % type_.precision + else: + return "DATETIMEOFFSET" + + def visit_DATETIME2(self, type_, **kw): + precision = getattr(type_, "precision", None) + if precision is not None: + return "DATETIME2(%s)" % precision + else: + return "DATETIME2" + + def visit_SMALLDATETIME(self, type_, **kw): + return "SMALLDATETIME" + + def visit_unicode(self, type_, **kw): + return self.visit_NVARCHAR(type_, **kw) + + def visit_text(self, type_, **kw): + if self.dialect.deprecate_large_types: + return self.visit_VARCHAR(type_, **kw) + else: + return self.visit_TEXT(type_, **kw) + + def visit_unicode_text(self, type_, **kw): + if self.dialect.deprecate_large_types: + return self.visit_NVARCHAR(type_, **kw) + else: + return self.visit_NTEXT(type_, **kw) + + def visit_NTEXT(self, type_, **kw): + return self._extend("NTEXT", type_) + + def visit_TEXT(self, type_, **kw): + return self._extend("TEXT", type_) + + def 
visit_VARCHAR(self, type_, **kw): + return self._extend("VARCHAR", type_, length=type_.length or "max") + + def visit_CHAR(self, type_, **kw): + return self._extend("CHAR", type_) + + def visit_NCHAR(self, type_, **kw): + return self._extend("NCHAR", type_) + + def visit_NVARCHAR(self, type_, **kw): + return self._extend("NVARCHAR", type_, length=type_.length or "max") + + def visit_date(self, type_, **kw): + if self.dialect.server_version_info < MS_2008_VERSION: + return self.visit_DATETIME(type_, **kw) + else: + return self.visit_DATE(type_, **kw) + + def visit__BASETIMEIMPL(self, type_, **kw): + return self.visit_time(type_, **kw) + + def visit_time(self, type_, **kw): + if self.dialect.server_version_info < MS_2008_VERSION: + return self.visit_DATETIME(type_, **kw) + else: + return self.visit_TIME(type_, **kw) + + def visit_large_binary(self, type_, **kw): + if self.dialect.deprecate_large_types: + return self.visit_VARBINARY(type_, **kw) + else: + return self.visit_IMAGE(type_, **kw) + + def visit_IMAGE(self, type_, **kw): + return "IMAGE" + + def visit_XML(self, type_, **kw): + return "XML" + + def visit_VARBINARY(self, type_, **kw): + text = self._extend("VARBINARY", type_, length=type_.length or "max") + if getattr(type_, "filestream", False): + text += " FILESTREAM" + return text + + def visit_boolean(self, type_, **kw): + return self.visit_BIT(type_) + + def visit_BIT(self, type_, **kw): + return "BIT" + + def visit_JSON(self, type_, **kw): + # this is a bit of a break with SQLAlchemy's convention of + # "UPPERCASE name goes to UPPERCASE type name with no modification" + return self._extend("NVARCHAR", type_, length="max") + + def visit_MONEY(self, type_, **kw): + return "MONEY" + + def visit_SMALLMONEY(self, type_, **kw): + return "SMALLMONEY" + + def visit_uuid(self, type_, **kw): + if type_.native_uuid: + return self.visit_UNIQUEIDENTIFIER(type_, **kw) + else: + return super().visit_uuid(type_, **kw) + + def visit_UNIQUEIDENTIFIER(self, type_, **kw): + return "UNIQUEIDENTIFIER" + + def visit_SQL_VARIANT(self, type_, **kw): + return "SQL_VARIANT" + + +class MSExecutionContext(default.DefaultExecutionContext): + _enable_identity_insert = False + _select_lastrowid = False + _lastrowid = None + + dialect: MSDialect + + def _opt_encode(self, statement): + if self.compiled and self.compiled.schema_translate_map: + rst = self.compiled.preparer._render_schema_translates + statement = rst(statement, self.compiled.schema_translate_map) + + return statement + + def pre_exec(self): + """Activate IDENTITY_INSERT if needed.""" + + if self.isinsert: + if TYPE_CHECKING: + assert is_sql_compiler(self.compiled) + assert isinstance(self.compiled.compile_state, DMLState) + assert isinstance( + self.compiled.compile_state.dml_table, TableClause + ) + + tbl = self.compiled.compile_state.dml_table + id_column = tbl._autoincrement_column + + if id_column is not None and ( + not isinstance(id_column.default, Sequence) + ): + insert_has_identity = True + compile_state = self.compiled.dml_compile_state + self._enable_identity_insert = ( + id_column.key in self.compiled_parameters[0] + ) or ( + compile_state._dict_parameters + and (id_column.key in compile_state._insert_col_keys) + ) + + else: + insert_has_identity = False + self._enable_identity_insert = False + + self._select_lastrowid = ( + not self.compiled.inline + and insert_has_identity + and not self.compiled.effective_returning + and not self._enable_identity_insert + and not self.executemany + ) + + if self._enable_identity_insert: + 
self.root_connection._cursor_execute( + self.cursor, + self._opt_encode( + "SET IDENTITY_INSERT %s ON" + % self.identifier_preparer.format_table(tbl) + ), + (), + self, + ) + + def post_exec(self): + """Disable IDENTITY_INSERT if enabled.""" + + conn = self.root_connection + + if self.isinsert or self.isupdate or self.isdelete: + self._rowcount = self.cursor.rowcount + + if self._select_lastrowid: + if self.dialect.use_scope_identity: + conn._cursor_execute( + self.cursor, + "SELECT scope_identity() AS lastrowid", + (), + self, + ) + else: + conn._cursor_execute( + self.cursor, "SELECT @@identity AS lastrowid", (), self + ) + # fetchall() ensures the cursor is consumed without closing it + row = self.cursor.fetchall()[0] + self._lastrowid = int(row[0]) + + self.cursor_fetch_strategy = _cursor._NO_CURSOR_DML + elif ( + self.compiled is not None + and is_sql_compiler(self.compiled) + and self.compiled.effective_returning + ): + self.cursor_fetch_strategy = ( + _cursor.FullyBufferedCursorFetchStrategy( + self.cursor, + self.cursor.description, + self.cursor.fetchall(), + ) + ) + + if self._enable_identity_insert: + if TYPE_CHECKING: + assert is_sql_compiler(self.compiled) + assert isinstance(self.compiled.compile_state, DMLState) + assert isinstance( + self.compiled.compile_state.dml_table, TableClause + ) + conn._cursor_execute( + self.cursor, + self._opt_encode( + "SET IDENTITY_INSERT %s OFF" + % self.identifier_preparer.format_table( + self.compiled.compile_state.dml_table + ) + ), + (), + self, + ) + + def get_lastrowid(self): + return self._lastrowid + + def handle_dbapi_exception(self, e): + if self._enable_identity_insert: + try: + self.cursor.execute( + self._opt_encode( + "SET IDENTITY_INSERT %s OFF" + % self.identifier_preparer.format_table( + self.compiled.compile_state.dml_table + ) + ) + ) + except Exception: + pass + + def fire_sequence(self, seq, type_): + return self._execute_scalar( + ( + "SELECT NEXT VALUE FOR %s" + % self.identifier_preparer.format_sequence(seq) + ), + type_, + ) + + def get_insert_default(self, column): + if ( + isinstance(column, sa_schema.Column) + and column is column.table._autoincrement_column + and isinstance(column.default, sa_schema.Sequence) + and column.default.optional + ): + return None + return super().get_insert_default(column) + + +class MSSQLCompiler(compiler.SQLCompiler): + returning_precedes_values = True + + extract_map = util.update_copy( + compiler.SQLCompiler.extract_map, + { + "doy": "dayofyear", + "dow": "weekday", + "milliseconds": "millisecond", + "microseconds": "microsecond", + }, + ) + + def __init__(self, *args, **kwargs): + self.tablealiases = {} + super().__init__(*args, **kwargs) + + def _with_legacy_schema_aliasing(fn): + def decorate(self, *arg, **kw): + if self.dialect.legacy_schema_aliasing: + return fn(self, *arg, **kw) + else: + super_ = getattr(super(MSSQLCompiler, self), fn.__name__) + return super_(*arg, **kw) + + return decorate + + def visit_now_func(self, fn, **kw): + return "CURRENT_TIMESTAMP" + + def visit_current_date_func(self, fn, **kw): + return "GETDATE()" + + def visit_length_func(self, fn, **kw): + return "LEN%s" % self.function_argspec(fn, **kw) + + def visit_char_length_func(self, fn, **kw): + return "LEN%s" % self.function_argspec(fn, **kw) + + def visit_aggregate_strings_func(self, fn, **kw): + expr = fn.clauses.clauses[0]._compiler_dispatch(self, **kw) + kw["literal_execute"] = True + delimeter = fn.clauses.clauses[1]._compiler_dispatch(self, **kw) + return f"string_agg({expr}, {delimeter})" + + 
def visit_concat_op_expression_clauselist( + self, clauselist, operator, **kw + ): + return " + ".join(self.process(elem, **kw) for elem in clauselist) + + def visit_concat_op_binary(self, binary, operator, **kw): + return "%s + %s" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + def visit_true(self, expr, **kw): + return "1" + + def visit_false(self, expr, **kw): + return "0" + + def visit_match_op_binary(self, binary, operator, **kw): + return "CONTAINS (%s, %s)" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + def get_select_precolumns(self, select, **kw): + """MS-SQL puts TOP, its version of LIMIT, here""" + + s = super().get_select_precolumns(select, **kw) + + if select._has_row_limiting_clause and self._use_top(select): + # ODBC drivers and possibly others + # don't support bind params in the SELECT clause on SQL Server, + # so we have to use a literal here. + kw["literal_execute"] = True + s += "TOP %s " % self.process( + self._get_limit_or_fetch(select), **kw + ) + if select._fetch_clause is not None: + if select._fetch_clause_options["percent"]: + s += "PERCENT " + if select._fetch_clause_options["with_ties"]: + s += "WITH TIES " + + return s + + def get_from_hint_text(self, table, text): + return text + + def get_crud_hint_text(self, table, text): + return text + + def _get_limit_or_fetch(self, select): + if select._fetch_clause is None: + return select._limit_clause + else: + return select._fetch_clause + + def _use_top(self, select): + return (select._offset_clause is None) and ( + select._simple_int_clause(select._limit_clause) + or ( + # limit can use TOP when it is by itself; fetch only uses TOP + # when it needs to because of PERCENT and/or WITH TIES + # TODO: Why? Shouldn't we always use TOP? + select._simple_int_clause(select._fetch_clause) + and ( + select._fetch_clause_options["percent"] + or select._fetch_clause_options["with_ties"] + ) + ) + ) + + def limit_clause(self, cs, **kwargs): + return "" + + def _check_can_use_fetch_limit(self, select): + # to use ROW_NUMBER(), an ORDER BY is required. + # OFFSET and FETCH are options of the ORDER BY clause + if not select._order_by_clause.clauses: + raise exc.CompileError( + "MSSQL requires an order_by when " + "using an OFFSET or a non-simple " + "LIMIT clause" + ) + + if select._fetch_clause_options is not None and ( + select._fetch_clause_options["percent"] + or select._fetch_clause_options["with_ties"] + ): + raise exc.CompileError( + "MSSQL needs TOP to use PERCENT and/or WITH TIES. " + "Only simple fetch without offset can be used." + ) + + def _row_limit_clause(self, select, **kw): + """MSSQL 2012 supports OFFSET/FETCH operators. + Use them instead of a subquery with row_number. + + """ + + if self.dialect._supports_offset_fetch and not self._use_top(select): + self._check_can_use_fetch_limit(select) + + return self.fetch_clause( + select, + fetch_clause=self._get_limit_or_fetch(select), + require_offset=True, + **kw, + ) + + else: + return "" + + def visit_try_cast(self, element, **kw): + return "TRY_CAST (%s AS %s)" % ( + self.process(element.clause, **kw), + self.process(element.typeclause, **kw), + ) + + def translate_select_structure(self, select_stmt, **kwargs): + """Look for ``LIMIT`` and ``OFFSET`` in a select statement, and if + present, try to wrap it in a subquery with a ``row_number()`` criterion.
+ MSSQL 2012 and above are excluded + + """ + select = select_stmt + + if ( + select._has_row_limiting_clause + and not self.dialect._supports_offset_fetch + and not self._use_top(select) + and not getattr(select, "_mssql_visit", None) + ): + self._check_can_use_fetch_limit(select) + + _order_by_clauses = [ + sql_util.unwrap_label_reference(elem) + for elem in select._order_by_clause.clauses + ] + + limit_clause = self._get_limit_or_fetch(select) + offset_clause = select._offset_clause + + select = select._generate() + select._mssql_visit = True + select = ( + select.add_columns( + sql.func.ROW_NUMBER() + .over(order_by=_order_by_clauses) + .label("mssql_rn") + ) + .order_by(None) + .alias() + ) + + mssql_rn = sql.column("mssql_rn") + limitselect = sql.select( + *[c for c in select.c if c.key != "mssql_rn"] + ) + if offset_clause is not None: + limitselect = limitselect.where(mssql_rn > offset_clause) + if limit_clause is not None: + limitselect = limitselect.where( + mssql_rn <= (limit_clause + offset_clause) + ) + else: + limitselect = limitselect.where(mssql_rn <= (limit_clause)) + return limitselect + else: + return select + + @_with_legacy_schema_aliasing + def visit_table(self, table, mssql_aliased=False, iscrud=False, **kwargs): + if mssql_aliased is table or iscrud: + return super().visit_table(table, **kwargs) + + # alias schema-qualified tables + alias = self._schema_aliased_table(table) + if alias is not None: + return self.process(alias, mssql_aliased=table, **kwargs) + else: + return super().visit_table(table, **kwargs) + + @_with_legacy_schema_aliasing + def visit_alias(self, alias, **kw): + # translate for schema-qualified table aliases + kw["mssql_aliased"] = alias.element + return super().visit_alias(alias, **kw) + + @_with_legacy_schema_aliasing + def visit_column(self, column, add_to_result_map=None, **kw): + if ( + column.table is not None + and (not self.isupdate and not self.isdelete) + or self.is_subquery() + ): + # translate for schema-qualified table aliases + t = self._schema_aliased_table(column.table) + if t is not None: + converted = elements._corresponding_column_or_error(t, column) + if add_to_result_map is not None: + add_to_result_map( + column.name, + column.name, + (column, column.name, column.key), + column.type, + ) + + return super().visit_column(converted, **kw) + + return super().visit_column( + column, add_to_result_map=add_to_result_map, **kw + ) + + def _schema_aliased_table(self, table): + if getattr(table, "schema", None) is not None: + if table not in self.tablealiases: + self.tablealiases[table] = table.alias() + return self.tablealiases[table] + else: + return None + + def visit_extract(self, extract, **kw): + field = self.extract_map.get(extract.field, extract.field) + return "DATEPART(%s, %s)" % (field, self.process(extract.expr, **kw)) + + def visit_savepoint(self, savepoint_stmt, **kw): + return "SAVE TRANSACTION %s" % self.preparer.format_savepoint( + savepoint_stmt + ) + + def visit_rollback_to_savepoint(self, savepoint_stmt, **kw): + return "ROLLBACK TRANSACTION %s" % self.preparer.format_savepoint( + savepoint_stmt + ) + + def visit_binary(self, binary, **kwargs): + """Move bind parameters to the right-hand side of an operator, where + possible. 
+ + """ + if ( + isinstance(binary.left, expression.BindParameter) + and binary.operator == operator.eq + and not isinstance(binary.right, expression.BindParameter) + ): + return self.process( + expression.BinaryExpression( + binary.right, binary.left, binary.operator + ), + **kwargs, + ) + return super().visit_binary(binary, **kwargs) + + def returning_clause( + self, stmt, returning_cols, *, populate_result_map, **kw + ): + # SQL server returning clause requires that the columns refer to + # the virtual table names "inserted" or "deleted". Here, we make + # a simple alias of our table with that name, and then adapt the + # columns we have from the list of RETURNING columns to that new name + # so that they render as "inserted.<colname>" / "deleted.<colname>". + + if stmt.is_insert or stmt.is_update: + target = stmt.table.alias("inserted") + elif stmt.is_delete: + target = stmt.table.alias("deleted") + else: + assert False, "expected Insert, Update or Delete statement" + + adapter = sql_util.ClauseAdapter(target) + + # adapter.traverse() takes a column from our target table and returns + # the one that is linked to the "inserted" / "deleted" tables. So in + # order to retrieve these values back from the result (e.g. like + # row[column]), tell the compiler to also add the original unadapted + # column to the result map. Before #4877, these were (unknowingly) + # falling back using string name matching in the result set which + # necessarily used an expensive KeyError in order to match. + + columns = [ + self._label_returning_column( + stmt, + adapter.traverse(column), + populate_result_map, + {"result_map_targets": (column,)}, + fallback_label_name=fallback_label_name, + column_is_repeated=repeated, + name=name, + proxy_name=proxy_name, + **kw, + ) + for ( + name, + proxy_name, + fallback_label_name, + column, + repeated, + ) in stmt._generate_columns_plus_names( + True, cols=expression._select_iterables(returning_cols) + ) + ] + + return "OUTPUT " + ", ".join(columns) + + def get_cte_preamble(self, recursive): + # SQL Server finds it too inconvenient to accept + # an entirely optional, SQL standard specified, + # "RECURSIVE" word with their "WITH", + # so here we go + return "WITH" + + def label_select_column(self, select, column, asfrom): + if isinstance(column, expression.Function): + return column.label(None) + else: + return super().label_select_column(select, column, asfrom) + + def for_update_clause(self, select, **kw): + # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which + # SQLAlchemy doesn't use + return "" + + def order_by_clause(self, select, **kw): + # MSSQL only allows ORDER BY in subqueries if there is a LIMIT: + # "The ORDER BY clause is invalid in views, inline functions, + # derived tables, subqueries, and common table expressions, + # unless TOP, OFFSET or FOR XML is also specified." + if ( + self.is_subquery() + and not self._use_top(select) + and ( + select._offset is None + or not self.dialect._supports_offset_fetch + ) + ): + # avoid processing the order by clause if we won't end up + # using it, because we don't want all the bind params tacked + # onto the positional list if that is what the dbapi requires + return "" + + order_by = self.process(select._order_by_clause, **kw) + + if order_by: + return " ORDER BY " + order_by + else: + return "" + + def update_from_clause( + self, update_stmt, from_table, extra_froms, from_hints, **kw + ): + """Render the UPDATE..FROM clause specific to MSSQL. 
+ + In MSSQL, if the UPDATE statement involves an alias of the table to + be updated, then the table itself must be added to the FROM list as + well. Otherwise, it is optional. Here, we add it regardless. + + """ + return "FROM " + ", ".join( + t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw) + for t in [from_table] + extra_froms + ) + + def delete_table_clause(self, delete_stmt, from_table, extra_froms, **kw): + """If we have extra froms make sure we render any alias as hint.""" + ashint = False + if extra_froms: + ashint = True + return from_table._compiler_dispatch( + self, asfrom=True, iscrud=True, ashint=ashint, **kw + ) + + def delete_extra_from_clause( + self, delete_stmt, from_table, extra_froms, from_hints, **kw + ): + """Render the DELETE .. FROM clause specific to MSSQL. + + Yes, it has the FROM keyword twice. + + """ + return "FROM " + ", ".join( + t._compiler_dispatch(self, asfrom=True, fromhints=from_hints, **kw) + for t in [from_table] + extra_froms + ) + + def visit_empty_set_expr(self, type_, **kw): + return "SELECT 1 WHERE 1!=1" + + def visit_is_distinct_from_binary(self, binary, operator, **kw): + return "NOT EXISTS (SELECT %s INTERSECT SELECT %s)" % ( + self.process(binary.left), + self.process(binary.right), + ) + + def visit_is_not_distinct_from_binary(self, binary, operator, **kw): + return "EXISTS (SELECT %s INTERSECT SELECT %s)" % ( + self.process(binary.left), + self.process(binary.right), + ) + + def _render_json_extract_from_binary(self, binary, operator, **kw): + # note we are intentionally calling upon the process() calls in the + # order in which they appear in the SQL String as this is used + # by positional parameter rendering + + if binary.type._type_affinity is sqltypes.JSON: + return "JSON_QUERY(%s, %s)" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + # as with other dialects, start with an explicit test for NULL + case_expression = "CASE JSON_VALUE(%s, %s) WHEN NULL THEN NULL" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + if binary.type._type_affinity is sqltypes.Integer: + type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS INTEGER)" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + elif binary.type._type_affinity is sqltypes.Numeric: + type_expression = "ELSE CAST(JSON_VALUE(%s, %s) AS %s)" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ( + "FLOAT" + if isinstance(binary.type, sqltypes.Float) + else "NUMERIC(%s, %s)" + % (binary.type.precision, binary.type.scale) + ), + ) + elif binary.type._type_affinity is sqltypes.Boolean: + # the NULL handling is particularly weird with boolean, so + # explicitly return numeric (BIT) constants + type_expression = ( + "WHEN 'true' THEN 1 WHEN 'false' THEN 0 ELSE NULL" + ) + elif binary.type._type_affinity is sqltypes.String: + # TODO: does this comment (from mysql) apply to here, too? + # this fails with a JSON value that's a four byte unicode + # string. 
SQLite has the same problem at the moment + type_expression = "ELSE JSON_VALUE(%s, %s)" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + else: + # other affinity....this is not expected right now + type_expression = "ELSE JSON_QUERY(%s, %s)" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + return case_expression + " " + type_expression + " END" + + def visit_json_getitem_op_binary(self, binary, operator, **kw): + return self._render_json_extract_from_binary(binary, operator, **kw) + + def visit_json_path_getitem_op_binary(self, binary, operator, **kw): + return self._render_json_extract_from_binary(binary, operator, **kw) + + def visit_sequence(self, seq, **kw): + return "NEXT VALUE FOR %s" % self.preparer.format_sequence(seq) + + +class MSSQLStrictCompiler(MSSQLCompiler): + """A subclass of MSSQLCompiler which disables the usage of bind + parameters where not allowed natively by MS-SQL. + + A dialect may use this compiler on a platform where native + binds are used. + + """ + + ansi_bind_rules = True + + def visit_in_op_binary(self, binary, operator, **kw): + kw["literal_execute"] = True + return "%s IN %s" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + def visit_not_in_op_binary(self, binary, operator, **kw): + kw["literal_execute"] = True + return "%s NOT IN %s" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + def render_literal_value(self, value, type_): + """ + For date and datetime values, convert to a string + format acceptable to MSSQL. That seems to be the + so-called ODBC canonical date format which looks + like this: + + yyyy-mm-dd hh:mi:ss.mmm(24h) + + For other data types, call the base class implementation. + """ + # datetime and date are both subclasses of datetime.date + if issubclass(type(value), datetime.date): + # SQL Server wants single quotes around the date string. + return "'" + str(value) + "'" + else: + return super().render_literal_value(value, type_) + + +class MSDDLCompiler(compiler.DDLCompiler): + def get_column_specification(self, column, **kwargs): + colspec = self.preparer.format_column(column) + + # type is not accepted in a computed column + if column.computed is not None: + colspec += " " + self.process(column.computed) + else: + colspec += " " + self.dialect.type_compiler_instance.process( + column.type, type_expression=column + ) + + if column.nullable is not None: + if ( + not column.nullable + or column.primary_key + or isinstance(column.default, sa_schema.Sequence) + or column.autoincrement is True + or column.identity + ): + colspec += " NOT NULL" + elif column.computed is None: + # don't specify "NULL" for computed columns + colspec += " NULL" + + if column.table is None: + raise exc.CompileError( + "mssql requires Table-bound columns " + "in order to generate DDL" + ) + + d_opt = column.dialect_options["mssql"] + start = d_opt["identity_start"] + increment = d_opt["identity_increment"] + if start is not None or increment is not None: + if column.identity: + raise exc.CompileError( + "Cannot specify options 'mssql_identity_start' and/or " + "'mssql_identity_increment' while also using the " + "'Identity' construct." + ) + util.warn_deprecated( + "The dialect options 'mssql_identity_start' and " + "'mssql_identity_increment' are deprecated. 
" + "Use the 'Identity' object instead.", + "1.4", + ) + + if column.identity: + colspec += self.process(column.identity, **kwargs) + elif ( + column is column.table._autoincrement_column + or column.autoincrement is True + ) and ( + not isinstance(column.default, Sequence) or column.default.optional + ): + colspec += self.process(Identity(start=start, increment=increment)) + else: + default = self.get_column_default_string(column) + if default is not None: + colspec += " DEFAULT " + default + + return colspec + + def visit_create_index(self, create, include_schema=False, **kw): + index = create.element + self._verify_index_table(index) + preparer = self.preparer + text = "CREATE " + if index.unique: + text += "UNIQUE " + + # handle clustering option + clustered = index.dialect_options["mssql"]["clustered"] + if clustered is not None: + if clustered: + text += "CLUSTERED " + else: + text += "NONCLUSTERED " + + # handle columnstore option (has no negative value) + columnstore = index.dialect_options["mssql"]["columnstore"] + if columnstore: + text += "COLUMNSTORE " + + text += "INDEX %s ON %s" % ( + self._prepared_index_name(index, include_schema=include_schema), + preparer.format_table(index.table), + ) + + # in some case mssql allows indexes with no columns defined + if len(index.expressions) > 0: + text += " (%s)" % ", ".join( + self.sql_compiler.process( + expr, include_table=False, literal_binds=True + ) + for expr in index.expressions + ) + + # handle other included columns + if index.dialect_options["mssql"]["include"]: + inclusions = [ + index.table.c[col] if isinstance(col, str) else col + for col in index.dialect_options["mssql"]["include"] + ] + + text += " INCLUDE (%s)" % ", ".join( + [preparer.quote(c.name) for c in inclusions] + ) + + whereclause = index.dialect_options["mssql"]["where"] + + if whereclause is not None: + whereclause = coercions.expect( + roles.DDLExpressionRole, whereclause + ) + + where_compiled = self.sql_compiler.process( + whereclause, include_table=False, literal_binds=True + ) + text += " WHERE " + where_compiled + + return text + + def visit_drop_index(self, drop, **kw): + return "\nDROP INDEX %s ON %s" % ( + self._prepared_index_name(drop.element, include_schema=False), + self.preparer.format_table(drop.element.table), + ) + + def visit_primary_key_constraint(self, constraint, **kw): + if len(constraint) == 0: + return "" + text = "" + if constraint.name is not None: + text += "CONSTRAINT %s " % self.preparer.format_constraint( + constraint + ) + text += "PRIMARY KEY " + + clustered = constraint.dialect_options["mssql"]["clustered"] + if clustered is not None: + if clustered: + text += "CLUSTERED " + else: + text += "NONCLUSTERED " + + text += "(%s)" % ", ".join( + self.preparer.quote(c.name) for c in constraint + ) + text += self.define_constraint_deferrability(constraint) + return text + + def visit_unique_constraint(self, constraint, **kw): + if len(constraint) == 0: + return "" + text = "" + if constraint.name is not None: + formatted_name = self.preparer.format_constraint(constraint) + if formatted_name is not None: + text += "CONSTRAINT %s " % formatted_name + text += "UNIQUE %s" % self.define_unique_constraint_distinct( + constraint, **kw + ) + clustered = constraint.dialect_options["mssql"]["clustered"] + if clustered is not None: + if clustered: + text += "CLUSTERED " + else: + text += "NONCLUSTERED " + + text += "(%s)" % ", ".join( + self.preparer.quote(c.name) for c in constraint + ) + text += self.define_constraint_deferrability(constraint) 
+ return text + + def visit_computed_column(self, generated, **kw): + text = "AS (%s)" % self.sql_compiler.process( + generated.sqltext, include_table=False, literal_binds=True + ) + # explicitly check for True|False since None means server default + if generated.persisted is True: + text += " PERSISTED" + return text + + def visit_set_table_comment(self, create, **kw): + schema = self.preparer.schema_for_object(create.element) + schema_name = schema if schema else self.dialect.default_schema_name + return ( + "execute sp_addextendedproperty 'MS_Description', " + "{}, 'schema', {}, 'table', {}".format( + self.sql_compiler.render_literal_value( + create.element.comment, sqltypes.NVARCHAR() + ), + self.preparer.quote_schema(schema_name), + self.preparer.format_table(create.element, use_schema=False), + ) + ) + + def visit_drop_table_comment(self, drop, **kw): + schema = self.preparer.schema_for_object(drop.element) + schema_name = schema if schema else self.dialect.default_schema_name + return ( + "execute sp_dropextendedproperty 'MS_Description', 'schema', " + "{}, 'table', {}".format( + self.preparer.quote_schema(schema_name), + self.preparer.format_table(drop.element, use_schema=False), + ) + ) + + def visit_set_column_comment(self, create, **kw): + schema = self.preparer.schema_for_object(create.element.table) + schema_name = schema if schema else self.dialect.default_schema_name + return ( + "execute sp_addextendedproperty 'MS_Description', " + "{}, 'schema', {}, 'table', {}, 'column', {}".format( + self.sql_compiler.render_literal_value( + create.element.comment, sqltypes.NVARCHAR() + ), + self.preparer.quote_schema(schema_name), + self.preparer.format_table( + create.element.table, use_schema=False + ), + self.preparer.format_column(create.element), + ) + ) + + def visit_drop_column_comment(self, drop, **kw): + schema = self.preparer.schema_for_object(drop.element.table) + schema_name = schema if schema else self.dialect.default_schema_name + return ( + "execute sp_dropextendedproperty 'MS_Description', 'schema', " + "{}, 'table', {}, 'column', {}".format( + self.preparer.quote_schema(schema_name), + self.preparer.format_table( + drop.element.table, use_schema=False + ), + self.preparer.format_column(drop.element), + ) + ) + + def visit_create_sequence(self, create, **kw): + prefix = None + if create.element.data_type is not None: + data_type = create.element.data_type + prefix = " AS %s" % self.type_compiler.process(data_type) + return super().visit_create_sequence(create, prefix=prefix, **kw) + + def visit_identity_column(self, identity, **kw): + text = " IDENTITY" + if identity.start is not None or identity.increment is not None: + start = 1 if identity.start is None else identity.start + increment = 1 if identity.increment is None else identity.increment + text += "(%s,%s)" % (start, increment) + return text + + +class MSIdentifierPreparer(compiler.IdentifierPreparer): + reserved_words = RESERVED_WORDS + + def __init__(self, dialect): + super().__init__( + dialect, + initial_quote="[", + final_quote="]", + quote_case_sensitive_collations=False, + ) + + def _escape_identifier(self, value): + return value.replace("]", "]]") + + def _unescape_identifier(self, value): + return value.replace("]]", "]") + + def quote_schema(self, schema, force=None): + """Prepare a quoted table and schema name.""" + + # need to re-implement the deprecation warning entirely + if force is not None: + # not using the util.deprecated_params() decorator in this + # case because of the additional function 
call overhead on this + # very performance-critical spot. + util.warn_deprecated( + "The IdentifierPreparer.quote_schema.force parameter is " + "deprecated and will be removed in a future release. This " + "flag has no effect on the behavior of the " + "IdentifierPreparer.quote method; please refer to " + "quoted_name().", + version="1.3", + ) + + dbname, owner = _schema_elements(schema) + if dbname: + result = "%s.%s" % (self.quote(dbname), self.quote(owner)) + elif owner: + result = self.quote(owner) + else: + result = "" + return result + + +def _db_plus_owner_listing(fn): + def wrap(dialect, connection, schema=None, **kw): + dbname, owner = _owner_plus_db(dialect, schema) + return _switch_db( + dbname, + connection, + fn, + dialect, + connection, + dbname, + owner, + schema, + **kw, + ) + + return update_wrapper(wrap, fn) + + +def _db_plus_owner(fn): + def wrap(dialect, connection, tablename, schema=None, **kw): + dbname, owner = _owner_plus_db(dialect, schema) + return _switch_db( + dbname, + connection, + fn, + dialect, + connection, + tablename, + dbname, + owner, + schema, + **kw, + ) + + return update_wrapper(wrap, fn) + + +def _switch_db(dbname, connection, fn, *arg, **kw): + if dbname: + current_db = connection.exec_driver_sql("select db_name()").scalar() + if current_db != dbname: + connection.exec_driver_sql( + "use %s" % connection.dialect.identifier_preparer.quote(dbname) + ) + try: + return fn(*arg, **kw) + finally: + if dbname and current_db != dbname: + connection.exec_driver_sql( + "use %s" + % connection.dialect.identifier_preparer.quote(current_db) + ) + + +def _owner_plus_db(dialect, schema): + if not schema: + return None, dialect.default_schema_name + else: + return _schema_elements(schema) + + +_memoized_schema = util.LRUCache() + + +def _schema_elements(schema): + if isinstance(schema, quoted_name) and schema.quote: + return None, schema + + if schema in _memoized_schema: + return _memoized_schema[schema] + + # tests for this function are in: + # test/dialect/mssql/test_reflection.py -> + # OwnerPlusDBTest.test_owner_database_pairs + # test/dialect/mssql/test_compiler.py -> test_force_schema_* + # test/dialect/mssql/test_compiler.py -> test_schema_many_tokens_* + # + + if schema.startswith("__[SCHEMA_"): + return None, schema + + push = [] + symbol = "" + bracket = False + has_brackets = False + for token in re.split(r"(\[|\]|\.)", schema): + if not token: + continue + if token == "[": + bracket = True + has_brackets = True + elif token == "]": + bracket = False + elif not bracket and token == ".": + if has_brackets: + push.append("[%s]" % symbol) + else: + push.append(symbol) + symbol = "" + has_brackets = False + else: + symbol += token + if symbol: + push.append(symbol) + if len(push) > 1: + dbname, owner = ".".join(push[0:-1]), push[-1] + + # test for internal brackets + if re.match(r".*\].*\[.*", dbname[1:-1]): + dbname = quoted_name(dbname, quote=False) + else: + dbname = dbname.lstrip("[").rstrip("]") + + elif len(push): + dbname, owner = None, push[0] + else: + dbname, owner = None, None + + _memoized_schema[schema] = dbname, owner + return dbname, owner + + +class MSDialect(default.DefaultDialect): + # will assume it's at least mssql2005 + name = "mssql" + supports_statement_cache = True + supports_default_values = True + supports_empty_insert = False + favor_returning_over_lastrowid = True + + returns_native_bytes = True + + supports_comments = True + supports_default_metavalue = False + """dialect supports INSERT... 
VALUES (DEFAULT) syntax - + SQL Server **does** support this, but **not** for the IDENTITY column, + so we can't turn this on. + + """ + + # supports_native_uuid is partial here, so we implement our + # own impl type + + execution_ctx_cls = MSExecutionContext + use_scope_identity = True + max_identifier_length = 128 + schema_name = "dbo" + + insert_returning = True + update_returning = True + delete_returning = True + update_returning_multifrom = True + delete_returning_multifrom = True + + colspecs = { + sqltypes.DateTime: _MSDateTime, + sqltypes.Date: _MSDate, + sqltypes.JSON: JSON, + sqltypes.JSON.JSONIndexType: JSONIndexType, + sqltypes.JSON.JSONPathType: JSONPathType, + sqltypes.Time: _BASETIMEIMPL, + sqltypes.Unicode: _MSUnicode, + sqltypes.UnicodeText: _MSUnicodeText, + DATETIMEOFFSET: DATETIMEOFFSET, + DATETIME2: DATETIME2, + SMALLDATETIME: SMALLDATETIME, + DATETIME: DATETIME, + sqltypes.Uuid: MSUUid, + } + + engine_config_types = default.DefaultDialect.engine_config_types.union( + {"legacy_schema_aliasing": util.asbool} + ) + + ischema_names = ischema_names + + supports_sequences = True + sequences_optional = True + # This is actually used for autoincrement, where identity is used, + # starting with 1. + # for sequences, T-SQL's actual default is -9223372036854775808 + default_sequence_base = 1 + + supports_native_boolean = False + non_native_boolean_check_constraint = False + supports_unicode_binds = True + postfetch_lastrowid = True + + # may be changed at server inspection time for older SQL Server versions + supports_multivalues_insert = True + + use_insertmanyvalues = True + + # note pyodbc will set this to False if fast_executemany is set, + # as of SQLAlchemy 2.0.9 + use_insertmanyvalues_wo_returning = True + + insertmanyvalues_implicit_sentinel = ( + InsertmanyvaluesSentinelOpts.AUTOINCREMENT + | InsertmanyvaluesSentinelOpts.IDENTITY + | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT + ) + + # "The incoming request has too many parameters. The server supports a " + # "maximum of 2100 parameters." + # in fact you can have 2099 parameters.
+ insertmanyvalues_max_parameters = 2099 + + _supports_offset_fetch = False + _supports_nvarchar_max = False + + legacy_schema_aliasing = False + + server_version_info = () + + statement_compiler = MSSQLCompiler + ddl_compiler = MSDDLCompiler + type_compiler_cls = MSTypeCompiler + preparer = MSIdentifierPreparer + + construct_arguments = [ + (sa_schema.PrimaryKeyConstraint, {"clustered": None}), + (sa_schema.UniqueConstraint, {"clustered": None}), + ( + sa_schema.Index, + { + "clustered": None, + "include": None, + "where": None, + "columnstore": None, + }, + ), + ( + sa_schema.Column, + {"identity_start": None, "identity_increment": None}, + ), + ] + + def __init__( + self, + query_timeout=None, + use_scope_identity=True, + schema_name="dbo", + deprecate_large_types=None, + supports_comments=None, + json_serializer=None, + json_deserializer=None, + legacy_schema_aliasing=None, + ignore_no_transaction_on_rollback=False, + **opts, + ): + self.query_timeout = int(query_timeout or 0) + self.schema_name = schema_name + + self.use_scope_identity = use_scope_identity + self.deprecate_large_types = deprecate_large_types + self.ignore_no_transaction_on_rollback = ( + ignore_no_transaction_on_rollback + ) + self._user_defined_supports_comments = uds = supports_comments + if uds is not None: + self.supports_comments = uds + + if legacy_schema_aliasing is not None: + util.warn_deprecated( + "The legacy_schema_aliasing parameter is " + "deprecated and will be removed in a future release.", + "1.4", + ) + self.legacy_schema_aliasing = legacy_schema_aliasing + + super().__init__(**opts) + + self._json_serializer = json_serializer + self._json_deserializer = json_deserializer + + def do_savepoint(self, connection, name): + # give the DBAPI a push + connection.exec_driver_sql("IF @@TRANCOUNT = 0 BEGIN TRANSACTION") + super().do_savepoint(connection, name) + + def do_release_savepoint(self, connection, name): + # SQL Server does not support RELEASE SAVEPOINT + pass + + def do_rollback(self, dbapi_connection): + try: + super().do_rollback(dbapi_connection) + except self.dbapi.ProgrammingError as e: + if self.ignore_no_transaction_on_rollback and re.match( + r".*\b111214\b", str(e) + ): + util.warn( + "ProgrammingError 111214 " + "'No corresponding transaction found.' " + "has been suppressed via " + "ignore_no_transaction_on_rollback=True" + ) + else: + raise + + _isolation_lookup = { + "SERIALIZABLE", + "READ UNCOMMITTED", + "READ COMMITTED", + "REPEATABLE READ", + "SNAPSHOT", + } + + def get_isolation_level_values(self, dbapi_connection): + return list(self._isolation_lookup) + + def set_isolation_level(self, dbapi_connection, level): + cursor = dbapi_connection.cursor() + cursor.execute(f"SET TRANSACTION ISOLATION LEVEL {level}") + cursor.close() + if level == "SNAPSHOT": + dbapi_connection.commit() + + def get_isolation_level(self, dbapi_connection): + cursor = dbapi_connection.cursor() + view_name = "sys.system_views" + try: + cursor.execute( + ( + "SELECT name FROM {} WHERE name IN " + "('dm_exec_sessions', 'dm_pdw_nodes_exec_sessions')" + ).format(view_name) + ) + row = cursor.fetchone() + if not row: + raise NotImplementedError( + "Can't fetch isolation level on this particular " + "SQL Server version." 
+ ) + + view_name = f"sys.{row[0]}" + + cursor.execute( + """ + SELECT CASE transaction_isolation_level + WHEN 0 THEN NULL + WHEN 1 THEN 'READ UNCOMMITTED' + WHEN 2 THEN 'READ COMMITTED' + WHEN 3 THEN 'REPEATABLE READ' + WHEN 4 THEN 'SERIALIZABLE' + WHEN 5 THEN 'SNAPSHOT' END + AS TRANSACTION_ISOLATION_LEVEL + FROM {} + where session_id = @@SPID + """.format( + view_name + ) + ) + except self.dbapi.Error as err: + raise NotImplementedError( + "Can't fetch isolation level; encountered error {} when " + 'attempting to query the "{}" view.'.format(err, view_name) + ) from err + else: + row = cursor.fetchone() + return row[0].upper() + finally: + cursor.close() + + def initialize(self, connection): + super().initialize(connection) + self._setup_version_attributes() + self._setup_supports_nvarchar_max(connection) + self._setup_supports_comments(connection) + + def _setup_version_attributes(self): + if self.server_version_info[0] not in list(range(8, 17)): + util.warn( + "Unrecognized server version info '%s'. Some SQL Server " + "features may not function properly." + % ".".join(str(x) for x in self.server_version_info) + ) + + if self.server_version_info >= MS_2008_VERSION: + self.supports_multivalues_insert = True + else: + self.supports_multivalues_insert = False + + if self.deprecate_large_types is None: + self.deprecate_large_types = ( + self.server_version_info >= MS_2012_VERSION + ) + + self._supports_offset_fetch = ( + self.server_version_info and self.server_version_info[0] >= 11 + ) + + def _setup_supports_nvarchar_max(self, connection): + try: + connection.scalar( + sql.text("SELECT CAST('test max support' AS NVARCHAR(max))") + ) + except exc.DBAPIError: + self._supports_nvarchar_max = False + else: + self._supports_nvarchar_max = True + + def _setup_supports_comments(self, connection): + if self._user_defined_supports_comments is not None: + return + + try: + connection.scalar( + sql.text( + "SELECT 1 FROM fn_listextendedproperty" + "(default, default, default, default, " + "default, default, default)" + ) + ) + except exc.DBAPIError: + self.supports_comments = False + else: + self.supports_comments = True + + def _get_default_schema_name(self, connection): + query = sql.text("SELECT schema_name()") + default_schema_name = connection.scalar(query) + if default_schema_name is not None: + # guard against the case where the default_schema_name is being + # fed back into a table reflection function. 
+ return quoted_name(default_schema_name, quote=True) + else: + return self.schema_name + + @_db_plus_owner + def has_table(self, connection, tablename, dbname, owner, schema, **kw): + self._ensure_has_table_connection(connection) + + return self._internal_has_table(connection, tablename, owner, **kw) + + @reflection.cache + @_db_plus_owner + def has_sequence( + self, connection, sequencename, dbname, owner, schema, **kw + ): + sequences = ischema.sequences + + s = sql.select(sequences.c.sequence_name).where( + sequences.c.sequence_name == sequencename + ) + + if owner: + s = s.where(sequences.c.sequence_schema == owner) + + c = connection.execute(s) + + return c.first() is not None + + @reflection.cache + @_db_plus_owner_listing + def get_sequence_names(self, connection, dbname, owner, schema, **kw): + sequences = ischema.sequences + + s = sql.select(sequences.c.sequence_name) + if owner: + s = s.where(sequences.c.sequence_schema == owner) + + c = connection.execute(s) + + return [row[0] for row in c] + + @reflection.cache + def get_schema_names(self, connection, **kw): + s = sql.select(ischema.schemata.c.schema_name).order_by( + ischema.schemata.c.schema_name + ) + schema_names = [r[0] for r in connection.execute(s)] + return schema_names + + @reflection.cache + @_db_plus_owner_listing + def get_table_names(self, connection, dbname, owner, schema, **kw): + tables = ischema.tables + s = ( + sql.select(tables.c.table_name) + .where( + sql.and_( + tables.c.table_schema == owner, + tables.c.table_type == "BASE TABLE", + ) + ) + .order_by(tables.c.table_name) + ) + table_names = [r[0] for r in connection.execute(s)] + return table_names + + @reflection.cache + @_db_plus_owner_listing + def get_view_names(self, connection, dbname, owner, schema, **kw): + tables = ischema.tables + s = ( + sql.select(tables.c.table_name) + .where( + sql.and_( + tables.c.table_schema == owner, + tables.c.table_type == "VIEW", + ) + ) + .order_by(tables.c.table_name) + ) + view_names = [r[0] for r in connection.execute(s)] + return view_names + + @reflection.cache + def _internal_has_table(self, connection, tablename, owner, **kw): + if tablename.startswith("#"): # temporary table + # mssql does not support temporary views + # SQL Error [4103] [S0001]: "#v": Temporary views are not allowed + return bool( + connection.scalar( + # U filters on user tables only. 
+ text("SELECT object_id(:table_name, 'U')"), + {"table_name": f"tempdb.dbo.[{tablename}]"}, + ) + ) + else: + tables = ischema.tables + + s = sql.select(tables.c.table_name).where( + sql.and_( + sql.or_( + tables.c.table_type == "BASE TABLE", + tables.c.table_type == "VIEW", + ), + tables.c.table_name == tablename, + ) + ) + + if owner: + s = s.where(tables.c.table_schema == owner) + + c = connection.execute(s) + + return c.first() is not None + + def _default_or_error(self, connection, tablename, owner, method, **kw): + # TODO: try to avoid having to run a separate query here + if self._internal_has_table(connection, tablename, owner, **kw): + return method() + else: + raise exc.NoSuchTableError(f"{owner}.{tablename}") + + @reflection.cache + @_db_plus_owner + def get_indexes(self, connection, tablename, dbname, owner, schema, **kw): + filter_definition = ( + "ind.filter_definition" + if self.server_version_info >= MS_2008_VERSION + else "NULL as filter_definition" + ) + rp = connection.execution_options(future_result=True).execute( + sql.text( + f""" +select + ind.index_id, + ind.is_unique, + ind.name, + ind.type, + {filter_definition} +from + sys.indexes as ind +join sys.tables as tab on + ind.object_id = tab.object_id +join sys.schemas as sch on + sch.schema_id = tab.schema_id +where + tab.name = :tabname + and sch.name = :schname + and ind.is_primary_key = 0 + and ind.type != 0 +order by + ind.name + """ + ) + .bindparams( + sql.bindparam("tabname", tablename, ischema.CoerceUnicode()), + sql.bindparam("schname", owner, ischema.CoerceUnicode()), + ) + .columns(name=sqltypes.Unicode()) + ) + indexes = {} + for row in rp.mappings(): + indexes[row["index_id"]] = current = { + "name": row["name"], + "unique": row["is_unique"] == 1, + "column_names": [], + "include_columns": [], + "dialect_options": {}, + } + + do = current["dialect_options"] + index_type = row["type"] + if index_type in {1, 2}: + do["mssql_clustered"] = index_type == 1 + if index_type in {5, 6}: + do["mssql_clustered"] = index_type == 5 + do["mssql_columnstore"] = True + if row["filter_definition"] is not None: + do["mssql_where"] = row["filter_definition"] + + rp = connection.execution_options(future_result=True).execute( + sql.text( + """ +select + ind_col.index_id, + col.name, + ind_col.is_included_column +from + sys.columns as col +join sys.tables as tab on + tab.object_id = col.object_id +join sys.index_columns as ind_col on + ind_col.column_id = col.column_id + and ind_col.object_id = tab.object_id +join sys.schemas as sch on + sch.schema_id = tab.schema_id +where + tab.name = :tabname + and sch.name = :schname + """ + ) + .bindparams( + sql.bindparam("tabname", tablename, ischema.CoerceUnicode()), + sql.bindparam("schname", owner, ischema.CoerceUnicode()), + ) + .columns(name=sqltypes.Unicode()) + ) + for row in rp.mappings(): + if row["index_id"] not in indexes: + continue + index_def = indexes[row["index_id"]] + is_colstore = index_def["dialect_options"].get("mssql_columnstore") + is_clustered = index_def["dialect_options"].get("mssql_clustered") + if not (is_colstore and is_clustered): + # a clustered columnstore index includes all columns but does + # not want them in the index definition + if row["is_included_column"] and not is_colstore: + # a noncludsted columnstore index reports that includes + # columns but requires that are listed as normal columns + index_def["include_columns"].append(row["name"]) + else: + index_def["column_names"].append(row["name"]) + for index_info in indexes.values(): + # NOTE: 
"root level" include_columns is legacy, now part of + # dialect_options (issue #7382) + index_info["dialect_options"]["mssql_include"] = index_info[ + "include_columns" + ] + + if indexes: + return list(indexes.values()) + else: + return self._default_or_error( + connection, tablename, owner, ReflectionDefaults.indexes, **kw + ) + + @reflection.cache + @_db_plus_owner + def get_view_definition( + self, connection, viewname, dbname, owner, schema, **kw + ): + view_def = connection.execute( + sql.text( + "select mod.definition " + "from sys.sql_modules as mod " + "join sys.views as views on mod.object_id = views.object_id " + "join sys.schemas as sch on views.schema_id = sch.schema_id " + "where views.name=:viewname and sch.name=:schname" + ).bindparams( + sql.bindparam("viewname", viewname, ischema.CoerceUnicode()), + sql.bindparam("schname", owner, ischema.CoerceUnicode()), + ) + ).scalar() + if view_def: + return view_def + else: + raise exc.NoSuchTableError(f"{owner}.{viewname}") + + @reflection.cache + def get_table_comment(self, connection, table_name, schema=None, **kw): + if not self.supports_comments: + raise NotImplementedError( + "Can't get table comments on current SQL Server version in use" + ) + + schema_name = schema if schema else self.default_schema_name + COMMENT_SQL = """ + SELECT cast(com.value as nvarchar(max)) + FROM fn_listextendedproperty('MS_Description', + 'schema', :schema, 'table', :table, NULL, NULL + ) as com; + """ + + comment = connection.execute( + sql.text(COMMENT_SQL).bindparams( + sql.bindparam("schema", schema_name, ischema.CoerceUnicode()), + sql.bindparam("table", table_name, ischema.CoerceUnicode()), + ) + ).scalar() + if comment: + return {"text": comment} + else: + return self._default_or_error( + connection, + table_name, + None, + ReflectionDefaults.table_comment, + **kw, + ) + + def _temp_table_name_like_pattern(self, tablename): + # LIKE uses '%' to match zero or more characters and '_' to match any + # single character. We want to match literal underscores, so T-SQL + # requires that we enclose them in square brackets. + return tablename + ( + ("[_][_][_]%") if not tablename.startswith("##") else "" + ) + + def _get_internal_temp_table_name(self, connection, tablename): + # it's likely that schema is always "dbo", but since we can + # get it here, let's get it. + # see https://stackoverflow.com/questions/8311959/ + # specifying-schema-for-temporary-tables + + try: + return connection.execute( + sql.text( + "select table_schema, table_name " + "from tempdb.information_schema.tables " + "where table_name like :p1" + ), + {"p1": self._temp_table_name_like_pattern(tablename)}, + ).one() + except exc.MultipleResultsFound as me: + raise exc.UnreflectableTableError( + "Found more than one temporary table named '%s' in tempdb " + "at this time. Cannot reliably resolve that name to its " + "internal table name." % tablename + ) from me + except exc.NoResultFound as ne: + raise exc.NoSuchTableError( + "Unable to find a temporary table named '%s' in tempdb." 
+ % tablename + ) from ne + + @reflection.cache + @_db_plus_owner + def get_columns(self, connection, tablename, dbname, owner, schema, **kw): + is_temp_table = tablename.startswith("#") + if is_temp_table: + owner, tablename = self._get_internal_temp_table_name( + connection, tablename + ) + + columns = ischema.mssql_temp_table_columns + else: + columns = ischema.columns + + computed_cols = ischema.computed_columns + identity_cols = ischema.identity_columns + if owner: + whereclause = sql.and_( + columns.c.table_name == tablename, + columns.c.table_schema == owner, + ) + full_name = columns.c.table_schema + "." + columns.c.table_name + else: + whereclause = columns.c.table_name == tablename + full_name = columns.c.table_name + + if self._supports_nvarchar_max: + computed_definition = computed_cols.c.definition + else: + # tds_version 4.2 does not support NVARCHAR(MAX) + computed_definition = sql.cast( + computed_cols.c.definition, NVARCHAR(4000) + ) + + object_id = func.object_id(full_name) + + s = ( + sql.select( + columns.c.column_name, + columns.c.data_type, + columns.c.is_nullable, + columns.c.character_maximum_length, + columns.c.numeric_precision, + columns.c.numeric_scale, + columns.c.column_default, + columns.c.collation_name, + computed_definition, + computed_cols.c.is_persisted, + identity_cols.c.is_identity, + identity_cols.c.seed_value, + identity_cols.c.increment_value, + ischema.extended_properties.c.value.label("comment"), + ) + .select_from(columns) + .outerjoin( + computed_cols, + onclause=sql.and_( + computed_cols.c.object_id == object_id, + computed_cols.c.name + == columns.c.column_name.collate("DATABASE_DEFAULT"), + ), + ) + .outerjoin( + identity_cols, + onclause=sql.and_( + identity_cols.c.object_id == object_id, + identity_cols.c.name + == columns.c.column_name.collate("DATABASE_DEFAULT"), + ), + ) + .outerjoin( + ischema.extended_properties, + onclause=sql.and_( + ischema.extended_properties.c["class"] == 1, + ischema.extended_properties.c.major_id == object_id, + ischema.extended_properties.c.minor_id + == columns.c.ordinal_position, + ischema.extended_properties.c.name == "MS_Description", + ), + ) + .where(whereclause) + .order_by(columns.c.ordinal_position) + ) + + c = connection.execution_options(future_result=True).execute(s) + + cols = [] + for row in c.mappings(): + name = row[columns.c.column_name] + type_ = row[columns.c.data_type] + nullable = row[columns.c.is_nullable] == "YES" + charlen = row[columns.c.character_maximum_length] + numericprec = row[columns.c.numeric_precision] + numericscale = row[columns.c.numeric_scale] + default = row[columns.c.column_default] + collation = row[columns.c.collation_name] + definition = row[computed_definition] + is_persisted = row[computed_cols.c.is_persisted] + is_identity = row[identity_cols.c.is_identity] + identity_start = row[identity_cols.c.seed_value] + identity_increment = row[identity_cols.c.increment_value] + comment = row[ischema.extended_properties.c.value] + + coltype = self.ischema_names.get(type_, None) + + kwargs = {} + if coltype in ( + MSString, + MSChar, + MSNVarchar, + MSNChar, + MSText, + MSNText, + MSBinary, + MSVarBinary, + sqltypes.LargeBinary, + ): + if charlen == -1: + charlen = None + kwargs["length"] = charlen + if collation: + kwargs["collation"] = collation + + if coltype is None: + util.warn( + "Did not recognize type '%s' of column '%s'" + % (type_, name) + ) + coltype = sqltypes.NULLTYPE + else: + if issubclass(coltype, sqltypes.Numeric): + kwargs["precision"] = numericprec + + if not 
issubclass(coltype, sqltypes.Float): + kwargs["scale"] = numericscale + + coltype = coltype(**kwargs) + cdict = { + "name": name, + "type": coltype, + "nullable": nullable, + "default": default, + "autoincrement": is_identity is not None, + "comment": comment, + } + + if definition is not None and is_persisted is not None: + cdict["computed"] = { + "sqltext": definition, + "persisted": is_persisted, + } + + if is_identity is not None: + # identity_start and identity_increment are Decimal or None + if identity_start is None or identity_increment is None: + cdict["identity"] = {} + else: + if isinstance(coltype, sqltypes.BigInteger): + start = int(identity_start) + increment = int(identity_increment) + elif isinstance(coltype, sqltypes.Integer): + start = int(identity_start) + increment = int(identity_increment) + else: + start = identity_start + increment = identity_increment + + cdict["identity"] = { + "start": start, + "increment": increment, + } + + cols.append(cdict) + + if cols: + return cols + else: + return self._default_or_error( + connection, tablename, owner, ReflectionDefaults.columns, **kw + ) + + @reflection.cache + @_db_plus_owner + def get_pk_constraint( + self, connection, tablename, dbname, owner, schema, **kw + ): + pkeys = [] + TC = ischema.constraints + C = ischema.key_constraints.alias("C") + + # Primary key constraints + s = ( + sql.select( + C.c.column_name, + TC.c.constraint_type, + C.c.constraint_name, + func.objectproperty( + func.object_id( + C.c.table_schema + "." + C.c.constraint_name + ), + "CnstIsClustKey", + ).label("is_clustered"), + ) + .where( + sql.and_( + TC.c.constraint_name == C.c.constraint_name, + TC.c.table_schema == C.c.table_schema, + C.c.table_name == tablename, + C.c.table_schema == owner, + ), + ) + .order_by(TC.c.constraint_name, C.c.ordinal_position) + ) + c = connection.execution_options(future_result=True).execute(s) + constraint_name = None + is_clustered = None + for row in c.mappings(): + if "PRIMARY" in row[TC.c.constraint_type.name]: + pkeys.append(row["COLUMN_NAME"]) + if constraint_name is None: + constraint_name = row[C.c.constraint_name.name] + if is_clustered is None: + is_clustered = row["is_clustered"] + if pkeys: + return { + "constrained_columns": pkeys, + "name": constraint_name, + "dialect_options": {"mssql_clustered": is_clustered}, + } + else: + return self._default_or_error( + connection, + tablename, + owner, + ReflectionDefaults.pk_constraint, + **kw, + ) + + @reflection.cache + @_db_plus_owner + def get_foreign_keys( + self, connection, tablename, dbname, owner, schema, **kw + ): + # Foreign key constraints + s = ( + text( + """\ +WITH fk_info AS ( + SELECT + ischema_ref_con.constraint_schema, + ischema_ref_con.constraint_name, + ischema_key_col.ordinal_position, + ischema_key_col.table_schema, + ischema_key_col.table_name, + ischema_ref_con.unique_constraint_schema, + ischema_ref_con.unique_constraint_name, + ischema_ref_con.match_option, + ischema_ref_con.update_rule, + ischema_ref_con.delete_rule, + ischema_key_col.column_name AS constrained_column + FROM + INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ischema_ref_con + INNER JOIN + INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col ON + ischema_key_col.table_schema = ischema_ref_con.constraint_schema + AND ischema_key_col.constraint_name = + ischema_ref_con.constraint_name + WHERE ischema_key_col.table_name = :tablename + AND ischema_key_col.table_schema = :owner +), +constraint_info AS ( + SELECT + ischema_key_col.constraint_schema, + 
ischema_key_col.constraint_name, + ischema_key_col.ordinal_position, + ischema_key_col.table_schema, + ischema_key_col.table_name, + ischema_key_col.column_name + FROM + INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col +), +index_info AS ( + SELECT + sys.schemas.name AS index_schema, + sys.indexes.name AS index_name, + sys.index_columns.key_ordinal AS ordinal_position, + sys.schemas.name AS table_schema, + sys.objects.name AS table_name, + sys.columns.name AS column_name + FROM + sys.indexes + INNER JOIN + sys.objects ON + sys.objects.object_id = sys.indexes.object_id + INNER JOIN + sys.schemas ON + sys.schemas.schema_id = sys.objects.schema_id + INNER JOIN + sys.index_columns ON + sys.index_columns.object_id = sys.objects.object_id + AND sys.index_columns.index_id = sys.indexes.index_id + INNER JOIN + sys.columns ON + sys.columns.object_id = sys.indexes.object_id + AND sys.columns.column_id = sys.index_columns.column_id +) + SELECT + fk_info.constraint_schema, + fk_info.constraint_name, + fk_info.ordinal_position, + fk_info.constrained_column, + constraint_info.table_schema AS referred_table_schema, + constraint_info.table_name AS referred_table_name, + constraint_info.column_name AS referred_column, + fk_info.match_option, + fk_info.update_rule, + fk_info.delete_rule + FROM + fk_info INNER JOIN constraint_info ON + constraint_info.constraint_schema = + fk_info.unique_constraint_schema + AND constraint_info.constraint_name = + fk_info.unique_constraint_name + AND constraint_info.ordinal_position = fk_info.ordinal_position + UNION + SELECT + fk_info.constraint_schema, + fk_info.constraint_name, + fk_info.ordinal_position, + fk_info.constrained_column, + index_info.table_schema AS referred_table_schema, + index_info.table_name AS referred_table_name, + index_info.column_name AS referred_column, + fk_info.match_option, + fk_info.update_rule, + fk_info.delete_rule + FROM + fk_info INNER JOIN index_info ON + index_info.index_schema = fk_info.unique_constraint_schema + AND index_info.index_name = fk_info.unique_constraint_name + AND index_info.ordinal_position = fk_info.ordinal_position + + ORDER BY fk_info.constraint_schema, fk_info.constraint_name, + fk_info.ordinal_position +""" + ) + .bindparams( + sql.bindparam("tablename", tablename, ischema.CoerceUnicode()), + sql.bindparam("owner", owner, ischema.CoerceUnicode()), + ) + .columns( + constraint_schema=sqltypes.Unicode(), + constraint_name=sqltypes.Unicode(), + table_schema=sqltypes.Unicode(), + table_name=sqltypes.Unicode(), + constrained_column=sqltypes.Unicode(), + referred_table_schema=sqltypes.Unicode(), + referred_table_name=sqltypes.Unicode(), + referred_column=sqltypes.Unicode(), + ) + ) + + # group rows by constraint ID, to handle multi-column FKs + fkeys = [] + + def fkey_rec(): + return { + "name": None, + "constrained_columns": [], + "referred_schema": None, + "referred_table": None, + "referred_columns": [], + "options": {}, + } + + fkeys = util.defaultdict(fkey_rec) + + for r in connection.execute(s).all(): + ( + _, # constraint schema + rfknm, + _, # ordinal position + scol, + rschema, + rtbl, + rcol, + # TODO: we support match=<keyword> for foreign keys so + # we can support this also, PG has match=FULL for example + # but this seems to not be a valid value for SQL Server + _, # match rule + fkuprule, + fkdelrule, + ) = r + + rec = fkeys[rfknm] + rec["name"] = rfknm + + if fkuprule != "NO ACTION": + rec["options"]["onupdate"] = fkuprule + + if fkdelrule != "NO ACTION": + rec["options"]["ondelete"] = fkdelrule + + if not 
rec["referred_table"]: + rec["referred_table"] = rtbl + if schema is not None or owner != rschema: + if dbname: + rschema = dbname + "." + rschema + rec["referred_schema"] = rschema + + local_cols, remote_cols = ( + rec["constrained_columns"], + rec["referred_columns"], + ) + + local_cols.append(scol) + remote_cols.append(rcol) + + if fkeys: + return list(fkeys.values()) + else: + return self._default_or_error( + connection, + tablename, + owner, + ReflectionDefaults.foreign_keys, + **kw, + ) |