1""":module: watchdog.watchmedo
2:author: [email protected] (Yesudeep Mangalapilly)
3:author: [email protected] (Mickaël Schoentgen)
4:synopsis: ``watchmedo`` shell script utility.
5"""
6
7from __future__ import annotations
8
9import errno
10import logging
11import os
12import os.path
13import sys
14import time
15from argparse import ArgumentParser, RawDescriptionHelpFormatter
16from io import StringIO
17from textwrap import dedent
18from typing import TYPE_CHECKING, Any
19
20from watchdog.utils import WatchdogShutdownError, load_class, platform
21from watchdog.version import VERSION_STRING
22
23if TYPE_CHECKING:
24    from argparse import Namespace, _SubParsersAction
25    from typing import Callable
26
27    from watchdog.events import FileSystemEventHandler
28    from watchdog.observers import ObserverType
29    from watchdog.observers.api import BaseObserver
30
31
32logging.basicConfig(level=logging.INFO)
33
34CONFIG_KEY_TRICKS = "tricks"
35CONFIG_KEY_PYTHON_PATH = "python-path"
36
37
38class HelpFormatter(RawDescriptionHelpFormatter):
39    """A nicer help formatter.
40
41    Help for arguments can be indented and contain new lines.
42    It will be de-dented and arguments in the help
43    will be separated by a blank line for better readability.
44
45    Source: https://github.com/httpie/httpie/blob/2423f89/httpie/cli/argparser.py#L31
46    """
47
48    def __init__(self, *args: Any, max_help_position: int = 6, **kwargs: Any) -> None:
49        # A smaller indent for args help.
50        kwargs["max_help_position"] = max_help_position
51        super().__init__(*args, **kwargs)
52
53    def _split_lines(self, text: str, width: int) -> list[str]:
54        text = dedent(text).strip() + "\n\n"
55        return text.splitlines()
56
57
58epilog = """\
59Copyright 2018-2024 Mickaël Schoentgen & contributors
60Copyright 2014-2018 Thomas Amland & contributors
61Copyright 2012-2014 Google, Inc.
62Copyright 2011-2012 Yesudeep Mangalapilly
63
64Licensed under the terms of the Apache license, version 2.0. Please see
65LICENSE in the source code for more information."""
66
67cli = ArgumentParser(epilog=epilog, formatter_class=HelpFormatter)
68cli.add_argument("--version", action="version", version=VERSION_STRING)
69subparsers = cli.add_subparsers(dest="top_command")
70command_parsers = {}
71
72Argument = tuple[list[str], Any]
73
74
75def argument(*name_or_flags: str, **kwargs: Any) -> Argument:
76    """Convenience function to properly format arguments to pass to the
77    command decorator.
78    """
79    return list(name_or_flags), kwargs
80
81
82def command(
83    args: list[Argument],
84    *,
85    parent: _SubParsersAction[ArgumentParser] = subparsers,
86    cmd_aliases: list[str] | None = None,
87) -> Callable:
88    """Decorator to define a new command in a sanity-preserving way.
89    The function will be stored in the ``func`` variable when the parser
90    parses arguments so that it can be called directly like so::
91
92      >>> args = cli.parse_args()
93      >>> args.func(args)
94
95    """
96
97    def decorator(func: Callable) -> Callable:
98        name = func.__name__.replace("_", "-")
99        desc = dedent(func.__doc__ or "")
100        parser = parent.add_parser(name, aliases=cmd_aliases or [], description=desc, formatter_class=HelpFormatter)
101        command_parsers[name] = parser
102        verbosity_group = parser.add_mutually_exclusive_group()
103        verbosity_group.add_argument("-q", "--quiet", dest="verbosity", action="append_const", const=-1)
104        verbosity_group.add_argument("-v", "--verbose", dest="verbosity", action="append_const", const=1)
105        for name_or_flags, kwargs in args:
106            parser.add_argument(*name_or_flags, **kwargs)
107            parser.set_defaults(func=func)
108        return func
109
110    return decorator
111
112
113def path_split(pathname_spec: str, *, separator: str = os.pathsep) -> list[str]:
114    """Splits a pathname specification separated by an OS-dependent separator.
115
116    :param pathname_spec:
117        The pathname specification.
118    :param separator:
119        (OS Dependent) `:` on Unix and `;` on Windows or user-specified.
120    """
121    return pathname_spec.split(separator)
122
123
124def add_to_sys_path(pathnames: list[str], *, index: int = 0) -> None:
125    """Adds specified paths at specified index into the sys.path list.
126
127    :param paths:
128        A list of paths to add to the sys.path
129    :param index:
130        (Default 0) The index in the sys.path list where the paths will be
131        added.
132    """
133    for pathname in pathnames[::-1]:
134        sys.path.insert(index, pathname)
135
136
137def load_config(tricks_file_pathname: str) -> dict:
138    """Loads the YAML configuration from the specified file.
139
140    :param tricks_file_path:
141        The path to the tricks configuration file.
142    :returns:
143        A dictionary of configuration information.
144    """
145    import yaml
146
147    with open(tricks_file_pathname, "rb") as f:
148        return yaml.safe_load(f.read())
149
150
151def parse_patterns(
152    patterns_spec: str, ignore_patterns_spec: str, *, separator: str = ";"
153) -> tuple[list[str], list[str]]:
154    """Parses pattern argument specs and returns a two-tuple of
155    (patterns, ignore_patterns).
156    """
157    patterns = patterns_spec.split(separator)
158    ignore_patterns = ignore_patterns_spec.split(separator)
159    if ignore_patterns == [""]:
160        ignore_patterns = []
161    return patterns, ignore_patterns
162
163
164def observe_with(
165    observer: BaseObserver,
166    event_handler: FileSystemEventHandler,
167    pathnames: list[str],
168    *,
169    recursive: bool,
170) -> None:
171    """Single observer thread with a scheduled path and event handler.
172
173    :param observer:
174        The observer thread.
175    :param event_handler:
176        Event handler which will be called in response to file system events.
177    :param pathnames:
178        A list of pathnames to monitor.
179    :param recursive:
180        ``True`` if recursive; ``False`` otherwise.
181    """
182    for pathname in set(pathnames):
183        observer.schedule(event_handler, pathname, recursive=recursive)
184    observer.start()
185    try:
186        while True:
187            time.sleep(1)
188    except WatchdogShutdownError:
189        observer.stop()
190    observer.join()
191
192
193def schedule_tricks(observer: BaseObserver, tricks: list[dict], pathname: str, *, recursive: bool) -> None:
194    """Schedules tricks with the specified observer and for the given watch
195    path.
196
197    :param observer:
198        The observer thread into which to schedule the trick and watch.
199    :param tricks:
200        A list of tricks.
201    :param pathname:
202        A path name which should be watched.
203    :param recursive:
204        ``True`` if recursive; ``False`` otherwise.
205    """
206    for trick in tricks:
207        for name, value in trick.items():
208            trick_cls = load_class(name)
209            handler = trick_cls(**value)
210            trick_pathname = getattr(handler, "source_directory", None) or pathname
211            observer.schedule(handler, trick_pathname, recursive=recursive)
212
213
214@command(
215    [
216        argument("files", nargs="*", help="perform tricks from given file"),
217        argument(
218            "--python-path",
219            default=".",
220            help=f"Paths separated by {os.pathsep!r} to add to the Python path.",
221        ),
222        argument(
223            "--interval",
224            "--timeout",
225            dest="timeout",
226            default=1.0,
227            type=float,
228            help="Use this as the polling interval/blocking timeout (in seconds).",
229        ),
230        argument(
231            "--recursive",
232            action="store_true",
233            default=True,
234            help="Recursively monitor paths (defaults to True).",
235        ),
236        argument("--debug-force-polling", action="store_true", help="[debug] Forces polling."),
237        argument(
238            "--debug-force-kqueue",
239            action="store_true",
240            help="[debug] Forces BSD kqueue(2).",
241        ),
242        argument(
243            "--debug-force-winapi",
244            action="store_true",
245            help="[debug] Forces Windows API.",
246        ),
247        argument(
248            "--debug-force-fsevents",
249            action="store_true",
250            help="[debug] Forces macOS FSEvents.",
251        ),
252        argument(
253            "--debug-force-inotify",
254            action="store_true",
255            help="[debug] Forces Linux inotify(7).",
256        ),
257    ],
258    cmd_aliases=["tricks"],
259)
260def tricks_from(args: Namespace) -> None:
261    """Command to execute tricks from a tricks configuration file."""
262    observer_cls: ObserverType
263    if args.debug_force_polling:
264        from watchdog.observers.polling import PollingObserver
265
266        observer_cls = PollingObserver
267    elif args.debug_force_kqueue:
268        from watchdog.observers.kqueue import KqueueObserver
269
270        observer_cls = KqueueObserver
271    elif (not TYPE_CHECKING and args.debug_force_winapi) or (TYPE_CHECKING and platform.is_windows()):
272        from watchdog.observers.read_directory_changes import WindowsApiObserver
273
274        observer_cls = WindowsApiObserver
275    elif args.debug_force_inotify:
276        from watchdog.observers.inotify import InotifyObserver
277
278        observer_cls = InotifyObserver
279    elif args.debug_force_fsevents:
280        from watchdog.observers.fsevents import FSEventsObserver
281
282        observer_cls = FSEventsObserver
283    else:
284        # Automatically picks the most appropriate observer for the platform
285        # on which it is running.
286        from watchdog.observers import Observer
287
288        observer_cls = Observer
289
290    add_to_sys_path(path_split(args.python_path))
291    observers = []
292    for tricks_file in args.files:
293        observer = observer_cls(timeout=args.timeout)
294
295        if not os.path.exists(tricks_file):
296            raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), tricks_file)
297
298        config = load_config(tricks_file)
299
300        try:
301            tricks = config[CONFIG_KEY_TRICKS]
302        except KeyError as e:
303            error = f"No {CONFIG_KEY_TRICKS!r} key specified in {tricks_file!r}."
304            raise KeyError(error) from e
305
306        if CONFIG_KEY_PYTHON_PATH in config:
307            add_to_sys_path(config[CONFIG_KEY_PYTHON_PATH])
308
309        dir_path = os.path.dirname(tricks_file) or os.path.relpath(os.getcwd())
310        schedule_tricks(observer, tricks, dir_path, recursive=args.recursive)
311        observer.start()
312        observers.append(observer)
313
314    try:
315        while True:
316            time.sleep(1)
317    except WatchdogShutdownError:
318        for o in observers:
319            o.unschedule_all()
320            o.stop()
321    for o in observers:
322        o.join()
323
324
325@command(
326    [
327        argument(
328            "trick_paths",
329            nargs="*",
330            help="Dotted paths for all the tricks you want to generate.",
331        ),
332        argument(
333            "--python-path",
334            default=".",
335            help=f"Paths separated by {os.pathsep!r} to add to the Python path.",
336        ),
337        argument(
338            "--append-to-file",
339            default=None,
340            help="""
341                   Appends the generated tricks YAML to a file.
342                   If not specified, prints to standard output.""",
343        ),
344        argument(
345            "-a",
346            "--append-only",
347            dest="append_only",
348            action="store_true",
349            help="""
350                   If --append-to-file is not specified, produces output for
351                   appending instead of a complete tricks YAML file.""",
352        ),
353    ],
354    cmd_aliases=["generate-tricks-yaml"],
355)
356def tricks_generate_yaml(args: Namespace) -> None:
357    """Command to generate Yaml configuration for tricks named on the command line."""
358    import yaml
359
360    python_paths = path_split(args.python_path)
361    add_to_sys_path(python_paths)
362    output = StringIO()
363
364    for trick_path in args.trick_paths:
365        trick_cls = load_class(trick_path)
366        output.write(trick_cls.generate_yaml())
367
368    content = output.getvalue()
369    output.close()
370
371    header = yaml.dump({CONFIG_KEY_PYTHON_PATH: python_paths})
372    header += f"{CONFIG_KEY_TRICKS}:\n"
373    if args.append_to_file is None:
374        # Output to standard output.
375        if not args.append_only:
376            content = header + content
377        sys.stdout.write(content)
378    else:
379        if not os.path.exists(args.append_to_file):
380            content = header + content
381        with open(args.append_to_file, "a", encoding="utf-8") as file:
382            file.write(content)
383
384
385@command(
386    [
387        argument(
388            "directories",
389            nargs="*",
390            default=".",
391            help="Directories to watch. (default: '.').",
392        ),
393        argument(
394            "-p",
395            "--pattern",
396            "--patterns",
397            dest="patterns",
398            default="*",
399            help="Matches event paths with these patterns (separated by ;).",
400        ),
401        argument(
402            "-i",
403            "--ignore-pattern",
404            "--ignore-patterns",
405            dest="ignore_patterns",
406            default="",
407            help="Ignores event paths with these patterns (separated by ;).",
408        ),
409        argument(
410            "-D",
411            "--ignore-directories",
412            dest="ignore_directories",
413            action="store_true",
414            help="Ignores events for directories.",
415        ),
416        argument(
417            "-R",
418            "--recursive",
419            dest="recursive",
420            action="store_true",
421            help="Monitors the directories recursively.",
422        ),
423        argument(
424            "--interval",
425            "--timeout",
426            dest="timeout",
427            default=1.0,
428            type=float,
429            help="Use this as the polling interval/blocking timeout.",
430        ),
431        argument("--trace", action="store_true", help="Dumps complete dispatching trace."),
432        argument("--debug-force-polling", action="store_true", help="[debug] Forces polling."),
433        argument(
434            "--debug-force-kqueue",
435            action="store_true",
436            help="[debug] Forces BSD kqueue(2).",
437        ),
438        argument(
439            "--debug-force-winapi",
440            action="store_true",
441            help="[debug] Forces Windows API.",
442        ),
443        argument(
444            "--debug-force-fsevents",
445            action="store_true",
446            help="[debug] Forces macOS FSEvents.",
447        ),
448        argument(
449            "--debug-force-inotify",
450            action="store_true",
451            help="[debug] Forces Linux inotify(7).",
452        ),
453    ],
454)
455def log(args: Namespace) -> None:
456    """Command to log file system events to the console."""
457    from watchdog.tricks import LoggerTrick
458    from watchdog.utils import echo
459
460    if args.trace:
461        class_module_logger = logging.getLogger(LoggerTrick.__module__)
462        echo.echo_class(LoggerTrick, write=lambda msg: class_module_logger.info(msg))
463
464    patterns, ignore_patterns = parse_patterns(args.patterns, args.ignore_patterns)
465    handler = LoggerTrick(
466        patterns=patterns,
467        ignore_patterns=ignore_patterns,
468        ignore_directories=args.ignore_directories,
469    )
470
471    observer_cls: ObserverType
472    if args.debug_force_polling:
473        from watchdog.observers.polling import PollingObserver
474
475        observer_cls = PollingObserver
476    elif args.debug_force_kqueue:
477        from watchdog.observers.kqueue import KqueueObserver
478
479        observer_cls = KqueueObserver
480    elif (not TYPE_CHECKING and args.debug_force_winapi) or (TYPE_CHECKING and platform.is_windows()):
481        from watchdog.observers.read_directory_changes import WindowsApiObserver
482
483        observer_cls = WindowsApiObserver
484    elif args.debug_force_inotify:
485        from watchdog.observers.inotify import InotifyObserver
486
487        observer_cls = InotifyObserver
488    elif args.debug_force_fsevents:
489        from watchdog.observers.fsevents import FSEventsObserver
490
491        observer_cls = FSEventsObserver
492    else:
493        # Automatically picks the most appropriate observer for the platform
494        # on which it is running.
495        from watchdog.observers import Observer
496
497        observer_cls = Observer
498
499    observer = observer_cls(timeout=args.timeout)
500    observe_with(observer, handler, args.directories, recursive=args.recursive)
501
502
503@command(
504    [
505        argument("directories", nargs="*", default=".", help="Directories to watch."),
506        argument(
507            "-c",
508            "--command",
509            dest="command",
510            default=None,
511            help="""
512    Shell command executed in response to matching events.
513    These interpolation variables are available to your command string:
514
515        ${watch_src_path}   - event source path
516        ${watch_dest_path}  - event destination path (for moved events)
517        ${watch_event_type} - event type
518        ${watch_object}     - 'file' or 'directory'
519
520    Note:
521        Please ensure you do not use double quotes (") to quote
522        your command string. That will force your shell to
523        interpolate before the command is processed by this
524        command.
525
526    Example:
527
528        --command='echo "${watch_src_path}"'
529    """,
530        ),
531        argument(
532            "-p",
533            "--pattern",
534            "--patterns",
535            dest="patterns",
536            default="*",
537            help="Matches event paths with these patterns (separated by ;).",
538        ),
539        argument(
540            "-i",
541            "--ignore-pattern",
542            "--ignore-patterns",
543            dest="ignore_patterns",
544            default="",
545            help="Ignores event paths with these patterns (separated by ;).",
546        ),
547        argument(
548            "-D",
549            "--ignore-directories",
550            dest="ignore_directories",
551            default=False,
552            action="store_true",
553            help="Ignores events for directories.",
554        ),
555        argument(
556            "-R",
557            "--recursive",
558            dest="recursive",
559            action="store_true",
560            help="Monitors the directories recursively.",
561        ),
562        argument(
563            "--interval",
564            "--timeout",
565            dest="timeout",
566            default=1.0,
567            type=float,
568            help="Use this as the polling interval/blocking timeout.",
569        ),
570        argument(
571            "-w",
572            "--wait",
573            dest="wait_for_process",
574            action="store_true",
575            help="Wait for process to finish to avoid multiple simultaneous instances.",
576        ),
577        argument(
578            "-W",
579            "--drop",
580            dest="drop_during_process",
581            action="store_true",
582            help="Ignore events that occur while command is still being"
583            " executed to avoid multiple simultaneous instances.",
584        ),
585        argument("--debug-force-polling", action="store_true", help="[debug] Forces polling."),
586    ],
587)
588def shell_command(args: Namespace) -> None:
589    """Command to execute shell commands in response to file system events."""
590    from watchdog.tricks import ShellCommandTrick
591
592    if not args.command:
593        args.command = None
594
595    observer_cls: ObserverType
596    if args.debug_force_polling:
597        from watchdog.observers.polling import PollingObserver
598
599        observer_cls = PollingObserver
600    else:
601        from watchdog.observers import Observer
602
603        observer_cls = Observer
604
605    patterns, ignore_patterns = parse_patterns(args.patterns, args.ignore_patterns)
606    handler = ShellCommandTrick(
607        args.command,
608        patterns=patterns,
609        ignore_patterns=ignore_patterns,
610        ignore_directories=args.ignore_directories,
611        wait_for_process=args.wait_for_process,
612        drop_during_process=args.drop_during_process,
613    )
614    observer = observer_cls(timeout=args.timeout)
615    observe_with(observer, handler, args.directories, recursive=args.recursive)
616
617
618@command(
619    [
620        argument("command", help="Long-running command to run in a subprocess."),
621        argument(
622            "command_args",
623            metavar="arg",
624            nargs="*",
625            help="""
626    Command arguments.
627
628    Note: Use -- before the command arguments, otherwise watchmedo will
629    try to interpret them.
630    """,
631        ),
632        argument(
633            "-d",
634            "--directory",
635            dest="directories",
636            metavar="DIRECTORY",
637            action="append",
638            help="Directory to watch. Use another -d or --directory option for each directory.",
639        ),
640        argument(
641            "-p",
642            "--pattern",
643            "--patterns",
644            dest="patterns",
645            default="*",
646            help="Matches event paths with these patterns (separated by ;).",
647        ),
648        argument(
649            "-i",
650            "--ignore-pattern",
651            "--ignore-patterns",
652            dest="ignore_patterns",
653            default="",
654            help="Ignores event paths with these patterns (separated by ;).",
655        ),
656        argument(
657            "-D",
658            "--ignore-directories",
659            dest="ignore_directories",
660            default=False,
661            action="store_true",
662            help="Ignores events for directories.",
663        ),
664        argument(
665            "-R",
666            "--recursive",
667            dest="recursive",
668            action="store_true",
669            help="Monitors the directories recursively.",
670        ),
671        argument(
672            "--interval",
673            "--timeout",
674            dest="timeout",
675            default=1.0,
676            type=float,
677            help="Use this as the polling interval/blocking timeout.",
678        ),
679        argument(
680            "--signal",
681            dest="signal",
682            default="SIGINT",
683            help="Stop the subprocess with this signal (default SIGINT).",
684        ),
685        argument("--debug-force-polling", action="store_true", help="[debug] Forces polling."),
686        argument(
687            "--kill-after",
688            dest="kill_after",
689            default=10.0,
690            type=float,
691            help="When stopping, kill the subprocess after the specified timeout in seconds (default 10.0).",
692        ),
693        argument(
694            "--debounce-interval",
695            dest="debounce_interval",
696            default=0.0,
697            type=float,
698            help="After a file change, Wait until the specified interval (in "
699            "seconds) passes with no file changes, and only then restart.",
700        ),
701        argument(
702            "--no-restart-on-command-exit",
703            dest="restart_on_command_exit",
704            default=True,
705            action="store_false",
706            help="Don't auto-restart the command after it exits.",
707        ),
708    ],
709)
710def auto_restart(args: Namespace) -> None:
711    """Command to start a long-running subprocess and restart it on matched events."""
712    observer_cls: ObserverType
713    if args.debug_force_polling:
714        from watchdog.observers.polling import PollingObserver
715
716        observer_cls = PollingObserver
717    else:
718        from watchdog.observers import Observer
719
720        observer_cls = Observer
721
722    import signal
723
724    from watchdog.tricks import AutoRestartTrick
725
726    if not args.directories:
727        args.directories = ["."]
728
729    # Allow either signal name or number.
730    stop_signal = getattr(signal, args.signal) if args.signal.startswith("SIG") else int(args.signal)
731
732    # Handle termination signals by raising a semantic exception which will
733    # allow us to gracefully unwind and stop the observer
734    termination_signals = {signal.SIGTERM, signal.SIGINT}
735
736    if hasattr(signal, "SIGHUP"):
737        termination_signals.add(signal.SIGHUP)
738
739    def handler_termination_signal(_signum: signal._SIGNUM, _frame: object) -> None:
740        # Neuter all signals so that we don't attempt a double shutdown
741        for signum in termination_signals:
742            signal.signal(signum, signal.SIG_IGN)
743        raise WatchdogShutdownError
744
745    for signum in termination_signals:
746        signal.signal(signum, handler_termination_signal)
747
748    patterns, ignore_patterns = parse_patterns(args.patterns, args.ignore_patterns)
749    command = [args.command]
750    command.extend(args.command_args)
751    handler = AutoRestartTrick(
752        command,
753        patterns=patterns,
754        ignore_patterns=ignore_patterns,
755        ignore_directories=args.ignore_directories,
756        stop_signal=stop_signal,
757        kill_after=args.kill_after,
758        debounce_interval_seconds=args.debounce_interval,
759        restart_on_command_exit=args.restart_on_command_exit,
760    )
761    handler.start()
762    observer = observer_cls(timeout=args.timeout)
763    try:
764        observe_with(observer, handler, args.directories, recursive=args.recursive)
765    except WatchdogShutdownError:
766        pass
767    finally:
768        handler.stop()
769
770
771class LogLevelError(Exception):
772    pass
773
774
775def _get_log_level_from_args(args: Namespace) -> str:
776    verbosity = sum(args.verbosity or [])
777    if verbosity < -1:
778        error = "-q/--quiet may be specified only once."
779        raise LogLevelError(error)
780    if verbosity > 2:
781        error = "-v/--verbose may be specified up to 2 times."
782        raise LogLevelError(error)
783    return ["ERROR", "WARNING", "INFO", "DEBUG"][1 + verbosity]
784
785
786def main() -> int:
787    """Entry-point function."""
788    args = cli.parse_args()
789    if args.top_command is None:
790        cli.print_help()
791        return 1
792
793    try:
794        log_level = _get_log_level_from_args(args)
795    except LogLevelError as exc:
796        print(f"Error: {exc.args[0]}", file=sys.stderr)  # noqa:T201
797        command_parsers[args.top_command].print_help()
798        return 1
799    logging.getLogger("watchdog").setLevel(log_level)
800
801    try:
802        args.func(args)
803    except KeyboardInterrupt:
804        return 130
805
806    return 0
807
808
809if __name__ == "__main__":
810    sys.exit(main())
811