summary | refs | log | tree | commit | diff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/sql/base.py
diff options
context:
space:
mode:
author: cyfraeviolae <cyfraeviolae> 2024-04-03 03:17:55 -0400
committer: cyfraeviolae <cyfraeviolae> 2024-04-03 03:17:55 -0400
commit: 12cf076118570eebbff08c6b3090e0d4798447a1 (patch)
tree3ba25e17e3c3a5e82316558ba3864b955919ff72 /venv/lib/python3.11/site-packages/sqlalchemy/sql/base.py
parentc45662ff3923b34614ddcc8feb9195541166dcc5 (diff)
no venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/sql/base.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/sql/base.py2180
1 files changed, 0 insertions, 2180 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/sql/base.py b/venv/lib/python3.11/site-packages/sqlalchemy/sql/base.py
deleted file mode 100644
index 5eb32e3..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/sql/base.py
+++ /dev/null
@@ -1,2180 +0,0 @@
-# sql/base.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: allow-untyped-defs, allow-untyped-calls
-
-"""Foundational utilities common to many sql modules.
-
-"""
-
-
-from __future__ import annotations
-
-import collections
-from enum import Enum
-import itertools
-from itertools import zip_longest
-import operator
-import re
-from typing import Any
-from typing import Callable
-from typing import cast
-from typing import Dict
-from typing import FrozenSet
-from typing import Generic
-from typing import Iterable
-from typing import Iterator
-from typing import List
-from typing import Mapping
-from typing import MutableMapping
-from typing import NamedTuple
-from typing import NoReturn
-from typing import Optional
-from typing import overload
-from typing import Sequence
-from typing import Set
-from typing import Tuple
-from typing import Type
-from typing import TYPE_CHECKING
-from typing import TypeVar
-from typing import Union
-
-from . import roles
-from . import visitors
-from .cache_key import HasCacheKey # noqa
-from .cache_key import MemoizedHasCacheKey # noqa
-from .traversals import HasCopyInternals # noqa
-from .visitors import ClauseVisitor
-from .visitors import ExtendedInternalTraversal
-from .visitors import ExternallyTraversible
-from .visitors import InternalTraversal
-from .. import event
-from .. import exc
-from .. import util
-from ..util import HasMemoized as HasMemoized
-from ..util import hybridmethod
-from ..util import typing as compat_typing
-from ..util.typing import Protocol
-from ..util.typing import Self
-from ..util.typing import TypeGuard
-
-if TYPE_CHECKING:
- from . import coercions
- from . import elements
- from . import type_api
- from ._orm_types import DMLStrategyArgument
- from ._orm_types import SynchronizeSessionArgument
- from ._typing import _CLE
- from .elements import BindParameter
- from .elements import ClauseList
- from .elements import ColumnClause # noqa
- from .elements import ColumnElement
- from .elements import KeyedColumnElement
- from .elements import NamedColumn
- from .elements import SQLCoreOperations
- from .elements import TextClause
- from .schema import Column
- from .schema import DefaultGenerator
- from .selectable import _JoinTargetElement
- from .selectable import _SelectIterable
- from .selectable import FromClause
- from ..engine import Connection
- from ..engine import CursorResult
- from ..engine.interfaces import _CoreMultiExecuteParams
- from ..engine.interfaces import _ExecuteOptions
- from ..engine.interfaces import _ImmutableExecuteOptions
- from ..engine.interfaces import CacheStats
- from ..engine.interfaces import Compiled
- from ..engine.interfaces import CompiledCacheType
- from ..engine.interfaces import CoreExecuteOptionsParameter
- from ..engine.interfaces import Dialect
- from ..engine.interfaces import IsolationLevel
- from ..engine.interfaces import SchemaTranslateMapType
- from ..event import dispatcher
-
if not TYPE_CHECKING:
    # NOTE(review): runtime placeholders for the modules imported above
    # under TYPE_CHECKING; presumably replaced with the real modules by
    # import machinery elsewhere — confirm before relying on them here.
    coercions = None  # noqa
    elements = None  # noqa
    type_api = None  # noqa
-
-class _NoArg(Enum):
- NO_ARG = 0
-
- def __repr__(self):
- return f"_NoArg.{self.name}"
-
-
# module-level alias for the "no argument passed" sentinel
NO_ARG = _NoArg.NO_ARG
-
-
-class _NoneName(Enum):
- NONE_NAME = 0
- """indicate a 'deferred' name that was ultimately the value None."""
-
-
# module-level alias for the deferred-None name sentinel
_NONE_NAME = _NoneName.NONE_NAME

# general-purpose type variables used throughout this module
_T = TypeVar("_T", bound=Any)

_Fn = TypeVar("_Fn", bound=Callable[..., Any])

# type alias used for CompileState._ambiguous_table_name_map
_AmbiguousTableNameMap = MutableMapping[str, str]
-
-
-class _DefaultDescriptionTuple(NamedTuple):
- arg: Any
- is_scalar: Optional[bool]
- is_callable: Optional[bool]
- is_sentinel: Optional[bool]
-
- @classmethod
- def _from_column_default(
- cls, default: Optional[DefaultGenerator]
- ) -> _DefaultDescriptionTuple:
- return (
- _DefaultDescriptionTuple(
- default.arg, # type: ignore
- default.is_scalar,
- default.is_callable,
- default.is_sentinel,
- )
- if default
- and (
- default.has_arg
- or (not default.for_update and default.is_sentinel)
- )
- else _DefaultDescriptionTuple(None, None, None, None)
- )
-
-
# attrgetter returning an element's ``_omit_from_statements`` flag
_never_select_column = operator.attrgetter("_omit_from_statements")
-
-
class _EntityNamespace(Protocol):
    """Protocol: a namespace whose attribute access yields SQL
    operator-capable expressions."""

    def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...
-
-
class _HasEntityNamespace(Protocol):
    """Protocol: objects exposing a read-only ``entity_namespace``."""

    @util.ro_non_memoized_property
    def entity_namespace(self) -> _EntityNamespace: ...
-
-
-def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
- return hasattr(element, "entity_namespace")
-
-
# stand-in for typing.Self used on Immutable._clone().
# Remove when https://github.com/python/mypy/issues/14640 will be fixed
_Self = TypeVar("_Self", bound=Any)
-
-
class Immutable:
    """mark a ClauseElement as 'immutable' when expressions are cloned.

    Here "immutable" refers to mutability in the context of SQL DQL and
    DML generation: one may compose a SELECT or subquery of varied forms,
    but the structure of a specific table or column is never modified
    within DQL.  :class:`.Immutable` follows that concept; the primary
    immutable objects are :class:`.ColumnClause`, :class:`.Column`,
    :class:`.TableClause`, :class:`.Table`.

    """

    __slots__ = ()

    _is_immutable = True

    def _clone(self: _Self, **kw: Any) -> _Self:
        # cloning an immutable element yields the element itself
        return self

    def _copy_internals(
        self, *, omit_attrs: Iterable[str] = (), **kw: Any
    ) -> None:
        # nothing internal to copy
        pass

    def params(self, *optionaldict, **kwargs):
        raise NotImplementedError("Immutable objects do not support copying")

    def unique_params(self, *optionaldict, **kwargs):
        raise NotImplementedError("Immutable objects do not support copying")
-
-
class SingletonConstant(Immutable):
    """Represent SQL constants like NULL, TRUE, FALSE"""

    _is_singleton_constant = True

    # the one shared instance; installed by _create_singleton()
    _singleton: SingletonConstant

    def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
        # construction always hands back the shared singleton instance
        return cast(_T, cls._singleton)

    @util.non_memoized_property
    def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
        # overwritten with a per-instance value in _create_singleton()
        raise NotImplementedError()

    @classmethod
    def _create_singleton(cls):
        """Build and install the shared instance for this class."""
        obj = object.__new__(cls)
        obj.__init__()  # type: ignore

        # for a long time this was an empty frozenset, meaning
        # a SingletonConstant would never be a "corresponding column" in
        # a statement. This referred to #6259. However, in #7154 we see
        # that we do in fact need "correspondence" to work when matching cols
        # in result sets, so the non-correspondence was moved to a more
        # specific level when we are actually adapting expressions for SQL
        # render only.
        obj.proxy_set = frozenset([obj])
        cls._singleton = obj
-
-
-def _from_objects(
- *elements: Union[
- ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
- ]
-) -> Iterator[FromClause]:
- return itertools.chain.from_iterable(
- [element._from_objects for element in elements]
- )
-
-
-def _select_iterables(
- elements: Iterable[roles.ColumnsClauseRole],
-) -> _SelectIterable:
- """expand tables into individual columns in the
- given list of column expressions.
-
- """
- return itertools.chain.from_iterable(
- [c._select_iterable for c in elements]
- )
-
-
# TypeVar bound to objects implementing the _GenerativeType protocol below
_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")
-
-
class _GenerativeType(compat_typing.Protocol):
    """Protocol: objects supporting the ``_generate()`` copy step used by
    the @_generative decorator."""

    def _generate(self) -> Self: ...
