xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/shutil.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1"""Utility functions for copying and archiving files and directory trees.
2
3XXX The functions here don't copy the resource fork or other metadata on Mac.
4
5"""
6
7import os
8import sys
9import stat
10import fnmatch
11import collections
12import errno
13
14try:
15    import zlib
16    del zlib
17    _ZLIB_SUPPORTED = True
18except ImportError:
19    _ZLIB_SUPPORTED = False
20
21try:
22    import bz2
23    del bz2
24    _BZ2_SUPPORTED = True
25except ImportError:
26    _BZ2_SUPPORTED = False
27
28try:
29    import lzma
30    del lzma
31    _LZMA_SUPPORTED = True
32except ImportError:
33    _LZMA_SUPPORTED = False
34
35_WINDOWS = os.name == 'nt'
36posix = nt = None
37if os.name == 'posix':
38    import posix
39elif _WINDOWS:
40    import nt
41
# Default chunk size, in bytes, for buffered copies: 1 MiB on Windows and
# 64 KiB on every other platform.
COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024
# This should never be removed, see rationale in:
# https://bugs.python.org/issue43743#msg393429
# True only on Linux builds whose os module exposes sendfile().
_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux")
# Truthy only when the posix module is loaded and provides _fcopyfile.
_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile")  # macOS

# CMD defaults in Windows 10
_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC"

# Names exported by "from shutil import *".
__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
           "copytree", "move", "rmtree", "Error", "SpecialFileError",
           "ExecError", "make_archive", "get_archive_formats",
           "register_archive_format", "unregister_archive_format",
           "get_unpack_formats", "register_unpack_format",
           "unregister_unpack_format", "unpack_archive",
           "ignore_patterns", "chown", "which", "get_terminal_size",
           "SameFileError"]
           # disk_usage is added later, if available on the platform
60
class Error(OSError):
    """Generic error raised by the functions of this module."""

class SameFileError(Error):
    """Signals that the source and the destination are one and the same
    file."""

class SpecialFileError(OSError):
    """Signals an operation (copying, for instance) attempted on a special
    file, such as a named pipe, that does not support it."""

class ExecError(OSError):
    """Signals that a command could not be executed."""

class ReadError(OSError):
    """Signals that an archive could not be read."""

class RegistryError(Exception):
    """Signals a failed operation on the registries of archiving and
    unpacking formats."""

class _GiveupOnFastCopy(Exception):
    """Internal signal: a fast-copy function could not do its job and the
    caller must fall back on a plain read()/write() file copy.
    """
85
def _fastcopy_fcopyfile(fsrc, fdst, flags):
    """Copy the content or the metadata of a regular file through the
    high-performance fcopyfile(3) syscall (macOS).

    *flags* is handed to the syscall unchanged.  _GiveupOnFastCopy is
    raised when a file object has no usable descriptor or when the syscall
    fails with EINVAL or ENOTSUP; any other OSError is re-raised with both
    file names attached.
    """
    try:
        src_fd, dst_fd = fsrc.fileno(), fdst.fileno()
    except Exception as exc:
        raise _GiveupOnFastCopy(exc)  # not a regular file

    try:
        posix._fcopyfile(src_fd, dst_fd, flags)
    except OSError as exc:
        exc.filename = fsrc.name
        exc.filename2 = fdst.name
        if exc.errno not in {errno.EINVAL, errno.ENOTSUP}:
            raise exc from None
        raise _GiveupOnFastCopy(exc)
105
def _fastcopy_sendfile(fsrc, fdst):
    """Copy data from one regular mmap-like fd to another through the
    high-performance sendfile(2) syscall.
    This should work on Linux >= 2.6.33 only.

    _GiveupOnFastCopy is raised when the caller should fall back on a
    plain read()/write() copy.
    """
    # Note: copyfileobj() is deliberately left untouched so that no
    # unexpected breakage is introduced. Using zero-copy calls in
    # copyfileobj() would be risky because:
    # - fdst cannot be open in "a"(ppend) mode
    # - fsrc and fdst may be open in "t"(ext) mode
    # - fsrc may be a BufferedReader (which hides unread data in a buffer),
    #   GzipFile (which decompresses data), HTTPResponse (which decodes
    #   chunks).
    # - possibly others (e.g. encrypted fs/partition?)
    global _USE_CP_SENDFILE
    try:
        src_fd, dst_fd = fsrc.fileno(), fdst.fileno()
    except Exception as exc:
        raise _GiveupOnFastCopy(exc)  # not a regular file

    # With some luck a single call copies the whole file.  sendfile() is
    # invoked repeatedly until it reports EOF (a 0 return), so a block
    # size below or above the real file size makes no difference, even
    # if the file content changes during the copy.
    try:
        chunk = max(os.fstat(src_fd).st_size, 2 ** 23)  # min 8MiB
    except OSError:
        chunk = 2 ** 27  # 128MiB
    # 32-bit architectures: cap at 1GiB to avoid OverflowError,
    # see bpo-38319.
    if sys.maxsize < 2 ** 32:
        chunk = min(chunk, 2 ** 30)

    copied = 0
    while True:
        try:
            sent = os.sendfile(dst_fd, src_fd, copied, chunk)
        except OSError as exc:
            # Attach both names in order to have a more informative
            # exception.
            exc.filename = fsrc.name
            exc.filename2 = fdst.name

            if exc.errno == errno.ENOTSOCK:
                # This platform's sendfile() (probably Linux < 2.6.33)
                # only copies to sockets, not between regular files.
                _USE_CP_SENDFILE = False
                raise _GiveupOnFastCopy(exc)

            if exc.errno == errno.ENOSPC:  # filesystem is full
                raise exc from None

            # Give up only on the first call, when nothing was copied.
            if copied == 0 and os.lseek(dst_fd, 0, os.SEEK_CUR) == 0:
                raise _GiveupOnFastCopy(exc)

            raise exc
        if sent == 0:
            break  # EOF
        copied += sent
169
def _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE):
    """readinto()/memoryview() based variant of copyfileobj().
    *fsrc* must support readinto() method and both files must be
    open in binary mode.

    *length* is the size in bytes of the single buffer that is reused
    for every read.
    """
    # Localize variable access to minimize overhead.
    fsrc_readinto = fsrc.readinto
    fdst_write = fdst.write
    with memoryview(bytearray(length)) as mv:
        while True:
            n = fsrc_readinto(mv)
            if not n:
                break  # EOF
            elif n < length:
                # Short read: only the first n bytes of the buffer are
                # valid, so write a slice of the view.  The slice is
                # released as soon as the write returns.
                with mv[:n] as smv:
                    # Use the localized bound method here as well, like
                    # the full-buffer branch does.
                    fdst_write(smv)
            else:
                fdst_write(mv)
188
def copyfileobj(fsrc, fdst, length=0):
    """Copy data from file-like object fsrc to file-like object fdst.

    The data is transferred in chunks of *length*; a falsy *length*
    selects COPY_BUFSIZE.
    """
    chunk_size = length or COPY_BUFSIZE
    # Bound methods are looked up once, outside the loop.
    read, write = fsrc.read, fdst.write
    while chunk := read(chunk_size):
        write(chunk)
