summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/watchfiles/main.py
blob: 1df7e54700c62adb384edd9b07ff545384d9e239 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
import logging
import os
import sys
import warnings
from enum import IntEnum
from pathlib import Path
from typing import TYPE_CHECKING, AsyncGenerator, Callable, Generator, Optional, Set, Tuple, Union

import anyio

from ._rust_notify import RustNotify
from .filters import DefaultFilter

__all__ = 'watch', 'awatch', 'Change', 'FileChange'
logger = logging.getLogger('watchfiles.main')


class Change(IntEnum):
    """
    Enum representing the type of change that occurred.
    """

    added = 1
    """A new file or directory was added."""
    modified = 2
    """A file or directory was modified, can be either a metadata or data change."""
    deleted = 3
    """A file or directory was deleted."""

    def raw_str(self) -> str:
        return self.name


FileChange = Tuple[Change, str]
"""
A tuple representing a file change, first element is a [`Change`][watchfiles.Change] member, second is the path
of the file or directory that changed.
"""

if TYPE_CHECKING:
    import asyncio
    from typing import Protocol

    import trio

    AnyEvent = Union[anyio.Event, asyncio.Event, trio.Event]

    class AbstractEvent(Protocol):
        def is_set(self) -> bool:
            ...


def watch(
    *paths: Union[Path, str],
    watch_filter: Optional[Callable[['Change', str], bool]] = DefaultFilter(),
    debounce: int = 1_600,
    step: int = 50,
    stop_event: Optional['AbstractEvent'] = None,
    rust_timeout: int = 5_000,
    yield_on_timeout: bool = False,
    debug: bool = False,
    raise_interrupt: bool = True,
    force_polling: Optional[bool] = None,
    poll_delay_ms: int = 300,
    recursive: bool = True,
    ignore_permission_denied: Optional[bool] = None,
) -> Generator[Set[FileChange], None, None]:
    """
    Watch one or more paths and yield a set of changes whenever files change.

    The paths watched can be directories or files, directories are watched recursively - changes in subdirectories
    are also detected.

    #### Force polling

    Notify will fall back to file polling if it can't use file system notifications, but we also force notify
    to us polling if the `force_polling` argument is `True`; if `force_polling` is unset (or `None`), we enable
    force polling thus:

    * if the `WATCHFILES_FORCE_POLLING` environment variable exists and is not empty:
       * if the value is `false`, `disable` or `disabled`, force polling is disabled
       * otherwise, force polling is enabled
    * otherwise, we enable force polling only if we detect we're running on WSL (Windows Subsystem for Linux)

    Args:
        *paths: filesystem paths to watch.
        watch_filter: callable used to filter out changes which are not important, you can either use a raw callable
            or a [`BaseFilter`][watchfiles.BaseFilter] instance,
            defaults to an instance of [`DefaultFilter`][watchfiles.DefaultFilter]. To keep all changes, use `None`.
        debounce: maximum time in milliseconds to group changes over before yielding them.
        step: time to wait for new changes in milliseconds, if no changes are detected in this time, and
            at least one change has been detected, the changes are yielded.
        stop_event: event to stop watching, if this is set, the generator will stop iteration,
            this can be anything with an `is_set()` method which returns a bool, e.g. `threading.Event()`.
        rust_timeout: maximum time in milliseconds to wait in the rust code for changes, `0` means no timeout.
        yield_on_timeout: if `True`, the generator will yield upon timeout in rust even if no changes are detected.
        debug: whether to print information about all filesystem changes in rust to stdout.
        raise_interrupt: whether to re-raise `KeyboardInterrupt`s, or suppress the error and just stop iterating.
        force_polling: See [Force polling](#force-polling) above.
        poll_delay_ms: delay between polling for changes, only used if `force_polling=True`.
        recursive: if `True`, watch for changes in sub-directories recursively, otherwise watch only for changes in the
            top-level directory, default is `True`.
        ignore_permission_denied: if `True`, will ignore permission denied errors, otherwise will raise them by default.
            Setting the `WATCHFILES_IGNORE_PERMISSION_DENIED` environment variable will set this value too.

    Yields:
        The generator yields sets of [`FileChange`][watchfiles.main.FileChange]s.

    ```py title="Example of watch usage"
    from watchfiles import watch

    for changes in watch('./first/dir', './second/dir', raise_interrupt=False):
        print(changes)
    ```
    """
    force_polling = _default_force_polling(force_polling)
    ignore_permission_denied = _default_ignore_permission_denied(ignore_permission_denied)
    with RustNotify(
        [str(p) for p in paths], debug, force_polling, poll_delay_ms, recursive, ignore_permission_denied
    ) as watcher:
        while True:
            raw_changes = watcher.watch(debounce, step, rust_timeout, stop_event)
            if raw_changes == 'timeout':
                if yield_on_timeout:
                    yield set()
                else:
                    logger.debug('rust notify timeout, continuing')
            elif raw_changes == 'signal':
                if raise_interrupt:
                    raise KeyboardInterrupt
                else:
                    logger.warning('KeyboardInterrupt caught, stopping watch')
                    return
            elif raw_changes == 'stop':
                return
            else:
                changes = _prep_changes(raw_changes, watch_filter)
                if changes:
                    _log_changes(changes)
                    yield changes
                else:
                    logger.debug('all changes filtered out, raw_changes=%s', raw_changes)


