1from __future__ import annotations
2
3import os.path
4import platform
5import threading
6from typing import TYPE_CHECKING
7
8from watchdog.events import (
9    DirCreatedEvent,
10    DirDeletedEvent,
11    DirModifiedEvent,
12    DirMovedEvent,
13    FileCreatedEvent,
14    FileDeletedEvent,
15    FileModifiedEvent,
16    FileMovedEvent,
17    generate_sub_created_events,
18    generate_sub_moved_events,
19)
20from watchdog.observers.api import DEFAULT_EMITTER_TIMEOUT, DEFAULT_OBSERVER_TIMEOUT, BaseObserver, EventEmitter
21from watchdog.observers.winapi import close_directory_handle, get_directory_handle, read_events
22
23if TYPE_CHECKING:
24    from ctypes.wintypes import HANDLE
25
26    from watchdog.events import FileSystemEvent
27    from watchdog.observers.api import EventQueue, ObservedWatch
28    from watchdog.observers.winapi import WinAPINativeEvent
29
30
31class WindowsApiEmitter(EventEmitter):
32    """Windows API-based emitter that uses ReadDirectoryChangesW
33    to detect file system changes for a watch.
34    """
35
36    def __init__(
37        self,
38        event_queue: EventQueue,
39        watch: ObservedWatch,
40        *,
41        timeout: float = DEFAULT_EMITTER_TIMEOUT,
42        event_filter: list[type[FileSystemEvent]] | None = None,
43    ) -> None:
44        super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
45        self._lock = threading.Lock()
46        self._whandle: HANDLE | None = None
47
48    def on_thread_start(self) -> None:
49        self._whandle = get_directory_handle(self.watch.path)
50
51    if platform.python_implementation() == "PyPy":
52
53        def start(self) -> None:
54            """PyPy needs some time before receiving events, see #792."""
55            from time import sleep
56
57            super().start()
58            sleep(0.01)
59
60    def on_thread_stop(self) -> None:
61        if self._whandle:
62            close_directory_handle(self._whandle)
63
64    def _read_events(self) -> list[WinAPINativeEvent]:
65        if not self._whandle:
66            return []
67        return read_events(self._whandle, self.watch.path, recursive=self.watch.is_recursive)
68
69    def queue_events(self, timeout: float) -> None:
70        winapi_events = self._read_events()
71        with self._lock:
72            last_renamed_src_path = ""
73            for winapi_event in winapi_events:
74                src_path = os.path.join(self.watch.path, winapi_event.src_path)
75
76                if winapi_event.is_renamed_old:
77                    last_renamed_src_path = src_path
78                elif winapi_event.is_renamed_new:
79                    dest_path = src_path
80                    src_path = last_renamed_src_path
81                    if os.path.isdir(dest_path):
82                        self.queue_event(DirMovedEvent(src_path, dest_path))
83                        if self.watch.is_recursive:
84                            for sub_moved_event in generate_sub_moved_events(src_path, dest_path):
85                                self.queue_event(sub_moved_event)
86                    else:
87                        self.queue_event(FileMovedEvent(src_path, dest_path))
88                elif winapi_event.is_modified:
89                    cls = DirModifiedEvent if os.path.isdir(src_path) else FileModifiedEvent
90                    self.queue_event(cls(src_path))
91                elif winapi_event.is_added:
92                    isdir = os.path.isdir(src_path)
93                    cls = DirCreatedEvent if isdir else FileCreatedEvent
94                    self.queue_event(cls(src_path))
95                    if isdir and self.watch.is_recursive:
96                        for sub_created_event in generate_sub_created_events(src_path):
97                            self.queue_event(sub_created_event)
98                elif winapi_event.is_removed:
99                    self.queue_event(FileDeletedEvent(src_path))
100                elif winapi_event.is_removed_self:
101                    self.queue_event(DirDeletedEvent(self.watch.path))
102                    self.stop()
103
104
105class WindowsApiObserver(BaseObserver):
106    """Observer thread that schedules watching directories and dispatches
107    calls to event handlers.
108    """
109
110    def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
111        super().__init__(WindowsApiEmitter, timeout=timeout)
112