1# Copyright 2009 Google Inc. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15""" Uses :py:class:`FakeOsModule` to provide a 16 fake :py:mod:`os` module replacement. 17""" 18import errno 19import functools 20import inspect 21import os 22import sys 23import uuid 24from contextlib import contextmanager 25from stat import ( 26 S_IFREG, 27 S_IFSOCK, 28) 29from typing import ( 30 List, 31 Optional, 32 Callable, 33 Union, 34 Any, 35 Tuple, 36 cast, 37 AnyStr, 38 TYPE_CHECKING, 39 Set, 40) 41 42from pyfakefs.extra_packages import use_scandir 43from pyfakefs.fake_file import ( 44 FakeDirectory, 45 FakeDirWrapper, 46 StandardStreamWrapper, 47 FakeFileWrapper, 48 FakePipeWrapper, 49 FakeFile, 50 AnyFileWrapper, 51) 52from pyfakefs.fake_open import FakeFileOpen, _OpenModes 53from pyfakefs.fake_path import FakePathModule 54from pyfakefs.fake_scandir import scandir, walk, ScanDirIter 55from pyfakefs.helpers import ( 56 FakeStatResult, 57 is_int_type, 58 is_byte_string, 59 make_string_path, 60 IS_PYPY, 61 to_string, 62 matching_string, 63 AnyString, 64 to_bytes, 65 PERM_EXE, 66 PERM_DEF, 67 is_root, 68 get_uid, 69 get_gid, 70) 71 72if TYPE_CHECKING: 73 from pyfakefs.fake_filesystem import FakeFilesystem 74 75NR_STD_STREAMS = 3 76 77 78class FakeOsModule: 79 """Uses FakeFilesystem to provide a fake os module replacement. 80 81 Do not create os.path separately from os, as there is a necessary circular 82 dependency between os and os.path to replicate the behavior of the standard 83 Python modules. What you want to do is to just let FakeOsModule take care 84 of `os.path` setup itself. 85 86 # You always want to do this. 87 filesystem = fake_filesystem.FakeFilesystem() 88 my_os_module = fake_os.FakeOsModule(filesystem) 89 """ 90 91 use_original = False 92 93 @staticmethod 94 def dir() -> List[str]: 95 """Return the list of patched function names. Used for patching 96 functions imported from the module. 97 """ 98 _dir = [ 99 "access", 100 "chdir", 101 "chmod", 102 "chown", 103 "close", 104 "fstat", 105 "fsync", 106 "getcwd", 107 "lchmod", 108 "link", 109 "listdir", 110 "lstat", 111 "makedirs", 112 "mkdir", 113 "mknod", 114 "open", 115 "read", 116 "readlink", 117 "remove", 118 "removedirs", 119 "rename", 120 "rmdir", 121 "stat", 122 "symlink", 123 "umask", 124 "unlink", 125 "utime", 126 "walk", 127 "write", 128 "getcwdb", 129 "replace", 130 ] 131 if sys.platform.startswith("linux"): 132 _dir += [ 133 "fdatasync", 134 "getxattr", 135 "listxattr", 136 "removexattr", 137 "setxattr", 138 ] 139 if sys.platform != "win32": 140 _dir += [ 141 "getgid", 142 "getuid", 143 ] 144 if use_scandir: 145 _dir += ["scandir"] 146 return _dir 147 148 def __init__(self, filesystem: "FakeFilesystem"): 149 """Also exposes self.path (to fake os.path). 150 151 Args: 152 filesystem: FakeFilesystem used to provide file system information 153 """ 154 self.filesystem = filesystem 155 self.os_module: Any = os 156 self.path = FakePathModule(self.filesystem, self) 157 self._supports_follow_symlinks: Optional[Set] = None 158 self._supports_dir_fd: Optional[Set] = None 159 self._supports_effective_ids: Optional[Set] = None 160 self._supports_fd: Optional[Set] = None 161 162 @property 163 def devnull(self) -> str: 164 return self.path.devnull 165 166 @property 167 def sep(self) -> str: 168 return self.path.sep 169 170 @property 171 def altsep(self) -> Optional[str]: 172 return self.path.altsep 173 174 @property 175 def linesep(self) -> str: 176 return self.path.linesep 177 178 @property 179 def pathsep(self) -> str: 180 return self.path.pathsep 181 182 def fdopen(self, fd: int, *args: Any, **kwargs: Any) -> AnyFileWrapper: 183 """Redirector to open() builtin function. 184 185 Args: 186 fd: The file descriptor of the file to open. 187 *args: Pass through args. 188 **kwargs: Pass through kwargs. 189 190 Returns: 191 File object corresponding to file_des. 192 193 Raises: 194 TypeError: if file descriptor is not an integer. 195 """ 196 if not is_int_type(fd): 197 raise TypeError("an integer is required") 198 return FakeFileOpen(self.filesystem)(fd, *args, **kwargs) 199 200 def _umask(self) -> int: 201 """Return the current umask.""" 202 if self.filesystem.is_windows_fs: 203 # windows always returns 0 - it has no real notion of umask 204 return 0 205 if sys.platform == "win32": 206 # if we are testing Unix under Windows we assume a default mask 207 return 0o002 208 else: 209 # under Unix, we return the real umask; 210 # as there is no pure getter for umask, so we have to first 211 # set a mode to get the previous one and then re-set that 212 mask = os.umask(0) 213 os.umask(mask) 214 return mask 215 216 def open( 217 self, 218 path: AnyStr, 219 flags: int, 220 mode: Optional[int] = None, 221 *, 222 dir_fd: Optional[int] = None 223 ) -> int: 224 """Return the file descriptor for a FakeFile. 225 226 Args: 227 path: the path to the file 228 flags: low-level bits to indicate io operation 229 mode: bits to define default permissions 230 Note: only basic modes are supported, OS-specific modes are 231 ignored 232 dir_fd: If not `None`, the file descriptor of a directory, 233 with `file_path` being relative to this directory. 234 235 Returns: 236 A file descriptor. 237 238 Raises: 239 OSError: if the path cannot be found 240 ValueError: if invalid mode is given 241 NotImplementedError: if `os.O_EXCL` is used without `os.O_CREAT` 242 """ 243 path = self._path_with_dir_fd(path, self.open, dir_fd) 244 if mode is None: 245 if self.filesystem.is_windows_fs: 246 mode = 0o666 247 else: 248 mode = 0o777 & ~self._umask() 249 250 has_tmpfile_flag = ( 251 hasattr(os, "O_TMPFILE") and flags & os.O_TMPFILE == os.O_TMPFILE 252 ) 253 open_modes = _OpenModes( 254 must_exist=not flags & os.O_CREAT and not has_tmpfile_flag, 255 can_read=not flags & os.O_WRONLY, 256 can_write=flags & (os.O_RDWR | os.O_WRONLY) != 0, 257 truncate=flags & os.O_TRUNC != 0, 258 append=flags & os.O_APPEND != 0, 259 must_not_exist=flags & os.O_EXCL != 0, 260 ) 261 if open_modes.must_not_exist and open_modes.must_exist: 262 raise NotImplementedError("O_EXCL without O_CREAT mode is not supported") 263 if has_tmpfile_flag: 264 # this is a workaround for tempfiles that do not have a filename 265 # as we do not support this directly, we just add a unique filename 266 # and set the file to delete on close 267 path = self.filesystem.joinpaths( 268 path, matching_string(path, str(uuid.uuid4())) 269 ) 270 271 if not self.filesystem.is_windows_fs and self.filesystem.exists(path): 272 # handle opening directory - only allowed under Posix 273 # with read-only mode 274 obj = self.filesystem.resolve(path) 275 if isinstance(obj, FakeDirectory): 276 if ( 277 not open_modes.must_exist and not self.filesystem.is_macos 278 ) or open_modes.can_write: 279 self.filesystem.raise_os_error(errno.EISDIR, path) 280 dir_wrapper = FakeDirWrapper(obj, path, self.filesystem) 281 file_des = self.filesystem._add_open_file(dir_wrapper) 282 dir_wrapper.filedes = file_des 283 return file_des 284 285 # low level open is always binary 286 str_flags = "b" 287 delete_on_close = has_tmpfile_flag 288 if hasattr(os, "O_TEMPORARY"): 289 delete_on_close = flags & os.O_TEMPORARY == os.O_TEMPORARY 290 fake_file = FakeFileOpen( 291 self.filesystem, delete_on_close=delete_on_close, raw_io=True 292 )(path, str_flags, open_modes=open_modes) 293 assert not isinstance(fake_file, StandardStreamWrapper) 294 if fake_file.file_object != self.filesystem.dev_null: 295 self.chmod(path, mode) 296 return fake_file.fileno() 297 298 def close(self, fd: int) -> None: 299 """Close a file descriptor. 300 301 Args: 302 fd: An integer file descriptor for the file object requested. 303 304 Raises: 305 OSError: bad file descriptor. 306 TypeError: if file descriptor is not an integer. 307 """ 308 file_handle = self.filesystem.get_open_file(fd) 309 file_handle.close() 310 311 def read(self, fd: int, n: int) -> bytes: 312 """Read number of bytes from a file descriptor, returns bytes read. 313 314 Args: 315 fd: An integer file descriptor for the file object requested. 316 n: Number of bytes to read from file. 317 318 Returns: 319 Bytes read from file. 320 321 Raises: 322 OSError: bad file descriptor. 323 TypeError: if file descriptor is not an integer. 324 """ 325 file_handle = self.filesystem.get_open_file(fd) 326 if isinstance(file_handle, FakeFileWrapper): 327 file_handle.raw_io = True 328 if isinstance(file_handle, FakeDirWrapper): 329 self.filesystem.raise_os_error(errno.EBADF, file_handle.file_path) 330 return file_handle.read(n) 331 332 def write(self, fd: int, contents: bytes) -> int: 333 """Write string to file descriptor, returns number of bytes written. 334 335 Args: 336 fd: An integer file descriptor for the file object requested. 337 contents: String of bytes to write to file. 338 339 Returns: 340 Number of bytes written. 341 342 Raises: 343 OSError: bad file descriptor. 344 TypeError: if file descriptor is not an integer. 345 """ 346 file_handle = cast(FakeFileWrapper, self.filesystem.get_open_file(fd)) 347 if isinstance(file_handle, FakeDirWrapper): 348 self.filesystem.raise_os_error(errno.EBADF, file_handle.file_path) 349 350 if isinstance(file_handle, FakePipeWrapper): 351 return file_handle.write(contents) 352 353 file_handle.raw_io = True 354 file_handle._sync_io() 355 file_handle.update_flush_pos() 356 file_handle.write(contents) 357 file_handle.flush() 358 return len(contents) 359 360 def pipe(self) -> Tuple[int, int]: 361 read_fd, write_fd = os.pipe() 362 read_wrapper = FakePipeWrapper(self.filesystem, read_fd, False) 363 file_des = self.filesystem._add_open_file(read_wrapper) 364 read_wrapper.filedes = file_des 365 write_wrapper = FakePipeWrapper(self.filesystem, write_fd, True) 366 file_des = self.filesystem._add_open_file(write_wrapper) 367 write_wrapper.filedes = file_des 368 return read_wrapper.filedes, write_wrapper.filedes 369 370 def fstat(self, fd: int) -> FakeStatResult: 371 """Return the os.stat-like tuple for the FakeFile object of file_des. 372 373 Args: 374 fd: The file descriptor of filesystem object to retrieve. 375 376 Returns: 377 The FakeStatResult object corresponding to entry_path. 378 379 Raises: 380 OSError: if the filesystem object doesn't exist. 381 """ 382 # stat should return the tuple representing return value of os.stat 383 file_object = self.filesystem.get_open_file(fd).get_object() 384 assert isinstance(file_object, FakeFile) 385 return file_object.stat_result.copy() 386 387 def umask(self, mask: int) -> int: 388 """Change the current umask. 389 390 Args: 391 mask: (int) The new umask value. 392 393 Returns: 394 The old umask. 395 396 Raises: 397 TypeError: if new_mask is of an invalid type. 398 """ 399 if not is_int_type(mask): 400 raise TypeError("an integer is required") 401 old_umask = self.filesystem.umask 402 self.filesystem.umask = mask 403 return old_umask 404 405 def chdir(self, path: AnyStr) -> None: 406 """Change current working directory to target directory. 407 408 Args: 409 path: The path to new current working directory. 410 411 Raises: 412 OSError: if user lacks permission to enter the argument directory 413 or if the target is not a directory. 414 """ 415 try: 416 path = self.filesystem.resolve_path(path, allow_fd=True) 417 except OSError as exc: 418 if self.filesystem.is_macos and exc.errno == errno.EBADF: 419 raise OSError(errno.ENOTDIR, "Not a directory: " + str(path)) 420 raise 421 self.filesystem.confirmdir(path) 422 directory = self.filesystem.resolve(path) 423 # A full implementation would check permissions all the way 424 # up the tree. 425 if not is_root() and not directory.st_mode | PERM_EXE: 426 self.filesystem.raise_os_error(errno.EACCES, directory.name) 427 self.filesystem.cwd = path # type: ignore[assignment] 428 429 def getcwd(self) -> str: 430 """Return current working directory.""" 431 return to_string(self.filesystem.cwd) 432 433 def getcwdb(self) -> bytes: 434 """Return current working directory as bytes.""" 435 return to_bytes(self.filesystem.cwd) 436 437 def listdir(self, path: AnyStr) -> List[AnyStr]: 438 """Return a list of file names in target_directory. 439 440 Args: 441 path: Path to the target directory within the fake 442 filesystem. 443 444 Returns: 445 A list of file names within the target directory in arbitrary 446 order. 447 448 Raises: 449 OSError: if the target is not a directory. 450 """ 451 return self.filesystem.listdir(path) 452 453 XATTR_CREATE = 1 454 XATTR_REPLACE = 2 455 456 def getxattr( 457 self, path: AnyStr, attribute: AnyString, *, follow_symlinks: bool = True 458 ) -> Optional[bytes]: 459 """Return the value of the given extended filesystem attribute for 460 `path`. 461 462 Args: 463 path: File path, file descriptor or path-like object (for 464 Python >= 3.6). 465 attribute: (str or bytes) The attribute name. 466 follow_symlinks: (bool) If True (the default), symlinks in the 467 path are traversed. 468 469 Returns: 470 The contents of the extended attribute as bytes or None if 471 the attribute does not exist. 472 473 Raises: 474 OSError: if the path does not exist. 475 """ 476 if not self.filesystem.is_linux: 477 raise AttributeError("module 'os' has no attribute 'getxattr'") 478 479 if isinstance(attribute, bytes): 480 attribute = attribute.decode(sys.getfilesystemencoding()) 481 file_obj = self.filesystem.resolve(path, follow_symlinks, allow_fd=True) 482 return file_obj.xattr.get(attribute) 483 484 def listxattr( 485 self, path: Optional[AnyStr] = None, *, follow_symlinks: bool = True 486 ) -> List[str]: 487 """Return a list of the extended filesystem attributes on `path`. 488 489 Args: 490 path: File path, file descriptor or path-like object (for 491 Python >= 3.6). If None, the current directory is used. 492 follow_symlinks: (bool) If True (the default), symlinks in the 493 path are traversed. 494 495 Returns: 496 A list of all attribute names for the given path as str. 497 498 Raises: 499 OSError: if the path does not exist. 500 """ 501 if not self.filesystem.is_linux: 502 raise AttributeError("module 'os' has no attribute 'listxattr'") 503 504 path_str = self.filesystem.cwd if path is None else path 505 file_obj = self.filesystem.resolve( 506 cast(AnyStr, path_str), # pytype: disable=invalid-annotation 507 follow_symlinks, 508 allow_fd=True, 509 ) 510 return list(file_obj.xattr.keys()) 511 512 def removexattr( 513 self, path: AnyStr, attribute: AnyString, *, follow_symlinks: bool = True 514 ) -> None: 515 """Removes the extended filesystem attribute attribute from `path`. 516 517 Args: 518 path: File path, file descriptor or path-like object (for 519 Python >= 3.6). 520 attribute: (str or bytes) The attribute name. 521 follow_symlinks: (bool) If True (the default), symlinks in the 522 path are traversed. 523 524 Raises: 525 OSError: if the path does not exist. 526 """ 527 if not self.filesystem.is_linux: 528 raise AttributeError("module 'os' has no attribute 'removexattr'") 529 530 if isinstance(attribute, bytes): 531 attribute = attribute.decode(sys.getfilesystemencoding()) 532 file_obj = self.filesystem.resolve(path, follow_symlinks, allow_fd=True) 533 if attribute in file_obj.xattr: 534 del file_obj.xattr[attribute] 535 536 def setxattr( 537 self, 538 path: AnyStr, 539 attribute: AnyString, 540 value: bytes, 541 flags: int = 0, 542 *, 543 follow_symlinks: bool = True 544 ) -> None: 545 """Sets the value of the given extended filesystem attribute for 546 `path`. 547 548 Args: 549 path: File path, file descriptor or path-like object (for 550 Python >= 3.6). 551 attribute: The attribute name (str or bytes). 552 value: (byte-like) The value to be set. 553 follow_symlinks: (bool) If True (the default), symlinks in the 554 path are traversed. 555 556 Raises: 557 OSError: if the path does not exist. 558 TypeError: if `value` is not a byte-like object. 559 """ 560 if not self.filesystem.is_linux: 561 raise AttributeError("module 'os' has no attribute 'setxattr'") 562 563 if isinstance(attribute, bytes): 564 attribute = attribute.decode(sys.getfilesystemencoding()) 565 if not is_byte_string(value): 566 raise TypeError("a bytes-like object is required") 567 file_obj = self.filesystem.resolve(path, follow_symlinks, allow_fd=True) 568 exists = attribute in file_obj.xattr 569 if exists and flags == self.XATTR_CREATE: 570 self.filesystem.raise_os_error(errno.ENODATA, file_obj.path) 571 if not exists and flags == self.XATTR_REPLACE: 572 self.filesystem.raise_os_error(errno.EEXIST, file_obj.path) 573 file_obj.xattr[attribute] = value 574 575 def scandir(self, path: str = ".") -> ScanDirIter: 576 """Return an iterator of DirEntry objects corresponding to the 577 entries in the directory given by path. 578 579 Args: 580 path: Path to the target directory within the fake filesystem. 581 582 Returns: 583 An iterator to an unsorted list of os.DirEntry objects for 584 each entry in path. 585 586 Raises: 587 OSError: if the target is not a directory. 588 """ 589 return scandir(self.filesystem, path) 590 591 def walk( 592 self, 593 top: AnyStr, 594 topdown: bool = True, 595 onerror: Optional[bool] = None, 596 followlinks: bool = False, 597 ): 598 """Perform an os.walk operation over the fake filesystem. 599 600 Args: 601 top: The root directory from which to begin walk. 602 topdown: Determines whether to return the tuples with the root as 603 the first entry (`True`) or as the last, after all the child 604 directory tuples (`False`). 605 onerror: If not `None`, function which will be called to handle the 606 `os.error` instance provided when `os.listdir()` fails. 607 followlinks: If `True`, symbolic links are followed. 608 609 Yields: 610 (path, directories, nondirectories) for top and each of its 611 subdirectories. See the documentation for the builtin os module 612 for further details. 613 """ 614 return walk(self.filesystem, top, topdown, onerror, followlinks) 615 616 def readlink(self, path: AnyStr, dir_fd: Optional[int] = None) -> str: 617 """Read the target of a symlink. 618 619 Args: 620 path: Symlink to read the target of. 621 dir_fd: If not `None`, the file descriptor of a directory, 622 with `path` being relative to this directory. 623 624 Returns: 625 the string representing the path to which the symbolic link points. 626 627 Raises: 628 TypeError: if `path` is None 629 OSError: (with errno=ENOENT) if path is not a valid path, or 630 (with errno=EINVAL) if path is valid, but is not a symlink 631 """ 632 path = self._path_with_dir_fd(path, self.readlink, dir_fd) 633 return self.filesystem.readlink(path) 634 635 def stat( 636 self, 637 path: AnyStr, 638 *, 639 dir_fd: Optional[int] = None, 640 follow_symlinks: bool = True 641 ) -> FakeStatResult: 642 """Return the os.stat-like tuple for the FakeFile object of entry_path. 643 644 Args: 645 path: path to filesystem object to retrieve. 646 dir_fd: (int) If not `None`, the file descriptor of a directory, 647 with `entry_path` being relative to this directory. 648 follow_symlinks: (bool) If `False` and `entry_path` points to a 649 symlink, the link itself is changed instead of the linked 650 object. 651 652 Returns: 653 The FakeStatResult object corresponding to entry_path. 654 655 Raises: 656 OSError: if the filesystem object doesn't exist. 657 """ 658 path = self._path_with_dir_fd(path, self.stat, dir_fd) 659 return self.filesystem.stat(path, follow_symlinks) 660 661 def lstat(self, path: AnyStr, *, dir_fd: Optional[int] = None) -> FakeStatResult: 662 """Return the os.stat-like tuple for entry_path, 663 not following symlinks. 664 665 Args: 666 path: path to filesystem object to retrieve. 667 dir_fd: If not `None`, the file descriptor of a directory, with 668 `path` being relative to this directory. 669 670 Returns: 671 the FakeStatResult object corresponding to `path`. 672 673 Raises: 674 OSError: if the filesystem object doesn't exist. 675 """ 676 # stat should return the tuple representing return value of os.stat 677 path = self._path_with_dir_fd(path, self.lstat, dir_fd) 678 return self.filesystem.stat(path, follow_symlinks=False) 679 680 def remove(self, path: AnyStr, dir_fd: Optional[int] = None) -> None: 681 """Remove the FakeFile object at the specified file path. 682 683 Args: 684 path: Path to file to be removed. 685 dir_fd: If not `None`, the file descriptor of a directory, 686 with `path` being relative to this directory. 687 688 Raises: 689 OSError: if path points to a directory. 690 OSError: if path does not exist. 691 OSError: if removal failed. 692 """ 693 path = self._path_with_dir_fd(path, self.remove, dir_fd) 694 self.filesystem.remove(path) 695 696 def unlink(self, path: AnyStr, *, dir_fd: Optional[int] = None) -> None: 697 """Remove the FakeFile object at the specified file path. 698 699 Args: 700 path: Path to file to be removed. 701 dir_fd: If not `None`, the file descriptor of a directory, 702 with `path` being relative to this directory. 703 704 Raises: 705 OSError: if path points to a directory. 706 OSError: if path does not exist. 707 OSError: if removal failed. 708 """ 709 path = self._path_with_dir_fd(path, self.unlink, dir_fd) 710 self.filesystem.remove(path) 711 712 def rename( 713 self, 714 src: AnyStr, 715 dst: AnyStr, 716 *, 717 src_dir_fd: Optional[int] = None, 718 dst_dir_fd: Optional[int] = None 719 ) -> None: 720 """Rename a FakeFile object at old_file_path to new_file_path, 721 preserving all properties. 722 Also replaces existing new_file_path object, if one existed 723 (Unix only). 724 725 Args: 726 src: Path to filesystem object to rename. 727 dst: Path to where the filesystem object will live 728 after this call. 729 src_dir_fd: If not `None`, the file descriptor of a directory, 730 with `src` being relative to this directory. 731 dst_dir_fd: If not `None`, the file descriptor of a directory, 732 with `dst` being relative to this directory. 733 734 Raises: 735 OSError: if old_file_path does not exist. 736 OSError: if new_file_path is an existing directory. 737 OSError: if new_file_path is an existing file (Windows only) 738 OSError: if new_file_path is an existing file and could not 739 be removed (Unix) 740 OSError: if `dirname(new_file)` does not exist 741 OSError: if the file would be moved to another filesystem 742 (e.g. mount point) 743 """ 744 src = self._path_with_dir_fd(src, self.rename, src_dir_fd) 745 dst = self._path_with_dir_fd(dst, self.rename, dst_dir_fd) 746 self.filesystem.rename(src, dst) 747 748 def renames(self, old: AnyStr, new: AnyStr): 749 """Fakes `os.renames`, documentation taken from there. 750 751 Super-rename; create directories as necessary and delete any left 752 empty. Works like rename, except creation of any intermediate 753 directories needed to make the new pathname good is attempted 754 first. After the rename, directories corresponding to rightmost 755 path segments of the old name will be pruned until either the 756 whole path is consumed or a nonempty directory is found. 757 758 Note: this function can fail with the new directory structure made 759 if you lack permissions needed to unlink the leaf directory or 760 file. 761 762 """ 763 head, tail = self.filesystem.splitpath(new) 764 if head and tail and not self.filesystem.exists(head): 765 self.makedirs(head) 766 self.rename(old, new) 767 head, tail = self.filesystem.splitpath(old) 768 if head and tail: 769 try: 770 self.removedirs(head) 771 except OSError: 772 pass 773 774 def replace( 775 self, 776 src: AnyStr, 777 dst: AnyStr, 778 *, 779 src_dir_fd: Optional[int] = None, 780 dst_dir_fd: Optional[int] = None 781 ) -> None: 782 """Renames a FakeFile object at old_file_path to new_file_path, 783 preserving all properties. 784 Also replaces existing new_file_path object, if one existed. 785 786 Arg 787 src: Path to filesystem object to rename. 788 dst: Path to where the filesystem object will live 789 after this call. 790 src_dir_fd: If not `None`, the file descriptor of a directory, 791 with `src` being relative to this directory. 792 dst_dir_fd: If not `None`, the file descriptor of a directory, 793 with `dst` being relative to this directory. 794 795 Raises: 796 OSError: if old_file_path does not exist. 797 OSError: if new_file_path is an existing directory. 798 OSError: if new_file_path is an existing file and could 799 not be removed 800 OSError: if `dirname(new_file)` does not exist 801 OSError: if the file would be moved to another filesystem 802 (e.g. mount point) 803 """ 804 src = self._path_with_dir_fd(src, self.rename, src_dir_fd) 805 dst = self._path_with_dir_fd(dst, self.rename, dst_dir_fd) 806 self.filesystem.rename(src, dst, force_replace=True) 807 808 def rmdir(self, path: AnyStr, *, dir_fd: Optional[int] = None) -> None: 809 """Remove a leaf Fake directory. 810 811 Args: 812 path: (str) Name of directory to remove. 813 dir_fd: If not `None`, the file descriptor of a directory, 814 with `path` being relative to this directory. 815 816 Raises: 817 OSError: if `path` does not exist or is not a directory, 818 or as per FakeFilesystem.remove_object. Cannot remove '.'. 819 """ 820 path = self._path_with_dir_fd(path, self.rmdir, dir_fd) 821 self.filesystem.rmdir(path) 822 823 def removedirs(self, name: AnyStr) -> None: 824 """Remove a leaf fake directory and all empty intermediate ones. 825 826 Args: 827 name: the directory to be removed. 828 829 Raises: 830 OSError: if target_directory does not exist or is not a directory. 831 OSError: if target_directory is not empty. 832 """ 833 name = self.filesystem.absnormpath(name) 834 directory = self.filesystem.confirmdir(name) 835 if directory.entries: 836 self.filesystem.raise_os_error(errno.ENOTEMPTY, self.path.basename(name)) 837 else: 838 self.rmdir(name) 839 head, tail = self.path.split(name) 840 if not tail: 841 head, tail = self.path.split(head) 842 while head and tail: 843 head_dir = self.filesystem.confirmdir(head) 844 if head_dir.entries: 845 break 846 # only the top-level dir may not be a symlink 847 self.filesystem.rmdir(head, allow_symlink=True) 848 head, tail = self.path.split(head) 849 850 def mkdir( 851 self, path: AnyStr, mode: int = PERM_DEF, *, dir_fd: Optional[int] = None 852 ) -> None: 853 """Create a leaf Fake directory. 854 855 Args: 856 path: (str) Name of directory to create. 857 Relative paths are assumed to be relative to '/'. 858 mode: (int) Mode to create directory with. This argument defaults 859 to 0o777. The umask is applied to this mode. 860 dir_fd: If not `None`, the file descriptor of a directory, 861 with `path` being relative to this directory. 862 863 Raises: 864 OSError: if the directory name is invalid or parent directory is 865 read only or as per FakeFilesystem.add_object. 866 """ 867 path = self._path_with_dir_fd(path, self.mkdir, dir_fd) 868 try: 869 self.filesystem.makedir(path, mode) 870 except OSError as e: 871 if e.errno == errno.EACCES: 872 self.filesystem.raise_os_error(e.errno, path) 873 raise 874 875 def makedirs( 876 self, name: AnyStr, mode: int = PERM_DEF, exist_ok: Optional[bool] = None 877 ) -> None: 878 """Create a leaf Fake directory + create any non-existent parent dirs. 879 880 Args: 881 name: (str) Name of directory to create. 882 mode: (int) Mode to create directory (and any necessary parent 883 directories) with. This argument defaults to 0o777. 884 The umask is applied to this mode. 885 exist_ok: (boolean) If exist_ok is False (the default), an OSError 886 is raised if the target directory already exists. 887 888 Raises: 889 OSError: if the directory already exists and exist_ok=False, or as 890 per :py:meth:`FakeFilesystem.create_dir`. 891 """ 892 if exist_ok is None: 893 exist_ok = False 894 self.filesystem.makedirs(name, mode, exist_ok) 895 896 def _path_with_dir_fd( 897 self, path: AnyStr, fct: Callable, dir_fd: Optional[int] 898 ) -> AnyStr: 899 """Return the path considering dir_fd. Raise on invalid parameters.""" 900 try: 901 path = make_string_path(path) 902 except TypeError: 903 # the error is handled later 904 path = path 905 if dir_fd is not None: 906 # check if fd is supported for the built-in real function 907 if fct not in self.supports_dir_fd: 908 raise NotImplementedError("dir_fd unavailable on this platform") 909 if isinstance(path, int): 910 raise ValueError( 911 "%s: Can't specify dir_fd without " 912 "matching path_str" % fct.__name__ 913 ) 914 if not self.path.isabs(path): 915 open_file = self.filesystem.get_open_file(dir_fd) 916 return self.path.join( # type: ignore[type-var, return-value] 917 cast(FakeFile, open_file.get_object()).path, path 918 ) 919 return path 920 921 def truncate(self, path: AnyStr, length: int) -> None: 922 """Truncate the file corresponding to path, so that it is 923 length bytes in size. If length is larger than the current size, 924 the file is filled up with zero bytes. 925 926 Args: 927 path: (str or int) Path to the file, or an integer file 928 descriptor for the file object. 929 length: (int) Length of the file after truncating it. 930 931 Raises: 932 OSError: if the file does not exist or the file descriptor is 933 invalid. 934 """ 935 file_object = self.filesystem.resolve(path, allow_fd=True) 936 file_object.size = length 937 938 def ftruncate(self, fd: int, length: int) -> None: 939 """Truncate the file corresponding to fd, so that it is 940 length bytes in size. If length is larger than the current size, 941 the file is filled up with zero bytes. 942 943 Args: 944 fd: (int) File descriptor for the file object. 945 length: (int) Maximum length of the file after truncating it. 946 947 Raises: 948 OSError: if the file descriptor is invalid 949 """ 950 file_object = self.filesystem.get_open_file(fd).get_object() 951 if isinstance(file_object, FakeFileWrapper): 952 file_object.size = length 953 else: 954 raise OSError(errno.EBADF, "Invalid file descriptor") 955 956 def access( 957 self, 958 path: AnyStr, 959 mode: int, 960 *, 961 dir_fd: Optional[int] = None, 962 effective_ids: bool = False, 963 follow_symlinks: bool = True 964 ) -> bool: 965 """Check if a file exists and has the specified permissions. 966 967 Args: 968 path: (str) Path to the file. 969 mode: (int) Permissions represented as a bitwise-OR combination of 970 os.F_OK, os.R_OK, os.W_OK, and os.X_OK. 971 dir_fd: If not `None`, the file descriptor of a directory, with 972 `path` being relative to this directory. 973 effective_ids: (bool) Unused. Only here to match the signature. 974 follow_symlinks: (bool) If `False` and `path` points to a symlink, 975 the link itself is queried instead of the linked object. 976 977 Returns: 978 bool, `True` if file is accessible, `False` otherwise. 979 """ 980 if effective_ids and self.filesystem.is_windows_fs: 981 raise NotImplementedError( 982 "access: effective_ids unavailable on this platform" 983 ) 984 path = self._path_with_dir_fd(path, self.access, dir_fd) 985 try: 986 stat_result = self.stat(path, follow_symlinks=follow_symlinks) 987 except OSError as os_error: 988 if os_error.errno == errno.ENOENT: 989 return False 990 raise 991 if is_root(): 992 mode &= ~os.W_OK 993 return (mode & ((stat_result.st_mode >> 6) & 7)) == mode 994 995 def chmod( 996 self, 997 path: AnyStr, 998 mode: int, 999 *, 1000 dir_fd: Optional[int] = None, 1001 follow_symlinks: bool = True 1002 ) -> None: 1003 """Change the permissions of a file as encoded in integer mode. 1004 1005 Args: 1006 path: (str) Path to the file. 1007 mode: (int) Permissions. 1008 dir_fd: If not `None`, the file descriptor of a directory, with 1009 `path` being relative to this directory. 1010 follow_symlinks: (bool) If `False` and `path` points to a symlink, 1011 the link itself is queried instead of the linked object. 1012 """ 1013 if not follow_symlinks and ( 1014 self.chmod not in self.supports_follow_symlinks or IS_PYPY 1015 ): 1016 raise NotImplementedError( 1017 "`follow_symlinks` for chmod() is not available " "on this system" 1018 ) 1019 path = self._path_with_dir_fd(path, self.chmod, dir_fd) 1020 self.filesystem.chmod(path, mode, follow_symlinks) 1021 1022 def lchmod(self, path: AnyStr, mode: int) -> None: 1023 """Change the permissions of a file as encoded in integer mode. 1024 If the file is a link, the permissions of the link are changed. 1025 1026 Args: 1027 path: (str) Path to the file. 1028 mode: (int) Permissions. 1029 """ 1030 if self.filesystem.is_windows_fs: 1031 raise NameError("name 'lchmod' is not defined") 1032 self.filesystem.chmod(path, mode, follow_symlinks=False) 1033 1034 def utime( 1035 self, 1036 path: AnyStr, 1037 times: Optional[Tuple[Union[int, float], Union[int, float]]] = None, 1038 ns: Optional[Tuple[int, int]] = None, 1039 dir_fd: Optional[int] = None, 1040 follow_symlinks: bool = True, 1041 ) -> None: 1042 """Change the access and modified times of a file. 1043 1044 Args: 1045 path: (str) Path to the file. 1046 times: 2-tuple of int or float numbers, of the form (atime, mtime) 1047 which is used to set the access and modified times in seconds. 1048 If None, both times are set to the current time. 1049 ns: 2-tuple of int numbers, of the form (atime, mtime) which is 1050 used to set the access and modified times in nanoseconds. 1051 If None, both times are set to the current time. 1052 dir_fd: If not `None`, the file descriptor of a directory, 1053 with `path` being relative to this directory. 1054 follow_symlinks: (bool) If `False` and `path` points to a symlink, 1055 the link itself is queried instead of the linked object. 1056 1057 Raises: 1058 TypeError: If anything other than the expected types is 1059 specified in the passed `times` or `ns` tuple, 1060 or if the tuple length is not equal to 2. 1061 ValueError: If both times and ns are specified. 1062 """ 1063 path = self._path_with_dir_fd(path, self.utime, dir_fd) 1064 self.filesystem.utime(path, times=times, ns=ns, follow_symlinks=follow_symlinks) 1065 1066 def chown( 1067 self, 1068 path: AnyStr, 1069 uid: int, 1070 gid: int, 1071 *, 1072 dir_fd: Optional[int] = None, 1073 follow_symlinks: bool = True 1074 ) -> None: 1075 """Set ownership of a faked file. 1076 1077 Args: 1078 path: (str) Path to the file or directory. 1079 uid: (int) Numeric uid to set the file or directory to. 1080 gid: (int) Numeric gid to set the file or directory to. 1081 dir_fd: (int) If not `None`, the file descriptor of a directory, 1082 with `path` being relative to this directory. 1083 follow_symlinks: (bool) If `False` and path points to a symlink, 1084 the link itself is changed instead of the linked object. 1085 1086 Raises: 1087 OSError: if path does not exist. 1088 1089 `None` is also allowed for `uid` and `gid`. This permits `os.rename` 1090 to use `os.chown` even when the source file `uid` and `gid` are 1091 `None` (unset). 1092 """ 1093 path = self._path_with_dir_fd(path, self.chown, dir_fd) 1094 file_object = self.filesystem.resolve(path, follow_symlinks, allow_fd=True) 1095 if not isinstance(uid, int) or not isinstance(gid, int): 1096 raise TypeError("An integer is required") 1097 if uid != -1: 1098 file_object.st_uid = uid 1099 if gid != -1: 1100 file_object.st_gid = gid 1101 1102 def mknod( 1103 self, 1104 path: AnyStr, 1105 mode: Optional[int] = None, 1106 device: int = 0, 1107 *, 1108 dir_fd: Optional[int] = None 1109 ) -> None: 1110 """Create a filesystem node named 'filename'. 1111 1112 Does not support device special files or named pipes as the real os 1113 module does. 1114 1115 Args: 1116 path: (str) Name of the file to create 1117 mode: (int) Permissions to use and type of file to be created. 1118 Default permissions are 0o666. Only the stat.S_IFREG file type 1119 is supported by the fake implementation. The umask is applied 1120 to this mode. 1121 device: not supported in fake implementation 1122 dir_fd: If not `None`, the file descriptor of a directory, 1123 with `path` being relative to this directory. 1124 1125 Raises: 1126 OSError: if called with unsupported options or the file can not be 1127 created. 1128 """ 1129 if self.filesystem.is_windows_fs: 1130 raise AttributeError("module 'os' has no attribute 'mknode'") 1131 if mode is None: 1132 # note that a default value of 0o600 without a device type is 1133 # documented - this is not how it seems to work 1134 mode = S_IFREG | 0o600 1135 if device or not mode & S_IFREG and not is_root(): 1136 self.filesystem.raise_os_error(errno.EPERM) 1137 1138 path = self._path_with_dir_fd(path, self.mknod, dir_fd) 1139 head, tail = self.path.split(path) 1140 if not tail: 1141 if self.filesystem.exists(head, check_link=True): 1142 self.filesystem.raise_os_error(errno.EEXIST, path) 1143 self.filesystem.raise_os_error(errno.ENOENT, path) 1144 if tail in (matching_string(tail, "."), matching_string(tail, "..")): 1145 self.filesystem.raise_os_error(errno.ENOENT, path) 1146 if self.filesystem.exists(path, check_link=True): 1147 self.filesystem.raise_os_error(errno.EEXIST, path) 1148 self.filesystem.add_object( 1149 head, 1150 FakeFile(tail, mode & ~self.filesystem.umask, filesystem=self.filesystem), 1151 ) 1152 1153 def symlink( 1154 self, 1155 src: AnyStr, 1156 dst: AnyStr, 1157 target_is_directory: bool = False, 1158 *, 1159 dir_fd: Optional[int] = None 1160 ) -> None: 1161 """Creates the specified symlink, pointed at the specified link target. 1162 1163 Args: 1164 src: The target of the symlink. 1165 dst: Path to the symlink to create. 1166 target_is_directory: Currently ignored. 1167 dir_fd: If not `None`, the file descriptor of a directory, 1168 with `src` being relative to this directory. 1169 1170 Raises: 1171 OSError: if the file already exists. 1172 """ 1173 src = self._path_with_dir_fd(src, self.symlink, dir_fd) 1174 self.filesystem.create_symlink(dst, src, create_missing_dirs=False) 1175 1176 def link( 1177 self, 1178 src: AnyStr, 1179 dst: AnyStr, 1180 *, 1181 src_dir_fd: Optional[int] = None, 1182 dst_dir_fd: Optional[int] = None 1183 ) -> None: 1184 """Create a hard link at new_path, pointing at old_path. 1185 1186 Args: 1187 src: An existing path to the target file. 1188 dst: The destination path to create a new link at. 1189 src_dir_fd: If not `None`, the file descriptor of a directory, 1190 with `src` being relative to this directory. 1191 dst_dir_fd: If not `None`, the file descriptor of a directory, 1192 with `dst` being relative to this directory. 1193 1194 Raises: 1195 OSError: if something already exists at new_path. 1196 OSError: if the parent directory doesn't exist. 1197 """ 1198 src = self._path_with_dir_fd(src, self.link, src_dir_fd) 1199 dst = self._path_with_dir_fd(dst, self.link, dst_dir_fd) 1200 self.filesystem.link(src, dst) 1201 1202 def fsync(self, fd: int) -> None: 1203 """Perform fsync for a fake file (in other words, do nothing). 1204 1205 Args: 1206 fd: The file descriptor of the open file. 1207 1208 Raises: 1209 OSError: file_des is an invalid file descriptor. 1210 TypeError: file_des is not an integer. 1211 """ 1212 # Throw an error if file_des isn't valid 1213 if 0 <= fd < NR_STD_STREAMS: 1214 self.filesystem.raise_os_error(errno.EINVAL) 1215 file_object = cast(FakeFileWrapper, self.filesystem.get_open_file(fd)) 1216 if self.filesystem.is_windows_fs: 1217 if not hasattr(file_object, "allow_update") or not file_object.allow_update: 1218 self.filesystem.raise_os_error(errno.EBADF, file_object.file_path) 1219 1220 def fdatasync(self, fd: int) -> None: 1221 """Perform fdatasync for a fake file (in other words, do nothing). 1222 1223 Args: 1224 fd: The file descriptor of the open file. 1225 1226 Raises: 1227 OSError: `fd` is an invalid file descriptor. 1228 TypeError: `fd` is not an integer. 1229 """ 1230 if self.filesystem.is_windows_fs or self.filesystem.is_macos: 1231 raise AttributeError("module 'os' has no attribute 'fdatasync'") 1232 # Throw an error if file_des isn't valid 1233 if 0 <= fd < NR_STD_STREAMS: 1234 self.filesystem.raise_os_error(errno.EINVAL) 1235 self.filesystem.get_open_file(fd) 1236 1237 def sendfile(self, fd_out: int, fd_in: int, offset: int, count: int) -> int: 1238 """Copy count bytes from file descriptor fd_in to file descriptor 1239 fd_out starting at offset. 1240 1241 Args: 1242 fd_out: The file descriptor of the destination file. 1243 fd_in: The file descriptor of the source file. 1244 offset: The offset in bytes where to start the copy in the 1245 source file. If `None` (Linux only), copying is started at 1246 the current position, and the position is updated. 1247 count: The number of bytes to copy. If 0, all remaining bytes 1248 are copied (MacOs only). 1249 1250 Raises: 1251 OSError: If `fd_in` or `fd_out` is an invalid file descriptor. 1252 TypeError: If `fd_in` or `fd_out` is not an integer. 1253 TypeError: If `offset` is None under MacOs. 1254 """ 1255 if self.filesystem.is_windows_fs: 1256 raise AttributeError("module 'os' has no attribute 'sendfile'") 1257 if 0 <= fd_in < NR_STD_STREAMS: 1258 self.filesystem.raise_os_error(errno.EINVAL) 1259 if 0 <= fd_out < NR_STD_STREAMS: 1260 self.filesystem.raise_os_error(errno.EINVAL) 1261 source = cast(FakeFileWrapper, self.filesystem.get_open_file(fd_in)) 1262 dest = cast(FakeFileWrapper, self.filesystem.get_open_file(fd_out)) 1263 if self.filesystem.is_macos: 1264 if dest.get_object().stat_result.st_mode & 0o777000 != S_IFSOCK: 1265 raise OSError("Socket operation on non-socket") 1266 if offset is None: 1267 if self.filesystem.is_macos: 1268 raise TypeError("None is not a valid offset") 1269 contents = source.read(count) 1270 else: 1271 position = source.tell() 1272 source.seek(offset) 1273 if count == 0 and self.filesystem.is_macos: 1274 contents = source.read() 1275 else: 1276 contents = source.read(count) 1277 source.seek(position) 1278 if contents: 1279 written = dest.write(contents) 1280 dest.flush() 1281 return written 1282 return 0 1283 1284 def getuid(self) -> int: 1285 """Returns the user id set in the fake filesystem. 1286 If not changed using ``set_uid``, this is the uid of the real system. 1287 """ 1288 if self.filesystem.is_windows_fs: 1289 raise NameError("name 'getuid' is not defined") 1290 return get_uid() 1291 1292 def getgid(self) -> int: 1293 """Returns the group id set in the fake filesystem. 1294 If not changed using ``set_gid``, this is the gid of the real system. 1295 """ 1296 if self.filesystem.is_windows_fs: 1297 raise NameError("name 'getgid' is not defined") 1298 return get_gid() 1299 1300 def fake_functions(self, original_functions) -> Set: 1301 functions = set() 1302 for fn in original_functions: 1303 if hasattr(self, fn.__name__): 1304 functions.add(getattr(self, fn.__name__)) 1305 else: 1306 functions.add(fn) 1307 return functions 1308 1309 @property 1310 def supports_follow_symlinks(self) -> Set[Callable]: 1311 if self._supports_follow_symlinks is None: 1312 self._supports_follow_symlinks = self.fake_functions( 1313 self.os_module.supports_follow_symlinks 1314 ) 1315 return self._supports_follow_symlinks 1316 1317 @property 1318 def supports_dir_fd(self) -> Set[Callable]: 1319 if self._supports_dir_fd is None: 1320 self._supports_dir_fd = self.fake_functions(self.os_module.supports_dir_fd) 1321 return self._supports_dir_fd 1322 1323 @property 1324 def supports_fd(self) -> Set[Callable]: 1325 if self._supports_fd is None: 1326 self._supports_fd = self.fake_functions(self.os_module.supports_fd) 1327 return self._supports_fd 1328 1329 @property 1330 def supports_effective_ids(self) -> Set[Callable]: 1331 if self._supports_effective_ids is None: 1332 self._supports_effective_ids = self.fake_functions( 1333 self.os_module.supports_effective_ids 1334 ) 1335 return self._supports_effective_ids 1336 1337 def __getattr__(self, name: str) -> Any: 1338 """Forwards any unfaked calls to the standard os module.""" 1339 return getattr(self.os_module, name) 1340 1341 1342if sys.version_info > (3, 10): 1343 1344 def handle_original_call(f: Callable) -> Callable: 1345 """Decorator used for real pathlib Path methods to ensure that 1346 real os functions instead of faked ones are used. 1347 Applied to all non-private methods of `FakeOsModule`.""" 1348 1349 @functools.wraps(f) 1350 def wrapped(*args, **kwargs): 1351 if FakeOsModule.use_original: 1352 # remove the `self` argument for FakeOsModule methods 1353 if args and isinstance(args[0], FakeOsModule): 1354 args = args[1:] 1355 return getattr(os, f.__name__)(*args, **kwargs) 1356 return f(*args, **kwargs) 1357 1358 return wrapped 1359 1360 for name, fn in inspect.getmembers(FakeOsModule, inspect.isfunction): 1361 if not fn.__name__.startswith("_"): 1362 setattr(FakeOsModule, name, handle_original_call(fn)) 1363 1364 1365@contextmanager 1366def use_original_os(): 1367 """Temporarily use original os functions instead of faked ones. 1368 Used to ensure that skipped modules do not use faked calls. 1369 """ 1370 try: 1371 FakeOsModule.use_original = True 1372 yield 1373 finally: 1374 FakeOsModule.use_original = False 1375