From 12cf076118570eebbff08c6b3090e0d4798447a1 Mon Sep 17 00:00:00 2001 From: cyfraeviolae Date: Wed, 3 Apr 2024 03:17:55 -0400 Subject: no venv --- .../site-packages/sqlalchemy/testing/util.py | 519 --------------------- 1 file changed, 519 deletions(-) delete mode 100644 venv/lib/python3.11/site-packages/sqlalchemy/testing/util.py (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/testing/util.py') diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/testing/util.py b/venv/lib/python3.11/site-packages/sqlalchemy/testing/util.py deleted file mode 100644 index a6ce6ca..0000000 --- a/venv/lib/python3.11/site-packages/sqlalchemy/testing/util.py +++ /dev/null @@ -1,519 +0,0 @@ -# testing/util.py -# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors -# -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - - -from __future__ import annotations - -from collections import deque -import decimal -import gc -from itertools import chain -import random -import sys -from sys import getsizeof -import types - -from . import config -from . import mock -from .. import inspect -from ..engine import Connection -from ..schema import Column -from ..schema import DropConstraint -from ..schema import DropTable -from ..schema import ForeignKeyConstraint -from ..schema import MetaData -from ..schema import Table -from ..sql import schema -from ..sql.sqltypes import Integer -from ..util import decorator -from ..util import defaultdict -from ..util import has_refcount_gc -from ..util import inspect_getfullargspec - - -if not has_refcount_gc: - - def non_refcount_gc_collect(*args): - gc.collect() - gc.collect() - - gc_collect = lazy_gc = non_refcount_gc_collect -else: - # assume CPython - straight gc.collect, lazy_gc() is a pass - gc_collect = gc.collect - - def lazy_gc(): - pass - - -def picklers(): - picklers = set() - import pickle - - picklers.add(pickle) - - # yes, this thing needs this much testing - for pickle_ in picklers: - for protocol in range(-2, pickle.HIGHEST_PROTOCOL + 1): - yield pickle_.loads, lambda d: pickle_.dumps(d, protocol) - - -def random_choices(population, k=1): - return random.choices(population, k=k) - - -def round_decimal(value, prec): - if isinstance(value, float): - return round(value, prec) - - # can also use shift() here but that is 2.6 only - return (value * decimal.Decimal("1" + "0" * prec)).to_integral( - decimal.ROUND_FLOOR - ) / pow(10, prec) - - -class RandomSet(set): - def __iter__(self): - l = list(set.__iter__(self)) - random.shuffle(l) - return iter(l) - - def pop(self): - index = random.randint(0, len(self) - 1) - item = list(set.__iter__(self))[index] - self.remove(item) - return item - - def union(self, other): - return RandomSet(set.union(self, other)) - - def difference(self, other): - return RandomSet(set.difference(self, other)) - - def intersection(self, other): - return RandomSet(set.intersection(self, other)) - - def copy(self): - return RandomSet(self) - - -def conforms_partial_ordering(tuples, sorted_elements): - """True if the given sorting conforms to the given partial ordering.""" - - deps = defaultdict(set) - for parent, child in tuples: - deps[parent].add(child) - for i, node in enumerate(sorted_elements): - for n in sorted_elements[i:]: - if node in deps[n]: - return False - else: - return True - - -def all_partial_orderings(tuples, elements): - edges = defaultdict(set) - for parent, child in tuples: - edges[child].add(parent) - - def _all_orderings(elements): - if len(elements) == 1: - yield list(elements) - else: - for elem in elements: - subset = set(elements).difference([elem]) - if not subset.intersection(edges[elem]): - for sub_ordering in _all_orderings(subset): - yield [elem] + sub_ordering - - return iter(_all_orderings(elements)) - - -def function_named(fn, name): - """Return a function with a given __name__. - - Will assign to __name__ and return the original function if possible on - the Python implementation, otherwise a new function will be constructed. - - This function should be phased out as much as possible - in favor of @decorator. Tests that "generate" many named tests - should be modernized. - - """ - try: - fn.__name__ = name - except TypeError: - fn = types.FunctionType( - fn.__code__, fn.__globals__, name, fn.__defaults__, fn.__closure__ - ) - return fn - - -def run_as_contextmanager(ctx, fn, *arg, **kw): - """Run the given function under the given contextmanager, - simulating the behavior of 'with' to support older - Python versions. - - This is not necessary anymore as we have placed 2.6 - as minimum Python version, however some tests are still using - this structure. - - """ - - obj = ctx.__enter__() - try: - result = fn(obj, *arg, **kw) - ctx.__exit__(None, None, None) - return result - except: - exc_info = sys.exc_info() - raise_ = ctx.__exit__(*exc_info) - if not raise_: - raise - else: - return raise_ - - -def rowset(results): - """Converts the results of sql execution into a plain set of column tuples. - - Useful for asserting the results of an unordered query. - """ - - return {tuple(row) for row in results} - - -def fail(msg): - assert False, msg - - -@decorator -def provide_metadata(fn, *args, **kw): - """Provide bound MetaData for a single test, dropping afterwards. - - Legacy; use the "metadata" pytest fixture. - - """ - - from . import fixtures - - metadata = schema.MetaData() - self = args[0] - prev_meta = getattr(self, "metadata", None) - self.metadata = metadata - try: - return fn(*args, **kw) - finally: - # close out some things that get in the way of dropping tables. - # when using the "metadata" fixture, there is a set ordering - # of things that makes sure things are cleaned up in order, however - # the simple "decorator" nature of this legacy function means - # we have to hardcode some of that cleanup ahead of time. - - # close ORM sessions - fixtures.close_all_sessions() - - # integrate with the "connection" fixture as there are many - # tests where it is used along with provide_metadata - cfc = fixtures.base._connection_fixture_connection - if cfc: - # TODO: this warning can be used to find all the places - # this is used with connection fixture - # warn("mixing legacy provide metadata with connection fixture") - drop_all_tables_from_metadata(metadata, cfc) - # as the provide_metadata fixture is often used with "testing.db", - # when we do the drop we have to commit the transaction so that - # the DB is actually updated as the CREATE would have been - # committed - cfc.get_transaction().commit() - else: - drop_all_tables_from_metadata(metadata, config.db) - self.metadata = prev_meta - - -def flag_combinations(*combinations): - """A facade around @testing.combinations() oriented towards boolean - keyword-based arguments. - - Basically generates a nice looking identifier based on the keywords - and also sets up the argument names. - - E.g.:: - - @testing.flag_combinations( - dict(lazy=False, passive=False), - dict(lazy=True, passive=False), - dict(lazy=False, passive=True), - dict(lazy=False, passive=True, raiseload=True), - ) - - - would result in:: - - @testing.combinations( - ('', False, False, False), - ('lazy', True, False, False), - ('lazy_passive', True, True, False), - ('lazy_passive', True, True, True), - id_='iaaa', - argnames='lazy,passive,raiseload' - ) - - """ - - keys = set() - - for d in combinations: - keys.update(d) - - keys = sorted(keys) - - return config.combinations( - *[ - ("_".join(k for k in keys if d.get(k, False)),) - + tuple(d.get(k, False) for k in keys) - for d in combinations - ], - id_="i" + ("a" * len(keys)), - argnames=",".join(keys), - ) - - -def lambda_combinations(lambda_arg_sets, **kw): - args = inspect_getfullargspec(lambda_arg_sets) - - arg_sets = lambda_arg_sets(*[mock.Mock() for arg in args[0]]) - - def create_fixture(pos): - def fixture(**kw): - return lambda_arg_sets(**kw)[pos] - - fixture.__name__ = "fixture_%3.3d" % pos - return fixture - - return config.combinations( - *[(create_fixture(i),) for i in range(len(arg_sets))], **kw - ) - - -def resolve_lambda(__fn, **kw): - """Given a no-arg lambda and a namespace, return a new lambda that - has all the values filled in. - - This is used so that we can have module-level fixtures that - refer to instance-level variables using lambdas. - - """ - - pos_args = inspect_getfullargspec(__fn)[0] - pass_pos_args = {arg: kw.pop(arg) for arg in pos_args} - glb = dict(__fn.__globals__) - glb.update(kw) - new_fn = types.FunctionType(__fn.__code__, glb) - return new_fn(**pass_pos_args) - - -def metadata_fixture(ddl="function"): - """Provide MetaData for a pytest fixture.""" - - def decorate(fn): - def run_ddl(self): - metadata = self.metadata = schema.MetaData() - try: - result = fn(self, metadata) - metadata.create_all(config.db) - # TODO: - # somehow get a per-function dml erase fixture here - yield result - finally: - metadata.drop_all(config.db) - - return config.fixture(scope=ddl)(run_ddl) - - return decorate - - -def force_drop_names(*names): - """Force the given table names to be dropped after test complete, - isolating for foreign key cycles - - """ - - @decorator - def go(fn, *args, **kw): - try: - return fn(*args, **kw) - finally: - drop_all_tables(config.db, inspect(config.db), include_names=names) - - return go - - -class adict(dict): - """Dict keys available as attributes. Shadows.""" - - def __getattribute__(self, key): - try: - return self[key] - except KeyError: - return dict.__getattribute__(self, key) - - def __call__(self, *keys): - return tuple([self[key] for key in keys]) - - get_all = __call__ - - -def drop_all_tables_from_metadata(metadata, engine_or_connection): - from . import engines - - def go(connection): - engines.testing_reaper.prepare_for_drop_tables(connection) - - if not connection.dialect.supports_alter: - from . import assertions - - with assertions.expect_warnings( - "Can't sort tables", assert_=False - ): - metadata.drop_all(connection) - else: - metadata.drop_all(connection) - - if not isinstance(engine_or_connection, Connection): - with engine_or_connection.begin() as connection: - go(connection) - else: - go(engine_or_connection) - - -def drop_all_tables( - engine, - inspector, - schema=None, - consider_schemas=(None,), - include_names=None, -): - if include_names is not None: - include_names = set(include_names) - - if schema is not None: - assert consider_schemas == ( - None, - ), "consider_schemas and schema are mutually exclusive" - consider_schemas = (schema,) - - with engine.begin() as conn: - for table_key, fkcs in reversed( - inspector.sort_tables_on_foreign_key_dependency( - consider_schemas=consider_schemas - ) - ): - if table_key: - if ( - include_names is not None - and table_key[1] not in include_names - ): - continue - conn.execute( - DropTable( - Table(table_key[1], MetaData(), schema=table_key[0]) - ) - ) - elif fkcs: - if not engine.dialect.supports_alter: - continue - for t_key, fkc in fkcs: - if ( - include_names is not None - and t_key[1] not in include_names - ): - continue - tb = Table( - t_key[1], - MetaData(), - Column("x", Integer), - Column("y", Integer), - schema=t_key[0], - ) - conn.execute( - DropConstraint( - ForeignKeyConstraint([tb.c.x], [tb.c.y], name=fkc) - ) - ) - - -def teardown_events(event_cls): - @decorator - def decorate(fn, *arg, **kw): - try: - return fn(*arg, **kw) - finally: - event_cls._clear() - - return decorate - - -def total_size(o): - """Returns the approximate memory footprint an object and all of its - contents. - - source: https://code.activestate.com/recipes/577504/ - - - """ - - def dict_handler(d): - return chain.from_iterable(d.items()) - - all_handlers = { - tuple: iter, - list: iter, - deque: iter, - dict: dict_handler, - set: iter, - frozenset: iter, - } - seen = set() # track which object id's have already been seen - default_size = getsizeof(0) # estimate sizeof object without __sizeof__ - - def sizeof(o): - if id(o) in seen: # do not double count the same object - return 0 - seen.add(id(o)) - s = getsizeof(o, default_size) - - for typ, handler in all_handlers.items(): - if isinstance(o, typ): - s += sum(map(sizeof, handler(o))) - break - return s - - return sizeof(o) - - -def count_cache_key_tuples(tup): - """given a cache key tuple, counts how many instances of actual - tuples are found. - - used to alert large jumps in cache key complexity. - - """ - stack = [tup] - - sentinel = object() - num_elements = 0 - - while stack: - elem = stack.pop(0) - if elem is sentinel: - num_elements += 1 - elif isinstance(elem, tuple): - if elem: - stack = list(elem) + [sentinel] + stack - return num_elements -- cgit v1.2.3