summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/sql/coercions.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/sql/coercions.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/sql/coercions.py1389
1 files changed, 1389 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/sql/coercions.py b/venv/lib/python3.11/site-packages/sqlalchemy/sql/coercions.py
new file mode 100644
index 0000000..22d6091
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/sqlalchemy/sql/coercions.py
@@ -0,0 +1,1389 @@
+# sql/coercions.py
+# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: allow-untyped-defs, allow-untyped-calls
+
+from __future__ import annotations
+
+import collections.abc as collections_abc
+import numbers
+import re
+import typing
+from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Dict
+from typing import Iterable
+from typing import Iterator
+from typing import List
+from typing import NoReturn
+from typing import Optional
+from typing import overload
+from typing import Sequence
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+
+from . import operators
+from . import roles
+from . import visitors
+from ._typing import is_from_clause
+from .base import ExecutableOption
+from .base import Options
+from .cache_key import HasCacheKey
+from .visitors import Visitable
+from .. import exc
+from .. import inspection
+from .. import util
+from ..util.typing import Literal
+
+if typing.TYPE_CHECKING:
+ # elements lambdas schema selectable are set by __init__
+ from . import elements
+ from . import lambdas
+ from . import schema
+ from . import selectable
+ from ._typing import _ColumnExpressionArgument
+ from ._typing import _ColumnsClauseArgument
+ from ._typing import _DDLColumnArgument
+ from ._typing import _DMLTableArgument
+ from ._typing import _FromClauseArgument
+ from .dml import _DMLTableElement
+ from .elements import BindParameter
+ from .elements import ClauseElement
+ from .elements import ColumnClause
+ from .elements import ColumnElement
+ from .elements import DQLDMLClauseElement
+ from .elements import NamedColumn
+ from .elements import SQLCoreOperations
+ from .schema import Column
+ from .selectable import _ColumnsClauseElement
+ from .selectable import _JoinTargetProtocol
+ from .selectable import FromClause
+ from .selectable import HasCTE
+ from .selectable import SelectBase
+ from .selectable import Subquery
+ from .visitors import _TraverseCallableType
+
+_SR = TypeVar("_SR", bound=roles.SQLRole)
+_F = TypeVar("_F", bound=Callable[..., Any])
+_StringOnlyR = TypeVar("_StringOnlyR", bound=roles.StringRole)
+_T = TypeVar("_T", bound=Any)
+
+
+def _is_literal(element):
+ """Return whether or not the element is a "literal" in the context
+ of a SQL expression construct.
+
+ """
+
+ return not isinstance(
+ element,
+ (Visitable, schema.SchemaEventTarget),
+ ) and not hasattr(element, "__clause_element__")
+
+
+def _deep_is_literal(element):
+ """Return whether or not the element is a "literal" in the context
+ of a SQL expression construct.
+
+ does a deeper more esoteric check than _is_literal. is used
+ for lambda elements that have to distinguish values that would
+ be bound vs. not without any context.
+
+ """
+
+ if isinstance(element, collections_abc.Sequence) and not isinstance(
+ element, str
+ ):
+ for elem in element:
+ if not _deep_is_literal(elem):
+ return False
+ else:
+ return True
+
+ return (
+ not isinstance(
+ element,
+ (
+ Visitable,
+ schema.SchemaEventTarget,
+ HasCacheKey,
+ Options,
+ util.langhelpers.symbol,
+ ),
+ )
+ and not hasattr(element, "__clause_element__")
+ and (
+ not isinstance(element, type)
+ or not issubclass(element, HasCacheKey)
+ )
+ )
+
+
+def _document_text_coercion(
+ paramname: str, meth_rst: str, param_rst: str
+) -> Callable[[_F], _F]:
+ return util.add_parameter_text(
+ paramname,
+ (
+ ".. warning:: "
+ "The %s argument to %s can be passed as a Python string argument, "
+ "which will be treated "
+ "as **trusted SQL text** and rendered as given. **DO NOT PASS "
+ "UNTRUSTED INPUT TO THIS PARAMETER**."
+ )
+ % (param_rst, meth_rst),
+ )
+
+
+def _expression_collection_was_a_list(
+ attrname: str,
+ fnname: str,
+ args: Union[Sequence[_T], Sequence[Sequence[_T]]],
+) -> Sequence[_T]:
+ if args and isinstance(args[0], (list, set, dict)) and len(args) == 1:
+ if isinstance(args[0], list):
+ raise exc.ArgumentError(
+ f'The "{attrname}" argument to {fnname}(), when '
+ "referring to a sequence "
+ "of items, is now passed as a series of positional "
+ "elements, rather than as a list. "
+ )
+ return cast("Sequence[_T]", args[0])
+
+ return cast("Sequence[_T]", args)
+
+
+@overload
+def expect(
+ role: Type[roles.TruncatedLabelRole],
+ element: Any,
+ **kw: Any,
+) -> str: ...
+
+
+@overload
+def expect(
+ role: Type[roles.DMLColumnRole],
+ element: Any,
+ *,
+ as_key: Literal[True] = ...,
+ **kw: Any,
+) -> str: ...
+
+
+@overload
+def expect(
+ role: Type[roles.LiteralValueRole],
+ element: Any,
+ **kw: Any,
+) -> BindParameter[Any]: ...
+
+
+@overload
+def expect(
+ role: Type[roles.DDLReferredColumnRole],
+ element: Any,
+ **kw: Any,
+) -> Column[Any]: ...
+
+
+@overload
+def expect(
+ role: Type[roles.DDLConstraintColumnRole],
+ element: Any,
+ **kw: Any,
+) -> Union[Column[Any], str]: ...
+
+
+@overload
+def expect(
+ role: Type[roles.StatementOptionRole],
+ element: Any,
+ **kw: Any,
+) -> DQLDMLClauseElement: ...
+
+
+@overload
+def expect(
+ role: Type[roles.LabeledColumnExprRole[Any]],
+ element: _ColumnExpressionArgument[_T],
+ **kw: Any,
+) -> NamedColumn[_T]: ...
+
+
+@overload
+def expect(
+ role: Union[
+ Type[roles.ExpressionElementRole[Any]],
+ Type[roles.LimitOffsetRole],
+ Type[roles.WhereHavingRole],
+ ],
+ element: _ColumnExpressionArgument[_T],
+ **kw: Any,
+) -> ColumnElement[_T]: ...
+
+
+@overload
+def expect(
+ role: Union[
+ Type[roles.ExpressionElementRole[Any]],
+ Type[roles.LimitOffsetRole],
+ Type[roles.WhereHavingRole],
+ Type[roles.OnClauseRole],
+ Type[roles.ColumnArgumentRole],
+ ],
+ element: Any,
+ **kw: Any,
+) -> ColumnElement[Any]: ...
+
+
+@overload
+def expect(
+ role: Type[roles.DMLTableRole],
+ element: _DMLTableArgument,
+ **kw: Any,
+) -> _DMLTableElement: ...
+
+
+@overload
+def expect(
+ role: Type[roles.HasCTERole],
+ element: HasCTE,
+ **kw: Any,
+) -> HasCTE: ...
+
+
+@overload
+def expect(
+ role: Type[roles.SelectStatementRole],
+ element: SelectBase,
+ **kw: Any,
+) -> SelectBase: ...
+
+
+@overload
+def expect(
+ role: Type[roles.FromClauseRole],
+ element: _FromClauseArgument,
+ **kw: Any,
+) -> FromClause: ...
+
+
+@overload
+def expect(
+ role: Type[roles.FromClauseRole],
+ element: SelectBase,
+ *,
+ explicit_subquery: Literal[True] = ...,
+ **kw: Any,
+) -> Subquery: ...
+
+
+@overload
+def expect(
+ role: Type[roles.ColumnsClauseRole],
+ element: _ColumnsClauseArgument[Any],
+ **kw: Any,
+) -> _ColumnsClauseElement: ...
+
+
+@overload
+def expect(
+ role: Type[roles.JoinTargetRole],
+ element: _JoinTargetProtocol,
+ **kw: Any,
+) -> _JoinTargetProtocol: ...
+
+
+# catchall for not-yet-implemented overloads
+@overload
+def expect(
+ role: Type[_SR],
+ element: Any,
+ **kw: Any,
+) -> Any: ...
+
+
+def expect(
+ role: Type[_SR],
+ element: Any,
+ *,
+ apply_propagate_attrs: Optional[ClauseElement] = None,
+ argname: Optional[str] = None,
+ post_inspect: bool = False,
+ disable_inspection: bool = False,
+ **kw: Any,
+) -> Any:
+ if (
+ role.allows_lambda
+ # note callable() will not invoke a __getattr__() method, whereas
+ # hasattr(obj, "__call__") will. by keeping the callable() check here
+ # we prevent most needless calls to hasattr() and therefore
+ # __getattr__(), which is present on ColumnElement.
+ and callable(element)
+ and hasattr(element, "__code__")
+ ):
+ return lambdas.LambdaElement(
+ element,
+ role,
+ lambdas.LambdaOptions(**kw),
+ apply_propagate_attrs=apply_propagate_attrs,
+ )
+
+ # major case is that we are given a ClauseElement already, skip more
+ # elaborate logic up front if possible
+ impl = _impl_lookup[role]
+
+ original_element = element
+
+ if not isinstance(
+ element,
+ (
+ elements.CompilerElement,
+ schema.SchemaItem,
+ schema.FetchedValue,
+ lambdas.PyWrapper,
+ ),
+ ):
+ resolved = None
+
+ if impl._resolve_literal_only:
+ resolved = impl._literal_coercion(element, **kw)
+ else:
+ original_element = element
+
+ is_clause_element = False
+
+ # this is a special performance optimization for ORM
+ # joins used by JoinTargetImpl that we don't go through the
+ # work of creating __clause_element__() when we only need the
+ # original QueryableAttribute, as the former will do clause
+ # adaption and all that which is just thrown away here.
+ if (
+ impl._skip_clauseelement_for_target_match
+ and isinstance(element, role)
+ and hasattr(element, "__clause_element__")
+ ):
+ is_clause_element = True
+ else:
+ while hasattr(element, "__clause_element__"):
+ is_clause_element = True
+
+ if not getattr(element, "is_clause_element", False):
+ element = element.__clause_element__()
+ else:
+ break
+
+ if not is_clause_element:
+ if impl._use_inspection and not disable_inspection:
+ insp = inspection.inspect(element, raiseerr=False)
+ if insp is not None:
+ if post_inspect:
+ insp._post_inspect
+ try:
+ resolved = insp.__clause_element__()
+ except AttributeError:
+ impl._raise_for_expected(original_element, argname)
+
+ if resolved is None:
+ resolved = impl._literal_coercion(
+ element, argname=argname, **kw
+ )
+ else:
+ resolved = element
+ elif isinstance(element, lambdas.PyWrapper):
+ resolved = element._sa__py_wrapper_literal(**kw)
+ else:
+ resolved = element
+
+ if apply_propagate_attrs is not None:
+ if typing.TYPE_CHECKING:
+ assert isinstance(resolved, (SQLCoreOperations, ClauseElement))
+
+ if not apply_propagate_attrs._propagate_attrs and getattr(
+ resolved, "_propagate_attrs", None
+ ):
+ apply_propagate_attrs._propagate_attrs = resolved._propagate_attrs
+
+ if impl._role_class in resolved.__class__.__mro__:
+ if impl._post_coercion:
+ resolved = impl._post_coercion(
+ resolved,
+ argname=argname,
+ original_element=original_element,
+ **kw,
+ )
+ return resolved
+ else:
+ return impl._implicit_coercions(
+ original_element, resolved, argname=argname, **kw
+ )
+
+
+def expect_as_key(
+ role: Type[roles.DMLColumnRole], element: Any, **kw: Any
+) -> str:
+ kw.pop("as_key", None)
+ return expect(role, element, as_key=True, **kw)
+
+
+def expect_col_expression_collection(
+ role: Type[roles.DDLConstraintColumnRole],
+ expressions: Iterable[_DDLColumnArgument],
+) -> Iterator[
+ Tuple[
+ Union[str, Column[Any]],
+ Optional[ColumnClause[Any]],
+ Optional[str],
+ Optional[Union[Column[Any], str]],
+ ]
+]:
+ for expr in expressions:
+ strname = None
+ column = None
+
+ resolved: Union[Column[Any], str] = expect(role, expr)
+ if isinstance(resolved, str):
+ assert isinstance(expr, str)
+ strname = resolved = expr
+ else:
+ cols: List[Column[Any]] = []
+ col_append: _TraverseCallableType[Column[Any]] = cols.append
+ visitors.traverse(resolved, {}, {"column": col_append})
+ if cols:
+ column = cols[0]
+ add_element = column if column is not None else strname
+
+ yield resolved, column, strname, add_element
+
+
+class RoleImpl:
+ __slots__ = ("_role_class", "name", "_use_inspection")
+
+ def _literal_coercion(self, element, **kw):
+ raise NotImplementedError()
+
+ _post_coercion: Any = None
+ _resolve_literal_only = False
+ _skip_clauseelement_for_target_match = False
+
+ def __init__(self, role_class):
+ self._role_class = role_class
+ self.name = role_class._role_name
+ self._use_inspection = issubclass(role_class, roles.UsesInspection)
+
+ def _implicit_coercions(
+ self,
+ element: Any,
+ resolved: Any,
+ argname: Optional[str] = None,
+ **kw: Any,
+ ) -> Any:
+ self._raise_for_expected(element, argname, resolved)
+
+ def _raise_for_expected(
+ self,
+ element: Any,
+ argname: Optional[str] = None,
+ resolved: Optional[Any] = None,
+ advice: Optional[str] = None,
+ code: Optional[str] = None,
+ err: Optional[Exception] = None,
+ **kw: Any,
+ ) -> NoReturn:
+ if resolved is not None and resolved is not element:
+ got = "%r object resolved from %r object" % (resolved, element)
+ else:
+ got = repr(element)
+
+ if argname:
+ msg = "%s expected for argument %r; got %s." % (
+ self.name,
+ argname,
+ got,
+ )
+ else:
+ msg = "%s expected, got %s." % (self.name, got)
+
+ if advice:
+ msg += " " + advice
+
+ raise exc.ArgumentError(msg, code=code) from err
+
+
+class _Deannotate:
+ __slots__ = ()
+
+ def _post_coercion(self, resolved, **kw):
+ from .util import _deep_deannotate
+
+ return _deep_deannotate(resolved)
+
+
+class _StringOnly:
+ __slots__ = ()
+
+ _resolve_literal_only = True
+
+
+class _ReturnsStringKey(RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(self, element, resolved, argname=None, **kw):
+ if isinstance(element, str):
+ return element
+ else:
+ self._raise_for_expected(element, argname, resolved)
+
+ def _literal_coercion(self, element, **kw):
+ return element
+
+
+class _ColumnCoercions(RoleImpl):
+ __slots__ = ()
+
+ def _warn_for_scalar_subquery_coercion(self):
+ util.warn(
+ "implicitly coercing SELECT object to scalar subquery; "
+ "please use the .scalar_subquery() method to produce a scalar "
+ "subquery.",
+ )
+
+ def _implicit_coercions(self, element, resolved, argname=None, **kw):
+ original_element = element
+ if not getattr(resolved, "is_clause_element", False):
+ self._raise_for_expected(original_element, argname, resolved)
+ elif resolved._is_select_base:
+ self._warn_for_scalar_subquery_coercion()
+ return resolved.scalar_subquery()
+ elif resolved._is_from_clause and isinstance(
+ resolved, selectable.Subquery
+ ):
+ self._warn_for_scalar_subquery_coercion()
+ return resolved.element.scalar_subquery()
+ elif self._role_class.allows_lambda and resolved._is_lambda_element:
+ return resolved
+ else:
+ self._raise_for_expected(original_element, argname, resolved)
+
+
+def _no_text_coercion(
+ element: Any,
+ argname: Optional[str] = None,
+ exc_cls: Type[exc.SQLAlchemyError] = exc.ArgumentError,
+ extra: Optional[str] = None,
+ err: Optional[Exception] = None,
+) -> NoReturn:
+ raise exc_cls(
+ "%(extra)sTextual SQL expression %(expr)r %(argname)sshould be "
+ "explicitly declared as text(%(expr)r)"
+ % {
+ "expr": util.ellipses_string(element),
+ "argname": "for argument %s" % (argname,) if argname else "",
+ "extra": "%s " % extra if extra else "",
+ }
+ ) from err
+
+
+class _NoTextCoercion(RoleImpl):
+ __slots__ = ()
+
+ def _literal_coercion(self, element, argname=None, **kw):
+ if isinstance(element, str) and issubclass(
+ elements.TextClause, self._role_class
+ ):
+ _no_text_coercion(element, argname)
+ else:
+ self._raise_for_expected(element, argname)
+
+
+class _CoerceLiterals(RoleImpl):
+ __slots__ = ()
+ _coerce_consts = False
+ _coerce_star = False
+ _coerce_numerics = False
+
+ def _text_coercion(self, element, argname=None):
+ return _no_text_coercion(element, argname)
+
+ def _literal_coercion(self, element, argname=None, **kw):
+ if isinstance(element, str):
+ if self._coerce_star and element == "*":
+ return elements.ColumnClause("*", is_literal=True)
+ else:
+ return self._text_coercion(element, argname, **kw)
+
+ if self._coerce_consts:
+ if element is None:
+ return elements.Null()
+ elif element is False:
+ return elements.False_()
+ elif element is True:
+ return elements.True_()
+
+ if self._coerce_numerics and isinstance(element, (numbers.Number)):
+ return elements.ColumnClause(str(element), is_literal=True)
+
+ self._raise_for_expected(element, argname)
+
+
+class LiteralValueImpl(RoleImpl):
+ _resolve_literal_only = True
+
+ def _implicit_coercions(
+ self,
+ element,
+ resolved,
+ argname,
+ type_=None,
+ literal_execute=False,
+ **kw,
+ ):
+ if not _is_literal(resolved):
+ self._raise_for_expected(
+ element, resolved=resolved, argname=argname, **kw
+ )
+
+ return elements.BindParameter(
+ None,
+ element,
+ type_=type_,
+ unique=True,
+ literal_execute=literal_execute,
+ )
+
+ def _literal_coercion(self, element, argname=None, type_=None, **kw):
+ return element
+
+
+class _SelectIsNotFrom(RoleImpl):
+ __slots__ = ()
+
+ def _raise_for_expected(
+ self,
+ element: Any,
+ argname: Optional[str] = None,
+ resolved: Optional[Any] = None,
+ advice: Optional[str] = None,
+ code: Optional[str] = None,
+ err: Optional[Exception] = None,
+ **kw: Any,
+ ) -> NoReturn:
+ if (
+ not advice
+ and isinstance(element, roles.SelectStatementRole)
+ or isinstance(resolved, roles.SelectStatementRole)
+ ):
+ advice = (
+ "To create a "
+ "FROM clause from a %s object, use the .subquery() method."
+ % (resolved.__class__ if resolved is not None else element,)
+ )
+ code = "89ve"
+ else:
+ code = None
+
+ super()._raise_for_expected(
+ element,
+ argname=argname,
+ resolved=resolved,
+ advice=advice,
+ code=code,
+ err=err,
+ **kw,
+ )
+ # never reached
+ assert False
+
+
+class HasCacheKeyImpl(RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self,
+ element: Any,
+ resolved: Any,
+ argname: Optional[str] = None,
+ **kw: Any,
+ ) -> Any:
+ if isinstance(element, HasCacheKey):
+ return element
+ else:
+ self._raise_for_expected(element, argname, resolved)
+
+ def _literal_coercion(self, element, **kw):
+ return element
+
+
+class ExecutableOptionImpl(RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self,
+ element: Any,
+ resolved: Any,
+ argname: Optional[str] = None,
+ **kw: Any,
+ ) -> Any:
+ if isinstance(element, ExecutableOption):
+ return element
+ else:
+ self._raise_for_expected(element, argname, resolved)
+
+ def _literal_coercion(self, element, **kw):
+ return element
+
+
+class ExpressionElementImpl(_ColumnCoercions, RoleImpl):
+ __slots__ = ()
+
+ def _literal_coercion(
+ self, element, name=None, type_=None, argname=None, is_crud=False, **kw
+ ):
+ if (
+ element is None
+ and not is_crud
+ and (type_ is None or not type_.should_evaluate_none)
+ ):
+ # TODO: there's no test coverage now for the
+ # "should_evaluate_none" part of this, as outside of "crud" this
+ # codepath is not normally used except in some special cases
+ return elements.Null()
+ else:
+ try:
+ return elements.BindParameter(
+ name, element, type_, unique=True, _is_crud=is_crud
+ )
+ except exc.ArgumentError as err:
+ self._raise_for_expected(element, err=err)
+
+ def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
+ # select uses implicit coercion with warning instead of raising
+ if isinstance(element, selectable.Values):
+ advice = (
+ "To create a column expression from a VALUES clause, "
+ "use the .scalar_values() method."
+ )
+ elif isinstance(element, roles.AnonymizedFromClauseRole):
+ advice = (
+ "To create a column expression from a FROM clause row "
+ "as a whole, use the .table_valued() method."
+ )
+ else:
+ advice = None
+
+ return super()._raise_for_expected(
+ element, argname=argname, resolved=resolved, advice=advice, **kw
+ )
+
+
+class BinaryElementImpl(ExpressionElementImpl, RoleImpl):
+ __slots__ = ()
+
+ def _literal_coercion(
+ self, element, expr, operator, bindparam_type=None, argname=None, **kw
+ ):
+ try:
+ return expr._bind_param(operator, element, type_=bindparam_type)
+ except exc.ArgumentError as err:
+ self._raise_for_expected(element, err=err)
+
+ def _post_coercion(self, resolved, expr, bindparam_type=None, **kw):
+ if resolved.type._isnull and not expr.type._isnull:
+ resolved = resolved._with_binary_element_type(
+ bindparam_type if bindparam_type is not None else expr.type
+ )
+ return resolved
+
+
+class InElementImpl(RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self,
+ element: Any,
+ resolved: Any,
+ argname: Optional[str] = None,
+ **kw: Any,
+ ) -> Any:
+ if resolved._is_from_clause:
+ if (
+ isinstance(resolved, selectable.Alias)
+ and resolved.element._is_select_base
+ ):
+ self._warn_for_implicit_coercion(resolved)
+ return self._post_coercion(resolved.element, **kw)
+ else:
+ self._warn_for_implicit_coercion(resolved)
+ return self._post_coercion(resolved.select(), **kw)
+ else:
+ self._raise_for_expected(element, argname, resolved)
+
+ def _warn_for_implicit_coercion(self, elem):
+ util.warn(
+ "Coercing %s object into a select() for use in IN(); "
+ "please pass a select() construct explicitly"
+ % (elem.__class__.__name__)
+ )
+
+ def _literal_coercion(self, element, expr, operator, **kw):
+ if util.is_non_string_iterable(element):
+ non_literal_expressions: Dict[
+ Optional[operators.ColumnOperators],
+ operators.ColumnOperators,
+ ] = {}
+ element = list(element)
+ for o in element:
+ if not _is_literal(o):
+ if not isinstance(o, operators.ColumnOperators):
+ self._raise_for_expected(element, **kw)
+
+ else:
+ non_literal_expressions[o] = o
+ elif o is None:
+ non_literal_expressions[o] = elements.Null()
+
+ if non_literal_expressions:
+ return elements.ClauseList(
+ *[
+ (
+ non_literal_expressions[o]
+ if o in non_literal_expressions
+ else expr._bind_param(operator, o)
+ )
+ for o in element
+ ]
+ )
+ else:
+ return expr._bind_param(operator, element, expanding=True)
+
+ else:
+ self._raise_for_expected(element, **kw)
+
+ def _post_coercion(self, element, expr, operator, **kw):
+ if element._is_select_base:
+ # for IN, we are doing scalar_subquery() coercion without
+ # a warning
+ return element.scalar_subquery()
+ elif isinstance(element, elements.ClauseList):
+ assert not len(element.clauses) == 0
+ return element.self_group(against=operator)
+
+ elif isinstance(element, elements.BindParameter):
+ element = element._clone(maintain_key=True)
+ element.expanding = True
+ element.expand_op = operator
+
+ return element
+ elif isinstance(element, selectable.Values):
+ return element.scalar_values()
+ else:
+ return element
+
+
+class OnClauseImpl(_ColumnCoercions, RoleImpl):
+ __slots__ = ()
+
+ _coerce_consts = True
+
+ def _literal_coercion(
+ self, element, name=None, type_=None, argname=None, is_crud=False, **kw
+ ):
+ self._raise_for_expected(element)
+
+ def _post_coercion(self, resolved, original_element=None, **kw):
+ # this is a hack right now as we want to use coercion on an
+ # ORM InstrumentedAttribute, but we want to return the object
+ # itself if it is one, not its clause element.
+ # ORM context _join and _legacy_join() would need to be improved
+ # to look for annotations in a clause element form.
+ if isinstance(original_element, roles.JoinTargetRole):
+ return original_element
+ return resolved
+
+
+class WhereHavingImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl):
+ __slots__ = ()
+
+ _coerce_consts = True
+
+ def _text_coercion(self, element, argname=None):
+ return _no_text_coercion(element, argname)
+
+
+class StatementOptionImpl(_CoerceLiterals, RoleImpl):
+ __slots__ = ()
+
+ _coerce_consts = True
+
+ def _text_coercion(self, element, argname=None):
+ return elements.TextClause(element)
+
+
+class ColumnArgumentImpl(_NoTextCoercion, RoleImpl):
+ __slots__ = ()
+
+
+class ColumnArgumentOrKeyImpl(_ReturnsStringKey, RoleImpl):
+ __slots__ = ()
+
+
+class StrAsPlainColumnImpl(_CoerceLiterals, RoleImpl):
+ __slots__ = ()
+
+ def _text_coercion(self, element, argname=None):
+ return elements.ColumnClause(element)
+
+
+class ByOfImpl(_CoerceLiterals, _ColumnCoercions, RoleImpl, roles.ByOfRole):
+ __slots__ = ()
+
+ _coerce_consts = True
+
+ def _text_coercion(self, element, argname=None):
+ return elements._textual_label_reference(element)
+
+
+class OrderByImpl(ByOfImpl, RoleImpl):
+ __slots__ = ()
+
+ def _post_coercion(self, resolved, **kw):
+ if (
+ isinstance(resolved, self._role_class)
+ and resolved._order_by_label_element is not None
+ ):
+ return elements._label_reference(resolved)
+ else:
+ return resolved
+
+
+class GroupByImpl(ByOfImpl, RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self,
+ element: Any,
+ resolved: Any,
+ argname: Optional[str] = None,
+ **kw: Any,
+ ) -> Any:
+ if is_from_clause(resolved):
+ return elements.ClauseList(*resolved.c)
+ else:
+ return resolved
+
+
+class DMLColumnImpl(_ReturnsStringKey, RoleImpl):
+ __slots__ = ()
+
+ def _post_coercion(self, element, as_key=False, **kw):
+ if as_key:
+ return element.key
+ else:
+ return element
+
+
+class ConstExprImpl(RoleImpl):
+ __slots__ = ()
+
+ def _literal_coercion(self, element, argname=None, **kw):
+ if element is None:
+ return elements.Null()
+ elif element is False:
+ return elements.False_()
+ elif element is True:
+ return elements.True_()
+ else:
+ self._raise_for_expected(element, argname)
+
+
+class TruncatedLabelImpl(_StringOnly, RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self,
+ element: Any,
+ resolved: Any,
+ argname: Optional[str] = None,
+ **kw: Any,
+ ) -> Any:
+ if isinstance(element, str):
+ return resolved
+ else:
+ self._raise_for_expected(element, argname, resolved)
+
+ def _literal_coercion(self, element, argname=None, **kw):
+ """coerce the given value to :class:`._truncated_label`.
+
+ Existing :class:`._truncated_label` and
+ :class:`._anonymous_label` objects are passed
+ unchanged.
+ """
+
+ if isinstance(element, elements._truncated_label):
+ return element
+ else:
+ return elements._truncated_label(element)
+
+
+class DDLExpressionImpl(_Deannotate, _CoerceLiterals, RoleImpl):
+ __slots__ = ()
+
+ _coerce_consts = True
+
+ def _text_coercion(self, element, argname=None):
+ # see #5754 for why we can't easily deprecate this coercion.
+ # essentially expressions like postgresql_where would have to be
+ # text() as they come back from reflection and we don't want to
+ # have text() elements wired into the inspection dictionaries.
+ return elements.TextClause(element)
+
+
+class DDLConstraintColumnImpl(_Deannotate, _ReturnsStringKey, RoleImpl):
+ __slots__ = ()
+
+
+class DDLReferredColumnImpl(DDLConstraintColumnImpl):
+ __slots__ = ()
+
+
+class LimitOffsetImpl(RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self,
+ element: Any,
+ resolved: Any,
+ argname: Optional[str] = None,
+ **kw: Any,
+ ) -> Any:
+ if resolved is None:
+ return None
+ else:
+ self._raise_for_expected(element, argname, resolved)
+
+ def _literal_coercion(self, element, name, type_, **kw):
+ if element is None:
+ return None
+ else:
+ value = util.asint(element)
+ return selectable._OffsetLimitParam(
+ name, value, type_=type_, unique=True
+ )
+
+
+class LabeledColumnExprImpl(ExpressionElementImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self,
+ element: Any,
+ resolved: Any,
+ argname: Optional[str] = None,
+ **kw: Any,
+ ) -> Any:
+ if isinstance(resolved, roles.ExpressionElementRole):
+ return resolved.label(None)
+ else:
+ new = super()._implicit_coercions(
+ element, resolved, argname=argname, **kw
+ )
+ if isinstance(new, roles.ExpressionElementRole):
+ return new.label(None)
+ else:
+ self._raise_for_expected(element, argname, resolved)
+
+
+class ColumnsClauseImpl(_SelectIsNotFrom, _CoerceLiterals, RoleImpl):
+ __slots__ = ()
+
+ _coerce_consts = True
+ _coerce_numerics = True
+ _coerce_star = True
+
+ _guess_straight_column = re.compile(r"^\w\S*$", re.I)
+
+ def _raise_for_expected(
+ self, element, argname=None, resolved=None, advice=None, **kw
+ ):
+ if not advice and isinstance(element, list):
+ advice = (
+ f"Did you mean to say select("
+ f"{', '.join(repr(e) for e in element)})?"
+ )
+
+ return super()._raise_for_expected(
+ element, argname=argname, resolved=resolved, advice=advice, **kw
+ )
+
+ def _text_coercion(self, element, argname=None):
+ element = str(element)
+
+ guess_is_literal = not self._guess_straight_column.match(element)
+ raise exc.ArgumentError(
+ "Textual column expression %(column)r %(argname)sshould be "
+ "explicitly declared with text(%(column)r), "
+ "or use %(literal_column)s(%(column)r) "
+ "for more specificity"
+ % {
+ "column": util.ellipses_string(element),
+ "argname": "for argument %s" % (argname,) if argname else "",
+ "literal_column": (
+ "literal_column" if guess_is_literal else "column"
+ ),
+ }
+ )
+
+
+class ReturnsRowsImpl(RoleImpl):
+ __slots__ = ()
+
+
+class StatementImpl(_CoerceLiterals, RoleImpl):
+ __slots__ = ()
+
+ def _post_coercion(self, resolved, original_element, argname=None, **kw):
+ if resolved is not original_element and not isinstance(
+ original_element, str
+ ):
+ # use same method as Connection uses; this will later raise
+ # ObjectNotExecutableError
+ try:
+ original_element._execute_on_connection
+ except AttributeError:
+ util.warn_deprecated(
+ "Object %r should not be used directly in a SQL statement "
+ "context, such as passing to methods such as "
+ "session.execute(). This usage will be disallowed in a "
+ "future release. "
+ "Please use Core select() / update() / delete() etc. "
+ "with Session.execute() and other statement execution "
+ "methods." % original_element,
+ "1.4",
+ )
+
+ return resolved
+
+ def _implicit_coercions(
+ self,
+ element: Any,
+ resolved: Any,
+ argname: Optional[str] = None,
+ **kw: Any,
+ ) -> Any:
+ if resolved._is_lambda_element:
+ return resolved
+ else:
+ return super()._implicit_coercions(
+ element, resolved, argname=argname, **kw
+ )
+
+
+class SelectStatementImpl(_NoTextCoercion, RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self,
+ element: Any,
+ resolved: Any,
+ argname: Optional[str] = None,
+ **kw: Any,
+ ) -> Any:
+ if resolved._is_text_clause:
+ return resolved.columns()
+ else:
+ self._raise_for_expected(element, argname, resolved)
+
+
+class HasCTEImpl(ReturnsRowsImpl):
+ __slots__ = ()
+
+
+class IsCTEImpl(RoleImpl):
+ __slots__ = ()
+
+
+class JoinTargetImpl(RoleImpl):
+ __slots__ = ()
+
+ _skip_clauseelement_for_target_match = True
+
+ def _literal_coercion(self, element, argname=None, **kw):
+ self._raise_for_expected(element, argname)
+
+ def _implicit_coercions(
+ self,
+ element: Any,
+ resolved: Any,
+ argname: Optional[str] = None,
+ legacy: bool = False,
+ **kw: Any,
+ ) -> Any:
+ if isinstance(element, roles.JoinTargetRole):
+ # note that this codepath no longer occurs as of
+ # #6550, unless JoinTargetImpl._skip_clauseelement_for_target_match
+ # were set to False.
+ return element
+ elif legacy and resolved._is_select_base:
+ util.warn_deprecated(
+ "Implicit coercion of SELECT and textual SELECT "
+ "constructs into FROM clauses is deprecated; please call "
+ ".subquery() on any Core select or ORM Query object in "
+ "order to produce a subquery object.",
+ version="1.4",
+ )
+ # TODO: doing _implicit_subquery here causes tests to fail,
+ # how was this working before? probably that ORM
+ # join logic treated it as a select and subquery would happen
+ # in _ORMJoin->Join
+ return resolved
+ else:
+ self._raise_for_expected(element, argname, resolved)
+
+
+class FromClauseImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self,
+ element: Any,
+ resolved: Any,
+ argname: Optional[str] = None,
+ explicit_subquery: bool = False,
+ allow_select: bool = True,
+ **kw: Any,
+ ) -> Any:
+ if resolved._is_select_base:
+ if explicit_subquery:
+ return resolved.subquery()
+ elif allow_select:
+ util.warn_deprecated(
+ "Implicit coercion of SELECT and textual SELECT "
+ "constructs into FROM clauses is deprecated; please call "
+ ".subquery() on any Core select or ORM Query object in "
+ "order to produce a subquery object.",
+ version="1.4",
+ )
+ return resolved._implicit_subquery
+ elif resolved._is_text_clause:
+ return resolved
+ else:
+ self._raise_for_expected(element, argname, resolved)
+
+ def _post_coercion(self, element, deannotate=False, **kw):
+ if deannotate:
+ return element._deannotate()
+ else:
+ return element
+
+
+class StrictFromClauseImpl(FromClauseImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self,
+ element: Any,
+ resolved: Any,
+ argname: Optional[str] = None,
+ explicit_subquery: bool = False,
+ allow_select: bool = False,
+ **kw: Any,
+ ) -> Any:
+ if resolved._is_select_base and allow_select:
+ util.warn_deprecated(
+ "Implicit coercion of SELECT and textual SELECT constructs "
+ "into FROM clauses is deprecated; please call .subquery() "
+ "on any Core select or ORM Query object in order to produce a "
+ "subquery object.",
+ version="1.4",
+ )
+ return resolved._implicit_subquery
+ else:
+ self._raise_for_expected(element, argname, resolved)
+
+
+class AnonymizedFromClauseImpl(StrictFromClauseImpl):
+ __slots__ = ()
+
+ def _post_coercion(self, element, flat=False, name=None, **kw):
+ assert name is None
+
+ return element._anonymous_fromclause(flat=flat)
+
+
+class DMLTableImpl(_SelectIsNotFrom, _NoTextCoercion, RoleImpl):
+ __slots__ = ()
+
+ def _post_coercion(self, element, **kw):
+ if "dml_table" in element._annotations:
+ return element._annotations["dml_table"]
+ else:
+ return element
+
+
+class DMLSelectImpl(_NoTextCoercion, RoleImpl):
+ __slots__ = ()
+
+ def _implicit_coercions(
+ self,
+ element: Any,
+ resolved: Any,
+ argname: Optional[str] = None,
+ **kw: Any,
+ ) -> Any:
+ if resolved._is_from_clause:
+ if (
+ isinstance(resolved, selectable.Alias)
+ and resolved.element._is_select_base
+ ):
+ return resolved.element
+ else:
+ return resolved.select()
+ else:
+ self._raise_for_expected(element, argname, resolved)
+
+
+class CompoundElementImpl(_NoTextCoercion, RoleImpl):
+ __slots__ = ()
+
+ def _raise_for_expected(self, element, argname=None, resolved=None, **kw):
+ if isinstance(element, roles.FromClauseRole):
+ if element._is_subquery:
+ advice = (
+ "Use the plain select() object without "
+ "calling .subquery() or .alias()."
+ )
+ else:
+ advice = (
+ "To SELECT from any FROM clause, use the .select() method."
+ )
+ else:
+ advice = None
+ return super()._raise_for_expected(
+ element, argname=argname, resolved=resolved, advice=advice, **kw
+ )
+
+
+_impl_lookup = {}
+
+
+for name in dir(roles):
+ cls = getattr(roles, name)
+ if name.endswith("Role"):
+ name = name.replace("Role", "Impl")
+ if name in globals():
+ impl = globals()[name](cls)
+ _impl_lookup[cls] = impl
+
+if not TYPE_CHECKING:
+ ee_impl = _impl_lookup[roles.ExpressionElementRole]
+
+ for py_type in (int, bool, str, float):
+ _impl_lookup[roles.ExpressionElementRole[py_type]] = ee_impl