1# Tests of the full ZIP64 functionality of zipfile
2# The support.requires call is the only reason for keeping this separate
3# from test_zipfile
4from test import support
5
# XXX(nnorwitz): disable this test by looking for extralargefile resource,
# which doesn't exist.  This test takes over 30 minutes to run in general
# and requires more disk space than most of the buildbots.
#
# The second argument is the message reported when the resource is not
# enabled.  NOTE(review): this gate sits above the remaining imports,
# presumably so a run without the resource bails out as early as possible --
# confirm before reordering.
support.requires(
        'extralargefile',
        'test requires loads of disk-space bytes and a long time to run'
    )
13
14import zipfile, os, unittest
15import time
16import sys
17
18from tempfile import TemporaryFile
19
20from test.support import os_helper
21from test.support import requires_zlib
22
# Scratch file names used by the tests below.  TESTFN is taken from
# os_helper; TESTFN2 is the same name with "2" appended.
TESTFN = os_helper.TESTFN
TESTFN2 = TESTFN + "2"

# How much time in seconds can pass before we print a 'Still working' message.
_PRINT_WORKING_MSG_INTERVAL = 60
28
class TestsWithSourceFile(unittest.TestCase):
    """Round-trip about 6 GiB of raw data through a ZIP archive.

    Each test writes many copies of one in-memory payload into an archive
    and reads every member back, first through a temporary file object and
    then through the TESTFN2 path.
    """

    def setUp(self):
        # Create test data.
        line_gen = ("Test of zipfile line %d." % i for i in range(1000000))
        self.data = '\n'.join(line_gen).encode('ascii')

        # And write it to a file.
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

    @staticmethod
    def _report_progress(action, num, filecount, next_time):
        """Print a 'still working' message if one is due.

        action is the verb shown in the message ("writing" or "reading"),
        num/filecount describe how far the current loop has got, and
        next_time is the time.monotonic() deadline for the next message.

        Returns the deadline to use for the following call: unchanged if
        nothing was printed, otherwise pushed _PRINT_WORKING_MSG_INTERVAL
        seconds into the future.
        """
        if next_time <= time.monotonic():
            next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
            # Write to the original stdout so the message is visible even if
            # sys.stdout has been replaced.  flush=True (rather than calling
            # sys.__stdout__.flush() directly) keeps this working when
            # sys.__stdout__ is None: print() then falls back to sys.stdout.
            print('  zipTest still %s %d of %d, be patient...'
                  % (action, num, filecount),
                  file=sys.__stdout__, flush=True)
        return next_time

    def zipTest(self, f, compression):
        """Write ~6 GiB of members to archive f, then read them all back.

        f is either a path or an open file object; compression is one of the
        zipfile.ZIP_* constants.  Every member read back must equal
        self.data.
        """
        # The archive will contain enough copies of self.data to reach about
        # 6 GiB of raw data to store.
        filecount = 6*1024**3 // len(self.data)

        # The reporting deadline is shared by the write and read phases.
        next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL

        # Create the ZIP archive.
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            for num in range(filecount):
                zipfp.writestr("testfn%d" % num, self.data)
                # This test can be really slow, so show signs of life.
                next_time = self._report_progress(
                    "writing", num, filecount, next_time)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            for num in range(filecount):
                self.assertEqual(zipfp.read("testfn%d" % num), self.data)
                # This test can be really slow, so show signs of life.
                next_time = self._report_progress(
                    "reading", num, filecount, next_time)

    def testStored(self):
        # Try the temp file first.  If we do TESTFN2 first, then it hogs
        # gigabytes of disk space for the duration of the test.
        with TemporaryFile() as f:
            self.zipTest(f, zipfile.ZIP_STORED)
            # ZipFile must not close a file object it did not open.
            self.assertFalse(f.closed)
        self.zipTest(TESTFN2, zipfile.ZIP_STORED)

    @requires_zlib()
    def testDeflated(self):
        # Try the temp file first.  If we do TESTFN2 first, then it hogs
        # gigabytes of disk space for the duration of the test.
        with TemporaryFile() as f:
            self.zipTest(f, zipfile.ZIP_DEFLATED)
            # ZipFile must not close a file object it did not open.
            self.assertFalse(f.closed)
        self.zipTest(TESTFN2, zipfile.ZIP_DEFLATED)

    def tearDown(self):
        # Use the same helper as OtherTests.tearDown instead of an
        # exists()-then-remove() pair: no check/remove race, and a file that
        # was never created is simply ignored.
        os_helper.unlink(TESTFN)
        os_helper.unlink(TESTFN2)
91
92
class OtherTests(unittest.TestCase):
    """Archives holding more members than fit in a 16-bit entry count."""

    def _add_members(self, archive, indices):
        # Store one tiny member per index; both the name and the payload are
        # derived from the index so they can be re-checked later.
        for idx in indices:
            archive.writestr("foo%08d" % idx, "%d" % (idx**3 % 57))

    def _check_archive_contents(self, expected_count):
        # Reopen TESTFN read-only and verify the member count and the
        # payload of every member.
        with zipfile.ZipFile(TESTFN, mode="r") as reader:
            self.assertEqual(len(reader.namelist()), expected_count)
            for idx in range(expected_count):
                text = reader.read("foo%08d" % idx).decode('ascii')
                self.assertEqual(text, "%d" % (idx**3 % 57))

    def _check_one_more_is_refused(self, archive, count):
        # Adding a member beyond `count` must raise LargeZipFile and must
        # leave the list of names untouched.
        with self.assertRaises(zipfile.LargeZipFile):
            archive.writestr("foo%08d" % count, b'')
        self.assertEqual(len(archive.namelist()), count)

    def testMoreThan64kFiles(self):
        # This test checks that more than 64k files can be added to an archive,
        # and that the resulting archive can be read properly by ZipFile
        with zipfile.ZipFile(TESTFN, mode="w", allowZip64=True) as archive:
            archive.debug = 100
            total = (1 << 16) * 3//2
            self._add_members(archive, range(total))
            self.assertEqual(len(archive.namelist()), total)

        self._check_archive_contents(total)

    def testMoreThan64kFilesAppend(self):
        # Without ZIP64 the archive can be filled up to the limit but no
        # further, both when first written ...
        with zipfile.ZipFile(TESTFN, mode="w", allowZip64=False) as archive:
            archive.debug = 100
            limit = (1 << 16) - 1
            self._add_members(archive, range(limit))
            self.assertEqual(len(archive.namelist()), limit)
            self._check_one_more_is_refused(archive, limit)

        # ... and when reopened for appending.
        with zipfile.ZipFile(TESTFN, mode="a", allowZip64=False) as archive:
            archive.debug = 100
            self.assertEqual(len(archive.namelist()), limit)
            self._check_one_more_is_refused(archive, limit)

        # Appending with ZIP64 allowed lifts the limit.
        with zipfile.ZipFile(TESTFN, mode="a", allowZip64=True) as archive:
            archive.debug = 100
            self.assertEqual(len(archive.namelist()), limit)
            grown = (1 << 16) * 3//2
            self._add_members(archive, range(limit, grown))
            self.assertEqual(len(archive.namelist()), grown)

        self._check_archive_contents(grown)

    def tearDown(self):
        # Remove both scratch files.
        for scratch in (TESTFN, TESTFN2):
            os_helper.unlink(scratch)
145
# Run this module's tests when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
148