-
-
def _generative(fn: _Fn) -> _Fn:
    """non-caching _generative() decorator.

    This is basically the legacy decorator that copies the object and
    runs a method on the new copy.

    """

    @util.decorator
    def _generative(
        fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
    ) -> _SelfGenerativeType:
        """Mark a method as generative."""

        # copy self via its _generate() method, then invoke the wrapped
        # method against the copy; the method must return that same copy
        self = self._generate()
        x = fn(self, *args, **kw)
        assert x is self, "generative methods must return self"
        return self

    decorated = _generative(fn)
    # expose the original, in-place implementation for internal callers
    decorated.non_generative = fn  # type: ignore
    return decorated
-
-
def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
    """Decorator factory: raise :class:`.InvalidRequestError` if any of the
    named attributes on ``self`` is no longer at its default value when the
    decorated method is invoked.

    :param names: attribute names to check on ``self``.
    :param msgs: optional mapping of attribute name to custom error message.
    :param defaults: optional mapping of attribute name to its expected
     default value; ``None`` is assumed when not given.  Comparison is by
     identity (``is not``).
    """
    msgs = kw.pop("msgs", {})

    defaults = kw.pop("defaults", {})

    getters = [
        (name, operator.attrgetter(name), defaults.get(name, None))
        for name in names
    ]

    @util.decorator
    def check(fn, *args, **kw):
        # make pylance happy by not including "self" in the argument
        # list
        self = args[0]
        args = args[1:]
        for name, getter, default_ in getters:
            if getter(self) is not default_:
                msg = msgs.get(
                    name,
                    "Method %s() has already been invoked on this %s construct"
                    % (fn.__name__, self.__class__),
                )
                raise exc.InvalidRequestError(msg)
        return fn(self, *args, **kw)

    return check
-
-
def _clone(element, **kw):
    """Invoke the ``_clone()`` protocol method on *element*."""
    return element._clone(**kw)
-
-
-def _expand_cloned(
- elements: Iterable[_CLE],
-) -> Iterable[_CLE]:
- """expand the given set of ClauseElements to be the set of all 'cloned'
- predecessors.
-
- """
- # TODO: cython candidate
- return itertools.chain(*[x._cloned_set for x in elements])
-
-
-def _de_clone(
- elements: Iterable[_CLE],
-) -> Iterable[_CLE]:
- for x in elements:
- while x._is_clone_of is not None:
- x = x._is_clone_of
- yield x
-
-
-def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
- """return the intersection of sets a and b, counting
- any overlap between 'cloned' predecessors.
-
- The returned set is in terms of the entities present within 'a'.
-
- """
- all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
- return {elem for elem in a if all_overlap.intersection(elem._cloned_set)}
-
-
-def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
- all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
- return {
- elem for elem in a if not all_overlap.intersection(elem._cloned_set)
- }
-
-
class _DialectArgView(MutableMapping[str, Any]):
    """A dictionary view of dialect-level arguments in the form
    <dialectname>_<argument_name>.

    Wraps the two-level ``obj.dialect_options`` registry as a single flat,
    writable mapping.
    """

    def __init__(self, obj):
        # obj: the DialectKWArgs construct whose options are being viewed
        self.obj = obj

    def _key(self, key):
        # split "<dialect>_<argname>" on the first underscore; a key with
        # no underscore at all is not a valid dialect argument key
        try:
            dialect, value_key = key.split("_", 1)
        except ValueError as err:
            raise KeyError(key) from err
        else:
            return dialect, value_key

    def __getitem__(self, key):
        dialect, value_key = self._key(key)

        try:
            opt = self.obj.dialect_options[dialect]
        except exc.NoSuchModuleError as err:
            # an unknown dialect name presents as a missing mapping key
            raise KeyError(key) from err
        else:
            return opt[value_key]

    def __setitem__(self, key, value):
        try:
            dialect, value_key = self._key(key)
        except KeyError as err:
            raise exc.ArgumentError(
                "Keys must be of the form <dialectname>_<argname>"
            ) from err
        else:
            self.obj.dialect_options[dialect][value_key] = value

    def __delitem__(self, key):
        dialect, value_key = self._key(key)
        del self.obj.dialect_options[dialect][value_key]

    def __len__(self):
        # only user-specified (non-default) arguments are counted
        return sum(
            len(args._non_defaults)
            for args in self.obj.dialect_options.values()
        )

    def __iter__(self):
        # likewise, iteration covers only non-default arguments
        return (
            "%s_%s" % (dialect_name, value_name)
            for dialect_name in self.obj.dialect_options
            for value_name in self.obj.dialect_options[
                dialect_name
            ]._non_defaults
        )
-
-
-class _DialectArgDict(MutableMapping[str, Any]):
- """A dictionary view of dialect-level arguments for a specific
- dialect.
-
- Maintains a separate collection of user-specified arguments
- and dialect-specified default arguments.
-
- """
-
- def __init__(self):
- self._non_defaults = {}
- self._defaults = {}
-
- def __len__(self):
- return len(set(self._non_defaults).union(self._defaults))
-
- def __iter__(self):
- return iter(set(self._non_defaults).union(self._defaults))
-
- def __getitem__(self, key):
- if key in self._non_defaults:
- return self._non_defaults[key]
- else:
- return self._defaults[key]
-
- def __setitem__(self, key, value):
- self._non_defaults[key] = value
-
- def __delitem__(self, key):
- del self._non_defaults[key]
-
-
@util.preload_module("sqlalchemy.dialects")
def _kw_reg_for_dialect(dialect_name):
    """Load the named dialect and return a mutable copy of its
    ``construct_arguments``, or ``None`` when the dialect does not
    participate in keyword-argument validation."""
    dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
    if dialect_cls.construct_arguments is None:
        return None
    return dict(dialect_cls.construct_arguments)
