1# Copyright 2009 Google Inc. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Fake implementations for different file objects.
16"""
17import errno
18import io
19import locale
20import os
21import sys
22from stat import (
23    S_IFREG,
24    S_IFDIR,
25)
26from types import TracebackType
27from typing import (
28    List,
29    Optional,
30    Callable,
31    Union,
32    Any,
33    Dict,
34    cast,
35    AnyStr,
36    NoReturn,
37    Iterator,
38    TextIO,
39    Type,
40    TYPE_CHECKING,
41)
42
43from pyfakefs import helpers
44from pyfakefs.helpers import (
45    FakeStatResult,
46    BinaryBufferIO,
47    TextBufferIO,
48    is_int_type,
49    is_unicode_string,
50    to_string,
51    matching_string,
52    real_encoding,
53    AnyPath,
54    AnyString,
55)
56
57if TYPE_CHECKING:
58    from pyfakefs.fake_filesystem import FakeFilesystem
59
# Union of the wrapper types that can represent an open file object.
# The members are forward references given as strings; only
# `FakeFileWrapper` is defined in the part of the module visible here
# (the other three are presumably defined further down - verify).
AnyFileWrapper = Union[
    "FakeFileWrapper",
    "FakeDirWrapper",
    "StandardStreamWrapper",
    "FakePipeWrapper",
]
# Union of the fake file system entry types: a regular fake file
# or a fake directory.
AnyFile = Union["FakeFile", "FakeDirectory"]
67
68
class FakeLargeFileIoException(Exception):
    """Raised if reading or writing of a fake large file is attempted.

    A fake large file only carries a size - it has no real contents
    that could be read or changed.
    """

    def __init__(self, file_path: str) -> None:
        # build the message first, then hand it over to the base class
        message = (
            "Read and write operations not supported for "
            "fake large file: %s" % file_path
        )
        super().__init__(message)
79
80
class FakeFile:
    """Provides the appearance of a real file.

    Attributes currently faked out:
      * `st_mode`: user-specified, otherwise S_IFREG
      * `st_ctime`: the time.time() timestamp of the file change time (updated
        each time a file's attributes is modified).
      * `st_atime`: the time.time() timestamp when the file was last accessed.
      * `st_mtime`: the time.time() timestamp when the file was last modified.
      * `st_size`: the size of the file
      * `st_nlink`: the number of hard links to the file
      * `st_ino`: the inode number - a unique number identifying the file
      * `st_dev`: a unique number identifying the (fake) file system device
        the file belongs to
      * `st_uid`: always set to USER_ID, which can be changed globally using
            `set_uid`
      * `st_gid`: always set to GROUP_ID, which can be changed globally using
            `set_gid`

    .. note:: The resolution for `st_ctime`, `st_mtime` and `st_atime` in the
        real file system depends on the used file system (for example it is
        only 1s for HFS+ and older Linux file systems, but much higher for
        ext4 and NTFS). This is currently ignored by pyfakefs, which uses
        the resolution of `time.time()`.

        Under Windows, `st_atime` is not updated for performance reasons by
        default. pyfakefs never updates `st_atime` under Windows, assuming
        the default setting.
    """

    # attribute names that are stored in `stat_result` instead of the
    # instance itself - see `__getattr__` and `__setattr__`
    stat_types = (
        "st_mode",
        "st_ino",
        "st_dev",
        "st_nlink",
        "st_uid",
        "st_gid",
        "st_size",
        "st_atime",
        "st_mtime",
        "st_ctime",
        "st_atime_ns",
        "st_mtime_ns",
        "st_ctime_ns",
    )

    def __init__(
        self,
        name: AnyStr,
        st_mode: int = S_IFREG | helpers.PERM_DEF_FILE,
        contents: Optional[AnyStr] = None,
        filesystem: Optional["FakeFilesystem"] = None,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        side_effect: Optional[Callable[["FakeFile"], None]] = None,
    ):
        """
        Args:
            name: Name of the file/directory, without parent path information
            st_mode: The stat.S_IF* constant representing the file type (i.e.
                stat.S_IFREG, stat.S_IFDIR), and the file permissions.
                If no file type is set (e.g. permission flags only), a
                regular file type is assumed.
            contents: The contents of the filesystem object; should be a string
                or byte object for regular files, and a dict of other
                FakeFile or FakeDirectory objects with the file names as
                keys for FakeDirectory objects
            filesystem: The fake filesystem where the file is created.
            encoding: If contents is a unicode string, the encoding used
                for serialization.
            errors: The error mode used for encoding/decoding errors.
            side_effect: function handle that is executed when file is written,
                must accept the file object as an argument.

        Raises:
            ValueError: if `filesystem` is `None`.
        """
        # to be backwards compatible regarding argument order, we raise on None
        if filesystem is None:
            raise ValueError("filesystem shall not be None")
        self.filesystem: "FakeFilesystem" = filesystem
        self._side_effect: Optional[Callable] = side_effect
        self.name: AnyStr = name  # type: ignore[assignment]
        self.stat_result = FakeStatResult(
            filesystem.is_windows_fs,
            helpers.get_uid(),
            helpers.get_gid(),
            helpers.now(),
        )
        # no file type bits set (only permission bits): assume a regular file
        if st_mode >> 12 == 0:
            st_mode |= S_IFREG
        self.stat_result.st_mode = st_mode
        self.st_size: int = 0
        self.encoding: Optional[str] = real_encoding(encoding)
        self.errors: str = errors or "strict"
        self._byte_contents: Optional[bytes] = self._encode_contents(contents)
        self.stat_result.st_size = (
            len(self._byte_contents) if self._byte_contents is not None else 0
        )
        # incremented each time the contents or the size are changed
        self.epoch: int = 0
        self.parent_dir: Optional[FakeDirectory] = None
        # Linux specific: extended file system attributes
        self.xattr: Dict = {}
        self.opened_as: AnyString = ""

    @property
    def byte_contents(self) -> Optional[bytes]:
        """Return the contents as raw byte array."""
        return self._byte_contents

    @property
    def contents(self) -> Optional[str]:
        """Return the contents as string with the original encoding,
        or `None` if the file has no byte contents."""
        # read the property only once - subclasses may do real work in it
        byte_contents = self.byte_contents
        if isinstance(byte_contents, bytes):
            return byte_contents.decode(
                self.encoding or locale.getpreferredencoding(False),
                errors=self.errors,
            )
        return None

    @property
    def st_ctime(self) -> float:
        """Return the `st_ctime` timestamp of the fake file
        (see the class documentation)."""
        return self.stat_result.st_ctime

    @st_ctime.setter
    def st_ctime(self, val: float) -> None:
        """Set the `st_ctime` timestamp of the fake file."""
        self.stat_result.st_ctime = val

    @property
    def st_atime(self) -> float:
        """Return the access time of the fake file."""
        return self.stat_result.st_atime

    @st_atime.setter
    def st_atime(self, val: float) -> None:
        """Set the access time of the fake file."""
        self.stat_result.st_atime = val

    @property
    def st_mtime(self) -> float:
        """Return the modification time of the fake file."""
        return self.stat_result.st_mtime

    @st_mtime.setter
    def st_mtime(self, val: float) -> None:
        """Set the modification time of the fake file."""
        self.stat_result.st_mtime = val

    def set_large_file_size(self, st_size: int) -> None:
        """Sets the self.st_size attribute and replaces the contents with None.

        Provided specifically to simulate very large files without regards
        to their content (which wouldn't fit in memory).
        Note that read/write operations with such a file raise
            :py:class:`FakeLargeFileIoException`.

        Args:
          st_size: (int) The desired file size

        Raises:
          OSError: if the st_size is not a non-negative integer,
                   or if st_size exceeds the available file system space
        """
        self._check_positive_int(st_size)
        # release the space used by the current contents first
        if self.st_size:
            self.size = 0
        if self.filesystem:
            self.filesystem.change_disk_usage(st_size, self.name, self.st_dev)
        self.st_size = st_size
        self._byte_contents = None

    def _check_positive_int(self, size: int) -> None:
        """Raise an OS error (`ENOSPC`) via the filesystem if `size` is not
        a non-negative integer."""
        # the size should be a non-negative integer value
        if not is_int_type(size) or size < 0:
            self.filesystem.raise_os_error(errno.ENOSPC, self.name)

    def is_large_file(self) -> bool:
        """Return `True` if this file was initialized with size
        but no contents.
        """
        return self._byte_contents is None

    def _encode_contents(self, contents: Union[str, bytes, None]) -> Optional[bytes]:
        """Return `contents` as bytes.

        Unicode strings are encoded using the file encoding (or the locale
        preferred encoding if none is set) and the file error mode;
        anything else is passed through unchanged.
        """
        if is_unicode_string(contents):
            contents = bytes(
                cast(str, contents),
                self.encoding or locale.getpreferredencoding(False),
                self.errors,
            )
        return cast(bytes, contents)

    def set_initial_contents(self, contents: AnyStr) -> bool:
        """Sets the file contents and size.
           Called internally after initial file creation.

        Args:
            contents: string, new content of file.

        Returns:
            True if the contents have been changed.

        Raises:
              OSError: if the st_size is not a non-negative integer,
                   or if st_size exceeds the available file system space
        """
        byte_contents = self._encode_contents(contents)
        changed = self._byte_contents != byte_contents
        st_size = len(byte_contents) if byte_contents else 0

        current_size = self.st_size or 0
        # only the size difference is accounted for in the disk usage
        self.filesystem.change_disk_usage(
            st_size - current_size, self.name, self.st_dev
        )
        self._byte_contents = byte_contents
        self.st_size = st_size
        self.epoch += 1
        return changed

    def set_contents(self, contents: AnyStr, encoding: Optional[str] = None) -> bool:
        """Sets the file contents and size and increases the modification time.
        Also executes the side_effects if available.

        Args:
          contents: (str, bytes) new content of file.
          encoding: (str) the encoding to be used for writing the contents
                    if they are a unicode string.
                    If not given, the locale preferred encoding is used.

        Returns:
            True if the contents have been changed.

        Raises:
          OSError: if `st_size` is not a non-negative integer,
                   or if it exceeds the available file system space.
        """
        self.encoding = real_encoding(encoding)
        changed = self.set_initial_contents(contents)
        if self._side_effect is not None:
            self._side_effect(self)
        return changed

    @property
    def size(self) -> int:
        """Return the size in bytes of the file contents."""
        return self.st_size

    @size.setter
    def size(self, st_size: int) -> None:
        """Resizes file content, padding with nulls if new size exceeds the
        old size.

        Args:
          st_size: The desired size for the file.

        Raises:
          OSError: if the st_size arg is not a non-negative integer
                   or if st_size exceeds the available file system space
        """

        self._check_positive_int(st_size)
        current_size = self.st_size or 0
        self.filesystem.change_disk_usage(
            st_size - current_size, self.name, self.st_dev
        )
        # `None` marks a fake large file without contents, which cannot be
        # resized; empty contents (b"") still have to be padded with nulls
        # if the file grows, so test for `None` instead of truthiness
        if self._byte_contents is not None:
            if st_size < current_size:
                self._byte_contents = self._byte_contents[:st_size]
            else:
                self._byte_contents += b"\0" * (st_size - current_size)
        self.st_size = st_size
        self.epoch += 1

    @property
    def path(self) -> AnyStr:
        """Return the full path of the current object."""
        names: List[AnyStr] = []  # pytype: disable=invalid-annotation
        obj: Optional[FakeFile] = self
        # collect the names from this object up to the topmost parent,
        # converted to the string type of this object's name
        while obj:
            names.insert(0, matching_string(self.name, obj.name))  # type: ignore
            obj = obj.parent_dir
        sep = self.filesystem.get_path_separator(names[0])
        if names[0] == sep:
            names.pop(0)
            dir_path = sep.join(names)
            drive = self.filesystem.splitdrive(dir_path)[0]
            # if a Windows path already starts with a drive or UNC path,
            # no extra separator is needed
            if not drive:
                dir_path = sep + dir_path
        else:
            dir_path = sep.join(names)
        return self.filesystem.absnormpath(dir_path)

    if sys.version_info >= (3, 12):

        @property
        def is_junction(self) -> bool:
            """Return the result of `filesystem.isjunction` for the path
            of this object."""
            return self.filesystem.isjunction(self.path)

    def __getattr__(self, item: str) -> Any:
        """Forward some properties to stat_result.

        Only called if the regular attribute lookup has failed.
        """
        if item in self.stat_types:
            return getattr(self.stat_result, item)
        return super().__getattribute__(item)

    def __setattr__(self, key: str, value: Any) -> None:
        """Forward some properties to stat_result."""
        if key in self.stat_types:
            return setattr(self.stat_result, key, value)
        return super().__setattr__(key, value)

    def __str__(self) -> str:
        """Return the name representation followed by the octal mode."""
        return "%r(%o)" % (self.name, self.st_mode)
393
394
class FakeNullFile(FakeFile):
    """Fake file for the null device: its contents are always empty."""

    def __init__(self, filesystem: "FakeFilesystem") -> None:
        # the null device has a different name in Windows file systems
        if filesystem.is_windows_fs:
            devnull = "nul"
        else:
            devnull = "/dev/null"
        super().__init__(devnull, filesystem=filesystem, contents="")

    @property
    def byte_contents(self) -> bytes:
        """Return the contents, which are always empty."""
        return b""

    def set_initial_contents(self, contents: AnyStr) -> bool:
        """Ignore `contents` and report that nothing has been changed."""
        return False
406
407
class FakeFileFromRealFile(FakeFile):
    """Represents a fake file copied from the real file system.

    The contents of the file are read on demand only.
    """

    def __init__(
        self,
        file_path: str,
        filesystem: "FakeFilesystem",
        side_effect: Optional[Callable] = None,
    ) -> None:
        """
        Args:
            file_path: Path to the existing file in the real file system.
                The real file is not accessed here, but on the first
                access to the contents.
            filesystem: The fake filesystem where the file is created.
            side_effect: function handle that is executed when file is
                written, must accept the file object as an argument.
        """
        super().__init__(
            name=os.path.basename(file_path),
            filesystem=filesystem,
            side_effect=side_effect,
        )
        # the real path is needed later to lazily read the contents
        self.file_path = file_path
        self.contents_read = False

    @property
    def byte_contents(self) -> Optional[bytes]:
        """Return the contents as raw byte array, reading them from the
        real file on first access.

        Raises:
            OSError: if the real file cannot be opened or stat'ed.
        """
        if not self.contents_read:
            self.contents_read = True
            with io.open(self.file_path, "rb") as f:
                self._byte_contents = f.read()
        # On MacOS and BSD, the above io.open() updates atime on the real file
        self.st_atime = os.stat(self.file_path).st_atime
        return self._byte_contents

    def set_contents(self, contents: AnyStr, encoding: Optional[str] = None) -> bool:
        """Set the contents of the fake file; the real file is not read
        afterwards anymore.

        Returns:
            True if the contents have been changed (the result of the
            base class implementation).
        """
        self.contents_read = True
        return super().set_contents(contents, encoding)

    def is_large_file(self) -> bool:
        """The contents are never faked."""
        return False
453
454
class FakeDirectory(FakeFile):
    """Provides the appearance of a real directory."""

    def __init__(
        self,
        name: str,
        perm_bits: int = helpers.PERM_DEF,
        filesystem: Optional["FakeFilesystem"] = None,
    ):
        """
        Args:
            name: name of the file/directory, without parent path information
            perm_bits: permission bits, defaults to `helpers.PERM_DEF`.
            filesystem: the fake filesystem where the directory is created
        """
        FakeFile.__init__(
            self,
            name,
            st_mode=S_IFDIR | perm_bits,
            contents="",
            filesystem=filesystem,
        )
        # directories have the link count of contained entries,
        # including '.' and '..'
        self.st_nlink += 1
        self._entries: Dict[str, AnyFile] = {}

    def set_contents(self, contents: AnyStr, encoding: Optional[str] = None) -> bool:
        """Directories have no contents to set: an OS error with `EISDIR`
        is raised via the filesystem."""
        raise self.filesystem.raise_os_error(errno.EISDIR, self.path)

    @property
    def entries(self) -> Dict[str, FakeFile]:
        """Return the mapping of entry names to contained entries."""
        return self._entries

    @property
    def ordered_dirs(self) -> List[str]:
        """Return the names of the contained entries, sorted by the
        inode number of the entries (e.g. by creation order).
        """
        by_inode = sorted(self._entries.items(), key=lambda entry: entry[1].st_ino)
        return [entry_name for entry_name, _ in by_inode]

    def add_entry(self, path_object: FakeFile) -> None:
        """Adds a child FakeFile to this directory.

        Args:
            path_object: FakeFile instance to add as a child of this directory.

        Raises:
            OSError: if the directory has no write permission (Posix only)
            OSError: if the file or directory to be added already exists
        """
        # writing is allowed for root, with write permission, and always
        # in Windows file systems
        if not (
            helpers.is_root()
            or self.st_mode & helpers.PERM_WRITE
            or self.filesystem.is_windows_fs
        ):
            raise OSError(errno.EACCES, "Permission Denied", self.path)

        entry_name: str = to_string(path_object.name)
        if entry_name in self.entries:
            self.filesystem.raise_os_error(errno.EEXIST, self.path)

        self._entries[entry_name] = path_object
        path_object.parent_dir = self
        # assign the next free inode number if the object has none yet
        if path_object.st_ino is None:
            self.filesystem.last_ino += 1
            path_object.st_ino = self.filesystem.last_ino
        self.st_nlink += 1
        path_object.st_nlink += 1
        path_object.st_dev = self.st_dev
        # the size is only accounted for with the first link to the object
        if path_object.st_nlink == 1:
            self.filesystem.change_disk_usage(
                path_object.size, path_object.name, self.st_dev
            )

    def get_entry(self, pathname_name: str) -> AnyFile:
        """Retrieves the specified child file or directory entry.

        Args:
            pathname_name: The basename of the child object to retrieve.

        Returns:
            The fake file or directory object.

        Raises:
            KeyError: if no child exists by the specified name.
        """
        entry_name = self._normalized_entryname(pathname_name)
        return self.entries[to_string(entry_name)]

    def _normalized_entryname(self, pathname_name: str) -> str:
        """Return the name of the first entry matching `pathname_name`
        case-insensitively if the file system is not case-sensitive and
        such an entry exists, otherwise `pathname_name` itself.
        """
        if self.filesystem.is_case_sensitive:
            return pathname_name
        for entry_name in self.entries:
            if entry_name.lower() == pathname_name.lower():
                return entry_name
        return pathname_name

    def remove_entry(self, pathname_name: str, recursive: bool = True) -> None:
        """Removes the specified child file or directory.

        Args:
            pathname_name: Basename of the child object to remove.
            recursive: If True (default), the entries in contained directories
                are deleted first. Used to propagate removal errors
                (e.g. permission problems) from contained entries.

        Raises:
            KeyError: if no child exists by the specified name.
            OSError: if user lacks permission to delete the file,
                or (Windows only) the file is open.
        """
        pathname_name = self._normalized_entryname(pathname_name)
        entry = self.get_entry(pathname_name)
        if not self.filesystem.is_windows_fs:
            # Posix: write and execute permission are needed in the directory
            needed_perm = helpers.PERM_WRITE | helpers.PERM_EXE
            if not helpers.is_root() and self.st_mode & needed_perm != needed_perm:
                self.filesystem.raise_os_error(errno.EACCES, pathname_name)
        else:
            # Windows: the entry has to be writable and must not be open
            if entry.st_mode & helpers.PERM_WRITE == 0:
                self.filesystem.raise_os_error(errno.EACCES, pathname_name)
            if self.filesystem.has_open_file(entry):
                self.filesystem.raise_os_error(errno.EACCES, pathname_name)

        if recursive and isinstance(entry, FakeDirectory):
            # remove the contained entries one by one
            while entry.entries:
                entry.remove_entry(next(iter(entry.entries)))
        elif entry.st_nlink == 1:
            # the last link is removed - release the used space
            self.filesystem.change_disk_usage(-entry.size, pathname_name, entry.st_dev)

        self.st_nlink -= 1
        entry.st_nlink -= 1
        assert entry.st_nlink >= 0

        del self.entries[to_string(pathname_name)]

    @property
    def size(self) -> int:
        """Return the sum of the sizes of all contained entries
        (contained directories report the size of their own entries).
        """
        return sum(entry.size for entry in self.entries.values())

    @size.setter
    def size(self, st_size: int) -> None:
        """Setting the size is an error for a directory."""
        raise self.filesystem.raise_os_error(errno.EISDIR, self.path)

    def has_parent_object(self, dir_object: "FakeDirectory") -> bool:
        """Return `True` if dir_object is a direct or indirect parent
        directory, or if both are the same object."""
        current: Optional[FakeDirectory] = self
        # walk up the parent chain until a match or the topmost directory
        while current:
            if current == dir_object:
                return True
            current = current.parent_dir
        return False

    def __str__(self) -> str:
        """Return the description of the directory itself, followed by
        the indented descriptions of all contained entries."""
        parts = [super().__str__(), ":\n"]
        for entry in self.entries.values():
            # indent each non-empty line of the entry description
            parts.extend(
                "  " + line + "\n" for line in entry.__str__().split("\n") if line
            )
        return "".join(parts)
623
624
class FakeDirectoryFromRealDirectory(FakeDirectory):
    """Represents a fake directory copied from the real file system.

    The contents of the directory are read on demand only.
    """

    def __init__(
        self,
        source_path: AnyPath,
        filesystem: "FakeFilesystem",
        read_only: bool,
        target_path: Optional[AnyPath] = None,
    ):
        """
        Args:
            source_path: Full directory path.
            filesystem: The fake filesystem where the directory is created.
            read_only: Passed on to the filesystem for each contained file
                and directory that is added on demand; if set, these are
                meant to be treated as read-only.
            target_path: If given, the target path of the directory,
                otherwise the target is the same as `source_path`.

        Raises:
            OSError: if the directory does not exist in the real file system
        """
        target_path = target_path or source_path
        real_stat = os.stat(source_path)
        _, dir_name = os.path.split(target_path)
        super().__init__(
            name=to_string(dir_name),
            perm_bits=real_stat.st_mode,
            filesystem=filesystem,
        )

        # take over times and ownership from the real directory
        for stat_name in ("st_ctime", "st_atime", "st_mtime", "st_gid", "st_uid"):
            setattr(self, stat_name, getattr(real_stat, stat_name))
        self.source_path = source_path  # type: ignore
        self.read_only = read_only
        self.contents_read = False

    @property
    def entries(self) -> Dict[str, FakeFile]:
        """Return the mapping of entry names to contained entries.
        On first access, the entries of the real directory are added
        to the fake filesystem."""
        if self.contents_read:
            return self._entries

        # the flag is set before anything is added, so accessing `entries`
        # while the real entries are added does not start another scan
        self.contents_read = True
        base = self.path
        fs = self.filesystem
        for entry in os.listdir(self.source_path):
            source_path = os.path.join(self.source_path, entry)
            target_path = os.path.join(base, entry)  # type: ignore
            if os.path.islink(source_path):
                fs.add_real_symlink(source_path, target_path)
            elif os.path.isdir(source_path):
                fs.add_real_directory(
                    source_path, self.read_only, target_path=target_path
                )
            else:
                fs.add_real_file(source_path, self.read_only, target_path=target_path)
        return self._entries

    @property
    def size(self) -> int:
        """Return the size of the contained entries, or 0 as long as
        the contents have not been loaded."""
        # we cannot get the size until the contents are loaded
        if self.contents_read:
            return super().size
        return 0

    @size.setter
    def size(self, st_size: int) -> None:
        """Setting the size is an error for a directory."""
        raise self.filesystem.raise_os_error(errno.EISDIR, self.path)
701
702
703class FakeFileWrapper:
704    """Wrapper for a stream object for use by a FakeFile object.
705
706    If the wrapper has any data written to it, it will propagate to
707    the FakeFile object on close() or flush().
708    """
709
710    def __init__(
711        self,
712        file_object: FakeFile,
713        file_path: AnyStr,
714        update: bool,
715        read: bool,
716        append: bool,
717        delete_on_close: bool,
718        filesystem: "FakeFilesystem",
719        newline: Optional[str],
720        binary: bool,
721        closefd: bool,
722        encoding: Optional[str],
723        errors: Optional[str],
724        buffering: int,
725        raw_io: bool,
726        is_stream: bool = False,
727    ):
728        self.file_object = file_object
729        self.file_path = file_path  # type: ignore[var-annotated]
730        self._append = append
731        self._read = read
732        self.allow_update = update
733        self._closefd = closefd
734        self._file_epoch = file_object.epoch
735        self.raw_io = raw_io
736        self._binary = binary
737        self.is_stream = is_stream
738        self._changed = False
739        self._buffer_size = buffering
740        if self._buffer_size == 0 and not binary:
741            raise ValueError("can't have unbuffered text I/O")
742        # buffer_size is ignored in text mode
743        elif self._buffer_size == -1 or not binary:
744            self._buffer_size = io.DEFAULT_BUFFER_SIZE
745        self._use_line_buffer = not binary and buffering == 1
746
747        contents = file_object.byte_contents
748        self._encoding = encoding or locale.getpreferredencoding(False)
749        errors = errors or "strict"
750        self._io: Union[BinaryBufferIO, TextBufferIO] = (
751            BinaryBufferIO(contents)
752            if binary
753            else TextBufferIO(
754                contents, encoding=encoding, newline=newline, errors=errors
755            )
756        )
757        self._read_whence = 0
758        self._read_seek = 0
759        self._flush_pos = 0
760        if contents:
761            self._flush_pos = len(contents)
762            if update:
763                if not append:
764                    self._io.seek(0)
765                else:
766                    self._io.seek(self._flush_pos)
767                    self._read_seek = self._io.tell()
768
769        if delete_on_close:
770            assert filesystem, "delete_on_close=True requires filesystem"
771        self._filesystem = filesystem
772        self.delete_on_close = delete_on_close
773        # override, don't modify FakeFile.name, as FakeFilesystem expects
774        # it to be the file name only, no directories.
775        self.name = file_object.opened_as
776        self.filedes: Optional[int] = None
777
    def __enter__(self) -> "FakeFileWrapper":
        """Return the wrapper itself as the target of the 'with' statement."""
        return self
781
    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Close the file at the end of the 'with' statement.

        The exception arguments are ignored; as nothing is returned,
        an exception raised inside the 'with' block is not suppressed.
        """
        self.close()
790
791    def _raise(self, message: str) -> NoReturn:
792        if self.raw_io:
793            self._filesystem.raise_os_error(errno.EBADF, self.file_path)
794        raise io.UnsupportedOperation(message)
795
    def get_object(self) -> FakeFile:
        """Return the FakeFile object that is wrapped
        by the current instance (the `file_object` passed on creation).
        """
        return self.file_object
801
802    def fileno(self) -> int:
803        """Return the file descriptor of the file object."""
804        if self.filedes is not None:
805            return self.filedes
806        raise OSError(errno.EBADF, "Invalid file descriptor")
807
808    def close(self) -> None:
809        """Close the file."""
810        # ignore closing a closed file
811        if not self._is_open():
812            return
813
814        # for raw io, all writes are flushed immediately
815        if self.allow_update and not self.raw_io:
816            self.flush()
817            if self._filesystem.is_windows_fs and self._changed:
818                self.file_object.st_mtime = helpers.now()
819
820        assert self.filedes is not None
821        if self._closefd:
822            self._filesystem._close_open_file(self.filedes)
823        else:
824            open_files = self._filesystem.open_files[self.filedes]
825            assert open_files is not None
826            open_files.remove(self)
827        if self.delete_on_close:
828            self._filesystem.remove_object(
829                self.get_object().path  # type: ignore[arg-type]
830            )
831
832    @property
    def closed(self) -> bool:
        """Simulate the `closed` attribute on file:
        `True` if `_is_open()` reports that the file is not open."""
        return not self._is_open()
836
837    def _try_flush(self, old_pos: int) -> None:
838        """Try to flush and reset the position if it fails."""
839        flush_pos = self._flush_pos
840        try:
841            self.flush()
842        except OSError:
843            # write failed - reset to previous position
844            self._io.seek(old_pos)
845            self._io.truncate()
846            self._flush_pos = flush_pos
847            raise
848
849    def flush(self) -> None:
850        """Flush file contents to 'disk'."""
851        self._check_open_file()
852        if self.allow_update and not self.is_stream:
853            contents = self._io.getvalue()
854            if self._append:
855                self._sync_io()
856                old_contents = self.file_object.byte_contents
857                assert old_contents is not None
858                contents = old_contents + contents[self._flush_pos :]
859                self._set_stream_contents(contents)
860            else:
861                self._io.flush()
862            changed = self.file_object.set_contents(contents, self._encoding)
863            self.update_flush_pos()
864            if changed:
865                if self._filesystem.is_windows_fs:
866                    self._changed = True
867                else:
868                    current_time = helpers.now()
869                    self.file_object.st_ctime = current_time
870                    self.file_object.st_mtime = current_time
871            self._file_epoch = self.file_object.epoch
872
873            if not self.is_stream:
874                self._flush_related_files()
875
876    def update_flush_pos(self) -> None:
877        self._flush_pos = self._io.tell()
878
879    def _flush_related_files(self) -> None:
880        for open_files in self._filesystem.open_files[3:]:
881            if open_files is not None:
882                for open_file in open_files:
883                    if (
884                        open_file is not self
885                        and isinstance(open_file, FakeFileWrapper)
886                        and self.file_object == open_file.file_object
887                        and not open_file._append
888                    ):
889                        open_file._sync_io()
890
891    def seek(self, offset: int, whence: int = 0) -> None:
892        """Move read/write pointer in 'file'."""
893        self._check_open_file()
894        if not self._append:
895            self._io.seek(offset, whence)
896        else:
897            self._read_seek = offset
898            self._read_whence = whence
899        if not self.is_stream:
900            self.flush()
901
902    def tell(self) -> int:
903        """Return the file's current position.
904
905        Returns:
906          int, file's current position in bytes.
907        """
908        self._check_open_file()
909        if not self.is_stream:
910            self.flush()
911
912        if not self._append:
913            return self._io.tell()
914        if self._read_whence:
915            write_seek = self._io.tell()
916            self._io.seek(self._read_seek, self._read_whence)
917            self._read_seek = self._io.tell()
918            self._read_whence = 0
919            self._io.seek(write_seek)
920        return self._read_seek
921
922    def _sync_io(self) -> None:
923        """Update the stream with changes to the file object contents."""
924        if self._file_epoch == self.file_object.epoch:
925            return
926
927        contents = self.file_object.byte_contents
928        assert contents is not None
929        self._set_stream_contents(contents)
930        self._file_epoch = self.file_object.epoch
931
932    def _set_stream_contents(self, contents: bytes) -> None:
933        whence = self._io.tell()
934        self._io.seek(0)
935        self._io.truncate()
936        self._io.putvalue(contents)
937        if not self._append:
938            self._io.seek(whence)
939
940    def _read_wrappers(self, name: str) -> Callable:
941        """Wrap a stream attribute in a read wrapper.
942
943        Returns a read_wrapper which tracks our own read pointer since the
944        stream object has no concept of a different read and write pointer.
945
946        Args:
947            name: The name of the attribute to wrap. Should be a read call.
948
949        Returns:
950            The read_wrapper function.
951        """
952        io_attr = getattr(self._io, name)
953
954        def read_wrapper(*args, **kwargs):
955            """Wrap all read calls to the stream object.
956
957            We do this to track the read pointer separate from the write
958            pointer.  Anything that wants to read from the stream object
959            while we're in append mode goes through this.
960
961            Args:
962                *args: pass through args
963                **kwargs: pass through kwargs
964            Returns:
965                Wrapped stream object method
966            """
967            self._io.seek(self._read_seek, self._read_whence)
968            ret_value = io_attr(*args, **kwargs)
969            self._read_seek = self._io.tell()
970            self._read_whence = 0
971            self._io.seek(0, 2)
972            return ret_value
973
974        return read_wrapper
975
976    def _other_wrapper(self, name: str) -> Callable:
977        """Wrap a stream attribute in an other_wrapper.
978
979        Args:
980          name: the name of the stream attribute to wrap.
981
982        Returns:
983          other_wrapper which is described below.
984        """
985        io_attr = getattr(self._io, name)
986
987        def other_wrapper(*args, **kwargs):
988            """Wrap all other calls to the stream Object.
989
990            We do this to track changes to the write pointer.  Anything that
991            moves the write pointer in a file open for appending should move
992            the read pointer as well.
993
994            Args:
995                *args: Pass through args.
996                **kwargs: Pass through kwargs.
997
998            Returns:
999                Wrapped stream object method.
1000            """
1001            write_seek = self._io.tell()
1002            ret_value = io_attr(*args, **kwargs)
1003            if write_seek != self._io.tell():
1004                self._read_seek = self._io.tell()
1005                self._read_whence = 0
1006
1007            return ret_value
1008
1009        return other_wrapper
1010
1011    def _write_wrapper(self, name: str) -> Callable:
1012        """Wrap a stream attribute in a write_wrapper.
1013
1014        Args:
1015          name: the name of the stream attribute to wrap.
1016
1017        Returns:
1018          write_wrapper which is described below.
1019        """
1020        io_attr = getattr(self._io, name)
1021
1022        def write_wrapper(*args, **kwargs):
1023            """Wrap all other calls to the stream Object.
1024
1025            We do this to track changes to the write pointer.  Anything that
1026            moves the write pointer in a file open for appending should move
1027            the read pointer as well.
1028
1029            Args:
1030                *args: Pass through args.
1031                **kwargs: Pass through kwargs.
1032
1033            Returns:
1034                Wrapped stream object method.
1035            """
1036            old_pos = self._io.tell()
1037            ret_value = io_attr(*args, **kwargs)
1038            new_pos = self._io.tell()
1039
1040            # if the buffer size is exceeded, we flush
1041            use_line_buf = self._use_line_buffer and "\n" in args[0]
1042            if new_pos - self._flush_pos > self._buffer_size or use_line_buf:
1043                flush_all = new_pos - old_pos > self._buffer_size or use_line_buf
1044                # if the current write does not exceed the buffer size,
1045                # we revert to the previous position and flush that,
1046                # otherwise we flush all
1047                if not flush_all:
1048                    self._io.seek(old_pos)
1049                    self._io.truncate()
1050                self._try_flush(old_pos)
1051                if not flush_all:
1052                    ret_value = io_attr(*args, **kwargs)
1053            if self._append:
1054                self._read_seek = self._io.tell()
1055                self._read_whence = 0
1056            return ret_value
1057
1058        return write_wrapper
1059
1060    def _adapt_size_for_related_files(self, size: int) -> None:
1061        for open_files in self._filesystem.open_files[3:]:
1062            if open_files is not None:
1063                for open_file in open_files:
1064                    if (
1065                        open_file is not self
1066                        and isinstance(open_file, FakeFileWrapper)
1067                        and self.file_object == open_file.file_object
1068                        and cast(FakeFileWrapper, open_file)._append
1069                    ):
1070                        open_file._read_seek += size
1071
    def _truncate_wrapper(self) -> Callable:
        """Wrap truncate() to allow flush after truncate.

        Returns:
            Wrapper which is described below.
        """
        io_attr = self._io.truncate

        def truncate_wrapper(*args, **kwargs):
            """Wrap truncate call to call flush after truncate.

            Args:
                *args: Passed through to the stream's `truncate`.
                **kwargs: Passed through to the stream's `truncate`.

            Returns:
                The value returned by the stream's `truncate`.
            """
            if self._append:
                # in append mode, move the stream to the tracked read
                # position before truncating
                self._io.seek(self._read_seek, self._read_whence)
            size = io_attr(*args, **kwargs)
            self.flush()
            if not self.is_stream:
                self.file_object.size = size
                buffer_size = len(self._io.getvalue())
                if buffer_size < size:
                    # the requested size exceeds the buffered contents:
                    # fill the gap with null bytes and store the result
                    self._io.seek(buffer_size)
                    self._io.putvalue(b"\0" * (size - buffer_size))
                    self.file_object.set_contents(self._io.getvalue(), self._encoding)
                    self._flush_pos = size
                    # append-mode wrappers of the same file get their read
                    # position shifted by the added amount
                    self._adapt_size_for_related_files(size - buffer_size)

            self.flush()
            return size

        return truncate_wrapper
1100
1101    def size(self) -> int:
1102        """Return the content size in bytes of the wrapped file."""
1103        return self.file_object.st_size
1104
    def __getattr__(self, name: str) -> Any:
        """Look up an attribute that is not defined on the wrapper itself.

        Names starting with "read" (and "next") are handled as read calls,
        names starting with "write" (and "truncate") as write calls.
        Depending on the kind of call and the open mode, the attribute of
        the underlying stream is returned directly, wrapped, or replaced
        by a function that reports the unsupported operation.

        Args:
            name: Name of the requested attribute.

        Returns:
            The attribute of the underlying stream, or a wrapper around it.

        Raises:
            FakeLargeFileIoException: if the wrapped file is a fake large
                file.
            ValueError: for a read or write call on a closed file that is
                not a stream.
        """
        if self.file_object.is_large_file():
            raise FakeLargeFileIoException(self.file_path)

        # classify the requested attribute by its name
        reading = name.startswith("read") or name == "next"
        truncate = name == "truncate"
        writing = name.startswith("write") or truncate

        if reading or writing:
            self._check_open_file()
        # calls that are not allowed by the open mode get a function that
        # raises (or returns an empty result in some cases) when called
        if not self._read and reading:
            return self._read_error()
        if not self.allow_update and writing:
            return self._write_error()

        if reading:
            # make sure the stream reflects the current file contents
            # before anything is read from it
            self._sync_io()
            if not self.is_stream:
                self.flush()
            if not self._filesystem.is_windows_fs:
                self.file_object.st_atime = helpers.now()
        if truncate:
            return self._truncate_wrapper()
        if self._append:
            # append mode tracks a read position separate from the
            # stream position, so calls that are not writes get wrapped
            if reading:
                return self._read_wrappers(name)
            elif not writing:
                return self._other_wrapper(name)
        if writing:
            return self._write_wrapper(name)

        return getattr(self._io, name)
1137
1138    def _read_error(self) -> Callable:
1139        def read_error(*args, **kwargs):
1140            """Throw an error unless the argument is zero."""
1141            if args and args[0] == 0:
1142                if self._filesystem.is_windows_fs and self.raw_io:
1143                    return b"" if self._binary else ""
1144            self._raise("File is not open for reading.")
1145
1146        return read_error
1147
1148    def _write_error(self) -> Callable:
1149        def write_error(*args, **kwargs):
1150            """Throw an error."""
1151            if self.raw_io:
1152                if self._filesystem.is_windows_fs and args and len(args[0]) == 0:
1153                    return 0
1154            self._raise("File is not open for writing.")
1155
1156        return write_error
1157
1158    def _is_open(self) -> bool:
1159        if self.filedes is not None and self.filedes < len(self._filesystem.open_files):
1160            open_files = self._filesystem.open_files[self.filedes]
1161            if open_files is not None and self in open_files:
1162                return True
1163        return False
1164
1165    def _check_open_file(self) -> None:
1166        if not self.is_stream and not self._is_open():
1167            raise ValueError("I/O operation on closed file")
1168
1169    def __iter__(self) -> Union[Iterator[str], Iterator[bytes]]:
1170        if not self._read:
1171            self._raise("File is not open for reading")
1172        return self._io.__iter__()
1173
1174    def __next__(self):
1175        if not self._read:
1176            self._raise("File is not open for reading")
1177        return next(self._io)
1178
1179
class StandardStreamWrapper:
    """Wrapper for a system standard stream to be used in open files list."""

    def __init__(self, stream_object: TextIO):
        """Create the wrapper.

        Args:
            stream_object: The standard stream to be wrapped.
        """
        self._stream_object = stream_object
        # no file descriptor is known at creation time
        self.filedes: Optional[int] = None

    def get_object(self) -> TextIO:
        """Return the wrapped stream object."""
        return self._stream_object

    def fileno(self) -> int:
        """Return the file descriptor of the wrapped standard stream.

        Raises:
            OSError: with `errno.EBADF` if no descriptor is set.
        """
        if self.filedes is not None:
            return self.filedes
        raise OSError(errno.EBADF, "Invalid file descriptor")

    def read(self, n: int = -1) -> bytes:
        """Read from the wrapped stream.

        Args:
            n: The maximum size to read, passed on to the `read` method of
                the stream. The default of -1 reads until EOF.

        Returns:
            Whatever the `read` method of the wrapped stream returns.
        """
        # the size has to be passed on - otherwise a request for a few
        # items would always consume the stream up to EOF
        return cast(bytes, self._stream_object.read(n))

    def close(self) -> None:
        """We do not support closing standard streams."""

    def is_stream(self) -> bool:
        """Return True, as this wrapper always represents a stream."""
        return True
1204
1205
class FakeDirWrapper:
    """Wrapper for a FakeDirectory object to be used in open files list."""

    def __init__(
        self,
        file_object: FakeDirectory,
        file_path: AnyString,
        filesystem: "FakeFilesystem",
    ):
        """Create the wrapper.

        Args:
            file_object: The fake directory to be wrapped.
            file_path: The path of the directory.
            filesystem: The fake filesystem that keeps the open files.
        """
        self._filesystem = filesystem
        self.file_object = file_object
        self.file_path = file_path
        # no file descriptor is known at creation time
        self.filedes: Optional[int] = None

    def get_object(self) -> FakeDirectory:
        """Return the FakeDirectory object that is wrapped by the current
        instance."""
        wrapped_dir = self.file_object
        return wrapped_dir

    def fileno(self) -> int:
        """Return the file descriptor of the file object.

        Raises:
            OSError: with `errno.EBADF` if no descriptor is set.
        """
        descriptor = self.filedes
        if descriptor is None:
            raise OSError(errno.EBADF, "Invalid file descriptor")
        return descriptor

    def close(self) -> None:
        """Close the directory via the filesystem's `_close_open_file`."""
        assert self.filedes is not None
        filesystem = self._filesystem
        filesystem._close_open_file(self.filedes)
1235
1236
class FakePipeWrapper:
    """Wrapper for a read or write descriptor of a real pipe object to be
    used in open files list.
    """

    def __init__(
        self,
        filesystem: "FakeFilesystem",
        fd: int,
        can_write: bool,
        mode: str = "",
    ):
        """Create the wrapper for one end of a pipe.

        Args:
            filesystem: The fake filesystem that keeps the open files.
            fd: The real file descriptor of the pipe end.
            can_write: True for the write end, False for the read end.
            mode: If not empty, the real descriptor is opened as a file
                object with this mode, which is then used for reading,
                writing and closing.
        """
        self._filesystem = filesystem
        self.fd = fd  # the real file descriptor
        self.can_write = can_write
        self.file_object = None
        # no fake file descriptor is known at creation time
        self.filedes: Optional[int] = None
        self.real_file = None
        if mode:
            self.real_file = open(fd, mode)

    def __enter__(self) -> "FakePipeWrapper":
        """To support usage of this fake pipe with the 'with' statement."""
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Close the pipe end when leaving the 'with' statement."""
        self.close()

    def get_object(self) -> None:
        """Return the `file_object` attribute (None after creation)."""
        return self.file_object

    def fileno(self) -> int:
        """Return the fake file descriptor of the pipe object.

        Raises:
            OSError: with `errno.EBADF` if no descriptor is set.
        """
        descriptor = self.filedes
        if descriptor is None:
            raise OSError(errno.EBADF, "Invalid file descriptor")
        return descriptor

    def read(self, numBytes: int = -1) -> bytes:
        """Read from the real pipe.

        Args:
            numBytes: The size passed on to the read call.

        Returns:
            The data read from the real file object if there is one,
            otherwise from the real file descriptor.
        """
        if not self.real_file:
            return os.read(self.fd, numBytes)
        return self.real_file.read(numBytes)  # pytype: disable=bad-return-type

    def flush(self) -> None:
        """Flush the real pipe?"""

    def write(self, contents: bytes) -> int:
        """Write to the real pipe.

        Args:
            contents: The data to be written.

        Returns:
            The result of the write call on the real file object if there
            is one, otherwise of `os.write` on the real file descriptor.
        """
        if not self.real_file:
            return os.write(self.fd, contents)
        return self.real_file.write(contents)

    def close(self) -> None:
        """Close the pipe descriptor.

        The wrapper is removed from the filesystem's open files before
        the real file object or descriptor is closed.
        """
        assert self.filedes is not None
        descriptor_entries = self._filesystem.open_files[self.filedes]
        assert descriptor_entries is not None
        descriptor_entries.remove(self)
        if not self.real_file:
            os.close(self.fd)
        else:
            self.real_file.close()

    def readable(self) -> bool:
        """The pipe end can either be readable or writable."""
        writable_end = self.can_write
        return not writable_end

    def writable(self) -> bool:
        """The pipe end can either be readable or writable."""
        writable_end = self.can_write
        return writable_end

    def seekable(self) -> bool:
        """A pipe is not seekable."""
        return False
1317