1from __future__ import annotations
2
3import errno
4import os
5import pickle
6import time
7from unittest.mock import patch
8
9from watchdog.utils import platform
10from watchdog.utils.dirsnapshot import DirectorySnapshot, DirectorySnapshotDiff, EmptyDirectorySnapshot
11
12from .shell import mkdir, mv, rm, touch
13
14
def wait():
    """
    Sleep long enough for a file/folder mtime to change.

    Tests call this between taking a reference snapshot and touching the
    tree, so that the modification can be detected.
    """
    # macOS and Windows get the longer pause; the note carried with this
    # helper is that stat.mtime resolution on macOS is only 1 second.
    coarse_mtime = platform.is_darwin() or platform.is_windows()
    time.sleep(1.5 if coarse_mtime else 0.5)
25
26
def test_pickle(p):
    """A DirectorySnapshot must be serialisable with pickle."""
    target_dir = p("dir1")
    mkdir(target_dir)
    pickle.dumps(DirectorySnapshot(target_dir))
32
33
def test_move_to(p):
    """A file moved into the snapshotted dir from outside is seen as created."""
    source = p("dir1", "a")
    target = p("dir2", "b")
    mkdir(p("dir1"))
    mkdir(p("dir2"))
    touch(source)

    # Only dir2 is snapshotted, so the origin of the file is invisible.
    before = DirectorySnapshot(p("dir2"))
    mv(source, target)
    after = DirectorySnapshot(p("dir2"))

    assert DirectorySnapshotDiff(before, after).files_created == [target]
42
43
def test_move_to_with_context_manager(p):
    """Move a file between two dirs, each observed by its own ContextManager.

    The source dir must report a deletion and the destination dir a creation.
    """
    source = p("dir1", "a")
    target = p("dir2", "b")
    mkdir(p("dir1"))
    touch(source)
    mkdir(p("dir2"))

    watch_source = DirectorySnapshotDiff.ContextManager(p("dir1"))
    watch_target = DirectorySnapshotDiff.ContextManager(p("dir2"))
    with watch_source:
        with watch_target:
            mv(source, target)

    assert watch_source.diff.files_deleted == [source]
    assert watch_target.diff.files_created == [target]
56
57
def test_move_from(p):
    """A file moved out of the snapshotted dir is seen as deleted."""
    source = p("dir1", "a")
    target = p("dir2", "b")
    mkdir(p("dir1"))
    mkdir(p("dir2"))
    touch(source)

    # Only dir1 is snapshotted, so the destination of the file is invisible.
    before = DirectorySnapshot(p("dir1"))
    mv(source, target)
    after = DirectorySnapshot(p("dir1"))

    assert DirectorySnapshotDiff(before, after).files_deleted == [source]
66
67
def test_move_internal(p):
    """A move inside the snapshotted tree is reported as a move and nothing else."""
    source = p("dir1", "a")
    target = p("dir2", "b")
    mkdir(p("dir1"))
    mkdir(p("dir2"))
    touch(source)

    # Both dirs live under the snapshotted root, so both ends are visible.
    before = DirectorySnapshot(p(""))
    mv(source, target)
    after = DirectorySnapshot(p(""))
    changes = DirectorySnapshotDiff(before, after)

    assert changes.files_moved == [(source, target)]
    assert changes.files_created == []
    assert changes.files_deleted == []
78
79
def test_move_replace(p):
    """Moving a file onto an existing one reports the move plus a deletion.

    The overwritten destination file is expected in ``files_deleted``; nothing
    is expected in ``files_created``.
    """
    source = p("dir1", "a")
    target = p("dir2", "b")
    mkdir(p("dir1"))
    mkdir(p("dir2"))
    touch(source)
    touch(target)

    before = DirectorySnapshot(p(""))
    mv(source, target)
    after = DirectorySnapshot(p(""))
    changes = DirectorySnapshotDiff(before, after)

    assert changes.files_moved == [(source, target)]
    assert changes.files_deleted == [target]
    assert changes.files_created == []
91
92
def test_dir_modify_on_create(p):
    """Creating a file marks its parent directory as modified."""
    root = p("")
    before = DirectorySnapshot(root)

    # Let the mtime clock advance before changing the directory.
    wait()
    touch(p("a"))

    after = DirectorySnapshot(root)
    assert DirectorySnapshotDiff(before, after).dirs_modified == [root]
99
100
def test_dir_modify_on_move(p):
    """Moving a file marks both the source and destination dirs as modified."""
    source_dir = p("dir1")
    target_dir = p("dir2")
    mkdir(source_dir)
    mkdir(target_dir)
    touch(p("dir1", "a"))

    before = DirectorySnapshot(p(""))
    # Let the mtime clock advance before changing the directories.
    wait()
    mv(p("dir1", "a"), p("dir2", "b"))
    after = DirectorySnapshot(p(""))

    # Order of the reported directories is not asserted, hence the set.
    modified = set(DirectorySnapshotDiff(before, after).dirs_modified)
    assert modified == {source_dir, target_dir}