-
-
class DialectKWArgs:
    """Establish the ability for a class to have dialect-specific arguments
    with defaults and constructor validation.

    The :class:`.DialectKWArgs` interacts with the
    :attr:`.DefaultDialect.construct_arguments` present on a dialect.

    .. seealso::

        :attr:`.DefaultDialect.construct_arguments`

    """

    __slots__ = ()

    _dialect_kwargs_traverse_internals = [
        ("dialect_options", InternalTraversal.dp_dialect_options)
    ]

    @classmethod
    def argument_for(cls, dialect_name, argument_name, default):
        """Add a new kind of dialect-specific keyword argument for this class.

        E.g.::

            Index.argument_for("mydialect", "length", None)

            some_index = Index('a', 'b', mydialect_length=5)

        The :meth:`.DialectKWArgs.argument_for` method is a per-argument
        way adding extra arguments to the
        :attr:`.DefaultDialect.construct_arguments` dictionary. This
        dictionary provides a list of argument names accepted by various
        schema-level constructs on behalf of a dialect.

        New dialects should typically specify this dictionary all at once as a
        data member of the dialect class. The use case for ad-hoc addition of
        argument names is typically for end-user code that is also using
        a custom compilation scheme which consumes the additional arguments.

        :param dialect_name: name of a dialect. The dialect must be
         locatable, else a :class:`.NoSuchModuleError` is raised. The
         dialect must also include an existing
         :attr:`.DefaultDialect.construct_arguments` collection, indicating
         that it participates in the keyword-argument validation and default
         system, else :class:`.ArgumentError` is raised. If the dialect does
         not include this collection, then any keyword argument can be
         specified on behalf of this dialect already. All dialects packaged
         within SQLAlchemy include this collection, however for third party
         dialects, support may vary.

        :param argument_name: name of the parameter.

        :param default: default value of the parameter.

        """

        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        if construct_arg_dictionary is None:
            # bugfix: this message previously read "does have
            # keyword-argument validation and defaults enabled configured",
            # stating the opposite of the actual failure condition (the
            # dialect's construct_arguments collection is None, i.e.
            # validation is NOT enabled)
            raise exc.ArgumentError(
                "Dialect '%s' does not have keyword-argument "
                "validation and defaults enabled" % dialect_name
            )
        if cls not in construct_arg_dictionary:
            construct_arg_dictionary[cls] = {}
        construct_arg_dictionary[cls][argument_name] = default

    @util.memoized_property
    def dialect_kwargs(self):
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        The arguments are present here in their original ``<dialect>_<kwarg>``
        format. Only arguments that were actually passed are included;
        unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
        contains all options known by this dialect including defaults.

        The collection is also writable; keys are accepted of the
        form ``<dialect>_<kwarg>`` where the value will be assembled
        into the list of options.

        .. seealso::

            :attr:`.DialectKWArgs.dialect_options` - nested dictionary form

        """
        return _DialectArgView(self)

    @property
    def kwargs(self):
        """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
        return self.dialect_kwargs

    # shared per-dialect registry; populates on first access of a
    # dialect name via _kw_reg_for_dialect()
    _kw_registry = util.PopulateDict(_kw_reg_for_dialect)

    def _kw_reg_for_dialect_cls(self, dialect_name):
        # assemble the _DialectArgDict for this construct's class
        # hierarchy against one dialect's construct_arguments
        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
        d = _DialectArgDict()

        if construct_arg_dictionary is None:
            # dialect does not participate in validation; accept anything
            d._defaults.update({"*": None})
        else:
            # walk the MRO base-first so subclass defaults take precedence
            for cls in reversed(self.__class__.__mro__):
                if cls in construct_arg_dictionary:
                    d._defaults.update(construct_arg_dictionary[cls])
        return d

    @util.memoized_property
    def dialect_options(self):
        """A collection of keyword arguments specified as dialect-specific
        options to this construct.

        This is a two-level nested registry, keyed to ``<dialect_name>``
        and ``<argument_name>``. For example, the ``postgresql_where``
        argument would be locatable as::

            arg = my_object.dialect_options['postgresql']['where']

        .. versionadded:: 0.9.2

        .. seealso::

            :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form

        """

        return util.PopulateDict(
            util.portable_instancemethod(self._kw_reg_for_dialect_cls)
        )

    def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
        # validate remaining kwargs that they all specify DB prefixes

        if not kwargs:
            return

        for k in kwargs:
            m = re.match("^(.+?)_(.+)$", k)
            if not m:
                raise TypeError(
                    "Additional arguments should be "
                    "named <dialectname>_<argument>, got '%s'" % k
                )
            dialect_name, arg_name = m.group(1, 2)

            try:
                construct_arg_dictionary = self.dialect_options[dialect_name]
            except exc.NoSuchModuleError:
                # unknown dialect: warn but accept the argument as-is
                util.warn(
                    "Can't validate argument %r; can't "
                    "locate any SQLAlchemy dialect named %r"
                    % (k, dialect_name)
                )
                self.dialect_options[dialect_name] = d = _DialectArgDict()
                d._defaults.update({"*": None})
                d._non_defaults[arg_name] = kwargs[k]
            else:
                if (
                    "*" not in construct_arg_dictionary
                    and arg_name not in construct_arg_dictionary
                ):
                    raise exc.ArgumentError(
                        "Argument %r is not accepted by "
                        "dialect %r on behalf of %r"
                        % (k, dialect_name, self.__class__)
                    )
                else:
                    construct_arg_dictionary[arg_name] = kwargs[k]
-
-
class CompileState:
    """Produces additional object state necessary for a statement to be
    compiled.

    the :class:`.CompileState` class is at the base of classes that assemble
    state for a particular statement object that is then used by the
    compiler. This process is essentially an extension of the process that
    the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
    on converting raw user intent into more organized structures rather than
    producing string output. The top-level :class:`.CompileState` for the
    statement being executed is also accessible when the execution context
    works with invoking the statement and collecting results.

    The production of :class:`.CompileState` is specific to the compiler, such
    as within the :meth:`.SQLCompiler.visit_insert`,
    :meth:`.SQLCompiler.visit_select` etc. methods. These methods are also
    responsible for associating the :class:`.CompileState` with the
    :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
    i.e. the outermost SQL statement that's actually being executed.
    There can be other :class:`.CompileState` objects that are not the
    toplevel, such as when a SELECT subquery or CTE-nested
    INSERT/UPDATE/DELETE is generated.

    .. versionadded:: 1.4

    """

    __slots__ = ("statement", "_ambiguous_table_name_map")

    # registry of (plugin_name, visit_name) -> CompileState subclass,
    # populated via the plugin_for() decorator
    plugins: Dict[Tuple[str, str], Type[CompileState]] = {}

    _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]

    @classmethod
    def create_for_statement(cls, statement, compiler, **kw):
        # factory construction.

        if statement._propagate_attrs:
            plugin_name = statement._propagate_attrs.get(
                "compile_state_plugin", "default"
            )
            klass = cls.plugins.get(
                (plugin_name, statement._effective_plugin_target), None
            )
            if klass is None:
                # fall back to the default plugin for this statement type
                klass = cls.plugins[
                    ("default", statement._effective_plugin_target)
                ]

        else:
            klass = cls.plugins[
                ("default", statement._effective_plugin_target)
            ]

        if klass is cls:
            return cls(statement, compiler, **kw)
        else:
            # delegate so the selected subclass can apply its own
            # factory logic
            return klass.create_for_statement(statement, compiler, **kw)

    def __init__(self, statement, compiler, **kw):
        # base implementation only records the statement; subclasses
        # assemble additional state
        self.statement = statement

    @classmethod
    def get_plugin_class(
        cls, statement: Executable
    ) -> Optional[Type[CompileState]]:
        """Return the CompileState subclass registered for this statement's
        plugin, falling back to the "default" plugin entry."""
        plugin_name = statement._propagate_attrs.get(
            "compile_state_plugin", None
        )

        if plugin_name:
            key = (plugin_name, statement._effective_plugin_target)
            if key in cls.plugins:
                return cls.plugins[key]

        # there's no case where we call upon get_plugin_class() and want
        # to get None back, there should always be a default. return that
        # if there was no plugin-specific class (e.g. Insert with "orm"
        # plugin)
        try:
            return cls.plugins[("default", statement._effective_plugin_target)]
        except KeyError:
            return None

    @classmethod
    def _get_plugin_class_for_plugin(
        cls, statement: Executable, plugin_name: str
    ) -> Optional[Type[CompileState]]:
        """Return the CompileState subclass for an explicit plugin name,
        or ``None`` when unregistered (no default fallback)."""
        try:
            return cls.plugins[
                (plugin_name, statement._effective_plugin_target)
            ]
        except KeyError:
            return None

    @classmethod
    def plugin_for(
        cls, plugin_name: str, visit_name: str
    ) -> Callable[[_Fn], _Fn]:
        """Class decorator registering the decorated class in the
        ``plugins`` registry under ``(plugin_name, visit_name)``."""

        def decorate(cls_to_decorate):
            cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
            return cls_to_decorate

        return decorate