201
def _samefile(src, dst):
    """Tell whether src and dst designate the same file.

    An OSError raised while examining the files counts as "not the same".
    """
    path_mod = os.path
    try:
        # Macintosh, Unix.
        if isinstance(src, os.DirEntry) and hasattr(path_mod, 'samestat'):
            return path_mod.samestat(src.stat(), os.stat(dst))
        if hasattr(path_mod, 'samefile'):
            return path_mod.samefile(src, dst)
    except OSError:
        return False

    # All other platforms: compare the normalized absolute pathnames.
    normalized_src = path_mod.normcase(path_mod.abspath(src))
    normalized_dst = path_mod.normcase(path_mod.abspath(dst))
    return normalized_src == normalized_dst
219
def _stat(fn):
    """Return the stat result of fn, which is either an os.DirEntry or
    anything that os.stat() accepts."""
    if isinstance(fn, os.DirEntry):
        return fn.stat()
    return os.stat(fn)
222
def _islink(fn):
    """Tell whether fn, an os.DirEntry or a path, is a symbolic link."""
    if isinstance(fn, os.DirEntry):
        return fn.is_symlink()
    return os.path.islink(fn)
225
def copyfile(src, dst, *, follow_symlinks=True):
    """Copy data from src to dst in the most efficient way possible.

    If follow_symlinks is not set and src is a symbolic link, a new
    symlink will be created instead of copying the file it points to.

    Return dst.  SameFileError is raised when src and dst are the same
    file, and SpecialFileError when either of them is a named pipe.

    """
    sys.audit("shutil.copyfile", src, dst)

    if _samefile(src, dst):
        raise SameFileError("{!r} and {!r} are the same file".format(src, dst))

    # Size of src; only recorded on Windows, where it sizes the readinto
    # buffer further down.  It stays 0 on every other platform.
    file_size = 0
    for i, fn in enumerate([src, dst]):
        try:
            st = _stat(fn)
        except OSError:
            # File most likely does not exist
            pass
        else:
            # XXX What about other special files? (sockets, devices...)
            if stat.S_ISFIFO(st.st_mode):
                fn = fn.path if isinstance(fn, os.DirEntry) else fn
                raise SpecialFileError("`%s` is a named pipe" % fn)
            if _WINDOWS and i == 0:
                file_size = st.st_size

    if not follow_symlinks and _islink(src):
        # Recreate the link itself rather than copying what it points to.
        os.symlink(os.readlink(src), dst)
    else:
        with open(src, 'rb') as fsrc:
            try:
                with open(dst, 'wb') as fdst:
                    # Exactly one platform-specific fast path is tried.
                    # When it signals _GiveupOnFastCopy, control falls
                    # through to the generic copyfileobj() call below.
                    # macOS
                    if _HAS_FCOPYFILE:
                        try:
                            _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA)
                            return dst
                        except _GiveupOnFastCopy:
                            pass
                    # Linux
                    elif _USE_CP_SENDFILE:
                        try:
                            _fastcopy_sendfile(fsrc, fdst)
                            return dst
                        except _GiveupOnFastCopy:
                            pass
                    # Windows, see:
                    # https://github.com/python/cpython/pull/7160#discussion_r195405230
                    elif _WINDOWS and file_size > 0:
                        _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE))
                        return dst

                    copyfileobj(fsrc, fdst)

            # Issue 43219, raise a less confusing exception
            except IsADirectoryError as e:
                if not os.path.exists(dst):
                    raise FileNotFoundError(f'Directory does not exist: {dst}') from e
                else:
                    raise

    return dst
289
def copymode(src, dst, *, follow_symlinks=True):
    """Copy the mode bits of src onto dst.

    When follow_symlinks is not set, symlinks are left unfollowed if and
    only if both `src` and `dst` are symlinks.  In that case, if `lchmod`
    isn't available (e.g. Linux), this function does nothing.

    """
    sys.audit("shutil.copymode", src, dst)

    operate_on_links = (not follow_symlinks and _islink(src)
                        and os.path.islink(dst))
    if operate_on_links:
        if not hasattr(os, 'lchmod'):
            return
        read_stat, apply_mode = os.lstat, os.lchmod
    else:
        read_stat, apply_mode = _stat, os.chmod

    source_stat = read_stat(src)
    apply_mode(dst, stat.S_IMODE(source_stat.st_mode))
310
if hasattr(os, 'listxattr'):
    def _copyxattr(src, dst, *, follow_symlinks=True):
        """Copy the extended filesystem attributes of `src` onto `dst`.

        Attributes already present on `dst` are overwritten.

        Symlinks are not followed when `follow_symlinks` is false.

        """

        try:
            attr_names = os.listxattr(src, follow_symlinks=follow_symlinks)
        except OSError as exc:
            if exc.errno in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL):
                return
            raise
        for attr in attr_names:
            try:
                payload = os.getxattr(src, attr,
                                      follow_symlinks=follow_symlinks)
                os.setxattr(dst, attr, payload,
                            follow_symlinks=follow_symlinks)
            except OSError as exc:
                if exc.errno in (errno.EPERM, errno.ENOTSUP, errno.ENODATA,
                                 errno.EINVAL):
                    continue
                raise
else:
    def _copyxattr(*args, **kwargs):
        """Do nothing: os.listxattr is not available on this platform."""
338
def copystat(src, dst, *, follow_symlinks=True):
    """Copy file metadata

    Copy the permission bits, last access time, last modification time, and
    flags from `src` to `dst`. On Linux, copystat() also copies the "extended
    attributes" where possible. The file contents, owner, and group are
    unaffected. `src` and `dst` are path-like objects or path names given as
    strings.

    If the optional flag `follow_symlinks` is not set, symlinks aren't
    followed if and only if both `src` and `dst` are symlinks.
    """
    sys.audit("shutil.copystat", src, dst)

    # Stand-in for an os function that is missing or unusable here; it
    # accepts the same call shapes and does nothing.
    def _nop(*args, ns=None, follow_symlinks=None):
        pass

    # Follow symlinks unless the caller asked not to AND both src and dst
    # are symlinks.
    follow = follow_symlinks or not (_islink(src) and os.path.islink(dst))
    if follow:
        # use the real function if it exists
        def lookup(name):
            return getattr(os, name, _nop)
    else:
        # use the real function only if it exists
        # *and* it supports follow_symlinks
        def lookup(name):
            fn = getattr(os, name, _nop)
            if fn in os.supports_follow_symlinks:
                return fn
            return _nop

    if isinstance(src, os.DirEntry):
        st = src.stat(follow_symlinks=follow)
    else:
        st = lookup("stat")(src, follow_symlinks=follow)
    mode = stat.S_IMODE(st.st_mode)
    lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns),
        follow_symlinks=follow)
    # We must copy extended attributes before the file is (potentially)
    # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
    _copyxattr(src, dst, follow_symlinks=follow)
    try:
        lookup("chmod")(dst, mode, follow_symlinks=follow)
    except NotImplementedError:
        # NotImplementedError is expected here only when follow is false
        # and the platform cannot change the mode of a symlink itself
        # (presumably neither lchmod() nor an fchmodat() that honours
        # AT_SYMLINK_NOFOLLOW is usable -- see the os.chmod documentation).
        # therefore we're out of options--we simply cannot chmod the
        # symlink.  give up, suppress the error.
        # (which is what shutil always did in this circumstance.)
        pass
    if hasattr(st, 'st_flags'):
        try:
            lookup("chflags")(dst, st.st_flags, follow_symlinks=follow)
        except OSError as why:
            # Tolerate "operation not supported" in either spelling this
            # platform's errno module defines; re-raise anything else.
            for err in 'EOPNOTSUPP', 'ENOTSUP':
                if hasattr(errno, err) and why.errno == getattr(errno, err):
                    break
            else:
                raise
