1from test.support import ( 2 requires, _2G, _4G, gc_collect, cpython_only, is_emscripten 3) 4from test.support.import_helper import import_module 5from test.support.os_helper import TESTFN, unlink 6import unittest 7import os 8import re 9import itertools 10import random 11import socket 12import string 13import sys 14import weakref 15 16# Skip test if we can't import mmap. 17mmap = import_module('mmap') 18 19PAGESIZE = mmap.PAGESIZE 20 21tagname_prefix = f'python_{os.getpid()}_test_mmap' 22def random_tagname(length=10): 23 suffix = ''.join(random.choices(string.ascii_uppercase, k=length)) 24 return f'{tagname_prefix}_{suffix}' 25 26# Python's mmap module dup()s the file descriptor. Emscripten's FS layer 27# does not materialize file changes through a dupped fd to a new mmap. 28if is_emscripten: 29 raise unittest.SkipTest("incompatible with Emscripten's mmap emulation.") 30 31 32class MmapTests(unittest.TestCase): 33 34 def setUp(self): 35 if os.path.exists(TESTFN): 36 os.unlink(TESTFN) 37 38 def tearDown(self): 39 try: 40 os.unlink(TESTFN) 41 except OSError: 42 pass 43 44 def test_basic(self): 45 # Test mmap module on Unix systems and Windows 46 47 # Create a file to be mmap'ed. 48 f = open(TESTFN, 'bw+') 49 try: 50 # Write 2 pages worth of data to the file 51 f.write(b'\0'* PAGESIZE) 52 f.write(b'foo') 53 f.write(b'\0'* (PAGESIZE-3) ) 54 f.flush() 55 m = mmap.mmap(f.fileno(), 2 * PAGESIZE) 56 finally: 57 f.close() 58 59 # Simple sanity checks 60 61 tp = str(type(m)) # SF bug 128713: segfaulted on Linux 62 self.assertEqual(m.find(b'foo'), PAGESIZE) 63 64 self.assertEqual(len(m), 2*PAGESIZE) 65 66 self.assertEqual(m[0], 0) 67 self.assertEqual(m[0:3], b'\0\0\0') 68 69 # Shouldn't crash on boundary (Issue #5292) 70 self.assertRaises(IndexError, m.__getitem__, len(m)) 71 self.assertRaises(IndexError, m.__setitem__, len(m), b'\0') 72 73 # Modify the file's content 74 m[0] = b'3'[0] 75 m[PAGESIZE +3: PAGESIZE +3+3] = b'bar' 76 77 # Check that the modification worked 78 self.assertEqual(m[0], b'3'[0]) 79 self.assertEqual(m[0:3], b'3\0\0') 80 self.assertEqual(m[PAGESIZE-1 : PAGESIZE + 7], b'\0foobar\0') 81 82 m.flush() 83 84 # Test doing a regular expression match in an mmap'ed file 85 match = re.search(b'[A-Za-z]+', m) 86 if match is None: 87 self.fail('regex match on mmap failed!') 88 else: 89 start, end = match.span(0) 90 length = end - start 91 92 self.assertEqual(start, PAGESIZE) 93 self.assertEqual(end, PAGESIZE + 6) 94 95 # test seeking around (try to overflow the seek implementation) 96 m.seek(0,0) 97 self.assertEqual(m.tell(), 0) 98 m.seek(42,1) 99 self.assertEqual(m.tell(), 42) 100 m.seek(0,2) 101 self.assertEqual(m.tell(), len(m)) 102 103 # Try to seek to negative position... 104 self.assertRaises(ValueError, m.seek, -1) 105 106 # Try to seek beyond end of mmap... 107 self.assertRaises(ValueError, m.seek, 1, 2) 108 109 # Try to seek to negative position... 110 self.assertRaises(ValueError, m.seek, -len(m)-1, 2) 111 112 # Try resizing map 113 try: 114 m.resize(512) 115 except SystemError: 116 # resize() not supported 117 # No messages are printed, since the output of this test suite 118 # would then be different across platforms. 119 pass 120 else: 121 # resize() is supported 122 self.assertEqual(len(m), 512) 123 # Check that we can no longer seek beyond the new size. 124 self.assertRaises(ValueError, m.seek, 513, 0) 125 126 # Check that the underlying file is truncated too 127 # (bug #728515) 128 f = open(TESTFN, 'rb') 129 try: 130 f.seek(0, 2) 131 self.assertEqual(f.tell(), 512) 132 finally: 133 f.close() 134 self.assertEqual(m.size(), 512) 135 136 m.close() 137 138 def test_access_parameter(self): 139 # Test for "access" keyword parameter 140 mapsize = 10 141 with open(TESTFN, "wb") as fp: 142 fp.write(b"a"*mapsize) 143 with open(TESTFN, "rb") as f: 144 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ) 145 self.assertEqual(m[:], b'a'*mapsize, "Readonly memory map data incorrect.") 146 147 # Ensuring that readonly mmap can't be slice assigned 148 try: 149 m[:] = b'b'*mapsize 150 except TypeError: 151 pass 152 else: 153 self.fail("Able to write to readonly memory map") 154 155 # Ensuring that readonly mmap can't be item assigned 156 try: 157 m[0] = b'b' 158 except TypeError: 159 pass 160 else: 161 self.fail("Able to write to readonly memory map") 162 163 # Ensuring that readonly mmap can't be write() to 164 try: 165 m.seek(0,0) 166 m.write(b'abc') 167 except TypeError: 168 pass 169 else: 170 self.fail("Able to write to readonly memory map") 171 172 # Ensuring that readonly mmap can't be write_byte() to 173 try: 174 m.seek(0,0) 175 m.write_byte(b'd') 176 except TypeError: 177 pass 178 else: 179 self.fail("Able to write to readonly memory map") 180 181 # Ensuring that readonly mmap can't be resized 182 try: 183 m.resize(2*mapsize) 184 except SystemError: # resize is not universally supported 185 pass 186 except TypeError: 187 pass 188 else: 189 self.fail("Able to resize readonly memory map") 190 with open(TESTFN, "rb") as fp: 191 self.assertEqual(fp.read(), b'a'*mapsize, 192 "Readonly memory map data file was modified") 193 194 # Opening mmap with size too big 195 with open(TESTFN, "r+b") as f: 196 try: 197 m = mmap.mmap(f.fileno(), mapsize+1) 198 except ValueError: 199 # we do not expect a ValueError on Windows 200 # CAUTION: This also changes the size of the file on disk, and 201 # later tests assume that the length hasn't changed. We need to 202 # repair that. 203 if sys.platform.startswith('win'): 204 self.fail("Opening mmap with size+1 should work on Windows.") 205 else: 206 # we expect a ValueError on Unix, but not on Windows 207 if not sys.platform.startswith('win'): 208 self.fail("Opening mmap with size+1 should raise ValueError.") 209 m.close() 210 if sys.platform.startswith('win'): 211 # Repair damage from the resizing test. 212 with open(TESTFN, 'r+b') as f: 213 f.truncate(mapsize) 214 215 # Opening mmap with access=ACCESS_WRITE 216 with open(TESTFN, "r+b") as f: 217 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE) 218 # Modifying write-through memory map 219 m[:] = b'c'*mapsize 220 self.assertEqual(m[:], b'c'*mapsize, 221 "Write-through memory map memory not updated properly.") 222 m.flush() 223 m.close() 224 with open(TESTFN, 'rb') as f: 225 stuff = f.read() 226 self.assertEqual(stuff, b'c'*mapsize, 227 "Write-through memory map data file not updated properly.") 228 229 # Opening mmap with access=ACCESS_COPY 230 with open(TESTFN, "r+b") as f: 231 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY) 232 # Modifying copy-on-write memory map 233 m[:] = b'd'*mapsize 234 self.assertEqual(m[:], b'd' * mapsize, 235 "Copy-on-write memory map data not written correctly.") 236 m.flush() 237 with open(TESTFN, "rb") as fp: 238 self.assertEqual(fp.read(), b'c'*mapsize, 239 "Copy-on-write test data file should not be modified.") 240 # Ensuring copy-on-write maps cannot be resized 241 self.assertRaises(TypeError, m.resize, 2*mapsize) 242 m.close() 243 244 # Ensuring invalid access parameter raises exception 245 with open(TESTFN, "r+b") as f: 246 self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4) 247 248 if os.name == "posix": 249 # Try incompatible flags, prot and access parameters. 250 with open(TESTFN, "r+b") as f: 251 self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, 252 flags=mmap.MAP_PRIVATE, 253 prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE) 254 255 # Try writing with PROT_EXEC and without PROT_WRITE 256 prot = mmap.PROT_READ | getattr(mmap, 'PROT_EXEC', 0) 257 with open(TESTFN, "r+b") as f: 258 m = mmap.mmap(f.fileno(), mapsize, prot=prot) 259 self.assertRaises(TypeError, m.write, b"abcdef") 260 self.assertRaises(TypeError, m.write_byte, 0) 261 m.close() 262 263 def test_bad_file_desc(self): 264 # Try opening a bad file descriptor... 265 self.assertRaises(OSError, mmap.mmap, -2, 4096) 266 267 def test_tougher_find(self): 268 # Do a tougher .find() test. SF bug 515943 pointed out that, in 2.2, 269 # searching for data with embedded \0 bytes didn't work. 270 with open(TESTFN, 'wb+') as f: 271 272 data = b'aabaac\x00deef\x00\x00aa\x00' 273 n = len(data) 274 f.write(data) 275 f.flush() 276 m = mmap.mmap(f.fileno(), n) 277 278 for start in range(n+1): 279 for finish in range(start, n+1): 280 slice = data[start : finish] 281 self.assertEqual(m.find(slice), data.find(slice)) 282 self.assertEqual(m.find(slice + b'x'), -1) 283 m.close() 284 285 def test_find_end(self): 286 # test the new 'end' parameter works as expected 287 with open(TESTFN, 'wb+') as f: 288 data = b'one two ones' 289 n = len(data) 290 f.write(data) 291 f.flush() 292 m = mmap.mmap(f.fileno(), n) 293 294 self.assertEqual(m.find(b'one'), 0) 295 self.assertEqual(m.find(b'ones'), 8) 296 self.assertEqual(m.find(b'one', 0, -1), 0) 297 self.assertEqual(m.find(b'one', 1), 8) 298 self.assertEqual(m.find(b'one', 1, -1), 8) 299 self.assertEqual(m.find(b'one', 1, -2), -1) 300 self.assertEqual(m.find(bytearray(b'one')), 0) 301 302 303 def test_rfind(self): 304 # test the new 'end' parameter works as expected 305 with open(TESTFN, 'wb+') as f: 306 data = b'one two ones' 307 n = len(data) 308 f.write(data) 309 f.flush() 310 m = mmap.mmap(f.fileno(), n) 311 312 self.assertEqual(m.rfind(b'one'), 8) 313 self.assertEqual(m.rfind(b'one '), 0) 314 self.assertEqual(m.rfind(b'one', 0, -1), 8) 315 self.assertEqual(m.rfind(b'one', 0, -2), 0) 316 self.assertEqual(m.rfind(b'one', 1, -1), 8) 317 self.assertEqual(m.rfind(b'one', 1, -2), -1) 318 self.assertEqual(m.rfind(bytearray(b'one')), 8) 319 320 321 def test_double_close(self): 322 # make sure a double close doesn't crash on Solaris (Bug# 665913) 323 with open(TESTFN, 'wb+') as f: 324 f.write(2**16 * b'a') # Arbitrary character 325 326 with open(TESTFN, 'rb') as f: 327 mf = mmap.mmap(f.fileno(), 2**16, access=mmap.ACCESS_READ) 328 mf.close() 329 mf.close() 330 331 def test_entire_file(self): 332 # test mapping of entire file by passing 0 for map length 333 with open(TESTFN, "wb+") as f: 334 f.write(2**16 * b'm') # Arbitrary character 335 336 with open(TESTFN, "rb+") as f, \ 337 mmap.mmap(f.fileno(), 0) as mf: 338 self.assertEqual(len(mf), 2**16, "Map size should equal file size.") 339 self.assertEqual(mf.read(2**16), 2**16 * b"m") 340 341 def test_length_0_offset(self): 342 # Issue #10916: test mapping of remainder of file by passing 0 for 343 # map length with an offset doesn't cause a segfault. 344 # NOTE: allocation granularity is currently 65536 under Win64, 345 # and therefore the minimum offset alignment. 346 with open(TESTFN, "wb") as f: 347 f.write((65536 * 2) * b'm') # Arbitrary character 348 349 with open(TESTFN, "rb") as f: 350 with mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ) as mf: 351 self.assertRaises(IndexError, mf.__getitem__, 80000) 352 353 def test_length_0_large_offset(self): 354 # Issue #10959: test mapping of a file by passing 0 for 355 # map length with a large offset doesn't cause a segfault. 356 with open(TESTFN, "wb") as f: 357 f.write(115699 * b'm') # Arbitrary character 358 359 with open(TESTFN, "w+b") as f: 360 self.assertRaises(ValueError, mmap.mmap, f.fileno(), 0, 361 offset=2147418112) 362 363 def test_move(self): 364 # make move works everywhere (64-bit format problem earlier) 365 with open(TESTFN, 'wb+') as f: 366 367 f.write(b"ABCDEabcde") # Arbitrary character 368 f.flush() 369 370 mf = mmap.mmap(f.fileno(), 10) 371 mf.move(5, 0, 5) 372 self.assertEqual(mf[:], b"ABCDEABCDE", "Map move should have duplicated front 5") 373 mf.close() 374 375 # more excessive test 376 data = b"0123456789" 377 for dest in range(len(data)): 378 for src in range(len(data)): 379 for count in range(len(data) - max(dest, src)): 380 expected = data[:dest] + data[src:src+count] + data[dest+count:] 381 m = mmap.mmap(-1, len(data)) 382 m[:] = data 383 m.move(dest, src, count) 384 self.assertEqual(m[:], expected) 385 m.close() 386 387 # segfault test (Issue 5387) 388 m = mmap.mmap(-1, 100) 389 offsets = [-100, -1, 0, 1, 100] 390 for source, dest, size in itertools.product(offsets, offsets, offsets): 391 try: 392 m.move(source, dest, size) 393 except ValueError: 394 pass 395 396 offsets = [(-1, -1, -1), (-1, -1, 0), (-1, 0, -1), (0, -1, -1), 397 (-1, 0, 0), (0, -1, 0), (0, 0, -1)] 398 for source, dest, size in offsets: 399 self.assertRaises(ValueError, m.move, source, dest, size) 400 401 m.close() 402 403 m = mmap.mmap(-1, 1) # single byte 404 self.assertRaises(ValueError, m.move, 0, 0, 2) 405 self.assertRaises(ValueError, m.move, 1, 0, 1) 406 self.assertRaises(ValueError, m.move, 0, 1, 1) 407 m.move(0, 0, 1) 408 m.move(0, 0, 0) 409 410 def test_anonymous(self): 411 # anonymous mmap.mmap(-1, PAGE) 412 m = mmap.mmap(-1, PAGESIZE) 413 for x in range(PAGESIZE): 414 self.assertEqual(m[x], 0, 415 "anonymously mmap'ed contents should be zero") 416 417 for x in range(PAGESIZE): 418 b = x & 0xff 419 m[x] = b 420 self.assertEqual(m[x], b) 421 422 def test_read_all(self): 423 m = mmap.mmap(-1, 16) 424 self.addCleanup(m.close) 425 426 # With no parameters, or None or a negative argument, reads all 427 m.write(bytes(range(16))) 428 m.seek(0) 429 self.assertEqual(m.read(), bytes(range(16))) 430 m.seek(8) 431 self.assertEqual(m.read(), bytes(range(8, 16))) 432 m.seek(16) 433 self.assertEqual(m.read(), b'') 434 m.seek(3) 435 self.assertEqual(m.read(None), bytes(range(3, 16))) 436 m.seek(4) 437 self.assertEqual(m.read(-1), bytes(range(4, 16))) 438 m.seek(5) 439 self.assertEqual(m.read(-2), bytes(range(5, 16))) 440 m.seek(9) 441 self.assertEqual(m.read(-42), bytes(range(9, 16))) 442 443 def test_read_invalid_arg(self): 444 m = mmap.mmap(-1, 16) 445 self.addCleanup(m.close) 446 447 self.assertRaises(TypeError, m.read, 'foo') 448 self.assertRaises(TypeError, m.read, 5.5) 449 self.assertRaises(TypeError, m.read, [1, 2, 3]) 450 451 def test_extended_getslice(self): 452 # Test extended slicing by comparing with list slicing. 453 s = bytes(reversed(range(256))) 454 m = mmap.mmap(-1, len(s)) 455 m[:] = s 456 self.assertEqual(m[:], s) 457 indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300) 458 for start in indices: 459 for stop in indices: 460 # Skip step 0 (invalid) 461 for step in indices[1:]: 462 self.assertEqual(m[start:stop:step], 463 s[start:stop:step]) 464 465 def test_extended_set_del_slice(self): 466 # Test extended slicing by comparing with list slicing. 467 s = bytes(reversed(range(256))) 468 m = mmap.mmap(-1, len(s)) 469 indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300) 470 for start in indices: 471 for stop in indices: 472 # Skip invalid step 0 473 for step in indices[1:]: 474 m[:] = s 475 self.assertEqual(m[:], s) 476 L = list(s) 477 # Make sure we have a slice of exactly the right length, 478 # but with different data. 479 data = L[start:stop:step] 480 data = bytes(reversed(data)) 481 L[start:stop:step] = data 482 m[start:stop:step] = data 483 self.assertEqual(m[:], bytes(L)) 484 485 def make_mmap_file (self, f, halfsize): 486 # Write 2 pages worth of data to the file 487 f.write (b'\0' * halfsize) 488 f.write (b'foo') 489 f.write (b'\0' * (halfsize - 3)) 490 f.flush () 491 return mmap.mmap (f.fileno(), 0) 492 493 def test_empty_file (self): 494 f = open (TESTFN, 'w+b') 495 f.close() 496 with open(TESTFN, "rb") as f : 497 self.assertRaisesRegex(ValueError, 498 "cannot mmap an empty file", 499 mmap.mmap, f.fileno(), 0, 500 access=mmap.ACCESS_READ) 501 502 def test_offset (self): 503 f = open (TESTFN, 'w+b') 504 505 try: # unlink TESTFN no matter what 506 halfsize = mmap.ALLOCATIONGRANULARITY 507 m = self.make_mmap_file (f, halfsize) 508 m.close () 509 f.close () 510 511 mapsize = halfsize * 2 512 # Try invalid offset 513 f = open(TESTFN, "r+b") 514 for offset in [-2, -1, None]: 515 try: 516 m = mmap.mmap(f.fileno(), mapsize, offset=offset) 517 self.assertEqual(0, 1) 518 except (ValueError, TypeError, OverflowError): 519 pass 520 else: 521 self.assertEqual(0, 0) 522 f.close() 523 524 # Try valid offset, hopefully 8192 works on all OSes 525 f = open(TESTFN, "r+b") 526 m = mmap.mmap(f.fileno(), mapsize - halfsize, offset=halfsize) 527 self.assertEqual(m[0:3], b'foo') 528 f.close() 529 530 # Try resizing map 531 try: 532 m.resize(512) 533 except SystemError: 534 pass 535 else: 536 # resize() is supported 537 self.assertEqual(len(m), 512) 538 # Check that we can no longer seek beyond the new size. 539 self.assertRaises(ValueError, m.seek, 513, 0) 540 # Check that the content is not changed 541 self.assertEqual(m[0:3], b'foo') 542 543 # Check that the underlying file is truncated too 544 f = open(TESTFN, 'rb') 545 f.seek(0, 2) 546 self.assertEqual(f.tell(), halfsize + 512) 547 f.close() 548 self.assertEqual(m.size(), halfsize + 512) 549 550 m.close() 551 552 finally: 553 f.close() 554 try: 555 os.unlink(TESTFN) 556 except OSError: 557 pass 558 559 def test_subclass(self): 560 class anon_mmap(mmap.mmap): 561 def __new__(klass, *args, **kwargs): 562 return mmap.mmap.__new__(klass, -1, *args, **kwargs) 563 anon_mmap(PAGESIZE) 564 565 @unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ") 566 def test_prot_readonly(self): 567 mapsize = 10 568 with open(TESTFN, "wb") as fp: 569 fp.write(b"a"*mapsize) 570 with open(TESTFN, "rb") as f: 571 m = mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ) 572 self.assertRaises(TypeError, m.write, "foo") 573 574 def test_error(self): 575 self.assertIs(mmap.error, OSError) 576 577 def test_io_methods(self): 578 data = b"0123456789" 579 with open(TESTFN, "wb") as fp: 580 fp.write(b"x"*len(data)) 581 with open(TESTFN, "r+b") as f: 582 m = mmap.mmap(f.fileno(), len(data)) 583 # Test write_byte() 584 for i in range(len(data)): 585 self.assertEqual(m.tell(), i) 586 m.write_byte(data[i]) 587 self.assertEqual(m.tell(), i+1) 588 self.assertRaises(ValueError, m.write_byte, b"x"[0]) 589 self.assertEqual(m[:], data) 590 # Test read_byte() 591 m.seek(0) 592 for i in range(len(data)): 593 self.assertEqual(m.tell(), i) 594 self.assertEqual(m.read_byte(), data[i]) 595 self.assertEqual(m.tell(), i+1) 596 self.assertRaises(ValueError, m.read_byte) 597 # Test read() 598 m.seek(3) 599 self.assertEqual(m.read(3), b"345") 600 self.assertEqual(m.tell(), 6) 601 # Test write() 602 m.seek(3) 603 m.write(b"bar") 604 self.assertEqual(m.tell(), 6) 605 self.assertEqual(m[:], b"012bar6789") 606 m.write(bytearray(b"baz")) 607 self.assertEqual(m.tell(), 9) 608 self.assertEqual(m[:], b"012barbaz9") 609 self.assertRaises(ValueError, m.write, b"ba") 610 611 def test_non_ascii_byte(self): 612 for b in (129, 200, 255): # > 128 613 m = mmap.mmap(-1, 1) 614 m.write_byte(b) 615 self.assertEqual(m[0], b) 616 m.seek(0) 617 self.assertEqual(m.read_byte(), b) 618 m.close() 619 620 @unittest.skipUnless(os.name == 'nt', 'requires Windows') 621 def test_tagname(self): 622 data1 = b"0123456789" 623 data2 = b"abcdefghij" 624 assert len(data1) == len(data2) 625 tagname1 = random_tagname() 626 tagname2 = random_tagname() 627 628 # Test same tag 629 m1 = mmap.mmap(-1, len(data1), tagname=tagname1) 630 m1[:] = data1 631 m2 = mmap.mmap(-1, len(data2), tagname=tagname1) 632 m2[:] = data2 633 self.assertEqual(m1[:], data2) 634 self.assertEqual(m2[:], data2) 635 m2.close() 636 m1.close() 637 638 # Test different tag 639 m1 = mmap.mmap(-1, len(data1), tagname=tagname1) 640 m1[:] = data1 641 m2 = mmap.mmap(-1, len(data2), tagname=tagname2) 642 m2[:] = data2 643 self.assertEqual(m1[:], data1) 644 self.assertEqual(m2[:], data2) 645 m2.close() 646 m1.close() 647 648 @cpython_only 649 @unittest.skipUnless(os.name == 'nt', 'requires Windows') 650 def test_sizeof(self): 651 m1 = mmap.mmap(-1, 100) 652 tagname = random_tagname() 653 m2 = mmap.mmap(-1, 100, tagname=tagname) 654 self.assertEqual(sys.getsizeof(m2), 655 sys.getsizeof(m1) + len(tagname) + 1) 656 657 @unittest.skipUnless(os.name == 'nt', 'requires Windows') 658 def test_crasher_on_windows(self): 659 # Should not crash (Issue 1733986) 660 tagname = random_tagname() 661 m = mmap.mmap(-1, 1000, tagname=tagname) 662 try: 663 mmap.mmap(-1, 5000, tagname=tagname)[:] # same tagname, but larger size 664 except: 665 pass 666 m.close() 667 668 # Should not crash (Issue 5385) 669 with open(TESTFN, "wb") as fp: 670 fp.write(b"x"*10) 671 f = open(TESTFN, "r+b") 672 m = mmap.mmap(f.fileno(), 0) 673 f.close() 674 try: 675 m.resize(0) # will raise OSError 676 except: 677 pass 678 try: 679 m[:] 680 except: 681 pass 682 m.close() 683 684 @unittest.skipUnless(os.name == 'nt', 'requires Windows') 685 def test_invalid_descriptor(self): 686 # socket file descriptors are valid, but out of range 687 # for _get_osfhandle, causing a crash when validating the 688 # parameters to _get_osfhandle. 689 s = socket.socket() 690 try: 691 with self.assertRaises(OSError): 692 m = mmap.mmap(s.fileno(), 10) 693 finally: 694 s.close() 695 696 def test_context_manager(self): 697 with mmap.mmap(-1, 10) as m: 698 self.assertFalse(m.closed) 699 self.assertTrue(m.closed) 700 701 def test_context_manager_exception(self): 702 # Test that the OSError gets passed through 703 with self.assertRaises(Exception) as exc: 704 with mmap.mmap(-1, 10) as m: 705 raise OSError 706 self.assertIsInstance(exc.exception, OSError, 707 "wrong exception raised in context manager") 708 self.assertTrue(m.closed, "context manager failed") 709 710 def test_weakref(self): 711 # Check mmap objects are weakrefable 712 mm = mmap.mmap(-1, 16) 713 wr = weakref.ref(mm) 714 self.assertIs(wr(), mm) 715 del mm 716 gc_collect() 717 self.assertIs(wr(), None) 718 719 def test_write_returning_the_number_of_bytes_written(self): 720 mm = mmap.mmap(-1, 16) 721 self.assertEqual(mm.write(b""), 0) 722 self.assertEqual(mm.write(b"x"), 1) 723 self.assertEqual(mm.write(b"yz"), 2) 724 self.assertEqual(mm.write(b"python"), 6) 725 726 def test_resize_past_pos(self): 727 m = mmap.mmap(-1, 8192) 728 self.addCleanup(m.close) 729 m.read(5000) 730 try: 731 m.resize(4096) 732 except SystemError: 733 self.skipTest("resizing not supported") 734 self.assertEqual(m.read(14), b'') 735 self.assertRaises(ValueError, m.read_byte) 736 self.assertRaises(ValueError, m.write_byte, 42) 737 self.assertRaises(ValueError, m.write, b'abc') 738 739 def test_concat_repeat_exception(self): 740 m = mmap.mmap(-1, 16) 741 with self.assertRaises(TypeError): 742 m + m 743 with self.assertRaises(TypeError): 744 m * 2 745 746 def test_flush_return_value(self): 747 # mm.flush() should return None on success, raise an 748 # exception on error under all platforms. 749 mm = mmap.mmap(-1, 16) 750 self.addCleanup(mm.close) 751 mm.write(b'python') 752 result = mm.flush() 753 self.assertIsNone(result) 754 if sys.platform.startswith('linux'): 755 # 'offset' must be a multiple of mmap.PAGESIZE on Linux. 756 # See bpo-34754 for details. 757 self.assertRaises(OSError, mm.flush, 1, len(b'python')) 758 759 def test_repr(self): 760 open_mmap_repr_pat = re.compile( 761 r"<mmap.mmap closed=False, " 762 r"access=(?P<access>\S+), " 763 r"length=(?P<length>\d+), " 764 r"pos=(?P<pos>\d+), " 765 r"offset=(?P<offset>\d+)>") 766 closed_mmap_repr_pat = re.compile(r"<mmap.mmap closed=True>") 767 mapsizes = (50, 100, 1_000, 1_000_000, 10_000_000) 768 offsets = tuple((mapsize // 2 // mmap.ALLOCATIONGRANULARITY) 769 * mmap.ALLOCATIONGRANULARITY for mapsize in mapsizes) 770 for offset, mapsize in zip(offsets, mapsizes): 771 data = b'a' * mapsize 772 length = mapsize - offset 773 accesses = ('ACCESS_DEFAULT', 'ACCESS_READ', 774 'ACCESS_COPY', 'ACCESS_WRITE') 775 positions = (0, length//10, length//5, length//4) 776 with open(TESTFN, "wb+") as fp: 777 fp.write(data) 778 fp.flush() 779 for access, pos in itertools.product(accesses, positions): 780 accint = getattr(mmap, access) 781 with mmap.mmap(fp.fileno(), 782 length, 783 access=accint, 784 offset=offset) as mm: 785 mm.seek(pos) 786 match = open_mmap_repr_pat.match(repr(mm)) 787 self.assertIsNotNone(match) 788 self.assertEqual(match.group('access'), access) 789 self.assertEqual(match.group('length'), str(length)) 790 self.assertEqual(match.group('pos'), str(pos)) 791 self.assertEqual(match.group('offset'), str(offset)) 792 match = closed_mmap_repr_pat.match(repr(mm)) 793 self.assertIsNotNone(match) 794 795 @unittest.skipUnless(hasattr(mmap.mmap, 'madvise'), 'needs madvise') 796 def test_madvise(self): 797 size = 2 * PAGESIZE 798 m = mmap.mmap(-1, size) 799 800 with self.assertRaisesRegex(ValueError, "madvise start out of bounds"): 801 m.madvise(mmap.MADV_NORMAL, size) 802 with self.assertRaisesRegex(ValueError, "madvise start out of bounds"): 803 m.madvise(mmap.MADV_NORMAL, -1) 804 with self.assertRaisesRegex(ValueError, "madvise length invalid"): 805 m.madvise(mmap.MADV_NORMAL, 0, -1) 806 with self.assertRaisesRegex(OverflowError, "madvise length too large"): 807 m.madvise(mmap.MADV_NORMAL, PAGESIZE, sys.maxsize) 808 self.assertEqual(m.madvise(mmap.MADV_NORMAL), None) 809 self.assertEqual(m.madvise(mmap.MADV_NORMAL, PAGESIZE), None) 810 self.assertEqual(m.madvise(mmap.MADV_NORMAL, PAGESIZE, size), None) 811 self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, 2), None) 812 self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, size), None) 813 814 @unittest.skipUnless(os.name == 'nt', 'requires Windows') 815 def test_resize_up_when_mapped_to_pagefile(self): 816 """If the mmap is backed by the pagefile ensure a resize up can happen 817 and that the original data is still in place 818 """ 819 start_size = PAGESIZE 820 new_size = 2 * start_size 821 data = bytes(random.getrandbits(8) for _ in range(start_size)) 822 823 m = mmap.mmap(-1, start_size) 824 m[:] = data 825 m.resize(new_size) 826 self.assertEqual(len(m), new_size) 827 self.assertEqual(m[:start_size], data[:start_size]) 828 829 @unittest.skipUnless(os.name == 'nt', 'requires Windows') 830 def test_resize_down_when_mapped_to_pagefile(self): 831 """If the mmap is backed by the pagefile ensure a resize down up can happen 832 and that a truncated form of the original data is still in place 833 """ 834 start_size = PAGESIZE 835 new_size = start_size // 2 836 data = bytes(random.getrandbits(8) for _ in range(start_size)) 837 838 m = mmap.mmap(-1, start_size) 839 m[:] = data 840 m.resize(new_size) 841 self.assertEqual(len(m), new_size) 842 self.assertEqual(m[:new_size], data[:new_size]) 843 844 @unittest.skipUnless(os.name == 'nt', 'requires Windows') 845 def test_resize_fails_if_mapping_held_elsewhere(self): 846 """If more than one mapping is held against a named file on Windows, neither 847 mapping can be resized 848 """ 849 start_size = 2 * PAGESIZE 850 reduced_size = PAGESIZE 851 852 f = open(TESTFN, 'wb+') 853 f.truncate(start_size) 854 try: 855 m1 = mmap.mmap(f.fileno(), start_size) 856 m2 = mmap.mmap(f.fileno(), start_size) 857 with self.assertRaises(OSError): 858 m1.resize(reduced_size) 859 with self.assertRaises(OSError): 860 m2.resize(reduced_size) 861 m2.close() 862 m1.resize(reduced_size) 863 self.assertEqual(m1.size(), reduced_size) 864 self.assertEqual(os.stat(f.fileno()).st_size, reduced_size) 865 finally: 866 f.close() 867 868 @unittest.skipUnless(os.name == 'nt', 'requires Windows') 869 def test_resize_succeeds_with_error_for_second_named_mapping(self): 870 """If a more than one mapping exists of the same name, none of them can 871 be resized: they'll raise an Exception and leave the original mapping intact 872 """ 873 start_size = 2 * PAGESIZE 874 reduced_size = PAGESIZE 875 tagname = random_tagname() 876 data_length = 8 877 data = bytes(random.getrandbits(8) for _ in range(data_length)) 878 879 m1 = mmap.mmap(-1, start_size, tagname=tagname) 880 m2 = mmap.mmap(-1, start_size, tagname=tagname) 881 m1[:data_length] = data 882 self.assertEqual(m2[:data_length], data) 883 with self.assertRaises(OSError): 884 m1.resize(reduced_size) 885 self.assertEqual(m1.size(), start_size) 886 self.assertEqual(m1[:data_length], data) 887 self.assertEqual(m2[:data_length], data) 888 889 def test_mmap_closed_by_int_scenarios(self): 890 """ 891 gh-103987: Test that mmap objects raise ValueError 892 for closed mmap files 893 """ 894 895 class MmapClosedByIntContext: 896 def __init__(self, access) -> None: 897 self.access = access 898 899 def __enter__(self): 900 self.f = open(TESTFN, "w+b") 901 self.f.write(random.randbytes(100)) 902 self.f.flush() 903 904 m = mmap.mmap(self.f.fileno(), 100, access=self.access) 905 906 class X: 907 def __index__(self): 908 m.close() 909 return 10 910 911 return (m, X) 912 913 def __exit__(self, exc_type, exc_value, traceback): 914 self.f.close() 915 916 read_access_modes = [ 917 mmap.ACCESS_READ, 918 mmap.ACCESS_WRITE, 919 mmap.ACCESS_COPY, 920 mmap.ACCESS_DEFAULT, 921 ] 922 923 write_access_modes = [ 924 mmap.ACCESS_WRITE, 925 mmap.ACCESS_COPY, 926 mmap.ACCESS_DEFAULT, 927 ] 928 929 for access in read_access_modes: 930 with MmapClosedByIntContext(access) as (m, X): 931 with self.assertRaisesRegex(ValueError, "mmap closed or invalid"): 932 m[X()] 933 934 with MmapClosedByIntContext(access) as (m, X): 935 with self.assertRaisesRegex(ValueError, "mmap closed or invalid"): 936 m[X() : 20] 937 938 with MmapClosedByIntContext(access) as (m, X): 939 with self.assertRaisesRegex(ValueError, "mmap closed or invalid"): 940 m[X() : 20 : 2] 941 942 with MmapClosedByIntContext(access) as (m, X): 943 with self.assertRaisesRegex(ValueError, "mmap closed or invalid"): 944 m[20 : X() : -2] 945 946 with MmapClosedByIntContext(access) as (m, X): 947 with self.assertRaisesRegex(ValueError, "mmap closed or invalid"): 948 m.read(X()) 949 950 with MmapClosedByIntContext(access) as (m, X): 951 with self.assertRaisesRegex(ValueError, "mmap closed or invalid"): 952 m.find(b"1", 1, X()) 953 954 for access in write_access_modes: 955 with MmapClosedByIntContext(access) as (m, X): 956 with self.assertRaisesRegex(ValueError, "mmap closed or invalid"): 957 m[X() : 20] = b"1" * 10 958 959 with MmapClosedByIntContext(access) as (m, X): 960 with self.assertRaisesRegex(ValueError, "mmap closed or invalid"): 961 m[X() : 20 : 2] = b"1" * 5 962 963 with MmapClosedByIntContext(access) as (m, X): 964 with self.assertRaisesRegex(ValueError, "mmap closed or invalid"): 965 m[20 : X() : -2] = b"1" * 5 966 967 with MmapClosedByIntContext(access) as (m, X): 968 with self.assertRaisesRegex(ValueError, "mmap closed or invalid"): 969 m.move(1, 2, X()) 970 971 with MmapClosedByIntContext(access) as (m, X): 972 with self.assertRaisesRegex(ValueError, "mmap closed or invalid"): 973 m.write_byte(X()) 974 975class LargeMmapTests(unittest.TestCase): 976 977 def setUp(self): 978 unlink(TESTFN) 979 980 def tearDown(self): 981 unlink(TESTFN) 982 983 def _make_test_file(self, num_zeroes, tail): 984 if sys.platform[:3] == 'win' or sys.platform == 'darwin': 985 requires('largefile', 986 'test requires %s bytes and a long time to run' % str(0x180000000)) 987 f = open(TESTFN, 'w+b') 988 try: 989 f.seek(num_zeroes) 990 f.write(tail) 991 f.flush() 992 except (OSError, OverflowError, ValueError): 993 try: 994 f.close() 995 except (OSError, OverflowError): 996 pass 997 raise unittest.SkipTest("filesystem does not have largefile support") 998 return f 999 1000 def test_large_offset(self): 1001 with self._make_test_file(0x14FFFFFFF, b" ") as f: 1002 with mmap.mmap(f.fileno(), 0, offset=0x140000000, access=mmap.ACCESS_READ) as m: 1003 self.assertEqual(m[0xFFFFFFF], 32) 1004 1005 def test_large_filesize(self): 1006 with self._make_test_file(0x17FFFFFFF, b" ") as f: 1007 if sys.maxsize < 0x180000000: 1008 # On 32 bit platforms the file is larger than sys.maxsize so 1009 # mapping the whole file should fail -- Issue #16743 1010 with self.assertRaises(OverflowError): 1011 mmap.mmap(f.fileno(), 0x180000000, access=mmap.ACCESS_READ) 1012 with self.assertRaises(ValueError): 1013 mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) 1014 with mmap.mmap(f.fileno(), 0x10000, access=mmap.ACCESS_READ) as m: 1015 self.assertEqual(m.size(), 0x180000000) 1016 1017 # Issue 11277: mmap() with large (~4 GiB) sparse files crashes on OS X. 1018 1019 def _test_around_boundary(self, boundary): 1020 tail = b' DEARdear ' 1021 start = boundary - len(tail) // 2 1022 end = start + len(tail) 1023 with self._make_test_file(start, tail) as f: 1024 with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m: 1025 self.assertEqual(m[start:end], tail) 1026 1027 @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems") 1028 def test_around_2GB(self): 1029 self._test_around_boundary(_2G) 1030 1031 @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems") 1032 def test_around_4GB(self): 1033 self._test_around_boundary(_4G) 1034 1035 1036if __name__ == '__main__': 1037 unittest.main() 1038