async def awatch(  # noqa C901
    *paths: Union[Path, str],
    watch_filter: Optional[Callable[[Change, str], bool]] = DefaultFilter(),
    debounce: int = 1_600,
    step: int = 50,
    stop_event: Optional['AnyEvent'] = None,
    rust_timeout: Optional[int] = None,
    yield_on_timeout: bool = False,
    debug: bool = False,
    raise_interrupt: Optional[bool] = None,
    force_polling: Optional[bool] = None,
    poll_delay_ms: int = 300,
    recursive: bool = True,
    ignore_permission_denied: Optional[bool] = None,
) -> AsyncGenerator[Set[FileChange], None]:
    """
    Asynchronous equivalent of [`watch`][watchfiles.watch] using threads to wait for changes.
    Arguments match those of [`watch`][watchfiles.watch] except `stop_event`.

    All async methods use [anyio](https://anyio.readthedocs.io/en/latest/) to run the event loop.

    Unlike [`watch`][watchfiles.watch] `KeyboardInterrupt` cannot be suppressed by `awatch` so they need to be caught
    where `asyncio.run` or equivalent is called.

    Args:
        *paths: filesystem paths to watch.
        watch_filter: matches the same argument of [`watch`][watchfiles.watch].
        debounce: matches the same argument of [`watch`][watchfiles.watch].
        step: matches the same argument of [`watch`][watchfiles.watch].
        stop_event: `anyio.Event` which can be used to stop iteration, see example below.
        rust_timeout: matches the same argument of [`watch`][watchfiles.watch], except that `None` means
            use `1_000` on Windows and `5_000` on other platforms thus helping with exiting on `Ctrl+C` on Windows,
            see [#110](https://github.com/samuelcolvin/watchfiles/issues/110).
        yield_on_timeout: matches the same argument of [`watch`][watchfiles.watch].
        debug: matches the same argument of [`watch`][watchfiles.watch].
        raise_interrupt: This is deprecated, `KeyboardInterrupt` will cause this coroutine to be cancelled and then
            be raised by the top level `asyncio.run` call or equivalent, and should be caught there.
            See [#136](https://github.com/samuelcolvin/watchfiles/issues/136)
        force_polling: if true, always use polling instead of file system notifications, default is `None` where
            `force_polling` is set to `True` if the `WATCHFILES_FORCE_POLLING` environment variable exists.
        poll_delay_ms: delay between polling for changes, only used if `force_polling=True`.
        recursive: if `True`, watch for changes in sub-directories recursively, otherwise watch only for changes in the
            top-level directory, default is `True`.
        ignore_permission_denied: if `True`, will ignore permission denied errors, otherwise will raise them by default.
            Setting the `WATCHFILES_IGNORE_PERMISSION_DENIED` environment variable will set this value too.

    Yields:
        The generator yields sets of [`FileChange`][watchfiles.main.FileChange]s.

    ```py title="Example of awatch usage"
    import asyncio
    from watchfiles import awatch

    async def main():
        async for changes in awatch('./first/dir', './second/dir'):
            print(changes)

    if __name__ == '__main__':
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print('stopped via KeyboardInterrupt')
    ```

    ```py title="Example of awatch usage with a stop event"
    import asyncio
    from watchfiles import awatch

    async def main():
        stop_event = asyncio.Event()

        async def stop_soon():
            await asyncio.sleep(3)
            stop_event.set()

        stop_soon_task = asyncio.create_task(stop_soon())

        async for changes in awatch('/path/to/dir', stop_event=stop_event):
            print(changes)

        # cleanup by awaiting the (now complete) stop_soon_task
        await stop_soon_task

    asyncio.run(main())
    ```
    """
    if raise_interrupt is not None:
        warnings.warn(
            'raise_interrupt is deprecated, KeyboardInterrupt will cause this coroutine to be cancelled and then '
            'be raised by the top level asyncio.run call or equivalent, and should be caught there. See #136.',
            DeprecationWarning,
        )

    if stop_event is None:
        stop_event_: 'AnyEvent' = anyio.Event()
    else:
        stop_event_ = stop_event

    force_polling = _default_force_polling(force_polling)
    ignore_permission_denied = _default_ignore_permission_denied(ignore_permission_denied)
    with RustNotify(
        [str(p) for p in paths], debug, force_polling, poll_delay_ms, recursive, ignore_permission_denied
    ) as watcher:
        timeout = _calc_async_timeout(rust_timeout)
        CancelledError = anyio.get_cancelled_exc_class()

        while True:
            async with anyio.create_task_group() as tg:
                try:
                    raw_changes = await anyio.to_thread.run_sync(watcher.watch, debounce, step, timeout, stop_event_)
                except (CancelledError, KeyboardInterrupt):
                    stop_event_.set()
                    # suppressing KeyboardInterrupt wouldn't stop it getting raised by the top level asyncio.run call
                    raise
                tg.cancel_scope.cancel()

            if raw_changes == 'timeout':
                if yield_on_timeout:
                    yield set()
                else:
                    logger.debug('rust notify timeout, continuing')
            elif raw_changes == 'stop':
                return
            elif raw_changes == 'signal':
                # in theory the watch thread should never get a signal
                raise RuntimeError('watch thread unexpectedly received a signal')
            else:
                changes = _prep_changes(raw_changes, watch_filter)
                if changes:
                    _log_changes(changes)
                    yield changes
                else:
                    logger.debug('all changes filtered out, raw_changes=%s', raw_changes)


