1import sys 2import os 3import io 4from hashlib import sha256 5from contextlib import contextmanager 6from random import Random 7import pathlib 8import shutil 9import re 10import warnings 11import stat 12 13import unittest 14import unittest.mock 15import tarfile 16 17from test import support 18from test.support import os_helper 19from test.support import script_helper 20from test.support import warnings_helper 21 22# Check for our compression modules. 23try: 24 import gzip 25except ImportError: 26 gzip = None 27try: 28 import zlib 29except ImportError: 30 zlib = None 31try: 32 import bz2 33except ImportError: 34 bz2 = None 35try: 36 import lzma 37except ImportError: 38 lzma = None 39 40def sha256sum(data): 41 return sha256(data).hexdigest() 42 43TEMPDIR = os.path.abspath(os_helper.TESTFN) + "-tardir" 44tarextdir = TEMPDIR + '-extract-test' 45tarname = support.findfile("testtar.tar") 46gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") 47bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") 48xzname = os.path.join(TEMPDIR, "testtar.tar.xz") 49tmpname = os.path.join(TEMPDIR, "tmp.tar") 50dotlessname = os.path.join(TEMPDIR, "testtar") 51 52sha256_regtype = ( 53 "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce" 54) 55sha256_sparse = ( 56 "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b" 57) 58 59 60class TarTest: 61 tarname = tarname 62 suffix = '' 63 open = io.FileIO 64 taropen = tarfile.TarFile.taropen 65 66 @property 67 def mode(self): 68 return self.prefix + self.suffix 69 70@support.requires_gzip() 71class GzipTest: 72 tarname = gzipname 73 suffix = 'gz' 74 open = gzip.GzipFile if gzip else None 75 taropen = tarfile.TarFile.gzopen 76 77@support.requires_bz2() 78class Bz2Test: 79 tarname = bz2name 80 suffix = 'bz2' 81 open = bz2.BZ2File if bz2 else None 82 taropen = tarfile.TarFile.bz2open 83 84@support.requires_lzma() 85class LzmaTest: 86 tarname = xzname 87 suffix = 'xz' 88 open = lzma.LZMAFile if lzma else None 89 taropen = 
tarfile.TarFile.xzopen 90 91 92class ReadTest(TarTest): 93 94 prefix = "r:" 95 96 def setUp(self): 97 self.tar = tarfile.open(self.tarname, mode=self.mode, 98 encoding="iso8859-1") 99 100 def tearDown(self): 101 self.tar.close() 102 103 104class UstarReadTest(ReadTest, unittest.TestCase): 105 106 def test_fileobj_regular_file(self): 107 tarinfo = self.tar.getmember("ustar/regtype") 108 with self.tar.extractfile(tarinfo) as fobj: 109 data = fobj.read() 110 self.assertEqual(len(data), tarinfo.size, 111 "regular file extraction failed") 112 self.assertEqual(sha256sum(data), sha256_regtype, 113 "regular file extraction failed") 114 115 def test_fileobj_readlines(self): 116 self.tar.extract("ustar/regtype", TEMPDIR, filter='data') 117 tarinfo = self.tar.getmember("ustar/regtype") 118 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 119 lines1 = fobj1.readlines() 120 121 with self.tar.extractfile(tarinfo) as fobj: 122 fobj2 = io.TextIOWrapper(fobj) 123 lines2 = fobj2.readlines() 124 self.assertEqual(lines1, lines2, 125 "fileobj.readlines() failed") 126 self.assertEqual(len(lines2), 114, 127 "fileobj.readlines() failed") 128 self.assertEqual(lines2[83], 129 "I will gladly admit that Python is not the fastest " 130 "running scripting language.\n", 131 "fileobj.readlines() failed") 132 133 def test_fileobj_iter(self): 134 self.tar.extract("ustar/regtype", TEMPDIR, filter='data') 135 tarinfo = self.tar.getmember("ustar/regtype") 136 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 137 lines1 = fobj1.readlines() 138 with self.tar.extractfile(tarinfo) as fobj2: 139 lines2 = list(io.TextIOWrapper(fobj2)) 140 self.assertEqual(lines1, lines2, 141 "fileobj.__iter__() failed") 142 143 def test_fileobj_seek(self): 144 self.tar.extract("ustar/regtype", TEMPDIR, 145 filter='data') 146 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj: 147 data = fobj.read() 148 149 tarinfo = self.tar.getmember("ustar/regtype") 150 with 
self.tar.extractfile(tarinfo) as fobj: 151 text = fobj.read() 152 fobj.seek(0) 153 self.assertEqual(0, fobj.tell(), 154 "seek() to file's start failed") 155 fobj.seek(2048, 0) 156 self.assertEqual(2048, fobj.tell(), 157 "seek() to absolute position failed") 158 fobj.seek(-1024, 1) 159 self.assertEqual(1024, fobj.tell(), 160 "seek() to negative relative position failed") 161 fobj.seek(1024, 1) 162 self.assertEqual(2048, fobj.tell(), 163 "seek() to positive relative position failed") 164 s = fobj.read(10) 165 self.assertEqual(s, data[2048:2058], 166 "read() after seek failed") 167 fobj.seek(0, 2) 168 self.assertEqual(tarinfo.size, fobj.tell(), 169 "seek() to file's end failed") 170 self.assertEqual(fobj.read(), b"", 171 "read() at file's end did not return empty string") 172 fobj.seek(-tarinfo.size, 2) 173 self.assertEqual(0, fobj.tell(), 174 "relative seek() to file's end failed") 175 fobj.seek(512) 176 s1 = fobj.readlines() 177 fobj.seek(512) 178 s2 = fobj.readlines() 179 self.assertEqual(s1, s2, 180 "readlines() after seek failed") 181 fobj.seek(0) 182 self.assertEqual(len(fobj.readline()), fobj.tell(), 183 "tell() after readline() failed") 184 fobj.seek(512) 185 self.assertEqual(len(fobj.readline()) + 512, fobj.tell(), 186 "tell() after seek() and readline() failed") 187 fobj.seek(0) 188 line = fobj.readline() 189 self.assertEqual(fobj.read(), data[len(line):], 190 "read() after readline() failed") 191 192 def test_fileobj_text(self): 193 with self.tar.extractfile("ustar/regtype") as fobj: 194 fobj = io.TextIOWrapper(fobj) 195 data = fobj.read().encode("iso8859-1") 196 self.assertEqual(sha256sum(data), sha256_regtype) 197 try: 198 fobj.seek(100) 199 except AttributeError: 200 # Issue #13815: seek() complained about a missing 201 # flush() method. 202 self.fail("seeking failed in text mode") 203 204 # Test if symbolic and hard links are resolved by extractfile(). 
The 205 # test link members each point to a regular member whose data is 206 # supposed to be exported. 207 def _test_fileobj_link(self, lnktype, regtype): 208 with self.tar.extractfile(lnktype) as a, \ 209 self.tar.extractfile(regtype) as b: 210 self.assertEqual(a.name, b.name) 211 212 def test_fileobj_link1(self): 213 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 214 215 def test_fileobj_link2(self): 216 self._test_fileobj_link("./ustar/linktest2/lnktype", 217 "ustar/linktest1/regtype") 218 219 def test_fileobj_symlink1(self): 220 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 221 222 def test_fileobj_symlink2(self): 223 self._test_fileobj_link("./ustar/linktest2/symtype", 224 "ustar/linktest1/regtype") 225 226 def test_issue14160(self): 227 self._test_fileobj_link("symtype2", "ustar/regtype") 228 229 def test_add_dir_getmember(self): 230 # bpo-21987 231 self.add_dir_and_getmember('bar') 232 self.add_dir_and_getmember('a'*101) 233 234 @unittest.skipUnless(hasattr(os, "getuid") and hasattr(os, "getgid"), 235 "Missing getuid or getgid implementation") 236 def add_dir_and_getmember(self, name): 237 def filter(tarinfo): 238 tarinfo.uid = tarinfo.gid = 100 239 return tarinfo 240 241 with os_helper.temp_cwd(): 242 with tarfile.open(tmpname, 'w') as tar: 243 tar.format = tarfile.USTAR_FORMAT 244 try: 245 os.mkdir(name) 246 tar.add(name, filter=filter) 247 finally: 248 os.rmdir(name) 249 with tarfile.open(tmpname) as tar: 250 self.assertEqual( 251 tar.getmember(name), 252 tar.getmember(name + '/') 253 ) 254 255class GzipUstarReadTest(GzipTest, UstarReadTest): 256 pass 257 258class Bz2UstarReadTest(Bz2Test, UstarReadTest): 259 pass 260 261class LzmaUstarReadTest(LzmaTest, UstarReadTest): 262 pass 263 264 265class ListTest(ReadTest, unittest.TestCase): 266 267 # Override setUp to use default encoding (UTF-8) 268 def setUp(self): 269 self.tar = tarfile.open(self.tarname, mode=self.mode) 270 271 def test_list(self): 272 tio = 
io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 273 with support.swap_attr(sys, 'stdout', tio): 274 self.tar.list(verbose=False) 275 out = tio.detach().getvalue() 276 self.assertIn(b'ustar/conttype', out) 277 self.assertIn(b'ustar/regtype', out) 278 self.assertIn(b'ustar/lnktype', out) 279 self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out) 280 self.assertIn(b'./ustar/linktest2/symtype', out) 281 self.assertIn(b'./ustar/linktest2/lnktype', out) 282 # Make sure it puts trailing slash for directory 283 self.assertIn(b'ustar/dirtype/', out) 284 self.assertIn(b'ustar/dirtype-with-size/', out) 285 # Make sure it is able to print unencodable characters 286 def conv(b): 287 s = b.decode(self.tar.encoding, 'surrogateescape') 288 return s.encode('ascii', 'backslashreplace') 289 self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 290 self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-' 291 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 292 self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-' 293 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 294 self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out) 295 self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out) 296 # Make sure it prints files separated by one newline without any 297 # 'ls -l'-like accessories if verbose flag is not being used 298 # ... 299 # ustar/conttype 300 # ustar/regtype 301 # ... 302 self.assertRegex(out, br'ustar/conttype ?\r?\n' 303 br'ustar/regtype ?\r?\n') 304 # Make sure it does not print the source of link without verbose flag 305 self.assertNotIn(b'link to', out) 306 self.assertNotIn(b'->', out) 307 308 def test_list_verbose(self): 309 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 310 with support.swap_attr(sys, 'stdout', tio): 311 self.tar.list(verbose=True) 312 out = tio.detach().getvalue() 313 # Make sure it prints files separated by one newline with 'ls -l'-like 314 # accessories if verbose flag is being used 315 # ... 
316 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/conttype 317 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/regtype 318 # ... 319 self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 ' 320 br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d ' 321 br'ustar/\w+type ?\r?\n') * 2) 322 # Make sure it prints the source of link with verbose flag 323 self.assertIn(b'ustar/symtype -> regtype', out) 324 self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out) 325 self.assertIn(b'./ustar/linktest2/lnktype link to ' 326 b'./ustar/linktest1/regtype', out) 327 self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' + 328 (b'/123' * 125) + b'/longname', out) 329 self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' + 330 (b'/123' * 125) + b'/longname', out) 331 332 def test_list_members(self): 333 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 334 def members(tar): 335 for tarinfo in tar.getmembers(): 336 if 'reg' in tarinfo.name: 337 yield tarinfo 338 with support.swap_attr(sys, 'stdout', tio): 339 self.tar.list(verbose=False, members=members(self.tar)) 340 out = tio.detach().getvalue() 341 self.assertIn(b'ustar/regtype', out) 342 self.assertNotIn(b'ustar/conttype', out) 343 344 345class GzipListTest(GzipTest, ListTest): 346 pass 347 348 349class Bz2ListTest(Bz2Test, ListTest): 350 pass 351 352 353class LzmaListTest(LzmaTest, ListTest): 354 pass 355 356 357class CommonReadTest(ReadTest): 358 359 def test_is_tarfile_erroneous(self): 360 with open(tmpname, "wb"): 361 pass 362 363 # is_tarfile works on filenames 364 self.assertFalse(tarfile.is_tarfile(tmpname)) 365 366 # is_tarfile works on path-like objects 367 self.assertFalse(tarfile.is_tarfile(pathlib.Path(tmpname))) 368 369 # is_tarfile works on file objects 370 with open(tmpname, "rb") as fobj: 371 self.assertFalse(tarfile.is_tarfile(fobj)) 372 373 # is_tarfile works on file-like objects 374 self.assertFalse(tarfile.is_tarfile(io.BytesIO(b"invalid"))) 375 
376 def test_is_tarfile_valid(self): 377 # is_tarfile works on filenames 378 self.assertTrue(tarfile.is_tarfile(self.tarname)) 379 380 # is_tarfile works on path-like objects 381 self.assertTrue(tarfile.is_tarfile(pathlib.Path(self.tarname))) 382 383 # is_tarfile works on file objects 384 with open(self.tarname, "rb") as fobj: 385 self.assertTrue(tarfile.is_tarfile(fobj)) 386 387 # is_tarfile works on file-like objects 388 with open(self.tarname, "rb") as fobj: 389 self.assertTrue(tarfile.is_tarfile(io.BytesIO(fobj.read()))) 390 391 def test_is_tarfile_keeps_position(self): 392 # Test for issue44289: tarfile.is_tarfile() modifies 393 # file object's current position 394 with open(self.tarname, "rb") as fobj: 395 tarfile.is_tarfile(fobj) 396 self.assertEqual(fobj.tell(), 0) 397 398 with open(self.tarname, "rb") as fobj: 399 file_like = io.BytesIO(fobj.read()) 400 tarfile.is_tarfile(file_like) 401 self.assertEqual(file_like.tell(), 0) 402 403 def test_empty_tarfile(self): 404 # Test for issue6123: Allow opening empty archives. 405 # This test checks if tarfile.open() is able to open an empty tar 406 # archive successfully. Note that an empty tar archive is not the 407 # same as an empty file! 408 with tarfile.open(tmpname, self.mode.replace("r", "w")): 409 pass 410 try: 411 tar = tarfile.open(tmpname, self.mode) 412 tar.getnames() 413 except tarfile.ReadError: 414 self.fail("tarfile.open() failed on empty archive") 415 else: 416 self.assertListEqual(tar.getmembers(), []) 417 finally: 418 tar.close() 419 420 def test_non_existent_tarfile(self): 421 # Test for issue11513: prevent non-existent gzipped tarfiles raising 422 # multiple exceptions. 423 with self.assertRaisesRegex(FileNotFoundError, "xxx"): 424 tarfile.open("xxx", self.mode) 425 426 def test_null_tarfile(self): 427 # Test for issue6123: Allow opening empty archives. 428 # This test guarantees that tarfile.open() does not treat an empty 429 # file as an empty tar archive. 
430 with open(tmpname, "wb"): 431 pass 432 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 433 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 434 435 def test_ignore_zeros(self): 436 # Test TarFile's ignore_zeros option. 437 # generate 512 pseudorandom bytes 438 data = Random(0).randbytes(512) 439 for char in (b'\0', b'a'): 440 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 441 # are ignored correctly. 442 with self.open(tmpname, "w") as fobj: 443 fobj.write(char * 1024) 444 tarinfo = tarfile.TarInfo("foo") 445 tarinfo.size = len(data) 446 fobj.write(tarinfo.tobuf()) 447 fobj.write(data) 448 449 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 450 try: 451 self.assertListEqual(tar.getnames(), ["foo"], 452 "ignore_zeros=True should have skipped the %r-blocks" % 453 char) 454 finally: 455 tar.close() 456 457 def test_premature_end_of_archive(self): 458 for size in (512, 600, 1024, 1200): 459 with tarfile.open(tmpname, "w:") as tar: 460 t = tarfile.TarInfo("foo") 461 t.size = 1024 462 tar.addfile(t, io.BytesIO(b"a" * 1024)) 463 464 with open(tmpname, "r+b") as fobj: 465 fobj.truncate(size) 466 467 with tarfile.open(tmpname) as tar: 468 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 469 for t in tar: 470 pass 471 472 with tarfile.open(tmpname) as tar: 473 t = tar.next() 474 475 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 476 tar.extract(t, TEMPDIR, filter='data') 477 478 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 479 tar.extractfile(t).read() 480 481 def test_length_zero_header(self): 482 # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail 483 # with an exception 484 with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"): 485 with tarfile.open(support.findfile('recursion.tar')) as tar: 486 pass 487 488class MiscReadTestBase(CommonReadTest): 489 def 
requires_name_attribute(self): 490 pass 491 492 def test_no_name_argument(self): 493 self.requires_name_attribute() 494 with open(self.tarname, "rb") as fobj: 495 self.assertIsInstance(fobj.name, str) 496 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 497 self.assertIsInstance(tar.name, str) 498 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 499 500 def test_no_name_attribute(self): 501 with open(self.tarname, "rb") as fobj: 502 data = fobj.read() 503 fobj = io.BytesIO(data) 504 self.assertRaises(AttributeError, getattr, fobj, "name") 505 tar = tarfile.open(fileobj=fobj, mode=self.mode) 506 self.assertIsNone(tar.name) 507 508 def test_empty_name_attribute(self): 509 with open(self.tarname, "rb") as fobj: 510 data = fobj.read() 511 fobj = io.BytesIO(data) 512 fobj.name = "" 513 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 514 self.assertIsNone(tar.name) 515 516 def test_int_name_attribute(self): 517 # Issue 21044: tarfile.open() should handle fileobj with an integer 518 # 'name' attribute. 
519 fd = os.open(self.tarname, os.O_RDONLY) 520 with open(fd, 'rb') as fobj: 521 self.assertIsInstance(fobj.name, int) 522 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 523 self.assertIsNone(tar.name) 524 525 def test_bytes_name_attribute(self): 526 self.requires_name_attribute() 527 tarname = os.fsencode(self.tarname) 528 with open(tarname, 'rb') as fobj: 529 self.assertIsInstance(fobj.name, bytes) 530 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 531 self.assertIsInstance(tar.name, bytes) 532 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 533 534 def test_pathlike_name(self): 535 tarname = pathlib.Path(self.tarname) 536 with tarfile.open(tarname, mode=self.mode) as tar: 537 self.assertIsInstance(tar.name, str) 538 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 539 with self.taropen(tarname) as tar: 540 self.assertIsInstance(tar.name, str) 541 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 542 with tarfile.TarFile.open(tarname, mode=self.mode) as tar: 543 self.assertIsInstance(tar.name, str) 544 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 545 if self.suffix == '': 546 with tarfile.TarFile(tarname, mode='r') as tar: 547 self.assertIsInstance(tar.name, str) 548 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 549 550 def test_illegal_mode_arg(self): 551 with open(tmpname, 'wb'): 552 pass 553 with self.assertRaisesRegex(ValueError, 'mode must be '): 554 tar = self.taropen(tmpname, 'q') 555 with self.assertRaisesRegex(ValueError, 'mode must be '): 556 tar = self.taropen(tmpname, 'rw') 557 with self.assertRaisesRegex(ValueError, 'mode must be '): 558 tar = self.taropen(tmpname, '') 559 560 def test_fileobj_with_offset(self): 561 # Skip the first member and store values from the second member 562 # of the testtar. 
563 tar = tarfile.open(self.tarname, mode=self.mode) 564 try: 565 tar.next() 566 t = tar.next() 567 name = t.name 568 offset = t.offset 569 with tar.extractfile(t) as f: 570 data = f.read() 571 finally: 572 tar.close() 573 574 # Open the testtar and seek to the offset of the second member. 575 with self.open(self.tarname) as fobj: 576 fobj.seek(offset) 577 578 # Test if the tarfile starts with the second member. 579 with tar.open(self.tarname, mode="r:", fileobj=fobj) as tar: 580 t = tar.next() 581 self.assertEqual(t.name, name) 582 # Read to the end of fileobj and test if seeking back to the 583 # beginning works. 584 tar.getmembers() 585 self.assertEqual(tar.extractfile(t).read(), data, 586 "seek back did not work") 587 588 def test_fail_comp(self): 589 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 590 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 591 with open(tarname, "rb") as fobj: 592 self.assertRaises(tarfile.ReadError, tarfile.open, 593 fileobj=fobj, mode=self.mode) 594 595 def test_v7_dirtype(self): 596 # Test old style dirtype member (bug #1336623): 597 # Old V7 tars create directory members using an AREGTYPE 598 # header with a "/" appended to the filename field. 599 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 600 self.assertEqual(tarinfo.type, tarfile.DIRTYPE, 601 "v7 dirtype failed") 602 603 def test_xstar_type(self): 604 # The xstar format stores extra atime and ctime fields inside the 605 # space reserved for the prefix field. The prefix field must be 606 # ignored in this case, otherwise it will mess up the name. 
607 try: 608 self.tar.getmember("misc/regtype-xstar") 609 except KeyError: 610 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 611 612 def test_check_members(self): 613 for tarinfo in self.tar: 614 self.assertEqual(int(tarinfo.mtime), 0o7606136617, 615 "wrong mtime for %s" % tarinfo.name) 616 if not tarinfo.name.startswith("ustar/"): 617 continue 618 self.assertEqual(tarinfo.uname, "tarfile", 619 "wrong uname for %s" % tarinfo.name) 620 621 def test_find_members(self): 622 self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof", 623 "could not find all members") 624 625 @unittest.skipUnless(hasattr(os, "link"), 626 "Missing hardlink implementation") 627 @os_helper.skip_unless_symlink 628 def test_extract_hardlink(self): 629 # Test hardlink extraction (e.g. bug #857297). 630 with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: 631 tar.extract("ustar/regtype", TEMPDIR, filter='data') 632 self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/regtype")) 633 634 tar.extract("ustar/lnktype", TEMPDIR, filter='data') 635 self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/lnktype")) 636 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 637 data = f.read() 638 self.assertEqual(sha256sum(data), sha256_regtype) 639 640 tar.extract("ustar/symtype", TEMPDIR, filter='data') 641 self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/symtype")) 642 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 643 data = f.read() 644 self.assertEqual(sha256sum(data), sha256_regtype) 645 646 @os_helper.skip_unless_working_chmod 647 def test_extractall(self): 648 # Test if extractall() correctly restores directory permissions 649 # and times (see issue1735). 
650 tar = tarfile.open(tarname, encoding="iso8859-1") 651 DIR = os.path.join(TEMPDIR, "extractall") 652 os.mkdir(DIR) 653 try: 654 directories = [t for t in tar if t.isdir()] 655 tar.extractall(DIR, directories, filter='fully_trusted') 656 for tarinfo in directories: 657 path = os.path.join(DIR, tarinfo.name) 658 if sys.platform != "win32": 659 # Win32 has no support for fine grained permissions. 660 self.assertEqual(tarinfo.mode & 0o777, 661 os.stat(path).st_mode & 0o777, 662 tarinfo.name) 663 def format_mtime(mtime): 664 if isinstance(mtime, float): 665 return "{} ({})".format(mtime, mtime.hex()) 666 else: 667 return "{!r} (int)".format(mtime) 668 file_mtime = os.path.getmtime(path) 669 errmsg = "tar mtime {0} != file time {1} of path {2!a}".format( 670 format_mtime(tarinfo.mtime), 671 format_mtime(file_mtime), 672 path) 673 self.assertEqual(tarinfo.mtime, file_mtime, errmsg) 674 finally: 675 tar.close() 676 os_helper.rmtree(DIR) 677 678 @os_helper.skip_unless_working_chmod 679 def test_extract_directory(self): 680 dirtype = "ustar/dirtype" 681 DIR = os.path.join(TEMPDIR, "extractdir") 682 os.mkdir(DIR) 683 try: 684 with tarfile.open(tarname, encoding="iso8859-1") as tar: 685 tarinfo = tar.getmember(dirtype) 686 tar.extract(tarinfo, path=DIR, filter='fully_trusted') 687 extracted = os.path.join(DIR, dirtype) 688 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 689 if sys.platform != "win32": 690 self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) 691 finally: 692 os_helper.rmtree(DIR) 693 694 def test_extractall_pathlike_name(self): 695 DIR = pathlib.Path(TEMPDIR) / "extractall" 696 with os_helper.temp_dir(DIR), \ 697 tarfile.open(tarname, encoding="iso8859-1") as tar: 698 directories = [t for t in tar if t.isdir()] 699 tar.extractall(DIR, directories, filter='fully_trusted') 700 for tarinfo in directories: 701 path = DIR / tarinfo.name 702 self.assertEqual(os.path.getmtime(path), tarinfo.mtime) 703 704 def test_extract_pathlike_name(self): 
705 dirtype = "ustar/dirtype" 706 DIR = pathlib.Path(TEMPDIR) / "extractall" 707 with os_helper.temp_dir(DIR), \ 708 tarfile.open(tarname, encoding="iso8859-1") as tar: 709 tarinfo = tar.getmember(dirtype) 710 tar.extract(tarinfo, path=DIR, filter='fully_trusted') 711 extracted = DIR / dirtype 712 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 713 714 def test_init_close_fobj(self): 715 # Issue #7341: Close the internal file object in the TarFile 716 # constructor in case of an error. For the test we rely on 717 # the fact that opening an empty file raises a ReadError. 718 empty = os.path.join(TEMPDIR, "empty") 719 with open(empty, "wb") as fobj: 720 fobj.write(b"") 721 722 try: 723 tar = object.__new__(tarfile.TarFile) 724 try: 725 tar.__init__(empty) 726 except tarfile.ReadError: 727 self.assertTrue(tar.fileobj.closed) 728 else: 729 self.fail("ReadError not raised") 730 finally: 731 os_helper.unlink(empty) 732 733 def test_parallel_iteration(self): 734 # Issue #16601: Restarting iteration over tarfile continued 735 # from where it left off. 
736 with tarfile.open(self.tarname) as tar: 737 for m1, m2 in zip(tar, tar): 738 self.assertEqual(m1.offset, m2.offset) 739 self.assertEqual(m1.get_info(), m2.get_info()) 740 741 @unittest.skipIf(zlib is None, "requires zlib") 742 def test_zlib_error_does_not_leak(self): 743 # bpo-39039: tarfile.open allowed zlib exceptions to bubble up when 744 # parsing certain types of invalid data 745 with unittest.mock.patch("tarfile.TarInfo.fromtarfile") as mock: 746 mock.side_effect = zlib.error 747 with self.assertRaises(tarfile.ReadError): 748 tarfile.open(self.tarname) 749 750 def test_next_on_empty_tarfile(self): 751 fd = io.BytesIO() 752 tf = tarfile.open(fileobj=fd, mode="w") 753 tf.close() 754 755 fd.seek(0) 756 with tarfile.open(fileobj=fd, mode="r|") as tf: 757 self.assertEqual(tf.next(), None) 758 759 fd.seek(0) 760 with tarfile.open(fileobj=fd, mode="r") as tf: 761 self.assertEqual(tf.next(), None) 762 763class MiscReadTest(MiscReadTestBase, unittest.TestCase): 764 test_fail_comp = None 765 766class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase): 767 pass 768 769class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase): 770 def requires_name_attribute(self): 771 self.skipTest("BZ2File have no name attribute") 772 773class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase): 774 def requires_name_attribute(self): 775 self.skipTest("LZMAFile have no name attribute") 776 777 778class StreamReadTest(CommonReadTest, unittest.TestCase): 779 780 prefix="r|" 781 782 def test_read_through(self): 783 # Issue #11224: A poorly designed _FileInFile.read() method 784 # caused seeking errors with stream tar files. 
785 for tarinfo in self.tar: 786 if not tarinfo.isreg(): 787 continue 788 with self.tar.extractfile(tarinfo) as fobj: 789 while True: 790 try: 791 buf = fobj.read(512) 792 except tarfile.StreamError: 793 self.fail("simple read-through using " 794 "TarFile.extractfile() failed") 795 if not buf: 796 break 797 798 def test_fileobj_regular_file(self): 799 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 800 with self.tar.extractfile(tarinfo) as fobj: 801 data = fobj.read() 802 self.assertEqual(len(data), tarinfo.size, 803 "regular file extraction failed") 804 self.assertEqual(sha256sum(data), sha256_regtype, 805 "regular file extraction failed") 806 807 def test_provoke_stream_error(self): 808 tarinfos = self.tar.getmembers() 809 with self.tar.extractfile(tarinfos[0]) as f: # read the first member 810 self.assertRaises(tarfile.StreamError, f.read) 811 812 def test_compare_members(self): 813 tar1 = tarfile.open(tarname, encoding="iso8859-1") 814 try: 815 tar2 = self.tar 816 817 while True: 818 t1 = tar1.next() 819 t2 = tar2.next() 820 if t1 is None: 821 break 822 self.assertIsNotNone(t2, "stream.next() failed.") 823 824 if t2.islnk() or t2.issym(): 825 with self.assertRaises(tarfile.StreamError): 826 tar2.extractfile(t2) 827 continue 828 829 v1 = tar1.extractfile(t1) 830 v2 = tar2.extractfile(t2) 831 if v1 is None: 832 continue 833 self.assertIsNotNone(v2, "stream.extractfile() failed") 834 self.assertEqual(v1.read(), v2.read(), 835 "stream extraction failed") 836 finally: 837 tar1.close() 838 839class GzipStreamReadTest(GzipTest, StreamReadTest): 840 pass 841 842class Bz2StreamReadTest(Bz2Test, StreamReadTest): 843 pass 844 845class LzmaStreamReadTest(LzmaTest, StreamReadTest): 846 pass 847 848 849class DetectReadTest(TarTest, unittest.TestCase): 850 def _testfunc_file(self, name, mode): 851 try: 852 tar = tarfile.open(name, mode) 853 except tarfile.ReadError as e: 854 self.fail() 855 else: 856 tar.close() 857 858 def _testfunc_fileobj(self, name, mode): 
859 try: 860 with open(name, "rb") as f: 861 tar = tarfile.open(name, mode, fileobj=f) 862 except tarfile.ReadError as e: 863 self.fail() 864 else: 865 tar.close() 866 867 def _test_modes(self, testfunc): 868 if self.suffix: 869 with self.assertRaises(tarfile.ReadError): 870 tarfile.open(tarname, mode="r:" + self.suffix) 871 with self.assertRaises(tarfile.ReadError): 872 tarfile.open(tarname, mode="r|" + self.suffix) 873 with self.assertRaises(tarfile.ReadError): 874 tarfile.open(self.tarname, mode="r:") 875 with self.assertRaises(tarfile.ReadError): 876 tarfile.open(self.tarname, mode="r|") 877 testfunc(self.tarname, "r") 878 testfunc(self.tarname, "r:" + self.suffix) 879 testfunc(self.tarname, "r:*") 880 testfunc(self.tarname, "r|" + self.suffix) 881 testfunc(self.tarname, "r|*") 882 883 def test_detect_file(self): 884 self._test_modes(self._testfunc_file) 885 886 def test_detect_fileobj(self): 887 self._test_modes(self._testfunc_fileobj) 888 889class GzipDetectReadTest(GzipTest, DetectReadTest): 890 pass 891 892class Bz2DetectReadTest(Bz2Test, DetectReadTest): 893 def test_detect_stream_bz2(self): 894 # Originally, tarfile's stream detection looked for the string 895 # "BZh91" at the start of the file. This is incorrect because 896 # the '9' represents the blocksize (900,000 bytes). If the file was 897 # compressed using another blocksize autodetection fails. 898 with open(tarname, "rb") as fobj: 899 data = fobj.read() 900 901 # Compress with blocksize 100,000 bytes, the file starts with "BZh11". 
902 with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj: 903 fobj.write(data) 904 905 self._testfunc_file(tmpname, "r|*") 906 907class LzmaDetectReadTest(LzmaTest, DetectReadTest): 908 pass 909 910 911class MemberReadTest(ReadTest, unittest.TestCase): 912 913 def _test_member(self, tarinfo, chksum=None, **kwargs): 914 if chksum is not None: 915 with self.tar.extractfile(tarinfo) as f: 916 self.assertEqual(sha256sum(f.read()), chksum, 917 "wrong sha256sum for %s" % tarinfo.name) 918 919 kwargs["mtime"] = 0o7606136617 920 kwargs["uid"] = 1000 921 kwargs["gid"] = 100 922 if "old-v7" not in tarinfo.name: 923 # V7 tar can't handle alphabetic owners. 924 kwargs["uname"] = "tarfile" 925 kwargs["gname"] = "tarfile" 926 for k, v in kwargs.items(): 927 self.assertEqual(getattr(tarinfo, k), v, 928 "wrong value in %s field of %s" % (k, tarinfo.name)) 929 930 def test_find_regtype(self): 931 tarinfo = self.tar.getmember("ustar/regtype") 932 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 933 934 def test_find_conttype(self): 935 tarinfo = self.tar.getmember("ustar/conttype") 936 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 937 938 def test_find_dirtype(self): 939 tarinfo = self.tar.getmember("ustar/dirtype") 940 self._test_member(tarinfo, size=0) 941 942 def test_find_dirtype_with_size(self): 943 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 944 self._test_member(tarinfo, size=255) 945 946 def test_find_lnktype(self): 947 tarinfo = self.tar.getmember("ustar/lnktype") 948 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 949 950 def test_find_symtype(self): 951 tarinfo = self.tar.getmember("ustar/symtype") 952 self._test_member(tarinfo, size=0, linkname="regtype") 953 954 def test_find_blktype(self): 955 tarinfo = self.tar.getmember("ustar/blktype") 956 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 957 958 def test_find_chrtype(self): 959 tarinfo = self.tar.getmember("ustar/chrtype") 960 
self._test_member(tarinfo, size=0, devmajor=1, devminor=3) 961 962 def test_find_fifotype(self): 963 tarinfo = self.tar.getmember("ustar/fifotype") 964 self._test_member(tarinfo, size=0) 965 966 def test_find_sparse(self): 967 tarinfo = self.tar.getmember("ustar/sparse") 968 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 969 970 def test_find_gnusparse(self): 971 tarinfo = self.tar.getmember("gnu/sparse") 972 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 973 974 def test_find_gnusparse_00(self): 975 tarinfo = self.tar.getmember("gnu/sparse-0.0") 976 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 977 978 def test_find_gnusparse_01(self): 979 tarinfo = self.tar.getmember("gnu/sparse-0.1") 980 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 981 982 def test_find_gnusparse_10(self): 983 tarinfo = self.tar.getmember("gnu/sparse-1.0") 984 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 985 986 def test_find_umlauts(self): 987 tarinfo = self.tar.getmember("ustar/umlauts-" 988 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 989 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 990 991 def test_find_ustar_longname(self): 992 name = "ustar/" + "12345/" * 39 + "1234567/longname" 993 self.assertIn(name, self.tar.getnames()) 994 995 def test_find_regtype_oldv7(self): 996 tarinfo = self.tar.getmember("misc/regtype-old-v7") 997 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 998 999 def test_find_pax_umlauts(self): 1000 self.tar.close() 1001 self.tar = tarfile.open(self.tarname, mode=self.mode, 1002 encoding="iso8859-1") 1003 tarinfo = self.tar.getmember("pax/umlauts-" 1004 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1005 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 1006 1007 1008class LongnameTest: 1009 1010 def test_read_longname(self): 1011 # Test reading of longname (bug #1471427). 
1012 longname = self.subdir + "/" + "123/" * 125 + "longname" 1013 try: 1014 tarinfo = self.tar.getmember(longname) 1015 except KeyError: 1016 self.fail("longname not found") 1017 self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, 1018 "read longname as dirtype") 1019 1020 def test_read_longlink(self): 1021 longname = self.subdir + "/" + "123/" * 125 + "longname" 1022 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 1023 try: 1024 tarinfo = self.tar.getmember(longlink) 1025 except KeyError: 1026 self.fail("longlink not found") 1027 self.assertEqual(tarinfo.linkname, longname, "linkname wrong") 1028 1029 def test_truncated_longname(self): 1030 longname = self.subdir + "/" + "123/" * 125 + "longname" 1031 tarinfo = self.tar.getmember(longname) 1032 offset = tarinfo.offset 1033 self.tar.fileobj.seek(offset) 1034 fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) 1035 with self.assertRaises(tarfile.ReadError): 1036 tarfile.open(name="foo.tar", fileobj=fobj) 1037 1038 def test_header_offset(self): 1039 # Test if the start offset of the TarInfo object includes 1040 # the preceding extended header. 1041 longname = self.subdir + "/" + "123/" * 125 + "longname" 1042 offset = self.tar.getmember(longname).offset 1043 with open(tarname, "rb") as fobj: 1044 fobj.seek(offset) 1045 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), 1046 "iso8859-1", "strict") 1047 self.assertEqual(tarinfo.type, self.longnametype) 1048 1049 def test_longname_directory(self): 1050 # Test reading a longlink directory. Issue #47231. 
1051 longdir = ('a' * 101) + '/' 1052 with os_helper.temp_cwd(): 1053 with tarfile.open(tmpname, 'w') as tar: 1054 tar.format = self.format 1055 try: 1056 os.mkdir(longdir) 1057 tar.add(longdir) 1058 finally: 1059 os.rmdir(longdir.rstrip("/")) 1060 with tarfile.open(tmpname) as tar: 1061 self.assertIsNotNone(tar.getmember(longdir)) 1062 self.assertIsNotNone(tar.getmember(longdir.removesuffix('/'))) 1063 1064class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): 1065 1066 subdir = "gnu" 1067 longnametype = tarfile.GNUTYPE_LONGNAME 1068 format = tarfile.GNU_FORMAT 1069 1070 # Since 3.2 tarfile is supposed to accurately restore sparse members and 1071 # produce files with holes. This is what we actually want to test here. 1072 # Unfortunately, not all platforms/filesystems support sparse files, and 1073 # even on platforms that do it is non-trivial to make reliable assertions 1074 # about holes in files. Therefore, we first do one basic test which works 1075 # an all platforms, and after that a test that will work only on 1076 # platforms/filesystems that prove to support sparse files. 
1077 def _test_sparse_file(self, name): 1078 self.tar.extract(name, TEMPDIR, filter='data') 1079 filename = os.path.join(TEMPDIR, name) 1080 with open(filename, "rb") as fobj: 1081 data = fobj.read() 1082 self.assertEqual(sha256sum(data), sha256_sparse, 1083 "wrong sha256sum for %s" % name) 1084 1085 if self._fs_supports_holes(): 1086 s = os.stat(filename) 1087 self.assertLess(s.st_blocks * 512, s.st_size) 1088 1089 def test_sparse_file_old(self): 1090 self._test_sparse_file("gnu/sparse") 1091 1092 def test_sparse_file_00(self): 1093 self._test_sparse_file("gnu/sparse-0.0") 1094 1095 def test_sparse_file_01(self): 1096 self._test_sparse_file("gnu/sparse-0.1") 1097 1098 def test_sparse_file_10(self): 1099 self._test_sparse_file("gnu/sparse-1.0") 1100 1101 @staticmethod 1102 def _fs_supports_holes(): 1103 # Return True if the platform knows the st_blocks stat attribute and 1104 # uses st_blocks units of 512 bytes, and if the filesystem is able to 1105 # store holes of 4 KiB in files. 1106 # 1107 # The function returns False if page size is larger than 4 KiB. 1108 # For example, ppc64 uses pages of 64 KiB. 1109 if sys.platform.startswith("linux"): 1110 # Linux evidentially has 512 byte st_blocks units. 
1111 name = os.path.join(TEMPDIR, "sparse-test") 1112 with open(name, "wb") as fobj: 1113 # Seek to "punch a hole" of 4 KiB 1114 fobj.seek(4096) 1115 fobj.write(b'x' * 4096) 1116 fobj.truncate() 1117 s = os.stat(name) 1118 os_helper.unlink(name) 1119 return (s.st_blocks * 512 < s.st_size) 1120 else: 1121 return False 1122 1123 1124class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase): 1125 1126 subdir = "pax" 1127 longnametype = tarfile.XHDTYPE 1128 format = tarfile.PAX_FORMAT 1129 1130 def test_pax_global_headers(self): 1131 tar = tarfile.open(tarname, encoding="iso8859-1") 1132 try: 1133 tarinfo = tar.getmember("pax/regtype1") 1134 self.assertEqual(tarinfo.uname, "foo") 1135 self.assertEqual(tarinfo.gname, "bar") 1136 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1137 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1138 1139 tarinfo = tar.getmember("pax/regtype2") 1140 self.assertEqual(tarinfo.uname, "") 1141 self.assertEqual(tarinfo.gname, "bar") 1142 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1143 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1144 1145 tarinfo = tar.getmember("pax/regtype3") 1146 self.assertEqual(tarinfo.uname, "tarfile") 1147 self.assertEqual(tarinfo.gname, "tarfile") 1148 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1149 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1150 finally: 1151 tar.close() 1152 1153 def test_pax_number_fields(self): 1154 # All following number fields are read from the pax header. 
1155 tar = tarfile.open(tarname, encoding="iso8859-1") 1156 try: 1157 tarinfo = tar.getmember("pax/regtype4") 1158 self.assertEqual(tarinfo.size, 7011) 1159 self.assertEqual(tarinfo.uid, 123) 1160 self.assertEqual(tarinfo.gid, 123) 1161 self.assertEqual(tarinfo.mtime, 1041808783.0) 1162 self.assertEqual(type(tarinfo.mtime), float) 1163 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) 1164 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) 1165 finally: 1166 tar.close() 1167 1168 1169class WriteTestBase(TarTest): 1170 # Put all write tests in here that are supposed to be tested 1171 # in all possible mode combinations. 1172 1173 def test_fileobj_no_close(self): 1174 fobj = io.BytesIO() 1175 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 1176 tar.addfile(tarfile.TarInfo("foo")) 1177 self.assertFalse(fobj.closed, "external fileobjs must never closed") 1178 # Issue #20238: Incomplete gzip output with mode="w:gz" 1179 data = fobj.getvalue() 1180 del tar 1181 support.gc_collect() 1182 self.assertFalse(fobj.closed) 1183 self.assertEqual(data, fobj.getvalue()) 1184 1185 def test_eof_marker(self): 1186 # Make sure an end of archive marker is written (two zero blocks). 1187 # tarfile insists on aligning archives to a 20 * 512 byte recordsize. 1188 # So, we create an archive that has exactly 10240 bytes without the 1189 # marker, and has 20480 bytes once the marker is written. 1190 with tarfile.open(tmpname, self.mode) as tar: 1191 t = tarfile.TarInfo("foo") 1192 t.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE 1193 tar.addfile(t, io.BytesIO(b"a" * t.size)) 1194 1195 with self.open(tmpname, "rb") as fobj: 1196 self.assertEqual(len(fobj.read()), tarfile.RECORDSIZE * 2) 1197 1198 1199class WriteTest(WriteTestBase, unittest.TestCase): 1200 1201 prefix = "w:" 1202 1203 def test_100_char_name(self): 1204 # The name field in a tar header stores strings of at most 100 chars. 
1205 # If a string is shorter than 100 chars it has to be padded with '\0', 1206 # which implies that a string of exactly 100 chars is stored without 1207 # a trailing '\0'. 1208 name = "0123456789" * 10 1209 tar = tarfile.open(tmpname, self.mode) 1210 try: 1211 t = tarfile.TarInfo(name) 1212 tar.addfile(t) 1213 finally: 1214 tar.close() 1215 1216 tar = tarfile.open(tmpname) 1217 try: 1218 self.assertEqual(tar.getnames()[0], name, 1219 "failed to store 100 char filename") 1220 finally: 1221 tar.close() 1222 1223 def test_tar_size(self): 1224 # Test for bug #1013882. 1225 tar = tarfile.open(tmpname, self.mode) 1226 try: 1227 path = os.path.join(TEMPDIR, "file") 1228 with open(path, "wb") as fobj: 1229 fobj.write(b"aaa") 1230 tar.add(path) 1231 finally: 1232 tar.close() 1233 self.assertGreater(os.path.getsize(tmpname), 0, 1234 "tarfile is empty") 1235 1236 # The test_*_size tests test for bug #1167128. 1237 def test_file_size(self): 1238 tar = tarfile.open(tmpname, self.mode) 1239 try: 1240 path = os.path.join(TEMPDIR, "file") 1241 with open(path, "wb"): 1242 pass 1243 tarinfo = tar.gettarinfo(path) 1244 self.assertEqual(tarinfo.size, 0) 1245 1246 with open(path, "wb") as fobj: 1247 fobj.write(b"aaa") 1248 tarinfo = tar.gettarinfo(path) 1249 self.assertEqual(tarinfo.size, 3) 1250 finally: 1251 tar.close() 1252 1253 def test_directory_size(self): 1254 path = os.path.join(TEMPDIR, "directory") 1255 os.mkdir(path) 1256 try: 1257 tar = tarfile.open(tmpname, self.mode) 1258 try: 1259 tarinfo = tar.gettarinfo(path) 1260 self.assertEqual(tarinfo.size, 0) 1261 finally: 1262 tar.close() 1263 finally: 1264 os_helper.rmdir(path) 1265 1266 # mock the following: 1267 # os.listdir: so we know that files are in the wrong order 1268 def test_ordered_recursion(self): 1269 path = os.path.join(TEMPDIR, "directory") 1270 os.mkdir(path) 1271 open(os.path.join(path, "1"), "a").close() 1272 open(os.path.join(path, "2"), "a").close() 1273 try: 1274 tar = tarfile.open(tmpname, self.mode) 
1275 try: 1276 with unittest.mock.patch('os.listdir') as mock_listdir: 1277 mock_listdir.return_value = ["2", "1"] 1278 tar.add(path) 1279 paths = [] 1280 for m in tar.getmembers(): 1281 paths.append(os.path.split(m.name)[-1]) 1282 self.assertEqual(paths, ["directory", "1", "2"]); 1283 finally: 1284 tar.close() 1285 finally: 1286 os_helper.unlink(os.path.join(path, "1")) 1287 os_helper.unlink(os.path.join(path, "2")) 1288 os_helper.rmdir(path) 1289 1290 def test_gettarinfo_pathlike_name(self): 1291 with tarfile.open(tmpname, self.mode) as tar: 1292 path = pathlib.Path(TEMPDIR) / "file" 1293 with open(path, "wb") as fobj: 1294 fobj.write(b"aaa") 1295 tarinfo = tar.gettarinfo(path) 1296 tarinfo2 = tar.gettarinfo(os.fspath(path)) 1297 self.assertIsInstance(tarinfo.name, str) 1298 self.assertEqual(tarinfo.name, tarinfo2.name) 1299 self.assertEqual(tarinfo.size, 3) 1300 1301 @unittest.skipUnless(hasattr(os, "link"), 1302 "Missing hardlink implementation") 1303 def test_link_size(self): 1304 link = os.path.join(TEMPDIR, "link") 1305 target = os.path.join(TEMPDIR, "link_target") 1306 with open(target, "wb") as fobj: 1307 fobj.write(b"aaa") 1308 try: 1309 os.link(target, link) 1310 except PermissionError as e: 1311 self.skipTest('os.link(): %s' % e) 1312 try: 1313 tar = tarfile.open(tmpname, self.mode) 1314 try: 1315 # Record the link target in the inodes list. 
1316 tar.gettarinfo(target) 1317 tarinfo = tar.gettarinfo(link) 1318 self.assertEqual(tarinfo.size, 0) 1319 finally: 1320 tar.close() 1321 finally: 1322 os_helper.unlink(target) 1323 os_helper.unlink(link) 1324 1325 @os_helper.skip_unless_symlink 1326 def test_symlink_size(self): 1327 path = os.path.join(TEMPDIR, "symlink") 1328 os.symlink("link_target", path) 1329 try: 1330 tar = tarfile.open(tmpname, self.mode) 1331 try: 1332 tarinfo = tar.gettarinfo(path) 1333 self.assertEqual(tarinfo.size, 0) 1334 finally: 1335 tar.close() 1336 finally: 1337 os_helper.unlink(path) 1338 1339 def test_add_self(self): 1340 # Test for #1257255. 1341 dstname = os.path.abspath(tmpname) 1342 tar = tarfile.open(tmpname, self.mode) 1343 try: 1344 self.assertEqual(tar.name, dstname, 1345 "archive name must be absolute") 1346 tar.add(dstname) 1347 self.assertEqual(tar.getnames(), [], 1348 "added the archive to itself") 1349 1350 with os_helper.change_cwd(TEMPDIR): 1351 tar.add(dstname) 1352 self.assertEqual(tar.getnames(), [], 1353 "added the archive to itself") 1354 finally: 1355 tar.close() 1356 1357 def test_filter(self): 1358 tempdir = os.path.join(TEMPDIR, "filter") 1359 os.mkdir(tempdir) 1360 try: 1361 for name in ("foo", "bar", "baz"): 1362 name = os.path.join(tempdir, name) 1363 os_helper.create_empty_file(name) 1364 1365 def filter(tarinfo): 1366 if os.path.basename(tarinfo.name) == "bar": 1367 return 1368 tarinfo.uid = 123 1369 tarinfo.uname = "foo" 1370 return tarinfo 1371 1372 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 1373 try: 1374 tar.add(tempdir, arcname="empty_dir", filter=filter) 1375 finally: 1376 tar.close() 1377 1378 # Verify that filter is a keyword-only argument 1379 with self.assertRaises(TypeError): 1380 tar.add(tempdir, "empty_dir", True, None, filter) 1381 1382 tar = tarfile.open(tmpname, "r") 1383 try: 1384 for tarinfo in tar: 1385 self.assertEqual(tarinfo.uid, 123) 1386 self.assertEqual(tarinfo.uname, "foo") 1387 
self.assertEqual(len(tar.getmembers()), 3) 1388 finally: 1389 tar.close() 1390 finally: 1391 os_helper.rmtree(tempdir) 1392 1393 # Guarantee that stored pathnames are not modified. Don't 1394 # remove ./ or ../ or double slashes. Still make absolute 1395 # pathnames relative. 1396 # For details see bug #6054. 1397 def _test_pathname(self, path, cmp_path=None, dir=False): 1398 # Create a tarfile with an empty member named path 1399 # and compare the stored name with the original. 1400 foo = os.path.join(TEMPDIR, "foo") 1401 if not dir: 1402 os_helper.create_empty_file(foo) 1403 else: 1404 os.mkdir(foo) 1405 1406 tar = tarfile.open(tmpname, self.mode) 1407 try: 1408 tar.add(foo, arcname=path) 1409 finally: 1410 tar.close() 1411 1412 tar = tarfile.open(tmpname, "r") 1413 try: 1414 t = tar.next() 1415 finally: 1416 tar.close() 1417 1418 if not dir: 1419 os_helper.unlink(foo) 1420 else: 1421 os_helper.rmdir(foo) 1422 1423 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 1424 1425 1426 @os_helper.skip_unless_symlink 1427 def test_extractall_symlinks(self): 1428 # Test if extractall works properly when tarfile contains symlinks 1429 tempdir = os.path.join(TEMPDIR, "testsymlinks") 1430 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 1431 os.mkdir(tempdir) 1432 try: 1433 source_file = os.path.join(tempdir,'source') 1434 target_file = os.path.join(tempdir,'symlink') 1435 with open(source_file,'w') as f: 1436 f.write('something\n') 1437 os.symlink(source_file, target_file) 1438 with tarfile.open(temparchive, 'w') as tar: 1439 tar.add(source_file, arcname="source") 1440 tar.add(target_file, arcname="symlink") 1441 # Let's extract it to the location which contains the symlink 1442 with tarfile.open(temparchive, errorlevel=2) as tar: 1443 # this should not raise OSError: [Errno 17] File exists 1444 try: 1445 tar.extractall(path=tempdir, 1446 filter='fully_trusted') 1447 except OSError: 1448 self.fail("extractall failed with symlinked files") 1449 finally: 
1450 os_helper.unlink(temparchive) 1451 os_helper.rmtree(tempdir) 1452 1453 def test_pathnames(self): 1454 self._test_pathname("foo") 1455 self._test_pathname(os.path.join("foo", ".", "bar")) 1456 self._test_pathname(os.path.join("foo", "..", "bar")) 1457 self._test_pathname(os.path.join(".", "foo")) 1458 self._test_pathname(os.path.join(".", "foo", ".")) 1459 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 1460 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1461 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1462 self._test_pathname(os.path.join("..", "foo")) 1463 self._test_pathname(os.path.join("..", "foo", "..")) 1464 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 1465 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 1466 1467 self._test_pathname("foo" + os.sep + os.sep + "bar") 1468 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 1469 1470 def test_abs_pathnames(self): 1471 if sys.platform == "win32": 1472 self._test_pathname("C:\\foo", "foo") 1473 else: 1474 self._test_pathname("/foo", "foo") 1475 self._test_pathname("///foo", "foo") 1476 1477 def test_cwd(self): 1478 # Test adding the current working directory. 
1479 with os_helper.change_cwd(TEMPDIR): 1480 tar = tarfile.open(tmpname, self.mode) 1481 try: 1482 tar.add(".") 1483 finally: 1484 tar.close() 1485 1486 tar = tarfile.open(tmpname, "r") 1487 try: 1488 for t in tar: 1489 if t.name != ".": 1490 self.assertTrue(t.name.startswith("./"), t.name) 1491 finally: 1492 tar.close() 1493 1494 def test_open_nonwritable_fileobj(self): 1495 for exctype in OSError, EOFError, RuntimeError: 1496 class BadFile(io.BytesIO): 1497 first = True 1498 def write(self, data): 1499 if self.first: 1500 self.first = False 1501 raise exctype 1502 1503 f = BadFile() 1504 with self.assertRaises(exctype): 1505 tar = tarfile.open(tmpname, self.mode, fileobj=f, 1506 format=tarfile.PAX_FORMAT, 1507 pax_headers={'non': 'empty'}) 1508 self.assertFalse(f.closed) 1509 1510 1511class GzipWriteTest(GzipTest, WriteTest): 1512 pass 1513 1514 1515class Bz2WriteTest(Bz2Test, WriteTest): 1516 pass 1517 1518 1519class LzmaWriteTest(LzmaTest, WriteTest): 1520 pass 1521 1522 1523class StreamWriteTest(WriteTestBase, unittest.TestCase): 1524 1525 prefix = "w|" 1526 decompressor = None 1527 1528 def test_stream_padding(self): 1529 # Test for bug #1543303. 1530 tar = tarfile.open(tmpname, self.mode) 1531 tar.close() 1532 if self.decompressor: 1533 dec = self.decompressor() 1534 with open(tmpname, "rb") as fobj: 1535 data = fobj.read() 1536 data = dec.decompress(data) 1537 self.assertFalse(dec.unused_data, "found trailing data") 1538 else: 1539 with self.open(tmpname) as fobj: 1540 data = fobj.read() 1541 self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE, 1542 "incorrect zero padding") 1543 1544 @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"), 1545 "Missing umask implementation") 1546 @unittest.skipIf( 1547 support.is_emscripten or support.is_wasi, 1548 "Emscripten's/WASI's umask is a stub." 1549 ) 1550 def test_file_mode(self): 1551 # Test for issue #8464: Create files with correct 1552 # permissions. 
1553 if os.path.exists(tmpname): 1554 os_helper.unlink(tmpname) 1555 1556 original_umask = os.umask(0o022) 1557 try: 1558 tar = tarfile.open(tmpname, self.mode) 1559 tar.close() 1560 mode = os.stat(tmpname).st_mode & 0o777 1561 self.assertEqual(mode, 0o644, "wrong file permissions") 1562 finally: 1563 os.umask(original_umask) 1564 1565 1566class GzipStreamWriteTest(GzipTest, StreamWriteTest): 1567 def test_source_directory_not_leaked(self): 1568 """ 1569 Ensure the source directory is not included in the tar header 1570 per bpo-41316. 1571 """ 1572 tarfile.open(tmpname, self.mode).close() 1573 payload = pathlib.Path(tmpname).read_text(encoding='latin-1') 1574 assert os.path.dirname(tmpname) not in payload 1575 1576 1577class Bz2StreamWriteTest(Bz2Test, StreamWriteTest): 1578 decompressor = bz2.BZ2Decompressor if bz2 else None 1579 1580class LzmaStreamWriteTest(LzmaTest, StreamWriteTest): 1581 decompressor = lzma.LZMADecompressor if lzma else None 1582 1583 1584class GNUWriteTest(unittest.TestCase): 1585 # This testcase checks for correct creation of GNU Longname 1586 # and Longlink extended headers (cp. bug #812325). 
1587 1588 def _length(self, s): 1589 blocks = len(s) // 512 + 1 1590 return blocks * 512 1591 1592 def _calc_size(self, name, link=None): 1593 # Initial tar header 1594 count = 512 1595 1596 if len(name) > tarfile.LENGTH_NAME: 1597 # GNU longname extended header + longname 1598 count += 512 1599 count += self._length(name) 1600 if link is not None and len(link) > tarfile.LENGTH_LINK: 1601 # GNU longlink extended header + longlink 1602 count += 512 1603 count += self._length(link) 1604 return count 1605 1606 def _test(self, name, link=None): 1607 tarinfo = tarfile.TarInfo(name) 1608 if link: 1609 tarinfo.linkname = link 1610 tarinfo.type = tarfile.LNKTYPE 1611 1612 tar = tarfile.open(tmpname, "w") 1613 try: 1614 tar.format = tarfile.GNU_FORMAT 1615 tar.addfile(tarinfo) 1616 1617 v1 = self._calc_size(name, link) 1618 v2 = tar.offset 1619 self.assertEqual(v1, v2, "GNU longname/longlink creation failed") 1620 finally: 1621 tar.close() 1622 1623 tar = tarfile.open(tmpname) 1624 try: 1625 member = tar.next() 1626 self.assertIsNotNone(member, 1627 "unable to read longname member") 1628 self.assertEqual(tarinfo.name, member.name, 1629 "unable to read longname member") 1630 self.assertEqual(tarinfo.linkname, member.linkname, 1631 "unable to read longname member") 1632 finally: 1633 tar.close() 1634 1635 def test_longname_1023(self): 1636 self._test(("longnam/" * 127) + "longnam") 1637 1638 def test_longname_1024(self): 1639 self._test(("longnam/" * 127) + "longname") 1640 1641 def test_longname_1025(self): 1642 self._test(("longnam/" * 127) + "longname_") 1643 1644 def test_longlink_1023(self): 1645 self._test("name", ("longlnk/" * 127) + "longlnk") 1646 1647 def test_longlink_1024(self): 1648 self._test("name", ("longlnk/" * 127) + "longlink") 1649 1650 def test_longlink_1025(self): 1651 self._test("name", ("longlnk/" * 127) + "longlink_") 1652 1653 def test_longnamelink_1023(self): 1654 self._test(("longnam/" * 127) + "longnam", 1655 ("longlnk/" * 127) + "longlnk") 1656 
1657 def test_longnamelink_1024(self): 1658 self._test(("longnam/" * 127) + "longname", 1659 ("longlnk/" * 127) + "longlink") 1660 1661 def test_longnamelink_1025(self): 1662 self._test(("longnam/" * 127) + "longname_", 1663 ("longlnk/" * 127) + "longlink_") 1664 1665 1666class DeviceHeaderTest(WriteTestBase, unittest.TestCase): 1667 1668 prefix = "w:" 1669 1670 def test_headers_written_only_for_device_files(self): 1671 # Regression test for bpo-18819. 1672 tempdir = os.path.join(TEMPDIR, "device_header_test") 1673 os.mkdir(tempdir) 1674 try: 1675 tar = tarfile.open(tmpname, self.mode) 1676 try: 1677 input_blk = tarfile.TarInfo(name="my_block_device") 1678 input_reg = tarfile.TarInfo(name="my_regular_file") 1679 input_blk.type = tarfile.BLKTYPE 1680 input_reg.type = tarfile.REGTYPE 1681 tar.addfile(input_blk) 1682 tar.addfile(input_reg) 1683 finally: 1684 tar.close() 1685 1686 # devmajor and devminor should be *interpreted* as 0 in both... 1687 tar = tarfile.open(tmpname, "r") 1688 try: 1689 output_blk = tar.getmember("my_block_device") 1690 output_reg = tar.getmember("my_regular_file") 1691 finally: 1692 tar.close() 1693 self.assertEqual(output_blk.devmajor, 0) 1694 self.assertEqual(output_blk.devminor, 0) 1695 self.assertEqual(output_reg.devmajor, 0) 1696 self.assertEqual(output_reg.devminor, 0) 1697 1698 # ...but the fields should not actually be set on regular files: 1699 with open(tmpname, "rb") as infile: 1700 buf = infile.read() 1701 buf_blk = buf[output_blk.offset:output_blk.offset_data] 1702 buf_reg = buf[output_reg.offset:output_reg.offset_data] 1703 # See `struct posixheader` in GNU docs for byte offsets: 1704 # <https://www.gnu.org/software/tar/manual/html_node/Standard.html> 1705 device_headers = slice(329, 329 + 16) 1706 self.assertEqual(buf_blk[device_headers], b"0000000\0" * 2) 1707 self.assertEqual(buf_reg[device_headers], b"\0" * 16) 1708 finally: 1709 os_helper.rmtree(tempdir) 1710 1711 1712class CreateTest(WriteTestBase, unittest.TestCase): 1713 
1714 prefix = "x:" 1715 1716 file_path = os.path.join(TEMPDIR, "spameggs42") 1717 1718 def setUp(self): 1719 os_helper.unlink(tmpname) 1720 1721 @classmethod 1722 def setUpClass(cls): 1723 with open(cls.file_path, "wb") as fobj: 1724 fobj.write(b"aaa") 1725 1726 @classmethod 1727 def tearDownClass(cls): 1728 os_helper.unlink(cls.file_path) 1729 1730 def test_create(self): 1731 with tarfile.open(tmpname, self.mode) as tobj: 1732 tobj.add(self.file_path) 1733 1734 with self.taropen(tmpname) as tobj: 1735 names = tobj.getnames() 1736 self.assertEqual(len(names), 1) 1737 self.assertIn('spameggs42', names[0]) 1738 1739 def test_create_existing(self): 1740 with tarfile.open(tmpname, self.mode) as tobj: 1741 tobj.add(self.file_path) 1742 1743 with self.assertRaises(FileExistsError): 1744 tobj = tarfile.open(tmpname, self.mode) 1745 1746 with self.taropen(tmpname) as tobj: 1747 names = tobj.getnames() 1748 self.assertEqual(len(names), 1) 1749 self.assertIn('spameggs42', names[0]) 1750 1751 def test_create_taropen(self): 1752 with self.taropen(tmpname, "x") as tobj: 1753 tobj.add(self.file_path) 1754 1755 with self.taropen(tmpname) as tobj: 1756 names = tobj.getnames() 1757 self.assertEqual(len(names), 1) 1758 self.assertIn('spameggs42', names[0]) 1759 1760 def test_create_existing_taropen(self): 1761 with self.taropen(tmpname, "x") as tobj: 1762 tobj.add(self.file_path) 1763 1764 with self.assertRaises(FileExistsError): 1765 with self.taropen(tmpname, "x"): 1766 pass 1767 1768 with self.taropen(tmpname) as tobj: 1769 names = tobj.getnames() 1770 self.assertEqual(len(names), 1) 1771 self.assertIn("spameggs42", names[0]) 1772 1773 def test_create_pathlike_name(self): 1774 with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj: 1775 self.assertIsInstance(tobj.name, str) 1776 self.assertEqual(tobj.name, os.path.abspath(tmpname)) 1777 tobj.add(pathlib.Path(self.file_path)) 1778 names = tobj.getnames() 1779 self.assertEqual(len(names), 1) 1780 self.assertIn('spameggs42', 
names[0]) 1781 1782 with self.taropen(tmpname) as tobj: 1783 names = tobj.getnames() 1784 self.assertEqual(len(names), 1) 1785 self.assertIn('spameggs42', names[0]) 1786 1787 def test_create_taropen_pathlike_name(self): 1788 with self.taropen(pathlib.Path(tmpname), "x") as tobj: 1789 self.assertIsInstance(tobj.name, str) 1790 self.assertEqual(tobj.name, os.path.abspath(tmpname)) 1791 tobj.add(pathlib.Path(self.file_path)) 1792 names = tobj.getnames() 1793 self.assertEqual(len(names), 1) 1794 self.assertIn('spameggs42', names[0]) 1795 1796 with self.taropen(tmpname) as tobj: 1797 names = tobj.getnames() 1798 self.assertEqual(len(names), 1) 1799 self.assertIn('spameggs42', names[0]) 1800 1801 1802class GzipCreateTest(GzipTest, CreateTest): 1803 1804 def test_create_with_compresslevel(self): 1805 with tarfile.open(tmpname, self.mode, compresslevel=1) as tobj: 1806 tobj.add(self.file_path) 1807 with tarfile.open(tmpname, 'r:gz', compresslevel=1) as tobj: 1808 pass 1809 1810 1811class Bz2CreateTest(Bz2Test, CreateTest): 1812 1813 def test_create_with_compresslevel(self): 1814 with tarfile.open(tmpname, self.mode, compresslevel=1) as tobj: 1815 tobj.add(self.file_path) 1816 with tarfile.open(tmpname, 'r:bz2', compresslevel=1) as tobj: 1817 pass 1818 1819 1820class LzmaCreateTest(LzmaTest, CreateTest): 1821 1822 # Unlike gz and bz2, xz uses the preset keyword instead of compresslevel. 1823 # It does not allow for preset to be specified when reading. 1824 def test_create_with_preset(self): 1825 with tarfile.open(tmpname, self.mode, preset=1) as tobj: 1826 tobj.add(self.file_path) 1827 1828 1829class CreateWithXModeTest(CreateTest): 1830 1831 prefix = "x" 1832 1833 test_create_taropen = None 1834 test_create_existing_taropen = None 1835 1836 1837@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation") 1838class HardlinkTest(unittest.TestCase): 1839 # Test the creation of LNKTYPE (hardlink) members in an archive. 
1840 1841 def setUp(self): 1842 self.foo = os.path.join(TEMPDIR, "foo") 1843 self.bar = os.path.join(TEMPDIR, "bar") 1844 1845 with open(self.foo, "wb") as fobj: 1846 fobj.write(b"foo") 1847 1848 try: 1849 os.link(self.foo, self.bar) 1850 except PermissionError as e: 1851 self.skipTest('os.link(): %s' % e) 1852 1853 self.tar = tarfile.open(tmpname, "w") 1854 self.tar.add(self.foo) 1855 1856 def tearDown(self): 1857 self.tar.close() 1858 os_helper.unlink(self.foo) 1859 os_helper.unlink(self.bar) 1860 1861 def test_add_twice(self): 1862 # The same name will be added as a REGTYPE every 1863 # time regardless of st_nlink. 1864 tarinfo = self.tar.gettarinfo(self.foo) 1865 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1866 "add file as regular failed") 1867 1868 def test_add_hardlink(self): 1869 tarinfo = self.tar.gettarinfo(self.bar) 1870 self.assertEqual(tarinfo.type, tarfile.LNKTYPE, 1871 "add file as hardlink failed") 1872 1873 def test_dereference_hardlink(self): 1874 self.tar.dereference = True 1875 tarinfo = self.tar.gettarinfo(self.bar) 1876 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1877 "dereferencing hardlink failed") 1878 1879 1880class PaxWriteTest(GNUWriteTest): 1881 1882 def _test(self, name, link=None): 1883 # See GNUWriteTest. 
1884 tarinfo = tarfile.TarInfo(name) 1885 if link: 1886 tarinfo.linkname = link 1887 tarinfo.type = tarfile.LNKTYPE 1888 1889 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 1890 try: 1891 tar.addfile(tarinfo) 1892 finally: 1893 tar.close() 1894 1895 tar = tarfile.open(tmpname) 1896 try: 1897 if link: 1898 l = tar.getmembers()[0].linkname 1899 self.assertEqual(link, l, "PAX longlink creation failed") 1900 else: 1901 n = tar.getmembers()[0].name 1902 self.assertEqual(name, n, "PAX longname creation failed") 1903 finally: 1904 tar.close() 1905 1906 def test_pax_global_header(self): 1907 pax_headers = { 1908 "foo": "bar", 1909 "uid": "0", 1910 "mtime": "1.23", 1911 "test": "\xe4\xf6\xfc", 1912 "\xe4\xf6\xfc": "test"} 1913 1914 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1915 pax_headers=pax_headers) 1916 try: 1917 tar.addfile(tarfile.TarInfo("test")) 1918 finally: 1919 tar.close() 1920 1921 # Test if the global header was written correctly. 1922 tar = tarfile.open(tmpname, encoding="iso8859-1") 1923 try: 1924 self.assertEqual(tar.pax_headers, pax_headers) 1925 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1926 # Test if all the fields are strings. 1927 for key, val in tar.pax_headers.items(): 1928 self.assertIsNot(type(key), bytes) 1929 self.assertIsNot(type(val), bytes) 1930 if key in tarfile.PAX_NUMBER_FIELDS: 1931 try: 1932 tarfile.PAX_NUMBER_FIELDS[key](val) 1933 except (TypeError, ValueError): 1934 self.fail("unable to convert pax header field") 1935 finally: 1936 tar.close() 1937 1938 def test_pax_extended_header(self): 1939 # The fields from the pax header have priority over the 1940 # TarInfo. 
1941 pax_headers = {"path": "foo", "uid": "123"} 1942 1943 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1944 encoding="iso8859-1") 1945 try: 1946 t = tarfile.TarInfo() 1947 t.name = "\xe4\xf6\xfc" # non-ASCII 1948 t.uid = 8**8 # too large 1949 t.pax_headers = pax_headers 1950 tar.addfile(t) 1951 finally: 1952 tar.close() 1953 1954 tar = tarfile.open(tmpname, encoding="iso8859-1") 1955 try: 1956 t = tar.getmembers()[0] 1957 self.assertEqual(t.pax_headers, pax_headers) 1958 self.assertEqual(t.name, "foo") 1959 self.assertEqual(t.uid, 123) 1960 finally: 1961 tar.close() 1962 1963 def test_create_pax_header(self): 1964 # The ustar header should contain values that can be 1965 # represented reasonably, even if a better (e.g. higher 1966 # precision) version is set in the pax header. 1967 # Issue #45863 1968 1969 # values that should be kept 1970 t = tarfile.TarInfo() 1971 t.name = "foo" 1972 t.mtime = 1000.1 1973 t.size = 100 1974 t.uid = 123 1975 t.gid = 124 1976 info = t.get_info() 1977 header = t.create_pax_header(info, encoding="iso8859-1") 1978 self.assertEqual(info['name'], "foo") 1979 # mtime should be rounded to nearest second 1980 self.assertIsInstance(info['mtime'], int) 1981 self.assertEqual(info['mtime'], 1000) 1982 self.assertEqual(info['size'], 100) 1983 self.assertEqual(info['uid'], 123) 1984 self.assertEqual(info['gid'], 124) 1985 self.assertEqual(header, 1986 b'././@PaxHeader' + bytes(86) \ 1987 + b'0000000\x000000000\x000000000\x0000000000020\x0000000000000\x00010205\x00 x' \ 1988 + bytes(100) + b'ustar\x0000'+ bytes(247) \ 1989 + b'16 mtime=1000.1\n' + bytes(496) + b'foo' + bytes(97) \ 1990 + b'0000644\x000000173\x000000174\x0000000000144\x0000000001750\x00006516\x00 0' \ 1991 + bytes(100) + b'ustar\x0000' + bytes(247)) 1992 1993 # values that should be changed 1994 t = tarfile.TarInfo() 1995 t.name = "foo\u3374" # can't be represented in ascii 1996 t.mtime = 10**10 # too big 1997 t.size = 10**10 # too big 1998 t.uid = 8**8 # too big 
1999 t.gid = 8**8+1 # too big 2000 info = t.get_info() 2001 header = t.create_pax_header(info, encoding="iso8859-1") 2002 # name is kept as-is in info but should be added to pax header 2003 self.assertEqual(info['name'], "foo\u3374") 2004 self.assertEqual(info['mtime'], 0) 2005 self.assertEqual(info['size'], 0) 2006 self.assertEqual(info['uid'], 0) 2007 self.assertEqual(info['gid'], 0) 2008 self.assertEqual(header, 2009 b'././@PaxHeader' + bytes(86) \ 2010 + b'0000000\x000000000\x000000000\x0000000000130\x0000000000000\x00010207\x00 x' \ 2011 + bytes(100) + b'ustar\x0000' + bytes(247) \ 2012 + b'15 path=foo\xe3\x8d\xb4\n16 uid=16777216\n' \ 2013 + b'16 gid=16777217\n20 size=10000000000\n' \ 2014 + b'21 mtime=10000000000\n'+ bytes(424) + b'foo?' + bytes(96) \ 2015 + b'0000644\x000000000\x000000000\x0000000000000\x0000000000000\x00006540\x00 0' \ 2016 + bytes(100) + b'ustar\x0000' + bytes(247)) 2017 2018 2019class UnicodeTest: 2020 2021 def test_iso8859_1_filename(self): 2022 self._test_unicode_filename("iso8859-1") 2023 2024 def test_utf7_filename(self): 2025 self._test_unicode_filename("utf7") 2026 2027 def test_utf8_filename(self): 2028 self._test_unicode_filename("utf-8") 2029 2030 def _test_unicode_filename(self, encoding): 2031 tar = tarfile.open(tmpname, "w", format=self.format, 2032 encoding=encoding, errors="strict") 2033 try: 2034 name = "\xe4\xf6\xfc" 2035 tar.addfile(tarfile.TarInfo(name)) 2036 finally: 2037 tar.close() 2038 2039 tar = tarfile.open(tmpname, encoding=encoding) 2040 try: 2041 self.assertEqual(tar.getmembers()[0].name, name) 2042 finally: 2043 tar.close() 2044 2045 def test_unicode_filename_error(self): 2046 tar = tarfile.open(tmpname, "w", format=self.format, 2047 encoding="ascii", errors="strict") 2048 try: 2049 tarinfo = tarfile.TarInfo() 2050 2051 tarinfo.name = "\xe4\xf6\xfc" 2052 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 2053 2054 tarinfo.name = "foo" 2055 tarinfo.uname = "\xe4\xf6\xfc" 2056 self.assertRaises(UnicodeError, 
tar.addfile, tarinfo) 2057 finally: 2058 tar.close() 2059 2060 def test_unicode_argument(self): 2061 tar = tarfile.open(tarname, "r", 2062 encoding="iso8859-1", errors="strict") 2063 try: 2064 for t in tar: 2065 self.assertIs(type(t.name), str) 2066 self.assertIs(type(t.linkname), str) 2067 self.assertIs(type(t.uname), str) 2068 self.assertIs(type(t.gname), str) 2069 finally: 2070 tar.close() 2071 2072 def test_uname_unicode(self): 2073 t = tarfile.TarInfo("foo") 2074 t.uname = "\xe4\xf6\xfc" 2075 t.gname = "\xe4\xf6\xfc" 2076 2077 tar = tarfile.open(tmpname, mode="w", format=self.format, 2078 encoding="iso8859-1") 2079 try: 2080 tar.addfile(t) 2081 finally: 2082 tar.close() 2083 2084 tar = tarfile.open(tmpname, encoding="iso8859-1") 2085 try: 2086 t = tar.getmember("foo") 2087 self.assertEqual(t.uname, "\xe4\xf6\xfc") 2088 self.assertEqual(t.gname, "\xe4\xf6\xfc") 2089 2090 if self.format != tarfile.PAX_FORMAT: 2091 tar.close() 2092 tar = tarfile.open(tmpname, encoding="ascii") 2093 t = tar.getmember("foo") 2094 self.assertEqual(t.uname, "\udce4\udcf6\udcfc") 2095 self.assertEqual(t.gname, "\udce4\udcf6\udcfc") 2096 finally: 2097 tar.close() 2098 2099 2100class UstarUnicodeTest(UnicodeTest, unittest.TestCase): 2101 2102 format = tarfile.USTAR_FORMAT 2103 2104 # Test whether the utf-8 encoded version of a filename exceeds the 100 2105 # bytes name field limit (every occurrence of '\xff' will be expanded to 2 2106 # bytes). 
2107 def test_unicode_name1(self): 2108 self._test_ustar_name("0123456789" * 10) 2109 self._test_ustar_name("0123456789" * 10 + "0", ValueError) 2110 self._test_ustar_name("0123456789" * 9 + "01234567\xff") 2111 self._test_ustar_name("0123456789" * 9 + "012345678\xff", ValueError) 2112 2113 def test_unicode_name2(self): 2114 self._test_ustar_name("0123456789" * 9 + "012345\xff\xff") 2115 self._test_ustar_name("0123456789" * 9 + "0123456\xff\xff", ValueError) 2116 2117 # Test whether the utf-8 encoded version of a filename exceeds the 155 2118 # bytes prefix + '/' + 100 bytes name limit. 2119 def test_unicode_longname1(self): 2120 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 10) 2121 self._test_ustar_name("0123456789" * 15 + "0123/4" + "0123456789" * 10, ValueError) 2122 self._test_ustar_name("0123456789" * 15 + "012\xff/" + "0123456789" * 10) 2123 self._test_ustar_name("0123456789" * 15 + "0123\xff/" + "0123456789" * 10, ValueError) 2124 2125 def test_unicode_longname2(self): 2126 self._test_ustar_name("0123456789" * 15 + "01\xff/2" + "0123456789" * 10, ValueError) 2127 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/" + "0123456789" * 10, ValueError) 2128 2129 def test_unicode_longname3(self): 2130 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/2" + "0123456789" * 10, ValueError) 2131 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "01234567\xff") 2132 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345678\xff", ValueError) 2133 2134 def test_unicode_longname4(self): 2135 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345\xff\xff") 2136 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "0123456\xff\xff", ValueError) 2137 2138 def _test_ustar_name(self, name, exc=None): 2139 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 2140 t = tarfile.TarInfo(name) 2141 if exc is None: 2142 tar.addfile(t) 2143 
else: 2144 self.assertRaises(exc, tar.addfile, t) 2145 2146 if exc is None: 2147 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 2148 for t in tar: 2149 self.assertEqual(name, t.name) 2150 break 2151 2152 # Test the same as above for the 100 bytes link field. 2153 def test_unicode_link1(self): 2154 self._test_ustar_link("0123456789" * 10) 2155 self._test_ustar_link("0123456789" * 10 + "0", ValueError) 2156 self._test_ustar_link("0123456789" * 9 + "01234567\xff") 2157 self._test_ustar_link("0123456789" * 9 + "012345678\xff", ValueError) 2158 2159 def test_unicode_link2(self): 2160 self._test_ustar_link("0123456789" * 9 + "012345\xff\xff") 2161 self._test_ustar_link("0123456789" * 9 + "0123456\xff\xff", ValueError) 2162 2163 def _test_ustar_link(self, name, exc=None): 2164 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 2165 t = tarfile.TarInfo("foo") 2166 t.linkname = name 2167 if exc is None: 2168 tar.addfile(t) 2169 else: 2170 self.assertRaises(exc, tar.addfile, t) 2171 2172 if exc is None: 2173 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 2174 for t in tar: 2175 self.assertEqual(name, t.linkname) 2176 break 2177 2178 2179class GNUUnicodeTest(UnicodeTest, unittest.TestCase): 2180 2181 format = tarfile.GNU_FORMAT 2182 2183 def test_bad_pax_header(self): 2184 # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields 2185 # without a hdrcharset=BINARY header. 2186 for encoding, name in ( 2187 ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), 2188 ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): 2189 with tarfile.open(tarname, encoding=encoding, 2190 errors="surrogateescape") as tar: 2191 try: 2192 t = tar.getmember(name) 2193 except KeyError: 2194 self.fail("unable to read bad GNU tar pax header") 2195 2196 2197class PAXUnicodeTest(UnicodeTest, unittest.TestCase): 2198 2199 format = tarfile.PAX_FORMAT 2200 2201 # PAX_FORMAT ignores encoding in write mode. 
2202 test_unicode_filename_error = None 2203 2204 def test_binary_header(self): 2205 # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. 2206 for encoding, name in ( 2207 ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), 2208 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): 2209 with tarfile.open(tarname, encoding=encoding, 2210 errors="surrogateescape") as tar: 2211 try: 2212 t = tar.getmember(name) 2213 except KeyError: 2214 self.fail("unable to read POSIX.1-2008 binary header") 2215 2216 2217class AppendTestBase: 2218 # Test append mode (cp. patch #1652681). 2219 2220 def setUp(self): 2221 self.tarname = tmpname 2222 if os.path.exists(self.tarname): 2223 os_helper.unlink(self.tarname) 2224 2225 def _create_testtar(self, mode="w:"): 2226 with tarfile.open(tarname, encoding="iso8859-1") as src: 2227 t = src.getmember("ustar/regtype") 2228 t.name = "foo" 2229 with src.extractfile(t) as f: 2230 with tarfile.open(self.tarname, mode) as tar: 2231 tar.addfile(t, f) 2232 2233 def test_append_compressed(self): 2234 self._create_testtar("w:" + self.suffix) 2235 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 2236 2237class AppendTest(AppendTestBase, unittest.TestCase): 2238 test_append_compressed = None 2239 2240 def _add_testfile(self, fileobj=None): 2241 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 2242 tar.addfile(tarfile.TarInfo("bar")) 2243 2244 def _test(self, names=["bar"], fileobj=None): 2245 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 2246 self.assertEqual(tar.getnames(), names) 2247 2248 def test_non_existing(self): 2249 self._add_testfile() 2250 self._test() 2251 2252 def test_empty(self): 2253 tarfile.open(self.tarname, "w:").close() 2254 self._add_testfile() 2255 self._test() 2256 2257 def test_empty_fileobj(self): 2258 fobj = io.BytesIO(b"\0" * 1024) 2259 self._add_testfile(fobj) 2260 fobj.seek(0) 2261 self._test(fileobj=fobj) 2262 2263 def test_fileobj(self): 2264 self._create_testtar() 2265 
with open(self.tarname, "rb") as fobj: 2266 data = fobj.read() 2267 fobj = io.BytesIO(data) 2268 self._add_testfile(fobj) 2269 fobj.seek(0) 2270 self._test(names=["foo", "bar"], fileobj=fobj) 2271 2272 def test_existing(self): 2273 self._create_testtar() 2274 self._add_testfile() 2275 self._test(names=["foo", "bar"]) 2276 2277 # Append mode is supposed to fail if the tarfile to append to 2278 # does not end with a zero block. 2279 def _test_error(self, data): 2280 with open(self.tarname, "wb") as fobj: 2281 fobj.write(data) 2282 self.assertRaises(tarfile.ReadError, self._add_testfile) 2283 2284 def test_null(self): 2285 self._test_error(b"") 2286 2287 def test_incomplete(self): 2288 self._test_error(b"\0" * 13) 2289 2290 def test_premature_eof(self): 2291 data = tarfile.TarInfo("foo").tobuf() 2292 self._test_error(data) 2293 2294 def test_trailing_garbage(self): 2295 data = tarfile.TarInfo("foo").tobuf() 2296 self._test_error(data + b"\0" * 13) 2297 2298 def test_invalid(self): 2299 self._test_error(b"a" * 512) 2300 2301class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): 2302 pass 2303 2304class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): 2305 pass 2306 2307class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): 2308 pass 2309 2310 2311class LimitsTest(unittest.TestCase): 2312 2313 def test_ustar_limits(self): 2314 # 100 char name 2315 tarinfo = tarfile.TarInfo("0123456789" * 10) 2316 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2317 2318 # 101 char name that cannot be stored 2319 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 2320 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2321 2322 # 256 char name with a slash at pos 156 2323 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 2324 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2325 2326 # 256 char name that cannot be stored 2327 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 2328 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2329 2330 
# 512 char name 2331 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2332 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2333 2334 # 512 char linkname 2335 tarinfo = tarfile.TarInfo("longlink") 2336 tarinfo.linkname = "123/" * 126 + "longname" 2337 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2338 2339 # uid > 8 digits 2340 tarinfo = tarfile.TarInfo("name") 2341 tarinfo.uid = 0o10000000 2342 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2343 2344 def test_gnu_limits(self): 2345 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2346 tarinfo.tobuf(tarfile.GNU_FORMAT) 2347 2348 tarinfo = tarfile.TarInfo("longlink") 2349 tarinfo.linkname = "123/" * 126 + "longname" 2350 tarinfo.tobuf(tarfile.GNU_FORMAT) 2351 2352 # uid >= 256 ** 7 2353 tarinfo = tarfile.TarInfo("name") 2354 tarinfo.uid = 0o4000000000000000000 2355 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 2356 2357 def test_pax_limits(self): 2358 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2359 tarinfo.tobuf(tarfile.PAX_FORMAT) 2360 2361 tarinfo = tarfile.TarInfo("longlink") 2362 tarinfo.linkname = "123/" * 126 + "longname" 2363 tarinfo.tobuf(tarfile.PAX_FORMAT) 2364 2365 tarinfo = tarfile.TarInfo("name") 2366 tarinfo.uid = 0o4000000000000000000 2367 tarinfo.tobuf(tarfile.PAX_FORMAT) 2368 2369 2370class MiscTest(unittest.TestCase): 2371 2372 def test_char_fields(self): 2373 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), 2374 b"foo\0\0\0\0\0") 2375 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), 2376 b"foo") 2377 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), 2378 "foo") 2379 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), 2380 "foo") 2381 2382 def test_read_number_fields(self): 2383 # Issue 13158: Test if GNU tar specific base-256 number fields 2384 # are decoded correctly. 
2385 self.assertEqual(tarfile.nti(b"0000001\x00"), 1) 2386 self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) 2387 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 2388 0o10000000) 2389 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 2390 0xffffffff) 2391 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), 2392 -1) 2393 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), 2394 -100) 2395 self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), 2396 -0x100000000000000) 2397 2398 # Issue 24514: Test if empty number fields are converted to zero. 2399 self.assertEqual(tarfile.nti(b"\0"), 0) 2400 self.assertEqual(tarfile.nti(b" \0"), 0) 2401 2402 def test_write_number_fields(self): 2403 self.assertEqual(tarfile.itn(1), b"0000001\x00") 2404 self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") 2405 self.assertEqual(tarfile.itn(0o10000000, format=tarfile.GNU_FORMAT), 2406 b"\x80\x00\x00\x00\x00\x20\x00\x00") 2407 self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT), 2408 b"\x80\x00\x00\x00\xff\xff\xff\xff") 2409 self.assertEqual(tarfile.itn(-1, format=tarfile.GNU_FORMAT), 2410 b"\xff\xff\xff\xff\xff\xff\xff\xff") 2411 self.assertEqual(tarfile.itn(-100, format=tarfile.GNU_FORMAT), 2412 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2413 self.assertEqual(tarfile.itn(-0x100000000000000, 2414 format=tarfile.GNU_FORMAT), 2415 b"\xff\x00\x00\x00\x00\x00\x00\x00") 2416 2417 # Issue 32713: Test if itn() supports float values outside the 2418 # non-GNU format range 2419 self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT), 2420 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2421 self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT), 2422 b"\x80\x00\x00\x10\x00\x00\x00\x00") 2423 self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0) 2424 2425 def test_number_field_limits(self): 2426 with self.assertRaises(ValueError): 2427 tarfile.itn(-1, 8, 
tarfile.USTAR_FORMAT) 2428 with self.assertRaises(ValueError): 2429 tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT) 2430 with self.assertRaises(ValueError): 2431 tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT) 2432 with self.assertRaises(ValueError): 2433 tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT) 2434 2435 def test__all__(self): 2436 not_exported = { 2437 'version', 'grp', 'pwd', 'symlink_exception', 'NUL', 'BLOCKSIZE', 2438 'RECORDSIZE', 'GNU_MAGIC', 'POSIX_MAGIC', 'LENGTH_NAME', 2439 'LENGTH_LINK', 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE', 2440 'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 'CONTTYPE', 2441 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 'GNUTYPE_SPARSE', 2442 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 'SUPPORTED_TYPES', 2443 'REGULAR_TYPES', 'GNU_TYPES', 'PAX_FIELDS', 'PAX_NAME_FIELDS', 2444 'PAX_NUMBER_FIELDS', 'stn', 'nts', 'nti', 'itn', 'calc_chksums', 2445 'copyfileobj', 'filemode', 'EmptyHeaderError', 2446 'TruncatedHeaderError', 'EOFHeaderError', 'InvalidHeaderError', 2447 'SubsequentHeaderError', 'ExFileObject', 'main', 2448 "fully_trusted_filter", "data_filter", 2449 "tar_filter", "FilterError", "AbsoluteLinkError", 2450 "OutsideDestinationError", "SpecialFileError", "AbsolutePathError", 2451 "LinkOutsideDestinationError", 2452 } 2453 support.check__all__(self, tarfile, not_exported=not_exported) 2454 2455 def test_useful_error_message_when_modules_missing(self): 2456 fname = os.path.join(os.path.dirname(__file__), 'testtar.tar.xz') 2457 with self.assertRaises(tarfile.ReadError) as excinfo: 2458 error = tarfile.CompressionError('lzma module is not available'), 2459 with unittest.mock.patch.object(tarfile.TarFile, 'xzopen', side_effect=error): 2460 tarfile.open(fname) 2461 2462 self.assertIn( 2463 "\n- method xz: CompressionError('lzma module is not available')\n", 2464 str(excinfo.exception), 2465 ) 2466 2467 2468class CommandLineTest(unittest.TestCase): 2469 2470 def tarfilecmd(self, *args, **kwargs): 2471 rc, out, 
err = script_helper.assert_python_ok('-m', 'tarfile', *args, 2472 **kwargs) 2473 return out.replace(os.linesep.encode(), b'\n') 2474 2475 def tarfilecmd_failure(self, *args): 2476 return script_helper.assert_python_failure('-m', 'tarfile', *args) 2477 2478 def make_simple_tarfile(self, tar_name): 2479 files = [support.findfile('tokenize_tests.txt'), 2480 support.findfile('tokenize_tests-no-coding-cookie-' 2481 'and-utf8-bom-sig-only.txt')] 2482 self.addCleanup(os_helper.unlink, tar_name) 2483 with tarfile.open(tar_name, 'w') as tf: 2484 for tardata in files: 2485 tf.add(tardata, arcname=os.path.basename(tardata)) 2486 2487 def make_evil_tarfile(self, tar_name): 2488 files = [support.findfile('tokenize_tests.txt')] 2489 self.addCleanup(os_helper.unlink, tar_name) 2490 with tarfile.open(tar_name, 'w') as tf: 2491 benign = tarfile.TarInfo('benign') 2492 tf.addfile(benign, fileobj=io.BytesIO(b'')) 2493 evil = tarfile.TarInfo('../evil') 2494 tf.addfile(evil, fileobj=io.BytesIO(b'')) 2495 2496 def test_bad_use(self): 2497 rc, out, err = self.tarfilecmd_failure() 2498 self.assertEqual(out, b'') 2499 self.assertIn(b'usage', err.lower()) 2500 self.assertIn(b'error', err.lower()) 2501 self.assertIn(b'required', err.lower()) 2502 rc, out, err = self.tarfilecmd_failure('-l', '') 2503 self.assertEqual(out, b'') 2504 self.assertNotEqual(err.strip(), b'') 2505 2506 def test_test_command(self): 2507 for tar_name in testtarnames: 2508 for opt in '-t', '--test': 2509 out = self.tarfilecmd(opt, tar_name) 2510 self.assertEqual(out, b'') 2511 2512 def test_test_command_verbose(self): 2513 for tar_name in testtarnames: 2514 for opt in '-v', '--verbose': 2515 out = self.tarfilecmd(opt, '-t', tar_name, 2516 PYTHONIOENCODING='utf-8') 2517 self.assertIn(b'is a tar archive.\n', out) 2518 2519 def test_test_command_invalid_file(self): 2520 zipname = support.findfile('zipdir.zip') 2521 rc, out, err = self.tarfilecmd_failure('-t', zipname) 2522 self.assertIn(b' is not a tar archive.', err) 2523 
self.assertEqual(out, b'') 2524 self.assertEqual(rc, 1) 2525 2526 for tar_name in testtarnames: 2527 with self.subTest(tar_name=tar_name): 2528 with open(tar_name, 'rb') as f: 2529 data = f.read() 2530 try: 2531 with open(tmpname, 'wb') as f: 2532 f.write(data[:511]) 2533 rc, out, err = self.tarfilecmd_failure('-t', tmpname) 2534 self.assertEqual(out, b'') 2535 self.assertEqual(rc, 1) 2536 finally: 2537 os_helper.unlink(tmpname) 2538 2539 def test_list_command(self): 2540 for tar_name in testtarnames: 2541 with support.captured_stdout() as t: 2542 with tarfile.open(tar_name, 'r') as tf: 2543 tf.list(verbose=False) 2544 expected = t.getvalue().encode('ascii', 'backslashreplace') 2545 for opt in '-l', '--list': 2546 out = self.tarfilecmd(opt, tar_name, 2547 PYTHONIOENCODING='ascii') 2548 self.assertEqual(out, expected) 2549 2550 def test_list_command_verbose(self): 2551 for tar_name in testtarnames: 2552 with support.captured_stdout() as t: 2553 with tarfile.open(tar_name, 'r') as tf: 2554 tf.list(verbose=True) 2555 expected = t.getvalue().encode('ascii', 'backslashreplace') 2556 for opt in '-v', '--verbose': 2557 out = self.tarfilecmd(opt, '-l', tar_name, 2558 PYTHONIOENCODING='ascii') 2559 self.assertEqual(out, expected) 2560 2561 def test_list_command_invalid_file(self): 2562 zipname = support.findfile('zipdir.zip') 2563 rc, out, err = self.tarfilecmd_failure('-l', zipname) 2564 self.assertIn(b' is not a tar archive.', err) 2565 self.assertEqual(out, b'') 2566 self.assertEqual(rc, 1) 2567 2568 def test_create_command(self): 2569 files = [support.findfile('tokenize_tests.txt'), 2570 support.findfile('tokenize_tests-no-coding-cookie-' 2571 'and-utf8-bom-sig-only.txt')] 2572 for opt in '-c', '--create': 2573 try: 2574 out = self.tarfilecmd(opt, tmpname, *files) 2575 self.assertEqual(out, b'') 2576 with tarfile.open(tmpname) as tar: 2577 tar.getmembers() 2578 finally: 2579 os_helper.unlink(tmpname) 2580 2581 def test_create_command_verbose(self): 2582 files = 
[support.findfile('tokenize_tests.txt'), 2583 support.findfile('tokenize_tests-no-coding-cookie-' 2584 'and-utf8-bom-sig-only.txt')] 2585 for opt in '-v', '--verbose': 2586 try: 2587 out = self.tarfilecmd(opt, '-c', tmpname, *files, 2588 PYTHONIOENCODING='utf-8') 2589 self.assertIn(b' file created.', out) 2590 with tarfile.open(tmpname) as tar: 2591 tar.getmembers() 2592 finally: 2593 os_helper.unlink(tmpname) 2594 2595 def test_create_command_dotless_filename(self): 2596 files = [support.findfile('tokenize_tests.txt')] 2597 try: 2598 out = self.tarfilecmd('-c', dotlessname, *files) 2599 self.assertEqual(out, b'') 2600 with tarfile.open(dotlessname) as tar: 2601 tar.getmembers() 2602 finally: 2603 os_helper.unlink(dotlessname) 2604 2605 def test_create_command_dot_started_filename(self): 2606 tar_name = os.path.join(TEMPDIR, ".testtar") 2607 files = [support.findfile('tokenize_tests.txt')] 2608 try: 2609 out = self.tarfilecmd('-c', tar_name, *files) 2610 self.assertEqual(out, b'') 2611 with tarfile.open(tar_name) as tar: 2612 tar.getmembers() 2613 finally: 2614 os_helper.unlink(tar_name) 2615 2616 def test_create_command_compressed(self): 2617 files = [support.findfile('tokenize_tests.txt'), 2618 support.findfile('tokenize_tests-no-coding-cookie-' 2619 'and-utf8-bom-sig-only.txt')] 2620 for filetype in (GzipTest, Bz2Test, LzmaTest): 2621 if not filetype.open: 2622 continue 2623 try: 2624 tar_name = tmpname + '.' 
+ filetype.suffix 2625 out = self.tarfilecmd('-c', tar_name, *files) 2626 with filetype.taropen(tar_name) as tar: 2627 tar.getmembers() 2628 finally: 2629 os_helper.unlink(tar_name) 2630 2631 def test_extract_command(self): 2632 self.make_simple_tarfile(tmpname) 2633 for opt in '-e', '--extract': 2634 try: 2635 with os_helper.temp_cwd(tarextdir): 2636 out = self.tarfilecmd(opt, tmpname) 2637 self.assertEqual(out, b'') 2638 finally: 2639 os_helper.rmtree(tarextdir) 2640 2641 def test_extract_command_verbose(self): 2642 self.make_simple_tarfile(tmpname) 2643 for opt in '-v', '--verbose': 2644 try: 2645 with os_helper.temp_cwd(tarextdir): 2646 out = self.tarfilecmd(opt, '-e', tmpname, 2647 PYTHONIOENCODING='utf-8') 2648 self.assertIn(b' file is extracted.', out) 2649 finally: 2650 os_helper.rmtree(tarextdir) 2651 2652 def test_extract_command_filter(self): 2653 self.make_evil_tarfile(tmpname) 2654 # Make an inner directory, so the member named '../evil' 2655 # is still extracted into `tarextdir` 2656 destdir = os.path.join(tarextdir, 'dest') 2657 os.mkdir(tarextdir) 2658 try: 2659 with os_helper.temp_cwd(destdir): 2660 self.tarfilecmd_failure('-e', tmpname, 2661 '-v', 2662 '--filter', 'data') 2663 out = self.tarfilecmd('-e', tmpname, 2664 '-v', 2665 '--filter', 'fully_trusted', 2666 PYTHONIOENCODING='utf-8') 2667 self.assertIn(b' file is extracted.', out) 2668 finally: 2669 os_helper.rmtree(tarextdir) 2670 2671 def test_extract_command_different_directory(self): 2672 self.make_simple_tarfile(tmpname) 2673 try: 2674 with os_helper.temp_cwd(tarextdir): 2675 out = self.tarfilecmd('-e', tmpname, 'spamdir') 2676 self.assertEqual(out, b'') 2677 finally: 2678 os_helper.rmtree(tarextdir) 2679 2680 def test_extract_command_invalid_file(self): 2681 zipname = support.findfile('zipdir.zip') 2682 with os_helper.temp_cwd(tarextdir): 2683 rc, out, err = self.tarfilecmd_failure('-e', zipname) 2684 self.assertIn(b' is not a tar archive.', err) 2685 self.assertEqual(out, b'') 2686 
self.assertEqual(rc, 1) 2687 2688 2689class ContextManagerTest(unittest.TestCase): 2690 2691 def test_basic(self): 2692 with tarfile.open(tarname) as tar: 2693 self.assertFalse(tar.closed, "closed inside runtime context") 2694 self.assertTrue(tar.closed, "context manager failed") 2695 2696 def test_closed(self): 2697 # The __enter__() method is supposed to raise OSError 2698 # if the TarFile object is already closed. 2699 tar = tarfile.open(tarname) 2700 tar.close() 2701 with self.assertRaises(OSError): 2702 with tar: 2703 pass 2704 2705 def test_exception(self): 2706 # Test if the OSError exception is passed through properly. 2707 with self.assertRaises(Exception) as exc: 2708 with tarfile.open(tarname) as tar: 2709 raise OSError 2710 self.assertIsInstance(exc.exception, OSError, 2711 "wrong exception raised in context manager") 2712 self.assertTrue(tar.closed, "context manager failed") 2713 2714 def test_no_eof(self): 2715 # __exit__() must not write end-of-archive blocks if an 2716 # exception was raised. 2717 try: 2718 with tarfile.open(tmpname, "w") as tar: 2719 raise Exception 2720 except: 2721 pass 2722 self.assertEqual(os.path.getsize(tmpname), 0, 2723 "context manager wrote an end-of-archive block") 2724 self.assertTrue(tar.closed, "context manager failed") 2725 2726 def test_eof(self): 2727 # __exit__() must write end-of-archive blocks, i.e. call 2728 # TarFile.close() if there was no error. 2729 with tarfile.open(tmpname, "w"): 2730 pass 2731 self.assertNotEqual(os.path.getsize(tmpname), 0, 2732 "context manager wrote no end-of-archive block") 2733 2734 def test_fileobj(self): 2735 # Test that __exit__() did not close the external file 2736 # object. 
2737 with open(tmpname, "wb") as fobj: 2738 try: 2739 with tarfile.open(fileobj=fobj, mode="w") as tar: 2740 raise Exception 2741 except: 2742 pass 2743 self.assertFalse(fobj.closed, "external file object was closed") 2744 self.assertTrue(tar.closed, "context manager failed") 2745 2746 2747@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing") 2748class LinkEmulationTest(ReadTest, unittest.TestCase): 2749 2750 # Test for issue #8741 regression. On platforms that do not support 2751 # symbolic or hard links tarfile tries to extract these types of members 2752 # as the regular files they point to. 2753 def _test_link_extraction(self, name): 2754 self.tar.extract(name, TEMPDIR, filter='fully_trusted') 2755 with open(os.path.join(TEMPDIR, name), "rb") as f: 2756 data = f.read() 2757 self.assertEqual(sha256sum(data), sha256_regtype) 2758 2759 # See issues #1578269, #8879, and #17689 for some history on these skips 2760 @unittest.skipIf(hasattr(os.path, "islink"), 2761 "Skip emulation - has os.path.islink but not os.link") 2762 def test_hardlink_extraction1(self): 2763 self._test_link_extraction("ustar/lnktype") 2764 2765 @unittest.skipIf(hasattr(os.path, "islink"), 2766 "Skip emulation - has os.path.islink but not os.link") 2767 def test_hardlink_extraction2(self): 2768 self._test_link_extraction("./ustar/linktest2/lnktype") 2769 2770 @unittest.skipIf(hasattr(os, "symlink"), 2771 "Skip emulation if symlink exists") 2772 def test_symlink_extraction1(self): 2773 self._test_link_extraction("ustar/symtype") 2774 2775 @unittest.skipIf(hasattr(os, "symlink"), 2776 "Skip emulation if symlink exists") 2777 def test_symlink_extraction2(self): 2778 self._test_link_extraction("./ustar/linktest2/symtype") 2779 2780 2781class Bz2PartialReadTest(Bz2Test, unittest.TestCase): 2782 # Issue5068: The _BZ2Proxy.read() method loops forever 2783 # on an empty or partial bzipped file. 
2784 2785 def _test_partial_input(self, mode): 2786 class MyBytesIO(io.BytesIO): 2787 hit_eof = False 2788 def read(self, n): 2789 if self.hit_eof: 2790 raise AssertionError("infinite loop detected in " 2791 "tarfile.open()") 2792 self.hit_eof = self.tell() == len(self.getvalue()) 2793 return super(MyBytesIO, self).read(n) 2794 def seek(self, *args): 2795 self.hit_eof = False 2796 return super(MyBytesIO, self).seek(*args) 2797 2798 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 2799 for x in range(len(data) + 1): 2800 try: 2801 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 2802 except tarfile.ReadError: 2803 pass # we have no interest in ReadErrors 2804 2805 def test_partial_input(self): 2806 self._test_partial_input("r") 2807 2808 def test_partial_input_bz2(self): 2809 self._test_partial_input("r:bz2") 2810 2811 2812def root_is_uid_gid_0(): 2813 try: 2814 import pwd, grp 2815 except ImportError: 2816 return False 2817 if pwd.getpwuid(0)[0] != 'root': 2818 return False 2819 if grp.getgrgid(0)[0] != 'root': 2820 return False 2821 return True 2822 2823 2824@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown") 2825@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid") 2826class NumericOwnerTest(unittest.TestCase): 2827 # mock the following: 2828 # os.chown: so we can test what's being called 2829 # os.chmod: so the modes are not actually changed. if they are, we can't 2830 # delete the files/directories 2831 # os.geteuid: so we can lie and say we're root (uid = 0) 2832 2833 @staticmethod 2834 def _make_test_archive(filename_1, dirname_1, filename_2): 2835 # the file contents to write 2836 fobj = io.BytesIO(b"content") 2837 2838 # create a tar file with a file, a directory, and a file within that 2839 # directory. 
Assign various .uid/.gid values to them 2840 items = [(filename_1, 99, 98, tarfile.REGTYPE, fobj), 2841 (dirname_1, 77, 76, tarfile.DIRTYPE, None), 2842 (filename_2, 88, 87, tarfile.REGTYPE, fobj), 2843 ] 2844 with tarfile.open(tmpname, 'w') as tarfl: 2845 for name, uid, gid, typ, contents in items: 2846 t = tarfile.TarInfo(name) 2847 t.uid = uid 2848 t.gid = gid 2849 t.uname = 'root' 2850 t.gname = 'root' 2851 t.type = typ 2852 tarfl.addfile(t, contents) 2853 2854 # return the full pathname to the tar file 2855 return tmpname 2856 2857 @staticmethod 2858 @contextmanager 2859 def _setup_test(mock_geteuid): 2860 mock_geteuid.return_value = 0 # lie and say we're root 2861 fname = 'numeric-owner-testfile' 2862 dirname = 'dir' 2863 2864 # the names we want stored in the tarfile 2865 filename_1 = fname 2866 dirname_1 = dirname 2867 filename_2 = os.path.join(dirname, fname) 2868 2869 # create the tarfile with the contents we're after 2870 tar_filename = NumericOwnerTest._make_test_archive(filename_1, 2871 dirname_1, 2872 filename_2) 2873 2874 # open the tarfile for reading. 
yield it and the names of the items 2875 # we stored into the file 2876 with tarfile.open(tar_filename) as tarfl: 2877 yield tarfl, filename_1, dirname_1, filename_2 2878 2879 @unittest.mock.patch('os.chown') 2880 @unittest.mock.patch('os.chmod') 2881 @unittest.mock.patch('os.geteuid') 2882 def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod, 2883 mock_chown): 2884 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, 2885 filename_2): 2886 tarfl.extract(filename_1, TEMPDIR, numeric_owner=True, 2887 filter='fully_trusted') 2888 tarfl.extract(filename_2 , TEMPDIR, numeric_owner=True, 2889 filter='fully_trusted') 2890 2891 # convert to filesystem paths 2892 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2893 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2894 2895 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2896 unittest.mock.call(f_filename_2, 88, 87), 2897 ], 2898 any_order=True) 2899 2900 @unittest.mock.patch('os.chown') 2901 @unittest.mock.patch('os.chmod') 2902 @unittest.mock.patch('os.geteuid') 2903 def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod, 2904 mock_chown): 2905 with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1, 2906 filename_2): 2907 tarfl.extractall(TEMPDIR, numeric_owner=True, 2908 filter='fully_trusted') 2909 2910 # convert to filesystem paths 2911 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2912 f_dirname_1 = os.path.join(TEMPDIR, dirname_1) 2913 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2914 2915 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2916 unittest.mock.call(f_dirname_1, 77, 76), 2917 unittest.mock.call(f_filename_2, 88, 87), 2918 ], 2919 any_order=True) 2920 2921 # this test requires that uid=0 and gid=0 really be named 'root'. 
# This test only makes sense where uid=0 and gid=0 are really named 'root':
# the uname and gname stored in the test archive are 'root', and extract()
# resolves them through pwd and grp, so the ids we check for here are 0.
@unittest.skipUnless(root_is_uid_gid_0(),
                     'uid=0,gid=0 must be named "root"')
