1import array
2import contextlib
3import importlib.util
4import io
5import itertools
6import os
7import pathlib
8import posixpath
9import string
10import struct
11import subprocess
12import sys
13from test.support.script_helper import assert_python_ok
14import time
15import unittest
16import unittest.mock as mock
17import zipfile
18import functools
19
20
21from tempfile import TemporaryFile
22from random import randint, random, randbytes
23
24from test.support import script_helper
25from test.support import (
26    findfile, requires_zlib, requires_bz2, requires_lzma,
27    captured_stdout, captured_stderr, requires_subprocess
28)
29from test.support.os_helper import (
30    TESTFN, unlink, rmtree, temp_dir, temp_cwd, fd_count
31)
32
33
# Second scratch path derived from TESTFN; the tests below use it as the
# on-disk archive file.
TESTFN2 = TESTFN + "2"
# Scratch name derived from TESTFN with a "d" suffix.
# NOTE(review): presumably a directory path; not used in this part of the file.
TESTFNDIR = TESTFN + "d"
# Number of lines generated for the test data in the setUpClass() methods.
FIXEDTEST_SIZE = 1000
# NOTE(review): presumably the directory holding test data files; confirm at
# the use site.
DATAFILES_DIR = 'zipfile_datafiles'

# Pairs of two strings.
# NOTE(review): presumably (archive member name, member content); not used in
# this part of the file -- confirm at the use site.
SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
43
def get_files(test):
    """Yield the kinds of file argument the tests pass to ZipFile.

    Yields, in order: the TESTFN2 path, an open TemporaryFile and an
    io.BytesIO.  After the consumer is done with each file object, *test*
    asserts that the object has not been closed.
    """
    yield TESTFN2
    for factory in (TemporaryFile, io.BytesIO):
        with factory() as stream:
            yield stream
            test.assertFalse(stream.closed)
52
53class AbstractTestsWithSourceFile:
54    @classmethod
55    def setUpClass(cls):
56        cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" %
57                              (i, random()), "ascii")
58                        for i in range(FIXEDTEST_SIZE)]
59        cls.data = b''.join(cls.line_gen)
60
61    def setUp(self):
62        # Make a source file with some lines
63        with open(TESTFN, "wb") as fp:
64            fp.write(self.data)
65
66    def make_test_archive(self, f, compression, compresslevel=None):
67        kwargs = {'compression': compression, 'compresslevel': compresslevel}
68        # Create the ZIP archive
69        with zipfile.ZipFile(f, "w", **kwargs) as zipfp:
70            zipfp.write(TESTFN, "another.name")
71            zipfp.write(TESTFN, TESTFN)
72            zipfp.writestr("strfile", self.data)
73            with zipfp.open('written-open-w', mode='w') as f:
74                for line in self.line_gen:
75                    f.write(line)
76
77    def zip_test(self, f, compression, compresslevel=None):
78        self.make_test_archive(f, compression, compresslevel)
79
80        # Read the ZIP archive
81        with zipfile.ZipFile(f, "r", compression) as zipfp:
82            self.assertEqual(zipfp.read(TESTFN), self.data)
83            self.assertEqual(zipfp.read("another.name"), self.data)
84            self.assertEqual(zipfp.read("strfile"), self.data)
85
86            # Print the ZIP directory
87            fp = io.StringIO()
88            zipfp.printdir(file=fp)
89            directory = fp.getvalue()
90            lines = directory.splitlines()
91            self.assertEqual(len(lines), 5) # Number of files + header
92
93            self.assertIn('File Name', lines[0])
94            self.assertIn('Modified', lines[0])
95            self.assertIn('Size', lines[0])
96
97            fn, date, time_, size = lines[1].split()
98            self.assertEqual(fn, 'another.name')
99            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
100            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
101            self.assertEqual(size, str(len(self.data)))
102
103            # Check the namelist
104            names = zipfp.namelist()
105            self.assertEqual(len(names), 4)
106            self.assertIn(TESTFN, names)
107            self.assertIn("another.name", names)
108            self.assertIn("strfile", names)
109            self.assertIn("written-open-w", names)
110
111            # Check infolist
112            infos = zipfp.infolist()
113            names = [i.filename for i in infos]
114            self.assertEqual(len(names), 4)
115            self.assertIn(TESTFN, names)
116            self.assertIn("another.name", names)
117            self.assertIn("strfile", names)
118            self.assertIn("written-open-w", names)
119            for i in infos:
120                self.assertEqual(i.file_size, len(self.data))
121
122            # check getinfo
123            for nm in (TESTFN, "another.name", "strfile", "written-open-w"):
124                info = zipfp.getinfo(nm)
125                self.assertEqual(info.filename, nm)
126                self.assertEqual(info.file_size, len(self.data))
127
128            # Check that testzip doesn't raise an exception
129            zipfp.testzip()
130
131    def test_basic(self):
132        for f in get_files(self):
133            self.zip_test(f, self.compression)
134
135    def zip_open_test(self, f, compression):
136        self.make_test_archive(f, compression)
137
138        # Read the ZIP archive
139        with zipfile.ZipFile(f, "r", compression) as zipfp:
140            zipdata1 = []
141            with zipfp.open(TESTFN) as zipopen1:
142                while True:
143                    read_data = zipopen1.read(256)
144                    if not read_data:
145                        break
146                    zipdata1.append(read_data)
147
148            zipdata2 = []
149            with zipfp.open("another.name") as zipopen2:
150                while True:
151                    read_data = zipopen2.read(256)
152                    if not read_data:
153                        break
154                    zipdata2.append(read_data)
155
156            self.assertEqual(b''.join(zipdata1), self.data)
157            self.assertEqual(b''.join(zipdata2), self.data)
158
159    def test_open(self):
160        for f in get_files(self):
161            self.zip_open_test(f, self.compression)
162
163    def test_open_with_pathlike(self):
164        path = pathlib.Path(TESTFN2)
165        self.zip_open_test(path, self.compression)
166        with zipfile.ZipFile(path, "r", self.compression) as zipfp:
167            self.assertIsInstance(zipfp.filename, str)
168
169    def zip_random_open_test(self, f, compression):
170        self.make_test_archive(f, compression)
171
172        # Read the ZIP archive
173        with zipfile.ZipFile(f, "r", compression) as zipfp:
174            zipdata1 = []
175            with zipfp.open(TESTFN) as zipopen1:
176                while True:
177                    read_data = zipopen1.read(randint(1, 1024))
178                    if not read_data:
179                        break
180                    zipdata1.append(read_data)
181
182            self.assertEqual(b''.join(zipdata1), self.data)
183
184    def test_random_open(self):
185        for f in get_files(self):
186            self.zip_random_open_test(f, self.compression)
187
188    def zip_read1_test(self, f, compression):
189        self.make_test_archive(f, compression)
190
191        # Read the ZIP archive
192        with zipfile.ZipFile(f, "r") as zipfp, \
193             zipfp.open(TESTFN) as zipopen:
194            zipdata = []
195            while True:
196                read_data = zipopen.read1(-1)
197                if not read_data:
198                    break
199                zipdata.append(read_data)
200
201        self.assertEqual(b''.join(zipdata), self.data)
202
203    def test_read1(self):
204        for f in get_files(self):
205            self.zip_read1_test(f, self.compression)
206
207    def zip_read1_10_test(self, f, compression):
208        self.make_test_archive(f, compression)
209
210        # Read the ZIP archive
211        with zipfile.ZipFile(f, "r") as zipfp, \
212             zipfp.open(TESTFN) as zipopen:
213            zipdata = []
214            while True:
215                read_data = zipopen.read1(10)
216                self.assertLessEqual(len(read_data), 10)
217                if not read_data:
218                    break
219                zipdata.append(read_data)
220
221        self.assertEqual(b''.join(zipdata), self.data)
222
223    def test_read1_10(self):
224        for f in get_files(self):
225            self.zip_read1_10_test(f, self.compression)
226
227    def zip_readline_read_test(self, f, compression):
228        self.make_test_archive(f, compression)
229
230        # Read the ZIP archive
231        with zipfile.ZipFile(f, "r") as zipfp, \
232             zipfp.open(TESTFN) as zipopen:
233            data = b''
234            while True:
235                read = zipopen.readline()
236                if not read:
237                    break
238                data += read
239
240                read = zipopen.read(100)
241                if not read:
242                    break
243                data += read
244
245        self.assertEqual(data, self.data)
246
247    def test_readline_read(self):
248        # Issue #7610: calls to readline() interleaved with calls to read().
249        for f in get_files(self):
250            self.zip_readline_read_test(f, self.compression)
251
252    def zip_readline_test(self, f, compression):
253        self.make_test_archive(f, compression)
254
255        # Read the ZIP archive
256        with zipfile.ZipFile(f, "r") as zipfp:
257            with zipfp.open(TESTFN) as zipopen:
258                for line in self.line_gen:
259                    linedata = zipopen.readline()
260                    self.assertEqual(linedata, line)
261
262    def test_readline(self):
263        for f in get_files(self):
264            self.zip_readline_test(f, self.compression)
265
266    def zip_readlines_test(self, f, compression):
267        self.make_test_archive(f, compression)
268
269        # Read the ZIP archive
270        with zipfile.ZipFile(f, "r") as zipfp:
271            with zipfp.open(TESTFN) as zipopen:
272                ziplines = zipopen.readlines()
273            for line, zipline in zip(self.line_gen, ziplines):
274                self.assertEqual(zipline, line)
275
276    def test_readlines(self):
277        for f in get_files(self):
278            self.zip_readlines_test(f, self.compression)
279
280    def zip_iterlines_test(self, f, compression):
281        self.make_test_archive(f, compression)
282
283        # Read the ZIP archive
284        with zipfile.ZipFile(f, "r") as zipfp:
285            with zipfp.open(TESTFN) as zipopen:
286                for line, zipline in zip(self.line_gen, zipopen):
287                    self.assertEqual(zipline, line)
288
289    def test_iterlines(self):
290        for f in get_files(self):
291            self.zip_iterlines_test(f, self.compression)
292
293    def test_low_compression(self):
294        """Check for cases where compressed data is larger than original."""
295        # Create the ZIP archive
296        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp:
297            zipfp.writestr("strfile", '12')
298
299        # Get an open object for strfile
300        with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp:
301            with zipfp.open("strfile") as openobj:
302                self.assertEqual(openobj.read(1), b'1')
303                self.assertEqual(openobj.read(1), b'2')
304
305    def test_writestr_compression(self):
306        zipfp = zipfile.ZipFile(TESTFN2, "w")
307        zipfp.writestr("b.txt", "hello world", compress_type=self.compression)
308        info = zipfp.getinfo('b.txt')
309        self.assertEqual(info.compress_type, self.compression)
310
311    def test_writestr_compresslevel(self):
312        zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1)
313        zipfp.writestr("a.txt", "hello world", compress_type=self.compression)
314        zipfp.writestr("b.txt", "hello world", compress_type=self.compression,
315                       compresslevel=2)
316
317        # Compression level follows the constructor.
318        a_info = zipfp.getinfo('a.txt')
319        self.assertEqual(a_info.compress_type, self.compression)
320        self.assertEqual(a_info._compresslevel, 1)
321
322        # Compression level is overridden.
323        b_info = zipfp.getinfo('b.txt')
324        self.assertEqual(b_info.compress_type, self.compression)
325        self.assertEqual(b_info._compresslevel, 2)
326
327    def test_read_return_size(self):
328        # Issue #9837: ZipExtFile.read() shouldn't return more bytes
329        # than requested.
330        for test_size in (1, 4095, 4096, 4097, 16384):
331            file_size = test_size + 1
332            junk = randbytes(file_size)
333            with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf:
334                zipf.writestr('foo', junk)
335                with zipf.open('foo', 'r') as fp:
336                    buf = fp.read(test_size)
337                    self.assertEqual(len(buf), test_size)
338
339    def test_truncated_zipfile(self):
340        fp = io.BytesIO()
341        with zipfile.ZipFile(fp, mode='w') as zipf:
342            zipf.writestr('strfile', self.data, compress_type=self.compression)
343            end_offset = fp.tell()
344        zipfiledata = fp.getvalue()
345
346        fp = io.BytesIO(zipfiledata)
347        with zipfile.ZipFile(fp) as zipf:
348            with zipf.open('strfile') as zipopen:
349                fp.truncate(end_offset - 20)
350                with self.assertRaises(EOFError):
351                    zipopen.read()
352
353        fp = io.BytesIO(zipfiledata)
354        with zipfile.ZipFile(fp) as zipf:
355            with zipf.open('strfile') as zipopen:
356                fp.truncate(end_offset - 20)
357                with self.assertRaises(EOFError):
358                    while zipopen.read(100):
359                        pass
360
361        fp = io.BytesIO(zipfiledata)
362        with zipfile.ZipFile(fp) as zipf:
363            with zipf.open('strfile') as zipopen:
364                fp.truncate(end_offset - 20)
365                with self.assertRaises(EOFError):
366                    while zipopen.read1(100):
367                        pass
368
369    def test_repr(self):
370        fname = 'file.name'
371        for f in get_files(self):
372            with zipfile.ZipFile(f, 'w', self.compression) as zipfp:
373                zipfp.write(TESTFN, fname)
374                r = repr(zipfp)
375                self.assertIn("mode='w'", r)
376
377            with zipfile.ZipFile(f, 'r') as zipfp:
378                r = repr(zipfp)
379                if isinstance(f, str):
380                    self.assertIn('filename=%r' % f, r)
381                else:
382                    self.assertIn('file=%r' % f, r)
383                self.assertIn("mode='r'", r)
384                r = repr(zipfp.getinfo(fname))
385                self.assertIn('filename=%r' % fname, r)
386                self.assertIn('filemode=', r)
387                self.assertIn('file_size=', r)
388                if self.compression != zipfile.ZIP_STORED:
389                    self.assertIn('compress_type=', r)
390                    self.assertIn('compress_size=', r)
391                with zipfp.open(fname) as zipopen:
392                    r = repr(zipopen)
393                    self.assertIn('name=%r' % fname, r)
394                    self.assertIn("mode='r'", r)
395                    if self.compression != zipfile.ZIP_STORED:
396                        self.assertIn('compress_type=', r)
397                self.assertIn('[closed]', repr(zipopen))
398            self.assertIn('[closed]', repr(zipfp))
399
400    def test_compresslevel_basic(self):
401        for f in get_files(self):
402            self.zip_test(f, self.compression, compresslevel=9)
403
404    def test_per_file_compresslevel(self):
405        """Check that files within a Zip archive can have different
406        compression levels."""
407        with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp:
408            zipfp.write(TESTFN, 'compress_1')
409            zipfp.write(TESTFN, 'compress_9', compresslevel=9)
410            one_info = zipfp.getinfo('compress_1')
411            nine_info = zipfp.getinfo('compress_9')
412            self.assertEqual(one_info._compresslevel, 1)
413            self.assertEqual(nine_info._compresslevel, 9)
414
    def test_writing_errors(self):
        # Check that an OSError raised by the underlying file while a
        # second member is being written leaves an archive that still
        # holds the first member intact.
        class BrokenFile(io.BytesIO):
            # BytesIO whose write() raises OSError once 'stop' writes have
            # been counted.  Counting is only active while the enclosing
            # 'count' variable is not None.
            def write(self, data):
                nonlocal count
                if count is not None:
                    if count == stop:
                        raise OSError
                    count += 1
                super().write(data)

        # Move the failure point one counted write further on each pass,
        # until 'file2' can be written without reaching it.
        stop = 0
        while True:
            testfile = BrokenFile()
            # Counting is disabled while 'file1' is written.
            count = None
            with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp:
                with zipfp.open('file1', 'w') as f:
                    f.write(b'data1')
                # Start counting the writes issued for 'file2'.
                count = 0
                try:
                    with zipfp.open('file2', 'w') as f:
                        f.write(b'data2')
                except OSError:
                    stop += 1
                else:
                    break
                finally:
                    # Disable counting again so that writes issued after
                    # this point are not made to fail.
                    count = None
            # This pass failed: only 'file1' may be present and readable.
            with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
                self.assertEqual(zipfp.namelist(), ['file1'])
                self.assertEqual(zipfp.read('file1'), b'data1')

        # The final pass succeeded: both members must be present.
        with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
            self.assertEqual(zipfp.namelist(), ['file1', 'file2'])
            self.assertEqual(zipfp.read('file1'), b'data1')
            self.assertEqual(zipfp.read('file2'), b'data2')
