1"""Test script for the gzip module.
2"""
3
4import array
5import functools
6import io
7import os
8import pathlib
9import struct
10import sys
11import unittest
12from subprocess import PIPE, Popen
13from test.support import import_helper
14from test.support import os_helper
15from test.support import _4G, bigmemtest, requires_subprocess
16from test.support.script_helper import assert_python_ok, assert_python_failure
17
18gzip = import_helper.import_module('gzip')
19
20data1 = b"""  int length=DEFAULTALLOC, err = Z_OK;
21  PyObject *RetVal;
22  int flushmode = Z_FINISH;
23  unsigned long start_total_out;
24
25"""
26
27data2 = b"""/* zlibmodule.c -- gzip-compatible data compression */
28/* See http://www.gzip.org/zlib/
29/* See http://www.winimage.com/zLibDll for Windows */
30"""
31
32
33TEMPDIR = os.path.abspath(os_helper.TESTFN) + '-gzdir'
34
35
36class UnseekableIO(io.BytesIO):
37    def seekable(self):
38        return False
39
40    def tell(self):
41        raise io.UnsupportedOperation
42
43    def seek(self, *args):
44        raise io.UnsupportedOperation
45
46
47class BaseTest(unittest.TestCase):
48    filename = os_helper.TESTFN
49
50    def setUp(self):
51        os_helper.unlink(self.filename)
52
53    def tearDown(self):
54        os_helper.unlink(self.filename)
55
56
57class TestGzip(BaseTest):
58    def write_and_read_back(self, data, mode='b'):
59        b_data = bytes(data)
60        with gzip.GzipFile(self.filename, 'w'+mode) as f:
61            l = f.write(data)
62        self.assertEqual(l, len(b_data))
63        with gzip.GzipFile(self.filename, 'r'+mode) as f:
64            self.assertEqual(f.read(), b_data)
65
66    def test_write(self):
67        with gzip.GzipFile(self.filename, 'wb') as f:
68            f.write(data1 * 50)
69
70            # Try flush and fileno.
71            f.flush()
72            f.fileno()
73            if hasattr(os, 'fsync'):
74                os.fsync(f.fileno())
75            f.close()
76
77        # Test multiple close() calls.
78        f.close()
79
80    def test_write_read_with_pathlike_file(self):
81        filename = pathlib.Path(self.filename)
82        with gzip.GzipFile(filename, 'w') as f:
83            f.write(data1 * 50)
84        self.assertIsInstance(f.name, str)
85        with gzip.GzipFile(filename, 'a') as f:
86            f.write(data1)
87        with gzip.GzipFile(filename) as f:
88            d = f.read()
89        self.assertEqual(d, data1 * 51)
90        self.assertIsInstance(f.name, str)
91
92    # The following test_write_xy methods test that write accepts
93    # the corresponding bytes-like object type as input
94    # and that the data written equals bytes(xy) in all cases.
95    def test_write_memoryview(self):
96        self.write_and_read_back(memoryview(data1 * 50))
97        m = memoryview(bytes(range(256)))
98        data = m.cast('B', shape=[8,8,4])
99        self.write_and_read_back(data)
100
101    def test_write_bytearray(self):
102        self.write_and_read_back(bytearray(data1 * 50))
103
104    def test_write_array(self):
105        self.write_and_read_back(array.array('I', data1 * 40))
106
107    def test_write_incompatible_type(self):
108        # Test that non-bytes-like types raise TypeError.
109        # Issue #21560: attempts to write incompatible types
110        # should not affect the state of the fileobject
111        with gzip.GzipFile(self.filename, 'wb') as f:
112            with self.assertRaises(TypeError):
113                f.write('')
114            with self.assertRaises(TypeError):
115                f.write([])
116            f.write(data1)
117        with gzip.GzipFile(self.filename, 'rb') as f:
118            self.assertEqual(f.read(), data1)
119
120    def test_read(self):
121        self.test_write()
122        # Try reading.
123        with gzip.GzipFile(self.filename, 'r') as f:
124            d = f.read()
125        self.assertEqual(d, data1*50)
126
127    def test_read1(self):
128        self.test_write()
129        blocks = []
130        nread = 0
131        with gzip.GzipFile(self.filename, 'r') as f:
132            while True:
133                d = f.read1()
134                if not d:
135                    break
136                blocks.append(d)
137                nread += len(d)
138                # Check that position was updated correctly (see issue10791).
139                self.assertEqual(f.tell(), nread)
140        self.assertEqual(b''.join(blocks), data1 * 50)
141
142    @bigmemtest(size=_4G, memuse=1)
143    def test_read_large(self, size):
144        # Read chunk size over UINT_MAX should be supported, despite zlib's
145        # limitation per low-level call
146        compressed = gzip.compress(data1, compresslevel=1)
147        f = gzip.GzipFile(fileobj=io.BytesIO(compressed), mode='rb')
148        self.assertEqual(f.read(size), data1)
149
150    def test_io_on_closed_object(self):
151        # Test that I/O operations on closed GzipFile objects raise a
152        # ValueError, just like the corresponding functions on file objects.
153
154        # Write to a file, open it for reading, then close it.
155        self.test_write()
156        f = gzip.GzipFile(self.filename, 'r')
157        fileobj = f.fileobj
158        self.assertFalse(fileobj.closed)
159        f.close()
160        self.assertTrue(fileobj.closed)
161        with self.assertRaises(ValueError):
162            f.read(1)
163        with self.assertRaises(ValueError):
164            f.seek(0)
165        with self.assertRaises(ValueError):
166            f.tell()
167        # Open the file for writing, then close it.
168        f = gzip.GzipFile(self.filename, 'w')
169        fileobj = f.fileobj
170        self.assertFalse(fileobj.closed)
171        f.close()
172        self.assertTrue(fileobj.closed)
173        with self.assertRaises(ValueError):
174            f.write(b'')
175        with self.assertRaises(ValueError):
176            f.flush()
177
178    def test_append(self):
179        self.test_write()
180        # Append to the previous file
181        with gzip.GzipFile(self.filename, 'ab') as f:
182            f.write(data2 * 15)
183
184        with gzip.GzipFile(self.filename, 'rb') as f:
185            d = f.read()
186        self.assertEqual(d, (data1*50) + (data2*15))
187
188    def test_many_append(self):
189        # Bug #1074261 was triggered when reading a file that contained
190        # many, many members.  Create such a file and verify that reading it
191        # works.
192        with gzip.GzipFile(self.filename, 'wb', 9) as f:
193            f.write(b'a')
194        for i in range(0, 200):
195            with gzip.GzipFile(self.filename, "ab", 9) as f: # append
196                f.write(b'a')
197
198        # Try reading the file
199        with gzip.GzipFile(self.filename, "rb") as zgfile:
200            contents = b""
201            while 1:
202                ztxt = zgfile.read(8192)
203                contents += ztxt
204                if not ztxt: break
205        self.assertEqual(contents, b'a'*201)
206
207    def test_exclusive_write(self):
208        with gzip.GzipFile(self.filename, 'xb') as f:
209            f.write(data1 * 50)
210        with gzip.GzipFile(self.filename, 'rb') as f:
211            self.assertEqual(f.read(), data1 * 50)
212        with self.assertRaises(FileExistsError):
213            gzip.GzipFile(self.filename, 'xb')
214
215    def test_buffered_reader(self):
216        # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
217        # performance.
218        self.test_write()
219
220        with gzip.GzipFile(self.filename, 'rb') as f:
221            with io.BufferedReader(f) as r:
222                lines = [line for line in r]
223
224        self.assertEqual(lines, 50 * data1.splitlines(keepends=True))
225
226    def test_readline(self):
227        self.test_write()
228        # Try .readline() with varying line lengths
229
230        with gzip.GzipFile(self.filename, 'rb') as f:
231            line_length = 0
232            while 1:
233                L = f.readline(line_length)
234                if not L and line_length != 0: break
235                self.assertTrue(len(L) <= line_length)
236                line_length = (line_length + 1) % 50
237
238    def test_readlines(self):
239        self.test_write()
240        # Try .readlines()
241
242        with gzip.GzipFile(self.filename, 'rb') as f:
243            L = f.readlines()
244
245        with gzip.GzipFile(self.filename, 'rb') as f:
246            while 1:
247                L = f.readlines(150)
248                if L == []: break
249
250    def test_seek_read(self):
251        self.test_write()
252        # Try seek, read test
253
254        with gzip.GzipFile(self.filename) as f:
255            while 1:
256                oldpos = f.tell()
257                line1 = f.readline()
258                if not line1: break
259                newpos = f.tell()
260                f.seek(oldpos)  # negative seek
261                if len(line1)>10:
262                    amount = 10
263                else:
264                    amount = len(line1)
265                line2 = f.read(amount)
266                self.assertEqual(line1[:amount], line2)
267                f.seek(newpos)  # positive seek
268
269    def test_seek_whence(self):
270        self.test_write()
271        # Try seek(whence=1), read test
272
273        with gzip.GzipFile(self.filename) as f:
274            f.read(10)
275            f.seek(10, whence=1)
276            y = f.read(10)
277        self.assertEqual(y, data1[20:30])
278
279    def test_seek_write(self):
280        # Try seek, write test
281        with gzip.GzipFile(self.filename, 'w') as f:
282            for pos in range(0, 256, 16):
283                f.seek(pos)
284                f.write(b'GZ\n')
285
286    def test_mode(self):
287        self.test_write()
288        with gzip.GzipFile(self.filename, 'r') as f:
289            self.assertEqual(f.myfileobj.mode, 'rb')
290        os_helper.unlink(self.filename)
291        with gzip.GzipFile(self.filename, 'x') as f:
292            self.assertEqual(f.myfileobj.mode, 'xb')
293
294    def test_1647484(self):
295        for mode in ('wb', 'rb'):
296            with gzip.GzipFile(self.filename, mode) as f:
297                self.assertTrue(hasattr(f, "name"))
298                self.assertEqual(f.name, self.filename)
299
300    def test_paddedfile_getattr(self):
301        self.test_write()
302        with gzip.GzipFile(self.filename, 'rb') as f:
303            self.assertTrue(hasattr(f.fileobj, "name"))
304            self.assertEqual(f.fileobj.name, self.filename)
305
306    def test_mtime(self):
307        mtime = 123456789
308        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
309            fWrite.write(data1)
310        with gzip.GzipFile(self.filename) as fRead:
311            self.assertTrue(hasattr(fRead, 'mtime'))
312            self.assertIsNone(fRead.mtime)
313            dataRead = fRead.read()
314            self.assertEqual(dataRead, data1)
315            self.assertEqual(fRead.mtime, mtime)
316
317    def test_metadata(self):
318        mtime = 123456789
319
320        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
321            fWrite.write(data1)
322
323        with open(self.filename, 'rb') as fRead:
324            # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
325
326            idBytes = fRead.read(2)
327            self.assertEqual(idBytes, b'\x1f\x8b') # gzip ID
328
329            cmByte = fRead.read(1)
330            self.assertEqual(cmByte, b'\x08') # deflate
331
332            try:
333                expectedname = self.filename.encode('Latin-1') + b'\x00'
334                expectedflags = b'\x08' # only the FNAME flag is set
335            except UnicodeEncodeError:
336                expectedname = b''
337                expectedflags = b'\x00'
338
339            flagsByte = fRead.read(1)
340            self.assertEqual(flagsByte, expectedflags)
341
342            mtimeBytes = fRead.read(4)
343            self.assertEqual(mtimeBytes, struct.pack('<i', mtime)) # little-endian
344
345            xflByte = fRead.read(1)
346            self.assertEqual(xflByte, b'\x02') # maximum compression
347
348            osByte = fRead.read(1)
349            self.assertEqual(osByte, b'\xff') # OS "unknown" (OS-independent)
350
351            # Since the FNAME flag is set, the zero-terminated filename follows.
352            # RFC 1952 specifies that this is the name of the input file, if any.
353            # However, the gzip module defaults to storing the name of the output
354            # file in this field.
355            nameBytes = fRead.read(len(expectedname))
356            self.assertEqual(nameBytes, expectedname)
357
358            # Since no other flags were set, the header ends here.
359            # Rather than process the compressed data, let's seek to the trailer.
360            fRead.seek(os.stat(self.filename).st_size - 8)
361
362            crc32Bytes = fRead.read(4) # CRC32 of uncompressed data [data1]
363            self.assertEqual(crc32Bytes, b'\xaf\xd7d\x83')
364
365            isizeBytes = fRead.read(4)
366            self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))
367
368    def test_metadata_ascii_name(self):
369        self.filename = os_helper.TESTFN_ASCII
370        self.test_metadata()
371
372    def test_compresslevel_metadata(self):
373        # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
374        # specifically, discussion of XFL in section 2.3.1
375        cases = [
376            ('fast', 1, b'\x04'),
377            ('best', 9, b'\x02'),
378            ('tradeoff', 6, b'\x00'),
379        ]
380        xflOffset = 8
381
382        for (name, level, expectedXflByte) in cases:
383            with self.subTest(name):
384                fWrite = gzip.GzipFile(self.filename, 'w', compresslevel=level)
385                with fWrite:
386                    fWrite.write(data1)
387                with open(self.filename, 'rb') as fRead:
388                    fRead.seek(xflOffset)
389                    xflByte = fRead.read(1)
390                    self.assertEqual(xflByte, expectedXflByte)
391
392    def test_with_open(self):
393        # GzipFile supports the context management protocol
394        with gzip.GzipFile(self.filename, "wb") as f:
395            f.write(b"xxx")
396        f = gzip.GzipFile(self.filename, "rb")
397        f.close()
398        try:
399            with f:
400                pass
401        except ValueError:
402            pass
403        else:
404            self.fail("__enter__ on a closed file didn't raise an exception")
405        try:
406            with gzip.GzipFile(self.filename, "wb") as f:
407                1/0
408        except ZeroDivisionError:
409            pass
410        else:
411            self.fail("1/0 didn't raise an exception")
412
413    def test_zero_padded_file(self):
414        with gzip.GzipFile(self.filename, "wb") as f:
415            f.write(data1 * 50)
416
417        # Pad the file with zeroes
418        with open(self.filename, "ab") as f:
419            f.write(b"\x00" * 50)
420
421        with gzip.GzipFile(self.filename, "rb") as f:
422            d = f.read()
423            self.assertEqual(d, data1 * 50, "Incorrect data in file")
424
425    def test_gzip_BadGzipFile_exception(self):
426        self.assertTrue(issubclass(gzip.BadGzipFile, OSError))
427
428    def test_bad_gzip_file(self):
429        with open(self.filename, 'wb') as file:
430            file.write(data1 * 50)
431        with gzip.GzipFile(self.filename, 'r') as file:
432            self.assertRaises(gzip.BadGzipFile, file.readlines)
433
434    def test_non_seekable_file(self):
435        uncompressed = data1 * 50
436        buf = UnseekableIO()
437        with gzip.GzipFile(fileobj=buf, mode="wb") as f:
438            f.write(uncompressed)
439        compressed = buf.getvalue()
440        buf = UnseekableIO(compressed)
441        with gzip.GzipFile(fileobj=buf, mode="rb") as f:
442            self.assertEqual(f.read(), uncompressed)
443
444    def test_peek(self):
445        uncompressed = data1 * 200
446        with gzip.GzipFile(self.filename, "wb") as f:
447            f.write(uncompressed)
448
449        def sizes():
450            while True:
451                for n in range(5, 50, 10):
452                    yield n
453
454        with gzip.GzipFile(self.filename, "rb") as f:
455            f.max_read_chunk = 33
456            nread = 0
457            for n in sizes():
458                s = f.peek(n)
459                if s == b'':
460                    break
461                self.assertEqual(f.read(len(s)), s)
462                nread += len(s)
463            self.assertEqual(f.read(100), b'')
464            self.assertEqual(nread, len(uncompressed))
465
466    def test_textio_readlines(self):
467        # Issue #10791: TextIOWrapper.readlines() fails when wrapping GzipFile.
468        lines = (data1 * 50).decode("ascii").splitlines(keepends=True)
469        self.test_write()
470        with gzip.GzipFile(self.filename, 'r') as f:
471            with io.TextIOWrapper(f, encoding="ascii") as t:
472                self.assertEqual(t.readlines(), lines)
473
474    def test_fileobj_from_fdopen(self):
475        # Issue #13781: Opening a GzipFile for writing fails when using a
476        # fileobj created with os.fdopen().
477        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
478        with os.fdopen(fd, "wb") as f:
479            with gzip.GzipFile(fileobj=f, mode="w") as g:
480                pass
481
482    def test_fileobj_mode(self):
483        gzip.GzipFile(self.filename, "wb").close()
484        with open(self.filename, "r+b") as f:
485            with gzip.GzipFile(fileobj=f, mode='r') as g:
486                self.assertEqual(g.mode, gzip.READ)
487            with gzip.GzipFile(fileobj=f, mode='w') as g:
488                self.assertEqual(g.mode, gzip.WRITE)
489            with gzip.GzipFile(fileobj=f, mode='a') as g:
490                self.assertEqual(g.mode, gzip.WRITE)
491            with gzip.GzipFile(fileobj=f, mode='x') as g:
492                self.assertEqual(g.mode, gzip.WRITE)
493            with self.assertRaises(ValueError):
494                gzip.GzipFile(fileobj=f, mode='z')
495        for mode in "rb", "r+b":
496            with open(self.filename, mode) as f:
497                with gzip.GzipFile(fileobj=f) as g:
498                    self.assertEqual(g.mode, gzip.READ)
499        for mode in "wb", "ab", "xb":
500            if "x" in mode:
501                os_helper.unlink(self.filename)
502            with open(self.filename, mode) as f:
503                with self.assertWarns(FutureWarning):
504                    g = gzip.GzipFile(fileobj=f)
505                with g:
506                    self.assertEqual(g.mode, gzip.WRITE)
507
508    def test_bytes_filename(self):
509        str_filename = self.filename
510        try:
511            bytes_filename = str_filename.encode("ascii")
512        except UnicodeEncodeError:
513            self.skipTest("Temporary file name needs to be ASCII")
514        with gzip.GzipFile(bytes_filename, "wb") as f:
515            f.write(data1 * 50)
516        with gzip.GzipFile(bytes_filename, "rb") as f:
517            self.assertEqual(f.read(), data1 * 50)
518        # Sanity check that we are actually operating on the right file.
519        with gzip.GzipFile(str_filename, "rb") as f:
520            self.assertEqual(f.read(), data1 * 50)
521
522    def test_decompress_limited(self):
523        """Decompressed data buffering should be limited"""
524        bomb = gzip.compress(b'\0' * int(2e6), compresslevel=9)
525        self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE)
526
527        bomb = io.BytesIO(bomb)
528        decomp = gzip.GzipFile(fileobj=bomb)
529        self.assertEqual(decomp.read(1), b'\0')
530        max_decomp = 1 + io.DEFAULT_BUFFER_SIZE
531        self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
532            "Excessive amount of data was decompressed")
533
534    # Testing compress/decompress shortcut functions
535
536    def test_compress(self):
537        for data in [data1, data2]:
538            for args in [(), (1,), (6,), (9,)]:
539                datac = gzip.compress(data, *args)
540                self.assertEqual(type(datac), bytes)
541                with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
542                    self.assertEqual(f.read(), data)
543
544    def test_compress_mtime(self):
545        mtime = 123456789
546        for data in [data1, data2]:
547            for args in [(), (1,), (6,), (9,)]:
548                with self.subTest(data=data, args=args):
549                    datac = gzip.compress(data, *args, mtime=mtime)
550                    self.assertEqual(type(datac), bytes)
551                    with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
552                        f.read(1) # to set mtime attribute
553                        self.assertEqual(f.mtime, mtime)
554
555    def test_compress_correct_level(self):
556        # gzip.compress calls with mtime == 0 take a different code path.
557        for mtime in (0, 42):
558            with self.subTest(mtime=mtime):
559                nocompress = gzip.compress(data1, compresslevel=0, mtime=mtime)
560                yescompress = gzip.compress(data1, compresslevel=1, mtime=mtime)
561                self.assertIn(data1, nocompress)
562                self.assertNotIn(data1, yescompress)
563
564    def test_decompress(self):
565        for data in (data1, data2):
566            buf = io.BytesIO()
567            with gzip.GzipFile(fileobj=buf, mode="wb") as f:
568                f.write(data)
569            self.assertEqual(gzip.decompress(buf.getvalue()), data)
570            # Roundtrip with compress
571            datac = gzip.compress(data)
572            self.assertEqual(gzip.decompress(datac), data)
573
574    def test_decompress_truncated_trailer(self):
575        compressed_data = gzip.compress(data1)
576        self.assertRaises(EOFError, gzip.decompress, compressed_data[:-4])
577
578    def test_decompress_missing_trailer(self):
579        compressed_data = gzip.compress(data1)
580        self.assertRaises(EOFError, gzip.decompress, compressed_data[:-8])
581
582    def test_read_truncated(self):
583        data = data1*50
584        # Drop the CRC (4 bytes) and file size (4 bytes).
585        truncated = gzip.compress(data)[:-8]
586        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
587            self.assertRaises(EOFError, f.read)
588        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
589            self.assertEqual(f.read(len(data)), data)
590            self.assertRaises(EOFError, f.read, 1)
591        # Incomplete 10-byte header.
592        for i in range(2, 10):
593            with gzip.GzipFile(fileobj=io.BytesIO(truncated[:i])) as f:
594                self.assertRaises(EOFError, f.read, 1)
595
596    def test_read_with_extra(self):
597        # Gzip data with an extra field
598        gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
599                  b'\x05\x00Extra'
600                  b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
601        with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
602            self.assertEqual(f.read(), b'Test')
603
604    def test_prepend_error(self):
605        # See issue #20875
606        with gzip.open(self.filename, "wb") as f:
607            f.write(data1)
608        with gzip.open(self.filename, "rb") as f:
609            f._buffer.raw._fp.prepend()
610
611    def test_issue44439(self):
612        q = array.array('Q', [1, 2, 3, 4, 5])
613        LENGTH = len(q) * q.itemsize
614
615        with gzip.GzipFile(fileobj=io.BytesIO(), mode='w') as f:
616            self.assertEqual(f.write(q), LENGTH)
617            self.assertEqual(f.tell(), LENGTH)
618
619
620class TestOpen(BaseTest):
621    def test_binary_modes(self):
622        uncompressed = data1 * 50
623
624        with gzip.open(self.filename, "wb") as f:
625            f.write(uncompressed)
626        with open(self.filename, "rb") as f:
627            file_data = gzip.decompress(f.read())
628            self.assertEqual(file_data, uncompressed)
629
630        with gzip.open(self.filename, "rb") as f:
631            self.assertEqual(f.read(), uncompressed)
632
633        with gzip.open(self.filename, "ab") as f:
634            f.write(uncompressed)
635        with open(self.filename, "rb") as f:
636            file_data = gzip.decompress(f.read())
637            self.assertEqual(file_data, uncompressed * 2)
638
639        with self.assertRaises(FileExistsError):
640            gzip.open(self.filename, "xb")
641        os_helper.unlink(self.filename)
642        with gzip.open(self.filename, "xb") as f:
643            f.write(uncompressed)
644        with open(self.filename, "rb") as f:
645            file_data = gzip.decompress(f.read())
646            self.assertEqual(file_data, uncompressed)
647
648    def test_pathlike_file(self):
649        filename = pathlib.Path(self.filename)
650        with gzip.open(filename, "wb") as f:
651            f.write(data1 * 50)
652        with gzip.open(filename, "ab") as f:
653            f.write(data1)
654        with gzip.open(filename) as f:
655            self.assertEqual(f.read(), data1 * 51)
656
657    def test_implicit_binary_modes(self):
658        # Test implicit binary modes (no "b" or "t" in mode string).
659        uncompressed = data1 * 50
660
661        with gzip.open(self.filename, "w") as f:
662            f.write(uncompressed)
663        with open(self.filename, "rb") as f:
664            file_data = gzip.decompress(f.read())
665            self.assertEqual(file_data, uncompressed)
666
667        with gzip.open(self.filename, "r") as f:
668            self.assertEqual(f.read(), uncompressed)
669
670        with gzip.open(self.filename, "a") as f:
671            f.write(uncompressed)
672        with open(self.filename, "rb") as f:
673            file_data = gzip.decompress(f.read())
674            self.assertEqual(file_data, uncompressed * 2)
675
676        with self.assertRaises(FileExistsError):
677            gzip.open(self.filename, "x")
678        os_helper.unlink(self.filename)
679        with gzip.open(self.filename, "x") as f:
680            f.write(uncompressed)
681        with open(self.filename, "rb") as f:
682            file_data = gzip.decompress(f.read())
683            self.assertEqual(file_data, uncompressed)
684
685    def test_text_modes(self):
686        uncompressed = data1.decode("ascii") * 50
687        uncompressed_raw = uncompressed.replace("\n", os.linesep)
688        with gzip.open(self.filename, "wt", encoding="ascii") as f:
689            f.write(uncompressed)
690        with open(self.filename, "rb") as f:
691            file_data = gzip.decompress(f.read()).decode("ascii")
692            self.assertEqual(file_data, uncompressed_raw)
693        with gzip.open(self.filename, "rt", encoding="ascii") as f:
694            self.assertEqual(f.read(), uncompressed)
695        with gzip.open(self.filename, "at", encoding="ascii") as f:
696            f.write(uncompressed)
697        with open(self.filename, "rb") as f:
698            file_data = gzip.decompress(f.read()).decode("ascii")
699            self.assertEqual(file_data, uncompressed_raw * 2)
700
701    def test_fileobj(self):
702        uncompressed_bytes = data1 * 50
703        uncompressed_str = uncompressed_bytes.decode("ascii")
704        compressed = gzip.compress(uncompressed_bytes)
705        with gzip.open(io.BytesIO(compressed), "r") as f:
706            self.assertEqual(f.read(), uncompressed_bytes)
707        with gzip.open(io.BytesIO(compressed), "rb") as f:
708            self.assertEqual(f.read(), uncompressed_bytes)
709        with gzip.open(io.BytesIO(compressed), "rt", encoding="ascii") as f:
710            self.assertEqual(f.read(), uncompressed_str)
711
712    def test_bad_params(self):
713        # Test invalid parameter combinations.
714        with self.assertRaises(TypeError):
715            gzip.open(123.456)
716        with self.assertRaises(ValueError):
717            gzip.open(self.filename, "wbt")
718        with self.assertRaises(ValueError):
719            gzip.open(self.filename, "xbt")
720        with self.assertRaises(ValueError):
721            gzip.open(self.filename, "rb", encoding="utf-8")
722        with self.assertRaises(ValueError):
723            gzip.open(self.filename, "rb", errors="ignore")
724        with self.assertRaises(ValueError):
725            gzip.open(self.filename, "rb", newline="\n")
726
727    def test_encoding(self):
728        # Test non-default encoding.
729        uncompressed = data1.decode("ascii") * 50
730        uncompressed_raw = uncompressed.replace("\n", os.linesep)
731        with gzip.open(self.filename, "wt", encoding="utf-16") as f:
732            f.write(uncompressed)
733        with open(self.filename, "rb") as f:
734            file_data = gzip.decompress(f.read()).decode("utf-16")
735            self.assertEqual(file_data, uncompressed_raw)
736        with gzip.open(self.filename, "rt", encoding="utf-16") as f:
737            self.assertEqual(f.read(), uncompressed)
738
739    def test_encoding_error_handler(self):
740        # Test with non-default encoding error handler.
741        with gzip.open(self.filename, "wb") as f:
742            f.write(b"foo\xffbar")
743        with gzip.open(self.filename, "rt", encoding="ascii", errors="ignore") \
744                as f:
745            self.assertEqual(f.read(), "foobar")
746
747    def test_newline(self):
748        # Test with explicit newline (universal newline mode disabled).
749        uncompressed = data1.decode("ascii") * 50
750        with gzip.open(self.filename, "wt", encoding="ascii", newline="\n") as f:
751            f.write(uncompressed)
752        with gzip.open(self.filename, "rt", encoding="ascii", newline="\r") as f:
753            self.assertEqual(f.readlines(), [uncompressed])
754
755
756def create_and_remove_directory(directory):
757    def decorator(function):
758        @functools.wraps(function)
759        def wrapper(*args, **kwargs):
760            os.makedirs(directory)
761            try:
762                return function(*args, **kwargs)
763            finally:
764                os_helper.rmtree(directory)
765        return wrapper
766    return decorator
767
768
769class TestCommandLine(unittest.TestCase):
770    data = b'This is a simple test with gzip'
771
772    @requires_subprocess()
773    def test_decompress_stdin_stdout(self):
774        with io.BytesIO() as bytes_io:
775            with gzip.GzipFile(fileobj=bytes_io, mode='wb') as gzip_file:
776                gzip_file.write(self.data)
777
778            args = sys.executable, '-m', 'gzip', '-d'
779            with Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc:
780                out, err = proc.communicate(bytes_io.getvalue())
781
782        self.assertEqual(err, b'')
783        self.assertEqual(out, self.data)
784
785    @create_and_remove_directory(TEMPDIR)
786    def test_decompress_infile_outfile(self):
787        gzipname = os.path.join(TEMPDIR, 'testgzip.gz')
788        self.assertFalse(os.path.exists(gzipname))
789
790        with gzip.open(gzipname, mode='wb') as fp:
791            fp.write(self.data)
792        rc, out, err = assert_python_ok('-m', 'gzip', '-d', gzipname)
793
794        with open(os.path.join(TEMPDIR, "testgzip"), "rb") as gunziped:
795            self.assertEqual(gunziped.read(), self.data)
796
797        self.assertTrue(os.path.exists(gzipname))
798        self.assertEqual(rc, 0)
799        self.assertEqual(out, b'')
800        self.assertEqual(err, b'')
801
802    def test_decompress_infile_outfile_error(self):
803        rc, out, err = assert_python_failure('-m', 'gzip', '-d', 'thisisatest.out')
804        self.assertEqual(b"filename doesn't end in .gz: 'thisisatest.out'", err.strip())
805        self.assertEqual(rc, 1)
806        self.assertEqual(out, b'')
807
808    @requires_subprocess()
809    @create_and_remove_directory(TEMPDIR)
810    def test_compress_stdin_outfile(self):
811        args = sys.executable, '-m', 'gzip'
812        with Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc:
813            out, err = proc.communicate(self.data)
814
815        self.assertEqual(err, b'')
816        self.assertEqual(out[:2], b"\x1f\x8b")
817
818    @create_and_remove_directory(TEMPDIR)
819    def test_compress_infile_outfile_default(self):
820        local_testgzip = os.path.join(TEMPDIR, 'testgzip')
821        gzipname = local_testgzip + '.gz'
822        self.assertFalse(os.path.exists(gzipname))
823
824        with open(local_testgzip, 'wb') as fp:
825            fp.write(self.data)
826
827        rc, out, err = assert_python_ok('-m', 'gzip', local_testgzip)
828
829        self.assertTrue(os.path.exists(gzipname))
830        self.assertEqual(out, b'')
831        self.assertEqual(err, b'')
832
833    @create_and_remove_directory(TEMPDIR)
834    def test_compress_infile_outfile(self):
835        for compress_level in ('--fast', '--best'):
836            with self.subTest(compress_level=compress_level):
837                local_testgzip = os.path.join(TEMPDIR, 'testgzip')
838                gzipname = local_testgzip + '.gz'
839                self.assertFalse(os.path.exists(gzipname))
840
841                with open(local_testgzip, 'wb') as fp:
842                    fp.write(self.data)
843
844                rc, out, err = assert_python_ok('-m', 'gzip', compress_level, local_testgzip)
845
846                self.assertTrue(os.path.exists(gzipname))
847                self.assertEqual(out, b'')
848                self.assertEqual(err, b'')
849                os.remove(gzipname)
850                self.assertFalse(os.path.exists(gzipname))
851
852    def test_compress_fast_best_are_exclusive(self):
853        rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '--best')
854        self.assertIn(b"error: argument --best: not allowed with argument --fast", err)
855        self.assertEqual(out, b'')
856
857    def test_decompress_cannot_have_flags_compression(self):
858        rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '-d')
859        self.assertIn(b'error: argument -d/--decompress: not allowed with argument --fast', err)
860        self.assertEqual(out, b'')
861
862
863if __name__ == "__main__":
864    unittest.main()
865