"""Utility functions for copying and archiving files and directory trees.

XXX The functions here don't copy the resource fork or other metadata on Mac.

"""

import os
import sys
import stat
import fnmatch
import collections
import errno

# Probe the optional compression modules once at import time.  Only the
# availability flag is kept; the module name is deleted again so that it
# does not stay bound in this namespace.
try:
    import zlib
except ImportError:
    _ZLIB_SUPPORTED = False
else:
    del zlib
    _ZLIB_SUPPORTED = True

try:
    import bz2
except ImportError:
    _BZ2_SUPPORTED = False
else:
    del bz2
    _BZ2_SUPPORTED = True

try:
    import lzma
except ImportError:
    _LZMA_SUPPORTED = False
else:
    del lzma
    _LZMA_SUPPORTED = True

# Exactly one of `posix` / `nt` is bound to a module on the matching
# platform; the other one (or both, elsewhere) stays None.
_WINDOWS = os.name == 'nt'
posix = nt = None
if _WINDOWS:
    import nt
elif os.name == 'posix':
    import posix

COPY_BUFSIZE = 64 * 1024 if not _WINDOWS else 1024 * 1024
# This should never be removed, see rationale in:
# https://bugs.python.org/issue43743#msg393429
_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux")
_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile")  # macOS

# CMD defaults in Windows 10
_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC"

__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
           "copytree", "move", "rmtree", "Error", "SpecialFileError",
           "ExecError", "make_archive", "get_archive_formats",
           "register_archive_format", "unregister_archive_format",
           "get_unpack_formats", "register_unpack_format",
           "unregister_unpack_format", "unpack_archive",
           "ignore_patterns", "chown", "which", "get_terminal_size",
           "SameFileError"]
           # disk_usage is added later, if available on the platform

class Error(OSError):
    """Base class for the errors raised by this module."""

class SameFileError(Error):
    """Raised when source and destination are the same file."""

class SpecialFileError(OSError):
    """Raised when an operation (e.g. copying) is attempted on a special
    file (e.g. a named pipe) that does not support it."""
class ExecError(OSError):
    """Signals that a command could not be executed."""

class ReadError(OSError):
    """Signals that an archive could not be read."""

class RegistryError(Exception):
    """Signals a failed operation on the archiving or unpacking
    registries."""

class _GiveupOnFastCopy(Exception):
    """Internal signal: a fast-copy function could not do the copy, so the
    caller should fall back to a plain read()/write() loop.
    """

def _fastcopy_fcopyfile(fsrc, fdst, flags):
    """Copy a regular file's content or metadata with the fcopyfile(3)
    syscall (macOS).

    *flags* is handed to posix._fcopyfile() unchanged.  _GiveupOnFastCopy
    is raised when either object cannot provide a file descriptor or when
    the call fails with EINVAL/ENOTSUP.  Any other OSError propagates with
    filename/filename2 set to the names of the two file objects.
    """
    try:
        src_fd = fsrc.fileno()
        dst_fd = fdst.fileno()
    except Exception as err:
        raise _GiveupOnFastCopy(err)  # not a regular file

    try:
        posix._fcopyfile(src_fd, dst_fd, flags)
    except OSError as err:
        err.filename = fsrc.name
        err.filename2 = fdst.name
        if err.errno not in {errno.EINVAL, errno.ENOTSUP}:
            raise err from None
        raise _GiveupOnFastCopy(err)

def _fastcopy_sendfile(fsrc, fdst):
    """Copy data from one regular mmap-like fd to another with the
    sendfile(2) syscall.
    This should work on Linux >= 2.6.33 only.

    _GiveupOnFastCopy is raised when the objects have no file descriptor,
    when sendfile() reports ENOTSOCK (which also switches the module-wide
    _USE_CP_SENDFILE flag off), or when the very first call fails before
    any data reached the destination.
    """
    # Note: copyfileobj() is left alone in order to not introduce any
    # unexpected breakage. Possible risks by using zero-copy calls
    # in copyfileobj() are:
    # - fdst cannot be open in "a"(ppend) mode
    # - fsrc and fdst may be open in "t"(ext) mode
    # - fsrc may be a BufferedReader (which hides unread data in a buffer),
    #   GzipFile (which decompresses data), HTTPResponse (which decodes
    #   chunks).
    # - possibly others (e.g. encrypted fs/partition?)
    global _USE_CP_SENDFILE
    try:
        src_fd = fsrc.fileno()
        dst_fd = fdst.fileno()
    except Exception as err:
        raise _GiveupOnFastCopy(err)  # not a regular file

    # Ask for the whole file at once, hoping a single call is enough.
    # sendfile() is called in a loop until it reports EOF (returns 0), so
    # a request smaller or bigger than the real file size makes no
    # difference, even if the file content changes during the copy.
    try:
        chunk = max(os.fstat(src_fd).st_size, 2 ** 23)  # min 8MiB
    except OSError:
        chunk = 2 ** 27  # 128MiB
    # On 32-bit architectures truncate to 1GiB to avoid OverflowError,
    # see bpo-38319.
    if sys.maxsize < 2 ** 32:
        chunk = min(chunk, 2 ** 30)

    position = 0
    while True:
        try:
            sent = os.sendfile(dst_fd, src_fd, position, chunk)
        except OSError as err:
            # ...in order to have a more informative exception.
            err.filename = fsrc.name
            err.filename2 = fdst.name

            if err.errno == errno.ENOTSOCK:
                # sendfile() on this platform (probably Linux < 2.6.33)
                # does not support copies between regular files (only
                # sockets).
                _USE_CP_SENDFILE = False
                raise _GiveupOnFastCopy(err)

            if err.errno == errno.ENOSPC:  # filesystem is full
                raise err from None

            # Give up on first call and if no data was copied.
            if position == 0 and os.lseek(dst_fd, 0, os.SEEK_CUR) == 0:
                raise _GiveupOnFastCopy(err)

            raise err
        if sent == 0:
            return  # EOF
        position += sent
def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE):
    """Variant of copyfileobj() built on readinto() and a memoryview.

    *fsrc* must provide a readinto() method and both files must be open
    in binary mode.  One buffer of *length* bytes is allocated up front
    and reused for every read.
    """
    # Bind the methods once; they are called on every loop iteration.
    read_into = fsrc.readinto
    write_full = fdst.write
    with memoryview(bytearray(length)) as buffer:
        while True:
            count = read_into(buffer)
            if not count:
                return
            if count >= length:
                write_full(buffer)
                continue
            # Short read: pass on only the part of the buffer that was
            # actually filled.
            with buffer[:count] as filled:
                fdst.write(filled)