404
def copy(src, dst, *, follow_symlinks=True):
    """Copy data and mode bits, like "cp src dst", and return the path of
    the file that was written.

    `dst` may be a directory; the file is then created inside it under
    the base name of `src`.

    Symlinks are not followed when follow_symlinks is false, which
    resembles GNU's "cp -P src dst".

    A SameFileError is raised when source and destination are the same
    file.

    """
    target = (os.path.join(dst, os.path.basename(src))
              if os.path.isdir(dst) else dst)
    copyfile(src, target, follow_symlinks=follow_symlinks)
    copymode(src, target, follow_symlinks=follow_symlinks)
    return target
422
def copy2(src, dst, *, follow_symlinks=True):
    """Copy data and metadata, and return the path of the file that was
    written.

    The metadata is copied by copystat(); see that function for the
    details.

    `dst` may be a directory; the file is then created inside it under
    the base name of `src`.

    Symlinks are not followed when follow_symlinks is false, which
    resembles GNU's "cp -P src dst".
    """
    target = (os.path.join(dst, os.path.basename(src))
              if os.path.isdir(dst) else dst)
    copyfile(src, target, follow_symlinks=follow_symlinks)
    copystat(src, target, follow_symlinks=follow_symlinks)
    return target
439
def ignore_patterns(*patterns):
    """Build a callable suitable for the ignore parameter of copytree().

    Patterns is a sequence of glob-style patterns; the names matching
    any of them are the ones to exclude."""
    def _ignore_patterns(path, names):
        excluded = set()
        for glob in patterns:
            excluded.update(fnmatch.filter(names, glob))
        return excluded
    return _ignore_patterns
451
def _copytree(entries, src, dst, symlinks, ignore, copy_function,
              ignore_dangling_symlinks, dirs_exist_ok=False):
    """Copy the directory entries *entries* of *src* into *dst*.

    Worker called by copytree(), which documents the other arguments.
    *entries* is the list of os.DirEntry objects found in *src*.

    Failures on individual entries are collected so that the remaining
    entries are still copied.  If any were collected, Error is raised at
    the end with the list of (srcname, dstname, reason) tuples;
    otherwise *dst* is returned.
    """
    if ignore is not None:
        ignored_names = ignore(os.fspath(src), [x.name for x in entries])
    else:
        ignored_names = set()

    os.makedirs(dst, exist_ok=dirs_exist_ok)
    errors = []
    # The stock copy functions accept os.DirEntry objects; a custom
    # copy_function is always given plain path names.
    use_srcentry = copy_function is copy2 or copy_function is copy

    for srcentry in entries:
        if srcentry.name in ignored_names:
            continue
        srcname = os.path.join(src, srcentry.name)
        dstname = os.path.join(dst, srcentry.name)
        srcobj = srcentry if use_srcentry else srcname
        try:
            is_symlink = srcentry.is_symlink()
            if is_symlink and os.name == 'nt':
                # Special check for directory junctions, which appear as
                # symlinks but we want to recurse.
                lstat = srcentry.stat(follow_symlinks=False)
                if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT:
                    is_symlink = False
            if is_symlink:
                linkto = os.readlink(srcname)
                if symlinks:
                    # We can't just leave it to `copy_function` because legacy
                    # code with a custom `copy_function` may rely on copytree
                    # doing the right thing.
                    os.symlink(linkto, dstname)
                    copystat(srcobj, dstname, follow_symlinks=not symlinks)
                else:
                    # ignore dangling symlink if the flag is on.
                    # The check goes through the link itself rather than
                    # through the raw readlink() result: a relative target
                    # is relative to the directory holding the link, not
                    # to the current working directory, so testing
                    # `linkto` directly would skip valid relative links.
                    if ignore_dangling_symlinks and not os.path.exists(srcname):
                        continue
                    # otherwise let the copy occur. copy2 will raise an error
                    if srcentry.is_dir():
                        copytree(srcobj, dstname, symlinks, ignore,
                                 copy_function, ignore_dangling_symlinks,
                                 dirs_exist_ok)
                    else:
                        copy_function(srcobj, dstname)
            elif srcentry.is_dir():
                copytree(srcobj, dstname, symlinks, ignore, copy_function,
                         ignore_dangling_symlinks, dirs_exist_ok)
            else:
                # Will raise a SpecialFileError for unsupported file types
                copy_function(srcobj, dstname)
        # catch the Error from the recursive copytree so that we can
        # continue with other files
        except Error as err:
            reasons = err.args[0] if err.args else ""
            if isinstance(reasons, str):
                # An Error built from a plain message (SameFileError, for
                # instance): record it as one entry instead of letting
                # extend() split the message into single characters.
                errors.append((srcname, dstname, str(err)))
            else:
                errors.extend(reasons)
        except OSError as why:
            errors.append((srcname, dstname, str(why)))
    try:
        copystat(src, dst)
    except OSError as why:
        # Copying file access times may fail on Windows
        if getattr(why, 'winerror', None) is None:
            errors.append((src, dst, str(why)))
    if errors:
        raise Error(errors)
    return dst
517
def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
             ignore_dangling_symlinks=False, dirs_exist_ok=False):
    """Copy the directory tree rooted at src to dst, recursively, and
    return the destination directory.

    Problems met along the way are reported together: if any exception
    occurred, an Error carrying the list of reasons is raised.

    symlinks -- when true, symbolic links of the source tree become
    symbolic links in the destination tree; when false, the contents of
    the files the links point to are copied instead.  A link whose
    target doesn't exist then adds an exception to the list of errors
    raised in the Error exception at the end of the copy process.

    ignore_dangling_symlinks -- set it to true to silence that
    exception.  It has no effect on platforms that don't support
    os.symlink.

    ignore -- optional callable, invoked as

        callable(src, names) -> ignored_names

    with `src`, the directory being visited by copytree(), and `names`,
    the list of `src` contents as returned by os.listdir().  Because
    copytree() recurses, it is called once per copied directory.  It
    returns a list of names, relative to `src`, that should not be
    copied.

    copy_function -- callable used to copy each file; it is called with
    the source path and the destination path as arguments.  copy2() is
    the default, and any function supporting the same signature (like
    copy()) can be used.

    dirs_exist_ok -- when false (the default), a `FileExistsError` is
    raised if `dst` already exists.  When true, the copy carries on when
    it meets existing directories, and files within the `dst` tree are
    overwritten by the corresponding files from the `src` tree.
    """
    sys.audit("shutil.copytree", src, dst)
    with os.scandir(src) as scan:
        children = list(scan)
    return _copytree(children, src, dst,
                     symlinks=symlinks,
                     ignore=ignore,
                     copy_function=copy_function,
                     ignore_dangling_symlinks=ignore_dangling_symlinks,
                     dirs_exist_ok=dirs_exist_ok)
