summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__init__.py67
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/__init__.cpython-311.pycbin1731 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/base.cpython-311.pycbin136700 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/cx_oracle.cpython-311.pycbin62959 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/dictionary.cpython-311.pycbin32392 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/oracledb.cpython-311.pycbin15311 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/provision.cpython-311.pycbin12688 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/types.cpython-311.pycbin13844 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py3240
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/cx_oracle.py1492
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/dictionary.py507
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/oracledb.py311
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/provision.py220
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/types.py287
14 files changed, 0 insertions, 6124 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__init__.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__init__.py
deleted file mode 100644
index d855122..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__init__.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# dialects/oracle/__init__.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-from types import ModuleType
-
-from . import base # noqa
-from . import cx_oracle # noqa
-from . import oracledb # noqa
-from .base import BFILE
-from .base import BINARY_DOUBLE
-from .base import BINARY_FLOAT
-from .base import BLOB
-from .base import CHAR
-from .base import CLOB
-from .base import DATE
-from .base import DOUBLE_PRECISION
-from .base import FLOAT
-from .base import INTERVAL
-from .base import LONG
-from .base import NCHAR
-from .base import NCLOB
-from .base import NUMBER
-from .base import NVARCHAR
-from .base import NVARCHAR2
-from .base import RAW
-from .base import REAL
-from .base import ROWID
-from .base import TIMESTAMP
-from .base import VARCHAR
-from .base import VARCHAR2
-
-# Alias oracledb also as oracledb_async
-oracledb_async = type(
- "oracledb_async", (ModuleType,), {"dialect": oracledb.dialect_async}
-)
-
-base.dialect = dialect = cx_oracle.dialect
-
-__all__ = (
- "VARCHAR",
- "NVARCHAR",
- "CHAR",
- "NCHAR",
- "DATE",
- "NUMBER",
- "BLOB",
- "BFILE",
- "CLOB",
- "NCLOB",
- "TIMESTAMP",
- "RAW",
- "FLOAT",
- "DOUBLE_PRECISION",
- "BINARY_DOUBLE",
- "BINARY_FLOAT",
- "LONG",
- "dialect",
- "INTERVAL",
- "VARCHAR2",
- "NVARCHAR2",
- "ROWID",
- "REAL",
-)
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/__init__.cpython-311.pyc
deleted file mode 100644
index cc02ead..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/__init__.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/base.cpython-311.pyc
deleted file mode 100644
index bf594ef..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/base.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/cx_oracle.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/cx_oracle.cpython-311.pyc
deleted file mode 100644
index 9e8e947..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/cx_oracle.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/dictionary.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/dictionary.cpython-311.pyc
deleted file mode 100644
index 89ce69c..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/dictionary.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/oracledb.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/oracledb.cpython-311.pyc
deleted file mode 100644
index 9325524..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/oracledb.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/provision.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/provision.cpython-311.pyc
deleted file mode 100644
index 6d3c52d..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/provision.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/types.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/types.cpython-311.pyc
deleted file mode 100644
index 24bfa8d..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/types.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py
deleted file mode 100644
index a548b34..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py
+++ /dev/null
@@ -1,3240 +0,0 @@
-# dialects/oracle/base.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-
-r"""
-.. dialect:: oracle
- :name: Oracle
- :full_support: 18c
- :normal_support: 11+
- :best_effort: 9+
-
-
-Auto Increment Behavior
------------------------
-
-SQLAlchemy Table objects which include integer primary keys are usually
-assumed to have "autoincrementing" behavior, meaning they can generate their
-own primary key values upon INSERT. For use within Oracle, two options are
-available, which are the use of IDENTITY columns (Oracle 12 and above only)
-or the association of a SEQUENCE with the column.
-
-Specifying GENERATED AS IDENTITY (Oracle 12 and above)
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Starting from version 12 Oracle can make use of identity columns using
-the :class:`_sql.Identity` to specify the autoincrementing behavior::
-
- t = Table('mytable', metadata,
- Column('id', Integer, Identity(start=3), primary_key=True),
- Column(...), ...
- )
-
-The CREATE TABLE for the above :class:`_schema.Table` object would be:
-
-.. sourcecode:: sql
-
- CREATE TABLE mytable (
- id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 3),
- ...,
- PRIMARY KEY (id)
- )
-
-The :class:`_schema.Identity` object support many options to control the
-"autoincrementing" behavior of the column, like the starting value, the
-incrementing value, etc.
-In addition to the standard options, Oracle supports setting
-:paramref:`_schema.Identity.always` to ``None`` to use the default
-generated mode, rendering GENERATED AS IDENTITY in the DDL. It also supports
-setting :paramref:`_schema.Identity.on_null` to ``True`` to specify ON NULL
-in conjunction with a 'BY DEFAULT' identity column.
-
-Using a SEQUENCE (all Oracle versions)
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Older version of Oracle had no "autoincrement"
-feature, SQLAlchemy relies upon sequences to produce these values. With the
-older Oracle versions, *a sequence must always be explicitly specified to
-enable autoincrement*. This is divergent with the majority of documentation
-examples which assume the usage of an autoincrement-capable database. To
-specify sequences, use the sqlalchemy.schema.Sequence object which is passed
-to a Column construct::
-
- t = Table('mytable', metadata,
- Column('id', Integer, Sequence('id_seq', start=1), primary_key=True),
- Column(...), ...
- )
-
-This step is also required when using table reflection, i.e. autoload_with=engine::
-
- t = Table('mytable', metadata,
- Column('id', Integer, Sequence('id_seq', start=1), primary_key=True),
- autoload_with=engine
- )
-
-.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
- in a :class:`_schema.Column` to specify the option of an autoincrementing
- column.
-
-.. _oracle_isolation_level:
-
-Transaction Isolation Level / Autocommit
-----------------------------------------
-
-The Oracle database supports "READ COMMITTED" and "SERIALIZABLE" modes of
-isolation. The AUTOCOMMIT isolation level is also supported by the cx_Oracle
-dialect.
-
-To set using per-connection execution options::
-
- connection = engine.connect()
- connection = connection.execution_options(
- isolation_level="AUTOCOMMIT"
- )
-
-For ``READ COMMITTED`` and ``SERIALIZABLE``, the Oracle dialect sets the
-level at the session level using ``ALTER SESSION``, which is reverted back
-to its default setting when the connection is returned to the connection
-pool.
-
-Valid values for ``isolation_level`` include:
-
-* ``READ COMMITTED``
-* ``AUTOCOMMIT``
-* ``SERIALIZABLE``
-
-.. note:: The implementation for the
- :meth:`_engine.Connection.get_isolation_level` method as implemented by the
- Oracle dialect necessarily forces the start of a transaction using the
- Oracle LOCAL_TRANSACTION_ID function; otherwise no level is normally
- readable.
-
- Additionally, the :meth:`_engine.Connection.get_isolation_level` method will
- raise an exception if the ``v$transaction`` view is not available due to
- permissions or other reasons, which is a common occurrence in Oracle
- installations.
-
- The cx_Oracle dialect attempts to call the
- :meth:`_engine.Connection.get_isolation_level` method when the dialect makes
- its first connection to the database in order to acquire the
- "default"isolation level. This default level is necessary so that the level
- can be reset on a connection after it has been temporarily modified using
- :meth:`_engine.Connection.execution_options` method. In the common event
- that the :meth:`_engine.Connection.get_isolation_level` method raises an
- exception due to ``v$transaction`` not being readable as well as any other
- database-related failure, the level is assumed to be "READ COMMITTED". No
- warning is emitted for this initial first-connect condition as it is
- expected to be a common restriction on Oracle databases.
-
-.. versionadded:: 1.3.16 added support for AUTOCOMMIT to the cx_oracle dialect
- as well as the notion of a default isolation level
-
-.. versionadded:: 1.3.21 Added support for SERIALIZABLE as well as live
- reading of the isolation level.
-
-.. versionchanged:: 1.3.22 In the event that the default isolation
- level cannot be read due to permissions on the v$transaction view as
- is common in Oracle installations, the default isolation level is hardcoded
- to "READ COMMITTED" which was the behavior prior to 1.3.21.
-
-.. seealso::
-
- :ref:`dbapi_autocommit`
-
-Identifier Casing
------------------
-
-In Oracle, the data dictionary represents all case insensitive identifier
-names using UPPERCASE text. SQLAlchemy on the other hand considers an
-all-lower case identifier name to be case insensitive. The Oracle dialect
-converts all case insensitive identifiers to and from those two formats during
-schema level communication, such as reflection of tables and indexes. Using
-an UPPERCASE name on the SQLAlchemy side indicates a case sensitive
-identifier, and SQLAlchemy will quote the name - this will cause mismatches
-against data dictionary data received from Oracle, so unless identifier names
-have been truly created as case sensitive (i.e. using quoted names), all
-lowercase names should be used on the SQLAlchemy side.
-
-.. _oracle_max_identifier_lengths:
-
-Max Identifier Lengths
-----------------------
-
-Oracle has changed the default max identifier length as of Oracle Server
-version 12.2. Prior to this version, the length was 30, and for 12.2 and
-greater it is now 128. This change impacts SQLAlchemy in the area of
-generated SQL label names as well as the generation of constraint names,
-particularly in the case where the constraint naming convention feature
-described at :ref:`constraint_naming_conventions` is being used.
-
-To assist with this change and others, Oracle includes the concept of a
-"compatibility" version, which is a version number that is independent of the
-actual server version in order to assist with migration of Oracle databases,
-and may be configured within the Oracle server itself. This compatibility
-version is retrieved using the query ``SELECT value FROM v$parameter WHERE
-name = 'compatible';``. The SQLAlchemy Oracle dialect, when tasked with
-determining the default max identifier length, will attempt to use this query
-upon first connect in order to determine the effective compatibility version of
-the server, which determines what the maximum allowed identifier length is for
-the server. If the table is not available, the server version information is
-used instead.
-
-As of SQLAlchemy 1.4, the default max identifier length for the Oracle dialect
-is 128 characters. Upon first connect, the compatibility version is detected
-and if it is less than Oracle version 12.2, the max identifier length is
-changed to be 30 characters. In all cases, setting the
-:paramref:`_sa.create_engine.max_identifier_length` parameter will bypass this
-change and the value given will be used as is::
-
- engine = create_engine(
- "oracle+cx_oracle://scott:tiger@oracle122",
- max_identifier_length=30)
-
-The maximum identifier length comes into play both when generating anonymized
-SQL labels in SELECT statements, but more crucially when generating constraint
-names from a naming convention. It is this area that has created the need for
-SQLAlchemy to change this default conservatively. For example, the following
-naming convention produces two very different constraint names based on the
-identifier length::
-
- from sqlalchemy import Column
- from sqlalchemy import Index
- from sqlalchemy import Integer
- from sqlalchemy import MetaData
- from sqlalchemy import Table
- from sqlalchemy.dialects import oracle
- from sqlalchemy.schema import CreateIndex
-
- m = MetaData(naming_convention={"ix": "ix_%(column_0N_name)s"})
-
- t = Table(
- "t",
- m,
- Column("some_column_name_1", Integer),
- Column("some_column_name_2", Integer),
- Column("some_column_name_3", Integer),
- )
-
- ix = Index(
- None,
- t.c.some_column_name_1,
- t.c.some_column_name_2,
- t.c.some_column_name_3,
- )
-
- oracle_dialect = oracle.dialect(max_identifier_length=30)
- print(CreateIndex(ix).compile(dialect=oracle_dialect))
-
-With an identifier length of 30, the above CREATE INDEX looks like::
-
- CREATE INDEX ix_some_column_name_1s_70cd ON t
- (some_column_name_1, some_column_name_2, some_column_name_3)
-
-However with length=128, it becomes::
-
- CREATE INDEX ix_some_column_name_1some_column_name_2some_column_name_3 ON t
- (some_column_name_1, some_column_name_2, some_column_name_3)
-
-Applications which have run versions of SQLAlchemy prior to 1.4 on an Oracle
-server version 12.2 or greater are therefore subject to the scenario of a
-database migration that wishes to "DROP CONSTRAINT" on a name that was
-previously generated with the shorter length. This migration will fail when
-the identifier length is changed without the name of the index or constraint
-first being adjusted. Such applications are strongly advised to make use of
-:paramref:`_sa.create_engine.max_identifier_length`
-in order to maintain control
-of the generation of truncated names, and to fully review and test all database
-migrations in a staging environment when changing this value to ensure that the
-impact of this change has been mitigated.
-
-.. versionchanged:: 1.4 the default max_identifier_length for Oracle is 128
- characters, which is adjusted down to 30 upon first connect if an older
- version of Oracle server (compatibility version < 12.2) is detected.
-
-
-LIMIT/OFFSET/FETCH Support
---------------------------
-
-Methods like :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` make
-use of ``FETCH FIRST N ROW / OFFSET N ROWS`` syntax assuming
-Oracle 12c or above, and assuming the SELECT statement is not embedded within
-a compound statement like UNION. This syntax is also available directly by using
-the :meth:`_sql.Select.fetch` method.
-
-.. versionchanged:: 2.0 the Oracle dialect now uses
- ``FETCH FIRST N ROW / OFFSET N ROWS`` for all
- :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` usage including
- within the ORM and legacy :class:`_orm.Query`. To force the legacy
- behavior using window functions, specify the ``enable_offset_fetch=False``
- dialect parameter to :func:`_sa.create_engine`.
-
-The use of ``FETCH FIRST / OFFSET`` may be disabled on any Oracle version
-by passing ``enable_offset_fetch=False`` to :func:`_sa.create_engine`, which
-will force the use of "legacy" mode that makes use of window functions.
-This mode is also selected automatically when using a version of Oracle
-prior to 12c.
-
-When using legacy mode, or when a :class:`.Select` statement
-with limit/offset is embedded in a compound statement, an emulated approach for
-LIMIT / OFFSET based on window functions is used, which involves creation of a
-subquery using ``ROW_NUMBER`` that is prone to performance issues as well as
-SQL construction issues for complex statements. However, this approach is
-supported by all Oracle versions. See notes below.
-
-Notes on LIMIT / OFFSET emulation (when fetch() method cannot be used)
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-If using :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset`, or with the
-ORM the :meth:`_orm.Query.limit` and :meth:`_orm.Query.offset` methods on an
-Oracle version prior to 12c, the following notes apply:
-
-* SQLAlchemy currently makes use of ROWNUM to achieve
- LIMIT/OFFSET; the exact methodology is taken from
- https://blogs.oracle.com/oraclemagazine/on-rownum-and-limiting-results .
-
-* the "FIRST_ROWS()" optimization keyword is not used by default. To enable
- the usage of this optimization directive, specify ``optimize_limits=True``
- to :func:`_sa.create_engine`.
-
- .. versionchanged:: 1.4
- The Oracle dialect renders limit/offset integer values using a "post
- compile" scheme which renders the integer directly before passing the
- statement to the cursor for execution. The ``use_binds_for_limits`` flag
- no longer has an effect.
-
- .. seealso::
-
- :ref:`change_4808`.
-
-.. _oracle_returning:
-
-RETURNING Support
------------------
-
-The Oracle database supports RETURNING fully for INSERT, UPDATE and DELETE
-statements that are invoked with a single collection of bound parameters
-(that is, a ``cursor.execute()`` style statement; SQLAlchemy does not generally
-support RETURNING with :term:`executemany` statements). Multiple rows may be
-returned as well.
-
-.. versionchanged:: 2.0 the Oracle backend has full support for RETURNING
- on parity with other backends.
-
-
-
-ON UPDATE CASCADE
------------------
-
-Oracle doesn't have native ON UPDATE CASCADE functionality. A trigger based
-solution is available at
-https://asktom.oracle.com/tkyte/update_cascade/index.html .
-
-When using the SQLAlchemy ORM, the ORM has limited ability to manually issue
-cascading updates - specify ForeignKey objects using the
-"deferrable=True, initially='deferred'" keyword arguments,
-and specify "passive_updates=False" on each relationship().
-
-Oracle 8 Compatibility
-----------------------
-
-.. warning:: The status of Oracle 8 compatibility is not known for SQLAlchemy
- 2.0.
-
-When Oracle 8 is detected, the dialect internally configures itself to the
-following behaviors:
-
-* the use_ansi flag is set to False. This has the effect of converting all
- JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN
- makes use of Oracle's (+) operator.
-
-* the NVARCHAR2 and NCLOB datatypes are no longer generated as DDL when
- the :class:`~sqlalchemy.types.Unicode` is used - VARCHAR2 and CLOB are issued
- instead. This because these types don't seem to work correctly on Oracle 8
- even though they are available. The :class:`~sqlalchemy.types.NVARCHAR` and
- :class:`~sqlalchemy.dialects.oracle.NCLOB` types will always generate
- NVARCHAR2 and NCLOB.
-
-
-Synonym/DBLINK Reflection
--------------------------
-
-When using reflection with Table objects, the dialect can optionally search
-for tables indicated by synonyms, either in local or remote schemas or
-accessed over DBLINK, by passing the flag ``oracle_resolve_synonyms=True`` as
-a keyword argument to the :class:`_schema.Table` construct::
-
- some_table = Table('some_table', autoload_with=some_engine,
- oracle_resolve_synonyms=True)
-
-When this flag is set, the given name (such as ``some_table`` above) will
-be searched not just in the ``ALL_TABLES`` view, but also within the
-``ALL_SYNONYMS`` view to see if this name is actually a synonym to another
-name. If the synonym is located and refers to a DBLINK, the oracle dialect
-knows how to locate the table's information using DBLINK syntax(e.g.
-``@dblink``).
-
-``oracle_resolve_synonyms`` is accepted wherever reflection arguments are
-accepted, including methods such as :meth:`_schema.MetaData.reflect` and
-:meth:`_reflection.Inspector.get_columns`.
-
-If synonyms are not in use, this flag should be left disabled.
-
-.. _oracle_constraint_reflection:
-
-Constraint Reflection
----------------------
-
-The Oracle dialect can return information about foreign key, unique, and
-CHECK constraints, as well as indexes on tables.
-
-Raw information regarding these constraints can be acquired using
-:meth:`_reflection.Inspector.get_foreign_keys`,
-:meth:`_reflection.Inspector.get_unique_constraints`,
-:meth:`_reflection.Inspector.get_check_constraints`, and
-:meth:`_reflection.Inspector.get_indexes`.
-
-.. versionchanged:: 1.2 The Oracle dialect can now reflect UNIQUE and
- CHECK constraints.
-
-When using reflection at the :class:`_schema.Table` level, the
-:class:`_schema.Table`
-will also include these constraints.
-
-Note the following caveats:
-
-* When using the :meth:`_reflection.Inspector.get_check_constraints` method,
- Oracle
- builds a special "IS NOT NULL" constraint for columns that specify
- "NOT NULL". This constraint is **not** returned by default; to include
- the "IS NOT NULL" constraints, pass the flag ``include_all=True``::
-
- from sqlalchemy import create_engine, inspect
-
- engine = create_engine("oracle+cx_oracle://s:t@dsn")
- inspector = inspect(engine)
- all_check_constraints = inspector.get_check_constraints(
- "some_table", include_all=True)
-
-* in most cases, when reflecting a :class:`_schema.Table`,
- a UNIQUE constraint will
- **not** be available as a :class:`.UniqueConstraint` object, as Oracle
- mirrors unique constraints with a UNIQUE index in most cases (the exception
- seems to be when two or more unique constraints represent the same columns);
- the :class:`_schema.Table` will instead represent these using
- :class:`.Index`
- with the ``unique=True`` flag set.
-
-* Oracle creates an implicit index for the primary key of a table; this index
- is **excluded** from all index results.
-
-* the list of columns reflected for an index will not include column names
- that start with SYS_NC.
-
-Table names with SYSTEM/SYSAUX tablespaces
--------------------------------------------
-
-The :meth:`_reflection.Inspector.get_table_names` and
-:meth:`_reflection.Inspector.get_temp_table_names`
-methods each return a list of table names for the current engine. These methods
-are also part of the reflection which occurs within an operation such as
-:meth:`_schema.MetaData.reflect`. By default,
-these operations exclude the ``SYSTEM``
-and ``SYSAUX`` tablespaces from the operation. In order to change this, the
-default list of tablespaces excluded can be changed at the engine level using
-the ``exclude_tablespaces`` parameter::
-
- # exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM
- e = create_engine(
- "oracle+cx_oracle://scott:tiger@xe",
- exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"])
-
-DateTime Compatibility
-----------------------
-
-Oracle has no datatype known as ``DATETIME``, it instead has only ``DATE``,
-which can actually store a date and time value. For this reason, the Oracle
-dialect provides a type :class:`_oracle.DATE` which is a subclass of
-:class:`.DateTime`. This type has no special behavior, and is only
-present as a "marker" for this type; additionally, when a database column
-is reflected and the type is reported as ``DATE``, the time-supporting
-:class:`_oracle.DATE` type is used.
-
-.. _oracle_table_options:
-
-Oracle Table Options
--------------------------
-
-The CREATE TABLE phrase supports the following options with Oracle
-in conjunction with the :class:`_schema.Table` construct:
-
-
-* ``ON COMMIT``::
-
- Table(
- "some_table", metadata, ...,
- prefixes=['GLOBAL TEMPORARY'], oracle_on_commit='PRESERVE ROWS')
-
-* ``COMPRESS``::
-
- Table('mytable', metadata, Column('data', String(32)),
- oracle_compress=True)
-
- Table('mytable', metadata, Column('data', String(32)),
- oracle_compress=6)
-
- The ``oracle_compress`` parameter accepts either an integer compression
- level, or ``True`` to use the default compression level.
-
-.. _oracle_index_options:
-
-Oracle Specific Index Options
------------------------------
-
-Bitmap Indexes
-~~~~~~~~~~~~~~
-
-You can specify the ``oracle_bitmap`` parameter to create a bitmap index
-instead of a B-tree index::
-
- Index('my_index', my_table.c.data, oracle_bitmap=True)
-
-Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will not
-check for such limitations, only the database will.
-
-Index compression
-~~~~~~~~~~~~~~~~~
-
-Oracle has a more efficient storage mode for indexes containing lots of
-repeated values. Use the ``oracle_compress`` parameter to turn on key
-compression::
-
- Index('my_index', my_table.c.data, oracle_compress=True)
-
- Index('my_index', my_table.c.data1, my_table.c.data2, unique=True,
- oracle_compress=1)
-
-The ``oracle_compress`` parameter accepts either an integer specifying the
-number of prefix columns to compress, or ``True`` to use the default (all
-columns for non-unique indexes, all but the last column for unique indexes).
-
-""" # noqa
-
-from __future__ import annotations
-
-from collections import defaultdict
-from functools import lru_cache
-from functools import wraps
-import re
-
-from . import dictionary
-from .types import _OracleBoolean
-from .types import _OracleDate
-from .types import BFILE
-from .types import BINARY_DOUBLE
-from .types import BINARY_FLOAT
-from .types import DATE
-from .types import FLOAT
-from .types import INTERVAL
-from .types import LONG
-from .types import NCLOB
-from .types import NUMBER
-from .types import NVARCHAR2 # noqa
-from .types import OracleRaw # noqa
-from .types import RAW
-from .types import ROWID # noqa
-from .types import TIMESTAMP
-from .types import VARCHAR2 # noqa
-from ... import Computed
-from ... import exc
-from ... import schema as sa_schema
-from ... import sql
-from ... import util
-from ...engine import default
-from ...engine import ObjectKind
-from ...engine import ObjectScope
-from ...engine import reflection
-from ...engine.reflection import ReflectionDefaults
-from ...sql import and_
-from ...sql import bindparam
-from ...sql import compiler
-from ...sql import expression
-from ...sql import func
-from ...sql import null
-from ...sql import or_
-from ...sql import select
-from ...sql import sqltypes
-from ...sql import util as sql_util
-from ...sql import visitors
-from ...sql.visitors import InternalTraversal
-from ...types import BLOB
-from ...types import CHAR
-from ...types import CLOB
-from ...types import DOUBLE_PRECISION
-from ...types import INTEGER
-from ...types import NCHAR
-from ...types import NVARCHAR
-from ...types import REAL
-from ...types import VARCHAR
-
-RESERVED_WORDS = set(
- "SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN "
- "DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED "
- "ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE "
- "ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE "
- "BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES "
- "AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS "
- "NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER "
- "CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR "
- "DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL".split()
-)
-
-NO_ARG_FNS = set(
- "UID CURRENT_DATE SYSDATE USER CURRENT_TIME CURRENT_TIMESTAMP".split()
-)
-
-
-colspecs = {
- sqltypes.Boolean: _OracleBoolean,
- sqltypes.Interval: INTERVAL,
- sqltypes.DateTime: DATE,
- sqltypes.Date: _OracleDate,
-}
-
-ischema_names = {
- "VARCHAR2": VARCHAR,
- "NVARCHAR2": NVARCHAR,
- "CHAR": CHAR,
- "NCHAR": NCHAR,
- "DATE": DATE,
- "NUMBER": NUMBER,
- "BLOB": BLOB,
- "BFILE": BFILE,
- "CLOB": CLOB,
- "NCLOB": NCLOB,
- "TIMESTAMP": TIMESTAMP,
- "TIMESTAMP WITH TIME ZONE": TIMESTAMP,
- "TIMESTAMP WITH LOCAL TIME ZONE": TIMESTAMP,
- "INTERVAL DAY TO SECOND": INTERVAL,
- "RAW": RAW,
- "FLOAT": FLOAT,
- "DOUBLE PRECISION": DOUBLE_PRECISION,
- "REAL": REAL,
- "LONG": LONG,
- "BINARY_DOUBLE": BINARY_DOUBLE,
- "BINARY_FLOAT": BINARY_FLOAT,
- "ROWID": ROWID,
-}
-
-
-class OracleTypeCompiler(compiler.GenericTypeCompiler):
- # Note:
- # Oracle DATE == DATETIME
- # Oracle does not allow milliseconds in DATE
- # Oracle does not support TIME columns
-
- def visit_datetime(self, type_, **kw):
- return self.visit_DATE(type_, **kw)
-
- def visit_float(self, type_, **kw):
- return self.visit_FLOAT(type_, **kw)
-
- def visit_double(self, type_, **kw):
- return self.visit_DOUBLE_PRECISION(type_, **kw)
-
- def visit_unicode(self, type_, **kw):
- if self.dialect._use_nchar_for_unicode:
- return self.visit_NVARCHAR2(type_, **kw)
- else:
- return self.visit_VARCHAR2(type_, **kw)
-
- def visit_INTERVAL(self, type_, **kw):
- return "INTERVAL DAY%s TO SECOND%s" % (
- type_.day_precision is not None
- and "(%d)" % type_.day_precision
- or "",
- type_.second_precision is not None
- and "(%d)" % type_.second_precision
- or "",
- )
-
- def visit_LONG(self, type_, **kw):
- return "LONG"
-
- def visit_TIMESTAMP(self, type_, **kw):
- if getattr(type_, "local_timezone", False):
- return "TIMESTAMP WITH LOCAL TIME ZONE"
- elif type_.timezone:
- return "TIMESTAMP WITH TIME ZONE"
- else:
- return "TIMESTAMP"
-
- def visit_DOUBLE_PRECISION(self, type_, **kw):
- return self._generate_numeric(type_, "DOUBLE PRECISION", **kw)
-
- def visit_BINARY_DOUBLE(self, type_, **kw):
- return self._generate_numeric(type_, "BINARY_DOUBLE", **kw)
-
- def visit_BINARY_FLOAT(self, type_, **kw):
- return self._generate_numeric(type_, "BINARY_FLOAT", **kw)
-
- def visit_FLOAT(self, type_, **kw):
- kw["_requires_binary_precision"] = True
- return self._generate_numeric(type_, "FLOAT", **kw)
-
- def visit_NUMBER(self, type_, **kw):
- return self._generate_numeric(type_, "NUMBER", **kw)
-
- def _generate_numeric(
- self,
- type_,
- name,
- precision=None,
- scale=None,
- _requires_binary_precision=False,
- **kw,
- ):
- if precision is None:
- precision = getattr(type_, "precision", None)
-
- if _requires_binary_precision:
- binary_precision = getattr(type_, "binary_precision", None)
-
- if precision and binary_precision is None:
- # https://www.oracletutorial.com/oracle-basics/oracle-float/
- estimated_binary_precision = int(precision / 0.30103)
- raise exc.ArgumentError(
- "Oracle FLOAT types use 'binary precision', which does "
- "not convert cleanly from decimal 'precision'. Please "
- "specify "
- f"this type with a separate Oracle variant, such as "
- f"{type_.__class__.__name__}(precision={precision})."
- f"with_variant(oracle.FLOAT"
- f"(binary_precision="
- f"{estimated_binary_precision}), 'oracle'), so that the "
- "Oracle specific 'binary_precision' may be specified "
- "accurately."
- )
- else:
- precision = binary_precision
-
- if scale is None:
- scale = getattr(type_, "scale", None)
-
- if precision is None:
- return name
- elif scale is None:
- n = "%(name)s(%(precision)s)"
- return n % {"name": name, "precision": precision}
- else:
- n = "%(name)s(%(precision)s, %(scale)s)"
- return n % {"name": name, "precision": precision, "scale": scale}
-
- def visit_string(self, type_, **kw):
- return self.visit_VARCHAR2(type_, **kw)
-
- def visit_VARCHAR2(self, type_, **kw):
- return self._visit_varchar(type_, "", "2")
-
- def visit_NVARCHAR2(self, type_, **kw):
- return self._visit_varchar(type_, "N", "2")
-
- visit_NVARCHAR = visit_NVARCHAR2
-
- def visit_VARCHAR(self, type_, **kw):
- return self._visit_varchar(type_, "", "")
-
- def _visit_varchar(self, type_, n, num):
- if not type_.length:
- return "%(n)sVARCHAR%(two)s" % {"two": num, "n": n}
- elif not n and self.dialect._supports_char_length:
- varchar = "VARCHAR%(two)s(%(length)s CHAR)"
- return varchar % {"length": type_.length, "two": num}
- else:
- varchar = "%(n)sVARCHAR%(two)s(%(length)s)"
- return varchar % {"length": type_.length, "two": num, "n": n}
-
- def visit_text(self, type_, **kw):
- return self.visit_CLOB(type_, **kw)
-
- def visit_unicode_text(self, type_, **kw):
- if self.dialect._use_nchar_for_unicode:
- return self.visit_NCLOB(type_, **kw)
- else:
- return self.visit_CLOB(type_, **kw)
-
- def visit_large_binary(self, type_, **kw):
- return self.visit_BLOB(type_, **kw)
-
- def visit_big_integer(self, type_, **kw):
- return self.visit_NUMBER(type_, precision=19, **kw)
-
- def visit_boolean(self, type_, **kw):
- return self.visit_SMALLINT(type_, **kw)
-
- def visit_RAW(self, type_, **kw):
- if type_.length:
- return "RAW(%(length)s)" % {"length": type_.length}
- else:
- return "RAW"
-
- def visit_ROWID(self, type_, **kw):
- return "ROWID"
-
-
-class OracleCompiler(compiler.SQLCompiler):
- """Oracle compiler modifies the lexical structure of Select
- statements to work under non-ANSI configured Oracle databases, if
- the use_ansi flag is False.
- """
-
- compound_keywords = util.update_copy(
- compiler.SQLCompiler.compound_keywords,
- {expression.CompoundSelect.EXCEPT: "MINUS"},
- )
-
- def __init__(self, *args, **kwargs):
- self.__wheres = {}
- super().__init__(*args, **kwargs)
-
- def visit_mod_binary(self, binary, operator, **kw):
- return "mod(%s, %s)" % (
- self.process(binary.left, **kw),
- self.process(binary.right, **kw),
- )
-
- def visit_now_func(self, fn, **kw):
- return "CURRENT_TIMESTAMP"
-
- def visit_char_length_func(self, fn, **kw):
- return "LENGTH" + self.function_argspec(fn, **kw)
-
- def visit_match_op_binary(self, binary, operator, **kw):
- return "CONTAINS (%s, %s)" % (
- self.process(binary.left),
- self.process(binary.right),
- )
-
- def visit_true(self, expr, **kw):
- return "1"
-
- def visit_false(self, expr, **kw):
- return "0"
-
- def get_cte_preamble(self, recursive):
- return "WITH"
-
- def get_select_hint_text(self, byfroms):
- return " ".join("/*+ %s */" % text for table, text in byfroms.items())
-
- def function_argspec(self, fn, **kw):
- if len(fn.clauses) > 0 or fn.name.upper() not in NO_ARG_FNS:
- return compiler.SQLCompiler.function_argspec(self, fn, **kw)
- else:
- return ""
-
- def visit_function(self, func, **kw):
- text = super().visit_function(func, **kw)
- if kw.get("asfrom", False):
- text = "TABLE (%s)" % text
- return text
-
- def visit_table_valued_column(self, element, **kw):
- text = super().visit_table_valued_column(element, **kw)
- text = text + ".COLUMN_VALUE"
- return text
-
- def default_from(self):
- """Called when a ``SELECT`` statement has no froms,
- and no ``FROM`` clause is to be appended.
-
- The Oracle compiler tacks a "FROM DUAL" to the statement.
- """
-
- return " FROM DUAL"
-
- def visit_join(self, join, from_linter=None, **kwargs):
- if self.dialect.use_ansi:
- return compiler.SQLCompiler.visit_join(
- self, join, from_linter=from_linter, **kwargs
- )
- else:
- if from_linter:
- from_linter.edges.add((join.left, join.right))
-
- kwargs["asfrom"] = True
- if isinstance(join.right, expression.FromGrouping):
- right = join.right.element
- else:
- right = join.right
- return (
- self.process(join.left, from_linter=from_linter, **kwargs)
- + ", "
- + self.process(right, from_linter=from_linter, **kwargs)
- )
-
- def _get_nonansi_join_whereclause(self, froms):
- clauses = []
-
- def visit_join(join):
- if join.isouter:
- # https://docs.oracle.com/database/121/SQLRF/queries006.htm#SQLRF52354
- # "apply the outer join operator (+) to all columns of B in
- # the join condition in the WHERE clause" - that is,
- # unconditionally regardless of operator or the other side
- def visit_binary(binary):
- if isinstance(
- binary.left, expression.ColumnClause
- ) and join.right.is_derived_from(binary.left.table):
- binary.left = _OuterJoinColumn(binary.left)
- elif isinstance(
- binary.right, expression.ColumnClause
- ) and join.right.is_derived_from(binary.right.table):
- binary.right = _OuterJoinColumn(binary.right)
-
- clauses.append(
- visitors.cloned_traverse(
- join.onclause, {}, {"binary": visit_binary}
- )
- )
- else:
- clauses.append(join.onclause)
-
- for j in join.left, join.right:
- if isinstance(j, expression.Join):
- visit_join(j)
- elif isinstance(j, expression.FromGrouping):
- visit_join(j.element)
-
- for f in froms:
- if isinstance(f, expression.Join):
- visit_join(f)
-
- if not clauses:
- return None
- else:
- return sql.and_(*clauses)
-
- def visit_outer_join_column(self, vc, **kw):
- return self.process(vc.column, **kw) + "(+)"
-
- def visit_sequence(self, seq, **kw):
- return self.preparer.format_sequence(seq) + ".nextval"
-
- def get_render_as_alias_suffix(self, alias_name_text):
- """Oracle doesn't like ``FROM table AS alias``"""
-
- return " " + alias_name_text
-
- def returning_clause(
- self, stmt, returning_cols, *, populate_result_map, **kw
- ):
- columns = []
- binds = []
-
- for i, column in enumerate(
- expression._select_iterables(returning_cols)
- ):
- if (
- self.isupdate
- and isinstance(column, sa_schema.Column)
- and isinstance(column.server_default, Computed)
- and not self.dialect._supports_update_returning_computed_cols
- ):
- util.warn(
- "Computed columns don't work with Oracle UPDATE "
- "statements that use RETURNING; the value of the column "
- "*before* the UPDATE takes place is returned. It is "
- "advised to not use RETURNING with an Oracle computed "
- "column. Consider setting implicit_returning to False on "
- "the Table object in order to avoid implicit RETURNING "
- "clauses from being generated for this Table."
- )
- if column.type._has_column_expression:
- col_expr = column.type.column_expression(column)
- else:
- col_expr = column
-
- outparam = sql.outparam("ret_%d" % i, type_=column.type)
- self.binds[outparam.key] = outparam
- binds.append(
- self.bindparam_string(self._truncate_bindparam(outparam))
- )
-
- # has_out_parameters would in a normal case be set to True
- # as a result of the compiler visiting an outparam() object.
- # in this case, the above outparam() objects are not being
- # visited. Ensure the statement itself didn't have other
- # outparam() objects independently.
- # technically, this could be supported, but as it would be
- # a very strange use case without a clear rationale, disallow it
- if self.has_out_parameters:
- raise exc.InvalidRequestError(
- "Using explicit outparam() objects with "
- "UpdateBase.returning() in the same Core DML statement "
- "is not supported in the Oracle dialect."
- )
-
- self._oracle_returning = True
-
- columns.append(self.process(col_expr, within_columns_clause=False))
- if populate_result_map:
- self._add_to_result_map(
- getattr(col_expr, "name", col_expr._anon_name_label),
- getattr(col_expr, "name", col_expr._anon_name_label),
- (
- column,
- getattr(column, "name", None),
- getattr(column, "key", None),
- ),
- column.type,
- )
-
- return "RETURNING " + ", ".join(columns) + " INTO " + ", ".join(binds)
-
- def _row_limit_clause(self, select, **kw):
- """ORacle 12c supports OFFSET/FETCH operators
- Use it instead subquery with row_number
-
- """
-
- if (
- select._fetch_clause is not None
- or not self.dialect._supports_offset_fetch
- ):
- return super()._row_limit_clause(
- select, use_literal_execute_for_simple_int=True, **kw
- )
- else:
- return self.fetch_clause(
- select,
- fetch_clause=self._get_limit_or_fetch(select),
- use_literal_execute_for_simple_int=True,
- **kw,
- )
-
- def _get_limit_or_fetch(self, select):
- if select._fetch_clause is None:
- return select._limit_clause
- else:
- return select._fetch_clause
-
- def translate_select_structure(self, select_stmt, **kwargs):
- select = select_stmt
-
- if not getattr(select, "_oracle_visit", None):
- if not self.dialect.use_ansi:
- froms = self._display_froms_for_select(
- select, kwargs.get("asfrom", False)
- )
- whereclause = self._get_nonansi_join_whereclause(froms)
- if whereclause is not None:
- select = select.where(whereclause)
- select._oracle_visit = True
-
- # if fetch is used this is not needed
- if (
- select._has_row_limiting_clause
- and not self.dialect._supports_offset_fetch
- and select._fetch_clause is None
- ):
- limit_clause = select._limit_clause
- offset_clause = select._offset_clause
-
- if select._simple_int_clause(limit_clause):
- limit_clause = limit_clause.render_literal_execute()
-
- if select._simple_int_clause(offset_clause):
- offset_clause = offset_clause.render_literal_execute()
-
- # currently using form at:
- # https://blogs.oracle.com/oraclemagazine/\
- # on-rownum-and-limiting-results
-
- orig_select = select
- select = select._generate()
- select._oracle_visit = True
-
- # add expressions to accommodate FOR UPDATE OF
- for_update = select._for_update_arg
- if for_update is not None and for_update.of:
- for_update = for_update._clone()
- for_update._copy_internals()
-
- for elem in for_update.of:
- if not select.selected_columns.contains_column(elem):
- select = select.add_columns(elem)
-
- # Wrap the middle select and add the hint
- inner_subquery = select.alias()
- limitselect = sql.select(
- *[
- c
- for c in inner_subquery.c
- if orig_select.selected_columns.corresponding_column(c)
- is not None
- ]
- )
-
- if (
- limit_clause is not None
- and self.dialect.optimize_limits
- and select._simple_int_clause(limit_clause)
- ):
- limitselect = limitselect.prefix_with(
- expression.text(
- "/*+ FIRST_ROWS(%s) */"
- % self.process(limit_clause, **kwargs)
- )
- )
-
- limitselect._oracle_visit = True
- limitselect._is_wrapper = True
-
- # add expressions to accommodate FOR UPDATE OF
- if for_update is not None and for_update.of:
- adapter = sql_util.ClauseAdapter(inner_subquery)
- for_update.of = [
- adapter.traverse(elem) for elem in for_update.of
- ]
-
- # If needed, add the limiting clause
- if limit_clause is not None:
- if select._simple_int_clause(limit_clause) and (
- offset_clause is None
- or select._simple_int_clause(offset_clause)
- ):
- max_row = limit_clause
-
- if offset_clause is not None:
- max_row = max_row + offset_clause
-
- else:
- max_row = limit_clause
-
- if offset_clause is not None:
- max_row = max_row + offset_clause
- limitselect = limitselect.where(
- sql.literal_column("ROWNUM") <= max_row
- )
-
- # If needed, add the ora_rn, and wrap again with offset.
- if offset_clause is None:
- limitselect._for_update_arg = for_update
- select = limitselect
- else:
- limitselect = limitselect.add_columns(
- sql.literal_column("ROWNUM").label("ora_rn")
- )
- limitselect._oracle_visit = True
- limitselect._is_wrapper = True
-
- if for_update is not None and for_update.of:
- limitselect_cols = limitselect.selected_columns
- for elem in for_update.of:
- if (
- limitselect_cols.corresponding_column(elem)
- is None
- ):
- limitselect = limitselect.add_columns(elem)
-
- limit_subquery = limitselect.alias()
- origselect_cols = orig_select.selected_columns
- offsetselect = sql.select(
- *[
- c
- for c in limit_subquery.c
- if origselect_cols.corresponding_column(c)
- is not None
- ]
- )
-
- offsetselect._oracle_visit = True
- offsetselect._is_wrapper = True
-
- if for_update is not None and for_update.of:
- adapter = sql_util.ClauseAdapter(limit_subquery)
- for_update.of = [
- adapter.traverse(elem) for elem in for_update.of
- ]
-
- offsetselect = offsetselect.where(
- sql.literal_column("ora_rn") > offset_clause
- )
-
- offsetselect._for_update_arg = for_update
- select = offsetselect
-
- return select
-
- def limit_clause(self, select, **kw):
- return ""
-
- def visit_empty_set_expr(self, type_, **kw):
- return "SELECT 1 FROM DUAL WHERE 1!=1"
-
- def for_update_clause(self, select, **kw):
- if self.is_subquery():
- return ""
-
- tmp = " FOR UPDATE"
-
- if select._for_update_arg.of:
- tmp += " OF " + ", ".join(
- self.process(elem, **kw) for elem in select._for_update_arg.of
- )
-
- if select._for_update_arg.nowait:
- tmp += " NOWAIT"
- if select._for_update_arg.skip_locked:
- tmp += " SKIP LOCKED"
-
- return tmp
-
- def visit_is_distinct_from_binary(self, binary, operator, **kw):
- return "DECODE(%s, %s, 0, 1) = 1" % (
- self.process(binary.left),
- self.process(binary.right),
- )
-
- def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
- return "DECODE(%s, %s, 0, 1) = 0" % (
- self.process(binary.left),
- self.process(binary.right),
- )
-
- def visit_regexp_match_op_binary(self, binary, operator, **kw):
- string = self.process(binary.left, **kw)
- pattern = self.process(binary.right, **kw)
- flags = binary.modifiers["flags"]
- if flags is None:
- return "REGEXP_LIKE(%s, %s)" % (string, pattern)
- else:
- return "REGEXP_LIKE(%s, %s, %s)" % (
- string,
- pattern,
- self.render_literal_value(flags, sqltypes.STRINGTYPE),
- )
-
- def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
- return "NOT %s" % self.visit_regexp_match_op_binary(
- binary, operator, **kw
- )
-
- def visit_regexp_replace_op_binary(self, binary, operator, **kw):
- string = self.process(binary.left, **kw)
- pattern_replace = self.process(binary.right, **kw)
- flags = binary.modifiers["flags"]
- if flags is None:
- return "REGEXP_REPLACE(%s, %s)" % (
- string,
- pattern_replace,
- )
- else:
- return "REGEXP_REPLACE(%s, %s, %s)" % (
- string,
- pattern_replace,
- self.render_literal_value(flags, sqltypes.STRINGTYPE),
- )
-
- def visit_aggregate_strings_func(self, fn, **kw):
- return "LISTAGG%s" % self.function_argspec(fn, **kw)
-
-
-class OracleDDLCompiler(compiler.DDLCompiler):
- def define_constraint_cascades(self, constraint):
- text = ""
- if constraint.ondelete is not None:
- text += " ON DELETE %s" % constraint.ondelete
-
- # oracle has no ON UPDATE CASCADE -
- # its only available via triggers
- # https://asktom.oracle.com/tkyte/update_cascade/index.html
- if constraint.onupdate is not None:
- util.warn(
- "Oracle does not contain native UPDATE CASCADE "
- "functionality - onupdates will not be rendered for foreign "
- "keys. Consider using deferrable=True, initially='deferred' "
- "or triggers."
- )
-
- return text
-
- def visit_drop_table_comment(self, drop, **kw):
- return "COMMENT ON TABLE %s IS ''" % self.preparer.format_table(
- drop.element
- )
-
- def visit_create_index(self, create, **kw):
- index = create.element
- self._verify_index_table(index)
- preparer = self.preparer
- text = "CREATE "
- if index.unique:
- text += "UNIQUE "
- if index.dialect_options["oracle"]["bitmap"]:
- text += "BITMAP "
- text += "INDEX %s ON %s (%s)" % (
- self._prepared_index_name(index, include_schema=True),
- preparer.format_table(index.table, use_schema=True),
- ", ".join(
- self.sql_compiler.process(
- expr, include_table=False, literal_binds=True
- )
- for expr in index.expressions
- ),
- )
- if index.dialect_options["oracle"]["compress"] is not False:
- if index.dialect_options["oracle"]["compress"] is True:
- text += " COMPRESS"
- else:
- text += " COMPRESS %d" % (
- index.dialect_options["oracle"]["compress"]
- )
- return text
-
- def post_create_table(self, table):
- table_opts = []
- opts = table.dialect_options["oracle"]
-
- if opts["on_commit"]:
- on_commit_options = opts["on_commit"].replace("_", " ").upper()
- table_opts.append("\n ON COMMIT %s" % on_commit_options)
-
- if opts["compress"]:
- if opts["compress"] is True:
- table_opts.append("\n COMPRESS")
- else:
- table_opts.append("\n COMPRESS FOR %s" % (opts["compress"]))
-
- return "".join(table_opts)
-
- def get_identity_options(self, identity_options):
- text = super().get_identity_options(identity_options)
- text = text.replace("NO MINVALUE", "NOMINVALUE")
- text = text.replace("NO MAXVALUE", "NOMAXVALUE")
- text = text.replace("NO CYCLE", "NOCYCLE")
- if identity_options.order is not None:
- text += " ORDER" if identity_options.order else " NOORDER"
- return text.strip()
-
- def visit_computed_column(self, generated, **kw):
- text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
- generated.sqltext, include_table=False, literal_binds=True
- )
- if generated.persisted is True:
- raise exc.CompileError(
- "Oracle computed columns do not support 'stored' persistence; "
- "set the 'persisted' flag to None or False for Oracle support."
- )
- elif generated.persisted is False:
- text += " VIRTUAL"
- return text
-
- def visit_identity_column(self, identity, **kw):
- if identity.always is None:
- kind = ""
- else:
- kind = "ALWAYS" if identity.always else "BY DEFAULT"
- text = "GENERATED %s" % kind
- if identity.on_null:
- text += " ON NULL"
- text += " AS IDENTITY"
- options = self.get_identity_options(identity)
- if options:
- text += " (%s)" % options
- return text
-
-
-class OracleIdentifierPreparer(compiler.IdentifierPreparer):
- reserved_words = {x.lower() for x in RESERVED_WORDS}
- illegal_initial_characters = {str(dig) for dig in range(0, 10)}.union(
- ["_", "$"]
- )
-
- def _bindparam_requires_quotes(self, value):
- """Return True if the given identifier requires quoting."""
- lc_value = value.lower()
- return (
- lc_value in self.reserved_words
- or value[0] in self.illegal_initial_characters
- or not self.legal_characters.match(str(value))
- )
-
- def format_savepoint(self, savepoint):
- name = savepoint.ident.lstrip("_")
- return super().format_savepoint(savepoint, name)
-
-
-class OracleExecutionContext(default.DefaultExecutionContext):
- def fire_sequence(self, seq, type_):
- return self._execute_scalar(
- "SELECT "
- + self.identifier_preparer.format_sequence(seq)
- + ".nextval FROM DUAL",
- type_,
- )
-
- def pre_exec(self):
- if self.statement and "_oracle_dblink" in self.execution_options:
- self.statement = self.statement.replace(
- dictionary.DB_LINK_PLACEHOLDER,
- self.execution_options["_oracle_dblink"],
- )
-
-
-class OracleDialect(default.DefaultDialect):
- name = "oracle"
- supports_statement_cache = True
- supports_alter = True
- max_identifier_length = 128
-
- _supports_offset_fetch = True
-
- insert_returning = True
- update_returning = True
- delete_returning = True
-
- div_is_floordiv = False
-
- supports_simple_order_by_label = False
- cte_follows_insert = True
- returns_native_bytes = True
-
- supports_sequences = True
- sequences_optional = False
- postfetch_lastrowid = False
-
- default_paramstyle = "named"
- colspecs = colspecs
- ischema_names = ischema_names
- requires_name_normalize = True
-
- supports_comments = True
-
- supports_default_values = False
- supports_default_metavalue = True
- supports_empty_insert = False
- supports_identity_columns = True
-
- statement_compiler = OracleCompiler
- ddl_compiler = OracleDDLCompiler
- type_compiler_cls = OracleTypeCompiler
- preparer = OracleIdentifierPreparer
- execution_ctx_cls = OracleExecutionContext
-
- reflection_options = ("oracle_resolve_synonyms",)
-
- _use_nchar_for_unicode = False
-
- construct_arguments = [
- (
- sa_schema.Table,
- {"resolve_synonyms": False, "on_commit": None, "compress": False},
- ),
- (sa_schema.Index, {"bitmap": False, "compress": False}),
- ]
-
- @util.deprecated_params(
- use_binds_for_limits=(
- "1.4",
- "The ``use_binds_for_limits`` Oracle dialect parameter is "
- "deprecated. The dialect now renders LIMIT /OFFSET integers "
- "inline in all cases using a post-compilation hook, so that the "
- "value is still represented by a 'bound parameter' on the Core "
- "Expression side.",
- )
- )
- def __init__(
- self,
- use_ansi=True,
- optimize_limits=False,
- use_binds_for_limits=None,
- use_nchar_for_unicode=False,
- exclude_tablespaces=("SYSTEM", "SYSAUX"),
- enable_offset_fetch=True,
- **kwargs,
- ):
- default.DefaultDialect.__init__(self, **kwargs)
- self._use_nchar_for_unicode = use_nchar_for_unicode
- self.use_ansi = use_ansi
- self.optimize_limits = optimize_limits
- self.exclude_tablespaces = exclude_tablespaces
- self.enable_offset_fetch = self._supports_offset_fetch = (
- enable_offset_fetch
- )
-
- def initialize(self, connection):
- super().initialize(connection)
-
- # Oracle 8i has RETURNING:
- # https://docs.oracle.com/cd/A87860_01/doc/index.htm
-
- # so does Oracle8:
- # https://docs.oracle.com/cd/A64702_01/doc/index.htm
-
- if self._is_oracle_8:
- self.colspecs = self.colspecs.copy()
- self.colspecs.pop(sqltypes.Interval)
- self.use_ansi = False
-
- self.supports_identity_columns = self.server_version_info >= (12,)
- self._supports_offset_fetch = (
- self.enable_offset_fetch and self.server_version_info >= (12,)
- )
-
- def _get_effective_compat_server_version_info(self, connection):
- # dialect does not need compat levels below 12.2, so don't query
- # in those cases
-
- if self.server_version_info < (12, 2):
- return self.server_version_info
- try:
- compat = connection.exec_driver_sql(
- "SELECT value FROM v$parameter WHERE name = 'compatible'"
- ).scalar()
- except exc.DBAPIError:
- compat = None
-
- if compat:
- try:
- return tuple(int(x) for x in compat.split("."))
- except:
- return self.server_version_info
- else:
- return self.server_version_info
-
- @property
- def _is_oracle_8(self):
- return self.server_version_info and self.server_version_info < (9,)
-
- @property
- def _supports_table_compression(self):
- return self.server_version_info and self.server_version_info >= (10, 1)
-
- @property
- def _supports_table_compress_for(self):
- return self.server_version_info and self.server_version_info >= (11,)
-
- @property
- def _supports_char_length(self):
- return not self._is_oracle_8
-
- @property
- def _supports_update_returning_computed_cols(self):
- # on version 18 this error is no longet present while it happens on 11
- # it may work also on versions before the 18
- return self.server_version_info and self.server_version_info >= (18,)
-
- @property
- def _supports_except_all(self):
- return self.server_version_info and self.server_version_info >= (21,)
-
- def do_release_savepoint(self, connection, name):
- # Oracle does not support RELEASE SAVEPOINT
- pass
-
- def _check_max_identifier_length(self, connection):
- if self._get_effective_compat_server_version_info(connection) < (
- 12,
- 2,
- ):
- return 30
- else:
- # use the default
- return None
-
- def get_isolation_level_values(self, dbapi_connection):
- return ["READ COMMITTED", "SERIALIZABLE"]
-
- def get_default_isolation_level(self, dbapi_conn):
- try:
- return self.get_isolation_level(dbapi_conn)
- except NotImplementedError:
- raise
- except:
- return "READ COMMITTED"
-
- def _execute_reflection(
- self, connection, query, dblink, returns_long, params=None
- ):
- if dblink and not dblink.startswith("@"):
- dblink = f"@{dblink}"
- execution_options = {
- # handle db links
- "_oracle_dblink": dblink or "",
- # override any schema translate map
- "schema_translate_map": None,
- }
-
- if dblink and returns_long:
- # Oracle seems to error with
- # "ORA-00997: illegal use of LONG datatype" when returning
- # LONG columns via a dblink in a query with bind params
- # This type seems to be very hard to cast into something else
- # so it seems easier to just use bind param in this case
- def visit_bindparam(bindparam):
- bindparam.literal_execute = True
-
- query = visitors.cloned_traverse(
- query, {}, {"bindparam": visit_bindparam}
- )
- return connection.execute(
- query, params, execution_options=execution_options
- )
-
- @util.memoized_property
- def _has_table_query(self):
- # materialized views are returned by all_tables
- tables = (
- select(
- dictionary.all_tables.c.table_name,
- dictionary.all_tables.c.owner,
- )
- .union_all(
- select(
- dictionary.all_views.c.view_name.label("table_name"),
- dictionary.all_views.c.owner,
- )
- )
- .subquery("tables_and_views")
- )
-
- query = select(tables.c.table_name).where(
- tables.c.table_name == bindparam("table_name"),
- tables.c.owner == bindparam("owner"),
- )
- return query
-
- @reflection.cache
- def has_table(
- self, connection, table_name, schema=None, dblink=None, **kw
- ):
- """Supported kw arguments are: ``dblink`` to reflect via a db link."""
- self._ensure_has_table_connection(connection)
-
- if not schema:
- schema = self.default_schema_name
-
- params = {
- "table_name": self.denormalize_name(table_name),
- "owner": self.denormalize_schema_name(schema),
- }
- cursor = self._execute_reflection(
- connection,
- self._has_table_query,
- dblink,
- returns_long=False,
- params=params,
- )
- return bool(cursor.scalar())
-
- @reflection.cache
- def has_sequence(
- self, connection, sequence_name, schema=None, dblink=None, **kw
- ):
- """Supported kw arguments are: ``dblink`` to reflect via a db link."""
- if not schema:
- schema = self.default_schema_name
-
- query = select(dictionary.all_sequences.c.sequence_name).where(
- dictionary.all_sequences.c.sequence_name
- == self.denormalize_schema_name(sequence_name),
- dictionary.all_sequences.c.sequence_owner
- == self.denormalize_schema_name(schema),
- )
-
- cursor = self._execute_reflection(
- connection, query, dblink, returns_long=False
- )
- return bool(cursor.scalar())
-
- def _get_default_schema_name(self, connection):
- return self.normalize_name(
- connection.exec_driver_sql(
- "select sys_context( 'userenv', 'current_schema' ) from dual"
- ).scalar()
- )
-
- def denormalize_schema_name(self, name):
- # look for quoted_name
- force = getattr(name, "quote", None)
- if force is None and name == "public":
- # look for case insensitive, no quoting specified, "public"
- return "PUBLIC"
- return super().denormalize_name(name)
-
- @reflection.flexi_cache(
- ("schema", InternalTraversal.dp_string),
- ("filter_names", InternalTraversal.dp_string_list),
- ("dblink", InternalTraversal.dp_string),
- )
- def _get_synonyms(self, connection, schema, filter_names, dblink, **kw):
- owner = self.denormalize_schema_name(
- schema or self.default_schema_name
- )
-
- has_filter_names, params = self._prepare_filter_names(filter_names)
- query = select(
- dictionary.all_synonyms.c.synonym_name,
- dictionary.all_synonyms.c.table_name,
- dictionary.all_synonyms.c.table_owner,
- dictionary.all_synonyms.c.db_link,
- ).where(dictionary.all_synonyms.c.owner == owner)
- if has_filter_names:
- query = query.where(
- dictionary.all_synonyms.c.synonym_name.in_(
- params["filter_names"]
- )
- )
- result = self._execute_reflection(
- connection, query, dblink, returns_long=False
- ).mappings()
- return result.all()
-
- @lru_cache()
- def _all_objects_query(
- self, owner, scope, kind, has_filter_names, has_mat_views
- ):
- query = (
- select(dictionary.all_objects.c.object_name)
- .select_from(dictionary.all_objects)
- .where(dictionary.all_objects.c.owner == owner)
- )
-
- # NOTE: materialized views are listed in all_objects twice;
- # once as MATERIALIZE VIEW and once as TABLE
- if kind is ObjectKind.ANY:
- # materilaized view are listed also as tables so there is no
- # need to add them to the in_.
- query = query.where(
- dictionary.all_objects.c.object_type.in_(("TABLE", "VIEW"))
- )
- else:
- object_type = []
- if ObjectKind.VIEW in kind:
- object_type.append("VIEW")
- if (
- ObjectKind.MATERIALIZED_VIEW in kind
- and ObjectKind.TABLE not in kind
- ):
- # materilaized view are listed also as tables so there is no
- # need to add them to the in_ if also selecting tables.
- object_type.append("MATERIALIZED VIEW")
- if ObjectKind.TABLE in kind:
- object_type.append("TABLE")
- if has_mat_views and ObjectKind.MATERIALIZED_VIEW not in kind:
- # materialized view are listed also as tables,
- # so they need to be filtered out
- # EXCEPT ALL / MINUS profiles as faster than using
- # NOT EXISTS or NOT IN with a subquery, but it's in
- # general faster to get the mat view names and exclude
- # them only when needed
- query = query.where(
- dictionary.all_objects.c.object_name.not_in(
- bindparam("mat_views")
- )
- )
- query = query.where(
- dictionary.all_objects.c.object_type.in_(object_type)
- )
-
- # handles scope
- if scope is ObjectScope.DEFAULT:
- query = query.where(dictionary.all_objects.c.temporary == "N")
- elif scope is ObjectScope.TEMPORARY:
- query = query.where(dictionary.all_objects.c.temporary == "Y")
-
- if has_filter_names:
- query = query.where(
- dictionary.all_objects.c.object_name.in_(
- bindparam("filter_names")
- )
- )
- return query
-
- @reflection.flexi_cache(
- ("schema", InternalTraversal.dp_string),
- ("scope", InternalTraversal.dp_plain_obj),
- ("kind", InternalTraversal.dp_plain_obj),
- ("filter_names", InternalTraversal.dp_string_list),
- ("dblink", InternalTraversal.dp_string),
- )
- def _get_all_objects(
- self, connection, schema, scope, kind, filter_names, dblink, **kw
- ):
- owner = self.denormalize_schema_name(
- schema or self.default_schema_name
- )
-
- has_filter_names, params = self._prepare_filter_names(filter_names)
- has_mat_views = False
- if (
- ObjectKind.TABLE in kind
- and ObjectKind.MATERIALIZED_VIEW not in kind
- ):
- # see note in _all_objects_query
- mat_views = self.get_materialized_view_names(
- connection, schema, dblink, _normalize=False, **kw
- )
- if mat_views:
- params["mat_views"] = mat_views
- has_mat_views = True
-
- query = self._all_objects_query(
- owner, scope, kind, has_filter_names, has_mat_views
- )
-
- result = self._execute_reflection(
- connection, query, dblink, returns_long=False, params=params
- ).scalars()
-
- return result.all()
-
- def _handle_synonyms_decorator(fn):
- @wraps(fn)
- def wrapper(self, *args, **kwargs):
- return self._handle_synonyms(fn, *args, **kwargs)
-
- return wrapper
-
- def _handle_synonyms(self, fn, connection, *args, **kwargs):
- if not kwargs.get("oracle_resolve_synonyms", False):
- return fn(self, connection, *args, **kwargs)
-
- original_kw = kwargs.copy()
- schema = kwargs.pop("schema", None)
- result = self._get_synonyms(
- connection,
- schema=schema,
- filter_names=kwargs.pop("filter_names", None),
- dblink=kwargs.pop("dblink", None),
- info_cache=kwargs.get("info_cache", None),
- )
-
- dblinks_owners = defaultdict(dict)
- for row in result:
- key = row["db_link"], row["table_owner"]
- tn = self.normalize_name(row["table_name"])
- dblinks_owners[key][tn] = row["synonym_name"]
-
- if not dblinks_owners:
- # No synonym, do the plain thing
- return fn(self, connection, *args, **original_kw)
-
- data = {}
- for (dblink, table_owner), mapping in dblinks_owners.items():
- call_kw = {
- **original_kw,
- "schema": table_owner,
- "dblink": self.normalize_name(dblink),
- "filter_names": mapping.keys(),
- }
- call_result = fn(self, connection, *args, **call_kw)
- for (_, tn), value in call_result:
- synonym_name = self.normalize_name(mapping[tn])
- data[(schema, synonym_name)] = value
- return data.items()
-
- @reflection.cache
- def get_schema_names(self, connection, dblink=None, **kw):
- """Supported kw arguments are: ``dblink`` to reflect via a db link."""
- query = select(dictionary.all_users.c.username).order_by(
- dictionary.all_users.c.username
- )
- result = self._execute_reflection(
- connection, query, dblink, returns_long=False
- ).scalars()
- return [self.normalize_name(row) for row in result]
-
- @reflection.cache
- def get_table_names(self, connection, schema=None, dblink=None, **kw):
- """Supported kw arguments are: ``dblink`` to reflect via a db link."""
- # note that table_names() isn't loading DBLINKed or synonym'ed tables
- if schema is None:
- schema = self.default_schema_name
-
- den_schema = self.denormalize_schema_name(schema)
- if kw.get("oracle_resolve_synonyms", False):
- tables = (
- select(
- dictionary.all_tables.c.table_name,
- dictionary.all_tables.c.owner,
- dictionary.all_tables.c.iot_name,
- dictionary.all_tables.c.duration,
- dictionary.all_tables.c.tablespace_name,
- )
- .union_all(
- select(
- dictionary.all_synonyms.c.synonym_name.label(
- "table_name"
- ),
- dictionary.all_synonyms.c.owner,
- dictionary.all_tables.c.iot_name,
- dictionary.all_tables.c.duration,
- dictionary.all_tables.c.tablespace_name,
- )
- .select_from(dictionary.all_tables)
- .join(
- dictionary.all_synonyms,
- and_(
- dictionary.all_tables.c.table_name
- == dictionary.all_synonyms.c.table_name,
- dictionary.all_tables.c.owner
- == func.coalesce(
- dictionary.all_synonyms.c.table_owner,
- dictionary.all_synonyms.c.owner,
- ),
- ),
- )
- )
- .subquery("available_tables")
- )
- else:
- tables = dictionary.all_tables
-
- query = select(tables.c.table_name)
- if self.exclude_tablespaces:
- query = query.where(
- func.coalesce(
- tables.c.tablespace_name, "no tablespace"
- ).not_in(self.exclude_tablespaces)
- )
- query = query.where(
- tables.c.owner == den_schema,
- tables.c.iot_name.is_(null()),
- tables.c.duration.is_(null()),
- )
-
- # remove materialized views
- mat_query = select(
- dictionary.all_mviews.c.mview_name.label("table_name")
- ).where(dictionary.all_mviews.c.owner == den_schema)
-
- query = (
- query.except_all(mat_query)
- if self._supports_except_all
- else query.except_(mat_query)
- )
-
- result = self._execute_reflection(
- connection, query, dblink, returns_long=False
- ).scalars()
- return [self.normalize_name(row) for row in result]
-
- @reflection.cache
- def get_temp_table_names(self, connection, dblink=None, **kw):
- """Supported kw arguments are: ``dblink`` to reflect via a db link."""
- schema = self.denormalize_schema_name(self.default_schema_name)
-
- query = select(dictionary.all_tables.c.table_name)
- if self.exclude_tablespaces:
- query = query.where(
- func.coalesce(
- dictionary.all_tables.c.tablespace_name, "no tablespace"
- ).not_in(self.exclude_tablespaces)
- )
- query = query.where(
- dictionary.all_tables.c.owner == schema,
- dictionary.all_tables.c.iot_name.is_(null()),
- dictionary.all_tables.c.duration.is_not(null()),
- )
-
- result = self._execute_reflection(
- connection, query, dblink, returns_long=False
- ).scalars()
- return [self.normalize_name(row) for row in result]
-
- @reflection.cache
- def get_materialized_view_names(
- self, connection, schema=None, dblink=None, _normalize=True, **kw
- ):
- """Supported kw arguments are: ``dblink`` to reflect via a db link."""
- if not schema:
- schema = self.default_schema_name
-
- query = select(dictionary.all_mviews.c.mview_name).where(
- dictionary.all_mviews.c.owner
- == self.denormalize_schema_name(schema)
- )
- result = self._execute_reflection(
- connection, query, dblink, returns_long=False
- ).scalars()
- if _normalize:
- return [self.normalize_name(row) for row in result]
- else:
- return result.all()
-
- @reflection.cache
- def get_view_names(self, connection, schema=None, dblink=None, **kw):
- """Supported kw arguments are: ``dblink`` to reflect via a db link."""
- if not schema:
- schema = self.default_schema_name
-
- query = select(dictionary.all_views.c.view_name).where(
- dictionary.all_views.c.owner
- == self.denormalize_schema_name(schema)
- )
- result = self._execute_reflection(
- connection, query, dblink, returns_long=False
- ).scalars()
- return [self.normalize_name(row) for row in result]
-
- @reflection.cache
- def get_sequence_names(self, connection, schema=None, dblink=None, **kw):
- """Supported kw arguments are: ``dblink`` to reflect via a db link."""
- if not schema:
- schema = self.default_schema_name
- query = select(dictionary.all_sequences.c.sequence_name).where(
- dictionary.all_sequences.c.sequence_owner
- == self.denormalize_schema_name(schema)
- )
-
- result = self._execute_reflection(
- connection, query, dblink, returns_long=False
- ).scalars()
- return [self.normalize_name(row) for row in result]
-
- def _value_or_raise(self, data, table, schema):
- table = self.normalize_name(str(table))
- try:
- return dict(data)[(schema, table)]
- except KeyError:
- raise exc.NoSuchTableError(
- f"{schema}.{table}" if schema else table
- ) from None
-
- def _prepare_filter_names(self, filter_names):
- if filter_names:
- fn = [self.denormalize_name(name) for name in filter_names]
- return True, {"filter_names": fn}
- else:
- return False, {}
-
- @reflection.cache
- def get_table_options(self, connection, table_name, schema=None, **kw):
- """Supported kw arguments are: ``dblink`` to reflect via a db link;
- ``oracle_resolve_synonyms`` to resolve names to synonyms
- """
- data = self.get_multi_table_options(
- connection,
- schema=schema,
- filter_names=[table_name],
- scope=ObjectScope.ANY,
- kind=ObjectKind.ANY,
- **kw,
- )
- return self._value_or_raise(data, table_name, schema)
-
- @lru_cache()
- def _table_options_query(
- self, owner, scope, kind, has_filter_names, has_mat_views
- ):
- query = select(
- dictionary.all_tables.c.table_name,
- dictionary.all_tables.c.compression,
- dictionary.all_tables.c.compress_for,
- ).where(dictionary.all_tables.c.owner == owner)
- if has_filter_names:
- query = query.where(
- dictionary.all_tables.c.table_name.in_(
- bindparam("filter_names")
- )
- )
- if scope is ObjectScope.DEFAULT:
- query = query.where(dictionary.all_tables.c.duration.is_(null()))
- elif scope is ObjectScope.TEMPORARY:
- query = query.where(
- dictionary.all_tables.c.duration.is_not(null())
- )
-
- if (
- has_mat_views
- and ObjectKind.TABLE in kind
- and ObjectKind.MATERIALIZED_VIEW not in kind
- ):
- # cant use EXCEPT ALL / MINUS here because we don't have an
- # excludable row vs. the query above
- # outerjoin + where null works better on oracle 21 but 11 does
- # not like it at all. this is the next best thing
-
- query = query.where(
- dictionary.all_tables.c.table_name.not_in(
- bindparam("mat_views")
- )
- )
- elif (
- ObjectKind.TABLE not in kind
- and ObjectKind.MATERIALIZED_VIEW in kind
- ):
- query = query.where(
- dictionary.all_tables.c.table_name.in_(bindparam("mat_views"))
- )
- return query
-
- @_handle_synonyms_decorator
- def get_multi_table_options(
- self,
- connection,
- *,
- schema,
- filter_names,
- scope,
- kind,
- dblink=None,
- **kw,
- ):
- """Supported kw arguments are: ``dblink`` to reflect via a db link;
- ``oracle_resolve_synonyms`` to resolve names to synonyms
- """
- owner = self.denormalize_schema_name(
- schema or self.default_schema_name
- )
-
- has_filter_names, params = self._prepare_filter_names(filter_names)
- has_mat_views = False
-
- if (
- ObjectKind.TABLE in kind
- and ObjectKind.MATERIALIZED_VIEW not in kind
- ):
- # see note in _table_options_query
- mat_views = self.get_materialized_view_names(
- connection, schema, dblink, _normalize=False, **kw
- )
- if mat_views:
- params["mat_views"] = mat_views
- has_mat_views = True
- elif (
- ObjectKind.TABLE not in kind
- and ObjectKind.MATERIALIZED_VIEW in kind
- ):
- mat_views = self.get_materialized_view_names(
- connection, schema, dblink, _normalize=False, **kw
- )
- params["mat_views"] = mat_views
-
- options = {}
- default = ReflectionDefaults.table_options
-
- if ObjectKind.TABLE in kind or ObjectKind.MATERIALIZED_VIEW in kind:
- query = self._table_options_query(
- owner, scope, kind, has_filter_names, has_mat_views
- )
- result = self._execute_reflection(
- connection, query, dblink, returns_long=False, params=params
- )
-
- for table, compression, compress_for in result:
- if compression == "ENABLED":
- data = {"oracle_compress": compress_for}
- else:
- data = default()
- options[(schema, self.normalize_name(table))] = data
- if ObjectKind.VIEW in kind and ObjectScope.DEFAULT in scope:
- # add the views (no temporary views)
- for view in self.get_view_names(connection, schema, dblink, **kw):
- if not filter_names or view in filter_names:
- options[(schema, view)] = default()
-
- return options.items()
-
- @reflection.cache
- def get_columns(self, connection, table_name, schema=None, **kw):
- """Supported kw arguments are: ``dblink`` to reflect via a db link;
- ``oracle_resolve_synonyms`` to resolve names to synonyms
- """
-
- data = self.get_multi_columns(
- connection,
- schema=schema,
- filter_names=[table_name],
- scope=ObjectScope.ANY,
- kind=ObjectKind.ANY,
- **kw,
- )
- return self._value_or_raise(data, table_name, schema)
-
- def _run_batches(
- self, connection, query, dblink, returns_long, mappings, all_objects
- ):
- each_batch = 500
- batches = list(all_objects)
- while batches:
- batch = batches[0:each_batch]
- batches[0:each_batch] = []
-
- result = self._execute_reflection(
- connection,
- query,
- dblink,
- returns_long=returns_long,
- params={"all_objects": batch},
- )
- if mappings:
- yield from result.mappings()
- else:
- yield from result
-
- @lru_cache()
- def _column_query(self, owner):
- all_cols = dictionary.all_tab_cols
- all_comments = dictionary.all_col_comments
- all_ids = dictionary.all_tab_identity_cols
-
- if self.server_version_info >= (12,):
- add_cols = (
- all_cols.c.default_on_null,
- sql.case(
- (all_ids.c.table_name.is_(None), sql.null()),
- else_=all_ids.c.generation_type
- + ","
- + all_ids.c.identity_options,
- ).label("identity_options"),
- )
- join_identity_cols = True
- else:
- add_cols = (
- sql.null().label("default_on_null"),
- sql.null().label("identity_options"),
- )
- join_identity_cols = False
-
- # NOTE: on oracle cannot create tables/views without columns and
- # a table cannot have all column hidden:
- # ORA-54039: table must have at least one column that is not invisible
- # all_tab_cols returns data for tables/views/mat-views.
- # all_tab_cols does not return recycled tables
-
- query = (
- select(
- all_cols.c.table_name,
- all_cols.c.column_name,
- all_cols.c.data_type,
- all_cols.c.char_length,
- all_cols.c.data_precision,
- all_cols.c.data_scale,
- all_cols.c.nullable,
- all_cols.c.data_default,
- all_comments.c.comments,
- all_cols.c.virtual_column,
- *add_cols,
- ).select_from(all_cols)
- # NOTE: all_col_comments has a row for each column even if no
- # comment is present, so a join could be performed, but there
- # seems to be no difference compared to an outer join
- .outerjoin(
- all_comments,
- and_(
- all_cols.c.table_name == all_comments.c.table_name,
- all_cols.c.column_name == all_comments.c.column_name,
- all_cols.c.owner == all_comments.c.owner,
- ),
- )
- )
- if join_identity_cols:
- query = query.outerjoin(
- all_ids,
- and_(
- all_cols.c.table_name == all_ids.c.table_name,
- all_cols.c.column_name == all_ids.c.column_name,
- all_cols.c.owner == all_ids.c.owner,
- ),
- )
-
- query = query.where(
- all_cols.c.table_name.in_(bindparam("all_objects")),
- all_cols.c.hidden_column == "NO",
- all_cols.c.owner == owner,
- ).order_by(all_cols.c.table_name, all_cols.c.column_id)
- return query
-
- @_handle_synonyms_decorator
- def get_multi_columns(
- self,
- connection,
- *,
- schema,
- filter_names,
- scope,
- kind,
- dblink=None,
- **kw,
- ):
- """Supported kw arguments are: ``dblink`` to reflect via a db link;
- ``oracle_resolve_synonyms`` to resolve names to synonyms
- """
- owner = self.denormalize_schema_name(
- schema or self.default_schema_name
- )
- query = self._column_query(owner)
-
- if (
- filter_names
- and kind is ObjectKind.ANY
- and scope is ObjectScope.ANY
- ):
- all_objects = [self.denormalize_name(n) for n in filter_names]
- else:
- all_objects = self._get_all_objects(
- connection, schema, scope, kind, filter_names, dblink, **kw
- )
-
- columns = defaultdict(list)
-
- # all_tab_cols.data_default is LONG
- result = self._run_batches(
- connection,
- query,
- dblink,
- returns_long=True,
- mappings=True,
- all_objects=all_objects,
- )
-
- def maybe_int(value):
- if isinstance(value, float) and value.is_integer():
- return int(value)
- else:
- return value
-
- remove_size = re.compile(r"\(\d+\)")
-
- for row_dict in result:
- table_name = self.normalize_name(row_dict["table_name"])
- orig_colname = row_dict["column_name"]
- colname = self.normalize_name(orig_colname)
- coltype = row_dict["data_type"]
- precision = maybe_int(row_dict["data_precision"])
-
- if coltype == "NUMBER":
- scale = maybe_int(row_dict["data_scale"])
- if precision is None and scale == 0:
- coltype = INTEGER()
- else:
- coltype = NUMBER(precision, scale)
- elif coltype == "FLOAT":
- # https://docs.oracle.com/cd/B14117_01/server.101/b10758/sqlqr06.htm
- if precision == 126:
- # The DOUBLE PRECISION datatype is a floating-point
- # number with binary precision 126.
- coltype = DOUBLE_PRECISION()
- elif precision == 63:
- # The REAL datatype is a floating-point number with a
- # binary precision of 63, or 18 decimal.
- coltype = REAL()
- else:
- # non standard precision
- coltype = FLOAT(binary_precision=precision)
-
- elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"):
- char_length = maybe_int(row_dict["char_length"])
- coltype = self.ischema_names.get(coltype)(char_length)
- elif "WITH TIME ZONE" in coltype:
- coltype = TIMESTAMP(timezone=True)
- elif "WITH LOCAL TIME ZONE" in coltype:
- coltype = TIMESTAMP(local_timezone=True)
- else:
- coltype = re.sub(remove_size, "", coltype)
- try:
- coltype = self.ischema_names[coltype]
- except KeyError:
- util.warn(
- "Did not recognize type '%s' of column '%s'"
- % (coltype, colname)
- )
- coltype = sqltypes.NULLTYPE
-
- default = row_dict["data_default"]
- if row_dict["virtual_column"] == "YES":
- computed = dict(sqltext=default)
- default = None
- else:
- computed = None
-
- identity_options = row_dict["identity_options"]
- if identity_options is not None:
- identity = self._parse_identity_options(
- identity_options, row_dict["default_on_null"]
- )
- default = None
- else:
- identity = None
-
- cdict = {
- "name": colname,
- "type": coltype,
- "nullable": row_dict["nullable"] == "Y",
- "default": default,
- "comment": row_dict["comments"],
- }
- if orig_colname.lower() == orig_colname:
- cdict["quote"] = True
- if computed is not None:
- cdict["computed"] = computed
- if identity is not None:
- cdict["identity"] = identity
-
- columns[(schema, table_name)].append(cdict)
-
- # NOTE: default not needed since all tables have columns
- # default = ReflectionDefaults.columns
- # return (
- # (key, value if value else default())
- # for key, value in columns.items()
- # )
- return columns.items()
-
- def _parse_identity_options(self, identity_options, default_on_null):
- # identity_options is a string that starts with 'ALWAYS,' or
- # 'BY DEFAULT,' and continues with
- # START WITH: 1, INCREMENT BY: 1, MAX_VALUE: 123, MIN_VALUE: 1,
- # CYCLE_FLAG: N, CACHE_SIZE: 1, ORDER_FLAG: N, SCALE_FLAG: N,
- # EXTEND_FLAG: N, SESSION_FLAG: N, KEEP_VALUE: N
- parts = [p.strip() for p in identity_options.split(",")]
- identity = {
- "always": parts[0] == "ALWAYS",
- "on_null": default_on_null == "YES",
- }
-
- for part in parts[1:]:
- option, value = part.split(":")
- value = value.strip()
-
- if "START WITH" in option:
- identity["start"] = int(value)
- elif "INCREMENT BY" in option:
- identity["increment"] = int(value)
- elif "MAX_VALUE" in option:
- identity["maxvalue"] = int(value)
- elif "MIN_VALUE" in option:
- identity["minvalue"] = int(value)
- elif "CYCLE_FLAG" in option:
- identity["cycle"] = value == "Y"
- elif "CACHE_SIZE" in option:
- identity["cache"] = int(value)
- elif "ORDER_FLAG" in option:
- identity["order"] = value == "Y"
- return identity
-
- @reflection.cache
- def get_table_comment(self, connection, table_name, schema=None, **kw):
- """Supported kw arguments are: ``dblink`` to reflect via a db link;
- ``oracle_resolve_synonyms`` to resolve names to synonyms
- """
- data = self.get_multi_table_comment(
- connection,
- schema=schema,
- filter_names=[table_name],
- scope=ObjectScope.ANY,
- kind=ObjectKind.ANY,
- **kw,
- )
- return self._value_or_raise(data, table_name, schema)
-
- @lru_cache()
- def _comment_query(self, owner, scope, kind, has_filter_names):
- # NOTE: all_tab_comments / all_mview_comments have a row for all
- # object even if they don't have comments
- queries = []
- if ObjectKind.TABLE in kind or ObjectKind.VIEW in kind:
- # all_tab_comments returns also plain views
- tbl_view = select(
- dictionary.all_tab_comments.c.table_name,
- dictionary.all_tab_comments.c.comments,
- ).where(
- dictionary.all_tab_comments.c.owner == owner,
- dictionary.all_tab_comments.c.table_name.not_like("BIN$%"),
- )
- if ObjectKind.VIEW not in kind:
- tbl_view = tbl_view.where(
- dictionary.all_tab_comments.c.table_type == "TABLE"
- )
- elif ObjectKind.TABLE not in kind:
- tbl_view = tbl_view.where(
- dictionary.all_tab_comments.c.table_type == "VIEW"
- )
- queries.append(tbl_view)
- if ObjectKind.MATERIALIZED_VIEW in kind:
- mat_view = select(
- dictionary.all_mview_comments.c.mview_name.label("table_name"),
- dictionary.all_mview_comments.c.comments,
- ).where(
- dictionary.all_mview_comments.c.owner == owner,
- dictionary.all_mview_comments.c.mview_name.not_like("BIN$%"),
- )
- queries.append(mat_view)
- if len(queries) == 1:
- query = queries[0]
- else:
- union = sql.union_all(*queries).subquery("tables_and_views")
- query = select(union.c.table_name, union.c.comments)
-
- name_col = query.selected_columns.table_name
-
- if scope in (ObjectScope.DEFAULT, ObjectScope.TEMPORARY):
- temp = "Y" if scope is ObjectScope.TEMPORARY else "N"
- # need distinct since materialized view are listed also
- # as tables in all_objects
- query = query.distinct().join(
- dictionary.all_objects,
- and_(
- dictionary.all_objects.c.owner == owner,
- dictionary.all_objects.c.object_name == name_col,
- dictionary.all_objects.c.temporary == temp,
- ),
- )
- if has_filter_names:
- query = query.where(name_col.in_(bindparam("filter_names")))
- return query
-
- @_handle_synonyms_decorator
- def get_multi_table_comment(
- self,
- connection,
- *,
- schema,
- filter_names,
- scope,
- kind,
- dblink=None,
- **kw,
- ):
- """Supported kw arguments are: ``dblink`` to reflect via a db link;
- ``oracle_resolve_synonyms`` to resolve names to synonyms
- """
- owner = self.denormalize_schema_name(
- schema or self.default_schema_name
- )
- has_filter_names, params = self._prepare_filter_names(filter_names)
- query = self._comment_query(owner, scope, kind, has_filter_names)
-
- result = self._execute_reflection(
- connection, query, dblink, returns_long=False, params=params
- )
- default = ReflectionDefaults.table_comment
- # materialized views by default seem to have a comment like
- # "snapshot table for snapshot owner.mat_view_name"
- ignore_mat_view = "snapshot table for snapshot "
- return (
- (
- (schema, self.normalize_name(table)),
- (
- {"text": comment}
- if comment is not None
- and not comment.startswith(ignore_mat_view)
- else default()
- ),
- )
- for table, comment in result
- )
-
- @reflection.cache
- def get_indexes(self, connection, table_name, schema=None, **kw):
- """Supported kw arguments are: ``dblink`` to reflect via a db link;
- ``oracle_resolve_synonyms`` to resolve names to synonyms
- """
- data = self.get_multi_indexes(
- connection,
- schema=schema,
- filter_names=[table_name],
- scope=ObjectScope.ANY,
- kind=ObjectKind.ANY,
- **kw,
- )
- return self._value_or_raise(data, table_name, schema)
-
- @lru_cache()
- def _index_query(self, owner):
- return (
- select(
- dictionary.all_ind_columns.c.table_name,
- dictionary.all_ind_columns.c.index_name,
- dictionary.all_ind_columns.c.column_name,
- dictionary.all_indexes.c.index_type,
- dictionary.all_indexes.c.uniqueness,
- dictionary.all_indexes.c.compression,
- dictionary.all_indexes.c.prefix_length,
- dictionary.all_ind_columns.c.descend,
- dictionary.all_ind_expressions.c.column_expression,
- )
- .select_from(dictionary.all_ind_columns)
- .join(
- dictionary.all_indexes,
- sql.and_(
- dictionary.all_ind_columns.c.index_name
- == dictionary.all_indexes.c.index_name,
- dictionary.all_ind_columns.c.index_owner
- == dictionary.all_indexes.c.owner,
- ),
- )
- .outerjoin(
- # NOTE: this adds about 20% to the query time. Using a
- # case expression with a scalar subquery only when needed
- # with the assumption that most indexes are not expression
- # would be faster but oracle does not like that with
- # LONG datatype. It errors with:
- # ORA-00997: illegal use of LONG datatype
- dictionary.all_ind_expressions,
- sql.and_(
- dictionary.all_ind_expressions.c.index_name
- == dictionary.all_ind_columns.c.index_name,
- dictionary.all_ind_expressions.c.index_owner
- == dictionary.all_ind_columns.c.index_owner,
- dictionary.all_ind_expressions.c.column_position
- == dictionary.all_ind_columns.c.column_position,
- ),
- )
- .where(
- dictionary.all_indexes.c.table_owner == owner,
- dictionary.all_indexes.c.table_name.in_(
- bindparam("all_objects")
- ),
- )
- .order_by(
- dictionary.all_ind_columns.c.index_name,
- dictionary.all_ind_columns.c.column_position,
- )
- )
-
- @reflection.flexi_cache(
- ("schema", InternalTraversal.dp_string),
- ("dblink", InternalTraversal.dp_string),
- ("all_objects", InternalTraversal.dp_string_list),
- )
- def _get_indexes_rows(self, connection, schema, dblink, all_objects, **kw):
- owner = self.denormalize_schema_name(
- schema or self.default_schema_name
- )
-
- query = self._index_query(owner)
-
- pks = {
- row_dict["constraint_name"]
- for row_dict in self._get_all_constraint_rows(
- connection, schema, dblink, all_objects, **kw
- )
- if row_dict["constraint_type"] == "P"
- }
-
- # all_ind_expressions.column_expression is LONG
- result = self._run_batches(
- connection,
- query,
- dblink,
- returns_long=True,
- mappings=True,
- all_objects=all_objects,
- )
-
- return [
- row_dict
- for row_dict in result
- if row_dict["index_name"] not in pks
- ]
-
- @_handle_synonyms_decorator
- def get_multi_indexes(
- self,
- connection,
- *,
- schema,
- filter_names,
- scope,
- kind,
- dblink=None,
- **kw,
- ):
- """Supported kw arguments are: ``dblink`` to reflect via a db link;
- ``oracle_resolve_synonyms`` to resolve names to synonyms
- """
- all_objects = self._get_all_objects(
- connection, schema, scope, kind, filter_names, dblink, **kw
- )
-
- uniqueness = {"NONUNIQUE": False, "UNIQUE": True}
- enabled = {"DISABLED": False, "ENABLED": True}
- is_bitmap = {"BITMAP", "FUNCTION-BASED BITMAP"}
-
- indexes = defaultdict(dict)
-
- for row_dict in self._get_indexes_rows(
- connection, schema, dblink, all_objects, **kw
- ):
- index_name = self.normalize_name(row_dict["index_name"])
- table_name = self.normalize_name(row_dict["table_name"])
- table_indexes = indexes[(schema, table_name)]
-
- if index_name not in table_indexes:
- table_indexes[index_name] = index_dict = {
- "name": index_name,
- "column_names": [],
- "dialect_options": {},
- "unique": uniqueness.get(row_dict["uniqueness"], False),
- }
- do = index_dict["dialect_options"]
- if row_dict["index_type"] in is_bitmap:
- do["oracle_bitmap"] = True
- if enabled.get(row_dict["compression"], False):
- do["oracle_compress"] = row_dict["prefix_length"]
-
- else:
- index_dict = table_indexes[index_name]
-
- expr = row_dict["column_expression"]
- if expr is not None:
- index_dict["column_names"].append(None)
- if "expressions" in index_dict:
- index_dict["expressions"].append(expr)
- else:
- index_dict["expressions"] = index_dict["column_names"][:-1]
- index_dict["expressions"].append(expr)
-
- if row_dict["descend"].lower() != "asc":
- assert row_dict["descend"].lower() == "desc"
- cs = index_dict.setdefault("column_sorting", {})
- cs[expr] = ("desc",)
- else:
- assert row_dict["descend"].lower() == "asc"
- cn = self.normalize_name(row_dict["column_name"])
- index_dict["column_names"].append(cn)
- if "expressions" in index_dict:
- index_dict["expressions"].append(cn)
-
- default = ReflectionDefaults.indexes
-
- return (
- (key, list(indexes[key].values()) if key in indexes else default())
- for key in (
- (schema, self.normalize_name(obj_name))
- for obj_name in all_objects
- )
- )
-
- @reflection.cache
- def get_pk_constraint(self, connection, table_name, schema=None, **kw):
- """Supported kw arguments are: ``dblink`` to reflect via a db link;
- ``oracle_resolve_synonyms`` to resolve names to synonyms
- """
- data = self.get_multi_pk_constraint(
- connection,
- schema=schema,
- filter_names=[table_name],
- scope=ObjectScope.ANY,
- kind=ObjectKind.ANY,
- **kw,
- )
- return self._value_or_raise(data, table_name, schema)
-
- @lru_cache()
- def _constraint_query(self, owner):
- local = dictionary.all_cons_columns.alias("local")
- remote = dictionary.all_cons_columns.alias("remote")
- return (
- select(
- dictionary.all_constraints.c.table_name,
- dictionary.all_constraints.c.constraint_type,
- dictionary.all_constraints.c.constraint_name,
- local.c.column_name.label("local_column"),
- remote.c.table_name.label("remote_table"),
- remote.c.column_name.label("remote_column"),
- remote.c.owner.label("remote_owner"),
- dictionary.all_constraints.c.search_condition,
- dictionary.all_constraints.c.delete_rule,
- )
- .select_from(dictionary.all_constraints)
- .join(
- local,
- and_(
- local.c.owner == dictionary.all_constraints.c.owner,
- dictionary.all_constraints.c.constraint_name
- == local.c.constraint_name,
- ),
- )
- .outerjoin(
- remote,
- and_(
- dictionary.all_constraints.c.r_owner == remote.c.owner,
- dictionary.all_constraints.c.r_constraint_name
- == remote.c.constraint_name,
- or_(
- remote.c.position.is_(sql.null()),
- local.c.position == remote.c.position,
- ),
- ),
- )
- .where(
- dictionary.all_constraints.c.owner == owner,
- dictionary.all_constraints.c.table_name.in_(
- bindparam("all_objects")
- ),
- dictionary.all_constraints.c.constraint_type.in_(
- ("R", "P", "U", "C")
- ),
- )
- .order_by(
- dictionary.all_constraints.c.constraint_name, local.c.position
- )
- )
-
- @reflection.flexi_cache(
- ("schema", InternalTraversal.dp_string),
- ("dblink", InternalTraversal.dp_string),
- ("all_objects", InternalTraversal.dp_string_list),
- )
- def _get_all_constraint_rows(
- self, connection, schema, dblink, all_objects, **kw
- ):
- owner = self.denormalize_schema_name(
- schema or self.default_schema_name
- )
- query = self._constraint_query(owner)
-
- # since the result is cached a list must be created
- values = list(
- self._run_batches(
- connection,
- query,
- dblink,
- returns_long=False,
- mappings=True,
- all_objects=all_objects,
- )
- )
- return values
-
- @_handle_synonyms_decorator
- def get_multi_pk_constraint(
- self,
- connection,
- *,
- scope,
- schema,
- filter_names,
- kind,
- dblink=None,
- **kw,
- ):
- """Supported kw arguments are: ``dblink`` to reflect via a db link;
- ``oracle_resolve_synonyms`` to resolve names to synonyms
- """
- all_objects = self._get_all_objects(
- connection, schema, scope, kind, filter_names, dblink, **kw
- )
-
- primary_keys = defaultdict(dict)
- default = ReflectionDefaults.pk_constraint
-
- for row_dict in self._get_all_constraint_rows(
- connection, schema, dblink, all_objects, **kw
- ):
- if row_dict["constraint_type"] != "P":
- continue
- table_name = self.normalize_name(row_dict["table_name"])
- constraint_name = self.normalize_name(row_dict["constraint_name"])
- column_name = self.normalize_name(row_dict["local_column"])
-
- table_pk = primary_keys[(schema, table_name)]
- if not table_pk:
- table_pk["name"] = constraint_name
- table_pk["constrained_columns"] = [column_name]
- else:
- table_pk["constrained_columns"].append(column_name)
-
- return (
- (key, primary_keys[key] if key in primary_keys else default())
- for key in (
- (schema, self.normalize_name(obj_name))
- for obj_name in all_objects
- )
- )
-
- @reflection.cache
- def get_foreign_keys(
- self,
- connection,
- table_name,
- schema=None,
- **kw,
- ):
- """Supported kw arguments are: ``dblink`` to reflect via a db link;
- ``oracle_resolve_synonyms`` to resolve names to synonyms
- """
- data = self.get_multi_foreign_keys(
- connection,
- schema=schema,
- filter_names=[table_name],
- scope=ObjectScope.ANY,
- kind=ObjectKind.ANY,
- **kw,
- )
- return self._value_or_raise(data, table_name, schema)
-
- @_handle_synonyms_decorator
- def get_multi_foreign_keys(
- self,
- connection,
- *,
- scope,
- schema,
- filter_names,
- kind,
- dblink=None,
- **kw,
- ):
- """Supported kw arguments are: ``dblink`` to reflect via a db link;
- ``oracle_resolve_synonyms`` to resolve names to synonyms
- """
- all_objects = self._get_all_objects(
- connection, schema, scope, kind, filter_names, dblink, **kw
- )
-
- resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
-
- owner = self.denormalize_schema_name(
- schema or self.default_schema_name
- )
-
- all_remote_owners = set()
- fkeys = defaultdict(dict)
-
- for row_dict in self._get_all_constraint_rows(
- connection, schema, dblink, all_objects, **kw
- ):
- if row_dict["constraint_type"] != "R":
- continue
-
- table_name = self.normalize_name(row_dict["table_name"])
- constraint_name = self.normalize_name(row_dict["constraint_name"])
- table_fkey = fkeys[(schema, table_name)]
-
- assert constraint_name is not None
-
- local_column = self.normalize_name(row_dict["local_column"])
- remote_table = self.normalize_name(row_dict["remote_table"])
- remote_column = self.normalize_name(row_dict["remote_column"])
- remote_owner_orig = row_dict["remote_owner"]
- remote_owner = self.normalize_name(remote_owner_orig)
- if remote_owner_orig is not None:
- all_remote_owners.add(remote_owner_orig)
-
- if remote_table is None:
- # ticket 363
- if dblink and not dblink.startswith("@"):
- dblink = f"@{dblink}"
- util.warn(
- "Got 'None' querying 'table_name' from "
- f"all_cons_columns{dblink or ''} - does the user have "
- "proper rights to the table?"
- )
- continue
-
- if constraint_name not in table_fkey:
- table_fkey[constraint_name] = fkey = {
- "name": constraint_name,
- "constrained_columns": [],
- "referred_schema": None,
- "referred_table": remote_table,
- "referred_columns": [],
- "options": {},
- }
-
- if resolve_synonyms:
- # will be removed below
- fkey["_ref_schema"] = remote_owner
-
- if schema is not None or remote_owner_orig != owner:
- fkey["referred_schema"] = remote_owner
-
- delete_rule = row_dict["delete_rule"]
- if delete_rule != "NO ACTION":
- fkey["options"]["ondelete"] = delete_rule
-
- else:
- fkey = table_fkey[constraint_name]
-
- fkey["constrained_columns"].append(local_column)
- fkey["referred_columns"].append(remote_column)
-
- if resolve_synonyms and all_remote_owners:
- query = select(
- dictionary.all_synonyms.c.owner,
- dictionary.all_synonyms.c.table_name,
- dictionary.all_synonyms.c.table_owner,
- dictionary.all_synonyms.c.synonym_name,
- ).where(dictionary.all_synonyms.c.owner.in_(all_remote_owners))
-
- result = self._execute_reflection(
- connection, query, dblink, returns_long=False
- ).mappings()
-
- remote_owners_lut = {}
- for row in result:
- synonym_owner = self.normalize_name(row["owner"])
- table_name = self.normalize_name(row["table_name"])
-
- remote_owners_lut[(synonym_owner, table_name)] = (
- row["table_owner"],
- row["synonym_name"],
- )
-
- empty = (None, None)
- for table_fkeys in fkeys.values():
- for table_fkey in table_fkeys.values():
- key = (
- table_fkey.pop("_ref_schema"),
- table_fkey["referred_table"],
- )
- remote_owner, syn_name = remote_owners_lut.get(key, empty)
- if syn_name:
- sn = self.normalize_name(syn_name)
- table_fkey["referred_table"] = sn
- if schema is not None or remote_owner != owner:
- ro = self.normalize_name(remote_owner)
- table_fkey["referred_schema"] = ro
- else:
- table_fkey["referred_schema"] = None
- default = ReflectionDefaults.foreign_keys
-
- return (
- (key, list(fkeys[key].values()) if key in fkeys else default())
- for key in (
- (schema, self.normalize_name(obj_name))
- for obj_name in all_objects
- )
- )
-
- @reflection.cache
- def get_unique_constraints(
- self, connection, table_name, schema=None, **kw
- ):
- """Supported kw arguments are: ``dblink`` to reflect via a db link;
- ``oracle_resolve_synonyms`` to resolve names to synonyms
- """
- data = self.get_multi_unique_constraints(
- connection,
- schema=schema,
- filter_names=[table_name],
- scope=ObjectScope.ANY,
- kind=ObjectKind.ANY,
- **kw,
- )
- return self._value_or_raise(data, table_name, schema)
-
- @_handle_synonyms_decorator
- def get_multi_unique_constraints(
- self,
- connection,
- *,
- scope,
- schema,
- filter_names,
- kind,
- dblink=None,
- **kw,
- ):
- """Supported kw arguments are: ``dblink`` to reflect via a db link;
- ``oracle_resolve_synonyms`` to resolve names to synonyms
- """
- all_objects = self._get_all_objects(
- connection, schema, scope, kind, filter_names, dblink, **kw
- )
-
- unique_cons = defaultdict(dict)
-
- index_names = {
- row_dict["index_name"]
- for row_dict in self._get_indexes_rows(
- connection, schema, dblink, all_objects, **kw
- )
- }
-
- for row_dict in self._get_all_constraint_rows(
- connection, schema, dblink, all_objects, **kw
- ):
- if row_dict["constraint_type"] != "U":
- continue
- table_name = self.normalize_name(row_dict["table_name"])
- constraint_name_orig = row_dict["constraint_name"]
- constraint_name = self.normalize_name(constraint_name_orig)
- column_name = self.normalize_name(row_dict["local_column"])
- table_uc = unique_cons[(schema, table_name)]
-
- assert constraint_name is not None
-
- if constraint_name not in table_uc:
- table_uc[constraint_name] = uc = {
- "name": constraint_name,
- "column_names": [],
- "duplicates_index": (
- constraint_name
- if constraint_name_orig in index_names
- else None
- ),
- }
- else:
- uc = table_uc[constraint_name]
-
- uc["column_names"].append(column_name)
-
- default = ReflectionDefaults.unique_constraints
-
- return (
- (
- key,
- (
- list(unique_cons[key].values())
- if key in unique_cons
- else default()
- ),
- )
- for key in (
- (schema, self.normalize_name(obj_name))
- for obj_name in all_objects
- )
- )
-
- @reflection.cache
- def get_view_definition(
- self,
- connection,
- view_name,
- schema=None,
- dblink=None,
- **kw,
- ):
- """Supported kw arguments are: ``dblink`` to reflect via a db link;
- ``oracle_resolve_synonyms`` to resolve names to synonyms
- """
- if kw.get("oracle_resolve_synonyms", False):
- synonyms = self._get_synonyms(
- connection, schema, filter_names=[view_name], dblink=dblink
- )
- if synonyms:
- assert len(synonyms) == 1
- row_dict = synonyms[0]
- dblink = self.normalize_name(row_dict["db_link"])
- schema = row_dict["table_owner"]
- view_name = row_dict["table_name"]
-
- name = self.denormalize_name(view_name)
- owner = self.denormalize_schema_name(
- schema or self.default_schema_name
- )
- query = (
- select(dictionary.all_views.c.text)
- .where(
- dictionary.all_views.c.view_name == name,
- dictionary.all_views.c.owner == owner,
- )
- .union_all(
- select(dictionary.all_mviews.c.query).where(
- dictionary.all_mviews.c.mview_name == name,
- dictionary.all_mviews.c.owner == owner,
- )
- )
- )
-
- rp = self._execute_reflection(
- connection, query, dblink, returns_long=False
- ).scalar()
- if rp is None:
- raise exc.NoSuchTableError(
- f"{schema}.{view_name}" if schema else view_name
- )
- else:
- return rp
-
- @reflection.cache
- def get_check_constraints(
- self, connection, table_name, schema=None, include_all=False, **kw
- ):
- """Supported kw arguments are: ``dblink`` to reflect via a db link;
- ``oracle_resolve_synonyms`` to resolve names to synonyms
- """
- data = self.get_multi_check_constraints(
- connection,
- schema=schema,
- filter_names=[table_name],
- scope=ObjectScope.ANY,
- include_all=include_all,
- kind=ObjectKind.ANY,
- **kw,
- )
- return self._value_or_raise(data, table_name, schema)
-
- @_handle_synonyms_decorator
- def get_multi_check_constraints(
- self,
- connection,
- *,
- schema,
- filter_names,
- dblink=None,
- scope,
- kind,
- include_all=False,
- **kw,
- ):
- """Supported kw arguments are: ``dblink`` to reflect via a db link;
- ``oracle_resolve_synonyms`` to resolve names to synonyms
- """
- all_objects = self._get_all_objects(
- connection, schema, scope, kind, filter_names, dblink, **kw
- )
-
- not_null = re.compile(r"..+?. IS NOT NULL$")
-
- check_constraints = defaultdict(list)
-
- for row_dict in self._get_all_constraint_rows(
- connection, schema, dblink, all_objects, **kw
- ):
- if row_dict["constraint_type"] != "C":
- continue
- table_name = self.normalize_name(row_dict["table_name"])
- constraint_name = self.normalize_name(row_dict["constraint_name"])
- search_condition = row_dict["search_condition"]
-
- table_checks = check_constraints[(schema, table_name)]
- if constraint_name is not None and (
- include_all or not not_null.match(search_condition)
- ):
- table_checks.append(
- {"name": constraint_name, "sqltext": search_condition}
- )
-
- default = ReflectionDefaults.check_constraints
-
- return (
- (
- key,
- (
- check_constraints[key]
- if key in check_constraints
- else default()
- ),
- )
- for key in (
- (schema, self.normalize_name(obj_name))
- for obj_name in all_objects
- )
- )
-
- def _list_dblinks(self, connection, dblink=None):
- query = select(dictionary.all_db_links.c.db_link)
- links = self._execute_reflection(
- connection, query, dblink, returns_long=False
- ).scalars()
- return [self.normalize_name(link) for link in links]
-
-
-class _OuterJoinColumn(sql.ClauseElement):
- __visit_name__ = "outer_join_column"
-
- def __init__(self, column):
- self.column = column
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/cx_oracle.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/cx_oracle.py
deleted file mode 100644
index 9346224..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/cx_oracle.py
+++ /dev/null
@@ -1,1492 +0,0 @@
-# dialects/oracle/cx_oracle.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-
-r"""
-.. dialect:: oracle+cx_oracle
- :name: cx-Oracle
- :dbapi: cx_oracle
- :connectstring: oracle+cx_oracle://user:pass@hostname:port[/dbname][?service_name=<service>[&key=value&key=value...]]
- :url: https://oracle.github.io/python-cx_Oracle/
-
-DSN vs. Hostname connections
------------------------------
-
-cx_Oracle provides several methods of indicating the target database. The
-dialect translates from a series of different URL forms.
-
-Hostname Connections with Easy Connect Syntax
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-Given a hostname, port and service name of the target Oracle Database, for
-example from Oracle's `Easy Connect syntax
-<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#easy-connect-syntax-for-connection-strings>`_,
-then connect in SQLAlchemy using the ``service_name`` query string parameter::
-
- engine = create_engine("oracle+cx_oracle://scott:tiger@hostname:port/?service_name=myservice&encoding=UTF-8&nencoding=UTF-8")
-
-The `full Easy Connect syntax
-<https://www.oracle.com/pls/topic/lookup?ctx=dblatest&id=GUID-B0437826-43C1-49EC-A94D-B650B6A4A6EE>`_
-is not supported. Instead, use a ``tnsnames.ora`` file and connect using a
-DSN.
-
-Connections with tnsnames.ora or Oracle Cloud
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-Alternatively, if no port, database name, or ``service_name`` is provided, the
-dialect will use an Oracle DSN "connection string". This takes the "hostname"
-portion of the URL as the data source name. For example, if the
-``tnsnames.ora`` file contains a `Net Service Name
-<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#net-service-names-for-connection-strings>`_
-of ``myalias`` as below::
-
- myalias =
- (DESCRIPTION =
- (ADDRESS = (PROTOCOL = TCP)(HOST = mymachine.example.com)(PORT = 1521))
- (CONNECT_DATA =
- (SERVER = DEDICATED)
- (SERVICE_NAME = orclpdb1)
- )
- )
-
-The cx_Oracle dialect connects to this database service when ``myalias`` is the
-hostname portion of the URL, without specifying a port, database name or
-``service_name``::
-
- engine = create_engine("oracle+cx_oracle://scott:tiger@myalias/?encoding=UTF-8&nencoding=UTF-8")
-
-Users of Oracle Cloud should use this syntax and also configure the cloud
-wallet as shown in cx_Oracle documentation `Connecting to Autononmous Databases
-<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#connecting-to-autononmous-databases>`_.
-
-SID Connections
-^^^^^^^^^^^^^^^
-
-To use Oracle's obsolete SID connection syntax, the SID can be passed in a
-"database name" portion of the URL as below::
-
- engine = create_engine("oracle+cx_oracle://scott:tiger@hostname:1521/dbname?encoding=UTF-8&nencoding=UTF-8")
-
-Above, the DSN passed to cx_Oracle is created by ``cx_Oracle.makedsn()`` as
-follows::
-
- >>> import cx_Oracle
- >>> cx_Oracle.makedsn("hostname", 1521, sid="dbname")
- '(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=hostname)(PORT=1521))(CONNECT_DATA=(SID=dbname)))'
-
-Passing cx_Oracle connect arguments
------------------------------------
-
-Additional connection arguments can usually be passed via the URL
-query string; particular symbols like ``cx_Oracle.SYSDBA`` are intercepted
-and converted to the correct symbol::
-
- e = create_engine(
- "oracle+cx_oracle://user:pass@dsn?encoding=UTF-8&nencoding=UTF-8&mode=SYSDBA&events=true")
-
-.. versionchanged:: 1.3 the cx_oracle dialect now accepts all argument names
- within the URL string itself, to be passed to the cx_Oracle DBAPI. As
- was the case earlier but not correctly documented, the
- :paramref:`_sa.create_engine.connect_args` parameter also accepts all
- cx_Oracle DBAPI connect arguments.
-
-To pass arguments directly to ``.connect()`` without using the query
-string, use the :paramref:`_sa.create_engine.connect_args` dictionary.
-Any cx_Oracle parameter value and/or constant may be passed, such as::
-
- import cx_Oracle
- e = create_engine(
- "oracle+cx_oracle://user:pass@dsn",
- connect_args={
- "encoding": "UTF-8",
- "nencoding": "UTF-8",
- "mode": cx_Oracle.SYSDBA,
- "events": True
- }
- )
-
-Note that the default value for ``encoding`` and ``nencoding`` was changed to
-"UTF-8" in cx_Oracle 8.0 so these parameters can be omitted when using that
-version, or later.
-
-Options consumed by the SQLAlchemy cx_Oracle dialect outside of the driver
---------------------------------------------------------------------------
-
-There are also options that are consumed by the SQLAlchemy cx_oracle dialect
-itself. These options are always passed directly to :func:`_sa.create_engine`
-, such as::
-
- e = create_engine(
- "oracle+cx_oracle://user:pass@dsn", coerce_to_decimal=False)
-
-The parameters accepted by the cx_oracle dialect are as follows:
-
-* ``arraysize`` - set the cx_oracle.arraysize value on cursors; defaults
- to ``None``, indicating that the driver default should be used (typically
- the value is 100). This setting controls how many rows are buffered when
- fetching rows, and can have a significant effect on performance when
- modified. The setting is used for both ``cx_Oracle`` as well as
- ``oracledb``.
-
- .. versionchanged:: 2.0.26 - changed the default value from 50 to None,
- to use the default value of the driver itself.
-
-* ``auto_convert_lobs`` - defaults to True; See :ref:`cx_oracle_lob`.
-
-* ``coerce_to_decimal`` - see :ref:`cx_oracle_numeric` for detail.
-
-* ``encoding_errors`` - see :ref:`cx_oracle_unicode_encoding_errors` for detail.
-
-.. _cx_oracle_sessionpool:
-
-Using cx_Oracle SessionPool
----------------------------
-
-The cx_Oracle library provides its own connection pool implementation that may
-be used in place of SQLAlchemy's pooling functionality. This can be achieved
-by using the :paramref:`_sa.create_engine.creator` parameter to provide a
-function that returns a new connection, along with setting
-:paramref:`_sa.create_engine.pool_class` to ``NullPool`` to disable
-SQLAlchemy's pooling::
-
- import cx_Oracle
- from sqlalchemy import create_engine
- from sqlalchemy.pool import NullPool
-
- pool = cx_Oracle.SessionPool(
- user="scott", password="tiger", dsn="orclpdb",
- min=2, max=5, increment=1, threaded=True,
- encoding="UTF-8", nencoding="UTF-8"
- )
-
- engine = create_engine("oracle+cx_oracle://", creator=pool.acquire, poolclass=NullPool)
-
-The above engine may then be used normally where cx_Oracle's pool handles
-connection pooling::
-
- with engine.connect() as conn:
- print(conn.scalar("select 1 FROM dual"))
-
-
-As well as providing a scalable solution for multi-user applications, the
-cx_Oracle session pool supports some Oracle features such as DRCP and
-`Application Continuity
-<https://cx-oracle.readthedocs.io/en/latest/user_guide/ha.html#application-continuity-ac>`_.
-
-Using Oracle Database Resident Connection Pooling (DRCP)
---------------------------------------------------------
-
-When using Oracle's `DRCP
-<https://www.oracle.com/pls/topic/lookup?ctx=dblatest&id=GUID-015CA8C1-2386-4626-855D-CC546DDC1086>`_,
-the best practice is to pass a connection class and "purity" when acquiring a
-connection from the SessionPool. Refer to the `cx_Oracle DRCP documentation
-<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#database-resident-connection-pooling-drcp>`_.
-
-This can be achieved by wrapping ``pool.acquire()``::
-
- import cx_Oracle
- from sqlalchemy import create_engine
- from sqlalchemy.pool import NullPool
-
- pool = cx_Oracle.SessionPool(
- user="scott", password="tiger", dsn="orclpdb",
- min=2, max=5, increment=1, threaded=True,
- encoding="UTF-8", nencoding="UTF-8"
- )
-
- def creator():
- return pool.acquire(cclass="MYCLASS", purity=cx_Oracle.ATTR_PURITY_SELF)
-
- engine = create_engine("oracle+cx_oracle://", creator=creator, poolclass=NullPool)
-
-The above engine may then be used normally where cx_Oracle handles session
-pooling and Oracle Database additionally uses DRCP::
-
- with engine.connect() as conn:
- print(conn.scalar("select 1 FROM dual"))
-
-.. _cx_oracle_unicode:
-
-Unicode
--------
-
-As is the case for all DBAPIs under Python 3, all strings are inherently
-Unicode strings. In all cases however, the driver requires an explicit
-encoding configuration.
-
-Ensuring the Correct Client Encoding
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-The long accepted standard for establishing client encoding for nearly all
-Oracle related software is via the `NLS_LANG <https://www.oracle.com/database/technologies/faq-nls-lang.html>`_
-environment variable. cx_Oracle like most other Oracle drivers will use
-this environment variable as the source of its encoding configuration. The
-format of this variable is idiosyncratic; a typical value would be
-``AMERICAN_AMERICA.AL32UTF8``.
-
-The cx_Oracle driver also supports a programmatic alternative which is to
-pass the ``encoding`` and ``nencoding`` parameters directly to its
-``.connect()`` function. These can be present in the URL as follows::
-
- engine = create_engine("oracle+cx_oracle://scott:tiger@orclpdb/?encoding=UTF-8&nencoding=UTF-8")
-
-For the meaning of the ``encoding`` and ``nencoding`` parameters, please
-consult
-`Characters Sets and National Language Support (NLS) <https://cx-oracle.readthedocs.io/en/latest/user_guide/globalization.html#globalization>`_.
-
-.. seealso::
-
- `Characters Sets and National Language Support (NLS) <https://cx-oracle.readthedocs.io/en/latest/user_guide/globalization.html#globalization>`_
- - in the cx_Oracle documentation.
-
-
-Unicode-specific Column datatypes
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-The Core expression language handles unicode data by use of the :class:`.Unicode`
-and :class:`.UnicodeText`
-datatypes. These types correspond to the VARCHAR2 and CLOB Oracle datatypes by
-default. When using these datatypes with Unicode data, it is expected that
-the Oracle database is configured with a Unicode-aware character set, as well
-as that the ``NLS_LANG`` environment variable is set appropriately, so that
-the VARCHAR2 and CLOB datatypes can accommodate the data.
-
-In the case that the Oracle database is not configured with a Unicode character
-set, the two options are to use the :class:`_types.NCHAR` and
-:class:`_oracle.NCLOB` datatypes explicitly, or to pass the flag
-``use_nchar_for_unicode=True`` to :func:`_sa.create_engine`,
-which will cause the
-SQLAlchemy dialect to use NCHAR/NCLOB for the :class:`.Unicode` /
-:class:`.UnicodeText` datatypes instead of VARCHAR/CLOB.
-
-.. versionchanged:: 1.3 The :class:`.Unicode` and :class:`.UnicodeText`
- datatypes now correspond to the ``VARCHAR2`` and ``CLOB`` Oracle datatypes
- unless the ``use_nchar_for_unicode=True`` is passed to the dialect
- when :func:`_sa.create_engine` is called.
-
-
-.. _cx_oracle_unicode_encoding_errors:
-
-Encoding Errors
-^^^^^^^^^^^^^^^
-
-For the unusual case that data in the Oracle database is present with a broken
-encoding, the dialect accepts a parameter ``encoding_errors`` which will be
-passed to Unicode decoding functions in order to affect how decoding errors are
-handled. The value is ultimately consumed by the Python `decode
-<https://docs.python.org/3/library/stdtypes.html#bytes.decode>`_ function, and
-is passed both via cx_Oracle's ``encodingErrors`` parameter consumed by
-``Cursor.var()``, as well as SQLAlchemy's own decoding function, as the
-cx_Oracle dialect makes use of both under different circumstances.
-
-.. versionadded:: 1.3.11
-
-
-.. _cx_oracle_setinputsizes:
-
-Fine grained control over cx_Oracle data binding performance with setinputsizes
--------------------------------------------------------------------------------
-
-The cx_Oracle DBAPI has a deep and fundamental reliance upon the usage of the
-DBAPI ``setinputsizes()`` call. The purpose of this call is to establish the
-datatypes that are bound to a SQL statement for Python values being passed as
-parameters. While virtually no other DBAPI assigns any use to the
-``setinputsizes()`` call, the cx_Oracle DBAPI relies upon it heavily in its
-interactions with the Oracle client interface, and in some scenarios it is not
-possible for SQLAlchemy to know exactly how data should be bound, as some
-settings can cause profoundly different performance characteristics, while
-altering the type coercion behavior at the same time.
-
-Users of the cx_Oracle dialect are **strongly encouraged** to read through
-cx_Oracle's list of built-in datatype symbols at
-https://cx-oracle.readthedocs.io/en/latest/api_manual/module.html#database-types.
-Note that in some cases, significant performance degradation can occur when
-using these types vs. not, in particular when specifying ``cx_Oracle.CLOB``.
-
-On the SQLAlchemy side, the :meth:`.DialectEvents.do_setinputsizes` event can
-be used both for runtime visibility (e.g. logging) of the setinputsizes step as
-well as to fully control how ``setinputsizes()`` is used on a per-statement
-basis.
-
-.. versionadded:: 1.2.9 Added :meth:`.DialectEvents.setinputsizes`
-
-
-Example 1 - logging all setinputsizes calls
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-The following example illustrates how to log the intermediary values from a
-SQLAlchemy perspective before they are converted to the raw ``setinputsizes()``
-parameter dictionary. The keys of the dictionary are :class:`.BindParameter`
-objects which have a ``.key`` and a ``.type`` attribute::
-
- from sqlalchemy import create_engine, event
-
- engine = create_engine("oracle+cx_oracle://scott:tiger@host/xe")
-
- @event.listens_for(engine, "do_setinputsizes")
- def _log_setinputsizes(inputsizes, cursor, statement, parameters, context):
- for bindparam, dbapitype in inputsizes.items():
- log.info(
- "Bound parameter name: %s SQLAlchemy type: %r "
- "DBAPI object: %s",
- bindparam.key, bindparam.type, dbapitype)
-
-Example 2 - remove all bindings to CLOB
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-The ``CLOB`` datatype in cx_Oracle incurs a significant performance overhead,
-however is set by default for the ``Text`` type within the SQLAlchemy 1.2
-series. This setting can be modified as follows::
-
- from sqlalchemy import create_engine, event
- from cx_Oracle import CLOB
-
- engine = create_engine("oracle+cx_oracle://scott:tiger@host/xe")
-
- @event.listens_for(engine, "do_setinputsizes")
- def _remove_clob(inputsizes, cursor, statement, parameters, context):
- for bindparam, dbapitype in list(inputsizes.items()):
- if dbapitype is CLOB:
- del inputsizes[bindparam]
-
-.. _cx_oracle_returning:
-
-RETURNING Support
------------------
-
-The cx_Oracle dialect implements RETURNING using OUT parameters.
-The dialect supports RETURNING fully.
-
-.. _cx_oracle_lob:
-
-LOB Datatypes
---------------
-
-LOB datatypes refer to the "large object" datatypes such as CLOB, NCLOB and
-BLOB. Modern versions of cx_Oracle and oracledb are optimized for these
-datatypes to be delivered as a single buffer. As such, SQLAlchemy makes use of
-these newer type handlers by default.
-
-To disable the use of newer type handlers and deliver LOB objects as classic
-buffered objects with a ``read()`` method, the parameter
-``auto_convert_lobs=False`` may be passed to :func:`_sa.create_engine`,
-which takes place only engine-wide.
-
-Two Phase Transactions Not Supported
--------------------------------------
-
-Two phase transactions are **not supported** under cx_Oracle due to poor
-driver support. As of cx_Oracle 6.0b1, the interface for
-two phase transactions has been changed to be more of a direct pass-through
-to the underlying OCI layer with less automation. The additional logic
-to support this system is not implemented in SQLAlchemy.
-
-.. _cx_oracle_numeric:
-
-Precision Numerics
-------------------
-
-SQLAlchemy's numeric types can handle receiving and returning values as Python
-``Decimal`` objects or float objects. When a :class:`.Numeric` object, or a
-subclass such as :class:`.Float`, :class:`_oracle.DOUBLE_PRECISION` etc. is in
-use, the :paramref:`.Numeric.asdecimal` flag determines if values should be
-coerced to ``Decimal`` upon return, or returned as float objects. To make
-matters more complicated under Oracle, Oracle's ``NUMBER`` type can also
-represent integer values if the "scale" is zero, so the Oracle-specific
-:class:`_oracle.NUMBER` type takes this into account as well.
-
-The cx_Oracle dialect makes extensive use of connection- and cursor-level
-"outputtypehandler" callables in order to coerce numeric values as requested.
-These callables are specific to the specific flavor of :class:`.Numeric` in
-use, as well as if no SQLAlchemy typing objects are present. There are
-observed scenarios where Oracle may sends incomplete or ambiguous information
-about the numeric types being returned, such as a query where the numeric types
-are buried under multiple levels of subquery. The type handlers do their best
-to make the right decision in all cases, deferring to the underlying cx_Oracle
-DBAPI for all those cases where the driver can make the best decision.
-
-When no typing objects are present, as when executing plain SQL strings, a
-default "outputtypehandler" is present which will generally return numeric
-values which specify precision and scale as Python ``Decimal`` objects. To
-disable this coercion to decimal for performance reasons, pass the flag
-``coerce_to_decimal=False`` to :func:`_sa.create_engine`::
-
- engine = create_engine("oracle+cx_oracle://dsn", coerce_to_decimal=False)
-
-The ``coerce_to_decimal`` flag only impacts the results of plain string
-SQL statements that are not otherwise associated with a :class:`.Numeric`
-SQLAlchemy type (or a subclass of such).
-
-.. versionchanged:: 1.2 The numeric handling system for cx_Oracle has been
- reworked to take advantage of newer cx_Oracle features as well
- as better integration of outputtypehandlers.
-
-""" # noqa
-from __future__ import annotations
-
-import decimal
-import random
-import re
-
-from . import base as oracle
-from .base import OracleCompiler
-from .base import OracleDialect
-from .base import OracleExecutionContext
-from .types import _OracleDateLiteralRender
-from ... import exc
-from ... import util
-from ...engine import cursor as _cursor
-from ...engine import interfaces
-from ...engine import processors
-from ...sql import sqltypes
-from ...sql._typing import is_sql_compiler
-
-# source:
-# https://github.com/oracle/python-cx_Oracle/issues/596#issuecomment-999243649
-_CX_ORACLE_MAGIC_LOB_SIZE = 131072
-
-
-class _OracleInteger(sqltypes.Integer):
- def get_dbapi_type(self, dbapi):
- # see https://github.com/oracle/python-cx_Oracle/issues/
- # 208#issuecomment-409715955
- return int
-
- def _cx_oracle_var(self, dialect, cursor, arraysize=None):
- cx_Oracle = dialect.dbapi
- return cursor.var(
- cx_Oracle.STRING,
- 255,
- arraysize=arraysize if arraysize is not None else cursor.arraysize,
- outconverter=int,
- )
-
- def _cx_oracle_outputtypehandler(self, dialect):
- def handler(cursor, name, default_type, size, precision, scale):
- return self._cx_oracle_var(dialect, cursor)
-
- return handler
-
-
-class _OracleNumeric(sqltypes.Numeric):
- is_number = False
-
- def bind_processor(self, dialect):
- if self.scale == 0:
- return None
- elif self.asdecimal:
- processor = processors.to_decimal_processor_factory(
- decimal.Decimal, self._effective_decimal_return_scale
- )
-
- def process(value):
- if isinstance(value, (int, float)):
- return processor(value)
- elif value is not None and value.is_infinite():
- return float(value)
- else:
- return value
-
- return process
- else:
- return processors.to_float
-
- def result_processor(self, dialect, coltype):
- return None
-
- def _cx_oracle_outputtypehandler(self, dialect):
- cx_Oracle = dialect.dbapi
-
- def handler(cursor, name, default_type, size, precision, scale):
- outconverter = None
-
- if precision:
- if self.asdecimal:
- if default_type == cx_Oracle.NATIVE_FLOAT:
- # receiving float and doing Decimal after the fact
- # allows for float("inf") to be handled
- type_ = default_type
- outconverter = decimal.Decimal
- else:
- type_ = decimal.Decimal
- else:
- if self.is_number and scale == 0:
- # integer. cx_Oracle is observed to handle the widest
- # variety of ints when no directives are passed,
- # from 5.2 to 7.0. See [ticket:4457]
- return None
- else:
- type_ = cx_Oracle.NATIVE_FLOAT
-
- else:
- if self.asdecimal:
- if default_type == cx_Oracle.NATIVE_FLOAT:
- type_ = default_type
- outconverter = decimal.Decimal
- else:
- type_ = decimal.Decimal
- else:
- if self.is_number and scale == 0:
- # integer. cx_Oracle is observed to handle the widest
- # variety of ints when no directives are passed,
- # from 5.2 to 7.0. See [ticket:4457]
- return None
- else:
- type_ = cx_Oracle.NATIVE_FLOAT
-
- return cursor.var(
- type_,
- 255,
- arraysize=cursor.arraysize,
- outconverter=outconverter,
- )
-
- return handler
-
-
-class _OracleUUID(sqltypes.Uuid):
- def get_dbapi_type(self, dbapi):
- return dbapi.STRING
-
-
-class _OracleBinaryFloat(_OracleNumeric):
- def get_dbapi_type(self, dbapi):
- return dbapi.NATIVE_FLOAT
-
-
-class _OracleBINARY_FLOAT(_OracleBinaryFloat, oracle.BINARY_FLOAT):
- pass
-
-
-class _OracleBINARY_DOUBLE(_OracleBinaryFloat, oracle.BINARY_DOUBLE):
- pass
-
-
-class _OracleNUMBER(_OracleNumeric):
- is_number = True
-
-
-class _CXOracleDate(oracle._OracleDate):
- def bind_processor(self, dialect):
- return None
-
- def result_processor(self, dialect, coltype):
- def process(value):
- if value is not None:
- return value.date()
- else:
- return value
-
- return process
-
-
-class _CXOracleTIMESTAMP(_OracleDateLiteralRender, sqltypes.TIMESTAMP):
- def literal_processor(self, dialect):
- return self._literal_processor_datetime(dialect)
-
-
-class _LOBDataType:
- pass
-
-
-# TODO: the names used across CHAR / VARCHAR / NCHAR / NVARCHAR
-# here are inconsistent and not very good
-class _OracleChar(sqltypes.CHAR):
- def get_dbapi_type(self, dbapi):
- return dbapi.FIXED_CHAR
-
-
-class _OracleNChar(sqltypes.NCHAR):
- def get_dbapi_type(self, dbapi):
- return dbapi.FIXED_NCHAR
-
-
-class _OracleUnicodeStringNCHAR(oracle.NVARCHAR2):
- def get_dbapi_type(self, dbapi):
- return dbapi.NCHAR
-
-
-class _OracleUnicodeStringCHAR(sqltypes.Unicode):
- def get_dbapi_type(self, dbapi):
- return dbapi.LONG_STRING
-
-
-class _OracleUnicodeTextNCLOB(_LOBDataType, oracle.NCLOB):
- def get_dbapi_type(self, dbapi):
- # previously, this was dbapi.NCLOB.
- # DB_TYPE_NVARCHAR will instead be passed to setinputsizes()
- # when this datatype is used.
- return dbapi.DB_TYPE_NVARCHAR
-
-
-class _OracleUnicodeTextCLOB(_LOBDataType, sqltypes.UnicodeText):
- def get_dbapi_type(self, dbapi):
- # previously, this was dbapi.CLOB.
- # DB_TYPE_NVARCHAR will instead be passed to setinputsizes()
- # when this datatype is used.
- return dbapi.DB_TYPE_NVARCHAR
-
-
-class _OracleText(_LOBDataType, sqltypes.Text):
- def get_dbapi_type(self, dbapi):
- # previously, this was dbapi.CLOB.
- # DB_TYPE_NVARCHAR will instead be passed to setinputsizes()
- # when this datatype is used.
- return dbapi.DB_TYPE_NVARCHAR
-
-
-class _OracleLong(_LOBDataType, oracle.LONG):
- def get_dbapi_type(self, dbapi):
- return dbapi.LONG_STRING
-
-
-class _OracleString(sqltypes.String):
- pass
-
-
-class _OracleEnum(sqltypes.Enum):
- def bind_processor(self, dialect):
- enum_proc = sqltypes.Enum.bind_processor(self, dialect)
-
- def process(value):
- raw_str = enum_proc(value)
- return raw_str
-
- return process
-
-
-class _OracleBinary(_LOBDataType, sqltypes.LargeBinary):
- def get_dbapi_type(self, dbapi):
- # previously, this was dbapi.BLOB.
- # DB_TYPE_RAW will instead be passed to setinputsizes()
- # when this datatype is used.
- return dbapi.DB_TYPE_RAW
-
- def bind_processor(self, dialect):
- return None
-
- def result_processor(self, dialect, coltype):
- if not dialect.auto_convert_lobs:
- return None
- else:
- return super().result_processor(dialect, coltype)
-
-
-class _OracleInterval(oracle.INTERVAL):
- def get_dbapi_type(self, dbapi):
- return dbapi.INTERVAL
-
-
-class _OracleRaw(oracle.RAW):
- pass
-
-
-class _OracleRowid(oracle.ROWID):
- def get_dbapi_type(self, dbapi):
- return dbapi.ROWID
-
-
-class OracleCompiler_cx_oracle(OracleCompiler):
- _oracle_cx_sql_compiler = True
-
- _oracle_returning = False
-
- # Oracle bind names can't start with digits or underscores.
- # currently we rely upon Oracle-specific quoting of bind names in most
- # cases. however for expanding params, the escape chars are used.
- # see #8708
- bindname_escape_characters = util.immutabledict(
- {
- "%": "P",
- "(": "A",
- ")": "Z",
- ":": "C",
- ".": "C",
- "[": "C",
- "]": "C",
- " ": "C",
- "\\": "C",
- "/": "C",
- "?": "C",
- }
- )
-
- def bindparam_string(self, name, **kw):
- quote = getattr(name, "quote", None)
- if (
- quote is True
- or quote is not False
- and self.preparer._bindparam_requires_quotes(name)
- # bind param quoting for Oracle doesn't work with post_compile
- # params. For those, the default bindparam_string will escape
- # special chars, and the appending of a number "_1" etc. will
- # take care of reserved words
- and not kw.get("post_compile", False)
- ):
- # interesting to note about expanding parameters - since the
- # new parameters take the form <paramname>_<int>, at least if
- # they are originally formed from reserved words, they no longer
- # need quoting :). names that include illegal characters
- # won't work however.
- quoted_name = '"%s"' % name
- kw["escaped_from"] = name
- name = quoted_name
- return OracleCompiler.bindparam_string(self, name, **kw)
-
- # TODO: we could likely do away with quoting altogether for
- # Oracle parameters and use the custom escaping here
- escaped_from = kw.get("escaped_from", None)
- if not escaped_from:
- if self._bind_translate_re.search(name):
- # not quite the translate use case as we want to
- # also get a quick boolean if we even found
- # unusual characters in the name
- new_name = self._bind_translate_re.sub(
- lambda m: self._bind_translate_chars[m.group(0)],
- name,
- )
- if new_name[0].isdigit() or new_name[0] == "_":
- new_name = "D" + new_name
- kw["escaped_from"] = name
- name = new_name
- elif name[0].isdigit() or name[0] == "_":
- new_name = "D" + name
- kw["escaped_from"] = name
- name = new_name
-
- return OracleCompiler.bindparam_string(self, name, **kw)
-
-
-class OracleExecutionContext_cx_oracle(OracleExecutionContext):
- out_parameters = None
-
- def _generate_out_parameter_vars(self):
- # check for has_out_parameters or RETURNING, create cx_Oracle.var
- # objects if so
- if self.compiled.has_out_parameters or self.compiled._oracle_returning:
- out_parameters = self.out_parameters
- assert out_parameters is not None
-
- len_params = len(self.parameters)
-
- quoted_bind_names = self.compiled.escaped_bind_names
- for bindparam in self.compiled.binds.values():
- if bindparam.isoutparam:
- name = self.compiled.bind_names[bindparam]
- type_impl = bindparam.type.dialect_impl(self.dialect)
-
- if hasattr(type_impl, "_cx_oracle_var"):
- out_parameters[name] = type_impl._cx_oracle_var(
- self.dialect, self.cursor, arraysize=len_params
- )
- else:
- dbtype = type_impl.get_dbapi_type(self.dialect.dbapi)
-
- cx_Oracle = self.dialect.dbapi
-
- assert cx_Oracle is not None
-
- if dbtype is None:
- raise exc.InvalidRequestError(
- "Cannot create out parameter for "
- "parameter "
- "%r - its type %r is not supported by"
- " cx_oracle" % (bindparam.key, bindparam.type)
- )
-
- # note this is an OUT parameter. Using
- # non-LOB datavalues with large unicode-holding
- # values causes the failure (both cx_Oracle and
- # oracledb):
- # ORA-22835: Buffer too small for CLOB to CHAR or
- # BLOB to RAW conversion (actual: 16507,
- # maximum: 4000)
- # [SQL: INSERT INTO long_text (x, y, z) VALUES
- # (:x, :y, :z) RETURNING long_text.x, long_text.y,
- # long_text.z INTO :ret_0, :ret_1, :ret_2]
- # so even for DB_TYPE_NVARCHAR we convert to a LOB
-
- if isinstance(type_impl, _LOBDataType):
- if dbtype == cx_Oracle.DB_TYPE_NVARCHAR:
- dbtype = cx_Oracle.NCLOB
- elif dbtype == cx_Oracle.DB_TYPE_RAW:
- dbtype = cx_Oracle.BLOB
- # other LOB types go in directly
-
- out_parameters[name] = self.cursor.var(
- dbtype,
- # this is fine also in oracledb_async since
- # the driver will await the read coroutine
- outconverter=lambda value: value.read(),
- arraysize=len_params,
- )
- elif (
- isinstance(type_impl, _OracleNumeric)
- and type_impl.asdecimal
- ):
- out_parameters[name] = self.cursor.var(
- decimal.Decimal,
- arraysize=len_params,
- )
-
- else:
- out_parameters[name] = self.cursor.var(
- dbtype, arraysize=len_params
- )
-
- for param in self.parameters:
- param[quoted_bind_names.get(name, name)] = (
- out_parameters[name]
- )
-
- def _generate_cursor_outputtype_handler(self):
- output_handlers = {}
-
- for keyname, name, objects, type_ in self.compiled._result_columns:
- handler = type_._cached_custom_processor(
- self.dialect,
- "cx_oracle_outputtypehandler",
- self._get_cx_oracle_type_handler,
- )
-
- if handler:
- denormalized_name = self.dialect.denormalize_name(keyname)
- output_handlers[denormalized_name] = handler
-
- if output_handlers:
- default_handler = self._dbapi_connection.outputtypehandler
-
- def output_type_handler(
- cursor, name, default_type, size, precision, scale
- ):
- if name in output_handlers:
- return output_handlers[name](
- cursor, name, default_type, size, precision, scale
- )
- else:
- return default_handler(
- cursor, name, default_type, size, precision, scale
- )
-
- self.cursor.outputtypehandler = output_type_handler
-
- def _get_cx_oracle_type_handler(self, impl):
- if hasattr(impl, "_cx_oracle_outputtypehandler"):
- return impl._cx_oracle_outputtypehandler(self.dialect)
- else:
- return None
-
- def pre_exec(self):
- super().pre_exec()
- if not getattr(self.compiled, "_oracle_cx_sql_compiler", False):
- return
-
- self.out_parameters = {}
-
- self._generate_out_parameter_vars()
-
- self._generate_cursor_outputtype_handler()
-
- def post_exec(self):
- if (
- self.compiled
- and is_sql_compiler(self.compiled)
- and self.compiled._oracle_returning
- ):
- initial_buffer = self.fetchall_for_returning(
- self.cursor, _internal=True
- )
-
- fetch_strategy = _cursor.FullyBufferedCursorFetchStrategy(
- self.cursor,
- [
- (entry.keyname, None)
- for entry in self.compiled._result_columns
- ],
- initial_buffer=initial_buffer,
- )
-
- self.cursor_fetch_strategy = fetch_strategy
-
- def create_cursor(self):
- c = self._dbapi_connection.cursor()
- if self.dialect.arraysize:
- c.arraysize = self.dialect.arraysize
-
- return c
-
- def fetchall_for_returning(self, cursor, *, _internal=False):
- compiled = self.compiled
- if (
- not _internal
- and compiled is None
- or not is_sql_compiler(compiled)
- or not compiled._oracle_returning
- ):
- raise NotImplementedError(
- "execution context was not prepared for Oracle RETURNING"
- )
-
- # create a fake cursor result from the out parameters. unlike
- # get_out_parameter_values(), the result-row handlers here will be
- # applied at the Result level
-
- numcols = len(self.out_parameters)
-
- # [stmt_result for stmt_result in outparam.values] == each
- # statement in executemany
- # [val for val in stmt_result] == each row for a particular
- # statement
- return list(
- zip(
- *[
- [
- val
- for stmt_result in self.out_parameters[
- f"ret_{j}"
- ].values
- for val in (stmt_result or ())
- ]
- for j in range(numcols)
- ]
- )
- )
-
- def get_out_parameter_values(self, out_param_names):
- # this method should not be called when the compiler has
- # RETURNING as we've turned the has_out_parameters flag set to
- # False.
- assert not self.compiled.returning
-
- return [
- self.dialect._paramval(self.out_parameters[name])
- for name in out_param_names
- ]
-
-
-class OracleDialect_cx_oracle(OracleDialect):
- supports_statement_cache = True
- execution_ctx_cls = OracleExecutionContext_cx_oracle
- statement_compiler = OracleCompiler_cx_oracle
-
- supports_sane_rowcount = True
- supports_sane_multi_rowcount = True
-
- insert_executemany_returning = True
- insert_executemany_returning_sort_by_parameter_order = True
- update_executemany_returning = True
- delete_executemany_returning = True
-
- bind_typing = interfaces.BindTyping.SETINPUTSIZES
-
- driver = "cx_oracle"
-
- colspecs = util.update_copy(
- OracleDialect.colspecs,
- {
- sqltypes.TIMESTAMP: _CXOracleTIMESTAMP,
- sqltypes.Numeric: _OracleNumeric,
- sqltypes.Float: _OracleNumeric,
- oracle.BINARY_FLOAT: _OracleBINARY_FLOAT,
- oracle.BINARY_DOUBLE: _OracleBINARY_DOUBLE,
- sqltypes.Integer: _OracleInteger,
- oracle.NUMBER: _OracleNUMBER,
- sqltypes.Date: _CXOracleDate,
- sqltypes.LargeBinary: _OracleBinary,
- sqltypes.Boolean: oracle._OracleBoolean,
- sqltypes.Interval: _OracleInterval,
- oracle.INTERVAL: _OracleInterval,
- sqltypes.Text: _OracleText,
- sqltypes.String: _OracleString,
- sqltypes.UnicodeText: _OracleUnicodeTextCLOB,
- sqltypes.CHAR: _OracleChar,
- sqltypes.NCHAR: _OracleNChar,
- sqltypes.Enum: _OracleEnum,
- oracle.LONG: _OracleLong,
- oracle.RAW: _OracleRaw,
- sqltypes.Unicode: _OracleUnicodeStringCHAR,
- sqltypes.NVARCHAR: _OracleUnicodeStringNCHAR,
- sqltypes.Uuid: _OracleUUID,
- oracle.NCLOB: _OracleUnicodeTextNCLOB,
- oracle.ROWID: _OracleRowid,
- },
- )
-
- execute_sequence_format = list
-
- _cx_oracle_threaded = None
-
- _cursor_var_unicode_kwargs = util.immutabledict()
-
- @util.deprecated_params(
- threaded=(
- "1.3",
- "The 'threaded' parameter to the cx_oracle/oracledb dialect "
- "is deprecated as a dialect-level argument, and will be removed "
- "in a future release. As of version 1.3, it defaults to False "
- "rather than True. The 'threaded' option can be passed to "
- "cx_Oracle directly in the URL query string passed to "
- ":func:`_sa.create_engine`.",
- )
- )
- def __init__(
- self,
- auto_convert_lobs=True,
- coerce_to_decimal=True,
- arraysize=None,
- encoding_errors=None,
- threaded=None,
- **kwargs,
- ):
- OracleDialect.__init__(self, **kwargs)
- self.arraysize = arraysize
- self.encoding_errors = encoding_errors
- if encoding_errors:
- self._cursor_var_unicode_kwargs = {
- "encodingErrors": encoding_errors
- }
- if threaded is not None:
- self._cx_oracle_threaded = threaded
- self.auto_convert_lobs = auto_convert_lobs
- self.coerce_to_decimal = coerce_to_decimal
- if self._use_nchar_for_unicode:
- self.colspecs = self.colspecs.copy()
- self.colspecs[sqltypes.Unicode] = _OracleUnicodeStringNCHAR
- self.colspecs[sqltypes.UnicodeText] = _OracleUnicodeTextNCLOB
-
- dbapi_module = self.dbapi
- self._load_version(dbapi_module)
-
- if dbapi_module is not None:
- # these constants will first be seen in SQLAlchemy datatypes
- # coming from the get_dbapi_type() method. We then
- # will place the following types into setinputsizes() calls
- # on each statement. Oracle constants that are not in this
- # list will not be put into setinputsizes().
- self.include_set_input_sizes = {
- dbapi_module.DATETIME,
- dbapi_module.DB_TYPE_NVARCHAR, # used for CLOB, NCLOB
- dbapi_module.DB_TYPE_RAW, # used for BLOB
- dbapi_module.NCLOB, # not currently used except for OUT param
- dbapi_module.CLOB, # not currently used except for OUT param
- dbapi_module.LOB, # not currently used
- dbapi_module.BLOB, # not currently used except for OUT param
- dbapi_module.NCHAR,
- dbapi_module.FIXED_NCHAR,
- dbapi_module.FIXED_CHAR,
- dbapi_module.TIMESTAMP,
- int, # _OracleInteger,
- # _OracleBINARY_FLOAT, _OracleBINARY_DOUBLE,
- dbapi_module.NATIVE_FLOAT,
- }
-
- self._paramval = lambda value: value.getvalue()
-
- def _load_version(self, dbapi_module):
- version = (0, 0, 0)
- if dbapi_module is not None:
- m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", dbapi_module.version)
- if m:
- version = tuple(
- int(x) for x in m.group(1, 2, 3) if x is not None
- )
- self.cx_oracle_ver = version
- if self.cx_oracle_ver < (8,) and self.cx_oracle_ver > (0, 0, 0):
- raise exc.InvalidRequestError(
- "cx_Oracle version 8 and above are supported"
- )
-
- @classmethod
- def import_dbapi(cls):
- import cx_Oracle
-
- return cx_Oracle
-
- def initialize(self, connection):
- super().initialize(connection)
- self._detect_decimal_char(connection)
-
- def get_isolation_level(self, dbapi_connection):
- # sources:
-
- # general idea of transaction id, have to start one, etc.
- # https://stackoverflow.com/questions/10711204/how-to-check-isoloation-level
-
- # how to decode xid cols from v$transaction to match
- # https://asktom.oracle.com/pls/apex/f?p=100:11:0::::P11_QUESTION_ID:9532779900346079444
-
- # Oracle tuple comparison without using IN:
- # https://www.sql-workbench.eu/comparison/tuple_comparison.html
-
- with dbapi_connection.cursor() as cursor:
- # this is the only way to ensure a transaction is started without
- # actually running DML. There's no way to see the configured
- # isolation level without getting it from v$transaction which
- # means transaction has to be started.
- outval = cursor.var(str)
- cursor.execute(
- """
- begin
- :trans_id := dbms_transaction.local_transaction_id( TRUE );
- end;
- """,
- {"trans_id": outval},
- )
- trans_id = outval.getvalue()
- xidusn, xidslot, xidsqn = trans_id.split(".", 2)
-
- cursor.execute(
- "SELECT CASE BITAND(t.flag, POWER(2, 28)) "
- "WHEN 0 THEN 'READ COMMITTED' "
- "ELSE 'SERIALIZABLE' END AS isolation_level "
- "FROM v$transaction t WHERE "
- "(t.xidusn, t.xidslot, t.xidsqn) = "
- "((:xidusn, :xidslot, :xidsqn))",
- {"xidusn": xidusn, "xidslot": xidslot, "xidsqn": xidsqn},
- )
- row = cursor.fetchone()
- if row is None:
- raise exc.InvalidRequestError(
- "could not retrieve isolation level"
- )
- result = row[0]
-
- return result
-
- def get_isolation_level_values(self, dbapi_connection):
- return super().get_isolation_level_values(dbapi_connection) + [
- "AUTOCOMMIT"
- ]
-
- def set_isolation_level(self, dbapi_connection, level):
- if level == "AUTOCOMMIT":
- dbapi_connection.autocommit = True
- else:
- dbapi_connection.autocommit = False
- dbapi_connection.rollback()
- with dbapi_connection.cursor() as cursor:
- cursor.execute(f"ALTER SESSION SET ISOLATION_LEVEL={level}")
-
- def _detect_decimal_char(self, connection):
- # we have the option to change this setting upon connect,
- # or just look at what it is upon connect and convert.
- # to minimize the chance of interference with changes to
- # NLS_TERRITORY or formatting behavior of the DB, we opt
- # to just look at it
-
- dbapi_connection = connection.connection
-
- with dbapi_connection.cursor() as cursor:
- # issue #8744
- # nls_session_parameters is not available in some Oracle
- # modes like "mount mode". But then, v$nls_parameters is not
- # available if the connection doesn't have SYSDBA priv.
- #
- # simplify the whole thing and just use the method that we were
- # doing in the test suite already, selecting a number
-
- def output_type_handler(
- cursor, name, defaultType, size, precision, scale
- ):
- return cursor.var(
- self.dbapi.STRING, 255, arraysize=cursor.arraysize
- )
-
- cursor.outputtypehandler = output_type_handler
- cursor.execute("SELECT 1.1 FROM DUAL")
- value = cursor.fetchone()[0]
-
- decimal_char = value.lstrip("0")[1]
- assert not decimal_char[0].isdigit()
-
- self._decimal_char = decimal_char
-
- if self._decimal_char != ".":
- _detect_decimal = self._detect_decimal
- _to_decimal = self._to_decimal
-
- self._detect_decimal = lambda value: _detect_decimal(
- value.replace(self._decimal_char, ".")
- )
- self._to_decimal = lambda value: _to_decimal(
- value.replace(self._decimal_char, ".")
- )
-
- def _detect_decimal(self, value):
- if "." in value:
- return self._to_decimal(value)
- else:
- return int(value)
-
- _to_decimal = decimal.Decimal
-
- def _generate_connection_outputtype_handler(self):
- """establish the default outputtypehandler established at the
- connection level.
-
- """
-
- dialect = self
- cx_Oracle = dialect.dbapi
-
- number_handler = _OracleNUMBER(
- asdecimal=True
- )._cx_oracle_outputtypehandler(dialect)
- float_handler = _OracleNUMBER(
- asdecimal=False
- )._cx_oracle_outputtypehandler(dialect)
-
- def output_type_handler(
- cursor, name, default_type, size, precision, scale
- ):
- if (
- default_type == cx_Oracle.NUMBER
- and default_type is not cx_Oracle.NATIVE_FLOAT
- ):
- if not dialect.coerce_to_decimal:
- return None
- elif precision == 0 and scale in (0, -127):
- # ambiguous type, this occurs when selecting
- # numbers from deep subqueries
- return cursor.var(
- cx_Oracle.STRING,
- 255,
- outconverter=dialect._detect_decimal,
- arraysize=cursor.arraysize,
- )
- elif precision and scale > 0:
- return number_handler(
- cursor, name, default_type, size, precision, scale
- )
- else:
- return float_handler(
- cursor, name, default_type, size, precision, scale
- )
-
- # if unicode options were specified, add a decoder, otherwise
- # cx_Oracle should return Unicode
- elif (
- dialect._cursor_var_unicode_kwargs
- and default_type
- in (
- cx_Oracle.STRING,
- cx_Oracle.FIXED_CHAR,
- )
- and default_type is not cx_Oracle.CLOB
- and default_type is not cx_Oracle.NCLOB
- ):
- return cursor.var(
- str,
- size,
- cursor.arraysize,
- **dialect._cursor_var_unicode_kwargs,
- )
-
- elif dialect.auto_convert_lobs and default_type in (
- cx_Oracle.CLOB,
- cx_Oracle.NCLOB,
- ):
- return cursor.var(
- cx_Oracle.DB_TYPE_NVARCHAR,
- _CX_ORACLE_MAGIC_LOB_SIZE,
- cursor.arraysize,
- **dialect._cursor_var_unicode_kwargs,
- )
-
- elif dialect.auto_convert_lobs and default_type in (
- cx_Oracle.BLOB,
- ):
- return cursor.var(
- cx_Oracle.DB_TYPE_RAW,
- _CX_ORACLE_MAGIC_LOB_SIZE,
- cursor.arraysize,
- )
-
- return output_type_handler
-
- def on_connect(self):
- output_type_handler = self._generate_connection_outputtype_handler()
-
- def on_connect(conn):
- conn.outputtypehandler = output_type_handler
-
- return on_connect
-
- def create_connect_args(self, url):
- opts = dict(url.query)
-
- for opt in ("use_ansi", "auto_convert_lobs"):
- if opt in opts:
- util.warn_deprecated(
- f"{self.driver} dialect option {opt!r} should only be "
- "passed to create_engine directly, not within the URL "
- "string",
- version="1.3",
- )
- util.coerce_kw_type(opts, opt, bool)
- setattr(self, opt, opts.pop(opt))
-
- database = url.database
- service_name = opts.pop("service_name", None)
- if database or service_name:
- # if we have a database, then we have a remote host
- port = url.port
- if port:
- port = int(port)
- else:
- port = 1521
-
- if database and service_name:
- raise exc.InvalidRequestError(
- '"service_name" option shouldn\'t '
- 'be used with a "database" part of the url'
- )
- if database:
- makedsn_kwargs = {"sid": database}
- if service_name:
- makedsn_kwargs = {"service_name": service_name}
-
- dsn = self.dbapi.makedsn(url.host, port, **makedsn_kwargs)
- else:
- # we have a local tnsname
- dsn = url.host
-
- if dsn is not None:
- opts["dsn"] = dsn
- if url.password is not None:
- opts["password"] = url.password
- if url.username is not None:
- opts["user"] = url.username
-
- if self._cx_oracle_threaded is not None:
- opts.setdefault("threaded", self._cx_oracle_threaded)
-
- def convert_cx_oracle_constant(value):
- if isinstance(value, str):
- try:
- int_val = int(value)
- except ValueError:
- value = value.upper()
- return getattr(self.dbapi, value)
- else:
- return int_val
- else:
- return value
-
- util.coerce_kw_type(opts, "mode", convert_cx_oracle_constant)
- util.coerce_kw_type(opts, "threaded", bool)
- util.coerce_kw_type(opts, "events", bool)
- util.coerce_kw_type(opts, "purity", convert_cx_oracle_constant)
- return ([], opts)
-
- def _get_server_version_info(self, connection):
- return tuple(int(x) for x in connection.connection.version.split("."))
-
- def is_disconnect(self, e, connection, cursor):
- (error,) = e.args
- if isinstance(
- e, (self.dbapi.InterfaceError, self.dbapi.DatabaseError)
- ) and "not connected" in str(e):
- return True
-
- if hasattr(error, "code") and error.code in {
- 28,
- 3114,
- 3113,
- 3135,
- 1033,
- 2396,
- }:
- # ORA-00028: your session has been killed
- # ORA-03114: not connected to ORACLE
- # ORA-03113: end-of-file on communication channel
- # ORA-03135: connection lost contact
- # ORA-01033: ORACLE initialization or shutdown in progress
- # ORA-02396: exceeded maximum idle time, please connect again
- # TODO: Others ?
- return True
-
- if re.match(r"^(?:DPI-1010|DPI-1080|DPY-1001|DPY-4011)", str(e)):
- # DPI-1010: not connected
- # DPI-1080: connection was closed by ORA-3113
- # python-oracledb's DPY-1001: not connected to database
- # python-oracledb's DPY-4011: the database or network closed the
- # connection
- # TODO: others?
- return True
-
- return False
-
- def create_xid(self):
- """create a two-phase transaction ID.
-
- this id will be passed to do_begin_twophase(), do_rollback_twophase(),
- do_commit_twophase(). its format is unspecified.
-
- """
-
- id_ = random.randint(0, 2**128)
- return (0x1234, "%032x" % id_, "%032x" % 9)
-
- def do_executemany(self, cursor, statement, parameters, context=None):
- if isinstance(parameters, tuple):
- parameters = list(parameters)
- cursor.executemany(statement, parameters)
-
- def do_begin_twophase(self, connection, xid):
- connection.connection.begin(*xid)
- connection.connection.info["cx_oracle_xid"] = xid
-
- def do_prepare_twophase(self, connection, xid):
- result = connection.connection.prepare()
- connection.info["cx_oracle_prepared"] = result
-
- def do_rollback_twophase(
- self, connection, xid, is_prepared=True, recover=False
- ):
- self.do_rollback(connection.connection)
- # TODO: need to end XA state here
-
- def do_commit_twophase(
- self, connection, xid, is_prepared=True, recover=False
- ):
- if not is_prepared:
- self.do_commit(connection.connection)
- else:
- if recover:
- raise NotImplementedError(
- "2pc recovery not implemented for cx_Oracle"
- )
- oci_prepared = connection.info["cx_oracle_prepared"]
- if oci_prepared:
- self.do_commit(connection.connection)
- # TODO: need to end XA state here
-
- def do_set_input_sizes(self, cursor, list_of_tuples, context):
- if self.positional:
- # not usually used, here to support if someone is modifying
- # the dialect to use positional style
- cursor.setinputsizes(
- *[dbtype for key, dbtype, sqltype in list_of_tuples]
- )
- else:
- collection = (
- (key, dbtype)
- for key, dbtype, sqltype in list_of_tuples
- if dbtype
- )
-
- cursor.setinputsizes(**{key: dbtype for key, dbtype in collection})
-
- def do_recover_twophase(self, connection):
- raise NotImplementedError(
- "recover two phase query for cx_Oracle not implemented"
- )
-
-
-dialect = OracleDialect_cx_oracle
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/dictionary.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/dictionary.py
deleted file mode 100644
index 63479b9..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/dictionary.py
+++ /dev/null
@@ -1,507 +0,0 @@
-# dialects/oracle/dictionary.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-from .types import DATE
-from .types import LONG
-from .types import NUMBER
-from .types import RAW
-from .types import VARCHAR2
-from ... import Column
-from ... import MetaData
-from ... import Table
-from ... import table
-from ...sql.sqltypes import CHAR
-
-# constants
-DB_LINK_PLACEHOLDER = "__$sa_dblink$__"
-# tables
-dual = table("dual")
-dictionary_meta = MetaData()
-
-# NOTE: all the dictionary_meta are aliases because oracle does not like
-# using the full table@dblink for every column in query, and complains with
-# ORA-00960: ambiguous column naming in select list
-all_tables = Table(
- "all_tables" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("owner", VARCHAR2(128), nullable=False),
- Column("table_name", VARCHAR2(128), nullable=False),
- Column("tablespace_name", VARCHAR2(30)),
- Column("cluster_name", VARCHAR2(128)),
- Column("iot_name", VARCHAR2(128)),
- Column("status", VARCHAR2(8)),
- Column("pct_free", NUMBER),
- Column("pct_used", NUMBER),
- Column("ini_trans", NUMBER),
- Column("max_trans", NUMBER),
- Column("initial_extent", NUMBER),
- Column("next_extent", NUMBER),
- Column("min_extents", NUMBER),
- Column("max_extents", NUMBER),
- Column("pct_increase", NUMBER),
- Column("freelists", NUMBER),
- Column("freelist_groups", NUMBER),
- Column("logging", VARCHAR2(3)),
- Column("backed_up", VARCHAR2(1)),
- Column("num_rows", NUMBER),
- Column("blocks", NUMBER),
- Column("empty_blocks", NUMBER),
- Column("avg_space", NUMBER),
- Column("chain_cnt", NUMBER),
- Column("avg_row_len", NUMBER),
- Column("avg_space_freelist_blocks", NUMBER),
- Column("num_freelist_blocks", NUMBER),
- Column("degree", VARCHAR2(10)),
- Column("instances", VARCHAR2(10)),
- Column("cache", VARCHAR2(5)),
- Column("table_lock", VARCHAR2(8)),
- Column("sample_size", NUMBER),
- Column("last_analyzed", DATE),
- Column("partitioned", VARCHAR2(3)),
- Column("iot_type", VARCHAR2(12)),
- Column("temporary", VARCHAR2(1)),
- Column("secondary", VARCHAR2(1)),
- Column("nested", VARCHAR2(3)),
- Column("buffer_pool", VARCHAR2(7)),
- Column("flash_cache", VARCHAR2(7)),
- Column("cell_flash_cache", VARCHAR2(7)),
- Column("row_movement", VARCHAR2(8)),
- Column("global_stats", VARCHAR2(3)),
- Column("user_stats", VARCHAR2(3)),
- Column("duration", VARCHAR2(15)),
- Column("skip_corrupt", VARCHAR2(8)),
- Column("monitoring", VARCHAR2(3)),
- Column("cluster_owner", VARCHAR2(128)),
- Column("dependencies", VARCHAR2(8)),
- Column("compression", VARCHAR2(8)),
- Column("compress_for", VARCHAR2(30)),
- Column("dropped", VARCHAR2(3)),
- Column("read_only", VARCHAR2(3)),
- Column("segment_created", VARCHAR2(3)),
- Column("result_cache", VARCHAR2(7)),
- Column("clustering", VARCHAR2(3)),
- Column("activity_tracking", VARCHAR2(23)),
- Column("dml_timestamp", VARCHAR2(25)),
- Column("has_identity", VARCHAR2(3)),
- Column("container_data", VARCHAR2(3)),
- Column("inmemory", VARCHAR2(8)),
- Column("inmemory_priority", VARCHAR2(8)),
- Column("inmemory_distribute", VARCHAR2(15)),
- Column("inmemory_compression", VARCHAR2(17)),
- Column("inmemory_duplicate", VARCHAR2(13)),
- Column("default_collation", VARCHAR2(100)),
- Column("duplicated", VARCHAR2(1)),
- Column("sharded", VARCHAR2(1)),
- Column("externally_sharded", VARCHAR2(1)),
- Column("externally_duplicated", VARCHAR2(1)),
- Column("external", VARCHAR2(3)),
- Column("hybrid", VARCHAR2(3)),
- Column("cellmemory", VARCHAR2(24)),
- Column("containers_default", VARCHAR2(3)),
- Column("container_map", VARCHAR2(3)),
- Column("extended_data_link", VARCHAR2(3)),
- Column("extended_data_link_map", VARCHAR2(3)),
- Column("inmemory_service", VARCHAR2(12)),
- Column("inmemory_service_name", VARCHAR2(1000)),
- Column("container_map_object", VARCHAR2(3)),
- Column("memoptimize_read", VARCHAR2(8)),
- Column("memoptimize_write", VARCHAR2(8)),
- Column("has_sensitive_column", VARCHAR2(3)),
- Column("admit_null", VARCHAR2(3)),
- Column("data_link_dml_enabled", VARCHAR2(3)),
- Column("logical_replication", VARCHAR2(8)),
-).alias("a_tables")
-
-all_views = Table(
- "all_views" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("owner", VARCHAR2(128), nullable=False),
- Column("view_name", VARCHAR2(128), nullable=False),
- Column("text_length", NUMBER),
- Column("text", LONG),
- Column("text_vc", VARCHAR2(4000)),
- Column("type_text_length", NUMBER),
- Column("type_text", VARCHAR2(4000)),
- Column("oid_text_length", NUMBER),
- Column("oid_text", VARCHAR2(4000)),
- Column("view_type_owner", VARCHAR2(128)),
- Column("view_type", VARCHAR2(128)),
- Column("superview_name", VARCHAR2(128)),
- Column("editioning_view", VARCHAR2(1)),
- Column("read_only", VARCHAR2(1)),
- Column("container_data", VARCHAR2(1)),
- Column("bequeath", VARCHAR2(12)),
- Column("origin_con_id", VARCHAR2(256)),
- Column("default_collation", VARCHAR2(100)),
- Column("containers_default", VARCHAR2(3)),
- Column("container_map", VARCHAR2(3)),
- Column("extended_data_link", VARCHAR2(3)),
- Column("extended_data_link_map", VARCHAR2(3)),
- Column("has_sensitive_column", VARCHAR2(3)),
- Column("admit_null", VARCHAR2(3)),
- Column("pdb_local_only", VARCHAR2(3)),
-).alias("a_views")
-
-all_sequences = Table(
- "all_sequences" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("sequence_owner", VARCHAR2(128), nullable=False),
- Column("sequence_name", VARCHAR2(128), nullable=False),
- Column("min_value", NUMBER),
- Column("max_value", NUMBER),
- Column("increment_by", NUMBER, nullable=False),
- Column("cycle_flag", VARCHAR2(1)),
- Column("order_flag", VARCHAR2(1)),
- Column("cache_size", NUMBER, nullable=False),
- Column("last_number", NUMBER, nullable=False),
- Column("scale_flag", VARCHAR2(1)),
- Column("extend_flag", VARCHAR2(1)),
- Column("sharded_flag", VARCHAR2(1)),
- Column("session_flag", VARCHAR2(1)),
- Column("keep_value", VARCHAR2(1)),
-).alias("a_sequences")
-
-all_users = Table(
- "all_users" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("username", VARCHAR2(128), nullable=False),
- Column("user_id", NUMBER, nullable=False),
- Column("created", DATE, nullable=False),
- Column("common", VARCHAR2(3)),
- Column("oracle_maintained", VARCHAR2(1)),
- Column("inherited", VARCHAR2(3)),
- Column("default_collation", VARCHAR2(100)),
- Column("implicit", VARCHAR2(3)),
- Column("all_shard", VARCHAR2(3)),
- Column("external_shard", VARCHAR2(3)),
-).alias("a_users")
-
-all_mviews = Table(
- "all_mviews" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("owner", VARCHAR2(128), nullable=False),
- Column("mview_name", VARCHAR2(128), nullable=False),
- Column("container_name", VARCHAR2(128), nullable=False),
- Column("query", LONG),
- Column("query_len", NUMBER(38)),
- Column("updatable", VARCHAR2(1)),
- Column("update_log", VARCHAR2(128)),
- Column("master_rollback_seg", VARCHAR2(128)),
- Column("master_link", VARCHAR2(128)),
- Column("rewrite_enabled", VARCHAR2(1)),
- Column("rewrite_capability", VARCHAR2(9)),
- Column("refresh_mode", VARCHAR2(6)),
- Column("refresh_method", VARCHAR2(8)),
- Column("build_mode", VARCHAR2(9)),
- Column("fast_refreshable", VARCHAR2(18)),
- Column("last_refresh_type", VARCHAR2(8)),
- Column("last_refresh_date", DATE),
- Column("last_refresh_end_time", DATE),
- Column("staleness", VARCHAR2(19)),
- Column("after_fast_refresh", VARCHAR2(19)),
- Column("unknown_prebuilt", VARCHAR2(1)),
- Column("unknown_plsql_func", VARCHAR2(1)),
- Column("unknown_external_table", VARCHAR2(1)),
- Column("unknown_consider_fresh", VARCHAR2(1)),
- Column("unknown_import", VARCHAR2(1)),
- Column("unknown_trusted_fd", VARCHAR2(1)),
- Column("compile_state", VARCHAR2(19)),
- Column("use_no_index", VARCHAR2(1)),
- Column("stale_since", DATE),
- Column("num_pct_tables", NUMBER),
- Column("num_fresh_pct_regions", NUMBER),
- Column("num_stale_pct_regions", NUMBER),
- Column("segment_created", VARCHAR2(3)),
- Column("evaluation_edition", VARCHAR2(128)),
- Column("unusable_before", VARCHAR2(128)),
- Column("unusable_beginning", VARCHAR2(128)),
- Column("default_collation", VARCHAR2(100)),
- Column("on_query_computation", VARCHAR2(1)),
- Column("auto", VARCHAR2(3)),
-).alias("a_mviews")
-
-all_tab_identity_cols = Table(
- "all_tab_identity_cols" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("owner", VARCHAR2(128), nullable=False),
- Column("table_name", VARCHAR2(128), nullable=False),
- Column("column_name", VARCHAR2(128), nullable=False),
- Column("generation_type", VARCHAR2(10)),
- Column("sequence_name", VARCHAR2(128), nullable=False),
- Column("identity_options", VARCHAR2(298)),
-).alias("a_tab_identity_cols")
-
-all_tab_cols = Table(
- "all_tab_cols" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("owner", VARCHAR2(128), nullable=False),
- Column("table_name", VARCHAR2(128), nullable=False),
- Column("column_name", VARCHAR2(128), nullable=False),
- Column("data_type", VARCHAR2(128)),
- Column("data_type_mod", VARCHAR2(3)),
- Column("data_type_owner", VARCHAR2(128)),
- Column("data_length", NUMBER, nullable=False),
- Column("data_precision", NUMBER),
- Column("data_scale", NUMBER),
- Column("nullable", VARCHAR2(1)),
- Column("column_id", NUMBER),
- Column("default_length", NUMBER),
- Column("data_default", LONG),
- Column("num_distinct", NUMBER),
- Column("low_value", RAW(1000)),
- Column("high_value", RAW(1000)),
- Column("density", NUMBER),
- Column("num_nulls", NUMBER),
- Column("num_buckets", NUMBER),
- Column("last_analyzed", DATE),
- Column("sample_size", NUMBER),
- Column("character_set_name", VARCHAR2(44)),
- Column("char_col_decl_length", NUMBER),
- Column("global_stats", VARCHAR2(3)),
- Column("user_stats", VARCHAR2(3)),
- Column("avg_col_len", NUMBER),
- Column("char_length", NUMBER),
- Column("char_used", VARCHAR2(1)),
- Column("v80_fmt_image", VARCHAR2(3)),
- Column("data_upgraded", VARCHAR2(3)),
- Column("hidden_column", VARCHAR2(3)),
- Column("virtual_column", VARCHAR2(3)),
- Column("segment_column_id", NUMBER),
- Column("internal_column_id", NUMBER, nullable=False),
- Column("histogram", VARCHAR2(15)),
- Column("qualified_col_name", VARCHAR2(4000)),
- Column("user_generated", VARCHAR2(3)),
- Column("default_on_null", VARCHAR2(3)),
- Column("identity_column", VARCHAR2(3)),
- Column("evaluation_edition", VARCHAR2(128)),
- Column("unusable_before", VARCHAR2(128)),
- Column("unusable_beginning", VARCHAR2(128)),
- Column("collation", VARCHAR2(100)),
- Column("collated_column_id", NUMBER),
-).alias("a_tab_cols")
-
-all_tab_comments = Table(
- "all_tab_comments" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("owner", VARCHAR2(128), nullable=False),
- Column("table_name", VARCHAR2(128), nullable=False),
- Column("table_type", VARCHAR2(11)),
- Column("comments", VARCHAR2(4000)),
- Column("origin_con_id", NUMBER),
-).alias("a_tab_comments")
-
-all_col_comments = Table(
- "all_col_comments" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("owner", VARCHAR2(128), nullable=False),
- Column("table_name", VARCHAR2(128), nullable=False),
- Column("column_name", VARCHAR2(128), nullable=False),
- Column("comments", VARCHAR2(4000)),
- Column("origin_con_id", NUMBER),
-).alias("a_col_comments")
-
-all_mview_comments = Table(
- "all_mview_comments" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("owner", VARCHAR2(128), nullable=False),
- Column("mview_name", VARCHAR2(128), nullable=False),
- Column("comments", VARCHAR2(4000)),
-).alias("a_mview_comments")
-
-all_ind_columns = Table(
- "all_ind_columns" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("index_owner", VARCHAR2(128), nullable=False),
- Column("index_name", VARCHAR2(128), nullable=False),
- Column("table_owner", VARCHAR2(128), nullable=False),
- Column("table_name", VARCHAR2(128), nullable=False),
- Column("column_name", VARCHAR2(4000)),
- Column("column_position", NUMBER, nullable=False),
- Column("column_length", NUMBER, nullable=False),
- Column("char_length", NUMBER),
- Column("descend", VARCHAR2(4)),
- Column("collated_column_id", NUMBER),
-).alias("a_ind_columns")
-
-all_indexes = Table(
- "all_indexes" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("owner", VARCHAR2(128), nullable=False),
- Column("index_name", VARCHAR2(128), nullable=False),
- Column("index_type", VARCHAR2(27)),
- Column("table_owner", VARCHAR2(128), nullable=False),
- Column("table_name", VARCHAR2(128), nullable=False),
- Column("table_type", CHAR(11)),
- Column("uniqueness", VARCHAR2(9)),
- Column("compression", VARCHAR2(13)),
- Column("prefix_length", NUMBER),
- Column("tablespace_name", VARCHAR2(30)),
- Column("ini_trans", NUMBER),
- Column("max_trans", NUMBER),
- Column("initial_extent", NUMBER),
- Column("next_extent", NUMBER),
- Column("min_extents", NUMBER),
- Column("max_extents", NUMBER),
- Column("pct_increase", NUMBER),
- Column("pct_threshold", NUMBER),
- Column("include_column", NUMBER),
- Column("freelists", NUMBER),
- Column("freelist_groups", NUMBER),
- Column("pct_free", NUMBER),
- Column("logging", VARCHAR2(3)),
- Column("blevel", NUMBER),
- Column("leaf_blocks", NUMBER),
- Column("distinct_keys", NUMBER),
- Column("avg_leaf_blocks_per_key", NUMBER),
- Column("avg_data_blocks_per_key", NUMBER),
- Column("clustering_factor", NUMBER),
- Column("status", VARCHAR2(8)),
- Column("num_rows", NUMBER),
- Column("sample_size", NUMBER),
- Column("last_analyzed", DATE),
- Column("degree", VARCHAR2(40)),
- Column("instances", VARCHAR2(40)),
- Column("partitioned", VARCHAR2(3)),
- Column("temporary", VARCHAR2(1)),
- Column("generated", VARCHAR2(1)),
- Column("secondary", VARCHAR2(1)),
- Column("buffer_pool", VARCHAR2(7)),
- Column("flash_cache", VARCHAR2(7)),
- Column("cell_flash_cache", VARCHAR2(7)),
- Column("user_stats", VARCHAR2(3)),
- Column("duration", VARCHAR2(15)),
- Column("pct_direct_access", NUMBER),
- Column("ityp_owner", VARCHAR2(128)),
- Column("ityp_name", VARCHAR2(128)),
- Column("parameters", VARCHAR2(1000)),
- Column("global_stats", VARCHAR2(3)),
- Column("domidx_status", VARCHAR2(12)),
- Column("domidx_opstatus", VARCHAR2(6)),
- Column("funcidx_status", VARCHAR2(8)),
- Column("join_index", VARCHAR2(3)),
- Column("iot_redundant_pkey_elim", VARCHAR2(3)),
- Column("dropped", VARCHAR2(3)),
- Column("visibility", VARCHAR2(9)),
- Column("domidx_management", VARCHAR2(14)),
- Column("segment_created", VARCHAR2(3)),
- Column("orphaned_entries", VARCHAR2(3)),
- Column("indexing", VARCHAR2(7)),
- Column("auto", VARCHAR2(3)),
-).alias("a_indexes")
-
-all_ind_expressions = Table(
- "all_ind_expressions" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("index_owner", VARCHAR2(128), nullable=False),
- Column("index_name", VARCHAR2(128), nullable=False),
- Column("table_owner", VARCHAR2(128), nullable=False),
- Column("table_name", VARCHAR2(128), nullable=False),
- Column("column_expression", LONG),
- Column("column_position", NUMBER, nullable=False),
-).alias("a_ind_expressions")
-
-all_constraints = Table(
- "all_constraints" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("owner", VARCHAR2(128)),
- Column("constraint_name", VARCHAR2(128)),
- Column("constraint_type", VARCHAR2(1)),
- Column("table_name", VARCHAR2(128)),
- Column("search_condition", LONG),
- Column("search_condition_vc", VARCHAR2(4000)),
- Column("r_owner", VARCHAR2(128)),
- Column("r_constraint_name", VARCHAR2(128)),
- Column("delete_rule", VARCHAR2(9)),
- Column("status", VARCHAR2(8)),
- Column("deferrable", VARCHAR2(14)),
- Column("deferred", VARCHAR2(9)),
- Column("validated", VARCHAR2(13)),
- Column("generated", VARCHAR2(14)),
- Column("bad", VARCHAR2(3)),
- Column("rely", VARCHAR2(4)),
- Column("last_change", DATE),
- Column("index_owner", VARCHAR2(128)),
- Column("index_name", VARCHAR2(128)),
- Column("invalid", VARCHAR2(7)),
- Column("view_related", VARCHAR2(14)),
- Column("origin_con_id", VARCHAR2(256)),
-).alias("a_constraints")
-
-all_cons_columns = Table(
- "all_cons_columns" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("owner", VARCHAR2(128), nullable=False),
- Column("constraint_name", VARCHAR2(128), nullable=False),
- Column("table_name", VARCHAR2(128), nullable=False),
- Column("column_name", VARCHAR2(4000)),
- Column("position", NUMBER),
-).alias("a_cons_columns")
-
-# TODO figure out if it's still relevant, since there is no mention from here
-# https://docs.oracle.com/en/database/oracle/oracle-database/21/refrn/ALL_DB_LINKS.html
-# original note:
-# using user_db_links here since all_db_links appears
-# to have more restricted permissions.
-# https://docs.oracle.com/cd/B28359_01/server.111/b28310/ds_admin005.htm
-# will need to hear from more users if we are doing
-# the right thing here. See [ticket:2619]
-all_db_links = Table(
- "all_db_links" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("owner", VARCHAR2(128), nullable=False),
- Column("db_link", VARCHAR2(128), nullable=False),
- Column("username", VARCHAR2(128)),
- Column("host", VARCHAR2(2000)),
- Column("created", DATE, nullable=False),
- Column("hidden", VARCHAR2(3)),
- Column("shard_internal", VARCHAR2(3)),
- Column("valid", VARCHAR2(3)),
- Column("intra_cdb", VARCHAR2(3)),
-).alias("a_db_links")
-
-all_synonyms = Table(
- "all_synonyms" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("owner", VARCHAR2(128)),
- Column("synonym_name", VARCHAR2(128)),
- Column("table_owner", VARCHAR2(128)),
- Column("table_name", VARCHAR2(128)),
- Column("db_link", VARCHAR2(128)),
- Column("origin_con_id", VARCHAR2(256)),
-).alias("a_synonyms")
-
-all_objects = Table(
- "all_objects" + DB_LINK_PLACEHOLDER,
- dictionary_meta,
- Column("owner", VARCHAR2(128), nullable=False),
- Column("object_name", VARCHAR2(128), nullable=False),
- Column("subobject_name", VARCHAR2(128)),
- Column("object_id", NUMBER, nullable=False),
- Column("data_object_id", NUMBER),
- Column("object_type", VARCHAR2(23)),
- Column("created", DATE, nullable=False),
- Column("last_ddl_time", DATE, nullable=False),
- Column("timestamp", VARCHAR2(19)),
- Column("status", VARCHAR2(7)),
- Column("temporary", VARCHAR2(1)),
- Column("generated", VARCHAR2(1)),
- Column("secondary", VARCHAR2(1)),
- Column("namespace", NUMBER, nullable=False),
- Column("edition_name", VARCHAR2(128)),
- Column("sharing", VARCHAR2(13)),
- Column("editionable", VARCHAR2(1)),
- Column("oracle_maintained", VARCHAR2(1)),
- Column("application", VARCHAR2(1)),
- Column("default_collation", VARCHAR2(100)),
- Column("duplicated", VARCHAR2(1)),
- Column("sharded", VARCHAR2(1)),
- Column("created_appid", NUMBER),
- Column("created_vsnid", NUMBER),
- Column("modified_appid", NUMBER),
- Column("modified_vsnid", NUMBER),
-).alias("a_objects")
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/oracledb.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/oracledb.py
deleted file mode 100644
index 9cdec3b..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/oracledb.py
+++ /dev/null
@@ -1,311 +0,0 @@
-# dialects/oracle/oracledb.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-r"""
-.. dialect:: oracle+oracledb
- :name: python-oracledb
- :dbapi: oracledb
- :connectstring: oracle+oracledb://user:pass@hostname:port[/dbname][?service_name=<service>[&key=value&key=value...]]
- :url: https://oracle.github.io/python-oracledb/
-
-python-oracledb is released by Oracle to supersede the cx_Oracle driver.
-It is fully compatible with cx_Oracle and features both a "thin" client
-mode that requires no dependencies, as well as a "thick" mode that uses
-the Oracle Client Interface in the same way as cx_Oracle.
-
-.. seealso::
-
- :ref:`cx_oracle` - all of cx_Oracle's notes apply to the oracledb driver
- as well.
-
-The SQLAlchemy ``oracledb`` dialect provides both a sync and an async
-implementation under the same dialect name. The proper version is
-selected depending on how the engine is created:
-
-* calling :func:`_sa.create_engine` with ``oracle+oracledb://...`` will
- automatically select the sync version, e.g.::
-
- from sqlalchemy import create_engine
- sync_engine = create_engine("oracle+oracledb://scott:tiger@localhost/?service_name=XEPDB1")
-
-* calling :func:`_asyncio.create_async_engine` with
- ``oracle+oracledb://...`` will automatically select the async version,
- e.g.::
-
- from sqlalchemy.ext.asyncio import create_async_engine
- asyncio_engine = create_async_engine("oracle+oracledb://scott:tiger@localhost/?service_name=XEPDB1")
-
-The asyncio version of the dialect may also be specified explicitly using the
-``oracledb_async`` suffix, as::
-
- from sqlalchemy.ext.asyncio import create_async_engine
- asyncio_engine = create_async_engine("oracle+oracledb_async://scott:tiger@localhost/?service_name=XEPDB1")
-
-.. versionadded:: 2.0.25 added support for the async version of oracledb.
-
-Thick mode support
-------------------
-
-By default the ``python-oracledb`` is started in thin mode, that does not
-require oracle client libraries to be installed in the system. The
-``python-oracledb`` driver also support a "thick" mode, that behaves
-similarly to ``cx_oracle`` and requires that Oracle Client Interface (OCI)
-is installed.
-
-To enable this mode, the user may call ``oracledb.init_oracle_client``
-manually, or by passing the parameter ``thick_mode=True`` to
-:func:`_sa.create_engine`. To pass custom arguments to ``init_oracle_client``,
-like the ``lib_dir`` path, a dict may be passed to this parameter, as in::
-
- engine = sa.create_engine("oracle+oracledb://...", thick_mode={
- "lib_dir": "/path/to/oracle/client/lib", "driver_name": "my-app"
- })
-
-.. seealso::
-
- https://python-oracledb.readthedocs.io/en/latest/api_manual/module.html#oracledb.init_oracle_client
-
-
-.. versionadded:: 2.0.0 added support for oracledb driver.
-
-""" # noqa
-from __future__ import annotations
-
-import collections
-import re
-from typing import Any
-from typing import TYPE_CHECKING
-
-from .cx_oracle import OracleDialect_cx_oracle as _OracleDialect_cx_oracle
-from ... import exc
-from ... import pool
-from ...connectors.asyncio import AsyncAdapt_dbapi_connection
-from ...connectors.asyncio import AsyncAdapt_dbapi_cursor
-from ...connectors.asyncio import AsyncAdaptFallback_dbapi_connection
-from ...util import asbool
-from ...util import await_fallback
-from ...util import await_only
-
-if TYPE_CHECKING:
- from oracledb import AsyncConnection
- from oracledb import AsyncCursor
-
-
-class OracleDialect_oracledb(_OracleDialect_cx_oracle):
- supports_statement_cache = True
- driver = "oracledb"
- _min_version = (1,)
-
- def __init__(
- self,
- auto_convert_lobs=True,
- coerce_to_decimal=True,
- arraysize=None,
- encoding_errors=None,
- thick_mode=None,
- **kwargs,
- ):
- super().__init__(
- auto_convert_lobs,
- coerce_to_decimal,
- arraysize,
- encoding_errors,
- **kwargs,
- )
-
- if self.dbapi is not None and (
- thick_mode or isinstance(thick_mode, dict)
- ):
- kw = thick_mode if isinstance(thick_mode, dict) else {}
- self.dbapi.init_oracle_client(**kw)
-
- @classmethod
- def import_dbapi(cls):
- import oracledb
-
- return oracledb
-
- @classmethod
- def is_thin_mode(cls, connection):
- return connection.connection.dbapi_connection.thin
-
- @classmethod
- def get_async_dialect_cls(cls, url):
- return OracleDialectAsync_oracledb
-
- def _load_version(self, dbapi_module):
- version = (0, 0, 0)
- if dbapi_module is not None:
- m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", dbapi_module.version)
- if m:
- version = tuple(
- int(x) for x in m.group(1, 2, 3) if x is not None
- )
- self.oracledb_ver = version
- if (
- self.oracledb_ver > (0, 0, 0)
- and self.oracledb_ver < self._min_version
- ):
- raise exc.InvalidRequestError(
- f"oracledb version {self._min_version} and above are supported"
- )
-
-
-class AsyncAdapt_oracledb_cursor(AsyncAdapt_dbapi_cursor):
- _cursor: AsyncCursor
- __slots__ = ()
-
- @property
- def outputtypehandler(self):
- return self._cursor.outputtypehandler
-
- @outputtypehandler.setter
- def outputtypehandler(self, value):
- self._cursor.outputtypehandler = value
-
- def var(self, *args, **kwargs):
- return self._cursor.var(*args, **kwargs)
-
- def close(self):
- self._rows.clear()
- self._cursor.close()
-
- def setinputsizes(self, *args: Any, **kwargs: Any) -> Any:
- return self._cursor.setinputsizes(*args, **kwargs)
-
- def _aenter_cursor(self, cursor: AsyncCursor) -> AsyncCursor:
- try:
- return cursor.__enter__()
- except Exception as error:
- self._adapt_connection._handle_exception(error)
-
- async def _execute_async(self, operation, parameters):
- # override to not use mutex, oracledb already has mutex
-
- if parameters is None:
- result = await self._cursor.execute(operation)
- else:
- result = await self._cursor.execute(operation, parameters)
-
- if self._cursor.description and not self.server_side:
- self._rows = collections.deque(await self._cursor.fetchall())
- return result
-
- async def _executemany_async(
- self,
- operation,
- seq_of_parameters,
- ):
- # override to not use mutex, oracledb already has mutex
- return await self._cursor.executemany(operation, seq_of_parameters)
-
- def __enter__(self):
- return self
-
- def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
- self.close()
-
-
-class AsyncAdapt_oracledb_connection(AsyncAdapt_dbapi_connection):
- _connection: AsyncConnection
- __slots__ = ()
-
- thin = True
-
- _cursor_cls = AsyncAdapt_oracledb_cursor
- _ss_cursor_cls = None
-
- @property
- def autocommit(self):
- return self._connection.autocommit
-
- @autocommit.setter
- def autocommit(self, value):
- self._connection.autocommit = value
-
- @property
- def outputtypehandler(self):
- return self._connection.outputtypehandler
-
- @outputtypehandler.setter
- def outputtypehandler(self, value):
- self._connection.outputtypehandler = value
-
- @property
- def version(self):
- return self._connection.version
-
- @property
- def stmtcachesize(self):
- return self._connection.stmtcachesize
-
- @stmtcachesize.setter
- def stmtcachesize(self, value):
- self._connection.stmtcachesize = value
-
- def cursor(self):
- return AsyncAdapt_oracledb_cursor(self)
-
-
-class AsyncAdaptFallback_oracledb_connection(
- AsyncAdaptFallback_dbapi_connection, AsyncAdapt_oracledb_connection
-):
- __slots__ = ()
-
-
-class OracledbAdaptDBAPI:
- def __init__(self, oracledb) -> None:
- self.oracledb = oracledb
-
- for k, v in self.oracledb.__dict__.items():
- if k != "connect":
- self.__dict__[k] = v
-
- def connect(self, *arg, **kw):
- async_fallback = kw.pop("async_fallback", False)
- creator_fn = kw.pop("async_creator_fn", self.oracledb.connect_async)
-
- if asbool(async_fallback):
- return AsyncAdaptFallback_oracledb_connection(
- self, await_fallback(creator_fn(*arg, **kw))
- )
-
- else:
- return AsyncAdapt_oracledb_connection(
- self, await_only(creator_fn(*arg, **kw))
- )
-
-
-class OracleDialectAsync_oracledb(OracleDialect_oracledb):
- is_async = True
- supports_statement_cache = True
-
- _min_version = (2,)
-
- # thick_mode mode is not supported by asyncio, oracledb will raise
- @classmethod
- def import_dbapi(cls):
- import oracledb
-
- return OracledbAdaptDBAPI(oracledb)
-
- @classmethod
- def get_pool_class(cls, url):
- async_fallback = url.query.get("async_fallback", False)
-
- if asbool(async_fallback):
- return pool.FallbackAsyncAdaptedQueuePool
- else:
- return pool.AsyncAdaptedQueuePool
-
- def get_driver_connection(self, connection):
- return connection._connection
-
-
-dialect = OracleDialect_oracledb
-dialect_async = OracleDialectAsync_oracledb
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/provision.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/provision.py
deleted file mode 100644
index b33c152..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/provision.py
+++ /dev/null
@@ -1,220 +0,0 @@
-# dialects/oracle/provision.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-from ... import create_engine
-from ... import exc
-from ... import inspect
-from ...engine import url as sa_url
-from ...testing.provision import configure_follower
-from ...testing.provision import create_db
-from ...testing.provision import drop_all_schema_objects_post_tables
-from ...testing.provision import drop_all_schema_objects_pre_tables
-from ...testing.provision import drop_db
-from ...testing.provision import follower_url_from_main
-from ...testing.provision import log
-from ...testing.provision import post_configure_engine
-from ...testing.provision import run_reap_dbs
-from ...testing.provision import set_default_schema_on_connection
-from ...testing.provision import stop_test_class_outside_fixtures
-from ...testing.provision import temp_table_keyword_args
-from ...testing.provision import update_db_opts
-
-
-@create_db.for_db("oracle")
-def _oracle_create_db(cfg, eng, ident):
- # NOTE: make sure you've run "ALTER DATABASE default tablespace users" or
- # similar, so that the default tablespace is not "system"; reflection will
- # fail otherwise
- with eng.begin() as conn:
- conn.exec_driver_sql("create user %s identified by xe" % ident)
- conn.exec_driver_sql("create user %s_ts1 identified by xe" % ident)
- conn.exec_driver_sql("create user %s_ts2 identified by xe" % ident)
- conn.exec_driver_sql("grant dba to %s" % (ident,))
- conn.exec_driver_sql("grant unlimited tablespace to %s" % ident)
- conn.exec_driver_sql("grant unlimited tablespace to %s_ts1" % ident)
- conn.exec_driver_sql("grant unlimited tablespace to %s_ts2" % ident)
- # these are needed to create materialized views
- conn.exec_driver_sql("grant create table to %s" % ident)
- conn.exec_driver_sql("grant create table to %s_ts1" % ident)
- conn.exec_driver_sql("grant create table to %s_ts2" % ident)
-
-
-@configure_follower.for_db("oracle")
-def _oracle_configure_follower(config, ident):
- config.test_schema = "%s_ts1" % ident
- config.test_schema_2 = "%s_ts2" % ident
-
-
-def _ora_drop_ignore(conn, dbname):
- try:
- conn.exec_driver_sql("drop user %s cascade" % dbname)
- log.info("Reaped db: %s", dbname)
- return True
- except exc.DatabaseError as err:
- log.warning("couldn't drop db: %s", err)
- return False
-
-
-@drop_all_schema_objects_pre_tables.for_db("oracle")
-def _ora_drop_all_schema_objects_pre_tables(cfg, eng):
- _purge_recyclebin(eng)
- _purge_recyclebin(eng, cfg.test_schema)
-
-
-@drop_all_schema_objects_post_tables.for_db("oracle")
-def _ora_drop_all_schema_objects_post_tables(cfg, eng):
- with eng.begin() as conn:
- for syn in conn.dialect._get_synonyms(conn, None, None, None):
- conn.exec_driver_sql(f"drop synonym {syn['synonym_name']}")
-
- for syn in conn.dialect._get_synonyms(
- conn, cfg.test_schema, None, None
- ):
- conn.exec_driver_sql(
- f"drop synonym {cfg.test_schema}.{syn['synonym_name']}"
- )
-
- for tmp_table in inspect(conn).get_temp_table_names():
- conn.exec_driver_sql(f"drop table {tmp_table}")
-
-
-@drop_db.for_db("oracle")
-def _oracle_drop_db(cfg, eng, ident):
- with eng.begin() as conn:
- # cx_Oracle seems to occasionally leak open connections when a large
- # suite it run, even if we confirm we have zero references to
- # connection objects.
- # while there is a "kill session" command in Oracle,
- # it unfortunately does not release the connection sufficiently.
- _ora_drop_ignore(conn, ident)
- _ora_drop_ignore(conn, "%s_ts1" % ident)
- _ora_drop_ignore(conn, "%s_ts2" % ident)
-
-
-@stop_test_class_outside_fixtures.for_db("oracle")
-def _ora_stop_test_class_outside_fixtures(config, db, cls):
- try:
- _purge_recyclebin(db)
- except exc.DatabaseError as err:
- log.warning("purge recyclebin command failed: %s", err)
-
- # clear statement cache on all connections that were used
- # https://github.com/oracle/python-cx_Oracle/issues/519
-
- for cx_oracle_conn in _all_conns:
- try:
- sc = cx_oracle_conn.stmtcachesize
- except db.dialect.dbapi.InterfaceError:
- # connection closed
- pass
- else:
- cx_oracle_conn.stmtcachesize = 0
- cx_oracle_conn.stmtcachesize = sc
- _all_conns.clear()
-
-
-def _purge_recyclebin(eng, schema=None):
- with eng.begin() as conn:
- if schema is None:
- # run magic command to get rid of identity sequences
- # https://floo.bar/2019/11/29/drop-the-underlying-sequence-of-an-identity-column/ # noqa: E501
- conn.exec_driver_sql("purge recyclebin")
- else:
- # per user: https://community.oracle.com/tech/developers/discussion/2255402/how-to-clear-dba-recyclebin-for-a-particular-user # noqa: E501
- for owner, object_name, type_ in conn.exec_driver_sql(
- "select owner, object_name,type from "
- "dba_recyclebin where owner=:schema and type='TABLE'",
- {"schema": conn.dialect.denormalize_name(schema)},
- ).all():
- conn.exec_driver_sql(f'purge {type_} {owner}."{object_name}"')
-
-
-_all_conns = set()
-
-
-@post_configure_engine.for_db("oracle")
-def _oracle_post_configure_engine(url, engine, follower_ident):
- from sqlalchemy import event
-
- @event.listens_for(engine, "checkout")
- def checkout(dbapi_con, con_record, con_proxy):
- _all_conns.add(dbapi_con)
-
- @event.listens_for(engine, "checkin")
- def checkin(dbapi_connection, connection_record):
- # work around cx_Oracle issue:
- # https://github.com/oracle/python-cx_Oracle/issues/530
- # invalidate oracle connections that had 2pc set up
- if "cx_oracle_xid" in connection_record.info:
- connection_record.invalidate()
-
-
-@run_reap_dbs.for_db("oracle")
-def _reap_oracle_dbs(url, idents):
- log.info("db reaper connecting to %r", url)
- eng = create_engine(url)
- with eng.begin() as conn:
- log.info("identifiers in file: %s", ", ".join(idents))
-
- to_reap = conn.exec_driver_sql(
- "select u.username from all_users u where username "
- "like 'TEST_%' and not exists (select username "
- "from v$session where username=u.username)"
- )
- all_names = {username.lower() for (username,) in to_reap}
- to_drop = set()
- for name in all_names:
- if name.endswith("_ts1") or name.endswith("_ts2"):
- continue
- elif name in idents:
- to_drop.add(name)
- if "%s_ts1" % name in all_names:
- to_drop.add("%s_ts1" % name)
- if "%s_ts2" % name in all_names:
- to_drop.add("%s_ts2" % name)
-
- dropped = total = 0
- for total, username in enumerate(to_drop, 1):
- if _ora_drop_ignore(conn, username):
- dropped += 1
- log.info(
- "Dropped %d out of %d stale databases detected", dropped, total
- )
-
-
-@follower_url_from_main.for_db("oracle")
-def _oracle_follower_url_from_main(url, ident):
- url = sa_url.make_url(url)
- return url.set(username=ident, password="xe")
-
-
-@temp_table_keyword_args.for_db("oracle")
-def _oracle_temp_table_keyword_args(cfg, eng):
- return {
- "prefixes": ["GLOBAL TEMPORARY"],
- "oracle_on_commit": "PRESERVE ROWS",
- }
-
-
-@set_default_schema_on_connection.for_db("oracle")
-def _oracle_set_default_schema_on_connection(
- cfg, dbapi_connection, schema_name
-):
- cursor = dbapi_connection.cursor()
- cursor.execute("ALTER SESSION SET CURRENT_SCHEMA=%s" % schema_name)
- cursor.close()
-
-
-@update_db_opts.for_db("oracle")
-def _update_db_opts(db_url, db_opts, options):
- """Set database options (db_opts) for a test database that we created."""
- if (
- options.oracledb_thick_mode
- and sa_url.make_url(db_url).get_driver_name() == "oracledb"
- ):
- db_opts["thick_mode"] = True
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/types.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/types.py
deleted file mode 100644
index 36caaa0..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/types.py
+++ /dev/null
@@ -1,287 +0,0 @@
-# dialects/oracle/types.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-from __future__ import annotations
-
-import datetime as dt
-from typing import Optional
-from typing import Type
-from typing import TYPE_CHECKING
-
-from ... import exc
-from ...sql import sqltypes
-from ...types import NVARCHAR
-from ...types import VARCHAR
-
-if TYPE_CHECKING:
- from ...engine.interfaces import Dialect
- from ...sql.type_api import _LiteralProcessorType
-
-
-class RAW(sqltypes._Binary):
- __visit_name__ = "RAW"
-
-
-OracleRaw = RAW
-
-
-class NCLOB(sqltypes.Text):
- __visit_name__ = "NCLOB"
-
-
-class VARCHAR2(VARCHAR):
- __visit_name__ = "VARCHAR2"
-
-
-NVARCHAR2 = NVARCHAR
-
-
-class NUMBER(sqltypes.Numeric, sqltypes.Integer):
- __visit_name__ = "NUMBER"
-
- def __init__(self, precision=None, scale=None, asdecimal=None):
- if asdecimal is None:
- asdecimal = bool(scale and scale > 0)
-
- super().__init__(precision=precision, scale=scale, asdecimal=asdecimal)
-
- def adapt(self, impltype):
- ret = super().adapt(impltype)
- # leave a hint for the DBAPI handler
- ret._is_oracle_number = True
- return ret
-
- @property
- def _type_affinity(self):
- if bool(self.scale and self.scale > 0):
- return sqltypes.Numeric
- else:
- return sqltypes.Integer
-
-
-class FLOAT(sqltypes.FLOAT):
- """Oracle FLOAT.
-
- This is the same as :class:`_sqltypes.FLOAT` except that
- an Oracle-specific :paramref:`_oracle.FLOAT.binary_precision`
- parameter is accepted, and
- the :paramref:`_sqltypes.Float.precision` parameter is not accepted.
-
- Oracle FLOAT types indicate precision in terms of "binary precision", which
- defaults to 126. For a REAL type, the value is 63. This parameter does not
- cleanly map to a specific number of decimal places but is roughly
- equivalent to the desired number of decimal places divided by 0.3103.
-
- .. versionadded:: 2.0
-
- """
-
- __visit_name__ = "FLOAT"
-
- def __init__(
- self,
- binary_precision=None,
- asdecimal=False,
- decimal_return_scale=None,
- ):
- r"""
- Construct a FLOAT
-
- :param binary_precision: Oracle binary precision value to be rendered
- in DDL. This may be approximated to the number of decimal characters
- using the formula "decimal precision = 0.30103 * binary precision".
- The default value used by Oracle for FLOAT / DOUBLE PRECISION is 126.
-
- :param asdecimal: See :paramref:`_sqltypes.Float.asdecimal`
-
- :param decimal_return_scale: See
- :paramref:`_sqltypes.Float.decimal_return_scale`
-
- """
- super().__init__(
- asdecimal=asdecimal, decimal_return_scale=decimal_return_scale
- )
- self.binary_precision = binary_precision
-
-
-class BINARY_DOUBLE(sqltypes.Double):
- __visit_name__ = "BINARY_DOUBLE"
-
-
-class BINARY_FLOAT(sqltypes.Float):
- __visit_name__ = "BINARY_FLOAT"
-
-
-class BFILE(sqltypes.LargeBinary):
- __visit_name__ = "BFILE"
-
-
-class LONG(sqltypes.Text):
- __visit_name__ = "LONG"
-
-
-class _OracleDateLiteralRender:
- def _literal_processor_datetime(self, dialect):
- def process(value):
- if getattr(value, "microsecond", None):
- value = (
- f"""TO_TIMESTAMP"""
- f"""('{value.isoformat().replace("T", " ")}', """
- """'YYYY-MM-DD HH24:MI:SS.FF')"""
- )
- else:
- value = (
- f"""TO_DATE"""
- f"""('{value.isoformat().replace("T", " ")}', """
- """'YYYY-MM-DD HH24:MI:SS')"""
- )
- return value
-
- return process
-
- def _literal_processor_date(self, dialect):
- def process(value):
- if getattr(value, "microsecond", None):
- value = (
- f"""TO_TIMESTAMP"""
- f"""('{value.isoformat().split("T")[0]}', """
- """'YYYY-MM-DD')"""
- )
- else:
- value = (
- f"""TO_DATE"""
- f"""('{value.isoformat().split("T")[0]}', """
- """'YYYY-MM-DD')"""
- )
- return value
-
- return process
-
-
-class DATE(_OracleDateLiteralRender, sqltypes.DateTime):
- """Provide the oracle DATE type.
-
- This type has no special Python behavior, except that it subclasses
- :class:`_types.DateTime`; this is to suit the fact that the Oracle
- ``DATE`` type supports a time value.
-
- """
-
- __visit_name__ = "DATE"
-
- def literal_processor(self, dialect):
- return self._literal_processor_datetime(dialect)
-
- def _compare_type_affinity(self, other):
- return other._type_affinity in (sqltypes.DateTime, sqltypes.Date)
-
-
-class _OracleDate(_OracleDateLiteralRender, sqltypes.Date):
- def literal_processor(self, dialect):
- return self._literal_processor_date(dialect)
-
-
-class INTERVAL(sqltypes.NativeForEmulated, sqltypes._AbstractInterval):
- __visit_name__ = "INTERVAL"
-
- def __init__(self, day_precision=None, second_precision=None):
- """Construct an INTERVAL.
-
- Note that only DAY TO SECOND intervals are currently supported.
- This is due to a lack of support for YEAR TO MONTH intervals
- within available DBAPIs.
-
- :param day_precision: the day precision value. this is the number of
- digits to store for the day field. Defaults to "2"
- :param second_precision: the second precision value. this is the
- number of digits to store for the fractional seconds field.
- Defaults to "6".
-
- """
- self.day_precision = day_precision
- self.second_precision = second_precision
-
- @classmethod
- def _adapt_from_generic_interval(cls, interval):
- return INTERVAL(
- day_precision=interval.day_precision,
- second_precision=interval.second_precision,
- )
-
- @classmethod
- def adapt_emulated_to_native(
- cls, interval: sqltypes.Interval, **kw # type: ignore[override]
- ):
- return INTERVAL(
- day_precision=interval.day_precision,
- second_precision=interval.second_precision,
- )
-
- @property
- def _type_affinity(self):
- return sqltypes.Interval
-
- def as_generic(self, allow_nulltype=False):
- return sqltypes.Interval(
- native=True,
- second_precision=self.second_precision,
- day_precision=self.day_precision,
- )
-
- @property
- def python_type(self) -> Type[dt.timedelta]:
- return dt.timedelta
-
- def literal_processor(
- self, dialect: Dialect
- ) -> Optional[_LiteralProcessorType[dt.timedelta]]:
- def process(value: dt.timedelta) -> str:
- return f"NUMTODSINTERVAL({value.total_seconds()}, 'SECOND')"
-
- return process
-
-
-class TIMESTAMP(sqltypes.TIMESTAMP):
- """Oracle implementation of ``TIMESTAMP``, which supports additional
- Oracle-specific modes
-
- .. versionadded:: 2.0
-
- """
-
- def __init__(self, timezone: bool = False, local_timezone: bool = False):
- """Construct a new :class:`_oracle.TIMESTAMP`.
-
- :param timezone: boolean. Indicates that the TIMESTAMP type should
- use Oracle's ``TIMESTAMP WITH TIME ZONE`` datatype.
-
- :param local_timezone: boolean. Indicates that the TIMESTAMP type
- should use Oracle's ``TIMESTAMP WITH LOCAL TIME ZONE`` datatype.
-
-
- """
- if timezone and local_timezone:
- raise exc.ArgumentError(
- "timezone and local_timezone are mutually exclusive"
- )
- super().__init__(timezone=timezone)
- self.local_timezone = local_timezone
-
-
-class ROWID(sqltypes.TypeEngine):
- """Oracle ROWID type.
-
- When used in a cast() or similar, generates ROWID.
-
- """
-
- __visit_name__ = "ROWID"
-
-
-class _OracleBoolean(sqltypes.Boolean):
- def get_dbapi_type(self, dbapi):
- return dbapi.NUMBER