#
# A test of `multiprocessing.Pool` class
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    """Call ``func(*args)`` and describe the outcome as a string.

    The returned text names the process that ran the call (as reported by
    ``multiprocessing.current_process().name``), the function's
    ``__name__``, the argument tuple and the result.
    """
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    """Unpack *args* into positional arguments for ``calculate``.

    Lets a single packed task be handed to APIs that pass exactly one
    argument per item, such as ``pool.map`` and ``pool.imap``.
    """
    description = calculate(*args)
    return description

def mul(a, b):
    """Return ``a * b`` after sleeping for a random time below 0.5 seconds."""
    delay = 0.5 * random.random()
    time.sleep(delay)
    product = a * b
    return product

def plus(a, b):
    """Return ``a + b`` after sleeping for a random time below 0.5 seconds."""
    delay = 0.5 * random.random()
    time.sleep(delay)
    total = a + b
    return total

def f(x):
    """Return ``1.0 / (x - 5.0)``.

    Raises ZeroDivisionError when *x* equals 5; the error-handling checks
    rely on that failure.
    """
    denominator = x - 5.0
    return 1.0 / denominator

def pow3(x):
    """Return *x* raised to the third power."""
    cubed = pow(x, 3)
    return cubed

def noop(x):
    """Ignore *x* and return None; used to measure bare dispatch overhead."""
    return None

#
# Test code
#

def _show_results(pool, tasks):
    """Run *tasks* through each submission API of *pool* and print results."""
    # Submit everything up front so the three batches run concurrently.
    results = [pool.apply_async(calculate, t) for t in tasks]
    imap_it = pool.imap(calculatestar, tasks)
    imap_unordered_it = pool.imap_unordered(calculatestar, tasks)

    print('Ordered results using pool.apply_async():')
    for r in results:
        print('\t', r.get())
    print()

    print('Ordered results using pool.imap():')
    for x in imap_it:
        print('\t', x)
    print()

    print('Unordered results using pool.imap_unordered():')
    for x in imap_unordered_it:
        print('\t', x)
    print()

    print('Ordered results using pool.map() --- will block till complete:')
    for x in pool.map(calculatestar, tasks):
        print('\t', x)
    print()