565
if hasattr(os.stat_result, 'st_file_attributes'):
    # Directory junctions generally do not show up as regular links, so
    # they get special handling here to make shutil.rmtree treat them
    # the way it treats symlinks.
    def _rmtree_isdir(entry):
        """Tell whether entry is a directory that is not a junction."""
        try:
            info = entry.stat(follow_symlinks=False)
        except OSError:
            return False
        if not stat.S_ISDIR(info.st_mode):
            return False
        return not (
            info.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
            and info.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT)

    def _rmtree_islink(path):
        """Tell whether path is a symbolic link or a junction."""
        try:
            info = os.lstat(path)
        except OSError:
            return False
        if stat.S_ISLNK(info.st_mode):
            return True
        return (
            info.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
            and info.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT)
else:
    def _rmtree_isdir(entry):
        """Tell whether entry is a directory, without following symlinks."""
        try:
            answer = entry.is_dir(follow_symlinks=False)
        except OSError:
            return False
        return answer

    def _rmtree_islink(path):
        """Tell whether path is a symbolic link."""
        return os.path.islink(path)
596
# version vulnerable to race conditions
def _rmtree_unsafe(path, onerror):
    """Delete the tree rooted at path using path-based calls only.

    Every failure is reported through onerror(func, path, exc_info)."""
    try:
        with os.scandir(path) as scan:
            children = list(scan)
    except OSError:
        onerror(os.scandir, path, sys.exc_info())
        children = []
    for child in children:
        child_path = child.path
        if not _rmtree_isdir(child):
            try:
                os.unlink(child_path)
            except OSError:
                onerror(os.unlink, child_path, sys.exc_info())
            continue
        try:
            if child.is_symlink():
                # Only possible when somebody swapped a directory for
                # a symlink after the os.scandir call or the
                # directory check above.
                raise OSError("Cannot call rmtree on a symbolic link")
        except OSError:
            onerror(os.path.islink, child_path, sys.exc_info())
            continue
        _rmtree_unsafe(child_path, onerror)
    try:
        os.rmdir(path)
    except OSError:
        onerror(os.rmdir, path, sys.exc_info())
627
# Version using fd-based APIs to protect against races
def _rmtree_safe_fd(topfd, path, onerror):
    """Delete everything inside the directory open as descriptor topfd.

    path is the name of that directory; it is only used to build the
    names reported to onerror(func, path, exc_info).  The directory
    itself is not removed here: the caller closes topfd and calls rmdir.
    """
    try:
        with os.scandir(topfd) as scandir_it:
            entries = list(scandir_it)
    except OSError as err:
        err.filename = path
        onerror(os.scandir, path, sys.exc_info())
        return
    for entry in entries:
        fullname = os.path.join(path, entry.name)
        try:
            is_dir = entry.is_dir(follow_symlinks=False)
        except OSError:
            is_dir = False
        else:
            if is_dir:
                # Record the entry's stat now; it is compared with the
                # descriptor opened below to detect a replaced entry.
                try:
                    orig_st = entry.stat(follow_symlinks=False)
                    is_dir = stat.S_ISDIR(orig_st.st_mode)
                except OSError:
                    onerror(os.lstat, fullname, sys.exc_info())
                    continue
        if is_dir:
            try:
                dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd)
                dirfd_closed = False
            except OSError:
                onerror(os.open, fullname, sys.exc_info())
            else:
                try:
                    # Recurse only if what was opened is still the
                    # directory that was examined above.
                    if os.path.samestat(orig_st, os.fstat(dirfd)):
                        _rmtree_safe_fd(dirfd, fullname, onerror)
                        try:
                            # The descriptor is closed before the rmdir
                            # call; the flag keeps the finally clause
                            # from closing it a second time.
                            os.close(dirfd)
                            dirfd_closed = True
                            os.rmdir(entry.name, dir_fd=topfd)
                        except OSError:
                            onerror(os.rmdir, fullname, sys.exc_info())
                    else:
                        try:
                            # This can only happen if someone replaces
                            # a directory with a symlink after the call to
                            # os.scandir or stat.S_ISDIR above.
                            raise OSError("Cannot call rmtree on a symbolic "
                                          "link")
                        except OSError:
                            onerror(os.path.islink, fullname, sys.exc_info())
                finally:
                    if not dirfd_closed:
                        os.close(dirfd)
        else:
            try:
                os.unlink(entry.name, dir_fd=topfd)
            except OSError:
                onerror(os.unlink, fullname, sys.exc_info())
684
# True only when every primitive the fd-based rmtree relies on is
# available: dir_fd support in open, stat, unlink and rmdir, fd support
# in scandir, and follow_symlinks support in stat.
_use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
                     os.supports_dir_fd and
                     os.scandir in os.supports_fd and
                     os.stat in os.supports_follow_symlinks)
689
def rmtree(path, ignore_errors=False, onerror=None, *, dir_fd=None):
    """Recursively delete a directory tree.

    If dir_fd is not None, it should be a file descriptor open to a directory;
    path will then be relative to that directory.
    dir_fd may not be implemented on your platform.
    If it is unavailable, using it will raise a NotImplementedError.

    If ignore_errors is set, errors are ignored; otherwise, if onerror
    is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is platform and implementation dependent;
    path is the argument to that function that caused it to fail; and
    exc_info is a tuple returned by sys.exc_info().  If ignore_errors
    is false and onerror is None, an exception is raised.

    """
    sys.audit("shutil.rmtree", path, dir_fd)
    if ignore_errors:
        def onerror(*args):
            pass
    elif onerror is None:
        # onerror is only ever called from inside an except block, so a
        # bare raise re-raises the exception being handled there.
        def onerror(*args):
            raise
    if _use_fd_functions:
        # While the unsafe rmtree works fine on bytes, the fd based does not.
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        # Note: To guard against symlink races, we use the standard
        # lstat()/open()/fstat() trick.
        try:
            orig_st = os.lstat(path, dir_fd=dir_fd)
        except Exception:
            onerror(os.lstat, path, sys.exc_info())
            return
        try:
            fd = os.open(path, os.O_RDONLY, dir_fd=dir_fd)
            fd_closed = False
        except Exception:
            onerror(os.open, path, sys.exc_info())
            return
        try:
            # Proceed only if the descriptor refers to the very object
            # that lstat() examined.
            if os.path.samestat(orig_st, os.fstat(fd)):
                _rmtree_safe_fd(fd, path, onerror)
                try:
                    # The descriptor is closed before the rmdir call; the
                    # flag keeps the finally clause from closing it twice.
                    os.close(fd)
                    fd_closed = True
                    os.rmdir(path, dir_fd=dir_fd)
                except OSError:
                    onerror(os.rmdir, path, sys.exc_info())
            else:
                try:
                    # symlinks to directories are forbidden, see bug #1669
                    raise OSError("Cannot call rmtree on a symbolic link")
                except OSError:
                    onerror(os.path.islink, path, sys.exc_info())
        finally:
            if not fd_closed:
                os.close(fd)
    else:
        if dir_fd is not None:
            raise NotImplementedError("dir_fd unavailable on this platform")
        try:
            if _rmtree_islink(path):
                # symlinks to directories are forbidden, see bug #1669
                raise OSError("Cannot call rmtree on a symbolic link")
        except OSError:
            onerror(os.path.islink, path, sys.exc_info())
            # can't continue even if onerror hook returns
            return
        return _rmtree_unsafe(path, onerror)