-
-
class Generative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator."""

    def _generate(self) -> Self:
        excluded = self._memoized_keys
        cls = self.__class__
        copied = cls.__new__(cls)
        # snapshot first so the filtering below is unaffected by
        # concurrent mutation of self.__dict__
        state = self.__dict__.copy()
        if excluded:
            # memoized values do not carry over to the new copy
            for key in excluded:
                state.pop(key, None)
        copied.__dict__ = state
        return copied
-
-
class InPlaceGenerative(HasMemoized):
    """Provide a method-chaining pattern in conjunction with the
    @_generative decorator that mutates in place."""

    __slots__ = ()

    def _generate(self):
        # mutation invalidates memoized values, so drop them
        # (``__dict__`` must appear in ``__slots__`` of subclasses
        # for this to work)
        for key in self._memoized_keys:
            self.__dict__.pop(key, None)
        return self
-
-
class HasCompileState(Generative):
    """A class that has a :class:`.CompileState` associated with it."""

    # optional override of the CompileState plugin class for this construct
    _compile_state_plugin: Optional[Type[CompileState]] = None

    # ad-hoc attributes associated with this construct
    _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT

    # default factory; delegates to the plugin registry
    _compile_state_factory = CompileState.create_for_statement
-
-
class _MetaOptions(type):
    """metaclass for the Options class.

    This metaclass is actually necessary despite the availability of the
    ``__init_subclass__()`` hook as this type also provides custom class-level
    behavior for the ``__add__()`` method.

    """

    _cache_attrs: Tuple[str, ...]

    def __add__(self, other):
        # "OptionsClass + dict" produces a new *instance* with the
        # dict's values applied
        o1 = self()

        # reject keys that are not part of this Options class's
        # declared attribute set
        if set(other).difference(self._cache_attrs):
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r"
                % (self, set(other).difference(self._cache_attrs))
            )

        o1.__dict__.update(other)
        return o1

    if TYPE_CHECKING:
        # typing-only escape hatches; attribute access is dynamic

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...
-
-
class Options(metaclass=_MetaOptions):
    """A cacheable option dictionary with defaults."""

    __slots__ = ()

    _cache_attrs: Tuple[str, ...]

    def __init_subclass__(cls) -> None:
        # collect the attribute names declared on each subclass; these
        # names form the option namespace used by __add__/__eq__/__repr__
        dict_ = cls.__dict__
        cls._cache_attrs = tuple(
            sorted(
                d
                for d in dict_
                if not d.startswith("__")
                and d not in ("_cache_key_traversal",)
            )
        )
        super().__init_subclass__()

    def __init__(self, **kw):
        self.__dict__.update(kw)

    def __add__(self, other):
        # "instance + dict" produces a copy with the dict's values applied
        o1 = self.__class__.__new__(self.__class__)
        o1.__dict__.update(self.__dict__)

        if set(other).difference(self._cache_attrs):
            raise TypeError(
                "dictionary contains attributes not covered by "
                "Options class %s: %r"
                % (self, set(other).difference(self._cache_attrs))
            )

        o1.__dict__.update(other)
        return o1

    def __eq__(self, other):
        # TODO: very inefficient. This is used only in test suites
        # right now.
        # NOTE(review): assumes both sides carry compatible _cache_attrs;
        # mismatched lengths would feed None into getattr() via
        # zip_longest — confirm only like-typed options are compared.
        for a, b in zip_longest(self._cache_attrs, other._cache_attrs):
            if getattr(self, a) != getattr(other, b):
                return False
        return True

    def __repr__(self):
        # TODO: fairly inefficient, used only in debugging right now.

        return "%s(%s)" % (
            self.__class__.__name__,
            ", ".join(
                "%s=%r" % (k, self.__dict__[k])
                for k in self._cache_attrs
                if k in self.__dict__
            ),
        )

    @classmethod
    def isinstance(cls, klass: Type[Any]) -> bool:
        # class-level "isinstance" expressed in terms of issubclass
        return issubclass(cls, klass)

    @hybridmethod
    def add_to_element(self, name, value):
        # append ``value`` onto the existing value of attribute ``name``
        return self + {name: getattr(self, name) + value}

    @hybridmethod
    def _state_dict_inst(self) -> Mapping[str, Any]:
        return self.__dict__

    # immutable empty mapping returned by the class-level _state_dict()
    _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT

    @_state_dict_inst.classlevel
    def _state_dict(cls) -> Mapping[str, Any]:
        return cls._state_dict_const

    @classmethod
    def safe_merge(cls, other):
        d = other._state_dict()

        # only support a merge with another object of our class
        # and which does not have attrs that we don't. otherwise
        # we risk having state that might not be part of our cache
        # key strategy

        if (
            cls is not other.__class__
            and other._cache_attrs
            and set(other._cache_attrs).difference(cls._cache_attrs)
        ):
            raise TypeError(
                "other element %r is not empty, is not of type %s, "
                "and contains attributes not covered here %r"
                % (
                    other,
                    cls,
                    set(other._cache_attrs).difference(cls._cache_attrs),
                )
            )
        return cls + d

    @classmethod
    def from_execution_options(
        cls, key, attrs, exec_options, statement_exec_options
    ):
        """process Options argument in terms of execution options.


        e.g.::

            (
                load_options,
                execution_options,
            ) = QueryContext.default_load_options.from_execution_options(
                "_sa_orm_load_options",
                {
                    "populate_existing",
                    "autoflush",
                    "yield_per"
                },
                execution_options,
                statement._execution_options,
            )

        get back the Options and refresh "_sa_orm_load_options" in the
        exec options dict w/ the Options as well

        """

        # common case is that no options we are looking for are
        # in either dictionary, so cancel for that first
        check_argnames = attrs.intersection(
            set(exec_options).union(statement_exec_options)
        )

        existing_options = exec_options.get(key, cls)

        if check_argnames:
            result = {}
            for argname in check_argnames:
                # attribute names on the Options object carry a
                # leading underscore
                local = "_" + argname
                if argname in exec_options:
                    result[local] = exec_options[argname]
                elif argname in statement_exec_options:
                    result[local] = statement_exec_options[argname]

            new_options = existing_options + result
            # rebuild exec_options immutably with the refreshed Options
            exec_options = util.immutabledict().merge_with(
                exec_options, {key: new_options}
            )
            return new_options, exec_options

        else:
            return existing_options, exec_options

    if TYPE_CHECKING:
        # typing-only escape hatches; attribute access is dynamic

        def __getattr__(self, key: str) -> Any: ...

        def __setattr__(self, key: str, value: Any) -> None: ...

        def __delattr__(self, key: str) -> None: ...
-
-
class CacheableOptions(Options, HasCacheKey):
    """Options subclass that participates in SQL compilation cache keys."""

    __slots__ = ()

    @hybridmethod
    def _gen_cache_key_inst(self, anon_map, bindparams):
        # instance level: delegate to the HasCacheKey implementation
        return HasCacheKey._gen_cache_key(self, anon_map, bindparams)

    @_gen_cache_key_inst.classlevel
    def _gen_cache_key(cls, anon_map, bindparams):
        # class level: the class itself (i.e. all defaults) is the key
        return (cls, ())

    @hybridmethod
    def _generate_cache_key(self):
        return HasCacheKey._generate_cache_key_for_object(self)
-
-
class ExecutableOption(HasCopyInternals):
    """Base for option objects accepted by :meth:`.Executable.options`."""

    __slots__ = ()

    _annotations = util.EMPTY_DICT

    __visit_name__ = "executable_option"

    _is_has_cache_key = False

    _is_core = True

    def _clone(self, **kw):
        """Create a shallow copy of this ExecutableOption."""
        copied = self.__class__.__new__(self.__class__)
        copied.__dict__ = dict(self.__dict__)  # type: ignore
        return copied
-
-
class Executable(roles.StatementRole):
    """Mark a :class:`_expression.ClauseElement` as supporting execution.

    :class:`.Executable` is a superclass for all "statement" types
    of objects, including :func:`select`, :func:`delete`, :func:`update`,
    :func:`insert`, :func:`text`.

    """

    supports_execution: bool = True
    # accumulated execution options; shared empty default
    _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
    _is_default_generator = False
    # options accumulated via .options()
    _with_options: Tuple[ExecutableOption, ...] = ()
    # (callable, cache_args) pairs accumulated via ._add_context_option()
    _with_context_options: Tuple[
        Tuple[Callable[[CompileState], None], Any], ...
    ] = ()
    _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]

    _executable_traverse_internals = [
        ("_with_options", InternalTraversal.dp_executable_options),
        (
            "_with_context_options",
            ExtendedInternalTraversal.dp_with_context_options,
        ),
        ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
    ]

    # statement-type flags; overridden by concrete statement classes
    is_select = False
    is_update = False
    is_insert = False
    is_text = False
    is_delete = False
    is_dml = False
-
    if TYPE_CHECKING:
        # typing-only declarations; concrete implementations are
        # presumably provided elsewhere at runtime — not visible here
        __visit_name__: str

        def _compile_w_cache(
            self,
            dialect: Dialect,
            *,
            compiled_cache: Optional[CompiledCacheType],
            column_keys: List[str],
            for_executemany: bool = False,
            schema_translate_map: Optional[SchemaTranslateMapType] = None,
            **kw: Any,
        ) -> Tuple[
            Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats
        ]: ...

        def _execute_on_connection(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> CursorResult[Any]: ...

        def _execute_on_scalar(
            self,
            connection: Connection,
            distilled_params: _CoreMultiExecuteParams,
            execution_options: CoreExecuteOptionsParameter,
        ) -> Any: ...
-
    @util.ro_non_memoized_property
    def _all_selected_columns(self):
        # abstract accessor; concrete statement types provide the value
        raise NotImplementedError()
-
    @property
    def _effective_plugin_target(self) -> str:
        # key used for CompileState plugin lookup; defaults to the
        # statement's visit name
        return self.__visit_name__
-
    @_generative
    def options(self, *options: ExecutableOption) -> Self:
        """Apply options to this statement.

        In the general sense, options are any kind of Python object
        that can be interpreted by the SQL compiler for the statement.
        These options can be consumed by specific dialects or specific kinds
        of compilers.

        The most commonly known kind of option are the ORM level options
        that apply "eager load" and other loading behaviors to an ORM
        query. However, options can theoretically be used for many other
        purposes.

        For background on specific kinds of options for specific kinds of
        statements, refer to the documentation for those option objects.

        .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
           Core statement objects towards the goal of allowing unified
           Core / ORM querying capabilities.

        .. seealso::

            :ref:`loading_columns` - refers to options specific to the usage
            of ORM queries

            :ref:`relationship_loader_options` - refers to options specific
            to the usage of ORM queries

        """
        # coerce each argument into the ExecutableOption role and append
        self._with_options += tuple(
            coercions.expect(roles.ExecutableOptionRole, opt)
            for opt in options
        )
        return self
-
    @_generative
    def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
        """Assign the compile options to a new value.

        :param compile_options: appropriate CacheableOptions structure

        """

        # replaces any previously-set compile options wholesale
        self._compile_options = compile_options
        return self
-
    @_generative
    def _update_compile_options(self, options: CacheableOptions) -> Self:
        """update the _compile_options with new keys."""

        # merged via Options.__add__; options must already be present
        assert self._compile_options is not None
        self._compile_options += options
        return self
-
    @_generative
    def _add_context_option(
        self,
        callable_: Callable[[CompileState], None],
        cache_args: Any,
    ) -> Self:
        """Add a context option to this statement.

        These are callable functions that will
        be given the CompileState object upon compilation.

        A second argument cache_args is required, which will be combined with
        the ``__code__`` identity of the function itself in order to produce a
        cache key.

        """
        # stored as (callable, cache_args) pairs
        self._with_context_options += ((callable_, cache_args),)
        return self
-
    # Typed overload enumerating the execution options explicitly recognized
    # by Core and the ORM; the catch-all overload below admits arbitrary
    # user-defined keys as well.
    @overload
    def execution_options(
        self,
        *,
        compiled_cache: Optional[CompiledCacheType] = ...,
        logging_token: str = ...,
        isolation_level: IsolationLevel = ...,
        no_parameters: bool = False,
        stream_results: bool = False,
        max_row_buffer: int = ...,
        yield_per: int = ...,
        insertmanyvalues_page_size: int = ...,
        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
        populate_existing: bool = False,
        autoflush: bool = False,
        synchronize_session: SynchronizeSessionArgument = ...,
        dml_strategy: DMLStrategyArgument = ...,
        render_nulls: bool = ...,
        is_delete_using: bool = ...,
        is_update_from: bool = ...,
        preserve_rowcount: bool = False,
        **opt: Any,
    ) -> Self: ...

    @overload
    def execution_options(self, **opt: Any) -> Self: ...
-
- @_generative
- def execution_options(self, **kw: Any) -> Self:
- """Set non-SQL options for the statement which take effect during
- execution.
-
- Execution options can be set at many scopes, including per-statement,
- per-connection, or per execution, using methods such as
- :meth:`_engine.Connection.execution_options` and parameters which
- accept a dictionary of options such as
- :paramref:`_engine.Connection.execute.execution_options` and
- :paramref:`_orm.Session.execute.execution_options`.
-
- The primary characteristic of an execution option, as opposed to
- other kinds of options such as ORM loader options, is that
- **execution options never affect the compiled SQL of a query, only
- things that affect how the SQL statement itself is invoked or how
- results are fetched**. That is, execution options are not part of
- what's accommodated by SQL compilation nor are they considered part of
- the cached state of a statement.
-
- The :meth:`_sql.Executable.execution_options` method is
- :term:`generative`, as
- is the case for the method as applied to the :class:`_engine.Engine`
- and :class:`_orm.Query` objects, which means when the method is called,
- a copy of the object is returned, which applies the given parameters to
- that new copy, but leaves the original unchanged::
-
- statement = select(table.c.x, table.c.y)
- new_statement = statement.execution_options(my_option=True)
-
- An exception to this behavior is the :class:`_engine.Connection`
- object, where the :meth:`_engine.Connection.execution_options` method
- is explicitly **not** generative.
-
- The kinds of options that may be passed to
- :meth:`_sql.Executable.execution_options` and other related methods and
- parameter dictionaries include parameters that are explicitly consumed
- by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
- defined by SQLAlchemy, which means the methods and/or parameter
- dictionaries may be used for user-defined parameters that interact with
- custom code, which may access the parameters using methods such as
- :meth:`_sql.Executable.get_execution_options` and
- :meth:`_engine.Connection.get_execution_options`, or within selected
- event hooks using a dedicated ``execution_options`` event parameter
- such as
- :paramref:`_events.ConnectionEvents.before_execute.execution_options`
- or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::
-
- from sqlalchemy import event
-
- @event.listens_for(some_engine, "before_execute")
- def _process_opt(conn, statement, multiparams, params, execution_options):
- "run a SQL function before invoking a statement"
-
- if execution_options.get("do_special_thing", False):
- conn.exec_driver_sql("run_special_function()")
-
- Within the scope of options that are explicitly recognized by
- SQLAlchemy, most apply to specific classes of objects and not others.
- The most common execution options include:
-
- * :paramref:`_engine.Connection.execution_options.isolation_level` -
- sets the isolation level for a connection or a class of connections
- via an :class:`_engine.Engine`. This option is accepted only
- by :class:`_engine.Connection` or :class:`_engine.Engine`.
-
- * :paramref:`_engine.Connection.execution_options.stream_results` -
- indicates results should be fetched using a server side cursor;
- this option is accepted by :class:`_engine.Connection`, by the
- :paramref:`_engine.Connection.execute.execution_options` parameter
- on :meth:`_engine.Connection.execute`, and additionally by
- :meth:`_sql.Executable.execution_options` on a SQL statement object,
- as well as by ORM constructs like :meth:`_orm.Session.execute`.
-
- * :paramref:`_engine.Connection.execution_options.compiled_cache` -
- indicates a dictionary that will serve as the
- :ref:`SQL compilation cache <sql_caching>`
- for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
- well as for ORM methods like :meth:`_orm.Session.execute`.
- Can be passed as ``None`` to disable caching for statements.
- This option is not accepted by
- :meth:`_sql.Executable.execution_options` as it is inadvisable to
- carry along a compilation cache within a statement object.
-
- * :paramref:`_engine.Connection.execution_options.schema_translate_map`
- - a mapping of schema names used by the
- :ref:`Schema Translate Map <schema_translating>` feature, accepted
- by :class:`_engine.Connection`, :class:`_engine.Engine`,
- :class:`_sql.Executable`, as well as by ORM constructs
- like :meth:`_orm.Session.execute`.
-
- .. seealso::
-
- :meth:`_engine.Connection.execution_options`
-
- :paramref:`_engine.Connection.execute.execution_options`
-
- :paramref:`_orm.Session.execute.execution_options`
-
- :ref:`orm_queryguide_execution_options` - documentation on all
- ORM-specific execution options
-
- """ # noqa: E501
- if "isolation_level" in kw:
- raise exc.ArgumentError(
- "'isolation_level' execution option may only be specified "
- "on Connection.execution_options(), or "
- "per-engine using the isolation_level "
- "argument to create_engine()."
- )
- if "compiled_cache" in kw:
- raise exc.ArgumentError(
- "'compiled_cache' execution option may only be specified "
- "on Connection.execution_options(), not per statement."
- )
- self._execution_options = self._execution_options.union(kw)
- return self
-
    def get_execution_options(self) -> _ExecuteOptions:
        """Get the non-SQL options which will take effect during execution.

        Returns the immutable mapping accumulated by prior calls to
        :meth:`.Executable.execution_options`.

        .. versionadded:: 1.3

        .. seealso::

            :meth:`.Executable.execution_options`
        """
        return self._execution_options
-
-
class SchemaEventTarget(event.EventTarget):
    """Base class for elements that are the targets of :class:`.DDLEvents`
    events.

    This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.

    """

    dispatch: dispatcher[SchemaEventTarget]

    def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
        """Associate with this SchemaEvent's parent object."""
        # no-op by default; subclasses implement the actual attachment

    def _set_parent_with_dispatch(
        self, parent: SchemaEventTarget, **kw: Any
    ) -> None:
        # invoke _set_parent() bracketed by the before/after
        # parent-attach events, in that strict order
        self.dispatch.before_parent_attach(self, parent)
        self._set_parent(parent, **kw)
        self.dispatch.after_parent_attach(self, parent)
