summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py3240
1 files changed, 3240 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py
new file mode 100644
index 0000000..a548b34
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py
@@ -0,0 +1,3240 @@
+# dialects/oracle/base.py
+# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+
+
+r"""
+.. dialect:: oracle
+ :name: Oracle
+ :full_support: 18c
+ :normal_support: 11+
+ :best_effort: 9+
+
+
+Auto Increment Behavior
+-----------------------
+
+SQLAlchemy Table objects which include integer primary keys are usually
+assumed to have "autoincrementing" behavior, meaning they can generate their
+own primary key values upon INSERT. For use within Oracle, two options are
+available, which are the use of IDENTITY columns (Oracle 12 and above only)
+or the association of a SEQUENCE with the column.
+
+Specifying GENERATED AS IDENTITY (Oracle 12 and above)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Starting from version 12 Oracle can make use of identity columns using
+the :class:`_sql.Identity` to specify the autoincrementing behavior::
+
+ t = Table('mytable', metadata,
+ Column('id', Integer, Identity(start=3), primary_key=True),
+ Column(...), ...
+ )
+
+The CREATE TABLE for the above :class:`_schema.Table` object would be:
+
+.. sourcecode:: sql
+
+ CREATE TABLE mytable (
+ id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 3),
+ ...,
+ PRIMARY KEY (id)
+ )
+
+The :class:`_schema.Identity` object support many options to control the
+"autoincrementing" behavior of the column, like the starting value, the
+incrementing value, etc.
+In addition to the standard options, Oracle supports setting
+:paramref:`_schema.Identity.always` to ``None`` to use the default
+generated mode, rendering GENERATED AS IDENTITY in the DDL. It also supports
+setting :paramref:`_schema.Identity.on_null` to ``True`` to specify ON NULL
+in conjunction with a 'BY DEFAULT' identity column.
+
+Using a SEQUENCE (all Oracle versions)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Older version of Oracle had no "autoincrement"
+feature, SQLAlchemy relies upon sequences to produce these values. With the
+older Oracle versions, *a sequence must always be explicitly specified to
+enable autoincrement*. This is divergent with the majority of documentation
+examples which assume the usage of an autoincrement-capable database. To
+specify sequences, use the sqlalchemy.schema.Sequence object which is passed
+to a Column construct::
+
+ t = Table('mytable', metadata,
+ Column('id', Integer, Sequence('id_seq', start=1), primary_key=True),
+ Column(...), ...
+ )
+
+This step is also required when using table reflection, i.e. autoload_with=engine::
+
+ t = Table('mytable', metadata,
+ Column('id', Integer, Sequence('id_seq', start=1), primary_key=True),
+ autoload_with=engine
+ )
+
+.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
+ in a :class:`_schema.Column` to specify the option of an autoincrementing
+ column.
+
+.. _oracle_isolation_level:
+
+Transaction Isolation Level / Autocommit
+----------------------------------------
+
+The Oracle database supports "READ COMMITTED" and "SERIALIZABLE" modes of
+isolation. The AUTOCOMMIT isolation level is also supported by the cx_Oracle
+dialect.
+
+To set using per-connection execution options::
+
+ connection = engine.connect()
+ connection = connection.execution_options(
+ isolation_level="AUTOCOMMIT"
+ )
+
+For ``READ COMMITTED`` and ``SERIALIZABLE``, the Oracle dialect sets the
+level at the session level using ``ALTER SESSION``, which is reverted back
+to its default setting when the connection is returned to the connection
+pool.
+
+Valid values for ``isolation_level`` include:
+
+* ``READ COMMITTED``
+* ``AUTOCOMMIT``
+* ``SERIALIZABLE``
+
+.. note:: The implementation for the
+ :meth:`_engine.Connection.get_isolation_level` method as implemented by the
+ Oracle dialect necessarily forces the start of a transaction using the
+ Oracle LOCAL_TRANSACTION_ID function; otherwise no level is normally
+ readable.
+
+ Additionally, the :meth:`_engine.Connection.get_isolation_level` method will
+ raise an exception if the ``v$transaction`` view is not available due to
+ permissions or other reasons, which is a common occurrence in Oracle
+ installations.
+
+ The cx_Oracle dialect attempts to call the
+ :meth:`_engine.Connection.get_isolation_level` method when the dialect makes
+ its first connection to the database in order to acquire the
+ "default"isolation level. This default level is necessary so that the level
+ can be reset on a connection after it has been temporarily modified using
+ :meth:`_engine.Connection.execution_options` method. In the common event
+ that the :meth:`_engine.Connection.get_isolation_level` method raises an
+ exception due to ``v$transaction`` not being readable as well as any other
+ database-related failure, the level is assumed to be "READ COMMITTED". No
+ warning is emitted for this initial first-connect condition as it is
+ expected to be a common restriction on Oracle databases.
+
+.. versionadded:: 1.3.16 added support for AUTOCOMMIT to the cx_oracle dialect
+ as well as the notion of a default isolation level
+
+.. versionadded:: 1.3.21 Added support for SERIALIZABLE as well as live
+ reading of the isolation level.
+
+.. versionchanged:: 1.3.22 In the event that the default isolation
+ level cannot be read due to permissions on the v$transaction view as
+ is common in Oracle installations, the default isolation level is hardcoded
+ to "READ COMMITTED" which was the behavior prior to 1.3.21.
+
+.. seealso::
+
+ :ref:`dbapi_autocommit`
+
+Identifier Casing
+-----------------
+
+In Oracle, the data dictionary represents all case insensitive identifier
+names using UPPERCASE text. SQLAlchemy on the other hand considers an
+all-lower case identifier name to be case insensitive. The Oracle dialect
+converts all case insensitive identifiers to and from those two formats during
+schema level communication, such as reflection of tables and indexes. Using
+an UPPERCASE name on the SQLAlchemy side indicates a case sensitive
+identifier, and SQLAlchemy will quote the name - this will cause mismatches
+against data dictionary data received from Oracle, so unless identifier names
+have been truly created as case sensitive (i.e. using quoted names), all
+lowercase names should be used on the SQLAlchemy side.
+
+.. _oracle_max_identifier_lengths:
+
+Max Identifier Lengths
+----------------------
+
+Oracle has changed the default max identifier length as of Oracle Server
+version 12.2. Prior to this version, the length was 30, and for 12.2 and
+greater it is now 128. This change impacts SQLAlchemy in the area of
+generated SQL label names as well as the generation of constraint names,
+particularly in the case where the constraint naming convention feature
+described at :ref:`constraint_naming_conventions` is being used.
+
+To assist with this change and others, Oracle includes the concept of a
+"compatibility" version, which is a version number that is independent of the
+actual server version in order to assist with migration of Oracle databases,
+and may be configured within the Oracle server itself. This compatibility
+version is retrieved using the query ``SELECT value FROM v$parameter WHERE
+name = 'compatible';``. The SQLAlchemy Oracle dialect, when tasked with
+determining the default max identifier length, will attempt to use this query
+upon first connect in order to determine the effective compatibility version of
+the server, which determines what the maximum allowed identifier length is for
+the server. If the table is not available, the server version information is
+used instead.
+
+As of SQLAlchemy 1.4, the default max identifier length for the Oracle dialect
+is 128 characters. Upon first connect, the compatibility version is detected
+and if it is less than Oracle version 12.2, the max identifier length is
+changed to be 30 characters. In all cases, setting the
+:paramref:`_sa.create_engine.max_identifier_length` parameter will bypass this
+change and the value given will be used as is::
+
+ engine = create_engine(
+ "oracle+cx_oracle://scott:tiger@oracle122",
+ max_identifier_length=30)
+
+The maximum identifier length comes into play both when generating anonymized
+SQL labels in SELECT statements, but more crucially when generating constraint
+names from a naming convention. It is this area that has created the need for
+SQLAlchemy to change this default conservatively. For example, the following
+naming convention produces two very different constraint names based on the
+identifier length::
+
+ from sqlalchemy import Column
+ from sqlalchemy import Index
+ from sqlalchemy import Integer
+ from sqlalchemy import MetaData
+ from sqlalchemy import Table
+ from sqlalchemy.dialects import oracle
+ from sqlalchemy.schema import CreateIndex
+
+ m = MetaData(naming_convention={"ix": "ix_%(column_0N_name)s"})
+
+ t = Table(
+ "t",
+ m,
+ Column("some_column_name_1", Integer),
+ Column("some_column_name_2", Integer),
+ Column("some_column_name_3", Integer),
+ )
+
+ ix = Index(
+ None,
+ t.c.some_column_name_1,
+ t.c.some_column_name_2,
+ t.c.some_column_name_3,
+ )
+
+ oracle_dialect = oracle.dialect(max_identifier_length=30)
+ print(CreateIndex(ix).compile(dialect=oracle_dialect))
+
+With an identifier length of 30, the above CREATE INDEX looks like::
+
+ CREATE INDEX ix_some_column_name_1s_70cd ON t
+ (some_column_name_1, some_column_name_2, some_column_name_3)
+
+However with length=128, it becomes::
+
+ CREATE INDEX ix_some_column_name_1some_column_name_2some_column_name_3 ON t
+ (some_column_name_1, some_column_name_2, some_column_name_3)
+
+Applications which have run versions of SQLAlchemy prior to 1.4 on an Oracle
+server version 12.2 or greater are therefore subject to the scenario of a
+database migration that wishes to "DROP CONSTRAINT" on a name that was
+previously generated with the shorter length. This migration will fail when
+the identifier length is changed without the name of the index or constraint
+first being adjusted. Such applications are strongly advised to make use of
+:paramref:`_sa.create_engine.max_identifier_length`
+in order to maintain control
+of the generation of truncated names, and to fully review and test all database
+migrations in a staging environment when changing this value to ensure that the
+impact of this change has been mitigated.
+
+.. versionchanged:: 1.4 the default max_identifier_length for Oracle is 128
+ characters, which is adjusted down to 30 upon first connect if an older
+ version of Oracle server (compatibility version < 12.2) is detected.
+
+
+LIMIT/OFFSET/FETCH Support
+--------------------------
+
+Methods like :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` make
+use of ``FETCH FIRST N ROW / OFFSET N ROWS`` syntax assuming
+Oracle 12c or above, and assuming the SELECT statement is not embedded within
+a compound statement like UNION. This syntax is also available directly by using
+the :meth:`_sql.Select.fetch` method.
+
+.. versionchanged:: 2.0 the Oracle dialect now uses
+ ``FETCH FIRST N ROW / OFFSET N ROWS`` for all
+ :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` usage including
+ within the ORM and legacy :class:`_orm.Query`. To force the legacy
+ behavior using window functions, specify the ``enable_offset_fetch=False``
+ dialect parameter to :func:`_sa.create_engine`.
+
+The use of ``FETCH FIRST / OFFSET`` may be disabled on any Oracle version
+by passing ``enable_offset_fetch=False`` to :func:`_sa.create_engine`, which
+will force the use of "legacy" mode that makes use of window functions.
+This mode is also selected automatically when using a version of Oracle
+prior to 12c.
+
+When using legacy mode, or when a :class:`.Select` statement
+with limit/offset is embedded in a compound statement, an emulated approach for
+LIMIT / OFFSET based on window functions is used, which involves creation of a
+subquery using ``ROW_NUMBER`` that is prone to performance issues as well as
+SQL construction issues for complex statements. However, this approach is
+supported by all Oracle versions. See notes below.
+
+Notes on LIMIT / OFFSET emulation (when fetch() method cannot be used)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+If using :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset`, or with the
+ORM the :meth:`_orm.Query.limit` and :meth:`_orm.Query.offset` methods on an
+Oracle version prior to 12c, the following notes apply:
+
+* SQLAlchemy currently makes use of ROWNUM to achieve
+ LIMIT/OFFSET; the exact methodology is taken from
+ https://blogs.oracle.com/oraclemagazine/on-rownum-and-limiting-results .
+
+* the "FIRST_ROWS()" optimization keyword is not used by default. To enable
+ the usage of this optimization directive, specify ``optimize_limits=True``
+ to :func:`_sa.create_engine`.
+
+ .. versionchanged:: 1.4
+ The Oracle dialect renders limit/offset integer values using a "post
+ compile" scheme which renders the integer directly before passing the
+ statement to the cursor for execution. The ``use_binds_for_limits`` flag
+ no longer has an effect.
+
+ .. seealso::
+
+ :ref:`change_4808`.
+
+.. _oracle_returning:
+
+RETURNING Support
+-----------------
+
+The Oracle database supports RETURNING fully for INSERT, UPDATE and DELETE
+statements that are invoked with a single collection of bound parameters
+(that is, a ``cursor.execute()`` style statement; SQLAlchemy does not generally
+support RETURNING with :term:`executemany` statements). Multiple rows may be
+returned as well.
+
+.. versionchanged:: 2.0 the Oracle backend has full support for RETURNING
+ on parity with other backends.
+
+
+
+ON UPDATE CASCADE
+-----------------
+
+Oracle doesn't have native ON UPDATE CASCADE functionality. A trigger based
+solution is available at
+https://asktom.oracle.com/tkyte/update_cascade/index.html .
+
+When using the SQLAlchemy ORM, the ORM has limited ability to manually issue
+cascading updates - specify ForeignKey objects using the
+"deferrable=True, initially='deferred'" keyword arguments,
+and specify "passive_updates=False" on each relationship().
+
+Oracle 8 Compatibility
+----------------------
+
+.. warning:: The status of Oracle 8 compatibility is not known for SQLAlchemy
+ 2.0.
+
+When Oracle 8 is detected, the dialect internally configures itself to the
+following behaviors:
+
+* the use_ansi flag is set to False. This has the effect of converting all
+ JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN
+ makes use of Oracle's (+) operator.
+
+* the NVARCHAR2 and NCLOB datatypes are no longer generated as DDL when
+ the :class:`~sqlalchemy.types.Unicode` is used - VARCHAR2 and CLOB are issued
+ instead. This because these types don't seem to work correctly on Oracle 8
+ even though they are available. The :class:`~sqlalchemy.types.NVARCHAR` and
+ :class:`~sqlalchemy.dialects.oracle.NCLOB` types will always generate
+ NVARCHAR2 and NCLOB.
+
+
+Synonym/DBLINK Reflection
+-------------------------
+
+When using reflection with Table objects, the dialect can optionally search
+for tables indicated by synonyms, either in local or remote schemas or
+accessed over DBLINK, by passing the flag ``oracle_resolve_synonyms=True`` as
+a keyword argument to the :class:`_schema.Table` construct::
+
+ some_table = Table('some_table', autoload_with=some_engine,
+ oracle_resolve_synonyms=True)
+
+When this flag is set, the given name (such as ``some_table`` above) will
+be searched not just in the ``ALL_TABLES`` view, but also within the
+``ALL_SYNONYMS`` view to see if this name is actually a synonym to another
+name. If the synonym is located and refers to a DBLINK, the oracle dialect
+knows how to locate the table's information using DBLINK syntax(e.g.
+``@dblink``).
+
+``oracle_resolve_synonyms`` is accepted wherever reflection arguments are
+accepted, including methods such as :meth:`_schema.MetaData.reflect` and
+:meth:`_reflection.Inspector.get_columns`.
+
+If synonyms are not in use, this flag should be left disabled.
+
+.. _oracle_constraint_reflection:
+
+Constraint Reflection
+---------------------
+
+The Oracle dialect can return information about foreign key, unique, and
+CHECK constraints, as well as indexes on tables.
+
+Raw information regarding these constraints can be acquired using
+:meth:`_reflection.Inspector.get_foreign_keys`,
+:meth:`_reflection.Inspector.get_unique_constraints`,
+:meth:`_reflection.Inspector.get_check_constraints`, and
+:meth:`_reflection.Inspector.get_indexes`.
+
+.. versionchanged:: 1.2 The Oracle dialect can now reflect UNIQUE and
+ CHECK constraints.
+
+When using reflection at the :class:`_schema.Table` level, the
+:class:`_schema.Table`
+will also include these constraints.
+
+Note the following caveats:
+
+* When using the :meth:`_reflection.Inspector.get_check_constraints` method,
+ Oracle
+ builds a special "IS NOT NULL" constraint for columns that specify
+ "NOT NULL". This constraint is **not** returned by default; to include
+ the "IS NOT NULL" constraints, pass the flag ``include_all=True``::
+
+ from sqlalchemy import create_engine, inspect
+
+ engine = create_engine("oracle+cx_oracle://s:t@dsn")
+ inspector = inspect(engine)
+ all_check_constraints = inspector.get_check_constraints(
+ "some_table", include_all=True)
+
+* in most cases, when reflecting a :class:`_schema.Table`,
+ a UNIQUE constraint will
+ **not** be available as a :class:`.UniqueConstraint` object, as Oracle
+ mirrors unique constraints with a UNIQUE index in most cases (the exception
+ seems to be when two or more unique constraints represent the same columns);
+ the :class:`_schema.Table` will instead represent these using
+ :class:`.Index`
+ with the ``unique=True`` flag set.
+
+* Oracle creates an implicit index for the primary key of a table; this index
+ is **excluded** from all index results.
+
+* the list of columns reflected for an index will not include column names
+ that start with SYS_NC.
+
+Table names with SYSTEM/SYSAUX tablespaces
+-------------------------------------------
+
+The :meth:`_reflection.Inspector.get_table_names` and
+:meth:`_reflection.Inspector.get_temp_table_names`
+methods each return a list of table names for the current engine. These methods
+are also part of the reflection which occurs within an operation such as
+:meth:`_schema.MetaData.reflect`. By default,
+these operations exclude the ``SYSTEM``
+and ``SYSAUX`` tablespaces from the operation. In order to change this, the
+default list of tablespaces excluded can be changed at the engine level using
+the ``exclude_tablespaces`` parameter::
+
+ # exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM
+ e = create_engine(
+ "oracle+cx_oracle://scott:tiger@xe",
+ exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"])
+
+DateTime Compatibility
+----------------------
+
+Oracle has no datatype known as ``DATETIME``, it instead has only ``DATE``,
+which can actually store a date and time value. For this reason, the Oracle
+dialect provides a type :class:`_oracle.DATE` which is a subclass of
+:class:`.DateTime`. This type has no special behavior, and is only
+present as a "marker" for this type; additionally, when a database column
+is reflected and the type is reported as ``DATE``, the time-supporting
+:class:`_oracle.DATE` type is used.
+
+.. _oracle_table_options:
+
+Oracle Table Options
+-------------------------
+
+The CREATE TABLE phrase supports the following options with Oracle
+in conjunction with the :class:`_schema.Table` construct:
+
+
+* ``ON COMMIT``::
+
+ Table(
+ "some_table", metadata, ...,
+ prefixes=['GLOBAL TEMPORARY'], oracle_on_commit='PRESERVE ROWS')
+
+* ``COMPRESS``::
+
+ Table('mytable', metadata, Column('data', String(32)),
+ oracle_compress=True)
+
+ Table('mytable', metadata, Column('data', String(32)),
+ oracle_compress=6)
+
+ The ``oracle_compress`` parameter accepts either an integer compression
+ level, or ``True`` to use the default compression level.
+
+.. _oracle_index_options:
+
+Oracle Specific Index Options
+-----------------------------
+
+Bitmap Indexes
+~~~~~~~~~~~~~~
+
+You can specify the ``oracle_bitmap`` parameter to create a bitmap index
+instead of a B-tree index::
+
+ Index('my_index', my_table.c.data, oracle_bitmap=True)
+
+Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will not
+check for such limitations, only the database will.
+
+Index compression
+~~~~~~~~~~~~~~~~~
+
+Oracle has a more efficient storage mode for indexes containing lots of
+repeated values. Use the ``oracle_compress`` parameter to turn on key
+compression::
+
+ Index('my_index', my_table.c.data, oracle_compress=True)
+
+ Index('my_index', my_table.c.data1, my_table.c.data2, unique=True,
+ oracle_compress=1)
+
+The ``oracle_compress`` parameter accepts either an integer specifying the
+number of prefix columns to compress, or ``True`` to use the default (all
+columns for non-unique indexes, all but the last column for unique indexes).
+
+""" # noqa
+
+from __future__ import annotations
+
+from collections import defaultdict
+from functools import lru_cache
+from functools import wraps
+import re
+
+from . import dictionary
+from .types import _OracleBoolean
+from .types import _OracleDate
+from .types import BFILE
+from .types import BINARY_DOUBLE
+from .types import BINARY_FLOAT
+from .types import DATE
+from .types import FLOAT
+from .types import INTERVAL
+from .types import LONG
+from .types import NCLOB
+from .types import NUMBER
+from .types import NVARCHAR2 # noqa
+from .types import OracleRaw # noqa
+from .types import RAW
+from .types import ROWID # noqa
+from .types import TIMESTAMP
+from .types import VARCHAR2 # noqa
+from ... import Computed
+from ... import exc
+from ... import schema as sa_schema
+from ... import sql
+from ... import util
+from ...engine import default
+from ...engine import ObjectKind
+from ...engine import ObjectScope
+from ...engine import reflection
+from ...engine.reflection import ReflectionDefaults
+from ...sql import and_
+from ...sql import bindparam
+from ...sql import compiler
+from ...sql import expression
+from ...sql import func
+from ...sql import null
+from ...sql import or_
+from ...sql import select
+from ...sql import sqltypes
+from ...sql import util as sql_util
+from ...sql import visitors
+from ...sql.visitors import InternalTraversal
+from ...types import BLOB
+from ...types import CHAR
+from ...types import CLOB
+from ...types import DOUBLE_PRECISION
+from ...types import INTEGER
+from ...types import NCHAR
+from ...types import NVARCHAR
+from ...types import REAL
+from ...types import VARCHAR
+
+RESERVED_WORDS = set(
+ "SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN "
+ "DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED "
+ "ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE "
+ "ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE "
+ "BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES "
+ "AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS "
+ "NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER "
+ "CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR "
+ "DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL".split()
+)
+
+NO_ARG_FNS = set(
+ "UID CURRENT_DATE SYSDATE USER CURRENT_TIME CURRENT_TIMESTAMP".split()
+)
+
+
+colspecs = {
+ sqltypes.Boolean: _OracleBoolean,
+ sqltypes.Interval: INTERVAL,
+ sqltypes.DateTime: DATE,
+ sqltypes.Date: _OracleDate,
+}
+
+ischema_names = {
+ "VARCHAR2": VARCHAR,
+ "NVARCHAR2": NVARCHAR,
+ "CHAR": CHAR,
+ "NCHAR": NCHAR,
+ "DATE": DATE,
+ "NUMBER": NUMBER,
+ "BLOB": BLOB,
+ "BFILE": BFILE,
+ "CLOB": CLOB,
+ "NCLOB": NCLOB,
+ "TIMESTAMP": TIMESTAMP,
+ "TIMESTAMP WITH TIME ZONE": TIMESTAMP,
+ "TIMESTAMP WITH LOCAL TIME ZONE": TIMESTAMP,
+ "INTERVAL DAY TO SECOND": INTERVAL,
+ "RAW": RAW,
+ "FLOAT": FLOAT,
+ "DOUBLE PRECISION": DOUBLE_PRECISION,
+ "REAL": REAL,
+ "LONG": LONG,
+ "BINARY_DOUBLE": BINARY_DOUBLE,
+ "BINARY_FLOAT": BINARY_FLOAT,
+ "ROWID": ROWID,
+}
+
+
+class OracleTypeCompiler(compiler.GenericTypeCompiler):
+ # Note:
+ # Oracle DATE == DATETIME
+ # Oracle does not allow milliseconds in DATE
+ # Oracle does not support TIME columns
+
+ def visit_datetime(self, type_, **kw):
+ return self.visit_DATE(type_, **kw)
+
+ def visit_float(self, type_, **kw):
+ return self.visit_FLOAT(type_, **kw)
+
+ def visit_double(self, type_, **kw):
+ return self.visit_DOUBLE_PRECISION(type_, **kw)
+
+ def visit_unicode(self, type_, **kw):
+ if self.dialect._use_nchar_for_unicode:
+ return self.visit_NVARCHAR2(type_, **kw)
+ else:
+ return self.visit_VARCHAR2(type_, **kw)
+
+ def visit_INTERVAL(self, type_, **kw):
+ return "INTERVAL DAY%s TO SECOND%s" % (
+ type_.day_precision is not None
+ and "(%d)" % type_.day_precision
+ or "",
+ type_.second_precision is not None
+ and "(%d)" % type_.second_precision
+ or "",
+ )
+
+ def visit_LONG(self, type_, **kw):
+ return "LONG"
+
+ def visit_TIMESTAMP(self, type_, **kw):
+ if getattr(type_, "local_timezone", False):
+ return "TIMESTAMP WITH LOCAL TIME ZONE"
+ elif type_.timezone:
+ return "TIMESTAMP WITH TIME ZONE"
+ else:
+ return "TIMESTAMP"
+
+ def visit_DOUBLE_PRECISION(self, type_, **kw):
+ return self._generate_numeric(type_, "DOUBLE PRECISION", **kw)
+
+ def visit_BINARY_DOUBLE(self, type_, **kw):
+ return self._generate_numeric(type_, "BINARY_DOUBLE", **kw)
+
+ def visit_BINARY_FLOAT(self, type_, **kw):
+ return self._generate_numeric(type_, "BINARY_FLOAT", **kw)
+
+ def visit_FLOAT(self, type_, **kw):
+ kw["_requires_binary_precision"] = True
+ return self._generate_numeric(type_, "FLOAT", **kw)
+
+ def visit_NUMBER(self, type_, **kw):
+ return self._generate_numeric(type_, "NUMBER", **kw)
+
+ def _generate_numeric(
+ self,
+ type_,
+ name,
+ precision=None,
+ scale=None,
+ _requires_binary_precision=False,
+ **kw,
+ ):
+ if precision is None:
+ precision = getattr(type_, "precision", None)
+
+ if _requires_binary_precision:
+ binary_precision = getattr(type_, "binary_precision", None)
+
+ if precision and binary_precision is None:
+ # https://www.oracletutorial.com/oracle-basics/oracle-float/
+ estimated_binary_precision = int(precision / 0.30103)
+ raise exc.ArgumentError(
+ "Oracle FLOAT types use 'binary precision', which does "
+ "not convert cleanly from decimal 'precision'. Please "
+ "specify "
+ f"this type with a separate Oracle variant, such as "
+ f"{type_.__class__.__name__}(precision={precision})."
+ f"with_variant(oracle.FLOAT"
+ f"(binary_precision="
+ f"{estimated_binary_precision}), 'oracle'), so that the "
+ "Oracle specific 'binary_precision' may be specified "
+ "accurately."
+ )
+ else:
+ precision = binary_precision
+
+ if scale is None:
+ scale = getattr(type_, "scale", None)
+
+ if precision is None:
+ return name
+ elif scale is None:
+ n = "%(name)s(%(precision)s)"
+ return n % {"name": name, "precision": precision}
+ else:
+ n = "%(name)s(%(precision)s, %(scale)s)"
+ return n % {"name": name, "precision": precision, "scale": scale}
+
+ def visit_string(self, type_, **kw):
+ return self.visit_VARCHAR2(type_, **kw)
+
+ def visit_VARCHAR2(self, type_, **kw):
+ return self._visit_varchar(type_, "", "2")
+
+ def visit_NVARCHAR2(self, type_, **kw):
+ return self._visit_varchar(type_, "N", "2")
+
+ visit_NVARCHAR = visit_NVARCHAR2
+
+ def visit_VARCHAR(self, type_, **kw):
+ return self._visit_varchar(type_, "", "")
+
+ def _visit_varchar(self, type_, n, num):
+ if not type_.length:
+ return "%(n)sVARCHAR%(two)s" % {"two": num, "n": n}
+ elif not n and self.dialect._supports_char_length:
+ varchar = "VARCHAR%(two)s(%(length)s CHAR)"
+ return varchar % {"length": type_.length, "two": num}
+ else:
+ varchar = "%(n)sVARCHAR%(two)s(%(length)s)"
+ return varchar % {"length": type_.length, "two": num, "n": n}
+
+ def visit_text(self, type_, **kw):
+ return self.visit_CLOB(type_, **kw)
+
+ def visit_unicode_text(self, type_, **kw):
+ if self.dialect._use_nchar_for_unicode:
+ return self.visit_NCLOB(type_, **kw)
+ else:
+ return self.visit_CLOB(type_, **kw)
+
+ def visit_large_binary(self, type_, **kw):
+ return self.visit_BLOB(type_, **kw)
+
+ def visit_big_integer(self, type_, **kw):
+ return self.visit_NUMBER(type_, precision=19, **kw)
+
+ def visit_boolean(self, type_, **kw):
+ return self.visit_SMALLINT(type_, **kw)
+
+ def visit_RAW(self, type_, **kw):
+ if type_.length:
+ return "RAW(%(length)s)" % {"length": type_.length}
+ else:
+ return "RAW"
+
+ def visit_ROWID(self, type_, **kw):
+ return "ROWID"
+
+
+class OracleCompiler(compiler.SQLCompiler):
+ """Oracle compiler modifies the lexical structure of Select
+ statements to work under non-ANSI configured Oracle databases, if
+ the use_ansi flag is False.
+ """
+
+ compound_keywords = util.update_copy(
+ compiler.SQLCompiler.compound_keywords,
+ {expression.CompoundSelect.EXCEPT: "MINUS"},
+ )
+
+ def __init__(self, *args, **kwargs):
+ self.__wheres = {}
+ super().__init__(*args, **kwargs)
+
+ def visit_mod_binary(self, binary, operator, **kw):
+ return "mod(%s, %s)" % (
+ self.process(binary.left, **kw),
+ self.process(binary.right, **kw),
+ )
+
+ def visit_now_func(self, fn, **kw):
+ return "CURRENT_TIMESTAMP"
+
+ def visit_char_length_func(self, fn, **kw):
+ return "LENGTH" + self.function_argspec(fn, **kw)
+
+ def visit_match_op_binary(self, binary, operator, **kw):
+ return "CONTAINS (%s, %s)" % (
+ self.process(binary.left),
+ self.process(binary.right),
+ )
+
+ def visit_true(self, expr, **kw):
+ return "1"
+
+ def visit_false(self, expr, **kw):
+ return "0"
+
+ def get_cte_preamble(self, recursive):
+ return "WITH"
+
+ def get_select_hint_text(self, byfroms):
+ return " ".join("/*+ %s */" % text for table, text in byfroms.items())
+
+ def function_argspec(self, fn, **kw):
+ if len(fn.clauses) > 0 or fn.name.upper() not in NO_ARG_FNS:
+ return compiler.SQLCompiler.function_argspec(self, fn, **kw)
+ else:
+ return ""
+
+ def visit_function(self, func, **kw):
+ text = super().visit_function(func, **kw)
+ if kw.get("asfrom", False):
+ text = "TABLE (%s)" % text
+ return text
+
+ def visit_table_valued_column(self, element, **kw):
+ text = super().visit_table_valued_column(element, **kw)
+ text = text + ".COLUMN_VALUE"
+ return text
+
+ def default_from(self):
+ """Called when a ``SELECT`` statement has no froms,
+ and no ``FROM`` clause is to be appended.
+
+ The Oracle compiler tacks a "FROM DUAL" to the statement.
+ """
+
+ return " FROM DUAL"
+
+ def visit_join(self, join, from_linter=None, **kwargs):
+ if self.dialect.use_ansi:
+ return compiler.SQLCompiler.visit_join(
+ self, join, from_linter=from_linter, **kwargs
+ )
+ else:
+ if from_linter:
+ from_linter.edges.add((join.left, join.right))
+
+ kwargs["asfrom"] = True
+ if isinstance(join.right, expression.FromGrouping):
+ right = join.right.element
+ else:
+ right = join.right
+ return (
+ self.process(join.left, from_linter=from_linter, **kwargs)
+ + ", "
+ + self.process(right, from_linter=from_linter, **kwargs)
+ )
+
+ def _get_nonansi_join_whereclause(self, froms):
+ clauses = []
+
+ def visit_join(join):
+ if join.isouter:
+ # https://docs.oracle.com/database/121/SQLRF/queries006.htm#SQLRF52354
+ # "apply the outer join operator (+) to all columns of B in
+ # the join condition in the WHERE clause" - that is,
+ # unconditionally regardless of operator or the other side
+ def visit_binary(binary):
+ if isinstance(
+ binary.left, expression.ColumnClause
+ ) and join.right.is_derived_from(binary.left.table):
+ binary.left = _OuterJoinColumn(binary.left)
+ elif isinstance(
+ binary.right, expression.ColumnClause
+ ) and join.right.is_derived_from(binary.right.table):
+ binary.right = _OuterJoinColumn(binary.right)
+
+ clauses.append(
+ visitors.cloned_traverse(
+ join.onclause, {}, {"binary": visit_binary}
+ )
+ )
+ else:
+ clauses.append(join.onclause)
+
+ for j in join.left, join.right:
+ if isinstance(j, expression.Join):
+ visit_join(j)
+ elif isinstance(j, expression.FromGrouping):
+ visit_join(j.element)
+
+ for f in froms:
+ if isinstance(f, expression.Join):
+ visit_join(f)
+
+ if not clauses:
+ return None
+ else:
+ return sql.and_(*clauses)
+
+ def visit_outer_join_column(self, vc, **kw):
+ return self.process(vc.column, **kw) + "(+)"
+
+ def visit_sequence(self, seq, **kw):
+ return self.preparer.format_sequence(seq) + ".nextval"
+
+ def get_render_as_alias_suffix(self, alias_name_text):
+ """Oracle doesn't like ``FROM table AS alias``"""
+
+ return " " + alias_name_text
+
+ def returning_clause(
+ self, stmt, returning_cols, *, populate_result_map, **kw
+ ):
+ columns = []
+ binds = []
+
+ for i, column in enumerate(
+ expression._select_iterables(returning_cols)
+ ):
+ if (
+ self.isupdate
+ and isinstance(column, sa_schema.Column)
+ and isinstance(column.server_default, Computed)
+ and not self.dialect._supports_update_returning_computed_cols
+ ):
+ util.warn(
+ "Computed columns don't work with Oracle UPDATE "
+ "statements that use RETURNING; the value of the column "
+ "*before* the UPDATE takes place is returned. It is "
+ "advised to not use RETURNING with an Oracle computed "
+ "column. Consider setting implicit_returning to False on "
+ "the Table object in order to avoid implicit RETURNING "
+ "clauses from being generated for this Table."
+ )
+ if column.type._has_column_expression:
+ col_expr = column.type.column_expression(column)
+ else:
+ col_expr = column
+
+ outparam = sql.outparam("ret_%d" % i, type_=column.type)
+ self.binds[outparam.key] = outparam
+ binds.append(
+ self.bindparam_string(self._truncate_bindparam(outparam))
+ )
+
+ # has_out_parameters would in a normal case be set to True
+ # as a result of the compiler visiting an outparam() object.
+ # in this case, the above outparam() objects are not being
+ # visited. Ensure the statement itself didn't have other
+ # outparam() objects independently.
+ # technically, this could be supported, but as it would be
+ # a very strange use case without a clear rationale, disallow it
+ if self.has_out_parameters:
+ raise exc.InvalidRequestError(
+ "Using explicit outparam() objects with "
+ "UpdateBase.returning() in the same Core DML statement "
+ "is not supported in the Oracle dialect."
+ )
+
+ self._oracle_returning = True
+
+ columns.append(self.process(col_expr, within_columns_clause=False))
+ if populate_result_map:
+ self._add_to_result_map(
+ getattr(col_expr, "name", col_expr._anon_name_label),
+ getattr(col_expr, "name", col_expr._anon_name_label),
+ (
+ column,
+ getattr(column, "name", None),
+ getattr(column, "key", None),
+ ),
+ column.type,
+ )
+
+ return "RETURNING " + ", ".join(columns) + " INTO " + ", ".join(binds)
+
+ def _row_limit_clause(self, select, **kw):
+ """ORacle 12c supports OFFSET/FETCH operators
+ Use it instead subquery with row_number
+
+ """
+
+ if (
+ select._fetch_clause is not None
+ or not self.dialect._supports_offset_fetch
+ ):
+ return super()._row_limit_clause(
+ select, use_literal_execute_for_simple_int=True, **kw
+ )
+ else:
+ return self.fetch_clause(
+ select,
+ fetch_clause=self._get_limit_or_fetch(select),
+ use_literal_execute_for_simple_int=True,
+ **kw,
+ )
+
+ def _get_limit_or_fetch(self, select):
+ if select._fetch_clause is None:
+ return select._limit_clause
+ else:
+ return select._fetch_clause
+
+ def translate_select_structure(self, select_stmt, **kwargs):
+ select = select_stmt
+
+ if not getattr(select, "_oracle_visit", None):
+ if not self.dialect.use_ansi:
+ froms = self._display_froms_for_select(
+ select, kwargs.get("asfrom", False)
+ )
+ whereclause = self._get_nonansi_join_whereclause(froms)
+ if whereclause is not None:
+ select = select.where(whereclause)
+ select._oracle_visit = True
+
+ # if fetch is used this is not needed
+ if (
+ select._has_row_limiting_clause
+ and not self.dialect._supports_offset_fetch
+ and select._fetch_clause is None
+ ):
+ limit_clause = select._limit_clause
+ offset_clause = select._offset_clause
+
+ if select._simple_int_clause(limit_clause):
+ limit_clause = limit_clause.render_literal_execute()
+
+ if select._simple_int_clause(offset_clause):
+ offset_clause = offset_clause.render_literal_execute()
+
+ # currently using form at:
+ # https://blogs.oracle.com/oraclemagazine/\
+ # on-rownum-and-limiting-results
+
+ orig_select = select
+ select = select._generate()
+ select._oracle_visit = True
+
+ # add expressions to accommodate FOR UPDATE OF
+ for_update = select._for_update_arg
+ if for_update is not None and for_update.of:
+ for_update = for_update._clone()
+ for_update._copy_internals()
+
+ for elem in for_update.of:
+ if not select.selected_columns.contains_column(elem):
+ select = select.add_columns(elem)
+
+ # Wrap the middle select and add the hint
+ inner_subquery = select.alias()
+ limitselect = sql.select(
+ *[
+ c
+ for c in inner_subquery.c
+ if orig_select.selected_columns.corresponding_column(c)
+ is not None
+ ]
+ )
+
+ if (
+ limit_clause is not None
+ and self.dialect.optimize_limits
+ and select._simple_int_clause(limit_clause)
+ ):
+ limitselect = limitselect.prefix_with(
+ expression.text(
+ "/*+ FIRST_ROWS(%s) */"
+ % self.process(limit_clause, **kwargs)
+ )
+ )
+
+ limitselect._oracle_visit = True
+ limitselect._is_wrapper = True
+
+ # add expressions to accommodate FOR UPDATE OF
+ if for_update is not None and for_update.of:
+ adapter = sql_util.ClauseAdapter(inner_subquery)
+ for_update.of = [
+ adapter.traverse(elem) for elem in for_update.of
+ ]
+
+ # If needed, add the limiting clause
+ if limit_clause is not None:
+ if select._simple_int_clause(limit_clause) and (
+ offset_clause is None
+ or select._simple_int_clause(offset_clause)
+ ):
+ max_row = limit_clause
+
+ if offset_clause is not None:
+ max_row = max_row + offset_clause
+
+ else:
+ max_row = limit_clause
+
+ if offset_clause is not None:
+ max_row = max_row + offset_clause
+ limitselect = limitselect.where(
+ sql.literal_column("ROWNUM") <= max_row
+ )
+
+ # If needed, add the ora_rn, and wrap again with offset.
+ if offset_clause is None:
+ limitselect._for_update_arg = for_update
+ select = limitselect
+ else:
+ limitselect = limitselect.add_columns(
+ sql.literal_column("ROWNUM").label("ora_rn")
+ )
+ limitselect._oracle_visit = True
+ limitselect._is_wrapper = True
+
+ if for_update is not None and for_update.of:
+ limitselect_cols = limitselect.selected_columns
+ for elem in for_update.of:
+ if (
+ limitselect_cols.corresponding_column(elem)
+ is None
+ ):
+ limitselect = limitselect.add_columns(elem)
+
+ limit_subquery = limitselect.alias()
+ origselect_cols = orig_select.selected_columns
+ offsetselect = sql.select(
+ *[
+ c
+ for c in limit_subquery.c
+ if origselect_cols.corresponding_column(c)
+ is not None
+ ]
+ )
+
+ offsetselect._oracle_visit = True
+ offsetselect._is_wrapper = True
+
+ if for_update is not None and for_update.of:
+ adapter = sql_util.ClauseAdapter(limit_subquery)
+ for_update.of = [
+ adapter.traverse(elem) for elem in for_update.of
+ ]
+
+ offsetselect = offsetselect.where(
+ sql.literal_column("ora_rn") > offset_clause
+ )
+
+ offsetselect._for_update_arg = for_update
+ select = offsetselect
+
+ return select
+
+ def limit_clause(self, select, **kw):
+ return ""
+
+ def visit_empty_set_expr(self, type_, **kw):
+ return "SELECT 1 FROM DUAL WHERE 1!=1"
+
+ def for_update_clause(self, select, **kw):
+ if self.is_subquery():
+ return ""
+
+ tmp = " FOR UPDATE"
+
+ if select._for_update_arg.of:
+ tmp += " OF " + ", ".join(
+ self.process(elem, **kw) for elem in select._for_update_arg.of
+ )
+
+ if select._for_update_arg.nowait:
+ tmp += " NOWAIT"
+ if select._for_update_arg.skip_locked:
+ tmp += " SKIP LOCKED"
+
+ return tmp
+
+ def visit_is_distinct_from_binary(self, binary, operator, **kw):
+ return "DECODE(%s, %s, 0, 1) = 1" % (
+ self.process(binary.left),
+ self.process(binary.right),
+ )
+
+ def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
+ return "DECODE(%s, %s, 0, 1) = 0" % (
+ self.process(binary.left),
+ self.process(binary.right),
+ )
+
+ def visit_regexp_match_op_binary(self, binary, operator, **kw):
+ string = self.process(binary.left, **kw)
+ pattern = self.process(binary.right, **kw)
+ flags = binary.modifiers["flags"]
+ if flags is None:
+ return "REGEXP_LIKE(%s, %s)" % (string, pattern)
+ else:
+ return "REGEXP_LIKE(%s, %s, %s)" % (
+ string,
+ pattern,
+ self.render_literal_value(flags, sqltypes.STRINGTYPE),
+ )
+
+ def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
+ return "NOT %s" % self.visit_regexp_match_op_binary(
+ binary, operator, **kw
+ )
+
+ def visit_regexp_replace_op_binary(self, binary, operator, **kw):
+ string = self.process(binary.left, **kw)
+ pattern_replace = self.process(binary.right, **kw)
+ flags = binary.modifiers["flags"]
+ if flags is None:
+ return "REGEXP_REPLACE(%s, %s)" % (
+ string,
+ pattern_replace,
+ )
+ else:
+ return "REGEXP_REPLACE(%s, %s, %s)" % (
+ string,
+ pattern_replace,
+ self.render_literal_value(flags, sqltypes.STRINGTYPE),
+ )
+
+ def visit_aggregate_strings_func(self, fn, **kw):
+ return "LISTAGG%s" % self.function_argspec(fn, **kw)
+
+
+class OracleDDLCompiler(compiler.DDLCompiler):
+ def define_constraint_cascades(self, constraint):
+ text = ""
+ if constraint.ondelete is not None:
+ text += " ON DELETE %s" % constraint.ondelete
+
+ # oracle has no ON UPDATE CASCADE -
+ # its only available via triggers
+ # https://asktom.oracle.com/tkyte/update_cascade/index.html
+ if constraint.onupdate is not None:
+ util.warn(
+ "Oracle does not contain native UPDATE CASCADE "
+ "functionality - onupdates will not be rendered for foreign "
+ "keys. Consider using deferrable=True, initially='deferred' "
+ "or triggers."
+ )
+
+ return text
+
+ def visit_drop_table_comment(self, drop, **kw):
+ return "COMMENT ON TABLE %s IS ''" % self.preparer.format_table(
+ drop.element
+ )
+
+ def visit_create_index(self, create, **kw):
+ index = create.element
+ self._verify_index_table(index)
+ preparer = self.preparer
+ text = "CREATE "
+ if index.unique:
+ text += "UNIQUE "
+ if index.dialect_options["oracle"]["bitmap"]:
+ text += "BITMAP "
+ text += "INDEX %s ON %s (%s)" % (
+ self._prepared_index_name(index, include_schema=True),
+ preparer.format_table(index.table, use_schema=True),
+ ", ".join(
+ self.sql_compiler.process(
+ expr, include_table=False, literal_binds=True
+ )
+ for expr in index.expressions
+ ),
+ )
+ if index.dialect_options["oracle"]["compress"] is not False:
+ if index.dialect_options["oracle"]["compress"] is True:
+ text += " COMPRESS"
+ else:
+ text += " COMPRESS %d" % (
+ index.dialect_options["oracle"]["compress"]
+ )
+ return text
+
+ def post_create_table(self, table):
+ table_opts = []
+ opts = table.dialect_options["oracle"]
+
+ if opts["on_commit"]:
+ on_commit_options = opts["on_commit"].replace("_", " ").upper()
+ table_opts.append("\n ON COMMIT %s" % on_commit_options)
+
+ if opts["compress"]:
+ if opts["compress"] is True:
+ table_opts.append("\n COMPRESS")
+ else:
+ table_opts.append("\n COMPRESS FOR %s" % (opts["compress"]))
+
+ return "".join(table_opts)
+
+ def get_identity_options(self, identity_options):
+ text = super().get_identity_options(identity_options)
+ text = text.replace("NO MINVALUE", "NOMINVALUE")
+ text = text.replace("NO MAXVALUE", "NOMAXVALUE")
+ text = text.replace("NO CYCLE", "NOCYCLE")
+ if identity_options.order is not None:
+ text += " ORDER" if identity_options.order else " NOORDER"
+ return text.strip()
+
+ def visit_computed_column(self, generated, **kw):
+ text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
+ generated.sqltext, include_table=False, literal_binds=True
+ )
+ if generated.persisted is True:
+ raise exc.CompileError(
+ "Oracle computed columns do not support 'stored' persistence; "
+ "set the 'persisted' flag to None or False for Oracle support."
+ )
+ elif generated.persisted is False:
+ text += " VIRTUAL"
+ return text
+
+ def visit_identity_column(self, identity, **kw):
+ if identity.always is None:
+ kind = ""
+ else:
+ kind = "ALWAYS" if identity.always else "BY DEFAULT"
+ text = "GENERATED %s" % kind
+ if identity.on_null:
+ text += " ON NULL"
+ text += " AS IDENTITY"
+ options = self.get_identity_options(identity)
+ if options:
+ text += " (%s)" % options
+ return text
+
+
+class OracleIdentifierPreparer(compiler.IdentifierPreparer):
+ reserved_words = {x.lower() for x in RESERVED_WORDS}
+ illegal_initial_characters = {str(dig) for dig in range(0, 10)}.union(
+ ["_", "$"]
+ )
+
+ def _bindparam_requires_quotes(self, value):
+ """Return True if the given identifier requires quoting."""
+ lc_value = value.lower()
+ return (
+ lc_value in self.reserved_words
+ or value[0] in self.illegal_initial_characters
+ or not self.legal_characters.match(str(value))
+ )
+
+ def format_savepoint(self, savepoint):
+ name = savepoint.ident.lstrip("_")
+ return super().format_savepoint(savepoint, name)
+
+
+class OracleExecutionContext(default.DefaultExecutionContext):
+ def fire_sequence(self, seq, type_):
+ return self._execute_scalar(
+ "SELECT "
+ + self.identifier_preparer.format_sequence(seq)
+ + ".nextval FROM DUAL",
+ type_,
+ )
+
+ def pre_exec(self):
+ if self.statement and "_oracle_dblink" in self.execution_options:
+ self.statement = self.statement.replace(
+ dictionary.DB_LINK_PLACEHOLDER,
+ self.execution_options["_oracle_dblink"],
+ )
+
+
+class OracleDialect(default.DefaultDialect):
+ name = "oracle"
+ supports_statement_cache = True
+ supports_alter = True
+ max_identifier_length = 128
+
+ _supports_offset_fetch = True
+
+ insert_returning = True
+ update_returning = True
+ delete_returning = True
+
+ div_is_floordiv = False
+
+ supports_simple_order_by_label = False
+ cte_follows_insert = True
+ returns_native_bytes = True
+
+ supports_sequences = True
+ sequences_optional = False
+ postfetch_lastrowid = False
+
+ default_paramstyle = "named"
+ colspecs = colspecs
+ ischema_names = ischema_names
+ requires_name_normalize = True
+
+ supports_comments = True
+
+ supports_default_values = False
+ supports_default_metavalue = True
+ supports_empty_insert = False
+ supports_identity_columns = True
+
+ statement_compiler = OracleCompiler
+ ddl_compiler = OracleDDLCompiler
+ type_compiler_cls = OracleTypeCompiler
+ preparer = OracleIdentifierPreparer
+ execution_ctx_cls = OracleExecutionContext
+
+ reflection_options = ("oracle_resolve_synonyms",)
+
+ _use_nchar_for_unicode = False
+
+ construct_arguments = [
+ (
+ sa_schema.Table,
+ {"resolve_synonyms": False, "on_commit": None, "compress": False},
+ ),
+ (sa_schema.Index, {"bitmap": False, "compress": False}),
+ ]
+
+ @util.deprecated_params(
+ use_binds_for_limits=(
+ "1.4",
+ "The ``use_binds_for_limits`` Oracle dialect parameter is "
+ "deprecated. The dialect now renders LIMIT /OFFSET integers "
+ "inline in all cases using a post-compilation hook, so that the "
+ "value is still represented by a 'bound parameter' on the Core "
+ "Expression side.",
+ )
+ )
+ def __init__(
+ self,
+ use_ansi=True,
+ optimize_limits=False,
+ use_binds_for_limits=None,
+ use_nchar_for_unicode=False,
+ exclude_tablespaces=("SYSTEM", "SYSAUX"),
+ enable_offset_fetch=True,
+ **kwargs,
+ ):
+ default.DefaultDialect.__init__(self, **kwargs)
+ self._use_nchar_for_unicode = use_nchar_for_unicode
+ self.use_ansi = use_ansi
+ self.optimize_limits = optimize_limits
+ self.exclude_tablespaces = exclude_tablespaces
+ self.enable_offset_fetch = self._supports_offset_fetch = (
+ enable_offset_fetch
+ )
+
+ def initialize(self, connection):
+ super().initialize(connection)
+
+ # Oracle 8i has RETURNING:
+ # https://docs.oracle.com/cd/A87860_01/doc/index.htm
+
+ # so does Oracle8:
+ # https://docs.oracle.com/cd/A64702_01/doc/index.htm
+
+ if self._is_oracle_8:
+ self.colspecs = self.colspecs.copy()
+ self.colspecs.pop(sqltypes.Interval)
+ self.use_ansi = False
+
+ self.supports_identity_columns = self.server_version_info >= (12,)
+ self._supports_offset_fetch = (
+ self.enable_offset_fetch and self.server_version_info >= (12,)
+ )
+
+ def _get_effective_compat_server_version_info(self, connection):
+ # dialect does not need compat levels below 12.2, so don't query
+ # in those cases
+
+ if self.server_version_info < (12, 2):
+ return self.server_version_info
+ try:
+ compat = connection.exec_driver_sql(
+ "SELECT value FROM v$parameter WHERE name = 'compatible'"
+ ).scalar()
+ except exc.DBAPIError:
+ compat = None
+
+ if compat:
+ try:
+ return tuple(int(x) for x in compat.split("."))
+ except:
+ return self.server_version_info
+ else:
+ return self.server_version_info
+
+ @property
+ def _is_oracle_8(self):
+ return self.server_version_info and self.server_version_info < (9,)
+
+ @property
+ def _supports_table_compression(self):
+ return self.server_version_info and self.server_version_info >= (10, 1)
+
+ @property
+ def _supports_table_compress_for(self):
+ return self.server_version_info and self.server_version_info >= (11,)
+
+ @property
+ def _supports_char_length(self):
+ return not self._is_oracle_8
+
+ @property
+ def _supports_update_returning_computed_cols(self):
+ # on version 18 this error is no longet present while it happens on 11
+ # it may work also on versions before the 18
+ return self.server_version_info and self.server_version_info >= (18,)
+
+ @property
+ def _supports_except_all(self):
+ return self.server_version_info and self.server_version_info >= (21,)
+
+ def do_release_savepoint(self, connection, name):
+ # Oracle does not support RELEASE SAVEPOINT
+ pass
+
+ def _check_max_identifier_length(self, connection):
+ if self._get_effective_compat_server_version_info(connection) < (
+ 12,
+ 2,
+ ):
+ return 30
+ else:
+ # use the default
+ return None
+
+ def get_isolation_level_values(self, dbapi_connection):
+ return ["READ COMMITTED", "SERIALIZABLE"]
+
+ def get_default_isolation_level(self, dbapi_conn):
+ try:
+ return self.get_isolation_level(dbapi_conn)
+ except NotImplementedError:
+ raise
+ except:
+ return "READ COMMITTED"
+
+ def _execute_reflection(
+ self, connection, query, dblink, returns_long, params=None
+ ):
+ if dblink and not dblink.startswith("@"):
+ dblink = f"@{dblink}"
+ execution_options = {
+ # handle db links
+ "_oracle_dblink": dblink or "",
+ # override any schema translate map
+ "schema_translate_map": None,
+ }
+
+ if dblink and returns_long:
+ # Oracle seems to error with
+ # "ORA-00997: illegal use of LONG datatype" when returning
+ # LONG columns via a dblink in a query with bind params
+ # This type seems to be very hard to cast into something else
+ # so it seems easier to just use bind param in this case
+ def visit_bindparam(bindparam):
+ bindparam.literal_execute = True
+
+ query = visitors.cloned_traverse(
+ query, {}, {"bindparam": visit_bindparam}
+ )
+ return connection.execute(
+ query, params, execution_options=execution_options
+ )
+
+ @util.memoized_property
+ def _has_table_query(self):
+ # materialized views are returned by all_tables
+ tables = (
+ select(
+ dictionary.all_tables.c.table_name,
+ dictionary.all_tables.c.owner,
+ )
+ .union_all(
+ select(
+ dictionary.all_views.c.view_name.label("table_name"),
+ dictionary.all_views.c.owner,
+ )
+ )
+ .subquery("tables_and_views")
+ )
+
+ query = select(tables.c.table_name).where(
+ tables.c.table_name == bindparam("table_name"),
+ tables.c.owner == bindparam("owner"),
+ )
+ return query
+
+ @reflection.cache
+ def has_table(
+ self, connection, table_name, schema=None, dblink=None, **kw
+ ):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link."""
+ self._ensure_has_table_connection(connection)
+
+ if not schema:
+ schema = self.default_schema_name
+
+ params = {
+ "table_name": self.denormalize_name(table_name),
+ "owner": self.denormalize_schema_name(schema),
+ }
+ cursor = self._execute_reflection(
+ connection,
+ self._has_table_query,
+ dblink,
+ returns_long=False,
+ params=params,
+ )
+ return bool(cursor.scalar())
+
+ @reflection.cache
+ def has_sequence(
+ self, connection, sequence_name, schema=None, dblink=None, **kw
+ ):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link."""
+ if not schema:
+ schema = self.default_schema_name
+
+ query = select(dictionary.all_sequences.c.sequence_name).where(
+ dictionary.all_sequences.c.sequence_name
+ == self.denormalize_schema_name(sequence_name),
+ dictionary.all_sequences.c.sequence_owner
+ == self.denormalize_schema_name(schema),
+ )
+
+ cursor = self._execute_reflection(
+ connection, query, dblink, returns_long=False
+ )
+ return bool(cursor.scalar())
+
+ def _get_default_schema_name(self, connection):
+ return self.normalize_name(
+ connection.exec_driver_sql(
+ "select sys_context( 'userenv', 'current_schema' ) from dual"
+ ).scalar()
+ )
+
+ def denormalize_schema_name(self, name):
+ # look for quoted_name
+ force = getattr(name, "quote", None)
+ if force is None and name == "public":
+ # look for case insensitive, no quoting specified, "public"
+ return "PUBLIC"
+ return super().denormalize_name(name)
+
+ @reflection.flexi_cache(
+ ("schema", InternalTraversal.dp_string),
+ ("filter_names", InternalTraversal.dp_string_list),
+ ("dblink", InternalTraversal.dp_string),
+ )
+ def _get_synonyms(self, connection, schema, filter_names, dblink, **kw):
+ owner = self.denormalize_schema_name(
+ schema or self.default_schema_name
+ )
+
+ has_filter_names, params = self._prepare_filter_names(filter_names)
+ query = select(
+ dictionary.all_synonyms.c.synonym_name,
+ dictionary.all_synonyms.c.table_name,
+ dictionary.all_synonyms.c.table_owner,
+ dictionary.all_synonyms.c.db_link,
+ ).where(dictionary.all_synonyms.c.owner == owner)
+ if has_filter_names:
+ query = query.where(
+ dictionary.all_synonyms.c.synonym_name.in_(
+ params["filter_names"]
+ )
+ )
+ result = self._execute_reflection(
+ connection, query, dblink, returns_long=False
+ ).mappings()
+ return result.all()
+
+ @lru_cache()
+ def _all_objects_query(
+ self, owner, scope, kind, has_filter_names, has_mat_views
+ ):
+ query = (
+ select(dictionary.all_objects.c.object_name)
+ .select_from(dictionary.all_objects)
+ .where(dictionary.all_objects.c.owner == owner)
+ )
+
+ # NOTE: materialized views are listed in all_objects twice;
+ # once as MATERIALIZE VIEW and once as TABLE
+ if kind is ObjectKind.ANY:
+ # materilaized view are listed also as tables so there is no
+ # need to add them to the in_.
+ query = query.where(
+ dictionary.all_objects.c.object_type.in_(("TABLE", "VIEW"))
+ )
+ else:
+ object_type = []
+ if ObjectKind.VIEW in kind:
+ object_type.append("VIEW")
+ if (
+ ObjectKind.MATERIALIZED_VIEW in kind
+ and ObjectKind.TABLE not in kind
+ ):
+ # materilaized view are listed also as tables so there is no
+ # need to add them to the in_ if also selecting tables.
+ object_type.append("MATERIALIZED VIEW")
+ if ObjectKind.TABLE in kind:
+ object_type.append("TABLE")
+ if has_mat_views and ObjectKind.MATERIALIZED_VIEW not in kind:
+ # materialized view are listed also as tables,
+ # so they need to be filtered out
+ # EXCEPT ALL / MINUS profiles as faster than using
+ # NOT EXISTS or NOT IN with a subquery, but it's in
+ # general faster to get the mat view names and exclude
+ # them only when needed
+ query = query.where(
+ dictionary.all_objects.c.object_name.not_in(
+ bindparam("mat_views")
+ )
+ )
+ query = query.where(
+ dictionary.all_objects.c.object_type.in_(object_type)
+ )
+
+ # handles scope
+ if scope is ObjectScope.DEFAULT:
+ query = query.where(dictionary.all_objects.c.temporary == "N")
+ elif scope is ObjectScope.TEMPORARY:
+ query = query.where(dictionary.all_objects.c.temporary == "Y")
+
+ if has_filter_names:
+ query = query.where(
+ dictionary.all_objects.c.object_name.in_(
+ bindparam("filter_names")
+ )
+ )
+ return query
+
+ @reflection.flexi_cache(
+ ("schema", InternalTraversal.dp_string),
+ ("scope", InternalTraversal.dp_plain_obj),
+ ("kind", InternalTraversal.dp_plain_obj),
+ ("filter_names", InternalTraversal.dp_string_list),
+ ("dblink", InternalTraversal.dp_string),
+ )
+ def _get_all_objects(
+ self, connection, schema, scope, kind, filter_names, dblink, **kw
+ ):
+ owner = self.denormalize_schema_name(
+ schema or self.default_schema_name
+ )
+
+ has_filter_names, params = self._prepare_filter_names(filter_names)
+ has_mat_views = False
+ if (
+ ObjectKind.TABLE in kind
+ and ObjectKind.MATERIALIZED_VIEW not in kind
+ ):
+ # see note in _all_objects_query
+ mat_views = self.get_materialized_view_names(
+ connection, schema, dblink, _normalize=False, **kw
+ )
+ if mat_views:
+ params["mat_views"] = mat_views
+ has_mat_views = True
+
+ query = self._all_objects_query(
+ owner, scope, kind, has_filter_names, has_mat_views
+ )
+
+ result = self._execute_reflection(
+ connection, query, dblink, returns_long=False, params=params
+ ).scalars()
+
+ return result.all()
+
+ def _handle_synonyms_decorator(fn):
+ @wraps(fn)
+ def wrapper(self, *args, **kwargs):
+ return self._handle_synonyms(fn, *args, **kwargs)
+
+ return wrapper
+
+ def _handle_synonyms(self, fn, connection, *args, **kwargs):
+ if not kwargs.get("oracle_resolve_synonyms", False):
+ return fn(self, connection, *args, **kwargs)
+
+ original_kw = kwargs.copy()
+ schema = kwargs.pop("schema", None)
+ result = self._get_synonyms(
+ connection,
+ schema=schema,
+ filter_names=kwargs.pop("filter_names", None),
+ dblink=kwargs.pop("dblink", None),
+ info_cache=kwargs.get("info_cache", None),
+ )
+
+ dblinks_owners = defaultdict(dict)
+ for row in result:
+ key = row["db_link"], row["table_owner"]
+ tn = self.normalize_name(row["table_name"])
+ dblinks_owners[key][tn] = row["synonym_name"]
+
+ if not dblinks_owners:
+ # No synonym, do the plain thing
+ return fn(self, connection, *args, **original_kw)
+
+ data = {}
+ for (dblink, table_owner), mapping in dblinks_owners.items():
+ call_kw = {
+ **original_kw,
+ "schema": table_owner,
+ "dblink": self.normalize_name(dblink),
+ "filter_names": mapping.keys(),
+ }
+ call_result = fn(self, connection, *args, **call_kw)
+ for (_, tn), value in call_result:
+ synonym_name = self.normalize_name(mapping[tn])
+ data[(schema, synonym_name)] = value
+ return data.items()
+
+ @reflection.cache
+ def get_schema_names(self, connection, dblink=None, **kw):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link."""
+ query = select(dictionary.all_users.c.username).order_by(
+ dictionary.all_users.c.username
+ )
+ result = self._execute_reflection(
+ connection, query, dblink, returns_long=False
+ ).scalars()
+ return [self.normalize_name(row) for row in result]
+
+ @reflection.cache
+ def get_table_names(self, connection, schema=None, dblink=None, **kw):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link."""
+ # note that table_names() isn't loading DBLINKed or synonym'ed tables
+ if schema is None:
+ schema = self.default_schema_name
+
+ den_schema = self.denormalize_schema_name(schema)
+ if kw.get("oracle_resolve_synonyms", False):
+ tables = (
+ select(
+ dictionary.all_tables.c.table_name,
+ dictionary.all_tables.c.owner,
+ dictionary.all_tables.c.iot_name,
+ dictionary.all_tables.c.duration,
+ dictionary.all_tables.c.tablespace_name,
+ )
+ .union_all(
+ select(
+ dictionary.all_synonyms.c.synonym_name.label(
+ "table_name"
+ ),
+ dictionary.all_synonyms.c.owner,
+ dictionary.all_tables.c.iot_name,
+ dictionary.all_tables.c.duration,
+ dictionary.all_tables.c.tablespace_name,
+ )
+ .select_from(dictionary.all_tables)
+ .join(
+ dictionary.all_synonyms,
+ and_(
+ dictionary.all_tables.c.table_name
+ == dictionary.all_synonyms.c.table_name,
+ dictionary.all_tables.c.owner
+ == func.coalesce(
+ dictionary.all_synonyms.c.table_owner,
+ dictionary.all_synonyms.c.owner,
+ ),
+ ),
+ )
+ )
+ .subquery("available_tables")
+ )
+ else:
+ tables = dictionary.all_tables
+
+ query = select(tables.c.table_name)
+ if self.exclude_tablespaces:
+ query = query.where(
+ func.coalesce(
+ tables.c.tablespace_name, "no tablespace"
+ ).not_in(self.exclude_tablespaces)
+ )
+ query = query.where(
+ tables.c.owner == den_schema,
+ tables.c.iot_name.is_(null()),
+ tables.c.duration.is_(null()),
+ )
+
+ # remove materialized views
+ mat_query = select(
+ dictionary.all_mviews.c.mview_name.label("table_name")
+ ).where(dictionary.all_mviews.c.owner == den_schema)
+
+ query = (
+ query.except_all(mat_query)
+ if self._supports_except_all
+ else query.except_(mat_query)
+ )
+
+ result = self._execute_reflection(
+ connection, query, dblink, returns_long=False
+ ).scalars()
+ return [self.normalize_name(row) for row in result]
+
+ @reflection.cache
+ def get_temp_table_names(self, connection, dblink=None, **kw):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link."""
+ schema = self.denormalize_schema_name(self.default_schema_name)
+
+ query = select(dictionary.all_tables.c.table_name)
+ if self.exclude_tablespaces:
+ query = query.where(
+ func.coalesce(
+ dictionary.all_tables.c.tablespace_name, "no tablespace"
+ ).not_in(self.exclude_tablespaces)
+ )
+ query = query.where(
+ dictionary.all_tables.c.owner == schema,
+ dictionary.all_tables.c.iot_name.is_(null()),
+ dictionary.all_tables.c.duration.is_not(null()),
+ )
+
+ result = self._execute_reflection(
+ connection, query, dblink, returns_long=False
+ ).scalars()
+ return [self.normalize_name(row) for row in result]
+
+ @reflection.cache
+ def get_materialized_view_names(
+ self, connection, schema=None, dblink=None, _normalize=True, **kw
+ ):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link."""
+ if not schema:
+ schema = self.default_schema_name
+
+ query = select(dictionary.all_mviews.c.mview_name).where(
+ dictionary.all_mviews.c.owner
+ == self.denormalize_schema_name(schema)
+ )
+ result = self._execute_reflection(
+ connection, query, dblink, returns_long=False
+ ).scalars()
+ if _normalize:
+ return [self.normalize_name(row) for row in result]
+ else:
+ return result.all()
+
+ @reflection.cache
+ def get_view_names(self, connection, schema=None, dblink=None, **kw):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link."""
+ if not schema:
+ schema = self.default_schema_name
+
+ query = select(dictionary.all_views.c.view_name).where(
+ dictionary.all_views.c.owner
+ == self.denormalize_schema_name(schema)
+ )
+ result = self._execute_reflection(
+ connection, query, dblink, returns_long=False
+ ).scalars()
+ return [self.normalize_name(row) for row in result]
+
+ @reflection.cache
+ def get_sequence_names(self, connection, schema=None, dblink=None, **kw):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link."""
+ if not schema:
+ schema = self.default_schema_name
+ query = select(dictionary.all_sequences.c.sequence_name).where(
+ dictionary.all_sequences.c.sequence_owner
+ == self.denormalize_schema_name(schema)
+ )
+
+ result = self._execute_reflection(
+ connection, query, dblink, returns_long=False
+ ).scalars()
+ return [self.normalize_name(row) for row in result]
+
+ def _value_or_raise(self, data, table, schema):
+ table = self.normalize_name(str(table))
+ try:
+ return dict(data)[(schema, table)]
+ except KeyError:
+ raise exc.NoSuchTableError(
+ f"{schema}.{table}" if schema else table
+ ) from None
+
+ def _prepare_filter_names(self, filter_names):
+ if filter_names:
+ fn = [self.denormalize_name(name) for name in filter_names]
+ return True, {"filter_names": fn}
+ else:
+ return False, {}
+
+ @reflection.cache
+ def get_table_options(self, connection, table_name, schema=None, **kw):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link;
+ ``oracle_resolve_synonyms`` to resolve names to synonyms
+ """
+ data = self.get_multi_table_options(
+ connection,
+ schema=schema,
+ filter_names=[table_name],
+ scope=ObjectScope.ANY,
+ kind=ObjectKind.ANY,
+ **kw,
+ )
+ return self._value_or_raise(data, table_name, schema)
+
+ @lru_cache()
+ def _table_options_query(
+ self, owner, scope, kind, has_filter_names, has_mat_views
+ ):
+ query = select(
+ dictionary.all_tables.c.table_name,
+ dictionary.all_tables.c.compression,
+ dictionary.all_tables.c.compress_for,
+ ).where(dictionary.all_tables.c.owner == owner)
+ if has_filter_names:
+ query = query.where(
+ dictionary.all_tables.c.table_name.in_(
+ bindparam("filter_names")
+ )
+ )
+ if scope is ObjectScope.DEFAULT:
+ query = query.where(dictionary.all_tables.c.duration.is_(null()))
+ elif scope is ObjectScope.TEMPORARY:
+ query = query.where(
+ dictionary.all_tables.c.duration.is_not(null())
+ )
+
+ if (
+ has_mat_views
+ and ObjectKind.TABLE in kind
+ and ObjectKind.MATERIALIZED_VIEW not in kind
+ ):
+ # cant use EXCEPT ALL / MINUS here because we don't have an
+ # excludable row vs. the query above
+ # outerjoin + where null works better on oracle 21 but 11 does
+ # not like it at all. this is the next best thing
+
+ query = query.where(
+ dictionary.all_tables.c.table_name.not_in(
+ bindparam("mat_views")
+ )
+ )
+ elif (
+ ObjectKind.TABLE not in kind
+ and ObjectKind.MATERIALIZED_VIEW in kind
+ ):
+ query = query.where(
+ dictionary.all_tables.c.table_name.in_(bindparam("mat_views"))
+ )
+ return query
+
+ @_handle_synonyms_decorator
+ def get_multi_table_options(
+ self,
+ connection,
+ *,
+ schema,
+ filter_names,
+ scope,
+ kind,
+ dblink=None,
+ **kw,
+ ):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link;
+ ``oracle_resolve_synonyms`` to resolve names to synonyms
+ """
+ owner = self.denormalize_schema_name(
+ schema or self.default_schema_name
+ )
+
+ has_filter_names, params = self._prepare_filter_names(filter_names)
+ has_mat_views = False
+
+ if (
+ ObjectKind.TABLE in kind
+ and ObjectKind.MATERIALIZED_VIEW not in kind
+ ):
+ # see note in _table_options_query
+ mat_views = self.get_materialized_view_names(
+ connection, schema, dblink, _normalize=False, **kw
+ )
+ if mat_views:
+ params["mat_views"] = mat_views
+ has_mat_views = True
+ elif (
+ ObjectKind.TABLE not in kind
+ and ObjectKind.MATERIALIZED_VIEW in kind
+ ):
+ mat_views = self.get_materialized_view_names(
+ connection, schema, dblink, _normalize=False, **kw
+ )
+ params["mat_views"] = mat_views
+
+ options = {}
+ default = ReflectionDefaults.table_options
+
+ if ObjectKind.TABLE in kind or ObjectKind.MATERIALIZED_VIEW in kind:
+ query = self._table_options_query(
+ owner, scope, kind, has_filter_names, has_mat_views
+ )
+ result = self._execute_reflection(
+ connection, query, dblink, returns_long=False, params=params
+ )
+
+ for table, compression, compress_for in result:
+ if compression == "ENABLED":
+ data = {"oracle_compress": compress_for}
+ else:
+ data = default()
+ options[(schema, self.normalize_name(table))] = data
+ if ObjectKind.VIEW in kind and ObjectScope.DEFAULT in scope:
+ # add the views (no temporary views)
+ for view in self.get_view_names(connection, schema, dblink, **kw):
+ if not filter_names or view in filter_names:
+ options[(schema, view)] = default()
+
+ return options.items()
+
+ @reflection.cache
+ def get_columns(self, connection, table_name, schema=None, **kw):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link;
+ ``oracle_resolve_synonyms`` to resolve names to synonyms
+ """
+
+ data = self.get_multi_columns(
+ connection,
+ schema=schema,
+ filter_names=[table_name],
+ scope=ObjectScope.ANY,
+ kind=ObjectKind.ANY,
+ **kw,
+ )
+ return self._value_or_raise(data, table_name, schema)
+
+ def _run_batches(
+ self, connection, query, dblink, returns_long, mappings, all_objects
+ ):
+ each_batch = 500
+ batches = list(all_objects)
+ while batches:
+ batch = batches[0:each_batch]
+ batches[0:each_batch] = []
+
+ result = self._execute_reflection(
+ connection,
+ query,
+ dblink,
+ returns_long=returns_long,
+ params={"all_objects": batch},
+ )
+ if mappings:
+ yield from result.mappings()
+ else:
+ yield from result
+
+ @lru_cache()
+ def _column_query(self, owner):
+ all_cols = dictionary.all_tab_cols
+ all_comments = dictionary.all_col_comments
+ all_ids = dictionary.all_tab_identity_cols
+
+ if self.server_version_info >= (12,):
+ add_cols = (
+ all_cols.c.default_on_null,
+ sql.case(
+ (all_ids.c.table_name.is_(None), sql.null()),
+ else_=all_ids.c.generation_type
+ + ","
+ + all_ids.c.identity_options,
+ ).label("identity_options"),
+ )
+ join_identity_cols = True
+ else:
+ add_cols = (
+ sql.null().label("default_on_null"),
+ sql.null().label("identity_options"),
+ )
+ join_identity_cols = False
+
+ # NOTE: on oracle cannot create tables/views without columns and
+ # a table cannot have all column hidden:
+ # ORA-54039: table must have at least one column that is not invisible
+ # all_tab_cols returns data for tables/views/mat-views.
+ # all_tab_cols does not return recycled tables
+
+ query = (
+ select(
+ all_cols.c.table_name,
+ all_cols.c.column_name,
+ all_cols.c.data_type,
+ all_cols.c.char_length,
+ all_cols.c.data_precision,
+ all_cols.c.data_scale,
+ all_cols.c.nullable,
+ all_cols.c.data_default,
+ all_comments.c.comments,
+ all_cols.c.virtual_column,
+ *add_cols,
+ ).select_from(all_cols)
+ # NOTE: all_col_comments has a row for each column even if no
+ # comment is present, so a join could be performed, but there
+ # seems to be no difference compared to an outer join
+ .outerjoin(
+ all_comments,
+ and_(
+ all_cols.c.table_name == all_comments.c.table_name,
+ all_cols.c.column_name == all_comments.c.column_name,
+ all_cols.c.owner == all_comments.c.owner,
+ ),
+ )
+ )
+ if join_identity_cols:
+ query = query.outerjoin(
+ all_ids,
+ and_(
+ all_cols.c.table_name == all_ids.c.table_name,
+ all_cols.c.column_name == all_ids.c.column_name,
+ all_cols.c.owner == all_ids.c.owner,
+ ),
+ )
+
+ query = query.where(
+ all_cols.c.table_name.in_(bindparam("all_objects")),
+ all_cols.c.hidden_column == "NO",
+ all_cols.c.owner == owner,
+ ).order_by(all_cols.c.table_name, all_cols.c.column_id)
+ return query
+
+ @_handle_synonyms_decorator
+ def get_multi_columns(
+ self,
+ connection,
+ *,
+ schema,
+ filter_names,
+ scope,
+ kind,
+ dblink=None,
+ **kw,
+ ):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link;
+ ``oracle_resolve_synonyms`` to resolve names to synonyms
+ """
+ owner = self.denormalize_schema_name(
+ schema or self.default_schema_name
+ )
+ query = self._column_query(owner)
+
+ if (
+ filter_names
+ and kind is ObjectKind.ANY
+ and scope is ObjectScope.ANY
+ ):
+ all_objects = [self.denormalize_name(n) for n in filter_names]
+ else:
+ all_objects = self._get_all_objects(
+ connection, schema, scope, kind, filter_names, dblink, **kw
+ )
+
+ columns = defaultdict(list)
+
+ # all_tab_cols.data_default is LONG
+ result = self._run_batches(
+ connection,
+ query,
+ dblink,
+ returns_long=True,
+ mappings=True,
+ all_objects=all_objects,
+ )
+
+ def maybe_int(value):
+ if isinstance(value, float) and value.is_integer():
+ return int(value)
+ else:
+ return value
+
+ remove_size = re.compile(r"\(\d+\)")
+
+ for row_dict in result:
+ table_name = self.normalize_name(row_dict["table_name"])
+ orig_colname = row_dict["column_name"]
+ colname = self.normalize_name(orig_colname)
+ coltype = row_dict["data_type"]
+ precision = maybe_int(row_dict["data_precision"])
+
+ if coltype == "NUMBER":
+ scale = maybe_int(row_dict["data_scale"])
+ if precision is None and scale == 0:
+ coltype = INTEGER()
+ else:
+ coltype = NUMBER(precision, scale)
+ elif coltype == "FLOAT":
+ # https://docs.oracle.com/cd/B14117_01/server.101/b10758/sqlqr06.htm
+ if precision == 126:
+ # The DOUBLE PRECISION datatype is a floating-point
+ # number with binary precision 126.
+ coltype = DOUBLE_PRECISION()
+ elif precision == 63:
+ # The REAL datatype is a floating-point number with a
+ # binary precision of 63, or 18 decimal.
+ coltype = REAL()
+ else:
+ # non standard precision
+ coltype = FLOAT(binary_precision=precision)
+
+ elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"):
+ char_length = maybe_int(row_dict["char_length"])
+ coltype = self.ischema_names.get(coltype)(char_length)
+ elif "WITH TIME ZONE" in coltype:
+ coltype = TIMESTAMP(timezone=True)
+ elif "WITH LOCAL TIME ZONE" in coltype:
+ coltype = TIMESTAMP(local_timezone=True)
+ else:
+ coltype = re.sub(remove_size, "", coltype)
+ try:
+ coltype = self.ischema_names[coltype]
+ except KeyError:
+ util.warn(
+ "Did not recognize type '%s' of column '%s'"
+ % (coltype, colname)
+ )
+ coltype = sqltypes.NULLTYPE
+
+ default = row_dict["data_default"]
+ if row_dict["virtual_column"] == "YES":
+ computed = dict(sqltext=default)
+ default = None
+ else:
+ computed = None
+
+ identity_options = row_dict["identity_options"]
+ if identity_options is not None:
+ identity = self._parse_identity_options(
+ identity_options, row_dict["default_on_null"]
+ )
+ default = None
+ else:
+ identity = None
+
+ cdict = {
+ "name": colname,
+ "type": coltype,
+ "nullable": row_dict["nullable"] == "Y",
+ "default": default,
+ "comment": row_dict["comments"],
+ }
+ if orig_colname.lower() == orig_colname:
+ cdict["quote"] = True
+ if computed is not None:
+ cdict["computed"] = computed
+ if identity is not None:
+ cdict["identity"] = identity
+
+ columns[(schema, table_name)].append(cdict)
+
+ # NOTE: default not needed since all tables have columns
+ # default = ReflectionDefaults.columns
+ # return (
+ # (key, value if value else default())
+ # for key, value in columns.items()
+ # )
+ return columns.items()
+
+ def _parse_identity_options(self, identity_options, default_on_null):
+ # identity_options is a string that starts with 'ALWAYS,' or
+ # 'BY DEFAULT,' and continues with
+ # START WITH: 1, INCREMENT BY: 1, MAX_VALUE: 123, MIN_VALUE: 1,
+ # CYCLE_FLAG: N, CACHE_SIZE: 1, ORDER_FLAG: N, SCALE_FLAG: N,
+ # EXTEND_FLAG: N, SESSION_FLAG: N, KEEP_VALUE: N
+ parts = [p.strip() for p in identity_options.split(",")]
+ identity = {
+ "always": parts[0] == "ALWAYS",
+ "on_null": default_on_null == "YES",
+ }
+
+ for part in parts[1:]:
+ option, value = part.split(":")
+ value = value.strip()
+
+ if "START WITH" in option:
+ identity["start"] = int(value)
+ elif "INCREMENT BY" in option:
+ identity["increment"] = int(value)
+ elif "MAX_VALUE" in option:
+ identity["maxvalue"] = int(value)
+ elif "MIN_VALUE" in option:
+ identity["minvalue"] = int(value)
+ elif "CYCLE_FLAG" in option:
+ identity["cycle"] = value == "Y"
+ elif "CACHE_SIZE" in option:
+ identity["cache"] = int(value)
+ elif "ORDER_FLAG" in option:
+ identity["order"] = value == "Y"
+ return identity
+
+ @reflection.cache
+ def get_table_comment(self, connection, table_name, schema=None, **kw):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link;
+ ``oracle_resolve_synonyms`` to resolve names to synonyms
+ """
+ data = self.get_multi_table_comment(
+ connection,
+ schema=schema,
+ filter_names=[table_name],
+ scope=ObjectScope.ANY,
+ kind=ObjectKind.ANY,
+ **kw,
+ )
+ return self._value_or_raise(data, table_name, schema)
+
+ @lru_cache()
+ def _comment_query(self, owner, scope, kind, has_filter_names):
+ # NOTE: all_tab_comments / all_mview_comments have a row for all
+ # object even if they don't have comments
+ queries = []
+ if ObjectKind.TABLE in kind or ObjectKind.VIEW in kind:
+ # all_tab_comments returns also plain views
+ tbl_view = select(
+ dictionary.all_tab_comments.c.table_name,
+ dictionary.all_tab_comments.c.comments,
+ ).where(
+ dictionary.all_tab_comments.c.owner == owner,
+ dictionary.all_tab_comments.c.table_name.not_like("BIN$%"),
+ )
+ if ObjectKind.VIEW not in kind:
+ tbl_view = tbl_view.where(
+ dictionary.all_tab_comments.c.table_type == "TABLE"
+ )
+ elif ObjectKind.TABLE not in kind:
+ tbl_view = tbl_view.where(
+ dictionary.all_tab_comments.c.table_type == "VIEW"
+ )
+ queries.append(tbl_view)
+ if ObjectKind.MATERIALIZED_VIEW in kind:
+ mat_view = select(
+ dictionary.all_mview_comments.c.mview_name.label("table_name"),
+ dictionary.all_mview_comments.c.comments,
+ ).where(
+ dictionary.all_mview_comments.c.owner == owner,
+ dictionary.all_mview_comments.c.mview_name.not_like("BIN$%"),
+ )
+ queries.append(mat_view)
+ if len(queries) == 1:
+ query = queries[0]
+ else:
+ union = sql.union_all(*queries).subquery("tables_and_views")
+ query = select(union.c.table_name, union.c.comments)
+
+ name_col = query.selected_columns.table_name
+
+ if scope in (ObjectScope.DEFAULT, ObjectScope.TEMPORARY):
+ temp = "Y" if scope is ObjectScope.TEMPORARY else "N"
+ # need distinct since materialized view are listed also
+ # as tables in all_objects
+ query = query.distinct().join(
+ dictionary.all_objects,
+ and_(
+ dictionary.all_objects.c.owner == owner,
+ dictionary.all_objects.c.object_name == name_col,
+ dictionary.all_objects.c.temporary == temp,
+ ),
+ )
+ if has_filter_names:
+ query = query.where(name_col.in_(bindparam("filter_names")))
+ return query
+
+ @_handle_synonyms_decorator
+ def get_multi_table_comment(
+ self,
+ connection,
+ *,
+ schema,
+ filter_names,
+ scope,
+ kind,
+ dblink=None,
+ **kw,
+ ):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link;
+ ``oracle_resolve_synonyms`` to resolve names to synonyms
+ """
+ owner = self.denormalize_schema_name(
+ schema or self.default_schema_name
+ )
+ has_filter_names, params = self._prepare_filter_names(filter_names)
+ query = self._comment_query(owner, scope, kind, has_filter_names)
+
+ result = self._execute_reflection(
+ connection, query, dblink, returns_long=False, params=params
+ )
+ default = ReflectionDefaults.table_comment
+ # materialized views by default seem to have a comment like
+ # "snapshot table for snapshot owner.mat_view_name"
+ ignore_mat_view = "snapshot table for snapshot "
+ return (
+ (
+ (schema, self.normalize_name(table)),
+ (
+ {"text": comment}
+ if comment is not None
+ and not comment.startswith(ignore_mat_view)
+ else default()
+ ),
+ )
+ for table, comment in result
+ )
+
+ @reflection.cache
+ def get_indexes(self, connection, table_name, schema=None, **kw):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link;
+ ``oracle_resolve_synonyms`` to resolve names to synonyms
+ """
+ data = self.get_multi_indexes(
+ connection,
+ schema=schema,
+ filter_names=[table_name],
+ scope=ObjectScope.ANY,
+ kind=ObjectKind.ANY,
+ **kw,
+ )
+ return self._value_or_raise(data, table_name, schema)
+
+ @lru_cache()
+ def _index_query(self, owner):
+ return (
+ select(
+ dictionary.all_ind_columns.c.table_name,
+ dictionary.all_ind_columns.c.index_name,
+ dictionary.all_ind_columns.c.column_name,
+ dictionary.all_indexes.c.index_type,
+ dictionary.all_indexes.c.uniqueness,
+ dictionary.all_indexes.c.compression,
+ dictionary.all_indexes.c.prefix_length,
+ dictionary.all_ind_columns.c.descend,
+ dictionary.all_ind_expressions.c.column_expression,
+ )
+ .select_from(dictionary.all_ind_columns)
+ .join(
+ dictionary.all_indexes,
+ sql.and_(
+ dictionary.all_ind_columns.c.index_name
+ == dictionary.all_indexes.c.index_name,
+ dictionary.all_ind_columns.c.index_owner
+ == dictionary.all_indexes.c.owner,
+ ),
+ )
+ .outerjoin(
+ # NOTE: this adds about 20% to the query time. Using a
+ # case expression with a scalar subquery only when needed
+ # with the assumption that most indexes are not expression
+ # would be faster but oracle does not like that with
+ # LONG datatype. It errors with:
+ # ORA-00997: illegal use of LONG datatype
+ dictionary.all_ind_expressions,
+ sql.and_(
+ dictionary.all_ind_expressions.c.index_name
+ == dictionary.all_ind_columns.c.index_name,
+ dictionary.all_ind_expressions.c.index_owner
+ == dictionary.all_ind_columns.c.index_owner,
+ dictionary.all_ind_expressions.c.column_position
+ == dictionary.all_ind_columns.c.column_position,
+ ),
+ )
+ .where(
+ dictionary.all_indexes.c.table_owner == owner,
+ dictionary.all_indexes.c.table_name.in_(
+ bindparam("all_objects")
+ ),
+ )
+ .order_by(
+ dictionary.all_ind_columns.c.index_name,
+ dictionary.all_ind_columns.c.column_position,
+ )
+ )
+
+ @reflection.flexi_cache(
+ ("schema", InternalTraversal.dp_string),
+ ("dblink", InternalTraversal.dp_string),
+ ("all_objects", InternalTraversal.dp_string_list),
+ )
+ def _get_indexes_rows(self, connection, schema, dblink, all_objects, **kw):
+ owner = self.denormalize_schema_name(
+ schema or self.default_schema_name
+ )
+
+ query = self._index_query(owner)
+
+ pks = {
+ row_dict["constraint_name"]
+ for row_dict in self._get_all_constraint_rows(
+ connection, schema, dblink, all_objects, **kw
+ )
+ if row_dict["constraint_type"] == "P"
+ }
+
+ # all_ind_expressions.column_expression is LONG
+ result = self._run_batches(
+ connection,
+ query,
+ dblink,
+ returns_long=True,
+ mappings=True,
+ all_objects=all_objects,
+ )
+
+ return [
+ row_dict
+ for row_dict in result
+ if row_dict["index_name"] not in pks
+ ]
+
+ @_handle_synonyms_decorator
+ def get_multi_indexes(
+ self,
+ connection,
+ *,
+ schema,
+ filter_names,
+ scope,
+ kind,
+ dblink=None,
+ **kw,
+ ):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link;
+ ``oracle_resolve_synonyms`` to resolve names to synonyms
+ """
+ all_objects = self._get_all_objects(
+ connection, schema, scope, kind, filter_names, dblink, **kw
+ )
+
+ uniqueness = {"NONUNIQUE": False, "UNIQUE": True}
+ enabled = {"DISABLED": False, "ENABLED": True}
+ is_bitmap = {"BITMAP", "FUNCTION-BASED BITMAP"}
+
+ indexes = defaultdict(dict)
+
+ for row_dict in self._get_indexes_rows(
+ connection, schema, dblink, all_objects, **kw
+ ):
+ index_name = self.normalize_name(row_dict["index_name"])
+ table_name = self.normalize_name(row_dict["table_name"])
+ table_indexes = indexes[(schema, table_name)]
+
+ if index_name not in table_indexes:
+ table_indexes[index_name] = index_dict = {
+ "name": index_name,
+ "column_names": [],
+ "dialect_options": {},
+ "unique": uniqueness.get(row_dict["uniqueness"], False),
+ }
+ do = index_dict["dialect_options"]
+ if row_dict["index_type"] in is_bitmap:
+ do["oracle_bitmap"] = True
+ if enabled.get(row_dict["compression"], False):
+ do["oracle_compress"] = row_dict["prefix_length"]
+
+ else:
+ index_dict = table_indexes[index_name]
+
+ expr = row_dict["column_expression"]
+ if expr is not None:
+ index_dict["column_names"].append(None)
+ if "expressions" in index_dict:
+ index_dict["expressions"].append(expr)
+ else:
+ index_dict["expressions"] = index_dict["column_names"][:-1]
+ index_dict["expressions"].append(expr)
+
+ if row_dict["descend"].lower() != "asc":
+ assert row_dict["descend"].lower() == "desc"
+ cs = index_dict.setdefault("column_sorting", {})
+ cs[expr] = ("desc",)
+ else:
+ assert row_dict["descend"].lower() == "asc"
+ cn = self.normalize_name(row_dict["column_name"])
+ index_dict["column_names"].append(cn)
+ if "expressions" in index_dict:
+ index_dict["expressions"].append(cn)
+
+ default = ReflectionDefaults.indexes
+
+ return (
+ (key, list(indexes[key].values()) if key in indexes else default())
+ for key in (
+ (schema, self.normalize_name(obj_name))
+ for obj_name in all_objects
+ )
+ )
+
+ @reflection.cache
+ def get_pk_constraint(self, connection, table_name, schema=None, **kw):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link;
+ ``oracle_resolve_synonyms`` to resolve names to synonyms
+ """
+ data = self.get_multi_pk_constraint(
+ connection,
+ schema=schema,
+ filter_names=[table_name],
+ scope=ObjectScope.ANY,
+ kind=ObjectKind.ANY,
+ **kw,
+ )
+ return self._value_or_raise(data, table_name, schema)
+
+ @lru_cache()
+ def _constraint_query(self, owner):
+ local = dictionary.all_cons_columns.alias("local")
+ remote = dictionary.all_cons_columns.alias("remote")
+ return (
+ select(
+ dictionary.all_constraints.c.table_name,
+ dictionary.all_constraints.c.constraint_type,
+ dictionary.all_constraints.c.constraint_name,
+ local.c.column_name.label("local_column"),
+ remote.c.table_name.label("remote_table"),
+ remote.c.column_name.label("remote_column"),
+ remote.c.owner.label("remote_owner"),
+ dictionary.all_constraints.c.search_condition,
+ dictionary.all_constraints.c.delete_rule,
+ )
+ .select_from(dictionary.all_constraints)
+ .join(
+ local,
+ and_(
+ local.c.owner == dictionary.all_constraints.c.owner,
+ dictionary.all_constraints.c.constraint_name
+ == local.c.constraint_name,
+ ),
+ )
+ .outerjoin(
+ remote,
+ and_(
+ dictionary.all_constraints.c.r_owner == remote.c.owner,
+ dictionary.all_constraints.c.r_constraint_name
+ == remote.c.constraint_name,
+ or_(
+ remote.c.position.is_(sql.null()),
+ local.c.position == remote.c.position,
+ ),
+ ),
+ )
+ .where(
+ dictionary.all_constraints.c.owner == owner,
+ dictionary.all_constraints.c.table_name.in_(
+ bindparam("all_objects")
+ ),
+ dictionary.all_constraints.c.constraint_type.in_(
+ ("R", "P", "U", "C")
+ ),
+ )
+ .order_by(
+ dictionary.all_constraints.c.constraint_name, local.c.position
+ )
+ )
+
+ @reflection.flexi_cache(
+ ("schema", InternalTraversal.dp_string),
+ ("dblink", InternalTraversal.dp_string),
+ ("all_objects", InternalTraversal.dp_string_list),
+ )
+ def _get_all_constraint_rows(
+ self, connection, schema, dblink, all_objects, **kw
+ ):
+ owner = self.denormalize_schema_name(
+ schema or self.default_schema_name
+ )
+ query = self._constraint_query(owner)
+
+ # since the result is cached a list must be created
+ values = list(
+ self._run_batches(
+ connection,
+ query,
+ dblink,
+ returns_long=False,
+ mappings=True,
+ all_objects=all_objects,
+ )
+ )
+ return values
+
+ @_handle_synonyms_decorator
+ def get_multi_pk_constraint(
+ self,
+ connection,
+ *,
+ scope,
+ schema,
+ filter_names,
+ kind,
+ dblink=None,
+ **kw,
+ ):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link;
+ ``oracle_resolve_synonyms`` to resolve names to synonyms
+ """
+ all_objects = self._get_all_objects(
+ connection, schema, scope, kind, filter_names, dblink, **kw
+ )
+
+ primary_keys = defaultdict(dict)
+ default = ReflectionDefaults.pk_constraint
+
+ for row_dict in self._get_all_constraint_rows(
+ connection, schema, dblink, all_objects, **kw
+ ):
+ if row_dict["constraint_type"] != "P":
+ continue
+ table_name = self.normalize_name(row_dict["table_name"])
+ constraint_name = self.normalize_name(row_dict["constraint_name"])
+ column_name = self.normalize_name(row_dict["local_column"])
+
+ table_pk = primary_keys[(schema, table_name)]
+ if not table_pk:
+ table_pk["name"] = constraint_name
+ table_pk["constrained_columns"] = [column_name]
+ else:
+ table_pk["constrained_columns"].append(column_name)
+
+ return (
+ (key, primary_keys[key] if key in primary_keys else default())
+ for key in (
+ (schema, self.normalize_name(obj_name))
+ for obj_name in all_objects
+ )
+ )
+
+ @reflection.cache
+ def get_foreign_keys(
+ self,
+ connection,
+ table_name,
+ schema=None,
+ **kw,
+ ):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link;
+ ``oracle_resolve_synonyms`` to resolve names to synonyms
+ """
+ data = self.get_multi_foreign_keys(
+ connection,
+ schema=schema,
+ filter_names=[table_name],
+ scope=ObjectScope.ANY,
+ kind=ObjectKind.ANY,
+ **kw,
+ )
+ return self._value_or_raise(data, table_name, schema)
+
+ @_handle_synonyms_decorator
+ def get_multi_foreign_keys(
+ self,
+ connection,
+ *,
+ scope,
+ schema,
+ filter_names,
+ kind,
+ dblink=None,
+ **kw,
+ ):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link;
+ ``oracle_resolve_synonyms`` to resolve names to synonyms
+ """
+ all_objects = self._get_all_objects(
+ connection, schema, scope, kind, filter_names, dblink, **kw
+ )
+
+ resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
+
+ owner = self.denormalize_schema_name(
+ schema or self.default_schema_name
+ )
+
+ all_remote_owners = set()
+ fkeys = defaultdict(dict)
+
+ for row_dict in self._get_all_constraint_rows(
+ connection, schema, dblink, all_objects, **kw
+ ):
+ if row_dict["constraint_type"] != "R":
+ continue
+
+ table_name = self.normalize_name(row_dict["table_name"])
+ constraint_name = self.normalize_name(row_dict["constraint_name"])
+ table_fkey = fkeys[(schema, table_name)]
+
+ assert constraint_name is not None
+
+ local_column = self.normalize_name(row_dict["local_column"])
+ remote_table = self.normalize_name(row_dict["remote_table"])
+ remote_column = self.normalize_name(row_dict["remote_column"])
+ remote_owner_orig = row_dict["remote_owner"]
+ remote_owner = self.normalize_name(remote_owner_orig)
+ if remote_owner_orig is not None:
+ all_remote_owners.add(remote_owner_orig)
+
+ if remote_table is None:
+ # ticket 363
+ if dblink and not dblink.startswith("@"):
+ dblink = f"@{dblink}"
+ util.warn(
+ "Got 'None' querying 'table_name' from "
+ f"all_cons_columns{dblink or ''} - does the user have "
+ "proper rights to the table?"
+ )
+ continue
+
+ if constraint_name not in table_fkey:
+ table_fkey[constraint_name] = fkey = {
+ "name": constraint_name,
+ "constrained_columns": [],
+ "referred_schema": None,
+ "referred_table": remote_table,
+ "referred_columns": [],
+ "options": {},
+ }
+
+ if resolve_synonyms:
+ # will be removed below
+ fkey["_ref_schema"] = remote_owner
+
+ if schema is not None or remote_owner_orig != owner:
+ fkey["referred_schema"] = remote_owner
+
+ delete_rule = row_dict["delete_rule"]
+ if delete_rule != "NO ACTION":
+ fkey["options"]["ondelete"] = delete_rule
+
+ else:
+ fkey = table_fkey[constraint_name]
+
+ fkey["constrained_columns"].append(local_column)
+ fkey["referred_columns"].append(remote_column)
+
+ if resolve_synonyms and all_remote_owners:
+ query = select(
+ dictionary.all_synonyms.c.owner,
+ dictionary.all_synonyms.c.table_name,
+ dictionary.all_synonyms.c.table_owner,
+ dictionary.all_synonyms.c.synonym_name,
+ ).where(dictionary.all_synonyms.c.owner.in_(all_remote_owners))
+
+ result = self._execute_reflection(
+ connection, query, dblink, returns_long=False
+ ).mappings()
+
+ remote_owners_lut = {}
+ for row in result:
+ synonym_owner = self.normalize_name(row["owner"])
+ table_name = self.normalize_name(row["table_name"])
+
+ remote_owners_lut[(synonym_owner, table_name)] = (
+ row["table_owner"],
+ row["synonym_name"],
+ )
+
+ empty = (None, None)
+ for table_fkeys in fkeys.values():
+ for table_fkey in table_fkeys.values():
+ key = (
+ table_fkey.pop("_ref_schema"),
+ table_fkey["referred_table"],
+ )
+ remote_owner, syn_name = remote_owners_lut.get(key, empty)
+ if syn_name:
+ sn = self.normalize_name(syn_name)
+ table_fkey["referred_table"] = sn
+ if schema is not None or remote_owner != owner:
+ ro = self.normalize_name(remote_owner)
+ table_fkey["referred_schema"] = ro
+ else:
+ table_fkey["referred_schema"] = None
+ default = ReflectionDefaults.foreign_keys
+
+ return (
+ (key, list(fkeys[key].values()) if key in fkeys else default())
+ for key in (
+ (schema, self.normalize_name(obj_name))
+ for obj_name in all_objects
+ )
+ )
+
+ @reflection.cache
+ def get_unique_constraints(
+ self, connection, table_name, schema=None, **kw
+ ):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link;
+ ``oracle_resolve_synonyms`` to resolve names to synonyms
+ """
+ data = self.get_multi_unique_constraints(
+ connection,
+ schema=schema,
+ filter_names=[table_name],
+ scope=ObjectScope.ANY,
+ kind=ObjectKind.ANY,
+ **kw,
+ )
+ return self._value_or_raise(data, table_name, schema)
+
+ @_handle_synonyms_decorator
+ def get_multi_unique_constraints(
+ self,
+ connection,
+ *,
+ scope,
+ schema,
+ filter_names,
+ kind,
+ dblink=None,
+ **kw,
+ ):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link;
+ ``oracle_resolve_synonyms`` to resolve names to synonyms
+ """
+ all_objects = self._get_all_objects(
+ connection, schema, scope, kind, filter_names, dblink, **kw
+ )
+
+ unique_cons = defaultdict(dict)
+
+ index_names = {
+ row_dict["index_name"]
+ for row_dict in self._get_indexes_rows(
+ connection, schema, dblink, all_objects, **kw
+ )
+ }
+
+ for row_dict in self._get_all_constraint_rows(
+ connection, schema, dblink, all_objects, **kw
+ ):
+ if row_dict["constraint_type"] != "U":
+ continue
+ table_name = self.normalize_name(row_dict["table_name"])
+ constraint_name_orig = row_dict["constraint_name"]
+ constraint_name = self.normalize_name(constraint_name_orig)
+ column_name = self.normalize_name(row_dict["local_column"])
+ table_uc = unique_cons[(schema, table_name)]
+
+ assert constraint_name is not None
+
+ if constraint_name not in table_uc:
+ table_uc[constraint_name] = uc = {
+ "name": constraint_name,
+ "column_names": [],
+ "duplicates_index": (
+ constraint_name
+ if constraint_name_orig in index_names
+ else None
+ ),
+ }
+ else:
+ uc = table_uc[constraint_name]
+
+ uc["column_names"].append(column_name)
+
+ default = ReflectionDefaults.unique_constraints
+
+ return (
+ (
+ key,
+ (
+ list(unique_cons[key].values())
+ if key in unique_cons
+ else default()
+ ),
+ )
+ for key in (
+ (schema, self.normalize_name(obj_name))
+ for obj_name in all_objects
+ )
+ )
+
+ @reflection.cache
+ def get_view_definition(
+ self,
+ connection,
+ view_name,
+ schema=None,
+ dblink=None,
+ **kw,
+ ):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link;
+ ``oracle_resolve_synonyms`` to resolve names to synonyms
+ """
+ if kw.get("oracle_resolve_synonyms", False):
+ synonyms = self._get_synonyms(
+ connection, schema, filter_names=[view_name], dblink=dblink
+ )
+ if synonyms:
+ assert len(synonyms) == 1
+ row_dict = synonyms[0]
+ dblink = self.normalize_name(row_dict["db_link"])
+ schema = row_dict["table_owner"]
+ view_name = row_dict["table_name"]
+
+ name = self.denormalize_name(view_name)
+ owner = self.denormalize_schema_name(
+ schema or self.default_schema_name
+ )
+ query = (
+ select(dictionary.all_views.c.text)
+ .where(
+ dictionary.all_views.c.view_name == name,
+ dictionary.all_views.c.owner == owner,
+ )
+ .union_all(
+ select(dictionary.all_mviews.c.query).where(
+ dictionary.all_mviews.c.mview_name == name,
+ dictionary.all_mviews.c.owner == owner,
+ )
+ )
+ )
+
+ rp = self._execute_reflection(
+ connection, query, dblink, returns_long=False
+ ).scalar()
+ if rp is None:
+ raise exc.NoSuchTableError(
+ f"{schema}.{view_name}" if schema else view_name
+ )
+ else:
+ return rp
+
+ @reflection.cache
+ def get_check_constraints(
+ self, connection, table_name, schema=None, include_all=False, **kw
+ ):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link;
+ ``oracle_resolve_synonyms`` to resolve names to synonyms
+ """
+ data = self.get_multi_check_constraints(
+ connection,
+ schema=schema,
+ filter_names=[table_name],
+ scope=ObjectScope.ANY,
+ include_all=include_all,
+ kind=ObjectKind.ANY,
+ **kw,
+ )
+ return self._value_or_raise(data, table_name, schema)
+
+ @_handle_synonyms_decorator
+ def get_multi_check_constraints(
+ self,
+ connection,
+ *,
+ schema,
+ filter_names,
+ dblink=None,
+ scope,
+ kind,
+ include_all=False,
+ **kw,
+ ):
+ """Supported kw arguments are: ``dblink`` to reflect via a db link;
+ ``oracle_resolve_synonyms`` to resolve names to synonyms
+ """
+ all_objects = self._get_all_objects(
+ connection, schema, scope, kind, filter_names, dblink, **kw
+ )
+
+ not_null = re.compile(r"..+?. IS NOT NULL$")
+
+ check_constraints = defaultdict(list)
+
+ for row_dict in self._get_all_constraint_rows(
+ connection, schema, dblink, all_objects, **kw
+ ):
+ if row_dict["constraint_type"] != "C":
+ continue
+ table_name = self.normalize_name(row_dict["table_name"])
+ constraint_name = self.normalize_name(row_dict["constraint_name"])
+ search_condition = row_dict["search_condition"]
+
+ table_checks = check_constraints[(schema, table_name)]
+ if constraint_name is not None and (
+ include_all or not not_null.match(search_condition)
+ ):
+ table_checks.append(
+ {"name": constraint_name, "sqltext": search_condition}
+ )
+
+ default = ReflectionDefaults.check_constraints
+
+ return (
+ (
+ key,
+ (
+ check_constraints[key]
+ if key in check_constraints
+ else default()
+ ),
+ )
+ for key in (
+ (schema, self.normalize_name(obj_name))
+ for obj_name in all_objects
+ )
+ )
+
+ def _list_dblinks(self, connection, dblink=None):
+ query = select(dictionary.all_db_links.c.db_link)
+ links = self._execute_reflection(
+ connection, query, dblink, returns_long=False
+ ).scalars()
+ return [self.normalize_name(link) for link in links]
+
+
+class _OuterJoinColumn(sql.ClauseElement):
+ __visit_name__ = "outer_join_column"
+
+ def __init__(self, column):
+ self.column = column