From 12cf076118570eebbff08c6b3090e0d4798447a1 Mon Sep 17 00:00:00 2001 From: cyfraeviolae Date: Wed, 3 Apr 2024 03:17:55 -0400 Subject: no venv --- .../site-packages/sqlalchemy/orm/clsregistry.py | 570 --------------------- 1 file changed, 570 deletions(-) delete mode 100644 venv/lib/python3.11/site-packages/sqlalchemy/orm/clsregistry.py (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/orm/clsregistry.py') diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/orm/clsregistry.py b/venv/lib/python3.11/site-packages/sqlalchemy/orm/clsregistry.py deleted file mode 100644 index 26113d8..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/orm/clsregistry.py +++ /dev/null @@ -1,570 +0,0 @@ -# orm/clsregistry.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php - -"""Routines to handle the string class registry used by declarative. - -This system allows specification of classes and expressions used in -:func:`_orm.relationship` using strings. - -""" - -from __future__ import annotations - -import re -from typing import Any -from typing import Callable -from typing import cast -from typing import Dict -from typing import Generator -from typing import Iterable -from typing import List -from typing import Mapping -from typing import MutableMapping -from typing import NoReturn -from typing import Optional -from typing import Set -from typing import Tuple -from typing import Type -from typing import TYPE_CHECKING -from typing import TypeVar -from typing import Union -import weakref - -from . import attributes -from . import interfaces -from .descriptor_props import SynonymProperty -from .properties import ColumnProperty -from .util import class_mapper -from .. import exc -from .. import inspection -from .. import util -from ..sql.schema import _get_table_key -from ..util.typing import CallableReference - -if TYPE_CHECKING: - from .relationships import RelationshipProperty - from ..sql.schema import MetaData - from ..sql.schema import Table - -_T = TypeVar("_T", bound=Any) - -_ClsRegistryType = MutableMapping[str, Union[type, "ClsRegistryToken"]] - -# strong references to registries which we place in -# the _decl_class_registry, which is usually weak referencing. -# the internal registries here link to classes with weakrefs and remove -# themselves when all references to contained classes are removed. -_registries: Set[ClsRegistryToken] = set() - - -def add_class( - classname: str, cls: Type[_T], decl_class_registry: _ClsRegistryType -) -> None: - """Add a class to the _decl_class_registry associated with the - given declarative class. - - """ - if classname in decl_class_registry: - # class already exists. - existing = decl_class_registry[classname] - if not isinstance(existing, _MultipleClassMarker): - existing = decl_class_registry[classname] = _MultipleClassMarker( - [cls, cast("Type[Any]", existing)] - ) - else: - decl_class_registry[classname] = cls - - try: - root_module = cast( - _ModuleMarker, decl_class_registry["_sa_module_registry"] - ) - except KeyError: - decl_class_registry["_sa_module_registry"] = root_module = ( - _ModuleMarker("_sa_module_registry", None) - ) - - tokens = cls.__module__.split(".") - - # build up a tree like this: - # modulename: myapp.snacks.nuts - # - # myapp->snack->nuts->(classes) - # snack->nuts->(classes) - # nuts->(classes) - # - # this allows partial token paths to be used. - while tokens: - token = tokens.pop(0) - module = root_module.get_module(token) - for token in tokens: - module = module.get_module(token) - - try: - module.add_class(classname, cls) - except AttributeError as ae: - if not isinstance(module, _ModuleMarker): - raise exc.InvalidRequestError( - f'name "{classname}" matches both a ' - "class name and a module name" - ) from ae - else: - raise - - -def remove_class( - classname: str, cls: Type[Any], decl_class_registry: _ClsRegistryType -) -> None: - if classname in decl_class_registry: - existing = decl_class_registry[classname] - if isinstance(existing, _MultipleClassMarker): - existing.remove_item(cls) - else: - del decl_class_registry[classname] - - try: - root_module = cast( - _ModuleMarker, decl_class_registry["_sa_module_registry"] - ) - except KeyError: - return - - tokens = cls.__module__.split(".") - - while tokens: - token = tokens.pop(0) - module = root_module.get_module(token) - for token in tokens: - module = module.get_module(token) - try: - module.remove_class(classname, cls) - except AttributeError: - if not isinstance(module, _ModuleMarker): - pass - else: - raise - - -def _key_is_empty( - key: str, - decl_class_registry: _ClsRegistryType, - test: Callable[[Any], bool], -) -> bool: - """test if a key is empty of a certain object. - - used for unit tests against the registry to see if garbage collection - is working. - - "test" is a callable that will be passed an object should return True - if the given object is the one we were looking for. - - We can't pass the actual object itself b.c. this is for testing garbage - collection; the caller will have to have removed references to the - object itself. - - """ - if key not in decl_class_registry: - return True - - thing = decl_class_registry[key] - if isinstance(thing, _MultipleClassMarker): - for sub_thing in thing.contents: - if test(sub_thing): - return False - else: - raise NotImplementedError("unknown codepath") - else: - return not test(thing) - - -class ClsRegistryToken: - """an object that can be in the registry._class_registry as a value.""" - - __slots__ = () - - -class _MultipleClassMarker(ClsRegistryToken): - """refers to multiple classes of the same name - within _decl_class_registry. - - """ - - __slots__ = "on_remove", "contents", "__weakref__" - - contents: Set[weakref.ref[Type[Any]]] - on_remove: CallableReference[Optional[Callable[[], None]]] - - def __init__( - self, - classes: Iterable[Type[Any]], - on_remove: Optional[Callable[[], None]] = None, - ): - self.on_remove = on_remove - self.contents = { - weakref.ref(item, self._remove_item) for item in classes - } - _registries.add(self) - - def remove_item(self, cls: Type[Any]) -> None: - self._remove_item(weakref.ref(cls)) - - def __iter__(self) -> Generator[Optional[Type[Any]], None, None]: - return (ref() for ref in self.contents) - - def attempt_get(self, path: List[str], key: str) -> Type[Any]: - if len(self.contents) > 1: - raise exc.InvalidRequestError( - 'Multiple classes found for path "%s" ' - "in the registry of this declarative " - "base. Please use a fully module-qualified path." - % (".".join(path + [key])) - ) - else: - ref = list(self.contents)[0] - cls = ref() - if cls is None: - raise NameError(key) - return cls - - def _remove_item(self, ref: weakref.ref[Type[Any]]) -> None: - self.contents.discard(ref) - if not self.contents: - _registries.discard(self) - if self.on_remove: - self.on_remove() - - def add_item(self, item: Type[Any]) -> None: - # protect against class registration race condition against - # asynchronous garbage collection calling _remove_item, - # [ticket:3208] and [ticket:10782] - modules = { - cls.__module__ - for cls in [ref() for ref in list(self.contents)] - if cls is not None - } - if item.__module__ in modules: - util.warn( - "This declarative base already contains a class with the " - "same class name and module name as %s.%s, and will " - "be replaced in the string-lookup table." - % (item.__module__, item.__name__) - ) - self.contents.add(weakref.ref(item, self._remove_item)) - - -class _ModuleMarker(ClsRegistryToken): - """Refers to a module name within - _decl_class_registry. - - """ - - __slots__ = "parent", "name", "contents", "mod_ns", "path", "__weakref__" - - parent: Optional[_ModuleMarker] - contents: Dict[str, Union[_ModuleMarker, _MultipleClassMarker]] - mod_ns: _ModNS - path: List[str] - - def __init__(self, name: str, parent: Optional[_ModuleMarker]): - self.parent = parent - self.name = name - self.contents = {} - self.mod_ns = _ModNS(self) - if self.parent: - self.path = self.parent.path + [self.name] - else: - self.path = [] - _registries.add(self) - - def __contains__(self, name: str) -> bool: - return name in self.contents - - def __getitem__(self, name: str) -> ClsRegistryToken: - return self.contents[name] - - def _remove_item(self, name: str) -> None: - self.contents.pop(name, None) - if not self.contents and self.parent is not None: - self.parent._remove_item(self.name) - _registries.discard(self) - - def resolve_attr(self, key: str) -> Union[_ModNS, Type[Any]]: - return self.mod_ns.__getattr__(key) - - def get_module(self, name: str) -> _ModuleMarker: - if name not in self.contents: - marker = _ModuleMarker(name, self) - self.contents[name] = marker - else: - marker = cast(_ModuleMarker, self.contents[name]) - return marker - - def add_class(self, name: str, cls: Type[Any]) -> None: - if name in self.contents: - existing = cast(_MultipleClassMarker, self.contents[name]) - try: - existing.add_item(cls) - except AttributeError as ae: - if not isinstance(existing, _MultipleClassMarker): - raise exc.InvalidRequestError( - f'name "{name}" matches both a ' - "class name and a module name" - ) from ae - else: - raise - else: - existing = self.contents[name] = _MultipleClassMarker( - [cls], on_remove=lambda: self._remove_item(name) - ) - - def remove_class(self, name: str, cls: Type[Any]) -> None: - if name in self.contents: - existing = cast(_MultipleClassMarker, self.contents[name]) - existing.remove_item(cls) - - -class _ModNS: - __slots__ = ("__parent",) - - __parent: _ModuleMarker - - def __init__(self, parent: _ModuleMarker): - self.__parent = parent - - def __getattr__(self, key: str) -> Union[_ModNS, Type[Any]]: - try: - value = self.__parent.contents[key] - except KeyError: - pass - else: - if value is not None: - if isinstance(value, _ModuleMarker): - return value.mod_ns - else: - assert isinstance(value, _MultipleClassMarker) - return value.attempt_get(self.__parent.path, key) - raise NameError( - "Module %r has no mapped classes " - "registered under the name %r" % (self.__parent.name, key) - ) - - -class _GetColumns: - __slots__ = ("cls",) - - cls: Type[Any] - - def __init__(self, cls: Type[Any]): - self.cls = cls - - def __getattr__(self, key: str) -> Any: - mp = class_mapper(self.cls, configure=False) - if mp: - if key not in mp.all_orm_descriptors: - raise AttributeError( - "Class %r does not have a mapped column named %r" - % (self.cls, key) - ) - - desc = mp.all_orm_descriptors[key] - if desc.extension_type is interfaces.NotExtension.NOT_EXTENSION: - assert isinstance(desc, attributes.QueryableAttribute) - prop = desc.property - if isinstance(prop, SynonymProperty): - key = prop.name - elif not isinstance(prop, ColumnProperty): - raise exc.InvalidRequestError( - "Property %r is not an instance of" - " ColumnProperty (i.e. does not correspond" - " directly to a Column)." % key - ) - return getattr(self.cls, key) - - -inspection._inspects(_GetColumns)( - lambda target: inspection.inspect(target.cls) -) - - -class _GetTable: - __slots__ = "key", "metadata" - - key: str - metadata: MetaData - - def __init__(self, key: str, metadata: MetaData): - self.key = key - self.metadata = metadata - - def __getattr__(self, key: str) -> Table: - return self.metadata.tables[_get_table_key(key, self.key)] - - -def _determine_container(key: str, value: Any) -> _GetColumns: - if isinstance(value, _MultipleClassMarker): - value = value.attempt_get([], key) - return _GetColumns(value) - - -class _class_resolver: - __slots__ = ( - "cls", - "prop", - "arg", - "fallback", - "_dict", - "_resolvers", - "favor_tables", - ) - - cls: Type[Any] - prop: RelationshipProperty[Any] - fallback: Mapping[str, Any] - arg: str - favor_tables: bool - _resolvers: Tuple[Callable[[str], Any], ...] - - def __init__( - self, - cls: Type[Any], - prop: RelationshipProperty[Any], - fallback: Mapping[str, Any], - arg: str, - favor_tables: bool = False, - ): - self.cls = cls - self.prop = prop - self.arg = arg - self.fallback = fallback - self._dict = util.PopulateDict(self._access_cls) - self._resolvers = () - self.favor_tables = favor_tables - - def _access_cls(self, key: str) -> Any: - cls = self.cls - - manager = attributes.manager_of_class(cls) - decl_base = manager.registry - assert decl_base is not None - decl_class_registry = decl_base._class_registry - metadata = decl_base.metadata - - if self.favor_tables: - if key in metadata.tables: - return metadata.tables[key] - elif key in metadata._schemas: - return _GetTable(key, getattr(cls, "metadata", metadata)) - - if key in decl_class_registry: - return _determine_container(key, decl_class_registry[key]) - - if not self.favor_tables: - if key in metadata.tables: - return metadata.tables[key] - elif key in metadata._schemas: - return _GetTable(key, getattr(cls, "metadata", metadata)) - - if "_sa_module_registry" in decl_class_registry and key in cast( - _ModuleMarker, decl_class_registry["_sa_module_registry"] - ): - registry = cast( - _ModuleMarker, decl_class_registry["_sa_module_registry"] - ) - return registry.resolve_attr(key) - elif self._resolvers: - for resolv in self._resolvers: - value = resolv(key) - if value is not None: - return value - - return self.fallback[key] - - def _raise_for_name(self, name: str, err: Exception) -> NoReturn: - generic_match = re.match(r"(.+)\[(.+)\]", name) - - if generic_match: - clsarg = generic_match.group(2).strip("'") - raise exc.InvalidRequestError( - f"When initializing mapper {self.prop.parent}, " - f'expression "relationship({self.arg!r})" seems to be ' - "using a generic class as the argument to relationship(); " - "please state the generic argument " - "using an annotation, e.g. " - f'"{self.prop.key}: Mapped[{generic_match.group(1)}' - f"['{clsarg}']] = relationship()\"" - ) from err - else: - raise exc.InvalidRequestError( - "When initializing mapper %s, expression %r failed to " - "locate a name (%r). If this is a class name, consider " - "adding this relationship() to the %r class after " - "both dependent classes have been defined." - % (self.prop.parent, self.arg, name, self.cls) - ) from err - - def _resolve_name(self) -> Union[Table, Type[Any], _ModNS]: - name = self.arg - d = self._dict - rval = None - try: - for token in name.split("."): - if rval is None: - rval = d[token] - else: - rval = getattr(rval, token) - except KeyError as err: - self._raise_for_name(name, err) - except NameError as n: - self._raise_for_name(n.args[0], n) - else: - if isinstance(rval, _GetColumns): - return rval.cls - else: - if TYPE_CHECKING: - assert isinstance(rval, (type, Table, _ModNS)) - return rval - - def __call__(self) -> Any: - try: - x = eval(self.arg, globals(), self._dict) - - if isinstance(x, _GetColumns): - return x.cls - else: - return x - except NameError as n: - self._raise_for_name(n.args[0], n) - - -_fallback_dict: Mapping[str, Any] = None # type: ignore - - -def _resolver(cls: Type[Any], prop: RelationshipProperty[Any]) -> Tuple[ - Callable[[str], Callable[[], Union[Type[Any], Table, _ModNS]]], - Callable[[str, bool], _class_resolver], -]: - global _fallback_dict - - if _fallback_dict is None: - import sqlalchemy - from . import foreign - from . import remote - - _fallback_dict = util.immutabledict(sqlalchemy.__dict__).union( - {"foreign": foreign, "remote": remote} - ) - - def resolve_arg(arg: str, favor_tables: bool = False) -> _class_resolver: - return _class_resolver( - cls, prop, _fallback_dict, arg, favor_tables=favor_tables - ) - - def resolve_name( - arg: str, - ) -> Callable[[], Union[Type[Any], Table, _ModNS]]: - return _class_resolver(cls, prop, _fallback_dict, arg)._resolve_name - - return resolve_name, resolve_arg -- cgit v1.2.3