@unittest.mock.patch('os.chown')
@unittest.mock.patch('os.chmod')
@unittest.mock.patch('os.geteuid')
def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
                                       mock_chown):
    """Without numeric_owner, ownership is looked up from the stored names."""
    with self._setup_test(mock_geteuid) as (archive, file_1, _, _):
        archive.extract(file_1, TEMPDIR, numeric_owner=False,
                        filter='fully_trusted')

    extracted_path = os.path.join(TEMPDIR, file_1)
    mock_chown.assert_called_with(extracted_path, 0, 0)

@unittest.mock.patch('os.geteuid')
def test_keyword_only(self, mock_geteuid):
    """Passing more than two positional arguments to extract() is a TypeError."""
    with self._setup_test(mock_geteuid) as (archive, file_1, _, _):
        with self.assertRaises(TypeError):
            archive.extract(file_1, TEMPDIR, False, True)
class NoneInfoExtractTests(ReadTest):
    """Check that members extract successfully when some metadata is None.

    Each test sets one or more TarInfo attributes to None on every member,
    extracts the archive, and compares the resulting set of paths with a
    control extraction done once per class.  Some methods do additional
    spot checks.

    The control extraction uses `extraction_filter`, so subclasses that
    override it also check that the default filters can deal with the
    test archive.
    """

    # Filter passed to the control extraction in setUpClass().
    extraction_filter = None

    @classmethod
    def setUpClass(cls):
        """Extract the unmodified archive once to record the expected paths."""
        cls.control_dir = pathlib.Path(TEMPDIR) / "extractall_ctrl"
        # Context manager: the archive is closed even if extraction raises.
        with tarfile.open(tarname, mode='r', encoding="iso8859-1") as tar:
            tar.errorlevel = 0
            tar.extractall(cls.control_dir, filter=cls.extraction_filter)
        cls.control_paths = set(
            p.relative_to(cls.control_dir)
            for p in cls.control_dir.glob('**/*'))

    @classmethod
    def tearDownClass(cls):
        """Remove the control extraction."""
        shutil.rmtree(cls.control_dir)

    def check_files_present(self, directory):
        """Assert that `directory` holds exactly the control set of paths."""
        got_paths = set(
            p.relative_to(directory)
            for p in pathlib.Path(directory).glob('**/*'))
        self.assertEqual(self.control_paths, got_paths)

    @contextmanager
    def extract_with_none(self, *attr_names):
        """Extract self.tar with the given attributes set to None everywhere.

        Yields the extraction directory; it is removed when the context
        exits.
        """
        DIR = pathlib.Path(TEMPDIR) / "extractall_none"
        self.tar.errorlevel = 0
        for member in self.tar.getmembers():
            for attr_name in attr_names:
                setattr(member, attr_name, None)
        with os_helper.temp_dir(DIR):
            self.tar.extractall(DIR, filter='fully_trusted')
            self.check_files_present(DIR)
            yield DIR

    def test_extractall_none_mtime(self):
        # mtimes of extracted files should be later than 'now' -- the mtime
        # of a previously created directory.
        now = pathlib.Path(TEMPDIR).stat().st_mtime
        with self.extract_with_none('mtime') as DIR:
            for path in pathlib.Path(DIR).glob('**/*'):
                with self.subTest(path=path):
                    try:
                        mtime = path.stat().st_mtime
                    except OSError:
                        # Some systems can't stat symlinks, ignore those
                        if not path.is_symlink():
                            raise
                    else:
                        # Reuse the value read above instead of calling
                        # stat() again outside the guarded block.
                        self.assertGreaterEqual(mtime, now)

    def test_extractall_none_mode(self):
        # modes of directories and regular files should match the mode
        # of a "normally" created directory or regular file
        dir_mode = pathlib.Path(TEMPDIR).stat().st_mode
        regular_file = pathlib.Path(TEMPDIR) / 'regular_file'
        regular_file.write_text('')
        regular_file_mode = regular_file.stat().st_mode
        with self.extract_with_none('mode') as DIR:
            for path in pathlib.Path(DIR).glob('**/*'):
                with self.subTest(path=path):
                    if path.is_dir():
                        self.assertEqual(path.stat().st_mode, dir_mode)
                    elif path.is_file():
                        self.assertEqual(path.stat().st_mode,
                                         regular_file_mode)

    # The remaining tests only check that extraction succeeds and yields
    # the control set of paths (verified inside extract_with_none()).

    def test_extractall_none_uid(self):
        with self.extract_with_none('uid'):
            pass

    def test_extractall_none_gid(self):
        with self.extract_with_none('gid'):
            pass

    def test_extractall_none_uname(self):
        with self.extract_with_none('uname'):
            pass

    def test_extractall_none_gname(self):
        with self.extract_with_none('gname'):
            pass

    def test_extractall_none_ownership(self):
        with self.extract_with_none('uid', 'gid', 'uname', 'gname'):
            pass
