1# Licensed under the Apache License, Version 2.0 (the "License");
2# you may not use this file except in compliance with the License.
3# You may obtain a copy of the License at
4#
5#      http://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS,
9# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10# See the License for the specific language governing permissions and
11# limitations under the License.
12
13"""A fake implementation for the `scandir` function working with
14FakeFilesystem.
15Works with both the function integrated into the `os` module since Python 3.5
16and the standalone function available in the standalone `scandir` python
17package.
18"""
19import os
20import sys
21
22from pyfakefs.extra_packages import use_scandir_package
23from pyfakefs.helpers import to_string, make_string_path
24
# os.PathLike only exists from Python 3.6 on; older interpreters fall back to
# a plain `object` base so that DirEntry can still be defined.
BaseClass = os.PathLike if sys.version_info >= (3, 6) else object
29
30
class DirEntry(BaseClass):
    """Fake stand-in for `os.DirEntry`, backed by a fake filesystem.

    A new instance carries only placeholder values; the code that creates it
    is expected to assign `name`, `path`, `_abspath`, `_isdir` and `_islink`
    afterwards. Unlike the real class, `follow_symlinks` is not enforced to
    be a keyword-only argument.
    """

    def __init__(self, filesystem):
        """Create an entry whose fields are still unset.

        Args:
            filesystem: the fake filesystem used to look up the entry.
        """
        self._filesystem = filesystem
        self.name = self.path = self._abspath = ""
        self._inode = None
        self._islink = self._isdir = False
        # Lazily filled stat caches. Despite the names, `_statresult` holds
        # the result for the entry itself (links not followed), while
        # `_statresult_symlink` holds the result with links followed.
        self._statresult = self._statresult_symlink = None

    def inode(self):
        """Return the inode number of the entry.

        The number is recorded as a side effect of the first
        `stat(follow_symlinks=False)` call and reused afterwards.
        """
        if self._inode is not None:
            return self._inode
        self.stat(follow_symlinks=False)
        return self._inode

    def is_dir(self, follow_symlinks=True):
        """Tell whether the entry is a directory.

        Args:
            follow_symlinks: if True, a symlink flagged as a directory
                also counts.

        Returns:
            A truthy value if the entry is flagged as a directory and, when
            `follow_symlinks` is false, is not a symlink.
        """
        if not self._isdir:
            return self._isdir
        return follow_symlinks or not self._islink

    def is_file(self, follow_symlinks=True):
        """Tell whether the entry is a non-directory entry.

        Args:
            follow_symlinks: if True, a symlink not flagged as a directory
                also counts.

        Returns:
            A truthy value if the entry is not flagged as a directory and,
            when `follow_symlinks` is false, is not a symlink.
        """
        if self._isdir:
            return False
        return follow_symlinks or not self._islink

    def is_symlink(self):
        """Return the stored symlink flag of the entry."""
        return self._islink

    def stat(self, follow_symlinks=True):
        """Return a (cached) copy of the stat result for this entry.

        Args:
            follow_symlinks: if False, the entry is looked up without
                following a final symlink (`lresolve`); otherwise the
                object the path resolves to is used (`resolve`).

        Returns:
            A copy of the file object's `stat_result`. On a Windows fake
            filesystem its `st_nlink` is set to 0. The same object is
            returned on repeated calls with the same `follow_symlinks`.
        """
        if not follow_symlinks:
            if self._statresult is None:
                link_object = self._filesystem.lresolve(self._abspath)
                # The inode is remembered here so that inode() can reuse it.
                self._inode = link_object.st_ino
                self._statresult = link_object.stat_result.copy()
                if self._filesystem.is_windows_fs:
                    self._statresult.st_nlink = 0
            return self._statresult

        if self._statresult_symlink is None:
            target_object = self._filesystem.resolve(self._abspath)
            self._statresult_symlink = target_object.stat_result.copy()
            if self._filesystem.is_windows_fs:
                self._statresult_symlink.st_nlink = 0
        return self._statresult_symlink

    if sys.version_info >= (3, 6):

        def __fspath__(self):
            """Return `path`, making the entry usable as a path-like."""
            return self.path

    if sys.version_info >= (3, 12):

        def is_junction(self) -> bool:
            """Return True if this entry is a junction.

            Always False unless the fake filesystem emulates Windows;
            junctions are not part of POSIX semantics.
            """
            if self._filesystem.is_windows_fs:
                return self._filesystem.resolve(self._abspath).is_junction
            return False
126
127
class ScanDirIter:
    """Iterator over the DirEntry objects produced by `scandir()`."""

    def __init__(self, filesystem, path):
        """Look up the directory to iterate and prepare the entry iterator.

        Args:
            filesystem: the fake filesystem used for implementation.
            path: the directory to scan, either as a path or as an integer
                file descriptor of an open fake file object.

        Raises:
            NotImplementedError: if `path` is a file descriptor, the
                `scandir` package is not in use, and either Python is older
                than 3.7 or the fake filesystem emulates Windows.
        """
        self.filesystem = filesystem
        if not isinstance(path, int):
            path = make_string_path(path)
            self.abspath = self.filesystem.absnormpath(path)
            self.path = to_string(path)
        else:
            if not use_scandir_package:
                if sys.version_info < (3, 7) or self.filesystem.is_windows_fs:
                    message = "scandir does not support file descriptor path argument"
                    raise NotImplementedError(message)
            open_file = self.filesystem.get_open_file(path)
            self.abspath = self.filesystem.absnormpath(open_file.get_object().path)
            # With a descriptor there is no user-supplied path to prefix the
            # entry paths with.
            self.path = ""
        directory = self.filesystem.confirmdir(self.abspath)
        self.entry_iter = iter(directory.entries)

    def __iter__(self):
        return self

    def __next__(self):
        """Build the DirEntry for the next name in the directory."""
        name = next(self.entry_iter)
        fs = self.filesystem
        result = DirEntry(fs)
        result.name = name
        # `path` keeps the form the caller passed in, `_abspath` is the
        # normalized absolute path used for lookups.
        result.path = fs.joinpaths(self.path, name)
        full_path = fs.joinpaths(self.abspath, name)
        result._abspath = full_path
        result._isdir = fs.isdir(full_path)
        result._islink = fs.islink(full_path)
        return result

    if sys.version_info >= (3, 6):

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.close()

        def close(self):
            """Do nothing; present so the iterator can be closed."""
