From 6d7ba58f880be618ade07f8ea080fe8c4bf8a896 Mon Sep 17 00:00:00 2001 From: cyfraeviolae Date: Wed, 3 Apr 2024 03:10:44 -0400 Subject: venv --- .../lib/python3.11/site-packages/watchfiles/run.py | 441 +++++++++++++++++++++ 1 file changed, 441 insertions(+) create mode 100644 venv/lib/python3.11/site-packages/watchfiles/run.py (limited to 'venv/lib/python3.11/site-packages/watchfiles/run.py') diff --git a/venv/lib/python3.11/site-packages/watchfiles/run.py b/venv/lib/python3.11/site-packages/watchfiles/run.py new file mode 100644 index 0000000..d1e2494 --- /dev/null +++ b/venv/lib/python3.11/site-packages/watchfiles/run.py @@ -0,0 +1,441 @@ +import contextlib +import json +import logging +import os +import re +import shlex +import signal +import subprocess +import sys +from importlib import import_module +from multiprocessing import get_context +from multiprocessing.context import SpawnProcess +from pathlib import Path +from time import sleep +from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, List, Optional, Set, Tuple, Union + +import anyio + +from .filters import DefaultFilter +from .main import Change, FileChange, awatch, watch + +if TYPE_CHECKING: + try: + from typing import Literal + except ImportError: + from typing_extensions import Literal # type: ignore[misc] + +__all__ = 'run_process', 'arun_process', 'detect_target_type', 'import_string' +logger = logging.getLogger('watchfiles.main') + + +def run_process( + *paths: Union[Path, str], + target: Union[str, Callable[..., Any]], + args: Tuple[Any, ...] = (), + kwargs: Optional[Dict[str, Any]] = None, + target_type: "Literal['function', 'command', 'auto']" = 'auto', + callback: Optional[Callable[[Set[FileChange]], None]] = None, + watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(), + grace_period: float = 0, + debounce: int = 1_600, + step: int = 50, + debug: bool = False, + sigint_timeout: int = 5, + sigkill_timeout: int = 1, + recursive: bool = True, + ignore_permission_denied: bool = False, +) -> int: + """ + Run a process and restart it upon file changes. + + `run_process` can work in two ways: + + * Using `multiprocessing.Process` † to run a python function + * Or, using `subprocess.Popen` to run a command + + !!! note + + **†** technically `multiprocessing.get_context('spawn').Process` to avoid forking and improve + code reload/import. + + Internally, `run_process` uses [`watch`][watchfiles.watch] with `raise_interrupt=False` so the function + exits cleanly upon `Ctrl+C`. + + Args: + *paths: matches the same argument of [`watch`][watchfiles.watch] + target: function or command to run + args: arguments to pass to `target`, only used if `target` is a function + kwargs: keyword arguments to pass to `target`, only used if `target` is a function + target_type: type of target. Can be `'function'`, `'command'`, or `'auto'` in which case + [`detect_target_type`][watchfiles.run.detect_target_type] is used to determine the type. + callback: function to call on each reload, the function should accept a set of changes as the sole argument + watch_filter: matches the same argument of [`watch`][watchfiles.watch] + grace_period: number of seconds after the process is started before watching for changes + debounce: matches the same argument of [`watch`][watchfiles.watch] + step: matches the same argument of [`watch`][watchfiles.watch] + debug: matches the same argument of [`watch`][watchfiles.watch] + sigint_timeout: the number of seconds to wait after sending sigint before sending sigkill + sigkill_timeout: the number of seconds to wait after sending sigkill before raising an exception + recursive: matches the same argument of [`watch`][watchfiles.watch] + + Returns: + number of times the function was reloaded. + + ```py title="Example of run_process running a function" + from watchfiles import run_process + + def callback(changes): + print('changes detected:', changes) + + def foobar(a, b): + print('foobar called with:', a, b) + + if __name__ == '__main__': + run_process('./path/to/dir', target=foobar, args=(1, 2), callback=callback) + ``` + + As well as using a `callback` function, changes can be accessed from within the target function, + using the `WATCHFILES_CHANGES` environment variable. + + ```py title="Example of run_process accessing changes" + from watchfiles import run_process + + def foobar(a, b, c): + # changes will be an empty list "[]" the first time the function is called + changes = os.getenv('WATCHFILES_CHANGES') + changes = json.loads(changes) + print('foobar called due to changes:', changes) + + if __name__ == '__main__': + run_process('./path/to/dir', target=foobar, args=(1, 2, 3)) + ``` + + Again with the target as `command`, `WATCHFILES_CHANGES` can be used + to access changes. + + ```bash title="example.sh" + echo "changers: ${WATCHFILES_CHANGES}" + ``` + + ```py title="Example of run_process running a command" + from watchfiles import run_process + + if __name__ == '__main__': + run_process('.', target='./example.sh') + ``` + """ + if target_type == 'auto': + target_type = detect_target_type(target) + + logger.debug('running "%s" as %s', target, target_type) + catch_sigterm() + process = start_process(target, target_type, args, kwargs) + reloads = 0 + + if grace_period: + logger.debug('sleeping for %s seconds before watching for changes', grace_period) + sleep(grace_period) + + try: + for changes in watch( + *paths, + watch_filter=watch_filter, + debounce=debounce, + step=step, + debug=debug, + raise_interrupt=False, + recursive=recursive, + ignore_permission_denied=ignore_permission_denied, + ): + callback and callback(changes) + process.stop(sigint_timeout=sigint_timeout, sigkill_timeout=sigkill_timeout) + process = start_process(target, target_type, args, kwargs, changes) + reloads += 1 + finally: + process.stop() + return reloads + + +async def arun_process( + *paths: Union[Path, str], + target: Union[str, Callable[..., Any]], + args: Tuple[Any, ...] = (), + kwargs: Optional[Dict[str, Any]] = None, + target_type: "Literal['function', 'command', 'auto']" = 'auto', + callback: Optional[Callable[[Set[FileChange]], Any]] = None, + watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(), + grace_period: float = 0, + debounce: int = 1_600, + step: int = 50, + debug: bool = False, + recursive: bool = True, + ignore_permission_denied: bool = False, +) -> int: + """ + Async equivalent of [`run_process`][watchfiles.run_process], all arguments match those of `run_process` except + `callback` which can be a coroutine. + + Starting and stopping the process and watching for changes is done in a separate thread. + + As with `run_process`, internally `arun_process` uses [`awatch`][watchfiles.awatch], however `KeyboardInterrupt` + cannot be caught and suppressed in `awatch` so these errors need to be caught separately, see below. + + ```py title="Example of arun_process usage" + import asyncio + from watchfiles import arun_process + + async def callback(changes): + await asyncio.sleep(0.1) + print('changes detected:', changes) + + def foobar(a, b): + print('foobar called with:', a, b) + + async def main(): + await arun_process('.', target=foobar, args=(1, 2), callback=callback) + + if __name__ == '__main__': + try: + asyncio.run(main()) + except KeyboardInterrupt: + print('stopped via KeyboardInterrupt') + ``` + """ + import inspect + + if target_type == 'auto': + target_type = detect_target_type(target) + + logger.debug('running "%s" as %s', target, target_type) + catch_sigterm() + process = await anyio.to_thread.run_sync(start_process, target, target_type, args, kwargs) + reloads = 0 + + if grace_period: + logger.debug('sleeping for %s seconds before watching for changes', grace_period) + await anyio.sleep(grace_period) + + async for changes in awatch( + *paths, + watch_filter=watch_filter, + debounce=debounce, + step=step, + debug=debug, + recursive=recursive, + ignore_permission_denied=ignore_permission_denied, + ): + if callback is not None: + r = callback(changes) + if inspect.isawaitable(r): + await r + + await anyio.to_thread.run_sync(process.stop) + process = await anyio.to_thread.run_sync(start_process, target, target_type, args, kwargs, changes) + reloads += 1 + await anyio.to_thread.run_sync(process.stop) + return reloads + + +# Use spawn context to make sure code run in subprocess +# does not reuse imported modules in main process/context +spawn_context = get_context('spawn') + + +def split_cmd(cmd: str) -> List[str]: + import platform + + posix = platform.uname().system.lower() != 'windows' + return shlex.split(cmd, posix=posix) + + +def start_process( + target: Union[str, Callable[..., Any]], + target_type: "Literal['function', 'command']", + args: Tuple[Any, ...], + kwargs: Optional[Dict[str, Any]], + changes: Optional[Set[FileChange]] = None, +) -> 'CombinedProcess': + if changes is None: + changes_env_var = '[]' + else: + changes_env_var = json.dumps([[c.raw_str(), p] for c, p in changes]) + + os.environ['WATCHFILES_CHANGES'] = changes_env_var + + process: 'Union[SpawnProcess, subprocess.Popen[bytes]]' + if target_type == 'function': + kwargs = kwargs or {} + if isinstance(target, str): + args = target, get_tty_path(), args, kwargs + target_ = run_function + kwargs = {} + else: + target_ = target + + process = spawn_context.Process(target=target_, args=args, kwargs=kwargs) + process.start() + else: + if args or kwargs: + logger.warning('ignoring args and kwargs for "command" target') + + assert isinstance(target, str), 'target must be a string to run as a command' + popen_args = split_cmd(target) + process = subprocess.Popen(popen_args) + return CombinedProcess(process) + + +def detect_target_type(target: Union[str, Callable[..., Any]]) -> "Literal['function', 'command']": + """ + Used by [`run_process`][watchfiles.run_process], [`arun_process`][watchfiles.arun_process] + and indirectly the CLI to determine the target type with `target_type` is `auto`. + + Detects the target type - either `function` or `command`. This method is only called with `target_type='auto'`. + + The following logic is employed: + + * If `target` is not a string, it is assumed to be a function + * If `target` ends with `.py` or `.sh`, it is assumed to be a command + * Otherwise, the target is assumed to be a function if it matches the regex `[a-zA-Z0-9_]+(\\.[a-zA-Z0-9_]+)+` + + If this logic does not work for you, specify the target type explicitly using the `target_type` function argument + or `--target-type` command line argument. + + Args: + target: The target value + + Returns: + either `'function'` or `'command'` + """ + if not isinstance(target, str): + return 'function' + elif target.endswith(('.py', '.sh')): + return 'command' + elif re.fullmatch(r'[a-zA-Z0-9_]+(\.[a-zA-Z0-9_]+)+', target): + return 'function' + else: + return 'command' + + +class CombinedProcess: + def __init__(self, p: 'Union[SpawnProcess, subprocess.Popen[bytes]]'): + self._p = p + assert self.pid is not None, 'process not yet spawned' + + def stop(self, sigint_timeout: int = 5, sigkill_timeout: int = 1) -> None: + os.environ.pop('WATCHFILES_CHANGES', None) + if self.is_alive(): + logger.debug('stopping process...') + + os.kill(self.pid, signal.SIGINT) + + try: + self.join(sigint_timeout) + except subprocess.TimeoutExpired: + # Capture this exception to allow the self.exitcode to be reached. + # This will allow the SIGKILL to be sent, otherwise it is swallowed up. + logger.warning('SIGINT timed out after %r seconds', sigint_timeout) + pass + + if self.exitcode is None: + logger.warning('process has not terminated, sending SIGKILL') + os.kill(self.pid, signal.SIGKILL) + self.join(sigkill_timeout) + else: + logger.debug('process stopped') + else: + logger.warning('process already dead, exit code: %d', self.exitcode) + + def is_alive(self) -> bool: + if isinstance(self._p, SpawnProcess): + return self._p.is_alive() + else: + return self._p.poll() is None + + @property + def pid(self) -> int: + # we check the process has always been spawned when CombinedProcess is initialised + return self._p.pid # type: ignore[return-value] + + def join(self, timeout: int) -> None: + if isinstance(self._p, SpawnProcess): + self._p.join(timeout) + else: + self._p.wait(timeout) + + @property + def exitcode(self) -> Optional[int]: + if isinstance(self._p, SpawnProcess): + return self._p.exitcode + else: + return self._p.returncode + + +def run_function(function: str, tty_path: Optional[str], args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> None: + with set_tty(tty_path): + func = import_string(function) + func(*args, **kwargs) + + +def import_string(dotted_path: str) -> Any: + """ + Stolen approximately from django. Import a dotted module path and return the attribute/class designated by the + last name in the path. Raise ImportError if the import fails. + """ + try: + module_path, class_name = dotted_path.strip(' ').rsplit('.', 1) + except ValueError as e: + raise ImportError(f'"{dotted_path}" doesn\'t look like a module path') from e + + module = import_module(module_path) + try: + return getattr(module, class_name) + except AttributeError as e: + raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute') from e + + +def get_tty_path() -> Optional[str]: # pragma: no cover + """ + Return the path to the current TTY, if any. + + Virtually impossible to test in pytest, hence no cover. + """ + try: + return os.ttyname(sys.stdin.fileno()) + except OSError: + # fileno() always fails with pytest + return '/dev/tty' + except AttributeError: + # on Windows. No idea of a better solution + return None + + +@contextlib.contextmanager +def set_tty(tty_path: Optional[str]) -> Generator[None, None, None]: + if tty_path: + try: + with open(tty_path) as tty: # pragma: no cover + sys.stdin = tty + yield + except OSError: + # eg. "No such device or address: '/dev/tty'", see https://github.com/samuelcolvin/watchfiles/issues/40 + yield + else: + # currently on windows tty_path is None and there's nothing we can do here + yield + + +def raise_keyboard_interrupt(signum: int, _frame: Any) -> None: # pragma: no cover + logger.warning('received signal %s, raising KeyboardInterrupt', signal.Signals(signum)) + raise KeyboardInterrupt + + +def catch_sigterm() -> None: + """ + Catch SIGTERM and raise KeyboardInterrupt instead. This means watchfiles will stop quickly + on `docker compose stop` and other cases where SIGTERM is sent. + + Without this the watchfiles process will be killed while a running process will continue uninterrupted. + """ + logger.debug('registering handler for SIGTERM on watchfiles process %d', os.getpid()) + signal.signal(signal.SIGTERM, raise_keyboard_interrupt) -- cgit v1.2.3