def copyfileobj(fsrc, fdst, length=0):
    """Copy data from file-like object fsrc to file-like object fdst.

    Data is moved in chunks of *length*; a false *length* selects
    COPY_BUFSIZE.
    """
    chunk_size = length if length else COPY_BUFSIZE
    # Bind the methods once; they are called on every loop iteration.
    read = fsrc.read
    write = fdst.write
    while chunk := read(chunk_size):
        write(chunk)

def _samefile(src, dst):
    """Tell whether *src* and *dst* refer to the same file.

    *src* may be an os.DirEntry.  Any OSError while comparing counts as
    "not the same".
    """
    # Macintosh, Unix.
    if hasattr(os.path, 'samestat') and isinstance(src, os.DirEntry):
        try:
            return os.path.samestat(src.stat(), os.stat(dst))
        except OSError:
            return False

    if hasattr(os.path, 'samefile'):
        try:
            return os.path.samefile(src, dst)
        except OSError:
            return False

    # All other platforms: check for same pathname.
    normalized_src = os.path.normcase(os.path.abspath(src))
    normalized_dst = os.path.normcase(os.path.abspath(dst))
    return normalized_src == normalized_dst

def _stat(fn):
    """stat() *fn*, which is either an os.DirEntry or a path."""
    if isinstance(fn, os.DirEntry):
        return fn.stat()
    return os.stat(fn)

def _islink(fn):
    """Tell whether *fn* (an os.DirEntry or a path) is a symbolic link."""
    if isinstance(fn, os.DirEntry):
        return fn.is_symlink()
    return os.path.islink(fn)

def copyfile(src, dst, *, follow_symlinks=True):
    """Copy data from src to dst in the most efficient way possible.

    If follow_symlinks is not set and src is a symbolic link, a new
    symlink will be created instead of copying the file it points to.

    Returns dst.  Raises SameFileError when both names refer to the same
    file and SpecialFileError when either one is a named pipe.
    """
    sys.audit("shutil.copyfile", src, dst)

    if _samefile(src, dst):
        raise SameFileError("{!r} and {!r} are the same file".format(src, dst))

    file_size = 0
    for index, candidate in enumerate([src, dst]):
        try:
            st = _stat(candidate)
        except OSError:
            # File most likely does not exist
            continue
        # XXX What about other special files? (sockets, devices...)
        if stat.S_ISFIFO(st.st_mode):
            if isinstance(candidate, os.DirEntry):
                candidate = candidate.path
            raise SpecialFileError("`%s` is a named pipe" % candidate)
        if _WINDOWS and index == 0:
            # Only the source size matters; it sizes the Windows buffer.
            file_size = st.st_size

    if not follow_symlinks and _islink(src):
        os.symlink(os.readlink(src), dst)
        return dst

    with open(src, 'rb') as fsrc:
        try:
            with open(dst, 'wb') as fdst:
                # macOS
                if _HAS_FCOPYFILE:
                    try:
                        _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA)
                        return dst
                    except _GiveupOnFastCopy:
                        pass
                # Linux
                elif _USE_CP_SENDFILE:
                    try:
                        _fastcopy_sendfile(fsrc, fdst)
                        return dst
                    except _GiveupOnFastCopy:
                        pass
                # Windows, see:
                # https://github.com/python/cpython/pull/7160#discussion_r195405230
                elif _WINDOWS and file_size > 0:
                    _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE))
                    return dst

                # Generic fallback, also used after a fast path gave up.
                copyfileobj(fsrc, fdst)

        # Issue 43219, raise a less confusing exception
        except IsADirectoryError as e:
            if os.path.exists(dst):
                raise
            raise FileNotFoundError(f'Directory does not exist: {dst}') from e

    return dst
def copymode(src, dst, *, follow_symlinks=True):
    """Copy mode bits from src to dst.

    If follow_symlinks is not set, symlinks aren't followed if and only
    if both `src` and `dst` are symlinks.  If `lchmod` isn't available
    (e.g. Linux) this method does nothing.

    """
    sys.audit("shutil.copymode", src, dst)

    on_links = not follow_symlinks and _islink(src) and os.path.islink(dst)
    if not on_links:
        read_stat, apply_mode = _stat, os.chmod
    elif hasattr(os, 'lchmod'):
        read_stat, apply_mode = os.lstat, os.lchmod
    else:
        # Cannot change the mode of the link itself on this platform.
        return

    source_stat = read_stat(src)
    apply_mode(dst, stat.S_IMODE(source_stat.st_mode))

if hasattr(os, 'listxattr'):
    def _copyxattr(src, dst, *, follow_symlinks=True):
        """Copy extended filesystem attributes from `src` to `dst`.

        Overwrite existing attributes.

        If `follow_symlinks` is false, symlinks won't be followed.

        """
        try:
            attribute_names = os.listxattr(src, follow_symlinks=follow_symlinks)
        except OSError as e:
            # These errnos are treated as "nothing to copy".
            if e.errno in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL):
                return
            raise
        for attribute in attribute_names:
            try:
                value = os.getxattr(src, attribute,
                                    follow_symlinks=follow_symlinks)
                os.setxattr(dst, attribute, value,
                            follow_symlinks=follow_symlinks)
            except OSError as e:
                # Skip attributes that cannot be read or written here.
                if e.errno in (errno.EPERM, errno.ENOTSUP, errno.ENODATA,
                               errno.EINVAL):
                    continue
                raise
else:
    def _copyxattr(*args, **kwargs):
        """No-op used when os.listxattr is unavailable."""
def copystat(src, dst, *, follow_symlinks=True):
    """Copy file metadata

    Copy the permission bits, last access time, last modification time, and
    flags from `src` to `dst`. On Linux, copystat() also copies the "extended
    attributes" where possible. The file contents, owner, and group are
    unaffected. `src` and `dst` are path-like objects or path names given as
    strings.

    If the optional flag `follow_symlinks` is not set, symlinks aren't
    followed if and only if both `src` and `dst` are symlinks.
    """
    sys.audit("shutil.copystat", src, dst)

    def _nop(*args, ns=None, follow_symlinks=None):
        pass

    # follow symlinks (aka don't not follow symlinks)
    follow = follow_symlinks or not (_islink(src) and os.path.islink(dst))

    def lookup(name):
        # When following links, any os function that exists will do.  When
        # working on the links themselves, the function must also support
        # follow_symlinks; otherwise the no-op stands in for it.
        fn = getattr(os, name, _nop)
        if follow or fn in os.supports_follow_symlinks:
            return fn
        return _nop

    if isinstance(src, os.DirEntry):
        st = src.stat(follow_symlinks=follow)
    else:
        st = lookup("stat")(src, follow_symlinks=follow)
    mode = stat.S_IMODE(st.st_mode)
    lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns),
                    follow_symlinks=follow)
    # We must copy extended attributes before the file is (potentially)
    # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
    _copyxattr(src, dst, follow_symlinks=follow)
    try:
        lookup("chmod")(dst, mode, follow_symlinks=follow)
    except NotImplementedError:
        # os.chmod() could not act on the symlink itself with
        # follow_symlinks=False on this platform, so the link's mode
        # cannot be changed.  Give up and suppress the error
        # (which is what shutil always did in this circumstance).
        pass
    if hasattr(st, 'st_flags'):
        try:
            lookup("chflags")(dst, st.st_flags, follow_symlinks=follow)
        except OSError as why:
            # "Operation not supported" is tolerated; everything else is
            # a real failure.
            tolerated = {getattr(errno, err_name)
                         for err_name in ('EOPNOTSUPP', 'ENOTSUP')
                         if hasattr(errno, err_name)}
            if why.errno not in tolerated:
                raise