450
451
452    def tearDown(self):
453        unlink(TESTFN)
454        unlink(TESTFN2)
455
456
457class StoredTestsWithSourceFile(AbstractTestsWithSourceFile,
458                                unittest.TestCase):
459    compression = zipfile.ZIP_STORED
460    test_low_compression = None
461
462    def zip_test_writestr_permissions(self, f, compression):
463        # Make sure that writestr and open(... mode='w') create files with
464        # mode 0600, when they are passed a name rather than a ZipInfo
465        # instance.
466
467        self.make_test_archive(f, compression)
468        with zipfile.ZipFile(f, "r") as zipfp:
469            zinfo = zipfp.getinfo('strfile')
470            self.assertEqual(zinfo.external_attr, 0o600 << 16)
471
472            zinfo2 = zipfp.getinfo('written-open-w')
473            self.assertEqual(zinfo2.external_attr, 0o600 << 16)
474
475    def test_writestr_permissions(self):
476        for f in get_files(self):
477            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
478
479    def test_absolute_arcnames(self):
480        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
481            zipfp.write(TESTFN, "/absolute")
482
483        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
484            self.assertEqual(zipfp.namelist(), ["absolute"])
485
486    def test_append_to_zip_file(self):
487        """Test appending to an existing zipfile."""
488        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
489            zipfp.write(TESTFN, TESTFN)
490
491        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
492            zipfp.writestr("strfile", self.data)
493            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
494
495    def test_append_to_non_zip_file(self):
496        """Test appending to an existing file that is not a zipfile."""
497        # NOTE: this test fails if len(d) < 22 because of the first
498        # line "fpin.seek(-22, 2)" in _EndRecData
499        data = b'I am not a ZipFile!'*10
500        with open(TESTFN2, 'wb') as f:
501            f.write(data)
502
503        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
504            zipfp.write(TESTFN, TESTFN)
505
506        with open(TESTFN2, 'rb') as f:
507            f.seek(len(data))
508            with zipfile.ZipFile(f, "r") as zipfp:
509                self.assertEqual(zipfp.namelist(), [TESTFN])
510                self.assertEqual(zipfp.read(TESTFN), self.data)
511        with open(TESTFN2, 'rb') as f:
512            self.assertEqual(f.read(len(data)), data)
513            zipfiledata = f.read()
514        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
515            self.assertEqual(zipfp.namelist(), [TESTFN])
516            self.assertEqual(zipfp.read(TESTFN), self.data)
517
518    def test_read_concatenated_zip_file(self):
519        with io.BytesIO() as bio:
520            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
521                zipfp.write(TESTFN, TESTFN)
522            zipfiledata = bio.getvalue()
523        data = b'I am not a ZipFile!'*10
524        with open(TESTFN2, 'wb') as f:
525            f.write(data)
526            f.write(zipfiledata)
527
528        with zipfile.ZipFile(TESTFN2) as zipfp:
529            self.assertEqual(zipfp.namelist(), [TESTFN])
530            self.assertEqual(zipfp.read(TESTFN), self.data)
531
532    def test_append_to_concatenated_zip_file(self):
533        with io.BytesIO() as bio:
534            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
535                zipfp.write(TESTFN, TESTFN)
536            zipfiledata = bio.getvalue()
537        data = b'I am not a ZipFile!'*1000000
538        with open(TESTFN2, 'wb') as f:
539            f.write(data)
540            f.write(zipfiledata)
541
542        with zipfile.ZipFile(TESTFN2, 'a') as zipfp:
543            self.assertEqual(zipfp.namelist(), [TESTFN])
544            zipfp.writestr('strfile', self.data)
545
546        with open(TESTFN2, 'rb') as f:
547            self.assertEqual(f.read(len(data)), data)
548            zipfiledata = f.read()
549        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
550            self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile'])
551            self.assertEqual(zipfp.read(TESTFN), self.data)
552            self.assertEqual(zipfp.read('strfile'), self.data)
553
554    def test_ignores_newline_at_end(self):
555        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
556            zipfp.write(TESTFN, TESTFN)
557        with open(TESTFN2, 'a', encoding='utf-8') as f:
558            f.write("\r\n\00\00\00")
559        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
560            self.assertIsInstance(zipfp, zipfile.ZipFile)
561
562    def test_ignores_stuff_appended_past_comments(self):
563        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
564            zipfp.comment = b"this is a comment"
565            zipfp.write(TESTFN, TESTFN)
566        with open(TESTFN2, 'a', encoding='utf-8') as f:
567            f.write("abcdef\r\n")
568        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
569            self.assertIsInstance(zipfp, zipfile.ZipFile)
570            self.assertEqual(zipfp.comment, b"this is a comment")
571
572    def test_write_default_name(self):
573        """Check that calling ZipFile.write without arcname specified
574        produces the expected result."""
575        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
576            zipfp.write(TESTFN)
577            with open(TESTFN, "rb") as f:
578                self.assertEqual(zipfp.read(TESTFN), f.read())
579
580    def test_io_on_closed_zipextfile(self):
581        fname = "somefile.txt"
582        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
583            zipfp.writestr(fname, "bogus")
584
585        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
586            with zipfp.open(fname) as fid:
587                fid.close()
588                self.assertRaises(ValueError, fid.read)
589                self.assertRaises(ValueError, fid.seek, 0)
590                self.assertRaises(ValueError, fid.tell)
591                self.assertRaises(ValueError, fid.readable)
592                self.assertRaises(ValueError, fid.seekable)
593
594    def test_write_to_readonly(self):
595        """Check that trying to call write() on a readonly ZipFile object
596        raises a ValueError."""
597        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
598            zipfp.writestr("somefile.txt", "bogus")
599
600        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
601            self.assertRaises(ValueError, zipfp.write, TESTFN)
602
603        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
604            with self.assertRaises(ValueError):
605                zipfp.open(TESTFN, mode='w')
606
607    def test_add_file_before_1980(self):
608        # Set atime and mtime to 1970-01-01
609        os.utime(TESTFN, (0, 0))
610        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
611            self.assertRaises(ValueError, zipfp.write, TESTFN)
612
613        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
614            zipfp.write(TESTFN)
615            zinfo = zipfp.getinfo(TESTFN)
616            self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0))
617
618    def test_add_file_after_2107(self):
619        # Set atime and mtime to 2108-12-30
620        ts = 4386268800
621        try:
622            time.localtime(ts)
623        except OverflowError:
624            self.skipTest(f'time.localtime({ts}) raises OverflowError')
625        try:
626            os.utime(TESTFN, (ts, ts))
627        except OverflowError:
628            self.skipTest('Host fs cannot set timestamp to required value.')
629
630        mtime_ns = os.stat(TESTFN).st_mtime_ns
631        if mtime_ns != (4386268800 * 10**9):
632            # XFS filesystem is limited to 32-bit timestamp, but the syscall
633            # didn't fail. Moreover, there is a VFS bug which returns
634            # a cached timestamp which is different than the value on disk.
635            #
636            # Test st_mtime_ns rather than st_mtime to avoid rounding issues.
637            #
638            # https://bugzilla.redhat.com/show_bug.cgi?id=1795576
639            # https://bugs.python.org/issue39460#msg360952
640            self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}")
641
642        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
643            self.assertRaises(struct.error, zipfp.write, TESTFN)
644
645        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
646            zipfp.write(TESTFN)
647            zinfo = zipfp.getinfo(TESTFN)
648            self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59))
649
650
@requires_zlib()
class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile,
                                 unittest.TestCase):
    compression = zipfile.ZIP_DEFLATED

    def test_per_file_compression(self):
        """Check that files within a Zip archive can have different
        compression options."""
        expected = (('storeme', zipfile.ZIP_STORED),
                    ('deflateme', zipfile.ZIP_DEFLATED))
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            # Write every member with its own compression method ...
            for arcname, method in expected:
                zipfp.write(TESTFN, arcname, method)
            # ... and check that each method was recorded.
            for arcname, method in expected:
                info = zipfp.getinfo(arcname)
                self.assertEqual(info.compress_type, method)
666
@requires_bz2()
class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile,
                               unittest.TestCase):
    # Runs the tests inherited from AbstractTestsWithSourceFile with
    # zipfile.ZIP_BZIP2 as the compression method.
    compression = zipfile.ZIP_BZIP2
671
@requires_lzma()
class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile,
                              unittest.TestCase):
    # Runs the tests inherited from AbstractTestsWithSourceFile with
    # zipfile.ZIP_LZMA as the compression method.
    compression = zipfile.ZIP_LZMA
