diff options
author | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:17:55 -0400 |
---|---|---|
committer | cyfraeviolae <cyfraeviolae> | 2024-04-03 03:17:55 -0400 |
commit | 12cf076118570eebbff08c6b3090e0d4798447a1 (patch) | |
tree | 3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle | |
parent | c45662ff3923b34614ddcc8feb9195541166dcc5 (diff) |
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle')
14 files changed, 0 insertions, 6124 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__init__.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__init__.py deleted file mode 100644 index d855122..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__init__.py +++ /dev/null @@ -1,67 +0,0 @@ -# dialects/oracle/__init__.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors -from types import ModuleType - -from . import base # noqa -from . import cx_oracle # noqa -from . import oracledb # noqa -from .base import BFILE -from .base import BINARY_DOUBLE -from .base import BINARY_FLOAT -from .base import BLOB -from .base import CHAR -from .base import CLOB -from .base import DATE -from .base import DOUBLE_PRECISION -from .base import FLOAT -from .base import INTERVAL -from .base import LONG -from .base import NCHAR -from .base import NCLOB -from .base import NUMBER -from .base import NVARCHAR -from .base import NVARCHAR2 -from .base import RAW -from .base import REAL -from .base import ROWID -from .base import TIMESTAMP -from .base import VARCHAR -from .base import VARCHAR2 - -# Alias oracledb also as oracledb_async -oracledb_async = type( - "oracledb_async", (ModuleType,), {"dialect": oracledb.dialect_async} -) - -base.dialect = dialect = cx_oracle.dialect - -__all__ = ( - "VARCHAR", - "NVARCHAR", - "CHAR", - "NCHAR", - "DATE", - "NUMBER", - "BLOB", - "BFILE", - "CLOB", - "NCLOB", - "TIMESTAMP", - "RAW", - "FLOAT", - "DOUBLE_PRECISION", - "BINARY_DOUBLE", - "BINARY_FLOAT", - "LONG", - "dialect", - "INTERVAL", - "VARCHAR2", - "NVARCHAR2", - "ROWID", - "REAL", -) diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/__init__.cpython-311.pyc Binary files differdeleted file mode 100644 index cc02ead..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/__init__.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/base.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/base.cpython-311.pyc Binary files differdeleted file mode 100644 index bf594ef..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/base.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/cx_oracle.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/cx_oracle.cpython-311.pyc Binary files differdeleted file mode 100644 index 9e8e947..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/cx_oracle.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/dictionary.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/dictionary.cpython-311.pyc Binary files differdeleted file mode 100644 index 89ce69c..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/dictionary.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/oracledb.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/oracledb.cpython-311.pyc Binary files differdeleted file mode 100644 index 9325524..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/oracledb.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/provision.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/provision.cpython-311.pyc Binary files differdeleted file mode 100644 index 6d3c52d..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/provision.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/types.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/types.cpython-311.pyc Binary files differdeleted file mode 100644 index 24bfa8d..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/__pycache__/types.cpython-311.pyc +++ /dev/null diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py deleted file mode 100644 index a548b34..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/base.py +++ /dev/null @@ -1,3240 +0,0 @@ -# dialects/oracle/base.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - - -r""" -.. dialect:: oracle - :name: Oracle - :full_support: 18c - :normal_support: 11+ - :best_effort: 9+ - - -Auto Increment Behavior ------------------------ - -SQLAlchemy Table objects which include integer primary keys are usually -assumed to have "autoincrementing" behavior, meaning they can generate their -own primary key values upon INSERT. For use within Oracle, two options are -available, which are the use of IDENTITY columns (Oracle 12 and above only) -or the association of a SEQUENCE with the column. - -Specifying GENERATED AS IDENTITY (Oracle 12 and above) -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Starting from version 12 Oracle can make use of identity columns using -the :class:`_sql.Identity` to specify the autoincrementing behavior:: - - t = Table('mytable', metadata, - Column('id', Integer, Identity(start=3), primary_key=True), - Column(...), ... - ) - -The CREATE TABLE for the above :class:`_schema.Table` object would be: - -.. sourcecode:: sql - - CREATE TABLE mytable ( - id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 3), - ..., - PRIMARY KEY (id) - ) - -The :class:`_schema.Identity` object support many options to control the -"autoincrementing" behavior of the column, like the starting value, the -incrementing value, etc. -In addition to the standard options, Oracle supports setting -:paramref:`_schema.Identity.always` to ``None`` to use the default -generated mode, rendering GENERATED AS IDENTITY in the DDL. It also supports -setting :paramref:`_schema.Identity.on_null` to ``True`` to specify ON NULL -in conjunction with a 'BY DEFAULT' identity column. - -Using a SEQUENCE (all Oracle versions) -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Older version of Oracle had no "autoincrement" -feature, SQLAlchemy relies upon sequences to produce these values. With the -older Oracle versions, *a sequence must always be explicitly specified to -enable autoincrement*. This is divergent with the majority of documentation -examples which assume the usage of an autoincrement-capable database. To -specify sequences, use the sqlalchemy.schema.Sequence object which is passed -to a Column construct:: - - t = Table('mytable', metadata, - Column('id', Integer, Sequence('id_seq', start=1), primary_key=True), - Column(...), ... - ) - -This step is also required when using table reflection, i.e. autoload_with=engine:: - - t = Table('mytable', metadata, - Column('id', Integer, Sequence('id_seq', start=1), primary_key=True), - autoload_with=engine - ) - -.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct - in a :class:`_schema.Column` to specify the option of an autoincrementing - column. - -.. _oracle_isolation_level: - -Transaction Isolation Level / Autocommit ----------------------------------------- - -The Oracle database supports "READ COMMITTED" and "SERIALIZABLE" modes of -isolation. The AUTOCOMMIT isolation level is also supported by the cx_Oracle -dialect. - -To set using per-connection execution options:: - - connection = engine.connect() - connection = connection.execution_options( - isolation_level="AUTOCOMMIT" - ) - -For ``READ COMMITTED`` and ``SERIALIZABLE``, the Oracle dialect sets the -level at the session level using ``ALTER SESSION``, which is reverted back -to its default setting when the connection is returned to the connection -pool. - -Valid values for ``isolation_level`` include: - -* ``READ COMMITTED`` -* ``AUTOCOMMIT`` -* ``SERIALIZABLE`` - -.. note:: The implementation for the - :meth:`_engine.Connection.get_isolation_level` method as implemented by the - Oracle dialect necessarily forces the start of a transaction using the - Oracle LOCAL_TRANSACTION_ID function; otherwise no level is normally - readable. - - Additionally, the :meth:`_engine.Connection.get_isolation_level` method will - raise an exception if the ``v$transaction`` view is not available due to - permissions or other reasons, which is a common occurrence in Oracle - installations. - - The cx_Oracle dialect attempts to call the - :meth:`_engine.Connection.get_isolation_level` method when the dialect makes - its first connection to the database in order to acquire the - "default"isolation level. This default level is necessary so that the level - can be reset on a connection after it has been temporarily modified using - :meth:`_engine.Connection.execution_options` method. In the common event - that the :meth:`_engine.Connection.get_isolation_level` method raises an - exception due to ``v$transaction`` not being readable as well as any other - database-related failure, the level is assumed to be "READ COMMITTED". No - warning is emitted for this initial first-connect condition as it is - expected to be a common restriction on Oracle databases. - -.. versionadded:: 1.3.16 added support for AUTOCOMMIT to the cx_oracle dialect - as well as the notion of a default isolation level - -.. versionadded:: 1.3.21 Added support for SERIALIZABLE as well as live - reading of the isolation level. - -.. versionchanged:: 1.3.22 In the event that the default isolation - level cannot be read due to permissions on the v$transaction view as - is common in Oracle installations, the default isolation level is hardcoded - to "READ COMMITTED" which was the behavior prior to 1.3.21. - -.. seealso:: - - :ref:`dbapi_autocommit` - -Identifier Casing ------------------ - -In Oracle, the data dictionary represents all case insensitive identifier -names using UPPERCASE text. SQLAlchemy on the other hand considers an -all-lower case identifier name to be case insensitive. The Oracle dialect -converts all case insensitive identifiers to and from those two formats during -schema level communication, such as reflection of tables and indexes. Using -an UPPERCASE name on the SQLAlchemy side indicates a case sensitive -identifier, and SQLAlchemy will quote the name - this will cause mismatches -against data dictionary data received from Oracle, so unless identifier names -have been truly created as case sensitive (i.e. using quoted names), all -lowercase names should be used on the SQLAlchemy side. - -.. _oracle_max_identifier_lengths: - -Max Identifier Lengths ----------------------- - -Oracle has changed the default max identifier length as of Oracle Server -version 12.2. Prior to this version, the length was 30, and for 12.2 and -greater it is now 128. This change impacts SQLAlchemy in the area of -generated SQL label names as well as the generation of constraint names, -particularly in the case where the constraint naming convention feature -described at :ref:`constraint_naming_conventions` is being used. - -To assist with this change and others, Oracle includes the concept of a -"compatibility" version, which is a version number that is independent of the -actual server version in order to assist with migration of Oracle databases, -and may be configured within the Oracle server itself. This compatibility -version is retrieved using the query ``SELECT value FROM v$parameter WHERE -name = 'compatible';``. The SQLAlchemy Oracle dialect, when tasked with -determining the default max identifier length, will attempt to use this query -upon first connect in order to determine the effective compatibility version of -the server, which determines what the maximum allowed identifier length is for -the server. If the table is not available, the server version information is -used instead. - -As of SQLAlchemy 1.4, the default max identifier length for the Oracle dialect -is 128 characters. Upon first connect, the compatibility version is detected -and if it is less than Oracle version 12.2, the max identifier length is -changed to be 30 characters. In all cases, setting the -:paramref:`_sa.create_engine.max_identifier_length` parameter will bypass this -change and the value given will be used as is:: - - engine = create_engine( - "oracle+cx_oracle://scott:tiger@oracle122", - max_identifier_length=30) - -The maximum identifier length comes into play both when generating anonymized -SQL labels in SELECT statements, but more crucially when generating constraint -names from a naming convention. It is this area that has created the need for -SQLAlchemy to change this default conservatively. For example, the following -naming convention produces two very different constraint names based on the -identifier length:: - - from sqlalchemy import Column - from sqlalchemy import Index - from sqlalchemy import Integer - from sqlalchemy import MetaData - from sqlalchemy import Table - from sqlalchemy.dialects import oracle - from sqlalchemy.schema import CreateIndex - - m = MetaData(naming_convention={"ix": "ix_%(column_0N_name)s"}) - - t = Table( - "t", - m, - Column("some_column_name_1", Integer), - Column("some_column_name_2", Integer), - Column("some_column_name_3", Integer), - ) - - ix = Index( - None, - t.c.some_column_name_1, - t.c.some_column_name_2, - t.c.some_column_name_3, - ) - - oracle_dialect = oracle.dialect(max_identifier_length=30) - print(CreateIndex(ix).compile(dialect=oracle_dialect)) - -With an identifier length of 30, the above CREATE INDEX looks like:: - - CREATE INDEX ix_some_column_name_1s_70cd ON t - (some_column_name_1, some_column_name_2, some_column_name_3) - -However with length=128, it becomes:: - - CREATE INDEX ix_some_column_name_1some_column_name_2some_column_name_3 ON t - (some_column_name_1, some_column_name_2, some_column_name_3) - -Applications which have run versions of SQLAlchemy prior to 1.4 on an Oracle -server version 12.2 or greater are therefore subject to the scenario of a -database migration that wishes to "DROP CONSTRAINT" on a name that was -previously generated with the shorter length. This migration will fail when -the identifier length is changed without the name of the index or constraint -first being adjusted. Such applications are strongly advised to make use of -:paramref:`_sa.create_engine.max_identifier_length` -in order to maintain control -of the generation of truncated names, and to fully review and test all database -migrations in a staging environment when changing this value to ensure that the -impact of this change has been mitigated. - -.. versionchanged:: 1.4 the default max_identifier_length for Oracle is 128 - characters, which is adjusted down to 30 upon first connect if an older - version of Oracle server (compatibility version < 12.2) is detected. - - -LIMIT/OFFSET/FETCH Support --------------------------- - -Methods like :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` make -use of ``FETCH FIRST N ROW / OFFSET N ROWS`` syntax assuming -Oracle 12c or above, and assuming the SELECT statement is not embedded within -a compound statement like UNION. This syntax is also available directly by using -the :meth:`_sql.Select.fetch` method. - -.. versionchanged:: 2.0 the Oracle dialect now uses - ``FETCH FIRST N ROW / OFFSET N ROWS`` for all - :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` usage including - within the ORM and legacy :class:`_orm.Query`. To force the legacy - behavior using window functions, specify the ``enable_offset_fetch=False`` - dialect parameter to :func:`_sa.create_engine`. - -The use of ``FETCH FIRST / OFFSET`` may be disabled on any Oracle version -by passing ``enable_offset_fetch=False`` to :func:`_sa.create_engine`, which -will force the use of "legacy" mode that makes use of window functions. -This mode is also selected automatically when using a version of Oracle -prior to 12c. - -When using legacy mode, or when a :class:`.Select` statement -with limit/offset is embedded in a compound statement, an emulated approach for -LIMIT / OFFSET based on window functions is used, which involves creation of a -subquery using ``ROW_NUMBER`` that is prone to performance issues as well as -SQL construction issues for complex statements. However, this approach is -supported by all Oracle versions. See notes below. - -Notes on LIMIT / OFFSET emulation (when fetch() method cannot be used) -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -If using :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset`, or with the -ORM the :meth:`_orm.Query.limit` and :meth:`_orm.Query.offset` methods on an -Oracle version prior to 12c, the following notes apply: - -* SQLAlchemy currently makes use of ROWNUM to achieve - LIMIT/OFFSET; the exact methodology is taken from - https://blogs.oracle.com/oraclemagazine/on-rownum-and-limiting-results . - -* the "FIRST_ROWS()" optimization keyword is not used by default. To enable - the usage of this optimization directive, specify ``optimize_limits=True`` - to :func:`_sa.create_engine`. - - .. versionchanged:: 1.4 - The Oracle dialect renders limit/offset integer values using a "post - compile" scheme which renders the integer directly before passing the - statement to the cursor for execution. The ``use_binds_for_limits`` flag - no longer has an effect. - - .. seealso:: - - :ref:`change_4808`. - -.. _oracle_returning: - -RETURNING Support ------------------ - -The Oracle database supports RETURNING fully for INSERT, UPDATE and DELETE -statements that are invoked with a single collection of bound parameters -(that is, a ``cursor.execute()`` style statement; SQLAlchemy does not generally -support RETURNING with :term:`executemany` statements). Multiple rows may be -returned as well. - -.. versionchanged:: 2.0 the Oracle backend has full support for RETURNING - on parity with other backends. - - - -ON UPDATE CASCADE ------------------ - -Oracle doesn't have native ON UPDATE CASCADE functionality. A trigger based -solution is available at -https://asktom.oracle.com/tkyte/update_cascade/index.html . - -When using the SQLAlchemy ORM, the ORM has limited ability to manually issue -cascading updates - specify ForeignKey objects using the -"deferrable=True, initially='deferred'" keyword arguments, -and specify "passive_updates=False" on each relationship(). - -Oracle 8 Compatibility ----------------------- - -.. warning:: The status of Oracle 8 compatibility is not known for SQLAlchemy - 2.0. - -When Oracle 8 is detected, the dialect internally configures itself to the -following behaviors: - -* the use_ansi flag is set to False. This has the effect of converting all - JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN - makes use of Oracle's (+) operator. - -* the NVARCHAR2 and NCLOB datatypes are no longer generated as DDL when - the :class:`~sqlalchemy.types.Unicode` is used - VARCHAR2 and CLOB are issued - instead. This because these types don't seem to work correctly on Oracle 8 - even though they are available. The :class:`~sqlalchemy.types.NVARCHAR` and - :class:`~sqlalchemy.dialects.oracle.NCLOB` types will always generate - NVARCHAR2 and NCLOB. - - -Synonym/DBLINK Reflection -------------------------- - -When using reflection with Table objects, the dialect can optionally search -for tables indicated by synonyms, either in local or remote schemas or -accessed over DBLINK, by passing the flag ``oracle_resolve_synonyms=True`` as -a keyword argument to the :class:`_schema.Table` construct:: - - some_table = Table('some_table', autoload_with=some_engine, - oracle_resolve_synonyms=True) - -When this flag is set, the given name (such as ``some_table`` above) will -be searched not just in the ``ALL_TABLES`` view, but also within the -``ALL_SYNONYMS`` view to see if this name is actually a synonym to another -name. If the synonym is located and refers to a DBLINK, the oracle dialect -knows how to locate the table's information using DBLINK syntax(e.g. -``@dblink``). - -``oracle_resolve_synonyms`` is accepted wherever reflection arguments are -accepted, including methods such as :meth:`_schema.MetaData.reflect` and -:meth:`_reflection.Inspector.get_columns`. - -If synonyms are not in use, this flag should be left disabled. - -.. _oracle_constraint_reflection: - -Constraint Reflection ---------------------- - -The Oracle dialect can return information about foreign key, unique, and -CHECK constraints, as well as indexes on tables. - -Raw information regarding these constraints can be acquired using -:meth:`_reflection.Inspector.get_foreign_keys`, -:meth:`_reflection.Inspector.get_unique_constraints`, -:meth:`_reflection.Inspector.get_check_constraints`, and -:meth:`_reflection.Inspector.get_indexes`. - -.. versionchanged:: 1.2 The Oracle dialect can now reflect UNIQUE and - CHECK constraints. - -When using reflection at the :class:`_schema.Table` level, the -:class:`_schema.Table` -will also include these constraints. - -Note the following caveats: - -* When using the :meth:`_reflection.Inspector.get_check_constraints` method, - Oracle - builds a special "IS NOT NULL" constraint for columns that specify - "NOT NULL". This constraint is **not** returned by default; to include - the "IS NOT NULL" constraints, pass the flag ``include_all=True``:: - - from sqlalchemy import create_engine, inspect - - engine = create_engine("oracle+cx_oracle://s:t@dsn") - inspector = inspect(engine) - all_check_constraints = inspector.get_check_constraints( - "some_table", include_all=True) - -* in most cases, when reflecting a :class:`_schema.Table`, - a UNIQUE constraint will - **not** be available as a :class:`.UniqueConstraint` object, as Oracle - mirrors unique constraints with a UNIQUE index in most cases (the exception - seems to be when two or more unique constraints represent the same columns); - the :class:`_schema.Table` will instead represent these using - :class:`.Index` - with the ``unique=True`` flag set. - -* Oracle creates an implicit index for the primary key of a table; this index - is **excluded** from all index results. - -* the list of columns reflected for an index will not include column names - that start with SYS_NC. - -Table names with SYSTEM/SYSAUX tablespaces -------------------------------------------- - -The :meth:`_reflection.Inspector.get_table_names` and -:meth:`_reflection.Inspector.get_temp_table_names` -methods each return a list of table names for the current engine. These methods -are also part of the reflection which occurs within an operation such as -:meth:`_schema.MetaData.reflect`. By default, -these operations exclude the ``SYSTEM`` -and ``SYSAUX`` tablespaces from the operation. In order to change this, the -default list of tablespaces excluded can be changed at the engine level using -the ``exclude_tablespaces`` parameter:: - - # exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM - e = create_engine( - "oracle+cx_oracle://scott:tiger@xe", - exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"]) - -DateTime Compatibility ----------------------- - -Oracle has no datatype known as ``DATETIME``, it instead has only ``DATE``, -which can actually store a date and time value. For this reason, the Oracle -dialect provides a type :class:`_oracle.DATE` which is a subclass of -:class:`.DateTime`. This type has no special behavior, and is only -present as a "marker" for this type; additionally, when a database column -is reflected and the type is reported as ``DATE``, the time-supporting -:class:`_oracle.DATE` type is used. - -.. _oracle_table_options: - -Oracle Table Options -------------------------- - -The CREATE TABLE phrase supports the following options with Oracle -in conjunction with the :class:`_schema.Table` construct: - - -* ``ON COMMIT``:: - - Table( - "some_table", metadata, ..., - prefixes=['GLOBAL TEMPORARY'], oracle_on_commit='PRESERVE ROWS') - -* ``COMPRESS``:: - - Table('mytable', metadata, Column('data', String(32)), - oracle_compress=True) - - Table('mytable', metadata, Column('data', String(32)), - oracle_compress=6) - - The ``oracle_compress`` parameter accepts either an integer compression - level, or ``True`` to use the default compression level. - -.. _oracle_index_options: - -Oracle Specific Index Options ------------------------------ - -Bitmap Indexes -~~~~~~~~~~~~~~ - -You can specify the ``oracle_bitmap`` parameter to create a bitmap index -instead of a B-tree index:: - - Index('my_index', my_table.c.data, oracle_bitmap=True) - -Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will not -check for such limitations, only the database will. - -Index compression -~~~~~~~~~~~~~~~~~ - -Oracle has a more efficient storage mode for indexes containing lots of -repeated values. Use the ``oracle_compress`` parameter to turn on key -compression:: - - Index('my_index', my_table.c.data, oracle_compress=True) - - Index('my_index', my_table.c.data1, my_table.c.data2, unique=True, - oracle_compress=1) - -The ``oracle_compress`` parameter accepts either an integer specifying the -number of prefix columns to compress, or ``True`` to use the default (all -columns for non-unique indexes, all but the last column for unique indexes). - -""" # noqa - -from __future__ import annotations - -from collections import defaultdict -from functools import lru_cache -from functools import wraps -import re - -from . import dictionary -from .types import _OracleBoolean -from .types import _OracleDate -from .types import BFILE -from .types import BINARY_DOUBLE -from .types import BINARY_FLOAT -from .types import DATE -from .types import FLOAT -from .types import INTERVAL -from .types import LONG -from .types import NCLOB -from .types import NUMBER -from .types import NVARCHAR2 # noqa -from .types import OracleRaw # noqa -from .types import RAW -from .types import ROWID # noqa -from .types import TIMESTAMP -from .types import VARCHAR2 # noqa -from ... import Computed -from ... import exc -from ... import schema as sa_schema -from ... import sql -from ... import util -from ...engine import default -from ...engine import ObjectKind -from ...engine import ObjectScope -from ...engine import reflection -from ...engine.reflection import ReflectionDefaults -from ...sql import and_ -from ...sql import bindparam -from ...sql import compiler -from ...sql import expression -from ...sql import func -from ...sql import null -from ...sql import or_ -from ...sql import select -from ...sql import sqltypes -from ...sql import util as sql_util -from ...sql import visitors -from ...sql.visitors import InternalTraversal -from ...types import BLOB -from ...types import CHAR -from ...types import CLOB -from ...types import DOUBLE_PRECISION -from ...types import INTEGER -from ...types import NCHAR -from ...types import NVARCHAR -from ...types import REAL -from ...types import VARCHAR - -RESERVED_WORDS = set( - "SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN " - "DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED " - "ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE " - "ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE " - "BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES " - "AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS " - "NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER " - "CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR " - "DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL".split() -) - -NO_ARG_FNS = set( - "UID CURRENT_DATE SYSDATE USER CURRENT_TIME CURRENT_TIMESTAMP".split() -) - - -colspecs = { - sqltypes.Boolean: _OracleBoolean, - sqltypes.Interval: INTERVAL, - sqltypes.DateTime: DATE, - sqltypes.Date: _OracleDate, -} - -ischema_names = { - "VARCHAR2": VARCHAR, - "NVARCHAR2": NVARCHAR, - "CHAR": CHAR, - "NCHAR": NCHAR, - "DATE": DATE, - "NUMBER": NUMBER, - "BLOB": BLOB, - "BFILE": BFILE, - "CLOB": CLOB, - "NCLOB": NCLOB, - "TIMESTAMP": TIMESTAMP, - "TIMESTAMP WITH TIME ZONE": TIMESTAMP, - "TIMESTAMP WITH LOCAL TIME ZONE": TIMESTAMP, - "INTERVAL DAY TO SECOND": INTERVAL, - "RAW": RAW, - "FLOAT": FLOAT, - "DOUBLE PRECISION": DOUBLE_PRECISION, - "REAL": REAL, - "LONG": LONG, - "BINARY_DOUBLE": BINARY_DOUBLE, - "BINARY_FLOAT": BINARY_FLOAT, - "ROWID": ROWID, -} - - -class OracleTypeCompiler(compiler.GenericTypeCompiler): - # Note: - # Oracle DATE == DATETIME - # Oracle does not allow milliseconds in DATE - # Oracle does not support TIME columns - - def visit_datetime(self, type_, **kw): - return self.visit_DATE(type_, **kw) - - def visit_float(self, type_, **kw): - return self.visit_FLOAT(type_, **kw) - - def visit_double(self, type_, **kw): - return self.visit_DOUBLE_PRECISION(type_, **kw) - - def visit_unicode(self, type_, **kw): - if self.dialect._use_nchar_for_unicode: - return self.visit_NVARCHAR2(type_, **kw) - else: - return self.visit_VARCHAR2(type_, **kw) - - def visit_INTERVAL(self, type_, **kw): - return "INTERVAL DAY%s TO SECOND%s" % ( - type_.day_precision is not None - and "(%d)" % type_.day_precision - or "", - type_.second_precision is not None - and "(%d)" % type_.second_precision - or "", - ) - - def visit_LONG(self, type_, **kw): - return "LONG" - - def visit_TIMESTAMP(self, type_, **kw): - if getattr(type_, "local_timezone", False): - return "TIMESTAMP WITH LOCAL TIME ZONE" - elif type_.timezone: - return "TIMESTAMP WITH TIME ZONE" - else: - return "TIMESTAMP" - - def visit_DOUBLE_PRECISION(self, type_, **kw): - return self._generate_numeric(type_, "DOUBLE PRECISION", **kw) - - def visit_BINARY_DOUBLE(self, type_, **kw): - return self._generate_numeric(type_, "BINARY_DOUBLE", **kw) - - def visit_BINARY_FLOAT(self, type_, **kw): - return self._generate_numeric(type_, "BINARY_FLOAT", **kw) - - def visit_FLOAT(self, type_, **kw): - kw["_requires_binary_precision"] = True - return self._generate_numeric(type_, "FLOAT", **kw) - - def visit_NUMBER(self, type_, **kw): - return self._generate_numeric(type_, "NUMBER", **kw) - - def _generate_numeric( - self, - type_, - name, - precision=None, - scale=None, - _requires_binary_precision=False, - **kw, - ): - if precision is None: - precision = getattr(type_, "precision", None) - - if _requires_binary_precision: - binary_precision = getattr(type_, "binary_precision", None) - - if precision and binary_precision is None: - # https://www.oracletutorial.com/oracle-basics/oracle-float/ - estimated_binary_precision = int(precision / 0.30103) - raise exc.ArgumentError( - "Oracle FLOAT types use 'binary precision', which does " - "not convert cleanly from decimal 'precision'. Please " - "specify " - f"this type with a separate Oracle variant, such as " - f"{type_.__class__.__name__}(precision={precision})." - f"with_variant(oracle.FLOAT" - f"(binary_precision=" - f"{estimated_binary_precision}), 'oracle'), so that the " - "Oracle specific 'binary_precision' may be specified " - "accurately." - ) - else: - precision = binary_precision - - if scale is None: - scale = getattr(type_, "scale", None) - - if precision is None: - return name - elif scale is None: - n = "%(name)s(%(precision)s)" - return n % {"name": name, "precision": precision} - else: - n = "%(name)s(%(precision)s, %(scale)s)" - return n % {"name": name, "precision": precision, "scale": scale} - - def visit_string(self, type_, **kw): - return self.visit_VARCHAR2(type_, **kw) - - def visit_VARCHAR2(self, type_, **kw): - return self._visit_varchar(type_, "", "2") - - def visit_NVARCHAR2(self, type_, **kw): - return self._visit_varchar(type_, "N", "2") - - visit_NVARCHAR = visit_NVARCHAR2 - - def visit_VARCHAR(self, type_, **kw): - return self._visit_varchar(type_, "", "") - - def _visit_varchar(self, type_, n, num): - if not type_.length: - return "%(n)sVARCHAR%(two)s" % {"two": num, "n": n} - elif not n and self.dialect._supports_char_length: - varchar = "VARCHAR%(two)s(%(length)s CHAR)" - return varchar % {"length": type_.length, "two": num} - else: - varchar = "%(n)sVARCHAR%(two)s(%(length)s)" - return varchar % {"length": type_.length, "two": num, "n": n} - - def visit_text(self, type_, **kw): - return self.visit_CLOB(type_, **kw) - - def visit_unicode_text(self, type_, **kw): - if self.dialect._use_nchar_for_unicode: - return self.visit_NCLOB(type_, **kw) - else: - return self.visit_CLOB(type_, **kw) - - def visit_large_binary(self, type_, **kw): - return self.visit_BLOB(type_, **kw) - - def visit_big_integer(self, type_, **kw): - return self.visit_NUMBER(type_, precision=19, **kw) - - def visit_boolean(self, type_, **kw): - return self.visit_SMALLINT(type_, **kw) - - def visit_RAW(self, type_, **kw): - if type_.length: - return "RAW(%(length)s)" % {"length": type_.length} - else: - return "RAW" - - def visit_ROWID(self, type_, **kw): - return "ROWID" - - -class OracleCompiler(compiler.SQLCompiler): - """Oracle compiler modifies the lexical structure of Select - statements to work under non-ANSI configured Oracle databases, if - the use_ansi flag is False. - """ - - compound_keywords = util.update_copy( - compiler.SQLCompiler.compound_keywords, - {expression.CompoundSelect.EXCEPT: "MINUS"}, - ) - - def __init__(self, *args, **kwargs): - self.__wheres = {} - super().__init__(*args, **kwargs) - - def visit_mod_binary(self, binary, operator, **kw): - return "mod(%s, %s)" % ( - self.process(binary.left, **kw), - self.process(binary.right, **kw), - ) - - def visit_now_func(self, fn, **kw): - return "CURRENT_TIMESTAMP" - - def visit_char_length_func(self, fn, **kw): - return "LENGTH" + self.function_argspec(fn, **kw) - - def visit_match_op_binary(self, binary, operator, **kw): - return "CONTAINS (%s, %s)" % ( - self.process(binary.left), - self.process(binary.right), - ) - - def visit_true(self, expr, **kw): - return "1" - - def visit_false(self, expr, **kw): - return "0" - - def get_cte_preamble(self, recursive): - return "WITH" - - def get_select_hint_text(self, byfroms): - return " ".join("/*+ %s */" % text for table, text in byfroms.items()) - - def function_argspec(self, fn, **kw): - if len(fn.clauses) > 0 or fn.name.upper() not in NO_ARG_FNS: - return compiler.SQLCompiler.function_argspec(self, fn, **kw) - else: - return "" - - def visit_function(self, func, **kw): - text = super().visit_function(func, **kw) - if kw.get("asfrom", False): - text = "TABLE (%s)" % text - return text - - def visit_table_valued_column(self, element, **kw): - text = super().visit_table_valued_column(element, **kw) - text = text + ".COLUMN_VALUE" - return text - - def default_from(self): - """Called when a ``SELECT`` statement has no froms, - and no ``FROM`` clause is to be appended. - - The Oracle compiler tacks a "FROM DUAL" to the statement. - """ - - return " FROM DUAL" - - def visit_join(self, join, from_linter=None, **kwargs): - if self.dialect.use_ansi: - return compiler.SQLCompiler.visit_join( - self, join, from_linter=from_linter, **kwargs - ) - else: - if from_linter: - from_linter.edges.add((join.left, join.right)) - - kwargs["asfrom"] = True - if isinstance(join.right, expression.FromGrouping): - right = join.right.element - else: - right = join.right - return ( - self.process(join.left, from_linter=from_linter, **kwargs) - + ", " - + self.process(right, from_linter=from_linter, **kwargs) - ) - - def _get_nonansi_join_whereclause(self, froms): - clauses = [] - - def visit_join(join): - if join.isouter: - # https://docs.oracle.com/database/121/SQLRF/queries006.htm#SQLRF52354 - # "apply the outer join operator (+) to all columns of B in - # the join condition in the WHERE clause" - that is, - # unconditionally regardless of operator or the other side - def visit_binary(binary): - if isinstance( - binary.left, expression.ColumnClause - ) and join.right.is_derived_from(binary.left.table): - binary.left = _OuterJoinColumn(binary.left) - elif isinstance( - binary.right, expression.ColumnClause - ) and join.right.is_derived_from(binary.right.table): - binary.right = _OuterJoinColumn(binary.right) - - clauses.append( - visitors.cloned_traverse( - join.onclause, {}, {"binary": visit_binary} - ) - ) - else: - clauses.append(join.onclause) - - for j in join.left, join.right: - if isinstance(j, expression.Join): - visit_join(j) - elif isinstance(j, expression.FromGrouping): - visit_join(j.element) - - for f in froms: - if isinstance(f, expression.Join): - visit_join(f) - - if not clauses: - return None - else: - return sql.and_(*clauses) - - def visit_outer_join_column(self, vc, **kw): - return self.process(vc.column, **kw) + "(+)" - - def visit_sequence(self, seq, **kw): - return self.preparer.format_sequence(seq) + ".nextval" - - def get_render_as_alias_suffix(self, alias_name_text): - """Oracle doesn't like ``FROM table AS alias``""" - - return " " + alias_name_text - - def returning_clause( - self, stmt, returning_cols, *, populate_result_map, **kw - ): - columns = [] - binds = [] - - for i, column in enumerate( - expression._select_iterables(returning_cols) - ): - if ( - self.isupdate - and isinstance(column, sa_schema.Column) - and isinstance(column.server_default, Computed) - and not self.dialect._supports_update_returning_computed_cols - ): - util.warn( - "Computed columns don't work with Oracle UPDATE " - "statements that use RETURNING; the value of the column " - "*before* the UPDATE takes place is returned. It is " - "advised to not use RETURNING with an Oracle computed " - "column. Consider setting implicit_returning to False on " - "the Table object in order to avoid implicit RETURNING " - "clauses from being generated for this Table." - ) - if column.type._has_column_expression: - col_expr = column.type.column_expression(column) - else: - col_expr = column - - outparam = sql.outparam("ret_%d" % i, type_=column.type) - self.binds[outparam.key] = outparam - binds.append( - self.bindparam_string(self._truncate_bindparam(outparam)) - ) - - # has_out_parameters would in a normal case be set to True - # as a result of the compiler visiting an outparam() object. - # in this case, the above outparam() objects are not being - # visited. Ensure the statement itself didn't have other - # outparam() objects independently. - # technically, this could be supported, but as it would be - # a very strange use case without a clear rationale, disallow it - if self.has_out_parameters: - raise exc.InvalidRequestError( - "Using explicit outparam() objects with " - "UpdateBase.returning() in the same Core DML statement " - "is not supported in the Oracle dialect." - ) - - self._oracle_returning = True - - columns.append(self.process(col_expr, within_columns_clause=False)) - if populate_result_map: - self._add_to_result_map( - getattr(col_expr, "name", col_expr._anon_name_label), - getattr(col_expr, "name", col_expr._anon_name_label), - ( - column, - getattr(column, "name", None), - getattr(column, "key", None), - ), - column.type, - ) - - return "RETURNING " + ", ".join(columns) + " INTO " + ", ".join(binds) - - def _row_limit_clause(self, select, **kw): - """ORacle 12c supports OFFSET/FETCH operators - Use it instead subquery with row_number - - """ - - if ( - select._fetch_clause is not None - or not self.dialect._supports_offset_fetch - ): - return super()._row_limit_clause( - select, use_literal_execute_for_simple_int=True, **kw - ) - else: - return self.fetch_clause( - select, - fetch_clause=self._get_limit_or_fetch(select), - use_literal_execute_for_simple_int=True, - **kw, - ) - - def _get_limit_or_fetch(self, select): - if select._fetch_clause is None: - return select._limit_clause - else: - return select._fetch_clause - - def translate_select_structure(self, select_stmt, **kwargs): - select = select_stmt - - if not getattr(select, "_oracle_visit", None): - if not self.dialect.use_ansi: - froms = self._display_froms_for_select( - select, kwargs.get("asfrom", False) - ) - whereclause = self._get_nonansi_join_whereclause(froms) - if whereclause is not None: - select = select.where(whereclause) - select._oracle_visit = True - - # if fetch is used this is not needed - if ( - select._has_row_limiting_clause - and not self.dialect._supports_offset_fetch - and select._fetch_clause is None - ): - limit_clause = select._limit_clause - offset_clause = select._offset_clause - - if select._simple_int_clause(limit_clause): - limit_clause = limit_clause.render_literal_execute() - - if select._simple_int_clause(offset_clause): - offset_clause = offset_clause.render_literal_execute() - - # currently using form at: - # https://blogs.oracle.com/oraclemagazine/\ - # on-rownum-and-limiting-results - - orig_select = select - select = select._generate() - select._oracle_visit = True - - # add expressions to accommodate FOR UPDATE OF - for_update = select._for_update_arg - if for_update is not None and for_update.of: - for_update = for_update._clone() - for_update._copy_internals() - - for elem in for_update.of: - if not select.selected_columns.contains_column(elem): - select = select.add_columns(elem) - - # Wrap the middle select and add the hint - inner_subquery = select.alias() - limitselect = sql.select( - *[ - c - for c in inner_subquery.c - if orig_select.selected_columns.corresponding_column(c) - is not None - ] - ) - - if ( - limit_clause is not None - and self.dialect.optimize_limits - and select._simple_int_clause(limit_clause) - ): - limitselect = limitselect.prefix_with( - expression.text( - "/*+ FIRST_ROWS(%s) */" - % self.process(limit_clause, **kwargs) - ) - ) - - limitselect._oracle_visit = True - limitselect._is_wrapper = True - - # add expressions to accommodate FOR UPDATE OF - if for_update is not None and for_update.of: - adapter = sql_util.ClauseAdapter(inner_subquery) - for_update.of = [ - adapter.traverse(elem) for elem in for_update.of - ] - - # If needed, add the limiting clause - if limit_clause is not None: - if select._simple_int_clause(limit_clause) and ( - offset_clause is None - or select._simple_int_clause(offset_clause) - ): - max_row = limit_clause - - if offset_clause is not None: - max_row = max_row + offset_clause - - else: - max_row = limit_clause - - if offset_clause is not None: - max_row = max_row + offset_clause - limitselect = limitselect.where( - sql.literal_column("ROWNUM") <= max_row - ) - - # If needed, add the ora_rn, and wrap again with offset. - if offset_clause is None: - limitselect._for_update_arg = for_update - select = limitselect - else: - limitselect = limitselect.add_columns( - sql.literal_column("ROWNUM").label("ora_rn") - ) - limitselect._oracle_visit = True - limitselect._is_wrapper = True - - if for_update is not None and for_update.of: - limitselect_cols = limitselect.selected_columns - for elem in for_update.of: - if ( - limitselect_cols.corresponding_column(elem) - is None - ): - limitselect = limitselect.add_columns(elem) - - limit_subquery = limitselect.alias() - origselect_cols = orig_select.selected_columns - offsetselect = sql.select( - *[ - c - for c in limit_subquery.c - if origselect_cols.corresponding_column(c) - is not None - ] - ) - - offsetselect._oracle_visit = True - offsetselect._is_wrapper = True - - if for_update is not None and for_update.of: - adapter = sql_util.ClauseAdapter(limit_subquery) - for_update.of = [ - adapter.traverse(elem) for elem in for_update.of - ] - - offsetselect = offsetselect.where( - sql.literal_column("ora_rn") > offset_clause - ) - - offsetselect._for_update_arg = for_update - select = offsetselect - - return select - - def limit_clause(self, select, **kw): - return "" - - def visit_empty_set_expr(self, type_, **kw): - return "SELECT 1 FROM DUAL WHERE 1!=1" - - def for_update_clause(self, select, **kw): - if self.is_subquery(): - return "" - - tmp = " FOR UPDATE" - - if select._for_update_arg.of: - tmp += " OF " + ", ".join( - self.process(elem, **kw) for elem in select._for_update_arg.of - ) - - if select._for_update_arg.nowait: - tmp += " NOWAIT" - if select._for_update_arg.skip_locked: - tmp += " SKIP LOCKED" - - return tmp - - def visit_is_distinct_from_binary(self, binary, operator, **kw): - return "DECODE(%s, %s, 0, 1) = 1" % ( - self.process(binary.left), - self.process(binary.right), - ) - - def visit_is_not_distinct_from_binary(self, binary, operator, **kw): - return "DECODE(%s, %s, 0, 1) = 0" % ( - self.process(binary.left), - self.process(binary.right), - ) - - def visit_regexp_match_op_binary(self, binary, operator, **kw): - string = self.process(binary.left, **kw) - pattern = self.process(binary.right, **kw) - flags = binary.modifiers["flags"] - if flags is None: - return "REGEXP_LIKE(%s, %s)" % (string, pattern) - else: - return "REGEXP_LIKE(%s, %s, %s)" % ( - string, - pattern, - self.render_literal_value(flags, sqltypes.STRINGTYPE), - ) - - def visit_not_regexp_match_op_binary(self, binary, operator, **kw): - return "NOT %s" % self.visit_regexp_match_op_binary( - binary, operator, **kw - ) - - def visit_regexp_replace_op_binary(self, binary, operator, **kw): - string = self.process(binary.left, **kw) - pattern_replace = self.process(binary.right, **kw) - flags = binary.modifiers["flags"] - if flags is None: - return "REGEXP_REPLACE(%s, %s)" % ( - string, - pattern_replace, - ) - else: - return "REGEXP_REPLACE(%s, %s, %s)" % ( - string, - pattern_replace, - self.render_literal_value(flags, sqltypes.STRINGTYPE), - ) - - def visit_aggregate_strings_func(self, fn, **kw): - return "LISTAGG%s" % self.function_argspec(fn, **kw) - - -class OracleDDLCompiler(compiler.DDLCompiler): - def define_constraint_cascades(self, constraint): - text = "" - if constraint.ondelete is not None: - text += " ON DELETE %s" % constraint.ondelete - - # oracle has no ON UPDATE CASCADE - - # its only available via triggers - # https://asktom.oracle.com/tkyte/update_cascade/index.html - if constraint.onupdate is not None: - util.warn( - "Oracle does not contain native UPDATE CASCADE " - "functionality - onupdates will not be rendered for foreign " - "keys. Consider using deferrable=True, initially='deferred' " - "or triggers." - ) - - return text - - def visit_drop_table_comment(self, drop, **kw): - return "COMMENT ON TABLE %s IS ''" % self.preparer.format_table( - drop.element - ) - - def visit_create_index(self, create, **kw): - index = create.element - self._verify_index_table(index) - preparer = self.preparer - text = "CREATE " - if index.unique: - text += "UNIQUE " - if index.dialect_options["oracle"]["bitmap"]: - text += "BITMAP " - text += "INDEX %s ON %s (%s)" % ( - self._prepared_index_name(index, include_schema=True), - preparer.format_table(index.table, use_schema=True), - ", ".join( - self.sql_compiler.process( - expr, include_table=False, literal_binds=True - ) - for expr in index.expressions - ), - ) - if index.dialect_options["oracle"]["compress"] is not False: - if index.dialect_options["oracle"]["compress"] is True: - text += " COMPRESS" - else: - text += " COMPRESS %d" % ( - index.dialect_options["oracle"]["compress"] - ) - return text - - def post_create_table(self, table): - table_opts = [] - opts = table.dialect_options["oracle"] - - if opts["on_commit"]: - on_commit_options = opts["on_commit"].replace("_", " ").upper() - table_opts.append("\n ON COMMIT %s" % on_commit_options) - - if opts["compress"]: - if opts["compress"] is True: - table_opts.append("\n COMPRESS") - else: - table_opts.append("\n COMPRESS FOR %s" % (opts["compress"])) - - return "".join(table_opts) - - def get_identity_options(self, identity_options): - text = super().get_identity_options(identity_options) - text = text.replace("NO MINVALUE", "NOMINVALUE") - text = text.replace("NO MAXVALUE", "NOMAXVALUE") - text = text.replace("NO CYCLE", "NOCYCLE") - if identity_options.order is not None: - text += " ORDER" if identity_options.order else " NOORDER" - return text.strip() - - def visit_computed_column(self, generated, **kw): - text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process( - generated.sqltext, include_table=False, literal_binds=True - ) - if generated.persisted is True: - raise exc.CompileError( - "Oracle computed columns do not support 'stored' persistence; " - "set the 'persisted' flag to None or False for Oracle support." - ) - elif generated.persisted is False: - text += " VIRTUAL" - return text - - def visit_identity_column(self, identity, **kw): - if identity.always is None: - kind = "" - else: - kind = "ALWAYS" if identity.always else "BY DEFAULT" - text = "GENERATED %s" % kind - if identity.on_null: - text += " ON NULL" - text += " AS IDENTITY" - options = self.get_identity_options(identity) - if options: - text += " (%s)" % options - return text - - -class OracleIdentifierPreparer(compiler.IdentifierPreparer): - reserved_words = {x.lower() for x in RESERVED_WORDS} - illegal_initial_characters = {str(dig) for dig in range(0, 10)}.union( - ["_", "$"] - ) - - def _bindparam_requires_quotes(self, value): - """Return True if the given identifier requires quoting.""" - lc_value = value.lower() - return ( - lc_value in self.reserved_words - or value[0] in self.illegal_initial_characters - or not self.legal_characters.match(str(value)) - ) - - def format_savepoint(self, savepoint): - name = savepoint.ident.lstrip("_") - return super().format_savepoint(savepoint, name) - - -class OracleExecutionContext(default.DefaultExecutionContext): - def fire_sequence(self, seq, type_): - return self._execute_scalar( - "SELECT " - + self.identifier_preparer.format_sequence(seq) - + ".nextval FROM DUAL", - type_, - ) - - def pre_exec(self): - if self.statement and "_oracle_dblink" in self.execution_options: - self.statement = self.statement.replace( - dictionary.DB_LINK_PLACEHOLDER, - self.execution_options["_oracle_dblink"], - ) - - -class OracleDialect(default.DefaultDialect): - name = "oracle" - supports_statement_cache = True - supports_alter = True - max_identifier_length = 128 - - _supports_offset_fetch = True - - insert_returning = True - update_returning = True - delete_returning = True - - div_is_floordiv = False - - supports_simple_order_by_label = False - cte_follows_insert = True - returns_native_bytes = True - - supports_sequences = True - sequences_optional = False - postfetch_lastrowid = False - - default_paramstyle = "named" - colspecs = colspecs - ischema_names = ischema_names - requires_name_normalize = True - - supports_comments = True - - supports_default_values = False - supports_default_metavalue = True - supports_empty_insert = False - supports_identity_columns = True - - statement_compiler = OracleCompiler - ddl_compiler = OracleDDLCompiler - type_compiler_cls = OracleTypeCompiler - preparer = OracleIdentifierPreparer - execution_ctx_cls = OracleExecutionContext - - reflection_options = ("oracle_resolve_synonyms",) - - _use_nchar_for_unicode = False - - construct_arguments = [ - ( - sa_schema.Table, - {"resolve_synonyms": False, "on_commit": None, "compress": False}, - ), - (sa_schema.Index, {"bitmap": False, "compress": False}), - ] - - @util.deprecated_params( - use_binds_for_limits=( - "1.4", - "The ``use_binds_for_limits`` Oracle dialect parameter is " - "deprecated. The dialect now renders LIMIT /OFFSET integers " - "inline in all cases using a post-compilation hook, so that the " - "value is still represented by a 'bound parameter' on the Core " - "Expression side.", - ) - ) - def __init__( - self, - use_ansi=True, - optimize_limits=False, - use_binds_for_limits=None, - use_nchar_for_unicode=False, - exclude_tablespaces=("SYSTEM", "SYSAUX"), - enable_offset_fetch=True, - **kwargs, - ): - default.DefaultDialect.__init__(self, **kwargs) - self._use_nchar_for_unicode = use_nchar_for_unicode - self.use_ansi = use_ansi - self.optimize_limits = optimize_limits - self.exclude_tablespaces = exclude_tablespaces - self.enable_offset_fetch = self._supports_offset_fetch = ( - enable_offset_fetch - ) - - def initialize(self, connection): - super().initialize(connection) - - # Oracle 8i has RETURNING: - # https://docs.oracle.com/cd/A87860_01/doc/index.htm - - # so does Oracle8: - # https://docs.oracle.com/cd/A64702_01/doc/index.htm - - if self._is_oracle_8: - self.colspecs = self.colspecs.copy() - self.colspecs.pop(sqltypes.Interval) - self.use_ansi = False - - self.supports_identity_columns = self.server_version_info >= (12,) - self._supports_offset_fetch = ( - self.enable_offset_fetch and self.server_version_info >= (12,) - ) - - def _get_effective_compat_server_version_info(self, connection): - # dialect does not need compat levels below 12.2, so don't query - # in those cases - - if self.server_version_info < (12, 2): - return self.server_version_info - try: - compat = connection.exec_driver_sql( - "SELECT value FROM v$parameter WHERE name = 'compatible'" - ).scalar() - except exc.DBAPIError: - compat = None - - if compat: - try: - return tuple(int(x) for x in compat.split(".")) - except: - return self.server_version_info - else: - return self.server_version_info - - @property - def _is_oracle_8(self): - return self.server_version_info and self.server_version_info < (9,) - - @property - def _supports_table_compression(self): - return self.server_version_info and self.server_version_info >= (10, 1) - - @property - def _supports_table_compress_for(self): - return self.server_version_info and self.server_version_info >= (11,) - - @property - def _supports_char_length(self): - return not self._is_oracle_8 - - @property - def _supports_update_returning_computed_cols(self): - # on version 18 this error is no longet present while it happens on 11 - # it may work also on versions before the 18 - return self.server_version_info and self.server_version_info >= (18,) - - @property - def _supports_except_all(self): - return self.server_version_info and self.server_version_info >= (21,) - - def do_release_savepoint(self, connection, name): - # Oracle does not support RELEASE SAVEPOINT - pass - - def _check_max_identifier_length(self, connection): - if self._get_effective_compat_server_version_info(connection) < ( - 12, - 2, - ): - return 30 - else: - # use the default - return None - - def get_isolation_level_values(self, dbapi_connection): - return ["READ COMMITTED", "SERIALIZABLE"] - - def get_default_isolation_level(self, dbapi_conn): - try: - return self.get_isolation_level(dbapi_conn) - except NotImplementedError: - raise - except: - return "READ COMMITTED" - - def _execute_reflection( - self, connection, query, dblink, returns_long, params=None - ): - if dblink and not dblink.startswith("@"): - dblink = f"@{dblink}" - execution_options = { - # handle db links - "_oracle_dblink": dblink or "", - # override any schema translate map - "schema_translate_map": None, - } - - if dblink and returns_long: - # Oracle seems to error with - # "ORA-00997: illegal use of LONG datatype" when returning - # LONG columns via a dblink in a query with bind params - # This type seems to be very hard to cast into something else - # so it seems easier to just use bind param in this case - def visit_bindparam(bindparam): - bindparam.literal_execute = True - - query = visitors.cloned_traverse( - query, {}, {"bindparam": visit_bindparam} - ) - return connection.execute( - query, params, execution_options=execution_options - ) - - @util.memoized_property - def _has_table_query(self): - # materialized views are returned by all_tables - tables = ( - select( - dictionary.all_tables.c.table_name, - dictionary.all_tables.c.owner, - ) - .union_all( - select( - dictionary.all_views.c.view_name.label("table_name"), - dictionary.all_views.c.owner, - ) - ) - .subquery("tables_and_views") - ) - - query = select(tables.c.table_name).where( - tables.c.table_name == bindparam("table_name"), - tables.c.owner == bindparam("owner"), - ) - return query - - @reflection.cache - def has_table( - self, connection, table_name, schema=None, dblink=None, **kw - ): - """Supported kw arguments are: ``dblink`` to reflect via a db link.""" - self._ensure_has_table_connection(connection) - - if not schema: - schema = self.default_schema_name - - params = { - "table_name": self.denormalize_name(table_name), - "owner": self.denormalize_schema_name(schema), - } - cursor = self._execute_reflection( - connection, - self._has_table_query, - dblink, - returns_long=False, - params=params, - ) - return bool(cursor.scalar()) - - @reflection.cache - def has_sequence( - self, connection, sequence_name, schema=None, dblink=None, **kw - ): - """Supported kw arguments are: ``dblink`` to reflect via a db link.""" - if not schema: - schema = self.default_schema_name - - query = select(dictionary.all_sequences.c.sequence_name).where( - dictionary.all_sequences.c.sequence_name - == self.denormalize_schema_name(sequence_name), - dictionary.all_sequences.c.sequence_owner - == self.denormalize_schema_name(schema), - ) - - cursor = self._execute_reflection( - connection, query, dblink, returns_long=False - ) - return bool(cursor.scalar()) - - def _get_default_schema_name(self, connection): - return self.normalize_name( - connection.exec_driver_sql( - "select sys_context( 'userenv', 'current_schema' ) from dual" - ).scalar() - ) - - def denormalize_schema_name(self, name): - # look for quoted_name - force = getattr(name, "quote", None) - if force is None and name == "public": - # look for case insensitive, no quoting specified, "public" - return "PUBLIC" - return super().denormalize_name(name) - - @reflection.flexi_cache( - ("schema", InternalTraversal.dp_string), - ("filter_names", InternalTraversal.dp_string_list), - ("dblink", InternalTraversal.dp_string), - ) - def _get_synonyms(self, connection, schema, filter_names, dblink, **kw): - owner = self.denormalize_schema_name( - schema or self.default_schema_name - ) - - has_filter_names, params = self._prepare_filter_names(filter_names) - query = select( - dictionary.all_synonyms.c.synonym_name, - dictionary.all_synonyms.c.table_name, - dictionary.all_synonyms.c.table_owner, - dictionary.all_synonyms.c.db_link, - ).where(dictionary.all_synonyms.c.owner == owner) - if has_filter_names: - query = query.where( - dictionary.all_synonyms.c.synonym_name.in_( - params["filter_names"] - ) - ) - result = self._execute_reflection( - connection, query, dblink, returns_long=False - ).mappings() - return result.all() - - @lru_cache() - def _all_objects_query( - self, owner, scope, kind, has_filter_names, has_mat_views - ): - query = ( - select(dictionary.all_objects.c.object_name) - .select_from(dictionary.all_objects) - .where(dictionary.all_objects.c.owner == owner) - ) - - # NOTE: materialized views are listed in all_objects twice; - # once as MATERIALIZE VIEW and once as TABLE - if kind is ObjectKind.ANY: - # materilaized view are listed also as tables so there is no - # need to add them to the in_. - query = query.where( - dictionary.all_objects.c.object_type.in_(("TABLE", "VIEW")) - ) - else: - object_type = [] - if ObjectKind.VIEW in kind: - object_type.append("VIEW") - if ( - ObjectKind.MATERIALIZED_VIEW in kind - and ObjectKind.TABLE not in kind - ): - # materilaized view are listed also as tables so there is no - # need to add them to the in_ if also selecting tables. - object_type.append("MATERIALIZED VIEW") - if ObjectKind.TABLE in kind: - object_type.append("TABLE") - if has_mat_views and ObjectKind.MATERIALIZED_VIEW not in kind: - # materialized view are listed also as tables, - # so they need to be filtered out - # EXCEPT ALL / MINUS profiles as faster than using - # NOT EXISTS or NOT IN with a subquery, but it's in - # general faster to get the mat view names and exclude - # them only when needed - query = query.where( - dictionary.all_objects.c.object_name.not_in( - bindparam("mat_views") - ) - ) - query = query.where( - dictionary.all_objects.c.object_type.in_(object_type) - ) - - # handles scope - if scope is ObjectScope.DEFAULT: - query = query.where(dictionary.all_objects.c.temporary == "N") - elif scope is ObjectScope.TEMPORARY: - query = query.where(dictionary.all_objects.c.temporary == "Y") - - if has_filter_names: - query = query.where( - dictionary.all_objects.c.object_name.in_( - bindparam("filter_names") - ) - ) - return query - - @reflection.flexi_cache( - ("schema", InternalTraversal.dp_string), - ("scope", InternalTraversal.dp_plain_obj), - ("kind", InternalTraversal.dp_plain_obj), - ("filter_names", InternalTraversal.dp_string_list), - ("dblink", InternalTraversal.dp_string), - ) - def _get_all_objects( - self, connection, schema, scope, kind, filter_names, dblink, **kw - ): - owner = self.denormalize_schema_name( - schema or self.default_schema_name - ) - - has_filter_names, params = self._prepare_filter_names(filter_names) - has_mat_views = False - if ( - ObjectKind.TABLE in kind - and ObjectKind.MATERIALIZED_VIEW not in kind - ): - # see note in _all_objects_query - mat_views = self.get_materialized_view_names( - connection, schema, dblink, _normalize=False, **kw - ) - if mat_views: - params["mat_views"] = mat_views - has_mat_views = True - - query = self._all_objects_query( - owner, scope, kind, has_filter_names, has_mat_views - ) - - result = self._execute_reflection( - connection, query, dblink, returns_long=False, params=params - ).scalars() - - return result.all() - - def _handle_synonyms_decorator(fn): - @wraps(fn) - def wrapper(self, *args, **kwargs): - return self._handle_synonyms(fn, *args, **kwargs) - - return wrapper - - def _handle_synonyms(self, fn, connection, *args, **kwargs): - if not kwargs.get("oracle_resolve_synonyms", False): - return fn(self, connection, *args, **kwargs) - - original_kw = kwargs.copy() - schema = kwargs.pop("schema", None) - result = self._get_synonyms( - connection, - schema=schema, - filter_names=kwargs.pop("filter_names", None), - dblink=kwargs.pop("dblink", None), - info_cache=kwargs.get("info_cache", None), - ) - - dblinks_owners = defaultdict(dict) - for row in result: - key = row["db_link"], row["table_owner"] - tn = self.normalize_name(row["table_name"]) - dblinks_owners[key][tn] = row["synonym_name"] - - if not dblinks_owners: - # No synonym, do the plain thing - return fn(self, connection, *args, **original_kw) - - data = {} - for (dblink, table_owner), mapping in dblinks_owners.items(): - call_kw = { - **original_kw, - "schema": table_owner, - "dblink": self.normalize_name(dblink), - "filter_names": mapping.keys(), - } - call_result = fn(self, connection, *args, **call_kw) - for (_, tn), value in call_result: - synonym_name = self.normalize_name(mapping[tn]) - data[(schema, synonym_name)] = value - return data.items() - - @reflection.cache - def get_schema_names(self, connection, dblink=None, **kw): - """Supported kw arguments are: ``dblink`` to reflect via a db link.""" - query = select(dictionary.all_users.c.username).order_by( - dictionary.all_users.c.username - ) - result = self._execute_reflection( - connection, query, dblink, returns_long=False - ).scalars() - return [self.normalize_name(row) for row in result] - - @reflection.cache - def get_table_names(self, connection, schema=None, dblink=None, **kw): - """Supported kw arguments are: ``dblink`` to reflect via a db link.""" - # note that table_names() isn't loading DBLINKed or synonym'ed tables - if schema is None: - schema = self.default_schema_name - - den_schema = self.denormalize_schema_name(schema) - if kw.get("oracle_resolve_synonyms", False): - tables = ( - select( - dictionary.all_tables.c.table_name, - dictionary.all_tables.c.owner, - dictionary.all_tables.c.iot_name, - dictionary.all_tables.c.duration, - dictionary.all_tables.c.tablespace_name, - ) - .union_all( - select( - dictionary.all_synonyms.c.synonym_name.label( - "table_name" - ), - dictionary.all_synonyms.c.owner, - dictionary.all_tables.c.iot_name, - dictionary.all_tables.c.duration, - dictionary.all_tables.c.tablespace_name, - ) - .select_from(dictionary.all_tables) - .join( - dictionary.all_synonyms, - and_( - dictionary.all_tables.c.table_name - == dictionary.all_synonyms.c.table_name, - dictionary.all_tables.c.owner - == func.coalesce( - dictionary.all_synonyms.c.table_owner, - dictionary.all_synonyms.c.owner, - ), - ), - ) - ) - .subquery("available_tables") - ) - else: - tables = dictionary.all_tables - - query = select(tables.c.table_name) - if self.exclude_tablespaces: - query = query.where( - func.coalesce( - tables.c.tablespace_name, "no tablespace" - ).not_in(self.exclude_tablespaces) - ) - query = query.where( - tables.c.owner == den_schema, - tables.c.iot_name.is_(null()), - tables.c.duration.is_(null()), - ) - - # remove materialized views - mat_query = select( - dictionary.all_mviews.c.mview_name.label("table_name") - ).where(dictionary.all_mviews.c.owner == den_schema) - - query = ( - query.except_all(mat_query) - if self._supports_except_all - else query.except_(mat_query) - ) - - result = self._execute_reflection( - connection, query, dblink, returns_long=False - ).scalars() - return [self.normalize_name(row) for row in result] - - @reflection.cache - def get_temp_table_names(self, connection, dblink=None, **kw): - """Supported kw arguments are: ``dblink`` to reflect via a db link.""" - schema = self.denormalize_schema_name(self.default_schema_name) - - query = select(dictionary.all_tables.c.table_name) - if self.exclude_tablespaces: - query = query.where( - func.coalesce( - dictionary.all_tables.c.tablespace_name, "no tablespace" - ).not_in(self.exclude_tablespaces) - ) - query = query.where( - dictionary.all_tables.c.owner == schema, - dictionary.all_tables.c.iot_name.is_(null()), - dictionary.all_tables.c.duration.is_not(null()), - ) - - result = self._execute_reflection( - connection, query, dblink, returns_long=False - ).scalars() - return [self.normalize_name(row) for row in result] - - @reflection.cache - def get_materialized_view_names( - self, connection, schema=None, dblink=None, _normalize=True, **kw - ): - """Supported kw arguments are: ``dblink`` to reflect via a db link.""" - if not schema: - schema = self.default_schema_name - - query = select(dictionary.all_mviews.c.mview_name).where( - dictionary.all_mviews.c.owner - == self.denormalize_schema_name(schema) - ) - result = self._execute_reflection( - connection, query, dblink, returns_long=False - ).scalars() - if _normalize: - return [self.normalize_name(row) for row in result] - else: - return result.all() - - @reflection.cache - def get_view_names(self, connection, schema=None, dblink=None, **kw): - """Supported kw arguments are: ``dblink`` to reflect via a db link.""" - if not schema: - schema = self.default_schema_name - - query = select(dictionary.all_views.c.view_name).where( - dictionary.all_views.c.owner - == self.denormalize_schema_name(schema) - ) - result = self._execute_reflection( - connection, query, dblink, returns_long=False - ).scalars() - return [self.normalize_name(row) for row in result] - - @reflection.cache - def get_sequence_names(self, connection, schema=None, dblink=None, **kw): - """Supported kw arguments are: ``dblink`` to reflect via a db link.""" - if not schema: - schema = self.default_schema_name - query = select(dictionary.all_sequences.c.sequence_name).where( - dictionary.all_sequences.c.sequence_owner - == self.denormalize_schema_name(schema) - ) - - result = self._execute_reflection( - connection, query, dblink, returns_long=False - ).scalars() - return [self.normalize_name(row) for row in result] - - def _value_or_raise(self, data, table, schema): - table = self.normalize_name(str(table)) - try: - return dict(data)[(schema, table)] - except KeyError: - raise exc.NoSuchTableError( - f"{schema}.{table}" if schema else table - ) from None - - def _prepare_filter_names(self, filter_names): - if filter_names: - fn = [self.denormalize_name(name) for name in filter_names] - return True, {"filter_names": fn} - else: - return False, {} - - @reflection.cache - def get_table_options(self, connection, table_name, schema=None, **kw): - """Supported kw arguments are: ``dblink`` to reflect via a db link; - ``oracle_resolve_synonyms`` to resolve names to synonyms - """ - data = self.get_multi_table_options( - connection, - schema=schema, - filter_names=[table_name], - scope=ObjectScope.ANY, - kind=ObjectKind.ANY, - **kw, - ) - return self._value_or_raise(data, table_name, schema) - - @lru_cache() - def _table_options_query( - self, owner, scope, kind, has_filter_names, has_mat_views - ): - query = select( - dictionary.all_tables.c.table_name, - dictionary.all_tables.c.compression, - dictionary.all_tables.c.compress_for, - ).where(dictionary.all_tables.c.owner == owner) - if has_filter_names: - query = query.where( - dictionary.all_tables.c.table_name.in_( - bindparam("filter_names") - ) - ) - if scope is ObjectScope.DEFAULT: - query = query.where(dictionary.all_tables.c.duration.is_(null())) - elif scope is ObjectScope.TEMPORARY: - query = query.where( - dictionary.all_tables.c.duration.is_not(null()) - ) - - if ( - has_mat_views - and ObjectKind.TABLE in kind - and ObjectKind.MATERIALIZED_VIEW not in kind - ): - # cant use EXCEPT ALL / MINUS here because we don't have an - # excludable row vs. the query above - # outerjoin + where null works better on oracle 21 but 11 does - # not like it at all. this is the next best thing - - query = query.where( - dictionary.all_tables.c.table_name.not_in( - bindparam("mat_views") - ) - ) - elif ( - ObjectKind.TABLE not in kind - and ObjectKind.MATERIALIZED_VIEW in kind - ): - query = query.where( - dictionary.all_tables.c.table_name.in_(bindparam("mat_views")) - ) - return query - - @_handle_synonyms_decorator - def get_multi_table_options( - self, - connection, - *, - schema, - filter_names, - scope, - kind, - dblink=None, - **kw, - ): - """Supported kw arguments are: ``dblink`` to reflect via a db link; - ``oracle_resolve_synonyms`` to resolve names to synonyms - """ - owner = self.denormalize_schema_name( - schema or self.default_schema_name - ) - - has_filter_names, params = self._prepare_filter_names(filter_names) - has_mat_views = False - - if ( - ObjectKind.TABLE in kind - and ObjectKind.MATERIALIZED_VIEW not in kind - ): - # see note in _table_options_query - mat_views = self.get_materialized_view_names( - connection, schema, dblink, _normalize=False, **kw - ) - if mat_views: - params["mat_views"] = mat_views - has_mat_views = True - elif ( - ObjectKind.TABLE not in kind - and ObjectKind.MATERIALIZED_VIEW in kind - ): - mat_views = self.get_materialized_view_names( - connection, schema, dblink, _normalize=False, **kw - ) - params["mat_views"] = mat_views - - options = {} - default = ReflectionDefaults.table_options - - if ObjectKind.TABLE in kind or ObjectKind.MATERIALIZED_VIEW in kind: - query = self._table_options_query( - owner, scope, kind, has_filter_names, has_mat_views - ) - result = self._execute_reflection( - connection, query, dblink, returns_long=False, params=params - ) - - for table, compression, compress_for in result: - if compression == "ENABLED": - data = {"oracle_compress": compress_for} - else: - data = default() - options[(schema, self.normalize_name(table))] = data - if ObjectKind.VIEW in kind and ObjectScope.DEFAULT in scope: - # add the views (no temporary views) - for view in self.get_view_names(connection, schema, dblink, **kw): - if not filter_names or view in filter_names: - options[(schema, view)] = default() - - return options.items() - - @reflection.cache - def get_columns(self, connection, table_name, schema=None, **kw): - """Supported kw arguments are: ``dblink`` to reflect via a db link; - ``oracle_resolve_synonyms`` to resolve names to synonyms - """ - - data = self.get_multi_columns( - connection, - schema=schema, - filter_names=[table_name], - scope=ObjectScope.ANY, - kind=ObjectKind.ANY, - **kw, - ) - return self._value_or_raise(data, table_name, schema) - - def _run_batches( - self, connection, query, dblink, returns_long, mappings, all_objects - ): - each_batch = 500 - batches = list(all_objects) - while batches: - batch = batches[0:each_batch] - batches[0:each_batch] = [] - - result = self._execute_reflection( - connection, - query, - dblink, - returns_long=returns_long, - params={"all_objects": batch}, - ) - if mappings: - yield from result.mappings() - else: - yield from result - - @lru_cache() - def _column_query(self, owner): - all_cols = dictionary.all_tab_cols - all_comments = dictionary.all_col_comments - all_ids = dictionary.all_tab_identity_cols - - if self.server_version_info >= (12,): - add_cols = ( - all_cols.c.default_on_null, - sql.case( - (all_ids.c.table_name.is_(None), sql.null()), - else_=all_ids.c.generation_type - + "," - + all_ids.c.identity_options, - ).label("identity_options"), - ) - join_identity_cols = True - else: - add_cols = ( - sql.null().label("default_on_null"), - sql.null().label("identity_options"), - ) - join_identity_cols = False - - # NOTE: on oracle cannot create tables/views without columns and - # a table cannot have all column hidden: - # ORA-54039: table must have at least one column that is not invisible - # all_tab_cols returns data for tables/views/mat-views. - # all_tab_cols does not return recycled tables - - query = ( - select( - all_cols.c.table_name, - all_cols.c.column_name, - all_cols.c.data_type, - all_cols.c.char_length, - all_cols.c.data_precision, - all_cols.c.data_scale, - all_cols.c.nullable, - all_cols.c.data_default, - all_comments.c.comments, - all_cols.c.virtual_column, - *add_cols, - ).select_from(all_cols) - # NOTE: all_col_comments has a row for each column even if no - # comment is present, so a join could be performed, but there - # seems to be no difference compared to an outer join - .outerjoin( - all_comments, - and_( - all_cols.c.table_name == all_comments.c.table_name, - all_cols.c.column_name == all_comments.c.column_name, - all_cols.c.owner == all_comments.c.owner, - ), - ) - ) - if join_identity_cols: - query = query.outerjoin( - all_ids, - and_( - all_cols.c.table_name == all_ids.c.table_name, - all_cols.c.column_name == all_ids.c.column_name, - all_cols.c.owner == all_ids.c.owner, - ), - ) - - query = query.where( - all_cols.c.table_name.in_(bindparam("all_objects")), - all_cols.c.hidden_column == "NO", - all_cols.c.owner == owner, - ).order_by(all_cols.c.table_name, all_cols.c.column_id) - return query - - @_handle_synonyms_decorator - def get_multi_columns( - self, - connection, - *, - schema, - filter_names, - scope, - kind, - dblink=None, - **kw, - ): - """Supported kw arguments are: ``dblink`` to reflect via a db link; - ``oracle_resolve_synonyms`` to resolve names to synonyms - """ - owner = self.denormalize_schema_name( - schema or self.default_schema_name - ) - query = self._column_query(owner) - - if ( - filter_names - and kind is ObjectKind.ANY - and scope is ObjectScope.ANY - ): - all_objects = [self.denormalize_name(n) for n in filter_names] - else: - all_objects = self._get_all_objects( - connection, schema, scope, kind, filter_names, dblink, **kw - ) - - columns = defaultdict(list) - - # all_tab_cols.data_default is LONG - result = self._run_batches( - connection, - query, - dblink, - returns_long=True, - mappings=True, - all_objects=all_objects, - ) - - def maybe_int(value): - if isinstance(value, float) and value.is_integer(): - return int(value) - else: - return value - - remove_size = re.compile(r"\(\d+\)") - - for row_dict in result: - table_name = self.normalize_name(row_dict["table_name"]) - orig_colname = row_dict["column_name"] - colname = self.normalize_name(orig_colname) - coltype = row_dict["data_type"] - precision = maybe_int(row_dict["data_precision"]) - - if coltype == "NUMBER": - scale = maybe_int(row_dict["data_scale"]) - if precision is None and scale == 0: - coltype = INTEGER() - else: - coltype = NUMBER(precision, scale) - elif coltype == "FLOAT": - # https://docs.oracle.com/cd/B14117_01/server.101/b10758/sqlqr06.htm - if precision == 126: - # The DOUBLE PRECISION datatype is a floating-point - # number with binary precision 126. - coltype = DOUBLE_PRECISION() - elif precision == 63: - # The REAL datatype is a floating-point number with a - # binary precision of 63, or 18 decimal. - coltype = REAL() - else: - # non standard precision - coltype = FLOAT(binary_precision=precision) - - elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"): - char_length = maybe_int(row_dict["char_length"]) - coltype = self.ischema_names.get(coltype)(char_length) - elif "WITH TIME ZONE" in coltype: - coltype = TIMESTAMP(timezone=True) - elif "WITH LOCAL TIME ZONE" in coltype: - coltype = TIMESTAMP(local_timezone=True) - else: - coltype = re.sub(remove_size, "", coltype) - try: - coltype = self.ischema_names[coltype] - except KeyError: - util.warn( - "Did not recognize type '%s' of column '%s'" - % (coltype, colname) - ) - coltype = sqltypes.NULLTYPE - - default = row_dict["data_default"] - if row_dict["virtual_column"] == "YES": - computed = dict(sqltext=default) - default = None - else: - computed = None - - identity_options = row_dict["identity_options"] - if identity_options is not None: - identity = self._parse_identity_options( - identity_options, row_dict["default_on_null"] - ) - default = None - else: - identity = None - - cdict = { - "name": colname, - "type": coltype, - "nullable": row_dict["nullable"] == "Y", - "default": default, - "comment": row_dict["comments"], - } - if orig_colname.lower() == orig_colname: - cdict["quote"] = True - if computed is not None: - cdict["computed"] = computed - if identity is not None: - cdict["identity"] = identity - - columns[(schema, table_name)].append(cdict) - - # NOTE: default not needed since all tables have columns - # default = ReflectionDefaults.columns - # return ( - # (key, value if value else default()) - # for key, value in columns.items() - # ) - return columns.items() - - def _parse_identity_options(self, identity_options, default_on_null): - # identity_options is a string that starts with 'ALWAYS,' or - # 'BY DEFAULT,' and continues with - # START WITH: 1, INCREMENT BY: 1, MAX_VALUE: 123, MIN_VALUE: 1, - # CYCLE_FLAG: N, CACHE_SIZE: 1, ORDER_FLAG: N, SCALE_FLAG: N, - # EXTEND_FLAG: N, SESSION_FLAG: N, KEEP_VALUE: N - parts = [p.strip() for p in identity_options.split(",")] - identity = { - "always": parts[0] == "ALWAYS", - "on_null": default_on_null == "YES", - } - - for part in parts[1:]: - option, value = part.split(":") - value = value.strip() - - if "START WITH" in option: - identity["start"] = int(value) - elif "INCREMENT BY" in option: - identity["increment"] = int(value) - elif "MAX_VALUE" in option: - identity["maxvalue"] = int(value) - elif "MIN_VALUE" in option: - identity["minvalue"] = int(value) - elif "CYCLE_FLAG" in option: - identity["cycle"] = value == "Y" - elif "CACHE_SIZE" in option: - identity["cache"] = int(value) - elif "ORDER_FLAG" in option: - identity["order"] = value == "Y" - return identity - - @reflection.cache - def get_table_comment(self, connection, table_name, schema=None, **kw): - """Supported kw arguments are: ``dblink`` to reflect via a db link; - ``oracle_resolve_synonyms`` to resolve names to synonyms - """ - data = self.get_multi_table_comment( - connection, - schema=schema, - filter_names=[table_name], - scope=ObjectScope.ANY, - kind=ObjectKind.ANY, - **kw, - ) - return self._value_or_raise(data, table_name, schema) - - @lru_cache() - def _comment_query(self, owner, scope, kind, has_filter_names): - # NOTE: all_tab_comments / all_mview_comments have a row for all - # object even if they don't have comments - queries = [] - if ObjectKind.TABLE in kind or ObjectKind.VIEW in kind: - # all_tab_comments returns also plain views - tbl_view = select( - dictionary.all_tab_comments.c.table_name, - dictionary.all_tab_comments.c.comments, - ).where( - dictionary.all_tab_comments.c.owner == owner, - dictionary.all_tab_comments.c.table_name.not_like("BIN$%"), - ) - if ObjectKind.VIEW not in kind: - tbl_view = tbl_view.where( - dictionary.all_tab_comments.c.table_type == "TABLE" - ) - elif ObjectKind.TABLE not in kind: - tbl_view = tbl_view.where( - dictionary.all_tab_comments.c.table_type == "VIEW" - ) - queries.append(tbl_view) - if ObjectKind.MATERIALIZED_VIEW in kind: - mat_view = select( - dictionary.all_mview_comments.c.mview_name.label("table_name"), - dictionary.all_mview_comments.c.comments, - ).where( - dictionary.all_mview_comments.c.owner == owner, - dictionary.all_mview_comments.c.mview_name.not_like("BIN$%"), - ) - queries.append(mat_view) - if len(queries) == 1: - query = queries[0] - else: - union = sql.union_all(*queries).subquery("tables_and_views") - query = select(union.c.table_name, union.c.comments) - - name_col = query.selected_columns.table_name - - if scope in (ObjectScope.DEFAULT, ObjectScope.TEMPORARY): - temp = "Y" if scope is ObjectScope.TEMPORARY else "N" - # need distinct since materialized view are listed also - # as tables in all_objects - query = query.distinct().join( - dictionary.all_objects, - and_( - dictionary.all_objects.c.owner == owner, - dictionary.all_objects.c.object_name == name_col, - dictionary.all_objects.c.temporary == temp, - ), - ) - if has_filter_names: - query = query.where(name_col.in_(bindparam("filter_names"))) - return query - - @_handle_synonyms_decorator - def get_multi_table_comment( - self, - connection, - *, - schema, - filter_names, - scope, - kind, - dblink=None, - **kw, - ): - """Supported kw arguments are: ``dblink`` to reflect via a db link; - ``oracle_resolve_synonyms`` to resolve names to synonyms - """ - owner = self.denormalize_schema_name( - schema or self.default_schema_name - ) - has_filter_names, params = self._prepare_filter_names(filter_names) - query = self._comment_query(owner, scope, kind, has_filter_names) - - result = self._execute_reflection( - connection, query, dblink, returns_long=False, params=params - ) - default = ReflectionDefaults.table_comment - # materialized views by default seem to have a comment like - # "snapshot table for snapshot owner.mat_view_name" - ignore_mat_view = "snapshot table for snapshot " - return ( - ( - (schema, self.normalize_name(table)), - ( - {"text": comment} - if comment is not None - and not comment.startswith(ignore_mat_view) - else default() - ), - ) - for table, comment in result - ) - - @reflection.cache - def get_indexes(self, connection, table_name, schema=None, **kw): - """Supported kw arguments are: ``dblink`` to reflect via a db link; - ``oracle_resolve_synonyms`` to resolve names to synonyms - """ - data = self.get_multi_indexes( - connection, - schema=schema, - filter_names=[table_name], - scope=ObjectScope.ANY, - kind=ObjectKind.ANY, - **kw, - ) - return self._value_or_raise(data, table_name, schema) - - @lru_cache() - def _index_query(self, owner): - return ( - select( - dictionary.all_ind_columns.c.table_name, - dictionary.all_ind_columns.c.index_name, - dictionary.all_ind_columns.c.column_name, - dictionary.all_indexes.c.index_type, - dictionary.all_indexes.c.uniqueness, - dictionary.all_indexes.c.compression, - dictionary.all_indexes.c.prefix_length, - dictionary.all_ind_columns.c.descend, - dictionary.all_ind_expressions.c.column_expression, - ) - .select_from(dictionary.all_ind_columns) - .join( - dictionary.all_indexes, - sql.and_( - dictionary.all_ind_columns.c.index_name - == dictionary.all_indexes.c.index_name, - dictionary.all_ind_columns.c.index_owner - == dictionary.all_indexes.c.owner, - ), - ) - .outerjoin( - # NOTE: this adds about 20% to the query time. Using a - # case expression with a scalar subquery only when needed - # with the assumption that most indexes are not expression - # would be faster but oracle does not like that with - # LONG datatype. It errors with: - # ORA-00997: illegal use of LONG datatype - dictionary.all_ind_expressions, - sql.and_( - dictionary.all_ind_expressions.c.index_name - == dictionary.all_ind_columns.c.index_name, - dictionary.all_ind_expressions.c.index_owner - == dictionary.all_ind_columns.c.index_owner, - dictionary.all_ind_expressions.c.column_position - == dictionary.all_ind_columns.c.column_position, - ), - ) - .where( - dictionary.all_indexes.c.table_owner == owner, - dictionary.all_indexes.c.table_name.in_( - bindparam("all_objects") - ), - ) - .order_by( - dictionary.all_ind_columns.c.index_name, - dictionary.all_ind_columns.c.column_position, - ) - ) - - @reflection.flexi_cache( - ("schema", InternalTraversal.dp_string), - ("dblink", InternalTraversal.dp_string), - ("all_objects", InternalTraversal.dp_string_list), - ) - def _get_indexes_rows(self, connection, schema, dblink, all_objects, **kw): - owner = self.denormalize_schema_name( - schema or self.default_schema_name - ) - - query = self._index_query(owner) - - pks = { - row_dict["constraint_name"] - for row_dict in self._get_all_constraint_rows( - connection, schema, dblink, all_objects, **kw - ) - if row_dict["constraint_type"] == "P" - } - - # all_ind_expressions.column_expression is LONG - result = self._run_batches( - connection, - query, - dblink, - returns_long=True, - mappings=True, - all_objects=all_objects, - ) - - return [ - row_dict - for row_dict in result - if row_dict["index_name"] not in pks - ] - - @_handle_synonyms_decorator - def get_multi_indexes( - self, - connection, - *, - schema, - filter_names, - scope, - kind, - dblink=None, - **kw, - ): - """Supported kw arguments are: ``dblink`` to reflect via a db link; - ``oracle_resolve_synonyms`` to resolve names to synonyms - """ - all_objects = self._get_all_objects( - connection, schema, scope, kind, filter_names, dblink, **kw - ) - - uniqueness = {"NONUNIQUE": False, "UNIQUE": True} - enabled = {"DISABLED": False, "ENABLED": True} - is_bitmap = {"BITMAP", "FUNCTION-BASED BITMAP"} - - indexes = defaultdict(dict) - - for row_dict in self._get_indexes_rows( - connection, schema, dblink, all_objects, **kw - ): - index_name = self.normalize_name(row_dict["index_name"]) - table_name = self.normalize_name(row_dict["table_name"]) - table_indexes = indexes[(schema, table_name)] - - if index_name not in table_indexes: - table_indexes[index_name] = index_dict = { - "name": index_name, - "column_names": [], - "dialect_options": {}, - "unique": uniqueness.get(row_dict["uniqueness"], False), - } - do = index_dict["dialect_options"] - if row_dict["index_type"] in is_bitmap: - do["oracle_bitmap"] = True - if enabled.get(row_dict["compression"], False): - do["oracle_compress"] = row_dict["prefix_length"] - - else: - index_dict = table_indexes[index_name] - - expr = row_dict["column_expression"] - if expr is not None: - index_dict["column_names"].append(None) - if "expressions" in index_dict: - index_dict["expressions"].append(expr) - else: - index_dict["expressions"] = index_dict["column_names"][:-1] - index_dict["expressions"].append(expr) - - if row_dict["descend"].lower() != "asc": - assert row_dict["descend"].lower() == "desc" - cs = index_dict.setdefault("column_sorting", {}) - cs[expr] = ("desc",) - else: - assert row_dict["descend"].lower() == "asc" - cn = self.normalize_name(row_dict["column_name"]) - index_dict["column_names"].append(cn) - if "expressions" in index_dict: - index_dict["expressions"].append(cn) - - default = ReflectionDefaults.indexes - - return ( - (key, list(indexes[key].values()) if key in indexes else default()) - for key in ( - (schema, self.normalize_name(obj_name)) - for obj_name in all_objects - ) - ) - - @reflection.cache - def get_pk_constraint(self, connection, table_name, schema=None, **kw): - """Supported kw arguments are: ``dblink`` to reflect via a db link; - ``oracle_resolve_synonyms`` to resolve names to synonyms - """ - data = self.get_multi_pk_constraint( - connection, - schema=schema, - filter_names=[table_name], - scope=ObjectScope.ANY, - kind=ObjectKind.ANY, - **kw, - ) - return self._value_or_raise(data, table_name, schema) - - @lru_cache() - def _constraint_query(self, owner): - local = dictionary.all_cons_columns.alias("local") - remote = dictionary.all_cons_columns.alias("remote") - return ( - select( - dictionary.all_constraints.c.table_name, - dictionary.all_constraints.c.constraint_type, - dictionary.all_constraints.c.constraint_name, - local.c.column_name.label("local_column"), - remote.c.table_name.label("remote_table"), - remote.c.column_name.label("remote_column"), - remote.c.owner.label("remote_owner"), - dictionary.all_constraints.c.search_condition, - dictionary.all_constraints.c.delete_rule, - ) - .select_from(dictionary.all_constraints) - .join( - local, - and_( - local.c.owner == dictionary.all_constraints.c.owner, - dictionary.all_constraints.c.constraint_name - == local.c.constraint_name, - ), - ) - .outerjoin( - remote, - and_( - dictionary.all_constraints.c.r_owner == remote.c.owner, - dictionary.all_constraints.c.r_constraint_name - == remote.c.constraint_name, - or_( - remote.c.position.is_(sql.null()), - local.c.position == remote.c.position, - ), - ), - ) - .where( - dictionary.all_constraints.c.owner == owner, - dictionary.all_constraints.c.table_name.in_( - bindparam("all_objects") - ), - dictionary.all_constraints.c.constraint_type.in_( - ("R", "P", "U", "C") - ), - ) - .order_by( - dictionary.all_constraints.c.constraint_name, local.c.position - ) - ) - - @reflection.flexi_cache( - ("schema", InternalTraversal.dp_string), - ("dblink", InternalTraversal.dp_string), - ("all_objects", InternalTraversal.dp_string_list), - ) - def _get_all_constraint_rows( - self, connection, schema, dblink, all_objects, **kw - ): - owner = self.denormalize_schema_name( - schema or self.default_schema_name - ) - query = self._constraint_query(owner) - - # since the result is cached a list must be created - values = list( - self._run_batches( - connection, - query, - dblink, - returns_long=False, - mappings=True, - all_objects=all_objects, - ) - ) - return values - - @_handle_synonyms_decorator - def get_multi_pk_constraint( - self, - connection, - *, - scope, - schema, - filter_names, - kind, - dblink=None, - **kw, - ): - """Supported kw arguments are: ``dblink`` to reflect via a db link; - ``oracle_resolve_synonyms`` to resolve names to synonyms - """ - all_objects = self._get_all_objects( - connection, schema, scope, kind, filter_names, dblink, **kw - ) - - primary_keys = defaultdict(dict) - default = ReflectionDefaults.pk_constraint - - for row_dict in self._get_all_constraint_rows( - connection, schema, dblink, all_objects, **kw - ): - if row_dict["constraint_type"] != "P": - continue - table_name = self.normalize_name(row_dict["table_name"]) - constraint_name = self.normalize_name(row_dict["constraint_name"]) - column_name = self.normalize_name(row_dict["local_column"]) - - table_pk = primary_keys[(schema, table_name)] - if not table_pk: - table_pk["name"] = constraint_name - table_pk["constrained_columns"] = [column_name] - else: - table_pk["constrained_columns"].append(column_name) - - return ( - (key, primary_keys[key] if key in primary_keys else default()) - for key in ( - (schema, self.normalize_name(obj_name)) - for obj_name in all_objects - ) - ) - - @reflection.cache - def get_foreign_keys( - self, - connection, - table_name, - schema=None, - **kw, - ): - """Supported kw arguments are: ``dblink`` to reflect via a db link; - ``oracle_resolve_synonyms`` to resolve names to synonyms - """ - data = self.get_multi_foreign_keys( - connection, - schema=schema, - filter_names=[table_name], - scope=ObjectScope.ANY, - kind=ObjectKind.ANY, - **kw, - ) - return self._value_or_raise(data, table_name, schema) - - @_handle_synonyms_decorator - def get_multi_foreign_keys( - self, - connection, - *, - scope, - schema, - filter_names, - kind, - dblink=None, - **kw, - ): - """Supported kw arguments are: ``dblink`` to reflect via a db link; - ``oracle_resolve_synonyms`` to resolve names to synonyms - """ - all_objects = self._get_all_objects( - connection, schema, scope, kind, filter_names, dblink, **kw - ) - - resolve_synonyms = kw.get("oracle_resolve_synonyms", False) - - owner = self.denormalize_schema_name( - schema or self.default_schema_name - ) - - all_remote_owners = set() - fkeys = defaultdict(dict) - - for row_dict in self._get_all_constraint_rows( - connection, schema, dblink, all_objects, **kw - ): - if row_dict["constraint_type"] != "R": - continue - - table_name = self.normalize_name(row_dict["table_name"]) - constraint_name = self.normalize_name(row_dict["constraint_name"]) - table_fkey = fkeys[(schema, table_name)] - - assert constraint_name is not None - - local_column = self.normalize_name(row_dict["local_column"]) - remote_table = self.normalize_name(row_dict["remote_table"]) - remote_column = self.normalize_name(row_dict["remote_column"]) - remote_owner_orig = row_dict["remote_owner"] - remote_owner = self.normalize_name(remote_owner_orig) - if remote_owner_orig is not None: - all_remote_owners.add(remote_owner_orig) - - if remote_table is None: - # ticket 363 - if dblink and not dblink.startswith("@"): - dblink = f"@{dblink}" - util.warn( - "Got 'None' querying 'table_name' from " - f"all_cons_columns{dblink or ''} - does the user have " - "proper rights to the table?" - ) - continue - - if constraint_name not in table_fkey: - table_fkey[constraint_name] = fkey = { - "name": constraint_name, - "constrained_columns": [], - "referred_schema": None, - "referred_table": remote_table, - "referred_columns": [], - "options": {}, - } - - if resolve_synonyms: - # will be removed below - fkey["_ref_schema"] = remote_owner - - if schema is not None or remote_owner_orig != owner: - fkey["referred_schema"] = remote_owner - - delete_rule = row_dict["delete_rule"] - if delete_rule != "NO ACTION": - fkey["options"]["ondelete"] = delete_rule - - else: - fkey = table_fkey[constraint_name] - - fkey["constrained_columns"].append(local_column) - fkey["referred_columns"].append(remote_column) - - if resolve_synonyms and all_remote_owners: - query = select( - dictionary.all_synonyms.c.owner, - dictionary.all_synonyms.c.table_name, - dictionary.all_synonyms.c.table_owner, - dictionary.all_synonyms.c.synonym_name, - ).where(dictionary.all_synonyms.c.owner.in_(all_remote_owners)) - - result = self._execute_reflection( - connection, query, dblink, returns_long=False - ).mappings() - - remote_owners_lut = {} - for row in result: - synonym_owner = self.normalize_name(row["owner"]) - table_name = self.normalize_name(row["table_name"]) - - remote_owners_lut[(synonym_owner, table_name)] = ( - row["table_owner"], - row["synonym_name"], - ) - - empty = (None, None) - for table_fkeys in fkeys.values(): - for table_fkey in table_fkeys.values(): - key = ( - table_fkey.pop("_ref_schema"), - table_fkey["referred_table"], - ) - remote_owner, syn_name = remote_owners_lut.get(key, empty) - if syn_name: - sn = self.normalize_name(syn_name) - table_fkey["referred_table"] = sn - if schema is not None or remote_owner != owner: - ro = self.normalize_name(remote_owner) - table_fkey["referred_schema"] = ro - else: - table_fkey["referred_schema"] = None - default = ReflectionDefaults.foreign_keys - - return ( - (key, list(fkeys[key].values()) if key in fkeys else default()) - for key in ( - (schema, self.normalize_name(obj_name)) - for obj_name in all_objects - ) - ) - - @reflection.cache - def get_unique_constraints( - self, connection, table_name, schema=None, **kw - ): - """Supported kw arguments are: ``dblink`` to reflect via a db link; - ``oracle_resolve_synonyms`` to resolve names to synonyms - """ - data = self.get_multi_unique_constraints( - connection, - schema=schema, - filter_names=[table_name], - scope=ObjectScope.ANY, - kind=ObjectKind.ANY, - **kw, - ) - return self._value_or_raise(data, table_name, schema) - - @_handle_synonyms_decorator - def get_multi_unique_constraints( - self, - connection, - *, - scope, - schema, - filter_names, - kind, - dblink=None, - **kw, - ): - """Supported kw arguments are: ``dblink`` to reflect via a db link; - ``oracle_resolve_synonyms`` to resolve names to synonyms - """ - all_objects = self._get_all_objects( - connection, schema, scope, kind, filter_names, dblink, **kw - ) - - unique_cons = defaultdict(dict) - - index_names = { - row_dict["index_name"] - for row_dict in self._get_indexes_rows( - connection, schema, dblink, all_objects, **kw - ) - } - - for row_dict in self._get_all_constraint_rows( - connection, schema, dblink, all_objects, **kw - ): - if row_dict["constraint_type"] != "U": - continue - table_name = self.normalize_name(row_dict["table_name"]) - constraint_name_orig = row_dict["constraint_name"] - constraint_name = self.normalize_name(constraint_name_orig) - column_name = self.normalize_name(row_dict["local_column"]) - table_uc = unique_cons[(schema, table_name)] - - assert constraint_name is not None - - if constraint_name not in table_uc: - table_uc[constraint_name] = uc = { - "name": constraint_name, - "column_names": [], - "duplicates_index": ( - constraint_name - if constraint_name_orig in index_names - else None - ), - } - else: - uc = table_uc[constraint_name] - - uc["column_names"].append(column_name) - - default = ReflectionDefaults.unique_constraints - - return ( - ( - key, - ( - list(unique_cons[key].values()) - if key in unique_cons - else default() - ), - ) - for key in ( - (schema, self.normalize_name(obj_name)) - for obj_name in all_objects - ) - ) - - @reflection.cache - def get_view_definition( - self, - connection, - view_name, - schema=None, - dblink=None, - **kw, - ): - """Supported kw arguments are: ``dblink`` to reflect via a db link; - ``oracle_resolve_synonyms`` to resolve names to synonyms - """ - if kw.get("oracle_resolve_synonyms", False): - synonyms = self._get_synonyms( - connection, schema, filter_names=[view_name], dblink=dblink - ) - if synonyms: - assert len(synonyms) == 1 - row_dict = synonyms[0] - dblink = self.normalize_name(row_dict["db_link"]) - schema = row_dict["table_owner"] - view_name = row_dict["table_name"] - - name = self.denormalize_name(view_name) - owner = self.denormalize_schema_name( - schema or self.default_schema_name - ) - query = ( - select(dictionary.all_views.c.text) - .where( - dictionary.all_views.c.view_name == name, - dictionary.all_views.c.owner == owner, - ) - .union_all( - select(dictionary.all_mviews.c.query).where( - dictionary.all_mviews.c.mview_name == name, - dictionary.all_mviews.c.owner == owner, - ) - ) - ) - - rp = self._execute_reflection( - connection, query, dblink, returns_long=False - ).scalar() - if rp is None: - raise exc.NoSuchTableError( - f"{schema}.{view_name}" if schema else view_name - ) - else: - return rp - - @reflection.cache - def get_check_constraints( - self, connection, table_name, schema=None, include_all=False, **kw - ): - """Supported kw arguments are: ``dblink`` to reflect via a db link; - ``oracle_resolve_synonyms`` to resolve names to synonyms - """ - data = self.get_multi_check_constraints( - connection, - schema=schema, - filter_names=[table_name], - scope=ObjectScope.ANY, - include_all=include_all, - kind=ObjectKind.ANY, - **kw, - ) - return self._value_or_raise(data, table_name, schema) - - @_handle_synonyms_decorator - def get_multi_check_constraints( - self, - connection, - *, - schema, - filter_names, - dblink=None, - scope, - kind, - include_all=False, - **kw, - ): - """Supported kw arguments are: ``dblink`` to reflect via a db link; - ``oracle_resolve_synonyms`` to resolve names to synonyms - """ - all_objects = self._get_all_objects( - connection, schema, scope, kind, filter_names, dblink, **kw - ) - - not_null = re.compile(r"..+?. IS NOT NULL$") - - check_constraints = defaultdict(list) - - for row_dict in self._get_all_constraint_rows( - connection, schema, dblink, all_objects, **kw - ): - if row_dict["constraint_type"] != "C": - continue - table_name = self.normalize_name(row_dict["table_name"]) - constraint_name = self.normalize_name(row_dict["constraint_name"]) - search_condition = row_dict["search_condition"] - - table_checks = check_constraints[(schema, table_name)] - if constraint_name is not None and ( - include_all or not not_null.match(search_condition) - ): - table_checks.append( - {"name": constraint_name, "sqltext": search_condition} - ) - - default = ReflectionDefaults.check_constraints - - return ( - ( - key, - ( - check_constraints[key] - if key in check_constraints - else default() - ), - ) - for key in ( - (schema, self.normalize_name(obj_name)) - for obj_name in all_objects - ) - ) - - def _list_dblinks(self, connection, dblink=None): - query = select(dictionary.all_db_links.c.db_link) - links = self._execute_reflection( - connection, query, dblink, returns_long=False - ).scalars() - return [self.normalize_name(link) for link in links] - - -class _OuterJoinColumn(sql.ClauseElement): - __visit_name__ = "outer_join_column" - - def __init__(self, column): - self.column = column diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/cx_oracle.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/cx_oracle.py deleted file mode 100644 index 9346224..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/cx_oracle.py +++ /dev/null @@ -1,1492 +0,0 @@ -# dialects/oracle/cx_oracle.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - - -r""" -.. dialect:: oracle+cx_oracle - :name: cx-Oracle - :dbapi: cx_oracle - :connectstring: oracle+cx_oracle://user:pass@hostname:port[/dbname][?service_name=<service>[&key=value&key=value...]] - :url: https://oracle.github.io/python-cx_Oracle/ - -DSN vs. Hostname connections ------------------------------ - -cx_Oracle provides several methods of indicating the target database. The -dialect translates from a series of different URL forms. - -Hostname Connections with Easy Connect Syntax -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -Given a hostname, port and service name of the target Oracle Database, for -example from Oracle's `Easy Connect syntax -<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#easy-connect-syntax-for-connection-strings>`_, -then connect in SQLAlchemy using the ``service_name`` query string parameter:: - - engine = create_engine("oracle+cx_oracle://scott:tiger@hostname:port/?service_name=myservice&encoding=UTF-8&nencoding=UTF-8") - -The `full Easy Connect syntax -<https://www.oracle.com/pls/topic/lookup?ctx=dblatest&id=GUID-B0437826-43C1-49EC-A94D-B650B6A4A6EE>`_ -is not supported. Instead, use a ``tnsnames.ora`` file and connect using a -DSN. - -Connections with tnsnames.ora or Oracle Cloud -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -Alternatively, if no port, database name, or ``service_name`` is provided, the -dialect will use an Oracle DSN "connection string". This takes the "hostname" -portion of the URL as the data source name. For example, if the -``tnsnames.ora`` file contains a `Net Service Name -<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#net-service-names-for-connection-strings>`_ -of ``myalias`` as below:: - - myalias = - (DESCRIPTION = - (ADDRESS = (PROTOCOL = TCP)(HOST = mymachine.example.com)(PORT = 1521)) - (CONNECT_DATA = - (SERVER = DEDICATED) - (SERVICE_NAME = orclpdb1) - ) - ) - -The cx_Oracle dialect connects to this database service when ``myalias`` is the -hostname portion of the URL, without specifying a port, database name or -``service_name``:: - - engine = create_engine("oracle+cx_oracle://scott:tiger@myalias/?encoding=UTF-8&nencoding=UTF-8") - -Users of Oracle Cloud should use this syntax and also configure the cloud -wallet as shown in cx_Oracle documentation `Connecting to Autononmous Databases -<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#connecting-to-autononmous-databases>`_. - -SID Connections -^^^^^^^^^^^^^^^ - -To use Oracle's obsolete SID connection syntax, the SID can be passed in a -"database name" portion of the URL as below:: - - engine = create_engine("oracle+cx_oracle://scott:tiger@hostname:1521/dbname?encoding=UTF-8&nencoding=UTF-8") - -Above, the DSN passed to cx_Oracle is created by ``cx_Oracle.makedsn()`` as -follows:: - - >>> import cx_Oracle - >>> cx_Oracle.makedsn("hostname", 1521, sid="dbname") - '(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=hostname)(PORT=1521))(CONNECT_DATA=(SID=dbname)))' - -Passing cx_Oracle connect arguments ------------------------------------ - -Additional connection arguments can usually be passed via the URL -query string; particular symbols like ``cx_Oracle.SYSDBA`` are intercepted -and converted to the correct symbol:: - - e = create_engine( - "oracle+cx_oracle://user:pass@dsn?encoding=UTF-8&nencoding=UTF-8&mode=SYSDBA&events=true") - -.. versionchanged:: 1.3 the cx_oracle dialect now accepts all argument names - within the URL string itself, to be passed to the cx_Oracle DBAPI. As - was the case earlier but not correctly documented, the - :paramref:`_sa.create_engine.connect_args` parameter also accepts all - cx_Oracle DBAPI connect arguments. - -To pass arguments directly to ``.connect()`` without using the query -string, use the :paramref:`_sa.create_engine.connect_args` dictionary. -Any cx_Oracle parameter value and/or constant may be passed, such as:: - - import cx_Oracle - e = create_engine( - "oracle+cx_oracle://user:pass@dsn", - connect_args={ - "encoding": "UTF-8", - "nencoding": "UTF-8", - "mode": cx_Oracle.SYSDBA, - "events": True - } - ) - -Note that the default value for ``encoding`` and ``nencoding`` was changed to -"UTF-8" in cx_Oracle 8.0 so these parameters can be omitted when using that -version, or later. - -Options consumed by the SQLAlchemy cx_Oracle dialect outside of the driver --------------------------------------------------------------------------- - -There are also options that are consumed by the SQLAlchemy cx_oracle dialect -itself. These options are always passed directly to :func:`_sa.create_engine` -, such as:: - - e = create_engine( - "oracle+cx_oracle://user:pass@dsn", coerce_to_decimal=False) - -The parameters accepted by the cx_oracle dialect are as follows: - -* ``arraysize`` - set the cx_oracle.arraysize value on cursors; defaults - to ``None``, indicating that the driver default should be used (typically - the value is 100). This setting controls how many rows are buffered when - fetching rows, and can have a significant effect on performance when - modified. The setting is used for both ``cx_Oracle`` as well as - ``oracledb``. - - .. versionchanged:: 2.0.26 - changed the default value from 50 to None, - to use the default value of the driver itself. - -* ``auto_convert_lobs`` - defaults to True; See :ref:`cx_oracle_lob`. - -* ``coerce_to_decimal`` - see :ref:`cx_oracle_numeric` for detail. - -* ``encoding_errors`` - see :ref:`cx_oracle_unicode_encoding_errors` for detail. - -.. _cx_oracle_sessionpool: - -Using cx_Oracle SessionPool ---------------------------- - -The cx_Oracle library provides its own connection pool implementation that may -be used in place of SQLAlchemy's pooling functionality. This can be achieved -by using the :paramref:`_sa.create_engine.creator` parameter to provide a -function that returns a new connection, along with setting -:paramref:`_sa.create_engine.pool_class` to ``NullPool`` to disable -SQLAlchemy's pooling:: - - import cx_Oracle - from sqlalchemy import create_engine - from sqlalchemy.pool import NullPool - - pool = cx_Oracle.SessionPool( - user="scott", password="tiger", dsn="orclpdb", - min=2, max=5, increment=1, threaded=True, - encoding="UTF-8", nencoding="UTF-8" - ) - - engine = create_engine("oracle+cx_oracle://", creator=pool.acquire, poolclass=NullPool) - -The above engine may then be used normally where cx_Oracle's pool handles -connection pooling:: - - with engine.connect() as conn: - print(conn.scalar("select 1 FROM dual")) - - -As well as providing a scalable solution for multi-user applications, the -cx_Oracle session pool supports some Oracle features such as DRCP and -`Application Continuity -<https://cx-oracle.readthedocs.io/en/latest/user_guide/ha.html#application-continuity-ac>`_. - -Using Oracle Database Resident Connection Pooling (DRCP) --------------------------------------------------------- - -When using Oracle's `DRCP -<https://www.oracle.com/pls/topic/lookup?ctx=dblatest&id=GUID-015CA8C1-2386-4626-855D-CC546DDC1086>`_, -the best practice is to pass a connection class and "purity" when acquiring a -connection from the SessionPool. Refer to the `cx_Oracle DRCP documentation -<https://cx-oracle.readthedocs.io/en/latest/user_guide/connection_handling.html#database-resident-connection-pooling-drcp>`_. - -This can be achieved by wrapping ``pool.acquire()``:: - - import cx_Oracle - from sqlalchemy import create_engine - from sqlalchemy.pool import NullPool - - pool = cx_Oracle.SessionPool( - user="scott", password="tiger", dsn="orclpdb", - min=2, max=5, increment=1, threaded=True, - encoding="UTF-8", nencoding="UTF-8" - ) - - def creator(): - return pool.acquire(cclass="MYCLASS", purity=cx_Oracle.ATTR_PURITY_SELF) - - engine = create_engine("oracle+cx_oracle://", creator=creator, poolclass=NullPool) - -The above engine may then be used normally where cx_Oracle handles session -pooling and Oracle Database additionally uses DRCP:: - - with engine.connect() as conn: - print(conn.scalar("select 1 FROM dual")) - -.. _cx_oracle_unicode: - -Unicode -------- - -As is the case for all DBAPIs under Python 3, all strings are inherently -Unicode strings. In all cases however, the driver requires an explicit -encoding configuration. - -Ensuring the Correct Client Encoding -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -The long accepted standard for establishing client encoding for nearly all -Oracle related software is via the `NLS_LANG <https://www.oracle.com/database/technologies/faq-nls-lang.html>`_ -environment variable. cx_Oracle like most other Oracle drivers will use -this environment variable as the source of its encoding configuration. The -format of this variable is idiosyncratic; a typical value would be -``AMERICAN_AMERICA.AL32UTF8``. - -The cx_Oracle driver also supports a programmatic alternative which is to -pass the ``encoding`` and ``nencoding`` parameters directly to its -``.connect()`` function. These can be present in the URL as follows:: - - engine = create_engine("oracle+cx_oracle://scott:tiger@orclpdb/?encoding=UTF-8&nencoding=UTF-8") - -For the meaning of the ``encoding`` and ``nencoding`` parameters, please -consult -`Characters Sets and National Language Support (NLS) <https://cx-oracle.readthedocs.io/en/latest/user_guide/globalization.html#globalization>`_. - -.. seealso:: - - `Characters Sets and National Language Support (NLS) <https://cx-oracle.readthedocs.io/en/latest/user_guide/globalization.html#globalization>`_ - - in the cx_Oracle documentation. - - -Unicode-specific Column datatypes -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -The Core expression language handles unicode data by use of the :class:`.Unicode` -and :class:`.UnicodeText` -datatypes. These types correspond to the VARCHAR2 and CLOB Oracle datatypes by -default. When using these datatypes with Unicode data, it is expected that -the Oracle database is configured with a Unicode-aware character set, as well -as that the ``NLS_LANG`` environment variable is set appropriately, so that -the VARCHAR2 and CLOB datatypes can accommodate the data. - -In the case that the Oracle database is not configured with a Unicode character -set, the two options are to use the :class:`_types.NCHAR` and -:class:`_oracle.NCLOB` datatypes explicitly, or to pass the flag -``use_nchar_for_unicode=True`` to :func:`_sa.create_engine`, -which will cause the -SQLAlchemy dialect to use NCHAR/NCLOB for the :class:`.Unicode` / -:class:`.UnicodeText` datatypes instead of VARCHAR/CLOB. - -.. versionchanged:: 1.3 The :class:`.Unicode` and :class:`.UnicodeText` - datatypes now correspond to the ``VARCHAR2`` and ``CLOB`` Oracle datatypes - unless the ``use_nchar_for_unicode=True`` is passed to the dialect - when :func:`_sa.create_engine` is called. - - -.. _cx_oracle_unicode_encoding_errors: - -Encoding Errors -^^^^^^^^^^^^^^^ - -For the unusual case that data in the Oracle database is present with a broken -encoding, the dialect accepts a parameter ``encoding_errors`` which will be -passed to Unicode decoding functions in order to affect how decoding errors are -handled. The value is ultimately consumed by the Python `decode -<https://docs.python.org/3/library/stdtypes.html#bytes.decode>`_ function, and -is passed both via cx_Oracle's ``encodingErrors`` parameter consumed by -``Cursor.var()``, as well as SQLAlchemy's own decoding function, as the -cx_Oracle dialect makes use of both under different circumstances. - -.. versionadded:: 1.3.11 - - -.. _cx_oracle_setinputsizes: - -Fine grained control over cx_Oracle data binding performance with setinputsizes -------------------------------------------------------------------------------- - -The cx_Oracle DBAPI has a deep and fundamental reliance upon the usage of the -DBAPI ``setinputsizes()`` call. The purpose of this call is to establish the -datatypes that are bound to a SQL statement for Python values being passed as -parameters. While virtually no other DBAPI assigns any use to the -``setinputsizes()`` call, the cx_Oracle DBAPI relies upon it heavily in its -interactions with the Oracle client interface, and in some scenarios it is not -possible for SQLAlchemy to know exactly how data should be bound, as some -settings can cause profoundly different performance characteristics, while -altering the type coercion behavior at the same time. - -Users of the cx_Oracle dialect are **strongly encouraged** to read through -cx_Oracle's list of built-in datatype symbols at -https://cx-oracle.readthedocs.io/en/latest/api_manual/module.html#database-types. -Note that in some cases, significant performance degradation can occur when -using these types vs. not, in particular when specifying ``cx_Oracle.CLOB``. - -On the SQLAlchemy side, the :meth:`.DialectEvents.do_setinputsizes` event can -be used both for runtime visibility (e.g. logging) of the setinputsizes step as -well as to fully control how ``setinputsizes()`` is used on a per-statement -basis. - -.. versionadded:: 1.2.9 Added :meth:`.DialectEvents.setinputsizes` - - -Example 1 - logging all setinputsizes calls -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -The following example illustrates how to log the intermediary values from a -SQLAlchemy perspective before they are converted to the raw ``setinputsizes()`` -parameter dictionary. The keys of the dictionary are :class:`.BindParameter` -objects which have a ``.key`` and a ``.type`` attribute:: - - from sqlalchemy import create_engine, event - - engine = create_engine("oracle+cx_oracle://scott:tiger@host/xe") - - @event.listens_for(engine, "do_setinputsizes") - def _log_setinputsizes(inputsizes, cursor, statement, parameters, context): - for bindparam, dbapitype in inputsizes.items(): - log.info( - "Bound parameter name: %s SQLAlchemy type: %r " - "DBAPI object: %s", - bindparam.key, bindparam.type, dbapitype) - -Example 2 - remove all bindings to CLOB -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -The ``CLOB`` datatype in cx_Oracle incurs a significant performance overhead, -however is set by default for the ``Text`` type within the SQLAlchemy 1.2 -series. This setting can be modified as follows:: - - from sqlalchemy import create_engine, event - from cx_Oracle import CLOB - - engine = create_engine("oracle+cx_oracle://scott:tiger@host/xe") - - @event.listens_for(engine, "do_setinputsizes") - def _remove_clob(inputsizes, cursor, statement, parameters, context): - for bindparam, dbapitype in list(inputsizes.items()): - if dbapitype is CLOB: - del inputsizes[bindparam] - -.. _cx_oracle_returning: - -RETURNING Support ------------------ - -The cx_Oracle dialect implements RETURNING using OUT parameters. -The dialect supports RETURNING fully. - -.. _cx_oracle_lob: - -LOB Datatypes --------------- - -LOB datatypes refer to the "large object" datatypes such as CLOB, NCLOB and -BLOB. Modern versions of cx_Oracle and oracledb are optimized for these -datatypes to be delivered as a single buffer. As such, SQLAlchemy makes use of -these newer type handlers by default. - -To disable the use of newer type handlers and deliver LOB objects as classic -buffered objects with a ``read()`` method, the parameter -``auto_convert_lobs=False`` may be passed to :func:`_sa.create_engine`, -which takes place only engine-wide. - -Two Phase Transactions Not Supported -------------------------------------- - -Two phase transactions are **not supported** under cx_Oracle due to poor -driver support. As of cx_Oracle 6.0b1, the interface for -two phase transactions has been changed to be more of a direct pass-through -to the underlying OCI layer with less automation. The additional logic -to support this system is not implemented in SQLAlchemy. - -.. _cx_oracle_numeric: - -Precision Numerics ------------------- - -SQLAlchemy's numeric types can handle receiving and returning values as Python -``Decimal`` objects or float objects. When a :class:`.Numeric` object, or a -subclass such as :class:`.Float`, :class:`_oracle.DOUBLE_PRECISION` etc. is in -use, the :paramref:`.Numeric.asdecimal` flag determines if values should be -coerced to ``Decimal`` upon return, or returned as float objects. To make -matters more complicated under Oracle, Oracle's ``NUMBER`` type can also -represent integer values if the "scale" is zero, so the Oracle-specific -:class:`_oracle.NUMBER` type takes this into account as well. - -The cx_Oracle dialect makes extensive use of connection- and cursor-level -"outputtypehandler" callables in order to coerce numeric values as requested. -These callables are specific to the specific flavor of :class:`.Numeric` in -use, as well as if no SQLAlchemy typing objects are present. There are -observed scenarios where Oracle may sends incomplete or ambiguous information -about the numeric types being returned, such as a query where the numeric types -are buried under multiple levels of subquery. The type handlers do their best -to make the right decision in all cases, deferring to the underlying cx_Oracle -DBAPI for all those cases where the driver can make the best decision. - -When no typing objects are present, as when executing plain SQL strings, a -default "outputtypehandler" is present which will generally return numeric -values which specify precision and scale as Python ``Decimal`` objects. To -disable this coercion to decimal for performance reasons, pass the flag -``coerce_to_decimal=False`` to :func:`_sa.create_engine`:: - - engine = create_engine("oracle+cx_oracle://dsn", coerce_to_decimal=False) - -The ``coerce_to_decimal`` flag only impacts the results of plain string -SQL statements that are not otherwise associated with a :class:`.Numeric` -SQLAlchemy type (or a subclass of such). - -.. versionchanged:: 1.2 The numeric handling system for cx_Oracle has been - reworked to take advantage of newer cx_Oracle features as well - as better integration of outputtypehandlers. - -""" # noqa -from __future__ import annotations - -import decimal -import random -import re - -from . import base as oracle -from .base import OracleCompiler -from .base import OracleDialect -from .base import OracleExecutionContext -from .types import _OracleDateLiteralRender -from ... import exc -from ... import util -from ...engine import cursor as _cursor -from ...engine import interfaces -from ...engine import processors -from ...sql import sqltypes -from ...sql._typing import is_sql_compiler - -# source: -# https://github.com/oracle/python-cx_Oracle/issues/596#issuecomment-999243649 -_CX_ORACLE_MAGIC_LOB_SIZE = 131072 - - -class _OracleInteger(sqltypes.Integer): - def get_dbapi_type(self, dbapi): - # see https://github.com/oracle/python-cx_Oracle/issues/ - # 208#issuecomment-409715955 - return int - - def _cx_oracle_var(self, dialect, cursor, arraysize=None): - cx_Oracle = dialect.dbapi - return cursor.var( - cx_Oracle.STRING, - 255, - arraysize=arraysize if arraysize is not None else cursor.arraysize, - outconverter=int, - ) - - def _cx_oracle_outputtypehandler(self, dialect): - def handler(cursor, name, default_type, size, precision, scale): - return self._cx_oracle_var(dialect, cursor) - - return handler - - -class _OracleNumeric(sqltypes.Numeric): - is_number = False - - def bind_processor(self, dialect): - if self.scale == 0: - return None - elif self.asdecimal: - processor = processors.to_decimal_processor_factory( - decimal.Decimal, self._effective_decimal_return_scale - ) - - def process(value): - if isinstance(value, (int, float)): - return processor(value) - elif value is not None and value.is_infinite(): - return float(value) - else: - return value - - return process - else: - return processors.to_float - - def result_processor(self, dialect, coltype): - return None - - def _cx_oracle_outputtypehandler(self, dialect): - cx_Oracle = dialect.dbapi - - def handler(cursor, name, default_type, size, precision, scale): - outconverter = None - - if precision: - if self.asdecimal: - if default_type == cx_Oracle.NATIVE_FLOAT: - # receiving float and doing Decimal after the fact - # allows for float("inf") to be handled - type_ = default_type - outconverter = decimal.Decimal - else: - type_ = decimal.Decimal - else: - if self.is_number and scale == 0: - # integer. cx_Oracle is observed to handle the widest - # variety of ints when no directives are passed, - # from 5.2 to 7.0. See [ticket:4457] - return None - else: - type_ = cx_Oracle.NATIVE_FLOAT - - else: - if self.asdecimal: - if default_type == cx_Oracle.NATIVE_FLOAT: - type_ = default_type - outconverter = decimal.Decimal - else: - type_ = decimal.Decimal - else: - if self.is_number and scale == 0: - # integer. cx_Oracle is observed to handle the widest - # variety of ints when no directives are passed, - # from 5.2 to 7.0. See [ticket:4457] - return None - else: - type_ = cx_Oracle.NATIVE_FLOAT - - return cursor.var( - type_, - 255, - arraysize=cursor.arraysize, - outconverter=outconverter, - ) - - return handler - - -class _OracleUUID(sqltypes.Uuid): - def get_dbapi_type(self, dbapi): - return dbapi.STRING - - -class _OracleBinaryFloat(_OracleNumeric): - def get_dbapi_type(self, dbapi): - return dbapi.NATIVE_FLOAT - - -class _OracleBINARY_FLOAT(_OracleBinaryFloat, oracle.BINARY_FLOAT): - pass - - -class _OracleBINARY_DOUBLE(_OracleBinaryFloat, oracle.BINARY_DOUBLE): - pass - - -class _OracleNUMBER(_OracleNumeric): - is_number = True - - -class _CXOracleDate(oracle._OracleDate): - def bind_processor(self, dialect): - return None - - def result_processor(self, dialect, coltype): - def process(value): - if value is not None: - return value.date() - else: - return value - - return process - - -class _CXOracleTIMESTAMP(_OracleDateLiteralRender, sqltypes.TIMESTAMP): - def literal_processor(self, dialect): - return self._literal_processor_datetime(dialect) - - -class _LOBDataType: - pass - - -# TODO: the names used across CHAR / VARCHAR / NCHAR / NVARCHAR -# here are inconsistent and not very good -class _OracleChar(sqltypes.CHAR): - def get_dbapi_type(self, dbapi): - return dbapi.FIXED_CHAR - - -class _OracleNChar(sqltypes.NCHAR): - def get_dbapi_type(self, dbapi): - return dbapi.FIXED_NCHAR - - -class _OracleUnicodeStringNCHAR(oracle.NVARCHAR2): - def get_dbapi_type(self, dbapi): - return dbapi.NCHAR - - -class _OracleUnicodeStringCHAR(sqltypes.Unicode): - def get_dbapi_type(self, dbapi): - return dbapi.LONG_STRING - - -class _OracleUnicodeTextNCLOB(_LOBDataType, oracle.NCLOB): - def get_dbapi_type(self, dbapi): - # previously, this was dbapi.NCLOB. - # DB_TYPE_NVARCHAR will instead be passed to setinputsizes() - # when this datatype is used. - return dbapi.DB_TYPE_NVARCHAR - - -class _OracleUnicodeTextCLOB(_LOBDataType, sqltypes.UnicodeText): - def get_dbapi_type(self, dbapi): - # previously, this was dbapi.CLOB. - # DB_TYPE_NVARCHAR will instead be passed to setinputsizes() - # when this datatype is used. - return dbapi.DB_TYPE_NVARCHAR - - -class _OracleText(_LOBDataType, sqltypes.Text): - def get_dbapi_type(self, dbapi): - # previously, this was dbapi.CLOB. - # DB_TYPE_NVARCHAR will instead be passed to setinputsizes() - # when this datatype is used. - return dbapi.DB_TYPE_NVARCHAR - - -class _OracleLong(_LOBDataType, oracle.LONG): - def get_dbapi_type(self, dbapi): - return dbapi.LONG_STRING - - -class _OracleString(sqltypes.String): - pass - - -class _OracleEnum(sqltypes.Enum): - def bind_processor(self, dialect): - enum_proc = sqltypes.Enum.bind_processor(self, dialect) - - def process(value): - raw_str = enum_proc(value) - return raw_str - - return process - - -class _OracleBinary(_LOBDataType, sqltypes.LargeBinary): - def get_dbapi_type(self, dbapi): - # previously, this was dbapi.BLOB. - # DB_TYPE_RAW will instead be passed to setinputsizes() - # when this datatype is used. - return dbapi.DB_TYPE_RAW - - def bind_processor(self, dialect): - return None - - def result_processor(self, dialect, coltype): - if not dialect.auto_convert_lobs: - return None - else: - return super().result_processor(dialect, coltype) - - -class _OracleInterval(oracle.INTERVAL): - def get_dbapi_type(self, dbapi): - return dbapi.INTERVAL - - -class _OracleRaw(oracle.RAW): - pass - - -class _OracleRowid(oracle.ROWID): - def get_dbapi_type(self, dbapi): - return dbapi.ROWID - - -class OracleCompiler_cx_oracle(OracleCompiler): - _oracle_cx_sql_compiler = True - - _oracle_returning = False - - # Oracle bind names can't start with digits or underscores. - # currently we rely upon Oracle-specific quoting of bind names in most - # cases. however for expanding params, the escape chars are used. - # see #8708 - bindname_escape_characters = util.immutabledict( - { - "%": "P", - "(": "A", - ")": "Z", - ":": "C", - ".": "C", - "[": "C", - "]": "C", - " ": "C", - "\\": "C", - "/": "C", - "?": "C", - } - ) - - def bindparam_string(self, name, **kw): - quote = getattr(name, "quote", None) - if ( - quote is True - or quote is not False - and self.preparer._bindparam_requires_quotes(name) - # bind param quoting for Oracle doesn't work with post_compile - # params. For those, the default bindparam_string will escape - # special chars, and the appending of a number "_1" etc. will - # take care of reserved words - and not kw.get("post_compile", False) - ): - # interesting to note about expanding parameters - since the - # new parameters take the form <paramname>_<int>, at least if - # they are originally formed from reserved words, they no longer - # need quoting :). names that include illegal characters - # won't work however. - quoted_name = '"%s"' % name - kw["escaped_from"] = name - name = quoted_name - return OracleCompiler.bindparam_string(self, name, **kw) - - # TODO: we could likely do away with quoting altogether for - # Oracle parameters and use the custom escaping here - escaped_from = kw.get("escaped_from", None) - if not escaped_from: - if self._bind_translate_re.search(name): - # not quite the translate use case as we want to - # also get a quick boolean if we even found - # unusual characters in the name - new_name = self._bind_translate_re.sub( - lambda m: self._bind_translate_chars[m.group(0)], - name, - ) - if new_name[0].isdigit() or new_name[0] == "_": - new_name = "D" + new_name - kw["escaped_from"] = name - name = new_name - elif name[0].isdigit() or name[0] == "_": - new_name = "D" + name - kw["escaped_from"] = name - name = new_name - - return OracleCompiler.bindparam_string(self, name, **kw) - - -class OracleExecutionContext_cx_oracle(OracleExecutionContext): - out_parameters = None - - def _generate_out_parameter_vars(self): - # check for has_out_parameters or RETURNING, create cx_Oracle.var - # objects if so - if self.compiled.has_out_parameters or self.compiled._oracle_returning: - out_parameters = self.out_parameters - assert out_parameters is not None - - len_params = len(self.parameters) - - quoted_bind_names = self.compiled.escaped_bind_names - for bindparam in self.compiled.binds.values(): - if bindparam.isoutparam: - name = self.compiled.bind_names[bindparam] - type_impl = bindparam.type.dialect_impl(self.dialect) - - if hasattr(type_impl, "_cx_oracle_var"): - out_parameters[name] = type_impl._cx_oracle_var( - self.dialect, self.cursor, arraysize=len_params - ) - else: - dbtype = type_impl.get_dbapi_type(self.dialect.dbapi) - - cx_Oracle = self.dialect.dbapi - - assert cx_Oracle is not None - - if dbtype is None: - raise exc.InvalidRequestError( - "Cannot create out parameter for " - "parameter " - "%r - its type %r is not supported by" - " cx_oracle" % (bindparam.key, bindparam.type) - ) - - # note this is an OUT parameter. Using - # non-LOB datavalues with large unicode-holding - # values causes the failure (both cx_Oracle and - # oracledb): - # ORA-22835: Buffer too small for CLOB to CHAR or - # BLOB to RAW conversion (actual: 16507, - # maximum: 4000) - # [SQL: INSERT INTO long_text (x, y, z) VALUES - # (:x, :y, :z) RETURNING long_text.x, long_text.y, - # long_text.z INTO :ret_0, :ret_1, :ret_2] - # so even for DB_TYPE_NVARCHAR we convert to a LOB - - if isinstance(type_impl, _LOBDataType): - if dbtype == cx_Oracle.DB_TYPE_NVARCHAR: - dbtype = cx_Oracle.NCLOB - elif dbtype == cx_Oracle.DB_TYPE_RAW: - dbtype = cx_Oracle.BLOB - # other LOB types go in directly - - out_parameters[name] = self.cursor.var( - dbtype, - # this is fine also in oracledb_async since - # the driver will await the read coroutine - outconverter=lambda value: value.read(), - arraysize=len_params, - ) - elif ( - isinstance(type_impl, _OracleNumeric) - and type_impl.asdecimal - ): - out_parameters[name] = self.cursor.var( - decimal.Decimal, - arraysize=len_params, - ) - - else: - out_parameters[name] = self.cursor.var( - dbtype, arraysize=len_params - ) - - for param in self.parameters: - param[quoted_bind_names.get(name, name)] = ( - out_parameters[name] - ) - - def _generate_cursor_outputtype_handler(self): - output_handlers = {} - - for keyname, name, objects, type_ in self.compiled._result_columns: - handler = type_._cached_custom_processor( - self.dialect, - "cx_oracle_outputtypehandler", - self._get_cx_oracle_type_handler, - ) - - if handler: - denormalized_name = self.dialect.denormalize_name(keyname) - output_handlers[denormalized_name] = handler - - if output_handlers: - default_handler = self._dbapi_connection.outputtypehandler - - def output_type_handler( - cursor, name, default_type, size, precision, scale - ): - if name in output_handlers: - return output_handlers[name]( - cursor, name, default_type, size, precision, scale - ) - else: - return default_handler( - cursor, name, default_type, size, precision, scale - ) - - self.cursor.outputtypehandler = output_type_handler - - def _get_cx_oracle_type_handler(self, impl): - if hasattr(impl, "_cx_oracle_outputtypehandler"): - return impl._cx_oracle_outputtypehandler(self.dialect) - else: - return None - - def pre_exec(self): - super().pre_exec() - if not getattr(self.compiled, "_oracle_cx_sql_compiler", False): - return - - self.out_parameters = {} - - self._generate_out_parameter_vars() - - self._generate_cursor_outputtype_handler() - - def post_exec(self): - if ( - self.compiled - and is_sql_compiler(self.compiled) - and self.compiled._oracle_returning - ): - initial_buffer = self.fetchall_for_returning( - self.cursor, _internal=True - ) - - fetch_strategy = _cursor.FullyBufferedCursorFetchStrategy( - self.cursor, - [ - (entry.keyname, None) - for entry in self.compiled._result_columns - ], - initial_buffer=initial_buffer, - ) - - self.cursor_fetch_strategy = fetch_strategy - - def create_cursor(self): - c = self._dbapi_connection.cursor() - if self.dialect.arraysize: - c.arraysize = self.dialect.arraysize - - return c - - def fetchall_for_returning(self, cursor, *, _internal=False): - compiled = self.compiled - if ( - not _internal - and compiled is None - or not is_sql_compiler(compiled) - or not compiled._oracle_returning - ): - raise NotImplementedError( - "execution context was not prepared for Oracle RETURNING" - ) - - # create a fake cursor result from the out parameters. unlike - # get_out_parameter_values(), the result-row handlers here will be - # applied at the Result level - - numcols = len(self.out_parameters) - - # [stmt_result for stmt_result in outparam.values] == each - # statement in executemany - # [val for val in stmt_result] == each row for a particular - # statement - return list( - zip( - *[ - [ - val - for stmt_result in self.out_parameters[ - f"ret_{j}" - ].values - for val in (stmt_result or ()) - ] - for j in range(numcols) - ] - ) - ) - - def get_out_parameter_values(self, out_param_names): - # this method should not be called when the compiler has - # RETURNING as we've turned the has_out_parameters flag set to - # False. - assert not self.compiled.returning - - return [ - self.dialect._paramval(self.out_parameters[name]) - for name in out_param_names - ] - - -class OracleDialect_cx_oracle(OracleDialect): - supports_statement_cache = True - execution_ctx_cls = OracleExecutionContext_cx_oracle - statement_compiler = OracleCompiler_cx_oracle - - supports_sane_rowcount = True - supports_sane_multi_rowcount = True - - insert_executemany_returning = True - insert_executemany_returning_sort_by_parameter_order = True - update_executemany_returning = True - delete_executemany_returning = True - - bind_typing = interfaces.BindTyping.SETINPUTSIZES - - driver = "cx_oracle" - - colspecs = util.update_copy( - OracleDialect.colspecs, - { - sqltypes.TIMESTAMP: _CXOracleTIMESTAMP, - sqltypes.Numeric: _OracleNumeric, - sqltypes.Float: _OracleNumeric, - oracle.BINARY_FLOAT: _OracleBINARY_FLOAT, - oracle.BINARY_DOUBLE: _OracleBINARY_DOUBLE, - sqltypes.Integer: _OracleInteger, - oracle.NUMBER: _OracleNUMBER, - sqltypes.Date: _CXOracleDate, - sqltypes.LargeBinary: _OracleBinary, - sqltypes.Boolean: oracle._OracleBoolean, - sqltypes.Interval: _OracleInterval, - oracle.INTERVAL: _OracleInterval, - sqltypes.Text: _OracleText, - sqltypes.String: _OracleString, - sqltypes.UnicodeText: _OracleUnicodeTextCLOB, - sqltypes.CHAR: _OracleChar, - sqltypes.NCHAR: _OracleNChar, - sqltypes.Enum: _OracleEnum, - oracle.LONG: _OracleLong, - oracle.RAW: _OracleRaw, - sqltypes.Unicode: _OracleUnicodeStringCHAR, - sqltypes.NVARCHAR: _OracleUnicodeStringNCHAR, - sqltypes.Uuid: _OracleUUID, - oracle.NCLOB: _OracleUnicodeTextNCLOB, - oracle.ROWID: _OracleRowid, - }, - ) - - execute_sequence_format = list - - _cx_oracle_threaded = None - - _cursor_var_unicode_kwargs = util.immutabledict() - - @util.deprecated_params( - threaded=( - "1.3", - "The 'threaded' parameter to the cx_oracle/oracledb dialect " - "is deprecated as a dialect-level argument, and will be removed " - "in a future release. As of version 1.3, it defaults to False " - "rather than True. The 'threaded' option can be passed to " - "cx_Oracle directly in the URL query string passed to " - ":func:`_sa.create_engine`.", - ) - ) - def __init__( - self, - auto_convert_lobs=True, - coerce_to_decimal=True, - arraysize=None, - encoding_errors=None, - threaded=None, - **kwargs, - ): - OracleDialect.__init__(self, **kwargs) - self.arraysize = arraysize - self.encoding_errors = encoding_errors - if encoding_errors: - self._cursor_var_unicode_kwargs = { - "encodingErrors": encoding_errors - } - if threaded is not None: - self._cx_oracle_threaded = threaded - self.auto_convert_lobs = auto_convert_lobs - self.coerce_to_decimal = coerce_to_decimal - if self._use_nchar_for_unicode: - self.colspecs = self.colspecs.copy() - self.colspecs[sqltypes.Unicode] = _OracleUnicodeStringNCHAR - self.colspecs[sqltypes.UnicodeText] = _OracleUnicodeTextNCLOB - - dbapi_module = self.dbapi - self._load_version(dbapi_module) - - if dbapi_module is not None: - # these constants will first be seen in SQLAlchemy datatypes - # coming from the get_dbapi_type() method. We then - # will place the following types into setinputsizes() calls - # on each statement. Oracle constants that are not in this - # list will not be put into setinputsizes(). - self.include_set_input_sizes = { - dbapi_module.DATETIME, - dbapi_module.DB_TYPE_NVARCHAR, # used for CLOB, NCLOB - dbapi_module.DB_TYPE_RAW, # used for BLOB - dbapi_module.NCLOB, # not currently used except for OUT param - dbapi_module.CLOB, # not currently used except for OUT param - dbapi_module.LOB, # not currently used - dbapi_module.BLOB, # not currently used except for OUT param - dbapi_module.NCHAR, - dbapi_module.FIXED_NCHAR, - dbapi_module.FIXED_CHAR, - dbapi_module.TIMESTAMP, - int, # _OracleInteger, - # _OracleBINARY_FLOAT, _OracleBINARY_DOUBLE, - dbapi_module.NATIVE_FLOAT, - } - - self._paramval = lambda value: value.getvalue() - - def _load_version(self, dbapi_module): - version = (0, 0, 0) - if dbapi_module is not None: - m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", dbapi_module.version) - if m: - version = tuple( - int(x) for x in m.group(1, 2, 3) if x is not None - ) - self.cx_oracle_ver = version - if self.cx_oracle_ver < (8,) and self.cx_oracle_ver > (0, 0, 0): - raise exc.InvalidRequestError( - "cx_Oracle version 8 and above are supported" - ) - - @classmethod - def import_dbapi(cls): - import cx_Oracle - - return cx_Oracle - - def initialize(self, connection): - super().initialize(connection) - self._detect_decimal_char(connection) - - def get_isolation_level(self, dbapi_connection): - # sources: - - # general idea of transaction id, have to start one, etc. - # https://stackoverflow.com/questions/10711204/how-to-check-isoloation-level - - # how to decode xid cols from v$transaction to match - # https://asktom.oracle.com/pls/apex/f?p=100:11:0::::P11_QUESTION_ID:9532779900346079444 - - # Oracle tuple comparison without using IN: - # https://www.sql-workbench.eu/comparison/tuple_comparison.html - - with dbapi_connection.cursor() as cursor: - # this is the only way to ensure a transaction is started without - # actually running DML. There's no way to see the configured - # isolation level without getting it from v$transaction which - # means transaction has to be started. - outval = cursor.var(str) - cursor.execute( - """ - begin - :trans_id := dbms_transaction.local_transaction_id( TRUE ); - end; - """, - {"trans_id": outval}, - ) - trans_id = outval.getvalue() - xidusn, xidslot, xidsqn = trans_id.split(".", 2) - - cursor.execute( - "SELECT CASE BITAND(t.flag, POWER(2, 28)) " - "WHEN 0 THEN 'READ COMMITTED' " - "ELSE 'SERIALIZABLE' END AS isolation_level " - "FROM v$transaction t WHERE " - "(t.xidusn, t.xidslot, t.xidsqn) = " - "((:xidusn, :xidslot, :xidsqn))", - {"xidusn": xidusn, "xidslot": xidslot, "xidsqn": xidsqn}, - ) - row = cursor.fetchone() - if row is None: - raise exc.InvalidRequestError( - "could not retrieve isolation level" - ) - result = row[0] - - return result - - def get_isolation_level_values(self, dbapi_connection): - return super().get_isolation_level_values(dbapi_connection) + [ - "AUTOCOMMIT" - ] - - def set_isolation_level(self, dbapi_connection, level): - if level == "AUTOCOMMIT": - dbapi_connection.autocommit = True - else: - dbapi_connection.autocommit = False - dbapi_connection.rollback() - with dbapi_connection.cursor() as cursor: - cursor.execute(f"ALTER SESSION SET ISOLATION_LEVEL={level}") - - def _detect_decimal_char(self, connection): - # we have the option to change this setting upon connect, - # or just look at what it is upon connect and convert. - # to minimize the chance of interference with changes to - # NLS_TERRITORY or formatting behavior of the DB, we opt - # to just look at it - - dbapi_connection = connection.connection - - with dbapi_connection.cursor() as cursor: - # issue #8744 - # nls_session_parameters is not available in some Oracle - # modes like "mount mode". But then, v$nls_parameters is not - # available if the connection doesn't have SYSDBA priv. - # - # simplify the whole thing and just use the method that we were - # doing in the test suite already, selecting a number - - def output_type_handler( - cursor, name, defaultType, size, precision, scale - ): - return cursor.var( - self.dbapi.STRING, 255, arraysize=cursor.arraysize - ) - - cursor.outputtypehandler = output_type_handler - cursor.execute("SELECT 1.1 FROM DUAL") - value = cursor.fetchone()[0] - - decimal_char = value.lstrip("0")[1] - assert not decimal_char[0].isdigit() - - self._decimal_char = decimal_char - - if self._decimal_char != ".": - _detect_decimal = self._detect_decimal - _to_decimal = self._to_decimal - - self._detect_decimal = lambda value: _detect_decimal( - value.replace(self._decimal_char, ".") - ) - self._to_decimal = lambda value: _to_decimal( - value.replace(self._decimal_char, ".") - ) - - def _detect_decimal(self, value): - if "." in value: - return self._to_decimal(value) - else: - return int(value) - - _to_decimal = decimal.Decimal - - def _generate_connection_outputtype_handler(self): - """establish the default outputtypehandler established at the - connection level. - - """ - - dialect = self - cx_Oracle = dialect.dbapi - - number_handler = _OracleNUMBER( - asdecimal=True - )._cx_oracle_outputtypehandler(dialect) - float_handler = _OracleNUMBER( - asdecimal=False - )._cx_oracle_outputtypehandler(dialect) - - def output_type_handler( - cursor, name, default_type, size, precision, scale - ): - if ( - default_type == cx_Oracle.NUMBER - and default_type is not cx_Oracle.NATIVE_FLOAT - ): - if not dialect.coerce_to_decimal: - return None - elif precision == 0 and scale in (0, -127): - # ambiguous type, this occurs when selecting - # numbers from deep subqueries - return cursor.var( - cx_Oracle.STRING, - 255, - outconverter=dialect._detect_decimal, - arraysize=cursor.arraysize, - ) - elif precision and scale > 0: - return number_handler( - cursor, name, default_type, size, precision, scale - ) - else: - return float_handler( - cursor, name, default_type, size, precision, scale - ) - - # if unicode options were specified, add a decoder, otherwise - # cx_Oracle should return Unicode - elif ( - dialect._cursor_var_unicode_kwargs - and default_type - in ( - cx_Oracle.STRING, - cx_Oracle.FIXED_CHAR, - ) - and default_type is not cx_Oracle.CLOB - and default_type is not cx_Oracle.NCLOB - ): - return cursor.var( - str, - size, - cursor.arraysize, - **dialect._cursor_var_unicode_kwargs, - ) - - elif dialect.auto_convert_lobs and default_type in ( - cx_Oracle.CLOB, - cx_Oracle.NCLOB, - ): - return cursor.var( - cx_Oracle.DB_TYPE_NVARCHAR, - _CX_ORACLE_MAGIC_LOB_SIZE, - cursor.arraysize, - **dialect._cursor_var_unicode_kwargs, - ) - - elif dialect.auto_convert_lobs and default_type in ( - cx_Oracle.BLOB, - ): - return cursor.var( - cx_Oracle.DB_TYPE_RAW, - _CX_ORACLE_MAGIC_LOB_SIZE, - cursor.arraysize, - ) - - return output_type_handler - - def on_connect(self): - output_type_handler = self._generate_connection_outputtype_handler() - - def on_connect(conn): - conn.outputtypehandler = output_type_handler - - return on_connect - - def create_connect_args(self, url): - opts = dict(url.query) - - for opt in ("use_ansi", "auto_convert_lobs"): - if opt in opts: - util.warn_deprecated( - f"{self.driver} dialect option {opt!r} should only be " - "passed to create_engine directly, not within the URL " - "string", - version="1.3", - ) - util.coerce_kw_type(opts, opt, bool) - setattr(self, opt, opts.pop(opt)) - - database = url.database - service_name = opts.pop("service_name", None) - if database or service_name: - # if we have a database, then we have a remote host - port = url.port - if port: - port = int(port) - else: - port = 1521 - - if database and service_name: - raise exc.InvalidRequestError( - '"service_name" option shouldn\'t ' - 'be used with a "database" part of the url' - ) - if database: - makedsn_kwargs = {"sid": database} - if service_name: - makedsn_kwargs = {"service_name": service_name} - - dsn = self.dbapi.makedsn(url.host, port, **makedsn_kwargs) - else: - # we have a local tnsname - dsn = url.host - - if dsn is not None: - opts["dsn"] = dsn - if url.password is not None: - opts["password"] = url.password - if url.username is not None: - opts["user"] = url.username - - if self._cx_oracle_threaded is not None: - opts.setdefault("threaded", self._cx_oracle_threaded) - - def convert_cx_oracle_constant(value): - if isinstance(value, str): - try: - int_val = int(value) - except ValueError: - value = value.upper() - return getattr(self.dbapi, value) - else: - return int_val - else: - return value - - util.coerce_kw_type(opts, "mode", convert_cx_oracle_constant) - util.coerce_kw_type(opts, "threaded", bool) - util.coerce_kw_type(opts, "events", bool) - util.coerce_kw_type(opts, "purity", convert_cx_oracle_constant) - return ([], opts) - - def _get_server_version_info(self, connection): - return tuple(int(x) for x in connection.connection.version.split(".")) - - def is_disconnect(self, e, connection, cursor): - (error,) = e.args - if isinstance( - e, (self.dbapi.InterfaceError, self.dbapi.DatabaseError) - ) and "not connected" in str(e): - return True - - if hasattr(error, "code") and error.code in { - 28, - 3114, - 3113, - 3135, - 1033, - 2396, - }: - # ORA-00028: your session has been killed - # ORA-03114: not connected to ORACLE - # ORA-03113: end-of-file on communication channel - # ORA-03135: connection lost contact - # ORA-01033: ORACLE initialization or shutdown in progress - # ORA-02396: exceeded maximum idle time, please connect again - # TODO: Others ? - return True - - if re.match(r"^(?:DPI-1010|DPI-1080|DPY-1001|DPY-4011)", str(e)): - # DPI-1010: not connected - # DPI-1080: connection was closed by ORA-3113 - # python-oracledb's DPY-1001: not connected to database - # python-oracledb's DPY-4011: the database or network closed the - # connection - # TODO: others? - return True - - return False - - def create_xid(self): - """create a two-phase transaction ID. - - this id will be passed to do_begin_twophase(), do_rollback_twophase(), - do_commit_twophase(). its format is unspecified. - - """ - - id_ = random.randint(0, 2**128) - return (0x1234, "%032x" % id_, "%032x" % 9) - - def do_executemany(self, cursor, statement, parameters, context=None): - if isinstance(parameters, tuple): - parameters = list(parameters) - cursor.executemany(statement, parameters) - - def do_begin_twophase(self, connection, xid): - connection.connection.begin(*xid) - connection.connection.info["cx_oracle_xid"] = xid - - def do_prepare_twophase(self, connection, xid): - result = connection.connection.prepare() - connection.info["cx_oracle_prepared"] = result - - def do_rollback_twophase( - self, connection, xid, is_prepared=True, recover=False - ): - self.do_rollback(connection.connection) - # TODO: need to end XA state here - - def do_commit_twophase( - self, connection, xid, is_prepared=True, recover=False - ): - if not is_prepared: - self.do_commit(connection.connection) - else: - if recover: - raise NotImplementedError( - "2pc recovery not implemented for cx_Oracle" - ) - oci_prepared = connection.info["cx_oracle_prepared"] - if oci_prepared: - self.do_commit(connection.connection) - # TODO: need to end XA state here - - def do_set_input_sizes(self, cursor, list_of_tuples, context): - if self.positional: - # not usually used, here to support if someone is modifying - # the dialect to use positional style - cursor.setinputsizes( - *[dbtype for key, dbtype, sqltype in list_of_tuples] - ) - else: - collection = ( - (key, dbtype) - for key, dbtype, sqltype in list_of_tuples - if dbtype - ) - - cursor.setinputsizes(**{key: dbtype for key, dbtype in collection}) - - def do_recover_twophase(self, connection): - raise NotImplementedError( - "recover two phase query for cx_Oracle not implemented" - ) - - -dialect = OracleDialect_cx_oracle diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/dictionary.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/dictionary.py deleted file mode 100644 index 63479b9..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/dictionary.py +++ /dev/null @@ -1,507 +0,0 @@ -# dialects/oracle/dictionary.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - -from .types import DATE -from .types import LONG -from .types import NUMBER -from .types import RAW -from .types import VARCHAR2 -from ... import Column -from ... import MetaData -from ... import Table -from ... import table -from ...sql.sqltypes import CHAR - -# constants -DB_LINK_PLACEHOLDER = "__$sa_dblink$__" -# tables -dual = table("dual") -dictionary_meta = MetaData() - -# NOTE: all the dictionary_meta are aliases because oracle does not like -# using the full table@dblink for every column in query, and complains with -# ORA-00960: ambiguous column naming in select list -all_tables = Table( - "all_tables" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("owner", VARCHAR2(128), nullable=False), - Column("table_name", VARCHAR2(128), nullable=False), - Column("tablespace_name", VARCHAR2(30)), - Column("cluster_name", VARCHAR2(128)), - Column("iot_name", VARCHAR2(128)), - Column("status", VARCHAR2(8)), - Column("pct_free", NUMBER), - Column("pct_used", NUMBER), - Column("ini_trans", NUMBER), - Column("max_trans", NUMBER), - Column("initial_extent", NUMBER), - Column("next_extent", NUMBER), - Column("min_extents", NUMBER), - Column("max_extents", NUMBER), - Column("pct_increase", NUMBER), - Column("freelists", NUMBER), - Column("freelist_groups", NUMBER), - Column("logging", VARCHAR2(3)), - Column("backed_up", VARCHAR2(1)), - Column("num_rows", NUMBER), - Column("blocks", NUMBER), - Column("empty_blocks", NUMBER), - Column("avg_space", NUMBER), - Column("chain_cnt", NUMBER), - Column("avg_row_len", NUMBER), - Column("avg_space_freelist_blocks", NUMBER), - Column("num_freelist_blocks", NUMBER), - Column("degree", VARCHAR2(10)), - Column("instances", VARCHAR2(10)), - Column("cache", VARCHAR2(5)), - Column("table_lock", VARCHAR2(8)), - Column("sample_size", NUMBER), - Column("last_analyzed", DATE), - Column("partitioned", VARCHAR2(3)), - Column("iot_type", VARCHAR2(12)), - Column("temporary", VARCHAR2(1)), - Column("secondary", VARCHAR2(1)), - Column("nested", VARCHAR2(3)), - Column("buffer_pool", VARCHAR2(7)), - Column("flash_cache", VARCHAR2(7)), - Column("cell_flash_cache", VARCHAR2(7)), - Column("row_movement", VARCHAR2(8)), - Column("global_stats", VARCHAR2(3)), - Column("user_stats", VARCHAR2(3)), - Column("duration", VARCHAR2(15)), - Column("skip_corrupt", VARCHAR2(8)), - Column("monitoring", VARCHAR2(3)), - Column("cluster_owner", VARCHAR2(128)), - Column("dependencies", VARCHAR2(8)), - Column("compression", VARCHAR2(8)), - Column("compress_for", VARCHAR2(30)), - Column("dropped", VARCHAR2(3)), - Column("read_only", VARCHAR2(3)), - Column("segment_created", VARCHAR2(3)), - Column("result_cache", VARCHAR2(7)), - Column("clustering", VARCHAR2(3)), - Column("activity_tracking", VARCHAR2(23)), - Column("dml_timestamp", VARCHAR2(25)), - Column("has_identity", VARCHAR2(3)), - Column("container_data", VARCHAR2(3)), - Column("inmemory", VARCHAR2(8)), - Column("inmemory_priority", VARCHAR2(8)), - Column("inmemory_distribute", VARCHAR2(15)), - Column("inmemory_compression", VARCHAR2(17)), - Column("inmemory_duplicate", VARCHAR2(13)), - Column("default_collation", VARCHAR2(100)), - Column("duplicated", VARCHAR2(1)), - Column("sharded", VARCHAR2(1)), - Column("externally_sharded", VARCHAR2(1)), - Column("externally_duplicated", VARCHAR2(1)), - Column("external", VARCHAR2(3)), - Column("hybrid", VARCHAR2(3)), - Column("cellmemory", VARCHAR2(24)), - Column("containers_default", VARCHAR2(3)), - Column("container_map", VARCHAR2(3)), - Column("extended_data_link", VARCHAR2(3)), - Column("extended_data_link_map", VARCHAR2(3)), - Column("inmemory_service", VARCHAR2(12)), - Column("inmemory_service_name", VARCHAR2(1000)), - Column("container_map_object", VARCHAR2(3)), - Column("memoptimize_read", VARCHAR2(8)), - Column("memoptimize_write", VARCHAR2(8)), - Column("has_sensitive_column", VARCHAR2(3)), - Column("admit_null", VARCHAR2(3)), - Column("data_link_dml_enabled", VARCHAR2(3)), - Column("logical_replication", VARCHAR2(8)), -).alias("a_tables") - -all_views = Table( - "all_views" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("owner", VARCHAR2(128), nullable=False), - Column("view_name", VARCHAR2(128), nullable=False), - Column("text_length", NUMBER), - Column("text", LONG), - Column("text_vc", VARCHAR2(4000)), - Column("type_text_length", NUMBER), - Column("type_text", VARCHAR2(4000)), - Column("oid_text_length", NUMBER), - Column("oid_text", VARCHAR2(4000)), - Column("view_type_owner", VARCHAR2(128)), - Column("view_type", VARCHAR2(128)), - Column("superview_name", VARCHAR2(128)), - Column("editioning_view", VARCHAR2(1)), - Column("read_only", VARCHAR2(1)), - Column("container_data", VARCHAR2(1)), - Column("bequeath", VARCHAR2(12)), - Column("origin_con_id", VARCHAR2(256)), - Column("default_collation", VARCHAR2(100)), - Column("containers_default", VARCHAR2(3)), - Column("container_map", VARCHAR2(3)), - Column("extended_data_link", VARCHAR2(3)), - Column("extended_data_link_map", VARCHAR2(3)), - Column("has_sensitive_column", VARCHAR2(3)), - Column("admit_null", VARCHAR2(3)), - Column("pdb_local_only", VARCHAR2(3)), -).alias("a_views") - -all_sequences = Table( - "all_sequences" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("sequence_owner", VARCHAR2(128), nullable=False), - Column("sequence_name", VARCHAR2(128), nullable=False), - Column("min_value", NUMBER), - Column("max_value", NUMBER), - Column("increment_by", NUMBER, nullable=False), - Column("cycle_flag", VARCHAR2(1)), - Column("order_flag", VARCHAR2(1)), - Column("cache_size", NUMBER, nullable=False), - Column("last_number", NUMBER, nullable=False), - Column("scale_flag", VARCHAR2(1)), - Column("extend_flag", VARCHAR2(1)), - Column("sharded_flag", VARCHAR2(1)), - Column("session_flag", VARCHAR2(1)), - Column("keep_value", VARCHAR2(1)), -).alias("a_sequences") - -all_users = Table( - "all_users" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("username", VARCHAR2(128), nullable=False), - Column("user_id", NUMBER, nullable=False), - Column("created", DATE, nullable=False), - Column("common", VARCHAR2(3)), - Column("oracle_maintained", VARCHAR2(1)), - Column("inherited", VARCHAR2(3)), - Column("default_collation", VARCHAR2(100)), - Column("implicit", VARCHAR2(3)), - Column("all_shard", VARCHAR2(3)), - Column("external_shard", VARCHAR2(3)), -).alias("a_users") - -all_mviews = Table( - "all_mviews" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("owner", VARCHAR2(128), nullable=False), - Column("mview_name", VARCHAR2(128), nullable=False), - Column("container_name", VARCHAR2(128), nullable=False), - Column("query", LONG), - Column("query_len", NUMBER(38)), - Column("updatable", VARCHAR2(1)), - Column("update_log", VARCHAR2(128)), - Column("master_rollback_seg", VARCHAR2(128)), - Column("master_link", VARCHAR2(128)), - Column("rewrite_enabled", VARCHAR2(1)), - Column("rewrite_capability", VARCHAR2(9)), - Column("refresh_mode", VARCHAR2(6)), - Column("refresh_method", VARCHAR2(8)), - Column("build_mode", VARCHAR2(9)), - Column("fast_refreshable", VARCHAR2(18)), - Column("last_refresh_type", VARCHAR2(8)), - Column("last_refresh_date", DATE), - Column("last_refresh_end_time", DATE), - Column("staleness", VARCHAR2(19)), - Column("after_fast_refresh", VARCHAR2(19)), - Column("unknown_prebuilt", VARCHAR2(1)), - Column("unknown_plsql_func", VARCHAR2(1)), - Column("unknown_external_table", VARCHAR2(1)), - Column("unknown_consider_fresh", VARCHAR2(1)), - Column("unknown_import", VARCHAR2(1)), - Column("unknown_trusted_fd", VARCHAR2(1)), - Column("compile_state", VARCHAR2(19)), - Column("use_no_index", VARCHAR2(1)), - Column("stale_since", DATE), - Column("num_pct_tables", NUMBER), - Column("num_fresh_pct_regions", NUMBER), - Column("num_stale_pct_regions", NUMBER), - Column("segment_created", VARCHAR2(3)), - Column("evaluation_edition", VARCHAR2(128)), - Column("unusable_before", VARCHAR2(128)), - Column("unusable_beginning", VARCHAR2(128)), - Column("default_collation", VARCHAR2(100)), - Column("on_query_computation", VARCHAR2(1)), - Column("auto", VARCHAR2(3)), -).alias("a_mviews") - -all_tab_identity_cols = Table( - "all_tab_identity_cols" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("owner", VARCHAR2(128), nullable=False), - Column("table_name", VARCHAR2(128), nullable=False), - Column("column_name", VARCHAR2(128), nullable=False), - Column("generation_type", VARCHAR2(10)), - Column("sequence_name", VARCHAR2(128), nullable=False), - Column("identity_options", VARCHAR2(298)), -).alias("a_tab_identity_cols") - -all_tab_cols = Table( - "all_tab_cols" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("owner", VARCHAR2(128), nullable=False), - Column("table_name", VARCHAR2(128), nullable=False), - Column("column_name", VARCHAR2(128), nullable=False), - Column("data_type", VARCHAR2(128)), - Column("data_type_mod", VARCHAR2(3)), - Column("data_type_owner", VARCHAR2(128)), - Column("data_length", NUMBER, nullable=False), - Column("data_precision", NUMBER), - Column("data_scale", NUMBER), - Column("nullable", VARCHAR2(1)), - Column("column_id", NUMBER), - Column("default_length", NUMBER), - Column("data_default", LONG), - Column("num_distinct", NUMBER), - Column("low_value", RAW(1000)), - Column("high_value", RAW(1000)), - Column("density", NUMBER), - Column("num_nulls", NUMBER), - Column("num_buckets", NUMBER), - Column("last_analyzed", DATE), - Column("sample_size", NUMBER), - Column("character_set_name", VARCHAR2(44)), - Column("char_col_decl_length", NUMBER), - Column("global_stats", VARCHAR2(3)), - Column("user_stats", VARCHAR2(3)), - Column("avg_col_len", NUMBER), - Column("char_length", NUMBER), - Column("char_used", VARCHAR2(1)), - Column("v80_fmt_image", VARCHAR2(3)), - Column("data_upgraded", VARCHAR2(3)), - Column("hidden_column", VARCHAR2(3)), - Column("virtual_column", VARCHAR2(3)), - Column("segment_column_id", NUMBER), - Column("internal_column_id", NUMBER, nullable=False), - Column("histogram", VARCHAR2(15)), - Column("qualified_col_name", VARCHAR2(4000)), - Column("user_generated", VARCHAR2(3)), - Column("default_on_null", VARCHAR2(3)), - Column("identity_column", VARCHAR2(3)), - Column("evaluation_edition", VARCHAR2(128)), - Column("unusable_before", VARCHAR2(128)), - Column("unusable_beginning", VARCHAR2(128)), - Column("collation", VARCHAR2(100)), - Column("collated_column_id", NUMBER), -).alias("a_tab_cols") - -all_tab_comments = Table( - "all_tab_comments" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("owner", VARCHAR2(128), nullable=False), - Column("table_name", VARCHAR2(128), nullable=False), - Column("table_type", VARCHAR2(11)), - Column("comments", VARCHAR2(4000)), - Column("origin_con_id", NUMBER), -).alias("a_tab_comments") - -all_col_comments = Table( - "all_col_comments" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("owner", VARCHAR2(128), nullable=False), - Column("table_name", VARCHAR2(128), nullable=False), - Column("column_name", VARCHAR2(128), nullable=False), - Column("comments", VARCHAR2(4000)), - Column("origin_con_id", NUMBER), -).alias("a_col_comments") - -all_mview_comments = Table( - "all_mview_comments" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("owner", VARCHAR2(128), nullable=False), - Column("mview_name", VARCHAR2(128), nullable=False), - Column("comments", VARCHAR2(4000)), -).alias("a_mview_comments") - -all_ind_columns = Table( - "all_ind_columns" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("index_owner", VARCHAR2(128), nullable=False), - Column("index_name", VARCHAR2(128), nullable=False), - Column("table_owner", VARCHAR2(128), nullable=False), - Column("table_name", VARCHAR2(128), nullable=False), - Column("column_name", VARCHAR2(4000)), - Column("column_position", NUMBER, nullable=False), - Column("column_length", NUMBER, nullable=False), - Column("char_length", NUMBER), - Column("descend", VARCHAR2(4)), - Column("collated_column_id", NUMBER), -).alias("a_ind_columns") - -all_indexes = Table( - "all_indexes" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("owner", VARCHAR2(128), nullable=False), - Column("index_name", VARCHAR2(128), nullable=False), - Column("index_type", VARCHAR2(27)), - Column("table_owner", VARCHAR2(128), nullable=False), - Column("table_name", VARCHAR2(128), nullable=False), - Column("table_type", CHAR(11)), - Column("uniqueness", VARCHAR2(9)), - Column("compression", VARCHAR2(13)), - Column("prefix_length", NUMBER), - Column("tablespace_name", VARCHAR2(30)), - Column("ini_trans", NUMBER), - Column("max_trans", NUMBER), - Column("initial_extent", NUMBER), - Column("next_extent", NUMBER), - Column("min_extents", NUMBER), - Column("max_extents", NUMBER), - Column("pct_increase", NUMBER), - Column("pct_threshold", NUMBER), - Column("include_column", NUMBER), - Column("freelists", NUMBER), - Column("freelist_groups", NUMBER), - Column("pct_free", NUMBER), - Column("logging", VARCHAR2(3)), - Column("blevel", NUMBER), - Column("leaf_blocks", NUMBER), - Column("distinct_keys", NUMBER), - Column("avg_leaf_blocks_per_key", NUMBER), - Column("avg_data_blocks_per_key", NUMBER), - Column("clustering_factor", NUMBER), - Column("status", VARCHAR2(8)), - Column("num_rows", NUMBER), - Column("sample_size", NUMBER), - Column("last_analyzed", DATE), - Column("degree", VARCHAR2(40)), - Column("instances", VARCHAR2(40)), - Column("partitioned", VARCHAR2(3)), - Column("temporary", VARCHAR2(1)), - Column("generated", VARCHAR2(1)), - Column("secondary", VARCHAR2(1)), - Column("buffer_pool", VARCHAR2(7)), - Column("flash_cache", VARCHAR2(7)), - Column("cell_flash_cache", VARCHAR2(7)), - Column("user_stats", VARCHAR2(3)), - Column("duration", VARCHAR2(15)), - Column("pct_direct_access", NUMBER), - Column("ityp_owner", VARCHAR2(128)), - Column("ityp_name", VARCHAR2(128)), - Column("parameters", VARCHAR2(1000)), - Column("global_stats", VARCHAR2(3)), - Column("domidx_status", VARCHAR2(12)), - Column("domidx_opstatus", VARCHAR2(6)), - Column("funcidx_status", VARCHAR2(8)), - Column("join_index", VARCHAR2(3)), - Column("iot_redundant_pkey_elim", VARCHAR2(3)), - Column("dropped", VARCHAR2(3)), - Column("visibility", VARCHAR2(9)), - Column("domidx_management", VARCHAR2(14)), - Column("segment_created", VARCHAR2(3)), - Column("orphaned_entries", VARCHAR2(3)), - Column("indexing", VARCHAR2(7)), - Column("auto", VARCHAR2(3)), -).alias("a_indexes") - -all_ind_expressions = Table( - "all_ind_expressions" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("index_owner", VARCHAR2(128), nullable=False), - Column("index_name", VARCHAR2(128), nullable=False), - Column("table_owner", VARCHAR2(128), nullable=False), - Column("table_name", VARCHAR2(128), nullable=False), - Column("column_expression", LONG), - Column("column_position", NUMBER, nullable=False), -).alias("a_ind_expressions") - -all_constraints = Table( - "all_constraints" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("owner", VARCHAR2(128)), - Column("constraint_name", VARCHAR2(128)), - Column("constraint_type", VARCHAR2(1)), - Column("table_name", VARCHAR2(128)), - Column("search_condition", LONG), - Column("search_condition_vc", VARCHAR2(4000)), - Column("r_owner", VARCHAR2(128)), - Column("r_constraint_name", VARCHAR2(128)), - Column("delete_rule", VARCHAR2(9)), - Column("status", VARCHAR2(8)), - Column("deferrable", VARCHAR2(14)), - Column("deferred", VARCHAR2(9)), - Column("validated", VARCHAR2(13)), - Column("generated", VARCHAR2(14)), - Column("bad", VARCHAR2(3)), - Column("rely", VARCHAR2(4)), - Column("last_change", DATE), - Column("index_owner", VARCHAR2(128)), - Column("index_name", VARCHAR2(128)), - Column("invalid", VARCHAR2(7)), - Column("view_related", VARCHAR2(14)), - Column("origin_con_id", VARCHAR2(256)), -).alias("a_constraints") - -all_cons_columns = Table( - "all_cons_columns" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("owner", VARCHAR2(128), nullable=False), - Column("constraint_name", VARCHAR2(128), nullable=False), - Column("table_name", VARCHAR2(128), nullable=False), - Column("column_name", VARCHAR2(4000)), - Column("position", NUMBER), -).alias("a_cons_columns") - -# TODO figure out if it's still relevant, since there is no mention from here -# https://docs.oracle.com/en/database/oracle/oracle-database/21/refrn/ALL_DB_LINKS.html -# original note: -# using user_db_links here since all_db_links appears -# to have more restricted permissions. -# https://docs.oracle.com/cd/B28359_01/server.111/b28310/ds_admin005.htm -# will need to hear from more users if we are doing -# the right thing here. See [ticket:2619] -all_db_links = Table( - "all_db_links" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("owner", VARCHAR2(128), nullable=False), - Column("db_link", VARCHAR2(128), nullable=False), - Column("username", VARCHAR2(128)), - Column("host", VARCHAR2(2000)), - Column("created", DATE, nullable=False), - Column("hidden", VARCHAR2(3)), - Column("shard_internal", VARCHAR2(3)), - Column("valid", VARCHAR2(3)), - Column("intra_cdb", VARCHAR2(3)), -).alias("a_db_links") - -all_synonyms = Table( - "all_synonyms" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("owner", VARCHAR2(128)), - Column("synonym_name", VARCHAR2(128)), - Column("table_owner", VARCHAR2(128)), - Column("table_name", VARCHAR2(128)), - Column("db_link", VARCHAR2(128)), - Column("origin_con_id", VARCHAR2(256)), -).alias("a_synonyms") - -all_objects = Table( - "all_objects" + DB_LINK_PLACEHOLDER, - dictionary_meta, - Column("owner", VARCHAR2(128), nullable=False), - Column("object_name", VARCHAR2(128), nullable=False), - Column("subobject_name", VARCHAR2(128)), - Column("object_id", NUMBER, nullable=False), - Column("data_object_id", NUMBER), - Column("object_type", VARCHAR2(23)), - Column("created", DATE, nullable=False), - Column("last_ddl_time", DATE, nullable=False), - Column("timestamp", VARCHAR2(19)), - Column("status", VARCHAR2(7)), - Column("temporary", VARCHAR2(1)), - Column("generated", VARCHAR2(1)), - Column("secondary", VARCHAR2(1)), - Column("namespace", NUMBER, nullable=False), - Column("edition_name", VARCHAR2(128)), - Column("sharing", VARCHAR2(13)), - Column("editionable", VARCHAR2(1)), - Column("oracle_maintained", VARCHAR2(1)), - Column("application", VARCHAR2(1)), - Column("default_collation", VARCHAR2(100)), - Column("duplicated", VARCHAR2(1)), - Column("sharded", VARCHAR2(1)), - Column("created_appid", NUMBER), - Column("created_vsnid", NUMBER), - Column("modified_appid", NUMBER), - Column("modified_vsnid", NUMBER), -).alias("a_objects") diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/oracledb.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/oracledb.py deleted file mode 100644 index 9cdec3b..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/oracledb.py +++ /dev/null @@ -1,311 +0,0 @@ -# dialects/oracle/oracledb.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - -r""" -.. dialect:: oracle+oracledb - :name: python-oracledb - :dbapi: oracledb - :connectstring: oracle+oracledb://user:pass@hostname:port[/dbname][?service_name=<service>[&key=value&key=value...]] - :url: https://oracle.github.io/python-oracledb/ - -python-oracledb is released by Oracle to supersede the cx_Oracle driver. -It is fully compatible with cx_Oracle and features both a "thin" client -mode that requires no dependencies, as well as a "thick" mode that uses -the Oracle Client Interface in the same way as cx_Oracle. - -.. seealso:: - - :ref:`cx_oracle` - all of cx_Oracle's notes apply to the oracledb driver - as well. - -The SQLAlchemy ``oracledb`` dialect provides both a sync and an async -implementation under the same dialect name. The proper version is -selected depending on how the engine is created: - -* calling :func:`_sa.create_engine` with ``oracle+oracledb://...`` will - automatically select the sync version, e.g.:: - - from sqlalchemy import create_engine - sync_engine = create_engine("oracle+oracledb://scott:tiger@localhost/?service_name=XEPDB1") - -* calling :func:`_asyncio.create_async_engine` with - ``oracle+oracledb://...`` will automatically select the async version, - e.g.:: - - from sqlalchemy.ext.asyncio import create_async_engine - asyncio_engine = create_async_engine("oracle+oracledb://scott:tiger@localhost/?service_name=XEPDB1") - -The asyncio version of the dialect may also be specified explicitly using the -``oracledb_async`` suffix, as:: - - from sqlalchemy.ext.asyncio import create_async_engine - asyncio_engine = create_async_engine("oracle+oracledb_async://scott:tiger@localhost/?service_name=XEPDB1") - -.. versionadded:: 2.0.25 added support for the async version of oracledb. - -Thick mode support ------------------- - -By default the ``python-oracledb`` is started in thin mode, that does not -require oracle client libraries to be installed in the system. The -``python-oracledb`` driver also support a "thick" mode, that behaves -similarly to ``cx_oracle`` and requires that Oracle Client Interface (OCI) -is installed. - -To enable this mode, the user may call ``oracledb.init_oracle_client`` -manually, or by passing the parameter ``thick_mode=True`` to -:func:`_sa.create_engine`. To pass custom arguments to ``init_oracle_client``, -like the ``lib_dir`` path, a dict may be passed to this parameter, as in:: - - engine = sa.create_engine("oracle+oracledb://...", thick_mode={ - "lib_dir": "/path/to/oracle/client/lib", "driver_name": "my-app" - }) - -.. seealso:: - - https://python-oracledb.readthedocs.io/en/latest/api_manual/module.html#oracledb.init_oracle_client - - -.. versionadded:: 2.0.0 added support for oracledb driver. - -""" # noqa -from __future__ import annotations - -import collections -import re -from typing import Any -from typing import TYPE_CHECKING - -from .cx_oracle import OracleDialect_cx_oracle as _OracleDialect_cx_oracle -from ... import exc -from ... import pool -from ...connectors.asyncio import AsyncAdapt_dbapi_connection -from ...connectors.asyncio import AsyncAdapt_dbapi_cursor -from ...connectors.asyncio import AsyncAdaptFallback_dbapi_connection -from ...util import asbool -from ...util import await_fallback -from ...util import await_only - -if TYPE_CHECKING: - from oracledb import AsyncConnection - from oracledb import AsyncCursor - - -class OracleDialect_oracledb(_OracleDialect_cx_oracle): - supports_statement_cache = True - driver = "oracledb" - _min_version = (1,) - - def __init__( - self, - auto_convert_lobs=True, - coerce_to_decimal=True, - arraysize=None, - encoding_errors=None, - thick_mode=None, - **kwargs, - ): - super().__init__( - auto_convert_lobs, - coerce_to_decimal, - arraysize, - encoding_errors, - **kwargs, - ) - - if self.dbapi is not None and ( - thick_mode or isinstance(thick_mode, dict) - ): - kw = thick_mode if isinstance(thick_mode, dict) else {} - self.dbapi.init_oracle_client(**kw) - - @classmethod - def import_dbapi(cls): - import oracledb - - return oracledb - - @classmethod - def is_thin_mode(cls, connection): - return connection.connection.dbapi_connection.thin - - @classmethod - def get_async_dialect_cls(cls, url): - return OracleDialectAsync_oracledb - - def _load_version(self, dbapi_module): - version = (0, 0, 0) - if dbapi_module is not None: - m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", dbapi_module.version) - if m: - version = tuple( - int(x) for x in m.group(1, 2, 3) if x is not None - ) - self.oracledb_ver = version - if ( - self.oracledb_ver > (0, 0, 0) - and self.oracledb_ver < self._min_version - ): - raise exc.InvalidRequestError( - f"oracledb version {self._min_version} and above are supported" - ) - - -class AsyncAdapt_oracledb_cursor(AsyncAdapt_dbapi_cursor): - _cursor: AsyncCursor - __slots__ = () - - @property - def outputtypehandler(self): - return self._cursor.outputtypehandler - - @outputtypehandler.setter - def outputtypehandler(self, value): - self._cursor.outputtypehandler = value - - def var(self, *args, **kwargs): - return self._cursor.var(*args, **kwargs) - - def close(self): - self._rows.clear() - self._cursor.close() - - def setinputsizes(self, *args: Any, **kwargs: Any) -> Any: - return self._cursor.setinputsizes(*args, **kwargs) - - def _aenter_cursor(self, cursor: AsyncCursor) -> AsyncCursor: - try: - return cursor.__enter__() - except Exception as error: - self._adapt_connection._handle_exception(error) - - async def _execute_async(self, operation, parameters): - # override to not use mutex, oracledb already has mutex - - if parameters is None: - result = await self._cursor.execute(operation) - else: - result = await self._cursor.execute(operation, parameters) - - if self._cursor.description and not self.server_side: - self._rows = collections.deque(await self._cursor.fetchall()) - return result - - async def _executemany_async( - self, - operation, - seq_of_parameters, - ): - # override to not use mutex, oracledb already has mutex - return await self._cursor.executemany(operation, seq_of_parameters) - - def __enter__(self): - return self - - def __exit__(self, type_: Any, value: Any, traceback: Any) -> None: - self.close() - - -class AsyncAdapt_oracledb_connection(AsyncAdapt_dbapi_connection): - _connection: AsyncConnection - __slots__ = () - - thin = True - - _cursor_cls = AsyncAdapt_oracledb_cursor - _ss_cursor_cls = None - - @property - def autocommit(self): - return self._connection.autocommit - - @autocommit.setter - def autocommit(self, value): - self._connection.autocommit = value - - @property - def outputtypehandler(self): - return self._connection.outputtypehandler - - @outputtypehandler.setter - def outputtypehandler(self, value): - self._connection.outputtypehandler = value - - @property - def version(self): - return self._connection.version - - @property - def stmtcachesize(self): - return self._connection.stmtcachesize - - @stmtcachesize.setter - def stmtcachesize(self, value): - self._connection.stmtcachesize = value - - def cursor(self): - return AsyncAdapt_oracledb_cursor(self) - - -class AsyncAdaptFallback_oracledb_connection( - AsyncAdaptFallback_dbapi_connection, AsyncAdapt_oracledb_connection -): - __slots__ = () - - -class OracledbAdaptDBAPI: - def __init__(self, oracledb) -> None: - self.oracledb = oracledb - - for k, v in self.oracledb.__dict__.items(): - if k != "connect": - self.__dict__[k] = v - - def connect(self, *arg, **kw): - async_fallback = kw.pop("async_fallback", False) - creator_fn = kw.pop("async_creator_fn", self.oracledb.connect_async) - - if asbool(async_fallback): - return AsyncAdaptFallback_oracledb_connection( - self, await_fallback(creator_fn(*arg, **kw)) - ) - - else: - return AsyncAdapt_oracledb_connection( - self, await_only(creator_fn(*arg, **kw)) - ) - - -class OracleDialectAsync_oracledb(OracleDialect_oracledb): - is_async = True - supports_statement_cache = True - - _min_version = (2,) - - # thick_mode mode is not supported by asyncio, oracledb will raise - @classmethod - def import_dbapi(cls): - import oracledb - - return OracledbAdaptDBAPI(oracledb) - - @classmethod - def get_pool_class(cls, url): - async_fallback = url.query.get("async_fallback", False) - - if asbool(async_fallback): - return pool.FallbackAsyncAdaptedQueuePool - else: - return pool.AsyncAdaptedQueuePool - - def get_driver_connection(self, connection): - return connection._connection - - -dialect = OracleDialect_oracledb -dialect_async = OracleDialectAsync_oracledb diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/provision.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/provision.py deleted file mode 100644 index b33c152..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/provision.py +++ /dev/null @@ -1,220 +0,0 @@ -# dialects/oracle/provision.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - -from ... import create_engine -from ... import exc -from ... import inspect -from ...engine import url as sa_url -from ...testing.provision import configure_follower -from ...testing.provision import create_db -from ...testing.provision import drop_all_schema_objects_post_tables -from ...testing.provision import drop_all_schema_objects_pre_tables -from ...testing.provision import drop_db -from ...testing.provision import follower_url_from_main -from ...testing.provision import log -from ...testing.provision import post_configure_engine -from ...testing.provision import run_reap_dbs -from ...testing.provision import set_default_schema_on_connection -from ...testing.provision import stop_test_class_outside_fixtures -from ...testing.provision import temp_table_keyword_args -from ...testing.provision import update_db_opts - - -@create_db.for_db("oracle") -def _oracle_create_db(cfg, eng, ident): - # NOTE: make sure you've run "ALTER DATABASE default tablespace users" or - # similar, so that the default tablespace is not "system"; reflection will - # fail otherwise - with eng.begin() as conn: - conn.exec_driver_sql("create user %s identified by xe" % ident) - conn.exec_driver_sql("create user %s_ts1 identified by xe" % ident) - conn.exec_driver_sql("create user %s_ts2 identified by xe" % ident) - conn.exec_driver_sql("grant dba to %s" % (ident,)) - conn.exec_driver_sql("grant unlimited tablespace to %s" % ident) - conn.exec_driver_sql("grant unlimited tablespace to %s_ts1" % ident) - conn.exec_driver_sql("grant unlimited tablespace to %s_ts2" % ident) - # these are needed to create materialized views - conn.exec_driver_sql("grant create table to %s" % ident) - conn.exec_driver_sql("grant create table to %s_ts1" % ident) - conn.exec_driver_sql("grant create table to %s_ts2" % ident) - - -@configure_follower.for_db("oracle") -def _oracle_configure_follower(config, ident): - config.test_schema = "%s_ts1" % ident - config.test_schema_2 = "%s_ts2" % ident - - -def _ora_drop_ignore(conn, dbname): - try: - conn.exec_driver_sql("drop user %s cascade" % dbname) - log.info("Reaped db: %s", dbname) - return True - except exc.DatabaseError as err: - log.warning("couldn't drop db: %s", err) - return False - - -@drop_all_schema_objects_pre_tables.for_db("oracle") -def _ora_drop_all_schema_objects_pre_tables(cfg, eng): - _purge_recyclebin(eng) - _purge_recyclebin(eng, cfg.test_schema) - - -@drop_all_schema_objects_post_tables.for_db("oracle") -def _ora_drop_all_schema_objects_post_tables(cfg, eng): - with eng.begin() as conn: - for syn in conn.dialect._get_synonyms(conn, None, None, None): - conn.exec_driver_sql(f"drop synonym {syn['synonym_name']}") - - for syn in conn.dialect._get_synonyms( - conn, cfg.test_schema, None, None - ): - conn.exec_driver_sql( - f"drop synonym {cfg.test_schema}.{syn['synonym_name']}" - ) - - for tmp_table in inspect(conn).get_temp_table_names(): - conn.exec_driver_sql(f"drop table {tmp_table}") - - -@drop_db.for_db("oracle") -def _oracle_drop_db(cfg, eng, ident): - with eng.begin() as conn: - # cx_Oracle seems to occasionally leak open connections when a large - # suite it run, even if we confirm we have zero references to - # connection objects. - # while there is a "kill session" command in Oracle, - # it unfortunately does not release the connection sufficiently. - _ora_drop_ignore(conn, ident) - _ora_drop_ignore(conn, "%s_ts1" % ident) - _ora_drop_ignore(conn, "%s_ts2" % ident) - - -@stop_test_class_outside_fixtures.for_db("oracle") -def _ora_stop_test_class_outside_fixtures(config, db, cls): - try: - _purge_recyclebin(db) - except exc.DatabaseError as err: - log.warning("purge recyclebin command failed: %s", err) - - # clear statement cache on all connections that were used - # https://github.com/oracle/python-cx_Oracle/issues/519 - - for cx_oracle_conn in _all_conns: - try: - sc = cx_oracle_conn.stmtcachesize - except db.dialect.dbapi.InterfaceError: - # connection closed - pass - else: - cx_oracle_conn.stmtcachesize = 0 - cx_oracle_conn.stmtcachesize = sc - _all_conns.clear() - - -def _purge_recyclebin(eng, schema=None): - with eng.begin() as conn: - if schema is None: - # run magic command to get rid of identity sequences - # https://floo.bar/2019/11/29/drop-the-underlying-sequence-of-an-identity-column/ # noqa: E501 - conn.exec_driver_sql("purge recyclebin") - else: - # per user: https://community.oracle.com/tech/developers/discussion/2255402/how-to-clear-dba-recyclebin-for-a-particular-user # noqa: E501 - for owner, object_name, type_ in conn.exec_driver_sql( - "select owner, object_name,type from " - "dba_recyclebin where owner=:schema and type='TABLE'", - {"schema": conn.dialect.denormalize_name(schema)}, - ).all(): - conn.exec_driver_sql(f'purge {type_} {owner}."{object_name}"') - - -_all_conns = set() - - -@post_configure_engine.for_db("oracle") -def _oracle_post_configure_engine(url, engine, follower_ident): - from sqlalchemy import event - - @event.listens_for(engine, "checkout") - def checkout(dbapi_con, con_record, con_proxy): - _all_conns.add(dbapi_con) - - @event.listens_for(engine, "checkin") - def checkin(dbapi_connection, connection_record): - # work around cx_Oracle issue: - # https://github.com/oracle/python-cx_Oracle/issues/530 - # invalidate oracle connections that had 2pc set up - if "cx_oracle_xid" in connection_record.info: - connection_record.invalidate() - - -@run_reap_dbs.for_db("oracle") -def _reap_oracle_dbs(url, idents): - log.info("db reaper connecting to %r", url) - eng = create_engine(url) - with eng.begin() as conn: - log.info("identifiers in file: %s", ", ".join(idents)) - - to_reap = conn.exec_driver_sql( - "select u.username from all_users u where username " - "like 'TEST_%' and not exists (select username " - "from v$session where username=u.username)" - ) - all_names = {username.lower() for (username,) in to_reap} - to_drop = set() - for name in all_names: - if name.endswith("_ts1") or name.endswith("_ts2"): - continue - elif name in idents: - to_drop.add(name) - if "%s_ts1" % name in all_names: - to_drop.add("%s_ts1" % name) - if "%s_ts2" % name in all_names: - to_drop.add("%s_ts2" % name) - - dropped = total = 0 - for total, username in enumerate(to_drop, 1): - if _ora_drop_ignore(conn, username): - dropped += 1 - log.info( - "Dropped %d out of %d stale databases detected", dropped, total - ) - - -@follower_url_from_main.for_db("oracle") -def _oracle_follower_url_from_main(url, ident): - url = sa_url.make_url(url) - return url.set(username=ident, password="xe") - - -@temp_table_keyword_args.for_db("oracle") -def _oracle_temp_table_keyword_args(cfg, eng): - return { - "prefixes": ["GLOBAL TEMPORARY"], - "oracle_on_commit": "PRESERVE ROWS", - } - - -@set_default_schema_on_connection.for_db("oracle") -def _oracle_set_default_schema_on_connection( - cfg, dbapi_connection, schema_name -): - cursor = dbapi_connection.cursor() - cursor.execute("ALTER SESSION SET CURRENT_SCHEMA=%s" % schema_name) - cursor.close() - - -@update_db_opts.for_db("oracle") -def _update_db_opts(db_url, db_opts, options): - """Set database options (db_opts) for a test database that we created.""" - if ( - options.oracledb_thick_mode - and sa_url.make_url(db_url).get_driver_name() == "oracledb" - ): - db_opts["thick_mode"] = True diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/types.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/types.py deleted file mode 100644 index 36caaa0..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/oracle/types.py +++ /dev/null @@ -1,287 +0,0 @@ -# dialects/oracle/types.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# <see AUTHORS file> -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors -from __future__ import annotations - -import datetime as dt -from typing import Optional -from typing import Type -from typing import TYPE_CHECKING - -from ... import exc -from ...sql import sqltypes -from ...types import NVARCHAR -from ...types import VARCHAR - -if TYPE_CHECKING: - from ...engine.interfaces import Dialect - from ...sql.type_api import _LiteralProcessorType - - -class RAW(sqltypes._Binary): - __visit_name__ = "RAW" - - -OracleRaw = RAW - - -class NCLOB(sqltypes.Text): - __visit_name__ = "NCLOB" - - -class VARCHAR2(VARCHAR): - __visit_name__ = "VARCHAR2" - - -NVARCHAR2 = NVARCHAR - - -class NUMBER(sqltypes.Numeric, sqltypes.Integer): - __visit_name__ = "NUMBER" - - def __init__(self, precision=None, scale=None, asdecimal=None): - if asdecimal is None: - asdecimal = bool(scale and scale > 0) - - super().__init__(precision=precision, scale=scale, asdecimal=asdecimal) - - def adapt(self, impltype): - ret = super().adapt(impltype) - # leave a hint for the DBAPI handler - ret._is_oracle_number = True - return ret - - @property - def _type_affinity(self): - if bool(self.scale and self.scale > 0): - return sqltypes.Numeric - else: - return sqltypes.Integer - - -class FLOAT(sqltypes.FLOAT): - """Oracle FLOAT. - - This is the same as :class:`_sqltypes.FLOAT` except that - an Oracle-specific :paramref:`_oracle.FLOAT.binary_precision` - parameter is accepted, and - the :paramref:`_sqltypes.Float.precision` parameter is not accepted. - - Oracle FLOAT types indicate precision in terms of "binary precision", which - defaults to 126. For a REAL type, the value is 63. This parameter does not - cleanly map to a specific number of decimal places but is roughly - equivalent to the desired number of decimal places divided by 0.3103. - - .. versionadded:: 2.0 - - """ - - __visit_name__ = "FLOAT" - - def __init__( - self, - binary_precision=None, - asdecimal=False, - decimal_return_scale=None, - ): - r""" - Construct a FLOAT - - :param binary_precision: Oracle binary precision value to be rendered - in DDL. This may be approximated to the number of decimal characters - using the formula "decimal precision = 0.30103 * binary precision". - The default value used by Oracle for FLOAT / DOUBLE PRECISION is 126. - - :param asdecimal: See :paramref:`_sqltypes.Float.asdecimal` - - :param decimal_return_scale: See - :paramref:`_sqltypes.Float.decimal_return_scale` - - """ - super().__init__( - asdecimal=asdecimal, decimal_return_scale=decimal_return_scale - ) - self.binary_precision = binary_precision - - -class BINARY_DOUBLE(sqltypes.Double): - __visit_name__ = "BINARY_DOUBLE" - - -class BINARY_FLOAT(sqltypes.Float): - __visit_name__ = "BINARY_FLOAT" - - -class BFILE(sqltypes.LargeBinary): - __visit_name__ = "BFILE" - - -class LONG(sqltypes.Text): - __visit_name__ = "LONG" - - -class _OracleDateLiteralRender: - def _literal_processor_datetime(self, dialect): - def process(value): - if getattr(value, "microsecond", None): - value = ( - f"""TO_TIMESTAMP""" - f"""('{value.isoformat().replace("T", " ")}', """ - """'YYYY-MM-DD HH24:MI:SS.FF')""" - ) - else: - value = ( - f"""TO_DATE""" - f"""('{value.isoformat().replace("T", " ")}', """ - """'YYYY-MM-DD HH24:MI:SS')""" - ) - return value - - return process - - def _literal_processor_date(self, dialect): - def process(value): - if getattr(value, "microsecond", None): - value = ( - f"""TO_TIMESTAMP""" - f"""('{value.isoformat().split("T")[0]}', """ - """'YYYY-MM-DD')""" - ) - else: - value = ( - f"""TO_DATE""" - f"""('{value.isoformat().split("T")[0]}', """ - """'YYYY-MM-DD')""" - ) - return value - - return process - - -class DATE(_OracleDateLiteralRender, sqltypes.DateTime): - """Provide the oracle DATE type. - - This type has no special Python behavior, except that it subclasses - :class:`_types.DateTime`; this is to suit the fact that the Oracle - ``DATE`` type supports a time value. - - """ - - __visit_name__ = "DATE" - - def literal_processor(self, dialect): - return self._literal_processor_datetime(dialect) - - def _compare_type_affinity(self, other): - return other._type_affinity in (sqltypes.DateTime, sqltypes.Date) - - -class _OracleDate(_OracleDateLiteralRender, sqltypes.Date): - def literal_processor(self, dialect): - return self._literal_processor_date(dialect) - - -class INTERVAL(sqltypes.NativeForEmulated, sqltypes._AbstractInterval): - __visit_name__ = "INTERVAL" - - def __init__(self, day_precision=None, second_precision=None): - """Construct an INTERVAL. - - Note that only DAY TO SECOND intervals are currently supported. - This is due to a lack of support for YEAR TO MONTH intervals - within available DBAPIs. - - :param day_precision: the day precision value. this is the number of - digits to store for the day field. Defaults to "2" - :param second_precision: the second precision value. this is the - number of digits to store for the fractional seconds field. - Defaults to "6". - - """ - self.day_precision = day_precision - self.second_precision = second_precision - - @classmethod - def _adapt_from_generic_interval(cls, interval): - return INTERVAL( - day_precision=interval.day_precision, - second_precision=interval.second_precision, - ) - - @classmethod - def adapt_emulated_to_native( - cls, interval: sqltypes.Interval, **kw # type: ignore[override] - ): - return INTERVAL( - day_precision=interval.day_precision, - second_precision=interval.second_precision, - ) - - @property - def _type_affinity(self): - return sqltypes.Interval - - def as_generic(self, allow_nulltype=False): - return sqltypes.Interval( - native=True, - second_precision=self.second_precision, - day_precision=self.day_precision, - ) - - @property - def python_type(self) -> Type[dt.timedelta]: - return dt.timedelta - - def literal_processor( - self, dialect: Dialect - ) -> Optional[_LiteralProcessorType[dt.timedelta]]: - def process(value: dt.timedelta) -> str: - return f"NUMTODSINTERVAL({value.total_seconds()}, 'SECOND')" - - return process - - -class TIMESTAMP(sqltypes.TIMESTAMP): - """Oracle implementation of ``TIMESTAMP``, which supports additional - Oracle-specific modes - - .. versionadded:: 2.0 - - """ - - def __init__(self, timezone: bool = False, local_timezone: bool = False): - """Construct a new :class:`_oracle.TIMESTAMP`. - - :param timezone: boolean. Indicates that the TIMESTAMP type should - use Oracle's ``TIMESTAMP WITH TIME ZONE`` datatype. - - :param local_timezone: boolean. Indicates that the TIMESTAMP type - should use Oracle's ``TIMESTAMP WITH LOCAL TIME ZONE`` datatype. - - - """ - if timezone and local_timezone: - raise exc.ArgumentError( - "timezone and local_timezone are mutually exclusive" - ) - super().__init__(timezone=timezone) - self.local_timezone = local_timezone - - -class ROWID(sqltypes.TypeEngine): - """Oracle ROWID type. - - When used in a cast() or similar, generates ROWID. - - """ - - __visit_name__ = "ROWID" - - -class _OracleBoolean(sqltypes.Boolean): - def get_dbapi_type(self, dbapi): - return dbapi.NUMBER |