#
# Module which supports allocation of memory from an mmap
#
# multiprocessing/heap.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import bisect
import mmap
import os
import sys
import tempfile
import threading

from .context import reduction, assert_spawning
from . import util

__all__ = ['BufferWrapper']

#
# Inheritable class which wraps an mmap, and from which blocks can be allocated
#

if sys.platform == 'win32':

    import _winapi

    class Arena(object):

        _rand = tempfile._RandomNameSequence()

        def __init__(self, size):
            self.size = size
            for i in range(100):
                name = 'pym-%d-%s' % (os.getpid(), next(self._rand))
                buf = mmap.mmap(-1, size, tagname=name)
                if _winapi.GetLastError() == 0:
                    break
                # We have reopened a preexisting mmap.
                buf.close()
            else:
                raise FileExistsError('Cannot find name for new mmap')
            self.name = name
            self.buffer = buf
            self._state = (self.size, self.name)

        def __getstate__(self):
            assert_spawning(self)
            return self._state

        def __setstate__(self, state):
            self.size, self.name = self._state = state
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
            # XXX Temporarily preventing buildbot failures while determining
            # XXX the correct long-term fix. See issue 23060
            #assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS

else:

    class Arena(object):

        def __init__(self, size, fd=-1):
            self.size = size
            self.fd = fd
            if fd == -1:
                self.fd, name = tempfile.mkstemp(
                     prefix='pym-%d-' % os.getpid(), dir=util.get_temp_dir())
                os.unlink(name)
                util.Finalize(self, os.close, (self.fd,))
                with open(self.fd, 'wb', closefd=False) as f:
                    bs = 1024 * 1024
                    if size >= bs:
                        zeros = b'\0' * bs
                        for _ in range(size // bs):
                            f.write(zeros)
                        del zeros
                    f.write(b'\0' * (size % bs))
                    assert f.tell() == size
            self.buffer = mmap.mmap(self.fd, self.size)

    def reduce_arena(a):
        if a.fd == -1:
            raise ValueError('Arena is unpicklable because '
                             'forking was enabled when it was created')
        return rebuild_arena, (a.size, reduction.DupFd(a.fd))

    def rebuild_arena(size, dupfd):
        return Arena(size, dupfd.detach())

    reduction.register(Arena, reduce_arena)
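
    # Pickling flow sketch (explanatory comment, not in the original source):
    # an Arena sent to another process is reduced to
    # (rebuild_arena, (size, reduction.DupFd(fd))).  DupFd() arranges for the
    # descriptor to be duplicated into the receiving process, which re-mmaps
    # the same unlinked temporary file, so both processes share one buffer.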

#
# Class allowing allocation of chunks of memory from arenas
#

class Heap(object):

    _alignment = 8

    def __init__(self, size=mmap.PAGESIZE):
        self._lastpid = os.getpid()
        self._lock = threading.Lock()
        self._size = size
        self._lengths = []
        self._len_to_seq = {}
        self._start_to_block = {}
        self._stop_to_block = {}
        self._allocated_blocks = set()
        self._arenas = []
        # list of pending blocks to free - see the comment in free() below
        self._pending_free_blocks = []

    @staticmethod
    def _roundup(n, alignment):
        # alignment must be a power of 2
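        # Worked example (comment added for illustration):
        #   _roundup(13, 8) -> mask = 7, (13 + 7) & ~7 == 20 & ~7 == 16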
        mask = alignment - 1
        return (n + mask) & ~mask

    def _malloc(self, size):
        # returns a large enough block -- it might be much larger
        i = bisect.bisect_left(self._lengths, size)
        if i == len(self._lengths):
            length = self._roundup(max(self._size, size), mmap.PAGESIZE)
            self._size *= 2
            util.info('allocating a new mmap of length %d', length)
            arena = Arena(length)
            self._arenas.append(arena)
            return (arena, 0, length)
        else:
            length = self._lengths[i]
            seq = self._len_to_seq[length]
            block = seq.pop()
            if not seq:
                del self._len_to_seq[length], self._lengths[i]

        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]
        return block

    def _free(self, block):
        # free location and try to merge with neighbours
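        # Illustrative example (comment added, not in the original source):
        # if (arena, 16, 32) is freed while (arena, 0, 16) and (arena, 32, 64)
        # are already free, the three merge into a single block
        # (arena, 0, 64), re-registered under length 64.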
        (arena, start, stop) = block

        try:
            prev_block = self._stop_to_block[(arena, start)]
        except KeyError:
            pass
        else:
            start, _ = self._absorb(prev_block)

        try:
            next_block = self._start_to_block[(arena, stop)]
        except KeyError:
            pass
        else:
            _, stop = self._absorb(next_block)

        block = (arena, start, stop)
        length = stop - start

        try:
            self._len_to_seq[length].append(block)
        except KeyError:
            self._len_to_seq[length] = [block]
            bisect.insort(self._lengths, length)

        self._start_to_block[(arena, start)] = block
        self._stop_to_block[(arena, stop)] = block

    def _absorb(self, block):
        # deregister this block so it can be merged with a neighbour
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]

        length = stop - start
        seq = self._len_to_seq[length]
        seq.remove(block)
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

        return start, stop

    def _free_pending_blocks(self):
        # Free all the blocks in the pending list - called with the lock held.
        while True:
            try:
                block = self._pending_free_blocks.pop()
            except IndexError:
                break
            self._allocated_blocks.remove(block)
            self._free(block)

    def free(self, block):
        # free a block returned by malloc()
        # Since free() can be called asynchronously by the GC, it could happen
        # that it's called while self._lock is held: in that case,
        # self._lock.acquire() would deadlock (issue #12352). To avoid that, a
        # trylock is used instead, and if the lock can't be acquired
        # immediately, the block is added to a list of blocks to be freed
        # synchronously sometime later from malloc() or free(), by calling
        # _free_pending_blocks() (appending to and popping from a list is not
        # strictly thread-safe, but under CPython it is atomic thanks to the GIL).
        assert os.getpid() == self._lastpid
        if not self._lock.acquire(False):
            # can't acquire the lock right now, add the block to the list of
            # pending blocks to free
            self._pending_free_blocks.append(block)
        else:
            # we hold the lock
            try:
                self._free_pending_blocks()
                self._allocated_blocks.remove(block)
                self._free(block)
            finally:
                self._lock.release()

    def malloc(self, size):
        # return a block of the right size (possibly rounded up)
        assert 0 <= size < sys.maxsize
        if os.getpid() != self._lastpid:
            self.__init__()                     # reinitialize after fork
        with self._lock:
            self._free_pending_blocks()
            size = self._roundup(max(size, 1), self._alignment)
            (arena, start, stop) = self._malloc(size)
            new_stop = start + size
            if new_stop < stop:
                self._free((arena, new_stop, stop))
            block = (arena, start, new_stop)
            self._allocated_blocks.add(block)
            return block

#
# Class representing a chunk of an mmap -- can be inherited by a child process
#

class BufferWrapper(object):

    _heap = Heap()

    def __init__(self, size):
        assert 0 <= size < sys.maxsize
        block = BufferWrapper._heap.malloc(size)
        self._state = (block, size)
        util.Finalize(self, BufferWrapper._heap.free, args=(block,))

    def create_memoryview(self):
        (arena, start, stop), size = self._state
        return memoryview(arena.buffer)[start:start+size]
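
#
# Minimal usage sketch (illustrative addition, not part of the original
# module).  Run with `python -m multiprocessing.heap`; the real callers are
# the multiprocessing.sharedctypes helpers, which wrap ctypes objects around
# the memoryview returned by create_memoryview().
#

if __name__ == '__main__':
    wrapper = BufferWrapper(256)          # allocate 256 bytes from the heap
    view = wrapper.create_memoryview()    # writable view of the block
    view[:5] = b'hello'
    assert bytes(view[:5]) == b'hello'
    print('allocated %d bytes, block is readable and writable' % len(view))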