diff options
Diffstat (limited to 'venv/lib/python3.11/site-packages/watchfiles/main.py')
| -rw-r--r-- | venv/lib/python3.11/site-packages/watchfiles/main.py | 344 | 
1 files changed, 0 insertions, 344 deletions
diff --git a/venv/lib/python3.11/site-packages/watchfiles/main.py b/venv/lib/python3.11/site-packages/watchfiles/main.py deleted file mode 100644 index 1df7e54..0000000 --- a/venv/lib/python3.11/site-packages/watchfiles/main.py +++ /dev/null @@ -1,344 +0,0 @@ -import logging -import os -import sys -import warnings -from enum import IntEnum -from pathlib import Path -from typing import TYPE_CHECKING, AsyncGenerator, Callable, Generator, Optional, Set, Tuple, Union - -import anyio - -from ._rust_notify import RustNotify -from .filters import DefaultFilter - -__all__ = 'watch', 'awatch', 'Change', 'FileChange' -logger = logging.getLogger('watchfiles.main') - - -class Change(IntEnum): -    """ -    Enum representing the type of change that occurred. -    """ - -    added = 1 -    """A new file or directory was added.""" -    modified = 2 -    """A file or directory was modified, can be either a metadata or data change.""" -    deleted = 3 -    """A file or directory was deleted.""" - -    def raw_str(self) -> str: -        return self.name - - -FileChange = Tuple[Change, str] -""" -A tuple representing a file change, first element is a [`Change`][watchfiles.Change] member, second is the path -of the file or directory that changed. -""" - -if TYPE_CHECKING: -    import asyncio -    from typing import Protocol - -    import trio - -    AnyEvent = Union[anyio.Event, asyncio.Event, trio.Event] - -    class AbstractEvent(Protocol): -        def is_set(self) -> bool: -            ... - - -def watch( -    *paths: Union[Path, str], -    watch_filter: Optional[Callable[['Change', str], bool]] = DefaultFilter(), -    debounce: int = 1_600, -    step: int = 50, -    stop_event: Optional['AbstractEvent'] = None, -    rust_timeout: int = 5_000, -    yield_on_timeout: bool = False, -    debug: bool = False, -    raise_interrupt: bool = True, -    force_polling: Optional[bool] = None, -    poll_delay_ms: int = 300, -    recursive: bool = True, -    ignore_permission_denied: Optional[bool] = None, -) -> Generator[Set[FileChange], None, None]: -    """ -    Watch one or more paths and yield a set of changes whenever files change. - -    The paths watched can be directories or files, directories are watched recursively - changes in subdirectories -    are also detected. - -    #### Force polling - -    Notify will fall back to file polling if it can't use file system notifications, but we also force notify -    to us polling if the `force_polling` argument is `True`; if `force_polling` is unset (or `None`), we enable -    force polling thus: - -    * if the `WATCHFILES_FORCE_POLLING` environment variable exists and is not empty: -       * if the value is `false`, `disable` or `disabled`, force polling is disabled -       * otherwise, force polling is enabled -    * otherwise, we enable force polling only if we detect we're running on WSL (Windows Subsystem for Linux) - -    Args: -        *paths: filesystem paths to watch. -        watch_filter: callable used to filter out changes which are not important, you can either use a raw callable -            or a [`BaseFilter`][watchfiles.BaseFilter] instance, -            defaults to an instance of [`DefaultFilter`][watchfiles.DefaultFilter]. To keep all changes, use `None`. -        debounce: maximum time in milliseconds to group changes over before yielding them. -        step: time to wait for new changes in milliseconds, if no changes are detected in this time, and -            at least one change has been detected, the changes are yielded. -        stop_event: event to stop watching, if this is set, the generator will stop iteration, -            this can be anything with an `is_set()` method which returns a bool, e.g. `threading.Event()`. -        rust_timeout: maximum time in milliseconds to wait in the rust code for changes, `0` means no timeout. -        yield_on_timeout: if `True`, the generator will yield upon timeout in rust even if no changes are detected. -        debug: whether to print information about all filesystem changes in rust to stdout. -        raise_interrupt: whether to re-raise `KeyboardInterrupt`s, or suppress the error and just stop iterating. -        force_polling: See [Force polling](#force-polling) above. -        poll_delay_ms: delay between polling for changes, only used if `force_polling=True`. -        recursive: if `True`, watch for changes in sub-directories recursively, otherwise watch only for changes in the -            top-level directory, default is `True`. -        ignore_permission_denied: if `True`, will ignore permission denied errors, otherwise will raise them by default. -            Setting the `WATCHFILES_IGNORE_PERMISSION_DENIED` environment variable will set this value too. - -    Yields: -        The generator yields sets of [`FileChange`][watchfiles.main.FileChange]s. - -    ```py title="Example of watch usage" -    from watchfiles import watch - -    for changes in watch('./first/dir', './second/dir', raise_interrupt=False): -        print(changes) -    ``` -    """ -    force_polling = _default_force_polling(force_polling) -    ignore_permission_denied = _default_ignore_permission_denied(ignore_permission_denied) -    with RustNotify( -        [str(p) for p in paths], debug, force_polling, poll_delay_ms, recursive, ignore_permission_denied -    ) as watcher: -        while True: -            raw_changes = watcher.watch(debounce, step, rust_timeout, stop_event) -            if raw_changes == 'timeout': -                if yield_on_timeout: -                    yield set() -                else: -                    logger.debug('rust notify timeout, continuing') -            elif raw_changes == 'signal': -                if raise_interrupt: -                    raise KeyboardInterrupt -                else: -                    logger.warning('KeyboardInterrupt caught, stopping watch') -                    return -            elif raw_changes == 'stop': -                return -            else: -                changes = _prep_changes(raw_changes, watch_filter) -                if changes: -                    _log_changes(changes) -                    yield changes -                else: -                    logger.debug('all changes filtered out, raw_changes=%s', raw_changes) - - -async def awatch(  # noqa C901 -    *paths: Union[Path, str], -    watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(), -    debounce: int = 1_600, -    step: int = 50, -    stop_event: Optional['AnyEvent'] = None, -    rust_timeout: Optional[int] = None, -    yield_on_timeout: bool = False, -    debug: bool = False, -    raise_interrupt: Optional[bool] = None, -    force_polling: Optional[bool] = None, -    poll_delay_ms: int = 300, -    recursive: bool = True, -    ignore_permission_denied: Optional[bool] = None, -) -> AsyncGenerator[Set[FileChange], None]: -    """ -    Asynchronous equivalent of [`watch`][watchfiles.watch] using threads to wait for changes. -    Arguments match those of [`watch`][watchfiles.watch] except `stop_event`. - -    All async methods use [anyio](https://anyio.readthedocs.io/en/latest/) to run the event loop. - -    Unlike [`watch`][watchfiles.watch] `KeyboardInterrupt` cannot be suppressed by `awatch` so they need to be caught -    where `asyncio.run` or equivalent is called. - -    Args: -        *paths: filesystem paths to watch. -        watch_filter: matches the same argument of [`watch`][watchfiles.watch]. -        debounce: matches the same argument of [`watch`][watchfiles.watch]. -        step: matches the same argument of [`watch`][watchfiles.watch]. -        stop_event: `anyio.Event` which can be used to stop iteration, see example below. -        rust_timeout: matches the same argument of [`watch`][watchfiles.watch], except that `None` means -            use `1_000` on Windows and `5_000` on other platforms thus helping with exiting on `Ctrl+C` on Windows, -            see [#110](https://github.com/samuelcolvin/watchfiles/issues/110). -        yield_on_timeout: matches the same argument of [`watch`][watchfiles.watch]. -        debug: matches the same argument of [`watch`][watchfiles.watch]. -        raise_interrupt: This is deprecated, `KeyboardInterrupt` will cause this coroutine to be cancelled and then -            be raised by the top level `asyncio.run` call or equivalent, and should be caught there. -            See [#136](https://github.com/samuelcolvin/watchfiles/issues/136) -        force_polling: if true, always use polling instead of file system notifications, default is `None` where -            `force_polling` is set to `True` if the `WATCHFILES_FORCE_POLLING` environment variable exists. -        poll_delay_ms: delay between polling for changes, only used if `force_polling=True`. -        recursive: if `True`, watch for changes in sub-directories recursively, otherwise watch only for changes in the -            top-level directory, default is `True`. -        ignore_permission_denied: if `True`, will ignore permission denied errors, otherwise will raise them by default. -            Setting the `WATCHFILES_IGNORE_PERMISSION_DENIED` environment variable will set this value too. - -    Yields: -        The generator yields sets of [`FileChange`][watchfiles.main.FileChange]s. - -    ```py title="Example of awatch usage" -    import asyncio -    from watchfiles import awatch - -    async def main(): -        async for changes in awatch('./first/dir', './second/dir'): -            print(changes) - -    if __name__ == '__main__': -        try: -            asyncio.run(main()) -        except KeyboardInterrupt: -            print('stopped via KeyboardInterrupt') -    ``` - -    ```py title="Example of awatch usage with a stop event" -    import asyncio -    from watchfiles import awatch - -    async def main(): -        stop_event = asyncio.Event() - -        async def stop_soon(): -            await asyncio.sleep(3) -            stop_event.set() - -        stop_soon_task = asyncio.create_task(stop_soon()) - -        async for changes in awatch('/path/to/dir', stop_event=stop_event): -            print(changes) - -        # cleanup by awaiting the (now complete) stop_soon_task -        await stop_soon_task - -    asyncio.run(main()) -    ``` -    """ -    if raise_interrupt is not None: -        warnings.warn( -            'raise_interrupt is deprecated, KeyboardInterrupt will cause this coroutine to be cancelled and then ' -            'be raised by the top level asyncio.run call or equivalent, and should be caught there. See #136.', -            DeprecationWarning, -        ) - -    if stop_event is None: -        stop_event_: 'AnyEvent' = anyio.Event() -    else: -        stop_event_ = stop_event - -    force_polling = _default_force_polling(force_polling) -    ignore_permission_denied = _default_ignore_permission_denied(ignore_permission_denied) -    with RustNotify( -        [str(p) for p in paths], debug, force_polling, poll_delay_ms, recursive, ignore_permission_denied -    ) as watcher: -        timeout = _calc_async_timeout(rust_timeout) -        CancelledError = anyio.get_cancelled_exc_class() - -        while True: -            async with anyio.create_task_group() as tg: -                try: -                    raw_changes = await anyio.to_thread.run_sync(watcher.watch, debounce, step, timeout, stop_event_) -                except (CancelledError, KeyboardInterrupt): -                    stop_event_.set() -                    # suppressing KeyboardInterrupt wouldn't stop it getting raised by the top level asyncio.run call -                    raise -                tg.cancel_scope.cancel() - -            if raw_changes == 'timeout': -                if yield_on_timeout: -                    yield set() -                else: -                    logger.debug('rust notify timeout, continuing') -            elif raw_changes == 'stop': -                return -            elif raw_changes == 'signal': -                # in theory the watch thread should never get a signal -                raise RuntimeError('watch thread unexpectedly received a signal') -            else: -                changes = _prep_changes(raw_changes, watch_filter) -                if changes: -                    _log_changes(changes) -                    yield changes -                else: -                    logger.debug('all changes filtered out, raw_changes=%s', raw_changes) - - -def _prep_changes( -    raw_changes: Set[Tuple[int, str]], watch_filter: Optional[Callable[[Change, str], bool]] -) -> Set[FileChange]: -    # if we wanted to be really snazzy, we could move this into rust -    changes = {(Change(change), path) for change, path in raw_changes} -    if watch_filter: -        changes = {c for c in changes if watch_filter(c[0], c[1])} -    return changes - - -def _log_changes(changes: Set[FileChange]) -> None: -    if logger.isEnabledFor(logging.INFO):  # pragma: no branch -        count = len(changes) -        plural = '' if count == 1 else 's' -        if logger.isEnabledFor(logging.DEBUG): -            logger.debug('%d change%s detected: %s', count, plural, changes) -        else: -            logger.info('%d change%s detected', count, plural) - - -def _calc_async_timeout(timeout: Optional[int]) -> int: -    """ -    see https://github.com/samuelcolvin/watchfiles/issues/110 -    """ -    if timeout is None: -        if sys.platform == 'win32': -            return 1_000 -        else: -            return 5_000 -    else: -        return timeout - - -def _default_force_polling(force_polling: Optional[bool]) -> bool: -    """ -    See docstring for `watch` above for details. - -    See samuelcolvin/watchfiles#167 and samuelcolvin/watchfiles#187 for discussion and rationale. -    """ -    if force_polling is not None: -        return force_polling -    env_var = os.getenv('WATCHFILES_FORCE_POLLING') -    if env_var: -        return env_var.lower() not in {'false', 'disable', 'disabled'} -    else: -        return _auto_force_polling() - - -def _auto_force_polling() -> bool: -    """ -    Whether to auto-enable force polling, it should be enabled automatically only on WSL. - -    See samuelcolvin/watchfiles#187 for discussion. -    """ -    import platform - -    uname = platform.uname() -    return 'microsoft-standard' in uname.release.lower() and uname.system.lower() == 'linux' - - -def _default_ignore_permission_denied(ignore_permission_denied: Optional[bool]) -> bool: -    if ignore_permission_denied is not None: -        return ignore_permission_denied -    env_var = os.getenv('WATCHFILES_IGNORE_PERMISSION_DENIED') -    return bool(env_var)  | 