676
677
678class AbstractTestZip64InSmallFiles:
679    # These tests test the ZIP64 functionality without using large files,
680    # see test_zipfile64 for proper tests.
681
682    @classmethod
683    def setUpClass(cls):
684        line_gen = (bytes("Test of zipfile line %d." % i, "ascii")
685                    for i in range(0, FIXEDTEST_SIZE))
686        cls.data = b'\n'.join(line_gen)
687
688    def setUp(self):
689        self._limit = zipfile.ZIP64_LIMIT
690        self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT
691        zipfile.ZIP64_LIMIT = 1000
692        zipfile.ZIP_FILECOUNT_LIMIT = 9
693
694        # Make a source file with some lines
695        with open(TESTFN, "wb") as fp:
696            fp.write(self.data)
697
698    def zip_test(self, f, compression):
699        # Create the ZIP archive
700        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
701            zipfp.write(TESTFN, "another.name")
702            zipfp.write(TESTFN, TESTFN)
703            zipfp.writestr("strfile", self.data)
704
705        # Read the ZIP archive
706        with zipfile.ZipFile(f, "r", compression) as zipfp:
707            self.assertEqual(zipfp.read(TESTFN), self.data)
708            self.assertEqual(zipfp.read("another.name"), self.data)
709            self.assertEqual(zipfp.read("strfile"), self.data)
710
711            # Print the ZIP directory
712            fp = io.StringIO()
713            zipfp.printdir(fp)
714
715            directory = fp.getvalue()
716            lines = directory.splitlines()
717            self.assertEqual(len(lines), 4) # Number of files + header
718
719            self.assertIn('File Name', lines[0])
720            self.assertIn('Modified', lines[0])
721            self.assertIn('Size', lines[0])
722
723            fn, date, time_, size = lines[1].split()
724            self.assertEqual(fn, 'another.name')
725            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
726            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
727            self.assertEqual(size, str(len(self.data)))
728
729            # Check the namelist
730            names = zipfp.namelist()
731            self.assertEqual(len(names), 3)
732            self.assertIn(TESTFN, names)
733            self.assertIn("another.name", names)
734            self.assertIn("strfile", names)
735
736            # Check infolist
737            infos = zipfp.infolist()
738            names = [i.filename for i in infos]
739            self.assertEqual(len(names), 3)
740            self.assertIn(TESTFN, names)
741            self.assertIn("another.name", names)
742            self.assertIn("strfile", names)
743            for i in infos:
744                self.assertEqual(i.file_size, len(self.data))
745
746            # check getinfo
747            for nm in (TESTFN, "another.name", "strfile"):
748                info = zipfp.getinfo(nm)
749                self.assertEqual(info.filename, nm)
750                self.assertEqual(info.file_size, len(self.data))
751
752            # Check that testzip doesn't raise an exception
753            zipfp.testzip()
754
755    def test_basic(self):
756        for f in get_files(self):
757            self.zip_test(f, self.compression)
758
759    def test_too_many_files(self):
760        # This test checks that more than 64k files can be added to an archive,
761        # and that the resulting archive can be read properly by ZipFile
762        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
763                               allowZip64=True)
764        zipf.debug = 100
765        numfiles = 15
766        for i in range(numfiles):
767            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
768        self.assertEqual(len(zipf.namelist()), numfiles)
769        zipf.close()
770
771        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
772        self.assertEqual(len(zipf2.namelist()), numfiles)
773        for i in range(numfiles):
774            content = zipf2.read("foo%08d" % i).decode('ascii')
775            self.assertEqual(content, "%d" % (i**3 % 57))
776        zipf2.close()
777
778    def test_too_many_files_append(self):
779        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
780                               allowZip64=False)
781        zipf.debug = 100
782        numfiles = 9
783        for i in range(numfiles):
784            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
785        self.assertEqual(len(zipf.namelist()), numfiles)
786        with self.assertRaises(zipfile.LargeZipFile):
787            zipf.writestr("foo%08d" % numfiles, b'')
788        self.assertEqual(len(zipf.namelist()), numfiles)
789        zipf.close()
790
791        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
792                               allowZip64=False)
793        zipf.debug = 100
794        self.assertEqual(len(zipf.namelist()), numfiles)
795        with self.assertRaises(zipfile.LargeZipFile):
796            zipf.writestr("foo%08d" % numfiles, b'')
797        self.assertEqual(len(zipf.namelist()), numfiles)
798        zipf.close()
799
800        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
801                               allowZip64=True)
802        zipf.debug = 100
803        self.assertEqual(len(zipf.namelist()), numfiles)
804        numfiles2 = 15
805        for i in range(numfiles, numfiles2):
806            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
807        self.assertEqual(len(zipf.namelist()), numfiles2)
808        zipf.close()
809
810        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
811        self.assertEqual(len(zipf2.namelist()), numfiles2)
812        for i in range(numfiles2):
813            content = zipf2.read("foo%08d" % i).decode('ascii')
814            self.assertEqual(content, "%d" % (i**3 % 57))
815        zipf2.close()
816
817    def tearDown(self):
818        zipfile.ZIP64_LIMIT = self._limit
819        zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit
820        unlink(TESTFN)
821        unlink(TESTFN2)
822
823
824class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
825                                  unittest.TestCase):
826    compression = zipfile.ZIP_STORED
827
828    def large_file_exception_test(self, f, compression):
829        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
830            self.assertRaises(zipfile.LargeZipFile,
831                              zipfp.write, TESTFN, "another.name")
832
833    def large_file_exception_test2(self, f, compression):
834        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
835            self.assertRaises(zipfile.LargeZipFile,
836                              zipfp.writestr, "another.name", self.data)
837
838    def test_large_file_exception(self):
839        for f in get_files(self):
840            self.large_file_exception_test(f, zipfile.ZIP_STORED)
841            self.large_file_exception_test2(f, zipfile.ZIP_STORED)
842
843    def test_absolute_arcnames(self):
844        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
845                             allowZip64=True) as zipfp:
846            zipfp.write(TESTFN, "/absolute")
847
848        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
849            self.assertEqual(zipfp.namelist(), ["absolute"])
850
851    def test_append(self):
852        # Test that appending to the Zip64 archive doesn't change
853        # extra fields of existing entries.
854        with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp:
855            zipfp.writestr("strfile", self.data)
856        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
857            zinfo = zipfp.getinfo("strfile")
858            extra = zinfo.extra
859        with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp:
860            zipfp.writestr("strfile2", self.data)
861        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
862            zinfo = zipfp.getinfo("strfile")
863            self.assertEqual(zinfo.extra, extra)
864
865    def make_zip64_file(
866        self, file_size_64_set=False, file_size_extra=False,
867        compress_size_64_set=False, compress_size_extra=False,
868        header_offset_64_set=False, header_offset_extra=False,
869    ):
870        """Generate bytes sequence for a zip with (incomplete) zip64 data.
871
872        The actual values (not the zip 64 0xffffffff values) stored in the file
873        are:
874        file_size: 8
875        compress_size: 8
876        header_offset: 0
877        """
878        actual_size = 8
879        actual_header_offset = 0
880        local_zip64_fields = []
881        central_zip64_fields = []
882
883        file_size = actual_size
884        if file_size_64_set:
885            file_size = 0xffffffff
886            if file_size_extra:
887                local_zip64_fields.append(actual_size)
888                central_zip64_fields.append(actual_size)
889        file_size = struct.pack("<L", file_size)
890
891        compress_size = actual_size
892        if compress_size_64_set:
893            compress_size = 0xffffffff
894            if compress_size_extra:
895                local_zip64_fields.append(actual_size)
896                central_zip64_fields.append(actual_size)
897        compress_size = struct.pack("<L", compress_size)
898
899        header_offset = actual_header_offset
900        if header_offset_64_set:
901            header_offset = 0xffffffff
902            if header_offset_extra:
903                central_zip64_fields.append(actual_header_offset)
904        header_offset = struct.pack("<L", header_offset)
905
906        local_extra = struct.pack(
907            '<HH' + 'Q'*len(local_zip64_fields),
908            0x0001,
909            8*len(local_zip64_fields),
910            *local_zip64_fields
911        )
912
913        central_extra = struct.pack(
914            '<HH' + 'Q'*len(central_zip64_fields),
915            0x0001,
916            8*len(central_zip64_fields),
917            *central_zip64_fields
918        )
919
920        central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields))
921        offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields))
922
923        local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields))
924        central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields))
925
926        filename = b"test.txt"
927        content = b"test1234"
928        filename_length = struct.pack("<H", len(filename))
929        zip64_contents = (
930            # Local file header
931            b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
932            + compress_size
933            + file_size
934            + filename_length
935            + local_extra_length
936            + filename
937            + local_extra
938            + content
939            # Central directory:
940            + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
941            + compress_size
942            + file_size
943            + filename_length
944            + central_extra_length
945            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01"
946            + header_offset
947            + filename
948            + central_extra
949            # Zip64 end of central directory
950            + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-"
951            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00"
952            + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00"
953            + central_dir_size
954            + offset_to_central_dir
955            # Zip64 end of central directory locator
956            + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01"
957            + b"\x00\x00\x00"
958            # end of central directory
959            + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00"
960            + b"\x00\x00\x00\x00"
961        )
962        return zip64_contents
963
964    def test_bad_zip64_extra(self):
965        """Missing zip64 extra records raises an exception.
966
967        There are 4 fields that the zip64 format handles (the disk number is
968        not used in this module and so is ignored here). According to the zip
969        spec:
970              The order of the fields in the zip64 extended
971              information record is fixed, but the fields MUST
972              only appear if the corresponding Local or Central
973              directory record field is set to 0xFFFF or 0xFFFFFFFF.
974
975        If the zip64 extra content doesn't contain enough entries for the
976        number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error.
977        This test mismatches the length of the zip64 extra field and the number
978        of fields set to indicate the presence of zip64 data.
979        """
980        # zip64 file size present, no fields in extra, expecting one, equals
981        # missing file size.
982        missing_file_size_extra = self.make_zip64_file(
983            file_size_64_set=True,
984        )
985        with self.assertRaises(zipfile.BadZipFile) as e:
986            zipfile.ZipFile(io.BytesIO(missing_file_size_extra))
987        self.assertIn('file size', str(e.exception).lower())
988
989        # zip64 file size present, zip64 compress size present, one field in
990        # extra, expecting two, equals missing compress size.
991        missing_compress_size_extra = self.make_zip64_file(
992            file_size_64_set=True,
993            file_size_extra=True,
994            compress_size_64_set=True,
995        )
996        with self.assertRaises(zipfile.BadZipFile) as e:
997            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
998        self.assertIn('compress size', str(e.exception).lower())
999
1000        # zip64 compress size present, no fields in extra, expecting one,
1001        # equals missing compress size.
1002        missing_compress_size_extra = self.make_zip64_file(
1003            compress_size_64_set=True,
1004        )
1005        with self.assertRaises(zipfile.BadZipFile) as e:
1006            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
1007        self.assertIn('compress size', str(e.exception).lower())
1008
1009        # zip64 file size present, zip64 compress size present, zip64 header
1010        # offset present, two fields in extra, expecting three, equals missing
1011        # header offset
1012        missing_header_offset_extra = self.make_zip64_file(
1013            file_size_64_set=True,
1014            file_size_extra=True,
1015            compress_size_64_set=True,
1016            compress_size_extra=True,
1017            header_offset_64_set=True,
1018        )
1019        with self.assertRaises(zipfile.BadZipFile) as e:
1020            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1021        self.assertIn('header offset', str(e.exception).lower())
1022
1023        # zip64 compress size present, zip64 header offset present, one field
1024        # in extra, expecting two, equals missing header offset
1025        missing_header_offset_extra = self.make_zip64_file(
1026            file_size_64_set=False,
1027            compress_size_64_set=True,
1028            compress_size_extra=True,
1029            header_offset_64_set=True,
1030        )
1031        with self.assertRaises(zipfile.BadZipFile) as e:
1032            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1033        self.assertIn('header offset', str(e.exception).lower())
1034
1035        # zip64 file size present, zip64 header offset present, one field in
1036        # extra, expecting two, equals missing header offset
1037        missing_header_offset_extra = self.make_zip64_file(
1038            file_size_64_set=True,
1039            file_size_extra=True,
1040            compress_size_64_set=False,
1041            header_offset_64_set=True,
1042        )
1043        with self.assertRaises(zipfile.BadZipFile) as e:
1044            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1045        self.assertIn('header offset', str(e.exception).lower())
1046
1047        # zip64 header offset present, no fields in extra, expecting one,
1048        # equals missing header offset
1049        missing_header_offset_extra = self.make_zip64_file(
1050            file_size_64_set=False,
1051            compress_size_64_set=False,
1052            header_offset_64_set=True,
1053        )
1054        with self.assertRaises(zipfile.BadZipFile) as e:
1055            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1056        self.assertIn('header offset', str(e.exception).lower())
1057
1058    def test_generated_valid_zip64_extra(self):
1059        # These values are what is set in the make_zip64_file method.
1060        expected_file_size = 8
1061        expected_compress_size = 8
1062        expected_header_offset = 0
1063        expected_content = b"test1234"
1064
1065        # Loop through the various valid combinations of zip64 masks
1066        # present and extra fields present.
1067        params = (
1068            {"file_size_64_set": True, "file_size_extra": True},
1069            {"compress_size_64_set": True, "compress_size_extra": True},
1070            {"header_offset_64_set": True, "header_offset_extra": True},
1071        )
1072
1073        for r in range(1, len(params) + 1):
1074            for combo in itertools.combinations(params, r):
1075                kwargs = {}
1076                for c in combo:
1077                    kwargs.update(c)
1078                with zipfile.ZipFile(io.BytesIO(self.make_zip64_file(**kwargs))) as zf:
1079                    zinfo = zf.infolist()[0]
1080                    self.assertEqual(zinfo.file_size, expected_file_size)
1081                    self.assertEqual(zinfo.compress_size, expected_compress_size)
1082                    self.assertEqual(zinfo.header_offset, expected_header_offset)
1083                    self.assertEqual(zf.read(zinfo), expected_content)
1084
1085    def test_force_zip64(self):
1086        """Test that forcing zip64 extensions correctly notes this in the zip file"""
1087
1088        # GH-103861 describes an issue where forcing a small file to use zip64
1089        # extensions would add a zip64 extra record, but not change the data
1090        # sizes to 0xFFFFFFFF to indicate to the extractor that the zip64
1091        # record should be read. Additionally, it would not set the required
1092        # version to indicate that zip64 extensions are required to extract it.
1093        # This test replicates the situation and reads the raw data to specifically ensure:
1094        #  - The required extract version is always >= ZIP64_VERSION
1095        #  - The compressed and uncompressed size in the file headers are both
1096        #     0xFFFFFFFF (ie. point to zip64 record)
1097        #  - The zip64 record is provided and has the correct sizes in it
1098        # Other aspects of the zip are checked as well, but verifying the above is the main goal.
1099        # Because this is hard to verify by parsing the data as a zip, the raw
1100        # bytes are checked to ensure that they line up with the zip spec.
1101        # The spec for this can be found at: https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT
1102        # The relevent sections for this test are:
1103        #  - 4.3.7 for local file header
1104        #  - 4.5.3 for zip64 extra field
1105
1106        data = io.BytesIO()
1107        with zipfile.ZipFile(data, mode="w", allowZip64=True) as zf:
1108            with zf.open("text.txt", mode="w", force_zip64=True) as zi:
1109                zi.write(b"_")
1110
1111        zipdata = data.getvalue()
1112
1113        # pull out and check zip information
1114        (
1115            header, vers, os, flags, comp, csize, usize, fn_len,
1116            ex_total_len, filename, ex_id, ex_len, ex_usize, ex_csize, cd_sig
1117        ) = struct.unpack("<4sBBHH8xIIHH8shhQQx4s", zipdata[:63])
1118
1119        self.assertEqual(header, b"PK\x03\x04")  # local file header
1120        self.assertGreaterEqual(vers, zipfile.ZIP64_VERSION)  # requires zip64 to extract
1121        self.assertEqual(os, 0)  # compatible with MS-DOS
1122        self.assertEqual(flags, 0)  # no flags
1123        self.assertEqual(comp, 0)  # compression method = stored
1124        self.assertEqual(csize, 0xFFFFFFFF)  # sizes are in zip64 extra
1125        self.assertEqual(usize, 0xFFFFFFFF)
1126        self.assertEqual(fn_len, 8)  # filename len
1127        self.assertEqual(ex_total_len, 20)  # size of extra records
1128        self.assertEqual(ex_id, 1)  # Zip64 extra record
1129        self.assertEqual(ex_len, 16)  # 16 bytes of data
1130        self.assertEqual(ex_usize, 1)  # uncompressed size
1131        self.assertEqual(ex_csize, 1)  # compressed size
1132        self.assertEqual(cd_sig, b"PK\x01\x02") # ensure the central directory header is next
1133
1134        z = zipfile.ZipFile(io.BytesIO(zipdata))
1135        zinfos = z.infolist()
1136        self.assertEqual(len(zinfos), 1)
1137        self.assertGreaterEqual(zinfos[0].extract_version, zipfile.ZIP64_VERSION)  # requires zip64 to extract
1138
1139    def test_unseekable_zip_unknown_filesize(self):
1140        """Test that creating a zip with/without seeking will raise a RuntimeError if zip64 was required but not used"""
1141
1142        def make_zip(fp):
1143            with zipfile.ZipFile(fp, mode="w", allowZip64=True) as zf:
1144                with zf.open("text.txt", mode="w", force_zip64=False) as zi:
1145                    zi.write(b"_" * (zipfile.ZIP64_LIMIT + 1))
1146
1147        self.assertRaises(RuntimeError, make_zip, io.BytesIO())
1148        self.assertRaises(RuntimeError, make_zip, Unseekable(io.BytesIO()))
1149
1150    def test_zip64_required_not_allowed_fail(self):
1151        """Test that trying to add a large file to a zip that doesn't allow zip64 extensions fails on add"""
1152        def make_zip(fp):
1153            with zipfile.ZipFile(fp, mode="w", allowZip64=False) as zf:
1154                # pretend zipfile.ZipInfo.from_file was used to get the name and filesize
1155                info = zipfile.ZipInfo("text.txt")
1156                info.file_size = zipfile.ZIP64_LIMIT + 1
1157                zf.open(info, mode="w")
1158
1159        self.assertRaises(zipfile.LargeZipFile, make_zip, io.BytesIO())
1160        self.assertRaises(zipfile.LargeZipFile, make_zip, Unseekable(io.BytesIO()))
1161
    def test_unseekable_zip_known_filesize(self):
        """Test that creating a zip without seeking will use zip64 extensions if the file size is provided up-front"""

        # This test ensures that the zip will use a zip64 data descriptor (same
        # as a regular data descriptor except the sizes are 8 bytes instead of
        # 4) record to communicate the size of a file if the zip is being
        # written to an unseekable stream.
        # Because this sort of thing is hard to verify by parsing the data back
        # in as a zip, this test looks at the raw bytes created to ensure that
        # the correct data has been generated.
        # The spec for this can be found at: https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT
        # The relevant sections for this test are:
        #  - 4.3.7 for local file header
        #  - 4.3.9 for the data descriptor
        #  - 4.5.3 for zip64 extra field

        # NOTE(review): file_size bytes are written in one go below, so this
        # presumably relies on zipfile.ZIP64_LIMIT having been lowered before
        # the test runs (tearDown restores it) -- confirm against setUp.
        file_size = zipfile.ZIP64_LIMIT + 1

        def make_zip(fp):
            # Write a single member whose size is announced up-front through
            # ZipInfo.file_size, then hand the target back to the caller.
            with zipfile.ZipFile(fp, mode="w", allowZip64=True) as zf:
                # pretend zipfile.ZipInfo.from_file was used to get the name and filesize
                info = zipfile.ZipInfo("text.txt")
                info.file_size = file_size
                with zf.open(info, mode="w", force_zip64=False) as zi:
                    zi.write(b"_" * file_size)
            return fp

        # check seekable file information
        # Layout unpacked below: 30-byte fixed local header (the "8x" skips
        # eight bytes of it), 8-byte file name, 20-byte zip64 extra record,
        # file_size data bytes (skipped), then the 4-byte signature of the
        # next record: 62 + file_size bytes in total.
        seekable_data = make_zip(io.BytesIO()).getvalue()
        (
            header, vers, os, flags, comp, csize, usize, fn_len,
            ex_total_len, filename, ex_id, ex_len, ex_usize, ex_csize,
            cd_sig
        ) = struct.unpack("<4sBBHH8xIIHH8shhQQ{}x4s".format(file_size), seekable_data[:62 + file_size])

        self.assertEqual(header, b"PK\x03\x04")  # local file header
        self.assertGreaterEqual(vers, zipfile.ZIP64_VERSION)  # requires zip64 to extract
        self.assertEqual(os, 0)  # compatible with MS-DOS
        self.assertEqual(flags, 0)  # no flags set
        self.assertEqual(comp, 0)  # compression method = stored
        self.assertEqual(csize, 0xFFFFFFFF)  # sizes are in zip64 extra
        self.assertEqual(usize, 0xFFFFFFFF)
        self.assertEqual(fn_len, 8)  # filename len
        self.assertEqual(ex_total_len, 20)  # size of extra records
        self.assertEqual(ex_id, 1)  # Zip64 extra record
        self.assertEqual(ex_len, 16)  # 16 bytes of data
        self.assertEqual(ex_usize, file_size)  # uncompressed size
        self.assertEqual(ex_csize, file_size)  # compressed size
        self.assertEqual(cd_sig, b"PK\x01\x02") # ensure the central directory header is next

        # check unseekable file information
        # Same layout as above, but the data is followed by a 24-byte data
        # descriptor (4-byte signature, 4 skipped bytes, two 8-byte sizes)
        # before the next record's signature: 86 + file_size bytes in total.
        # NOTE(review): ".fp" is presumably the BytesIO wrapped by Unseekable
        # -- confirm against the Unseekable helper.
        unseekable_data = make_zip(Unseekable(io.BytesIO())).fp.getvalue()
        (
            header, vers, os, flags, comp, csize, usize, fn_len,
            ex_total_len, filename, ex_id, ex_len, ex_usize, ex_csize,
            dd_header, dd_usize, dd_csize, cd_sig
        ) = struct.unpack("<4sBBHH8xIIHH8shhQQ{}x4s4xQQ4s".format(file_size), unseekable_data[:86 + file_size])

        self.assertEqual(header, b"PK\x03\x04")  # local file header
        self.assertGreaterEqual(vers, zipfile.ZIP64_VERSION)  # requires zip64 to extract
        self.assertEqual(os, 0)  # compatible with MS-DOS
        self.assertEqual("{:b}".format(flags), "1000")  # streaming flag set
        self.assertEqual(comp, 0)  # compression method = stored
        self.assertEqual(csize, 0xFFFFFFFF)  # sizes are in zip64 extra
        self.assertEqual(usize, 0xFFFFFFFF)
        self.assertEqual(fn_len, 8)  # filename len
        self.assertEqual(ex_total_len, 20)  # size of extra records
        self.assertEqual(ex_id, 1)  # Zip64 extra record
        self.assertEqual(ex_len, 16)  # 16 bytes of data
        self.assertEqual(ex_usize, 0)  # uncompressed size - 0 to defer to data descriptor
        self.assertEqual(ex_csize, 0)  # compressed size - 0 to defer to data descriptor
        self.assertEqual(dd_header, b"PK\07\x08")  # data descriptor
        self.assertEqual(dd_usize, file_size)  # file size (8 bytes because zip64)
        self.assertEqual(dd_csize, file_size)  # compressed size (8 bytes because zip64)
        self.assertEqual(cd_sig, b"PK\x01\x02") # ensure the central directory header is next
1237
1238
@requires_zlib()
class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                   unittest.TestCase):
    # Runs the tests inherited from AbstractTestZip64InSmallFiles with
    # DEFLATE compression; the class is decorated with requires_zlib().
    compression = zipfile.ZIP_DEFLATED
1243
@requires_bz2()
class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                 unittest.TestCase):
    # Runs the tests inherited from AbstractTestZip64InSmallFiles with
    # BZIP2 compression; the class is decorated with requires_bz2().
    compression = zipfile.ZIP_BZIP2
1248
@requires_lzma()
class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                unittest.TestCase):
    # Runs the tests inherited from AbstractTestZip64InSmallFiles with
    # LZMA compression; the class is decorated with requires_lzma().
    compression = zipfile.ZIP_LZMA
1253
1254
1255class AbstractWriterTests:
1256
1257    def tearDown(self):
1258        unlink(TESTFN2)
1259
1260    def test_close_after_close(self):
1261        data = b'content'
1262        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf:
1263            w = zipf.open('test', 'w')
1264            w.write(data)
1265            w.close()
1266            self.assertTrue(w.closed)
1267            w.close()
1268            self.assertTrue(w.closed)
1269            self.assertEqual(zipf.read('test'), data)
1270
1271    def test_write_after_close(self):
1272        data = b'content'
1273        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf:
1274            w = zipf.open('test', 'w')
1275            w.write(data)
1276            w.close()
1277            self.assertTrue(w.closed)
1278            self.assertRaises(ValueError, w.write, b'')
1279            self.assertEqual(zipf.read('test'), data)
1280
1281    def test_issue44439(self):
1282        q = array.array('Q', [1, 2, 3, 4, 5])
1283        LENGTH = len(q) * q.itemsize
1284        with zipfile.ZipFile(io.BytesIO(), 'w', self.compression) as zip:
1285            with zip.open('data', 'w') as data:
1286                self.assertEqual(data.write(q), LENGTH)
1287            self.assertEqual(zip.getinfo('data').file_size, LENGTH)
1288
class StoredWriterTests(AbstractWriterTests, unittest.TestCase):
    # Runs the AbstractWriterTests tests with uncompressed (stored) members.
    compression = zipfile.ZIP_STORED
1291
@requires_zlib()
class DeflateWriterTests(AbstractWriterTests, unittest.TestCase):
    # Runs the AbstractWriterTests tests with DEFLATE compression; the class
    # is decorated with requires_zlib().
    compression = zipfile.ZIP_DEFLATED
1295
@requires_bz2()
class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase):
    # Runs the AbstractWriterTests tests with BZIP2 compression; the class
    # is decorated with requires_bz2().
    compression = zipfile.ZIP_BZIP2
1299
@requires_lzma()
class LzmaWriterTests(AbstractWriterTests, unittest.TestCase):
    # Runs the AbstractWriterTests tests with LZMA compression; the class
    # is decorated with requires_lzma().
    compression = zipfile.ZIP_LZMA
