# tests __main__ module handling in multiprocessing

from test import support
# Skip tests if _thread or _multiprocessing wasn't built.
# These checks deliberately run before the remaining imports.
support.import_module('_thread')
support.import_module('_multiprocessing')

import importlib
import importlib.machinery
import zipimport
import unittest
import sys
import os
import os.path
import py_compile

from test.support.script_helper import (
    make_pkg, make_script, make_zip_pkg, make_zip_script,
    assert_python_ok, assert_python_failure, spawn_python, kill_python)

# Look up which start methods are available to test
import multiprocessing
AVAILABLE_START_METHODS = set(multiprocessing.get_all_start_methods())

# Issue #22332: Skip tests if sem_open implementation is broken.
support.import_module('multiprocessing.synchronize')

# Verbosity level of the test run; _check_output() echoes the child's
# output when it is greater than 1.
verbose = support.verbose

# Source of the script executed by most tests below.  The script receives
# the start method name as sys.argv[1] and prints "<start method> -> [1, 4, 9]".
test_source = """\
# multiprocessing includes all sorts of shenanigans to make __main__
# attributes accessible in the subprocess in a pickle compatible way.

# We run the "doesn't work in the interactive interpreter" example from
# the docs to make sure it *does* work from an executed __main__,
# regardless of the invocation mechanism

import sys
import time
from multiprocessing import Pool, set_start_method

# We use this __main__ defined function in the map call below in order to
# check that multiprocessing in correctly running the unguarded
# code in child processes and then making it available as __main__
def f(x):
    return x*x

# Check explicit relative imports
if "check_sibling" in __file__:
    # We're inside a package and not in a __main__.py file
    # so make sure explicit relative imports work correctly
    from . import sibling

if __name__ == '__main__':
    start_method = sys.argv[1]
    set_start_method(start_method)
    results = []
    # The context manager terminates the pool on exit, so no worker
    # processes are leaked even if the wait below times out.
    with Pool(5) as pool:
        pool.map_async(f, [1, 2, 3], callback=results.extend)
        # A monotonic clock keeps the timeout immune to system clock changes.
        deadline = time.monotonic() + 10 # up to 10 s to report the results
        while not results:
            time.sleep(0.05)
            if time.monotonic() > deadline:
                raise RuntimeError("Timed out waiting for results")
    results.sort()
    print(start_method, "->", results)
    # Wait for the (already terminated) worker processes to exit.
    pool.join()
"""

# Variant of the test script that refuses to run under any name other than
# "__main__".  It is used where the main module must not be re-executed in
# child processes.
test_source_main_skipped_in_children = """\
# __main__.py files have an implied "if __name__ == '__main__'" so
# multiprocessing should always skip running them in child processes

# This means we can't use __main__ defined functions in child processes,
# so we just use "int" as a passthrough operation below

if __name__ != "__main__":
    raise RuntimeError("Should only be called as __main__!")

import sys
import time
from multiprocessing import Pool, set_start_method

start_method = sys.argv[1]
set_start_method(start_method)
results = []
# The context manager terminates the pool on exit, so no worker
# processes are leaked even if the wait below times out.
with Pool(5) as pool:
    pool.map_async(int, [1, 4, 9], callback=results.extend)
    # A monotonic clock keeps the timeout immune to system clock changes.
    deadline = time.monotonic() + 10 # up to 10 s to report the results
    while not results:
        time.sleep(0.05)
        if time.monotonic() > deadline:
            raise RuntimeError("Timed out waiting for results")
results.sort()
print(start_method, "->", results)
# Wait for the (already terminated) worker processes to exit.
pool.join()
"""

# These helpers were copied from test_cmd_line_script & tweaked a bit...

def _make_test_script(script_dir, script_basename,
                      source=test_source, omit_suffix=False):
    """Write *source* into *script_dir* via make_script() and return its result.

    When *script_basename* is "check_sibling", an empty "sibling" script is
    also written next to it.  The import caches are invalidated afterwards
    because new files have just been created.
    """
    script_path = make_script(script_dir, script_basename,
                              source, omit_suffix)
    wants_sibling = (script_basename == "check_sibling")
    if wants_sibling:
        # Hack to check explicit relative imports: the test script does
        # "from . import sibling" when its file name contains "check_sibling".
        make_script(script_dir, "sibling", "")
    importlib.invalidate_caches()
    return script_path

