diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle')
14 files changed, 6124 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__init__.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__init__.py new file mode 100644 index 0000000..d855122 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__init__.py @@ -0,0 +1,67 @@ +# dialects/oracle/__init__.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors +from types import ModuleType + +from . import base # noqa +from . import cx_oracle # noqa +from . import oracledb # noqa +from .base import BFILE +from .base import BINARY_DOUBLE +from .base import BINARY_FLOAT +from .base import BLOB +from .base import CHAR +from .base import CLOB +from .base import DATE +from .base import DOUBLE_PRECISION +from .base import FLOAT +from .base import INTERVAL +from .base import LONG +from .base import NCHAR +from .base import NCLOB +from .base import NUMBER +from .base import NVARCHAR +from .base import NVARCHAR2 +from .base import RAW +from .base import REAL +from .base import ROWID +from .base import TIMESTAMP +from .base import VARCHAR +from .base import VARCHAR2 + +# Alias oracledb also as oracledb_async +oracledb_async = type( + "oracledb_async", (ModuleType,), {"dialect": oracledb.dialect_async} +) + +base.dialect = dialect = cx_oracle.dialect + +__all__ = ( + "VARCHAR", + "NVARCHAR", + "CHAR", + "NCHAR", + "DATE", + "NUMBER", + "BLOB", + "BFILE", + "CLOB", + "NCLOB", + "TIMESTAMP", + "RAW", + "FLOAT", + "DOUBLE_PRECISION", + "BINARY_DOUBLE", + "BINARY_FLOAT", + "LONG", + "dialect", + "INTERVAL", + "VARCHAR2", + "NVARCHAR2", + "ROWID", + "REAL", +) diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/__init__.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..cc02ead --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/__init__.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/base.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..bf594ef --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/base.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/cx_oracle.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/cx_oracle.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..9e8e947 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/cx_oracle.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/dictionary.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/dictionary.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..89ce69c --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/dictionary.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/oracledb.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/oracledb.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..9325524 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/oracledb.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/provision.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/provision.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..6d3c52d --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/provision.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/types.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/types.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..24bfa8d --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/types.cpython-311.pyc diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py new file mode 100644 index 0000000..a548b34 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py @@ -0,0 +1,3240 @@ +# dialects/oracle/base.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + + +r""" +.. dialect:: oracle + :name: Oracle + :full_support: 18c + :normal_support: 11+ + :best_effort: 9+ + + +Auto Increment Behavior +----------------------- + +SQLAlchemy Table objects which include integer primary keys are usually +assumed to have "autoincrementing" behavior, meaning they can generate their +own primary key values upon INSERT. For use within Oracle, two options are +available, which are the use of IDENTITY columns (Oracle 12 and above only) +or the association of a SEQUENCE with the column. + +Specifying GENERATED AS IDENTITY (Oracle 12 and above) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Starting from version 12 Oracle can make use of identity columns using +the :class:`_sql.Identity` to specify the autoincrementing behavior:: + + t = Table('mytable', metadata, + Column('id', Integer, Identity(start=3), primary_key=True), + Column(...), ... + ) + +The CREATE TABLE for the above :class:`_schema.Table` object would be: + +.. sourcecode:: sql + + CREATE TABLE mytable ( + id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 3), + ..., + PRIMARY KEY (id) + ) + +The :class:`_schema.Identity` object support many options to control the +"autoincrementing" behavior of the column, like the starting value, the +incrementing value, etc. +In addition to the standard options, Oracle supports setting +:paramref:`_schema.Identity.always` to ``None`` to use the default +generated mode, rendering GENERATED AS IDENTITY in the DDL. It also supports +setting :paramref:`_schema.Identity.on_null` to ``True`` to specify ON NULL +in conjunction with a 'BY DEFAULT' identity column. + +Using a SEQUENCE (all Oracle versions) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Older version of Oracle had no "autoincrement" +feature, SQLAlchemy relies upon sequences to produce these values. With the +older Oracle versions, *a sequence must always be explicitly specified to +enable autoincrement*. This is divergent with the majority of documentation +examples which assume the usage of an autoincrement-capable database. To +specify sequences, use the sqlalchemy.schema.Sequence object which is passed +to a Column construct:: + + t = Table('mytable', metadata, + Column('id', Integer, Sequence('id_seq', start=1), primary_key=True), + Column(...), ... + ) + +This step is also required when using table reflection, i.e. autoload_with=engine:: + + t = Table('mytable', metadata, + Column('id', Integer, Sequence('id_seq', start=1), primary_key=True), + autoload_with=engine + ) + +.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct + in a :class:`_schema.Column` to specify the option of an autoincrementing + column. + +.. _oracle_isolation_level: + +Transaction Isolation Level / Autocommit +---------------------------------------- + +The Oracle database supports "READ COMMITTED" and "SERIALIZABLE" modes of +isolation. The AUTOCOMMIT isolation level is also supported by the cx_Oracle +dialect. + +To set using per-connection execution options:: + + connection = engine.connect() + connection = connection.execution_options( + isolation_level="AUTOCOMMIT" + ) + +For ``READ COMMITTED`` and ``SERIALIZABLE``, the Oracle dialect sets the +level at the session level using ``ALTER SESSION``, which is reverted back +to its default setting when the connection is returned to the connection +pool. + +Valid values for ``isolation_level`` include: + +* ``READ COMMITTED`` +* ``AUTOCOMMIT`` +* ``SERIALIZABLE`` + +.. note:: The implementation for the + :meth:`_engine.Connection.get_isolation_level` method as implemented by the + Oracle dialect necessarily forces the start of a transaction using the + Oracle LOCAL_TRANSACTION_ID function; otherwise no level is normally + readable. + + Additionally, the :meth:`_engine.Connection.get_isolation_level` method will + raise an exception if the ``v$transaction`` view is not available due to + permissions or other reasons, which is a common occurrence in Oracle + installations. + + The cx_Oracle dialect attempts to call the + :meth:`_engine.Connection.get_isolation_level` method when the dialect makes + its first connection to the database in order to acquire the + "default"isolation level. This default level is necessary so that the level + can be reset on a connection after it has been temporarily modified using + :meth:`_engine.Connection.execution_options` method. In the common event + that the :meth:`_engine.Connection.get_isolation_level` method raises an + exception due to ``v$transaction`` not being readable as well as any other + database-related failure, the level is assumed to be "READ COMMITTED". No + warning is emitted for this initial first-connect condition as it is + expected to be a common restriction on Oracle databases. + +.. versionadded:: 1.3.16 added support for AUTOCOMMIT to the cx_oracle dialect + as well as the notion of a default isolation level + +.. versionadded:: 1.3.21 Added support for SERIALIZABLE as well as live + reading of the isolation level. + +.. versionchanged:: 1.3.22 In the event that the default isolation + level cannot be read due to permissions on the v$transaction view as + is common in Oracle installations, the default isolation level is hardcoded + to "READ COMMITTED" which was the behavior prior to 1.3.21. + +.. seealso:: + + :ref:`dbapi_autocommit` + +Identifier Casing +----------------- + +In Oracle, the data dictionary represents all case insensitive identifier +names using UPPERCASE text. SQLAlchemy on the other hand considers an +all-lower case identifier name to be case insensitive. The Oracle dialect +converts all case insensitive identifiers to and from those two formats during +schema level communication, such as reflection of tables and indexes. Using +an UPPERCASE name on the SQLAlchemy side indicates a case sensitive +identifier, and SQLAlchemy will quote the name - this will cause mismatches +against data dictionary data received from Oracle, so unless identifier names +have been truly created as case sensitive (i.e. using quoted names), all +lowercase names should be used on the SQLAlchemy side. + +.. _oracle_max_identifier_lengths: + +Max Identifier Lengths +---------------------- + +Oracle has changed the default max identifier length as of Oracle Server +version 12.2. Prior to this version, the length was 30, and for 12.2 and +greater it is now 128. This change impacts SQLAlchemy in the area of +generated SQL label names as well as the generation of constraint names, +particularly in the case where the constraint naming convention feature +described at :ref:`constraint_naming_conventions` is being used. + +To assist with this change and others, Oracle includes the concept of a +"compatibility" version, which is a version number that is independent of the +actual server version in order to assist with migration of Oracle databases, +and may be configured within the Oracle server itself. This compatibility +version is retrieved using the query ``SELECT value FROM v$parameter WHERE +name = 'compatible';``. The SQLAlchemy Oracle dialect, when tasked with +determining the default max identifier length, will attempt to use this query +upon first connect in order to determine the effective compatibility version of +the server, which determines what the maximum allowed identifier length is for +the server. If the table is not available, the server version information is +used instead. + +As of SQLAlchemy 1.4, the default max identifier length for the Oracle dialect +is 128 characters. Upon first connect, the compatibility version is detected +and if it is less than Oracle version 12.2, the max identifier length is +changed to be 30 characters. In all cases, setting the +:paramref:`_sa.create_engine.max_identifier_length` parameter will bypass this +change and the value given will be used as is:: + + engine = create_engine( + "oracle+cx_oracle://scott:tiger@oracle122", + max_identifier_length=30) + +The maximum identifier length comes into play both when generating anonymized +SQL labels in SELECT statements, but more crucially when generating constraint +names from a naming convention. It is this area that has created the need for +SQLAlchemy to change this default conservatively. For example, the following +naming convention produces two very different constraint names based on the +identifier length:: + + from sqlalchemy import Column + from sqlalchemy import Index + from sqlalchemy import Integer + from sqlalchemy import MetaData + from sqlalchemy import Table + from sqlalchemy.dialects import oracle + from sqlalchemy.schema import CreateIndex + + m = MetaData(naming_convention={"ix": "ix_%(column_0N_name)s"}) + + t = Table( + "t", + m, + Column("some_column_name_1", Integer), + Column("some_column_name_2", Integer), + Column("some_column_name_3", Integer), + ) + + ix = Index( + None, + t.c.some_column_name_1, + t.c.some_column_name_2, + t.c.some_column_name_3, + ) + + oracle_dialect = oracle.dialect(max_identifier_length=30) + print(CreateIndex(ix).compile(dialect=oracle_dialect)) + +With an identifier length of 30, the above CREATE INDEX looks like:: + + CREATE INDEX ix_some_column_name_1s_70cd ON t + (some_column_name_1, some_column_name_2, some_column_name_3) + +However with length=128, it becomes:: + + CREATE INDEX ix_some_column_name_1some_column_name_2some_column_name_3 ON t + (some_column_name_1, some_column_name_2, some_column_name_3) + +Applications which have run versions of SQLAlchemy prior to 1.4 on an Oracle +server version 12.2 or greater are therefore subject to the scenario of a +database migration that wishes to "DROP CONSTRAINT" on a name that was +previously generated with the shorter length. This migration will fail when +the identifier length is changed without the name of the index or constraint +first being adjusted. Such applications are strongly advised to make use of +:paramref:`_sa.create_engine.max_identifier_length` +in order to maintain control +of the generation of truncated names, and to fully review and test all database +migrations in a staging environment when changing this value to ensure that the +impact of this change has been mitigated. + +.. versionchanged:: 1.4 the default max_identifier_length for Oracle is 128 + characters, which is adjusted down to 30 upon first connect if an older + version of Oracle server (compatibility version < 12.2) is detected. + + +LIMIT/OFFSET/FETCH Support +-------------------------- + +Methods like :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` make +use of ``FETCH FIRST N ROW / OFFSET N ROWS`` syntax assuming +Oracle 12c or above, and assuming the SELECT statement is not embedded within +a compound statement like UNION. This syntax is also available directly by using +the :meth:`_sql.Select.fetch` method. + +.. versionchanged:: 2.0 the Oracle dialect now uses + ``FETCH FIRST N ROW / OFFSET N ROWS`` for all + :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` usage including + within the ORM and legacy :class:`_orm.Query`. To force the legacy + behavior using window functions, specify the ``enable_offset_fetch=False`` + dialect parameter to :func:`_sa.create_engine`. + +The use of ``FETCH FIRST / OFFSET`` may be disabled on any Oracle version +by passing ``enable_offset_fetch=False`` to :func:`_sa.create_engine`, which +will force the use of "legacy" mode that makes use of window functions. +This mode is also selected automatically when using a version of Oracle +prior to 12c. + +When using legacy mode, or when a :class:`.Select` statement +with limit/offset is embedded in a compound statement, an emulated approach for +LIMIT / OFFSET based on window functions is used, which involves creation of a +subquery using ``ROW_NUMBER`` that is prone to performance issues as well as +SQL construction issues for complex statements. However, this approach is +supported by all Oracle versions. See notes below. + +Notes on LIMIT / OFFSET emulation (when fetch() method cannot be used) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +If using :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset`, or with the +ORM the :meth:`_orm.Query.limit` and :meth:`_orm.Query.offset` methods on an +Oracle version prior to 12c, the following notes apply: + +* SQLAlchemy currently makes use of ROWNUM to achieve + LIMIT/OFFSET; the exact methodology is taken from + https://blogs.oracle.com/oraclemagazine/on-rownum-and-limiting-results . + +* the "FIRST_ROWS()" optimization keyword is not used by default. To enable + the usage of this optimization directive, specify ``optimize_limits=True`` + to :func:`_sa.create_engine`. + + .. versionchanged:: 1.4 + The Oracle dialect renders limit/offset integer values using a "post + compile" scheme which renders the integer directly before passing the + statement to the cursor for execution. The ``use_binds_for_limits`` flag + no longer has an effect. + + .. seealso:: + + :ref:`change_4808`. + +.. _oracle_returning: + +RETURNING Support +----------------- + +The Oracle database supports RETURNING fully for INSERT, UPDATE and DELETE +statements that are invoked with a single collection of bound parameters +(that is, a ``cursor.execute()`` style statement; SQLAlchemy does not generally +support RETURNING with :term:`executemany` statements). Multiple rows may be +returned as well. + +.. versionchanged:: 2.0 the Oracle backend has full support for RETURNING + on parity with other backends. + + + +ON UPDATE CASCADE +----------------- + +Oracle doesn't have native ON UPDATE CASCADE functionality. A trigger based +solution is available at +https://asktom.oracle.com/tkyte/update_cascade/index.html . + +When using the SQLAlchemy ORM, the ORM has limited ability to manually issue +cascading updates - specify ForeignKey objects using the +"deferrable=True, initially='deferred'" keyword arguments, +and specify "passive_updates=False" on each relationship(). + +Oracle 8 Compatibility +---------------------- + +.. warning:: The status of Oracle 8 compatibility is not known for SQLAlchemy + 2.0. + +When Oracle 8 is detected, the dialect internally configures itself to the +following behaviors: + +* the use_ansi flag is set to False. This has the effect of converting all + JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN + makes use of Oracle's (+) operator. + +* the NVARCHAR2 and NCLOB datatypes are no longer generated as DDL when + the :class:`~sqlalchemy.types.Unicode` is used - VARCHAR2 and CLOB are issued + instead. This because these types don't seem to work correctly on Oracle 8 + even though they are available. The :class:`~sqlalchemy.types.NVARCHAR` and + :class:`~sqlalchemy.dialects.oracle.NCLOB` types will always generate + NVARCHAR2 and NCLOB. + + +Synonym/DBLINK Reflection +------------------------- + +When using reflection with Table objects, the dialect can optionally search +for tables indicated by synonyms, either in local or remote schemas or +accessed over DBLINK, by passing the flag ``oracle_resolve_synonyms=True`` as +a keyword argument to the :class:`_schema.Table` construct:: + + some_table = Table('some_table', autoload_with=some_engine, + oracle_resolve_synonyms=True) + +When this flag is set, the given name (such as ``some_table`` above) will +be searched not just in the ``ALL_TABLES`` view, but also within the +``ALL_SYNONYMS`` view to see if this name is actually a synonym to another +name. If the synonym is located and refers to a DBLINK, the oracle dialect +knows how to locate the table's information using DBLINK syntax(e.g. +``@dblink``). + +``oracle_resolve_synonyms`` is accepted wherever reflection arguments are +accepted, including methods such as :meth:`_schema.MetaData.reflect` and +:meth:`_reflection.Inspector.get_columns`. + +If synonyms are not in use, this flag should be left disabled. + +.. _oracle_constraint_reflection: + +Constraint Reflection +--------------------- + +The Oracle dialect can return information about foreign key, unique, and +CHECK constraints, as well as indexes on tables. + +Raw information regarding these constraints can be acquired using +:meth:`_reflection.Inspector.get_foreign_keys`, +:meth:`_reflection.Inspector.get_unique_constraints`, +:meth:`_reflection.Inspector.get_check_constraints`, and +:meth:`_reflection.Inspector.get_indexes`. + +.. versionchanged:: 1.2 The Oracle dialect can now reflect UNIQUE and + CHECK constraints. + +When using reflection at the :class:`_schema.Table` level, the +:class:`_schema.Table` +will also include these constraints. + +Note the following caveats: + +* When using the :meth:`_reflection.Inspector.get_check_constraints` method, + Oracle + builds a special "IS NOT NULL" constraint for columns that specify + "NOT NULL". This constraint is **not** returned by default; to include + the "IS NOT NULL" constraints, pass the flag ``include_all=True``:: + + from sqlalchemy import create_engine, inspect + + engine = create_engine("oracle+cx_oracle://s:t@dsn") + inspector = inspect(engine) + all_check_constraints = inspector.get_check_constraints( + "some_table", include_all=True) + +* in most cases, when reflecting a :class:`_schema.Table`, + a UNIQUE constraint will + **not** be available as a :class:`.UniqueConstraint` object, as Oracle + mirrors unique constraints with a UNIQUE index in most cases (the exception + seems to be when two or more unique constraints represent the same columns); + the :class:`_schema.Table` will instead represent these using + :class:`.Index` + with the ``unique=True`` flag set. + +* Oracle creates an implicit index for the primary key of a table; this index + is **excluded** from all index results. + +* the list of columns reflected for an index will not include column names + that start with SYS_NC. + +Table names with SYSTEM/SYSAUX tablespaces +------------------------------------------- + +The :meth:`_reflection.Inspector.get_table_names` and +:meth:`_reflection.Inspector.get_temp_table_names` +methods each return a list of table names for the current engine. These methods +are also part of the reflection which occurs within an operation such as +:meth:`_schema.MetaData.reflect`. By default, +these operations exclude the ``SYSTEM`` +and ``SYSAUX`` tablespaces from the operation. In order to change this, the +default list of tablespaces excluded can be changed at the engine level using +the ``exclude_tablespaces`` parameter:: + + # exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM + e = create_engine( + "oracle+cx_oracle://scott:tiger@xe", + exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"]) + +DateTime Compatibility +---------------------- + +Oracle has no datatype known as ``DATETIME``, it instead has only ``DATE``, +which can actually store a date and time value. For this reason, the Oracle +dialect provides a type :class:`_oracle.DATE` which is a subclass of +:class:`.DateTime`. This type has no special behavior, and is only +present as a "marker" for this type; additionally, when a database column +is reflected and the type is reported as ``DATE``, the time-supporting +:class:`_oracle.DATE` type is used. + +.. _oracle_table_options: + +Oracle Table Options +------------------------- + +The CREATE TABLE phrase supports the following options with Oracle +in conjunction with the :class:`_schema.Table` construct: + + +* ``ON COMMIT``:: + + Table( + "some_table", metadata, ..., + prefixes=['GLOBAL TEMPORARY'], oracle_on_commit='PRESERVE ROWS') + +* ``COMPRESS``:: + + Table('mytable', metadata, Column('data', String(32)), + oracle_compress=True) + + Table('mytable', metadata, Column('data', String(32)), + oracle_compress=6) + + The ``oracle_compress`` parameter accepts either an integer compression + level, or ``True`` to use the default compression level. + +.. _oracle_index_options: + +Oracle Specific Index Options +----------------------------- + +Bitmap Indexes +~~~~~~~~~~~~~~ + +You can specify the ``oracle_bitmap`` parameter to create a bitmap index +instead of a B-tree index:: + + Index('my_index', my_table.c.data, oracle_bitmap=True) + +Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will not +check for such limitations, only the database will. + +Index compression +~~~~~~~~~~~~~~~~~ + +Oracle has a more efficient storage mode for indexes containing lots of +repeated values. Use the ``oracle_compress`` parameter to turn on key +compression:: + + Index('my_index', my_table.c.data, oracle_compress=True) + + Index('my_index', my_table.c.data1, my_table.c.data2, unique=True, + oracle_compress=1) + +The ``oracle_compress`` parameter accepts either an integer specifying the +number of prefix columns to compress, or ``True`` to use the default (all +columns for non-unique indexes, all but the last column for unique indexes). + +""" # noqa + +from __future__ import annotations + +from collections import defaultdict +from functools import lru_cache +from functools import wraps +import re + +from . import dictionary +from .types import _OracleBoolean +from .types import _OracleDate +from .types import BFILE +from .types import BINARY_DOUBLE +from .types import BINARY_FLOAT +from .types import DATE +from .types import FLOAT +from .types import INTERVAL +from .types import LONG +from .types import NCLOB +from .types import NUMBER +from .types import NVARCHAR2 # noqa +from .types import OracleRaw # noqa +from .types import RAW +from .types import ROWID # noqa +from .types import TIMESTAMP +from .types import VARCHAR2 # noqa +from ... import Computed +from ... import exc +from ... import schema as sa_schema +from ... import sql +from ... import util +from ...engine import default +from ...engine import ObjectKind +from ...engine import ObjectScope +from ...engine import reflection +from ...engine.reflection import ReflectionDefaults +from ...sql import and_ +from ...sql import bindparam +from ...sql import compiler +from ...sql import expression +from ...sql import func +from ...sql import null +from ...sql import or_ +from ...sql import select +from ...sql import sqltypes +from ...sql import util as sql_util +from ...sql import visitors +from ...sql.visitors import InternalTraversal +from ...types import BLOB +from ...types import CHAR +from ...types import CLOB +from ...types import DOUBLE_PRECISION +from ...types import INTEGER +from ...types import NCHAR +from ...types import NVARCHAR +from ...types import REAL +from ...types import VARCHAR + +RESERVED_WORDS = set( + "SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN " + "DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED " + "ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE " + "ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE " + "BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES " + "AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS " + "NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER " + "CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR " + "DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL".split() +) + +NO_ARG_FNS = set( + "UID CURRENT_DATE SYSDATE USER CURRENT_TIME CURRENT_TIMESTAMP".split() +) + + +colspecs = { + sqltypes.Boolean: _OracleBoolean, + sqltypes.Interval: INTERVAL, + sqltypes.DateTime: DATE, + sqltypes.Date: _OracleDate, +} + +ischema_names = { + "VARCHAR2": VARCHAR, + "NVARCHAR2": NVARCHAR, + "CHAR": CHAR, + "NCHAR": NCHAR, + "DATE": DATE, + "NUMBER": NUMBER, + "BLOB": BLOB, + "BFILE": BFILE, + "CLOB": CLOB, + "NCLOB": NCLOB, + "TIMESTAMP": TIMESTAMP, + "TIMESTAMP WITH TIME ZONE": TIMESTAMP, + "TIMESTAMP WITH LOCAL TIME ZONE": TIMESTAMP, + "INTERVAL DAY TO SECOND": INTERVAL, + "RAW": RAW, + "FLOAT": FLOAT, + "DOUBLE PRECISION": DOUBLE_PRECISION, + "REAL": REAL, + "LONG": LONG, + "BINARY_DOUBLE": BINARY_DOUBLE, + "BINARY_FLOAT": BINARY_FLOAT, + "ROWID": ROWID, +} + + +class OracleTypeCompiler(compiler.GenericTypeCompiler): + # Note: + # Oracle DATE == DATETIME + # Oracle does not allow milliseconds in DATE + # Oracle does not support TIME columns + + def visit_datetime(self, type_, **kw): + return self.visit_DATE(type_, **kw) + + def visit_float(self, type_, **kw): + return self.visit_FLOAT(type_, **kw) + + def visit_double(self, type_, **kw): + return self.visit_DOUBLE_PRECISION(type_, **kw) + + def visit_unicode(self, type_, **kw): + if self.dialect._use_nchar_for_unicode: + return self.visit_NVARCHAR2(type_, **kw) + else: + return self.visit_VARCHAR2(type_, **kw) + + def visit_INTERVAL(self, type_, **kw): + return "INTERVAL DAY%s TO SECOND%s" % ( + type_.day_precision is not None + and "(%d)" % type_.day_precision + or "", + type_.second_precision is not None + and "(%d)" % type_.second_precision + or "", + ) + + def visit_LONG(self, type_, **kw): + return "LONG" + + def visit_TIMESTAMP(self, type_, **kw): + if getattr(type_, "local_timezone", False): + return "TIMESTAMP WITH LOCAL TIME ZONE" + elif type_.timezone: + return "TIMESTAMP WITH TIME ZONE" + else: + return "TIMESTAMP" + + def visit_DOUBLE_PRECISION(self, type_, **kw): + return self._generate_numeric(type_, "DOUBLE PRECISION", **kw) + + def visit_BINARY_DOUBLE(self, type_, **kw): + return self._generate_numeric(type_, "BINARY_DOUBLE", **kw) + + def visit_BINARY_FLOAT(self, type_, **kw): + return self._generate_numeric(type_, "BINARY_FLOAT", **kw) + + def visit_FLOAT(self, type_, **kw): + kw["_requires_binary_precision"] = True + return self._generate_numeric(type_, "FLOAT", **kw) + + def visit_NUMBER(self, type_, **kw): + return self._generate_numeric(type_, "NUMBER", **kw) + + def _generate_numeric( + self, + type_, + name, + precision=None, + scale=None, + _requires_binary_precision=False, + **kw, + ): + if precision is None: + precision = getattr(type_, "precision", None) + + if _requires_binary_precision: + binary_precision = getattr(type_, "binary_precision", None) + + if precision and binary_precision is None: + # https://www.oracletutorial.com/oracle-basics/oracle-float/ + estimated_binary_precision = int(precision / 0.30103) + raise exc.ArgumentError( + "Oracle FLOAT types use 'binary precision', which does " + "not convert cleanly from decimal 'precision'. Please " + "specify " + f"this type with a separate Oracle variant, such as " + f"{type_.__class__.__name__}(precision={precision})." + f"with_variant(oracle.FLOAT" + f"(binary_precision=" + f"{estimated_binary_precision}), 'oracle'), so that the " + "Oracle specific 'binary_precision' may be specified " + "accurately." + ) + else: + precision = binary_precision + + if scale is None: + scale = getattr(type_, "scale", None) + + if precision is None: + return name + elif scale is None: + n = "%(name)s(%(precision)s)" + return n % {"name": name, "precision": precision} + else: + n = "%(name)s(%(precision)s, %(scale)s)" + return n % {"name": name, "precision": precision, "scale": scale} + + def visit_string(self, type_, **kw): + return self.visit_VARCHAR2(type_, **kw) + + def visit_VARCHAR2(self, type_, **kw): + return self._visit_varchar(type_, "", "2") + + def visit_NVARCHAR2(self, type_, **kw): + return self._visit_varchar(type_, "N", "2") + + visit_NVARCHAR = visit_NVARCHAR2 + + def visit_VARCHAR(self, type_, **kw): + return self._visit_varchar(type_, "", "") + + def _visit_varchar(self, type_, n, num): + if not type_.length: + return "%(n)sVARCHAR%(two)s" % {"two": num, "n": n} + elif not n and self.dialect._supports_char_length: + varchar = "VARCHAR%(two)s(%(length)s CHAR)" + return varchar % {"length": type_.length, "two": num} + else: + varchar = "%(n)sVARCHAR%(two)s(%(length)s)" + return varchar % {"length": type_.length, "two": num, "n": n} + + def visit_text(self, type_, **kw): + return self.visit_CLOB(type_, **kw) + + def visit_unicode_text(self, type_, **kw): + if self.dialect._use_nchar_for_unicode: + return self.visit_NCLOB(type_, **kw) + else: + return self.visit_CLOB(type_, **kw) + + def visit_large_binary(self, type_, **kw): + return self.visit_BLOB(type_, **kw) + + def visit_big_integer(self, type_, **kw): + return self.visit_NUMBER(type_, precision=19, **kw) + + def visit_boolean(self, type_, **kw): + return self.visit_SMALLINT(type_, **kw) + + def visit_RAW(self, type_, **kw): + if type_.length: + return "RAW(%(length)s)" % {"length": type_.length} + else: + return "RAW" + + def visit_ROWID(self, type_, **kw): + return "ROWID" + + +class OracleCompiler(compiler.SQLCompiler): + """Oracle compiler modifies the lexical structure of Select + statements to work under non-ANSI configured Oracle databases, if + the use_ansi flag is False. + """ + + compound_keywords = util.update_copy( + compiler.SQLCompiler.compound_keywords, + {expression.CompoundSelect.EXCEPT: "MINUS"}, + ) + + def __init__(self, *args, **kwargs): + self.__wheres = {} + super().__init__(*args, **kwargs) + + def visit_mod_binary(self, binary, operator, **kw): + return "mod(%s, %s)" % ( + self.process(binary.left, **kw), + self.process(binary.right, **kw), + ) + + def visit_now_func(self, fn, **kw): + return "CURRENT_TIMESTAMP" + + def visit_char_length_func(self, fn, **kw): + return "LENGTH" + self.function_argspec(fn, **kw) + + def visit_match_op_binary(self, binary, operator, **kw): + return "CONTAINS (%s, %s)" % ( + self.process(binary.left), + self.process(binary.right), + ) + + def visit_true(self, expr, **kw): + return "1" + + def visit_false(self, expr, **kw): + return "0" + + def get_cte_preamble(self, recursive): + return "WITH" + + def get_select_hint_text(self, byfroms): + return " ".join("/*+ %s */" % text for table, text in byfroms.items()) + + def function_argspec(self, fn, **kw): + if len(fn.clauses) > 0 or fn.name.upper() not in NO_ARG_FNS: + return compiler.SQLCompiler.function_argspec(self, fn, **kw) + else: + return "" + + def visit_function(self, func, **kw): + text = super().visit_function(func, **kw) + if kw.get("asfrom", False): + text = "TABLE (%s)" % text + return text + + def visit_table_valued_column(self, element, **kw): + text = super().visit_table_valued_column(element, **kw) + text = text + ".COLUMN_VALUE" + return text + + def default_from(self): + """Called when a ``SELECT`` statement has no froms, + and no ``FROM`` clause is to be appended. + + The Oracle compiler tacks a "FROM DUAL" to the statement. + """ + + return " FROM DUAL" + + def visit_join(self, join, from_linter=None, **kwargs): + if self.dialect.use_ansi: + return compiler.SQLCompiler.visit_join( + self, join, from_linter=from_linter, **kwargs + ) + else: + if from_linter: + from_linter.edges.add((join.left, join.right)) + + kwargs["asfrom"] = True + if isinstance(join.right, expression.FromGrouping): + right = join.right.element + else: + right = join.right + return ( + self.process(join.left, from_linter=from_linter, **kwargs) + + ", " + + self.process(right, from_linter=from_linter, **kwargs) + ) + + def _get_nonansi_join_whereclause(self, froms): + clauses = [] + + def visit_join(join): + if join.isouter: + # https://docs.oracle.com/database/121/SQLRF/queries006.htm#SQLRF52354 + # "apply the outer join operator (+) to all columns of B in + # the join condition in the WHERE clause" - that is, + # unconditionally regardless of operator or the other side + def visit_binary(binary): + if isinstance( + binary.left, expression.ColumnClause + ) and join.right.is_derived_from(binary.left.table): + binary.left = _OuterJoinColumn(binary.left) + elif isinstance( + binary.right, expression.ColumnClause + ) and join.right.is_derived_from(binary.right.table): + binary.right = _OuterJoinColumn(binary.right) + + clauses.append( + visitors.cloned_traverse( + join.onclause, {}, {"binary": visit_binary} + ) + ) + else: + clauses.append(join.onclause) + + for j in join.left, join.right: + if isinstance(j, expression.Join): + visit_join(j) + elif isinstance(j, expression.FromGrouping): + visit_join(j.element) + + for f in froms: + if isinstance(f, expression.Join): + visit_join(f) + + if not clauses: + return None + else: + return sql.and_(*clauses) + + def visit_outer_join_column(self, vc, **kw): + return self.process(vc.column, **kw) + "(+)" + + def visit_sequence(self, seq, **kw): + return self.preparer.format_sequence(seq) + ".nextval" + + def get_render_as_alias_suffix(self, alias_name_text): + """Oracle doesn't like ``FROM table AS alias``""" + + return " " + alias_name_text + + def returning_clause( + self, stmt, returning_cols, *, populate_result_map, **kw + ): + columns = [] + binds = [] + + for i, column in enumerate( + expression._select_iterables(returning_cols) + ): + if ( + self.isupdate + and isinstance(column, sa_schema.Column) + and isinstance(column.server_default, Computed) + and not self.dialect._supports_update_returning_computed_cols + ): + util.warn( + "Computed columns don't work with Oracle UPDATE " + "statements that use RETURNING; the value of the column " + "*before* the UPDATE takes place is returned. It is " + "advised to not use RETURNING with an Oracle computed " + "column. Consider setting implicit_returning to False on " + "the Table object in order to avoid implicit RETURNING " + "clauses from being generated for this Table." + ) + if column.type._has_column_expression: + col_expr = column.type.column_expression(column) + else: + col_expr = column + + outparam = sql.outparam("ret_%d" % i, type_=column.type) + self.binds[outparam.key] = outparam + binds.append( + self.bindparam_string(self._truncate_bindparam(outparam)) + ) + + # has_out_parameters would in a normal case be set to True + # as a result of the compiler visiting an outparam() object. + # in this case, the above outparam() objects are not being + # visited. Ensure the statement itself didn't have other + # outparam() objects independently. + # technically, this could be supported, but as it would be + # a very strange use case without a clear rationale, disallow it + if self.has_out_parameters: + raise exc.InvalidRequestError( + "Using explicit outparam() objects with " + "UpdateBase.returning() in the same Core DML statement " + "is not supported in the Oracle dialect." + ) + + self._oracle_returning = True + + columns.append(self.process(col_expr, within_columns_clause=False)) + if populate_result_map: + self._add_to_result_map( + getattr(col_expr, "name", col_expr._anon_name_label), + getattr(col_expr, "name", col_expr._anon_name_label), + ( + column, + getattr(column, "name", None), + getattr(column, "key", None), + ), + column.type, + ) + + return "RETURNING " + ", ".join(columns) + " INTO " + ", ".join(binds) + + def _row_limit_clause(self, select, **kw): + """ORacle 12c supports OFFSET/FETCH operators + Use it instead subquery with row_number + + """ + + if ( + select._fetch_clause is not None + or not self.dialect._supports_offset_fetch + ): + return super()._row_limit_clause( + select, use_literal_execute_for_simple_int=True, **kw + ) + else: + return self.fetch_clause( + select, + fetch_clause=self._get_limit_or_fetch(select), + use_literal_execute_for_simple_int=True, + **kw, + ) + + def _get_limit_or_fetch(self, select): + if select._fetch_clause is None: + return select._limit_clause + else: + return select._fetch_clause + + def translate_select_structure(self, select_stmt, **kwargs): + select = select_stmt + + if not getattr(select, "_oracle_visit", None): + if not self.dialect.use_ansi: + froms = self._display_froms_for_select( + select, kwargs.get("asfrom", False) + ) + whereclause = self._get_nonansi_join_whereclause(froms) + if whereclause is not None: + select = select.where(whereclause) + select._oracle_visit = True + + # if fetch is used this is not needed + if ( + select._has_row_limiting_clause + and not self.dialect._supports_offset_fetch + and select._fetch_clause is None + ): + limit_clause = select._limit_clause + offset_clause = select._offset_clause + + if select._simple_int_clause(limit_clause): + limit_clause = limit_clause.render_literal_execute() + + if select._simple_int_clause(offset_clause): + offset_clause = offset_clause.render_literal_execute() + + # currently using form at: + # https://blogs.oracle.com/oraclemagazine/\ + # on-rownum-and-limiting-results + + orig_select = select + select = select._generate() + select._oracle_visit = True + + # add expressions to accommodate FOR UPDATE OF + for_update = select._for_update_arg + if for_update is not None and for_update.of: + for_update = for_update._clone() + for_update._copy_internals() + + for elem in for_update.of: + if not select.selected_columns.contains_column(elem): + select = select.add_columns(elem) + + # Wrap the middle select and add the hint + inner_subquery = select.alias() + limitselect = sql.select( + *[ + c + for c in inner_subquery.c + if orig_select.selected_columns.corresponding_column(c) + is not None + ] + ) + + if ( + limit_clause is not None + and self.dialect.optimize_limits + and select._simple_int_clause(limit_clause) + ): + limitselect = limitselect.prefix_with( + expression.text( + "/*+ FIRST_ROWS(%s) */" + % self.process(limit_clause, **kwargs) + ) + ) + + limitselect._oracle_visit = True + limitselect._is_wrapper = True + + # add expressions to accommodate FOR UPDATE OF + if for_update is not None and for_update.of: + adapter = sql_util.ClauseAdapter(inner_subquery) + for_update.of = [ + adapter.traverse(elem) for elem in for_update.of + ] + + # If needed, add the limiting clause + if limit_clause is not None: + if select._simple_int_clause(limit_clause) and ( + offset_clause is None + or select._simple_int_clause(offset_clause) + ): + max_row = limit_clause + + if offset_clause is not None: + max_row = max_row + offset_clause + + else: + max_row = limit_clause + + if offset_clause is not None: + max_row = max_row + offset_clause + limitselect = limitselect.where( + sql.literal_column("ROWNUM") <= max_row + ) + + # If needed, add the ora_rn, and wrap again with offset. + if offset_clause is None: + limitselect._for_update_arg = for_update + select = limitselect + else: + limitselect = limitselect.add_columns( + sql.literal_column("ROWNUM").label("ora_rn") + ) + limitselect._oracle_visit = True + limitselect._is_wrapper = True + + if for_update is not None and for_update.of: + limitselect_cols = limitselect.selected_columns + for elem in for_update.of: + if ( + limitselect_cols.corresponding_column(elem) + is None + ): + limitselect = limitselect.add_columns(elem) + + limit_subquery = limitselect.alias() + origselect_cols = orig_select.selected_columns + offsetselect = sql.select( + *[ + c + for c in limit_subquery.c + if origselect_cols.corresponding_column(c) + is not None + ] + ) + + offsetselect._oracle_visit = True + offsetselect._is_wrapper = True + + if for_update is not None and for_update.of: + adapter = sql_util.ClauseAdapter(limit_subquery) + for_update.of = [ + adapter.traverse(elem) for elem in for_update.of + ] + + offsetselect = offsetselect.where( + sql.literal_column("ora_rn") > offset_clause + ) + + offsetselect._for_update_arg = for_update + select = offsetselect + + return select + + def limit_clause(self, select, **kw): + return "" + + def visit_empty_set_expr(self, type_, **kw): + return "SELECT 1 FROM DUAL WHERE 1!=1" + + def for_update_clause(self, select, **kw): + if self.is_subquery(): + return "" + + tmp = " FOR UPDATE" + + if select._for_update_arg.of: + tmp += " OF " + ", ".join( + self.process(elem, **kw) for elem in select._for_update_arg.of + ) + + if select._for_update_arg.nowait: + tmp += " NOWAIT" + if select._for_update_arg.skip_locked: + tmp += " SKIP LOCKED" + + return tmp + + def visit_is_distinct_from_binary(self, binary, operator, **kw): + return "DECODE(%s, %s, 0, 1) = 1" % ( + self.process(binary.left), + self.process(binary.right), + ) + + def visit_is_not_distinct_from_binary(self, binary, operator, **kw): + return "DECODE(%s, %s, 0, 1) = 0" % ( + self.process(binary.left), + self.process(binary.right), + ) + + def visit_regexp_match_op_binary(self, binary, operator, **kw): + string = self.process(binary.left, **kw) + pattern = self.process(binary.right, **kw) + flags = binary.modifiers["flags"] + if flags is None: + return "REGEXP_LIKE(%s, %s)" % (string, pattern) + else: + return "REGEXP_LIKE(%s, %s, %s)" % ( + string, + pattern, + self.render_literal_value(flags, sqltypes.STRINGTYPE), + ) + + def visit_not_regexp_match_op_binary(self, binary, operator, **kw): + return "NOT %s" % self.visit_regexp_match_op_binary( + binary, operator, **kw + ) + + def visit_regexp_replace_op_binary(self, binary, operator, **kw): + string = self.process(binary.left, **kw) + pattern_replace = self.process(binary.right, **kw) + flags = binary.modifiers["flags"] + if flags is None: + return "REGEXP_REPLACE(%s, %s)" % ( + string, + pattern_replace, + ) + else: + return "REGEXP_REPLACE(%s, %s, %s)" % ( + string, + pattern_replace, + self.render_literal_value(flags, sqltypes.STRINGTYPE), + ) + + def visit_aggregate_strings_func(self, fn, **kw): + return "LISTAGG%s" % self.function_argspec(fn, **kw) + + +class OracleDDLCompiler(compiler.DDLCompiler): + def define_constraint_cascades(self, constraint): + text = "" + if constraint.ondelete is not None: + text += " ON DELETE %s" % constraint.ondelete + + # oracle has no ON UPDATE CASCADE - + # its only available via triggers + # https://asktom.oracle.com/tkyte/update_cascade/index.html + if constraint.onupdate is not None: + util.warn( + "Oracle does not contain native UPDATE CASCADE " + "functionality - onupdates will not be rendered for foreign " + "keys. Consider using deferrable=True, initially='deferred' " + "or triggers." + ) + + return text + + def visit_drop_table_comment(self, drop, **kw): + return "COMMENT ON TABLE %s IS ''" % self.preparer.format_table( + drop.element + ) + + def visit_create_index(self, create, **kw): + index = create.element + self._verify_index_table(index) + preparer = self.preparer + text = "CREATE " + if index.unique: + text += "UNIQUE " + if index.dialect_options["oracle"]["bitmap"]: + text += "BITMAP " + text += "INDEX %s ON %s (%s)" % ( + self._prepared_index_name(index, include_schema=True), + preparer.format_table(index.table, use_schema=True), + ", ".join( + self.sql_compiler.process( + expr, include_table=False, literal_binds=True + ) + for expr in index.expressions + ), + ) + if index.dialect_options["oracle"]["compress"] is not False: + if index.dialect_options["oracle"]["compress"] is True: + text += " COMPRESS" + else: + text += " COMPRESS %d" % ( + index.dialect_options["oracle"]["compress"] + ) + return text + + def post_create_table(self, table): + table_opts = [] + opts = table.dialect_options["oracle"] + + if opts["on_commit"]: + on_commit_options = opts["on_commit"].replace("_", " ").upper() + table_opts.append("\n ON COMMIT %s" % on_commit_options) + + if opts["compress"]: + if opts["compress"] is True: + table_opts.append("\n COMPRESS") + else: + table_opts.append("\n COMPRESS FOR %s" % (opts["compress"])) + + return "".join(table_opts) + + def get_identity_options(self, identity_options): + text = super().get_identity_options(identity_options) + text = text.replace("NO MINVALUE", "NOMINVALUE") + text = text.replace("NO MAXVALUE", "NOMAXVALUE") + text = text.replace("NO CYCLE", "NOCYCLE") + if identity_options.order is not None: + text += " ORDER" if identity_options.order else " NOORDER" + return text.strip() + + def visit_computed_column(self, generated, **kw): + text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process( + generated.sqltext, include_table=False, literal_binds=True + ) + if generated.persisted is True: + raise exc.CompileError( + "Oracle computed columns do not support 'stored' persistence; " + "set the 'persisted' flag to None or False for Oracle support." + ) + elif generated.persisted is False: + text += " VIRTUAL" + return text + + def visit_identity_column(self, identity, **kw): + if identity.always is None: + kind = "" + else: + kind = "ALWAYS" if identity.always else "BY DEFAULT" + text = "GENERATED %s" % kind + if identity.on_null: + text += " ON NULL" + text += " AS IDENTITY" + options = self.get_identity_options(identity) + if options: + text += " (%s)" % options + return text + + +class OracleIdentifierPreparer(compiler.IdentifierPreparer): + reserved_words = {x.lower() for x in RESERVED_WORDS} + illegal_initial_characters = {str(dig) for dig in range(0, 10)}.union( + ["_", "$"] + ) + + def _bindparam_requires_quotes(self, value): + """Return True if the given identifier requires quoting.""" + lc_value = value.lower() + return ( + lc_value in self.reserved_words + or value[0] in self.illegal_initial_characters + or not self.legal_characters.match(str(value)) + ) + + def format_savepoint(self, savepoint): + name = savepoint.ident.lstrip("_") + return super().format_savepoint(savepoint, name) + + +class OracleExecutionContext(default.DefaultExecutionContext): + def fire_sequence(self, seq, type_): + return self._execute_scalar( + "SELECT " + + self.identifier_preparer.format_sequence(seq) + + ".nextval FROM DUAL", + type_, + ) + + def pre_exec(self): + if self.statement and "_oracle_dblink" in self.execution_options: + self.statement = self.statement.replace( + dictionary.DB_LINK_PLACEHOLDER, + self.execution_options["_oracle_dblink"], + ) + + +class OracleDialect(default.DefaultDialect): + name = "oracle" + supports_statement_cache = True + supports_alter = True + max_identifier_length = 128 + + _supports_offset_fetch = True + + insert_returning = True + update_returning = True + delete_returning = True + + div_is_floordiv = False + + supports_simple_order_by_label = False + cte_follows_insert = True + returns_native_bytes = True + + supports_sequences = True + sequences_optional = False + postfetch_lastrowid = False + + default_paramstyle = "named" + colspecs = colspecs + ischema_names = ischema_names + requires_name_normalize = True + + supports_comments = True + + supports_default_values = False + supports_default_metavalue = True + supports_empty_insert = False + supports_identity_columns = True + + statement_compiler = OracleCompiler + ddl_compiler = OracleDDLCompiler + type_compiler_cls = OracleTypeCompiler + preparer = OracleIdentifierPreparer + execution_ctx_cls = OracleExecutionContext + + reflection_options = ("oracle_resolve_synonyms",) + + _use_nchar_for_unicode = False + + construct_arguments = [ + ( + sa_schema.Table, + {"resolve_synonyms": False, "on_commit": None, "compress": False}, + ), + (sa_schema.Index, {"bitmap": False, "compress": False}), + ] + + @util.deprecated_params( + use_binds_for_limits=( + "1.4", + "The ``use_binds_for_limits`` Oracle dialect parameter is " + "deprecated. The dialect now renders LIMIT /OFFSET integers " + "inline in all cases using a post-compilation hook, so that the " + "value is still represented by a 'bound parameter' on the Core " + "Expression side.", + ) + ) + def __init__( + self, + use_ansi=True, + optimize_limits=False, + use_binds_for_limits=None, + use_nchar_for_unicode=False, + exclude_tablespaces=("SYSTEM", "SYSAUX"), + enable_offset_fetch=True, + **kwargs, + ): + default.DefaultDialect.__init__(self, **kwargs) + self._use_nchar_for_unicode = use_nchar_for_unicode + self.use_ansi = use_ansi + self.optimize_limits = optimize_limits + self.exclude_tablespaces = exclude_tablespaces + self.enable_offset_fetch = self._supports_offset_fetch = ( + enable_offset_fetch + ) + + def initialize(self, connection): + super().initialize(connection) + + # Oracle 8i has RETURNING: + # https://docs.oracle.com/cd/A87860_01/doc/index.htm + + # so does Oracle8: + # https://docs.oracle.com/cd/A64702_01/doc/index.htm + + if self._is_oracle_8: + self.colspecs = self.colspecs.copy() + self.colspecs.pop(sqltypes.Interval) + self.use_ansi = False + + self.supports_identity_columns = self.server_version_info >= (12,) + self._supports_offset_fetch = ( + self.enable_offset_fetch and self.server_version_info >= (12,) + ) + + def _get_effective_compat_server_version_info(self, connection): + # dialect does not need compat levels below 12.2, so don't query + # in those cases + + if self.server_version_info < (12, 2): + return self.server_version_info + try: + compat = connection.exec_driver_sql( + "SELECT value FROM v$parameter WHERE name = 'compatible'" + ).scalar() + except exc.DBAPIError: + compat = None + + if compat: + try: + return tuple(int(x) for x in compat.split(".")) + except: + return self.server_version_info + else: + return self.server_version_info + + @property + def _is_oracle_8(self): + return self.server_version_info and self.server_version_info < (9,) + + @property + def _supports_table_compression(self): + return self.server_version_info and self.server_version_info >= (10, 1) + + @property + def _supports_table_compress_for(self): + return self.server_version_info and self.server_version_info >= (11,) + + @property + def _supports_char_length(self): + return not self._is_oracle_8 + + @property + def _supports_update_returning_computed_cols(self): + # on version 18 this error is no longet present while it happens on 11 + # it may work also on versions before the 18 + return self.server_version_info and self.server_version_info >= (18,) + + @property + def _supports_except_all(self): + return self.server_version_info and self.server_version_info >= (21,) + + def do_release_savepoint(self, connection, name): + # Oracle does not support RELEASE SAVEPOINT + pass + + def _check_max_identifier_length(self, connection): + if self._get_effective_compat_server_version_info(connection) < ( + 12, + 2, + ): + return 30 + else: + # use the default + return None + + def get_isolation_level_values(self, dbapi_connection): + return ["READ COMMITTED", "SERIALIZABLE"] + + def get_default_isolation_level(self, dbapi_conn): + try: + return self.get_isolation_level(dbapi_conn) + except NotImplementedError: + raise + except: + return "READ COMMITTED" + + def _execute_reflection( + self, connection, query, dblink, returns_long, params=None + ): + if dblink and not dblink.startswith("@"): + dblink = f"@{dblink}" + execution_options = { + # handle db links + "_oracle_dblink": dblink or "", + # override any schema translate map + "schema_translate_map": None, + } + + if dblink and returns_long: + # Oracle seems to error with + # "ORA-00997: illegal use of LONG datatype" when returning + # LONG columns via a dblink in a query with bind params + # This type seems to be very hard to cast into something else + # so it seems easier to just use bind param in this case + def visit_bindparam(bindparam): + bindparam.literal_execute = True + + query = visitors.cloned_traverse( + query, {}, {"bindparam": visit_bindparam} + ) + return connection.execute( + query, params, execution_options=execution_options + ) + + @util.memoized_property + def _has_table_query(self): + # materialized views are returned by all_tables + tables = ( + select( + dictionary.all_tables.c.table_name, + dictionary.all_tables.c.owner, + ) + .union_all( + select( + dictionary.all_views.c.view_name.label("table_name"), + dictionary.all_views.c.owner, + ) + ) + .subquery("tables_and_views") + ) + + query = select(tables.c.table_name).where( + tables.c.table_name == bindparam("table_name"), + tables.c.owner == bindparam("owner"), + ) + return query + + @reflection.cache + def has_table( + self, connection, table_name, schema=None, dblink=None, **kw + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + self._ensure_has_table_connection(connection) + + if not schema: + schema = self.default_schema_name + + params = { + "table_name": self.denormalize_name(table_name), + "owner": self.denormalize_schema_name(schema), + } + cursor = self._execute_reflection( + connection, + self._has_table_query, + dblink, + returns_long=False, + params=params, + ) + return bool(cursor.scalar()) + + @reflection.cache + def has_sequence( + self, connection, sequence_name, schema=None, dblink=None, **kw + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + if not schema: + schema = self.default_schema_name + + query = select(dictionary.all_sequences.c.sequence_name).where( + dictionary.all_sequences.c.sequence_name + == self.denormalize_schema_name(sequence_name), + dictionary.all_sequences.c.sequence_owner + == self.denormalize_schema_name(schema), + ) + + cursor = self._execute_reflection( + connection, query, dblink, returns_long=False + ) + return bool(cursor.scalar()) + + def _get_default_schema_name(self, connection): + return self.normalize_name( + connection.exec_driver_sql( + "select sys_context( 'userenv', 'current_schema' ) from dual" + ).scalar() + ) + + def denormalize_schema_name(self, name): + # look for quoted_name + force = getattr(name, "quote", None) + if force is None and name == "public": + # look for case insensitive, no quoting specified, "public" + return "PUBLIC" + return super().denormalize_name(name) + + @reflection.flexi_cache( + ("schema", InternalTraversal.dp_string), + ("filter_names", InternalTraversal.dp_string_list), + ("dblink", InternalTraversal.dp_string), + ) + def _get_synonyms(self, connection, schema, filter_names, dblink, **kw): + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + + has_filter_names, params = self._prepare_filter_names(filter_names) + query = select( + dictionary.all_synonyms.c.synonym_name, + dictionary.all_synonyms.c.table_name, + dictionary.all_synonyms.c.table_owner, + dictionary.all_synonyms.c.db_link, + ).where(dictionary.all_synonyms.c.owner == owner) + if has_filter_names: + query = query.where( + dictionary.all_synonyms.c.synonym_name.in_( + params["filter_names"] + ) + ) + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).mappings() + return result.all() + + @lru_cache() + def _all_objects_query( + self, owner, scope, kind, has_filter_names, has_mat_views + ): + query = ( + select(dictionary.all_objects.c.object_name) + .select_from(dictionary.all_objects) + .where(dictionary.all_objects.c.owner == owner) + ) + + # NOTE: materialized views are listed in all_objects twice; + # once as MATERIALIZE VIEW and once as TABLE + if kind is ObjectKind.ANY: + # materilaized view are listed also as tables so there is no + # need to add them to the in_. + query = query.where( + dictionary.all_objects.c.object_type.in_(("TABLE", "VIEW")) + ) + else: + object_type = [] + if ObjectKind.VIEW in kind: + object_type.append("VIEW") + if ( + ObjectKind.MATERIALIZED_VIEW in kind + and ObjectKind.TABLE not in kind + ): + # materilaized view are listed also as tables so there is no + # need to add them to the in_ if also selecting tables. + object_type.append("MATERIALIZED VIEW") + if ObjectKind.TABLE in kind: + object_type.append("TABLE") + if has_mat_views and ObjectKind.MATERIALIZED_VIEW not in kind: + # materialized view are listed also as tables, + # so they need to be filtered out + # EXCEPT ALL / MINUS profiles as faster than using + # NOT EXISTS or NOT IN with a subquery, but it's in + # general faster to get the mat view names and exclude + # them only when needed + query = query.where( + dictionary.all_objects.c.object_name.not_in( + bindparam("mat_views") + ) + ) + query = query.where( + dictionary.all_objects.c.object_type.in_(object_type) + ) + + # handles scope + if scope is ObjectScope.DEFAULT: + query = query.where(dictionary.all_objects.c.temporary == "N") + elif scope is ObjectScope.TEMPORARY: + query = query.where(dictionary.all_objects.c.temporary == "Y") + + if has_filter_names: + query = query.where( + dictionary.all_objects.c.object_name.in_( + bindparam("filter_names") + ) + ) + return query + + @reflection.flexi_cache( + ("schema", InternalTraversal.dp_string), + ("scope", InternalTraversal.dp_plain_obj), + ("kind", InternalTraversal.dp_plain_obj), + ("filter_names", InternalTraversal.dp_string_list), + ("dblink", InternalTraversal.dp_string), + ) + def _get_all_objects( + self, connection, schema, scope, kind, filter_names, dblink, **kw + ): + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + + has_filter_names, params = self._prepare_filter_names(filter_names) + has_mat_views = False + if ( + ObjectKind.TABLE in kind + and ObjectKind.MATERIALIZED_VIEW not in kind + ): + # see note in _all_objects_query + mat_views = self.get_materialized_view_names( + connection, schema, dblink, _normalize=False, **kw + ) + if mat_views: + params["mat_views"] = mat_views + has_mat_views = True + + query = self._all_objects_query( + owner, scope, kind, has_filter_names, has_mat_views + ) + + result = self._execute_reflection( + connection, query, dblink, returns_long=False, params=params + ).scalars() + + return result.all() + + def _handle_synonyms_decorator(fn): + @wraps(fn) + def wrapper(self, *args, **kwargs): + return self._handle_synonyms(fn, *args, **kwargs) + + return wrapper + + def _handle_synonyms(self, fn, connection, *args, **kwargs): + if not kwargs.get("oracle_resolve_synonyms", False): + return fn(self, connection, *args, **kwargs) + + original_kw = kwargs.copy() + schema = kwargs.pop("schema", None) + result = self._get_synonyms( + connection, + schema=schema, + filter_names=kwargs.pop("filter_names", None), + dblink=kwargs.pop("dblink", None), + info_cache=kwargs.get("info_cache", None), + ) + + dblinks_owners = defaultdict(dict) + for row in result: + key = row["db_link"], row["table_owner"] + tn = self.normalize_name(row["table_name"]) + dblinks_owners[key][tn] = row["synonym_name"] + + if not dblinks_owners: + # No synonym, do the plain thing + return fn(self, connection, *args, **original_kw) + + data = {} + for (dblink, table_owner), mapping in dblinks_owners.items(): + call_kw = { + **original_kw, + "schema": table_owner, + "dblink": self.normalize_name(dblink), + "filter_names": mapping.keys(), + } + call_result = fn(self, connection, *args, **call_kw) + for (_, tn), value in call_result: + synonym_name = self.normalize_name(mapping[tn]) + data[(schema, synonym_name)] = value + return data.items() + + @reflection.cache + def get_schema_names(self, connection, dblink=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + query = select(dictionary.all_users.c.username).order_by( + dictionary.all_users.c.username + ) + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(row) for row in result] + + @reflection.cache + def get_table_names(self, connection, schema=None, dblink=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + # note that table_names() isn't loading DBLINKed or synonym'ed tables + if schema is None: + schema = self.default_schema_name + + den_schema = self.denormalize_schema_name(schema) + if kw.get("oracle_resolve_synonyms", False): + tables = ( + select( + dictionary.all_tables.c.table_name, + dictionary.all_tables.c.owner, + dictionary.all_tables.c.iot_name, + dictionary.all_tables.c.duration, + dictionary.all_tables.c.tablespace_name, + ) + .union_all( + select( + dictionary.all_synonyms.c.synonym_name.label( + "table_name" + ), + dictionary.all_synonyms.c.owner, + dictionary.all_tables.c.iot_name, + dictionary.all_tables.c.duration, + dictionary.all_tables.c.tablespace_name, + ) + .select_from(dictionary.all_tables) + .join( + dictionary.all_synonyms, + and_( + dictionary.all_tables.c.table_name + == dictionary.all_synonyms.c.table_name, + dictionary.all_tables.c.owner + == func.coalesce( + dictionary.all_synonyms.c.table_owner, + dictionary.all_synonyms.c.owner, + ), + ), + ) + ) + .subquery("available_tables") + ) + else: + tables = dictionary.all_tables + + query = select(tables.c.table_name) + if self.exclude_tablespaces: + query = query.where( + func.coalesce( + tables.c.tablespace_name, "no tablespace" + ).not_in(self.exclude_tablespaces) + ) + query = query.where( + tables.c.owner == den_schema, + tables.c.iot_name.is_(null()), + tables.c.duration.is_(null()), + ) + + # remove materialized views + mat_query = select( + dictionary.all_mviews.c.mview_name.label("table_name") + ).where(dictionary.all_mviews.c.owner == den_schema) + + query = ( + query.except_all(mat_query) + if self._supports_except_all + else query.except_(mat_query) + ) + + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(row) for row in result] + + @reflection.cache + def get_temp_table_names(self, connection, dblink=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + schema = self.denormalize_schema_name(self.default_schema_name) + + query = select(dictionary.all_tables.c.table_name) + if self.exclude_tablespaces: + query = query.where( + func.coalesce( + dictionary.all_tables.c.tablespace_name, "no tablespace" + ).not_in(self.exclude_tablespaces) + ) + query = query.where( + dictionary.all_tables.c.owner == schema, + dictionary.all_tables.c.iot_name.is_(null()), + dictionary.all_tables.c.duration.is_not(null()), + ) + + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(row) for row in result] + + @reflection.cache + def get_materialized_view_names( + self, connection, schema=None, dblink=None, _normalize=True, **kw + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + if not schema: + schema = self.default_schema_name + + query = select(dictionary.all_mviews.c.mview_name).where( + dictionary.all_mviews.c.owner + == self.denormalize_schema_name(schema) + ) + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + if _normalize: + return [self.normalize_name(row) for row in result] + else: + return result.all() + + @reflection.cache + def get_view_names(self, connection, schema=None, dblink=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + if not schema: + schema = self.default_schema_name + + query = select(dictionary.all_views.c.view_name).where( + dictionary.all_views.c.owner + == self.denormalize_schema_name(schema) + ) + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(row) for row in result] + + @reflection.cache + def get_sequence_names(self, connection, schema=None, dblink=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link.""" + if not schema: + schema = self.default_schema_name + query = select(dictionary.all_sequences.c.sequence_name).where( + dictionary.all_sequences.c.sequence_owner + == self.denormalize_schema_name(schema) + ) + + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(row) for row in result] + + def _value_or_raise(self, data, table, schema): + table = self.normalize_name(str(table)) + try: + return dict(data)[(schema, table)] + except KeyError: + raise exc.NoSuchTableError( + f"{schema}.{table}" if schema else table + ) from None + + def _prepare_filter_names(self, filter_names): + if filter_names: + fn = [self.denormalize_name(name) for name in filter_names] + return True, {"filter_names": fn} + else: + return False, {} + + @reflection.cache + def get_table_options(self, connection, table_name, schema=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_table_options( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + @lru_cache() + def _table_options_query( + self, owner, scope, kind, has_filter_names, has_mat_views + ): + query = select( + dictionary.all_tables.c.table_name, + dictionary.all_tables.c.compression, + dictionary.all_tables.c.compress_for, + ).where(dictionary.all_tables.c.owner == owner) + if has_filter_names: + query = query.where( + dictionary.all_tables.c.table_name.in_( + bindparam("filter_names") + ) + ) + if scope is ObjectScope.DEFAULT: + query = query.where(dictionary.all_tables.c.duration.is_(null())) + elif scope is ObjectScope.TEMPORARY: + query = query.where( + dictionary.all_tables.c.duration.is_not(null()) + ) + + if ( + has_mat_views + and ObjectKind.TABLE in kind + and ObjectKind.MATERIALIZED_VIEW not in kind + ): + # cant use EXCEPT ALL / MINUS here because we don't have an + # excludable row vs. the query above + # outerjoin + where null works better on oracle 21 but 11 does + # not like it at all. this is the next best thing + + query = query.where( + dictionary.all_tables.c.table_name.not_in( + bindparam("mat_views") + ) + ) + elif ( + ObjectKind.TABLE not in kind + and ObjectKind.MATERIALIZED_VIEW in kind + ): + query = query.where( + dictionary.all_tables.c.table_name.in_(bindparam("mat_views")) + ) + return query + + @_handle_synonyms_decorator + def get_multi_table_options( + self, + connection, + *, + schema, + filter_names, + scope, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + + has_filter_names, params = self._prepare_filter_names(filter_names) + has_mat_views = False + + if ( + ObjectKind.TABLE in kind + and ObjectKind.MATERIALIZED_VIEW not in kind + ): + # see note in _table_options_query + mat_views = self.get_materialized_view_names( + connection, schema, dblink, _normalize=False, **kw + ) + if mat_views: + params["mat_views"] = mat_views + has_mat_views = True + elif ( + ObjectKind.TABLE not in kind + and ObjectKind.MATERIALIZED_VIEW in kind + ): + mat_views = self.get_materialized_view_names( + connection, schema, dblink, _normalize=False, **kw + ) + params["mat_views"] = mat_views + + options = {} + default = ReflectionDefaults.table_options + + if ObjectKind.TABLE in kind or ObjectKind.MATERIALIZED_VIEW in kind: + query = self._table_options_query( + owner, scope, kind, has_filter_names, has_mat_views + ) + result = self._execute_reflection( + connection, query, dblink, returns_long=False, params=params + ) + + for table, compression, compress_for in result: + if compression == "ENABLED": + data = {"oracle_compress": compress_for} + else: + data = default() + options[(schema, self.normalize_name(table))] = data + if ObjectKind.VIEW in kind and ObjectScope.DEFAULT in scope: + # add the views (no temporary views) + for view in self.get_view_names(connection, schema, dblink, **kw): + if not filter_names or view in filter_names: + options[(schema, view)] = default() + + return options.items() + + @reflection.cache + def get_columns(self, connection, table_name, schema=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + + data = self.get_multi_columns( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + def _run_batches( + self, connection, query, dblink, returns_long, mappings, all_objects + ): + each_batch = 500 + batches = list(all_objects) + while batches: + batch = batches[0:each_batch] + batches[0:each_batch] = [] + + result = self._execute_reflection( + connection, + query, + dblink, + returns_long=returns_long, + params={"all_objects": batch}, + ) + if mappings: + yield from result.mappings() + else: + yield from result + + @lru_cache() + def _column_query(self, owner): + all_cols = dictionary.all_tab_cols + all_comments = dictionary.all_col_comments + all_ids = dictionary.all_tab_identity_cols + + if self.server_version_info >= (12,): + add_cols = ( + all_cols.c.default_on_null, + sql.case( + (all_ids.c.table_name.is_(None), sql.null()), + else_=all_ids.c.generation_type + + "," + + all_ids.c.identity_options, + ).label("identity_options"), + ) + join_identity_cols = True + else: + add_cols = ( + sql.null().label("default_on_null"), + sql.null().label("identity_options"), + ) + join_identity_cols = False + + # NOTE: on oracle cannot create tables/views without columns and + # a table cannot have all column hidden: + # ORA-54039: table must have at least one column that is not invisible + # all_tab_cols returns data for tables/views/mat-views. + # all_tab_cols does not return recycled tables + + query = ( + select( + all_cols.c.table_name, + all_cols.c.column_name, + all_cols.c.data_type, + all_cols.c.char_length, + all_cols.c.data_precision, + all_cols.c.data_scale, + all_cols.c.nullable, + all_cols.c.data_default, + all_comments.c.comments, + all_cols.c.virtual_column, + *add_cols, + ).select_from(all_cols) + # NOTE: all_col_comments has a row for each column even if no + # comment is present, so a join could be performed, but there + # seems to be no difference compared to an outer join + .outerjoin( + all_comments, + and_( + all_cols.c.table_name == all_comments.c.table_name, + all_cols.c.column_name == all_comments.c.column_name, + all_cols.c.owner == all_comments.c.owner, + ), + ) + ) + if join_identity_cols: + query = query.outerjoin( + all_ids, + and_( + all_cols.c.table_name == all_ids.c.table_name, + all_cols.c.column_name == all_ids.c.column_name, + all_cols.c.owner == all_ids.c.owner, + ), + ) + + query = query.where( + all_cols.c.table_name.in_(bindparam("all_objects")), + all_cols.c.hidden_column == "NO", + all_cols.c.owner == owner, + ).order_by(all_cols.c.table_name, all_cols.c.column_id) + return query + + @_handle_synonyms_decorator + def get_multi_columns( + self, + connection, + *, + schema, + filter_names, + scope, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + query = self._column_query(owner) + + if ( + filter_names + and kind is ObjectKind.ANY + and scope is ObjectScope.ANY + ): + all_objects = [self.denormalize_name(n) for n in filter_names] + else: + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw + ) + + columns = defaultdict(list) + + # all_tab_cols.data_default is LONG + result = self._run_batches( + connection, + query, + dblink, + returns_long=True, + mappings=True, + all_objects=all_objects, + ) + + def maybe_int(value): + if isinstance(value, float) and value.is_integer(): + return int(value) + else: + return value + + remove_size = re.compile(r"\(\d+\)") + + for row_dict in result: + table_name = self.normalize_name(row_dict["table_name"]) + orig_colname = row_dict["column_name"] + colname = self.normalize_name(orig_colname) + coltype = row_dict["data_type"] + precision = maybe_int(row_dict["data_precision"]) + + if coltype == "NUMBER": + scale = maybe_int(row_dict["data_scale"]) + if precision is None and scale == 0: + coltype = INTEGER() + else: + coltype = NUMBER(precision, scale) + elif coltype == "FLOAT": + # https://docs.oracle.com/cd/B14117_01/server.101/b10758/sqlqr06.htm + if precision == 126: + # The DOUBLE PRECISION datatype is a floating-point + # number with binary precision 126. + coltype = DOUBLE_PRECISION() + elif precision == 63: + # The REAL datatype is a floating-point number with a + # binary precision of 63, or 18 decimal. + coltype = REAL() + else: + # non standard precision + coltype = FLOAT(binary_precision=precision) + + elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"): + char_length = maybe_int(row_dict["char_length"]) + coltype = self.ischema_names.get(coltype)(char_length) + elif "WITH TIME ZONE" in coltype: + coltype = TIMESTAMP(timezone=True) + elif "WITH LOCAL TIME ZONE" in coltype: + coltype = TIMESTAMP(local_timezone=True) + else: + coltype = re.sub(remove_size, "", coltype) + try: + coltype = self.ischema_names[coltype] + except KeyError: + util.warn( + "Did not recognize type '%s' of column '%s'" + % (coltype, colname) + ) + coltype = sqltypes.NULLTYPE + + default = row_dict["data_default"] + if row_dict["virtual_column"] == "YES": + computed = dict(sqltext=default) + default = None + else: + computed = None + + identity_options = row_dict["identity_options"] + if identity_options is not None: + identity = self._parse_identity_options( + identity_options, row_dict["default_on_null"] + ) + default = None + else: + identity = None + + cdict = { + "name": colname, + "type": coltype, + "nullable": row_dict["nullable"] == "Y", + "default": default, + "comment": row_dict["comments"], + } + if orig_colname.lower() == orig_colname: + cdict["quote"] = True + if computed is not None: + cdict["computed"] = computed + if identity is not None: + cdict["identity"] = identity + + columns[(schema, table_name)].append(cdict) + + # NOTE: default not needed since all tables have columns + # default = ReflectionDefaults.columns + # return ( + # (key, value if value else default()) + # for key, value in columns.items() + # ) + return columns.items() + + def _parse_identity_options(self, identity_options, default_on_null): + # identity_options is a string that starts with 'ALWAYS,' or + # 'BY DEFAULT,' and continues with + # START WITH: 1, INCREMENT BY: 1, MAX_VALUE: 123, MIN_VALUE: 1, + # CYCLE_FLAG: N, CACHE_SIZE: 1, ORDER_FLAG: N, SCALE_FLAG: N, + # EXTEND_FLAG: N, SESSION_FLAG: N, KEEP_VALUE: N + parts = [p.strip() for p in identity_options.split(",")] + identity = { + "always": parts[0] == "ALWAYS", + "on_null": default_on_null == "YES", + } + + for part in parts[1:]: + option, value = part.split(":") + value = value.strip() + + if "START WITH" in option: + identity["start"] = int(value) + elif "INCREMENT BY" in option: + identity["increment"] = int(value) + elif "MAX_VALUE" in option: + identity["maxvalue"] = int(value) + elif "MIN_VALUE" in option: + identity["minvalue"] = int(value) + elif "CYCLE_FLAG" in option: + identity["cycle"] = value == "Y" + elif "CACHE_SIZE" in option: + identity["cache"] = int(value) + elif "ORDER_FLAG" in option: + identity["order"] = value == "Y" + return identity + + @reflection.cache + def get_table_comment(self, connection, table_name, schema=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_table_comment( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + @lru_cache() + def _comment_query(self, owner, scope, kind, has_filter_names): + # NOTE: all_tab_comments / all_mview_comments have a row for all + # object even if they don't have comments + queries = [] + if ObjectKind.TABLE in kind or ObjectKind.VIEW in kind: + # all_tab_comments returns also plain views + tbl_view = select( + dictionary.all_tab_comments.c.table_name, + dictionary.all_tab_comments.c.comments, + ).where( + dictionary.all_tab_comments.c.owner == owner, + dictionary.all_tab_comments.c.table_name.not_like("BIN$%"), + ) + if ObjectKind.VIEW not in kind: + tbl_view = tbl_view.where( + dictionary.all_tab_comments.c.table_type == "TABLE" + ) + elif ObjectKind.TABLE not in kind: + tbl_view = tbl_view.where( + dictionary.all_tab_comments.c.table_type == "VIEW" + ) + queries.append(tbl_view) + if ObjectKind.MATERIALIZED_VIEW in kind: + mat_view = select( + dictionary.all_mview_comments.c.mview_name.label("table_name"), + dictionary.all_mview_comments.c.comments, + ).where( + dictionary.all_mview_comments.c.owner == owner, + dictionary.all_mview_comments.c.mview_name.not_like("BIN$%"), + ) + queries.append(mat_view) + if len(queries) == 1: + query = queries[0] + else: + union = sql.union_all(*queries).subquery("tables_and_views") + query = select(union.c.table_name, union.c.comments) + + name_col = query.selected_columns.table_name + + if scope in (ObjectScope.DEFAULT, ObjectScope.TEMPORARY): + temp = "Y" if scope is ObjectScope.TEMPORARY else "N" + # need distinct since materialized view are listed also + # as tables in all_objects + query = query.distinct().join( + dictionary.all_objects, + and_( + dictionary.all_objects.c.owner == owner, + dictionary.all_objects.c.object_name == name_col, + dictionary.all_objects.c.temporary == temp, + ), + ) + if has_filter_names: + query = query.where(name_col.in_(bindparam("filter_names"))) + return query + + @_handle_synonyms_decorator + def get_multi_table_comment( + self, + connection, + *, + schema, + filter_names, + scope, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + has_filter_names, params = self._prepare_filter_names(filter_names) + query = self._comment_query(owner, scope, kind, has_filter_names) + + result = self._execute_reflection( + connection, query, dblink, returns_long=False, params=params + ) + default = ReflectionDefaults.table_comment + # materialized views by default seem to have a comment like + # "snapshot table for snapshot owner.mat_view_name" + ignore_mat_view = "snapshot table for snapshot " + return ( + ( + (schema, self.normalize_name(table)), + ( + {"text": comment} + if comment is not None + and not comment.startswith(ignore_mat_view) + else default() + ), + ) + for table, comment in result + ) + + @reflection.cache + def get_indexes(self, connection, table_name, schema=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_indexes( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + @lru_cache() + def _index_query(self, owner): + return ( + select( + dictionary.all_ind_columns.c.table_name, + dictionary.all_ind_columns.c.index_name, + dictionary.all_ind_columns.c.column_name, + dictionary.all_indexes.c.index_type, + dictionary.all_indexes.c.uniqueness, + dictionary.all_indexes.c.compression, + dictionary.all_indexes.c.prefix_length, + dictionary.all_ind_columns.c.descend, + dictionary.all_ind_expressions.c.column_expression, + ) + .select_from(dictionary.all_ind_columns) + .join( + dictionary.all_indexes, + sql.and_( + dictionary.all_ind_columns.c.index_name + == dictionary.all_indexes.c.index_name, + dictionary.all_ind_columns.c.index_owner + == dictionary.all_indexes.c.owner, + ), + ) + .outerjoin( + # NOTE: this adds about 20% to the query time. Using a + # case expression with a scalar subquery only when needed + # with the assumption that most indexes are not expression + # would be faster but oracle does not like that with + # LONG datatype. It errors with: + # ORA-00997: illegal use of LONG datatype + dictionary.all_ind_expressions, + sql.and_( + dictionary.all_ind_expressions.c.index_name + == dictionary.all_ind_columns.c.index_name, + dictionary.all_ind_expressions.c.index_owner + == dictionary.all_ind_columns.c.index_owner, + dictionary.all_ind_expressions.c.column_position + == dictionary.all_ind_columns.c.column_position, + ), + ) + .where( + dictionary.all_indexes.c.table_owner == owner, + dictionary.all_indexes.c.table_name.in_( + bindparam("all_objects") + ), + ) + .order_by( + dictionary.all_ind_columns.c.index_name, + dictionary.all_ind_columns.c.column_position, + ) + ) + + @reflection.flexi_cache( + ("schema", InternalTraversal.dp_string), + ("dblink", InternalTraversal.dp_string), + ("all_objects", InternalTraversal.dp_string_list), + ) + def _get_indexes_rows(self, connection, schema, dblink, all_objects, **kw): + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + + query = self._index_query(owner) + + pks = { + row_dict["constraint_name"] + for row_dict in self._get_all_constraint_rows( + connection, schema, dblink, all_objects, **kw + ) + if row_dict["constraint_type"] == "P" + } + + # all_ind_expressions.column_expression is LONG + result = self._run_batches( + connection, + query, + dblink, + returns_long=True, + mappings=True, + all_objects=all_objects, + ) + + return [ + row_dict + for row_dict in result + if row_dict["index_name"] not in pks + ] + + @_handle_synonyms_decorator + def get_multi_indexes( + self, + connection, + *, + schema, + filter_names, + scope, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw + ) + + uniqueness = {"NONUNIQUE": False, "UNIQUE": True} + enabled = {"DISABLED": False, "ENABLED": True} + is_bitmap = {"BITMAP", "FUNCTION-BASED BITMAP"} + + indexes = defaultdict(dict) + + for row_dict in self._get_indexes_rows( + connection, schema, dblink, all_objects, **kw + ): + index_name = self.normalize_name(row_dict["index_name"]) + table_name = self.normalize_name(row_dict["table_name"]) + table_indexes = indexes[(schema, table_name)] + + if index_name not in table_indexes: + table_indexes[index_name] = index_dict = { + "name": index_name, + "column_names": [], + "dialect_options": {}, + "unique": uniqueness.get(row_dict["uniqueness"], False), + } + do = index_dict["dialect_options"] + if row_dict["index_type"] in is_bitmap: + do["oracle_bitmap"] = True + if enabled.get(row_dict["compression"], False): + do["oracle_compress"] = row_dict["prefix_length"] + + else: + index_dict = table_indexes[index_name] + + expr = row_dict["column_expression"] + if expr is not None: + index_dict["column_names"].append(None) + if "expressions" in index_dict: + index_dict["expressions"].append(expr) + else: + index_dict["expressions"] = index_dict["column_names"][:-1] + index_dict["expressions"].append(expr) + + if row_dict["descend"].lower() != "asc": + assert row_dict["descend"].lower() == "desc" + cs = index_dict.setdefault("column_sorting", {}) + cs[expr] = ("desc",) + else: + assert row_dict["descend"].lower() == "asc" + cn = self.normalize_name(row_dict["column_name"]) + index_dict["column_names"].append(cn) + if "expressions" in index_dict: + index_dict["expressions"].append(cn) + + default = ReflectionDefaults.indexes + + return ( + (key, list(indexes[key].values()) if key in indexes else default()) + for key in ( + (schema, self.normalize_name(obj_name)) + for obj_name in all_objects + ) + ) + + @reflection.cache + def get_pk_constraint(self, connection, table_name, schema=None, **kw): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_pk_constraint( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + @lru_cache() + def _constraint_query(self, owner): + local = dictionary.all_cons_columns.alias("local") + remote = dictionary.all_cons_columns.alias("remote") + return ( + select( + dictionary.all_constraints.c.table_name, + dictionary.all_constraints.c.constraint_type, + dictionary.all_constraints.c.constraint_name, + local.c.column_name.label("local_column"), + remote.c.table_name.label("remote_table"), + remote.c.column_name.label("remote_column"), + remote.c.owner.label("remote_owner"), + dictionary.all_constraints.c.search_condition, + dictionary.all_constraints.c.delete_rule, + ) + .select_from(dictionary.all_constraints) + .join( + local, + and_( + local.c.owner == dictionary.all_constraints.c.owner, + dictionary.all_constraints.c.constraint_name + == local.c.constraint_name, + ), + ) + .outerjoin( + remote, + and_( + dictionary.all_constraints.c.r_owner == remote.c.owner, + dictionary.all_constraints.c.r_constraint_name + == remote.c.constraint_name, + or_( + remote.c.position.is_(sql.null()), + local.c.position == remote.c.position, + ), + ), + ) + .where( + dictionary.all_constraints.c.owner == owner, + dictionary.all_constraints.c.table_name.in_( + bindparam("all_objects") + ), + dictionary.all_constraints.c.constraint_type.in_( + ("R", "P", "U", "C") + ), + ) + .order_by( + dictionary.all_constraints.c.constraint_name, local.c.position + ) + ) + + @reflection.flexi_cache( + ("schema", InternalTraversal.dp_string), + ("dblink", InternalTraversal.dp_string), + ("all_objects", InternalTraversal.dp_string_list), + ) + def _get_all_constraint_rows( + self, connection, schema, dblink, all_objects, **kw + ): + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + query = self._constraint_query(owner) + + # since the result is cached a list must be created + values = list( + self._run_batches( + connection, + query, + dblink, + returns_long=False, + mappings=True, + all_objects=all_objects, + ) + ) + return values + + @_handle_synonyms_decorator + def get_multi_pk_constraint( + self, + connection, + *, + scope, + schema, + filter_names, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw + ) + + primary_keys = defaultdict(dict) + default = ReflectionDefaults.pk_constraint + + for row_dict in self._get_all_constraint_rows( + connection, schema, dblink, all_objects, **kw + ): + if row_dict["constraint_type"] != "P": + continue + table_name = self.normalize_name(row_dict["table_name"]) + constraint_name = self.normalize_name(row_dict["constraint_name"]) + column_name = self.normalize_name(row_dict["local_column"]) + + table_pk = primary_keys[(schema, table_name)] + if not table_pk: + table_pk["name"] = constraint_name + table_pk["constrained_columns"] = [column_name] + else: + table_pk["constrained_columns"].append(column_name) + + return ( + (key, primary_keys[key] if key in primary_keys else default()) + for key in ( + (schema, self.normalize_name(obj_name)) + for obj_name in all_objects + ) + ) + + @reflection.cache + def get_foreign_keys( + self, + connection, + table_name, + schema=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_foreign_keys( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + @_handle_synonyms_decorator + def get_multi_foreign_keys( + self, + connection, + *, + scope, + schema, + filter_names, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw + ) + + resolve_synonyms = kw.get("oracle_resolve_synonyms", False) + + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + + all_remote_owners = set() + fkeys = defaultdict(dict) + + for row_dict in self._get_all_constraint_rows( + connection, schema, dblink, all_objects, **kw + ): + if row_dict["constraint_type"] != "R": + continue + + table_name = self.normalize_name(row_dict["table_name"]) + constraint_name = self.normalize_name(row_dict["constraint_name"]) + table_fkey = fkeys[(schema, table_name)] + + assert constraint_name is not None + + local_column = self.normalize_name(row_dict["local_column"]) + remote_table = self.normalize_name(row_dict["remote_table"]) + remote_column = self.normalize_name(row_dict["remote_column"]) + remote_owner_orig = row_dict["remote_owner"] + remote_owner = self.normalize_name(remote_owner_orig) + if remote_owner_orig is not None: + all_remote_owners.add(remote_owner_orig) + + if remote_table is None: + # ticket 363 + if dblink and not dblink.startswith("@"): + dblink = f"@{dblink}" + util.warn( + "Got 'None' querying 'table_name' from " + f"all_cons_columns{dblink or ''} - does the user have " + "proper rights to the table?" + ) + continue + + if constraint_name not in table_fkey: + table_fkey[constraint_name] = fkey = { + "name": constraint_name, + "constrained_columns": [], + "referred_schema": None, + "referred_table": remote_table, + "referred_columns": [], + "options": {}, + } + + if resolve_synonyms: + # will be removed below + fkey["_ref_schema"] = remote_owner + + if schema is not None or remote_owner_orig != owner: + fkey["referred_schema"] = remote_owner + + delete_rule = row_dict["delete_rule"] + if delete_rule != "NO ACTION": + fkey["options"]["ondelete"] = delete_rule + + else: + fkey = table_fkey[constraint_name] + + fkey["constrained_columns"].append(local_column) + fkey["referred_columns"].append(remote_column) + + if resolve_synonyms and all_remote_owners: + query = select( + dictionary.all_synonyms.c.owner, + dictionary.all_synonyms.c.table_name, + dictionary.all_synonyms.c.table_owner, + dictionary.all_synonyms.c.synonym_name, + ).where(dictionary.all_synonyms.c.owner.in_(all_remote_owners)) + + result = self._execute_reflection( + connection, query, dblink, returns_long=False + ).mappings() + + remote_owners_lut = {} + for row in result: + synonym_owner = self.normalize_name(row["owner"]) + table_name = self.normalize_name(row["table_name"]) + + remote_owners_lut[(synonym_owner, table_name)] = ( + row["table_owner"], + row["synonym_name"], + ) + + empty = (None, None) + for table_fkeys in fkeys.values(): + for table_fkey in table_fkeys.values(): + key = ( + table_fkey.pop("_ref_schema"), + table_fkey["referred_table"], + ) + remote_owner, syn_name = remote_owners_lut.get(key, empty) + if syn_name: + sn = self.normalize_name(syn_name) + table_fkey["referred_table"] = sn + if schema is not None or remote_owner != owner: + ro = self.normalize_name(remote_owner) + table_fkey["referred_schema"] = ro + else: + table_fkey["referred_schema"] = None + default = ReflectionDefaults.foreign_keys + + return ( + (key, list(fkeys[key].values()) if key in fkeys else default()) + for key in ( + (schema, self.normalize_name(obj_name)) + for obj_name in all_objects + ) + ) + + @reflection.cache + def get_unique_constraints( + self, connection, table_name, schema=None, **kw + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_unique_constraints( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + @_handle_synonyms_decorator + def get_multi_unique_constraints( + self, + connection, + *, + scope, + schema, + filter_names, + kind, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw + ) + + unique_cons = defaultdict(dict) + + index_names = { + row_dict["index_name"] + for row_dict in self._get_indexes_rows( + connection, schema, dblink, all_objects, **kw + ) + } + + for row_dict in self._get_all_constraint_rows( + connection, schema, dblink, all_objects, **kw + ): + if row_dict["constraint_type"] != "U": + continue + table_name = self.normalize_name(row_dict["table_name"]) + constraint_name_orig = row_dict["constraint_name"] + constraint_name = self.normalize_name(constraint_name_orig) + column_name = self.normalize_name(row_dict["local_column"]) + table_uc = unique_cons[(schema, table_name)] + + assert constraint_name is not None + + if constraint_name not in table_uc: + table_uc[constraint_name] = uc = { + "name": constraint_name, + "column_names": [], + "duplicates_index": ( + constraint_name + if constraint_name_orig in index_names + else None + ), + } + else: + uc = table_uc[constraint_name] + + uc["column_names"].append(column_name) + + default = ReflectionDefaults.unique_constraints + + return ( + ( + key, + ( + list(unique_cons[key].values()) + if key in unique_cons + else default() + ), + ) + for key in ( + (schema, self.normalize_name(obj_name)) + for obj_name in all_objects + ) + ) + + @reflection.cache + def get_view_definition( + self, + connection, + view_name, + schema=None, + dblink=None, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + if kw.get("oracle_resolve_synonyms", False): + synonyms = self._get_synonyms( + connection, schema, filter_names=[view_name], dblink=dblink + ) + if synonyms: + assert len(synonyms) == 1 + row_dict = synonyms[0] + dblink = self.normalize_name(row_dict["db_link"]) + schema = row_dict["table_owner"] + view_name = row_dict["table_name"] + + name = self.denormalize_name(view_name) + owner = self.denormalize_schema_name( + schema or self.default_schema_name + ) + query = ( + select(dictionary.all_views.c.text) + .where( + dictionary.all_views.c.view_name == name, + dictionary.all_views.c.owner == owner, + ) + .union_all( + select(dictionary.all_mviews.c.query).where( + dictionary.all_mviews.c.mview_name == name, + dictionary.all_mviews.c.owner == owner, + ) + ) + ) + + rp = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalar() + if rp is None: + raise exc.NoSuchTableError( + f"{schema}.{view_name}" if schema else view_name + ) + else: + return rp + + @reflection.cache + def get_check_constraints( + self, connection, table_name, schema=None, include_all=False, **kw + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + data = self.get_multi_check_constraints( + connection, + schema=schema, + filter_names=[table_name], + scope=ObjectScope.ANY, + include_all=include_all, + kind=ObjectKind.ANY, + **kw, + ) + return self._value_or_raise(data, table_name, schema) + + @_handle_synonyms_decorator + def get_multi_check_constraints( + self, + connection, + *, + schema, + filter_names, + dblink=None, + scope, + kind, + include_all=False, + **kw, + ): + """Supported kw arguments are: ``dblink`` to reflect via a db link; + ``oracle_resolve_synonyms`` to resolve names to synonyms + """ + all_objects = self._get_all_objects( + connection, schema, scope, kind, filter_names, dblink, **kw + ) + + not_null = re.compile(r"..+?. IS NOT NULL$") + + check_constraints = defaultdict(list) + + for row_dict in self._get_all_constraint_rows( + connection, schema, dblink, all_objects, **kw + ): + if row_dict["constraint_type"] != "C": + continue + table_name = self.normalize_name(row_dict["table_name"]) + constraint_name = self.normalize_name(row_dict["constraint_name"]) + search_condition = row_dict["search_condition"] + + table_checks = check_constraints[(schema, table_name)] + if constraint_name is not None and ( + include_all or not not_null.match(search_condition) + ): + table_checks.append( + {"name": constraint_name, "sqltext": search_condition} + ) + + default = ReflectionDefaults.check_constraints + + return ( + ( + key, + ( + check_constraints[key] + if key in check_constraints + else default() + ), + ) + for key in ( + (schema, self.normalize_name(obj_name)) + for obj_name in all_objects + ) + ) + + def _list_dblinks(self, connection, dblink=None): + query = select(dictionary.all_db_links.c.db_link) + links = self._execute_reflection( + connection, query, dblink, returns_long=False + ).scalars() + return [self.normalize_name(link) for link in links] + + +class _OuterJoinColumn(sql.ClauseElement): + __visit_name__ = "outer_join_column" + + def __init__(self, column): + self.column = column diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/cx_oracle.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/cx_oracle.py new file mode 100644 index 0000000..9346224 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/cx_oracle.py @@ -0,0 +1,1492 @@ +# dialects/oracle/cx_oracle.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + + +r""" +.. dialect:: oracle+cx_oracle + :name: cx-Oracle + :dbapi: cx_oracle + :connectstring: oracle+cx_oracle://user:pass@hostname:port[/dbname][?service_name=<service>[&key=value&key=value...]] + :url: https://oracle.github.io/python-cx_Oracle/ + +DSN vs. Hostname connections +----------------------------- + +cx_Oracle provides several methods of indicating the target database. The +dialect translates from a series of different URL forms. + +Hostname Connections with Easy Connect Syntax +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Given a hostname, port and service name of the target Oracle Database, for +example from Oracle's `Easy Connect syntax +<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#easy-connect-syntax-for-connection-strings>`_, +then connect in SQLAlchemy using the ``service_name`` query string parameter:: + + engine = create_engine("oracle+cx_oracle://scott:tiger@hostname:port/?service_name=myservice&encoding=UTF-8&nencoding=UTF-8") + +The `full Easy Connect syntax +<https://www.oracle.com/pls/topic/lookup?ctx=dblatest&id=GUID-B0437826-43C1-49EC-A94D-B650B6A4A6EE>`_ +is not supported. Instead, use a ``tnsnames.ora`` file and connect using a +DSN. + +Connections with tnsnames.ora or Oracle Cloud +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Alternatively, if no port, database name, or ``service_name`` is provided, the +dialect will use an Oracle DSN "connection string". This takes the "hostname" +portion of the URL as the data source name. For example, if the +``tnsnames.ora`` file contains a `Net Service Name +<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#net-service-names-for-connection-strings>`_ +of ``myalias`` as below:: + + myalias = + (DESCRIPTION = + (ADDRESS = (PROTOCOL = TCP)(HOST = mymachine.example.com)(PORT = 1521)) + (CONNECT_DATA = + (SERVER = DEDICATED) + (SERVICE_NAME = orclpdb1) + ) + ) + +The cx_Oracle dialect connects to this database service when ``myalias`` is the +hostname portion of the URL, without specifying a port, database name or +``service_name``:: + + engine = create_engine("oracle+cx_oracle://scott:tiger@myalias/?encoding=UTF-8&nencoding=UTF-8") + +Users of Oracle Cloud should use this syntax and also configure the cloud +wallet as shown in cx_Oracle documentation `Connecting to Autononmous Databases +<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#connecting-to-autononmous-databases>`_. + +SID Connections +^^^^^^^^^^^^^^^ + +To use Oracle's obsolete SID connection syntax, the SID can be passed in a +"database name" portion of the URL as below:: + + engine = create_engine("oracle+cx_oracle://scott:tiger@hostname:1521/dbname?encoding=UTF-8&nencoding=UTF-8") + +Above, the DSN passed to cx_Oracle is created by ``cx_Oracle.makedsn()`` as +follows:: + + >>> import cx_Oracle + >>> cx_Oracle.makedsn("hostname", 1521, sid="dbname") + '(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=hostname)(PORT=1521))(CONNECT_DATA=(SID=dbname)))' + +Passing cx_Oracle connect arguments +----------------------------------- + +Additional connection arguments can usually be passed via the URL +query string; particular symbols like ``cx_Oracle.SYSDBA`` are intercepted +and converted to the correct symbol:: + + e = create_engine( + "oracle+cx_oracle://user:pass@dsn?encoding=UTF-8&nencoding=UTF-8&mode=SYSDBA&events=true") + +.. versionchanged:: 1.3 the cx_oracle dialect now accepts all argument names + within the URL string itself, to be passed to the cx_Oracle DBAPI. As + was the case earlier but not correctly documented, the + :paramref:`_sa.create_engine.connect_args` parameter also accepts all + cx_Oracle DBAPI connect arguments. + +To pass arguments directly to ``.connect()`` without using the query +string, use the :paramref:`_sa.create_engine.connect_args` dictionary. +Any cx_Oracle parameter value and/or constant may be passed, such as:: + + import cx_Oracle + e = create_engine( + "oracle+cx_oracle://user:pass@dsn", + connect_args={ + "encoding": "UTF-8", + "nencoding": "UTF-8", + "mode": cx_Oracle.SYSDBA, + "events": True + } + ) + +Note that the default value for ``encoding`` and ``nencoding`` was changed to +"UTF-8" in cx_Oracle 8.0 so these parameters can be omitted when using that +version, or later. + +Options consumed by the SQLAlchemy cx_Oracle dialect outside of the driver +-------------------------------------------------------------------------- + +There are also options that are consumed by the SQLAlchemy cx_oracle dialect +itself. These options are always passed directly to :func:`_sa.create_engine` +, such as:: + + e = create_engine( + "oracle+cx_oracle://user:pass@dsn", coerce_to_decimal=False) + +The parameters accepted by the cx_oracle dialect are as follows: + +* ``arraysize`` - set the cx_oracle.arraysize value on cursors; defaults + to ``None``, indicating that the driver default should be used (typically + the value is 100). This setting controls how many rows are buffered when + fetching rows, and can have a significant effect on performance when + modified. The setting is used for both ``cx_Oracle`` as well as + ``oracledb``. + + .. versionchanged:: 2.0.26 - changed the default value from 50 to None, + to use the default value of the driver itself. + +* ``auto_convert_lobs`` - defaults to True; See :ref:`cx_oracle_lob`. + +* ``coerce_to_decimal`` - see :ref:`cx_oracle_numeric` for detail. + +* ``encoding_errors`` - see :ref:`cx_oracle_unicode_encoding_errors` for detail. + +.. _cx_oracle_sessionpool: + +Using cx_Oracle SessionPool +--------------------------- + +The cx_Oracle library provides its own connection pool implementation that may +be used in place of SQLAlchemy's pooling functionality. This can be achieved +by using the :paramref:`_sa.create_engine.creator` parameter to provide a +function that returns a new connection, along with setting +:paramref:`_sa.create_engine.pool_class` to ``NullPool`` to disable +SQLAlchemy's pooling:: + + import cx_Oracle + from sqlalchemy import create_engine + from sqlalchemy.pool import NullPool + + pool = cx_Oracle.SessionPool( + user="scott", password="tiger", dsn="orclpdb", + min=2, max=5, increment=1, threaded=True, + encoding="UTF-8", nencoding="UTF-8" + ) + + engine = create_engine("oracle+cx_oracle://", creator=pool.acquire, poolclass=NullPool) + +The above engine may then be used normally where cx_Oracle's pool handles +connection pooling:: + + with engine.connect() as conn: + print(conn.scalar("select 1 FROM dual")) + + +As well as providing a scalable solution for multi-user applications, the +cx_Oracle session pool supports some Oracle features such as DRCP and +`Application Continuity +<https://cx-oracle.readthedocs.io/en/latest/user_guide/ha.html#application-continuity-ac>`_. + +Using Oracle Database Resident Connection Pooling (DRCP) +-------------------------------------------------------- + +When using Oracle's `DRCP +<https://www.oracle.com/pls/topic/lookup?ctx=dblatest&id=GUID-015CA8C1-2386-4626-855D-CC546DDC1086>`_, +the best practice is to pass a connection class and "purity" when acquiring a +connection from the SessionPool. Refer to the `cx_Oracle DRCP documentation +<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#database-resident-connection-pooling-drcp>`_. + +This can be achieved by wrapping ``pool.acquire()``:: + + import cx_Oracle + from sqlalchemy import create_engine + from sqlalchemy.pool import NullPool + + pool = cx_Oracle.SessionPool( + user="scott", password="tiger", dsn="orclpdb", + min=2, max=5, increment=1, threaded=True, + encoding="UTF-8", nencoding="UTF-8" + ) + + def creator(): + return pool.acquire(cclass="MYCLASS", purity=cx_Oracle.ATTR_PURITY_SELF) + + engine = create_engine("oracle+cx_oracle://", creator=creator, poolclass=NullPool) + +The above engine may then be used normally where cx_Oracle handles session +pooling and Oracle Database additionally uses DRCP:: + + with engine.connect() as conn: + print(conn.scalar("select 1 FROM dual")) + +.. _cx_oracle_unicode: + +Unicode +------- + +As is the case for all DBAPIs under Python 3, all strings are inherently +Unicode strings. In all cases however, the driver requires an explicit +encoding configuration. + +Ensuring the Correct Client Encoding +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The long accepted standard for establishing client encoding for nearly all +Oracle related software is via the `NLS_LANG <https://www.oracle.com/database/technologies/faq-nls-lang.html>`_ +environment variable. cx_Oracle like most other Oracle drivers will use +this environment variable as the source of its encoding configuration. The +format of this variable is idiosyncratic; a typical value would be +``AMERICAN_AMERICA.AL32UTF8``. + +The cx_Oracle driver also supports a programmatic alternative which is to +pass the ``encoding`` and ``nencoding`` parameters directly to its +``.connect()`` function. These can be present in the URL as follows:: + + engine = create_engine("oracle+cx_oracle://scott:tiger@orclpdb/?encoding=UTF-8&nencoding=UTF-8") + +For the meaning of the ``encoding`` and ``nencoding`` parameters, please +consult +`Characters Sets and National Language Support (NLS) <https://cx-oracle.readthedocs.io/en/latest/user_guide/globalization.html#globalization>`_. + +.. seealso:: + + `Characters Sets and National Language Support (NLS) <https://cx-oracle.readthedocs.io/en/latest/user_guide/globalization.html#globalization>`_ + - in the cx_Oracle documentation. + + +Unicode-specific Column datatypes +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The Core expression language handles unicode data by use of the :class:`.Unicode` +and :class:`.UnicodeText` +datatypes. These types correspond to the VARCHAR2 and CLOB Oracle datatypes by +default. When using these datatypes with Unicode data, it is expected that +the Oracle database is configured with a Unicode-aware character set, as well +as that the ``NLS_LANG`` environment variable is set appropriately, so that +the VARCHAR2 and CLOB datatypes can accommodate the data. + +In the case that the Oracle database is not configured with a Unicode character +set, the two options are to use the :class:`_types.NCHAR` and +:class:`_oracle.NCLOB` datatypes explicitly, or to pass the flag +``use_nchar_for_unicode=True`` to :func:`_sa.create_engine`, +which will cause the +SQLAlchemy dialect to use NCHAR/NCLOB for the :class:`.Unicode` / +:class:`.UnicodeText` datatypes instead of VARCHAR/CLOB. + +.. versionchanged:: 1.3 The :class:`.Unicode` and :class:`.UnicodeText` + datatypes now correspond to the ``VARCHAR2`` and ``CLOB`` Oracle datatypes + unless the ``use_nchar_for_unicode=True`` is passed to the dialect + when :func:`_sa.create_engine` is called. + + +.. _cx_oracle_unicode_encoding_errors: + +Encoding Errors +^^^^^^^^^^^^^^^ + +For the unusual case that data in the Oracle database is present with a broken +encoding, the dialect accepts a parameter ``encoding_errors`` which will be +passed to Unicode decoding functions in order to affect how decoding errors are +handled. The value is ultimately consumed by the Python `decode +<https://docs.python.org/3/library/stdtypes.html#bytes.decode>`_ function, and +is passed both via cx_Oracle's ``encodingErrors`` parameter consumed by +``Cursor.var()``, as well as SQLAlchemy's own decoding function, as the +cx_Oracle dialect makes use of both under different circumstances. + +.. versionadded:: 1.3.11 + + +.. _cx_oracle_setinputsizes: + +Fine grained control over cx_Oracle data binding performance with setinputsizes +------------------------------------------------------------------------------- + +The cx_Oracle DBAPI has a deep and fundamental reliance upon the usage of the +DBAPI ``setinputsizes()`` call. The purpose of this call is to establish the +datatypes that are bound to a SQL statement for Python values being passed as +parameters. While virtually no other DBAPI assigns any use to the +``setinputsizes()`` call, the cx_Oracle DBAPI relies upon it heavily in its +interactions with the Oracle client interface, and in some scenarios it is not +possible for SQLAlchemy to know exactly how data should be bound, as some +settings can cause profoundly different performance characteristics, while +altering the type coercion behavior at the same time. + +Users of the cx_Oracle dialect are **strongly encouraged** to read through +cx_Oracle's list of built-in datatype symbols at +https://cx-oracle.readthedocs.io/en/latest/api_manual/module.html#database-types. +Note that in some cases, significant performance degradation can occur when +using these types vs. not, in particular when specifying ``cx_Oracle.CLOB``. + +On the SQLAlchemy side, the :meth:`.DialectEvents.do_setinputsizes` event can +be used both for runtime visibility (e.g. logging) of the setinputsizes step as +well as to fully control how ``setinputsizes()`` is used on a per-statement +basis. + +.. versionadded:: 1.2.9 Added :meth:`.DialectEvents.setinputsizes` + + +Example 1 - logging all setinputsizes calls +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The following example illustrates how to log the intermediary values from a +SQLAlchemy perspective before they are converted to the raw ``setinputsizes()`` +parameter dictionary. The keys of the dictionary are :class:`.BindParameter` +objects which have a ``.key`` and a ``.type`` attribute:: + + from sqlalchemy import create_engine, event + + engine = create_engine("oracle+cx_oracle://scott:tiger@host/xe") + + @event.listens_for(engine, "do_setinputsizes") + def _log_setinputsizes(inputsizes, cursor, statement, parameters, context): + for bindparam, dbapitype in inputsizes.items(): + log.info( + "Bound parameter name: %s SQLAlchemy type: %r " + "DBAPI object: %s", + bindparam.key, bindparam.type, dbapitype) + +Example 2 - remove all bindings to CLOB +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The ``CLOB`` datatype in cx_Oracle incurs a significant performance overhead, +however is set by default for the ``Text`` type within the SQLAlchemy 1.2 +series. This setting can be modified as follows:: + + from sqlalchemy import create_engine, event + from cx_Oracle import CLOB + + engine = create_engine("oracle+cx_oracle://scott:tiger@host/xe") + + @event.listens_for(engine, "do_setinputsizes") + def _remove_clob(inputsizes, cursor, statement, parameters, context): + for bindparam, dbapitype in list(inputsizes.items()): + if dbapitype is CLOB: + del inputsizes[bindparam] + +.. _cx_oracle_returning: + +RETURNING Support +----------------- + +The cx_Oracle dialect implements RETURNING using OUT parameters. +The dialect supports RETURNING fully. + +.. _cx_oracle_lob: + +LOB Datatypes +-------------- + +LOB datatypes refer to the "large object" datatypes such as CLOB, NCLOB and +BLOB. Modern versions of cx_Oracle and oracledb are optimized for these +datatypes to be delivered as a single buffer. As such, SQLAlchemy makes use of +these newer type handlers by default. + +To disable the use of newer type handlers and deliver LOB objects as classic +buffered objects with a ``read()`` method, the parameter +``auto_convert_lobs=False`` may be passed to :func:`_sa.create_engine`, +which takes place only engine-wide. + +Two Phase Transactions Not Supported +------------------------------------- + +Two phase transactions are **not supported** under cx_Oracle due to poor +driver support. As of cx_Oracle 6.0b1, the interface for +two phase transactions has been changed to be more of a direct pass-through +to the underlying OCI layer with less automation. The additional logic +to support this system is not implemented in SQLAlchemy. + +.. _cx_oracle_numeric: + +Precision Numerics +------------------ + +SQLAlchemy's numeric types can handle receiving and returning values as Python +``Decimal`` objects or float objects. When a :class:`.Numeric` object, or a +subclass such as :class:`.Float`, :class:`_oracle.DOUBLE_PRECISION` etc. is in +use, the :paramref:`.Numeric.asdecimal` flag determines if values should be +coerced to ``Decimal`` upon return, or returned as float objects. To make +matters more complicated under Oracle, Oracle's ``NUMBER`` type can also +represent integer values if the "scale" is zero, so the Oracle-specific +:class:`_oracle.NUMBER` type takes this into account as well. + +The cx_Oracle dialect makes extensive use of connection- and cursor-level +"outputtypehandler" callables in order to coerce numeric values as requested. +These callables are specific to the specific flavor of :class:`.Numeric` in +use, as well as if no SQLAlchemy typing objects are present. There are +observed scenarios where Oracle may sends incomplete or ambiguous information +about the numeric types being returned, such as a query where the numeric types +are buried under multiple levels of subquery. The type handlers do their best +to make the right decision in all cases, deferring to the underlying cx_Oracle +DBAPI for all those cases where the driver can make the best decision. + +When no typing objects are present, as when executing plain SQL strings, a +default "outputtypehandler" is present which will generally return numeric +values which specify precision and scale as Python ``Decimal`` objects. To +disable this coercion to decimal for performance reasons, pass the flag +``coerce_to_decimal=False`` to :func:`_sa.create_engine`:: + + engine = create_engine("oracle+cx_oracle://dsn", coerce_to_decimal=False) + +The ``coerce_to_decimal`` flag only impacts the results of plain string +SQL statements that are not otherwise associated with a :class:`.Numeric` +SQLAlchemy type (or a subclass of such). + +.. versionchanged:: 1.2 The numeric handling system for cx_Oracle has been + reworked to take advantage of newer cx_Oracle features as well + as better integration of outputtypehandlers. + +""" # noqa +from __future__ import annotations + +import decimal +import random +import re + +from . import base as oracle +from .base import OracleCompiler +from .base import OracleDialect +from .base import OracleExecutionContext +from .types import _OracleDateLiteralRender +from ... import exc +from ... import util +from ...engine import cursor as _cursor +from ...engine import interfaces +from ...engine import processors +from ...sql import sqltypes +from ...sql._typing import is_sql_compiler + +# source: +# https://github.com/oracle/python-cx_Oracle/issues/596#issuecomment-999243649 +_CX_ORACLE_MAGIC_LOB_SIZE = 131072 + + +class _OracleInteger(sqltypes.Integer): + def get_dbapi_type(self, dbapi): + # see https://github.com/oracle/python-cx_Oracle/issues/ + # 208#issuecomment-409715955 + return int + + def _cx_oracle_var(self, dialect, cursor, arraysize=None): + cx_Oracle = dialect.dbapi + return cursor.var( + cx_Oracle.STRING, + 255, + arraysize=arraysize if arraysize is not None else cursor.arraysize, + outconverter=int, + ) + + def _cx_oracle_outputtypehandler(self, dialect): + def handler(cursor, name, default_type, size, precision, scale): + return self._cx_oracle_var(dialect, cursor) + + return handler + + +class _OracleNumeric(sqltypes.Numeric): + is_number = False + + def bind_processor(self, dialect): + if self.scale == 0: + return None + elif self.asdecimal: + processor = processors.to_decimal_processor_factory( + decimal.Decimal, self._effective_decimal_return_scale + ) + + def process(value): + if isinstance(value, (int, float)): + return processor(value) + elif value is not None and value.is_infinite(): + return float(value) + else: + return value + + return process + else: + return processors.to_float + + def result_processor(self, dialect, coltype): + return None + + def _cx_oracle_outputtypehandler(self, dialect): + cx_Oracle = dialect.dbapi + + def handler(cursor, name, default_type, size, precision, scale): + outconverter = None + + if precision: + if self.asdecimal: + if default_type == cx_Oracle.NATIVE_FLOAT: + # receiving float and doing Decimal after the fact + # allows for float("inf") to be handled + type_ = default_type + outconverter = decimal.Decimal + else: + type_ = decimal.Decimal + else: + if self.is_number and scale == 0: + # integer. cx_Oracle is observed to handle the widest + # variety of ints when no directives are passed, + # from 5.2 to 7.0. See [ticket:4457] + return None + else: + type_ = cx_Oracle.NATIVE_FLOAT + + else: + if self.asdecimal: + if default_type == cx_Oracle.NATIVE_FLOAT: + type_ = default_type + outconverter = decimal.Decimal + else: + type_ = decimal.Decimal + else: + if self.is_number and scale == 0: + # integer. cx_Oracle is observed to handle the widest + # variety of ints when no directives are passed, + # from 5.2 to 7.0. See [ticket:4457] + return None + else: + type_ = cx_Oracle.NATIVE_FLOAT + + return cursor.var( + type_, + 255, + arraysize=cursor.arraysize, + outconverter=outconverter, + ) + + return handler + + +class _OracleUUID(sqltypes.Uuid): + def get_dbapi_type(self, dbapi): + return dbapi.STRING + + +class _OracleBinaryFloat(_OracleNumeric): + def get_dbapi_type(self, dbapi): + return dbapi.NATIVE_FLOAT + + +class _OracleBINARY_FLOAT(_OracleBinaryFloat, oracle.BINARY_FLOAT): + pass + + +class _OracleBINARY_DOUBLE(_OracleBinaryFloat, oracle.BINARY_DOUBLE): + pass + + +class _OracleNUMBER(_OracleNumeric): + is_number = True + + +class _CXOracleDate(oracle._OracleDate): + def bind_processor(self, dialect): + return None + + def result_processor(self, dialect, coltype): + def process(value): + if value is not None: + return value.date() + else: + return value + + return process + + +class _CXOracleTIMESTAMP(_OracleDateLiteralRender, sqltypes.TIMESTAMP): + def literal_processor(self, dialect): + return self._literal_processor_datetime(dialect) + + +class _LOBDataType: + pass + + +# TODO: the names used across CHAR / VARCHAR / NCHAR / NVARCHAR +# here are inconsistent and not very good +class _OracleChar(sqltypes.CHAR): + def get_dbapi_type(self, dbapi): + return dbapi.FIXED_CHAR + + +class _OracleNChar(sqltypes.NCHAR): + def get_dbapi_type(self, dbapi): + return dbapi.FIXED_NCHAR + + +class _OracleUnicodeStringNCHAR(oracle.NVARCHAR2): + def get_dbapi_type(self, dbapi): + return dbapi.NCHAR + + +class _OracleUnicodeStringCHAR(sqltypes.Unicode): + def get_dbapi_type(self, dbapi): + return dbapi.LONG_STRING + + +class _OracleUnicodeTextNCLOB(_LOBDataType, oracle.NCLOB): + def get_dbapi_type(self, dbapi): + # previously, this was dbapi.NCLOB. + # DB_TYPE_NVARCHAR will instead be passed to setinputsizes() + # when this datatype is used. + return dbapi.DB_TYPE_NVARCHAR + + +class _OracleUnicodeTextCLOB(_LOBDataType, sqltypes.UnicodeText): + def get_dbapi_type(self, dbapi): + # previously, this was dbapi.CLOB. + # DB_TYPE_NVARCHAR will instead be passed to setinputsizes() + # when this datatype is used. + return dbapi.DB_TYPE_NVARCHAR + + +class _OracleText(_LOBDataType, sqltypes.Text): + def get_dbapi_type(self, dbapi): + # previously, this was dbapi.CLOB. + # DB_TYPE_NVARCHAR will instead be passed to setinputsizes() + # when this datatype is used. + return dbapi.DB_TYPE_NVARCHAR + + +class _OracleLong(_LOBDataType, oracle.LONG): + def get_dbapi_type(self, dbapi): + return dbapi.LONG_STRING + + +class _OracleString(sqltypes.String): + pass + + +class _OracleEnum(sqltypes.Enum): + def bind_processor(self, dialect): + enum_proc = sqltypes.Enum.bind_processor(self, dialect) + + def process(value): + raw_str = enum_proc(value) + return raw_str + + return process + + +class _OracleBinary(_LOBDataType, sqltypes.LargeBinary): + def get_dbapi_type(self, dbapi): + # previously, this was dbapi.BLOB. + # DB_TYPE_RAW will instead be passed to setinputsizes() + # when this datatype is used. + return dbapi.DB_TYPE_RAW + + def bind_processor(self, dialect): + return None + + def result_processor(self, dialect, coltype): + if not dialect.auto_convert_lobs: + return None + else: + return super().result_processor(dialect, coltype) + + +class _OracleInterval(oracle.INTERVAL): + def get_dbapi_type(self, dbapi): + return dbapi.INTERVAL + + +class _OracleRaw(oracle.RAW): + pass + + +class _OracleRowid(oracle.ROWID): + def get_dbapi_type(self, dbapi): + return dbapi.ROWID + + +class OracleCompiler_cx_oracle(OracleCompiler): + _oracle_cx_sql_compiler = True + + _oracle_returning = False + + # Oracle bind names can't start with digits or underscores. + # currently we rely upon Oracle-specific quoting of bind names in most + # cases. however for expanding params, the escape chars are used. + # see #8708 + bindname_escape_characters = util.immutabledict( + { + "%": "P", + "(": "A", + ")": "Z", + ":": "C", + ".": "C", + "[": "C", + "]": "C", + " ": "C", + "\\": "C", + "/": "C", + "?": "C", + } + ) + + def bindparam_string(self, name, **kw): + quote = getattr(name, "quote", None) + if ( + quote is True + or quote is not False + and self.preparer._bindparam_requires_quotes(name) + # bind param quoting for Oracle doesn't work with post_compile + # params. For those, the default bindparam_string will escape + # special chars, and the appending of a number "_1" etc. will + # take care of reserved words + and not kw.get("post_compile", False) + ): + # interesting to note about expanding parameters - since the + # new parameters take the form <paramname>_<int>, at least if + # they are originally formed from reserved words, they no longer + # need quoting :). names that include illegal characters + # won't work however. + quoted_name = '"%s"' % name + kw["escaped_from"] = name + name = quoted_name + return OracleCompiler.bindparam_string(self, name, **kw) + + # TODO: we could likely do away with quoting altogether for + # Oracle parameters and use the custom escaping here + escaped_from = kw.get("escaped_from", None) + if not escaped_from: + if self._bind_translate_re.search(name): + # not quite the translate use case as we want to + # also get a quick boolean if we even found + # unusual characters in the name + new_name = self._bind_translate_re.sub( + lambda m: self._bind_translate_chars[m.group(0)], + name, + ) + if new_name[0].isdigit() or new_name[0] == "_": + new_name = "D" + new_name + kw["escaped_from"] = name + name = new_name + elif name[0].isdigit() or name[0] == "_": + new_name = "D" + name + kw["escaped_from"] = name + name = new_name + + return OracleCompiler.bindparam_string(self, name, **kw) + + +class OracleExecutionContext_cx_oracle(OracleExecutionContext): + out_parameters = None + + def _generate_out_parameter_vars(self): + # check for has_out_parameters or RETURNING, create cx_Oracle.var + # objects if so + if self.compiled.has_out_parameters or self.compiled._oracle_returning: + out_parameters = self.out_parameters + assert out_parameters is not None + + len_params = len(self.parameters) + + quoted_bind_names = self.compiled.escaped_bind_names + for bindparam in self.compiled.binds.values(): + if bindparam.isoutparam: + name = self.compiled.bind_names[bindparam] + type_impl = bindparam.type.dialect_impl(self.dialect) + + if hasattr(type_impl, "_cx_oracle_var"): + out_parameters[name] = type_impl._cx_oracle_var( + self.dialect, self.cursor, arraysize=len_params + ) + else: + dbtype = type_impl.get_dbapi_type(self.dialect.dbapi) + + cx_Oracle = self.dialect.dbapi + + assert cx_Oracle is not None + + if dbtype is None: + raise exc.InvalidRequestError( + "Cannot create out parameter for " + "parameter " + "%r - its type %r is not supported by" + " cx_oracle" % (bindparam.key, bindparam.type) + ) + + # note this is an OUT parameter. Using + # non-LOB datavalues with large unicode-holding + # values causes the failure (both cx_Oracle and + # oracledb): + # ORA-22835: Buffer too small for CLOB to CHAR or + # BLOB to RAW conversion (actual: 16507, + # maximum: 4000) + # [SQL: INSERT INTO long_text (x, y, z) VALUES + # (:x, :y, :z) RETURNING long_text.x, long_text.y, + # long_text.z INTO :ret_0, :ret_1, :ret_2] + # so even for DB_TYPE_NVARCHAR we convert to a LOB + + if isinstance(type_impl, _LOBDataType): + if dbtype == cx_Oracle.DB_TYPE_NVARCHAR: + dbtype = cx_Oracle.NCLOB + elif dbtype == cx_Oracle.DB_TYPE_RAW: + dbtype = cx_Oracle.BLOB + # other LOB types go in directly + + out_parameters[name] = self.cursor.var( + dbtype, + # this is fine also in oracledb_async since + # the driver will await the read coroutine + outconverter=lambda value: value.read(), + arraysize=len_params, + ) + elif ( + isinstance(type_impl, _OracleNumeric) + and type_impl.asdecimal + ): + out_parameters[name] = self.cursor.var( + decimal.Decimal, + arraysize=len_params, + ) + + else: + out_parameters[name] = self.cursor.var( + dbtype, arraysize=len_params + ) + + for param in self.parameters: + param[quoted_bind_names.get(name, name)] = ( + out_parameters[name] + ) + + def _generate_cursor_outputtype_handler(self): + output_handlers = {} + + for keyname, name, objects, type_ in self.compiled._result_columns: + handler = type_._cached_custom_processor( + self.dialect, + "cx_oracle_outputtypehandler", + self._get_cx_oracle_type_handler, + ) + + if handler: + denormalized_name = self.dialect.denormalize_name(keyname) + output_handlers[denormalized_name] = handler + + if output_handlers: + default_handler = self._dbapi_connection.outputtypehandler + + def output_type_handler( + cursor, name, default_type, size, precision, scale + ): + if name in output_handlers: + return output_handlers[name]( + cursor, name, default_type, size, precision, scale + ) + else: + return default_handler( + cursor, name, default_type, size, precision, scale + ) + + self.cursor.outputtypehandler = output_type_handler + + def _get_cx_oracle_type_handler(self, impl): + if hasattr(impl, "_cx_oracle_outputtypehandler"): + return impl._cx_oracle_outputtypehandler(self.dialect) + else: + return None + + def pre_exec(self): + super().pre_exec() + if not getattr(self.compiled, "_oracle_cx_sql_compiler", False): + return + + self.out_parameters = {} + + self._generate_out_parameter_vars() + + self._generate_cursor_outputtype_handler() + + def post_exec(self): + if ( + self.compiled + and is_sql_compiler(self.compiled) + and self.compiled._oracle_returning + ): + initial_buffer = self.fetchall_for_returning( + self.cursor, _internal=True + ) + + fetch_strategy = _cursor.FullyBufferedCursorFetchStrategy( + self.cursor, + [ + (entry.keyname, None) + for entry in self.compiled._result_columns + ], + initial_buffer=initial_buffer, + ) + + self.cursor_fetch_strategy = fetch_strategy + + def create_cursor(self): + c = self._dbapi_connection.cursor() + if self.dialect.arraysize: + c.arraysize = self.dialect.arraysize + + return c + + def fetchall_for_returning(self, cursor, *, _internal=False): + compiled = self.compiled + if ( + not _internal + and compiled is None + or not is_sql_compiler(compiled) + or not compiled._oracle_returning + ): + raise NotImplementedError( + "execution context was not prepared for Oracle RETURNING" + ) + + # create a fake cursor result from the out parameters. unlike + # get_out_parameter_values(), the result-row handlers here will be + # applied at the Result level + + numcols = len(self.out_parameters) + + # [stmt_result for stmt_result in outparam.values] == each + # statement in executemany + # [val for val in stmt_result] == each row for a particular + # statement + return list( + zip( + *[ + [ + val + for stmt_result in self.out_parameters[ + f"ret_{j}" + ].values + for val in (stmt_result or ()) + ] + for j in range(numcols) + ] + ) + ) + + def get_out_parameter_values(self, out_param_names): + # this method should not be called when the compiler has + # RETURNING as we've turned the has_out_parameters flag set to + # False. + assert not self.compiled.returning + + return [ + self.dialect._paramval(self.out_parameters[name]) + for name in out_param_names + ] + + +class OracleDialect_cx_oracle(OracleDialect): + supports_statement_cache = True + execution_ctx_cls = OracleExecutionContext_cx_oracle + statement_compiler = OracleCompiler_cx_oracle + + supports_sane_rowcount = True + supports_sane_multi_rowcount = True + + insert_executemany_returning = True + insert_executemany_returning_sort_by_parameter_order = True + update_executemany_returning = True + delete_executemany_returning = True + + bind_typing = interfaces.BindTyping.SETINPUTSIZES + + driver = "cx_oracle" + + colspecs = util.update_copy( + OracleDialect.colspecs, + { + sqltypes.TIMESTAMP: _CXOracleTIMESTAMP, + sqltypes.Numeric: _OracleNumeric, + sqltypes.Float: _OracleNumeric, + oracle.BINARY_FLOAT: _OracleBINARY_FLOAT, + oracle.BINARY_DOUBLE: _OracleBINARY_DOUBLE, + sqltypes.Integer: _OracleInteger, + oracle.NUMBER: _OracleNUMBER, + sqltypes.Date: _CXOracleDate, + sqltypes.LargeBinary: _OracleBinary, + sqltypes.Boolean: oracle._OracleBoolean, + sqltypes.Interval: _OracleInterval, + oracle.INTERVAL: _OracleInterval, + sqltypes.Text: _OracleText, + sqltypes.String: _OracleString, + sqltypes.UnicodeText: _OracleUnicodeTextCLOB, + sqltypes.CHAR: _OracleChar, + sqltypes.NCHAR: _OracleNChar, + sqltypes.Enum: _OracleEnum, + oracle.LONG: _OracleLong, + oracle.RAW: _OracleRaw, + sqltypes.Unicode: _OracleUnicodeStringCHAR, + sqltypes.NVARCHAR: _OracleUnicodeStringNCHAR, + sqltypes.Uuid: _OracleUUID, + oracle.NCLOB: _OracleUnicodeTextNCLOB, + oracle.ROWID: _OracleRowid, + }, + ) + + execute_sequence_format = list + + _cx_oracle_threaded = None + + _cursor_var_unicode_kwargs = util.immutabledict() + + @util.deprecated_params( + threaded=( + "1.3", + "The 'threaded' parameter to the cx_oracle/oracledb dialect " + "is deprecated as a dialect-level argument, and will be removed " + "in a future release. As of version 1.3, it defaults to False " + "rather than True. The 'threaded' option can be passed to " + "cx_Oracle directly in the URL query string passed to " + ":func:`_sa.create_engine`.", + ) + ) + def __init__( + self, + auto_convert_lobs=True, + coerce_to_decimal=True, + arraysize=None, + encoding_errors=None, + threaded=None, + **kwargs, + ): + OracleDialect.__init__(self, **kwargs) + self.arraysize = arraysize + self.encoding_errors = encoding_errors + if encoding_errors: + self._cursor_var_unicode_kwargs = { + "encodingErrors": encoding_errors + } + if threaded is not None: + self._cx_oracle_threaded = threaded + self.auto_convert_lobs = auto_convert_lobs + self.coerce_to_decimal = coerce_to_decimal + if self._use_nchar_for_unicode: + self.colspecs = self.colspecs.copy() + self.colspecs[sqltypes.Unicode] = _OracleUnicodeStringNCHAR + self.colspecs[sqltypes.UnicodeText] = _OracleUnicodeTextNCLOB + + dbapi_module = self.dbapi + self._load_version(dbapi_module) + + if dbapi_module is not None: + # these constants will first be seen in SQLAlchemy datatypes + # coming from the get_dbapi_type() method. We then + # will place the following types into setinputsizes() calls + # on each statement. Oracle constants that are not in this + # list will not be put into setinputsizes(). + self.include_set_input_sizes = { + dbapi_module.DATETIME, + dbapi_module.DB_TYPE_NVARCHAR, # used for CLOB, NCLOB + dbapi_module.DB_TYPE_RAW, # used for BLOB + dbapi_module.NCLOB, # not currently used except for OUT param + dbapi_module.CLOB, # not currently used except for OUT param + dbapi_module.LOB, # not currently used + dbapi_module.BLOB, # not currently used except for OUT param + dbapi_module.NCHAR, + dbapi_module.FIXED_NCHAR, + dbapi_module.FIXED_CHAR, + dbapi_module.TIMESTAMP, + int, # _OracleInteger, + # _OracleBINARY_FLOAT, _OracleBINARY_DOUBLE, + dbapi_module.NATIVE_FLOAT, + } + + self._paramval = lambda value: value.getvalue() + + def _load_version(self, dbapi_module): + version = (0, 0, 0) + if dbapi_module is not None: + m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", dbapi_module.version) + if m: + version = tuple( + int(x) for x in m.group(1, 2, 3) if x is not None + ) + self.cx_oracle_ver = version + if self.cx_oracle_ver < (8,) and self.cx_oracle_ver > (0, 0, 0): + raise exc.InvalidRequestError( + "cx_Oracle version 8 and above are supported" + ) + + @classmethod + def import_dbapi(cls): + import cx_Oracle + + return cx_Oracle + + def initialize(self, connection): + super().initialize(connection) + self._detect_decimal_char(connection) + + def get_isolation_level(self, dbapi_connection): + # sources: + + # general idea of transaction id, have to start one, etc. + # https://stackoverflow.com/questions/10711204/how-to-check-isoloation-level + + # how to decode xid cols from v$transaction to match + # https://asktom.oracle.com/pls/apex/f?p=100:11:0::::P11_QUESTION_ID:9532779900346079444 + + # Oracle tuple comparison without using IN: + # https://www.sql-workbench.eu/comparison/tuple_comparison.html + + with dbapi_connection.cursor() as cursor: + # this is the only way to ensure a transaction is started without + # actually running DML. There's no way to see the configured + # isolation level without getting it from v$transaction which + # means transaction has to be started. + outval = cursor.var(str) + cursor.execute( + """ + begin + :trans_id := dbms_transaction.local_transaction_id( TRUE ); + end; + """, + {"trans_id": outval}, + ) + trans_id = outval.getvalue() + xidusn, xidslot, xidsqn = trans_id.split(".", 2) + + cursor.execute( + "SELECT CASE BITAND(t.flag, POWER(2, 28)) " + "WHEN 0 THEN 'READ COMMITTED' " + "ELSE 'SERIALIZABLE' END AS isolation_level " + "FROM v$transaction t WHERE " + "(t.xidusn, t.xidslot, t.xidsqn) = " + "((:xidusn, :xidslot, :xidsqn))", + {"xidusn": xidusn, "xidslot": xidslot, "xidsqn": xidsqn}, + ) + row = cursor.fetchone() + if row is None: + raise exc.InvalidRequestError( + "could not retrieve isolation level" + ) + result = row[0] + + return result + + def get_isolation_level_values(self, dbapi_connection): + return super().get_isolation_level_values(dbapi_connection) + [ + "AUTOCOMMIT" + ] + + def set_isolation_level(self, dbapi_connection, level): + if level == "AUTOCOMMIT": + dbapi_connection.autocommit = True + else: + dbapi_connection.autocommit = False + dbapi_connection.rollback() + with dbapi_connection.cursor() as cursor: + cursor.execute(f"ALTER SESSION SET ISOLATION_LEVEL={level}") + + def _detect_decimal_char(self, connection): + # we have the option to change this setting upon connect, + # or just look at what it is upon connect and convert. + # to minimize the chance of interference with changes to + # NLS_TERRITORY or formatting behavior of the DB, we opt + # to just look at it + + dbapi_connection = connection.connection + + with dbapi_connection.cursor() as cursor: + # issue #8744 + # nls_session_parameters is not available in some Oracle + # modes like "mount mode". But then, v$nls_parameters is not + # available if the connection doesn't have SYSDBA priv. + # + # simplify the whole thing and just use the method that we were + # doing in the test suite already, selecting a number + + def output_type_handler( + cursor, name, defaultType, size, precision, scale + ): + return cursor.var( + self.dbapi.STRING, 255, arraysize=cursor.arraysize + ) + + cursor.outputtypehandler = output_type_handler + cursor.execute("SELECT 1.1 FROM DUAL") + value = cursor.fetchone()[0] + + decimal_char = value.lstrip("0")[1] + assert not decimal_char[0].isdigit() + + self._decimal_char = decimal_char + + if self._decimal_char != ".": + _detect_decimal = self._detect_decimal + _to_decimal = self._to_decimal + + self._detect_decimal = lambda value: _detect_decimal( + value.replace(self._decimal_char, ".") + ) + self._to_decimal = lambda value: _to_decimal( + value.replace(self._decimal_char, ".") + ) + + def _detect_decimal(self, value): + if "." in value: + return self._to_decimal(value) + else: + return int(value) + + _to_decimal = decimal.Decimal + + def _generate_connection_outputtype_handler(self): + """establish the default outputtypehandler established at the + connection level. + + """ + + dialect = self + cx_Oracle = dialect.dbapi + + number_handler = _OracleNUMBER( + asdecimal=True + )._cx_oracle_outputtypehandler(dialect) + float_handler = _OracleNUMBER( + asdecimal=False + )._cx_oracle_outputtypehandler(dialect) + + def output_type_handler( + cursor, name, default_type, size, precision, scale + ): + if ( + default_type == cx_Oracle.NUMBER + and default_type is not cx_Oracle.NATIVE_FLOAT + ): + if not dialect.coerce_to_decimal: + return None + elif precision == 0 and scale in (0, -127): + # ambiguous type, this occurs when selecting + # numbers from deep subqueries + return cursor.var( + cx_Oracle.STRING, + 255, + outconverter=dialect._detect_decimal, + arraysize=cursor.arraysize, + ) + elif precision and scale > 0: + return number_handler( + cursor, name, default_type, size, precision, scale + ) + else: + return float_handler( + cursor, name, default_type, size, precision, scale + ) + + # if unicode options were specified, add a decoder, otherwise + # cx_Oracle should return Unicode + elif ( + dialect._cursor_var_unicode_kwargs + and default_type + in ( + cx_Oracle.STRING, + cx_Oracle.FIXED_CHAR, + ) + and default_type is not cx_Oracle.CLOB + and default_type is not cx_Oracle.NCLOB + ): + return cursor.var( + str, + size, + cursor.arraysize, + **dialect._cursor_var_unicode_kwargs, + ) + + elif dialect.auto_convert_lobs and default_type in ( + cx_Oracle.CLOB, + cx_Oracle.NCLOB, + ): + return cursor.var( + cx_Oracle.DB_TYPE_NVARCHAR, + _CX_ORACLE_MAGIC_LOB_SIZE, + cursor.arraysize, + **dialect._cursor_var_unicode_kwargs, + ) + + elif dialect.auto_convert_lobs and default_type in ( + cx_Oracle.BLOB, + ): + return cursor.var( + cx_Oracle.DB_TYPE_RAW, + _CX_ORACLE_MAGIC_LOB_SIZE, + cursor.arraysize, + ) + + return output_type_handler + + def on_connect(self): + output_type_handler = self._generate_connection_outputtype_handler() + + def on_connect(conn): + conn.outputtypehandler = output_type_handler + + return on_connect + + def create_connect_args(self, url): + opts = dict(url.query) + + for opt in ("use_ansi", "auto_convert_lobs"): + if opt in opts: + util.warn_deprecated( + f"{self.driver} dialect option {opt!r} should only be " + "passed to create_engine directly, not within the URL " + "string", + version="1.3", + ) + util.coerce_kw_type(opts, opt, bool) + setattr(self, opt, opts.pop(opt)) + + database = url.database + service_name = opts.pop("service_name", None) + if database or service_name: + # if we have a database, then we have a remote host + port = url.port + if port: + port = int(port) + else: + port = 1521 + + if database and service_name: + raise exc.InvalidRequestError( + '"service_name" option shouldn\'t ' + 'be used with a "database" part of the url' + ) + if database: + makedsn_kwargs = {"sid": database} + if service_name: + makedsn_kwargs = {"service_name": service_name} + + dsn = self.dbapi.makedsn(url.host, port, **makedsn_kwargs) + else: + # we have a local tnsname + dsn = url.host + + if dsn is not None: + opts["dsn"] = dsn + if url.password is not None: + opts["password"] = url.password + if url.username is not None: + opts["user"] = url.username + + if self._cx_oracle_threaded is not None: + opts.setdefault("threaded", self._cx_oracle_threaded) + + def convert_cx_oracle_constant(value): + if isinstance(value, str): + try: + int_val = int(value) + except ValueError: + value = value.upper() + return getattr(self.dbapi, value) + else: + return int_val + else: + return value + + util.coerce_kw_type(opts, "mode", convert_cx_oracle_constant) + util.coerce_kw_type(opts, "threaded", bool) + util.coerce_kw_type(opts, "events", bool) + util.coerce_kw_type(opts, "purity", convert_cx_oracle_constant) + return ([], opts) + + def _get_server_version_info(self, connection): + return tuple(int(x) for x in connection.connection.version.split(".")) + + def is_disconnect(self, e, connection, cursor): + (error,) = e.args + if isinstance( + e, (self.dbapi.InterfaceError, self.dbapi.DatabaseError) + ) and "not connected" in str(e): + return True + + if hasattr(error, "code") and error.code in { + 28, + 3114, + 3113, + 3135, + 1033, + 2396, + }: + # ORA-00028: your session has been killed + # ORA-03114: not connected to ORACLE + # ORA-03113: end-of-file on communication channel + # ORA-03135: connection lost contact + # ORA-01033: ORACLE initialization or shutdown in progress + # ORA-02396: exceeded maximum idle time, please connect again + # TODO: Others ? + return True + + if re.match(r"^(?:DPI-1010|DPI-1080|DPY-1001|DPY-4011)", str(e)): + # DPI-1010: not connected + # DPI-1080: connection was closed by ORA-3113 + # python-oracledb's DPY-1001: not connected to database + # python-oracledb's DPY-4011: the database or network closed the + # connection + # TODO: others? + return True + + return False + + def create_xid(self): + """create a two-phase transaction ID. + + this id will be passed to do_begin_twophase(), do_rollback_twophase(), + do_commit_twophase(). its format is unspecified. + + """ + + id_ = random.randint(0, 2**128) + return (0x1234, "%032x" % id_, "%032x" % 9) + + def do_executemany(self, cursor, statement, parameters, context=None): + if isinstance(parameters, tuple): + parameters = list(parameters) + cursor.executemany(statement, parameters) + + def do_begin_twophase(self, connection, xid): + connection.connection.begin(*xid) + connection.connection.info["cx_oracle_xid"] = xid + + def do_prepare_twophase(self, connection, xid): + result = connection.connection.prepare() + connection.info["cx_oracle_prepared"] = result + + def do_rollback_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + self.do_rollback(connection.connection) + # TODO: need to end XA state here + + def do_commit_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + if not is_prepared: + self.do_commit(connection.connection) + else: + if recover: + raise NotImplementedError( + "2pc recovery not implemented for cx_Oracle" + ) + oci_prepared = connection.info["cx_oracle_prepared"] + if oci_prepared: + self.do_commit(connection.connection) + # TODO: need to end XA state here + + def do_set_input_sizes(self, cursor, list_of_tuples, context): + if self.positional: + # not usually used, here to support if someone is modifying + # the dialect to use positional style + cursor.setinputsizes( + *[dbtype for key, dbtype, sqltype in list_of_tuples] + ) + else: + collection = ( + (key, dbtype) + for key, dbtype, sqltype in list_of_tuples + if dbtype + ) + + cursor.setinputsizes(**{key: dbtype for key, dbtype in collection}) + + def do_recover_twophase(self, connection): + raise NotImplementedError( + "recover two phase query for cx_Oracle not implemented" + ) + + +dialect = OracleDialect_cx_oracle diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/dictionary.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/dictionary.py new file mode 100644 index 0000000..63479b9 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/dictionary.py @@ -0,0 +1,507 @@ +# dialects/oracle/dictionary.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + +from .types import DATE +from .types import LONG +from .types import NUMBER +from .types import RAW +from .types import VARCHAR2 +from ... import Column +from ... import MetaData +from ... import Table +from ... import table +from ...sql.sqltypes import CHAR + +# constants +DB_LINK_PLACEHOLDER = "__$sa_dblink$__" +# tables +dual = table("dual") +dictionary_meta = MetaData() + +# NOTE: all the dictionary_meta are aliases because oracle does not like +# using the full table@dblink for every column in query, and complains with +# ORA-00960: ambiguous column naming in select list +all_tables = Table( + "all_tables" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("owner", VARCHAR2(128), nullable=False), + Column("table_name", VARCHAR2(128), nullable=False), + Column("tablespace_name", VARCHAR2(30)), + Column("cluster_name", VARCHAR2(128)), + Column("iot_name", VARCHAR2(128)), + Column("status", VARCHAR2(8)), + Column("pct_free", NUMBER), + Column("pct_used", NUMBER), + Column("ini_trans", NUMBER), + Column("max_trans", NUMBER), + Column("initial_extent", NUMBER), + Column("next_extent", NUMBER), + Column("min_extents", NUMBER), + Column("max_extents", NUMBER), + Column("pct_increase", NUMBER), + Column("freelists", NUMBER), + Column("freelist_groups", NUMBER), + Column("logging", VARCHAR2(3)), + Column("backed_up", VARCHAR2(1)), + Column("num_rows", NUMBER), + Column("blocks", NUMBER), + Column("empty_blocks", NUMBER), + Column("avg_space", NUMBER), + Column("chain_cnt", NUMBER), + Column("avg_row_len", NUMBER), + Column("avg_space_freelist_blocks", NUMBER), + Column("num_freelist_blocks", NUMBER), + Column("degree", VARCHAR2(10)), + Column("instances", VARCHAR2(10)), + Column("cache", VARCHAR2(5)), + Column("table_lock", VARCHAR2(8)), + Column("sample_size", NUMBER), + Column("last_analyzed", DATE), + Column("partitioned", VARCHAR2(3)), + Column("iot_type", VARCHAR2(12)), + Column("temporary", VARCHAR2(1)), + Column("secondary", VARCHAR2(1)), + Column("nested", VARCHAR2(3)), + Column("buffer_pool", VARCHAR2(7)), + Column("flash_cache", VARCHAR2(7)), + Column("cell_flash_cache", VARCHAR2(7)), + Column("row_movement", VARCHAR2(8)), + Column("global_stats", VARCHAR2(3)), + Column("user_stats", VARCHAR2(3)), + Column("duration", VARCHAR2(15)), + Column("skip_corrupt", VARCHAR2(8)), + Column("monitoring", VARCHAR2(3)), + Column("cluster_owner", VARCHAR2(128)), + Column("dependencies", VARCHAR2(8)), + Column("compression", VARCHAR2(8)), + Column("compress_for", VARCHAR2(30)), + Column("dropped", VARCHAR2(3)), + Column("read_only", VARCHAR2(3)), + Column("segment_created", VARCHAR2(3)), + Column("result_cache", VARCHAR2(7)), + Column("clustering", VARCHAR2(3)), + Column("activity_tracking", VARCHAR2(23)), + Column("dml_timestamp", VARCHAR2(25)), + Column("has_identity", VARCHAR2(3)), + Column("container_data", VARCHAR2(3)), + Column("inmemory", VARCHAR2(8)), + Column("inmemory_priority", VARCHAR2(8)), + Column("inmemory_distribute", VARCHAR2(15)), + Column("inmemory_compression", VARCHAR2(17)), + Column("inmemory_duplicate", VARCHAR2(13)), + Column("default_collation", VARCHAR2(100)), + Column("duplicated", VARCHAR2(1)), + Column("sharded", VARCHAR2(1)), + Column("externally_sharded", VARCHAR2(1)), + Column("externally_duplicated", VARCHAR2(1)), + Column("external", VARCHAR2(3)), + Column("hybrid", VARCHAR2(3)), + Column("cellmemory", VARCHAR2(24)), + Column("containers_default", VARCHAR2(3)), + Column("container_map", VARCHAR2(3)), + Column("extended_data_link", VARCHAR2(3)), + Column("extended_data_link_map", VARCHAR2(3)), + Column("inmemory_service", VARCHAR2(12)), + Column("inmemory_service_name", VARCHAR2(1000)), + Column("container_map_object", VARCHAR2(3)), + Column("memoptimize_read", VARCHAR2(8)), + Column("memoptimize_write", VARCHAR2(8)), + Column("has_sensitive_column", VARCHAR2(3)), + Column("admit_null", VARCHAR2(3)), + Column("data_link_dml_enabled", VARCHAR2(3)), + Column("logical_replication", VARCHAR2(8)), +).alias("a_tables") + +all_views = Table( + "all_views" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("owner", VARCHAR2(128), nullable=False), + Column("view_name", VARCHAR2(128), nullable=False), + Column("text_length", NUMBER), + Column("text", LONG), + Column("text_vc", VARCHAR2(4000)), + Column("type_text_length", NUMBER), + Column("type_text", VARCHAR2(4000)), + Column("oid_text_length", NUMBER), + Column("oid_text", VARCHAR2(4000)), + Column("view_type_owner", VARCHAR2(128)), + Column("view_type", VARCHAR2(128)), + Column("superview_name", VARCHAR2(128)), + Column("editioning_view", VARCHAR2(1)), + Column("read_only", VARCHAR2(1)), + Column("container_data", VARCHAR2(1)), + Column("bequeath", VARCHAR2(12)), + Column("origin_con_id", VARCHAR2(256)), + Column("default_collation", VARCHAR2(100)), + Column("containers_default", VARCHAR2(3)), + Column("container_map", VARCHAR2(3)), + Column("extended_data_link", VARCHAR2(3)), + Column("extended_data_link_map", VARCHAR2(3)), + Column("has_sensitive_column", VARCHAR2(3)), + Column("admit_null", VARCHAR2(3)), + Column("pdb_local_only", VARCHAR2(3)), +).alias("a_views") + +all_sequences = Table( + "all_sequences" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("sequence_owner", VARCHAR2(128), nullable=False), + Column("sequence_name", VARCHAR2(128), nullable=False), + Column("min_value", NUMBER), + Column("max_value", NUMBER), + Column("increment_by", NUMBER, nullable=False), + Column("cycle_flag", VARCHAR2(1)), + Column("order_flag", VARCHAR2(1)), + Column("cache_size", NUMBER, nullable=False), + Column("last_number", NUMBER, nullable=False), + Column("scale_flag", VARCHAR2(1)), + Column("extend_flag", VARCHAR2(1)), + Column("sharded_flag", VARCHAR2(1)), + Column("session_flag", VARCHAR2(1)), + Column("keep_value", VARCHAR2(1)), +).alias("a_sequences") + +all_users = Table( + "all_users" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("username", VARCHAR2(128), nullable=False), + Column("user_id", NUMBER, nullable=False), + Column("created", DATE, nullable=False), + Column("common", VARCHAR2(3)), + Column("oracle_maintained", VARCHAR2(1)), + Column("inherited", VARCHAR2(3)), + Column("default_collation", VARCHAR2(100)), + Column("implicit", VARCHAR2(3)), + Column("all_shard", VARCHAR2(3)), + Column("external_shard", VARCHAR2(3)), +).alias("a_users") + +all_mviews = Table( + "all_mviews" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("owner", VARCHAR2(128), nullable=False), + Column("mview_name", VARCHAR2(128), nullable=False), + Column("container_name", VARCHAR2(128), nullable=False), + Column("query", LONG), + Column("query_len", NUMBER(38)), + Column("updatable", VARCHAR2(1)), + Column("update_log", VARCHAR2(128)), + Column("master_rollback_seg", VARCHAR2(128)), + Column("master_link", VARCHAR2(128)), + Column("rewrite_enabled", VARCHAR2(1)), + Column("rewrite_capability", VARCHAR2(9)), + Column("refresh_mode", VARCHAR2(6)), + Column("refresh_method", VARCHAR2(8)), + Column("build_mode", VARCHAR2(9)), + Column("fast_refreshable", VARCHAR2(18)), + Column("last_refresh_type", VARCHAR2(8)), + Column("last_refresh_date", DATE), + Column("last_refresh_end_time", DATE), + Column("staleness", VARCHAR2(19)), + Column("after_fast_refresh", VARCHAR2(19)), + Column("unknown_prebuilt", VARCHAR2(1)), + Column("unknown_plsql_func", VARCHAR2(1)), + Column("unknown_external_table", VARCHAR2(1)), + Column("unknown_consider_fresh", VARCHAR2(1)), + Column("unknown_import", VARCHAR2(1)), + Column("unknown_trusted_fd", VARCHAR2(1)), + Column("compile_state", VARCHAR2(19)), + Column("use_no_index", VARCHAR2(1)), + Column("stale_since", DATE), + Column("num_pct_tables", NUMBER), + Column("num_fresh_pct_regions", NUMBER), + Column("num_stale_pct_regions", NUMBER), + Column("segment_created", VARCHAR2(3)), + Column("evaluation_edition", VARCHAR2(128)), + Column("unusable_before", VARCHAR2(128)), + Column("unusable_beginning", VARCHAR2(128)), + Column("default_collation", VARCHAR2(100)), + Column("on_query_computation", VARCHAR2(1)), + Column("auto", VARCHAR2(3)), +).alias("a_mviews") + +all_tab_identity_cols = Table( + "all_tab_identity_cols" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("owner", VARCHAR2(128), nullable=False), + Column("table_name", VARCHAR2(128), nullable=False), + Column("column_name", VARCHAR2(128), nullable=False), + Column("generation_type", VARCHAR2(10)), + Column("sequence_name", VARCHAR2(128), nullable=False), + Column("identity_options", VARCHAR2(298)), +).alias("a_tab_identity_cols") + +all_tab_cols = Table( + "all_tab_cols" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("owner", VARCHAR2(128), nullable=False), + Column("table_name", VARCHAR2(128), nullable=False), + Column("column_name", VARCHAR2(128), nullable=False), + Column("data_type", VARCHAR2(128)), + Column("data_type_mod", VARCHAR2(3)), + Column("data_type_owner", VARCHAR2(128)), + Column("data_length", NUMBER, nullable=False), + Column("data_precision", NUMBER), + Column("data_scale", NUMBER), + Column("nullable", VARCHAR2(1)), + Column("column_id", NUMBER), + Column("default_length", NUMBER), + Column("data_default", LONG), + Column("num_distinct", NUMBER), + Column("low_value", RAW(1000)), + Column("high_value", RAW(1000)), + Column("density", NUMBER), + Column("num_nulls", NUMBER), + Column("num_buckets", NUMBER), + Column("last_analyzed", DATE), + Column("sample_size", NUMBER), + Column("character_set_name", VARCHAR2(44)), + Column("char_col_decl_length", NUMBER), + Column("global_stats", VARCHAR2(3)), + Column("user_stats", VARCHAR2(3)), + Column("avg_col_len", NUMBER), + Column("char_length", NUMBER), + Column("char_used", VARCHAR2(1)), + Column("v80_fmt_image", VARCHAR2(3)), + Column("data_upgraded", VARCHAR2(3)), + Column("hidden_column", VARCHAR2(3)), + Column("virtual_column", VARCHAR2(3)), + Column("segment_column_id", NUMBER), + Column("internal_column_id", NUMBER, nullable=False), + Column("histogram", VARCHAR2(15)), + Column("qualified_col_name", VARCHAR2(4000)), + Column("user_generated", VARCHAR2(3)), + Column("default_on_null", VARCHAR2(3)), + Column("identity_column", VARCHAR2(3)), + Column("evaluation_edition", VARCHAR2(128)), + Column("unusable_before", VARCHAR2(128)), + Column("unusable_beginning", VARCHAR2(128)), + Column("collation", VARCHAR2(100)), + Column("collated_column_id", NUMBER), +).alias("a_tab_cols") + +all_tab_comments = Table( + "all_tab_comments" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("owner", VARCHAR2(128), nullable=False), + Column("table_name", VARCHAR2(128), nullable=False), + Column("table_type", VARCHAR2(11)), + Column("comments", VARCHAR2(4000)), + Column("origin_con_id", NUMBER), +).alias("a_tab_comments") + +all_col_comments = Table( + "all_col_comments" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("owner", VARCHAR2(128), nullable=False), + Column("table_name", VARCHAR2(128), nullable=False), + Column("column_name", VARCHAR2(128), nullable=False), + Column("comments", VARCHAR2(4000)), + Column("origin_con_id", NUMBER), +).alias("a_col_comments") + +all_mview_comments = Table( + "all_mview_comments" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("owner", VARCHAR2(128), nullable=False), + Column("mview_name", VARCHAR2(128), nullable=False), + Column("comments", VARCHAR2(4000)), +).alias("a_mview_comments") + +all_ind_columns = Table( + "all_ind_columns" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("index_owner", VARCHAR2(128), nullable=False), + Column("index_name", VARCHAR2(128), nullable=False), + Column("table_owner", VARCHAR2(128), nullable=False), + Column("table_name", VARCHAR2(128), nullable=False), + Column("column_name", VARCHAR2(4000)), + Column("column_position", NUMBER, nullable=False), + Column("column_length", NUMBER, nullable=False), + Column("char_length", NUMBER), + Column("descend", VARCHAR2(4)), + Column("collated_column_id", NUMBER), +).alias("a_ind_columns") + +all_indexes = Table( + "all_indexes" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("owner", VARCHAR2(128), nullable=False), + Column("index_name", VARCHAR2(128), nullable=False), + Column("index_type", VARCHAR2(27)), + Column("table_owner", VARCHAR2(128), nullable=False), + Column("table_name", VARCHAR2(128), nullable=False), + Column("table_type", CHAR(11)), + Column("uniqueness", VARCHAR2(9)), + Column("compression", VARCHAR2(13)), + Column("prefix_length", NUMBER), + Column("tablespace_name", VARCHAR2(30)), + Column("ini_trans", NUMBER), + Column("max_trans", NUMBER), + Column("initial_extent", NUMBER), + Column("next_extent", NUMBER), + Column("min_extents", NUMBER), + Column("max_extents", NUMBER), + Column("pct_increase", NUMBER), + Column("pct_threshold", NUMBER), + Column("include_column", NUMBER), + Column("freelists", NUMBER), + Column("freelist_groups", NUMBER), + Column("pct_free", NUMBER), + Column("logging", VARCHAR2(3)), + Column("blevel", NUMBER), + Column("leaf_blocks", NUMBER), + Column("distinct_keys", NUMBER), + Column("avg_leaf_blocks_per_key", NUMBER), + Column("avg_data_blocks_per_key", NUMBER), + Column("clustering_factor", NUMBER), + Column("status", VARCHAR2(8)), + Column("num_rows", NUMBER), + Column("sample_size", NUMBER), + Column("last_analyzed", DATE), + Column("degree", VARCHAR2(40)), + Column("instances", VARCHAR2(40)), + Column("partitioned", VARCHAR2(3)), + Column("temporary", VARCHAR2(1)), + Column("generated", VARCHAR2(1)), + Column("secondary", VARCHAR2(1)), + Column("buffer_pool", VARCHAR2(7)), + Column("flash_cache", VARCHAR2(7)), + Column("cell_flash_cache", VARCHAR2(7)), + Column("user_stats", VARCHAR2(3)), + Column("duration", VARCHAR2(15)), + Column("pct_direct_access", NUMBER), + Column("ityp_owner", VARCHAR2(128)), + Column("ityp_name", VARCHAR2(128)), + Column("parameters", VARCHAR2(1000)), + Column("global_stats", VARCHAR2(3)), + Column("domidx_status", VARCHAR2(12)), + Column("domidx_opstatus", VARCHAR2(6)), + Column("funcidx_status", VARCHAR2(8)), + Column("join_index", VARCHAR2(3)), + Column("iot_redundant_pkey_elim", VARCHAR2(3)), + Column("dropped", VARCHAR2(3)), + Column("visibility", VARCHAR2(9)), + Column("domidx_management", VARCHAR2(14)), + Column("segment_created", VARCHAR2(3)), + Column("orphaned_entries", VARCHAR2(3)), + Column("indexing", VARCHAR2(7)), + Column("auto", VARCHAR2(3)), +).alias("a_indexes") + +all_ind_expressions = Table( + "all_ind_expressions" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("index_owner", VARCHAR2(128), nullable=False), + Column("index_name", VARCHAR2(128), nullable=False), + Column("table_owner", VARCHAR2(128), nullable=False), + Column("table_name", VARCHAR2(128), nullable=False), + Column("column_expression", LONG), + Column("column_position", NUMBER, nullable=False), +).alias("a_ind_expressions") + +all_constraints = Table( + "all_constraints" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("owner", VARCHAR2(128)), + Column("constraint_name", VARCHAR2(128)), + Column("constraint_type", VARCHAR2(1)), + Column("table_name", VARCHAR2(128)), + Column("search_condition", LONG), + Column("search_condition_vc", VARCHAR2(4000)), + Column("r_owner", VARCHAR2(128)), + Column("r_constraint_name", VARCHAR2(128)), + Column("delete_rule", VARCHAR2(9)), + Column("status", VARCHAR2(8)), + Column("deferrable", VARCHAR2(14)), + Column("deferred", VARCHAR2(9)), + Column("validated", VARCHAR2(13)), + Column("generated", VARCHAR2(14)), + Column("bad", VARCHAR2(3)), + Column("rely", VARCHAR2(4)), + Column("last_change", DATE), + Column("index_owner", VARCHAR2(128)), + Column("index_name", VARCHAR2(128)), + Column("invalid", VARCHAR2(7)), + Column("view_related", VARCHAR2(14)), + Column("origin_con_id", VARCHAR2(256)), +).alias("a_constraints") + +all_cons_columns = Table( + "all_cons_columns" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("owner", VARCHAR2(128), nullable=False), + Column("constraint_name", VARCHAR2(128), nullable=False), + Column("table_name", VARCHAR2(128), nullable=False), + Column("column_name", VARCHAR2(4000)), + Column("position", NUMBER), +).alias("a_cons_columns") + +# TODO figure out if it's still relevant, since there is no mention from here +# https://docs.oracle.com/en/database/oracle/oracle-database/21/refrn/ALL_DB_LINKS.html +# original note: +# using user_db_links here since all_db_links appears +# to have more restricted permissions. +# https://docs.oracle.com/cd/B28359_01/server.111/b28310/ds_admin005.htm +# will need to hear from more users if we are doing +# the right thing here. See [ticket:2619] +all_db_links = Table( + "all_db_links" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("owner", VARCHAR2(128), nullable=False), + Column("db_link", VARCHAR2(128), nullable=False), + Column("username", VARCHAR2(128)), + Column("host", VARCHAR2(2000)), + Column("created", DATE, nullable=False), + Column("hidden", VARCHAR2(3)), + Column("shard_internal", VARCHAR2(3)), + Column("valid", VARCHAR2(3)), + Column("intra_cdb", VARCHAR2(3)), +).alias("a_db_links") + +all_synonyms = Table( + "all_synonyms" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("owner", VARCHAR2(128)), + Column("synonym_name", VARCHAR2(128)), + Column("table_owner", VARCHAR2(128)), + Column("table_name", VARCHAR2(128)), + Column("db_link", VARCHAR2(128)), + Column("origin_con_id", VARCHAR2(256)), +).alias("a_synonyms") + +all_objects = Table( + "all_objects" + DB_LINK_PLACEHOLDER, + dictionary_meta, + Column("owner", VARCHAR2(128), nullable=False), + Column("object_name", VARCHAR2(128), nullable=False), + Column("subobject_name", VARCHAR2(128)), + Column("object_id", NUMBER, nullable=False), + Column("data_object_id", NUMBER), + Column("object_type", VARCHAR2(23)), + Column("created", DATE, nullable=False), + Column("last_ddl_time", DATE, nullable=False), + Column("timestamp", VARCHAR2(19)), + Column("status", VARCHAR2(7)), + Column("temporary", VARCHAR2(1)), + Column("generated", VARCHAR2(1)), + Column("secondary", VARCHAR2(1)), + Column("namespace", NUMBER, nullable=False), + Column("edition_name", VARCHAR2(128)), + Column("sharing", VARCHAR2(13)), + Column("editionable", VARCHAR2(1)), + Column("oracle_maintained", VARCHAR2(1)), + Column("application", VARCHAR2(1)), + Column("default_collation", VARCHAR2(100)), + Column("duplicated", VARCHAR2(1)), + Column("sharded", VARCHAR2(1)), + Column("created_appid", NUMBER), + Column("created_vsnid", NUMBER), + Column("modified_appid", NUMBER), + Column("modified_vsnid", NUMBER), +).alias("a_objects") diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/oracledb.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/oracledb.py new file mode 100644 index 0000000..9cdec3b --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/oracledb.py @@ -0,0 +1,311 @@ +# dialects/oracle/oracledb.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + +r""" +.. dialect:: oracle+oracledb + :name: python-oracledb + :dbapi: oracledb + :connectstring: oracle+oracledb://user:pass@hostname:port[/dbname][?service_name=<service>[&key=value&key=value...]] + :url: https://oracle.github.io/python-oracledb/ + +python-oracledb is released by Oracle to supersede the cx_Oracle driver. +It is fully compatible with cx_Oracle and features both a "thin" client +mode that requires no dependencies, as well as a "thick" mode that uses +the Oracle Client Interface in the same way as cx_Oracle. + +.. seealso:: + + :ref:`cx_oracle` - all of cx_Oracle's notes apply to the oracledb driver + as well. + +The SQLAlchemy ``oracledb`` dialect provides both a sync and an async +implementation under the same dialect name. The proper version is +selected depending on how the engine is created: + +* calling :func:`_sa.create_engine` with ``oracle+oracledb://...`` will + automatically select the sync version, e.g.:: + + from sqlalchemy import create_engine + sync_engine = create_engine("oracle+oracledb://scott:tiger@localhost/?service_name=XEPDB1") + +* calling :func:`_asyncio.create_async_engine` with + ``oracle+oracledb://...`` will automatically select the async version, + e.g.:: + + from sqlalchemy.ext.asyncio import create_async_engine + asyncio_engine = create_async_engine("oracle+oracledb://scott:tiger@localhost/?service_name=XEPDB1") + +The asyncio version of the dialect may also be specified explicitly using the +``oracledb_async`` suffix, as:: + + from sqlalchemy.ext.asyncio import create_async_engine + asyncio_engine = create_async_engine("oracle+oracledb_async://scott:tiger@localhost/?service_name=XEPDB1") + +.. versionadded:: 2.0.25 added support for the async version of oracledb. + +Thick mode support +------------------ + +By default the ``python-oracledb`` is started in thin mode, that does not +require oracle client libraries to be installed in the system. The +``python-oracledb`` driver also support a "thick" mode, that behaves +similarly to ``cx_oracle`` and requires that Oracle Client Interface (OCI) +is installed. + +To enable this mode, the user may call ``oracledb.init_oracle_client`` +manually, or by passing the parameter ``thick_mode=True`` to +:func:`_sa.create_engine`. To pass custom arguments to ``init_oracle_client``, +like the ``lib_dir`` path, a dict may be passed to this parameter, as in:: + + engine = sa.create_engine("oracle+oracledb://...", thick_mode={ + "lib_dir": "/path/to/oracle/client/lib", "driver_name": "my-app" + }) + +.. seealso:: + + https://python-oracledb.readthedocs.io/en/latest/api_manual/module.html#oracledb.init_oracle_client + + +.. versionadded:: 2.0.0 added support for oracledb driver. + +""" # noqa +from __future__ import annotations + +import collections +import re +from typing import Any +from typing import TYPE_CHECKING + +from .cx_oracle import OracleDialect_cx_oracle as _OracleDialect_cx_oracle +from ... import exc +from ... import pool +from ...connectors.asyncio import AsyncAdapt_dbapi_connection +from ...connectors.asyncio import AsyncAdapt_dbapi_cursor +from ...connectors.asyncio import AsyncAdaptFallback_dbapi_connection +from ...util import asbool +from ...util import await_fallback +from ...util import await_only + +if TYPE_CHECKING: + from oracledb import AsyncConnection + from oracledb import AsyncCursor + + +class OracleDialect_oracledb(_OracleDialect_cx_oracle): + supports_statement_cache = True + driver = "oracledb" + _min_version = (1,) + + def __init__( + self, + auto_convert_lobs=True, + coerce_to_decimal=True, + arraysize=None, + encoding_errors=None, + thick_mode=None, + **kwargs, + ): + super().__init__( + auto_convert_lobs, + coerce_to_decimal, + arraysize, + encoding_errors, + **kwargs, + ) + + if self.dbapi is not None and ( + thick_mode or isinstance(thick_mode, dict) + ): + kw = thick_mode if isinstance(thick_mode, dict) else {} + self.dbapi.init_oracle_client(**kw) + + @classmethod + def import_dbapi(cls): + import oracledb + + return oracledb + + @classmethod + def is_thin_mode(cls, connection): + return connection.connection.dbapi_connection.thin + + @classmethod + def get_async_dialect_cls(cls, url): + return OracleDialectAsync_oracledb + + def _load_version(self, dbapi_module): + version = (0, 0, 0) + if dbapi_module is not None: + m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", dbapi_module.version) + if m: + version = tuple( + int(x) for x in m.group(1, 2, 3) if x is not None + ) + self.oracledb_ver = version + if ( + self.oracledb_ver > (0, 0, 0) + and self.oracledb_ver < self._min_version + ): + raise exc.InvalidRequestError( + f"oracledb version {self._min_version} and above are supported" + ) + + +class AsyncAdapt_oracledb_cursor(AsyncAdapt_dbapi_cursor): + _cursor: AsyncCursor + __slots__ = () + + @property + def outputtypehandler(self): + return self._cursor.outputtypehandler + + @outputtypehandler.setter + def outputtypehandler(self, value): + self._cursor.outputtypehandler = value + + def var(self, *args, **kwargs): + return self._cursor.var(*args, **kwargs) + + def close(self): + self._rows.clear() + self._cursor.close() + + def setinputsizes(self, *args: Any, **kwargs: Any) -> Any: + return self._cursor.setinputsizes(*args, **kwargs) + + def _aenter_cursor(self, cursor: AsyncCursor) -> AsyncCursor: + try: + return cursor.__enter__() + except Exception as error: + self._adapt_connection._handle_exception(error) + + async def _execute_async(self, operation, parameters): + # override to not use mutex, oracledb already has mutex + + if parameters is None: + result = await self._cursor.execute(operation) + else: + result = await self._cursor.execute(operation, parameters) + + if self._cursor.description and not self.server_side: + self._rows = collections.deque(await self._cursor.fetchall()) + return result + + async def _executemany_async( + self, + operation, + seq_of_parameters, + ): + # override to not use mutex, oracledb already has mutex + return await self._cursor.executemany(operation, seq_of_parameters) + + def __enter__(self): + return self + + def __exit__(self, type_: Any, value: Any, traceback: Any) -> None: + self.close() + + +class AsyncAdapt_oracledb_connection(AsyncAdapt_dbapi_connection): + _connection: AsyncConnection + __slots__ = () + + thin = True + + _cursor_cls = AsyncAdapt_oracledb_cursor + _ss_cursor_cls = None + + @property + def autocommit(self): + return self._connection.autocommit + + @autocommit.setter + def autocommit(self, value): + self._connection.autocommit = value + + @property + def outputtypehandler(self): + return self._connection.outputtypehandler + + @outputtypehandler.setter + def outputtypehandler(self, value): + self._connection.outputtypehandler = value + + @property + def version(self): + return self._connection.version + + @property + def stmtcachesize(self): + return self._connection.stmtcachesize + + @stmtcachesize.setter + def stmtcachesize(self, value): + self._connection.stmtcachesize = value + + def cursor(self): + return AsyncAdapt_oracledb_cursor(self) + + +class AsyncAdaptFallback_oracledb_connection( + AsyncAdaptFallback_dbapi_connection, AsyncAdapt_oracledb_connection +): + __slots__ = () + + +class OracledbAdaptDBAPI: + def __init__(self, oracledb) -> None: + self.oracledb = oracledb + + for k, v in self.oracledb.__dict__.items(): + if k != "connect": + self.__dict__[k] = v + + def connect(self, *arg, **kw): + async_fallback = kw.pop("async_fallback", False) + creator_fn = kw.pop("async_creator_fn", self.oracledb.connect_async) + + if asbool(async_fallback): + return AsyncAdaptFallback_oracledb_connection( + self, await_fallback(creator_fn(*arg, **kw)) + ) + + else: + return AsyncAdapt_oracledb_connection( + self, await_only(creator_fn(*arg, **kw)) + ) + + +class OracleDialectAsync_oracledb(OracleDialect_oracledb): + is_async = True + supports_statement_cache = True + + _min_version = (2,) + + # thick_mode mode is not supported by asyncio, oracledb will raise + @classmethod + def import_dbapi(cls): + import oracledb + + return OracledbAdaptDBAPI(oracledb) + + @classmethod + def get_pool_class(cls, url): + async_fallback = url.query.get("async_fallback", False) + + if asbool(async_fallback): + return pool.FallbackAsyncAdaptedQueuePool + else: + return pool.AsyncAdaptedQueuePool + + def get_driver_connection(self, connection): + return connection._connection + + +dialect = OracleDialect_oracledb +dialect_async = OracleDialectAsync_oracledb diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/provision.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/provision.py new file mode 100644 index 0000000..b33c152 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/provision.py @@ -0,0 +1,220 @@ +# dialects/oracle/provision.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + +from ... import create_engine +from ... import exc +from ... import inspect +from ...engine import url as sa_url +from ...testing.provision import configure_follower +from ...testing.provision import create_db +from ...testing.provision import drop_all_schema_objects_post_tables +from ...testing.provision import drop_all_schema_objects_pre_tables +from ...testing.provision import drop_db +from ...testing.provision import follower_url_from_main +from ...testing.provision import log +from ...testing.provision import post_configure_engine +from ...testing.provision import run_reap_dbs +from ...testing.provision import set_default_schema_on_connection +from ...testing.provision import stop_test_class_outside_fixtures +from ...testing.provision import temp_table_keyword_args +from ...testing.provision import update_db_opts + + +@create_db.for_db("oracle") +def _oracle_create_db(cfg, eng, ident): + # NOTE: make sure you've run "ALTER DATABASE default tablespace users" or + # similar, so that the default tablespace is not "system"; reflection will + # fail otherwise + with eng.begin() as conn: + conn.exec_driver_sql("create user %s identified by xe" % ident) + conn.exec_driver_sql("create user %s_ts1 identified by xe" % ident) + conn.exec_driver_sql("create user %s_ts2 identified by xe" % ident) + conn.exec_driver_sql("grant dba to %s" % (ident,)) + conn.exec_driver_sql("grant unlimited tablespace to %s" % ident) + conn.exec_driver_sql("grant unlimited tablespace to %s_ts1" % ident) + conn.exec_driver_sql("grant unlimited tablespace to %s_ts2" % ident) + # these are needed to create materialized views + conn.exec_driver_sql("grant create table to %s" % ident) + conn.exec_driver_sql("grant create table to %s_ts1" % ident) + conn.exec_driver_sql("grant create table to %s_ts2" % ident) + + +@configure_follower.for_db("oracle") +def _oracle_configure_follower(config, ident): + config.test_schema = "%s_ts1" % ident + config.test_schema_2 = "%s_ts2" % ident + + +def _ora_drop_ignore(conn, dbname): + try: + conn.exec_driver_sql("drop user %s cascade" % dbname) + log.info("Reaped db: %s", dbname) + return True + except exc.DatabaseError as err: + log.warning("couldn't drop db: %s", err) + return False + + +@drop_all_schema_objects_pre_tables.for_db("oracle") +def _ora_drop_all_schema_objects_pre_tables(cfg, eng): + _purge_recyclebin(eng) + _purge_recyclebin(eng, cfg.test_schema) + + +@drop_all_schema_objects_post_tables.for_db("oracle") +def _ora_drop_all_schema_objects_post_tables(cfg, eng): + with eng.begin() as conn: + for syn in conn.dialect._get_synonyms(conn, None, None, None): + conn.exec_driver_sql(f"drop synonym {syn['synonym_name']}") + + for syn in conn.dialect._get_synonyms( + conn, cfg.test_schema, None, None + ): + conn.exec_driver_sql( + f"drop synonym {cfg.test_schema}.{syn['synonym_name']}" + ) + + for tmp_table in inspect(conn).get_temp_table_names(): + conn.exec_driver_sql(f"drop table {tmp_table}") + + +@drop_db.for_db("oracle") +def _oracle_drop_db(cfg, eng, ident): + with eng.begin() as conn: + # cx_Oracle seems to occasionally leak open connections when a large + # suite it run, even if we confirm we have zero references to + # connection objects. + # while there is a "kill session" command in Oracle, + # it unfortunately does not release the connection sufficiently. + _ora_drop_ignore(conn, ident) + _ora_drop_ignore(conn, "%s_ts1" % ident) + _ora_drop_ignore(conn, "%s_ts2" % ident) + + +@stop_test_class_outside_fixtures.for_db("oracle") +def _ora_stop_test_class_outside_fixtures(config, db, cls): + try: + _purge_recyclebin(db) + except exc.DatabaseError as err: + log.warning("purge recyclebin command failed: %s", err) + + # clear statement cache on all connections that were used + # https://github.com/oracle/python-cx_Oracle/issues/519 + + for cx_oracle_conn in _all_conns: + try: + sc = cx_oracle_conn.stmtcachesize + except db.dialect.dbapi.InterfaceError: + # connection closed + pass + else: + cx_oracle_conn.stmtcachesize = 0 + cx_oracle_conn.stmtcachesize = sc + _all_conns.clear() + + +def _purge_recyclebin(eng, schema=None): + with eng.begin() as conn: + if schema is None: + # run magic command to get rid of identity sequences + # https://floo.bar/2019/11/29/drop-the-underlying-sequence-of-an-identity-column/ # noqa: E501 + conn.exec_driver_sql("purge recyclebin") + else: + # per user: https://community.oracle.com/tech/developers/discussion/2255402/how-to-clear-dba-recyclebin-for-a-particular-user # noqa: E501 + for owner, object_name, type_ in conn.exec_driver_sql( + "select owner, object_name,type from " + "dba_recyclebin where owner=:schema and type='TABLE'", + {"schema": conn.dialect.denormalize_name(schema)}, + ).all(): + conn.exec_driver_sql(f'purge {type_} {owner}."{object_name}"') + + +_all_conns = set() + + +@post_configure_engine.for_db("oracle") +def _oracle_post_configure_engine(url, engine, follower_ident): + from sqlalchemy import event + + @event.listens_for(engine, "checkout") + def checkout(dbapi_con, con_record, con_proxy): + _all_conns.add(dbapi_con) + + @event.listens_for(engine, "checkin") + def checkin(dbapi_connection, connection_record): + # work around cx_Oracle issue: + # https://github.com/oracle/python-cx_Oracle/issues/530 + # invalidate oracle connections that had 2pc set up + if "cx_oracle_xid" in connection_record.info: + connection_record.invalidate() + + +@run_reap_dbs.for_db("oracle") +def _reap_oracle_dbs(url, idents): + log.info("db reaper connecting to %r", url) + eng = create_engine(url) + with eng.begin() as conn: + log.info("identifiers in file: %s", ", ".join(idents)) + + to_reap = conn.exec_driver_sql( + "select u.username from all_users u where username " + "like 'TEST_%' and not exists (select username " + "from v$session where username=u.username)" + ) + all_names = {username.lower() for (username,) in to_reap} + to_drop = set() + for name in all_names: + if name.endswith("_ts1") or name.endswith("_ts2"): + continue + elif name in idents: + to_drop.add(name) + if "%s_ts1" % name in all_names: + to_drop.add("%s_ts1" % name) + if "%s_ts2" % name in all_names: + to_drop.add("%s_ts2" % name) + + dropped = total = 0 + for total, username in enumerate(to_drop, 1): + if _ora_drop_ignore(conn, username): + dropped += 1 + log.info( + "Dropped %d out of %d stale databases detected", dropped, total + ) + + +@follower_url_from_main.for_db("oracle") +def _oracle_follower_url_from_main(url, ident): + url = sa_url.make_url(url) + return url.set(username=ident, password="xe") + + +@temp_table_keyword_args.for_db("oracle") +def _oracle_temp_table_keyword_args(cfg, eng): + return { + "prefixes": ["GLOBAL TEMPORARY"], + "oracle_on_commit": "PRESERVE ROWS", + } + + +@set_default_schema_on_connection.for_db("oracle") +def _oracle_set_default_schema_on_connection( + cfg, dbapi_connection, schema_name +): + cursor = dbapi_connection.cursor() + cursor.execute("ALTER SESSION SET CURRENT_SCHEMA=%s" % schema_name) + cursor.close() + + +@update_db_opts.for_db("oracle") +def _update_db_opts(db_url, db_opts, options): + """Set database options (db_opts) for a test database that we created.""" + if ( + options.oracledb_thick_mode + and sa_url.make_url(db_url).get_driver_name() == "oracledb" + ): + db_opts["thick_mode"] = True diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/types.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/types.py new file mode 100644 index 0000000..36caaa0 --- /dev/null +++ b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/types.py @@ -0,0 +1,287 @@ +# dialects/oracle/types.py +# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors +from __future__ import annotations + +import datetime as dt +from typing import Optional +from typing import Type +from typing import TYPE_CHECKING + +from ... import exc +from ...sql import sqltypes +from ...types import NVARCHAR +from ...types import VARCHAR + +if TYPE_CHECKING: + from ...engine.interfaces import Dialect + from ...sql.type_api import _LiteralProcessorType + + +class RAW(sqltypes._Binary): + __visit_name__ = "RAW" + + +OracleRaw = RAW + + +class NCLOB(sqltypes.Text): + __visit_name__ = "NCLOB" + + +class VARCHAR2(VARCHAR): + __visit_name__ = "VARCHAR2" + + +NVARCHAR2 = NVARCHAR + + +class NUMBER(sqltypes.Numeric, sqltypes.Integer): + __visit_name__ = "NUMBER" + + def __init__(self, precision=None, scale=None, asdecimal=None): + if asdecimal is None: + asdecimal = bool(scale and scale > 0) + + super().__init__(precision=precision, scale=scale, asdecimal=asdecimal) + + def adapt(self, impltype): + ret = super().adapt(impltype) + # leave a hint for the DBAPI handler + ret._is_oracle_number = True + return ret + + @property + def _type_affinity(self): + if bool(self.scale and self.scale > 0): + return sqltypes.Numeric + else: + return sqltypes.Integer + + +class FLOAT(sqltypes.FLOAT): + """Oracle FLOAT. + + This is the same as :class:`_sqltypes.FLOAT` except that + an Oracle-specific :paramref:`_oracle.FLOAT.binary_precision` + parameter is accepted, and + the :paramref:`_sqltypes.Float.precision` parameter is not accepted. + + Oracle FLOAT types indicate precision in terms of "binary precision", which + defaults to 126. For a REAL type, the value is 63. This parameter does not + cleanly map to a specific number of decimal places but is roughly + equivalent to the desired number of decimal places divided by 0.3103. + + .. versionadded:: 2.0 + + """ + + __visit_name__ = "FLOAT" + + def __init__( + self, + binary_precision=None, + asdecimal=False, + decimal_return_scale=None, + ): + r""" + Construct a FLOAT + + :param binary_precision: Oracle binary precision value to be rendered + in DDL. This may be approximated to the number of decimal characters + using the formula "decimal precision = 0.30103 * binary precision". + The default value used by Oracle for FLOAT / DOUBLE PRECISION is 126. + + :param asdecimal: See :paramref:`_sqltypes.Float.asdecimal` + + :param decimal_return_scale: See + :paramref:`_sqltypes.Float.decimal_return_scale` + + """ + super().__init__( + asdecimal=asdecimal, decimal_return_scale=decimal_return_scale + ) + self.binary_precision = binary_precision + + +class BINARY_DOUBLE(sqltypes.Double): + __visit_name__ = "BINARY_DOUBLE" + + +class BINARY_FLOAT(sqltypes.Float): + __visit_name__ = "BINARY_FLOAT" + + +class BFILE(sqltypes.LargeBinary): + __visit_name__ = "BFILE" + + +class LONG(sqltypes.Text): + __visit_name__ = "LONG" + + +class _OracleDateLiteralRender: + def _literal_processor_datetime(self, dialect): + def process(value): + if getattr(value, "microsecond", None): + value = ( + f"""TO_TIMESTAMP""" + f"""('{value.isoformat().replace("T", " ")}', """ + """'YYYY-MM-DD HH24:MI:SS.FF')""" + ) + else: + value = ( + f"""TO_DATE""" + f"""('{value.isoformat().replace("T", " ")}', """ + """'YYYY-MM-DD HH24:MI:SS')""" + ) + return value + + return process + + def _literal_processor_date(self, dialect): + def process(value): + if getattr(value, "microsecond", None): + value = ( + f"""TO_TIMESTAMP""" + f"""('{value.isoformat().split("T")[0]}', """ + """'YYYY-MM-DD')""" + ) + else: + value = ( + f"""TO_DATE""" + f"""('{value.isoformat().split("T")[0]}', """ + """'YYYY-MM-DD')""" + ) + return value + + return process + + +class DATE(_OracleDateLiteralRender, sqltypes.DateTime): + """Provide the oracle DATE type. + + This type has no special Python behavior, except that it subclasses + :class:`_types.DateTime`; this is to suit the fact that the Oracle + ``DATE`` type supports a time value. + + """ + + __visit_name__ = "DATE" + + def literal_processor(self, dialect): + return self._literal_processor_datetime(dialect) + + def _compare_type_affinity(self, other): + return other._type_affinity in (sqltypes.DateTime, sqltypes.Date) + + +class _OracleDate(_OracleDateLiteralRender, sqltypes.Date): + def literal_processor(self, dialect): + return self._literal_processor_date(dialect) + + +class INTERVAL(sqltypes.NativeForEmulated, sqltypes._AbstractInterval): + __visit_name__ = "INTERVAL" + + def __init__(self, day_precision=None, second_precision=None): + """Construct an INTERVAL. + + Note that only DAY TO SECOND intervals are currently supported. + This is due to a lack of support for YEAR TO MONTH intervals + within available DBAPIs. + + :param day_precision: the day precision value. this is the number of + digits to store for the day field. Defaults to "2" + :param second_precision: the second precision value. this is the + number of digits to store for the fractional seconds field. + Defaults to "6". + + """ + self.day_precision = day_precision + self.second_precision = second_precision + + @classmethod + def _adapt_from_generic_interval(cls, interval): + return INTERVAL( + day_precision=interval.day_precision, + second_precision=interval.second_precision, + ) + + @classmethod + def adapt_emulated_to_native( + cls, interval: sqltypes.Interval, **kw # type: ignore[override] + ): + return INTERVAL( + day_precision=interval.day_precision, + second_precision=interval.second_precision, + ) + + @property + def _type_affinity(self): + return sqltypes.Interval + + def as_generic(self, allow_nulltype=False): + return sqltypes.Interval( + native=True, + second_precision=self.second_precision, + day_precision=self.day_precision, + ) + + @property + def python_type(self) -> Type[dt.timedelta]: + return dt.timedelta + + def literal_processor( + self, dialect: Dialect + ) -> Optional[_LiteralProcessorType[dt.timedelta]]: + def process(value: dt.timedelta) -> str: + return f"NUMTODSINTERVAL({value.total_seconds()}, 'SECOND')" + + return process + + +class TIMESTAMP(sqltypes.TIMESTAMP): + """Oracle implementation of ``TIMESTAMP``, which supports additional + Oracle-specific modes + + .. versionadded:: 2.0 + + """ + + def __init__(self, timezone: bool = False, local_timezone: bool = False): + """Construct a new :class:`_oracle.TIMESTAMP`. + + :param timezone: boolean. Indicates that the TIMESTAMP type should + use Oracle's ``TIMESTAMP WITH TIME ZONE`` datatype. + + :param local_timezone: boolean. Indicates that the TIMESTAMP type + should use Oracle's ``TIMESTAMP WITH LOCAL TIME ZONE`` datatype. + + + """ + if timezone and local_timezone: + raise exc.ArgumentError( + "timezone and local_timezone are mutually exclusive" + ) + super().__init__(timezone=timezone) + self.local_timezone = local_timezone + + +class ROWID(sqltypes.TypeEngine): + """Oracle ROWID type. + + When used in a cast() or similar, generates ROWID. + + """ + + __visit_name__ = "ROWID" + + +class _OracleBoolean(sqltypes.Boolean): + def get_dbapi_type(self, dbapi): + return dbapi.NUMBER |