def test_add(self):
    """addfile() raises ValueError naming the attribute when metadata is None.

    Runs once per tar format.  For each format a baseline addfile() with
    complete metadata is attempted first; the None checks only run if that
    succeeds.
    """
    bio = io.BytesIO()
    for tarformat in (tarfile.USTAR_FORMAT, tarfile.GNU_FORMAT,
                      tarfile.PAX_FORMAT):
        with self.subTest(tarformat=tarformat):
            # Open via the context manager so the TarFile is always closed.
            # `bio` is passed in as fileobj and stays usable for the next
            # format.
            with tarfile.open(fileobj=bio, mode='w',
                              format=tarformat) as tar:
                tarinfo = tar.gettarinfo(tarname)
                try:
                    tar.addfile(tarinfo)
                except Exception:
                    if tarformat == tarfile.USTAR_FORMAT:
                        # In the old, limited format, adding might fail for
                        # reasons like the UID being too large
                        pass
                    else:
                        raise
                else:
                    for attr_name in ('mtime', 'mode', 'uid', 'gid',
                                      'uname', 'gname'):
                        with self.subTest(attr_name=attr_name):
                            replaced = tarinfo.replace(**{attr_name: None})
                            with self.assertRaisesRegex(ValueError,
                                                        f"{attr_name}"):
                                tar.addfile(replaced)
