test_mmap.py 28 KB
Newer Older
1
from test.support import (TESTFN, import_module, unlink,
2
                          requires, _2G, _4G, gc_collect, cpython_only)
3
import unittest
Benjamin Peterson's avatar
Benjamin Peterson committed
4 5 6 7 8
import os
import re
import itertools
import socket
import sys
9
import weakref
10

11 12 13
# Skip test if we can't import mmap.
mmap = import_module('mmap')

14 15
PAGESIZE = mmap.PAGESIZE

16
class MmapTests(unittest.TestCase):
17

18 19 20
    def setUp(self):
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)
21

22 23 24 25 26
    def tearDown(self):
        try:
            os.unlink(TESTFN)
        except OSError:
            pass
27

28 29
    def test_basic(self):
        # Test mmap module on Unix systems and Windows
30

31
        # Create a file to be mmap'ed.
32
        f = open(TESTFN, 'bw+')
33 34
        try:
            # Write 2 pages worth of data to the file
35 36 37
            f.write(b'\0'* PAGESIZE)
            f.write(b'foo')
            f.write(b'\0'* (PAGESIZE-3) )
38 39
            f.flush()
            m = mmap.mmap(f.fileno(), 2 * PAGESIZE)
40
        finally:
41
            f.close()
42

43
        # Simple sanity checks
44

45
        tp = str(type(m))  # SF bug 128713:  segfaulted on Linux
46
        self.assertEqual(m.find(b'foo'), PAGESIZE)
47

48
        self.assertEqual(len(m), 2*PAGESIZE)
49

50
        self.assertEqual(m[0], 0)
51
        self.assertEqual(m[0:3], b'\0\0\0')
52

53 54 55 56
        # Shouldn't crash on boundary (Issue #5292)
        self.assertRaises(IndexError, m.__getitem__, len(m))
        self.assertRaises(IndexError, m.__setitem__, len(m), b'\0')

57
        # Modify the file's content
58
        m[0] = b'3'[0]
59
        m[PAGESIZE +3: PAGESIZE +3+3] = b'bar'
60

61
        # Check that the modification worked
62
        self.assertEqual(m[0], b'3'[0])
63 64
        self.assertEqual(m[0:3], b'3\0\0')
        self.assertEqual(m[PAGESIZE-1 : PAGESIZE + 7], b'\0foobar\0')
65

66
        m.flush()
67

68
        # Test doing a regular expression match in an mmap'ed file
69
        match = re.search(b'[A-Za-z]+', m)
70 71 72 73 74
        if match is None:
            self.fail('regex match on mmap failed!')
        else:
            start, end = match.span(0)
            length = end - start
75

76 77
            self.assertEqual(start, PAGESIZE)
            self.assertEqual(end, PAGESIZE + 6)
78

79 80 81 82 83 84 85
        # test seeking around (try to overflow the seek implementation)
        m.seek(0,0)
        self.assertEqual(m.tell(), 0)
        m.seek(42,1)
        self.assertEqual(m.tell(), 42)
        m.seek(0,2)
        self.assertEqual(m.tell(), len(m))
86

87 88
        # Try to seek to negative position...
        self.assertRaises(ValueError, m.seek, -1)
89

90 91
        # Try to seek beyond end of mmap...
        self.assertRaises(ValueError, m.seek, 1, 2)
92

93 94
        # Try to seek to negative position...
        self.assertRaises(ValueError, m.seek, -len(m)-1, 2)
95

96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
        # Try resizing map
        try:
            m.resize(512)
        except SystemError:
            # resize() not supported
            # No messages are printed, since the output of this test suite
            # would then be different across platforms.
            pass
        else:
            # resize() is supported
            self.assertEqual(len(m), 512)
            # Check that we can no longer seek beyond the new size.
            self.assertRaises(ValueError, m.seek, 513, 0)

            # Check that the underlying file is truncated too
            # (bug #728515)
112
            f = open(TESTFN, 'rb')
113
            try:
114 115
                f.seek(0, 2)
                self.assertEqual(f.tell(), 512)