110
111
def test_detect_modify_for_moved_files(p):
    """A file that is modified and then moved is reported as both.

    The modification is expected under the file's old path.
    """
    old_path = p("a")
    new_path = p("b")
    touch(old_path)

    before = DirectorySnapshot(p(""))
    # Let the mtime clock advance so the second touch is observable.
    wait()
    touch(old_path)
    mv(old_path, new_path)
    after = DirectorySnapshot(p(""))
    changes = DirectorySnapshotDiff(before, after)

    assert changes.files_moved == [(old_path, new_path)]
    assert changes.files_modified == [old_path]
121
122
def test_replace_dir_with_file(p):
    """Swapping a dir for a same-named file mid-scan must not raise.

    The custom ``listdir`` hook performs the swap just before the real
    ``os.scandir`` call on that directory.
    """
    victim = p("root", "dir")

    def swapping_scandir(path):
        # Turn the directory into a regular file right before it is listed.
        if path == victim:
            rm(path, recursive=True)
            touch(path)
        return os.scandir(path)

    mkdir(p("root"))
    mkdir(victim)

    # Should NOT raise an OSError (ENOTDIR)
    DirectorySnapshot(p("root"), listdir=swapping_scandir)
138
139
def test_permission_error(p):
    """Unreadable folders must not make DirectorySnapshot raise."""
    mkdir(p("a", "b", "c"), parents=True)

    before = DirectorySnapshot(p(""))
    real_walk = DirectorySnapshot.walk
    blocked_prefix = p("a", "b")

    def denying_walk(self, root):
        """Raise EACCES for folder "a/b" and below; otherwise walk normally."""
        if root.startswith(blocked_prefix):
            raise OSError(errno.EACCES, os.strerror(errno.EACCES))

        # Anything outside the blocked subtree behaves like the real method.
        yield from real_walk(self, root)

    with patch.object(DirectorySnapshot, "walk", new=denying_walk):
        # Should NOT raise an OSError (EACCES)
        after = DirectorySnapshot(p(""))

    changes = DirectorySnapshotDiff(before, after)
    assert repr(changes)

    # Children of a/b/ are no longer accessible, so they are missing from the
    # new snapshot and show up as deleted.
    assert changes.dirs_deleted == [(p("a", "b", "c"))]
165
166
def test_ignore_device(p):
    """Check that ``ignore_device=True`` makes the diff disregard device ids.

    ``DirectorySnapshot.inode`` is patched so that the second element of its
    result (the device id, per the comments below) grows on every call. A
    default diff then reports the untouched file as deleted and re-created,
    while a diff built with ``ignore_device=True`` reports no change.
    """
    # Create a file and take a snapshot.
    touch(p("file"))
    ref = DirectorySnapshot(p(""))
    wait()

    # Keep the unpatched method so the replacement can delegate to it.
    inode_orig = DirectorySnapshot.inode

    # Number of calls made to the patched ``inode`` so far.
    inode_times = 0

    def inode(self, path):
        # This function will always return a different device_id,
        # even for the same file: the second element of the real result is
        # offset by the running call count.
        nonlocal inode_times
        result = inode_orig(self, path)
        inode_times += 1
        return result[0], result[1] + inode_times

    # Set the custom inode function.
    with patch.object(DirectorySnapshot, "inode", new=inode):
        # If we make the diff of the same directory, since by default the
        # DirectorySnapshotDiff compares the snapshots using the device_id (and it will
        # be different), it thinks that the same file has been deleted and created again.
        snapshot = DirectorySnapshot(p(""))
        diff_with_device = DirectorySnapshotDiff(ref, snapshot)
        assert diff_with_device.files_deleted == [(p("file"))]
        assert diff_with_device.files_created == [(p("file"))]

        # Otherwise, if we choose to ignore the device, the file will not be detected as
        # deleted and re-created.
        snapshot = DirectorySnapshot(p(""))
        diff_without_device = DirectorySnapshotDiff(ref, snapshot, ignore_device=True)
        assert diff_without_device.files_deleted == []
        assert diff_without_device.files_created == []
201
202
def test_empty_snapshot(p):
    """Diffing from an EmptyDirectorySnapshot reports everything as created.

    The file and directories exist BEFORE the real snapshot is taken, yet
    they must all appear as newly created when compared against the empty
    snapshot.
    """
    touch(p("a"))
    mkdir(p("b", "c"), parents=True)

    populated = DirectorySnapshot(p(""))
    changes = DirectorySnapshotDiff(EmptyDirectorySnapshot(), populated)

    assert changes.files_created == [p("a")]

    # Directory order is not asserted, so compare sorted lists.
    expected_dirs = [p(""), p("b"), p("b", "c")]
    assert sorted(changes.dirs_created) == sorted(expected_dirs)
217