def copy(src, dst, *, follow_symlinks=True):
    """Copy data and mode bits ("cp src dst"). Return the file's destination.

    The destination may be a directory, in which case the file is copied
    into it under the base name of src.

    If follow_symlinks is false, symlinks won't be followed. This
    resembles GNU's "cp -P src dst".

    If source and destination are the same file, a SameFileError will be
    raised.

    """
    target = dst
    if os.path.isdir(dst):
        target = os.path.join(dst, os.path.basename(src))
    copyfile(src, target, follow_symlinks=follow_symlinks)
    copymode(src, target, follow_symlinks=follow_symlinks)
    return target
def copy2(src, dst, *, follow_symlinks=True):
    """Copy data and metadata. Return the file's destination.

    Metadata is copied with copystat(). Please see the copystat function
    for more information.

    The destination may be a directory, in which case the file is copied
    into it under the base name of src.

    If follow_symlinks is false, symlinks won't be followed. This
    resembles GNU's "cp -P src dst".
    """
    target = dst
    if os.path.isdir(dst):
        target = os.path.join(dst, os.path.basename(src))
    copyfile(src, target, follow_symlinks=follow_symlinks)
    copystat(src, target, follow_symlinks=follow_symlinks)
    return target

def ignore_patterns(*patterns):
    """Function that can be used as copytree() ignore parameter.

    Patterns is a sequence of glob-style patterns
    that are used to exclude files.  The returned callable takes
    (path, names) and returns the set of names matching any pattern."""
    def _ignore_patterns(path, names):
        return {name
                for pattern in patterns
                for name in fnmatch.filter(names, pattern)}
    return _ignore_patterns
def _copytree(entries, src, dst, symlinks, ignore, copy_function,
              ignore_dangling_symlinks, dirs_exist_ok=False):
    """Copy the already-scanned *entries* of directory *src* into *dst*.

    Worker behind copytree(); sub-directories are handled by calling
    copytree() again.  Failures on individual entries are collected and
    raised together as a single Error once every entry has been tried.
    Returns dst.
    """
    if ignore is None:
        ignored_names = set()
    else:
        ignored_names = ignore(os.fspath(src), [x.name for x in entries])

    os.makedirs(dst, exist_ok=dirs_exist_ok)
    errors = []
    # The stock copy functions accept os.DirEntry objects; custom ones are
    # handed plain path names.
    use_srcentry = copy_function is copy2 or copy_function is copy

    for srcentry in entries:
        if srcentry.name in ignored_names:
            continue
        srcname = os.path.join(src, srcentry.name)
        dstname = os.path.join(dst, srcentry.name)
        srcobj = srcentry if use_srcentry else srcname
        try:
            is_symlink = srcentry.is_symlink()
            if is_symlink and os.name == 'nt':
                # Special check for directory junctions, which appear as
                # symlinks but we want to recurse.
                lstat = srcentry.stat(follow_symlinks=False)
                if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT:
                    is_symlink = False
            if is_symlink:
                linkto = os.readlink(srcname)
                if symlinks:
                    # We can't just leave it to `copy_function` because legacy
                    # code with a custom `copy_function` may rely on copytree
                    # doing the right thing.
                    os.symlink(linkto, dstname)
                    copystat(srcobj, dstname, follow_symlinks=not symlinks)
                    continue
                # ignore dangling symlink if the flag is on
                if not os.path.exists(linkto) and ignore_dangling_symlinks:
                    continue
                # otherwise let the copy occur. copy2 will raise an error
            if srcentry.is_dir():
                copytree(srcobj, dstname, symlinks, ignore, copy_function,
                         ignore_dangling_symlinks, dirs_exist_ok)
            else:
                # Will raise a SpecialFileError for unsupported file types
                copy_function(srcobj, dstname)
        # catch the Error from the recursive copytree so that we can
        # continue with other files
        except Error as err:
            errors.extend(err.args[0])
        except OSError as why:
            errors.append((srcname, dstname, str(why)))
    try:
        copystat(src, dst)
    except OSError as why:
        # Copying file access times may fail on Windows
        if getattr(why, 'winerror', None) is None:
            errors.append((src, dst, str(why)))
    if errors:
        raise Error(errors)
    return dst
def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
             ignore_dangling_symlinks=False, dirs_exist_ok=False):
    """Recursively copy a directory tree and return the destination directory.

    If exception(s) occur, an Error is raised with a list of reasons.

    symlinks: when true, symbolic links in the source tree become symbolic
    links in the destination tree; when false, the contents of the files
    they point to are copied.  A link whose target doesn't exist then adds
    an entry to the list of errors raised in the Error exception at the
    end of the copy process.

    ignore_dangling_symlinks: set it to true to silence that error.
    Notice that this has no effect on platforms that don't support
    os.symlink.

    ignore: an optional callable.  If given, it is called with the `src`
    parameter, which is the directory being visited by copytree(), and
    `names` which is the list of `src` contents, as returned by
    os.listdir():

        callable(src, names) -> ignored_names

    Since copytree() is called recursively, the callable will be called
    once for each directory that is copied.  It returns a list of names
    relative to the `src` directory that should not be copied.

    copy_function: the callable used to copy each file.  It is called with
    the source path and the destination path as arguments.  By default,
    copy2() is used, but any function that supports the same signature
    (like copy()) can be used.

    dirs_exist_ok: if false (the default) and `dst` already exists, a
    `FileExistsError` is raised.  If true, the copying operation will
    continue if it encounters existing directories, and files within the
    `dst` tree will be overwritten by corresponding files from the `src`
    tree.
    """
    sys.audit("shutil.copytree", src, dst)
    # Read the whole listing first so the scandir handle is closed before
    # any copying starts.
    with os.scandir(src) as scanner:
        listing = list(scanner)
    return _copytree(listing, src, dst, symlinks, ignore, copy_function,
                     ignore_dangling_symlinks, dirs_exist_ok)