def _prep_changes(
    raw_changes: Set[Tuple[int, str]], watch_filter: Optional[Callable[[Change, str], bool]]
) -> Set[FileChange]:
    # if we wanted to be really snazzy, we could move this into rust
    changes = {(Change(change), path) for change, path in raw_changes}
    if watch_filter:
        changes = {c for c in changes if watch_filter(c[0], c[1])}
    return changes


def _log_changes(changes: Set[FileChange]) -> None:
    if logger.isEnabledFor(logging.INFO):  # pragma: no branch
        count = len(changes)
        plural = '' if count == 1 else 's'
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug('%d change%s detected: %s', count, plural, changes)
        else:
            logger.info('%d change%s detected', count, plural)


def _calc_async_timeout(timeout: Optional[int]) -> int:
    """
    see https://github.com/samuelcolvin/watchfiles/issues/110
    """
    if timeout is None:
        if sys.platform == 'win32':
            return 1_000
        else:
            return 5_000
    else:
        return timeout


def _default_force_polling(force_polling: Optional[bool]) -> bool:
    """
    See docstring for `watch` above for details.

    See samuelcolvin/watchfiles#167 and samuelcolvin/watchfiles#187 for discussion and rationale.
    """
    if force_polling is not None:
        return force_polling
    env_var = os.getenv('WATCHFILES_FORCE_POLLING')
    if env_var:
        return env_var.lower() not in {'false', 'disable', 'disabled'}
    else:
        return _auto_force_polling()


def _auto_force_polling() -> bool:
    """
    Whether to auto-enable force polling, it should be enabled automatically only on WSL.

    See samuelcolvin/watchfiles#187 for discussion.
    """
    import platform

    uname = platform.uname()
    return 'microsoft-standard' in uname.release.lower() and uname.system.lower() == 'linux'


def _default_ignore_permission_denied(ignore_permission_denied: Optional[bool]) -> bool:
    if ignore_permission_denied is not None:
        return ignore_permission_denied
    env_var = os.getenv('WATCHFILES_IGNORE_PERMISSION_DENIED')
    return bool(env_var)