-
-
class SchemaVisitor(ClauseVisitor):
    """Define the visiting for ``SchemaItem`` objects."""

    # traversal flag consumed by the visitor framework to select
    # schema-level traversal behavior
    __traverse_options__ = {"schema_visitor": True}
-
-
class _SentinelDefaultCharacterization(Enum):
    """Characterizations of how a sentinel column's default value is
    produced (client side, server side, identity, sequence, etc.)."""

    NONE = "none"
    UNKNOWN = "unknown"
    CLIENTSIDE = "clientside"
    SENTINEL_DEFAULT = "sentinel_default"
    SERVERSIDE = "serverside"
    IDENTITY = "identity"
    SEQUENCE = "sequence"
-
-
class _SentinelColumnCharacterization(NamedTuple):
    """Immutable description of a table's sentinel-column configuration."""

    # the sentinel column(s) located, if any — semantics determined by
    # callers elsewhere in the package
    columns: Optional[Sequence[Column[Any]]] = None
    # whether the sentinel was explicitly designated (vs. inferred)
    is_explicit: bool = False
    # whether the sentinel is an autoincrement column
    is_autoinc: bool = False
    # how the column's default value is generated
    default_characterization: _SentinelDefaultCharacterization = (
        _SentinelDefaultCharacterization.NONE
    )
