# dialects/postgresql/base.py
# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
#
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors

r"""
.. dialect:: postgresql
    :name: PostgreSQL
    :full_support: 12, 13, 14, 15
    :normal_support: 9.6+
    :best_effort: 9+

.. _postgresql_sequences:

Sequences/SERIAL/IDENTITY
-------------------------

PostgreSQL supports sequences, and SQLAlchemy uses these as the default means
of creating new primary key values for integer-based primary key columns. When
creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for
integer-based primary key columns, which generates a sequence and server side
default corresponding to the column.

To specify a specific named sequence to be used for primary key generation,
use the :func:`~sqlalchemy.schema.Sequence` construct::

    Table(
        "sometable",
        metadata,
        Column(
            "id", Integer, Sequence("some_id_seq", start=1), primary_key=True
        )
    )

When SQLAlchemy issues a single INSERT statement, to fulfill the contract of
having the "last insert identifier" available, a RETURNING clause is added to
the INSERT statement which specifies the primary key columns should be
returned after the statement completes. The RETURNING functionality only takes
place if PostgreSQL 8.2 or later is in use. As a fallback approach, the
sequence, whether specified explicitly or implicitly via ``SERIAL``, is
executed independently beforehand, the returned value to be used in the
subsequent insert.

Note that when an :func:`~sqlalchemy.sql.expression.insert()` construct is
executed using "executemany" semantics, the "last inserted identifier"
functionality does not apply; no RETURNING clause is emitted nor is the
sequence pre-executed in this case.

PostgreSQL 10 and above IDENTITY columns
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use
of SERIAL. The :class:`_schema.Identity` construct in a
:class:`_schema.Column` can be used to control its behavior::

    from sqlalchemy import Table, Column, MetaData, Integer, String, Identity

    metadata = MetaData()

    data = Table(
        "data",
        metadata,
        Column(
            'id', Integer, Identity(start=42, cycle=True), primary_key=True
        ),
        Column('data', String)
    )

The CREATE TABLE for the above :class:`_schema.Table` object would be:

.. sourcecode:: sql

    CREATE TABLE data (
        id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE),
        data VARCHAR,
        PRIMARY KEY (id)
    )

.. versionchanged:: 1.4   Added the :class:`_schema.Identity` construct
   in a :class:`_schema.Column` to specify the option of an autoincrementing
   column.

.. note::

   Previous versions of SQLAlchemy did not have built-in support for rendering
   of IDENTITY, and could use the following compilation hook to replace
   occurrences of SERIAL with IDENTITY::

       from sqlalchemy.schema import CreateColumn
       from sqlalchemy.ext.compiler import compiles


       @compiles(CreateColumn, 'postgresql')
       def use_identity(element, compiler, **kw):
           text = compiler.visit_create_column(element, **kw)
           text = text.replace(
               "SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY"
           )
           return text

   Using the above, a table such as::

       t = Table(
           't', m,
           Column('id', Integer, primary_key=True),
           Column('data', String)
       )

   will generate on the backing database as::

       CREATE TABLE t (
           id INT GENERATED BY DEFAULT AS IDENTITY,
           data VARCHAR,
           PRIMARY KEY (id)
       )

..
_postgresql_ss_cursors: Server Side Cursors ------------------- Server-side cursor support is available for the psycopg2, asyncpg dialects and may also be available in others. Server side cursors are enabled on a per-statement basis by using the :paramref:`.Connection.execution_options.stream_results` connection execution option:: with engine.connect() as conn: result = conn.execution_options(stream_results=True).execute(text("select * from table")) Note that some kinds of SQL statements may not be supported with server side cursors; generally, only SQL statements that return rows should be used with this option. .. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated and will be removed in a future release. Please use the :paramref:`_engine.Connection.stream_results` execution option for unbuffered cursor support. .. seealso:: :ref:`engine_stream_results` .. _postgresql_isolation_level: Transaction Isolation Level --------------------------- Most SQLAlchemy dialects support setting of transaction isolation level using the :paramref:`_sa.create_engine.isolation_level` parameter at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection` level via the :paramref:`.Connection.execution_options.isolation_level` parameter. For PostgreSQL dialects, this feature works either by making use of the DBAPI-specific features, such as psycopg2's isolation level flags which will embed the isolation level setting inline with the ``"BEGIN"`` statement, or for DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL `` ahead of the ``"BEGIN"`` statement emitted by the DBAPI. For the special AUTOCOMMIT isolation level, DBAPI-specific techniques are used which is typically an ``.autocommit`` flag on the DBAPI connection object. To set isolation level using :func:`_sa.create_engine`:: engine = create_engine( "postgresql+pg8000://scott:tiger@localhost/test", isolation_level = "REPEATABLE READ" ) To set using per-connection execution options:: with engine.connect() as conn: conn = conn.execution_options( isolation_level="REPEATABLE READ" ) with conn.begin(): # ... work with transaction There are also more options for isolation level configurations, such as "sub-engine" objects linked to a main :class:`_engine.Engine` which each apply different isolation level settings. See the discussion at :ref:`dbapi_autocommit` for background. Valid values for ``isolation_level`` on most PostgreSQL dialects include: * ``READ COMMITTED`` * ``READ UNCOMMITTED`` * ``REPEATABLE READ`` * ``SERIALIZABLE`` * ``AUTOCOMMIT`` .. seealso:: :ref:`dbapi_autocommit` :ref:`postgresql_readonly_deferrable` :ref:`psycopg2_isolation_level` :ref:`pg8000_isolation_level` .. _postgresql_readonly_deferrable: Setting READ ONLY / DEFERRABLE ------------------------------ Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE" characteristics of the transaction, which is in addition to the isolation level setting. These two attributes can be established either in conjunction with or independently of the isolation level by passing the ``postgresql_readonly`` and ``postgresql_deferrable`` flags with :meth:`_engine.Connection.execution_options`. The example below illustrates passing the ``"SERIALIZABLE"`` isolation level at the same time as setting "READ ONLY" and "DEFERRABLE":: with engine.connect() as conn: conn = conn.execution_options( isolation_level="SERIALIZABLE", postgresql_readonly=True, postgresql_deferrable=True ) with conn.begin(): # ... 
work with transaction

Note that some DBAPIs such as asyncpg only support "readonly" with
SERIALIZABLE isolation.

.. versionadded:: 1.4   added support for the ``postgresql_readonly``
   and ``postgresql_deferrable`` execution options.

.. _postgresql_reset_on_return:

Temporary Table / Resource Reset for Connection Pooling
--------------------------------------------------------

The :class:`.QueuePool` connection pool implementation used
by the SQLAlchemy :class:`.Engine` object includes
:ref:`reset on return <pool_reset_on_return>` behavior that will invoke
the DBAPI ``.rollback()`` method when connections are returned to the pool.
While this rollback will clear out the immediate state used by the previous
transaction, it does not cover a wider range of session-level state, including
temporary tables as well as other server state such as prepared statement
handles and statement caches.  The PostgreSQL database includes a variety
of commands which may be used to reset this state, including ``DISCARD``,
``RESET``, ``DEALLOCATE``, and ``UNLISTEN``.

To install one or more of these commands as the means of performing
reset-on-return, the :meth:`.PoolEvents.reset` event hook may be used, as
demonstrated in the example below. The implementation will end transactions
in progress as well as discard temporary tables using the ``CLOSE``,
``RESET`` and ``DISCARD`` commands; see the PostgreSQL documentation for
background on what each of these statements does.

The :paramref:`_sa.create_engine.pool_reset_on_return` parameter is set to
``None`` so that the custom scheme can replace the default behavior
completely.  The custom hook implementation calls ``.rollback()`` in any
case, as it's usually important that the DBAPI's own tracking of
commit/rollback will remain consistent with the state of the transaction::

    from sqlalchemy import create_engine
    from sqlalchemy import event

    postgresql_engine = create_engine(
        "postgresql+psycopg2://scott:tiger@hostname/dbname",

        # disable default reset-on-return scheme
        pool_reset_on_return=None,
    )


    @event.listens_for(postgresql_engine, "reset")
    def _reset_postgresql(dbapi_connection, connection_record, reset_state):
        if not reset_state.terminate_only:
            dbapi_connection.execute("CLOSE ALL")
            dbapi_connection.execute("RESET ALL")
            dbapi_connection.execute("DISCARD TEMP")

        # so that the DBAPI itself knows that the connection has been
        # reset
        dbapi_connection.rollback()

.. versionchanged:: 2.0.0b3  Added additional state arguments to
   the :meth:`.PoolEvents.reset` event and ensured the event is invoked
   for all "reset" occurrences, so that it's appropriate as a place for
   custom "reset" handlers.  Previous schemes which use the
   :meth:`.PoolEvents.checkin` handler remain usable as well.

.. seealso::

    :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation

.. _postgresql_alternate_search_path:

Setting Alternate Search Paths on Connect
------------------------------------------

The PostgreSQL ``search_path`` variable refers to the list of schema names
that will be implicitly referenced when a particular table or other
object is referenced in a SQL statement.
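For reference, the value of ``search_path`` in effect on a particular
connection may be inspected with a plain query; the following is a minimal
sketch using :meth:`_engine.Connection.exec_driver_sql` (``SHOW search_path``
is a PostgreSQL command)::

    with engine.connect() as conn:
        # returns e.g. '"$user", public' on a default installation
        print(conn.exec_driver_sql("SHOW search_path").scalar())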
As detailed in the next section :ref:`postgresql_schema_reflection`,
SQLAlchemy is generally organized around the concept of keeping this
variable at its default value of ``public``; however, in order to have it
automatically set to an arbitrary name or names for all connections, the
"SET SESSION search_path" command may be invoked for all connections in a
pool using the following event handler, as discussed at
:ref:`schema_set_default_connections`::

    from sqlalchemy import event
    from sqlalchemy import create_engine

    engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname")

    @event.listens_for(engine, "connect", insert=True)
    def set_search_path(dbapi_connection, connection_record):
        existing_autocommit = dbapi_connection.autocommit
        dbapi_connection.autocommit = True
        cursor = dbapi_connection.cursor()
        cursor.execute("SET SESSION search_path='%s'" % schema_name)
        cursor.close()
        dbapi_connection.autocommit = existing_autocommit

The reason the recipe is complicated by use of the ``.autocommit`` DBAPI
attribute is so that when the ``SET SESSION search_path`` directive is
invoked, it is invoked outside of the scope of any transaction and therefore
will not be reverted when the DBAPI connection has a rollback.

.. seealso::

  :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel`
  documentation

.. _postgresql_schema_reflection:

Remote-Schema Table Introspection and PostgreSQL search_path
------------------------------------------------------------

.. admonition:: Section Best Practices Summarized

    Keep the ``search_path`` variable set to its default of ``public``,
    without any other schema names. Ensure the username used to connect
    **does not** match remote schemas, or ensure the ``"$user"`` token is
    **removed** from ``search_path``.  For other schema names, name these
    explicitly within :class:`_schema.Table` definitions. Alternatively,
    the ``postgresql_ignore_search_path`` option will cause all reflected
    :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema`
    attribute set up.

The PostgreSQL dialect can reflect tables from any schema, as outlined in
:ref:`metadata_reflection_schemas`.

In all cases, the first thing SQLAlchemy does when reflecting tables is
to **determine the default schema for the current database connection**.
It does this using the PostgreSQL ``current_schema()`` function,
illustrated below using a PostgreSQL client session (i.e. using the
``psql`` tool)::

    test=> select current_schema();
     current_schema
    ----------------
     public
    (1 row)

Above we see that on a plain install of PostgreSQL, the default schema name
is the name ``public``.

However, if your database username **matches the name of a schema**,
PostgreSQL's default is to then **use that name as the default schema**.
Below, we log in using the username ``scott``.  When we create a schema
named ``scott``, **it implicitly changes the default schema**::

    test=> select current_schema();
     current_schema
    ----------------
     public
    (1 row)

    test=> create schema scott;
    CREATE SCHEMA
    test=> select current_schema();
     current_schema
    ----------------
     scott
    (1 row)

The behavior of ``current_schema()`` is derived from the
`PostgreSQL search path `_ variable ``search_path``, which in modern
PostgreSQL versions defaults to this::

    test=> show search_path;
       search_path
    -----------------
     "$user", public
    (1 row)

Where above, the ``"$user"`` variable will inject the current username as the
default schema, if one exists.  Otherwise, ``public`` is used.
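The default schema that SQLAlchemy detects for a connection is also available
programmatically from the runtime inspection API as
:attr:`_reflection.Inspector.default_schema_name`; a brief illustration,
assuming a stock installation where ``current_schema()`` returns
``public``::

    from sqlalchemy import create_engine, inspect

    engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

    # reports the schema returned by current_schema() for this engine's
    # connections, e.g. 'public' on a default installation
    print(inspect(engine).default_schema_name)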
When a :class:`_schema.Table` object is reflected, if it is present in the schema indicated by the ``current_schema()`` function, **the schema name assigned to the ".schema" attribute of the Table is the Python "None" value**. Otherwise, the ".schema" attribute will be assigned the string name of that schema. With regards to tables which these :class:`_schema.Table` objects refer to via foreign key constraint, a decision must be made as to how the ``.schema`` is represented in those remote tables, in the case where that remote schema name is also a member of the current ``search_path``. By default, the PostgreSQL dialect mimics the behavior encouraged by PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function returns a sample definition for a particular foreign key constraint, omitting the referenced schema name from that definition when the name is also in the PostgreSQL schema search path. The interaction below illustrates this behavior:: test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY); CREATE TABLE test=> CREATE TABLE referring( test(> id INTEGER PRIMARY KEY, test(> referred_id INTEGER REFERENCES test_schema.referred(id)); CREATE TABLE test=> SET search_path TO public, test_schema; test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n test-> ON n.oid = c.relnamespace test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid test-> WHERE c.relname='referring' AND r.contype = 'f' test-> ; pg_get_constraintdef --------------------------------------------------- FOREIGN KEY (referred_id) REFERENCES referred(id) (1 row) Above, we created a table ``referred`` as a member of the remote schema ``test_schema``, however when we added ``test_schema`` to the PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the ``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of the function. On the other hand, if we set the search path back to the typical default of ``public``:: test=> SET search_path TO public; SET The same query against ``pg_get_constraintdef()`` now returns the fully schema-qualified name for us:: test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n test-> ON n.oid = c.relnamespace test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid test-> WHERE c.relname='referring' AND r.contype = 'f'; pg_get_constraintdef --------------------------------------------------------------- FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id) (1 row) SQLAlchemy will by default use the return value of ``pg_get_constraintdef()`` in order to determine the remote schema name. That is, if our ``search_path`` were set to include ``test_schema``, and we invoked a table reflection process as follows:: >>> from sqlalchemy import Table, MetaData, create_engine, text >>> engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") >>> with engine.connect() as conn: ... conn.execute(text("SET search_path TO test_schema, public")) ... metadata_obj = MetaData() ... referring = Table('referring', metadata_obj, ... autoload_with=conn) ... 
The above process would deliver to the :attr:`_schema.MetaData.tables` collection ``referred`` table named **without** the schema:: >>> metadata_obj.tables['referred'].schema is None True To alter the behavior of reflection such that the referred schema is maintained regardless of the ``search_path`` setting, use the ``postgresql_ignore_search_path`` option, which can be specified as a dialect-specific argument to both :class:`_schema.Table` as well as :meth:`_schema.MetaData.reflect`:: >>> with engine.connect() as conn: ... conn.execute(text("SET search_path TO test_schema, public")) ... metadata_obj = MetaData() ... referring = Table('referring', metadata_obj, ... autoload_with=conn, ... postgresql_ignore_search_path=True) ... We will now have ``test_schema.referred`` stored as schema-qualified:: >>> metadata_obj.tables['test_schema.referred'].schema 'test_schema' .. sidebar:: Best Practices for PostgreSQL Schema reflection The description of PostgreSQL schema reflection behavior is complex, and is the product of many years of dealing with widely varied use cases and user preferences. But in fact, there's no need to understand any of it if you just stick to the simplest use pattern: leave the ``search_path`` set to its default of ``public`` only, never refer to the name ``public`` as an explicit schema name otherwise, and refer to all other schema names explicitly when building up a :class:`_schema.Table` object. The options described here are only for those users who can't, or prefer not to, stay within these guidelines. .. seealso:: :ref:`reflection_schema_qualified_interaction` - discussion of the issue from a backend-agnostic perspective `The Schema Search Path `_ - on the PostgreSQL website. INSERT/UPDATE...RETURNING ------------------------- The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and ``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default for single-row INSERT statements in order to fetch newly generated primary key identifiers. To specify an explicit ``RETURNING`` clause, use the :meth:`._UpdateBase.returning` method on a per-statement basis:: # INSERT..RETURNING result = table.insert().returning(table.c.col1, table.c.col2).\ values(name='foo') print(result.fetchall()) # UPDATE..RETURNING result = table.update().returning(table.c.col1, table.c.col2).\ where(table.c.name=='foo').values(name='bar') print(result.fetchall()) # DELETE..RETURNING result = table.delete().returning(table.c.col1, table.c.col2).\ where(table.c.name=='foo') print(result.fetchall()) .. _postgresql_insert_on_conflict: INSERT...ON CONFLICT (Upsert) ------------------------------ Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A candidate row will only be inserted if that row does not violate any unique constraints. In the case of a unique constraint violation, a secondary action can occur which can be either "DO UPDATE", indicating that the data in the target row should be updated, or "DO NOTHING", which indicates to silently skip this row. Conflicts are determined using existing unique constraints and indexes. These constraints may be identified either using their name as stated in DDL, or they may be inferred by stating the columns and conditions that comprise the indexes. 
SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific :func:`_postgresql.insert()` function, which provides the generative methods :meth:`_postgresql.Insert.on_conflict_do_update` and :meth:`~.postgresql.Insert.on_conflict_do_nothing`: .. sourcecode:: pycon+sql >>> from sqlalchemy.dialects.postgresql import insert >>> insert_stmt = insert(my_table).values( ... id='some_existing_id', ... data='inserted value') >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing( ... index_elements=['id'] ... ) >>> print(do_nothing_stmt) {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) ON CONFLICT (id) DO NOTHING {stop} >>> do_update_stmt = insert_stmt.on_conflict_do_update( ... constraint='pk_my_table', ... set_=dict(data='updated value') ... ) >>> print(do_update_stmt) {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s .. seealso:: `INSERT .. ON CONFLICT `_ - in the PostgreSQL documentation. Specifying the Target ^^^^^^^^^^^^^^^^^^^^^ Both methods supply the "target" of the conflict using either the named constraint or by column inference: * The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument specifies a sequence containing string column names, :class:`_schema.Column` objects, and/or SQL expression elements, which would identify a unique index: .. sourcecode:: pycon+sql >>> do_update_stmt = insert_stmt.on_conflict_do_update( ... index_elements=['id'], ... set_=dict(data='updated value') ... ) >>> print(do_update_stmt) {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) ON CONFLICT (id) DO UPDATE SET data = %(param_1)s {stop} >>> do_update_stmt = insert_stmt.on_conflict_do_update( ... index_elements=[my_table.c.id], ... set_=dict(data='updated value') ... ) >>> print(do_update_stmt) {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) ON CONFLICT (id) DO UPDATE SET data = %(param_1)s * When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to infer an index, a partial index can be inferred by also specifying the use the :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter: .. sourcecode:: pycon+sql >>> stmt = insert(my_table).values(user_email='a@b.com', data='inserted data') >>> stmt = stmt.on_conflict_do_update( ... index_elements=[my_table.c.user_email], ... index_where=my_table.c.user_email.like('%@gmail.com'), ... set_=dict(data=stmt.excluded.data) ... ) >>> print(stmt) {printsql}INSERT INTO my_table (data, user_email) VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email) WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data * The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is used to specify an index directly rather than inferring it. This can be the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX: .. sourcecode:: pycon+sql >>> do_update_stmt = insert_stmt.on_conflict_do_update( ... constraint='my_table_idx_1', ... set_=dict(data='updated value') ... ) >>> print(do_update_stmt) {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s {stop} >>> do_update_stmt = insert_stmt.on_conflict_do_update( ... constraint='my_table_pk', ... set_=dict(data='updated value') ... 
) >>> print(do_update_stmt) {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s {stop} * The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may also refer to a SQLAlchemy construct representing a constraint, e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`, :class:`.Index`, or :class:`.ExcludeConstraint`. In this use, if the constraint has a name, it is used directly. Otherwise, if the constraint is unnamed, then inference will be used, where the expressions and optional WHERE clause of the constraint will be spelled out in the construct. This use is especially convenient to refer to the named or unnamed primary key of a :class:`_schema.Table` using the :attr:`_schema.Table.primary_key` attribute: .. sourcecode:: pycon+sql >>> do_update_stmt = insert_stmt.on_conflict_do_update( ... constraint=my_table.primary_key, ... set_=dict(data='updated value') ... ) >>> print(do_update_stmt) {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) ON CONFLICT (id) DO UPDATE SET data = %(param_1)s The SET Clause ^^^^^^^^^^^^^^^ ``ON CONFLICT...DO UPDATE`` is used to perform an update of the already existing row, using any combination of new values as well as values from the proposed insertion. These values are specified using the :paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This parameter accepts a dictionary which consists of direct values for UPDATE: .. sourcecode:: pycon+sql >>> stmt = insert(my_table).values(id='some_id', data='inserted value') >>> do_update_stmt = stmt.on_conflict_do_update( ... index_elements=['id'], ... set_=dict(data='updated value') ... ) >>> print(do_update_stmt) {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) ON CONFLICT (id) DO UPDATE SET data = %(param_1)s .. warning:: The :meth:`_expression.Insert.on_conflict_do_update` method does **not** take into account Python-side default UPDATE values or generation functions, e.g. those specified using :paramref:`_schema.Column.onupdate`. These values will not be exercised for an ON CONFLICT style of UPDATE, unless they are manually specified in the :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary. Updating using the Excluded INSERT Values ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ In order to refer to the proposed insertion row, the special alias :attr:`~.postgresql.Insert.excluded` is available as an attribute on the :class:`_postgresql.Insert` object; this object is a :class:`_expression.ColumnCollection` which alias contains all columns of the target table: .. sourcecode:: pycon+sql >>> stmt = insert(my_table).values( ... id='some_id', ... data='inserted value', ... author='jlh' ... ) >>> do_update_stmt = stmt.on_conflict_do_update( ... index_elements=['id'], ... set_=dict(data='updated value', author=stmt.excluded.author) ... ) >>> print(do_update_stmt) {printsql}INSERT INTO my_table (id, data, author) VALUES (%(id)s, %(data)s, %(author)s) ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author Additional WHERE Criteria ^^^^^^^^^^^^^^^^^^^^^^^^^ The :meth:`_expression.Insert.on_conflict_do_update` method also accepts a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where` parameter, which will limit those rows which receive an UPDATE: .. sourcecode:: pycon+sql >>> stmt = insert(my_table).values( ... id='some_id', ... data='inserted value', ... author='jlh' ... 
) >>> on_update_stmt = stmt.on_conflict_do_update( ... index_elements=['id'], ... set_=dict(data='updated value', author=stmt.excluded.author), ... where=(my_table.c.status == 2) ... ) >>> print(on_update_stmt) {printsql}INSERT INTO my_table (id, data, author) VALUES (%(id)s, %(data)s, %(author)s) ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author WHERE my_table.status = %(status_1)s Skipping Rows with DO NOTHING ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ``ON CONFLICT`` may be used to skip inserting a row entirely if any conflict with a unique or exclusion constraint occurs; below this is illustrated using the :meth:`~.postgresql.Insert.on_conflict_do_nothing` method: .. sourcecode:: pycon+sql >>> stmt = insert(my_table).values(id='some_id', data='inserted value') >>> stmt = stmt.on_conflict_do_nothing(index_elements=['id']) >>> print(stmt) {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) ON CONFLICT (id) DO NOTHING If ``DO NOTHING`` is used without specifying any columns or constraint, it has the effect of skipping the INSERT for any unique or exclusion constraint violation which occurs: .. sourcecode:: pycon+sql >>> stmt = insert(my_table).values(id='some_id', data='inserted value') >>> stmt = stmt.on_conflict_do_nothing() >>> print(stmt) {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) ON CONFLICT DO NOTHING .. _postgresql_match: Full Text Search ---------------- PostgreSQL's full text search system is available through the use of the :data:`.func` namespace, combined with the use of custom operators via the :meth:`.Operators.bool_op` method. For simple cases with some degree of cross-backend compatibility, the :meth:`.Operators.match` operator may also be used. .. _postgresql_simple_match: Simple plain text matching with ``match()`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ The :meth:`.Operators.match` operator provides for cross-compatible simple text matching. For the PostgreSQL backend, it's hardcoded to generate an expression using the ``@@`` operator in conjunction with the ``plainto_tsquery()`` PostgreSQL function. On the PostgreSQL dialect, an expression like the following:: select(sometable.c.text.match("search string")) would emit to the database:: SELECT text @@ plainto_tsquery('search string') FROM table Above, passing a plain string to :meth:`.Operators.match` will automatically make use of ``plainto_tsquery()`` to specify the type of tsquery. This establishes basic database cross-compatibility for :meth:`.Operators.match` with other backends. .. versionchanged:: 2.0 The default tsquery generation function used by the PostgreSQL dialect with :meth:`.Operators.match` is ``plainto_tsquery()``. To render exactly what was rendered in 1.4, use the following form:: from sqlalchemy import func select( sometable.c.text.bool_op("@@")(func.to_tsquery("search string")) ) Which would emit:: SELECT text @@ to_tsquery('search string') FROM table Using PostgreSQL full text functions and operators directly ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Text search operations beyond the simple use of :meth:`.Operators.match` may make use of the :data:`.func` namespace to generate PostgreSQL full-text functions, in combination with :meth:`.Operators.bool_op` to generate any boolean operator. For example, the query:: select( func.to_tsquery('cat').bool_op("@>")(func.to_tsquery('cat & rat')) ) would generate: .. 
sourcecode:: sql SELECT to_tsquery('cat') @> to_tsquery('cat & rat') The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST:: from sqlalchemy.dialects.postgresql import TSVECTOR from sqlalchemy import select, cast select(cast("some text", TSVECTOR)) produces a statement equivalent to:: SELECT CAST('some text' AS TSVECTOR) AS anon_1 The ``func`` namespace is augmented by the PostgreSQL dialect to set up correct argument and return types for most full text search functions. These functions are used automatically by the :attr:`_sql.func` namespace assuming the ``sqlalchemy.dialects.postgresql`` package has been imported, or :func:`_sa.create_engine` has been invoked using a ``postgresql`` dialect. These functions are documented at: * :class:`_postgresql.to_tsvector` * :class:`_postgresql.to_tsquery` * :class:`_postgresql.plainto_tsquery` * :class:`_postgresql.phraseto_tsquery` * :class:`_postgresql.websearch_to_tsquery` * :class:`_postgresql.ts_headline` Specifying the "regconfig" with ``match()`` or custom operators ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ PostgreSQL's ``plainto_tsquery()`` function accepts an optional "regconfig" argument that is used to instruct PostgreSQL to use a particular pre-computed GIN or GiST index in order to perform the search. When using :meth:`.Operators.match`, this additional parameter may be specified using the ``postgresql_regconfig`` parameter, such as:: select(mytable.c.id).where( mytable.c.title.match('somestring', postgresql_regconfig='english') ) Which would emit:: SELECT mytable.id FROM mytable WHERE mytable.title @@ plainto_tsquery('english', 'somestring') When using other PostgreSQL search functions with :data:`.func`, the "regconfig" parameter may be passed directly as the initial argument:: select(mytable.c.id).where( func.to_tsvector("english", mytable.c.title).bool_op("@@")( func.to_tsquery("english", "somestring") ) ) produces a statement equivalent to:: SELECT mytable.id FROM mytable WHERE to_tsvector('english', mytable.title) @@ to_tsquery('english', 'somestring') It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from PostgreSQL to ensure that you are generating queries with SQLAlchemy that take full advantage of any indexes you may have created for full text search. .. seealso:: `Full Text Search `_ - in the PostgreSQL documentation FROM ONLY ... ------------- The dialect supports PostgreSQL's ONLY keyword for targeting only a particular table in an inheritance hierarchy. This can be used to produce the ``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...`` syntaxes. It uses SQLAlchemy's hints mechanism:: # SELECT ... FROM ONLY ... result = table.select().with_hint(table, 'ONLY', 'postgresql') print(result.fetchall()) # UPDATE ONLY ... table.update(values=dict(foo='bar')).with_hint('ONLY', dialect_name='postgresql') # DELETE FROM ONLY ... table.delete().with_hint('ONLY', dialect_name='postgresql') .. _postgresql_indexes: PostgreSQL-Specific Index Options --------------------------------- Several extensions to the :class:`.Index` construct are available, specific to the PostgreSQL dialect. Covering Indexes ^^^^^^^^^^^^^^^^ The ``postgresql_include`` option renders INCLUDE(colname) for the given string names:: Index("my_index", table.c.x, postgresql_include=['y']) would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)`` Note that this feature requires PostgreSQL 11 or later. .. versionadded:: 1.4 .. 
_postgresql_partial_indexes: Partial Indexes ^^^^^^^^^^^^^^^ Partial indexes add criterion to the index definition so that the index is applied to a subset of rows. These can be specified on :class:`.Index` using the ``postgresql_where`` keyword argument:: Index('my_index', my_table.c.id, postgresql_where=my_table.c.value > 10) .. _postgresql_operator_classes: Operator Classes ^^^^^^^^^^^^^^^^ PostgreSQL allows the specification of an *operator class* for each column of an index (see https://www.postgresql.org/docs/current/interactive/indexes-opclass.html). The :class:`.Index` construct allows these to be specified via the ``postgresql_ops`` keyword argument:: Index( 'my_index', my_table.c.id, my_table.c.data, postgresql_ops={ 'data': 'text_pattern_ops', 'id': 'int4_ops' }) Note that the keys in the ``postgresql_ops`` dictionaries are the "key" name of the :class:`_schema.Column`, i.e. the name used to access it from the ``.c`` collection of :class:`_schema.Table`, which can be configured to be different than the actual name of the column as expressed in the database. If ``postgresql_ops`` is to be used against a complex SQL expression such as a function call, then to apply to the column it must be given a label that is identified in the dictionary by name, e.g.:: Index( 'my_index', my_table.c.id, func.lower(my_table.c.data).label('data_lower'), postgresql_ops={ 'data_lower': 'text_pattern_ops', 'id': 'int4_ops' }) Operator classes are also supported by the :class:`_postgresql.ExcludeConstraint` construct using the :paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for details. .. versionadded:: 1.3.21 added support for operator classes with :class:`_postgresql.ExcludeConstraint`. Index Types ^^^^^^^^^^^ PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well as the ability for users to create their own (see https://www.postgresql.org/docs/current/static/indexes-types.html). These can be specified on :class:`.Index` using the ``postgresql_using`` keyword argument:: Index('my_index', my_table.c.data, postgresql_using='gin') The value passed to the keyword argument will be simply passed through to the underlying CREATE INDEX command, so it *must* be a valid index type for your version of PostgreSQL. .. _postgresql_index_storage: Index Storage Parameters ^^^^^^^^^^^^^^^^^^^^^^^^ PostgreSQL allows storage parameters to be set on indexes. The storage parameters available depend on the index method used by the index. Storage parameters can be specified on :class:`.Index` using the ``postgresql_with`` keyword argument:: Index('my_index', my_table.c.data, postgresql_with={"fillfactor": 50}) PostgreSQL allows to define the tablespace in which to create the index. The tablespace can be specified on :class:`.Index` using the ``postgresql_tablespace`` keyword argument:: Index('my_index', my_table.c.data, postgresql_tablespace='my_tablespace') Note that the same option is available on :class:`_schema.Table` as well. .. 
_postgresql_index_concurrently: Indexes with CONCURRENTLY ^^^^^^^^^^^^^^^^^^^^^^^^^ The PostgreSQL index option CONCURRENTLY is supported by passing the flag ``postgresql_concurrently`` to the :class:`.Index` construct:: tbl = Table('testtbl', m, Column('data', Integer)) idx1 = Index('test_idx1', tbl.c.data, postgresql_concurrently=True) The above index construct will render DDL for CREATE INDEX, assuming PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as:: CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data) For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for a connection-less dialect, it will emit:: DROP INDEX CONCURRENTLY test_idx1 When using CONCURRENTLY, the PostgreSQL database requires that the statement be invoked outside of a transaction block. The Python DBAPI enforces that even for a single statement, a transaction is present, so to use this construct, the DBAPI's "autocommit" mode must be used:: metadata = MetaData() table = Table( "foo", metadata, Column("id", String)) index = Index( "foo_idx", table.c.id, postgresql_concurrently=True) with engine.connect() as conn: with conn.execution_options(isolation_level='AUTOCOMMIT'): table.create(conn) .. seealso:: :ref:`postgresql_isolation_level` .. _postgresql_index_reflection: PostgreSQL Index Reflection --------------------------- The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the UNIQUE CONSTRAINT construct is used. When inspecting a table using :class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes` and the :meth:`_reflection.Inspector.get_unique_constraints` will report on these two constructs distinctly; in the case of the index, the key ``duplicates_constraint`` will be present in the index entry if it is detected as mirroring a constraint. When performing reflection using ``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned in :attr:`_schema.Table.indexes` when it is detected as mirroring a :class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints` collection . Special Reflection Options -------------------------- The :class:`_reflection.Inspector` used for the PostgreSQL backend is an instance of :class:`.PGInspector`, which offers additional methods:: from sqlalchemy import create_engine, inspect engine = create_engine("postgresql+psycopg2://localhost/test") insp = inspect(engine) # will be a PGInspector print(insp.get_enums()) .. autoclass:: PGInspector :members: .. _postgresql_table_options: PostgreSQL Table Options ------------------------ Several options for CREATE TABLE are supported directly by the PostgreSQL dialect in conjunction with the :class:`_schema.Table` construct: * ``INHERITS``:: Table("some_table", metadata, ..., postgresql_inherits="some_supertable") Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...)) * ``ON COMMIT``:: Table("some_table", metadata, ..., postgresql_on_commit='PRESERVE ROWS') * ``PARTITION BY``:: Table("some_table", metadata, ..., postgresql_partition_by='LIST (part_column)') .. versionadded:: 1.2.6 * ``TABLESPACE``:: Table("some_table", metadata, ..., postgresql_tablespace='some_tablespace') The above option is also available on the :class:`.Index` construct. * ``USING``:: Table("some_table", metadata, ..., postgresql_using='heap') .. versionadded:: 2.0.26 * ``WITH OIDS``:: Table("some_table", metadata, ..., postgresql_with_oids=True) * ``WITHOUT OIDS``:: Table("some_table", metadata, ..., postgresql_with_oids=False) .. 
seealso:: `PostgreSQL CREATE TABLE options `_ - in the PostgreSQL documentation. .. _postgresql_constraint_options: PostgreSQL Constraint Options ----------------------------- The following option(s) are supported by the PostgreSQL dialect in conjunction with selected constraint constructs: * ``NOT VALID``: This option applies towards CHECK and FOREIGN KEY constraints when the constraint is being added to an existing table via ALTER TABLE, and has the effect that existing rows are not scanned during the ALTER operation against the constraint being added. When using a SQL migration tool such as `Alembic `_ that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument may be specified as an additional keyword argument within the operation that creates the constraint, as in the following Alembic example:: def update(): op.create_foreign_key( "fk_user_address", "address", "user", ["user_id"], ["id"], postgresql_not_valid=True ) The keyword is ultimately accepted directly by the :class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint` and :class:`_schema.ForeignKey` constructs; when using a tool like Alembic, dialect-specific keyword arguments are passed through to these constructs from the migration operation directives:: CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True) ForeignKeyConstraint(["some_id"], ["some_table.some_id"], postgresql_not_valid=True) .. versionadded:: 1.4.32 .. seealso:: `PostgreSQL ALTER TABLE options `_ - in the PostgreSQL documentation. .. _postgresql_table_valued_overview: Table values, Table and Column valued functions, Row and Tuple objects ----------------------------------------------------------------------- PostgreSQL makes great use of modern SQL forms such as table-valued functions, tables and rows as values. These constructs are commonly used as part of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other datatypes. SQLAlchemy's SQL expression language has native support for most table-valued and row-valued forms. .. _postgresql_table_valued: Table-Valued Functions ^^^^^^^^^^^^^^^^^^^^^^^ Many PostgreSQL built-in functions are intended to be used in the FROM clause of a SELECT statement, and are capable of returning table rows or sets of table rows. A large portion of PostgreSQL's JSON functions for example such as ``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``, ``json_each()``, ``json_to_record()``, ``json_populate_recordset()`` use such forms. These classes of SQL function calling forms in SQLAlchemy are available using the :meth:`_functions.FunctionElement.table_valued` method in conjunction with :class:`_functions.Function` objects generated from the :data:`_sql.func` namespace. Examples from PostgreSQL's reference documentation follow below: * ``json_each()``: .. sourcecode:: pycon+sql >>> from sqlalchemy import select, func >>> stmt = select(func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value")) >>> print(stmt) {printsql}SELECT anon_1.key, anon_1.value FROM json_each(:json_each_1) AS anon_1 * ``json_populate_record()``: .. sourcecode:: pycon+sql >>> from sqlalchemy import select, func, literal_column >>> stmt = select( ... func.json_populate_record( ... literal_column("null::myrowtype"), ... '{"a":1,"b":2}' ... ).table_valued("a", "b", name="x") ... 
) >>> print(stmt) {printsql}SELECT x.a, x.b FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x * ``json_to_record()`` - this form uses a PostgreSQL specific form of derived columns in the alias, where we may make use of :func:`_sql.column` elements with types to produce them. The :meth:`_functions.FunctionElement.table_valued` method produces a :class:`_sql.TableValuedAlias` construct, and the method :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived columns specification: .. sourcecode:: pycon+sql >>> from sqlalchemy import select, func, column, Integer, Text >>> stmt = select( ... func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}').table_valued( ... column("a", Integer), column("b", Text), column("d", Text), ... ).render_derived(name="x", with_types=True) ... ) >>> print(stmt) {printsql}SELECT x.a, x.b, x.d FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT) * ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an ordinal counter to the output of a function and is accepted by a limited set of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The :meth:`_functions.FunctionElement.table_valued` method accepts a keyword parameter ``with_ordinality`` for this purpose, which accepts the string name that will be applied to the "ordinality" column: .. sourcecode:: pycon+sql >>> from sqlalchemy import select, func >>> stmt = select( ... func.generate_series(4, 1, -1). ... table_valued("value", with_ordinality="ordinality"). ... render_derived() ... ) >>> print(stmt) {printsql}SELECT anon_1.value, anon_1.ordinality FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3) WITH ORDINALITY AS anon_1(value, ordinality) .. versionadded:: 1.4.0b2 .. seealso:: :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial` .. _postgresql_column_valued: Column Valued Functions ^^^^^^^^^^^^^^^^^^^^^^^ Similar to the table valued function, a column valued function is present in the FROM clause, but delivers itself to the columns clause as a single scalar value. PostgreSQL functions such as ``json_array_elements()``, ``unnest()`` and ``generate_series()`` may use this form. Column valued functions are available using the :meth:`_functions.FunctionElement.column_valued` method of :class:`_functions.FunctionElement`: * ``json_array_elements()``: .. sourcecode:: pycon+sql >>> from sqlalchemy import select, func >>> stmt = select(func.json_array_elements('["one", "two"]').column_valued("x")) >>> print(stmt) {printsql}SELECT x FROM json_array_elements(:json_array_elements_1) AS x * ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the :func:`_postgresql.array` construct may be used: .. sourcecode:: pycon+sql >>> from sqlalchemy.dialects.postgresql import array >>> from sqlalchemy import select, func >>> stmt = select(func.unnest(array([1, 2])).column_valued()) >>> print(stmt) {printsql}SELECT anon_1 FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1 The function can of course be used against an existing table-bound column that's of type :class:`_types.ARRAY`: .. sourcecode:: pycon+sql >>> from sqlalchemy import table, column, ARRAY, Integer >>> from sqlalchemy import select, func >>> t = table("t", column('value', ARRAY(Integer))) >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value")) >>> print(stmt) {printsql}SELECT unnested_value FROM unnest(t.value) AS unnested_value .. 
seealso:: :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial` Row Types ^^^^^^^^^ Built-in support for rendering a ``ROW`` may be approximated using ``func.ROW`` with the :attr:`_sa.func` namespace, or by using the :func:`_sql.tuple_` construct: .. sourcecode:: pycon+sql >>> from sqlalchemy import table, column, func, tuple_ >>> t = table("t", column("id"), column("fk")) >>> stmt = t.select().where( ... tuple_(t.c.id, t.c.fk) > (1,2) ... ).where( ... func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7) ... ) >>> print(stmt) {printsql}SELECT t.id, t.fk FROM t WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2) .. seealso:: `PostgreSQL Row Constructors `_ `PostgreSQL Row Constructor Comparison `_ Table Types passed to Functions ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ PostgreSQL supports passing a table as an argument to a function, which is known as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects such as :class:`_schema.Table` support this special form using the :meth:`_sql.FromClause.table_valued` method, which is comparable to the :meth:`_functions.FunctionElement.table_valued` method except that the collection of columns is already established by that of the :class:`_sql.FromClause` itself: .. sourcecode:: pycon+sql >>> from sqlalchemy import table, column, func, select >>> a = table( "a", column("id"), column("x"), column("y")) >>> stmt = select(func.row_to_json(a.table_valued())) >>> print(stmt) {printsql}SELECT row_to_json(a) AS row_to_json_1 FROM a .. versionadded:: 1.4.0b2 """ # noqa: E501 from __future__ import annotations from collections import defaultdict from functools import lru_cache import re from typing import Any from typing import cast from typing import List from typing import Optional from typing import Tuple from typing import TYPE_CHECKING from typing import Union from . import arraylib as _array from . import json as _json from . import pg_catalog from . 
import ranges as _ranges from .ext import _regconfig_fn from .ext import aggregate_order_by from .hstore import HSTORE from .named_types import CreateDomainType as CreateDomainType # noqa: F401 from .named_types import CreateEnumType as CreateEnumType # noqa: F401 from .named_types import DOMAIN as DOMAIN # noqa: F401 from .named_types import DropDomainType as DropDomainType # noqa: F401 from .named_types import DropEnumType as DropEnumType # noqa: F401 from .named_types import ENUM as ENUM # noqa: F401 from .named_types import NamedType as NamedType # noqa: F401 from .types import _DECIMAL_TYPES # noqa: F401 from .types import _FLOAT_TYPES # noqa: F401 from .types import _INT_TYPES # noqa: F401 from .types import BIT as BIT from .types import BYTEA as BYTEA from .types import CIDR as CIDR from .types import CITEXT as CITEXT from .types import INET as INET from .types import INTERVAL as INTERVAL from .types import MACADDR as MACADDR from .types import MACADDR8 as MACADDR8 from .types import MONEY as MONEY from .types import OID as OID from .types import PGBit as PGBit # noqa: F401 from .types import PGCidr as PGCidr # noqa: F401 from .types import PGInet as PGInet # noqa: F401 from .types import PGInterval as PGInterval # noqa: F401 from .types import PGMacAddr as PGMacAddr # noqa: F401 from .types import PGMacAddr8 as PGMacAddr8 # noqa: F401 from .types import PGUuid as PGUuid from .types import REGCLASS as REGCLASS from .types import REGCONFIG as REGCONFIG # noqa: F401 from .types import TIME as TIME from .types import TIMESTAMP as TIMESTAMP from .types import TSVECTOR as TSVECTOR from ... import exc from ... import schema from ... import select from ... import sql from ... import util from ...engine import characteristics from ...engine import default from ...engine import interfaces from ...engine import ObjectKind from ...engine import ObjectScope from ...engine import reflection from ...engine import URL from ...engine.reflection import ReflectionDefaults from ...sql import bindparam from ...sql import coercions from ...sql import compiler from ...sql import elements from ...sql import expression from ...sql import roles from ...sql import sqltypes from ...sql import util as sql_util from ...sql.compiler import InsertmanyvaluesSentinelOpts from ...sql.visitors import InternalTraversal from ...types import BIGINT from ...types import BOOLEAN from ...types import CHAR from ...types import DATE from ...types import DOUBLE_PRECISION from ...types import FLOAT from ...types import INTEGER from ...types import NUMERIC from ...types import REAL from ...types import SMALLINT from ...types import TEXT from ...types import UUID as UUID from ...types import VARCHAR from ...util.typing import TypedDict IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I) RESERVED_WORDS = { "all", "analyse", "analyze", "and", "any", "array", "as", "asc", "asymmetric", "both", "case", "cast", "check", "collate", "column", "constraint", "create", "current_catalog", "current_date", "current_role", "current_time", "current_timestamp", "current_user", "default", "deferrable", "desc", "distinct", "do", "else", "end", "except", "false", "fetch", "for", "foreign", "from", "grant", "group", "having", "in", "initially", "intersect", "into", "leading", "limit", "localtime", "localtimestamp", "new", "not", "null", "of", "off", "offset", "old", "on", "only", "or", "order", "placing", "primary", "references", "returning", "select", "session_user", "some", "symmetric", "table", "then", "to", "trailing", "true", 
"union", "unique", "user", "using", "variadic", "when", "where", "window", "with", "authorization", "between", "binary", "cross", "current_schema", "freeze", "full", "ilike", "inner", "is", "isnull", "join", "left", "like", "natural", "notnull", "outer", "over", "overlaps", "right", "similar", "verbose", } colspecs = { sqltypes.ARRAY: _array.ARRAY, sqltypes.Interval: INTERVAL, sqltypes.Enum: ENUM, sqltypes.JSON.JSONPathType: _json.JSONPATH, sqltypes.JSON: _json.JSON, sqltypes.Uuid: PGUuid, } ischema_names = { "_array": _array.ARRAY, "hstore": HSTORE, "json": _json.JSON, "jsonb": _json.JSONB, "int4range": _ranges.INT4RANGE, "int8range": _ranges.INT8RANGE, "numrange": _ranges.NUMRANGE, "daterange": _ranges.DATERANGE, "tsrange": _ranges.TSRANGE, "tstzrange": _ranges.TSTZRANGE, "int4multirange": _ranges.INT4MULTIRANGE, "int8multirange": _ranges.INT8MULTIRANGE, "nummultirange": _ranges.NUMMULTIRANGE, "datemultirange": _ranges.DATEMULTIRANGE, "tsmultirange": _ranges.TSMULTIRANGE, "tstzmultirange": _ranges.TSTZMULTIRANGE, "integer": INTEGER, "bigint": BIGINT, "smallint": SMALLINT, "character varying": VARCHAR, "character": CHAR, '"char"': sqltypes.String, "name": sqltypes.String, "text": TEXT, "numeric": NUMERIC, "float": FLOAT, "real": REAL, "inet": INET, "cidr": CIDR, "citext": CITEXT, "uuid": UUID, "bit": BIT, "bit varying": BIT, "macaddr": MACADDR, "macaddr8": MACADDR8, "money": MONEY, "oid": OID, "regclass": REGCLASS, "double precision": DOUBLE_PRECISION, "timestamp": TIMESTAMP, "timestamp with time zone": TIMESTAMP, "timestamp without time zone": TIMESTAMP, "time with time zone": TIME, "time without time zone": TIME, "date": DATE, "time": TIME, "bytea": BYTEA, "boolean": BOOLEAN, "interval": INTERVAL, "tsvector": TSVECTOR, } class PGCompiler(compiler.SQLCompiler): def visit_to_tsvector_func(self, element, **kw): return self._assert_pg_ts_ext(element, **kw) def visit_to_tsquery_func(self, element, **kw): return self._assert_pg_ts_ext(element, **kw) def visit_plainto_tsquery_func(self, element, **kw): return self._assert_pg_ts_ext(element, **kw) def visit_phraseto_tsquery_func(self, element, **kw): return self._assert_pg_ts_ext(element, **kw) def visit_websearch_to_tsquery_func(self, element, **kw): return self._assert_pg_ts_ext(element, **kw) def visit_ts_headline_func(self, element, **kw): return self._assert_pg_ts_ext(element, **kw) def _assert_pg_ts_ext(self, element, **kw): if not isinstance(element, _regconfig_fn): # other options here include trying to rewrite the function # with the correct types. however, that means we have to # "un-SQL-ize" the first argument, which can't work in a # generalized way. Also, parent compiler class has already added # the incorrect return type to the result map. So let's just # make sure the function we want is used up front. raise exc.CompileError( f'Can\'t compile "{element.name}()" full text search ' f"function construct that does not originate from the " f'"sqlalchemy.dialects.postgresql" package. ' f'Please ensure "import sqlalchemy.dialects.postgresql" is ' f"called before constructing " f'"sqlalchemy.func.{element.name}()" to ensure registration ' f"of the correct argument and return types." ) return f"{element.name}{self.function_argspec(element, **kw)}" def render_bind_cast(self, type_, dbapi_type, sqltext): if dbapi_type._type_affinity is sqltypes.String and dbapi_type.length: # use VARCHAR with no length for VARCHAR cast. 
# see #9511 dbapi_type = sqltypes.STRINGTYPE return f"""{sqltext}::{ self.dialect.type_compiler_instance.process( dbapi_type, identifier_preparer=self.preparer ) }""" def visit_array(self, element, **kw): return "ARRAY[%s]" % self.visit_clauselist(element, **kw) def visit_slice(self, element, **kw): return "%s:%s" % ( self.process(element.start, **kw), self.process(element.stop, **kw), ) def visit_bitwise_xor_op_binary(self, binary, operator, **kw): return self._generate_generic_binary(binary, " # ", **kw) def visit_json_getitem_op_binary( self, binary, operator, _cast_applied=False, **kw ): if ( not _cast_applied and binary.type._type_affinity is not sqltypes.JSON ): kw["_cast_applied"] = True return self.process(sql.cast(binary, binary.type), **kw) kw["eager_grouping"] = True return self._generate_generic_binary( binary, " -> " if not _cast_applied else " ->> ", **kw ) def visit_json_path_getitem_op_binary( self, binary, operator, _cast_applied=False, **kw ): if ( not _cast_applied and binary.type._type_affinity is not sqltypes.JSON ): kw["_cast_applied"] = True return self.process(sql.cast(binary, binary.type), **kw) kw["eager_grouping"] = True return self._generate_generic_binary( binary, " #> " if not _cast_applied else " #>> ", **kw ) def visit_getitem_binary(self, binary, operator, **kw): return "%s[%s]" % ( self.process(binary.left, **kw), self.process(binary.right, **kw), ) def visit_aggregate_order_by(self, element, **kw): return "%s ORDER BY %s" % ( self.process(element.target, **kw), self.process(element.order_by, **kw), ) def visit_match_op_binary(self, binary, operator, **kw): if "postgresql_regconfig" in binary.modifiers: regconfig = self.render_literal_value( binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE ) if regconfig: return "%s @@ plainto_tsquery(%s, %s)" % ( self.process(binary.left, **kw), regconfig, self.process(binary.right, **kw), ) return "%s @@ plainto_tsquery(%s)" % ( self.process(binary.left, **kw), self.process(binary.right, **kw), ) def visit_ilike_case_insensitive_operand(self, element, **kw): return element.element._compiler_dispatch(self, **kw) def visit_ilike_op_binary(self, binary, operator, **kw): escape = binary.modifiers.get("escape", None) return "%s ILIKE %s" % ( self.process(binary.left, **kw), self.process(binary.right, **kw), ) + ( " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) if escape is not None else "" ) def visit_not_ilike_op_binary(self, binary, operator, **kw): escape = binary.modifiers.get("escape", None) return "%s NOT ILIKE %s" % ( self.process(binary.left, **kw), self.process(binary.right, **kw), ) + ( " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) if escape is not None else "" ) def _regexp_match(self, base_op, binary, operator, kw): flags = binary.modifiers["flags"] if flags is None: return self._generate_generic_binary( binary, " %s " % base_op, **kw ) if flags == "i": return self._generate_generic_binary( binary, " %s* " % base_op, **kw ) return "%s %s CONCAT('(?', %s, ')', %s)" % ( self.process(binary.left, **kw), base_op, self.render_literal_value(flags, sqltypes.STRINGTYPE), self.process(binary.right, **kw), ) def visit_regexp_match_op_binary(self, binary, operator, **kw): return self._regexp_match("~", binary, operator, kw) def visit_not_regexp_match_op_binary(self, binary, operator, **kw): return self._regexp_match("!~", binary, operator, kw) def visit_regexp_replace_op_binary(self, binary, operator, **kw): string = self.process(binary.left, **kw) pattern_replace = 
self.process(binary.right, **kw) flags = binary.modifiers["flags"] if flags is None: return "REGEXP_REPLACE(%s, %s)" % ( string, pattern_replace, ) else: return "REGEXP_REPLACE(%s, %s, %s)" % ( string, pattern_replace, self.render_literal_value(flags, sqltypes.STRINGTYPE), ) def visit_empty_set_expr(self, element_types, **kw): # cast the empty set to the type we are comparing against. if # we are comparing against the null type, pick an arbitrary # datatype for the empty set return "SELECT %s WHERE 1!=1" % ( ", ".join( "CAST(NULL AS %s)" % self.dialect.type_compiler_instance.process( INTEGER() if type_._isnull else type_ ) for type_ in element_types or [INTEGER()] ), ) def render_literal_value(self, value, type_): value = super().render_literal_value(value, type_) if self.dialect._backslash_escapes: value = value.replace("\\", "\\\\") return value def visit_aggregate_strings_func(self, fn, **kw): return "string_agg%s" % self.function_argspec(fn) def visit_sequence(self, seq, **kw): return "nextval('%s')" % self.preparer.format_sequence(seq) def limit_clause(self, select, **kw): text = "" if select._limit_clause is not None: text += " \n LIMIT " + self.process(select._limit_clause, **kw) if select._offset_clause is not None: if select._limit_clause is None: text += "\n LIMIT ALL" text += " OFFSET " + self.process(select._offset_clause, **kw) return text def format_from_hint_text(self, sqltext, table, hint, iscrud): if hint.upper() != "ONLY": raise exc.CompileError("Unrecognized hint: %r" % hint) return "ONLY " + sqltext def get_select_precolumns(self, select, **kw): # Do not call super().get_select_precolumns because # it will warn/raise when distinct on is present if select._distinct or select._distinct_on: if select._distinct_on: return ( "DISTINCT ON (" + ", ".join( [ self.process(col, **kw) for col in select._distinct_on ] ) + ") " ) else: return "DISTINCT " else: return "" def for_update_clause(self, select, **kw): if select._for_update_arg.read: if select._for_update_arg.key_share: tmp = " FOR KEY SHARE" else: tmp = " FOR SHARE" elif select._for_update_arg.key_share: tmp = " FOR NO KEY UPDATE" else: tmp = " FOR UPDATE" if select._for_update_arg.of: tables = util.OrderedSet() for c in select._for_update_arg.of: tables.update(sql_util.surface_selectables_only(c)) tmp += " OF " + ", ".join( self.process(table, ashint=True, use_schema=False, **kw) for table in tables ) if select._for_update_arg.nowait: tmp += " NOWAIT" if select._for_update_arg.skip_locked: tmp += " SKIP LOCKED" return tmp def visit_substring_func(self, func, **kw): s = self.process(func.clauses.clauses[0], **kw) start = self.process(func.clauses.clauses[1], **kw) if len(func.clauses.clauses) > 2: length = self.process(func.clauses.clauses[2], **kw) return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length) else: return "SUBSTRING(%s FROM %s)" % (s, start) def _on_conflict_target(self, clause, **kw): if clause.constraint_target is not None: # target may be a name of an Index, UniqueConstraint or # ExcludeConstraint. 
While there is a separate # "max_identifier_length" for indexes, PostgreSQL uses the same # length for all objects so we can use # truncate_and_render_constraint_name target_text = ( "ON CONSTRAINT %s" % self.preparer.truncate_and_render_constraint_name( clause.constraint_target ) ) elif clause.inferred_target_elements is not None: target_text = "(%s)" % ", ".join( ( self.preparer.quote(c) if isinstance(c, str) else self.process(c, include_table=False, use_schema=False) ) for c in clause.inferred_target_elements ) if clause.inferred_target_whereclause is not None: target_text += " WHERE %s" % self.process( clause.inferred_target_whereclause, include_table=False, use_schema=False, ) else: target_text = "" return target_text def visit_on_conflict_do_nothing(self, on_conflict, **kw): target_text = self._on_conflict_target(on_conflict, **kw) if target_text: return "ON CONFLICT %s DO NOTHING" % target_text else: return "ON CONFLICT DO NOTHING" def visit_on_conflict_do_update(self, on_conflict, **kw): clause = on_conflict target_text = self._on_conflict_target(on_conflict, **kw) action_set_ops = [] set_parameters = dict(clause.update_values_to_set) # create a list of column assignment clauses as tuples insert_statement = self.stack[-1]["selectable"] cols = insert_statement.table.c for c in cols: col_key = c.key if col_key in set_parameters: value = set_parameters.pop(col_key) elif c in set_parameters: value = set_parameters.pop(c) else: continue if coercions._is_literal(value): value = elements.BindParameter(None, value, type_=c.type) else: if ( isinstance(value, elements.BindParameter) and value.type._isnull ): value = value._clone() value.type = c.type value_text = self.process(value.self_group(), use_schema=False) key_text = self.preparer.quote(c.name) action_set_ops.append("%s = %s" % (key_text, value_text)) # check for names that don't match columns if set_parameters: util.warn( "Additional column names not matching " "any column keys in table '%s': %s" % ( self.current_executable.table.name, (", ".join("'%s'" % c for c in set_parameters)), ) ) for k, v in set_parameters.items(): key_text = ( self.preparer.quote(k) if isinstance(k, str) else self.process(k, use_schema=False) ) value_text = self.process( coercions.expect(roles.ExpressionElementRole, v), use_schema=False, ) action_set_ops.append("%s = %s" % (key_text, value_text)) action_text = ", ".join(action_set_ops) if clause.update_whereclause is not None: action_text += " WHERE %s" % self.process( clause.update_whereclause, include_table=True, use_schema=False ) return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) def update_from_clause( self, update_stmt, from_table, extra_froms, from_hints, **kw ): kw["asfrom"] = True return "FROM " + ", ".join( t._compiler_dispatch(self, fromhints=from_hints, **kw) for t in extra_froms ) def delete_extra_from_clause( self, delete_stmt, from_table, extra_froms, from_hints, **kw ): """Render the DELETE .. USING clause specific to PostgreSQL.""" kw["asfrom"] = True return "USING " + ", ".join( t._compiler_dispatch(self, fromhints=from_hints, **kw) for t in extra_froms ) def fetch_clause(self, select, **kw): # pg requires parens for non literal clauses. 
It's also required for # bind parameters if a ::type casts is used by the driver (asyncpg), # so it's easiest to just always add it text = "" if select._offset_clause is not None: text += "\n OFFSET (%s) ROWS" % self.process( select._offset_clause, **kw ) if select._fetch_clause is not None: text += "\n FETCH FIRST (%s)%s ROWS %s" % ( self.process(select._fetch_clause, **kw), " PERCENT" if select._fetch_clause_options["percent"] else "", ( "WITH TIES" if select._fetch_clause_options["with_ties"] else "ONLY" ), ) return text class PGDDLCompiler(compiler.DDLCompiler): def get_column_specification(self, column, **kwargs): colspec = self.preparer.format_column(column) impl_type = column.type.dialect_impl(self.dialect) if isinstance(impl_type, sqltypes.TypeDecorator): impl_type = impl_type.impl has_identity = ( column.identity is not None and self.dialect.supports_identity_columns ) if ( column.primary_key and column is column.table._autoincrement_column and ( self.dialect.supports_smallserial or not isinstance(impl_type, sqltypes.SmallInteger) ) and not has_identity and ( column.default is None or ( isinstance(column.default, schema.Sequence) and column.default.optional ) ) ): if isinstance(impl_type, sqltypes.BigInteger): colspec += " BIGSERIAL" elif isinstance(impl_type, sqltypes.SmallInteger): colspec += " SMALLSERIAL" else: colspec += " SERIAL" else: colspec += " " + self.dialect.type_compiler_instance.process( column.type, type_expression=column, identifier_preparer=self.preparer, ) default = self.get_column_default_string(column) if default is not None: colspec += " DEFAULT " + default if column.computed is not None: colspec += " " + self.process(column.computed) if has_identity: colspec += " " + self.process(column.identity) if not column.nullable and not has_identity: colspec += " NOT NULL" elif column.nullable and has_identity: colspec += " NULL" return colspec def _define_constraint_validity(self, constraint): not_valid = constraint.dialect_options["postgresql"]["not_valid"] return " NOT VALID" if not_valid else "" def visit_check_constraint(self, constraint, **kw): if constraint._type_bound: typ = list(constraint.columns)[0].type if ( isinstance(typ, sqltypes.ARRAY) and isinstance(typ.item_type, sqltypes.Enum) and not typ.item_type.native_enum ): raise exc.CompileError( "PostgreSQL dialect cannot produce the CHECK constraint " "for ARRAY of non-native ENUM; please specify " "create_constraint=False on this Enum datatype." 
) text = super().visit_check_constraint(constraint) text += self._define_constraint_validity(constraint) return text def visit_foreign_key_constraint(self, constraint, **kw): text = super().visit_foreign_key_constraint(constraint) text += self._define_constraint_validity(constraint) return text def visit_create_enum_type(self, create, **kw): type_ = create.element return "CREATE TYPE %s AS ENUM (%s)" % ( self.preparer.format_type(type_), ", ".join( self.sql_compiler.process(sql.literal(e), literal_binds=True) for e in type_.enums ), ) def visit_drop_enum_type(self, drop, **kw): type_ = drop.element return "DROP TYPE %s" % (self.preparer.format_type(type_)) def visit_create_domain_type(self, create, **kw): domain: DOMAIN = create.element options = [] if domain.collation is not None: options.append(f"COLLATE {self.preparer.quote(domain.collation)}") if domain.default is not None: default = self.render_default_string(domain.default) options.append(f"DEFAULT {default}") if domain.constraint_name is not None: name = self.preparer.truncate_and_render_constraint_name( domain.constraint_name ) options.append(f"CONSTRAINT {name}") if domain.not_null: options.append("NOT NULL") if domain.check is not None: check = self.sql_compiler.process( domain.check, include_table=False, literal_binds=True ) options.append(f"CHECK ({check})") return ( f"CREATE DOMAIN {self.preparer.format_type(domain)} AS " f"{self.type_compiler.process(domain.data_type)} " f"{' '.join(options)}" ) def visit_drop_domain_type(self, drop, **kw): domain = drop.element return f"DROP DOMAIN {self.preparer.format_type(domain)}" def visit_create_index(self, create, **kw): preparer = self.preparer index = create.element self._verify_index_table(index) text = "CREATE " if index.unique: text += "UNIQUE " text += "INDEX " if self.dialect._supports_create_index_concurrently: concurrently = index.dialect_options["postgresql"]["concurrently"] if concurrently: text += "CONCURRENTLY " if create.if_not_exists: text += "IF NOT EXISTS " text += "%s ON %s " % ( self._prepared_index_name(index, include_schema=False), preparer.format_table(index.table), ) using = index.dialect_options["postgresql"]["using"] if using: text += ( "USING %s " % self.preparer.validate_sql_phrase(using, IDX_USING).lower() ) ops = index.dialect_options["postgresql"]["ops"] text += "(%s)" % ( ", ".join( [ self.sql_compiler.process( ( expr.self_group() if not isinstance(expr, expression.ColumnClause) else expr ), include_table=False, literal_binds=True, ) + ( (" " + ops[expr.key]) if hasattr(expr, "key") and expr.key in ops else "" ) for expr in index.expressions ] ) ) includeclause = index.dialect_options["postgresql"]["include"] if includeclause: inclusions = [ index.table.c[col] if isinstance(col, str) else col for col in includeclause ] text += " INCLUDE (%s)" % ", ".join( [preparer.quote(c.name) for c in inclusions] ) nulls_not_distinct = index.dialect_options["postgresql"][ "nulls_not_distinct" ] if nulls_not_distinct is True: text += " NULLS NOT DISTINCT" elif nulls_not_distinct is False: text += " NULLS DISTINCT" withclause = index.dialect_options["postgresql"]["with"] if withclause: text += " WITH (%s)" % ( ", ".join( [ "%s = %s" % storage_parameter for storage_parameter in withclause.items() ] ) ) tablespace_name = index.dialect_options["postgresql"]["tablespace"] if tablespace_name: text += " TABLESPACE %s" % preparer.quote(tablespace_name) whereclause = index.dialect_options["postgresql"]["where"] if whereclause is not None: whereclause = coercions.expect( 
roles.DDLExpressionRole, whereclause ) where_compiled = self.sql_compiler.process( whereclause, include_table=False, literal_binds=True ) text += " WHERE " + where_compiled return text def define_unique_constraint_distinct(self, constraint, **kw): nulls_not_distinct = constraint.dialect_options["postgresql"][ "nulls_not_distinct" ] if nulls_not_distinct is True: nulls_not_distinct_param = "NULLS NOT DISTINCT " elif nulls_not_distinct is False: nulls_not_distinct_param = "NULLS DISTINCT " else: nulls_not_distinct_param = "" return nulls_not_distinct_param def visit_drop_index(self, drop, **kw): index = drop.element text = "\nDROP INDEX " if self.dialect._supports_drop_index_concurrently: concurrently = index.dialect_options["postgresql"]["concurrently"] if concurrently: text += "CONCURRENTLY " if drop.if_exists: text += "IF EXISTS " text += self._prepared_index_name(index, include_schema=True) return text def visit_exclude_constraint(self, constraint, **kw): text = "" if constraint.name is not None: text += "CONSTRAINT %s " % self.preparer.format_constraint( constraint ) elements = [] kw["include_table"] = False kw["literal_binds"] = True for expr, name, op in constraint._render_exprs: exclude_element = self.sql_compiler.process(expr, **kw) + ( (" " + constraint.ops[expr.key]) if hasattr(expr, "key") and expr.key in constraint.ops else "" ) elements.append("%s WITH %s" % (exclude_element, op)) text += "EXCLUDE USING %s (%s)" % ( self.preparer.validate_sql_phrase( constraint.using, IDX_USING ).lower(), ", ".join(elements), ) if constraint.where is not None: text += " WHERE (%s)" % self.sql_compiler.process( constraint.where, literal_binds=True ) text += self.define_constraint_deferrability(constraint) return text def post_create_table(self, table): table_opts = [] pg_opts = table.dialect_options["postgresql"] inherits = pg_opts.get("inherits") if inherits is not None: if not isinstance(inherits, (list, tuple)): inherits = (inherits,) table_opts.append( "\n INHERITS ( " + ", ".join(self.preparer.quote(name) for name in inherits) + " )" ) if pg_opts["partition_by"]: table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"]) if pg_opts["using"]: table_opts.append("\n USING %s" % pg_opts["using"]) if pg_opts["with_oids"] is True: table_opts.append("\n WITH OIDS") elif pg_opts["with_oids"] is False: table_opts.append("\n WITHOUT OIDS") if pg_opts["on_commit"]: on_commit_options = pg_opts["on_commit"].replace("_", " ").upper() table_opts.append("\n ON COMMIT %s" % on_commit_options) if pg_opts["tablespace"]: tablespace_name = pg_opts["tablespace"] table_opts.append( "\n TABLESPACE %s" % self.preparer.quote(tablespace_name) ) return "".join(table_opts) def visit_computed_column(self, generated, **kw): if generated.persisted is False: raise exc.CompileError( "PostrgreSQL computed columns do not support 'virtual' " "persistence; set the 'persisted' flag to None or True for " "PostgreSQL support." 
) return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process( generated.sqltext, include_table=False, literal_binds=True ) def visit_create_sequence(self, create, **kw): prefix = None if create.element.data_type is not None: prefix = " AS %s" % self.type_compiler.process( create.element.data_type ) return super().visit_create_sequence(create, prefix=prefix, **kw) def _can_comment_on_constraint(self, ddl_instance): constraint = ddl_instance.element if constraint.name is None: raise exc.CompileError( f"Can't emit COMMENT ON for constraint {constraint!r}: " "it has no name" ) if constraint.table is None: raise exc.CompileError( f"Can't emit COMMENT ON for constraint {constraint!r}: " "it has no associated table" ) def visit_set_constraint_comment(self, create, **kw): self._can_comment_on_constraint(create) return "COMMENT ON CONSTRAINT %s ON %s IS %s" % ( self.preparer.format_constraint(create.element), self.preparer.format_table(create.element.table), self.sql_compiler.render_literal_value( create.element.comment, sqltypes.String() ), ) def visit_drop_constraint_comment(self, drop, **kw): self._can_comment_on_constraint(drop) return "COMMENT ON CONSTRAINT %s ON %s IS NULL" % ( self.preparer.format_constraint(drop.element), self.preparer.format_table(drop.element.table), ) class PGTypeCompiler(compiler.GenericTypeCompiler): def visit_TSVECTOR(self, type_, **kw): return "TSVECTOR" def visit_TSQUERY(self, type_, **kw): return "TSQUERY" def visit_INET(self, type_, **kw): return "INET" def visit_CIDR(self, type_, **kw): return "CIDR" def visit_CITEXT(self, type_, **kw): return "CITEXT" def visit_MACADDR(self, type_, **kw): return "MACADDR" def visit_MACADDR8(self, type_, **kw): return "MACADDR8" def visit_MONEY(self, type_, **kw): return "MONEY" def visit_OID(self, type_, **kw): return "OID" def visit_REGCONFIG(self, type_, **kw): return "REGCONFIG" def visit_REGCLASS(self, type_, **kw): return "REGCLASS" def visit_FLOAT(self, type_, **kw): if not type_.precision: return "FLOAT" else: return "FLOAT(%(precision)s)" % {"precision": type_.precision} def visit_double(self, type_, **kw): return self.visit_DOUBLE_PRECISION(type, **kw) def visit_BIGINT(self, type_, **kw): return "BIGINT" def visit_HSTORE(self, type_, **kw): return "HSTORE" def visit_JSON(self, type_, **kw): return "JSON" def visit_JSONB(self, type_, **kw): return "JSONB" def visit_INT4MULTIRANGE(self, type_, **kw): return "INT4MULTIRANGE" def visit_INT8MULTIRANGE(self, type_, **kw): return "INT8MULTIRANGE" def visit_NUMMULTIRANGE(self, type_, **kw): return "NUMMULTIRANGE" def visit_DATEMULTIRANGE(self, type_, **kw): return "DATEMULTIRANGE" def visit_TSMULTIRANGE(self, type_, **kw): return "TSMULTIRANGE" def visit_TSTZMULTIRANGE(self, type_, **kw): return "TSTZMULTIRANGE" def visit_INT4RANGE(self, type_, **kw): return "INT4RANGE" def visit_INT8RANGE(self, type_, **kw): return "INT8RANGE" def visit_NUMRANGE(self, type_, **kw): return "NUMRANGE" def visit_DATERANGE(self, type_, **kw): return "DATERANGE" def visit_TSRANGE(self, type_, **kw): return "TSRANGE" def visit_TSTZRANGE(self, type_, **kw): return "TSTZRANGE" def visit_json_int_index(self, type_, **kw): return "INT" def visit_json_str_index(self, type_, **kw): return "TEXT" def visit_datetime(self, type_, **kw): return self.visit_TIMESTAMP(type_, **kw) def visit_enum(self, type_, **kw): if not type_.native_enum or not self.dialect.supports_native_enum: return super().visit_enum(type_, **kw) else: return self.visit_ENUM(type_, **kw) def visit_ENUM(self, type_, 
                   identifier_preparer=None, **kw):
        if identifier_preparer is None:
            identifier_preparer = self.dialect.identifier_preparer
        return identifier_preparer.format_type(type_)

    def visit_DOMAIN(self, type_, identifier_preparer=None, **kw):
        if identifier_preparer is None:
            identifier_preparer = self.dialect.identifier_preparer
        return identifier_preparer.format_type(type_)

    def visit_TIMESTAMP(self, type_, **kw):
        return "TIMESTAMP%s %s" % (
            (
                "(%d)" % type_.precision
                if getattr(type_, "precision", None) is not None
                else ""
            ),
            (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
        )

    def visit_TIME(self, type_, **kw):
        return "TIME%s %s" % (
            (
                "(%d)" % type_.precision
                if getattr(type_, "precision", None) is not None
                else ""
            ),
            (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
        )

    def visit_INTERVAL(self, type_, **kw):
        text = "INTERVAL"
        if type_.fields is not None:
            text += " " + type_.fields
        if type_.precision is not None:
            text += " (%d)" % type_.precision
        return text

    def visit_BIT(self, type_, **kw):
        if type_.varying:
            compiled = "BIT VARYING"
            if type_.length is not None:
                compiled += "(%d)" % type_.length
        else:
            compiled = "BIT(%d)" % type_.length
        return compiled

    def visit_uuid(self, type_, **kw):
        if type_.native_uuid:
            return self.visit_UUID(type_, **kw)
        else:
            return super().visit_uuid(type_, **kw)

    def visit_UUID(self, type_, **kw):
        return "UUID"

    def visit_large_binary(self, type_, **kw):
        return self.visit_BYTEA(type_, **kw)

    def visit_BYTEA(self, type_, **kw):
        return "BYTEA"

    def visit_ARRAY(self, type_, **kw):
        inner = self.process(type_.item_type, **kw)
        return re.sub(
            r"((?: COLLATE.*)?)$",
            (
                r"%s\1"
                % (
                    "[]"
                    * (type_.dimensions if type_.dimensions is not None else 1)
                )
            ),
            inner,
            count=1,
        )

    def visit_json_path(self, type_, **kw):
        return self.visit_JSONPATH(type_, **kw)

    def visit_JSONPATH(self, type_, **kw):
        return "JSONPATH"


class PGIdentifierPreparer(compiler.IdentifierPreparer):
    reserved_words = RESERVED_WORDS

    def _unquote_identifier(self, value):
        if value[0] == self.initial_quote:
            value = value[1:-1].replace(
                self.escape_to_quote, self.escape_quote
            )
        return value

    def format_type(self, type_, use_schema=True):
        if not type_.name:
            raise exc.CompileError(
                f"PostgreSQL {type_.__class__.__name__} type requires a name."
            )

        name = self.quote(type_.name)

        effective_schema = self.schema_for_object(type_)

        if (
            not self.omit_schema
            and use_schema
            and effective_schema is not None
        ):
            name = f"{self.quote_schema(effective_schema)}.{name}"
        return name


class ReflectedNamedType(TypedDict):
    """Represents a reflected named type."""

    name: str
    """Name of the type."""
    schema: str
    """The schema of the type."""
    visible: bool
    """Indicates if this type is in the current search path."""


class ReflectedDomainConstraint(TypedDict):
    """Represents a reflected check constraint of a domain."""

    name: str
    """Name of the constraint."""
    check: str
    """The check constraint text."""


class ReflectedDomain(ReflectedNamedType):
    """Represents a reflected domain."""

    type: str
    """The string name of the underlying data type of the domain."""
    nullable: bool
    """Indicates if the domain allows null or not."""
    default: Optional[str]
    """The string representation of the default value of this domain
    or ``None`` if none present.
    """
    constraints: List[ReflectedDomainConstraint]
    """The constraints defined in the domain, if any.
    The constraints are in order of evaluation by PostgreSQL.
""" collation: Optional[str] """The collation for the domain.""" class ReflectedEnum(ReflectedNamedType): """Represents a reflected enum.""" labels: List[str] """The labels that compose the enum.""" class PGInspector(reflection.Inspector): dialect: PGDialect def get_table_oid( self, table_name: str, schema: Optional[str] = None ) -> int: """Return the OID for the given table name. :param table_name: string name of the table. For special quoting, use :class:`.quoted_name`. :param schema: string schema name; if omitted, uses the default schema of the database connection. For special quoting, use :class:`.quoted_name`. """ with self._operation_context() as conn: return self.dialect.get_table_oid( conn, table_name, schema, info_cache=self.info_cache ) def get_domains( self, schema: Optional[str] = None ) -> List[ReflectedDomain]: """Return a list of DOMAIN objects. Each member is a dictionary containing these fields: * name - name of the domain * schema - the schema name for the domain. * visible - boolean, whether or not this domain is visible in the default search path. * type - the type defined by this domain. * nullable - Indicates if this domain can be ``NULL``. * default - The default value of the domain or ``None`` if the domain has no default. * constraints - A list of dict wit the constraint defined by this domain. Each element constaints two keys: ``name`` of the constraint and ``check`` with the constraint text. :param schema: schema name. If None, the default schema (typically 'public') is used. May also be set to ``'*'`` to indicate load domains for all schemas. .. versionadded:: 2.0 """ with self._operation_context() as conn: return self.dialect._load_domains( conn, schema, info_cache=self.info_cache ) def get_enums(self, schema: Optional[str] = None) -> List[ReflectedEnum]: """Return a list of ENUM objects. Each member is a dictionary containing these fields: * name - name of the enum * schema - the schema name for the enum. * visible - boolean, whether or not this enum is visible in the default search path. * labels - a list of string labels that apply to the enum. :param schema: schema name. If None, the default schema (typically 'public') is used. May also be set to ``'*'`` to indicate load enums for all schemas. """ with self._operation_context() as conn: return self.dialect._load_enums( conn, schema, info_cache=self.info_cache ) def get_foreign_table_names( self, schema: Optional[str] = None ) -> List[str]: """Return a list of FOREIGN TABLE names. Behavior is similar to that of :meth:`_reflection.Inspector.get_table_names`, except that the list is limited to those tables that report a ``relkind`` value of ``f``. """ with self._operation_context() as conn: return self.dialect._get_foreign_table_names( conn, schema, info_cache=self.info_cache ) def has_type( self, type_name: str, schema: Optional[str] = None, **kw: Any ) -> bool: """Return if the database has the specified type in the provided schema. :param type_name: the type to check. :param schema: schema name. If None, the default schema (typically 'public') is used. May also be set to ``'*'`` to check in all schemas. .. 
versionadded:: 2.0 """ with self._operation_context() as conn: return self.dialect.has_type( conn, type_name, schema, info_cache=self.info_cache ) class PGExecutionContext(default.DefaultExecutionContext): def fire_sequence(self, seq, type_): return self._execute_scalar( ( "select nextval('%s')" % self.identifier_preparer.format_sequence(seq) ), type_, ) def get_insert_default(self, column): if column.primary_key and column is column.table._autoincrement_column: if column.server_default and column.server_default.has_argument: # pre-execute passive defaults on primary key columns return self._execute_scalar( "select %s" % column.server_default.arg, column.type ) elif column.default is None or ( column.default.is_sequence and column.default.optional ): # execute the sequence associated with a SERIAL primary # key column. for non-primary-key SERIAL, the ID just # generates server side. try: seq_name = column._postgresql_seq_name except AttributeError: tab = column.table.name col = column.name tab = tab[0 : 29 + max(0, (29 - len(col)))] col = col[0 : 29 + max(0, (29 - len(tab)))] name = "%s_%s_seq" % (tab, col) column._postgresql_seq_name = seq_name = name if column.table is not None: effective_schema = self.connection.schema_for_object( column.table ) else: effective_schema = None if effective_schema is not None: exc = 'select nextval(\'"%s"."%s"\')' % ( effective_schema, seq_name, ) else: exc = "select nextval('\"%s\"')" % (seq_name,) return self._execute_scalar(exc, column.type) return super().get_insert_default(column) class PGReadOnlyConnectionCharacteristic( characteristics.ConnectionCharacteristic ): transactional = True def reset_characteristic(self, dialect, dbapi_conn): dialect.set_readonly(dbapi_conn, False) def set_characteristic(self, dialect, dbapi_conn, value): dialect.set_readonly(dbapi_conn, value) def get_characteristic(self, dialect, dbapi_conn): return dialect.get_readonly(dbapi_conn) class PGDeferrableConnectionCharacteristic( characteristics.ConnectionCharacteristic ): transactional = True def reset_characteristic(self, dialect, dbapi_conn): dialect.set_deferrable(dbapi_conn, False) def set_characteristic(self, dialect, dbapi_conn, value): dialect.set_deferrable(dbapi_conn, value) def get_characteristic(self, dialect, dbapi_conn): return dialect.get_deferrable(dbapi_conn) class PGDialect(default.DefaultDialect): name = "postgresql" supports_statement_cache = True supports_alter = True max_identifier_length = 63 supports_sane_rowcount = True bind_typing = interfaces.BindTyping.RENDER_CASTS supports_native_enum = True supports_native_boolean = True supports_native_uuid = True supports_smallserial = True supports_sequences = True sequences_optional = True preexecute_autoincrement_sequences = True postfetch_lastrowid = False use_insertmanyvalues = True returns_native_bytes = True insertmanyvalues_implicit_sentinel = ( InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT | InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS ) supports_comments = True supports_constraint_comments = True supports_default_values = True supports_default_metavalue = True supports_empty_insert = False supports_multivalues_insert = True supports_identity_columns = True default_paramstyle = "pyformat" ischema_names = ischema_names colspecs = colspecs statement_compiler = PGCompiler ddl_compiler = PGDDLCompiler type_compiler_cls = PGTypeCompiler preparer = PGIdentifierPreparer execution_ctx_cls = PGExecutionContext inspector = PGInspector 
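    # The capability flags below advertise RETURNING support to Core; as a
    # rough illustration (table and column names here are hypothetical),
    # insert(some_table).values(data="x").returning(some_table.c.id)
    # compiles with a "RETURNING some_table.id" clause on this dialect.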
update_returning = True delete_returning = True insert_returning = True update_returning_multifrom = True delete_returning_multifrom = True connection_characteristics = ( default.DefaultDialect.connection_characteristics ) connection_characteristics = connection_characteristics.union( { "postgresql_readonly": PGReadOnlyConnectionCharacteristic(), "postgresql_deferrable": PGDeferrableConnectionCharacteristic(), } ) construct_arguments = [ ( schema.Index, { "using": False, "include": None, "where": None, "ops": {}, "concurrently": False, "with": {}, "tablespace": None, "nulls_not_distinct": None, }, ), ( schema.Table, { "ignore_search_path": False, "tablespace": None, "partition_by": None, "with_oids": None, "on_commit": None, "inherits": None, "using": None, }, ), ( schema.CheckConstraint, { "not_valid": False, }, ), ( schema.ForeignKeyConstraint, { "not_valid": False, }, ), ( schema.UniqueConstraint, {"nulls_not_distinct": None}, ), ] reflection_options = ("postgresql_ignore_search_path",) _backslash_escapes = True _supports_create_index_concurrently = True _supports_drop_index_concurrently = True def __init__( self, native_inet_types=None, json_serializer=None, json_deserializer=None, **kwargs, ): default.DefaultDialect.__init__(self, **kwargs) self._native_inet_types = native_inet_types self._json_deserializer = json_deserializer self._json_serializer = json_serializer def initialize(self, connection): super().initialize(connection) # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689 self.supports_smallserial = self.server_version_info >= (9, 2) self._set_backslash_escapes(connection) self._supports_drop_index_concurrently = self.server_version_info >= ( 9, 2, ) self.supports_identity_columns = self.server_version_info >= (10,) def get_isolation_level_values(self, dbapi_conn): # note the generic dialect doesn't have AUTOCOMMIT, however # all postgresql dialects should include AUTOCOMMIT. return ( "SERIALIZABLE", "READ UNCOMMITTED", "READ COMMITTED", "REPEATABLE READ", ) def set_isolation_level(self, dbapi_connection, level): cursor = dbapi_connection.cursor() cursor.execute( "SET SESSION CHARACTERISTICS AS TRANSACTION " f"ISOLATION LEVEL {level}" ) cursor.execute("COMMIT") cursor.close() def get_isolation_level(self, dbapi_connection): cursor = dbapi_connection.cursor() cursor.execute("show transaction isolation level") val = cursor.fetchone()[0] cursor.close() return val.upper() def set_readonly(self, connection, value): raise NotImplementedError() def get_readonly(self, connection): raise NotImplementedError() def set_deferrable(self, connection, value): raise NotImplementedError() def get_deferrable(self, connection): raise NotImplementedError() def _split_multihost_from_url(self, url: URL) -> Union[ Tuple[None, None], Tuple[Tuple[Optional[str], ...], Tuple[Optional[int], ...]], ]: hosts: Optional[Tuple[Optional[str], ...]] = None ports_str: Union[str, Tuple[Optional[str], ...], None] = None integrated_multihost = False if "host" in url.query: if isinstance(url.query["host"], (list, tuple)): integrated_multihost = True hosts, ports_str = zip( *[ token.split(":") if ":" in token else (token, None) for token in url.query["host"] ] ) elif isinstance(url.query["host"], str): hosts = tuple(url.query["host"].split(",")) if ( "port" not in url.query and len(hosts) == 1 and ":" in hosts[0] ): # internet host is alphanumeric plus dots or hyphens. # this is essentially rfc1123, which refers to rfc952. 
# https://stackoverflow.com/questions/3523028/ # valid-characters-of-a-hostname host_port_match = re.match( r"^([a-zA-Z0-9\-\.]*)(?:\:(\d*))?$", hosts[0] ) if host_port_match: integrated_multihost = True h, p = host_port_match.group(1, 2) if TYPE_CHECKING: assert isinstance(h, str) assert isinstance(p, str) hosts = (h,) ports_str = cast( "Tuple[Optional[str], ...]", (p,) if p else (None,) ) if "port" in url.query: if integrated_multihost: raise exc.ArgumentError( "Can't mix 'multihost' formats together; use " '"host=h1,h2,h3&port=p1,p2,p3" or ' '"host=h1:p1&host=h2:p2&host=h3:p3" separately' ) if isinstance(url.query["port"], (list, tuple)): ports_str = url.query["port"] elif isinstance(url.query["port"], str): ports_str = tuple(url.query["port"].split(",")) ports: Optional[Tuple[Optional[int], ...]] = None if ports_str: try: ports = tuple(int(x) if x else None for x in ports_str) except ValueError: raise exc.ArgumentError( f"Received non-integer port arguments: {ports_str}" ) from None if ports and ( (not hosts and len(ports) > 1) or ( hosts and ports and len(hosts) != len(ports) and (len(hosts) > 1 or len(ports) > 1) ) ): raise exc.ArgumentError("number of hosts and ports don't match") if hosts is not None: if ports is None: ports = tuple(None for _ in hosts) return hosts, ports # type: ignore def do_begin_twophase(self, connection, xid): self.do_begin(connection.connection) def do_prepare_twophase(self, connection, xid): connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid) def do_rollback_twophase( self, connection, xid, is_prepared=True, recover=False ): if is_prepared: if recover: # FIXME: ugly hack to get out of transaction # context when committing recoverable transactions # Must find out a way how to make the dbapi not # open a transaction. 
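                # the sequence emitted below is roughly: ROLLBACK to leave
                # the implicitly opened transaction, then
                # ROLLBACK PREPARED '<xid>', then BEGIN so the connection is
                # back in the state the DBAPI expects before the final
                # do_rollback()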
connection.exec_driver_sql("ROLLBACK") connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid) connection.exec_driver_sql("BEGIN") self.do_rollback(connection.connection) else: self.do_rollback(connection.connection) def do_commit_twophase( self, connection, xid, is_prepared=True, recover=False ): if is_prepared: if recover: connection.exec_driver_sql("ROLLBACK") connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid) connection.exec_driver_sql("BEGIN") self.do_rollback(connection.connection) else: self.do_commit(connection.connection) def do_recover_twophase(self, connection): return connection.scalars( sql.text("SELECT gid FROM pg_prepared_xacts") ).all() def _get_default_schema_name(self, connection): return connection.exec_driver_sql("select current_schema()").scalar() @reflection.cache def has_schema(self, connection, schema, **kw): query = select(pg_catalog.pg_namespace.c.nspname).where( pg_catalog.pg_namespace.c.nspname == schema ) return bool(connection.scalar(query)) def _pg_class_filter_scope_schema( self, query, schema, scope, pg_class_table=None ): if pg_class_table is None: pg_class_table = pg_catalog.pg_class query = query.join( pg_catalog.pg_namespace, pg_catalog.pg_namespace.c.oid == pg_class_table.c.relnamespace, ) if scope is ObjectScope.DEFAULT: query = query.where(pg_class_table.c.relpersistence != "t") elif scope is ObjectScope.TEMPORARY: query = query.where(pg_class_table.c.relpersistence == "t") if schema is None: query = query.where( pg_catalog.pg_table_is_visible(pg_class_table.c.oid), # ignore pg_catalog schema pg_catalog.pg_namespace.c.nspname != "pg_catalog", ) else: query = query.where(pg_catalog.pg_namespace.c.nspname == schema) return query def _pg_class_relkind_condition(self, relkinds, pg_class_table=None): if pg_class_table is None: pg_class_table = pg_catalog.pg_class # uses the any form instead of in otherwise postgresql complaings # that 'IN could not convert type character to "char"' return pg_class_table.c.relkind == sql.any_(_array.array(relkinds)) @lru_cache() def _has_table_query(self, schema): query = select(pg_catalog.pg_class.c.relname).where( pg_catalog.pg_class.c.relname == bindparam("table_name"), self._pg_class_relkind_condition( pg_catalog.RELKINDS_ALL_TABLE_LIKE ), ) return self._pg_class_filter_scope_schema( query, schema, scope=ObjectScope.ANY ) @reflection.cache def has_table(self, connection, table_name, schema=None, **kw): self._ensure_has_table_connection(connection) query = self._has_table_query(schema) return bool(connection.scalar(query, {"table_name": table_name})) @reflection.cache def has_sequence(self, connection, sequence_name, schema=None, **kw): query = select(pg_catalog.pg_class.c.relname).where( pg_catalog.pg_class.c.relkind == "S", pg_catalog.pg_class.c.relname == sequence_name, ) query = self._pg_class_filter_scope_schema( query, schema, scope=ObjectScope.ANY ) return bool(connection.scalar(query)) @reflection.cache def has_type(self, connection, type_name, schema=None, **kw): query = ( select(pg_catalog.pg_type.c.typname) .join( pg_catalog.pg_namespace, pg_catalog.pg_namespace.c.oid == pg_catalog.pg_type.c.typnamespace, ) .where(pg_catalog.pg_type.c.typname == type_name) ) if schema is None: query = query.where( pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid), # ignore pg_catalog schema pg_catalog.pg_namespace.c.nspname != "pg_catalog", ) elif schema != "*": query = query.where(pg_catalog.pg_namespace.c.nspname == schema) return bool(connection.scalar(query)) def _get_server_version_info(self, connection): 
v = connection.exec_driver_sql("select pg_catalog.version()").scalar() m = re.match( r".*(?:PostgreSQL|EnterpriseDB) " r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?", v, ) if not m: raise AssertionError( "Could not determine version from string '%s'" % v ) return tuple([int(x) for x in m.group(1, 2, 3) if x is not None]) @reflection.cache def get_table_oid(self, connection, table_name, schema=None, **kw): """Fetch the oid for schema.table_name.""" query = select(pg_catalog.pg_class.c.oid).where( pg_catalog.pg_class.c.relname == table_name, self._pg_class_relkind_condition( pg_catalog.RELKINDS_ALL_TABLE_LIKE ), ) query = self._pg_class_filter_scope_schema( query, schema, scope=ObjectScope.ANY ) table_oid = connection.scalar(query) if table_oid is None: raise exc.NoSuchTableError( f"{schema}.{table_name}" if schema else table_name ) return table_oid @reflection.cache def get_schema_names(self, connection, **kw): query = ( select(pg_catalog.pg_namespace.c.nspname) .where(pg_catalog.pg_namespace.c.nspname.not_like("pg_%")) .order_by(pg_catalog.pg_namespace.c.nspname) ) return connection.scalars(query).all() def _get_relnames_for_relkinds(self, connection, schema, relkinds, scope): query = select(pg_catalog.pg_class.c.relname).where( self._pg_class_relkind_condition(relkinds) ) query = self._pg_class_filter_scope_schema(query, schema, scope=scope) return connection.scalars(query).all() @reflection.cache def get_table_names(self, connection, schema=None, **kw): return self._get_relnames_for_relkinds( connection, schema, pg_catalog.RELKINDS_TABLE_NO_FOREIGN, scope=ObjectScope.DEFAULT, ) @reflection.cache def get_temp_table_names(self, connection, **kw): return self._get_relnames_for_relkinds( connection, schema=None, relkinds=pg_catalog.RELKINDS_TABLE_NO_FOREIGN, scope=ObjectScope.TEMPORARY, ) @reflection.cache def _get_foreign_table_names(self, connection, schema=None, **kw): return self._get_relnames_for_relkinds( connection, schema, relkinds=("f",), scope=ObjectScope.ANY ) @reflection.cache def get_view_names(self, connection, schema=None, **kw): return self._get_relnames_for_relkinds( connection, schema, pg_catalog.RELKINDS_VIEW, scope=ObjectScope.DEFAULT, ) @reflection.cache def get_materialized_view_names(self, connection, schema=None, **kw): return self._get_relnames_for_relkinds( connection, schema, pg_catalog.RELKINDS_MAT_VIEW, scope=ObjectScope.DEFAULT, ) @reflection.cache def get_temp_view_names(self, connection, schema=None, **kw): return self._get_relnames_for_relkinds( connection, schema, # NOTE: do not include temp materialzied views (that do not # seem to be a thing at least up to version 14) pg_catalog.RELKINDS_VIEW, scope=ObjectScope.TEMPORARY, ) @reflection.cache def get_sequence_names(self, connection, schema=None, **kw): return self._get_relnames_for_relkinds( connection, schema, relkinds=("S",), scope=ObjectScope.ANY ) @reflection.cache def get_view_definition(self, connection, view_name, schema=None, **kw): query = ( select(pg_catalog.pg_get_viewdef(pg_catalog.pg_class.c.oid)) .select_from(pg_catalog.pg_class) .where( pg_catalog.pg_class.c.relname == view_name, self._pg_class_relkind_condition( pg_catalog.RELKINDS_VIEW + pg_catalog.RELKINDS_MAT_VIEW ), ) ) query = self._pg_class_filter_scope_schema( query, schema, scope=ObjectScope.ANY ) res = connection.scalar(query) if res is None: raise exc.NoSuchTableError( f"{schema}.{view_name}" if schema else view_name ) else: return res def _value_or_raise(self, data, table, schema): try: return dict(data)[(schema, table)] 
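        # illustrative note: "data" is keyed by (schema, table_name) tuples
        # such as (None, "my_table") when the default search path is used;
        # a missing key means the requested table was not reflected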
except KeyError: raise exc.NoSuchTableError( f"{schema}.{table}" if schema else table ) from None def _prepare_filter_names(self, filter_names): if filter_names: return True, {"filter_names": filter_names} else: return False, {} def _kind_to_relkinds(self, kind: ObjectKind) -> Tuple[str, ...]: if kind is ObjectKind.ANY: return pg_catalog.RELKINDS_ALL_TABLE_LIKE relkinds = () if ObjectKind.TABLE in kind: relkinds += pg_catalog.RELKINDS_TABLE if ObjectKind.VIEW in kind: relkinds += pg_catalog.RELKINDS_VIEW if ObjectKind.MATERIALIZED_VIEW in kind: relkinds += pg_catalog.RELKINDS_MAT_VIEW return relkinds @reflection.cache def get_columns(self, connection, table_name, schema=None, **kw): data = self.get_multi_columns( connection, schema=schema, filter_names=[table_name], scope=ObjectScope.ANY, kind=ObjectKind.ANY, **kw, ) return self._value_or_raise(data, table_name, schema) @lru_cache() def _columns_query(self, schema, has_filter_names, scope, kind): # NOTE: the query with the default and identity options scalar # subquery is faster than trying to use outer joins for them generated = ( pg_catalog.pg_attribute.c.attgenerated.label("generated") if self.server_version_info >= (12,) else sql.null().label("generated") ) if self.server_version_info >= (10,): # join lateral performs worse (~2x slower) than a scalar_subquery identity = ( select( sql.func.json_build_object( "always", pg_catalog.pg_attribute.c.attidentity == "a", "start", pg_catalog.pg_sequence.c.seqstart, "increment", pg_catalog.pg_sequence.c.seqincrement, "minvalue", pg_catalog.pg_sequence.c.seqmin, "maxvalue", pg_catalog.pg_sequence.c.seqmax, "cache", pg_catalog.pg_sequence.c.seqcache, "cycle", pg_catalog.pg_sequence.c.seqcycle, ) ) .select_from(pg_catalog.pg_sequence) .where( # attidentity != '' is required or it will reflect also # serial columns as identity. pg_catalog.pg_attribute.c.attidentity != "", pg_catalog.pg_sequence.c.seqrelid == sql.cast( sql.cast( pg_catalog.pg_get_serial_sequence( sql.cast( sql.cast( pg_catalog.pg_attribute.c.attrelid, REGCLASS, ), TEXT, ), pg_catalog.pg_attribute.c.attname, ), REGCLASS, ), OID, ), ) .correlate(pg_catalog.pg_attribute) .scalar_subquery() .label("identity_options") ) else: identity = sql.null().label("identity_options") # join lateral performs the same as scalar_subquery here default = ( select( pg_catalog.pg_get_expr( pg_catalog.pg_attrdef.c.adbin, pg_catalog.pg_attrdef.c.adrelid, ) ) .select_from(pg_catalog.pg_attrdef) .where( pg_catalog.pg_attrdef.c.adrelid == pg_catalog.pg_attribute.c.attrelid, pg_catalog.pg_attrdef.c.adnum == pg_catalog.pg_attribute.c.attnum, pg_catalog.pg_attribute.c.atthasdef, ) .correlate(pg_catalog.pg_attribute) .scalar_subquery() .label("default") ) relkinds = self._kind_to_relkinds(kind) query = ( select( pg_catalog.pg_attribute.c.attname.label("name"), pg_catalog.format_type( pg_catalog.pg_attribute.c.atttypid, pg_catalog.pg_attribute.c.atttypmod, ).label("format_type"), default, pg_catalog.pg_attribute.c.attnotnull.label("not_null"), pg_catalog.pg_class.c.relname.label("table_name"), pg_catalog.pg_description.c.description.label("comment"), generated, identity, ) .select_from(pg_catalog.pg_class) # NOTE: postgresql support table with no user column, meaning # there is no row with pg_attribute.attnum > 0. use a left outer # join to avoid filtering these tables. 
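        # (as a concrete illustration: a table created with no columns still
        # produces one row here whose pg_attribute fields are NULL, which
        # _get_columns_info() turns into an empty column list)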
.outerjoin( pg_catalog.pg_attribute, sql.and_( pg_catalog.pg_class.c.oid == pg_catalog.pg_attribute.c.attrelid, pg_catalog.pg_attribute.c.attnum > 0, ~pg_catalog.pg_attribute.c.attisdropped, ), ) .outerjoin( pg_catalog.pg_description, sql.and_( pg_catalog.pg_description.c.objoid == pg_catalog.pg_attribute.c.attrelid, pg_catalog.pg_description.c.objsubid == pg_catalog.pg_attribute.c.attnum, ), ) .where(self._pg_class_relkind_condition(relkinds)) .order_by( pg_catalog.pg_class.c.relname, pg_catalog.pg_attribute.c.attnum ) ) query = self._pg_class_filter_scope_schema(query, schema, scope=scope) if has_filter_names: query = query.where( pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) ) return query def get_multi_columns( self, connection, schema, filter_names, scope, kind, **kw ): has_filter_names, params = self._prepare_filter_names(filter_names) query = self._columns_query(schema, has_filter_names, scope, kind) rows = connection.execute(query, params).mappings() # dictionary with (name, ) if default search path or (schema, name) # as keys domains = { ((d["schema"], d["name"]) if not d["visible"] else (d["name"],)): d for d in self._load_domains( connection, schema="*", info_cache=kw.get("info_cache") ) } # dictionary with (name, ) if default search path or (schema, name) # as keys enums = dict( ( ((rec["name"],), rec) if rec["visible"] else ((rec["schema"], rec["name"]), rec) ) for rec in self._load_enums( connection, schema="*", info_cache=kw.get("info_cache") ) ) columns = self._get_columns_info(rows, domains, enums, schema) return columns.items() _format_type_args_pattern = re.compile(r"\((.*)\)") _format_type_args_delim = re.compile(r"\s*,\s*") _format_array_spec_pattern = re.compile(r"((?:\[\])*)$") def _reflect_type( self, format_type: Optional[str], domains: dict[str, ReflectedDomain], enums: dict[str, ReflectedEnum], type_description: str, ) -> sqltypes.TypeEngine[Any]: """ Attempts to reconstruct a column type defined in ischema_names based on the information available in the format_type. If the `format_type` cannot be associated with a known `ischema_names`, it is treated as a reference to a known PostgreSQL named `ENUM` or `DOMAIN` type. 
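        For example (an illustrative case): a ``format_type`` of
        ``character varying(30)[]`` is reconstructed here as
        ``ARRAY(VARCHAR(30))``.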
""" type_description = type_description or "unknown type" if format_type is None: util.warn( "PostgreSQL format_type() returned NULL for %s" % type_description ) return sqltypes.NULLTYPE attype_args_match = self._format_type_args_pattern.search(format_type) if attype_args_match and attype_args_match.group(1): attype_args = self._format_type_args_delim.split( attype_args_match.group(1) ) else: attype_args = () match_array_dim = self._format_array_spec_pattern.search(format_type) # Each "[]" in array specs corresponds to an array dimension array_dim = len(match_array_dim.group(1) or "") // 2 # Remove all parameters and array specs from format_type to obtain an # ischema_name candidate attype = self._format_type_args_pattern.sub("", format_type) attype = self._format_array_spec_pattern.sub("", attype) schema_type = self.ischema_names.get(attype.lower(), None) args, kwargs = (), {} if attype == "numeric": if len(attype_args) == 2: precision, scale = map(int, attype_args) args = (precision, scale) elif attype == "double precision": args = (53,) elif attype == "integer": args = () elif attype in ("timestamp with time zone", "time with time zone"): kwargs["timezone"] = True if len(attype_args) == 1: kwargs["precision"] = int(attype_args[0]) elif attype in ( "timestamp without time zone", "time without time zone", "time", ): kwargs["timezone"] = False if len(attype_args) == 1: kwargs["precision"] = int(attype_args[0]) elif attype == "bit varying": kwargs["varying"] = True if len(attype_args) == 1: charlen = int(attype_args[0]) args = (charlen,) elif attype.startswith("interval"): schema_type = INTERVAL field_match = re.match(r"interval (.+)", attype) if field_match: kwargs["fields"] = field_match.group(1) if len(attype_args) == 1: kwargs["precision"] = int(attype_args[0]) else: enum_or_domain_key = tuple(util.quoted_token_parser(attype)) if enum_or_domain_key in enums: schema_type = ENUM enum = enums[enum_or_domain_key] args = tuple(enum["labels"]) kwargs["name"] = enum["name"] if not enum["visible"]: kwargs["schema"] = enum["schema"] args = tuple(enum["labels"]) elif enum_or_domain_key in domains: schema_type = DOMAIN domain = domains[enum_or_domain_key] data_type = self._reflect_type( domain["type"], domains, enums, type_description="DOMAIN '%s'" % domain["name"], ) args = (domain["name"], data_type) kwargs["collation"] = domain["collation"] kwargs["default"] = domain["default"] kwargs["not_null"] = not domain["nullable"] kwargs["create_type"] = False if domain["constraints"]: # We only support a single constraint check_constraint = domain["constraints"][0] kwargs["constraint_name"] = check_constraint["name"] kwargs["check"] = check_constraint["check"] if not domain["visible"]: kwargs["schema"] = domain["schema"] else: try: charlen = int(attype_args[0]) args = (charlen, *attype_args[1:]) except (ValueError, IndexError): args = attype_args if not schema_type: util.warn( "Did not recognize type '%s' of %s" % (attype, type_description) ) return sqltypes.NULLTYPE data_type = schema_type(*args, **kwargs) if array_dim >= 1: # postgres does not preserve dimensionality or size of array types. 
data_type = _array.ARRAY(data_type) return data_type def _get_columns_info(self, rows, domains, enums, schema): columns = defaultdict(list) for row_dict in rows: # ensure that each table has an entry, even if it has no columns if row_dict["name"] is None: columns[(schema, row_dict["table_name"])] = ( ReflectionDefaults.columns() ) continue table_cols = columns[(schema, row_dict["table_name"])] coltype = self._reflect_type( row_dict["format_type"], domains, enums, type_description="column '%s'" % row_dict["name"], ) default = row_dict["default"] name = row_dict["name"] generated = row_dict["generated"] nullable = not row_dict["not_null"] if isinstance(coltype, DOMAIN): if not default: # domain can override the default value but # cant set it to None if coltype.default is not None: default = coltype.default nullable = nullable and not coltype.not_null identity = row_dict["identity_options"] # If a zero byte or blank string depending on driver (is also # absent for older PG versions), then not a generated column. # Otherwise, s = stored. (Other values might be added in the # future.) if generated not in (None, "", b"\x00"): computed = dict( sqltext=default, persisted=generated in ("s", b"s") ) default = None else: computed = None # adjust the default value autoincrement = False if default is not None: match = re.search(r"""(nextval\(')([^']+)('.*$)""", default) if match is not None: if issubclass(coltype._type_affinity, sqltypes.Integer): autoincrement = True # the default is related to a Sequence if "." not in match.group(2) and schema is not None: # unconditionally quote the schema name. this could # later be enhanced to obey quoting rules / # "quote schema" default = ( match.group(1) + ('"%s"' % schema) + "." + match.group(2) + match.group(3) ) column_info = { "name": name, "type": coltype, "nullable": nullable, "default": default, "autoincrement": autoincrement or identity is not None, "comment": row_dict["comment"], } if computed is not None: column_info["computed"] = computed if identity is not None: column_info["identity"] = identity table_cols.append(column_info) return columns @lru_cache() def _table_oids_query(self, schema, has_filter_names, scope, kind): relkinds = self._kind_to_relkinds(kind) oid_q = select( pg_catalog.pg_class.c.oid, pg_catalog.pg_class.c.relname ).where(self._pg_class_relkind_condition(relkinds)) oid_q = self._pg_class_filter_scope_schema(oid_q, schema, scope=scope) if has_filter_names: oid_q = oid_q.where( pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) ) return oid_q @reflection.flexi_cache( ("schema", InternalTraversal.dp_string), ("filter_names", InternalTraversal.dp_string_list), ("kind", InternalTraversal.dp_plain_obj), ("scope", InternalTraversal.dp_plain_obj), ) def _get_table_oids( self, connection, schema, filter_names, scope, kind, **kw ): has_filter_names, params = self._prepare_filter_names(filter_names) oid_q = self._table_oids_query(schema, has_filter_names, scope, kind) result = connection.execute(oid_q, params) return result.all() @lru_cache() def _constraint_query(self, is_unique): con_sq = ( select( pg_catalog.pg_constraint.c.conrelid, pg_catalog.pg_constraint.c.conname, pg_catalog.pg_constraint.c.conindid, sql.func.unnest(pg_catalog.pg_constraint.c.conkey).label( "attnum" ), sql.func.generate_subscripts( pg_catalog.pg_constraint.c.conkey, 1 ).label("ord"), pg_catalog.pg_description.c.description, ) .outerjoin( pg_catalog.pg_description, pg_catalog.pg_description.c.objoid == pg_catalog.pg_constraint.c.oid, ) .where( 
pg_catalog.pg_constraint.c.contype == bindparam("contype"), pg_catalog.pg_constraint.c.conrelid.in_(bindparam("oids")), ) .subquery("con") ) attr_sq = ( select( con_sq.c.conrelid, con_sq.c.conname, con_sq.c.conindid, con_sq.c.description, con_sq.c.ord, pg_catalog.pg_attribute.c.attname, ) .select_from(pg_catalog.pg_attribute) .join( con_sq, sql.and_( pg_catalog.pg_attribute.c.attnum == con_sq.c.attnum, pg_catalog.pg_attribute.c.attrelid == con_sq.c.conrelid, ), ) .where( # NOTE: restate the condition here, since pg15 otherwise # seems to get confused on pscopg2 sometimes, doing # a sequential scan of pg_attribute. # The condition in the con_sq subquery is not actually needed # in pg15, but it may be needed in older versions. Keeping it # does not seems to have any inpact in any case. con_sq.c.conrelid.in_(bindparam("oids")) ) .subquery("attr") ) constraint_query = ( select( attr_sq.c.conrelid, sql.func.array_agg( # NOTE: cast since some postgresql derivatives may # not support array_agg on the name type aggregate_order_by( attr_sq.c.attname.cast(TEXT), attr_sq.c.ord ) ).label("cols"), attr_sq.c.conname, sql.func.min(attr_sq.c.description).label("description"), ) .group_by(attr_sq.c.conrelid, attr_sq.c.conname) .order_by(attr_sq.c.conrelid, attr_sq.c.conname) ) if is_unique: if self.server_version_info >= (15,): constraint_query = constraint_query.join( pg_catalog.pg_index, attr_sq.c.conindid == pg_catalog.pg_index.c.indexrelid, ).add_columns( sql.func.bool_and( pg_catalog.pg_index.c.indnullsnotdistinct ).label("indnullsnotdistinct") ) else: constraint_query = constraint_query.add_columns( sql.false().label("indnullsnotdistinct") ) else: constraint_query = constraint_query.add_columns( sql.null().label("extra") ) return constraint_query def _reflect_constraint( self, connection, contype, schema, filter_names, scope, kind, **kw ): # used to reflect primary and unique constraint table_oids = self._get_table_oids( connection, schema, filter_names, scope, kind, **kw ) batches = list(table_oids) is_unique = contype == "u" while batches: batch = batches[0:3000] batches[0:3000] = [] result = connection.execute( self._constraint_query(is_unique), {"oids": [r[0] for r in batch], "contype": contype}, ) result_by_oid = defaultdict(list) for oid, cols, constraint_name, comment, extra in result: result_by_oid[oid].append( (cols, constraint_name, comment, extra) ) for oid, tablename in batch: for_oid = result_by_oid.get(oid, ()) if for_oid: for cols, constraint, comment, extra in for_oid: if is_unique: yield tablename, cols, constraint, comment, { "nullsnotdistinct": extra } else: yield tablename, cols, constraint, comment, None else: yield tablename, None, None, None, None @reflection.cache def get_pk_constraint(self, connection, table_name, schema=None, **kw): data = self.get_multi_pk_constraint( connection, schema=schema, filter_names=[table_name], scope=ObjectScope.ANY, kind=ObjectKind.ANY, **kw, ) return self._value_or_raise(data, table_name, schema) def get_multi_pk_constraint( self, connection, schema, filter_names, scope, kind, **kw ): result = self._reflect_constraint( connection, "p", schema, filter_names, scope, kind, **kw ) # only a single pk can be present for each table. 
Return an entry # even if a table has no primary key default = ReflectionDefaults.pk_constraint return ( ( (schema, table_name), ( { "constrained_columns": [] if cols is None else cols, "name": pk_name, "comment": comment, } if pk_name is not None else default() ), ) for table_name, cols, pk_name, comment, _ in result ) @reflection.cache def get_foreign_keys( self, connection, table_name, schema=None, postgresql_ignore_search_path=False, **kw, ): data = self.get_multi_foreign_keys( connection, schema=schema, filter_names=[table_name], postgresql_ignore_search_path=postgresql_ignore_search_path, scope=ObjectScope.ANY, kind=ObjectKind.ANY, **kw, ) return self._value_or_raise(data, table_name, schema) @lru_cache() def _foreing_key_query(self, schema, has_filter_names, scope, kind): pg_class_ref = pg_catalog.pg_class.alias("cls_ref") pg_namespace_ref = pg_catalog.pg_namespace.alias("nsp_ref") relkinds = self._kind_to_relkinds(kind) query = ( select( pg_catalog.pg_class.c.relname, pg_catalog.pg_constraint.c.conname, # NOTE: avoid calling pg_get_constraintdef when not needed # to speed up the query sql.case( ( pg_catalog.pg_constraint.c.oid.is_not(None), pg_catalog.pg_get_constraintdef( pg_catalog.pg_constraint.c.oid, True ), ), else_=None, ), pg_namespace_ref.c.nspname, pg_catalog.pg_description.c.description, ) .select_from(pg_catalog.pg_class) .outerjoin( pg_catalog.pg_constraint, sql.and_( pg_catalog.pg_class.c.oid == pg_catalog.pg_constraint.c.conrelid, pg_catalog.pg_constraint.c.contype == "f", ), ) .outerjoin( pg_class_ref, pg_class_ref.c.oid == pg_catalog.pg_constraint.c.confrelid, ) .outerjoin( pg_namespace_ref, pg_class_ref.c.relnamespace == pg_namespace_ref.c.oid, ) .outerjoin( pg_catalog.pg_description, pg_catalog.pg_description.c.objoid == pg_catalog.pg_constraint.c.oid, ) .order_by( pg_catalog.pg_class.c.relname, pg_catalog.pg_constraint.c.conname, ) .where(self._pg_class_relkind_condition(relkinds)) ) query = self._pg_class_filter_scope_schema(query, schema, scope) if has_filter_names: query = query.where( pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) ) return query @util.memoized_property def _fk_regex_pattern(self): # optionally quoted token qtoken = '(?:"[^"]+"|[A-Za-z0-9_]+?)' # https://www.postgresql.org/docs/current/static/sql-createtable.html return re.compile( r"FOREIGN KEY \((.*?)\) " rf"REFERENCES (?:({qtoken})\.)?({qtoken})\(((?:{qtoken}(?: *, *)?)+)\)" # noqa: E501 r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?" r"[\s]?(ON UPDATE " r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?" r"[\s]?(ON DELETE " r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?" r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?" r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?" 
) def get_multi_foreign_keys( self, connection, schema, filter_names, scope, kind, postgresql_ignore_search_path=False, **kw, ): preparer = self.identifier_preparer has_filter_names, params = self._prepare_filter_names(filter_names) query = self._foreing_key_query(schema, has_filter_names, scope, kind) result = connection.execute(query, params) FK_REGEX = self._fk_regex_pattern fkeys = defaultdict(list) default = ReflectionDefaults.foreign_keys for table_name, conname, condef, conschema, comment in result: # ensure that each table has an entry, even if it has # no foreign keys if conname is None: fkeys[(schema, table_name)] = default() continue table_fks = fkeys[(schema, table_name)] m = re.search(FK_REGEX, condef).groups() ( constrained_columns, referred_schema, referred_table, referred_columns, _, match, _, onupdate, _, ondelete, deferrable, _, initially, ) = m if deferrable is not None: deferrable = True if deferrable == "DEFERRABLE" else False constrained_columns = [ preparer._unquote_identifier(x) for x in re.split(r"\s*,\s*", constrained_columns) ] if postgresql_ignore_search_path: # when ignoring search path, we use the actual schema # provided it isn't the "default" schema if conschema != self.default_schema_name: referred_schema = conschema else: referred_schema = schema elif referred_schema: # referred_schema is the schema that we regexp'ed from # pg_get_constraintdef(). If the schema is in the search # path, pg_get_constraintdef() will give us None. referred_schema = preparer._unquote_identifier(referred_schema) elif schema is not None and schema == conschema: # If the actual schema matches the schema of the table # we're reflecting, then we will use that. referred_schema = schema referred_table = preparer._unquote_identifier(referred_table) referred_columns = [ preparer._unquote_identifier(x) for x in re.split(r"\s*,\s", referred_columns) ] options = { k: v for k, v in [ ("onupdate", onupdate), ("ondelete", ondelete), ("initially", initially), ("deferrable", deferrable), ("match", match), ] if v is not None and v != "NO ACTION" } fkey_d = { "name": conname, "constrained_columns": constrained_columns, "referred_schema": referred_schema, "referred_table": referred_table, "referred_columns": referred_columns, "options": options, "comment": comment, } table_fks.append(fkey_d) return fkeys.items() @reflection.cache def get_indexes(self, connection, table_name, schema=None, **kw): data = self.get_multi_indexes( connection, schema=schema, filter_names=[table_name], scope=ObjectScope.ANY, kind=ObjectKind.ANY, **kw, ) return self._value_or_raise(data, table_name, schema) @util.memoized_property def _index_query(self): pg_class_index = pg_catalog.pg_class.alias("cls_idx") # NOTE: repeating oids clause improve query performance # subquery to get the columns idx_sq = ( select( pg_catalog.pg_index.c.indexrelid, pg_catalog.pg_index.c.indrelid, sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"), sql.func.generate_subscripts( pg_catalog.pg_index.c.indkey, 1 ).label("ord"), ) .where( ~pg_catalog.pg_index.c.indisprimary, pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")), ) .subquery("idx") ) attr_sq = ( select( idx_sq.c.indexrelid, idx_sq.c.indrelid, idx_sq.c.ord, # NOTE: always using pg_get_indexdef is too slow so just # invoke when the element is an expression sql.case( ( idx_sq.c.attnum == 0, pg_catalog.pg_get_indexdef( idx_sq.c.indexrelid, idx_sq.c.ord + 1, True ), ), # NOTE: need to cast this since attname is of type "name" # that's limited to 63 bytes, while pg_get_indexdef 
# returns "text" so its output may get cut else_=pg_catalog.pg_attribute.c.attname.cast(TEXT), ).label("element"), (idx_sq.c.attnum == 0).label("is_expr"), ) .select_from(idx_sq) .outerjoin( # do not remove rows where idx_sq.c.attnum is 0 pg_catalog.pg_attribute, sql.and_( pg_catalog.pg_attribute.c.attnum == idx_sq.c.attnum, pg_catalog.pg_attribute.c.attrelid == idx_sq.c.indrelid, ), ) .where(idx_sq.c.indrelid.in_(bindparam("oids"))) .subquery("idx_attr") ) cols_sq = ( select( attr_sq.c.indexrelid, sql.func.min(attr_sq.c.indrelid), sql.func.array_agg( aggregate_order_by(attr_sq.c.element, attr_sq.c.ord) ).label("elements"), sql.func.array_agg( aggregate_order_by(attr_sq.c.is_expr, attr_sq.c.ord) ).label("elements_is_expr"), ) .group_by(attr_sq.c.indexrelid) .subquery("idx_cols") ) if self.server_version_info >= (11, 0): indnkeyatts = pg_catalog.pg_index.c.indnkeyatts else: indnkeyatts = sql.null().label("indnkeyatts") if self.server_version_info >= (15,): nulls_not_distinct = pg_catalog.pg_index.c.indnullsnotdistinct else: nulls_not_distinct = sql.false().label("indnullsnotdistinct") return ( select( pg_catalog.pg_index.c.indrelid, pg_class_index.c.relname.label("relname_index"), pg_catalog.pg_index.c.indisunique, pg_catalog.pg_constraint.c.conrelid.is_not(None).label( "has_constraint" ), pg_catalog.pg_index.c.indoption, pg_class_index.c.reloptions, pg_catalog.pg_am.c.amname, # NOTE: pg_get_expr is very fast so this case has almost no # performance impact sql.case( ( pg_catalog.pg_index.c.indpred.is_not(None), pg_catalog.pg_get_expr( pg_catalog.pg_index.c.indpred, pg_catalog.pg_index.c.indrelid, ), ), else_=None, ).label("filter_definition"), indnkeyatts, nulls_not_distinct, cols_sq.c.elements, cols_sq.c.elements_is_expr, ) .select_from(pg_catalog.pg_index) .where( pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")), ~pg_catalog.pg_index.c.indisprimary, ) .join( pg_class_index, pg_catalog.pg_index.c.indexrelid == pg_class_index.c.oid, ) .join( pg_catalog.pg_am, pg_class_index.c.relam == pg_catalog.pg_am.c.oid, ) .outerjoin( cols_sq, pg_catalog.pg_index.c.indexrelid == cols_sq.c.indexrelid, ) .outerjoin( pg_catalog.pg_constraint, sql.and_( pg_catalog.pg_index.c.indrelid == pg_catalog.pg_constraint.c.conrelid, pg_catalog.pg_index.c.indexrelid == pg_catalog.pg_constraint.c.conindid, pg_catalog.pg_constraint.c.contype == sql.any_(_array.array(("p", "u", "x"))), ), ) .order_by(pg_catalog.pg_index.c.indrelid, pg_class_index.c.relname) ) def get_multi_indexes( self, connection, schema, filter_names, scope, kind, **kw ): table_oids = self._get_table_oids( connection, schema, filter_names, scope, kind, **kw ) indexes = defaultdict(list) default = ReflectionDefaults.indexes batches = list(table_oids) while batches: batch = batches[0:3000] batches[0:3000] = [] result = connection.execute( self._index_query, {"oids": [r[0] for r in batch]} ).mappings() result_by_oid = defaultdict(list) for row_dict in result: result_by_oid[row_dict["indrelid"]].append(row_dict) for oid, table_name in batch: if oid not in result_by_oid: # ensure that each table has an entry, even if reflection # is skipped because not supported indexes[(schema, table_name)] = default() continue for row in result_by_oid[oid]: index_name = row["relname_index"] table_indexes = indexes[(schema, table_name)] all_elements = row["elements"] all_elements_is_expr = row["elements_is_expr"] indnkeyatts = row["indnkeyatts"] # "The number of key columns in the index, not counting any # included columns, which are merely stored and do not # 
participate in the index semantics" if indnkeyatts and len(all_elements) > indnkeyatts: # this is a "covering index" which has INCLUDE columns # as well as regular index columns inc_cols = all_elements[indnkeyatts:] idx_elements = all_elements[:indnkeyatts] idx_elements_is_expr = all_elements_is_expr[ :indnkeyatts ] # postgresql does not support expression on included # columns as of v14: "ERROR: expressions are not # supported in included columns". assert all( not is_expr for is_expr in all_elements_is_expr[indnkeyatts:] ) else: idx_elements = all_elements idx_elements_is_expr = all_elements_is_expr inc_cols = [] index = {"name": index_name, "unique": row["indisunique"]} if any(idx_elements_is_expr): index["column_names"] = [ None if is_expr else expr for expr, is_expr in zip( idx_elements, idx_elements_is_expr ) ] index["expressions"] = idx_elements else: index["column_names"] = idx_elements sorting = {} for col_index, col_flags in enumerate(row["indoption"]): col_sorting = () # try to set flags only if they differ from PG # defaults... if col_flags & 0x01: col_sorting += ("desc",) if not (col_flags & 0x02): col_sorting += ("nulls_last",) else: if col_flags & 0x02: col_sorting += ("nulls_first",) if col_sorting: sorting[idx_elements[col_index]] = col_sorting if sorting: index["column_sorting"] = sorting if row["has_constraint"]: index["duplicates_constraint"] = index_name dialect_options = {} if row["reloptions"]: dialect_options["postgresql_with"] = dict( [option.split("=") for option in row["reloptions"]] ) # it *might* be nice to include that this is 'btree' in the # reflection info. But we don't want an Index object # to have a ``postgresql_using`` in it that is just the # default, so for the moment leaving this out. amname = row["amname"] if amname != "btree": dialect_options["postgresql_using"] = row["amname"] if row["filter_definition"]: dialect_options["postgresql_where"] = row[ "filter_definition" ] if self.server_version_info >= (11,): # NOTE: this is legacy, this is part of # dialect_options now as of #7382 index["include_columns"] = inc_cols dialect_options["postgresql_include"] = inc_cols if row["indnullsnotdistinct"]: # the default is False, so ignore it. dialect_options["postgresql_nulls_not_distinct"] = row[ "indnullsnotdistinct" ] if dialect_options: index["dialect_options"] = dialect_options table_indexes.append(index) return indexes.items() @reflection.cache def get_unique_constraints( self, connection, table_name, schema=None, **kw ): data = self.get_multi_unique_constraints( connection, schema=schema, filter_names=[table_name], scope=ObjectScope.ANY, kind=ObjectKind.ANY, **kw, ) return self._value_or_raise(data, table_name, schema) def get_multi_unique_constraints( self, connection, schema, filter_names, scope, kind, **kw, ): result = self._reflect_constraint( connection, "u", schema, filter_names, scope, kind, **kw ) # each table can have multiple unique constraints uniques = defaultdict(list) default = ReflectionDefaults.unique_constraints for table_name, cols, con_name, comment, options in result: # ensure a list is created for each table. 
leave it empty if # the table has no unique cosntraint if con_name is None: uniques[(schema, table_name)] = default() continue uc_dict = { "column_names": cols, "name": con_name, "comment": comment, } if options: if options["nullsnotdistinct"]: uc_dict["dialect_options"] = { "postgresql_nulls_not_distinct": options[ "nullsnotdistinct" ] } uniques[(schema, table_name)].append(uc_dict) return uniques.items() @reflection.cache def get_table_comment(self, connection, table_name, schema=None, **kw): data = self.get_multi_table_comment( connection, schema, [table_name], scope=ObjectScope.ANY, kind=ObjectKind.ANY, **kw, ) return self._value_or_raise(data, table_name, schema) @lru_cache() def _comment_query(self, schema, has_filter_names, scope, kind): relkinds = self._kind_to_relkinds(kind) query = ( select( pg_catalog.pg_class.c.relname, pg_catalog.pg_description.c.description, ) .select_from(pg_catalog.pg_class) .outerjoin( pg_catalog.pg_description, sql.and_( pg_catalog.pg_class.c.oid == pg_catalog.pg_description.c.objoid, pg_catalog.pg_description.c.objsubid == 0, ), ) .where(self._pg_class_relkind_condition(relkinds)) ) query = self._pg_class_filter_scope_schema(query, schema, scope) if has_filter_names: query = query.where( pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) ) return query def get_multi_table_comment( self, connection, schema, filter_names, scope, kind, **kw ): has_filter_names, params = self._prepare_filter_names(filter_names) query = self._comment_query(schema, has_filter_names, scope, kind) result = connection.execute(query, params) default = ReflectionDefaults.table_comment return ( ( (schema, table), {"text": comment} if comment is not None else default(), ) for table, comment in result ) @reflection.cache def get_check_constraints(self, connection, table_name, schema=None, **kw): data = self.get_multi_check_constraints( connection, schema, [table_name], scope=ObjectScope.ANY, kind=ObjectKind.ANY, **kw, ) return self._value_or_raise(data, table_name, schema) @lru_cache() def _check_constraint_query(self, schema, has_filter_names, scope, kind): relkinds = self._kind_to_relkinds(kind) query = ( select( pg_catalog.pg_class.c.relname, pg_catalog.pg_constraint.c.conname, # NOTE: avoid calling pg_get_constraintdef when not needed # to speed up the query sql.case( ( pg_catalog.pg_constraint.c.oid.is_not(None), pg_catalog.pg_get_constraintdef( pg_catalog.pg_constraint.c.oid, True ), ), else_=None, ), pg_catalog.pg_description.c.description, ) .select_from(pg_catalog.pg_class) .outerjoin( pg_catalog.pg_constraint, sql.and_( pg_catalog.pg_class.c.oid == pg_catalog.pg_constraint.c.conrelid, pg_catalog.pg_constraint.c.contype == "c", ), ) .outerjoin( pg_catalog.pg_description, pg_catalog.pg_description.c.objoid == pg_catalog.pg_constraint.c.oid, ) .order_by( pg_catalog.pg_class.c.relname, pg_catalog.pg_constraint.c.conname, ) .where(self._pg_class_relkind_condition(relkinds)) ) query = self._pg_class_filter_scope_schema(query, schema, scope) if has_filter_names: query = query.where( pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) ) return query def get_multi_check_constraints( self, connection, schema, filter_names, scope, kind, **kw ): has_filter_names, params = self._prepare_filter_names(filter_names) query = self._check_constraint_query( schema, has_filter_names, scope, kind ) result = connection.execute(query, params) check_constraints = defaultdict(list) default = ReflectionDefaults.check_constraints for table_name, check_name, src, comment in result: # 
only two cases for check_name and src: both null or both defined if check_name is None and src is None: check_constraints[(schema, table_name)] = default() continue # samples: # "CHECK (((a > 1) AND (a < 5)))" # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))" # "CHECK (((a > 1) AND (a < 5))) NOT VALID" # "CHECK (some_boolean_function(a))" # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)" # "CHECK (a NOT NULL) NO INHERIT" # "CHECK (a NOT NULL) NO INHERIT NOT VALID" m = re.match( r"^CHECK *\((.+)\)( NO INHERIT)?( NOT VALID)?$", src, flags=re.DOTALL, ) if not m: util.warn("Could not parse CHECK constraint text: %r" % src) sqltext = "" else: sqltext = re.compile( r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL ).sub(r"\1", m.group(1)) entry = { "name": check_name, "sqltext": sqltext, "comment": comment, } if m: do = {} if " NOT VALID" in m.groups(): do["not_valid"] = True if " NO INHERIT" in m.groups(): do["no_inherit"] = True if do: entry["dialect_options"] = do check_constraints[(schema, table_name)].append(entry) return check_constraints.items() def _pg_type_filter_schema(self, query, schema): if schema is None: query = query.where( pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid), # ignore pg_catalog schema pg_catalog.pg_namespace.c.nspname != "pg_catalog", ) elif schema != "*": query = query.where(pg_catalog.pg_namespace.c.nspname == schema) return query @lru_cache() def _enum_query(self, schema): lbl_agg_sq = ( select( pg_catalog.pg_enum.c.enumtypid, sql.func.array_agg( aggregate_order_by( # NOTE: cast since some postgresql derivatives may # not support array_agg on the name type pg_catalog.pg_enum.c.enumlabel.cast(TEXT), pg_catalog.pg_enum.c.enumsortorder, ) ).label("labels"), ) .group_by(pg_catalog.pg_enum.c.enumtypid) .subquery("lbl_agg") ) query = ( select( pg_catalog.pg_type.c.typname.label("name"), pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label( "visible" ), pg_catalog.pg_namespace.c.nspname.label("schema"), lbl_agg_sq.c.labels.label("labels"), ) .join( pg_catalog.pg_namespace, pg_catalog.pg_namespace.c.oid == pg_catalog.pg_type.c.typnamespace, ) .outerjoin( lbl_agg_sq, pg_catalog.pg_type.c.oid == lbl_agg_sq.c.enumtypid ) .where(pg_catalog.pg_type.c.typtype == "e") .order_by( pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname ) ) return self._pg_type_filter_schema(query, schema) @reflection.cache def _load_enums(self, connection, schema=None, **kw): if not self.supports_native_enum: return [] result = connection.execute(self._enum_query(schema)) enums = [] for name, visible, schema, labels in result: enums.append( { "name": name, "schema": schema, "visible": visible, "labels": [] if labels is None else labels, } ) return enums @lru_cache() def _domain_query(self, schema): con_sq = ( select( pg_catalog.pg_constraint.c.contypid, sql.func.array_agg( pg_catalog.pg_get_constraintdef( pg_catalog.pg_constraint.c.oid, True ) ).label("condefs"), sql.func.array_agg( # NOTE: cast since some postgresql derivatives may # not support array_agg on the name type pg_catalog.pg_constraint.c.conname.cast(TEXT) ).label("connames"), ) # The domain this constraint is on; zero if not a domain constraint .where(pg_catalog.pg_constraint.c.contypid != 0) .group_by(pg_catalog.pg_constraint.c.contypid) .subquery("domain_constraints") ) query = ( select( pg_catalog.pg_type.c.typname.label("name"), pg_catalog.format_type( pg_catalog.pg_type.c.typbasetype, pg_catalog.pg_type.c.typtypmod, ).label("attype"), (~pg_catalog.pg_type.c.typnotnull).label("nullable"), 
pg_catalog.pg_type.c.typdefault.label("default"), pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label( "visible" ), pg_catalog.pg_namespace.c.nspname.label("schema"), con_sq.c.condefs, con_sq.c.connames, pg_catalog.pg_collation.c.collname, ) .join( pg_catalog.pg_namespace, pg_catalog.pg_namespace.c.oid == pg_catalog.pg_type.c.typnamespace, ) .outerjoin( pg_catalog.pg_collation, pg_catalog.pg_type.c.typcollation == pg_catalog.pg_collation.c.oid, ) .outerjoin( con_sq, pg_catalog.pg_type.c.oid == con_sq.c.contypid, ) .where(pg_catalog.pg_type.c.typtype == "d") .order_by( pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname ) ) return self._pg_type_filter_schema(query, schema) @reflection.cache def _load_domains(self, connection, schema=None, **kw): result = connection.execute(self._domain_query(schema)) domains: List[ReflectedDomain] = [] for domain in result.mappings(): # strip (30) from character varying(30) attype = re.search(r"([^\(]+)", domain["attype"]).group(1) constraints: List[ReflectedDomainConstraint] = [] if domain["connames"]: # When a domain has multiple CHECK constraints, they will # be tested in alphabetical order by name. sorted_constraints = sorted( zip(domain["connames"], domain["condefs"]), key=lambda t: t[0], ) for name, def_ in sorted_constraints: # constraint is in the form "CHECK (expression)". # remove "CHECK (" and the tailing ")". check = def_[7:-1] constraints.append({"name": name, "check": check}) domain_rec: ReflectedDomain = { "name": domain["name"], "schema": domain["schema"], "visible": domain["visible"], "type": attype, "nullable": domain["nullable"], "default": domain["default"], "constraints": constraints, "collation": domain["collname"], } domains.append(domain_rec) return domains def _set_backslash_escapes(self, connection): # this method is provided as an override hook for descendant # dialects (e.g. Redshift), so removing it may break them std_string = connection.exec_driver_sql( "show standard_conforming_strings" ).scalar() self._backslash_escapes = std_string == "off"
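    # Illustrative sketch (the database URL and table name below are
    # hypothetical): the reflection methods in this section are normally
    # reached through the public Inspector API rather than called directly,
    # e.g.:
    #
    #     from sqlalchemy import create_engine, inspect
    #
    #     engine = create_engine(
    #         "postgresql+psycopg2://scott:tiger@localhost/test"
    #     )
    #     insp = inspect(engine)
    #
    #     insp.get_pk_constraint("mytable", schema="public")
    #     insp.get_foreign_keys("mytable")
    #     insp.get_indexes("mytable")
    #     insp.get_unique_constraints("mytable")
    #     insp.get_check_constraints("mytable")
    #     insp.get_table_comment("mytable")
    #
    # Each call is served by the corresponding get_*() method above, which
    # delegates to a get_multi_*() implementation that batches table OIDs
    # and queries pg_catalog directly.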
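    # For reference, a single index entry produced by get_multi_indexes()
    # above is a dict shaped roughly as follows (hypothetical index name;
    # optional keys appear only when the corresponding feature is present
    # on the reflected index and supported by the server version):
    #
    #     {
    #         "name": "ix_data_x",
    #         "unique": False,
    #         "column_names": ["x"],
    #         # "expressions", "column_sorting", "include_columns" and
    #         # "duplicates_constraint" are added when applicable
    #         "dialect_options": {
    #             "postgresql_include": [],
    #             # "postgresql_with", "postgresql_using",
    #             # "postgresql_where" and "postgresql_nulls_not_distinct"
    #             # appear when non-default
    #         },
    #     }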