3128 for attr_names in ({'mtime'}, {'mode'}, {'uid'}, {'gid'}, 3129 {'uname'}, {'gname'}, 3130 {'uid', 'uname'}, {'gid', 'gname'}): 3131 with (self.subTest(attr_names=attr_names), 3132 tarfile.open(tarname, encoding="iso8859-1") as tar): 3133 tio_prev = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 3134 with support.swap_attr(sys, 'stdout', tio_prev): 3135 tar.list() 3136 for member in tar.getmembers(): 3137 for attr_name in attr_names: 3138 setattr(member, attr_name, None) 3139 tio_new = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 3140 with support.swap_attr(sys, 'stdout', tio_new): 3141 tar.list() 3142 for expected, got in zip(tio_prev.detach().getvalue().split(), 3143 tio_new.detach().getvalue().split()): 3144 if attr_names == {'mtime'} and re.match(rb'2003-01-\d\d', expected): 3145 self.assertEqual(got, b'????-??-??') 3146 elif attr_names == {'mtime'} and re.match(rb'\d\d:\d\d:\d\d', expected): 3147 self.assertEqual(got, b'??:??:??') 3148 elif attr_names == {'mode'} and re.match( 3149 rb'.([r-][w-][x-]){3}', expected): 3150 self.assertEqual(got, b'??????????') 3151 elif attr_names == {'uname'} and expected.startswith( 3152 (b'tarfile/', b'lars/', b'foo/')): 3153 exp_user, exp_group = expected.split(b'/') 3154 got_user, got_group = got.split(b'/') 3155 self.assertEqual(got_group, exp_group) 3156 self.assertRegex(got_user, b'[0-9]+') 3157 elif attr_names == {'gname'} and expected.endswith( 3158 (b'/tarfile', b'/users', b'/bar')): 3159 exp_user, exp_group = expected.split(b'/') 3160 got_user, got_group = got.split(b'/') 3161 self.assertEqual(got_user, exp_user) 3162 self.assertRegex(got_group, b'[0-9]+') 3163 elif attr_names == {'uid'} and expected.startswith( 3164 (b'1000/')): 3165 exp_user, exp_group = expected.split(b'/') 3166 got_user, got_group = got.split(b'/') 3167 self.assertEqual(got_group, exp_group) 3168 self.assertEqual(got_user, b'None') 3169 elif attr_names == {'gid'} and expected.endswith((b'/100')): 3170 exp_user, exp_group = 
expected.split(b'/') 3171 got_user, got_group = got.split(b'/') 3172 self.assertEqual(got_user, exp_user) 3173 self.assertEqual(got_group, b'None') 3174 elif attr_names == {'uid', 'uname'} and expected.startswith( 3175 (b'tarfile/', b'lars/', b'foo/', b'1000/')): 3176 exp_user, exp_group = expected.split(b'/') 3177 got_user, got_group = got.split(b'/') 3178 self.assertEqual(got_group, exp_group) 3179 self.assertEqual(got_user, b'None') 3180 elif attr_names == {'gname', 'gid'} and expected.endswith( 3181 (b'/tarfile', b'/users', b'/bar', b'/100')): 3182 exp_user, exp_group = expected.split(b'/') 3183 got_user, got_group = got.split(b'/') 3184 self.assertEqual(got_user, exp_user) 3185 self.assertEqual(got_group, b'None') 3186 else: 3187 # In other cases the output should be the same 3188 self.assertEqual(expected, got) 3189 3190def _filemode_to_int(mode): 3191 """Inverse of `stat.filemode` (for permission bits) 3192 3193 Using mode strings rather than numbers makes the later tests more readable. 3194 """ 3195 str_mode = mode[1:] 3196 result = ( 3197 {'r': stat.S_IRUSR, '-': 0}[str_mode[0]] 3198 | {'w': stat.S_IWUSR, '-': 0}[str_mode[1]] 3199 | {'x': stat.S_IXUSR, '-': 0, 3200 's': stat.S_IXUSR | stat.S_ISUID, 3201 'S': stat.S_ISUID}[str_mode[2]] 3202 | {'r': stat.S_IRGRP, '-': 0}[str_mode[3]] 3203 | {'w': stat.S_IWGRP, '-': 0}[str_mode[4]] 3204 | {'x': stat.S_IXGRP, '-': 0, 3205 's': stat.S_IXGRP | stat.S_ISGID, 3206 'S': stat.S_ISGID}[str_mode[5]] 3207 | {'r': stat.S_IROTH, '-': 0}[str_mode[6]] 3208 | {'w': stat.S_IWOTH, '-': 0}[str_mode[7]] 3209 | {'x': stat.S_IXOTH, '-': 0, 3210 't': stat.S_IXOTH | stat.S_ISVTX, 3211 'T': stat.S_ISVTX}[str_mode[8]] 3212 ) 3213 # check we did this right 3214 assert stat.filemode(result)[1:] == mode[1:] 3215 3216 return result 3217 3218class ArchiveMaker: 3219 """Helper to create a tar file with specific contents 3220 3221 Usage: 3222 3223 with ArchiveMaker() as t: 3224 t.add('filename', ...) 3225 3226 with t.open() as tar: 3227 ... 
# `tar` is now a TarFile with 'filename' in it! 3228 """ 3229 def __init__(self): 3230 self.bio = io.BytesIO() 3231 3232 def __enter__(self): 3233 self.tar_w = tarfile.TarFile(mode='w', fileobj=self.bio) 3234 return self 3235 3236 def __exit__(self, *exc): 3237 self.tar_w.close() 3238 self.contents = self.bio.getvalue() 3239 self.bio = None 3240 3241 def add(self, name, *, type=None, symlink_to=None, hardlink_to=None, 3242 mode=None, **kwargs): 3243 """Add a member to the test archive. Call within `with`.""" 3244 name = str(name) 3245 tarinfo = tarfile.TarInfo(name).replace(**kwargs) 3246 if mode: 3247 tarinfo.mode = _filemode_to_int(mode) 3248 if symlink_to is not None: 3249 type = tarfile.SYMTYPE 3250 tarinfo.linkname = str(symlink_to) 3251 if hardlink_to is not None: 3252 type = tarfile.LNKTYPE 3253 tarinfo.linkname = str(hardlink_to) 3254 if name.endswith('/') and type is None: 3255 type = tarfile.DIRTYPE 3256 if type is not None: 3257 tarinfo.type = type 3258 if tarinfo.isreg(): 3259 fileobj = io.BytesIO(bytes(tarinfo.size)) 3260 else: 3261 fileobj = None 3262 self.tar_w.addfile(tarinfo, fileobj) 3263 3264 def open(self, **kwargs): 3265 """Open the resulting archive as TarFile. Call after `with`.""" 3266 bio = io.BytesIO(self.contents) 3267 return tarfile.open(fileobj=bio, **kwargs) 3268 3269# Under WASI, `os_helper.can_symlink` is False to make 3270# `skip_unless_symlink` skip symlink tests. " 3271# But in the following tests we use can_symlink to *determine* which 3272# behavior is expected. 3273# Like other symlink tests, skip these on WASI for now. 3274if support.is_wasi: 3275 def symlink_test(f): 3276 return unittest.skip("WASI: Skip symlink test for now")(f) 3277else: 3278 def symlink_test(f): 3279 return f 3280 3281 3282class TestExtractionFilters(unittest.TestCase): 3283 3284 # A temporary directory for the extraction results. 3285 # All files that "escape" the destination path should still end 3286 # up in this directory. 
@contextmanager
def check_context(self, tar, filter):
    """Extracts `tar` to `self.destdir` and allows checking the result

    If an error occurs, it must be checked using `expect_exception`

    Otherwise, all resulting files must be checked using `expect_file`,
    except the destination directory itself and parent directories of
    other files.
    When checking directories, do so before their contents.
    """
    with os_helper.temp_dir(self.outerdir):
        extraction_error = None
        try:
            tar.extractall(self.destdir, filter=filter)
        except Exception as err:
            extraction_error = err
        self.raised_exception = extraction_error

        if extraction_error is None:
            # Everything under outerdir must later be claimed by expect_file().
            unchecked = set(self.outerdir.glob('**/*'))
            unchecked.discard(self.destdir)
        else:
            unchecked = set()
        self.expected_paths = unchecked

        try:
            yield
        finally:
            tar.close()
        # An error nobody claimed with expect_exception() fails the test.
        if self.raised_exception:
            raise self.raised_exception
        self.assertEqual(self.expected_paths, set())