if hasattr(os.stat_result, 'st_file_attributes'):
    # Special handling for directory junctions to make them behave like
    # symlinks for shutil.rmtree, since in general they do not appear as
    # regular links.
    def _rmtree_isdir(entry):
        """Tell whether the scandir *entry* is a real directory, i.e. a
        directory that is not a junction.  Stat errors count as "no"."""
        try:
            st = entry.stat(follow_symlinks=False)
        except OSError:
            return False
        return (stat.S_ISDIR(st.st_mode) and not
                (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
                 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))

    def _rmtree_islink(path):
        """Tell whether *path* is a symlink or a directory junction.
        Stat errors count as "no"."""
        try:
            st = os.lstat(path)
        except OSError:
            return False
        return (stat.S_ISLNK(st.st_mode) or
                (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
                 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
else:
    def _rmtree_isdir(entry):
        """Tell whether the scandir *entry* is a directory, without
        following symlinks.  Errors count as "no"."""
        try:
            return entry.is_dir(follow_symlinks=False)
        except OSError:
            return False

    def _rmtree_islink(path):
        """Tell whether *path* is a symbolic link."""
        return os.path.islink(path)
# version vulnerable to race conditions
def _rmtree_unsafe(path, onerror):
    """Remove the tree rooted at *path* using path-based calls only.

    Every failure is reported as onerror(func, path, exc_info), where
    func is the function that failed (os.path.islink for the symlink
    check); the walk then continues with the next entry.
    """
    try:
        with os.scandir(path) as scanner:
            children = list(scanner)
    except OSError:
        onerror(os.scandir, path, sys.exc_info())
        children = []
    for child in children:
        child_path = child.path
        if not _rmtree_isdir(child):
            try:
                os.unlink(child_path)
            except OSError:
                onerror(os.unlink, child_path, sys.exc_info())
            continue
        try:
            if child.is_symlink():
                # This can only happen if someone replaces
                # a directory with a symlink after the call to
                # os.scandir or entry.is_dir above.
                raise OSError("Cannot call rmtree on a symbolic link")
        except OSError:
            onerror(os.path.islink, child_path, sys.exc_info())
            continue
        _rmtree_unsafe(child_path, onerror)
    try:
        os.rmdir(path)
    except OSError:
        onerror(os.rmdir, path, sys.exc_info())
# Version using fd-based APIs to protect against races
def _rmtree_safe_fd(topfd, path, onerror):
    """Remove everything inside the directory open as *topfd*.

    *path* is only used to build the names reported through
    onerror(func, path, exc_info); all filesystem calls are made relative
    to *topfd*.  The directory itself is left for the caller to remove.
    """
    try:
        with os.scandir(topfd) as scanner:
            children = list(scanner)
    except OSError as err:
        err.filename = path
        onerror(os.scandir, path, sys.exc_info())
        return
    for child in children:
        fullname = os.path.join(path, child.name)
        try:
            is_dir = child.is_dir(follow_symlinks=False)
        except OSError:
            is_dir = False
        else:
            if is_dir:
                # Keep the stat result: it is compared with the opened
                # descriptor below to detect a swapped-in symlink.
                try:
                    orig_st = child.stat(follow_symlinks=False)
                    is_dir = stat.S_ISDIR(orig_st.st_mode)
                except OSError:
                    onerror(os.lstat, fullname, sys.exc_info())
                    continue

        if not is_dir:
            try:
                os.unlink(child.name, dir_fd=topfd)
            except OSError:
                onerror(os.unlink, fullname, sys.exc_info())
            continue

        try:
            dirfd = os.open(child.name, os.O_RDONLY, dir_fd=topfd)
            dirfd_closed = False
        except OSError:
            onerror(os.open, fullname, sys.exc_info())
            continue

        try:
            if not os.path.samestat(orig_st, os.fstat(dirfd)):
                try:
                    # This can only happen if someone replaces
                    # a directory with a symlink after the call to
                    # os.scandir or stat.S_ISDIR above.
                    raise OSError("Cannot call rmtree on a symbolic "
                                  "link")
                except OSError:
                    onerror(os.path.islink, fullname, sys.exc_info())
            else:
                _rmtree_safe_fd(dirfd, fullname, onerror)
                try:
                    os.close(dirfd)
                    dirfd_closed = True
                    os.rmdir(child.name, dir_fd=topfd)
                except OSError:
                    onerror(os.rmdir, fullname, sys.exc_info())
        finally:
            if not dirfd_closed:
                os.close(dirfd)
_use_fd_functions = (
    {os.open, os.stat, os.unlink, os.rmdir} <= os.supports_dir_fd
    and os.scandir in os.supports_fd
    and os.stat in os.supports_follow_symlinks
)

def rmtree(path, ignore_errors=False, onerror=None, *, dir_fd=None):
    """Recursively delete a directory tree.

    If dir_fd is not None, it should be a file descriptor open to a directory;
    path will then be relative to that directory.
    dir_fd may not be implemented on your platform.
    If it is unavailable, using it will raise a NotImplementedError.

    If ignore_errors is set, errors are ignored; otherwise, if onerror
    is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is platform and implementation dependent;
    path is the argument to that function that caused it to fail; and
    exc_info is a tuple returned by sys.exc_info().  If ignore_errors
    is false and onerror is None, an exception is raised.

    """
    sys.audit("shutil.rmtree", path, dir_fd)
    if ignore_errors:
        def onerror(*args):
            pass
    elif onerror is None:
        def onerror(*args):
            # Called from inside an except block: re-raise that error.
            raise

    if not _use_fd_functions:
        if dir_fd is not None:
            raise NotImplementedError("dir_fd unavailable on this platform")
        try:
            if _rmtree_islink(path):
                # symlinks to directories are forbidden, see bug #1669
                raise OSError("Cannot call rmtree on a symbolic link")
        except OSError:
            onerror(os.path.islink, path, sys.exc_info())
            # can't continue even if onerror hook returns
            return
        return _rmtree_unsafe(path, onerror)

    # While the unsafe rmtree works fine on bytes, the fd based does not.
    if isinstance(path, bytes):
        path = os.fsdecode(path)
    # Note: To guard against symlink races, we use the standard
    # lstat()/open()/fstat() trick.
    try:
        orig_st = os.lstat(path, dir_fd=dir_fd)
    except Exception:
        onerror(os.lstat, path, sys.exc_info())
        return
    try:
        fd = os.open(path, os.O_RDONLY, dir_fd=dir_fd)
        fd_closed = False
    except Exception:
        onerror(os.open, path, sys.exc_info())
        return
    try:
        if not os.path.samestat(orig_st, os.fstat(fd)):
            try:
                # symlinks to directories are forbidden, see bug #1669
                raise OSError("Cannot call rmtree on a symbolic link")
            except OSError:
                onerror(os.path.islink, path, sys.exc_info())
        else:
            _rmtree_safe_fd(fd, path, onerror)
            try:
                os.close(fd)
                fd_closed = True
                os.rmdir(path, dir_fd=dir_fd)
            except OSError:
                onerror(os.rmdir, path, sys.exc_info())
    finally:
        if not fd_closed:
            os.close(fd)

# Allow introspection of whether or not the hardening against symlink
# attacks is supported on the current platform
rmtree.avoids_symlink_attacks = _use_fd_functions
def _basename(path):
    """Return the last component of *path*, ignoring trailing separators.

    Unlike os.path.basename(), trailing slashes are stripped first, so
    directories written with a trailing slash still yield their name.

    path: Union[PathLike, str]

    e.g.
    >>> os.path.basename('/bar/foo')
    'foo'
    >>> os.path.basename('/bar/foo/')
    ''
    >>> _basename('/bar/foo/')
    'foo'
    """
    # Characters to strip: the separator plus the alternative one, if the
    # platform has it.
    separators = os.path.sep + (os.path.altsep or '')
    trimmed = os.fspath(path).rstrip(separators)
    return os.path.basename(trimmed)
def move(src, dst, copy_function=copy2):
    """Recursively move a file or directory to another location. This is
    similar to the Unix "mv" command. Return the file or directory's
    destination (None when src and dst are the same directory and a plain
    rename was performed).

    If the destination is a directory or a symlink to a directory, the source
    is moved inside the directory. The destination path must not already
    exist.

    If the destination already exists but is not a directory, it may be
    overwritten depending on os.rename() semantics.

    If the destination is on our current filesystem, then rename() is used.
    Otherwise, src is copied to the destination and then removed. Symlinks are
    recreated under the new name if os.rename() fails because of cross
    filesystem renames.

    The optional `copy_function` argument is a callable that will be used
    to copy the source or it will be delegated to `copytree`.
    By default, copy2() is used, but any function that supports the same
    signature (like copy()) can be used.

    A lot more could be done here...  A look at a mv.c shows a lot of
    the issues this implementation glosses over.

    """
    sys.audit("shutil.move", src, dst)
    real_dst = dst
    if os.path.isdir(dst):
        if _samefile(src, dst):
            # We might be on a case insensitive filesystem,
            # perform the rename anyway.
            os.rename(src, dst)
            return

        # Using _basename instead of os.path.basename is important, as we must
        # ignore any trailing slash to avoid the basename returning ''
        real_dst = os.path.join(dst, _basename(src))
        if os.path.exists(real_dst):
            raise Error("Destination path '%s' already exists" % real_dst)

    try:
        os.rename(src, real_dst)
    except OSError:
        # rename() failed (e.g. across filesystems): copy, then delete.
        if os.path.islink(src):
            os.symlink(os.readlink(src), real_dst)
            os.unlink(src)
        elif not os.path.isdir(src):
            copy_function(src, real_dst)
            os.unlink(src)
        else:
            if _destinsrc(src, dst):
                raise Error("Cannot move a directory '%s' into itself"
                            " '%s'." % (src, dst))
            if (_is_immutable(src)
                    or (not os.access(src, os.W_OK) and os.listdir(src)
                        and sys.platform == 'darwin')):
                raise PermissionError("Cannot move the non-empty directory "
                                      "'%s': Lacking write permission to '%s'."
                                      % (src, src))
            copytree(src, real_dst, copy_function=copy_function,
                     symlinks=True)
            rmtree(src)
    return real_dst
def _destinsrc(src, dst):
    """Tell whether *dst* is *src* itself or lies inside it.

    Both paths are made absolute and compared as strings, each with one
    trailing separator, so 'a' does not count as containing 'ab'.
    """
    sep = os.path.sep
    src_prefix = os.path.abspath(src)
    dst_path = os.path.abspath(dst)
    if not src_prefix.endswith(sep):
        src_prefix += sep
    if not dst_path.endswith(sep):
        dst_path += sep
    return dst_path.startswith(src_prefix)

def _is_immutable(src):
    """Tell whether *src* carries the user or system "immutable" file flag.

    Returns False when the stat result has no st_flags attribute.
    """
    st = _stat(src)
    flags = getattr(st, 'st_flags', None)
    if flags is None:
        return False
    # st_flags is a bitmask, so test the immutable bits rather than
    # comparing the whole value: a file that also carries any other flag
    # would otherwise be missed.
    return bool(flags & (stat.UF_IMMUTABLE | stat.SF_IMMUTABLE))

def _get_gid(name):
    """Returns a gid, given a group name.

    Returns None when *name* is None, when the grp module is unavailable
    or when no such group exists.
    """
    if name is None:
        return None

    try:
        from grp import getgrnam
    except ImportError:
        return None

    try:
        entry = getgrnam(name)
    except KeyError:
        return None
    return entry[2]

def _get_uid(name):
    """Returns an uid, given a user name.

    Returns None when *name* is None, when the pwd module is unavailable
    or when no such user exists.
    """
    if name is None:
        return None

    try:
        from pwd import getpwnam
    except ImportError:
        return None

    try:
        entry = getpwnam(name)
    except KeyError:
        return None
    return entry[2]
def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
                  owner=None, group=None, logger=None, root_dir=None):
    """Create a (possibly compressed) tar file from all the files under
    'base_dir'.

    'compress' must be "gzip" (the default), "bzip2", "xz", or None; a
    ValueError is raised for anything else or when the matching
    compression module is unavailable.

    'owner' and 'group' can be used to define an owner and a group for the
    archive that is being built. If not provided, the current owner and group
    will be used.

    The output tar file will be named 'base_name' +  ".tar", possibly plus
    the appropriate compression extension (".gz", ".bz2", or ".xz").

    With 'dry_run' set nothing is written; only the name is computed.

    Returns the output filename.
    """
    if compress is None:
        tar_compression = ''
    elif _ZLIB_SUPPORTED and compress == 'gzip':
        tar_compression = 'gz'
    elif _BZ2_SUPPORTED and compress == 'bzip2':
        tar_compression = 'bz2'
    elif _LZMA_SUPPORTED and compress == 'xz':
        tar_compression = 'xz'
    else:
        raise ValueError("bad value for 'compress', or compression format not "
                         "supported : {0}".format(compress))

    import tarfile  # late import for breaking circular dependency

    if compress:
        compress_ext = '.' + tar_compression
    else:
        compress_ext = ''
    archive_name = base_name + '.tar' + compress_ext
    archive_dir = os.path.dirname(archive_name)

    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    # creating the tarball
    if logger is not None:
        logger.info('Creating tar archive')

    uid = _get_uid(owner)
    gid = _get_gid(group)

    def _set_uid_gid(tarinfo):
        # Overwrite ownership only for the ids that could be resolved.
        if gid is not None:
            tarinfo.gid = gid
            tarinfo.gname = group
        if uid is not None:
            tarinfo.uid = uid
            tarinfo.uname = owner
        return tarinfo

    if not dry_run:
        tar = tarfile.open(archive_name, 'w|%s' % tar_compression)
        # Members are stored under base_dir as given, while the files are
        # read from below root_dir when one is supplied.
        arcname = base_dir
        if root_dir is not None:
            base_dir = os.path.join(root_dir, base_dir)
        try:
            tar.add(base_dir, arcname, filter=_set_uid_gid)
        finally:
            tar.close()

    if root_dir is not None:
        archive_name = os.path.abspath(archive_name)
    return archive_name
def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0,
                  logger=None, owner=None, group=None, root_dir=None):
    """Create a zip file from all the files under 'base_dir'.

    The output zip file will be named 'base_name' + ".zip".  Returns the
    name of the output zip file (made absolute when 'root_dir' is given).
    With 'dry_run' set nothing is written; only the name is computed.
    """
    import zipfile  # late import for breaking circular dependency

    zip_filename = base_name + ".zip"
    archive_dir = os.path.dirname(base_name)

    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    if logger is not None:
        logger.info("creating '%s' and adding '%s' to it",
                    zip_filename, base_dir)

    if not dry_run:
        with zipfile.ZipFile(zip_filename, "w",
                             compression=zipfile.ZIP_DEFLATED) as zf:
            def add(path, arcname):
                # Store one entry and report it.
                zf.write(path, arcname)
                if logger is not None:
                    logger.info("adding '%s'", path)

            top_arcname = os.path.normpath(base_dir)
            if root_dir is not None:
                base_dir = os.path.join(root_dir, base_dir)
            base_dir = os.path.normpath(base_dir)
            # The top directory gets its own entry unless it is ".".
            if top_arcname != os.curdir:
                add(base_dir, top_arcname)
            for dirpath, dirnames, filenames in os.walk(base_dir):
                # Archive names are relative to root_dir when it is given.
                arcdirpath = dirpath
                if root_dir is not None:
                    arcdirpath = os.path.relpath(arcdirpath, root_dir)
                arcdirpath = os.path.normpath(arcdirpath)
                for name in sorted(dirnames):
                    add(os.path.join(dirpath, name),
                        os.path.join(arcdirpath, name))
                for name in filenames:
                    path = os.path.normpath(os.path.join(dirpath, name))
                    if os.path.isfile(path):
                        add(path, os.path.join(arcdirpath, name))

    if root_dir is not None:
        zip_filename = os.path.abspath(zip_filename)
    return zip_filename

