summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/watchfiles/cli.py
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/watchfiles/cli.py')
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/cli.py224
1 files changed, 0 insertions, 224 deletions
diff --git a/venv/lib/python3.11/site-packages/watchfiles/cli.py b/venv/lib/python3.11/site-packages/watchfiles/cli.py
deleted file mode 100644
index f1e1ddd..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/cli.py
+++ /dev/null
@@ -1,224 +0,0 @@
-import argparse
-import logging
-import os
-import shlex
-import sys
-from pathlib import Path
-from textwrap import dedent
-from typing import Any, Callable, List, Optional, Tuple, Union, cast
-
-from . import Change
-from .filters import BaseFilter, DefaultFilter, PythonFilter
-from .run import detect_target_type, import_string, run_process
-from .version import VERSION
-
-logger = logging.getLogger('watchfiles.cli')
-
-
-def resolve_path(path_str: str) -> Path:
- path = Path(path_str)
- if not path.exists():
- raise FileNotFoundError(path)
- else:
- return path.resolve()
-
-
-def cli(*args_: str) -> None:
- """
- Watch one or more directories and execute either a shell command or a python function on file changes.
-
- Example of watching the current directory and calling a python function:
-
- watchfiles foobar.main
-
- Example of watching python files in two local directories and calling a shell command:
-
- watchfiles --filter python 'pytest --lf' src tests
-
- See https://watchfiles.helpmanual.io/cli/ for more information.
- """
- args = args_ or sys.argv[1:]
- parser = argparse.ArgumentParser(
- prog='watchfiles',
- description=dedent((cli.__doc__ or '').strip('\n')),
- formatter_class=argparse.RawTextHelpFormatter,
- )
- parser.add_argument('target', help='Command or dotted function path to run')
- parser.add_argument(
- 'paths', nargs='*', default='.', help='Filesystem paths to watch, defaults to current directory'
- )
-
- parser.add_argument(
- '--ignore-paths',
- nargs='?',
- type=str,
- help=(
- 'Specify directories to ignore, '
- 'to ignore multiple paths use a comma as separator, e.g. "env" or "env,node_modules"'
- ),
- )
- parser.add_argument(
- '--target-type',
- nargs='?',
- type=str,
- default='auto',
- choices=['command', 'function', 'auto'],
- help=(
- 'Whether the target should be intercepted as a shell command or a python function, '
- 'defaults to "auto" which infers the target type from the target string'
- ),
- )
- parser.add_argument(
- '--filter',
- nargs='?',
- type=str,
- default='default',
- help=(
- 'Which files to watch, defaults to "default" which uses the "DefaultFilter", '
- '"python" uses the "PythonFilter", "all" uses no filter, '
- 'any other value is interpreted as a python function/class path which is imported'
- ),
- )
- parser.add_argument(
- '--args',
- nargs='?',
- type=str,
- help='Arguments to set on sys.argv before calling target function, used only if the target is a function',
- )
- parser.add_argument('--verbose', action='store_true', help='Set log level to "debug", wins over `--verbosity`')
- parser.add_argument(
- '--non-recursive', action='store_true', help='Do not watch for changes in sub-directories recursively'
- )
- parser.add_argument(
- '--verbosity',
- nargs='?',
- type=str,
- default='info',
- choices=['warning', 'info', 'debug'],
- help='Log level, defaults to "info"',
- )
- parser.add_argument(
- '--sigint-timeout',
- nargs='?',
- type=int,
- default=5,
- help='How long to wait for the sigint timeout before sending sigkill.',
- )
- parser.add_argument(
- '--grace-period',
- nargs='?',
- type=float,
- default=0,
- help='Number of seconds after the process is started before watching for changes.',
- )
- parser.add_argument(
- '--sigkill-timeout',
- nargs='?',
- type=int,
- default=1,
- help='How long to wait for the sigkill timeout before issuing a timeout exception.',
- )
- parser.add_argument(
- '--ignore-permission-denied',
- action='store_true',
- help='Ignore permission denied errors while watching files and directories.',
- )
- parser.add_argument('--version', '-V', action='version', version=f'%(prog)s v{VERSION}')
- arg_namespace = parser.parse_args(args)
-
- if arg_namespace.verbose:
- log_level = logging.DEBUG
- else:
- log_level = getattr(logging, arg_namespace.verbosity.upper())
-
- hdlr = logging.StreamHandler()
- hdlr.setLevel(log_level)
- hdlr.setFormatter(logging.Formatter(fmt='[%(asctime)s] %(message)s', datefmt='%H:%M:%S'))
- wg_logger = logging.getLogger('watchfiles')
- wg_logger.addHandler(hdlr)
- wg_logger.setLevel(log_level)
-
- if arg_namespace.target_type == 'auto':
- target_type = detect_target_type(arg_namespace.target)
- else:
- target_type = arg_namespace.target_type
-
- if target_type == 'function':
- logger.debug('target_type=function, attempting import of "%s"', arg_namespace.target)
- import_exit(arg_namespace.target)
- if arg_namespace.args:
- sys.argv = [arg_namespace.target] + shlex.split(arg_namespace.args)
- elif arg_namespace.args:
- logger.warning('--args is only used when the target is a function')
-
- try:
- paths = [resolve_path(p) for p in arg_namespace.paths]
- except FileNotFoundError as e:
- print(f'path "{e}" does not exist', file=sys.stderr)
- sys.exit(1)
-
- watch_filter, watch_filter_str = build_filter(arg_namespace.filter, arg_namespace.ignore_paths)
-
- logger.info(
- 'watchfiles v%s 👀 path=%s target="%s" (%s) filter=%s...',
- VERSION,
- ', '.join(f'"{p}"' for p in paths),
- arg_namespace.target,
- target_type,
- watch_filter_str,
- )
-
- run_process(
- *paths,
- target=arg_namespace.target,
- target_type=target_type,
- watch_filter=watch_filter,
- debug=log_level == logging.DEBUG,
- sigint_timeout=arg_namespace.sigint_timeout,
- sigkill_timeout=arg_namespace.sigkill_timeout,
- recursive=not arg_namespace.non_recursive,
- ignore_permission_denied=arg_namespace.ignore_permission_denied,
- grace_period=arg_namespace.grace_period,
- )
-
-
-def import_exit(function_path: str) -> Any:
- cwd = os.getcwd()
- if cwd not in sys.path:
- sys.path.append(cwd)
-
- try:
- return import_string(function_path)
- except ImportError as e:
- print(f'ImportError: {e}', file=sys.stderr)
- sys.exit(1)
-
-
-def build_filter(
- filter_name: str, ignore_paths_str: Optional[str]
-) -> Tuple[Union[None, DefaultFilter, Callable[[Change, str], bool]], str]:
- ignore_paths: List[Path] = []
- if ignore_paths_str:
- ignore_paths = [Path(p).resolve() for p in ignore_paths_str.split(',')]
-
- if filter_name == 'default':
- return DefaultFilter(ignore_paths=ignore_paths), 'DefaultFilter'
- elif filter_name == 'python':
- return PythonFilter(ignore_paths=ignore_paths), 'PythonFilter'
- elif filter_name == 'all':
- if ignore_paths:
- logger.warning('"--ignore-paths" argument ignored as "all" filter was selected')
- return None, '(no filter)'
-
- watch_filter_cls = import_exit(filter_name)
- if isinstance(watch_filter_cls, type) and issubclass(watch_filter_cls, DefaultFilter):
- return watch_filter_cls(ignore_paths=ignore_paths), watch_filter_cls.__name__
-
- if ignore_paths:
- logger.warning('"--ignore-paths" argument ignored as filter is not a subclass of DefaultFilter')
-
- if isinstance(watch_filter_cls, type) and issubclass(watch_filter_cls, BaseFilter):
- return watch_filter_cls(), watch_filter_cls.__name__
- else:
- watch_filter = cast(Callable[[Change, str], bool], watch_filter_cls)
- return watch_filter, repr(watch_filter_cls)