def expect_file(self, name, type=None, symlink_to=None, mode=None):
    """Check a single file. See check_context."""
    if self.raised_exception:
        raise self.raised_exception
    # use normpath() rather than resolve() so we don't follow symlinks
    path = pathlib.Path(os.path.normpath(self.destdir / name))
    self.assertIn(path, self.expected_paths)
    self.expected_paths.remove(path)

    if mode is not None and os_helper.can_chmod():
        actual_mode = stat.filemode(stat.S_IMODE(path.stat().st_mode))
        self.assertEqual(actual_mode, mode)

    # A trailing slash in a string name implies a directory.
    if type is None and isinstance(name, str) and name.endswith('/'):
        type = tarfile.DIRTYPE

    if symlink_to is not None:
        actual_target = (self.destdir / name).readlink()
        wanted_target = pathlib.Path(symlink_to)
        # The symlink might be the same (textually) as what we expect,
        # but some systems change the link to an equivalent path, so
        # we fall back to samefile().
        if wanted_target != actual_target:
            self.assertTrue(actual_target.samefile(wanted_target))
    else:
        if type == tarfile.REGTYPE or type is None:
            kind_check = path.is_file
        elif type == tarfile.DIRTYPE:
            kind_check = path.is_dir
        elif type == tarfile.FIFOTYPE:
            kind_check = path.is_fifo
        else:
            raise NotImplementedError(type)
        self.assertTrue(kind_check())

    # Parent directories of a checked file count as checked too.
    for ancestor in path.parents:
        self.expected_paths.discard(ancestor)

