1# Copyright 2009 Google Inc. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""Fake implementations for different file objects. 16""" 17import errno 18import io 19import locale 20import os 21import sys 22from stat import ( 23 S_IFREG, 24 S_IFDIR, 25) 26from types import TracebackType 27from typing import ( 28 List, 29 Optional, 30 Callable, 31 Union, 32 Any, 33 Dict, 34 cast, 35 AnyStr, 36 NoReturn, 37 Iterator, 38 TextIO, 39 Type, 40 TYPE_CHECKING, 41) 42 43from pyfakefs import helpers 44from pyfakefs.helpers import ( 45 FakeStatResult, 46 BinaryBufferIO, 47 TextBufferIO, 48 is_int_type, 49 is_unicode_string, 50 to_string, 51 matching_string, 52 real_encoding, 53 AnyPath, 54 AnyString, 55) 56 57if TYPE_CHECKING: 58 from pyfakefs.fake_filesystem import FakeFilesystem 59 60AnyFileWrapper = Union[ 61 "FakeFileWrapper", 62 "FakeDirWrapper", 63 "StandardStreamWrapper", 64 "FakePipeWrapper", 65] 66AnyFile = Union["FakeFile", "FakeDirectory"] 67 68 69class FakeLargeFileIoException(Exception): 70 """Exception thrown on unsupported operations for fake large files. 71 Fake large files have a size with no real content. 72 """ 73 74 def __init__(self, file_path: str) -> None: 75 super(FakeLargeFileIoException, self).__init__( 76 "Read and write operations not supported for " 77 "fake large file: %s" % file_path 78 ) 79 80 81class FakeFile: 82 """Provides the appearance of a real file. 83 84 Attributes currently faked out: 85 * `st_mode`: user-specified, otherwise S_IFREG 86 * `st_ctime`: the time.time() timestamp of the file change time (updated 87 each time a file's attributes is modified). 88 * `st_atime`: the time.time() timestamp when the file was last accessed. 89 * `st_mtime`: the time.time() timestamp when the file was last modified. 90 * `st_size`: the size of the file 91 * `st_nlink`: the number of hard links to the file 92 * `st_ino`: the inode number - a unique number identifying the file 93 * `st_dev`: a unique number identifying the (fake) file system device 94 the file belongs to 95 * `st_uid`: always set to USER_ID, which can be changed globally using 96 `set_uid` 97 * `st_gid`: always set to GROUP_ID, which can be changed globally using 98 `set_gid` 99 100 .. note:: The resolution for `st_ctime`, `st_mtime` and `st_atime` in the 101 real file system depends on the used file system (for example it is 102 only 1s for HFS+ and older Linux file systems, but much higher for 103 ext4 and NTFS). This is currently ignored by pyfakefs, which uses 104 the resolution of `time.time()`. 105 106 Under Windows, `st_atime` is not updated for performance reasons by 107 default. pyfakefs never updates `st_atime` under Windows, assuming 108 the default setting. 109 """ 110 111 stat_types = ( 112 "st_mode", 113 "st_ino", 114 "st_dev", 115 "st_nlink", 116 "st_uid", 117 "st_gid", 118 "st_size", 119 "st_atime", 120 "st_mtime", 121 "st_ctime", 122 "st_atime_ns", 123 "st_mtime_ns", 124 "st_ctime_ns", 125 ) 126 127 def __init__( 128 self, 129 name: AnyStr, 130 st_mode: int = S_IFREG | helpers.PERM_DEF_FILE, 131 contents: Optional[AnyStr] = None, 132 filesystem: Optional["FakeFilesystem"] = None, 133 encoding: Optional[str] = None, 134 errors: Optional[str] = None, 135 side_effect: Optional[Callable[["FakeFile"], None]] = None, 136 ): 137 """ 138 Args: 139 name: Name of the file/directory, without parent path information 140 st_mode: The stat.S_IF* constant representing the file type (i.e. 141 stat.S_IFREG, stat.S_IFDIR), and the file permissions. 142 If no file type is set (e.g. permission flags only), a 143 regular file type is assumed. 144 contents: The contents of the filesystem object; should be a string 145 or byte object for regular files, and a dict of other 146 FakeFile or FakeDirectory objects with the file names as 147 keys for FakeDirectory objects 148 filesystem: The fake filesystem where the file is created. 149 encoding: If contents is a unicode string, the encoding used 150 for serialization. 151 errors: The error mode used for encoding/decoding errors. 152 side_effect: function handle that is executed when file is written, 153 must accept the file object as an argument. 154 """ 155 # to be backwards compatible regarding argument order, we raise on None 156 if filesystem is None: 157 raise ValueError("filesystem shall not be None") 158 self.filesystem: "FakeFilesystem" = filesystem 159 self._side_effect: Optional[Callable] = side_effect 160 self.name: AnyStr = name # type: ignore[assignment] 161 self.stat_result = FakeStatResult( 162 filesystem.is_windows_fs, 163 helpers.get_uid(), 164 helpers.get_gid(), 165 helpers.now(), 166 ) 167 if st_mode >> 12 == 0: 168 st_mode |= S_IFREG 169 self.stat_result.st_mode = st_mode 170 self.st_size: int = 0 171 self.encoding: Optional[str] = real_encoding(encoding) 172 self.errors: str = errors or "strict" 173 self._byte_contents: Optional[bytes] = self._encode_contents(contents) 174 self.stat_result.st_size = ( 175 len(self._byte_contents) if self._byte_contents is not None else 0 176 ) 177 self.epoch: int = 0 178 self.parent_dir: Optional[FakeDirectory] = None 179 # Linux specific: extended file system attributes 180 self.xattr: Dict = {} 181 self.opened_as: AnyString = "" 182 183 @property 184 def byte_contents(self) -> Optional[bytes]: 185 """Return the contents as raw byte array.""" 186 return self._byte_contents 187 188 @property 189 def contents(self) -> Optional[str]: 190 """Return the contents as string with the original encoding.""" 191 if isinstance(self.byte_contents, bytes): 192 return self.byte_contents.decode( 193 self.encoding or locale.getpreferredencoding(False), 194 errors=self.errors, 195 ) 196 return None 197 198 @property 199 def st_ctime(self) -> float: 200 """Return the creation time of the fake file.""" 201 return self.stat_result.st_ctime 202 203 @st_ctime.setter 204 def st_ctime(self, val: float) -> None: 205 """Set the creation time of the fake file.""" 206 self.stat_result.st_ctime = val 207 208 @property 209 def st_atime(self) -> float: 210 """Return the access time of the fake file.""" 211 return self.stat_result.st_atime 212 213 @st_atime.setter 214 def st_atime(self, val: float) -> None: 215 """Set the access time of the fake file.""" 216 self.stat_result.st_atime = val 217 218 @property 219 def st_mtime(self) -> float: 220 """Return the modification time of the fake file.""" 221 return self.stat_result.st_mtime 222 223 @st_mtime.setter 224 def st_mtime(self, val: float) -> None: 225 """Set the modification time of the fake file.""" 226 self.stat_result.st_mtime = val 227 228 def set_large_file_size(self, st_size: int) -> None: 229 """Sets the self.st_size attribute and replaces self.content with None. 230 231 Provided specifically to simulate very large files without regards 232 to their content (which wouldn't fit in memory). 233 Note that read/write operations with such a file raise 234 :py:class:`FakeLargeFileIoException`. 235 236 Args: 237 st_size: (int) The desired file size 238 239 Raises: 240 OSError: if the st_size is not a non-negative integer, 241 or if st_size exceeds the available file system space 242 """ 243 self._check_positive_int(st_size) 244 if self.st_size: 245 self.size = 0 246 if self.filesystem: 247 self.filesystem.change_disk_usage(st_size, self.name, self.st_dev) 248 self.st_size = st_size 249 self._byte_contents = None 250 251 def _check_positive_int(self, size: int) -> None: 252 # the size should be an positive integer value 253 if not is_int_type(size) or size < 0: 254 self.filesystem.raise_os_error(errno.ENOSPC, self.name) 255 256 def is_large_file(self) -> bool: 257 """Return `True` if this file was initialized with size 258 but no contents. 259 """ 260 return self._byte_contents is None 261 262 def _encode_contents(self, contents: Union[str, bytes, None]) -> Optional[bytes]: 263 if is_unicode_string(contents): 264 contents = bytes( 265 cast(str, contents), 266 self.encoding or locale.getpreferredencoding(False), 267 self.errors, 268 ) 269 return cast(bytes, contents) 270 271 def set_initial_contents(self, contents: AnyStr) -> bool: 272 """Sets the file contents and size. 273 Called internally after initial file creation. 274 275 Args: 276 contents: string, new content of file. 277 278 Returns: 279 True if the contents have been changed. 280 281 Raises: 282 OSError: if the st_size is not a non-negative integer, 283 or if st_size exceeds the available file system space 284 """ 285 byte_contents = self._encode_contents(contents) 286 changed = self._byte_contents != byte_contents 287 st_size = len(byte_contents) if byte_contents else 0 288 289 current_size = self.st_size or 0 290 self.filesystem.change_disk_usage( 291 st_size - current_size, self.name, self.st_dev 292 ) 293 self._byte_contents = byte_contents 294 self.st_size = st_size 295 self.epoch += 1 296 return changed 297 298 def set_contents(self, contents: AnyStr, encoding: Optional[str] = None) -> bool: 299 """Sets the file contents and size and increases the modification time. 300 Also executes the side_effects if available. 301 302 Args: 303 contents: (str, bytes) new content of file. 304 encoding: (str) the encoding to be used for writing the contents 305 if they are a unicode string. 306 If not given, the locale preferred encoding is used. 307 308 Returns: 309 True if the contents have been changed. 310 311 Raises: 312 OSError: if `st_size` is not a non-negative integer, 313 or if it exceeds the available file system space. 314 """ 315 self.encoding = real_encoding(encoding) 316 changed = self.set_initial_contents(contents) 317 if self._side_effect is not None: 318 self._side_effect(self) 319 return changed 320 321 @property 322 def size(self) -> int: 323 """Return the size in bytes of the file contents.""" 324 return self.st_size 325 326 @size.setter 327 def size(self, st_size: int) -> None: 328 """Resizes file content, padding with nulls if new size exceeds the 329 old size. 330 331 Args: 332 st_size: The desired size for the file. 333 334 Raises: 335 OSError: if the st_size arg is not a non-negative integer 336 or if st_size exceeds the available file system space 337 """ 338 339 self._check_positive_int(st_size) 340 current_size = self.st_size or 0 341 self.filesystem.change_disk_usage( 342 st_size - current_size, self.name, self.st_dev 343 ) 344 if self._byte_contents: 345 if st_size < current_size: 346 self._byte_contents = self._byte_contents[:st_size] 347 else: 348 self._byte_contents += b"\0" * (st_size - current_size) 349 self.st_size = st_size 350 self.epoch += 1 351 352 @property 353 def path(self) -> AnyStr: 354 """Return the full path of the current object.""" 355 names: List[AnyStr] = [] # pytype: disable=invalid-annotation 356 obj: Optional[FakeFile] = self 357 while obj: 358 names.insert(0, matching_string(self.name, obj.name)) # type: ignore 359 obj = obj.parent_dir 360 sep = self.filesystem.get_path_separator(names[0]) 361 if names[0] == sep: 362 names.pop(0) 363 dir_path = sep.join(names) 364 drive = self.filesystem.splitdrive(dir_path)[0] 365 # if a Windows path already starts with a drive or UNC path, 366 # no extra separator is needed 367 if not drive: 368 dir_path = sep + dir_path 369 else: 370 dir_path = sep.join(names) 371 return self.filesystem.absnormpath(dir_path) 372 373 if sys.version_info >= (3, 12): 374 375 @property 376 def is_junction(self) -> bool: 377 return self.filesystem.isjunction(self.path) 378 379 def __getattr__(self, item: str) -> Any: 380 """Forward some properties to stat_result.""" 381 if item in self.stat_types: 382 return getattr(self.stat_result, item) 383 return super().__getattribute__(item) 384 385 def __setattr__(self, key: str, value: Any) -> None: 386 """Forward some properties to stat_result.""" 387 if key in self.stat_types: 388 return setattr(self.stat_result, key, value) 389 return super().__setattr__(key, value) 390 391 def __str__(self) -> str: 392 return "%r(%o)" % (self.name, self.st_mode) 393 394 395class FakeNullFile(FakeFile): 396 def __init__(self, filesystem: "FakeFilesystem") -> None: 397 devnull = "nul" if filesystem.is_windows_fs else "/dev/null" 398 super(FakeNullFile, self).__init__(devnull, filesystem=filesystem, contents="") 399 400 @property 401 def byte_contents(self) -> bytes: 402 return b"" 403 404 def set_initial_contents(self, contents: AnyStr) -> bool: 405 return False 406 407 408class FakeFileFromRealFile(FakeFile): 409 """Represents a fake file copied from the real file system. 410 411 The contents of the file are read on demand only. 412 """ 413 414 def __init__( 415 self, 416 file_path: str, 417 filesystem: "FakeFilesystem", 418 side_effect: Optional[Callable] = None, 419 ) -> None: 420 """ 421 Args: 422 file_path: Path to the existing file. 423 filesystem: The fake filesystem where the file is created. 424 425 Raises: 426 OSError: if the file does not exist in the real file system. 427 OSError: if the file already exists in the fake file system. 428 """ 429 super().__init__( 430 name=os.path.basename(file_path), 431 filesystem=filesystem, 432 side_effect=side_effect, 433 ) 434 self.contents_read = False 435 436 @property 437 def byte_contents(self) -> Optional[bytes]: 438 if not self.contents_read: 439 self.contents_read = True 440 with io.open(self.file_path, "rb") as f: 441 self._byte_contents = f.read() 442 # On MacOS and BSD, the above io.open() updates atime on the real file 443 self.st_atime = os.stat(self.file_path).st_atime 444 return self._byte_contents 445 446 def set_contents(self, contents, encoding=None): 447 self.contents_read = True 448 super(FakeFileFromRealFile, self).set_contents(contents, encoding) 449 450 def is_large_file(self): 451 """The contents are never faked.""" 452 return False 453 454 455class FakeDirectory(FakeFile): 456 """Provides the appearance of a real directory.""" 457 458 def __init__( 459 self, 460 name: str, 461 perm_bits: int = helpers.PERM_DEF, 462 filesystem: Optional["FakeFilesystem"] = None, 463 ): 464 """ 465 Args: 466 name: name of the file/directory, without parent path information 467 perm_bits: permission bits. defaults to 0o777. 468 filesystem: if set, the fake filesystem where the directory 469 is created 470 """ 471 FakeFile.__init__(self, name, S_IFDIR | perm_bits, "", filesystem=filesystem) 472 # directories have the link count of contained entries, 473 # including '.' and '..' 474 self.st_nlink += 1 475 self._entries: Dict[str, AnyFile] = {} 476 477 def set_contents(self, contents: AnyStr, encoding: Optional[str] = None) -> bool: 478 raise self.filesystem.raise_os_error(errno.EISDIR, self.path) 479 480 @property 481 def entries(self) -> Dict[str, FakeFile]: 482 """Return the list of contained directory entries.""" 483 return self._entries 484 485 @property 486 def ordered_dirs(self) -> List[str]: 487 """Return the list of contained directory entry names ordered by 488 creation order. 489 """ 490 return [ 491 item[0] 492 for item in sorted(self._entries.items(), key=lambda entry: entry[1].st_ino) 493 ] 494 495 def add_entry(self, path_object: FakeFile) -> None: 496 """Adds a child FakeFile to this directory. 497 498 Args: 499 path_object: FakeFile instance to add as a child of this directory. 500 501 Raises: 502 OSError: if the directory has no write permission (Posix only) 503 OSError: if the file or directory to be added already exists 504 """ 505 if ( 506 not helpers.is_root() 507 and not self.st_mode & helpers.PERM_WRITE 508 and not self.filesystem.is_windows_fs 509 ): 510 raise OSError(errno.EACCES, "Permission Denied", self.path) 511 512 path_object_name: str = to_string(path_object.name) 513 if path_object_name in self.entries: 514 self.filesystem.raise_os_error(errno.EEXIST, self.path) 515 516 self._entries[path_object_name] = path_object 517 path_object.parent_dir = self 518 if path_object.st_ino is None: 519 self.filesystem.last_ino += 1 520 path_object.st_ino = self.filesystem.last_ino 521 self.st_nlink += 1 522 path_object.st_nlink += 1 523 path_object.st_dev = self.st_dev 524 if path_object.st_nlink == 1: 525 self.filesystem.change_disk_usage( 526 path_object.size, path_object.name, self.st_dev 527 ) 528 529 def get_entry(self, pathname_name: str) -> AnyFile: 530 """Retrieves the specified child file or directory entry. 531 532 Args: 533 pathname_name: The basename of the child object to retrieve. 534 535 Returns: 536 The fake file or directory object. 537 538 Raises: 539 KeyError: if no child exists by the specified name. 540 """ 541 pathname_name = self._normalized_entryname(pathname_name) 542 return self.entries[to_string(pathname_name)] 543 544 def _normalized_entryname(self, pathname_name: str) -> str: 545 if not self.filesystem.is_case_sensitive: 546 matching_names = [ 547 name for name in self.entries if name.lower() == pathname_name.lower() 548 ] 549 if matching_names: 550 pathname_name = matching_names[0] 551 return pathname_name 552 553 def remove_entry(self, pathname_name: str, recursive: bool = True) -> None: 554 """Removes the specified child file or directory. 555 556 Args: 557 pathname_name: Basename of the child object to remove. 558 recursive: If True (default), the entries in contained directories 559 are deleted first. Used to propagate removal errors 560 (e.g. permission problems) from contained entries. 561 562 Raises: 563 KeyError: if no child exists by the specified name. 564 OSError: if user lacks permission to delete the file, 565 or (Windows only) the file is open. 566 """ 567 pathname_name = self._normalized_entryname(pathname_name) 568 entry = self.get_entry(pathname_name) 569 if self.filesystem.is_windows_fs: 570 if entry.st_mode & helpers.PERM_WRITE == 0: 571 self.filesystem.raise_os_error(errno.EACCES, pathname_name) 572 if self.filesystem.has_open_file(entry): 573 self.filesystem.raise_os_error(errno.EACCES, pathname_name) 574 else: 575 if not helpers.is_root() and ( 576 self.st_mode & (helpers.PERM_WRITE | helpers.PERM_EXE) 577 != helpers.PERM_WRITE | helpers.PERM_EXE 578 ): 579 self.filesystem.raise_os_error(errno.EACCES, pathname_name) 580 581 if recursive and isinstance(entry, FakeDirectory): 582 while entry.entries: 583 entry.remove_entry(list(entry.entries)[0]) 584 elif entry.st_nlink == 1: 585 self.filesystem.change_disk_usage(-entry.size, pathname_name, entry.st_dev) 586 587 self.st_nlink -= 1 588 entry.st_nlink -= 1 589 assert entry.st_nlink >= 0 590 591 del self.entries[to_string(pathname_name)] 592 593 @property 594 def size(self) -> int: 595 """Return the total size of all files contained 596 in this directory tree. 597 """ 598 return sum([item[1].size for item in self.entries.items()]) 599 600 @size.setter 601 def size(self, st_size: int) -> None: 602 """Setting the size is an error for a directory.""" 603 raise self.filesystem.raise_os_error(errno.EISDIR, self.path) 604 605 def has_parent_object(self, dir_object: "FakeDirectory") -> bool: 606 """Return `True` if dir_object is a direct or indirect parent 607 directory, or if both are the same object.""" 608 obj: Optional[FakeDirectory] = self 609 while obj: 610 if obj == dir_object: 611 return True 612 obj = obj.parent_dir 613 return False 614 615 def __str__(self) -> str: 616 description = super(FakeDirectory, self).__str__() + ":\n" 617 for item in self.entries: 618 item_desc = self.entries[item].__str__() 619 for line in item_desc.split("\n"): 620 if line: 621 description = description + " " + line + "\n" 622 return description 623 624 625class FakeDirectoryFromRealDirectory(FakeDirectory): 626 """Represents a fake directory copied from the real file system. 627 628 The contents of the directory are read on demand only. 629 """ 630 631 def __init__( 632 self, 633 source_path: AnyPath, 634 filesystem: "FakeFilesystem", 635 read_only: bool, 636 target_path: Optional[AnyPath] = None, 637 ): 638 """ 639 Args: 640 source_path: Full directory path. 641 filesystem: The fake filesystem where the directory is created. 642 read_only: If set, all files under the directory are treated 643 as read-only, e.g. a write access raises an exception; 644 otherwise, writing to the files changes the fake files 645 only as usually. 646 target_path: If given, the target path of the directory, 647 otherwise the target is the same as `source_path`. 648 649 Raises: 650 OSError: if the directory does not exist in the real file system 651 """ 652 target_path = target_path or source_path 653 real_stat = os.stat(source_path) 654 super(FakeDirectoryFromRealDirectory, self).__init__( 655 name=to_string(os.path.split(target_path)[1]), 656 perm_bits=real_stat.st_mode, 657 filesystem=filesystem, 658 ) 659 660 self.st_ctime = real_stat.st_ctime 661 self.st_atime = real_stat.st_atime 662 self.st_mtime = real_stat.st_mtime 663 self.st_gid = real_stat.st_gid 664 self.st_uid = real_stat.st_uid 665 self.source_path = source_path # type: ignore 666 self.read_only = read_only 667 self.contents_read = False 668 669 @property 670 def entries(self) -> Dict[str, FakeFile]: 671 """Return the list of contained directory entries, loading them 672 if not already loaded.""" 673 if not self.contents_read: 674 self.contents_read = True 675 base = self.path 676 for entry in os.listdir(self.source_path): 677 source_path = os.path.join(self.source_path, entry) 678 target_path = os.path.join(base, entry) # type: ignore 679 if os.path.islink(source_path): 680 self.filesystem.add_real_symlink(source_path, target_path) 681 elif os.path.isdir(source_path): 682 self.filesystem.add_real_directory( 683 source_path, self.read_only, target_path=target_path 684 ) 685 else: 686 self.filesystem.add_real_file( 687 source_path, self.read_only, target_path=target_path 688 ) 689 return self._entries 690 691 @property 692 def size(self) -> int: 693 # we cannot get the size until the contents are loaded 694 if not self.contents_read: 695 return 0 696 return super(FakeDirectoryFromRealDirectory, self).size 697 698 @size.setter 699 def size(self, st_size: int) -> None: 700 raise self.filesystem.raise_os_error(errno.EISDIR, self.path) 701 702 703class FakeFileWrapper: 704 """Wrapper for a stream object for use by a FakeFile object. 705 706 If the wrapper has any data written to it, it will propagate to 707 the FakeFile object on close() or flush(). 708 """ 709 710 def __init__( 711 self, 712 file_object: FakeFile, 713 file_path: AnyStr, 714 update: bool, 715 read: bool, 716 append: bool, 717 delete_on_close: bool, 718 filesystem: "FakeFilesystem", 719 newline: Optional[str], 720 binary: bool, 721 closefd: bool, 722 encoding: Optional[str], 723 errors: Optional[str], 724 buffering: int, 725 raw_io: bool, 726 is_stream: bool = False, 727 ): 728 self.file_object = file_object 729 self.file_path = file_path # type: ignore[var-annotated] 730 self._append = append 731 self._read = read 732 self.allow_update = update 733 self._closefd = closefd 734 self._file_epoch = file_object.epoch 735 self.raw_io = raw_io 736 self._binary = binary 737 self.is_stream = is_stream 738 self._changed = False 739 self._buffer_size = buffering 740 if self._buffer_size == 0 and not binary: 741 raise ValueError("can't have unbuffered text I/O") 742 # buffer_size is ignored in text mode 743 elif self._buffer_size == -1 or not binary: 744 self._buffer_size = io.DEFAULT_BUFFER_SIZE 745 self._use_line_buffer = not binary and buffering == 1 746 747 contents = file_object.byte_contents 748 self._encoding = encoding or locale.getpreferredencoding(False) 749 errors = errors or "strict" 750 self._io: Union[BinaryBufferIO, TextBufferIO] = ( 751 BinaryBufferIO(contents) 752 if binary 753 else TextBufferIO( 754 contents, encoding=encoding, newline=newline, errors=errors 755 ) 756 ) 757 self._read_whence = 0 758 self._read_seek = 0 759 self._flush_pos = 0 760 if contents: 761 self._flush_pos = len(contents) 762 if update: 763 if not append: 764 self._io.seek(0) 765 else: 766 self._io.seek(self._flush_pos) 767 self._read_seek = self._io.tell() 768 769 if delete_on_close: 770 assert filesystem, "delete_on_close=True requires filesystem" 771 self._filesystem = filesystem 772 self.delete_on_close = delete_on_close 773 # override, don't modify FakeFile.name, as FakeFilesystem expects 774 # it to be the file name only, no directories. 775 self.name = file_object.opened_as 776 self.filedes: Optional[int] = None 777 778 def __enter__(self) -> "FakeFileWrapper": 779 """To support usage of this fake file with the 'with' statement.""" 780 return self 781 782 def __exit__( 783 self, 784 exc_type: Optional[Type[BaseException]], 785 exc_val: Optional[BaseException], 786 exc_tb: Optional[TracebackType], 787 ) -> None: 788 """To support usage of this fake file with the 'with' statement.""" 789 self.close() 790 791 def _raise(self, message: str) -> NoReturn: 792 if self.raw_io: 793 self._filesystem.raise_os_error(errno.EBADF, self.file_path) 794 raise io.UnsupportedOperation(message) 795 796 def get_object(self) -> FakeFile: 797 """Return the FakeFile object that is wrapped 798 by the current instance. 799 """ 800 return self.file_object 801 802 def fileno(self) -> int: 803 """Return the file descriptor of the file object.""" 804 if self.filedes is not None: 805 return self.filedes 806 raise OSError(errno.EBADF, "Invalid file descriptor") 807 808 def close(self) -> None: 809 """Close the file.""" 810 # ignore closing a closed file 811 if not self._is_open(): 812 return 813 814 # for raw io, all writes are flushed immediately 815 if self.allow_update and not self.raw_io: 816 self.flush() 817 if self._filesystem.is_windows_fs and self._changed: 818 self.file_object.st_mtime = helpers.now() 819 820 assert self.filedes is not None 821 if self._closefd: 822 self._filesystem._close_open_file(self.filedes) 823 else: 824 open_files = self._filesystem.open_files[self.filedes] 825 assert open_files is not None 826 open_files.remove(self) 827 if self.delete_on_close: 828 self._filesystem.remove_object( 829 self.get_object().path # type: ignore[arg-type] 830 ) 831 832 @property 833 def closed(self) -> bool: 834 """Simulate the `closed` attribute on file.""" 835 return not self._is_open() 836 837 def _try_flush(self, old_pos: int) -> None: 838 """Try to flush and reset the position if it fails.""" 839 flush_pos = self._flush_pos 840 try: 841 self.flush() 842 except OSError: 843 # write failed - reset to previous position 844 self._io.seek(old_pos) 845 self._io.truncate() 846 self._flush_pos = flush_pos 847 raise 848 849 def flush(self) -> None: 850 """Flush file contents to 'disk'.""" 851 self._check_open_file() 852 if self.allow_update and not self.is_stream: 853 contents = self._io.getvalue() 854 if self._append: 855 self._sync_io() 856 old_contents = self.file_object.byte_contents 857 assert old_contents is not None 858 contents = old_contents + contents[self._flush_pos :] 859 self._set_stream_contents(contents) 860 else: 861 self._io.flush() 862 changed = self.file_object.set_contents(contents, self._encoding) 863 self.update_flush_pos() 864 if changed: 865 if self._filesystem.is_windows_fs: 866 self._changed = True 867 else: 868 current_time = helpers.now() 869 self.file_object.st_ctime = current_time 870 self.file_object.st_mtime = current_time 871 self._file_epoch = self.file_object.epoch 872 873 if not self.is_stream: 874 self._flush_related_files() 875 876 def update_flush_pos(self) -> None: 877 self._flush_pos = self._io.tell() 878 879 def _flush_related_files(self) -> None: 880 for open_files in self._filesystem.open_files[3:]: 881 if open_files is not None: 882 for open_file in open_files: 883 if ( 884 open_file is not self 885 and isinstance(open_file, FakeFileWrapper) 886 and self.file_object == open_file.file_object 887 and not open_file._append 888 ): 889 open_file._sync_io() 890 891 def seek(self, offset: int, whence: int = 0) -> None: 892 """Move read/write pointer in 'file'.""" 893 self._check_open_file() 894 if not self._append: 895 self._io.seek(offset, whence) 896 else: 897 self._read_seek = offset 898 self._read_whence = whence 899 if not self.is_stream: 900 self.flush() 901 902 def tell(self) -> int: 903 """Return the file's current position. 904 905 Returns: 906 int, file's current position in bytes. 907 """ 908 self._check_open_file() 909 if not self.is_stream: 910 self.flush() 911 912 if not self._append: 913 return self._io.tell() 914 if self._read_whence: 915 write_seek = self._io.tell() 916 self._io.seek(self._read_seek, self._read_whence) 917 self._read_seek = self._io.tell() 918 self._read_whence = 0 919 self._io.seek(write_seek) 920 return self._read_seek 921 922 def _sync_io(self) -> None: 923 """Update the stream with changes to the file object contents.""" 924 if self._file_epoch == self.file_object.epoch: 925 return 926 927 contents = self.file_object.byte_contents 928 assert contents is not None 929 self._set_stream_contents(contents) 930 self._file_epoch = self.file_object.epoch 931 932 def _set_stream_contents(self, contents: bytes) -> None: 933 whence = self._io.tell() 934 self._io.seek(0) 935 self._io.truncate() 936 self._io.putvalue(contents) 937 if not self._append: 938 self._io.seek(whence) 939 940 def _read_wrappers(self, name: str) -> Callable: 941 """Wrap a stream attribute in a read wrapper. 942 943 Returns a read_wrapper which tracks our own read pointer since the 944 stream object has no concept of a different read and write pointer. 945 946 Args: 947 name: The name of the attribute to wrap. Should be a read call. 948 949 Returns: 950 The read_wrapper function. 951 """ 952 io_attr = getattr(self._io, name) 953 954 def read_wrapper(*args, **kwargs): 955 """Wrap all read calls to the stream object. 956 957 We do this to track the read pointer separate from the write 958 pointer. Anything that wants to read from the stream object 959 while we're in append mode goes through this. 960 961 Args: 962 *args: pass through args 963 **kwargs: pass through kwargs 964 Returns: 965 Wrapped stream object method 966 """ 967 self._io.seek(self._read_seek, self._read_whence) 968 ret_value = io_attr(*args, **kwargs) 969 self._read_seek = self._io.tell() 970 self._read_whence = 0 971 self._io.seek(0, 2) 972 return ret_value 973 974 return read_wrapper 975 976 def _other_wrapper(self, name: str) -> Callable: 977 """Wrap a stream attribute in an other_wrapper. 978 979 Args: 980 name: the name of the stream attribute to wrap. 981 982 Returns: 983 other_wrapper which is described below. 984 """ 985 io_attr = getattr(self._io, name) 986 987 def other_wrapper(*args, **kwargs): 988 """Wrap all other calls to the stream Object. 989 990 We do this to track changes to the write pointer. Anything that 991 moves the write pointer in a file open for appending should move 992 the read pointer as well. 993 994 Args: 995 *args: Pass through args. 996 **kwargs: Pass through kwargs. 997 998 Returns: 999 Wrapped stream object method. 1000 """ 1001 write_seek = self._io.tell() 1002 ret_value = io_attr(*args, **kwargs) 1003 if write_seek != self._io.tell(): 1004 self._read_seek = self._io.tell() 1005 self._read_whence = 0 1006 1007 return ret_value 1008 1009 return other_wrapper 1010 1011 def _write_wrapper(self, name: str) -> Callable: 1012 """Wrap a stream attribute in a write_wrapper. 1013 1014 Args: 1015 name: the name of the stream attribute to wrap. 1016 1017 Returns: 1018 write_wrapper which is described below. 1019 """ 1020 io_attr = getattr(self._io, name) 1021 1022 def write_wrapper(*args, **kwargs): 1023 """Wrap all other calls to the stream Object. 1024 1025 We do this to track changes to the write pointer. Anything that 1026 moves the write pointer in a file open for appending should move 1027 the read pointer as well. 1028 1029 Args: 1030 *args: Pass through args. 1031 **kwargs: Pass through kwargs. 1032 1033 Returns: 1034 Wrapped stream object method. 1035 """ 1036 old_pos = self._io.tell() 1037 ret_value = io_attr(*args, **kwargs) 1038 new_pos = self._io.tell() 1039 1040 # if the buffer size is exceeded, we flush 1041 use_line_buf = self._use_line_buffer and "\n" in args[0] 1042 if new_pos - self._flush_pos > self._buffer_size or use_line_buf: 1043 flush_all = new_pos - old_pos > self._buffer_size or use_line_buf 1044 # if the current write does not exceed the buffer size, 1045 # we revert to the previous position and flush that, 1046 # otherwise we flush all 1047 if not flush_all: 1048 self._io.seek(old_pos) 1049 self._io.truncate() 1050 self._try_flush(old_pos) 1051 if not flush_all: 1052 ret_value = io_attr(*args, **kwargs) 1053 if self._append: 1054 self._read_seek = self._io.tell() 1055 self._read_whence = 0 1056 return ret_value 1057 1058 return write_wrapper 1059 1060 def _adapt_size_for_related_files(self, size: int) -> None: 1061 for open_files in self._filesystem.open_files[3:]: 1062 if open_files is not None: 1063 for open_file in open_files: 1064 if ( 1065 open_file is not self 1066 and isinstance(open_file, FakeFileWrapper) 1067 and self.file_object == open_file.file_object 1068 and cast(FakeFileWrapper, open_file)._append 1069 ): 1070 open_file._read_seek += size 1071 1072 def _truncate_wrapper(self) -> Callable: 1073 """Wrap truncate() to allow flush after truncate. 1074 1075 Returns: 1076 Wrapper which is described below. 1077 """ 1078 io_attr = self._io.truncate 1079 1080 def truncate_wrapper(*args, **kwargs): 1081 """Wrap truncate call to call flush after truncate.""" 1082 if self._append: 1083 self._io.seek(self._read_seek, self._read_whence) 1084 size = io_attr(*args, **kwargs) 1085 self.flush() 1086 if not self.is_stream: 1087 self.file_object.size = size 1088 buffer_size = len(self._io.getvalue()) 1089 if buffer_size < size: 1090 self._io.seek(buffer_size) 1091 self._io.putvalue(b"\0" * (size - buffer_size)) 1092 self.file_object.set_contents(self._io.getvalue(), self._encoding) 1093 self._flush_pos = size 1094 self._adapt_size_for_related_files(size - buffer_size) 1095 1096 self.flush() 1097 return size 1098 1099 return truncate_wrapper 1100 1101 def size(self) -> int: 1102 """Return the content size in bytes of the wrapped file.""" 1103 return self.file_object.st_size 1104 1105 def __getattr__(self, name: str) -> Any: 1106 if self.file_object.is_large_file(): 1107 raise FakeLargeFileIoException(self.file_path) 1108 1109 reading = name.startswith("read") or name == "next" 1110 truncate = name == "truncate" 1111 writing = name.startswith("write") or truncate 1112 1113 if reading or writing: 1114 self._check_open_file() 1115 if not self._read and reading: 1116 return self._read_error() 1117 if not self.allow_update and writing: 1118 return self._write_error() 1119 1120 if reading: 1121 self._sync_io() 1122 if not self.is_stream: 1123 self.flush() 1124 if not self._filesystem.is_windows_fs: 1125 self.file_object.st_atime = helpers.now() 1126 if truncate: 1127 return self._truncate_wrapper() 1128 if self._append: 1129 if reading: 1130 return self._read_wrappers(name) 1131 elif not writing: 1132 return self._other_wrapper(name) 1133 if writing: 1134 return self._write_wrapper(name) 1135 1136 return getattr(self._io, name) 1137 1138 def _read_error(self) -> Callable: 1139 def read_error(*args, **kwargs): 1140 """Throw an error unless the argument is zero.""" 1141 if args and args[0] == 0: 1142 if self._filesystem.is_windows_fs and self.raw_io: 1143 return b"" if self._binary else "" 1144 self._raise("File is not open for reading.") 1145 1146 return read_error 1147 1148 def _write_error(self) -> Callable: 1149 def write_error(*args, **kwargs): 1150 """Throw an error.""" 1151 if self.raw_io: 1152 if self._filesystem.is_windows_fs and args and len(args[0]) == 0: 1153 return 0 1154 self._raise("File is not open for writing.") 1155 1156 return write_error 1157 1158 def _is_open(self) -> bool: 1159 if self.filedes is not None and self.filedes < len(self._filesystem.open_files): 1160 open_files = self._filesystem.open_files[self.filedes] 1161 if open_files is not None and self in open_files: 1162 return True 1163 return False 1164 1165 def _check_open_file(self) -> None: 1166 if not self.is_stream and not self._is_open(): 1167 raise ValueError("I/O operation on closed file") 1168 1169 def __iter__(self) -> Union[Iterator[str], Iterator[bytes]]: 1170 if not self._read: 1171 self._raise("File is not open for reading") 1172 return self._io.__iter__() 1173 1174 def __next__(self): 1175 if not self._read: 1176 self._raise("File is not open for reading") 1177 return next(self._io) 1178 1179 1180class StandardStreamWrapper: 1181 """Wrapper for a system standard stream to be used in open files list.""" 1182 1183 def __init__(self, stream_object: TextIO): 1184 self._stream_object = stream_object 1185 self.filedes: Optional[int] = None 1186 1187 def get_object(self) -> TextIO: 1188 return self._stream_object 1189 1190 def fileno(self) -> int: 1191 """Return the file descriptor of the wrapped standard stream.""" 1192 if self.filedes is not None: 1193 return self.filedes 1194 raise OSError(errno.EBADF, "Invalid file descriptor") 1195 1196 def read(self, n: int = -1) -> bytes: 1197 return cast(bytes, self._stream_object.read()) 1198 1199 def close(self) -> None: 1200 """We do not support closing standard streams.""" 1201 1202 def is_stream(self) -> bool: 1203 return True 1204 1205 1206class FakeDirWrapper: 1207 """Wrapper for a FakeDirectory object to be used in open files list.""" 1208 1209 def __init__( 1210 self, 1211 file_object: FakeDirectory, 1212 file_path: AnyString, 1213 filesystem: "FakeFilesystem", 1214 ): 1215 self.file_object = file_object 1216 self.file_path = file_path 1217 self._filesystem = filesystem 1218 self.filedes: Optional[int] = None 1219 1220 def get_object(self) -> FakeDirectory: 1221 """Return the FakeFile object that is wrapped by the current 1222 instance.""" 1223 return self.file_object 1224 1225 def fileno(self) -> int: 1226 """Return the file descriptor of the file object.""" 1227 if self.filedes is not None: 1228 return self.filedes 1229 raise OSError(errno.EBADF, "Invalid file descriptor") 1230 1231 def close(self) -> None: 1232 """Close the directory.""" 1233 assert self.filedes is not None 1234 self._filesystem._close_open_file(self.filedes) 1235 1236 1237class FakePipeWrapper: 1238 """Wrapper for a read or write descriptor of a real pipe object to be 1239 used in open files list. 1240 """ 1241 1242 def __init__( 1243 self, 1244 filesystem: "FakeFilesystem", 1245 fd: int, 1246 can_write: bool, 1247 mode: str = "", 1248 ): 1249 self._filesystem = filesystem 1250 self.fd = fd # the real file descriptor 1251 self.can_write = can_write 1252 self.file_object = None 1253 self.filedes: Optional[int] = None 1254 self.real_file = None 1255 if mode: 1256 self.real_file = open(fd, mode) 1257 1258 def __enter__(self) -> "FakePipeWrapper": 1259 """To support usage of this fake pipe with the 'with' statement.""" 1260 return self 1261 1262 def __exit__( 1263 self, 1264 exc_type: Optional[Type[BaseException]], 1265 exc_val: Optional[BaseException], 1266 exc_tb: Optional[TracebackType], 1267 ) -> None: 1268 """To support usage of this fake pipe with the 'with' statement.""" 1269 self.close() 1270 1271 def get_object(self) -> None: 1272 return self.file_object 1273 1274 def fileno(self) -> int: 1275 """Return the fake file descriptor of the pipe object.""" 1276 if self.filedes is not None: 1277 return self.filedes 1278 raise OSError(errno.EBADF, "Invalid file descriptor") 1279 1280 def read(self, numBytes: int = -1) -> bytes: 1281 """Read from the real pipe.""" 1282 if self.real_file: 1283 return self.real_file.read(numBytes) # pytype: disable=bad-return-type 1284 return os.read(self.fd, numBytes) 1285 1286 def flush(self) -> None: 1287 """Flush the real pipe?""" 1288 1289 def write(self, contents: bytes) -> int: 1290 """Write to the real pipe.""" 1291 if self.real_file: 1292 return self.real_file.write(contents) 1293 return os.write(self.fd, contents) 1294 1295 def close(self) -> None: 1296 """Close the pipe descriptor.""" 1297 assert self.filedes is not None 1298 open_files = self._filesystem.open_files[self.filedes] 1299 assert open_files is not None 1300 open_files.remove(self) 1301 if self.real_file: 1302 self.real_file.close() 1303 else: 1304 os.close(self.fd) 1305 1306 def readable(self) -> bool: 1307 """The pipe end can either be readable or writable.""" 1308 return not self.can_write 1309 1310 def writable(self) -> bool: 1311 """The pipe end can either be readable or writable.""" 1312 return self.can_write 1313 1314 def seekable(self) -> bool: 1315 """A pipe is not seekable.""" 1316 return False 1317