Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/base.py')
-rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/base.py | 5007 |
1 file changed, 0 insertions, 5007 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/base.py deleted file mode 100644 index 4ab3ca2..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/base.py +++ /dev/null @@ -1,5007 +0,0 @@ -# dialects/postgresql/base.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - -r""" -.. dialect:: postgresql - :name: PostgreSQL - :full_support: 12, 13, 14, 15 - :normal_support: 9.6+ - :best_effort: 9+ - -.. _postgresql_sequences: - -Sequences/SERIAL/IDENTITY ------------------------- - -PostgreSQL supports sequences, and SQLAlchemy uses these as the default means -of creating new primary key values for integer-based primary key columns. When -creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for -integer-based primary key columns, which generates a sequence and server side -default corresponding to the column. - -To specify a particular named sequence to be used for primary key generation, -use the :func:`~sqlalchemy.schema.Sequence` construct:: - - Table( - "sometable", - metadata, - Column( - "id", Integer, Sequence("some_id_seq", start=1), primary_key=True - ) - ) - -When SQLAlchemy issues a single INSERT statement, to fulfill the contract of -having the "last insert identifier" available, a RETURNING clause is added to -the INSERT statement which specifies that the primary key columns should be -returned after the statement completes. The RETURNING functionality only takes -place if PostgreSQL 8.2 or later is in use. As a fallback approach, the -sequence, whether specified explicitly or implicitly via ``SERIAL``, is -executed independently beforehand, with the returned value used in the -subsequent INSERT. Note that when an -:func:`~sqlalchemy.sql.expression.insert()` construct is executed using -"executemany" semantics, the "last inserted identifier" functionality does not -apply; no RETURNING clause is emitted nor is the sequence pre-executed in this -case. - - -PostgreSQL 10 and above IDENTITY columns -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use -of SERIAL. The :class:`_schema.Identity` construct in a -:class:`_schema.Column` can be used to control its behavior:: - - from sqlalchemy import Table, Column, MetaData, Integer, String, Identity - - metadata = MetaData() - - data = Table( - "data", - metadata, - Column( - 'id', Integer, Identity(start=42, cycle=True), primary_key=True - ), - Column('data', String) - ) - -The CREATE TABLE for the above :class:`_schema.Table` object would be: - -.. sourcecode:: sql - - CREATE TABLE data ( - id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE), - data VARCHAR, - PRIMARY KEY (id) - ) - -.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct - in a :class:`_schema.Column` to specify the option of an autoincrementing - column. -
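To see the above behavior at runtime, the generated identifier for a
single-row INSERT is available from the
:attr:`_engine.CursorResult.inserted_primary_key` attribute, populated via the
RETURNING clause described earlier. A minimal sketch, assuming an ``engine``
connected to a database containing the ``data`` table above::

    with engine.begin() as conn:
        result = conn.execute(data.insert().values(data="some value"))

        # a named tuple of the generated primary key values,
        # e.g. (42,) for the first row given Identity(start=42)
        print(result.inserted_primary_key)

.. 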
note:: - - Previous versions of SQLAlchemy did not have built-in support for rendering - of IDENTITY, and could use the following compilation hook to replace - occurrences of SERIAL with IDENTITY:: - - from sqlalchemy.schema import CreateColumn - from sqlalchemy.ext.compiler import compiles - - - @compiles(CreateColumn, 'postgresql') - def use_identity(element, compiler, **kw): - text = compiler.visit_create_column(element, **kw) - text = text.replace( - "SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY" - ) - return text - - Using the above, a table such as:: - - t = Table( - 't', m, - Column('id', Integer, primary_key=True), - Column('data', String) - ) - - Will generate on the backing database as:: - - CREATE TABLE t ( - id INT GENERATED BY DEFAULT AS IDENTITY, - data VARCHAR, - PRIMARY KEY (id) - ) - -.. _postgresql_ss_cursors: - -Server Side Cursors -------------------- - -Server-side cursor support is available for the psycopg2, asyncpg -dialects and may also be available in others. - -Server side cursors are enabled on a per-statement basis by using the -:paramref:`.Connection.execution_options.stream_results` connection execution -option:: - - with engine.connect() as conn: - result = conn.execution_options(stream_results=True).execute(text("select * from table")) - -Note that some kinds of SQL statements may not be supported with -server side cursors; generally, only SQL statements that return rows should be -used with this option. - -.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated - and will be removed in a future release. Please use the - :paramref:`_engine.Connection.stream_results` execution option for - unbuffered cursor support. - -.. seealso:: - - :ref:`engine_stream_results` - -.. _postgresql_isolation_level: - -Transaction Isolation Level ---------------------------- - -Most SQLAlchemy dialects support setting of transaction isolation level -using the :paramref:`_sa.create_engine.isolation_level` parameter -at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection` -level via the :paramref:`.Connection.execution_options.isolation_level` -parameter. - -For PostgreSQL dialects, this feature works either by making use of the -DBAPI-specific features, such as psycopg2's isolation level flags which will -embed the isolation level setting inline with the ``"BEGIN"`` statement, or for -DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS -TRANSACTION ISOLATION LEVEL <level>`` ahead of the ``"BEGIN"`` statement -emitted by the DBAPI. For the special AUTOCOMMIT isolation level, -DBAPI-specific techniques are used which is typically an ``.autocommit`` -flag on the DBAPI connection object. - -To set isolation level using :func:`_sa.create_engine`:: - - engine = create_engine( - "postgresql+pg8000://scott:tiger@localhost/test", - isolation_level = "REPEATABLE READ" - ) - -To set using per-connection execution options:: - - with engine.connect() as conn: - conn = conn.execution_options( - isolation_level="REPEATABLE READ" - ) - with conn.begin(): - # ... work with transaction - -There are also more options for isolation level configurations, such as -"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply -different isolation level settings. See the discussion at -:ref:`dbapi_autocommit` for background. - -Valid values for ``isolation_level`` on most PostgreSQL dialects include: - -* ``READ COMMITTED`` -* ``READ UNCOMMITTED`` -* ``REPEATABLE READ`` -* ``SERIALIZABLE`` -* ``AUTOCOMMIT`` - -.. 
seealso:: - - :ref:`dbapi_autocommit` - - :ref:`postgresql_readonly_deferrable` - - :ref:`psycopg2_isolation_level` - - :ref:`pg8000_isolation_level` - -.. _postgresql_readonly_deferrable: - -Setting READ ONLY / DEFERRABLE ------------------------------- - -Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE" -characteristics of the transaction, which is in addition to the isolation level -setting. These two attributes can be established either in conjunction with or -independently of the isolation level by passing the ``postgresql_readonly`` and -``postgresql_deferrable`` flags with -:meth:`_engine.Connection.execution_options`. The example below illustrates -passing the ``"SERIALIZABLE"`` isolation level at the same time as setting -"READ ONLY" and "DEFERRABLE":: - - with engine.connect() as conn: - conn = conn.execution_options( - isolation_level="SERIALIZABLE", - postgresql_readonly=True, - postgresql_deferrable=True - ) - with conn.begin(): - # ... work with transaction - -Note that some DBAPIs such as asyncpg only support "readonly" with -SERIALIZABLE isolation. - -.. versionadded:: 1.4 added support for the ``postgresql_readonly`` - and ``postgresql_deferrable`` execution options. - -.. _postgresql_reset_on_return: - -Temporary Table / Resource Reset for Connection Pooling -------------------------------------------------------- - -The :class:`.QueuePool` connection pool implementation used -by the SQLAlchemy :class:`.Engine` object includes -:ref:`reset on return <pool_reset_on_return>` behavior that will invoke -the DBAPI ``.rollback()`` method when connections are returned to the pool. -While this rollback will clear out the immediate state used by the previous -transaction, it does not cover a wider range of session-level state, including -temporary tables as well as other server state such as prepared statement -handles and statement caches. The PostgreSQL database includes a variety -of commands which may be used to reset this state, including -``DISCARD``, ``RESET``, ``DEALLOCATE``, and ``UNLISTEN``. - - -To install -one or more of these commands as the means of performing reset-on-return, -the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated -in the example below. The implementation -will end transactions in progress as well as discard temporary tables -using the ``CLOSE``, ``RESET`` and ``DISCARD`` commands; see the PostgreSQL -documentation for background on what each of these statements does. - -The :paramref:`_sa.create_engine.pool_reset_on_return` parameter -is set to ``None`` so that the custom scheme can replace the default behavior -completely. The custom hook implementation calls ``.rollback()`` in any case, -as it's usually important that the DBAPI's own tracking of commit/rollback -remains consistent with the state of the transaction:: - - - from sqlalchemy import create_engine - from sqlalchemy import event - - postgresql_engine = create_engine( - "postgresql+psycopg://scott:tiger@hostname/dbname", - - # disable default reset-on-return scheme - pool_reset_on_return=None, - ) - - - @event.listens_for(postgresql_engine, "reset") - def _reset_postgresql(dbapi_connection, connection_record, reset_state): - if not reset_state.terminate_only: - dbapi_connection.execute("CLOSE ALL") - dbapi_connection.execute("RESET ALL") - dbapi_connection.execute("DISCARD TEMP") - - # so that the DBAPI itself knows that the connection has been - # reset - dbapi_connection.rollback() - -.. 
versionchanged:: 2.0.0b3 Added additional state arguments to - the :meth:`.PoolEvents.reset` event and additionally ensured the event - is invoked for all "reset" occurrences, so that it's appropriate - as a place for custom "reset" handlers. Previous schemes which - use the :meth:`.PoolEvents.checkin` handler remain usable as well. - -.. seealso:: - - :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation - -.. _postgresql_alternate_search_path: - -Setting Alternate Search Paths on Connect ------------------------------------------- - -The PostgreSQL ``search_path`` variable refers to the list of schema names -that will be implicitly referenced when a particular table or other -object is referenced in a SQL statement. As detailed in the next section -:ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around -the concept of keeping this variable at its default value of ``public``; -however, in order to have it automatically set to an arbitrary name or names -when connections are used, the "SET SESSION search_path" command may be invoked -for all connections in a pool using the following event handler, as discussed -at :ref:`schema_set_default_connections`:: - - from sqlalchemy import event - from sqlalchemy import create_engine - - engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname") - - @event.listens_for(engine, "connect", insert=True) - def set_search_path(dbapi_connection, connection_record): - existing_autocommit = dbapi_connection.autocommit - dbapi_connection.autocommit = True - cursor = dbapi_connection.cursor() - cursor.execute("SET SESSION search_path='%s'" % schema_name) - cursor.close() - dbapi_connection.autocommit = existing_autocommit - -The reason the recipe is complicated by use of the ``.autocommit`` DBAPI -attribute is so that when the ``SET SESSION search_path`` directive is invoked, -it is invoked outside of the scope of any transaction and therefore will not -be reverted when the DBAPI connection has a rollback. - -.. seealso:: - - :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel` documentation - - - - -.. _postgresql_schema_reflection: - -Remote-Schema Table Introspection and PostgreSQL search_path ------------------------------------------------------------- - -.. admonition:: Section Best Practices Summarized - - Keep the ``search_path`` variable set to its default of ``public``, without - any other schema names. Ensure the username used to connect **does not** - match remote schemas, or ensure the ``"$user"`` token is **removed** from - ``search_path``. For other schema names, name these explicitly - within :class:`_schema.Table` definitions. Alternatively, the - ``postgresql_ignore_search_path`` option will cause all reflected - :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema` - attribute set up. - -The PostgreSQL dialect can reflect tables from any schema, as outlined in -:ref:`metadata_reflection_schemas`. - -In all cases, the first thing SQLAlchemy does when reflecting tables is -to **determine the default schema for the current database connection**. -It does this using the PostgreSQL ``current_schema()`` -function, illustrated below using a PostgreSQL client session (i.e. using -the ``psql`` tool):: - - test=> select current_schema(); - current_schema - ---------------- - public - (1 row) - -Above we see that on a plain install of PostgreSQL, the default schema name -is the name ``public``. 
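The same check can be made from SQLAlchemy itself; a minimal sketch, assuming
the same database as above::

    from sqlalchemy import create_engine, func, select

    engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
    with engine.connect() as conn:
        # emits SELECT current_schema(); prints "public" on a plain install
        print(conn.execute(select(func.current_schema())).scalar())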
- -However, if your database username **matches the name of a schema**, PostgreSQL's -default is to then **use that name as the default schema**. Below, we log in -using the username ``scott``. When we create a schema named ``scott``, **it -implicitly changes the default schema**:: - - test=> select current_schema(); - current_schema - ---------------- - public - (1 row) - - test=> create schema scott; - CREATE SCHEMA - test=> select current_schema(); - current_schema - ---------------- - scott - (1 row) - -The behavior of ``current_schema()`` is derived from the -`PostgreSQL search path -<https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_ -variable ``search_path``, which in modern PostgreSQL versions defaults to this:: - - test=> show search_path; - search_path - ----------------- - "$user", public - (1 row) - -Where above, the ``"$user"`` variable will inject the current username as the -default schema, if one exists. Otherwise, ``public`` is used. - -When a :class:`_schema.Table` object is reflected, if it is present in the -schema indicated by the ``current_schema()`` function, **the schema name assigned -to the ".schema" attribute of the Table is the Python "None" value**. Otherwise, the -".schema" attribute will be assigned the string name of that schema. - -With regards to tables which these :class:`_schema.Table` -objects refer to via foreign key constraint, a decision must be made as to how -the ``.schema`` is represented in those remote tables, in the case where that -remote schema name is also a member of the current ``search_path``. - -By default, the PostgreSQL dialect mimics the behavior encouraged by -PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function -returns a sample definition for a particular foreign key constraint, -omitting the referenced schema name from that definition when the name is -also in the PostgreSQL schema search path. The interaction below -illustrates this behavior:: - - test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY); - CREATE TABLE - test=> CREATE TABLE referring( - test(> id INTEGER PRIMARY KEY, - test(> referred_id INTEGER REFERENCES test_schema.referred(id)); - CREATE TABLE - test=> SET search_path TO public, test_schema; - test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM - test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n - test-> ON n.oid = c.relnamespace - test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid - test-> WHERE c.relname='referring' AND r.contype = 'f' - test-> ; - pg_get_constraintdef - --------------------------------------------------- - FOREIGN KEY (referred_id) REFERENCES referred(id) - (1 row) - -Above, we created a table ``referred`` as a member of the remote schema -``test_schema``, however when we added ``test_schema`` to the -PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the -``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of -the function. 
- -On the other hand, if we set the search path back to the typical default -of ``public``:: - - test=> SET search_path TO public; - SET - -The same query against ``pg_get_constraintdef()`` now returns the fully -schema-qualified name for us:: - - test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM - test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n - test-> ON n.oid = c.relnamespace - test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid - test-> WHERE c.relname='referring' AND r.contype = 'f'; - pg_get_constraintdef - --------------------------------------------------------------- - FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id) - (1 row) - -SQLAlchemy will by default use the return value of ``pg_get_constraintdef()`` -in order to determine the remote schema name. That is, if our ``search_path`` -were set to include ``test_schema``, and we invoked a table -reflection process as follows:: - - >>> from sqlalchemy import Table, MetaData, create_engine, text - >>> engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") - >>> with engine.connect() as conn: - ... conn.execute(text("SET search_path TO test_schema, public")) - ... metadata_obj = MetaData() - ... referring = Table('referring', metadata_obj, - ... autoload_with=conn) - ... - <sqlalchemy.engine.result.CursorResult object at 0x101612ed0> - -The above process would deliver to the :attr:`_schema.MetaData.tables` -collection -``referred`` table named **without** the schema:: - - >>> metadata_obj.tables['referred'].schema is None - True - -To alter the behavior of reflection such that the referred schema is -maintained regardless of the ``search_path`` setting, use the -``postgresql_ignore_search_path`` option, which can be specified as a -dialect-specific argument to both :class:`_schema.Table` as well as -:meth:`_schema.MetaData.reflect`:: - - >>> with engine.connect() as conn: - ... conn.execute(text("SET search_path TO test_schema, public")) - ... metadata_obj = MetaData() - ... referring = Table('referring', metadata_obj, - ... autoload_with=conn, - ... postgresql_ignore_search_path=True) - ... - <sqlalchemy.engine.result.CursorResult object at 0x1016126d0> - -We will now have ``test_schema.referred`` stored as schema-qualified:: - - >>> metadata_obj.tables['test_schema.referred'].schema - 'test_schema' - -.. sidebar:: Best Practices for PostgreSQL Schema reflection - - The description of PostgreSQL schema reflection behavior is complex, and - is the product of many years of dealing with widely varied use cases and - user preferences. But in fact, there's no need to understand any of it if - you just stick to the simplest use pattern: leave the ``search_path`` set - to its default of ``public`` only, never refer to the name ``public`` as - an explicit schema name otherwise, and refer to all other schema names - explicitly when building up a :class:`_schema.Table` object. The options - described here are only for those users who can't, or prefer not to, stay - within these guidelines. - -.. seealso:: - - :ref:`reflection_schema_qualified_interaction` - discussion of the issue - from a backend-agnostic perspective - - `The Schema Search Path - <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_ - - on the PostgreSQL website. - -INSERT/UPDATE...RETURNING -------------------------- - -The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and -``DELETE..RETURNING`` syntaxes. 
``INSERT..RETURNING`` is used by default -for single-row INSERT statements in order to fetch newly generated -primary key identifiers. To specify an explicit ``RETURNING`` clause, -use the :meth:`.UpdateBase.returning` method on a per-statement basis -(the examples below assume a :class:`_engine.Connection` named -``connection``):: - - # INSERT..RETURNING - result = connection.execute( - table.insert().returning(table.c.col1, table.c.col2).values(name='foo') - ) - print(result.fetchall()) - - # UPDATE..RETURNING - result = connection.execute( - table.update().returning(table.c.col1, table.c.col2) - .where(table.c.name == 'foo').values(name='bar') - ) - print(result.fetchall()) - - # DELETE..RETURNING - result = connection.execute( - table.delete().returning(table.c.col1, table.c.col2) - .where(table.c.name == 'foo') - ) - print(result.fetchall()) - -.. _postgresql_insert_on_conflict: - -INSERT...ON CONFLICT (Upsert) ------------------------------- - -Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of -rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A -candidate row will only be inserted if that row does not violate any unique -constraints. In the case of a unique constraint violation, a secondary action -can occur which can be either "DO UPDATE", indicating that the data in the -target row should be updated, or "DO NOTHING", which indicates to silently skip -this row. - -Conflicts are determined using existing unique constraints and indexes. These -constraints may be identified either using their name as stated in DDL, -or they may be inferred by stating the columns and conditions that comprise -the indexes. - -SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific -:func:`_postgresql.insert()` function, which provides -the generative methods :meth:`_postgresql.Insert.on_conflict_do_update` -and :meth:`~.postgresql.Insert.on_conflict_do_nothing`: - -.. sourcecode:: pycon+sql - - >>> from sqlalchemy.dialects.postgresql import insert - >>> insert_stmt = insert(my_table).values( - ... id='some_existing_id', - ... data='inserted value') - >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing( - ... index_elements=['id'] - ... ) - >>> print(do_nothing_stmt) - {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) - ON CONFLICT (id) DO NOTHING - {stop} - - >>> do_update_stmt = insert_stmt.on_conflict_do_update( - ... constraint='pk_my_table', - ... set_=dict(data='updated value') - ... ) - >>> print(do_update_stmt) - {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) - ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s - -.. seealso:: - - `INSERT .. ON CONFLICT - <https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_ - - in the PostgreSQL documentation. - -Specifying the Target -^^^^^^^^^^^^^^^^^^^^^ - -Both methods supply the "target" of the conflict using either a -named constraint or column inference: - -* The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument - specifies a sequence containing string column names, :class:`_schema.Column` - objects, and/or SQL expression elements, which would identify a unique - index: - - .. sourcecode:: pycon+sql - - >>> do_update_stmt = insert_stmt.on_conflict_do_update( - ... index_elements=['id'], - ... set_=dict(data='updated value') - ... ) - >>> print(do_update_stmt) - {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) - ON CONFLICT (id) DO UPDATE SET data = %(param_1)s - {stop} - - >>> do_update_stmt = insert_stmt.on_conflict_do_update( - ... index_elements=[my_table.c.id], - ... 
set_=dict(data='updated value') - ... ) - >>> print(do_update_stmt) - {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) - ON CONFLICT (id) DO UPDATE SET data = %(param_1)s - -* When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to - infer an index, a partial index can be inferred by also specifying the - :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter: - - .. sourcecode:: pycon+sql - - >>> stmt = insert(my_table).values(user_email='a@b.com', data='inserted data') - >>> stmt = stmt.on_conflict_do_update( - ... index_elements=[my_table.c.user_email], - ... index_where=my_table.c.user_email.like('%@gmail.com'), - ... set_=dict(data=stmt.excluded.data) - ... ) - >>> print(stmt) - {printsql}INSERT INTO my_table (data, user_email) - VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email) - WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data - -* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is - used to specify an index directly rather than inferring it. This can be - the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX: - - .. sourcecode:: pycon+sql - - >>> do_update_stmt = insert_stmt.on_conflict_do_update( - ... constraint='my_table_idx_1', - ... set_=dict(data='updated value') - ... ) - >>> print(do_update_stmt) - {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) - ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s - {stop} - - >>> do_update_stmt = insert_stmt.on_conflict_do_update( - ... constraint='my_table_pk', - ... set_=dict(data='updated value') - ... ) - >>> print(do_update_stmt) - {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) - ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s - {stop} - -* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may - also refer to a SQLAlchemy construct representing a constraint, - e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`, - :class:`.Index`, or :class:`.ExcludeConstraint`. In this use, - if the constraint has a name, it is used directly. Otherwise, if the - constraint is unnamed, then inference will be used, where the expressions - and optional WHERE clause of the constraint will be spelled out in the - construct. This use is especially convenient - to refer to the named or unnamed primary key of a :class:`_schema.Table` - using the - :attr:`_schema.Table.primary_key` attribute: - - .. sourcecode:: pycon+sql - - >>> do_update_stmt = insert_stmt.on_conflict_do_update( - ... constraint=my_table.primary_key, - ... set_=dict(data='updated value') - ... ) - >>> print(do_update_stmt) - {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) - ON CONFLICT (id) DO UPDATE SET data = %(param_1)s - -The SET Clause -^^^^^^^^^^^^^^^ - -``ON CONFLICT...DO UPDATE`` is used to perform an update of the already -existing row, using any combination of new values as well as values -from the proposed insertion. These values are specified using the -:paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This -parameter accepts a dictionary which consists of direct values -for UPDATE: - -.. sourcecode:: pycon+sql - - >>> stmt = insert(my_table).values(id='some_id', data='inserted value') - >>> do_update_stmt = stmt.on_conflict_do_update( - ... index_elements=['id'], - ... set_=dict(data='updated value') - ... 
) - >>> print(do_update_stmt) - {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) - ON CONFLICT (id) DO UPDATE SET data = %(param_1)s - -.. warning:: - - The :meth:`_expression.Insert.on_conflict_do_update` - method does **not** take into - account Python-side default UPDATE values or generation functions, e.g. - those specified using :paramref:`_schema.Column.onupdate`. - These values will not be exercised for an ON CONFLICT style of UPDATE, - unless they are manually specified in the - :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary. - -Updating using the Excluded INSERT Values -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -In order to refer to the proposed insertion row, the special alias -:attr:`~.postgresql.Insert.excluded` is available as an attribute on -the :class:`_postgresql.Insert` object; this object is a -:class:`_expression.ColumnCollection` alias which contains all columns of the -target table: - -.. sourcecode:: pycon+sql - - >>> stmt = insert(my_table).values( - ... id='some_id', - ... data='inserted value', - ... author='jlh' - ... ) - >>> do_update_stmt = stmt.on_conflict_do_update( - ... index_elements=['id'], - ... set_=dict(data='updated value', author=stmt.excluded.author) - ... ) - >>> print(do_update_stmt) - {printsql}INSERT INTO my_table (id, data, author) - VALUES (%(id)s, %(data)s, %(author)s) - ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author - -Additional WHERE Criteria -^^^^^^^^^^^^^^^^^^^^^^^^^ - -The :meth:`_expression.Insert.on_conflict_do_update` method also accepts -a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where` -parameter, which will limit those rows which receive an UPDATE: - -.. sourcecode:: pycon+sql - - >>> stmt = insert(my_table).values( - ... id='some_id', - ... data='inserted value', - ... author='jlh' - ... ) - >>> on_update_stmt = stmt.on_conflict_do_update( - ... index_elements=['id'], - ... set_=dict(data='updated value', author=stmt.excluded.author), - ... where=(my_table.c.status == 2) - ... ) - >>> print(on_update_stmt) - {printsql}INSERT INTO my_table (id, data, author) - VALUES (%(id)s, %(data)s, %(author)s) - ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author - WHERE my_table.status = %(status_1)s - -Skipping Rows with DO NOTHING -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -``ON CONFLICT`` may be used to skip inserting a row entirely -if any conflict with a unique or exclusion constraint occurs; below -this is illustrated using the -:meth:`~.postgresql.Insert.on_conflict_do_nothing` method: - -.. sourcecode:: pycon+sql - - >>> stmt = insert(my_table).values(id='some_id', data='inserted value') - >>> stmt = stmt.on_conflict_do_nothing(index_elements=['id']) - >>> print(stmt) - {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) - ON CONFLICT (id) DO NOTHING - -If ``DO NOTHING`` is used without specifying any columns or constraint, -it has the effect of skipping the INSERT for any unique or exclusion -constraint violation which occurs: - -.. sourcecode:: pycon+sql - - >>> stmt = insert(my_table).values(id='some_id', data='inserted value') - >>> stmt = stmt.on_conflict_do_nothing() - >>> print(stmt) - {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) - ON CONFLICT DO NOTHING -
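The constructs above are regular :class:`_postgresql.Insert` statements and
are executed in the usual way; a minimal end-to-end sketch, assuming an
``engine`` and the ``my_table`` table used throughout this section, with
``id`` as its unique column::

    from sqlalchemy.dialects.postgresql import insert

    stmt = insert(my_table).values(id="some_id", data="inserted value")
    stmt = stmt.on_conflict_do_update(
        index_elements=["id"],
        # on conflict, take the value from the proposed insertion row
        set_=dict(data=stmt.excluded.data),
    )

    with engine.begin() as conn:
        conn.execute(stmt)

.. 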
_postgresql_match: - -Full Text Search ----------------- - -PostgreSQL's full text search system is available through the use of the -:data:`.func` namespace, combined with the use of custom operators -via the :meth:`.Operators.bool_op` method. For simple cases with some -degree of cross-backend compatibility, the :meth:`.Operators.match` operator -may also be used. - -.. _postgresql_simple_match: - -Simple plain text matching with ``match()`` -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -The :meth:`.Operators.match` operator provides for cross-compatible simple -text matching. For the PostgreSQL backend, it's hardcoded to generate -an expression using the ``@@`` operator in conjunction with the -``plainto_tsquery()`` PostgreSQL function. - -On the PostgreSQL dialect, an expression like the following:: - - select(sometable.c.text.match("search string")) - -would emit to the database:: - - SELECT text @@ plainto_tsquery('search string') FROM table - -Above, passing a plain string to :meth:`.Operators.match` will automatically -make use of ``plainto_tsquery()`` to specify the type of tsquery. This -establishes basic database cross-compatibility for :meth:`.Operators.match` -with other backends. - -.. versionchanged:: 2.0 The default tsquery generation function used by the - PostgreSQL dialect with :meth:`.Operators.match` is ``plainto_tsquery()``. - - To render exactly what was rendered in 1.4, use the following form:: - - from sqlalchemy import func - - select( - sometable.c.text.bool_op("@@")(func.to_tsquery("search string")) - ) - - Which would emit:: - - SELECT text @@ to_tsquery('search string') FROM table - -Using PostgreSQL full text functions and operators directly -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -Text search operations beyond the simple use of :meth:`.Operators.match` -may make use of the :data:`.func` namespace to generate PostgreSQL full-text -functions, in combination with :meth:`.Operators.bool_op` to generate -any boolean operator. - -For example, the query:: - - select( - func.to_tsquery('cat').bool_op("@>")(func.to_tsquery('cat & rat')) - ) - -would generate: - -.. sourcecode:: sql - - SELECT to_tsquery('cat') @> to_tsquery('cat & rat') - - -The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST:: - - from sqlalchemy.dialects.postgresql import TSVECTOR - from sqlalchemy import select, cast - select(cast("some text", TSVECTOR)) - -produces a statement equivalent to:: - - SELECT CAST('some text' AS TSVECTOR) AS anon_1 - -The ``func`` namespace is augmented by the PostgreSQL dialect to set up -correct argument and return types for most full text search functions. -These functions are used automatically by the :attr:`_sql.func` namespace -assuming the ``sqlalchemy.dialects.postgresql`` package has been imported, -or :func:`_sa.create_engine` has been invoked using a ``postgresql`` -dialect. These functions are documented at: - -* :class:`_postgresql.to_tsvector` -* :class:`_postgresql.to_tsquery` -* :class:`_postgresql.plainto_tsquery` -* :class:`_postgresql.phraseto_tsquery` -* :class:`_postgresql.websearch_to_tsquery` -* :class:`_postgresql.ts_headline` - -Specifying the "regconfig" with ``match()`` or custom operators -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -PostgreSQL's ``plainto_tsquery()`` function accepts an optional -"regconfig" argument that is used to instruct PostgreSQL to use a -particular pre-computed GIN or GiST index in order to perform the search. 
-When using :meth:`.Operators.match`, this additional parameter may be -specified using the ``postgresql_regconfig`` parameter, such as:: - - select(mytable.c.id).where( - mytable.c.title.match('somestring', postgresql_regconfig='english') - ) - -Which would emit:: - - SELECT mytable.id FROM mytable - WHERE mytable.title @@ plainto_tsquery('english', 'somestring') - -When using other PostgreSQL search functions with :data:`.func`, the -"regconfig" parameter may be passed directly as the initial argument:: - - select(mytable.c.id).where( - func.to_tsvector("english", mytable.c.title).bool_op("@@")( - func.to_tsquery("english", "somestring") - ) - ) - -produces a statement equivalent to:: - - SELECT mytable.id FROM mytable - WHERE to_tsvector('english', mytable.title) @@ - to_tsquery('english', 'somestring') - -It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from -PostgreSQL to ensure that you are generating queries with SQLAlchemy that -take full advantage of any indexes you may have created for full text search. - -.. seealso:: - - `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_ - in the PostgreSQL documentation - - -FROM ONLY ... -------------- - -The dialect supports PostgreSQL's ONLY keyword for targeting only a particular -table in an inheritance hierarchy. This can be used to produce the -``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...`` -syntaxes. It uses SQLAlchemy's hints mechanism:: - - # SELECT ... FROM ONLY ... - result = table.select().with_hint(table, 'ONLY', 'postgresql') - print(result.fetchall()) - - # UPDATE ONLY ... - table.update(values=dict(foo='bar')).with_hint('ONLY', - dialect_name='postgresql') - - # DELETE FROM ONLY ... - table.delete().with_hint('ONLY', dialect_name='postgresql') - - -.. _postgresql_indexes: - -PostgreSQL-Specific Index Options ---------------------------------- - -Several extensions to the :class:`.Index` construct are available, specific -to the PostgreSQL dialect. - -Covering Indexes -^^^^^^^^^^^^^^^^ - -The ``postgresql_include`` option renders INCLUDE(colname) for the given -string names:: - - Index("my_index", table.c.x, postgresql_include=['y']) - -would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)`` - -Note that this feature requires PostgreSQL 11 or later. - -.. versionadded:: 1.4 - -.. _postgresql_partial_indexes: - -Partial Indexes -^^^^^^^^^^^^^^^ - -Partial indexes add criterion to the index definition so that the index is -applied to a subset of rows. These can be specified on :class:`.Index` -using the ``postgresql_where`` keyword argument:: - - Index('my_index', my_table.c.id, postgresql_where=my_table.c.value > 10) - -.. _postgresql_operator_classes: - -Operator Classes -^^^^^^^^^^^^^^^^ - -PostgreSQL allows the specification of an *operator class* for each column of -an index (see -https://www.postgresql.org/docs/current/interactive/indexes-opclass.html). -The :class:`.Index` construct allows these to be specified via the -``postgresql_ops`` keyword argument:: - - Index( - 'my_index', my_table.c.id, my_table.c.data, - postgresql_ops={ - 'data': 'text_pattern_ops', - 'id': 'int4_ops' - }) - -Note that the keys in the ``postgresql_ops`` dictionaries are the -"key" name of the :class:`_schema.Column`, i.e. the name used to access it from -the ``.c`` collection of :class:`_schema.Table`, which can be configured to be -different than the actual name of the column as expressed in the database. 
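For example, given a hypothetical table whose column has a database name of
``ID`` but a key of ``id`` in the ``.c`` collection, the ``postgresql_ops``
dictionary refers to the key; a minimal sketch::

    from sqlalchemy import Column, Index, Integer, MetaData, String, Table

    metadata_obj = MetaData()
    my_table = Table(
        "my_table",
        metadata_obj,
        # database name is "ID"; accessed as my_table.c.id
        Column("ID", Integer, key="id"),
        Column("data", String),
    )

    # the dictionary key is the Column key "id", not the name "ID"
    Index("my_index", my_table.c.id, postgresql_ops={"id": "int4_ops"})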
- -If ``postgresql_ops`` is to be used against a complex SQL expression such -as a function call, then in order for it to apply to the column, it must be -given a label that is identified in the dictionary by name, e.g.:: - - Index( - 'my_index', my_table.c.id, - func.lower(my_table.c.data).label('data_lower'), - postgresql_ops={ - 'data_lower': 'text_pattern_ops', - 'id': 'int4_ops' - }) - -Operator classes are also supported by the -:class:`_postgresql.ExcludeConstraint` construct using the -:paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for -details. - -.. versionadded:: 1.3.21 added support for operator classes with - :class:`_postgresql.ExcludeConstraint`. - - -Index Types -^^^^^^^^^^^ - -PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well -as the ability for users to create their own (see -https://www.postgresql.org/docs/current/static/indexes-types.html). These can be -specified on :class:`.Index` using the ``postgresql_using`` keyword argument:: - - Index('my_index', my_table.c.data, postgresql_using='gin') - -The value passed to the keyword argument will simply be passed through to the -underlying CREATE INDEX command, so it *must* be a valid index type for your -version of PostgreSQL. - -.. _postgresql_index_storage: - -Index Storage Parameters -^^^^^^^^^^^^^^^^^^^^^^^^ - -PostgreSQL allows storage parameters to be set on indexes. The storage -parameters available depend on the index method used by the index. Storage -parameters can be specified on :class:`.Index` using the ``postgresql_with`` -keyword argument:: - - Index('my_index', my_table.c.data, postgresql_with={"fillfactor": 50}) - -PostgreSQL also allows defining the tablespace in which to create the index. -The tablespace can be specified on :class:`.Index` using the -``postgresql_tablespace`` keyword argument:: - - Index('my_index', my_table.c.data, postgresql_tablespace='my_tablespace') - -Note that the same option is available on :class:`_schema.Table` as well. - -.. _postgresql_index_concurrently: - -Indexes with CONCURRENTLY -^^^^^^^^^^^^^^^^^^^^^^^^^ - -The PostgreSQL index option CONCURRENTLY is supported by passing the -flag ``postgresql_concurrently`` to the :class:`.Index` construct:: - - tbl = Table('testtbl', m, Column('data', Integer)) - - idx1 = Index('test_idx1', tbl.c.data, postgresql_concurrently=True) - -The above index construct will render DDL for CREATE INDEX, assuming -PostgreSQL 8.2 or higher is detected, or for a connection-less dialect, as:: - - CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data) - -For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected, or for -a connection-less dialect, it will emit:: - - DROP INDEX CONCURRENTLY test_idx1 - -When using CONCURRENTLY, the PostgreSQL database requires that the statement -be invoked outside of a transaction block. The Python DBAPI enforces that -even for a single statement, a transaction is present, so to use this -construct, the DBAPI's "autocommit" mode must be used:: - - metadata = MetaData() - table = Table( - "foo", metadata, - Column("id", String)) - index = Index( - "foo_idx", table.c.id, postgresql_concurrently=True) - - with engine.connect() as conn: - with conn.execution_options(isolation_level='AUTOCOMMIT'): - table.create(conn) - -.. seealso:: - - :ref:`postgresql_isolation_level` - -.. _postgresql_index_reflection: - -PostgreSQL Index Reflection ---------------------------- - -The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the -UNIQUE CONSTRAINT construct is used. 
When inspecting a table using -:class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes` -and the :meth:`_reflection.Inspector.get_unique_constraints` -will report on these -two constructs distinctly; in the case of the index, the key -``duplicates_constraint`` will be present in the index entry if it is -detected as mirroring a constraint. When performing reflection using -``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned -in :attr:`_schema.Table.indexes` when it is detected as mirroring a -:class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints` collection -. - -Special Reflection Options --------------------------- - -The :class:`_reflection.Inspector` -used for the PostgreSQL backend is an instance -of :class:`.PGInspector`, which offers additional methods:: - - from sqlalchemy import create_engine, inspect - - engine = create_engine("postgresql+psycopg2://localhost/test") - insp = inspect(engine) # will be a PGInspector - - print(insp.get_enums()) - -.. autoclass:: PGInspector - :members: - -.. _postgresql_table_options: - -PostgreSQL Table Options ------------------------- - -Several options for CREATE TABLE are supported directly by the PostgreSQL -dialect in conjunction with the :class:`_schema.Table` construct: - -* ``INHERITS``:: - - Table("some_table", metadata, ..., postgresql_inherits="some_supertable") - - Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...)) - -* ``ON COMMIT``:: - - Table("some_table", metadata, ..., postgresql_on_commit='PRESERVE ROWS') - -* ``PARTITION BY``:: - - Table("some_table", metadata, ..., - postgresql_partition_by='LIST (part_column)') - - .. versionadded:: 1.2.6 - -* ``TABLESPACE``:: - - Table("some_table", metadata, ..., postgresql_tablespace='some_tablespace') - - The above option is also available on the :class:`.Index` construct. - -* ``USING``:: - - Table("some_table", metadata, ..., postgresql_using='heap') - - .. versionadded:: 2.0.26 - -* ``WITH OIDS``:: - - Table("some_table", metadata, ..., postgresql_with_oids=True) - -* ``WITHOUT OIDS``:: - - Table("some_table", metadata, ..., postgresql_with_oids=False) - -.. seealso:: - - `PostgreSQL CREATE TABLE options - <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ - - in the PostgreSQL documentation. - -.. _postgresql_constraint_options: - -PostgreSQL Constraint Options ------------------------------ - -The following option(s) are supported by the PostgreSQL dialect in conjunction -with selected constraint constructs: - -* ``NOT VALID``: This option applies towards CHECK and FOREIGN KEY constraints - when the constraint is being added to an existing table via ALTER TABLE, - and has the effect that existing rows are not scanned during the ALTER - operation against the constraint being added. 
- - When using a SQL migration tool such as `Alembic <https://alembic.sqlalchemy.org>`_ - that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument - may be specified as an additional keyword argument within the operation - that creates the constraint, as in the following Alembic example:: - - def upgrade(): - op.create_foreign_key( - "fk_user_address", - "address", - "user", - ["user_id"], - ["id"], - postgresql_not_valid=True - ) - - The keyword is ultimately accepted directly by the - :class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint` - and :class:`_schema.ForeignKey` constructs; when using a tool like - Alembic, dialect-specific keyword arguments are passed through to - these constructs from the migration operation directives:: - - CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True) - - ForeignKeyConstraint(["some_id"], ["some_table.some_id"], postgresql_not_valid=True) - - .. versionadded:: 1.4.32 - - .. seealso:: - - `PostgreSQL ALTER TABLE options - <https://www.postgresql.org/docs/current/static/sql-altertable.html>`_ - - in the PostgreSQL documentation. - -.. _postgresql_table_valued_overview: - -Table values, Table and Column valued functions, Row and Tuple objects ------------------------------------------------------------------------ - -PostgreSQL makes great use of modern SQL forms such as table-valued functions, -tables and rows as values. These constructs are commonly used as part -of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other -datatypes. SQLAlchemy's SQL expression language has native support for -most table-valued and row-valued forms. - -.. _postgresql_table_valued: - -Table-Valued Functions -^^^^^^^^^^^^^^^^^^^^^^^ - -Many PostgreSQL built-in functions are intended to be used in the FROM clause -of a SELECT statement, and are capable of returning table rows or sets of table -rows. A large portion of PostgreSQL's JSON functions, such as -``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``, -``json_each()``, ``json_to_record()``, and ``json_populate_recordset()``, use such -forms. In SQLAlchemy, these classes of SQL function calling forms are available -using the :meth:`_functions.FunctionElement.table_valued` method in conjunction -with :class:`_functions.Function` objects generated from the :data:`_sql.func` -namespace. - -Examples from PostgreSQL's reference documentation follow below: - -* ``json_each()``: - - .. sourcecode:: pycon+sql - - >>> from sqlalchemy import select, func - >>> stmt = select(func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value")) - >>> print(stmt) - {printsql}SELECT anon_1.key, anon_1.value - FROM json_each(:json_each_1) AS anon_1 - -* ``json_populate_record()``: - - .. sourcecode:: pycon+sql - - >>> from sqlalchemy import select, func, literal_column - >>> stmt = select( - ... func.json_populate_record( - ... literal_column("null::myrowtype"), - ... '{"a":1,"b":2}' - ... ).table_valued("a", "b", name="x") - ... ) - >>> print(stmt) - {printsql}SELECT x.a, x.b - FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x - -* ``json_to_record()`` - this form uses a PostgreSQL-specific form of derived - columns in the alias, where we may make use of :func:`_sql.column` elements with - types to produce them. 
The :meth:`_functions.FunctionElement.table_valued` - method produces a :class:`_sql.TableValuedAlias` construct, and the - :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived - columns specification: - - .. sourcecode:: pycon+sql - - >>> from sqlalchemy import select, func, column, Integer, Text - >>> stmt = select( - ... func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}').table_valued( - ... column("a", Integer), column("b", Text), column("d", Text), - ... ).render_derived(name="x", with_types=True) - ... ) - >>> print(stmt) - {printsql}SELECT x.a, x.b, x.d - FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT) - -* ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an - ordinal counter to the output of a function and is accepted by a limited set - of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The - :meth:`_functions.FunctionElement.table_valued` method accepts a keyword - parameter ``with_ordinality`` for this purpose, which accepts the string name - that will be applied to the "ordinality" column: - - .. sourcecode:: pycon+sql - - >>> from sqlalchemy import select, func - >>> stmt = select( - ... func.generate_series(4, 1, -1). - ... table_valued("value", with_ordinality="ordinality"). - ... render_derived() - ... ) - >>> print(stmt) - {printsql}SELECT anon_1.value, anon_1.ordinality - FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3) - WITH ORDINALITY AS anon_1(value, ordinality) - -.. versionadded:: 1.4.0b2 - -.. seealso:: - - :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial` - -.. _postgresql_column_valued: - -Column Valued Functions -^^^^^^^^^^^^^^^^^^^^^^^ - -Similar to the table valued function, a column valued function is present -in the FROM clause, but delivers itself to the columns clause as a single -scalar value. PostgreSQL functions such as ``json_array_elements()``, -``unnest()`` and ``generate_series()`` may use this form. Column valued functions are available using the -:meth:`_functions.FunctionElement.column_valued` method of :class:`_functions.FunctionElement`: - -* ``json_array_elements()``: - - .. sourcecode:: pycon+sql - - >>> from sqlalchemy import select, func - >>> stmt = select(func.json_array_elements('["one", "two"]').column_valued("x")) - >>> print(stmt) - {printsql}SELECT x - FROM json_array_elements(:json_array_elements_1) AS x - -* ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the - :func:`_postgresql.array` construct may be used: - - .. sourcecode:: pycon+sql - - >>> from sqlalchemy.dialects.postgresql import array - >>> from sqlalchemy import select, func - >>> stmt = select(func.unnest(array([1, 2])).column_valued()) - >>> print(stmt) - {printsql}SELECT anon_1 - FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1 - - The function can of course be used against an existing table-bound column - that's of type :class:`_types.ARRAY`: - - .. sourcecode:: pycon+sql - - >>> from sqlalchemy import table, column, ARRAY, Integer - >>> from sqlalchemy import select, func - >>> t = table("t", column('value', ARRAY(Integer))) - >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value")) - >>> print(stmt) - {printsql}SELECT unnested_value - FROM unnest(t.value) AS unnested_value -
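``generate_series()`` may be used in the column-valued form in the same way;
a brief sketch:

.. sourcecode:: pycon+sql

    >>> from sqlalchemy import select, func
    >>> stmt = select(func.generate_series(1, 5).column_valued("s"))
    >>> print(stmt)
    {printsql}SELECT s
    FROM generate_series(:generate_series_1, :generate_series_2) AS s

.. 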
seealso:: - - :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial` - - -Row Types -^^^^^^^^^ - -Built-in support for rendering a ``ROW`` may be approximated using -``func.ROW`` with the :attr:`_sa.func` namespace, or by using the -:func:`_sql.tuple_` construct: - -.. sourcecode:: pycon+sql - - >>> from sqlalchemy import table, column, func, tuple_ - >>> t = table("t", column("id"), column("fk")) - >>> stmt = t.select().where( - ... tuple_(t.c.id, t.c.fk) > (1,2) - ... ).where( - ... func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7) - ... ) - >>> print(stmt) - {printsql}SELECT t.id, t.fk - FROM t - WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2) - -.. seealso:: - - `PostgreSQL Row Constructors - <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_ - - `PostgreSQL Row Constructor Comparison - <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_ - -Table Types passed to Functions -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -PostgreSQL supports passing a table as an argument to a function, which is -known as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects -such as :class:`_schema.Table` support this special form using the -:meth:`_sql.FromClause.table_valued` method, which is comparable to the -:meth:`_functions.FunctionElement.table_valued` method except that the collection -of columns is already established by that of the :class:`_sql.FromClause` -itself: - -.. sourcecode:: pycon+sql - - >>> from sqlalchemy import table, column, func, select - >>> a = table( "a", column("id"), column("x"), column("y")) - >>> stmt = select(func.row_to_json(a.table_valued())) - >>> print(stmt) - {printsql}SELECT row_to_json(a) AS row_to_json_1 - FROM a - -.. versionadded:: 1.4.0b2 - - - -""" # noqa: E501 - -from __future__ import annotations - -from collections import defaultdict -from functools import lru_cache -import re -from typing import Any -from typing import cast -from typing import List -from typing import Optional -from typing import Tuple -from typing import TYPE_CHECKING -from typing import Union - -from . import arraylib as _array -from . import json as _json -from . import pg_catalog -from . 
import ranges as _ranges -from .ext import _regconfig_fn -from .ext import aggregate_order_by -from .hstore import HSTORE -from .named_types import CreateDomainType as CreateDomainType # noqa: F401 -from .named_types import CreateEnumType as CreateEnumType # noqa: F401 -from .named_types import DOMAIN as DOMAIN # noqa: F401 -from .named_types import DropDomainType as DropDomainType # noqa: F401 -from .named_types import DropEnumType as DropEnumType # noqa: F401 -from .named_types import ENUM as ENUM # noqa: F401 -from .named_types import NamedType as NamedType # noqa: F401 -from .types import _DECIMAL_TYPES # noqa: F401 -from .types import _FLOAT_TYPES # noqa: F401 -from .types import _INT_TYPES # noqa: F401 -from .types import BIT as BIT -from .types import BYTEA as BYTEA -from .types import CIDR as CIDR -from .types import CITEXT as CITEXT -from .types import INET as INET -from .types import INTERVAL as INTERVAL -from .types import MACADDR as MACADDR -from .types import MACADDR8 as MACADDR8 -from .types import MONEY as MONEY -from .types import OID as OID -from .types import PGBit as PGBit # noqa: F401 -from .types import PGCidr as PGCidr # noqa: F401 -from .types import PGInet as PGInet # noqa: F401 -from .types import PGInterval as PGInterval # noqa: F401 -from .types import PGMacAddr as PGMacAddr # noqa: F401 -from .types import PGMacAddr8 as PGMacAddr8 # noqa: F401 -from .types import PGUuid as PGUuid -from .types import REGCLASS as REGCLASS -from .types import REGCONFIG as REGCONFIG # noqa: F401 -from .types import TIME as TIME -from .types import TIMESTAMP as TIMESTAMP -from .types import TSVECTOR as TSVECTOR -from ... import exc -from ... import schema -from ... import select -from ... import sql -from ... import util -from ...engine import characteristics -from ...engine import default -from ...engine import interfaces -from ...engine import ObjectKind -from ...engine import ObjectScope -from ...engine import reflection -from ...engine import URL -from ...engine.reflection import ReflectionDefaults -from ...sql import bindparam -from ...sql import coercions -from ...sql import compiler -from ...sql import elements -from ...sql import expression -from ...sql import roles -from ...sql import sqltypes -from ...sql import util as sql_util -from ...sql.compiler import InsertmanyvaluesSentinelOpts -from ...sql.visitors import InternalTraversal -from ...types import BIGINT -from ...types import BOOLEAN -from ...types import CHAR -from ...types import DATE -from ...types import DOUBLE_PRECISION -from ...types import FLOAT -from ...types import INTEGER -from ...types import NUMERIC -from ...types import REAL -from ...types import SMALLINT -from ...types import TEXT -from ...types import UUID as UUID -from ...types import VARCHAR -from ...util.typing import TypedDict - -IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I) - -RESERVED_WORDS = { - "all", - "analyse", - "analyze", - "and", - "any", - "array", - "as", - "asc", - "asymmetric", - "both", - "case", - "cast", - "check", - "collate", - "column", - "constraint", - "create", - "current_catalog", - "current_date", - "current_role", - "current_time", - "current_timestamp", - "current_user", - "default", - "deferrable", - "desc", - "distinct", - "do", - "else", - "end", - "except", - "false", - "fetch", - "for", - "foreign", - "from", - "grant", - "group", - "having", - "in", - "initially", - "intersect", - "into", - "leading", - "limit", - "localtime", - "localtimestamp", - "new", - "not", - "null", - "of", - "off", - 
"offset", - "old", - "on", - "only", - "or", - "order", - "placing", - "primary", - "references", - "returning", - "select", - "session_user", - "some", - "symmetric", - "table", - "then", - "to", - "trailing", - "true", - "union", - "unique", - "user", - "using", - "variadic", - "when", - "where", - "window", - "with", - "authorization", - "between", - "binary", - "cross", - "current_schema", - "freeze", - "full", - "ilike", - "inner", - "is", - "isnull", - "join", - "left", - "like", - "natural", - "notnull", - "outer", - "over", - "overlaps", - "right", - "similar", - "verbose", -} - -colspecs = { - sqltypes.ARRAY: _array.ARRAY, - sqltypes.Interval: INTERVAL, - sqltypes.Enum: ENUM, - sqltypes.JSON.JSONPathType: _json.JSONPATH, - sqltypes.JSON: _json.JSON, - sqltypes.Uuid: PGUuid, -} - - -ischema_names = { - "_array": _array.ARRAY, - "hstore": HSTORE, - "json": _json.JSON, - "jsonb": _json.JSONB, - "int4range": _ranges.INT4RANGE, - "int8range": _ranges.INT8RANGE, - "numrange": _ranges.NUMRANGE, - "daterange": _ranges.DATERANGE, - "tsrange": _ranges.TSRANGE, - "tstzrange": _ranges.TSTZRANGE, - "int4multirange": _ranges.INT4MULTIRANGE, - "int8multirange": _ranges.INT8MULTIRANGE, - "nummultirange": _ranges.NUMMULTIRANGE, - "datemultirange": _ranges.DATEMULTIRANGE, - "tsmultirange": _ranges.TSMULTIRANGE, - "tstzmultirange": _ranges.TSTZMULTIRANGE, - "integer": INTEGER, - "bigint": BIGINT, - "smallint": SMALLINT, - "character varying": VARCHAR, - "character": CHAR, - '"char"': sqltypes.String, - "name": sqltypes.String, - "text": TEXT, - "numeric": NUMERIC, - "float": FLOAT, - "real": REAL, - "inet": INET, - "cidr": CIDR, - "citext": CITEXT, - "uuid": UUID, - "bit": BIT, - "bit varying": BIT, - "macaddr": MACADDR, - "macaddr8": MACADDR8, - "money": MONEY, - "oid": OID, - "regclass": REGCLASS, - "double precision": DOUBLE_PRECISION, - "timestamp": TIMESTAMP, - "timestamp with time zone": TIMESTAMP, - "timestamp without time zone": TIMESTAMP, - "time with time zone": TIME, - "time without time zone": TIME, - "date": DATE, - "time": TIME, - "bytea": BYTEA, - "boolean": BOOLEAN, - "interval": INTERVAL, - "tsvector": TSVECTOR, -} - - -class PGCompiler(compiler.SQLCompiler): - def visit_to_tsvector_func(self, element, **kw): - return self._assert_pg_ts_ext(element, **kw) - - def visit_to_tsquery_func(self, element, **kw): - return self._assert_pg_ts_ext(element, **kw) - - def visit_plainto_tsquery_func(self, element, **kw): - return self._assert_pg_ts_ext(element, **kw) - - def visit_phraseto_tsquery_func(self, element, **kw): - return self._assert_pg_ts_ext(element, **kw) - - def visit_websearch_to_tsquery_func(self, element, **kw): - return self._assert_pg_ts_ext(element, **kw) - - def visit_ts_headline_func(self, element, **kw): - return self._assert_pg_ts_ext(element, **kw) - - def _assert_pg_ts_ext(self, element, **kw): - if not isinstance(element, _regconfig_fn): - # other options here include trying to rewrite the function - # with the correct types. however, that means we have to - # "un-SQL-ize" the first argument, which can't work in a - # generalized way. Also, parent compiler class has already added - # the incorrect return type to the result map. So let's just - # make sure the function we want is used up front. - - raise exc.CompileError( - f'Can\'t compile "{element.name}()" full text search ' - f"function construct that does not originate from the " - f'"sqlalchemy.dialects.postgresql" package. 
' - f'Please ensure "import sqlalchemy.dialects.postgresql" is ' - f"called before constructing " - f'"sqlalchemy.func.{element.name}()" to ensure registration ' - f"of the correct argument and return types." - ) - - return f"{element.name}{self.function_argspec(element, **kw)}" - - def render_bind_cast(self, type_, dbapi_type, sqltext): - if dbapi_type._type_affinity is sqltypes.String and dbapi_type.length: - # use VARCHAR with no length for VARCHAR cast. - # see #9511 - dbapi_type = sqltypes.STRINGTYPE - return f"""{sqltext}::{ - self.dialect.type_compiler_instance.process( - dbapi_type, identifier_preparer=self.preparer - ) - }""" - - def visit_array(self, element, **kw): - return "ARRAY[%s]" % self.visit_clauselist(element, **kw) - - def visit_slice(self, element, **kw): - return "%s:%s" % ( - self.process(element.start, **kw), - self.process(element.stop, **kw), - ) - - def visit_bitwise_xor_op_binary(self, binary, operator, **kw): - return self._generate_generic_binary(binary, " # ", **kw) - - def visit_json_getitem_op_binary( - self, binary, operator, _cast_applied=False, **kw - ): - if ( - not _cast_applied - and binary.type._type_affinity is not sqltypes.JSON - ): - kw["_cast_applied"] = True - return self.process(sql.cast(binary, binary.type), **kw) - - kw["eager_grouping"] = True - - return self._generate_generic_binary( - binary, " -> " if not _cast_applied else " ->> ", **kw - ) - - def visit_json_path_getitem_op_binary( - self, binary, operator, _cast_applied=False, **kw - ): - if ( - not _cast_applied - and binary.type._type_affinity is not sqltypes.JSON - ): - kw["_cast_applied"] = True - return self.process(sql.cast(binary, binary.type), **kw) - - kw["eager_grouping"] = True - return self._generate_generic_binary( - binary, " #> " if not _cast_applied else " #>> ", **kw - ) - - def visit_getitem_binary(self, binary, operator, **kw): - return "%s[%s]" % ( - self.process(binary.left, **kw), - self.process(binary.right, **kw), - ) - - def visit_aggregate_order_by(self, element, **kw): - return "%s ORDER BY %s" % ( - self.process(element.target, **kw), - self.process(element.order_by, **kw), - ) - - def visit_match_op_binary(self, binary, operator, **kw): - if "postgresql_regconfig" in binary.modifiers: - regconfig = self.render_literal_value( - binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE - ) - if regconfig: - return "%s @@ plainto_tsquery(%s, %s)" % ( - self.process(binary.left, **kw), - regconfig, - self.process(binary.right, **kw), - ) - return "%s @@ plainto_tsquery(%s)" % ( - self.process(binary.left, **kw), - self.process(binary.right, **kw), - ) - - def visit_ilike_case_insensitive_operand(self, element, **kw): - return element.element._compiler_dispatch(self, **kw) - - def visit_ilike_op_binary(self, binary, operator, **kw): - escape = binary.modifiers.get("escape", None) - - return "%s ILIKE %s" % ( - self.process(binary.left, **kw), - self.process(binary.right, **kw), - ) + ( - " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) - if escape is not None - else "" - ) - - def visit_not_ilike_op_binary(self, binary, operator, **kw): - escape = binary.modifiers.get("escape", None) - return "%s NOT ILIKE %s" % ( - self.process(binary.left, **kw), - self.process(binary.right, **kw), - ) + ( - " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) - if escape is not None - else "" - ) - - def _regexp_match(self, base_op, binary, operator, kw): - flags = binary.modifiers["flags"] - if flags is None: - return 
self._generate_generic_binary( - binary, " %s " % base_op, **kw - ) - if flags == "i": - return self._generate_generic_binary( - binary, " %s* " % base_op, **kw - ) - return "%s %s CONCAT('(?', %s, ')', %s)" % ( - self.process(binary.left, **kw), - base_op, - self.render_literal_value(flags, sqltypes.STRINGTYPE), - self.process(binary.right, **kw), - ) - - def visit_regexp_match_op_binary(self, binary, operator, **kw): - return self._regexp_match("~", binary, operator, kw) - - def visit_not_regexp_match_op_binary(self, binary, operator, **kw): - return self._regexp_match("!~", binary, operator, kw) - - def visit_regexp_replace_op_binary(self, binary, operator, **kw): - string = self.process(binary.left, **kw) - pattern_replace = self.process(binary.right, **kw) - flags = binary.modifiers["flags"] - if flags is None: - return "REGEXP_REPLACE(%s, %s)" % ( - string, - pattern_replace, - ) - else: - return "REGEXP_REPLACE(%s, %s, %s)" % ( - string, - pattern_replace, - self.render_literal_value(flags, sqltypes.STRINGTYPE), - ) - - def visit_empty_set_expr(self, element_types, **kw): - # cast the empty set to the type we are comparing against. if - # we are comparing against the null type, pick an arbitrary - # datatype for the empty set - return "SELECT %s WHERE 1!=1" % ( - ", ".join( - "CAST(NULL AS %s)" - % self.dialect.type_compiler_instance.process( - INTEGER() if type_._isnull else type_ - ) - for type_ in element_types or [INTEGER()] - ), - ) - - def render_literal_value(self, value, type_): - value = super().render_literal_value(value, type_) - - if self.dialect._backslash_escapes: - value = value.replace("\\", "\\\\") - return value - - def visit_aggregate_strings_func(self, fn, **kw): - return "string_agg%s" % self.function_argspec(fn) - - def visit_sequence(self, seq, **kw): - return "nextval('%s')" % self.preparer.format_sequence(seq) - - def limit_clause(self, select, **kw): - text = "" - if select._limit_clause is not None: - text += " \n LIMIT " + self.process(select._limit_clause, **kw) - if select._offset_clause is not None: - if select._limit_clause is None: - text += "\n LIMIT ALL" - text += " OFFSET " + self.process(select._offset_clause, **kw) - return text - - def format_from_hint_text(self, sqltext, table, hint, iscrud): - if hint.upper() != "ONLY": - raise exc.CompileError("Unrecognized hint: %r" % hint) - return "ONLY " + sqltext - - def get_select_precolumns(self, select, **kw): - # Do not call super().get_select_precolumns because - # it will warn/raise when distinct on is present - if select._distinct or select._distinct_on: - if select._distinct_on: - return ( - "DISTINCT ON (" - + ", ".join( - [ - self.process(col, **kw) - for col in select._distinct_on - ] - ) - + ") " - ) - else: - return "DISTINCT " - else: - return "" - - def for_update_clause(self, select, **kw): - if select._for_update_arg.read: - if select._for_update_arg.key_share: - tmp = " FOR KEY SHARE" - else: - tmp = " FOR SHARE" - elif select._for_update_arg.key_share: - tmp = " FOR NO KEY UPDATE" - else: - tmp = " FOR UPDATE" - - if select._for_update_arg.of: - tables = util.OrderedSet() - for c in select._for_update_arg.of: - tables.update(sql_util.surface_selectables_only(c)) - - tmp += " OF " + ", ".join( - self.process(table, ashint=True, use_schema=False, **kw) - for table in tables - ) - - if select._for_update_arg.nowait: - tmp += " NOWAIT" - if select._for_update_arg.skip_locked: - tmp += " SKIP LOCKED" - - return tmp - - def visit_substring_func(self, func, **kw): - s = 
self.process(func.clauses.clauses[0], **kw) - start = self.process(func.clauses.clauses[1], **kw) - if len(func.clauses.clauses) > 2: - length = self.process(func.clauses.clauses[2], **kw) - return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length) - else: - return "SUBSTRING(%s FROM %s)" % (s, start) - - def _on_conflict_target(self, clause, **kw): - if clause.constraint_target is not None: - # target may be a name of an Index, UniqueConstraint or - # ExcludeConstraint. While there is a separate - # "max_identifier_length" for indexes, PostgreSQL uses the same - # length for all objects so we can use - # truncate_and_render_constraint_name - target_text = ( - "ON CONSTRAINT %s" - % self.preparer.truncate_and_render_constraint_name( - clause.constraint_target - ) - ) - elif clause.inferred_target_elements is not None: - target_text = "(%s)" % ", ".join( - ( - self.preparer.quote(c) - if isinstance(c, str) - else self.process(c, include_table=False, use_schema=False) - ) - for c in clause.inferred_target_elements - ) - if clause.inferred_target_whereclause is not None: - target_text += " WHERE %s" % self.process( - clause.inferred_target_whereclause, - include_table=False, - use_schema=False, - ) - else: - target_text = "" - - return target_text - - def visit_on_conflict_do_nothing(self, on_conflict, **kw): - target_text = self._on_conflict_target(on_conflict, **kw) - - if target_text: - return "ON CONFLICT %s DO NOTHING" % target_text - else: - return "ON CONFLICT DO NOTHING" - - def visit_on_conflict_do_update(self, on_conflict, **kw): - clause = on_conflict - - target_text = self._on_conflict_target(on_conflict, **kw) - - action_set_ops = [] - - set_parameters = dict(clause.update_values_to_set) - # create a list of column assignment clauses as tuples - - insert_statement = self.stack[-1]["selectable"] - cols = insert_statement.table.c - for c in cols: - col_key = c.key - - if col_key in set_parameters: - value = set_parameters.pop(col_key) - elif c in set_parameters: - value = set_parameters.pop(c) - else: - continue - - if coercions._is_literal(value): - value = elements.BindParameter(None, value, type_=c.type) - - else: - if ( - isinstance(value, elements.BindParameter) - and value.type._isnull - ): - value = value._clone() - value.type = c.type - value_text = self.process(value.self_group(), use_schema=False) - - key_text = self.preparer.quote(c.name) - action_set_ops.append("%s = %s" % (key_text, value_text)) - - # check for names that don't match columns - if set_parameters: - util.warn( - "Additional column names not matching " - "any column keys in table '%s': %s" - % ( - self.current_executable.table.name, - (", ".join("'%s'" % c for c in set_parameters)), - ) - ) - for k, v in set_parameters.items(): - key_text = ( - self.preparer.quote(k) - if isinstance(k, str) - else self.process(k, use_schema=False) - ) - value_text = self.process( - coercions.expect(roles.ExpressionElementRole, v), - use_schema=False, - ) - action_set_ops.append("%s = %s" % (key_text, value_text)) - - action_text = ", ".join(action_set_ops) - if clause.update_whereclause is not None: - action_text += " WHERE %s" % self.process( - clause.update_whereclause, include_table=True, use_schema=False - ) - - return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) - - def update_from_clause( - self, update_stmt, from_table, extra_froms, from_hints, **kw - ): - kw["asfrom"] = True - return "FROM " + ", ".join( - t._compiler_dispatch(self, fromhints=from_hints, **kw) - for t in extra_froms - ) - - def 
delete_extra_from_clause(
-        self, delete_stmt, from_table, extra_froms, from_hints, **kw
-    ):
-        """Render the DELETE .. USING clause specific to PostgreSQL."""
-        kw["asfrom"] = True
-        return "USING " + ", ".join(
-            t._compiler_dispatch(self, fromhints=from_hints, **kw)
-            for t in extra_froms
-        )
-
-    def fetch_clause(self, select, **kw):
-        # PostgreSQL requires parens for non-literal clauses.  They are also
-        # required for bind parameters if a ::type cast is used by the
-        # driver (asyncpg), so it's easiest to just always add them
-        text = ""
-        if select._offset_clause is not None:
-            text += "\n OFFSET (%s) ROWS" % self.process(
-                select._offset_clause, **kw
-            )
-        if select._fetch_clause is not None:
-            text += "\n FETCH FIRST (%s)%s ROWS %s" % (
-                self.process(select._fetch_clause, **kw),
-                " PERCENT" if select._fetch_clause_options["percent"] else "",
-                (
-                    "WITH TIES"
-                    if select._fetch_clause_options["with_ties"]
-                    else "ONLY"
-                ),
-            )
-        return text
-
-
-class PGDDLCompiler(compiler.DDLCompiler):
-    def get_column_specification(self, column, **kwargs):
-        colspec = self.preparer.format_column(column)
-        impl_type = column.type.dialect_impl(self.dialect)
-        if isinstance(impl_type, sqltypes.TypeDecorator):
-            impl_type = impl_type.impl
-
-        has_identity = (
-            column.identity is not None
-            and self.dialect.supports_identity_columns
-        )
-
-        if (
-            column.primary_key
-            and column is column.table._autoincrement_column
-            and (
-                self.dialect.supports_smallserial
-                or not isinstance(impl_type, sqltypes.SmallInteger)
-            )
-            and not has_identity
-            and (
-                column.default is None
-                or (
-                    isinstance(column.default, schema.Sequence)
-                    and column.default.optional
-                )
-            )
-        ):
-            if isinstance(impl_type, sqltypes.BigInteger):
-                colspec += " BIGSERIAL"
-            elif isinstance(impl_type, sqltypes.SmallInteger):
-                colspec += " SMALLSERIAL"
-            else:
-                colspec += " SERIAL"
-        else:
-            colspec += " " + self.dialect.type_compiler_instance.process(
-                column.type,
-                type_expression=column,
-                identifier_preparer=self.preparer,
-            )
-            default = self.get_column_default_string(column)
-            if default is not None:
-                colspec += " DEFAULT " + default
-
-        if column.computed is not None:
-            colspec += " " + self.process(column.computed)
-        if has_identity:
-            colspec += " " + self.process(column.identity)
-
-        if not column.nullable and not has_identity:
-            colspec += " NOT NULL"
-        elif column.nullable and has_identity:
-            colspec += " NULL"
-        return colspec
-
-    def _define_constraint_validity(self, constraint):
-        not_valid = constraint.dialect_options["postgresql"]["not_valid"]
-        return " NOT VALID" if not_valid else ""
-
-    def visit_check_constraint(self, constraint, **kw):
-        if constraint._type_bound:
-            typ = list(constraint.columns)[0].type
-            if (
-                isinstance(typ, sqltypes.ARRAY)
-                and isinstance(typ.item_type, sqltypes.Enum)
-                and not typ.item_type.native_enum
-            ):
-                raise exc.CompileError(
-                    "PostgreSQL dialect cannot produce the CHECK constraint "
-                    "for ARRAY of non-native ENUM; please specify "
-                    "create_constraint=False on this Enum datatype."
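-                    # A minimal sketch (not part of the error text; names
-                    # are hypothetical) of a declaration that avoids this
-                    # error:
-                    #
-                    #     Column(
-                    #         "tags",
-                    #         ARRAY(Enum("a", "b", native_enum=False,
-                    #                    create_constraint=False)),
-                    #     )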
- ) - - text = super().visit_check_constraint(constraint) - text += self._define_constraint_validity(constraint) - return text - - def visit_foreign_key_constraint(self, constraint, **kw): - text = super().visit_foreign_key_constraint(constraint) - text += self._define_constraint_validity(constraint) - return text - - def visit_create_enum_type(self, create, **kw): - type_ = create.element - - return "CREATE TYPE %s AS ENUM (%s)" % ( - self.preparer.format_type(type_), - ", ".join( - self.sql_compiler.process(sql.literal(e), literal_binds=True) - for e in type_.enums - ), - ) - - def visit_drop_enum_type(self, drop, **kw): - type_ = drop.element - - return "DROP TYPE %s" % (self.preparer.format_type(type_)) - - def visit_create_domain_type(self, create, **kw): - domain: DOMAIN = create.element - - options = [] - if domain.collation is not None: - options.append(f"COLLATE {self.preparer.quote(domain.collation)}") - if domain.default is not None: - default = self.render_default_string(domain.default) - options.append(f"DEFAULT {default}") - if domain.constraint_name is not None: - name = self.preparer.truncate_and_render_constraint_name( - domain.constraint_name - ) - options.append(f"CONSTRAINT {name}") - if domain.not_null: - options.append("NOT NULL") - if domain.check is not None: - check = self.sql_compiler.process( - domain.check, include_table=False, literal_binds=True - ) - options.append(f"CHECK ({check})") - - return ( - f"CREATE DOMAIN {self.preparer.format_type(domain)} AS " - f"{self.type_compiler.process(domain.data_type)} " - f"{' '.join(options)}" - ) - - def visit_drop_domain_type(self, drop, **kw): - domain = drop.element - return f"DROP DOMAIN {self.preparer.format_type(domain)}" - - def visit_create_index(self, create, **kw): - preparer = self.preparer - index = create.element - self._verify_index_table(index) - text = "CREATE " - if index.unique: - text += "UNIQUE " - - text += "INDEX " - - if self.dialect._supports_create_index_concurrently: - concurrently = index.dialect_options["postgresql"]["concurrently"] - if concurrently: - text += "CONCURRENTLY " - - if create.if_not_exists: - text += "IF NOT EXISTS " - - text += "%s ON %s " % ( - self._prepared_index_name(index, include_schema=False), - preparer.format_table(index.table), - ) - - using = index.dialect_options["postgresql"]["using"] - if using: - text += ( - "USING %s " - % self.preparer.validate_sql_phrase(using, IDX_USING).lower() - ) - - ops = index.dialect_options["postgresql"]["ops"] - text += "(%s)" % ( - ", ".join( - [ - self.sql_compiler.process( - ( - expr.self_group() - if not isinstance(expr, expression.ColumnClause) - else expr - ), - include_table=False, - literal_binds=True, - ) - + ( - (" " + ops[expr.key]) - if hasattr(expr, "key") and expr.key in ops - else "" - ) - for expr in index.expressions - ] - ) - ) - - includeclause = index.dialect_options["postgresql"]["include"] - if includeclause: - inclusions = [ - index.table.c[col] if isinstance(col, str) else col - for col in includeclause - ] - text += " INCLUDE (%s)" % ", ".join( - [preparer.quote(c.name) for c in inclusions] - ) - - nulls_not_distinct = index.dialect_options["postgresql"][ - "nulls_not_distinct" - ] - if nulls_not_distinct is True: - text += " NULLS NOT DISTINCT" - elif nulls_not_distinct is False: - text += " NULLS DISTINCT" - - withclause = index.dialect_options["postgresql"]["with"] - if withclause: - text += " WITH (%s)" % ( - ", ".join( - [ - "%s = %s" % storage_parameter - for storage_parameter in withclause.items() - ] - 
)
-            )
-
-        tablespace_name = index.dialect_options["postgresql"]["tablespace"]
-        if tablespace_name:
-            text += " TABLESPACE %s" % preparer.quote(tablespace_name)
-
-        whereclause = index.dialect_options["postgresql"]["where"]
-        if whereclause is not None:
-            whereclause = coercions.expect(
-                roles.DDLExpressionRole, whereclause
-            )
-
-            where_compiled = self.sql_compiler.process(
-                whereclause, include_table=False, literal_binds=True
-            )
-            text += " WHERE " + where_compiled
-
-        return text
-
-    def define_unique_constraint_distinct(self, constraint, **kw):
-        nulls_not_distinct = constraint.dialect_options["postgresql"][
-            "nulls_not_distinct"
-        ]
-        if nulls_not_distinct is True:
-            nulls_not_distinct_param = "NULLS NOT DISTINCT "
-        elif nulls_not_distinct is False:
-            nulls_not_distinct_param = "NULLS DISTINCT "
-        else:
-            nulls_not_distinct_param = ""
-        return nulls_not_distinct_param
-
-    def visit_drop_index(self, drop, **kw):
-        index = drop.element
-
-        text = "\nDROP INDEX "
-
-        if self.dialect._supports_drop_index_concurrently:
-            concurrently = index.dialect_options["postgresql"]["concurrently"]
-            if concurrently:
-                text += "CONCURRENTLY "
-
-        if drop.if_exists:
-            text += "IF EXISTS "
-
-        text += self._prepared_index_name(index, include_schema=True)
-        return text
-
-    def visit_exclude_constraint(self, constraint, **kw):
-        text = ""
-        if constraint.name is not None:
-            text += "CONSTRAINT %s " % self.preparer.format_constraint(
-                constraint
-            )
-        elements = []
-        kw["include_table"] = False
-        kw["literal_binds"] = True
-        for expr, name, op in constraint._render_exprs:
-            exclude_element = self.sql_compiler.process(expr, **kw) + (
-                (" " + constraint.ops[expr.key])
-                if hasattr(expr, "key") and expr.key in constraint.ops
-                else ""
-            )
-
-            elements.append("%s WITH %s" % (exclude_element, op))
-        text += "EXCLUDE USING %s (%s)" % (
-            self.preparer.validate_sql_phrase(
-                constraint.using, IDX_USING
-            ).lower(),
-            ", ".join(elements),
-        )
-        if constraint.where is not None:
-            text += " WHERE (%s)" % self.sql_compiler.process(
-                constraint.where, literal_binds=True
-            )
-        text += self.define_constraint_deferrability(constraint)
-        return text
-
-    def post_create_table(self, table):
-        table_opts = []
-        pg_opts = table.dialect_options["postgresql"]
-
-        inherits = pg_opts.get("inherits")
-        if inherits is not None:
-            if not isinstance(inherits, (list, tuple)):
-                inherits = (inherits,)
-            table_opts.append(
-                "\n INHERITS ( "
-                + ", ".join(self.preparer.quote(name) for name in inherits)
-                + " )"
-            )
-
-        if pg_opts["partition_by"]:
-            table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"])
-
-        if pg_opts["using"]:
-            table_opts.append("\n USING %s" % pg_opts["using"])
-
-        if pg_opts["with_oids"] is True:
-            table_opts.append("\n WITH OIDS")
-        elif pg_opts["with_oids"] is False:
-            table_opts.append("\n WITHOUT OIDS")
-
-        if pg_opts["on_commit"]:
-            on_commit_options = pg_opts["on_commit"].replace("_", " ").upper()
-            table_opts.append("\n ON COMMIT %s" % on_commit_options)
-
-        if pg_opts["tablespace"]:
-            tablespace_name = pg_opts["tablespace"]
-            table_opts.append(
-                "\n TABLESPACE %s" % self.preparer.quote(tablespace_name)
-            )
-
-        return "".join(table_opts)
-
-    def visit_computed_column(self, generated, **kw):
-        if generated.persisted is False:
-            raise exc.CompileError(
-                "PostgreSQL computed columns do not support 'virtual' "
-                "persistence; set the 'persisted' flag to None or True for "
-                "PostgreSQL support."
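-                # A minimal sketch (hypothetical column names): a computed
-                # column that this dialect can render uses STORED
-                # persistence, e.g.
-                #
-                #     Column("area", Integer, Computed("w * h", persisted=True))
-                #
-                # which compiles to "GENERATED ALWAYS AS (w * h) STORED".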
-            )
-
-        return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process(
-            generated.sqltext, include_table=False, literal_binds=True
-        )
-
-    def visit_create_sequence(self, create, **kw):
-        prefix = None
-        if create.element.data_type is not None:
-            prefix = " AS %s" % self.type_compiler.process(
-                create.element.data_type
-            )
-
-        return super().visit_create_sequence(create, prefix=prefix, **kw)
-
-    def _can_comment_on_constraint(self, ddl_instance):
-        constraint = ddl_instance.element
-        if constraint.name is None:
-            raise exc.CompileError(
-                f"Can't emit COMMENT ON for constraint {constraint!r}: "
-                "it has no name"
-            )
-        if constraint.table is None:
-            raise exc.CompileError(
-                f"Can't emit COMMENT ON for constraint {constraint!r}: "
-                "it has no associated table"
-            )
-
-    def visit_set_constraint_comment(self, create, **kw):
-        self._can_comment_on_constraint(create)
-        return "COMMENT ON CONSTRAINT %s ON %s IS %s" % (
-            self.preparer.format_constraint(create.element),
-            self.preparer.format_table(create.element.table),
-            self.sql_compiler.render_literal_value(
-                create.element.comment, sqltypes.String()
-            ),
-        )
-
-    def visit_drop_constraint_comment(self, drop, **kw):
-        self._can_comment_on_constraint(drop)
-        return "COMMENT ON CONSTRAINT %s ON %s IS NULL" % (
-            self.preparer.format_constraint(drop.element),
-            self.preparer.format_table(drop.element.table),
-        )
-
-
-class PGTypeCompiler(compiler.GenericTypeCompiler):
-    def visit_TSVECTOR(self, type_, **kw):
-        return "TSVECTOR"
-
-    def visit_TSQUERY(self, type_, **kw):
-        return "TSQUERY"
-
-    def visit_INET(self, type_, **kw):
-        return "INET"
-
-    def visit_CIDR(self, type_, **kw):
-        return "CIDR"
-
-    def visit_CITEXT(self, type_, **kw):
-        return "CITEXT"
-
-    def visit_MACADDR(self, type_, **kw):
-        return "MACADDR"
-
-    def visit_MACADDR8(self, type_, **kw):
-        return "MACADDR8"
-
-    def visit_MONEY(self, type_, **kw):
-        return "MONEY"
-
-    def visit_OID(self, type_, **kw):
-        return "OID"
-
-    def visit_REGCONFIG(self, type_, **kw):
-        return "REGCONFIG"
-
-    def visit_REGCLASS(self, type_, **kw):
-        return "REGCLASS"
-
-    def visit_FLOAT(self, type_, **kw):
-        if not type_.precision:
-            return "FLOAT"
-        else:
-            return "FLOAT(%(precision)s)" % {"precision": type_.precision}
-
-    def visit_double(self, type_, **kw):
-        # pass the actual type_ argument through (the bare builtin "type"
-        # was a harmless but incorrect argument here)
-        return self.visit_DOUBLE_PRECISION(type_, **kw)
-
-    def visit_BIGINT(self, type_, **kw):
-        return "BIGINT"
-
-    def visit_HSTORE(self, type_, **kw):
-        return "HSTORE"
-
-    def visit_JSON(self, type_, **kw):
-        return "JSON"
-
-    def visit_JSONB(self, type_, **kw):
-        return "JSONB"
-
-    def visit_INT4MULTIRANGE(self, type_, **kw):
-        return "INT4MULTIRANGE"
-
-    def visit_INT8MULTIRANGE(self, type_, **kw):
-        return "INT8MULTIRANGE"
-
-    def visit_NUMMULTIRANGE(self, type_, **kw):
-        return "NUMMULTIRANGE"
-
-    def visit_DATEMULTIRANGE(self, type_, **kw):
-        return "DATEMULTIRANGE"
-
-    def visit_TSMULTIRANGE(self, type_, **kw):
-        return "TSMULTIRANGE"
-
-    def visit_TSTZMULTIRANGE(self, type_, **kw):
-        return "TSTZMULTIRANGE"
-
-    def visit_INT4RANGE(self, type_, **kw):
-        return "INT4RANGE"
-
-    def visit_INT8RANGE(self, type_, **kw):
-        return "INT8RANGE"
-
-    def visit_NUMRANGE(self, type_, **kw):
-        return "NUMRANGE"
-
-    def visit_DATERANGE(self, type_, **kw):
-        return "DATERANGE"
-
-    def visit_TSRANGE(self, type_, **kw):
-        return "TSRANGE"
-
-    def visit_TSTZRANGE(self, type_, **kw):
-        return "TSTZRANGE"
-
-    def visit_json_int_index(self, type_, **kw):
-        return "INT"
-
-    def visit_json_str_index(self, type_, **kw):
-        return "TEXT"
-
-    def 
visit_datetime(self, type_, **kw): - return self.visit_TIMESTAMP(type_, **kw) - - def visit_enum(self, type_, **kw): - if not type_.native_enum or not self.dialect.supports_native_enum: - return super().visit_enum(type_, **kw) - else: - return self.visit_ENUM(type_, **kw) - - def visit_ENUM(self, type_, identifier_preparer=None, **kw): - if identifier_preparer is None: - identifier_preparer = self.dialect.identifier_preparer - return identifier_preparer.format_type(type_) - - def visit_DOMAIN(self, type_, identifier_preparer=None, **kw): - if identifier_preparer is None: - identifier_preparer = self.dialect.identifier_preparer - return identifier_preparer.format_type(type_) - - def visit_TIMESTAMP(self, type_, **kw): - return "TIMESTAMP%s %s" % ( - ( - "(%d)" % type_.precision - if getattr(type_, "precision", None) is not None - else "" - ), - (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE", - ) - - def visit_TIME(self, type_, **kw): - return "TIME%s %s" % ( - ( - "(%d)" % type_.precision - if getattr(type_, "precision", None) is not None - else "" - ), - (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE", - ) - - def visit_INTERVAL(self, type_, **kw): - text = "INTERVAL" - if type_.fields is not None: - text += " " + type_.fields - if type_.precision is not None: - text += " (%d)" % type_.precision - return text - - def visit_BIT(self, type_, **kw): - if type_.varying: - compiled = "BIT VARYING" - if type_.length is not None: - compiled += "(%d)" % type_.length - else: - compiled = "BIT(%d)" % type_.length - return compiled - - def visit_uuid(self, type_, **kw): - if type_.native_uuid: - return self.visit_UUID(type_, **kw) - else: - return super().visit_uuid(type_, **kw) - - def visit_UUID(self, type_, **kw): - return "UUID" - - def visit_large_binary(self, type_, **kw): - return self.visit_BYTEA(type_, **kw) - - def visit_BYTEA(self, type_, **kw): - return "BYTEA" - - def visit_ARRAY(self, type_, **kw): - inner = self.process(type_.item_type, **kw) - return re.sub( - r"((?: COLLATE.*)?)$", - ( - r"%s\1" - % ( - "[]" - * (type_.dimensions if type_.dimensions is not None else 1) - ) - ), - inner, - count=1, - ) - - def visit_json_path(self, type_, **kw): - return self.visit_JSONPATH(type_, **kw) - - def visit_JSONPATH(self, type_, **kw): - return "JSONPATH" - - -class PGIdentifierPreparer(compiler.IdentifierPreparer): - reserved_words = RESERVED_WORDS - - def _unquote_identifier(self, value): - if value[0] == self.initial_quote: - value = value[1:-1].replace( - self.escape_to_quote, self.escape_quote - ) - return value - - def format_type(self, type_, use_schema=True): - if not type_.name: - raise exc.CompileError( - f"PostgreSQL {type_.__class__.__name__} type requires a name." 
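-                # For example (hedged; "myenum" is a hypothetical name), the
-                # name is the one supplied as ENUM("a", "b", name="myenum"),
-                # against which CREATE TYPE and casts are rendered.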
-            )
-
-        name = self.quote(type_.name)
-        effective_schema = self.schema_for_object(type_)
-
-        if (
-            not self.omit_schema
-            and use_schema
-            and effective_schema is not None
-        ):
-            name = f"{self.quote_schema(effective_schema)}.{name}"
-        return name
-
-
-class ReflectedNamedType(TypedDict):
-    """Represents a reflected named type."""
-
-    name: str
-    """Name of the type."""
-    schema: str
-    """The schema of the type."""
-    visible: bool
-    """Indicates if this type is in the current search path."""
-
-
-class ReflectedDomainConstraint(TypedDict):
-    """Represents a reflected check constraint of a domain."""
-
-    name: str
-    """Name of the constraint."""
-    check: str
-    """The check constraint text."""
-
-
-class ReflectedDomain(ReflectedNamedType):
-    """Represents a reflected domain."""
-
-    type: str
-    """The string name of the underlying data type of the domain."""
-    nullable: bool
-    """Indicates if the domain allows null or not."""
-    default: Optional[str]
-    """The string representation of the default value of this domain
-    or ``None`` if none present.
-    """
-    constraints: List[ReflectedDomainConstraint]
-    """The constraints defined in the domain, if any.
-    The constraints are in order of evaluation by PostgreSQL.
-    """
-    collation: Optional[str]
-    """The collation for the domain."""
-
-
-class ReflectedEnum(ReflectedNamedType):
-    """Represents a reflected enum."""
-
-    labels: List[str]
-    """The labels that compose the enum."""
-
-
-class PGInspector(reflection.Inspector):
-    dialect: PGDialect
-
-    def get_table_oid(
-        self, table_name: str, schema: Optional[str] = None
-    ) -> int:
-        """Return the OID for the given table name.
-
-        :param table_name: string name of the table.  For special quoting,
-         use :class:`.quoted_name`.
-
-        :param schema: string schema name; if omitted, uses the default schema
-         of the database connection.  For special quoting,
-         use :class:`.quoted_name`.
-
-        """
-
-        with self._operation_context() as conn:
-            return self.dialect.get_table_oid(
-                conn, table_name, schema, info_cache=self.info_cache
-            )
-
-    def get_domains(
-        self, schema: Optional[str] = None
-    ) -> List[ReflectedDomain]:
-        """Return a list of DOMAIN objects.
-
-        Each member is a dictionary containing these fields:
-
-        * name - name of the domain
-        * schema - the schema name for the domain.
-        * visible - boolean, whether or not this domain is visible
-          in the default search path.
-        * type - the type defined by this domain.
-        * nullable - Indicates if this domain can be ``NULL``.
-        * default - The default value of the domain or ``None`` if the
-          domain has no default.
-        * constraints - A list of dicts with the constraints defined by
-          this domain.  Each element contains two keys: ``name`` of the
-          constraint and ``check`` with the constraint text.
-
-        :param schema: schema name.  If None, the default schema
-         (typically 'public') is used.  May also be set to ``'*'`` to
-         indicate loading domains for all schemas.
-
-        .. versionadded:: 2.0
-
-        """
-        with self._operation_context() as conn:
-            return self.dialect._load_domains(
-                conn, schema, info_cache=self.info_cache
-            )
-
-    def get_enums(self, schema: Optional[str] = None) -> List[ReflectedEnum]:
-        """Return a list of ENUM objects.
-
-        Each member is a dictionary containing these fields:
-
-        * name - name of the enum
-        * schema - the schema name for the enum.
-        * visible - boolean, whether or not this enum is visible
-          in the default search path.
-        * labels - a list of string labels that apply to the enum.
-
-        :param schema: schema name.  
If None, the default schema
-         (typically 'public') is used.  May also be set to ``'*'`` to
-         indicate loading enums for all schemas.
-
-        """
-        with self._operation_context() as conn:
-            return self.dialect._load_enums(
-                conn, schema, info_cache=self.info_cache
-            )
-
-    def get_foreign_table_names(
-        self, schema: Optional[str] = None
-    ) -> List[str]:
-        """Return a list of FOREIGN TABLE names.
-
-        Behavior is similar to that of
-        :meth:`_reflection.Inspector.get_table_names`,
-        except that the list is limited to those tables that report a
-        ``relkind`` value of ``f``.
-
-        """
-        with self._operation_context() as conn:
-            return self.dialect._get_foreign_table_names(
-                conn, schema, info_cache=self.info_cache
-            )
-
-    def has_type(
-        self, type_name: str, schema: Optional[str] = None, **kw: Any
-    ) -> bool:
-        """Return whether the database has the specified type in the
-        provided schema.
-
-        :param type_name: the type to check.
-        :param schema: schema name.  If None, the default schema
-         (typically 'public') is used.  May also be set to ``'*'`` to
-         check in all schemas.
-
-        .. versionadded:: 2.0
-
-        """
-        with self._operation_context() as conn:
-            return self.dialect.has_type(
-                conn, type_name, schema, info_cache=self.info_cache
-            )
-
-
-class PGExecutionContext(default.DefaultExecutionContext):
-    def fire_sequence(self, seq, type_):
-        return self._execute_scalar(
-            (
-                "select nextval('%s')"
-                % self.identifier_preparer.format_sequence(seq)
-            ),
-            type_,
-        )
-
-    def get_insert_default(self, column):
-        if column.primary_key and column is column.table._autoincrement_column:
-            if column.server_default and column.server_default.has_argument:
-                # pre-execute passive defaults on primary key columns
-                return self._execute_scalar(
-                    "select %s" % column.server_default.arg, column.type
-                )
-
-            elif column.default is None or (
-                column.default.is_sequence and column.default.optional
-            ):
-                # execute the sequence associated with a SERIAL primary
-                # key column.  for non-primary-key SERIAL, the ID just
-                # generates server side.
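-                # Sketch of what the fallback below computes (table and
-                # column names hypothetical): for Table "widget", Column
-                # "id", the implicit SERIAL sequence name is
-                # "widget_id_seq", so the pre-execution emits
-                # select nextval('"widget_id_seq"').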
- - try: - seq_name = column._postgresql_seq_name - except AttributeError: - tab = column.table.name - col = column.name - tab = tab[0 : 29 + max(0, (29 - len(col)))] - col = col[0 : 29 + max(0, (29 - len(tab)))] - name = "%s_%s_seq" % (tab, col) - column._postgresql_seq_name = seq_name = name - - if column.table is not None: - effective_schema = self.connection.schema_for_object( - column.table - ) - else: - effective_schema = None - - if effective_schema is not None: - exc = 'select nextval(\'"%s"."%s"\')' % ( - effective_schema, - seq_name, - ) - else: - exc = "select nextval('\"%s\"')" % (seq_name,) - - return self._execute_scalar(exc, column.type) - - return super().get_insert_default(column) - - -class PGReadOnlyConnectionCharacteristic( - characteristics.ConnectionCharacteristic -): - transactional = True - - def reset_characteristic(self, dialect, dbapi_conn): - dialect.set_readonly(dbapi_conn, False) - - def set_characteristic(self, dialect, dbapi_conn, value): - dialect.set_readonly(dbapi_conn, value) - - def get_characteristic(self, dialect, dbapi_conn): - return dialect.get_readonly(dbapi_conn) - - -class PGDeferrableConnectionCharacteristic( - characteristics.ConnectionCharacteristic -): - transactional = True - - def reset_characteristic(self, dialect, dbapi_conn): - dialect.set_deferrable(dbapi_conn, False) - - def set_characteristic(self, dialect, dbapi_conn, value): - dialect.set_deferrable(dbapi_conn, value) - - def get_characteristic(self, dialect, dbapi_conn): - return dialect.get_deferrable(dbapi_conn) - - -class PGDialect(default.DefaultDialect): - name = "postgresql" - supports_statement_cache = True - supports_alter = True - max_identifier_length = 63 - supports_sane_rowcount = True - - bind_typing = interfaces.BindTyping.RENDER_CASTS - - supports_native_enum = True - supports_native_boolean = True - supports_native_uuid = True - supports_smallserial = True - - supports_sequences = True - sequences_optional = True - preexecute_autoincrement_sequences = True - postfetch_lastrowid = False - use_insertmanyvalues = True - - returns_native_bytes = True - - insertmanyvalues_implicit_sentinel = ( - InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT - | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT - | InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS - ) - - supports_comments = True - supports_constraint_comments = True - supports_default_values = True - - supports_default_metavalue = True - - supports_empty_insert = False - supports_multivalues_insert = True - - supports_identity_columns = True - - default_paramstyle = "pyformat" - ischema_names = ischema_names - colspecs = colspecs - - statement_compiler = PGCompiler - ddl_compiler = PGDDLCompiler - type_compiler_cls = PGTypeCompiler - preparer = PGIdentifierPreparer - execution_ctx_cls = PGExecutionContext - inspector = PGInspector - - update_returning = True - delete_returning = True - insert_returning = True - update_returning_multifrom = True - delete_returning_multifrom = True - - connection_characteristics = ( - default.DefaultDialect.connection_characteristics - ) - connection_characteristics = connection_characteristics.union( - { - "postgresql_readonly": PGReadOnlyConnectionCharacteristic(), - "postgresql_deferrable": PGDeferrableConnectionCharacteristic(), - } - ) - - construct_arguments = [ - ( - schema.Index, - { - "using": False, - "include": None, - "where": None, - "ops": {}, - "concurrently": False, - "with": {}, - "tablespace": None, - "nulls_not_distinct": None, - }, - ), - ( - schema.Table, - { - 
"ignore_search_path": False, - "tablespace": None, - "partition_by": None, - "with_oids": None, - "on_commit": None, - "inherits": None, - "using": None, - }, - ), - ( - schema.CheckConstraint, - { - "not_valid": False, - }, - ), - ( - schema.ForeignKeyConstraint, - { - "not_valid": False, - }, - ), - ( - schema.UniqueConstraint, - {"nulls_not_distinct": None}, - ), - ] - - reflection_options = ("postgresql_ignore_search_path",) - - _backslash_escapes = True - _supports_create_index_concurrently = True - _supports_drop_index_concurrently = True - - def __init__( - self, - native_inet_types=None, - json_serializer=None, - json_deserializer=None, - **kwargs, - ): - default.DefaultDialect.__init__(self, **kwargs) - - self._native_inet_types = native_inet_types - self._json_deserializer = json_deserializer - self._json_serializer = json_serializer - - def initialize(self, connection): - super().initialize(connection) - - # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689 - self.supports_smallserial = self.server_version_info >= (9, 2) - - self._set_backslash_escapes(connection) - - self._supports_drop_index_concurrently = self.server_version_info >= ( - 9, - 2, - ) - self.supports_identity_columns = self.server_version_info >= (10,) - - def get_isolation_level_values(self, dbapi_conn): - # note the generic dialect doesn't have AUTOCOMMIT, however - # all postgresql dialects should include AUTOCOMMIT. - return ( - "SERIALIZABLE", - "READ UNCOMMITTED", - "READ COMMITTED", - "REPEATABLE READ", - ) - - def set_isolation_level(self, dbapi_connection, level): - cursor = dbapi_connection.cursor() - cursor.execute( - "SET SESSION CHARACTERISTICS AS TRANSACTION " - f"ISOLATION LEVEL {level}" - ) - cursor.execute("COMMIT") - cursor.close() - - def get_isolation_level(self, dbapi_connection): - cursor = dbapi_connection.cursor() - cursor.execute("show transaction isolation level") - val = cursor.fetchone()[0] - cursor.close() - return val.upper() - - def set_readonly(self, connection, value): - raise NotImplementedError() - - def get_readonly(self, connection): - raise NotImplementedError() - - def set_deferrable(self, connection, value): - raise NotImplementedError() - - def get_deferrable(self, connection): - raise NotImplementedError() - - def _split_multihost_from_url(self, url: URL) -> Union[ - Tuple[None, None], - Tuple[Tuple[Optional[str], ...], Tuple[Optional[int], ...]], - ]: - hosts: Optional[Tuple[Optional[str], ...]] = None - ports_str: Union[str, Tuple[Optional[str], ...], None] = None - - integrated_multihost = False - - if "host" in url.query: - if isinstance(url.query["host"], (list, tuple)): - integrated_multihost = True - hosts, ports_str = zip( - *[ - token.split(":") if ":" in token else (token, None) - for token in url.query["host"] - ] - ) - - elif isinstance(url.query["host"], str): - hosts = tuple(url.query["host"].split(",")) - - if ( - "port" not in url.query - and len(hosts) == 1 - and ":" in hosts[0] - ): - # internet host is alphanumeric plus dots or hyphens. - # this is essentially rfc1123, which refers to rfc952. 
-                # https://stackoverflow.com/questions/3523028/
-                # valid-characters-of-a-hostname
-                host_port_match = re.match(
-                    r"^([a-zA-Z0-9\-\.]*)(?:\:(\d*))?$", hosts[0]
-                )
-                if host_port_match:
-                    integrated_multihost = True
-                    h, p = host_port_match.group(1, 2)
-                    if TYPE_CHECKING:
-                        assert isinstance(h, str)
-                        assert isinstance(p, str)
-                    hosts = (h,)
-                    ports_str = cast(
-                        "Tuple[Optional[str], ...]", (p,) if p else (None,)
-                    )
-
-        if "port" in url.query:
-            if integrated_multihost:
-                raise exc.ArgumentError(
-                    "Can't mix 'multihost' formats together; use "
-                    '"host=h1,h2,h3&port=p1,p2,p3" or '
-                    '"host=h1:p1&host=h2:p2&host=h3:p3" separately'
-                )
-            if isinstance(url.query["port"], (list, tuple)):
-                ports_str = url.query["port"]
-            elif isinstance(url.query["port"], str):
-                ports_str = tuple(url.query["port"].split(","))
-
-        ports: Optional[Tuple[Optional[int], ...]] = None
-
-        if ports_str:
-            try:
-                ports = tuple(int(x) if x else None for x in ports_str)
-            except ValueError:
-                raise exc.ArgumentError(
-                    f"Received non-integer port arguments: {ports_str}"
-                ) from None
-
-        if ports and (
-            (not hosts and len(ports) > 1)
-            or (
-                hosts
-                and ports
-                and len(hosts) != len(ports)
-                and (len(hosts) > 1 or len(ports) > 1)
-            )
-        ):
-            raise exc.ArgumentError("number of hosts and ports don't match")
-
-        if hosts is not None:
-            if ports is None:
-                ports = tuple(None for _ in hosts)
-
-        return hosts, ports  # type: ignore
-
-    def do_begin_twophase(self, connection, xid):
-        self.do_begin(connection.connection)
-
-    def do_prepare_twophase(self, connection, xid):
-        connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid)
-
-    def do_rollback_twophase(
-        self, connection, xid, is_prepared=True, recover=False
-    ):
-        if is_prepared:
-            if recover:
-                # FIXME: ugly hack to get out of transaction
-                # context when rolling back recoverable transactions.
-                # Must find a way to keep the DBAPI from opening
-                # a transaction.
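-                # As an illustration, for a hypothetical prepared xid 'xid1'
-                # this branch emits:
-                #
-                #     ROLLBACK;
-                #     ROLLBACK PREPARED 'xid1';
-                #     BEGIN;
-                #
-                # followed by a DBAPI-level rollback to reset the state.
-                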
connection.exec_driver_sql("ROLLBACK")
-                connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid)
-                connection.exec_driver_sql("BEGIN")
-                self.do_rollback(connection.connection)
-            else:
-                self.do_rollback(connection.connection)
-
-    def do_commit_twophase(
-        self, connection, xid, is_prepared=True, recover=False
-    ):
-        if is_prepared:
-            if recover:
-                connection.exec_driver_sql("ROLLBACK")
-                connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid)
-                connection.exec_driver_sql("BEGIN")
-                self.do_rollback(connection.connection)
-            else:
-                self.do_commit(connection.connection)
-
-    def do_recover_twophase(self, connection):
-        return connection.scalars(
-            sql.text("SELECT gid FROM pg_prepared_xacts")
-        ).all()
-
-    def _get_default_schema_name(self, connection):
-        return connection.exec_driver_sql("select current_schema()").scalar()
-
-    @reflection.cache
-    def has_schema(self, connection, schema, **kw):
-        query = select(pg_catalog.pg_namespace.c.nspname).where(
-            pg_catalog.pg_namespace.c.nspname == schema
-        )
-        return bool(connection.scalar(query))
-
-    def _pg_class_filter_scope_schema(
-        self, query, schema, scope, pg_class_table=None
-    ):
-        if pg_class_table is None:
-            pg_class_table = pg_catalog.pg_class
-        query = query.join(
-            pg_catalog.pg_namespace,
-            pg_catalog.pg_namespace.c.oid == pg_class_table.c.relnamespace,
-        )
-
-        if scope is ObjectScope.DEFAULT:
-            query = query.where(pg_class_table.c.relpersistence != "t")
-        elif scope is ObjectScope.TEMPORARY:
-            query = query.where(pg_class_table.c.relpersistence == "t")
-
-        if schema is None:
-            query = query.where(
-                pg_catalog.pg_table_is_visible(pg_class_table.c.oid),
-                # ignore pg_catalog schema
-                pg_catalog.pg_namespace.c.nspname != "pg_catalog",
-            )
-        else:
-            query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
-        return query
-
-    def _pg_class_relkind_condition(self, relkinds, pg_class_table=None):
-        if pg_class_table is None:
-            pg_class_table = pg_catalog.pg_class
-        # uses the any form instead of in, as otherwise postgresql complains
-        # that 'IN could not convert type character to "char"'
-        return pg_class_table.c.relkind == sql.any_(_array.array(relkinds))
-
-    @lru_cache()
-    def _has_table_query(self, schema):
-        query = select(pg_catalog.pg_class.c.relname).where(
-            pg_catalog.pg_class.c.relname == bindparam("table_name"),
-            self._pg_class_relkind_condition(
-                pg_catalog.RELKINDS_ALL_TABLE_LIKE
-            ),
-        )
-        return self._pg_class_filter_scope_schema(
-            query, schema, scope=ObjectScope.ANY
-        )
-
-    @reflection.cache
-    def has_table(self, connection, table_name, schema=None, **kw):
-        self._ensure_has_table_connection(connection)
-        query = self._has_table_query(schema)
-        return bool(connection.scalar(query, {"table_name": table_name}))
-
-    @reflection.cache
-    def has_sequence(self, connection, sequence_name, schema=None, **kw):
-        query = select(pg_catalog.pg_class.c.relname).where(
-            pg_catalog.pg_class.c.relkind == "S",
-            pg_catalog.pg_class.c.relname == sequence_name,
-        )
-        query = self._pg_class_filter_scope_schema(
-            query, schema, scope=ObjectScope.ANY
-        )
-        return bool(connection.scalar(query))
-
-    @reflection.cache
-    def has_type(self, connection, type_name, schema=None, **kw):
-        query = (
-            select(pg_catalog.pg_type.c.typname)
-            .join(
-                pg_catalog.pg_namespace,
-                pg_catalog.pg_namespace.c.oid
-                == pg_catalog.pg_type.c.typnamespace,
-            )
-            .where(pg_catalog.pg_type.c.typname == type_name)
-        )
-        if schema is None:
-            query = query.where(
-                pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid),
-                # ignore pg_catalog schema
-                
pg_catalog.pg_namespace.c.nspname != "pg_catalog",
-            )
-        elif schema != "*":
-            query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
-
-        return bool(connection.scalar(query))
-
-    def _get_server_version_info(self, connection):
-        v = connection.exec_driver_sql("select pg_catalog.version()").scalar()
-        m = re.match(
-            r".*(?:PostgreSQL|EnterpriseDB) "
-            r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?",
-            v,
-        )
-        if not m:
-            raise AssertionError(
-                "Could not determine version from string '%s'" % v
-            )
-        return tuple([int(x) for x in m.group(1, 2, 3) if x is not None])
-
-    @reflection.cache
-    def get_table_oid(self, connection, table_name, schema=None, **kw):
-        """Fetch the oid for schema.table_name."""
-        query = select(pg_catalog.pg_class.c.oid).where(
-            pg_catalog.pg_class.c.relname == table_name,
-            self._pg_class_relkind_condition(
-                pg_catalog.RELKINDS_ALL_TABLE_LIKE
-            ),
-        )
-        query = self._pg_class_filter_scope_schema(
-            query, schema, scope=ObjectScope.ANY
-        )
-        table_oid = connection.scalar(query)
-        if table_oid is None:
-            raise exc.NoSuchTableError(
-                f"{schema}.{table_name}" if schema else table_name
-            )
-        return table_oid
-
-    @reflection.cache
-    def get_schema_names(self, connection, **kw):
-        query = (
-            select(pg_catalog.pg_namespace.c.nspname)
-            .where(pg_catalog.pg_namespace.c.nspname.not_like("pg_%"))
-            .order_by(pg_catalog.pg_namespace.c.nspname)
-        )
-        return connection.scalars(query).all()
-
-    def _get_relnames_for_relkinds(self, connection, schema, relkinds, scope):
-        query = select(pg_catalog.pg_class.c.relname).where(
-            self._pg_class_relkind_condition(relkinds)
-        )
-        query = self._pg_class_filter_scope_schema(query, schema, scope=scope)
-        return connection.scalars(query).all()
-
-    @reflection.cache
-    def get_table_names(self, connection, schema=None, **kw):
-        return self._get_relnames_for_relkinds(
-            connection,
-            schema,
-            pg_catalog.RELKINDS_TABLE_NO_FOREIGN,
-            scope=ObjectScope.DEFAULT,
-        )
-
-    @reflection.cache
-    def get_temp_table_names(self, connection, **kw):
-        return self._get_relnames_for_relkinds(
-            connection,
-            schema=None,
-            relkinds=pg_catalog.RELKINDS_TABLE_NO_FOREIGN,
-            scope=ObjectScope.TEMPORARY,
-        )
-
-    @reflection.cache
-    def _get_foreign_table_names(self, connection, schema=None, **kw):
-        return self._get_relnames_for_relkinds(
-            connection, schema, relkinds=("f",), scope=ObjectScope.ANY
-        )
-
-    @reflection.cache
-    def get_view_names(self, connection, schema=None, **kw):
-        return self._get_relnames_for_relkinds(
-            connection,
-            schema,
-            pg_catalog.RELKINDS_VIEW,
-            scope=ObjectScope.DEFAULT,
-        )
-
-    @reflection.cache
-    def get_materialized_view_names(self, connection, schema=None, **kw):
-        return self._get_relnames_for_relkinds(
-            connection,
-            schema,
-            pg_catalog.RELKINDS_MAT_VIEW,
-            scope=ObjectScope.DEFAULT,
-        )
-
-    @reflection.cache
-    def get_temp_view_names(self, connection, **kw):
-        return self._get_relnames_for_relkinds(
-            connection,
-            schema,
-            # NOTE: do not include temp materialized views (these do not
-            # seem to be a thing, at least up to version 14)
-            pg_catalog.RELKINDS_VIEW,
-            scope=ObjectScope.TEMPORARY,
-        )
-
-    @reflection.cache
-    def get_sequence_names(self, connection, schema=None, **kw):
-        return self._get_relnames_for_relkinds(
-            connection, schema, relkinds=("S",), scope=ObjectScope.ANY
-        )
-
-    @reflection.cache
-    def get_view_definition(self, connection, view_name, schema=None, **kw):
-        query = (
-            select(pg_catalog.pg_get_viewdef(pg_catalog.pg_class.c.oid))
-            .select_from(pg_catalog.pg_class)
-            
.where( - pg_catalog.pg_class.c.relname == view_name, - self._pg_class_relkind_condition( - pg_catalog.RELKINDS_VIEW + pg_catalog.RELKINDS_MAT_VIEW - ), - ) - ) - query = self._pg_class_filter_scope_schema( - query, schema, scope=ObjectScope.ANY - ) - res = connection.scalar(query) - if res is None: - raise exc.NoSuchTableError( - f"{schema}.{view_name}" if schema else view_name - ) - else: - return res - - def _value_or_raise(self, data, table, schema): - try: - return dict(data)[(schema, table)] - except KeyError: - raise exc.NoSuchTableError( - f"{schema}.{table}" if schema else table - ) from None - - def _prepare_filter_names(self, filter_names): - if filter_names: - return True, {"filter_names": filter_names} - else: - return False, {} - - def _kind_to_relkinds(self, kind: ObjectKind) -> Tuple[str, ...]: - if kind is ObjectKind.ANY: - return pg_catalog.RELKINDS_ALL_TABLE_LIKE - relkinds = () - if ObjectKind.TABLE in kind: - relkinds += pg_catalog.RELKINDS_TABLE - if ObjectKind.VIEW in kind: - relkinds += pg_catalog.RELKINDS_VIEW - if ObjectKind.MATERIALIZED_VIEW in kind: - relkinds += pg_catalog.RELKINDS_MAT_VIEW - return relkinds - - @reflection.cache - def get_columns(self, connection, table_name, schema=None, **kw): - data = self.get_multi_columns( - connection, - schema=schema, - filter_names=[table_name], - scope=ObjectScope.ANY, - kind=ObjectKind.ANY, - **kw, - ) - return self._value_or_raise(data, table_name, schema) - - @lru_cache() - def _columns_query(self, schema, has_filter_names, scope, kind): - # NOTE: the query with the default and identity options scalar - # subquery is faster than trying to use outer joins for them - generated = ( - pg_catalog.pg_attribute.c.attgenerated.label("generated") - if self.server_version_info >= (12,) - else sql.null().label("generated") - ) - if self.server_version_info >= (10,): - # join lateral performs worse (~2x slower) than a scalar_subquery - identity = ( - select( - sql.func.json_build_object( - "always", - pg_catalog.pg_attribute.c.attidentity == "a", - "start", - pg_catalog.pg_sequence.c.seqstart, - "increment", - pg_catalog.pg_sequence.c.seqincrement, - "minvalue", - pg_catalog.pg_sequence.c.seqmin, - "maxvalue", - pg_catalog.pg_sequence.c.seqmax, - "cache", - pg_catalog.pg_sequence.c.seqcache, - "cycle", - pg_catalog.pg_sequence.c.seqcycle, - ) - ) - .select_from(pg_catalog.pg_sequence) - .where( - # attidentity != '' is required or it will reflect also - # serial columns as identity. 
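-                    # For illustration (values hypothetical), the JSON built
-                    # above for a "GENERATED ALWAYS AS IDENTITY" column looks
-                    # like {"always": true, "start": 1, "increment": 1, ...}.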
-                    pg_catalog.pg_attribute.c.attidentity != "",
-                    pg_catalog.pg_sequence.c.seqrelid
-                    == sql.cast(
-                        sql.cast(
-                            pg_catalog.pg_get_serial_sequence(
-                                sql.cast(
-                                    sql.cast(
-                                        pg_catalog.pg_attribute.c.attrelid,
-                                        REGCLASS,
-                                    ),
-                                    TEXT,
-                                ),
-                                pg_catalog.pg_attribute.c.attname,
-                            ),
-                            REGCLASS,
-                        ),
-                        OID,
-                    ),
-                )
-                .correlate(pg_catalog.pg_attribute)
-                .scalar_subquery()
-                .label("identity_options")
-            )
-        else:
-            identity = sql.null().label("identity_options")
-
-        # join lateral performs the same as scalar_subquery here
-        default = (
-            select(
-                pg_catalog.pg_get_expr(
-                    pg_catalog.pg_attrdef.c.adbin,
-                    pg_catalog.pg_attrdef.c.adrelid,
-                )
-            )
-            .select_from(pg_catalog.pg_attrdef)
-            .where(
-                pg_catalog.pg_attrdef.c.adrelid
-                == pg_catalog.pg_attribute.c.attrelid,
-                pg_catalog.pg_attrdef.c.adnum
-                == pg_catalog.pg_attribute.c.attnum,
-                pg_catalog.pg_attribute.c.atthasdef,
-            )
-            .correlate(pg_catalog.pg_attribute)
-            .scalar_subquery()
-            .label("default")
-        )
-        relkinds = self._kind_to_relkinds(kind)
-        query = (
-            select(
-                pg_catalog.pg_attribute.c.attname.label("name"),
-                pg_catalog.format_type(
-                    pg_catalog.pg_attribute.c.atttypid,
-                    pg_catalog.pg_attribute.c.atttypmod,
-                ).label("format_type"),
-                default,
-                pg_catalog.pg_attribute.c.attnotnull.label("not_null"),
-                pg_catalog.pg_class.c.relname.label("table_name"),
-                pg_catalog.pg_description.c.description.label("comment"),
-                generated,
-                identity,
-            )
-            .select_from(pg_catalog.pg_class)
-            # NOTE: PostgreSQL supports tables with no user columns, meaning
-            # there is no row with pg_attribute.attnum > 0.  use a left outer
-            # join to avoid filtering out these tables.
-            .outerjoin(
-                pg_catalog.pg_attribute,
-                sql.and_(
-                    pg_catalog.pg_class.c.oid
-                    == pg_catalog.pg_attribute.c.attrelid,
-                    pg_catalog.pg_attribute.c.attnum > 0,
-                    ~pg_catalog.pg_attribute.c.attisdropped,
-                ),
-            )
-            .outerjoin(
-                pg_catalog.pg_description,
-                sql.and_(
-                    pg_catalog.pg_description.c.objoid
-                    == pg_catalog.pg_attribute.c.attrelid,
-                    pg_catalog.pg_description.c.objsubid
-                    == pg_catalog.pg_attribute.c.attnum,
-                ),
-            )
-            .where(self._pg_class_relkind_condition(relkinds))
-            .order_by(
-                pg_catalog.pg_class.c.relname, pg_catalog.pg_attribute.c.attnum
-            )
-        )
-        query = self._pg_class_filter_scope_schema(query, schema, scope=scope)
-        if has_filter_names:
-            query = query.where(
-                pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
-            )
-        return query
-
-    def get_multi_columns(
-        self, connection, schema, filter_names, scope, kind, **kw
-    ):
-        has_filter_names, params = self._prepare_filter_names(filter_names)
-        query = self._columns_query(schema, has_filter_names, scope, kind)
-        rows = connection.execute(query, params).mappings()
-
-        # dictionary with (name, ) if default search path or (schema, name)
-        # as keys
-        domains = {
-            ((d["schema"], d["name"]) if not d["visible"] else (d["name"],)): d
-            for d in self._load_domains(
-                connection, schema="*", info_cache=kw.get("info_cache")
-            )
-        }
-
-        # dictionary with (name, ) if default search path or (schema, name)
-        # as keys
-        enums = dict(
-            (
-                ((rec["name"],), rec)
-                if rec["visible"]
-                else ((rec["schema"], rec["name"]), rec)
-            )
-            for rec in self._load_enums(
-                connection, schema="*", info_cache=kw.get("info_cache")
-            )
-        )
-
-        columns = self._get_columns_info(rows, domains, enums, schema)
-
-        return columns.items()
-
-    _format_type_args_pattern = re.compile(r"\((.*)\)")
-    _format_type_args_delim = re.compile(r"\s*,\s*")
-    _format_array_spec_pattern = re.compile(r"((?:\[\])*)$")
-
-    def _reflect_type(
-        self,
-        
format_type: Optional[str], - domains: dict[str, ReflectedDomain], - enums: dict[str, ReflectedEnum], - type_description: str, - ) -> sqltypes.TypeEngine[Any]: - """ - Attempts to reconstruct a column type defined in ischema_names based - on the information available in the format_type. - - If the `format_type` cannot be associated with a known `ischema_names`, - it is treated as a reference to a known PostgreSQL named `ENUM` or - `DOMAIN` type. - """ - type_description = type_description or "unknown type" - if format_type is None: - util.warn( - "PostgreSQL format_type() returned NULL for %s" - % type_description - ) - return sqltypes.NULLTYPE - - attype_args_match = self._format_type_args_pattern.search(format_type) - if attype_args_match and attype_args_match.group(1): - attype_args = self._format_type_args_delim.split( - attype_args_match.group(1) - ) - else: - attype_args = () - - match_array_dim = self._format_array_spec_pattern.search(format_type) - # Each "[]" in array specs corresponds to an array dimension - array_dim = len(match_array_dim.group(1) or "") // 2 - - # Remove all parameters and array specs from format_type to obtain an - # ischema_name candidate - attype = self._format_type_args_pattern.sub("", format_type) - attype = self._format_array_spec_pattern.sub("", attype) - - schema_type = self.ischema_names.get(attype.lower(), None) - args, kwargs = (), {} - - if attype == "numeric": - if len(attype_args) == 2: - precision, scale = map(int, attype_args) - args = (precision, scale) - - elif attype == "double precision": - args = (53,) - - elif attype == "integer": - args = () - - elif attype in ("timestamp with time zone", "time with time zone"): - kwargs["timezone"] = True - if len(attype_args) == 1: - kwargs["precision"] = int(attype_args[0]) - - elif attype in ( - "timestamp without time zone", - "time without time zone", - "time", - ): - kwargs["timezone"] = False - if len(attype_args) == 1: - kwargs["precision"] = int(attype_args[0]) - - elif attype == "bit varying": - kwargs["varying"] = True - if len(attype_args) == 1: - charlen = int(attype_args[0]) - args = (charlen,) - - elif attype.startswith("interval"): - schema_type = INTERVAL - - field_match = re.match(r"interval (.+)", attype) - if field_match: - kwargs["fields"] = field_match.group(1) - - if len(attype_args) == 1: - kwargs["precision"] = int(attype_args[0]) - - else: - enum_or_domain_key = tuple(util.quoted_token_parser(attype)) - - if enum_or_domain_key in enums: - schema_type = ENUM - enum = enums[enum_or_domain_key] - - args = tuple(enum["labels"]) - kwargs["name"] = enum["name"] - - if not enum["visible"]: - kwargs["schema"] = enum["schema"] - args = tuple(enum["labels"]) - elif enum_or_domain_key in domains: - schema_type = DOMAIN - domain = domains[enum_or_domain_key] - - data_type = self._reflect_type( - domain["type"], - domains, - enums, - type_description="DOMAIN '%s'" % domain["name"], - ) - args = (domain["name"], data_type) - - kwargs["collation"] = domain["collation"] - kwargs["default"] = domain["default"] - kwargs["not_null"] = not domain["nullable"] - kwargs["create_type"] = False - - if domain["constraints"]: - # We only support a single constraint - check_constraint = domain["constraints"][0] - - kwargs["constraint_name"] = check_constraint["name"] - kwargs["check"] = check_constraint["check"] - - if not domain["visible"]: - kwargs["schema"] = domain["schema"] - - else: - try: - charlen = int(attype_args[0]) - args = (charlen, *attype_args[1:]) - except (ValueError, IndexError): - args = 
attype_args - - if not schema_type: - util.warn( - "Did not recognize type '%s' of %s" - % (attype, type_description) - ) - return sqltypes.NULLTYPE - - data_type = schema_type(*args, **kwargs) - if array_dim >= 1: - # postgres does not preserve dimensionality or size of array types. - data_type = _array.ARRAY(data_type) - - return data_type - - def _get_columns_info(self, rows, domains, enums, schema): - columns = defaultdict(list) - for row_dict in rows: - # ensure that each table has an entry, even if it has no columns - if row_dict["name"] is None: - columns[(schema, row_dict["table_name"])] = ( - ReflectionDefaults.columns() - ) - continue - table_cols = columns[(schema, row_dict["table_name"])] - - coltype = self._reflect_type( - row_dict["format_type"], - domains, - enums, - type_description="column '%s'" % row_dict["name"], - ) - - default = row_dict["default"] - name = row_dict["name"] - generated = row_dict["generated"] - nullable = not row_dict["not_null"] - - if isinstance(coltype, DOMAIN): - if not default: - # domain can override the default value but - # can't set it to None - if coltype.default is not None: - default = coltype.default - - nullable = nullable and not coltype.not_null - - identity = row_dict["identity_options"] - - # A zero byte or blank string (which one depends on the driver; - # the value is also absent for older PG versions) means this is - # not a generated column. Otherwise, s = stored. (Other values - # might be added in the future.) - if generated not in (None, "", b"\x00"): - computed = dict( - sqltext=default, persisted=generated in ("s", b"s") - ) - default = None - else: - computed = None - - # adjust the default value - autoincrement = False - if default is not None: - match = re.search(r"""(nextval\(')([^']+)('.*$)""", default) - if match is not None: - if issubclass(coltype._type_affinity, sqltypes.Integer): - autoincrement = True - # the default is related to a Sequence - if "." not in match.group(2) and schema is not None: - # unconditionally quote the schema name. This could - # later be enhanced to obey quoting rules / - # "quote schema" - default = ( - match.group(1) - + ('"%s"' % schema) - + "." 
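-                            # [Editor's sketch, not part of the original
-                            # source] For example, a reflected default of
-                            #   nextval('id_seq'::regclass)
-                            # on a table in a hypothetical schema "tenant"
-                            # is rewritten by this expression to
-                            #   nextval('"tenant".id_seq'::regclass)
-                            # only the schema is quoted; the sequence name
-                            # is left as-is.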
- + match.group(2) - + match.group(3) - ) - - column_info = { - "name": name, - "type": coltype, - "nullable": nullable, - "default": default, - "autoincrement": autoincrement or identity is not None, - "comment": row_dict["comment"], - } - if computed is not None: - column_info["computed"] = computed - if identity is not None: - column_info["identity"] = identity - - table_cols.append(column_info) - - return columns - - @lru_cache() - def _table_oids_query(self, schema, has_filter_names, scope, kind): - relkinds = self._kind_to_relkinds(kind) - oid_q = select( - pg_catalog.pg_class.c.oid, pg_catalog.pg_class.c.relname - ).where(self._pg_class_relkind_condition(relkinds)) - oid_q = self._pg_class_filter_scope_schema(oid_q, schema, scope=scope) - - if has_filter_names: - oid_q = oid_q.where( - pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) - ) - return oid_q - - @reflection.flexi_cache( - ("schema", InternalTraversal.dp_string), - ("filter_names", InternalTraversal.dp_string_list), - ("kind", InternalTraversal.dp_plain_obj), - ("scope", InternalTraversal.dp_plain_obj), - ) - def _get_table_oids( - self, connection, schema, filter_names, scope, kind, **kw - ): - has_filter_names, params = self._prepare_filter_names(filter_names) - oid_q = self._table_oids_query(schema, has_filter_names, scope, kind) - result = connection.execute(oid_q, params) - return result.all() - - @lru_cache() - def _constraint_query(self, is_unique): - con_sq = ( - select( - pg_catalog.pg_constraint.c.conrelid, - pg_catalog.pg_constraint.c.conname, - pg_catalog.pg_constraint.c.conindid, - sql.func.unnest(pg_catalog.pg_constraint.c.conkey).label( - "attnum" - ), - sql.func.generate_subscripts( - pg_catalog.pg_constraint.c.conkey, 1 - ).label("ord"), - pg_catalog.pg_description.c.description, - ) - .outerjoin( - pg_catalog.pg_description, - pg_catalog.pg_description.c.objoid - == pg_catalog.pg_constraint.c.oid, - ) - .where( - pg_catalog.pg_constraint.c.contype == bindparam("contype"), - pg_catalog.pg_constraint.c.conrelid.in_(bindparam("oids")), - ) - .subquery("con") - ) - - attr_sq = ( - select( - con_sq.c.conrelid, - con_sq.c.conname, - con_sq.c.conindid, - con_sq.c.description, - con_sq.c.ord, - pg_catalog.pg_attribute.c.attname, - ) - .select_from(pg_catalog.pg_attribute) - .join( - con_sq, - sql.and_( - pg_catalog.pg_attribute.c.attnum == con_sq.c.attnum, - pg_catalog.pg_attribute.c.attrelid == con_sq.c.conrelid, - ), - ) - .where( - # NOTE: restate the condition here, since pg15 otherwise - # seems to get confused on psycopg2 sometimes, doing - # a sequential scan of pg_attribute. - # The condition in the con_sq subquery is not actually needed - # in pg15, but it may be needed in older versions. Keeping it - # does not seem to have any impact in any case. 
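-                # [Editor's illustration; assumed SQL shape, not emitted
-                # verbatim] With the predicate restated, the compiled
-                # subquery looks roughly like:
-                #   SELECT con.conrelid, con.conname, ..., pg_attribute.attname
-                #   FROM pg_attribute
-                #   JOIN (...) AS con ON pg_attribute.attnum = con.attnum
-                #       AND pg_attribute.attrelid = con.conrelid
-                #   WHERE con.conrelid IN (...)
-                # giving the planner an explicit filter to match against the
-                # pg_attribute indexes rather than falling back to a
-                # sequential scan.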
- con_sq.c.conrelid.in_(bindparam("oids")) - ) - .subquery("attr") - ) - - constraint_query = ( - select( - attr_sq.c.conrelid, - sql.func.array_agg( - # NOTE: cast since some postgresql derivatives may - # not support array_agg on the name type - aggregate_order_by( - attr_sq.c.attname.cast(TEXT), attr_sq.c.ord - ) - ).label("cols"), - attr_sq.c.conname, - sql.func.min(attr_sq.c.description).label("description"), - ) - .group_by(attr_sq.c.conrelid, attr_sq.c.conname) - .order_by(attr_sq.c.conrelid, attr_sq.c.conname) - ) - - if is_unique: - if self.server_version_info >= (15,): - constraint_query = constraint_query.join( - pg_catalog.pg_index, - attr_sq.c.conindid == pg_catalog.pg_index.c.indexrelid, - ).add_columns( - sql.func.bool_and( - pg_catalog.pg_index.c.indnullsnotdistinct - ).label("indnullsnotdistinct") - ) - else: - constraint_query = constraint_query.add_columns( - sql.false().label("indnullsnotdistinct") - ) - else: - constraint_query = constraint_query.add_columns( - sql.null().label("extra") - ) - return constraint_query - - def _reflect_constraint( - self, connection, contype, schema, filter_names, scope, kind, **kw - ): - # used to reflect primary and unique constraints - table_oids = self._get_table_oids( - connection, schema, filter_names, scope, kind, **kw - ) - batches = list(table_oids) - is_unique = contype == "u" - - while batches: - batch = batches[0:3000] - batches[0:3000] = [] - - result = connection.execute( - self._constraint_query(is_unique), - {"oids": [r[0] for r in batch], "contype": contype}, - ) - - result_by_oid = defaultdict(list) - for oid, cols, constraint_name, comment, extra in result: - result_by_oid[oid].append( - (cols, constraint_name, comment, extra) - ) - - for oid, tablename in batch: - for_oid = result_by_oid.get(oid, ()) - if for_oid: - for cols, constraint, comment, extra in for_oid: - if is_unique: - yield tablename, cols, constraint, comment, { - "nullsnotdistinct": extra - } - else: - yield tablename, cols, constraint, comment, None - else: - yield tablename, None, None, None, None - - @reflection.cache - def get_pk_constraint(self, connection, table_name, schema=None, **kw): - data = self.get_multi_pk_constraint( - connection, - schema=schema, - filter_names=[table_name], - scope=ObjectScope.ANY, - kind=ObjectKind.ANY, - **kw, - ) - return self._value_or_raise(data, table_name, schema) - - def get_multi_pk_constraint( - self, connection, schema, filter_names, scope, kind, **kw - ): - result = self._reflect_constraint( - connection, "p", schema, filter_names, scope, kind, **kw - ) - - # only a single pk can be present for each table. 
Return an entry - # even if a table has no primary key - default = ReflectionDefaults.pk_constraint - return ( - ( - (schema, table_name), - ( - { - "constrained_columns": [] if cols is None else cols, - "name": pk_name, - "comment": comment, - } - if pk_name is not None - else default() - ), - ) - for table_name, cols, pk_name, comment, _ in result - ) - - @reflection.cache - def get_foreign_keys( - self, - connection, - table_name, - schema=None, - postgresql_ignore_search_path=False, - **kw, - ): - data = self.get_multi_foreign_keys( - connection, - schema=schema, - filter_names=[table_name], - postgresql_ignore_search_path=postgresql_ignore_search_path, - scope=ObjectScope.ANY, - kind=ObjectKind.ANY, - **kw, - ) - return self._value_or_raise(data, table_name, schema) - - @lru_cache() - def _foreing_key_query(self, schema, has_filter_names, scope, kind): - pg_class_ref = pg_catalog.pg_class.alias("cls_ref") - pg_namespace_ref = pg_catalog.pg_namespace.alias("nsp_ref") - relkinds = self._kind_to_relkinds(kind) - query = ( - select( - pg_catalog.pg_class.c.relname, - pg_catalog.pg_constraint.c.conname, - # NOTE: avoid calling pg_get_constraintdef when not needed - # to speed up the query - sql.case( - ( - pg_catalog.pg_constraint.c.oid.is_not(None), - pg_catalog.pg_get_constraintdef( - pg_catalog.pg_constraint.c.oid, True - ), - ), - else_=None, - ), - pg_namespace_ref.c.nspname, - pg_catalog.pg_description.c.description, - ) - .select_from(pg_catalog.pg_class) - .outerjoin( - pg_catalog.pg_constraint, - sql.and_( - pg_catalog.pg_class.c.oid - == pg_catalog.pg_constraint.c.conrelid, - pg_catalog.pg_constraint.c.contype == "f", - ), - ) - .outerjoin( - pg_class_ref, - pg_class_ref.c.oid == pg_catalog.pg_constraint.c.confrelid, - ) - .outerjoin( - pg_namespace_ref, - pg_class_ref.c.relnamespace == pg_namespace_ref.c.oid, - ) - .outerjoin( - pg_catalog.pg_description, - pg_catalog.pg_description.c.objoid - == pg_catalog.pg_constraint.c.oid, - ) - .order_by( - pg_catalog.pg_class.c.relname, - pg_catalog.pg_constraint.c.conname, - ) - .where(self._pg_class_relkind_condition(relkinds)) - ) - query = self._pg_class_filter_scope_schema(query, schema, scope) - if has_filter_names: - query = query.where( - pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) - ) - return query - - @util.memoized_property - def _fk_regex_pattern(self): - # optionally quoted token - qtoken = '(?:"[^"]+"|[A-Za-z0-9_]+?)' - - # https://www.postgresql.org/docs/current/static/sql-createtable.html - return re.compile( - r"FOREIGN KEY \((.*?)\) " - rf"REFERENCES (?:({qtoken})\.)?({qtoken})\(((?:{qtoken}(?: *, *)?)+)\)" # noqa: E501 - r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?" - r"[\s]?(ON UPDATE " - r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?" - r"[\s]?(ON DELETE " - r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?" - r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?" - r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?" 
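-            # [Editor's note; names below are made up for illustration] a
-            # typical pg_get_constraintdef() string this pattern matches:
-            #   FOREIGN KEY (user_id) REFERENCES "auth".users(id)
-            #   ON UPDATE CASCADE ON DELETE SET NULL
-            #   DEFERRABLE INITIALLY DEFERRED
-            # group 1 captures the constrained columns, groups 2/3 the
-            # optional schema and the table, group 4 the referred columns;
-            # the remaining groups capture the MATCH, ON UPDATE/ON DELETE
-            # and deferrability options.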
- ) - - def get_multi_foreign_keys( - self, - connection, - schema, - filter_names, - scope, - kind, - postgresql_ignore_search_path=False, - **kw, - ): - preparer = self.identifier_preparer - - has_filter_names, params = self._prepare_filter_names(filter_names) - query = self._foreing_key_query(schema, has_filter_names, scope, kind) - result = connection.execute(query, params) - - FK_REGEX = self._fk_regex_pattern - - fkeys = defaultdict(list) - default = ReflectionDefaults.foreign_keys - for table_name, conname, condef, conschema, comment in result: - # ensure that each table has an entry, even if it has - # no foreign keys - if conname is None: - fkeys[(schema, table_name)] = default() - continue - table_fks = fkeys[(schema, table_name)] - m = re.search(FK_REGEX, condef).groups() - - ( - constrained_columns, - referred_schema, - referred_table, - referred_columns, - _, - match, - _, - onupdate, - _, - ondelete, - deferrable, - _, - initially, - ) = m - - if deferrable is not None: - deferrable = True if deferrable == "DEFERRABLE" else False - constrained_columns = [ - preparer._unquote_identifier(x) - for x in re.split(r"\s*,\s*", constrained_columns) - ] - - if postgresql_ignore_search_path: - # when ignoring search path, we use the actual schema - # provided it isn't the "default" schema - if conschema != self.default_schema_name: - referred_schema = conschema - else: - referred_schema = schema - elif referred_schema: - # referred_schema is the schema that we regexp'ed from - # pg_get_constraintdef(). If the schema is in the search - # path, pg_get_constraintdef() will give us None. - referred_schema = preparer._unquote_identifier(referred_schema) - elif schema is not None and schema == conschema: - # If the actual schema matches the schema of the table - # we're reflecting, then we will use that. 
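-                # [Editor's summary of this branch; hypothetical example]
-                # reflecting Table("orders", schema="tenant") whose FK
-                # targets a table in that same schema: the schema is on the
-                # search path, so pg_get_constraintdef() omits it and
-                # referred_schema comes back None; since conschema equals
-                # the schema being reflected, reuse it below so the
-                # reflected FK stays schema-qualified consistently with its
-                # table.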
- referred_schema = schema - - referred_table = preparer._unquote_identifier(referred_table) - referred_columns = [ - preparer._unquote_identifier(x) - for x in re.split(r"\s*,\s*", referred_columns) - ] - options = { - k: v - for k, v in [ - ("onupdate", onupdate), - ("ondelete", ondelete), - ("initially", initially), - ("deferrable", deferrable), - ("match", match), - ] - if v is not None and v != "NO ACTION" - } - fkey_d = { - "name": conname, - "constrained_columns": constrained_columns, - "referred_schema": referred_schema, - "referred_table": referred_table, - "referred_columns": referred_columns, - "options": options, - "comment": comment, - } - table_fks.append(fkey_d) - return fkeys.items() - - @reflection.cache - def get_indexes(self, connection, table_name, schema=None, **kw): - data = self.get_multi_indexes( - connection, - schema=schema, - filter_names=[table_name], - scope=ObjectScope.ANY, - kind=ObjectKind.ANY, - **kw, - ) - return self._value_or_raise(data, table_name, schema) - - @util.memoized_property - def _index_query(self): - pg_class_index = pg_catalog.pg_class.alias("cls_idx") - # NOTE: repeating the oids clause improves query performance - - # subquery to get the columns - idx_sq = ( - select( - pg_catalog.pg_index.c.indexrelid, - pg_catalog.pg_index.c.indrelid, - sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"), - sql.func.generate_subscripts( - pg_catalog.pg_index.c.indkey, 1 - ).label("ord"), - ) - .where( - ~pg_catalog.pg_index.c.indisprimary, - pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")), - ) - .subquery("idx") - ) - - attr_sq = ( - select( - idx_sq.c.indexrelid, - idx_sq.c.indrelid, - idx_sq.c.ord, - # NOTE: always using pg_get_indexdef is too slow, so it is - # invoked only when the element is an expression - sql.case( - ( - idx_sq.c.attnum == 0, - pg_catalog.pg_get_indexdef( - idx_sq.c.indexrelid, idx_sq.c.ord + 1, True - ), - ), - # NOTE: need to cast this since attname is of type "name", - # which is limited to 63 bytes, while pg_get_indexdef - # returns "text" so its output may get cut - else_=pg_catalog.pg_attribute.c.attname.cast(TEXT), - ).label("element"), - (idx_sq.c.attnum == 0).label("is_expr"), - ) - .select_from(idx_sq) - .outerjoin( - # do not remove rows where idx_sq.c.attnum is 0 - pg_catalog.pg_attribute, - sql.and_( - pg_catalog.pg_attribute.c.attnum == idx_sq.c.attnum, - pg_catalog.pg_attribute.c.attrelid == idx_sq.c.indrelid, - ), - ) - .where(idx_sq.c.indrelid.in_(bindparam("oids"))) - .subquery("idx_attr") - ) - - cols_sq = ( - select( - attr_sq.c.indexrelid, - sql.func.min(attr_sq.c.indrelid), - sql.func.array_agg( - aggregate_order_by(attr_sq.c.element, attr_sq.c.ord) - ).label("elements"), - sql.func.array_agg( - aggregate_order_by(attr_sq.c.is_expr, attr_sq.c.ord) - ).label("elements_is_expr"), - ) - .group_by(attr_sq.c.indexrelid) - .subquery("idx_cols") - ) - - if self.server_version_info >= (11, 0): - indnkeyatts = pg_catalog.pg_index.c.indnkeyatts - else: - indnkeyatts = sql.null().label("indnkeyatts") - - if self.server_version_info >= (15,): - nulls_not_distinct = pg_catalog.pg_index.c.indnullsnotdistinct - else: - nulls_not_distinct = sql.false().label("indnullsnotdistinct") - - return ( - select( - pg_catalog.pg_index.c.indrelid, - pg_class_index.c.relname.label("relname_index"), - pg_catalog.pg_index.c.indisunique, - pg_catalog.pg_constraint.c.conrelid.is_not(None).label( - "has_constraint" - ), - pg_catalog.pg_index.c.indoption, - pg_class_index.c.reloptions, - pg_catalog.pg_am.c.amname, - # NOTE: pg_get_expr is very 
fast so this case has almost no - # performance impact - sql.case( - ( - pg_catalog.pg_index.c.indpred.is_not(None), - pg_catalog.pg_get_expr( - pg_catalog.pg_index.c.indpred, - pg_catalog.pg_index.c.indrelid, - ), - ), - else_=None, - ).label("filter_definition"), - indnkeyatts, - nulls_not_distinct, - cols_sq.c.elements, - cols_sq.c.elements_is_expr, - ) - .select_from(pg_catalog.pg_index) - .where( - pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")), - ~pg_catalog.pg_index.c.indisprimary, - ) - .join( - pg_class_index, - pg_catalog.pg_index.c.indexrelid == pg_class_index.c.oid, - ) - .join( - pg_catalog.pg_am, - pg_class_index.c.relam == pg_catalog.pg_am.c.oid, - ) - .outerjoin( - cols_sq, - pg_catalog.pg_index.c.indexrelid == cols_sq.c.indexrelid, - ) - .outerjoin( - pg_catalog.pg_constraint, - sql.and_( - pg_catalog.pg_index.c.indrelid - == pg_catalog.pg_constraint.c.conrelid, - pg_catalog.pg_index.c.indexrelid - == pg_catalog.pg_constraint.c.conindid, - pg_catalog.pg_constraint.c.contype - == sql.any_(_array.array(("p", "u", "x"))), - ), - ) - .order_by(pg_catalog.pg_index.c.indrelid, pg_class_index.c.relname) - ) - - def get_multi_indexes( - self, connection, schema, filter_names, scope, kind, **kw - ): - table_oids = self._get_table_oids( - connection, schema, filter_names, scope, kind, **kw - ) - - indexes = defaultdict(list) - default = ReflectionDefaults.indexes - - batches = list(table_oids) - - while batches: - batch = batches[0:3000] - batches[0:3000] = [] - - result = connection.execute( - self._index_query, {"oids": [r[0] for r in batch]} - ).mappings() - - result_by_oid = defaultdict(list) - for row_dict in result: - result_by_oid[row_dict["indrelid"]].append(row_dict) - - for oid, table_name in batch: - if oid not in result_by_oid: - # ensure that each table has an entry, even if reflection - # is skipped because not supported - indexes[(schema, table_name)] = default() - continue - - for row in result_by_oid[oid]: - index_name = row["relname_index"] - - table_indexes = indexes[(schema, table_name)] - - all_elements = row["elements"] - all_elements_is_expr = row["elements_is_expr"] - indnkeyatts = row["indnkeyatts"] - # "The number of key columns in the index, not counting any - # included columns, which are merely stored and do not - # participate in the index semantics" - if indnkeyatts and len(all_elements) > indnkeyatts: - # this is a "covering index" which has INCLUDE columns - # as well as regular index columns - inc_cols = all_elements[indnkeyatts:] - idx_elements = all_elements[:indnkeyatts] - idx_elements_is_expr = all_elements_is_expr[ - :indnkeyatts - ] - # postgresql does not support expression on included - # columns as of v14: "ERROR: expressions are not - # supported in included columns". - assert all( - not is_expr - for is_expr in all_elements_is_expr[indnkeyatts:] - ) - else: - idx_elements = all_elements - idx_elements_is_expr = all_elements_is_expr - inc_cols = [] - - index = {"name": index_name, "unique": row["indisunique"]} - if any(idx_elements_is_expr): - index["column_names"] = [ - None if is_expr else expr - for expr, is_expr in zip( - idx_elements, idx_elements_is_expr - ) - ] - index["expressions"] = idx_elements - else: - index["column_names"] = idx_elements - - sorting = {} - for col_index, col_flags in enumerate(row["indoption"]): - col_sorting = () - # try to set flags only if they differ from PG - # defaults... 
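-                        # [Editor's note; bit semantics from
-                        # pg_index.indoption] 0x01 is INDOPTION_DESC and
-                        # 0x02 is INDOPTION_NULLS_FIRST. PG defaults to
-                        # ASC NULLS LAST, and to NULLS FIRST for DESC
-                        # columns, hence the asymmetric checks below: e.g.
-                        # col_flags == 3 (DESC, NULLS FIRST) yields
-                        # ("desc",), while col_flags == 1 (DESC, NULLS
-                        # LAST) yields ("desc", "nulls_last").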
- if col_flags & 0x01: - col_sorting += ("desc",) - if not (col_flags & 0x02): - col_sorting += ("nulls_last",) - else: - if col_flags & 0x02: - col_sorting += ("nulls_first",) - if col_sorting: - sorting[idx_elements[col_index]] = col_sorting - if sorting: - index["column_sorting"] = sorting - if row["has_constraint"]: - index["duplicates_constraint"] = index_name - - dialect_options = {} - if row["reloptions"]: - dialect_options["postgresql_with"] = dict( - [option.split("=") for option in row["reloptions"]] - ) - # it *might* be nice to include that this is 'btree' in the - # reflection info. But we don't want an Index object - # to have a ``postgresql_using`` in it that is just the - # default, so for the moment leaving this out. - amname = row["amname"] - if amname != "btree": - dialect_options["postgresql_using"] = row["amname"] - if row["filter_definition"]: - dialect_options["postgresql_where"] = row[ - "filter_definition" - ] - if self.server_version_info >= (11,): - # NOTE: this is legacy, this is part of - # dialect_options now as of #7382 - index["include_columns"] = inc_cols - dialect_options["postgresql_include"] = inc_cols - if row["indnullsnotdistinct"]: - # the default is False, so ignore it. - dialect_options["postgresql_nulls_not_distinct"] = row[ - "indnullsnotdistinct" - ] - - if dialect_options: - index["dialect_options"] = dialect_options - - table_indexes.append(index) - return indexes.items() - - @reflection.cache - def get_unique_constraints( - self, connection, table_name, schema=None, **kw - ): - data = self.get_multi_unique_constraints( - connection, - schema=schema, - filter_names=[table_name], - scope=ObjectScope.ANY, - kind=ObjectKind.ANY, - **kw, - ) - return self._value_or_raise(data, table_name, schema) - - def get_multi_unique_constraints( - self, - connection, - schema, - filter_names, - scope, - kind, - **kw, - ): - result = self._reflect_constraint( - connection, "u", schema, filter_names, scope, kind, **kw - ) - - # each table can have multiple unique constraints - uniques = defaultdict(list) - default = ReflectionDefaults.unique_constraints - for table_name, cols, con_name, comment, options in result: - # ensure a list is created for each table. 
leave it empty if - # the table has no unique constraint - if con_name is None: - uniques[(schema, table_name)] = default() - continue - - uc_dict = { - "column_names": cols, - "name": con_name, - "comment": comment, - } - if options: - if options["nullsnotdistinct"]: - uc_dict["dialect_options"] = { - "postgresql_nulls_not_distinct": options[ - "nullsnotdistinct" - ] - } - - uniques[(schema, table_name)].append(uc_dict) - return uniques.items() - - @reflection.cache - def get_table_comment(self, connection, table_name, schema=None, **kw): - data = self.get_multi_table_comment( - connection, - schema, - [table_name], - scope=ObjectScope.ANY, - kind=ObjectKind.ANY, - **kw, - ) - return self._value_or_raise(data, table_name, schema) - - @lru_cache() - def _comment_query(self, schema, has_filter_names, scope, kind): - relkinds = self._kind_to_relkinds(kind) - query = ( - select( - pg_catalog.pg_class.c.relname, - pg_catalog.pg_description.c.description, - ) - .select_from(pg_catalog.pg_class) - .outerjoin( - pg_catalog.pg_description, - sql.and_( - pg_catalog.pg_class.c.oid - == pg_catalog.pg_description.c.objoid, - pg_catalog.pg_description.c.objsubid == 0, - ), - ) - .where(self._pg_class_relkind_condition(relkinds)) - ) - query = self._pg_class_filter_scope_schema(query, schema, scope) - if has_filter_names: - query = query.where( - pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) - ) - return query - - def get_multi_table_comment( - self, connection, schema, filter_names, scope, kind, **kw - ): - has_filter_names, params = self._prepare_filter_names(filter_names) - query = self._comment_query(schema, has_filter_names, scope, kind) - result = connection.execute(query, params) - - default = ReflectionDefaults.table_comment - return ( - ( - (schema, table), - {"text": comment} if comment is not None else default(), - ) - for table, comment in result - ) - - @reflection.cache - def get_check_constraints(self, connection, table_name, schema=None, **kw): - data = self.get_multi_check_constraints( - connection, - schema, - [table_name], - scope=ObjectScope.ANY, - kind=ObjectKind.ANY, - **kw, - ) - return self._value_or_raise(data, table_name, schema) - - @lru_cache() - def _check_constraint_query(self, schema, has_filter_names, scope, kind): - relkinds = self._kind_to_relkinds(kind) - query = ( - select( - pg_catalog.pg_class.c.relname, - pg_catalog.pg_constraint.c.conname, - # NOTE: avoid calling pg_get_constraintdef when not needed - # to speed up the query - sql.case( - ( - pg_catalog.pg_constraint.c.oid.is_not(None), - pg_catalog.pg_get_constraintdef( - pg_catalog.pg_constraint.c.oid, True - ), - ), - else_=None, - ), - pg_catalog.pg_description.c.description, - ) - .select_from(pg_catalog.pg_class) - .outerjoin( - pg_catalog.pg_constraint, - sql.and_( - pg_catalog.pg_class.c.oid - == pg_catalog.pg_constraint.c.conrelid, - pg_catalog.pg_constraint.c.contype == "c", - ), - ) - .outerjoin( - pg_catalog.pg_description, - pg_catalog.pg_description.c.objoid - == pg_catalog.pg_constraint.c.oid, - ) - .order_by( - pg_catalog.pg_class.c.relname, - pg_catalog.pg_constraint.c.conname, - ) - .where(self._pg_class_relkind_condition(relkinds)) - ) - query = self._pg_class_filter_scope_schema(query, schema, scope) - if has_filter_names: - query = query.where( - pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) - ) - return query - - def get_multi_check_constraints( - self, connection, schema, filter_names, scope, kind, **kw - ): - has_filter_names, params = 
self._prepare_filter_names(filter_names) - query = self._check_constraint_query( - schema, has_filter_names, scope, kind - ) - result = connection.execute(query, params) - - check_constraints = defaultdict(list) - default = ReflectionDefaults.check_constraints - for table_name, check_name, src, comment in result: - # only two cases for check_name and src: both null or both defined - if check_name is None and src is None: - check_constraints[(schema, table_name)] = default() - continue - # samples: - # "CHECK (((a > 1) AND (a < 5)))" - # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))" - # "CHECK (((a > 1) AND (a < 5))) NOT VALID" - # "CHECK (some_boolean_function(a))" - # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)" - # "CHECK (a NOT NULL) NO INHERIT" - # "CHECK (a NOT NULL) NO INHERIT NOT VALID" - - m = re.match( - r"^CHECK *\((.+)\)( NO INHERIT)?( NOT VALID)?$", - src, - flags=re.DOTALL, - ) - if not m: - util.warn("Could not parse CHECK constraint text: %r" % src) - sqltext = "" - else: - sqltext = re.compile( - r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL - ).sub(r"\1", m.group(1)) - entry = { - "name": check_name, - "sqltext": sqltext, - "comment": comment, - } - if m: - do = {} - if " NOT VALID" in m.groups(): - do["not_valid"] = True - if " NO INHERIT" in m.groups(): - do["no_inherit"] = True - if do: - entry["dialect_options"] = do - - check_constraints[(schema, table_name)].append(entry) - return check_constraints.items() - - def _pg_type_filter_schema(self, query, schema): - if schema is None: - query = query.where( - pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid), - # ignore pg_catalog schema - pg_catalog.pg_namespace.c.nspname != "pg_catalog", - ) - elif schema != "*": - query = query.where(pg_catalog.pg_namespace.c.nspname == schema) - return query - - @lru_cache() - def _enum_query(self, schema): - lbl_agg_sq = ( - select( - pg_catalog.pg_enum.c.enumtypid, - sql.func.array_agg( - aggregate_order_by( - # NOTE: cast since some postgresql derivatives may - # not support array_agg on the name type - pg_catalog.pg_enum.c.enumlabel.cast(TEXT), - pg_catalog.pg_enum.c.enumsortorder, - ) - ).label("labels"), - ) - .group_by(pg_catalog.pg_enum.c.enumtypid) - .subquery("lbl_agg") - ) - - query = ( - select( - pg_catalog.pg_type.c.typname.label("name"), - pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label( - "visible" - ), - pg_catalog.pg_namespace.c.nspname.label("schema"), - lbl_agg_sq.c.labels.label("labels"), - ) - .join( - pg_catalog.pg_namespace, - pg_catalog.pg_namespace.c.oid - == pg_catalog.pg_type.c.typnamespace, - ) - .outerjoin( - lbl_agg_sq, pg_catalog.pg_type.c.oid == lbl_agg_sq.c.enumtypid - ) - .where(pg_catalog.pg_type.c.typtype == "e") - .order_by( - pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname - ) - ) - - return self._pg_type_filter_schema(query, schema) - - @reflection.cache - def _load_enums(self, connection, schema=None, **kw): - if not self.supports_native_enum: - return [] - - result = connection.execute(self._enum_query(schema)) - - enums = [] - for name, visible, schema, labels in result: - enums.append( - { - "name": name, - "schema": schema, - "visible": visible, - "labels": [] if labels is None else labels, - } - ) - return enums - - @lru_cache() - def _domain_query(self, schema): - con_sq = ( - select( - pg_catalog.pg_constraint.c.contypid, - sql.func.array_agg( - pg_catalog.pg_get_constraintdef( - pg_catalog.pg_constraint.c.oid, True - ) - ).label("condefs"), - sql.func.array_agg( - # NOTE: cast since some postgresql 
derivatives may - # not support array_agg on the name type - pg_catalog.pg_constraint.c.conname.cast(TEXT) - ).label("connames"), - ) - # The domain this constraint is on; zero if not a domain constraint - .where(pg_catalog.pg_constraint.c.contypid != 0) - .group_by(pg_catalog.pg_constraint.c.contypid) - .subquery("domain_constraints") - ) - - query = ( - select( - pg_catalog.pg_type.c.typname.label("name"), - pg_catalog.format_type( - pg_catalog.pg_type.c.typbasetype, - pg_catalog.pg_type.c.typtypmod, - ).label("attype"), - (~pg_catalog.pg_type.c.typnotnull).label("nullable"), - pg_catalog.pg_type.c.typdefault.label("default"), - pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label( - "visible" - ), - pg_catalog.pg_namespace.c.nspname.label("schema"), - con_sq.c.condefs, - con_sq.c.connames, - pg_catalog.pg_collation.c.collname, - ) - .join( - pg_catalog.pg_namespace, - pg_catalog.pg_namespace.c.oid - == pg_catalog.pg_type.c.typnamespace, - ) - .outerjoin( - pg_catalog.pg_collation, - pg_catalog.pg_type.c.typcollation - == pg_catalog.pg_collation.c.oid, - ) - .outerjoin( - con_sq, - pg_catalog.pg_type.c.oid == con_sq.c.contypid, - ) - .where(pg_catalog.pg_type.c.typtype == "d") - .order_by( - pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname - ) - ) - return self._pg_type_filter_schema(query, schema) - - @reflection.cache - def _load_domains(self, connection, schema=None, **kw): - result = connection.execute(self._domain_query(schema)) - - domains: List[ReflectedDomain] = [] - for domain in result.mappings(): - # strip (30) from character varying(30) - attype = re.search(r"([^\(]+)", domain["attype"]).group(1) - constraints: List[ReflectedDomainConstraint] = [] - if domain["connames"]: - # When a domain has multiple CHECK constraints, they will - # be tested in alphabetical order by name. - sorted_constraints = sorted( - zip(domain["connames"], domain["condefs"]), - key=lambda t: t[0], - ) - for name, def_ in sorted_constraints: - # constraint is in the form "CHECK (expression)". - # remove "CHECK (" and the trailing ")". - check = def_[7:-1] - constraints.append({"name": name, "check": check}) - - domain_rec: ReflectedDomain = { - "name": domain["name"], - "schema": domain["schema"], - "visible": domain["visible"], - "type": attype, - "nullable": domain["nullable"], - "default": domain["default"], - "constraints": constraints, - "collation": domain["collname"], - } - domains.append(domain_rec) - - return domains - - def _set_backslash_escapes(self, connection): - # this method is provided as an override hook for descendant - # dialects (e.g. Redshift), so removing it may break them - std_string = connection.exec_driver_sql( - "show standard_conforming_strings" - ).scalar() - self._backslash_escapes = std_string == "off"
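-
-# [Editor's usage sketch, not part of the original module] The private
-# reflection helpers above are reached through the public Inspector API;
-# the connection URL and table name here are hypothetical:
-#
-#   from sqlalchemy import create_engine, inspect
-#
-#   engine = create_engine(
-#       "postgresql+psycopg2://scott:tiger@localhost/test"
-#   )
-#   insp = inspect(engine)
-#   insp.get_columns("data")            # -> _get_columns_info()
-#   insp.get_indexes("data")            # -> get_multi_indexes()
-#   insp.get_foreign_keys("data")       # -> get_multi_foreign_keys()
-#   insp.get_check_constraints("data")  # -> get_multi_check_constraints()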