# Maps the name of the archive format to a tuple containing:
# * the archiving function
# * extra keyword arguments
# * description
# * does it support the root_dir argument?
_ARCHIVE_FORMATS = {
    'tar':   (_make_tarball, [('compress', None)],
              "uncompressed tar file", True),
}

if _ZLIB_SUPPORTED:
    _ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')],
                                "gzip'ed tar-file", True)
    _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file", True)

if _BZ2_SUPPORTED:
    _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')],
                                "bzip2'ed tar-file", True)

if _LZMA_SUPPORTED:
    _ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')],
                                "xz'ed tar-file", True)

def get_archive_formats():
    """Returns a list of supported formats for archiving.

    Each element of the returned sequence is a tuple (name, description).
    The list is sorted.
    """
    return sorted((name, registry[2]) for name, registry in
                  _ARCHIVE_FORMATS.items())

def register_archive_format(name, function, extra_args=None, description=''):
    """Registers an archive format.

    name is the name of the format. function is the callable that will be
    used to create archives. If provided, extra_args is a sequence of
    (name, value) tuples that will be passed as arguments to the callable.
    description can be provided to describe the format, and will be returned
    by the get_archive_formats() function.

    Raises TypeError if function is not callable, or if extra_args is not a
    list/tuple of 2-element lists/tuples.
    """
    if extra_args is None:
        extra_args = []
    if not callable(function):
        # Wrap in a 1-tuple so that a tuple argument is formatted as a
        # value instead of being unpacked as the format arguments.
        raise TypeError('The %s object is not callable' % (function,))
    if not isinstance(extra_args, (tuple, list)):
        raise TypeError('extra_args needs to be a sequence')
    for element in extra_args:
        if not isinstance(element, (tuple, list)) or len(element) != 2:
            raise TypeError('extra_args elements are : (arg_name, value)')

    # Formats registered here are recorded as not supporting root_dir
    # (last tuple item), so make_archive() chdir()s for them instead.
    _ARCHIVE_FORMATS[name] = (function, extra_args, description, False)

