diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/watchfiles/run.py')
-rw-r--r-- | venv/lib/python3.11/site-packages/watchfiles/run.py | 441 |
1 files changed, 0 insertions, 441 deletions
diff --git a/venv/lib/python3.11/site-packages/watchfiles/run.py b/venv/lib/python3.11/site-packages/watchfiles/run.py deleted file mode 100644 index d1e2494..0000000 --- a/venv/lib/python3.11/site-packages/watchfiles/run.py +++ /dev/null @@ -1,441 +0,0 @@ -import contextlib -import json -import logging -import os -import re -import shlex -import signal -import subprocess -import sys -from importlib import import_module -from multiprocessing import get_context -from multiprocessing.context import SpawnProcess -from pathlib import Path -from time import sleep -from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, List, Optional, Set, Tuple, Union - -import anyio - -from .filters import DefaultFilter -from .main import Change, FileChange, awatch, watch - -if TYPE_CHECKING: - try: - from typing import Literal - except ImportError: - from typing_extensions import Literal # type: ignore[misc] - -__all__ = 'run_process', 'arun_process', 'detect_target_type', 'import_string' -logger = logging.getLogger('watchfiles.main') - - -def run_process( - *paths: Union[Path, str], - target: Union[str, Callable[..., Any]], - args: Tuple[Any, ...] = (), - kwargs: Optional[Dict[str, Any]] = None, - target_type: "Literal['function', 'command', 'auto']" = 'auto', - callback: Optional[Callable[[Set[FileChange]], None]] = None, - watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(), - grace_period: float = 0, - debounce: int = 1_600, - step: int = 50, - debug: bool = False, - sigint_timeout: int = 5, - sigkill_timeout: int = 1, - recursive: bool = True, - ignore_permission_denied: bool = False, -) -> int: - """ - Run a process and restart it upon file changes. - - `run_process` can work in two ways: - - * Using `multiprocessing.Process` † to run a python function - * Or, using `subprocess.Popen` to run a command - - !!! note - - **†** technically `multiprocessing.get_context('spawn').Process` to avoid forking and improve - code reload/import. - - Internally, `run_process` uses [`watch`][watchfiles.watch] with `raise_interrupt=False` so the function - exits cleanly upon `Ctrl+C`. - - Args: - *paths: matches the same argument of [`watch`][watchfiles.watch] - target: function or command to run - args: arguments to pass to `target`, only used if `target` is a function - kwargs: keyword arguments to pass to `target`, only used if `target` is a function - target_type: type of target. Can be `'function'`, `'command'`, or `'auto'` in which case - [`detect_target_type`][watchfiles.run.detect_target_type] is used to determine the type. - callback: function to call on each reload, the function should accept a set of changes as the sole argument - watch_filter: matches the same argument of [`watch`][watchfiles.watch] - grace_period: number of seconds after the process is started before watching for changes - debounce: matches the same argument of [`watch`][watchfiles.watch] - step: matches the same argument of [`watch`][watchfiles.watch] - debug: matches the same argument of [`watch`][watchfiles.watch] - sigint_timeout: the number of seconds to wait after sending sigint before sending sigkill - sigkill_timeout: the number of seconds to wait after sending sigkill before raising an exception - recursive: matches the same argument of [`watch`][watchfiles.watch] - - Returns: - number of times the function was reloaded. - - ```py title="Example of run_process running a function" - from watchfiles import run_process - - def callback(changes): - print('changes detected:', changes) - - def foobar(a, b): - print('foobar called with:', a, b) - - if __name__ == '__main__': - run_process('./path/to/dir', target=foobar, args=(1, 2), callback=callback) - ``` - - As well as using a `callback` function, changes can be accessed from within the target function, - using the `WATCHFILES_CHANGES` environment variable. - - ```py title="Example of run_process accessing changes" - from watchfiles import run_process - - def foobar(a, b, c): - # changes will be an empty list "[]" the first time the function is called - changes = os.getenv('WATCHFILES_CHANGES') - changes = json.loads(changes) - print('foobar called due to changes:', changes) - - if __name__ == '__main__': - run_process('./path/to/dir', target=foobar, args=(1, 2, 3)) - ``` - - Again with the target as `command`, `WATCHFILES_CHANGES` can be used - to access changes. - - ```bash title="example.sh" - echo "changers: ${WATCHFILES_CHANGES}" - ``` - - ```py title="Example of run_process running a command" - from watchfiles import run_process - - if __name__ == '__main__': - run_process('.', target='./example.sh') - ``` - """ - if target_type == 'auto': - target_type = detect_target_type(target) - - logger.debug('running "%s" as %s', target, target_type) - catch_sigterm() - process = start_process(target, target_type, args, kwargs) - reloads = 0 - - if grace_period: - logger.debug('sleeping for %s seconds before watching for changes', grace_period) - sleep(grace_period) - - try: - for changes in watch( - *paths, - watch_filter=watch_filter, - debounce=debounce, - step=step, - debug=debug, - raise_interrupt=False, - recursive=recursive, - ignore_permission_denied=ignore_permission_denied, - ): - callback and callback(changes) - process.stop(sigint_timeout=sigint_timeout, sigkill_timeout=sigkill_timeout) - process = start_process(target, target_type, args, kwargs, changes) - reloads += 1 - finally: - process.stop() - return reloads - - -async def arun_process( - *paths: Union[Path, str], - target: Union[str, Callable[..., Any]], - args: Tuple[Any, ...] = (), - kwargs: Optional[Dict[str, Any]] = None, - target_type: "Literal['function', 'command', 'auto']" = 'auto', - callback: Optional[Callable[[Set[FileChange]], Any]] = None, - watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(), - grace_period: float = 0, - debounce: int = 1_600, - step: int = 50, - debug: bool = False, - recursive: bool = True, - ignore_permission_denied: bool = False, -) -> int: - """ - Async equivalent of [`run_process`][watchfiles.run_process], all arguments match those of `run_process` except - `callback` which can be a coroutine. - - Starting and stopping the process and watching for changes is done in a separate thread. - - As with `run_process`, internally `arun_process` uses [`awatch`][watchfiles.awatch], however `KeyboardInterrupt` - cannot be caught and suppressed in `awatch` so these errors need to be caught separately, see below. - - ```py title="Example of arun_process usage" - import asyncio - from watchfiles import arun_process - - async def callback(changes): - await asyncio.sleep(0.1) - print('changes detected:', changes) - - def foobar(a, b): - print('foobar called with:', a, b) - - async def main(): - await arun_process('.', target=foobar, args=(1, 2), callback=callback) - - if __name__ == '__main__': - try: - asyncio.run(main()) - except KeyboardInterrupt: - print('stopped via KeyboardInterrupt') - ``` - """ - import inspect - - if target_type == 'auto': - target_type = detect_target_type(target) - - logger.debug('running "%s" as %s', target, target_type) - catch_sigterm() - process = await anyio.to_thread.run_sync(start_process, target, target_type, args, kwargs) - reloads = 0 - - if grace_period: - logger.debug('sleeping for %s seconds before watching for changes', grace_period) - await anyio.sleep(grace_period) - - async for changes in awatch( - *paths, - watch_filter=watch_filter, - debounce=debounce, - step=step, - debug=debug, - recursive=recursive, - ignore_permission_denied=ignore_permission_denied, - ): - if callback is not None: - r = callback(changes) - if inspect.isawaitable(r): - await r - - await anyio.to_thread.run_sync(process.stop) - process = await anyio.to_thread.run_sync(start_process, target, target_type, args, kwargs, changes) - reloads += 1 - await anyio.to_thread.run_sync(process.stop) - return reloads - - -# Use spawn context to make sure code run in subprocess -# does not reuse imported modules in main process/context -spawn_context = get_context('spawn') - - -def split_cmd(cmd: str) -> List[str]: - import platform - - posix = platform.uname().system.lower() != 'windows' - return shlex.split(cmd, posix=posix) - - -def start_process( - target: Union[str, Callable[..., Any]], - target_type: "Literal['function', 'command']", - args: Tuple[Any, ...], - kwargs: Optional[Dict[str, Any]], - changes: Optional[Set[FileChange]] = None, -) -> 'CombinedProcess': - if changes is None: - changes_env_var = '[]' - else: - changes_env_var = json.dumps([[c.raw_str(), p] for c, p in changes]) - - os.environ['WATCHFILES_CHANGES'] = changes_env_var - - process: 'Union[SpawnProcess, subprocess.Popen[bytes]]' - if target_type == 'function': - kwargs = kwargs or {} - if isinstance(target, str): - args = target, get_tty_path(), args, kwargs - target_ = run_function - kwargs = {} - else: - target_ = target - - process = spawn_context.Process(target=target_, args=args, kwargs=kwargs) - process.start() - else: - if args or kwargs: - logger.warning('ignoring args and kwargs for "command" target') - - assert isinstance(target, str), 'target must be a string to run as a command' - popen_args = split_cmd(target) - process = subprocess.Popen(popen_args) - return CombinedProcess(process) - - -def detect_target_type(target: Union[str, Callable[..., Any]]) -> "Literal['function', 'command']": - """ - Used by [`run_process`][watchfiles.run_process], [`arun_process`][watchfiles.arun_process] - and indirectly the CLI to determine the target type with `target_type` is `auto`. - - Detects the target type - either `function` or `command`. This method is only called with `target_type='auto'`. - - The following logic is employed: - - * If `target` is not a string, it is assumed to be a function - * If `target` ends with `.py` or `.sh`, it is assumed to be a command - * Otherwise, the target is assumed to be a function if it matches the regex `[a-zA-Z0-9_]+(\\.[a-zA-Z0-9_]+)+` - - If this logic does not work for you, specify the target type explicitly using the `target_type` function argument - or `--target-type` command line argument. - - Args: - target: The target value - - Returns: - either `'function'` or `'command'` - """ - if not isinstance(target, str): - return 'function' - elif target.endswith(('.py', '.sh')): - return 'command' - elif re.fullmatch(r'[a-zA-Z0-9_]+(\.[a-zA-Z0-9_]+)+', target): - return 'function' - else: - return 'command' - - -class CombinedProcess: - def __init__(self, p: 'Union[SpawnProcess, subprocess.Popen[bytes]]'): - self._p = p - assert self.pid is not None, 'process not yet spawned' - - def stop(self, sigint_timeout: int = 5, sigkill_timeout: int = 1) -> None: - os.environ.pop('WATCHFILES_CHANGES', None) - if self.is_alive(): - logger.debug('stopping process...') - - os.kill(self.pid, signal.SIGINT) - - try: - self.join(sigint_timeout) - except subprocess.TimeoutExpired: - # Capture this exception to allow the self.exitcode to be reached. - # This will allow the SIGKILL to be sent, otherwise it is swallowed up. - logger.warning('SIGINT timed out after %r seconds', sigint_timeout) - pass - - if self.exitcode is None: - logger.warning('process has not terminated, sending SIGKILL') - os.kill(self.pid, signal.SIGKILL) - self.join(sigkill_timeout) - else: - logger.debug('process stopped') - else: - logger.warning('process already dead, exit code: %d', self.exitcode) - - def is_alive(self) -> bool: - if isinstance(self._p, SpawnProcess): - return self._p.is_alive() - else: - return self._p.poll() is None - - @property - def pid(self) -> int: - # we check the process has always been spawned when CombinedProcess is initialised - return self._p.pid # type: ignore[return-value] - - def join(self, timeout: int) -> None: - if isinstance(self._p, SpawnProcess): - self._p.join(timeout) - else: - self._p.wait(timeout) - - @property - def exitcode(self) -> Optional[int]: - if isinstance(self._p, SpawnProcess): - return self._p.exitcode - else: - return self._p.returncode - - -def run_function(function: str, tty_path: Optional[str], args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> None: - with set_tty(tty_path): - func = import_string(function) - func(*args, **kwargs) - - -def import_string(dotted_path: str) -> Any: - """ - Stolen approximately from django. Import a dotted module path and return the attribute/class designated by the - last name in the path. Raise ImportError if the import fails. - """ - try: - module_path, class_name = dotted_path.strip(' ').rsplit('.', 1) - except ValueError as e: - raise ImportError(f'"{dotted_path}" doesn\'t look like a module path') from e - - module = import_module(module_path) - try: - return getattr(module, class_name) - except AttributeError as e: - raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute') from e - - -def get_tty_path() -> Optional[str]: # pragma: no cover - """ - Return the path to the current TTY, if any. - - Virtually impossible to test in pytest, hence no cover. - """ - try: - return os.ttyname(sys.stdin.fileno()) - except OSError: - # fileno() always fails with pytest - return '/dev/tty' - except AttributeError: - # on Windows. No idea of a better solution - return None - - -@contextlib.contextmanager -def set_tty(tty_path: Optional[str]) -> Generator[None, None, None]: - if tty_path: - try: - with open(tty_path) as tty: # pragma: no cover - sys.stdin = tty - yield - except OSError: - # eg. "No such device or address: '/dev/tty'", see https://github.com/samuelcolvin/watchfiles/issues/40 - yield - else: - # currently on windows tty_path is None and there's nothing we can do here - yield - - -def raise_keyboard_interrupt(signum: int, _frame: Any) -> None: # pragma: no cover - logger.warning('received signal %s, raising KeyboardInterrupt', signal.Signals(signum)) - raise KeyboardInterrupt - - -def catch_sigterm() -> None: - """ - Catch SIGTERM and raise KeyboardInterrupt instead. This means watchfiles will stop quickly - on `docker compose stop` and other cases where SIGTERM is sent. - - Without this the watchfiles process will be killed while a running process will continue uninterrupted. - """ - logger.debug('registering handler for SIGTERM on watchfiles process %d', os.getpid()) - signal.signal(signal.SIGTERM, raise_keyboard_interrupt) |