# Allow introspection of whether or not the hardening against symlink
# attacks is supported on the current platform
rmtree.avoids_symlink_attacks = _use_fd_functions
764
def _basename(path):
    """Return the last component of path, ignoring a trailing slash.

    Unlike os.path.basename(), this yields the final component even for
    a directory name that ends with a separator.

    path: Union[PathLike, str]

    e.g.
    >>> os.path.basename('/bar/foo')
    'foo'
    >>> os.path.basename('/bar/foo/')
    ''
    >>> _basename('/bar/foo/')
    'foo'
    """
    separators = os.path.sep
    if os.path.altsep:
        separators += os.path.altsep
    trimmed = os.fspath(path).rstrip(separators)
    return os.path.basename(trimmed)
782
def move(src, dst, copy_function=copy2):
    """Recursively move a file or directory to another location. This is
    similar to the Unix "mv" command. Return the file or directory's
    destination.

    If the destination is a directory or a symlink to a directory, the source
    is moved inside the directory. The destination path must not already
    exist.

    If the destination already exists but is not a directory, it may be
    overwritten depending on os.rename() semantics.

    If the destination is on our current filesystem, then rename() is used.
    Otherwise, src is copied to the destination and then removed. Symlinks are
    recreated under the new name if os.rename() fails because of cross
    filesystem renames.

    The optional `copy_function` argument is a callable that will be used
    to copy the source or it will be delegated to `copytree`.
    By default, copy2() is used, but any function that supports the same
    signature (like copy()) can be used.

    Raises Error if the path the source would get inside an existing
    destination directory already exists, or if a directory would have to be
    copied into itself.  Raises PermissionError if a directory has to be
    copied but is immutable or (on macOS) is non-empty and not writable.

    A lot more could be done here...  A look at a mv.c shows a lot of
    the issues this implementation glosses over.

    """
    sys.audit("shutil.move", src, dst)
    real_dst = dst
    if os.path.isdir(dst):
        # A symlink pointing at dst also compares as "the same file", but it
        # must be moved into the directory like any other entry instead of
        # being renamed onto its own target.
        if _samefile(src, dst) and not os.path.islink(src):
            # We might be on a case insensitive filesystem,
            # perform the rename anyway.
            os.rename(src, dst)
            # Honour the documented contract: always return the destination.
            return real_dst

        # Using _basename instead of os.path.basename is important, as we must
        # ignore any trailing slash to avoid the basename returning ''
        real_dst = os.path.join(dst, _basename(src))

        if os.path.exists(real_dst):
            raise Error("Destination path '%s' already exists" % real_dst)
    try:
        os.rename(src, real_dst)
    except OSError:
        # rename() failed (e.g. across filesystems): fall back to
        # copy-then-delete, handling symlinks, directories and files apart.
        if os.path.islink(src):
            linkto = os.readlink(src)
            os.symlink(linkto, real_dst)
            os.unlink(src)
        elif os.path.isdir(src):
            if _destinsrc(src, dst):
                raise Error("Cannot move a directory '%s' into itself"
                            " '%s'." % (src, dst))
            # Refuse up front rather than copy the tree and then fail to
            # remove the source.
            if (_is_immutable(src)
                    or (not os.access(src, os.W_OK) and os.listdir(src)
                        and sys.platform == 'darwin')):
                raise PermissionError("Cannot move the non-empty directory "
                                      "'%s': Lacking write permission to '%s'."
                                      % (src, src))
            copytree(src, real_dst, copy_function=copy_function,
                     symlinks=True)
            rmtree(src)
        else:
            copy_function(src, real_dst)
            os.unlink(src)
    return real_dst
848
849def _destinsrc(src, dst):
850    src = os.path.abspath(src)
851    dst = os.path.abspath(dst)
852    if not src.endswith(os.path.sep):
853        src += os.path.sep
854    if not dst.endswith(os.path.sep):
855        dst += os.path.sep
856    return dst.startswith(src)
857
def _is_immutable(src):
    """Return True if `src` has the user or system "immutable" flag set.

    Returns False when the stat result has no st_flags attribute.
    """
    st = _stat(src)
    # st_flags is a bit mask: other flags may be set alongside an immutable
    # one, so the bits are tested instead of comparing the whole value.
    immutable_mask = stat.UF_IMMUTABLE | stat.SF_IMMUTABLE
    return hasattr(st, 'st_flags') and bool(st.st_flags & immutable_mask)
862
863def _get_gid(name):
864    """Returns a gid, given a group name."""
865    if name is None:
866        return None
867
868    try:
869        from grp import getgrnam
870    except ImportError:
871        return None
872
873    try:
874        result = getgrnam(name)
875    except KeyError:
876        result = None
877    if result is not None:
878        return result[2]
879    return None
880
881def _get_uid(name):
882    """Returns an uid, given a user name."""
883    if name is None:
884        return None
885
886    try:
887        from pwd import getpwnam
888    except ImportError:
889        return None
890
891    try:
892        result = getpwnam(name)
893    except KeyError:
894        result = None
895    if result is not None:
896        return result[2]
897    return None
898
def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
                  owner=None, group=None, logger=None, root_dir=None):
    """Create a (possibly compressed) tar file from all the files under
    'base_dir'.

    'compress' must be "gzip" (the default), "bzip2", "xz", or None; a
    ValueError is raised for any other value or when the matching
    compression module is not available.

    'owner' and 'group' can be used to define an owner and a group for the
    archive that is being built. If not provided, the current owner and group
    will be used.

    If 'root_dir' is given, the files are read from 'root_dir'/'base_dir'
    but stored under 'base_dir', and the returned name is made absolute.
    With 'dry_run' set, no directory or archive is created.  'logger', if
    given, receives progress messages through its info() method.  'verbose'
    is accepted but not used here.

    The output tar file will be named 'base_name' +  ".tar", possibly plus
    the appropriate compression extension (".gz", ".bz2", or ".xz").

    Returns the output filename.
    """
    if compress is None:
        tar_compression = ''
    else:
        # (module available?, accepted 'compress' value, tarfile mode suffix)
        known_methods = (
            (_ZLIB_SUPPORTED, 'gzip', 'gz'),
            (_BZ2_SUPPORTED, 'bzip2', 'bz2'),
            (_LZMA_SUPPORTED, 'xz', 'xz'),
        )
        for available, method, suffix in known_methods:
            if available and compress == method:
                tar_compression = suffix
                break
        else:
            raise ValueError("bad value for 'compress', or compression format not "
                             "supported : {0}".format(compress))

    import tarfile  # late import for breaking circular dependency

    if compress:
        compress_ext = '.' + tar_compression
    else:
        compress_ext = ''
    archive_name = base_name + '.tar' + compress_ext
    archive_dir = os.path.dirname(archive_name)

    # Make sure the directory receiving the archive exists.
    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    # creating the tarball
    if logger is not None:
        logger.info('Creating tar archive')

    uid = _get_uid(owner)
    gid = _get_gid(group)

    def _apply_owner(tarinfo):
        # Override ownership only for the ids that could be resolved.
        if gid is not None:
            tarinfo.gid = gid
            tarinfo.gname = group
        if uid is not None:
            tarinfo.uid = uid
            tarinfo.uname = owner
        return tarinfo

    if not dry_run:
        tar = tarfile.open(archive_name, 'w|%s' % tar_compression)
        # Members keep the caller's 'base_dir' as their archive name even
        # when they are read from below 'root_dir'.
        source_path = base_dir
        if root_dir is not None:
            source_path = os.path.join(root_dir, base_dir)
        try:
            tar.add(source_path, base_dir, filter=_apply_owner)
        finally:
            tar.close()

    if root_dir is not None:
        archive_name = os.path.abspath(archive_name)
    return archive_name