def unregister_archive_format(name):
    """Removes the archive format from the registry.

    Raises KeyError if no format called name is registered.
    """
    del _ARCHIVE_FORMATS[name]

def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
                 dry_run=0, owner=None, group=None, logger=None):
    """Create an archive file (eg. zip or tar).

    'base_name' is the name of the file to create, minus any format-specific
    extension; 'format' is the archive format: one of "zip", "tar", "gztar",
    "bztar", or "xztar".  Or any other registered format.

    'root_dir' is a directory that will be the root directory of the
    archive; ie. we typically chdir into 'root_dir' before creating the
    archive.  'base_dir' is the directory where we start archiving from;
    ie. 'base_dir' will be the common prefix of all files and
    directories in the archive.  'root_dir' and 'base_dir' both default
    to the current directory.  Returns the name of the archive file.

    'owner' and 'group' are used when creating a tar archive. By default,
    uses the current owner and group.

    'dry_run' and 'logger' are forwarded to the archiving function;
    'verbose' is accepted but not used here.

    Raises ValueError if 'format' is not a registered archive format.
    """
    sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir)
    try:
        format_info = _ARCHIVE_FORMATS[format]
    except KeyError:
        # 1-tuple: keep the message well-formed even for a tuple 'format'.
        raise ValueError("unknown archive format '%s'" % (format,)) from None

    kwargs = {'dry_run': dry_run, 'logger': logger,
              'owner': owner, 'group': group}

    func = format_info[0]
    for arg, val in format_info[1]:
        kwargs[arg] = val

    if base_dir is None:
        base_dir = os.curdir

    support_root_dir = format_info[3]
    save_cwd = None
    if root_dir is not None:
        if support_root_dir:
            # Support path-like base_name here for backwards-compatibility.
            base_name = os.fspath(base_name)
            kwargs['root_dir'] = root_dir
        else:
            # The archiver cannot take root_dir itself: emulate it by
            # changing directory, after pinning base_name to an absolute
            # path so that the archive still lands where the caller asked.
            save_cwd = os.getcwd()
            if logger is not None:
                logger.debug("changing into '%s'", root_dir)
            base_name = os.path.abspath(base_name)
            if not dry_run:
                os.chdir(root_dir)

    try:
        filename = func(base_name, base_dir, **kwargs)
    finally:
        if save_cwd is not None:
            if logger is not None:
                logger.debug("changing back to '%s'", save_cwd)
            os.chdir(save_cwd)

    return filename