-
-
# Key type for ColumnCollection: constrained to "str or None" for the
# general collection, or plain str for deduplicating collections.
_COLKEY = TypeVar("_COLKEY", Union[None, str], str)

# Covariant element type stored by ColumnCollection, and an invariant
# keyed-column variant used for lookup arguments.
_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
_COL = TypeVar("_COL", bound="KeyedColumnElement[Any]")
-
-
class _ColumnMetrics(Generic[_COL_co]):
    """Per-column bookkeeping record held inside a
    :class:`.ColumnCollection`, tracking a member column and keeping the
    collection's lazily-built proxy index in sync with it."""

    __slots__ = ("column",)

    column: _COL_co

    def __init__(
        self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
    ):
        self.column = col

        # A non-empty proxy index means it has been initialized, so this
        # new entry must be folded into it immediately.
        proxy_index = collection._proxy_index
        if proxy_index:
            for proxied in col._expanded_proxy_set:
                proxy_index[proxied].add(self)

    def get_expanded_proxy_set(self):
        """Return the expanded proxy set of the tracked column."""
        return self.column._expanded_proxy_set

    def dispose(self, collection):
        """Detach this record from *collection*'s proxy index, pruning
        index entries that end up empty."""
        proxy_index = collection._proxy_index
        if not proxy_index:
            return
        for proxied in self.column._expanded_proxy_set:
            members = proxy_index.get(proxied, None)
            if members is None:
                continue
            members.discard(self)
            if not members:
                del proxy_index[proxied]

    def embedded(
        self,
        target_set: Union[
            Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
        ],
    ) -> bool:
        """Return True if every member of *target_set* outside this
        column's expanded proxy set still corresponds via clone expansion."""
        eps = self.column._expanded_proxy_set
        return all(
            eps.intersection(_expand_cloned([t]))
            for t in target_set.difference(eps)
        )