def _run_benchmarks(pool):
    """Time the builtin map() against pool.map() and pool.imap()."""
    N = 100000
    print('def pow3(x): return x**3')

    t = time.time()
    A = list(map(pow3, range(N)))
    print('\tmap(pow3, range(%d)):\n\t\t%s seconds' %
          (N, time.time() - t))

    t = time.time()
    B = pool.map(pow3, range(N))
    print('\tpool.map(pow3, range(%d)):\n\t\t%s seconds' %
          (N, time.time() - t))

    t = time.time()
    C = list(pool.imap(pow3, range(N), chunksize=N//8))
    print('\tlist(pool.imap(pow3, range(%d), chunksize=%d)):\n\t\t%s seconds' %
          (N, N//8, time.time() - t))

    # All three strategies must agree on the results.
    assert A == B == C, (len(A), len(B), len(C))
    print()

    L = [None] * 1000000
    print('def noop(x): pass')
    print('L = [None] * 1000000')

    t = time.time()
    A = list(map(noop, L))
    print('\tmap(noop, L):\n\t\t%s seconds' %
          (time.time() - t))

    t = time.time()
    B = pool.map(noop, L)
    print('\tpool.map(noop, L):\n\t\t%s seconds' %
          (time.time() - t))

    t = time.time()
    C = list(pool.imap(noop, L, chunksize=len(L)//8))
    print('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' %
          (len(L)//8, time.time() - t))

    assert A == B == C, (len(A), len(B), len(C))
    print()


def _check_error_handling(pool):
    """Check that a ZeroDivisionError raised by f() reaches the caller."""
    print('Testing error handling:')

    # Each entry pairs the label used in the report with a call that must
    # fail because f(5) divides by zero.
    failing_calls = [
        ('pool.apply()', lambda: pool.apply(f, (5,))),
        ('pool.map()', lambda: pool.map(f, range(10))),
        ('list(pool.imap())', lambda: list(pool.imap(f, range(10)))),
    ]
    for label, call in failing_calls:
        try:
            print(call())
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from %s' % label)
        else:
            raise AssertionError('expected ZeroDivisionError')

    # Step through the iterator by hand: only item 5 may fail, and the
    # iterator must keep yielding the remaining items afterwards.
    it = pool.imap(f, range(10))
    for i in range(10):
        try:
            next(it)
        except ZeroDivisionError:
            if i != 5:
                raise AssertionError(
                    'unexpected ZeroDivisionError at index %d' % i)
        except StopIteration:
            break
        else:
            if i == 5:
                raise AssertionError('expected ZeroDivisionError')

    assert i == 9
    print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
    print()


def _check_timeouts(pool, tasks):
    """Poll results with a short timeout, printing a dot per timeout."""
    print('Testing ApplyResult.get() with timeout:', end=' ')
    res = pool.apply_async(calculate, tasks[0])
    while True:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % res.get(0.02))
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print()
    print()

    print('Testing IMapIterator.next() with timeout:', end=' ')
    it = pool.imap(calculatestar, tasks)
    while True:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % it.next(0.02))
        except StopIteration:
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print()
    print()


def _check_callbacks(pool):
    """Check that apply_async/map_async callbacks receive the results."""
    print('Testing callback:')

    A = []
    # 7 * 8 from mul(), followed by the cubes of 0..9 from pow3().
    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

    r = pool.apply_async(mul, (7, 8), callback=A.append)
    r.wait()

    r = pool.map_async(pow3, range(10), callback=A.extend)
    r.wait()

    if A == B:
        print('\tcallbacks succeeded\n')
    else:
        print('\t*** callbacks failed\n\t\t%s != %s\n' % (A, B))


def _check_close(pool):
    """Check that close() + join() finish pending work and stop workers."""
    print('Testing close():')

    for worker in pool._pool:
        assert worker.is_alive()

    result = pool.apply_async(time.sleep, [0.5])
    pool.close()
    pool.join()

    # The task submitted before close() must still have completed.
    assert result.get() is None

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tclose() succeeded\n')


def _check_terminate():
    """Check that terminate() stops workers while tasks are still queued."""
    print('Testing terminate():')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    pool.apply(pow3, [2])
    for _ in range(100):
        pool.apply_async(time.sleep, [DELTA])
    pool.terminate()
    pool.join()

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tterminate() succeeded\n')


def _check_garbage_collection():
    """Check that workers stop once the pool is no longer referenced."""
    print('Testing garbage collection:')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    processes = pool._pool
    pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for _ in range(100)]

    # Drop the result handles together with the pool so that nothing in
    # this function keeps the pool referenced.
    results = pool = None

    time.sleep(DELTA * 2)

    for worker in processes:
        assert not worker.is_alive()

    print('\tgarbage collection succeeded\n')


def test():
    """Exercise a ``multiprocessing.Pool`` and print a report to stdout.

    Runs, in order: result retrieval through each submission API, simple
    benchmarks, error propagation, timeouts, callbacks, and the close(),
    terminate() and garbage-collection shutdown paths.

    Raises AssertionError when a check fails.
    """
    print('cpu_count() = %d\n' % multiprocessing.cpu_count())

    #
    # Create pool
    #

    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)
    pool = multiprocessing.Pool(PROCESSES)
    print('pool = %s' % pool)
    print()

    #
    # Tests
    #

    TASKS = [(mul, (i, 7)) for i in range(10)] + \
            [(plus, (i, 8)) for i in range(10)]

    _show_results(pool, TASKS)
    _run_benchmarks(pool)
    _check_error_handling(pool)
    _check_timeouts(pool, TASKS)
    _check_callbacks(pool)

    #
    # Check there are no outstanding tasks
    #

    assert not pool._cache, 'cache = %r' % pool._cache

    #
    # Check the shutdown paths
    #

    _check_close(pool)
    _check_terminate()
    _check_garbage_collection()


if __name__ == '__main__':
    # Script entry point: an optional single argument selects whether the
    # checks run against real processes (the default) or threads.
    multiprocessing.freeze_support()

    # Compare the whole argument list so that extra arguments reach the
    # usage message instead of depending on an assert (stripped under -O).
    if sys.argv[1:] in ([], ['processes']):
        print(' Using processes '.center(79, '-'))
    elif sys.argv[1:] == ['threads']:
        print(' Using threads '.center(79, '-'))
        # Rebind the module-level name so test() uses the thread-backed API.
        import multiprocessing.dummy as multiprocessing
    else:
        print('Usage:\n\t%s [processes | threads]' % sys.argv[0])
        raise SystemExit(2)

    test()