1""":module: watchdog.observers.kqueue
2:synopsis: ``kqueue(2)`` based emitter implementation.
3:author: [email protected] (Yesudeep Mangalapilly)
4:author: [email protected] (Mickaël Schoentgen)
5:platforms: macOS and BSD with kqueue(2).
6
7.. WARNING:: kqueue is a very heavyweight way to monitor file systems.
8             Each kqueue-detected directory modification triggers
9             a full directory scan. Traversing the entire directory tree
10             and opening file descriptors for all files will create
11             performance problems. We need to find a way to re-scan
12             only those directories which report changes and do a diff
13             between two sub-DirectorySnapshots perhaps.
14
15.. ADMONITION:: About OS X performance guidelines
16
17    Quote from the `macOS File System Performance Guidelines`_:
18
19        "When you only want to track changes on a file or directory, be sure to
20        open it using the ``O_EVTONLY`` flag. This flag prevents the file or
21        directory from being marked as open or in use. This is important
22        if you are tracking files on a removable volume and the user tries to
23        unmount the volume. With this flag in place, the system knows it can
24        dismiss the volume. If you had opened the files or directories without
25        this flag, the volume would be marked as busy and would not be
26        unmounted."
27
28    ``O_EVTONLY`` is defined as ``0x8000`` in the OS X header files.
29    More information here: http://www.mlsite.net/blog/?p=2312
30
31Classes
32-------
33.. autoclass:: KqueueEmitter
34   :members:
35   :show-inheritance:
36
37Collections and Utility Classes
38-------------------------------
39.. autoclass:: KeventDescriptor
40   :members:
41   :show-inheritance:
42
43.. autoclass:: KeventDescriptorSet
44   :members:
45   :show-inheritance:
46
47.. _macOS File System Performance Guidelines:
48    http://developer.apple.com/library/ios/#documentation/Performance/Conceptual/FileSystem/Articles/TrackingChanges.html#//apple_ref/doc/uid/20001993-CJBJFIDD
49
50"""
51
52
53# The `select` module varies between platforms.
54# mypy may complain about missing module attributes depending on which platform it's running on.
55# The comment below disables mypy's attribute check.
56# mypy: disable-error-code="attr-defined, name-defined"
57
58from __future__ import annotations
59
60import contextlib
61import errno
62import os
63import os.path
64import select
65import threading
66from stat import S_ISDIR
67from typing import TYPE_CHECKING
68
69from watchdog.events import (
70    EVENT_TYPE_CREATED,
71    EVENT_TYPE_DELETED,
72    EVENT_TYPE_MOVED,
73    DirCreatedEvent,
74    DirDeletedEvent,
75    DirModifiedEvent,
76    DirMovedEvent,
77    FileCreatedEvent,
78    FileDeletedEvent,
79    FileModifiedEvent,
80    FileMovedEvent,
81    generate_sub_moved_events,
82)
83from watchdog.observers.api import DEFAULT_EMITTER_TIMEOUT, DEFAULT_OBSERVER_TIMEOUT, BaseObserver, EventEmitter
84from watchdog.utils import platform
85from watchdog.utils.dirsnapshot import DirectorySnapshot
86
87if TYPE_CHECKING:
88    from collections.abc import Generator
89    from typing import Callable
90
91    from watchdog.events import FileSystemEvent
92    from watchdog.observers.api import EventQueue, ObservedWatch
93
# Maximum number of kevents retrieved by a single ``kqueue.control`` call.
MAX_EVENTS = 4096

# O_EVTONLY (macOS only) opens a descriptor purely for event notification, so
# the watched file/volume is not marked as busy (see the module docstring).
# Python 3.10+ exposes it as ``os.O_EVTONLY`` on macOS; otherwise fall back to
# the value from the macOS header files.
O_EVTONLY = getattr(os, "O_EVTONLY", 0x8000)

