test_multiprocessing_main_handling.py 11.3 KB
Newer Older
1
# tests __main__ module handling in multiprocessing
2
from test import support
3
# Skip tests if _multiprocessing wasn't built.
4
support.import_module('_multiprocessing')
5 6 7 8 9 10 11 12 13

import importlib
import importlib.machinery
import unittest
import sys
import os
import os.path
import py_compile

14
from test.support.script_helper import (
15
    make_pkg, make_script, make_zip_pkg, make_zip_script,
16
    assert_python_ok)
17

18 19 20
# Skip the whole module when support.PGO is set: raising SkipTest at import
# time prevents any of the tests below from being collected.
if support.PGO:
    raise unittest.SkipTest("test is not helpful for PGO")

21 22 23
# Look up which start methods are available to test
import multiprocessing
AVAILABLE_START_METHODS = set(multiprocessing.get_all_start_methods())
24

25 26 27
# Issue #22332: Skip tests if sem_open implementation is broken.
support.import_module('multiprocessing.synchronize')

28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
# Module-level alias for support.verbose; _check_output echoes the child
# process's stdout when this value is greater than 1.
verbose = support.verbose

# Source of the child script used by most tests.  It is written to disk by
# _make_test_script/_make_test_zip_pkg and executed with the start method
# name as its only command-line argument; on success it prints
# "<start_method> -> [1, 4, 9]".
#
# The pool is used as a context manager and joined afterwards so that the
# worker processes are terminated and reaped before the script exits.
test_source = """\
# multiprocessing includes all sorts of shenanigans to make __main__
# attributes accessible in the subprocess in a pickle compatible way.

# We run the "doesn't work in the interactive interpreter" example from
# the docs to make sure it *does* work from an executed __main__,
# regardless of the invocation mechanism

import sys
import time
from multiprocessing import Pool, set_start_method

# We use this __main__ defined function in the map call below in order to
# check that multiprocessing in correctly running the unguarded
# code in child processes and then making it available as __main__
def f(x):
    return x*x

# Check explicit relative imports
if "check_sibling" in __file__:
    # We're inside a package and not in a __main__.py file
    # so make sure explicit relative imports work correctly
    from . import sibling

if __name__ == '__main__':
    start_method = sys.argv[1]
    set_start_method(start_method)
    results = []
    with Pool(5) as pool:
        pool.map_async(f, [1, 2, 3], callback=results.extend)
        start_time = time.monotonic()
        while not results:
            time.sleep(0.05)
            # up to 1 min to report the results
            dt = time.monotonic() - start_time
            if dt > 60.0:
                raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)

    results.sort()
    print(start_method, "->", results)

    pool.join()
"""

# Source of the child script used where multiprocessing must NOT re-run the
# main module in worker processes: the script raises RuntimeError if it is
# ever executed under a name other than "__main__".  Like test_source, it
# takes the start method as its only argument and prints
# "<start_method> -> [1, 4, 9]".
#
# The pool is used as a context manager and joined afterwards so that the
# worker processes are terminated and reaped before the script exits.
test_source_main_skipped_in_children = """\
# __main__.py files have an implied "if __name__ == '__main__'" so
# multiprocessing should always skip running them in child processes

# This means we can't use __main__ defined functions in child processes,
# so we just use "int" as a passthrough operation below

if __name__ != "__main__":
    raise RuntimeError("Should only be called as __main__!")

import sys
import time
from multiprocessing import Pool, set_start_method

start_method = sys.argv[1]
set_start_method(start_method)
results = []
with Pool(5) as pool:
    pool.map_async(int, [1, 4, 9], callback=results.extend)
    start_time = time.monotonic()
    while not results:
        time.sleep(0.05)
        # up to 1 min to report the results
        dt = time.monotonic() - start_time
        if dt > 60.0:
            raise RuntimeError("Timed out waiting for results (%.1f sec)" % dt)

results.sort()
print(start_method, "->", results)

pool.join()
"""

# These helpers were copied from test_cmd_line_script & tweaked a bit...

