1import array 2import contextlib 3import importlib.util 4import io 5import itertools 6import os 7import pathlib 8import posixpath 9import string 10import struct 11import subprocess 12import sys 13from test.support.script_helper import assert_python_ok 14import time 15import unittest 16import unittest.mock as mock 17import zipfile 18import functools 19 20 21from tempfile import TemporaryFile 22from random import randint, random, randbytes 23 24from test.support import script_helper 25from test.support import ( 26 findfile, requires_zlib, requires_bz2, requires_lzma, 27 captured_stdout, captured_stderr, requires_subprocess 28) 29from test.support.os_helper import ( 30 TESTFN, unlink, rmtree, temp_dir, temp_cwd, fd_count 31) 32 33 34TESTFN2 = TESTFN + "2" 35TESTFNDIR = TESTFN + "d" 36FIXEDTEST_SIZE = 1000 37DATAFILES_DIR = 'zipfile_datafiles' 38 39SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), 40 ('ziptest2dir/_ziptest2', 'qawsedrftg'), 41 ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), 42 ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] 43 44def get_files(test): 45 yield TESTFN2 46 with TemporaryFile() as f: 47 yield f 48 test.assertFalse(f.closed) 49 with io.BytesIO() as f: 50 yield f 51 test.assertFalse(f.closed) 52 53class AbstractTestsWithSourceFile: 54 @classmethod 55 def setUpClass(cls): 56 cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" % 57 (i, random()), "ascii") 58 for i in range(FIXEDTEST_SIZE)] 59 cls.data = b''.join(cls.line_gen) 60 61 def setUp(self): 62 # Make a source file with some lines 63 with open(TESTFN, "wb") as fp: 64 fp.write(self.data) 65 66 def make_test_archive(self, f, compression, compresslevel=None): 67 kwargs = {'compression': compression, 'compresslevel': compresslevel} 68 # Create the ZIP archive 69 with zipfile.ZipFile(f, "w", **kwargs) as zipfp: 70 zipfp.write(TESTFN, "another.name") 71 zipfp.write(TESTFN, TESTFN) 72 zipfp.writestr("strfile", self.data) 73 with zipfp.open('written-open-w', mode='w') as f: 74 for line in self.line_gen: 75 f.write(line) 76 77 def zip_test(self, f, compression, compresslevel=None): 78 self.make_test_archive(f, compression, compresslevel) 79 80 # Read the ZIP archive 81 with zipfile.ZipFile(f, "r", compression) as zipfp: 82 self.assertEqual(zipfp.read(TESTFN), self.data) 83 self.assertEqual(zipfp.read("another.name"), self.data) 84 self.assertEqual(zipfp.read("strfile"), self.data) 85 86 # Print the ZIP directory 87 fp = io.StringIO() 88 zipfp.printdir(file=fp) 89 directory = fp.getvalue() 90 lines = directory.splitlines() 91 self.assertEqual(len(lines), 5) # Number of files + header 92 93 self.assertIn('File Name', lines[0]) 94 self.assertIn('Modified', lines[0]) 95 self.assertIn('Size', lines[0]) 96 97 fn, date, time_, size = lines[1].split() 98 self.assertEqual(fn, 'another.name') 99 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 100 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 101 self.assertEqual(size, str(len(self.data))) 102 103 # Check the namelist 104 names = zipfp.namelist() 105 self.assertEqual(len(names), 4) 106 self.assertIn(TESTFN, names) 107 self.assertIn("another.name", names) 108 self.assertIn("strfile", names) 109 self.assertIn("written-open-w", names) 110 111 # Check infolist 112 infos = zipfp.infolist() 113 names = [i.filename for i in infos] 114 self.assertEqual(len(names), 4) 115 self.assertIn(TESTFN, names) 116 self.assertIn("another.name", names) 117 self.assertIn("strfile", names) 118 self.assertIn("written-open-w", names) 119 for i in infos: 120 self.assertEqual(i.file_size, len(self.data)) 121 122 # check getinfo 123 for nm in (TESTFN, "another.name", "strfile", "written-open-w"): 124 info = zipfp.getinfo(nm) 125 self.assertEqual(info.filename, nm) 126 self.assertEqual(info.file_size, len(self.data)) 127 128 # Check that testzip doesn't raise an exception 129 zipfp.testzip() 130 131 def test_basic(self): 132 for f in get_files(self): 133 self.zip_test(f, self.compression) 134 135 def zip_open_test(self, f, compression): 136 self.make_test_archive(f, compression) 137 138 # Read the ZIP archive 139 with zipfile.ZipFile(f, "r", compression) as zipfp: 140 zipdata1 = [] 141 with zipfp.open(TESTFN) as zipopen1: 142 while True: 143 read_data = zipopen1.read(256) 144 if not read_data: 145 break 146 zipdata1.append(read_data) 147 148 zipdata2 = [] 149 with zipfp.open("another.name") as zipopen2: 150 while True: 151 read_data = zipopen2.read(256) 152 if not read_data: 153 break 154 zipdata2.append(read_data) 155 156 self.assertEqual(b''.join(zipdata1), self.data) 157 self.assertEqual(b''.join(zipdata2), self.data) 158 159 def test_open(self): 160 for f in get_files(self): 161 self.zip_open_test(f, self.compression) 162 163 def test_open_with_pathlike(self): 164 path = pathlib.Path(TESTFN2) 165 self.zip_open_test(path, self.compression) 166 with zipfile.ZipFile(path, "r", self.compression) as zipfp: 167 self.assertIsInstance(zipfp.filename, str) 168 169 def zip_random_open_test(self, f, compression): 170 self.make_test_archive(f, compression) 171 172 # Read the ZIP archive 173 with zipfile.ZipFile(f, "r", compression) as zipfp: 174 zipdata1 = [] 175 with zipfp.open(TESTFN) as zipopen1: 176 while True: 177 read_data = zipopen1.read(randint(1, 1024)) 178 if not read_data: 179 break 180 zipdata1.append(read_data) 181 182 self.assertEqual(b''.join(zipdata1), self.data) 183 184 def test_random_open(self): 185 for f in get_files(self): 186 self.zip_random_open_test(f, self.compression) 187 188 def zip_read1_test(self, f, compression): 189 self.make_test_archive(f, compression) 190 191 # Read the ZIP archive 192 with zipfile.ZipFile(f, "r") as zipfp, \ 193 zipfp.open(TESTFN) as zipopen: 194 zipdata = [] 195 while True: 196 read_data = zipopen.read1(-1) 197 if not read_data: 198 break 199 zipdata.append(read_data) 200 201 self.assertEqual(b''.join(zipdata), self.data) 202 203 def test_read1(self): 204 for f in get_files(self): 205 self.zip_read1_test(f, self.compression) 206 207 def zip_read1_10_test(self, f, compression): 208 self.make_test_archive(f, compression) 209 210 # Read the ZIP archive 211 with zipfile.ZipFile(f, "r") as zipfp, \ 212 zipfp.open(TESTFN) as zipopen: 213 zipdata = [] 214 while True: 215 read_data = zipopen.read1(10) 216 self.assertLessEqual(len(read_data), 10) 217 if not read_data: 218 break 219 zipdata.append(read_data) 220 221 self.assertEqual(b''.join(zipdata), self.data) 222 223 def test_read1_10(self): 224 for f in get_files(self): 225 self.zip_read1_10_test(f, self.compression) 226 227 def zip_readline_read_test(self, f, compression): 228 self.make_test_archive(f, compression) 229 230 # Read the ZIP archive 231 with zipfile.ZipFile(f, "r") as zipfp, \ 232 zipfp.open(TESTFN) as zipopen: 233 data = b'' 234 while True: 235 read = zipopen.readline() 236 if not read: 237 break 238 data += read 239 240 read = zipopen.read(100) 241 if not read: 242 break 243 data += read 244 245 self.assertEqual(data, self.data) 246 247 def test_readline_read(self): 248 # Issue #7610: calls to readline() interleaved with calls to read(). 249 for f in get_files(self): 250 self.zip_readline_read_test(f, self.compression) 251 252 def zip_readline_test(self, f, compression): 253 self.make_test_archive(f, compression) 254 255 # Read the ZIP archive 256 with zipfile.ZipFile(f, "r") as zipfp: 257 with zipfp.open(TESTFN) as zipopen: 258 for line in self.line_gen: 259 linedata = zipopen.readline() 260 self.assertEqual(linedata, line) 261 262 def test_readline(self): 263 for f in get_files(self): 264 self.zip_readline_test(f, self.compression) 265 266 def zip_readlines_test(self, f, compression): 267 self.make_test_archive(f, compression) 268 269 # Read the ZIP archive 270 with zipfile.ZipFile(f, "r") as zipfp: 271 with zipfp.open(TESTFN) as zipopen: 272 ziplines = zipopen.readlines() 273 for line, zipline in zip(self.line_gen, ziplines): 274 self.assertEqual(zipline, line) 275 276 def test_readlines(self): 277 for f in get_files(self): 278 self.zip_readlines_test(f, self.compression) 279 280 def zip_iterlines_test(self, f, compression): 281 self.make_test_archive(f, compression) 282 283 # Read the ZIP archive 284 with zipfile.ZipFile(f, "r") as zipfp: 285 with zipfp.open(TESTFN) as zipopen: 286 for line, zipline in zip(self.line_gen, zipopen): 287 self.assertEqual(zipline, line) 288 289 def test_iterlines(self): 290 for f in get_files(self): 291 self.zip_iterlines_test(f, self.compression) 292 293 def test_low_compression(self): 294 """Check for cases where compressed data is larger than original.""" 295 # Create the ZIP archive 296 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp: 297 zipfp.writestr("strfile", '12') 298 299 # Get an open object for strfile 300 with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp: 301 with zipfp.open("strfile") as openobj: 302 self.assertEqual(openobj.read(1), b'1') 303 self.assertEqual(openobj.read(1), b'2') 304 305 def test_writestr_compression(self): 306 zipfp = zipfile.ZipFile(TESTFN2, "w") 307 zipfp.writestr("b.txt", "hello world", compress_type=self.compression) 308 info = zipfp.getinfo('b.txt') 309 self.assertEqual(info.compress_type, self.compression) 310 311 def test_writestr_compresslevel(self): 312 zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1) 313 zipfp.writestr("a.txt", "hello world", compress_type=self.compression) 314 zipfp.writestr("b.txt", "hello world", compress_type=self.compression, 315 compresslevel=2) 316 317 # Compression level follows the constructor. 318 a_info = zipfp.getinfo('a.txt') 319 self.assertEqual(a_info.compress_type, self.compression) 320 self.assertEqual(a_info._compresslevel, 1) 321 322 # Compression level is overridden. 323 b_info = zipfp.getinfo('b.txt') 324 self.assertEqual(b_info.compress_type, self.compression) 325 self.assertEqual(b_info._compresslevel, 2) 326 327 def test_read_return_size(self): 328 # Issue #9837: ZipExtFile.read() shouldn't return more bytes 329 # than requested. 330 for test_size in (1, 4095, 4096, 4097, 16384): 331 file_size = test_size + 1 332 junk = randbytes(file_size) 333 with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf: 334 zipf.writestr('foo', junk) 335 with zipf.open('foo', 'r') as fp: 336 buf = fp.read(test_size) 337 self.assertEqual(len(buf), test_size) 338 339 def test_truncated_zipfile(self): 340 fp = io.BytesIO() 341 with zipfile.ZipFile(fp, mode='w') as zipf: 342 zipf.writestr('strfile', self.data, compress_type=self.compression) 343 end_offset = fp.tell() 344 zipfiledata = fp.getvalue() 345 346 fp = io.BytesIO(zipfiledata) 347 with zipfile.ZipFile(fp) as zipf: 348 with zipf.open('strfile') as zipopen: 349 fp.truncate(end_offset - 20) 350 with self.assertRaises(EOFError): 351 zipopen.read() 352 353 fp = io.BytesIO(zipfiledata) 354 with zipfile.ZipFile(fp) as zipf: 355 with zipf.open('strfile') as zipopen: 356 fp.truncate(end_offset - 20) 357 with self.assertRaises(EOFError): 358 while zipopen.read(100): 359 pass 360 361 fp = io.BytesIO(zipfiledata) 362 with zipfile.ZipFile(fp) as zipf: 363 with zipf.open('strfile') as zipopen: 364 fp.truncate(end_offset - 20) 365 with self.assertRaises(EOFError): 366 while zipopen.read1(100): 367 pass 368 369 def test_repr(self): 370 fname = 'file.name' 371 for f in get_files(self): 372 with zipfile.ZipFile(f, 'w', self.compression) as zipfp: 373 zipfp.write(TESTFN, fname) 374 r = repr(zipfp) 375 self.assertIn("mode='w'", r) 376 377 with zipfile.ZipFile(f, 'r') as zipfp: 378 r = repr(zipfp) 379 if isinstance(f, str): 380 self.assertIn('filename=%r' % f, r) 381 else: 382 self.assertIn('file=%r' % f, r) 383 self.assertIn("mode='r'", r) 384 r = repr(zipfp.getinfo(fname)) 385 self.assertIn('filename=%r' % fname, r) 386 self.assertIn('filemode=', r) 387 self.assertIn('file_size=', r) 388 if self.compression != zipfile.ZIP_STORED: 389 self.assertIn('compress_type=', r) 390 self.assertIn('compress_size=', r) 391 with zipfp.open(fname) as zipopen: 392 r = repr(zipopen) 393 self.assertIn('name=%r' % fname, r) 394 self.assertIn("mode='r'", r) 395 if self.compression != zipfile.ZIP_STORED: 396 self.assertIn('compress_type=', r) 397 self.assertIn('[closed]', repr(zipopen)) 398 self.assertIn('[closed]', repr(zipfp)) 399 400 def test_compresslevel_basic(self): 401 for f in get_files(self): 402 self.zip_test(f, self.compression, compresslevel=9) 403 404 def test_per_file_compresslevel(self): 405 """Check that files within a Zip archive can have different 406 compression levels.""" 407 with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp: 408 zipfp.write(TESTFN, 'compress_1') 409 zipfp.write(TESTFN, 'compress_9', compresslevel=9) 410 one_info = zipfp.getinfo('compress_1') 411 nine_info = zipfp.getinfo('compress_9') 412 self.assertEqual(one_info._compresslevel, 1) 413 self.assertEqual(nine_info._compresslevel, 9) 414 415 def test_writing_errors(self): 416 class BrokenFile(io.BytesIO): 417 def write(self, data): 418 nonlocal count 419 if count is not None: 420 if count == stop: 421 raise OSError 422 count += 1 423 super().write(data) 424 425 stop = 0 426 while True: 427 testfile = BrokenFile() 428 count = None 429 with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp: 430 with zipfp.open('file1', 'w') as f: 431 f.write(b'data1') 432 count = 0 433 try: 434 with zipfp.open('file2', 'w') as f: 435 f.write(b'data2') 436 except OSError: 437 stop += 1 438 else: 439 break 440 finally: 441 count = None 442 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 443 self.assertEqual(zipfp.namelist(), ['file1']) 444 self.assertEqual(zipfp.read('file1'), b'data1') 445 446 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 447 self.assertEqual(zipfp.namelist(), ['file1', 'file2']) 448 self.assertEqual(zipfp.read('file1'), b'data1') 449 self.assertEqual(zipfp.read('file2'), b'data2') 450 451 452 def tearDown(self): 453 unlink(TESTFN) 454 unlink(TESTFN2) 455 456 457class StoredTestsWithSourceFile(AbstractTestsWithSourceFile, 458 unittest.TestCase): 459 compression = zipfile.ZIP_STORED 460 test_low_compression = None 461 462 def zip_test_writestr_permissions(self, f, compression): 463 # Make sure that writestr and open(... mode='w') create files with 464 # mode 0600, when they are passed a name rather than a ZipInfo 465 # instance. 466 467 self.make_test_archive(f, compression) 468 with zipfile.ZipFile(f, "r") as zipfp: 469 zinfo = zipfp.getinfo('strfile') 470 self.assertEqual(zinfo.external_attr, 0o600 << 16) 471 472 zinfo2 = zipfp.getinfo('written-open-w') 473 self.assertEqual(zinfo2.external_attr, 0o600 << 16) 474 475 def test_writestr_permissions(self): 476 for f in get_files(self): 477 self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) 478 479 def test_absolute_arcnames(self): 480 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 481 zipfp.write(TESTFN, "/absolute") 482 483 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 484 self.assertEqual(zipfp.namelist(), ["absolute"]) 485 486 def test_append_to_zip_file(self): 487 """Test appending to an existing zipfile.""" 488 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 489 zipfp.write(TESTFN, TESTFN) 490 491 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 492 zipfp.writestr("strfile", self.data) 493 self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) 494 495 def test_append_to_non_zip_file(self): 496 """Test appending to an existing file that is not a zipfile.""" 497 # NOTE: this test fails if len(d) < 22 because of the first 498 # line "fpin.seek(-22, 2)" in _EndRecData 499 data = b'I am not a ZipFile!'*10 500 with open(TESTFN2, 'wb') as f: 501 f.write(data) 502 503 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 504 zipfp.write(TESTFN, TESTFN) 505 506 with open(TESTFN2, 'rb') as f: 507 f.seek(len(data)) 508 with zipfile.ZipFile(f, "r") as zipfp: 509 self.assertEqual(zipfp.namelist(), [TESTFN]) 510 self.assertEqual(zipfp.read(TESTFN), self.data) 511 with open(TESTFN2, 'rb') as f: 512 self.assertEqual(f.read(len(data)), data) 513 zipfiledata = f.read() 514 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 515 self.assertEqual(zipfp.namelist(), [TESTFN]) 516 self.assertEqual(zipfp.read(TESTFN), self.data) 517 518 def test_read_concatenated_zip_file(self): 519 with io.BytesIO() as bio: 520 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 521 zipfp.write(TESTFN, TESTFN) 522 zipfiledata = bio.getvalue() 523 data = b'I am not a ZipFile!'*10 524 with open(TESTFN2, 'wb') as f: 525 f.write(data) 526 f.write(zipfiledata) 527 528 with zipfile.ZipFile(TESTFN2) as zipfp: 529 self.assertEqual(zipfp.namelist(), [TESTFN]) 530 self.assertEqual(zipfp.read(TESTFN), self.data) 531 532 def test_append_to_concatenated_zip_file(self): 533 with io.BytesIO() as bio: 534 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 535 zipfp.write(TESTFN, TESTFN) 536 zipfiledata = bio.getvalue() 537 data = b'I am not a ZipFile!'*1000000 538 with open(TESTFN2, 'wb') as f: 539 f.write(data) 540 f.write(zipfiledata) 541 542 with zipfile.ZipFile(TESTFN2, 'a') as zipfp: 543 self.assertEqual(zipfp.namelist(), [TESTFN]) 544 zipfp.writestr('strfile', self.data) 545 546 with open(TESTFN2, 'rb') as f: 547 self.assertEqual(f.read(len(data)), data) 548 zipfiledata = f.read() 549 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 550 self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile']) 551 self.assertEqual(zipfp.read(TESTFN), self.data) 552 self.assertEqual(zipfp.read('strfile'), self.data) 553 554 def test_ignores_newline_at_end(self): 555 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 556 zipfp.write(TESTFN, TESTFN) 557 with open(TESTFN2, 'a', encoding='utf-8') as f: 558 f.write("\r\n\00\00\00") 559 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 560 self.assertIsInstance(zipfp, zipfile.ZipFile) 561 562 def test_ignores_stuff_appended_past_comments(self): 563 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 564 zipfp.comment = b"this is a comment" 565 zipfp.write(TESTFN, TESTFN) 566 with open(TESTFN2, 'a', encoding='utf-8') as f: 567 f.write("abcdef\r\n") 568 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 569 self.assertIsInstance(zipfp, zipfile.ZipFile) 570 self.assertEqual(zipfp.comment, b"this is a comment") 571 572 def test_write_default_name(self): 573 """Check that calling ZipFile.write without arcname specified 574 produces the expected result.""" 575 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 576 zipfp.write(TESTFN) 577 with open(TESTFN, "rb") as f: 578 self.assertEqual(zipfp.read(TESTFN), f.read()) 579 580 def test_io_on_closed_zipextfile(self): 581 fname = "somefile.txt" 582 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 583 zipfp.writestr(fname, "bogus") 584 585 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 586 with zipfp.open(fname) as fid: 587 fid.close() 588 self.assertRaises(ValueError, fid.read) 589 self.assertRaises(ValueError, fid.seek, 0) 590 self.assertRaises(ValueError, fid.tell) 591 self.assertRaises(ValueError, fid.readable) 592 self.assertRaises(ValueError, fid.seekable) 593 594 def test_write_to_readonly(self): 595 """Check that trying to call write() on a readonly ZipFile object 596 raises a ValueError.""" 597 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 598 zipfp.writestr("somefile.txt", "bogus") 599 600 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 601 self.assertRaises(ValueError, zipfp.write, TESTFN) 602 603 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 604 with self.assertRaises(ValueError): 605 zipfp.open(TESTFN, mode='w') 606 607 def test_add_file_before_1980(self): 608 # Set atime and mtime to 1970-01-01 609 os.utime(TESTFN, (0, 0)) 610 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 611 self.assertRaises(ValueError, zipfp.write, TESTFN) 612 613 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 614 zipfp.write(TESTFN) 615 zinfo = zipfp.getinfo(TESTFN) 616 self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0)) 617 618 def test_add_file_after_2107(self): 619 # Set atime and mtime to 2108-12-30 620 ts = 4386268800 621 try: 622 time.localtime(ts) 623 except OverflowError: 624 self.skipTest(f'time.localtime({ts}) raises OverflowError') 625 try: 626 os.utime(TESTFN, (ts, ts)) 627 except OverflowError: 628 self.skipTest('Host fs cannot set timestamp to required value.') 629 630 mtime_ns = os.stat(TESTFN).st_mtime_ns 631 if mtime_ns != (4386268800 * 10**9): 632 # XFS filesystem is limited to 32-bit timestamp, but the syscall 633 # didn't fail. Moreover, there is a VFS bug which returns 634 # a cached timestamp which is different than the value on disk. 635 # 636 # Test st_mtime_ns rather than st_mtime to avoid rounding issues. 637 # 638 # https://bugzilla.redhat.com/show_bug.cgi?id=1795576 639 # https://bugs.python.org/issue39460#msg360952 640 self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}") 641 642 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 643 self.assertRaises(struct.error, zipfp.write, TESTFN) 644 645 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 646 zipfp.write(TESTFN) 647 zinfo = zipfp.getinfo(TESTFN) 648 self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59)) 649 650 651@requires_zlib() 652class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile, 653 unittest.TestCase): 654 compression = zipfile.ZIP_DEFLATED 655 656 def test_per_file_compression(self): 657 """Check that files within a Zip archive can have different 658 compression options.""" 659 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 660 zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) 661 zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) 662 sinfo = zipfp.getinfo('storeme') 663 dinfo = zipfp.getinfo('deflateme') 664 self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) 665 self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) 666 667@requires_bz2() 668class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile, 669 unittest.TestCase): 670 compression = zipfile.ZIP_BZIP2 671 672@requires_lzma() 673class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile, 674 unittest.TestCase): 675 compression = zipfile.ZIP_LZMA 676 677 678class AbstractTestZip64InSmallFiles: 679 # These tests test the ZIP64 functionality without using large files, 680 # see test_zipfile64 for proper tests. 681 682 @classmethod 683 def setUpClass(cls): 684 line_gen = (bytes("Test of zipfile line %d." % i, "ascii") 685 for i in range(0, FIXEDTEST_SIZE)) 686 cls.data = b'\n'.join(line_gen) 687 688 def setUp(self): 689 self._limit = zipfile.ZIP64_LIMIT 690 self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT 691 zipfile.ZIP64_LIMIT = 1000 692 zipfile.ZIP_FILECOUNT_LIMIT = 9 693 694 # Make a source file with some lines 695 with open(TESTFN, "wb") as fp: 696 fp.write(self.data) 697 698 def zip_test(self, f, compression): 699 # Create the ZIP archive 700 with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: 701 zipfp.write(TESTFN, "another.name") 702 zipfp.write(TESTFN, TESTFN) 703 zipfp.writestr("strfile", self.data) 704 705 # Read the ZIP archive 706 with zipfile.ZipFile(f, "r", compression) as zipfp: 707 self.assertEqual(zipfp.read(TESTFN), self.data) 708 self.assertEqual(zipfp.read("another.name"), self.data) 709 self.assertEqual(zipfp.read("strfile"), self.data) 710 711 # Print the ZIP directory 712 fp = io.StringIO() 713 zipfp.printdir(fp) 714 715 directory = fp.getvalue() 716 lines = directory.splitlines() 717 self.assertEqual(len(lines), 4) # Number of files + header 718 719 self.assertIn('File Name', lines[0]) 720 self.assertIn('Modified', lines[0]) 721 self.assertIn('Size', lines[0]) 722 723 fn, date, time_, size = lines[1].split() 724 self.assertEqual(fn, 'another.name') 725 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 726 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 727 self.assertEqual(size, str(len(self.data))) 728 729 # Check the namelist 730 names = zipfp.namelist() 731 self.assertEqual(len(names), 3) 732 self.assertIn(TESTFN, names) 733 self.assertIn("another.name", names) 734 self.assertIn("strfile", names) 735 736 # Check infolist 737 infos = zipfp.infolist() 738 names = [i.filename for i in infos] 739 self.assertEqual(len(names), 3) 740 self.assertIn(TESTFN, names) 741 self.assertIn("another.name", names) 742 self.assertIn("strfile", names) 743 for i in infos: 744 self.assertEqual(i.file_size, len(self.data)) 745 746 # check getinfo 747 for nm in (TESTFN, "another.name", "strfile"): 748 info = zipfp.getinfo(nm) 749 self.assertEqual(info.filename, nm) 750 self.assertEqual(info.file_size, len(self.data)) 751 752 # Check that testzip doesn't raise an exception 753 zipfp.testzip() 754 755 def test_basic(self): 756 for f in get_files(self): 757 self.zip_test(f, self.compression) 758 759 def test_too_many_files(self): 760 # This test checks that more than 64k files can be added to an archive, 761 # and that the resulting archive can be read properly by ZipFile 762 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 763 allowZip64=True) 764 zipf.debug = 100 765 numfiles = 15 766 for i in range(numfiles): 767 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 768 self.assertEqual(len(zipf.namelist()), numfiles) 769 zipf.close() 770 771 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 772 self.assertEqual(len(zipf2.namelist()), numfiles) 773 for i in range(numfiles): 774 content = zipf2.read("foo%08d" % i).decode('ascii') 775 self.assertEqual(content, "%d" % (i**3 % 57)) 776 zipf2.close() 777 778 def test_too_many_files_append(self): 779 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 780 allowZip64=False) 781 zipf.debug = 100 782 numfiles = 9 783 for i in range(numfiles): 784 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 785 self.assertEqual(len(zipf.namelist()), numfiles) 786 with self.assertRaises(zipfile.LargeZipFile): 787 zipf.writestr("foo%08d" % numfiles, b'') 788 self.assertEqual(len(zipf.namelist()), numfiles) 789 zipf.close() 790 791 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 792 allowZip64=False) 793 zipf.debug = 100 794 self.assertEqual(len(zipf.namelist()), numfiles) 795 with self.assertRaises(zipfile.LargeZipFile): 796 zipf.writestr("foo%08d" % numfiles, b'') 797 self.assertEqual(len(zipf.namelist()), numfiles) 798 zipf.close() 799 800 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 801 allowZip64=True) 802 zipf.debug = 100 803 self.assertEqual(len(zipf.namelist()), numfiles) 804 numfiles2 = 15 805 for i in range(numfiles, numfiles2): 806 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 807 self.assertEqual(len(zipf.namelist()), numfiles2) 808 zipf.close() 809 810 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 811 self.assertEqual(len(zipf2.namelist()), numfiles2) 812 for i in range(numfiles2): 813 content = zipf2.read("foo%08d" % i).decode('ascii') 814 self.assertEqual(content, "%d" % (i**3 % 57)) 815 zipf2.close() 816 817 def tearDown(self): 818 zipfile.ZIP64_LIMIT = self._limit 819 zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit 820 unlink(TESTFN) 821 unlink(TESTFN2) 822 823 824class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 825 unittest.TestCase): 826 compression = zipfile.ZIP_STORED 827 828 def large_file_exception_test(self, f, compression): 829 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 830 self.assertRaises(zipfile.LargeZipFile, 831 zipfp.write, TESTFN, "another.name") 832 833 def large_file_exception_test2(self, f, compression): 834 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 835 self.assertRaises(zipfile.LargeZipFile, 836 zipfp.writestr, "another.name", self.data) 837 838 def test_large_file_exception(self): 839 for f in get_files(self): 840 self.large_file_exception_test(f, zipfile.ZIP_STORED) 841 self.large_file_exception_test2(f, zipfile.ZIP_STORED) 842 843 def test_absolute_arcnames(self): 844 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, 845 allowZip64=True) as zipfp: 846 zipfp.write(TESTFN, "/absolute") 847 848 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 849 self.assertEqual(zipfp.namelist(), ["absolute"]) 850 851 def test_append(self): 852 # Test that appending to the Zip64 archive doesn't change 853 # extra fields of existing entries. 854 with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp: 855 zipfp.writestr("strfile", self.data) 856 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 857 zinfo = zipfp.getinfo("strfile") 858 extra = zinfo.extra 859 with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp: 860 zipfp.writestr("strfile2", self.data) 861 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 862 zinfo = zipfp.getinfo("strfile") 863 self.assertEqual(zinfo.extra, extra) 864 865 def make_zip64_file( 866 self, file_size_64_set=False, file_size_extra=False, 867 compress_size_64_set=False, compress_size_extra=False, 868 header_offset_64_set=False, header_offset_extra=False, 869 ): 870 """Generate bytes sequence for a zip with (incomplete) zip64 data. 871 872 The actual values (not the zip 64 0xffffffff values) stored in the file 873 are: 874 file_size: 8 875 compress_size: 8 876 header_offset: 0 877 """ 878 actual_size = 8 879 actual_header_offset = 0 880 local_zip64_fields = [] 881 central_zip64_fields = [] 882 883 file_size = actual_size 884 if file_size_64_set: 885 file_size = 0xffffffff 886 if file_size_extra: 887 local_zip64_fields.append(actual_size) 888 central_zip64_fields.append(actual_size) 889 file_size = struct.pack("<L", file_size) 890 891 compress_size = actual_size 892 if compress_size_64_set: 893 compress_size = 0xffffffff 894 if compress_size_extra: 895 local_zip64_fields.append(actual_size) 896 central_zip64_fields.append(actual_size) 897 compress_size = struct.pack("<L", compress_size) 898 899 header_offset = actual_header_offset 900 if header_offset_64_set: 901 header_offset = 0xffffffff 902 if header_offset_extra: 903 central_zip64_fields.append(actual_header_offset) 904 header_offset = struct.pack("<L", header_offset) 905 906 local_extra = struct.pack( 907 '<HH' + 'Q'*len(local_zip64_fields), 908 0x0001, 909 8*len(local_zip64_fields), 910 *local_zip64_fields 911 ) 912 913 central_extra = struct.pack( 914 '<HH' + 'Q'*len(central_zip64_fields), 915 0x0001, 916 8*len(central_zip64_fields), 917 *central_zip64_fields 918 ) 919 920 central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields)) 921 offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields)) 922 923 local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields)) 924 central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields)) 925 926 filename = b"test.txt" 927 content = b"test1234" 928 filename_length = struct.pack("<H", len(filename)) 929 zip64_contents = ( 930 # Local file header 931 b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 932 + compress_size 933 + file_size 934 + filename_length 935 + local_extra_length 936 + filename 937 + local_extra 938 + content 939 # Central directory: 940 + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 941 + compress_size 942 + file_size 943 + filename_length 944 + central_extra_length 945 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01" 946 + header_offset 947 + filename 948 + central_extra 949 # Zip64 end of central directory 950 + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-" 951 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00" 952 + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00" 953 + central_dir_size 954 + offset_to_central_dir 955 # Zip64 end of central directory locator 956 + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01" 957 + b"\x00\x00\x00" 958 # end of central directory 959 + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00" 960 + b"\x00\x00\x00\x00" 961 ) 962 return zip64_contents 963 964 def test_bad_zip64_extra(self): 965 """Missing zip64 extra records raises an exception. 966 967 There are 4 fields that the zip64 format handles (the disk number is 968 not used in this module and so is ignored here). According to the zip 969 spec: 970 The order of the fields in the zip64 extended 971 information record is fixed, but the fields MUST 972 only appear if the corresponding Local or Central 973 directory record field is set to 0xFFFF or 0xFFFFFFFF. 974 975 If the zip64 extra content doesn't contain enough entries for the 976 number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error. 977 This test mismatches the length of the zip64 extra field and the number 978 of fields set to indicate the presence of zip64 data. 979 """ 980 # zip64 file size present, no fields in extra, expecting one, equals 981 # missing file size. 982 missing_file_size_extra = self.make_zip64_file( 983 file_size_64_set=True, 984 ) 985 with self.assertRaises(zipfile.BadZipFile) as e: 986 zipfile.ZipFile(io.BytesIO(missing_file_size_extra)) 987 self.assertIn('file size', str(e.exception).lower()) 988 989 # zip64 file size present, zip64 compress size present, one field in 990 # extra, expecting two, equals missing compress size. 991 missing_compress_size_extra = self.make_zip64_file( 992 file_size_64_set=True, 993 file_size_extra=True, 994 compress_size_64_set=True, 995 ) 996 with self.assertRaises(zipfile.BadZipFile) as e: 997 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 998 self.assertIn('compress size', str(e.exception).lower()) 999 1000 # zip64 compress size present, no fields in extra, expecting one, 1001 # equals missing compress size. 1002 missing_compress_size_extra = self.make_zip64_file( 1003 compress_size_64_set=True, 1004 ) 1005 with self.assertRaises(zipfile.BadZipFile) as e: 1006 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 1007 self.assertIn('compress size', str(e.exception).lower()) 1008 1009 # zip64 file size present, zip64 compress size present, zip64 header 1010 # offset present, two fields in extra, expecting three, equals missing 1011 # header offset 1012 missing_header_offset_extra = self.make_zip64_file( 1013 file_size_64_set=True, 1014 file_size_extra=True, 1015 compress_size_64_set=True, 1016 compress_size_extra=True, 1017 header_offset_64_set=True, 1018 ) 1019 with self.assertRaises(zipfile.BadZipFile) as e: 1020 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1021 self.assertIn('header offset', str(e.exception).lower()) 1022 1023 # zip64 compress size present, zip64 header offset present, one field 1024 # in extra, expecting two, equals missing header offset 1025 missing_header_offset_extra = self.make_zip64_file( 1026 file_size_64_set=False, 1027 compress_size_64_set=True, 1028 compress_size_extra=True, 1029 header_offset_64_set=True, 1030 ) 1031 with self.assertRaises(zipfile.BadZipFile) as e: 1032 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1033 self.assertIn('header offset', str(e.exception).lower()) 1034 1035 # zip64 file size present, zip64 header offset present, one field in 1036 # extra, expecting two, equals missing header offset 1037 missing_header_offset_extra = self.make_zip64_file( 1038 file_size_64_set=True, 1039 file_size_extra=True, 1040 compress_size_64_set=False, 1041 header_offset_64_set=True, 1042 ) 1043 with self.assertRaises(zipfile.BadZipFile) as e: 1044 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1045 self.assertIn('header offset', str(e.exception).lower()) 1046 1047 # zip64 header offset present, no fields in extra, expecting one, 1048 # equals missing header offset 1049 missing_header_offset_extra = self.make_zip64_file( 1050 file_size_64_set=False, 1051 compress_size_64_set=False, 1052 header_offset_64_set=True, 1053 ) 1054 with self.assertRaises(zipfile.BadZipFile) as e: 1055 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1056 self.assertIn('header offset', str(e.exception).lower()) 1057 1058 def test_generated_valid_zip64_extra(self): 1059 # These values are what is set in the make_zip64_file method. 1060 expected_file_size = 8 1061 expected_compress_size = 8 1062 expected_header_offset = 0 1063 expected_content = b"test1234" 1064 1065 # Loop through the various valid combinations of zip64 masks 1066 # present and extra fields present. 1067 params = ( 1068 {"file_size_64_set": True, "file_size_extra": True}, 1069 {"compress_size_64_set": True, "compress_size_extra": True}, 1070 {"header_offset_64_set": True, "header_offset_extra": True}, 1071 ) 1072 1073 for r in range(1, len(params) + 1): 1074 for combo in itertools.combinations(params, r): 1075 kwargs = {} 1076 for c in combo: 1077 kwargs.update(c) 1078 with zipfile.ZipFile(io.BytesIO(self.make_zip64_file(**kwargs))) as zf: 1079 zinfo = zf.infolist()[0] 1080 self.assertEqual(zinfo.file_size, expected_file_size) 1081 self.assertEqual(zinfo.compress_size, expected_compress_size) 1082 self.assertEqual(zinfo.header_offset, expected_header_offset) 1083 self.assertEqual(zf.read(zinfo), expected_content) 1084 1085 def test_force_zip64(self): 1086 """Test that forcing zip64 extensions correctly notes this in the zip file""" 1087 1088 # GH-103861 describes an issue where forcing a small file to use zip64 1089 # extensions would add a zip64 extra record, but not change the data 1090 # sizes to 0xFFFFFFFF to indicate to the extractor that the zip64 1091 # record should be read. Additionally, it would not set the required 1092 # version to indicate that zip64 extensions are required to extract it. 1093 # This test replicates the situation and reads the raw data to specifically ensure: 1094 # - The required extract version is always >= ZIP64_VERSION 1095 # - The compressed and uncompressed size in the file headers are both 1096 # 0xFFFFFFFF (ie. point to zip64 record) 1097 # - The zip64 record is provided and has the correct sizes in it 1098 # Other aspects of the zip are checked as well, but verifying the above is the main goal. 1099 # Because this is hard to verify by parsing the data as a zip, the raw 1100 # bytes are checked to ensure that they line up with the zip spec. 1101 # The spec for this can be found at: https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT 1102 # The relevent sections for this test are: 1103 # - 4.3.7 for local file header 1104 # - 4.5.3 for zip64 extra field 1105 1106 data = io.BytesIO() 1107 with zipfile.ZipFile(data, mode="w", allowZip64=True) as zf: 1108 with zf.open("text.txt", mode="w", force_zip64=True) as zi: 1109 zi.write(b"_") 1110 1111 zipdata = data.getvalue() 1112 1113 # pull out and check zip information 1114 ( 1115 header, vers, os, flags, comp, csize, usize, fn_len, 1116 ex_total_len, filename, ex_id, ex_len, ex_usize, ex_csize, cd_sig 1117 ) = struct.unpack("<4sBBHH8xIIHH8shhQQx4s", zipdata[:63]) 1118 1119 self.assertEqual(header, b"PK\x03\x04") # local file header 1120 self.assertGreaterEqual(vers, zipfile.ZIP64_VERSION) # requires zip64 to extract 1121 self.assertEqual(os, 0) # compatible with MS-DOS 1122 self.assertEqual(flags, 0) # no flags 1123 self.assertEqual(comp, 0) # compression method = stored 1124 self.assertEqual(csize, 0xFFFFFFFF) # sizes are in zip64 extra 1125 self.assertEqual(usize, 0xFFFFFFFF) 1126 self.assertEqual(fn_len, 8) # filename len 1127 self.assertEqual(ex_total_len, 20) # size of extra records 1128 self.assertEqual(ex_id, 1) # Zip64 extra record 1129 self.assertEqual(ex_len, 16) # 16 bytes of data 1130 self.assertEqual(ex_usize, 1) # uncompressed size 1131 self.assertEqual(ex_csize, 1) # compressed size 1132 self.assertEqual(cd_sig, b"PK\x01\x02") # ensure the central directory header is next 1133 1134 z = zipfile.ZipFile(io.BytesIO(zipdata)) 1135 zinfos = z.infolist() 1136 self.assertEqual(len(zinfos), 1) 1137 self.assertGreaterEqual(zinfos[0].extract_version, zipfile.ZIP64_VERSION) # requires zip64 to extract 1138 1139 def test_unseekable_zip_unknown_filesize(self): 1140 """Test that creating a zip with/without seeking will raise a RuntimeError if zip64 was required but not used""" 1141 1142 def make_zip(fp): 1143 with zipfile.ZipFile(fp, mode="w", allowZip64=True) as zf: 1144 with zf.open("text.txt", mode="w", force_zip64=False) as zi: 1145 zi.write(b"_" * (zipfile.ZIP64_LIMIT + 1)) 1146 1147 self.assertRaises(RuntimeError, make_zip, io.BytesIO()) 1148 self.assertRaises(RuntimeError, make_zip, Unseekable(io.BytesIO())) 1149 1150 def test_zip64_required_not_allowed_fail(self): 1151 """Test that trying to add a large file to a zip that doesn't allow zip64 extensions fails on add""" 1152 def make_zip(fp): 1153 with zipfile.ZipFile(fp, mode="w", allowZip64=False) as zf: 1154 # pretend zipfile.ZipInfo.from_file was used to get the name and filesize 1155 info = zipfile.ZipInfo("text.txt") 1156 info.file_size = zipfile.ZIP64_LIMIT + 1 1157 zf.open(info, mode="w") 1158 1159 self.assertRaises(zipfile.LargeZipFile, make_zip, io.BytesIO()) 1160 self.assertRaises(zipfile.LargeZipFile, make_zip, Unseekable(io.BytesIO())) 1161 1162 def test_unseekable_zip_known_filesize(self): 1163 """Test that creating a zip without seeking will use zip64 extensions if the file size is provided up-front""" 1164 1165 # This test ensures that the zip will use a zip64 data descriptor (same 1166 # as a regular data descriptor except the sizes are 8 bytes instead of 1167 # 4) record to communicate the size of a file if the zip is being 1168 # written to an unseekable stream. 1169 # Because this sort of thing is hard to verify by parsing the data back 1170 # in as a zip, this test looks at the raw bytes created to ensure that 1171 # the correct data has been generated. 1172 # The spec for this can be found at: https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT 1173 # The relevent sections for this test are: 1174 # - 4.3.7 for local file header 1175 # - 4.3.9 for the data descriptor 1176 # - 4.5.3 for zip64 extra field 1177 1178 file_size = zipfile.ZIP64_LIMIT + 1 1179 1180 def make_zip(fp): 1181 with zipfile.ZipFile(fp, mode="w", allowZip64=True) as zf: 1182 # pretend zipfile.ZipInfo.from_file was used to get the name and filesize 1183 info = zipfile.ZipInfo("text.txt") 1184 info.file_size = file_size 1185 with zf.open(info, mode="w", force_zip64=False) as zi: 1186 zi.write(b"_" * file_size) 1187 return fp 1188 1189 # check seekable file information 1190 seekable_data = make_zip(io.BytesIO()).getvalue() 1191 ( 1192 header, vers, os, flags, comp, csize, usize, fn_len, 1193 ex_total_len, filename, ex_id, ex_len, ex_usize, ex_csize, 1194 cd_sig 1195 ) = struct.unpack("<4sBBHH8xIIHH8shhQQ{}x4s".format(file_size), seekable_data[:62 + file_size]) 1196 1197 self.assertEqual(header, b"PK\x03\x04") # local file header 1198 self.assertGreaterEqual(vers, zipfile.ZIP64_VERSION) # requires zip64 to extract 1199 self.assertEqual(os, 0) # compatible with MS-DOS 1200 self.assertEqual(flags, 0) # no flags set 1201 self.assertEqual(comp, 0) # compression method = stored 1202 self.assertEqual(csize, 0xFFFFFFFF) # sizes are in zip64 extra 1203 self.assertEqual(usize, 0xFFFFFFFF) 1204 self.assertEqual(fn_len, 8) # filename len 1205 self.assertEqual(ex_total_len, 20) # size of extra records 1206 self.assertEqual(ex_id, 1) # Zip64 extra record 1207 self.assertEqual(ex_len, 16) # 16 bytes of data 1208 self.assertEqual(ex_usize, file_size) # uncompressed size 1209 self.assertEqual(ex_csize, file_size) # compressed size 1210 self.assertEqual(cd_sig, b"PK\x01\x02") # ensure the central directory header is next 1211 1212 # check unseekable file information 1213 unseekable_data = make_zip(Unseekable(io.BytesIO())).fp.getvalue() 1214 ( 1215 header, vers, os, flags, comp, csize, usize, fn_len, 1216 ex_total_len, filename, ex_id, ex_len, ex_usize, ex_csize, 1217 dd_header, dd_usize, dd_csize, cd_sig 1218 ) = struct.unpack("<4sBBHH8xIIHH8shhQQ{}x4s4xQQ4s".format(file_size), unseekable_data[:86 + file_size]) 1219 1220 self.assertEqual(header, b"PK\x03\x04") # local file header 1221 self.assertGreaterEqual(vers, zipfile.ZIP64_VERSION) # requires zip64 to extract 1222 self.assertEqual(os, 0) # compatible with MS-DOS 1223 self.assertEqual("{:b}".format(flags), "1000") # streaming flag set 1224 self.assertEqual(comp, 0) # compression method = stored 1225 self.assertEqual(csize, 0xFFFFFFFF) # sizes are in zip64 extra 1226 self.assertEqual(usize, 0xFFFFFFFF) 1227 self.assertEqual(fn_len, 8) # filename len 1228 self.assertEqual(ex_total_len, 20) # size of extra records 1229 self.assertEqual(ex_id, 1) # Zip64 extra record 1230 self.assertEqual(ex_len, 16) # 16 bytes of data 1231 self.assertEqual(ex_usize, 0) # uncompressed size - 0 to defer to data descriptor 1232 self.assertEqual(ex_csize, 0) # compressed size - 0 to defer to data descriptor 1233 self.assertEqual(dd_header, b"PK\07\x08") # data descriptor 1234 self.assertEqual(dd_usize, file_size) # file size (8 bytes because zip64) 1235 self.assertEqual(dd_csize, file_size) # compressed size (8 bytes because zip64) 1236 self.assertEqual(cd_sig, b"PK\x01\x02") # ensure the central directory header is next 1237 1238 1239@requires_zlib() 1240class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1241 unittest.TestCase): 1242 compression = zipfile.ZIP_DEFLATED 1243 1244@requires_bz2() 1245class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1246 unittest.TestCase): 1247 compression = zipfile.ZIP_BZIP2 1248 1249@requires_lzma() 1250class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1251 unittest.TestCase): 1252 compression = zipfile.ZIP_LZMA 1253 1254 1255class AbstractWriterTests: 1256 1257 def tearDown(self): 1258 unlink(TESTFN2) 1259 1260 def test_close_after_close(self): 1261 data = b'content' 1262 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf: 1263 w = zipf.open('test', 'w') 1264 w.write(data) 1265 w.close() 1266 self.assertTrue(w.closed) 1267 w.close() 1268 self.assertTrue(w.closed) 1269 self.assertEqual(zipf.read('test'), data) 1270 1271 def test_write_after_close(self): 1272 data = b'content' 1273 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf: 1274 w = zipf.open('test', 'w') 1275 w.write(data) 1276 w.close() 1277 self.assertTrue(w.closed) 1278 self.assertRaises(ValueError, w.write, b'') 1279 self.assertEqual(zipf.read('test'), data) 1280 1281 def test_issue44439(self): 1282 q = array.array('Q', [1, 2, 3, 4, 5]) 1283 LENGTH = len(q) * q.itemsize 1284 with zipfile.ZipFile(io.BytesIO(), 'w', self.compression) as zip: 1285 with zip.open('data', 'w') as data: 1286 self.assertEqual(data.write(q), LENGTH) 1287 self.assertEqual(zip.getinfo('data').file_size, LENGTH) 1288 1289class StoredWriterTests(AbstractWriterTests, unittest.TestCase): 1290 compression = zipfile.ZIP_STORED 1291 1292@requires_zlib() 1293class DeflateWriterTests(AbstractWriterTests, unittest.TestCase): 1294 compression = zipfile.ZIP_DEFLATED 1295 1296@requires_bz2() 1297class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase): 1298 compression = zipfile.ZIP_BZIP2 1299 1300@requires_lzma() 1301class LzmaWriterTests(AbstractWriterTests, unittest.TestCase): 1302 compression = zipfile.ZIP_LZMA 1303 1304 1305class PyZipFileTests(unittest.TestCase): 1306 def assertCompiledIn(self, name, namelist): 1307 if name + 'o' not in namelist: 1308 self.assertIn(name + 'c', namelist) 1309 1310 def requiresWriteAccess(self, path): 1311 # effective_ids unavailable on windows 1312 if not os.access(path, os.W_OK, 1313 effective_ids=os.access in os.supports_effective_ids): 1314 self.skipTest('requires write access to the installed location') 1315 filename = os.path.join(path, 'test_zipfile.try') 1316 try: 1317 fd = os.open(filename, os.O_WRONLY | os.O_CREAT) 1318 os.close(fd) 1319 except Exception: 1320 self.skipTest('requires write access to the installed location') 1321 unlink(filename) 1322 1323 def test_write_pyfile(self): 1324 self.requiresWriteAccess(os.path.dirname(__file__)) 1325 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1326 fn = __file__ 1327 if fn.endswith('.pyc'): 1328 path_split = fn.split(os.sep) 1329 if os.altsep is not None: 1330 path_split.extend(fn.split(os.altsep)) 1331 if '__pycache__' in path_split: 1332 fn = importlib.util.source_from_cache(fn) 1333 else: 1334 fn = fn[:-1] 1335 1336 zipfp.writepy(fn) 1337 1338 bn = os.path.basename(fn) 1339 self.assertNotIn(bn, zipfp.namelist()) 1340 self.assertCompiledIn(bn, zipfp.namelist()) 1341 1342 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1343 fn = __file__ 1344 if fn.endswith('.pyc'): 1345 fn = fn[:-1] 1346 1347 zipfp.writepy(fn, "testpackage") 1348 1349 bn = "%s/%s" % ("testpackage", os.path.basename(fn)) 1350 self.assertNotIn(bn, zipfp.namelist()) 1351 self.assertCompiledIn(bn, zipfp.namelist()) 1352 1353 def test_write_python_package(self): 1354 import email 1355 packagedir = os.path.dirname(email.__file__) 1356 self.requiresWriteAccess(packagedir) 1357 1358 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1359 zipfp.writepy(packagedir) 1360 1361 # Check for a couple of modules at different levels of the 1362 # hierarchy 1363 names = zipfp.namelist() 1364 self.assertCompiledIn('email/__init__.py', names) 1365 self.assertCompiledIn('email/mime/text.py', names) 1366 1367 def test_write_filtered_python_package(self): 1368 import test 1369 packagedir = os.path.dirname(test.__file__) 1370 self.requiresWriteAccess(packagedir) 1371 1372 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1373 1374 # first make sure that the test folder gives error messages 1375 # (on the badsyntax_... files) 1376 with captured_stdout() as reportSIO: 1377 zipfp.writepy(packagedir) 1378 reportStr = reportSIO.getvalue() 1379 self.assertTrue('SyntaxError' in reportStr) 1380 1381 # then check that the filter works on the whole package 1382 with captured_stdout() as reportSIO: 1383 zipfp.writepy(packagedir, filterfunc=lambda whatever: False) 1384 reportStr = reportSIO.getvalue() 1385 self.assertTrue('SyntaxError' not in reportStr) 1386 1387 # then check that the filter works on individual files 1388 def filter(path): 1389 return not os.path.basename(path).startswith("bad") 1390 with captured_stdout() as reportSIO, self.assertWarns(UserWarning): 1391 zipfp.writepy(packagedir, filterfunc=filter) 1392 reportStr = reportSIO.getvalue() 1393 if reportStr: 1394 print(reportStr) 1395 self.assertTrue('SyntaxError' not in reportStr) 1396 1397 def test_write_with_optimization(self): 1398 import email 1399 packagedir = os.path.dirname(email.__file__) 1400 self.requiresWriteAccess(packagedir) 1401 optlevel = 1 if __debug__ else 0 1402 ext = '.pyc' 1403 1404 with TemporaryFile() as t, \ 1405 zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp: 1406 zipfp.writepy(packagedir) 1407 1408 names = zipfp.namelist() 1409 self.assertIn('email/__init__' + ext, names) 1410 self.assertIn('email/mime/text' + ext, names) 1411 1412 def test_write_python_directory(self): 1413 os.mkdir(TESTFN2) 1414 try: 1415 with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp: 1416 fp.write("print(42)\n") 1417 1418 with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp: 1419 fp.write("print(42 * 42)\n") 1420 1421 with open(os.path.join(TESTFN2, "mod2.txt"), "w", encoding='utf-8') as fp: 1422 fp.write("bla bla bla\n") 1423 1424 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1425 zipfp.writepy(TESTFN2) 1426 1427 names = zipfp.namelist() 1428 self.assertCompiledIn('mod1.py', names) 1429 self.assertCompiledIn('mod2.py', names) 1430 self.assertNotIn('mod2.txt', names) 1431 1432 finally: 1433 rmtree(TESTFN2) 1434 1435 def test_write_python_directory_filtered(self): 1436 os.mkdir(TESTFN2) 1437 try: 1438 with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp: 1439 fp.write("print(42)\n") 1440 1441 with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp: 1442 fp.write("print(42 * 42)\n") 1443 1444 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1445 zipfp.writepy(TESTFN2, filterfunc=lambda fn: 1446 not fn.endswith('mod2.py')) 1447 1448 names = zipfp.namelist() 1449 self.assertCompiledIn('mod1.py', names) 1450 self.assertNotIn('mod2.py', names) 1451 1452 finally: 1453 rmtree(TESTFN2) 1454 1455 def test_write_non_pyfile(self): 1456 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1457 with open(TESTFN, 'w', encoding='utf-8') as f: 1458 f.write('most definitely not a python file') 1459 self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) 1460 unlink(TESTFN) 1461 1462 def test_write_pyfile_bad_syntax(self): 1463 os.mkdir(TESTFN2) 1464 try: 1465 with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp: 1466 fp.write("Bad syntax in python file\n") 1467 1468 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1469 # syntax errors are printed to stdout 1470 with captured_stdout() as s: 1471 zipfp.writepy(os.path.join(TESTFN2, "mod1.py")) 1472 1473 self.assertIn("SyntaxError", s.getvalue()) 1474 1475 # as it will not have compiled the python file, it will 1476 # include the .py file not .pyc 1477 names = zipfp.namelist() 1478 self.assertIn('mod1.py', names) 1479 self.assertNotIn('mod1.pyc', names) 1480 1481 finally: 1482 rmtree(TESTFN2) 1483 1484 def test_write_pathlike(self): 1485 os.mkdir(TESTFN2) 1486 try: 1487 with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp: 1488 fp.write("print(42)\n") 1489 1490 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1491 zipfp.writepy(pathlib.Path(TESTFN2) / "mod1.py") 1492 names = zipfp.namelist() 1493 self.assertCompiledIn('mod1.py', names) 1494 finally: 1495 rmtree(TESTFN2) 1496 1497 1498class ExtractTests(unittest.TestCase): 1499 1500 def make_test_file(self): 1501 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 1502 for fpath, fdata in SMALL_TEST_DATA: 1503 zipfp.writestr(fpath, fdata) 1504 1505 def test_extract(self): 1506 with temp_cwd(): 1507 self.make_test_file() 1508 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1509 for fpath, fdata in SMALL_TEST_DATA: 1510 writtenfile = zipfp.extract(fpath) 1511 1512 # make sure it was written to the right place 1513 correctfile = os.path.join(os.getcwd(), fpath) 1514 correctfile = os.path.normpath(correctfile) 1515 1516 self.assertEqual(writtenfile, correctfile) 1517 1518 # make sure correct data is in correct file 1519 with open(writtenfile, "rb") as f: 1520 self.assertEqual(fdata.encode(), f.read()) 1521 1522 unlink(writtenfile) 1523 1524 def _test_extract_with_target(self, target): 1525 self.make_test_file() 1526 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1527 for fpath, fdata in SMALL_TEST_DATA: 1528 writtenfile = zipfp.extract(fpath, target) 1529 1530 # make sure it was written to the right place 1531 correctfile = os.path.join(target, fpath) 1532 correctfile = os.path.normpath(correctfile) 1533 self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target)) 1534 1535 # make sure correct data is in correct file 1536 with open(writtenfile, "rb") as f: 1537 self.assertEqual(fdata.encode(), f.read()) 1538 1539 unlink(writtenfile) 1540 1541 unlink(TESTFN2) 1542 1543 def test_extract_with_target(self): 1544 with temp_dir() as extdir: 1545 self._test_extract_with_target(extdir) 1546 1547 def test_extract_with_target_pathlike(self): 1548 with temp_dir() as extdir: 1549 self._test_extract_with_target(pathlib.Path(extdir)) 1550 1551 def test_extract_all(self): 1552 with temp_cwd(): 1553 self.make_test_file() 1554 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1555 zipfp.extractall() 1556 for fpath, fdata in SMALL_TEST_DATA: 1557 outfile = os.path.join(os.getcwd(), fpath) 1558 1559 with open(outfile, "rb") as f: 1560 self.assertEqual(fdata.encode(), f.read()) 1561 1562 unlink(outfile) 1563 1564 def _test_extract_all_with_target(self, target): 1565 self.make_test_file() 1566 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1567 zipfp.extractall(target) 1568 for fpath, fdata in SMALL_TEST_DATA: 1569 outfile = os.path.join(target, fpath) 1570 1571 with open(outfile, "rb") as f: 1572 self.assertEqual(fdata.encode(), f.read()) 1573 1574 unlink(outfile) 1575 1576 unlink(TESTFN2) 1577 1578 def test_extract_all_with_target(self): 1579 with temp_dir() as extdir: 1580 self._test_extract_all_with_target(extdir) 1581 1582 def test_extract_all_with_target_pathlike(self): 1583 with temp_dir() as extdir: 1584 self._test_extract_all_with_target(pathlib.Path(extdir)) 1585 1586 def check_file(self, filename, content): 1587 self.assertTrue(os.path.isfile(filename)) 1588 with open(filename, 'rb') as f: 1589 self.assertEqual(f.read(), content) 1590 1591 def test_sanitize_windows_name(self): 1592 san = zipfile.ZipFile._sanitize_windows_name 1593 # Passing pathsep in allows this test to work regardless of platform. 1594 self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z') 1595 self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i') 1596 self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r') 1597 1598 def test_extract_hackers_arcnames_common_cases(self): 1599 common_hacknames = [ 1600 ('../foo/bar', 'foo/bar'), 1601 ('foo/../bar', 'foo/bar'), 1602 ('foo/../../bar', 'foo/bar'), 1603 ('foo/bar/..', 'foo/bar'), 1604 ('./../foo/bar', 'foo/bar'), 1605 ('/foo/bar', 'foo/bar'), 1606 ('/foo/../bar', 'foo/bar'), 1607 ('/foo/../../bar', 'foo/bar'), 1608 ] 1609 self._test_extract_hackers_arcnames(common_hacknames) 1610 1611 @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.') 1612 def test_extract_hackers_arcnames_windows_only(self): 1613 """Test combination of path fixing and windows name sanitization.""" 1614 windows_hacknames = [ 1615 (r'..\foo\bar', 'foo/bar'), 1616 (r'..\/foo\/bar', 'foo/bar'), 1617 (r'foo/\..\/bar', 'foo/bar'), 1618 (r'foo\/../\bar', 'foo/bar'), 1619 (r'C:foo/bar', 'foo/bar'), 1620 (r'C:/foo/bar', 'foo/bar'), 1621 (r'C://foo/bar', 'foo/bar'), 1622 (r'C:\foo\bar', 'foo/bar'), 1623 (r'//conky/mountpoint/foo/bar', 'foo/bar'), 1624 (r'\\conky\mountpoint\foo\bar', 'foo/bar'), 1625 (r'///conky/mountpoint/foo/bar', 'mountpoint/foo/bar'), 1626 (r'\\\conky\mountpoint\foo\bar', 'mountpoint/foo/bar'), 1627 (r'//conky//mountpoint/foo/bar', 'mountpoint/foo/bar'), 1628 (r'\\conky\\mountpoint\foo\bar', 'mountpoint/foo/bar'), 1629 (r'//?/C:/foo/bar', 'foo/bar'), 1630 (r'\\?\C:\foo\bar', 'foo/bar'), 1631 (r'C:/../C:/foo/bar', 'C_/foo/bar'), 1632 (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'), 1633 ('../../foo../../ba..r', 'foo/ba..r'), 1634 ] 1635 self._test_extract_hackers_arcnames(windows_hacknames) 1636 1637 @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.') 1638 def test_extract_hackers_arcnames_posix_only(self): 1639 posix_hacknames = [ 1640 ('//foo/bar', 'foo/bar'), 1641 ('../../foo../../ba..r', 'foo../ba..r'), 1642 (r'foo/..\bar', r'foo/..\bar'), 1643 ] 1644 self._test_extract_hackers_arcnames(posix_hacknames) 1645 1646 def _test_extract_hackers_arcnames(self, hacknames): 1647 for arcname, fixedname in hacknames: 1648 content = b'foobar' + arcname.encode() 1649 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp: 1650 zinfo = zipfile.ZipInfo() 1651 # preserve backslashes 1652 zinfo.filename = arcname 1653 zinfo.external_attr = 0o600 << 16 1654 zipfp.writestr(zinfo, content) 1655 1656 arcname = arcname.replace(os.sep, "/") 1657 targetpath = os.path.join('target', 'subdir', 'subsub') 1658 correctfile = os.path.join(targetpath, *fixedname.split('/')) 1659 1660 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1661 writtenfile = zipfp.extract(arcname, targetpath) 1662 self.assertEqual(writtenfile, correctfile, 1663 msg='extract %r: %r != %r' % 1664 (arcname, writtenfile, correctfile)) 1665 self.check_file(correctfile, content) 1666 rmtree('target') 1667 1668 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1669 zipfp.extractall(targetpath) 1670 self.check_file(correctfile, content) 1671 rmtree('target') 1672 1673 correctfile = os.path.join(os.getcwd(), *fixedname.split('/')) 1674 1675 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1676 writtenfile = zipfp.extract(arcname) 1677 self.assertEqual(writtenfile, correctfile, 1678 msg="extract %r" % arcname) 1679 self.check_file(correctfile, content) 1680 rmtree(fixedname.split('/')[0]) 1681 1682 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1683 zipfp.extractall() 1684 self.check_file(correctfile, content) 1685 rmtree(fixedname.split('/')[0]) 1686 1687 unlink(TESTFN2) 1688 1689 1690class OtherTests(unittest.TestCase): 1691 def test_open_via_zip_info(self): 1692 # Create the ZIP archive 1693 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 1694 zipfp.writestr("name", "foo") 1695 with self.assertWarns(UserWarning): 1696 zipfp.writestr("name", "bar") 1697 self.assertEqual(zipfp.namelist(), ["name"] * 2) 1698 1699 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1700 infos = zipfp.infolist() 1701 data = b"" 1702 for info in infos: 1703 with zipfp.open(info) as zipopen: 1704 data += zipopen.read() 1705 self.assertIn(data, {b"foobar", b"barfoo"}) 1706 data = b"" 1707 for info in infos: 1708 data += zipfp.read(info) 1709 self.assertIn(data, {b"foobar", b"barfoo"}) 1710 1711 def test_writestr_extended_local_header_issue1202(self): 1712 with zipfile.ZipFile(TESTFN2, 'w') as orig_zip: 1713 for data in 'abcdefghijklmnop': 1714 zinfo = zipfile.ZipInfo(data) 1715 zinfo.flag_bits |= zipfile._MASK_USE_DATA_DESCRIPTOR # Include an extended local header. 1716 orig_zip.writestr(zinfo, data) 1717 1718 def test_close(self): 1719 """Check that the zipfile is closed after the 'with' block.""" 1720 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 1721 for fpath, fdata in SMALL_TEST_DATA: 1722 zipfp.writestr(fpath, fdata) 1723 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1724 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1725 1726 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1727 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1728 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1729 1730 def test_close_on_exception(self): 1731 """Check that the zipfile is closed if an exception is raised in the 1732 'with' block.""" 1733 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 1734 for fpath, fdata in SMALL_TEST_DATA: 1735 zipfp.writestr(fpath, fdata) 1736 1737 try: 1738 with zipfile.ZipFile(TESTFN2, "r") as zipfp2: 1739 raise zipfile.BadZipFile() 1740 except zipfile.BadZipFile: 1741 self.assertIsNone(zipfp2.fp, 'zipfp is not closed') 1742 1743 def test_unsupported_version(self): 1744 # File has an extract_version of 120 1745 data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00' 1746 b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00' 1747 b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00' 1748 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06' 1749 b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00') 1750 1751 self.assertRaises(NotImplementedError, zipfile.ZipFile, 1752 io.BytesIO(data), 'r') 1753 1754 @requires_zlib() 1755 def test_read_unicode_filenames(self): 1756 # bug #10801 1757 fname = findfile('zip_cp437_header.zip') 1758 with zipfile.ZipFile(fname) as zipfp: 1759 for name in zipfp.namelist(): 1760 zipfp.open(name).close() 1761 1762 def test_write_unicode_filenames(self): 1763 with zipfile.ZipFile(TESTFN, "w") as zf: 1764 zf.writestr("foo.txt", "Test for unicode filename") 1765 zf.writestr("\xf6.txt", "Test for unicode filename") 1766 self.assertIsInstance(zf.infolist()[0].filename, str) 1767 1768 with zipfile.ZipFile(TESTFN, "r") as zf: 1769 self.assertEqual(zf.filelist[0].filename, "foo.txt") 1770 self.assertEqual(zf.filelist[1].filename, "\xf6.txt") 1771 1772 def test_read_after_write_unicode_filenames(self): 1773 with zipfile.ZipFile(TESTFN2, 'w') as zipfp: 1774 zipfp.writestr('приклад', b'sample') 1775 self.assertEqual(zipfp.read('приклад'), b'sample') 1776 1777 def test_exclusive_create_zip_file(self): 1778 """Test exclusive creating a new zipfile.""" 1779 unlink(TESTFN2) 1780 filename = 'testfile.txt' 1781 content = b'hello, world. this is some content.' 1782 with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp: 1783 zipfp.writestr(filename, content) 1784 with self.assertRaises(FileExistsError): 1785 zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) 1786 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1787 self.assertEqual(zipfp.namelist(), [filename]) 1788 self.assertEqual(zipfp.read(filename), content) 1789 1790 def test_create_non_existent_file_for_append(self): 1791 if os.path.exists(TESTFN): 1792 os.unlink(TESTFN) 1793 1794 filename = 'testfile.txt' 1795 content = b'hello, world. this is some content.' 1796 1797 try: 1798 with zipfile.ZipFile(TESTFN, 'a') as zf: 1799 zf.writestr(filename, content) 1800 except OSError: 1801 self.fail('Could not append data to a non-existent zip file.') 1802 1803 self.assertTrue(os.path.exists(TESTFN)) 1804 1805 with zipfile.ZipFile(TESTFN, 'r') as zf: 1806 self.assertEqual(zf.read(filename), content) 1807 1808 def test_close_erroneous_file(self): 1809 # This test checks that the ZipFile constructor closes the file object 1810 # it opens if there's an error in the file. If it doesn't, the 1811 # traceback holds a reference to the ZipFile object and, indirectly, 1812 # the file object. 1813 # On Windows, this causes the os.unlink() call to fail because the 1814 # underlying file is still open. This is SF bug #412214. 1815 # 1816 with open(TESTFN, "w", encoding="utf-8") as fp: 1817 fp.write("this is not a legal zip file\n") 1818 try: 1819 zf = zipfile.ZipFile(TESTFN) 1820 except zipfile.BadZipFile: 1821 pass 1822 1823 def test_is_zip_erroneous_file(self): 1824 """Check that is_zipfile() correctly identifies non-zip files.""" 1825 # - passing a filename 1826 with open(TESTFN, "w", encoding='utf-8') as fp: 1827 fp.write("this is not a legal zip file\n") 1828 self.assertFalse(zipfile.is_zipfile(TESTFN)) 1829 # - passing a path-like object 1830 self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN))) 1831 # - passing a file object 1832 with open(TESTFN, "rb") as fp: 1833 self.assertFalse(zipfile.is_zipfile(fp)) 1834 # - passing a file-like object 1835 fp = io.BytesIO() 1836 fp.write(b"this is not a legal zip file\n") 1837 self.assertFalse(zipfile.is_zipfile(fp)) 1838 fp.seek(0, 0) 1839 self.assertFalse(zipfile.is_zipfile(fp)) 1840 1841 def test_damaged_zipfile(self): 1842 """Check that zipfiles with missing bytes at the end raise BadZipFile.""" 1843 # - Create a valid zip file 1844 fp = io.BytesIO() 1845 with zipfile.ZipFile(fp, mode="w") as zipf: 1846 zipf.writestr("foo.txt", b"O, for a Muse of Fire!") 1847 zipfiledata = fp.getvalue() 1848 1849 # - Now create copies of it missing the last N bytes and make sure 1850 # a BadZipFile exception is raised when we try to open it 1851 for N in range(len(zipfiledata)): 1852 fp = io.BytesIO(zipfiledata[:N]) 1853 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp) 1854 1855 def test_is_zip_valid_file(self): 1856 """Check that is_zipfile() correctly identifies zip files.""" 1857 # - passing a filename 1858 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1859 zipf.writestr("foo.txt", b"O, for a Muse of Fire!") 1860 1861 self.assertTrue(zipfile.is_zipfile(TESTFN)) 1862 # - passing a file object 1863 with open(TESTFN, "rb") as fp: 1864 self.assertTrue(zipfile.is_zipfile(fp)) 1865 fp.seek(0, 0) 1866 zip_contents = fp.read() 1867 # - passing a file-like object 1868 fp = io.BytesIO() 1869 fp.write(zip_contents) 1870 self.assertTrue(zipfile.is_zipfile(fp)) 1871 fp.seek(0, 0) 1872 self.assertTrue(zipfile.is_zipfile(fp)) 1873 1874 def test_non_existent_file_raises_OSError(self): 1875 # make sure we don't raise an AttributeError when a partially-constructed 1876 # ZipFile instance is finalized; this tests for regression on SF tracker 1877 # bug #403871. 1878 1879 # The bug we're testing for caused an AttributeError to be raised 1880 # when a ZipFile instance was created for a file that did not 1881 # exist; the .fp member was not initialized but was needed by the 1882 # __del__() method. Since the AttributeError is in the __del__(), 1883 # it is ignored, but the user should be sufficiently annoyed by 1884 # the message on the output that regression will be noticed 1885 # quickly. 1886 self.assertRaises(OSError, zipfile.ZipFile, TESTFN) 1887 1888 def test_empty_file_raises_BadZipFile(self): 1889 f = open(TESTFN, 'w', encoding='utf-8') 1890 f.close() 1891 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN) 1892 1893 with open(TESTFN, 'w', encoding='utf-8') as fp: 1894 fp.write("short file") 1895 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN) 1896 1897 def test_negative_central_directory_offset_raises_BadZipFile(self): 1898 # Zip file containing an empty EOCD record 1899 buffer = bytearray(b'PK\x05\x06' + b'\0'*18) 1900 1901 # Set the size of the central directory bytes to become 1, 1902 # causing the central directory offset to become negative 1903 for dirsize in 1, 2**32-1: 1904 buffer[12:16] = struct.pack('<L', dirsize) 1905 f = io.BytesIO(buffer) 1906 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, f) 1907 1908 def test_closed_zip_raises_ValueError(self): 1909 """Verify that testzip() doesn't swallow inappropriate exceptions.""" 1910 data = io.BytesIO() 1911 with zipfile.ZipFile(data, mode="w") as zipf: 1912 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1913 1914 # This is correct; calling .read on a closed ZipFile should raise 1915 # a ValueError, and so should calling .testzip. An earlier 1916 # version of .testzip would swallow this exception (and any other) 1917 # and report that the first file in the archive was corrupt. 1918 self.assertRaises(ValueError, zipf.read, "foo.txt") 1919 self.assertRaises(ValueError, zipf.open, "foo.txt") 1920 self.assertRaises(ValueError, zipf.testzip) 1921 self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus") 1922 with open(TESTFN, 'w', encoding='utf-8') as f: 1923 f.write('zipfile test data') 1924 self.assertRaises(ValueError, zipf.write, TESTFN) 1925 1926 def test_bad_constructor_mode(self): 1927 """Check that bad modes passed to ZipFile constructor are caught.""" 1928 self.assertRaises(ValueError, zipfile.ZipFile, TESTFN, "q") 1929 1930 def test_bad_open_mode(self): 1931 """Check that bad modes passed to ZipFile.open are caught.""" 1932 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1933 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1934 1935 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 1936 # read the data to make sure the file is there 1937 zipf.read("foo.txt") 1938 self.assertRaises(ValueError, zipf.open, "foo.txt", "q") 1939 # universal newlines support is removed 1940 self.assertRaises(ValueError, zipf.open, "foo.txt", "U") 1941 self.assertRaises(ValueError, zipf.open, "foo.txt", "rU") 1942 1943 def test_read0(self): 1944 """Check that calling read(0) on a ZipExtFile object returns an empty 1945 string and doesn't advance file pointer.""" 1946 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1947 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1948 # read the data to make sure the file is there 1949 with zipf.open("foo.txt") as f: 1950 for i in range(FIXEDTEST_SIZE): 1951 self.assertEqual(f.read(0), b'') 1952 1953 self.assertEqual(f.read(), b"O, for a Muse of Fire!") 1954 1955 def test_open_non_existent_item(self): 1956 """Check that attempting to call open() for an item that doesn't 1957 exist in the archive raises a RuntimeError.""" 1958 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1959 self.assertRaises(KeyError, zipf.open, "foo.txt", "r") 1960 1961 def test_bad_compression_mode(self): 1962 """Check that bad compression methods passed to ZipFile.open are 1963 caught.""" 1964 self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1) 1965 1966 def test_unsupported_compression(self): 1967 # data is declared as shrunk, but actually deflated 1968 data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00' 1969 b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01' 1970 b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00' 1971 b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 1972 b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00' 1973 b'/\x00\x00\x00!\x00\x00\x00\x00\x00') 1974 with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf: 1975 self.assertRaises(NotImplementedError, zipf.open, 'x') 1976 1977 def test_null_byte_in_filename(self): 1978 """Check that a filename containing a null byte is properly 1979 terminated.""" 1980 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1981 zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!") 1982 self.assertEqual(zipf.namelist(), ['foo.txt']) 1983 1984 def test_struct_sizes(self): 1985 """Check that ZIP internal structure sizes are calculated correctly.""" 1986 self.assertEqual(zipfile.sizeEndCentDir, 22) 1987 self.assertEqual(zipfile.sizeCentralDir, 46) 1988 self.assertEqual(zipfile.sizeEndCentDir64, 56) 1989 self.assertEqual(zipfile.sizeEndCentDir64Locator, 20) 1990 1991 def test_comments(self): 1992 """Check that comments on the archive are handled properly.""" 1993 1994 # check default comment is empty 1995 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1996 self.assertEqual(zipf.comment, b'') 1997 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1998 1999 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 2000 self.assertEqual(zipfr.comment, b'') 2001 2002 # check a simple short comment 2003 comment = b'Bravely taking to his feet, he beat a very brave retreat.' 2004 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 2005 zipf.comment = comment 2006 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2007 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 2008 self.assertEqual(zipf.comment, comment) 2009 2010 # check a comment of max length 2011 comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)]) 2012 comment2 = comment2.encode("ascii") 2013 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 2014 zipf.comment = comment2 2015 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2016 2017 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 2018 self.assertEqual(zipfr.comment, comment2) 2019 2020 # check a comment that is too long is truncated 2021 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 2022 with self.assertWarns(UserWarning): 2023 zipf.comment = comment2 + b'oops' 2024 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2025 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 2026 self.assertEqual(zipfr.comment, comment2) 2027 2028 # check that comments are correctly modified in append mode 2029 with zipfile.ZipFile(TESTFN,mode="w") as zipf: 2030 zipf.comment = b"original comment" 2031 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2032 with zipfile.ZipFile(TESTFN,mode="a") as zipf: 2033 zipf.comment = b"an updated comment" 2034 with zipfile.ZipFile(TESTFN,mode="r") as zipf: 2035 self.assertEqual(zipf.comment, b"an updated comment") 2036 2037 # check that comments are correctly shortened in append mode 2038 # and the file is indeed truncated 2039 with zipfile.ZipFile(TESTFN,mode="w") as zipf: 2040 zipf.comment = b"original comment that's longer" 2041 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2042 original_zip_size = os.path.getsize(TESTFN) 2043 with zipfile.ZipFile(TESTFN,mode="a") as zipf: 2044 zipf.comment = b"shorter comment" 2045 self.assertTrue(original_zip_size > os.path.getsize(TESTFN)) 2046 with zipfile.ZipFile(TESTFN,mode="r") as zipf: 2047 self.assertEqual(zipf.comment, b"shorter comment") 2048 2049 def test_unicode_comment(self): 2050 with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf: 2051 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2052 with self.assertRaises(TypeError): 2053 zipf.comment = "this is an error" 2054 2055 def test_change_comment_in_empty_archive(self): 2056 with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 2057 self.assertFalse(zipf.filelist) 2058 zipf.comment = b"this is a comment" 2059 with zipfile.ZipFile(TESTFN, "r") as zipf: 2060 self.assertEqual(zipf.comment, b"this is a comment") 2061 2062 def test_change_comment_in_nonempty_archive(self): 2063 with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf: 2064 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2065 with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 2066 self.assertTrue(zipf.filelist) 2067 zipf.comment = b"this is a comment" 2068 with zipfile.ZipFile(TESTFN, "r") as zipf: 2069 self.assertEqual(zipf.comment, b"this is a comment") 2070 2071 def test_empty_zipfile(self): 2072 # Check that creating a file in 'w' or 'a' mode and closing without 2073 # adding any files to the archives creates a valid empty ZIP file 2074 zipf = zipfile.ZipFile(TESTFN, mode="w") 2075 zipf.close() 2076 try: 2077 zipf = zipfile.ZipFile(TESTFN, mode="r") 2078 except zipfile.BadZipFile: 2079 self.fail("Unable to create empty ZIP file in 'w' mode") 2080 2081 zipf = zipfile.ZipFile(TESTFN, mode="a") 2082 zipf.close() 2083 try: 2084 zipf = zipfile.ZipFile(TESTFN, mode="r") 2085 except: 2086 self.fail("Unable to create empty ZIP file in 'a' mode") 2087 2088 def test_open_empty_file(self): 2089 # Issue 1710703: Check that opening a file with less than 22 bytes 2090 # raises a BadZipFile exception (rather than the previously unhelpful 2091 # OSError) 2092 f = open(TESTFN, 'w', encoding='utf-8') 2093 f.close() 2094 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r') 2095 2096 def test_create_zipinfo_before_1980(self): 2097 self.assertRaises(ValueError, 2098 zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0)) 2099 2100 def test_create_empty_zipinfo_repr(self): 2101 """Before bpo-26185, repr() on empty ZipInfo object was failing.""" 2102 zi = zipfile.ZipInfo(filename="empty") 2103 self.assertEqual(repr(zi), "<ZipInfo filename='empty' file_size=0>") 2104 2105 def test_create_empty_zipinfo_default_attributes(self): 2106 """Ensure all required attributes are set.""" 2107 zi = zipfile.ZipInfo() 2108 self.assertEqual(zi.orig_filename, "NoName") 2109 self.assertEqual(zi.filename, "NoName") 2110 self.assertEqual(zi.date_time, (1980, 1, 1, 0, 0, 0)) 2111 self.assertEqual(zi.compress_type, zipfile.ZIP_STORED) 2112 self.assertEqual(zi.comment, b"") 2113 self.assertEqual(zi.extra, b"") 2114 self.assertIn(zi.create_system, (0, 3)) 2115 self.assertEqual(zi.create_version, zipfile.DEFAULT_VERSION) 2116 self.assertEqual(zi.extract_version, zipfile.DEFAULT_VERSION) 2117 self.assertEqual(zi.reserved, 0) 2118 self.assertEqual(zi.flag_bits, 0) 2119 self.assertEqual(zi.volume, 0) 2120 self.assertEqual(zi.internal_attr, 0) 2121 self.assertEqual(zi.external_attr, 0) 2122 2123 # Before bpo-26185, both were missing 2124 self.assertEqual(zi.file_size, 0) 2125 self.assertEqual(zi.compress_size, 0) 2126 2127 def test_zipfile_with_short_extra_field(self): 2128 """If an extra field in the header is less than 4 bytes, skip it.""" 2129 zipdata = ( 2130 b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e' 2131 b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab' 2132 b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00' 2133 b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00' 2134 b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00' 2135 b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00' 2136 b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00' 2137 ) 2138 with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf: 2139 # testzip returns the name of the first corrupt file, or None 2140 self.assertIsNone(zipf.testzip()) 2141 2142 def test_open_conflicting_handles(self): 2143 # It's only possible to open one writable file handle at a time 2144 msg1 = b"It's fun to charter an accountant!" 2145 msg2 = b"And sail the wide accountant sea" 2146 msg3 = b"To find, explore the funds offshore" 2147 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf: 2148 with zipf.open('foo', mode='w') as w2: 2149 w2.write(msg1) 2150 with zipf.open('bar', mode='w') as w1: 2151 with self.assertRaises(ValueError): 2152 zipf.open('handle', mode='w') 2153 with self.assertRaises(ValueError): 2154 zipf.open('foo', mode='r') 2155 with self.assertRaises(ValueError): 2156 zipf.writestr('str', 'abcde') 2157 with self.assertRaises(ValueError): 2158 zipf.write(__file__, 'file') 2159 with self.assertRaises(ValueError): 2160 zipf.close() 2161 w1.write(msg2) 2162 with zipf.open('baz', mode='w') as w2: 2163 w2.write(msg3) 2164 2165 with zipfile.ZipFile(TESTFN2, 'r') as zipf: 2166 self.assertEqual(zipf.read('foo'), msg1) 2167 self.assertEqual(zipf.read('bar'), msg2) 2168 self.assertEqual(zipf.read('baz'), msg3) 2169 self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz']) 2170 2171 def test_seek_tell(self): 2172 # Test seek functionality 2173 txt = b"Where's Bruce?" 2174 bloc = txt.find(b"Bruce") 2175 # Check seek on a file 2176 with zipfile.ZipFile(TESTFN, "w") as zipf: 2177 zipf.writestr("foo.txt", txt) 2178 with zipfile.ZipFile(TESTFN, "r") as zipf: 2179 with zipf.open("foo.txt", "r") as fp: 2180 fp.seek(bloc, os.SEEK_SET) 2181 self.assertEqual(fp.tell(), bloc) 2182 fp.seek(-bloc, os.SEEK_CUR) 2183 self.assertEqual(fp.tell(), 0) 2184 fp.seek(bloc, os.SEEK_CUR) 2185 self.assertEqual(fp.tell(), bloc) 2186 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 2187 fp.seek(0, os.SEEK_END) 2188 self.assertEqual(fp.tell(), len(txt)) 2189 fp.seek(0, os.SEEK_SET) 2190 self.assertEqual(fp.tell(), 0) 2191 # Check seek on memory file 2192 data = io.BytesIO() 2193 with zipfile.ZipFile(data, mode="w") as zipf: 2194 zipf.writestr("foo.txt", txt) 2195 with zipfile.ZipFile(data, mode="r") as zipf: 2196 with zipf.open("foo.txt", "r") as fp: 2197 fp.seek(bloc, os.SEEK_SET) 2198 self.assertEqual(fp.tell(), bloc) 2199 fp.seek(-bloc, os.SEEK_CUR) 2200 self.assertEqual(fp.tell(), 0) 2201 fp.seek(bloc, os.SEEK_CUR) 2202 self.assertEqual(fp.tell(), bloc) 2203 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 2204 fp.seek(0, os.SEEK_END) 2205 self.assertEqual(fp.tell(), len(txt)) 2206 fp.seek(0, os.SEEK_SET) 2207 self.assertEqual(fp.tell(), 0) 2208 2209 @requires_bz2() 2210 def test_decompress_without_3rd_party_library(self): 2211 data = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 2212 zip_file = io.BytesIO(data) 2213 with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_BZIP2) as zf: 2214 zf.writestr('a.txt', b'a') 2215 with mock.patch('zipfile.bz2', None): 2216 with zipfile.ZipFile(zip_file) as zf: 2217 self.assertRaises(RuntimeError, zf.extract, 'a.txt') 2218 2219 def tearDown(self): 2220 unlink(TESTFN) 2221 unlink(TESTFN2) 2222 2223 2224class AbstractBadCrcTests: 2225 def test_testzip_with_bad_crc(self): 2226 """Tests that files with bad CRCs return their name from testzip.""" 2227 zipdata = self.zip_with_bad_crc 2228 2229 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2230 # testzip returns the name of the first corrupt file, or None 2231 self.assertEqual('afile', zipf.testzip()) 2232 2233 def test_read_with_bad_crc(self): 2234 """Tests that files with bad CRCs raise a BadZipFile exception when read.""" 2235 zipdata = self.zip_with_bad_crc 2236 2237 # Using ZipFile.read() 2238 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2239 self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile') 2240 2241 # Using ZipExtFile.read() 2242 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2243 with zipf.open('afile', 'r') as corrupt_file: 2244 self.assertRaises(zipfile.BadZipFile, corrupt_file.read) 2245 2246 # Same with small reads (in order to exercise the buffering logic) 2247 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2248 with zipf.open('afile', 'r') as corrupt_file: 2249 corrupt_file.MIN_READ_SIZE = 2 2250 with self.assertRaises(zipfile.BadZipFile): 2251 while corrupt_file.read(2): 2252 pass 2253 2254 2255class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2256 compression = zipfile.ZIP_STORED 2257 zip_with_bad_crc = ( 2258 b'PK\003\004\024\0\0\0\0\0 \213\212;:r' 2259 b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' 2260 b'ilehello,AworldP' 2261 b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' 2262 b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' 2263 b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' 2264 b'lePK\005\006\0\0\0\0\001\0\001\0003\000' 2265 b'\0\0/\0\0\0\0\0') 2266 2267@requires_zlib() 2268class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2269 compression = zipfile.ZIP_DEFLATED 2270 zip_with_bad_crc = ( 2271 b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' 2272 b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2273 b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' 2274 b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' 2275 b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' 2276 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' 2277 b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' 2278 b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00') 2279 2280@requires_bz2() 2281class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2282 compression = zipfile.ZIP_BZIP2 2283 zip_with_bad_crc = ( 2284 b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA' 2285 b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2286 b'ileBZh91AY&SY\xd4\xa8\xca' 2287 b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5' 2288 b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f' 2289 b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14' 2290 b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8' 2291 b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00' 2292 b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' 2293 b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' 2294 b'\x00\x00\x00\x00') 2295 2296@requires_lzma() 2297class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2298 compression = zipfile.ZIP_LZMA 2299 zip_with_bad_crc = ( 2300 b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 2301 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2302 b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' 2303 b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' 2304 b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 2305 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' 2306 b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' 2307 b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' 2308 b'\x00>\x00\x00\x00\x00\x00') 2309 2310 2311class DecryptionTests(unittest.TestCase): 2312 """Check that ZIP decryption works. Since the library does not 2313 support encryption at the moment, we use a pre-generated encrypted 2314 ZIP file.""" 2315 2316 data = ( 2317 b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00' 2318 b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y' 2319 b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl' 2320 b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00' 2321 b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81' 2322 b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00' 2323 b'\x00\x00L\x00\x00\x00\x00\x00' ) 2324 data2 = ( 2325 b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02' 2326 b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04' 2327 b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0' 2328 b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03' 2329 b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00' 2330 b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze' 2331 b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01' 2332 b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' ) 2333 2334 plain = b'zipfile.py encryption test' 2335 plain2 = b'\x00'*512 2336 2337 def setUp(self): 2338 with open(TESTFN, "wb") as fp: 2339 fp.write(self.data) 2340 self.zip = zipfile.ZipFile(TESTFN, "r") 2341 with open(TESTFN2, "wb") as fp: 2342 fp.write(self.data2) 2343 self.zip2 = zipfile.ZipFile(TESTFN2, "r") 2344 2345 def tearDown(self): 2346 self.zip.close() 2347 os.unlink(TESTFN) 2348 self.zip2.close() 2349 os.unlink(TESTFN2) 2350 2351 def test_no_password(self): 2352 # Reading the encrypted file without password 2353 # must generate a RunTime exception 2354 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2355 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2356 2357 def test_bad_password(self): 2358 self.zip.setpassword(b"perl") 2359 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2360 self.zip2.setpassword(b"perl") 2361 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2362 2363 @requires_zlib() 2364 def test_good_password(self): 2365 self.zip.setpassword(b"python") 2366 self.assertEqual(self.zip.read("test.txt"), self.plain) 2367 self.zip2.setpassword(b"12345") 2368 self.assertEqual(self.zip2.read("zero"), self.plain2) 2369 2370 def test_unicode_password(self): 2371 expected_msg = "pwd: expected bytes, got str" 2372 2373 with self.assertRaisesRegex(TypeError, expected_msg): 2374 self.zip.setpassword("unicode") 2375 2376 with self.assertRaisesRegex(TypeError, expected_msg): 2377 self.zip.read("test.txt", "python") 2378 2379 with self.assertRaisesRegex(TypeError, expected_msg): 2380 self.zip.open("test.txt", pwd="python") 2381 2382 with self.assertRaisesRegex(TypeError, expected_msg): 2383 self.zip.extract("test.txt", pwd="python") 2384 2385 with self.assertRaisesRegex(TypeError, expected_msg): 2386 self.zip.pwd = "python" 2387 self.zip.open("test.txt") 2388 2389 def test_seek_tell(self): 2390 self.zip.setpassword(b"python") 2391 txt = self.plain 2392 test_word = b'encryption' 2393 bloc = txt.find(test_word) 2394 bloc_len = len(test_word) 2395 with self.zip.open("test.txt", "r") as fp: 2396 fp.seek(bloc, os.SEEK_SET) 2397 self.assertEqual(fp.tell(), bloc) 2398 fp.seek(-bloc, os.SEEK_CUR) 2399 self.assertEqual(fp.tell(), 0) 2400 fp.seek(bloc, os.SEEK_CUR) 2401 self.assertEqual(fp.tell(), bloc) 2402 self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len]) 2403 2404 # Make sure that the second read after seeking back beyond 2405 # _readbuffer returns the same content (ie. rewind to the start of 2406 # the file to read forward to the required position). 2407 old_read_size = fp.MIN_READ_SIZE 2408 fp.MIN_READ_SIZE = 1 2409 fp._readbuffer = b'' 2410 fp._offset = 0 2411 fp.seek(0, os.SEEK_SET) 2412 self.assertEqual(fp.tell(), 0) 2413 fp.seek(bloc, os.SEEK_CUR) 2414 self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len]) 2415 fp.MIN_READ_SIZE = old_read_size 2416 2417 fp.seek(0, os.SEEK_END) 2418 self.assertEqual(fp.tell(), len(txt)) 2419 fp.seek(0, os.SEEK_SET) 2420 self.assertEqual(fp.tell(), 0) 2421 2422 # Read the file completely to definitely call any eof integrity 2423 # checks (crc) and make sure they still pass. 2424 fp.read() 2425 2426 2427class AbstractTestsWithRandomBinaryFiles: 2428 @classmethod 2429 def setUpClass(cls): 2430 datacount = randint(16, 64)*1024 + randint(1, 1024) 2431 cls.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000)) 2432 for i in range(datacount)) 2433 2434 def setUp(self): 2435 # Make a source file with some lines 2436 with open(TESTFN, "wb") as fp: 2437 fp.write(self.data) 2438 2439 def tearDown(self): 2440 unlink(TESTFN) 2441 unlink(TESTFN2) 2442 2443 def make_test_archive(self, f, compression): 2444 # Create the ZIP archive 2445 with zipfile.ZipFile(f, "w", compression) as zipfp: 2446 zipfp.write(TESTFN, "another.name") 2447 zipfp.write(TESTFN, TESTFN) 2448 2449 def zip_test(self, f, compression): 2450 self.make_test_archive(f, compression) 2451 2452 # Read the ZIP archive 2453 with zipfile.ZipFile(f, "r", compression) as zipfp: 2454 testdata = zipfp.read(TESTFN) 2455 self.assertEqual(len(testdata), len(self.data)) 2456 self.assertEqual(testdata, self.data) 2457 self.assertEqual(zipfp.read("another.name"), self.data) 2458 2459 def test_read(self): 2460 for f in get_files(self): 2461 self.zip_test(f, self.compression) 2462 2463 def zip_open_test(self, f, compression): 2464 self.make_test_archive(f, compression) 2465 2466 # Read the ZIP archive 2467 with zipfile.ZipFile(f, "r", compression) as zipfp: 2468 zipdata1 = [] 2469 with zipfp.open(TESTFN) as zipopen1: 2470 while True: 2471 read_data = zipopen1.read(256) 2472 if not read_data: 2473 break 2474 zipdata1.append(read_data) 2475 2476 zipdata2 = [] 2477 with zipfp.open("another.name") as zipopen2: 2478 while True: 2479 read_data = zipopen2.read(256) 2480 if not read_data: 2481 break 2482 zipdata2.append(read_data) 2483 2484 testdata1 = b''.join(zipdata1) 2485 self.assertEqual(len(testdata1), len(self.data)) 2486 self.assertEqual(testdata1, self.data) 2487 2488 testdata2 = b''.join(zipdata2) 2489 self.assertEqual(len(testdata2), len(self.data)) 2490 self.assertEqual(testdata2, self.data) 2491 2492 def test_open(self): 2493 for f in get_files(self): 2494 self.zip_open_test(f, self.compression) 2495 2496 def zip_random_open_test(self, f, compression): 2497 self.make_test_archive(f, compression) 2498 2499 # Read the ZIP archive 2500 with zipfile.ZipFile(f, "r", compression) as zipfp: 2501 zipdata1 = [] 2502 with zipfp.open(TESTFN) as zipopen1: 2503 while True: 2504 read_data = zipopen1.read(randint(1, 1024)) 2505 if not read_data: 2506 break 2507 zipdata1.append(read_data) 2508 2509 testdata = b''.join(zipdata1) 2510 self.assertEqual(len(testdata), len(self.data)) 2511 self.assertEqual(testdata, self.data) 2512 2513 def test_random_open(self): 2514 for f in get_files(self): 2515 self.zip_random_open_test(f, self.compression) 2516 2517 2518class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2519 unittest.TestCase): 2520 compression = zipfile.ZIP_STORED 2521 2522@requires_zlib() 2523class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2524 unittest.TestCase): 2525 compression = zipfile.ZIP_DEFLATED 2526 2527@requires_bz2() 2528class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2529 unittest.TestCase): 2530 compression = zipfile.ZIP_BZIP2 2531 2532@requires_lzma() 2533class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2534 unittest.TestCase): 2535 compression = zipfile.ZIP_LZMA 2536 2537 2538# Provide the tell() method but not seek() 2539class Tellable: 2540 def __init__(self, fp): 2541 self.fp = fp 2542 self.offset = 0 2543 2544 def write(self, data): 2545 n = self.fp.write(data) 2546 self.offset += n 2547 return n 2548 2549 def tell(self): 2550 return self.offset 2551 2552 def flush(self): 2553 self.fp.flush() 2554 2555class Unseekable: 2556 def __init__(self, fp): 2557 self.fp = fp 2558 2559 def write(self, data): 2560 return self.fp.write(data) 2561 2562 def flush(self): 2563 self.fp.flush() 2564 2565class UnseekableTests(unittest.TestCase): 2566 def test_writestr(self): 2567 for wrapper in (lambda f: f), Tellable, Unseekable: 2568 with self.subTest(wrapper=wrapper): 2569 f = io.BytesIO() 2570 f.write(b'abc') 2571 bf = io.BufferedWriter(f) 2572 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp: 2573 zipfp.writestr('ones', b'111') 2574 zipfp.writestr('twos', b'222') 2575 self.assertEqual(f.getvalue()[:5], b'abcPK') 2576 with zipfile.ZipFile(f, mode='r') as zipf: 2577 with zipf.open('ones') as zopen: 2578 self.assertEqual(zopen.read(), b'111') 2579 with zipf.open('twos') as zopen: 2580 self.assertEqual(zopen.read(), b'222') 2581 2582 def test_write(self): 2583 for wrapper in (lambda f: f), Tellable, Unseekable: 2584 with self.subTest(wrapper=wrapper): 2585 f = io.BytesIO() 2586 f.write(b'abc') 2587 bf = io.BufferedWriter(f) 2588 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp: 2589 self.addCleanup(unlink, TESTFN) 2590 with open(TESTFN, 'wb') as f2: 2591 f2.write(b'111') 2592 zipfp.write(TESTFN, 'ones') 2593 with open(TESTFN, 'wb') as f2: 2594 f2.write(b'222') 2595 zipfp.write(TESTFN, 'twos') 2596 self.assertEqual(f.getvalue()[:5], b'abcPK') 2597 with zipfile.ZipFile(f, mode='r') as zipf: 2598 with zipf.open('ones') as zopen: 2599 self.assertEqual(zopen.read(), b'111') 2600 with zipf.open('twos') as zopen: 2601 self.assertEqual(zopen.read(), b'222') 2602 2603 def test_open_write(self): 2604 for wrapper in (lambda f: f), Tellable, Unseekable: 2605 with self.subTest(wrapper=wrapper): 2606 f = io.BytesIO() 2607 f.write(b'abc') 2608 bf = io.BufferedWriter(f) 2609 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipf: 2610 with zipf.open('ones', 'w') as zopen: 2611 zopen.write(b'111') 2612 with zipf.open('twos', 'w') as zopen: 2613 zopen.write(b'222') 2614 self.assertEqual(f.getvalue()[:5], b'abcPK') 2615 with zipfile.ZipFile(f) as zipf: 2616 self.assertEqual(zipf.read('ones'), b'111') 2617 self.assertEqual(zipf.read('twos'), b'222') 2618 2619 2620@requires_zlib() 2621class TestsWithMultipleOpens(unittest.TestCase): 2622 @classmethod 2623 def setUpClass(cls): 2624 cls.data1 = b'111' + randbytes(10000) 2625 cls.data2 = b'222' + randbytes(10000) 2626 2627 def make_test_archive(self, f): 2628 # Create the ZIP archive 2629 with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp: 2630 zipfp.writestr('ones', self.data1) 2631 zipfp.writestr('twos', self.data2) 2632 2633 def test_same_file(self): 2634 # Verify that (when the ZipFile is in control of creating file objects) 2635 # multiple open() calls can be made without interfering with each other. 2636 for f in get_files(self): 2637 self.make_test_archive(f) 2638 with zipfile.ZipFile(f, mode="r") as zipf: 2639 with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2: 2640 data1 = zopen1.read(500) 2641 data2 = zopen2.read(500) 2642 data1 += zopen1.read() 2643 data2 += zopen2.read() 2644 self.assertEqual(data1, data2) 2645 self.assertEqual(data1, self.data1) 2646 2647 def test_different_file(self): 2648 # Verify that (when the ZipFile is in control of creating file objects) 2649 # multiple open() calls can be made without interfering with each other. 2650 for f in get_files(self): 2651 self.make_test_archive(f) 2652 with zipfile.ZipFile(f, mode="r") as zipf: 2653 with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2: 2654 data1 = zopen1.read(500) 2655 data2 = zopen2.read(500) 2656 data1 += zopen1.read() 2657 data2 += zopen2.read() 2658 self.assertEqual(data1, self.data1) 2659 self.assertEqual(data2, self.data2) 2660 2661 def test_interleaved(self): 2662 # Verify that (when the ZipFile is in control of creating file objects) 2663 # multiple open() calls can be made without interfering with each other. 2664 for f in get_files(self): 2665 self.make_test_archive(f) 2666 with zipfile.ZipFile(f, mode="r") as zipf: 2667 with zipf.open('ones') as zopen1: 2668 data1 = zopen1.read(500) 2669 with zipf.open('twos') as zopen2: 2670 data2 = zopen2.read(500) 2671 data1 += zopen1.read() 2672 data2 += zopen2.read() 2673 self.assertEqual(data1, self.data1) 2674 self.assertEqual(data2, self.data2) 2675 2676 def test_read_after_close(self): 2677 for f in get_files(self): 2678 self.make_test_archive(f) 2679 with contextlib.ExitStack() as stack: 2680 with zipfile.ZipFile(f, 'r') as zipf: 2681 zopen1 = stack.enter_context(zipf.open('ones')) 2682 zopen2 = stack.enter_context(zipf.open('twos')) 2683 data1 = zopen1.read(500) 2684 data2 = zopen2.read(500) 2685 data1 += zopen1.read() 2686 data2 += zopen2.read() 2687 self.assertEqual(data1, self.data1) 2688 self.assertEqual(data2, self.data2) 2689 2690 def test_read_after_write(self): 2691 for f in get_files(self): 2692 with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf: 2693 zipf.writestr('ones', self.data1) 2694 zipf.writestr('twos', self.data2) 2695 with zipf.open('ones') as zopen1: 2696 data1 = zopen1.read(500) 2697 self.assertEqual(data1, self.data1[:500]) 2698 with zipfile.ZipFile(f, 'r') as zipf: 2699 data1 = zipf.read('ones') 2700 data2 = zipf.read('twos') 2701 self.assertEqual(data1, self.data1) 2702 self.assertEqual(data2, self.data2) 2703 2704 def test_write_after_read(self): 2705 for f in get_files(self): 2706 with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf: 2707 zipf.writestr('ones', self.data1) 2708 with zipf.open('ones') as zopen1: 2709 zopen1.read(500) 2710 zipf.writestr('twos', self.data2) 2711 with zipfile.ZipFile(f, 'r') as zipf: 2712 data1 = zipf.read('ones') 2713 data2 = zipf.read('twos') 2714 self.assertEqual(data1, self.data1) 2715 self.assertEqual(data2, self.data2) 2716 2717 def test_many_opens(self): 2718 # Verify that read() and open() promptly close the file descriptor, 2719 # and don't rely on the garbage collector to free resources. 2720 startcount = fd_count() 2721 self.make_test_archive(TESTFN2) 2722 with zipfile.ZipFile(TESTFN2, mode="r") as zipf: 2723 for x in range(100): 2724 zipf.read('ones') 2725 with zipf.open('ones') as zopen1: 2726 pass 2727 self.assertEqual(startcount, fd_count()) 2728 2729 def test_write_while_reading(self): 2730 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf: 2731 zipf.writestr('ones', self.data1) 2732 with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf: 2733 with zipf.open('ones', 'r') as r1: 2734 data1 = r1.read(500) 2735 with zipf.open('twos', 'w') as w1: 2736 w1.write(self.data2) 2737 data1 += r1.read() 2738 self.assertEqual(data1, self.data1) 2739 with zipfile.ZipFile(TESTFN2) as zipf: 2740 self.assertEqual(zipf.read('twos'), self.data2) 2741 2742 def tearDown(self): 2743 unlink(TESTFN2) 2744 2745 2746class TestWithDirectory(unittest.TestCase): 2747 def setUp(self): 2748 os.mkdir(TESTFN2) 2749 2750 def test_extract_dir(self): 2751 with zipfile.ZipFile(findfile("zipdir.zip")) as zipf: 2752 zipf.extractall(TESTFN2) 2753 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a"))) 2754 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b"))) 2755 self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c"))) 2756 2757 def test_bug_6050(self): 2758 # Extraction should succeed if directories already exist 2759 os.mkdir(os.path.join(TESTFN2, "a")) 2760 self.test_extract_dir() 2761 2762 def test_write_dir(self): 2763 dirpath = os.path.join(TESTFN2, "x") 2764 os.mkdir(dirpath) 2765 mode = os.stat(dirpath).st_mode & 0xFFFF 2766 with zipfile.ZipFile(TESTFN, "w") as zipf: 2767 zipf.write(dirpath) 2768 zinfo = zipf.filelist[0] 2769 self.assertTrue(zinfo.filename.endswith("/x/")) 2770 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2771 zipf.write(dirpath, "y") 2772 zinfo = zipf.filelist[1] 2773 self.assertTrue(zinfo.filename, "y/") 2774 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2775 with zipfile.ZipFile(TESTFN, "r") as zipf: 2776 zinfo = zipf.filelist[0] 2777 self.assertTrue(zinfo.filename.endswith("/x/")) 2778 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2779 zinfo = zipf.filelist[1] 2780 self.assertTrue(zinfo.filename, "y/") 2781 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2782 target = os.path.join(TESTFN2, "target") 2783 os.mkdir(target) 2784 zipf.extractall(target) 2785 self.assertTrue(os.path.isdir(os.path.join(target, "y"))) 2786 self.assertEqual(len(os.listdir(target)), 2) 2787 2788 def test_writestr_dir(self): 2789 os.mkdir(os.path.join(TESTFN2, "x")) 2790 with zipfile.ZipFile(TESTFN, "w") as zipf: 2791 zipf.writestr("x/", b'') 2792 zinfo = zipf.filelist[0] 2793 self.assertEqual(zinfo.filename, "x/") 2794 self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) 2795 with zipfile.ZipFile(TESTFN, "r") as zipf: 2796 zinfo = zipf.filelist[0] 2797 self.assertTrue(zinfo.filename.endswith("x/")) 2798 self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) 2799 target = os.path.join(TESTFN2, "target") 2800 os.mkdir(target) 2801 zipf.extractall(target) 2802 self.assertTrue(os.path.isdir(os.path.join(target, "x"))) 2803 self.assertEqual(os.listdir(target), ["x"]) 2804 2805 def test_mkdir(self): 2806 with zipfile.ZipFile(TESTFN, "w") as zf: 2807 zf.mkdir("directory") 2808 zinfo = zf.filelist[0] 2809 self.assertEqual(zinfo.filename, "directory/") 2810 self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10) 2811 2812 zf.mkdir("directory2/") 2813 zinfo = zf.filelist[1] 2814 self.assertEqual(zinfo.filename, "directory2/") 2815 self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10) 2816 2817 zf.mkdir("directory3", mode=0o777) 2818 zinfo = zf.filelist[2] 2819 self.assertEqual(zinfo.filename, "directory3/") 2820 self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10) 2821 2822 old_zinfo = zipfile.ZipInfo("directory4/") 2823 old_zinfo.external_attr = (0o40777 << 16) | 0x10 2824 old_zinfo.CRC = 0 2825 old_zinfo.file_size = 0 2826 old_zinfo.compress_size = 0 2827 zf.mkdir(old_zinfo) 2828 new_zinfo = zf.filelist[3] 2829 self.assertEqual(old_zinfo.filename, "directory4/") 2830 self.assertEqual(old_zinfo.external_attr, new_zinfo.external_attr) 2831 2832 target = os.path.join(TESTFN2, "target") 2833 os.mkdir(target) 2834 zf.extractall(target) 2835 self.assertEqual(set(os.listdir(target)), {"directory", "directory2", "directory3", "directory4"}) 2836 2837 def test_create_directory_with_write(self): 2838 with zipfile.ZipFile(TESTFN, "w") as zf: 2839 zf.writestr(zipfile.ZipInfo('directory/'), '') 2840 2841 zinfo = zf.filelist[0] 2842 self.assertEqual(zinfo.filename, "directory/") 2843 2844 directory = os.path.join(TESTFN2, "directory2") 2845 os.mkdir(directory) 2846 mode = os.stat(directory).st_mode 2847 zf.write(directory, arcname="directory2/") 2848 zinfo = zf.filelist[1] 2849 self.assertEqual(zinfo.filename, "directory2/") 2850 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2851 2852 target = os.path.join(TESTFN2, "target") 2853 os.mkdir(target) 2854 zf.extractall(target) 2855 2856 self.assertEqual(set(os.listdir(target)), {"directory", "directory2"}) 2857 2858 def tearDown(self): 2859 rmtree(TESTFN2) 2860 if os.path.exists(TESTFN): 2861 unlink(TESTFN) 2862 2863 2864class ZipInfoTests(unittest.TestCase): 2865 def test_from_file(self): 2866 zi = zipfile.ZipInfo.from_file(__file__) 2867 self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py') 2868 self.assertFalse(zi.is_dir()) 2869 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2870 2871 def test_from_file_pathlike(self): 2872 zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__)) 2873 self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py') 2874 self.assertFalse(zi.is_dir()) 2875 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2876 2877 def test_from_file_bytes(self): 2878 zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test') 2879 self.assertEqual(posixpath.basename(zi.filename), 'test') 2880 self.assertFalse(zi.is_dir()) 2881 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2882 2883 def test_from_file_fileno(self): 2884 with open(__file__, 'rb') as f: 2885 zi = zipfile.ZipInfo.from_file(f.fileno(), 'test') 2886 self.assertEqual(posixpath.basename(zi.filename), 'test') 2887 self.assertFalse(zi.is_dir()) 2888 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2889 2890 def test_from_dir(self): 2891 dirpath = os.path.dirname(os.path.abspath(__file__)) 2892 zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests') 2893 self.assertEqual(zi.filename, 'stdlib_tests/') 2894 self.assertTrue(zi.is_dir()) 2895 self.assertEqual(zi.compress_type, zipfile.ZIP_STORED) 2896 self.assertEqual(zi.file_size, 0) 2897 2898 2899class CommandLineTest(unittest.TestCase): 2900 2901 def zipfilecmd(self, *args, **kwargs): 2902 rc, out, err = script_helper.assert_python_ok('-m', 'zipfile', *args, 2903 **kwargs) 2904 return out.replace(os.linesep.encode(), b'\n') 2905 2906 def zipfilecmd_failure(self, *args): 2907 return script_helper.assert_python_failure('-m', 'zipfile', *args) 2908 2909 def test_bad_use(self): 2910 rc, out, err = self.zipfilecmd_failure() 2911 self.assertEqual(out, b'') 2912 self.assertIn(b'usage', err.lower()) 2913 self.assertIn(b'error', err.lower()) 2914 self.assertIn(b'required', err.lower()) 2915 rc, out, err = self.zipfilecmd_failure('-l', '') 2916 self.assertEqual(out, b'') 2917 self.assertNotEqual(err.strip(), b'') 2918 2919 def test_test_command(self): 2920 zip_name = findfile('zipdir.zip') 2921 for opt in '-t', '--test': 2922 out = self.zipfilecmd(opt, zip_name) 2923 self.assertEqual(out.rstrip(), b'Done testing') 2924 zip_name = findfile('testtar.tar') 2925 rc, out, err = self.zipfilecmd_failure('-t', zip_name) 2926 self.assertEqual(out, b'') 2927 2928 def test_list_command(self): 2929 zip_name = findfile('zipdir.zip') 2930 t = io.StringIO() 2931 with zipfile.ZipFile(zip_name, 'r') as tf: 2932 tf.printdir(t) 2933 expected = t.getvalue().encode('ascii', 'backslashreplace') 2934 for opt in '-l', '--list': 2935 out = self.zipfilecmd(opt, zip_name, 2936 PYTHONIOENCODING='ascii:backslashreplace') 2937 self.assertEqual(out, expected) 2938 2939 @requires_zlib() 2940 def test_create_command(self): 2941 self.addCleanup(unlink, TESTFN) 2942 with open(TESTFN, 'w', encoding='utf-8') as f: 2943 f.write('test 1') 2944 os.mkdir(TESTFNDIR) 2945 self.addCleanup(rmtree, TESTFNDIR) 2946 with open(os.path.join(TESTFNDIR, 'file.txt'), 'w', encoding='utf-8') as f: 2947 f.write('test 2') 2948 files = [TESTFN, TESTFNDIR] 2949 namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt'] 2950 for opt in '-c', '--create': 2951 try: 2952 out = self.zipfilecmd(opt, TESTFN2, *files) 2953 self.assertEqual(out, b'') 2954 with zipfile.ZipFile(TESTFN2) as zf: 2955 self.assertEqual(zf.namelist(), namelist) 2956 self.assertEqual(zf.read(namelist[0]), b'test 1') 2957 self.assertEqual(zf.read(namelist[2]), b'test 2') 2958 finally: 2959 unlink(TESTFN2) 2960 2961 def test_extract_command(self): 2962 zip_name = findfile('zipdir.zip') 2963 for opt in '-e', '--extract': 2964 with temp_dir() as extdir: 2965 out = self.zipfilecmd(opt, zip_name, extdir) 2966 self.assertEqual(out, b'') 2967 with zipfile.ZipFile(zip_name) as zf: 2968 for zi in zf.infolist(): 2969 path = os.path.join(extdir, 2970 zi.filename.replace('/', os.sep)) 2971 if zi.is_dir(): 2972 self.assertTrue(os.path.isdir(path)) 2973 else: 2974 self.assertTrue(os.path.isfile(path)) 2975 with open(path, 'rb') as f: 2976 self.assertEqual(f.read(), zf.read(zi)) 2977 2978 2979class TestExecutablePrependedZip(unittest.TestCase): 2980 """Test our ability to open zip files with an executable prepended.""" 2981 2982 def setUp(self): 2983 self.exe_zip = findfile('exe_with_zip', subdir='ziptestdata') 2984 self.exe_zip64 = findfile('exe_with_z64', subdir='ziptestdata') 2985 2986 def _test_zip_works(self, name): 2987 # bpo28494 sanity check: ensure is_zipfile works on these. 2988 self.assertTrue(zipfile.is_zipfile(name), 2989 f'is_zipfile failed on {name}') 2990 # Ensure we can operate on these via ZipFile. 2991 with zipfile.ZipFile(name) as zipfp: 2992 for n in zipfp.namelist(): 2993 data = zipfp.read(n) 2994 self.assertIn(b'FAVORITE_NUMBER', data) 2995 2996 def test_read_zip_with_exe_prepended(self): 2997 self._test_zip_works(self.exe_zip) 2998 2999 def test_read_zip64_with_exe_prepended(self): 3000 self._test_zip_works(self.exe_zip64) 3001 3002 @unittest.skipUnless(sys.executable, 'sys.executable required.') 3003 @unittest.skipUnless(os.access('/bin/bash', os.X_OK), 3004 'Test relies on #!/bin/bash working.') 3005 @requires_subprocess() 3006 def test_execute_zip2(self): 3007 output = subprocess.check_output([self.exe_zip, sys.executable]) 3008 self.assertIn(b'number in executable: 5', output) 3009 3010 @unittest.skipUnless(sys.executable, 'sys.executable required.') 3011 @unittest.skipUnless(os.access('/bin/bash', os.X_OK), 3012 'Test relies on #!/bin/bash working.') 3013 @requires_subprocess() 3014 def test_execute_zip64(self): 3015 output = subprocess.check_output([self.exe_zip64, sys.executable]) 3016 self.assertIn(b'number in executable: 5', output) 3017 3018 3019# Poor man's technique to consume a (smallish) iterable. 3020consume = tuple 3021 3022 3023# from jaraco.itertools 5.0 3024class jaraco: 3025 class itertools: 3026 class Counter: 3027 def __init__(self, i): 3028 self.count = 0 3029 self._orig_iter = iter(i) 3030 3031 def __iter__(self): 3032 return self 3033 3034 def __next__(self): 3035 result = next(self._orig_iter) 3036 self.count += 1 3037 return result 3038 3039 3040def add_dirs(zf): 3041 """ 3042 Given a writable zip file zf, inject directory entries for 3043 any directories implied by the presence of children. 3044 """ 3045 for name in zipfile.CompleteDirs._implied_dirs(zf.namelist()): 3046 zf.writestr(name, b"") 3047 return zf 3048 3049 3050def build_alpharep_fixture(): 3051 """ 3052 Create a zip file with this structure: 3053 3054 . 3055 ├── a.txt 3056 ├── b 3057 │ ├── c.txt 3058 │ ├── d 3059 │ │ └── e.txt 3060 │ └── f.txt 3061 └── g 3062 └── h 3063 └── i.txt 3064 3065 This fixture has the following key characteristics: 3066 3067 - a file at the root (a) 3068 - a file two levels deep (b/d/e) 3069 - multiple files in a directory (b/c, b/f) 3070 - a directory containing only a directory (g/h) 3071 3072 "alpha" because it uses alphabet 3073 "rep" because it's a representative example 3074 """ 3075 data = io.BytesIO() 3076 zf = zipfile.ZipFile(data, "w") 3077 zf.writestr("a.txt", b"content of a") 3078 zf.writestr("b/c.txt", b"content of c") 3079 zf.writestr("b/d/e.txt", b"content of e") 3080 zf.writestr("b/f.txt", b"content of f") 3081 zf.writestr("g/h/i.txt", b"content of i") 3082 zf.filename = "alpharep.zip" 3083 return zf 3084 3085 3086def pass_alpharep(meth): 3087 """ 3088 Given a method, wrap it in a for loop that invokes method 3089 with each subtest. 3090 """ 3091 3092 @functools.wraps(meth) 3093 def wrapper(self): 3094 for alpharep in self.zipfile_alpharep(): 3095 meth(self, alpharep=alpharep) 3096 3097 return wrapper 3098 3099 3100class TestPath(unittest.TestCase): 3101 def setUp(self): 3102 self.fixtures = contextlib.ExitStack() 3103 self.addCleanup(self.fixtures.close) 3104 3105 def zipfile_alpharep(self): 3106 with self.subTest(): 3107 yield build_alpharep_fixture() 3108 with self.subTest(): 3109 yield add_dirs(build_alpharep_fixture()) 3110 3111 def zipfile_ondisk(self, alpharep): 3112 tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir())) 3113 buffer = alpharep.fp 3114 alpharep.close() 3115 path = tmpdir / alpharep.filename 3116 with path.open("wb") as strm: 3117 strm.write(buffer.getvalue()) 3118 return path 3119 3120 @pass_alpharep 3121 def test_iterdir_and_types(self, alpharep): 3122 root = zipfile.Path(alpharep) 3123 assert root.is_dir() 3124 a, b, g = root.iterdir() 3125 assert a.is_file() 3126 assert b.is_dir() 3127 assert g.is_dir() 3128 c, f, d = b.iterdir() 3129 assert c.is_file() and f.is_file() 3130 (e,) = d.iterdir() 3131 assert e.is_file() 3132 (h,) = g.iterdir() 3133 (i,) = h.iterdir() 3134 assert i.is_file() 3135 3136 @pass_alpharep 3137 def test_is_file_missing(self, alpharep): 3138 root = zipfile.Path(alpharep) 3139 assert not root.joinpath('missing.txt').is_file() 3140 3141 @pass_alpharep 3142 def test_iterdir_on_file(self, alpharep): 3143 root = zipfile.Path(alpharep) 3144 a, b, g = root.iterdir() 3145 with self.assertRaises(ValueError): 3146 a.iterdir() 3147 3148 @pass_alpharep 3149 def test_subdir_is_dir(self, alpharep): 3150 root = zipfile.Path(alpharep) 3151 assert (root / 'b').is_dir() 3152 assert (root / 'b/').is_dir() 3153 assert (root / 'g').is_dir() 3154 assert (root / 'g/').is_dir() 3155 3156 @pass_alpharep 3157 def test_open(self, alpharep): 3158 root = zipfile.Path(alpharep) 3159 a, b, g = root.iterdir() 3160 with a.open(encoding="utf-8") as strm: 3161 data = strm.read() 3162 self.assertEqual(data, "content of a") 3163 with a.open('r', "utf-8") as strm: # not a kw, no gh-101144 TypeError 3164 data = strm.read() 3165 self.assertEqual(data, "content of a") 3166 3167 def test_open_encoding_utf16(self): 3168 in_memory_file = io.BytesIO() 3169 zf = zipfile.ZipFile(in_memory_file, "w") 3170 zf.writestr("path/16.txt", "This was utf-16".encode("utf-16")) 3171 zf.filename = "test_open_utf16.zip" 3172 root = zipfile.Path(zf) 3173 (path,) = root.iterdir() 3174 u16 = path.joinpath("16.txt") 3175 with u16.open('r', "utf-16") as strm: 3176 data = strm.read() 3177 self.assertEqual(data, "This was utf-16") 3178 with u16.open(encoding="utf-16") as strm: 3179 data = strm.read() 3180 self.assertEqual(data, "This was utf-16") 3181 3182 def test_open_encoding_errors(self): 3183 in_memory_file = io.BytesIO() 3184 zf = zipfile.ZipFile(in_memory_file, "w") 3185 zf.writestr("path/bad-utf8.bin", b"invalid utf-8: \xff\xff.") 3186 zf.filename = "test_read_text_encoding_errors.zip" 3187 root = zipfile.Path(zf) 3188 (path,) = root.iterdir() 3189 u16 = path.joinpath("bad-utf8.bin") 3190 3191 # encoding= as a positional argument for gh-101144. 3192 data = u16.read_text("utf-8", errors="ignore") 3193 self.assertEqual(data, "invalid utf-8: .") 3194 with u16.open("r", "utf-8", errors="surrogateescape") as f: 3195 self.assertEqual(f.read(), "invalid utf-8: \udcff\udcff.") 3196 3197 # encoding= both positional and keyword is an error; gh-101144. 3198 with self.assertRaisesRegex(TypeError, "encoding"): 3199 data = u16.read_text("utf-8", encoding="utf-8") 3200 3201 # both keyword arguments work. 3202 with u16.open("r", encoding="utf-8", errors="strict") as f: 3203 # error during decoding with wrong codec. 3204 with self.assertRaises(UnicodeDecodeError): 3205 f.read() 3206 3207 def test_encoding_warnings(self): 3208 """EncodingWarning must blame the read_text and open calls.""" 3209 code = '''\ 3210import io, zipfile 3211with zipfile.ZipFile(io.BytesIO(), "w") as zf: 3212 zf.filename = '<test_encoding_warnings in memory zip file>' 3213 zf.writestr("path/file.txt", b"Spanish Inquisition") 3214 root = zipfile.Path(zf) 3215 (path,) = root.iterdir() 3216 file_path = path.joinpath("file.txt") 3217 unused = file_path.read_text() # should warn 3218 file_path.open("r").close() # should warn 3219''' 3220 proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code) 3221 warnings = proc.err.splitlines() 3222 self.assertEqual(len(warnings), 2, proc.err) 3223 self.assertRegex(warnings[0], rb"^<string>:8: EncodingWarning:") 3224 self.assertRegex(warnings[1], rb"^<string>:9: EncodingWarning:") 3225 3226 def test_open_write(self): 3227 """ 3228 If the zipfile is open for write, it should be possible to 3229 write bytes or text to it. 3230 """ 3231 zf = zipfile.Path(zipfile.ZipFile(io.BytesIO(), mode='w')) 3232 with zf.joinpath('file.bin').open('wb') as strm: 3233 strm.write(b'binary contents') 3234 with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm: 3235 strm.write('text file') 3236 3237 def test_open_extant_directory(self): 3238 """ 3239 Attempting to open a directory raises IsADirectoryError. 3240 """ 3241 zf = zipfile.Path(add_dirs(build_alpharep_fixture())) 3242 with self.assertRaises(IsADirectoryError): 3243 zf.joinpath('b').open() 3244 3245 @pass_alpharep 3246 def test_open_binary_invalid_args(self, alpharep): 3247 root = zipfile.Path(alpharep) 3248 with self.assertRaises(ValueError): 3249 root.joinpath('a.txt').open('rb', encoding='utf-8') 3250 with self.assertRaises(ValueError): 3251 root.joinpath('a.txt').open('rb', 'utf-8') 3252 3253 def test_open_missing_directory(self): 3254 """ 3255 Attempting to open a missing directory raises FileNotFoundError. 3256 """ 3257 zf = zipfile.Path(add_dirs(build_alpharep_fixture())) 3258 with self.assertRaises(FileNotFoundError): 3259 zf.joinpath('z').open() 3260 3261 @pass_alpharep 3262 def test_read(self, alpharep): 3263 root = zipfile.Path(alpharep) 3264 a, b, g = root.iterdir() 3265 assert a.read_text(encoding="utf-8") == "content of a" 3266 a.read_text("utf-8") # No positional arg TypeError per gh-101144. 3267 assert a.read_bytes() == b"content of a" 3268 3269 @pass_alpharep 3270 def test_joinpath(self, alpharep): 3271 root = zipfile.Path(alpharep) 3272 a = root.joinpath("a.txt") 3273 assert a.is_file() 3274 e = root.joinpath("b").joinpath("d").joinpath("e.txt") 3275 assert e.read_text(encoding="utf-8") == "content of e" 3276 3277 @pass_alpharep 3278 def test_joinpath_multiple(self, alpharep): 3279 root = zipfile.Path(alpharep) 3280 e = root.joinpath("b", "d", "e.txt") 3281 assert e.read_text(encoding="utf-8") == "content of e" 3282 3283 @pass_alpharep 3284 def test_traverse_truediv(self, alpharep): 3285 root = zipfile.Path(alpharep) 3286 a = root / "a.txt" 3287 assert a.is_file() 3288 e = root / "b" / "d" / "e.txt" 3289 assert e.read_text(encoding="utf-8") == "content of e" 3290 3291 @pass_alpharep 3292 def test_traverse_simplediv(self, alpharep): 3293 """ 3294 Disable the __future__.division when testing traversal. 3295 """ 3296 code = compile( 3297 source="zipfile.Path(alpharep) / 'a'", 3298 filename="(test)", 3299 mode="eval", 3300 dont_inherit=True, 3301 ) 3302 eval(code) 3303 3304 @pass_alpharep 3305 def test_pathlike_construction(self, alpharep): 3306 """ 3307 zipfile.Path should be constructable from a path-like object 3308 """ 3309 zipfile_ondisk = self.zipfile_ondisk(alpharep) 3310 pathlike = pathlib.Path(str(zipfile_ondisk)) 3311 zipfile.Path(pathlike) 3312 3313 @pass_alpharep 3314 def test_traverse_pathlike(self, alpharep): 3315 root = zipfile.Path(alpharep) 3316 root / pathlib.Path("a") 3317 3318 @pass_alpharep 3319 def test_parent(self, alpharep): 3320 root = zipfile.Path(alpharep) 3321 assert (root / 'a').parent.at == '' 3322 assert (root / 'a' / 'b').parent.at == 'a/' 3323 3324 @pass_alpharep 3325 def test_dir_parent(self, alpharep): 3326 root = zipfile.Path(alpharep) 3327 assert (root / 'b').parent.at == '' 3328 assert (root / 'b/').parent.at == '' 3329 3330 @pass_alpharep 3331 def test_missing_dir_parent(self, alpharep): 3332 root = zipfile.Path(alpharep) 3333 assert (root / 'missing dir/').parent.at == '' 3334 3335 @pass_alpharep 3336 def test_mutability(self, alpharep): 3337 """ 3338 If the underlying zipfile is changed, the Path object should 3339 reflect that change. 3340 """ 3341 root = zipfile.Path(alpharep) 3342 a, b, g = root.iterdir() 3343 alpharep.writestr('foo.txt', 'foo') 3344 alpharep.writestr('bar/baz.txt', 'baz') 3345 assert any(child.name == 'foo.txt' for child in root.iterdir()) 3346 assert (root / 'foo.txt').read_text(encoding="utf-8") == 'foo' 3347 (baz,) = (root / 'bar').iterdir() 3348 assert baz.read_text(encoding="utf-8") == 'baz' 3349 3350 HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13 3351 3352 def huge_zipfile(self): 3353 """Create a read-only zipfile with a huge number of entries entries.""" 3354 strm = io.BytesIO() 3355 zf = zipfile.ZipFile(strm, "w") 3356 for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)): 3357 zf.writestr(entry, entry) 3358 zf.mode = 'r' 3359 return zf 3360 3361 def test_joinpath_constant_time(self): 3362 """ 3363 Ensure joinpath on items in zipfile is linear time. 3364 """ 3365 root = zipfile.Path(self.huge_zipfile()) 3366 entries = jaraco.itertools.Counter(root.iterdir()) 3367 for entry in entries: 3368 entry.joinpath('suffix') 3369 # Check the file iterated all items 3370 assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES 3371 3372 # @func_timeout.func_set_timeout(3) 3373 def test_implied_dirs_performance(self): 3374 data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)] 3375 zipfile.CompleteDirs._implied_dirs(data) 3376 3377 @pass_alpharep 3378 def test_read_does_not_close(self, alpharep): 3379 alpharep = self.zipfile_ondisk(alpharep) 3380 with zipfile.ZipFile(alpharep) as file: 3381 for rep in range(2): 3382 zipfile.Path(file, 'a.txt').read_text(encoding="utf-8") 3383 3384 @pass_alpharep 3385 def test_subclass(self, alpharep): 3386 class Subclass(zipfile.Path): 3387 pass 3388 3389 root = Subclass(alpharep) 3390 assert isinstance(root / 'b', Subclass) 3391 3392 @pass_alpharep 3393 def test_filename(self, alpharep): 3394 root = zipfile.Path(alpharep) 3395 assert root.filename == pathlib.Path('alpharep.zip') 3396 3397 @pass_alpharep 3398 def test_root_name(self, alpharep): 3399 """ 3400 The name of the root should be the name of the zipfile 3401 """ 3402 root = zipfile.Path(alpharep) 3403 assert root.name == 'alpharep.zip' == root.filename.name 3404 3405 @pass_alpharep 3406 def test_suffix(self, alpharep): 3407 """ 3408 The suffix of the root should be the suffix of the zipfile. 3409 The suffix of each nested file is the final component's last suffix, if any. 3410 Includes the leading period, just like pathlib.Path. 3411 """ 3412 root = zipfile.Path(alpharep) 3413 assert root.suffix == '.zip' == root.filename.suffix 3414 3415 b = root / "b.txt" 3416 assert b.suffix == ".txt" 3417 3418 c = root / "c" / "filename.tar.gz" 3419 assert c.suffix == ".gz" 3420 3421 d = root / "d" 3422 assert d.suffix == "" 3423 3424 @pass_alpharep 3425 def test_suffixes(self, alpharep): 3426 """ 3427 The suffix of the root should be the suffix of the zipfile. 3428 The suffix of each nested file is the final component's last suffix, if any. 3429 Includes the leading period, just like pathlib.Path. 3430 """ 3431 root = zipfile.Path(alpharep) 3432 assert root.suffixes == ['.zip'] == root.filename.suffixes 3433 3434 b = root / 'b.txt' 3435 assert b.suffixes == ['.txt'] 3436 3437 c = root / 'c' / 'filename.tar.gz' 3438 assert c.suffixes == ['.tar', '.gz'] 3439 3440 d = root / 'd' 3441 assert d.suffixes == [] 3442 3443 e = root / '.hgrc' 3444 assert e.suffixes == [] 3445 3446 @pass_alpharep 3447 def test_stem(self, alpharep): 3448 """ 3449 The final path component, without its suffix 3450 """ 3451 root = zipfile.Path(alpharep) 3452 assert root.stem == 'alpharep' == root.filename.stem 3453 3454 b = root / "b.txt" 3455 assert b.stem == "b" 3456 3457 c = root / "c" / "filename.tar.gz" 3458 assert c.stem == "filename.tar" 3459 3460 d = root / "d" 3461 assert d.stem == "d" 3462 3463 @pass_alpharep 3464 def test_root_parent(self, alpharep): 3465 root = zipfile.Path(alpharep) 3466 assert root.parent == pathlib.Path('.') 3467 root.root.filename = 'foo/bar.zip' 3468 assert root.parent == pathlib.Path('foo') 3469 3470 @pass_alpharep 3471 def test_root_unnamed(self, alpharep): 3472 """ 3473 It is an error to attempt to get the name 3474 or parent of an unnamed zipfile. 3475 """ 3476 alpharep.filename = None 3477 root = zipfile.Path(alpharep) 3478 with self.assertRaises(TypeError): 3479 root.name 3480 with self.assertRaises(TypeError): 3481 root.parent 3482 3483 # .name and .parent should still work on subs 3484 sub = root / "b" 3485 assert sub.name == "b" 3486 assert sub.parent 3487 3488 @pass_alpharep 3489 def test_inheritance(self, alpharep): 3490 cls = type('PathChild', (zipfile.Path,), {}) 3491 for alpharep in self.zipfile_alpharep(): 3492 file = cls(alpharep).joinpath('some dir').parent 3493 assert isinstance(file, cls) 3494 3495 @pass_alpharep 3496 def test_extract_orig_with_implied_dirs(self, alpharep): 3497 """ 3498 A zip file wrapped in a Path should extract even with implied dirs. 3499 """ 3500 source_path = self.zipfile_ondisk(alpharep) 3501 zf = zipfile.ZipFile(source_path) 3502 # wrap the zipfile for its side effect 3503 zipfile.Path(zf) 3504 zf.extractall(source_path.parent) 3505 3506 3507class EncodedMetadataTests(unittest.TestCase): 3508 file_names = ['\u4e00', '\u4e8c', '\u4e09'] # Han 'one', 'two', 'three' 3509 file_content = [ 3510 "This is pure ASCII.\n".encode('ascii'), 3511 # This is modern Japanese. (UTF-8) 3512 "\u3053\u308c\u306f\u73fe\u4ee3\u7684\u65e5\u672c\u8a9e\u3067\u3059\u3002\n".encode('utf-8'), 3513 # This is obsolete Japanese. (Shift JIS) 3514 "\u3053\u308c\u306f\u53e4\u3044\u65e5\u672c\u8a9e\u3067\u3059\u3002\n".encode('shift_jis'), 3515 ] 3516 3517 def setUp(self): 3518 self.addCleanup(unlink, TESTFN) 3519 # Create .zip of 3 members with Han names encoded in Shift JIS. 3520 # Each name is 1 Han character encoding to 2 bytes in Shift JIS. 3521 # The ASCII names are arbitrary as long as they are length 2 and 3522 # not otherwise contained in the zip file. 3523 # Data elements are encoded bytes (ascii, utf-8, shift_jis). 3524 placeholders = ["n1", "n2"] + self.file_names[2:] 3525 with zipfile.ZipFile(TESTFN, mode="w") as tf: 3526 for temp, content in zip(placeholders, self.file_content): 3527 tf.writestr(temp, content, zipfile.ZIP_STORED) 3528 # Hack in the Shift JIS names with flag bit 11 (UTF-8) unset. 3529 with open(TESTFN, "rb") as tf: 3530 data = tf.read() 3531 for name, temp in zip(self.file_names, placeholders[:2]): 3532 data = data.replace(temp.encode('ascii'), 3533 name.encode('shift_jis')) 3534 with open(TESTFN, "wb") as tf: 3535 tf.write(data) 3536 3537 def _test_read(self, zipfp, expected_names, expected_content): 3538 # Check the namelist 3539 names = zipfp.namelist() 3540 self.assertEqual(sorted(names), sorted(expected_names)) 3541 3542 # Check infolist 3543 infos = zipfp.infolist() 3544 names = [zi.filename for zi in infos] 3545 self.assertEqual(sorted(names), sorted(expected_names)) 3546 3547 # check getinfo 3548 for name, content in zip(expected_names, expected_content): 3549 info = zipfp.getinfo(name) 3550 self.assertEqual(info.filename, name) 3551 self.assertEqual(info.file_size, len(content)) 3552 self.assertEqual(zipfp.read(name), content) 3553 3554 def test_read_with_metadata_encoding(self): 3555 # Read the ZIP archive with correct metadata_encoding 3556 with zipfile.ZipFile(TESTFN, "r", metadata_encoding='shift_jis') as zipfp: 3557 self._test_read(zipfp, self.file_names, self.file_content) 3558 3559 def test_read_without_metadata_encoding(self): 3560 # Read the ZIP archive without metadata_encoding 3561 expected_names = [name.encode('shift_jis').decode('cp437') 3562 for name in self.file_names[:2]] + self.file_names[2:] 3563 with zipfile.ZipFile(TESTFN, "r") as zipfp: 3564 self._test_read(zipfp, expected_names, self.file_content) 3565 3566 def test_read_with_incorrect_metadata_encoding(self): 3567 # Read the ZIP archive with incorrect metadata_encoding 3568 expected_names = [name.encode('shift_jis').decode('koi8-u') 3569 for name in self.file_names[:2]] + self.file_names[2:] 3570 with zipfile.ZipFile(TESTFN, "r", metadata_encoding='koi8-u') as zipfp: 3571 self._test_read(zipfp, expected_names, self.file_content) 3572 3573 def test_read_with_unsuitable_metadata_encoding(self): 3574 # Read the ZIP archive with metadata_encoding unsuitable for 3575 # decoding metadata 3576 with self.assertRaises(UnicodeDecodeError): 3577 zipfile.ZipFile(TESTFN, "r", metadata_encoding='ascii') 3578 with self.assertRaises(UnicodeDecodeError): 3579 zipfile.ZipFile(TESTFN, "r", metadata_encoding='utf-8') 3580 3581 def test_read_after_append(self): 3582 newname = '\u56db' # Han 'four' 3583 expected_names = [name.encode('shift_jis').decode('cp437') 3584 for name in self.file_names[:2]] + self.file_names[2:] 3585 expected_names.append(newname) 3586 expected_content = (*self.file_content, b"newcontent") 3587 3588 with zipfile.ZipFile(TESTFN, "a") as zipfp: 3589 zipfp.writestr(newname, "newcontent") 3590 self.assertEqual(sorted(zipfp.namelist()), sorted(expected_names)) 3591 3592 with zipfile.ZipFile(TESTFN, "r") as zipfp: 3593 self._test_read(zipfp, expected_names, expected_content) 3594 3595 with zipfile.ZipFile(TESTFN, "r", metadata_encoding='shift_jis') as zipfp: 3596 self.assertEqual(sorted(zipfp.namelist()), sorted(expected_names)) 3597 for i, (name, content) in enumerate(zip(expected_names, expected_content)): 3598 info = zipfp.getinfo(name) 3599 self.assertEqual(info.filename, name) 3600 self.assertEqual(info.file_size, len(content)) 3601 if i < 2: 3602 with self.assertRaises(zipfile.BadZipFile): 3603 zipfp.read(name) 3604 else: 3605 self.assertEqual(zipfp.read(name), content) 3606 3607 def test_write_with_metadata_encoding(self): 3608 ZF = zipfile.ZipFile 3609 for mode in ("w", "x", "a"): 3610 with self.assertRaisesRegex(ValueError, 3611 "^metadata_encoding is only"): 3612 ZF("nonesuch.zip", mode, metadata_encoding="shift_jis") 3613 3614 def test_cli_with_metadata_encoding(self): 3615 errmsg = "Non-conforming encodings not supported with -c." 3616 args = ["--metadata-encoding=shift_jis", "-c", "nonesuch", "nonesuch"] 3617 with captured_stdout() as stdout: 3618 with captured_stderr() as stderr: 3619 self.assertRaises(SystemExit, zipfile.main, args) 3620 self.assertEqual(stdout.getvalue(), "") 3621 self.assertIn(errmsg, stderr.getvalue()) 3622 3623 with captured_stdout() as stdout: 3624 zipfile.main(["--metadata-encoding=shift_jis", "-t", TESTFN]) 3625 listing = stdout.getvalue() 3626 3627 with captured_stdout() as stdout: 3628 zipfile.main(["--metadata-encoding=shift_jis", "-l", TESTFN]) 3629 listing = stdout.getvalue() 3630 for name in self.file_names: 3631 self.assertIn(name, listing) 3632 3633 def test_cli_with_metadata_encoding_extract(self): 3634 os.mkdir(TESTFN2) 3635 self.addCleanup(rmtree, TESTFN2) 3636 # Depending on locale, extracted file names can be not encodable 3637 # with the filesystem encoding. 3638 for fn in self.file_names: 3639 try: 3640 os.stat(os.path.join(TESTFN2, fn)) 3641 except OSError: 3642 pass 3643 except UnicodeEncodeError: 3644 self.skipTest(f'cannot encode file name {fn!r}') 3645 3646 zipfile.main(["--metadata-encoding=shift_jis", "-e", TESTFN, TESTFN2]) 3647 listing = os.listdir(TESTFN2) 3648 for name in self.file_names: 3649 self.assertIn(name, listing) 3650 3651 3652class StripExtraTests(unittest.TestCase): 3653 # Note: all of the "z" characters are technically invalid, but up 3654 # to 3 bytes at the end of the extra will be passed through as they 3655 # are too short to encode a valid extra. 3656 3657 ZIP64_EXTRA = 1 3658 3659 def test_no_data(self): 3660 s = struct.Struct("<HH") 3661 a = s.pack(self.ZIP64_EXTRA, 0) 3662 b = s.pack(2, 0) 3663 c = s.pack(3, 0) 3664 3665 self.assertEqual(b'', zipfile._strip_extra(a, (self.ZIP64_EXTRA,))) 3666 self.assertEqual(b, zipfile._strip_extra(b, (self.ZIP64_EXTRA,))) 3667 self.assertEqual( 3668 b+b"z", zipfile._strip_extra(b+b"z", (self.ZIP64_EXTRA,))) 3669 3670 self.assertEqual(b+c, zipfile._strip_extra(a+b+c, (self.ZIP64_EXTRA,))) 3671 self.assertEqual(b+c, zipfile._strip_extra(b+a+c, (self.ZIP64_EXTRA,))) 3672 self.assertEqual(b+c, zipfile._strip_extra(b+c+a, (self.ZIP64_EXTRA,))) 3673 3674 def test_with_data(self): 3675 s = struct.Struct("<HH") 3676 a = s.pack(self.ZIP64_EXTRA, 1) + b"a" 3677 b = s.pack(2, 2) + b"bb" 3678 c = s.pack(3, 3) + b"ccc" 3679 3680 self.assertEqual(b"", zipfile._strip_extra(a, (self.ZIP64_EXTRA,))) 3681 self.assertEqual(b, zipfile._strip_extra(b, (self.ZIP64_EXTRA,))) 3682 self.assertEqual( 3683 b+b"z", zipfile._strip_extra(b+b"z", (self.ZIP64_EXTRA,))) 3684 3685 self.assertEqual(b+c, zipfile._strip_extra(a+b+c, (self.ZIP64_EXTRA,))) 3686 self.assertEqual(b+c, zipfile._strip_extra(b+a+c, (self.ZIP64_EXTRA,))) 3687 self.assertEqual(b+c, zipfile._strip_extra(b+c+a, (self.ZIP64_EXTRA,))) 3688 3689 def test_multiples(self): 3690 s = struct.Struct("<HH") 3691 a = s.pack(self.ZIP64_EXTRA, 1) + b"a" 3692 b = s.pack(2, 2) + b"bb" 3693 3694 self.assertEqual(b"", zipfile._strip_extra(a+a, (self.ZIP64_EXTRA,))) 3695 self.assertEqual(b"", zipfile._strip_extra(a+a+a, (self.ZIP64_EXTRA,))) 3696 self.assertEqual( 3697 b"z", zipfile._strip_extra(a+a+b"z", (self.ZIP64_EXTRA,))) 3698 self.assertEqual( 3699 b+b"z", zipfile._strip_extra(a+a+b+b"z", (self.ZIP64_EXTRA,))) 3700 3701 self.assertEqual(b, zipfile._strip_extra(a+a+b, (self.ZIP64_EXTRA,))) 3702 self.assertEqual(b, zipfile._strip_extra(a+b+a, (self.ZIP64_EXTRA,))) 3703 self.assertEqual(b, zipfile._strip_extra(b+a+a, (self.ZIP64_EXTRA,))) 3704 3705 def test_too_short(self): 3706 self.assertEqual(b"", zipfile._strip_extra(b"", (self.ZIP64_EXTRA,))) 3707 self.assertEqual(b"z", zipfile._strip_extra(b"z", (self.ZIP64_EXTRA,))) 3708 self.assertEqual( 3709 b"zz", zipfile._strip_extra(b"zz", (self.ZIP64_EXTRA,))) 3710 self.assertEqual( 3711 b"zzz", zipfile._strip_extra(b"zzz", (self.ZIP64_EXTRA,))) 3712 3713 3714if __name__ == "__main__": 3715 unittest.main() 3716