1303
1304
1305class PyZipFileTests(unittest.TestCase):
1306    def assertCompiledIn(self, name, namelist):
1307        if name + 'o' not in namelist:
1308            self.assertIn(name + 'c', namelist)
1309
1310    def requiresWriteAccess(self, path):
1311        # effective_ids unavailable on windows
1312        if not os.access(path, os.W_OK,
1313                         effective_ids=os.access in os.supports_effective_ids):
1314            self.skipTest('requires write access to the installed location')
1315        filename = os.path.join(path, 'test_zipfile.try')
1316        try:
1317            fd = os.open(filename, os.O_WRONLY | os.O_CREAT)
1318            os.close(fd)
1319        except Exception:
1320            self.skipTest('requires write access to the installed location')
1321        unlink(filename)
1322
1323    def test_write_pyfile(self):
1324        self.requiresWriteAccess(os.path.dirname(__file__))
1325        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1326            fn = __file__
1327            if fn.endswith('.pyc'):
1328                path_split = fn.split(os.sep)
1329                if os.altsep is not None:
1330                    path_split.extend(fn.split(os.altsep))
1331                if '__pycache__' in path_split:
1332                    fn = importlib.util.source_from_cache(fn)
1333                else:
1334                    fn = fn[:-1]
1335
1336            zipfp.writepy(fn)
1337
1338            bn = os.path.basename(fn)
1339            self.assertNotIn(bn, zipfp.namelist())
1340            self.assertCompiledIn(bn, zipfp.namelist())
1341
1342        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1343            fn = __file__
1344            if fn.endswith('.pyc'):
1345                fn = fn[:-1]
1346
1347            zipfp.writepy(fn, "testpackage")
1348
1349            bn = "%s/%s" % ("testpackage", os.path.basename(fn))
1350            self.assertNotIn(bn, zipfp.namelist())
1351            self.assertCompiledIn(bn, zipfp.namelist())
1352
1353    def test_write_python_package(self):
1354        import email
1355        packagedir = os.path.dirname(email.__file__)
1356        self.requiresWriteAccess(packagedir)
1357
1358        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1359            zipfp.writepy(packagedir)
1360
1361            # Check for a couple of modules at different levels of the
1362            # hierarchy
1363            names = zipfp.namelist()
1364            self.assertCompiledIn('email/__init__.py', names)
1365            self.assertCompiledIn('email/mime/text.py', names)
1366
1367    def test_write_filtered_python_package(self):
1368        import test
1369        packagedir = os.path.dirname(test.__file__)
1370        self.requiresWriteAccess(packagedir)
1371
1372        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1373
1374            # first make sure that the test folder gives error messages
1375            # (on the badsyntax_... files)
1376            with captured_stdout() as reportSIO:
1377                zipfp.writepy(packagedir)
1378            reportStr = reportSIO.getvalue()
1379            self.assertTrue('SyntaxError' in reportStr)
1380
1381            # then check that the filter works on the whole package
1382            with captured_stdout() as reportSIO:
1383                zipfp.writepy(packagedir, filterfunc=lambda whatever: False)
1384            reportStr = reportSIO.getvalue()
1385            self.assertTrue('SyntaxError' not in reportStr)
1386
1387            # then check that the filter works on individual files
1388            def filter(path):
1389                return not os.path.basename(path).startswith("bad")
1390            with captured_stdout() as reportSIO, self.assertWarns(UserWarning):
1391                zipfp.writepy(packagedir, filterfunc=filter)
1392            reportStr = reportSIO.getvalue()
1393            if reportStr:
1394                print(reportStr)
1395            self.assertTrue('SyntaxError' not in reportStr)
1396
1397    def test_write_with_optimization(self):
1398        import email
1399        packagedir = os.path.dirname(email.__file__)
1400        self.requiresWriteAccess(packagedir)
1401        optlevel = 1 if __debug__ else 0
1402        ext = '.pyc'
1403
1404        with TemporaryFile() as t, \
1405             zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp:
1406            zipfp.writepy(packagedir)
1407
1408            names = zipfp.namelist()
1409            self.assertIn('email/__init__' + ext, names)
1410            self.assertIn('email/mime/text' + ext, names)
1411
1412    def test_write_python_directory(self):
1413        os.mkdir(TESTFN2)
1414        try:
1415            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1416                fp.write("print(42)\n")
1417
1418            with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp:
1419                fp.write("print(42 * 42)\n")
1420
1421            with open(os.path.join(TESTFN2, "mod2.txt"), "w", encoding='utf-8') as fp:
1422                fp.write("bla bla bla\n")
1423
1424            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1425                zipfp.writepy(TESTFN2)
1426
1427                names = zipfp.namelist()
1428                self.assertCompiledIn('mod1.py', names)
1429                self.assertCompiledIn('mod2.py', names)
1430                self.assertNotIn('mod2.txt', names)
1431
1432        finally:
1433            rmtree(TESTFN2)
1434
1435    def test_write_python_directory_filtered(self):
1436        os.mkdir(TESTFN2)
1437        try:
1438            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1439                fp.write("print(42)\n")
1440
1441            with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp:
1442                fp.write("print(42 * 42)\n")
1443
1444            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1445                zipfp.writepy(TESTFN2, filterfunc=lambda fn:
1446                                                  not fn.endswith('mod2.py'))
1447
1448                names = zipfp.namelist()
1449                self.assertCompiledIn('mod1.py', names)
1450                self.assertNotIn('mod2.py', names)
1451
1452        finally:
1453            rmtree(TESTFN2)
1454
1455    def test_write_non_pyfile(self):
1456        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1457            with open(TESTFN, 'w', encoding='utf-8') as f:
1458                f.write('most definitely not a python file')
1459            self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
1460            unlink(TESTFN)
1461
1462    def test_write_pyfile_bad_syntax(self):
1463        os.mkdir(TESTFN2)
1464        try:
1465            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1466                fp.write("Bad syntax in python file\n")
1467
1468            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1469                # syntax errors are printed to stdout
1470                with captured_stdout() as s:
1471                    zipfp.writepy(os.path.join(TESTFN2, "mod1.py"))
1472
1473                self.assertIn("SyntaxError", s.getvalue())
1474
1475                # as it will not have compiled the python file, it will
1476                # include the .py file not .pyc
1477                names = zipfp.namelist()
1478                self.assertIn('mod1.py', names)
1479                self.assertNotIn('mod1.pyc', names)
1480
1481        finally:
1482            rmtree(TESTFN2)
1483
1484    def test_write_pathlike(self):
1485        os.mkdir(TESTFN2)
1486        try:
1487            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1488                fp.write("print(42)\n")
1489
1490            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1491                zipfp.writepy(pathlib.Path(TESTFN2) / "mod1.py")
1492                names = zipfp.namelist()
1493                self.assertCompiledIn('mod1.py', names)
1494        finally:
1495            rmtree(TESTFN2)
1496
1497
1498class ExtractTests(unittest.TestCase):
1499
1500    def make_test_file(self):
1501        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1502            for fpath, fdata in SMALL_TEST_DATA:
1503                zipfp.writestr(fpath, fdata)
1504
1505    def test_extract(self):
1506        with temp_cwd():
1507            self.make_test_file()
1508            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1509                for fpath, fdata in SMALL_TEST_DATA:
1510                    writtenfile = zipfp.extract(fpath)
1511
1512                    # make sure it was written to the right place
1513                    correctfile = os.path.join(os.getcwd(), fpath)
1514                    correctfile = os.path.normpath(correctfile)
1515
1516                    self.assertEqual(writtenfile, correctfile)
1517
1518                    # make sure correct data is in correct file
1519                    with open(writtenfile, "rb") as f:
1520                        self.assertEqual(fdata.encode(), f.read())
1521
1522                    unlink(writtenfile)
1523
1524    def _test_extract_with_target(self, target):
1525        self.make_test_file()
1526        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1527            for fpath, fdata in SMALL_TEST_DATA:
1528                writtenfile = zipfp.extract(fpath, target)
1529
1530                # make sure it was written to the right place
1531                correctfile = os.path.join(target, fpath)
1532                correctfile = os.path.normpath(correctfile)
1533                self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target))
1534
1535                # make sure correct data is in correct file
1536                with open(writtenfile, "rb") as f:
1537                    self.assertEqual(fdata.encode(), f.read())
1538
1539                unlink(writtenfile)
1540
1541        unlink(TESTFN2)
1542
1543    def test_extract_with_target(self):
1544        with temp_dir() as extdir:
1545            self._test_extract_with_target(extdir)
1546
1547    def test_extract_with_target_pathlike(self):
1548        with temp_dir() as extdir:
1549            self._test_extract_with_target(pathlib.Path(extdir))
1550
1551    def test_extract_all(self):
1552        with temp_cwd():
1553            self.make_test_file()
1554            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1555                zipfp.extractall()
1556                for fpath, fdata in SMALL_TEST_DATA:
1557                    outfile = os.path.join(os.getcwd(), fpath)
1558
1559                    with open(outfile, "rb") as f:
1560                        self.assertEqual(fdata.encode(), f.read())
1561
1562                    unlink(outfile)
1563
1564    def _test_extract_all_with_target(self, target):
1565        self.make_test_file()
1566        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1567            zipfp.extractall(target)
1568            for fpath, fdata in SMALL_TEST_DATA:
1569                outfile = os.path.join(target, fpath)
1570
1571                with open(outfile, "rb") as f:
1572                    self.assertEqual(fdata.encode(), f.read())
1573
1574                unlink(outfile)
1575
1576        unlink(TESTFN2)
1577
1578    def test_extract_all_with_target(self):
1579        with temp_dir() as extdir:
1580            self._test_extract_all_with_target(extdir)
1581
1582    def test_extract_all_with_target_pathlike(self):
1583        with temp_dir() as extdir:
1584            self._test_extract_all_with_target(pathlib.Path(extdir))
1585
1586    def check_file(self, filename, content):
1587        self.assertTrue(os.path.isfile(filename))
1588        with open(filename, 'rb') as f:
1589            self.assertEqual(f.read(), content)
1590
1591    def test_sanitize_windows_name(self):
1592        san = zipfile.ZipFile._sanitize_windows_name
1593        # Passing pathsep in allows this test to work regardless of platform.
1594        self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z')
1595        self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i')
1596        self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r')
1597
1598    def test_extract_hackers_arcnames_common_cases(self):
1599        common_hacknames = [
1600            ('../foo/bar', 'foo/bar'),
1601            ('foo/../bar', 'foo/bar'),
1602            ('foo/../../bar', 'foo/bar'),
1603            ('foo/bar/..', 'foo/bar'),
1604            ('./../foo/bar', 'foo/bar'),
1605            ('/foo/bar', 'foo/bar'),
1606            ('/foo/../bar', 'foo/bar'),
1607            ('/foo/../../bar', 'foo/bar'),
1608        ]
1609        self._test_extract_hackers_arcnames(common_hacknames)
1610
1611    @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.')
1612    def test_extract_hackers_arcnames_windows_only(self):
1613        """Test combination of path fixing and windows name sanitization."""
1614        windows_hacknames = [
1615            (r'..\foo\bar', 'foo/bar'),
1616            (r'..\/foo\/bar', 'foo/bar'),
1617            (r'foo/\..\/bar', 'foo/bar'),
1618            (r'foo\/../\bar', 'foo/bar'),
1619            (r'C:foo/bar', 'foo/bar'),
1620            (r'C:/foo/bar', 'foo/bar'),
1621            (r'C://foo/bar', 'foo/bar'),
1622            (r'C:\foo\bar', 'foo/bar'),
1623            (r'//conky/mountpoint/foo/bar', 'foo/bar'),
1624            (r'\\conky\mountpoint\foo\bar', 'foo/bar'),
1625            (r'///conky/mountpoint/foo/bar', 'mountpoint/foo/bar'),
1626            (r'\\\conky\mountpoint\foo\bar', 'mountpoint/foo/bar'),
1627            (r'//conky//mountpoint/foo/bar', 'mountpoint/foo/bar'),
1628            (r'\\conky\\mountpoint\foo\bar', 'mountpoint/foo/bar'),
1629            (r'//?/C:/foo/bar', 'foo/bar'),
1630            (r'\\?\C:\foo\bar', 'foo/bar'),
1631            (r'C:/../C:/foo/bar', 'C_/foo/bar'),
1632            (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'),
1633            ('../../foo../../ba..r', 'foo/ba..r'),
1634        ]
1635        self._test_extract_hackers_arcnames(windows_hacknames)
1636
1637    @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.')
1638    def test_extract_hackers_arcnames_posix_only(self):
1639        posix_hacknames = [
1640            ('//foo/bar', 'foo/bar'),
1641            ('../../foo../../ba..r', 'foo../ba..r'),
1642            (r'foo/..\bar', r'foo/..\bar'),
1643        ]
1644        self._test_extract_hackers_arcnames(posix_hacknames)
1645
1646    def _test_extract_hackers_arcnames(self, hacknames):
1647        for arcname, fixedname in hacknames:
1648            content = b'foobar' + arcname.encode()
1649            with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
1650                zinfo = zipfile.ZipInfo()
1651                # preserve backslashes
1652                zinfo.filename = arcname
1653                zinfo.external_attr = 0o600 << 16
1654                zipfp.writestr(zinfo, content)
1655
1656            arcname = arcname.replace(os.sep, "/")
1657            targetpath = os.path.join('target', 'subdir', 'subsub')
1658            correctfile = os.path.join(targetpath, *fixedname.split('/'))
1659
1660            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1661                writtenfile = zipfp.extract(arcname, targetpath)
1662                self.assertEqual(writtenfile, correctfile,
1663                                 msg='extract %r: %r != %r' %
1664                                 (arcname, writtenfile, correctfile))
1665            self.check_file(correctfile, content)
1666            rmtree('target')
1667
1668            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1669                zipfp.extractall(targetpath)
1670            self.check_file(correctfile, content)
1671            rmtree('target')
1672
1673            correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))
1674
1675            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1676                writtenfile = zipfp.extract(arcname)
1677                self.assertEqual(writtenfile, correctfile,
1678                                 msg="extract %r" % arcname)
1679            self.check_file(correctfile, content)
1680            rmtree(fixedname.split('/')[0])
1681
1682            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1683                zipfp.extractall()
1684            self.check_file(correctfile, content)
1685            rmtree(fixedname.split('/')[0])
1686
1687            unlink(TESTFN2)
1688
1689
1690class OtherTests(unittest.TestCase):
1691    def test_open_via_zip_info(self):
1692        # Create the ZIP archive
1693        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1694            zipfp.writestr("name", "foo")
1695            with self.assertWarns(UserWarning):
1696                zipfp.writestr("name", "bar")
1697            self.assertEqual(zipfp.namelist(), ["name"] * 2)
1698
1699        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1700            infos = zipfp.infolist()
1701            data = b""
1702            for info in infos:
1703                with zipfp.open(info) as zipopen:
1704                    data += zipopen.read()
1705            self.assertIn(data, {b"foobar", b"barfoo"})
1706            data = b""
1707            for info in infos:
1708                data += zipfp.read(info)
1709            self.assertIn(data, {b"foobar", b"barfoo"})
1710
1711    def test_writestr_extended_local_header_issue1202(self):
1712        with zipfile.ZipFile(TESTFN2, 'w') as orig_zip:
1713            for data in 'abcdefghijklmnop':
1714                zinfo = zipfile.ZipInfo(data)
1715                zinfo.flag_bits |= zipfile._MASK_USE_DATA_DESCRIPTOR  # Include an extended local header.
1716                orig_zip.writestr(zinfo, data)
1717
1718    def test_close(self):
1719        """Check that the zipfile is closed after the 'with' block."""
1720        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1721            for fpath, fdata in SMALL_TEST_DATA:
1722                zipfp.writestr(fpath, fdata)
1723                self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1724        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1725
1726        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1727            self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1728        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1729
1730    def test_close_on_exception(self):
1731        """Check that the zipfile is closed if an exception is raised in the
1732        'with' block."""
1733        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1734            for fpath, fdata in SMALL_TEST_DATA:
1735                zipfp.writestr(fpath, fdata)
1736
1737        try:
1738            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
1739                raise zipfile.BadZipFile()
1740        except zipfile.BadZipFile:
1741            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
1742
1743    def test_unsupported_version(self):
1744        # File has an extract_version of 120
1745        data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00'
1746                b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00'
1747                b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00'
1748                b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06'
1749                b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00')
1750
1751        self.assertRaises(NotImplementedError, zipfile.ZipFile,
1752                          io.BytesIO(data), 'r')
1753
1754    @requires_zlib()
1755    def test_read_unicode_filenames(self):
1756        # bug #10801
1757        fname = findfile('zip_cp437_header.zip')
1758        with zipfile.ZipFile(fname) as zipfp:
1759            for name in zipfp.namelist():
1760                zipfp.open(name).close()
1761
1762    def test_write_unicode_filenames(self):
1763        with zipfile.ZipFile(TESTFN, "w") as zf:
1764            zf.writestr("foo.txt", "Test for unicode filename")
1765            zf.writestr("\xf6.txt", "Test for unicode filename")
1766            self.assertIsInstance(zf.infolist()[0].filename, str)
1767
1768        with zipfile.ZipFile(TESTFN, "r") as zf:
1769            self.assertEqual(zf.filelist[0].filename, "foo.txt")
1770            self.assertEqual(zf.filelist[1].filename, "\xf6.txt")
1771
1772    def test_read_after_write_unicode_filenames(self):
1773        with zipfile.ZipFile(TESTFN2, 'w') as zipfp:
1774            zipfp.writestr('приклад', b'sample')
1775            self.assertEqual(zipfp.read('приклад'), b'sample')
1776
1777    def test_exclusive_create_zip_file(self):
1778        """Test exclusive creating a new zipfile."""
1779        unlink(TESTFN2)
1780        filename = 'testfile.txt'
1781        content = b'hello, world. this is some content.'
1782        with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp:
1783            zipfp.writestr(filename, content)
1784        with self.assertRaises(FileExistsError):
1785            zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED)
1786        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1787            self.assertEqual(zipfp.namelist(), [filename])
1788            self.assertEqual(zipfp.read(filename), content)
1789
1790    def test_create_non_existent_file_for_append(self):
1791        if os.path.exists(TESTFN):
1792            os.unlink(TESTFN)
1793
1794        filename = 'testfile.txt'
1795        content = b'hello, world. this is some content.'
1796
1797        try:
1798            with zipfile.ZipFile(TESTFN, 'a') as zf:
1799                zf.writestr(filename, content)
1800        except OSError:
1801            self.fail('Could not append data to a non-existent zip file.')
1802
1803        self.assertTrue(os.path.exists(TESTFN))
1804
1805        with zipfile.ZipFile(TESTFN, 'r') as zf:
1806            self.assertEqual(zf.read(filename), content)
1807
    def test_close_erroneous_file(self):
        # This test checks that the ZipFile constructor closes the file object
        # it opens if there's an error in the file.  If it doesn't, the
        # traceback holds a reference to the ZipFile object and, indirectly,
        # the file object.
        # On Windows, this causes the os.unlink() call to fail because the
        # underlying file is still open.  This is SF bug #412214.
        #
        # Nothing is asserted in this method: it only creates a file that is
        # not a zip archive and attempts to open it.
        # NOTE(review): the os.unlink() call mentioned above is not part of
        # this method; presumably it is the test case's own cleanup -- confirm.
        with open(TESTFN, "w", encoding="utf-8") as fp:
            fp.write("this is not a legal zip file\n")
        try:
            # The result is bound to a name but not used afterwards.
            zf = zipfile.ZipFile(TESTFN)
        except zipfile.BadZipFile:
            # BadZipFile is tolerated here; it is not treated as a failure.
            pass
