1import unittest 2from test import support 3from test.support import import_helper 4import binascii 5import copy 6import os 7import pickle 8import random 9import sys 10from test.support import bigmemtest, _1G, _4G 11 12 13zlib = import_helper.import_module('zlib') 14 15requires_Compress_copy = unittest.skipUnless( 16 hasattr(zlib.compressobj(), "copy"), 17 'requires Compress.copy()') 18requires_Decompress_copy = unittest.skipUnless( 19 hasattr(zlib.decompressobj(), "copy"), 20 'requires Decompress.copy()') 21 22# bpo-46623: On s390x, when a hardware accelerator is used, using different 23# ways to compress data with zlib can produce different compressed data. 24# Simplified test_pair() code: 25# 26# def func1(data): 27# return zlib.compress(data) 28# 29# def func2(data) 30# co = zlib.compressobj() 31# x1 = co.compress(data) 32# x2 = co.flush() 33# return x1 + x2 34# 35# On s390x if zlib uses a hardware accelerator, func1() creates a single 36# "final" compressed block whereas func2() produces 3 compressed blocks (the 37# last one is a final block). On other platforms with no accelerator, func1() 38# and func2() produce the same compressed data made of a single (final) 39# compressed block. 40# 41# Only the compressed data is different, the decompression returns the original 42# data: 43# 44# zlib.decompress(func1(data)) == zlib.decompress(func2(data)) == data 45# 46# Make the assumption that s390x always has an accelerator to simplify the skip 47# condition. Windows doesn't have os.uname() but it doesn't support s390x. 48skip_on_s390x = unittest.skipIf(hasattr(os, 'uname') and os.uname().machine == 's390x', 49 'skipped on s390x') 50 51 52class VersionTestCase(unittest.TestCase): 53 54 def test_library_version(self): 55 # Test that the major version of the actual library in use matches the 56 # major version that we were compiled against. We can't guarantee that 57 # the minor versions will match (even on the machine on which the module 58 # was compiled), and the API is stable between minor versions, so 59 # testing only the major versions avoids spurious failures. 60 self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0]) 61 62 63class ChecksumTestCase(unittest.TestCase): 64 # checksum test cases 65 def test_crc32start(self): 66 self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0)) 67 self.assertTrue(zlib.crc32(b"abc", 0xffffffff)) 68 69 def test_crc32empty(self): 70 self.assertEqual(zlib.crc32(b"", 0), 0) 71 self.assertEqual(zlib.crc32(b"", 1), 1) 72 self.assertEqual(zlib.crc32(b"", 432), 432) 73 74 def test_adler32start(self): 75 self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1)) 76 self.assertTrue(zlib.adler32(b"abc", 0xffffffff)) 77 78 def test_adler32empty(self): 79 self.assertEqual(zlib.adler32(b"", 0), 0) 80 self.assertEqual(zlib.adler32(b"", 1), 1) 81 self.assertEqual(zlib.adler32(b"", 432), 432) 82 83 def test_penguins(self): 84 self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120) 85 self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94) 86 self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6) 87 self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7) 88 89 self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0)) 90 self.assertEqual(zlib.adler32(b"penguin"),zlib.adler32(b"penguin",1)) 91 92 def test_crc32_adler32_unsigned(self): 93 foo = b'abcdefghijklmnop' 94 # explicitly test signed behavior 95 self.assertEqual(zlib.crc32(foo), 2486878355) 96 self.assertEqual(zlib.crc32(b'spam'), 1138425661) 97 self.assertEqual(zlib.adler32(foo+foo), 3573550353) 98 self.assertEqual(zlib.adler32(b'spam'), 72286642) 99 100 def test_same_as_binascii_crc32(self): 101 foo = b'abcdefghijklmnop' 102 crc = 2486878355 103 self.assertEqual(binascii.crc32(foo), crc) 104 self.assertEqual(zlib.crc32(foo), crc) 105 self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam')) 106 107 108# Issue #10276 - check that inputs >=4 GiB are handled correctly. 109class ChecksumBigBufferTestCase(unittest.TestCase): 110 111 @bigmemtest(size=_4G + 4, memuse=1, dry_run=False) 112 def test_big_buffer(self, size): 113 data = b"nyan" * (_1G + 1) 114 self.assertEqual(zlib.crc32(data), 1044521549) 115 self.assertEqual(zlib.adler32(data), 2256789997) 116 117 118class ExceptionTestCase(unittest.TestCase): 119 # make sure we generate some expected errors 120 def test_badlevel(self): 121 # specifying compression level out of range causes an error 122 # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib 123 # accepts 0 too) 124 self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10) 125 126 def test_badargs(self): 127 self.assertRaises(TypeError, zlib.adler32) 128 self.assertRaises(TypeError, zlib.crc32) 129 self.assertRaises(TypeError, zlib.compress) 130 self.assertRaises(TypeError, zlib.decompress) 131 for arg in (42, None, '', 'abc', (), []): 132 self.assertRaises(TypeError, zlib.adler32, arg) 133 self.assertRaises(TypeError, zlib.crc32, arg) 134 self.assertRaises(TypeError, zlib.compress, arg) 135 self.assertRaises(TypeError, zlib.decompress, arg) 136 137 def test_badcompressobj(self): 138 # verify failure on building compress object with bad params 139 self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0) 140 # specifying total bits too large causes an error 141 self.assertRaises(ValueError, 142 zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1) 143 144 def test_baddecompressobj(self): 145 # verify failure on building decompress object with bad params 146 self.assertRaises(ValueError, zlib.decompressobj, -1) 147 148 def test_decompressobj_badflush(self): 149 # verify failure on calling decompressobj.flush with bad params 150 self.assertRaises(ValueError, zlib.decompressobj().flush, 0) 151 self.assertRaises(ValueError, zlib.decompressobj().flush, -1) 152 153 @support.cpython_only 154 def test_overflow(self): 155 with self.assertRaisesRegex(OverflowError, 'int too large'): 156 zlib.decompress(b'', 15, sys.maxsize + 1) 157 with self.assertRaisesRegex(OverflowError, 'int too large'): 158 zlib.decompressobj().decompress(b'', sys.maxsize + 1) 159 with self.assertRaisesRegex(OverflowError, 'int too large'): 160 zlib.decompressobj().flush(sys.maxsize + 1) 161 162 @support.cpython_only 163 def test_disallow_instantiation(self): 164 # Ensure that the type disallows instantiation (bpo-43916) 165 support.check_disallow_instantiation(self, type(zlib.compressobj())) 166 support.check_disallow_instantiation(self, type(zlib.decompressobj())) 167 168 169class BaseCompressTestCase(object): 170 def check_big_compress_buffer(self, size, compress_func): 171 _1M = 1024 * 1024 172 # Generate 10 MiB worth of random, and expand it by repeating it. 173 # The assumption is that zlib's memory is not big enough to exploit 174 # such spread out redundancy. 175 data = random.randbytes(_1M * 10) 176 data = data * (size // len(data) + 1) 177 try: 178 compress_func(data) 179 finally: 180 # Release memory 181 data = None 182 183 def check_big_decompress_buffer(self, size, decompress_func): 184 data = b'x' * size 185 try: 186 compressed = zlib.compress(data, 1) 187 finally: 188 # Release memory 189 data = None 190 data = decompress_func(compressed) 191 # Sanity check 192 try: 193 self.assertEqual(len(data), size) 194 self.assertEqual(len(data.strip(b'x')), 0) 195 finally: 196 data = None 197 198 199class CompressTestCase(BaseCompressTestCase, unittest.TestCase): 200 # Test compression in one go (whole message compression) 201 def test_speech(self): 202 x = zlib.compress(HAMLET_SCENE) 203 self.assertEqual(zlib.decompress(x), HAMLET_SCENE) 204 205 def test_keywords(self): 206 x = zlib.compress(HAMLET_SCENE, level=3) 207 self.assertEqual(zlib.decompress(x), HAMLET_SCENE) 208 with self.assertRaises(TypeError): 209 zlib.compress(data=HAMLET_SCENE, level=3) 210 self.assertEqual(zlib.decompress(x, 211 wbits=zlib.MAX_WBITS, 212 bufsize=zlib.DEF_BUF_SIZE), 213 HAMLET_SCENE) 214 215 @skip_on_s390x 216 def test_speech128(self): 217 # compress more data 218 data = HAMLET_SCENE * 128 219 x = zlib.compress(data) 220 self.assertEqual(zlib.compress(bytearray(data)), x) 221 for ob in x, bytearray(x): 222 self.assertEqual(zlib.decompress(ob), data) 223 224 def test_incomplete_stream(self): 225 # A useful error message is given 226 x = zlib.compress(HAMLET_SCENE) 227 self.assertRaisesRegex(zlib.error, 228 "Error -5 while decompressing data: incomplete or truncated stream", 229 zlib.decompress, x[:-1]) 230 231 # Memory use of the following functions takes into account overallocation 232 233 @bigmemtest(size=_1G + 1024 * 1024, memuse=3) 234 def test_big_compress_buffer(self, size): 235 compress = lambda s: zlib.compress(s, 1) 236 self.check_big_compress_buffer(size, compress) 237 238 @bigmemtest(size=_1G + 1024 * 1024, memuse=2) 239 def test_big_decompress_buffer(self, size): 240 self.check_big_decompress_buffer(size, zlib.decompress) 241 242 @bigmemtest(size=_4G, memuse=1) 243 def test_large_bufsize(self, size): 244 # Test decompress(bufsize) parameter greater than the internal limit 245 data = HAMLET_SCENE * 10 246 compressed = zlib.compress(data, 1) 247 self.assertEqual(zlib.decompress(compressed, 15, size), data) 248 249 def test_custom_bufsize(self): 250 data = HAMLET_SCENE * 10 251 compressed = zlib.compress(data, 1) 252 self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data) 253 254 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 255 @bigmemtest(size=_4G + 100, memuse=4) 256 def test_64bit_compress(self, size): 257 data = b'x' * size 258 try: 259 comp = zlib.compress(data, 0) 260 self.assertEqual(zlib.decompress(comp), data) 261 finally: 262 comp = data = None 263 264 265class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): 266 # Test compression object 267 @skip_on_s390x 268 def test_pair(self): 269 # straightforward compress/decompress objects 270 datasrc = HAMLET_SCENE * 128 271 datazip = zlib.compress(datasrc) 272 # should compress both bytes and bytearray data 273 for data in (datasrc, bytearray(datasrc)): 274 co = zlib.compressobj() 275 x1 = co.compress(data) 276 x2 = co.flush() 277 self.assertRaises(zlib.error, co.flush) # second flush should not work 278 self.assertEqual(x1 + x2, datazip) 279 for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))): 280 dco = zlib.decompressobj() 281 y1 = dco.decompress(v1 + v2) 282 y2 = dco.flush() 283 self.assertEqual(data, y1 + y2) 284 self.assertIsInstance(dco.unconsumed_tail, bytes) 285 self.assertIsInstance(dco.unused_data, bytes) 286 287 def test_keywords(self): 288 level = 2 289 method = zlib.DEFLATED 290 wbits = -12 291 memLevel = 9 292 strategy = zlib.Z_FILTERED 293 co = zlib.compressobj(level=level, 294 method=method, 295 wbits=wbits, 296 memLevel=memLevel, 297 strategy=strategy, 298 zdict=b"") 299 do = zlib.decompressobj(wbits=wbits, zdict=b"") 300 with self.assertRaises(TypeError): 301 co.compress(data=HAMLET_SCENE) 302 with self.assertRaises(TypeError): 303 do.decompress(data=zlib.compress(HAMLET_SCENE)) 304 x = co.compress(HAMLET_SCENE) + co.flush() 305 y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush() 306 self.assertEqual(HAMLET_SCENE, y) 307 308 def test_compressoptions(self): 309 # specify lots of options to compressobj() 310 level = 2 311 method = zlib.DEFLATED 312 wbits = -12 313 memLevel = 9 314 strategy = zlib.Z_FILTERED 315 co = zlib.compressobj(level, method, wbits, memLevel, strategy) 316 x1 = co.compress(HAMLET_SCENE) 317 x2 = co.flush() 318 dco = zlib.decompressobj(wbits) 319 y1 = dco.decompress(x1 + x2) 320 y2 = dco.flush() 321 self.assertEqual(HAMLET_SCENE, y1 + y2) 322 323 def test_compressincremental(self): 324 # compress object in steps, decompress object as one-shot 325 data = HAMLET_SCENE * 128 326 co = zlib.compressobj() 327 bufs = [] 328 for i in range(0, len(data), 256): 329 bufs.append(co.compress(data[i:i+256])) 330 bufs.append(co.flush()) 331 combuf = b''.join(bufs) 332 333 dco = zlib.decompressobj() 334 y1 = dco.decompress(b''.join(bufs)) 335 y2 = dco.flush() 336 self.assertEqual(data, y1 + y2) 337 338 def test_decompinc(self, flush=False, source=None, cx=256, dcx=64): 339 # compress object in steps, decompress object in steps 340 source = source or HAMLET_SCENE 341 data = source * 128 342 co = zlib.compressobj() 343 bufs = [] 344 for i in range(0, len(data), cx): 345 bufs.append(co.compress(data[i:i+cx])) 346 bufs.append(co.flush()) 347 combuf = b''.join(bufs) 348 349 decombuf = zlib.decompress(combuf) 350 # Test type of return value 351 self.assertIsInstance(decombuf, bytes) 352 353 self.assertEqual(data, decombuf) 354 355 dco = zlib.decompressobj() 356 bufs = [] 357 for i in range(0, len(combuf), dcx): 358 bufs.append(dco.decompress(combuf[i:i+dcx])) 359 self.assertEqual(b'', dco.unconsumed_tail, ######## 360 "(A) uct should be b'': not %d long" % 361 len(dco.unconsumed_tail)) 362 self.assertEqual(b'', dco.unused_data) 363 if flush: 364 bufs.append(dco.flush()) 365 else: 366 while True: 367 chunk = dco.decompress(b'') 368 if chunk: 369 bufs.append(chunk) 370 else: 371 break 372 self.assertEqual(b'', dco.unconsumed_tail, ######## 373 "(B) uct should be b'': not %d long" % 374 len(dco.unconsumed_tail)) 375 self.assertEqual(b'', dco.unused_data) 376 self.assertEqual(data, b''.join(bufs)) 377 # Failure means: "decompressobj with init options failed" 378 379 def test_decompincflush(self): 380 self.test_decompinc(flush=True) 381 382 def test_decompimax(self, source=None, cx=256, dcx=64): 383 # compress in steps, decompress in length-restricted steps 384 source = source or HAMLET_SCENE 385 # Check a decompression object with max_length specified 386 data = source * 128 387 co = zlib.compressobj() 388 bufs = [] 389 for i in range(0, len(data), cx): 390 bufs.append(co.compress(data[i:i+cx])) 391 bufs.append(co.flush()) 392 combuf = b''.join(bufs) 393 self.assertEqual(data, zlib.decompress(combuf), 394 'compressed data failure') 395 396 dco = zlib.decompressobj() 397 bufs = [] 398 cb = combuf 399 while cb: 400 #max_length = 1 + len(cb)//10 401 chunk = dco.decompress(cb, dcx) 402 self.assertFalse(len(chunk) > dcx, 403 'chunk too big (%d>%d)' % (len(chunk), dcx)) 404 bufs.append(chunk) 405 cb = dco.unconsumed_tail 406 bufs.append(dco.flush()) 407 self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved') 408 409 def test_decompressmaxlen(self, flush=False): 410 # Check a decompression object with max_length specified 411 data = HAMLET_SCENE * 128 412 co = zlib.compressobj() 413 bufs = [] 414 for i in range(0, len(data), 256): 415 bufs.append(co.compress(data[i:i+256])) 416 bufs.append(co.flush()) 417 combuf = b''.join(bufs) 418 self.assertEqual(data, zlib.decompress(combuf), 419 'compressed data failure') 420 421 dco = zlib.decompressobj() 422 bufs = [] 423 cb = combuf 424 while cb: 425 max_length = 1 + len(cb)//10 426 chunk = dco.decompress(cb, max_length) 427 self.assertFalse(len(chunk) > max_length, 428 'chunk too big (%d>%d)' % (len(chunk),max_length)) 429 bufs.append(chunk) 430 cb = dco.unconsumed_tail 431 if flush: 432 bufs.append(dco.flush()) 433 else: 434 while chunk: 435 chunk = dco.decompress(b'', max_length) 436 self.assertFalse(len(chunk) > max_length, 437 'chunk too big (%d>%d)' % (len(chunk),max_length)) 438 bufs.append(chunk) 439 self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved') 440 441 def test_decompressmaxlenflush(self): 442 self.test_decompressmaxlen(flush=True) 443 444 def test_maxlenmisc(self): 445 # Misc tests of max_length 446 dco = zlib.decompressobj() 447 self.assertRaises(ValueError, dco.decompress, b"", -1) 448 self.assertEqual(b'', dco.unconsumed_tail) 449 450 def test_maxlen_large(self): 451 # Sizes up to sys.maxsize should be accepted, although zlib is 452 # internally limited to expressing sizes with unsigned int 453 data = HAMLET_SCENE * 10 454 self.assertGreater(len(data), zlib.DEF_BUF_SIZE) 455 compressed = zlib.compress(data, 1) 456 dco = zlib.decompressobj() 457 self.assertEqual(dco.decompress(compressed, sys.maxsize), data) 458 459 def test_maxlen_custom(self): 460 data = HAMLET_SCENE * 10 461 compressed = zlib.compress(data, 1) 462 dco = zlib.decompressobj() 463 self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100]) 464 465 def test_clear_unconsumed_tail(self): 466 # Issue #12050: calling decompress() without providing max_length 467 # should clear the unconsumed_tail attribute. 468 cdata = b"x\x9cKLJ\x06\x00\x02M\x01" # "abc" 469 dco = zlib.decompressobj() 470 ddata = dco.decompress(cdata, 1) 471 ddata += dco.decompress(dco.unconsumed_tail) 472 self.assertEqual(dco.unconsumed_tail, b"") 473 474 def test_flushes(self): 475 # Test flush() with the various options, using all the 476 # different levels in order to provide more variations. 477 sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH', 478 'Z_PARTIAL_FLUSH'] 479 480 ver = tuple(int(v) for v in zlib.ZLIB_RUNTIME_VERSION.split('.')) 481 # Z_BLOCK has a known failure prior to 1.2.5.3 482 if ver >= (1, 2, 5, 3): 483 sync_opt.append('Z_BLOCK') 484 485 sync_opt = [getattr(zlib, opt) for opt in sync_opt 486 if hasattr(zlib, opt)] 487 data = HAMLET_SCENE * 8 488 489 for sync in sync_opt: 490 for level in range(10): 491 try: 492 obj = zlib.compressobj( level ) 493 a = obj.compress( data[:3000] ) 494 b = obj.flush( sync ) 495 c = obj.compress( data[3000:] ) 496 d = obj.flush() 497 except: 498 print("Error for flush mode={}, level={}" 499 .format(sync, level)) 500 raise 501 self.assertEqual(zlib.decompress(b''.join([a,b,c,d])), 502 data, ("Decompress failed: flush " 503 "mode=%i, level=%i") % (sync, level)) 504 del obj 505 506 @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'), 507 'requires zlib.Z_SYNC_FLUSH') 508 def test_odd_flush(self): 509 # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1 510 import random 511 # Testing on 17K of "random" data 512 513 # Create compressor and decompressor objects 514 co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) 515 dco = zlib.decompressobj() 516 517 # Try 17K of data 518 # generate random data stream 519 try: 520 # In 2.3 and later, WichmannHill is the RNG of the bug report 521 gen = random.WichmannHill() 522 except AttributeError: 523 try: 524 # 2.2 called it Random 525 gen = random.Random() 526 except AttributeError: 527 # others might simply have a single RNG 528 gen = random 529 gen.seed(1) 530 data = gen.randbytes(17 * 1024) 531 532 # compress, sync-flush, and decompress 533 first = co.compress(data) 534 second = co.flush(zlib.Z_SYNC_FLUSH) 535 expanded = dco.decompress(first + second) 536 537 # if decompressed data is different from the input data, choke. 538 self.assertEqual(expanded, data, "17K random source doesn't match") 539 540 def test_empty_flush(self): 541 # Test that calling .flush() on unused objects works. 542 # (Bug #1083110 -- calling .flush() on decompress objects 543 # caused a core dump.) 544 545 co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) 546 self.assertTrue(co.flush()) # Returns a zlib header 547 dco = zlib.decompressobj() 548 self.assertEqual(dco.flush(), b"") # Returns nothing 549 550 def test_dictionary(self): 551 h = HAMLET_SCENE 552 # Build a simulated dictionary out of the words in HAMLET. 553 words = h.split() 554 random.shuffle(words) 555 zdict = b''.join(words) 556 # Use it to compress HAMLET. 557 co = zlib.compressobj(zdict=zdict) 558 cd = co.compress(h) + co.flush() 559 # Verify that it will decompress with the dictionary. 560 dco = zlib.decompressobj(zdict=zdict) 561 self.assertEqual(dco.decompress(cd) + dco.flush(), h) 562 # Verify that it fails when not given the dictionary. 563 dco = zlib.decompressobj() 564 self.assertRaises(zlib.error, dco.decompress, cd) 565 566 def test_dictionary_streaming(self): 567 # This simulates the reuse of a compressor object for compressing 568 # several separate data streams. 569 co = zlib.compressobj(zdict=HAMLET_SCENE) 570 do = zlib.decompressobj(zdict=HAMLET_SCENE) 571 piece = HAMLET_SCENE[1000:1500] 572 d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH) 573 d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH) 574 d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH) 575 self.assertEqual(do.decompress(d0), piece) 576 self.assertEqual(do.decompress(d1), piece[100:]) 577 self.assertEqual(do.decompress(d2), piece[:-100]) 578 579 def test_decompress_incomplete_stream(self): 580 # This is 'foo', deflated 581 x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' 582 # For the record 583 self.assertEqual(zlib.decompress(x), b'foo') 584 self.assertRaises(zlib.error, zlib.decompress, x[:-5]) 585 # Omitting the stream end works with decompressor objects 586 # (see issue #8672). 587 dco = zlib.decompressobj() 588 y = dco.decompress(x[:-5]) 589 y += dco.flush() 590 self.assertEqual(y, b'foo') 591 592 def test_decompress_eof(self): 593 x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' # 'foo' 594 dco = zlib.decompressobj() 595 self.assertFalse(dco.eof) 596 dco.decompress(x[:-5]) 597 self.assertFalse(dco.eof) 598 dco.decompress(x[-5:]) 599 self.assertTrue(dco.eof) 600 dco.flush() 601 self.assertTrue(dco.eof) 602 603 def test_decompress_eof_incomplete_stream(self): 604 x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' # 'foo' 605 dco = zlib.decompressobj() 606 self.assertFalse(dco.eof) 607 dco.decompress(x[:-5]) 608 self.assertFalse(dco.eof) 609 dco.flush() 610 self.assertFalse(dco.eof) 611 612 def test_decompress_unused_data(self): 613 # Repeated calls to decompress() after EOF should accumulate data in 614 # dco.unused_data, instead of just storing the arg to the last call. 615 source = b'abcdefghijklmnopqrstuvwxyz' 616 remainder = b'0123456789' 617 y = zlib.compress(source) 618 x = y + remainder 619 for maxlen in 0, 1000: 620 for step in 1, 2, len(y), len(x): 621 dco = zlib.decompressobj() 622 data = b'' 623 for i in range(0, len(x), step): 624 if i < len(y): 625 self.assertEqual(dco.unused_data, b'') 626 if maxlen == 0: 627 data += dco.decompress(x[i : i + step]) 628 self.assertEqual(dco.unconsumed_tail, b'') 629 else: 630 data += dco.decompress( 631 dco.unconsumed_tail + x[i : i + step], maxlen) 632 data += dco.flush() 633 self.assertTrue(dco.eof) 634 self.assertEqual(data, source) 635 self.assertEqual(dco.unconsumed_tail, b'') 636 self.assertEqual(dco.unused_data, remainder) 637 638 # issue27164 639 def test_decompress_raw_with_dictionary(self): 640 zdict = b'abcdefghijklmnopqrstuvwxyz' 641 co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict) 642 comp = co.compress(zdict) + co.flush() 643 dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict) 644 uncomp = dco.decompress(comp) + dco.flush() 645 self.assertEqual(zdict, uncomp) 646 647 def test_flush_with_freed_input(self): 648 # Issue #16411: decompressor accesses input to last decompress() call 649 # in flush(), even if this object has been freed in the meanwhile. 650 input1 = b'abcdefghijklmnopqrstuvwxyz' 651 input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM' 652 data = zlib.compress(input1) 653 dco = zlib.decompressobj() 654 dco.decompress(data, 1) 655 del data 656 data = zlib.compress(input2) 657 self.assertEqual(dco.flush(), input1[1:]) 658 659 @bigmemtest(size=_4G, memuse=1) 660 def test_flush_large_length(self, size): 661 # Test flush(length) parameter greater than internal limit UINT_MAX 662 input = HAMLET_SCENE * 10 663 data = zlib.compress(input, 1) 664 dco = zlib.decompressobj() 665 dco.decompress(data, 1) 666 self.assertEqual(dco.flush(size), input[1:]) 667 668 def test_flush_custom_length(self): 669 input = HAMLET_SCENE * 10 670 data = zlib.compress(input, 1) 671 dco = zlib.decompressobj() 672 dco.decompress(data, 1) 673 self.assertEqual(dco.flush(CustomInt()), input[1:]) 674 675 @requires_Compress_copy 676 def test_compresscopy(self): 677 # Test copying a compression object 678 data0 = HAMLET_SCENE 679 data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii") 680 for func in lambda c: c.copy(), copy.copy, copy.deepcopy: 681 c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION) 682 bufs0 = [] 683 bufs0.append(c0.compress(data0)) 684 685 c1 = func(c0) 686 bufs1 = bufs0[:] 687 688 bufs0.append(c0.compress(data0)) 689 bufs0.append(c0.flush()) 690 s0 = b''.join(bufs0) 691 692 bufs1.append(c1.compress(data1)) 693 bufs1.append(c1.flush()) 694 s1 = b''.join(bufs1) 695 696 self.assertEqual(zlib.decompress(s0),data0+data0) 697 self.assertEqual(zlib.decompress(s1),data0+data1) 698 699 @requires_Compress_copy 700 def test_badcompresscopy(self): 701 # Test copying a compression object in an inconsistent state 702 c = zlib.compressobj() 703 c.compress(HAMLET_SCENE) 704 c.flush() 705 self.assertRaises(ValueError, c.copy) 706 self.assertRaises(ValueError, copy.copy, c) 707 self.assertRaises(ValueError, copy.deepcopy, c) 708 709 @requires_Decompress_copy 710 def test_decompresscopy(self): 711 # Test copying a decompression object 712 data = HAMLET_SCENE 713 comp = zlib.compress(data) 714 # Test type of return value 715 self.assertIsInstance(comp, bytes) 716 717 for func in lambda c: c.copy(), copy.copy, copy.deepcopy: 718 d0 = zlib.decompressobj() 719 bufs0 = [] 720 bufs0.append(d0.decompress(comp[:32])) 721 722 d1 = func(d0) 723 bufs1 = bufs0[:] 724 725 bufs0.append(d0.decompress(comp[32:])) 726 s0 = b''.join(bufs0) 727 728 bufs1.append(d1.decompress(comp[32:])) 729 s1 = b''.join(bufs1) 730 731 self.assertEqual(s0,s1) 732 self.assertEqual(s0,data) 733 734 @requires_Decompress_copy 735 def test_baddecompresscopy(self): 736 # Test copying a compression object in an inconsistent state 737 data = zlib.compress(HAMLET_SCENE) 738 d = zlib.decompressobj() 739 d.decompress(data) 740 d.flush() 741 self.assertRaises(ValueError, d.copy) 742 self.assertRaises(ValueError, copy.copy, d) 743 self.assertRaises(ValueError, copy.deepcopy, d) 744 745 def test_compresspickle(self): 746 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 747 with self.assertRaises((TypeError, pickle.PicklingError)): 748 pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto) 749 750 def test_decompresspickle(self): 751 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 752 with self.assertRaises((TypeError, pickle.PicklingError)): 753 pickle.dumps(zlib.decompressobj(), proto) 754 755 # Memory use of the following functions takes into account overallocation 756 757 @bigmemtest(size=_1G + 1024 * 1024, memuse=3) 758 def test_big_compress_buffer(self, size): 759 c = zlib.compressobj(1) 760 compress = lambda s: c.compress(s) + c.flush() 761 self.check_big_compress_buffer(size, compress) 762 763 @bigmemtest(size=_1G + 1024 * 1024, memuse=2) 764 def test_big_decompress_buffer(self, size): 765 d = zlib.decompressobj() 766 decompress = lambda s: d.decompress(s) + d.flush() 767 self.check_big_decompress_buffer(size, decompress) 768 769 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 770 @bigmemtest(size=_4G + 100, memuse=4) 771 def test_64bit_compress(self, size): 772 data = b'x' * size 773 co = zlib.compressobj(0) 774 do = zlib.decompressobj() 775 try: 776 comp = co.compress(data) + co.flush() 777 uncomp = do.decompress(comp) + do.flush() 778 self.assertEqual(uncomp, data) 779 finally: 780 comp = uncomp = data = None 781 782 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 783 @bigmemtest(size=_4G + 100, memuse=3) 784 def test_large_unused_data(self, size): 785 data = b'abcdefghijklmnop' 786 unused = b'x' * size 787 comp = zlib.compress(data) + unused 788 do = zlib.decompressobj() 789 try: 790 uncomp = do.decompress(comp) + do.flush() 791 self.assertEqual(unused, do.unused_data) 792 self.assertEqual(uncomp, data) 793 finally: 794 unused = comp = do = None 795 796 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 797 @bigmemtest(size=_4G + 100, memuse=5) 798 def test_large_unconsumed_tail(self, size): 799 data = b'x' * size 800 do = zlib.decompressobj() 801 try: 802 comp = zlib.compress(data, 0) 803 uncomp = do.decompress(comp, 1) + do.flush() 804 self.assertEqual(uncomp, data) 805 self.assertEqual(do.unconsumed_tail, b'') 806 finally: 807 comp = uncomp = data = None 808 809 def test_wbits(self): 810 # wbits=0 only supported since zlib v1.2.3.5 811 # Register "1.2.3" as "1.2.3.0" 812 # or "1.2.0-linux","1.2.0.f","1.2.0.f-linux" 813 v = zlib.ZLIB_RUNTIME_VERSION.split('-', 1)[0].split('.') 814 if len(v) < 4: 815 v.append('0') 816 elif not v[-1].isnumeric(): 817 v[-1] = '0' 818 819 v = tuple(map(int, v)) 820 supports_wbits_0 = v >= (1, 2, 3, 5) 821 822 co = zlib.compressobj(level=1, wbits=15) 823 zlib15 = co.compress(HAMLET_SCENE) + co.flush() 824 self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE) 825 if supports_wbits_0: 826 self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE) 827 self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE) 828 with self.assertRaisesRegex(zlib.error, 'invalid window size'): 829 zlib.decompress(zlib15, 14) 830 dco = zlib.decompressobj(wbits=32 + 15) 831 self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE) 832 dco = zlib.decompressobj(wbits=14) 833 with self.assertRaisesRegex(zlib.error, 'invalid window size'): 834 dco.decompress(zlib15) 835 836 co = zlib.compressobj(level=1, wbits=9) 837 zlib9 = co.compress(HAMLET_SCENE) + co.flush() 838 self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE) 839 self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE) 840 if supports_wbits_0: 841 self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE) 842 self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE) 843 dco = zlib.decompressobj(wbits=32 + 9) 844 self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE) 845 846 co = zlib.compressobj(level=1, wbits=-15) 847 deflate15 = co.compress(HAMLET_SCENE) + co.flush() 848 self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE) 849 dco = zlib.decompressobj(wbits=-15) 850 self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE) 851 852 co = zlib.compressobj(level=1, wbits=-9) 853 deflate9 = co.compress(HAMLET_SCENE) + co.flush() 854 self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE) 855 self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE) 856 dco = zlib.decompressobj(wbits=-9) 857 self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE) 858 859 co = zlib.compressobj(level=1, wbits=16 + 15) 860 gzip = co.compress(HAMLET_SCENE) + co.flush() 861 self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE) 862 self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE) 863 dco = zlib.decompressobj(32 + 15) 864 self.assertEqual(dco.decompress(gzip), HAMLET_SCENE) 865 866 for wbits in (-15, 15, 31): 867 with self.subTest(wbits=wbits): 868 expected = HAMLET_SCENE 869 actual = zlib.decompress( 870 zlib.compress(HAMLET_SCENE, wbits=wbits), wbits=wbits 871 ) 872 self.assertEqual(expected, actual) 873 874def choose_lines(source, number, seed=None, generator=random): 875 """Return a list of number lines randomly chosen from the source""" 876 if seed is not None: 877 generator.seed(seed) 878 sources = source.split('\n') 879 return [generator.choice(sources) for n in range(number)] 880 881 882HAMLET_SCENE = b""" 883LAERTES 884 885 O, fear me not. 886 I stay too long: but here my father comes. 887 888 Enter POLONIUS 889 890 A double blessing is a double grace, 891 Occasion smiles upon a second leave. 892 893LORD POLONIUS 894 895 Yet here, Laertes! aboard, aboard, for shame! 896 The wind sits in the shoulder of your sail, 897 And you are stay'd for. There; my blessing with thee! 898 And these few precepts in thy memory 899 See thou character. Give thy thoughts no tongue, 900 Nor any unproportioned thought his act. 901 Be thou familiar, but by no means vulgar. 902 Those friends thou hast, and their adoption tried, 903 Grapple them to thy soul with hoops of steel; 904 But do not dull thy palm with entertainment 905 Of each new-hatch'd, unfledged comrade. Beware 906 Of entrance to a quarrel, but being in, 907 Bear't that the opposed may beware of thee. 908 Give every man thy ear, but few thy voice; 909 Take each man's censure, but reserve thy judgment. 910 Costly thy habit as thy purse can buy, 911 But not express'd in fancy; rich, not gaudy; 912 For the apparel oft proclaims the man, 913 And they in France of the best rank and station 914 Are of a most select and generous chief in that. 915 Neither a borrower nor a lender be; 916 For loan oft loses both itself and friend, 917 And borrowing dulls the edge of husbandry. 918 This above all: to thine ownself be true, 919 And it must follow, as the night the day, 920 Thou canst not then be false to any man. 921 Farewell: my blessing season this in thee! 922 923LAERTES 924 925 Most humbly do I take my leave, my lord. 926 927LORD POLONIUS 928 929 The time invites you; go; your servants tend. 930 931LAERTES 932 933 Farewell, Ophelia; and remember well 934 What I have said to you. 935 936OPHELIA 937 938 'Tis in my memory lock'd, 939 And you yourself shall keep the key of it. 940 941LAERTES 942 943 Farewell. 944""" 945 946 947class CustomInt: 948 def __index__(self): 949 return 100 950 951 952if __name__ == "__main__": 953 unittest.main() 954