import unittest
from test import support
from test.support import import_helper
import binascii
import copy
import os
import pickle
import random
import sys
from test.support import bigmemtest, _1G, _4G


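# The whole module is skipped if the zlib extension module is unavailable.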
zlib = import_helper.import_module('zlib')

requires_Compress_copy = unittest.skipUnless(
        hasattr(zlib.compressobj(), "copy"),
        'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
        hasattr(zlib.decompressobj(), "copy"),
        'requires Decompress.copy()')

# bpo-46623: On s390x, when a hardware accelerator is used, using different
# ways to compress data with zlib can produce different compressed data.
# Simplified test_pair() code:
#
#   def func1(data):
#       return zlib.compress(data)
#
#   def func2(data):
#       co = zlib.compressobj()
#       x1 = co.compress(data)
#       x2 = co.flush()
#       return x1 + x2
#
# On s390x if zlib uses a hardware accelerator, func1() creates a single
# "final" compressed block whereas func2() produces 3 compressed blocks (the
# last one is a final block). On other platforms with no accelerator, func1()
# and func2() produce the same compressed data made of a single (final)
# compressed block.
#
# Only the compressed data is different; decompression returns the original
# data:
#
#   zlib.decompress(func1(data)) == zlib.decompress(func2(data)) == data
#
# Make the assumption that s390x always has an accelerator to simplify the skip
# condition. Windows doesn't have os.uname() but it doesn't support s390x.
skip_on_s390x = unittest.skipIf(hasattr(os, 'uname') and os.uname().machine == 's390x',
                                'skipped on s390x')


class VersionTestCase(unittest.TestCase):

    def test_library_version(self):
        # Test that the major version of the actual library in use matches the
        # major version that we were compiled against. We can't guarantee that
        # the minor versions will match (even on the machine on which the
        # module was compiled), and the API is stable between minor versions,
        # so testing only the major versions avoids spurious failures.
        self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0])


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
        self.assertTrue(zlib.crc32(b"abc", 0xffffffff))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32(b"", 0), 0)
        self.assertEqual(zlib.crc32(b"", 1), 1)
        self.assertEqual(zlib.crc32(b"", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
        self.assertTrue(zlib.adler32(b"abc", 0xffffffff))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32(b"", 0), 0)
        self.assertEqual(zlib.adler32(b"", 1), 1)
        self.assertEqual(zlib.adler32(b"", 432), 432)

    def test_penguins(self):
        self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120)
        self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94)
        self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6)
        self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
        self.assertEqual(zlib.adler32(b"penguin"), zlib.adler32(b"penguin", 1))

    def test_crc32_adler32_unsigned(self):
        foo = b'abcdefghijklmnop'
        # explicitly test unsigned behavior
        self.assertEqual(zlib.crc32(foo), 2486878355)
        self.assertEqual(zlib.crc32(b'spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo + foo), 3573550353)
        self.assertEqual(zlib.adler32(b'spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        foo = b'abcdefghijklmnop'
        crc = 2486878355
        self.assertEqual(binascii.crc32(foo), crc)
        self.assertEqual(zlib.crc32(foo), crc)
        self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))


# Issue #10276 - check that inputs >=4 GiB are handled correctly.
class ChecksumBigBufferTestCase(unittest.TestCase):

    @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
    def test_big_buffer(self, size):
        data = b"nyan" * (_1G + 1)
        self.assertEqual(zlib.crc32(data), 1044521549)
        self.assertEqual(zlib.adler32(data), 2256789997)


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION, and zlib apparently
        # accepts 0 too)
        self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)

    def test_badargs(self):
        self.assertRaises(TypeError, zlib.adler32)
        self.assertRaises(TypeError, zlib.crc32)
        self.assertRaises(TypeError, zlib.compress)
        self.assertRaises(TypeError, zlib.decompress)
        for arg in (42, None, '', 'abc', (), []):
            self.assertRaises(TypeError, zlib.adler32, arg)
            self.assertRaises(TypeError, zlib.crc32, arg)
            self.assertRaises(TypeError, zlib.compress, arg)
            self.assertRaises(TypeError, zlib.decompress, arg)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)

    @support.cpython_only
    def test_overflow(self):
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompress(b'', 15, sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().decompress(b'', sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().flush(sys.maxsize + 1)

    @support.cpython_only
    def test_disallow_instantiation(self):
        # Ensure that the type disallows instantiation (bpo-43916)
        support.check_disallow_instantiation(self, type(zlib.compressobj()))
        support.check_disallow_instantiation(self, type(zlib.decompressobj()))


class BaseCompressTestCase(object):
    def check_big_compress_buffer(self, size, compress_func):
        _1M = 1024 * 1024
        # Generate 10 MiB worth of random data, and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = random.randbytes(_1M * 10)
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        data = b'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip(b'x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_keywords(self):
        x = zlib.compress(HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
        with self.assertRaises(TypeError):
            zlib.compress(data=HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x,
                                         wbits=zlib.MAX_WBITS,
                                         bufsize=zlib.DEF_BUF_SIZE),
                         HAMLET_SCENE)

    @skip_on_s390x
    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.compress(bytearray(data)), x)
        for ob in x, bytearray(x):
            self.assertEqual(zlib.decompress(ob), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegex(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)

    @bigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # Test decompress(bufsize) parameter greater than the internal limit
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, size), data)

    def test_custom_bufsize(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        try:
            comp = zlib.compress(data, 0)
            self.assertEqual(zlib.decompress(comp), data)
        finally:
            comp = data = None


class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    @skip_on_s390x
    def test_pair(self):
        # straightforward compress/decompress objects
        datasrc = HAMLET_SCENE * 128
        datazip = zlib.compress(datasrc)
        # should compress both bytes and bytearray data
        for data in (datasrc, bytearray(datasrc)):
            co = zlib.compressobj()
            x1 = co.compress(data)
            x2 = co.flush()
            self.assertRaises(zlib.error, co.flush) # second flush should not work
            self.assertEqual(x1 + x2, datazip)
        for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
            dco = zlib.decompressobj()
            y1 = dco.decompress(v1 + v2)
            y2 = dco.flush()
            self.assertEqual(data, y1 + y2)
            self.assertIsInstance(dco.unconsumed_tail, bytes)
            self.assertIsInstance(dco.unused_data, bytes)

    def test_keywords(self):
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level=level,
                              method=method,
                              wbits=wbits,
                              memLevel=memLevel,
                              strategy=strategy,
                              zdict=b"")
        do = zlib.decompressobj(wbits=wbits, zdict=b"")
        with self.assertRaises(TypeError):
            co.compress(data=HAMLET_SCENE)
        with self.assertRaises(TypeError):
            do.decompress(data=zlib.compress(HAMLET_SCENE))
        x = co.compress(HAMLET_SCENE) + co.flush()
        y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
        self.assertEqual(HAMLET_SCENE, y)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memLevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(b''.join(bufs))
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        decombuf = zlib.decompress(combuf)
        # Test type of return value
        self.assertIsInstance(decombuf, bytes)

        self.assertEqual(data, decombuf)

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual(b'', dco.unconsumed_tail, ########
                             "(A) uct should be b'': not %d long" %
                                       len(dco.unconsumed_tail))
            self.assertEqual(b'', dco.unused_data)
        if flush:
            bufs.append(dco.flush())
        else:
            while True:
                chunk = dco.decompress(b'')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual(b'', dco.unconsumed_tail, ########
                         "(B) uct should be b'': not %d long" %
                                       len(dco.unconsumed_tail))
        self.assertEqual(b'', dco.unused_data)
        self.assertEqual(data, b''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            #max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress(b'', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk), max_length))
                bufs.append(chunk)
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, b"", -1)
        self.assertEqual(b'', dco.unconsumed_tail)

    def test_maxlen_large(self):
        # Sizes up to sys.maxsize should be accepted, although zlib is
        # internally limited to expressing sizes with unsigned int
        data = HAMLET_SCENE * 10
        self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)

    def test_maxlen_custom(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
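        # CustomInt().__index__() returns 100, so decompress() may return at
        # most the first 100 bytes of the original data here.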
        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])

    def test_clear_unconsumed_tail(self):
        # Issue #12050: calling decompress() without providing max_length
        # should clear the unconsumed_tail attribute.
        cdata = b"x\x9cKLJ\x06\x00\x02M\x01"    # "abc"
        dco = zlib.decompressobj()
        ddata = dco.decompress(cdata, 1)
        ddata += dco.decompress(dco.unconsumed_tail)
        self.assertEqual(dco.unconsumed_tail, b"")

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH',
                    'Z_PARTIAL_FLUSH']

        ver = tuple(int(v) for v in zlib.ZLIB_RUNTIME_VERSION.split('.'))
        # Z_BLOCK has a known failure prior to 1.2.5.3
        if ver >= (1, 2, 5, 3):
            sync_opt.append('Z_BLOCK')

        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                try:
                    obj = zlib.compressobj(level)
                    a = obj.compress(data[:3000])
                    b = obj.flush(sync)
                    c = obj.compress(data[3000:])
                    d = obj.flush()
                except:
                    print("Error for flush mode={}, level={}"
                          .format(sync, level))
                    raise
                self.assertEqual(zlib.decompress(b''.join([a, b, c, d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
                         'requires zlib.Z_SYNC_FLUSH')
    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random
        # Testing on 17K of "random" data

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of data
        # generate random data stream
        try:
            # In 2.3 and later, WichmannHill is the RNG of the bug report
            gen = random.WichmannHill()
        except AttributeError:
            try:
                # 2.2 called it Random
                gen = random.Random()
            except AttributeError:
                # others might simply have a single RNG
                gen = random
        gen.seed(1)
        data = gen.randbytes(17 * 1024)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), b"")  # Returns nothing

    def test_dictionary(self):
        h = HAMLET_SCENE
        # Build a simulated dictionary out of the words in HAMLET.
        words = h.split()
        random.shuffle(words)
        zdict = b''.join(words)
        # Use it to compress HAMLET.
        co = zlib.compressobj(zdict=zdict)
        cd = co.compress(h) + co.flush()
        # Verify that it will decompress with the dictionary.
        dco = zlib.decompressobj(zdict=zdict)
        self.assertEqual(dco.decompress(cd) + dco.flush(), h)
        # Verify that it fails when not given the dictionary.
        dco = zlib.decompressobj()
        self.assertRaises(zlib.error, dco.decompress, cd)

    def test_dictionary_streaming(self):
        # This simulates the reuse of a compressor object for compressing
        # several separate data streams.
        co = zlib.compressobj(zdict=HAMLET_SCENE)
        do = zlib.decompressobj(zdict=HAMLET_SCENE)
        piece = HAMLET_SCENE[1000:1500]
        d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
        d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
        d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
        self.assertEqual(do.decompress(d0), piece)
        self.assertEqual(do.decompress(d1), piece[100:])
        self.assertEqual(do.decompress(d2), piece[:-100])

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), b'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, b'foo')

    def test_decompress_eof(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.decompress(x[-5:])
        self.assertTrue(dco.eof)
        dco.flush()
        self.assertTrue(dco.eof)

    def test_decompress_eof_incomplete_stream(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.flush()
        self.assertFalse(dco.eof)

    def test_decompress_unused_data(self):
        # Repeated calls to decompress() after EOF should accumulate data in
        # dco.unused_data, instead of just storing the arg to the last call.
        source = b'abcdefghijklmnopqrstuvwxyz'
        remainder = b'0123456789'
        y = zlib.compress(source)
        x = y + remainder
        for maxlen in 0, 1000:
            for step in 1, 2, len(y), len(x):
                dco = zlib.decompressobj()
                data = b''
                for i in range(0, len(x), step):
                    if i < len(y):
                        self.assertEqual(dco.unused_data, b'')
                    if maxlen == 0:
                        data += dco.decompress(x[i : i + step])
                        self.assertEqual(dco.unconsumed_tail, b'')
                    else:
                        data += dco.decompress(
                                dco.unconsumed_tail + x[i : i + step], maxlen)
                data += dco.flush()
                self.assertTrue(dco.eof)
                self.assertEqual(data, source)
                self.assertEqual(dco.unconsumed_tail, b'')
                self.assertEqual(dco.unused_data, remainder)

    # issue27164
    def test_decompress_raw_with_dictionary(self):
        zdict = b'abcdefghijklmnopqrstuvwxyz'
        co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        comp = co.compress(zdict) + co.flush()
        dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        uncomp = dco.decompress(comp) + dco.flush()
        self.assertEqual(zdict, uncomp)

    def test_flush_with_freed_input(self):
        # Issue #16411: decompressor accesses input to last decompress() call
        # in flush(), even if this object has been freed in the meanwhile.
        input1 = b'abcdefghijklmnopqrstuvwxyz'
        input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        del data
        data = zlib.compress(input2)
        self.assertEqual(dco.flush(), input1[1:])

    @bigmemtest(size=_4G, memuse=1)
    def test_flush_large_length(self, size):
        # Test flush(length) parameter greater than internal limit UINT_MAX
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(size), input[1:])

    def test_flush_custom_length(self):
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(CustomInt()), input[1:])

    @requires_Compress_copy
    def test_compresscopy(self):
        # Test copying a compression object
        data0 = HAMLET_SCENE
        data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0 = []
            bufs0.append(c0.compress(data0))

            c1 = func(c0)
            bufs1 = bufs0[:]

            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            s0 = b''.join(bufs0)

            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            s1 = b''.join(bufs1)

            self.assertEqual(zlib.decompress(s0), data0 + data0)
            self.assertEqual(zlib.decompress(s1), data0 + data1)

    @requires_Compress_copy
    def test_badcompresscopy(self):
        # Test copying a compression object in an inconsistent state
        c = zlib.compressobj()
        c.compress(HAMLET_SCENE)
        c.flush()
        self.assertRaises(ValueError, c.copy)
        self.assertRaises(ValueError, copy.copy, c)
        self.assertRaises(ValueError, copy.deepcopy, c)

    @requires_Decompress_copy
    def test_decompresscopy(self):
        # Test copying a decompression object
        data = HAMLET_SCENE
        comp = zlib.compress(data)
        # Test type of return value
        self.assertIsInstance(comp, bytes)

        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
            d0 = zlib.decompressobj()
            bufs0 = []
            bufs0.append(d0.decompress(comp[:32]))

            d1 = func(d0)
            bufs1 = bufs0[:]

            bufs0.append(d0.decompress(comp[32:]))
            s0 = b''.join(bufs0)

            bufs1.append(d1.decompress(comp[32:]))
            s1 = b''.join(bufs1)

            self.assertEqual(s0, s1)
            self.assertEqual(s0, data)

    @requires_Decompress_copy
    def test_baddecompresscopy(self):
        # Test copying a decompression object in an inconsistent state
        data = zlib.compress(HAMLET_SCENE)
        d = zlib.decompressobj()
        d.decompress(data)
        d.flush()
        self.assertRaises(ValueError, d.copy)
        self.assertRaises(ValueError, copy.copy, d)
        self.assertRaises(ValueError, copy.deepcopy, d)

    def test_compresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)

    def test_decompresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.decompressobj(), proto)

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        co = zlib.compressobj(0)
        do = zlib.decompressobj()
        try:
            comp = co.compress(data) + co.flush()
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(uncomp, data)
        finally:
            comp = uncomp = data = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=3)
    def test_large_unused_data(self, size):
        data = b'abcdefghijklmnop'
        unused = b'x' * size
        comp = zlib.compress(data) + unused
        do = zlib.decompressobj()
        try:
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(unused, do.unused_data)
            self.assertEqual(uncomp, data)
        finally:
            unused = comp = do = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=5)
    def test_large_unconsumed_tail(self, size):
        data = b'x' * size
        do = zlib.decompressobj()
        try:
            comp = zlib.compress(data, 0)
            uncomp = do.decompress(comp, 1) + do.flush()
            self.assertEqual(uncomp, data)
            self.assertEqual(do.unconsumed_tail, b'')
        finally:
            comp = uncomp = data = None

    def test_wbits(self):
        # wbits=0 only supported since zlib v1.2.3.5
        # Normalize "1.2.3" to "1.2.3.0", and versions like "1.2.0-linux",
        # "1.2.0.f" or "1.2.0.f-linux" to "1.2.0.0".
        v = zlib.ZLIB_RUNTIME_VERSION.split('-', 1)[0].split('.')
        if len(v) < 4:
            v.append('0')
        elif not v[-1].isnumeric():
            v[-1] = '0'

        v = tuple(map(int, v))
        supports_wbits_0 = v >= (1, 2, 3, 5)
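
        # wbits values exercised below (per the zlib manual): 9..15 select the
        # zlib format with the given window size, -9..-15 select raw deflate,
        # 16 + wbits selects gzip, 32 + wbits auto-detects zlib or gzip on
        # decompression, and 0 reuses the window size recorded in the zlib
        # header (only with zlib >= 1.2.3.5, hence supports_wbits_0 above).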

        co = zlib.compressobj(level=1, wbits=15)
        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            zlib.decompress(zlib15, 14)
        dco = zlib.decompressobj(wbits=32 + 15)
        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=14)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            dco.decompress(zlib15)

        co = zlib.compressobj(level=1, wbits=9)
        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=32 + 9)
        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-15)
        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-15)
        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-9)
        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-9)
        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=16 + 15)
        gzip = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)

        for wbits in (-15, 15, 31):
            with self.subTest(wbits=wbits):
                expected = HAMLET_SCENE
                actual = zlib.decompress(
                    zlib.compress(HAMLET_SCENE, wbits=wbits), wbits=wbits
                )
                self.assertEqual(expected, actual)

def choose_lines(source, number, seed=None, generator=random):
    """Return a list of *number* lines chosen at random from *source*."""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]
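
# A hypothetical example of using choose_lines() to build variable test
# input (not exercised in this file):
#
#   lines = choose_lines(HAMLET_SCENE.decode('ascii'), 25, seed=42)
#   data = '\n'.join(lines).encode('ascii')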


HAMLET_SCENE = b"""
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


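# Helper for the *_custom tests above: any object implementing __index__()
# is accepted where an integer bufsize/max_length/length argument is
# expected; this one always indexes to 100.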
class CustomInt:
    def __index__(self):
        return 100


if __name__ == "__main__":
    unittest.main()