116
            finally:
117
                f.close()
118
            self.assertEqual(m.size(), 512)
119

120
        m.close()
121

122 123
    def test_access_parameter(self):
        # Test for "access" keyword parameter
124
        mapsize = 10
Benjamin Peterson's avatar
Benjamin Peterson committed
125 126 127 128 129
        with open(TESTFN, "wb") as fp:
            fp.write(b"a"*mapsize)
        with open(TESTFN, "rb") as f:
            m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ)
            self.assertEqual(m[:], b'a'*mapsize, "Readonly memory map data incorrect.")
130

Benjamin Peterson's avatar
Benjamin Peterson committed
131 132 133 134 135 136 137
            # Ensuring that readonly mmap can't be slice assigned
            try:
                m[:] = b'b'*mapsize
            except TypeError:
                pass
            else:
                self.fail("Able to write to readonly memory map")
138

Benjamin Peterson's avatar
Benjamin Peterson committed
139 140 141 142 143 144 145
            # Ensuring that readonly mmap can't be item assigned
            try:
                m[0] = b'b'
            except TypeError:
                pass
            else:
                self.fail("Able to write to readonly memory map")
146

Benjamin Peterson's avatar
Benjamin Peterson committed
147 148 149 150 151 152 153 154
            # Ensuring that readonly mmap can't be write() to
            try:
                m.seek(0,0)
                m.write(b'abc')
            except TypeError:
                pass
            else:
                self.fail("Able to write to readonly memory map")
155

Benjamin Peterson's avatar
Benjamin Peterson committed
156 157 158 159 160 161 162 163
            # Ensuring that readonly mmap can't be write_byte() to
            try:
                m.seek(0,0)
                m.write_byte(b'd')
            except TypeError:
                pass
            else:
                self.fail("Able to write to readonly memory map")
164

Benjamin Peterson's avatar
Benjamin Peterson committed
165 166 167 168 169 170 171 172 173 174 175 176
            # Ensuring that readonly mmap can't be resized
            try:
                m.resize(2*mapsize)
            except SystemError:   # resize is not universally supported
                pass
            except TypeError:
                pass
            else:
                self.fail("Able to resize readonly memory map")
            with open(TESTFN, "rb") as fp:
                self.assertEqual(fp.read(), b'a'*mapsize,
                                 "Readonly memory map data file was modified")
177

178
        # Opening mmap with size too big
Benjamin Peterson's avatar
Benjamin Peterson committed
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
        with open(TESTFN, "r+b") as f:
            try:
                m = mmap.mmap(f.fileno(), mapsize+1)
            except ValueError:
                # we do not expect a ValueError on Windows
                # CAUTION:  This also changes the size of the file on disk, and
                # later tests assume that the length hasn't changed.  We need to
                # repair that.
                if sys.platform.startswith('win'):
                    self.fail("Opening mmap with size+1 should work on Windows.")
            else:
                # we expect a ValueError on Unix, but not on Windows
                if not sys.platform.startswith('win'):
                    self.fail("Opening mmap with size+1 should raise ValueError.")
                m.close()
194
            if sys.platform.startswith('win'):
Benjamin Peterson's avatar
Benjamin Peterson committed
195 196 197
                # Repair damage from the resizing test.
                with open(TESTFN, 'r+b') as f:
                    f.truncate(mapsize)
198

199
        # Opening mmap with access=ACCESS_WRITE
Benjamin Peterson's avatar
Benjamin Peterson committed
200 201 202 203 204 205 206 207 208 209
        with open(TESTFN, "r+b") as f:
            m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE)
            # Modifying write-through memory map
            m[:] = b'c'*mapsize
            self.assertEqual(m[:], b'c'*mapsize,
                   "Write-through memory map memory not updated properly.")
            m.flush()
            m.close()
        with open(TESTFN, 'rb') as f:
            stuff = f.read()
210
        self.assertEqual(stuff, b'c'*mapsize,
211 212
               "Write-through memory map data file not updated properly.")

213
        # Opening mmap with access=ACCESS_COPY
