summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/watchfiles/_rust_notify.pyi
blob: 63eacda6a6ad32d2461dd7d0d7763d50bcc3f53d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from typing import Any, List, Literal, Optional, Protocol, Set, Tuple, Union

# Public names of this module: the watcher class and the error type declared below.
__all__ = 'RustNotify', 'WatchfilesRustInternalError'

# Annotation only, no value is assigned here: the stub just states that the attribute is a `str`.
__version__: str
"""The package version as defined in `Cargo.toml`, modified to match python's versioning semantics."""

class AbstractEvent(Protocol):
    """
    Structural type for the `stop_event` argument of `RustNotify.watch`.

    Any object that provides an `is_set()` method returning a boolean satisfies this protocol.
    """

    def is_set(self) -> bool:
        """Report whether the event has been set."""
        ...

class RustNotify:
    """
    Python-facing handle on the Rust [notify](https://crates.io/crates/notify) crate.

    The crate does the heavy lifting: it watches for file changes and groups them into events.
    """

    def __init__(
        self,
        watch_paths: List[str],
        debug: bool,
        force_polling: bool,
        poll_delay_ms: int,
        recursive: bool,
        ignore_permission_denied: bool,
    ) -> None:
        """
        Build a `RustNotify` instance; the thread which watches for changes is started as part of construction.

        Args:
            watch_paths: file system paths to watch for changes, each one can be a directory or a file
            debug: when true, details about all events are printed to stderr
            force_polling: when true, polling is always used instead of file system notifications
            poll_delay_ms: delay between polling for changes, only used if `force_polling=True`
            recursive: when `True`, changes in sub-directories are watched recursively, when `False` only
                changes in the top-level directory are watched. No default is declared in this signature,
                so a value must always be passed.
            ignore_permission_denied: when `True`, permission denied errors are ignored while watching changes.

        Raises:
            FileNotFoundError: if any of the paths do not exist.
        """
        ...

    def watch(
        self,
        debounce_ms: int,
        step_ms: int,
        timeout_ms: int,
        stop_event: Optional[AbstractEvent],
    ) -> Union[Set[Tuple[int, str]], Literal['signal', 'stop', 'timeout']]:
        """
        Wait for changes and return them, or return a string naming the reason for returning early.

        Up to `timeout_ms` milliseconds are spent waiting for changes. Once a change is detected,
        further changes are grouped with it and the call returns in no more than `debounce_ms` milliseconds.

        On each iteration the GIL is released during a `step_ms` sleep, so python is not blocked
        while waiting.

        Args:
            debounce_ms: maximum time in milliseconds to group changes over before returning.
            step_ms: time in milliseconds to wait for new changes; if at least one change has already been
                detected and no further changes arrive within this time, the changes are yielded.
            timeout_ms: maximum time in milliseconds to wait for changes before returning,
                `0` means wait indefinitely. Once a change is detected, `debounce_ms` takes precedence
                over `timeout_ms`.
            stop_event: event checked on every iteration to decide whether this function should return early.
                The event should be an object which has an `is_set()` method which returns a boolean.

        Returns:
            One of the values listed below.

        The possible return values are:

        * a `set` of `(event_type, path)` tuples holding the change details, where `event_type` is an int
          matching [`Change`][watchfiles.Change] and `path` is a string representing the path of the file
          that changed
        * the string `'signal'`, if a signal was received
        * the string `'stop'`, if the `stop_event` was set
        * the string `'timeout'`, if `timeout_ms` was exceeded
        """
        ...

    def __enter__(self) -> 'RustNotify':
        """
        Allow `RustNotify` to be used as a context manager; nothing else is done here.

        !!! note

            The watching thread is created when an instance is initiated, not on `__enter__`.
        """
        ...

    def __exit__(self, *args: Any) -> None:
        """
        Delegate to [`close`][watchfiles._rust_notify.RustNotify.close] when the context is left.
        """
        ...

    def close(self) -> None:
        """
        Stop the watching thread.

        Once `close` has been called the `RustNotify` instance can no longer be used, and calls to
        [`watch`][watchfiles._rust_notify.RustNotify.watch] will raise a `RuntimeError`.

        !!! note

            Calling `close` is not required, deleting the `RustNotify` instance is enough to kill
            the thread implicitly.

            `close()` only exists because of [#163](https://github.com/samuelcolvin/watchfiles/issues/163):
            in the event of an error, the traceback in `sys.exc_info` keeps a reference to
            `watchfiles.watch`'s frame, so you can't rely on the `RustNotify` object being deleted,
            and thereby stopping the watching thread.
        """
        ...

class WatchfilesRustInternalError(RuntimeError):
    """
    Error raised when `RustNotify` runs into an unknown error.

    If this happens to you a lot, please look through the
    [github](https://github.com/samuelcolvin/watchfiles/issues) issues and create a new issue
    if your problem is not already discussed there.
    """

    ...