summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/watchfiles/run.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/watchfiles/run.py')
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/run.py441
1 files changed, 0 insertions, 441 deletions
diff --git a/venv/lib/python3.11/site-packages/watchfiles/run.py b/venv/lib/python3.11/site-packages/watchfiles/run.py
deleted file mode 100644
index d1e2494..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/run.py
+++ /dev/null
@@ -1,441 +0,0 @@
-import contextlib
-import json
-import logging
-import os
-import re
-import shlex
-import signal
-import subprocess
-import sys
-from importlib import import_module
-from multiprocessing import get_context
-from multiprocessing.context import SpawnProcess
-from pathlib import Path
-from time import sleep
-from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, List, Optional, Set, Tuple, Union
-
-import anyio
-
-from .filters import DefaultFilter
-from .main import Change, FileChange, awatch, watch
-
-if TYPE_CHECKING:
- try:
- from typing import Literal
- except ImportError:
- from typing_extensions import Literal # type: ignore[misc]
-
-__all__ = 'run_process', 'arun_process', 'detect_target_type', 'import_string'
-logger = logging.getLogger('watchfiles.main')
-
-
-def run_process(
- *paths: Union[Path, str],
- target: Union[str, Callable[..., Any]],
- args: Tuple[Any, ...] = (),
- kwargs: Optional[Dict[str, Any]] = None,
- target_type: "Literal['function', 'command', 'auto']" = 'auto',
- callback: Optional[Callable[[Set[FileChange]], None]] = None,
- watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(),
- grace_period: float = 0,
- debounce: int = 1_600,
- step: int = 50,
- debug: bool = False,
- sigint_timeout: int = 5,
- sigkill_timeout: int = 1,
- recursive: bool = True,
- ignore_permission_denied: bool = False,
-) -> int:
- """
- Run a process and restart it upon file changes.
-
- `run_process` can work in two ways:
-
- * Using `multiprocessing.Process` † to run a python function
- * Or, using `subprocess.Popen` to run a command
-
- !!! note
-
- **†** technically `multiprocessing.get_context('spawn').Process` to avoid forking and improve
- code reload/import.
-
- Internally, `run_process` uses [`watch`][watchfiles.watch] with `raise_interrupt=False` so the function
- exits cleanly upon `Ctrl+C`.
-
- Args:
- *paths: matches the same argument of [`watch`][watchfiles.watch]
- target: function or command to run
- args: arguments to pass to `target`, only used if `target` is a function
- kwargs: keyword arguments to pass to `target`, only used if `target` is a function
- target_type: type of target. Can be `'function'`, `'command'`, or `'auto'` in which case
- [`detect_target_type`][watchfiles.run.detect_target_type] is used to determine the type.
- callback: function to call on each reload, the function should accept a set of changes as the sole argument
- watch_filter: matches the same argument of [`watch`][watchfiles.watch]
- grace_period: number of seconds after the process is started before watching for changes
- debounce: matches the same argument of [`watch`][watchfiles.watch]
- step: matches the same argument of [`watch`][watchfiles.watch]
- debug: matches the same argument of [`watch`][watchfiles.watch]
- sigint_timeout: the number of seconds to wait after sending sigint before sending sigkill
- sigkill_timeout: the number of seconds to wait after sending sigkill before raising an exception
- recursive: matches the same argument of [`watch`][watchfiles.watch]
-
- Returns:
- number of times the function was reloaded.
-
- ```py title="Example of run_process running a function"
- from watchfiles import run_process
-
- def callback(changes):
- print('changes detected:', changes)
-
- def foobar(a, b):
- print('foobar called with:', a, b)
-
- if __name__ == '__main__':
- run_process('./path/to/dir', target=foobar, args=(1, 2), callback=callback)
- ```
-
- As well as using a `callback` function, changes can be accessed from within the target function,
- using the `WATCHFILES_CHANGES` environment variable.
-
- ```py title="Example of run_process accessing changes"
- from watchfiles import run_process
-
- def foobar(a, b, c):
- # changes will be an empty list "[]" the first time the function is called
- changes = os.getenv('WATCHFILES_CHANGES')
- changes = json.loads(changes)
- print('foobar called due to changes:', changes)
-
- if __name__ == '__main__':
- run_process('./path/to/dir', target=foobar, args=(1, 2, 3))
- ```
-
- Again with the target as `command`, `WATCHFILES_CHANGES` can be used
- to access changes.
-
- ```bash title="example.sh"
- echo "changers: ${WATCHFILES_CHANGES}"
- ```
-
- ```py title="Example of run_process running a command"
- from watchfiles import run_process
-
- if __name__ == '__main__':
- run_process('.', target='./example.sh')
- ```
- """
- if target_type == 'auto':
- target_type = detect_target_type(target)
-
- logger.debug('running "%s" as %s', target, target_type)
- catch_sigterm()
- process = start_process(target, target_type, args, kwargs)
- reloads = 0
-
- if grace_period:
- logger.debug('sleeping for %s seconds before watching for changes', grace_period)
- sleep(grace_period)
-
- try:
- for changes in watch(
- *paths,
- watch_filter=watch_filter,
- debounce=debounce,
- step=step,
- debug=debug,
- raise_interrupt=False,
- recursive=recursive,
- ignore_permission_denied=ignore_permission_denied,
- ):
- callback and callback(changes)
- process.stop(sigint_timeout=sigint_timeout, sigkill_timeout=sigkill_timeout)
- process = start_process(target, target_type, args, kwargs, changes)
- reloads += 1
- finally:
- process.stop()
- return reloads
-
-
-async def arun_process(
- *paths: Union[Path, str],
- target: Union[str, Callable[..., Any]],
- args: Tuple[Any, ...] = (),
- kwargs: Optional[Dict[str, Any]] = None,
- target_type: "Literal['function', 'command', 'auto']" = 'auto',
- callback: Optional[Callable[[Set[FileChange]], Any]] = None,
- watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(),
- grace_period: float = 0,
- debounce: int = 1_600,
- step: int = 50,
- debug: bool = False,
- recursive: bool = True,
- ignore_permission_denied: bool = False,
-) -> int:
- """
- Async equivalent of [`run_process`][watchfiles.run_process], all arguments match those of `run_process` except
- `callback` which can be a coroutine.
-
- Starting and stopping the process and watching for changes is done in a separate thread.
-
- As with `run_process`, internally `arun_process` uses [`awatch`][watchfiles.awatch], however `KeyboardInterrupt`
- cannot be caught and suppressed in `awatch` so these errors need to be caught separately, see below.
-
- ```py title="Example of arun_process usage"
- import asyncio
- from watchfiles import arun_process
-
- async def callback(changes):
- await asyncio.sleep(0.1)
- print('changes detected:', changes)
-
- def foobar(a, b):
- print('foobar called with:', a, b)
-
- async def main():
- await arun_process('.', target=foobar, args=(1, 2), callback=callback)
-
- if __name__ == '__main__':
- try:
- asyncio.run(main())
- except KeyboardInterrupt:
- print('stopped via KeyboardInterrupt')
- ```
- """
- import inspect
-
- if target_type == 'auto':
- target_type = detect_target_type(target)
-
- logger.debug('running "%s" as %s', target, target_type)
- catch_sigterm()
- process = await anyio.to_thread.run_sync(start_process, target, target_type, args, kwargs)
- reloads = 0
-
- if grace_period:
- logger.debug('sleeping for %s seconds before watching for changes', grace_period)
- await anyio.sleep(grace_period)
-
- async for changes in awatch(
- *paths,
- watch_filter=watch_filter,
- debounce=debounce,
- step=step,
- debug=debug,
- recursive=recursive,
- ignore_permission_denied=ignore_permission_denied,
- ):
- if callback is not None:
- r = callback(changes)
- if inspect.isawaitable(r):
- await r
-
- await anyio.to_thread.run_sync(process.stop)
- process = await anyio.to_thread.run_sync(start_process, target, target_type, args, kwargs, changes)
- reloads += 1
- await anyio.to_thread.run_sync(process.stop)
- return reloads
-
-
-# Use spawn context to make sure code run in subprocess
-# does not reuse imported modules in main process/context
-spawn_context = get_context('spawn')
-
-
-def split_cmd(cmd: str) -> List[str]:
- import platform
-
- posix = platform.uname().system.lower() != 'windows'
- return shlex.split(cmd, posix=posix)
-
-
-def start_process(
- target: Union[str, Callable[..., Any]],
- target_type: "Literal['function', 'command']",
- args: Tuple[Any, ...],
- kwargs: Optional[Dict[str, Any]],
- changes: Optional[Set[FileChange]] = None,
-) -> 'CombinedProcess':
- if changes is None:
- changes_env_var = '[]'
- else:
- changes_env_var = json.dumps([[c.raw_str(), p] for c, p in changes])
-
- os.environ['WATCHFILES_CHANGES'] = changes_env_var
-
- process: 'Union[SpawnProcess, subprocess.Popen[bytes]]'
- if target_type == 'function':
- kwargs = kwargs or {}
- if isinstance(target, str):
- args = target, get_tty_path(), args, kwargs
- target_ = run_function
- kwargs = {}
- else:
- target_ = target
-
- process = spawn_context.Process(target=target_, args=args, kwargs=kwargs)
- process.start()
- else:
- if args or kwargs:
- logger.warning('ignoring args and kwargs for "command" target')
-
- assert isinstance(target, str), 'target must be a string to run as a command'
- popen_args = split_cmd(target)
- process = subprocess.Popen(popen_args)
- return CombinedProcess(process)
-
-
-def detect_target_type(target: Union[str, Callable[..., Any]]) -> "Literal['function', 'command']":
- """
- Used by [`run_process`][watchfiles.run_process], [`arun_process`][watchfiles.arun_process]
- and indirectly the CLI to determine the target type with `target_type` is `auto`.
-
- Detects the target type - either `function` or `command`. This method is only called with `target_type='auto'`.
-
- The following logic is employed:
-
- * If `target` is not a string, it is assumed to be a function
- * If `target` ends with `.py` or `.sh`, it is assumed to be a command
- * Otherwise, the target is assumed to be a function if it matches the regex `[a-zA-Z0-9_]+(\\.[a-zA-Z0-9_]+)+`
-
- If this logic does not work for you, specify the target type explicitly using the `target_type` function argument
- or `--target-type` command line argument.
-
- Args:
- target: The target value
-
- Returns:
- either `'function'` or `'command'`
- """
- if not isinstance(target, str):
- return 'function'
- elif target.endswith(('.py', '.sh')):
- return 'command'
- elif re.fullmatch(r'[a-zA-Z0-9_]+(\.[a-zA-Z0-9_]+)+', target):
- return 'function'
- else:
- return 'command'
-
-
-class CombinedProcess:
- def __init__(self, p: 'Union[SpawnProcess, subprocess.Popen[bytes]]'):
- self._p = p
- assert self.pid is not None, 'process not yet spawned'
-
- def stop(self, sigint_timeout: int = 5, sigkill_timeout: int = 1) -> None:
- os.environ.pop('WATCHFILES_CHANGES', None)
- if self.is_alive():
- logger.debug('stopping process...')
-
- os.kill(self.pid, signal.SIGINT)
-
- try:
- self.join(sigint_timeout)
- except subprocess.TimeoutExpired:
- # Capture this exception to allow the self.exitcode to be reached.
- # This will allow the SIGKILL to be sent, otherwise it is swallowed up.
- logger.warning('SIGINT timed out after %r seconds', sigint_timeout)
- pass
-
- if self.exitcode is None:
- logger.warning('process has not terminated, sending SIGKILL')
- os.kill(self.pid, signal.SIGKILL)
- self.join(sigkill_timeout)
- else:
- logger.debug('process stopped')
- else:
- logger.warning('process already dead, exit code: %d', self.exitcode)
-
- def is_alive(self) -> bool:
- if isinstance(self._p, SpawnProcess):
- return self._p.is_alive()
- else:
- return self._p.poll() is None
-
- @property
- def pid(self) -> int:
- # we check the process has always been spawned when CombinedProcess is initialised
- return self._p.pid # type: ignore[return-value]
-
- def join(self, timeout: int) -> None:
- if isinstance(self._p, SpawnProcess):
- self._p.join(timeout)
- else:
- self._p.wait(timeout)
-
- @property
- def exitcode(self) -> Optional[int]:
- if isinstance(self._p, SpawnProcess):
- return self._p.exitcode
- else:
- return self._p.returncode
-
-
-def run_function(function: str, tty_path: Optional[str], args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> None:
- with set_tty(tty_path):
- func = import_string(function)
- func(*args, **kwargs)
-
-
-def import_string(dotted_path: str) -> Any:
- """
- Stolen approximately from django. Import a dotted module path and return the attribute/class designated by the
- last name in the path. Raise ImportError if the import fails.
- """
- try:
- module_path, class_name = dotted_path.strip(' ').rsplit('.', 1)
- except ValueError as e:
- raise ImportError(f'"{dotted_path}" doesn\'t look like a module path') from e
-
- module = import_module(module_path)
- try:
- return getattr(module, class_name)
- except AttributeError as e:
- raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute') from e
-
-
-def get_tty_path() -> Optional[str]: # pragma: no cover
- """
- Return the path to the current TTY, if any.
-
- Virtually impossible to test in pytest, hence no cover.
- """
- try:
- return os.ttyname(sys.stdin.fileno())
- except OSError:
- # fileno() always fails with pytest
- return '/dev/tty'
- except AttributeError:
- # on Windows. No idea of a better solution
- return None
-
-
-@contextlib.contextmanager
-def set_tty(tty_path: Optional[str]) -> Generator[None, None, None]:
- if tty_path:
- try:
- with open(tty_path) as tty: # pragma: no cover
- sys.stdin = tty
- yield
- except OSError:
- # eg. "No such device or address: '/dev/tty'", see https://github.com/samuelcolvin/watchfiles/issues/40
- yield
- else:
- # currently on windows tty_path is None and there's nothing we can do here
- yield
-
-
-def raise_keyboard_interrupt(signum: int, _frame: Any) -> None: # pragma: no cover
- logger.warning('received signal %s, raising KeyboardInterrupt', signal.Signals(signum))
- raise KeyboardInterrupt
-
-
-def catch_sigterm() -> None:
- """
- Catch SIGTERM and raise KeyboardInterrupt instead. This means watchfiles will stop quickly
- on `docker compose stop` and other cases where SIGTERM is sent.
-
- Without this the watchfiles process will be killed while a running process will continue uninterrupted.
- """
- logger.debug('registering handler for SIGTERM on watchfiles process %d', os.getpid())
- signal.signal(signal.SIGTERM, raise_keyboard_interrupt)