968
969def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0,
970                  logger=None, owner=None, group=None, root_dir=None):
971    """Create a zip file from all the files under 'base_dir'.
972
973    The output zip file will be named 'base_name' + ".zip".  Returns the
974    name of the output zip file.
975    """
976    import zipfile  # late import for breaking circular dependency
977
978    zip_filename = base_name + ".zip"
979    archive_dir = os.path.dirname(base_name)
980
981    if archive_dir and not os.path.exists(archive_dir):
982        if logger is not None:
983            logger.info("creating %s", archive_dir)
984        if not dry_run:
985            os.makedirs(archive_dir)
986
987    if logger is not None:
988        logger.info("creating '%s' and adding '%s' to it",
989                    zip_filename, base_dir)
990
991    if not dry_run:
992        with zipfile.ZipFile(zip_filename, "w",
993                             compression=zipfile.ZIP_DEFLATED) as zf:
994            arcname = os.path.normpath(base_dir)
995            if root_dir is not None:
996                base_dir = os.path.join(root_dir, base_dir)
997            base_dir = os.path.normpath(base_dir)
998            if arcname != os.curdir:
999                zf.write(base_dir, arcname)
1000                if logger is not None:
1001                    logger.info("adding '%s'", base_dir)
1002            for dirpath, dirnames, filenames in os.walk(base_dir):
1003                arcdirpath = dirpath
1004                if root_dir is not None:
1005                    arcdirpath = os.path.relpath(arcdirpath, root_dir)
1006                arcdirpath = os.path.normpath(arcdirpath)
1007                for name in sorted(dirnames):
1008                    path = os.path.join(dirpath, name)
1009                    arcname = os.path.join(arcdirpath, name)
1010                    zf.write(path, arcname)
1011                    if logger is not None:
1012                        logger.info("adding '%s'", path)
1013                for name in filenames:
1014                    path = os.path.join(dirpath, name)
1015                    path = os.path.normpath(path)
1016                    if os.path.isfile(path):
1017                        arcname = os.path.join(arcdirpath, name)
1018                        zf.write(path, arcname)
1019                        if logger is not None:
1020                            logger.info("adding '%s'", path)
1021
1022    if root_dir is not None:
1023        zip_filename = os.path.abspath(zip_filename)
1024    return zip_filename
1025
# Registry of the archive formats known to make_archive().
# Maps the name of the archive format to a tuple containing:
# * the archiving function
# * extra keyword arguments, as a list of (name, value) pairs
# * description
# * does it support the root_dir argument?
_ARCHIVE_FORMATS = {
    'tar':   (_make_tarball, [('compress', None)],
              "uncompressed tar file", True),
}

# The remaining built-in formats are registered only when the compression
# module they rely on could be imported.  Note that 'zip' is tied to zlib.
if _ZLIB_SUPPORTED:
    _ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')],
                                "gzip'ed tar-file", True)
    _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file", True)

if _BZ2_SUPPORTED:
    _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')],
                                "bzip2'ed tar-file", True)

if _LZMA_SUPPORTED:
    _ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')],
                                "xz'ed tar-file", True)
1048
def get_archive_formats():
    """Returns a sorted list of the formats registered for archiving.

    Each element of the returned sequence is a tuple (name, description)
    """
    return sorted((name, registry[2])
                  for name, registry in _ARCHIVE_FORMATS.items())
1058
def register_archive_format(name, function, extra_args=None, description=''):
    """Registers an archive format.

    name is the name of the format. function is the callable that will be
    used to create archives. If provided, extra_args is a sequence of
    (name, value) tuples that will be passed as arguments to the callable.
    description can be provided to describe the format, and will be returned
    by the get_archive_formats() function.

    A TypeError is raised if function is not callable, if extra_args is not
    a tuple or list, or if one of its elements is not a 2-item tuple or list.
    The format is registered as not supporting the root_dir argument.
    """
    args = [] if extra_args is None else extra_args

    if not callable(function):
        raise TypeError('The %s object is not callable' % function)
    if not isinstance(args, (tuple, list)):
        raise TypeError('extra_args needs to be a sequence')

    def _is_pair(item):
        return isinstance(item, (tuple, list)) and len(item) == 2

    if not all(_is_pair(item) for item in args):
        raise TypeError('extra_args elements are : (arg_name, value)')

    _ARCHIVE_FORMATS[name] = (function, args, description, False)
1079
def unregister_archive_format(name):
    """Removes the archive format `name` from the registry.

    A KeyError is raised if no format of that name is registered.
    """
    _ARCHIVE_FORMATS.pop(name)
1082
def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
                 dry_run=0, owner=None, group=None, logger=None):
    """Create an archive file (eg. zip or tar).

    'base_name' is the name of the file to create, minus any format-specific
    extension; 'format' is the archive format: one of "zip", "tar", "gztar",
    "bztar", or "xztar".  Or any other registered format.  A ValueError is
    raised for a format that is not registered.

    'root_dir' is a directory that will be the root directory of the
    archive.  It is handed to the archiving function when the format is
    registered as supporting it; otherwise we chdir into 'root_dir' for the
    duration of the call.  'base_dir' is the directory where we start
    archiving from; ie. 'base_dir' will be the common prefix of all files
    and directories in the archive.  'base_dir' defaults to the current
    directory.  Returns the name of the archive file.

    'owner' and 'group' are used when creating a tar archive. By default,
    uses the current owner and group.
    """
    sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir)
    try:
        format_info = _ARCHIVE_FORMATS[format]
    except KeyError:
        raise ValueError("unknown archive format '%s'" % format) from None

    archiver = format_info[0]
    kwargs = {'dry_run': dry_run, 'logger': logger,
              'owner': owner, 'group': group}
    # Format-specific arguments are applied on top of the common ones.
    kwargs.update(format_info[1])

    if base_dir is None:
        base_dir = os.curdir

    supports_root_dir = format_info[3]
    previous_cwd = None
    if root_dir is not None and supports_root_dir:
        # Support path-like base_name here for backwards-compatibility.
        base_name = os.fspath(base_name)
        kwargs['root_dir'] = root_dir
    elif root_dir is not None:
        # The archiver knows nothing about root_dir: emulate it by changing
        # directory, after pinning base_name to an absolute path.
        previous_cwd = os.getcwd()
        if logger is not None:
            logger.debug("changing into '%s'", root_dir)
        base_name = os.path.abspath(base_name)
        if not dry_run:
            os.chdir(root_dir)

    try:
        filename = archiver(base_name, base_dir, **kwargs)
    finally:
        if previous_cwd is not None:
            if logger is not None:
                logger.debug("changing back to '%s'", previous_cwd)
            os.chdir(previous_cwd)

    return filename
