summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/sql/traversals.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/sql/traversals.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/sql/traversals.py1022
1 files changed, 0 insertions, 1022 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/sql/traversals.py b/venv/lib/python3.11/site-packages/sqlalchemy/sql/traversals.py
deleted file mode 100644
index 3ca3caf..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/sql/traversals.py
+++ /dev/null
@@ -1,1022 +0,0 @@
-# sql/traversals.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: allow-untyped-defs, allow-untyped-calls
-
-from __future__ import annotations
-
-from collections import deque
-import collections.abc as collections_abc
-import itertools
-from itertools import zip_longest
-import operator
-import typing
-from typing import Any
-from typing import Callable
-from typing import Deque
-from typing import Dict
-from typing import Iterable
-from typing import Optional
-from typing import Set
-from typing import Tuple
-from typing import Type
-
-from . import operators
-from .cache_key import HasCacheKey
-from .visitors import _TraverseInternalsType
-from .visitors import anon_map
-from .visitors import ExternallyTraversible
-from .visitors import HasTraversalDispatch
-from .visitors import HasTraverseInternals
-from .. import util
-from ..util import langhelpers
-from ..util.typing import Self
-
-
-SKIP_TRAVERSE = util.symbol("skip_traverse")
-COMPARE_FAILED = False
-COMPARE_SUCCEEDED = True
-
-
-def compare(obj1: Any, obj2: Any, **kw: Any) -> bool:
- strategy: TraversalComparatorStrategy
- if kw.get("use_proxies", False):
- strategy = ColIdentityComparatorStrategy()
- else:
- strategy = TraversalComparatorStrategy()
-
- return strategy.compare(obj1, obj2, **kw)
-
-
-def _preconfigure_traversals(target_hierarchy: Type[Any]) -> None:
- for cls in util.walk_subclasses(target_hierarchy):
- if hasattr(cls, "_generate_cache_attrs") and hasattr(
- cls, "_traverse_internals"
- ):
- cls._generate_cache_attrs()
- _copy_internals.generate_dispatch(
- cls,
- cls._traverse_internals,
- "_generated_copy_internals_traversal",
- )
- _get_children.generate_dispatch(
- cls,
- cls._traverse_internals,
- "_generated_get_children_traversal",
- )
-
-
-class HasShallowCopy(HasTraverseInternals):
- """attribute-wide operations that are useful for classes that use
- __slots__ and therefore can't operate on their attributes in a dictionary.
-
-
- """
-
- __slots__ = ()
-
- if typing.TYPE_CHECKING:
-
- def _generated_shallow_copy_traversal(self, other: Self) -> None: ...
-
- def _generated_shallow_from_dict_traversal(
- self, d: Dict[str, Any]
- ) -> None: ...
-
- def _generated_shallow_to_dict_traversal(self) -> Dict[str, Any]: ...
-
- @classmethod
- def _generate_shallow_copy(
- cls,
- internal_dispatch: _TraverseInternalsType,
- method_name: str,
- ) -> Callable[[Self, Self], None]:
- code = "\n".join(
- f" other.{attrname} = self.{attrname}"
- for attrname, _ in internal_dispatch
- )
- meth_text = f"def {method_name}(self, other):\n{code}\n"
- return langhelpers._exec_code_in_env(meth_text, {}, method_name)
-
- @classmethod
- def _generate_shallow_to_dict(
- cls,
- internal_dispatch: _TraverseInternalsType,
- method_name: str,
- ) -> Callable[[Self], Dict[str, Any]]:
- code = ",\n".join(
- f" '{attrname}': self.{attrname}"
- for attrname, _ in internal_dispatch
- )
- meth_text = f"def {method_name}(self):\n return {{{code}}}\n"
- return langhelpers._exec_code_in_env(meth_text, {}, method_name)
-
- @classmethod
- def _generate_shallow_from_dict(
- cls,
- internal_dispatch: _TraverseInternalsType,
- method_name: str,
- ) -> Callable[[Self, Dict[str, Any]], None]:
- code = "\n".join(
- f" self.{attrname} = d['{attrname}']"
- for attrname, _ in internal_dispatch
- )
- meth_text = f"def {method_name}(self, d):\n{code}\n"
- return langhelpers._exec_code_in_env(meth_text, {}, method_name)
-
- def _shallow_from_dict(self, d: Dict[str, Any]) -> None:
- cls = self.__class__
-
- shallow_from_dict: Callable[[HasShallowCopy, Dict[str, Any]], None]
- try:
- shallow_from_dict = cls.__dict__[
- "_generated_shallow_from_dict_traversal"
- ]
- except KeyError:
- shallow_from_dict = self._generate_shallow_from_dict(
- cls._traverse_internals,
- "_generated_shallow_from_dict_traversal",
- )
-
- cls._generated_shallow_from_dict_traversal = shallow_from_dict # type: ignore # noqa: E501
-
- shallow_from_dict(self, d)
-
- def _shallow_to_dict(self) -> Dict[str, Any]:
- cls = self.__class__
-
- shallow_to_dict: Callable[[HasShallowCopy], Dict[str, Any]]
-
- try:
- shallow_to_dict = cls.__dict__[
- "_generated_shallow_to_dict_traversal"
- ]
- except KeyError:
- shallow_to_dict = self._generate_shallow_to_dict(
- cls._traverse_internals, "_generated_shallow_to_dict_traversal"
- )
-
- cls._generated_shallow_to_dict_traversal = shallow_to_dict # type: ignore # noqa: E501
- return shallow_to_dict(self)
-
- def _shallow_copy_to(self, other: Self) -> None:
- cls = self.__class__
-
- shallow_copy: Callable[[Self, Self], None]
- try:
- shallow_copy = cls.__dict__["_generated_shallow_copy_traversal"]
- except KeyError:
- shallow_copy = self._generate_shallow_copy(
- cls._traverse_internals, "_generated_shallow_copy_traversal"
- )
-
- cls._generated_shallow_copy_traversal = shallow_copy # type: ignore # noqa: E501
- shallow_copy(self, other)
-
- def _clone(self, **kw: Any) -> Self:
- """Create a shallow copy"""
- c = self.__class__.__new__(self.__class__)
- self._shallow_copy_to(c)
- return c
-
-
-class GenerativeOnTraversal(HasShallowCopy):
- """Supplies Generative behavior but making use of traversals to shallow
- copy.
-
- .. seealso::
-
- :class:`sqlalchemy.sql.base.Generative`
-
-
- """
-
- __slots__ = ()
-
- def _generate(self) -> Self:
- cls = self.__class__
- s = cls.__new__(cls)
- self._shallow_copy_to(s)
- return s
-
-
-def _clone(element, **kw):
- return element._clone()
-
-
-class HasCopyInternals(HasTraverseInternals):
- __slots__ = ()
-
- def _clone(self, **kw):
- raise NotImplementedError()
-
- def _copy_internals(
- self, *, omit_attrs: Iterable[str] = (), **kw: Any
- ) -> None:
- """Reassign internal elements to be clones of themselves.
-
- Called during a copy-and-traverse operation on newly
- shallow-copied elements to create a deep copy.
-
- The given clone function should be used, which may be applying
- additional transformations to the element (i.e. replacement
- traversal, cloned traversal, annotations).
-
- """
-
- try:
- traverse_internals = self._traverse_internals
- except AttributeError:
- # user-defined classes may not have a _traverse_internals
- return
-
- for attrname, obj, meth in _copy_internals.run_generated_dispatch(
- self, traverse_internals, "_generated_copy_internals_traversal"
- ):
- if attrname in omit_attrs:
- continue
-
- if obj is not None:
- result = meth(attrname, self, obj, **kw)
- if result is not None:
- setattr(self, attrname, result)
-
-
-class _CopyInternalsTraversal(HasTraversalDispatch):
- """Generate a _copy_internals internal traversal dispatch for classes
- with a _traverse_internals collection."""
-
- def visit_clauseelement(
- self, attrname, parent, element, clone=_clone, **kw
- ):
- return clone(element, **kw)
-
- def visit_clauseelement_list(
- self, attrname, parent, element, clone=_clone, **kw
- ):
- return [clone(clause, **kw) for clause in element]
-
- def visit_clauseelement_tuple(
- self, attrname, parent, element, clone=_clone, **kw
- ):
- return tuple([clone(clause, **kw) for clause in element])
-
- def visit_executable_options(
- self, attrname, parent, element, clone=_clone, **kw
- ):
- return tuple([clone(clause, **kw) for clause in element])
-
- def visit_clauseelement_unordered_set(
- self, attrname, parent, element, clone=_clone, **kw
- ):
- return {clone(clause, **kw) for clause in element}
-
- def visit_clauseelement_tuples(
- self, attrname, parent, element, clone=_clone, **kw
- ):
- return [
- tuple(clone(tup_elem, **kw) for tup_elem in elem)
- for elem in element
- ]
-
- def visit_string_clauseelement_dict(
- self, attrname, parent, element, clone=_clone, **kw
- ):
- return {key: clone(value, **kw) for key, value in element.items()}
-
- def visit_setup_join_tuple(
- self, attrname, parent, element, clone=_clone, **kw
- ):
- return tuple(
- (
- clone(target, **kw) if target is not None else None,
- clone(onclause, **kw) if onclause is not None else None,
- clone(from_, **kw) if from_ is not None else None,
- flags,
- )
- for (target, onclause, from_, flags) in element
- )
-
- def visit_memoized_select_entities(self, attrname, parent, element, **kw):
- return self.visit_clauseelement_tuple(attrname, parent, element, **kw)
-
- def visit_dml_ordered_values(
- self, attrname, parent, element, clone=_clone, **kw
- ):
- # sequence of 2-tuples
- return [
- (
- (
- clone(key, **kw)
- if hasattr(key, "__clause_element__")
- else key
- ),
- clone(value, **kw),
- )
- for key, value in element
- ]
-
- def visit_dml_values(self, attrname, parent, element, clone=_clone, **kw):
- return {
- (
- clone(key, **kw) if hasattr(key, "__clause_element__") else key
- ): clone(value, **kw)
- for key, value in element.items()
- }
-
- def visit_dml_multi_values(
- self, attrname, parent, element, clone=_clone, **kw
- ):
- # sequence of sequences, each sequence contains a list/dict/tuple
-
- def copy(elem):
- if isinstance(elem, (list, tuple)):
- return [
- (
- clone(value, **kw)
- if hasattr(value, "__clause_element__")
- else value
- )
- for value in elem
- ]
- elif isinstance(elem, dict):
- return {
- (
- clone(key, **kw)
- if hasattr(key, "__clause_element__")
- else key
- ): (
- clone(value, **kw)
- if hasattr(value, "__clause_element__")
- else value
- )
- for key, value in elem.items()
- }
- else:
- # TODO: use abc classes
- assert False
-
- return [
- [copy(sub_element) for sub_element in sequence]
- for sequence in element
- ]
-
- def visit_propagate_attrs(
- self, attrname, parent, element, clone=_clone, **kw
- ):
- return element
-
-
-_copy_internals = _CopyInternalsTraversal()
-
-
-def _flatten_clauseelement(element):
- while hasattr(element, "__clause_element__") and not getattr(
- element, "is_clause_element", False
- ):
- element = element.__clause_element__()
-
- return element
-
-
-class _GetChildrenTraversal(HasTraversalDispatch):
- """Generate a _children_traversal internal traversal dispatch for classes
- with a _traverse_internals collection."""
-
- def visit_has_cache_key(self, element, **kw):
- # the GetChildren traversal refers explicitly to ClauseElement
- # structures. Within these, a plain HasCacheKey is not a
- # ClauseElement, so don't include these.
- return ()
-
- def visit_clauseelement(self, element, **kw):
- return (element,)
-
- def visit_clauseelement_list(self, element, **kw):
- return element
-
- def visit_clauseelement_tuple(self, element, **kw):
- return element
-
- def visit_clauseelement_tuples(self, element, **kw):
- return itertools.chain.from_iterable(element)
-
- def visit_fromclause_canonical_column_collection(self, element, **kw):
- return ()
-
- def visit_string_clauseelement_dict(self, element, **kw):
- return element.values()
-
- def visit_fromclause_ordered_set(self, element, **kw):
- return element
-
- def visit_clauseelement_unordered_set(self, element, **kw):
- return element
-
- def visit_setup_join_tuple(self, element, **kw):
- for target, onclause, from_, flags in element:
- if from_ is not None:
- yield from_
-
- if not isinstance(target, str):
- yield _flatten_clauseelement(target)
-
- if onclause is not None and not isinstance(onclause, str):
- yield _flatten_clauseelement(onclause)
-
- def visit_memoized_select_entities(self, element, **kw):
- return self.visit_clauseelement_tuple(element, **kw)
-
- def visit_dml_ordered_values(self, element, **kw):
- for k, v in element:
- if hasattr(k, "__clause_element__"):
- yield k
- yield v
-
- def visit_dml_values(self, element, **kw):
- expr_values = {k for k in element if hasattr(k, "__clause_element__")}
- str_values = expr_values.symmetric_difference(element)
-
- for k in sorted(str_values):
- yield element[k]
- for k in expr_values:
- yield k
- yield element[k]
-
- def visit_dml_multi_values(self, element, **kw):
- return ()
-
- def visit_propagate_attrs(self, element, **kw):
- return ()
-
-
-_get_children = _GetChildrenTraversal()
-
-
-@util.preload_module("sqlalchemy.sql.elements")
-def _resolve_name_for_compare(element, name, anon_map, **kw):
- if isinstance(name, util.preloaded.sql_elements._anonymous_label):
- name = name.apply_map(anon_map)
-
- return name
-
-
-class TraversalComparatorStrategy(HasTraversalDispatch, util.MemoizedSlots):
- __slots__ = "stack", "cache", "anon_map"
-
- def __init__(self):
- self.stack: Deque[
- Tuple[
- Optional[ExternallyTraversible],
- Optional[ExternallyTraversible],
- ]
- ] = deque()
- self.cache = set()
-
- def _memoized_attr_anon_map(self):
- return (anon_map(), anon_map())
-
- def compare(
- self,
- obj1: ExternallyTraversible,
- obj2: ExternallyTraversible,
- **kw: Any,
- ) -> bool:
- stack = self.stack
- cache = self.cache
-
- compare_annotations = kw.get("compare_annotations", False)
-
- stack.append((obj1, obj2))
-
- while stack:
- left, right = stack.popleft()
-
- if left is right:
- continue
- elif left is None or right is None:
- # we know they are different so no match
- return False
- elif (left, right) in cache:
- continue
- cache.add((left, right))
-
- visit_name = left.__visit_name__
- if visit_name != right.__visit_name__:
- return False
-
- meth = getattr(self, "compare_%s" % visit_name, None)
-
- if meth:
- attributes_compared = meth(left, right, **kw)
- if attributes_compared is COMPARE_FAILED:
- return False
- elif attributes_compared is SKIP_TRAVERSE:
- continue
-
- # attributes_compared is returned as a list of attribute
- # names that were "handled" by the comparison method above.
- # remaining attribute names in the _traverse_internals
- # will be compared.
- else:
- attributes_compared = ()
-
- for (
- (left_attrname, left_visit_sym),
- (right_attrname, right_visit_sym),
- ) in zip_longest(
- left._traverse_internals,
- right._traverse_internals,
- fillvalue=(None, None),
- ):
- if not compare_annotations and (
- (left_attrname == "_annotations")
- or (right_attrname == "_annotations")
- ):
- continue
-
- if (
- left_attrname != right_attrname
- or left_visit_sym is not right_visit_sym
- ):
- return False
- elif left_attrname in attributes_compared:
- continue
-
- assert left_visit_sym is not None
- assert left_attrname is not None
- assert right_attrname is not None
-
- dispatch = self.dispatch(left_visit_sym)
- assert dispatch is not None, (
- f"{self.__class__} has no dispatch for "
- f"'{self._dispatch_lookup[left_visit_sym]}'"
- )
- left_child = operator.attrgetter(left_attrname)(left)
- right_child = operator.attrgetter(right_attrname)(right)
- if left_child is None:
- if right_child is not None:
- return False
- else:
- continue
-
- comparison = dispatch(
- left_attrname, left, left_child, right, right_child, **kw
- )
- if comparison is COMPARE_FAILED:
- return False
-
- return True
-
- def compare_inner(self, obj1, obj2, **kw):
- comparator = self.__class__()
- return comparator.compare(obj1, obj2, **kw)
-
- def visit_has_cache_key(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- if left._gen_cache_key(self.anon_map[0], []) != right._gen_cache_key(
- self.anon_map[1], []
- ):
- return COMPARE_FAILED
-
- def visit_propagate_attrs(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- return self.compare_inner(
- left.get("plugin_subject", None), right.get("plugin_subject", None)
- )
-
- def visit_has_cache_key_list(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- for l, r in zip_longest(left, right, fillvalue=None):
- if l is None:
- if r is not None:
- return COMPARE_FAILED
- else:
- continue
- elif r is None:
- return COMPARE_FAILED
-
- if l._gen_cache_key(self.anon_map[0], []) != r._gen_cache_key(
- self.anon_map[1], []
- ):
- return COMPARE_FAILED
-
- def visit_executable_options(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- for l, r in zip_longest(left, right, fillvalue=None):
- if l is None:
- if r is not None:
- return COMPARE_FAILED
- else:
- continue
- elif r is None:
- return COMPARE_FAILED
-
- if (
- l._gen_cache_key(self.anon_map[0], [])
- if l._is_has_cache_key
- else l
- ) != (
- r._gen_cache_key(self.anon_map[1], [])
- if r._is_has_cache_key
- else r
- ):
- return COMPARE_FAILED
-
- def visit_clauseelement(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- self.stack.append((left, right))
-
- def visit_fromclause_canonical_column_collection(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- for lcol, rcol in zip_longest(left, right, fillvalue=None):
- self.stack.append((lcol, rcol))
-
- def visit_fromclause_derived_column_collection(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- pass
-
- def visit_string_clauseelement_dict(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- for lstr, rstr in zip_longest(
- sorted(left), sorted(right), fillvalue=None
- ):
- if lstr != rstr:
- return COMPARE_FAILED
- self.stack.append((left[lstr], right[rstr]))
-
- def visit_clauseelement_tuples(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- for ltup, rtup in zip_longest(left, right, fillvalue=None):
- if ltup is None or rtup is None:
- return COMPARE_FAILED
-
- for l, r in zip_longest(ltup, rtup, fillvalue=None):
- self.stack.append((l, r))
-
- def visit_clauseelement_list(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- for l, r in zip_longest(left, right, fillvalue=None):
- self.stack.append((l, r))
-
- def visit_clauseelement_tuple(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- for l, r in zip_longest(left, right, fillvalue=None):
- self.stack.append((l, r))
-
- def _compare_unordered_sequences(self, seq1, seq2, **kw):
- if seq1 is None:
- return seq2 is None
-
- completed: Set[object] = set()
- for clause in seq1:
- for other_clause in set(seq2).difference(completed):
- if self.compare_inner(clause, other_clause, **kw):
- completed.add(other_clause)
- break
- return len(completed) == len(seq1) == len(seq2)
-
- def visit_clauseelement_unordered_set(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- return self._compare_unordered_sequences(left, right, **kw)
-
- def visit_fromclause_ordered_set(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- for l, r in zip_longest(left, right, fillvalue=None):
- self.stack.append((l, r))
-
- def visit_string(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- return left == right
-
- def visit_string_list(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- return left == right
-
- def visit_string_multi_dict(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- for lk, rk in zip_longest(
- sorted(left.keys()), sorted(right.keys()), fillvalue=(None, None)
- ):
- if lk != rk:
- return COMPARE_FAILED
-
- lv, rv = left[lk], right[rk]
-
- lhc = isinstance(left, HasCacheKey)
- rhc = isinstance(right, HasCacheKey)
- if lhc and rhc:
- if lv._gen_cache_key(
- self.anon_map[0], []
- ) != rv._gen_cache_key(self.anon_map[1], []):
- return COMPARE_FAILED
- elif lhc != rhc:
- return COMPARE_FAILED
- elif lv != rv:
- return COMPARE_FAILED
-
- def visit_multi(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- lhc = isinstance(left, HasCacheKey)
- rhc = isinstance(right, HasCacheKey)
- if lhc and rhc:
- if left._gen_cache_key(
- self.anon_map[0], []
- ) != right._gen_cache_key(self.anon_map[1], []):
- return COMPARE_FAILED
- elif lhc != rhc:
- return COMPARE_FAILED
- else:
- return left == right
-
- def visit_anon_name(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- return _resolve_name_for_compare(
- left_parent, left, self.anon_map[0], **kw
- ) == _resolve_name_for_compare(
- right_parent, right, self.anon_map[1], **kw
- )
-
- def visit_boolean(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- return left == right
-
- def visit_operator(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- return left == right
-
- def visit_type(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- return left._compare_type_affinity(right)
-
- def visit_plain_dict(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- return left == right
-
- def visit_dialect_options(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- return left == right
-
- def visit_annotations_key(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- if left and right:
- return (
- left_parent._annotations_cache_key
- == right_parent._annotations_cache_key
- )
- else:
- return left == right
-
- def visit_with_context_options(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- return tuple((fn.__code__, c_key) for fn, c_key in left) == tuple(
- (fn.__code__, c_key) for fn, c_key in right
- )
-
- def visit_plain_obj(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- return left == right
-
- def visit_named_ddl_element(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- if left is None:
- if right is not None:
- return COMPARE_FAILED
-
- return left.name == right.name
-
- def visit_prefix_sequence(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- for (l_clause, l_str), (r_clause, r_str) in zip_longest(
- left, right, fillvalue=(None, None)
- ):
- if l_str != r_str:
- return COMPARE_FAILED
- else:
- self.stack.append((l_clause, r_clause))
-
- def visit_setup_join_tuple(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- # TODO: look at attrname for "legacy_join" and use different structure
- for (
- (l_target, l_onclause, l_from, l_flags),
- (r_target, r_onclause, r_from, r_flags),
- ) in zip_longest(left, right, fillvalue=(None, None, None, None)):
- if l_flags != r_flags:
- return COMPARE_FAILED
- self.stack.append((l_target, r_target))
- self.stack.append((l_onclause, r_onclause))
- self.stack.append((l_from, r_from))
-
- def visit_memoized_select_entities(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- return self.visit_clauseelement_tuple(
- attrname, left_parent, left, right_parent, right, **kw
- )
-
- def visit_table_hint_list(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- left_keys = sorted(left, key=lambda elem: (elem[0].fullname, elem[1]))
- right_keys = sorted(
- right, key=lambda elem: (elem[0].fullname, elem[1])
- )
- for (ltable, ldialect), (rtable, rdialect) in zip_longest(
- left_keys, right_keys, fillvalue=(None, None)
- ):
- if ldialect != rdialect:
- return COMPARE_FAILED
- elif left[(ltable, ldialect)] != right[(rtable, rdialect)]:
- return COMPARE_FAILED
- else:
- self.stack.append((ltable, rtable))
-
- def visit_statement_hint_list(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- return left == right
-
- def visit_unknown_structure(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- raise NotImplementedError()
-
- def visit_dml_ordered_values(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- # sequence of tuple pairs
-
- for (lk, lv), (rk, rv) in zip_longest(
- left, right, fillvalue=(None, None)
- ):
- if not self._compare_dml_values_or_ce(lk, rk, **kw):
- return COMPARE_FAILED
-
- def _compare_dml_values_or_ce(self, lv, rv, **kw):
- lvce = hasattr(lv, "__clause_element__")
- rvce = hasattr(rv, "__clause_element__")
- if lvce != rvce:
- return False
- elif lvce and not self.compare_inner(lv, rv, **kw):
- return False
- elif not lvce and lv != rv:
- return False
- elif not self.compare_inner(lv, rv, **kw):
- return False
-
- return True
-
- def visit_dml_values(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- if left is None or right is None or len(left) != len(right):
- return COMPARE_FAILED
-
- if isinstance(left, collections_abc.Sequence):
- for lv, rv in zip(left, right):
- if not self._compare_dml_values_or_ce(lv, rv, **kw):
- return COMPARE_FAILED
- elif isinstance(right, collections_abc.Sequence):
- return COMPARE_FAILED
- else:
- # dictionaries guaranteed to support insert ordering in
- # py37 so that we can compare the keys in order. without
- # this, we can't compare SQL expression keys because we don't
- # know which key is which
- for (lk, lv), (rk, rv) in zip(left.items(), right.items()):
- if not self._compare_dml_values_or_ce(lk, rk, **kw):
- return COMPARE_FAILED
- if not self._compare_dml_values_or_ce(lv, rv, **kw):
- return COMPARE_FAILED
-
- def visit_dml_multi_values(
- self, attrname, left_parent, left, right_parent, right, **kw
- ):
- for lseq, rseq in zip_longest(left, right, fillvalue=None):
- if lseq is None or rseq is None:
- return COMPARE_FAILED
-
- for ld, rd in zip_longest(lseq, rseq, fillvalue=None):
- if (
- self.visit_dml_values(
- attrname, left_parent, ld, right_parent, rd, **kw
- )
- is COMPARE_FAILED
- ):
- return COMPARE_FAILED
-
- def compare_expression_clauselist(self, left, right, **kw):
- if left.operator is right.operator:
- if operators.is_associative(left.operator):
- if self._compare_unordered_sequences(
- left.clauses, right.clauses, **kw
- ):
- return ["operator", "clauses"]
- else:
- return COMPARE_FAILED
- else:
- return ["operator"]
- else:
- return COMPARE_FAILED
-
- def compare_clauselist(self, left, right, **kw):
- return self.compare_expression_clauselist(left, right, **kw)
-
- def compare_binary(self, left, right, **kw):
- if left.operator == right.operator:
- if operators.is_commutative(left.operator):
- if (
- self.compare_inner(left.left, right.left, **kw)
- and self.compare_inner(left.right, right.right, **kw)
- ) or (
- self.compare_inner(left.left, right.right, **kw)
- and self.compare_inner(left.right, right.left, **kw)
- ):
- return ["operator", "negate", "left", "right"]
- else:
- return COMPARE_FAILED
- else:
- return ["operator", "negate"]
- else:
- return COMPARE_FAILED
-
- def compare_bindparam(self, left, right, **kw):
- compare_keys = kw.pop("compare_keys", True)
- compare_values = kw.pop("compare_values", True)
-
- if compare_values:
- omit = []
- else:
- # this means, "skip these, we already compared"
- omit = ["callable", "value"]
-
- if not compare_keys:
- omit.append("key")
-
- return omit
-
-
-class ColIdentityComparatorStrategy(TraversalComparatorStrategy):
- def compare_column_element(
- self, left, right, use_proxies=True, equivalents=(), **kw
- ):
- """Compare ColumnElements using proxies and equivalent collections.
-
- This is a comparison strategy specific to the ORM.
- """
-
- to_compare = (right,)
- if equivalents and right in equivalents:
- to_compare = equivalents[right].union(to_compare)
-
- for oth in to_compare:
- if use_proxies and left.shares_lineage(oth):
- return SKIP_TRAVERSE
- elif hash(left) == hash(right):
- return SKIP_TRAVERSE
- else:
- return COMPARE_FAILED
-
- def compare_column(self, left, right, **kw):
- return self.compare_column_element(left, right, **kw)
-
- def compare_label(self, left, right, **kw):
- return self.compare_column_element(left, right, **kw)
-
- def compare_table(self, left, right, **kw):
- # tables compare on identity, since it's not really feasible to
- # compare them column by column with the above rules
- return SKIP_TRAVERSE if left is right else COMPARE_FAILED