175
176
def scandir(filesystem, path=""):
    """Create the iterator over the entries of a fake directory.

    Args:
        filesystem: The fake filesystem used for implementation.
        path: Path to the target directory within the fake filesystem.

    Returns:
        A ScanDirIter yielding one DirEntry per entry of the directory,
        in no particular sorted order.

    Raises:
        OSError: if the target is not a directory (presumably raised by the
            filesystem while the iterator is being constructed).
    """
    directory_iterator = ScanDirIter(filesystem, path)
    return directory_iterator
193
194
def _classify_directory_contents(filesystem, root):
    """Split the entries of a directory into directories and everything else.

    Args:
        filesystem: The fake filesystem used for implementation.
        root: (str) Directory to examine.

    Returns:
        (tuple) `(root, dirs, files)`: the directory examined, the list of
        entry names that are directories, and the list of all remaining
        entry names, each in listing order. This is the shape of the tuples
        produced by the `os.walk` generator.

    Raises:
        Nothing on its own; exceptions from the filesystem's `listdir`
        propagate to the caller.
    """
    subdirs, others = [], []
    for name in filesystem.listdir(root):
        is_directory = filesystem.isdir(filesystem.joinpaths(root, name))
        bucket = subdirs if is_directory else others
        bucket.append(name)
    return root, subdirs, others
220
221
def walk(filesystem, top, topdown=True, onerror=None, followlinks=False):
    """Perform an os.walk operation over the fake filesystem.

    Args:
        filesystem: The fake filesystem used for implementation
        top: The root directory from which to begin walk. It is passed
            through `make_string_path` and `to_string` first, the same
            normalization the scandir iterator applies to its path, so
            that path-like objects are handled as well.
        topdown: Determines whether to return the tuples with the root as
            the first entry (`True`) or as the last, after all the child
            directory tuples (`False`).
        onerror: If not `None`, function which will be called with the
            `OSError` instance raised when listing a directory fails.
            The failing directory is skipped afterwards.
        followlinks: If `True`, symbolic links are followed.

    Yields:
        (path, directories, nondirectories) for top and each of its
        subdirectories.  See the documentation for the builtin os module
        for further details. In top-down mode the `directories` list is
        read only after the tuple has been yielded, so in-place changes
        made by the caller affect which subdirectories are visited.
    """

    def do_walk(top_dir, top_most=False):
        # The starting directory is walked even if it is a symlink; links
        # further down are only entered when followlinks is set.
        if not top_most and not followlinks and filesystem.islink(top_dir):
            return
        try:
            top_contents = _classify_directory_contents(filesystem, top_dir)
        except OSError as exc:
            if onerror is not None:
                onerror(exc)
            return

        if topdown:
            yield top_contents

        for directory in top_contents[1]:
            path = filesystem.joinpaths(top_dir, directory)
            if not followlinks and filesystem.islink(path):
                continue
            yield from do_walk(path)

        if not topdown:
            yield top_contents

    return do_walk(to_string(make_string_path(top)), top_most=True)
265
266
class FakeScanDirModule:
    """Fake replacement for the `scandir` module, backed by a FakeFilesystem.

    .. Note:: The ``scandir`` function is a part of the standard ``os`` module
      since Python 3.5. This class handles the separate ``scandir`` module
      that is available on pypi.

    An instance is bound to one fake filesystem:
    `filesystem = fake_filesystem.FakeFilesystem()`
    `fake_scandir_module = fake_filesystem.FakeScanDirModule(filesystem)`
    """

    @staticmethod
    def dir():
        """Return the names of the functions this fake module patches.

        Used for patching functions imported from the module.
        """
        return ("scandir", "walk")

    def __init__(self, filesystem):
        """Bind the fake module to `filesystem`.

        Args:
            filesystem: the fake filesystem all calls are delegated to.
        """
        self.filesystem = filesystem

    def scandir(self, path="."):
        """Return an iterator of DirEntry objects for the directory `path`.

        Delegates to the module-level `scandir` function with the bound
        filesystem.

        Args:
            path: Path to the target directory within the fake filesystem.

        Returns:
            an iterator to an unsorted list of os.DirEntry objects for
            each entry in path.

        Raises:
            OSError: if the target is not a directory.
        """
        return scandir(self.filesystem, path)

    def walk(self, top, topdown=True, onerror=None, followlinks=False):
        """Perform a walk operation over the fake filesystem.

        Delegates to the module-level `walk` function with the bound
        filesystem.

        Args:
            top: The root directory from which to begin walk.
            topdown: Determines whether to return the tuples with the root as
                the first entry (`True`) or as the last, after all the child
                directory tuples (`False`).
            onerror: If not `None`, function which will be called to handle
                the `os.error` instance provided when `os.listdir()` fails.
            followlinks: If `True`, symbolic links are followed.

        Yields:
            (path, directories, nondirectories) for top and each of its
            subdirectories.  See the documentation for the builtin os module
            for further details.
        """
        return walk(
            self.filesystem,
            top,
            topdown=topdown,
            onerror=onerror,
            followlinks=followlinks,
        )
323