Benjamin Peterson's avatar
Benjamin Peterson committed
214 215 216 217 218 219 220 221 222 223 224 225 226
        with open(TESTFN, "r+b") as f:
            m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY)
            # Modifying copy-on-write memory map
            m[:] = b'd'*mapsize
            self.assertEqual(m[:], b'd' * mapsize,
                             "Copy-on-write memory map data not written correctly.")
            m.flush()
            with open(TESTFN, "rb") as fp:
                self.assertEqual(fp.read(), b'c'*mapsize,
                                 "Copy-on-write test data file should not be modified.")
            # Ensuring copy-on-write maps cannot be resized
            self.assertRaises(TypeError, m.resize, 2*mapsize)
            m.close()
227 228

        # Ensuring invalid access parameter raises exception
Benjamin Peterson's avatar
Benjamin Peterson committed
229 230
        with open(TESTFN, "r+b") as f:
            self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4)
231 232

        if os.name == "posix":
233
            # Try incompatible flags, prot and access parameters.
Benjamin Peterson's avatar
Benjamin Peterson committed
234 235 236 237
            with open(TESTFN, "r+b") as f:
                self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize,
                                  flags=mmap.MAP_PRIVATE,
                                  prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE)
238

239 240
            # Try writing with PROT_EXEC and without PROT_WRITE
            prot = mmap.PROT_READ | getattr(mmap, 'PROT_EXEC', 0)
241
            with open(TESTFN, "r+b") as f:
242
                m = mmap.mmap(f.fileno(), mapsize, prot=prot)
243 244 245 246
                self.assertRaises(TypeError, m.write, b"abcdef")
                self.assertRaises(TypeError, m.write_byte, 0)
                m.close()

247 248
    def test_bad_file_desc(self):
        # Try opening a bad file descriptor...
249
        self.assertRaises(OSError, mmap.mmap, -2, 4096)
250

251 252 253
    def test_tougher_find(self):
        # Do a tougher .find() test.  SF bug 515943 pointed out that, in 2.2,
        # searching for data with embedded \0 bytes didn't work.
Benjamin Peterson's avatar
Benjamin Peterson committed
254
        with open(TESTFN, 'wb+') as f:
255

Benjamin Peterson's avatar
Benjamin Peterson committed
256 257 258 259 260
            data = b'aabaac\x00deef\x00\x00aa\x00'
            n = len(data)
            f.write(data)
            f.flush()
            m = mmap.mmap(f.fileno(), n)
261 262 263 264

        for start in range(n+1):
            for finish in range(start, n+1):
                slice = data[start : finish]
265
                self.assertEqual(m.find(slice), data.find(slice))
266
                self.assertEqual(m.find(slice + b'x'), -1)
267
        m.close()
268

269 270
    def test_find_end(self):
        # test the new 'end' parameter works as expected
271 272 273 274 275 276
        with open(TESTFN, 'wb+') as f:
            data = b'one two ones'
            n = len(data)
            f.write(data)
            f.flush()
            m = mmap.mmap(f.fileno(), n)
277

278 279 280 281 282 283
        self.assertEqual(m.find(b'one'), 0)
        self.assertEqual(m.find(b'ones'), 8)
        self.assertEqual(m.find(b'one', 0, -1), 0)
        self.assertEqual(m.find(b'one', 1), 8)
        self.assertEqual(m.find(b'one', 1, -1), 8)
        self.assertEqual(m.find(b'one', 1, -2), -1)
284
        self.assertEqual(m.find(bytearray(b'one')), 0)
285 286 287 288


    def test_rfind(self):
        # test the new 'end' parameter works as expected
289 290 291 292 293 294
        with open(TESTFN, 'wb+') as f:
            data = b'one two ones'
            n = len(data)
            f.write(data)
            f.flush()
            m = mmap.mmap(f.fileno(), n)
295

