diff options
| author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 | 
|---|---|---|
| committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:10:44 -0400 | 
| commit | 6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch) | |
| tree | b1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/base.py | |
| parent | 4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff) | |
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/base.py')
| -rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/base.py | 5007 | 
1 files changed, 5007 insertions, 0 deletions
| diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/base.py new file mode 100644 index 0000000..4ab3ca2 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/base.py @@ -0,0 +1,5007 @@ +# dialects/postgresql/base.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + +r""" +.. dialect:: postgresql +    :name: PostgreSQL +    :full_support: 12, 13, 14, 15 +    :normal_support: 9.6+ +    :best_effort: 9+ + +.. _postgresql_sequences: + +Sequences/SERIAL/IDENTITY +------------------------- + +PostgreSQL supports sequences, and SQLAlchemy uses these as the default means +of creating new primary key values for integer-based primary key columns. When +creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for +integer-based primary key columns, which generates a sequence and server side +default corresponding to the column. + +To specify a specific named sequence to be used for primary key generation, +use the :func:`~sqlalchemy.schema.Sequence` construct:: + +    Table( +        "sometable", +        metadata, +        Column( +            "id", Integer, Sequence("some_id_seq", start=1), primary_key=True +        ) +    ) + +When SQLAlchemy issues a single INSERT statement, to fulfill the contract of +having the "last insert identifier" available, a RETURNING clause is added to +the INSERT statement which specifies the primary key columns should be +returned after the statement completes. The RETURNING functionality only takes +place if PostgreSQL 8.2 or later is in use. As a fallback approach, the +sequence, whether specified explicitly or implicitly via ``SERIAL``, is +executed independently beforehand, the returned value to be used in the +subsequent insert. Note that when an +:func:`~sqlalchemy.sql.expression.insert()` construct is executed using +"executemany" semantics, the "last inserted identifier" functionality does not +apply; no RETURNING clause is emitted nor is the sequence pre-executed in this +case. + + +PostgreSQL 10 and above IDENTITY columns +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use +of SERIAL. The :class:`_schema.Identity` construct in a +:class:`_schema.Column` can be used to control its behavior:: + +    from sqlalchemy import Table, Column, MetaData, Integer, Computed + +    metadata = MetaData() + +    data = Table( +        "data", +        metadata, +        Column( +            'id', Integer, Identity(start=42, cycle=True), primary_key=True +        ), +        Column('data', String) +    ) + +The CREATE TABLE for the above :class:`_schema.Table` object would be: + +.. sourcecode:: sql + +    CREATE TABLE data ( +        id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE), +        data VARCHAR, +        PRIMARY KEY (id) +    ) + +.. versionchanged::  1.4   Added :class:`_schema.Identity` construct +   in a :class:`_schema.Column` to specify the option of an autoincrementing +   column. + +.. note:: + +   Previous versions of SQLAlchemy did not have built-in support for rendering +   of IDENTITY, and could use the following compilation hook to replace +   occurrences of SERIAL with IDENTITY:: + +       from sqlalchemy.schema import CreateColumn +       from sqlalchemy.ext.compiler import compiles + + +       @compiles(CreateColumn, 'postgresql') +       def use_identity(element, compiler, **kw): +           text = compiler.visit_create_column(element, **kw) +           text = text.replace( +               "SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY" +            ) +           return text + +   Using the above, a table such as:: + +       t = Table( +           't', m, +           Column('id', Integer, primary_key=True), +           Column('data', String) +       ) + +   Will generate on the backing database as:: + +       CREATE TABLE t ( +           id INT GENERATED BY DEFAULT AS IDENTITY, +           data VARCHAR, +           PRIMARY KEY (id) +       ) + +.. _postgresql_ss_cursors: + +Server Side Cursors +------------------- + +Server-side cursor support is available for the psycopg2, asyncpg +dialects and may also be available in others. + +Server side cursors are enabled on a per-statement basis by using the +:paramref:`.Connection.execution_options.stream_results` connection execution +option:: + +    with engine.connect() as conn: +        result = conn.execution_options(stream_results=True).execute(text("select * from table")) + +Note that some kinds of SQL statements may not be supported with +server side cursors; generally, only SQL statements that return rows should be +used with this option. + +.. deprecated:: 1.4  The dialect-level server_side_cursors flag is deprecated +   and will be removed in a future release.  Please use the +   :paramref:`_engine.Connection.stream_results` execution option for +   unbuffered cursor support. + +.. seealso:: + +    :ref:`engine_stream_results` + +.. _postgresql_isolation_level: + +Transaction Isolation Level +--------------------------- + +Most SQLAlchemy dialects support setting of transaction isolation level +using the :paramref:`_sa.create_engine.isolation_level` parameter +at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection` +level via the :paramref:`.Connection.execution_options.isolation_level` +parameter. + +For PostgreSQL dialects, this feature works either by making use of the +DBAPI-specific features, such as psycopg2's isolation level flags which will +embed the isolation level setting inline with the ``"BEGIN"`` statement, or for +DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS +TRANSACTION ISOLATION LEVEL <level>`` ahead of the ``"BEGIN"`` statement +emitted by the DBAPI.   For the special AUTOCOMMIT isolation level, +DBAPI-specific techniques are used which is typically an ``.autocommit`` +flag on the DBAPI connection object. + +To set isolation level using :func:`_sa.create_engine`:: + +    engine = create_engine( +        "postgresql+pg8000://scott:tiger@localhost/test", +        isolation_level = "REPEATABLE READ" +    ) + +To set using per-connection execution options:: + +    with engine.connect() as conn: +        conn = conn.execution_options( +            isolation_level="REPEATABLE READ" +        ) +        with conn.begin(): +            # ... work with transaction + +There are also more options for isolation level configurations, such as +"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply +different isolation level settings.  See the discussion at +:ref:`dbapi_autocommit` for background. + +Valid values for ``isolation_level`` on most PostgreSQL dialects include: + +* ``READ COMMITTED`` +* ``READ UNCOMMITTED`` +* ``REPEATABLE READ`` +* ``SERIALIZABLE`` +* ``AUTOCOMMIT`` + +.. seealso:: + +    :ref:`dbapi_autocommit` + +    :ref:`postgresql_readonly_deferrable` + +    :ref:`psycopg2_isolation_level` + +    :ref:`pg8000_isolation_level` + +.. _postgresql_readonly_deferrable: + +Setting READ ONLY / DEFERRABLE +------------------------------ + +Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE" +characteristics of the transaction, which is in addition to the isolation level +setting. These two attributes can be established either in conjunction with or +independently of the isolation level by passing the ``postgresql_readonly`` and +``postgresql_deferrable`` flags with +:meth:`_engine.Connection.execution_options`.  The example below illustrates +passing the ``"SERIALIZABLE"`` isolation level at the same time as setting +"READ ONLY" and "DEFERRABLE":: + +    with engine.connect() as conn: +        conn = conn.execution_options( +            isolation_level="SERIALIZABLE", +            postgresql_readonly=True, +            postgresql_deferrable=True +        ) +        with conn.begin(): +            #  ... work with transaction + +Note that some DBAPIs such as asyncpg only support "readonly" with +SERIALIZABLE isolation. + +.. versionadded:: 1.4 added support for the ``postgresql_readonly`` +   and ``postgresql_deferrable`` execution options. + +.. _postgresql_reset_on_return: + +Temporary Table / Resource Reset for Connection Pooling +------------------------------------------------------- + +The :class:`.QueuePool` connection pool implementation used +by the SQLAlchemy :class:`.Engine` object includes +:ref:`reset on return <pool_reset_on_return>` behavior that will invoke +the DBAPI ``.rollback()`` method when connections are returned to the pool. +While this rollback will clear out the immediate state used by the previous +transaction, it does not cover a wider range of session-level state, including +temporary tables as well as other server state such as prepared statement +handles and statement caches.   The PostgreSQL database includes a variety +of commands which may be used to reset this state, including +``DISCARD``, ``RESET``, ``DEALLOCATE``, and ``UNLISTEN``. + + +To install +one or more of these commands as the means of performing reset-on-return, +the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated +in the example below. The implementation +will end transactions in progress as well as discard temporary tables +using the ``CLOSE``, ``RESET`` and ``DISCARD`` commands; see the PostgreSQL +documentation for background on what each of these statements do. + +The :paramref:`_sa.create_engine.pool_reset_on_return` parameter +is set to ``None`` so that the custom scheme can replace the default behavior +completely.   The custom hook implementation calls ``.rollback()`` in any case, +as it's usually important that the DBAPI's own tracking of commit/rollback +will remain consistent with the state of the transaction:: + + +    from sqlalchemy import create_engine +    from sqlalchemy import event + +    postgresql_engine = create_engine( +        "postgresql+pyscopg2://scott:tiger@hostname/dbname", + +        # disable default reset-on-return scheme +        pool_reset_on_return=None, +    ) + + +    @event.listens_for(postgresql_engine, "reset") +    def _reset_postgresql(dbapi_connection, connection_record, reset_state): +        if not reset_state.terminate_only: +            dbapi_connection.execute("CLOSE ALL") +            dbapi_connection.execute("RESET ALL") +            dbapi_connection.execute("DISCARD TEMP") + +        # so that the DBAPI itself knows that the connection has been +        # reset +        dbapi_connection.rollback() + +.. versionchanged:: 2.0.0b3  Added additional state arguments to +   the :meth:`.PoolEvents.reset` event and additionally ensured the event +   is invoked for all "reset" occurrences, so that it's appropriate +   as a place for custom "reset" handlers.   Previous schemes which +   use the :meth:`.PoolEvents.checkin` handler remain usable as well. + +.. seealso:: + +    :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation + +.. _postgresql_alternate_search_path: + +Setting Alternate Search Paths on Connect +------------------------------------------ + +The PostgreSQL ``search_path`` variable refers to the list of schema names +that will be implicitly referenced when a particular table or other +object is referenced in a SQL statement.  As detailed in the next section +:ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around +the concept of keeping this variable at its default value of ``public``, +however, in order to have it set to any arbitrary name or names when connections +are used automatically, the "SET SESSION search_path" command may be invoked +for all connections in a pool using the following event handler, as discussed +at :ref:`schema_set_default_connections`:: + +    from sqlalchemy import event +    from sqlalchemy import create_engine + +    engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname") + +    @event.listens_for(engine, "connect", insert=True) +    def set_search_path(dbapi_connection, connection_record): +        existing_autocommit = dbapi_connection.autocommit +        dbapi_connection.autocommit = True +        cursor = dbapi_connection.cursor() +        cursor.execute("SET SESSION search_path='%s'" % schema_name) +        cursor.close() +        dbapi_connection.autocommit = existing_autocommit + +The reason the recipe is complicated by use of the ``.autocommit`` DBAPI +attribute is so that when the ``SET SESSION search_path`` directive is invoked, +it is invoked outside of the scope of any transaction and therefore will not +be reverted when the DBAPI connection has a rollback. + +.. seealso:: + +  :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel` documentation + + + + +.. _postgresql_schema_reflection: + +Remote-Schema Table Introspection and PostgreSQL search_path +------------------------------------------------------------ + +.. admonition:: Section Best Practices Summarized + +    keep the ``search_path`` variable set to its default of ``public``, without +    any other schema names. Ensure the username used to connect **does not** +    match remote schemas, or ensure the ``"$user"`` token is **removed** from +    ``search_path``.  For other schema names, name these explicitly +    within :class:`_schema.Table` definitions. Alternatively, the +    ``postgresql_ignore_search_path`` option will cause all reflected +    :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema` +    attribute set up. + +The PostgreSQL dialect can reflect tables from any schema, as outlined in +:ref:`metadata_reflection_schemas`. + +In all cases, the first thing SQLAlchemy does when reflecting tables is +to **determine the default schema for the current database connection**. +It does this using the PostgreSQL ``current_schema()`` +function, illustated below using a PostgreSQL client session (i.e. using +the ``psql`` tool):: + +    test=> select current_schema(); +    current_schema +    ---------------- +    public +    (1 row) + +Above we see that on a plain install of PostgreSQL, the default schema name +is the name ``public``. + +However, if your database username **matches the name of a schema**, PostgreSQL's +default is to then **use that name as the default schema**.  Below, we log in +using the username ``scott``.  When we create a schema named ``scott``, **it +implicitly changes the default schema**:: + +    test=> select current_schema(); +    current_schema +    ---------------- +    public +    (1 row) + +    test=> create schema scott; +    CREATE SCHEMA +    test=> select current_schema(); +    current_schema +    ---------------- +    scott +    (1 row) + +The behavior of ``current_schema()`` is derived from the +`PostgreSQL search path +<https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_ +variable ``search_path``, which in modern PostgreSQL versions defaults to this:: + +    test=> show search_path; +    search_path +    ----------------- +    "$user", public +    (1 row) + +Where above, the ``"$user"`` variable will inject the current username as the +default schema, if one exists.   Otherwise, ``public`` is used. + +When a :class:`_schema.Table` object is reflected, if it is present in the +schema indicated by the ``current_schema()`` function, **the schema name assigned +to the ".schema" attribute of the Table is the Python "None" value**.  Otherwise, the +".schema" attribute will be assigned the string name of that schema. + +With regards to tables which these :class:`_schema.Table` +objects refer to via foreign key constraint, a decision must be made as to how +the ``.schema`` is represented in those remote tables, in the case where that +remote schema name is also a member of the current ``search_path``. + +By default, the PostgreSQL dialect mimics the behavior encouraged by +PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure.  This function +returns a sample definition for a particular foreign key constraint, +omitting the referenced schema name from that definition when the name is +also in the PostgreSQL schema search path.  The interaction below +illustrates this behavior:: + +    test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY); +    CREATE TABLE +    test=> CREATE TABLE referring( +    test(>         id INTEGER PRIMARY KEY, +    test(>         referred_id INTEGER REFERENCES test_schema.referred(id)); +    CREATE TABLE +    test=> SET search_path TO public, test_schema; +    test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM +    test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n +    test-> ON n.oid = c.relnamespace +    test-> JOIN pg_catalog.pg_constraint r  ON c.oid = r.conrelid +    test-> WHERE c.relname='referring' AND r.contype = 'f' +    test-> ; +                   pg_get_constraintdef +    --------------------------------------------------- +     FOREIGN KEY (referred_id) REFERENCES referred(id) +    (1 row) + +Above, we created a table ``referred`` as a member of the remote schema +``test_schema``, however when we added ``test_schema`` to the +PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the +``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of +the function. + +On the other hand, if we set the search path back to the typical default +of ``public``:: + +    test=> SET search_path TO public; +    SET + +The same query against ``pg_get_constraintdef()`` now returns the fully +schema-qualified name for us:: + +    test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM +    test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n +    test-> ON n.oid = c.relnamespace +    test-> JOIN pg_catalog.pg_constraint r  ON c.oid = r.conrelid +    test-> WHERE c.relname='referring' AND r.contype = 'f'; +                         pg_get_constraintdef +    --------------------------------------------------------------- +     FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id) +    (1 row) + +SQLAlchemy will by default use the return value of ``pg_get_constraintdef()`` +in order to determine the remote schema name.  That is, if our ``search_path`` +were set to include ``test_schema``, and we invoked a table +reflection process as follows:: + +    >>> from sqlalchemy import Table, MetaData, create_engine, text +    >>> engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") +    >>> with engine.connect() as conn: +    ...     conn.execute(text("SET search_path TO test_schema, public")) +    ...     metadata_obj = MetaData() +    ...     referring = Table('referring', metadata_obj, +    ...                       autoload_with=conn) +    ... +    <sqlalchemy.engine.result.CursorResult object at 0x101612ed0> + +The above process would deliver to the :attr:`_schema.MetaData.tables` +collection +``referred`` table named **without** the schema:: + +    >>> metadata_obj.tables['referred'].schema is None +    True + +To alter the behavior of reflection such that the referred schema is +maintained regardless of the ``search_path`` setting, use the +``postgresql_ignore_search_path`` option, which can be specified as a +dialect-specific argument to both :class:`_schema.Table` as well as +:meth:`_schema.MetaData.reflect`:: + +    >>> with engine.connect() as conn: +    ...     conn.execute(text("SET search_path TO test_schema, public")) +    ...     metadata_obj = MetaData() +    ...     referring = Table('referring', metadata_obj, +    ...                       autoload_with=conn, +    ...                       postgresql_ignore_search_path=True) +    ... +    <sqlalchemy.engine.result.CursorResult object at 0x1016126d0> + +We will now have ``test_schema.referred`` stored as schema-qualified:: + +    >>> metadata_obj.tables['test_schema.referred'].schema +    'test_schema' + +.. sidebar:: Best Practices for PostgreSQL Schema reflection + +    The description of PostgreSQL schema reflection behavior is complex, and +    is the product of many years of dealing with widely varied use cases and +    user preferences. But in fact, there's no need to understand any of it if +    you just stick to the simplest use pattern: leave the ``search_path`` set +    to its default of ``public`` only, never refer to the name ``public`` as +    an explicit schema name otherwise, and refer to all other schema names +    explicitly when building up a :class:`_schema.Table` object.  The options +    described here are only for those users who can't, or prefer not to, stay +    within these guidelines. + +.. seealso:: + +    :ref:`reflection_schema_qualified_interaction` - discussion of the issue +    from a backend-agnostic perspective + +    `The Schema Search Path +    <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_ +    - on the PostgreSQL website. + +INSERT/UPDATE...RETURNING +------------------------- + +The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and +``DELETE..RETURNING`` syntaxes.   ``INSERT..RETURNING`` is used by default +for single-row INSERT statements in order to fetch newly generated +primary key identifiers.   To specify an explicit ``RETURNING`` clause, +use the :meth:`._UpdateBase.returning` method on a per-statement basis:: + +    # INSERT..RETURNING +    result = table.insert().returning(table.c.col1, table.c.col2).\ +        values(name='foo') +    print(result.fetchall()) + +    # UPDATE..RETURNING +    result = table.update().returning(table.c.col1, table.c.col2).\ +        where(table.c.name=='foo').values(name='bar') +    print(result.fetchall()) + +    # DELETE..RETURNING +    result = table.delete().returning(table.c.col1, table.c.col2).\ +        where(table.c.name=='foo') +    print(result.fetchall()) + +.. _postgresql_insert_on_conflict: + +INSERT...ON CONFLICT (Upsert) +------------------------------ + +Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of +rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A +candidate row will only be inserted if that row does not violate any unique +constraints.  In the case of a unique constraint violation, a secondary action +can occur which can be either "DO UPDATE", indicating that the data in the +target row should be updated, or "DO NOTHING", which indicates to silently skip +this row. + +Conflicts are determined using existing unique constraints and indexes.  These +constraints may be identified either using their name as stated in DDL, +or they may be inferred by stating the columns and conditions that comprise +the indexes. + +SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific +:func:`_postgresql.insert()` function, which provides +the generative methods :meth:`_postgresql.Insert.on_conflict_do_update` +and :meth:`~.postgresql.Insert.on_conflict_do_nothing`: + +.. sourcecode:: pycon+sql + +    >>> from sqlalchemy.dialects.postgresql import insert +    >>> insert_stmt = insert(my_table).values( +    ...     id='some_existing_id', +    ...     data='inserted value') +    >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing( +    ...     index_elements=['id'] +    ... ) +    >>> print(do_nothing_stmt) +    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) +    ON CONFLICT (id) DO NOTHING +    {stop} + +    >>> do_update_stmt = insert_stmt.on_conflict_do_update( +    ...     constraint='pk_my_table', +    ...     set_=dict(data='updated value') +    ... ) +    >>> print(do_update_stmt) +    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) +    ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s + +.. seealso:: + +    `INSERT .. ON CONFLICT +    <https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_ +    - in the PostgreSQL documentation. + +Specifying the Target +^^^^^^^^^^^^^^^^^^^^^ + +Both methods supply the "target" of the conflict using either the +named constraint or by column inference: + +* The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument +  specifies a sequence containing string column names, :class:`_schema.Column` +  objects, and/or SQL expression elements, which would identify a unique +  index: + +  .. sourcecode:: pycon+sql + +    >>> do_update_stmt = insert_stmt.on_conflict_do_update( +    ...     index_elements=['id'], +    ...     set_=dict(data='updated value') +    ... ) +    >>> print(do_update_stmt) +    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) +    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s +    {stop} + +    >>> do_update_stmt = insert_stmt.on_conflict_do_update( +    ...     index_elements=[my_table.c.id], +    ...     set_=dict(data='updated value') +    ... ) +    >>> print(do_update_stmt) +    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) +    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s + +* When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to +  infer an index, a partial index can be inferred by also specifying the +  use the :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter: + +  .. sourcecode:: pycon+sql + +    >>> stmt = insert(my_table).values(user_email='a@b.com', data='inserted data') +    >>> stmt = stmt.on_conflict_do_update( +    ...     index_elements=[my_table.c.user_email], +    ...     index_where=my_table.c.user_email.like('%@gmail.com'), +    ...     set_=dict(data=stmt.excluded.data) +    ... ) +    >>> print(stmt) +    {printsql}INSERT INTO my_table (data, user_email) +    VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email) +    WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data + +* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is +  used to specify an index directly rather than inferring it.  This can be +  the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX: + +  .. sourcecode:: pycon+sql + +    >>> do_update_stmt = insert_stmt.on_conflict_do_update( +    ...     constraint='my_table_idx_1', +    ...     set_=dict(data='updated value') +    ... ) +    >>> print(do_update_stmt) +    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) +    ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s +    {stop} + +    >>> do_update_stmt = insert_stmt.on_conflict_do_update( +    ...     constraint='my_table_pk', +    ...     set_=dict(data='updated value') +    ... ) +    >>> print(do_update_stmt) +    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) +    ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s +    {stop} + +* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may +  also refer to a SQLAlchemy construct representing a constraint, +  e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`, +  :class:`.Index`, or :class:`.ExcludeConstraint`.   In this use, +  if the constraint has a name, it is used directly.  Otherwise, if the +  constraint is unnamed, then inference will be used, where the expressions +  and optional WHERE clause of the constraint will be spelled out in the +  construct.  This use is especially convenient +  to refer to the named or unnamed primary key of a :class:`_schema.Table` +  using the +  :attr:`_schema.Table.primary_key` attribute: + +  .. sourcecode:: pycon+sql + +    >>> do_update_stmt = insert_stmt.on_conflict_do_update( +    ...     constraint=my_table.primary_key, +    ...     set_=dict(data='updated value') +    ... ) +    >>> print(do_update_stmt) +    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) +    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s + +The SET Clause +^^^^^^^^^^^^^^^ + +``ON CONFLICT...DO UPDATE`` is used to perform an update of the already +existing row, using any combination of new values as well as values +from the proposed insertion.   These values are specified using the +:paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter.  This +parameter accepts a dictionary which consists of direct values +for UPDATE: + +.. sourcecode:: pycon+sql + +    >>> stmt = insert(my_table).values(id='some_id', data='inserted value') +    >>> do_update_stmt = stmt.on_conflict_do_update( +    ...     index_elements=['id'], +    ...     set_=dict(data='updated value') +    ... ) +    >>> print(do_update_stmt) +    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) +    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s + +.. warning:: + +    The :meth:`_expression.Insert.on_conflict_do_update` +    method does **not** take into +    account Python-side default UPDATE values or generation functions, e.g. +    those specified using :paramref:`_schema.Column.onupdate`. +    These values will not be exercised for an ON CONFLICT style of UPDATE, +    unless they are manually specified in the +    :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary. + +Updating using the Excluded INSERT Values +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +In order to refer to the proposed insertion row, the special alias +:attr:`~.postgresql.Insert.excluded` is available as an attribute on +the :class:`_postgresql.Insert` object; this object is a +:class:`_expression.ColumnCollection` +which alias contains all columns of the target +table: + +.. sourcecode:: pycon+sql + +    >>> stmt = insert(my_table).values( +    ...     id='some_id', +    ...     data='inserted value', +    ...     author='jlh' +    ... ) +    >>> do_update_stmt = stmt.on_conflict_do_update( +    ...     index_elements=['id'], +    ...     set_=dict(data='updated value', author=stmt.excluded.author) +    ... ) +    >>> print(do_update_stmt) +    {printsql}INSERT INTO my_table (id, data, author) +    VALUES (%(id)s, %(data)s, %(author)s) +    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author + +Additional WHERE Criteria +^^^^^^^^^^^^^^^^^^^^^^^^^ + +The :meth:`_expression.Insert.on_conflict_do_update` method also accepts +a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where` +parameter, which will limit those rows which receive an UPDATE: + +.. sourcecode:: pycon+sql + +    >>> stmt = insert(my_table).values( +    ...     id='some_id', +    ...     data='inserted value', +    ...     author='jlh' +    ... ) +    >>> on_update_stmt = stmt.on_conflict_do_update( +    ...     index_elements=['id'], +    ...     set_=dict(data='updated value', author=stmt.excluded.author), +    ...     where=(my_table.c.status == 2) +    ... ) +    >>> print(on_update_stmt) +    {printsql}INSERT INTO my_table (id, data, author) +    VALUES (%(id)s, %(data)s, %(author)s) +    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author +    WHERE my_table.status = %(status_1)s + +Skipping Rows with DO NOTHING +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +``ON CONFLICT`` may be used to skip inserting a row entirely +if any conflict with a unique or exclusion constraint occurs; below +this is illustrated using the +:meth:`~.postgresql.Insert.on_conflict_do_nothing` method: + +.. sourcecode:: pycon+sql + +    >>> stmt = insert(my_table).values(id='some_id', data='inserted value') +    >>> stmt = stmt.on_conflict_do_nothing(index_elements=['id']) +    >>> print(stmt) +    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) +    ON CONFLICT (id) DO NOTHING + +If ``DO NOTHING`` is used without specifying any columns or constraint, +it has the effect of skipping the INSERT for any unique or exclusion +constraint violation which occurs: + +.. sourcecode:: pycon+sql + +    >>> stmt = insert(my_table).values(id='some_id', data='inserted value') +    >>> stmt = stmt.on_conflict_do_nothing() +    >>> print(stmt) +    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) +    ON CONFLICT DO NOTHING + +.. _postgresql_match: + +Full Text Search +---------------- + +PostgreSQL's full text search system is available through the use of the +:data:`.func` namespace, combined with the use of custom operators +via the :meth:`.Operators.bool_op` method.    For simple cases with some +degree of cross-backend compatibility, the :meth:`.Operators.match` operator +may also be used. + +.. _postgresql_simple_match: + +Simple plain text matching with ``match()`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The :meth:`.Operators.match` operator provides for cross-compatible simple +text matching.   For the PostgreSQL backend, it's hardcoded to generate +an expression using the ``@@`` operator in conjunction with the +``plainto_tsquery()`` PostgreSQL function. + +On the PostgreSQL dialect, an expression like the following:: + +    select(sometable.c.text.match("search string")) + +would emit to the database:: + +    SELECT text @@ plainto_tsquery('search string') FROM table + +Above, passing a plain string to :meth:`.Operators.match` will automatically +make use of ``plainto_tsquery()`` to specify the type of tsquery.  This +establishes basic database cross-compatibility for :meth:`.Operators.match` +with other backends. + +.. versionchanged:: 2.0 The default tsquery generation function used by the +   PostgreSQL dialect with :meth:`.Operators.match` is ``plainto_tsquery()``. + +   To render exactly what was rendered in 1.4, use the following form:: + +        from sqlalchemy import func + +        select( +            sometable.c.text.bool_op("@@")(func.to_tsquery("search string")) +        ) + +   Which would emit:: + +        SELECT text @@ to_tsquery('search string') FROM table + +Using PostgreSQL full text functions and operators directly +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Text search operations beyond the simple use of :meth:`.Operators.match` +may make use of the :data:`.func` namespace to generate PostgreSQL full-text +functions, in combination with :meth:`.Operators.bool_op` to generate +any boolean operator. + +For example, the query:: + +    select( +        func.to_tsquery('cat').bool_op("@>")(func.to_tsquery('cat & rat')) +    ) + +would generate: + +.. sourcecode:: sql + +    SELECT to_tsquery('cat') @> to_tsquery('cat & rat') + + +The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST:: + +    from sqlalchemy.dialects.postgresql import TSVECTOR +    from sqlalchemy import select, cast +    select(cast("some text", TSVECTOR)) + +produces a statement equivalent to:: + +    SELECT CAST('some text' AS TSVECTOR) AS anon_1 + +The ``func`` namespace is augmented by the PostgreSQL dialect to set up +correct argument and return types for most full text search functions. +These functions are used automatically by the :attr:`_sql.func` namespace +assuming the ``sqlalchemy.dialects.postgresql`` package has been imported, +or :func:`_sa.create_engine` has been invoked using a ``postgresql`` +dialect.  These functions are documented at: + +* :class:`_postgresql.to_tsvector` +* :class:`_postgresql.to_tsquery` +* :class:`_postgresql.plainto_tsquery` +* :class:`_postgresql.phraseto_tsquery` +* :class:`_postgresql.websearch_to_tsquery` +* :class:`_postgresql.ts_headline` + +Specifying the "regconfig" with ``match()`` or custom operators +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +PostgreSQL's ``plainto_tsquery()`` function accepts an optional +"regconfig" argument that is used to instruct PostgreSQL to use a +particular pre-computed GIN or GiST index in order to perform the search. +When using :meth:`.Operators.match`, this additional parameter may be +specified using the ``postgresql_regconfig`` parameter, such as:: + +    select(mytable.c.id).where( +        mytable.c.title.match('somestring', postgresql_regconfig='english') +    ) + +Which would emit:: + +    SELECT mytable.id FROM mytable +    WHERE mytable.title @@ plainto_tsquery('english', 'somestring') + +When using other PostgreSQL search functions with :data:`.func`, the +"regconfig" parameter may be passed directly as the initial argument:: + +    select(mytable.c.id).where( +        func.to_tsvector("english", mytable.c.title).bool_op("@@")( +            func.to_tsquery("english", "somestring") +        ) +    ) + +produces a statement equivalent to:: + +    SELECT mytable.id FROM mytable +    WHERE to_tsvector('english', mytable.title) @@ +        to_tsquery('english', 'somestring') + +It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from +PostgreSQL to ensure that you are generating queries with SQLAlchemy that +take full advantage of any indexes you may have created for full text search. + +.. seealso:: + +    `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_ - in the PostgreSQL documentation + + +FROM ONLY ... +------------- + +The dialect supports PostgreSQL's ONLY keyword for targeting only a particular +table in an inheritance hierarchy. This can be used to produce the +``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...`` +syntaxes. It uses SQLAlchemy's hints mechanism:: + +    # SELECT ... FROM ONLY ... +    result = table.select().with_hint(table, 'ONLY', 'postgresql') +    print(result.fetchall()) + +    # UPDATE ONLY ... +    table.update(values=dict(foo='bar')).with_hint('ONLY', +                                                   dialect_name='postgresql') + +    # DELETE FROM ONLY ... +    table.delete().with_hint('ONLY', dialect_name='postgresql') + + +.. _postgresql_indexes: + +PostgreSQL-Specific Index Options +--------------------------------- + +Several extensions to the :class:`.Index` construct are available, specific +to the PostgreSQL dialect. + +Covering Indexes +^^^^^^^^^^^^^^^^ + +The ``postgresql_include`` option renders INCLUDE(colname) for the given +string names:: + +    Index("my_index", table.c.x, postgresql_include=['y']) + +would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)`` + +Note that this feature requires PostgreSQL 11 or later. + +.. versionadded:: 1.4 + +.. _postgresql_partial_indexes: + +Partial Indexes +^^^^^^^^^^^^^^^ + +Partial indexes add criterion to the index definition so that the index is +applied to a subset of rows.   These can be specified on :class:`.Index` +using the ``postgresql_where`` keyword argument:: + +  Index('my_index', my_table.c.id, postgresql_where=my_table.c.value > 10) + +.. _postgresql_operator_classes: + +Operator Classes +^^^^^^^^^^^^^^^^ + +PostgreSQL allows the specification of an *operator class* for each column of +an index (see +https://www.postgresql.org/docs/current/interactive/indexes-opclass.html). +The :class:`.Index` construct allows these to be specified via the +``postgresql_ops`` keyword argument:: + +    Index( +        'my_index', my_table.c.id, my_table.c.data, +        postgresql_ops={ +            'data': 'text_pattern_ops', +            'id': 'int4_ops' +        }) + +Note that the keys in the ``postgresql_ops`` dictionaries are the +"key" name of the :class:`_schema.Column`, i.e. the name used to access it from +the ``.c`` collection of :class:`_schema.Table`, which can be configured to be +different than the actual name of the column as expressed in the database. + +If ``postgresql_ops`` is to be used against a complex SQL expression such +as a function call, then to apply to the column it must be given a label +that is identified in the dictionary by name, e.g.:: + +    Index( +        'my_index', my_table.c.id, +        func.lower(my_table.c.data).label('data_lower'), +        postgresql_ops={ +            'data_lower': 'text_pattern_ops', +            'id': 'int4_ops' +        }) + +Operator classes are also supported by the +:class:`_postgresql.ExcludeConstraint` construct using the +:paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for +details. + +.. versionadded:: 1.3.21 added support for operator classes with +   :class:`_postgresql.ExcludeConstraint`. + + +Index Types +^^^^^^^^^^^ + +PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well +as the ability for users to create their own (see +https://www.postgresql.org/docs/current/static/indexes-types.html). These can be +specified on :class:`.Index` using the ``postgresql_using`` keyword argument:: + +    Index('my_index', my_table.c.data, postgresql_using='gin') + +The value passed to the keyword argument will be simply passed through to the +underlying CREATE INDEX command, so it *must* be a valid index type for your +version of PostgreSQL. + +.. _postgresql_index_storage: + +Index Storage Parameters +^^^^^^^^^^^^^^^^^^^^^^^^ + +PostgreSQL allows storage parameters to be set on indexes. The storage +parameters available depend on the index method used by the index. Storage +parameters can be specified on :class:`.Index` using the ``postgresql_with`` +keyword argument:: + +    Index('my_index', my_table.c.data, postgresql_with={"fillfactor": 50}) + +PostgreSQL allows to define the tablespace in which to create the index. +The tablespace can be specified on :class:`.Index` using the +``postgresql_tablespace`` keyword argument:: + +    Index('my_index', my_table.c.data, postgresql_tablespace='my_tablespace') + +Note that the same option is available on :class:`_schema.Table` as well. + +.. _postgresql_index_concurrently: + +Indexes with CONCURRENTLY +^^^^^^^^^^^^^^^^^^^^^^^^^ + +The PostgreSQL index option CONCURRENTLY is supported by passing the +flag ``postgresql_concurrently`` to the :class:`.Index` construct:: + +    tbl = Table('testtbl', m, Column('data', Integer)) + +    idx1 = Index('test_idx1', tbl.c.data, postgresql_concurrently=True) + +The above index construct will render DDL for CREATE INDEX, assuming +PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as:: + +    CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data) + +For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for +a connection-less dialect, it will emit:: + +    DROP INDEX CONCURRENTLY test_idx1 + +When using CONCURRENTLY, the PostgreSQL database requires that the statement +be invoked outside of a transaction block.   The Python DBAPI enforces that +even for a single statement, a transaction is present, so to use this +construct, the DBAPI's "autocommit" mode must be used:: + +    metadata = MetaData() +    table = Table( +        "foo", metadata, +        Column("id", String)) +    index = Index( +        "foo_idx", table.c.id, postgresql_concurrently=True) + +    with engine.connect() as conn: +        with conn.execution_options(isolation_level='AUTOCOMMIT'): +            table.create(conn) + +.. seealso:: + +    :ref:`postgresql_isolation_level` + +.. _postgresql_index_reflection: + +PostgreSQL Index Reflection +--------------------------- + +The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the +UNIQUE CONSTRAINT construct is used.   When inspecting a table using +:class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes` +and the :meth:`_reflection.Inspector.get_unique_constraints` +will report on these +two constructs distinctly; in the case of the index, the key +``duplicates_constraint`` will be present in the index entry if it is +detected as mirroring a constraint.   When performing reflection using +``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned +in :attr:`_schema.Table.indexes` when it is detected as mirroring a +:class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints` collection +. + +Special Reflection Options +-------------------------- + +The :class:`_reflection.Inspector` +used for the PostgreSQL backend is an instance +of :class:`.PGInspector`, which offers additional methods:: + +    from sqlalchemy import create_engine, inspect + +    engine = create_engine("postgresql+psycopg2://localhost/test") +    insp = inspect(engine)  # will be a PGInspector + +    print(insp.get_enums()) + +.. autoclass:: PGInspector +    :members: + +.. _postgresql_table_options: + +PostgreSQL Table Options +------------------------ + +Several options for CREATE TABLE are supported directly by the PostgreSQL +dialect in conjunction with the :class:`_schema.Table` construct: + +* ``INHERITS``:: + +    Table("some_table", metadata, ..., postgresql_inherits="some_supertable") + +    Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...)) + +* ``ON COMMIT``:: + +    Table("some_table", metadata, ..., postgresql_on_commit='PRESERVE ROWS') + +* ``PARTITION BY``:: + +    Table("some_table", metadata, ..., +          postgresql_partition_by='LIST (part_column)') + +    .. versionadded:: 1.2.6 + +* ``TABLESPACE``:: + +    Table("some_table", metadata, ..., postgresql_tablespace='some_tablespace') + +  The above option is also available on the :class:`.Index` construct. + +* ``USING``:: + +    Table("some_table", metadata, ..., postgresql_using='heap') + +    .. versionadded:: 2.0.26 + +* ``WITH OIDS``:: + +    Table("some_table", metadata, ..., postgresql_with_oids=True) + +* ``WITHOUT OIDS``:: + +    Table("some_table", metadata, ..., postgresql_with_oids=False) + +.. seealso:: + +    `PostgreSQL CREATE TABLE options +    <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ - +    in the PostgreSQL documentation. + +.. _postgresql_constraint_options: + +PostgreSQL Constraint Options +----------------------------- + +The following option(s) are supported by the PostgreSQL dialect in conjunction +with selected constraint constructs: + +* ``NOT VALID``:  This option applies towards CHECK and FOREIGN KEY constraints +  when the constraint is being added to an existing table via ALTER TABLE, +  and has the effect that existing rows are not scanned during the ALTER +  operation against the constraint being added. + +  When using a SQL migration tool such as `Alembic <https://alembic.sqlalchemy.org>`_ +  that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument +  may be specified as an additional keyword argument within the operation +  that creates the constraint, as in the following Alembic example:: + +        def update(): +            op.create_foreign_key( +                "fk_user_address", +                "address", +                "user", +                ["user_id"], +                ["id"], +                postgresql_not_valid=True +            ) + +  The keyword is ultimately accepted directly by the +  :class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint` +  and :class:`_schema.ForeignKey` constructs; when using a tool like +  Alembic, dialect-specific keyword arguments are passed through to +  these constructs from the migration operation directives:: + +       CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True) + +       ForeignKeyConstraint(["some_id"], ["some_table.some_id"], postgresql_not_valid=True) + +  .. versionadded:: 1.4.32 + +  .. seealso:: + +      `PostgreSQL ALTER TABLE options +      <https://www.postgresql.org/docs/current/static/sql-altertable.html>`_ - +      in the PostgreSQL documentation. + +.. _postgresql_table_valued_overview: + +Table values, Table and Column valued functions, Row and Tuple objects +----------------------------------------------------------------------- + +PostgreSQL makes great use of modern SQL forms such as table-valued functions, +tables and rows as values.   These constructs are commonly used as part +of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other +datatypes.  SQLAlchemy's SQL expression language has native support for +most table-valued and row-valued forms. + +.. _postgresql_table_valued: + +Table-Valued Functions +^^^^^^^^^^^^^^^^^^^^^^^ + +Many PostgreSQL built-in functions are intended to be used in the FROM clause +of a SELECT statement, and are capable of returning table rows or sets of table +rows. A large portion of PostgreSQL's JSON functions for example such as +``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``, +``json_each()``, ``json_to_record()``, ``json_populate_recordset()`` use such +forms. These classes of SQL function calling forms in SQLAlchemy are available +using the :meth:`_functions.FunctionElement.table_valued` method in conjunction +with :class:`_functions.Function` objects generated from the :data:`_sql.func` +namespace. + +Examples from PostgreSQL's reference documentation follow below: + +* ``json_each()``: + +  .. sourcecode:: pycon+sql + +    >>> from sqlalchemy import select, func +    >>> stmt = select(func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value")) +    >>> print(stmt) +    {printsql}SELECT anon_1.key, anon_1.value +    FROM json_each(:json_each_1) AS anon_1 + +* ``json_populate_record()``: + +  .. sourcecode:: pycon+sql + +    >>> from sqlalchemy import select, func, literal_column +    >>> stmt = select( +    ...     func.json_populate_record( +    ...         literal_column("null::myrowtype"), +    ...         '{"a":1,"b":2}' +    ...     ).table_valued("a", "b", name="x") +    ... ) +    >>> print(stmt) +    {printsql}SELECT x.a, x.b +    FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x + +* ``json_to_record()`` - this form uses a PostgreSQL specific form of derived +  columns in the alias, where we may make use of :func:`_sql.column` elements with +  types to produce them.  The :meth:`_functions.FunctionElement.table_valued` +  method produces  a :class:`_sql.TableValuedAlias` construct, and the method +  :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived +  columns specification: + +  .. sourcecode:: pycon+sql + +    >>> from sqlalchemy import select, func, column, Integer, Text +    >>> stmt = select( +    ...     func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}').table_valued( +    ...         column("a", Integer), column("b", Text), column("d", Text), +    ...     ).render_derived(name="x", with_types=True) +    ... ) +    >>> print(stmt) +    {printsql}SELECT x.a, x.b, x.d +    FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT) + +* ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an +  ordinal counter to the output of a function and is accepted by a limited set +  of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The +  :meth:`_functions.FunctionElement.table_valued` method accepts a keyword +  parameter ``with_ordinality`` for this purpose, which accepts the string name +  that will be applied to the "ordinality" column: + +  .. sourcecode:: pycon+sql + +    >>> from sqlalchemy import select, func +    >>> stmt = select( +    ...     func.generate_series(4, 1, -1). +    ...     table_valued("value", with_ordinality="ordinality"). +    ...     render_derived() +    ... ) +    >>> print(stmt) +    {printsql}SELECT anon_1.value, anon_1.ordinality +    FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3) +    WITH ORDINALITY AS anon_1(value, ordinality) + +.. versionadded:: 1.4.0b2 + +.. seealso:: + +    :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial` + +.. _postgresql_column_valued: + +Column Valued Functions +^^^^^^^^^^^^^^^^^^^^^^^ + +Similar to the table valued function, a column valued function is present +in the FROM clause, but delivers itself to the columns clause as a single +scalar value.  PostgreSQL functions such as ``json_array_elements()``, +``unnest()`` and ``generate_series()`` may use this form. Column valued functions are available using the +:meth:`_functions.FunctionElement.column_valued` method of :class:`_functions.FunctionElement`: + +* ``json_array_elements()``: + +  .. sourcecode:: pycon+sql + +    >>> from sqlalchemy import select, func +    >>> stmt = select(func.json_array_elements('["one", "two"]').column_valued("x")) +    >>> print(stmt) +    {printsql}SELECT x +    FROM json_array_elements(:json_array_elements_1) AS x + +* ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the +  :func:`_postgresql.array` construct may be used: + +  .. sourcecode:: pycon+sql + +    >>> from sqlalchemy.dialects.postgresql import array +    >>> from sqlalchemy import select, func +    >>> stmt = select(func.unnest(array([1, 2])).column_valued()) +    >>> print(stmt) +    {printsql}SELECT anon_1 +    FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1 + +  The function can of course be used against an existing table-bound column +  that's of type :class:`_types.ARRAY`: + +  .. sourcecode:: pycon+sql + +    >>> from sqlalchemy import table, column, ARRAY, Integer +    >>> from sqlalchemy import select, func +    >>> t = table("t", column('value', ARRAY(Integer))) +    >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value")) +    >>> print(stmt) +    {printsql}SELECT unnested_value +    FROM unnest(t.value) AS unnested_value + +.. seealso:: + +    :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial` + + +Row Types +^^^^^^^^^ + +Built-in support for rendering a ``ROW`` may be approximated using +``func.ROW`` with the :attr:`_sa.func` namespace, or by using the +:func:`_sql.tuple_` construct: + +.. sourcecode:: pycon+sql + +    >>> from sqlalchemy import table, column, func, tuple_ +    >>> t = table("t", column("id"), column("fk")) +    >>> stmt = t.select().where( +    ...     tuple_(t.c.id, t.c.fk) > (1,2) +    ... ).where( +    ...     func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7) +    ... ) +    >>> print(stmt) +    {printsql}SELECT t.id, t.fk +    FROM t +    WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2) + +.. seealso:: + +    `PostgreSQL Row Constructors +    <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_ + +    `PostgreSQL Row Constructor Comparison +    <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_ + +Table Types passed to Functions +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +PostgreSQL supports passing a table as an argument to a function, which is +known as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects +such as :class:`_schema.Table` support this special form using the +:meth:`_sql.FromClause.table_valued` method, which is comparable to the +:meth:`_functions.FunctionElement.table_valued` method except that the collection +of columns is already established by that of the :class:`_sql.FromClause` +itself: + +.. sourcecode:: pycon+sql + +    >>> from sqlalchemy import table, column, func, select +    >>> a = table( "a", column("id"), column("x"), column("y")) +    >>> stmt = select(func.row_to_json(a.table_valued())) +    >>> print(stmt) +    {printsql}SELECT row_to_json(a) AS row_to_json_1 +    FROM a + +.. versionadded:: 1.4.0b2 + + + +"""  # noqa: E501 + +from __future__ import annotations + +from collections import defaultdict +from functools import lru_cache +import re +from typing import Any +from typing import cast +from typing import List +from typing import Optional +from typing import Tuple +from typing import TYPE_CHECKING +from typing import Union + +from . import arraylib as _array +from . import json as _json +from . import pg_catalog +from . import ranges as _ranges +from .ext import _regconfig_fn +from .ext import aggregate_order_by +from .hstore import HSTORE +from .named_types import CreateDomainType as CreateDomainType  # noqa: F401 +from .named_types import CreateEnumType as CreateEnumType  # noqa: F401 +from .named_types import DOMAIN as DOMAIN  # noqa: F401 +from .named_types import DropDomainType as DropDomainType  # noqa: F401 +from .named_types import DropEnumType as DropEnumType  # noqa: F401 +from .named_types import ENUM as ENUM  # noqa: F401 +from .named_types import NamedType as NamedType  # noqa: F401 +from .types import _DECIMAL_TYPES  # noqa: F401 +from .types import _FLOAT_TYPES  # noqa: F401 +from .types import _INT_TYPES  # noqa: F401 +from .types import BIT as BIT +from .types import BYTEA as BYTEA +from .types import CIDR as CIDR +from .types import CITEXT as CITEXT +from .types import INET as INET +from .types import INTERVAL as INTERVAL +from .types import MACADDR as MACADDR +from .types import MACADDR8 as MACADDR8 +from .types import MONEY as MONEY +from .types import OID as OID +from .types import PGBit as PGBit  # noqa: F401 +from .types import PGCidr as PGCidr  # noqa: F401 +from .types import PGInet as PGInet  # noqa: F401 +from .types import PGInterval as PGInterval  # noqa: F401 +from .types import PGMacAddr as PGMacAddr  # noqa: F401 +from .types import PGMacAddr8 as PGMacAddr8  # noqa: F401 +from .types import PGUuid as PGUuid +from .types import REGCLASS as REGCLASS +from .types import REGCONFIG as REGCONFIG  # noqa: F401 +from .types import TIME as TIME +from .types import TIMESTAMP as TIMESTAMP +from .types import TSVECTOR as TSVECTOR +from ... import exc +from ... import schema +from ... import select +from ... import sql +from ... import util +from ...engine import characteristics +from ...engine import default +from ...engine import interfaces +from ...engine import ObjectKind +from ...engine import ObjectScope +from ...engine import reflection +from ...engine import URL +from ...engine.reflection import ReflectionDefaults +from ...sql import bindparam +from ...sql import coercions +from ...sql import compiler +from ...sql import elements +from ...sql import expression +from ...sql import roles +from ...sql import sqltypes +from ...sql import util as sql_util +from ...sql.compiler import InsertmanyvaluesSentinelOpts +from ...sql.visitors import InternalTraversal +from ...types import BIGINT +from ...types import BOOLEAN +from ...types import CHAR +from ...types import DATE +from ...types import DOUBLE_PRECISION +from ...types import FLOAT +from ...types import INTEGER +from ...types import NUMERIC +from ...types import REAL +from ...types import SMALLINT +from ...types import TEXT +from ...types import UUID as UUID +from ...types import VARCHAR +from ...util.typing import TypedDict + +IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I) + +RESERVED_WORDS = { +    "all", +    "analyse", +    "analyze", +    "and", +    "any", +    "array", +    "as", +    "asc", +    "asymmetric", +    "both", +    "case", +    "cast", +    "check", +    "collate", +    "column", +    "constraint", +    "create", +    "current_catalog", +    "current_date", +    "current_role", +    "current_time", +    "current_timestamp", +    "current_user", +    "default", +    "deferrable", +    "desc", +    "distinct", +    "do", +    "else", +    "end", +    "except", +    "false", +    "fetch", +    "for", +    "foreign", +    "from", +    "grant", +    "group", +    "having", +    "in", +    "initially", +    "intersect", +    "into", +    "leading", +    "limit", +    "localtime", +    "localtimestamp", +    "new", +    "not", +    "null", +    "of", +    "off", +    "offset", +    "old", +    "on", +    "only", +    "or", +    "order", +    "placing", +    "primary", +    "references", +    "returning", +    "select", +    "session_user", +    "some", +    "symmetric", +    "table", +    "then", +    "to", +    "trailing", +    "true", +    "union", +    "unique", +    "user", +    "using", +    "variadic", +    "when", +    "where", +    "window", +    "with", +    "authorization", +    "between", +    "binary", +    "cross", +    "current_schema", +    "freeze", +    "full", +    "ilike", +    "inner", +    "is", +    "isnull", +    "join", +    "left", +    "like", +    "natural", +    "notnull", +    "outer", +    "over", +    "overlaps", +    "right", +    "similar", +    "verbose", +} + +colspecs = { +    sqltypes.ARRAY: _array.ARRAY, +    sqltypes.Interval: INTERVAL, +    sqltypes.Enum: ENUM, +    sqltypes.JSON.JSONPathType: _json.JSONPATH, +    sqltypes.JSON: _json.JSON, +    sqltypes.Uuid: PGUuid, +} + + +ischema_names = { +    "_array": _array.ARRAY, +    "hstore": HSTORE, +    "json": _json.JSON, +    "jsonb": _json.JSONB, +    "int4range": _ranges.INT4RANGE, +    "int8range": _ranges.INT8RANGE, +    "numrange": _ranges.NUMRANGE, +    "daterange": _ranges.DATERANGE, +    "tsrange": _ranges.TSRANGE, +    "tstzrange": _ranges.TSTZRANGE, +    "int4multirange": _ranges.INT4MULTIRANGE, +    "int8multirange": _ranges.INT8MULTIRANGE, +    "nummultirange": _ranges.NUMMULTIRANGE, +    "datemultirange": _ranges.DATEMULTIRANGE, +    "tsmultirange": _ranges.TSMULTIRANGE, +    "tstzmultirange": _ranges.TSTZMULTIRANGE, +    "integer": INTEGER, +    "bigint": BIGINT, +    "smallint": SMALLINT, +    "character varying": VARCHAR, +    "character": CHAR, +    '"char"': sqltypes.String, +    "name": sqltypes.String, +    "text": TEXT, +    "numeric": NUMERIC, +    "float": FLOAT, +    "real": REAL, +    "inet": INET, +    "cidr": CIDR, +    "citext": CITEXT, +    "uuid": UUID, +    "bit": BIT, +    "bit varying": BIT, +    "macaddr": MACADDR, +    "macaddr8": MACADDR8, +    "money": MONEY, +    "oid": OID, +    "regclass": REGCLASS, +    "double precision": DOUBLE_PRECISION, +    "timestamp": TIMESTAMP, +    "timestamp with time zone": TIMESTAMP, +    "timestamp without time zone": TIMESTAMP, +    "time with time zone": TIME, +    "time without time zone": TIME, +    "date": DATE, +    "time": TIME, +    "bytea": BYTEA, +    "boolean": BOOLEAN, +    "interval": INTERVAL, +    "tsvector": TSVECTOR, +} + + +class PGCompiler(compiler.SQLCompiler): +    def visit_to_tsvector_func(self, element, **kw): +        return self._assert_pg_ts_ext(element, **kw) + +    def visit_to_tsquery_func(self, element, **kw): +        return self._assert_pg_ts_ext(element, **kw) + +    def visit_plainto_tsquery_func(self, element, **kw): +        return self._assert_pg_ts_ext(element, **kw) + +    def visit_phraseto_tsquery_func(self, element, **kw): +        return self._assert_pg_ts_ext(element, **kw) + +    def visit_websearch_to_tsquery_func(self, element, **kw): +        return self._assert_pg_ts_ext(element, **kw) + +    def visit_ts_headline_func(self, element, **kw): +        return self._assert_pg_ts_ext(element, **kw) + +    def _assert_pg_ts_ext(self, element, **kw): +        if not isinstance(element, _regconfig_fn): +            # other options here include trying to rewrite the function +            # with the correct types.  however, that means we have to +            # "un-SQL-ize" the first argument, which can't work in a +            # generalized way. Also, parent compiler class has already added +            # the incorrect return type to the result map.   So let's just +            # make sure the function we want is used up front. + +            raise exc.CompileError( +                f'Can\'t compile "{element.name}()" full text search ' +                f"function construct that does not originate from the " +                f'"sqlalchemy.dialects.postgresql" package.  ' +                f'Please ensure "import sqlalchemy.dialects.postgresql" is ' +                f"called before constructing " +                f'"sqlalchemy.func.{element.name}()" to ensure registration ' +                f"of the correct argument and return types." +            ) + +        return f"{element.name}{self.function_argspec(element, **kw)}" + +    def render_bind_cast(self, type_, dbapi_type, sqltext): +        if dbapi_type._type_affinity is sqltypes.String and dbapi_type.length: +            # use VARCHAR with no length for VARCHAR cast. +            # see #9511 +            dbapi_type = sqltypes.STRINGTYPE +        return f"""{sqltext}::{ +            self.dialect.type_compiler_instance.process( +                dbapi_type, identifier_preparer=self.preparer +            ) +        }""" + +    def visit_array(self, element, **kw): +        return "ARRAY[%s]" % self.visit_clauselist(element, **kw) + +    def visit_slice(self, element, **kw): +        return "%s:%s" % ( +            self.process(element.start, **kw), +            self.process(element.stop, **kw), +        ) + +    def visit_bitwise_xor_op_binary(self, binary, operator, **kw): +        return self._generate_generic_binary(binary, " # ", **kw) + +    def visit_json_getitem_op_binary( +        self, binary, operator, _cast_applied=False, **kw +    ): +        if ( +            not _cast_applied +            and binary.type._type_affinity is not sqltypes.JSON +        ): +            kw["_cast_applied"] = True +            return self.process(sql.cast(binary, binary.type), **kw) + +        kw["eager_grouping"] = True + +        return self._generate_generic_binary( +            binary, " -> " if not _cast_applied else " ->> ", **kw +        ) + +    def visit_json_path_getitem_op_binary( +        self, binary, operator, _cast_applied=False, **kw +    ): +        if ( +            not _cast_applied +            and binary.type._type_affinity is not sqltypes.JSON +        ): +            kw["_cast_applied"] = True +            return self.process(sql.cast(binary, binary.type), **kw) + +        kw["eager_grouping"] = True +        return self._generate_generic_binary( +            binary, " #> " if not _cast_applied else " #>> ", **kw +        ) + +    def visit_getitem_binary(self, binary, operator, **kw): +        return "%s[%s]" % ( +            self.process(binary.left, **kw), +            self.process(binary.right, **kw), +        ) + +    def visit_aggregate_order_by(self, element, **kw): +        return "%s ORDER BY %s" % ( +            self.process(element.target, **kw), +            self.process(element.order_by, **kw), +        ) + +    def visit_match_op_binary(self, binary, operator, **kw): +        if "postgresql_regconfig" in binary.modifiers: +            regconfig = self.render_literal_value( +                binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE +            ) +            if regconfig: +                return "%s @@ plainto_tsquery(%s, %s)" % ( +                    self.process(binary.left, **kw), +                    regconfig, +                    self.process(binary.right, **kw), +                ) +        return "%s @@ plainto_tsquery(%s)" % ( +            self.process(binary.left, **kw), +            self.process(binary.right, **kw), +        ) + +    def visit_ilike_case_insensitive_operand(self, element, **kw): +        return element.element._compiler_dispatch(self, **kw) + +    def visit_ilike_op_binary(self, binary, operator, **kw): +        escape = binary.modifiers.get("escape", None) + +        return "%s ILIKE %s" % ( +            self.process(binary.left, **kw), +            self.process(binary.right, **kw), +        ) + ( +            " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) +            if escape is not None +            else "" +        ) + +    def visit_not_ilike_op_binary(self, binary, operator, **kw): +        escape = binary.modifiers.get("escape", None) +        return "%s NOT ILIKE %s" % ( +            self.process(binary.left, **kw), +            self.process(binary.right, **kw), +        ) + ( +            " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) +            if escape is not None +            else "" +        ) + +    def _regexp_match(self, base_op, binary, operator, kw): +        flags = binary.modifiers["flags"] +        if flags is None: +            return self._generate_generic_binary( +                binary, " %s " % base_op, **kw +            ) +        if flags == "i": +            return self._generate_generic_binary( +                binary, " %s* " % base_op, **kw +            ) +        return "%s %s CONCAT('(?', %s, ')', %s)" % ( +            self.process(binary.left, **kw), +            base_op, +            self.render_literal_value(flags, sqltypes.STRINGTYPE), +            self.process(binary.right, **kw), +        ) + +    def visit_regexp_match_op_binary(self, binary, operator, **kw): +        return self._regexp_match("~", binary, operator, kw) + +    def visit_not_regexp_match_op_binary(self, binary, operator, **kw): +        return self._regexp_match("!~", binary, operator, kw) + +    def visit_regexp_replace_op_binary(self, binary, operator, **kw): +        string = self.process(binary.left, **kw) +        pattern_replace = self.process(binary.right, **kw) +        flags = binary.modifiers["flags"] +        if flags is None: +            return "REGEXP_REPLACE(%s, %s)" % ( +                string, +                pattern_replace, +            ) +        else: +            return "REGEXP_REPLACE(%s, %s, %s)" % ( +                string, +                pattern_replace, +                self.render_literal_value(flags, sqltypes.STRINGTYPE), +            ) + +    def visit_empty_set_expr(self, element_types, **kw): +        # cast the empty set to the type we are comparing against.  if +        # we are comparing against the null type, pick an arbitrary +        # datatype for the empty set +        return "SELECT %s WHERE 1!=1" % ( +            ", ".join( +                "CAST(NULL AS %s)" +                % self.dialect.type_compiler_instance.process( +                    INTEGER() if type_._isnull else type_ +                ) +                for type_ in element_types or [INTEGER()] +            ), +        ) + +    def render_literal_value(self, value, type_): +        value = super().render_literal_value(value, type_) + +        if self.dialect._backslash_escapes: +            value = value.replace("\\", "\\\\") +        return value + +    def visit_aggregate_strings_func(self, fn, **kw): +        return "string_agg%s" % self.function_argspec(fn) + +    def visit_sequence(self, seq, **kw): +        return "nextval('%s')" % self.preparer.format_sequence(seq) + +    def limit_clause(self, select, **kw): +        text = "" +        if select._limit_clause is not None: +            text += " \n LIMIT " + self.process(select._limit_clause, **kw) +        if select._offset_clause is not None: +            if select._limit_clause is None: +                text += "\n LIMIT ALL" +            text += " OFFSET " + self.process(select._offset_clause, **kw) +        return text + +    def format_from_hint_text(self, sqltext, table, hint, iscrud): +        if hint.upper() != "ONLY": +            raise exc.CompileError("Unrecognized hint: %r" % hint) +        return "ONLY " + sqltext + +    def get_select_precolumns(self, select, **kw): +        # Do not call super().get_select_precolumns because +        # it will warn/raise when distinct on is present +        if select._distinct or select._distinct_on: +            if select._distinct_on: +                return ( +                    "DISTINCT ON (" +                    + ", ".join( +                        [ +                            self.process(col, **kw) +                            for col in select._distinct_on +                        ] +                    ) +                    + ") " +                ) +            else: +                return "DISTINCT " +        else: +            return "" + +    def for_update_clause(self, select, **kw): +        if select._for_update_arg.read: +            if select._for_update_arg.key_share: +                tmp = " FOR KEY SHARE" +            else: +                tmp = " FOR SHARE" +        elif select._for_update_arg.key_share: +            tmp = " FOR NO KEY UPDATE" +        else: +            tmp = " FOR UPDATE" + +        if select._for_update_arg.of: +            tables = util.OrderedSet() +            for c in select._for_update_arg.of: +                tables.update(sql_util.surface_selectables_only(c)) + +            tmp += " OF " + ", ".join( +                self.process(table, ashint=True, use_schema=False, **kw) +                for table in tables +            ) + +        if select._for_update_arg.nowait: +            tmp += " NOWAIT" +        if select._for_update_arg.skip_locked: +            tmp += " SKIP LOCKED" + +        return tmp + +    def visit_substring_func(self, func, **kw): +        s = self.process(func.clauses.clauses[0], **kw) +        start = self.process(func.clauses.clauses[1], **kw) +        if len(func.clauses.clauses) > 2: +            length = self.process(func.clauses.clauses[2], **kw) +            return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length) +        else: +            return "SUBSTRING(%s FROM %s)" % (s, start) + +    def _on_conflict_target(self, clause, **kw): +        if clause.constraint_target is not None: +            # target may be a name of an Index, UniqueConstraint or +            # ExcludeConstraint.  While there is a separate +            # "max_identifier_length" for indexes, PostgreSQL uses the same +            # length for all objects so we can use +            # truncate_and_render_constraint_name +            target_text = ( +                "ON CONSTRAINT %s" +                % self.preparer.truncate_and_render_constraint_name( +                    clause.constraint_target +                ) +            ) +        elif clause.inferred_target_elements is not None: +            target_text = "(%s)" % ", ".join( +                ( +                    self.preparer.quote(c) +                    if isinstance(c, str) +                    else self.process(c, include_table=False, use_schema=False) +                ) +                for c in clause.inferred_target_elements +            ) +            if clause.inferred_target_whereclause is not None: +                target_text += " WHERE %s" % self.process( +                    clause.inferred_target_whereclause, +                    include_table=False, +                    use_schema=False, +                ) +        else: +            target_text = "" + +        return target_text + +    def visit_on_conflict_do_nothing(self, on_conflict, **kw): +        target_text = self._on_conflict_target(on_conflict, **kw) + +        if target_text: +            return "ON CONFLICT %s DO NOTHING" % target_text +        else: +            return "ON CONFLICT DO NOTHING" + +    def visit_on_conflict_do_update(self, on_conflict, **kw): +        clause = on_conflict + +        target_text = self._on_conflict_target(on_conflict, **kw) + +        action_set_ops = [] + +        set_parameters = dict(clause.update_values_to_set) +        # create a list of column assignment clauses as tuples + +        insert_statement = self.stack[-1]["selectable"] +        cols = insert_statement.table.c +        for c in cols: +            col_key = c.key + +            if col_key in set_parameters: +                value = set_parameters.pop(col_key) +            elif c in set_parameters: +                value = set_parameters.pop(c) +            else: +                continue + +            if coercions._is_literal(value): +                value = elements.BindParameter(None, value, type_=c.type) + +            else: +                if ( +                    isinstance(value, elements.BindParameter) +                    and value.type._isnull +                ): +                    value = value._clone() +                    value.type = c.type +            value_text = self.process(value.self_group(), use_schema=False) + +            key_text = self.preparer.quote(c.name) +            action_set_ops.append("%s = %s" % (key_text, value_text)) + +        # check for names that don't match columns +        if set_parameters: +            util.warn( +                "Additional column names not matching " +                "any column keys in table '%s': %s" +                % ( +                    self.current_executable.table.name, +                    (", ".join("'%s'" % c for c in set_parameters)), +                ) +            ) +            for k, v in set_parameters.items(): +                key_text = ( +                    self.preparer.quote(k) +                    if isinstance(k, str) +                    else self.process(k, use_schema=False) +                ) +                value_text = self.process( +                    coercions.expect(roles.ExpressionElementRole, v), +                    use_schema=False, +                ) +                action_set_ops.append("%s = %s" % (key_text, value_text)) + +        action_text = ", ".join(action_set_ops) +        if clause.update_whereclause is not None: +            action_text += " WHERE %s" % self.process( +                clause.update_whereclause, include_table=True, use_schema=False +            ) + +        return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) + +    def update_from_clause( +        self, update_stmt, from_table, extra_froms, from_hints, **kw +    ): +        kw["asfrom"] = True +        return "FROM " + ", ".join( +            t._compiler_dispatch(self, fromhints=from_hints, **kw) +            for t in extra_froms +        ) + +    def delete_extra_from_clause( +        self, delete_stmt, from_table, extra_froms, from_hints, **kw +    ): +        """Render the DELETE .. USING clause specific to PostgreSQL.""" +        kw["asfrom"] = True +        return "USING " + ", ".join( +            t._compiler_dispatch(self, fromhints=from_hints, **kw) +            for t in extra_froms +        ) + +    def fetch_clause(self, select, **kw): +        # pg requires parens for non literal clauses. It's also required for +        # bind parameters if a ::type casts is used by the driver (asyncpg), +        # so it's easiest to just always add it +        text = "" +        if select._offset_clause is not None: +            text += "\n OFFSET (%s) ROWS" % self.process( +                select._offset_clause, **kw +            ) +        if select._fetch_clause is not None: +            text += "\n FETCH FIRST (%s)%s ROWS %s" % ( +                self.process(select._fetch_clause, **kw), +                " PERCENT" if select._fetch_clause_options["percent"] else "", +                ( +                    "WITH TIES" +                    if select._fetch_clause_options["with_ties"] +                    else "ONLY" +                ), +            ) +        return text + + +class PGDDLCompiler(compiler.DDLCompiler): +    def get_column_specification(self, column, **kwargs): +        colspec = self.preparer.format_column(column) +        impl_type = column.type.dialect_impl(self.dialect) +        if isinstance(impl_type, sqltypes.TypeDecorator): +            impl_type = impl_type.impl + +        has_identity = ( +            column.identity is not None +            and self.dialect.supports_identity_columns +        ) + +        if ( +            column.primary_key +            and column is column.table._autoincrement_column +            and ( +                self.dialect.supports_smallserial +                or not isinstance(impl_type, sqltypes.SmallInteger) +            ) +            and not has_identity +            and ( +                column.default is None +                or ( +                    isinstance(column.default, schema.Sequence) +                    and column.default.optional +                ) +            ) +        ): +            if isinstance(impl_type, sqltypes.BigInteger): +                colspec += " BIGSERIAL" +            elif isinstance(impl_type, sqltypes.SmallInteger): +                colspec += " SMALLSERIAL" +            else: +                colspec += " SERIAL" +        else: +            colspec += " " + self.dialect.type_compiler_instance.process( +                column.type, +                type_expression=column, +                identifier_preparer=self.preparer, +            ) +            default = self.get_column_default_string(column) +            if default is not None: +                colspec += " DEFAULT " + default + +        if column.computed is not None: +            colspec += " " + self.process(column.computed) +        if has_identity: +            colspec += " " + self.process(column.identity) + +        if not column.nullable and not has_identity: +            colspec += " NOT NULL" +        elif column.nullable and has_identity: +            colspec += " NULL" +        return colspec + +    def _define_constraint_validity(self, constraint): +        not_valid = constraint.dialect_options["postgresql"]["not_valid"] +        return " NOT VALID" if not_valid else "" + +    def visit_check_constraint(self, constraint, **kw): +        if constraint._type_bound: +            typ = list(constraint.columns)[0].type +            if ( +                isinstance(typ, sqltypes.ARRAY) +                and isinstance(typ.item_type, sqltypes.Enum) +                and not typ.item_type.native_enum +            ): +                raise exc.CompileError( +                    "PostgreSQL dialect cannot produce the CHECK constraint " +                    "for ARRAY of non-native ENUM; please specify " +                    "create_constraint=False on this Enum datatype." +                ) + +        text = super().visit_check_constraint(constraint) +        text += self._define_constraint_validity(constraint) +        return text + +    def visit_foreign_key_constraint(self, constraint, **kw): +        text = super().visit_foreign_key_constraint(constraint) +        text += self._define_constraint_validity(constraint) +        return text + +    def visit_create_enum_type(self, create, **kw): +        type_ = create.element + +        return "CREATE TYPE %s AS ENUM (%s)" % ( +            self.preparer.format_type(type_), +            ", ".join( +                self.sql_compiler.process(sql.literal(e), literal_binds=True) +                for e in type_.enums +            ), +        ) + +    def visit_drop_enum_type(self, drop, **kw): +        type_ = drop.element + +        return "DROP TYPE %s" % (self.preparer.format_type(type_)) + +    def visit_create_domain_type(self, create, **kw): +        domain: DOMAIN = create.element + +        options = [] +        if domain.collation is not None: +            options.append(f"COLLATE {self.preparer.quote(domain.collation)}") +        if domain.default is not None: +            default = self.render_default_string(domain.default) +            options.append(f"DEFAULT {default}") +        if domain.constraint_name is not None: +            name = self.preparer.truncate_and_render_constraint_name( +                domain.constraint_name +            ) +            options.append(f"CONSTRAINT {name}") +        if domain.not_null: +            options.append("NOT NULL") +        if domain.check is not None: +            check = self.sql_compiler.process( +                domain.check, include_table=False, literal_binds=True +            ) +            options.append(f"CHECK ({check})") + +        return ( +            f"CREATE DOMAIN {self.preparer.format_type(domain)} AS " +            f"{self.type_compiler.process(domain.data_type)} " +            f"{' '.join(options)}" +        ) + +    def visit_drop_domain_type(self, drop, **kw): +        domain = drop.element +        return f"DROP DOMAIN {self.preparer.format_type(domain)}" + +    def visit_create_index(self, create, **kw): +        preparer = self.preparer +        index = create.element +        self._verify_index_table(index) +        text = "CREATE " +        if index.unique: +            text += "UNIQUE " + +        text += "INDEX " + +        if self.dialect._supports_create_index_concurrently: +            concurrently = index.dialect_options["postgresql"]["concurrently"] +            if concurrently: +                text += "CONCURRENTLY " + +        if create.if_not_exists: +            text += "IF NOT EXISTS " + +        text += "%s ON %s " % ( +            self._prepared_index_name(index, include_schema=False), +            preparer.format_table(index.table), +        ) + +        using = index.dialect_options["postgresql"]["using"] +        if using: +            text += ( +                "USING %s " +                % self.preparer.validate_sql_phrase(using, IDX_USING).lower() +            ) + +        ops = index.dialect_options["postgresql"]["ops"] +        text += "(%s)" % ( +            ", ".join( +                [ +                    self.sql_compiler.process( +                        ( +                            expr.self_group() +                            if not isinstance(expr, expression.ColumnClause) +                            else expr +                        ), +                        include_table=False, +                        literal_binds=True, +                    ) +                    + ( +                        (" " + ops[expr.key]) +                        if hasattr(expr, "key") and expr.key in ops +                        else "" +                    ) +                    for expr in index.expressions +                ] +            ) +        ) + +        includeclause = index.dialect_options["postgresql"]["include"] +        if includeclause: +            inclusions = [ +                index.table.c[col] if isinstance(col, str) else col +                for col in includeclause +            ] +            text += " INCLUDE (%s)" % ", ".join( +                [preparer.quote(c.name) for c in inclusions] +            ) + +        nulls_not_distinct = index.dialect_options["postgresql"][ +            "nulls_not_distinct" +        ] +        if nulls_not_distinct is True: +            text += " NULLS NOT DISTINCT" +        elif nulls_not_distinct is False: +            text += " NULLS DISTINCT" + +        withclause = index.dialect_options["postgresql"]["with"] +        if withclause: +            text += " WITH (%s)" % ( +                ", ".join( +                    [ +                        "%s = %s" % storage_parameter +                        for storage_parameter in withclause.items() +                    ] +                ) +            ) + +        tablespace_name = index.dialect_options["postgresql"]["tablespace"] +        if tablespace_name: +            text += " TABLESPACE %s" % preparer.quote(tablespace_name) + +        whereclause = index.dialect_options["postgresql"]["where"] +        if whereclause is not None: +            whereclause = coercions.expect( +                roles.DDLExpressionRole, whereclause +            ) + +            where_compiled = self.sql_compiler.process( +                whereclause, include_table=False, literal_binds=True +            ) +            text += " WHERE " + where_compiled + +        return text + +    def define_unique_constraint_distinct(self, constraint, **kw): +        nulls_not_distinct = constraint.dialect_options["postgresql"][ +            "nulls_not_distinct" +        ] +        if nulls_not_distinct is True: +            nulls_not_distinct_param = "NULLS NOT DISTINCT " +        elif nulls_not_distinct is False: +            nulls_not_distinct_param = "NULLS DISTINCT " +        else: +            nulls_not_distinct_param = "" +        return nulls_not_distinct_param + +    def visit_drop_index(self, drop, **kw): +        index = drop.element + +        text = "\nDROP INDEX " + +        if self.dialect._supports_drop_index_concurrently: +            concurrently = index.dialect_options["postgresql"]["concurrently"] +            if concurrently: +                text += "CONCURRENTLY " + +        if drop.if_exists: +            text += "IF EXISTS " + +        text += self._prepared_index_name(index, include_schema=True) +        return text + +    def visit_exclude_constraint(self, constraint, **kw): +        text = "" +        if constraint.name is not None: +            text += "CONSTRAINT %s " % self.preparer.format_constraint( +                constraint +            ) +        elements = [] +        kw["include_table"] = False +        kw["literal_binds"] = True +        for expr, name, op in constraint._render_exprs: +            exclude_element = self.sql_compiler.process(expr, **kw) + ( +                (" " + constraint.ops[expr.key]) +                if hasattr(expr, "key") and expr.key in constraint.ops +                else "" +            ) + +            elements.append("%s WITH %s" % (exclude_element, op)) +        text += "EXCLUDE USING %s (%s)" % ( +            self.preparer.validate_sql_phrase( +                constraint.using, IDX_USING +            ).lower(), +            ", ".join(elements), +        ) +        if constraint.where is not None: +            text += " WHERE (%s)" % self.sql_compiler.process( +                constraint.where, literal_binds=True +            ) +        text += self.define_constraint_deferrability(constraint) +        return text + +    def post_create_table(self, table): +        table_opts = [] +        pg_opts = table.dialect_options["postgresql"] + +        inherits = pg_opts.get("inherits") +        if inherits is not None: +            if not isinstance(inherits, (list, tuple)): +                inherits = (inherits,) +            table_opts.append( +                "\n INHERITS ( " +                + ", ".join(self.preparer.quote(name) for name in inherits) +                + " )" +            ) + +        if pg_opts["partition_by"]: +            table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"]) + +        if pg_opts["using"]: +            table_opts.append("\n USING %s" % pg_opts["using"]) + +        if pg_opts["with_oids"] is True: +            table_opts.append("\n WITH OIDS") +        elif pg_opts["with_oids"] is False: +            table_opts.append("\n WITHOUT OIDS") + +        if pg_opts["on_commit"]: +            on_commit_options = pg_opts["on_commit"].replace("_", " ").upper() +            table_opts.append("\n ON COMMIT %s" % on_commit_options) + +        if pg_opts["tablespace"]: +            tablespace_name = pg_opts["tablespace"] +            table_opts.append( +                "\n TABLESPACE %s" % self.preparer.quote(tablespace_name) +            ) + +        return "".join(table_opts) + +    def visit_computed_column(self, generated, **kw): +        if generated.persisted is False: +            raise exc.CompileError( +                "PostrgreSQL computed columns do not support 'virtual' " +                "persistence; set the 'persisted' flag to None or True for " +                "PostgreSQL support." +            ) + +        return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process( +            generated.sqltext, include_table=False, literal_binds=True +        ) + +    def visit_create_sequence(self, create, **kw): +        prefix = None +        if create.element.data_type is not None: +            prefix = " AS %s" % self.type_compiler.process( +                create.element.data_type +            ) + +        return super().visit_create_sequence(create, prefix=prefix, **kw) + +    def _can_comment_on_constraint(self, ddl_instance): +        constraint = ddl_instance.element +        if constraint.name is None: +            raise exc.CompileError( +                f"Can't emit COMMENT ON for constraint {constraint!r}: " +                "it has no name" +            ) +        if constraint.table is None: +            raise exc.CompileError( +                f"Can't emit COMMENT ON for constraint {constraint!r}: " +                "it has no associated table" +            ) + +    def visit_set_constraint_comment(self, create, **kw): +        self._can_comment_on_constraint(create) +        return "COMMENT ON CONSTRAINT %s ON %s IS %s" % ( +            self.preparer.format_constraint(create.element), +            self.preparer.format_table(create.element.table), +            self.sql_compiler.render_literal_value( +                create.element.comment, sqltypes.String() +            ), +        ) + +    def visit_drop_constraint_comment(self, drop, **kw): +        self._can_comment_on_constraint(drop) +        return "COMMENT ON CONSTRAINT %s ON %s IS NULL" % ( +            self.preparer.format_constraint(drop.element), +            self.preparer.format_table(drop.element.table), +        ) + + +class PGTypeCompiler(compiler.GenericTypeCompiler): +    def visit_TSVECTOR(self, type_, **kw): +        return "TSVECTOR" + +    def visit_TSQUERY(self, type_, **kw): +        return "TSQUERY" + +    def visit_INET(self, type_, **kw): +        return "INET" + +    def visit_CIDR(self, type_, **kw): +        return "CIDR" + +    def visit_CITEXT(self, type_, **kw): +        return "CITEXT" + +    def visit_MACADDR(self, type_, **kw): +        return "MACADDR" + +    def visit_MACADDR8(self, type_, **kw): +        return "MACADDR8" + +    def visit_MONEY(self, type_, **kw): +        return "MONEY" + +    def visit_OID(self, type_, **kw): +        return "OID" + +    def visit_REGCONFIG(self, type_, **kw): +        return "REGCONFIG" + +    def visit_REGCLASS(self, type_, **kw): +        return "REGCLASS" + +    def visit_FLOAT(self, type_, **kw): +        if not type_.precision: +            return "FLOAT" +        else: +            return "FLOAT(%(precision)s)" % {"precision": type_.precision} + +    def visit_double(self, type_, **kw): +        return self.visit_DOUBLE_PRECISION(type, **kw) + +    def visit_BIGINT(self, type_, **kw): +        return "BIGINT" + +    def visit_HSTORE(self, type_, **kw): +        return "HSTORE" + +    def visit_JSON(self, type_, **kw): +        return "JSON" + +    def visit_JSONB(self, type_, **kw): +        return "JSONB" + +    def visit_INT4MULTIRANGE(self, type_, **kw): +        return "INT4MULTIRANGE" + +    def visit_INT8MULTIRANGE(self, type_, **kw): +        return "INT8MULTIRANGE" + +    def visit_NUMMULTIRANGE(self, type_, **kw): +        return "NUMMULTIRANGE" + +    def visit_DATEMULTIRANGE(self, type_, **kw): +        return "DATEMULTIRANGE" + +    def visit_TSMULTIRANGE(self, type_, **kw): +        return "TSMULTIRANGE" + +    def visit_TSTZMULTIRANGE(self, type_, **kw): +        return "TSTZMULTIRANGE" + +    def visit_INT4RANGE(self, type_, **kw): +        return "INT4RANGE" + +    def visit_INT8RANGE(self, type_, **kw): +        return "INT8RANGE" + +    def visit_NUMRANGE(self, type_, **kw): +        return "NUMRANGE" + +    def visit_DATERANGE(self, type_, **kw): +        return "DATERANGE" + +    def visit_TSRANGE(self, type_, **kw): +        return "TSRANGE" + +    def visit_TSTZRANGE(self, type_, **kw): +        return "TSTZRANGE" + +    def visit_json_int_index(self, type_, **kw): +        return "INT" + +    def visit_json_str_index(self, type_, **kw): +        return "TEXT" + +    def visit_datetime(self, type_, **kw): +        return self.visit_TIMESTAMP(type_, **kw) + +    def visit_enum(self, type_, **kw): +        if not type_.native_enum or not self.dialect.supports_native_enum: +            return super().visit_enum(type_, **kw) +        else: +            return self.visit_ENUM(type_, **kw) + +    def visit_ENUM(self, type_, identifier_preparer=None, **kw): +        if identifier_preparer is None: +            identifier_preparer = self.dialect.identifier_preparer +        return identifier_preparer.format_type(type_) + +    def visit_DOMAIN(self, type_, identifier_preparer=None, **kw): +        if identifier_preparer is None: +            identifier_preparer = self.dialect.identifier_preparer +        return identifier_preparer.format_type(type_) + +    def visit_TIMESTAMP(self, type_, **kw): +        return "TIMESTAMP%s %s" % ( +            ( +                "(%d)" % type_.precision +                if getattr(type_, "precision", None) is not None +                else "" +            ), +            (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE", +        ) + +    def visit_TIME(self, type_, **kw): +        return "TIME%s %s" % ( +            ( +                "(%d)" % type_.precision +                if getattr(type_, "precision", None) is not None +                else "" +            ), +            (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE", +        ) + +    def visit_INTERVAL(self, type_, **kw): +        text = "INTERVAL" +        if type_.fields is not None: +            text += " " + type_.fields +        if type_.precision is not None: +            text += " (%d)" % type_.precision +        return text + +    def visit_BIT(self, type_, **kw): +        if type_.varying: +            compiled = "BIT VARYING" +            if type_.length is not None: +                compiled += "(%d)" % type_.length +        else: +            compiled = "BIT(%d)" % type_.length +        return compiled + +    def visit_uuid(self, type_, **kw): +        if type_.native_uuid: +            return self.visit_UUID(type_, **kw) +        else: +            return super().visit_uuid(type_, **kw) + +    def visit_UUID(self, type_, **kw): +        return "UUID" + +    def visit_large_binary(self, type_, **kw): +        return self.visit_BYTEA(type_, **kw) + +    def visit_BYTEA(self, type_, **kw): +        return "BYTEA" + +    def visit_ARRAY(self, type_, **kw): +        inner = self.process(type_.item_type, **kw) +        return re.sub( +            r"((?: COLLATE.*)?)$", +            ( +                r"%s\1" +                % ( +                    "[]" +                    * (type_.dimensions if type_.dimensions is not None else 1) +                ) +            ), +            inner, +            count=1, +        ) + +    def visit_json_path(self, type_, **kw): +        return self.visit_JSONPATH(type_, **kw) + +    def visit_JSONPATH(self, type_, **kw): +        return "JSONPATH" + + +class PGIdentifierPreparer(compiler.IdentifierPreparer): +    reserved_words = RESERVED_WORDS + +    def _unquote_identifier(self, value): +        if value[0] == self.initial_quote: +            value = value[1:-1].replace( +                self.escape_to_quote, self.escape_quote +            ) +        return value + +    def format_type(self, type_, use_schema=True): +        if not type_.name: +            raise exc.CompileError( +                f"PostgreSQL {type_.__class__.__name__} type requires a name." +            ) + +        name = self.quote(type_.name) +        effective_schema = self.schema_for_object(type_) + +        if ( +            not self.omit_schema +            and use_schema +            and effective_schema is not None +        ): +            name = f"{self.quote_schema(effective_schema)}.{name}" +        return name + + +class ReflectedNamedType(TypedDict): +    """Represents a reflected named type.""" + +    name: str +    """Name of the type.""" +    schema: str +    """The schema of the type.""" +    visible: bool +    """Indicates if this type is in the current search path.""" + + +class ReflectedDomainConstraint(TypedDict): +    """Represents a reflect check constraint of a domain.""" + +    name: str +    """Name of the constraint.""" +    check: str +    """The check constraint text.""" + + +class ReflectedDomain(ReflectedNamedType): +    """Represents a reflected enum.""" + +    type: str +    """The string name of the underlying data type of the domain.""" +    nullable: bool +    """Indicates if the domain allows null or not.""" +    default: Optional[str] +    """The string representation of the default value of this domain +    or ``None`` if none present. +    """ +    constraints: List[ReflectedDomainConstraint] +    """The constraints defined in the domain, if any. +    The constraint are in order of evaluation by postgresql. +    """ +    collation: Optional[str] +    """The collation for the domain.""" + + +class ReflectedEnum(ReflectedNamedType): +    """Represents a reflected enum.""" + +    labels: List[str] +    """The labels that compose the enum.""" + + +class PGInspector(reflection.Inspector): +    dialect: PGDialect + +    def get_table_oid( +        self, table_name: str, schema: Optional[str] = None +    ) -> int: +        """Return the OID for the given table name. + +        :param table_name: string name of the table.  For special quoting, +         use :class:`.quoted_name`. + +        :param schema: string schema name; if omitted, uses the default schema +         of the database connection.  For special quoting, +         use :class:`.quoted_name`. + +        """ + +        with self._operation_context() as conn: +            return self.dialect.get_table_oid( +                conn, table_name, schema, info_cache=self.info_cache +            ) + +    def get_domains( +        self, schema: Optional[str] = None +    ) -> List[ReflectedDomain]: +        """Return a list of DOMAIN objects. + +        Each member is a dictionary containing these fields: + +            * name - name of the domain +            * schema - the schema name for the domain. +            * visible - boolean, whether or not this domain is visible +              in the default search path. +            * type - the type defined by this domain. +            * nullable - Indicates if this domain can be ``NULL``. +            * default - The default value of the domain or ``None`` if the +              domain has no default. +            * constraints - A list of dict wit the constraint defined by this +              domain. Each element constaints two keys: ``name`` of the +              constraint and ``check`` with the constraint text. + +        :param schema: schema name.  If None, the default schema +         (typically 'public') is used.  May also be set to ``'*'`` to +         indicate load domains for all schemas. + +        .. versionadded:: 2.0 + +        """ +        with self._operation_context() as conn: +            return self.dialect._load_domains( +                conn, schema, info_cache=self.info_cache +            ) + +    def get_enums(self, schema: Optional[str] = None) -> List[ReflectedEnum]: +        """Return a list of ENUM objects. + +        Each member is a dictionary containing these fields: + +            * name - name of the enum +            * schema - the schema name for the enum. +            * visible - boolean, whether or not this enum is visible +              in the default search path. +            * labels - a list of string labels that apply to the enum. + +        :param schema: schema name.  If None, the default schema +         (typically 'public') is used.  May also be set to ``'*'`` to +         indicate load enums for all schemas. + +        """ +        with self._operation_context() as conn: +            return self.dialect._load_enums( +                conn, schema, info_cache=self.info_cache +            ) + +    def get_foreign_table_names( +        self, schema: Optional[str] = None +    ) -> List[str]: +        """Return a list of FOREIGN TABLE names. + +        Behavior is similar to that of +        :meth:`_reflection.Inspector.get_table_names`, +        except that the list is limited to those tables that report a +        ``relkind`` value of ``f``. + +        """ +        with self._operation_context() as conn: +            return self.dialect._get_foreign_table_names( +                conn, schema, info_cache=self.info_cache +            ) + +    def has_type( +        self, type_name: str, schema: Optional[str] = None, **kw: Any +    ) -> bool: +        """Return if the database has the specified type in the provided +        schema. + +        :param type_name: the type to check. +        :param schema: schema name.  If None, the default schema +         (typically 'public') is used.  May also be set to ``'*'`` to +         check in all schemas. + +        .. versionadded:: 2.0 + +        """ +        with self._operation_context() as conn: +            return self.dialect.has_type( +                conn, type_name, schema, info_cache=self.info_cache +            ) + + +class PGExecutionContext(default.DefaultExecutionContext): +    def fire_sequence(self, seq, type_): +        return self._execute_scalar( +            ( +                "select nextval('%s')" +                % self.identifier_preparer.format_sequence(seq) +            ), +            type_, +        ) + +    def get_insert_default(self, column): +        if column.primary_key and column is column.table._autoincrement_column: +            if column.server_default and column.server_default.has_argument: +                # pre-execute passive defaults on primary key columns +                return self._execute_scalar( +                    "select %s" % column.server_default.arg, column.type +                ) + +            elif column.default is None or ( +                column.default.is_sequence and column.default.optional +            ): +                # execute the sequence associated with a SERIAL primary +                # key column. for non-primary-key SERIAL, the ID just +                # generates server side. + +                try: +                    seq_name = column._postgresql_seq_name +                except AttributeError: +                    tab = column.table.name +                    col = column.name +                    tab = tab[0 : 29 + max(0, (29 - len(col)))] +                    col = col[0 : 29 + max(0, (29 - len(tab)))] +                    name = "%s_%s_seq" % (tab, col) +                    column._postgresql_seq_name = seq_name = name + +                if column.table is not None: +                    effective_schema = self.connection.schema_for_object( +                        column.table +                    ) +                else: +                    effective_schema = None + +                if effective_schema is not None: +                    exc = 'select nextval(\'"%s"."%s"\')' % ( +                        effective_schema, +                        seq_name, +                    ) +                else: +                    exc = "select nextval('\"%s\"')" % (seq_name,) + +                return self._execute_scalar(exc, column.type) + +        return super().get_insert_default(column) + + +class PGReadOnlyConnectionCharacteristic( +    characteristics.ConnectionCharacteristic +): +    transactional = True + +    def reset_characteristic(self, dialect, dbapi_conn): +        dialect.set_readonly(dbapi_conn, False) + +    def set_characteristic(self, dialect, dbapi_conn, value): +        dialect.set_readonly(dbapi_conn, value) + +    def get_characteristic(self, dialect, dbapi_conn): +        return dialect.get_readonly(dbapi_conn) + + +class PGDeferrableConnectionCharacteristic( +    characteristics.ConnectionCharacteristic +): +    transactional = True + +    def reset_characteristic(self, dialect, dbapi_conn): +        dialect.set_deferrable(dbapi_conn, False) + +    def set_characteristic(self, dialect, dbapi_conn, value): +        dialect.set_deferrable(dbapi_conn, value) + +    def get_characteristic(self, dialect, dbapi_conn): +        return dialect.get_deferrable(dbapi_conn) + + +class PGDialect(default.DefaultDialect): +    name = "postgresql" +    supports_statement_cache = True +    supports_alter = True +    max_identifier_length = 63 +    supports_sane_rowcount = True + +    bind_typing = interfaces.BindTyping.RENDER_CASTS + +    supports_native_enum = True +    supports_native_boolean = True +    supports_native_uuid = True +    supports_smallserial = True + +    supports_sequences = True +    sequences_optional = True +    preexecute_autoincrement_sequences = True +    postfetch_lastrowid = False +    use_insertmanyvalues = True + +    returns_native_bytes = True + +    insertmanyvalues_implicit_sentinel = ( +        InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT +        | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT +        | InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS +    ) + +    supports_comments = True +    supports_constraint_comments = True +    supports_default_values = True + +    supports_default_metavalue = True + +    supports_empty_insert = False +    supports_multivalues_insert = True + +    supports_identity_columns = True + +    default_paramstyle = "pyformat" +    ischema_names = ischema_names +    colspecs = colspecs + +    statement_compiler = PGCompiler +    ddl_compiler = PGDDLCompiler +    type_compiler_cls = PGTypeCompiler +    preparer = PGIdentifierPreparer +    execution_ctx_cls = PGExecutionContext +    inspector = PGInspector + +    update_returning = True +    delete_returning = True +    insert_returning = True +    update_returning_multifrom = True +    delete_returning_multifrom = True + +    connection_characteristics = ( +        default.DefaultDialect.connection_characteristics +    ) +    connection_characteristics = connection_characteristics.union( +        { +            "postgresql_readonly": PGReadOnlyConnectionCharacteristic(), +            "postgresql_deferrable": PGDeferrableConnectionCharacteristic(), +        } +    ) + +    construct_arguments = [ +        ( +            schema.Index, +            { +                "using": False, +                "include": None, +                "where": None, +                "ops": {}, +                "concurrently": False, +                "with": {}, +                "tablespace": None, +                "nulls_not_distinct": None, +            }, +        ), +        ( +            schema.Table, +            { +                "ignore_search_path": False, +                "tablespace": None, +                "partition_by": None, +                "with_oids": None, +                "on_commit": None, +                "inherits": None, +                "using": None, +            }, +        ), +        ( +            schema.CheckConstraint, +            { +                "not_valid": False, +            }, +        ), +        ( +            schema.ForeignKeyConstraint, +            { +                "not_valid": False, +            }, +        ), +        ( +            schema.UniqueConstraint, +            {"nulls_not_distinct": None}, +        ), +    ] + +    reflection_options = ("postgresql_ignore_search_path",) + +    _backslash_escapes = True +    _supports_create_index_concurrently = True +    _supports_drop_index_concurrently = True + +    def __init__( +        self, +        native_inet_types=None, +        json_serializer=None, +        json_deserializer=None, +        **kwargs, +    ): +        default.DefaultDialect.__init__(self, **kwargs) + +        self._native_inet_types = native_inet_types +        self._json_deserializer = json_deserializer +        self._json_serializer = json_serializer + +    def initialize(self, connection): +        super().initialize(connection) + +        # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689 +        self.supports_smallserial = self.server_version_info >= (9, 2) + +        self._set_backslash_escapes(connection) + +        self._supports_drop_index_concurrently = self.server_version_info >= ( +            9, +            2, +        ) +        self.supports_identity_columns = self.server_version_info >= (10,) + +    def get_isolation_level_values(self, dbapi_conn): +        # note the generic dialect doesn't have AUTOCOMMIT, however +        # all postgresql dialects should include AUTOCOMMIT. +        return ( +            "SERIALIZABLE", +            "READ UNCOMMITTED", +            "READ COMMITTED", +            "REPEATABLE READ", +        ) + +    def set_isolation_level(self, dbapi_connection, level): +        cursor = dbapi_connection.cursor() +        cursor.execute( +            "SET SESSION CHARACTERISTICS AS TRANSACTION " +            f"ISOLATION LEVEL {level}" +        ) +        cursor.execute("COMMIT") +        cursor.close() + +    def get_isolation_level(self, dbapi_connection): +        cursor = dbapi_connection.cursor() +        cursor.execute("show transaction isolation level") +        val = cursor.fetchone()[0] +        cursor.close() +        return val.upper() + +    def set_readonly(self, connection, value): +        raise NotImplementedError() + +    def get_readonly(self, connection): +        raise NotImplementedError() + +    def set_deferrable(self, connection, value): +        raise NotImplementedError() + +    def get_deferrable(self, connection): +        raise NotImplementedError() + +    def _split_multihost_from_url(self, url: URL) -> Union[ +        Tuple[None, None], +        Tuple[Tuple[Optional[str], ...], Tuple[Optional[int], ...]], +    ]: +        hosts: Optional[Tuple[Optional[str], ...]] = None +        ports_str: Union[str, Tuple[Optional[str], ...], None] = None + +        integrated_multihost = False + +        if "host" in url.query: +            if isinstance(url.query["host"], (list, tuple)): +                integrated_multihost = True +                hosts, ports_str = zip( +                    *[ +                        token.split(":") if ":" in token else (token, None) +                        for token in url.query["host"] +                    ] +                ) + +            elif isinstance(url.query["host"], str): +                hosts = tuple(url.query["host"].split(",")) + +                if ( +                    "port" not in url.query +                    and len(hosts) == 1 +                    and ":" in hosts[0] +                ): +                    # internet host is alphanumeric plus dots or hyphens. +                    # this is essentially rfc1123, which refers to rfc952. +                    # https://stackoverflow.com/questions/3523028/ +                    # valid-characters-of-a-hostname +                    host_port_match = re.match( +                        r"^([a-zA-Z0-9\-\.]*)(?:\:(\d*))?$", hosts[0] +                    ) +                    if host_port_match: +                        integrated_multihost = True +                        h, p = host_port_match.group(1, 2) +                        if TYPE_CHECKING: +                            assert isinstance(h, str) +                            assert isinstance(p, str) +                        hosts = (h,) +                        ports_str = cast( +                            "Tuple[Optional[str], ...]", (p,) if p else (None,) +                        ) + +        if "port" in url.query: +            if integrated_multihost: +                raise exc.ArgumentError( +                    "Can't mix 'multihost' formats together; use " +                    '"host=h1,h2,h3&port=p1,p2,p3" or ' +                    '"host=h1:p1&host=h2:p2&host=h3:p3" separately' +                ) +            if isinstance(url.query["port"], (list, tuple)): +                ports_str = url.query["port"] +            elif isinstance(url.query["port"], str): +                ports_str = tuple(url.query["port"].split(",")) + +        ports: Optional[Tuple[Optional[int], ...]] = None + +        if ports_str: +            try: +                ports = tuple(int(x) if x else None for x in ports_str) +            except ValueError: +                raise exc.ArgumentError( +                    f"Received non-integer port arguments: {ports_str}" +                ) from None + +        if ports and ( +            (not hosts and len(ports) > 1) +            or ( +                hosts +                and ports +                and len(hosts) != len(ports) +                and (len(hosts) > 1 or len(ports) > 1) +            ) +        ): +            raise exc.ArgumentError("number of hosts and ports don't match") + +        if hosts is not None: +            if ports is None: +                ports = tuple(None for _ in hosts) + +        return hosts, ports  # type: ignore + +    def do_begin_twophase(self, connection, xid): +        self.do_begin(connection.connection) + +    def do_prepare_twophase(self, connection, xid): +        connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid) + +    def do_rollback_twophase( +        self, connection, xid, is_prepared=True, recover=False +    ): +        if is_prepared: +            if recover: +                # FIXME: ugly hack to get out of transaction +                # context when committing recoverable transactions +                # Must find out a way how to make the dbapi not +                # open a transaction. +                connection.exec_driver_sql("ROLLBACK") +            connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid) +            connection.exec_driver_sql("BEGIN") +            self.do_rollback(connection.connection) +        else: +            self.do_rollback(connection.connection) + +    def do_commit_twophase( +        self, connection, xid, is_prepared=True, recover=False +    ): +        if is_prepared: +            if recover: +                connection.exec_driver_sql("ROLLBACK") +            connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid) +            connection.exec_driver_sql("BEGIN") +            self.do_rollback(connection.connection) +        else: +            self.do_commit(connection.connection) + +    def do_recover_twophase(self, connection): +        return connection.scalars( +            sql.text("SELECT gid FROM pg_prepared_xacts") +        ).all() + +    def _get_default_schema_name(self, connection): +        return connection.exec_driver_sql("select current_schema()").scalar() + +    @reflection.cache +    def has_schema(self, connection, schema, **kw): +        query = select(pg_catalog.pg_namespace.c.nspname).where( +            pg_catalog.pg_namespace.c.nspname == schema +        ) +        return bool(connection.scalar(query)) + +    def _pg_class_filter_scope_schema( +        self, query, schema, scope, pg_class_table=None +    ): +        if pg_class_table is None: +            pg_class_table = pg_catalog.pg_class +        query = query.join( +            pg_catalog.pg_namespace, +            pg_catalog.pg_namespace.c.oid == pg_class_table.c.relnamespace, +        ) + +        if scope is ObjectScope.DEFAULT: +            query = query.where(pg_class_table.c.relpersistence != "t") +        elif scope is ObjectScope.TEMPORARY: +            query = query.where(pg_class_table.c.relpersistence == "t") + +        if schema is None: +            query = query.where( +                pg_catalog.pg_table_is_visible(pg_class_table.c.oid), +                # ignore pg_catalog schema +                pg_catalog.pg_namespace.c.nspname != "pg_catalog", +            ) +        else: +            query = query.where(pg_catalog.pg_namespace.c.nspname == schema) +        return query + +    def _pg_class_relkind_condition(self, relkinds, pg_class_table=None): +        if pg_class_table is None: +            pg_class_table = pg_catalog.pg_class +        # uses the any form instead of in otherwise postgresql complaings +        # that 'IN could not convert type character to "char"' +        return pg_class_table.c.relkind == sql.any_(_array.array(relkinds)) + +    @lru_cache() +    def _has_table_query(self, schema): +        query = select(pg_catalog.pg_class.c.relname).where( +            pg_catalog.pg_class.c.relname == bindparam("table_name"), +            self._pg_class_relkind_condition( +                pg_catalog.RELKINDS_ALL_TABLE_LIKE +            ), +        ) +        return self._pg_class_filter_scope_schema( +            query, schema, scope=ObjectScope.ANY +        ) + +    @reflection.cache +    def has_table(self, connection, table_name, schema=None, **kw): +        self._ensure_has_table_connection(connection) +        query = self._has_table_query(schema) +        return bool(connection.scalar(query, {"table_name": table_name})) + +    @reflection.cache +    def has_sequence(self, connection, sequence_name, schema=None, **kw): +        query = select(pg_catalog.pg_class.c.relname).where( +            pg_catalog.pg_class.c.relkind == "S", +            pg_catalog.pg_class.c.relname == sequence_name, +        ) +        query = self._pg_class_filter_scope_schema( +            query, schema, scope=ObjectScope.ANY +        ) +        return bool(connection.scalar(query)) + +    @reflection.cache +    def has_type(self, connection, type_name, schema=None, **kw): +        query = ( +            select(pg_catalog.pg_type.c.typname) +            .join( +                pg_catalog.pg_namespace, +                pg_catalog.pg_namespace.c.oid +                == pg_catalog.pg_type.c.typnamespace, +            ) +            .where(pg_catalog.pg_type.c.typname == type_name) +        ) +        if schema is None: +            query = query.where( +                pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid), +                # ignore pg_catalog schema +                pg_catalog.pg_namespace.c.nspname != "pg_catalog", +            ) +        elif schema != "*": +            query = query.where(pg_catalog.pg_namespace.c.nspname == schema) + +        return bool(connection.scalar(query)) + +    def _get_server_version_info(self, connection): +        v = connection.exec_driver_sql("select pg_catalog.version()").scalar() +        m = re.match( +            r".*(?:PostgreSQL|EnterpriseDB) " +            r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?", +            v, +        ) +        if not m: +            raise AssertionError( +                "Could not determine version from string '%s'" % v +            ) +        return tuple([int(x) for x in m.group(1, 2, 3) if x is not None]) + +    @reflection.cache +    def get_table_oid(self, connection, table_name, schema=None, **kw): +        """Fetch the oid for schema.table_name.""" +        query = select(pg_catalog.pg_class.c.oid).where( +            pg_catalog.pg_class.c.relname == table_name, +            self._pg_class_relkind_condition( +                pg_catalog.RELKINDS_ALL_TABLE_LIKE +            ), +        ) +        query = self._pg_class_filter_scope_schema( +            query, schema, scope=ObjectScope.ANY +        ) +        table_oid = connection.scalar(query) +        if table_oid is None: +            raise exc.NoSuchTableError( +                f"{schema}.{table_name}" if schema else table_name +            ) +        return table_oid + +    @reflection.cache +    def get_schema_names(self, connection, **kw): +        query = ( +            select(pg_catalog.pg_namespace.c.nspname) +            .where(pg_catalog.pg_namespace.c.nspname.not_like("pg_%")) +            .order_by(pg_catalog.pg_namespace.c.nspname) +        ) +        return connection.scalars(query).all() + +    def _get_relnames_for_relkinds(self, connection, schema, relkinds, scope): +        query = select(pg_catalog.pg_class.c.relname).where( +            self._pg_class_relkind_condition(relkinds) +        ) +        query = self._pg_class_filter_scope_schema(query, schema, scope=scope) +        return connection.scalars(query).all() + +    @reflection.cache +    def get_table_names(self, connection, schema=None, **kw): +        return self._get_relnames_for_relkinds( +            connection, +            schema, +            pg_catalog.RELKINDS_TABLE_NO_FOREIGN, +            scope=ObjectScope.DEFAULT, +        ) + +    @reflection.cache +    def get_temp_table_names(self, connection, **kw): +        return self._get_relnames_for_relkinds( +            connection, +            schema=None, +            relkinds=pg_catalog.RELKINDS_TABLE_NO_FOREIGN, +            scope=ObjectScope.TEMPORARY, +        ) + +    @reflection.cache +    def _get_foreign_table_names(self, connection, schema=None, **kw): +        return self._get_relnames_for_relkinds( +            connection, schema, relkinds=("f",), scope=ObjectScope.ANY +        ) + +    @reflection.cache +    def get_view_names(self, connection, schema=None, **kw): +        return self._get_relnames_for_relkinds( +            connection, +            schema, +            pg_catalog.RELKINDS_VIEW, +            scope=ObjectScope.DEFAULT, +        ) + +    @reflection.cache +    def get_materialized_view_names(self, connection, schema=None, **kw): +        return self._get_relnames_for_relkinds( +            connection, +            schema, +            pg_catalog.RELKINDS_MAT_VIEW, +            scope=ObjectScope.DEFAULT, +        ) + +    @reflection.cache +    def get_temp_view_names(self, connection, schema=None, **kw): +        return self._get_relnames_for_relkinds( +            connection, +            schema, +            # NOTE: do not include temp materialzied views (that do not +            # seem to be a thing at least up to version 14) +            pg_catalog.RELKINDS_VIEW, +            scope=ObjectScope.TEMPORARY, +        ) + +    @reflection.cache +    def get_sequence_names(self, connection, schema=None, **kw): +        return self._get_relnames_for_relkinds( +            connection, schema, relkinds=("S",), scope=ObjectScope.ANY +        ) + +    @reflection.cache +    def get_view_definition(self, connection, view_name, schema=None, **kw): +        query = ( +            select(pg_catalog.pg_get_viewdef(pg_catalog.pg_class.c.oid)) +            .select_from(pg_catalog.pg_class) +            .where( +                pg_catalog.pg_class.c.relname == view_name, +                self._pg_class_relkind_condition( +                    pg_catalog.RELKINDS_VIEW + pg_catalog.RELKINDS_MAT_VIEW +                ), +            ) +        ) +        query = self._pg_class_filter_scope_schema( +            query, schema, scope=ObjectScope.ANY +        ) +        res = connection.scalar(query) +        if res is None: +            raise exc.NoSuchTableError( +                f"{schema}.{view_name}" if schema else view_name +            ) +        else: +            return res + +    def _value_or_raise(self, data, table, schema): +        try: +            return dict(data)[(schema, table)] +        except KeyError: +            raise exc.NoSuchTableError( +                f"{schema}.{table}" if schema else table +            ) from None + +    def _prepare_filter_names(self, filter_names): +        if filter_names: +            return True, {"filter_names": filter_names} +        else: +            return False, {} + +    def _kind_to_relkinds(self, kind: ObjectKind) -> Tuple[str, ...]: +        if kind is ObjectKind.ANY: +            return pg_catalog.RELKINDS_ALL_TABLE_LIKE +        relkinds = () +        if ObjectKind.TABLE in kind: +            relkinds += pg_catalog.RELKINDS_TABLE +        if ObjectKind.VIEW in kind: +            relkinds += pg_catalog.RELKINDS_VIEW +        if ObjectKind.MATERIALIZED_VIEW in kind: +            relkinds += pg_catalog.RELKINDS_MAT_VIEW +        return relkinds + +    @reflection.cache +    def get_columns(self, connection, table_name, schema=None, **kw): +        data = self.get_multi_columns( +            connection, +            schema=schema, +            filter_names=[table_name], +            scope=ObjectScope.ANY, +            kind=ObjectKind.ANY, +            **kw, +        ) +        return self._value_or_raise(data, table_name, schema) + +    @lru_cache() +    def _columns_query(self, schema, has_filter_names, scope, kind): +        # NOTE: the query with the default and identity options scalar +        # subquery is faster than trying to use outer joins for them +        generated = ( +            pg_catalog.pg_attribute.c.attgenerated.label("generated") +            if self.server_version_info >= (12,) +            else sql.null().label("generated") +        ) +        if self.server_version_info >= (10,): +            # join lateral performs worse (~2x slower) than a scalar_subquery +            identity = ( +                select( +                    sql.func.json_build_object( +                        "always", +                        pg_catalog.pg_attribute.c.attidentity == "a", +                        "start", +                        pg_catalog.pg_sequence.c.seqstart, +                        "increment", +                        pg_catalog.pg_sequence.c.seqincrement, +                        "minvalue", +                        pg_catalog.pg_sequence.c.seqmin, +                        "maxvalue", +                        pg_catalog.pg_sequence.c.seqmax, +                        "cache", +                        pg_catalog.pg_sequence.c.seqcache, +                        "cycle", +                        pg_catalog.pg_sequence.c.seqcycle, +                    ) +                ) +                .select_from(pg_catalog.pg_sequence) +                .where( +                    # attidentity != '' is required or it will reflect also +                    # serial columns as identity. +                    pg_catalog.pg_attribute.c.attidentity != "", +                    pg_catalog.pg_sequence.c.seqrelid +                    == sql.cast( +                        sql.cast( +                            pg_catalog.pg_get_serial_sequence( +                                sql.cast( +                                    sql.cast( +                                        pg_catalog.pg_attribute.c.attrelid, +                                        REGCLASS, +                                    ), +                                    TEXT, +                                ), +                                pg_catalog.pg_attribute.c.attname, +                            ), +                            REGCLASS, +                        ), +                        OID, +                    ), +                ) +                .correlate(pg_catalog.pg_attribute) +                .scalar_subquery() +                .label("identity_options") +            ) +        else: +            identity = sql.null().label("identity_options") + +        # join lateral performs the same as scalar_subquery here +        default = ( +            select( +                pg_catalog.pg_get_expr( +                    pg_catalog.pg_attrdef.c.adbin, +                    pg_catalog.pg_attrdef.c.adrelid, +                ) +            ) +            .select_from(pg_catalog.pg_attrdef) +            .where( +                pg_catalog.pg_attrdef.c.adrelid +                == pg_catalog.pg_attribute.c.attrelid, +                pg_catalog.pg_attrdef.c.adnum +                == pg_catalog.pg_attribute.c.attnum, +                pg_catalog.pg_attribute.c.atthasdef, +            ) +            .correlate(pg_catalog.pg_attribute) +            .scalar_subquery() +            .label("default") +        ) +        relkinds = self._kind_to_relkinds(kind) +        query = ( +            select( +                pg_catalog.pg_attribute.c.attname.label("name"), +                pg_catalog.format_type( +                    pg_catalog.pg_attribute.c.atttypid, +                    pg_catalog.pg_attribute.c.atttypmod, +                ).label("format_type"), +                default, +                pg_catalog.pg_attribute.c.attnotnull.label("not_null"), +                pg_catalog.pg_class.c.relname.label("table_name"), +                pg_catalog.pg_description.c.description.label("comment"), +                generated, +                identity, +            ) +            .select_from(pg_catalog.pg_class) +            # NOTE: postgresql support table with no user column, meaning +            # there is no row with pg_attribute.attnum > 0. use a left outer +            # join to avoid filtering these tables. +            .outerjoin( +                pg_catalog.pg_attribute, +                sql.and_( +                    pg_catalog.pg_class.c.oid +                    == pg_catalog.pg_attribute.c.attrelid, +                    pg_catalog.pg_attribute.c.attnum > 0, +                    ~pg_catalog.pg_attribute.c.attisdropped, +                ), +            ) +            .outerjoin( +                pg_catalog.pg_description, +                sql.and_( +                    pg_catalog.pg_description.c.objoid +                    == pg_catalog.pg_attribute.c.attrelid, +                    pg_catalog.pg_description.c.objsubid +                    == pg_catalog.pg_attribute.c.attnum, +                ), +            ) +            .where(self._pg_class_relkind_condition(relkinds)) +            .order_by( +                pg_catalog.pg_class.c.relname, pg_catalog.pg_attribute.c.attnum +            ) +        ) +        query = self._pg_class_filter_scope_schema(query, schema, scope=scope) +        if has_filter_names: +            query = query.where( +                pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) +            ) +        return query + +    def get_multi_columns( +        self, connection, schema, filter_names, scope, kind, **kw +    ): +        has_filter_names, params = self._prepare_filter_names(filter_names) +        query = self._columns_query(schema, has_filter_names, scope, kind) +        rows = connection.execute(query, params).mappings() + +        # dictionary with (name, ) if default search path or (schema, name) +        # as keys +        domains = { +            ((d["schema"], d["name"]) if not d["visible"] else (d["name"],)): d +            for d in self._load_domains( +                connection, schema="*", info_cache=kw.get("info_cache") +            ) +        } + +        # dictionary with (name, ) if default search path or (schema, name) +        # as keys +        enums = dict( +            ( +                ((rec["name"],), rec) +                if rec["visible"] +                else ((rec["schema"], rec["name"]), rec) +            ) +            for rec in self._load_enums( +                connection, schema="*", info_cache=kw.get("info_cache") +            ) +        ) + +        columns = self._get_columns_info(rows, domains, enums, schema) + +        return columns.items() + +    _format_type_args_pattern = re.compile(r"\((.*)\)") +    _format_type_args_delim = re.compile(r"\s*,\s*") +    _format_array_spec_pattern = re.compile(r"((?:\[\])*)$") + +    def _reflect_type( +        self, +        format_type: Optional[str], +        domains: dict[str, ReflectedDomain], +        enums: dict[str, ReflectedEnum], +        type_description: str, +    ) -> sqltypes.TypeEngine[Any]: +        """ +        Attempts to reconstruct a column type defined in ischema_names based +        on the information available in the format_type. + +        If the `format_type` cannot be associated with a known `ischema_names`, +        it is treated as a reference to a known PostgreSQL named `ENUM` or +        `DOMAIN` type. +        """ +        type_description = type_description or "unknown type" +        if format_type is None: +            util.warn( +                "PostgreSQL format_type() returned NULL for %s" +                % type_description +            ) +            return sqltypes.NULLTYPE + +        attype_args_match = self._format_type_args_pattern.search(format_type) +        if attype_args_match and attype_args_match.group(1): +            attype_args = self._format_type_args_delim.split( +                attype_args_match.group(1) +            ) +        else: +            attype_args = () + +        match_array_dim = self._format_array_spec_pattern.search(format_type) +        # Each "[]" in array specs corresponds to an array dimension +        array_dim = len(match_array_dim.group(1) or "") // 2 + +        # Remove all parameters and array specs from format_type to obtain an +        # ischema_name candidate +        attype = self._format_type_args_pattern.sub("", format_type) +        attype = self._format_array_spec_pattern.sub("", attype) + +        schema_type = self.ischema_names.get(attype.lower(), None) +        args, kwargs = (), {} + +        if attype == "numeric": +            if len(attype_args) == 2: +                precision, scale = map(int, attype_args) +                args = (precision, scale) + +        elif attype == "double precision": +            args = (53,) + +        elif attype == "integer": +            args = () + +        elif attype in ("timestamp with time zone", "time with time zone"): +            kwargs["timezone"] = True +            if len(attype_args) == 1: +                kwargs["precision"] = int(attype_args[0]) + +        elif attype in ( +            "timestamp without time zone", +            "time without time zone", +            "time", +        ): +            kwargs["timezone"] = False +            if len(attype_args) == 1: +                kwargs["precision"] = int(attype_args[0]) + +        elif attype == "bit varying": +            kwargs["varying"] = True +            if len(attype_args) == 1: +                charlen = int(attype_args[0]) +                args = (charlen,) + +        elif attype.startswith("interval"): +            schema_type = INTERVAL + +            field_match = re.match(r"interval (.+)", attype) +            if field_match: +                kwargs["fields"] = field_match.group(1) + +            if len(attype_args) == 1: +                kwargs["precision"] = int(attype_args[0]) + +        else: +            enum_or_domain_key = tuple(util.quoted_token_parser(attype)) + +            if enum_or_domain_key in enums: +                schema_type = ENUM +                enum = enums[enum_or_domain_key] + +                args = tuple(enum["labels"]) +                kwargs["name"] = enum["name"] + +                if not enum["visible"]: +                    kwargs["schema"] = enum["schema"] +                args = tuple(enum["labels"]) +            elif enum_or_domain_key in domains: +                schema_type = DOMAIN +                domain = domains[enum_or_domain_key] + +                data_type = self._reflect_type( +                    domain["type"], +                    domains, +                    enums, +                    type_description="DOMAIN '%s'" % domain["name"], +                ) +                args = (domain["name"], data_type) + +                kwargs["collation"] = domain["collation"] +                kwargs["default"] = domain["default"] +                kwargs["not_null"] = not domain["nullable"] +                kwargs["create_type"] = False + +                if domain["constraints"]: +                    # We only support a single constraint +                    check_constraint = domain["constraints"][0] + +                    kwargs["constraint_name"] = check_constraint["name"] +                    kwargs["check"] = check_constraint["check"] + +                if not domain["visible"]: +                    kwargs["schema"] = domain["schema"] + +            else: +                try: +                    charlen = int(attype_args[0]) +                    args = (charlen, *attype_args[1:]) +                except (ValueError, IndexError): +                    args = attype_args + +        if not schema_type: +            util.warn( +                "Did not recognize type '%s' of %s" +                % (attype, type_description) +            ) +            return sqltypes.NULLTYPE + +        data_type = schema_type(*args, **kwargs) +        if array_dim >= 1: +            # postgres does not preserve dimensionality or size of array types. +            data_type = _array.ARRAY(data_type) + +        return data_type + +    def _get_columns_info(self, rows, domains, enums, schema): +        columns = defaultdict(list) +        for row_dict in rows: +            # ensure that each table has an entry, even if it has no columns +            if row_dict["name"] is None: +                columns[(schema, row_dict["table_name"])] = ( +                    ReflectionDefaults.columns() +                ) +                continue +            table_cols = columns[(schema, row_dict["table_name"])] + +            coltype = self._reflect_type( +                row_dict["format_type"], +                domains, +                enums, +                type_description="column '%s'" % row_dict["name"], +            ) + +            default = row_dict["default"] +            name = row_dict["name"] +            generated = row_dict["generated"] +            nullable = not row_dict["not_null"] + +            if isinstance(coltype, DOMAIN): +                if not default: +                    # domain can override the default value but +                    # cant set it to None +                    if coltype.default is not None: +                        default = coltype.default + +                nullable = nullable and not coltype.not_null + +            identity = row_dict["identity_options"] + +            # If a zero byte or blank string depending on driver (is also +            # absent for older PG versions), then not a generated column. +            # Otherwise, s = stored. (Other values might be added in the +            # future.) +            if generated not in (None, "", b"\x00"): +                computed = dict( +                    sqltext=default, persisted=generated in ("s", b"s") +                ) +                default = None +            else: +                computed = None + +            # adjust the default value +            autoincrement = False +            if default is not None: +                match = re.search(r"""(nextval\(')([^']+)('.*$)""", default) +                if match is not None: +                    if issubclass(coltype._type_affinity, sqltypes.Integer): +                        autoincrement = True +                    # the default is related to a Sequence +                    if "." not in match.group(2) and schema is not None: +                        # unconditionally quote the schema name.  this could +                        # later be enhanced to obey quoting rules / +                        # "quote schema" +                        default = ( +                            match.group(1) +                            + ('"%s"' % schema) +                            + "." +                            + match.group(2) +                            + match.group(3) +                        ) + +            column_info = { +                "name": name, +                "type": coltype, +                "nullable": nullable, +                "default": default, +                "autoincrement": autoincrement or identity is not None, +                "comment": row_dict["comment"], +            } +            if computed is not None: +                column_info["computed"] = computed +            if identity is not None: +                column_info["identity"] = identity + +            table_cols.append(column_info) + +        return columns + +    @lru_cache() +    def _table_oids_query(self, schema, has_filter_names, scope, kind): +        relkinds = self._kind_to_relkinds(kind) +        oid_q = select( +            pg_catalog.pg_class.c.oid, pg_catalog.pg_class.c.relname +        ).where(self._pg_class_relkind_condition(relkinds)) +        oid_q = self._pg_class_filter_scope_schema(oid_q, schema, scope=scope) + +        if has_filter_names: +            oid_q = oid_q.where( +                pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) +            ) +        return oid_q + +    @reflection.flexi_cache( +        ("schema", InternalTraversal.dp_string), +        ("filter_names", InternalTraversal.dp_string_list), +        ("kind", InternalTraversal.dp_plain_obj), +        ("scope", InternalTraversal.dp_plain_obj), +    ) +    def _get_table_oids( +        self, connection, schema, filter_names, scope, kind, **kw +    ): +        has_filter_names, params = self._prepare_filter_names(filter_names) +        oid_q = self._table_oids_query(schema, has_filter_names, scope, kind) +        result = connection.execute(oid_q, params) +        return result.all() + +    @lru_cache() +    def _constraint_query(self, is_unique): +        con_sq = ( +            select( +                pg_catalog.pg_constraint.c.conrelid, +                pg_catalog.pg_constraint.c.conname, +                pg_catalog.pg_constraint.c.conindid, +                sql.func.unnest(pg_catalog.pg_constraint.c.conkey).label( +                    "attnum" +                ), +                sql.func.generate_subscripts( +                    pg_catalog.pg_constraint.c.conkey, 1 +                ).label("ord"), +                pg_catalog.pg_description.c.description, +            ) +            .outerjoin( +                pg_catalog.pg_description, +                pg_catalog.pg_description.c.objoid +                == pg_catalog.pg_constraint.c.oid, +            ) +            .where( +                pg_catalog.pg_constraint.c.contype == bindparam("contype"), +                pg_catalog.pg_constraint.c.conrelid.in_(bindparam("oids")), +            ) +            .subquery("con") +        ) + +        attr_sq = ( +            select( +                con_sq.c.conrelid, +                con_sq.c.conname, +                con_sq.c.conindid, +                con_sq.c.description, +                con_sq.c.ord, +                pg_catalog.pg_attribute.c.attname, +            ) +            .select_from(pg_catalog.pg_attribute) +            .join( +                con_sq, +                sql.and_( +                    pg_catalog.pg_attribute.c.attnum == con_sq.c.attnum, +                    pg_catalog.pg_attribute.c.attrelid == con_sq.c.conrelid, +                ), +            ) +            .where( +                # NOTE: restate the condition here, since pg15 otherwise +                # seems to get confused on pscopg2 sometimes, doing +                # a sequential scan of pg_attribute. +                # The condition in the con_sq subquery is not actually needed +                # in pg15, but it may be needed in older versions. Keeping it +                # does not seems to have any inpact in any case. +                con_sq.c.conrelid.in_(bindparam("oids")) +            ) +            .subquery("attr") +        ) + +        constraint_query = ( +            select( +                attr_sq.c.conrelid, +                sql.func.array_agg( +                    # NOTE: cast since some postgresql derivatives may +                    # not support array_agg on the name type +                    aggregate_order_by( +                        attr_sq.c.attname.cast(TEXT), attr_sq.c.ord +                    ) +                ).label("cols"), +                attr_sq.c.conname, +                sql.func.min(attr_sq.c.description).label("description"), +            ) +            .group_by(attr_sq.c.conrelid, attr_sq.c.conname) +            .order_by(attr_sq.c.conrelid, attr_sq.c.conname) +        ) + +        if is_unique: +            if self.server_version_info >= (15,): +                constraint_query = constraint_query.join( +                    pg_catalog.pg_index, +                    attr_sq.c.conindid == pg_catalog.pg_index.c.indexrelid, +                ).add_columns( +                    sql.func.bool_and( +                        pg_catalog.pg_index.c.indnullsnotdistinct +                    ).label("indnullsnotdistinct") +                ) +            else: +                constraint_query = constraint_query.add_columns( +                    sql.false().label("indnullsnotdistinct") +                ) +        else: +            constraint_query = constraint_query.add_columns( +                sql.null().label("extra") +            ) +        return constraint_query + +    def _reflect_constraint( +        self, connection, contype, schema, filter_names, scope, kind, **kw +    ): +        # used to reflect primary and unique constraint +        table_oids = self._get_table_oids( +            connection, schema, filter_names, scope, kind, **kw +        ) +        batches = list(table_oids) +        is_unique = contype == "u" + +        while batches: +            batch = batches[0:3000] +            batches[0:3000] = [] + +            result = connection.execute( +                self._constraint_query(is_unique), +                {"oids": [r[0] for r in batch], "contype": contype}, +            ) + +            result_by_oid = defaultdict(list) +            for oid, cols, constraint_name, comment, extra in result: +                result_by_oid[oid].append( +                    (cols, constraint_name, comment, extra) +                ) + +            for oid, tablename in batch: +                for_oid = result_by_oid.get(oid, ()) +                if for_oid: +                    for cols, constraint, comment, extra in for_oid: +                        if is_unique: +                            yield tablename, cols, constraint, comment, { +                                "nullsnotdistinct": extra +                            } +                        else: +                            yield tablename, cols, constraint, comment, None +                else: +                    yield tablename, None, None, None, None + +    @reflection.cache +    def get_pk_constraint(self, connection, table_name, schema=None, **kw): +        data = self.get_multi_pk_constraint( +            connection, +            schema=schema, +            filter_names=[table_name], +            scope=ObjectScope.ANY, +            kind=ObjectKind.ANY, +            **kw, +        ) +        return self._value_or_raise(data, table_name, schema) + +    def get_multi_pk_constraint( +        self, connection, schema, filter_names, scope, kind, **kw +    ): +        result = self._reflect_constraint( +            connection, "p", schema, filter_names, scope, kind, **kw +        ) + +        # only a single pk can be present for each table. Return an entry +        # even if a table has no primary key +        default = ReflectionDefaults.pk_constraint +        return ( +            ( +                (schema, table_name), +                ( +                    { +                        "constrained_columns": [] if cols is None else cols, +                        "name": pk_name, +                        "comment": comment, +                    } +                    if pk_name is not None +                    else default() +                ), +            ) +            for table_name, cols, pk_name, comment, _ in result +        ) + +    @reflection.cache +    def get_foreign_keys( +        self, +        connection, +        table_name, +        schema=None, +        postgresql_ignore_search_path=False, +        **kw, +    ): +        data = self.get_multi_foreign_keys( +            connection, +            schema=schema, +            filter_names=[table_name], +            postgresql_ignore_search_path=postgresql_ignore_search_path, +            scope=ObjectScope.ANY, +            kind=ObjectKind.ANY, +            **kw, +        ) +        return self._value_or_raise(data, table_name, schema) + +    @lru_cache() +    def _foreing_key_query(self, schema, has_filter_names, scope, kind): +        pg_class_ref = pg_catalog.pg_class.alias("cls_ref") +        pg_namespace_ref = pg_catalog.pg_namespace.alias("nsp_ref") +        relkinds = self._kind_to_relkinds(kind) +        query = ( +            select( +                pg_catalog.pg_class.c.relname, +                pg_catalog.pg_constraint.c.conname, +                # NOTE: avoid calling pg_get_constraintdef when not needed +                # to speed up the query +                sql.case( +                    ( +                        pg_catalog.pg_constraint.c.oid.is_not(None), +                        pg_catalog.pg_get_constraintdef( +                            pg_catalog.pg_constraint.c.oid, True +                        ), +                    ), +                    else_=None, +                ), +                pg_namespace_ref.c.nspname, +                pg_catalog.pg_description.c.description, +            ) +            .select_from(pg_catalog.pg_class) +            .outerjoin( +                pg_catalog.pg_constraint, +                sql.and_( +                    pg_catalog.pg_class.c.oid +                    == pg_catalog.pg_constraint.c.conrelid, +                    pg_catalog.pg_constraint.c.contype == "f", +                ), +            ) +            .outerjoin( +                pg_class_ref, +                pg_class_ref.c.oid == pg_catalog.pg_constraint.c.confrelid, +            ) +            .outerjoin( +                pg_namespace_ref, +                pg_class_ref.c.relnamespace == pg_namespace_ref.c.oid, +            ) +            .outerjoin( +                pg_catalog.pg_description, +                pg_catalog.pg_description.c.objoid +                == pg_catalog.pg_constraint.c.oid, +            ) +            .order_by( +                pg_catalog.pg_class.c.relname, +                pg_catalog.pg_constraint.c.conname, +            ) +            .where(self._pg_class_relkind_condition(relkinds)) +        ) +        query = self._pg_class_filter_scope_schema(query, schema, scope) +        if has_filter_names: +            query = query.where( +                pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) +            ) +        return query + +    @util.memoized_property +    def _fk_regex_pattern(self): +        # optionally quoted token +        qtoken = '(?:"[^"]+"|[A-Za-z0-9_]+?)' + +        # https://www.postgresql.org/docs/current/static/sql-createtable.html +        return re.compile( +            r"FOREIGN KEY \((.*?)\) " +            rf"REFERENCES (?:({qtoken})\.)?({qtoken})\(((?:{qtoken}(?: *, *)?)+)\)"  # noqa: E501 +            r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?" +            r"[\s]?(ON UPDATE " +            r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?" +            r"[\s]?(ON DELETE " +            r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?" +            r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?" +            r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?" +        ) + +    def get_multi_foreign_keys( +        self, +        connection, +        schema, +        filter_names, +        scope, +        kind, +        postgresql_ignore_search_path=False, +        **kw, +    ): +        preparer = self.identifier_preparer + +        has_filter_names, params = self._prepare_filter_names(filter_names) +        query = self._foreing_key_query(schema, has_filter_names, scope, kind) +        result = connection.execute(query, params) + +        FK_REGEX = self._fk_regex_pattern + +        fkeys = defaultdict(list) +        default = ReflectionDefaults.foreign_keys +        for table_name, conname, condef, conschema, comment in result: +            # ensure that each table has an entry, even if it has +            # no foreign keys +            if conname is None: +                fkeys[(schema, table_name)] = default() +                continue +            table_fks = fkeys[(schema, table_name)] +            m = re.search(FK_REGEX, condef).groups() + +            ( +                constrained_columns, +                referred_schema, +                referred_table, +                referred_columns, +                _, +                match, +                _, +                onupdate, +                _, +                ondelete, +                deferrable, +                _, +                initially, +            ) = m + +            if deferrable is not None: +                deferrable = True if deferrable == "DEFERRABLE" else False +            constrained_columns = [ +                preparer._unquote_identifier(x) +                for x in re.split(r"\s*,\s*", constrained_columns) +            ] + +            if postgresql_ignore_search_path: +                # when ignoring search path, we use the actual schema +                # provided it isn't the "default" schema +                if conschema != self.default_schema_name: +                    referred_schema = conschema +                else: +                    referred_schema = schema +            elif referred_schema: +                # referred_schema is the schema that we regexp'ed from +                # pg_get_constraintdef().  If the schema is in the search +                # path, pg_get_constraintdef() will give us None. +                referred_schema = preparer._unquote_identifier(referred_schema) +            elif schema is not None and schema == conschema: +                # If the actual schema matches the schema of the table +                # we're reflecting, then we will use that. +                referred_schema = schema + +            referred_table = preparer._unquote_identifier(referred_table) +            referred_columns = [ +                preparer._unquote_identifier(x) +                for x in re.split(r"\s*,\s", referred_columns) +            ] +            options = { +                k: v +                for k, v in [ +                    ("onupdate", onupdate), +                    ("ondelete", ondelete), +                    ("initially", initially), +                    ("deferrable", deferrable), +                    ("match", match), +                ] +                if v is not None and v != "NO ACTION" +            } +            fkey_d = { +                "name": conname, +                "constrained_columns": constrained_columns, +                "referred_schema": referred_schema, +                "referred_table": referred_table, +                "referred_columns": referred_columns, +                "options": options, +                "comment": comment, +            } +            table_fks.append(fkey_d) +        return fkeys.items() + +    @reflection.cache +    def get_indexes(self, connection, table_name, schema=None, **kw): +        data = self.get_multi_indexes( +            connection, +            schema=schema, +            filter_names=[table_name], +            scope=ObjectScope.ANY, +            kind=ObjectKind.ANY, +            **kw, +        ) +        return self._value_or_raise(data, table_name, schema) + +    @util.memoized_property +    def _index_query(self): +        pg_class_index = pg_catalog.pg_class.alias("cls_idx") +        # NOTE: repeating oids clause improve query performance + +        # subquery to get the columns +        idx_sq = ( +            select( +                pg_catalog.pg_index.c.indexrelid, +                pg_catalog.pg_index.c.indrelid, +                sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"), +                sql.func.generate_subscripts( +                    pg_catalog.pg_index.c.indkey, 1 +                ).label("ord"), +            ) +            .where( +                ~pg_catalog.pg_index.c.indisprimary, +                pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")), +            ) +            .subquery("idx") +        ) + +        attr_sq = ( +            select( +                idx_sq.c.indexrelid, +                idx_sq.c.indrelid, +                idx_sq.c.ord, +                # NOTE: always using pg_get_indexdef is too slow so just +                # invoke when the element is an expression +                sql.case( +                    ( +                        idx_sq.c.attnum == 0, +                        pg_catalog.pg_get_indexdef( +                            idx_sq.c.indexrelid, idx_sq.c.ord + 1, True +                        ), +                    ), +                    # NOTE: need to cast this since attname is of type "name" +                    # that's limited to 63 bytes, while pg_get_indexdef +                    # returns "text" so its output may get cut +                    else_=pg_catalog.pg_attribute.c.attname.cast(TEXT), +                ).label("element"), +                (idx_sq.c.attnum == 0).label("is_expr"), +            ) +            .select_from(idx_sq) +            .outerjoin( +                # do not remove rows where idx_sq.c.attnum is 0 +                pg_catalog.pg_attribute, +                sql.and_( +                    pg_catalog.pg_attribute.c.attnum == idx_sq.c.attnum, +                    pg_catalog.pg_attribute.c.attrelid == idx_sq.c.indrelid, +                ), +            ) +            .where(idx_sq.c.indrelid.in_(bindparam("oids"))) +            .subquery("idx_attr") +        ) + +        cols_sq = ( +            select( +                attr_sq.c.indexrelid, +                sql.func.min(attr_sq.c.indrelid), +                sql.func.array_agg( +                    aggregate_order_by(attr_sq.c.element, attr_sq.c.ord) +                ).label("elements"), +                sql.func.array_agg( +                    aggregate_order_by(attr_sq.c.is_expr, attr_sq.c.ord) +                ).label("elements_is_expr"), +            ) +            .group_by(attr_sq.c.indexrelid) +            .subquery("idx_cols") +        ) + +        if self.server_version_info >= (11, 0): +            indnkeyatts = pg_catalog.pg_index.c.indnkeyatts +        else: +            indnkeyatts = sql.null().label("indnkeyatts") + +        if self.server_version_info >= (15,): +            nulls_not_distinct = pg_catalog.pg_index.c.indnullsnotdistinct +        else: +            nulls_not_distinct = sql.false().label("indnullsnotdistinct") + +        return ( +            select( +                pg_catalog.pg_index.c.indrelid, +                pg_class_index.c.relname.label("relname_index"), +                pg_catalog.pg_index.c.indisunique, +                pg_catalog.pg_constraint.c.conrelid.is_not(None).label( +                    "has_constraint" +                ), +                pg_catalog.pg_index.c.indoption, +                pg_class_index.c.reloptions, +                pg_catalog.pg_am.c.amname, +                # NOTE: pg_get_expr is very fast so this case has almost no +                # performance impact +                sql.case( +                    ( +                        pg_catalog.pg_index.c.indpred.is_not(None), +                        pg_catalog.pg_get_expr( +                            pg_catalog.pg_index.c.indpred, +                            pg_catalog.pg_index.c.indrelid, +                        ), +                    ), +                    else_=None, +                ).label("filter_definition"), +                indnkeyatts, +                nulls_not_distinct, +                cols_sq.c.elements, +                cols_sq.c.elements_is_expr, +            ) +            .select_from(pg_catalog.pg_index) +            .where( +                pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")), +                ~pg_catalog.pg_index.c.indisprimary, +            ) +            .join( +                pg_class_index, +                pg_catalog.pg_index.c.indexrelid == pg_class_index.c.oid, +            ) +            .join( +                pg_catalog.pg_am, +                pg_class_index.c.relam == pg_catalog.pg_am.c.oid, +            ) +            .outerjoin( +                cols_sq, +                pg_catalog.pg_index.c.indexrelid == cols_sq.c.indexrelid, +            ) +            .outerjoin( +                pg_catalog.pg_constraint, +                sql.and_( +                    pg_catalog.pg_index.c.indrelid +                    == pg_catalog.pg_constraint.c.conrelid, +                    pg_catalog.pg_index.c.indexrelid +                    == pg_catalog.pg_constraint.c.conindid, +                    pg_catalog.pg_constraint.c.contype +                    == sql.any_(_array.array(("p", "u", "x"))), +                ), +            ) +            .order_by(pg_catalog.pg_index.c.indrelid, pg_class_index.c.relname) +        ) + +    def get_multi_indexes( +        self, connection, schema, filter_names, scope, kind, **kw +    ): +        table_oids = self._get_table_oids( +            connection, schema, filter_names, scope, kind, **kw +        ) + +        indexes = defaultdict(list) +        default = ReflectionDefaults.indexes + +        batches = list(table_oids) + +        while batches: +            batch = batches[0:3000] +            batches[0:3000] = [] + +            result = connection.execute( +                self._index_query, {"oids": [r[0] for r in batch]} +            ).mappings() + +            result_by_oid = defaultdict(list) +            for row_dict in result: +                result_by_oid[row_dict["indrelid"]].append(row_dict) + +            for oid, table_name in batch: +                if oid not in result_by_oid: +                    # ensure that each table has an entry, even if reflection +                    # is skipped because not supported +                    indexes[(schema, table_name)] = default() +                    continue + +                for row in result_by_oid[oid]: +                    index_name = row["relname_index"] + +                    table_indexes = indexes[(schema, table_name)] + +                    all_elements = row["elements"] +                    all_elements_is_expr = row["elements_is_expr"] +                    indnkeyatts = row["indnkeyatts"] +                    # "The number of key columns in the index, not counting any +                    # included columns, which are merely stored and do not +                    # participate in the index semantics" +                    if indnkeyatts and len(all_elements) > indnkeyatts: +                        # this is a "covering index" which has INCLUDE columns +                        # as well as regular index columns +                        inc_cols = all_elements[indnkeyatts:] +                        idx_elements = all_elements[:indnkeyatts] +                        idx_elements_is_expr = all_elements_is_expr[ +                            :indnkeyatts +                        ] +                        # postgresql does not support expression on included +                        # columns as of v14: "ERROR: expressions are not +                        # supported in included columns". +                        assert all( +                            not is_expr +                            for is_expr in all_elements_is_expr[indnkeyatts:] +                        ) +                    else: +                        idx_elements = all_elements +                        idx_elements_is_expr = all_elements_is_expr +                        inc_cols = [] + +                    index = {"name": index_name, "unique": row["indisunique"]} +                    if any(idx_elements_is_expr): +                        index["column_names"] = [ +                            None if is_expr else expr +                            for expr, is_expr in zip( +                                idx_elements, idx_elements_is_expr +                            ) +                        ] +                        index["expressions"] = idx_elements +                    else: +                        index["column_names"] = idx_elements + +                    sorting = {} +                    for col_index, col_flags in enumerate(row["indoption"]): +                        col_sorting = () +                        # try to set flags only if they differ from PG +                        # defaults... +                        if col_flags & 0x01: +                            col_sorting += ("desc",) +                            if not (col_flags & 0x02): +                                col_sorting += ("nulls_last",) +                        else: +                            if col_flags & 0x02: +                                col_sorting += ("nulls_first",) +                        if col_sorting: +                            sorting[idx_elements[col_index]] = col_sorting +                    if sorting: +                        index["column_sorting"] = sorting +                    if row["has_constraint"]: +                        index["duplicates_constraint"] = index_name + +                    dialect_options = {} +                    if row["reloptions"]: +                        dialect_options["postgresql_with"] = dict( +                            [option.split("=") for option in row["reloptions"]] +                        ) +                    # it *might* be nice to include that this is 'btree' in the +                    # reflection info.  But we don't want an Index object +                    # to have a ``postgresql_using`` in it that is just the +                    # default, so for the moment leaving this out. +                    amname = row["amname"] +                    if amname != "btree": +                        dialect_options["postgresql_using"] = row["amname"] +                    if row["filter_definition"]: +                        dialect_options["postgresql_where"] = row[ +                            "filter_definition" +                        ] +                    if self.server_version_info >= (11,): +                        # NOTE: this is legacy, this is part of +                        # dialect_options now as of #7382 +                        index["include_columns"] = inc_cols +                        dialect_options["postgresql_include"] = inc_cols +                    if row["indnullsnotdistinct"]: +                        # the default is False, so ignore it. +                        dialect_options["postgresql_nulls_not_distinct"] = row[ +                            "indnullsnotdistinct" +                        ] + +                    if dialect_options: +                        index["dialect_options"] = dialect_options + +                    table_indexes.append(index) +        return indexes.items() + +    @reflection.cache +    def get_unique_constraints( +        self, connection, table_name, schema=None, **kw +    ): +        data = self.get_multi_unique_constraints( +            connection, +            schema=schema, +            filter_names=[table_name], +            scope=ObjectScope.ANY, +            kind=ObjectKind.ANY, +            **kw, +        ) +        return self._value_or_raise(data, table_name, schema) + +    def get_multi_unique_constraints( +        self, +        connection, +        schema, +        filter_names, +        scope, +        kind, +        **kw, +    ): +        result = self._reflect_constraint( +            connection, "u", schema, filter_names, scope, kind, **kw +        ) + +        # each table can have multiple unique constraints +        uniques = defaultdict(list) +        default = ReflectionDefaults.unique_constraints +        for table_name, cols, con_name, comment, options in result: +            # ensure a list is created for each table. leave it empty if +            # the table has no unique cosntraint +            if con_name is None: +                uniques[(schema, table_name)] = default() +                continue + +            uc_dict = { +                "column_names": cols, +                "name": con_name, +                "comment": comment, +            } +            if options: +                if options["nullsnotdistinct"]: +                    uc_dict["dialect_options"] = { +                        "postgresql_nulls_not_distinct": options[ +                            "nullsnotdistinct" +                        ] +                    } + +            uniques[(schema, table_name)].append(uc_dict) +        return uniques.items() + +    @reflection.cache +    def get_table_comment(self, connection, table_name, schema=None, **kw): +        data = self.get_multi_table_comment( +            connection, +            schema, +            [table_name], +            scope=ObjectScope.ANY, +            kind=ObjectKind.ANY, +            **kw, +        ) +        return self._value_or_raise(data, table_name, schema) + +    @lru_cache() +    def _comment_query(self, schema, has_filter_names, scope, kind): +        relkinds = self._kind_to_relkinds(kind) +        query = ( +            select( +                pg_catalog.pg_class.c.relname, +                pg_catalog.pg_description.c.description, +            ) +            .select_from(pg_catalog.pg_class) +            .outerjoin( +                pg_catalog.pg_description, +                sql.and_( +                    pg_catalog.pg_class.c.oid +                    == pg_catalog.pg_description.c.objoid, +                    pg_catalog.pg_description.c.objsubid == 0, +                ), +            ) +            .where(self._pg_class_relkind_condition(relkinds)) +        ) +        query = self._pg_class_filter_scope_schema(query, schema, scope) +        if has_filter_names: +            query = query.where( +                pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) +            ) +        return query + +    def get_multi_table_comment( +        self, connection, schema, filter_names, scope, kind, **kw +    ): +        has_filter_names, params = self._prepare_filter_names(filter_names) +        query = self._comment_query(schema, has_filter_names, scope, kind) +        result = connection.execute(query, params) + +        default = ReflectionDefaults.table_comment +        return ( +            ( +                (schema, table), +                {"text": comment} if comment is not None else default(), +            ) +            for table, comment in result +        ) + +    @reflection.cache +    def get_check_constraints(self, connection, table_name, schema=None, **kw): +        data = self.get_multi_check_constraints( +            connection, +            schema, +            [table_name], +            scope=ObjectScope.ANY, +            kind=ObjectKind.ANY, +            **kw, +        ) +        return self._value_or_raise(data, table_name, schema) + +    @lru_cache() +    def _check_constraint_query(self, schema, has_filter_names, scope, kind): +        relkinds = self._kind_to_relkinds(kind) +        query = ( +            select( +                pg_catalog.pg_class.c.relname, +                pg_catalog.pg_constraint.c.conname, +                # NOTE: avoid calling pg_get_constraintdef when not needed +                # to speed up the query +                sql.case( +                    ( +                        pg_catalog.pg_constraint.c.oid.is_not(None), +                        pg_catalog.pg_get_constraintdef( +                            pg_catalog.pg_constraint.c.oid, True +                        ), +                    ), +                    else_=None, +                ), +                pg_catalog.pg_description.c.description, +            ) +            .select_from(pg_catalog.pg_class) +            .outerjoin( +                pg_catalog.pg_constraint, +                sql.and_( +                    pg_catalog.pg_class.c.oid +                    == pg_catalog.pg_constraint.c.conrelid, +                    pg_catalog.pg_constraint.c.contype == "c", +                ), +            ) +            .outerjoin( +                pg_catalog.pg_description, +                pg_catalog.pg_description.c.objoid +                == pg_catalog.pg_constraint.c.oid, +            ) +            .order_by( +                pg_catalog.pg_class.c.relname, +                pg_catalog.pg_constraint.c.conname, +            ) +            .where(self._pg_class_relkind_condition(relkinds)) +        ) +        query = self._pg_class_filter_scope_schema(query, schema, scope) +        if has_filter_names: +            query = query.where( +                pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) +            ) +        return query + +    def get_multi_check_constraints( +        self, connection, schema, filter_names, scope, kind, **kw +    ): +        has_filter_names, params = self._prepare_filter_names(filter_names) +        query = self._check_constraint_query( +            schema, has_filter_names, scope, kind +        ) +        result = connection.execute(query, params) + +        check_constraints = defaultdict(list) +        default = ReflectionDefaults.check_constraints +        for table_name, check_name, src, comment in result: +            # only two cases for check_name and src: both null or both defined +            if check_name is None and src is None: +                check_constraints[(schema, table_name)] = default() +                continue +            # samples: +            # "CHECK (((a > 1) AND (a < 5)))" +            # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))" +            # "CHECK (((a > 1) AND (a < 5))) NOT VALID" +            # "CHECK (some_boolean_function(a))" +            # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)" +            # "CHECK (a NOT NULL) NO INHERIT" +            # "CHECK (a NOT NULL) NO INHERIT NOT VALID" + +            m = re.match( +                r"^CHECK *\((.+)\)( NO INHERIT)?( NOT VALID)?$", +                src, +                flags=re.DOTALL, +            ) +            if not m: +                util.warn("Could not parse CHECK constraint text: %r" % src) +                sqltext = "" +            else: +                sqltext = re.compile( +                    r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL +                ).sub(r"\1", m.group(1)) +            entry = { +                "name": check_name, +                "sqltext": sqltext, +                "comment": comment, +            } +            if m: +                do = {} +                if " NOT VALID" in m.groups(): +                    do["not_valid"] = True +                if " NO INHERIT" in m.groups(): +                    do["no_inherit"] = True +                if do: +                    entry["dialect_options"] = do + +            check_constraints[(schema, table_name)].append(entry) +        return check_constraints.items() + +    def _pg_type_filter_schema(self, query, schema): +        if schema is None: +            query = query.where( +                pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid), +                # ignore pg_catalog schema +                pg_catalog.pg_namespace.c.nspname != "pg_catalog", +            ) +        elif schema != "*": +            query = query.where(pg_catalog.pg_namespace.c.nspname == schema) +        return query + +    @lru_cache() +    def _enum_query(self, schema): +        lbl_agg_sq = ( +            select( +                pg_catalog.pg_enum.c.enumtypid, +                sql.func.array_agg( +                    aggregate_order_by( +                        # NOTE: cast since some postgresql derivatives may +                        # not support array_agg on the name type +                        pg_catalog.pg_enum.c.enumlabel.cast(TEXT), +                        pg_catalog.pg_enum.c.enumsortorder, +                    ) +                ).label("labels"), +            ) +            .group_by(pg_catalog.pg_enum.c.enumtypid) +            .subquery("lbl_agg") +        ) + +        query = ( +            select( +                pg_catalog.pg_type.c.typname.label("name"), +                pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label( +                    "visible" +                ), +                pg_catalog.pg_namespace.c.nspname.label("schema"), +                lbl_agg_sq.c.labels.label("labels"), +            ) +            .join( +                pg_catalog.pg_namespace, +                pg_catalog.pg_namespace.c.oid +                == pg_catalog.pg_type.c.typnamespace, +            ) +            .outerjoin( +                lbl_agg_sq, pg_catalog.pg_type.c.oid == lbl_agg_sq.c.enumtypid +            ) +            .where(pg_catalog.pg_type.c.typtype == "e") +            .order_by( +                pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname +            ) +        ) + +        return self._pg_type_filter_schema(query, schema) + +    @reflection.cache +    def _load_enums(self, connection, schema=None, **kw): +        if not self.supports_native_enum: +            return [] + +        result = connection.execute(self._enum_query(schema)) + +        enums = [] +        for name, visible, schema, labels in result: +            enums.append( +                { +                    "name": name, +                    "schema": schema, +                    "visible": visible, +                    "labels": [] if labels is None else labels, +                } +            ) +        return enums + +    @lru_cache() +    def _domain_query(self, schema): +        con_sq = ( +            select( +                pg_catalog.pg_constraint.c.contypid, +                sql.func.array_agg( +                    pg_catalog.pg_get_constraintdef( +                        pg_catalog.pg_constraint.c.oid, True +                    ) +                ).label("condefs"), +                sql.func.array_agg( +                    # NOTE: cast since some postgresql derivatives may +                    # not support array_agg on the name type +                    pg_catalog.pg_constraint.c.conname.cast(TEXT) +                ).label("connames"), +            ) +            # The domain this constraint is on; zero if not a domain constraint +            .where(pg_catalog.pg_constraint.c.contypid != 0) +            .group_by(pg_catalog.pg_constraint.c.contypid) +            .subquery("domain_constraints") +        ) + +        query = ( +            select( +                pg_catalog.pg_type.c.typname.label("name"), +                pg_catalog.format_type( +                    pg_catalog.pg_type.c.typbasetype, +                    pg_catalog.pg_type.c.typtypmod, +                ).label("attype"), +                (~pg_catalog.pg_type.c.typnotnull).label("nullable"), +                pg_catalog.pg_type.c.typdefault.label("default"), +                pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label( +                    "visible" +                ), +                pg_catalog.pg_namespace.c.nspname.label("schema"), +                con_sq.c.condefs, +                con_sq.c.connames, +                pg_catalog.pg_collation.c.collname, +            ) +            .join( +                pg_catalog.pg_namespace, +                pg_catalog.pg_namespace.c.oid +                == pg_catalog.pg_type.c.typnamespace, +            ) +            .outerjoin( +                pg_catalog.pg_collation, +                pg_catalog.pg_type.c.typcollation +                == pg_catalog.pg_collation.c.oid, +            ) +            .outerjoin( +                con_sq, +                pg_catalog.pg_type.c.oid == con_sq.c.contypid, +            ) +            .where(pg_catalog.pg_type.c.typtype == "d") +            .order_by( +                pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname +            ) +        ) +        return self._pg_type_filter_schema(query, schema) + +    @reflection.cache +    def _load_domains(self, connection, schema=None, **kw): +        result = connection.execute(self._domain_query(schema)) + +        domains: List[ReflectedDomain] = [] +        for domain in result.mappings(): +            # strip (30) from character varying(30) +            attype = re.search(r"([^\(]+)", domain["attype"]).group(1) +            constraints: List[ReflectedDomainConstraint] = [] +            if domain["connames"]: +                # When a domain has multiple CHECK constraints, they will +                # be tested in alphabetical order by name. +                sorted_constraints = sorted( +                    zip(domain["connames"], domain["condefs"]), +                    key=lambda t: t[0], +                ) +                for name, def_ in sorted_constraints: +                    # constraint is in the form "CHECK (expression)". +                    # remove "CHECK (" and the tailing ")". +                    check = def_[7:-1] +                    constraints.append({"name": name, "check": check}) + +            domain_rec: ReflectedDomain = { +                "name": domain["name"], +                "schema": domain["schema"], +                "visible": domain["visible"], +                "type": attype, +                "nullable": domain["nullable"], +                "default": domain["default"], +                "constraints": constraints, +                "collation": domain["collname"], +            } +            domains.append(domain_rec) + +        return domains + +    def _set_backslash_escapes(self, connection): +        # this method is provided as an override hook for descendant +        # dialects (e.g. Redshift), so removing it may break them +        std_string = connection.exec_driver_sql( +            "show standard_conforming_strings" +        ).scalar() +        self._backslash_escapes = std_string == "off" | 
