diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/aiosqlite/tests/smoke.py')
-rw-r--r-- | venv/lib/python3.11/site-packages/aiosqlite/tests/smoke.py | 452 |
1 files changed, 452 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/aiosqlite/tests/smoke.py b/venv/lib/python3.11/site-packages/aiosqlite/tests/smoke.py new file mode 100644 index 0000000..f42106c --- /dev/null +++ b/venv/lib/python3.11/site-packages/aiosqlite/tests/smoke.py @@ -0,0 +1,452 @@ +# Copyright 2022 Amethyst Reese +# Licensed under the MIT license +import asyncio +import sqlite3 +from pathlib import Path +from sqlite3 import OperationalError +from threading import Thread +from unittest import IsolatedAsyncioTestCase as TestCase, SkipTest + +import aiosqlite +from .helpers import setup_logger + +TEST_DB = Path("test.db") + +# pypy uses non-standard text factory for low-level sqlite implementation +try: + from _sqlite3 import _unicode_text_factory as default_text_factory +except ImportError: + default_text_factory = str + + +class SmokeTest(TestCase): + @classmethod + def setUpClass(cls): + setup_logger() + + def setUp(self): + if TEST_DB.exists(): + TEST_DB.unlink() + + def tearDown(self): + if TEST_DB.exists(): + TEST_DB.unlink() + + async def test_connection_await(self): + db = await aiosqlite.connect(TEST_DB) + self.assertIsInstance(db, aiosqlite.Connection) + + async with db.execute("select 1, 2") as cursor: + rows = await cursor.fetchall() + self.assertEqual(rows, [(1, 2)]) + + await db.close() + + async def test_connection_context(self): + async with aiosqlite.connect(TEST_DB) as db: + self.assertIsInstance(db, aiosqlite.Connection) + + async with db.execute("select 1, 2") as cursor: + rows = await cursor.fetchall() + self.assertEqual(rows, [(1, 2)]) + + async def test_connection_locations(self): + class Fake: # pylint: disable=too-few-public-methods + def __str__(self): + return str(TEST_DB) + + locs = ("test.db", b"test.db", Path("test.db"), Fake()) + + async with aiosqlite.connect(TEST_DB) as db: + await db.execute("create table foo (i integer, k integer)") + await db.execute("insert into foo (i, k) values (1, 5)") + await db.commit() + + cursor = await db.execute("select * from foo") + rows = await cursor.fetchall() + + for loc in locs: + async with aiosqlite.connect(loc) as db: + cursor = await db.execute("select * from foo") + self.assertEqual(await cursor.fetchall(), rows) + + async def test_multiple_connections(self): + async with aiosqlite.connect(TEST_DB) as db: + await db.execute( + "create table multiple_connections " + "(i integer primary key asc, k integer)" + ) + + async def do_one_conn(i): + async with aiosqlite.connect(TEST_DB) as db: + await db.execute("insert into multiple_connections (k) values (?)", [i]) + await db.commit() + + await asyncio.gather(*[do_one_conn(i) for i in range(10)]) + + async with aiosqlite.connect(TEST_DB) as db: + cursor = await db.execute("select * from multiple_connections") + rows = await cursor.fetchall() + + assert len(rows) == 10 + + async def test_multiple_queries(self): + async with aiosqlite.connect(TEST_DB) as db: + await db.execute( + "create table multiple_queries " + "(i integer primary key asc, k integer)" + ) + + await asyncio.gather( + *[ + db.execute("insert into multiple_queries (k) values (?)", [i]) + for i in range(10) + ] + ) + + await db.commit() + + async with aiosqlite.connect(TEST_DB) as db: + cursor = await db.execute("select * from multiple_queries") + rows = await cursor.fetchall() + + assert len(rows) == 10 + + async def test_iterable_cursor(self): + async with aiosqlite.connect(TEST_DB) as db: + cursor = await db.cursor() + await cursor.execute( + "create table iterable_cursor " "(i integer primary key asc, k integer)" + ) + await cursor.executemany( + "insert into iterable_cursor (k) values (?)", [[i] for i in range(10)] + ) + await db.commit() + + async with aiosqlite.connect(TEST_DB) as db: + cursor = await db.execute("select * from iterable_cursor") + rows = [] + async for row in cursor: + rows.append(row) + + assert len(rows) == 10 + + async def test_multi_loop_usage(self): + results = {} + + def runner(k, conn): + async def query(): + async with conn.execute("select * from foo") as cursor: + rows = await cursor.fetchall() + self.assertEqual(len(rows), 2) + return rows + + with self.subTest(k): + loop = asyncio.new_event_loop() + rows = loop.run_until_complete(query()) + loop.close() + results[k] = rows + + async with aiosqlite.connect(":memory:") as db: + await db.execute("create table foo (id int, name varchar)") + await db.execute( + "insert into foo values (?, ?), (?, ?)", (1, "Sally", 2, "Janet") + ) + await db.commit() + + threads = [Thread(target=runner, args=(k, db)) for k in range(4)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + self.assertEqual(len(results), 4) + for rows in results.values(): + self.assertEqual(len(rows), 2) + + async def test_context_cursor(self): + async with aiosqlite.connect(TEST_DB) as db: + async with db.cursor() as cursor: + await cursor.execute( + "create table context_cursor " + "(i integer primary key asc, k integer)" + ) + await cursor.executemany( + "insert into context_cursor (k) values (?)", + [[i] for i in range(10)], + ) + await db.commit() + + async with aiosqlite.connect(TEST_DB) as db: + async with db.execute("select * from context_cursor") as cursor: + rows = [] + async for row in cursor: + rows.append(row) + + assert len(rows) == 10 + + async def test_cursor_return_self(self): + async with aiosqlite.connect(TEST_DB) as db: + cursor = await db.cursor() + + result = await cursor.execute( + "create table test_cursor_return_self (i integer, k integer)" + ) + self.assertEqual(result, cursor, "cursor execute returns itself") + + result = await cursor.executemany( + "insert into test_cursor_return_self values (?, ?)", [(1, 1), (2, 2)] + ) + self.assertEqual(result, cursor) + + result = await cursor.executescript( + "insert into test_cursor_return_self values (3, 3);" + "insert into test_cursor_return_self values (4, 4);" + "insert into test_cursor_return_self values (5, 5);" + ) + self.assertEqual(result, cursor) + + async def test_connection_properties(self): + async with aiosqlite.connect(TEST_DB) as db: + self.assertEqual(db.total_changes, 0) + + async with db.cursor() as cursor: + self.assertFalse(db.in_transaction) + await cursor.execute( + "create table test_properties " + "(i integer primary key asc, k integer, d text)" + ) + await cursor.execute( + "insert into test_properties (k, d) values (1, 'hi')" + ) + self.assertTrue(db.in_transaction) + await db.commit() + self.assertFalse(db.in_transaction) + + self.assertEqual(db.total_changes, 1) + + self.assertIsNone(db.row_factory) + self.assertEqual(db.text_factory, default_text_factory) + + async with db.cursor() as cursor: + await cursor.execute("select * from test_properties") + row = await cursor.fetchone() + self.assertIsInstance(row, tuple) + self.assertEqual(row, (1, 1, "hi")) + with self.assertRaises(TypeError): + _ = row["k"] + + async with db.cursor() as cursor: + cursor.row_factory = aiosqlite.Row + self.assertEqual(cursor.row_factory, aiosqlite.Row) + await cursor.execute("select * from test_properties") + row = await cursor.fetchone() + self.assertIsInstance(row, aiosqlite.Row) + self.assertEqual(row[1], 1) + self.assertEqual(row[2], "hi") + self.assertEqual(row["k"], 1) + self.assertEqual(row["d"], "hi") + + db.row_factory = aiosqlite.Row + db.text_factory = bytes + self.assertEqual(db.row_factory, aiosqlite.Row) + self.assertEqual(db.text_factory, bytes) + + async with db.cursor() as cursor: + await cursor.execute("select * from test_properties") + row = await cursor.fetchone() + self.assertIsInstance(row, aiosqlite.Row) + self.assertEqual(row[1], 1) + self.assertEqual(row[2], b"hi") + self.assertEqual(row["k"], 1) + self.assertEqual(row["d"], b"hi") + + async def test_fetch_all(self): + async with aiosqlite.connect(TEST_DB) as db: + await db.execute( + "create table test_fetch_all (i integer primary key asc, k integer)" + ) + await db.execute( + "insert into test_fetch_all (k) values (10), (24), (16), (32)" + ) + await db.commit() + + async with aiosqlite.connect(TEST_DB) as db: + cursor = await db.execute("select k from test_fetch_all where k < 30") + rows = await cursor.fetchall() + self.assertEqual(rows, [(10,), (24,), (16,)]) + + async def test_enable_load_extension(self): + """Assert that after enabling extension loading, they can be loaded""" + async with aiosqlite.connect(TEST_DB) as db: + try: + await db.enable_load_extension(True) + await db.load_extension("test") + except OperationalError as e: + assert "not authorized" not in e.args + except AttributeError as e: + raise SkipTest( + "python was not compiled with sqlite3 " + "extension support, so we can't test it" + ) from e + + async def test_set_progress_handler(self): + """ + Assert that after setting a progress handler returning 1, DB operations are aborted + """ + async with aiosqlite.connect(TEST_DB) as db: + await db.set_progress_handler(lambda: 1, 1) + with self.assertRaises(OperationalError): + await db.execute( + "create table test_progress_handler (i integer primary key asc, k integer)" + ) + + async def test_create_function(self): + """Assert that after creating a custom function, it can be used""" + + def no_arg(): + return "no arg" + + def one_arg(num): + return num * 2 + + async with aiosqlite.connect(TEST_DB) as db: + await db.create_function("no_arg", 0, no_arg) + await db.create_function("one_arg", 1, one_arg) + + async with db.execute("SELECT no_arg();") as res: + row = await res.fetchone() + self.assertEqual(row[0], "no arg") + + async with db.execute("SELECT one_arg(10);") as res: + row = await res.fetchone() + self.assertEqual(row[0], 20) + + async def test_create_function_deterministic(self): + """Assert that after creating a deterministic custom function, it can be used. + + https://sqlite.org/deterministic.html + """ + + def one_arg(num): + return num * 2 + + async with aiosqlite.connect(TEST_DB) as db: + await db.create_function("one_arg", 1, one_arg, deterministic=True) + await db.execute("create table foo (id int, bar int)") + + # Non-deterministic functions cannot be used in indexes + await db.execute("create index t on foo(one_arg(bar))") + + async def test_set_trace_callback(self): + statements = [] + + def callback(statement: str): + statements.append(statement) + + async with aiosqlite.connect(TEST_DB) as db: + await db.set_trace_callback(callback) + + await db.execute("select 10") + self.assertIn("select 10", statements) + + async def test_connect_error(self): + bad_db = Path("/something/that/shouldnt/exist.db") + with self.assertRaisesRegex(OperationalError, "unable to open database"): + async with aiosqlite.connect(bad_db) as db: + self.assertIsNone(db) # should never be reached + + with self.assertRaisesRegex(OperationalError, "unable to open database"): + await aiosqlite.connect(bad_db) + + async def test_iterdump(self): + async with aiosqlite.connect(":memory:") as db: + await db.execute("create table foo (i integer, k charvar(250))") + await db.executemany( + "insert into foo values (?, ?)", [(1, "hello"), (2, "world")] + ) + + lines = [line async for line in db.iterdump()] + self.assertEqual( + lines, + [ + "BEGIN TRANSACTION;", + "CREATE TABLE foo (i integer, k charvar(250));", + "INSERT INTO \"foo\" VALUES(1,'hello');", + "INSERT INTO \"foo\" VALUES(2,'world');", + "COMMIT;", + ], + ) + + async def test_cursor_on_closed_connection(self): + db = await aiosqlite.connect(TEST_DB) + + cursor = await db.execute("select 1, 2") + await db.close() + with self.assertRaisesRegex(ValueError, "Connection closed"): + await cursor.fetchall() + with self.assertRaisesRegex(ValueError, "Connection closed"): + await cursor.fetchall() + + async def test_cursor_on_closed_connection_loop(self): + db = await aiosqlite.connect(TEST_DB) + + cursor = await db.execute("select 1, 2") + tasks = [] + for i in range(100): + if i == 50: + tasks.append(asyncio.ensure_future(db.close())) + tasks.append(asyncio.ensure_future(cursor.fetchall())) + for task in tasks: + try: + await task + except sqlite3.ProgrammingError: + pass + + async def test_close_twice(self): + db = await aiosqlite.connect(TEST_DB) + + await db.close() + + # no error + await db.close() + + async def test_backup_aiosqlite(self): + def progress(a, b, c): + print(a, b, c) + + async with aiosqlite.connect(":memory:") as db1, aiosqlite.connect( + ":memory:" + ) as db2: + await db1.execute("create table foo (i integer, k charvar(250))") + await db1.executemany( + "insert into foo values (?, ?)", [(1, "hello"), (2, "world")] + ) + await db1.commit() + + with self.assertRaisesRegex(OperationalError, "no such table: foo"): + await db2.execute("select * from foo") + + await db1.backup(db2, progress=progress) + + async with db2.execute("select * from foo") as cursor: + rows = await cursor.fetchall() + self.assertEqual(rows, [(1, "hello"), (2, "world")]) + + async def test_backup_sqlite(self): + async with aiosqlite.connect(":memory:") as db1: + with sqlite3.connect(":memory:") as db2: + await db1.execute("create table foo (i integer, k charvar(250))") + await db1.executemany( + "insert into foo values (?, ?)", [(1, "hello"), (2, "world")] + ) + await db1.commit() + + with self.assertRaisesRegex(OperationalError, "no such table: foo"): + db2.execute("select * from foo") + + await db1.backup(db2) + + cursor = db2.execute("select * from foo") + rows = cursor.fetchall() + self.assertEqual(rows, [(1, "hello"), (2, "world")]) |