296 297 298 299 300 301
        self.assertEqual(m.rfind(b'one'), 8)
        self.assertEqual(m.rfind(b'one '), 0)
        self.assertEqual(m.rfind(b'one', 0, -1), 8)
        self.assertEqual(m.rfind(b'one', 0, -2), 0)
        self.assertEqual(m.rfind(b'one', 1, -1), 8)
        self.assertEqual(m.rfind(b'one', 1, -2), -1)
302
        self.assertEqual(m.rfind(bytearray(b'one')), 8)
303 304


305 306
    def test_double_close(self):
        # make sure a double close doesn't crash on Solaris (Bug# 665913)
307 308
        with open(TESTFN, 'wb+') as f:
            f.write(2**16 * b'a') # Arbitrary character
309

310 311 312 313
        with open(TESTFN, 'rb') as f:
            mf = mmap.mmap(f.fileno(), 2**16, access=mmap.ACCESS_READ)
            mf.close()
            mf.close()
314

315 316
    def test_entire_file(self):
        # test mapping of entire file by passing 0 for map length
317 318
        with open(TESTFN, "wb+") as f:
            f.write(2**16 * b'm') # Arbitrary character
319

320 321 322 323
        with open(TESTFN, "rb+") as f, \
             mmap.mmap(f.fileno(), 0) as mf:
            self.assertEqual(len(mf), 2**16, "Map size should equal file size.")
            self.assertEqual(mf.read(2**16), 2**16 * b"m")
324

325 326 327
    def test_length_0_offset(self):
        # Issue #10916: test mapping of remainder of file by passing 0 for
        # map length with an offset doesn't cause a segfault.
328 329 330 331
        # NOTE: allocation granularity is currently 65536 under Win64,
        # and therefore the minimum offset alignment.
        with open(TESTFN, "wb") as f:
            f.write((65536 * 2) * b'm') # Arbitrary character
332 333

        with open(TESTFN, "rb") as f:
334 335
            with mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ) as mf:
                self.assertRaises(IndexError, mf.__getitem__, 80000)
336

337 338 339 340 341 342 343 344 345 346
    def test_length_0_large_offset(self):
        # Issue #10959: test mapping of a file by passing 0 for
        # map length with a large offset doesn't cause a segfault.
        with open(TESTFN, "wb") as f:
            f.write(115699 * b'm') # Arbitrary character

        with open(TESTFN, "w+b") as f:
            self.assertRaises(ValueError, mmap.mmap, f.fileno(), 0,
                              offset=2147418112)

347 348
    def test_move(self):
        # make move works everywhere (64-bit format problem earlier)
349
        with open(TESTFN, 'wb+') as f:
350

351 352
            f.write(b"ABCDEabcde") # Arbitrary character
            f.flush()
353

354 355 356 357
            mf = mmap.mmap(f.fileno(), 10)
            mf.move(5, 0, 5)
            self.assertEqual(mf[:], b"ABCDEABCDE", "Map move should have duplicated front 5")
            mf.close()
358

359 360 361 362 363 364 365 366 367 368 369 370
        # more excessive test
        data = b"0123456789"
        for dest in range(len(data)):
            for src in range(len(data)):
                for count in range(len(data) - max(dest, src)):
                    expected = data[:dest] + data[src:src+count] + data[dest+count:]
                    m = mmap.mmap(-1, len(data))
                    m[:] = data
                    m.move(dest, src, count)
                    self.assertEqual(m[:], expected)
                    m.close()

371 372 373 374 375 376 377 378
        # segfault test (Issue 5387)
        m = mmap.mmap(-1, 100)
        offsets = [-100, -1, 0, 1, 100]
        for source, dest, size in itertools.product(offsets, offsets, offsets):
            try:
                m.move(source, dest, size)
            except ValueError:
                pass
Benjamin Peterson's avatar
Benjamin Peterson committed
379 380 381 382 383 384

        offsets = [(-1, -1, -1), (-1, -1, 0), (-1, 0, -1), (0, -1, -1),
                   (-1, 0, 0), (0, -1, 0), (0, 0, -1)]
        for source, dest, size in offsets:
            self.assertRaises(ValueError, m.move, source, dest, size)

