diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py')
-rw-r--r-- | venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py | 3240 |
1 files changed, 3240 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py new file mode 100644 index 0000000..a548b34 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py @@ -0,0 +1,3240 @@ +# dialects/oracle/base.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + + +r""" +.. dialect:: oracle + :name: Oracle + :full_support: 18c + :normal_support: 11+ + :best_effort: 9+ + + +Auto Increment Behavior +----------------------- + +SQLAlchemy Table objects which include integer primary keys are usually +assumed to have "autoincrementing" behavior, meaning they can generate their +own primary key values upon INSERT. For use within Oracle, two options are +available, which are the use of IDENTITY columns (Oracle 12 and above only) +or the association of a SEQUENCE with the column. + +Specifying GENERATED AS IDENTITY (Oracle 12 and above) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Starting from version 12 Oracle can make use of identity columns using +the :class:`_sql.Identity` to specify the autoincrementing behavior:: + + t = Table('mytable', metadata, + Column('id', Integer, Identity(start=3), primary_key=True), + Column(...), ... + ) + +The CREATE TABLE for the above :class:`_schema.Table` object would be: + +.. sourcecode:: sql + + CREATE TABLE mytable ( + id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 3), + ..., + PRIMARY KEY (id) + ) + +The :class:`_schema.Identity` object support many options to control the +"autoincrementing" behavior of the column, like the starting value, the +incrementing value, etc. +In addition to the standard options, Oracle supports setting +:paramref:`_schema.Identity.always` to ``None`` to use the default +generated mode, rendering GENERATED AS IDENTITY in the DDL. It also supports +setting :paramref:`_schema.Identity.on_null` to ``True`` to specify ON NULL +in conjunction with a 'BY DEFAULT' identity column. + +Using a SEQUENCE (all Oracle versions) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Older version of Oracle had no "autoincrement" +feature, SQLAlchemy relies upon sequences to produce these values. With the +older Oracle versions, *a sequence must always be explicitly specified to +enable autoincrement*. This is divergent with the majority of documentation +examples which assume the usage of an autoincrement-capable database. To +specify sequences, use the sqlalchemy.schema.Sequence object which is passed +to a Column construct:: + + t = Table('mytable', metadata, + Column('id', Integer, Sequence('id_seq', start=1), primary_key=True), + Column(...), ... + ) + +This step is also required when using table reflection, i.e. autoload_with=engine:: + + t = Table('mytable', metadata, + Column('id', Integer, Sequence('id_seq', start=1), primary_key=True), + autoload_with=engine + ) + +.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct + in a :class:`_schema.Column` to specify the option of an autoincrementing + column. + +.. _oracle_isolation_level: + +Transaction Isolation Level / Autocommit +---------------------------------------- + +The Oracle database supports "READ COMMITTED" and "SERIALIZABLE" modes of +isolation. The AUTOCOMMIT isolation level is also supported by the cx_Oracle +dialect. + +To set using per-connection execution options:: + + connection = engine.connect() + connection = connection.execution_options( + isolation_level="AUTOCOMMIT" + ) + +For ``READ COMMITTED`` and ``SERIALIZABLE``, the Oracle dialect sets the +level at the session level using ``ALTER SESSION``, which is reverted back +to its default setting when the connection is returned to the connection +pool. + +Valid values for ``isolation_level`` include: + +* ``READ COMMITTED`` +* ``AUTOCOMMIT`` +* ``SERIALIZABLE`` + +.. note:: The implementation for the + :meth:`_engine.Connection.get_isolation_level` method as implemented by the + Oracle dialect necessarily forces the start of a transaction using the + Oracle LOCAL_TRANSACTION_ID function; otherwise no level is normally + readable. + + Additionally, the :meth:`_engine.Connection.get_isolation_level` method will + raise an exception if the ``v$transaction`` view is not available due to + permissions or other reasons, which is a common occurrence in Oracle + installations. + + The cx_Oracle dialect attempts to call the + :meth:`_engine.Connection.get_isolation_level` method when the dialect makes + its first connection to the database in order to acquire the + "default"isolation level. This default level is necessary so that the level + can be reset on a connection after it has been temporarily modified using + :meth:`_engine.Connection.execution_options` method. In the common event + that the :meth:`_engine.Connection.get_isolation_level` method raises an + exception due to ``v$transaction`` not being readable as well as any other + database-related failure, the level is assumed to be "READ COMMITTED". No + warning is emitted for this initial first-connect condition as it is + expected to be a common restriction on Oracle databases. + +.. versionadded:: 1.3.16 added support for AUTOCOMMIT to the cx_oracle dialect + as well as the notion of a default isolation level + +.. versionadded:: 1.3.21 Added support for SERIALIZABLE as well as live + reading of the isolation level. + +.. versionchanged:: 1.3.22 In the event that the default isolation + level cannot be read due to permissions on the v$transaction view as + is common in Oracle installations, the default isolation level is hardcoded + to "READ COMMITTED" which was the behavior prior to 1.3.21. + +.. seealso:: + + :ref:`dbapi_autocommit` + +Identifier Casing +----------------- + +In Oracle, the data dictionary represents all case insensitive identifier +names using UPPERCASE text. SQLAlchemy on the other hand considers an +all-lower case identifier name to be case insensitive. The Oracle dialect +converts all case insensitive identifiers to and from those two formats during +schema level communication, such as reflection of tables and indexes. Using +an UPPERCASE name on the SQLAlchemy side indicates a case sensitive +identifier, and SQLAlchemy will quote the name - this will cause mismatches +against data dictionary data received from Oracle, so unless identifier names +have been truly created as case sensitive (i.e. using quoted names), all +lowercase names should be used on the SQLAlchemy side. + +.. _oracle_max_identifier_lengths: + +Max Identifier Lengths +---------------------- + +Oracle has changed the default max identifier length as of Oracle Server +version 12.2. Prior to this version, the length was 30, and for 12.2 and +greater it is now 128. This change impacts SQLAlchemy in the area of +generated SQL label names as well as the generation of constraint names, +particularly in the case where the constraint naming convention feature +described at :ref:`constraint_naming_conventions` is being used. + +To assist with this change and others, Oracle includes the concept of a +"compatibility" version, which is a version number that is independent of the +actual server version in order to assist with migration of Oracle databases, +and may be configured within the Oracle server itself. This compatibility +version is retrieved using the query ``SELECT value FROM v$parameter WHERE +name = 'compatible';``. The SQLAlchemy Oracle dialect, when tasked with +determining the default max identifier length, will attempt to use this query +upon first connect in order to determine the effective compatibility version of +the server, which determines what the maximum allowed identifier length is for +the server. If the table is not available, the server version information is +used instead. + +As of SQLAlchemy 1.4, the default max identifier length for the Oracle dialect +is 128 characters. Upon first connect, the compatibility version is detected +and if it is less than Oracle version 12.2, the max identifier length is +changed to be 30 characters. In all cases, setting the +:paramref:`_sa.create_engine.max_identifier_length` parameter will bypass this +change and the value given will be used as is:: + + engine = create_engine( + "oracle+cx_oracle://scott:tiger@oracle122", + max_identifier_length=30) + +The maximum identifier length comes into play both when generating anonymized +SQL labels in SELECT statements, but more crucially when generating constraint +names from a naming convention. It is this area that has created the need for +SQLAlchemy to change this default conservatively. For example, the following +naming convention produces two very different constraint names based on the +identifier length:: + + from sqlalchemy import Column + from sqlalchemy import Index + from sqlalchemy import Integer + from sqlalchemy import MetaData + from sqlalchemy import Table + from sqlalchemy.dialects import oracle + from sqlalchemy.schema import CreateIndex + + m = MetaData(naming_convention={"ix": "ix_%(column_0N_name)s"}) + + t = Table( + "t", + m, + Column("some_column_name_1", Integer), + Column("some_column_name_2", Integer), + Column("some_column_name_3", Integer), + ) + + ix = Index( + None, + t.c.some_column_name_1, + t.c.some_column_name_2, + t.c.some_column_name_3, + ) + + oracle_dialect = oracle.dialect(max_identifier_length=30) + print(CreateIndex(ix).compile(dialect=oracle_dialect)) + +With an identifier length of 30, the above CREATE INDEX looks like:: + + CREATE INDEX ix_some_column_name_1s_70cd ON t + (some_column_name_1, some_column_name_2, some_column_name_3) + +However with length=128, it becomes:: + + CREATE INDEX ix_some_column_name_1some_column_name_2some_column_name_3 ON t + (some_column_name_1, some_column_name_2, some_column_name_3) + +Applications which have run versions of SQLAlchemy prior to 1.4 on an Oracle +server version 12.2 or greater are therefore subject to the scenario of a +database migration that wishes to "DROP CONSTRAINT" on a name that was +previously generated with the shorter length. This migration will fail when +the identifier length is changed without the name of the index or constraint +first being adjusted. Such applications are strongly advised to make use of +:paramref:`_sa.create_engine.max_identifier_length` +in order to maintain control +of the generation of truncated names, and to fully review and test all database +migrations in a staging environment when changing this value to ensure that the +impact of this change has been mitigated. + +.. versionchanged:: 1.4 the default max_identifier_length for Oracle is 128 + characters, which is adjusted down to 30 upon first connect if an older + version of Oracle server (compatibility version < 12.2) is detected. + + +LIMIT/OFFSET/FETCH Support +-------------------------- + +Methods like :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` make +use of ``FETCH FIRST N ROW / OFFSET N ROWS`` syntax assuming +Oracle 12c or above, and assuming the SELECT statement is not embedded within +a compound statement like UNION. This syntax is also available directly by using +the :meth:`_sql.Select.fetch` method. + +.. versionchanged:: 2.0 the Oracle dialect now uses + ``FETCH FIRST N ROW / OFFSET N ROWS`` for all + :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` usage including + within the ORM and legacy :class:`_orm.Query`. To force the legacy + behavior using window functions, specify the ``enable_offset_fetch=False`` + dialect parameter to :func:`_sa.create_engine`. + +The use of ``FETCH FIRST / OFFSET`` may be disabled on any Oracle version +by passing ``enable_offset_fetch=False`` to :func:`_sa.create_engine`, which +will force the use of "legacy" mode that makes use of window functions. +This mode is also selected automatically when using a version of Oracle +prior to 12c. + +When using legacy mode, or when a :class:`.Select` statement +with limit/offset is embedded in a compound statement, an emulated approach for +LIMIT / OFFSET based on window functions is used, which involves creation of a +subquery using ``ROW_NUMBER`` that is prone to performance issues as well as +SQL construction issues for complex statements. However, this approach is +supported by all Oracle versions. See notes below. + +Notes on LIMIT / OFFSET emulation (when fetch() method cannot be used) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +If using :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset`, or with the +ORM the :meth:`_orm.Query.limit` and :meth:`_orm.Query.offset` methods on an +Oracle version prior to 12c, the following notes apply: + +* SQLAlchemy currently makes use of ROWNUM to achieve + LIMIT/OFFSET; the exact methodology is taken from + https://blogs.oracle.com/oraclemagazine/on-rownum-and-limiting-results . + +* the "FIRST_ROWS()" optimization keyword is not used by default. To enable + the usage of this optimization directive, specify ``optimize_limits=True`` + to :func:`_sa.create_engine`. + + .. versionchanged:: 1.4 + The Oracle dialect renders limit/offset integer values using a "post + compile" scheme which renders the integer directly before passing the + statement to the cursor for execution. The ``use_binds_for_limits`` flag + no longer has an effect. + + .. seealso:: + + :ref:`change_4808`. + +.. _oracle_returning: + +RETURNING Support +----------------- + +The Oracle database supports RETURNING fully for INSERT, UPDATE and DELETE +statements that are invoked with a single collection of bound parameters +(that is, a ``cursor.execute()`` style statement; SQLAlchemy does not generally +support RETURNING with :term:`executemany` statements). Multiple rows may be +returned as well. + +.. versionchanged:: 2.0 the Oracle backend has full support for RETURNING + on parity with other backends. + + + +ON UPDATE CASCADE +----------------- + +Oracle doesn't have native ON UPDATE CASCADE functionality. A trigger based +solution is available at +https://asktom.oracle.com/tkyte/update_cascade/index.html . + +When using the SQLAlchemy ORM, the ORM has limited ability to manually issue +cascading updates - specify ForeignKey objects using the +"deferrable=True, initially='deferred'" keyword arguments, +and specify "passive_updates=False" on each relationship(). + +Oracle 8 Compatibility +---------------------- + +.. warning:: The status of Oracle 8 compatibility is not known for SQLAlchemy + 2.0. + +When Oracle 8 is detected, the dialect internally configures itself to the +following behaviors: + +* the use_ansi flag is set to False. This has the effect of converting all + JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN + makes use of Oracle's (+) operator. + +* the NVARCHAR2 and NCLOB datatypes are no longer generated as DDL when + the :class:`~sqlalchemy.types.Unicode` is used - VARCHAR2 and CLOB are issued + instead. This because these types don't seem to work correctly on Oracle 8 + even though they are available. The :class:`~sqlalchemy.types.NVARCHAR` and + :class:`~sqlalchemy.dialects.oracle.NCLOB` types will always generate + NVARCHAR2 and NCLOB. + + +Synonym/DBLINK Reflection +------------------------- + +When using reflection with Table objects, the dialect can optionally search +for tables indicated by synonyms, either in local or remote schemas or +accessed over DBLINK, by passing the flag ``oracle_resolve_synonyms=True`` as +a keyword argument to the :class:`_schema.Table` construct:: + + some_table = Table('some_table', autoload_with=some_engine, + oracle_resolve_synonyms=True) + +When this flag is set, the given name (such as ``some_table`` above) will +be searched not just in the ``ALL_TABLES`` view, but also within the +``ALL_SYNONYMS`` view to see if this name is actually a synonym to another +name. If the synonym is located and refers to a DBLINK, the oracle dialect +knows how to locate the table's information using DBLINK syntax(e.g. +``@dblink``). + +``oracle_resolve_synonyms`` is accepted wherever reflection arguments are +accepted, including methods such as :meth:`_schema.MetaData.reflect` and +:meth:`_reflection.Inspector.get_columns`. + +If synonyms are not in use, this flag should be left disabled. + +.. _oracle_constraint_reflection: + +Constraint Reflection +--------------------- + +The Oracle dialect can return information about foreign key, unique, and +CHECK constraints, as well as indexes on tables. + +Raw information regarding these constraints can be acquired using +:meth:`_reflection.Inspector.get_foreign_keys`, +:meth:`_reflection.Inspector.get_unique_constraints`, +:meth:`_reflection.Inspector.get_check_constraints`, and +:meth:`_reflection.Inspector.get_indexes`. + +.. versionchanged:: 1.2 The Oracle dialect can now reflect UNIQUE and + CHECK constraints. + +When using reflection at the :class:`_schema.Table` level, the +:class:`_schema.Table` +will also include these constraints. + +Note the following caveats: + +* When using the :meth:`_reflection.Inspector.get_check_constraints` method, + Oracle + builds a special "IS NOT NULL" constraint for columns that specify + "NOT NULL". This constraint is **not** returned by default; to include + the "IS NOT NULL" constraints, pass the flag ``include_all=True``:: + + from sqlalchemy import create_engine, inspect + + engine = create_engine("oracle+cx_oracle://s:t@dsn") + inspector = inspect(engine) + all_check_constraints = inspector.get_check_constraints( + "some_table", include_all=True) + +* in most cases, when reflecting a :class:`_schema.Table`, + a UNIQUE constraint will + **not** be available as a :class:`.UniqueConstraint` object, as Oracle + mirrors unique constraints with a UNIQUE index in most cases (the exception + seems to be when two or more unique constraints represent the same columns); + the :class:`_schema.Table` will instead represent these using + :class:`.Index` + with the ``unique=True`` flag set. + +* Oracle creates an implicit index for the primary key of a table; this index + is **excluded** from all index results. + +* the list of columns reflected for an index will not include column names + that start with SYS_NC. + +Table names with SYSTEM/SYSAUX tablespaces +------------------------------------------- + +The :meth:`_reflection.Inspector.get_table_names` and +:meth:`_reflection.Inspector.get_temp_table_names` +methods each return a list of table names for the current engine. These methods +are also part of the reflection which occurs within an operation such as +:meth:`_schema.MetaData.reflect`. By default, +these operations exclude the ``SYSTEM`` +and ``SYSAUX`` tablespaces from the operation. In order to change this, the +default list of tablespaces excluded can be changed at the engine level using +the ``exclude_tablespaces`` parameter:: + + # exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM + e = create_engine( + "oracle+cx_oracle://scott:tiger@xe", + exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"]) + +DateTime Compatibility +---------------------- + +Oracle has no datatype known as ``DATETIME``, it instead has only ``DATE``, +which can actually store a date and time value. For this reason, the Oracle +dialect provides a type :class:`_oracle.DATE` which is a subclass of +:class:`.DateTime`. This type has no special behavior, and is only +present as a "marker" for this type; additionally, when a database column +is reflected and the type is reported as ``DATE``, the time-supporting +:class:`_oracle.DATE` type is used. + +.. _oracle_table_options: + +Oracle Table Options +------------------------- + +The CREATE TABLE phrase supports the following options with Oracle +in conjunction with the :class:`_schema.Table` construct: + + +* ``ON COMMIT``:: + + Table( + "some_table", metadata, ..., + prefixes=['GLOBAL TEMPORARY'], oracle_on_commit='PRESERVE ROWS') + +* ``COMPRESS``:: + + Table('mytable', metadata, Column('data', String(32)), + oracle_compress=True) + + Table('mytable', metadata, Column('data', String(32)), + oracle_compress=6) + + The ``oracle_compress`` parameter accepts either an integer compression + level, or ``True`` to use the default compression level. + +.. _oracle_index_options: + +Oracle Specific Index Options +----------------------------- + +Bitmap Indexes +~~~~~~~~~~~~~~ + +You can specify the ``oracle_bitmap`` parameter to create a bitmap index +instead of a B-tree index:: + + Index('my_index', my_table.c.data, oracle_bitmap=True) + +Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will not +check for such limitations, only the database will. + +Index compression +~~~~~~~~~~~~~~~~~ + +Oracle has a more efficient storage mode for indexes containing lots of +repeated values. Use the ``oracle_compress`` parameter to turn on key +compression:: + + Index('my_index', my_table.c.data, oracle_compress=True) + + Index('my_index', my_table.c.data1, my_table.c.data2, unique=True, + oracle_compress=1) + +The ``oracle_compress`` parameter accepts either an integer specifying the +number of prefix columns to compress, or ``True`` to use the default (all +columns for non-unique indexes, all but the last column for unique indexes). + +""" # noqa + +from __future__ import annotations + +from collections import defaultdict +from functools import lru_cache +from functools import wraps +import re + +from . import dictionary +from .types import _OracleBoolean +from .types import _OracleDate +from .types import BFILE +from .types import BINARY_DOUBLE +from .types import BINARY_FLOAT +from .types import DATE +from .types import FLOAT +from .types import INTERVAL +from .types import LONG +from .types import NCLOB +from .types import NUMBER +from .types import NVARCHAR2 # noqa +from .types import OracleRaw # noqa +from .types import RAW +from .types import ROWID # noqa +from .types import TIMESTAMP +from .types import VARCHAR2 # noqa +from ... import Computed +from ... import exc +from ... import schema as sa_schema +from ... import sql +from ... import util +from ...engine import default +from ...engine import ObjectKind +from ...engine import ObjectScope +from ...engine import reflection +from ...engine.reflection import ReflectionDefaults +from ...sql import and_ +from ...sql import bindparam +from ...sql import compiler +from ...sql import expression +from ...sql import func +from ...sql import null +from ...sql import or_ +from ...sql import select +from ...sql import sqltypes +from ...sql import util as sql_util +from ...sql import visitors +from ...sql.visitors import InternalTraversal +from ...types import BLOB +from ...types import CHAR +from ...types import CLOB +from ...types import DOUBLE_PRECISION +from ...types import INTEGER +from ...types import NCHAR +from ...types import NVARCHAR +from ...types import REAL +from ...types import VARCHAR + +RESERVED_WORDS = set( + "SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN " + "DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED " + "ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE " + "ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE " + "BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES " + "AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS " + "NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER " + "CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR " + "DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL".split() +) + +NO_ARG_FNS = set( + "UID CURRENT_DATE SYSDATE USER CURRENT_TIME CURRENT_TIMESTAMP".split() +) + + +colspecs = { + sqltypes.Boolean: _OracleBoolean, + sqltypes.Interval: INTERVAL, + sqltypes.DateTime: DATE, + sqltypes.Date: _OracleDate, +} + +ischema_names = { + "VARCHAR2": VARCHAR, + "NVARCHAR2": NVARCHAR, + "CHAR": CHAR, + "NCHAR": NCHAR, + "DATE": DATE, + "NUMBER": NUMBER, + "BLOB": BLOB, + "BFILE": BFILE, + "CLOB": CLOB, + "NCLOB": NCLOB, + "TIMESTAMP": TIMESTAMP, + "TIMESTAMP WITH TIME ZONE": TIMESTAMP, + "TIMESTAMP WITH LOCAL TIME ZONE": TIMESTAMP, + "INTERVAL DAY TO SECOND": INTERVAL, + "RAW": RAW, + "FLOAT": FLOAT, + "DOUBLE PRECISION": DOUBLE_PRECISION, + "REAL": REAL, + "LONG": LONG, + "BINARY_DOUBLE": BINARY_DOUBLE, + "BINARY_FLOAT": BINARY_FLOAT, + "ROWID": ROWID, +} + + +class OracleTypeCompiler(compiler.GenericTypeCompiler): + # Note: + # Oracle DATE == DATETIME + # Oracle does not allow milliseconds in DATE + # Oracle does not support TIME columns + + def visit_datetime(self, type_, **kw): + return self.visit_DATE(type_, **kw) + + def visit_float(self, type_, **kw): + return self.visit_FLOAT(type_, **kw) + + def visit_double(self, type_, **kw): + return self.visit_DOUBLE_PRECISION(type_, **kw) + + def visit_unicode(self, type_, **kw): + if self.dialect._use_nchar_for_unicode: + return self.visit_NVARCHAR2(type_, **kw) + else: + return self.visit_VARCHAR2(type_, **kw) + + def visit_INTERVAL(self, type_, **kw): + return "INTERVAL DAY%s TO SECOND%s" % ( + type_.day_precision is not None + and "(%d)" % type_.day_precision + or "", + type_.second_precision is not None + and "(%d)" % type_.second_precision + or "", + ) + + def visit_LONG(self, type_, **kw): + return "LONG" + + def visit_TIMESTAMP(self, type_, **kw): + if getattr(type_, "local_timezone", False): + return "TIMESTAMP WITH LOCAL TIME ZONE" + elif type_.timezone: + return "TIMESTAMP WITH TIME ZONE" + else: + return "TIMESTAMP" + + def visit_DOUBLE_PRECISION(self, type_, **kw): + return self._generate_numeric(type_, "DOUBLE PRECISION", **kw) + + def visit_BINARY_DOUBLE(self, type_, **kw): + return self._generate_numeric(type_, "BINARY_DOUBLE", **kw) + + def visit_BINARY_FLOAT(self, type_, **kw): + return self._generate_numeric(type_, "BINARY_FLOAT", **kw) + + def visit_FLOAT(self, type_, **kw): + kw["_requires_binary_precision"] = True + return self._generate_numeric(type_, "FLOAT", **kw) + + def visit_NUMBER(self, type_, **kw): + return self._generate_numeric(type_, "NUMBER", **kw) + + def _generate_numeric( + self, + type_, + name, + precision=None, + scale=None, + _requires_binary_precision=False, + **kw, + ): + if precision is None: + precision = getattr(type_, "precision", None) + + if _requires_binary_precision: + binary_precision = getattr(type_, "binary_precision", None) + + if precision and binary_precision is None: + # https://www.oracletutorial.com/oracle-basics/oracle-float/ + estimated_binary_precision = int(precision / 0.30103) + raise exc.ArgumentError( + "Oracle FLOAT types use 'binary precision', which does " + "not convert cleanly from decimal 'precision'. Please " + "specify " + f"this type with a separate Oracle variant, such as " + f"{type_.__class__.__name__}(precision={precision})." + f"with_variant(oracle.FLOAT" + f"(binary_precision=" + f"{estimated_binary_precision}), 'oracle'), so that the " + "Oracle specific 'binary_precision' may be specified " + "accurately." + ) + else: + precision = binary_precision + + if scale is None: + scale = getattr(type_, "scale", None) + + if precision is None: + return name + elif scale is None: + n = "%(name)s(%(precision)s)" + return n % {"name": name, "precision": precision} + else: + n = "%(name)s(%(precision)s, %(scale)s)" + return n % {"name": name, "precision": precision, "scale": scale} + + def visit_string(self, type_, **kw): + return self.visit_VARCHAR2(type_, **kw) + + def visit_VARCHAR2(self, type_, **kw): + return self._visit_varchar(type_, "", "2") + + def visit_NVARCHAR2(self, type_, **kw): + return self._visit_varchar(type_, "N", "2") + + visit_NVARCHAR = visit_NVARCHAR2 + + def visit_VARCHAR(self, type_, **kw): + return self._visit_varchar(type_, "", "") + + def _visit_varchar(self, type_, n, num): + if not type_.length: + return "%(n)sVARCHAR%(two)s" % {"two": num, "n": n} + elif not n and self.dialect._supports_char_length: + varchar = "VARCHAR%(two)s(%(length)s CHAR)" + return varchar % {"length": type_.length, "two": num} + else: + varchar = "%(n)sVARCHAR%(two)s(%(length)s)" + return varchar % {"length": type_.length, "two": num, "n": n} + + def visit_text(self, type_, **kw): + return self.visit_CLOB(type_, **kw) + + def visit_unicode_text(self, type_, **kw): + if self.dialect._use_nchar_for_unicode: + return self.visit_NCLOB(type_, **kw) + else: + return self.visit_CLOB(type_, **kw) + + def visit_large_binary(self, type_, **kw): + return self.visit_BLOB(type_, **kw) + + def visit_big_integer(self, type_, **kw): + return self.visit_NUMBER(type_, precision=19, **kw) + + def visit_boolean(self, type_, **kw): + return self.visit_SMALLINT(type_, **kw) + + def visit_RAW(self, type_, **kw): + if type_.length: + return "RAW(%(length)s)" % {"length": type_.length} + else: + return "RAW" + + def visit_ROWID(self, type_, **kw): + return "ROWID" + + +class OracleCompiler(compiler.SQLCompiler): + """Oracle compiler modifies the lexical structure of Select + statements to work under non-ANSI configured Oracle databases, if + the use_ansi flag is False. + """ + + compound_keywords = util.update_copy( + compiler.SQLCompiler.compound_keywords, + {expression.CompoundSelect.EXCEPT: "MINUS"}, + ) + + def __init__(self, *args, **kwargs): + self.__wheres = {} + super().__init__(*args, **kwargs) + + def visit_mod_binary(self, binary, operator, **kw): + return "mod(%s, %s)" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + def visit_now_func(self, fn, **kw): + return "CURRENT_TIMESTAMP" + + def visit_char_length_func(self, fn, **kw): + return "LENGTH" + self.function_argspec(fn, **kw) + + def visit_match_op_binary(self, binary, operator, **kw): + return "CONTAINS (%s, %s)" % ( + self.process(binary.left), + self.process(binary.right), + ) + + def visit_true(self, expr, **kw): + return "1" + + def visit_false(self, expr, **kw): + return "0" + + def get_cte_preamble(self, recursive): + return "WITH" + + def get_select_hint_text(self, byfroms): + return " ".join("/*+ %s */" % text for table, text in byfroms.items()) + + def function_argspec(self, fn, **kw): + if len(fn.clauses) > 0 or fn.name.upper() not in NO_ARG_FNS: + return compiler.SQLCompiler.function_argspec(self, fn, **kw) + else: + return "" + + def visit_function(self, func, **kw): + text = super().visit_function(func, **kw) + if kw.get("asfrom", False): + text = "TABLE (%s)" % text + return text + + def visit_table_valued_column(self, element, **kw): + text = super().visit_table_valued_column(element, **kw) + text = text + ".COLUMN_VALUE" + return text + + def default_from(self): + """Called when a ``SELECT`` statement has no froms, + and no ``FROM`` clause is to be appended. + + The Oracle compiler tacks a "FROM DUAL" to the statement. + """ + + return " FROM DUAL" + + def visit_join(self, join, from_linter=None, **kwargs): + if self.dialect.use_ansi: + return compiler.SQLCompiler.visit_join( + self, join, from_linter=from_linter, **kwargs + ) + else: + if from_linter: + from_linter.edges.add((join.left, join.right)) + + kwargs["asfrom"] = True + if isinstance(join.right, expression.FromGrouping): + right = join.right.element + else: + right = join.right + return ( + self.process(join.left, from_linter=from_linter, **kwargs) + + ", " + + self.process(right, from_linter=from_linter, **kwargs) + ) + + def _get_nonansi_join_whereclause(self, froms): + clauses = [] + + def visit_join(join): + if join.isouter: + # https://docs.oracle.com/database/121/SQLRF/queries006.htm#SQLRF52354 + # "apply the outer join operator (+) to all columns of B in + # the join condition in the WHERE clause" - that is, + # unconditionally regardless of operator or the other side + def visit_binary(binary): + if isinstance( + binary.left, expression.ColumnClause + ) and join.right.is_derived_from(binary.left.table): + binary.left = _OuterJoinColumn(binary.left) + elif isinstance( + binary.right, expression.ColumnClause + ) and join.right.is_derived_from(binary.right.table): + binary.right = _OuterJoinColumn(binary.right) + + clauses.append( + visitors.cloned_traverse( + join.onclause, {}, {"binary": visit_binary} + ) + ) + else: + clauses.append(join.onclause) + + for j in join.left, join.right: + if isinstance(j, expression.Join): + visit_join(j) + elif isinstance(j, expression.FromGrouping): + visit_join(j.element) + + for f in froms: + if isinstance(f, expression.Join): + visit_join(f) + + if not clauses: + return None + else: + return sql.and_(*clauses) + + def visit_outer_join_column(self, vc, **kw): + return self.process(vc.column, **kw) + "(+)" + + def visit_sequence(self, seq, **kw): + return self.preparer.format_sequence(seq) + ".nextval" + + def get_render_as_alias_suffix(self, alias_name_text): + """Oracle doesn't like ``FROM table AS alias``""" + + return " " + alias_name_text + + def returning_clause( + self, stmt, returning_cols, *, populate_result_map, **kw + ): + columns = [] + binds = [] + + for i, column in enumerate( + expression._select_iterables(returning_cols) + ): + if ( + self.isupdate + and isinstance(column, sa_schema.Column) + and isinstance(column.server_default, Computed) + and not self.dialect._supports_update_returning_computed_cols + ): + util.warn( + "Computed columns don't work with Oracle UPDATE " + "statements that use RETURNING; the value of the column " + "*before* the UPDATE takes place is returned. It is " + "advised to not use RETURNING with an Oracle computed " + "column. Consider setting implicit_returning to False on " + "the Table object in order to avoid implicit RETURNING " + "clauses from being generated for this Table." + ) + if column.type._has_column_expression: + col_expr = column.type.column_expression(column) + else: + col_expr = column + + outparam = sql.outparam("ret_%d" % i, type_=column.type) + self.binds[outparam.key] = outparam + binds.append( + self.bindparam_string(self._truncate_bindparam(outparam)) + ) + + # has_out_parameters would in a normal case be set to True + # as a result of the compiler visiting an outparam() object. + # in this case, the above outparam() objects are not being + # visited. Ensure the statement itself didn't have other + # outparam() objects independently. + # technically, this could be supported, but as it would be + # a very strange use case without a clear rationale, disallow it + if self.has_out_parameters: + raise exc.InvalidRequestError( + "Using explicit outparam() objects with " + "UpdateBase.returning() in the same Core DML statement " + "is not supported in the Oracle dialect." + ) + + self._oracle_returning = True + + columns.append(self.process(col_expr, within_columns_clause=False)) + if populate_result_map: + self._add_to_result_map( + getattr(col_expr, "name", col_expr._anon_name_label), + getattr(col_expr, "name", col_expr._anon_name_label), + ( + column, + getattr(column, "name", None), + getattr(column, "key", None), + ), + column.type, + ) + + return "RETURNING " + ", ".join(columns) + " INTO " + ", ".join(binds) + + def _row_limit_clause(self, select, **kw): + """ORacle 12c supports OFFSET/FETCH operators + Use it instead subquery with row_number + + """ + + if ( + select._fetch_clause is not None + or not self.dialect._supports_offset_fetch + ): + return super()._row_limit_clause( + select, use_literal_execute_for_simple_int=True, **kw + ) + else: + return self.fetch_clause( + select, + fetch_clause=self._get_limit_or_fetch(select), + use_literal_execute_for_simple_int=True, + **kw, + ) + + def _get_limit_or_fetch(self, select): + if select._fetch_clause is None: + return select._limit_clause + else: + return select._fetch_clause + + def translate_select_structure(self, select_stmt, **kwargs): + select = select_stmt + + if not getattr(select, "_oracle_visit", None): + if not self.dialect.use_ansi: + froms = self._display_froms_for_select( + select, kwargs.get("asfrom", False) + ) + whereclause = self._get_nonansi_join_whereclause(froms) + if whereclause is not None: + select = select.where(whereclause) + select._oracle_visit = True + + # if fetch is used this is not needed + if ( + select._has_row_limiting_clause + and not self.dialect._supports_offset_fetch + and select._fetch_clause is None + ): + limit_clause = select._limit_clause + offset_clause = select._offset_clause + + if select._simple_int_clause(limit_clause): + limit_clause = limit_clause.render_literal_execute() + + if select._simple_int_clause(offset_clause): + offset_clause = offset_clause.render_literal_execute() + + # currently using form at: + # https://blogs.oracle.com/oraclemagazine/\ + # on-rownum-and-limiting-results + + orig_select = select + select = select._generate() + select._oracle_visit = True + + # add expressions to accommodate FOR UPDATE OF + for_update = select._for_update_arg + if for_update is not None and for_update.of: + for_update = for_update._clone() + for_update._copy_internals() + + for elem in for_update.of: + if not select.selected_columns.contains_column(elem): + select = select.add_columns(elem) + + # Wrap the middle select and add the hint + inner_subquery = select.alias() + limitselect = sql.select( + *[ + c + for c in inner_subquery.c + if orig_select.selected_columns.corresponding_column(c) + is not None + ] + ) + + if ( + limit_clause is not None + and self.dialect.optimize_limits + and select._simple_int_clause(limit_clause) + ): + limitselect = limitselect.prefix_with( + expression.text( + "/*+ FIRST_ROWS(%s) */" + % self.process(limit_clause, **kwargs) + ) + ) + + limitselect._oracle_visit = True + limitselect._is_wrapper = True + + # add expressions to accommodate FOR UPDATE OF + if for_update is not None and for_update.of: + adapter = sql_util.ClauseAdapter(inner_subquery) + for_update.of = [ + adapter.traverse(elem) for elem in for_update.of + ] + + # If needed, add the limiting clause + if limit_clause is not None: + if select._simple_int_clause(limit_clause) and ( + offset_clause is None + or select._simple_int_clause(offset_clause) + ): + max_row = limit_clause + + if offset_clause is not None: + max_row = max_row + offset_clause + + else: + max_row = limit_clause + + if offset_clause is not None: + max_row = max_row + offset_clause + limitselect = limitselect.where( + sql.literal_column("ROWNUM") <= max_row + ) + + # If needed, add the ora_rn, and wrap again with offset. + if offset_clause is None: + limitselect._for_update_arg = for_update + select = limitselect + else: + limitselect = limitselect.add_columns( + sql.literal_column("ROWNUM").label("ora_rn") + ) + limitselect._oracle_visit = True + limitselect._is_wrapper = True + + if for_update is not None and for_update.of: + limitselect_cols = limitselect.selected_columns + for elem in for_update.of: + if ( + limitselect_cols.corresponding_column(elem) + is None + ): + limitselect = limitselect.add_columns(elem) + + limit_subquery = limitselect.alias() + origselect_cols = orig_select.selected_columns + offsetselect = sql.select( + *[ + c + for c in limit_subquery.c + if origselect_cols.corresponding_column(c) + is not None + ] + ) + + offsetselect._oracle_visit = True + offsetselect._is_wrapper = True + + if for_update is not None and for_update.of: + adapter = sql_util.ClauseAdapter(limit_subquery) + for_update.of = [ + adapter.traverse(elem) for elem in for_update.of + ] + + offsetselect = offsetselect.where( + sql.literal_column("ora_rn") > offset_clause + ) + + offsetselect._for_update_arg = for_update + select = offsetselect + + return select + + def limit_clause(self, select, **kw): + return "" + + def visit_empty_set_expr(self, type_, **kw): + return "SELECT 1 FROM DUAL WHERE 1!=1" + + def for_update_clause(self, select, **kw): + if self.is_subquery(): + return "" + + tmp = " FOR UPDATE" + + if select._for_update_arg.of: + tmp += " OF " + ", ".join( + self.process(elem, **kw) for elem in select._for_update_arg.of + ) + + if select._for_update_arg.nowait: + tmp += " NOWAIT" + if select._for_update_arg.skip_locked: + tmp += " SKIP LOCKED" + + return tmp + + def visit_is_distinct_from_binary(self, binary, operator, **kw): + return "DECODE(%s, %s, 0, 1) = 1" % ( + self.process(binary.left), + self.process(binary.right), + ) + + def visit_is_not_distinct_from_binary(self, binary, operator, **kw): + return "DECODE(%s, %s, 0, 1) = 0" % ( + self.process(binary.left), + self.process(binary.right), + ) + + def visit_regexp_match_op_binary(self, binary, operator, **kw): + string = self.process(binary.left, **kw) + pattern = self.process(binary.right, **kw) + flags = binary.modifiers["flags"] + if flags is None: + return "REGEXP_LIKE(%s, %s)" % (string, pattern) + else: + return "REGEXP_LIKE(%s, %s, %s)" % ( + string, + pattern, + self.render_literal_value(flags, sqltypes.STRINGTYPE), + ) + + def visit_not_regexp_match_op_binary(self, binary, operator, **kw): + return "NOT %s" % self.visit_regexp_match_op_binary( + binary, operator, **kw + ) + + def visit_regexp_replace_op_binary(self, binary, operator, **kw): + string = self.process(binary.left, **kw) + pattern_replace = self.process(binary.right, **kw) + flags = binary.modifiers["flags"] + if flags is None: + return "REGEXP_REPLACE(%s, %s)" % ( + string, + pattern_replace, + ) + else: + return "REGEXP_REPLACE(%s, %s, %s)" % ( + string, + pattern_replace, + self.render_literal_value(flags, sqltypes.STRINGTYPE), + ) + + def visit_aggregate_strings_func(self, fn, **kw): + return "LISTAGG%s" % self.function_argspec(fn, **kw) + + +class OracleDDLCompiler(compiler.DDLCompiler): + def define_constraint_cascades(self, constraint): + text = "" + if constraint.ondelete is not None: + text += " ON DELETE %s" % constraint.ondelete + + # oracle has no ON UPDATE CASCADE - + # its only available via triggers + # https://asktom.oracle.com/tkyte/update_cascade/index.html + if constraint.onupdate is not None: + util.warn( + "Oracle does not contain native UPDATE CASCADE " + "functionality - onupdates will not be rendered for foreign " + "keys. Consider using deferrable=True, initially='deferred' " + "or triggers." + ) + + return text + + def visit_drop_table_comment(self, drop, **kw): + return "COMMENT ON TABLE %s IS ''" % self.preparer.format_table( + drop.element + ) + + def visit_create_index(self, create, **kw): + index = create.element + self._verify_index_table(index) + preparer = self.preparer + text = "CREATE " + if index.unique: + text += "UNIQUE " + if index.dialect_options["oracle"]["bitmap"]: + text += "BITMAP " + text += "INDEX %s ON %s (%s)" % ( + self._prepared_index_name(index, include_schema=True), + preparer.format_table(index.table, use_schema=True), + ", ".join( + self.sql_compiler.process( + expr, include_table=False, literal_binds=True + ) + for expr in index.expressions + ), + ) + if index.dialect_options["oracle"]["compress"] is not False: + if index.dialect_options["oracle"]["compress"] is True: + text += " COMPRESS" + else: + text += " COMPRESS %d" % ( + index.dialect_options["oracle"]["compress"] + ) + return text + + def post_create_table(self, table): + table_opts = [] + opts = table.dialect_options["oracle"] + + if opts["on_commit"]: + on_commit_options = opts["on_commit"].replace("_", " ").upper() + table_opts.append("\n ON COMMIT %s" % on_commit_options) + + if opts["compress"]: + if opts["compress"] is True: + table_opts.append("\n COMPRESS") + else: + table_opts.append("\n COMPRESS FOR %s" % (opts["compress"])) + + return "".join(table_opts) + + def get_identity_options(self, identity_options): + text = super().get_identity_options(identity_options) + text = text.replace("NO MINVALUE", "NOMINVALUE") + text = text.replace("NO MAXVALUE", "NOMAXVALUE") + text = text.replace("NO CYCLE", "NOCYCLE") + if identity_options.order is not None: + text += " ORDER" if identity_options.order else " NOORDER" + return text.strip() + + def visit_computed_column(self, generated, **kw): + text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process( + generated.sqltext, include_table=False, literal_binds=True + ) + if generated.persisted is True: + raise exc.CompileError( + "Oracle computed columns do not support 'stored' persistence; " + "set the 'persisted' flag to None or False for Oracle support." + ) + elif generated.persisted is False: + text += " VIRTUAL" + return text + + def visit_identity_column(self, identity, **kw): + if identity.always is None: + kind = "" + else: + kind = "ALWAYS" if identity.always else "BY DEFAULT" + text = "GENERATED %s" % kind + if identity.on_null: + text += " ON NULL" + text += " AS IDENTITY" + options = self.get_identity_options(identity) + if options: + text += " (%s)" % options + return text + + +class OracleIdentifierPreparer(compiler.IdentifierPreparer): + reserved_words = {x.lower() for x in RESERVED_WORDS} + illegal_initial_characters = {str(dig) for dig in range(0, 10)}.union( + ["_", "$"] + ) + + def _bindparam_requires_quotes(self, value): + """Return True if the given identifier requires quoting.""" + lc_value = value.lower() + return ( + lc_value in self.reserved_words + or value[0] in self.illegal_initial_characters + or not self.legal_characters.match(str(value)) + ) + + def format_savepoint(self, savepoint): + name = savepoint.ident.lstrip("_") + return super().format_savepoint(savepoint, name) + + +class OracleExecutionContext(default.DefaultExecutionContext): + def fire_sequence(self, seq, type_): + return self._execute_scalar( + "SELECT " + + self.identifier_preparer.format_sequence(seq) + + ".nextval FROM DUAL", + type_, + ) + + def pre_exec(self): + if self.statement and "_oracle_dblink" in self.execution_options: + self.statement = self.statement.replace( + dictionary.DB_LINK_PLACEHOLDER, + self.execution_options["_oracle_dblink"], + ) + + +class OracleDialect(default.DefaultDialect): + name = "oracle" + supports_statement_cache = True + supports_alter = True + max_identifier_length = 128 + + _supports_offset_fetch = True + + insert_returning = True + update_returning = True + delete_returning = True + + div_is_floordiv = False + + supports_simple_order_by_label = False + cte_follows_insert = True + returns_native_bytes = True + + supports_sequences = True + sequences_optional = False + postfetch_lastrowid = False + + default_paramstyle = "named" + colspecs = colspecs + ischema_names = ischema_names + requires_name_normalize = True + + supports_comments = True + + supports_default_values = False + supports_default_metavalue = True + supports_empty_insert = False + supports_identity_columns = True + + statement_compiler = OracleCompiler + ddl_compiler = OracleDDLCompiler + type_compiler_cls = OracleTypeCompiler + preparer = OracleIdentifierPreparer + execution_ctx_cls = OracleExecutionContext + + reflection_options = ("oracle_resolve_synonyms",) + + _use_nchar_for_unicode = False + + construct_arguments = [ + ( + sa_schema.Table, + {"resolve_synonyms": False, "on_commit": None, "compress": False}, + ), + (sa_schema.Index, {"bitmap": False, "compress": False}), + ] + + @util.deprecated_params( + use_binds_for_limits=( + "1.4", + "The ``use_binds_for_limits`` Oracle dialect parameter is " + "deprecated. The dialect now renders LIMIT /OFFSET integers " + "inline in all cases using a post-compilation hook, so that the " + "value is still represented by a 'bound parameter' on the Core " + "Expression side.", + ) + ) + def __init__( + self, + use_ansi=True, + optimize_limits=False, + use_binds_for_limits=None, + use_nchar_for_unicode=False, + exclude_tablespaces=("SYSTEM", "SYSAUX"), + enable_offset_fetch=True, + **kwargs, + ): + default.DefaultDialect.__init__(self, **kwargs) + self._use_nchar_for_unicode = use_nchar_for_unicode + self.use_ansi = use_ansi + self.optimize_limits = optimize_limits + self.exclude_tablespaces = exclude_tablespaces + self.enable_offset_fetch = self._supports_offset_fetch = ( + enable_offset_fetch + ) + + def initialize(self, connection): + super().initialize(connection) + + # Oracle 8i has RETURNING: + # https://docs.oracle.com/cd/A87860_01/doc/index.htm + + # so does Oracle8: + # https://docs.oracle.com/cd/A64702_01/doc/index.htm + + if self._is_oracle_8: + self.colspecs = self.colspecs.copy() + self.colspecs.pop(sqltypes.Interval) + self.use_ansi = False + + self.supports_identity_columns = self.server_version_info >= (12,) + self._supports_offset_fetch = ( + self.enable_offset_fetch and self.server_version_info >= (12,) + ) + + def _get_effective_compat_server_version_info(self, connection): + # dialect does not need compat levels below 12.2, so don't query + # in those cases + + if self.server_version_info < (12, 2): + return self.server_version_info + try: + compat = connection.exec_driver_sql( + "SELECT value FROM v$parameter WHERE name = 'compatible'" + ).scalar() + except exc.DBAPIError: + compat = None + + if compat: + try: + return tuple(int(x) for x in compat.split(".")) + except: + return self.server_version_info + else: + return self.server_version_info + + @property + def _is_oracle_8(self): + return self.server_version_info and self.server_version_info < (9,) + + @property + def _supports_table_compression(self): + return self.server_version_info and self.server_version_info >= (10, 1) + + @property + def _supports_table_compress_for(self): + return self.server_version_info and self.server_version_info >= (11,) + + @property + def _supports_char_length(self): + return not self._is_oracle_8 + + @property + def _supports_update_returning_computed_cols(self): + # on version 18 this error is no longet present while it happens on 11 + # it may work also on versions before the 18 + return self.server_version_info and self.server_version_info >= (18,) + + @property + def _supports_except_all(self): + return self.server_version_info and self.server_version_info >= (21,) + + def do_release_savepoint(self, connection, name): + # Oracle does not support RELEASE SAVEPOINT + pass + + def _check_max_identifier_length(self, connection): + if self._get_effective_compat_server_version_info(connection) < ( + 12, + 2, + ): + return 30 + else: + # use the default + return None + + def get_isolation_level_values(self, dbapi_connection): + return ["READ COMMITTED", "SERIALIZABLE"] + + def get_default_isolation_level(self, dbapi_conn): + try: + return self.get_isolation_level(dbapi_conn) + except NotImplementedError: + raise + except: + return "READ COMMITTED" + + def _execute_reflection( + self, connection, query, dblink, returns_long, params=None + ): + if dblink and not dblink.startswith("@"): + dblink = f"@{dblink}" + execution_options = { + # handle db links + "_oracle_dblink": dblink or "", + # override any schema translate map + "schema_translate_map": None, + } + + if dblink and returns_long: + # Oracle seems to error with + # "ORA-00997: illegal use of LONG datatype" when returning + # LONG columns via a dblink in a query with bind params + # This type seems to be very hard to cast into something else + # so it seems easier to just use bind param in this case + def visit_bindparam(bindparam): + bindparam.literal_execute = True + + query = visitors.cloned_traverse( + query, {}, {"bindparam": visit_bindparam} + ) + return connection.execute( + query, params, execution_options=execution_options + ) + + @util.memoized_property + def _has_table_query(self): + # materialized views are returned by all_tables + tables = ( + select( + dictionary.all_tables.c.table_name, + dictionary.all_tables.c.owner, + ) + .union_all( + select( + dictionary.all_views.c.view_name.label("table_name"), + dictionary.all_views.c.owner, + ) + ) + .subquery("tables_and_views") + ) + + query = select(tables.c.table_name).where( + tables.c.table_name == bindparam("table_name"), + tables.c.owner == bindparam("owner"), + ) + return query + + @reflection.cache + def has_table( + self, connection, table_name, schema=None, dblink=None, **kw + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + self._ensure_has_table_connection(connection) + + if not schema: + schema = self.default_schema_name + + params = { + "table_name": self.denormalize_name(table_name), + "owner": self.denormalize_schema_name(schema), + } + cursor = self._execute_reflection( + connection, + self._has_table_query, + dblink, + returns_long=False, + params=params, + ) + return bool(cursor.scalar()) + + @reflection.cache + def has_sequence( + self, connection, sequence_name, schema=None, dblink=None, **kw + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + if not schema: + schema = self.default_schema_name + + query = select(dictionary.all_sequences.c.sequence_name).where( + dictionary.all_sequences.c.sequence_name + == self.denormalize_schema_name(sequence_name), + dictionary.all_sequences.c.sequence_owner + == self.denormalize_schema_name(schema), + ) + + cursor = self._execute_reflection( + connection, query, dblink, returns_long=False + ) + return bool(cursor.scalar()) + + def _get_default_schema_name(self, connection): + return self.normalize_name( + connection.exec_driver_sql( + "select sys_context( 'userenv', 'current_schema' ) from dual" + ).scalar() + ) + + def denormalize_schema_name(self, name): + # look for quoted_name + force = getattr(name, "quote", None) + if force is None and name == "public": + # look for case insensitive, no quoting specified, "public" + return "PUBLIC" + return super().denormalize_name(name) + + @reflection.flexi_cache( + ("schema", InternalTraversal.dp_string), + ("filter_names", InternalTraversal.dp_string_list), + ("dblink", InternalTraversal.dp_string), + ) + def _get_synonyms(self, connection, schema, filter_names, dblink, **kw): + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + + has_filter_names, params = self._prepare_filter_names(filter_names) + query = select( + dictionary.all_synonyms.c.synonym_name, + dictionary.all_synonyms.c.table_name, + dictionary.all_synonyms.c.table_owner, + dictionary.all_synonyms.c.db_link, + ).where(dictionary.all_synonyms.c.owner == owner) + if has_filter_names: + query = query.where( + dictionary.all_synonyms.c.synonym_name.in_( + params["filter_names"] + ) + ) + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).mappings() + return result.all() + + @lru_cache() + def _all_objects_query( + self, owner, scope, kind, has_filter_names, has_mat_views + ): + query = ( + select(dictionary.all_objects.c.object_name) + .select_from(dictionary.all_objects) + .where(dictionary.all_objects.c.owner == owner) + ) + + # NOTE: materialized views are listed in all_objects twice; + # once as MATERIALIZE VIEW and once as TABLE + if kind is ObjectKind.ANY: + # materilaized view are listed also as tables so there is no + # need to add them to the in_. + query = query.where( + dictionary.all_objects.c.object_type.in_(("TABLE", "VIEW")) + ) + else: + object_type = [] + if ObjectKind.VIEW in kind: + object_type.append("VIEW") + if ( + ObjectKind.MATERIALIZED_VIEW in kind + and ObjectKind.TABLE not in kind + ): + # materilaized view are listed also as tables so there is no + # need to add them to the in_ if also selecting tables. + object_type.append("MATERIALIZED VIEW") + if ObjectKind.TABLE in kind: + object_type.append("TABLE") + if has_mat_views and ObjectKind.MATERIALIZED_VIEW not in kind: + # materialized view are listed also as tables, + # so they need to be filtered out + # EXCEPT ALL / MINUS profiles as faster than using + # NOT EXISTS or NOT IN with a subquery, but it's in + # general faster to get the mat view names and exclude + # them only when needed + query = query.where( + dictionary.all_objects.c.object_name.not_in( + bindparam("mat_views") + ) + ) + query = query.where( + dictionary.all_objects.c.object_type.in_(object_type) + ) + + # handles scope + if scope is ObjectScope.DEFAULT: + query = query.where(dictionary.all_objects.c.temporary == "N") + elif scope is ObjectScope.TEMPORARY: + query = query.where(dictionary.all_objects.c.temporary == "Y") + + if has_filter_names: + query = query.where( + dictionary.all_objects.c.object_name.in_( + bindparam("filter_names") + ) + ) + return query + + @reflection.flexi_cache( + ("schema", InternalTraversal.dp_string), + ("scope", InternalTraversal.dp_plain_obj), + ("kind", InternalTraversal.dp_plain_obj), + ("filter_names", InternalTraversal.dp_string_list), + ("dblink", InternalTraversal.dp_string), + ) + def _get_all_objects( + self, connection, schema, scope, kind, filter_names, dblink, **kw + ): + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + + has_filter_names, params = self._prepare_filter_names(filter_names) + has_mat_views = False + if ( + ObjectKind.TABLE in kind + and ObjectKind.MATERIALIZED_VIEW not in kind + ): + # see note in _all_objects_query + mat_views = self.get_materialized_view_names( + connection, schema, dblink, _normalize=False, **kw + ) + if mat_views: + params["mat_views"] = mat_views + has_mat_views = True + + query = self._all_objects_query( + owner, scope, kind, has_filter_names, has_mat_views + ) + + result = self._execute_reflection( + connection, query, dblink, returns_long=False, params=params + ).scalars() + + return result.all() + + def _handle_synonyms_decorator(fn): + @wraps(fn) + def wrapper(self, *args, **kwargs): + return self._handle_synonyms(fn, *args, **kwargs) + + return wrapper + + def _handle_synonyms(self, fn, connection, *args, **kwargs): + if not kwargs.get("oracle_resolve_synonyms", False): + return fn(self, connection, *args, **kwargs) + + original_kw = kwargs.copy() + schema = kwargs.pop("schema", None) + result = self._get_synonyms( + connection, + schema=schema, + filter_names=kwargs.pop("filter_names", None), + dblink=kwargs.pop("dblink", None), + info_cache=kwargs.get("info_cache", None), + ) + + dblinks_owners = defaultdict(dict) + for row in result: + key = row["db_link"], row["table_owner"] + tn = self.normalize_name(row["table_name"]) + dblinks_owners[key][tn] = row["synonym_name"] + + if not dblinks_owners: + # No synonym, do the plain thing + return fn(self, connection, *args, **original_kw) + + data = {} + for (dblink, table_owner), mapping in dblinks_owners.items(): + call_kw = { + **original_kw, + "schema": table_owner, + "dblink": self.normalize_name(dblink), + "filter_names": mapping.keys(), + } + call_result = fn(self, connection, *args, **call_kw) + for (_, tn), value in call_result: + synonym_name = self.normalize_name(mapping[tn]) + data[(schema, synonym_name)] = value + return data.items() + + @reflection.cache + def get_schema_names(self, connection, dblink=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + query = select(dictionary.all_users.c.username).order_by( + dictionary.all_users.c.username + ) + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(row) for row in result] + + @reflection.cache + def get_table_names(self, connection, schema=None, dblink=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + # note that table_names() isn't loading DBLINKed or synonym'ed tables + if schema is None: + schema = self.default_schema_name + + den_schema = self.denormalize_schema_name(schema) + if kw.get("oracle_resolve_synonyms", False): + tables = ( + select( + dictionary.all_tables.c.table_name, + dictionary.all_tables.c.owner, + dictionary.all_tables.c.iot_name, + dictionary.all_tables.c.duration, + dictionary.all_tables.c.tablespace_name, + ) + .union_all( + select( + dictionary.all_synonyms.c.synonym_name.label( + "table_name" + ), + dictionary.all_synonyms.c.owner, + dictionary.all_tables.c.iot_name, + dictionary.all_tables.c.duration, + dictionary.all_tables.c.tablespace_name, + ) + .select_from(dictionary.all_tables) + .join( + dictionary.all_synonyms, + and_( + dictionary.all_tables.c.table_name + == dictionary.all_synonyms.c.table_name, + dictionary.all_tables.c.owner + == func.coalesce( + dictionary.all_synonyms.c.table_owner, + dictionary.all_synonyms.c.owner, + ), + ), + ) + ) + .subquery("available_tables") + ) + else: + tables = dictionary.all_tables + + query = select(tables.c.table_name) + if self.exclude_tablespaces: + query = query.where( + func.coalesce( + tables.c.tablespace_name, "no tablespace" + ).not_in(self.exclude_tablespaces) + ) + query = query.where( + tables.c.owner == den_schema, + tables.c.iot_name.is_(null()), + tables.c.duration.is_(null()), + ) + + # remove materialized views + mat_query = select( + dictionary.all_mviews.c.mview_name.label("table_name") + ).where(dictionary.all_mviews.c.owner == den_schema) + + query = ( + query.except_all(mat_query) + if self._supports_except_all + else query.except_(mat_query) + ) + + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(row) for row in result] + + @reflection.cache + def get_temp_table_names(self, connection, dblink=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + schema = self.denormalize_schema_name(self.default_schema_name) + + query = select(dictionary.all_tables.c.table_name) + if self.exclude_tablespaces: + query = query.where( + func.coalesce( + dictionary.all_tables.c.tablespace_name, "no tablespace" + ).not_in(self.exclude_tablespaces) + ) + query = query.where( + dictionary.all_tables.c.owner == schema, + dictionary.all_tables.c.iot_name.is_(null()), + dictionary.all_tables.c.duration.is_not(null()), + ) + + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(row) for row in result] + + @reflection.cache + def get_materialized_view_names( + self, connection, schema=None, dblink=None, _normalize=True, **kw + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + if not schema: + schema = self.default_schema_name + + query = select(dictionary.all_mviews.c.mview_name).where( + dictionary.all_mviews.c.owner + == self.denormalize_schema_name(schema) + ) + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + if _normalize: + return [self.normalize_name(row) for row in result] + else: + return result.all() + + @reflection.cache + def get_view_names(self, connection, schema=None, dblink=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + if not schema: + schema = self.default_schema_name + + query = select(dictionary.all_views.c.view_name).where( + dictionary.all_views.c.owner + == self.denormalize_schema_name(schema) + ) + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(row) for row in result] + + @reflection.cache + def get_sequence_names(self, connection, schema=None, dblink=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + if not schema: + schema = self.default_schema_name + query = select(dictionary.all_sequences.c.sequence_name).where( + dictionary.all_sequences.c.sequence_owner + == self.denormalize_schema_name(schema) + ) + + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(row) for row in result] + + def _value_or_raise(self, data, table, schema): + table = self.normalize_name(str(table)) + try: + return dict(data)[(schema, table)] + except KeyError: + raise exc.NoSuchTableError( + f"{schema}.{table}" if schema else table + ) from None + + def _prepare_filter_names(self, filter_names): + if filter_names: + fn = [self.denormalize_name(name) for name in filter_names] + return True, {"filter_names": fn} + else: + return False, {} + + @reflection.cache + def get_table_options(self, connection, table_name, schema=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_table_options( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + @lru_cache() + def _table_options_query( + self, owner, scope, kind, has_filter_names, has_mat_views + ): + query = select( + dictionary.all_tables.c.table_name, + dictionary.all_tables.c.compression, + dictionary.all_tables.c.compress_for, + ).where(dictionary.all_tables.c.owner == owner) + if has_filter_names: + query = query.where( + dictionary.all_tables.c.table_name.in_( + bindparam("filter_names") + ) + ) + if scope is ObjectScope.DEFAULT: + query = query.where(dictionary.all_tables.c.duration.is_(null())) + elif scope is ObjectScope.TEMPORARY: + query = query.where( + dictionary.all_tables.c.duration.is_not(null()) + ) + + if ( + has_mat_views + and ObjectKind.TABLE in kind + and ObjectKind.MATERIALIZED_VIEW not in kind + ): + # cant use EXCEPT ALL / MINUS here because we don't have an + # excludable row vs. the query above + # outerjoin + where null works better on oracle 21 but 11 does + # not like it at all. this is the next best thing + + query = query.where( + dictionary.all_tables.c.table_name.not_in( + bindparam("mat_views") + ) + ) + elif ( + ObjectKind.TABLE not in kind + and ObjectKind.MATERIALIZED_VIEW in kind + ): + query = query.where( + dictionary.all_tables.c.table_name.in_(bindparam("mat_views")) + ) + return query + + @_handle_synonyms_decorator + def get_multi_table_options( + self, + connection, + *, + schema, + filter_names, + scope, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + + has_filter_names, params = self._prepare_filter_names(filter_names) + has_mat_views = False + + if ( + ObjectKind.TABLE in kind + and ObjectKind.MATERIALIZED_VIEW not in kind + ): + # see note in _table_options_query + mat_views = self.get_materialized_view_names( + connection, schema, dblink, _normalize=False, **kw + ) + if mat_views: + params["mat_views"] = mat_views + has_mat_views = True + elif ( + ObjectKind.TABLE not in kind + and ObjectKind.MATERIALIZED_VIEW in kind + ): + mat_views = self.get_materialized_view_names( + connection, schema, dblink, _normalize=False, **kw + ) + params["mat_views"] = mat_views + + options = {} + default = ReflectionDefaults.table_options + + if ObjectKind.TABLE in kind or ObjectKind.MATERIALIZED_VIEW in kind: + query = self._table_options_query( + owner, scope, kind, has_filter_names, has_mat_views + ) + result = self._execute_reflection( + connection, query, dblink, returns_long=False, params=params + ) + + for table, compression, compress_for in result: + if compression == "ENABLED": + data = {"oracle_compress": compress_for} + else: + data = default() + options[(schema, self.normalize_name(table))] = data + if ObjectKind.VIEW in kind and ObjectScope.DEFAULT in scope: + # add the views (no temporary views) + for view in self.get_view_names(connection, schema, dblink, **kw): + if not filter_names or view in filter_names: + options[(schema, view)] = default() + + return options.items() + + @reflection.cache + def get_columns(self, connection, table_name, schema=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + + data = self.get_multi_columns( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + def _run_batches( + self, connection, query, dblink, returns_long, mappings, all_objects + ): + each_batch = 500 + batches = list(all_objects) + while batches: + batch = batches[0:each_batch] + batches[0:each_batch] = [] + + result = self._execute_reflection( + connection, + query, + dblink, + returns_long=returns_long, + params={"all_objects": batch}, + ) + if mappings: + yield from result.mappings() + else: + yield from result + + @lru_cache() + def _column_query(self, owner): + all_cols = dictionary.all_tab_cols + all_comments = dictionary.all_col_comments + all_ids = dictionary.all_tab_identity_cols + + if self.server_version_info >= (12,): + add_cols = ( + all_cols.c.default_on_null, + sql.case( + (all_ids.c.table_name.is_(None), sql.null()), + else_=all_ids.c.generation_type + + "," + + all_ids.c.identity_options, + ).label("identity_options"), + ) + join_identity_cols = True + else: + add_cols = ( + sql.null().label("default_on_null"), + sql.null().label("identity_options"), + ) + join_identity_cols = False + + # NOTE: on oracle cannot create tables/views without columns and + # a table cannot have all column hidden: + # ORA-54039: table must have at least one column that is not invisible + # all_tab_cols returns data for tables/views/mat-views. + # all_tab_cols does not return recycled tables + + query = ( + select( + all_cols.c.table_name, + all_cols.c.column_name, + all_cols.c.data_type, + all_cols.c.char_length, + all_cols.c.data_precision, + all_cols.c.data_scale, + all_cols.c.nullable, + all_cols.c.data_default, + all_comments.c.comments, + all_cols.c.virtual_column, + *add_cols, + ).select_from(all_cols) + # NOTE: all_col_comments has a row for each column even if no + # comment is present, so a join could be performed, but there + # seems to be no difference compared to an outer join + .outerjoin( + all_comments, + and_( + all_cols.c.table_name == all_comments.c.table_name, + all_cols.c.column_name == all_comments.c.column_name, + all_cols.c.owner == all_comments.c.owner, + ), + ) + ) + if join_identity_cols: + query = query.outerjoin( + all_ids, + and_( + all_cols.c.table_name == all_ids.c.table_name, + all_cols.c.column_name == all_ids.c.column_name, + all_cols.c.owner == all_ids.c.owner, + ), + ) + + query = query.where( + all_cols.c.table_name.in_(bindparam("all_objects")), + all_cols.c.hidden_column == "NO", + all_cols.c.owner == owner, + ).order_by(all_cols.c.table_name, all_cols.c.column_id) + return query + + @_handle_synonyms_decorator + def get_multi_columns( + self, + connection, + *, + schema, + filter_names, + scope, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + query = self._column_query(owner) + + if ( + filter_names + and kind is ObjectKind.ANY + and scope is ObjectScope.ANY + ): + all_objects = [self.denormalize_name(n) for n in filter_names] + else: + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw + ) + + columns = defaultdict(list) + + # all_tab_cols.data_default is LONG + result = self._run_batches( + connection, + query, + dblink, + returns_long=True, + mappings=True, + all_objects=all_objects, + ) + + def maybe_int(value): + if isinstance(value, float) and value.is_integer(): + return int(value) + else: + return value + + remove_size = re.compile(r"\(\d+\)") + + for row_dict in result: + table_name = self.normalize_name(row_dict["table_name"]) + orig_colname = row_dict["column_name"] + colname = self.normalize_name(orig_colname) + coltype = row_dict["data_type"] + precision = maybe_int(row_dict["data_precision"]) + + if coltype == "NUMBER": + scale = maybe_int(row_dict["data_scale"]) + if precision is None and scale == 0: + coltype = INTEGER() + else: + coltype = NUMBER(precision, scale) + elif coltype == "FLOAT": + # https://docs.oracle.com/cd/B14117_01/server.101/b10758/sqlqr06.htm + if precision == 126: + # The DOUBLE PRECISION datatype is a floating-point + # number with binary precision 126. + coltype = DOUBLE_PRECISION() + elif precision == 63: + # The REAL datatype is a floating-point number with a + # binary precision of 63, or 18 decimal. + coltype = REAL() + else: + # non standard precision + coltype = FLOAT(binary_precision=precision) + + elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"): + char_length = maybe_int(row_dict["char_length"]) + coltype = self.ischema_names.get(coltype)(char_length) + elif "WITH TIME ZONE" in coltype: + coltype = TIMESTAMP(timezone=True) + elif "WITH LOCAL TIME ZONE" in coltype: + coltype = TIMESTAMP(local_timezone=True) + else: + coltype = re.sub(remove_size, "", coltype) + try: + coltype = self.ischema_names[coltype] + except KeyError: + util.warn( + "Did not recognize type '%s' of column '%s'" + % (coltype, colname) + ) + coltype = sqltypes.NULLTYPE + + default = row_dict["data_default"] + if row_dict["virtual_column"] == "YES": + computed = dict(sqltext=default) + default = None + else: + computed = None + + identity_options = row_dict["identity_options"] + if identity_options is not None: + identity = self._parse_identity_options( + identity_options, row_dict["default_on_null"] + ) + default = None + else: + identity = None + + cdict = { + "name": colname, + "type": coltype, + "nullable": row_dict["nullable"] == "Y", + "default": default, + "comment": row_dict["comments"], + } + if orig_colname.lower() == orig_colname: + cdict["quote"] = True + if computed is not None: + cdict["computed"] = computed + if identity is not None: + cdict["identity"] = identity + + columns[(schema, table_name)].append(cdict) + + # NOTE: default not needed since all tables have columns + # default = ReflectionDefaults.columns + # return ( + # (key, value if value else default()) + # for key, value in columns.items() + # ) + return columns.items() + + def _parse_identity_options(self, identity_options, default_on_null): + # identity_options is a string that starts with 'ALWAYS,' or + # 'BY DEFAULT,' and continues with + # START WITH: 1, INCREMENT BY: 1, MAX_VALUE: 123, MIN_VALUE: 1, + # CYCLE_FLAG: N, CACHE_SIZE: 1, ORDER_FLAG: N, SCALE_FLAG: N, + # EXTEND_FLAG: N, SESSION_FLAG: N, KEEP_VALUE: N + parts = [p.strip() for p in identity_options.split(",")] + identity = { + "always": parts[0] == "ALWAYS", + "on_null": default_on_null == "YES", + } + + for part in parts[1:]: + option, value = part.split(":") + value = value.strip() + + if "START WITH" in option: + identity["start"] = int(value) + elif "INCREMENT BY" in option: + identity["increment"] = int(value) + elif "MAX_VALUE" in option: + identity["maxvalue"] = int(value) + elif "MIN_VALUE" in option: + identity["minvalue"] = int(value) + elif "CYCLE_FLAG" in option: + identity["cycle"] = value == "Y" + elif "CACHE_SIZE" in option: + identity["cache"] = int(value) + elif "ORDER_FLAG" in option: + identity["order"] = value == "Y" + return identity + + @reflection.cache + def get_table_comment(self, connection, table_name, schema=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_table_comment( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + @lru_cache() + def _comment_query(self, owner, scope, kind, has_filter_names): + # NOTE: all_tab_comments / all_mview_comments have a row for all + # object even if they don't have comments + queries = [] + if ObjectKind.TABLE in kind or ObjectKind.VIEW in kind: + # all_tab_comments returns also plain views + tbl_view = select( + dictionary.all_tab_comments.c.table_name, + dictionary.all_tab_comments.c.comments, + ).where( + dictionary.all_tab_comments.c.owner == owner, + dictionary.all_tab_comments.c.table_name.not_like("BIN$%"), + ) + if ObjectKind.VIEW not in kind: + tbl_view = tbl_view.where( + dictionary.all_tab_comments.c.table_type == "TABLE" + ) + elif ObjectKind.TABLE not in kind: + tbl_view = tbl_view.where( + dictionary.all_tab_comments.c.table_type == "VIEW" + ) + queries.append(tbl_view) + if ObjectKind.MATERIALIZED_VIEW in kind: + mat_view = select( + dictionary.all_mview_comments.c.mview_name.label("table_name"), + dictionary.all_mview_comments.c.comments, + ).where( + dictionary.all_mview_comments.c.owner == owner, + dictionary.all_mview_comments.c.mview_name.not_like("BIN$%"), + ) + queries.append(mat_view) + if len(queries) == 1: + query = queries[0] + else: + union = sql.union_all(*queries).subquery("tables_and_views") + query = select(union.c.table_name, union.c.comments) + + name_col = query.selected_columns.table_name + + if scope in (ObjectScope.DEFAULT, ObjectScope.TEMPORARY): + temp = "Y" if scope is ObjectScope.TEMPORARY else "N" + # need distinct since materialized view are listed also + # as tables in all_objects + query = query.distinct().join( + dictionary.all_objects, + and_( + dictionary.all_objects.c.owner == owner, + dictionary.all_objects.c.object_name == name_col, + dictionary.all_objects.c.temporary == temp, + ), + ) + if has_filter_names: + query = query.where(name_col.in_(bindparam("filter_names"))) + return query + + @_handle_synonyms_decorator + def get_multi_table_comment( + self, + connection, + *, + schema, + filter_names, + scope, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + has_filter_names, params = self._prepare_filter_names(filter_names) + query = self._comment_query(owner, scope, kind, has_filter_names) + + result = self._execute_reflection( + connection, query, dblink, returns_long=False, params=params + ) + default = ReflectionDefaults.table_comment + # materialized views by default seem to have a comment like + # "snapshot table for snapshot owner.mat_view_name" + ignore_mat_view = "snapshot table for snapshot " + return ( + ( + (schema, self.normalize_name(table)), + ( + {"text": comment} + if comment is not None + and not comment.startswith(ignore_mat_view) + else default() + ), + ) + for table, comment in result + ) + + @reflection.cache + def get_indexes(self, connection, table_name, schema=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_indexes( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + @lru_cache() + def _index_query(self, owner): + return ( + select( + dictionary.all_ind_columns.c.table_name, + dictionary.all_ind_columns.c.index_name, + dictionary.all_ind_columns.c.column_name, + dictionary.all_indexes.c.index_type, + dictionary.all_indexes.c.uniqueness, + dictionary.all_indexes.c.compression, + dictionary.all_indexes.c.prefix_length, + dictionary.all_ind_columns.c.descend, + dictionary.all_ind_expressions.c.column_expression, + ) + .select_from(dictionary.all_ind_columns) + .join( + dictionary.all_indexes, + sql.and_( + dictionary.all_ind_columns.c.index_name + == dictionary.all_indexes.c.index_name, + dictionary.all_ind_columns.c.index_owner + == dictionary.all_indexes.c.owner, + ), + ) + .outerjoin( + # NOTE: this adds about 20% to the query time. Using a + # case expression with a scalar subquery only when needed + # with the assumption that most indexes are not expression + # would be faster but oracle does not like that with + # LONG datatype. It errors with: + # ORA-00997: illegal use of LONG datatype + dictionary.all_ind_expressions, + sql.and_( + dictionary.all_ind_expressions.c.index_name + == dictionary.all_ind_columns.c.index_name, + dictionary.all_ind_expressions.c.index_owner + == dictionary.all_ind_columns.c.index_owner, + dictionary.all_ind_expressions.c.column_position + == dictionary.all_ind_columns.c.column_position, + ), + ) + .where( + dictionary.all_indexes.c.table_owner == owner, + dictionary.all_indexes.c.table_name.in_( + bindparam("all_objects") + ), + ) + .order_by( + dictionary.all_ind_columns.c.index_name, + dictionary.all_ind_columns.c.column_position, + ) + ) + + @reflection.flexi_cache( + ("schema", InternalTraversal.dp_string), + ("dblink", InternalTraversal.dp_string), + ("all_objects", InternalTraversal.dp_string_list), + ) + def _get_indexes_rows(self, connection, schema, dblink, all_objects, **kw): + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + + query = self._index_query(owner) + + pks = { + row_dict["constraint_name"] + for row_dict in self._get_all_constraint_rows( + connection, schema, dblink, all_objects, **kw + ) + if row_dict["constraint_type"] == "P" + } + + # all_ind_expressions.column_expression is LONG + result = self._run_batches( + connection, + query, + dblink, + returns_long=True, + mappings=True, + all_objects=all_objects, + ) + + return [ + row_dict + for row_dict in result + if row_dict["index_name"] not in pks + ] + + @_handle_synonyms_decorator + def get_multi_indexes( + self, + connection, + *, + schema, + filter_names, + scope, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw + ) + + uniqueness = {"NONUNIQUE": False, "UNIQUE": True} + enabled = {"DISABLED": False, "ENABLED": True} + is_bitmap = {"BITMAP", "FUNCTION-BASED BITMAP"} + + indexes = defaultdict(dict) + + for row_dict in self._get_indexes_rows( + connection, schema, dblink, all_objects, **kw + ): + index_name = self.normalize_name(row_dict["index_name"]) + table_name = self.normalize_name(row_dict["table_name"]) + table_indexes = indexes[(schema, table_name)] + + if index_name not in table_indexes: + table_indexes[index_name] = index_dict = { + "name": index_name, + "column_names": [], + "dialect_options": {}, + "unique": uniqueness.get(row_dict["uniqueness"], False), + } + do = index_dict["dialect_options"] + if row_dict["index_type"] in is_bitmap: + do["oracle_bitmap"] = True + if enabled.get(row_dict["compression"], False): + do["oracle_compress"] = row_dict["prefix_length"] + + else: + index_dict = table_indexes[index_name] + + expr = row_dict["column_expression"] + if expr is not None: + index_dict["column_names"].append(None) + if "expressions" in index_dict: + index_dict["expressions"].append(expr) + else: + index_dict["expressions"] = index_dict["column_names"][:-1] + index_dict["expressions"].append(expr) + + if row_dict["descend"].lower() != "asc": + assert row_dict["descend"].lower() == "desc" + cs = index_dict.setdefault("column_sorting", {}) + cs[expr] = ("desc",) + else: + assert row_dict["descend"].lower() == "asc" + cn = self.normalize_name(row_dict["column_name"]) + index_dict["column_names"].append(cn) + if "expressions" in index_dict: + index_dict["expressions"].append(cn) + + default = ReflectionDefaults.indexes + + return ( + (key, list(indexes[key].values()) if key in indexes else default()) + for key in ( + (schema, self.normalize_name(obj_name)) + for obj_name in all_objects + ) + ) + + @reflection.cache + def get_pk_constraint(self, connection, table_name, schema=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_pk_constraint( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + @lru_cache() + def _constraint_query(self, owner): + local = dictionary.all_cons_columns.alias("local") + remote = dictionary.all_cons_columns.alias("remote") + return ( + select( + dictionary.all_constraints.c.table_name, + dictionary.all_constraints.c.constraint_type, + dictionary.all_constraints.c.constraint_name, + local.c.column_name.label("local_column"), + remote.c.table_name.label("remote_table"), + remote.c.column_name.label("remote_column"), + remote.c.owner.label("remote_owner"), + dictionary.all_constraints.c.search_condition, + dictionary.all_constraints.c.delete_rule, + ) + .select_from(dictionary.all_constraints) + .join( + local, + and_( + local.c.owner == dictionary.all_constraints.c.owner, + dictionary.all_constraints.c.constraint_name + == local.c.constraint_name, + ), + ) + .outerjoin( + remote, + and_( + dictionary.all_constraints.c.r_owner == remote.c.owner, + dictionary.all_constraints.c.r_constraint_name + == remote.c.constraint_name, + or_( + remote.c.position.is_(sql.null()), + local.c.position == remote.c.position, + ), + ), + ) + .where( + dictionary.all_constraints.c.owner == owner, + dictionary.all_constraints.c.table_name.in_( + bindparam("all_objects") + ), + dictionary.all_constraints.c.constraint_type.in_( + ("R", "P", "U", "C") + ), + ) + .order_by( + dictionary.all_constraints.c.constraint_name, local.c.position + ) + ) + + @reflection.flexi_cache( + ("schema", InternalTraversal.dp_string), + ("dblink", InternalTraversal.dp_string), + ("all_objects", InternalTraversal.dp_string_list), + ) + def _get_all_constraint_rows( + self, connection, schema, dblink, all_objects, **kw + ): + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + query = self._constraint_query(owner) + + # since the result is cached a list must be created + values = list( + self._run_batches( + connection, + query, + dblink, + returns_long=False, + mappings=True, + all_objects=all_objects, + ) + ) + return values + + @_handle_synonyms_decorator + def get_multi_pk_constraint( + self, + connection, + *, + scope, + schema, + filter_names, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw + ) + + primary_keys = defaultdict(dict) + default = ReflectionDefaults.pk_constraint + + for row_dict in self._get_all_constraint_rows( + connection, schema, dblink, all_objects, **kw + ): + if row_dict["constraint_type"] != "P": + continue + table_name = self.normalize_name(row_dict["table_name"]) + constraint_name = self.normalize_name(row_dict["constraint_name"]) + column_name = self.normalize_name(row_dict["local_column"]) + + table_pk = primary_keys[(schema, table_name)] + if not table_pk: + table_pk["name"] = constraint_name + table_pk["constrained_columns"] = [column_name] + else: + table_pk["constrained_columns"].append(column_name) + + return ( + (key, primary_keys[key] if key in primary_keys else default()) + for key in ( + (schema, self.normalize_name(obj_name)) + for obj_name in all_objects + ) + ) + + @reflection.cache + def get_foreign_keys( + self, + connection, + table_name, + schema=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_foreign_keys( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + @_handle_synonyms_decorator + def get_multi_foreign_keys( + self, + connection, + *, + scope, + schema, + filter_names, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw + ) + + resolve_synonyms = kw.get("oracle_resolve_synonyms", False) + + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + + all_remote_owners = set() + fkeys = defaultdict(dict) + + for row_dict in self._get_all_constraint_rows( + connection, schema, dblink, all_objects, **kw + ): + if row_dict["constraint_type"] != "R": + continue + + table_name = self.normalize_name(row_dict["table_name"]) + constraint_name = self.normalize_name(row_dict["constraint_name"]) + table_fkey = fkeys[(schema, table_name)] + + assert constraint_name is not None + + local_column = self.normalize_name(row_dict["local_column"]) + remote_table = self.normalize_name(row_dict["remote_table"]) + remote_column = self.normalize_name(row_dict["remote_column"]) + remote_owner_orig = row_dict["remote_owner"] + remote_owner = self.normalize_name(remote_owner_orig) + if remote_owner_orig is not None: + all_remote_owners.add(remote_owner_orig) + + if remote_table is None: + # ticket 363 + if dblink and not dblink.startswith("@"): + dblink = f"@{dblink}" + util.warn( + "Got 'None' querying 'table_name' from " + f"all_cons_columns{dblink or ''} - does the user have " + "proper rights to the table?" + ) + continue + + if constraint_name not in table_fkey: + table_fkey[constraint_name] = fkey = { + "name": constraint_name, + "constrained_columns": [], + "referred_schema": None, + "referred_table": remote_table, + "referred_columns": [], + "options": {}, + } + + if resolve_synonyms: + # will be removed below + fkey["_ref_schema"] = remote_owner + + if schema is not None or remote_owner_orig != owner: + fkey["referred_schema"] = remote_owner + + delete_rule = row_dict["delete_rule"] + if delete_rule != "NO ACTION": + fkey["options"]["ondelete"] = delete_rule + + else: + fkey = table_fkey[constraint_name] + + fkey["constrained_columns"].append(local_column) + fkey["referred_columns"].append(remote_column) + + if resolve_synonyms and all_remote_owners: + query = select( + dictionary.all_synonyms.c.owner, + dictionary.all_synonyms.c.table_name, + dictionary.all_synonyms.c.table_owner, + dictionary.all_synonyms.c.synonym_name, + ).where(dictionary.all_synonyms.c.owner.in_(all_remote_owners)) + + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).mappings() + + remote_owners_lut = {} + for row in result: + synonym_owner = self.normalize_name(row["owner"]) + table_name = self.normalize_name(row["table_name"]) + + remote_owners_lut[(synonym_owner, table_name)] = ( + row["table_owner"], + row["synonym_name"], + ) + + empty = (None, None) + for table_fkeys in fkeys.values(): + for table_fkey in table_fkeys.values(): + key = ( + table_fkey.pop("_ref_schema"), + table_fkey["referred_table"], + ) + remote_owner, syn_name = remote_owners_lut.get(key, empty) + if syn_name: + sn = self.normalize_name(syn_name) + table_fkey["referred_table"] = sn + if schema is not None or remote_owner != owner: + ro = self.normalize_name(remote_owner) + table_fkey["referred_schema"] = ro + else: + table_fkey["referred_schema"] = None + default = ReflectionDefaults.foreign_keys + + return ( + (key, list(fkeys[key].values()) if key in fkeys else default()) + for key in ( + (schema, self.normalize_name(obj_name)) + for obj_name in all_objects + ) + ) + + @reflection.cache + def get_unique_constraints( + self, connection, table_name, schema=None, **kw + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_unique_constraints( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + @_handle_synonyms_decorator + def get_multi_unique_constraints( + self, + connection, + *, + scope, + schema, + filter_names, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw + ) + + unique_cons = defaultdict(dict) + + index_names = { + row_dict["index_name"] + for row_dict in self._get_indexes_rows( + connection, schema, dblink, all_objects, **kw + ) + } + + for row_dict in self._get_all_constraint_rows( + connection, schema, dblink, all_objects, **kw + ): + if row_dict["constraint_type"] != "U": + continue + table_name = self.normalize_name(row_dict["table_name"]) + constraint_name_orig = row_dict["constraint_name"] + constraint_name = self.normalize_name(constraint_name_orig) + column_name = self.normalize_name(row_dict["local_column"]) + table_uc = unique_cons[(schema, table_name)] + + assert constraint_name is not None + + if constraint_name not in table_uc: + table_uc[constraint_name] = uc = { + "name": constraint_name, + "column_names": [], + "duplicates_index": ( + constraint_name + if constraint_name_orig in index_names + else None + ), + } + else: + uc = table_uc[constraint_name] + + uc["column_names"].append(column_name) + + default = ReflectionDefaults.unique_constraints + + return ( + ( + key, + ( + list(unique_cons[key].values()) + if key in unique_cons + else default() + ), + ) + for key in ( + (schema, self.normalize_name(obj_name)) + for obj_name in all_objects + ) + ) + + @reflection.cache + def get_view_definition( + self, + connection, + view_name, + schema=None, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + if kw.get("oracle_resolve_synonyms", False): + synonyms = self._get_synonyms( + connection, schema, filter_names=[view_name], dblink=dblink + ) + if synonyms: + assert len(synonyms) == 1 + row_dict = synonyms[0] + dblink = self.normalize_name(row_dict["db_link"]) + schema = row_dict["table_owner"] + view_name = row_dict["table_name"] + + name = self.denormalize_name(view_name) + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + query = ( + select(dictionary.all_views.c.text) + .where( + dictionary.all_views.c.view_name == name, + dictionary.all_views.c.owner == owner, + ) + .union_all( + select(dictionary.all_mviews.c.query).where( + dictionary.all_mviews.c.mview_name == name, + dictionary.all_mviews.c.owner == owner, + ) + ) + ) + + rp = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalar() + if rp is None: + raise exc.NoSuchTableError( + f"{schema}.{view_name}" if schema else view_name + ) + else: + return rp + + @reflection.cache + def get_check_constraints( + self, connection, table_name, schema=None, include_all=False, **kw + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_check_constraints( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + include_all=include_all, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + @_handle_synonyms_decorator + def get_multi_check_constraints( + self, + connection, + *, + schema, + filter_names, + dblink=None, + scope, + kind, + include_all=False, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw + ) + + not_null = re.compile(r"..+?. IS NOT NULL$") + + check_constraints = defaultdict(list) + + for row_dict in self._get_all_constraint_rows( + connection, schema, dblink, all_objects, **kw + ): + if row_dict["constraint_type"] != "C": + continue + table_name = self.normalize_name(row_dict["table_name"]) + constraint_name = self.normalize_name(row_dict["constraint_name"]) + search_condition = row_dict["search_condition"] + + table_checks = check_constraints[(schema, table_name)] + if constraint_name is not None and ( + include_all or not not_null.match(search_condition) + ): + table_checks.append( + {"name": constraint_name, "sqltext": search_condition} + ) + + default = ReflectionDefaults.check_constraints + + return ( + ( + key, + ( + check_constraints[key] + if key in check_constraints + else default() + ), + ) + for key in ( + (schema, self.normalize_name(obj_name)) + for obj_name in all_objects + ) + ) + + def _list_dblinks(self, connection, dblink=None): + query = select(dictionary.all_db_links.c.db_link) + links = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(link) for link in links] + + +class _OuterJoinColumn(sql.ClauseElement): + __visit_name__ = "outer_join_column" + + def __init__(self, column): + self.column = column |