def get_unpack_formats():
    """Returns a list of supported formats for unpacking.

    Each element of the returned sequence is a tuple
    (name, extensions, description).  The list is sorted.
    """
    return sorted((name, info[0], info[3]) for name, info in
                  _UNPACK_FORMATS.items())

def _check_unpack_options(extensions, function, extra_args):
    """Checks what gets registered as an unpacker.

    Raises RegistryError if one of `extensions` already belongs to a
    registered format, and TypeError if `function` is not callable.
    `extra_args` is accepted but not validated here.
    """
    # first make sure no other unpacker is registered for this extension
    existing_extensions = {}
    for name, info in _UNPACK_FORMATS.items():
        for ext in info[0]:
            existing_extensions[ext] = name

    for extension in extensions:
        if extension in existing_extensions:
            msg = '%s is already registered for "%s"'
            raise RegistryError(msg % (extension,
                                       existing_extensions[extension]))

    if not callable(function):
        raise TypeError('The registered function must be a callable')


def register_unpack_format(name, extensions, function, extra_args=None,
                           description=''):
    """Registers an unpack format.

    `name` is the name of the format. `extensions` is a list of extensions
    corresponding to the format.

    `function` is the callable that will be
    used to unpack archives. The callable will receive archives to unpack.
    If it's unable to handle an archive, it needs to raise a ReadError
    exception.

    If provided, `extra_args` is a sequence of
    (name, value) tuples that will be passed as arguments to the callable.
    description can be provided to describe the format, and will be returned
    by the get_unpack_formats() function.

    Raises RegistryError if an extension is already registered, and
    TypeError if `function` is not callable.
    """
    if extra_args is None:
        extra_args = []
    _check_unpack_options(extensions, function, extra_args)
    _UNPACK_FORMATS[name] = extensions, function, extra_args, description

def unregister_unpack_format(name):
    """Removes the unpack format from the registry.

    Raises KeyError if no format called `name` is registered.
    """
    del _UNPACK_FORMATS[name]

def _ensure_directory(path):
    """Ensure that the parent directory of `path` exists"""
    dirname = os.path.dirname(path)
    # An empty dirname means `path` lives in the current directory, which
    # needs no creation (os.makedirs('') would raise).  exist_ok avoids the
    # race between checking for the directory and creating it.
    if dirname:
        os.makedirs(dirname, exist_ok=True)

def _unpack_zipfile(filename, extract_dir):
    """Unpack zip `filename` to `extract_dir`

    Members whose name starts with '/' or contains '..' are skipped.
    Raises ReadError if `filename` is not a zip file.
    """
    import zipfile  # late import for breaking circular dependency

    if not zipfile.is_zipfile(filename):
        raise ReadError("%s is not a zip file" % filename)

    with zipfile.ZipFile(filename) as archive:
        for info in archive.infolist():
            name = info.filename

            # don't extract absolute paths or ones with .. in them
            if name.startswith('/') or '..' in name:
                continue

            targetpath = os.path.join(extract_dir, *name.split('/'))
            if not targetpath:
                continue

            _ensure_directory(targetpath)
            if not name.endswith('/'):
                # file
                with archive.open(name, 'r') as source, \
                        open(targetpath, 'wb') as target:
                    copyfileobj(source, target)

def _unpack_tarfile(filename, extract_dir, *, filter=None):
    """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir`

    `filter` is passed through to TarFile.extractall().
    Raises ReadError if `filename` cannot be opened as a tar archive.
    """
    import tarfile  # late import for breaking circular dependency
    try:
        tarobj = tarfile.open(filename)
    except tarfile.TarError:
        raise ReadError(
            "%s is not a compressed or uncompressed tar file" % filename)
    try:
        tarobj.extractall(extract_dir, filter=filter)
    finally:
        tarobj.close()

