1""":module: watchdog.watchmedo 2:author: [email protected] (Yesudeep Mangalapilly) 3:author: [email protected] (Mickaël Schoentgen) 4:synopsis: ``watchmedo`` shell script utility. 5""" 6 7from __future__ import annotations 8 9import errno 10import logging 11import os 12import os.path 13import sys 14import time 15from argparse import ArgumentParser, RawDescriptionHelpFormatter 16from io import StringIO 17from textwrap import dedent 18from typing import TYPE_CHECKING, Any 19 20from watchdog.utils import WatchdogShutdownError, load_class, platform 21from watchdog.version import VERSION_STRING 22 23if TYPE_CHECKING: 24 from argparse import Namespace, _SubParsersAction 25 from typing import Callable 26 27 from watchdog.events import FileSystemEventHandler 28 from watchdog.observers import ObserverType 29 from watchdog.observers.api import BaseObserver 30 31 32logging.basicConfig(level=logging.INFO) 33 34CONFIG_KEY_TRICKS = "tricks" 35CONFIG_KEY_PYTHON_PATH = "python-path" 36 37 38class HelpFormatter(RawDescriptionHelpFormatter): 39 """A nicer help formatter. 40 41 Help for arguments can be indented and contain new lines. 42 It will be de-dented and arguments in the help 43 will be separated by a blank line for better readability. 44 45 Source: https://github.com/httpie/httpie/blob/2423f89/httpie/cli/argparser.py#L31 46 """ 47 48 def __init__(self, *args: Any, max_help_position: int = 6, **kwargs: Any) -> None: 49 # A smaller indent for args help. 50 kwargs["max_help_position"] = max_help_position 51 super().__init__(*args, **kwargs) 52 53 def _split_lines(self, text: str, width: int) -> list[str]: 54 text = dedent(text).strip() + "\n\n" 55 return text.splitlines() 56 57 58epilog = """\ 59Copyright 2018-2024 Mickaël Schoentgen & contributors 60Copyright 2014-2018 Thomas Amland & contributors 61Copyright 2012-2014 Google, Inc. 62Copyright 2011-2012 Yesudeep Mangalapilly 63 64Licensed under the terms of the Apache license, version 2.0. Please see 65LICENSE in the source code for more information.""" 66 67cli = ArgumentParser(epilog=epilog, formatter_class=HelpFormatter) 68cli.add_argument("--version", action="version", version=VERSION_STRING) 69subparsers = cli.add_subparsers(dest="top_command") 70command_parsers = {} 71 72Argument = tuple[list[str], Any] 73 74 75def argument(*name_or_flags: str, **kwargs: Any) -> Argument: 76 """Convenience function to properly format arguments to pass to the 77 command decorator. 78 """ 79 return list(name_or_flags), kwargs 80 81 82def command( 83 args: list[Argument], 84 *, 85 parent: _SubParsersAction[ArgumentParser] = subparsers, 86 cmd_aliases: list[str] | None = None, 87) -> Callable: 88 """Decorator to define a new command in a sanity-preserving way. 89 The function will be stored in the ``func`` variable when the parser 90 parses arguments so that it can be called directly like so:: 91 92 >>> args = cli.parse_args() 93 >>> args.func(args) 94 95 """ 96 97 def decorator(func: Callable) -> Callable: 98 name = func.__name__.replace("_", "-") 99 desc = dedent(func.__doc__ or "") 100 parser = parent.add_parser(name, aliases=cmd_aliases or [], description=desc, formatter_class=HelpFormatter) 101 command_parsers[name] = parser 102 verbosity_group = parser.add_mutually_exclusive_group() 103 verbosity_group.add_argument("-q", "--quiet", dest="verbosity", action="append_const", const=-1) 104 verbosity_group.add_argument("-v", "--verbose", dest="verbosity", action="append_const", const=1) 105 for name_or_flags, kwargs in args: 106 parser.add_argument(*name_or_flags, **kwargs) 107 parser.set_defaults(func=func) 108 return func 109 110 return decorator 111 112 113def path_split(pathname_spec: str, *, separator: str = os.pathsep) -> list[str]: 114 """Splits a pathname specification separated by an OS-dependent separator. 115 116 :param pathname_spec: 117 The pathname specification. 118 :param separator: 119 (OS Dependent) `:` on Unix and `;` on Windows or user-specified. 120 """ 121 return pathname_spec.split(separator) 122 123 124def add_to_sys_path(pathnames: list[str], *, index: int = 0) -> None: 125 """Adds specified paths at specified index into the sys.path list. 126 127 :param paths: 128 A list of paths to add to the sys.path 129 :param index: 130 (Default 0) The index in the sys.path list where the paths will be 131 added. 132 """ 133 for pathname in pathnames[::-1]: 134 sys.path.insert(index, pathname) 135 136 137def load_config(tricks_file_pathname: str) -> dict: 138 """Loads the YAML configuration from the specified file. 139 140 :param tricks_file_path: 141 The path to the tricks configuration file. 142 :returns: 143 A dictionary of configuration information. 144 """ 145 import yaml 146 147 with open(tricks_file_pathname, "rb") as f: 148 return yaml.safe_load(f.read()) 149 150 151def parse_patterns( 152 patterns_spec: str, ignore_patterns_spec: str, *, separator: str = ";" 153) -> tuple[list[str], list[str]]: 154 """Parses pattern argument specs and returns a two-tuple of 155 (patterns, ignore_patterns). 156 """ 157 patterns = patterns_spec.split(separator) 158 ignore_patterns = ignore_patterns_spec.split(separator) 159 if ignore_patterns == [""]: 160 ignore_patterns = [] 161 return patterns, ignore_patterns 162 163 164def observe_with( 165 observer: BaseObserver, 166 event_handler: FileSystemEventHandler, 167 pathnames: list[str], 168 *, 169 recursive: bool, 170) -> None: 171 """Single observer thread with a scheduled path and event handler. 172 173 :param observer: 174 The observer thread. 175 :param event_handler: 176 Event handler which will be called in response to file system events. 177 :param pathnames: 178 A list of pathnames to monitor. 179 :param recursive: 180 ``True`` if recursive; ``False`` otherwise. 181 """ 182 for pathname in set(pathnames): 183 observer.schedule(event_handler, pathname, recursive=recursive) 184 observer.start() 185 try: 186 while True: 187 time.sleep(1) 188 except WatchdogShutdownError: 189 observer.stop() 190 observer.join() 191 192 193def schedule_tricks(observer: BaseObserver, tricks: list[dict], pathname: str, *, recursive: bool) -> None: 194 """Schedules tricks with the specified observer and for the given watch 195 path. 196 197 :param observer: 198 The observer thread into which to schedule the trick and watch. 199 :param tricks: 200 A list of tricks. 201 :param pathname: 202 A path name which should be watched. 203 :param recursive: 204 ``True`` if recursive; ``False`` otherwise. 205 """ 206 for trick in tricks: 207 for name, value in trick.items(): 208 trick_cls = load_class(name) 209 handler = trick_cls(**value) 210 trick_pathname = getattr(handler, "source_directory", None) or pathname 211 observer.schedule(handler, trick_pathname, recursive=recursive) 212 213 214@command( 215 [ 216 argument("files", nargs="*", help="perform tricks from given file"), 217 argument( 218 "--python-path", 219 default=".", 220 help=f"Paths separated by {os.pathsep!r} to add to the Python path.", 221 ), 222 argument( 223 "--interval", 224 "--timeout", 225 dest="timeout", 226 default=1.0, 227 type=float, 228 help="Use this as the polling interval/blocking timeout (in seconds).", 229 ), 230 argument( 231 "--recursive", 232 action="store_true", 233 default=True, 234 help="Recursively monitor paths (defaults to True).", 235 ), 236 argument("--debug-force-polling", action="store_true", help="[debug] Forces polling."), 237 argument( 238 "--debug-force-kqueue", 239 action="store_true", 240 help="[debug] Forces BSD kqueue(2).", 241 ), 242 argument( 243 "--debug-force-winapi", 244 action="store_true", 245 help="[debug] Forces Windows API.", 246 ), 247 argument( 248 "--debug-force-fsevents", 249 action="store_true", 250 help="[debug] Forces macOS FSEvents.", 251 ), 252 argument( 253 "--debug-force-inotify", 254 action="store_true", 255 help="[debug] Forces Linux inotify(7).", 256 ), 257 ], 258 cmd_aliases=["tricks"], 259) 260def tricks_from(args: Namespace) -> None: 261 """Command to execute tricks from a tricks configuration file.""" 262 observer_cls: ObserverType 263 if args.debug_force_polling: 264 from watchdog.observers.polling import PollingObserver 265 266 observer_cls = PollingObserver 267 elif args.debug_force_kqueue: 268 from watchdog.observers.kqueue import KqueueObserver 269 270 observer_cls = KqueueObserver 271 elif (not TYPE_CHECKING and args.debug_force_winapi) or (TYPE_CHECKING and platform.is_windows()): 272 from watchdog.observers.read_directory_changes import WindowsApiObserver 273 274 observer_cls = WindowsApiObserver 275 elif args.debug_force_inotify: 276 from watchdog.observers.inotify import InotifyObserver 277 278 observer_cls = InotifyObserver 279 elif args.debug_force_fsevents: 280 from watchdog.observers.fsevents import FSEventsObserver 281 282 observer_cls = FSEventsObserver 283 else: 284 # Automatically picks the most appropriate observer for the platform 285 # on which it is running. 286 from watchdog.observers import Observer 287 288 observer_cls = Observer 289 290 add_to_sys_path(path_split(args.python_path)) 291 observers = [] 292 for tricks_file in args.files: 293 observer = observer_cls(timeout=args.timeout) 294 295 if not os.path.exists(tricks_file): 296 raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), tricks_file) 297 298 config = load_config(tricks_file) 299 300 try: 301 tricks = config[CONFIG_KEY_TRICKS] 302 except KeyError as e: 303 error = f"No {CONFIG_KEY_TRICKS!r} key specified in {tricks_file!r}." 304 raise KeyError(error) from e 305 306 if CONFIG_KEY_PYTHON_PATH in config: 307 add_to_sys_path(config[CONFIG_KEY_PYTHON_PATH]) 308 309 dir_path = os.path.dirname(tricks_file) or os.path.relpath(os.getcwd()) 310 schedule_tricks(observer, tricks, dir_path, recursive=args.recursive) 311 observer.start() 312 observers.append(observer) 313 314 try: 315 while True: 316 time.sleep(1) 317 except WatchdogShutdownError: 318 for o in observers: 319 o.unschedule_all() 320 o.stop() 321 for o in observers: 322 o.join() 323 324 325@command( 326 [ 327 argument( 328 "trick_paths", 329 nargs="*", 330 help="Dotted paths for all the tricks you want to generate.", 331 ), 332 argument( 333 "--python-path", 334 default=".", 335 help=f"Paths separated by {os.pathsep!r} to add to the Python path.", 336 ), 337 argument( 338 "--append-to-file", 339 default=None, 340 help=""" 341 Appends the generated tricks YAML to a file. 342 If not specified, prints to standard output.""", 343 ), 344 argument( 345 "-a", 346 "--append-only", 347 dest="append_only", 348 action="store_true", 349 help=""" 350 If --append-to-file is not specified, produces output for 351 appending instead of a complete tricks YAML file.""", 352 ), 353 ], 354 cmd_aliases=["generate-tricks-yaml"], 355) 356def tricks_generate_yaml(args: Namespace) -> None: 357 """Command to generate Yaml configuration for tricks named on the command line.""" 358 import yaml 359 360 python_paths = path_split(args.python_path) 361 add_to_sys_path(python_paths) 362 output = StringIO() 363 364 for trick_path in args.trick_paths: 365 trick_cls = load_class(trick_path) 366 output.write(trick_cls.generate_yaml()) 367 368 content = output.getvalue() 369 output.close() 370 371 header = yaml.dump({CONFIG_KEY_PYTHON_PATH: python_paths}) 372 header += f"{CONFIG_KEY_TRICKS}:\n" 373 if args.append_to_file is None: 374 # Output to standard output. 375 if not args.append_only: 376 content = header + content 377 sys.stdout.write(content) 378 else: 379 if not os.path.exists(args.append_to_file): 380 content = header + content 381 with open(args.append_to_file, "a", encoding="utf-8") as file: 382 file.write(content) 383 384 385@command( 386 [ 387 argument( 388 "directories", 389 nargs="*", 390 default=".", 391 help="Directories to watch. (default: '.').", 392 ), 393 argument( 394 "-p", 395 "--pattern", 396 "--patterns", 397 dest="patterns", 398 default="*", 399 help="Matches event paths with these patterns (separated by ;).", 400 ), 401 argument( 402 "-i", 403 "--ignore-pattern", 404 "--ignore-patterns", 405 dest="ignore_patterns", 406 default="", 407 help="Ignores event paths with these patterns (separated by ;).", 408 ), 409 argument( 410 "-D", 411 "--ignore-directories", 412 dest="ignore_directories", 413 action="store_true", 414 help="Ignores events for directories.", 415 ), 416 argument( 417 "-R", 418 "--recursive", 419 dest="recursive", 420 action="store_true", 421 help="Monitors the directories recursively.", 422 ), 423 argument( 424 "--interval", 425 "--timeout", 426 dest="timeout", 427 default=1.0, 428 type=float, 429 help="Use this as the polling interval/blocking timeout.", 430 ), 431 argument("--trace", action="store_true", help="Dumps complete dispatching trace."), 432 argument("--debug-force-polling", action="store_true", help="[debug] Forces polling."), 433 argument( 434 "--debug-force-kqueue", 435 action="store_true", 436 help="[debug] Forces BSD kqueue(2).", 437 ), 438 argument( 439 "--debug-force-winapi", 440 action="store_true", 441 help="[debug] Forces Windows API.", 442 ), 443 argument( 444 "--debug-force-fsevents", 445 action="store_true", 446 help="[debug] Forces macOS FSEvents.", 447 ), 448 argument( 449 "--debug-force-inotify", 450 action="store_true", 451 help="[debug] Forces Linux inotify(7).", 452 ), 453 ], 454) 455def log(args: Namespace) -> None: 456 """Command to log file system events to the console.""" 457 from watchdog.tricks import LoggerTrick 458 from watchdog.utils import echo 459 460 if args.trace: 461 class_module_logger = logging.getLogger(LoggerTrick.__module__) 462 echo.echo_class(LoggerTrick, write=lambda msg: class_module_logger.info(msg)) 463 464 patterns, ignore_patterns = parse_patterns(args.patterns, args.ignore_patterns) 465 handler = LoggerTrick( 466 patterns=patterns, 467 ignore_patterns=ignore_patterns, 468 ignore_directories=args.ignore_directories, 469 ) 470 471 observer_cls: ObserverType 472 if args.debug_force_polling: 473 from watchdog.observers.polling import PollingObserver 474 475 observer_cls = PollingObserver 476 elif args.debug_force_kqueue: 477 from watchdog.observers.kqueue import KqueueObserver 478 479 observer_cls = KqueueObserver 480 elif (not TYPE_CHECKING and args.debug_force_winapi) or (TYPE_CHECKING and platform.is_windows()): 481 from watchdog.observers.read_directory_changes import WindowsApiObserver 482 483 observer_cls = WindowsApiObserver 484 elif args.debug_force_inotify: 485 from watchdog.observers.inotify import InotifyObserver 486 487 observer_cls = InotifyObserver 488 elif args.debug_force_fsevents: 489 from watchdog.observers.fsevents import FSEventsObserver 490 491 observer_cls = FSEventsObserver 492 else: 493 # Automatically picks the most appropriate observer for the platform 494 # on which it is running. 495 from watchdog.observers import Observer 496 497 observer_cls = Observer 498 499 observer = observer_cls(timeout=args.timeout) 500 observe_with(observer, handler, args.directories, recursive=args.recursive) 501 502 503@command( 504 [ 505 argument("directories", nargs="*", default=".", help="Directories to watch."), 506 argument( 507 "-c", 508 "--command", 509 dest="command", 510 default=None, 511 help=""" 512 Shell command executed in response to matching events. 513 These interpolation variables are available to your command string: 514 515 ${watch_src_path} - event source path 516 ${watch_dest_path} - event destination path (for moved events) 517 ${watch_event_type} - event type 518 ${watch_object} - 'file' or 'directory' 519 520 Note: 521 Please ensure you do not use double quotes (") to quote 522 your command string. That will force your shell to 523 interpolate before the command is processed by this 524 command. 525 526 Example: 527 528 --command='echo "${watch_src_path}"' 529 """, 530 ), 531 argument( 532 "-p", 533 "--pattern", 534 "--patterns", 535 dest="patterns", 536 default="*", 537 help="Matches event paths with these patterns (separated by ;).", 538 ), 539 argument( 540 "-i", 541 "--ignore-pattern", 542 "--ignore-patterns", 543 dest="ignore_patterns", 544 default="", 545 help="Ignores event paths with these patterns (separated by ;).", 546 ), 547 argument( 548 "-D", 549 "--ignore-directories", 550 dest="ignore_directories", 551 default=False, 552 action="store_true", 553 help="Ignores events for directories.", 554 ), 555 argument( 556 "-R", 557 "--recursive", 558 dest="recursive", 559 action="store_true", 560 help="Monitors the directories recursively.", 561 ), 562 argument( 563 "--interval", 564 "--timeout", 565 dest="timeout", 566 default=1.0, 567 type=float, 568 help="Use this as the polling interval/blocking timeout.", 569 ), 570 argument( 571 "-w", 572 "--wait", 573 dest="wait_for_process", 574 action="store_true", 575 help="Wait for process to finish to avoid multiple simultaneous instances.", 576 ), 577 argument( 578 "-W", 579 "--drop", 580 dest="drop_during_process", 581 action="store_true", 582 help="Ignore events that occur while command is still being" 583 " executed to avoid multiple simultaneous instances.", 584 ), 585 argument("--debug-force-polling", action="store_true", help="[debug] Forces polling."), 586 ], 587) 588def shell_command(args: Namespace) -> None: 589 """Command to execute shell commands in response to file system events.""" 590 from watchdog.tricks import ShellCommandTrick 591 592 if not args.command: 593 args.command = None 594 595 observer_cls: ObserverType 596 if args.debug_force_polling: 597 from watchdog.observers.polling import PollingObserver 598 599 observer_cls = PollingObserver 600 else: 601 from watchdog.observers import Observer 602 603 observer_cls = Observer 604 605 patterns, ignore_patterns = parse_patterns(args.patterns, args.ignore_patterns) 606 handler = ShellCommandTrick( 607 args.command, 608 patterns=patterns, 609 ignore_patterns=ignore_patterns, 610 ignore_directories=args.ignore_directories, 611 wait_for_process=args.wait_for_process, 612 drop_during_process=args.drop_during_process, 613 ) 614 observer = observer_cls(timeout=args.timeout) 615 observe_with(observer, handler, args.directories, recursive=args.recursive) 616 617 618@command( 619 [ 620 argument("command", help="Long-running command to run in a subprocess."), 621 argument( 622 "command_args", 623 metavar="arg", 624 nargs="*", 625 help=""" 626 Command arguments. 627 628 Note: Use -- before the command arguments, otherwise watchmedo will 629 try to interpret them. 630 """, 631 ), 632 argument( 633 "-d", 634 "--directory", 635 dest="directories", 636 metavar="DIRECTORY", 637 action="append", 638 help="Directory to watch. Use another -d or --directory option for each directory.", 639 ), 640 argument( 641 "-p", 642 "--pattern", 643 "--patterns", 644 dest="patterns", 645 default="*", 646 help="Matches event paths with these patterns (separated by ;).", 647 ), 648 argument( 649 "-i", 650 "--ignore-pattern", 651 "--ignore-patterns", 652 dest="ignore_patterns", 653 default="", 654 help="Ignores event paths with these patterns (separated by ;).", 655 ), 656 argument( 657 "-D", 658 "--ignore-directories", 659 dest="ignore_directories", 660 default=False, 661 action="store_true", 662 help="Ignores events for directories.", 663 ), 664 argument( 665 "-R", 666 "--recursive", 667 dest="recursive", 668 action="store_true", 669 help="Monitors the directories recursively.", 670 ), 671 argument( 672 "--interval", 673 "--timeout", 674 dest="timeout", 675 default=1.0, 676 type=float, 677 help="Use this as the polling interval/blocking timeout.", 678 ), 679 argument( 680 "--signal", 681 dest="signal", 682 default="SIGINT", 683 help="Stop the subprocess with this signal (default SIGINT).", 684 ), 685 argument("--debug-force-polling", action="store_true", help="[debug] Forces polling."), 686 argument( 687 "--kill-after", 688 dest="kill_after", 689 default=10.0, 690 type=float, 691 help="When stopping, kill the subprocess after the specified timeout in seconds (default 10.0).", 692 ), 693 argument( 694 "--debounce-interval", 695 dest="debounce_interval", 696 default=0.0, 697 type=float, 698 help="After a file change, Wait until the specified interval (in " 699 "seconds) passes with no file changes, and only then restart.", 700 ), 701 argument( 702 "--no-restart-on-command-exit", 703 dest="restart_on_command_exit", 704 default=True, 705 action="store_false", 706 help="Don't auto-restart the command after it exits.", 707 ), 708 ], 709) 710def auto_restart(args: Namespace) -> None: 711 """Command to start a long-running subprocess and restart it on matched events.""" 712 observer_cls: ObserverType 713 if args.debug_force_polling: 714 from watchdog.observers.polling import PollingObserver 715 716 observer_cls = PollingObserver 717 else: 718 from watchdog.observers import Observer 719 720 observer_cls = Observer 721 722 import signal 723 724 from watchdog.tricks import AutoRestartTrick 725 726 if not args.directories: 727 args.directories = ["."] 728 729 # Allow either signal name or number. 730 stop_signal = getattr(signal, args.signal) if args.signal.startswith("SIG") else int(args.signal) 731 732 # Handle termination signals by raising a semantic exception which will 733 # allow us to gracefully unwind and stop the observer 734 termination_signals = {signal.SIGTERM, signal.SIGINT} 735 736 if hasattr(signal, "SIGHUP"): 737 termination_signals.add(signal.SIGHUP) 738 739 def handler_termination_signal(_signum: signal._SIGNUM, _frame: object) -> None: 740 # Neuter all signals so that we don't attempt a double shutdown 741 for signum in termination_signals: 742 signal.signal(signum, signal.SIG_IGN) 743 raise WatchdogShutdownError 744 745 for signum in termination_signals: 746 signal.signal(signum, handler_termination_signal) 747 748 patterns, ignore_patterns = parse_patterns(args.patterns, args.ignore_patterns) 749 command = [args.command] 750 command.extend(args.command_args) 751 handler = AutoRestartTrick( 752 command, 753 patterns=patterns, 754 ignore_patterns=ignore_patterns, 755 ignore_directories=args.ignore_directories, 756 stop_signal=stop_signal, 757 kill_after=args.kill_after, 758 debounce_interval_seconds=args.debounce_interval, 759 restart_on_command_exit=args.restart_on_command_exit, 760 ) 761 handler.start() 762 observer = observer_cls(timeout=args.timeout) 763 try: 764 observe_with(observer, handler, args.directories, recursive=args.recursive) 765 except WatchdogShutdownError: 766 pass 767 finally: 768 handler.stop() 769 770 771class LogLevelError(Exception): 772 pass 773 774 775def _get_log_level_from_args(args: Namespace) -> str: 776 verbosity = sum(args.verbosity or []) 777 if verbosity < -1: 778 error = "-q/--quiet may be specified only once." 779 raise LogLevelError(error) 780 if verbosity > 2: 781 error = "-v/--verbose may be specified up to 2 times." 782 raise LogLevelError(error) 783 return ["ERROR", "WARNING", "INFO", "DEBUG"][1 + verbosity] 784 785 786def main() -> int: 787 """Entry-point function.""" 788 args = cli.parse_args() 789 if args.top_command is None: 790 cli.print_help() 791 return 1 792 793 try: 794 log_level = _get_log_level_from_args(args) 795 except LogLevelError as exc: 796 print(f"Error: {exc.args[0]}", file=sys.stderr) # noqa:T201 797 command_parsers[args.top_command].print_help() 798 return 1 799 logging.getLogger("watchdog").setLevel(log_level) 800 801 try: 802 args.func(args) 803 except KeyboardInterrupt: 804 return 130 805 806 return 0 807 808 809if __name__ == "__main__": 810 sys.exit(main()) 811