def _make_test_script(script_dir, script_basename,
                      source=test_source, omit_suffix=False):
    """Write *source* into *script_dir* via make_script and return its result.

    When *script_basename* is "check_sibling", an empty "sibling" module is
    also created next to it so the script's explicit relative import has
    something to import.  Import caches are invalidated before returning.
    """
    script_path = make_script(script_dir, script_basename, source, omit_suffix)
    wants_sibling = (script_basename == "check_sibling")
    if wants_sibling:
        # Hack: give "from . import sibling" in the test script a target.
        make_script(script_dir, "sibling", "")
    importlib.invalidate_caches()
    return script_path

def _make_test_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
                       source=test_source, depth=1):
    """Build a zipped package via make_zip_pkg and return its result.

    All arguments are forwarded unchanged to make_zip_pkg; import caches
    are invalidated before the result is handed back to the caller.
    """
    zip_info = make_zip_pkg(
        zip_dir, zip_basename, pkg_name, script_basename, source, depth)
    importlib.invalidate_caches()
    return zip_info

# There's no easy way to pass the script directory in to get
# -m to work (avoiding that is the whole point of making
# directories and zipfiles executable!)
# So we fake it for testing purposes with a custom launch script
# Template completed by _make_launch_script: the %s slot receives the source
# text of a Python expression that is inserted at the front of sys.path, and
# the %r slot receives the name of the module to run as __main__.
launch_source = """\
import sys, os.path, runpy
sys.path.insert(0, %s)
runpy._run_module_as_main(%r)
"""

def _make_launch_script(script_dir, script_basename, module_name, path=None):
    """Write a launcher script that runs *module_name* as __main__.

    The launcher is produced from the launch_source template.  If *path* is
    None the launcher prepends its own directory to sys.path at run time;
    otherwise the repr of *path* is embedded literally.  Import caches are
    invalidated before the result of make_script is returned.
    """
    path_expr = "os.path.dirname(__file__)" if path is None else repr(path)
    launcher = make_script(script_dir, script_basename,
                           launch_source % (path_expr, module_name))
    importlib.invalidate_caches()
    return launcher

