from __future__ import annotations import contextlib import threading from typing import TYPE_CHECKING from unittest.mock import patch import pytest from watchdog.events import FileModifiedEvent, FileSystemEventHandler from watchdog.observers.api import BaseObserver, EventEmitter if TYPE_CHECKING: from collections.abc import Iterator @pytest.fixture def observer() -> Iterator[BaseObserver]: obs = BaseObserver(EventEmitter) yield obs obs.stop() with contextlib.suppress(RuntimeError): obs.join() @pytest.fixture def observer2(): obs = BaseObserver(EventEmitter) yield obs obs.stop() with contextlib.suppress(RuntimeError): obs.join() def test_schedule_should_start_emitter_if_running(observer): observer.start() observer.schedule(None, "") (emitter,) = observer.emitters assert emitter.is_alive() def test_schedule_should_not_start_emitter_if_not_running(observer): observer.schedule(None, "") (emitter,) = observer.emitters assert not emitter.is_alive() def test_start_should_start_emitter(observer): observer.schedule(None, "") observer.start() (emitter,) = observer.emitters assert emitter.is_alive() def test_stop_should_stop_emitter(observer): observer.schedule(None, "") observer.start() (emitter,) = observer.emitters assert emitter.is_alive() observer.stop() observer.join() assert not observer.is_alive() assert not emitter.is_alive() def test_unschedule_self(observer): """ Tests that unscheduling a watch from within an event handler correctly correctly unregisters emitter and handler without deadlocking. """ class EventHandler(FileSystemEventHandler): def on_modified(self, event): observer.unschedule(watch) unschedule_finished.set() unschedule_finished = threading.Event() watch = observer.schedule(EventHandler(), "") observer.start() (emitter,) = observer.emitters emitter.queue_event(FileModifiedEvent("")) assert unschedule_finished.wait() assert len(observer.emitters) == 0 def test_schedule_after_unschedule_all(observer): observer.start() observer.schedule(None, "") assert len(observer.emitters) == 1 observer.unschedule_all() assert len(observer.emitters) == 0 observer.schedule(None, "") assert len(observer.emitters) == 1 def test_2_observers_on_the_same_path(observer, observer2): assert observer is not observer2 observer.schedule(None, "") assert len(observer.emitters) == 1 observer2.schedule(None, "") assert len(observer2.emitters) == 1 def test_start_failure_should_not_prevent_further_try(observer): observer.schedule(None, "") emitters = observer.emitters assert len(emitters) == 1 # Make the emitter to fail on start() def mocked_start(): raise OSError("Mock'ed!") emitter = next(iter(emitters)) with patch.object(emitter, "start", new=mocked_start), pytest.raises(OSError, match="Mock'ed!"): observer.start() # The emitter should be removed from the list assert len(observer.emitters) == 0 # Restoring the original behavior should work like there never be emitters observer.start() assert len(observer.emitters) == 0 # Re-scheduling the watch should work observer.schedule(None, "") assert len(observer.emitters) == 1 def test_schedule_failure_should_not_prevent_future_schedules(observer): observer.start() # Make the emitter fail on start(), and subsequently the observer to fail on schedule() def bad_start(_): raise OSError("Mock'ed!") with patch.object(EventEmitter, "start", new=bad_start), pytest.raises(OSError, match="Mock'ed!"): observer.schedule(None, "") # The emitter should not be in the list assert not observer.emitters # Re-scheduling the watch should work observer.schedule(None, "") assert len(observer.emitters) == 1