From 12cf076118570eebbff08c6b3090e0d4798447a1 Mon Sep 17 00:00:00 2001 From: cyfraeviolae Date: Wed, 3 Apr 2024 03:17:55 -0400 Subject: no venv --- .../site-packages/litestar/background_tasks.py | 74 ---------------------- 1 file changed, 74 deletions(-) delete mode 100644 venv/lib/python3.11/site-packages/litestar/background_tasks.py (limited to 'venv/lib/python3.11/site-packages/litestar/background_tasks.py') diff --git a/venv/lib/python3.11/site-packages/litestar/background_tasks.py b/venv/lib/python3.11/site-packages/litestar/background_tasks.py deleted file mode 100644 index a475836..0000000 --- a/venv/lib/python3.11/site-packages/litestar/background_tasks.py +++ /dev/null @@ -1,74 +0,0 @@ -from typing import Any, Callable, Iterable - -from anyio import create_task_group -from typing_extensions import ParamSpec - -from litestar.utils.sync import ensure_async_callable - -__all__ = ("BackgroundTask", "BackgroundTasks") - - -P = ParamSpec("P") - - -class BackgroundTask: - """A container for a 'background' task function. - - Background tasks are called once a Response finishes. - """ - - __slots__ = ("fn", "args", "kwargs") - - def __init__(self, fn: Callable[P, Any], *args: P.args, **kwargs: P.kwargs) -> None: - """Initialize ``BackgroundTask``. - - Args: - fn: A sync or async function to call as the background task. - *args: Args to pass to the func. - **kwargs: Kwargs to pass to the func - """ - self.fn = ensure_async_callable(fn) - self.args = args - self.kwargs = kwargs - - async def __call__(self) -> None: - """Call the wrapped function with the passed in arguments. - - Returns: - None - """ - await self.fn(*self.args, **self.kwargs) - - -class BackgroundTasks: - """A container for multiple 'background' task functions. - - Background tasks are called once a Response finishes. - """ - - __slots__ = ("tasks", "run_in_task_group") - - def __init__(self, tasks: Iterable[BackgroundTask], run_in_task_group: bool = False) -> None: - """Initialize ``BackgroundTasks``. - - Args: - tasks: An iterable of :class:`BackgroundTask <.background_tasks.BackgroundTask>` instances. - run_in_task_group: If you set this value to ``True`` than the tasks will run concurrently, using - a :class:`TaskGroup `. Note: This will not preserve execution order. - """ - self.tasks = tasks - self.run_in_task_group = run_in_task_group - - async def __call__(self) -> None: - """Call the wrapped background tasks. - - Returns: - None - """ - if self.run_in_task_group: - async with create_task_group() as task_group: - for task in self.tasks: - task_group.start_soon(task) - else: - for task in self.tasks: - await task() -- cgit v1.2.3