summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/pg8000.py
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:17:55 -0400
commit12cf076118570eebbff08c6b3090e0d4798447a1 (patch)
tree3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/pg8000.py
parentc45662ff3923b34614ddcc8feb9195541166dcc5 (diff)
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/pg8000.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/pg8000.py662
1 files changed, 0 insertions, 662 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/pg8000.py b/venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/pg8000.py
deleted file mode 100644
index 0151be0..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/pg8000.py
+++ /dev/null
@@ -1,662 +0,0 @@
-# dialects/postgresql/pg8000.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors <see AUTHORS
-# file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-r"""
-.. dialect:: postgresql+pg8000
- :name: pg8000
- :dbapi: pg8000
- :connectstring: postgresql+pg8000://user:password@host:port/dbname[?key=value&key=value...]
- :url: https://pypi.org/project/pg8000/
-
-.. versionchanged:: 1.4 The pg8000 dialect has been updated for version
- 1.16.6 and higher, and is again part of SQLAlchemy's continuous integration
- with full feature support.
-
-.. _pg8000_unicode:
-
-Unicode
--------
-
-pg8000 will encode / decode string values between it and the server using the
-PostgreSQL ``client_encoding`` parameter; by default this is the value in
-the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``.
-Typically, this can be changed to ``utf-8``, as a more useful default::
-
- #client_encoding = sql_ascii # actually, defaults to database
- # encoding
- client_encoding = utf8
-
-The ``client_encoding`` can be overridden for a session by executing the SQL:
-
-SET CLIENT_ENCODING TO 'utf8';
-
-SQLAlchemy will execute this SQL on all new connections based on the value
-passed to :func:`_sa.create_engine` using the ``client_encoding`` parameter::
-
- engine = create_engine(
- "postgresql+pg8000://user:pass@host/dbname", client_encoding='utf8')
-
-.. _pg8000_ssl:
-
-SSL Connections
----------------
-
-pg8000 accepts a Python ``SSLContext`` object which may be specified using the
-:paramref:`_sa.create_engine.connect_args` dictionary::
-
- import ssl
- ssl_context = ssl.create_default_context()
- engine = sa.create_engine(
- "postgresql+pg8000://scott:tiger@192.168.0.199/test",
- connect_args={"ssl_context": ssl_context},
- )
-
-If the server uses an automatically-generated certificate that is self-signed
-or does not match the host name (as seen from the client), it may also be
-necessary to disable hostname checking::
-
- import ssl
- ssl_context = ssl.create_default_context()
- ssl_context.check_hostname = False
- ssl_context.verify_mode = ssl.CERT_NONE
- engine = sa.create_engine(
- "postgresql+pg8000://scott:tiger@192.168.0.199/test",
- connect_args={"ssl_context": ssl_context},
- )
-
-.. _pg8000_isolation_level:
-
-pg8000 Transaction Isolation Level
--------------------------------------
-
-The pg8000 dialect offers the same isolation level settings as that
-of the :ref:`psycopg2 <psycopg2_isolation_level>` dialect:
-
-* ``READ COMMITTED``
-* ``READ UNCOMMITTED``
-* ``REPEATABLE READ``
-* ``SERIALIZABLE``
-* ``AUTOCOMMIT``
-
-.. seealso::
-
- :ref:`postgresql_isolation_level`
-
- :ref:`psycopg2_isolation_level`
-
-
-""" # noqa
-import decimal
-import re
-
-from . import ranges
-from .array import ARRAY as PGARRAY
-from .base import _DECIMAL_TYPES
-from .base import _FLOAT_TYPES
-from .base import _INT_TYPES
-from .base import ENUM
-from .base import INTERVAL
-from .base import PGCompiler
-from .base import PGDialect
-from .base import PGExecutionContext
-from .base import PGIdentifierPreparer
-from .json import JSON
-from .json import JSONB
-from .json import JSONPathType
-from .pg_catalog import _SpaceVector
-from .pg_catalog import OIDVECTOR
-from .types import CITEXT
-from ... import exc
-from ... import util
-from ...engine import processors
-from ...sql import sqltypes
-from ...sql.elements import quoted_name
-
-
-class _PGString(sqltypes.String):
- render_bind_cast = True
-
-
-class _PGNumeric(sqltypes.Numeric):
- render_bind_cast = True
-
- def result_processor(self, dialect, coltype):
- if self.asdecimal:
- if coltype in _FLOAT_TYPES:
- return processors.to_decimal_processor_factory(
- decimal.Decimal, self._effective_decimal_return_scale
- )
- elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
- # pg8000 returns Decimal natively for 1700
- return None
- else:
- raise exc.InvalidRequestError(
- "Unknown PG numeric type: %d" % coltype
- )
- else:
- if coltype in _FLOAT_TYPES:
- # pg8000 returns float natively for 701
- return None
- elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
- return processors.to_float
- else:
- raise exc.InvalidRequestError(
- "Unknown PG numeric type: %d" % coltype
- )
-
-
-class _PGFloat(_PGNumeric, sqltypes.Float):
- __visit_name__ = "float"
- render_bind_cast = True
-
-
-class _PGNumericNoBind(_PGNumeric):
- def bind_processor(self, dialect):
- return None
-
-
-class _PGJSON(JSON):
- render_bind_cast = True
-
- def result_processor(self, dialect, coltype):
- return None
-
-
-class _PGJSONB(JSONB):
- render_bind_cast = True
-
- def result_processor(self, dialect, coltype):
- return None
-
-
-class _PGJSONIndexType(sqltypes.JSON.JSONIndexType):
- def get_dbapi_type(self, dbapi):
- raise NotImplementedError("should not be here")
-
-
-class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
- __visit_name__ = "json_int_index"
-
- render_bind_cast = True
-
-
-class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType):
- __visit_name__ = "json_str_index"
-
- render_bind_cast = True
-
-
-class _PGJSONPathType(JSONPathType):
- pass
-
- # DBAPI type 1009
-
-
-class _PGEnum(ENUM):
- def get_dbapi_type(self, dbapi):
- return dbapi.UNKNOWN
-
-
-class _PGInterval(INTERVAL):
- render_bind_cast = True
-
- def get_dbapi_type(self, dbapi):
- return dbapi.INTERVAL
-
- @classmethod
- def adapt_emulated_to_native(cls, interval, **kw):
- return _PGInterval(precision=interval.second_precision)
-
-
-class _PGTimeStamp(sqltypes.DateTime):
- render_bind_cast = True
-
-
-class _PGDate(sqltypes.Date):
- render_bind_cast = True
-
-
-class _PGTime(sqltypes.Time):
- render_bind_cast = True
-
-
-class _PGInteger(sqltypes.Integer):
- render_bind_cast = True
-
-
-class _PGSmallInteger(sqltypes.SmallInteger):
- render_bind_cast = True
-
-
-class _PGNullType(sqltypes.NullType):
- pass
-
-
-class _PGBigInteger(sqltypes.BigInteger):
- render_bind_cast = True
-
-
-class _PGBoolean(sqltypes.Boolean):
- render_bind_cast = True
-
-
-class _PGARRAY(PGARRAY):
- render_bind_cast = True
-
-
-class _PGOIDVECTOR(_SpaceVector, OIDVECTOR):
- pass
-
-
-class _Pg8000Range(ranges.AbstractSingleRangeImpl):
- def bind_processor(self, dialect):
- pg8000_Range = dialect.dbapi.Range
-
- def to_range(value):
- if isinstance(value, ranges.Range):
- value = pg8000_Range(
- value.lower, value.upper, value.bounds, value.empty
- )
- return value
-
- return to_range
-
- def result_processor(self, dialect, coltype):
- def to_range(value):
- if value is not None:
- value = ranges.Range(
- value.lower,
- value.upper,
- bounds=value.bounds,
- empty=value.is_empty,
- )
- return value
-
- return to_range
-
-
-class _Pg8000MultiRange(ranges.AbstractMultiRangeImpl):
- def bind_processor(self, dialect):
- pg8000_Range = dialect.dbapi.Range
-
- def to_multirange(value):
- if isinstance(value, list):
- mr = []
- for v in value:
- if isinstance(v, ranges.Range):
- mr.append(
- pg8000_Range(v.lower, v.upper, v.bounds, v.empty)
- )
- else:
- mr.append(v)
- return mr
- else:
- return value
-
- return to_multirange
-
- def result_processor(self, dialect, coltype):
- def to_multirange(value):
- if value is None:
- return None
- else:
- return ranges.MultiRange(
- ranges.Range(
- v.lower, v.upper, bounds=v.bounds, empty=v.is_empty
- )
- for v in value
- )
-
- return to_multirange
-
-
-_server_side_id = util.counter()
-
-
-class PGExecutionContext_pg8000(PGExecutionContext):
- def create_server_side_cursor(self):
- ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:])
- return ServerSideCursor(self._dbapi_connection.cursor(), ident)
-
- def pre_exec(self):
- if not self.compiled:
- return
-
-
-class ServerSideCursor:
- server_side = True
-
- def __init__(self, cursor, ident):
- self.ident = ident
- self.cursor = cursor
-
- @property
- def connection(self):
- return self.cursor.connection
-
- @property
- def rowcount(self):
- return self.cursor.rowcount
-
- @property
- def description(self):
- return self.cursor.description
-
- def execute(self, operation, args=(), stream=None):
- op = "DECLARE " + self.ident + " NO SCROLL CURSOR FOR " + operation
- self.cursor.execute(op, args, stream=stream)
- return self
-
- def executemany(self, operation, param_sets):
- self.cursor.executemany(operation, param_sets)
- return self
-
- def fetchone(self):
- self.cursor.execute("FETCH FORWARD 1 FROM " + self.ident)
- return self.cursor.fetchone()
-
- def fetchmany(self, num=None):
- if num is None:
- return self.fetchall()
- else:
- self.cursor.execute(
- "FETCH FORWARD " + str(int(num)) + " FROM " + self.ident
- )
- return self.cursor.fetchall()
-
- def fetchall(self):
- self.cursor.execute("FETCH FORWARD ALL FROM " + self.ident)
- return self.cursor.fetchall()
-
- def close(self):
- self.cursor.execute("CLOSE " + self.ident)
- self.cursor.close()
-
- def setinputsizes(self, *sizes):
- self.cursor.setinputsizes(*sizes)
-
- def setoutputsize(self, size, column=None):
- pass
-
-
-class PGCompiler_pg8000(PGCompiler):
- def visit_mod_binary(self, binary, operator, **kw):
- return (
- self.process(binary.left, **kw)
- + " %% "
- + self.process(binary.right, **kw)
- )
-
-
-class PGIdentifierPreparer_pg8000(PGIdentifierPreparer):
- def __init__(self, *args, **kwargs):
- PGIdentifierPreparer.__init__(self, *args, **kwargs)
- self._double_percents = False
-
-
-class PGDialect_pg8000(PGDialect):
- driver = "pg8000"
- supports_statement_cache = True
-
- supports_unicode_statements = True
-
- supports_unicode_binds = True
-
- default_paramstyle = "format"
- supports_sane_multi_rowcount = True
- execution_ctx_cls = PGExecutionContext_pg8000
- statement_compiler = PGCompiler_pg8000
- preparer = PGIdentifierPreparer_pg8000
- supports_server_side_cursors = True
-
- render_bind_cast = True
-
- # reversed as of pg8000 1.16.6. 1.16.5 and lower
- # are no longer compatible
- description_encoding = None
- # description_encoding = "use_encoding"
-
- colspecs = util.update_copy(
- PGDialect.colspecs,
- {
- sqltypes.String: _PGString,
- sqltypes.Numeric: _PGNumericNoBind,
- sqltypes.Float: _PGFloat,
- sqltypes.JSON: _PGJSON,
- sqltypes.Boolean: _PGBoolean,
- sqltypes.NullType: _PGNullType,
- JSONB: _PGJSONB,
- CITEXT: CITEXT,
- sqltypes.JSON.JSONPathType: _PGJSONPathType,
- sqltypes.JSON.JSONIndexType: _PGJSONIndexType,
- sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType,
- sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType,
- sqltypes.Interval: _PGInterval,
- INTERVAL: _PGInterval,
- sqltypes.DateTime: _PGTimeStamp,
- sqltypes.DateTime: _PGTimeStamp,
- sqltypes.Date: _PGDate,
- sqltypes.Time: _PGTime,
- sqltypes.Integer: _PGInteger,
- sqltypes.SmallInteger: _PGSmallInteger,
- sqltypes.BigInteger: _PGBigInteger,
- sqltypes.Enum: _PGEnum,
- sqltypes.ARRAY: _PGARRAY,
- OIDVECTOR: _PGOIDVECTOR,
- ranges.INT4RANGE: _Pg8000Range,
- ranges.INT8RANGE: _Pg8000Range,
- ranges.NUMRANGE: _Pg8000Range,
- ranges.DATERANGE: _Pg8000Range,
- ranges.TSRANGE: _Pg8000Range,
- ranges.TSTZRANGE: _Pg8000Range,
- ranges.INT4MULTIRANGE: _Pg8000MultiRange,
- ranges.INT8MULTIRANGE: _Pg8000MultiRange,
- ranges.NUMMULTIRANGE: _Pg8000MultiRange,
- ranges.DATEMULTIRANGE: _Pg8000MultiRange,
- ranges.TSMULTIRANGE: _Pg8000MultiRange,
- ranges.TSTZMULTIRANGE: _Pg8000MultiRange,
- },
- )
-
- def __init__(self, client_encoding=None, **kwargs):
- PGDialect.__init__(self, **kwargs)
- self.client_encoding = client_encoding
-
- if self._dbapi_version < (1, 16, 6):
- raise NotImplementedError("pg8000 1.16.6 or greater is required")
-
- if self._native_inet_types:
- raise NotImplementedError(
- "The pg8000 dialect does not fully implement "
- "ipaddress type handling; INET is supported by default, "
- "CIDR is not"
- )
-
- @util.memoized_property
- def _dbapi_version(self):
- if self.dbapi and hasattr(self.dbapi, "__version__"):
- return tuple(
- [
- int(x)
- for x in re.findall(
- r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__
- )
- ]
- )
- else:
- return (99, 99, 99)
-
- @classmethod
- def import_dbapi(cls):
- return __import__("pg8000")
-
- def create_connect_args(self, url):
- opts = url.translate_connect_args(username="user")
- if "port" in opts:
- opts["port"] = int(opts["port"])
- opts.update(url.query)
- return ([], opts)
-
- def is_disconnect(self, e, connection, cursor):
- if isinstance(e, self.dbapi.InterfaceError) and "network error" in str(
- e
- ):
- # new as of pg8000 1.19.0 for broken connections
- return True
-
- # connection was closed normally
- return "connection is closed" in str(e)
-
- def get_isolation_level_values(self, dbapi_connection):
- return (
- "AUTOCOMMIT",
- "READ COMMITTED",
- "READ UNCOMMITTED",
- "REPEATABLE READ",
- "SERIALIZABLE",
- )
-
- def set_isolation_level(self, dbapi_connection, level):
- level = level.replace("_", " ")
-
- if level == "AUTOCOMMIT":
- dbapi_connection.autocommit = True
- else:
- dbapi_connection.autocommit = False
- cursor = dbapi_connection.cursor()
- cursor.execute(
- "SET SESSION CHARACTERISTICS AS TRANSACTION "
- f"ISOLATION LEVEL {level}"
- )
- cursor.execute("COMMIT")
- cursor.close()
-
- def set_readonly(self, connection, value):
- cursor = connection.cursor()
- try:
- cursor.execute(
- "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
- % ("READ ONLY" if value else "READ WRITE")
- )
- cursor.execute("COMMIT")
- finally:
- cursor.close()
-
- def get_readonly(self, connection):
- cursor = connection.cursor()
- try:
- cursor.execute("show transaction_read_only")
- val = cursor.fetchone()[0]
- finally:
- cursor.close()
-
- return val == "on"
-
- def set_deferrable(self, connection, value):
- cursor = connection.cursor()
- try:
- cursor.execute(
- "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
- % ("DEFERRABLE" if value else "NOT DEFERRABLE")
- )
- cursor.execute("COMMIT")
- finally:
- cursor.close()
-
- def get_deferrable(self, connection):
- cursor = connection.cursor()
- try:
- cursor.execute("show transaction_deferrable")
- val = cursor.fetchone()[0]
- finally:
- cursor.close()
-
- return val == "on"
-
- def _set_client_encoding(self, dbapi_connection, client_encoding):
- cursor = dbapi_connection.cursor()
- cursor.execute(
- f"""SET CLIENT_ENCODING TO '{
- client_encoding.replace("'", "''")
- }'"""
- )
- cursor.execute("COMMIT")
- cursor.close()
-
- def do_begin_twophase(self, connection, xid):
- connection.connection.tpc_begin((0, xid, ""))
-
- def do_prepare_twophase(self, connection, xid):
- connection.connection.tpc_prepare()
-
- def do_rollback_twophase(
- self, connection, xid, is_prepared=True, recover=False
- ):
- connection.connection.tpc_rollback((0, xid, ""))
-
- def do_commit_twophase(
- self, connection, xid, is_prepared=True, recover=False
- ):
- connection.connection.tpc_commit((0, xid, ""))
-
- def do_recover_twophase(self, connection):
- return [row[1] for row in connection.connection.tpc_recover()]
-
- def on_connect(self):
- fns = []
-
- def on_connect(conn):
- conn.py_types[quoted_name] = conn.py_types[str]
-
- fns.append(on_connect)
-
- if self.client_encoding is not None:
-
- def on_connect(conn):
- self._set_client_encoding(conn, self.client_encoding)
-
- fns.append(on_connect)
-
- if self._native_inet_types is False:
-
- def on_connect(conn):
- # inet
- conn.register_in_adapter(869, lambda s: s)
-
- # cidr
- conn.register_in_adapter(650, lambda s: s)
-
- fns.append(on_connect)
-
- if self._json_deserializer:
-
- def on_connect(conn):
- # json
- conn.register_in_adapter(114, self._json_deserializer)
-
- # jsonb
- conn.register_in_adapter(3802, self._json_deserializer)
-
- fns.append(on_connect)
-
- if len(fns) > 0:
-
- def on_connect(conn):
- for fn in fns:
- fn(conn)
-
- return on_connect
- else:
- return None
-
- @util.memoized_property
- def _dialect_specific_select_one(self):
- return ";"
-
-
-dialect = PGDialect_pg8000