-
-
class ColumnCollection(Generic[_COLKEY, _COL_co]):
    """Collection of :class:`_expression.ColumnElement` instances,
    typically for
    :class:`_sql.FromClause` objects.

    The :class:`_sql.ColumnCollection` object is most commonly available
    as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
    on the :class:`_schema.Table` object, introduced at
    :ref:`metadata_tables_and_columns`.

    The :class:`_expression.ColumnCollection` has both mapping- and sequence-
    like behaviors. A :class:`_expression.ColumnCollection` usually stores
    :class:`_schema.Column` objects, which are then accessible both via mapping
    style access as well as attribute access style.

    To access :class:`_schema.Column` objects using ordinary attribute-style
    access, specify the name like any other object attribute, such as below
    a column named ``employee_name`` is accessed::

        >>> employee_table.c.employee_name

    To access columns that have names with special characters or spaces,
    index-style access is used, such as below which illustrates a column named
    ``employee ' payment`` is accessed::

        >>> employee_table.c["employee ' payment"]

    As the :class:`_sql.ColumnCollection` object provides a Python dictionary
    interface, common dictionary method names like
    :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
    and :meth:`_sql.ColumnCollection.items` are available, which means that
    database columns that are keyed under these names also need to use indexed
    access::

        >>> employee_table.c["values"]


    The name for which a :class:`_schema.Column` would be present is normally
    that of the :paramref:`_schema.Column.key` parameter. In some contexts,
    such as a :class:`_sql.Select` object that uses a label style set
    using the :meth:`_sql.Select.set_label_style` method, a column of a certain
    key may instead be represented under a particular label name such
    as ``tablename_columnname``::

        >>> from sqlalchemy import select, column, table
        >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
        >>> t = table("t", column("c"))
        >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
        >>> subq = stmt.subquery()
        >>> subq.c.t_c
        <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>

    :class:`.ColumnCollection` also indexes the columns in order and allows
    them to be accessible by their integer position::

        >>> cc[0]
        Column('x', Integer(), table=None)
        >>> cc[1]
        Column('y', Integer(), table=None)

    .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
       allows integer-based
       index access to the collection.

    Iterating the collection yields the column expressions in order::

        >>> list(cc)
        [Column('x', Integer(), table=None),
         Column('y', Integer(), table=None)]

    The base :class:`_expression.ColumnCollection` object can store
    duplicates, which can
    mean either two columns with the same key, in which case the column
    returned by key access is **arbitrary**::

        >>> x1, x2 = Column('x', Integer), Column('x', Integer)
        >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)])
        >>> list(cc)
        [Column('x', Integer(), table=None),
         Column('x', Integer(), table=None)]
        >>> cc['x'] is x1
        False
        >>> cc['x'] is x2
        True

    Or it can also mean the same column multiple times. These cases are
    supported as :class:`_expression.ColumnCollection`
    is used to represent the columns in
    a SELECT statement which may include duplicates.

    A special subclass :class:`.DedupeColumnCollection` exists which instead
    maintains SQLAlchemy's older behavior of not allowing duplicates; this
    collection is used for schema level objects like :class:`_schema.Table`
    and
    :class:`.PrimaryKeyConstraint` where this deduping is helpful. The
    :class:`.DedupeColumnCollection` class also has additional mutation methods
    as the schema constructs have more use cases that require removal and
    replacement of columns.

    .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
       now stores duplicate
       column keys as well as the same column in multiple positions. The
       :class:`.DedupeColumnCollection` class is added to maintain the
       former behavior in those cases where deduplication as well as
       additional replace/remove operations are needed.


    """

    __slots__ = "_collection", "_index", "_colset", "_proxy_index"

    # ordered storage of (key, column, metrics) triples
    _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
    # lookup table keyed by BOTH integer position and string key
    _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
    # lazily-built reverse map from proxied columns to member metrics;
    # see _init_proxy_index()
    _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
    # de-annotated member columns, for fast membership tests
    _colset: Set[_COL_co]

    def __init__(
        self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
    ):
        # object.__setattr__ is used because __setattr__ is overridden
        # below to raise NotImplementedError
        object.__setattr__(self, "_colset", set())
        object.__setattr__(self, "_index", {})
        object.__setattr__(
            self, "_proxy_index", collections.defaultdict(util.OrderedSet)
        )
        object.__setattr__(self, "_collection", [])
        if columns:
            self._initial_populate(columns)

    @util.preload_module("sqlalchemy.sql.elements")
    def __clause_element__(self) -> ClauseList:
        # elements is imported lazily via preload_module to avoid a
        # circular import with sqlalchemy.sql.elements
        elements = util.preloaded.sql_elements

        return elements.ClauseList(
            _literal_as_text_role=roles.ColumnsClauseRole,
            group=False,
            *self._all_columns,
        )

    def _initial_populate(
        self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
    ) -> None:
        # hook point; DedupeColumnCollection overrides the populate step
        self._populate_separate_keys(iter_)

    @property
    def _all_columns(self) -> List[_COL_co]:
        return [col for (_, col, _) in self._collection]

    def keys(self) -> List[_COLKEY]:
        """Return a sequence of string key names for all columns in this
        collection."""
        return [k for (k, _, _) in self._collection]

    def values(self) -> List[_COL_co]:
        """Return a sequence of :class:`_sql.ColumnClause` or
        :class:`_schema.Column` objects for all columns in this
        collection."""
        return [col for (_, col, _) in self._collection]

    def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
        """Return a sequence of (key, column) tuples for all columns in this
        collection each consisting of a string key name and a
        :class:`_sql.ColumnClause` or
        :class:`_schema.Column` object.
        """

        return [(k, col) for (k, col, _) in self._collection]

    def __bool__(self) -> bool:
        return bool(self._collection)

    def __len__(self) -> int:
        return len(self._collection)

    def __iter__(self) -> Iterator[_COL_co]:
        # turn to a list first to maintain over a course of changes
        return iter([col for _, col, _ in self._collection])

    @overload
    def __getitem__(self, key: Union[str, int]) -> _COL_co: ...

    @overload
    def __getitem__(
        self, key: Tuple[Union[str, int], ...]
    ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...

    @overload
    def __getitem__(
        self, key: slice
    ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...

    def __getitem__(
        self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
    ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
        """Index access: a str or int key returns a single column; a slice
        or tuple of keys returns a read-only sub-collection."""
        try:
            if isinstance(key, (tuple, slice)):
                if isinstance(key, slice):
                    cols = (
                        (sub_key, col)
                        for (sub_key, col, _) in self._collection[key]
                    )
                else:
                    cols = (self._index[sub_key] for sub_key in key)

                return ColumnCollection(cols).as_readonly()
            else:
                return self._index[key][1]
        except KeyError as err:
            # a missing integer key is a sequence-style miss; convert to
            # IndexError per the sequence protocol
            if isinstance(err.args[0], int):
                raise IndexError(err.args[0]) from err
            else:
                raise
    def __getattr__(self, key: str) -> _COL_co:
        try:
            return self._index[key][1]
        except KeyError as err:
            raise AttributeError(key) from err

    def __contains__(self, key: str) -> bool:
        if key not in self._index:
            # guard against accidental use with column objects; use
            # contains_column() for those
            if not isinstance(key, str):
                raise exc.ArgumentError(
                    "__contains__ requires a string argument"
                )
            return False
        else:
            return True

    def compare(self, other: ColumnCollection[Any, Any]) -> bool:
        """Compare this :class:`_expression.ColumnCollection` to another
        based on the names of the keys

        The check is an ordered identity comparison of the member columns;
        unequal lengths compare unequal via ``zip_longest`` padding.
        """

        for l, r in zip_longest(self, other):
            if l is not r:
                return False
        else:
            return True

    def __eq__(self, other: Any) -> bool:
        return self.compare(other)

    def get(
        self, key: str, default: Optional[_COL_co] = None
    ) -> Optional[_COL_co]:
        """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
        based on a string key name from this
        :class:`_expression.ColumnCollection`."""

        if key in self._index:
            return self._index[key][1]
        else:
            return default

    def __str__(self) -> str:
        return "%s(%s)" % (
            self.__class__.__name__,
            ", ".join(str(c) for c in self),
        )

    def __setitem__(self, key: str, value: Any) -> NoReturn:
        raise NotImplementedError()

    def __delitem__(self, key: str) -> NoReturn:
        raise NotImplementedError()

    def __setattr__(self, key: str, obj: Any) -> NoReturn:
        raise NotImplementedError()

    def clear(self) -> NoReturn:
        """Dictionary clear() is not implemented for
        :class:`_sql.ColumnCollection`."""
        raise NotImplementedError()

    def remove(self, column: Any) -> None:
        raise NotImplementedError()

    def update(self, iter_: Any) -> NoReturn:
        """Dictionary update() is not implemented for
        :class:`_sql.ColumnCollection`."""
        raise NotImplementedError()

    # https://github.com/python/mypy/issues/4266
    __hash__ = None  # type: ignore

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
    ) -> None:
        """populate from an iterator of (key, column)"""

        self._collection[:] = collection = [
            (k, c, _ColumnMetrics(self, c)) for k, c in iter_
        ]
        self._colset.update(c._deannotate() for _, c, _ in collection)
        self._index.update(
            {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
        )
        # reversed() so that for duplicate keys, the EARLIEST column wins
        # in the string-keyed index
        self._index.update({k: (k, col) for k, col, _ in reversed(collection)})

    def add(
        self, column: ColumnElement[Any], key: Optional[_COLKEY] = None
    ) -> None:
        """Add a column to this :class:`_sql.ColumnCollection`.

        .. note::

            This method is **not normally used by user-facing code**, as the
            :class:`_sql.ColumnCollection` is usually part of an existing
            object such as a :class:`_schema.Table`. To add a
            :class:`_schema.Column` to an existing :class:`_schema.Table`
            object, use the :meth:`_schema.Table.append_column` method.

        """
        colkey: _COLKEY

        if key is None:
            colkey = column.key  # type: ignore
        else:
            colkey = key

        l = len(self._collection)

        # don't really know how this part is supposed to work w/ the
        # covariant thing

        _column = cast(_COL_co, column)

        self._collection.append(
            (colkey, _column, _ColumnMetrics(self, _column))
        )
        self._colset.add(_column._deannotate())
        self._index[l] = (colkey, _column)
        # first column registered under a key wins; later duplicates are
        # reachable only positionally
        if colkey not in self._index:
            self._index[colkey] = (colkey, _column)

    def __getstate__(self) -> Dict[str, Any]:
        # metrics objects and the proxy index are not pickled; they are
        # rebuilt on __setstate__
        return {
            "_collection": [(k, c) for k, c, _ in self._collection],
            "_index": self._index,
        }

    def __setstate__(self, state: Dict[str, Any]) -> None:
        object.__setattr__(self, "_index", state["_index"])
        object.__setattr__(
            self, "_proxy_index", collections.defaultdict(util.OrderedSet)
        )
        object.__setattr__(
            self,
            "_collection",
            [
                (k, c, _ColumnMetrics(self, c))
                for (k, c) in state["_collection"]
            ],
        )
        object.__setattr__(
            self, "_colset", {col for k, col, _ in self._collection}
        )

    def contains_column(self, col: ColumnElement[Any]) -> bool:
        """Checks if a column object exists in this collection"""
        if col not in self._colset:
            if isinstance(col, str):
                raise exc.ArgumentError(
                    "contains_column cannot be used with string arguments. "
                    "Use ``col_name in table.c`` instead."
                )
            return False
        else:
            return True

    def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
        """Return a "read only" form of this
        :class:`_sql.ColumnCollection`."""

        return ReadOnlyColumnCollection(self)

    def _init_proxy_index(self) -> None:
        """populate the "proxy index", if empty.

        proxy index is added in 2.0 to provide more efficient operation
        for the corresponding_column() method.

        For reasons of both time to construct new .c collections as well as
        memory conservation for large numbers of large .c collections, the
        proxy_index is only filled if corresponding_column() is called. once
        filled it stays that way, and new _ColumnMetrics objects created after
        that point will populate it with new data. Note this case would be
        unusual, if not nonexistent, as it means a .c collection is being
        mutated after corresponding_column() were used, however it is tested in
        test/base/test_utils.py.

        """
        pi = self._proxy_index
        if pi:
            return

        for _, _, metrics in self._collection:
            eps = metrics.column._expanded_proxy_set

            for eps_col in eps:
                pi[eps_col].add(metrics)

    def corresponding_column(
        self, column: _COL, require_embedded: bool = False
    ) -> Optional[Union[_COL, _COL_co]]:
        """Given a :class:`_expression.ColumnElement`, return the exported
        :class:`_expression.ColumnElement` object from this
        :class:`_expression.ColumnCollection`
        which corresponds to that original :class:`_expression.ColumnElement`
        via a common
        ancestor column.

        :param column: the target :class:`_expression.ColumnElement`
         to be matched.

        :param require_embedded: only return corresponding columns for
         the given :class:`_expression.ColumnElement`, if the given
         :class:`_expression.ColumnElement`
         is actually present within a sub-element
         of this :class:`_expression.Selectable`.
         Normally the column will match if
         it merely shares a common ancestor with one of the exported
         columns of this :class:`_expression.Selectable`.

        .. seealso::

            :meth:`_expression.Selectable.corresponding_column`
            - invokes this method
            against the collection returned by
            :attr:`_expression.Selectable.exported_columns`.

        .. versionchanged:: 1.4 the implementation for ``corresponding_column``
           was moved onto the :class:`_expression.ColumnCollection` itself.

        """
        # TODO: cython candidate

        # don't dig around if the column is locally present
        if column in self._colset:
            return column

        selected_intersection, selected_metrics = None, None
        target_set = column.proxy_set

        # candidate members are gathered via the proxy index, built
        # lazily on first use
        pi = self._proxy_index
        if not pi:
            self._init_proxy_index()

        for current_metrics in (
            mm for ts in target_set if ts in pi for mm in pi[ts]
        ):
            if not require_embedded or current_metrics.embedded(target_set):
                if selected_metrics is None:
                    # no corresponding column yet, pick this one.
                    selected_metrics = current_metrics
                    continue

                current_intersection = target_set.intersection(
                    current_metrics.column._expanded_proxy_set
                )
                # computed lazily, only once a second candidate appears
                if selected_intersection is None:
                    selected_intersection = target_set.intersection(
                        selected_metrics.column._expanded_proxy_set
                    )

                if len(current_intersection) > len(selected_intersection):
                    # 'current' has a larger field of correspondence than
                    # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
                    # matches a1.c.x->table.c.x better than
                    # selectable.c.x->table.c.x does.

                    selected_metrics = current_metrics
                    selected_intersection = current_intersection
                elif current_intersection == selected_intersection:
                    # they have the same field of correspondence. see
                    # which proxy_set has fewer columns in it, which
                    # indicates a closer relationship with the root
                    # column. Also take into account the "weight"
                    # attribute which CompoundSelect() uses to give
                    # higher precedence to columns based on vertical
                    # position in the compound statement, and discard
                    # columns that have no reference to the target
                    # column (also occurs with CompoundSelect)

                    selected_col_distance = sum(
                        [
                            sc._annotations.get("weight", 1)
                            for sc in (
                                selected_metrics.column._uncached_proxy_list()
                            )
                            if sc.shares_lineage(column)
                        ],
                    )
                    current_col_distance = sum(
                        [
                            sc._annotations.get("weight", 1)
                            for sc in (
                                current_metrics.column._uncached_proxy_list()
                            )
                            if sc.shares_lineage(column)
                        ],
                    )
                    if current_col_distance < selected_col_distance:
                        selected_metrics = current_metrics
                        selected_intersection = current_intersection

        return selected_metrics.column if selected_metrics else None
-
-
# Element type for DedupeColumnCollection; must be a named column.
_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
-
-
class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
    """A :class:`_expression.ColumnCollection`
    that maintains deduplicating behavior.

    This is useful by schema level objects such as :class:`_schema.Table` and
    :class:`.PrimaryKeyConstraint`. The collection includes more
    sophisticated mutator methods as well to suit schema objects which
    require mutable column collections.

    .. versionadded:: 1.4

    """

    def add(
        self, column: ColumnElement[Any], key: Optional[str] = None
    ) -> None:
        """Add *column*, replacing any existing column under the same key.

        The column must be named, and any explicit *key* must match the
        column's own ``.key``.
        """
        named_column = cast(_NAMEDCOL, column)
        if key is not None and named_column.key != key:
            raise exc.ArgumentError(
                "DedupeColumnCollection requires columns be under "
                "the same key as their .key"
            )
        key = named_column.key

        if key is None:
            raise exc.ArgumentError(
                "Can't add unnamed column to column collection"
            )

        if key in self._index:
            existing = self._index[key][1]

            # same object under the same key is a no-op
            if existing is named_column:
                return

            self.replace(named_column)

            # pop out memoized proxy_set as this
            # operation may very well be occurring
            # in a _make_proxy operation
            util.memoized_property.reset(named_column, "proxy_set")
        else:
            self._append_new_column(key, named_column)

    def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None:
        # append to the end, registering both the positional and the
        # string-key index entries
        l = len(self._collection)
        self._collection.append(
            (key, named_column, _ColumnMetrics(self, named_column))
        )
        self._colset.add(named_column._deannotate())
        self._index[l] = (key, named_column)
        self._index[key] = (key, named_column)

    def _populate_separate_keys(
        self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
    ) -> None:
        """populate from an iterator of (key, column)"""
        cols = list(iter_)

        # columns colliding on name or key are deferred and routed
        # through replace() after the initial pass
        replace_col = []
        for k, col in cols:
            if col.key != k:
                raise exc.ArgumentError(
                    "DedupeColumnCollection requires columns be under "
                    "the same key as their .key"
                )
            if col.name in self._index and col.key != col.name:
                replace_col.append(col)
            elif col.key in self._index:
                replace_col.append(col)
            else:
                self._index[k] = (k, col)
                self._collection.append((k, col, _ColumnMetrics(self, col)))
        self._colset.update(c._deannotate() for (k, c, _) in self._collection)

        self._index.update(
            (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
        )
        for col in replace_col:
            self.replace(col)

    def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
        # keys are derived from each column's own .key
        self._populate_separate_keys((col.key, col) for col in iter_)

    def remove(self, column: _NAMEDCOL) -> None:
        """Remove *column* from the collection, re-indexing positions."""
        if column not in self._colset:
            raise ValueError(
                "Can't remove column %r; column is not in this collection"
                % column
            )
        del self._index[column.key]
        self._colset.remove(column)
        self._collection[:] = [
            (k, c, metrics)
            for (k, c, metrics) in self._collection
            if c is not column
        ]
        for metrics in self._proxy_index.get(column, ()):
            metrics.dispose(self)

        # rebuild the positional entries for the now-shorter collection
        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        # delete higher index
        del self._index[len(self._collection)]

    def replace(
        self,
        column: _NAMEDCOL,
        extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
    ) -> None:
        """add the given column to this collection, removing unaliased
        versions of this column as well as existing columns with the
        same key.

        e.g.::

            t = Table('sometable', metadata, Column('col1', Integer))
            t.columns.replace(Column('col1', Integer, key='columnone'))

        will remove the original 'col1' from the collection, and add
        the new column under the name 'columnone'.

        Used by schema.Column to override columns during table reflection.

        """

        if extra_remove:
            remove_col = set(extra_remove)
        else:
            remove_col = set()
        # remove up to two columns based on matches of name as well as key
        if column.name in self._index and column.key != column.name:
            other = self._index[column.name][1]
            if other.name == other.key:
                remove_col.add(other)

        if column.key in self._index:
            remove_col.add(self._index[column.key][1])

        if not remove_col:
            self._append_new_column(column.key, column)
            return
        # rebuild the collection, substituting the new column at the
        # position of the first column being removed
        new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
        replaced = False
        for k, col, metrics in self._collection:
            if col in remove_col:
                if not replaced:
                    replaced = True
                    new_cols.append(
                        (column.key, column, _ColumnMetrics(self, column))
                    )
            else:
                new_cols.append((k, col, metrics))

        if remove_col:
            self._colset.difference_update(remove_col)

            for rc in remove_col:
                for metrics in self._proxy_index.get(rc, ()):
                    metrics.dispose(self)

        if not replaced:
            new_cols.append((column.key, column, _ColumnMetrics(self, column)))

        self._colset.add(column._deannotate())
        self._collection[:] = new_cols

        # the index is rebuilt from scratch: positions first, then keys
        self._index.clear()

        self._index.update(
            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
        )
        self._index.update({k: (k, col) for (k, col, _) in self._collection})
-
-
class ReadOnlyColumnCollection(
    util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
):
    """Immutable facade over a :class:`.ColumnCollection`.

    Shares the parent collection's internal state so reads remain live,
    while all mutator methods raise via ``_readonly()``.
    """

    __slots__ = ("_parent",)

    def __init__(self, collection):
        object.__setattr__(self, "_parent", collection)
        # alias (not copy) the parent's internal structures so this view
        # continues to reflect changes made to the parent
        for attr in ("_colset", "_index", "_collection", "_proxy_index"):
            object.__setattr__(self, attr, getattr(collection, attr))

    def __getstate__(self):
        # only the parent is pickled; shared state is re-aliased on load
        return {"_parent": self._parent}

    def __setstate__(self, state):
        self.__init__(state["_parent"])  # type: ignore

    def add(self, column: Any, key: Any = ...) -> Any:
        self._readonly()

    def extend(self, elements: Any) -> NoReturn:
        self._readonly()

    def remove(self, item: Any) -> NoReturn:
        self._readonly()
-
-
class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
    """Ordered set of columns whose ``==`` produces a SQL conjunction
    rather than a boolean."""

    def contains_column(self, col):
        # plain membership; parallels ColumnCollection.contains_column()
        return col in self

    def extend(self, cols):
        for c in cols:
            self.add(c)

    def __eq__(self, other):
        # build an AND of equality expressions between every
        # lineage-sharing pair of columns
        comparisons = [
            c == local
            for c in other
            for local in self
            if c.shares_lineage(local)
        ]
        return elements.and_(*comparisons)

    def __hash__(self):
        return hash(tuple(self))
-
-
def _entity_namespace(
    entity: Union[_HasEntityNamespace, ExternallyTraversible]
) -> _EntityNamespace:
    """Return the nearest .entity_namespace for the given entity.

    If not immediately available, does an iterate to find a sub-element
    that has one, if any.

    """
    try:
        return cast(_HasEntityNamespace, entity).entity_namespace
    except AttributeError:
        # walk sub-elements for the first one carrying a namespace;
        # re-raise the original AttributeError when none is found
        for sub in visitors.iterate(cast(ExternallyTraversible, entity)):
            if _is_has_entity_namespace(sub):
                return sub.entity_namespace
        raise
-
-
def _entity_namespace_key(
    entity: Union[_HasEntityNamespace, ExternallyTraversible],
    key: str,
    default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG,
) -> SQLCoreOperations[Any]:
    """Return an entry from an entity_namespace.


    Raises :class:`_exc.InvalidRequestError` rather than attribute error
    on not found.

    """

    # both the namespace resolution and the attribute access stay inside
    # the try block, so a miss at either step becomes InvalidRequestError
    try:
        namespace = _entity_namespace(entity)
        if default is NO_ARG:
            return getattr(namespace, key)  # type: ignore
        return getattr(namespace, key, default)
    except AttributeError as err:
        raise exc.InvalidRequestError(
            'Entity namespace for "%s" has no property "%s"' % (entity, key)
        ) from err