1# Copyright (C) 2003 Python Software Foundation 2 3import unittest 4import unittest.mock 5import shutil 6import tempfile 7import sys 8import stat 9import os 10import os.path 11import errno 12import functools 13import pathlib 14import subprocess 15import random 16import string 17import contextlib 18import io 19from shutil import (make_archive, 20 register_archive_format, unregister_archive_format, 21 get_archive_formats, Error, unpack_archive, 22 register_unpack_format, RegistryError, 23 unregister_unpack_format, get_unpack_formats, 24 SameFileError, _GiveupOnFastCopy) 25import tarfile 26import zipfile 27try: 28 import posix 29except ImportError: 30 posix = None 31 32from test import support 33from test.support import os_helper 34from test.support.os_helper import TESTFN, FakePath 35from test.support import warnings_helper 36 37TESTFN2 = TESTFN + "2" 38TESTFN_SRC = TESTFN + "_SRC" 39TESTFN_DST = TESTFN + "_DST" 40MACOS = sys.platform.startswith("darwin") 41SOLARIS = sys.platform.startswith("sunos") 42AIX = sys.platform[:3] == 'aix' 43try: 44 import grp 45 import pwd 46 UID_GID_SUPPORT = True 47except ImportError: 48 UID_GID_SUPPORT = False 49 50try: 51 import _winapi 52except ImportError: 53 _winapi = None 54 55no_chdir = unittest.mock.patch('os.chdir', 56 side_effect=AssertionError("shouldn't call os.chdir()")) 57 58def _fake_rename(*args, **kwargs): 59 # Pretend the destination path is on a different filesystem. 60 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link") 61 62def mock_rename(func): 63 @functools.wraps(func) 64 def wrap(*args, **kwargs): 65 try: 66 builtin_rename = os.rename 67 os.rename = _fake_rename 68 return func(*args, **kwargs) 69 finally: 70 os.rename = builtin_rename 71 return wrap 72 73def write_file(path, content, binary=False): 74 """Write *content* to a file located at *path*. 75 76 If *path* is a tuple instead of a string, os.path.join will be used to 77 make a path. If *binary* is true, the file will be opened in binary 78 mode. 79 """ 80 if isinstance(path, tuple): 81 path = os.path.join(*path) 82 mode = 'wb' if binary else 'w' 83 encoding = None if binary else "utf-8" 84 with open(path, mode, encoding=encoding) as fp: 85 fp.write(content) 86 87def write_test_file(path, size): 88 """Create a test file with an arbitrary size and random text content.""" 89 def chunks(total, step): 90 assert total >= step 91 while total > step: 92 yield step 93 total -= step 94 if total: 95 yield total 96 97 bufsize = min(size, 8192) 98 chunk = b"".join([random.choice(string.ascii_letters).encode() 99 for i in range(bufsize)]) 100 with open(path, 'wb') as f: 101 for csize in chunks(size, bufsize): 102 f.write(chunk) 103 assert os.path.getsize(path) == size 104 105def read_file(path, binary=False): 106 """Return contents from a file located at *path*. 107 108 If *path* is a tuple instead of a string, os.path.join will be used to 109 make a path. If *binary* is true, the file will be opened in binary 110 mode. 111 """ 112 if isinstance(path, tuple): 113 path = os.path.join(*path) 114 mode = 'rb' if binary else 'r' 115 encoding = None if binary else "utf-8" 116 with open(path, mode, encoding=encoding) as fp: 117 return fp.read() 118 119def rlistdir(path): 120 res = [] 121 for name in sorted(os.listdir(path)): 122 p = os.path.join(path, name) 123 if os.path.isdir(p) and not os.path.islink(p): 124 res.append(name + '/') 125 for n in rlistdir(p): 126 res.append(name + '/' + n) 127 else: 128 res.append(name) 129 return res 130 131def supports_file2file_sendfile(): 132 # ...apparently Linux and Solaris are the only ones 133 if not hasattr(os, "sendfile"): 134 return False 135 srcname = None 136 dstname = None 137 try: 138 with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as f: 139 srcname = f.name 140 f.write(b"0123456789") 141 142 with open(srcname, "rb") as src: 143 with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as dst: 144 dstname = dst.name 145 infd = src.fileno() 146 outfd = dst.fileno() 147 try: 148 os.sendfile(outfd, infd, 0, 2) 149 except OSError: 150 return False 151 else: 152 return True 153 finally: 154 if srcname is not None: 155 os_helper.unlink(srcname) 156 if dstname is not None: 157 os_helper.unlink(dstname) 158 159 160SUPPORTS_SENDFILE = supports_file2file_sendfile() 161 162# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test 163# The AIX command 'dump -o program' gives XCOFF header information 164# The second word of the last line in the maxdata value 165# when 32-bit maxdata must be greater than 0x1000000 for the xz test to succeed 166def _maxdataOK(): 167 if AIX and sys.maxsize == 2147483647: 168 hdrs=subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable) 169 maxdata=hdrs.split("\n")[-1].split()[1] 170 return int(maxdata,16) >= 0x20000000 171 else: 172 return True 173 174 175class BaseTest: 176 177 def mkdtemp(self, prefix=None): 178 """Create a temporary directory that will be cleaned up. 179 180 Returns the path of the directory. 181 """ 182 d = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd()) 183 self.addCleanup(os_helper.rmtree, d) 184 return d 185 186 187class TestRmTree(BaseTest, unittest.TestCase): 188 189 def test_rmtree_works_on_bytes(self): 190 tmp = self.mkdtemp() 191 victim = os.path.join(tmp, 'killme') 192 os.mkdir(victim) 193 write_file(os.path.join(victim, 'somefile'), 'foo') 194 victim = os.fsencode(victim) 195 self.assertIsInstance(victim, bytes) 196 shutil.rmtree(victim) 197 198 @os_helper.skip_unless_symlink 199 def test_rmtree_fails_on_symlink(self): 200 tmp = self.mkdtemp() 201 dir_ = os.path.join(tmp, 'dir') 202 os.mkdir(dir_) 203 link = os.path.join(tmp, 'link') 204 os.symlink(dir_, link) 205 self.assertRaises(OSError, shutil.rmtree, link) 206 self.assertTrue(os.path.exists(dir_)) 207 self.assertTrue(os.path.lexists(link)) 208 errors = [] 209 def onerror(*args): 210 errors.append(args) 211 shutil.rmtree(link, onerror=onerror) 212 self.assertEqual(len(errors), 1) 213 self.assertIs(errors[0][0], os.path.islink) 214 self.assertEqual(errors[0][1], link) 215 self.assertIsInstance(errors[0][2][1], OSError) 216 217 @os_helper.skip_unless_symlink 218 def test_rmtree_works_on_symlinks(self): 219 tmp = self.mkdtemp() 220 dir1 = os.path.join(tmp, 'dir1') 221 dir2 = os.path.join(dir1, 'dir2') 222 dir3 = os.path.join(tmp, 'dir3') 223 for d in dir1, dir2, dir3: 224 os.mkdir(d) 225 file1 = os.path.join(tmp, 'file1') 226 write_file(file1, 'foo') 227 link1 = os.path.join(dir1, 'link1') 228 os.symlink(dir2, link1) 229 link2 = os.path.join(dir1, 'link2') 230 os.symlink(dir3, link2) 231 link3 = os.path.join(dir1, 'link3') 232 os.symlink(file1, link3) 233 # make sure symlinks are removed but not followed 234 shutil.rmtree(dir1) 235 self.assertFalse(os.path.exists(dir1)) 236 self.assertTrue(os.path.exists(dir3)) 237 self.assertTrue(os.path.exists(file1)) 238 239 @unittest.skipUnless(_winapi, 'only relevant on Windows') 240 def test_rmtree_fails_on_junctions(self): 241 tmp = self.mkdtemp() 242 dir_ = os.path.join(tmp, 'dir') 243 os.mkdir(dir_) 244 link = os.path.join(tmp, 'link') 245 _winapi.CreateJunction(dir_, link) 246 self.addCleanup(os_helper.unlink, link) 247 self.assertRaises(OSError, shutil.rmtree, link) 248 self.assertTrue(os.path.exists(dir_)) 249 self.assertTrue(os.path.lexists(link)) 250 errors = [] 251 def onerror(*args): 252 errors.append(args) 253 shutil.rmtree(link, onerror=onerror) 254 self.assertEqual(len(errors), 1) 255 self.assertIs(errors[0][0], os.path.islink) 256 self.assertEqual(errors[0][1], link) 257 self.assertIsInstance(errors[0][2][1], OSError) 258 259 @unittest.skipUnless(_winapi, 'only relevant on Windows') 260 def test_rmtree_works_on_junctions(self): 261 tmp = self.mkdtemp() 262 dir1 = os.path.join(tmp, 'dir1') 263 dir2 = os.path.join(dir1, 'dir2') 264 dir3 = os.path.join(tmp, 'dir3') 265 for d in dir1, dir2, dir3: 266 os.mkdir(d) 267 file1 = os.path.join(tmp, 'file1') 268 write_file(file1, 'foo') 269 link1 = os.path.join(dir1, 'link1') 270 _winapi.CreateJunction(dir2, link1) 271 link2 = os.path.join(dir1, 'link2') 272 _winapi.CreateJunction(dir3, link2) 273 link3 = os.path.join(dir1, 'link3') 274 _winapi.CreateJunction(file1, link3) 275 # make sure junctions are removed but not followed 276 shutil.rmtree(dir1) 277 self.assertFalse(os.path.exists(dir1)) 278 self.assertTrue(os.path.exists(dir3)) 279 self.assertTrue(os.path.exists(file1)) 280 281 def test_rmtree_errors(self): 282 # filename is guaranteed not to exist 283 filename = tempfile.mktemp(dir=self.mkdtemp()) 284 self.assertRaises(FileNotFoundError, shutil.rmtree, filename) 285 # test that ignore_errors option is honored 286 shutil.rmtree(filename, ignore_errors=True) 287 288 # existing file 289 tmpdir = self.mkdtemp() 290 write_file((tmpdir, "tstfile"), "") 291 filename = os.path.join(tmpdir, "tstfile") 292 with self.assertRaises(NotADirectoryError) as cm: 293 shutil.rmtree(filename) 294 self.assertEqual(cm.exception.filename, filename) 295 self.assertTrue(os.path.exists(filename)) 296 # test that ignore_errors option is honored 297 shutil.rmtree(filename, ignore_errors=True) 298 self.assertTrue(os.path.exists(filename)) 299 errors = [] 300 def onerror(*args): 301 errors.append(args) 302 shutil.rmtree(filename, onerror=onerror) 303 self.assertEqual(len(errors), 2) 304 self.assertIs(errors[0][0], os.scandir) 305 self.assertEqual(errors[0][1], filename) 306 self.assertIsInstance(errors[0][2][1], NotADirectoryError) 307 self.assertEqual(errors[0][2][1].filename, filename) 308 self.assertIs(errors[1][0], os.rmdir) 309 self.assertEqual(errors[1][1], filename) 310 self.assertIsInstance(errors[1][2][1], NotADirectoryError) 311 self.assertEqual(errors[1][2][1].filename, filename) 312 313 314 @unittest.skipIf(sys.platform[:6] == 'cygwin', 315 "This test can't be run on Cygwin (issue #1071513).") 316 @os_helper.skip_if_dac_override 317 @os_helper.skip_unless_working_chmod 318 def test_on_error(self): 319 self.errorState = 0 320 os.mkdir(TESTFN) 321 self.addCleanup(shutil.rmtree, TESTFN) 322 323 self.child_file_path = os.path.join(TESTFN, 'a') 324 self.child_dir_path = os.path.join(TESTFN, 'b') 325 os_helper.create_empty_file(self.child_file_path) 326 os.mkdir(self.child_dir_path) 327 old_dir_mode = os.stat(TESTFN).st_mode 328 old_child_file_mode = os.stat(self.child_file_path).st_mode 329 old_child_dir_mode = os.stat(self.child_dir_path).st_mode 330 # Make unwritable. 331 new_mode = stat.S_IREAD|stat.S_IEXEC 332 os.chmod(self.child_file_path, new_mode) 333 os.chmod(self.child_dir_path, new_mode) 334 os.chmod(TESTFN, new_mode) 335 336 self.addCleanup(os.chmod, TESTFN, old_dir_mode) 337 self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode) 338 self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode) 339 340 shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror) 341 # Test whether onerror has actually been called. 342 self.assertEqual(self.errorState, 3, 343 "Expected call to onerror function did not happen.") 344 345 def check_args_to_onerror(self, func, arg, exc): 346 # test_rmtree_errors deliberately runs rmtree 347 # on a directory that is chmod 500, which will fail. 348 # This function is run when shutil.rmtree fails. 349 # 99.9% of the time it initially fails to remove 350 # a file in the directory, so the first time through 351 # func is os.remove. 352 # However, some Linux machines running ZFS on 353 # FUSE experienced a failure earlier in the process 354 # at os.listdir. The first failure may legally 355 # be either. 356 if self.errorState < 2: 357 if func is os.unlink: 358 self.assertEqual(arg, self.child_file_path) 359 elif func is os.rmdir: 360 self.assertEqual(arg, self.child_dir_path) 361 else: 362 self.assertIs(func, os.listdir) 363 self.assertIn(arg, [TESTFN, self.child_dir_path]) 364 self.assertTrue(issubclass(exc[0], OSError)) 365 self.errorState += 1 366 else: 367 self.assertEqual(func, os.rmdir) 368 self.assertEqual(arg, TESTFN) 369 self.assertTrue(issubclass(exc[0], OSError)) 370 self.errorState = 3 371 372 def test_rmtree_does_not_choke_on_failing_lstat(self): 373 try: 374 orig_lstat = os.lstat 375 def raiser(fn, *args, **kwargs): 376 if fn != TESTFN: 377 raise OSError() 378 else: 379 return orig_lstat(fn) 380 os.lstat = raiser 381 382 os.mkdir(TESTFN) 383 write_file((TESTFN, 'foo'), 'foo') 384 shutil.rmtree(TESTFN) 385 finally: 386 os.lstat = orig_lstat 387 388 def test_rmtree_uses_safe_fd_version_if_available(self): 389 _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <= 390 os.supports_dir_fd and 391 os.listdir in os.supports_fd and 392 os.stat in os.supports_follow_symlinks) 393 if _use_fd_functions: 394 self.assertTrue(shutil._use_fd_functions) 395 self.assertTrue(shutil.rmtree.avoids_symlink_attacks) 396 tmp_dir = self.mkdtemp() 397 d = os.path.join(tmp_dir, 'a') 398 os.mkdir(d) 399 try: 400 real_rmtree = shutil._rmtree_safe_fd 401 class Called(Exception): pass 402 def _raiser(*args, **kwargs): 403 raise Called 404 shutil._rmtree_safe_fd = _raiser 405 self.assertRaises(Called, shutil.rmtree, d) 406 finally: 407 shutil._rmtree_safe_fd = real_rmtree 408 else: 409 self.assertFalse(shutil._use_fd_functions) 410 self.assertFalse(shutil.rmtree.avoids_symlink_attacks) 411 412 @unittest.skipUnless(shutil._use_fd_functions, "dir_fd is not supported") 413 def test_rmtree_with_dir_fd(self): 414 tmp_dir = self.mkdtemp() 415 victim = 'killme' 416 fullname = os.path.join(tmp_dir, victim) 417 dir_fd = os.open(tmp_dir, os.O_RDONLY) 418 self.addCleanup(os.close, dir_fd) 419 os.mkdir(fullname) 420 os.mkdir(os.path.join(fullname, 'subdir')) 421 write_file(os.path.join(fullname, 'subdir', 'somefile'), 'foo') 422 self.assertTrue(os.path.exists(fullname)) 423 shutil.rmtree(victim, dir_fd=dir_fd) 424 self.assertFalse(os.path.exists(fullname)) 425 426 @unittest.skipIf(shutil._use_fd_functions, "dir_fd is supported") 427 def test_rmtree_with_dir_fd_unsupported(self): 428 tmp_dir = self.mkdtemp() 429 with self.assertRaises(NotImplementedError): 430 shutil.rmtree(tmp_dir, dir_fd=0) 431 self.assertTrue(os.path.exists(tmp_dir)) 432 433 def test_rmtree_dont_delete_file(self): 434 # When called on a file instead of a directory, don't delete it. 435 handle, path = tempfile.mkstemp(dir=self.mkdtemp()) 436 os.close(handle) 437 self.assertRaises(NotADirectoryError, shutil.rmtree, path) 438 os.remove(path) 439 440 @os_helper.skip_unless_symlink 441 def test_rmtree_on_symlink(self): 442 # bug 1669. 443 os.mkdir(TESTFN) 444 try: 445 src = os.path.join(TESTFN, 'cheese') 446 dst = os.path.join(TESTFN, 'shop') 447 os.mkdir(src) 448 os.symlink(src, dst) 449 self.assertRaises(OSError, shutil.rmtree, dst) 450 shutil.rmtree(dst, ignore_errors=True) 451 finally: 452 shutil.rmtree(TESTFN, ignore_errors=True) 453 454 @unittest.skipUnless(_winapi, 'only relevant on Windows') 455 def test_rmtree_on_junction(self): 456 os.mkdir(TESTFN) 457 try: 458 src = os.path.join(TESTFN, 'cheese') 459 dst = os.path.join(TESTFN, 'shop') 460 os.mkdir(src) 461 open(os.path.join(src, 'spam'), 'wb').close() 462 _winapi.CreateJunction(src, dst) 463 self.assertRaises(OSError, shutil.rmtree, dst) 464 shutil.rmtree(dst, ignore_errors=True) 465 finally: 466 shutil.rmtree(TESTFN, ignore_errors=True) 467 468 469class TestCopyTree(BaseTest, unittest.TestCase): 470 471 def test_copytree_simple(self): 472 src_dir = self.mkdtemp() 473 dst_dir = os.path.join(self.mkdtemp(), 'destination') 474 self.addCleanup(shutil.rmtree, src_dir) 475 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir)) 476 write_file((src_dir, 'test.txt'), '123') 477 os.mkdir(os.path.join(src_dir, 'test_dir')) 478 write_file((src_dir, 'test_dir', 'test.txt'), '456') 479 480 shutil.copytree(src_dir, dst_dir) 481 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt'))) 482 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir'))) 483 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir', 484 'test.txt'))) 485 actual = read_file((dst_dir, 'test.txt')) 486 self.assertEqual(actual, '123') 487 actual = read_file((dst_dir, 'test_dir', 'test.txt')) 488 self.assertEqual(actual, '456') 489 490 def test_copytree_dirs_exist_ok(self): 491 src_dir = self.mkdtemp() 492 dst_dir = self.mkdtemp() 493 self.addCleanup(shutil.rmtree, src_dir) 494 self.addCleanup(shutil.rmtree, dst_dir) 495 496 write_file((src_dir, 'nonexisting.txt'), '123') 497 os.mkdir(os.path.join(src_dir, 'existing_dir')) 498 os.mkdir(os.path.join(dst_dir, 'existing_dir')) 499 write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced') 500 write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced') 501 502 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True) 503 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt'))) 504 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir'))) 505 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir', 506 'existing.txt'))) 507 actual = read_file((dst_dir, 'nonexisting.txt')) 508 self.assertEqual(actual, '123') 509 actual = read_file((dst_dir, 'existing_dir', 'existing.txt')) 510 self.assertEqual(actual, 'has been replaced') 511 512 with self.assertRaises(FileExistsError): 513 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False) 514 515 @os_helper.skip_unless_symlink 516 def test_copytree_symlinks(self): 517 tmp_dir = self.mkdtemp() 518 src_dir = os.path.join(tmp_dir, 'src') 519 dst_dir = os.path.join(tmp_dir, 'dst') 520 sub_dir = os.path.join(src_dir, 'sub') 521 os.mkdir(src_dir) 522 os.mkdir(sub_dir) 523 write_file((src_dir, 'file.txt'), 'foo') 524 src_link = os.path.join(sub_dir, 'link') 525 dst_link = os.path.join(dst_dir, 'sub/link') 526 os.symlink(os.path.join(src_dir, 'file.txt'), 527 src_link) 528 if hasattr(os, 'lchmod'): 529 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 530 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 531 os.lchflags(src_link, stat.UF_NODUMP) 532 src_stat = os.lstat(src_link) 533 shutil.copytree(src_dir, dst_dir, symlinks=True) 534 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link'))) 535 actual = os.readlink(os.path.join(dst_dir, 'sub', 'link')) 536 # Bad practice to blindly strip the prefix as it may be required to 537 # correctly refer to the file, but we're only comparing paths here. 538 if os.name == 'nt' and actual.startswith('\\\\?\\'): 539 actual = actual[4:] 540 self.assertEqual(actual, os.path.join(src_dir, 'file.txt')) 541 dst_stat = os.lstat(dst_link) 542 if hasattr(os, 'lchmod'): 543 self.assertEqual(dst_stat.st_mode, src_stat.st_mode) 544 if hasattr(os, 'lchflags'): 545 self.assertEqual(dst_stat.st_flags, src_stat.st_flags) 546 547 def test_copytree_with_exclude(self): 548 # creating data 549 join = os.path.join 550 exists = os.path.exists 551 src_dir = self.mkdtemp() 552 try: 553 dst_dir = join(self.mkdtemp(), 'destination') 554 write_file((src_dir, 'test.txt'), '123') 555 write_file((src_dir, 'test.tmp'), '123') 556 os.mkdir(join(src_dir, 'test_dir')) 557 write_file((src_dir, 'test_dir', 'test.txt'), '456') 558 os.mkdir(join(src_dir, 'test_dir2')) 559 write_file((src_dir, 'test_dir2', 'test.txt'), '456') 560 os.mkdir(join(src_dir, 'test_dir2', 'subdir')) 561 os.mkdir(join(src_dir, 'test_dir2', 'subdir2')) 562 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456') 563 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456') 564 565 # testing glob-like patterns 566 try: 567 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2') 568 shutil.copytree(src_dir, dst_dir, ignore=patterns) 569 # checking the result: some elements should not be copied 570 self.assertTrue(exists(join(dst_dir, 'test.txt'))) 571 self.assertFalse(exists(join(dst_dir, 'test.tmp'))) 572 self.assertFalse(exists(join(dst_dir, 'test_dir2'))) 573 finally: 574 shutil.rmtree(dst_dir) 575 try: 576 patterns = shutil.ignore_patterns('*.tmp', 'subdir*') 577 shutil.copytree(src_dir, dst_dir, ignore=patterns) 578 # checking the result: some elements should not be copied 579 self.assertFalse(exists(join(dst_dir, 'test.tmp'))) 580 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2'))) 581 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir'))) 582 finally: 583 shutil.rmtree(dst_dir) 584 585 # testing callable-style 586 try: 587 def _filter(src, names): 588 res = [] 589 for name in names: 590 path = os.path.join(src, name) 591 592 if (os.path.isdir(path) and 593 path.split()[-1] == 'subdir'): 594 res.append(name) 595 elif os.path.splitext(path)[-1] in ('.py'): 596 res.append(name) 597 return res 598 599 shutil.copytree(src_dir, dst_dir, ignore=_filter) 600 601 # checking the result: some elements should not be copied 602 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2', 603 'test.py'))) 604 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir'))) 605 606 finally: 607 shutil.rmtree(dst_dir) 608 finally: 609 shutil.rmtree(src_dir) 610 shutil.rmtree(os.path.dirname(dst_dir)) 611 612 def test_copytree_arg_types_of_ignore(self): 613 join = os.path.join 614 exists = os.path.exists 615 616 tmp_dir = self.mkdtemp() 617 src_dir = join(tmp_dir, "source") 618 619 os.mkdir(join(src_dir)) 620 os.mkdir(join(src_dir, 'test_dir')) 621 os.mkdir(os.path.join(src_dir, 'test_dir', 'subdir')) 622 write_file((src_dir, 'test_dir', 'subdir', 'test.txt'), '456') 623 624 invokations = [] 625 626 def _ignore(src, names): 627 invokations.append(src) 628 self.assertIsInstance(src, str) 629 self.assertIsInstance(names, list) 630 self.assertEqual(len(names), len(set(names))) 631 for name in names: 632 self.assertIsInstance(name, str) 633 return [] 634 635 dst_dir = join(self.mkdtemp(), 'destination') 636 shutil.copytree(src_dir, dst_dir, ignore=_ignore) 637 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir', 638 'test.txt'))) 639 640 dst_dir = join(self.mkdtemp(), 'destination') 641 shutil.copytree(pathlib.Path(src_dir), dst_dir, ignore=_ignore) 642 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir', 643 'test.txt'))) 644 645 dst_dir = join(self.mkdtemp(), 'destination') 646 src_dir_entry = list(os.scandir(tmp_dir))[0] 647 self.assertIsInstance(src_dir_entry, os.DirEntry) 648 shutil.copytree(src_dir_entry, dst_dir, ignore=_ignore) 649 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir', 650 'test.txt'))) 651 652 self.assertEqual(len(invokations), 9) 653 654 def test_copytree_retains_permissions(self): 655 tmp_dir = self.mkdtemp() 656 src_dir = os.path.join(tmp_dir, 'source') 657 os.mkdir(src_dir) 658 dst_dir = os.path.join(tmp_dir, 'destination') 659 self.addCleanup(shutil.rmtree, tmp_dir) 660 661 os.chmod(src_dir, 0o777) 662 write_file((src_dir, 'permissive.txt'), '123') 663 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777) 664 write_file((src_dir, 'restrictive.txt'), '456') 665 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600) 666 restrictive_subdir = tempfile.mkdtemp(dir=src_dir) 667 self.addCleanup(os_helper.rmtree, restrictive_subdir) 668 os.chmod(restrictive_subdir, 0o600) 669 670 shutil.copytree(src_dir, dst_dir) 671 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode) 672 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode, 673 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode) 674 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode, 675 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode) 676 restrictive_subdir_dst = os.path.join(dst_dir, 677 os.path.split(restrictive_subdir)[1]) 678 self.assertEqual(os.stat(restrictive_subdir).st_mode, 679 os.stat(restrictive_subdir_dst).st_mode) 680 681 @unittest.mock.patch('os.chmod') 682 def test_copytree_winerror(self, mock_patch): 683 # When copying to VFAT, copystat() raises OSError. On Windows, the 684 # exception object has a meaningful 'winerror' attribute, but not 685 # on other operating systems. Do not assume 'winerror' is set. 686 src_dir = self.mkdtemp() 687 dst_dir = os.path.join(self.mkdtemp(), 'destination') 688 self.addCleanup(shutil.rmtree, src_dir) 689 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir)) 690 691 mock_patch.side_effect = PermissionError('ka-boom') 692 with self.assertRaises(shutil.Error): 693 shutil.copytree(src_dir, dst_dir) 694 695 def test_copytree_custom_copy_function(self): 696 # See: https://bugs.python.org/issue35648 697 def custom_cpfun(a, b): 698 flag.append(None) 699 self.assertIsInstance(a, str) 700 self.assertIsInstance(b, str) 701 self.assertEqual(a, os.path.join(src, 'foo')) 702 self.assertEqual(b, os.path.join(dst, 'foo')) 703 704 flag = [] 705 src = self.mkdtemp() 706 dst = tempfile.mktemp(dir=self.mkdtemp()) 707 with open(os.path.join(src, 'foo'), 'w', encoding='utf-8') as f: 708 f.close() 709 shutil.copytree(src, dst, copy_function=custom_cpfun) 710 self.assertEqual(len(flag), 1) 711 712 # Issue #3002: copyfile and copytree block indefinitely on named pipes 713 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') 714 @os_helper.skip_unless_symlink 715 @unittest.skipIf(sys.platform == "vxworks", 716 "fifo requires special path on VxWorks") 717 def test_copytree_named_pipe(self): 718 os.mkdir(TESTFN) 719 try: 720 subdir = os.path.join(TESTFN, "subdir") 721 os.mkdir(subdir) 722 pipe = os.path.join(subdir, "mypipe") 723 try: 724 os.mkfifo(pipe) 725 except PermissionError as e: 726 self.skipTest('os.mkfifo(): %s' % e) 727 try: 728 shutil.copytree(TESTFN, TESTFN2) 729 except shutil.Error as e: 730 errors = e.args[0] 731 self.assertEqual(len(errors), 1) 732 src, dst, error_msg = errors[0] 733 self.assertEqual("`%s` is a named pipe" % pipe, error_msg) 734 else: 735 self.fail("shutil.Error should have been raised") 736 finally: 737 shutil.rmtree(TESTFN, ignore_errors=True) 738 shutil.rmtree(TESTFN2, ignore_errors=True) 739 740 def test_copytree_special_func(self): 741 src_dir = self.mkdtemp() 742 dst_dir = os.path.join(self.mkdtemp(), 'destination') 743 write_file((src_dir, 'test.txt'), '123') 744 os.mkdir(os.path.join(src_dir, 'test_dir')) 745 write_file((src_dir, 'test_dir', 'test.txt'), '456') 746 747 copied = [] 748 def _copy(src, dst): 749 copied.append((src, dst)) 750 751 shutil.copytree(src_dir, dst_dir, copy_function=_copy) 752 self.assertEqual(len(copied), 2) 753 754 @os_helper.skip_unless_symlink 755 def test_copytree_dangling_symlinks(self): 756 src_dir = self.mkdtemp() 757 valid_file = os.path.join(src_dir, 'test.txt') 758 write_file(valid_file, 'abc') 759 dir_a = os.path.join(src_dir, 'dir_a') 760 os.mkdir(dir_a) 761 for d in src_dir, dir_a: 762 os.symlink('IDONTEXIST', os.path.join(d, 'broken')) 763 os.symlink(valid_file, os.path.join(d, 'valid')) 764 765 # A dangling symlink should raise an error. 766 dst_dir = os.path.join(self.mkdtemp(), 'destination') 767 self.assertRaises(Error, shutil.copytree, src_dir, dst_dir) 768 769 # Dangling symlinks should be ignored with the proper flag. 770 dst_dir = os.path.join(self.mkdtemp(), 'destination2') 771 shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True) 772 for root, dirs, files in os.walk(dst_dir): 773 self.assertNotIn('broken', files) 774 self.assertIn('valid', files) 775 776 # a dangling symlink is copied if symlinks=True 777 dst_dir = os.path.join(self.mkdtemp(), 'destination3') 778 shutil.copytree(src_dir, dst_dir, symlinks=True) 779 self.assertIn('test.txt', os.listdir(dst_dir)) 780 781 @os_helper.skip_unless_symlink 782 def test_copytree_symlink_dir(self): 783 src_dir = self.mkdtemp() 784 dst_dir = os.path.join(self.mkdtemp(), 'destination') 785 os.mkdir(os.path.join(src_dir, 'real_dir')) 786 with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'wb'): 787 pass 788 os.symlink(os.path.join(src_dir, 'real_dir'), 789 os.path.join(src_dir, 'link_to_dir'), 790 target_is_directory=True) 791 792 shutil.copytree(src_dir, dst_dir, symlinks=False) 793 self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir'))) 794 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir'))) 795 796 dst_dir = os.path.join(self.mkdtemp(), 'destination2') 797 shutil.copytree(src_dir, dst_dir, symlinks=True) 798 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir'))) 799 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir'))) 800 801 def test_copytree_return_value(self): 802 # copytree returns its destination path. 803 src_dir = self.mkdtemp() 804 dst_dir = src_dir + "dest" 805 self.addCleanup(shutil.rmtree, dst_dir, True) 806 src = os.path.join(src_dir, 'foo') 807 write_file(src, 'foo') 808 rv = shutil.copytree(src_dir, dst_dir) 809 self.assertEqual(['foo'], os.listdir(rv)) 810 811 def test_copytree_subdirectory(self): 812 # copytree where dst is a subdirectory of src, see Issue 38688 813 base_dir = self.mkdtemp() 814 self.addCleanup(shutil.rmtree, base_dir, ignore_errors=True) 815 src_dir = os.path.join(base_dir, "t", "pg") 816 dst_dir = os.path.join(src_dir, "somevendor", "1.0") 817 os.makedirs(src_dir) 818 src = os.path.join(src_dir, 'pol') 819 write_file(src, 'pol') 820 rv = shutil.copytree(src_dir, dst_dir) 821 self.assertEqual(['pol'], os.listdir(rv)) 822 823class TestCopy(BaseTest, unittest.TestCase): 824 825 ### shutil.copymode 826 827 @os_helper.skip_unless_symlink 828 def test_copymode_follow_symlinks(self): 829 tmp_dir = self.mkdtemp() 830 src = os.path.join(tmp_dir, 'foo') 831 dst = os.path.join(tmp_dir, 'bar') 832 src_link = os.path.join(tmp_dir, 'baz') 833 dst_link = os.path.join(tmp_dir, 'quux') 834 write_file(src, 'foo') 835 write_file(dst, 'foo') 836 os.symlink(src, src_link) 837 os.symlink(dst, dst_link) 838 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) 839 # file to file 840 os.chmod(dst, stat.S_IRWXO) 841 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 842 shutil.copymode(src, dst) 843 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 844 # On Windows, os.chmod does not follow symlinks (issue #15411) 845 if os.name != 'nt': 846 # follow src link 847 os.chmod(dst, stat.S_IRWXO) 848 shutil.copymode(src_link, dst) 849 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 850 # follow dst link 851 os.chmod(dst, stat.S_IRWXO) 852 shutil.copymode(src, dst_link) 853 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 854 # follow both links 855 os.chmod(dst, stat.S_IRWXO) 856 shutil.copymode(src_link, dst_link) 857 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 858 859 @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod') 860 @os_helper.skip_unless_symlink 861 def test_copymode_symlink_to_symlink(self): 862 tmp_dir = self.mkdtemp() 863 src = os.path.join(tmp_dir, 'foo') 864 dst = os.path.join(tmp_dir, 'bar') 865 src_link = os.path.join(tmp_dir, 'baz') 866 dst_link = os.path.join(tmp_dir, 'quux') 867 write_file(src, 'foo') 868 write_file(dst, 'foo') 869 os.symlink(src, src_link) 870 os.symlink(dst, dst_link) 871 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) 872 os.chmod(dst, stat.S_IRWXU) 873 os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG) 874 # link to link 875 os.lchmod(dst_link, stat.S_IRWXO) 876 shutil.copymode(src_link, dst_link, follow_symlinks=False) 877 self.assertEqual(os.lstat(src_link).st_mode, 878 os.lstat(dst_link).st_mode) 879 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 880 # src link - use chmod 881 os.lchmod(dst_link, stat.S_IRWXO) 882 shutil.copymode(src_link, dst, follow_symlinks=False) 883 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 884 # dst link - use chmod 885 os.lchmod(dst_link, stat.S_IRWXO) 886 shutil.copymode(src, dst_link, follow_symlinks=False) 887 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 888 889 @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing') 890 @os_helper.skip_unless_symlink 891 def test_copymode_symlink_to_symlink_wo_lchmod(self): 892 tmp_dir = self.mkdtemp() 893 src = os.path.join(tmp_dir, 'foo') 894 dst = os.path.join(tmp_dir, 'bar') 895 src_link = os.path.join(tmp_dir, 'baz') 896 dst_link = os.path.join(tmp_dir, 'quux') 897 write_file(src, 'foo') 898 write_file(dst, 'foo') 899 os.symlink(src, src_link) 900 os.symlink(dst, dst_link) 901 shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail 902 903 ### shutil.copystat 904 905 @os_helper.skip_unless_symlink 906 def test_copystat_symlinks(self): 907 tmp_dir = self.mkdtemp() 908 src = os.path.join(tmp_dir, 'foo') 909 dst = os.path.join(tmp_dir, 'bar') 910 src_link = os.path.join(tmp_dir, 'baz') 911 dst_link = os.path.join(tmp_dir, 'qux') 912 write_file(src, 'foo') 913 src_stat = os.stat(src) 914 os.utime(src, (src_stat.st_atime, 915 src_stat.st_mtime - 42.0)) # ensure different mtimes 916 write_file(dst, 'bar') 917 self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime) 918 os.symlink(src, src_link) 919 os.symlink(dst, dst_link) 920 if hasattr(os, 'lchmod'): 921 os.lchmod(src_link, stat.S_IRWXO) 922 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 923 os.lchflags(src_link, stat.UF_NODUMP) 924 src_link_stat = os.lstat(src_link) 925 # follow 926 if hasattr(os, 'lchmod'): 927 shutil.copystat(src_link, dst_link, follow_symlinks=True) 928 self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode) 929 # don't follow 930 shutil.copystat(src_link, dst_link, follow_symlinks=False) 931 dst_link_stat = os.lstat(dst_link) 932 if os.utime in os.supports_follow_symlinks: 933 for attr in 'st_atime', 'st_mtime': 934 # The modification times may be truncated in the new file. 935 self.assertLessEqual(getattr(src_link_stat, attr), 936 getattr(dst_link_stat, attr) + 1) 937 if hasattr(os, 'lchmod'): 938 self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode) 939 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): 940 self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags) 941 # tell to follow but dst is not a link 942 shutil.copystat(src_link, dst, follow_symlinks=False) 943 self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) < 944 00000.1) 945 946 @unittest.skipUnless(hasattr(os, 'chflags') and 947 hasattr(errno, 'EOPNOTSUPP') and 948 hasattr(errno, 'ENOTSUP'), 949 "requires os.chflags, EOPNOTSUPP & ENOTSUP") 950 def test_copystat_handles_harmless_chflags_errors(self): 951 tmpdir = self.mkdtemp() 952 file1 = os.path.join(tmpdir, 'file1') 953 file2 = os.path.join(tmpdir, 'file2') 954 write_file(file1, 'xxx') 955 write_file(file2, 'xxx') 956 957 def make_chflags_raiser(err): 958 ex = OSError() 959 960 def _chflags_raiser(path, flags, *, follow_symlinks=True): 961 ex.errno = err 962 raise ex 963 return _chflags_raiser 964 old_chflags = os.chflags 965 try: 966 for err in errno.EOPNOTSUPP, errno.ENOTSUP: 967 os.chflags = make_chflags_raiser(err) 968 shutil.copystat(file1, file2) 969 # assert others errors break it 970 os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP) 971 self.assertRaises(OSError, shutil.copystat, file1, file2) 972 finally: 973 os.chflags = old_chflags 974 975 ### shutil.copyxattr 976 977 @os_helper.skip_unless_xattr 978 def test_copyxattr(self): 979 tmp_dir = self.mkdtemp() 980 src = os.path.join(tmp_dir, 'foo') 981 write_file(src, 'foo') 982 dst = os.path.join(tmp_dir, 'bar') 983 write_file(dst, 'bar') 984 985 # no xattr == no problem 986 shutil._copyxattr(src, dst) 987 # common case 988 os.setxattr(src, 'user.foo', b'42') 989 os.setxattr(src, 'user.bar', b'43') 990 shutil._copyxattr(src, dst) 991 self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst))) 992 self.assertEqual( 993 os.getxattr(src, 'user.foo'), 994 os.getxattr(dst, 'user.foo')) 995 # check errors don't affect other attrs 996 os.remove(dst) 997 write_file(dst, 'bar') 998 os_error = OSError(errno.EPERM, 'EPERM') 999 1000 def _raise_on_user_foo(fname, attr, val, **kwargs): 1001 if attr == 'user.foo': 1002 raise os_error 1003 else: 1004 orig_setxattr(fname, attr, val, **kwargs) 1005 try: 1006 orig_setxattr = os.setxattr 1007 os.setxattr = _raise_on_user_foo 1008 shutil._copyxattr(src, dst) 1009 self.assertIn('user.bar', os.listxattr(dst)) 1010 finally: 1011 os.setxattr = orig_setxattr 1012 # the source filesystem not supporting xattrs should be ok, too. 1013 def _raise_on_src(fname, *, follow_symlinks=True): 1014 if fname == src: 1015 raise OSError(errno.ENOTSUP, 'Operation not supported') 1016 return orig_listxattr(fname, follow_symlinks=follow_symlinks) 1017 try: 1018 orig_listxattr = os.listxattr 1019 os.listxattr = _raise_on_src 1020 shutil._copyxattr(src, dst) 1021 finally: 1022 os.listxattr = orig_listxattr 1023 1024 # test that shutil.copystat copies xattrs 1025 src = os.path.join(tmp_dir, 'the_original') 1026 srcro = os.path.join(tmp_dir, 'the_original_ro') 1027 write_file(src, src) 1028 write_file(srcro, srcro) 1029 os.setxattr(src, 'user.the_value', b'fiddly') 1030 os.setxattr(srcro, 'user.the_value', b'fiddly') 1031 os.chmod(srcro, 0o444) 1032 dst = os.path.join(tmp_dir, 'the_copy') 1033 dstro = os.path.join(tmp_dir, 'the_copy_ro') 1034 write_file(dst, dst) 1035 write_file(dstro, dstro) 1036 shutil.copystat(src, dst) 1037 shutil.copystat(srcro, dstro) 1038 self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly') 1039 self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly') 1040 1041 @os_helper.skip_unless_symlink 1042 @os_helper.skip_unless_xattr 1043 @os_helper.skip_unless_dac_override 1044 def test_copyxattr_symlinks(self): 1045 # On Linux, it's only possible to access non-user xattr for symlinks; 1046 # which in turn require root privileges. This test should be expanded 1047 # as soon as other platforms gain support for extended attributes. 1048 tmp_dir = self.mkdtemp() 1049 src = os.path.join(tmp_dir, 'foo') 1050 src_link = os.path.join(tmp_dir, 'baz') 1051 write_file(src, 'foo') 1052 os.symlink(src, src_link) 1053 os.setxattr(src, 'trusted.foo', b'42') 1054 os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False) 1055 dst = os.path.join(tmp_dir, 'bar') 1056 dst_link = os.path.join(tmp_dir, 'qux') 1057 write_file(dst, 'bar') 1058 os.symlink(dst, dst_link) 1059 shutil._copyxattr(src_link, dst_link, follow_symlinks=False) 1060 self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43') 1061 self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo') 1062 shutil._copyxattr(src_link, dst, follow_symlinks=False) 1063 self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43') 1064 1065 ### shutil.copy 1066 1067 def _copy_file(self, method): 1068 fname = 'test.txt' 1069 tmpdir = self.mkdtemp() 1070 write_file((tmpdir, fname), 'xxx') 1071 file1 = os.path.join(tmpdir, fname) 1072 tmpdir2 = self.mkdtemp() 1073 method(file1, tmpdir2) 1074 file2 = os.path.join(tmpdir2, fname) 1075 return (file1, file2) 1076 1077 def test_copy(self): 1078 # Ensure that the copied file exists and has the same mode bits. 1079 file1, file2 = self._copy_file(shutil.copy) 1080 self.assertTrue(os.path.exists(file2)) 1081 self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode) 1082 1083 @os_helper.skip_unless_symlink 1084 def test_copy_symlinks(self): 1085 tmp_dir = self.mkdtemp() 1086 src = os.path.join(tmp_dir, 'foo') 1087 dst = os.path.join(tmp_dir, 'bar') 1088 src_link = os.path.join(tmp_dir, 'baz') 1089 write_file(src, 'foo') 1090 os.symlink(src, src_link) 1091 if hasattr(os, 'lchmod'): 1092 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 1093 # don't follow 1094 shutil.copy(src_link, dst, follow_symlinks=True) 1095 self.assertFalse(os.path.islink(dst)) 1096 self.assertEqual(read_file(src), read_file(dst)) 1097 os.remove(dst) 1098 # follow 1099 shutil.copy(src_link, dst, follow_symlinks=False) 1100 self.assertTrue(os.path.islink(dst)) 1101 self.assertEqual(os.readlink(dst), os.readlink(src_link)) 1102 if hasattr(os, 'lchmod'): 1103 self.assertEqual(os.lstat(src_link).st_mode, 1104 os.lstat(dst).st_mode) 1105 1106 ### shutil.copy2 1107 1108 @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime') 1109 def test_copy2(self): 1110 # Ensure that the copied file exists and has the same mode and 1111 # modification time bits. 1112 file1, file2 = self._copy_file(shutil.copy2) 1113 self.assertTrue(os.path.exists(file2)) 1114 file1_stat = os.stat(file1) 1115 file2_stat = os.stat(file2) 1116 self.assertEqual(file1_stat.st_mode, file2_stat.st_mode) 1117 for attr in 'st_atime', 'st_mtime': 1118 # The modification times may be truncated in the new file. 1119 self.assertLessEqual(getattr(file1_stat, attr), 1120 getattr(file2_stat, attr) + 1) 1121 if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'): 1122 self.assertEqual(getattr(file1_stat, 'st_flags'), 1123 getattr(file2_stat, 'st_flags')) 1124 1125 @os_helper.skip_unless_symlink 1126 def test_copy2_symlinks(self): 1127 tmp_dir = self.mkdtemp() 1128 src = os.path.join(tmp_dir, 'foo') 1129 dst = os.path.join(tmp_dir, 'bar') 1130 src_link = os.path.join(tmp_dir, 'baz') 1131 write_file(src, 'foo') 1132 os.symlink(src, src_link) 1133 if hasattr(os, 'lchmod'): 1134 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 1135 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 1136 os.lchflags(src_link, stat.UF_NODUMP) 1137 src_stat = os.stat(src) 1138 src_link_stat = os.lstat(src_link) 1139 # follow 1140 shutil.copy2(src_link, dst, follow_symlinks=True) 1141 self.assertFalse(os.path.islink(dst)) 1142 self.assertEqual(read_file(src), read_file(dst)) 1143 os.remove(dst) 1144 # don't follow 1145 shutil.copy2(src_link, dst, follow_symlinks=False) 1146 self.assertTrue(os.path.islink(dst)) 1147 self.assertEqual(os.readlink(dst), os.readlink(src_link)) 1148 dst_stat = os.lstat(dst) 1149 if os.utime in os.supports_follow_symlinks: 1150 for attr in 'st_atime', 'st_mtime': 1151 # The modification times may be truncated in the new file. 1152 self.assertLessEqual(getattr(src_link_stat, attr), 1153 getattr(dst_stat, attr) + 1) 1154 if hasattr(os, 'lchmod'): 1155 self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode) 1156 self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode) 1157 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): 1158 self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags) 1159 1160 @os_helper.skip_unless_xattr 1161 def test_copy2_xattr(self): 1162 tmp_dir = self.mkdtemp() 1163 src = os.path.join(tmp_dir, 'foo') 1164 dst = os.path.join(tmp_dir, 'bar') 1165 write_file(src, 'foo') 1166 os.setxattr(src, 'user.foo', b'42') 1167 shutil.copy2(src, dst) 1168 self.assertEqual( 1169 os.getxattr(src, 'user.foo'), 1170 os.getxattr(dst, 'user.foo')) 1171 os.remove(dst) 1172 1173 def test_copy_return_value(self): 1174 # copy and copy2 both return their destination path. 1175 for fn in (shutil.copy, shutil.copy2): 1176 src_dir = self.mkdtemp() 1177 dst_dir = self.mkdtemp() 1178 src = os.path.join(src_dir, 'foo') 1179 write_file(src, 'foo') 1180 rv = fn(src, dst_dir) 1181 self.assertEqual(rv, os.path.join(dst_dir, 'foo')) 1182 rv = fn(src, os.path.join(dst_dir, 'bar')) 1183 self.assertEqual(rv, os.path.join(dst_dir, 'bar')) 1184 1185 def test_copy_dir(self): 1186 self._test_copy_dir(shutil.copy) 1187 1188 def test_copy2_dir(self): 1189 self._test_copy_dir(shutil.copy2) 1190 1191 def _test_copy_dir(self, copy_func): 1192 src_dir = self.mkdtemp() 1193 src_file = os.path.join(src_dir, 'foo') 1194 dir2 = self.mkdtemp() 1195 dst = os.path.join(src_dir, 'does_not_exist/') 1196 write_file(src_file, 'foo') 1197 if sys.platform == "win32": 1198 err = PermissionError 1199 else: 1200 err = IsADirectoryError 1201 self.assertRaises(err, copy_func, dir2, src_dir) 1202 1203 # raise *err* because of src rather than FileNotFoundError because of dst 1204 self.assertRaises(err, copy_func, dir2, dst) 1205 copy_func(src_file, dir2) # should not raise exceptions 1206 1207 ### shutil.copyfile 1208 1209 @os_helper.skip_unless_symlink 1210 def test_copyfile_symlinks(self): 1211 tmp_dir = self.mkdtemp() 1212 src = os.path.join(tmp_dir, 'src') 1213 dst = os.path.join(tmp_dir, 'dst') 1214 dst_link = os.path.join(tmp_dir, 'dst_link') 1215 link = os.path.join(tmp_dir, 'link') 1216 write_file(src, 'foo') 1217 os.symlink(src, link) 1218 # don't follow 1219 shutil.copyfile(link, dst_link, follow_symlinks=False) 1220 self.assertTrue(os.path.islink(dst_link)) 1221 self.assertEqual(os.readlink(link), os.readlink(dst_link)) 1222 # follow 1223 shutil.copyfile(link, dst) 1224 self.assertFalse(os.path.islink(dst)) 1225 1226 @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link') 1227 def test_dont_copy_file_onto_link_to_itself(self): 1228 # bug 851123. 1229 os.mkdir(TESTFN) 1230 src = os.path.join(TESTFN, 'cheese') 1231 dst = os.path.join(TESTFN, 'shop') 1232 try: 1233 with open(src, 'w', encoding='utf-8') as f: 1234 f.write('cheddar') 1235 try: 1236 os.link(src, dst) 1237 except PermissionError as e: 1238 self.skipTest('os.link(): %s' % e) 1239 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst) 1240 with open(src, 'r', encoding='utf-8') as f: 1241 self.assertEqual(f.read(), 'cheddar') 1242 os.remove(dst) 1243 finally: 1244 shutil.rmtree(TESTFN, ignore_errors=True) 1245 1246 @os_helper.skip_unless_symlink 1247 def test_dont_copy_file_onto_symlink_to_itself(self): 1248 # bug 851123. 1249 os.mkdir(TESTFN) 1250 src = os.path.join(TESTFN, 'cheese') 1251 dst = os.path.join(TESTFN, 'shop') 1252 try: 1253 with open(src, 'w', encoding='utf-8') as f: 1254 f.write('cheddar') 1255 # Using `src` here would mean we end up with a symlink pointing 1256 # to TESTFN/TESTFN/cheese, while it should point at 1257 # TESTFN/cheese. 1258 os.symlink('cheese', dst) 1259 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst) 1260 with open(src, 'r', encoding='utf-8') as f: 1261 self.assertEqual(f.read(), 'cheddar') 1262 os.remove(dst) 1263 finally: 1264 shutil.rmtree(TESTFN, ignore_errors=True) 1265 1266 # Issue #3002: copyfile and copytree block indefinitely on named pipes 1267 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') 1268 @unittest.skipIf(sys.platform == "vxworks", 1269 "fifo requires special path on VxWorks") 1270 def test_copyfile_named_pipe(self): 1271 try: 1272 os.mkfifo(TESTFN) 1273 except PermissionError as e: 1274 self.skipTest('os.mkfifo(): %s' % e) 1275 try: 1276 self.assertRaises(shutil.SpecialFileError, 1277 shutil.copyfile, TESTFN, TESTFN2) 1278 self.assertRaises(shutil.SpecialFileError, 1279 shutil.copyfile, __file__, TESTFN) 1280 finally: 1281 os.remove(TESTFN) 1282 1283 def test_copyfile_return_value(self): 1284 # copytree returns its destination path. 1285 src_dir = self.mkdtemp() 1286 dst_dir = self.mkdtemp() 1287 dst_file = os.path.join(dst_dir, 'bar') 1288 src_file = os.path.join(src_dir, 'foo') 1289 write_file(src_file, 'foo') 1290 rv = shutil.copyfile(src_file, dst_file) 1291 self.assertTrue(os.path.exists(rv)) 1292 self.assertEqual(read_file(src_file), read_file(dst_file)) 1293 1294 def test_copyfile_same_file(self): 1295 # copyfile() should raise SameFileError if the source and destination 1296 # are the same. 1297 src_dir = self.mkdtemp() 1298 src_file = os.path.join(src_dir, 'foo') 1299 write_file(src_file, 'foo') 1300 self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file) 1301 # But Error should work too, to stay backward compatible. 1302 self.assertRaises(Error, shutil.copyfile, src_file, src_file) 1303 # Make sure file is not corrupted. 1304 self.assertEqual(read_file(src_file), 'foo') 1305 1306 @unittest.skipIf(MACOS or SOLARIS or _winapi, 'On MACOS, Solaris and Windows the errors are not confusing (though different)') 1307 # gh-92670: The test uses a trailing slash to force the OS consider 1308 # the path as a directory, but on AIX the trailing slash has no effect 1309 # and is considered as a file. 1310 @unittest.skipIf(AIX, 'Not valid on AIX, see gh-92670') 1311 def test_copyfile_nonexistent_dir(self): 1312 # Issue 43219 1313 src_dir = self.mkdtemp() 1314 src_file = os.path.join(src_dir, 'foo') 1315 dst = os.path.join(src_dir, 'does_not_exist/') 1316 write_file(src_file, 'foo') 1317 self.assertRaises(FileNotFoundError, shutil.copyfile, src_file, dst) 1318 1319 def test_copyfile_copy_dir(self): 1320 # Issue 45234 1321 # test copy() and copyfile() raising proper exceptions when src and/or 1322 # dst are directories 1323 src_dir = self.mkdtemp() 1324 src_file = os.path.join(src_dir, 'foo') 1325 dir2 = self.mkdtemp() 1326 dst = os.path.join(src_dir, 'does_not_exist/') 1327 write_file(src_file, 'foo') 1328 if sys.platform == "win32": 1329 err = PermissionError 1330 else: 1331 err = IsADirectoryError 1332 1333 self.assertRaises(err, shutil.copyfile, src_dir, dst) 1334 self.assertRaises(err, shutil.copyfile, src_file, src_dir) 1335 self.assertRaises(err, shutil.copyfile, dir2, src_dir) 1336 1337 1338class TestArchives(BaseTest, unittest.TestCase): 1339 1340 ### shutil.make_archive 1341 1342 @support.requires_zlib() 1343 def test_make_tarball(self): 1344 # creating something to tar 1345 root_dir, base_dir = self._create_files('') 1346 1347 tmpdir2 = self.mkdtemp() 1348 # force shutil to create the directory 1349 os.rmdir(tmpdir2) 1350 # working with relative paths 1351 work_dir = os.path.dirname(tmpdir2) 1352 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive') 1353 1354 with os_helper.change_cwd(work_dir), no_chdir: 1355 base_name = os.path.abspath(rel_base_name) 1356 tarball = make_archive(rel_base_name, 'gztar', root_dir, '.') 1357 1358 # check if the compressed tarball was created 1359 self.assertEqual(tarball, base_name + '.tar.gz') 1360 self.assertTrue(os.path.isfile(tarball)) 1361 self.assertTrue(tarfile.is_tarfile(tarball)) 1362 with tarfile.open(tarball, 'r:gz') as tf: 1363 self.assertCountEqual(tf.getnames(), 1364 ['.', './sub', './sub2', 1365 './file1', './file2', './sub/file3']) 1366 1367 # trying an uncompressed one 1368 with os_helper.change_cwd(work_dir), no_chdir: 1369 tarball = make_archive(rel_base_name, 'tar', root_dir, '.') 1370 self.assertEqual(tarball, base_name + '.tar') 1371 self.assertTrue(os.path.isfile(tarball)) 1372 self.assertTrue(tarfile.is_tarfile(tarball)) 1373 with tarfile.open(tarball, 'r') as tf: 1374 self.assertCountEqual(tf.getnames(), 1375 ['.', './sub', './sub2', 1376 './file1', './file2', './sub/file3']) 1377 1378 def _tarinfo(self, path): 1379 with tarfile.open(path) as tar: 1380 names = tar.getnames() 1381 names.sort() 1382 return tuple(names) 1383 1384 def _create_files(self, base_dir='dist'): 1385 # creating something to tar 1386 root_dir = self.mkdtemp() 1387 dist = os.path.join(root_dir, base_dir) 1388 os.makedirs(dist, exist_ok=True) 1389 write_file((dist, 'file1'), 'xxx') 1390 write_file((dist, 'file2'), 'xxx') 1391 os.mkdir(os.path.join(dist, 'sub')) 1392 write_file((dist, 'sub', 'file3'), 'xxx') 1393 os.mkdir(os.path.join(dist, 'sub2')) 1394 if base_dir: 1395 write_file((root_dir, 'outer'), 'xxx') 1396 return root_dir, base_dir 1397 1398 @support.requires_zlib() 1399 @unittest.skipUnless(shutil.which('tar'), 1400 'Need the tar command to run') 1401 def test_tarfile_vs_tar(self): 1402 root_dir, base_dir = self._create_files() 1403 base_name = os.path.join(self.mkdtemp(), 'archive') 1404 with no_chdir: 1405 tarball = make_archive(base_name, 'gztar', root_dir, base_dir) 1406 1407 # check if the compressed tarball was created 1408 self.assertEqual(tarball, base_name + '.tar.gz') 1409 self.assertTrue(os.path.isfile(tarball)) 1410 1411 # now create another tarball using `tar` 1412 tarball2 = os.path.join(root_dir, 'archive2.tar') 1413 tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir] 1414 subprocess.check_call(tar_cmd, cwd=root_dir, 1415 stdout=subprocess.DEVNULL) 1416 1417 self.assertTrue(os.path.isfile(tarball2)) 1418 # let's compare both tarballs 1419 self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2)) 1420 1421 # trying an uncompressed one 1422 with no_chdir: 1423 tarball = make_archive(base_name, 'tar', root_dir, base_dir) 1424 self.assertEqual(tarball, base_name + '.tar') 1425 self.assertTrue(os.path.isfile(tarball)) 1426 1427 # now for a dry_run 1428 with no_chdir: 1429 tarball = make_archive(base_name, 'tar', root_dir, base_dir, 1430 dry_run=True) 1431 self.assertEqual(tarball, base_name + '.tar') 1432 self.assertTrue(os.path.isfile(tarball)) 1433 1434 @support.requires_zlib() 1435 def test_make_zipfile(self): 1436 # creating something to zip 1437 root_dir, base_dir = self._create_files() 1438 1439 tmpdir2 = self.mkdtemp() 1440 # force shutil to create the directory 1441 os.rmdir(tmpdir2) 1442 # working with relative paths 1443 work_dir = os.path.dirname(tmpdir2) 1444 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive') 1445 1446 with os_helper.change_cwd(work_dir), no_chdir: 1447 base_name = os.path.abspath(rel_base_name) 1448 res = make_archive(rel_base_name, 'zip', root_dir) 1449 1450 self.assertEqual(res, base_name + '.zip') 1451 self.assertTrue(os.path.isfile(res)) 1452 self.assertTrue(zipfile.is_zipfile(res)) 1453 with zipfile.ZipFile(res) as zf: 1454 self.assertCountEqual(zf.namelist(), 1455 ['dist/', 'dist/sub/', 'dist/sub2/', 1456 'dist/file1', 'dist/file2', 'dist/sub/file3', 1457 'outer']) 1458 1459 with os_helper.change_cwd(work_dir), no_chdir: 1460 base_name = os.path.abspath(rel_base_name) 1461 res = make_archive(rel_base_name, 'zip', root_dir, base_dir) 1462 1463 self.assertEqual(res, base_name + '.zip') 1464 self.assertTrue(os.path.isfile(res)) 1465 self.assertTrue(zipfile.is_zipfile(res)) 1466 with zipfile.ZipFile(res) as zf: 1467 self.assertCountEqual(zf.namelist(), 1468 ['dist/', 'dist/sub/', 'dist/sub2/', 1469 'dist/file1', 'dist/file2', 'dist/sub/file3']) 1470 1471 @support.requires_zlib() 1472 @unittest.skipUnless(shutil.which('zip'), 1473 'Need the zip command to run') 1474 def test_zipfile_vs_zip(self): 1475 root_dir, base_dir = self._create_files() 1476 base_name = os.path.join(self.mkdtemp(), 'archive') 1477 with no_chdir: 1478 archive = make_archive(base_name, 'zip', root_dir, base_dir) 1479 1480 # check if ZIP file was created 1481 self.assertEqual(archive, base_name + '.zip') 1482 self.assertTrue(os.path.isfile(archive)) 1483 1484 # now create another ZIP file using `zip` 1485 archive2 = os.path.join(root_dir, 'archive2.zip') 1486 zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir] 1487 subprocess.check_call(zip_cmd, cwd=root_dir, 1488 stdout=subprocess.DEVNULL) 1489 1490 self.assertTrue(os.path.isfile(archive2)) 1491 # let's compare both ZIP files 1492 with zipfile.ZipFile(archive) as zf: 1493 names = zf.namelist() 1494 with zipfile.ZipFile(archive2) as zf: 1495 names2 = zf.namelist() 1496 self.assertEqual(sorted(names), sorted(names2)) 1497 1498 @support.requires_zlib() 1499 @unittest.skipUnless(shutil.which('unzip'), 1500 'Need the unzip command to run') 1501 def test_unzip_zipfile(self): 1502 root_dir, base_dir = self._create_files() 1503 base_name = os.path.join(self.mkdtemp(), 'archive') 1504 with no_chdir: 1505 archive = make_archive(base_name, 'zip', root_dir, base_dir) 1506 1507 # check if ZIP file was created 1508 self.assertEqual(archive, base_name + '.zip') 1509 self.assertTrue(os.path.isfile(archive)) 1510 1511 # now check the ZIP file using `unzip -t` 1512 zip_cmd = ['unzip', '-t', archive] 1513 with os_helper.change_cwd(root_dir): 1514 try: 1515 subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT) 1516 except subprocess.CalledProcessError as exc: 1517 details = exc.output.decode(errors="replace") 1518 if 'unrecognized option: t' in details: 1519 self.skipTest("unzip doesn't support -t") 1520 msg = "{}\n\n**Unzip Output**\n{}" 1521 self.fail(msg.format(exc, details)) 1522 1523 def test_make_archive(self): 1524 tmpdir = self.mkdtemp() 1525 base_name = os.path.join(tmpdir, 'archive') 1526 self.assertRaises(ValueError, make_archive, base_name, 'xxx') 1527 1528 @support.requires_zlib() 1529 def test_make_archive_owner_group(self): 1530 # testing make_archive with owner and group, with various combinations 1531 # this works even if there's not gid/uid support 1532 if UID_GID_SUPPORT: 1533 group = grp.getgrgid(0)[0] 1534 owner = pwd.getpwuid(0)[0] 1535 else: 1536 group = owner = 'root' 1537 1538 root_dir, base_dir = self._create_files() 1539 base_name = os.path.join(self.mkdtemp(), 'archive') 1540 res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner, 1541 group=group) 1542 self.assertTrue(os.path.isfile(res)) 1543 1544 res = make_archive(base_name, 'zip', root_dir, base_dir) 1545 self.assertTrue(os.path.isfile(res)) 1546 1547 res = make_archive(base_name, 'tar', root_dir, base_dir, 1548 owner=owner, group=group) 1549 self.assertTrue(os.path.isfile(res)) 1550 1551 res = make_archive(base_name, 'tar', root_dir, base_dir, 1552 owner='kjhkjhkjg', group='oihohoh') 1553 self.assertTrue(os.path.isfile(res)) 1554 1555 1556 @support.requires_zlib() 1557 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") 1558 def test_tarfile_root_owner(self): 1559 root_dir, base_dir = self._create_files() 1560 base_name = os.path.join(self.mkdtemp(), 'archive') 1561 group = grp.getgrgid(0)[0] 1562 owner = pwd.getpwuid(0)[0] 1563 with os_helper.change_cwd(root_dir), no_chdir: 1564 archive_name = make_archive(base_name, 'gztar', root_dir, 'dist', 1565 owner=owner, group=group) 1566 1567 # check if the compressed tarball was created 1568 self.assertTrue(os.path.isfile(archive_name)) 1569 1570 # now checks the rights 1571 archive = tarfile.open(archive_name) 1572 try: 1573 for member in archive.getmembers(): 1574 self.assertEqual(member.uid, 0) 1575 self.assertEqual(member.gid, 0) 1576 finally: 1577 archive.close() 1578 1579 def test_make_archive_cwd(self): 1580 current_dir = os.getcwd() 1581 root_dir = self.mkdtemp() 1582 def _breaks(*args, **kw): 1583 raise RuntimeError() 1584 dirs = [] 1585 def _chdir(path): 1586 dirs.append(path) 1587 orig_chdir(path) 1588 1589 register_archive_format('xxx', _breaks, [], 'xxx file') 1590 try: 1591 with support.swap_attr(os, 'chdir', _chdir) as orig_chdir: 1592 try: 1593 make_archive('xxx', 'xxx', root_dir=root_dir) 1594 except Exception: 1595 pass 1596 self.assertEqual(os.getcwd(), current_dir) 1597 self.assertEqual(dirs, [root_dir, current_dir]) 1598 finally: 1599 unregister_archive_format('xxx') 1600 1601 def test_make_tarfile_in_curdir(self): 1602 # Issue #21280 1603 root_dir = self.mkdtemp() 1604 with os_helper.change_cwd(root_dir), no_chdir: 1605 self.assertEqual(make_archive('test', 'tar'), 'test.tar') 1606 self.assertTrue(os.path.isfile('test.tar')) 1607 1608 @support.requires_zlib() 1609 def test_make_zipfile_in_curdir(self): 1610 # Issue #21280 1611 root_dir = self.mkdtemp() 1612 with os_helper.change_cwd(root_dir), no_chdir: 1613 self.assertEqual(make_archive('test', 'zip'), 'test.zip') 1614 self.assertTrue(os.path.isfile('test.zip')) 1615 1616 def test_register_archive_format(self): 1617 1618 self.assertRaises(TypeError, register_archive_format, 'xxx', 1) 1619 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, 1620 1) 1621 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, 1622 [(1, 2), (1, 2, 3)]) 1623 1624 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file') 1625 formats = [name for name, params in get_archive_formats()] 1626 self.assertIn('xxx', formats) 1627 1628 unregister_archive_format('xxx') 1629 formats = [name for name, params in get_archive_formats()] 1630 self.assertNotIn('xxx', formats) 1631 1632 ### shutil.unpack_archive 1633 1634 def check_unpack_archive(self, format, **kwargs): 1635 self.check_unpack_archive_with_converter( 1636 format, lambda path: path, **kwargs) 1637 self.check_unpack_archive_with_converter( 1638 format, pathlib.Path, **kwargs) 1639 self.check_unpack_archive_with_converter(format, FakePath, **kwargs) 1640 1641 def check_unpack_archive_with_converter(self, format, converter, **kwargs): 1642 root_dir, base_dir = self._create_files() 1643 expected = rlistdir(root_dir) 1644 expected.remove('outer') 1645 1646 base_name = os.path.join(self.mkdtemp(), 'archive') 1647 filename = make_archive(base_name, format, root_dir, base_dir) 1648 1649 # let's try to unpack it now 1650 tmpdir2 = self.mkdtemp() 1651 unpack_archive(converter(filename), converter(tmpdir2), **kwargs) 1652 self.assertEqual(rlistdir(tmpdir2), expected) 1653 1654 # and again, this time with the format specified 1655 tmpdir3 = self.mkdtemp() 1656 unpack_archive(converter(filename), converter(tmpdir3), format=format, 1657 **kwargs) 1658 self.assertEqual(rlistdir(tmpdir3), expected) 1659 1660 with self.assertRaises(shutil.ReadError): 1661 unpack_archive(converter(TESTFN), **kwargs) 1662 with self.assertRaises(ValueError): 1663 unpack_archive(converter(TESTFN), format='xxx', **kwargs) 1664 1665 def check_unpack_tarball(self, format): 1666 self.check_unpack_archive(format, filter='fully_trusted') 1667 self.check_unpack_archive(format, filter='data') 1668 with warnings_helper.check_no_warnings(self): 1669 self.check_unpack_archive(format) 1670 1671 def test_unpack_archive_tar(self): 1672 self.check_unpack_tarball('tar') 1673 1674 @support.requires_zlib() 1675 def test_unpack_archive_gztar(self): 1676 self.check_unpack_tarball('gztar') 1677 1678 @support.requires_bz2() 1679 def test_unpack_archive_bztar(self): 1680 self.check_unpack_tarball('bztar') 1681 1682 @support.requires_lzma() 1683 @unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger") 1684 def test_unpack_archive_xztar(self): 1685 self.check_unpack_tarball('xztar') 1686 1687 @support.requires_zlib() 1688 def test_unpack_archive_zip(self): 1689 self.check_unpack_archive('zip') 1690 with self.assertRaises(TypeError): 1691 self.check_unpack_archive('zip', filter='data') 1692 1693 def test_unpack_registry(self): 1694 1695 formats = get_unpack_formats() 1696 1697 def _boo(filename, extract_dir, extra): 1698 self.assertEqual(extra, 1) 1699 self.assertEqual(filename, 'stuff.boo') 1700 self.assertEqual(extract_dir, 'xx') 1701 1702 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)]) 1703 unpack_archive('stuff.boo', 'xx') 1704 1705 # trying to register a .boo unpacker again 1706 self.assertRaises(RegistryError, register_unpack_format, 'Boo2', 1707 ['.boo'], _boo) 1708 1709 # should work now 1710 unregister_unpack_format('Boo') 1711 register_unpack_format('Boo2', ['.boo'], _boo) 1712 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats()) 1713 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats()) 1714 1715 # let's leave a clean state 1716 unregister_unpack_format('Boo2') 1717 self.assertEqual(get_unpack_formats(), formats) 1718 1719 1720class TestMisc(BaseTest, unittest.TestCase): 1721 1722 @unittest.skipUnless(hasattr(shutil, 'disk_usage'), 1723 "disk_usage not available on this platform") 1724 def test_disk_usage(self): 1725 usage = shutil.disk_usage(os.path.dirname(__file__)) 1726 for attr in ('total', 'used', 'free'): 1727 self.assertIsInstance(getattr(usage, attr), int) 1728 self.assertGreater(usage.total, 0) 1729 self.assertGreater(usage.used, 0) 1730 self.assertGreaterEqual(usage.free, 0) 1731 self.assertGreaterEqual(usage.total, usage.used) 1732 self.assertGreater(usage.total, usage.free) 1733 1734 # bpo-32557: Check that disk_usage() also accepts a filename 1735 shutil.disk_usage(__file__) 1736 1737 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") 1738 @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown') 1739 def test_chown(self): 1740 dirname = self.mkdtemp() 1741 filename = tempfile.mktemp(dir=dirname) 1742 write_file(filename, 'testing chown function') 1743 1744 with self.assertRaises(ValueError): 1745 shutil.chown(filename) 1746 1747 with self.assertRaises(LookupError): 1748 shutil.chown(filename, user='non-existing username') 1749 1750 with self.assertRaises(LookupError): 1751 shutil.chown(filename, group='non-existing groupname') 1752 1753 with self.assertRaises(TypeError): 1754 shutil.chown(filename, b'spam') 1755 1756 with self.assertRaises(TypeError): 1757 shutil.chown(filename, 3.14) 1758 1759 uid = os.getuid() 1760 gid = os.getgid() 1761 1762 def check_chown(path, uid=None, gid=None): 1763 s = os.stat(filename) 1764 if uid is not None: 1765 self.assertEqual(uid, s.st_uid) 1766 if gid is not None: 1767 self.assertEqual(gid, s.st_gid) 1768 1769 shutil.chown(filename, uid, gid) 1770 check_chown(filename, uid, gid) 1771 shutil.chown(filename, uid) 1772 check_chown(filename, uid) 1773 shutil.chown(filename, user=uid) 1774 check_chown(filename, uid) 1775 shutil.chown(filename, group=gid) 1776 check_chown(filename, gid=gid) 1777 1778 shutil.chown(dirname, uid, gid) 1779 check_chown(dirname, uid, gid) 1780 shutil.chown(dirname, uid) 1781 check_chown(dirname, uid) 1782 shutil.chown(dirname, user=uid) 1783 check_chown(dirname, uid) 1784 shutil.chown(dirname, group=gid) 1785 check_chown(dirname, gid=gid) 1786 1787 try: 1788 user = pwd.getpwuid(uid)[0] 1789 group = grp.getgrgid(gid)[0] 1790 except KeyError: 1791 # On some systems uid/gid cannot be resolved. 1792 pass 1793 else: 1794 shutil.chown(filename, user, group) 1795 check_chown(filename, uid, gid) 1796 shutil.chown(dirname, user, group) 1797 check_chown(dirname, uid, gid) 1798 1799 1800class TestWhich(BaseTest, unittest.TestCase): 1801 1802 def setUp(self): 1803 self.temp_dir = self.mkdtemp(prefix="Tmp") 1804 # Give the temp_file an ".exe" suffix for all. 1805 # It's needed on Windows and not harmful on other platforms. 1806 self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir, 1807 prefix="Tmp", 1808 suffix=".Exe") 1809 os.chmod(self.temp_file.name, stat.S_IXUSR) 1810 self.addCleanup(self.temp_file.close) 1811 self.dir, self.file = os.path.split(self.temp_file.name) 1812 self.env_path = self.dir 1813 self.curdir = os.curdir 1814 self.ext = ".EXE" 1815 1816 def test_basic(self): 1817 # Given an EXE in a directory, it should be returned. 1818 rv = shutil.which(self.file, path=self.dir) 1819 self.assertEqual(rv, self.temp_file.name) 1820 1821 def test_absolute_cmd(self): 1822 # When given the fully qualified path to an executable that exists, 1823 # it should be returned. 1824 rv = shutil.which(self.temp_file.name, path=self.temp_dir) 1825 self.assertEqual(rv, self.temp_file.name) 1826 1827 def test_relative_cmd(self): 1828 # When given the relative path with a directory part to an executable 1829 # that exists, it should be returned. 1830 base_dir, tail_dir = os.path.split(self.dir) 1831 relpath = os.path.join(tail_dir, self.file) 1832 with os_helper.change_cwd(path=base_dir): 1833 rv = shutil.which(relpath, path=self.temp_dir) 1834 self.assertEqual(rv, relpath) 1835 # But it shouldn't be searched in PATH directories (issue #16957). 1836 with os_helper.change_cwd(path=self.dir): 1837 rv = shutil.which(relpath, path=base_dir) 1838 self.assertIsNone(rv) 1839 1840 def test_cwd(self): 1841 # Issue #16957 1842 base_dir = os.path.dirname(self.dir) 1843 with os_helper.change_cwd(path=self.dir): 1844 rv = shutil.which(self.file, path=base_dir) 1845 if sys.platform == "win32": 1846 # Windows: current directory implicitly on PATH 1847 self.assertEqual(rv, os.path.join(self.curdir, self.file)) 1848 else: 1849 # Other platforms: shouldn't match in the current directory. 1850 self.assertIsNone(rv) 1851 1852 @os_helper.skip_if_dac_override 1853 def test_non_matching_mode(self): 1854 # Set the file read-only and ask for writeable files. 1855 os.chmod(self.temp_file.name, stat.S_IREAD) 1856 if os.access(self.temp_file.name, os.W_OK): 1857 self.skipTest("can't set the file read-only") 1858 rv = shutil.which(self.file, path=self.dir, mode=os.W_OK) 1859 self.assertIsNone(rv) 1860 1861 def test_relative_path(self): 1862 base_dir, tail_dir = os.path.split(self.dir) 1863 with os_helper.change_cwd(path=base_dir): 1864 rv = shutil.which(self.file, path=tail_dir) 1865 self.assertEqual(rv, os.path.join(tail_dir, self.file)) 1866 1867 def test_nonexistent_file(self): 1868 # Return None when no matching executable file is found on the path. 1869 rv = shutil.which("foo.exe", path=self.dir) 1870 self.assertIsNone(rv) 1871 1872 @unittest.skipUnless(sys.platform == "win32", 1873 "pathext check is Windows-only") 1874 def test_pathext_checking(self): 1875 # Ask for the file without the ".exe" extension, then ensure that 1876 # it gets found properly with the extension. 1877 rv = shutil.which(self.file[:-4], path=self.dir) 1878 self.assertEqual(rv, self.temp_file.name[:-4] + self.ext) 1879 1880 def test_environ_path(self): 1881 with os_helper.EnvironmentVarGuard() as env: 1882 env['PATH'] = self.env_path 1883 rv = shutil.which(self.file) 1884 self.assertEqual(rv, self.temp_file.name) 1885 1886 def test_environ_path_empty(self): 1887 # PATH='': no match 1888 with os_helper.EnvironmentVarGuard() as env: 1889 env['PATH'] = '' 1890 with unittest.mock.patch('os.confstr', return_value=self.dir, \ 1891 create=True), \ 1892 support.swap_attr(os, 'defpath', self.dir), \ 1893 os_helper.change_cwd(self.dir): 1894 rv = shutil.which(self.file) 1895 self.assertIsNone(rv) 1896 1897 def test_environ_path_cwd(self): 1898 expected_cwd = os.path.basename(self.temp_file.name) 1899 if sys.platform == "win32": 1900 curdir = os.curdir 1901 if isinstance(expected_cwd, bytes): 1902 curdir = os.fsencode(curdir) 1903 expected_cwd = os.path.join(curdir, expected_cwd) 1904 1905 # PATH=':': explicitly looks in the current directory 1906 with os_helper.EnvironmentVarGuard() as env: 1907 env['PATH'] = os.pathsep 1908 with unittest.mock.patch('os.confstr', return_value=self.dir, \ 1909 create=True), \ 1910 support.swap_attr(os, 'defpath', self.dir): 1911 rv = shutil.which(self.file) 1912 self.assertIsNone(rv) 1913 1914 # look in current directory 1915 with os_helper.change_cwd(self.dir): 1916 rv = shutil.which(self.file) 1917 self.assertEqual(rv, expected_cwd) 1918 1919 def test_environ_path_missing(self): 1920 with os_helper.EnvironmentVarGuard() as env: 1921 env.pop('PATH', None) 1922 1923 # without confstr 1924 with unittest.mock.patch('os.confstr', side_effect=ValueError, \ 1925 create=True), \ 1926 support.swap_attr(os, 'defpath', self.dir): 1927 rv = shutil.which(self.file) 1928 self.assertEqual(rv, self.temp_file.name) 1929 1930 # with confstr 1931 with unittest.mock.patch('os.confstr', return_value=self.dir, \ 1932 create=True), \ 1933 support.swap_attr(os, 'defpath', ''): 1934 rv = shutil.which(self.file) 1935 self.assertEqual(rv, self.temp_file.name) 1936 1937 def test_empty_path(self): 1938 base_dir = os.path.dirname(self.dir) 1939 with os_helper.change_cwd(path=self.dir), \ 1940 os_helper.EnvironmentVarGuard() as env: 1941 env['PATH'] = self.env_path 1942 rv = shutil.which(self.file, path='') 1943 self.assertIsNone(rv) 1944 1945 def test_empty_path_no_PATH(self): 1946 with os_helper.EnvironmentVarGuard() as env: 1947 env.pop('PATH', None) 1948 rv = shutil.which(self.file) 1949 self.assertIsNone(rv) 1950 1951 @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows') 1952 def test_pathext(self): 1953 ext = ".xyz" 1954 temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir, 1955 prefix="Tmp2", suffix=ext) 1956 os.chmod(temp_filexyz.name, stat.S_IXUSR) 1957 self.addCleanup(temp_filexyz.close) 1958 1959 # strip path and extension 1960 program = os.path.basename(temp_filexyz.name) 1961 program = os.path.splitext(program)[0] 1962 1963 with os_helper.EnvironmentVarGuard() as env: 1964 env['PATHEXT'] = ext 1965 rv = shutil.which(program, path=self.temp_dir) 1966 self.assertEqual(rv, temp_filexyz.name) 1967 1968 # Issue 40592: See https://bugs.python.org/issue40592 1969 @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows') 1970 def test_pathext_with_empty_str(self): 1971 ext = ".xyz" 1972 temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir, 1973 prefix="Tmp2", suffix=ext) 1974 self.addCleanup(temp_filexyz.close) 1975 1976 # strip path and extension 1977 program = os.path.basename(temp_filexyz.name) 1978 program = os.path.splitext(program)[0] 1979 1980 with os_helper.EnvironmentVarGuard() as env: 1981 env['PATHEXT'] = f"{ext};" # note the ; 1982 rv = shutil.which(program, path=self.temp_dir) 1983 self.assertEqual(rv, temp_filexyz.name) 1984 1985 1986class TestWhichBytes(TestWhich): 1987 def setUp(self): 1988 TestWhich.setUp(self) 1989 self.dir = os.fsencode(self.dir) 1990 self.file = os.fsencode(self.file) 1991 self.temp_file.name = os.fsencode(self.temp_file.name) 1992 self.curdir = os.fsencode(self.curdir) 1993 self.ext = os.fsencode(self.ext) 1994 1995 1996class TestMove(BaseTest, unittest.TestCase): 1997 1998 def setUp(self): 1999 filename = "foo" 2000 self.src_dir = self.mkdtemp() 2001 self.dst_dir = self.mkdtemp() 2002 self.src_file = os.path.join(self.src_dir, filename) 2003 self.dst_file = os.path.join(self.dst_dir, filename) 2004 with open(self.src_file, "wb") as f: 2005 f.write(b"spam") 2006 2007 def _check_move_file(self, src, dst, real_dst): 2008 with open(src, "rb") as f: 2009 contents = f.read() 2010 shutil.move(src, dst) 2011 with open(real_dst, "rb") as f: 2012 self.assertEqual(contents, f.read()) 2013 self.assertFalse(os.path.exists(src)) 2014 2015 def _check_move_dir(self, src, dst, real_dst): 2016 contents = sorted(os.listdir(src)) 2017 shutil.move(src, dst) 2018 self.assertEqual(contents, sorted(os.listdir(real_dst))) 2019 self.assertFalse(os.path.exists(src)) 2020 2021 def test_move_file(self): 2022 # Move a file to another location on the same filesystem. 2023 self._check_move_file(self.src_file, self.dst_file, self.dst_file) 2024 2025 def test_move_file_to_dir(self): 2026 # Move a file inside an existing dir on the same filesystem. 2027 self._check_move_file(self.src_file, self.dst_dir, self.dst_file) 2028 2029 def test_move_file_to_dir_pathlike_src(self): 2030 # Move a pathlike file to another location on the same filesystem. 2031 src = pathlib.Path(self.src_file) 2032 self._check_move_file(src, self.dst_dir, self.dst_file) 2033 2034 def test_move_file_to_dir_pathlike_dst(self): 2035 # Move a file to another pathlike location on the same filesystem. 2036 dst = pathlib.Path(self.dst_dir) 2037 self._check_move_file(self.src_file, dst, self.dst_file) 2038 2039 @mock_rename 2040 def test_move_file_other_fs(self): 2041 # Move a file to an existing dir on another filesystem. 2042 self.test_move_file() 2043 2044 @mock_rename 2045 def test_move_file_to_dir_other_fs(self): 2046 # Move a file to another location on another filesystem. 2047 self.test_move_file_to_dir() 2048 2049 def test_move_dir(self): 2050 # Move a dir to another location on the same filesystem. 2051 dst_dir = tempfile.mktemp(dir=self.mkdtemp()) 2052 try: 2053 self._check_move_dir(self.src_dir, dst_dir, dst_dir) 2054 finally: 2055 os_helper.rmtree(dst_dir) 2056 2057 @mock_rename 2058 def test_move_dir_other_fs(self): 2059 # Move a dir to another location on another filesystem. 2060 self.test_move_dir() 2061 2062 def test_move_dir_to_dir(self): 2063 # Move a dir inside an existing dir on the same filesystem. 2064 self._check_move_dir(self.src_dir, self.dst_dir, 2065 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 2066 2067 @mock_rename 2068 def test_move_dir_to_dir_other_fs(self): 2069 # Move a dir inside an existing dir on another filesystem. 2070 self.test_move_dir_to_dir() 2071 2072 def test_move_dir_sep_to_dir(self): 2073 self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir, 2074 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 2075 2076 @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep') 2077 def test_move_dir_altsep_to_dir(self): 2078 self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir, 2079 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 2080 2081 def test_existing_file_inside_dest_dir(self): 2082 # A file with the same name inside the destination dir already exists. 2083 with open(self.dst_file, "wb"): 2084 pass 2085 self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir) 2086 2087 def test_dont_move_dir_in_itself(self): 2088 # Moving a dir inside itself raises an Error. 2089 dst = os.path.join(self.src_dir, "bar") 2090 self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst) 2091 2092 def test_destinsrc_false_negative(self): 2093 os.mkdir(TESTFN) 2094 try: 2095 for src, dst in [('srcdir', 'srcdir/dest')]: 2096 src = os.path.join(TESTFN, src) 2097 dst = os.path.join(TESTFN, dst) 2098 self.assertTrue(shutil._destinsrc(src, dst), 2099 msg='_destinsrc() wrongly concluded that ' 2100 'dst (%s) is not in src (%s)' % (dst, src)) 2101 finally: 2102 os_helper.rmtree(TESTFN) 2103 2104 def test_destinsrc_false_positive(self): 2105 os.mkdir(TESTFN) 2106 try: 2107 for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]: 2108 src = os.path.join(TESTFN, src) 2109 dst = os.path.join(TESTFN, dst) 2110 self.assertFalse(shutil._destinsrc(src, dst), 2111 msg='_destinsrc() wrongly concluded that ' 2112 'dst (%s) is in src (%s)' % (dst, src)) 2113 finally: 2114 os_helper.rmtree(TESTFN) 2115 2116 @os_helper.skip_unless_symlink 2117 @mock_rename 2118 def test_move_file_symlink(self): 2119 dst = os.path.join(self.src_dir, 'bar') 2120 os.symlink(self.src_file, dst) 2121 shutil.move(dst, self.dst_file) 2122 self.assertTrue(os.path.islink(self.dst_file)) 2123 self.assertTrue(os.path.samefile(self.src_file, self.dst_file)) 2124 2125 @os_helper.skip_unless_symlink 2126 @mock_rename 2127 def test_move_file_symlink_to_dir(self): 2128 filename = "bar" 2129 dst = os.path.join(self.src_dir, filename) 2130 os.symlink(self.src_file, dst) 2131 shutil.move(dst, self.dst_dir) 2132 final_link = os.path.join(self.dst_dir, filename) 2133 self.assertTrue(os.path.islink(final_link)) 2134 self.assertTrue(os.path.samefile(self.src_file, final_link)) 2135 2136 @os_helper.skip_unless_symlink 2137 @mock_rename 2138 def test_move_dangling_symlink(self): 2139 src = os.path.join(self.src_dir, 'baz') 2140 dst = os.path.join(self.src_dir, 'bar') 2141 os.symlink(src, dst) 2142 dst_link = os.path.join(self.dst_dir, 'quux') 2143 shutil.move(dst, dst_link) 2144 self.assertTrue(os.path.islink(dst_link)) 2145 self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link)) 2146 2147 @os_helper.skip_unless_symlink 2148 @mock_rename 2149 def test_move_dir_symlink(self): 2150 src = os.path.join(self.src_dir, 'baz') 2151 dst = os.path.join(self.src_dir, 'bar') 2152 os.mkdir(src) 2153 os.symlink(src, dst) 2154 dst_link = os.path.join(self.dst_dir, 'quux') 2155 shutil.move(dst, dst_link) 2156 self.assertTrue(os.path.islink(dst_link)) 2157 self.assertTrue(os.path.samefile(src, dst_link)) 2158 2159 def test_move_return_value(self): 2160 rv = shutil.move(self.src_file, self.dst_dir) 2161 self.assertEqual(rv, 2162 os.path.join(self.dst_dir, os.path.basename(self.src_file))) 2163 2164 def test_move_as_rename_return_value(self): 2165 rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar')) 2166 self.assertEqual(rv, os.path.join(self.dst_dir, 'bar')) 2167 2168 @mock_rename 2169 def test_move_file_special_function(self): 2170 moved = [] 2171 def _copy(src, dst): 2172 moved.append((src, dst)) 2173 shutil.move(self.src_file, self.dst_dir, copy_function=_copy) 2174 self.assertEqual(len(moved), 1) 2175 2176 @mock_rename 2177 def test_move_dir_special_function(self): 2178 moved = [] 2179 def _copy(src, dst): 2180 moved.append((src, dst)) 2181 os_helper.create_empty_file(os.path.join(self.src_dir, 'child')) 2182 os_helper.create_empty_file(os.path.join(self.src_dir, 'child1')) 2183 shutil.move(self.src_dir, self.dst_dir, copy_function=_copy) 2184 self.assertEqual(len(moved), 3) 2185 2186 def test_move_dir_caseinsensitive(self): 2187 # Renames a folder to the same name 2188 # but a different case. 2189 2190 self.src_dir = self.mkdtemp() 2191 dst_dir = os.path.join( 2192 os.path.dirname(self.src_dir), 2193 os.path.basename(self.src_dir).upper()) 2194 self.assertNotEqual(self.src_dir, dst_dir) 2195 2196 try: 2197 shutil.move(self.src_dir, dst_dir) 2198 self.assertTrue(os.path.isdir(dst_dir)) 2199 finally: 2200 os.rmdir(dst_dir) 2201 2202 2203 @os_helper.skip_unless_dac_override 2204 @unittest.skipUnless(hasattr(os, 'lchflags') 2205 and hasattr(stat, 'SF_IMMUTABLE') 2206 and hasattr(stat, 'UF_OPAQUE'), 2207 'requires lchflags') 2208 def test_move_dir_permission_denied(self): 2209 # bpo-42782: shutil.move should not create destination directories 2210 # if the source directory cannot be removed. 2211 try: 2212 os.mkdir(TESTFN_SRC) 2213 os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE) 2214 2215 # Testing on an empty immutable directory 2216 # TESTFN_DST should not exist if shutil.move failed 2217 self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST) 2218 self.assertFalse(TESTFN_DST in os.listdir()) 2219 2220 # Create a file and keep the directory immutable 2221 os.lchflags(TESTFN_SRC, stat.UF_OPAQUE) 2222 os_helper.create_empty_file(os.path.join(TESTFN_SRC, 'child')) 2223 os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE) 2224 2225 # Testing on a non-empty immutable directory 2226 # TESTFN_DST should not exist if shutil.move failed 2227 self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST) 2228 self.assertFalse(TESTFN_DST in os.listdir()) 2229 finally: 2230 if os.path.exists(TESTFN_SRC): 2231 os.lchflags(TESTFN_SRC, stat.UF_OPAQUE) 2232 os_helper.rmtree(TESTFN_SRC) 2233 if os.path.exists(TESTFN_DST): 2234 os.lchflags(TESTFN_DST, stat.UF_OPAQUE) 2235 os_helper.rmtree(TESTFN_DST) 2236 2237 2238class TestCopyFile(unittest.TestCase): 2239 2240 class Faux(object): 2241 _entered = False 2242 _exited_with = None 2243 _raised = False 2244 def __init__(self, raise_in_exit=False, suppress_at_exit=True): 2245 self._raise_in_exit = raise_in_exit 2246 self._suppress_at_exit = suppress_at_exit 2247 def read(self, *args): 2248 return '' 2249 def __enter__(self): 2250 self._entered = True 2251 def __exit__(self, exc_type, exc_val, exc_tb): 2252 self._exited_with = exc_type, exc_val, exc_tb 2253 if self._raise_in_exit: 2254 self._raised = True 2255 raise OSError("Cannot close") 2256 return self._suppress_at_exit 2257 2258 def test_w_source_open_fails(self): 2259 def _open(filename, mode='r'): 2260 if filename == 'srcfile': 2261 raise OSError('Cannot open "srcfile"') 2262 assert 0 # shouldn't reach here. 2263 2264 with support.swap_attr(shutil, 'open', _open): 2265 with self.assertRaises(OSError): 2266 shutil.copyfile('srcfile', 'destfile') 2267 2268 @unittest.skipIf(MACOS, "skipped on macOS") 2269 def test_w_dest_open_fails(self): 2270 srcfile = self.Faux() 2271 2272 def _open(filename, mode='r'): 2273 if filename == 'srcfile': 2274 return srcfile 2275 if filename == 'destfile': 2276 raise OSError('Cannot open "destfile"') 2277 assert 0 # shouldn't reach here. 2278 2279 with support.swap_attr(shutil, 'open', _open): 2280 shutil.copyfile('srcfile', 'destfile') 2281 self.assertTrue(srcfile._entered) 2282 self.assertTrue(srcfile._exited_with[0] is OSError) 2283 self.assertEqual(srcfile._exited_with[1].args, 2284 ('Cannot open "destfile"',)) 2285 2286 @unittest.skipIf(MACOS, "skipped on macOS") 2287 def test_w_dest_close_fails(self): 2288 srcfile = self.Faux() 2289 destfile = self.Faux(True) 2290 2291 def _open(filename, mode='r'): 2292 if filename == 'srcfile': 2293 return srcfile 2294 if filename == 'destfile': 2295 return destfile 2296 assert 0 # shouldn't reach here. 2297 2298 with support.swap_attr(shutil, 'open', _open): 2299 shutil.copyfile('srcfile', 'destfile') 2300 self.assertTrue(srcfile._entered) 2301 self.assertTrue(destfile._entered) 2302 self.assertTrue(destfile._raised) 2303 self.assertTrue(srcfile._exited_with[0] is OSError) 2304 self.assertEqual(srcfile._exited_with[1].args, 2305 ('Cannot close',)) 2306 2307 @unittest.skipIf(MACOS, "skipped on macOS") 2308 def test_w_source_close_fails(self): 2309 2310 srcfile = self.Faux(True) 2311 destfile = self.Faux() 2312 2313 def _open(filename, mode='r'): 2314 if filename == 'srcfile': 2315 return srcfile 2316 if filename == 'destfile': 2317 return destfile 2318 assert 0 # shouldn't reach here. 2319 2320 with support.swap_attr(shutil, 'open', _open): 2321 with self.assertRaises(OSError): 2322 shutil.copyfile('srcfile', 'destfile') 2323 self.assertTrue(srcfile._entered) 2324 self.assertTrue(destfile._entered) 2325 self.assertFalse(destfile._raised) 2326 self.assertTrue(srcfile._exited_with[0] is None) 2327 self.assertTrue(srcfile._raised) 2328 2329 2330class TestCopyFileObj(unittest.TestCase): 2331 FILESIZE = 2 * 1024 * 1024 2332 2333 @classmethod 2334 def setUpClass(cls): 2335 write_test_file(TESTFN, cls.FILESIZE) 2336 2337 @classmethod 2338 def tearDownClass(cls): 2339 os_helper.unlink(TESTFN) 2340 os_helper.unlink(TESTFN2) 2341 2342 def tearDown(self): 2343 os_helper.unlink(TESTFN2) 2344 2345 @contextlib.contextmanager 2346 def get_files(self): 2347 with open(TESTFN, "rb") as src: 2348 with open(TESTFN2, "wb") as dst: 2349 yield (src, dst) 2350 2351 def assert_files_eq(self, src, dst): 2352 with open(src, 'rb') as fsrc: 2353 with open(dst, 'rb') as fdst: 2354 self.assertEqual(fsrc.read(), fdst.read()) 2355 2356 def test_content(self): 2357 with self.get_files() as (src, dst): 2358 shutil.copyfileobj(src, dst) 2359 self.assert_files_eq(TESTFN, TESTFN2) 2360 2361 def test_file_not_closed(self): 2362 with self.get_files() as (src, dst): 2363 shutil.copyfileobj(src, dst) 2364 assert not src.closed 2365 assert not dst.closed 2366 2367 def test_file_offset(self): 2368 with self.get_files() as (src, dst): 2369 shutil.copyfileobj(src, dst) 2370 self.assertEqual(src.tell(), self.FILESIZE) 2371 self.assertEqual(dst.tell(), self.FILESIZE) 2372 2373 @unittest.skipIf(os.name != 'nt', "Windows only") 2374 def test_win_impl(self): 2375 # Make sure alternate Windows implementation is called. 2376 with unittest.mock.patch("shutil._copyfileobj_readinto") as m: 2377 shutil.copyfile(TESTFN, TESTFN2) 2378 assert m.called 2379 2380 # File size is 2 MiB but max buf size should be 1 MiB. 2381 self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024) 2382 2383 # If file size < 1 MiB memoryview() length must be equal to 2384 # the actual file size. 2385 with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f: 2386 f.write(b'foo') 2387 fname = f.name 2388 self.addCleanup(os_helper.unlink, fname) 2389 with unittest.mock.patch("shutil._copyfileobj_readinto") as m: 2390 shutil.copyfile(fname, TESTFN2) 2391 self.assertEqual(m.call_args[0][2], 3) 2392 2393 # Empty files should not rely on readinto() variant. 2394 with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f: 2395 pass 2396 fname = f.name 2397 self.addCleanup(os_helper.unlink, fname) 2398 with unittest.mock.patch("shutil._copyfileobj_readinto") as m: 2399 shutil.copyfile(fname, TESTFN2) 2400 assert not m.called 2401 self.assert_files_eq(fname, TESTFN2) 2402 2403 2404class _ZeroCopyFileTest(object): 2405 """Tests common to all zero-copy APIs.""" 2406 FILESIZE = (10 * 1024 * 1024) # 10 MiB 2407 FILEDATA = b"" 2408 PATCHPOINT = "" 2409 2410 @classmethod 2411 def setUpClass(cls): 2412 write_test_file(TESTFN, cls.FILESIZE) 2413 with open(TESTFN, 'rb') as f: 2414 cls.FILEDATA = f.read() 2415 assert len(cls.FILEDATA) == cls.FILESIZE 2416 2417 @classmethod 2418 def tearDownClass(cls): 2419 os_helper.unlink(TESTFN) 2420 2421 def tearDown(self): 2422 os_helper.unlink(TESTFN2) 2423 2424 @contextlib.contextmanager 2425 def get_files(self): 2426 with open(TESTFN, "rb") as src: 2427 with open(TESTFN2, "wb") as dst: 2428 yield (src, dst) 2429 2430 def zerocopy_fun(self, *args, **kwargs): 2431 raise NotImplementedError("must be implemented in subclass") 2432 2433 def reset(self): 2434 self.tearDown() 2435 self.tearDownClass() 2436 self.setUpClass() 2437 self.setUp() 2438 2439 # --- 2440 2441 def test_regular_copy(self): 2442 with self.get_files() as (src, dst): 2443 self.zerocopy_fun(src, dst) 2444 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2445 # Make sure the fallback function is not called. 2446 with self.get_files() as (src, dst): 2447 with unittest.mock.patch('shutil.copyfileobj') as m: 2448 shutil.copyfile(TESTFN, TESTFN2) 2449 assert not m.called 2450 2451 def test_same_file(self): 2452 self.addCleanup(self.reset) 2453 with self.get_files() as (src, dst): 2454 with self.assertRaises(Exception): 2455 self.zerocopy_fun(src, src) 2456 # Make sure src file is not corrupted. 2457 self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA) 2458 2459 def test_non_existent_src(self): 2460 name = tempfile.mktemp(dir=os.getcwd()) 2461 with self.assertRaises(FileNotFoundError) as cm: 2462 shutil.copyfile(name, "new") 2463 self.assertEqual(cm.exception.filename, name) 2464 2465 def test_empty_file(self): 2466 srcname = TESTFN + 'src' 2467 dstname = TESTFN + 'dst' 2468 self.addCleanup(lambda: os_helper.unlink(srcname)) 2469 self.addCleanup(lambda: os_helper.unlink(dstname)) 2470 with open(srcname, "wb"): 2471 pass 2472 2473 with open(srcname, "rb") as src: 2474 with open(dstname, "wb") as dst: 2475 self.zerocopy_fun(src, dst) 2476 2477 self.assertEqual(read_file(dstname, binary=True), b"") 2478 2479 def test_unhandled_exception(self): 2480 with unittest.mock.patch(self.PATCHPOINT, 2481 side_effect=ZeroDivisionError): 2482 self.assertRaises(ZeroDivisionError, 2483 shutil.copyfile, TESTFN, TESTFN2) 2484 2485 def test_exception_on_first_call(self): 2486 # Emulate a case where the first call to the zero-copy 2487 # function raises an exception in which case the function is 2488 # supposed to give up immediately. 2489 with unittest.mock.patch(self.PATCHPOINT, 2490 side_effect=OSError(errno.EINVAL, "yo")): 2491 with self.get_files() as (src, dst): 2492 with self.assertRaises(_GiveupOnFastCopy): 2493 self.zerocopy_fun(src, dst) 2494 2495 def test_filesystem_full(self): 2496 # Emulate a case where filesystem is full and sendfile() fails 2497 # on first call. 2498 with unittest.mock.patch(self.PATCHPOINT, 2499 side_effect=OSError(errno.ENOSPC, "yo")): 2500 with self.get_files() as (src, dst): 2501 self.assertRaises(OSError, self.zerocopy_fun, src, dst) 2502 2503 2504@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported') 2505class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase): 2506 PATCHPOINT = "os.sendfile" 2507 2508 def zerocopy_fun(self, fsrc, fdst): 2509 return shutil._fastcopy_sendfile(fsrc, fdst) 2510 2511 def test_non_regular_file_src(self): 2512 with io.BytesIO(self.FILEDATA) as src: 2513 with open(TESTFN2, "wb") as dst: 2514 with self.assertRaises(_GiveupOnFastCopy): 2515 self.zerocopy_fun(src, dst) 2516 shutil.copyfileobj(src, dst) 2517 2518 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2519 2520 def test_non_regular_file_dst(self): 2521 with open(TESTFN, "rb") as src: 2522 with io.BytesIO() as dst: 2523 with self.assertRaises(_GiveupOnFastCopy): 2524 self.zerocopy_fun(src, dst) 2525 shutil.copyfileobj(src, dst) 2526 dst.seek(0) 2527 self.assertEqual(dst.read(), self.FILEDATA) 2528 2529 def test_exception_on_second_call(self): 2530 def sendfile(*args, **kwargs): 2531 if not flag: 2532 flag.append(None) 2533 return orig_sendfile(*args, **kwargs) 2534 else: 2535 raise OSError(errno.EBADF, "yo") 2536 2537 flag = [] 2538 orig_sendfile = os.sendfile 2539 with unittest.mock.patch('os.sendfile', create=True, 2540 side_effect=sendfile): 2541 with self.get_files() as (src, dst): 2542 with self.assertRaises(OSError) as cm: 2543 shutil._fastcopy_sendfile(src, dst) 2544 assert flag 2545 self.assertEqual(cm.exception.errno, errno.EBADF) 2546 2547 def test_cant_get_size(self): 2548 # Emulate a case where src file size cannot be determined. 2549 # Internally bufsize will be set to a small value and 2550 # sendfile() will be called repeatedly. 2551 with unittest.mock.patch('os.fstat', side_effect=OSError) as m: 2552 with self.get_files() as (src, dst): 2553 shutil._fastcopy_sendfile(src, dst) 2554 assert m.called 2555 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2556 2557 def test_small_chunks(self): 2558 # Force internal file size detection to be smaller than the 2559 # actual file size. We want to force sendfile() to be called 2560 # multiple times, also in order to emulate a src fd which gets 2561 # bigger while it is being copied. 2562 mock = unittest.mock.Mock() 2563 mock.st_size = 65536 + 1 2564 with unittest.mock.patch('os.fstat', return_value=mock) as m: 2565 with self.get_files() as (src, dst): 2566 shutil._fastcopy_sendfile(src, dst) 2567 assert m.called 2568 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2569 2570 def test_big_chunk(self): 2571 # Force internal file size detection to be +100MB bigger than 2572 # the actual file size. Make sure sendfile() does not rely on 2573 # file size value except for (maybe) a better throughput / 2574 # performance. 2575 mock = unittest.mock.Mock() 2576 mock.st_size = self.FILESIZE + (100 * 1024 * 1024) 2577 with unittest.mock.patch('os.fstat', return_value=mock) as m: 2578 with self.get_files() as (src, dst): 2579 shutil._fastcopy_sendfile(src, dst) 2580 assert m.called 2581 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2582 2583 def test_blocksize_arg(self): 2584 with unittest.mock.patch('os.sendfile', 2585 side_effect=ZeroDivisionError) as m: 2586 self.assertRaises(ZeroDivisionError, 2587 shutil.copyfile, TESTFN, TESTFN2) 2588 blocksize = m.call_args[0][3] 2589 # Make sure file size and the block size arg passed to 2590 # sendfile() are the same. 2591 self.assertEqual(blocksize, os.path.getsize(TESTFN)) 2592 # ...unless we're dealing with a small file. 2593 os_helper.unlink(TESTFN2) 2594 write_file(TESTFN2, b"hello", binary=True) 2595 self.addCleanup(os_helper.unlink, TESTFN2 + '3') 2596 self.assertRaises(ZeroDivisionError, 2597 shutil.copyfile, TESTFN2, TESTFN2 + '3') 2598 blocksize = m.call_args[0][3] 2599 self.assertEqual(blocksize, 2 ** 23) 2600 2601 def test_file2file_not_supported(self): 2602 # Emulate a case where sendfile() only support file->socket 2603 # fds. In such a case copyfile() is supposed to skip the 2604 # fast-copy attempt from then on. 2605 assert shutil._USE_CP_SENDFILE 2606 try: 2607 with unittest.mock.patch( 2608 self.PATCHPOINT, 2609 side_effect=OSError(errno.ENOTSOCK, "yo")) as m: 2610 with self.get_files() as (src, dst): 2611 with self.assertRaises(_GiveupOnFastCopy): 2612 shutil._fastcopy_sendfile(src, dst) 2613 assert m.called 2614 assert not shutil._USE_CP_SENDFILE 2615 2616 with unittest.mock.patch(self.PATCHPOINT) as m: 2617 shutil.copyfile(TESTFN, TESTFN2) 2618 assert not m.called 2619 finally: 2620 shutil._USE_CP_SENDFILE = True 2621 2622 2623@unittest.skipIf(not MACOS, 'macOS only') 2624class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase): 2625 PATCHPOINT = "posix._fcopyfile" 2626 2627 def zerocopy_fun(self, src, dst): 2628 return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA) 2629 2630 2631class TestGetTerminalSize(unittest.TestCase): 2632 def test_does_not_crash(self): 2633 """Check if get_terminal_size() returns a meaningful value. 2634 2635 There's no easy portable way to actually check the size of the 2636 terminal, so let's check if it returns something sensible instead. 2637 """ 2638 size = shutil.get_terminal_size() 2639 self.assertGreaterEqual(size.columns, 0) 2640 self.assertGreaterEqual(size.lines, 0) 2641 2642 def test_os_environ_first(self): 2643 "Check if environment variables have precedence" 2644 2645 with os_helper.EnvironmentVarGuard() as env: 2646 env['COLUMNS'] = '777' 2647 del env['LINES'] 2648 size = shutil.get_terminal_size() 2649 self.assertEqual(size.columns, 777) 2650 2651 with os_helper.EnvironmentVarGuard() as env: 2652 del env['COLUMNS'] 2653 env['LINES'] = '888' 2654 size = shutil.get_terminal_size() 2655 self.assertEqual(size.lines, 888) 2656 2657 def test_bad_environ(self): 2658 with os_helper.EnvironmentVarGuard() as env: 2659 env['COLUMNS'] = 'xxx' 2660 env['LINES'] = 'yyy' 2661 size = shutil.get_terminal_size() 2662 self.assertGreaterEqual(size.columns, 0) 2663 self.assertGreaterEqual(size.lines, 0) 2664 2665 @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty") 2666 @unittest.skipUnless(hasattr(os, 'get_terminal_size'), 2667 'need os.get_terminal_size()') 2668 def test_stty_match(self): 2669 """Check if stty returns the same results ignoring env 2670 2671 This test will fail if stdin and stdout are connected to 2672 different terminals with different sizes. Nevertheless, such 2673 situations should be pretty rare. 2674 """ 2675 try: 2676 size = subprocess.check_output(['stty', 'size']).decode().split() 2677 except (FileNotFoundError, PermissionError, 2678 subprocess.CalledProcessError): 2679 self.skipTest("stty invocation failed") 2680 expected = (int(size[1]), int(size[0])) # reversed order 2681 2682 with os_helper.EnvironmentVarGuard() as env: 2683 del env['LINES'] 2684 del env['COLUMNS'] 2685 actual = shutil.get_terminal_size() 2686 2687 self.assertEqual(expected, actual) 2688 2689 @unittest.skipIf(support.is_wasi, "WASI has no /dev/null") 2690 def test_fallback(self): 2691 with os_helper.EnvironmentVarGuard() as env: 2692 del env['LINES'] 2693 del env['COLUMNS'] 2694 2695 # sys.__stdout__ has no fileno() 2696 with support.swap_attr(sys, '__stdout__', None): 2697 size = shutil.get_terminal_size(fallback=(10, 20)) 2698 self.assertEqual(size.columns, 10) 2699 self.assertEqual(size.lines, 20) 2700 2701 # sys.__stdout__ is not a terminal on Unix 2702 # or fileno() not in (0, 1, 2) on Windows 2703 with open(os.devnull, 'w', encoding='utf-8') as f, \ 2704 support.swap_attr(sys, '__stdout__', f): 2705 size = shutil.get_terminal_size(fallback=(30, 40)) 2706 self.assertEqual(size.columns, 30) 2707 self.assertEqual(size.lines, 40) 2708 2709 2710class PublicAPITests(unittest.TestCase): 2711 """Ensures that the correct values are exposed in the public API.""" 2712 2713 def test_module_all_attribute(self): 2714 self.assertTrue(hasattr(shutil, '__all__')) 2715 target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat', 2716 'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error', 2717 'SpecialFileError', 'ExecError', 'make_archive', 2718 'get_archive_formats', 'register_archive_format', 2719 'unregister_archive_format', 'get_unpack_formats', 2720 'register_unpack_format', 'unregister_unpack_format', 2721 'unpack_archive', 'ignore_patterns', 'chown', 'which', 2722 'get_terminal_size', 'SameFileError'] 2723 if hasattr(os, 'statvfs') or os.name == 'nt': 2724 target_api.append('disk_usage') 2725 self.assertEqual(set(shutil.__all__), set(target_api)) 2726 2727 2728if __name__ == '__main__': 2729 unittest.main() 2730