1141
1142
def get_unpack_formats():
    """Returns a sorted list of the formats registered for unpacking.

    Each element of the returned sequence is a tuple
    (name, extensions, description)
    """
    return sorted((name, info[0], info[3])
                  for name, info in _UNPACK_FORMATS.items())
1153
def _check_unpack_options(extensions, function, extra_args):
    """Validate what is about to be registered as an unpacker.

    Raises RegistryError if one of `extensions` already belongs to a
    registered format, and TypeError if `function` is not callable.
    `extra_args` is not inspected.
    """
    # Map every extension already claimed to the format that claims it.
    claimed_by = {
        ext: name
        for name, info in _UNPACK_FORMATS.items()
        for ext in info[0]
    }

    for extension in extensions:
        if extension in claimed_by:
            raise RegistryError('%s is already registered for "%s"'
                                % (extension, claimed_by[extension]))

    if not callable(function):
        raise TypeError('The registered function must be a callable')
1170
1171
def register_unpack_format(name, extensions, function, extra_args=None,
                           description=''):
    """Registers an unpack format.

    `name` is the name of the format. `extensions` is a list of extensions
    corresponding to the format.

    `function` is the callable that will be
    used to unpack archives. The callable will receive archives to unpack.
    If it's unable to handle an archive, it needs to raise a ReadError
    exception.

    If provided, `extra_args` is a sequence of
    (name, value) tuples that will be passed as arguments to the callable.
    description can be provided to describe the format, and will be returned
    by the get_unpack_formats() function.

    The arguments are validated by _check_unpack_options() before the
    format is stored.
    """
    args = [] if extra_args is None else extra_args
    _check_unpack_options(extensions, function, args)
    _UNPACK_FORMATS[name] = (extensions, function, args, description)
1193
def unregister_unpack_format(name):
    """Removes the unpack format `name` from the registry.

    A KeyError is raised if no format of that name is registered.
    """
    _UNPACK_FORMATS.pop(name)
1197
1198def _ensure_directory(path):
1199    """Ensure that the parent directory of `path` exists"""
1200    dirname = os.path.dirname(path)
1201    if not os.path.isdir(dirname):
1202        os.makedirs(dirname)
1203
def _unpack_zipfile(filename, extract_dir):
    """Unpack zip `filename` to `extract_dir`

    Raises ReadError if `filename` is not a zip file.  Members whose name
    starts with '/' or contains '..' are skipped.
    """
    import zipfile  # late import for breaking circular dependency

    if not zipfile.is_zipfile(filename):
        raise ReadError("%s is not a zip file" % filename)

    with zipfile.ZipFile(filename) as archive:
        for member in archive.infolist():
            member_name = member.filename

            # don't extract absolute paths or ones with .. in them
            if member_name.startswith('/') or '..' in member_name:
                continue

            targetpath = os.path.join(extract_dir, *member_name.split('/'))
            if not targetpath:
                continue

            _ensure_directory(targetpath)
            if member_name.endswith('/'):
                # directory entry: creating it above was all there is to do
                continue

            # file
            with archive.open(member_name, 'r') as source, \
                    open(targetpath, 'wb') as target:
                copyfileobj(source, target)
1233
def _unpack_tarfile(filename, extract_dir, *, filter=None):
    """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir`

    `filter` is passed on to TarFile.extractall().  Raises ReadError if
    tarfile fails to open `filename` with a TarError.
    """
    import tarfile  # late import for breaking circular dependency

    try:
        archive = tarfile.open(filename)
    except tarfile.TarError:
        message = "%s is not a compressed or uncompressed tar file" % filename
        raise ReadError(message)

    try:
        archive.extractall(extract_dir, filter=filter)
    finally:
        archive.close()