385 386
        m.close()

Benjamin Peterson's avatar
Benjamin Peterson committed
387 388 389 390 391 392 393 394
        m = mmap.mmap(-1, 1) # single byte
        self.assertRaises(ValueError, m.move, 0, 0, 2)
        self.assertRaises(ValueError, m.move, 1, 0, 1)
        self.assertRaises(ValueError, m.move, 0, 1, 1)
        m.move(0, 0, 1)
        m.move(0, 0, 0)


395 396 397
    def test_anonymous(self):
        # anonymous mmap.mmap(-1, PAGE)
        m = mmap.mmap(-1, PAGESIZE)
398
        for x in range(PAGESIZE):
399 400
            self.assertEqual(m[x], 0,
                             "anonymously mmap'ed contents should be zero")
401

402
        for x in range(PAGESIZE):
403
            b = x & 0xff
404 405
            m[x] = b
            self.assertEqual(m[x], b)
406

407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435
    def test_read_all(self):
        m = mmap.mmap(-1, 16)
        self.addCleanup(m.close)

        # With no parameters, or None or a negative argument, reads all
        m.write(bytes(range(16)))
        m.seek(0)
        self.assertEqual(m.read(), bytes(range(16)))
        m.seek(8)
        self.assertEqual(m.read(), bytes(range(8, 16)))
        m.seek(16)
        self.assertEqual(m.read(), b'')
        m.seek(3)
        self.assertEqual(m.read(None), bytes(range(3, 16)))
        m.seek(4)
        self.assertEqual(m.read(-1), bytes(range(4, 16)))
        m.seek(5)
        self.assertEqual(m.read(-2), bytes(range(5, 16)))
        m.seek(9)
        self.assertEqual(m.read(-42), bytes(range(9, 16)))

    def test_read_invalid_arg(self):
        m = mmap.mmap(-1, 16)
        self.addCleanup(m.close)

        self.assertRaises(TypeError, m.read, 'foo')
        self.assertRaises(TypeError, m.read, 5.5)
        self.assertRaises(TypeError, m.read, [1, 2, 3])

436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467
    def test_extended_getslice(self):
        # Test extended slicing by comparing with list slicing.
        s = bytes(reversed(range(256)))
        m = mmap.mmap(-1, len(s))
        m[:] = s
        self.assertEqual(m[:], s)
        indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300)
        for start in indices:
            for stop in indices:
                # Skip step 0 (invalid)
                for step in indices[1:]:
                    self.assertEqual(m[start:stop:step],
                                     s[start:stop:step])

    def test_extended_set_del_slice(self):
        # Test extended slicing by comparing with list slicing.
        s = bytes(reversed(range(256)))
        m = mmap.mmap(-1, len(s))
        indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300)
        for start in indices:
            for stop in indices:
                # Skip invalid step 0
                for step in indices[1:]:
                    m[:] = s
                    self.assertEqual(m[:], s)
                    L = list(s)
                    # Make sure we have a slice of exactly the right length,
                    # but with different data.
                    data = L[start:stop:step]
                    data = bytes(reversed(data))
                    L[start:stop:step] = data
                    m[start:stop:step] = data
468
                    self.assertEqual(m[:], bytes(L))
469

470 471 472 473 474 475 476 477
    def make_mmap_file (self, f, halfsize):
        # Write 2 pages worth of data to the file
        f.write (b'\0' * halfsize)
        f.write (b'foo')
        f.write (b'\0' * (halfsize - 3))
        f.flush ()
        return mmap.mmap (f.fileno(), 0)

478 479 480
    def test_empty_file (self):
        f = open (TESTFN, 'w+b')
        f.close()
481 482 483 484 485
        with open(TESTFN, "rb") as f :
            self.assertRaisesRegex(ValueError,
                                   "cannot mmap an empty file",
                                   mmap.mmap, f.fileno(), 0,
                                   access=mmap.ACCESS_READ)
486

