summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/testing/util.py
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
commit6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
treeb1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/sqlalchemy/testing/util.py
parent4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/testing/util.py')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/testing/util.py519
1 files changed, 519 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/testing/util.py b/venv/lib/python3.11/site-packages/sqlalchemy/testing/util.py
new file mode 100644
index 0000000..a6ce6ca
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/sqlalchemy/testing/util.py
@@ -0,0 +1,519 @@
+# testing/util.py
+# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+
+
+from __future__ import annotations
+
+from collections import deque
+import decimal
+import gc
+from itertools import chain
+import random
+import sys
+from sys import getsizeof
+import types
+
+from . import config
+from . import mock
+from .. import inspect
+from ..engine import Connection
+from ..schema import Column
+from ..schema import DropConstraint
+from ..schema import DropTable
+from ..schema import ForeignKeyConstraint
+from ..schema import MetaData
+from ..schema import Table
+from ..sql import schema
+from ..sql.sqltypes import Integer
+from ..util import decorator
+from ..util import defaultdict
+from ..util import has_refcount_gc
+from ..util import inspect_getfullargspec
+
+
+if not has_refcount_gc:
+
+ def non_refcount_gc_collect(*args):
+ gc.collect()
+ gc.collect()
+
+ gc_collect = lazy_gc = non_refcount_gc_collect
+else:
+ # assume CPython - straight gc.collect, lazy_gc() is a pass
+ gc_collect = gc.collect
+
+ def lazy_gc():
+ pass
+
+
+def picklers():
+ picklers = set()
+ import pickle
+
+ picklers.add(pickle)
+
+ # yes, this thing needs this much testing
+ for pickle_ in picklers:
+ for protocol in range(-2, pickle.HIGHEST_PROTOCOL + 1):
+ yield pickle_.loads, lambda d: pickle_.dumps(d, protocol)
+
+
+def random_choices(population, k=1):
+ return random.choices(population, k=k)
+
+
+def round_decimal(value, prec):
+ if isinstance(value, float):
+ return round(value, prec)
+
+ # can also use shift() here but that is 2.6 only
+ return (value * decimal.Decimal("1" + "0" * prec)).to_integral(
+ decimal.ROUND_FLOOR
+ ) / pow(10, prec)
+
+
+class RandomSet(set):
+ def __iter__(self):
+ l = list(set.__iter__(self))
+ random.shuffle(l)
+ return iter(l)
+
+ def pop(self):
+ index = random.randint(0, len(self) - 1)
+ item = list(set.__iter__(self))[index]
+ self.remove(item)
+ return item
+
+ def union(self, other):
+ return RandomSet(set.union(self, other))
+
+ def difference(self, other):
+ return RandomSet(set.difference(self, other))
+
+ def intersection(self, other):
+ return RandomSet(set.intersection(self, other))
+
+ def copy(self):
+ return RandomSet(self)
+
+
+def conforms_partial_ordering(tuples, sorted_elements):
+ """True if the given sorting conforms to the given partial ordering."""
+
+ deps = defaultdict(set)
+ for parent, child in tuples:
+ deps[parent].add(child)
+ for i, node in enumerate(sorted_elements):
+ for n in sorted_elements[i:]:
+ if node in deps[n]:
+ return False
+ else:
+ return True
+
+
+def all_partial_orderings(tuples, elements):
+ edges = defaultdict(set)
+ for parent, child in tuples:
+ edges[child].add(parent)
+
+ def _all_orderings(elements):
+ if len(elements) == 1:
+ yield list(elements)
+ else:
+ for elem in elements:
+ subset = set(elements).difference([elem])
+ if not subset.intersection(edges[elem]):
+ for sub_ordering in _all_orderings(subset):
+ yield [elem] + sub_ordering
+
+ return iter(_all_orderings(elements))
+
+
+def function_named(fn, name):
+ """Return a function with a given __name__.
+
+ Will assign to __name__ and return the original function if possible on
+ the Python implementation, otherwise a new function will be constructed.
+
+ This function should be phased out as much as possible
+ in favor of @decorator. Tests that "generate" many named tests
+ should be modernized.
+
+ """
+ try:
+ fn.__name__ = name
+ except TypeError:
+ fn = types.FunctionType(
+ fn.__code__, fn.__globals__, name, fn.__defaults__, fn.__closure__
+ )
+ return fn
+
+
+def run_as_contextmanager(ctx, fn, *arg, **kw):
+ """Run the given function under the given contextmanager,
+ simulating the behavior of 'with' to support older
+ Python versions.
+
+ This is not necessary anymore as we have placed 2.6
+ as minimum Python version, however some tests are still using
+ this structure.
+
+ """
+
+ obj = ctx.__enter__()
+ try:
+ result = fn(obj, *arg, **kw)
+ ctx.__exit__(None, None, None)
+ return result
+ except:
+ exc_info = sys.exc_info()
+ raise_ = ctx.__exit__(*exc_info)
+ if not raise_:
+ raise
+ else:
+ return raise_
+
+
+def rowset(results):
+ """Converts the results of sql execution into a plain set of column tuples.
+
+ Useful for asserting the results of an unordered query.
+ """
+
+ return {tuple(row) for row in results}
+
+
+def fail(msg):
+ assert False, msg
+
+
+@decorator
+def provide_metadata(fn, *args, **kw):
+ """Provide bound MetaData for a single test, dropping afterwards.
+
+ Legacy; use the "metadata" pytest fixture.
+
+ """
+
+ from . import fixtures
+
+ metadata = schema.MetaData()
+ self = args[0]
+ prev_meta = getattr(self, "metadata", None)
+ self.metadata = metadata
+ try:
+ return fn(*args, **kw)
+ finally:
+ # close out some things that get in the way of dropping tables.
+ # when using the "metadata" fixture, there is a set ordering
+ # of things that makes sure things are cleaned up in order, however
+ # the simple "decorator" nature of this legacy function means
+ # we have to hardcode some of that cleanup ahead of time.
+
+ # close ORM sessions
+ fixtures.close_all_sessions()
+
+ # integrate with the "connection" fixture as there are many
+ # tests where it is used along with provide_metadata
+ cfc = fixtures.base._connection_fixture_connection
+ if cfc:
+ # TODO: this warning can be used to find all the places
+ # this is used with connection fixture
+ # warn("mixing legacy provide metadata with connection fixture")
+ drop_all_tables_from_metadata(metadata, cfc)
+ # as the provide_metadata fixture is often used with "testing.db",
+ # when we do the drop we have to commit the transaction so that
+ # the DB is actually updated as the CREATE would have been
+ # committed
+ cfc.get_transaction().commit()
+ else:
+ drop_all_tables_from_metadata(metadata, config.db)
+ self.metadata = prev_meta
+
+
+def flag_combinations(*combinations):
+ """A facade around @testing.combinations() oriented towards boolean
+ keyword-based arguments.
+
+ Basically generates a nice looking identifier based on the keywords
+ and also sets up the argument names.
+
+ E.g.::
+
+ @testing.flag_combinations(
+ dict(lazy=False, passive=False),
+ dict(lazy=True, passive=False),
+ dict(lazy=False, passive=True),
+ dict(lazy=False, passive=True, raiseload=True),
+ )
+
+
+ would result in::
+
+ @testing.combinations(
+ ('', False, False, False),
+ ('lazy', True, False, False),
+ ('lazy_passive', True, True, False),
+ ('lazy_passive', True, True, True),
+ id_='iaaa',
+ argnames='lazy,passive,raiseload'
+ )
+
+ """
+
+ keys = set()
+
+ for d in combinations:
+ keys.update(d)
+
+ keys = sorted(keys)
+
+ return config.combinations(
+ *[
+ ("_".join(k for k in keys if d.get(k, False)),)
+ + tuple(d.get(k, False) for k in keys)
+ for d in combinations
+ ],
+ id_="i" + ("a" * len(keys)),
+ argnames=",".join(keys),
+ )
+
+
+def lambda_combinations(lambda_arg_sets, **kw):
+ args = inspect_getfullargspec(lambda_arg_sets)
+
+ arg_sets = lambda_arg_sets(*[mock.Mock() for arg in args[0]])
+
+ def create_fixture(pos):
+ def fixture(**kw):
+ return lambda_arg_sets(**kw)[pos]
+
+ fixture.__name__ = "fixture_%3.3d" % pos
+ return fixture
+
+ return config.combinations(
+ *[(create_fixture(i),) for i in range(len(arg_sets))], **kw
+ )
+
+
+def resolve_lambda(__fn, **kw):
+ """Given a no-arg lambda and a namespace, return a new lambda that
+ has all the values filled in.
+
+ This is used so that we can have module-level fixtures that
+ refer to instance-level variables using lambdas.
+
+ """
+
+ pos_args = inspect_getfullargspec(__fn)[0]
+ pass_pos_args = {arg: kw.pop(arg) for arg in pos_args}
+ glb = dict(__fn.__globals__)
+ glb.update(kw)
+ new_fn = types.FunctionType(__fn.__code__, glb)
+ return new_fn(**pass_pos_args)
+
+
+def metadata_fixture(ddl="function"):
+ """Provide MetaData for a pytest fixture."""
+
+ def decorate(fn):
+ def run_ddl(self):
+ metadata = self.metadata = schema.MetaData()
+ try:
+ result = fn(self, metadata)
+ metadata.create_all(config.db)
+ # TODO:
+ # somehow get a per-function dml erase fixture here
+ yield result
+ finally:
+ metadata.drop_all(config.db)
+
+ return config.fixture(scope=ddl)(run_ddl)
+
+ return decorate
+
+
+def force_drop_names(*names):
+ """Force the given table names to be dropped after test complete,
+ isolating for foreign key cycles
+
+ """
+
+ @decorator
+ def go(fn, *args, **kw):
+ try:
+ return fn(*args, **kw)
+ finally:
+ drop_all_tables(config.db, inspect(config.db), include_names=names)
+
+ return go
+
+
+class adict(dict):
+ """Dict keys available as attributes. Shadows."""
+
+ def __getattribute__(self, key):
+ try:
+ return self[key]
+ except KeyError:
+ return dict.__getattribute__(self, key)
+
+ def __call__(self, *keys):
+ return tuple([self[key] for key in keys])
+
+ get_all = __call__
+
+
+def drop_all_tables_from_metadata(metadata, engine_or_connection):
+ from . import engines
+
+ def go(connection):
+ engines.testing_reaper.prepare_for_drop_tables(connection)
+
+ if not connection.dialect.supports_alter:
+ from . import assertions
+
+ with assertions.expect_warnings(
+ "Can't sort tables", assert_=False
+ ):
+ metadata.drop_all(connection)
+ else:
+ metadata.drop_all(connection)
+
+ if not isinstance(engine_or_connection, Connection):
+ with engine_or_connection.begin() as connection:
+ go(connection)
+ else:
+ go(engine_or_connection)
+
+
+def drop_all_tables(
+ engine,
+ inspector,
+ schema=None,
+ consider_schemas=(None,),
+ include_names=None,
+):
+ if include_names is not None:
+ include_names = set(include_names)
+
+ if schema is not None:
+ assert consider_schemas == (
+ None,
+ ), "consider_schemas and schema are mutually exclusive"
+ consider_schemas = (schema,)
+
+ with engine.begin() as conn:
+ for table_key, fkcs in reversed(
+ inspector.sort_tables_on_foreign_key_dependency(
+ consider_schemas=consider_schemas
+ )
+ ):
+ if table_key:
+ if (
+ include_names is not None
+ and table_key[1] not in include_names
+ ):
+ continue
+ conn.execute(
+ DropTable(
+ Table(table_key[1], MetaData(), schema=table_key[0])
+ )
+ )
+ elif fkcs:
+ if not engine.dialect.supports_alter:
+ continue
+ for t_key, fkc in fkcs:
+ if (
+ include_names is not None
+ and t_key[1] not in include_names
+ ):
+ continue
+ tb = Table(
+ t_key[1],
+ MetaData(),
+ Column("x", Integer),
+ Column("y", Integer),
+ schema=t_key[0],
+ )
+ conn.execute(
+ DropConstraint(
+ ForeignKeyConstraint([tb.c.x], [tb.c.y], name=fkc)
+ )
+ )
+
+
+def teardown_events(event_cls):
+ @decorator
+ def decorate(fn, *arg, **kw):
+ try:
+ return fn(*arg, **kw)
+ finally:
+ event_cls._clear()
+
+ return decorate
+
+
+def total_size(o):
+ """Returns the approximate memory footprint an object and all of its
+ contents.
+
+ source: https://code.activestate.com/recipes/577504/
+
+
+ """
+
+ def dict_handler(d):
+ return chain.from_iterable(d.items())
+
+ all_handlers = {
+ tuple: iter,
+ list: iter,
+ deque: iter,
+ dict: dict_handler,
+ set: iter,
+ frozenset: iter,
+ }
+ seen = set() # track which object id's have already been seen
+ default_size = getsizeof(0) # estimate sizeof object without __sizeof__
+
+ def sizeof(o):
+ if id(o) in seen: # do not double count the same object
+ return 0
+ seen.add(id(o))
+ s = getsizeof(o, default_size)
+
+ for typ, handler in all_handlers.items():
+ if isinstance(o, typ):
+ s += sum(map(sizeof, handler(o)))
+ break
+ return s
+
+ return sizeof(o)
+
+
+def count_cache_key_tuples(tup):
+ """given a cache key tuple, counts how many instances of actual
+ tuples are found.
+
+ used to alert large jumps in cache key complexity.
+
+ """
+ stack = [tup]
+
+ sentinel = object()
+ num_elements = 0
+
+ while stack:
+ elem = stack.pop(0)
+ if elem is sentinel:
+ num_elements += 1
+ elif isinstance(elem, tuple):
+ if elem:
+ stack = list(elem) + [sentinel] + stack
+ return num_elements