# runtest_mp.py
import faulthandler
2 3
import json
import os
4
import queue
5
import sys
6
import threading
7
import time
8
import traceback
9
import types
10 11
from test import support

12
from test.libregrtest.runtest import (
13 14
    runtest, INTERRUPTED, CHILD_ERROR, PROGRESS_MIN_TIME,
    format_test_result)
15
from test.libregrtest.setup import setup_tests
16
from test.libregrtest.utils import format_duration
17 18


19
# Display the running tests if nothing happened in the last N seconds
PROGRESS_UPDATE = 30.0   # seconds

# If interrupted, display the wait progress every N seconds
WAIT_PROGRESS = 2.0   # seconds

25 26

def run_test_in_subprocess(testname, ns):
    """Run the given test in a subprocess with --worker-args.

    ns is the option Namespace parsed from command-line arguments. regrtest
    is invoked in a subprocess with the --worker-args argument; when the
    subprocess exits, its return code, stdout and stderr are returned as a
    3-tuple.
    """
    from subprocess import Popen, PIPE

    # The child receives its options and the test name as a single JSON
    # document: [vars(ns), testname].
    ns_dict = vars(ns)
    worker_args = (ns_dict, testname)
    worker_args = json.dumps(worker_args)

    cmd = [sys.executable, *support.args_from_interpreter_flags(),
           '-u',    # Unbuffered stdout and stderr
           '-m', 'test.regrtest',
           '--worker-args', worker_args]
    if ns.pgo:
        cmd += ['--pgo']

    # Running the child from the same working directory as regrtest's original
    # invocation ensures that TEMPDIR for the child is the same when
    # sysconfig.is_python_build() is true. See issue 15300.
    popen = Popen(cmd,
                  stdout=PIPE, stderr=PIPE,
                  universal_newlines=True,
                  close_fds=(os.name != 'nt'),
                  cwd=support.SAVEDCWD)
    with popen:
        # communicate() reads both pipes until EOF, so the child cannot block
        # on a full pipe while we wait for it.
        stdout, stderr = popen.communicate()
        retcode = popen.wait()
    return retcode, stdout, stderr


61 62
def run_tests_worker(worker_args):
    """Run a single test in this (child) process and report its result.

    worker_args is the JSON document built by the parent process: a pair
    ``[ns_dict, testname]``.  The result is written to stdout as one JSON
    line, always a 3-item sequence so that the parent can unpack it
    uniformly, and the process then exits with status 0.
    """
    ns_dict, testname = json.loads(worker_args)
    ns = types.SimpleNamespace(**ns_dict)

    setup_tests(ns)

    try:
        result = runtest(ns, testname)
    except KeyboardInterrupt:
        result = INTERRUPTED, '', None
    except BaseException as e:
        traceback.print_exc()
        # Keep the same 3-item shape as every other result: the parent
        # asserts len(result) == 3 before unpacking it.
        result = CHILD_ERROR, str(e), None

    print()   # Force a newline (just in case)
    # The parent reads the result from the last line of stdout.
    print(json.dumps(result), flush=True)
    sys.exit(0)


# We do not use a generator so multiple threads can call next().
class MultiprocessIterator:

    """Lock-protected iterator handing out tests to several threads.

    Every call to next() is serialized by a lock.  Once the ``interrupted``
    flag is set, the iterator stops yielding tests.
    """

    def __init__(self, tests):
        self.tests = tests
        self.lock = threading.Lock()
        self.interrupted = False

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            if not self.interrupted:
                # May itself raise StopIteration when no test is left.
                return next(self.tests)
            raise StopIteration('tests interrupted')


class MultiprocessThread(threading.Thread):
    """Worker thread running pending tests one at a time in subprocesses.

    Every finished test is reported on the *output* queue as a
    ``(test, stdout, stderr, result)`` tuple.  A tuple of four ``None``
    values is queued when the thread stops taking tests.
    """

    def __init__(self, pending, output, ns):
        super().__init__()
        self.pending = pending
        self.output = output
        self.ns = ns
        # Name of the test whose subprocess is currently running (None when
        # idle) and the time.monotonic() value at which it was started.
        self.current_test = None
        self.start_time = None

    def _runtest(self):
        """Run the next pending test; return True if the thread must stop."""
        try:
            test = next(self.pending)
        except StopIteration:
            # No test left (or the run was interrupted): signal completion.
            self.output.put((None, None, None, None))
            return True

        try:
            # start_time is set before current_test so that a reader seeing
            # a current test also sees its start time.
            self.start_time = time.monotonic()
            self.current_test = test

            retcode, stdout, stderr = run_test_in_subprocess(test, self.ns)
        finally:
            self.current_test = None

        if retcode != 0:
            result = (CHILD_ERROR, "Exit code %s" % retcode, None)
            self.output.put((test, stdout.rstrip(), stderr.rstrip(),
                             result))
            return False

        # The last line of the child's stdout is the JSON-encoded result;
        # everything before it is the test's own output.
        stdout, _, result = stdout.strip().rpartition("\n")
        if not result:
            # The child exited with status 0 but printed nothing: stop.
            self.output.put((None, None, None, None))
            return True

        result = json.loads(result)
        assert len(result) == 3, f"Invalid result tuple: {result!r}"
        self.output.put((test, stdout.rstrip(), stderr.rstrip(),
                         result))
        return False

    def run(self):
        """Run tests until _runtest() asks to stop.

        On any exception the completion tuple is still queued before the
        exception is re-raised.
        """
        try:
            stop = False
            while not stop:
                stop = self._runtest()
        except BaseException:
            self.output.put((None, None, None, None))
            raise


