test_faulthandler.py 17.7 KB
Newer Older
1
from contextlib import contextmanager
2
import datetime
3
import faulthandler
4
import os
5 6 7 8 9
import re
import signal
import subprocess
import sys
from test import support, script_helper
10
from test.script_helper import assert_python_ok
11 12 13
import tempfile
import unittest

14 15 16 17 18 19
try:
    import threading
    HAVE_THREADS = True
except ImportError:
    HAVE_THREADS = False

20 21
TIMEOUT = 0.5

22 23 24 25 26 27 28 29 30 31 32 33
try:
    from resource import setrlimit, RLIMIT_CORE, error as resource_error
except ImportError:
    prepare_subprocess = None
else:
    def prepare_subprocess():
        # don't create core file
        try:
            setrlimit(RLIMIT_CORE, (0, 0))
        except (ValueError, resource_error):
            pass

34
def expected_traceback(lineno1, lineno2, header, min_count=1):
35
    regex = header
36 37
    regex += '  File "<string>", line %s in func\n' % lineno1
    regex += '  File "<string>", line %s in <module>' % lineno2
38 39 40 41
    if 1 < min_count:
        return '^' + (regex + '\n') * (min_count - 1) + regex
    else:
        return '^' + regex + '$'
42 43 44 45 46 47 48 49 50 51

@contextmanager
def temporary_filename():
    filename = tempfile.mktemp()
    try:
        yield filename
    finally:
        support.unlink(filename)

class FaultHandlerTests(unittest.TestCase):
52
    def get_output(self, code, filename=None):
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
        """
        Run the specified code in Python (in a new child process) and read the
        output from the standard error or from a file (if filename is set).
        Return the output lines as a list.

        Strip the reference count from the standard error for Python debug
        build, and replace "Current thread 0x00007f8d8fbd9700" by "Current
        thread XXX".
        """
        options = {}
        if prepare_subprocess:
            options['preexec_fn'] = prepare_subprocess
        process = script_helper.spawn_python('-c', code, **options)
        stdout, stderr = process.communicate()
        exitcode = process.wait()
68 69
        output = support.strip_python_stderr(stdout)
        output = output.decode('ascii', 'backslashreplace')
70
        if filename:
71
            self.assertEqual(output, '')
72 73
            with open(filename, "rb") as fp:
                output = fp.read()
74
            output = output.decode('ascii', 'backslashreplace')
75 76 77
        output = re.sub('Current thread 0x[0-9a-f]+',
                        'Current thread XXX',
                        output)
78
        return output.splitlines(), exitcode
79 80

    def check_fatal_error(self, code, line_number, name_regex,
81
                          filename=None, all_threads=True, other_regex=None):
82 83 84 85 86 87 88 89 90 91 92 93 94 95
        """
        Check that the fault handler for fatal errors is enabled and check the
        traceback from the child process output.

        Raise an error if the output doesn't match the expected format.
        """
        if all_threads:
            header = 'Current thread XXX'
        else:
            header = 'Traceback (most recent call first)'
        regex = """
^Fatal Python error: {name}

{header}:
96
  File "<string>", line {lineno} in <module>
97 98 99 100 101
""".strip()
        regex = regex.format(
            lineno=line_number,
            name=name_regex,
            header=re.escape(header))
102 103
        if other_regex:
            regex += '|' + other_regex
104
        with support.suppress_crash_popup():
Ezio Melotti's avatar
Ezio Melotti committed
105
            output, exitcode = self.get_output(code, filename)
106 107
        output = '\n'.join(output)
        self.assertRegex(output, regex)
108
        self.assertNotEqual(exitcode, 0)
109

110 111
    @unittest.skipIf(sys.platform.startswith('aix'),
                     "the first page of memory is a mapped read-only on AIX")
112 113 114 115 116 117 118
    def test_read_null(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._read_null()
""".strip(),
            3,
119 120
            # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
            '(?:Segmentation fault|Bus error|Illegal instruction)')
121 122 123 124 125 126 127 128 129 130

    def test_sigsegv(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._sigsegv()
""".strip(),
            3,
            'Segmentation fault')