# Pre-calculated values for the kevent filter, flags, and fflags attributes.
# macOS descriptors use O_EVTONLY; other kqueue platforms open read-only and
# non-blocking.
WATCHDOG_OS_OPEN_FLAGS = O_EVTONLY if platform.is_darwin() else os.O_RDONLY | os.O_NONBLOCK
# Watch vnodes (files and directories) rather than sockets/processes/etc.
WATCHDOG_KQ_FILTER = select.KQ_FILTER_VNODE
# Add and enable the kevent; EV_CLEAR resets its state once it is retrieved.
WATCHDOG_KQ_EV_FLAGS = select.KQ_EV_ADD | select.KQ_EV_ENABLE | select.KQ_EV_CLEAR
# Every vnode note the emitter reacts to (see the is_* flag tests below).
WATCHDOG_KQ_FFLAGS = (
    select.KQ_NOTE_DELETE
    | select.KQ_NOTE_WRITE
    | select.KQ_NOTE_EXTEND
    | select.KQ_NOTE_ATTRIB
    | select.KQ_NOTE_LINK
    | select.KQ_NOTE_RENAME
    | select.KQ_NOTE_REVOKE
)
113
114
def absolute_path(path: bytes | str) -> bytes | str:
    """Return *path* lexically normalized and made absolute.

    Accepts ``str`` or ``bytes``; the result has the same type as the input.
    Relative paths are resolved against the current working directory.
    """
    normalized = os.path.normpath(path)
    return os.path.abspath(normalized)
117
118
119# Flag tests.
120
121
def is_deleted(kev: select.kevent) -> bool:
    """Tell whether *kev* carries the ``NOTE_DELETE`` bit (watched path unlinked)."""
    return bool(kev.fflags & select.KQ_NOTE_DELETE)
125
126
def is_modified(kev: select.kevent) -> bool:
    """Tell whether *kev* reports a content change (``NOTE_WRITE`` or ``NOTE_EXTEND``)."""
    content_change_mask = select.KQ_NOTE_EXTEND | select.KQ_NOTE_WRITE
    return bool(kev.fflags & content_change_mask)
131
132
def is_attrib_modified(kev: select.kevent) -> bool:
    """Tell whether *kev* reports an attribute change (``NOTE_ATTRIB``)."""
    return bool(kev.fflags & select.KQ_NOTE_ATTRIB)
136
137
def is_renamed(kev: select.kevent) -> bool:
    """Tell whether *kev* reports that the watched path was renamed (``NOTE_RENAME``)."""
    return bool(kev.fflags & select.KQ_NOTE_RENAME)
