summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/util/_collections.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/util/_collections.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/util/_collections.py715
1 files changed, 0 insertions, 715 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/util/_collections.py b/venv/lib/python3.11/site-packages/sqlalchemy/util/_collections.py
deleted file mode 100644
index e3a8ad8..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/util/_collections.py
+++ /dev/null
@@ -1,715 +0,0 @@
-# util/_collections.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: allow-untyped-defs, allow-untyped-calls
-
-"""Collection classes and helpers."""
-from __future__ import annotations
-
-import operator
-import threading
-import types
-import typing
-from typing import Any
-from typing import Callable
-from typing import cast
-from typing import Dict
-from typing import FrozenSet
-from typing import Generic
-from typing import Iterable
-from typing import Iterator
-from typing import List
-from typing import Mapping
-from typing import NoReturn
-from typing import Optional
-from typing import overload
-from typing import Sequence
-from typing import Set
-from typing import Tuple
-from typing import TypeVar
-from typing import Union
-from typing import ValuesView
-import weakref
-
-from ._has_cy import HAS_CYEXTENSION
-from .typing import is_non_string_iterable
-from .typing import Literal
-from .typing import Protocol
-
-if typing.TYPE_CHECKING or not HAS_CYEXTENSION:
- from ._py_collections import immutabledict as immutabledict
- from ._py_collections import IdentitySet as IdentitySet
- from ._py_collections import ReadOnlyContainer as ReadOnlyContainer
- from ._py_collections import ImmutableDictBase as ImmutableDictBase
- from ._py_collections import OrderedSet as OrderedSet
- from ._py_collections import unique_list as unique_list
-else:
- from sqlalchemy.cyextension.immutabledict import (
- ReadOnlyContainer as ReadOnlyContainer,
- )
- from sqlalchemy.cyextension.immutabledict import (
- ImmutableDictBase as ImmutableDictBase,
- )
- from sqlalchemy.cyextension.immutabledict import (
- immutabledict as immutabledict,
- )
- from sqlalchemy.cyextension.collections import IdentitySet as IdentitySet
- from sqlalchemy.cyextension.collections import OrderedSet as OrderedSet
- from sqlalchemy.cyextension.collections import ( # noqa
- unique_list as unique_list,
- )
-
-
-_T = TypeVar("_T", bound=Any)
-_KT = TypeVar("_KT", bound=Any)
-_VT = TypeVar("_VT", bound=Any)
-_T_co = TypeVar("_T_co", covariant=True)
-
-EMPTY_SET: FrozenSet[Any] = frozenset()
-NONE_SET: FrozenSet[Any] = frozenset([None])
-
-
-def merge_lists_w_ordering(a: List[Any], b: List[Any]) -> List[Any]:
- """merge two lists, maintaining ordering as much as possible.
-
- this is to reconcile vars(cls) with cls.__annotations__.
-
- Example::
-
- >>> a = ['__tablename__', 'id', 'x', 'created_at']
- >>> b = ['id', 'name', 'data', 'y', 'created_at']
- >>> merge_lists_w_ordering(a, b)
- ['__tablename__', 'id', 'name', 'data', 'y', 'x', 'created_at']
-
- This is not necessarily the ordering that things had on the class,
- in this case the class is::
-
- class User(Base):
- __tablename__ = "users"
-
- id: Mapped[int] = mapped_column(primary_key=True)
- name: Mapped[str]
- data: Mapped[Optional[str]]
- x = Column(Integer)
- y: Mapped[int]
- created_at: Mapped[datetime.datetime] = mapped_column()
-
- But things are *mostly* ordered.
-
- The algorithm could also be done by creating a partial ordering for
- all items in both lists and then using topological_sort(), but that
- is too much overhead.
-
- Background on how I came up with this is at:
- https://gist.github.com/zzzeek/89de958cf0803d148e74861bd682ebae
-
- """
- overlap = set(a).intersection(b)
-
- result = []
-
- current, other = iter(a), iter(b)
-
- while True:
- for element in current:
- if element in overlap:
- overlap.discard(element)
- other, current = current, other
- break
-
- result.append(element)
- else:
- result.extend(other)
- break
-
- return result
-
-
-def coerce_to_immutabledict(d: Mapping[_KT, _VT]) -> immutabledict[_KT, _VT]:
- if not d:
- return EMPTY_DICT
- elif isinstance(d, immutabledict):
- return d
- else:
- return immutabledict(d)
-
-
-EMPTY_DICT: immutabledict[Any, Any] = immutabledict()
-
-
-class FacadeDict(ImmutableDictBase[_KT, _VT]):
- """A dictionary that is not publicly mutable."""
-
- def __new__(cls, *args: Any) -> FacadeDict[Any, Any]:
- new = ImmutableDictBase.__new__(cls)
- return new
-
- def copy(self) -> NoReturn:
- raise NotImplementedError(
- "an immutabledict shouldn't need to be copied. use dict(d) "
- "if you need a mutable dictionary."
- )
-
- def __reduce__(self) -> Any:
- return FacadeDict, (dict(self),)
-
- def _insert_item(self, key: _KT, value: _VT) -> None:
- """insert an item into the dictionary directly."""
- dict.__setitem__(self, key, value)
-
- def __repr__(self) -> str:
- return "FacadeDict(%s)" % dict.__repr__(self)
-
-
-_DT = TypeVar("_DT", bound=Any)
-
-_F = TypeVar("_F", bound=Any)
-
-
-class Properties(Generic[_T]):
- """Provide a __getattr__/__setattr__ interface over a dict."""
-
- __slots__ = ("_data",)
-
- _data: Dict[str, _T]
-
- def __init__(self, data: Dict[str, _T]):
- object.__setattr__(self, "_data", data)
-
- def __len__(self) -> int:
- return len(self._data)
-
- def __iter__(self) -> Iterator[_T]:
- return iter(list(self._data.values()))
-
- def __dir__(self) -> List[str]:
- return dir(super()) + [str(k) for k in self._data.keys()]
-
- def __add__(self, other: Properties[_F]) -> List[Union[_T, _F]]:
- return list(self) + list(other)
-
- def __setitem__(self, key: str, obj: _T) -> None:
- self._data[key] = obj
-
- def __getitem__(self, key: str) -> _T:
- return self._data[key]
-
- def __delitem__(self, key: str) -> None:
- del self._data[key]
-
- def __setattr__(self, key: str, obj: _T) -> None:
- self._data[key] = obj
-
- def __getstate__(self) -> Dict[str, Any]:
- return {"_data": self._data}
-
- def __setstate__(self, state: Dict[str, Any]) -> None:
- object.__setattr__(self, "_data", state["_data"])
-
- def __getattr__(self, key: str) -> _T:
- try:
- return self._data[key]
- except KeyError:
- raise AttributeError(key)
-
- def __contains__(self, key: str) -> bool:
- return key in self._data
-
- def as_readonly(self) -> ReadOnlyProperties[_T]:
- """Return an immutable proxy for this :class:`.Properties`."""
-
- return ReadOnlyProperties(self._data)
-
- def update(self, value: Dict[str, _T]) -> None:
- self._data.update(value)
-
- @overload
- def get(self, key: str) -> Optional[_T]: ...
-
- @overload
- def get(self, key: str, default: Union[_DT, _T]) -> Union[_DT, _T]: ...
-
- def get(
- self, key: str, default: Optional[Union[_DT, _T]] = None
- ) -> Optional[Union[_T, _DT]]:
- if key in self:
- return self[key]
- else:
- return default
-
- def keys(self) -> List[str]:
- return list(self._data)
-
- def values(self) -> List[_T]:
- return list(self._data.values())
-
- def items(self) -> List[Tuple[str, _T]]:
- return list(self._data.items())
-
- def has_key(self, key: str) -> bool:
- return key in self._data
-
- def clear(self) -> None:
- self._data.clear()
-
-
-class OrderedProperties(Properties[_T]):
- """Provide a __getattr__/__setattr__ interface with an OrderedDict
- as backing store."""
-
- __slots__ = ()
-
- def __init__(self):
- Properties.__init__(self, OrderedDict())
-
-
-class ReadOnlyProperties(ReadOnlyContainer, Properties[_T]):
- """Provide immutable dict/object attribute to an underlying dictionary."""
-
- __slots__ = ()
-
-
-def _ordered_dictionary_sort(d, key=None):
- """Sort an OrderedDict in-place."""
-
- items = [(k, d[k]) for k in sorted(d, key=key)]
-
- d.clear()
-
- d.update(items)
-
-
-OrderedDict = dict
-sort_dictionary = _ordered_dictionary_sort
-
-
-class WeakSequence(Sequence[_T]):
- def __init__(self, __elements: Sequence[_T] = ()):
- # adapted from weakref.WeakKeyDictionary, prevent reference
- # cycles in the collection itself
- def _remove(item, selfref=weakref.ref(self)):
- self = selfref()
- if self is not None:
- self._storage.remove(item)
-
- self._remove = _remove
- self._storage = [
- weakref.ref(element, _remove) for element in __elements
- ]
-
- def append(self, item):
- self._storage.append(weakref.ref(item, self._remove))
-
- def __len__(self):
- return len(self._storage)
-
- def __iter__(self):
- return (
- obj for obj in (ref() for ref in self._storage) if obj is not None
- )
-
- def __getitem__(self, index):
- try:
- obj = self._storage[index]
- except KeyError:
- raise IndexError("Index %s out of range" % index)
- else:
- return obj()
-
-
-class OrderedIdentitySet(IdentitySet):
- def __init__(self, iterable: Optional[Iterable[Any]] = None):
- IdentitySet.__init__(self)
- self._members = OrderedDict()
- if iterable:
- for o in iterable:
- self.add(o)
-
-
-class PopulateDict(Dict[_KT, _VT]):
- """A dict which populates missing values via a creation function.
-
- Note the creation function takes a key, unlike
- collections.defaultdict.
-
- """
-
- def __init__(self, creator: Callable[[_KT], _VT]):
- self.creator = creator
-
- def __missing__(self, key: Any) -> Any:
- self[key] = val = self.creator(key)
- return val
-
-
-class WeakPopulateDict(Dict[_KT, _VT]):
- """Like PopulateDict, but assumes a self + a method and does not create
- a reference cycle.
-
- """
-
- def __init__(self, creator_method: types.MethodType):
- self.creator = creator_method.__func__
- weakself = creator_method.__self__
- self.weakself = weakref.ref(weakself)
-
- def __missing__(self, key: Any) -> Any:
- self[key] = val = self.creator(self.weakself(), key)
- return val
-
-
-# Define collections that are capable of storing
-# ColumnElement objects as hashable keys/elements.
-# At this point, these are mostly historical, things
-# used to be more complicated.
-column_set = set
-column_dict = dict
-ordered_column_set = OrderedSet
-
-
-class UniqueAppender(Generic[_T]):
- """Appends items to a collection ensuring uniqueness.
-
- Additional appends() of the same object are ignored. Membership is
- determined by identity (``is a``) not equality (``==``).
- """
-
- __slots__ = "data", "_data_appender", "_unique"
-
- data: Union[Iterable[_T], Set[_T], List[_T]]
- _data_appender: Callable[[_T], None]
- _unique: Dict[int, Literal[True]]
-
- def __init__(
- self,
- data: Union[Iterable[_T], Set[_T], List[_T]],
- via: Optional[str] = None,
- ):
- self.data = data
- self._unique = {}
- if via:
- self._data_appender = getattr(data, via)
- elif hasattr(data, "append"):
- self._data_appender = cast("List[_T]", data).append
- elif hasattr(data, "add"):
- self._data_appender = cast("Set[_T]", data).add
-
- def append(self, item: _T) -> None:
- id_ = id(item)
- if id_ not in self._unique:
- self._data_appender(item)
- self._unique[id_] = True
-
- def __iter__(self) -> Iterator[_T]:
- return iter(self.data)
-
-
-def coerce_generator_arg(arg: Any) -> List[Any]:
- if len(arg) == 1 and isinstance(arg[0], types.GeneratorType):
- return list(arg[0])
- else:
- return cast("List[Any]", arg)
-
-
-def to_list(x: Any, default: Optional[List[Any]] = None) -> List[Any]:
- if x is None:
- return default # type: ignore
- if not is_non_string_iterable(x):
- return [x]
- elif isinstance(x, list):
- return x
- else:
- return list(x)
-
-
-def has_intersection(set_, iterable):
- r"""return True if any items of set\_ are present in iterable.
-
- Goes through special effort to ensure __hash__ is not called
- on items in iterable that don't support it.
-
- """
- # TODO: optimize, write in C, etc.
- return bool(set_.intersection([i for i in iterable if i.__hash__]))
-
-
-def to_set(x):
- if x is None:
- return set()
- if not isinstance(x, set):
- return set(to_list(x))
- else:
- return x
-
-
-def to_column_set(x: Any) -> Set[Any]:
- if x is None:
- return column_set()
- if not isinstance(x, column_set):
- return column_set(to_list(x))
- else:
- return x
-
-
-def update_copy(d, _new=None, **kw):
- """Copy the given dict and update with the given values."""
-
- d = d.copy()
- if _new:
- d.update(_new)
- d.update(**kw)
- return d
-
-
-def flatten_iterator(x: Iterable[_T]) -> Iterator[_T]:
- """Given an iterator of which further sub-elements may also be
- iterators, flatten the sub-elements into a single iterator.
-
- """
- elem: _T
- for elem in x:
- if not isinstance(elem, str) and hasattr(elem, "__iter__"):
- yield from flatten_iterator(elem)
- else:
- yield elem
-
-
-class LRUCache(typing.MutableMapping[_KT, _VT]):
- """Dictionary with 'squishy' removal of least
- recently used items.
-
- Note that either get() or [] should be used here, but
- generally its not safe to do an "in" check first as the dictionary
- can change subsequent to that call.
-
- """
-
- __slots__ = (
- "capacity",
- "threshold",
- "size_alert",
- "_data",
- "_counter",
- "_mutex",
- )
-
- capacity: int
- threshold: float
- size_alert: Optional[Callable[[LRUCache[_KT, _VT]], None]]
-
- def __init__(
- self,
- capacity: int = 100,
- threshold: float = 0.5,
- size_alert: Optional[Callable[..., None]] = None,
- ):
- self.capacity = capacity
- self.threshold = threshold
- self.size_alert = size_alert
- self._counter = 0
- self._mutex = threading.Lock()
- self._data: Dict[_KT, Tuple[_KT, _VT, List[int]]] = {}
-
- def _inc_counter(self):
- self._counter += 1
- return self._counter
-
- @overload
- def get(self, key: _KT) -> Optional[_VT]: ...
-
- @overload
- def get(self, key: _KT, default: Union[_VT, _T]) -> Union[_VT, _T]: ...
-
- def get(
- self, key: _KT, default: Optional[Union[_VT, _T]] = None
- ) -> Optional[Union[_VT, _T]]:
- item = self._data.get(key)
- if item is not None:
- item[2][0] = self._inc_counter()
- return item[1]
- else:
- return default
-
- def __getitem__(self, key: _KT) -> _VT:
- item = self._data[key]
- item[2][0] = self._inc_counter()
- return item[1]
-
- def __iter__(self) -> Iterator[_KT]:
- return iter(self._data)
-
- def __len__(self) -> int:
- return len(self._data)
-
- def values(self) -> ValuesView[_VT]:
- return typing.ValuesView({k: i[1] for k, i in self._data.items()})
-
- def __setitem__(self, key: _KT, value: _VT) -> None:
- self._data[key] = (key, value, [self._inc_counter()])
- self._manage_size()
-
- def __delitem__(self, __v: _KT) -> None:
- del self._data[__v]
-
- @property
- def size_threshold(self) -> float:
- return self.capacity + self.capacity * self.threshold
-
- def _manage_size(self) -> None:
- if not self._mutex.acquire(False):
- return
- try:
- size_alert = bool(self.size_alert)
- while len(self) > self.capacity + self.capacity * self.threshold:
- if size_alert:
- size_alert = False
- self.size_alert(self) # type: ignore
- by_counter = sorted(
- self._data.values(),
- key=operator.itemgetter(2),
- reverse=True,
- )
- for item in by_counter[self.capacity :]:
- try:
- del self._data[item[0]]
- except KeyError:
- # deleted elsewhere; skip
- continue
- finally:
- self._mutex.release()
-
-
-class _CreateFuncType(Protocol[_T_co]):
- def __call__(self) -> _T_co: ...
-
-
-class _ScopeFuncType(Protocol):
- def __call__(self) -> Any: ...
-
-
-class ScopedRegistry(Generic[_T]):
- """A Registry that can store one or multiple instances of a single
- class on the basis of a "scope" function.
-
- The object implements ``__call__`` as the "getter", so by
- calling ``myregistry()`` the contained object is returned
- for the current scope.
-
- :param createfunc:
- a callable that returns a new object to be placed in the registry
-
- :param scopefunc:
- a callable that will return a key to store/retrieve an object.
- """
-
- __slots__ = "createfunc", "scopefunc", "registry"
-
- createfunc: _CreateFuncType[_T]
- scopefunc: _ScopeFuncType
- registry: Any
-
- def __init__(
- self, createfunc: Callable[[], _T], scopefunc: Callable[[], Any]
- ):
- """Construct a new :class:`.ScopedRegistry`.
-
- :param createfunc: A creation function that will generate
- a new value for the current scope, if none is present.
-
- :param scopefunc: A function that returns a hashable
- token representing the current scope (such as, current
- thread identifier).
-
- """
- self.createfunc = createfunc
- self.scopefunc = scopefunc
- self.registry = {}
-
- def __call__(self) -> _T:
- key = self.scopefunc()
- try:
- return self.registry[key] # type: ignore[no-any-return]
- except KeyError:
- return self.registry.setdefault(key, self.createfunc()) # type: ignore[no-any-return] # noqa: E501
-
- def has(self) -> bool:
- """Return True if an object is present in the current scope."""
-
- return self.scopefunc() in self.registry
-
- def set(self, obj: _T) -> None:
- """Set the value for the current scope."""
-
- self.registry[self.scopefunc()] = obj
-
- def clear(self) -> None:
- """Clear the current scope, if any."""
-
- try:
- del self.registry[self.scopefunc()]
- except KeyError:
- pass
-
-
-class ThreadLocalRegistry(ScopedRegistry[_T]):
- """A :class:`.ScopedRegistry` that uses a ``threading.local()``
- variable for storage.
-
- """
-
- def __init__(self, createfunc: Callable[[], _T]):
- self.createfunc = createfunc
- self.registry = threading.local()
-
- def __call__(self) -> _T:
- try:
- return self.registry.value # type: ignore[no-any-return]
- except AttributeError:
- val = self.registry.value = self.createfunc()
- return val
-
- def has(self) -> bool:
- return hasattr(self.registry, "value")
-
- def set(self, obj: _T) -> None:
- self.registry.value = obj
-
- def clear(self) -> None:
- try:
- del self.registry.value
- except AttributeError:
- pass
-
-
-def has_dupes(sequence, target):
- """Given a sequence and search object, return True if there's more
- than one, False if zero or one of them.
-
-
- """
- # compare to .index version below, this version introduces less function
- # overhead and is usually the same speed. At 15000 items (way bigger than
- # a relationship-bound collection in memory usually is) it begins to
- # fall behind the other version only by microseconds.
- c = 0
- for item in sequence:
- if item is target:
- c += 1
- if c > 1:
- return True
- return False
-
-
-# .index version. the two __contains__ calls as well
-# as .index() and isinstance() slow this down.
-# def has_dupes(sequence, target):
-# if target not in sequence:
-# return False
-# elif not isinstance(sequence, collections_abc.Sequence):
-# return False
-#
-# idx = sequence.index(target)
-# return target in sequence[idx + 1:]