131 132 133 134 135 136 137 138 139
    def test_sigabrt(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._sigabrt()
""".strip(),
            3,
            'Aborted')

140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
    @unittest.skipIf(sys.platform == 'win32',
                     "SIGFPE cannot be caught on Windows")
    def test_sigfpe(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._sigfpe()
""".strip(),
            3,
            'Floating point exception')

    @unittest.skipIf(not hasattr(faulthandler, '_sigbus'),
                     "need faulthandler._sigbus()")
    def test_sigbus(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._sigbus()
""".strip(),
            3,
            'Bus error')

    @unittest.skipIf(not hasattr(faulthandler, '_sigill'),
                     "need faulthandler._sigill()")
    def test_sigill(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._sigill()
""".strip(),
            3,
            'Illegal instruction')

    def test_fatal_error(self):
        self.check_fatal_error("""
import faulthandler
faulthandler._fatal_error(b'xyz')
""".strip(),
            2,
            'xyz')

181 182 183
    @unittest.skipIf(sys.platform.startswith('openbsd') and HAVE_THREADS,
                     "Issue #12868: sigaltstack() doesn't work on "
                     "OpenBSD if Python is compiled with pthread")
184 185 186 187 188 189 190 191 192
    @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
                     'need faulthandler._stack_overflow()')
    def test_stack_overflow(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._stack_overflow()
""".strip(),
            3,
193 194
            '(?:Segmentation fault|Bus error)',
            other_regex='unable to raise a stack overflow')
195 196 197 198 199 200 201 202

    def test_gil_released(self):
        self.check_fatal_error("""
import faulthandler
faulthandler.enable()
faulthandler._read_null(True)
""".strip(),
            3,
203
            '(?:Segmentation fault|Bus error|Illegal instruction)')
204 205 206 207 208 209 210

    def test_enable_file(self):
        with temporary_filename() as filename:
            self.check_fatal_error("""
import faulthandler
output = open({filename}, 'wb')
faulthandler.enable(output)
211
faulthandler._sigsegv()
212 213
""".strip().format(filename=repr(filename)),
                4,
214
                'Segmentation fault',
215 216
                filename=filename)

217
    def test_enable_single_thread(self):
218 219
        self.check_fatal_error("""
import faulthandler
220
faulthandler.enable(all_threads=False)
221
faulthandler._sigsegv()
222 223
""".strip(),
            3,
224
            'Segmentation fault',
225
            all_threads=False)
226 227 228 229 230 231

    def test_disable(self):
        code = """
import faulthandler
faulthandler.enable()
faulthandler.disable()
232
faulthandler._sigsegv()
233 234
""".strip()
        not_expected = 'Fatal Python error'
235 236
        with support.suppress_crash_popup():
            stderr, exitcode = self.get_output(code)
237 238 239
        stder = '\n'.join(stderr)
        self.assertTrue(not_expected not in stderr,
                     "%r is present in %r" % (not_expected, stderr))
240
        self.assertNotEqual(exitcode, 0)
241 242

    def test_is_enabled(self):
243
        orig_stderr = sys.stderr
244
        try:
245 246 247
            # regrtest may replace sys.stderr by io.StringIO object, but
            # faulthandler.enable() requires that sys.stderr has a fileno()
            # method
248
            sys.stderr = sys.__stderr__
249 250 251

            was_enabled = faulthandler.is_enabled()
            try:
252
                faulthandler.enable()
253
                self.assertTrue(faulthandler.is_enabled())
254
                faulthandler.disable()
255 256 257 258 259 260 261 262
                self.assertFalse(faulthandler.is_enabled())
            finally:
                if was_enabled:
                    faulthandler.enable()
                else:
                    faulthandler.disable()
        finally:
            sys.stderr = orig_stderr
263

264 265 266 267 268 269 270 271 272 273 274 275 276 277
    def test_disabled_by_default(self):
        # By default, the module should be disabled
        code = "import faulthandler; print(faulthandler.is_enabled())"
        rc, stdout, stderr = assert_python_ok("-c", code)
        stdout = (stdout + stderr).strip()
        self.assertEqual(stdout, b"False")

    def test_sys_xoptions(self):
        # Test python -X faulthandler
        code = "import faulthandler; print(faulthandler.is_enabled())"
        rc, stdout, stderr = assert_python_ok("-X", "faulthandler", "-c", code)
        stdout = (stdout + stderr).strip()
        self.assertEqual(stdout, b"True")

278 279 280 281 282 283 284 285 286 287 288
    def check_dump_traceback(self, filename):
        """
        Explicitly call dump_traceback() function and check its output.
        Raise an error if the output doesn't match the expected format.
        """
        code = """
import faulthandler

def funcB():
    if {has_filename}:
        with open({filename}, "wb") as fp:
289
            faulthandler.dump_traceback(fp, all_threads=False)
290
    else:
291
        faulthandler.dump_traceback(all_threads=False)
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311

def funcA():
    funcB()

funcA()
""".strip()
        code = code.format(
            filename=repr(filename),
            has_filename=bool(filename),
        )
        if filename:
            lineno = 6
        else:
            lineno = 8
        expected = [
            'Traceback (most recent call first):',
            '  File "<string>", line %s in funcB' % lineno,
            '  File "<string>", line 11 in funcA',
            '  File "<string>", line 13 in <module>'
        ]
312
        trace, exitcode = self.get_output(code, filename)
313
        self.assertEqual(trace, expected)
314
        self.assertEqual(exitcode, 0)
315 316 317

    def test_dump_traceback(self):
        self.check_dump_traceback(None)
318 319

    def test_dump_traceback_file(self):
320 321 322
        with temporary_filename() as filename:
            self.check_dump_traceback(filename)

323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
    def test_truncate(self):
        maxlen = 500
        func_name = 'x' * (maxlen + 50)
        truncated = 'x' * maxlen + '...'
        code = """
import faulthandler

def {func_name}():
    faulthandler.dump_traceback(all_threads=False)

{func_name}()
""".strip()
        code = code.format(
            func_name=func_name,
        )
        expected = [
            'Traceback (most recent call first):',
            '  File "<string>", line 4 in %s' % truncated,
            '  File "<string>", line 6 in <module>'
        ]
        trace, exitcode = self.get_output(code)
        self.assertEqual(trace, expected)
        self.assertEqual(exitcode, 0)

347
    @unittest.skipIf(not HAVE_THREADS, 'need threads')
348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385
    def check_dump_traceback_threads(self, filename):
        """
        Call explicitly dump_traceback(all_threads=True) and check the output.
        Raise an error if the output doesn't match the expected format.
        """
        code = """
import faulthandler
from threading import Thread, Event
import time

def dump():
    if {filename}:
        with open({filename}, "wb") as fp:
            faulthandler.dump_traceback(fp, all_threads=True)
    else:
        faulthandler.dump_traceback(all_threads=True)

class Waiter(Thread):
    # avoid blocking if the main thread raises an exception.
    daemon = True

    def __init__(self):
        Thread.__init__(self)
        self.running = Event()
        self.stop = Event()

    def run(self):
        self.running.set()
        self.stop.wait()

waiter = Waiter()
waiter.start()
waiter.running.wait()
dump()
waiter.stop.set()
waiter.join()
""".strip()
        code = code.format(filename=repr(filename))
386
        output, exitcode = self.get_output(code, filename)
387 388 389 390 391 392 393
        output = '\n'.join(output)
        if filename:
            lineno = 8
        else:
            lineno = 10
        regex = """
^Thread 0x[0-9a-f]+:
394 395
(?:  File ".*threading.py", line [0-9]+ in [_a-z]+
){{1,3}}  File "<string>", line 23 in run
396 397 398 399 400 401 402 403 404
  File ".*threading.py", line [0-9]+ in _bootstrap_inner
  File ".*threading.py", line [0-9]+ in _bootstrap

Current thread XXX:
  File "<string>", line {lineno} in dump
  File "<string>", line 28 in <module>$
""".strip()
        regex = regex.format(lineno=lineno)
        self.assertRegex(output, regex)
405
        self.assertEqual(exitcode, 0)
406 407 408

    def test_dump_traceback_threads(self):
        self.check_dump_traceback_threads(None)
409 410

    def test_dump_traceback_threads_file(self):
411 412 413
        with temporary_filename() as filename:
            self.check_dump_traceback_threads(filename)

414
    def _check_dump_traceback_later(self, repeat, cancel, filename, loops):
415 416 417 418 419 420 421
        """
        Check how many times the traceback is written in timeout x 2.5 seconds,
        or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending
        on repeat and cancel options.

        Raise an error if the output doesn't match the expect format.
        """
422
        timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
423 424 425 426
        code = """
import faulthandler
import time

427 428
def func(timeout, repeat, cancel, file, loops):
    for loop in range(loops):
429
        faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
430
        if cancel:
431
            faulthandler.cancel_dump_traceback_later()
432
        time.sleep(timeout * 5)
433
        faulthandler.cancel_dump_traceback_later()
434

435
timeout = {timeout}
436 437
repeat = {repeat}
cancel = {cancel}
438
loops = {loops}
439 440 441 442
if {has_filename}:
    file = open({filename}, "wb")
else:
    file = None
443
func(timeout, repeat, cancel, file, loops)
444 445 446 447
if file is not None:
    file.close()
""".strip()
        code = code.format(
448
            timeout=TIMEOUT,
449 450
            repeat=repeat,
            cancel=cancel,
451 452 453
            loops=loops,
            has_filename=bool(filename),
            filename=repr(filename),
454
        )
455
        trace, exitcode = self.get_output(code, filename)
456 457
        trace = '\n'.join(trace)

458
        if not cancel:
459
            count = loops
460
            if repeat:
461
                count *= 2
462
            header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+:\n' % timeout_str
463
            regex = expected_traceback(9, 20, header, min_count=count)
464
            self.assertRegex(trace, regex)
465
        else:
466
            self.assertEqual(trace, '')
467
        self.assertEqual(exitcode, 0)
468

469 470 471
    @unittest.skipIf(not hasattr(faulthandler, 'dump_traceback_later'),
                     'need faulthandler.dump_traceback_later()')
    def check_dump_traceback_later(self, repeat=False, cancel=False,
472 473 474 475 476
                                    file=False, twice=False):
        if twice:
            loops = 2
        else:
            loops = 1
477 478
        if file:
            with temporary_filename() as filename:
479
                self._check_dump_traceback_later(repeat, cancel,
480
                                                  filename, loops)
481
        else:
482
            self._check_dump_traceback_later(repeat, cancel, None, loops)
483

484 485
    def test_dump_traceback_later(self):
        self.check_dump_traceback_later()
486

487 488
    def test_dump_traceback_later_repeat(self):
        self.check_dump_traceback_later(repeat=True)
489

490 491
    def test_dump_traceback_later_cancel(self):
        self.check_dump_traceback_later(cancel=True)
492

493 494
    def test_dump_traceback_later_file(self):
        self.check_dump_traceback_later(file=True)
495

496 497
    def test_dump_traceback_later_twice(self):
        self.check_dump_traceback_later(twice=True)
498

499 500
    @unittest.skipIf(not hasattr(faulthandler, "register"),
                     "need faulthandler.register")
501
    def check_register(self, filename=False, all_threads=False,
502
                       unregister=False, chain=False):
503 504 505 506
        """
        Register a handler displaying the traceback on a user signal. Raise the
        signal and check the written traceback.

507 508
        If chain is True, check that the previous signal handler is called.

509 510
        Raise an error if the output doesn't match the expected format.
        """
511
        signum = signal.SIGUSR1
512 513 514 515
        code = """
import faulthandler
import os
import signal
516
import sys
517 518 519 520

def func(signum):
    os.kill(os.getpid(), signum)

521 522 523 524 525
def handler(signum, frame):
    handler.called = True
handler.called = False

exitcode = 0
526 527
signum = {signum}
unregister = {unregister}
528 529
chain = {chain}

530 531 532 533
if {has_filename}:
    file = open({filename}, "wb")
else:
    file = None
534 535 536 537
if chain:
    signal.signal(signum, handler)
faulthandler.register(signum, file=file,
                      all_threads={all_threads}, chain={chain})
538 539
if unregister:
    faulthandler.unregister(signum)
540
func(signum)
541 542 543 544 545 546 547
if chain and not handler.called:
    if file is not None:
        output = file
    else:
        output = sys.stderr
    print("Error: signal handler not called!", file=output)
    exitcode = 1
548 549
if file is not None:
    file.close()
550
sys.exit(exitcode)
551 552 553 554 555
""".strip()
        code = code.format(
            filename=repr(filename),
            has_filename=bool(filename),
            all_threads=all_threads,
556 557
            signum=signum,
            unregister=unregister,
558
            chain=chain,
559
        )
560
        trace, exitcode = self.get_output(code, filename)
561
        trace = '\n'.join(trace)
562 563 564 565 566
        if not unregister:
            if all_threads:
                regex = 'Current thread XXX:\n'
            else:
                regex = 'Traceback \(most recent call first\):\n'
567
            regex = expected_traceback(7, 28, regex)
568
            self.assertRegex(trace, regex)
569
        else:
570 571 572 573 574
            self.assertEqual(trace, '')
        if unregister:
            self.assertNotEqual(exitcode, 0)
        else:
            self.assertEqual(exitcode, 0)
575 576 577 578

    def test_register(self):
        self.check_register()

579 580 581
    def test_unregister(self):
        self.check_register(unregister=True)

582 583 584 585 586 587 588
    def test_register_file(self):
        with temporary_filename() as filename:
            self.check_register(filename=filename)

    def test_register_threads(self):
        self.check_register(all_threads=True)

589 590 591
    def test_register_chain(self):
        self.check_register(chain=True)

592 593 594 595 596 597

def test_main():
    support.run_unittest(FaultHandlerTests)

if __name__ == "__main__":
    test_main()