def expect_exception(self, exc_type, message_re='.'):
    """Assert that extraction raised `exc_type` matching `message_re`.

    The recorded exception is cleared afterwards so check_context does not
    re-raise it.
    """
    pending = self.raised_exception
    with self.assertRaisesRegex(exc_type, message_re):
        if pending is not None:
            raise pending
    self.raised_exception = None
@symlink_test
def test_parent_symlink(self):
    # Test interplaying symlinks
    # Inspired by 'dirsymlink2a' in jwilk/traversal-archives
    with ArchiveMaker() as arc:
        arc.add('current', symlink_to='.')
        arc.add('parent', symlink_to='current/..')
        arc.add('parent/evil')

    if not os_helper.can_symlink():
        # No symlink support. The symlinks are ignored.
        for filter_name in ('fully_trusted', 'tar', 'data'):
            with self.check_context(arc.open(), filter_name):
                self.expect_file('parent/evil')
        return

    with self.check_context(arc.open(), 'fully_trusted'):
        if self.raised_exception is not None:
            # Windows will refuse to create a file that's a symlink to itself
            # (and tarfile doesn't swallow that exception)
            self.expect_exception(FileExistsError)
            # The other cases will fail with this error too.
            # Skip the rest of this test.
            return
        self.expect_file('current', symlink_to='.')
        self.expect_file('parent', symlink_to='current/..')
        self.expect_file('../evil')

    with self.check_context(arc.open(), 'tar'):
        self.expect_exception(
            tarfile.OutsideDestinationError,
            """'parent/evil' would be extracted to ['"].*evil['"], """
            + "which is outside the destination")

    with self.check_context(arc.open(), 'data'):
        self.expect_exception(
            tarfile.LinkOutsideDestinationError,
            """'parent' would link to ['"].*outerdir['"], """
            + "which is outside the destination")