141
142
class KeventDescriptorSet:
    """Thread-safe collection of :class:`KeventDescriptor` objects.

    Descriptors are indexed both by absolute path and by OS file descriptor,
    and their ``select.kevent`` objects are kept in a list suitable for use as
    the changelist of :meth:`select.kqueue.control`.

    Public methods acquire an internal lock; the underscore-prefixed helpers
    do not, and must only be called while that lock is held.
    """

    def __init__(self) -> None:
        self._descriptors: set[KeventDescriptor] = set()
        self._descriptor_for_path: dict[bytes | str, KeventDescriptor] = {}
        self._descriptor_for_fd: dict[int, KeventDescriptor] = {}
        self._kevents: list[select.kevent] = []
        self._lock = threading.Lock()

    @property
    def kevents(self) -> list[select.kevent]:
        """Snapshot list of the kevents currently monitored.

        A new list is returned on every access. Callers use the result after
        the lock has been released, so handing out the internal list would
        let a concurrent :meth:`add`/:meth:`remove`/:meth:`clear` mutate it
        while it is being used.
        """
        with self._lock:
            return list(self._kevents)

    @property
    def paths(self) -> list[bytes | str]:
        """Snapshot list of the paths for which kevents have been created."""
        with self._lock:
            return list(self._descriptor_for_path)

    def get_for_fd(self, fd: int) -> KeventDescriptor:
        """Given a file descriptor, returns the kevent descriptor object
        for it.

        :param fd:
            OS file descriptor.
        :type fd:
            ``int``
        :returns:
            A :class:`KeventDescriptor` object.
        :raises KeyError:
            If no descriptor is registered for ``fd``.
        """
        with self._lock:
            return self._descriptor_for_fd[fd]

    def get(self, path: bytes | str) -> KeventDescriptor:
        """Obtains a :class:`KeventDescriptor` object for the specified path.

        :param path:
            Path for which the descriptor will be obtained; it is normalized
            to an absolute path before the lookup.
        :raises KeyError:
            If no descriptor is registered for ``path``.
        """
        # absolute_path is a pure function, so it does not need the lock.
        path = absolute_path(path)
        with self._lock:
            return self._get(path)

    def __contains__(self, path: bytes | str) -> bool:
        """Determines whether a :class:`KeventDescriptor` has been registered
        for the specified path.

        :param path:
            Path to look up; it is normalized to an absolute path first.
        """
        path = absolute_path(path)
        with self._lock:
            return self._has_path(path)

    def add(self, path: bytes | str, *, is_directory: bool) -> None:
        """Adds a :class:`KeventDescriptor` to the collection for the given
        path, unless one is already registered for it.

        :param path:
            The path for which a :class:`KeventDescriptor` object will be
            added.
        :param is_directory:
            ``True`` if the path refers to a directory; ``False`` otherwise.
        :type is_directory:
            ``bool``
        :raises OSError:
            Propagated from opening the path when creating the descriptor.
        """
        path = absolute_path(path)
        with self._lock:
            if not self._has_path(path):
                self._add_descriptor(KeventDescriptor(path, is_directory=is_directory))

    def remove(self, path: bytes | str) -> None:
        """Removes the :class:`KeventDescriptor` object for the given path
        if it already exists, closing its file descriptor.

        :param path:
            Path for which the :class:`KeventDescriptor` object will be
            removed.
        """
        path = absolute_path(path)
        with self._lock:
            if self._has_path(path):
                self._remove_descriptor(self._get(path))

    def clear(self) -> None:
        """Clears the collection and closes all open descriptors."""
        with self._lock:
            for descriptor in self._descriptors:
                descriptor.close()
            self._descriptors.clear()
            self._descriptor_for_fd.clear()
            self._descriptor_for_path.clear()
            self._kevents = []

    # Thread-unsafe methods. Locking is provided at a higher level.
    def _get(self, path: bytes | str) -> KeventDescriptor:
        """Returns a kevent descriptor for a given (already absolute) path."""
        return self._descriptor_for_path[path]

    def _has_path(self, path: bytes | str) -> bool:
        """Determines whether a :class:`KeventDescriptor` for the specified
        (already absolute) path exists already in the collection.
        """
        return path in self._descriptor_for_path

    def _add_descriptor(self, descriptor: KeventDescriptor) -> None:
        """Adds a descriptor to the collection and all of its indexes.

        :param descriptor:
            An instance of :class:`KeventDescriptor` to be added.
        """
        self._descriptors.add(descriptor)
        self._kevents.append(descriptor.kevent)
        self._descriptor_for_path[descriptor.path] = descriptor
        self._descriptor_for_fd[descriptor.fd] = descriptor

    def _remove_descriptor(self, descriptor: KeventDescriptor) -> None:
        """Removes a descriptor from the collection and closes it.

        :param descriptor:
            An instance of :class:`KeventDescriptor` to be removed.
        """
        self._descriptors.remove(descriptor)
        del self._descriptor_for_fd[descriptor.fd]
        del self._descriptor_for_path[descriptor.path]
        self._kevents.remove(descriptor.kevent)
        descriptor.close()
273
274
class KeventDescriptor:
    """Everything watchdog keeps for a single kqueue-monitored path.

    Groups together:

        * kevent
        * directory status
        * path
        * file descriptor

    Two descriptors compare equal (and hash equally) when they share the same
    ``(path, is_directory)`` key; the file descriptor is not part of the key.

    :param path:
        Path string for which a kevent descriptor will be created.
    :param is_directory:
        ``True`` if the path refers to a directory; ``False`` otherwise.
    :type is_directory:
        ``bool``
    """

    def __init__(self, path: bytes | str, *, is_directory: bool) -> None:
        # The normalized absolute path is stored for lookups, while the path
        # is opened exactly as the caller supplied it.
        self._path = absolute_path(path)
        self._is_directory = is_directory
        opened_fd = os.open(path, WATCHDOG_OS_OPEN_FLAGS)
        self._fd = opened_fd
        self._kev = select.kevent(
            opened_fd,
            filter=WATCHDOG_KQ_FILTER,
            flags=WATCHDOG_KQ_EV_FLAGS,
            fflags=WATCHDOG_KQ_FFLAGS,
        )

    @property
    def fd(self) -> int:
        """The OS file descriptor opened for the path."""
        return self._fd

    @property
    def path(self) -> bytes | str:
        """Absolute path this descriptor watches."""
        return self._path

    @property
    def kevent(self) -> select.kevent:
        """The ``select.kevent`` registered for the file descriptor."""
        return self._kev

    @property
    def is_directory(self) -> bool:
        """Whether the watched path is a directory.

        :returns:
            ``True`` or ``False``
        """
        return self._is_directory

    def close(self) -> None:
        """Close the underlying file descriptor; OS errors are ignored."""
        try:
            os.close(self.fd)
        except OSError:
            pass

    @property
    def key(self) -> tuple[bytes | str, bool]:
        """Identity key used for equality and hashing: ``(path, is_directory)``."""
        return self.path, self.is_directory

    def __eq__(self, descriptor: object) -> bool:
        if isinstance(descriptor, KeventDescriptor):
            return self.key == descriptor.key
        return NotImplemented

    def __ne__(self, descriptor: object) -> bool:
        if isinstance(descriptor, KeventDescriptor):
            return self.key != descriptor.key
        return NotImplemented

    def __hash__(self) -> int:
        return hash(self.key)

    def __repr__(self) -> str:
        return f"<{type(self).__name__}: path={self.path!r}, is_directory={self.is_directory}>"