1822
1823    def test_is_zip_erroneous_file(self):
1824        """Check that is_zipfile() correctly identifies non-zip files."""
1825        # - passing a filename
1826        with open(TESTFN, "w", encoding='utf-8') as fp:
1827            fp.write("this is not a legal zip file\n")
1828        self.assertFalse(zipfile.is_zipfile(TESTFN))
1829        # - passing a path-like object
1830        self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN)))
1831        # - passing a file object
1832        with open(TESTFN, "rb") as fp:
1833            self.assertFalse(zipfile.is_zipfile(fp))
1834        # - passing a file-like object
1835        fp = io.BytesIO()
1836        fp.write(b"this is not a legal zip file\n")
1837        self.assertFalse(zipfile.is_zipfile(fp))
1838        fp.seek(0, 0)
1839        self.assertFalse(zipfile.is_zipfile(fp))
1840
1841    def test_damaged_zipfile(self):
1842        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
1843        # - Create a valid zip file
1844        fp = io.BytesIO()
1845        with zipfile.ZipFile(fp, mode="w") as zipf:
1846            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1847        zipfiledata = fp.getvalue()
1848
1849        # - Now create copies of it missing the last N bytes and make sure
1850        #   a BadZipFile exception is raised when we try to open it
1851        for N in range(len(zipfiledata)):
1852            fp = io.BytesIO(zipfiledata[:N])
1853            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
1854
1855    def test_is_zip_valid_file(self):
1856        """Check that is_zipfile() correctly identifies zip files."""
1857        # - passing a filename
1858        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1859            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1860
1861        self.assertTrue(zipfile.is_zipfile(TESTFN))
1862        # - passing a file object
1863        with open(TESTFN, "rb") as fp:
1864            self.assertTrue(zipfile.is_zipfile(fp))
1865            fp.seek(0, 0)
1866            zip_contents = fp.read()
1867        # - passing a file-like object
1868        fp = io.BytesIO()
1869        fp.write(zip_contents)
1870        self.assertTrue(zipfile.is_zipfile(fp))
1871        fp.seek(0, 0)
1872        self.assertTrue(zipfile.is_zipfile(fp))
1873
1874    def test_non_existent_file_raises_OSError(self):
1875        # make sure we don't raise an AttributeError when a partially-constructed
1876        # ZipFile instance is finalized; this tests for regression on SF tracker
1877        # bug #403871.
1878
1879        # The bug we're testing for caused an AttributeError to be raised
1880        # when a ZipFile instance was created for a file that did not
1881        # exist; the .fp member was not initialized but was needed by the
1882        # __del__() method.  Since the AttributeError is in the __del__(),
1883        # it is ignored, but the user should be sufficiently annoyed by
1884        # the message on the output that regression will be noticed
1885        # quickly.
1886        self.assertRaises(OSError, zipfile.ZipFile, TESTFN)
1887
1888    def test_empty_file_raises_BadZipFile(self):
1889        f = open(TESTFN, 'w', encoding='utf-8')
1890        f.close()
1891        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1892
1893        with open(TESTFN, 'w', encoding='utf-8') as fp:
1894            fp.write("short file")
1895        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1896
1897    def test_negative_central_directory_offset_raises_BadZipFile(self):
1898        # Zip file containing an empty EOCD record
1899        buffer = bytearray(b'PK\x05\x06' + b'\0'*18)
1900
1901        # Set the size of the central directory bytes to become 1,
1902        # causing the central directory offset to become negative
1903        for dirsize in 1, 2**32-1:
1904            buffer[12:16] = struct.pack('<L', dirsize)
1905            f = io.BytesIO(buffer)
1906            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, f)
1907
1908    def test_closed_zip_raises_ValueError(self):
1909        """Verify that testzip() doesn't swallow inappropriate exceptions."""
1910        data = io.BytesIO()
1911        with zipfile.ZipFile(data, mode="w") as zipf:
1912            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1913
1914        # This is correct; calling .read on a closed ZipFile should raise
1915        # a ValueError, and so should calling .testzip.  An earlier
1916        # version of .testzip would swallow this exception (and any other)
1917        # and report that the first file in the archive was corrupt.
1918        self.assertRaises(ValueError, zipf.read, "foo.txt")
1919        self.assertRaises(ValueError, zipf.open, "foo.txt")
1920        self.assertRaises(ValueError, zipf.testzip)
1921        self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus")
1922        with open(TESTFN, 'w', encoding='utf-8') as f:
1923            f.write('zipfile test data')
1924        self.assertRaises(ValueError, zipf.write, TESTFN)
1925
1926    def test_bad_constructor_mode(self):
1927        """Check that bad modes passed to ZipFile constructor are caught."""
1928        self.assertRaises(ValueError, zipfile.ZipFile, TESTFN, "q")
1929
1930    def test_bad_open_mode(self):
1931        """Check that bad modes passed to ZipFile.open are caught."""
1932        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1933            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1934
1935        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
1936            # read the data to make sure the file is there
1937            zipf.read("foo.txt")
1938            self.assertRaises(ValueError, zipf.open, "foo.txt", "q")
1939            # universal newlines support is removed
1940            self.assertRaises(ValueError, zipf.open, "foo.txt", "U")
1941            self.assertRaises(ValueError, zipf.open, "foo.txt", "rU")
1942
1943    def test_read0(self):
1944        """Check that calling read(0) on a ZipExtFile object returns an empty
1945        string and doesn't advance file pointer."""
1946        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1947            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1948            # read the data to make sure the file is there
1949            with zipf.open("foo.txt") as f:
1950                for i in range(FIXEDTEST_SIZE):
1951                    self.assertEqual(f.read(0), b'')
1952
1953                self.assertEqual(f.read(), b"O, for a Muse of Fire!")
1954
1955    def test_open_non_existent_item(self):
1956        """Check that attempting to call open() for an item that doesn't
1957        exist in the archive raises a RuntimeError."""
1958        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1959            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
1960
1961    def test_bad_compression_mode(self):
1962        """Check that bad compression methods passed to ZipFile.open are
1963        caught."""
1964        self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1)
1965
1966    def test_unsupported_compression(self):
1967        # data is declared as shrunk, but actually deflated
1968        data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00'
1969                b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01'
1970                b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00'
1971                b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
1972                b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00'
1973                b'/\x00\x00\x00!\x00\x00\x00\x00\x00')
1974        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
1975            self.assertRaises(NotImplementedError, zipf.open, 'x')
1976
1977    def test_null_byte_in_filename(self):
1978        """Check that a filename containing a null byte is properly
1979        terminated."""
1980        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1981            zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!")
1982            self.assertEqual(zipf.namelist(), ['foo.txt'])
1983
1984    def test_struct_sizes(self):
1985        """Check that ZIP internal structure sizes are calculated correctly."""
1986        self.assertEqual(zipfile.sizeEndCentDir, 22)
1987        self.assertEqual(zipfile.sizeCentralDir, 46)
1988        self.assertEqual(zipfile.sizeEndCentDir64, 56)
1989        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)
1990
1991    def test_comments(self):
1992        """Check that comments on the archive are handled properly."""
1993
1994        # check default comment is empty
1995        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1996            self.assertEqual(zipf.comment, b'')
1997            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1998
1999        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
2000            self.assertEqual(zipfr.comment, b'')
2001
2002        # check a simple short comment
2003        comment = b'Bravely taking to his feet, he beat a very brave retreat.'
2004        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
2005            zipf.comment = comment
2006            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2007        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
2008            self.assertEqual(zipf.comment, comment)
2009
2010        # check a comment of max length
2011        comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)])
2012        comment2 = comment2.encode("ascii")
2013        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
2014            zipf.comment = comment2
2015            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2016
2017        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
2018            self.assertEqual(zipfr.comment, comment2)
2019
2020        # check a comment that is too long is truncated
2021        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
2022            with self.assertWarns(UserWarning):
2023                zipf.comment = comment2 + b'oops'
2024            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2025        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
2026            self.assertEqual(zipfr.comment, comment2)
2027
2028        # check that comments are correctly modified in append mode
2029        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
2030            zipf.comment = b"original comment"
2031            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2032        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
2033            zipf.comment = b"an updated comment"
2034        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
2035            self.assertEqual(zipf.comment, b"an updated comment")
2036
2037        # check that comments are correctly shortened in append mode
2038        # and the file is indeed truncated
2039        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
2040            zipf.comment = b"original comment that's longer"
2041            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2042        original_zip_size = os.path.getsize(TESTFN)
2043        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
2044            zipf.comment = b"shorter comment"
2045        self.assertTrue(original_zip_size > os.path.getsize(TESTFN))
2046        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
2047            self.assertEqual(zipf.comment, b"shorter comment")
2048
2049    def test_unicode_comment(self):
2050        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
2051            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2052            with self.assertRaises(TypeError):
2053                zipf.comment = "this is an error"
2054
2055    def test_change_comment_in_empty_archive(self):
2056        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
2057            self.assertFalse(zipf.filelist)
2058            zipf.comment = b"this is a comment"
2059        with zipfile.ZipFile(TESTFN, "r") as zipf:
2060            self.assertEqual(zipf.comment, b"this is a comment")
2061
2062    def test_change_comment_in_nonempty_archive(self):
2063        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
2064            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2065        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
2066            self.assertTrue(zipf.filelist)
2067            zipf.comment = b"this is a comment"
2068        with zipfile.ZipFile(TESTFN, "r") as zipf:
2069            self.assertEqual(zipf.comment, b"this is a comment")
2070
2071    def test_empty_zipfile(self):
2072        # Check that creating a file in 'w' or 'a' mode and closing without
2073        # adding any files to the archives creates a valid empty ZIP file
2074        zipf = zipfile.ZipFile(TESTFN, mode="w")
2075        zipf.close()
2076        try:
2077            zipf = zipfile.ZipFile(TESTFN, mode="r")
2078        except zipfile.BadZipFile:
2079            self.fail("Unable to create empty ZIP file in 'w' mode")
2080
2081        zipf = zipfile.ZipFile(TESTFN, mode="a")
2082        zipf.close()
2083        try:
2084            zipf = zipfile.ZipFile(TESTFN, mode="r")
2085        except:
2086            self.fail("Unable to create empty ZIP file in 'a' mode")
2087
2088    def test_open_empty_file(self):
2089        # Issue 1710703: Check that opening a file with less than 22 bytes
2090        # raises a BadZipFile exception (rather than the previously unhelpful
2091        # OSError)
2092        f = open(TESTFN, 'w', encoding='utf-8')
2093        f.close()
2094        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r')
2095
2096    def test_create_zipinfo_before_1980(self):
2097        self.assertRaises(ValueError,
2098                          zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0))
2099
2100    def test_create_empty_zipinfo_repr(self):
2101        """Before bpo-26185, repr() on empty ZipInfo object was failing."""
2102        zi = zipfile.ZipInfo(filename="empty")
2103        self.assertEqual(repr(zi), "<ZipInfo filename='empty' file_size=0>")
2104
2105    def test_create_empty_zipinfo_default_attributes(self):
2106        """Ensure all required attributes are set."""
2107        zi = zipfile.ZipInfo()
2108        self.assertEqual(zi.orig_filename, "NoName")
2109        self.assertEqual(zi.filename, "NoName")
2110        self.assertEqual(zi.date_time, (1980, 1, 1, 0, 0, 0))
2111        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
2112        self.assertEqual(zi.comment, b"")
2113        self.assertEqual(zi.extra, b"")
2114        self.assertIn(zi.create_system, (0, 3))
2115        self.assertEqual(zi.create_version, zipfile.DEFAULT_VERSION)
2116        self.assertEqual(zi.extract_version, zipfile.DEFAULT_VERSION)
2117        self.assertEqual(zi.reserved, 0)
2118        self.assertEqual(zi.flag_bits, 0)
2119        self.assertEqual(zi.volume, 0)
2120        self.assertEqual(zi.internal_attr, 0)
2121        self.assertEqual(zi.external_attr, 0)
2122
2123        # Before bpo-26185, both were missing
2124        self.assertEqual(zi.file_size, 0)
2125        self.assertEqual(zi.compress_size, 0)
2126
2127    def test_zipfile_with_short_extra_field(self):
2128        """If an extra field in the header is less than 4 bytes, skip it."""
2129        zipdata = (
2130            b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e'
2131            b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab'
2132            b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00'
2133            b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00'
2134            b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00'
2135            b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00'
2136            b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00'
2137        )
2138        with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf:
2139            # testzip returns the name of the first corrupt file, or None
2140            self.assertIsNone(zipf.testzip())
2141
2142    def test_open_conflicting_handles(self):
2143        # It's only possible to open one writable file handle at a time
2144        msg1 = b"It's fun to charter an accountant!"
2145        msg2 = b"And sail the wide accountant sea"
2146        msg3 = b"To find, explore the funds offshore"
2147        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf:
2148            with zipf.open('foo', mode='w') as w2:
2149                w2.write(msg1)
2150            with zipf.open('bar', mode='w') as w1:
2151                with self.assertRaises(ValueError):
2152                    zipf.open('handle', mode='w')
2153                with self.assertRaises(ValueError):
2154                    zipf.open('foo', mode='r')
2155                with self.assertRaises(ValueError):
2156                    zipf.writestr('str', 'abcde')
2157                with self.assertRaises(ValueError):
2158                    zipf.write(__file__, 'file')
2159                with self.assertRaises(ValueError):
2160                    zipf.close()
2161                w1.write(msg2)
2162            with zipf.open('baz', mode='w') as w2:
2163                w2.write(msg3)
2164
2165        with zipfile.ZipFile(TESTFN2, 'r') as zipf:
2166            self.assertEqual(zipf.read('foo'), msg1)
2167            self.assertEqual(zipf.read('bar'), msg2)
2168            self.assertEqual(zipf.read('baz'), msg3)
2169            self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz'])
2170
2171    def test_seek_tell(self):
2172        # Test seek functionality
2173        txt = b"Where's Bruce?"
2174        bloc = txt.find(b"Bruce")
2175        # Check seek on a file
2176        with zipfile.ZipFile(TESTFN, "w") as zipf:
2177            zipf.writestr("foo.txt", txt)
2178        with zipfile.ZipFile(TESTFN, "r") as zipf:
2179            with zipf.open("foo.txt", "r") as fp:
2180                fp.seek(bloc, os.SEEK_SET)
2181                self.assertEqual(fp.tell(), bloc)
2182                fp.seek(-bloc, os.SEEK_CUR)
2183                self.assertEqual(fp.tell(), 0)
2184                fp.seek(bloc, os.SEEK_CUR)
2185                self.assertEqual(fp.tell(), bloc)
2186                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
2187                fp.seek(0, os.SEEK_END)
2188                self.assertEqual(fp.tell(), len(txt))
2189                fp.seek(0, os.SEEK_SET)
2190                self.assertEqual(fp.tell(), 0)
2191        # Check seek on memory file
2192        data = io.BytesIO()
2193        with zipfile.ZipFile(data, mode="w") as zipf:
2194            zipf.writestr("foo.txt", txt)
2195        with zipfile.ZipFile(data, mode="r") as zipf:
2196            with zipf.open("foo.txt", "r") as fp:
2197                fp.seek(bloc, os.SEEK_SET)
2198                self.assertEqual(fp.tell(), bloc)
2199                fp.seek(-bloc, os.SEEK_CUR)
2200                self.assertEqual(fp.tell(), 0)
2201                fp.seek(bloc, os.SEEK_CUR)
2202                self.assertEqual(fp.tell(), bloc)
2203                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
2204                fp.seek(0, os.SEEK_END)
2205                self.assertEqual(fp.tell(), len(txt))
2206                fp.seek(0, os.SEEK_SET)
2207                self.assertEqual(fp.tell(), 0)
2208
2209    @requires_bz2()
2210    def test_decompress_without_3rd_party_library(self):
2211        data = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
2212        zip_file = io.BytesIO(data)
2213        with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_BZIP2) as zf:
2214            zf.writestr('a.txt', b'a')
2215        with mock.patch('zipfile.bz2', None):
2216            with zipfile.ZipFile(zip_file) as zf:
2217                self.assertRaises(RuntimeError, zf.extract, 'a.txt')
2218
2219    def tearDown(self):
2220        unlink(TESTFN)
2221        unlink(TESTFN2)
2222
2223
class AbstractBadCrcTests:
    # Mixin of checks shared by the per-compression subclasses.  Each
    # subclass supplies ``zip_with_bad_crc``: the raw bytes of an archive
    # whose member 'afile' is expected to fail its CRC check.

    def test_testzip_with_bad_crc(self):
        """Tests that files with bad CRCs return their name from testzip."""
        stream = io.BytesIO(self.zip_with_bad_crc)

        with zipfile.ZipFile(stream, mode="r") as archive:
            # testzip returns the name of the first corrupt file, or None
            self.assertEqual('afile', archive.testzip())

    def test_read_with_bad_crc(self):
        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
        raw = self.zip_with_bad_crc

        # Using ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(raw), mode="r") as archive:
            with self.assertRaises(zipfile.BadZipFile):
                archive.read('afile')

        # Using ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(raw), mode="r") as archive:
            with archive.open('afile', 'r') as member:
                with self.assertRaises(zipfile.BadZipFile):
                    member.read()

        # Same with small reads (in order to exercise the buffering logic)
        with zipfile.ZipFile(io.BytesIO(raw), mode="r") as archive:
            with archive.open('afile', 'r') as member:
                member.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipFile):
                    chunk = member.read(2)
                    while chunk:
                        chunk = member.read(2)
2253
2254
class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    # Runs the AbstractBadCrcTests checks with a ZIP_STORED fixture.
    compression = zipfile.ZIP_STORED
    # Raw bytes of a pre-built archive containing the member 'afile'; the
    # inherited tests expect reading that member to fail its CRC check.
    zip_with_bad_crc = (
        b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
        b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
        b'ilehello,AworldP'
        b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
        b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
        b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
        b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
        b'\0\0/\0\0\0\0\0')
2266
@requires_zlib()
class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    # Runs the AbstractBadCrcTests checks with a ZIP_DEFLATED fixture.
    compression = zipfile.ZIP_DEFLATED
    # Raw bytes of a pre-built archive containing the member 'afile'; the
    # inherited tests expect reading that member to fail its CRC check.
    # NOTE(review): the literal b'FAKE' appears in both headers, presumably
    # as the deliberately wrong CRC field -- confirm against the ZIP layout.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
        b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
        b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
        b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
        b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
        b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00')
2279
@requires_bz2()
class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    # Runs the AbstractBadCrcTests checks with a ZIP_BZIP2 fixture.
    compression = zipfile.ZIP_BZIP2
    # Raw bytes of a pre-built archive containing the member 'afile'; the
    # inherited tests expect reading that member to fail its CRC check.
    # NOTE(review): the literal b'FAKE' appears in both headers, presumably
    # as the deliberately wrong CRC field -- confirm against the ZIP layout.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA'
        b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ileBZh91AY&SY\xd4\xa8\xca'
        b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5'
        b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f'
        b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14'
        b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8'
        b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00'
        b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK'
        b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00'
        b'\x00\x00\x00\x00')
2295
@requires_lzma()
class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    # Runs the AbstractBadCrcTests checks with a ZIP_LZMA fixture.
    compression = zipfile.ZIP_LZMA
    # Raw bytes of a pre-built archive containing the member 'afile'; the
    # inherited tests expect reading that member to fail its CRC check.
    # NOTE(review): the literal b'FAKE' appears in both headers, presumably
    # as the deliberately wrong CRC field -- confirm against the ZIP layout.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I'
        b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK'
        b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00'
        b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil'
        b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00'
        b'\x00>\x00\x00\x00\x00\x00')
