diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/watchfiles/cli.py')
-rw-r--r-- | venv/lib/python3.11/site-packages/watchfiles/cli.py | 224 |
1 files changed, 224 insertions, 0 deletions
diff --git a/venv/lib/python3.11/site-packages/watchfiles/cli.py b/venv/lib/python3.11/site-packages/watchfiles/cli.py new file mode 100644 index 0000000..f1e1ddd --- /dev/null +++ b/venv/lib/python3.11/site-packages/watchfiles/cli.py @@ -0,0 +1,224 @@ +import argparse +import logging +import os +import shlex +import sys +from pathlib import Path +from textwrap import dedent +from typing import Any, Callable, List, Optional, Tuple, Union, cast + +from . import Change +from .filters import BaseFilter, DefaultFilter, PythonFilter +from .run import detect_target_type, import_string, run_process +from .version import VERSION + +logger = logging.getLogger('watchfiles.cli') + + +def resolve_path(path_str: str) -> Path: + path = Path(path_str) + if not path.exists(): + raise FileNotFoundError(path) + else: + return path.resolve() + + +def cli(*args_: str) -> None: + """ + Watch one or more directories and execute either a shell command or a python function on file changes. + + Example of watching the current directory and calling a python function: + + watchfiles foobar.main + + Example of watching python files in two local directories and calling a shell command: + + watchfiles --filter python 'pytest --lf' src tests + + See https://watchfiles.helpmanual.io/cli/ for more information. + """ + args = args_ or sys.argv[1:] + parser = argparse.ArgumentParser( + prog='watchfiles', + description=dedent((cli.__doc__ or '').strip('\n')), + formatter_class=argparse.RawTextHelpFormatter, + ) + parser.add_argument('target', help='Command or dotted function path to run') + parser.add_argument( + 'paths', nargs='*', default='.', help='Filesystem paths to watch, defaults to current directory' + ) + + parser.add_argument( + '--ignore-paths', + nargs='?', + type=str, + help=( + 'Specify directories to ignore, ' + 'to ignore multiple paths use a comma as separator, e.g. "env" or "env,node_modules"' + ), + ) + parser.add_argument( + '--target-type', + nargs='?', + type=str, + default='auto', + choices=['command', 'function', 'auto'], + help=( + 'Whether the target should be intercepted as a shell command or a python function, ' + 'defaults to "auto" which infers the target type from the target string' + ), + ) + parser.add_argument( + '--filter', + nargs='?', + type=str, + default='default', + help=( + 'Which files to watch, defaults to "default" which uses the "DefaultFilter", ' + '"python" uses the "PythonFilter", "all" uses no filter, ' + 'any other value is interpreted as a python function/class path which is imported' + ), + ) + parser.add_argument( + '--args', + nargs='?', + type=str, + help='Arguments to set on sys.argv before calling target function, used only if the target is a function', + ) + parser.add_argument('--verbose', action='store_true', help='Set log level to "debug", wins over `--verbosity`') + parser.add_argument( + '--non-recursive', action='store_true', help='Do not watch for changes in sub-directories recursively' + ) + parser.add_argument( + '--verbosity', + nargs='?', + type=str, + default='info', + choices=['warning', 'info', 'debug'], + help='Log level, defaults to "info"', + ) + parser.add_argument( + '--sigint-timeout', + nargs='?', + type=int, + default=5, + help='How long to wait for the sigint timeout before sending sigkill.', + ) + parser.add_argument( + '--grace-period', + nargs='?', + type=float, + default=0, + help='Number of seconds after the process is started before watching for changes.', + ) + parser.add_argument( + '--sigkill-timeout', + nargs='?', + type=int, + default=1, + help='How long to wait for the sigkill timeout before issuing a timeout exception.', + ) + parser.add_argument( + '--ignore-permission-denied', + action='store_true', + help='Ignore permission denied errors while watching files and directories.', + ) + parser.add_argument('--version', '-V', action='version', version=f'%(prog)s v{VERSION}') + arg_namespace = parser.parse_args(args) + + if arg_namespace.verbose: + log_level = logging.DEBUG + else: + log_level = getattr(logging, arg_namespace.verbosity.upper()) + + hdlr = logging.StreamHandler() + hdlr.setLevel(log_level) + hdlr.setFormatter(logging.Formatter(fmt='[%(asctime)s] %(message)s', datefmt='%H:%M:%S')) + wg_logger = logging.getLogger('watchfiles') + wg_logger.addHandler(hdlr) + wg_logger.setLevel(log_level) + + if arg_namespace.target_type == 'auto': + target_type = detect_target_type(arg_namespace.target) + else: + target_type = arg_namespace.target_type + + if target_type == 'function': + logger.debug('target_type=function, attempting import of "%s"', arg_namespace.target) + import_exit(arg_namespace.target) + if arg_namespace.args: + sys.argv = [arg_namespace.target] + shlex.split(arg_namespace.args) + elif arg_namespace.args: + logger.warning('--args is only used when the target is a function') + + try: + paths = [resolve_path(p) for p in arg_namespace.paths] + except FileNotFoundError as e: + print(f'path "{e}" does not exist', file=sys.stderr) + sys.exit(1) + + watch_filter, watch_filter_str = build_filter(arg_namespace.filter, arg_namespace.ignore_paths) + + logger.info( + 'watchfiles v%s 👀 path=%s target="%s" (%s) filter=%s...', + VERSION, + ', '.join(f'"{p}"' for p in paths), + arg_namespace.target, + target_type, + watch_filter_str, + ) + + run_process( + *paths, + target=arg_namespace.target, + target_type=target_type, + watch_filter=watch_filter, + debug=log_level == logging.DEBUG, + sigint_timeout=arg_namespace.sigint_timeout, + sigkill_timeout=arg_namespace.sigkill_timeout, + recursive=not arg_namespace.non_recursive, + ignore_permission_denied=arg_namespace.ignore_permission_denied, + grace_period=arg_namespace.grace_period, + ) + + +def import_exit(function_path: str) -> Any: + cwd = os.getcwd() + if cwd not in sys.path: + sys.path.append(cwd) + + try: + return import_string(function_path) + except ImportError as e: + print(f'ImportError: {e}', file=sys.stderr) + sys.exit(1) + + +def build_filter( + filter_name: str, ignore_paths_str: Optional[str] +) -> Tuple[Union[None, DefaultFilter, Callable[[Change, str], bool]], str]: + ignore_paths: List[Path] = [] + if ignore_paths_str: + ignore_paths = [Path(p).resolve() for p in ignore_paths_str.split(',')] + + if filter_name == 'default': + return DefaultFilter(ignore_paths=ignore_paths), 'DefaultFilter' + elif filter_name == 'python': + return PythonFilter(ignore_paths=ignore_paths), 'PythonFilter' + elif filter_name == 'all': + if ignore_paths: + logger.warning('"--ignore-paths" argument ignored as "all" filter was selected') + return None, '(no filter)' + + watch_filter_cls = import_exit(filter_name) + if isinstance(watch_filter_cls, type) and issubclass(watch_filter_cls, DefaultFilter): + return watch_filter_cls(ignore_paths=ignore_paths), watch_filter_cls.__name__ + + if ignore_paths: + logger.warning('"--ignore-paths" argument ignored as filter is not a subclass of DefaultFilter') + + if isinstance(watch_filter_cls, type) and issubclass(watch_filter_cls, BaseFilter): + return watch_filter_cls(), watch_filter_cls.__name__ + else: + watch_filter = cast(Callable[[Change, str], bool], watch_filter_cls) + return watch_filter, repr(watch_filter_cls) |