350
351
class KqueueEmitter(EventEmitter):
    """kqueue(2)-based event emitter.

    .. ADMONITION:: About ``kqueue(2)`` behavior and this implementation

              ``kqueue(2)`` monitors file system events only for
              open descriptors, which means this emitter does a lot of
              book-keeping behind the scenes to keep track of open
              descriptors for every entry in the monitored directory tree.

              This also means the maximum number of open file descriptors
              on your system must be increased **manually**.
              Usually, issuing a call to ``ulimit`` should suffice::

                  ulimit -n 1024

              Ensure that you pick a number that is larger than the
              number of files you expect to be monitored.

              ``kqueue(2)`` does not provide enough information about the
              following things:

              * The destination path of a file or directory that is renamed.
              * Creation of a file or directory within a directory; in this
                case, ``kqueue(2)`` only indicates a modified event on the
                parent directory.

              Therefore, this emitter takes a snapshot of the directory
              tree when ``kqueue(2)`` detects a change on the file system
              to be able to determine the above information.

    :param event_queue:
        The event queue to fill with events.
    :param watch:
        A watch object representing the directory to monitor.
    :type watch:
        :class:`watchdog.observers.api.ObservedWatch`
    :param timeout:
        Read events blocking timeout (in seconds).
    :type timeout:
        ``float``
    :param event_filter:
        Collection of event types to emit, or None for no filtering (default).
    :type event_filter:
        Iterable[:class:`watchdog.events.FileSystemEvent`] | None
    :param stat: stat function. See ``os.stat`` for details.
    """

    def __init__(
        self,
        event_queue: EventQueue,
        watch: ObservedWatch,
        *,
        timeout: float = DEFAULT_EMITTER_TIMEOUT,
        event_filter: list[type[FileSystemEvent]] | None = None,
        stat: Callable[[str], os.stat_result] = os.stat,
    ) -> None:
        super().__init__(event_queue, watch, timeout=timeout, event_filter=event_filter)

        self._kq = select.kqueue()
        self._lock = threading.RLock()

        # A collection of KeventDescriptor.
        self._descriptors = KeventDescriptorSet()

        # Wrap the caller's stat so that walking the tree for the initial
        # snapshot also registers a kevent descriptor for every path visited.
        def custom_stat(path: str, cls: KqueueEmitter = self) -> os.stat_result:
            stat_info = stat(path)
            cls._register_kevent(path, is_directory=S_ISDIR(stat_info.st_mode))
            return stat_info

        self._snapshot = DirectorySnapshot(watch.path, recursive=watch.is_recursive, stat=custom_stat)

    def _register_kevent(self, path: bytes | str, *, is_directory: bool) -> None:
        """Registers a kevent descriptor for the given path.

        ``ENOENT`` and ``EOPNOTSUPP`` from opening the path are ignored; any
        other ``OSError`` propagates.

        :param path:
            Path for which a kevent descriptor will be created.
        :param is_directory:
            ``True`` if the path refers to a directory; ``False`` otherwise.
        :type is_directory:
            ``bool``
        """
        try:
            self._descriptors.add(path, is_directory=is_directory)
        except OSError as e:
            if e.errno == errno.ENOENT:
                # Probably dealing with a temporary file that was created
                # and then quickly deleted before we could open
                # a descriptor for it; it is simply not registered.

                # TODO: We could simply ignore these files.
                # Locked files cause the python process to die with
                # a bus error when we handle temporary files.
                # eg. .git/index.lock when running tig operations.
                # I don't fully understand this at the moment.
                pass
            elif e.errno == errno.EOPNOTSUPP:
                # Probably dealing with the socket or special file
                # mounted through a file system that does not support
                # access to it (e.g. NFS). On BSD systems look at
                # EOPNOTSUPP in man 2 open.
                pass
            else:
                # All other errors are propagated.
                raise

    def _unregister_kevent(self, path: bytes | str) -> None:
        """Convenience function to close the kevent descriptor for a
        specified kqueue-monitored path (no-op if it is not registered).

        :param path:
            Path for which the kevent descriptor will be closed.
        """
        self._descriptors.remove(path)

    def queue_event(self, event: FileSystemEvent) -> None:
        """Queues a single event and keeps the descriptor set in sync with it.

        Created paths get a new descriptor, moved paths have theirs swapped
        from source to destination, and deleted paths have theirs closed.

        :param event:
            An instance of :class:`watchdog.events.FileSystemEvent`
            or a subclass.
        """
        # Moved events for the children of a renamed directory are generated
        # by _gen_renamed_events and pass through here individually; deleted
        # children get their own kevent, since each has its own descriptor.
        EventEmitter.queue_event(self, event)
        if event.event_type == EVENT_TYPE_CREATED:
            self._register_kevent(event.src_path, is_directory=event.is_directory)
        elif event.event_type == EVENT_TYPE_MOVED:
            self._unregister_kevent(event.src_path)
            self._register_kevent(event.dest_path, is_directory=event.is_directory)
        elif event.event_type == EVENT_TYPE_DELETED:
            self._unregister_kevent(event.src_path)

    def _gen_kqueue_events(
        self, kev: select.kevent, ref_snapshot: DirectorySnapshot, new_snapshot: DirectorySnapshot
    ) -> Generator[FileSystemEvent]:
        """Generate events for one kevent returned by
        :meth:`select.kqueue.control`.

        Descriptors are registered with ``EV_CLEAR``, so a single kevent may
        carry several ``NOTE_*`` bits at once (see kqueue(2)). They are
        resolved in this order: rename, deletion, attribute change, content
        change. Deletion is checked before the modification bits so that a
        path that was written to or ``chmod``-ed and then removed within one
        poll is still reported as deleted and its descriptor released.

        .. NOTE:: kqueue only tells us about deletions, renames (without the
                  destination), file modifications and attribute
                  modifications. The other events, namely, file creation,
                  directory modification, rename destinations, directory
                  creation, etc. are determined by comparing directory
                  snapshots.

        :param kev:
            The kevent to translate.
        :param ref_snapshot:
            Snapshot taken before the change.
        :param new_snapshot:
            Snapshot taken after the change.
        """
        if kev.flags & select.KQ_EV_ERROR:
            # A changelist entry kqueue could not apply comes back with
            # EV_ERROR set (errno in ``data``); its fflags only echo the
            # requested note mask, so it does not describe a real change.
            return

        try:
            descriptor = self._descriptors.get_for_fd(kev.ident)
        except KeyError:
            # The descriptor was unregistered (closing its fd) while handling
            # an earlier kevent of this same batch, e.g. a child of a
            # directory that was just moved. There is nothing left to report,
            # and letting KeyError escape would stop the emitter.
            return

        src_path = descriptor.path
        is_directory = descriptor.is_directory

        if is_renamed(kev):
            # Kqueue does not specify the destination name of a rename,
            # so we have to work it out by comparing snapshots.
            yield from self._gen_renamed_events(
                src_path,
                ref_snapshot,
                new_snapshot,
                is_directory=is_directory,
            )
        elif is_deleted(kev):
            if is_directory:
                yield DirDeletedEvent(src_path)
            else:
                yield FileDeletedEvent(src_path)
        elif is_attrib_modified(kev):
            if is_directory:
                yield DirModifiedEvent(src_path)
            else:
                yield FileModifiedEvent(src_path)
        elif is_modified(kev):
            if not is_directory:
                yield FileModifiedEvent(src_path)
            elif self.watch.is_recursive or absolute_path(self.watch.path) == src_path:
                # When a directory is modified, it may be due to
                # sub-file/directory renames or new file/directory
                # creation, which the snapshot diff reports separately.
                # descriptor paths are absolute, so normalize the watch path
                # before comparing.
                yield DirModifiedEvent(src_path)

    def _parent_dir_modified(self, src_path: bytes | str) -> DirModifiedEvent:
        """Helper to generate a DirModifiedEvent on the parent of src_path."""
        return DirModifiedEvent(os.path.dirname(src_path))

    def _gen_renamed_events(
        self,
        src_path: bytes | str,
        ref_snapshot: DirectorySnapshot,
        new_snapshot: DirectorySnapshot,
        *,
        is_directory: bool,
    ) -> Generator[FileSystemEvent]:
        """Compares information from two directory snapshots (one taken before
        the rename operation and another taken right after) to determine the
        destination path of the file system object renamed, and yields
        the appropriate events to be queued.

        - Source unknown to ``ref_snapshot``: a created/deleted pair.
        - Inode found in ``new_snapshot``: a moved event, modified events for
          both parent directories and, for recursive watches of a directory,
          moved events for its children.
        - Inode gone: a deleted event plus a modified event on the parent.
        """
        try:
            f_inode = ref_snapshot.inode(src_path)
        except KeyError:
            # Probably caught a temporary file/directory that was renamed
            # and deleted. Fire a created/deleted pair for the path and stop,
            # treating the event as creation + deletion rather than a move.
            if is_directory:
                yield DirCreatedEvent(src_path)
                yield DirDeletedEvent(src_path)
            else:
                yield FileCreatedEvent(src_path)
                yield FileDeletedEvent(src_path)
            return

        dest_path = new_snapshot.path(f_inode)
        if dest_path is not None:
            dest_path = absolute_path(dest_path)
            if is_directory:
                yield DirMovedEvent(src_path, dest_path)
            else:
                yield FileMovedEvent(src_path, dest_path)
            yield self._parent_dir_modified(src_path)
            yield self._parent_dir_modified(dest_path)
            if is_directory and self.watch.is_recursive:
                # kqueue does not report moves for the items inside a
                # renamed directory tree, so generate them ourselves.
                yield from generate_sub_moved_events(src_path, dest_path)
        else:
            # The new snapshot has no path for the old inode, so the new
            # name was not found. Report the path as deleted; queue_event
            # then unregisters it.
            if is_directory:
                yield DirDeletedEvent(src_path)
            else:
                yield FileDeletedEvent(src_path)
            yield self._parent_dir_modified(src_path)

    def _read_events(self, timeout: float) -> list[select.kevent]:
        """Reads events from a call to the blocking
        :meth:`select.kqueue.control()` method, passing every registered
        kevent as the changelist.

        :param timeout:
            Blocking timeout for reading events.
        :type timeout:
            ``float`` (seconds)
        """
        return self._kq.control(self._descriptors.kevents, MAX_EVENTS, timeout)

    def queue_events(self, timeout: float) -> None:
        """Queues events by reading them from a call to the blocking
        :meth:`select.kqueue.control()` method.

        Creations and file modifications come from a snapshot diff; the
        remaining events come from the kevents themselves. ``EBADF`` (e.g.
        the kqueue was closed by :meth:`on_thread_stop`) is swallowed; other
        ``OSError`` s propagate.

        :param timeout:
            Blocking timeout for reading events.
        :type timeout:
            ``float`` (seconds)
        """
        with self._lock:
            try:
                event_list = self._read_events(timeout)
                # TODO: investigate why order appears to be reversed
                event_list.reverse()

                # Take a fresh snapshot of the directory and update the
                # saved snapshot.
                new_snapshot = DirectorySnapshot(self.watch.path, recursive=self.watch.is_recursive)
                ref_snapshot = self._snapshot
                self._snapshot = new_snapshot
                diff_events = new_snapshot - ref_snapshot

                # Process events
                for directory_created in diff_events.dirs_created:
                    self.queue_event(DirCreatedEvent(directory_created))
                for file_created in diff_events.files_created:
                    self.queue_event(FileCreatedEvent(file_created))
                for file_modified in diff_events.files_modified:
                    self.queue_event(FileModifiedEvent(file_modified))

                for kev in event_list:
                    for event in self._gen_kqueue_events(kev, ref_snapshot, new_snapshot):
                        self.queue_event(event)

            except OSError as e:
                if e.errno != errno.EBADF:
                    raise

    def on_thread_stop(self) -> None:
        """Clean up: close every registered descriptor and the kqueue itself."""
        with self._lock:
            self._descriptors.clear()
            self._kq.close()
647
648
class KqueueObserver(BaseObserver):
    """Observer thread backed by :class:`KqueueEmitter`.

    It schedules watches on directories and dispatches the events produced
    by the kqueue-based emitters to the registered event handlers.

    :param timeout:
        Timeout forwarded to :class:`BaseObserver`; defaults to
        ``DEFAULT_OBSERVER_TIMEOUT``.
    """

    def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
        # Every watch scheduled on this observer gets a KqueueEmitter.
        super().__init__(KqueueEmitter, timeout=timeout)
656