487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513
    def test_offset (self):
        f = open (TESTFN, 'w+b')

        try: # unlink TESTFN no matter what
            halfsize = mmap.ALLOCATIONGRANULARITY
            m = self.make_mmap_file (f, halfsize)
            m.close ()
            f.close ()

            mapsize = halfsize * 2
            # Try invalid offset
            f = open(TESTFN, "r+b")
            for offset in [-2, -1, None]:
                try:
                    m = mmap.mmap(f.fileno(), mapsize, offset=offset)
                    self.assertEqual(0, 1)
                except (ValueError, TypeError, OverflowError):
                    pass
                else:
                    self.assertEqual(0, 0)
            f.close()

            # Try valid offset, hopefully 8192 works on all OSes
            f = open(TESTFN, "r+b")
            m = mmap.mmap(f.fileno(), mapsize - halfsize, offset=halfsize)
            self.assertEqual(m[0:3], b'foo')
            f.close()
514 515 516 517 518 519 520 521 522 523 524 525 526 527 528

            # Try resizing map
            try:
                m.resize(512)
            except SystemError:
                pass
            else:
                # resize() is supported
                self.assertEqual(len(m), 512)
                # Check that we can no longer seek beyond the new size.
                self.assertRaises(ValueError, m.seek, 513, 0)
                # Check that the content is not changed
                self.assertEqual(m[0:3], b'foo')

                # Check that the underlying file is truncated too
529
                f = open(TESTFN, 'rb')
530 531 532 533 534
                f.seek(0, 2)
                self.assertEqual(f.tell(), halfsize + 512)
                f.close()
                self.assertEqual(m.size(), halfsize + 512)

535 536 537 538 539 540 541 542 543
            m.close()

        finally:
            f.close()
            try:
                os.unlink(TESTFN)
            except OSError:
                pass

544 545 546 547 548 549
    def test_subclass(self):
        class anon_mmap(mmap.mmap):
            def __new__(klass, *args, **kwargs):
                return mmap.mmap.__new__(klass, -1, *args, **kwargs)
        anon_mmap(PAGESIZE)

550
    @unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ")
Christian Heimes's avatar
Christian Heimes committed
551 552
    def test_prot_readonly(self):
        mapsize = 10
Benjamin Peterson's avatar
Benjamin Peterson committed
553 554
        with open(TESTFN, "wb") as fp:
            fp.write(b"a"*mapsize)
555 556 557
        with open(TESTFN, "rb") as f:
            m = mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ)
            self.assertRaises(TypeError, m.write, "foo")
558

Christian Heimes's avatar
Christian Heimes committed
559
    def test_error(self):
560
        self.assertIs(mmap.error, OSError)
Christian Heimes's avatar
Christian Heimes committed
561

562 563
    def test_io_methods(self):
        data = b"0123456789"
Benjamin Peterson's avatar
Benjamin Peterson committed
564 565
        with open(TESTFN, "wb") as fp:
            fp.write(b"x"*len(data))
566 567
        with open(TESTFN, "r+b") as f:
            m = mmap.mmap(f.fileno(), len(data))
568 569
        # Test write_byte()
        for i in range(len(data)):
570
            self.assertEqual(m.tell(), i)
571
            m.write_byte(data[i])
572
            self.assertEqual(m.tell(), i+1)
573
        self.assertRaises(ValueError, m.write_byte, b"x"[0])
574
        self.assertEqual(m[:], data)
575 576 577
        # Test read_byte()
        m.seek(0)
        for i in range(len(data)):
578 579 580
            self.assertEqual(m.tell(), i)
            self.assertEqual(m.read_byte(), data[i])
            self.assertEqual(m.tell(), i+1)
581 582 583
        self.assertRaises(ValueError, m.read_byte)
        # Test read()
        m.seek(3)
584 585
        self.assertEqual(m.read(3), b"345")
        self.assertEqual(m.tell(), 6)
586 587 588
        # Test write()
        m.seek(3)
        m.write(b"bar")
589 590
        self.assertEqual(m.tell(), 6)
        self.assertEqual(m[:], b"012bar6789")