class MultiProcessingCmdLineMixin:
    """Command-line invocation tests shared by every start method.

    Concrete subclasses must define two class attributes:

    * ``start_method`` -- the start method name passed to the child script
      as its command-line argument (and checked against
      AVAILABLE_START_METHODS in setUp).
    * ``main_in_children_source`` -- the script source written as
      ``__main__.py`` by the directory, zipfile and package tests.
    """

    maxDiff = None # Show full tracebacks on subprocess failure

    def setUp(self):
        """Skip the test when this class's start method is unavailable."""
        if self.start_method not in AVAILABLE_START_METHODS:
            self.skipTest("%r start method not available" % self.start_method)

    def _check_output(self, script_name, exit_code, out, err):
        """Assert that the child exited cleanly and printed the expected line.

        *out* and *err* are the child's captured stdout/stderr as bytes.
        stderr must be empty and stdout must be exactly
        "<start_method> -> [1, 4, 9]" once surrounding whitespace is removed.
        """
        if verbose > 1:
            print("Output from test script %r:" % script_name)
            print(repr(out))
        self.assertEqual(exit_code, 0)
        self.assertEqual(err.decode('utf-8'), '')
        expected_results = "%s -> [1, 4, 9]" % self.start_method
        self.assertEqual(out.decode('utf-8').strip(), expected_results)

    def _check_script(self, script_name, *cmd_line_switches):
        """Run *script_name* in a child interpreter and verify its output.

        The start method is appended as the script's only argument; any
        *cmd_line_switches* are placed before the script name.
        """
        if not __debug__:
            # Run the child with the same -O/-OO level as this process.
            cmd_line_switches += ('-' + 'O' * sys.flags.optimize,)
        run_args = cmd_line_switches + (script_name, self.start_method)
        rc, out, err = assert_python_ok(*run_args, __isolated=False)
        self._check_output(script_name, rc, out, err)

    def test_basic_script(self):
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'script')
            self._check_script(script_name)

    def test_basic_script_no_suffix(self):
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'script',
                                            omit_suffix=True)
            self._check_script(script_name)

    def test_ipython_workaround(self):
        # Some versions of the IPython launch script are missing the
        # __name__ = "__main__" guard, and multiprocessing has long had
        # a workaround for that case
        # See https://github.com/ipython/ipython/issues/4698
        source = test_source_main_skipped_in_children
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'ipython',
                                            source=source)
            self._check_script(script_name)
            script_no_suffix = _make_test_script(script_dir, 'ipython',
                                                 source=source,
                                                 omit_suffix=True)
            self._check_script(script_no_suffix)

    def test_script_compiled(self):
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'script')
            py_compile.compile(script_name, doraise=True)
            # Only the compiled file must remain, so drop the source first.
            os.remove(script_name)
            pyc_file = support.make_legacy_pyc(script_name)
            self._check_script(pyc_file)

    def test_directory(self):
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            # The directory itself is executed; only __main__.py's
            # existence matters, not the returned path.
            _make_test_script(script_dir, '__main__', source=source)
            self._check_script(script_dir)

    def test_directory_compiled(self):
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, '__main__',
                                            source=source)
            py_compile.compile(script_name, doraise=True)
            os.remove(script_name)
            support.make_legacy_pyc(script_name)
            self._check_script(script_dir)

    def test_zipfile(self):
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, '__main__',
                                            source=source)
            zip_name, _ = make_zip_script(script_dir, 'test_zip', script_name)
            self._check_script(zip_name)

    def test_zipfile_compiled(self):
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, '__main__',
                                            source=source)
            compiled_name = py_compile.compile(script_name, doraise=True)
            zip_name, _ = make_zip_script(script_dir, 'test_zip',
                                          compiled_name)
            self._check_script(zip_name)

    def test_module_in_package(self):
        with support.temp_dir() as script_dir:
            pkg_dir = os.path.join(script_dir, 'test_pkg')
            make_pkg(pkg_dir)
            # "check_sibling" also triggers creation of the sibling module.
            _make_test_script(pkg_dir, 'check_sibling')
            launch_name = _make_launch_script(script_dir, 'launch',
                                              'test_pkg.check_sibling')
            self._check_script(launch_name)

    def test_module_in_package_in_zipfile(self):
        with support.temp_dir() as script_dir:
            zip_name, _ = _make_test_zip_pkg(script_dir, 'test_zip',
                                             'test_pkg', 'script')
            launch_name = _make_launch_script(script_dir, 'launch',
                                              'test_pkg.script', zip_name)
            self._check_script(launch_name)

    def test_module_in_subpackage_in_zipfile(self):
        with support.temp_dir() as script_dir:
            zip_name, _ = _make_test_zip_pkg(script_dir, 'test_zip',
                                             'test_pkg', 'script', depth=2)
            launch_name = _make_launch_script(script_dir, 'launch',
                                              'test_pkg.test_pkg.script',
                                              zip_name)
            self._check_script(launch_name)

    def test_package(self):
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            pkg_dir = os.path.join(script_dir, 'test_pkg')
            make_pkg(pkg_dir)
            _make_test_script(pkg_dir, '__main__', source=source)
            launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg')
            self._check_script(launch_name)

    def test_package_compiled(self):
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            pkg_dir = os.path.join(script_dir, 'test_pkg')
            make_pkg(pkg_dir)
            script_name = _make_test_script(pkg_dir, '__main__',
                                            source=source)
            py_compile.compile(script_name, doraise=True)
            os.remove(script_name)
            support.make_legacy_pyc(script_name)
            launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg')
            self._check_script(launch_name)

# Test all supported start methods (setUp skips the ones that are unavailable)

class SpawnCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    # Runs the mixin's tests with 'spawn' passed to the child script as the
    # start method.  Tests that write a __main__.py use the script variant
    # that raises RuntimeError if executed under any name but "__main__".
    start_method = 'spawn'
    main_in_children_source = test_source_main_skipped_in_children

class ForkCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    # Runs the mixin's tests with 'fork' passed to the child script as the
    # start method.  Tests that write a __main__.py use the regular
    # test_source script, which maps a function defined in __main__.
    start_method = 'fork'
    main_in_children_source = test_source

class ForkServerCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    # Runs the mixin's tests with 'forkserver' passed to the child script as
    # the start method.  Tests that write a __main__.py use the script variant
    # that raises RuntimeError if executed under any name but "__main__".
    start_method = 'forkserver'
    main_in_children_source = test_source_main_skipped_in_children

289
def tearDownModule():
    """Module-level unittest fixture: call support.reap_children().

    unittest invokes this once after every test in the module has run.
    """
    support.reap_children()

if __name__ == '__main__':
    # Allow running this test module directly as a script.
    unittest.main()