@symlink_test
def test_absolute_symlink(self):
    # Test symlink to an absolute path
    # Inspired by 'dirsymlink' in jwilk/traversal-archives
    with ArchiveMaker() as arc:
        arc.add('parent', symlink_to=self.outerdir)
        arc.add('parent/evil')

    with self.check_context(arc.open(), 'fully_trusted'):
        if not os_helper.can_symlink():
            self.expect_file('parent/evil')
        else:
            self.expect_file('parent', symlink_to=self.outerdir)
            self.expect_file('../evil')

    with self.check_context(arc.open(), 'tar'):
        if not os_helper.can_symlink():
            self.expect_file('parent/evil')
        else:
            self.expect_exception(
                tarfile.OutsideDestinationError,
                "'parent/evil' would be extracted to "
                + """['"].*evil['"], which is outside """
                + "the destination")

    # The 'data' filter is expected to reject the absolute link in both cases.
    with self.check_context(arc.open(), 'data'):
        self.expect_exception(
            tarfile.AbsoluteLinkError,
            "'parent' is a symlink to an absolute path")
def test_modes(self):
    # Test how file modes are extracted
    # (Note that the modes are ignored on platforms without working chmod)
    archived_modes = (
        ('all_bits', '?rwsrwsrwt'),
        ('perm_bits', '?rwxrwxrwx'),
        ('exec_group_other', '?rw-rwxrwx'),
        ('read_group_only', '?---r-----'),
        ('no_bits', '?---------'),
        ('dir/', '?---rwsrwt'),
    )
    with ArchiveMaker() as arc:
        for member_name, member_mode in archived_modes:
            arc.add(member_name, mode=member_mode)

    # On some systems, setting the sticky bit is a no-op.
    # Check if that's the case, once for a file and once for a directory.
    probe = os.path.join(TEMPDIR, "tmp.file")

    def sticky_bit_sticks():
        os.chmod(probe, os.stat(probe).st_mode | stat.S_ISVTX)
        return os.stat(probe).st_mode & stat.S_ISVTX

    with open(probe, 'w'):
        pass
    have_sticky_files = sticky_bit_sticks()
    os.unlink(probe)

    os.mkdir(probe)
    have_sticky_dirs = sticky_bit_sticks()
    os.rmdir(probe)

    with self.check_context(arc.open(), 'fully_trusted'):
        self.expect_file(
            'all_bits',
            mode='?rwsrwsrwt' if have_sticky_files else '?rwsrwsrwx')
        self.expect_file('perm_bits', mode='?rwxrwxrwx')
        self.expect_file('exec_group_other', mode='?rw-rwxrwx')
        self.expect_file('read_group_only', mode='?---r-----')
        self.expect_file('no_bits', mode='?---------')
        self.expect_file(
            'dir/',
            mode='?---rwsrwt' if have_sticky_dirs else '?---rwsrwx')

    with self.check_context(arc.open(), 'tar'):
        for member_name, wanted_mode in (
                ('all_bits', '?rwxr-xr-x'),
                ('perm_bits', '?rwxr-xr-x'),
                ('exec_group_other', '?rw-r-xr-x'),
                ('read_group_only', '?---r-----'),
                ('no_bits', '?---------'),
                ('dir/', '?---r-xr-x')):
            self.expect_file(member_name, mode=wanted_mode)

    with self.check_context(arc.open(), 'data'):
        # Directories are compared against the mode of a directory created
        # without an explicit mode.
        normal_dir_mode = stat.filemode(stat.S_IMODE(
            self.outerdir.stat().st_mode))
        for member_name, wanted_mode in (
                ('all_bits', '?rwxr-xr-x'),
                ('perm_bits', '?rwxr-xr-x'),
                ('exec_group_other', '?rw-r--r--'),
                ('read_group_only', '?rw-r-----'),
                ('no_bits', '?rw-------'),
                ('dir/', normal_dir_mode)):
            self.expect_file(member_name, mode=wanted_mode)