def run_tests_multiprocess(regrtest):
    """Run regrtest.tests in parallel with regrtest.ns.use_mp worker threads.

    Each worker thread runs tests in child processes and reports on a shared
    queue.  This function consumes the queue: it accumulates every result on
    *regrtest*, displays progress and copies the child's stdout and stderr.
    On KeyboardInterrupt (or a test reporting INTERRUPTED) no new test is
    handed out, and the function waits for the running tests to complete.
    """
    output = queue.Queue()
    pending = MultiprocessIterator(regrtest.tests)
    test_timeout = regrtest.ns.timeout
    use_timeout = (test_timeout is not None)

    workers = [MultiprocessThread(pending, output, regrtest.ns)
               for i in range(regrtest.ns.use_mp)]
    print("Run tests in parallel using %s child processes"
          % len(workers))
    for worker in workers:
        worker.start()

    def get_running(workers):
        # Describe the tests running for at least PROGRESS_MIN_TIME seconds.
        running = []
        for worker in workers:
            current_test = worker.current_test
            if not current_test:
                continue
            dt = time.monotonic() - worker.start_time
            if dt >= PROGRESS_MIN_TIME:
                text = '%s (%s)' % (current_test, format_duration(dt))
                running.append(text)
        return running

    # Number of workers which queued their completion tuple
    finished = 0
    test_index = 1
    get_timeout = max(PROGRESS_UPDATE, PROGRESS_MIN_TIME)
    try:
        while finished < regrtest.ns.use_mp:
            if use_timeout:
                # (Re)arm the watchdog before each wait on the queue
                faulthandler.dump_traceback_later(test_timeout, exit=True)

            try:
                item = output.get(timeout=get_timeout)
            except queue.Empty:
                # Nothing completed for get_timeout seconds: show what runs
                running = get_running(workers)
                if running and not regrtest.ns.pgo:
                    print('running: %s' % ', '.join(running), flush=True)
                continue

            test, stdout, stderr, result = item
            if test is None:
                # Completion tuple: one worker stopped taking tests
                finished += 1
                continue
            regrtest.accumulate_result(test, result)

            # Display progress
            ok, test_time, xml_data = result
            text = format_test_result(test, ok)
            if (ok not in (CHILD_ERROR, INTERRUPTED)
                and test_time >= PROGRESS_MIN_TIME
                and not regrtest.ns.pgo):
                text += ' (%s)' % format_duration(test_time)
            elif ok == CHILD_ERROR:
                # For CHILD_ERROR the second item is a message, not a time
                text = '%s (%s)' % (text, test_time)
            running = get_running(workers)
            if running and not regrtest.ns.pgo:
                text += ' -- running: %s' % ', '.join(running)
            regrtest.display_progress(test_index, text)

            # Copy stdout and stderr from the child process
            if stdout:
                print(stdout, flush=True)
            if stderr and not regrtest.ns.pgo:
                print(stderr, file=sys.stderr, flush=True)

            if result[0] == INTERRUPTED:
                raise KeyboardInterrupt
            test_index += 1
    except KeyboardInterrupt:
        regrtest.interrupted = True
        # Make the shared iterator stop handing out tests to the workers
        pending.interrupted = True
        print()
    finally:
        if use_timeout:
            faulthandler.cancel_dump_traceback_later()

    # If tests are interrupted, wait until tests complete
    wait_start = time.monotonic()
    while True:
        running = [worker.current_test for worker in workers]
        running = list(filter(bool, running))
        if not running:
            break

        dt = time.monotonic() - wait_start
        line = "Waiting for %s (%s tests)" % (', '.join(running), len(running))
        if dt >= WAIT_PROGRESS:
            line = "%s since %.0f sec" % (line, dt)
        print(line, flush=True)
        for worker in workers:
            worker.join(WAIT_PROGRESS)