summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/watchfiles
diff options
context:
space:
mode:
Diffstat (limited to 'venv/lib/python3.11/site-packages/watchfiles')
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/__init__.py17
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/__main__.py4
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/__pycache__/__init__.cpython-311.pycbin631 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/__pycache__/__main__.cpython-311.pycbin318 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/__pycache__/cli.cpython-311.pycbin11318 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/__pycache__/filters.cpython-311.pycbin8116 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/__pycache__/main.cpython-311.pycbin17664 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/__pycache__/run.cpython-311.pycbin21477 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/__pycache__/version.cpython-311.pycbin296 -> 0 bytes
-rwxr-xr-xvenv/lib/python3.11/site-packages/watchfiles/_rust_notify.cpython-311-x86_64-linux-gnu.sobin5364528 -> 0 bytes
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/_rust_notify.pyi111
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/cli.py224
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/filters.py150
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/main.py344
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/py.typed1
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/run.py441
-rw-r--r--venv/lib/python3.11/site-packages/watchfiles/version.py5
17 files changed, 0 insertions, 1297 deletions
diff --git a/venv/lib/python3.11/site-packages/watchfiles/__init__.py b/venv/lib/python3.11/site-packages/watchfiles/__init__.py
deleted file mode 100644
index 877fbd5..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-from .filters import BaseFilter, DefaultFilter, PythonFilter
-from .main import Change, awatch, watch
-from .run import arun_process, run_process
-from .version import VERSION
-
-__version__ = VERSION
-__all__ = (
- 'watch',
- 'awatch',
- 'run_process',
- 'arun_process',
- 'Change',
- 'BaseFilter',
- 'DefaultFilter',
- 'PythonFilter',
- 'VERSION',
-)
diff --git a/venv/lib/python3.11/site-packages/watchfiles/__main__.py b/venv/lib/python3.11/site-packages/watchfiles/__main__.py
deleted file mode 100644
index d396c2a..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/__main__.py
+++ /dev/null
@@ -1,4 +0,0 @@
-from .cli import cli
-
-if __name__ == '__main__':
- cli()
diff --git a/venv/lib/python3.11/site-packages/watchfiles/__pycache__/__init__.cpython-311.pyc b/venv/lib/python3.11/site-packages/watchfiles/__pycache__/__init__.cpython-311.pyc
deleted file mode 100644
index d0fdd17..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/__pycache__/__init__.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/watchfiles/__pycache__/__main__.cpython-311.pyc b/venv/lib/python3.11/site-packages/watchfiles/__pycache__/__main__.cpython-311.pyc
deleted file mode 100644
index cecee55..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/__pycache__/__main__.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/watchfiles/__pycache__/cli.cpython-311.pyc b/venv/lib/python3.11/site-packages/watchfiles/__pycache__/cli.cpython-311.pyc
deleted file mode 100644
index 612625b..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/__pycache__/cli.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/watchfiles/__pycache__/filters.cpython-311.pyc b/venv/lib/python3.11/site-packages/watchfiles/__pycache__/filters.cpython-311.pyc
deleted file mode 100644
index f861ae9..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/__pycache__/filters.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/watchfiles/__pycache__/main.cpython-311.pyc b/venv/lib/python3.11/site-packages/watchfiles/__pycache__/main.cpython-311.pyc
deleted file mode 100644
index 7529b39..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/__pycache__/main.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/watchfiles/__pycache__/run.cpython-311.pyc b/venv/lib/python3.11/site-packages/watchfiles/__pycache__/run.cpython-311.pyc
deleted file mode 100644
index 948cac5..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/__pycache__/run.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/watchfiles/__pycache__/version.cpython-311.pyc b/venv/lib/python3.11/site-packages/watchfiles/__pycache__/version.cpython-311.pyc
deleted file mode 100644
index 3506dd7..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/__pycache__/version.cpython-311.pyc
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/watchfiles/_rust_notify.cpython-311-x86_64-linux-gnu.so b/venv/lib/python3.11/site-packages/watchfiles/_rust_notify.cpython-311-x86_64-linux-gnu.so
deleted file mode 100755
index 5ee5943..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/_rust_notify.cpython-311-x86_64-linux-gnu.so
+++ /dev/null
Binary files differ
diff --git a/venv/lib/python3.11/site-packages/watchfiles/_rust_notify.pyi b/venv/lib/python3.11/site-packages/watchfiles/_rust_notify.pyi
deleted file mode 100644
index 63eacda..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/_rust_notify.pyi
+++ /dev/null
@@ -1,111 +0,0 @@
-from typing import Any, List, Literal, Optional, Protocol, Set, Tuple, Union
-
-__all__ = 'RustNotify', 'WatchfilesRustInternalError'
-
-__version__: str
-"""The package version as defined in `Cargo.toml`, modified to match python's versioning semantics."""
-
-class AbstractEvent(Protocol):
- def is_set(self) -> bool: ...
-
-class RustNotify:
- """
- Interface to the Rust [notify](https://crates.io/crates/notify) crate which does
- the heavy lifting of watching for file changes and grouping them into events.
- """
-
- def __init__(
- self,
- watch_paths: List[str],
- debug: bool,
- force_polling: bool,
- poll_delay_ms: int,
- recursive: bool,
- ignore_permission_denied: bool,
- ) -> None:
- """
- Create a new `RustNotify` instance and start a thread to watch for changes.
-
- `FileNotFoundError` is raised if any of the paths do not exist.
-
- Args:
- watch_paths: file system paths to watch for changes, can be directories or files
- debug: if true, print details about all events to stderr
- force_polling: if true, always use polling instead of file system notifications
- poll_delay_ms: delay between polling for changes, only used if `force_polling=True`
- recursive: if `True`, watch for changes in sub-directories recursively, otherwise watch only for changes in
- the top-level directory, default is `True`.
- ignore_permission_denied: if `True`, permission denied errors are ignored while watching changes.
- """
- def watch(
- self,
- debounce_ms: int,
- step_ms: int,
- timeout_ms: int,
- stop_event: Optional[AbstractEvent],
- ) -> Union[Set[Tuple[int, str]], Literal['signal', 'stop', 'timeout']]:
- """
- Watch for changes.
-
- This method will wait `timeout_ms` milliseconds for changes, but once a change is detected,
- it will group changes and return in no more than `debounce_ms` milliseconds.
-
- The GIL is released during a `step_ms` sleep on each iteration to avoid
- blocking python.
-
- Args:
- debounce_ms: maximum time in milliseconds to group changes over before returning.
- step_ms: time to wait for new changes in milliseconds, if no changes are detected
- in this time, and at least one change has been detected, the changes are yielded.
- timeout_ms: maximum time in milliseconds to wait for changes before returning,
- `0` means wait indefinitely, `debounce_ms` takes precedence over `timeout_ms` once
- a change is detected.
- stop_event: event to check on every iteration to see if this function should return early.
- The event should be an object which has an `is_set()` method which returns a boolean.
-
- Returns:
- See below.
-
- Return values have the following meanings:
-
- * Change details as a `set` of `(event_type, path)` tuples, the event types are ints which match
- [`Change`][watchfiles.Change], `path` is a string representing the path of the file that changed
- * `'signal'` string, if a signal was received
- * `'stop'` string, if the `stop_event` was set
- * `'timeout'` string, if `timeout_ms` was exceeded
- """
- def __enter__(self) -> 'RustNotify':
- """
- Does nothing, but allows `RustNotify` to be used as a context manager.
-
- !!! note
-
- The watching thead is created when an instance is initiated, not on `__enter__`.
- """
- def __exit__(self, *args: Any) -> None:
- """
- Calls [`close`][watchfiles._rust_notify.RustNotify.close].
- """
- def close(self) -> None:
- """
- Stops the watching thread. After `close` is called, the `RustNotify` instance can no
- longer be used, calls to [`watch`][watchfiles._rust_notify.RustNotify.watch] will raise a `RuntimeError`.
-
- !!! note
-
- `close` is not required, just deleting the `RustNotify` instance will kill the thread
- implicitly.
-
- As per [#163](https://github.com/samuelcolvin/watchfiles/issues/163) `close()` is only required because
- in the event of an error, the traceback in `sys.exc_info` keeps a reference to `watchfiles.watch`'s
- frame, so you can't rely on the `RustNotify` object being deleted, and thereby stopping
- the watching thread.
- """
-
-class WatchfilesRustInternalError(RuntimeError):
- """
- Raised when RustNotify encounters an unknown error.
-
- If you get this a lot, please check [github](https://github.com/samuelcolvin/watchfiles/issues) issues
- and create a new issue if your problem is not discussed.
- """
diff --git a/venv/lib/python3.11/site-packages/watchfiles/cli.py b/venv/lib/python3.11/site-packages/watchfiles/cli.py
deleted file mode 100644
index f1e1ddd..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/cli.py
+++ /dev/null
@@ -1,224 +0,0 @@
-import argparse
-import logging
-import os
-import shlex
-import sys
-from pathlib import Path
-from textwrap import dedent
-from typing import Any, Callable, List, Optional, Tuple, Union, cast
-
-from . import Change
-from .filters import BaseFilter, DefaultFilter, PythonFilter
-from .run import detect_target_type, import_string, run_process
-from .version import VERSION
-
-logger = logging.getLogger('watchfiles.cli')
-
-
-def resolve_path(path_str: str) -> Path:
- path = Path(path_str)
- if not path.exists():
- raise FileNotFoundError(path)
- else:
- return path.resolve()
-
-
-def cli(*args_: str) -> None:
- """
- Watch one or more directories and execute either a shell command or a python function on file changes.
-
- Example of watching the current directory and calling a python function:
-
- watchfiles foobar.main
-
- Example of watching python files in two local directories and calling a shell command:
-
- watchfiles --filter python 'pytest --lf' src tests
-
- See https://watchfiles.helpmanual.io/cli/ for more information.
- """
- args = args_ or sys.argv[1:]
- parser = argparse.ArgumentParser(
- prog='watchfiles',
- description=dedent((cli.__doc__ or '').strip('\n')),
- formatter_class=argparse.RawTextHelpFormatter,
- )
- parser.add_argument('target', help='Command or dotted function path to run')
- parser.add_argument(
- 'paths', nargs='*', default='.', help='Filesystem paths to watch, defaults to current directory'
- )
-
- parser.add_argument(
- '--ignore-paths',
- nargs='?',
- type=str,
- help=(
- 'Specify directories to ignore, '
- 'to ignore multiple paths use a comma as separator, e.g. "env" or "env,node_modules"'
- ),
- )
- parser.add_argument(
- '--target-type',
- nargs='?',
- type=str,
- default='auto',
- choices=['command', 'function', 'auto'],
- help=(
- 'Whether the target should be intercepted as a shell command or a python function, '
- 'defaults to "auto" which infers the target type from the target string'
- ),
- )
- parser.add_argument(
- '--filter',
- nargs='?',
- type=str,
- default='default',
- help=(
- 'Which files to watch, defaults to "default" which uses the "DefaultFilter", '
- '"python" uses the "PythonFilter", "all" uses no filter, '
- 'any other value is interpreted as a python function/class path which is imported'
- ),
- )
- parser.add_argument(
- '--args',
- nargs='?',
- type=str,
- help='Arguments to set on sys.argv before calling target function, used only if the target is a function',
- )
- parser.add_argument('--verbose', action='store_true', help='Set log level to "debug", wins over `--verbosity`')
- parser.add_argument(
- '--non-recursive', action='store_true', help='Do not watch for changes in sub-directories recursively'
- )
- parser.add_argument(
- '--verbosity',
- nargs='?',
- type=str,
- default='info',
- choices=['warning', 'info', 'debug'],
- help='Log level, defaults to "info"',
- )
- parser.add_argument(
- '--sigint-timeout',
- nargs='?',
- type=int,
- default=5,
- help='How long to wait for the sigint timeout before sending sigkill.',
- )
- parser.add_argument(
- '--grace-period',
- nargs='?',
- type=float,
- default=0,
- help='Number of seconds after the process is started before watching for changes.',
- )
- parser.add_argument(
- '--sigkill-timeout',
- nargs='?',
- type=int,
- default=1,
- help='How long to wait for the sigkill timeout before issuing a timeout exception.',
- )
- parser.add_argument(
- '--ignore-permission-denied',
- action='store_true',
- help='Ignore permission denied errors while watching files and directories.',
- )
- parser.add_argument('--version', '-V', action='version', version=f'%(prog)s v{VERSION}')
- arg_namespace = parser.parse_args(args)
-
- if arg_namespace.verbose:
- log_level = logging.DEBUG
- else:
- log_level = getattr(logging, arg_namespace.verbosity.upper())
-
- hdlr = logging.StreamHandler()
- hdlr.setLevel(log_level)
- hdlr.setFormatter(logging.Formatter(fmt='[%(asctime)s] %(message)s', datefmt='%H:%M:%S'))
- wg_logger = logging.getLogger('watchfiles')
- wg_logger.addHandler(hdlr)
- wg_logger.setLevel(log_level)
-
- if arg_namespace.target_type == 'auto':
- target_type = detect_target_type(arg_namespace.target)
- else:
- target_type = arg_namespace.target_type
-
- if target_type == 'function':
- logger.debug('target_type=function, attempting import of "%s"', arg_namespace.target)
- import_exit(arg_namespace.target)
- if arg_namespace.args:
- sys.argv = [arg_namespace.target] + shlex.split(arg_namespace.args)
- elif arg_namespace.args:
- logger.warning('--args is only used when the target is a function')
-
- try:
- paths = [resolve_path(p) for p in arg_namespace.paths]
- except FileNotFoundError as e:
- print(f'path "{e}" does not exist', file=sys.stderr)
- sys.exit(1)
-
- watch_filter, watch_filter_str = build_filter(arg_namespace.filter, arg_namespace.ignore_paths)
-
- logger.info(
- 'watchfiles v%s 👀 path=%s target="%s" (%s) filter=%s...',
- VERSION,
- ', '.join(f'"{p}"' for p in paths),
- arg_namespace.target,
- target_type,
- watch_filter_str,
- )
-
- run_process(
- *paths,
- target=arg_namespace.target,
- target_type=target_type,
- watch_filter=watch_filter,
- debug=log_level == logging.DEBUG,
- sigint_timeout=arg_namespace.sigint_timeout,
- sigkill_timeout=arg_namespace.sigkill_timeout,
- recursive=not arg_namespace.non_recursive,
- ignore_permission_denied=arg_namespace.ignore_permission_denied,
- grace_period=arg_namespace.grace_period,
- )
-
-
-def import_exit(function_path: str) -> Any:
- cwd = os.getcwd()
- if cwd not in sys.path:
- sys.path.append(cwd)
-
- try:
- return import_string(function_path)
- except ImportError as e:
- print(f'ImportError: {e}', file=sys.stderr)
- sys.exit(1)
-
-
-def build_filter(
- filter_name: str, ignore_paths_str: Optional[str]
-) -> Tuple[Union[None, DefaultFilter, Callable[[Change, str], bool]], str]:
- ignore_paths: List[Path] = []
- if ignore_paths_str:
- ignore_paths = [Path(p).resolve() for p in ignore_paths_str.split(',')]
-
- if filter_name == 'default':
- return DefaultFilter(ignore_paths=ignore_paths), 'DefaultFilter'
- elif filter_name == 'python':
- return PythonFilter(ignore_paths=ignore_paths), 'PythonFilter'
- elif filter_name == 'all':
- if ignore_paths:
- logger.warning('"--ignore-paths" argument ignored as "all" filter was selected')
- return None, '(no filter)'
-
- watch_filter_cls = import_exit(filter_name)
- if isinstance(watch_filter_cls, type) and issubclass(watch_filter_cls, DefaultFilter):
- return watch_filter_cls(ignore_paths=ignore_paths), watch_filter_cls.__name__
-
- if ignore_paths:
- logger.warning('"--ignore-paths" argument ignored as filter is not a subclass of DefaultFilter')
-
- if isinstance(watch_filter_cls, type) and issubclass(watch_filter_cls, BaseFilter):
- return watch_filter_cls(), watch_filter_cls.__name__
- else:
- watch_filter = cast(Callable[[Change, str], bool], watch_filter_cls)
- return watch_filter, repr(watch_filter_cls)
diff --git a/venv/lib/python3.11/site-packages/watchfiles/filters.py b/venv/lib/python3.11/site-packages/watchfiles/filters.py
deleted file mode 100644
index 3ebe6c5..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/filters.py
+++ /dev/null
@@ -1,150 +0,0 @@
-import logging
-import os
-import re
-from pathlib import Path
-from typing import TYPE_CHECKING, Optional, Sequence, Union
-
-__all__ = 'BaseFilter', 'DefaultFilter', 'PythonFilter'
-logger = logging.getLogger('watchfiles.watcher')
-
-
-if TYPE_CHECKING:
- from .main import Change
-
-
-class BaseFilter:
- """
- Useful base class for creating filters. `BaseFilter` should be inherited and configured, rather than used
- directly.
-
- The class supports ignoring files in 3 ways:
- """
-
- __slots__ = '_ignore_dirs', '_ignore_entity_regexes', '_ignore_paths'
- ignore_dirs: Sequence[str] = ()
- """Full names of directories to ignore, an obvious example would be `.git`."""
- ignore_entity_patterns: Sequence[str] = ()
- """
- Patterns of files or directories to ignore, these are compiled into regexes.
-
- "entity" here refers to the specific file or directory - basically the result of `path.split(os.sep)[-1]`,
- an obvious example would be `r'\\.py[cod]$'`.
- """
- ignore_paths: Sequence[Union[str, Path]] = ()
- """
- Full paths to ignore, e.g. `/home/users/.cache` or `C:\\Users\\user\\.cache`.
- """
-
- def __init__(self) -> None:
- self._ignore_dirs = set(self.ignore_dirs)
- self._ignore_entity_regexes = tuple(re.compile(r) for r in self.ignore_entity_patterns)
- self._ignore_paths = tuple(map(str, self.ignore_paths))
-
- def __call__(self, change: 'Change', path: str) -> bool:
- """
- Instances of `BaseFilter` subclasses can be used as callables.
- Args:
- change: The type of change that occurred, see [`Change`][watchfiles.Change].
- path: the raw path of the file or directory that changed.
-
- Returns:
- True if the file should be included in changes, False if it should be ignored.
- """
- parts = path.lstrip(os.sep).split(os.sep)
- if any(p in self._ignore_dirs for p in parts):
- return False
-
- entity_name = parts[-1]
- if any(r.search(entity_name) for r in self._ignore_entity_regexes):
- return False
- elif self._ignore_paths and path.startswith(self._ignore_paths):
- return False
- else:
- return True
-
- def __repr__(self) -> str:
- args = ', '.join(f'{k}={getattr(self, k, None)!r}' for k in self.__slots__)
- return f'{self.__class__.__name__}({args})'
-
-
-class DefaultFilter(BaseFilter):
- """
- The default filter, which ignores files and directories that you might commonly want to ignore.
- """
-
- ignore_dirs: Sequence[str] = (
- '__pycache__',
- '.git',
- '.hg',
- '.svn',
- '.tox',
- '.venv',
- 'site-packages',
- '.idea',
- 'node_modules',
- '.mypy_cache',
- '.pytest_cache',
- '.hypothesis',
- )
- """Directory names to ignore."""
-
- ignore_entity_patterns: Sequence[str] = (
- r'\.py[cod]$',
- r'\.___jb_...___$',
- r'\.sw.$',
- '~$',
- r'^\.\#',
- r'^\.DS_Store$',
- r'^flycheck_',
- )
- """File/Directory name patterns to ignore."""
-
- def __init__(
- self,
- *,
- ignore_dirs: Optional[Sequence[str]] = None,
- ignore_entity_patterns: Optional[Sequence[str]] = None,
- ignore_paths: Optional[Sequence[Union[str, Path]]] = None,
- ) -> None:
- """
- Args:
- ignore_dirs: if not `None`, overrides the `ignore_dirs` value set on the class.
- ignore_entity_patterns: if not `None`, overrides the `ignore_entity_patterns` value set on the class.
- ignore_paths: if not `None`, overrides the `ignore_paths` value set on the class.
- """
- if ignore_dirs is not None:
- self.ignore_dirs = ignore_dirs
- if ignore_entity_patterns is not None:
- self.ignore_entity_patterns = ignore_entity_patterns
- if ignore_paths is not None:
- self.ignore_paths = ignore_paths
-
- super().__init__()
-
-
-class PythonFilter(DefaultFilter):
- """
- A filter for Python files, since this class inherits from [`DefaultFilter`][watchfiles.DefaultFilter]
- it will ignore files and directories that you might commonly want to ignore as well as filtering out
- all changes except in Python files (files with extensions `('.py', '.pyx', '.pyd')`).
- """
-
- def __init__(
- self,
- *,
- ignore_paths: Optional[Sequence[Union[str, Path]]] = None,
- extra_extensions: Sequence[str] = (),
- ) -> None:
- """
- Args:
- ignore_paths: The paths to ignore, see [`BaseFilter`][watchfiles.BaseFilter].
- extra_extensions: extra extensions to ignore.
-
- `ignore_paths` and `extra_extensions` can be passed as arguments partly to support [CLI](../cli.md) usage where
- `--ignore-paths` and `--extensions` can be passed as arguments.
- """
- self.extensions = ('.py', '.pyx', '.pyd') + tuple(extra_extensions)
- super().__init__(ignore_paths=ignore_paths)
-
- def __call__(self, change: 'Change', path: str) -> bool:
- return path.endswith(self.extensions) and super().__call__(change, path)
diff --git a/venv/lib/python3.11/site-packages/watchfiles/main.py b/venv/lib/python3.11/site-packages/watchfiles/main.py
deleted file mode 100644
index 1df7e54..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/main.py
+++ /dev/null
@@ -1,344 +0,0 @@
-import logging
-import os
-import sys
-import warnings
-from enum import IntEnum
-from pathlib import Path
-from typing import TYPE_CHECKING, AsyncGenerator, Callable, Generator, Optional, Set, Tuple, Union
-
-import anyio
-
-from ._rust_notify import RustNotify
-from .filters import DefaultFilter
-
-__all__ = 'watch', 'awatch', 'Change', 'FileChange'
-logger = logging.getLogger('watchfiles.main')
-
-
-class Change(IntEnum):
- """
- Enum representing the type of change that occurred.
- """
-
- added = 1
- """A new file or directory was added."""
- modified = 2
- """A file or directory was modified, can be either a metadata or data change."""
- deleted = 3
- """A file or directory was deleted."""
-
- def raw_str(self) -> str:
- return self.name
-
-
-FileChange = Tuple[Change, str]
-"""
-A tuple representing a file change, first element is a [`Change`][watchfiles.Change] member, second is the path
-of the file or directory that changed.
-"""
-
-if TYPE_CHECKING:
- import asyncio
- from typing import Protocol
-
- import trio
-
- AnyEvent = Union[anyio.Event, asyncio.Event, trio.Event]
-
- class AbstractEvent(Protocol):
- def is_set(self) -> bool:
- ...
-
-
-def watch(
- *paths: Union[Path, str],
- watch_filter: Optional[Callable[['Change', str], bool]] = DefaultFilter(),
- debounce: int = 1_600,
- step: int = 50,
- stop_event: Optional['AbstractEvent'] = None,
- rust_timeout: int = 5_000,
- yield_on_timeout: bool = False,
- debug: bool = False,
- raise_interrupt: bool = True,
- force_polling: Optional[bool] = None,
- poll_delay_ms: int = 300,
- recursive: bool = True,
- ignore_permission_denied: Optional[bool] = None,
-) -> Generator[Set[FileChange], None, None]:
- """
- Watch one or more paths and yield a set of changes whenever files change.
-
- The paths watched can be directories or files, directories are watched recursively - changes in subdirectories
- are also detected.
-
- #### Force polling
-
- Notify will fall back to file polling if it can't use file system notifications, but we also force notify
- to us polling if the `force_polling` argument is `True`; if `force_polling` is unset (or `None`), we enable
- force polling thus:
-
- * if the `WATCHFILES_FORCE_POLLING` environment variable exists and is not empty:
- * if the value is `false`, `disable` or `disabled`, force polling is disabled
- * otherwise, force polling is enabled
- * otherwise, we enable force polling only if we detect we're running on WSL (Windows Subsystem for Linux)
-
- Args:
- *paths: filesystem paths to watch.
- watch_filter: callable used to filter out changes which are not important, you can either use a raw callable
- or a [`BaseFilter`][watchfiles.BaseFilter] instance,
- defaults to an instance of [`DefaultFilter`][watchfiles.DefaultFilter]. To keep all changes, use `None`.
- debounce: maximum time in milliseconds to group changes over before yielding them.
- step: time to wait for new changes in milliseconds, if no changes are detected in this time, and
- at least one change has been detected, the changes are yielded.
- stop_event: event to stop watching, if this is set, the generator will stop iteration,
- this can be anything with an `is_set()` method which returns a bool, e.g. `threading.Event()`.
- rust_timeout: maximum time in milliseconds to wait in the rust code for changes, `0` means no timeout.
- yield_on_timeout: if `True`, the generator will yield upon timeout in rust even if no changes are detected.
- debug: whether to print information about all filesystem changes in rust to stdout.
- raise_interrupt: whether to re-raise `KeyboardInterrupt`s, or suppress the error and just stop iterating.
- force_polling: See [Force polling](#force-polling) above.
- poll_delay_ms: delay between polling for changes, only used if `force_polling=True`.
- recursive: if `True`, watch for changes in sub-directories recursively, otherwise watch only for changes in the
- top-level directory, default is `True`.
- ignore_permission_denied: if `True`, will ignore permission denied errors, otherwise will raise them by default.
- Setting the `WATCHFILES_IGNORE_PERMISSION_DENIED` environment variable will set this value too.
-
- Yields:
- The generator yields sets of [`FileChange`][watchfiles.main.FileChange]s.
-
- ```py title="Example of watch usage"
- from watchfiles import watch
-
- for changes in watch('./first/dir', './second/dir', raise_interrupt=False):
- print(changes)
- ```
- """
- force_polling = _default_force_polling(force_polling)
- ignore_permission_denied = _default_ignore_permission_denied(ignore_permission_denied)
- with RustNotify(
- [str(p) for p in paths], debug, force_polling, poll_delay_ms, recursive, ignore_permission_denied
- ) as watcher:
- while True:
- raw_changes = watcher.watch(debounce, step, rust_timeout, stop_event)
- if raw_changes == 'timeout':
- if yield_on_timeout:
- yield set()
- else:
- logger.debug('rust notify timeout, continuing')
- elif raw_changes == 'signal':
- if raise_interrupt:
- raise KeyboardInterrupt
- else:
- logger.warning('KeyboardInterrupt caught, stopping watch')
- return
- elif raw_changes == 'stop':
- return
- else:
- changes = _prep_changes(raw_changes, watch_filter)
- if changes:
- _log_changes(changes)
- yield changes
- else:
- logger.debug('all changes filtered out, raw_changes=%s', raw_changes)
-
-
-async def awatch( # noqa C901
- *paths: Union[Path, str],
- watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(),
- debounce: int = 1_600,
- step: int = 50,
- stop_event: Optional['AnyEvent'] = None,
- rust_timeout: Optional[int] = None,
- yield_on_timeout: bool = False,
- debug: bool = False,
- raise_interrupt: Optional[bool] = None,
- force_polling: Optional[bool] = None,
- poll_delay_ms: int = 300,
- recursive: bool = True,
- ignore_permission_denied: Optional[bool] = None,
-) -> AsyncGenerator[Set[FileChange], None]:
- """
- Asynchronous equivalent of [`watch`][watchfiles.watch] using threads to wait for changes.
- Arguments match those of [`watch`][watchfiles.watch] except `stop_event`.
-
- All async methods use [anyio](https://anyio.readthedocs.io/en/latest/) to run the event loop.
-
- Unlike [`watch`][watchfiles.watch] `KeyboardInterrupt` cannot be suppressed by `awatch` so they need to be caught
- where `asyncio.run` or equivalent is called.
-
- Args:
- *paths: filesystem paths to watch.
- watch_filter: matches the same argument of [`watch`][watchfiles.watch].
- debounce: matches the same argument of [`watch`][watchfiles.watch].
- step: matches the same argument of [`watch`][watchfiles.watch].
- stop_event: `anyio.Event` which can be used to stop iteration, see example below.
- rust_timeout: matches the same argument of [`watch`][watchfiles.watch], except that `None` means
- use `1_000` on Windows and `5_000` on other platforms thus helping with exiting on `Ctrl+C` on Windows,
- see [#110](https://github.com/samuelcolvin/watchfiles/issues/110).
- yield_on_timeout: matches the same argument of [`watch`][watchfiles.watch].
- debug: matches the same argument of [`watch`][watchfiles.watch].
- raise_interrupt: This is deprecated, `KeyboardInterrupt` will cause this coroutine to be cancelled and then
- be raised by the top level `asyncio.run` call or equivalent, and should be caught there.
- See [#136](https://github.com/samuelcolvin/watchfiles/issues/136)
- force_polling: if true, always use polling instead of file system notifications, default is `None` where
- `force_polling` is set to `True` if the `WATCHFILES_FORCE_POLLING` environment variable exists.
- poll_delay_ms: delay between polling for changes, only used if `force_polling=True`.
- recursive: if `True`, watch for changes in sub-directories recursively, otherwise watch only for changes in the
- top-level directory, default is `True`.
- ignore_permission_denied: if `True`, will ignore permission denied errors, otherwise will raise them by default.
- Setting the `WATCHFILES_IGNORE_PERMISSION_DENIED` environment variable will set this value too.
-
- Yields:
- The generator yields sets of [`FileChange`][watchfiles.main.FileChange]s.
-
- ```py title="Example of awatch usage"
- import asyncio
- from watchfiles import awatch
-
- async def main():
- async for changes in awatch('./first/dir', './second/dir'):
- print(changes)
-
- if __name__ == '__main__':
- try:
- asyncio.run(main())
- except KeyboardInterrupt:
- print('stopped via KeyboardInterrupt')
- ```
-
- ```py title="Example of awatch usage with a stop event"
- import asyncio
- from watchfiles import awatch
-
- async def main():
- stop_event = asyncio.Event()
-
- async def stop_soon():
- await asyncio.sleep(3)
- stop_event.set()
-
- stop_soon_task = asyncio.create_task(stop_soon())
-
- async for changes in awatch('/path/to/dir', stop_event=stop_event):
- print(changes)
-
- # cleanup by awaiting the (now complete) stop_soon_task
- await stop_soon_task
-
- asyncio.run(main())
- ```
- """
- if raise_interrupt is not None:
- warnings.warn(
- 'raise_interrupt is deprecated, KeyboardInterrupt will cause this coroutine to be cancelled and then '
- 'be raised by the top level asyncio.run call or equivalent, and should be caught there. See #136.',
- DeprecationWarning,
- )
-
- if stop_event is None:
- stop_event_: 'AnyEvent' = anyio.Event()
- else:
- stop_event_ = stop_event
-
- force_polling = _default_force_polling(force_polling)
- ignore_permission_denied = _default_ignore_permission_denied(ignore_permission_denied)
- with RustNotify(
- [str(p) for p in paths], debug, force_polling, poll_delay_ms, recursive, ignore_permission_denied
- ) as watcher:
- timeout = _calc_async_timeout(rust_timeout)
- CancelledError = anyio.get_cancelled_exc_class()
-
- while True:
- async with anyio.create_task_group() as tg:
- try:
- raw_changes = await anyio.to_thread.run_sync(watcher.watch, debounce, step, timeout, stop_event_)
- except (CancelledError, KeyboardInterrupt):
- stop_event_.set()
- # suppressing KeyboardInterrupt wouldn't stop it getting raised by the top level asyncio.run call
- raise
- tg.cancel_scope.cancel()
-
- if raw_changes == 'timeout':
- if yield_on_timeout:
- yield set()
- else:
- logger.debug('rust notify timeout, continuing')
- elif raw_changes == 'stop':
- return
- elif raw_changes == 'signal':
- # in theory the watch thread should never get a signal
- raise RuntimeError('watch thread unexpectedly received a signal')
- else:
- changes = _prep_changes(raw_changes, watch_filter)
- if changes:
- _log_changes(changes)
- yield changes
- else:
- logger.debug('all changes filtered out, raw_changes=%s', raw_changes)
-
-
-def _prep_changes(
- raw_changes: Set[Tuple[int, str]], watch_filter: Optional[Callable[[Change, str], bool]]
-) -> Set[FileChange]:
- # if we wanted to be really snazzy, we could move this into rust
- changes = {(Change(change), path) for change, path in raw_changes}
- if watch_filter:
- changes = {c for c in changes if watch_filter(c[0], c[1])}
- return changes
-
-
-def _log_changes(changes: Set[FileChange]) -> None:
- if logger.isEnabledFor(logging.INFO): # pragma: no branch
- count = len(changes)
- plural = '' if count == 1 else 's'
- if logger.isEnabledFor(logging.DEBUG):
- logger.debug('%d change%s detected: %s', count, plural, changes)
- else:
- logger.info('%d change%s detected', count, plural)
-
-
-def _calc_async_timeout(timeout: Optional[int]) -> int:
- """
- see https://github.com/samuelcolvin/watchfiles/issues/110
- """
- if timeout is None:
- if sys.platform == 'win32':
- return 1_000
- else:
- return 5_000
- else:
- return timeout
-
-
-def _default_force_polling(force_polling: Optional[bool]) -> bool:
- """
- See docstring for `watch` above for details.
-
- See samuelcolvin/watchfiles#167 and samuelcolvin/watchfiles#187 for discussion and rationale.
- """
- if force_polling is not None:
- return force_polling
- env_var = os.getenv('WATCHFILES_FORCE_POLLING')
- if env_var:
- return env_var.lower() not in {'false', 'disable', 'disabled'}
- else:
- return _auto_force_polling()
-
-
-def _auto_force_polling() -> bool:
- """
- Whether to auto-enable force polling, it should be enabled automatically only on WSL.
-
- See samuelcolvin/watchfiles#187 for discussion.
- """
- import platform
-
- uname = platform.uname()
- return 'microsoft-standard' in uname.release.lower() and uname.system.lower() == 'linux'
-
-
-def _default_ignore_permission_denied(ignore_permission_denied: Optional[bool]) -> bool:
- if ignore_permission_denied is not None:
- return ignore_permission_denied
- env_var = os.getenv('WATCHFILES_IGNORE_PERMISSION_DENIED')
- return bool(env_var)
diff --git a/venv/lib/python3.11/site-packages/watchfiles/py.typed b/venv/lib/python3.11/site-packages/watchfiles/py.typed
deleted file mode 100644
index 7cd6d6f..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/py.typed
+++ /dev/null
@@ -1 +0,0 @@
-# Marker file for PEP 561. The watchfiles package uses inline types.
diff --git a/venv/lib/python3.11/site-packages/watchfiles/run.py b/venv/lib/python3.11/site-packages/watchfiles/run.py
deleted file mode 100644
index d1e2494..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/run.py
+++ /dev/null
@@ -1,441 +0,0 @@
-import contextlib
-import json
-import logging
-import os
-import re
-import shlex
-import signal
-import subprocess
-import sys
-from importlib import import_module
-from multiprocessing import get_context
-from multiprocessing.context import SpawnProcess
-from pathlib import Path
-from time import sleep
-from typing import TYPE_CHECKING, Any, Callable, Dict, Generator, List, Optional, Set, Tuple, Union
-
-import anyio
-
-from .filters import DefaultFilter
-from .main import Change, FileChange, awatch, watch
-
-if TYPE_CHECKING:
- try:
- from typing import Literal
- except ImportError:
- from typing_extensions import Literal # type: ignore[misc]
-
-__all__ = 'run_process', 'arun_process', 'detect_target_type', 'import_string'
-logger = logging.getLogger('watchfiles.main')
-
-
-def run_process(
- *paths: Union[Path, str],
- target: Union[str, Callable[..., Any]],
- args: Tuple[Any, ...] = (),
- kwargs: Optional[Dict[str, Any]] = None,
- target_type: "Literal['function', 'command', 'auto']" = 'auto',
- callback: Optional[Callable[[Set[FileChange]], None]] = None,
- watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(),
- grace_period: float = 0,
- debounce: int = 1_600,
- step: int = 50,
- debug: bool = False,
- sigint_timeout: int = 5,
- sigkill_timeout: int = 1,
- recursive: bool = True,
- ignore_permission_denied: bool = False,
-) -> int:
- """
- Run a process and restart it upon file changes.
-
- `run_process` can work in two ways:
-
- * Using `multiprocessing.Process` † to run a python function
- * Or, using `subprocess.Popen` to run a command
-
- !!! note
-
- **†** technically `multiprocessing.get_context('spawn').Process` to avoid forking and improve
- code reload/import.
-
- Internally, `run_process` uses [`watch`][watchfiles.watch] with `raise_interrupt=False` so the function
- exits cleanly upon `Ctrl+C`.
-
- Args:
- *paths: matches the same argument of [`watch`][watchfiles.watch]
- target: function or command to run
- args: arguments to pass to `target`, only used if `target` is a function
- kwargs: keyword arguments to pass to `target`, only used if `target` is a function
- target_type: type of target. Can be `'function'`, `'command'`, or `'auto'` in which case
- [`detect_target_type`][watchfiles.run.detect_target_type] is used to determine the type.
- callback: function to call on each reload, the function should accept a set of changes as the sole argument
- watch_filter: matches the same argument of [`watch`][watchfiles.watch]
- grace_period: number of seconds after the process is started before watching for changes
- debounce: matches the same argument of [`watch`][watchfiles.watch]
- step: matches the same argument of [`watch`][watchfiles.watch]
- debug: matches the same argument of [`watch`][watchfiles.watch]
- sigint_timeout: the number of seconds to wait after sending sigint before sending sigkill
- sigkill_timeout: the number of seconds to wait after sending sigkill before raising an exception
- recursive: matches the same argument of [`watch`][watchfiles.watch]
-
- Returns:
- number of times the function was reloaded.
-
- ```py title="Example of run_process running a function"
- from watchfiles import run_process
-
- def callback(changes):
- print('changes detected:', changes)
-
- def foobar(a, b):
- print('foobar called with:', a, b)
-
- if __name__ == '__main__':
- run_process('./path/to/dir', target=foobar, args=(1, 2), callback=callback)
- ```
-
- As well as using a `callback` function, changes can be accessed from within the target function,
- using the `WATCHFILES_CHANGES` environment variable.
-
- ```py title="Example of run_process accessing changes"
- from watchfiles import run_process
-
- def foobar(a, b, c):
- # changes will be an empty list "[]" the first time the function is called
- changes = os.getenv('WATCHFILES_CHANGES')
- changes = json.loads(changes)
- print('foobar called due to changes:', changes)
-
- if __name__ == '__main__':
- run_process('./path/to/dir', target=foobar, args=(1, 2, 3))
- ```
-
- Again with the target as `command`, `WATCHFILES_CHANGES` can be used
- to access changes.
-
- ```bash title="example.sh"
- echo "changers: ${WATCHFILES_CHANGES}"
- ```
-
- ```py title="Example of run_process running a command"
- from watchfiles import run_process
-
- if __name__ == '__main__':
- run_process('.', target='./example.sh')
- ```
- """
- if target_type == 'auto':
- target_type = detect_target_type(target)
-
- logger.debug('running "%s" as %s', target, target_type)
- catch_sigterm()
- process = start_process(target, target_type, args, kwargs)
- reloads = 0
-
- if grace_period:
- logger.debug('sleeping for %s seconds before watching for changes', grace_period)
- sleep(grace_period)
-
- try:
- for changes in watch(
- *paths,
- watch_filter=watch_filter,
- debounce=debounce,
- step=step,
- debug=debug,
- raise_interrupt=False,
- recursive=recursive,
- ignore_permission_denied=ignore_permission_denied,
- ):
- callback and callback(changes)
- process.stop(sigint_timeout=sigint_timeout, sigkill_timeout=sigkill_timeout)
- process = start_process(target, target_type, args, kwargs, changes)
- reloads += 1
- finally:
- process.stop()
- return reloads
-
-
-async def arun_process(
- *paths: Union[Path, str],
- target: Union[str, Callable[..., Any]],
- args: Tuple[Any, ...] = (),
- kwargs: Optional[Dict[str, Any]] = None,
- target_type: "Literal['function', 'command', 'auto']" = 'auto',
- callback: Optional[Callable[[Set[FileChange]], Any]] = None,
- watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(),
- grace_period: float = 0,
- debounce: int = 1_600,
- step: int = 50,
- debug: bool = False,
- recursive: bool = True,
- ignore_permission_denied: bool = False,
-) -> int:
- """
- Async equivalent of [`run_process`][watchfiles.run_process], all arguments match those of `run_process` except
- `callback` which can be a coroutine.
-
- Starting and stopping the process and watching for changes is done in a separate thread.
-
- As with `run_process`, internally `arun_process` uses [`awatch`][watchfiles.awatch], however `KeyboardInterrupt`
- cannot be caught and suppressed in `awatch` so these errors need to be caught separately, see below.
-
- ```py title="Example of arun_process usage"
- import asyncio
- from watchfiles import arun_process
-
- async def callback(changes):
- await asyncio.sleep(0.1)
- print('changes detected:', changes)
-
- def foobar(a, b):
- print('foobar called with:', a, b)
-
- async def main():
- await arun_process('.', target=foobar, args=(1, 2), callback=callback)
-
- if __name__ == '__main__':
- try:
- asyncio.run(main())
- except KeyboardInterrupt:
- print('stopped via KeyboardInterrupt')
- ```
- """
- import inspect
-
- if target_type == 'auto':
- target_type = detect_target_type(target)
-
- logger.debug('running "%s" as %s', target, target_type)
- catch_sigterm()
- process = await anyio.to_thread.run_sync(start_process, target, target_type, args, kwargs)
- reloads = 0
-
- if grace_period:
- logger.debug('sleeping for %s seconds before watching for changes', grace_period)
- await anyio.sleep(grace_period)
-
- async for changes in awatch(
- *paths,
- watch_filter=watch_filter,
- debounce=debounce,
- step=step,
- debug=debug,
- recursive=recursive,
- ignore_permission_denied=ignore_permission_denied,
- ):
- if callback is not None:
- r = callback(changes)
- if inspect.isawaitable(r):
- await r
-
- await anyio.to_thread.run_sync(process.stop)
- process = await anyio.to_thread.run_sync(start_process, target, target_type, args, kwargs, changes)
- reloads += 1
- await anyio.to_thread.run_sync(process.stop)
- return reloads
-
-
-# Use spawn context to make sure code run in subprocess
-# does not reuse imported modules in main process/context
-spawn_context = get_context('spawn')
-
-
-def split_cmd(cmd: str) -> List[str]:
- import platform
-
- posix = platform.uname().system.lower() != 'windows'
- return shlex.split(cmd, posix=posix)
-
-
-def start_process(
- target: Union[str, Callable[..., Any]],
- target_type: "Literal['function', 'command']",
- args: Tuple[Any, ...],
- kwargs: Optional[Dict[str, Any]],
- changes: Optional[Set[FileChange]] = None,
-) -> 'CombinedProcess':
- if changes is None:
- changes_env_var = '[]'
- else:
- changes_env_var = json.dumps([[c.raw_str(), p] for c, p in changes])
-
- os.environ['WATCHFILES_CHANGES'] = changes_env_var
-
- process: 'Union[SpawnProcess, subprocess.Popen[bytes]]'
- if target_type == 'function':
- kwargs = kwargs or {}
- if isinstance(target, str):
- args = target, get_tty_path(), args, kwargs
- target_ = run_function
- kwargs = {}
- else:
- target_ = target
-
- process = spawn_context.Process(target=target_, args=args, kwargs=kwargs)
- process.start()
- else:
- if args or kwargs:
- logger.warning('ignoring args and kwargs for "command" target')
-
- assert isinstance(target, str), 'target must be a string to run as a command'
- popen_args = split_cmd(target)
- process = subprocess.Popen(popen_args)
- return CombinedProcess(process)
-
-
-def detect_target_type(target: Union[str, Callable[..., Any]]) -> "Literal['function', 'command']":
- """
- Used by [`run_process`][watchfiles.run_process], [`arun_process`][watchfiles.arun_process]
- and indirectly the CLI to determine the target type with `target_type` is `auto`.
-
- Detects the target type - either `function` or `command`. This method is only called with `target_type='auto'`.
-
- The following logic is employed:
-
- * If `target` is not a string, it is assumed to be a function
- * If `target` ends with `.py` or `.sh`, it is assumed to be a command
- * Otherwise, the target is assumed to be a function if it matches the regex `[a-zA-Z0-9_]+(\\.[a-zA-Z0-9_]+)+`
-
- If this logic does not work for you, specify the target type explicitly using the `target_type` function argument
- or `--target-type` command line argument.
-
- Args:
- target: The target value
-
- Returns:
- either `'function'` or `'command'`
- """
- if not isinstance(target, str):
- return 'function'
- elif target.endswith(('.py', '.sh')):
- return 'command'
- elif re.fullmatch(r'[a-zA-Z0-9_]+(\.[a-zA-Z0-9_]+)+', target):
- return 'function'
- else:
- return 'command'
-
-
-class CombinedProcess:
- def __init__(self, p: 'Union[SpawnProcess, subprocess.Popen[bytes]]'):
- self._p = p
- assert self.pid is not None, 'process not yet spawned'
-
- def stop(self, sigint_timeout: int = 5, sigkill_timeout: int = 1) -> None:
- os.environ.pop('WATCHFILES_CHANGES', None)
- if self.is_alive():
- logger.debug('stopping process...')
-
- os.kill(self.pid, signal.SIGINT)
-
- try:
- self.join(sigint_timeout)
- except subprocess.TimeoutExpired:
- # Capture this exception to allow the self.exitcode to be reached.
- # This will allow the SIGKILL to be sent, otherwise it is swallowed up.
- logger.warning('SIGINT timed out after %r seconds', sigint_timeout)
- pass
-
- if self.exitcode is None:
- logger.warning('process has not terminated, sending SIGKILL')
- os.kill(self.pid, signal.SIGKILL)
- self.join(sigkill_timeout)
- else:
- logger.debug('process stopped')
- else:
- logger.warning('process already dead, exit code: %d', self.exitcode)
-
- def is_alive(self) -> bool:
- if isinstance(self._p, SpawnProcess):
- return self._p.is_alive()
- else:
- return self._p.poll() is None
-
- @property
- def pid(self) -> int:
- # we check the process has always been spawned when CombinedProcess is initialised
- return self._p.pid # type: ignore[return-value]
-
- def join(self, timeout: int) -> None:
- if isinstance(self._p, SpawnProcess):
- self._p.join(timeout)
- else:
- self._p.wait(timeout)
-
- @property
- def exitcode(self) -> Optional[int]:
- if isinstance(self._p, SpawnProcess):
- return self._p.exitcode
- else:
- return self._p.returncode
-
-
-def run_function(function: str, tty_path: Optional[str], args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> None:
- with set_tty(tty_path):
- func = import_string(function)
- func(*args, **kwargs)
-
-
-def import_string(dotted_path: str) -> Any:
- """
- Stolen approximately from django. Import a dotted module path and return the attribute/class designated by the
- last name in the path. Raise ImportError if the import fails.
- """
- try:
- module_path, class_name = dotted_path.strip(' ').rsplit('.', 1)
- except ValueError as e:
- raise ImportError(f'"{dotted_path}" doesn\'t look like a module path') from e
-
- module = import_module(module_path)
- try:
- return getattr(module, class_name)
- except AttributeError as e:
- raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute') from e
-
-
-def get_tty_path() -> Optional[str]: # pragma: no cover
- """
- Return the path to the current TTY, if any.
-
- Virtually impossible to test in pytest, hence no cover.
- """
- try:
- return os.ttyname(sys.stdin.fileno())
- except OSError:
- # fileno() always fails with pytest
- return '/dev/tty'
- except AttributeError:
- # on Windows. No idea of a better solution
- return None
-
-
-@contextlib.contextmanager
-def set_tty(tty_path: Optional[str]) -> Generator[None, None, None]:
- if tty_path:
- try:
- with open(tty_path) as tty: # pragma: no cover
- sys.stdin = tty
- yield
- except OSError:
- # eg. "No such device or address: '/dev/tty'", see https://github.com/samuelcolvin/watchfiles/issues/40
- yield
- else:
- # currently on windows tty_path is None and there's nothing we can do here
- yield
-
-
-def raise_keyboard_interrupt(signum: int, _frame: Any) -> None: # pragma: no cover
- logger.warning('received signal %s, raising KeyboardInterrupt', signal.Signals(signum))
- raise KeyboardInterrupt
-
-
-def catch_sigterm() -> None:
- """
- Catch SIGTERM and raise KeyboardInterrupt instead. This means watchfiles will stop quickly
- on `docker compose stop` and other cases where SIGTERM is sent.
-
- Without this the watchfiles process will be killed while a running process will continue uninterrupted.
- """
- logger.debug('registering handler for SIGTERM on watchfiles process %d', os.getpid())
- signal.signal(signal.SIGTERM, raise_keyboard_interrupt)
diff --git a/venv/lib/python3.11/site-packages/watchfiles/version.py b/venv/lib/python3.11/site-packages/watchfiles/version.py
deleted file mode 100644
index f55721f..0000000
--- a/venv/lib/python3.11/site-packages/watchfiles/version.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from ._rust_notify import __version__
-
-__all__ = ('VERSION',)
-
-VERSION = __version__