1""":module: watchdog.observers.fsevents2
2:synopsis: FSEvents based emitter implementation.
3:author: [email protected] (Thomas Amland)
4:author: [email protected] (Mickaël Schoentgen)
5:platforms: macOS
6"""
7
8from __future__ import annotations
9
10import logging
11import os
12import queue
13import unicodedata
14import warnings
15from threading import Thread
16from typing import TYPE_CHECKING
17
18# pyobjc
19import AppKit
20from FSEvents import (
21    CFRunLoopGetCurrent,
22    CFRunLoopRun,
23    CFRunLoopStop,
24    FSEventStreamCreate,
25    FSEventStreamInvalidate,
26    FSEventStreamRelease,
27    FSEventStreamScheduleWithRunLoop,
28    FSEventStreamStart,
29    FSEventStreamStop,
30    kCFAllocatorDefault,
31    kCFRunLoopDefaultMode,
32    kFSEventStreamCreateFlagFileEvents,
33    kFSEventStreamCreateFlagNoDefer,
34    kFSEventStreamEventFlagItemChangeOwner,
35    kFSEventStreamEventFlagItemCreated,
36    kFSEventStreamEventFlagItemFinderInfoMod,
37    kFSEventStreamEventFlagItemInodeMetaMod,
38    kFSEventStreamEventFlagItemIsDir,
39    kFSEventStreamEventFlagItemIsSymlink,
40    kFSEventStreamEventFlagItemModified,
41    kFSEventStreamEventFlagItemRemoved,
42    kFSEventStreamEventFlagItemRenamed,
43    kFSEventStreamEventFlagItemXattrMod,
44    kFSEventStreamEventIdSinceNow,
45)
46
47from watchdog.events import (
48    DirCreatedEvent,
49    DirDeletedEvent,
50    DirModifiedEvent,
51    DirMovedEvent,
52    FileCreatedEvent,
53    FileDeletedEvent,
54    FileModifiedEvent,
55    FileMovedEvent,
56    FileSystemEvent,
57)
58from watchdog.observers.api import DEFAULT_EMITTER_TIMEOUT, DEFAULT_OBSERVER_TIMEOUT, BaseObserver, EventEmitter
59
60if TYPE_CHECKING:
61    from typing import Callable
62
63    from watchdog.observers.api import EventQueue, ObservedWatch
64
65logger = logging.getLogger(__name__)
66
67message = "watchdog.observers.fsevents2 is deprecated and will be removed in a future release."
68warnings.warn(message, category=DeprecationWarning, stacklevel=1)
69logger.warning(message)
70
71
72class FSEventsQueue(Thread):
73    """Low level FSEvents client."""
74
75    def __init__(self, path: bytes | str) -> None:
76        Thread.__init__(self)
77        self._queue: queue.Queue[list[NativeEvent] | None] = queue.Queue()
78        self._run_loop = None
79
80        if isinstance(path, bytes):
81            path = os.fsdecode(path)
82        self._path = unicodedata.normalize("NFC", path)
83
84        context = None
85        latency = 1.0
86        self._stream_ref = FSEventStreamCreate(
87            kCFAllocatorDefault,
88            self._callback,
89            context,
90            [self._path],
91            kFSEventStreamEventIdSinceNow,
92            latency,
93            kFSEventStreamCreateFlagNoDefer | kFSEventStreamCreateFlagFileEvents,
94        )
95        if self._stream_ref is None:
96            error = "FSEvents. Could not create stream."
97            raise OSError(error)
98
99    def run(self) -> None:
100        pool = AppKit.NSAutoreleasePool.alloc().init()
101        self._run_loop = CFRunLoopGetCurrent()
102        FSEventStreamScheduleWithRunLoop(self._stream_ref, self._run_loop, kCFRunLoopDefaultMode)
103        if not FSEventStreamStart(self._stream_ref):
104            FSEventStreamInvalidate(self._stream_ref)
105            FSEventStreamRelease(self._stream_ref)
106            error = "FSEvents. Could not start stream."
107            raise OSError(error)
108
109        CFRunLoopRun()
110        FSEventStreamStop(self._stream_ref)
111        FSEventStreamInvalidate(self._stream_ref)
112        FSEventStreamRelease(self._stream_ref)
113        del pool
114        # Make sure waiting thread is notified
115        self._queue.put(None)
116
117    def stop(self) -> None:
118        if self._run_loop is not None:
119            CFRunLoopStop(self._run_loop)
120
121    def _callback(
122        self,
123        stream_ref: int,
124        client_callback_info: Callable,
125        num_events: int,
126        event_paths: list[bytes],
127        event_flags: list[int],
128        event_ids: list[int],
129    ) -> None:
130        events = [NativeEvent(path, flags, _id) for path, flags, _id in zip(event_paths, event_flags, event_ids)]
131        logger.debug("FSEvents callback. Got %d events:", num_events)
132        for e in events:
133            logger.debug(e)
134        self._queue.put(events)
135
136    def read_events(self) -> list[NativeEvent] | None:
137        """Returns a list or one or more events, or None if there are no more
138        events to be read.
139        """
140        return self._queue.get() if self.is_alive() else None
141
142
143class NativeEvent:
144    def __init__(self, path: bytes, flags: int, event_id: int) -> None:
145        self.path = path
146        self.flags = flags
147        self.event_id = event_id
148        self.is_created = bool(flags & kFSEventStreamEventFlagItemCreated)
149        self.is_removed = bool(flags & kFSEventStreamEventFlagItemRemoved)
150        self.is_renamed = bool(flags & kFSEventStreamEventFlagItemRenamed)
151        self.is_modified = bool(flags & kFSEventStreamEventFlagItemModified)
152        self.is_change_owner = bool(flags & kFSEventStreamEventFlagItemChangeOwner)
153        self.is_inode_meta_mod = bool(flags & kFSEventStreamEventFlagItemInodeMetaMod)
154        self.is_finder_info_mod = bool(flags & kFSEventStreamEventFlagItemFinderInfoMod)
155        self.is_xattr_mod = bool(flags & kFSEventStreamEventFlagItemXattrMod)
156        self.is_symlink = bool(flags & kFSEventStreamEventFlagItemIsSymlink)
157        self.is_directory = bool(flags & kFSEventStreamEventFlagItemIsDir)
158
159    @property
160    def _event_type(self) -> str:
161        if self.is_created:
162            return "Created"
163        if self.is_removed:
164            return "Removed"
165        if self.is_renamed:
166            return "Renamed"
167        if self.is_modified:
168            return "Modified"
169        if self.is_inode_meta_mod:
170            return "InodeMetaMod"
171        if self.is_xattr_mod:
172            return "XattrMod"
173        return "Unknown"
174
175    def __repr__(self) -> str:
176        return (
177            f"<{type(self).__name__}: path={self.path!r}, type={self._event_type},"
178            f" is_dir={self.is_directory}, flags={hex(self.flags)}, id={self.event_id}>"
179        )
180
181
182class FSEventsEmitter(EventEmitter):
183    """FSEvents based event emitter. Handles conversion of native events."""
184
185    def __init__(
186        self,
187        event_queue: EventQueue,
188        watch: ObservedWatch,
189        *,
190        timeout: float = DEFAULT_EMITTER_TIMEOUT,
191        event_filter: list[type[FileSystemEvent]] | None = None,
192    ):
193        super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)
194        self._fsevents = FSEventsQueue(watch.path)
195        self._fsevents.start()
196
197    def on_thread_stop(self) -> None:
198        self._fsevents.stop()
199
200    def queue_events(self, timeout: float) -> None:
201        events = self._fsevents.read_events()
202        if events is None:
203            return
204        i = 0
205        while i < len(events):
206            event = events[i]
207
208            cls: type[FileSystemEvent]
209            # For some reason the create and remove flags are sometimes also
210            # set for rename and modify type events, so let those take
211            # precedence.
212            if event.is_renamed:
213                # Internal moves appears to always be consecutive in the same
214                # buffer and have IDs differ by exactly one (while others
215                # don't) making it possible to pair up the two events coming
216                # from a single move operation. (None of this is documented!)
217                # Otherwise, guess whether file was moved in or out.
218                # TODO: handle id wrapping
219                if i + 1 < len(events) and events[i + 1].is_renamed and events[i + 1].event_id == event.event_id + 1:
220                    cls = DirMovedEvent if event.is_directory else FileMovedEvent
221                    self.queue_event(cls(event.path, events[i + 1].path))
222                    self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
223                    self.queue_event(DirModifiedEvent(os.path.dirname(events[i + 1].path)))
224                    i += 1
225                elif os.path.exists(event.path):
226                    cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
227                    self.queue_event(cls(event.path))
228                    self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
229                else:
230                    cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
231                    self.queue_event(cls(event.path))
232                    self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
233                # TODO: generate events for tree
234
235            elif event.is_modified or event.is_inode_meta_mod or event.is_xattr_mod:
236                cls = DirModifiedEvent if event.is_directory else FileModifiedEvent
237                self.queue_event(cls(event.path))
238
239            elif event.is_created:
240                cls = DirCreatedEvent if event.is_directory else FileCreatedEvent
241                self.queue_event(cls(event.path))
242                self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
243
244            elif event.is_removed:
245                cls = DirDeletedEvent if event.is_directory else FileDeletedEvent
246                self.queue_event(cls(event.path))
247                self.queue_event(DirModifiedEvent(os.path.dirname(event.path)))
248            i += 1
249
250
251class FSEventsObserver2(BaseObserver):
252    def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
253        super().__init__(FSEventsEmitter, timeout=timeout)
254