def _make_test_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
                       source=test_source, depth=1):
    """Build a zipped package through make_zip_pkg() and return its result.

    All arguments are forwarded positionally to make_zip_pkg(); callers
    unpack the returned value as ``(zip_name, run_name)``.  The import
    caches are invalidated afterwards because a new archive was created.
    """
    zip_pkg_info = make_zip_pkg(
        zip_dir, zip_basename, pkg_name, script_basename, source, depth)
    importlib.invalidate_caches()
    return zip_pkg_info

# There's no easy way to pass the script directory in to get
# -m to work (avoiding that is the whole point of making
# directories and zipfiles executable!)
# So we fake it for testing purposes with a custom launch script.
#
# Template placeholders:
#   %s -- Python expression (as source text) inserted at the front of sys.path
#   %r -- name of the module to run as __main__
launch_source = (
    "import sys, os.path, runpy\n"
    "sys.path.insert(0, %s)\n"
    "runpy._run_module_as_main(%r)\n"
)

def _make_launch_script(script_dir, script_basename, module_name, path=None):
    """Write a launcher that runs *module_name* as __main__; return make_script()'s result.

    The launcher is generated from the launch_source template.  If *path*
    is None, the launcher inserts its own directory at the front of
    sys.path; otherwise it inserts the given *path*.
    """
    # The template takes source text for the sys.path entry: either an
    # expression the launcher evaluates itself, or the repr of a fixed value.
    path_expr = "os.path.dirname(__file__)" if path is None else repr(path)
    launcher = make_script(script_dir, script_basename,
                           launch_source % (path_expr, module_name))
    importlib.invalidate_caches()
    return launcher

