summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin')
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__init__.py6
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__pycache__/__init__.cpython-311.pycbin209 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__pycache__/bootstrap.cpython-311.pycbin2230 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__pycache__/plugin_base.cpython-311.pycbin31509 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__pycache__/pytestplugin.cpython-311.pycbin37248 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/bootstrap.py51
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/plugin_base.py779
-rw-r--r--venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/pytestplugin.py868
8 files changed, 0 insertions, 1704 deletions
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__init__.py b/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__init__.py
deleted file mode 100644
index 0f98777..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__init__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-# testing/plugin/__init__.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__pycache__/__init__.cpython-311.pyc
deleted file mode 100644
index 163dc80..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__pycache__/__init__.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__pycache__/bootstrap.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__pycache__/bootstrap.cpython-311.pyc
deleted file mode 100644
index 9b70281..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__pycache__/bootstrap.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__pycache__/plugin_base.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__pycache__/plugin_base.cpython-311.pyc
deleted file mode 100644
index c65e39e..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__pycache__/plugin_base.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__pycache__/pytestplugin.cpython-311.pyc b/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__pycache__/pytestplugin.cpython-311.pyc
deleted file mode 100644
index 6eb39ab..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/__pycache__/pytestplugin.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/bootstrap.py b/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/bootstrap.py
deleted file mode 100644
index d0d3754..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/bootstrap.py
+++ /dev/null
@@ -1,51 +0,0 @@
-# testing/plugin/bootstrap.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-"""
-Bootstrapper for test framework plugins.
-
-The entire rationale for this system is to get the modules in plugin/
-imported without importing all of the supporting library, so that we can
-set up things for testing before coverage starts.
-
-The rationale for all of plugin/ being *in* the supporting library in the
-first place is so that the testing and plugin suite is available to other
-libraries, mainly external SQLAlchemy and Alembic dialects, to make use
-of the same test environment and standard suites available to
-SQLAlchemy/Alembic themselves without the need to ship/install a separate
-package outside of SQLAlchemy.
-
-
-"""
-
-import importlib.util
-import os
-import sys
-
-
-bootstrap_file = locals()["bootstrap_file"]
-to_bootstrap = locals()["to_bootstrap"]
-
-
-def load_file_as_module(name):
- path = os.path.join(os.path.dirname(bootstrap_file), "%s.py" % name)
-
- spec = importlib.util.spec_from_file_location(name, path)
- assert spec is not None
- assert spec.loader is not None
- mod = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(mod)
- return mod
-
-
-if to_bootstrap == "pytest":
- sys.modules["sqla_plugin_base"] = load_file_as_module("plugin_base")
- sys.modules["sqla_plugin_base"].bootstrapped_as_sqlalchemy = True
- sys.modules["sqla_pytestplugin"] = load_file_as_module("pytestplugin")
-else:
- raise Exception("unknown bootstrap: %s" % to_bootstrap) # noqa
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/plugin_base.py b/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/plugin_base.py
deleted file mode 100644
index a642668..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/plugin_base.py
+++ /dev/null
@@ -1,779 +0,0 @@
-# testing/plugin/plugin_base.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-
-from __future__ import annotations
-
-import abc
-from argparse import Namespace
-import configparser
-import logging
-import os
-from pathlib import Path
-import re
-import sys
-from typing import Any
-
-from sqlalchemy.testing import asyncio
-
-"""Testing extensions.
-
-this module is designed to work as a testing-framework-agnostic library,
-created so that multiple test frameworks can be supported at once
-(mostly so that we can migrate to new ones). The current target
-is pytest.
-
-"""
-
-# flag which indicates we are in the SQLAlchemy testing suite,
-# and not that of Alembic or a third party dialect.
-bootstrapped_as_sqlalchemy = False
-
-log = logging.getLogger("sqlalchemy.testing.plugin_base")
-
-# late imports
-fixtures = None
-engines = None
-exclusions = None
-warnings = None
-profiling = None
-provision = None
-assertions = None
-requirements = None
-config = None
-testing = None
-util = None
-file_config = None
-
-logging = None
-include_tags = set()
-exclude_tags = set()
-options: Namespace = None # type: ignore
-
-
-def setup_options(make_option):
- make_option(
- "--log-info",
- action="callback",
- type=str,
- callback=_log,
- help="turn on info logging for <LOG> (multiple OK)",
- )
- make_option(
- "--log-debug",
- action="callback",
- type=str,
- callback=_log,
- help="turn on debug logging for <LOG> (multiple OK)",
- )
- make_option(
- "--db",
- action="append",
- type=str,
- dest="db",
- help="Use prefab database uri. Multiple OK, "
- "first one is run by default.",
- )
- make_option(
- "--dbs",
- action="callback",
- zeroarg_callback=_list_dbs,
- help="List available prefab dbs",
- )
- make_option(
- "--dburi",
- action="append",
- type=str,
- dest="dburi",
- help="Database uri. Multiple OK, first one is run by default.",
- )
- make_option(
- "--dbdriver",
- action="append",
- type=str,
- dest="dbdriver",
- help="Additional database drivers to include in tests. "
- "These are linked to the existing database URLs by the "
- "provisioning system.",
- )
- make_option(
- "--dropfirst",
- action="store_true",
- dest="dropfirst",
- help="Drop all tables in the target database first",
- )
- make_option(
- "--disable-asyncio",
- action="store_true",
- help="disable test / fixtures / provisoning running in asyncio",
- )
- make_option(
- "--backend-only",
- action="callback",
- zeroarg_callback=_set_tag_include("backend"),
- help=(
- "Run only tests marked with __backend__ or __sparse_backend__; "
- "this is now equivalent to the pytest -m backend mark expression"
- ),
- )
- make_option(
- "--nomemory",
- action="callback",
- zeroarg_callback=_set_tag_exclude("memory_intensive"),
- help="Don't run memory profiling tests; "
- "this is now equivalent to the pytest -m 'not memory_intensive' "
- "mark expression",
- )
- make_option(
- "--notimingintensive",
- action="callback",
- zeroarg_callback=_set_tag_exclude("timing_intensive"),
- help="Don't run timing intensive tests; "
- "this is now equivalent to the pytest -m 'not timing_intensive' "
- "mark expression",
- )
- make_option(
- "--nomypy",
- action="callback",
- zeroarg_callback=_set_tag_exclude("mypy"),
- help="Don't run mypy typing tests; "
- "this is now equivalent to the pytest -m 'not mypy' mark expression",
- )
- make_option(
- "--profile-sort",
- type=str,
- default="cumulative",
- dest="profilesort",
- help="Type of sort for profiling standard output",
- )
- make_option(
- "--profile-dump",
- type=str,
- dest="profiledump",
- help="Filename where a single profile run will be dumped",
- )
- make_option(
- "--low-connections",
- action="store_true",
- dest="low_connections",
- help="Use a low number of distinct connections - "
- "i.e. for Oracle TNS",
- )
- make_option(
- "--write-idents",
- type=str,
- dest="write_idents",
- help="write out generated follower idents to <file>, "
- "when -n<num> is used",
- )
- make_option(
- "--requirements",
- action="callback",
- type=str,
- callback=_requirements_opt,
- help="requirements class for testing, overrides setup.cfg",
- )
- make_option(
- "--include-tag",
- action="callback",
- callback=_include_tag,
- type=str,
- help="Include tests with tag <tag>; "
- "legacy, use pytest -m 'tag' instead",
- )
- make_option(
- "--exclude-tag",
- action="callback",
- callback=_exclude_tag,
- type=str,
- help="Exclude tests with tag <tag>; "
- "legacy, use pytest -m 'not tag' instead",
- )
- make_option(
- "--write-profiles",
- action="store_true",
- dest="write_profiles",
- default=False,
- help="Write/update failing profiling data.",
- )
- make_option(
- "--force-write-profiles",
- action="store_true",
- dest="force_write_profiles",
- default=False,
- help="Unconditionally write/update profiling data.",
- )
- make_option(
- "--dump-pyannotate",
- type=str,
- dest="dump_pyannotate",
- help="Run pyannotate and dump json info to given file",
- )
- make_option(
- "--mypy-extra-test-path",
- type=str,
- action="append",
- default=[],
- dest="mypy_extra_test_paths",
- help="Additional test directories to add to the mypy tests. "
- "This is used only when running mypy tests. Multiple OK",
- )
- # db specific options
- make_option(
- "--postgresql-templatedb",
- type=str,
- help="name of template database to use for PostgreSQL "
- "CREATE DATABASE (defaults to current database)",
- )
- make_option(
- "--oracledb-thick-mode",
- action="store_true",
- help="enables the 'thick mode' when testing with oracle+oracledb",
- )
-
-
-def configure_follower(follower_ident):
- """Configure required state for a follower.
-
- This invokes in the parent process and typically includes
- database creation.
-
- """
- from sqlalchemy.testing import provision
-
- provision.FOLLOWER_IDENT = follower_ident
-
-
-def memoize_important_follower_config(dict_):
- """Store important configuration we will need to send to a follower.
-
- This invokes in the parent process after normal config is set up.
-
- Hook is currently not used.
-
- """
-
-
-def restore_important_follower_config(dict_):
- """Restore important configuration needed by a follower.
-
- This invokes in the follower process.
-
- Hook is currently not used.
-
- """
-
-
-def read_config(root_path):
- global file_config
- file_config = configparser.ConfigParser()
- file_config.read(
- [str(root_path / "setup.cfg"), str(root_path / "test.cfg")]
- )
-
-
-def pre_begin(opt):
- """things to set up early, before coverage might be setup."""
- global options
- options = opt
- for fn in pre_configure:
- fn(options, file_config)
-
-
-def set_coverage_flag(value):
- options.has_coverage = value
-
-
-def post_begin():
- """things to set up later, once we know coverage is running."""
- # Lazy setup of other options (post coverage)
- for fn in post_configure:
- fn(options, file_config)
-
- # late imports, has to happen after config.
- global util, fixtures, engines, exclusions, assertions, provision
- global warnings, profiling, config, testing
- from sqlalchemy import testing # noqa
- from sqlalchemy.testing import fixtures, engines, exclusions # noqa
- from sqlalchemy.testing import assertions, warnings, profiling # noqa
- from sqlalchemy.testing import config, provision # noqa
- from sqlalchemy import util # noqa
-
- warnings.setup_filters()
-
-
-def _log(opt_str, value, parser):
- global logging
- if not logging:
- import logging
-
- logging.basicConfig()
-
- if opt_str.endswith("-info"):
- logging.getLogger(value).setLevel(logging.INFO)
- elif opt_str.endswith("-debug"):
- logging.getLogger(value).setLevel(logging.DEBUG)
-
-
-def _list_dbs(*args):
- if file_config is None:
- # assume the current working directory is the one containing the
- # setup file
- read_config(Path.cwd())
- print("Available --db options (use --dburi to override)")
- for macro in sorted(file_config.options("db")):
- print("%20s\t%s" % (macro, file_config.get("db", macro)))
- sys.exit(0)
-
-
-def _requirements_opt(opt_str, value, parser):
- _setup_requirements(value)
-
-
-def _set_tag_include(tag):
- def _do_include_tag(opt_str, value, parser):
- _include_tag(opt_str, tag, parser)
-
- return _do_include_tag
-
-
-def _set_tag_exclude(tag):
- def _do_exclude_tag(opt_str, value, parser):
- _exclude_tag(opt_str, tag, parser)
-
- return _do_exclude_tag
-
-
-def _exclude_tag(opt_str, value, parser):
- exclude_tags.add(value.replace("-", "_"))
-
-
-def _include_tag(opt_str, value, parser):
- include_tags.add(value.replace("-", "_"))
-
-
-pre_configure = []
-post_configure = []
-
-
-def pre(fn):
- pre_configure.append(fn)
- return fn
-
-
-def post(fn):
- post_configure.append(fn)
- return fn
-
-
-@pre
-def _setup_options(opt, file_config):
- global options
- options = opt
-
-
-@pre
-def _register_sqlite_numeric_dialect(opt, file_config):
- from sqlalchemy.dialects import registry
-
- registry.register(
- "sqlite.pysqlite_numeric",
- "sqlalchemy.dialects.sqlite.pysqlite",
- "_SQLiteDialect_pysqlite_numeric",
- )
- registry.register(
- "sqlite.pysqlite_dollar",
- "sqlalchemy.dialects.sqlite.pysqlite",
- "_SQLiteDialect_pysqlite_dollar",
- )
-
-
-@post
-def __ensure_cext(opt, file_config):
- if os.environ.get("REQUIRE_SQLALCHEMY_CEXT", "0") == "1":
- from sqlalchemy.util import has_compiled_ext
-
- try:
- has_compiled_ext(raise_=True)
- except ImportError as err:
- raise AssertionError(
- "REQUIRE_SQLALCHEMY_CEXT is set but can't import the "
- "cython extensions"
- ) from err
-
-
-@post
-def _init_symbols(options, file_config):
- from sqlalchemy.testing import config
-
- config._fixture_functions = _fixture_fn_class()
-
-
-@pre
-def _set_disable_asyncio(opt, file_config):
- if opt.disable_asyncio:
- asyncio.ENABLE_ASYNCIO = False
-
-
-@post
-def _engine_uri(options, file_config):
- from sqlalchemy import testing
- from sqlalchemy.testing import config
- from sqlalchemy.testing import provision
- from sqlalchemy.engine import url as sa_url
-
- if options.dburi:
- db_urls = list(options.dburi)
- else:
- db_urls = []
-
- extra_drivers = options.dbdriver or []
-
- if options.db:
- for db_token in options.db:
- for db in re.split(r"[,\s]+", db_token):
- if db not in file_config.options("db"):
- raise RuntimeError(
- "Unknown URI specifier '%s'. "
- "Specify --dbs for known uris." % db
- )
- else:
- db_urls.append(file_config.get("db", db))
-
- if not db_urls:
- db_urls.append(file_config.get("db", "default"))
-
- config._current = None
-
- if options.write_idents and provision.FOLLOWER_IDENT:
- for db_url in [sa_url.make_url(db_url) for db_url in db_urls]:
- with open(options.write_idents, "a") as file_:
- file_.write(
- f"{provision.FOLLOWER_IDENT} "
- f"{db_url.render_as_string(hide_password=False)}\n"
- )
-
- expanded_urls = list(provision.generate_db_urls(db_urls, extra_drivers))
-
- for db_url in expanded_urls:
- log.info("Adding database URL: %s", db_url)
-
- cfg = provision.setup_config(
- db_url, options, file_config, provision.FOLLOWER_IDENT
- )
- if not config._current:
- cfg.set_as_current(cfg, testing)
-
-
-@post
-def _requirements(options, file_config):
- requirement_cls = file_config.get("sqla_testing", "requirement_cls")
- _setup_requirements(requirement_cls)
-
-
-def _setup_requirements(argument):
- from sqlalchemy.testing import config
- from sqlalchemy import testing
-
- modname, clsname = argument.split(":")
-
- # importlib.import_module() only introduced in 2.7, a little
- # late
- mod = __import__(modname)
- for component in modname.split(".")[1:]:
- mod = getattr(mod, component)
- req_cls = getattr(mod, clsname)
-
- config.requirements = testing.requires = req_cls()
-
- config.bootstrapped_as_sqlalchemy = bootstrapped_as_sqlalchemy
-
-
-@post
-def _prep_testing_database(options, file_config):
- from sqlalchemy.testing import config
-
- if options.dropfirst:
- from sqlalchemy.testing import provision
-
- for cfg in config.Config.all_configs():
- provision.drop_all_schema_objects(cfg, cfg.db)
-
-
-@post
-def _post_setup_options(opt, file_config):
- from sqlalchemy.testing import config
-
- config.options = options
- config.file_config = file_config
-
-
-@post
-def _setup_profiling(options, file_config):
- from sqlalchemy.testing import profiling
-
- profiling._profile_stats = profiling.ProfileStatsFile(
- file_config.get("sqla_testing", "profile_file"),
- sort=options.profilesort,
- dump=options.profiledump,
- )
-
-
-def want_class(name, cls):
- if not issubclass(cls, fixtures.TestBase):
- return False
- elif name.startswith("_"):
- return False
- else:
- return True
-
-
-def want_method(cls, fn):
- if not fn.__name__.startswith("test_"):
- return False
- elif fn.__module__ is None:
- return False
- else:
- return True
-
-
-def generate_sub_tests(cls, module, markers):
- if "backend" in markers or "sparse_backend" in markers:
- sparse = "sparse_backend" in markers
- for cfg in _possible_configs_for_cls(cls, sparse=sparse):
- orig_name = cls.__name__
-
- # we can have special chars in these names except for the
- # pytest junit plugin, which is tripped up by the brackets
- # and periods, so sanitize
-
- alpha_name = re.sub(r"[_\[\]\.]+", "_", cfg.name)
- alpha_name = re.sub(r"_+$", "", alpha_name)
- name = "%s_%s" % (cls.__name__, alpha_name)
- subcls = type(
- name,
- (cls,),
- {"_sa_orig_cls_name": orig_name, "__only_on_config__": cfg},
- )
- setattr(module, name, subcls)
- yield subcls
- else:
- yield cls
-
-
-def start_test_class_outside_fixtures(cls):
- _do_skips(cls)
- _setup_engine(cls)
-
-
-def stop_test_class(cls):
- # close sessions, immediate connections, etc.
- fixtures.stop_test_class_inside_fixtures(cls)
-
- # close outstanding connection pool connections, dispose of
- # additional engines
- engines.testing_reaper.stop_test_class_inside_fixtures()
-
-
-def stop_test_class_outside_fixtures(cls):
- engines.testing_reaper.stop_test_class_outside_fixtures()
- provision.stop_test_class_outside_fixtures(config, config.db, cls)
- try:
- if not options.low_connections:
- assertions.global_cleanup_assertions()
- finally:
- _restore_engine()
-
-
-def _restore_engine():
- if config._current:
- config._current.reset(testing)
-
-
-def final_process_cleanup():
- engines.testing_reaper.final_cleanup()
- assertions.global_cleanup_assertions()
- _restore_engine()
-
-
-def _setup_engine(cls):
- if getattr(cls, "__engine_options__", None):
- opts = dict(cls.__engine_options__)
- opts["scope"] = "class"
- eng = engines.testing_engine(options=opts)
- config._current.push_engine(eng, testing)
-
-
-def before_test(test, test_module_name, test_class, test_name):
- # format looks like:
- # "test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause"
-
- name = getattr(test_class, "_sa_orig_cls_name", test_class.__name__)
-
- id_ = "%s.%s.%s" % (test_module_name, name, test_name)
-
- profiling._start_current_test(id_)
-
-
-def after_test(test):
- fixtures.after_test()
- engines.testing_reaper.after_test()
-
-
-def after_test_fixtures(test):
- engines.testing_reaper.after_test_outside_fixtures(test)
-
-
-def _possible_configs_for_cls(cls, reasons=None, sparse=False):
- all_configs = set(config.Config.all_configs())
-
- if cls.__unsupported_on__:
- spec = exclusions.db_spec(*cls.__unsupported_on__)
- for config_obj in list(all_configs):
- if spec(config_obj):
- all_configs.remove(config_obj)
-
- if getattr(cls, "__only_on__", None):
- spec = exclusions.db_spec(*util.to_list(cls.__only_on__))
- for config_obj in list(all_configs):
- if not spec(config_obj):
- all_configs.remove(config_obj)
-
- if getattr(cls, "__only_on_config__", None):
- all_configs.intersection_update([cls.__only_on_config__])
-
- if hasattr(cls, "__requires__"):
- requirements = config.requirements
- for config_obj in list(all_configs):
- for requirement in cls.__requires__:
- check = getattr(requirements, requirement)
-
- skip_reasons = check.matching_config_reasons(config_obj)
- if skip_reasons:
- all_configs.remove(config_obj)
- if reasons is not None:
- reasons.extend(skip_reasons)
- break
-
- if hasattr(cls, "__prefer_requires__"):
- non_preferred = set()
- requirements = config.requirements
- for config_obj in list(all_configs):
- for requirement in cls.__prefer_requires__:
- check = getattr(requirements, requirement)
-
- if not check.enabled_for_config(config_obj):
- non_preferred.add(config_obj)
- if all_configs.difference(non_preferred):
- all_configs.difference_update(non_preferred)
-
- if sparse:
- # pick only one config from each base dialect
- # sorted so we get the same backend each time selecting the highest
- # server version info.
- per_dialect = {}
- for cfg in reversed(
- sorted(
- all_configs,
- key=lambda cfg: (
- cfg.db.name,
- cfg.db.driver,
- cfg.db.dialect.server_version_info,
- ),
- )
- ):
- db = cfg.db.name
- if db not in per_dialect:
- per_dialect[db] = cfg
- return per_dialect.values()
-
- return all_configs
-
-
-def _do_skips(cls):
- reasons = []
- all_configs = _possible_configs_for_cls(cls, reasons)
-
- if getattr(cls, "__skip_if__", False):
- for c in getattr(cls, "__skip_if__"):
- if c():
- config.skip_test(
- "'%s' skipped by %s" % (cls.__name__, c.__name__)
- )
-
- if not all_configs:
- msg = "'%s.%s' unsupported on any DB implementation %s%s" % (
- cls.__module__,
- cls.__name__,
- ", ".join(
- "'%s(%s)+%s'"
- % (
- config_obj.db.name,
- ".".join(
- str(dig)
- for dig in exclusions._server_version(config_obj.db)
- ),
- config_obj.db.driver,
- )
- for config_obj in config.Config.all_configs()
- ),
- ", ".join(reasons),
- )
- config.skip_test(msg)
- elif hasattr(cls, "__prefer_backends__"):
- non_preferred = set()
- spec = exclusions.db_spec(*util.to_list(cls.__prefer_backends__))
- for config_obj in all_configs:
- if not spec(config_obj):
- non_preferred.add(config_obj)
- if all_configs.difference(non_preferred):
- all_configs.difference_update(non_preferred)
-
- if config._current not in all_configs:
- _setup_config(all_configs.pop(), cls)
-
-
-def _setup_config(config_obj, ctx):
- config._current.push(config_obj, testing)
-
-
-class FixtureFunctions(abc.ABC):
- @abc.abstractmethod
- def skip_test_exception(self, *arg, **kw):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def combinations(self, *args, **kw):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def param_ident(self, *args, **kw):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def fixture(self, *arg, **kw):
- raise NotImplementedError()
-
- def get_current_test_name(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def mark_base_test_class(self) -> Any:
- raise NotImplementedError()
-
- @abc.abstractproperty
- def add_to_marker(self):
- raise NotImplementedError()
-
-
-_fixture_fn_class = None
-
-
-def set_fixture_functions(fixture_fn_class):
- global _fixture_fn_class
- _fixture_fn_class = fixture_fn_class
diff --git a/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/pytestplugin.py b/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/pytestplugin.py
deleted file mode 100644
index 1a4d4bb..0000000
--- a/venv/lib/python3.11/site-packages/sqlalchemy/testing/plugin/pytestplugin.py
+++ /dev/null
@@ -1,868 +0,0 @@
-# testing/plugin/pytestplugin.py
-# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: https://www.opensource.org/licenses/mit-license.php
-# mypy: ignore-errors
-
-from __future__ import annotations
-
-import argparse
-import collections
-from functools import update_wrapper
-import inspect
-import itertools
-import operator
-import os
-import re
-import sys
-from typing import TYPE_CHECKING
-import uuid
-
-import pytest
-
-try:
- # installed by bootstrap.py
- if not TYPE_CHECKING:
- import sqla_plugin_base as plugin_base
-except ImportError:
- # assume we're a package, use traditional import
- from . import plugin_base
-
-
-def pytest_addoption(parser):
- group = parser.getgroup("sqlalchemy")
-
- def make_option(name, **kw):
- callback_ = kw.pop("callback", None)
- if callback_:
-
- class CallableAction(argparse.Action):
- def __call__(
- self, parser, namespace, values, option_string=None
- ):
- callback_(option_string, values, parser)
-
- kw["action"] = CallableAction
-
- zeroarg_callback = kw.pop("zeroarg_callback", None)
- if zeroarg_callback:
-
- class CallableAction(argparse.Action):
- def __init__(
- self,
- option_strings,
- dest,
- default=False,
- required=False,
- help=None, # noqa
- ):
- super().__init__(
- option_strings=option_strings,
- dest=dest,
- nargs=0,
- const=True,
- default=default,
- required=required,
- help=help,
- )
-
- def __call__(
- self, parser, namespace, values, option_string=None
- ):
- zeroarg_callback(option_string, values, parser)
-
- kw["action"] = CallableAction
-
- group.addoption(name, **kw)
-
- plugin_base.setup_options(make_option)
-
-
-def pytest_configure(config: pytest.Config):
- plugin_base.read_config(config.rootpath)
- if plugin_base.exclude_tags or plugin_base.include_tags:
- new_expr = " and ".join(
- list(plugin_base.include_tags)
- + [f"not {tag}" for tag in plugin_base.exclude_tags]
- )
-
- if config.option.markexpr:
- config.option.markexpr += f" and {new_expr}"
- else:
- config.option.markexpr = new_expr
-
- if config.pluginmanager.hasplugin("xdist"):
- config.pluginmanager.register(XDistHooks())
-
- if hasattr(config, "workerinput"):
- plugin_base.restore_important_follower_config(config.workerinput)
- plugin_base.configure_follower(config.workerinput["follower_ident"])
- else:
- if config.option.write_idents and os.path.exists(
- config.option.write_idents
- ):
- os.remove(config.option.write_idents)
-
- plugin_base.pre_begin(config.option)
-
- plugin_base.set_coverage_flag(
- bool(getattr(config.option, "cov_source", False))
- )
-
- plugin_base.set_fixture_functions(PytestFixtureFunctions)
-
- if config.option.dump_pyannotate:
- global DUMP_PYANNOTATE
- DUMP_PYANNOTATE = True
-
-
-DUMP_PYANNOTATE = False
-
-
-@pytest.fixture(autouse=True)
-def collect_types_fixture():
- if DUMP_PYANNOTATE:
- from pyannotate_runtime import collect_types
-
- collect_types.start()
- yield
- if DUMP_PYANNOTATE:
- collect_types.stop()
-
-
-def _log_sqlalchemy_info(session):
- import sqlalchemy
- from sqlalchemy import __version__
- from sqlalchemy.util import has_compiled_ext
- from sqlalchemy.util._has_cy import _CYEXTENSION_MSG
-
- greet = "sqlalchemy installation"
- site = "no user site" if sys.flags.no_user_site else "user site loaded"
- msgs = [
- f"SQLAlchemy {__version__} ({site})",
- f"Path: {sqlalchemy.__file__}",
- ]
-
- if has_compiled_ext():
- from sqlalchemy.cyextension import util
-
- msgs.append(f"compiled extension enabled, e.g. {util.__file__} ")
- else:
- msgs.append(f"compiled extension not enabled; {_CYEXTENSION_MSG}")
-
- pm = session.config.pluginmanager.get_plugin("terminalreporter")
- if pm:
- pm.write_sep("=", greet)
- for m in msgs:
- pm.write_line(m)
- else:
- # fancy pants reporter not found, fallback to plain print
- print("=" * 25, greet, "=" * 25)
- for m in msgs:
- print(m)
-
-
-def pytest_sessionstart(session):
- from sqlalchemy.testing import asyncio
-
- _log_sqlalchemy_info(session)
- asyncio._assume_async(plugin_base.post_begin)
-
-
-def pytest_sessionfinish(session):
- from sqlalchemy.testing import asyncio
-
- asyncio._maybe_async_provisioning(plugin_base.final_process_cleanup)
-
- if session.config.option.dump_pyannotate:
- from pyannotate_runtime import collect_types
-
- collect_types.dump_stats(session.config.option.dump_pyannotate)
-
-
-def pytest_unconfigure(config):
- from sqlalchemy.testing import asyncio
-
- asyncio._shutdown()
-
-
-def pytest_collection_finish(session):
- if session.config.option.dump_pyannotate:
- from pyannotate_runtime import collect_types
-
- lib_sqlalchemy = os.path.abspath("lib/sqlalchemy")
-
- def _filter(filename):
- filename = os.path.normpath(os.path.abspath(filename))
- if "lib/sqlalchemy" not in os.path.commonpath(
- [filename, lib_sqlalchemy]
- ):
- return None
- if "testing" in filename:
- return None
-
- return filename
-
- collect_types.init_types_collection(filter_filename=_filter)
-
-
-class XDistHooks:
- def pytest_configure_node(self, node):
- from sqlalchemy.testing import provision
- from sqlalchemy.testing import asyncio
-
- # the master for each node fills workerinput dictionary
- # which pytest-xdist will transfer to the subprocess
-
- plugin_base.memoize_important_follower_config(node.workerinput)
-
- node.workerinput["follower_ident"] = "test_%s" % uuid.uuid4().hex[0:12]
-
- asyncio._maybe_async_provisioning(
- provision.create_follower_db, node.workerinput["follower_ident"]
- )
-
- def pytest_testnodedown(self, node, error):
- from sqlalchemy.testing import provision
- from sqlalchemy.testing import asyncio
-
- asyncio._maybe_async_provisioning(
- provision.drop_follower_db, node.workerinput["follower_ident"]
- )
-
-
-def pytest_collection_modifyitems(session, config, items):
- # look for all those classes that specify __backend__ and
- # expand them out into per-database test cases.
-
- # this is much easier to do within pytest_pycollect_makeitem, however
- # pytest is iterating through cls.__dict__ as makeitem is
- # called which causes a "dictionary changed size" error on py3k.
- # I'd submit a pullreq for them to turn it into a list first, but
- # it's to suit the rather odd use case here which is that we are adding
- # new classes to a module on the fly.
-
- from sqlalchemy.testing import asyncio
-
- rebuilt_items = collections.defaultdict(
- lambda: collections.defaultdict(list)
- )
-
- items[:] = [
- item
- for item in items
- if item.getparent(pytest.Class) is not None
- and not item.getparent(pytest.Class).name.startswith("_")
- ]
-
- test_classes = {item.getparent(pytest.Class) for item in items}
-
- def collect(element):
- for inst_or_fn in element.collect():
- if isinstance(inst_or_fn, pytest.Collector):
- yield from collect(inst_or_fn)
- else:
- yield inst_or_fn
-
- def setup_test_classes():
- for test_class in test_classes:
- # transfer legacy __backend__ and __sparse_backend__ symbols
- # to be markers
- add_markers = set()
- if getattr(test_class.cls, "__backend__", False) or getattr(
- test_class.cls, "__only_on__", False
- ):
- add_markers = {"backend"}
- elif getattr(test_class.cls, "__sparse_backend__", False):
- add_markers = {"sparse_backend"}
- else:
- add_markers = frozenset()
-
- existing_markers = {
- mark.name for mark in test_class.iter_markers()
- }
- add_markers = add_markers - existing_markers
- all_markers = existing_markers.union(add_markers)
-
- for marker in add_markers:
- test_class.add_marker(marker)
-
- for sub_cls in plugin_base.generate_sub_tests(
- test_class.cls, test_class.module, all_markers
- ):
- if sub_cls is not test_class.cls:
- per_cls_dict = rebuilt_items[test_class.cls]
-
- module = test_class.getparent(pytest.Module)
-
- new_cls = pytest.Class.from_parent(
- name=sub_cls.__name__, parent=module
- )
- for marker in add_markers:
- new_cls.add_marker(marker)
-
- for fn in collect(new_cls):
- per_cls_dict[fn.name].append(fn)
-
- # class requirements will sometimes need to access the DB to check
- # capabilities, so need to do this for async
- asyncio._maybe_async_provisioning(setup_test_classes)
-
- newitems = []
- for item in items:
- cls_ = item.cls
- if cls_ in rebuilt_items:
- newitems.extend(rebuilt_items[cls_][item.name])
- else:
- newitems.append(item)
-
- # seems like the functions attached to a test class aren't sorted already?
- # is that true and why's that? (when using unittest, they're sorted)
- items[:] = sorted(
- newitems,
- key=lambda item: (
- item.getparent(pytest.Module).name,
- item.getparent(pytest.Class).name,
- item.name,
- ),
- )
-
-
-def pytest_pycollect_makeitem(collector, name, obj):
- if inspect.isclass(obj) and plugin_base.want_class(name, obj):
- from sqlalchemy.testing import config
-
- if config.any_async:
- obj = _apply_maybe_async(obj)
-
- return [
- pytest.Class.from_parent(
- name=parametrize_cls.__name__, parent=collector
- )
- for parametrize_cls in _parametrize_cls(collector.module, obj)
- ]
- elif (
- inspect.isfunction(obj)
- and collector.cls is not None
- and plugin_base.want_method(collector.cls, obj)
- ):
- # None means, fall back to default logic, which includes
- # method-level parametrize
- return None
- else:
- # empty list means skip this item
- return []
-
-
-def _is_wrapped_coroutine_function(fn):
- while hasattr(fn, "__wrapped__"):
- fn = fn.__wrapped__
-
- return inspect.iscoroutinefunction(fn)
-
-
-def _apply_maybe_async(obj, recurse=True):
- from sqlalchemy.testing import asyncio
-
- for name, value in vars(obj).items():
- if (
- (callable(value) or isinstance(value, classmethod))
- and not getattr(value, "_maybe_async_applied", False)
- and (name.startswith("test_"))
- and not _is_wrapped_coroutine_function(value)
- ):
- is_classmethod = False
- if isinstance(value, classmethod):
- value = value.__func__
- is_classmethod = True
-
- @_pytest_fn_decorator
- def make_async(fn, *args, **kwargs):
- return asyncio._maybe_async(fn, *args, **kwargs)
-
- do_async = make_async(value)
- if is_classmethod:
- do_async = classmethod(do_async)
- do_async._maybe_async_applied = True
-
- setattr(obj, name, do_async)
- if recurse:
- for cls in obj.mro()[1:]:
- if cls != object:
- _apply_maybe_async(cls, False)
- return obj
-
-
-def _parametrize_cls(module, cls):
- """implement a class-based version of pytest parametrize."""
-
- if "_sa_parametrize" not in cls.__dict__:
- return [cls]
-
- _sa_parametrize = cls._sa_parametrize
- classes = []
- for full_param_set in itertools.product(
- *[params for argname, params in _sa_parametrize]
- ):
- cls_variables = {}
-
- for argname, param in zip(
- [_sa_param[0] for _sa_param in _sa_parametrize], full_param_set
- ):
- if not argname:
- raise TypeError("need argnames for class-based combinations")
- argname_split = re.split(r",\s*", argname)
- for arg, val in zip(argname_split, param.values):
- cls_variables[arg] = val
- parametrized_name = "_".join(
- re.sub(r"\W", "", token)
- for param in full_param_set
- for token in param.id.split("-")
- )
- name = "%s_%s" % (cls.__name__, parametrized_name)
- newcls = type.__new__(type, name, (cls,), cls_variables)
- setattr(module, name, newcls)
- classes.append(newcls)
- return classes
-
-
-_current_class = None
-
-
-def pytest_runtest_setup(item):
- from sqlalchemy.testing import asyncio
-
- # pytest_runtest_setup runs *before* pytest fixtures with scope="class".
- # plugin_base.start_test_class_outside_fixtures may opt to raise SkipTest
- # for the whole class and has to run things that are across all current
- # databases, so we run this outside of the pytest fixture system altogether
- # and ensure asyncio greenlet if any engines are async
-
- global _current_class
-
- if isinstance(item, pytest.Function) and _current_class is None:
- asyncio._maybe_async_provisioning(
- plugin_base.start_test_class_outside_fixtures,
- item.cls,
- )
- _current_class = item.getparent(pytest.Class)
-
-
-@pytest.hookimpl(hookwrapper=True)
-def pytest_runtest_teardown(item, nextitem):
- # runs inside of pytest function fixture scope
- # after test function runs
-
- from sqlalchemy.testing import asyncio
-
- asyncio._maybe_async(plugin_base.after_test, item)
-
- yield
- # this is now after all the fixture teardown have run, the class can be
- # finalized. Since pytest v7 this finalizer can no longer be added in
- # pytest_runtest_setup since the class has not yet been setup at that
- # time.
- # See https://github.com/pytest-dev/pytest/issues/9343
- global _current_class, _current_report
-
- if _current_class is not None and (
- # last test or a new class
- nextitem is None
- or nextitem.getparent(pytest.Class) is not _current_class
- ):
- _current_class = None
-
- try:
- asyncio._maybe_async_provisioning(
- plugin_base.stop_test_class_outside_fixtures, item.cls
- )
- except Exception as e:
- # in case of an exception during teardown attach the original
- # error to the exception message, otherwise it will get lost
- if _current_report.failed:
- if not e.args:
- e.args = (
- "__Original test failure__:\n"
- + _current_report.longreprtext,
- )
- elif e.args[-1] and isinstance(e.args[-1], str):
- args = list(e.args)
- args[-1] += (
- "\n__Original test failure__:\n"
- + _current_report.longreprtext
- )
- e.args = tuple(args)
- else:
- e.args += (
- "__Original test failure__",
- _current_report.longreprtext,
- )
- raise
- finally:
- _current_report = None
-
-
-def pytest_runtest_call(item):
- # runs inside of pytest function fixture scope
- # before test function runs
-
- from sqlalchemy.testing import asyncio
-
- asyncio._maybe_async(
- plugin_base.before_test,
- item,
- item.module.__name__,
- item.cls,
- item.name,
- )
-
-
-_current_report = None
-
-
-def pytest_runtest_logreport(report):
- global _current_report
- if report.when == "call":
- _current_report = report
-
-
-@pytest.fixture(scope="class")
-def setup_class_methods(request):
- from sqlalchemy.testing import asyncio
-
- cls = request.cls
-
- if hasattr(cls, "setup_test_class"):
- asyncio._maybe_async(cls.setup_test_class)
-
- yield
-
- if hasattr(cls, "teardown_test_class"):
- asyncio._maybe_async(cls.teardown_test_class)
-
- asyncio._maybe_async(plugin_base.stop_test_class, cls)
-
-
-@pytest.fixture(scope="function")
-def setup_test_methods(request):
- from sqlalchemy.testing import asyncio
-
- # called for each test
-
- self = request.instance
-
- # before this fixture runs:
-
- # 1. function level "autouse" fixtures under py3k (examples: TablesTest
- # define tables / data, MappedTest define tables / mappers / data)
-
- # 2. was for p2k. no longer applies
-
- # 3. run outer xdist-style setup
- if hasattr(self, "setup_test"):
- asyncio._maybe_async(self.setup_test)
-
- # alembic test suite is using setUp and tearDown
- # xdist methods; support these in the test suite
- # for the near term
- if hasattr(self, "setUp"):
- asyncio._maybe_async(self.setUp)
-
- # inside the yield:
- # 4. function level fixtures defined on test functions themselves,
- # e.g. "connection", "metadata" run next
-
- # 5. pytest hook pytest_runtest_call then runs
-
- # 6. test itself runs
-
- yield
-
- # yield finishes:
-
- # 7. function level fixtures defined on test functions
- # themselves, e.g. "connection" rolls back the transaction, "metadata"
- # emits drop all
-
- # 8. pytest hook pytest_runtest_teardown hook runs, this is associated
- # with fixtures close all sessions, provisioning.stop_test_class(),
- # engines.testing_reaper -> ensure all connection pool connections
- # are returned, engines created by testing_engine that aren't the
- # config engine are disposed
-
- asyncio._maybe_async(plugin_base.after_test_fixtures, self)
-
- # 10. run xdist-style teardown
- if hasattr(self, "tearDown"):
- asyncio._maybe_async(self.tearDown)
-
- if hasattr(self, "teardown_test"):
- asyncio._maybe_async(self.teardown_test)
-
- # 11. was for p2k. no longer applies
-
- # 12. function level "autouse" fixtures under py3k (examples: TablesTest /
- # MappedTest delete table data, possibly drop tables and clear mappers
- # depending on the flags defined by the test class)
-
-
-def _pytest_fn_decorator(target):
- """Port of langhelpers.decorator with pytest-specific tricks."""
-
- from sqlalchemy.util.langhelpers import format_argspec_plus
- from sqlalchemy.util.compat import inspect_getfullargspec
-
- def _exec_code_in_env(code, env, fn_name):
- # note this is affected by "from __future__ import annotations" at
- # the top; exec'ed code will use non-evaluated annotations
- # which allows us to be more flexible with code rendering
- # in format_argpsec_plus()
- exec(code, env)
- return env[fn_name]
-
- def decorate(fn, add_positional_parameters=()):
- spec = inspect_getfullargspec(fn)
- if add_positional_parameters:
- spec.args.extend(add_positional_parameters)
-
- metadata = dict(
- __target_fn="__target_fn", __orig_fn="__orig_fn", name=fn.__name__
- )
- metadata.update(format_argspec_plus(spec, grouped=False))
- code = (
- """\
-def %(name)s%(grouped_args)s:
- return %(__target_fn)s(%(__orig_fn)s, %(apply_kw)s)
-"""
- % metadata
- )
- decorated = _exec_code_in_env(
- code, {"__target_fn": target, "__orig_fn": fn}, fn.__name__
- )
- if not add_positional_parameters:
- decorated.__defaults__ = getattr(fn, "__func__", fn).__defaults__
- decorated.__wrapped__ = fn
- return update_wrapper(decorated, fn)
- else:
- # this is the pytest hacky part. don't do a full update wrapper
- # because pytest is really being sneaky about finding the args
- # for the wrapped function
- decorated.__module__ = fn.__module__
- decorated.__name__ = fn.__name__
- if hasattr(fn, "pytestmark"):
- decorated.pytestmark = fn.pytestmark
- return decorated
-
- return decorate
-
-
-class PytestFixtureFunctions(plugin_base.FixtureFunctions):
- def skip_test_exception(self, *arg, **kw):
- return pytest.skip.Exception(*arg, **kw)
-
- @property
- def add_to_marker(self):
- return pytest.mark
-
- def mark_base_test_class(self):
- return pytest.mark.usefixtures(
- "setup_class_methods", "setup_test_methods"
- )
-
- _combination_id_fns = {
- "i": lambda obj: obj,
- "r": repr,
- "s": str,
- "n": lambda obj: (
- obj.__name__ if hasattr(obj, "__name__") else type(obj).__name__
- ),
- }
-
- def combinations(self, *arg_sets, **kw):
- """Facade for pytest.mark.parametrize.
-
- Automatically derives argument names from the callable which in our
- case is always a method on a class with positional arguments.
-
- ids for parameter sets are derived using an optional template.
-
- """
- from sqlalchemy.testing import exclusions
-
- if len(arg_sets) == 1 and hasattr(arg_sets[0], "__next__"):
- arg_sets = list(arg_sets[0])
-
- argnames = kw.pop("argnames", None)
-
- def _filter_exclusions(args):
- result = []
- gathered_exclusions = []
- for a in args:
- if isinstance(a, exclusions.compound):
- gathered_exclusions.append(a)
- else:
- result.append(a)
-
- return result, gathered_exclusions
-
- id_ = kw.pop("id_", None)
-
- tobuild_pytest_params = []
- has_exclusions = False
- if id_:
- _combination_id_fns = self._combination_id_fns
-
- # because itemgetter is not consistent for one argument vs.
- # multiple, make it multiple in all cases and use a slice
- # to omit the first argument
- _arg_getter = operator.itemgetter(
- 0,
- *[
- idx
- for idx, char in enumerate(id_)
- if char in ("n", "r", "s", "a")
- ],
- )
- fns = [
- (operator.itemgetter(idx), _combination_id_fns[char])
- for idx, char in enumerate(id_)
- if char in _combination_id_fns
- ]
-
- for arg in arg_sets:
- if not isinstance(arg, tuple):
- arg = (arg,)
-
- fn_params, param_exclusions = _filter_exclusions(arg)
-
- parameters = _arg_getter(fn_params)[1:]
-
- if param_exclusions:
- has_exclusions = True
-
- tobuild_pytest_params.append(
- (
- parameters,
- param_exclusions,
- "-".join(
- comb_fn(getter(arg)) for getter, comb_fn in fns
- ),
- )
- )
-
- else:
- for arg in arg_sets:
- if not isinstance(arg, tuple):
- arg = (arg,)
-
- fn_params, param_exclusions = _filter_exclusions(arg)
-
- if param_exclusions:
- has_exclusions = True
-
- tobuild_pytest_params.append(
- (fn_params, param_exclusions, None)
- )
-
- pytest_params = []
- for parameters, param_exclusions, id_ in tobuild_pytest_params:
- if has_exclusions:
- parameters += (param_exclusions,)
-
- param = pytest.param(*parameters, id=id_)
- pytest_params.append(param)
-
- def decorate(fn):
- if inspect.isclass(fn):
- if has_exclusions:
- raise NotImplementedError(
- "exclusions not supported for class level combinations"
- )
- if "_sa_parametrize" not in fn.__dict__:
- fn._sa_parametrize = []
- fn._sa_parametrize.append((argnames, pytest_params))
- return fn
- else:
- _fn_argnames = inspect.getfullargspec(fn).args[1:]
- if argnames is None:
- _argnames = _fn_argnames
- else:
- _argnames = re.split(r", *", argnames)
-
- if has_exclusions:
- existing_exl = sum(
- 1 for n in _fn_argnames if n.startswith("_exclusions")
- )
- current_exclusion_name = f"_exclusions_{existing_exl}"
- _argnames += [current_exclusion_name]
-
- @_pytest_fn_decorator
- def check_exclusions(fn, *args, **kw):
- _exclusions = args[-1]
- if _exclusions:
- exlu = exclusions.compound().add(*_exclusions)
- fn = exlu(fn)
- return fn(*args[:-1], **kw)
-
- fn = check_exclusions(
- fn, add_positional_parameters=(current_exclusion_name,)
- )
-
- return pytest.mark.parametrize(_argnames, pytest_params)(fn)
-
- return decorate
-
- def param_ident(self, *parameters):
- ident = parameters[0]
- return pytest.param(*parameters[1:], id=ident)
-
- def fixture(self, *arg, **kw):
- from sqlalchemy.testing import config
- from sqlalchemy.testing import asyncio
-
- # wrapping pytest.fixture function. determine if
- # decorator was called as @fixture or @fixture().
- if len(arg) > 0 and callable(arg[0]):
- # was called as @fixture(), we have the function to wrap.
- fn = arg[0]
- arg = arg[1:]
- else:
- # was called as @fixture, don't have the function yet.
- fn = None
-
- # create a pytest.fixture marker. because the fn is not being
- # passed, this is always a pytest.FixtureFunctionMarker()
- # object (or whatever pytest is calling it when you read this)
- # that is waiting for a function.
- fixture = pytest.fixture(*arg, **kw)
-
- # now apply wrappers to the function, including fixture itself
-
- def wrap(fn):
- if config.any_async:
- fn = asyncio._maybe_async_wrapper(fn)
- # other wrappers may be added here
-
- # now apply FixtureFunctionMarker
- fn = fixture(fn)
-
- return fn
-
- if fn:
- return wrap(fn)
- else:
- return wrap
-
- def get_current_test_name(self):
- return os.environ.get("PYTEST_CURRENT_TEST")
-
- def async_test(self, fn):
- from sqlalchemy.testing import asyncio
-
- @_pytest_fn_decorator
- def decorate(fn, *args, **kwargs):
- asyncio._run_coroutine_function(fn, *args, **kwargs)
-
- return decorate(fn)