2309
2310
class DecryptionTests(unittest.TestCase):
    """Check that ZIP decryption works. Since the library does not
    support encryption at the moment, we use a pre-generated encrypted
    ZIP file."""

    # Encrypted archive holding 'test.txt'; decrypts to ``plain``.
    data = (
        b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
        b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
        b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
        b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
        b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
        b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
        b'\x00\x00L\x00\x00\x00\x00\x00'
    )
    # Encrypted archive holding 'zero'; decrypts to ``plain2``.
    data2 = (
        b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
        b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
        b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
        b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
        b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
        b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
        b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
        b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00'
    )

    plain = b'zipfile.py encryption test'
    plain2 = b'\x00'*512

    def setUp(self):
        # Write each fixture to disk and open it for reading, one after
        # the other.
        fixtures = (
            ("zip", TESTFN, self.data),
            ("zip2", TESTFN2, self.data2),
        )
        for attr, path, blob in fixtures:
            with open(path, "wb") as out:
                out.write(blob)
            setattr(self, attr, zipfile.ZipFile(path, "r"))

    def tearDown(self):
        # Close each archive before deleting the file that backs it.
        for archive, path in ((self.zip, TESTFN), (self.zip2, TESTFN2)):
            archive.close()
            os.unlink(path)

    def test_no_password(self):
        # Reading the encrypted file without password
        # must generate a RunTime exception
        with self.assertRaises(RuntimeError):
            self.zip.read("test.txt")
        with self.assertRaises(RuntimeError):
            self.zip2.read("zero")

    def test_bad_password(self):
        # A wrong password must be rejected for both archives.
        self.zip.setpassword(b"perl")
        with self.assertRaises(RuntimeError):
            self.zip.read("test.txt")
        self.zip2.setpassword(b"perl")
        with self.assertRaises(RuntimeError):
            self.zip2.read("zero")

    @requires_zlib()
    def test_good_password(self):
        # With the right password each member decrypts to its plaintext.
        self.zip.setpassword(b"python")
        decrypted = self.zip.read("test.txt")
        self.assertEqual(decrypted, self.plain)
        self.zip2.setpassword(b"12345")
        decrypted2 = self.zip2.read("zero")
        self.assertEqual(decrypted2, self.plain2)

    def test_unicode_password(self):
        expected_msg = "pwd: expected bytes, got str"

        # Every entry point that accepts a password must reject str.
        self.assertRaisesRegex(TypeError, expected_msg,
                               self.zip.setpassword, "unicode")
        self.assertRaisesRegex(TypeError, expected_msg,
                               self.zip.read, "test.txt", "python")
        self.assertRaisesRegex(TypeError, expected_msg,
                               self.zip.open, "test.txt", pwd="python")
        self.assertRaisesRegex(TypeError, expected_msg,
                               self.zip.extract, "test.txt", pwd="python")

        # Same check when the str password is assigned to the attribute.
        with self.assertRaisesRegex(TypeError, expected_msg):
            self.zip.pwd = "python"
            self.zip.open("test.txt")

    def test_seek_tell(self):
        self.zip.setpassword(b"python")
        txt = self.plain
        needle = b'encryption'
        start = txt.find(needle)
        width = len(needle)
        expected_slice = txt[start:start+width]
        with self.zip.open("test.txt", "r") as member:
            member.seek(start, os.SEEK_SET)
            self.assertEqual(member.tell(), start)
            member.seek(-start, os.SEEK_CUR)
            self.assertEqual(member.tell(), 0)
            member.seek(start, os.SEEK_CUR)
            self.assertEqual(member.tell(), start)
            self.assertEqual(member.read(width), expected_slice)

            # Make sure that the second read after seeking back beyond
            # _readbuffer returns the same content (ie. rewind to the start of
            # the file to read forward to the required position).
            saved_min_read = member.MIN_READ_SIZE
            member.MIN_READ_SIZE = 1
            member._readbuffer = b''
            member._offset = 0
            member.seek(0, os.SEEK_SET)
            self.assertEqual(member.tell(), 0)
            member.seek(start, os.SEEK_CUR)
            self.assertEqual(member.read(width), expected_slice)
            member.MIN_READ_SIZE = saved_min_read

            member.seek(0, os.SEEK_END)
            self.assertEqual(member.tell(), len(txt))
            member.seek(0, os.SEEK_SET)
            self.assertEqual(member.tell(), 0)

            # Read the file completely to definitely call any eof integrity
            # checks (crc) and make sure they still pass.
            member.read()
2425
2426
class AbstractTestsWithRandomBinaryFiles:
    # Mixin of round-trip tests on random binary data; concrete
    # subclasses set ``compression``.

    @classmethod
    def setUpClass(cls):
        # Random number of packed little-endian floats with random values.
        count = randint(16, 64)*1024 + randint(1, 1024)
        packed = []
        for _ in range(count):
            packed.append(struct.pack('<f', random()*randint(-1000, 1000)))
        cls.data = b''.join(packed)

    def setUp(self):
        # Make a source file with some lines
        with open(TESTFN, "wb") as source:
            source.write(self.data)

    def tearDown(self):
        for path in (TESTFN, TESTFN2):
            unlink(path)

    def make_test_archive(self, f, compression):
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", compression) as archive:
            for arcname in ("another.name", TESTFN):
                archive.write(TESTFN, arcname)

    @staticmethod
    def _read_all(stream, next_size):
        # Read *stream* to exhaustion, asking next_size() for the size of
        # each read, and return everything that was read.
        pieces = []
        while chunk := stream.read(next_size()):
            pieces.append(chunk)
        return b''.join(pieces)

    def _assert_matches_source(self, payload):
        # The length is compared first, as in the original assertions.
        self.assertEqual(len(payload), len(self.data))
        self.assertEqual(payload, self.data)

    def zip_test(self, f, compression):
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as archive:
            self._assert_matches_source(archive.read(TESTFN))
            self.assertEqual(archive.read("another.name"), self.data)

    def test_read(self):
        for target in get_files(self):
            self.zip_test(target, self.compression)

    def zip_open_test(self, f, compression):
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as archive:
            # Both members are read in full before anything is asserted.
            with archive.open(TESTFN) as member:
                first = self._read_all(member, lambda: 256)

            with archive.open("another.name") as member:
                second = self._read_all(member, lambda: 256)

            self._assert_matches_source(first)
            self._assert_matches_source(second)

    def test_open(self):
        for target in get_files(self):
            self.zip_open_test(target, self.compression)

    def zip_random_open_test(self, f, compression):
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as archive:
            with archive.open(TESTFN) as member:
                # Each read requests a fresh random size.
                payload = self._read_all(member, lambda: randint(1, 1024))

            self._assert_matches_source(payload)

    def test_random_open(self):
        for target in get_files(self):
            self.zip_random_open_test(target, self.compression)
2516
2517
class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                       unittest.TestCase):
    # Runs the inherited random-binary-data tests with ZIP_STORED; the
    # inherited test_read/test_open/test_random_open read self.compression.
    compression = zipfile.ZIP_STORED
2521
@requires_zlib()
class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                        unittest.TestCase):
    # Runs the inherited random-binary-data tests with ZIP_DEFLATED; the
    # inherited test_read/test_open/test_random_open read self.compression.
    compression = zipfile.ZIP_DEFLATED
2526
@requires_bz2()
class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                      unittest.TestCase):
    # Runs the inherited random-binary-data tests with ZIP_BZIP2; the
    # inherited test_read/test_open/test_random_open read self.compression.
    compression = zipfile.ZIP_BZIP2
2531
@requires_lzma()
class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                     unittest.TestCase):
    # Runs the inherited random-binary-data tests with ZIP_LZMA; the
    # inherited test_read/test_open/test_random_open read self.compression.
    compression = zipfile.ZIP_LZMA
2536
2537
# Provide the tell() method but not seek()
class Tellable:
    """Write-only wrapper that counts the bytes it has written.

    tell() reports that running count rather than asking the wrapped
    object for its position.
    """

    def __init__(self, fp):
        self.offset = 0
        self.fp = fp

    def write(self, data):
        # Advance by whatever count the wrapped object reports.
        written = self.fp.write(data)
        self.offset = self.offset + written
        return written

    def tell(self):
        return self.offset

    def flush(self):
        self.fp.flush()
2554
class Unseekable:
    """Write-only wrapper exposing just write() and flush()."""

    def __init__(self, fp):
        self.fp = fp

    def write(self, data):
        # Pass the wrapped object's return value straight through.
        result = self.fp.write(data)
        return result

    def flush(self):
        self.fp.flush()
2564
class UnseekableTests(unittest.TestCase):
    # Each test writes an archive through three kinds of output object
    # (the BufferedWriter itself, a Tellable wrapper, an Unseekable
    # wrapper) on top of a buffer that already holds b'abc', then checks
    # that the archive follows that prefix and reads back correctly.

    def _prefixed_stream(self):
        # Return (raw, buffered): a BytesIO pre-filled with b'abc' and a
        # BufferedWriter on top of it.  Callers must keep ``buffered``
        # referenced for as long as they use ``raw``.
        raw = io.BytesIO()
        raw.write(b'abc')
        return raw, io.BufferedWriter(raw)

    def _check_members_with_open(self, raw):
        # The archive must start right after the prefix and both members
        # must be readable through ZipFile.open().
        self.assertEqual(raw.getvalue()[:5], b'abcPK')
        with zipfile.ZipFile(raw, mode='r') as archive:
            for member, expected in (('ones', b'111'), ('twos', b'222')):
                with archive.open(member) as stream:
                    self.assertEqual(stream.read(), expected)

    def test_writestr(self):
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                raw, buffered = self._prefixed_stream()
                with zipfile.ZipFile(wrapper(buffered), 'w', zipfile.ZIP_STORED) as archive:
                    archive.writestr('ones', b'111')
                    archive.writestr('twos', b'222')
                self._check_members_with_open(raw)

    def test_write(self):
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                raw, buffered = self._prefixed_stream()
                with zipfile.ZipFile(wrapper(buffered), 'w', zipfile.ZIP_STORED) as archive:
                    self.addCleanup(unlink, TESTFN)
                    # The same scratch file is rewritten for each member.
                    for member, content in (('ones', b'111'), ('twos', b'222')):
                        with open(TESTFN, 'wb') as source:
                            source.write(content)
                        archive.write(TESTFN, member)
                self._check_members_with_open(raw)

    def test_open_write(self):
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                raw, buffered = self._prefixed_stream()
                with zipfile.ZipFile(wrapper(buffered), 'w', zipfile.ZIP_STORED) as archive:
                    for member, content in (('ones', b'111'), ('twos', b'222')):
                        with archive.open(member, 'w') as stream:
                            stream.write(content)
                self.assertEqual(raw.getvalue()[:5], b'abcPK')
                # This test reads back through ZipFile.read().
                with zipfile.ZipFile(raw) as archive:
                    self.assertEqual(archive.read('ones'), b'111')
                    self.assertEqual(archive.read('twos'), b'222')
2618
2619
@requires_zlib()
class TestsWithMultipleOpens(unittest.TestCase):
    # Checks that several member handles opened on one archive do not
    # interfere with each other or with writes to the archive.

    @classmethod
    def setUpClass(cls):
        # Two payloads with distinct 3-byte prefixes followed by random data.
        cls.data1, cls.data2 = (
            prefix + randbytes(10000) for prefix in (b'111', b'222')
        )

    def make_test_archive(self, f):
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as archive:
            for member, payload in (('ones', self.data1), ('twos', self.data2)):
                archive.writestr(member, payload)

    def test_same_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for target in get_files(self):
            self.make_test_archive(target)
            with zipfile.ZipFile(target, mode="r") as archive:
                with archive.open('ones') as first:
                    with archive.open('ones') as second:
                        # Alternate partial and full reads between handles.
                        got_first = first.read(500)
                        got_second = second.read(500)
                        got_first += first.read()
                        got_second += second.read()
                self.assertEqual(got_first, got_second)
                self.assertEqual(got_first, self.data1)

    def test_different_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for target in get_files(self):
            self.make_test_archive(target)
            with zipfile.ZipFile(target, mode="r") as archive:
                with archive.open('ones') as first:
                    with archive.open('twos') as second:
                        got_first = first.read(500)
                        got_second = second.read(500)
                        got_first += first.read()
                        got_second += second.read()
                self.assertEqual(got_first, self.data1)
                self.assertEqual(got_second, self.data2)

    def test_interleaved(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for target in get_files(self):
            self.make_test_archive(target)
            with zipfile.ZipFile(target, mode="r") as archive:
                with archive.open('ones') as first:
                    # The first handle is already partly consumed when the
                    # second one is opened.
                    got_first = first.read(500)
                    with archive.open('twos') as second:
                        got_second = second.read(500)
                        got_first += first.read()
                        got_second += second.read()
                self.assertEqual(got_first, self.data1)
                self.assertEqual(got_second, self.data2)

    def test_read_after_close(self):
        for target in get_files(self):
            self.make_test_archive(target)
            with contextlib.ExitStack() as stack:
                # The member handles outlive the ZipFile's own with-block
                # and are read only after it has exited.
                with zipfile.ZipFile(target, 'r') as archive:
                    first = stack.enter_context(archive.open('ones'))
                    second = stack.enter_context(archive.open('twos'))
                got_first = first.read(500)
                got_second = second.read(500)
                got_first += first.read()
                got_second += second.read()
            self.assertEqual(got_first, self.data1)
            self.assertEqual(got_second, self.data2)

    def test_read_after_write(self):
        for target in get_files(self):
            with zipfile.ZipFile(target, 'w', zipfile.ZIP_DEFLATED) as archive:
                archive.writestr('ones', self.data1)
                archive.writestr('twos', self.data2)
                # Read a member back while the archive is open for writing.
                with archive.open('ones') as member:
                    head = member.read(500)
            self.assertEqual(head, self.data1[:500])
            with zipfile.ZipFile(target, 'r') as archive:
                got_first = archive.read('ones')
                got_second = archive.read('twos')
            self.assertEqual(got_first, self.data1)
            self.assertEqual(got_second, self.data2)

    def test_write_after_read(self):
        for target in get_files(self):
            with zipfile.ZipFile(target, "w", zipfile.ZIP_DEFLATED) as archive:
                archive.writestr('ones', self.data1)
                with archive.open('ones') as member:
                    member.read(500)
                    # Add a second member while the read handle is open.
                    archive.writestr('twos', self.data2)
            with zipfile.ZipFile(target, 'r') as archive:
                got_first = archive.read('ones')
                got_second = archive.read('twos')
            self.assertEqual(got_first, self.data1)
            self.assertEqual(got_second, self.data2)

    def test_many_opens(self):
        # Verify that read() and open() promptly close the file descriptor,
        # and don't rely on the garbage collector to free resources.
        fds_before = fd_count()
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as archive:
            for _ in range(100):
                archive.read('ones')
                with archive.open('ones'):
                    pass
        self.assertEqual(fds_before, fd_count())

    def test_write_while_reading(self):
        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as archive:
            archive.writestr('ones', self.data1)
        with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as archive:
            with archive.open('ones', 'r') as reader:
                got_first = reader.read(500)
                # Write a whole new member between two reads of the old one.
                with archive.open('twos', 'w') as writer:
                    writer.write(self.data2)
                got_first += reader.read()
        self.assertEqual(got_first, self.data1)
        with zipfile.ZipFile(TESTFN2) as archive:
            self.assertEqual(archive.read('twos'), self.data2)

    def tearDown(self):
        unlink(TESTFN2)
2744
2745
class TestWithDirectory(unittest.TestCase):
    # Tests for directory entries: extracting them, and creating them with
    # write(), writestr() and mkdir().  TESTFN2 is a scratch directory
    # created for every test; TESTFN is the archive path.

    def setUp(self):
        os.mkdir(TESTFN2)

    def test_extract_dir(self):
        # Extract the zipdir.zip fixture and check the a/b/c tree appears.
        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
            zipf.extractall(TESTFN2)
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))

    def test_bug_6050(self):
        # Extraction should succeed if directories already exist
        os.mkdir(os.path.join(TESTFN2, "a"))
        self.test_extract_dir()

    def test_write_dir(self):
        dirpath = os.path.join(TESTFN2, "x")
        os.mkdir(dirpath)
        mode = os.stat(dirpath).st_mode & 0xFFFF
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            # Without an arcname the entry is named after the directory path.
            zipf.write(dirpath)
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            # With an explicit arcname the entry name gets a trailing slash.
            zipf.write(dirpath, "y")
            zinfo = zipf.filelist[1]
            # This used to be assertTrue(zinfo.filename, "y/"), which only
            # checked truthiness and passed "y/" as the failure message.
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            # The same properties must hold after reading the archive back.
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            zinfo = zipf.filelist[1]
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "y")))
            self.assertEqual(len(os.listdir(target)), 2)

    def test_writestr_dir(self):
        os.mkdir(os.path.join(TESTFN2, "x"))
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            # A name ending in '/' written with empty data is a directory.
            zipf.writestr("x/", b'')
            zinfo = zipf.filelist[0]
            self.assertEqual(zinfo.filename, "x/")
            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("x/"))
            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "x")))
            self.assertEqual(os.listdir(target), ["x"])

    def test_mkdir(self):
        with zipfile.ZipFile(TESTFN, "w") as zf:
            # Name without a trailing slash: one is appended.
            zf.mkdir("directory")
            zinfo = zf.filelist[0]
            self.assertEqual(zinfo.filename, "directory/")
            self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10)

            # Name that already has a trailing slash.
            zf.mkdir("directory2/")
            zinfo = zf.filelist[1]
            self.assertEqual(zinfo.filename, "directory2/")
            self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10)

            # Explicit mode argument.
            zf.mkdir("directory3", mode=0o777)
            zinfo = zf.filelist[2]
            self.assertEqual(zinfo.filename, "directory3/")
            self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10)

            # A pre-built ZipInfo instead of a name.
            old_zinfo = zipfile.ZipInfo("directory4/")
            old_zinfo.external_attr = (0o40777 << 16) | 0x10
            old_zinfo.CRC = 0
            old_zinfo.file_size = 0
            old_zinfo.compress_size = 0
            zf.mkdir(old_zinfo)
            new_zinfo = zf.filelist[3]
            self.assertEqual(old_zinfo.filename, "directory4/")
            # Also check the entry that was actually recorded in the archive.
            self.assertEqual(new_zinfo.filename, "directory4/")
            self.assertEqual(old_zinfo.external_attr, new_zinfo.external_attr)

            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zf.extractall(target)
            self.assertEqual(set(os.listdir(target)), {"directory", "directory2", "directory3", "directory4"})

    def test_create_directory_with_write(self):
        with zipfile.ZipFile(TESTFN, "w") as zf:
            # Directory entry created through writestr() with a ZipInfo.
            zf.writestr(zipfile.ZipInfo('directory/'), '')

            zinfo = zf.filelist[0]
            self.assertEqual(zinfo.filename, "directory/")

            # Directory entry created from a real directory through write().
            directory = os.path.join(TESTFN2, "directory2")
            os.mkdir(directory)
            mode = os.stat(directory).st_mode
            zf.write(directory, arcname="directory2/")
            zinfo = zf.filelist[1]
            self.assertEqual(zinfo.filename, "directory2/")
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)

            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zf.extractall(target)

            self.assertEqual(set(os.listdir(target)), {"directory", "directory2"})

    def tearDown(self):
        # Not every test creates the archive, so remove it only if present.
        rmtree(TESTFN2)
        if os.path.exists(TESTFN):
            unlink(TESTFN)
