summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/anyio/pytest_plugin.py
diff options
context:
space:
mode:
authorcyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
committercyfraeviolae <cyfraeviolae>2024-04-03 03:10:44 -0400
commit6d7ba58f880be618ade07f8ea080fe8c4bf8a896 (patch)
treeb1c931051ffcebd2bd9d61d98d6233ffa289bbce /venv/lib/python3.11/site-packages/anyio/pytest_plugin.py
parent4f884c9abc32990b4061a1bb6997b4b37e58ea0b (diff)
venv
Diffstat (limited to 'venv/lib/python3.11/site-packages/anyio/pytest_plugin.py')
-rw-r--r--venv/lib/python3.11/site-packages/anyio/pytest_plugin.py149
1 files changed, 149 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/anyio/pytest_plugin.py b/venv/lib/python3.11/site-packages/anyio/pytest_plugin.py
new file mode 100644
index 0000000..a8dd6f3
--- /dev/null
+++ b/venv/lib/python3.11/site-packages/anyio/pytest_plugin.py
@@ -0,0 +1,149 @@
+from __future__ import annotations
+
+from collections.abc import Iterator
+from contextlib import ExitStack, contextmanager
+from inspect import isasyncgenfunction, iscoroutinefunction
+from typing import Any, Dict, Tuple, cast
+
+import pytest
+import sniffio
+
+from ._core._eventloop import get_all_backends, get_async_backend
+from .abc import TestRunner
+
+_current_runner: TestRunner | None = None
+_runner_stack: ExitStack | None = None
+_runner_leases = 0
+
+
+def extract_backend_and_options(backend: object) -> tuple[str, dict[str, Any]]:
+ if isinstance(backend, str):
+ return backend, {}
+ elif isinstance(backend, tuple) and len(backend) == 2:
+ if isinstance(backend[0], str) and isinstance(backend[1], dict):
+ return cast(Tuple[str, Dict[str, Any]], backend)
+
+ raise TypeError("anyio_backend must be either a string or tuple of (string, dict)")
+
+
+@contextmanager
+def get_runner(
+ backend_name: str, backend_options: dict[str, Any]
+) -> Iterator[TestRunner]:
+ global _current_runner, _runner_leases, _runner_stack
+ if _current_runner is None:
+ asynclib = get_async_backend(backend_name)
+ _runner_stack = ExitStack()
+ if sniffio.current_async_library_cvar.get(None) is None:
+ # Since we're in control of the event loop, we can cache the name of the
+ # async library
+ token = sniffio.current_async_library_cvar.set(backend_name)
+ _runner_stack.callback(sniffio.current_async_library_cvar.reset, token)
+
+ backend_options = backend_options or {}
+ _current_runner = _runner_stack.enter_context(
+ asynclib.create_test_runner(backend_options)
+ )
+
+ _runner_leases += 1
+ try:
+ yield _current_runner
+ finally:
+ _runner_leases -= 1
+ if not _runner_leases:
+ assert _runner_stack is not None
+ _runner_stack.close()
+ _runner_stack = _current_runner = None
+
+
+def pytest_configure(config: Any) -> None:
+ config.addinivalue_line(
+ "markers",
+ "anyio: mark the (coroutine function) test to be run "
+ "asynchronously via anyio.",
+ )
+
+
+def pytest_fixture_setup(fixturedef: Any, request: Any) -> None:
+ def wrapper(*args, anyio_backend, **kwargs): # type: ignore[no-untyped-def]
+ backend_name, backend_options = extract_backend_and_options(anyio_backend)
+ if has_backend_arg:
+ kwargs["anyio_backend"] = anyio_backend
+
+ with get_runner(backend_name, backend_options) as runner:
+ if isasyncgenfunction(func):
+ yield from runner.run_asyncgen_fixture(func, kwargs)
+ else:
+ yield runner.run_fixture(func, kwargs)
+
+ # Only apply this to coroutine functions and async generator functions in requests
+ # that involve the anyio_backend fixture
+ func = fixturedef.func
+ if isasyncgenfunction(func) or iscoroutinefunction(func):
+ if "anyio_backend" in request.fixturenames:
+ has_backend_arg = "anyio_backend" in fixturedef.argnames
+ fixturedef.func = wrapper
+ if not has_backend_arg:
+ fixturedef.argnames += ("anyio_backend",)
+
+
+@pytest.hookimpl(tryfirst=True)
+def pytest_pycollect_makeitem(collector: Any, name: Any, obj: Any) -> None:
+ if collector.istestfunction(obj, name):
+ inner_func = obj.hypothesis.inner_test if hasattr(obj, "hypothesis") else obj
+ if iscoroutinefunction(inner_func):
+ marker = collector.get_closest_marker("anyio")
+ own_markers = getattr(obj, "pytestmark", ())
+ if marker or any(marker.name == "anyio" for marker in own_markers):
+ pytest.mark.usefixtures("anyio_backend")(obj)
+
+
+@pytest.hookimpl(tryfirst=True)
+def pytest_pyfunc_call(pyfuncitem: Any) -> bool | None:
+ def run_with_hypothesis(**kwargs: Any) -> None:
+ with get_runner(backend_name, backend_options) as runner:
+ runner.run_test(original_func, kwargs)
+
+ backend = pyfuncitem.funcargs.get("anyio_backend")
+ if backend:
+ backend_name, backend_options = extract_backend_and_options(backend)
+
+ if hasattr(pyfuncitem.obj, "hypothesis"):
+ # Wrap the inner test function unless it's already wrapped
+ original_func = pyfuncitem.obj.hypothesis.inner_test
+ if original_func.__qualname__ != run_with_hypothesis.__qualname__:
+ if iscoroutinefunction(original_func):
+ pyfuncitem.obj.hypothesis.inner_test = run_with_hypothesis
+
+ return None
+
+ if iscoroutinefunction(pyfuncitem.obj):
+ funcargs = pyfuncitem.funcargs
+ testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
+ with get_runner(backend_name, backend_options) as runner:
+ runner.run_test(pyfuncitem.obj, testargs)
+
+ return True
+
+ return None
+
+
+@pytest.fixture(scope="module", params=get_all_backends())
+def anyio_backend(request: Any) -> Any:
+ return request.param
+
+
+@pytest.fixture
+def anyio_backend_name(anyio_backend: Any) -> str:
+ if isinstance(anyio_backend, str):
+ return anyio_backend
+ else:
+ return anyio_backend[0]
+
+
+@pytest.fixture
+def anyio_backend_options(anyio_backend: Any) -> dict[str, Any]:
+ if isinstance(anyio_backend, str):
+ return {}
+ else:
+ return anyio_backend[1]