summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/testing/suite/test_insert.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/testing/suite/test_insert.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/testing/suite/test_insert.py630
1 files changed, 630 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/testing/suite/test_insert.py b/venv/lib/python3.11/site-packages/sqlalchemy/testing/suite/test_insert.py
new file mode 100644
index 0000000..1cff044
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/sqlalchemy/testing/suite/test_insert.py
@@ -0,0 +1,630 @@
+# testing/suite/test_insert.py
+# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+
+from decimal import Decimal
+import uuid
+
+from . import testing
+from .. import fixtures
+from ..assertions import eq_
+from ..config import requirements
+from ..schema import Column
+from ..schema import Table
+from ... import Double
+from ... import Float
+from ... import Identity
+from ... import Integer
+from ... import literal
+from ... import literal_column
+from ... import Numeric
+from ... import select
+from ... import String
+from ...types import LargeBinary
+from ...types import UUID
+from ...types import Uuid
+
+
+class LastrowidTest(fixtures.TablesTest):
+ run_deletes = "each"
+
+ __backend__ = True
+
+ __requires__ = "implements_get_lastrowid", "autoincrement_insert"
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "autoinc_pk",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("data", String(50)),
+ implicit_returning=False,
+ )
+
+ Table(
+ "manual_pk",
+ metadata,
+ Column("id", Integer, primary_key=True, autoincrement=False),
+ Column("data", String(50)),
+ implicit_returning=False,
+ )
+
+ def _assert_round_trip(self, table, conn):
+ row = conn.execute(table.select()).first()
+ eq_(
+ row,
+ (
+ conn.dialect.default_sequence_base,
+ "some data",
+ ),
+ )
+
+ def test_autoincrement_on_insert(self, connection):
+ connection.execute(
+ self.tables.autoinc_pk.insert(), dict(data="some data")
+ )
+ self._assert_round_trip(self.tables.autoinc_pk, connection)
+
+ def test_last_inserted_id(self, connection):
+ r = connection.execute(
+ self.tables.autoinc_pk.insert(), dict(data="some data")
+ )
+ pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
+ eq_(r.inserted_primary_key, (pk,))
+
+ @requirements.dbapi_lastrowid
+ def test_native_lastrowid_autoinc(self, connection):
+ r = connection.execute(
+ self.tables.autoinc_pk.insert(), dict(data="some data")
+ )
+ lastrowid = r.lastrowid
+ pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
+ eq_(lastrowid, pk)
+
+
+class InsertBehaviorTest(fixtures.TablesTest):
+ run_deletes = "each"
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "autoinc_pk",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("data", String(50)),
+ )
+ Table(
+ "manual_pk",
+ metadata,
+ Column("id", Integer, primary_key=True, autoincrement=False),
+ Column("data", String(50)),
+ )
+ Table(
+ "no_implicit_returning",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("data", String(50)),
+ implicit_returning=False,
+ )
+ Table(
+ "includes_defaults",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("data", String(50)),
+ Column("x", Integer, default=5),
+ Column(
+ "y",
+ Integer,
+ default=literal_column("2", type_=Integer) + literal(2),
+ ),
+ )
+
+ @testing.variation("style", ["plain", "return_defaults"])
+ @testing.variation("executemany", [True, False])
+ def test_no_results_for_non_returning_insert(
+ self, connection, style, executemany
+ ):
+ """test another INSERT issue found during #10453"""
+
+ table = self.tables.no_implicit_returning
+
+ stmt = table.insert()
+ if style.return_defaults:
+ stmt = stmt.return_defaults()
+
+ if executemany:
+ data = [
+ {"data": "d1"},
+ {"data": "d2"},
+ {"data": "d3"},
+ {"data": "d4"},
+ {"data": "d5"},
+ ]
+ else:
+ data = {"data": "d1"}
+
+ r = connection.execute(stmt, data)
+ assert not r.returns_rows
+
+ @requirements.autoincrement_insert
+ def test_autoclose_on_insert(self, connection):
+ r = connection.execute(
+ self.tables.autoinc_pk.insert(), dict(data="some data")
+ )
+ assert r._soft_closed
+ assert not r.closed
+ assert r.is_insert
+
+ # new as of I8091919d45421e3f53029b8660427f844fee0228; for the moment
+ # an insert where the PK was taken from a row that the dialect
+ # selected, as is the case for mssql/pyodbc, will still report
+ # returns_rows as true because there's a cursor description. in that
+ # case, the row had to have been consumed at least.
+ assert not r.returns_rows or r.fetchone() is None
+
+ @requirements.insert_returning
+ def test_autoclose_on_insert_implicit_returning(self, connection):
+ r = connection.execute(
+ # return_defaults() ensures RETURNING will be used,
+ # new in 2.0 as sqlite/mariadb offer both RETURNING and
+ # cursor.lastrowid
+ self.tables.autoinc_pk.insert().return_defaults(),
+ dict(data="some data"),
+ )
+ assert r._soft_closed
+ assert not r.closed
+ assert r.is_insert
+
+ # note we are experimenting with having this be True
+ # as of I8091919d45421e3f53029b8660427f844fee0228 .
+ # implicit returning has fetched the row, but it still is a
+ # "returns rows"
+ assert r.returns_rows
+
+ # and we should be able to fetchone() on it, we just get no row
+ eq_(r.fetchone(), None)
+
+ # and the keys, etc.
+ eq_(r.keys(), ["id"])
+
+ # but the dialect took in the row already. not really sure
+ # what the best behavior is.
+
+ @requirements.empty_inserts
+ def test_empty_insert(self, connection):
+ r = connection.execute(self.tables.autoinc_pk.insert())
+ assert r._soft_closed
+ assert not r.closed
+
+ r = connection.execute(
+ self.tables.autoinc_pk.select().where(
+ self.tables.autoinc_pk.c.id != None
+ )
+ )
+ eq_(len(r.all()), 1)
+
+ @requirements.empty_inserts_executemany
+ def test_empty_insert_multiple(self, connection):
+ r = connection.execute(self.tables.autoinc_pk.insert(), [{}, {}, {}])
+ assert r._soft_closed
+ assert not r.closed
+
+ r = connection.execute(
+ self.tables.autoinc_pk.select().where(
+ self.tables.autoinc_pk.c.id != None
+ )
+ )
+
+ eq_(len(r.all()), 3)
+
+ @requirements.insert_from_select
+ def test_insert_from_select_autoinc(self, connection):
+ src_table = self.tables.manual_pk
+ dest_table = self.tables.autoinc_pk
+ connection.execute(
+ src_table.insert(),
+ [
+ dict(id=1, data="data1"),
+ dict(id=2, data="data2"),
+ dict(id=3, data="data3"),
+ ],
+ )
+
+ result = connection.execute(
+ dest_table.insert().from_select(
+ ("data",),
+ select(src_table.c.data).where(
+ src_table.c.data.in_(["data2", "data3"])
+ ),
+ )
+ )
+
+ eq_(result.inserted_primary_key, (None,))
+
+ result = connection.execute(
+ select(dest_table.c.data).order_by(dest_table.c.data)
+ )
+ eq_(result.fetchall(), [("data2",), ("data3",)])
+
+ @requirements.insert_from_select
+ def test_insert_from_select_autoinc_no_rows(self, connection):
+ src_table = self.tables.manual_pk
+ dest_table = self.tables.autoinc_pk
+
+ result = connection.execute(
+ dest_table.insert().from_select(
+ ("data",),
+ select(src_table.c.data).where(
+ src_table.c.data.in_(["data2", "data3"])
+ ),
+ )
+ )
+ eq_(result.inserted_primary_key, (None,))
+
+ result = connection.execute(
+ select(dest_table.c.data).order_by(dest_table.c.data)
+ )
+
+ eq_(result.fetchall(), [])
+
+ @requirements.insert_from_select
+ def test_insert_from_select(self, connection):
+ table = self.tables.manual_pk
+ connection.execute(
+ table.insert(),
+ [
+ dict(id=1, data="data1"),
+ dict(id=2, data="data2"),
+ dict(id=3, data="data3"),
+ ],
+ )
+
+ connection.execute(
+ table.insert()
+ .inline()
+ .from_select(
+ ("id", "data"),
+ select(table.c.id + 5, table.c.data).where(
+ table.c.data.in_(["data2", "data3"])
+ ),
+ )
+ )
+
+ eq_(
+ connection.execute(
+ select(table.c.data).order_by(table.c.data)
+ ).fetchall(),
+ [("data1",), ("data2",), ("data2",), ("data3",), ("data3",)],
+ )
+
+ @requirements.insert_from_select
+ def test_insert_from_select_with_defaults(self, connection):
+ table = self.tables.includes_defaults
+ connection.execute(
+ table.insert(),
+ [
+ dict(id=1, data="data1"),
+ dict(id=2, data="data2"),
+ dict(id=3, data="data3"),
+ ],
+ )
+
+ connection.execute(
+ table.insert()
+ .inline()
+ .from_select(
+ ("id", "data"),
+ select(table.c.id + 5, table.c.data).where(
+ table.c.data.in_(["data2", "data3"])
+ ),
+ )
+ )
+
+ eq_(
+ connection.execute(
+ select(table).order_by(table.c.data, table.c.id)
+ ).fetchall(),
+ [
+ (1, "data1", 5, 4),
+ (2, "data2", 5, 4),
+ (7, "data2", 5, 4),
+ (3, "data3", 5, 4),
+ (8, "data3", 5, 4),
+ ],
+ )
+
+
+class ReturningTest(fixtures.TablesTest):
+ run_create_tables = "each"
+ __requires__ = "insert_returning", "autoincrement_insert"
+ __backend__ = True
+
+ def _assert_round_trip(self, table, conn):
+ row = conn.execute(table.select()).first()
+ eq_(
+ row,
+ (
+ conn.dialect.default_sequence_base,
+ "some data",
+ ),
+ )
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "autoinc_pk",
+ metadata,
+ Column(
+ "id", Integer, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("data", String(50)),
+ )
+
+ @requirements.fetch_rows_post_commit
+ def test_explicit_returning_pk_autocommit(self, connection):
+ table = self.tables.autoinc_pk
+ r = connection.execute(
+ table.insert().returning(table.c.id), dict(data="some data")
+ )
+ pk = r.first()[0]
+ fetched_pk = connection.scalar(select(table.c.id))
+ eq_(fetched_pk, pk)
+
+ def test_explicit_returning_pk_no_autocommit(self, connection):
+ table = self.tables.autoinc_pk
+ r = connection.execute(
+ table.insert().returning(table.c.id), dict(data="some data")
+ )
+
+ pk = r.first()[0]
+ fetched_pk = connection.scalar(select(table.c.id))
+ eq_(fetched_pk, pk)
+
+ def test_autoincrement_on_insert_implicit_returning(self, connection):
+ connection.execute(
+ self.tables.autoinc_pk.insert(), dict(data="some data")
+ )
+ self._assert_round_trip(self.tables.autoinc_pk, connection)
+
+ def test_last_inserted_id_implicit_returning(self, connection):
+ r = connection.execute(
+ self.tables.autoinc_pk.insert(), dict(data="some data")
+ )
+ pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
+ eq_(r.inserted_primary_key, (pk,))
+
+ @requirements.insert_executemany_returning
+ def test_insertmanyvalues_returning(self, connection):
+ r = connection.execute(
+ self.tables.autoinc_pk.insert().returning(
+ self.tables.autoinc_pk.c.id
+ ),
+ [
+ {"data": "d1"},
+ {"data": "d2"},
+ {"data": "d3"},
+ {"data": "d4"},
+ {"data": "d5"},
+ ],
+ )
+ rall = r.all()
+
+ pks = connection.execute(select(self.tables.autoinc_pk.c.id))
+
+ eq_(rall, pks.all())
+
+ @testing.combinations(
+ (Double(), 8.5514716, True),
+ (
+ Double(53),
+ 8.5514716,
+ True,
+ testing.requires.float_or_double_precision_behaves_generically,
+ ),
+ (Float(), 8.5514, True),
+ (
+ Float(8),
+ 8.5514,
+ True,
+ testing.requires.float_or_double_precision_behaves_generically,
+ ),
+ (
+ Numeric(precision=15, scale=12, asdecimal=False),
+ 8.5514716,
+ True,
+ testing.requires.literal_float_coercion,
+ ),
+ (
+ Numeric(precision=15, scale=12, asdecimal=True),
+ Decimal("8.5514716"),
+ False,
+ ),
+ argnames="type_,value,do_rounding",
+ )
+ @testing.variation("sort_by_parameter_order", [True, False])
+ @testing.variation("multiple_rows", [True, False])
+ def test_insert_w_floats(
+ self,
+ connection,
+ metadata,
+ sort_by_parameter_order,
+ type_,
+ value,
+ do_rounding,
+ multiple_rows,
+ ):
+ """test #9701.
+
+ this tests insertmanyvalues as well as decimal / floating point
+ RETURNING types
+
+ """
+
+ t = Table(
+ # Oracle backends seems to be getting confused if
+ # this table is named the same as the one
+ # in test_imv_returning_datatypes. use a different name
+ "f_t",
+ metadata,
+ Column("id", Integer, Identity(), primary_key=True),
+ Column("value", type_),
+ )
+
+ t.create(connection)
+
+ result = connection.execute(
+ t.insert().returning(
+ t.c.id,
+ t.c.value,
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ ),
+ (
+ [{"value": value} for i in range(10)]
+ if multiple_rows
+ else {"value": value}
+ ),
+ )
+
+ if multiple_rows:
+ i_range = range(1, 11)
+ else:
+ i_range = range(1, 2)
+
+ # we want to test only that we are getting floating points back
+ # with some degree of the original value maintained, that it is not
+ # being truncated to an integer. there's too much variation in how
+ # drivers return floats, which should not be relied upon to be
+ # exact, for us to just compare as is (works for PG drivers but not
+ # others) so we use rounding here. There's precedent for this
+ # in suite/test_types.py::NumericTest as well
+
+ if do_rounding:
+ eq_(
+ {(id_, round(val_, 5)) for id_, val_ in result},
+ {(id_, round(value, 5)) for id_ in i_range},
+ )
+
+ eq_(
+ {
+ round(val_, 5)
+ for val_ in connection.scalars(select(t.c.value))
+ },
+ {round(value, 5)},
+ )
+ else:
+ eq_(
+ set(result),
+ {(id_, value) for id_ in i_range},
+ )
+
+ eq_(
+ set(connection.scalars(select(t.c.value))),
+ {value},
+ )
+
+ @testing.combinations(
+ (
+ "non_native_uuid",
+ Uuid(native_uuid=False),
+ uuid.uuid4(),
+ ),
+ (
+ "non_native_uuid_str",
+ Uuid(as_uuid=False, native_uuid=False),
+ str(uuid.uuid4()),
+ ),
+ (
+ "generic_native_uuid",
+ Uuid(native_uuid=True),
+ uuid.uuid4(),
+ testing.requires.uuid_data_type,
+ ),
+ (
+ "generic_native_uuid_str",
+ Uuid(as_uuid=False, native_uuid=True),
+ str(uuid.uuid4()),
+ testing.requires.uuid_data_type,
+ ),
+ ("UUID", UUID(), uuid.uuid4(), testing.requires.uuid_data_type),
+ (
+ "LargeBinary1",
+ LargeBinary(),
+ b"this is binary",
+ ),
+ ("LargeBinary2", LargeBinary(), b"7\xe7\x9f"),
+ argnames="type_,value",
+ id_="iaa",
+ )
+ @testing.variation("sort_by_parameter_order", [True, False])
+ @testing.variation("multiple_rows", [True, False])
+ @testing.requires.insert_returning
+ def test_imv_returning_datatypes(
+ self,
+ connection,
+ metadata,
+ sort_by_parameter_order,
+ type_,
+ value,
+ multiple_rows,
+ ):
+ """test #9739, #9808 (similar to #9701).
+
+ this tests insertmanyvalues in conjunction with various datatypes.
+
+ These tests are particularly for the asyncpg driver which needs
+ most types to be explicitly cast for the new IMV format
+
+ """
+ t = Table(
+ "d_t",
+ metadata,
+ Column("id", Integer, Identity(), primary_key=True),
+ Column("value", type_),
+ )
+
+ t.create(connection)
+
+ result = connection.execute(
+ t.insert().returning(
+ t.c.id,
+ t.c.value,
+ sort_by_parameter_order=bool(sort_by_parameter_order),
+ ),
+ (
+ [{"value": value} for i in range(10)]
+ if multiple_rows
+ else {"value": value}
+ ),
+ )
+
+ if multiple_rows:
+ i_range = range(1, 11)
+ else:
+ i_range = range(1, 2)
+
+ eq_(
+ set(result),
+ {(id_, value) for id_ in i_range},
+ )
+
+ eq_(
+ set(connection.scalars(select(t.c.value))),
+ {value},
+ )
+
+
+__all__ = ("LastrowidTest", "InsertBehaviorTest", "ReturningTest")