2862
2863
2864class ZipInfoTests(unittest.TestCase):
2865    def test_from_file(self):
2866        zi = zipfile.ZipInfo.from_file(__file__)
2867        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
2868        self.assertFalse(zi.is_dir())
2869        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2870
2871    def test_from_file_pathlike(self):
2872        zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__))
2873        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
2874        self.assertFalse(zi.is_dir())
2875        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2876
2877    def test_from_file_bytes(self):
2878        zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test')
2879        self.assertEqual(posixpath.basename(zi.filename), 'test')
2880        self.assertFalse(zi.is_dir())
2881        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2882
2883    def test_from_file_fileno(self):
2884        with open(__file__, 'rb') as f:
2885            zi = zipfile.ZipInfo.from_file(f.fileno(), 'test')
2886            self.assertEqual(posixpath.basename(zi.filename), 'test')
2887            self.assertFalse(zi.is_dir())
2888            self.assertEqual(zi.file_size, os.path.getsize(__file__))
2889
2890    def test_from_dir(self):
2891        dirpath = os.path.dirname(os.path.abspath(__file__))
2892        zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests')
2893        self.assertEqual(zi.filename, 'stdlib_tests/')
2894        self.assertTrue(zi.is_dir())
2895        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
2896        self.assertEqual(zi.file_size, 0)
2897
2898
class CommandLineTest(unittest.TestCase):
    """Exercise the ``python -m zipfile`` command-line interface."""

    def zipfilecmd(self, *args, **kwargs):
        """Run ``-m zipfile`` expecting success; return stdout with '\\n' line ends."""
        _, stdout, _ = script_helper.assert_python_ok(
            '-m', 'zipfile', *args, **kwargs)
        native_eol = os.linesep.encode()
        return stdout.replace(native_eol, b'\n')

    def zipfilecmd_failure(self, *args):
        """Run ``-m zipfile`` expecting failure; return the helper's result."""
        return script_helper.assert_python_failure('-m', 'zipfile', *args)

    def test_bad_use(self):
        # No arguments at all: usage error on stderr, nothing on stdout.
        _, stdout, stderr = self.zipfilecmd_failure()
        self.assertEqual(stdout, b'')
        lowered = stderr.lower()
        for keyword in (b'usage', b'error', b'required'):
            self.assertIn(keyword, lowered)
        # Listing an empty file name also fails with some message.
        _, stdout, stderr = self.zipfilecmd_failure('-l', '')
        self.assertEqual(stdout, b'')
        self.assertNotEqual(stderr.strip(), b'')

    def test_test_command(self):
        good_archive = findfile('zipdir.zip')
        for flag in ('-t', '--test'):
            stdout = self.zipfilecmd(flag, good_archive)
            self.assertEqual(stdout.rstrip(), b'Done testing')
        # A tar file is not a zip archive, so testing it must fail.
        not_a_zip = findfile('testtar.tar')
        _, stdout, _ = self.zipfilecmd_failure('-t', not_a_zip)
        self.assertEqual(stdout, b'')

    def test_list_command(self):
        archive_path = findfile('zipdir.zip')
        # Build the expected listing with the library API.
        buffer = io.StringIO()
        with zipfile.ZipFile(archive_path, 'r') as archive:
            archive.printdir(buffer)
        expected = buffer.getvalue().encode('ascii', 'backslashreplace')
        for flag in ('-l', '--list'):
            stdout = self.zipfilecmd(
                flag, archive_path,
                PYTHONIOENCODING='ascii:backslashreplace')
            self.assertEqual(stdout, expected)

    @requires_zlib()
    def test_create_command(self):
        self.addCleanup(unlink, TESTFN)
        with open(TESTFN, 'w', encoding='utf-8') as stream:
            stream.write('test 1')
        os.mkdir(TESTFNDIR)
        self.addCleanup(rmtree, TESTFNDIR)
        nested_file = os.path.join(TESTFNDIR, 'file.txt')
        with open(nested_file, 'w', encoding='utf-8') as stream:
            stream.write('test 2')
        inputs = [TESTFN, TESTFNDIR]
        expected_names = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
        for flag in ('-c', '--create'):
            try:
                stdout = self.zipfilecmd(flag, TESTFN2, *inputs)
                self.assertEqual(stdout, b'')
                with zipfile.ZipFile(TESTFN2) as archive:
                    self.assertEqual(archive.namelist(), expected_names)
                    self.assertEqual(archive.read(expected_names[0]), b'test 1')
                    self.assertEqual(archive.read(expected_names[2]), b'test 2')
            finally:
                # Each option writes a fresh archive; drop it between runs.
                unlink(TESTFN2)

    def test_extract_command(self):
        archive_path = findfile('zipdir.zip')
        for flag in ('-e', '--extract'):
            with temp_dir() as destination:
                stdout = self.zipfilecmd(flag, archive_path, destination)
                self.assertEqual(stdout, b'')
                with zipfile.ZipFile(archive_path) as archive:
                    for member in archive.infolist():
                        native_name = member.filename.replace('/', os.sep)
                        extracted = os.path.join(destination, native_name)
                        if member.is_dir():
                            self.assertTrue(os.path.isdir(extracted))
                            continue
                        self.assertTrue(os.path.isfile(extracted))
                        with open(extracted, 'rb') as stream:
                            self.assertEqual(stream.read(),
                                             archive.read(member))
2977
2978
class TestExecutablePrependedZip(unittest.TestCase):
    """Test our ability to open zip files with an executable prepended."""

    def setUp(self):
        self.exe_zip, self.exe_zip64 = (
            findfile(data_name, subdir='ziptestdata')
            for data_name in ('exe_with_zip', 'exe_with_z64')
        )

    def _test_zip_works(self, name):
        # bpo28494 sanity check: ensure is_zipfile works on these.
        recognised = zipfile.is_zipfile(name)
        self.assertTrue(recognised, f'is_zipfile failed on {name}')
        # Ensure we can operate on these via ZipFile.
        with zipfile.ZipFile(name) as archive:
            for member in archive.namelist():
                self.assertIn(b'FAVORITE_NUMBER', archive.read(member))

    def _check_execution(self, exe_path):
        # Run the file itself, passing the interpreter path as its only
        # argument, and look for the expected line in its output.
        produced = subprocess.check_output([exe_path, sys.executable])
        self.assertIn(b'number in executable: 5', produced)

    def test_read_zip_with_exe_prepended(self):
        self._test_zip_works(self.exe_zip)

    def test_read_zip64_with_exe_prepended(self):
        self._test_zip_works(self.exe_zip64)

    @unittest.skipUnless(sys.executable, 'sys.executable required.')
    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
                         'Test relies on #!/bin/bash working.')
    @requires_subprocess()
    def test_execute_zip2(self):
        self._check_execution(self.exe_zip)

    @unittest.skipUnless(sys.executable, 'sys.executable required.')
    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
                         'Test relies on #!/bin/bash working.')
    @requires_subprocess()
    def test_execute_zip64(self):
        self._check_execution(self.exe_zip64)
3017
3018
# Poor man's technique to consume a (smallish) iterable: building a
# tuple from it forces the whole iterable to be iterated.
consume = tuple
3021
3022
3023# from jaraco.itertools 5.0
class jaraco:
    """Namespace holder mimicking the ``jaraco.itertools`` module path."""

    class itertools:
        class Counter:
            """Iterator wrapper that counts the items it has handed out.

            ``count`` starts at zero and grows by one for every item
            successfully returned by ``next()``.
            """

            def __init__(self, i):
                self._orig_iter = iter(i)
                self.count = 0

            def __iter__(self):
                return self

            def __next__(self):
                # If the wrapped iterator is exhausted, StopIteration
                # propagates from here and the count is left untouched.
                item = next(self._orig_iter)
                self.count = self.count + 1
                return item
3038
3039
def add_dirs(zf):
    """
    Write an empty entry into the writable zip file zf for every
    directory that is only implied by the names already present,
    then return zf itself.
    """
    implied = zipfile.CompleteDirs._implied_dirs(zf.namelist())
    for dirname in implied:
        zf.writestr(dirname, b"")
    return zf
3048
3049
def build_alpharep_fixture():
    """
    Build an in-memory, still-open zip archive laid out like this:

    .
    ├── a.txt
    ├── b
    │   ├── c.txt
    │   ├── d
    │   │   └── e.txt
    │   └── f.txt
    └── g
        └── h
            └── i.txt

    Key characteristics of the layout:

    - a file at the root (a)
    - a file two levels deep (b/d/e)
    - multiple files in a directory (b/c, b/f)
    - a directory containing only a directory (g/h)

    The name: "alpha" for the alphabetical member names, "rep" for
    representative example.  Only file members are written; the
    directories are implied by the member names.
    """
    members = (
        ("a.txt", b"content of a"),
        ("b/c.txt", b"content of c"),
        ("b/d/e.txt", b"content of e"),
        ("b/f.txt", b"content of f"),
        ("g/h/i.txt", b"content of i"),
    )
    archive = zipfile.ZipFile(io.BytesIO(), "w")
    for member_name, payload in members:
        archive.writestr(member_name, payload)
    archive.filename = "alpharep.zip"
    return archive
3084
3085
def pass_alpharep(meth):
    """
    Decorator: replace meth with a method that calls it once for each
    fixture produced by self.zipfile_alpharep(), handing the fixture
    over as the ``alpharep`` keyword argument.
    """

    def wrapper(self):
        fixtures = self.zipfile_alpharep()
        for fixture in fixtures:
            meth(self, alpharep=fixture)

    return functools.wraps(meth)(wrapper)
3098
3099
class TestPath(unittest.TestCase):
    """
    Tests for zipfile.Path.

    Methods decorated with ``pass_alpharep`` are invoked once for every
    fixture yielded by ``zipfile_alpharep`` and receive it as the
    ``alpharep`` argument.
    """

    def setUp(self):
        # Context managers entered on this stack are exited at cleanup.
        self.fixtures = contextlib.ExitStack()
        self.addCleanup(self.fixtures.close)

    def zipfile_alpharep(self):
        """
        Yield the alpharep fixture twice, each inside its own subTest:
        first as built, then with explicit directory entries added by
        add_dirs.
        """
        with self.subTest():
            yield build_alpharep_fixture()
        with self.subTest():
            yield add_dirs(build_alpharep_fixture())

    def zipfile_ondisk(self, alpharep):
        """
        Close the in-memory alpharep archive, write its bytes to a file
        in a temporary directory and return that file's pathlib.Path.
        """
        tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
        # Grab the underlying buffer before closing the archive.
        buffer = alpharep.fp
        alpharep.close()
        path = tmpdir / alpharep.filename
        with path.open("wb") as strm:
            strm.write(buffer.getvalue())
        return path

    @pass_alpharep
    def test_iterdir_and_types(self, alpharep):
        root = zipfile.Path(alpharep)
        assert root.is_dir()
        a, b, g = root.iterdir()
        assert a.is_file()
        assert b.is_dir()
        assert g.is_dir()
        c, f, d = b.iterdir()
        assert c.is_file() and f.is_file()
        (e,) = d.iterdir()
        assert e.is_file()
        (h,) = g.iterdir()
        (i,) = h.iterdir()
        assert i.is_file()

    @pass_alpharep
    def test_is_file_missing(self, alpharep):
        root = zipfile.Path(alpharep)
        assert not root.joinpath('missing.txt').is_file()

    @pass_alpharep
    def test_iterdir_on_file(self, alpharep):
        root = zipfile.Path(alpharep)
        a, b, g = root.iterdir()
        with self.assertRaises(ValueError):
            a.iterdir()

    @pass_alpharep
    def test_subdir_is_dir(self, alpharep):
        root = zipfile.Path(alpharep)
        assert (root / 'b').is_dir()
        assert (root / 'b/').is_dir()
        assert (root / 'g').is_dir()
        assert (root / 'g/').is_dir()

    @pass_alpharep
    def test_open(self, alpharep):
        root = zipfile.Path(alpharep)
        a, b, g = root.iterdir()
        with a.open(encoding="utf-8") as strm:
            data = strm.read()
        self.assertEqual(data, "content of a")
        with a.open('r', "utf-8") as strm:  # not a kw, no gh-101144 TypeError
            data = strm.read()
        self.assertEqual(data, "content of a")

    def test_open_encoding_utf16(self):
        in_memory_file = io.BytesIO()
        zf = zipfile.ZipFile(in_memory_file, "w")
        zf.writestr("path/16.txt", "This was utf-16".encode("utf-16"))
        zf.filename = "test_open_utf16.zip"
        root = zipfile.Path(zf)
        (path,) = root.iterdir()
        u16 = path.joinpath("16.txt")
        with u16.open('r', "utf-16") as strm:
            data = strm.read()
        self.assertEqual(data, "This was utf-16")
        with u16.open(encoding="utf-16") as strm:
            data = strm.read()
        self.assertEqual(data, "This was utf-16")

    def test_open_encoding_errors(self):
        in_memory_file = io.BytesIO()
        zf = zipfile.ZipFile(in_memory_file, "w")
        zf.writestr("path/bad-utf8.bin", b"invalid utf-8: \xff\xff.")
        zf.filename = "test_read_text_encoding_errors.zip"
        root = zipfile.Path(zf)
        (path,) = root.iterdir()
        u16 = path.joinpath("bad-utf8.bin")

        # encoding= as a positional argument for gh-101144.
        data = u16.read_text("utf-8", errors="ignore")
        self.assertEqual(data, "invalid utf-8: .")
        with u16.open("r", "utf-8", errors="surrogateescape") as f:
            self.assertEqual(f.read(), "invalid utf-8: \udcff\udcff.")

        # encoding= both positional and keyword is an error; gh-101144.
        with self.assertRaisesRegex(TypeError, "encoding"):
            data = u16.read_text("utf-8", encoding="utf-8")

        # both keyword arguments work.
        with u16.open("r", encoding="utf-8", errors="strict") as f:
            # error during decoding with wrong codec.
            with self.assertRaises(UnicodeDecodeError):
                f.read()

    def test_encoding_warnings(self):
        """EncodingWarning must blame the read_text and open calls."""
        # The line numbers asserted below (8 and 9) refer to the two
        # "should warn" lines of this snippet.
        code = '''\
import io, zipfile
with zipfile.ZipFile(io.BytesIO(), "w") as zf:
    zf.filename = '<test_encoding_warnings in memory zip file>'
    zf.writestr("path/file.txt", b"Spanish Inquisition")
    root = zipfile.Path(zf)
    (path,) = root.iterdir()
    file_path = path.joinpath("file.txt")
    unused = file_path.read_text()  # should warn
    file_path.open("r").close()  # should warn
'''
        proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code)
        warnings = proc.err.splitlines()
        self.assertEqual(len(warnings), 2, proc.err)
        self.assertRegex(warnings[0], rb"^<string>:8: EncodingWarning:")
        self.assertRegex(warnings[1], rb"^<string>:9: EncodingWarning:")

    def test_open_write(self):
        """
        If the zipfile is open for write, it should be possible to
        write bytes or text to it.
        """
        zf = zipfile.Path(zipfile.ZipFile(io.BytesIO(), mode='w'))
        with zf.joinpath('file.bin').open('wb') as strm:
            strm.write(b'binary contents')
        with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm:
            strm.write('text file')

    def test_open_extant_directory(self):
        """
        Attempting to open a directory raises IsADirectoryError.
        """
        zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
        with self.assertRaises(IsADirectoryError):
            zf.joinpath('b').open()

    @pass_alpharep
    def test_open_binary_invalid_args(self, alpharep):
        root = zipfile.Path(alpharep)
        with self.assertRaises(ValueError):
            root.joinpath('a.txt').open('rb', encoding='utf-8')
        with self.assertRaises(ValueError):
            root.joinpath('a.txt').open('rb', 'utf-8')

    def test_open_missing_directory(self):
        """
        Attempting to open a missing directory raises FileNotFoundError.
        """
        zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
        with self.assertRaises(FileNotFoundError):
            zf.joinpath('z').open()

    @pass_alpharep
    def test_read(self, alpharep):
        root = zipfile.Path(alpharep)
        a, b, g = root.iterdir()
        assert a.read_text(encoding="utf-8") == "content of a"
        a.read_text("utf-8")  # No positional arg TypeError per gh-101144.
        assert a.read_bytes() == b"content of a"

    @pass_alpharep
    def test_joinpath(self, alpharep):
        root = zipfile.Path(alpharep)
        a = root.joinpath("a.txt")
        assert a.is_file()
        e = root.joinpath("b").joinpath("d").joinpath("e.txt")
        assert e.read_text(encoding="utf-8") == "content of e"

    @pass_alpharep
    def test_joinpath_multiple(self, alpharep):
        root = zipfile.Path(alpharep)
        e = root.joinpath("b", "d", "e.txt")
        assert e.read_text(encoding="utf-8") == "content of e"

    @pass_alpharep
    def test_traverse_truediv(self, alpharep):
        root = zipfile.Path(alpharep)
        a = root / "a.txt"
        assert a.is_file()
        e = root / "b" / "d" / "e.txt"
        assert e.read_text(encoding="utf-8") == "content of e"

    @pass_alpharep
    def test_traverse_simplediv(self, alpharep):
        """
        Disable the __future__.division when testing traversal.
        """
        code = compile(
            source="zipfile.Path(alpharep) / 'a'",
            filename="(test)",
            mode="eval",
            dont_inherit=True,
        )
        # eval() without explicit namespaces sees this frame's locals,
        # which is how the snippet finds ``alpharep``.
        eval(code)

    @pass_alpharep
    def test_pathlike_construction(self, alpharep):
        """
        zipfile.Path should be constructable from a path-like object
        """
        zipfile_ondisk = self.zipfile_ondisk(alpharep)
        pathlike = pathlib.Path(str(zipfile_ondisk))
        zipfile.Path(pathlike)

    @pass_alpharep
    def test_traverse_pathlike(self, alpharep):
        root = zipfile.Path(alpharep)
        root / pathlib.Path("a")

    @pass_alpharep
    def test_parent(self, alpharep):
        root = zipfile.Path(alpharep)
        assert (root / 'a').parent.at == ''
        assert (root / 'a' / 'b').parent.at == 'a/'

    @pass_alpharep
    def test_dir_parent(self, alpharep):
        root = zipfile.Path(alpharep)
        assert (root / 'b').parent.at == ''
        assert (root / 'b/').parent.at == ''

    @pass_alpharep
    def test_missing_dir_parent(self, alpharep):
        root = zipfile.Path(alpharep)
        assert (root / 'missing dir/').parent.at == ''

    @pass_alpharep
    def test_mutability(self, alpharep):
        """
        If the underlying zipfile is changed, the Path object should
        reflect that change.
        """
        root = zipfile.Path(alpharep)
        # Iterate once before mutating the archive; the names are unused,
        # the unpacking just requires exactly three children at this point.
        a, b, g = root.iterdir()
        alpharep.writestr('foo.txt', 'foo')
        alpharep.writestr('bar/baz.txt', 'baz')
        assert any(child.name == 'foo.txt' for child in root.iterdir())
        assert (root / 'foo.txt').read_text(encoding="utf-8") == 'foo'
        (baz,) = (root / 'bar').iterdir()
        assert baz.read_text(encoding="utf-8") == 'baz'

    HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13

    def huge_zipfile(self):
        """Create a read-only zipfile with a huge number of entries."""
        strm = io.BytesIO()
        zf = zipfile.ZipFile(strm, "w")
        for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
            zf.writestr(entry, entry)
        # Flip the mode attribute so the archive presents itself as
        # opened for reading.
        zf.mode = 'r'
        return zf

    def test_joinpath_constant_time(self):
        """
        Call joinpath on every entry of a huge zipfile.

        Each joinpath call is meant to be constant time, making the whole
        loop linear in the number of entries.  No timing is measured here;
        the test only verifies that every entry was visited.
        """
        root = zipfile.Path(self.huge_zipfile())
        entries = jaraco.itertools.Counter(root.iterdir())
        for entry in entries:
            entry.joinpath('suffix')
        # Check the file iterated all items
        assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES

    # @func_timeout.func_set_timeout(3)
    def test_implied_dirs_performance(self):
        data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
        zipfile.CompleteDirs._implied_dirs(data)

    @pass_alpharep
    def test_read_does_not_close(self, alpharep):
        alpharep = self.zipfile_ondisk(alpharep)
        with zipfile.ZipFile(alpharep) as file:
            # A second read fails if the first one closed the archive.
            for rep in range(2):
                zipfile.Path(file, 'a.txt').read_text(encoding="utf-8")

    @pass_alpharep
    def test_subclass(self, alpharep):
        class Subclass(zipfile.Path):
            pass

        root = Subclass(alpharep)
        assert isinstance(root / 'b', Subclass)

    @pass_alpharep
    def test_filename(self, alpharep):
        root = zipfile.Path(alpharep)
        assert root.filename == pathlib.Path('alpharep.zip')

    @pass_alpharep
    def test_root_name(self, alpharep):
        """
        The name of the root should be the name of the zipfile
        """
        root = zipfile.Path(alpharep)
        assert root.name == 'alpharep.zip' == root.filename.name

    @pass_alpharep
    def test_suffix(self, alpharep):
        """
        The suffix of the root should be the suffix of the zipfile.
        The suffix of each nested file is the final component's last suffix, if any.
        Includes the leading period, just like pathlib.Path.
        """
        root = zipfile.Path(alpharep)
        assert root.suffix == '.zip' == root.filename.suffix

        b = root / "b.txt"
        assert b.suffix == ".txt"

        c = root / "c" / "filename.tar.gz"
        assert c.suffix == ".gz"

        d = root / "d"
        assert d.suffix == ""

    @pass_alpharep
    def test_suffixes(self, alpharep):
        """
        The suffixes of the root should be the suffixes of the zipfile.
        The suffixes of each nested file are all suffixes of the final
        component, if any.  A leading-dot name such as ".hgrc" has none.
        Each includes the leading period, just like pathlib.Path.
        """
        root = zipfile.Path(alpharep)
        assert root.suffixes == ['.zip'] == root.filename.suffixes

        b = root / 'b.txt'
        assert b.suffixes == ['.txt']

        c = root / 'c' / 'filename.tar.gz'
        assert c.suffixes == ['.tar', '.gz']

        d = root / 'd'
        assert d.suffixes == []

        e = root / '.hgrc'
        assert e.suffixes == []

    @pass_alpharep
    def test_stem(self, alpharep):
        """
        The final path component, without its suffix
        """
        root = zipfile.Path(alpharep)
        assert root.stem == 'alpharep' == root.filename.stem

        b = root / "b.txt"
        assert b.stem == "b"

        c = root / "c" / "filename.tar.gz"
        assert c.stem == "filename.tar"

        d = root / "d"
        assert d.stem == "d"

    @pass_alpharep
    def test_root_parent(self, alpharep):
        root = zipfile.Path(alpharep)
        assert root.parent == pathlib.Path('.')
        root.root.filename = 'foo/bar.zip'
        assert root.parent == pathlib.Path('foo')

    @pass_alpharep
    def test_root_unnamed(self, alpharep):
        """
        It is an error to attempt to get the name
        or parent of an unnamed zipfile.
        """
        alpharep.filename = None
        root = zipfile.Path(alpharep)
        with self.assertRaises(TypeError):
            root.name
        with self.assertRaises(TypeError):
            root.parent

        # .name and .parent should still work on subs
        sub = root / "b"
        assert sub.name == "b"
        assert sub.parent

    @pass_alpharep
    def test_inheritance(self, alpharep):
        """
        Navigating from a zipfile.Path subclass instance yields
        instances of that same subclass.
        """
        cls = type('PathChild', (zipfile.Path,), {})
        # pass_alpharep already supplies each fixture; looping over
        # zipfile_alpharep() again here would nest subTests and ignore
        # the fixture that was passed in.
        file = cls(alpharep).joinpath('some dir').parent
        assert isinstance(file, cls)

    @pass_alpharep
    def test_extract_orig_with_implied_dirs(self, alpharep):
        """
        A zip file wrapped in a Path should extract even with implied dirs.
        """
        source_path = self.zipfile_ondisk(alpharep)
        zf = zipfile.ZipFile(source_path)
        # wrap the zipfile for its side effect
        zipfile.Path(zf)
        zf.extractall(source_path.parent)
