summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/orm/decl_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/orm/decl_base.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/orm/decl_base.py2152
1 files changed, 2152 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/orm/decl_base.py b/venv/lib/python3.11/site-packages/sqlalchemy/orm/decl_base.py
new file mode 100644
index 0000000..96530c3
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/sqlalchemy/orm/decl_base.py
@@ -0,0 +1,2152 @@
+# orm/decl_base.py
+# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+"""Internal implementation for declarative."""
+
+from __future__ import annotations
+
+import collections
+import dataclasses
+import re
+from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Dict
+from typing import Iterable
+from typing import List
+from typing import Mapping
+from typing import NamedTuple
+from typing import NoReturn
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+import weakref
+
+from . import attributes
+from . import clsregistry
+from . import exc as orm_exc
+from . import instrumentation
+from . import mapperlib
+from ._typing import _O
+from ._typing import attr_is_internal_proxy
+from .attributes import InstrumentedAttribute
+from .attributes import QueryableAttribute
+from .base import _is_mapped_class
+from .base import InspectionAttr
+from .descriptor_props import CompositeProperty
+from .descriptor_props import SynonymProperty
+from .interfaces import _AttributeOptions
+from .interfaces import _DCAttributeOptions
+from .interfaces import _IntrospectsAnnotations
+from .interfaces import _MappedAttribute
+from .interfaces import _MapsColumns
+from .interfaces import MapperProperty
+from .mapper import Mapper
+from .properties import ColumnProperty
+from .properties import MappedColumn
+from .util import _extract_mapped_subtype
+from .util import _is_mapped_annotation
+from .util import class_mapper
+from .util import de_stringify_annotation
+from .. import event
+from .. import exc
+from .. import util
+from ..sql import expression
+from ..sql.base import _NoArg
+from ..sql.schema import Column
+from ..sql.schema import Table
+from ..util import topological
+from ..util.typing import _AnnotationScanType
+from ..util.typing import is_fwd_ref
+from ..util.typing import is_literal
+from ..util.typing import Protocol
+from ..util.typing import TypedDict
+from ..util.typing import typing_get_args
+
+if TYPE_CHECKING:
+ from ._typing import _ClassDict
+ from ._typing import _RegistryType
+ from .base import Mapped
+ from .decl_api import declared_attr
+ from .instrumentation import ClassManager
+ from ..sql.elements import NamedColumn
+ from ..sql.schema import MetaData
+ from ..sql.selectable import FromClause
+
+_T = TypeVar("_T", bound=Any)
+
+_MapperKwArgs = Mapping[str, Any]
+_TableArgsType = Union[Tuple[Any, ...], Dict[str, Any]]
+
+
+class MappedClassProtocol(Protocol[_O]):
+ """A protocol representing a SQLAlchemy mapped class.
+
+ The protocol is generic on the type of class, use
+ ``MappedClassProtocol[Any]`` to allow any mapped class.
+ """
+
+ __name__: str
+ __mapper__: Mapper[_O]
+ __table__: FromClause
+
+ def __call__(self, **kw: Any) -> _O: ...
+
+
+class _DeclMappedClassProtocol(MappedClassProtocol[_O], Protocol):
+ "Internal more detailed version of ``MappedClassProtocol``."
+ metadata: MetaData
+ __tablename__: str
+ __mapper_args__: _MapperKwArgs
+ __table_args__: Optional[_TableArgsType]
+
+ _sa_apply_dc_transforms: Optional[_DataclassArguments]
+
+ def __declare_first__(self) -> None: ...
+
+ def __declare_last__(self) -> None: ...
+
+
+class _DataclassArguments(TypedDict):
+ init: Union[_NoArg, bool]
+ repr: Union[_NoArg, bool]
+ eq: Union[_NoArg, bool]
+ order: Union[_NoArg, bool]
+ unsafe_hash: Union[_NoArg, bool]
+ match_args: Union[_NoArg, bool]
+ kw_only: Union[_NoArg, bool]
+ dataclass_callable: Union[_NoArg, Callable[..., Type[Any]]]
+
+
+def _declared_mapping_info(
+ cls: Type[Any],
+) -> Optional[Union[_DeferredMapperConfig, Mapper[Any]]]:
+ # deferred mapping
+ if _DeferredMapperConfig.has_cls(cls):
+ return _DeferredMapperConfig.config_for_cls(cls)
+ # regular mapping
+ elif _is_mapped_class(cls):
+ return class_mapper(cls, configure=False)
+ else:
+ return None
+
+
+def _is_supercls_for_inherits(cls: Type[Any]) -> bool:
+ """return True if this class will be used as a superclass to set in
+ 'inherits'.
+
+ This includes deferred mapper configs that aren't mapped yet, however does
+ not include classes with _sa_decl_prepare_nocascade (e.g.
+ ``AbstractConcreteBase``); these concrete-only classes are not set up as
+ "inherits" until after mappers are configured using
+ mapper._set_concrete_base()
+
+ """
+ if _DeferredMapperConfig.has_cls(cls):
+ return not _get_immediate_cls_attr(
+ cls, "_sa_decl_prepare_nocascade", strict=True
+ )
+ # regular mapping
+ elif _is_mapped_class(cls):
+ return True
+ else:
+ return False
+
+
+def _resolve_for_abstract_or_classical(cls: Type[Any]) -> Optional[Type[Any]]:
+ if cls is object:
+ return None
+
+ sup: Optional[Type[Any]]
+
+ if cls.__dict__.get("__abstract__", False):
+ for base_ in cls.__bases__:
+ sup = _resolve_for_abstract_or_classical(base_)
+ if sup is not None:
+ return sup
+ else:
+ return None
+ else:
+ clsmanager = _dive_for_cls_manager(cls)
+
+ if clsmanager:
+ return clsmanager.class_
+ else:
+ return cls
+
+
+def _get_immediate_cls_attr(
+ cls: Type[Any], attrname: str, strict: bool = False
+) -> Optional[Any]:
+ """return an attribute of the class that is either present directly
+ on the class, e.g. not on a superclass, or is from a superclass but
+ this superclass is a non-mapped mixin, that is, not a descendant of
+ the declarative base and is also not classically mapped.
+
+ This is used to detect attributes that indicate something about
+ a mapped class independently from any mapped classes that it may
+ inherit from.
+
+ """
+
+ # the rules are different for this name than others,
+ # make sure we've moved it out. transitional
+ assert attrname != "__abstract__"
+
+ if not issubclass(cls, object):
+ return None
+
+ if attrname in cls.__dict__:
+ return getattr(cls, attrname)
+
+ for base in cls.__mro__[1:]:
+ _is_classical_inherits = _dive_for_cls_manager(base) is not None
+
+ if attrname in base.__dict__ and (
+ base is cls
+ or (
+ (base in cls.__bases__ if strict else True)
+ and not _is_classical_inherits
+ )
+ ):
+ return getattr(base, attrname)
+ else:
+ return None
+
+
+def _dive_for_cls_manager(cls: Type[_O]) -> Optional[ClassManager[_O]]:
+ # because the class manager registration is pluggable,
+ # we need to do the search for every class in the hierarchy,
+ # rather than just a simple "cls._sa_class_manager"
+
+ for base in cls.__mro__:
+ manager: Optional[ClassManager[_O]] = attributes.opt_manager_of_class(
+ base
+ )
+ if manager:
+ return manager
+ return None
+
+
+def _as_declarative(
+ registry: _RegistryType, cls: Type[Any], dict_: _ClassDict
+) -> Optional[_MapperConfig]:
+ # declarative scans the class for attributes. no table or mapper
+ # args passed separately.
+ return _MapperConfig.setup_mapping(registry, cls, dict_, None, {})
+
+
+def _mapper(
+ registry: _RegistryType,
+ cls: Type[_O],
+ table: Optional[FromClause],
+ mapper_kw: _MapperKwArgs,
+) -> Mapper[_O]:
+ _ImperativeMapperConfig(registry, cls, table, mapper_kw)
+ return cast("MappedClassProtocol[_O]", cls).__mapper__
+
+
+@util.preload_module("sqlalchemy.orm.decl_api")
+def _is_declarative_props(obj: Any) -> bool:
+ _declared_attr_common = util.preloaded.orm_decl_api._declared_attr_common
+
+ return isinstance(obj, (_declared_attr_common, util.classproperty))
+
+
+def _check_declared_props_nocascade(
+ obj: Any, name: str, cls: Type[_O]
+) -> bool:
+ if _is_declarative_props(obj):
+ if getattr(obj, "_cascading", False):
+ util.warn(
+ "@declared_attr.cascading is not supported on the %s "
+ "attribute on class %s. This attribute invokes for "
+ "subclasses in any case." % (name, cls)
+ )
+ return True
+ else:
+ return False
+
+
+class _MapperConfig:
+ __slots__ = (
+ "cls",
+ "classname",
+ "properties",
+ "declared_attr_reg",
+ "__weakref__",
+ )
+
+ cls: Type[Any]
+ classname: str
+ properties: util.OrderedDict[
+ str,
+ Union[
+ Sequence[NamedColumn[Any]], NamedColumn[Any], MapperProperty[Any]
+ ],
+ ]
+ declared_attr_reg: Dict[declared_attr[Any], Any]
+
+ @classmethod
+ def setup_mapping(
+ cls,
+ registry: _RegistryType,
+ cls_: Type[_O],
+ dict_: _ClassDict,
+ table: Optional[FromClause],
+ mapper_kw: _MapperKwArgs,
+ ) -> Optional[_MapperConfig]:
+ manager = attributes.opt_manager_of_class(cls)
+ if manager and manager.class_ is cls_:
+ raise exc.InvalidRequestError(
+ f"Class {cls!r} already has been instrumented declaratively"
+ )
+
+ if cls_.__dict__.get("__abstract__", False):
+ return None
+
+ defer_map = _get_immediate_cls_attr(
+ cls_, "_sa_decl_prepare_nocascade", strict=True
+ ) or hasattr(cls_, "_sa_decl_prepare")
+
+ if defer_map:
+ return _DeferredMapperConfig(
+ registry, cls_, dict_, table, mapper_kw
+ )
+ else:
+ return _ClassScanMapperConfig(
+ registry, cls_, dict_, table, mapper_kw
+ )
+
+ def __init__(
+ self,
+ registry: _RegistryType,
+ cls_: Type[Any],
+ mapper_kw: _MapperKwArgs,
+ ):
+ self.cls = util.assert_arg_type(cls_, type, "cls_")
+ self.classname = cls_.__name__
+ self.properties = util.OrderedDict()
+ self.declared_attr_reg = {}
+
+ if not mapper_kw.get("non_primary", False):
+ instrumentation.register_class(
+ self.cls,
+ finalize=False,
+ registry=registry,
+ declarative_scan=self,
+ init_method=registry.constructor,
+ )
+ else:
+ manager = attributes.opt_manager_of_class(self.cls)
+ if not manager or not manager.is_mapped:
+ raise exc.InvalidRequestError(
+ "Class %s has no primary mapper configured. Configure "
+ "a primary mapper first before setting up a non primary "
+ "Mapper." % self.cls
+ )
+
+ def set_cls_attribute(self, attrname: str, value: _T) -> _T:
+ manager = instrumentation.manager_of_class(self.cls)
+ manager.install_member(attrname, value)
+ return value
+
+ def map(self, mapper_kw: _MapperKwArgs = ...) -> Mapper[Any]:
+ raise NotImplementedError()
+
+ def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None:
+ self.map(mapper_kw)
+
+
+class _ImperativeMapperConfig(_MapperConfig):
+ __slots__ = ("local_table", "inherits")
+
+ def __init__(
+ self,
+ registry: _RegistryType,
+ cls_: Type[_O],
+ table: Optional[FromClause],
+ mapper_kw: _MapperKwArgs,
+ ):
+ super().__init__(registry, cls_, mapper_kw)
+
+ self.local_table = self.set_cls_attribute("__table__", table)
+
+ with mapperlib._CONFIGURE_MUTEX:
+ if not mapper_kw.get("non_primary", False):
+ clsregistry.add_class(
+ self.classname, self.cls, registry._class_registry
+ )
+
+ self._setup_inheritance(mapper_kw)
+
+ self._early_mapping(mapper_kw)
+
+ def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
+ mapper_cls = Mapper
+
+ return self.set_cls_attribute(
+ "__mapper__",
+ mapper_cls(self.cls, self.local_table, **mapper_kw),
+ )
+
+ def _setup_inheritance(self, mapper_kw: _MapperKwArgs) -> None:
+ cls = self.cls
+
+ inherits = mapper_kw.get("inherits", None)
+
+ if inherits is None:
+ # since we search for classical mappings now, search for
+ # multiple mapped bases as well and raise an error.
+ inherits_search = []
+ for base_ in cls.__bases__:
+ c = _resolve_for_abstract_or_classical(base_)
+ if c is None:
+ continue
+
+ if _is_supercls_for_inherits(c) and c not in inherits_search:
+ inherits_search.append(c)
+
+ if inherits_search:
+ if len(inherits_search) > 1:
+ raise exc.InvalidRequestError(
+ "Class %s has multiple mapped bases: %r"
+ % (cls, inherits_search)
+ )
+ inherits = inherits_search[0]
+ elif isinstance(inherits, Mapper):
+ inherits = inherits.class_
+
+ self.inherits = inherits
+
+
+class _CollectedAnnotation(NamedTuple):
+ raw_annotation: _AnnotationScanType
+ mapped_container: Optional[Type[Mapped[Any]]]
+ extracted_mapped_annotation: Union[Type[Any], str]
+ is_dataclass: bool
+ attr_value: Any
+ originating_module: str
+ originating_class: Type[Any]
+
+
+class _ClassScanMapperConfig(_MapperConfig):
+ __slots__ = (
+ "registry",
+ "clsdict_view",
+ "collected_attributes",
+ "collected_annotations",
+ "local_table",
+ "persist_selectable",
+ "declared_columns",
+ "column_ordering",
+ "column_copies",
+ "table_args",
+ "tablename",
+ "mapper_args",
+ "mapper_args_fn",
+ "inherits",
+ "single",
+ "allow_dataclass_fields",
+ "dataclass_setup_arguments",
+ "is_dataclass_prior_to_mapping",
+ "allow_unmapped_annotations",
+ )
+
+ is_deferred = False
+ registry: _RegistryType
+ clsdict_view: _ClassDict
+ collected_annotations: Dict[str, _CollectedAnnotation]
+ collected_attributes: Dict[str, Any]
+ local_table: Optional[FromClause]
+ persist_selectable: Optional[FromClause]
+ declared_columns: util.OrderedSet[Column[Any]]
+ column_ordering: Dict[Column[Any], int]
+ column_copies: Dict[
+ Union[MappedColumn[Any], Column[Any]],
+ Union[MappedColumn[Any], Column[Any]],
+ ]
+ tablename: Optional[str]
+ mapper_args: Mapping[str, Any]
+ table_args: Optional[_TableArgsType]
+ mapper_args_fn: Optional[Callable[[], Dict[str, Any]]]
+ inherits: Optional[Type[Any]]
+ single: bool
+
+ is_dataclass_prior_to_mapping: bool
+ allow_unmapped_annotations: bool
+
+ dataclass_setup_arguments: Optional[_DataclassArguments]
+ """if the class has SQLAlchemy native dataclass parameters, where
+ we will turn the class into a dataclass within the declarative mapping
+ process.
+
+ """
+
+ allow_dataclass_fields: bool
+ """if true, look for dataclass-processed Field objects on the target
+ class as well as superclasses and extract ORM mapping directives from
+ the "metadata" attribute of each Field.
+
+ if False, dataclass fields can still be used, however they won't be
+ mapped.
+
+ """
+
+ def __init__(
+ self,
+ registry: _RegistryType,
+ cls_: Type[_O],
+ dict_: _ClassDict,
+ table: Optional[FromClause],
+ mapper_kw: _MapperKwArgs,
+ ):
+ # grab class dict before the instrumentation manager has been added.
+ # reduces cycles
+ self.clsdict_view = (
+ util.immutabledict(dict_) if dict_ else util.EMPTY_DICT
+ )
+ super().__init__(registry, cls_, mapper_kw)
+ self.registry = registry
+ self.persist_selectable = None
+
+ self.collected_attributes = {}
+ self.collected_annotations = {}
+ self.declared_columns = util.OrderedSet()
+ self.column_ordering = {}
+ self.column_copies = {}
+ self.single = False
+ self.dataclass_setup_arguments = dca = getattr(
+ self.cls, "_sa_apply_dc_transforms", None
+ )
+
+ self.allow_unmapped_annotations = getattr(
+ self.cls, "__allow_unmapped__", False
+ ) or bool(self.dataclass_setup_arguments)
+
+ self.is_dataclass_prior_to_mapping = cld = dataclasses.is_dataclass(
+ cls_
+ )
+
+ sdk = _get_immediate_cls_attr(cls_, "__sa_dataclass_metadata_key__")
+
+ # we don't want to consume Field objects from a not-already-dataclass.
+ # the Field objects won't have their "name" or "type" populated,
+ # and while it seems like we could just set these on Field as we
+ # read them, Field is documented as "user read only" and we need to
+ # stay far away from any off-label use of dataclasses APIs.
+ if (not cld or dca) and sdk:
+ raise exc.InvalidRequestError(
+ "SQLAlchemy mapped dataclasses can't consume mapping "
+ "information from dataclass.Field() objects if the immediate "
+ "class is not already a dataclass."
+ )
+
+ # if already a dataclass, and __sa_dataclass_metadata_key__ present,
+ # then also look inside of dataclass.Field() objects yielded by
+ # dataclasses.get_fields(cls) when scanning for attributes
+ self.allow_dataclass_fields = bool(sdk and cld)
+
+ self._setup_declared_events()
+
+ self._scan_attributes()
+
+ self._setup_dataclasses_transforms()
+
+ with mapperlib._CONFIGURE_MUTEX:
+ clsregistry.add_class(
+ self.classname, self.cls, registry._class_registry
+ )
+
+ self._setup_inheriting_mapper(mapper_kw)
+
+ self._extract_mappable_attributes()
+
+ self._extract_declared_columns()
+
+ self._setup_table(table)
+
+ self._setup_inheriting_columns(mapper_kw)
+
+ self._early_mapping(mapper_kw)
+
+ def _setup_declared_events(self) -> None:
+ if _get_immediate_cls_attr(self.cls, "__declare_last__"):
+
+ @event.listens_for(Mapper, "after_configured")
+ def after_configured() -> None:
+ cast(
+ "_DeclMappedClassProtocol[Any]", self.cls
+ ).__declare_last__()
+
+ if _get_immediate_cls_attr(self.cls, "__declare_first__"):
+
+ @event.listens_for(Mapper, "before_configured")
+ def before_configured() -> None:
+ cast(
+ "_DeclMappedClassProtocol[Any]", self.cls
+ ).__declare_first__()
+
+ def _cls_attr_override_checker(
+ self, cls: Type[_O]
+ ) -> Callable[[str, Any], bool]:
+ """Produce a function that checks if a class has overridden an
+ attribute, taking SQLAlchemy-enabled dataclass fields into account.
+
+ """
+
+ if self.allow_dataclass_fields:
+ sa_dataclass_metadata_key = _get_immediate_cls_attr(
+ cls, "__sa_dataclass_metadata_key__"
+ )
+ else:
+ sa_dataclass_metadata_key = None
+
+ if not sa_dataclass_metadata_key:
+
+ def attribute_is_overridden(key: str, obj: Any) -> bool:
+ return getattr(cls, key, obj) is not obj
+
+ else:
+ all_datacls_fields = {
+ f.name: f.metadata[sa_dataclass_metadata_key]
+ for f in util.dataclass_fields(cls)
+ if sa_dataclass_metadata_key in f.metadata
+ }
+ local_datacls_fields = {
+ f.name: f.metadata[sa_dataclass_metadata_key]
+ for f in util.local_dataclass_fields(cls)
+ if sa_dataclass_metadata_key in f.metadata
+ }
+
+ absent = object()
+
+ def attribute_is_overridden(key: str, obj: Any) -> bool:
+ if _is_declarative_props(obj):
+ obj = obj.fget
+
+ # this function likely has some failure modes still if
+ # someone is doing a deep mixing of the same attribute
+ # name as plain Python attribute vs. dataclass field.
+
+ ret = local_datacls_fields.get(key, absent)
+ if _is_declarative_props(ret):
+ ret = ret.fget
+
+ if ret is obj:
+ return False
+ elif ret is not absent:
+ return True
+
+ all_field = all_datacls_fields.get(key, absent)
+
+ ret = getattr(cls, key, obj)
+
+ if ret is obj:
+ return False
+
+ # for dataclasses, this could be the
+ # 'default' of the field. so filter more specifically
+ # for an already-mapped InstrumentedAttribute
+ if ret is not absent and isinstance(
+ ret, InstrumentedAttribute
+ ):
+ return True
+
+ if all_field is obj:
+ return False
+ elif all_field is not absent:
+ return True
+
+ # can't find another attribute
+ return False
+
+ return attribute_is_overridden
+
+ _include_dunders = {
+ "__table__",
+ "__mapper_args__",
+ "__tablename__",
+ "__table_args__",
+ }
+
+ _match_exclude_dunders = re.compile(r"^(?:_sa_|__)")
+
+ def _cls_attr_resolver(
+ self, cls: Type[Any]
+ ) -> Callable[[], Iterable[Tuple[str, Any, Any, bool]]]:
+ """produce a function to iterate the "attributes" of a class
+ which we want to consider for mapping, adjusting for SQLAlchemy fields
+ embedded in dataclass fields.
+
+ """
+ cls_annotations = util.get_annotations(cls)
+
+ cls_vars = vars(cls)
+
+ _include_dunders = self._include_dunders
+ _match_exclude_dunders = self._match_exclude_dunders
+
+ names = [
+ n
+ for n in util.merge_lists_w_ordering(
+ list(cls_vars), list(cls_annotations)
+ )
+ if not _match_exclude_dunders.match(n) or n in _include_dunders
+ ]
+
+ if self.allow_dataclass_fields:
+ sa_dataclass_metadata_key: Optional[str] = _get_immediate_cls_attr(
+ cls, "__sa_dataclass_metadata_key__"
+ )
+ else:
+ sa_dataclass_metadata_key = None
+
+ if not sa_dataclass_metadata_key:
+
+ def local_attributes_for_class() -> (
+ Iterable[Tuple[str, Any, Any, bool]]
+ ):
+ return (
+ (
+ name,
+ cls_vars.get(name),
+ cls_annotations.get(name),
+ False,
+ )
+ for name in names
+ )
+
+ else:
+ dataclass_fields = {
+ field.name: field for field in util.local_dataclass_fields(cls)
+ }
+
+ fixed_sa_dataclass_metadata_key = sa_dataclass_metadata_key
+
+ def local_attributes_for_class() -> (
+ Iterable[Tuple[str, Any, Any, bool]]
+ ):
+ for name in names:
+ field = dataclass_fields.get(name, None)
+ if field and sa_dataclass_metadata_key in field.metadata:
+ yield field.name, _as_dc_declaredattr(
+ field.metadata, fixed_sa_dataclass_metadata_key
+ ), cls_annotations.get(field.name), True
+ else:
+ yield name, cls_vars.get(name), cls_annotations.get(
+ name
+ ), False
+
+ return local_attributes_for_class
+
+ def _scan_attributes(self) -> None:
+ cls = self.cls
+
+ cls_as_Decl = cast("_DeclMappedClassProtocol[Any]", cls)
+
+ clsdict_view = self.clsdict_view
+ collected_attributes = self.collected_attributes
+ column_copies = self.column_copies
+ _include_dunders = self._include_dunders
+ mapper_args_fn = None
+ table_args = inherited_table_args = None
+
+ tablename = None
+ fixed_table = "__table__" in clsdict_view
+
+ attribute_is_overridden = self._cls_attr_override_checker(self.cls)
+
+ bases = []
+
+ for base in cls.__mro__:
+ # collect bases and make sure standalone columns are copied
+ # to be the column they will ultimately be on the class,
+ # so that declared_attr functions use the right columns.
+ # need to do this all the way up the hierarchy first
+ # (see #8190)
+
+ class_mapped = base is not cls and _is_supercls_for_inherits(base)
+
+ local_attributes_for_class = self._cls_attr_resolver(base)
+
+ if not class_mapped and base is not cls:
+ locally_collected_columns = self._produce_column_copies(
+ local_attributes_for_class,
+ attribute_is_overridden,
+ fixed_table,
+ base,
+ )
+ else:
+ locally_collected_columns = {}
+
+ bases.append(
+ (
+ base,
+ class_mapped,
+ local_attributes_for_class,
+ locally_collected_columns,
+ )
+ )
+
+ for (
+ base,
+ class_mapped,
+ local_attributes_for_class,
+ locally_collected_columns,
+ ) in bases:
+ # this transfer can also take place as we scan each name
+ # for finer-grained control of how collected_attributes is
+ # populated, as this is what impacts column ordering.
+ # however it's simpler to get it out of the way here.
+ collected_attributes.update(locally_collected_columns)
+
+ for (
+ name,
+ obj,
+ annotation,
+ is_dataclass_field,
+ ) in local_attributes_for_class():
+ if name in _include_dunders:
+ if name == "__mapper_args__":
+ check_decl = _check_declared_props_nocascade(
+ obj, name, cls
+ )
+ if not mapper_args_fn and (
+ not class_mapped or check_decl
+ ):
+ # don't even invoke __mapper_args__ until
+ # after we've determined everything about the
+ # mapped table.
+ # make a copy of it so a class-level dictionary
+ # is not overwritten when we update column-based
+ # arguments.
+ def _mapper_args_fn() -> Dict[str, Any]:
+ return dict(cls_as_Decl.__mapper_args__)
+
+ mapper_args_fn = _mapper_args_fn
+
+ elif name == "__tablename__":
+ check_decl = _check_declared_props_nocascade(
+ obj, name, cls
+ )
+ if not tablename and (not class_mapped or check_decl):
+ tablename = cls_as_Decl.__tablename__
+ elif name == "__table_args__":
+ check_decl = _check_declared_props_nocascade(
+ obj, name, cls
+ )
+ if not table_args and (not class_mapped or check_decl):
+ table_args = cls_as_Decl.__table_args__
+ if not isinstance(
+ table_args, (tuple, dict, type(None))
+ ):
+ raise exc.ArgumentError(
+ "__table_args__ value must be a tuple, "
+ "dict, or None"
+ )
+ if base is not cls:
+ inherited_table_args = True
+ else:
+ # skip all other dunder names, which at the moment
+ # should only be __table__
+ continue
+ elif class_mapped:
+ if _is_declarative_props(obj) and not obj._quiet:
+ util.warn(
+ "Regular (i.e. not __special__) "
+ "attribute '%s.%s' uses @declared_attr, "
+ "but owning class %s is mapped - "
+ "not applying to subclass %s."
+ % (base.__name__, name, base, cls)
+ )
+
+ continue
+ elif base is not cls:
+ # we're a mixin, abstract base, or something that is
+ # acting like that for now.
+
+ if isinstance(obj, (Column, MappedColumn)):
+ # already copied columns to the mapped class.
+ continue
+ elif isinstance(obj, MapperProperty):
+ raise exc.InvalidRequestError(
+ "Mapper properties (i.e. deferred,"
+ "column_property(), relationship(), etc.) must "
+ "be declared as @declared_attr callables "
+ "on declarative mixin classes. For dataclass "
+ "field() objects, use a lambda:"
+ )
+ elif _is_declarative_props(obj):
+ # tried to get overloads to tell this to
+ # pylance, no luck
+ assert obj is not None
+
+ if obj._cascading:
+ if name in clsdict_view:
+ # unfortunately, while we can use the user-
+ # defined attribute here to allow a clean
+ # override, if there's another
+ # subclass below then it still tries to use
+ # this. not sure if there is enough
+ # information here to add this as a feature
+ # later on.
+ util.warn(
+ "Attribute '%s' on class %s cannot be "
+ "processed due to "
+ "@declared_attr.cascading; "
+ "skipping" % (name, cls)
+ )
+ collected_attributes[name] = column_copies[obj] = (
+ ret
+ ) = obj.__get__(obj, cls)
+ setattr(cls, name, ret)
+ else:
+ if is_dataclass_field:
+ # access attribute using normal class access
+ # first, to see if it's been mapped on a
+ # superclass. note if the dataclasses.field()
+ # has "default", this value can be anything.
+ ret = getattr(cls, name, None)
+
+ # so, if it's anything that's not ORM
+ # mapped, assume we should invoke the
+ # declared_attr
+ if not isinstance(ret, InspectionAttr):
+ ret = obj.fget()
+ else:
+ # access attribute using normal class access.
+ # if the declared attr already took place
+ # on a superclass that is mapped, then
+ # this is no longer a declared_attr, it will
+ # be the InstrumentedAttribute
+ ret = getattr(cls, name)
+
+ # correct for proxies created from hybrid_property
+ # or similar. note there is no known case that
+ # produces nested proxies, so we are only
+ # looking one level deep right now.
+
+ if (
+ isinstance(ret, InspectionAttr)
+ and attr_is_internal_proxy(ret)
+ and not isinstance(
+ ret.original_property, MapperProperty
+ )
+ ):
+ ret = ret.descriptor
+
+ collected_attributes[name] = column_copies[obj] = (
+ ret
+ )
+
+ if (
+ isinstance(ret, (Column, MapperProperty))
+ and ret.doc is None
+ ):
+ ret.doc = obj.__doc__
+
+ self._collect_annotation(
+ name,
+ obj._collect_return_annotation(),
+ base,
+ True,
+ obj,
+ )
+ elif _is_mapped_annotation(annotation, cls, base):
+ # Mapped annotation without any object.
+ # product_column_copies should have handled this.
+ # if future support for other MapperProperty,
+ # then test if this name is already handled and
+ # otherwise proceed to generate.
+ if not fixed_table:
+ assert (
+ name in collected_attributes
+ or attribute_is_overridden(name, None)
+ )
+ continue
+ else:
+ # here, the attribute is some other kind of
+ # property that we assume is not part of the
+ # declarative mapping. however, check for some
+ # more common mistakes
+ self._warn_for_decl_attributes(base, name, obj)
+ elif is_dataclass_field and (
+ name not in clsdict_view or clsdict_view[name] is not obj
+ ):
+ # here, we are definitely looking at the target class
+ # and not a superclass. this is currently a
+ # dataclass-only path. if the name is only
+ # a dataclass field and isn't in local cls.__dict__,
+ # put the object there.
+ # assert that the dataclass-enabled resolver agrees
+ # with what we are seeing
+
+ assert not attribute_is_overridden(name, obj)
+
+ if _is_declarative_props(obj):
+ obj = obj.fget()
+
+ collected_attributes[name] = obj
+ self._collect_annotation(
+ name, annotation, base, False, obj
+ )
+ else:
+ collected_annotation = self._collect_annotation(
+ name, annotation, base, None, obj
+ )
+ is_mapped = (
+ collected_annotation is not None
+ and collected_annotation.mapped_container is not None
+ )
+ generated_obj = (
+ collected_annotation.attr_value
+ if collected_annotation is not None
+ else obj
+ )
+ if obj is None and not fixed_table and is_mapped:
+ collected_attributes[name] = (
+ generated_obj
+ if generated_obj is not None
+ else MappedColumn()
+ )
+ elif name in clsdict_view:
+ collected_attributes[name] = obj
+ # else if the name is not in the cls.__dict__,
+ # don't collect it as an attribute.
+ # we will see the annotation only, which is meaningful
+ # both for mapping and dataclasses setup
+
+ if inherited_table_args and not tablename:
+ table_args = None
+
+ self.table_args = table_args
+ self.tablename = tablename
+ self.mapper_args_fn = mapper_args_fn
+
+ def _setup_dataclasses_transforms(self) -> None:
+ dataclass_setup_arguments = self.dataclass_setup_arguments
+ if not dataclass_setup_arguments:
+ return
+
+ # can't use is_dataclass since it uses hasattr
+ if "__dataclass_fields__" in self.cls.__dict__:
+ raise exc.InvalidRequestError(
+ f"Class {self.cls} is already a dataclass; ensure that "
+ "base classes / decorator styles of establishing dataclasses "
+ "are not being mixed. "
+ "This can happen if a class that inherits from "
+ "'MappedAsDataclass', even indirectly, is been mapped with "
+ "'@registry.mapped_as_dataclass'"
+ )
+
+ warn_for_non_dc_attrs = collections.defaultdict(list)
+
+ def _allow_dataclass_field(
+ key: str, originating_class: Type[Any]
+ ) -> bool:
+ if (
+ originating_class is not self.cls
+ and "__dataclass_fields__" not in originating_class.__dict__
+ ):
+ warn_for_non_dc_attrs[originating_class].append(key)
+
+ return True
+
+ manager = instrumentation.manager_of_class(self.cls)
+ assert manager is not None
+
+ field_list = [
+ _AttributeOptions._get_arguments_for_make_dataclass(
+ key,
+ anno,
+ mapped_container,
+ self.collected_attributes.get(key, _NoArg.NO_ARG),
+ )
+ for key, anno, mapped_container in (
+ (
+ key,
+ mapped_anno if mapped_anno else raw_anno,
+ mapped_container,
+ )
+ for key, (
+ raw_anno,
+ mapped_container,
+ mapped_anno,
+ is_dc,
+ attr_value,
+ originating_module,
+ originating_class,
+ ) in self.collected_annotations.items()
+ if _allow_dataclass_field(key, originating_class)
+ and (
+ key not in self.collected_attributes
+ # issue #9226; check for attributes that we've collected
+ # which are already instrumented, which we would assume
+ # mean we are in an ORM inheritance mapping and this
+ # attribute is already mapped on the superclass. Under
+ # no circumstance should any QueryableAttribute be sent to
+ # the dataclass() function; anything that's mapped should
+ # be Field and that's it
+ or not isinstance(
+ self.collected_attributes[key], QueryableAttribute
+ )
+ )
+ )
+ ]
+
+ if warn_for_non_dc_attrs:
+ for (
+ originating_class,
+ non_dc_attrs,
+ ) in warn_for_non_dc_attrs.items():
+ util.warn_deprecated(
+ f"When transforming {self.cls} to a dataclass, "
+ f"attribute(s) "
+ f"{', '.join(repr(key) for key in non_dc_attrs)} "
+ f"originates from superclass "
+ f"{originating_class}, which is not a dataclass. This "
+ f"usage is deprecated and will raise an error in "
+ f"SQLAlchemy 2.1. When declaring SQLAlchemy Declarative "
+ f"Dataclasses, ensure that all mixin classes and other "
+ f"superclasses which include attributes are also a "
+ f"subclass of MappedAsDataclass.",
+ "2.0",
+ code="dcmx",
+ )
+
+ annotations = {}
+ defaults = {}
+ for item in field_list:
+ if len(item) == 2:
+ name, tp = item
+ elif len(item) == 3:
+ name, tp, spec = item
+ defaults[name] = spec
+ else:
+ assert False
+ annotations[name] = tp
+
+ for k, v in defaults.items():
+ setattr(self.cls, k, v)
+
+ self._apply_dataclasses_to_any_class(
+ dataclass_setup_arguments, self.cls, annotations
+ )
+
+ @classmethod
+ def _update_annotations_for_non_mapped_class(
+ cls, klass: Type[_O]
+ ) -> Mapping[str, _AnnotationScanType]:
+ cls_annotations = util.get_annotations(klass)
+
+ new_anno = {}
+ for name, annotation in cls_annotations.items():
+ if _is_mapped_annotation(annotation, klass, klass):
+ extracted = _extract_mapped_subtype(
+ annotation,
+ klass,
+ klass.__module__,
+ name,
+ type(None),
+ required=False,
+ is_dataclass_field=False,
+ expect_mapped=False,
+ )
+ if extracted:
+ inner, _ = extracted
+ new_anno[name] = inner
+ else:
+ new_anno[name] = annotation
+ return new_anno
+
+ @classmethod
+ def _apply_dataclasses_to_any_class(
+ cls,
+ dataclass_setup_arguments: _DataclassArguments,
+ klass: Type[_O],
+ use_annotations: Mapping[str, _AnnotationScanType],
+ ) -> None:
+ cls._assert_dc_arguments(dataclass_setup_arguments)
+
+ dataclass_callable = dataclass_setup_arguments["dataclass_callable"]
+ if dataclass_callable is _NoArg.NO_ARG:
+ dataclass_callable = dataclasses.dataclass
+
+ restored: Optional[Any]
+
+ if use_annotations:
+ # apply constructed annotations that should look "normal" to a
+ # dataclasses callable, based on the fields present. This
+ # means remove the Mapped[] container and ensure all Field
+ # entries have an annotation
+ restored = getattr(klass, "__annotations__", None)
+ klass.__annotations__ = cast("Dict[str, Any]", use_annotations)
+ else:
+ restored = None
+
+ try:
+ dataclass_callable(
+ klass,
+ **{
+ k: v
+ for k, v in dataclass_setup_arguments.items()
+ if v is not _NoArg.NO_ARG and k != "dataclass_callable"
+ },
+ )
+ except (TypeError, ValueError) as ex:
+ raise exc.InvalidRequestError(
+ f"Python dataclasses error encountered when creating "
+ f"dataclass for {klass.__name__!r}: "
+ f"{ex!r}. Please refer to Python dataclasses "
+ "documentation for additional information.",
+ code="dcte",
+ ) from ex
+ finally:
+ # restore original annotations outside of the dataclasses
+ # process; for mixins and __abstract__ superclasses, SQLAlchemy
+ # Declarative will need to see the Mapped[] container inside the
+ # annotations in order to map subclasses
+ if use_annotations:
+ if restored is None:
+ del klass.__annotations__
+ else:
+ klass.__annotations__ = restored
+
+ @classmethod
+ def _assert_dc_arguments(cls, arguments: _DataclassArguments) -> None:
+ allowed = {
+ "init",
+ "repr",
+ "order",
+ "eq",
+ "unsafe_hash",
+ "kw_only",
+ "match_args",
+ "dataclass_callable",
+ }
+ disallowed_args = set(arguments).difference(allowed)
+ if disallowed_args:
+ msg = ", ".join(f"{arg!r}" for arg in sorted(disallowed_args))
+ raise exc.ArgumentError(
+ f"Dataclass argument(s) {msg} are not accepted"
+ )
+
+ def _collect_annotation(
+ self,
+ name: str,
+ raw_annotation: _AnnotationScanType,
+ originating_class: Type[Any],
+ expect_mapped: Optional[bool],
+ attr_value: Any,
+ ) -> Optional[_CollectedAnnotation]:
+ if name in self.collected_annotations:
+ return self.collected_annotations[name]
+
+ if raw_annotation is None:
+ return None
+
+ is_dataclass = self.is_dataclass_prior_to_mapping
+ allow_unmapped = self.allow_unmapped_annotations
+
+ if expect_mapped is None:
+ is_dataclass_field = isinstance(attr_value, dataclasses.Field)
+ expect_mapped = (
+ not is_dataclass_field
+ and not allow_unmapped
+ and (
+ attr_value is None
+ or isinstance(attr_value, _MappedAttribute)
+ )
+ )
+ else:
+ is_dataclass_field = False
+
+ is_dataclass_field = False
+ extracted = _extract_mapped_subtype(
+ raw_annotation,
+ self.cls,
+ originating_class.__module__,
+ name,
+ type(attr_value),
+ required=False,
+ is_dataclass_field=is_dataclass_field,
+ expect_mapped=expect_mapped
+ and not is_dataclass, # self.allow_dataclass_fields,
+ )
+
+ if extracted is None:
+ # ClassVar can come out here
+ return None
+
+ extracted_mapped_annotation, mapped_container = extracted
+
+ if attr_value is None and not is_literal(extracted_mapped_annotation):
+ for elem in typing_get_args(extracted_mapped_annotation):
+ if isinstance(elem, str) or is_fwd_ref(
+ elem, check_generic=True
+ ):
+ elem = de_stringify_annotation(
+ self.cls,
+ elem,
+ originating_class.__module__,
+ include_generic=True,
+ )
+ # look in Annotated[...] for an ORM construct,
+ # such as Annotated[int, mapped_column(primary_key=True)]
+ if isinstance(elem, _IntrospectsAnnotations):
+ attr_value = elem.found_in_pep593_annotated()
+
+ self.collected_annotations[name] = ca = _CollectedAnnotation(
+ raw_annotation,
+ mapped_container,
+ extracted_mapped_annotation,
+ is_dataclass,
+ attr_value,
+ originating_class.__module__,
+ originating_class,
+ )
+ return ca
+
+ def _warn_for_decl_attributes(
+ self, cls: Type[Any], key: str, c: Any
+ ) -> None:
+ if isinstance(c, expression.ColumnElement):
+ util.warn(
+ f"Attribute '{key}' on class {cls} appears to "
+ "be a non-schema SQLAlchemy expression "
+ "object; this won't be part of the declarative mapping. "
+ "To map arbitrary expressions, use ``column_property()`` "
+ "or a similar function such as ``deferred()``, "
+ "``query_expression()`` etc. "
+ )
+
+ def _produce_column_copies(
+ self,
+ attributes_for_class: Callable[
+ [], Iterable[Tuple[str, Any, Any, bool]]
+ ],
+ attribute_is_overridden: Callable[[str, Any], bool],
+ fixed_table: bool,
+ originating_class: Type[Any],
+ ) -> Dict[str, Union[Column[Any], MappedColumn[Any]]]:
+ cls = self.cls
+ dict_ = self.clsdict_view
+ locally_collected_attributes = {}
+ column_copies = self.column_copies
+ # copy mixin columns to the mapped class
+
+ for name, obj, annotation, is_dataclass in attributes_for_class():
+ if (
+ not fixed_table
+ and obj is None
+ and _is_mapped_annotation(annotation, cls, originating_class)
+ ):
+ # obj is None means this is the annotation only path
+
+ if attribute_is_overridden(name, obj):
+ # perform same "overridden" check as we do for
+ # Column/MappedColumn, this is how a mixin col is not
+ # applied to an inherited subclass that does not have
+ # the mixin. the anno-only path added here for
+ # #9564
+ continue
+
+ collected_annotation = self._collect_annotation(
+ name, annotation, originating_class, True, obj
+ )
+ obj = (
+ collected_annotation.attr_value
+ if collected_annotation is not None
+ else obj
+ )
+ if obj is None:
+ obj = MappedColumn()
+
+ locally_collected_attributes[name] = obj
+ setattr(cls, name, obj)
+
+ elif isinstance(obj, (Column, MappedColumn)):
+ if attribute_is_overridden(name, obj):
+ # if column has been overridden
+ # (like by the InstrumentedAttribute of the
+ # superclass), skip. don't collect the annotation
+ # either (issue #8718)
+ continue
+
+ collected_annotation = self._collect_annotation(
+ name, annotation, originating_class, True, obj
+ )
+ obj = (
+ collected_annotation.attr_value
+ if collected_annotation is not None
+ else obj
+ )
+
+ if name not in dict_ and not (
+ "__table__" in dict_
+ and (getattr(obj, "name", None) or name)
+ in dict_["__table__"].c
+ ):
+ if obj.foreign_keys:
+ for fk in obj.foreign_keys:
+ if (
+ fk._table_column is not None
+ and fk._table_column.table is None
+ ):
+ raise exc.InvalidRequestError(
+ "Columns with foreign keys to "
+ "non-table-bound "
+ "columns must be declared as "
+ "@declared_attr callables "
+ "on declarative mixin classes. "
+ "For dataclass "
+ "field() objects, use a lambda:."
+ )
+
+ column_copies[obj] = copy_ = obj._copy()
+
+ locally_collected_attributes[name] = copy_
+ setattr(cls, name, copy_)
+
+ return locally_collected_attributes
+
+ def _extract_mappable_attributes(self) -> None:
+ cls = self.cls
+ collected_attributes = self.collected_attributes
+
+ our_stuff = self.properties
+
+ _include_dunders = self._include_dunders
+
+ late_mapped = _get_immediate_cls_attr(
+ cls, "_sa_decl_prepare_nocascade", strict=True
+ )
+
+ allow_unmapped_annotations = self.allow_unmapped_annotations
+ expect_annotations_wo_mapped = (
+ allow_unmapped_annotations or self.is_dataclass_prior_to_mapping
+ )
+
+ look_for_dataclass_things = bool(self.dataclass_setup_arguments)
+
+ for k in list(collected_attributes):
+ if k in _include_dunders:
+ continue
+
+ value = collected_attributes[k]
+
+ if _is_declarative_props(value):
+ # @declared_attr in collected_attributes only occurs here for a
+ # @declared_attr that's directly on the mapped class;
+ # for a mixin, these have already been evaluated
+ if value._cascading:
+ util.warn(
+ "Use of @declared_attr.cascading only applies to "
+ "Declarative 'mixin' and 'abstract' classes. "
+ "Currently, this flag is ignored on mapped class "
+ "%s" % self.cls
+ )
+
+ value = getattr(cls, k)
+
+ elif (
+ isinstance(value, QueryableAttribute)
+ and value.class_ is not cls
+ and value.key != k
+ ):
+ # detect a QueryableAttribute that's already mapped being
+ # assigned elsewhere in userland, turn into a synonym()
+ value = SynonymProperty(value.key)
+ setattr(cls, k, value)
+
+ if (
+ isinstance(value, tuple)
+ and len(value) == 1
+ and isinstance(value[0], (Column, _MappedAttribute))
+ ):
+ util.warn(
+ "Ignoring declarative-like tuple value of attribute "
+ "'%s': possibly a copy-and-paste error with a comma "
+ "accidentally placed at the end of the line?" % k
+ )
+ continue
+ elif look_for_dataclass_things and isinstance(
+ value, dataclasses.Field
+ ):
+ # we collected a dataclass Field; dataclasses would have
+ # set up the correct state on the class
+ continue
+ elif not isinstance(value, (Column, _DCAttributeOptions)):
+ # using @declared_attr for some object that
+ # isn't Column/MapperProperty/_DCAttributeOptions; remove
+ # from the clsdict_view
+ # and place the evaluated value onto the class.
+ collected_attributes.pop(k)
+ self._warn_for_decl_attributes(cls, k, value)
+ if not late_mapped:
+ setattr(cls, k, value)
+ continue
+ # we expect to see the name 'metadata' in some valid cases;
+ # however at this point we see it's assigned to something trying
+ # to be mapped, so raise for that.
+ # TODO: should "registry" here be also? might be too late
+ # to change that now (2.0 betas)
+ elif k in ("metadata",):
+ raise exc.InvalidRequestError(
+ f"Attribute name '{k}' is reserved when using the "
+ "Declarative API."
+ )
+ elif isinstance(value, Column):
+ _undefer_column_name(
+ k, self.column_copies.get(value, value) # type: ignore
+ )
+ else:
+ if isinstance(value, _IntrospectsAnnotations):
+ (
+ annotation,
+ mapped_container,
+ extracted_mapped_annotation,
+ is_dataclass,
+ attr_value,
+ originating_module,
+ originating_class,
+ ) = self.collected_annotations.get(
+ k, (None, None, None, False, None, None, None)
+ )
+
+ # issue #8692 - don't do any annotation interpretation if
+ # an annotation were present and a container such as
+ # Mapped[] etc. were not used. If annotation is None,
+ # do declarative_scan so that the property can raise
+ # for required
+ if (
+ mapped_container is not None
+ or annotation is None
+ # issue #10516: need to do declarative_scan even with
+ # a non-Mapped annotation if we are doing
+ # __allow_unmapped__, for things like col.name
+ # assignment
+ or allow_unmapped_annotations
+ ):
+ try:
+ value.declarative_scan(
+ self,
+ self.registry,
+ cls,
+ originating_module,
+ k,
+ mapped_container,
+ annotation,
+ extracted_mapped_annotation,
+ is_dataclass,
+ )
+ except NameError as ne:
+ raise exc.ArgumentError(
+ f"Could not resolve all types within mapped "
+ f'annotation: "{annotation}". Ensure all '
+ f"types are written correctly and are "
+ f"imported within the module in use."
+ ) from ne
+ else:
+ # assert that we were expecting annotations
+ # without Mapped[] were going to be passed.
+ # otherwise an error should have been raised
+ # by util._extract_mapped_subtype before we got here.
+ assert expect_annotations_wo_mapped
+
+ if isinstance(value, _DCAttributeOptions):
+ if (
+ value._has_dataclass_arguments
+ and not look_for_dataclass_things
+ ):
+ if isinstance(value, MapperProperty):
+ argnames = [
+ "init",
+ "default_factory",
+ "repr",
+ "default",
+ ]
+ else:
+ argnames = ["init", "default_factory", "repr"]
+
+ args = {
+ a
+ for a in argnames
+ if getattr(
+ value._attribute_options, f"dataclasses_{a}"
+ )
+ is not _NoArg.NO_ARG
+ }
+
+ raise exc.ArgumentError(
+ f"Attribute '{k}' on class {cls} includes "
+ f"dataclasses argument(s): "
+ f"{', '.join(sorted(repr(a) for a in args))} but "
+ f"class does not specify "
+ "SQLAlchemy native dataclass configuration."
+ )
+
+ if not isinstance(value, (MapperProperty, _MapsColumns)):
+ # filter for _DCAttributeOptions objects that aren't
+ # MapperProperty / mapped_column(). Currently this
+ # includes AssociationProxy. pop it from the things
+ # we're going to map and set it up as a descriptor
+ # on the class.
+ collected_attributes.pop(k)
+
+ # Assoc Prox (or other descriptor object that may
+ # use _DCAttributeOptions) is usually here, except if
+ # 1. we're a
+ # dataclass, dataclasses would have removed the
+ # attr here or 2. assoc proxy is coming from a
+ # superclass, we want it to be direct here so it
+ # tracks state or 3. assoc prox comes from
+ # declared_attr, uncommon case
+ setattr(cls, k, value)
+ continue
+
+ our_stuff[k] = value
+
+ def _extract_declared_columns(self) -> None:
+ our_stuff = self.properties
+
+ # extract columns from the class dict
+ declared_columns = self.declared_columns
+ column_ordering = self.column_ordering
+ name_to_prop_key = collections.defaultdict(set)
+
+ for key, c in list(our_stuff.items()):
+ if isinstance(c, _MapsColumns):
+ mp_to_assign = c.mapper_property_to_assign
+ if mp_to_assign:
+ our_stuff[key] = mp_to_assign
+ else:
+ # if no mapper property to assign, this currently means
+ # this is a MappedColumn that will produce a Column for us
+ del our_stuff[key]
+
+ for col, sort_order in c.columns_to_assign:
+ if not isinstance(c, CompositeProperty):
+ name_to_prop_key[col.name].add(key)
+ declared_columns.add(col)
+
+ # we would assert this, however we want the below
+ # warning to take effect instead. See #9630
+ # assert col not in column_ordering
+
+ column_ordering[col] = sort_order
+
+ # if this is a MappedColumn and the attribute key we
+ # have is not what the column has for its key, map the
+ # Column explicitly under the attribute key name.
+ # otherwise, Mapper will map it under the column key.
+ if mp_to_assign is None and key != col.key:
+ our_stuff[key] = col
+ elif isinstance(c, Column):
+ # undefer previously occurred here, and now occurs earlier.
+ # ensure every column we get here has been named
+ assert c.name is not None
+ name_to_prop_key[c.name].add(key)
+ declared_columns.add(c)
+ # if the column is the same name as the key,
+ # remove it from the explicit properties dict.
+ # the normal rules for assigning column-based properties
+ # will take over, including precedence of columns
+ # in multi-column ColumnProperties.
+ if key == c.key:
+ del our_stuff[key]
+
+ for name, keys in name_to_prop_key.items():
+ if len(keys) > 1:
+ util.warn(
+ "On class %r, Column object %r named "
+ "directly multiple times, "
+ "only one will be used: %s. "
+ "Consider using orm.synonym instead"
+ % (self.classname, name, (", ".join(sorted(keys))))
+ )
+
+ def _setup_table(self, table: Optional[FromClause] = None) -> None:
+ cls = self.cls
+ cls_as_Decl = cast("MappedClassProtocol[Any]", cls)
+
+ tablename = self.tablename
+ table_args = self.table_args
+ clsdict_view = self.clsdict_view
+ declared_columns = self.declared_columns
+ column_ordering = self.column_ordering
+
+ manager = attributes.manager_of_class(cls)
+
+ if "__table__" not in clsdict_view and table is None:
+ if hasattr(cls, "__table_cls__"):
+ table_cls = cast(
+ Type[Table],
+ util.unbound_method_to_callable(cls.__table_cls__), # type: ignore # noqa: E501
+ )
+ else:
+ table_cls = Table
+
+ if tablename is not None:
+ args: Tuple[Any, ...] = ()
+ table_kw: Dict[str, Any] = {}
+
+ if table_args:
+ if isinstance(table_args, dict):
+ table_kw = table_args
+ elif isinstance(table_args, tuple):
+ if isinstance(table_args[-1], dict):
+ args, table_kw = table_args[0:-1], table_args[-1]
+ else:
+ args = table_args
+
+ autoload_with = clsdict_view.get("__autoload_with__")
+ if autoload_with:
+ table_kw["autoload_with"] = autoload_with
+
+ autoload = clsdict_view.get("__autoload__")
+ if autoload:
+ table_kw["autoload"] = True
+
+ sorted_columns = sorted(
+ declared_columns,
+ key=lambda c: column_ordering.get(c, 0),
+ )
+ table = self.set_cls_attribute(
+ "__table__",
+ table_cls(
+ tablename,
+ self._metadata_for_cls(manager),
+ *sorted_columns,
+ *args,
+ **table_kw,
+ ),
+ )
+ else:
+ if table is None:
+ table = cls_as_Decl.__table__
+ if declared_columns:
+ for c in declared_columns:
+ if not table.c.contains_column(c):
+ raise exc.ArgumentError(
+ "Can't add additional column %r when "
+ "specifying __table__" % c.key
+ )
+
+ self.local_table = table
+
+ def _metadata_for_cls(self, manager: ClassManager[Any]) -> MetaData:
+ meta: Optional[MetaData] = getattr(self.cls, "metadata", None)
+ if meta is not None:
+ return meta
+ else:
+ return manager.registry.metadata
+
+ def _setup_inheriting_mapper(self, mapper_kw: _MapperKwArgs) -> None:
+ cls = self.cls
+
+ inherits = mapper_kw.get("inherits", None)
+
+ if inherits is None:
+ # since we search for classical mappings now, search for
+ # multiple mapped bases as well and raise an error.
+ inherits_search = []
+ for base_ in cls.__bases__:
+ c = _resolve_for_abstract_or_classical(base_)
+ if c is None:
+ continue
+
+ if _is_supercls_for_inherits(c) and c not in inherits_search:
+ inherits_search.append(c)
+
+ if inherits_search:
+ if len(inherits_search) > 1:
+ raise exc.InvalidRequestError(
+ "Class %s has multiple mapped bases: %r"
+ % (cls, inherits_search)
+ )
+ inherits = inherits_search[0]
+ elif isinstance(inherits, Mapper):
+ inherits = inherits.class_
+
+ self.inherits = inherits
+
+ clsdict_view = self.clsdict_view
+ if "__table__" not in clsdict_view and self.tablename is None:
+ self.single = True
+
+ def _setup_inheriting_columns(self, mapper_kw: _MapperKwArgs) -> None:
+ table = self.local_table
+ cls = self.cls
+ table_args = self.table_args
+ declared_columns = self.declared_columns
+
+ if (
+ table is None
+ and self.inherits is None
+ and not _get_immediate_cls_attr(cls, "__no_table__")
+ ):
+ raise exc.InvalidRequestError(
+ "Class %r does not have a __table__ or __tablename__ "
+ "specified and does not inherit from an existing "
+ "table-mapped class." % cls
+ )
+ elif self.inherits:
+ inherited_mapper_or_config = _declared_mapping_info(self.inherits)
+ assert inherited_mapper_or_config is not None
+ inherited_table = inherited_mapper_or_config.local_table
+ inherited_persist_selectable = (
+ inherited_mapper_or_config.persist_selectable
+ )
+
+ if table is None:
+ # single table inheritance.
+ # ensure no table args
+ if table_args:
+ raise exc.ArgumentError(
+ "Can't place __table_args__ on an inherited class "
+ "with no table."
+ )
+
+ # add any columns declared here to the inherited table.
+ if declared_columns and not isinstance(inherited_table, Table):
+ raise exc.ArgumentError(
+ f"Can't declare columns on single-table-inherited "
+ f"subclass {self.cls}; superclass {self.inherits} "
+ "is not mapped to a Table"
+ )
+
+ for col in declared_columns:
+ assert inherited_table is not None
+ if col.name in inherited_table.c:
+ if inherited_table.c[col.name] is col:
+ continue
+ raise exc.ArgumentError(
+ f"Column '{col}' on class {cls.__name__} "
+ f"conflicts with existing column "
+ f"'{inherited_table.c[col.name]}'. If using "
+ f"Declarative, consider using the "
+ "use_existing_column parameter of mapped_column() "
+ "to resolve conflicts."
+ )
+ if col.primary_key:
+ raise exc.ArgumentError(
+ "Can't place primary key columns on an inherited "
+ "class with no table."
+ )
+
+ if TYPE_CHECKING:
+ assert isinstance(inherited_table, Table)
+
+ inherited_table.append_column(col)
+ if (
+ inherited_persist_selectable is not None
+ and inherited_persist_selectable is not inherited_table
+ ):
+ inherited_persist_selectable._refresh_for_new_column(
+ col
+ )
+
+ def _prepare_mapper_arguments(self, mapper_kw: _MapperKwArgs) -> None:
+ properties = self.properties
+
+ if self.mapper_args_fn:
+ mapper_args = self.mapper_args_fn()
+ else:
+ mapper_args = {}
+
+ if mapper_kw:
+ mapper_args.update(mapper_kw)
+
+ if "properties" in mapper_args:
+ properties = dict(properties)
+ properties.update(mapper_args["properties"])
+
+ # make sure that column copies are used rather
+ # than the original columns from any mixins
+ for k in ("version_id_col", "polymorphic_on"):
+ if k in mapper_args:
+ v = mapper_args[k]
+ mapper_args[k] = self.column_copies.get(v, v)
+
+ if "primary_key" in mapper_args:
+ mapper_args["primary_key"] = [
+ self.column_copies.get(v, v)
+ for v in util.to_list(mapper_args["primary_key"])
+ ]
+
+ if "inherits" in mapper_args:
+ inherits_arg = mapper_args["inherits"]
+ if isinstance(inherits_arg, Mapper):
+ inherits_arg = inherits_arg.class_
+
+ if inherits_arg is not self.inherits:
+ raise exc.InvalidRequestError(
+ "mapper inherits argument given for non-inheriting "
+ "class %s" % (mapper_args["inherits"])
+ )
+
+ if self.inherits:
+ mapper_args["inherits"] = self.inherits
+
+ if self.inherits and not mapper_args.get("concrete", False):
+ # note the superclass is expected to have a Mapper assigned and
+ # not be a deferred config, as this is called within map()
+ inherited_mapper = class_mapper(self.inherits, False)
+ inherited_table = inherited_mapper.local_table
+
+ # single or joined inheritance
+ # exclude any cols on the inherited table which are
+ # not mapped on the parent class, to avoid
+ # mapping columns specific to sibling/nephew classes
+ if "exclude_properties" not in mapper_args:
+ mapper_args["exclude_properties"] = exclude_properties = {
+ c.key
+ for c in inherited_table.c
+ if c not in inherited_mapper._columntoproperty
+ }.union(inherited_mapper.exclude_properties or ())
+ exclude_properties.difference_update(
+ [c.key for c in self.declared_columns]
+ )
+
+ # look through columns in the current mapper that
+ # are keyed to a propname different than the colname
+ # (if names were the same, we'd have popped it out above,
+ # in which case the mapper makes this combination).
+ # See if the superclass has a similar column property.
+ # If so, join them together.
+ for k, col in list(properties.items()):
+ if not isinstance(col, expression.ColumnElement):
+ continue
+ if k in inherited_mapper._props:
+ p = inherited_mapper._props[k]
+ if isinstance(p, ColumnProperty):
+ # note here we place the subclass column
+ # first. See [ticket:1892] for background.
+ properties[k] = [col] + p.columns
+ result_mapper_args = mapper_args.copy()
+ result_mapper_args["properties"] = properties
+ self.mapper_args = result_mapper_args
+
+ def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
+ self._prepare_mapper_arguments(mapper_kw)
+ if hasattr(self.cls, "__mapper_cls__"):
+ mapper_cls = cast(
+ "Type[Mapper[Any]]",
+ util.unbound_method_to_callable(
+ self.cls.__mapper_cls__ # type: ignore
+ ),
+ )
+ else:
+ mapper_cls = Mapper
+
+ return self.set_cls_attribute(
+ "__mapper__",
+ mapper_cls(self.cls, self.local_table, **self.mapper_args),
+ )
+
+
+@util.preload_module("sqlalchemy.orm.decl_api")
+def _as_dc_declaredattr(
+ field_metadata: Mapping[str, Any], sa_dataclass_metadata_key: str
+) -> Any:
+ # wrap lambdas inside dataclass fields inside an ad-hoc declared_attr.
+ # we can't write it because field.metadata is immutable :( so we have
+ # to go through extra trouble to compare these
+ decl_api = util.preloaded.orm_decl_api
+ obj = field_metadata[sa_dataclass_metadata_key]
+ if callable(obj) and not isinstance(obj, decl_api.declared_attr):
+ return decl_api.declared_attr(obj)
+ else:
+ return obj
+
+
+class _DeferredMapperConfig(_ClassScanMapperConfig):
+ _cls: weakref.ref[Type[Any]]
+
+ is_deferred = True
+
+ _configs: util.OrderedDict[
+ weakref.ref[Type[Any]], _DeferredMapperConfig
+ ] = util.OrderedDict()
+
+ def _early_mapping(self, mapper_kw: _MapperKwArgs) -> None:
+ pass
+
+ # mypy disallows plain property override of variable
+ @property # type: ignore
+ def cls(self) -> Type[Any]:
+ return self._cls() # type: ignore
+
+ @cls.setter
+ def cls(self, class_: Type[Any]) -> None:
+ self._cls = weakref.ref(class_, self._remove_config_cls)
+ self._configs[self._cls] = self
+
+ @classmethod
+ def _remove_config_cls(cls, ref: weakref.ref[Type[Any]]) -> None:
+ cls._configs.pop(ref, None)
+
+ @classmethod
+ def has_cls(cls, class_: Type[Any]) -> bool:
+ # 2.6 fails on weakref if class_ is an old style class
+ return isinstance(class_, type) and weakref.ref(class_) in cls._configs
+
+ @classmethod
+ def raise_unmapped_for_cls(cls, class_: Type[Any]) -> NoReturn:
+ if hasattr(class_, "_sa_raise_deferred_config"):
+ class_._sa_raise_deferred_config()
+
+ raise orm_exc.UnmappedClassError(
+ class_,
+ msg=(
+ f"Class {orm_exc._safe_cls_name(class_)} has a deferred "
+ "mapping on it. It is not yet usable as a mapped class."
+ ),
+ )
+
+ @classmethod
+ def config_for_cls(cls, class_: Type[Any]) -> _DeferredMapperConfig:
+ return cls._configs[weakref.ref(class_)]
+
+ @classmethod
+ def classes_for_base(
+ cls, base_cls: Type[Any], sort: bool = True
+ ) -> List[_DeferredMapperConfig]:
+ classes_for_base = [
+ m
+ for m, cls_ in [(m, m.cls) for m in cls._configs.values()]
+ if cls_ is not None and issubclass(cls_, base_cls)
+ ]
+
+ if not sort:
+ return classes_for_base
+
+ all_m_by_cls = {m.cls: m for m in classes_for_base}
+
+ tuples: List[Tuple[_DeferredMapperConfig, _DeferredMapperConfig]] = []
+ for m_cls in all_m_by_cls:
+ tuples.extend(
+ (all_m_by_cls[base_cls], all_m_by_cls[m_cls])
+ for base_cls in m_cls.__bases__
+ if base_cls in all_m_by_cls
+ )
+ return list(topological.sort(tuples, classes_for_base))
+
+ def map(self, mapper_kw: _MapperKwArgs = util.EMPTY_DICT) -> Mapper[Any]:
+ self._configs.pop(self._cls, None)
+ return super().map(mapper_kw)
+
+
+def _add_attribute(
+ cls: Type[Any], key: str, value: MapperProperty[Any]
+) -> None:
+ """add an attribute to an existing declarative class.
+
+ This runs through the logic to determine MapperProperty,
+ adds it to the Mapper, adds a column to the mapped Table, etc.
+
+ """
+
+ if "__mapper__" in cls.__dict__:
+ mapped_cls = cast("MappedClassProtocol[Any]", cls)
+
+ def _table_or_raise(mc: MappedClassProtocol[Any]) -> Table:
+ if isinstance(mc.__table__, Table):
+ return mc.__table__
+ raise exc.InvalidRequestError(
+ f"Cannot add a new attribute to mapped class {mc.__name__!r} "
+ "because it's not mapped against a table."
+ )
+
+ if isinstance(value, Column):
+ _undefer_column_name(key, value)
+ _table_or_raise(mapped_cls).append_column(
+ value, replace_existing=True
+ )
+ mapped_cls.__mapper__.add_property(key, value)
+ elif isinstance(value, _MapsColumns):
+ mp = value.mapper_property_to_assign
+ for col, _ in value.columns_to_assign:
+ _undefer_column_name(key, col)
+ _table_or_raise(mapped_cls).append_column(
+ col, replace_existing=True
+ )
+ if not mp:
+ mapped_cls.__mapper__.add_property(key, col)
+ if mp:
+ mapped_cls.__mapper__.add_property(key, mp)
+ elif isinstance(value, MapperProperty):
+ mapped_cls.__mapper__.add_property(key, value)
+ elif isinstance(value, QueryableAttribute) and value.key != key:
+ # detect a QueryableAttribute that's already mapped being
+ # assigned elsewhere in userland, turn into a synonym()
+ value = SynonymProperty(value.key)
+ mapped_cls.__mapper__.add_property(key, value)
+ else:
+ type.__setattr__(cls, key, value)
+ mapped_cls.__mapper__._expire_memoizations()
+ else:
+ type.__setattr__(cls, key, value)
+
+
+def _del_attribute(cls: Type[Any], key: str) -> None:
+ if (
+ "__mapper__" in cls.__dict__
+ and key in cls.__dict__
+ and not cast(
+ "MappedClassProtocol[Any]", cls
+ ).__mapper__._dispose_called
+ ):
+ value = cls.__dict__[key]
+ if isinstance(
+ value, (Column, _MapsColumns, MapperProperty, QueryableAttribute)
+ ):
+ raise NotImplementedError(
+ "Can't un-map individual mapped attributes on a mapped class."
+ )
+ else:
+ type.__delattr__(cls, key)
+ cast(
+ "MappedClassProtocol[Any]", cls
+ ).__mapper__._expire_memoizations()
+ else:
+ type.__delattr__(cls, key)
+
+
+def _declarative_constructor(self: Any, **kwargs: Any) -> None:
+ """A simple constructor that allows initialization from kwargs.
+
+ Sets attributes on the constructed instance using the names and
+ values in ``kwargs``.
+
+ Only keys that are present as
+ attributes of the instance's class are allowed. These could be,
+ for example, any mapped columns or relationships.
+ """
+ cls_ = type(self)
+ for k in kwargs:
+ if not hasattr(cls_, k):
+ raise TypeError(
+ "%r is an invalid keyword argument for %s" % (k, cls_.__name__)
+ )
+ setattr(self, k, kwargs[k])
+
+
+_declarative_constructor.__name__ = "__init__"
+
+
+def _undefer_column_name(key: str, column: Column[Any]) -> None:
+ if column.key is None:
+ column.key = key
+ if column.name is None:
+ column.name = key