summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/sql/crud.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/sql/crud.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/sql/crud.py1669
1 files changed, 0 insertions, 1669 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/sql/crud.py b/venv/lib/python3.11/site-packages/sqlalchemy/sql/crud.py
deleted file mode 100644
index 499a19d..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/sql/crud.py
+++ /dev/null
@@ -1,1669 +0,0 @@
-# sql/crud.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: allow-untyped-defs, allow-untyped-calls
-
-"""Functions used by compiler.py to determine the parameters rendered
-within INSERT and UPDATE statements.
-
-"""
-from __future__ import annotations
-
-import functools
-import operator
-from typing import Any
-from typing import Callable
-from typing import cast
-from typing import Dict
-from typing import Iterable
-from typing import List
-from typing import MutableMapping
-from typing import NamedTuple
-from typing import Optional
-from typing import overload
-from typing import Sequence
-from typing import Set
-from typing import Tuple
-from typing import TYPE_CHECKING
-from typing import Union
-
-from . import coercions
-from . import dml
-from . import elements
-from . import roles
-from .base import _DefaultDescriptionTuple
-from .dml import isinsert as _compile_state_isinsert
-from .elements import ColumnClause
-from .schema import default_is_clause_element
-from .schema import default_is_sequence
-from .selectable import Select
-from .selectable import TableClause
-from .. import exc
-from .. import util
-from ..util.typing import Literal
-
-if TYPE_CHECKING:
- from .compiler import _BindNameForColProtocol
- from .compiler import SQLCompiler
- from .dml import _DMLColumnElement
- from .dml import DMLState
- from .dml import ValuesBase
- from .elements import ColumnElement
- from .elements import KeyedColumnElement
- from .schema import _SQLExprDefault
- from .schema import Column
-
-REQUIRED = util.symbol(
- "REQUIRED",
- """
-Placeholder for the value within a :class:`.BindParameter`
-which is required to be present when the statement is passed
-to :meth:`_engine.Connection.execute`.
-
-This symbol is typically used when a :func:`_expression.insert`
-or :func:`_expression.update` statement is compiled without parameter
-values present.
-
-""",
-)
-
-
-def _as_dml_column(c: ColumnElement[Any]) -> ColumnClause[Any]:
- if not isinstance(c, ColumnClause):
- raise exc.CompileError(
- f"Can't create DML statement against column expression {c!r}"
- )
- return c
-
-
-_CrudParamElement = Tuple[
- "ColumnElement[Any]",
- str, # column name
- Optional[
- Union[str, "_SQLExprDefault"]
- ], # bound parameter string or SQL expression to apply
- Iterable[str],
-]
-_CrudParamElementStr = Tuple[
- "KeyedColumnElement[Any]",
- str, # column name
- str, # bound parameter string
- Iterable[str],
-]
-_CrudParamElementSQLExpr = Tuple[
- "ColumnClause[Any]",
- str,
- "_SQLExprDefault", # SQL expression to apply
- Iterable[str],
-]
-
-_CrudParamSequence = List[_CrudParamElement]
-
-
-class _CrudParams(NamedTuple):
- single_params: _CrudParamSequence
- all_multi_params: List[Sequence[_CrudParamElementStr]]
- is_default_metavalue_only: bool = False
- use_insertmanyvalues: bool = False
- use_sentinel_columns: Optional[Sequence[Column[Any]]] = None
-
-
-def _get_crud_params(
- compiler: SQLCompiler,
- stmt: ValuesBase,
- compile_state: DMLState,
- toplevel: bool,
- **kw: Any,
-) -> _CrudParams:
- """create a set of tuples representing column/string pairs for use
- in an INSERT or UPDATE statement.
-
- Also generates the Compiled object's postfetch, prefetch, and
- returning column collections, used for default handling and ultimately
- populating the CursorResult's prefetch_cols() and postfetch_cols()
- collections.
-
- """
-
- # note: the _get_crud_params() system was written with the notion in mind
- # that INSERT, UPDATE, DELETE are always the top level statement and
- # that there is only one of them. With the addition of CTEs that can
- # make use of DML, this assumption is no longer accurate; the DML
- # statement is not necessarily the top-level "row returning" thing
- # and it is also theoretically possible (fortunately nobody has asked yet)
- # to have a single statement with multiple DMLs inside of it via CTEs.
-
- # the current _get_crud_params() design doesn't accommodate these cases
- # right now. It "just works" for a CTE that has a single DML inside of
- # it, and for a CTE with multiple DML, it's not clear what would happen.
-
- # overall, the "compiler.XYZ" collections here would need to be in a
- # per-DML structure of some kind, and DefaultDialect would need to
- # navigate these collections on a per-statement basis, with additional
- # emphasis on the "toplevel returning data" statement. However we
- # still need to run through _get_crud_params() for all DML as we have
- # Python / SQL generated column defaults that need to be rendered.
-
- # if there is user need for this kind of thing, it's likely a post 2.0
- # kind of change as it would require deep changes to DefaultDialect
- # as well as here.
-
- compiler.postfetch = []
- compiler.insert_prefetch = []
- compiler.update_prefetch = []
- compiler.implicit_returning = []
-
- visiting_cte = kw.get("visiting_cte", None)
- if visiting_cte is not None:
- # for insert -> CTE -> insert, don't populate an incoming
- # _crud_accumulate_bind_names collection; the INSERT we process here
- # will not be inline within the VALUES of the enclosing INSERT as the
- # CTE is placed on the outside. See issue #9173
- kw.pop("accumulate_bind_names", None)
- assert (
- "accumulate_bind_names" not in kw
- ), "Don't know how to handle insert within insert without a CTE"
-
- # getters - these are normally just column.key,
- # but in the case of mysql multi-table update, the rules for
- # .key must conditionally take tablename into account
- (
- _column_as_key,
- _getattr_col_key,
- _col_bind_name,
- ) = _key_getters_for_crud_column(compiler, stmt, compile_state)
-
- compiler._get_bind_name_for_col = _col_bind_name
-
- if stmt._returning and stmt._return_defaults:
- raise exc.CompileError(
- "Can't compile statement that includes returning() and "
- "return_defaults() simultaneously"
- )
-
- if compile_state.isdelete:
- _setup_delete_return_defaults(
- compiler,
- stmt,
- compile_state,
- (),
- _getattr_col_key,
- _column_as_key,
- _col_bind_name,
- (),
- (),
- toplevel,
- kw,
- )
- return _CrudParams([], [])
-
- # no parameters in the statement, no parameters in the
- # compiled params - return binds for all columns
- if compiler.column_keys is None and compile_state._no_parameters:
- return _CrudParams(
- [
- (
- c,
- compiler.preparer.format_column(c),
- _create_bind_param(compiler, c, None, required=True),
- (c.key,),
- )
- for c in stmt.table.columns
- if not c._omit_from_statements
- ],
- [],
- )
-
- stmt_parameter_tuples: Optional[
- List[Tuple[Union[str, ColumnClause[Any]], Any]]
- ]
- spd: Optional[MutableMapping[_DMLColumnElement, Any]]
-
- if (
- _compile_state_isinsert(compile_state)
- and compile_state._has_multi_parameters
- ):
- mp = compile_state._multi_parameters
- assert mp is not None
- spd = mp[0]
- stmt_parameter_tuples = list(spd.items())
- spd_str_key = {_column_as_key(key) for key in spd}
- elif compile_state._ordered_values:
- spd = compile_state._dict_parameters
- stmt_parameter_tuples = compile_state._ordered_values
- assert spd is not None
- spd_str_key = {_column_as_key(key) for key in spd}
- elif compile_state._dict_parameters:
- spd = compile_state._dict_parameters
- stmt_parameter_tuples = list(spd.items())
- spd_str_key = {_column_as_key(key) for key in spd}
- else:
- stmt_parameter_tuples = spd = spd_str_key = None
-
- # if we have statement parameters - set defaults in the
- # compiled params
- if compiler.column_keys is None:
- parameters = {}
- elif stmt_parameter_tuples:
- assert spd_str_key is not None
- parameters = {
- _column_as_key(key): REQUIRED
- for key in compiler.column_keys
- if key not in spd_str_key
- }
- else:
- parameters = {
- _column_as_key(key): REQUIRED for key in compiler.column_keys
- }
-
- # create a list of column assignment clauses as tuples
- values: List[_CrudParamElement] = []
-
- if stmt_parameter_tuples is not None:
- _get_stmt_parameter_tuples_params(
- compiler,
- compile_state,
- parameters,
- stmt_parameter_tuples,
- _column_as_key,
- values,
- kw,
- )
-
- check_columns: Dict[str, ColumnClause[Any]] = {}
-
- # special logic that only occurs for multi-table UPDATE
- # statements
- if dml.isupdate(compile_state) and compile_state.is_multitable:
- _get_update_multitable_params(
- compiler,
- stmt,
- compile_state,
- stmt_parameter_tuples,
- check_columns,
- _col_bind_name,
- _getattr_col_key,
- values,
- kw,
- )
-
- if _compile_state_isinsert(compile_state) and stmt._select_names:
- # is an insert from select, is not a multiparams
-
- assert not compile_state._has_multi_parameters
-
- _scan_insert_from_select_cols(
- compiler,
- stmt,
- compile_state,
- parameters,
- _getattr_col_key,
- _column_as_key,
- _col_bind_name,
- check_columns,
- values,
- toplevel,
- kw,
- )
- use_insertmanyvalues = False
- use_sentinel_columns = None
- else:
- use_insertmanyvalues, use_sentinel_columns = _scan_cols(
- compiler,
- stmt,
- compile_state,
- parameters,
- _getattr_col_key,
- _column_as_key,
- _col_bind_name,
- check_columns,
- values,
- toplevel,
- kw,
- )
-
- if parameters and stmt_parameter_tuples:
- check = (
- set(parameters)
- .intersection(_column_as_key(k) for k, v in stmt_parameter_tuples)
- .difference(check_columns)
- )
- if check:
- raise exc.CompileError(
- "Unconsumed column names: %s"
- % (", ".join("%s" % (c,) for c in check))
- )
-
- is_default_metavalue_only = False
-
- if (
- _compile_state_isinsert(compile_state)
- and compile_state._has_multi_parameters
- ):
- # is a multiparams, is not an insert from a select
- assert not stmt._select_names
- multi_extended_values = _extend_values_for_multiparams(
- compiler,
- stmt,
- compile_state,
- cast(
- "Sequence[_CrudParamElementStr]",
- values,
- ),
- cast("Callable[..., str]", _column_as_key),
- kw,
- )
- return _CrudParams(values, multi_extended_values)
- elif (
- not values
- and compiler.for_executemany
- and compiler.dialect.supports_default_metavalue
- ):
- # convert an "INSERT DEFAULT VALUES"
- # into INSERT (firstcol) VALUES (DEFAULT) which can be turned
- # into an in-place multi values. This supports
- # insert_executemany_returning mode :)
- values = [
- (
- _as_dml_column(stmt.table.columns[0]),
- compiler.preparer.format_column(stmt.table.columns[0]),
- compiler.dialect.default_metavalue_token,
- (),
- )
- ]
- is_default_metavalue_only = True
-
- return _CrudParams(
- values,
- [],
- is_default_metavalue_only=is_default_metavalue_only,
- use_insertmanyvalues=use_insertmanyvalues,
- use_sentinel_columns=use_sentinel_columns,
- )
-
-
-@overload
-def _create_bind_param(
- compiler: SQLCompiler,
- col: ColumnElement[Any],
- value: Any,
- process: Literal[True] = ...,
- required: bool = False,
- name: Optional[str] = None,
- **kw: Any,
-) -> str: ...
-
-
-@overload
-def _create_bind_param(
- compiler: SQLCompiler,
- col: ColumnElement[Any],
- value: Any,
- **kw: Any,
-) -> str: ...
-
-
-def _create_bind_param(
- compiler: SQLCompiler,
- col: ColumnElement[Any],
- value: Any,
- process: bool = True,
- required: bool = False,
- name: Optional[str] = None,
- **kw: Any,
-) -> Union[str, elements.BindParameter[Any]]:
- if name is None:
- name = col.key
- bindparam = elements.BindParameter(
- name, value, type_=col.type, required=required
- )
- bindparam._is_crud = True
- if process:
- return bindparam._compiler_dispatch(compiler, **kw)
- else:
- return bindparam
-
-
-def _handle_values_anonymous_param(compiler, col, value, name, **kw):
- # the insert() and update() constructs as of 1.4 will now produce anonymous
- # bindparam() objects in the values() collections up front when given plain
- # literal values. This is so that cache key behaviors, which need to
- # produce bound parameters in deterministic order without invoking any
- # compilation here, can be applied to these constructs when they include
- # values() (but not yet multi-values, which are not included in caching
- # right now).
- #
- # in order to produce the desired "crud" style name for these parameters,
- # which will also be targetable in engine/default.py through the usual
- # conventions, apply our desired name to these unique parameters by
- # populating the compiler truncated names cache with the desired name,
- # rather than having
- # compiler.visit_bindparam()->compiler._truncated_identifier make up a
- # name. Saves on call counts also.
-
- # for INSERT/UPDATE that's a CTE, we don't need names to match to
- # external parameters and these would also conflict in the case where
- # multiple insert/update are combined together using CTEs
- is_cte = "visiting_cte" in kw
-
- if (
- not is_cte
- and value.unique
- and isinstance(value.key, elements._truncated_label)
- ):
- compiler.truncated_names[("bindparam", value.key)] = name
-
- if value.type._isnull:
- # either unique parameter, or other bound parameters that were
- # passed in directly
- # set type to that of the column unconditionally
- value = value._with_binary_element_type(col.type)
-
- return value._compiler_dispatch(compiler, **kw)
-
-
-def _key_getters_for_crud_column(
- compiler: SQLCompiler, stmt: ValuesBase, compile_state: DMLState
-) -> Tuple[
- Callable[[Union[str, ColumnClause[Any]]], Union[str, Tuple[str, str]]],
- Callable[[ColumnClause[Any]], Union[str, Tuple[str, str]]],
- _BindNameForColProtocol,
-]:
- if dml.isupdate(compile_state) and compile_state._extra_froms:
- # when extra tables are present, refer to the columns
- # in those extra tables as table-qualified, including in
- # dictionaries and when rendering bind param names.
- # the "main" table of the statement remains unqualified,
- # allowing the most compatibility with a non-multi-table
- # statement.
- _et = set(compile_state._extra_froms)
-
- c_key_role = functools.partial(
- coercions.expect_as_key, roles.DMLColumnRole
- )
-
- def _column_as_key(
- key: Union[ColumnClause[Any], str]
- ) -> Union[str, Tuple[str, str]]:
- str_key = c_key_role(key)
- if hasattr(key, "table") and key.table in _et:
- return (key.table.name, str_key) # type: ignore
- else:
- return str_key
-
- def _getattr_col_key(
- col: ColumnClause[Any],
- ) -> Union[str, Tuple[str, str]]:
- if col.table in _et:
- return (col.table.name, col.key) # type: ignore
- else:
- return col.key
-
- def _col_bind_name(col: ColumnClause[Any]) -> str:
- if col.table in _et:
- if TYPE_CHECKING:
- assert isinstance(col.table, TableClause)
- return "%s_%s" % (col.table.name, col.key)
- else:
- return col.key
-
- else:
- _column_as_key = functools.partial(
- coercions.expect_as_key, roles.DMLColumnRole
- )
- _getattr_col_key = _col_bind_name = operator.attrgetter("key") # type: ignore # noqa: E501
-
- return _column_as_key, _getattr_col_key, _col_bind_name
-
-
-def _scan_insert_from_select_cols(
- compiler,
- stmt,
- compile_state,
- parameters,
- _getattr_col_key,
- _column_as_key,
- _col_bind_name,
- check_columns,
- values,
- toplevel,
- kw,
-):
- cols = [stmt.table.c[_column_as_key(name)] for name in stmt._select_names]
-
- assert compiler.stack[-1]["selectable"] is stmt
-
- compiler.stack[-1]["insert_from_select"] = stmt.select
-
- add_select_cols: List[_CrudParamElementSQLExpr] = []
- if stmt.include_insert_from_select_defaults:
- col_set = set(cols)
- for col in stmt.table.columns:
- # omit columns that were not in the SELECT statement.
- # this will omit columns marked as omit_from_statements naturally,
- # as long as that col was not explicit in the SELECT.
- # if an omit_from_statements col has a "default" on it, then
- # we need to include it, as these defaults should still fire off.
- # but, if it has that default and it's the "sentinel" default,
- # we don't do sentinel default operations for insert_from_select
- # here so we again omit it.
- if (
- col not in col_set
- and col.default
- and not col.default.is_sentinel
- ):
- cols.append(col)
-
- for c in cols:
- col_key = _getattr_col_key(c)
- if col_key in parameters and col_key not in check_columns:
- parameters.pop(col_key)
- values.append((c, compiler.preparer.format_column(c), None, ()))
- else:
- _append_param_insert_select_hasdefault(
- compiler, stmt, c, add_select_cols, kw
- )
-
- if add_select_cols:
- values.extend(add_select_cols)
- ins_from_select = compiler.stack[-1]["insert_from_select"]
- if not isinstance(ins_from_select, Select):
- raise exc.CompileError(
- f"Can't extend statement for INSERT..FROM SELECT to include "
- f"additional default-holding column(s) "
- f"""{
- ', '.join(repr(key) for _, key, _, _ in add_select_cols)
- }. Convert the selectable to a subquery() first, or pass """
- "include_defaults=False to Insert.from_select() to skip these "
- "columns."
- )
- ins_from_select = ins_from_select._generate()
- # copy raw_columns
- ins_from_select._raw_columns = list(ins_from_select._raw_columns) + [
- expr for _, _, expr, _ in add_select_cols
- ]
- compiler.stack[-1]["insert_from_select"] = ins_from_select
-
-
-def _scan_cols(
- compiler,
- stmt,
- compile_state,
- parameters,
- _getattr_col_key,
- _column_as_key,
- _col_bind_name,
- check_columns,
- values,
- toplevel,
- kw,
-):
- (
- need_pks,
- implicit_returning,
- implicit_return_defaults,
- postfetch_lastrowid,
- use_insertmanyvalues,
- use_sentinel_columns,
- ) = _get_returning_modifiers(compiler, stmt, compile_state, toplevel)
-
- assert compile_state.isupdate or compile_state.isinsert
-
- if compile_state._parameter_ordering:
- parameter_ordering = [
- _column_as_key(key) for key in compile_state._parameter_ordering
- ]
- ordered_keys = set(parameter_ordering)
- cols = [
- stmt.table.c[key]
- for key in parameter_ordering
- if isinstance(key, str) and key in stmt.table.c
- ] + [c for c in stmt.table.c if c.key not in ordered_keys]
-
- else:
- cols = stmt.table.columns
-
- isinsert = _compile_state_isinsert(compile_state)
- if isinsert and not compile_state._has_multi_parameters:
- # new rules for #7998. fetch lastrowid or implicit returning
- # for autoincrement column even if parameter is NULL, for DBs that
- # override NULL param for primary key (sqlite, mysql/mariadb)
- autoincrement_col = stmt.table._autoincrement_column
- insert_null_pk_still_autoincrements = (
- compiler.dialect.insert_null_pk_still_autoincrements
- )
- else:
- autoincrement_col = insert_null_pk_still_autoincrements = None
-
- if stmt._supplemental_returning:
- supplemental_returning = set(stmt._supplemental_returning)
- else:
- supplemental_returning = set()
-
- compiler_implicit_returning = compiler.implicit_returning
-
- # TODO - see TODO(return_defaults_columns) below
- # cols_in_params = set()
-
- for c in cols:
- # scan through every column in the target table
-
- col_key = _getattr_col_key(c)
-
- if col_key in parameters and col_key not in check_columns:
- # parameter is present for the column. use that.
-
- _append_param_parameter(
- compiler,
- stmt,
- compile_state,
- c,
- col_key,
- parameters,
- _col_bind_name,
- implicit_returning,
- implicit_return_defaults,
- postfetch_lastrowid,
- values,
- autoincrement_col,
- insert_null_pk_still_autoincrements,
- kw,
- )
-
- # TODO - see TODO(return_defaults_columns) below
- # cols_in_params.add(c)
-
- elif isinsert:
- # no parameter is present and it's an insert.
-
- if c.primary_key and need_pks:
- # it's a primary key column, it will need to be generated by a
- # default generator of some kind, and the statement expects
- # inserted_primary_key to be available.
-
- if implicit_returning:
- # we can use RETURNING, find out how to invoke this
- # column and get the value where RETURNING is an option.
- # we can inline server-side functions in this case.
-
- _append_param_insert_pk_returning(
- compiler, stmt, c, values, kw
- )
- else:
- # otherwise, find out how to invoke this column
- # and get its value where RETURNING is not an option.
- # if we have to invoke a server-side function, we need
- # to pre-execute it. or if this is a straight
- # autoincrement column and the dialect supports it
- # we can use cursor.lastrowid.
-
- _append_param_insert_pk_no_returning(
- compiler, stmt, c, values, kw
- )
-
- elif c.default is not None:
- # column has a default, but it's not a pk column, or it is but
- # we don't need to get the pk back.
- if not c.default.is_sentinel or (
- use_sentinel_columns is not None
- ):
- _append_param_insert_hasdefault(
- compiler, stmt, c, implicit_return_defaults, values, kw
- )
-
- elif c.server_default is not None:
- # column has a DDL-level default, and is either not a pk
- # column or we don't need the pk.
- if implicit_return_defaults and c in implicit_return_defaults:
- compiler_implicit_returning.append(c)
- elif not c.primary_key:
- compiler.postfetch.append(c)
-
- elif implicit_return_defaults and c in implicit_return_defaults:
- compiler_implicit_returning.append(c)
-
- elif (
- c.primary_key
- and c is not stmt.table._autoincrement_column
- and not c.nullable
- ):
- _warn_pk_with_no_anticipated_value(c)
-
- elif compile_state.isupdate:
- # no parameter is present and it's an insert.
-
- _append_param_update(
- compiler,
- compile_state,
- stmt,
- c,
- implicit_return_defaults,
- values,
- kw,
- )
-
- # adding supplemental cols to implicit_returning in table
- # order so that order is maintained between multiple INSERT
- # statements which may have different parameters included, but all
- # have the same RETURNING clause
- if (
- c in supplemental_returning
- and c not in compiler_implicit_returning
- ):
- compiler_implicit_returning.append(c)
-
- if supplemental_returning:
- # we should have gotten every col into implicit_returning,
- # however supplemental returning can also have SQL functions etc.
- # in it
- remaining_supplemental = supplemental_returning.difference(
- compiler_implicit_returning
- )
- compiler_implicit_returning.extend(
- c
- for c in stmt._supplemental_returning
- if c in remaining_supplemental
- )
-
- # TODO(return_defaults_columns): there can still be more columns in
- # _return_defaults_columns in the case that they are from something like an
- # aliased of the table. we can add them here, however this breaks other ORM
- # things. so this is for another day. see
- # test/orm/dml/test_update_delete_where.py -> test_update_from_alias
-
- # if stmt._return_defaults_columns:
- # compiler_implicit_returning.extend(
- # set(stmt._return_defaults_columns)
- # .difference(compiler_implicit_returning)
- # .difference(cols_in_params)
- # )
-
- return (use_insertmanyvalues, use_sentinel_columns)
-
-
-def _setup_delete_return_defaults(
- compiler,
- stmt,
- compile_state,
- parameters,
- _getattr_col_key,
- _column_as_key,
- _col_bind_name,
- check_columns,
- values,
- toplevel,
- kw,
-):
- (_, _, implicit_return_defaults, *_) = _get_returning_modifiers(
- compiler, stmt, compile_state, toplevel
- )
-
- if not implicit_return_defaults:
- return
-
- if stmt._return_defaults_columns:
- compiler.implicit_returning.extend(implicit_return_defaults)
-
- if stmt._supplemental_returning:
- ir_set = set(compiler.implicit_returning)
- compiler.implicit_returning.extend(
- c for c in stmt._supplemental_returning if c not in ir_set
- )
-
-
-def _append_param_parameter(
- compiler,
- stmt,
- compile_state,
- c,
- col_key,
- parameters,
- _col_bind_name,
- implicit_returning,
- implicit_return_defaults,
- postfetch_lastrowid,
- values,
- autoincrement_col,
- insert_null_pk_still_autoincrements,
- kw,
-):
- value = parameters.pop(col_key)
-
- col_value = compiler.preparer.format_column(
- c, use_table=compile_state.include_table_with_column_exprs
- )
-
- accumulated_bind_names: Set[str] = set()
-
- if coercions._is_literal(value):
- if (
- insert_null_pk_still_autoincrements
- and c.primary_key
- and c is autoincrement_col
- ):
- # support use case for #7998, fetch autoincrement cols
- # even if value was given.
-
- if postfetch_lastrowid:
- compiler.postfetch_lastrowid = True
- elif implicit_returning:
- compiler.implicit_returning.append(c)
-
- value = _create_bind_param(
- compiler,
- c,
- value,
- required=value is REQUIRED,
- name=(
- _col_bind_name(c)
- if not _compile_state_isinsert(compile_state)
- or not compile_state._has_multi_parameters
- else "%s_m0" % _col_bind_name(c)
- ),
- accumulate_bind_names=accumulated_bind_names,
- **kw,
- )
- elif value._is_bind_parameter:
- if (
- insert_null_pk_still_autoincrements
- and value.value is None
- and c.primary_key
- and c is autoincrement_col
- ):
- # support use case for #7998, fetch autoincrement cols
- # even if value was given
- if implicit_returning:
- compiler.implicit_returning.append(c)
- elif compiler.dialect.postfetch_lastrowid:
- compiler.postfetch_lastrowid = True
-
- value = _handle_values_anonymous_param(
- compiler,
- c,
- value,
- name=(
- _col_bind_name(c)
- if not _compile_state_isinsert(compile_state)
- or not compile_state._has_multi_parameters
- else "%s_m0" % _col_bind_name(c)
- ),
- accumulate_bind_names=accumulated_bind_names,
- **kw,
- )
- else:
- # value is a SQL expression
- value = compiler.process(
- value.self_group(),
- accumulate_bind_names=accumulated_bind_names,
- **kw,
- )
-
- if compile_state.isupdate:
- if implicit_return_defaults and c in implicit_return_defaults:
- compiler.implicit_returning.append(c)
-
- else:
- compiler.postfetch.append(c)
- else:
- if c.primary_key:
- if implicit_returning:
- compiler.implicit_returning.append(c)
- elif compiler.dialect.postfetch_lastrowid:
- compiler.postfetch_lastrowid = True
-
- elif implicit_return_defaults and (c in implicit_return_defaults):
- compiler.implicit_returning.append(c)
-
- else:
- # postfetch specifically means, "we can SELECT the row we just
- # inserted by primary key to get back the server generated
- # defaults". so by definition this can't be used to get the
- # primary key value back, because we need to have it ahead of
- # time.
-
- compiler.postfetch.append(c)
-
- values.append((c, col_value, value, accumulated_bind_names))
-
-
-def _append_param_insert_pk_returning(compiler, stmt, c, values, kw):
- """Create a primary key expression in the INSERT statement where
- we want to populate result.inserted_primary_key and RETURNING
- is available.
-
- """
- if c.default is not None:
- if c.default.is_sequence:
- if compiler.dialect.supports_sequences and (
- not c.default.optional
- or not compiler.dialect.sequences_optional
- ):
- accumulated_bind_names: Set[str] = set()
- values.append(
- (
- c,
- compiler.preparer.format_column(c),
- compiler.process(
- c.default,
- accumulate_bind_names=accumulated_bind_names,
- **kw,
- ),
- accumulated_bind_names,
- )
- )
- compiler.implicit_returning.append(c)
- elif c.default.is_clause_element:
- accumulated_bind_names = set()
- values.append(
- (
- c,
- compiler.preparer.format_column(c),
- compiler.process(
- c.default.arg.self_group(),
- accumulate_bind_names=accumulated_bind_names,
- **kw,
- ),
- accumulated_bind_names,
- )
- )
- compiler.implicit_returning.append(c)
- else:
- # client side default. OK we can't use RETURNING, need to
- # do a "prefetch", which in fact fetches the default value
- # on the Python side
- values.append(
- (
- c,
- compiler.preparer.format_column(c),
- _create_insert_prefetch_bind_param(compiler, c, **kw),
- (c.key,),
- )
- )
- elif c is stmt.table._autoincrement_column or c.server_default is not None:
- compiler.implicit_returning.append(c)
- elif not c.nullable:
- # no .default, no .server_default, not autoincrement, we have
- # no indication this primary key column will have any value
- _warn_pk_with_no_anticipated_value(c)
-
-
-def _append_param_insert_pk_no_returning(compiler, stmt, c, values, kw):
- """Create a primary key expression in the INSERT statement where
- we want to populate result.inserted_primary_key and we cannot use
- RETURNING.
-
- Depending on the kind of default here we may create a bound parameter
- in the INSERT statement and pre-execute a default generation function,
- or we may use cursor.lastrowid if supported by the dialect.
-
-
- """
-
- if (
- # column has a Python-side default
- c.default is not None
- and (
- # and it either is not a sequence, or it is and we support
- # sequences and want to invoke it
- not c.default.is_sequence
- or (
- compiler.dialect.supports_sequences
- and (
- not c.default.optional
- or not compiler.dialect.sequences_optional
- )
- )
- )
- ) or (
- # column is the "autoincrement column"
- c is stmt.table._autoincrement_column
- and (
- # dialect can't use cursor.lastrowid
- not compiler.dialect.postfetch_lastrowid
- and (
- # column has a Sequence and we support those
- (
- c.default is not None
- and c.default.is_sequence
- and compiler.dialect.supports_sequences
- )
- or
- # column has no default on it, but dialect can run the
- # "autoincrement" mechanism explicitly, e.g. PostgreSQL
- # SERIAL we know the sequence name
- (
- c.default is None
- and compiler.dialect.preexecute_autoincrement_sequences
- )
- )
- )
- ):
- # do a pre-execute of the default
- values.append(
- (
- c,
- compiler.preparer.format_column(c),
- _create_insert_prefetch_bind_param(compiler, c, **kw),
- (c.key,),
- )
- )
- elif (
- c.default is None
- and c.server_default is None
- and not c.nullable
- and c is not stmt.table._autoincrement_column
- ):
- # no .default, no .server_default, not autoincrement, we have
- # no indication this primary key column will have any value
- _warn_pk_with_no_anticipated_value(c)
- elif compiler.dialect.postfetch_lastrowid:
- # finally, where it seems like there will be a generated primary key
- # value and we haven't set up any other way to fetch it, and the
- # dialect supports cursor.lastrowid, switch on the lastrowid flag so
- # that the DefaultExecutionContext calls upon cursor.lastrowid
- compiler.postfetch_lastrowid = True
-
-
-def _append_param_insert_hasdefault(
- compiler, stmt, c, implicit_return_defaults, values, kw
-):
- if c.default.is_sequence:
- if compiler.dialect.supports_sequences and (
- not c.default.optional or not compiler.dialect.sequences_optional
- ):
- accumulated_bind_names: Set[str] = set()
- values.append(
- (
- c,
- compiler.preparer.format_column(c),
- compiler.process(
- c.default,
- accumulate_bind_names=accumulated_bind_names,
- **kw,
- ),
- accumulated_bind_names,
- )
- )
- if implicit_return_defaults and c in implicit_return_defaults:
- compiler.implicit_returning.append(c)
- elif not c.primary_key:
- compiler.postfetch.append(c)
- elif c.default.is_clause_element:
- accumulated_bind_names = set()
- values.append(
- (
- c,
- compiler.preparer.format_column(c),
- compiler.process(
- c.default.arg.self_group(),
- accumulate_bind_names=accumulated_bind_names,
- **kw,
- ),
- accumulated_bind_names,
- )
- )
-
- if implicit_return_defaults and c in implicit_return_defaults:
- compiler.implicit_returning.append(c)
- elif not c.primary_key:
- # don't add primary key column to postfetch
- compiler.postfetch.append(c)
- else:
- values.append(
- (
- c,
- compiler.preparer.format_column(c),
- _create_insert_prefetch_bind_param(compiler, c, **kw),
- (c.key,),
- )
- )
-
-
-def _append_param_insert_select_hasdefault(
- compiler: SQLCompiler,
- stmt: ValuesBase,
- c: ColumnClause[Any],
- values: List[_CrudParamElementSQLExpr],
- kw: Dict[str, Any],
-) -> None:
- if default_is_sequence(c.default):
- if compiler.dialect.supports_sequences and (
- not c.default.optional or not compiler.dialect.sequences_optional
- ):
- values.append(
- (
- c,
- compiler.preparer.format_column(c),
- c.default.next_value(),
- (),
- )
- )
- elif default_is_clause_element(c.default):
- values.append(
- (
- c,
- compiler.preparer.format_column(c),
- c.default.arg.self_group(),
- (),
- )
- )
- else:
- values.append(
- (
- c,
- compiler.preparer.format_column(c),
- _create_insert_prefetch_bind_param(
- compiler, c, process=False, **kw
- ),
- (c.key,),
- )
- )
-
-
-def _append_param_update(
- compiler, compile_state, stmt, c, implicit_return_defaults, values, kw
-):
- include_table = compile_state.include_table_with_column_exprs
- if c.onupdate is not None and not c.onupdate.is_sequence:
- if c.onupdate.is_clause_element:
- values.append(
- (
- c,
- compiler.preparer.format_column(
- c,
- use_table=include_table,
- ),
- compiler.process(c.onupdate.arg.self_group(), **kw),
- (),
- )
- )
- if implicit_return_defaults and c in implicit_return_defaults:
- compiler.implicit_returning.append(c)
- else:
- compiler.postfetch.append(c)
- else:
- values.append(
- (
- c,
- compiler.preparer.format_column(
- c,
- use_table=include_table,
- ),
- _create_update_prefetch_bind_param(compiler, c, **kw),
- (c.key,),
- )
- )
- elif c.server_onupdate is not None:
- if implicit_return_defaults and c in implicit_return_defaults:
- compiler.implicit_returning.append(c)
- else:
- compiler.postfetch.append(c)
- elif (
- implicit_return_defaults
- and (stmt._return_defaults_columns or not stmt._return_defaults)
- and c in implicit_return_defaults
- ):
- compiler.implicit_returning.append(c)
-
-
-@overload
-def _create_insert_prefetch_bind_param(
- compiler: SQLCompiler,
- c: ColumnElement[Any],
- process: Literal[True] = ...,
- **kw: Any,
-) -> str: ...
-
-
-@overload
-def _create_insert_prefetch_bind_param(
- compiler: SQLCompiler,
- c: ColumnElement[Any],
- process: Literal[False],
- **kw: Any,
-) -> elements.BindParameter[Any]: ...
-
-
-def _create_insert_prefetch_bind_param(
- compiler: SQLCompiler,
- c: ColumnElement[Any],
- process: bool = True,
- name: Optional[str] = None,
- **kw: Any,
-) -> Union[elements.BindParameter[Any], str]:
- param = _create_bind_param(
- compiler, c, None, process=process, name=name, **kw
- )
- compiler.insert_prefetch.append(c) # type: ignore
- return param
-
-
-@overload
-def _create_update_prefetch_bind_param(
- compiler: SQLCompiler,
- c: ColumnElement[Any],
- process: Literal[True] = ...,
- **kw: Any,
-) -> str: ...
-
-
-@overload
-def _create_update_prefetch_bind_param(
- compiler: SQLCompiler,
- c: ColumnElement[Any],
- process: Literal[False],
- **kw: Any,
-) -> elements.BindParameter[Any]: ...
-
-
-def _create_update_prefetch_bind_param(
- compiler: SQLCompiler,
- c: ColumnElement[Any],
- process: bool = True,
- name: Optional[str] = None,
- **kw: Any,
-) -> Union[elements.BindParameter[Any], str]:
- param = _create_bind_param(
- compiler, c, None, process=process, name=name, **kw
- )
- compiler.update_prefetch.append(c) # type: ignore
- return param
-
-
-class _multiparam_column(elements.ColumnElement[Any]):
- _is_multiparam_column = True
-
- def __init__(self, original, index):
- self.index = index
- self.key = "%s_m%d" % (original.key, index + 1)
- self.original = original
- self.default = original.default
- self.type = original.type
-
- def compare(self, other, **kw):
- raise NotImplementedError()
-
- def _copy_internals(self, other, **kw):
- raise NotImplementedError()
-
- def __eq__(self, other):
- return (
- isinstance(other, _multiparam_column)
- and other.key == self.key
- and other.original == self.original
- )
-
- @util.memoized_property
- def _default_description_tuple(self) -> _DefaultDescriptionTuple:
- """used by default.py -> _process_execute_defaults()"""
-
- return _DefaultDescriptionTuple._from_column_default(self.default)
-
- @util.memoized_property
- def _onupdate_description_tuple(self) -> _DefaultDescriptionTuple:
- """used by default.py -> _process_execute_defaults()"""
-
- return _DefaultDescriptionTuple._from_column_default(self.onupdate)
-
-
-def _process_multiparam_default_bind(
- compiler: SQLCompiler,
- stmt: ValuesBase,
- c: KeyedColumnElement[Any],
- index: int,
- kw: Dict[str, Any],
-) -> str:
- if not c.default:
- raise exc.CompileError(
- "INSERT value for column %s is explicitly rendered as a bound"
- "parameter in the VALUES clause; "
- "a Python-side value or SQL expression is required" % c
- )
- elif default_is_clause_element(c.default):
- return compiler.process(c.default.arg.self_group(), **kw)
- elif c.default.is_sequence:
- # these conditions would have been established
- # by append_param_insert_(?:hasdefault|pk_returning|pk_no_returning)
- # in order for us to be here, so these don't need to be
- # checked
- # assert compiler.dialect.supports_sequences and (
- # not c.default.optional
- # or not compiler.dialect.sequences_optional
- # )
- return compiler.process(c.default, **kw)
- else:
- col = _multiparam_column(c, index)
- assert isinstance(stmt, dml.Insert)
- return _create_insert_prefetch_bind_param(
- compiler, col, process=True, **kw
- )
-
-
-def _get_update_multitable_params(
- compiler,
- stmt,
- compile_state,
- stmt_parameter_tuples,
- check_columns,
- _col_bind_name,
- _getattr_col_key,
- values,
- kw,
-):
- normalized_params = {
- coercions.expect(roles.DMLColumnRole, c): param
- for c, param in stmt_parameter_tuples or ()
- }
-
- include_table = compile_state.include_table_with_column_exprs
-
- affected_tables = set()
- for t in compile_state._extra_froms:
- for c in t.c:
- if c in normalized_params:
- affected_tables.add(t)
- check_columns[_getattr_col_key(c)] = c
- value = normalized_params[c]
-
- col_value = compiler.process(c, include_table=include_table)
- if coercions._is_literal(value):
- value = _create_bind_param(
- compiler,
- c,
- value,
- required=value is REQUIRED,
- name=_col_bind_name(c),
- **kw, # TODO: no test coverage for literal binds here
- )
- accumulated_bind_names: Iterable[str] = (c.key,)
- elif value._is_bind_parameter:
- cbn = _col_bind_name(c)
- value = _handle_values_anonymous_param(
- compiler, c, value, name=cbn, **kw
- )
- accumulated_bind_names = (cbn,)
- else:
- compiler.postfetch.append(c)
- value = compiler.process(value.self_group(), **kw)
- accumulated_bind_names = ()
- values.append((c, col_value, value, accumulated_bind_names))
- # determine tables which are actually to be updated - process onupdate
- # and server_onupdate for these
- for t in affected_tables:
- for c in t.c:
- if c in normalized_params:
- continue
- elif c.onupdate is not None and not c.onupdate.is_sequence:
- if c.onupdate.is_clause_element:
- values.append(
- (
- c,
- compiler.process(c, include_table=include_table),
- compiler.process(
- c.onupdate.arg.self_group(), **kw
- ),
- (),
- )
- )
- compiler.postfetch.append(c)
- else:
- values.append(
- (
- c,
- compiler.process(c, include_table=include_table),
- _create_update_prefetch_bind_param(
- compiler, c, name=_col_bind_name(c), **kw
- ),
- (c.key,),
- )
- )
- elif c.server_onupdate is not None:
- compiler.postfetch.append(c)
-
-
-def _extend_values_for_multiparams(
- compiler: SQLCompiler,
- stmt: ValuesBase,
- compile_state: DMLState,
- initial_values: Sequence[_CrudParamElementStr],
- _column_as_key: Callable[..., str],
- kw: Dict[str, Any],
-) -> List[Sequence[_CrudParamElementStr]]:
- values_0 = initial_values
- values = [initial_values]
-
- mp = compile_state._multi_parameters
- assert mp is not None
- for i, row in enumerate(mp[1:]):
- extension: List[_CrudParamElementStr] = []
-
- row = {_column_as_key(key): v for key, v in row.items()}
-
- for col, col_expr, param, accumulated_names in values_0:
- if col.key in row:
- key = col.key
-
- if coercions._is_literal(row[key]):
- new_param = _create_bind_param(
- compiler,
- col,
- row[key],
- name="%s_m%d" % (col.key, i + 1),
- **kw,
- )
- else:
- new_param = compiler.process(row[key].self_group(), **kw)
- else:
- new_param = _process_multiparam_default_bind(
- compiler, stmt, col, i, kw
- )
-
- extension.append((col, col_expr, new_param, accumulated_names))
-
- values.append(extension)
-
- return values
-
-
-def _get_stmt_parameter_tuples_params(
- compiler,
- compile_state,
- parameters,
- stmt_parameter_tuples,
- _column_as_key,
- values,
- kw,
-):
- for k, v in stmt_parameter_tuples:
- colkey = _column_as_key(k)
- if colkey is not None:
- parameters.setdefault(colkey, v)
- else:
- # a non-Column expression on the left side;
- # add it to values() in an "as-is" state,
- # coercing right side to bound param
-
- # note one of the main use cases for this is array slice
- # updates on PostgreSQL, as the left side is also an expression.
-
- col_expr = compiler.process(
- k, include_table=compile_state.include_table_with_column_exprs
- )
-
- if coercions._is_literal(v):
- v = compiler.process(
- elements.BindParameter(None, v, type_=k.type), **kw
- )
- else:
- if v._is_bind_parameter and v.type._isnull:
- # either unique parameter, or other bound parameters that
- # were passed in directly
- # set type to that of the column unconditionally
- v = v._with_binary_element_type(k.type)
-
- v = compiler.process(v.self_group(), **kw)
-
- # TODO: not sure if accumulated_bind_names applies here
- values.append((k, col_expr, v, ()))
-
-
-def _get_returning_modifiers(compiler, stmt, compile_state, toplevel):
- """determines RETURNING strategy, if any, for the statement.
-
- This is where it's determined what we need to fetch from the
- INSERT or UPDATE statement after it's invoked.
-
- """
-
- dialect = compiler.dialect
-
- need_pks = (
- toplevel
- and _compile_state_isinsert(compile_state)
- and not stmt._inline
- and (
- not compiler.for_executemany
- or (dialect.insert_executemany_returning and stmt._return_defaults)
- )
- and not stmt._returning
- # and (not stmt._returning or stmt._return_defaults)
- and not compile_state._has_multi_parameters
- )
-
- # check if we have access to simple cursor.lastrowid. we can use that
- # after the INSERT if that's all we need.
- postfetch_lastrowid = (
- need_pks
- and dialect.postfetch_lastrowid
- and stmt.table._autoincrement_column is not None
- )
-
- # see if we want to add RETURNING to an INSERT in order to get
- # primary key columns back. This would be instead of postfetch_lastrowid
- # if that's set.
- implicit_returning = (
- # statement itself can veto it
- need_pks
- # the dialect can veto it if it just doesnt support RETURNING
- # with INSERT
- and dialect.insert_returning
- # user-defined implicit_returning on Table can veto it
- and compile_state._primary_table.implicit_returning
- # the compile_state can veto it (SQlite uses this to disable
- # RETURNING for an ON CONFLICT insert, as SQLite does not return
- # for rows that were updated, which is wrong)
- and compile_state._supports_implicit_returning
- and (
- # since we support MariaDB and SQLite which also support lastrowid,
- # decide if we should use lastrowid or RETURNING. for insert
- # that didnt call return_defaults() and has just one set of
- # parameters, we can use lastrowid. this is more "traditional"
- # and a lot of weird use cases are supported by it.
- # SQLite lastrowid times 3x faster than returning,
- # Mariadb lastrowid 2x faster than returning
- (not postfetch_lastrowid or dialect.favor_returning_over_lastrowid)
- or compile_state._has_multi_parameters
- or stmt._return_defaults
- )
- )
- if implicit_returning:
- postfetch_lastrowid = False
-
- if _compile_state_isinsert(compile_state):
- should_implicit_return_defaults = (
- implicit_returning and stmt._return_defaults
- )
- explicit_returning = (
- should_implicit_return_defaults
- or stmt._returning
- or stmt._supplemental_returning
- )
- use_insertmanyvalues = (
- toplevel
- and compiler.for_executemany
- and dialect.use_insertmanyvalues
- and (
- explicit_returning or dialect.use_insertmanyvalues_wo_returning
- )
- )
-
- use_sentinel_columns = None
- if (
- use_insertmanyvalues
- and explicit_returning
- and stmt._sort_by_parameter_order
- ):
- use_sentinel_columns = compiler._get_sentinel_column_for_table(
- stmt.table
- )
-
- elif compile_state.isupdate:
- should_implicit_return_defaults = (
- stmt._return_defaults
- and compile_state._primary_table.implicit_returning
- and compile_state._supports_implicit_returning
- and dialect.update_returning
- )
- use_insertmanyvalues = False
- use_sentinel_columns = None
- elif compile_state.isdelete:
- should_implicit_return_defaults = (
- stmt._return_defaults
- and compile_state._primary_table.implicit_returning
- and compile_state._supports_implicit_returning
- and dialect.delete_returning
- )
- use_insertmanyvalues = False
- use_sentinel_columns = None
- else:
- should_implicit_return_defaults = False # pragma: no cover
- use_insertmanyvalues = False
- use_sentinel_columns = None
-
- if should_implicit_return_defaults:
- if not stmt._return_defaults_columns:
- # TODO: this is weird. See #9685 where we have to
- # take an extra step to prevent this from happening. why
- # would this ever be *all* columns? but if we set to blank, then
- # that seems to break things also in the ORM. So we should
- # try to clean this up and figure out what return_defaults
- # needs to do w/ the ORM etc. here
- implicit_return_defaults = set(stmt.table.c)
- else:
- implicit_return_defaults = set(stmt._return_defaults_columns)
- else:
- implicit_return_defaults = None
-
- return (
- need_pks,
- implicit_returning or should_implicit_return_defaults,
- implicit_return_defaults,
- postfetch_lastrowid,
- use_insertmanyvalues,
- use_sentinel_columns,
- )
-
-
-def _warn_pk_with_no_anticipated_value(c):
- msg = (
- "Column '%s.%s' is marked as a member of the "
- "primary key for table '%s', "
- "but has no Python-side or server-side default generator indicated, "
- "nor does it indicate 'autoincrement=True' or 'nullable=True', "
- "and no explicit value is passed. "
- "Primary key columns typically may not store NULL."
- % (c.table.fullname, c.name, c.table.fullname)
- )
- if len(c.table.primary_key) > 1:
- msg += (
- " Note that as of SQLAlchemy 1.1, 'autoincrement=True' must be "
- "indicated explicitly for composite (e.g. multicolumn) primary "
- "keys if AUTO_INCREMENT/SERIAL/IDENTITY "
- "behavior is expected for one of the columns in the primary key. "
- "CREATE TABLE statements are impacted by this change as well on "
- "most backends."
- )
- util.warn(msg)