3505
3506
class EncodedMetadataTests(unittest.TestCase):
    """
    Tests for the ``metadata_encoding`` argument of ZipFile and the
    ``--metadata-encoding`` command-line option.

    setUp builds an archive whose first two member names are Shift JIS
    bytes without the UTF-8 flag; the third name is written normally.
    """

    file_names = ['\u4e00', '\u4e8c', '\u4e09']  # Han 'one', 'two', 'three'
    file_content = [
        "This is pure ASCII.\n".encode('ascii'),
        # This is modern Japanese. (UTF-8)
        "\u3053\u308c\u306f\u73fe\u4ee3\u7684\u65e5\u672c\u8a9e\u3067\u3059\u3002\n".encode('utf-8'),
        # This is obsolete Japanese. (Shift JIS)
        "\u3053\u308c\u306f\u53e4\u3044\u65e5\u672c\u8a9e\u3067\u3059\u3002\n".encode('shift_jis'),
    ]

    def setUp(self):
        self.addCleanup(unlink, TESTFN)
        # Create .zip of 3 members with Han names encoded in Shift JIS.
        # Each name is 1 Han character encoding to 2 bytes in Shift JIS.
        # The ASCII names are arbitrary as long as they are length 2 and
        # not otherwise contained in the zip file.
        # Data elements are encoded bytes (ascii, utf-8, shift_jis).
        placeholders = ["n1", "n2"] + self.file_names[2:]
        with zipfile.ZipFile(TESTFN, mode="w") as tf:
            for temp, content in zip(placeholders, self.file_content):
                tf.writestr(temp, content, zipfile.ZIP_STORED)
        # Hack in the Shift JIS names with flag bit 11 (UTF-8) unset.
        with open(TESTFN, "rb") as tf:
            data = tf.read()
        for name, temp in zip(self.file_names, placeholders[:2]):
            data = data.replace(temp.encode('ascii'),
                                name.encode('shift_jis'))
        with open(TESTFN, "wb") as tf:
            tf.write(data)

    def _test_read(self, zipfp, expected_names, expected_content):
        """Check namelist(), infolist(), getinfo() and read() on zipfp.

        expected_names and expected_content are parallel sequences.
        """
        # Check the namelist
        names = zipfp.namelist()
        self.assertEqual(sorted(names), sorted(expected_names))

        # Check infolist
        infos = zipfp.infolist()
        names = [zi.filename for zi in infos]
        self.assertEqual(sorted(names), sorted(expected_names))

        # check getinfo
        for name, content in zip(expected_names, expected_content):
            info = zipfp.getinfo(name)
            self.assertEqual(info.filename, name)
            self.assertEqual(info.file_size, len(content))
            self.assertEqual(zipfp.read(name), content)

    def test_read_with_metadata_encoding(self):
        # Read the ZIP archive with correct metadata_encoding
        with zipfile.ZipFile(TESTFN, "r", metadata_encoding='shift_jis') as zipfp:
            self._test_read(zipfp, self.file_names, self.file_content)

    def test_read_without_metadata_encoding(self):
        # Read the ZIP archive without metadata_encoding
        expected_names = [name.encode('shift_jis').decode('cp437')
                          for name in self.file_names[:2]] + self.file_names[2:]
        with zipfile.ZipFile(TESTFN, "r") as zipfp:
            self._test_read(zipfp, expected_names, self.file_content)

    def test_read_with_incorrect_metadata_encoding(self):
        # Read the ZIP archive with incorrect metadata_encoding
        expected_names = [name.encode('shift_jis').decode('koi8-u')
                          for name in self.file_names[:2]] + self.file_names[2:]
        with zipfile.ZipFile(TESTFN, "r", metadata_encoding='koi8-u') as zipfp:
            self._test_read(zipfp, expected_names, self.file_content)

    def test_read_with_unsuitable_metadata_encoding(self):
        # Read the ZIP archive with metadata_encoding unsuitable for
        # decoding metadata
        with self.assertRaises(UnicodeDecodeError):
            zipfile.ZipFile(TESTFN, "r", metadata_encoding='ascii')
        with self.assertRaises(UnicodeDecodeError):
            zipfile.ZipFile(TESTFN, "r", metadata_encoding='utf-8')

    def test_read_after_append(self):
        """Append a member without metadata_encoding, then re-read.

        Reading without metadata_encoding must work for all members.
        Reading with 'shift_jis' afterwards is expected to fail with
        BadZipFile for the first two members only.
        """
        newname = '\u56db'  # Han 'four'
        expected_names = [name.encode('shift_jis').decode('cp437')
                          for name in self.file_names[:2]] + self.file_names[2:]
        expected_names.append(newname)
        expected_content = (*self.file_content, b"newcontent")

        with zipfile.ZipFile(TESTFN, "a") as zipfp:
            zipfp.writestr(newname, "newcontent")
            self.assertEqual(sorted(zipfp.namelist()), sorted(expected_names))

        with zipfile.ZipFile(TESTFN, "r") as zipfp:
            self._test_read(zipfp, expected_names, expected_content)

        with zipfile.ZipFile(TESTFN, "r", metadata_encoding='shift_jis') as zipfp:
            self.assertEqual(sorted(zipfp.namelist()), sorted(expected_names))
            for i, (name, content) in enumerate(zip(expected_names, expected_content)):
                info = zipfp.getinfo(name)
                self.assertEqual(info.filename, name)
                self.assertEqual(info.file_size, len(content))
                if i < 2:
                    with self.assertRaises(zipfile.BadZipFile):
                        zipfp.read(name)
                else:
                    self.assertEqual(zipfp.read(name), content)

    def test_write_with_metadata_encoding(self):
        # metadata_encoding is rejected for every writing mode.
        ZF = zipfile.ZipFile
        for mode in ("w", "x", "a"):
            with self.assertRaisesRegex(ValueError,
                                        "^metadata_encoding is only"):
                ZF("nonesuch.zip", mode, metadata_encoding="shift_jis")

    def test_cli_with_metadata_encoding(self):
        """--metadata-encoding is refused with -c and honoured by -t and -l."""
        errmsg = "Non-conforming encodings not supported with -c."
        args = ["--metadata-encoding=shift_jis", "-c", "nonesuch", "nonesuch"]
        with captured_stdout() as stdout:
            with captured_stderr() as stderr:
                self.assertRaises(SystemExit, zipfile.main, args)
        self.assertEqual(stdout.getvalue(), "")
        self.assertIn(errmsg, stderr.getvalue())

        with captured_stdout() as stdout:
            zipfile.main(["--metadata-encoding=shift_jis", "-t", TESTFN])
        # Previously this output was captured and then discarded, so the
        # -t run was never verified.
        self.assertEqual(stdout.getvalue().rstrip(), "Done testing")

        with captured_stdout() as stdout:
            zipfile.main(["--metadata-encoding=shift_jis", "-l", TESTFN])
        listing = stdout.getvalue()
        for name in self.file_names:
            self.assertIn(name, listing)

    def test_cli_with_metadata_encoding_extract(self):
        os.mkdir(TESTFN2)
        self.addCleanup(rmtree, TESTFN2)
        # Depending on locale, extracted file names can be not encodable
        # with the filesystem encoding.
        for fn in self.file_names:
            try:
                os.stat(os.path.join(TESTFN2, fn))
            except OSError:
                # The file does not exist yet; only encodability matters.
                pass
            except UnicodeEncodeError:
                self.skipTest(f'cannot encode file name {fn!r}')

        zipfile.main(["--metadata-encoding=shift_jis", "-e", TESTFN, TESTFN2])
        listing = os.listdir(TESTFN2)
        for name in self.file_names:
            self.assertIn(name, listing)
3650
3651
3652class StripExtraTests(unittest.TestCase):
3653    # Note: all of the "z" characters are technically invalid, but up
3654    # to 3 bytes at the end of the extra will be passed through as they
3655    # are too short to encode a valid extra.
3656
3657    ZIP64_EXTRA = 1
3658
3659    def test_no_data(self):
3660        s = struct.Struct("<HH")
3661        a = s.pack(self.ZIP64_EXTRA, 0)
3662        b = s.pack(2, 0)
3663        c = s.pack(3, 0)
3664
3665        self.assertEqual(b'', zipfile._strip_extra(a, (self.ZIP64_EXTRA,)))
3666        self.assertEqual(b, zipfile._strip_extra(b, (self.ZIP64_EXTRA,)))
3667        self.assertEqual(
3668            b+b"z", zipfile._strip_extra(b+b"z", (self.ZIP64_EXTRA,)))
3669
3670        self.assertEqual(b+c, zipfile._strip_extra(a+b+c, (self.ZIP64_EXTRA,)))
3671        self.assertEqual(b+c, zipfile._strip_extra(b+a+c, (self.ZIP64_EXTRA,)))
3672        self.assertEqual(b+c, zipfile._strip_extra(b+c+a, (self.ZIP64_EXTRA,)))
3673
3674    def test_with_data(self):
3675        s = struct.Struct("<HH")
3676        a = s.pack(self.ZIP64_EXTRA, 1) + b"a"
3677        b = s.pack(2, 2) + b"bb"
3678        c = s.pack(3, 3) + b"ccc"
3679
3680        self.assertEqual(b"", zipfile._strip_extra(a, (self.ZIP64_EXTRA,)))
3681        self.assertEqual(b, zipfile._strip_extra(b, (self.ZIP64_EXTRA,)))
3682        self.assertEqual(
3683            b+b"z", zipfile._strip_extra(b+b"z", (self.ZIP64_EXTRA,)))
3684
3685        self.assertEqual(b+c, zipfile._strip_extra(a+b+c, (self.ZIP64_EXTRA,)))
3686        self.assertEqual(b+c, zipfile._strip_extra(b+a+c, (self.ZIP64_EXTRA,)))
3687        self.assertEqual(b+c, zipfile._strip_extra(b+c+a, (self.ZIP64_EXTRA,)))
3688
3689    def test_multiples(self):
3690        s = struct.Struct("<HH")
3691        a = s.pack(self.ZIP64_EXTRA, 1) + b"a"
3692        b = s.pack(2, 2) + b"bb"
3693
3694        self.assertEqual(b"", zipfile._strip_extra(a+a, (self.ZIP64_EXTRA,)))
3695        self.assertEqual(b"", zipfile._strip_extra(a+a+a, (self.ZIP64_EXTRA,)))
3696        self.assertEqual(
3697            b"z", zipfile._strip_extra(a+a+b"z", (self.ZIP64_EXTRA,)))
3698        self.assertEqual(
3699            b+b"z", zipfile._strip_extra(a+a+b+b"z", (self.ZIP64_EXTRA,)))
3700
3701        self.assertEqual(b, zipfile._strip_extra(a+a+b, (self.ZIP64_EXTRA,)))
3702        self.assertEqual(b, zipfile._strip_extra(a+b+a, (self.ZIP64_EXTRA,)))
3703        self.assertEqual(b, zipfile._strip_extra(b+a+a, (self.ZIP64_EXTRA,)))
3704
3705    def test_too_short(self):
3706        self.assertEqual(b"", zipfile._strip_extra(b"", (self.ZIP64_EXTRA,)))
3707        self.assertEqual(b"z", zipfile._strip_extra(b"z", (self.ZIP64_EXTRA,)))
3708        self.assertEqual(
3709            b"zz", zipfile._strip_extra(b"zz", (self.ZIP64_EXTRA,)))
3710        self.assertEqual(
3711            b"zzz", zipfile._strip_extra(b"zzz", (self.ZIP64_EXTRA,)))
3712
3713
# Run this module's tests when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
3716