def test_special_files(self):
    # Creating device files is tricky. Instead of attempting that let's
    # only check the filter result.
    for special_type in (tarfile.FIFOTYPE, tarfile.CHRTYPE, tarfile.BLKTYPE):
        member = tarfile.TarInfo('foo')
        member.type = special_type

        # 'fully_trusted' hands back the very same object.
        self.assertIs(tarfile.fully_trusted_filter(member, ''), member)

        # 'tar' keeps the special type.
        self.assertEqual(tarfile.tar_filter(member, '').type, special_type)

        # 'data' refuses and reports the offending member.
        with self.assertRaises(tarfile.SpecialFileError) as caught:
            tarfile.data_filter(member, '')
        offending = caught.exception.tarinfo
        self.assertIsInstance(offending, tarfile.TarInfo)
        self.assertEqual(offending.name, 'foo')
def test_data_filter(self):
    # The 'data' filter either raises, or returns TarInfo with the same
    # name/type.
    with tarfile.TarFile.open(tarname) as tar:
        for member in tar.getmembers():
            try:
                filtered = tarfile.data_filter(member, '')
            except tarfile.FilterError:
                # Rejected members are allowed; nothing more to check.
                pass
            else:
                self.assertIs(filtered.name, member.name)
                self.assertIs(filtered.type, member.type)
def test_custom_filter(self):
    """A callable filter may rename members, drop them, or pass them through."""
    def custom_filter(tarinfo, path):
        # The filter receives the very destination object given to extractall().
        self.assertIs(path, self.destdir)
        member_name = tarinfo.name
        if member_name == 'move_this':
            result = tarinfo.replace(name='moved')
        elif member_name == 'ignore_this':
            result = None
        else:
            result = tarinfo
        return result

    with ArchiveMaker() as arc:
        for member_name in ('move_this', 'ignore_this', 'keep'):
            arc.add(member_name)
    with self.check_context(arc.open(), custom_filter):
        self.expect_file('moved')
        self.expect_file('keep')
def test_errorlevel(self):
    """Check which filter errors each TarFile.errorlevel lets through."""
    def raising_filter(exc_type, message):
        # Build a filter that rejects every member with a fresh exception.
        def _filter(tarinfo, path):
            raise exc_type(message)
        return _filter

    extracterror_filter = raising_filter(tarfile.ExtractError,
                                         'failed with ExtractError')
    filtererror_filter = raising_filter(tarfile.FilterError,
                                        'failed with FilterError')
    oserror_filter = raising_filter(OSError, 'failed with OSError')
    tarerror_filter = raising_filter(tarfile.TarError,
                                     'failed with base TarError')
    valueerror_filter = raising_filter(ValueError,
                                       'failed with ValueError')

    with ArchiveMaker() as arc:
        arc.add('file')

    # (errorlevel, filter, expected exception).  None as the expected
    # exception means the error is swallowed and 'file' must be checked
    # with expect_file() instead.
    scenarios = (
        # If errorlevel is 0, errors affected by errorlevel are ignored
        (0, extracterror_filter, None),
        (0, filtererror_filter, None),
        (0, oserror_filter, None),
        (0, tarerror_filter, tarfile.TarError),
        (0, valueerror_filter, ValueError),
        # If 1, all fatal errors are raised
        (1, extracterror_filter, None),
        (1, filtererror_filter, tarfile.FilterError),
        (1, oserror_filter, OSError),
        (1, tarerror_filter, tarfile.TarError),
        (1, valueerror_filter, ValueError),
        # If 2, all non-fatal errors are raised as well.
        (2, extracterror_filter, tarfile.ExtractError),
        (2, filtererror_filter, tarfile.FilterError),
        (2, oserror_filter, OSError),
        (2, tarerror_filter, tarfile.TarError),
        (2, valueerror_filter, ValueError),
        # We only handle ExtractError, FilterError & OSError specially.
        # A non-int errorlevel results in a TypeError.
        ('boo!', filtererror_filter, TypeError),
    )
    for errorlevel, failing_filter, expected_exc in scenarios:
        with self.check_context(arc.open(errorlevel=errorlevel),
                                failing_filter):
            if expected_exc is None:
                self.expect_file('file')
            else:
                self.expect_exception(expected_exc)
3857 for c in GzipTest, Bz2Test, LzmaTest: 3858 if c.open: 3859 os_helper.unlink(c.tarname) 3860 testtarnames.append(c.tarname) 3861 with c.open(c.tarname, "wb") as tar: 3862 tar.write(data) 3863 3864def tearDownModule(): 3865 if os.path.exists(TEMPDIR): 3866 os_helper.rmtree(TEMPDIR) 3867 3868if __name__ == "__main__": 3869 unittest.main() 3870