1from test.support import (
2    requires, _2G, _4G, gc_collect, cpython_only, is_emscripten
3)
4from test.support.import_helper import import_module
5from test.support.os_helper import TESTFN, unlink
6import unittest
7import os
8import re
9import itertools
10import random
11import socket
12import string
13import sys
14import weakref
15
# Skip test if we can't import mmap.
mmap = import_module('mmap')

# Module-level shortcut for the platform page size; several tests below
# size their files and mappings in units of it.
PAGESIZE = mmap.PAGESIZE
20
tagname_prefix = f'python_{os.getpid()}_test_mmap'
def random_tagname(length=10):
    # Return a tag name made of the shared prefix (which carries this
    # process's pid), an underscore, and `length` random uppercase ASCII
    # letters.
    letters = random.choices(string.ascii_uppercase, k=length)
    return '_'.join((tagname_prefix, ''.join(letters)))
25
# Python's mmap module dup()s the file descriptor. Emscripten's FS layer
# does not materialize file changes through a dupped fd to a new mmap.
# Raising SkipTest at module level skips the whole module on that platform.
if is_emscripten:
    raise unittest.SkipTest("incompatible with Emscripten's mmap emulation.")
30
31
32class MmapTests(unittest.TestCase):
33
34    def setUp(self):
35        if os.path.exists(TESTFN):
36            os.unlink(TESTFN)
37
38    def tearDown(self):
39        try:
40            os.unlink(TESTFN)
41        except OSError:
42            pass
43
44    def test_basic(self):
45        # Test mmap module on Unix systems and Windows
46
47        # Create a file to be mmap'ed.
48        f = open(TESTFN, 'bw+')
49        try:
50            # Write 2 pages worth of data to the file
51            f.write(b'\0'* PAGESIZE)
52            f.write(b'foo')
53            f.write(b'\0'* (PAGESIZE-3) )
54            f.flush()
55            m = mmap.mmap(f.fileno(), 2 * PAGESIZE)
56        finally:
57            f.close()
58
59        # Simple sanity checks
60
61        tp = str(type(m))  # SF bug 128713:  segfaulted on Linux
62        self.assertEqual(m.find(b'foo'), PAGESIZE)
63
64        self.assertEqual(len(m), 2*PAGESIZE)
65
66        self.assertEqual(m[0], 0)
67        self.assertEqual(m[0:3], b'\0\0\0')
68
69        # Shouldn't crash on boundary (Issue #5292)
70        self.assertRaises(IndexError, m.__getitem__, len(m))
71        self.assertRaises(IndexError, m.__setitem__, len(m), b'\0')
72
73        # Modify the file's content
74        m[0] = b'3'[0]
75        m[PAGESIZE +3: PAGESIZE +3+3] = b'bar'
76
77        # Check that the modification worked
78        self.assertEqual(m[0], b'3'[0])
79        self.assertEqual(m[0:3], b'3\0\0')
80        self.assertEqual(m[PAGESIZE-1 : PAGESIZE + 7], b'\0foobar\0')
81
82        m.flush()
83
84        # Test doing a regular expression match in an mmap'ed file
85        match = re.search(b'[A-Za-z]+', m)
86        if match is None:
87            self.fail('regex match on mmap failed!')
88        else:
89            start, end = match.span(0)
90            length = end - start
91
92            self.assertEqual(start, PAGESIZE)
93            self.assertEqual(end, PAGESIZE + 6)
94
95        # test seeking around (try to overflow the seek implementation)
96        m.seek(0,0)
97        self.assertEqual(m.tell(), 0)
98        m.seek(42,1)
99        self.assertEqual(m.tell(), 42)
100        m.seek(0,2)
101        self.assertEqual(m.tell(), len(m))
102
103        # Try to seek to negative position...
104        self.assertRaises(ValueError, m.seek, -1)
105
106        # Try to seek beyond end of mmap...
107        self.assertRaises(ValueError, m.seek, 1, 2)
108
109        # Try to seek to negative position...
110        self.assertRaises(ValueError, m.seek, -len(m)-1, 2)
111
112        # Try resizing map
113        try:
114            m.resize(512)
115        except SystemError:
116            # resize() not supported
117            # No messages are printed, since the output of this test suite
118            # would then be different across platforms.
119            pass
120        else:
121            # resize() is supported
122            self.assertEqual(len(m), 512)
123            # Check that we can no longer seek beyond the new size.
124            self.assertRaises(ValueError, m.seek, 513, 0)
125
126            # Check that the underlying file is truncated too
127            # (bug #728515)
128            f = open(TESTFN, 'rb')
129            try:
130                f.seek(0, 2)
131                self.assertEqual(f.tell(), 512)
132            finally:
133                f.close()
134            self.assertEqual(m.size(), 512)
135
136        m.close()
137
138    def test_access_parameter(self):
139        # Test for "access" keyword parameter
140        mapsize = 10
141        with open(TESTFN, "wb") as fp:
142            fp.write(b"a"*mapsize)
143        with open(TESTFN, "rb") as f:
144            m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ)
145            self.assertEqual(m[:], b'a'*mapsize, "Readonly memory map data incorrect.")
146
147            # Ensuring that readonly mmap can't be slice assigned
148            try:
149                m[:] = b'b'*mapsize
150            except TypeError:
151                pass
152            else:
153                self.fail("Able to write to readonly memory map")
154
155            # Ensuring that readonly mmap can't be item assigned
156            try:
157                m[0] = b'b'
158            except TypeError:
159                pass
160            else:
161                self.fail("Able to write to readonly memory map")
162
163            # Ensuring that readonly mmap can't be write() to
164            try:
165                m.seek(0,0)
166                m.write(b'abc')
167            except TypeError:
168                pass
169            else:
170                self.fail("Able to write to readonly memory map")
171
172            # Ensuring that readonly mmap can't be write_byte() to
173            try:
174                m.seek(0,0)
175                m.write_byte(b'd')
176            except TypeError:
177                pass
178            else:
179                self.fail("Able to write to readonly memory map")
180
181            # Ensuring that readonly mmap can't be resized
182            try:
183                m.resize(2*mapsize)
184            except SystemError:   # resize is not universally supported
185                pass
186            except TypeError:
187                pass
188            else:
189                self.fail("Able to resize readonly memory map")
190            with open(TESTFN, "rb") as fp:
191                self.assertEqual(fp.read(), b'a'*mapsize,
192                                 "Readonly memory map data file was modified")
193
194        # Opening mmap with size too big
195        with open(TESTFN, "r+b") as f:
196            try:
197                m = mmap.mmap(f.fileno(), mapsize+1)
198            except ValueError:
199                # we do not expect a ValueError on Windows
200                # CAUTION:  This also changes the size of the file on disk, and
201                # later tests assume that the length hasn't changed.  We need to
202                # repair that.
203                if sys.platform.startswith('win'):
204                    self.fail("Opening mmap with size+1 should work on Windows.")
205            else:
206                # we expect a ValueError on Unix, but not on Windows
207                if not sys.platform.startswith('win'):
208                    self.fail("Opening mmap with size+1 should raise ValueError.")
209                m.close()
210            if sys.platform.startswith('win'):
211                # Repair damage from the resizing test.
212                with open(TESTFN, 'r+b') as f:
213                    f.truncate(mapsize)
214
215        # Opening mmap with access=ACCESS_WRITE
216        with open(TESTFN, "r+b") as f:
217            m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE)
218            # Modifying write-through memory map
219            m[:] = b'c'*mapsize
220            self.assertEqual(m[:], b'c'*mapsize,
221                   "Write-through memory map memory not updated properly.")
222            m.flush()
223            m.close()
224        with open(TESTFN, 'rb') as f:
225            stuff = f.read()
226        self.assertEqual(stuff, b'c'*mapsize,
227               "Write-through memory map data file not updated properly.")
228
229        # Opening mmap with access=ACCESS_COPY
230        with open(TESTFN, "r+b") as f:
231            m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY)
232            # Modifying copy-on-write memory map
233            m[:] = b'd'*mapsize
234            self.assertEqual(m[:], b'd' * mapsize,
235                             "Copy-on-write memory map data not written correctly.")
236            m.flush()
237            with open(TESTFN, "rb") as fp:
238                self.assertEqual(fp.read(), b'c'*mapsize,
239                                 "Copy-on-write test data file should not be modified.")
240            # Ensuring copy-on-write maps cannot be resized
241            self.assertRaises(TypeError, m.resize, 2*mapsize)
242            m.close()
243
244        # Ensuring invalid access parameter raises exception
245        with open(TESTFN, "r+b") as f:
246            self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4)
247
248        if os.name == "posix":
249            # Try incompatible flags, prot and access parameters.
250            with open(TESTFN, "r+b") as f:
251                self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize,
252                                  flags=mmap.MAP_PRIVATE,
253                                  prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE)
254
255            # Try writing with PROT_EXEC and without PROT_WRITE
256            prot = mmap.PROT_READ | getattr(mmap, 'PROT_EXEC', 0)
257            with open(TESTFN, "r+b") as f:
258                m = mmap.mmap(f.fileno(), mapsize, prot=prot)
259                self.assertRaises(TypeError, m.write, b"abcdef")
260                self.assertRaises(TypeError, m.write_byte, 0)
261                m.close()
262
263    def test_bad_file_desc(self):
264        # Try opening a bad file descriptor...
265        self.assertRaises(OSError, mmap.mmap, -2, 4096)
266
267    def test_tougher_find(self):
268        # Do a tougher .find() test.  SF bug 515943 pointed out that, in 2.2,
269        # searching for data with embedded \0 bytes didn't work.
270        with open(TESTFN, 'wb+') as f:
271
272            data = b'aabaac\x00deef\x00\x00aa\x00'
273            n = len(data)
274            f.write(data)
275            f.flush()
276            m = mmap.mmap(f.fileno(), n)
277
278        for start in range(n+1):
279            for finish in range(start, n+1):
280                slice = data[start : finish]
281                self.assertEqual(m.find(slice), data.find(slice))
282                self.assertEqual(m.find(slice + b'x'), -1)
283        m.close()
284
285    def test_find_end(self):
286        # test the new 'end' parameter works as expected
287        with open(TESTFN, 'wb+') as f:
288            data = b'one two ones'
289            n = len(data)
290            f.write(data)
291            f.flush()
292            m = mmap.mmap(f.fileno(), n)
293
294        self.assertEqual(m.find(b'one'), 0)
295        self.assertEqual(m.find(b'ones'), 8)
296        self.assertEqual(m.find(b'one', 0, -1), 0)
297        self.assertEqual(m.find(b'one', 1), 8)
298        self.assertEqual(m.find(b'one', 1, -1), 8)
299        self.assertEqual(m.find(b'one', 1, -2), -1)
300        self.assertEqual(m.find(bytearray(b'one')), 0)
301
302
303    def test_rfind(self):
304        # test the new 'end' parameter works as expected
305        with open(TESTFN, 'wb+') as f:
306            data = b'one two ones'
307            n = len(data)
308            f.write(data)
309            f.flush()
310            m = mmap.mmap(f.fileno(), n)
311
312        self.assertEqual(m.rfind(b'one'), 8)
313        self.assertEqual(m.rfind(b'one '), 0)
314        self.assertEqual(m.rfind(b'one', 0, -1), 8)
315        self.assertEqual(m.rfind(b'one', 0, -2), 0)
316        self.assertEqual(m.rfind(b'one', 1, -1), 8)
317        self.assertEqual(m.rfind(b'one', 1, -2), -1)
318        self.assertEqual(m.rfind(bytearray(b'one')), 8)
319
320
321    def test_double_close(self):
322        # make sure a double close doesn't crash on Solaris (Bug# 665913)
323        with open(TESTFN, 'wb+') as f:
324            f.write(2**16 * b'a') # Arbitrary character
325
326        with open(TESTFN, 'rb') as f:
327            mf = mmap.mmap(f.fileno(), 2**16, access=mmap.ACCESS_READ)
328            mf.close()
329            mf.close()
330
331    def test_entire_file(self):
332        # test mapping of entire file by passing 0 for map length
333        with open(TESTFN, "wb+") as f:
334            f.write(2**16 * b'm') # Arbitrary character
335
336        with open(TESTFN, "rb+") as f, \
337             mmap.mmap(f.fileno(), 0) as mf:
338            self.assertEqual(len(mf), 2**16, "Map size should equal file size.")
339            self.assertEqual(mf.read(2**16), 2**16 * b"m")
340
341    def test_length_0_offset(self):
342        # Issue #10916: test mapping of remainder of file by passing 0 for
343        # map length with an offset doesn't cause a segfault.
344        # NOTE: allocation granularity is currently 65536 under Win64,
345        # and therefore the minimum offset alignment.
346        with open(TESTFN, "wb") as f:
347            f.write((65536 * 2) * b'm') # Arbitrary character
348
349        with open(TESTFN, "rb") as f:
350            with mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ) as mf:
351                self.assertRaises(IndexError, mf.__getitem__, 80000)
352
353    def test_length_0_large_offset(self):
354        # Issue #10959: test mapping of a file by passing 0 for
355        # map length with a large offset doesn't cause a segfault.
356        with open(TESTFN, "wb") as f:
357            f.write(115699 * b'm') # Arbitrary character
358
359        with open(TESTFN, "w+b") as f:
360            self.assertRaises(ValueError, mmap.mmap, f.fileno(), 0,
361                              offset=2147418112)
362
363    def test_move(self):
364        # make move works everywhere (64-bit format problem earlier)
365        with open(TESTFN, 'wb+') as f:
366
367            f.write(b"ABCDEabcde") # Arbitrary character
368            f.flush()
369
370            mf = mmap.mmap(f.fileno(), 10)
371            mf.move(5, 0, 5)
372            self.assertEqual(mf[:], b"ABCDEABCDE", "Map move should have duplicated front 5")
373            mf.close()
374
375        # more excessive test
376        data = b"0123456789"
377        for dest in range(len(data)):
378            for src in range(len(data)):
379                for count in range(len(data) - max(dest, src)):
380                    expected = data[:dest] + data[src:src+count] + data[dest+count:]
381                    m = mmap.mmap(-1, len(data))
382                    m[:] = data
383                    m.move(dest, src, count)
384                    self.assertEqual(m[:], expected)
385                    m.close()
386
387        # segfault test (Issue 5387)
388        m = mmap.mmap(-1, 100)
389        offsets = [-100, -1, 0, 1, 100]
390        for source, dest, size in itertools.product(offsets, offsets, offsets):
391            try:
392                m.move(source, dest, size)
393            except ValueError:
394                pass
395
396        offsets = [(-1, -1, -1), (-1, -1, 0), (-1, 0, -1), (0, -1, -1),
397                   (-1, 0, 0), (0, -1, 0), (0, 0, -1)]
398        for source, dest, size in offsets:
399            self.assertRaises(ValueError, m.move, source, dest, size)
400
401        m.close()
402
403        m = mmap.mmap(-1, 1) # single byte
404        self.assertRaises(ValueError, m.move, 0, 0, 2)
405        self.assertRaises(ValueError, m.move, 1, 0, 1)
406        self.assertRaises(ValueError, m.move, 0, 1, 1)
407        m.move(0, 0, 1)
408        m.move(0, 0, 0)
409
410    def test_anonymous(self):
411        # anonymous mmap.mmap(-1, PAGE)
412        m = mmap.mmap(-1, PAGESIZE)
413        for x in range(PAGESIZE):
414            self.assertEqual(m[x], 0,
415                             "anonymously mmap'ed contents should be zero")
416
417        for x in range(PAGESIZE):
418            b = x & 0xff
419            m[x] = b
420            self.assertEqual(m[x], b)
421
422    def test_read_all(self):
423        m = mmap.mmap(-1, 16)
424        self.addCleanup(m.close)
425
426        # With no parameters, or None or a negative argument, reads all
427        m.write(bytes(range(16)))
428        m.seek(0)
429        self.assertEqual(m.read(), bytes(range(16)))
430        m.seek(8)
431        self.assertEqual(m.read(), bytes(range(8, 16)))
432        m.seek(16)
433        self.assertEqual(m.read(), b'')
434        m.seek(3)
435        self.assertEqual(m.read(None), bytes(range(3, 16)))
436        m.seek(4)
437        self.assertEqual(m.read(-1), bytes(range(4, 16)))
438        m.seek(5)
439        self.assertEqual(m.read(-2), bytes(range(5, 16)))
440        m.seek(9)
441        self.assertEqual(m.read(-42), bytes(range(9, 16)))
442
443    def test_read_invalid_arg(self):
444        m = mmap.mmap(-1, 16)
445        self.addCleanup(m.close)
446
447        self.assertRaises(TypeError, m.read, 'foo')
448        self.assertRaises(TypeError, m.read, 5.5)
449        self.assertRaises(TypeError, m.read, [1, 2, 3])
450
451    def test_extended_getslice(self):
452        # Test extended slicing by comparing with list slicing.
453        s = bytes(reversed(range(256)))
454        m = mmap.mmap(-1, len(s))
455        m[:] = s
456        self.assertEqual(m[:], s)
457        indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300)
458        for start in indices:
459            for stop in indices:
460                # Skip step 0 (invalid)
461                for step in indices[1:]:
462                    self.assertEqual(m[start:stop:step],
463                                     s[start:stop:step])
464
465    def test_extended_set_del_slice(self):
466        # Test extended slicing by comparing with list slicing.
467        s = bytes(reversed(range(256)))
468        m = mmap.mmap(-1, len(s))
469        indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300)
470        for start in indices:
471            for stop in indices:
472                # Skip invalid step 0
473                for step in indices[1:]:
474                    m[:] = s
475                    self.assertEqual(m[:], s)
476                    L = list(s)
477                    # Make sure we have a slice of exactly the right length,
478                    # but with different data.
479                    data = L[start:stop:step]
480                    data = bytes(reversed(data))
481                    L[start:stop:step] = data
482                    m[start:stop:step] = data
483                    self.assertEqual(m[:], bytes(L))
484
485    def make_mmap_file (self, f, halfsize):
486        # Write 2 pages worth of data to the file
487        f.write (b'\0' * halfsize)
488        f.write (b'foo')
489        f.write (b'\0' * (halfsize - 3))
490        f.flush ()
491        return mmap.mmap (f.fileno(), 0)
492
493    def test_empty_file (self):
494        f = open (TESTFN, 'w+b')
495        f.close()
496        with open(TESTFN, "rb") as f :
497            self.assertRaisesRegex(ValueError,
498                                   "cannot mmap an empty file",
499                                   mmap.mmap, f.fileno(), 0,
500                                   access=mmap.ACCESS_READ)
501
502    def test_offset (self):
503        f = open (TESTFN, 'w+b')
504
505        try: # unlink TESTFN no matter what
506            halfsize = mmap.ALLOCATIONGRANULARITY
507            m = self.make_mmap_file (f, halfsize)
508            m.close ()
509            f.close ()
510
511            mapsize = halfsize * 2
512            # Try invalid offset
513            f = open(TESTFN, "r+b")
514            for offset in [-2, -1, None]:
515                try:
516                    m = mmap.mmap(f.fileno(), mapsize, offset=offset)
517                    self.assertEqual(0, 1)
518                except (ValueError, TypeError, OverflowError):
519                    pass
520                else:
521                    self.assertEqual(0, 0)
522            f.close()
523
524            # Try valid offset, hopefully 8192 works on all OSes
525            f = open(TESTFN, "r+b")
526            m = mmap.mmap(f.fileno(), mapsize - halfsize, offset=halfsize)
527            self.assertEqual(m[0:3], b'foo')
528            f.close()
529
530            # Try resizing map
531            try:
532                m.resize(512)
533            except SystemError:
534                pass
535            else:
536                # resize() is supported
537                self.assertEqual(len(m), 512)
538                # Check that we can no longer seek beyond the new size.
539                self.assertRaises(ValueError, m.seek, 513, 0)
540                # Check that the content is not changed
541                self.assertEqual(m[0:3], b'foo')
542
543                # Check that the underlying file is truncated too
544                f = open(TESTFN, 'rb')
545                f.seek(0, 2)
546                self.assertEqual(f.tell(), halfsize + 512)
547                f.close()
548                self.assertEqual(m.size(), halfsize + 512)
549
550            m.close()
551
552        finally:
553            f.close()
554            try:
555                os.unlink(TESTFN)
556            except OSError:
557                pass
558
559    def test_subclass(self):
560        class anon_mmap(mmap.mmap):
561            def __new__(klass, *args, **kwargs):
562                return mmap.mmap.__new__(klass, -1, *args, **kwargs)
563        anon_mmap(PAGESIZE)
564
565    @unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ")
566    def test_prot_readonly(self):
567        mapsize = 10
568        with open(TESTFN, "wb") as fp:
569            fp.write(b"a"*mapsize)
570        with open(TESTFN, "rb") as f:
571            m = mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ)
572            self.assertRaises(TypeError, m.write, "foo")
573
574    def test_error(self):
575        self.assertIs(mmap.error, OSError)
576
577    def test_io_methods(self):
578        data = b"0123456789"
579        with open(TESTFN, "wb") as fp:
580            fp.write(b"x"*len(data))
581        with open(TESTFN, "r+b") as f:
582            m = mmap.mmap(f.fileno(), len(data))
583        # Test write_byte()
584        for i in range(len(data)):
585            self.assertEqual(m.tell(), i)
586            m.write_byte(data[i])
587            self.assertEqual(m.tell(), i+1)
588        self.assertRaises(ValueError, m.write_byte, b"x"[0])
589        self.assertEqual(m[:], data)
590        # Test read_byte()
591        m.seek(0)
592        for i in range(len(data)):
593            self.assertEqual(m.tell(), i)
594            self.assertEqual(m.read_byte(), data[i])
595            self.assertEqual(m.tell(), i+1)
596        self.assertRaises(ValueError, m.read_byte)
597        # Test read()
598        m.seek(3)
599        self.assertEqual(m.read(3), b"345")
600        self.assertEqual(m.tell(), 6)
601        # Test write()
602        m.seek(3)
603        m.write(b"bar")
604        self.assertEqual(m.tell(), 6)
605        self.assertEqual(m[:], b"012bar6789")
606        m.write(bytearray(b"baz"))
607        self.assertEqual(m.tell(), 9)
608        self.assertEqual(m[:], b"012barbaz9")
609        self.assertRaises(ValueError, m.write, b"ba")
610
611    def test_non_ascii_byte(self):
612        for b in (129, 200, 255): # > 128
613            m = mmap.mmap(-1, 1)
614            m.write_byte(b)
615            self.assertEqual(m[0], b)
616            m.seek(0)
617            self.assertEqual(m.read_byte(), b)
618            m.close()
619
620    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
621    def test_tagname(self):
622        data1 = b"0123456789"
623        data2 = b"abcdefghij"
624        assert len(data1) == len(data2)
625        tagname1 = random_tagname()
626        tagname2 = random_tagname()
627
628        # Test same tag
629        m1 = mmap.mmap(-1, len(data1), tagname=tagname1)
630        m1[:] = data1
631        m2 = mmap.mmap(-1, len(data2), tagname=tagname1)
632        m2[:] = data2
633        self.assertEqual(m1[:], data2)
634        self.assertEqual(m2[:], data2)
635        m2.close()
636        m1.close()
637
638        # Test different tag
639        m1 = mmap.mmap(-1, len(data1), tagname=tagname1)
640        m1[:] = data1
641        m2 = mmap.mmap(-1, len(data2), tagname=tagname2)
642        m2[:] = data2
643        self.assertEqual(m1[:], data1)
644        self.assertEqual(m2[:], data2)
645        m2.close()
646        m1.close()
647
648    @cpython_only
649    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
650    def test_sizeof(self):
651        m1 = mmap.mmap(-1, 100)
652        tagname = random_tagname()
653        m2 = mmap.mmap(-1, 100, tagname=tagname)
654        self.assertEqual(sys.getsizeof(m2),
655                         sys.getsizeof(m1) + len(tagname) + 1)
656
657    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
658    def test_crasher_on_windows(self):
659        # Should not crash (Issue 1733986)
660        tagname = random_tagname()
661        m = mmap.mmap(-1, 1000, tagname=tagname)
662        try:
663            mmap.mmap(-1, 5000, tagname=tagname)[:] # same tagname, but larger size
664        except:
665            pass
666        m.close()
667
668        # Should not crash (Issue 5385)
669        with open(TESTFN, "wb") as fp:
670            fp.write(b"x"*10)
671        f = open(TESTFN, "r+b")
672        m = mmap.mmap(f.fileno(), 0)
673        f.close()
674        try:
675            m.resize(0) # will raise OSError
676        except:
677            pass
678        try:
679            m[:]
680        except:
681            pass
682        m.close()
683
684    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
685    def test_invalid_descriptor(self):
686        # socket file descriptors are valid, but out of range
687        # for _get_osfhandle, causing a crash when validating the
688        # parameters to _get_osfhandle.
689        s = socket.socket()
690        try:
691            with self.assertRaises(OSError):
692                m = mmap.mmap(s.fileno(), 10)
693        finally:
694            s.close()
695
696    def test_context_manager(self):
697        with mmap.mmap(-1, 10) as m:
698            self.assertFalse(m.closed)
699        self.assertTrue(m.closed)
700
701    def test_context_manager_exception(self):
702        # Test that the OSError gets passed through
703        with self.assertRaises(Exception) as exc:
704            with mmap.mmap(-1, 10) as m:
705                raise OSError
706        self.assertIsInstance(exc.exception, OSError,
707                              "wrong exception raised in context manager")
708        self.assertTrue(m.closed, "context manager failed")
709
710    def test_weakref(self):
711        # Check mmap objects are weakrefable
712        mm = mmap.mmap(-1, 16)
713        wr = weakref.ref(mm)
714        self.assertIs(wr(), mm)
715        del mm
716        gc_collect()
717        self.assertIs(wr(), None)
718
719    def test_write_returning_the_number_of_bytes_written(self):
720        mm = mmap.mmap(-1, 16)
721        self.assertEqual(mm.write(b""), 0)
722        self.assertEqual(mm.write(b"x"), 1)
723        self.assertEqual(mm.write(b"yz"), 2)
724        self.assertEqual(mm.write(b"python"), 6)
725
726    def test_resize_past_pos(self):
727        m = mmap.mmap(-1, 8192)
728        self.addCleanup(m.close)
729        m.read(5000)
730        try:
731            m.resize(4096)
732        except SystemError:
733            self.skipTest("resizing not supported")
734        self.assertEqual(m.read(14), b'')
735        self.assertRaises(ValueError, m.read_byte)
736        self.assertRaises(ValueError, m.write_byte, 42)
737        self.assertRaises(ValueError, m.write, b'abc')
738
739    def test_concat_repeat_exception(self):
740        m = mmap.mmap(-1, 16)
741        with self.assertRaises(TypeError):
742            m + m
743        with self.assertRaises(TypeError):
744            m * 2
745
746    def test_flush_return_value(self):
747        # mm.flush() should return None on success, raise an
748        # exception on error under all platforms.
749        mm = mmap.mmap(-1, 16)
750        self.addCleanup(mm.close)
751        mm.write(b'python')
752        result = mm.flush()
753        self.assertIsNone(result)
754        if sys.platform.startswith('linux'):
755            # 'offset' must be a multiple of mmap.PAGESIZE on Linux.
756            # See bpo-34754 for details.
757            self.assertRaises(OSError, mm.flush, 1, len(b'python'))
758
759    def test_repr(self):
760        open_mmap_repr_pat = re.compile(
761            r"<mmap.mmap closed=False, "
762            r"access=(?P<access>\S+), "
763            r"length=(?P<length>\d+), "
764            r"pos=(?P<pos>\d+), "
765            r"offset=(?P<offset>\d+)>")
766        closed_mmap_repr_pat = re.compile(r"<mmap.mmap closed=True>")
767        mapsizes = (50, 100, 1_000, 1_000_000, 10_000_000)
768        offsets = tuple((mapsize // 2 // mmap.ALLOCATIONGRANULARITY)
769                        * mmap.ALLOCATIONGRANULARITY for mapsize in mapsizes)
770        for offset, mapsize in zip(offsets, mapsizes):
771            data = b'a' * mapsize
772            length = mapsize - offset
773            accesses = ('ACCESS_DEFAULT', 'ACCESS_READ',
774                        'ACCESS_COPY', 'ACCESS_WRITE')
775            positions = (0, length//10, length//5, length//4)
776            with open(TESTFN, "wb+") as fp:
777                fp.write(data)
778                fp.flush()
779                for access, pos in itertools.product(accesses, positions):
780                    accint = getattr(mmap, access)
781                    with mmap.mmap(fp.fileno(),
782                                   length,
783                                   access=accint,
784                                   offset=offset) as mm:
785                        mm.seek(pos)
786                        match = open_mmap_repr_pat.match(repr(mm))
787                        self.assertIsNotNone(match)
788                        self.assertEqual(match.group('access'), access)
789                        self.assertEqual(match.group('length'), str(length))
790                        self.assertEqual(match.group('pos'), str(pos))
791                        self.assertEqual(match.group('offset'), str(offset))
792                    match = closed_mmap_repr_pat.match(repr(mm))
793                    self.assertIsNotNone(match)
794
795    @unittest.skipUnless(hasattr(mmap.mmap, 'madvise'), 'needs madvise')
796    def test_madvise(self):
797        size = 2 * PAGESIZE
798        m = mmap.mmap(-1, size)
799
800        with self.assertRaisesRegex(ValueError, "madvise start out of bounds"):
801            m.madvise(mmap.MADV_NORMAL, size)
802        with self.assertRaisesRegex(ValueError, "madvise start out of bounds"):
803            m.madvise(mmap.MADV_NORMAL, -1)
804        with self.assertRaisesRegex(ValueError, "madvise length invalid"):
805            m.madvise(mmap.MADV_NORMAL, 0, -1)
806        with self.assertRaisesRegex(OverflowError, "madvise length too large"):
807            m.madvise(mmap.MADV_NORMAL, PAGESIZE, sys.maxsize)
808        self.assertEqual(m.madvise(mmap.MADV_NORMAL), None)
809        self.assertEqual(m.madvise(mmap.MADV_NORMAL, PAGESIZE), None)
810        self.assertEqual(m.madvise(mmap.MADV_NORMAL, PAGESIZE, size), None)
811        self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, 2), None)
812        self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, size), None)
813
814    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
815    def test_resize_up_when_mapped_to_pagefile(self):
816        """If the mmap is backed by the pagefile ensure a resize up can happen
817        and that the original data is still in place
818        """
819        start_size = PAGESIZE
820        new_size = 2 * start_size
821        data = bytes(random.getrandbits(8) for _ in range(start_size))
822
823        m = mmap.mmap(-1, start_size)
824        m[:] = data
825        m.resize(new_size)
826        self.assertEqual(len(m), new_size)
827        self.assertEqual(m[:start_size], data[:start_size])
828
829    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
830    def test_resize_down_when_mapped_to_pagefile(self):
831        """If the mmap is backed by the pagefile ensure a resize down up can happen
832        and that a truncated form of the original data is still in place
833        """
834        start_size = PAGESIZE
835        new_size = start_size // 2
836        data = bytes(random.getrandbits(8) for _ in range(start_size))
837
838        m = mmap.mmap(-1, start_size)
839        m[:] = data
840        m.resize(new_size)
841        self.assertEqual(len(m), new_size)
842        self.assertEqual(m[:new_size], data[:new_size])
843
844    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
845    def test_resize_fails_if_mapping_held_elsewhere(self):
846        """If more than one mapping is held against a named file on Windows, neither
847        mapping can be resized
848        """
849        start_size = 2 * PAGESIZE
850        reduced_size = PAGESIZE
851
852        f = open(TESTFN, 'wb+')
853        f.truncate(start_size)
854        try:
855            m1 = mmap.mmap(f.fileno(), start_size)
856            m2 = mmap.mmap(f.fileno(), start_size)
857            with self.assertRaises(OSError):
858                m1.resize(reduced_size)
859            with self.assertRaises(OSError):
860                m2.resize(reduced_size)
861            m2.close()
862            m1.resize(reduced_size)
863            self.assertEqual(m1.size(), reduced_size)
864            self.assertEqual(os.stat(f.fileno()).st_size, reduced_size)
865        finally:
866            f.close()
867
868    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
869    def test_resize_succeeds_with_error_for_second_named_mapping(self):
870        """If a more than one mapping exists of the same name, none of them can
871        be resized: they'll raise an Exception and leave the original mapping intact
872        """
873        start_size = 2 * PAGESIZE
874        reduced_size = PAGESIZE
875        tagname =  random_tagname()
876        data_length = 8
877        data = bytes(random.getrandbits(8) for _ in range(data_length))
878
879        m1 = mmap.mmap(-1, start_size, tagname=tagname)
880        m2 = mmap.mmap(-1, start_size, tagname=tagname)
881        m1[:data_length] = data
882        self.assertEqual(m2[:data_length], data)
883        with self.assertRaises(OSError):
884            m1.resize(reduced_size)
885        self.assertEqual(m1.size(), start_size)
886        self.assertEqual(m1[:data_length], data)
887        self.assertEqual(m2[:data_length], data)
888
889    def test_mmap_closed_by_int_scenarios(self):
890        """
891        gh-103987: Test that mmap objects raise ValueError
892                for closed mmap files
893        """
894
895        class MmapClosedByIntContext:
896            def __init__(self, access) -> None:
897                self.access = access
898
899            def __enter__(self):
900                self.f = open(TESTFN, "w+b")
901                self.f.write(random.randbytes(100))
902                self.f.flush()
903
904                m = mmap.mmap(self.f.fileno(), 100, access=self.access)
905
906                class X:
907                    def __index__(self):
908                        m.close()
909                        return 10
910
911                return (m, X)
912
913            def __exit__(self, exc_type, exc_value, traceback):
914                self.f.close()
915
916        read_access_modes = [
917            mmap.ACCESS_READ,
918            mmap.ACCESS_WRITE,
919            mmap.ACCESS_COPY,
920            mmap.ACCESS_DEFAULT,
921        ]
922
923        write_access_modes = [
924            mmap.ACCESS_WRITE,
925            mmap.ACCESS_COPY,
926            mmap.ACCESS_DEFAULT,
927        ]
928
929        for access in read_access_modes:
930            with MmapClosedByIntContext(access) as (m, X):
931                with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
932                    m[X()]
933
934            with MmapClosedByIntContext(access) as (m, X):
935                with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
936                    m[X() : 20]
937
938            with MmapClosedByIntContext(access) as (m, X):
939                with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
940                    m[X() : 20 : 2]
941
942            with MmapClosedByIntContext(access) as (m, X):
943                with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
944                    m[20 : X() : -2]
945
946            with MmapClosedByIntContext(access) as (m, X):
947                with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
948                    m.read(X())
949
950            with MmapClosedByIntContext(access) as (m, X):
951                with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
952                    m.find(b"1", 1, X())
953
954        for access in write_access_modes:
955            with MmapClosedByIntContext(access) as (m, X):
956                with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
957                    m[X() : 20] = b"1" * 10
958
959            with MmapClosedByIntContext(access) as (m, X):
960                with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
961                    m[X() : 20 : 2] = b"1" * 5
962
963            with MmapClosedByIntContext(access) as (m, X):
964                with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
965                    m[20 : X() : -2] = b"1" * 5
966
967            with MmapClosedByIntContext(access) as (m, X):
968                with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
969                    m.move(1, 2, X())
970
971            with MmapClosedByIntContext(access) as (m, X):
972                with self.assertRaisesRegex(ValueError, "mmap closed or invalid"):
973                    m.write_byte(X())
974
class LargeMmapTests(unittest.TestCase):
    """Tests that map multi-gigabyte files."""

    def setUp(self):
        # Start from a clean slate: no leftover test file.
        unlink(TESTFN)

    def tearDown(self):
        # Remove whatever the test created.
        unlink(TESTFN)

    def _make_test_file(self, num_zeroes, tail):
        """Create TESTFN with *tail* written at offset *num_zeroes*.

        Returns the open binary file object. Raises unittest.SkipTest when
        seeking, writing or flushing fails with OSError, OverflowError or
        ValueError.
        """
        on_windows = sys.platform.startswith('win')
        if on_windows or sys.platform == 'darwin':
            # On these platforms the test only runs when the 'largefile'
            # resource has been enabled.
            requires('largefile',
                'test requires %s bytes and a long time to run' % str(0x180000000))
        large_file = open(TESTFN, 'w+b')
        try:
            large_file.seek(num_zeroes)
            large_file.write(tail)
            large_file.flush()
        except (OSError, OverflowError, ValueError):
            # Best-effort close before skipping; a failure while closing is
            # deliberately ignored.
            try:
                large_file.close()
            except (OSError, OverflowError):
                pass
            raise unittest.SkipTest("filesystem does not have largefile support")
        else:
            return large_file

    def test_large_offset(self):
        """Map from an offset above 4 GiB and read the file's last byte."""
        with self._make_test_file(0x14FFFFFFF, b" ") as f, \
                mmap.mmap(f.fileno(), 0, offset=0x140000000,
                          access=mmap.ACCESS_READ) as m:
            # 32 is the byte value of the single space written as the tail.
            self.assertEqual(m[0xFFFFFFF], 32)

    def test_large_filesize(self):
        """size() must report the full length of a file larger than 4 GiB."""
        with self._make_test_file(0x17FFFFFFF, b" ") as f:
            fd = f.fileno()
            if sys.maxsize < 0x180000000:
                # On 32 bit platforms the file is larger than sys.maxsize so
                # mapping the whole file should fail -- Issue #16743
                self.assertRaises(OverflowError, mmap.mmap,
                                  fd, 0x180000000, access=mmap.ACCESS_READ)
                self.assertRaises(ValueError, mmap.mmap,
                                  fd, 0, access=mmap.ACCESS_READ)
            with mmap.mmap(fd, 0x10000, access=mmap.ACCESS_READ) as m:
                self.assertEqual(m.size(), 0x180000000)

    # Issue 11277: mmap() with large (~4 GiB) sparse files crashes on OS X.

    def _test_around_boundary(self, boundary):
        """Write a marker straddling *boundary* and read it back via mmap."""
        marker = b'  DEARdear  '
        # Centre the marker on the boundary.
        first = boundary - len(marker) // 2
        last = first + len(marker)
        with self._make_test_file(first, marker) as f, \
                mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
            self.assertEqual(m[first:last], marker)

    @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
    def test_around_2GB(self):
        self._test_around_boundary(_2G)

    @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
    def test_around_4GB(self):
        self._test_around_boundary(_4G)
1034
1035
# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
1038