1# Copyright (C) 2003 Python Software Foundation
2
3import unittest
4import unittest.mock
5import shutil
6import tempfile
7import sys
8import stat
9import os
10import os.path
11import errno
12import functools
13import pathlib
14import subprocess
15import random
16import string
17import contextlib
18import io
19from shutil import (make_archive,
20                    register_archive_format, unregister_archive_format,
21                    get_archive_formats, Error, unpack_archive,
22                    register_unpack_format, RegistryError,
23                    unregister_unpack_format, get_unpack_formats,
24                    SameFileError, _GiveupOnFastCopy)
25import tarfile
26import zipfile
27try:
28    import posix
29except ImportError:
30    posix = None
31
32from test import support
33from test.support import os_helper
34from test.support.os_helper import TESTFN, FakePath
35from test.support import warnings_helper
36
37TESTFN2 = TESTFN + "2"
38TESTFN_SRC = TESTFN + "_SRC"
39TESTFN_DST = TESTFN + "_DST"
40MACOS = sys.platform.startswith("darwin")
41SOLARIS = sys.platform.startswith("sunos")
42AIX = sys.platform[:3] == 'aix'
43try:
44    import grp
45    import pwd
46    UID_GID_SUPPORT = True
47except ImportError:
48    UID_GID_SUPPORT = False
49
50try:
51    import _winapi
52except ImportError:
53    _winapi = None
54
55no_chdir = unittest.mock.patch('os.chdir',
56        side_effect=AssertionError("shouldn't call os.chdir()"))
57
58def _fake_rename(*args, **kwargs):
59    # Pretend the destination path is on a different filesystem.
60    raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
61
62def mock_rename(func):
63    @functools.wraps(func)
64    def wrap(*args, **kwargs):
65        try:
66            builtin_rename = os.rename
67            os.rename = _fake_rename
68            return func(*args, **kwargs)
69        finally:
70            os.rename = builtin_rename
71    return wrap
72
73def write_file(path, content, binary=False):
74    """Write *content* to a file located at *path*.
75
76    If *path* is a tuple instead of a string, os.path.join will be used to
77    make a path.  If *binary* is true, the file will be opened in binary
78    mode.
79    """
80    if isinstance(path, tuple):
81        path = os.path.join(*path)
82    mode = 'wb' if binary else 'w'
83    encoding = None if binary else "utf-8"
84    with open(path, mode, encoding=encoding) as fp:
85        fp.write(content)
86
87def write_test_file(path, size):
88    """Create a test file with an arbitrary size and random text content."""
89    def chunks(total, step):
90        assert total >= step
91        while total > step:
92            yield step
93            total -= step
94        if total:
95            yield total
96
97    bufsize = min(size, 8192)
98    chunk = b"".join([random.choice(string.ascii_letters).encode()
99                      for i in range(bufsize)])
100    with open(path, 'wb') as f:
101        for csize in chunks(size, bufsize):
102            f.write(chunk)
103    assert os.path.getsize(path) == size
104
105def read_file(path, binary=False):
106    """Return contents from a file located at *path*.
107
108    If *path* is a tuple instead of a string, os.path.join will be used to
109    make a path.  If *binary* is true, the file will be opened in binary
110    mode.
111    """
112    if isinstance(path, tuple):
113        path = os.path.join(*path)
114    mode = 'rb' if binary else 'r'
115    encoding = None if binary else "utf-8"
116    with open(path, mode, encoding=encoding) as fp:
117        return fp.read()
118
119def rlistdir(path):
120    res = []
121    for name in sorted(os.listdir(path)):
122        p = os.path.join(path, name)
123        if os.path.isdir(p) and not os.path.islink(p):
124            res.append(name + '/')
125            for n in rlistdir(p):
126                res.append(name + '/' + n)
127        else:
128            res.append(name)
129    return res
130
131def supports_file2file_sendfile():
132    # ...apparently Linux and Solaris are the only ones
133    if not hasattr(os, "sendfile"):
134        return False
135    srcname = None
136    dstname = None
137    try:
138        with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as f:
139            srcname = f.name
140            f.write(b"0123456789")
141
142        with open(srcname, "rb") as src:
143            with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as dst:
144                dstname = dst.name
145                infd = src.fileno()
146                outfd = dst.fileno()
147                try:
148                    os.sendfile(outfd, infd, 0, 2)
149                except OSError:
150                    return False
151                else:
152                    return True
153    finally:
154        if srcname is not None:
155            os_helper.unlink(srcname)
156        if dstname is not None:
157            os_helper.unlink(dstname)
158
159
160SUPPORTS_SENDFILE = supports_file2file_sendfile()
161
162# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test
163# The AIX command 'dump -o program' gives XCOFF header information
164# The second word of the last line in the maxdata value
165# when 32-bit maxdata must be greater than 0x1000000 for the xz test to succeed
166def _maxdataOK():
167    if AIX and sys.maxsize == 2147483647:
168        hdrs=subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable)
169        maxdata=hdrs.split("\n")[-1].split()[1]
170        return int(maxdata,16) >= 0x20000000
171    else:
172        return True
173
174
175class BaseTest:
176
177    def mkdtemp(self, prefix=None):
178        """Create a temporary directory that will be cleaned up.
179
180        Returns the path of the directory.
181        """
182        d = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd())
183        self.addCleanup(os_helper.rmtree, d)
184        return d
185
186
187class TestRmTree(BaseTest, unittest.TestCase):
188
189    def test_rmtree_works_on_bytes(self):
190        tmp = self.mkdtemp()
191        victim = os.path.join(tmp, 'killme')
192        os.mkdir(victim)
193        write_file(os.path.join(victim, 'somefile'), 'foo')
194        victim = os.fsencode(victim)
195        self.assertIsInstance(victim, bytes)
196        shutil.rmtree(victim)
197
198    @os_helper.skip_unless_symlink
199    def test_rmtree_fails_on_symlink(self):
200        tmp = self.mkdtemp()
201        dir_ = os.path.join(tmp, 'dir')
202        os.mkdir(dir_)
203        link = os.path.join(tmp, 'link')
204        os.symlink(dir_, link)
205        self.assertRaises(OSError, shutil.rmtree, link)
206        self.assertTrue(os.path.exists(dir_))
207        self.assertTrue(os.path.lexists(link))
208        errors = []
209        def onerror(*args):
210            errors.append(args)
211        shutil.rmtree(link, onerror=onerror)
212        self.assertEqual(len(errors), 1)
213        self.assertIs(errors[0][0], os.path.islink)
214        self.assertEqual(errors[0][1], link)
215        self.assertIsInstance(errors[0][2][1], OSError)
216
217    @os_helper.skip_unless_symlink
218    def test_rmtree_works_on_symlinks(self):
219        tmp = self.mkdtemp()
220        dir1 = os.path.join(tmp, 'dir1')
221        dir2 = os.path.join(dir1, 'dir2')
222        dir3 = os.path.join(tmp, 'dir3')
223        for d in dir1, dir2, dir3:
224            os.mkdir(d)
225        file1 = os.path.join(tmp, 'file1')
226        write_file(file1, 'foo')
227        link1 = os.path.join(dir1, 'link1')
228        os.symlink(dir2, link1)
229        link2 = os.path.join(dir1, 'link2')
230        os.symlink(dir3, link2)
231        link3 = os.path.join(dir1, 'link3')
232        os.symlink(file1, link3)
233        # make sure symlinks are removed but not followed
234        shutil.rmtree(dir1)
235        self.assertFalse(os.path.exists(dir1))
236        self.assertTrue(os.path.exists(dir3))
237        self.assertTrue(os.path.exists(file1))
238
239    @unittest.skipUnless(_winapi, 'only relevant on Windows')
240    def test_rmtree_fails_on_junctions(self):
241        tmp = self.mkdtemp()
242        dir_ = os.path.join(tmp, 'dir')
243        os.mkdir(dir_)
244        link = os.path.join(tmp, 'link')
245        _winapi.CreateJunction(dir_, link)
246        self.addCleanup(os_helper.unlink, link)
247        self.assertRaises(OSError, shutil.rmtree, link)
248        self.assertTrue(os.path.exists(dir_))
249        self.assertTrue(os.path.lexists(link))
250        errors = []
251        def onerror(*args):
252            errors.append(args)
253        shutil.rmtree(link, onerror=onerror)
254        self.assertEqual(len(errors), 1)
255        self.assertIs(errors[0][0], os.path.islink)
256        self.assertEqual(errors[0][1], link)
257        self.assertIsInstance(errors[0][2][1], OSError)
258
259    @unittest.skipUnless(_winapi, 'only relevant on Windows')
260    def test_rmtree_works_on_junctions(self):
261        tmp = self.mkdtemp()
262        dir1 = os.path.join(tmp, 'dir1')
263        dir2 = os.path.join(dir1, 'dir2')
264        dir3 = os.path.join(tmp, 'dir3')
265        for d in dir1, dir2, dir3:
266            os.mkdir(d)
267        file1 = os.path.join(tmp, 'file1')
268        write_file(file1, 'foo')
269        link1 = os.path.join(dir1, 'link1')
270        _winapi.CreateJunction(dir2, link1)
271        link2 = os.path.join(dir1, 'link2')
272        _winapi.CreateJunction(dir3, link2)
273        link3 = os.path.join(dir1, 'link3')
274        _winapi.CreateJunction(file1, link3)
275        # make sure junctions are removed but not followed
276        shutil.rmtree(dir1)
277        self.assertFalse(os.path.exists(dir1))
278        self.assertTrue(os.path.exists(dir3))
279        self.assertTrue(os.path.exists(file1))
280
281    def test_rmtree_errors(self):
282        # filename is guaranteed not to exist
283        filename = tempfile.mktemp(dir=self.mkdtemp())
284        self.assertRaises(FileNotFoundError, shutil.rmtree, filename)
285        # test that ignore_errors option is honored
286        shutil.rmtree(filename, ignore_errors=True)
287
288        # existing file
289        tmpdir = self.mkdtemp()
290        write_file((tmpdir, "tstfile"), "")
291        filename = os.path.join(tmpdir, "tstfile")
292        with self.assertRaises(NotADirectoryError) as cm:
293            shutil.rmtree(filename)
294        self.assertEqual(cm.exception.filename, filename)
295        self.assertTrue(os.path.exists(filename))
296        # test that ignore_errors option is honored
297        shutil.rmtree(filename, ignore_errors=True)
298        self.assertTrue(os.path.exists(filename))
299        errors = []
300        def onerror(*args):
301            errors.append(args)
302        shutil.rmtree(filename, onerror=onerror)
303        self.assertEqual(len(errors), 2)
304        self.assertIs(errors[0][0], os.scandir)
305        self.assertEqual(errors[0][1], filename)
306        self.assertIsInstance(errors[0][2][1], NotADirectoryError)
307        self.assertEqual(errors[0][2][1].filename, filename)
308        self.assertIs(errors[1][0], os.rmdir)
309        self.assertEqual(errors[1][1], filename)
310        self.assertIsInstance(errors[1][2][1], NotADirectoryError)
311        self.assertEqual(errors[1][2][1].filename, filename)
312
313
314    @unittest.skipIf(sys.platform[:6] == 'cygwin',
315                     "This test can't be run on Cygwin (issue #1071513).")
316    @os_helper.skip_if_dac_override
317    @os_helper.skip_unless_working_chmod
318    def test_on_error(self):
319        self.errorState = 0
320        os.mkdir(TESTFN)
321        self.addCleanup(shutil.rmtree, TESTFN)
322
323        self.child_file_path = os.path.join(TESTFN, 'a')
324        self.child_dir_path = os.path.join(TESTFN, 'b')
325        os_helper.create_empty_file(self.child_file_path)
326        os.mkdir(self.child_dir_path)
327        old_dir_mode = os.stat(TESTFN).st_mode
328        old_child_file_mode = os.stat(self.child_file_path).st_mode
329        old_child_dir_mode = os.stat(self.child_dir_path).st_mode
330        # Make unwritable.
331        new_mode = stat.S_IREAD|stat.S_IEXEC
332        os.chmod(self.child_file_path, new_mode)
333        os.chmod(self.child_dir_path, new_mode)
334        os.chmod(TESTFN, new_mode)
335
336        self.addCleanup(os.chmod, TESTFN, old_dir_mode)
337        self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
338        self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
339
340        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
341        # Test whether onerror has actually been called.
342        self.assertEqual(self.errorState, 3,
343                         "Expected call to onerror function did not happen.")
344
345    def check_args_to_onerror(self, func, arg, exc):
346        # test_rmtree_errors deliberately runs rmtree
347        # on a directory that is chmod 500, which will fail.
348        # This function is run when shutil.rmtree fails.
349        # 99.9% of the time it initially fails to remove
350        # a file in the directory, so the first time through
351        # func is os.remove.
352        # However, some Linux machines running ZFS on
353        # FUSE experienced a failure earlier in the process
354        # at os.listdir.  The first failure may legally
355        # be either.
356        if self.errorState < 2:
357            if func is os.unlink:
358                self.assertEqual(arg, self.child_file_path)
359            elif func is os.rmdir:
360                self.assertEqual(arg, self.child_dir_path)
361            else:
362                self.assertIs(func, os.listdir)
363                self.assertIn(arg, [TESTFN, self.child_dir_path])
364            self.assertTrue(issubclass(exc[0], OSError))
365            self.errorState += 1
366        else:
367            self.assertEqual(func, os.rmdir)
368            self.assertEqual(arg, TESTFN)
369            self.assertTrue(issubclass(exc[0], OSError))
370            self.errorState = 3
371
372    def test_rmtree_does_not_choke_on_failing_lstat(self):
373        try:
374            orig_lstat = os.lstat
375            def raiser(fn, *args, **kwargs):
376                if fn != TESTFN:
377                    raise OSError()
378                else:
379                    return orig_lstat(fn)
380            os.lstat = raiser
381
382            os.mkdir(TESTFN)
383            write_file((TESTFN, 'foo'), 'foo')
384            shutil.rmtree(TESTFN)
385        finally:
386            os.lstat = orig_lstat
387
388    def test_rmtree_uses_safe_fd_version_if_available(self):
389        _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
390                             os.supports_dir_fd and
391                             os.listdir in os.supports_fd and
392                             os.stat in os.supports_follow_symlinks)
393        if _use_fd_functions:
394            self.assertTrue(shutil._use_fd_functions)
395            self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
396            tmp_dir = self.mkdtemp()
397            d = os.path.join(tmp_dir, 'a')
398            os.mkdir(d)
399            try:
400                real_rmtree = shutil._rmtree_safe_fd
401                class Called(Exception): pass
402                def _raiser(*args, **kwargs):
403                    raise Called
404                shutil._rmtree_safe_fd = _raiser
405                self.assertRaises(Called, shutil.rmtree, d)
406            finally:
407                shutil._rmtree_safe_fd = real_rmtree
408        else:
409            self.assertFalse(shutil._use_fd_functions)
410            self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
411
412    @unittest.skipUnless(shutil._use_fd_functions, "dir_fd is not supported")
413    def test_rmtree_with_dir_fd(self):
414        tmp_dir = self.mkdtemp()
415        victim = 'killme'
416        fullname = os.path.join(tmp_dir, victim)
417        dir_fd = os.open(tmp_dir, os.O_RDONLY)
418        self.addCleanup(os.close, dir_fd)
419        os.mkdir(fullname)
420        os.mkdir(os.path.join(fullname, 'subdir'))
421        write_file(os.path.join(fullname, 'subdir', 'somefile'), 'foo')
422        self.assertTrue(os.path.exists(fullname))
423        shutil.rmtree(victim, dir_fd=dir_fd)
424        self.assertFalse(os.path.exists(fullname))
425
426    @unittest.skipIf(shutil._use_fd_functions, "dir_fd is supported")
427    def test_rmtree_with_dir_fd_unsupported(self):
428        tmp_dir = self.mkdtemp()
429        with self.assertRaises(NotImplementedError):
430            shutil.rmtree(tmp_dir, dir_fd=0)
431        self.assertTrue(os.path.exists(tmp_dir))
432
433    def test_rmtree_dont_delete_file(self):
434        # When called on a file instead of a directory, don't delete it.
435        handle, path = tempfile.mkstemp(dir=self.mkdtemp())
436        os.close(handle)
437        self.assertRaises(NotADirectoryError, shutil.rmtree, path)
438        os.remove(path)
439
440    @os_helper.skip_unless_symlink
441    def test_rmtree_on_symlink(self):
442        # bug 1669.
443        os.mkdir(TESTFN)
444        try:
445            src = os.path.join(TESTFN, 'cheese')
446            dst = os.path.join(TESTFN, 'shop')
447            os.mkdir(src)
448            os.symlink(src, dst)
449            self.assertRaises(OSError, shutil.rmtree, dst)
450            shutil.rmtree(dst, ignore_errors=True)
451        finally:
452            shutil.rmtree(TESTFN, ignore_errors=True)
453
454    @unittest.skipUnless(_winapi, 'only relevant on Windows')
455    def test_rmtree_on_junction(self):
456        os.mkdir(TESTFN)
457        try:
458            src = os.path.join(TESTFN, 'cheese')
459            dst = os.path.join(TESTFN, 'shop')
460            os.mkdir(src)
461            open(os.path.join(src, 'spam'), 'wb').close()
462            _winapi.CreateJunction(src, dst)
463            self.assertRaises(OSError, shutil.rmtree, dst)
464            shutil.rmtree(dst, ignore_errors=True)
465        finally:
466            shutil.rmtree(TESTFN, ignore_errors=True)
467
468
469class TestCopyTree(BaseTest, unittest.TestCase):
470
471    def test_copytree_simple(self):
472        src_dir = self.mkdtemp()
473        dst_dir = os.path.join(self.mkdtemp(), 'destination')
474        self.addCleanup(shutil.rmtree, src_dir)
475        self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
476        write_file((src_dir, 'test.txt'), '123')
477        os.mkdir(os.path.join(src_dir, 'test_dir'))
478        write_file((src_dir, 'test_dir', 'test.txt'), '456')
479
480        shutil.copytree(src_dir, dst_dir)
481        self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
482        self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
483        self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
484                                                    'test.txt')))
485        actual = read_file((dst_dir, 'test.txt'))
486        self.assertEqual(actual, '123')
487        actual = read_file((dst_dir, 'test_dir', 'test.txt'))
488        self.assertEqual(actual, '456')
489
490    def test_copytree_dirs_exist_ok(self):
491        src_dir = self.mkdtemp()
492        dst_dir = self.mkdtemp()
493        self.addCleanup(shutil.rmtree, src_dir)
494        self.addCleanup(shutil.rmtree, dst_dir)
495
496        write_file((src_dir, 'nonexisting.txt'), '123')
497        os.mkdir(os.path.join(src_dir, 'existing_dir'))
498        os.mkdir(os.path.join(dst_dir, 'existing_dir'))
499        write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced')
500        write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced')
501
502        shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)
503        self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt')))
504        self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir')))
505        self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir',
506                                                    'existing.txt')))
507        actual = read_file((dst_dir, 'nonexisting.txt'))
508        self.assertEqual(actual, '123')
509        actual = read_file((dst_dir, 'existing_dir', 'existing.txt'))
510        self.assertEqual(actual, 'has been replaced')
511
512        with self.assertRaises(FileExistsError):
513            shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False)
514
515    @os_helper.skip_unless_symlink
516    def test_copytree_symlinks(self):
517        tmp_dir = self.mkdtemp()
518        src_dir = os.path.join(tmp_dir, 'src')
519        dst_dir = os.path.join(tmp_dir, 'dst')
520        sub_dir = os.path.join(src_dir, 'sub')
521        os.mkdir(src_dir)
522        os.mkdir(sub_dir)
523        write_file((src_dir, 'file.txt'), 'foo')
524        src_link = os.path.join(sub_dir, 'link')
525        dst_link = os.path.join(dst_dir, 'sub/link')
526        os.symlink(os.path.join(src_dir, 'file.txt'),
527                   src_link)
528        if hasattr(os, 'lchmod'):
529            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
530        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
531            os.lchflags(src_link, stat.UF_NODUMP)
532        src_stat = os.lstat(src_link)
533        shutil.copytree(src_dir, dst_dir, symlinks=True)
534        self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link')))
535        actual = os.readlink(os.path.join(dst_dir, 'sub', 'link'))
536        # Bad practice to blindly strip the prefix as it may be required to
537        # correctly refer to the file, but we're only comparing paths here.
538        if os.name == 'nt' and actual.startswith('\\\\?\\'):
539            actual = actual[4:]
540        self.assertEqual(actual, os.path.join(src_dir, 'file.txt'))
541        dst_stat = os.lstat(dst_link)
542        if hasattr(os, 'lchmod'):
543            self.assertEqual(dst_stat.st_mode, src_stat.st_mode)
544        if hasattr(os, 'lchflags'):
545            self.assertEqual(dst_stat.st_flags, src_stat.st_flags)
546
547    def test_copytree_with_exclude(self):
548        # creating data
549        join = os.path.join
550        exists = os.path.exists
551        src_dir = self.mkdtemp()
552        try:
553            dst_dir = join(self.mkdtemp(), 'destination')
554            write_file((src_dir, 'test.txt'), '123')
555            write_file((src_dir, 'test.tmp'), '123')
556            os.mkdir(join(src_dir, 'test_dir'))
557            write_file((src_dir, 'test_dir', 'test.txt'), '456')
558            os.mkdir(join(src_dir, 'test_dir2'))
559            write_file((src_dir, 'test_dir2', 'test.txt'), '456')
560            os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
561            os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
562            write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
563            write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
564
565            # testing glob-like patterns
566            try:
567                patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
568                shutil.copytree(src_dir, dst_dir, ignore=patterns)
569                # checking the result: some elements should not be copied
570                self.assertTrue(exists(join(dst_dir, 'test.txt')))
571                self.assertFalse(exists(join(dst_dir, 'test.tmp')))
572                self.assertFalse(exists(join(dst_dir, 'test_dir2')))
573            finally:
574                shutil.rmtree(dst_dir)
575            try:
576                patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
577                shutil.copytree(src_dir, dst_dir, ignore=patterns)
578                # checking the result: some elements should not be copied
579                self.assertFalse(exists(join(dst_dir, 'test.tmp')))
580                self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
581                self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
582            finally:
583                shutil.rmtree(dst_dir)
584
585            # testing callable-style
586            try:
587                def _filter(src, names):
588                    res = []
589                    for name in names:
590                        path = os.path.join(src, name)
591
592                        if (os.path.isdir(path) and
593                            path.split()[-1] == 'subdir'):
594                            res.append(name)
595                        elif os.path.splitext(path)[-1] in ('.py'):
596                            res.append(name)
597                    return res
598
599                shutil.copytree(src_dir, dst_dir, ignore=_filter)
600
601                # checking the result: some elements should not be copied
602                self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
603                                             'test.py')))
604                self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
605
606            finally:
607                shutil.rmtree(dst_dir)
608        finally:
609            shutil.rmtree(src_dir)
610            shutil.rmtree(os.path.dirname(dst_dir))
611
612    def test_copytree_arg_types_of_ignore(self):
613        join = os.path.join
614        exists = os.path.exists
615
616        tmp_dir = self.mkdtemp()
617        src_dir = join(tmp_dir, "source")
618
619        os.mkdir(join(src_dir))
620        os.mkdir(join(src_dir, 'test_dir'))
621        os.mkdir(os.path.join(src_dir, 'test_dir', 'subdir'))
622        write_file((src_dir, 'test_dir', 'subdir', 'test.txt'), '456')
623
624        invokations = []
625
626        def _ignore(src, names):
627            invokations.append(src)
628            self.assertIsInstance(src, str)
629            self.assertIsInstance(names, list)
630            self.assertEqual(len(names), len(set(names)))
631            for name in names:
632                self.assertIsInstance(name, str)
633            return []
634
635        dst_dir = join(self.mkdtemp(), 'destination')
636        shutil.copytree(src_dir, dst_dir, ignore=_ignore)
637        self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
638                                    'test.txt')))
639
640        dst_dir = join(self.mkdtemp(), 'destination')
641        shutil.copytree(pathlib.Path(src_dir), dst_dir, ignore=_ignore)
642        self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
643                                    'test.txt')))
644
645        dst_dir = join(self.mkdtemp(), 'destination')
646        src_dir_entry = list(os.scandir(tmp_dir))[0]
647        self.assertIsInstance(src_dir_entry, os.DirEntry)
648        shutil.copytree(src_dir_entry, dst_dir, ignore=_ignore)
649        self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir',
650                                    'test.txt')))
651
652        self.assertEqual(len(invokations), 9)
653
654    def test_copytree_retains_permissions(self):
655        tmp_dir = self.mkdtemp()
656        src_dir = os.path.join(tmp_dir, 'source')
657        os.mkdir(src_dir)
658        dst_dir = os.path.join(tmp_dir, 'destination')
659        self.addCleanup(shutil.rmtree, tmp_dir)
660
661        os.chmod(src_dir, 0o777)
662        write_file((src_dir, 'permissive.txt'), '123')
663        os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777)
664        write_file((src_dir, 'restrictive.txt'), '456')
665        os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600)
666        restrictive_subdir = tempfile.mkdtemp(dir=src_dir)
667        self.addCleanup(os_helper.rmtree, restrictive_subdir)
668        os.chmod(restrictive_subdir, 0o600)
669
670        shutil.copytree(src_dir, dst_dir)
671        self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode)
672        self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode,
673                          os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode)
674        self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode,
675                          os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode)
676        restrictive_subdir_dst = os.path.join(dst_dir,
677                                              os.path.split(restrictive_subdir)[1])
678        self.assertEqual(os.stat(restrictive_subdir).st_mode,
679                          os.stat(restrictive_subdir_dst).st_mode)
680
681    @unittest.mock.patch('os.chmod')
682    def test_copytree_winerror(self, mock_patch):
683        # When copying to VFAT, copystat() raises OSError. On Windows, the
684        # exception object has a meaningful 'winerror' attribute, but not
685        # on other operating systems. Do not assume 'winerror' is set.
686        src_dir = self.mkdtemp()
687        dst_dir = os.path.join(self.mkdtemp(), 'destination')
688        self.addCleanup(shutil.rmtree, src_dir)
689        self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir))
690
691        mock_patch.side_effect = PermissionError('ka-boom')
692        with self.assertRaises(shutil.Error):
693            shutil.copytree(src_dir, dst_dir)
694
695    def test_copytree_custom_copy_function(self):
696        # See: https://bugs.python.org/issue35648
697        def custom_cpfun(a, b):
698            flag.append(None)
699            self.assertIsInstance(a, str)
700            self.assertIsInstance(b, str)
701            self.assertEqual(a, os.path.join(src, 'foo'))
702            self.assertEqual(b, os.path.join(dst, 'foo'))
703
704        flag = []
705        src = self.mkdtemp()
706        dst = tempfile.mktemp(dir=self.mkdtemp())
707        with open(os.path.join(src, 'foo'), 'w', encoding='utf-8') as f:
708            f.close()
709        shutil.copytree(src, dst, copy_function=custom_cpfun)
710        self.assertEqual(len(flag), 1)
711
712    # Issue #3002: copyfile and copytree block indefinitely on named pipes
713    @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
714    @os_helper.skip_unless_symlink
715    @unittest.skipIf(sys.platform == "vxworks",
716                    "fifo requires special path on VxWorks")
717    def test_copytree_named_pipe(self):
718        os.mkdir(TESTFN)
719        try:
720            subdir = os.path.join(TESTFN, "subdir")
721            os.mkdir(subdir)
722            pipe = os.path.join(subdir, "mypipe")
723            try:
724                os.mkfifo(pipe)
725            except PermissionError as e:
726                self.skipTest('os.mkfifo(): %s' % e)
727            try:
728                shutil.copytree(TESTFN, TESTFN2)
729            except shutil.Error as e:
730                errors = e.args[0]
731                self.assertEqual(len(errors), 1)
732                src, dst, error_msg = errors[0]
733                self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
734            else:
735                self.fail("shutil.Error should have been raised")
736        finally:
737            shutil.rmtree(TESTFN, ignore_errors=True)
738            shutil.rmtree(TESTFN2, ignore_errors=True)
739
740    def test_copytree_special_func(self):
741        src_dir = self.mkdtemp()
742        dst_dir = os.path.join(self.mkdtemp(), 'destination')
743        write_file((src_dir, 'test.txt'), '123')
744        os.mkdir(os.path.join(src_dir, 'test_dir'))
745        write_file((src_dir, 'test_dir', 'test.txt'), '456')
746
747        copied = []
748        def _copy(src, dst):
749            copied.append((src, dst))
750
751        shutil.copytree(src_dir, dst_dir, copy_function=_copy)
752        self.assertEqual(len(copied), 2)
753
754    @os_helper.skip_unless_symlink
755    def test_copytree_dangling_symlinks(self):
756        src_dir = self.mkdtemp()
757        valid_file = os.path.join(src_dir, 'test.txt')
758        write_file(valid_file, 'abc')
759        dir_a = os.path.join(src_dir, 'dir_a')
760        os.mkdir(dir_a)
761        for d in src_dir, dir_a:
762            os.symlink('IDONTEXIST', os.path.join(d, 'broken'))
763            os.symlink(valid_file, os.path.join(d, 'valid'))
764
765        # A dangling symlink should raise an error.
766        dst_dir = os.path.join(self.mkdtemp(), 'destination')
767        self.assertRaises(Error, shutil.copytree, src_dir, dst_dir)
768
769        # Dangling symlinks should be ignored with the proper flag.
770        dst_dir = os.path.join(self.mkdtemp(), 'destination2')
771        shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True)
772        for root, dirs, files in os.walk(dst_dir):
773            self.assertNotIn('broken', files)
774            self.assertIn('valid', files)
775
776        # a dangling symlink is copied if symlinks=True
777        dst_dir = os.path.join(self.mkdtemp(), 'destination3')
778        shutil.copytree(src_dir, dst_dir, symlinks=True)
779        self.assertIn('test.txt', os.listdir(dst_dir))
780
781    @os_helper.skip_unless_symlink
782    def test_copytree_symlink_dir(self):
783        src_dir = self.mkdtemp()
784        dst_dir = os.path.join(self.mkdtemp(), 'destination')
785        os.mkdir(os.path.join(src_dir, 'real_dir'))
786        with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'wb'):
787            pass
788        os.symlink(os.path.join(src_dir, 'real_dir'),
789                   os.path.join(src_dir, 'link_to_dir'),
790                   target_is_directory=True)
791
792        shutil.copytree(src_dir, dst_dir, symlinks=False)
793        self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
794        self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
795
796        dst_dir = os.path.join(self.mkdtemp(), 'destination2')
797        shutil.copytree(src_dir, dst_dir, symlinks=True)
798        self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir')))
799        self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir')))
800
801    def test_copytree_return_value(self):
802        # copytree returns its destination path.
803        src_dir = self.mkdtemp()
804        dst_dir = src_dir + "dest"
805        self.addCleanup(shutil.rmtree, dst_dir, True)
806        src = os.path.join(src_dir, 'foo')
807        write_file(src, 'foo')
808        rv = shutil.copytree(src_dir, dst_dir)
809        self.assertEqual(['foo'], os.listdir(rv))
810
811    def test_copytree_subdirectory(self):
812        # copytree where dst is a subdirectory of src, see Issue 38688
813        base_dir = self.mkdtemp()
814        self.addCleanup(shutil.rmtree, base_dir, ignore_errors=True)
815        src_dir = os.path.join(base_dir, "t", "pg")
816        dst_dir = os.path.join(src_dir, "somevendor", "1.0")
817        os.makedirs(src_dir)
818        src = os.path.join(src_dir, 'pol')
819        write_file(src, 'pol')
820        rv = shutil.copytree(src_dir, dst_dir)
821        self.assertEqual(['pol'], os.listdir(rv))
822
823class TestCopy(BaseTest, unittest.TestCase):
824
825    ### shutil.copymode
826
827    @os_helper.skip_unless_symlink
828    def test_copymode_follow_symlinks(self):
829        tmp_dir = self.mkdtemp()
830        src = os.path.join(tmp_dir, 'foo')
831        dst = os.path.join(tmp_dir, 'bar')
832        src_link = os.path.join(tmp_dir, 'baz')
833        dst_link = os.path.join(tmp_dir, 'quux')
834        write_file(src, 'foo')
835        write_file(dst, 'foo')
836        os.symlink(src, src_link)
837        os.symlink(dst, dst_link)
838        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
839        # file to file
840        os.chmod(dst, stat.S_IRWXO)
841        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
842        shutil.copymode(src, dst)
843        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
844        # On Windows, os.chmod does not follow symlinks (issue #15411)
845        if os.name != 'nt':
846            # follow src link
847            os.chmod(dst, stat.S_IRWXO)
848            shutil.copymode(src_link, dst)
849            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
850            # follow dst link
851            os.chmod(dst, stat.S_IRWXO)
852            shutil.copymode(src, dst_link)
853            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
854            # follow both links
855            os.chmod(dst, stat.S_IRWXO)
856            shutil.copymode(src_link, dst_link)
857            self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
858
859    @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod')
860    @os_helper.skip_unless_symlink
861    def test_copymode_symlink_to_symlink(self):
862        tmp_dir = self.mkdtemp()
863        src = os.path.join(tmp_dir, 'foo')
864        dst = os.path.join(tmp_dir, 'bar')
865        src_link = os.path.join(tmp_dir, 'baz')
866        dst_link = os.path.join(tmp_dir, 'quux')
867        write_file(src, 'foo')
868        write_file(dst, 'foo')
869        os.symlink(src, src_link)
870        os.symlink(dst, dst_link)
871        os.chmod(src, stat.S_IRWXU|stat.S_IRWXG)
872        os.chmod(dst, stat.S_IRWXU)
873        os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG)
874        # link to link
875        os.lchmod(dst_link, stat.S_IRWXO)
876        shutil.copymode(src_link, dst_link, follow_symlinks=False)
877        self.assertEqual(os.lstat(src_link).st_mode,
878                         os.lstat(dst_link).st_mode)
879        self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
880        # src link - use chmod
881        os.lchmod(dst_link, stat.S_IRWXO)
882        shutil.copymode(src_link, dst, follow_symlinks=False)
883        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
884        # dst link - use chmod
885        os.lchmod(dst_link, stat.S_IRWXO)
886        shutil.copymode(src, dst_link, follow_symlinks=False)
887        self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode)
888
889    @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing')
890    @os_helper.skip_unless_symlink
891    def test_copymode_symlink_to_symlink_wo_lchmod(self):
892        tmp_dir = self.mkdtemp()
893        src = os.path.join(tmp_dir, 'foo')
894        dst = os.path.join(tmp_dir, 'bar')
895        src_link = os.path.join(tmp_dir, 'baz')
896        dst_link = os.path.join(tmp_dir, 'quux')
897        write_file(src, 'foo')
898        write_file(dst, 'foo')
899        os.symlink(src, src_link)
900        os.symlink(dst, dst_link)
901        shutil.copymode(src_link, dst_link, follow_symlinks=False)  # silent fail
902
903    ### shutil.copystat
904
905    @os_helper.skip_unless_symlink
906    def test_copystat_symlinks(self):
907        tmp_dir = self.mkdtemp()
908        src = os.path.join(tmp_dir, 'foo')
909        dst = os.path.join(tmp_dir, 'bar')
910        src_link = os.path.join(tmp_dir, 'baz')
911        dst_link = os.path.join(tmp_dir, 'qux')
912        write_file(src, 'foo')
913        src_stat = os.stat(src)
914        os.utime(src, (src_stat.st_atime,
915                       src_stat.st_mtime - 42.0))  # ensure different mtimes
916        write_file(dst, 'bar')
917        self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime)
918        os.symlink(src, src_link)
919        os.symlink(dst, dst_link)
920        if hasattr(os, 'lchmod'):
921            os.lchmod(src_link, stat.S_IRWXO)
922        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
923            os.lchflags(src_link, stat.UF_NODUMP)
924        src_link_stat = os.lstat(src_link)
925        # follow
926        if hasattr(os, 'lchmod'):
927            shutil.copystat(src_link, dst_link, follow_symlinks=True)
928            self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode)
929        # don't follow
930        shutil.copystat(src_link, dst_link, follow_symlinks=False)
931        dst_link_stat = os.lstat(dst_link)
932        if os.utime in os.supports_follow_symlinks:
933            for attr in 'st_atime', 'st_mtime':
934                # The modification times may be truncated in the new file.
935                self.assertLessEqual(getattr(src_link_stat, attr),
936                                     getattr(dst_link_stat, attr) + 1)
937        if hasattr(os, 'lchmod'):
938            self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode)
939        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
940            self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags)
941        # tell to follow but dst is not a link
942        shutil.copystat(src_link, dst, follow_symlinks=False)
943        self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) <
944                        00000.1)
945
946    @unittest.skipUnless(hasattr(os, 'chflags') and
947                         hasattr(errno, 'EOPNOTSUPP') and
948                         hasattr(errno, 'ENOTSUP'),
949                         "requires os.chflags, EOPNOTSUPP & ENOTSUP")
950    def test_copystat_handles_harmless_chflags_errors(self):
951        tmpdir = self.mkdtemp()
952        file1 = os.path.join(tmpdir, 'file1')
953        file2 = os.path.join(tmpdir, 'file2')
954        write_file(file1, 'xxx')
955        write_file(file2, 'xxx')
956
957        def make_chflags_raiser(err):
958            ex = OSError()
959
960            def _chflags_raiser(path, flags, *, follow_symlinks=True):
961                ex.errno = err
962                raise ex
963            return _chflags_raiser
964        old_chflags = os.chflags
965        try:
966            for err in errno.EOPNOTSUPP, errno.ENOTSUP:
967                os.chflags = make_chflags_raiser(err)
968                shutil.copystat(file1, file2)
969            # assert others errors break it
970            os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
971            self.assertRaises(OSError, shutil.copystat, file1, file2)
972        finally:
973            os.chflags = old_chflags
974
975    ### shutil.copyxattr
976
977    @os_helper.skip_unless_xattr
978    def test_copyxattr(self):
979        tmp_dir = self.mkdtemp()
980        src = os.path.join(tmp_dir, 'foo')
981        write_file(src, 'foo')
982        dst = os.path.join(tmp_dir, 'bar')
983        write_file(dst, 'bar')
984
985        # no xattr == no problem
986        shutil._copyxattr(src, dst)
987        # common case
988        os.setxattr(src, 'user.foo', b'42')
989        os.setxattr(src, 'user.bar', b'43')
990        shutil._copyxattr(src, dst)
991        self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst)))
992        self.assertEqual(
993                os.getxattr(src, 'user.foo'),
994                os.getxattr(dst, 'user.foo'))
995        # check errors don't affect other attrs
996        os.remove(dst)
997        write_file(dst, 'bar')
998        os_error = OSError(errno.EPERM, 'EPERM')
999
1000        def _raise_on_user_foo(fname, attr, val, **kwargs):
1001            if attr == 'user.foo':
1002                raise os_error
1003            else:
1004                orig_setxattr(fname, attr, val, **kwargs)
1005        try:
1006            orig_setxattr = os.setxattr
1007            os.setxattr = _raise_on_user_foo
1008            shutil._copyxattr(src, dst)
1009            self.assertIn('user.bar', os.listxattr(dst))
1010        finally:
1011            os.setxattr = orig_setxattr
1012        # the source filesystem not supporting xattrs should be ok, too.
1013        def _raise_on_src(fname, *, follow_symlinks=True):
1014            if fname == src:
1015                raise OSError(errno.ENOTSUP, 'Operation not supported')
1016            return orig_listxattr(fname, follow_symlinks=follow_symlinks)
1017        try:
1018            orig_listxattr = os.listxattr
1019            os.listxattr = _raise_on_src
1020            shutil._copyxattr(src, dst)
1021        finally:
1022            os.listxattr = orig_listxattr
1023
1024        # test that shutil.copystat copies xattrs
1025        src = os.path.join(tmp_dir, 'the_original')
1026        srcro = os.path.join(tmp_dir, 'the_original_ro')
1027        write_file(src, src)
1028        write_file(srcro, srcro)
1029        os.setxattr(src, 'user.the_value', b'fiddly')
1030        os.setxattr(srcro, 'user.the_value', b'fiddly')
1031        os.chmod(srcro, 0o444)
1032        dst = os.path.join(tmp_dir, 'the_copy')
1033        dstro = os.path.join(tmp_dir, 'the_copy_ro')
1034        write_file(dst, dst)
1035        write_file(dstro, dstro)
1036        shutil.copystat(src, dst)
1037        shutil.copystat(srcro, dstro)
1038        self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly')
1039        self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly')
1040
1041    @os_helper.skip_unless_symlink
1042    @os_helper.skip_unless_xattr
1043    @os_helper.skip_unless_dac_override
1044    def test_copyxattr_symlinks(self):
1045        # On Linux, it's only possible to access non-user xattr for symlinks;
1046        # which in turn require root privileges. This test should be expanded
1047        # as soon as other platforms gain support for extended attributes.
1048        tmp_dir = self.mkdtemp()
1049        src = os.path.join(tmp_dir, 'foo')
1050        src_link = os.path.join(tmp_dir, 'baz')
1051        write_file(src, 'foo')
1052        os.symlink(src, src_link)
1053        os.setxattr(src, 'trusted.foo', b'42')
1054        os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False)
1055        dst = os.path.join(tmp_dir, 'bar')
1056        dst_link = os.path.join(tmp_dir, 'qux')
1057        write_file(dst, 'bar')
1058        os.symlink(dst, dst_link)
1059        shutil._copyxattr(src_link, dst_link, follow_symlinks=False)
1060        self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43')
1061        self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo')
1062        shutil._copyxattr(src_link, dst, follow_symlinks=False)
1063        self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43')
1064
1065    ### shutil.copy
1066
1067    def _copy_file(self, method):
1068        fname = 'test.txt'
1069        tmpdir = self.mkdtemp()
1070        write_file((tmpdir, fname), 'xxx')
1071        file1 = os.path.join(tmpdir, fname)
1072        tmpdir2 = self.mkdtemp()
1073        method(file1, tmpdir2)
1074        file2 = os.path.join(tmpdir2, fname)
1075        return (file1, file2)
1076
1077    def test_copy(self):
1078        # Ensure that the copied file exists and has the same mode bits.
1079        file1, file2 = self._copy_file(shutil.copy)
1080        self.assertTrue(os.path.exists(file2))
1081        self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)
1082
1083    @os_helper.skip_unless_symlink
1084    def test_copy_symlinks(self):
1085        tmp_dir = self.mkdtemp()
1086        src = os.path.join(tmp_dir, 'foo')
1087        dst = os.path.join(tmp_dir, 'bar')
1088        src_link = os.path.join(tmp_dir, 'baz')
1089        write_file(src, 'foo')
1090        os.symlink(src, src_link)
1091        if hasattr(os, 'lchmod'):
1092            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
1093        # don't follow
1094        shutil.copy(src_link, dst, follow_symlinks=True)
1095        self.assertFalse(os.path.islink(dst))
1096        self.assertEqual(read_file(src), read_file(dst))
1097        os.remove(dst)
1098        # follow
1099        shutil.copy(src_link, dst, follow_symlinks=False)
1100        self.assertTrue(os.path.islink(dst))
1101        self.assertEqual(os.readlink(dst), os.readlink(src_link))
1102        if hasattr(os, 'lchmod'):
1103            self.assertEqual(os.lstat(src_link).st_mode,
1104                             os.lstat(dst).st_mode)
1105
1106    ### shutil.copy2
1107
1108    @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime')
1109    def test_copy2(self):
1110        # Ensure that the copied file exists and has the same mode and
1111        # modification time bits.
1112        file1, file2 = self._copy_file(shutil.copy2)
1113        self.assertTrue(os.path.exists(file2))
1114        file1_stat = os.stat(file1)
1115        file2_stat = os.stat(file2)
1116        self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
1117        for attr in 'st_atime', 'st_mtime':
1118            # The modification times may be truncated in the new file.
1119            self.assertLessEqual(getattr(file1_stat, attr),
1120                                 getattr(file2_stat, attr) + 1)
1121        if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
1122            self.assertEqual(getattr(file1_stat, 'st_flags'),
1123                             getattr(file2_stat, 'st_flags'))
1124
1125    @os_helper.skip_unless_symlink
1126    def test_copy2_symlinks(self):
1127        tmp_dir = self.mkdtemp()
1128        src = os.path.join(tmp_dir, 'foo')
1129        dst = os.path.join(tmp_dir, 'bar')
1130        src_link = os.path.join(tmp_dir, 'baz')
1131        write_file(src, 'foo')
1132        os.symlink(src, src_link)
1133        if hasattr(os, 'lchmod'):
1134            os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)
1135        if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'):
1136            os.lchflags(src_link, stat.UF_NODUMP)
1137        src_stat = os.stat(src)
1138        src_link_stat = os.lstat(src_link)
1139        # follow
1140        shutil.copy2(src_link, dst, follow_symlinks=True)
1141        self.assertFalse(os.path.islink(dst))
1142        self.assertEqual(read_file(src), read_file(dst))
1143        os.remove(dst)
1144        # don't follow
1145        shutil.copy2(src_link, dst, follow_symlinks=False)
1146        self.assertTrue(os.path.islink(dst))
1147        self.assertEqual(os.readlink(dst), os.readlink(src_link))
1148        dst_stat = os.lstat(dst)
1149        if os.utime in os.supports_follow_symlinks:
1150            for attr in 'st_atime', 'st_mtime':
1151                # The modification times may be truncated in the new file.
1152                self.assertLessEqual(getattr(src_link_stat, attr),
1153                                     getattr(dst_stat, attr) + 1)
1154        if hasattr(os, 'lchmod'):
1155            self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode)
1156            self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode)
1157        if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'):
1158            self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags)
1159
1160    @os_helper.skip_unless_xattr
1161    def test_copy2_xattr(self):
1162        tmp_dir = self.mkdtemp()
1163        src = os.path.join(tmp_dir, 'foo')
1164        dst = os.path.join(tmp_dir, 'bar')
1165        write_file(src, 'foo')
1166        os.setxattr(src, 'user.foo', b'42')
1167        shutil.copy2(src, dst)
1168        self.assertEqual(
1169                os.getxattr(src, 'user.foo'),
1170                os.getxattr(dst, 'user.foo'))
1171        os.remove(dst)
1172
1173    def test_copy_return_value(self):
1174        # copy and copy2 both return their destination path.
1175        for fn in (shutil.copy, shutil.copy2):
1176            src_dir = self.mkdtemp()
1177            dst_dir = self.mkdtemp()
1178            src = os.path.join(src_dir, 'foo')
1179            write_file(src, 'foo')
1180            rv = fn(src, dst_dir)
1181            self.assertEqual(rv, os.path.join(dst_dir, 'foo'))
1182            rv = fn(src, os.path.join(dst_dir, 'bar'))
1183            self.assertEqual(rv, os.path.join(dst_dir, 'bar'))
1184
1185    def test_copy_dir(self):
1186        self._test_copy_dir(shutil.copy)
1187
1188    def test_copy2_dir(self):
1189        self._test_copy_dir(shutil.copy2)
1190
1191    def _test_copy_dir(self, copy_func):
1192        src_dir = self.mkdtemp()
1193        src_file = os.path.join(src_dir, 'foo')
1194        dir2 = self.mkdtemp()
1195        dst = os.path.join(src_dir, 'does_not_exist/')
1196        write_file(src_file, 'foo')
1197        if sys.platform == "win32":
1198            err = PermissionError
1199        else:
1200            err = IsADirectoryError
1201        self.assertRaises(err, copy_func, dir2, src_dir)
1202
1203        # raise *err* because of src rather than FileNotFoundError because of dst
1204        self.assertRaises(err, copy_func, dir2, dst)
1205        copy_func(src_file, dir2)     # should not raise exceptions
1206
1207    ### shutil.copyfile
1208
1209    @os_helper.skip_unless_symlink
1210    def test_copyfile_symlinks(self):
1211        tmp_dir = self.mkdtemp()
1212        src = os.path.join(tmp_dir, 'src')
1213        dst = os.path.join(tmp_dir, 'dst')
1214        dst_link = os.path.join(tmp_dir, 'dst_link')
1215        link = os.path.join(tmp_dir, 'link')
1216        write_file(src, 'foo')
1217        os.symlink(src, link)
1218        # don't follow
1219        shutil.copyfile(link, dst_link, follow_symlinks=False)
1220        self.assertTrue(os.path.islink(dst_link))
1221        self.assertEqual(os.readlink(link), os.readlink(dst_link))
1222        # follow
1223        shutil.copyfile(link, dst)
1224        self.assertFalse(os.path.islink(dst))
1225
1226    @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link')
1227    def test_dont_copy_file_onto_link_to_itself(self):
1228        # bug 851123.
1229        os.mkdir(TESTFN)
1230        src = os.path.join(TESTFN, 'cheese')
1231        dst = os.path.join(TESTFN, 'shop')
1232        try:
1233            with open(src, 'w', encoding='utf-8') as f:
1234                f.write('cheddar')
1235            try:
1236                os.link(src, dst)
1237            except PermissionError as e:
1238                self.skipTest('os.link(): %s' % e)
1239            self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
1240            with open(src, 'r', encoding='utf-8') as f:
1241                self.assertEqual(f.read(), 'cheddar')
1242            os.remove(dst)
1243        finally:
1244            shutil.rmtree(TESTFN, ignore_errors=True)
1245
1246    @os_helper.skip_unless_symlink
1247    def test_dont_copy_file_onto_symlink_to_itself(self):
1248        # bug 851123.
1249        os.mkdir(TESTFN)
1250        src = os.path.join(TESTFN, 'cheese')
1251        dst = os.path.join(TESTFN, 'shop')
1252        try:
1253            with open(src, 'w', encoding='utf-8') as f:
1254                f.write('cheddar')
1255            # Using `src` here would mean we end up with a symlink pointing
1256            # to TESTFN/TESTFN/cheese, while it should point at
1257            # TESTFN/cheese.
1258            os.symlink('cheese', dst)
1259            self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst)
1260            with open(src, 'r', encoding='utf-8') as f:
1261                self.assertEqual(f.read(), 'cheddar')
1262            os.remove(dst)
1263        finally:
1264            shutil.rmtree(TESTFN, ignore_errors=True)
1265
1266    # Issue #3002: copyfile and copytree block indefinitely on named pipes
1267    @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
1268    @unittest.skipIf(sys.platform == "vxworks",
1269                    "fifo requires special path on VxWorks")
1270    def test_copyfile_named_pipe(self):
1271        try:
1272            os.mkfifo(TESTFN)
1273        except PermissionError as e:
1274            self.skipTest('os.mkfifo(): %s' % e)
1275        try:
1276            self.assertRaises(shutil.SpecialFileError,
1277                                shutil.copyfile, TESTFN, TESTFN2)
1278            self.assertRaises(shutil.SpecialFileError,
1279                                shutil.copyfile, __file__, TESTFN)
1280        finally:
1281            os.remove(TESTFN)
1282
1283    def test_copyfile_return_value(self):
1284        # copytree returns its destination path.
1285        src_dir = self.mkdtemp()
1286        dst_dir = self.mkdtemp()
1287        dst_file = os.path.join(dst_dir, 'bar')
1288        src_file = os.path.join(src_dir, 'foo')
1289        write_file(src_file, 'foo')
1290        rv = shutil.copyfile(src_file, dst_file)
1291        self.assertTrue(os.path.exists(rv))
1292        self.assertEqual(read_file(src_file), read_file(dst_file))
1293
1294    def test_copyfile_same_file(self):
1295        # copyfile() should raise SameFileError if the source and destination
1296        # are the same.
1297        src_dir = self.mkdtemp()
1298        src_file = os.path.join(src_dir, 'foo')
1299        write_file(src_file, 'foo')
1300        self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file)
1301        # But Error should work too, to stay backward compatible.
1302        self.assertRaises(Error, shutil.copyfile, src_file, src_file)
1303        # Make sure file is not corrupted.
1304        self.assertEqual(read_file(src_file), 'foo')
1305
1306    @unittest.skipIf(MACOS or SOLARIS or _winapi, 'On MACOS, Solaris and Windows the errors are not confusing (though different)')
1307    # gh-92670: The test uses a trailing slash to force the OS consider
1308    # the path as a directory, but on AIX the trailing slash has no effect
1309    # and is considered as a file.
1310    @unittest.skipIf(AIX, 'Not valid on AIX, see gh-92670')
1311    def test_copyfile_nonexistent_dir(self):
1312        # Issue 43219
1313        src_dir = self.mkdtemp()
1314        src_file = os.path.join(src_dir, 'foo')
1315        dst = os.path.join(src_dir, 'does_not_exist/')
1316        write_file(src_file, 'foo')
1317        self.assertRaises(FileNotFoundError, shutil.copyfile, src_file, dst)
1318
1319    def test_copyfile_copy_dir(self):
1320        # Issue 45234
1321        # test copy() and copyfile() raising proper exceptions when src and/or
1322        # dst are directories
1323        src_dir = self.mkdtemp()
1324        src_file = os.path.join(src_dir, 'foo')
1325        dir2 = self.mkdtemp()
1326        dst = os.path.join(src_dir, 'does_not_exist/')
1327        write_file(src_file, 'foo')
1328        if sys.platform == "win32":
1329            err = PermissionError
1330        else:
1331            err = IsADirectoryError
1332
1333        self.assertRaises(err, shutil.copyfile, src_dir, dst)
1334        self.assertRaises(err, shutil.copyfile, src_file, src_dir)
1335        self.assertRaises(err, shutil.copyfile, dir2, src_dir)
1336
1337
1338class TestArchives(BaseTest, unittest.TestCase):
1339
1340    ### shutil.make_archive
1341
1342    @support.requires_zlib()
1343    def test_make_tarball(self):
1344        # creating something to tar
1345        root_dir, base_dir = self._create_files('')
1346
1347        tmpdir2 = self.mkdtemp()
1348        # force shutil to create the directory
1349        os.rmdir(tmpdir2)
1350        # working with relative paths
1351        work_dir = os.path.dirname(tmpdir2)
1352        rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
1353
1354        with os_helper.change_cwd(work_dir), no_chdir:
1355            base_name = os.path.abspath(rel_base_name)
1356            tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
1357
1358        # check if the compressed tarball was created
1359        self.assertEqual(tarball, base_name + '.tar.gz')
1360        self.assertTrue(os.path.isfile(tarball))
1361        self.assertTrue(tarfile.is_tarfile(tarball))
1362        with tarfile.open(tarball, 'r:gz') as tf:
1363            self.assertCountEqual(tf.getnames(),
1364                                  ['.', './sub', './sub2',
1365                                   './file1', './file2', './sub/file3'])
1366
1367        # trying an uncompressed one
1368        with os_helper.change_cwd(work_dir), no_chdir:
1369            tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
1370        self.assertEqual(tarball, base_name + '.tar')
1371        self.assertTrue(os.path.isfile(tarball))
1372        self.assertTrue(tarfile.is_tarfile(tarball))
1373        with tarfile.open(tarball, 'r') as tf:
1374            self.assertCountEqual(tf.getnames(),
1375                                  ['.', './sub', './sub2',
1376                                  './file1', './file2', './sub/file3'])
1377
1378    def _tarinfo(self, path):
1379        with tarfile.open(path) as tar:
1380            names = tar.getnames()
1381            names.sort()
1382            return tuple(names)
1383
1384    def _create_files(self, base_dir='dist'):
1385        # creating something to tar
1386        root_dir = self.mkdtemp()
1387        dist = os.path.join(root_dir, base_dir)
1388        os.makedirs(dist, exist_ok=True)
1389        write_file((dist, 'file1'), 'xxx')
1390        write_file((dist, 'file2'), 'xxx')
1391        os.mkdir(os.path.join(dist, 'sub'))
1392        write_file((dist, 'sub', 'file3'), 'xxx')
1393        os.mkdir(os.path.join(dist, 'sub2'))
1394        if base_dir:
1395            write_file((root_dir, 'outer'), 'xxx')
1396        return root_dir, base_dir
1397
1398    @support.requires_zlib()
1399    @unittest.skipUnless(shutil.which('tar'),
1400                         'Need the tar command to run')
1401    def test_tarfile_vs_tar(self):
1402        root_dir, base_dir = self._create_files()
1403        base_name = os.path.join(self.mkdtemp(), 'archive')
1404        with no_chdir:
1405            tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
1406
1407        # check if the compressed tarball was created
1408        self.assertEqual(tarball, base_name + '.tar.gz')
1409        self.assertTrue(os.path.isfile(tarball))
1410
1411        # now create another tarball using `tar`
1412        tarball2 = os.path.join(root_dir, 'archive2.tar')
1413        tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
1414        subprocess.check_call(tar_cmd, cwd=root_dir,
1415                              stdout=subprocess.DEVNULL)
1416
1417        self.assertTrue(os.path.isfile(tarball2))
1418        # let's compare both tarballs
1419        self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
1420
1421        # trying an uncompressed one
1422        with no_chdir:
1423            tarball = make_archive(base_name, 'tar', root_dir, base_dir)
1424        self.assertEqual(tarball, base_name + '.tar')
1425        self.assertTrue(os.path.isfile(tarball))
1426
1427        # now for a dry_run
1428        with no_chdir:
1429            tarball = make_archive(base_name, 'tar', root_dir, base_dir,
1430                                   dry_run=True)
1431        self.assertEqual(tarball, base_name + '.tar')
1432        self.assertTrue(os.path.isfile(tarball))
1433
1434    @support.requires_zlib()
1435    def test_make_zipfile(self):
1436        # creating something to zip
1437        root_dir, base_dir = self._create_files()
1438
1439        tmpdir2 = self.mkdtemp()
1440        # force shutil to create the directory
1441        os.rmdir(tmpdir2)
1442        # working with relative paths
1443        work_dir = os.path.dirname(tmpdir2)
1444        rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
1445
1446        with os_helper.change_cwd(work_dir), no_chdir:
1447            base_name = os.path.abspath(rel_base_name)
1448            res = make_archive(rel_base_name, 'zip', root_dir)
1449
1450        self.assertEqual(res, base_name + '.zip')
1451        self.assertTrue(os.path.isfile(res))
1452        self.assertTrue(zipfile.is_zipfile(res))
1453        with zipfile.ZipFile(res) as zf:
1454            self.assertCountEqual(zf.namelist(),
1455                    ['dist/', 'dist/sub/', 'dist/sub2/',
1456                     'dist/file1', 'dist/file2', 'dist/sub/file3',
1457                     'outer'])
1458
1459        with os_helper.change_cwd(work_dir), no_chdir:
1460            base_name = os.path.abspath(rel_base_name)
1461            res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
1462
1463        self.assertEqual(res, base_name + '.zip')
1464        self.assertTrue(os.path.isfile(res))
1465        self.assertTrue(zipfile.is_zipfile(res))
1466        with zipfile.ZipFile(res) as zf:
1467            self.assertCountEqual(zf.namelist(),
1468                    ['dist/', 'dist/sub/', 'dist/sub2/',
1469                     'dist/file1', 'dist/file2', 'dist/sub/file3'])
1470
1471    @support.requires_zlib()
1472    @unittest.skipUnless(shutil.which('zip'),
1473                         'Need the zip command to run')
1474    def test_zipfile_vs_zip(self):
1475        root_dir, base_dir = self._create_files()
1476        base_name = os.path.join(self.mkdtemp(), 'archive')
1477        with no_chdir:
1478            archive = make_archive(base_name, 'zip', root_dir, base_dir)
1479
1480        # check if ZIP file  was created
1481        self.assertEqual(archive, base_name + '.zip')
1482        self.assertTrue(os.path.isfile(archive))
1483
1484        # now create another ZIP file using `zip`
1485        archive2 = os.path.join(root_dir, 'archive2.zip')
1486        zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
1487        subprocess.check_call(zip_cmd, cwd=root_dir,
1488                              stdout=subprocess.DEVNULL)
1489
1490        self.assertTrue(os.path.isfile(archive2))
1491        # let's compare both ZIP files
1492        with zipfile.ZipFile(archive) as zf:
1493            names = zf.namelist()
1494        with zipfile.ZipFile(archive2) as zf:
1495            names2 = zf.namelist()
1496        self.assertEqual(sorted(names), sorted(names2))
1497
1498    @support.requires_zlib()
1499    @unittest.skipUnless(shutil.which('unzip'),
1500                         'Need the unzip command to run')
1501    def test_unzip_zipfile(self):
1502        root_dir, base_dir = self._create_files()
1503        base_name = os.path.join(self.mkdtemp(), 'archive')
1504        with no_chdir:
1505            archive = make_archive(base_name, 'zip', root_dir, base_dir)
1506
1507        # check if ZIP file  was created
1508        self.assertEqual(archive, base_name + '.zip')
1509        self.assertTrue(os.path.isfile(archive))
1510
1511        # now check the ZIP file using `unzip -t`
1512        zip_cmd = ['unzip', '-t', archive]
1513        with os_helper.change_cwd(root_dir):
1514            try:
1515                subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
1516            except subprocess.CalledProcessError as exc:
1517                details = exc.output.decode(errors="replace")
1518                if 'unrecognized option: t' in details:
1519                    self.skipTest("unzip doesn't support -t")
1520                msg = "{}\n\n**Unzip Output**\n{}"
1521                self.fail(msg.format(exc, details))
1522
1523    def test_make_archive(self):
1524        tmpdir = self.mkdtemp()
1525        base_name = os.path.join(tmpdir, 'archive')
1526        self.assertRaises(ValueError, make_archive, base_name, 'xxx')
1527
1528    @support.requires_zlib()
1529    def test_make_archive_owner_group(self):
1530        # testing make_archive with owner and group, with various combinations
1531        # this works even if there's not gid/uid support
1532        if UID_GID_SUPPORT:
1533            group = grp.getgrgid(0)[0]
1534            owner = pwd.getpwuid(0)[0]
1535        else:
1536            group = owner = 'root'
1537
1538        root_dir, base_dir = self._create_files()
1539        base_name = os.path.join(self.mkdtemp(), 'archive')
1540        res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
1541                           group=group)
1542        self.assertTrue(os.path.isfile(res))
1543
1544        res = make_archive(base_name, 'zip', root_dir, base_dir)
1545        self.assertTrue(os.path.isfile(res))
1546
1547        res = make_archive(base_name, 'tar', root_dir, base_dir,
1548                           owner=owner, group=group)
1549        self.assertTrue(os.path.isfile(res))
1550
1551        res = make_archive(base_name, 'tar', root_dir, base_dir,
1552                           owner='kjhkjhkjg', group='oihohoh')
1553        self.assertTrue(os.path.isfile(res))
1554
1555
1556    @support.requires_zlib()
1557    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1558    def test_tarfile_root_owner(self):
1559        root_dir, base_dir = self._create_files()
1560        base_name = os.path.join(self.mkdtemp(), 'archive')
1561        group = grp.getgrgid(0)[0]
1562        owner = pwd.getpwuid(0)[0]
1563        with os_helper.change_cwd(root_dir), no_chdir:
1564            archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
1565                                        owner=owner, group=group)
1566
1567        # check if the compressed tarball was created
1568        self.assertTrue(os.path.isfile(archive_name))
1569
1570        # now checks the rights
1571        archive = tarfile.open(archive_name)
1572        try:
1573            for member in archive.getmembers():
1574                self.assertEqual(member.uid, 0)
1575                self.assertEqual(member.gid, 0)
1576        finally:
1577            archive.close()
1578
1579    def test_make_archive_cwd(self):
1580        current_dir = os.getcwd()
1581        root_dir = self.mkdtemp()
1582        def _breaks(*args, **kw):
1583            raise RuntimeError()
1584        dirs = []
1585        def _chdir(path):
1586            dirs.append(path)
1587            orig_chdir(path)
1588
1589        register_archive_format('xxx', _breaks, [], 'xxx file')
1590        try:
1591            with support.swap_attr(os, 'chdir', _chdir) as orig_chdir:
1592                try:
1593                    make_archive('xxx', 'xxx', root_dir=root_dir)
1594                except Exception:
1595                    pass
1596            self.assertEqual(os.getcwd(), current_dir)
1597            self.assertEqual(dirs, [root_dir, current_dir])
1598        finally:
1599            unregister_archive_format('xxx')
1600
1601    def test_make_tarfile_in_curdir(self):
1602        # Issue #21280
1603        root_dir = self.mkdtemp()
1604        with os_helper.change_cwd(root_dir), no_chdir:
1605            self.assertEqual(make_archive('test', 'tar'), 'test.tar')
1606            self.assertTrue(os.path.isfile('test.tar'))
1607
1608    @support.requires_zlib()
1609    def test_make_zipfile_in_curdir(self):
1610        # Issue #21280
1611        root_dir = self.mkdtemp()
1612        with os_helper.change_cwd(root_dir), no_chdir:
1613            self.assertEqual(make_archive('test', 'zip'), 'test.zip')
1614            self.assertTrue(os.path.isfile('test.zip'))
1615
1616    def test_register_archive_format(self):
1617
1618        self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
1619        self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1620                          1)
1621        self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1622                          [(1, 2), (1, 2, 3)])
1623
1624        register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
1625        formats = [name for name, params in get_archive_formats()]
1626        self.assertIn('xxx', formats)
1627
1628        unregister_archive_format('xxx')
1629        formats = [name for name, params in get_archive_formats()]
1630        self.assertNotIn('xxx', formats)
1631
1632    ### shutil.unpack_archive
1633
1634    def check_unpack_archive(self, format, **kwargs):
1635        self.check_unpack_archive_with_converter(
1636            format, lambda path: path, **kwargs)
1637        self.check_unpack_archive_with_converter(
1638            format, pathlib.Path, **kwargs)
1639        self.check_unpack_archive_with_converter(format, FakePath, **kwargs)
1640
1641    def check_unpack_archive_with_converter(self, format, converter, **kwargs):
1642        root_dir, base_dir = self._create_files()
1643        expected = rlistdir(root_dir)
1644        expected.remove('outer')
1645
1646        base_name = os.path.join(self.mkdtemp(), 'archive')
1647        filename = make_archive(base_name, format, root_dir, base_dir)
1648
1649        # let's try to unpack it now
1650        tmpdir2 = self.mkdtemp()
1651        unpack_archive(converter(filename), converter(tmpdir2), **kwargs)
1652        self.assertEqual(rlistdir(tmpdir2), expected)
1653
1654        # and again, this time with the format specified
1655        tmpdir3 = self.mkdtemp()
1656        unpack_archive(converter(filename), converter(tmpdir3), format=format,
1657                       **kwargs)
1658        self.assertEqual(rlistdir(tmpdir3), expected)
1659
1660        with self.assertRaises(shutil.ReadError):
1661            unpack_archive(converter(TESTFN), **kwargs)
1662        with self.assertRaises(ValueError):
1663            unpack_archive(converter(TESTFN), format='xxx', **kwargs)
1664
1665    def check_unpack_tarball(self, format):
1666        self.check_unpack_archive(format, filter='fully_trusted')
1667        self.check_unpack_archive(format, filter='data')
1668        with warnings_helper.check_no_warnings(self):
1669            self.check_unpack_archive(format)
1670
1671    def test_unpack_archive_tar(self):
1672        self.check_unpack_tarball('tar')
1673
1674    @support.requires_zlib()
1675    def test_unpack_archive_gztar(self):
1676        self.check_unpack_tarball('gztar')
1677
1678    @support.requires_bz2()
1679    def test_unpack_archive_bztar(self):
1680        self.check_unpack_tarball('bztar')
1681
1682    @support.requires_lzma()
1683    @unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger")
1684    def test_unpack_archive_xztar(self):
1685        self.check_unpack_tarball('xztar')
1686
1687    @support.requires_zlib()
1688    def test_unpack_archive_zip(self):
1689        self.check_unpack_archive('zip')
1690        with self.assertRaises(TypeError):
1691            self.check_unpack_archive('zip', filter='data')
1692
1693    def test_unpack_registry(self):
1694
1695        formats = get_unpack_formats()
1696
1697        def _boo(filename, extract_dir, extra):
1698            self.assertEqual(extra, 1)
1699            self.assertEqual(filename, 'stuff.boo')
1700            self.assertEqual(extract_dir, 'xx')
1701
1702        register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)])
1703        unpack_archive('stuff.boo', 'xx')
1704
1705        # trying to register a .boo unpacker again
1706        self.assertRaises(RegistryError, register_unpack_format, 'Boo2',
1707                          ['.boo'], _boo)
1708
1709        # should work now
1710        unregister_unpack_format('Boo')
1711        register_unpack_format('Boo2', ['.boo'], _boo)
1712        self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats())
1713        self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats())
1714
1715        # let's leave a clean state
1716        unregister_unpack_format('Boo2')
1717        self.assertEqual(get_unpack_formats(), formats)
1718
1719
1720class TestMisc(BaseTest, unittest.TestCase):
1721
1722    @unittest.skipUnless(hasattr(shutil, 'disk_usage'),
1723                         "disk_usage not available on this platform")
1724    def test_disk_usage(self):
1725        usage = shutil.disk_usage(os.path.dirname(__file__))
1726        for attr in ('total', 'used', 'free'):
1727            self.assertIsInstance(getattr(usage, attr), int)
1728        self.assertGreater(usage.total, 0)
1729        self.assertGreater(usage.used, 0)
1730        self.assertGreaterEqual(usage.free, 0)
1731        self.assertGreaterEqual(usage.total, usage.used)
1732        self.assertGreater(usage.total, usage.free)
1733
1734        # bpo-32557: Check that disk_usage() also accepts a filename
1735        shutil.disk_usage(__file__)
1736
1737    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
1738    @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown')
1739    def test_chown(self):
1740        dirname = self.mkdtemp()
1741        filename = tempfile.mktemp(dir=dirname)
1742        write_file(filename, 'testing chown function')
1743
1744        with self.assertRaises(ValueError):
1745            shutil.chown(filename)
1746
1747        with self.assertRaises(LookupError):
1748            shutil.chown(filename, user='non-existing username')
1749
1750        with self.assertRaises(LookupError):
1751            shutil.chown(filename, group='non-existing groupname')
1752
1753        with self.assertRaises(TypeError):
1754            shutil.chown(filename, b'spam')
1755
1756        with self.assertRaises(TypeError):
1757            shutil.chown(filename, 3.14)
1758
1759        uid = os.getuid()
1760        gid = os.getgid()
1761
1762        def check_chown(path, uid=None, gid=None):
1763            s = os.stat(filename)
1764            if uid is not None:
1765                self.assertEqual(uid, s.st_uid)
1766            if gid is not None:
1767                self.assertEqual(gid, s.st_gid)
1768
1769        shutil.chown(filename, uid, gid)
1770        check_chown(filename, uid, gid)
1771        shutil.chown(filename, uid)
1772        check_chown(filename, uid)
1773        shutil.chown(filename, user=uid)
1774        check_chown(filename, uid)
1775        shutil.chown(filename, group=gid)
1776        check_chown(filename, gid=gid)
1777
1778        shutil.chown(dirname, uid, gid)
1779        check_chown(dirname, uid, gid)
1780        shutil.chown(dirname, uid)
1781        check_chown(dirname, uid)
1782        shutil.chown(dirname, user=uid)
1783        check_chown(dirname, uid)
1784        shutil.chown(dirname, group=gid)
1785        check_chown(dirname, gid=gid)
1786
1787        try:
1788            user = pwd.getpwuid(uid)[0]
1789            group = grp.getgrgid(gid)[0]
1790        except KeyError:
1791            # On some systems uid/gid cannot be resolved.
1792            pass
1793        else:
1794            shutil.chown(filename, user, group)
1795            check_chown(filename, uid, gid)
1796            shutil.chown(dirname, user, group)
1797            check_chown(dirname, uid, gid)
1798
1799
1800class TestWhich(BaseTest, unittest.TestCase):
1801
1802    def setUp(self):
1803        self.temp_dir = self.mkdtemp(prefix="Tmp")
1804        # Give the temp_file an ".exe" suffix for all.
1805        # It's needed on Windows and not harmful on other platforms.
1806        self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir,
1807                                                     prefix="Tmp",
1808                                                     suffix=".Exe")
1809        os.chmod(self.temp_file.name, stat.S_IXUSR)
1810        self.addCleanup(self.temp_file.close)
1811        self.dir, self.file = os.path.split(self.temp_file.name)
1812        self.env_path = self.dir
1813        self.curdir = os.curdir
1814        self.ext = ".EXE"
1815
1816    def test_basic(self):
1817        # Given an EXE in a directory, it should be returned.
1818        rv = shutil.which(self.file, path=self.dir)
1819        self.assertEqual(rv, self.temp_file.name)
1820
1821    def test_absolute_cmd(self):
1822        # When given the fully qualified path to an executable that exists,
1823        # it should be returned.
1824        rv = shutil.which(self.temp_file.name, path=self.temp_dir)
1825        self.assertEqual(rv, self.temp_file.name)
1826
1827    def test_relative_cmd(self):
1828        # When given the relative path with a directory part to an executable
1829        # that exists, it should be returned.
1830        base_dir, tail_dir = os.path.split(self.dir)
1831        relpath = os.path.join(tail_dir, self.file)
1832        with os_helper.change_cwd(path=base_dir):
1833            rv = shutil.which(relpath, path=self.temp_dir)
1834            self.assertEqual(rv, relpath)
1835        # But it shouldn't be searched in PATH directories (issue #16957).
1836        with os_helper.change_cwd(path=self.dir):
1837            rv = shutil.which(relpath, path=base_dir)
1838            self.assertIsNone(rv)
1839
1840    def test_cwd(self):
1841        # Issue #16957
1842        base_dir = os.path.dirname(self.dir)
1843        with os_helper.change_cwd(path=self.dir):
1844            rv = shutil.which(self.file, path=base_dir)
1845            if sys.platform == "win32":
1846                # Windows: current directory implicitly on PATH
1847                self.assertEqual(rv, os.path.join(self.curdir, self.file))
1848            else:
1849                # Other platforms: shouldn't match in the current directory.
1850                self.assertIsNone(rv)
1851
1852    @os_helper.skip_if_dac_override
1853    def test_non_matching_mode(self):
1854        # Set the file read-only and ask for writeable files.
1855        os.chmod(self.temp_file.name, stat.S_IREAD)
1856        if os.access(self.temp_file.name, os.W_OK):
1857            self.skipTest("can't set the file read-only")
1858        rv = shutil.which(self.file, path=self.dir, mode=os.W_OK)
1859        self.assertIsNone(rv)
1860
1861    def test_relative_path(self):
1862        base_dir, tail_dir = os.path.split(self.dir)
1863        with os_helper.change_cwd(path=base_dir):
1864            rv = shutil.which(self.file, path=tail_dir)
1865            self.assertEqual(rv, os.path.join(tail_dir, self.file))
1866
1867    def test_nonexistent_file(self):
1868        # Return None when no matching executable file is found on the path.
1869        rv = shutil.which("foo.exe", path=self.dir)
1870        self.assertIsNone(rv)
1871
1872    @unittest.skipUnless(sys.platform == "win32",
1873                         "pathext check is Windows-only")
1874    def test_pathext_checking(self):
1875        # Ask for the file without the ".exe" extension, then ensure that
1876        # it gets found properly with the extension.
1877        rv = shutil.which(self.file[:-4], path=self.dir)
1878        self.assertEqual(rv, self.temp_file.name[:-4] + self.ext)
1879
1880    def test_environ_path(self):
1881        with os_helper.EnvironmentVarGuard() as env:
1882            env['PATH'] = self.env_path
1883            rv = shutil.which(self.file)
1884            self.assertEqual(rv, self.temp_file.name)
1885
1886    def test_environ_path_empty(self):
1887        # PATH='': no match
1888        with os_helper.EnvironmentVarGuard() as env:
1889            env['PATH'] = ''
1890            with unittest.mock.patch('os.confstr', return_value=self.dir, \
1891                                     create=True), \
1892                 support.swap_attr(os, 'defpath', self.dir), \
1893                 os_helper.change_cwd(self.dir):
1894                rv = shutil.which(self.file)
1895                self.assertIsNone(rv)
1896
1897    def test_environ_path_cwd(self):
1898        expected_cwd = os.path.basename(self.temp_file.name)
1899        if sys.platform == "win32":
1900            curdir = os.curdir
1901            if isinstance(expected_cwd, bytes):
1902                curdir = os.fsencode(curdir)
1903            expected_cwd = os.path.join(curdir, expected_cwd)
1904
1905        # PATH=':': explicitly looks in the current directory
1906        with os_helper.EnvironmentVarGuard() as env:
1907            env['PATH'] = os.pathsep
1908            with unittest.mock.patch('os.confstr', return_value=self.dir, \
1909                                     create=True), \
1910                 support.swap_attr(os, 'defpath', self.dir):
1911                rv = shutil.which(self.file)
1912                self.assertIsNone(rv)
1913
1914                # look in current directory
1915                with os_helper.change_cwd(self.dir):
1916                    rv = shutil.which(self.file)
1917                    self.assertEqual(rv, expected_cwd)
1918
1919    def test_environ_path_missing(self):
1920        with os_helper.EnvironmentVarGuard() as env:
1921            env.pop('PATH', None)
1922
1923            # without confstr
1924            with unittest.mock.patch('os.confstr', side_effect=ValueError, \
1925                                     create=True), \
1926                 support.swap_attr(os, 'defpath', self.dir):
1927                rv = shutil.which(self.file)
1928            self.assertEqual(rv, self.temp_file.name)
1929
1930            # with confstr
1931            with unittest.mock.patch('os.confstr', return_value=self.dir, \
1932                                     create=True), \
1933                 support.swap_attr(os, 'defpath', ''):
1934                rv = shutil.which(self.file)
1935            self.assertEqual(rv, self.temp_file.name)
1936
1937    def test_empty_path(self):
1938        base_dir = os.path.dirname(self.dir)
1939        with os_helper.change_cwd(path=self.dir), \
1940             os_helper.EnvironmentVarGuard() as env:
1941            env['PATH'] = self.env_path
1942            rv = shutil.which(self.file, path='')
1943            self.assertIsNone(rv)
1944
1945    def test_empty_path_no_PATH(self):
1946        with os_helper.EnvironmentVarGuard() as env:
1947            env.pop('PATH', None)
1948            rv = shutil.which(self.file)
1949            self.assertIsNone(rv)
1950
1951    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
1952    def test_pathext(self):
1953        ext = ".xyz"
1954        temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
1955                                                   prefix="Tmp2", suffix=ext)
1956        os.chmod(temp_filexyz.name, stat.S_IXUSR)
1957        self.addCleanup(temp_filexyz.close)
1958
1959        # strip path and extension
1960        program = os.path.basename(temp_filexyz.name)
1961        program = os.path.splitext(program)[0]
1962
1963        with os_helper.EnvironmentVarGuard() as env:
1964            env['PATHEXT'] = ext
1965            rv = shutil.which(program, path=self.temp_dir)
1966            self.assertEqual(rv, temp_filexyz.name)
1967
1968    # Issue 40592: See https://bugs.python.org/issue40592
1969    @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows')
1970    def test_pathext_with_empty_str(self):
1971        ext = ".xyz"
1972        temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir,
1973                                                   prefix="Tmp2", suffix=ext)
1974        self.addCleanup(temp_filexyz.close)
1975
1976        # strip path and extension
1977        program = os.path.basename(temp_filexyz.name)
1978        program = os.path.splitext(program)[0]
1979
1980        with os_helper.EnvironmentVarGuard() as env:
1981            env['PATHEXT'] = f"{ext};"  # note the ;
1982            rv = shutil.which(program, path=self.temp_dir)
1983            self.assertEqual(rv, temp_filexyz.name)
1984
1985
1986class TestWhichBytes(TestWhich):
1987    def setUp(self):
1988        TestWhich.setUp(self)
1989        self.dir = os.fsencode(self.dir)
1990        self.file = os.fsencode(self.file)
1991        self.temp_file.name = os.fsencode(self.temp_file.name)
1992        self.curdir = os.fsencode(self.curdir)
1993        self.ext = os.fsencode(self.ext)
1994
1995
1996class TestMove(BaseTest, unittest.TestCase):
1997
1998    def setUp(self):
1999        filename = "foo"
2000        self.src_dir = self.mkdtemp()
2001        self.dst_dir = self.mkdtemp()
2002        self.src_file = os.path.join(self.src_dir, filename)
2003        self.dst_file = os.path.join(self.dst_dir, filename)
2004        with open(self.src_file, "wb") as f:
2005            f.write(b"spam")
2006
2007    def _check_move_file(self, src, dst, real_dst):
2008        with open(src, "rb") as f:
2009            contents = f.read()
2010        shutil.move(src, dst)
2011        with open(real_dst, "rb") as f:
2012            self.assertEqual(contents, f.read())
2013        self.assertFalse(os.path.exists(src))
2014
2015    def _check_move_dir(self, src, dst, real_dst):
2016        contents = sorted(os.listdir(src))
2017        shutil.move(src, dst)
2018        self.assertEqual(contents, sorted(os.listdir(real_dst)))
2019        self.assertFalse(os.path.exists(src))
2020
2021    def test_move_file(self):
2022        # Move a file to another location on the same filesystem.
2023        self._check_move_file(self.src_file, self.dst_file, self.dst_file)
2024
2025    def test_move_file_to_dir(self):
2026        # Move a file inside an existing dir on the same filesystem.
2027        self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
2028
2029    def test_move_file_to_dir_pathlike_src(self):
2030        # Move a pathlike file to another location on the same filesystem.
2031        src = pathlib.Path(self.src_file)
2032        self._check_move_file(src, self.dst_dir, self.dst_file)
2033
2034    def test_move_file_to_dir_pathlike_dst(self):
2035        # Move a file to another pathlike location on the same filesystem.
2036        dst = pathlib.Path(self.dst_dir)
2037        self._check_move_file(self.src_file, dst, self.dst_file)
2038
2039    @mock_rename
2040    def test_move_file_other_fs(self):
2041        # Move a file to an existing dir on another filesystem.
2042        self.test_move_file()
2043
2044    @mock_rename
2045    def test_move_file_to_dir_other_fs(self):
2046        # Move a file to another location on another filesystem.
2047        self.test_move_file_to_dir()
2048
2049    def test_move_dir(self):
2050        # Move a dir to another location on the same filesystem.
2051        dst_dir = tempfile.mktemp(dir=self.mkdtemp())
2052        try:
2053            self._check_move_dir(self.src_dir, dst_dir, dst_dir)
2054        finally:
2055            os_helper.rmtree(dst_dir)
2056
2057    @mock_rename
2058    def test_move_dir_other_fs(self):
2059        # Move a dir to another location on another filesystem.
2060        self.test_move_dir()
2061
2062    def test_move_dir_to_dir(self):
2063        # Move a dir inside an existing dir on the same filesystem.
2064        self._check_move_dir(self.src_dir, self.dst_dir,
2065            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
2066
2067    @mock_rename
2068    def test_move_dir_to_dir_other_fs(self):
2069        # Move a dir inside an existing dir on another filesystem.
2070        self.test_move_dir_to_dir()
2071
2072    def test_move_dir_sep_to_dir(self):
2073        self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
2074            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
2075
2076    @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
2077    def test_move_dir_altsep_to_dir(self):
2078        self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
2079            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
2080
2081    def test_existing_file_inside_dest_dir(self):
2082        # A file with the same name inside the destination dir already exists.
2083        with open(self.dst_file, "wb"):
2084            pass
2085        self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
2086
2087    def test_dont_move_dir_in_itself(self):
2088        # Moving a dir inside itself raises an Error.
2089        dst = os.path.join(self.src_dir, "bar")
2090        self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
2091
2092    def test_destinsrc_false_negative(self):
2093        os.mkdir(TESTFN)
2094        try:
2095            for src, dst in [('srcdir', 'srcdir/dest')]:
2096                src = os.path.join(TESTFN, src)
2097                dst = os.path.join(TESTFN, dst)
2098                self.assertTrue(shutil._destinsrc(src, dst),
2099                             msg='_destinsrc() wrongly concluded that '
2100                             'dst (%s) is not in src (%s)' % (dst, src))
2101        finally:
2102            os_helper.rmtree(TESTFN)
2103
2104    def test_destinsrc_false_positive(self):
2105        os.mkdir(TESTFN)
2106        try:
2107            for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
2108                src = os.path.join(TESTFN, src)
2109                dst = os.path.join(TESTFN, dst)
2110                self.assertFalse(shutil._destinsrc(src, dst),
2111                            msg='_destinsrc() wrongly concluded that '
2112                            'dst (%s) is in src (%s)' % (dst, src))
2113        finally:
2114            os_helper.rmtree(TESTFN)
2115
2116    @os_helper.skip_unless_symlink
2117    @mock_rename
2118    def test_move_file_symlink(self):
2119        dst = os.path.join(self.src_dir, 'bar')
2120        os.symlink(self.src_file, dst)
2121        shutil.move(dst, self.dst_file)
2122        self.assertTrue(os.path.islink(self.dst_file))
2123        self.assertTrue(os.path.samefile(self.src_file, self.dst_file))
2124
2125    @os_helper.skip_unless_symlink
2126    @mock_rename
2127    def test_move_file_symlink_to_dir(self):
2128        filename = "bar"
2129        dst = os.path.join(self.src_dir, filename)
2130        os.symlink(self.src_file, dst)
2131        shutil.move(dst, self.dst_dir)
2132        final_link = os.path.join(self.dst_dir, filename)
2133        self.assertTrue(os.path.islink(final_link))
2134        self.assertTrue(os.path.samefile(self.src_file, final_link))
2135
2136    @os_helper.skip_unless_symlink
2137    @mock_rename
2138    def test_move_dangling_symlink(self):
2139        src = os.path.join(self.src_dir, 'baz')
2140        dst = os.path.join(self.src_dir, 'bar')
2141        os.symlink(src, dst)
2142        dst_link = os.path.join(self.dst_dir, 'quux')
2143        shutil.move(dst, dst_link)
2144        self.assertTrue(os.path.islink(dst_link))
2145        self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link))
2146
2147    @os_helper.skip_unless_symlink
2148    @mock_rename
2149    def test_move_dir_symlink(self):
2150        src = os.path.join(self.src_dir, 'baz')
2151        dst = os.path.join(self.src_dir, 'bar')
2152        os.mkdir(src)
2153        os.symlink(src, dst)
2154        dst_link = os.path.join(self.dst_dir, 'quux')
2155        shutil.move(dst, dst_link)
2156        self.assertTrue(os.path.islink(dst_link))
2157        self.assertTrue(os.path.samefile(src, dst_link))
2158
2159    def test_move_return_value(self):
2160        rv = shutil.move(self.src_file, self.dst_dir)
2161        self.assertEqual(rv,
2162                os.path.join(self.dst_dir, os.path.basename(self.src_file)))
2163
2164    def test_move_as_rename_return_value(self):
2165        rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar'))
2166        self.assertEqual(rv, os.path.join(self.dst_dir, 'bar'))
2167
2168    @mock_rename
2169    def test_move_file_special_function(self):
2170        moved = []
2171        def _copy(src, dst):
2172            moved.append((src, dst))
2173        shutil.move(self.src_file, self.dst_dir, copy_function=_copy)
2174        self.assertEqual(len(moved), 1)
2175
2176    @mock_rename
2177    def test_move_dir_special_function(self):
2178        moved = []
2179        def _copy(src, dst):
2180            moved.append((src, dst))
2181        os_helper.create_empty_file(os.path.join(self.src_dir, 'child'))
2182        os_helper.create_empty_file(os.path.join(self.src_dir, 'child1'))
2183        shutil.move(self.src_dir, self.dst_dir, copy_function=_copy)
2184        self.assertEqual(len(moved), 3)
2185
2186    def test_move_dir_caseinsensitive(self):
2187        # Renames a folder to the same name
2188        # but a different case.
2189
2190        self.src_dir = self.mkdtemp()
2191        dst_dir = os.path.join(
2192                os.path.dirname(self.src_dir),
2193                os.path.basename(self.src_dir).upper())
2194        self.assertNotEqual(self.src_dir, dst_dir)
2195
2196        try:
2197            shutil.move(self.src_dir, dst_dir)
2198            self.assertTrue(os.path.isdir(dst_dir))
2199        finally:
2200            os.rmdir(dst_dir)
2201
2202
2203    @os_helper.skip_unless_dac_override
2204    @unittest.skipUnless(hasattr(os, 'lchflags')
2205                         and hasattr(stat, 'SF_IMMUTABLE')
2206                         and hasattr(stat, 'UF_OPAQUE'),
2207                         'requires lchflags')
2208    def test_move_dir_permission_denied(self):
2209        # bpo-42782: shutil.move should not create destination directories
2210        # if the source directory cannot be removed.
2211        try:
2212            os.mkdir(TESTFN_SRC)
2213            os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE)
2214
2215            # Testing on an empty immutable directory
2216            # TESTFN_DST should not exist if shutil.move failed
2217            self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST)
2218            self.assertFalse(TESTFN_DST in os.listdir())
2219
2220            # Create a file and keep the directory immutable
2221            os.lchflags(TESTFN_SRC, stat.UF_OPAQUE)
2222            os_helper.create_empty_file(os.path.join(TESTFN_SRC, 'child'))
2223            os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE)
2224
2225            # Testing on a non-empty immutable directory
2226            # TESTFN_DST should not exist if shutil.move failed
2227            self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST)
2228            self.assertFalse(TESTFN_DST in os.listdir())
2229        finally:
2230            if os.path.exists(TESTFN_SRC):
2231                os.lchflags(TESTFN_SRC, stat.UF_OPAQUE)
2232                os_helper.rmtree(TESTFN_SRC)
2233            if os.path.exists(TESTFN_DST):
2234                os.lchflags(TESTFN_DST, stat.UF_OPAQUE)
2235                os_helper.rmtree(TESTFN_DST)
2236
2237
2238class TestCopyFile(unittest.TestCase):
2239
2240    class Faux(object):
2241        _entered = False
2242        _exited_with = None
2243        _raised = False
2244        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
2245            self._raise_in_exit = raise_in_exit
2246            self._suppress_at_exit = suppress_at_exit
2247        def read(self, *args):
2248            return ''
2249        def __enter__(self):
2250            self._entered = True
2251        def __exit__(self, exc_type, exc_val, exc_tb):
2252            self._exited_with = exc_type, exc_val, exc_tb
2253            if self._raise_in_exit:
2254                self._raised = True
2255                raise OSError("Cannot close")
2256            return self._suppress_at_exit
2257
2258    def test_w_source_open_fails(self):
2259        def _open(filename, mode='r'):
2260            if filename == 'srcfile':
2261                raise OSError('Cannot open "srcfile"')
2262            assert 0  # shouldn't reach here.
2263
2264        with support.swap_attr(shutil, 'open', _open):
2265            with self.assertRaises(OSError):
2266                shutil.copyfile('srcfile', 'destfile')
2267
2268    @unittest.skipIf(MACOS, "skipped on macOS")
2269    def test_w_dest_open_fails(self):
2270        srcfile = self.Faux()
2271
2272        def _open(filename, mode='r'):
2273            if filename == 'srcfile':
2274                return srcfile
2275            if filename == 'destfile':
2276                raise OSError('Cannot open "destfile"')
2277            assert 0  # shouldn't reach here.
2278
2279        with support.swap_attr(shutil, 'open', _open):
2280            shutil.copyfile('srcfile', 'destfile')
2281        self.assertTrue(srcfile._entered)
2282        self.assertTrue(srcfile._exited_with[0] is OSError)
2283        self.assertEqual(srcfile._exited_with[1].args,
2284                         ('Cannot open "destfile"',))
2285
2286    @unittest.skipIf(MACOS, "skipped on macOS")
2287    def test_w_dest_close_fails(self):
2288        srcfile = self.Faux()
2289        destfile = self.Faux(True)
2290
2291        def _open(filename, mode='r'):
2292            if filename == 'srcfile':
2293                return srcfile
2294            if filename == 'destfile':
2295                return destfile
2296            assert 0  # shouldn't reach here.
2297
2298        with support.swap_attr(shutil, 'open', _open):
2299            shutil.copyfile('srcfile', 'destfile')
2300        self.assertTrue(srcfile._entered)
2301        self.assertTrue(destfile._entered)
2302        self.assertTrue(destfile._raised)
2303        self.assertTrue(srcfile._exited_with[0] is OSError)
2304        self.assertEqual(srcfile._exited_with[1].args,
2305                         ('Cannot close',))
2306
2307    @unittest.skipIf(MACOS, "skipped on macOS")
2308    def test_w_source_close_fails(self):
2309
2310        srcfile = self.Faux(True)
2311        destfile = self.Faux()
2312
2313        def _open(filename, mode='r'):
2314            if filename == 'srcfile':
2315                return srcfile
2316            if filename == 'destfile':
2317                return destfile
2318            assert 0  # shouldn't reach here.
2319
2320        with support.swap_attr(shutil, 'open', _open):
2321            with self.assertRaises(OSError):
2322                shutil.copyfile('srcfile', 'destfile')
2323        self.assertTrue(srcfile._entered)
2324        self.assertTrue(destfile._entered)
2325        self.assertFalse(destfile._raised)
2326        self.assertTrue(srcfile._exited_with[0] is None)
2327        self.assertTrue(srcfile._raised)
2328
2329
2330class TestCopyFileObj(unittest.TestCase):
2331    FILESIZE = 2 * 1024 * 1024
2332
2333    @classmethod
2334    def setUpClass(cls):
2335        write_test_file(TESTFN, cls.FILESIZE)
2336
2337    @classmethod
2338    def tearDownClass(cls):
2339        os_helper.unlink(TESTFN)
2340        os_helper.unlink(TESTFN2)
2341
2342    def tearDown(self):
2343        os_helper.unlink(TESTFN2)
2344
2345    @contextlib.contextmanager
2346    def get_files(self):
2347        with open(TESTFN, "rb") as src:
2348            with open(TESTFN2, "wb") as dst:
2349                yield (src, dst)
2350
2351    def assert_files_eq(self, src, dst):
2352        with open(src, 'rb') as fsrc:
2353            with open(dst, 'rb') as fdst:
2354                self.assertEqual(fsrc.read(), fdst.read())
2355
2356    def test_content(self):
2357        with self.get_files() as (src, dst):
2358            shutil.copyfileobj(src, dst)
2359        self.assert_files_eq(TESTFN, TESTFN2)
2360
2361    def test_file_not_closed(self):
2362        with self.get_files() as (src, dst):
2363            shutil.copyfileobj(src, dst)
2364            assert not src.closed
2365            assert not dst.closed
2366
2367    def test_file_offset(self):
2368        with self.get_files() as (src, dst):
2369            shutil.copyfileobj(src, dst)
2370            self.assertEqual(src.tell(), self.FILESIZE)
2371            self.assertEqual(dst.tell(), self.FILESIZE)
2372
2373    @unittest.skipIf(os.name != 'nt', "Windows only")
2374    def test_win_impl(self):
2375        # Make sure alternate Windows implementation is called.
2376        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2377            shutil.copyfile(TESTFN, TESTFN2)
2378        assert m.called
2379
2380        # File size is 2 MiB but max buf size should be 1 MiB.
2381        self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024)
2382
2383        # If file size < 1 MiB memoryview() length must be equal to
2384        # the actual file size.
2385        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
2386            f.write(b'foo')
2387        fname = f.name
2388        self.addCleanup(os_helper.unlink, fname)
2389        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2390            shutil.copyfile(fname, TESTFN2)
2391        self.assertEqual(m.call_args[0][2], 3)
2392
2393        # Empty files should not rely on readinto() variant.
2394        with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f:
2395            pass
2396        fname = f.name
2397        self.addCleanup(os_helper.unlink, fname)
2398        with unittest.mock.patch("shutil._copyfileobj_readinto") as m:
2399            shutil.copyfile(fname, TESTFN2)
2400        assert not m.called
2401        self.assert_files_eq(fname, TESTFN2)
2402
2403
2404class _ZeroCopyFileTest(object):
2405    """Tests common to all zero-copy APIs."""
2406    FILESIZE = (10 * 1024 * 1024)  # 10 MiB
2407    FILEDATA = b""
2408    PATCHPOINT = ""
2409
2410    @classmethod
2411    def setUpClass(cls):
2412        write_test_file(TESTFN, cls.FILESIZE)
2413        with open(TESTFN, 'rb') as f:
2414            cls.FILEDATA = f.read()
2415            assert len(cls.FILEDATA) == cls.FILESIZE
2416
2417    @classmethod
2418    def tearDownClass(cls):
2419        os_helper.unlink(TESTFN)
2420
2421    def tearDown(self):
2422        os_helper.unlink(TESTFN2)
2423
2424    @contextlib.contextmanager
2425    def get_files(self):
2426        with open(TESTFN, "rb") as src:
2427            with open(TESTFN2, "wb") as dst:
2428                yield (src, dst)
2429
2430    def zerocopy_fun(self, *args, **kwargs):
2431        raise NotImplementedError("must be implemented in subclass")
2432
2433    def reset(self):
2434        self.tearDown()
2435        self.tearDownClass()
2436        self.setUpClass()
2437        self.setUp()
2438
2439    # ---
2440
2441    def test_regular_copy(self):
2442        with self.get_files() as (src, dst):
2443            self.zerocopy_fun(src, dst)
2444        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2445        # Make sure the fallback function is not called.
2446        with self.get_files() as (src, dst):
2447            with unittest.mock.patch('shutil.copyfileobj') as m:
2448                shutil.copyfile(TESTFN, TESTFN2)
2449            assert not m.called
2450
2451    def test_same_file(self):
2452        self.addCleanup(self.reset)
2453        with self.get_files() as (src, dst):
2454            with self.assertRaises(Exception):
2455                self.zerocopy_fun(src, src)
2456        # Make sure src file is not corrupted.
2457        self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA)
2458
2459    def test_non_existent_src(self):
2460        name = tempfile.mktemp(dir=os.getcwd())
2461        with self.assertRaises(FileNotFoundError) as cm:
2462            shutil.copyfile(name, "new")
2463        self.assertEqual(cm.exception.filename, name)
2464
2465    def test_empty_file(self):
2466        srcname = TESTFN + 'src'
2467        dstname = TESTFN + 'dst'
2468        self.addCleanup(lambda: os_helper.unlink(srcname))
2469        self.addCleanup(lambda: os_helper.unlink(dstname))
2470        with open(srcname, "wb"):
2471            pass
2472
2473        with open(srcname, "rb") as src:
2474            with open(dstname, "wb") as dst:
2475                self.zerocopy_fun(src, dst)
2476
2477        self.assertEqual(read_file(dstname, binary=True), b"")
2478
2479    def test_unhandled_exception(self):
2480        with unittest.mock.patch(self.PATCHPOINT,
2481                                 side_effect=ZeroDivisionError):
2482            self.assertRaises(ZeroDivisionError,
2483                              shutil.copyfile, TESTFN, TESTFN2)
2484
2485    def test_exception_on_first_call(self):
2486        # Emulate a case where the first call to the zero-copy
2487        # function raises an exception in which case the function is
2488        # supposed to give up immediately.
2489        with unittest.mock.patch(self.PATCHPOINT,
2490                                 side_effect=OSError(errno.EINVAL, "yo")):
2491            with self.get_files() as (src, dst):
2492                with self.assertRaises(_GiveupOnFastCopy):
2493                    self.zerocopy_fun(src, dst)
2494
2495    def test_filesystem_full(self):
2496        # Emulate a case where filesystem is full and sendfile() fails
2497        # on first call.
2498        with unittest.mock.patch(self.PATCHPOINT,
2499                                 side_effect=OSError(errno.ENOSPC, "yo")):
2500            with self.get_files() as (src, dst):
2501                self.assertRaises(OSError, self.zerocopy_fun, src, dst)
2502
2503
2504@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported')
2505class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase):
2506    PATCHPOINT = "os.sendfile"
2507
2508    def zerocopy_fun(self, fsrc, fdst):
2509        return shutil._fastcopy_sendfile(fsrc, fdst)
2510
2511    def test_non_regular_file_src(self):
2512        with io.BytesIO(self.FILEDATA) as src:
2513            with open(TESTFN2, "wb") as dst:
2514                with self.assertRaises(_GiveupOnFastCopy):
2515                    self.zerocopy_fun(src, dst)
2516                shutil.copyfileobj(src, dst)
2517
2518        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2519
2520    def test_non_regular_file_dst(self):
2521        with open(TESTFN, "rb") as src:
2522            with io.BytesIO() as dst:
2523                with self.assertRaises(_GiveupOnFastCopy):
2524                    self.zerocopy_fun(src, dst)
2525                shutil.copyfileobj(src, dst)
2526                dst.seek(0)
2527                self.assertEqual(dst.read(), self.FILEDATA)
2528
2529    def test_exception_on_second_call(self):
2530        def sendfile(*args, **kwargs):
2531            if not flag:
2532                flag.append(None)
2533                return orig_sendfile(*args, **kwargs)
2534            else:
2535                raise OSError(errno.EBADF, "yo")
2536
2537        flag = []
2538        orig_sendfile = os.sendfile
2539        with unittest.mock.patch('os.sendfile', create=True,
2540                                 side_effect=sendfile):
2541            with self.get_files() as (src, dst):
2542                with self.assertRaises(OSError) as cm:
2543                    shutil._fastcopy_sendfile(src, dst)
2544        assert flag
2545        self.assertEqual(cm.exception.errno, errno.EBADF)
2546
2547    def test_cant_get_size(self):
2548        # Emulate a case where src file size cannot be determined.
2549        # Internally bufsize will be set to a small value and
2550        # sendfile() will be called repeatedly.
2551        with unittest.mock.patch('os.fstat', side_effect=OSError) as m:
2552            with self.get_files() as (src, dst):
2553                shutil._fastcopy_sendfile(src, dst)
2554                assert m.called
2555        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2556
2557    def test_small_chunks(self):
2558        # Force internal file size detection to be smaller than the
2559        # actual file size. We want to force sendfile() to be called
2560        # multiple times, also in order to emulate a src fd which gets
2561        # bigger while it is being copied.
2562        mock = unittest.mock.Mock()
2563        mock.st_size = 65536 + 1
2564        with unittest.mock.patch('os.fstat', return_value=mock) as m:
2565            with self.get_files() as (src, dst):
2566                shutil._fastcopy_sendfile(src, dst)
2567                assert m.called
2568        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2569
2570    def test_big_chunk(self):
2571        # Force internal file size detection to be +100MB bigger than
2572        # the actual file size. Make sure sendfile() does not rely on
2573        # file size value except for (maybe) a better throughput /
2574        # performance.
2575        mock = unittest.mock.Mock()
2576        mock.st_size = self.FILESIZE + (100 * 1024 * 1024)
2577        with unittest.mock.patch('os.fstat', return_value=mock) as m:
2578            with self.get_files() as (src, dst):
2579                shutil._fastcopy_sendfile(src, dst)
2580                assert m.called
2581        self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA)
2582
2583    def test_blocksize_arg(self):
2584        with unittest.mock.patch('os.sendfile',
2585                                 side_effect=ZeroDivisionError) as m:
2586            self.assertRaises(ZeroDivisionError,
2587                              shutil.copyfile, TESTFN, TESTFN2)
2588            blocksize = m.call_args[0][3]
2589            # Make sure file size and the block size arg passed to
2590            # sendfile() are the same.
2591            self.assertEqual(blocksize, os.path.getsize(TESTFN))
2592            # ...unless we're dealing with a small file.
2593            os_helper.unlink(TESTFN2)
2594            write_file(TESTFN2, b"hello", binary=True)
2595            self.addCleanup(os_helper.unlink, TESTFN2 + '3')
2596            self.assertRaises(ZeroDivisionError,
2597                              shutil.copyfile, TESTFN2, TESTFN2 + '3')
2598            blocksize = m.call_args[0][3]
2599            self.assertEqual(blocksize, 2 ** 23)
2600
2601    def test_file2file_not_supported(self):
2602        # Emulate a case where sendfile() only support file->socket
2603        # fds. In such a case copyfile() is supposed to skip the
2604        # fast-copy attempt from then on.
2605        assert shutil._USE_CP_SENDFILE
2606        try:
2607            with unittest.mock.patch(
2608                    self.PATCHPOINT,
2609                    side_effect=OSError(errno.ENOTSOCK, "yo")) as m:
2610                with self.get_files() as (src, dst):
2611                    with self.assertRaises(_GiveupOnFastCopy):
2612                        shutil._fastcopy_sendfile(src, dst)
2613                assert m.called
2614            assert not shutil._USE_CP_SENDFILE
2615
2616            with unittest.mock.patch(self.PATCHPOINT) as m:
2617                shutil.copyfile(TESTFN, TESTFN2)
2618                assert not m.called
2619        finally:
2620            shutil._USE_CP_SENDFILE = True
2621
2622
2623@unittest.skipIf(not MACOS, 'macOS only')
2624class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase):
2625    PATCHPOINT = "posix._fcopyfile"
2626
2627    def zerocopy_fun(self, src, dst):
2628        return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA)
2629
2630
2631class TestGetTerminalSize(unittest.TestCase):
2632    def test_does_not_crash(self):
2633        """Check if get_terminal_size() returns a meaningful value.
2634
2635        There's no easy portable way to actually check the size of the
2636        terminal, so let's check if it returns something sensible instead.
2637        """
2638        size = shutil.get_terminal_size()
2639        self.assertGreaterEqual(size.columns, 0)
2640        self.assertGreaterEqual(size.lines, 0)
2641
2642    def test_os_environ_first(self):
2643        "Check if environment variables have precedence"
2644
2645        with os_helper.EnvironmentVarGuard() as env:
2646            env['COLUMNS'] = '777'
2647            del env['LINES']
2648            size = shutil.get_terminal_size()
2649        self.assertEqual(size.columns, 777)
2650
2651        with os_helper.EnvironmentVarGuard() as env:
2652            del env['COLUMNS']
2653            env['LINES'] = '888'
2654            size = shutil.get_terminal_size()
2655        self.assertEqual(size.lines, 888)
2656
2657    def test_bad_environ(self):
2658        with os_helper.EnvironmentVarGuard() as env:
2659            env['COLUMNS'] = 'xxx'
2660            env['LINES'] = 'yyy'
2661            size = shutil.get_terminal_size()
2662        self.assertGreaterEqual(size.columns, 0)
2663        self.assertGreaterEqual(size.lines, 0)
2664
2665    @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty")
2666    @unittest.skipUnless(hasattr(os, 'get_terminal_size'),
2667                         'need os.get_terminal_size()')
2668    def test_stty_match(self):
2669        """Check if stty returns the same results ignoring env
2670
2671        This test will fail if stdin and stdout are connected to
2672        different terminals with different sizes. Nevertheless, such
2673        situations should be pretty rare.
2674        """
2675        try:
2676            size = subprocess.check_output(['stty', 'size']).decode().split()
2677        except (FileNotFoundError, PermissionError,
2678                subprocess.CalledProcessError):
2679            self.skipTest("stty invocation failed")
2680        expected = (int(size[1]), int(size[0])) # reversed order
2681
2682        with os_helper.EnvironmentVarGuard() as env:
2683            del env['LINES']
2684            del env['COLUMNS']
2685            actual = shutil.get_terminal_size()
2686
2687        self.assertEqual(expected, actual)
2688
2689    @unittest.skipIf(support.is_wasi, "WASI has no /dev/null")
2690    def test_fallback(self):
2691        with os_helper.EnvironmentVarGuard() as env:
2692            del env['LINES']
2693            del env['COLUMNS']
2694
2695            # sys.__stdout__ has no fileno()
2696            with support.swap_attr(sys, '__stdout__', None):
2697                size = shutil.get_terminal_size(fallback=(10, 20))
2698            self.assertEqual(size.columns, 10)
2699            self.assertEqual(size.lines, 20)
2700
2701            # sys.__stdout__ is not a terminal on Unix
2702            # or fileno() not in (0, 1, 2) on Windows
2703            with open(os.devnull, 'w', encoding='utf-8') as f, \
2704                 support.swap_attr(sys, '__stdout__', f):
2705                size = shutil.get_terminal_size(fallback=(30, 40))
2706            self.assertEqual(size.columns, 30)
2707            self.assertEqual(size.lines, 40)
2708
2709
2710class PublicAPITests(unittest.TestCase):
2711    """Ensures that the correct values are exposed in the public API."""
2712
2713    def test_module_all_attribute(self):
2714        self.assertTrue(hasattr(shutil, '__all__'))
2715        target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat',
2716                      'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error',
2717                      'SpecialFileError', 'ExecError', 'make_archive',
2718                      'get_archive_formats', 'register_archive_format',
2719                      'unregister_archive_format', 'get_unpack_formats',
2720                      'register_unpack_format', 'unregister_unpack_format',
2721                      'unpack_archive', 'ignore_patterns', 'chown', 'which',
2722                      'get_terminal_size', 'SameFileError']
2723        if hasattr(os, 'statvfs') or os.name == 'nt':
2724            target_api.append('disk_usage')
2725        self.assertEqual(set(shutil.__all__), set(target_api))
2726
2727
2728if __name__ == '__main__':
2729    unittest.main()
2730