class MultiProcessingCmdLineMixin:
    """Test cases that launch a multiprocessing script as ``__main__``.

    Concrete test classes combine this mixin with ``unittest.TestCase`` and
    define two class attributes:

    * ``start_method`` -- name of the multiprocessing start method; it is
      passed to the launched script as its only command line argument.
    * ``main_in_children_source`` -- script source used by the tests that
      write a ``__main__`` file (directory, zipfile and package runs).

    The test methods carry comments instead of docstrings so that unittest's
    verbose output keeps showing the test names.
    """

    maxDiff = None # Show full tracebacks on subprocess failure

    def setUp(self):
        # Skip, rather than fail, when this start method is not available.
        if self.start_method not in AVAILABLE_START_METHODS:
            self.skipTest("%r start method not available" % self.start_method)

    def _check_output(self, script_name, exit_code, out, err):
        """Check the exit code, stderr and stdout of a finished script run.

        *out* and *err* are the bytes captured from the child process.  The
        run passes when the exit code is 0, stderr is empty and stdout is
        exactly "<start method> -> [1, 4, 9]" (surrounding whitespace ignored).
        """
        if verbose > 1:
            print("Output from test script %r:" % script_name)
            print(repr(out))
        self.assertEqual(exit_code, 0)
        self.assertEqual(err.decode('utf-8'), '')
        expected_results = "%s -> [1, 4, 9]" % self.start_method
        self.assertEqual(out.decode('utf-8').strip(), expected_results)

    def _check_script(self, script_name, *cmd_line_switches):
        """Run *script_name* in a child interpreter and check its output.

        *cmd_line_switches* are placed before the script name on the child's
        command line; the start method name is appended as the script's
        argument.
        """
        if not __debug__:
            # Run the child with the same -O level as this process.
            cmd_line_switches += ('-' + 'O' * sys.flags.optimize,)
        run_args = cmd_line_switches + (script_name, self.start_method)
        rc, out, err = assert_python_ok(*run_args, __isolated=False)
        self._check_output(script_name, rc, out, err)

    def test_basic_script(self):
        # Plain script file executed by path.
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'script')
            self._check_script(script_name)

    def test_basic_script_no_suffix(self):
        # Script file created with omit_suffix=True.
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'script',
                                            omit_suffix=True)
            self._check_script(script_name)

    def test_ipython_workaround(self):
        # Some versions of the IPython launch script are missing the
        # __name__ == "__main__" guard, and multiprocessing has long had
        # a workaround for that case
        # See https://github.com/ipython/ipython/issues/4698
        source = test_source_main_skipped_in_children
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'ipython',
                                            source=source)
            self._check_script(script_name)
            script_no_suffix = _make_test_script(script_dir, 'ipython',
                                                 source=source,
                                                 omit_suffix=True)
            self._check_script(script_no_suffix)

    def test_script_compiled(self):
        # Only the legacy .pyc file is left behind and executed.
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, 'script')
            py_compile.compile(script_name, doraise=True)
            os.remove(script_name)
            pyc_file = support.make_legacy_pyc(script_name)
            self._check_script(pyc_file)

    def test_directory(self):
        # Directory containing a __main__ script, executed by directory path.
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            _make_test_script(script_dir, '__main__', source=source)
            self._check_script(script_dir)

    def test_directory_compiled(self):
        # Directory whose __main__ exists only as a legacy .pyc file.
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, '__main__',
                                            source=source)
            py_compile.compile(script_name, doraise=True)
            os.remove(script_name)
            support.make_legacy_pyc(script_name)
            self._check_script(script_dir)

    def test_zipfile(self):
        # Zip archive built from a __main__ script, executed by archive path.
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, '__main__',
                                            source=source)
            zip_name, _ = make_zip_script(script_dir, 'test_zip', script_name)
            self._check_script(zip_name)

    def test_zipfile_compiled(self):
        # Zip archive built from the compiled form of a __main__ script.
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            script_name = _make_test_script(script_dir, '__main__',
                                            source=source)
            compiled_name = py_compile.compile(script_name, doraise=True)
            zip_name, _ = make_zip_script(script_dir, 'test_zip', compiled_name)
            self._check_script(zip_name)

    def test_module_in_package(self):
        # Module inside a package, run through the launch script; the
        # 'check_sibling' name also triggers the relative import check.
        with support.temp_dir() as script_dir:
            pkg_dir = os.path.join(script_dir, 'test_pkg')
            make_pkg(pkg_dir)
            _make_test_script(pkg_dir, 'check_sibling')
            launch_name = _make_launch_script(script_dir, 'launch',
                                              'test_pkg.check_sibling')
            self._check_script(launch_name)

    def test_module_in_package_in_zipfile(self):
        # Module inside a zipped package, run through the launch script.
        with support.temp_dir() as script_dir:
            zip_name, _ = _make_test_zip_pkg(script_dir, 'test_zip',
                                             'test_pkg', 'script')
            launch_name = _make_launch_script(script_dir, 'launch',
                                              'test_pkg.script', zip_name)
            self._check_script(launch_name)

    def test_module_in_subpackage_in_zipfile(self):
        # Same as above, with the zipped package created at depth=2.
        with support.temp_dir() as script_dir:
            zip_name, _ = _make_test_zip_pkg(script_dir, 'test_zip',
                                             'test_pkg', 'script', depth=2)
            launch_name = _make_launch_script(script_dir, 'launch',
                                              'test_pkg.test_pkg.script',
                                              zip_name)
            self._check_script(launch_name)

    def test_package(self):
        # Package with a __main__ script, run through the launch script.
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            pkg_dir = os.path.join(script_dir, 'test_pkg')
            make_pkg(pkg_dir)
            _make_test_script(pkg_dir, '__main__', source=source)
            launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg')
            self._check_script(launch_name)

    def test_package_compiled(self):
        # Package whose __main__ exists only as a legacy .pyc file.
        source = self.main_in_children_source
        with support.temp_dir() as script_dir:
            pkg_dir = os.path.join(script_dir, 'test_pkg')
            make_pkg(pkg_dir)
            script_name = _make_test_script(pkg_dir, '__main__',
                                            source=source)
            py_compile.compile(script_name, doraise=True)
            os.remove(script_name)
            support.make_legacy_pyc(script_name)
            launch_name = _make_launch_script(script_dir, 'launch', 'test_pkg')
            self._check_script(launch_name)

# Test all supported start methods (setUp skips as appropriate)

class SpawnCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    # Runs the shared command line tests with the "spawn" start method.
    # The __main__-file tests use the script source that raises if it is
    # executed under any name other than "__main__".
    main_in_children_source = test_source_main_skipped_in_children
    start_method = "spawn"

class ForkCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    # Runs the shared command line tests with the "fork" start method.
    # The __main__-file tests use the regular test_source script, which
    # relies on a function defined in __main__.
    main_in_children_source = test_source
    start_method = "fork"

class ForkServerCmdLineTest(MultiProcessingCmdLineMixin, unittest.TestCase):
    # Runs the shared command line tests with the "forkserver" start method.
    # The __main__-file tests use the script source that raises if it is
    # executed under any name other than "__main__".
    main_in_children_source = test_source_main_skipped_in_children
    start_method = "forkserver"

def tearDownModule():
    """Module-level unittest fixture: call support.reap_children()."""
    support.reap_children()

# Run the tests in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()