591 592 593 594
        m.write(bytearray(b"baz"))
        self.assertEqual(m.tell(), 9)
        self.assertEqual(m[:], b"012barbaz9")
        self.assertRaises(ValueError, m.write, b"ba")
595

596 597 598 599
    def test_non_ascii_byte(self):
        for b in (129, 200, 255): # > 128
            m = mmap.mmap(-1, 1)
            m.write_byte(b)
600
            self.assertEqual(m[0], b)
601
            m.seek(0)
602
            self.assertEqual(m.read_byte(), b)
603 604
            m.close()

605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630
    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
    def test_tagname(self):
        data1 = b"0123456789"
        data2 = b"abcdefghij"
        assert len(data1) == len(data2)

        # Test same tag
        m1 = mmap.mmap(-1, len(data1), tagname="foo")
        m1[:] = data1
        m2 = mmap.mmap(-1, len(data2), tagname="foo")
        m2[:] = data2
        self.assertEqual(m1[:], data2)
        self.assertEqual(m2[:], data2)
        m2.close()
        m1.close()

        # Test different tag
        m1 = mmap.mmap(-1, len(data1), tagname="foo")
        m1[:] = data1
        m2 = mmap.mmap(-1, len(data2), tagname="boo")
        m2[:] = data2
        self.assertEqual(m1[:], data1)
        self.assertEqual(m2[:], data2)
        m2.close()
        m1.close()

631 632 633 634 635 636
    @cpython_only
    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
    def test_sizeof(self):
        m1 = mmap.mmap(-1, 100)
        tagname = "foo"
        m2 = mmap.mmap(-1, 100, tagname=tagname)
Serhiy Storchaka's avatar
Serhiy Storchaka committed
637 638
        self.assertEqual(sys.getsizeof(m2),
                         sys.getsizeof(m1) + len(tagname) + 1)
639

640 641 642 643 644 645 646 647 648
    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
    def test_crasher_on_windows(self):
        # Should not crash (Issue 1733986)
        m = mmap.mmap(-1, 1000, tagname="foo")
        try:
            mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size
        except:
            pass
        m.close()
649

650 651 652 653 654 655 656 657 658 659 660 661 662 663 664
        # Should not crash (Issue 5385)
        with open(TESTFN, "wb") as fp:
            fp.write(b"x"*10)
        f = open(TESTFN, "r+b")
        m = mmap.mmap(f.fileno(), 0)
        f.close()
        try:
            m.resize(0) # will raise OSError
        except:
            pass
        try:
            m[:]
        except:
            pass
        m.close()
665

666 667 668 669 670 671 672 673 674 675 676
    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
    def test_invalid_descriptor(self):
        # socket file descriptors are valid, but out of range
        # for _get_osfhandle, causing a crash when validating the
        # parameters to _get_osfhandle.
        s = socket.socket()
        try:
            with self.assertRaises(OSError):
                m = mmap.mmap(s.fileno(), 10)
        finally:
            s.close()
677

678 679 680 681 682 683
    def test_context_manager(self):
        with mmap.mmap(-1, 10) as m:
            self.assertFalse(m.closed)
        self.assertTrue(m.closed)

    def test_context_manager_exception(self):
684
        # Test that the OSError gets passed through
685 686
        with self.assertRaises(Exception) as exc:
            with mmap.mmap(-1, 10) as m:
687 688
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
689 690 691
                              "wrong exception raised in context manager")
        self.assertTrue(m.closed, "context manager failed")

692 693 694 695 696 697 698 699 700
    def test_weakref(self):
        # Check mmap objects are weakrefable
        mm = mmap.mmap(-1, 16)
        wr = weakref.ref(mm)
        self.assertIs(wr(), mm)
        del mm
        gc_collect()
        self.assertIs(wr(), None)

701 702 703 704 705 706 707
    def test_write_returning_the_number_of_bytes_written(self):
        mm = mmap.mmap(-1, 16)
        self.assertEqual(mm.write(b""), 0)
        self.assertEqual(mm.write(b"x"), 1)
        self.assertEqual(mm.write(b"yz"), 2)
        self.assertEqual(mm.write(b"python"), 6)