1247
# Registry of the archive formats known to unpack_archive().
# Maps the name of the unpack format to a tuple containing:
# * extensions (file name suffixes recognised for the format)
# * the unpacking function
# * extra keyword arguments, as a list of (name, value) pairs
# * description
_UNPACK_FORMATS = {
    'tar':   (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
    'zip':   (['.zip'], _unpack_zipfile, [], "ZIP file"),
}

# The compressed tar formats are registered only when the matching
# compression module could be imported.
if _ZLIB_SUPPORTED:
    _UNPACK_FORMATS['gztar'] = (['.tar.gz', '.tgz'], _unpack_tarfile, [],
                                "gzip'ed tar-file")

if _BZ2_SUPPORTED:
    _UNPACK_FORMATS['bztar'] = (['.tar.bz2', '.tbz2'], _unpack_tarfile, [],
                                "bzip2'ed tar-file")

if _LZMA_SUPPORTED:
    _UNPACK_FORMATS['xztar'] = (['.tar.xz', '.txz'], _unpack_tarfile, [],
                                "xz'ed tar-file")
1269
def _find_unpack_format(filename):
    """Return the name of the first registered unpack format having an
    extension that `filename` ends with, or None if there is none.
    """
    for format_name, format_info in _UNPACK_FORMATS.items():
        if any(filename.endswith(ext) for ext in format_info[0]):
            return format_name
    return None
1276
def unpack_archive(filename, extract_dir=None, format=None, *, filter=None):
    """Unpack an archive.

    `filename` is the name of the archive.

    `extract_dir` is the name of the target directory, where the archive
    is unpacked. If not provided, the current working directory is used.

    `format` is the archive format: one of "zip", "tar", "gztar", "bztar",
    or "xztar".  Or any other registered format.  If not provided,
    unpack_archive will use the filename extension and see if an unpacker
    was registered for that extension.

    A ValueError is raised if `format` is given but is not a registered
    format; a ReadError is raised if `format` is not given and no unpacker
    is registered for the extension of `filename`.

    If `filter` is given, it is passed to the underlying
    extraction function.
    """
    sys.audit("shutil.unpack_archive", filename, extract_dir, format)

    if extract_dir is None:
        extract_dir = os.getcwd()

    extract_dir = os.fspath(extract_dir)
    filename = os.fspath(filename)

    # The unpacker only sees a 'filter' argument when one was supplied.
    filter_kwargs = {} if filter is None else {'filter': filter}

    if format is None:
        # we need to look at the registered unpackers supported extensions
        format = _find_unpack_format(filename)
        if format is None:
            raise ReadError("Unknown archive format '{0}'".format(filename))

        detected_info = _UNPACK_FORMATS[format]
        unpacker = detected_info[1]
        kwargs = dict(detected_info[2]) | filter_kwargs
        unpacker(filename, extract_dir, **kwargs)
        return

    try:
        format_info = _UNPACK_FORMATS[format]
    except KeyError:
        raise ValueError("Unknown unpack format '{0}'".format(format)) from None

    unpacker = format_info[1]
    unpacker(filename, extract_dir, **dict(format_info[2]), **filter_kwargs)
1324
1325
# disk_usage() is only defined, and only added to __all__, on platforms
# where one of the two implementations below is available.
if hasattr(os, 'statvfs'):

    __all__.append('disk_usage')
    _ntuple_diskusage = collections.namedtuple('usage', 'total used free')
    _ntuple_diskusage.total.__doc__ = 'Total space in bytes'
    _ntuple_diskusage.used.__doc__ = 'Used space in bytes'
    _ntuple_diskusage.free.__doc__ = 'Free space in bytes'

    def disk_usage(path):
        """Return disk usage statistics about the given path.

        Returned value is a named tuple with attributes 'total', 'used' and
        'free', which are the amount of total, used and free space, in bytes.
        """
        st = os.statvfs(path)
        # 'free' is computed from f_bavail while 'used' is derived from
        # f_bfree, so 'used' + 'free' is not necessarily equal to 'total'.
        free = st.f_bavail * st.f_frsize
        total = st.f_blocks * st.f_frsize
        used = (st.f_blocks - st.f_bfree) * st.f_frsize
        return _ntuple_diskusage(total, used, free)

elif _WINDOWS:

    __all__.append('disk_usage')
    _ntuple_diskusage = collections.namedtuple('usage', 'total used free')

    def disk_usage(path):
        """Return disk usage statistics about the given path.

        Returned value is a named tuple with attributes 'total', 'used' and
        'free', which are the amount of total, used and free space, in bytes.
        """
        # Only total and free are reported by nt; 'used' is their difference.
        total, free = nt._getdiskusage(path)
        used = total - free
        return _ntuple_diskusage(total, used, free)
1360
1361
def chown(path, user=None, group=None):
    """Change owner user and group of the given path.

    user and group can be the uid/gid or the user/group names, and in that case,
    they are converted to their respective uid/gid.

    At least one of user and group must be given, otherwise a ValueError is
    raised.  A LookupError is raised for a name that cannot be resolved.
    """
    sys.audit('shutil.chown', path, user, group)

    if user is None and group is None:
        raise ValueError("user and/or group must be set")

    # -1 means don't change it
    if user is None:
        uid = -1
    elif isinstance(user, str):
        # a string is the system username; anything else is used as the uid
        uid = _get_uid(user)
        if uid is None:
            raise LookupError("no such user: {!r}".format(user))
    else:
        uid = user

    if group is None:
        gid = -1
    elif isinstance(group, int):
        gid = group
    else:
        # anything that is not an int is looked up as a group name
        gid = _get_gid(group)
        if gid is None:
            raise LookupError("no such group: {!r}".format(group))

    os.chown(path, uid, gid)
1393
def get_terminal_size(fallback=(80, 24)):
    """Get the size of the terminal window.

    For each of the two dimensions, the environment variable, COLUMNS
    and LINES respectively, is checked. If the variable is defined and
    the value is a positive integer, it is used.

    When COLUMNS or LINES is not defined, which is the common case,
    the terminal connected to sys.__stdout__ is queried
    by invoking os.get_terminal_size.

    If the terminal size cannot be successfully queried, either because
    the system doesn't support querying, or because we are not
    connected to a terminal, the value given in fallback parameter
    is used. Fallback defaults to (80, 24) which is the default
    size used by many terminal emulators.

    The value returned is a named tuple of type os.terminal_size.
    """
    def _from_environment(variable):
        # 0 stands for "not usable": missing or not an integer.
        try:
            return int(os.environ[variable])
        except (KeyError, ValueError):
            return 0

    columns = _from_environment('COLUMNS')
    lines = _from_environment('LINES')

    # only query if necessary
    if columns > 0 and lines > 0:
        return os.terminal_size((columns, lines))

    try:
        size = os.get_terminal_size(sys.__stdout__.fileno())
    except (AttributeError, ValueError, OSError):
        # stdout is None, closed, detached, or not a terminal, or
        # os.get_terminal_size() is unsupported
        size = os.terminal_size(fallback)

    # A dimension reported as 0 is replaced by its fallback as well.
    if columns <= 0:
        columns = size.columns or fallback[0]
    if lines <= 0:
        lines = size.lines or fallback[1]

    return os.terminal_size((columns, lines))
1438
1439
1440# Check that a given file can be accessed with the correct mode.
1441# Additionally check that `file` is not a directory, as on Windows
1442# directories pass the os.access check.
1443def _access_check(fn, mode):
1444    return (os.path.exists(fn) and os.access(fn, mode)
1445            and not os.path.isdir(fn))
1446
1447
def which(cmd, mode=os.F_OK | os.X_OK, path=None):
    """Given a command, mode, and a PATH string, return the path which
    conforms to the given mode on the PATH, or None if there is no such
    file.

    `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
    of os.environ.get("PATH"), or can be overridden with a custom search
    path.

    The result is a str or bytes path, matching the type of `cmd`.

    """
    # If we're given a path with a directory part, look it up directly rather
    # than referring to PATH directories. This includes checking relative to the
    # current directory, e.g. ./script
    if os.path.dirname(cmd):
        return cmd if _access_check(cmd, mode) else None

    use_bytes = isinstance(cmd, bytes)

    if path is None:
        path = os.environ.get("PATH", None)
        if path is None:
            try:
                path = os.confstr("CS_PATH")
            except (AttributeError, ValueError):
                # os.confstr() or CS_PATH is not available
                path = os.defpath
        # bpo-35755: Don't use os.defpath if the PATH environment variable is
        # set to an empty string

    # PATH='' doesn't match, whereas PATH=':' looks in the current directory
    if not path:
        return None

    # Split the search path into directories of the same type as cmd.
    if use_bytes:
        search_dirs = os.fsencode(path).split(os.fsencode(os.pathsep))
    else:
        search_dirs = os.fsdecode(path).split(os.pathsep)

    if sys.platform == "win32":
        # The current directory takes precedence on Windows.
        curdir = os.fsencode(os.curdir) if use_bytes else os.curdir
        if curdir not in search_dirs:
            search_dirs.insert(0, curdir)

        # PATHEXT is necessary to check on Windows.
        pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT
        pathext = [ext for ext in pathext_source.split(os.pathsep) if ext]

        if use_bytes:
            pathext = [os.fsencode(ext) for ext in pathext]
        # See if the given file matches any of the expected path extensions.
        # This will allow us to short circuit when given "python.exe".
        # If it does match, only test that one, otherwise we have to try
        # others.
        if any(cmd.lower().endswith(ext.lower()) for ext in pathext):
            candidates = [cmd]
        else:
            candidates = [cmd + ext for ext in pathext]
    else:
        # On other platforms you don't have things like PATHEXT to tell you
        # what file suffixes are executable, so just pass on cmd as-is.
        candidates = [cmd]

    # Every directory is examined once, duplicates being detected on the
    # case-normalized name.
    visited = set()
    for directory in search_dirs:
        normdir = os.path.normcase(directory)
        if normdir in visited:
            continue
        visited.add(normdir)
        for candidate in candidates:
            full_name = os.path.join(directory, candidate)
            if _access_check(full_name, mode):
                return full_name
    return None
1527