# Maps the name of the unpack format to a tuple containing:
# * extensions
# * the unpacking function
# * extra keyword arguments
# * description
_UNPACK_FORMATS = {
    'tar':   (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
    'zip':   (['.zip'], _unpack_zipfile, [], "ZIP file"),
}

if _ZLIB_SUPPORTED:
    _UNPACK_FORMATS['gztar'] = (['.tar.gz', '.tgz'], _unpack_tarfile, [],
                                "gzip'ed tar-file")

if _BZ2_SUPPORTED:
    _UNPACK_FORMATS['bztar'] = (['.tar.bz2', '.tbz2'], _unpack_tarfile, [],
                                "bzip2'ed tar-file")

if _LZMA_SUPPORTED:
    _UNPACK_FORMATS['xztar'] = (['.tar.xz', '.txz'], _unpack_tarfile, [],
                                "xz'ed tar-file")

def _find_unpack_format(filename):
    """Return the name of the first registered unpack format having an
    extension that `filename` ends with, or None if there is none."""
    for name, info in _UNPACK_FORMATS.items():
        for extension in info[0]:
            if filename.endswith(extension):
                return name
    return None

def unpack_archive(filename, extract_dir=None, format=None, *, filter=None):
    """Unpack an archive.

    `filename` is the name of the archive.

    `extract_dir` is the name of the target directory, where the archive
    is unpacked. If not provided, the current working directory is used.

    `format` is the archive format: one of "zip", "tar", "gztar", "bztar",
    or "xztar".  Or any other registered format.  If not provided,
    unpack_archive will use the filename extension and see if an unpacker
    was registered for that extension.

    If `format` is given but is not a registered format, a ValueError is
    raised.  If `format` is not given and no unpacker is registered for the
    extension of `filename`, a ReadError is raised.

    If `filter` is given, it is passed to the underlying
    extraction function, taking precedence over a 'filter' entry in the
    extra arguments registered for the format.
    """
    sys.audit("shutil.unpack_archive", filename, extract_dir, format)

    if extract_dir is None:
        extract_dir = os.getcwd()

    extract_dir = os.fspath(extract_dir)
    filename = os.fspath(filename)

    if filter is None:
        filter_kwargs = {}
    else:
        filter_kwargs = {'filter': filter}

    if format is not None:
        try:
            format_info = _UNPACK_FORMATS[format]
        except KeyError:
            raise ValueError("Unknown unpack format '{0}'".format(format)) from None
    else:
        # we need to look at the registered unpackers supported extensions
        detected = _find_unpack_format(filename)
        if detected is None:
            raise ReadError("Unknown archive format '{0}'".format(filename))
        format_info = _UNPACK_FORMATS[detected]

    # Both lookup paths build the keyword arguments the same way: the
    # registered extra arguments first, then the caller's filter on top.
    func = format_info[1]
    kwargs = dict(format_info[2]) | filter_kwargs
    func(filename, extract_dir, **kwargs)


if hasattr(os, 'statvfs'):

    __all__.append('disk_usage')
    _ntuple_diskusage = collections.namedtuple('usage', 'total used free')
    _ntuple_diskusage.total.__doc__ = 'Total space in bytes'
    _ntuple_diskusage.used.__doc__ = 'Used space in bytes'
    _ntuple_diskusage.free.__doc__ = 'Free space in bytes'

    def disk_usage(path):
        """Return disk usage statistics about the given path.

        Returned value is a named tuple with attributes 'total', 'used' and
        'free', which are the amount of total, used and free space, in bytes.
        """
        st = os.statvfs(path)
        # 'free' is computed from f_bavail while 'used' is computed from
        # f_bfree, so total is not necessarily used + free.
        free = st.f_bavail * st.f_frsize
        total = st.f_blocks * st.f_frsize
        used = (st.f_blocks - st.f_bfree) * st.f_frsize
        return _ntuple_diskusage(total, used, free)

elif _WINDOWS:

    __all__.append('disk_usage')
    _ntuple_diskusage = collections.namedtuple('usage', 'total used free')
    _ntuple_diskusage.total.__doc__ = 'Total space in bytes'
    _ntuple_diskusage.used.__doc__ = 'Used space in bytes'
    _ntuple_diskusage.free.__doc__ = 'Free space in bytes'

    def disk_usage(path):
        """Return disk usage statistics about the given path.

        Returned value is a named tuple with attributes 'total', 'used' and
        'free', which are the amount of total, used and free space, in bytes.
        """
        total, free = nt._getdiskusage(path)
        used = total - free
        return _ntuple_diskusage(total, used, free)


def chown(path, user=None, group=None):
    """Change owner user and group of the given path.

    user and group can be the uid/gid or the user/group names, and in that case,
    they are converted to their respective uid/gid.

    Raises ValueError if neither user nor group is given, and LookupError
    if a given name cannot be resolved.
    """
    sys.audit('shutil.chown', path, user, group)

    if user is None and group is None:
        raise ValueError("user and/or group must be set")

    _user = user
    _group = group

    # -1 means don't change it
    if user is None:
        _user = -1
    # user can either be an int (the uid) or a string (the system username)
    elif isinstance(user, str):
        _user = _get_uid(user)
        if _user is None:
            raise LookupError("no such user: {!r}".format(user))

    if group is None:
        _group = -1
    elif not isinstance(group, int):
        _group = _get_gid(group)
        if _group is None:
            raise LookupError("no such group: {!r}".format(group))

    os.chown(path, _user, _group)

def get_terminal_size(fallback=(80, 24)):
    """Get the size of the terminal window.

    For each of the two dimensions, the environment variable, COLUMNS
    and LINES respectively, is checked. If the variable is defined and
    the value is a positive integer, it is used.

    When COLUMNS or LINES is not defined, which is the common case,
    the terminal connected to sys.__stdout__ is queried
    by invoking os.get_terminal_size.

    If the terminal size cannot be successfully queried, either because
    the system doesn't support querying, or because we are not
    connected to a terminal, the value given in fallback parameter
    is used. Fallback defaults to (80, 24) which is the default
    size used by many terminal emulators.

    The value returned is a named tuple of type os.terminal_size.
    """
    # columns, lines are the working values
    try:
        columns = int(os.environ['COLUMNS'])
    except (KeyError, ValueError):
        columns = 0

    try:
        lines = int(os.environ['LINES'])
    except (KeyError, ValueError):
        lines = 0

    # only query if necessary
    if columns <= 0 or lines <= 0:
        try:
            size = os.get_terminal_size(sys.__stdout__.fileno())
        except (AttributeError, ValueError, OSError):
            # stdout is None, closed, detached, or not a terminal, or
            # os.get_terminal_size() is unsupported
            size = os.terminal_size(fallback)
        # A dimension reported as 0 by the query is replaced by the fallback.
        if columns <= 0:
            columns = size.columns or fallback[0]
        if lines <= 0:
            lines = size.lines or fallback[1]

    return os.terminal_size((columns, lines))


# Check that a given file can be accessed with the correct mode.
# Additionally check that `file` is not a directory, as on Windows
# directories pass the os.access check.
def _access_check(fn, mode):
    """Return True if `fn` exists, passes os.access() for `mode`, and is
    not a directory."""
    if not os.path.exists(fn):
        return False
    if not os.access(fn, mode):
        return False
    return not os.path.isdir(fn)


def which(cmd, mode=os.F_OK | os.X_OK, path=None):
    """Locate `cmd` on a search path.

    Returns the first path built from a directory of the search path and
    `cmd` that satisfies `mode`, or None when nothing matches.

    `mode` defaults to os.F_OK | os.X_OK.  `path` is a search path string;
    when omitted, os.environ["PATH"] is used, falling back to
    os.confstr("CS_PATH") and then os.defpath if PATH is not set at all.

    A `cmd` containing a directory part is checked as given, without
    consulting the search path.  The result is bytes when `cmd` is bytes.
    """
    # A command that names a directory is looked up directly; this covers
    # paths relative to the current directory such as ./script.
    if os.path.dirname(cmd):
        return cmd if _access_check(cmd, mode) else None

    use_bytes = isinstance(cmd, bytes)

    if path is None:
        path = os.environ.get("PATH", None)
        # bpo-35755: only fall back to a default when PATH is unset, not
        # when it is set to an empty string.
        if path is None:
            try:
                path = os.confstr("CS_PATH")
            except (AttributeError, ValueError):
                # os.confstr() or CS_PATH is not available
                path = os.defpath

    # PATH='' matches nothing, whereas PATH=':' looks in the current
    # directory.
    if not path:
        return None

    if use_bytes:
        search_dirs = os.fsencode(path).split(os.fsencode(os.pathsep))
    else:
        search_dirs = os.fsdecode(path).split(os.pathsep)

    if sys.platform == "win32":
        # On Windows the current directory is searched first.
        curdir = os.fsencode(os.curdir) if use_bytes else os.curdir
        if curdir not in search_dirs:
            search_dirs.insert(0, curdir)

        # Windows decides what is executable from the PATHEXT suffixes.
        pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT
        pathext = [ext for ext in pathext_source.split(os.pathsep) if ext]
        if use_bytes:
            pathext = [os.fsencode(ext) for ext in pathext]

        # A command that already carries one of the suffixes (for example
        # "python.exe") is tried as-is; otherwise every suffix is tried.
        if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
            candidates = [cmd]
        else:
            candidates = [cmd + ext for ext in pathext]
    else:
        # Elsewhere there is no list of executable suffixes, so the
        # command is used unchanged.
        candidates = [cmd]

    visited = set()
    for directory in search_dirs:
        # Skip directories already searched (compared case-normalized).
        key = os.path.normcase(directory)
        if key in visited:
            continue
        visited.add(key)
        for candidate in candidates:
            full_name = os.path.join(directory, candidate)
            if _access_check(full_name, mode):
                return full_name
    return None