708
    @unittest.skipIf(os.name == 'nt', 'cannot resize anonymous mmaps on Windows')
709 710 711 712
    def test_resize_past_pos(self):
        m = mmap.mmap(-1, 8192)
        self.addCleanup(m.close)
        m.read(5000)
713 714 715 716
        try:
            m.resize(4096)
        except SystemError:
            self.skipTest("resizing not supported")
717
        self.assertEqual(m.read(14), b'')
718
        self.assertRaises(ValueError, m.read_byte)
719 720 721
        self.assertRaises(ValueError, m.write_byte, 42)
        self.assertRaises(ValueError, m.write, b'abc')

722 723 724 725 726 727 728
    def test_concat_repeat_exception(self):
        m = mmap.mmap(-1, 16)
        with self.assertRaises(TypeError):
            m + m
        with self.assertRaises(TypeError):
            m * 2

729 730 731 732 733 734 735 736
    def test_flush_return_value(self):
        # mm.flush() should return None on success, raise an
        # exception on error under all platforms.
        mm = mmap.mmap(-1, 16)
        self.addCleanup(mm.close)
        mm.write(b'python')
        result = mm.flush()
        self.assertIsNone(result)
737 738 739
        if sys.platform.startswith('linux'):
            # 'offset' must be a multiple of mmap.PAGESIZE on Linux.
            # See bpo-34754 for details.
740 741
            self.assertRaises(OSError, mm.flush, 1, len(b'python'))

742

743 744 745 746 747 748 749 750
class LargeMmapTests(unittest.TestCase):

    def setUp(self):
        unlink(TESTFN)

    def tearDown(self):
        unlink(TESTFN)

751
    def _make_test_file(self, num_zeroes, tail):
752 753 754
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            requires('largefile',
                'test requires %s bytes and a long time to run' % str(0x180000000))
755 756 757 758 759
        f = open(TESTFN, 'w+b')
        try:
            f.seek(num_zeroes)
            f.write(tail)
            f.flush()
760
        except (OSError, OverflowError, ValueError):
761 762 763 764
            try:
                f.close()
            except (OSError, OverflowError):
                pass
765 766
            raise unittest.SkipTest("filesystem does not have largefile support")
        return f
767

768
    def test_large_offset(self):
769
        with self._make_test_file(0x14FFFFFFF, b" ") as f:
770 771 772 773
            with mmap.mmap(f.fileno(), 0, offset=0x140000000, access=mmap.ACCESS_READ) as m:
                self.assertEqual(m[0xFFFFFFF], 32)

    def test_large_filesize(self):
774
        with self._make_test_file(0x17FFFFFFF, b" ") as f:
775 776 777 778 779 780 781
            if sys.maxsize < 0x180000000:
                # On 32 bit platforms the file is larger than sys.maxsize so
                # mapping the whole file should fail -- Issue #16743
                with self.assertRaises(OverflowError):
                    mmap.mmap(f.fileno(), 0x180000000, access=mmap.ACCESS_READ)
                with self.assertRaises(ValueError):
                    mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
782 783 784
            with mmap.mmap(f.fileno(), 0x10000, access=mmap.ACCESS_READ) as m:
                self.assertEqual(m.size(), 0x180000000)

785
    # Issue 11277: mmap() with large (~4 GiB) sparse files crashes on OS X.
786 787 788 789 790

    def _test_around_boundary(self, boundary):
        tail = b'  DEARdear  '
        start = boundary - len(tail) // 2
        end = start + len(tail)
791
        with self._make_test_file(start, tail) as f:
792
            with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
793 794 795 796 797 798 799 800 801 802
                self.assertEqual(m[start:end], tail)

    @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
    def test_around_2GB(self):
        self._test_around_boundary(_2G)

    @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
    def test_around_4GB(self):
        self._test_around_boundary(_4G)

Christian Heimes's avatar
Christian Heimes committed
803

804
if __name__ == '__main__':
805
    unittest.main()