test_threading.py 28.8 KB
Newer Older
1 2
# Very rudimentary test of threading module

3
import test.support
4
from test.support import verbose, strip_python_stderr, import_module
5
import random
Georg Brandl's avatar
Georg Brandl committed
6
import re
7
import sys
8 9
_thread = import_module('_thread')
threading = import_module('threading')
10
import time
11
import unittest
12
import weakref
13
import os
14
from test.script_helper import assert_python_ok, assert_python_failure
15
import subprocess
16

17 18
from test import lock_tests

19 20 21 22 23 24 25 26 27 28
# A trivial mutable counter.
class Counter(object):
    """Minimal mutable integer counter.

    The counter does no locking of its own.
    """

    def __init__(self):
        self.value = 0

    def inc(self):
        """Add one to the stored value."""
        self.value = self.value + 1

    def dec(self):
        """Subtract one from the stored value."""
        self.value = self.value - 1

    def get(self):
        """Return the stored value."""
        current = self.value
        return current
29 30

class TestThread(threading.Thread):
    """Worker thread that checks a semaphore really bounds concurrency.

    Arguments:
        name: thread name, forwarded to ``threading.Thread``.
        testcase: object providing the ``assert*`` methods used in ``run``.
        sema: context manager limiting how many workers run at once.
        mutex: context manager guarding access to ``nrunning``.
        nrunning: shared counter exposing ``inc()``, ``dec()`` and ``get()``.
    """

    def __init__(self, name, testcase, sema, mutex, nrunning):
        super().__init__(name=name)
        self.testcase = testcase
        self.sema = sema
        self.mutex = mutex
        self.nrunning = nrunning

    def run(self):
        # Sleep for a random duration below 100 microseconds.
        delay = random.random() / 10000.0
        if verbose:
            print('task %s will run for %.1f usec' %
                  (self.name, delay * 1e6))

        with self.sema:
            with self.mutex:
                self.nrunning.inc()
                if verbose:
                    print(self.nrunning.get(), 'tasks are running')
                # The limit of 3 mirrors the semaphore value the caller uses.
                self.testcase.assertLessEqual(self.nrunning.get(), 3)

            # Sleep outside the mutex so that other workers can enter.
            time.sleep(delay)
            if verbose:
                print('task', self.name, 'done')

            with self.mutex:
                self.nrunning.dec()
                self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
                if verbose:
                    print('%s is finished. %d tasks are running' %
                          (self.name, self.nrunning.get()))
61

62

63 64 65 66 67 68 69 70 71 72
class BaseTestCase(unittest.TestCase):
    """Shared fixture for the thread tests in this module.

    ``setUp`` stores whatever ``test.support.threading_setup()`` returns and
    ``tearDown`` hands it back to ``test.support.threading_cleanup()``.
    """

    def setUp(self):
        snapshot = test.support.threading_setup()
        self._threads = snapshot

    def tearDown(self):
        saved = self._threads
        test.support.threading_cleanup(*saved)
        # Several tests fork or spawn interpreters; collect what is left.
        test.support.reap_children()


class ThreadTests(BaseTestCase):
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90

    # Create a bunch of threads, let each do some work, wait until all are
    # done.
    def test_various_ops(self):
        # Each task sleeps for less than 100 microseconds (see
        # TestThread.run), so the whole test completes quickly.
        NUMTASKS = 10

        # no more than 3 of the 10 can run at once
        sema = threading.BoundedSemaphore(value=3)
        mutex = threading.RLock()
        numrunning = Counter()

        threads = []

        for i in range(NUMTASKS):
            t = TestThread("<thread %d>" % i, self, sema, mutex, numrunning)
            threads.append(t)
            # Before start() the thread has no ident and reports "initial".
            self.assertIsNone(t.ident)
            self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>')
            t.start()

        if verbose:
            print('waiting for all tasks to complete')
        for t in threads:
            # NUMTASKS doubles as the join timeout, in seconds.
            t.join(NUMTASKS)
            self.assertFalse(t.is_alive())
            self.assertNotEqual(t.ident, 0)
            self.assertIsNotNone(t.ident)
            self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>')
        if verbose:
            print('all tasks done')
        self.assertEqual(numrunning.get(), 0)

Benjamin Peterson's avatar
Benjamin Peterson committed
108 109 110 111 112 113 114 115 116 117 118
    def test_ident_of_no_threading_threads(self):
        # The ident still must work for the main thread and dummy threads.
        self.assertFalse(threading.currentThread().ident is None)
        def f():
            ident.append(threading.currentThread().ident)
            done.set()
        done = threading.Event()
        ident = []
        _thread.start_new_thread(f, ())
        done.wait()
        self.assertFalse(ident[0] is None)
119 120
        # Kill the "immortal" _DummyThread
        del threading._active[ident[0]]
Benjamin Peterson's avatar
Benjamin Peterson committed
121

122 123 124
    # run with a small(ish) thread stack size (256kB)
    def test_various_ops_small_stack(self):
        # Repeat test_various_ops with a 256 KiB (262144 byte) stack size.
        if verbose:
            print('with 256kB thread stack size...')
        try:
            threading.stack_size(262144)
        except _thread.error:
            raise unittest.SkipTest(
                'platform does not support changing thread stack size')
        try:
            self.test_various_ops()
        finally:
            # Reset the stack size even when the inner test fails, so that
            # the setting does not leak into the tests that run afterwards.
            threading.stack_size(0)

    # run with a large thread stack size (1MB)
    def test_various_ops_large_stack(self):
        # Repeat test_various_ops with a 1 MiB (0x100000 byte) stack size.
        if verbose:
            print('with 1MB thread stack size...')
        try:
            threading.stack_size(0x100000)
        except _thread.error:
            raise unittest.SkipTest(
                'platform does not support changing thread stack size')
        try:
            self.test_various_ops()
        finally:
            # Reset the stack size even when the inner test fails, so that
            # the setting does not leak into the tests that run afterwards.
            threading.stack_size(0)

146 147 148
    def test_foreign_thread(self):
        # Check that a "foreign" thread can use the threading module.
        def f(mutex):
            # Calling current_thread() forces an entry for the foreign
            # thread to get made in the threading._active map.
            threading.current_thread()
            mutex.release()

        mutex = threading.Lock()
        mutex.acquire()
        tid = _thread.start_new_thread(f, (mutex,))
        # Block until f() releases the lock, i.e. until it has called
        # current_thread().
        mutex.acquire()
        self.assertIn(tid, threading._active)
        self.assertIsInstance(threading._active[tid], threading._DummyThread)
        # Remove the dummy entry so it does not leak into other tests.
        del threading._active[tid]
162

163 164 165
    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
    def test_PyThreadState_SetAsyncExc(self):
        ctypes = import_module("ctypes")

        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc

        class AsyncExc(Exception):
            pass

        exception = ctypes.py_object(AsyncExc)

        # First check it works when setting the exception from the same thread.
        tid = _thread.get_ident()

        try:
            result = set_async_exc(ctypes.c_long(tid), exception)
            # The exception is async, so we might have to keep the VM busy until
            # it notices.
            while True:
                pass
        except AsyncExc:
            pass
        else:
            # This code is unreachable but it reflects the intent. If we wanted
            # to be smarter the above loop wouldn't be infinite.
            self.fail("AsyncExc not raised")
        try:
            self.assertEqual(result, 1) # one thread state modified
        except UnboundLocalError:
            # The exception was raised too quickly for us to get the result.
            pass

        # `worker_started` is set by the thread when it's inside a try/except
        # block waiting to catch the asynchronously set AsyncExc exception.
        # `worker_saw_exception` is set by the thread upon catching that
        # exception.
        worker_started = threading.Event()
        worker_saw_exception = threading.Event()

        class Worker(threading.Thread):
            def run(self):
                self.id = _thread.get_ident()
                self.finished = False

                try:
                    while True:
                        worker_started.set()
                        time.sleep(0.1)
                except AsyncExc:
                    self.finished = True
                    worker_saw_exception.set()

        t = Worker()
        t.daemon = True # so if this fails, we don't hang Python at shutdown
        t.start()
        if verbose:
            print("    started worker thread")

        # Try a thread id that doesn't make sense.
        if verbose:
            print("    trying nonsensical thread id")
        result = set_async_exc(ctypes.c_long(-1), exception)
        self.assertEqual(result, 0)  # no thread states modified

        # Now raise an exception in the worker thread.
        if verbose:
            print("    waiting for worker thread to get started")
        ret = worker_started.wait()
        self.assertTrue(ret)
        if verbose:
            print("    verifying worker hasn't exited")
        self.assertFalse(t.finished)
        if verbose:
            print("    attempting to raise asynch exception in worker")
        result = set_async_exc(ctypes.c_long(t.id), exception)
        self.assertEqual(result, 1) # one thread state modified
        if verbose:
            print("    waiting for worker to say it caught the exception")
        # Bounded wait: a worker that never sees the exception must make the
        # assertion below fail instead of hanging the test run.
        worker_saw_exception.wait(timeout=10)
        self.assertTrue(t.finished)
        if verbose:
            print("    all OK -- joining worker")
        if t.finished:
            t.join()
        # else the thread is still running, and we have no way to kill it
        # else the thread is still running, and we have no way to kill it

250 251 252 253 254 255 256 257
    def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def fail_new_thread(*args):
            raise threading.ThreadError()

        # Temporarily replace the low-level starter so that start() fails.
        _start_new_thread = threading._start_new_thread
        threading._start_new_thread = fail_new_thread
        try:
            t = threading.Thread(target=lambda: None)
            self.assertRaises(threading.ThreadError, t.start)
            self.assertNotIn(
                t, threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            threading._start_new_thread = _start_new_thread

265 266 267 268
    def test_finalize_runnning_thread(self):
        # Issue 1402: the PyGILState_Ensure / _Release functions may be called
        # very late on python exit: on deallocation of a running thread for
        # example.
        import_module("ctypes")

        # The child exits through sys.exit(42), hence assert_python_failure.
        rc, out, err = assert_python_failure("-c", """if 1:
            import ctypes, sys, time, _thread

            # This lock is used as a simple event variable.
            ready = _thread.allocate_lock()
            ready.acquire()

            # Module globals are cleared before __del__ is run
            # So we save the functions in class dict
            class C:
                ensure = ctypes.pythonapi.PyGILState_Ensure
                release = ctypes.pythonapi.PyGILState_Release
                def __del__(self):
                    state = self.ensure()
                    self.release(state)

            def waitingThread():
                x = C()
                ready.release()
                time.sleep(100)

            _thread.start_new_thread(waitingThread, ())
            ready.acquire()  # Be sure the other thread is waiting.
            sys.exit(42)
            """)
        self.assertEqual(rc, 42)

Neal Norwitz's avatar
Neal Norwitz committed
298 299 300
    def test_finalize_with_trace(self):
        # Issue1733757
        # Avoid a deadlock when sys.settrace steps into threading._shutdown
        assert_python_ok("-c", """if 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # testsuite to hang forever
            def killer():
                import os, time
                time.sleep(2)
                print('program blocked; aborting')
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
            """)
Neal Norwitz's avatar
Neal Norwitz committed
322

323 324 325
    def test_join_nondaemon_on_shutdown(self):
        # Issue 1722344
        # Raising SystemExit skipped threading._shutdown
        rc, out, err = assert_python_ok("-c", """if 1:
                import threading
                from time import sleep

                def child():
                    sleep(1)
                    # As a non-daemon thread we SHOULD wake up and nothing
                    # should be torn down yet
                    print("Woke up, sleep function is:", sleep)

                threading.Thread(target=child).start()
                raise SystemExit
            """)
        # The child thread must have printed with `sleep` still intact.
        self.assertEqual(out.strip(),
            b"Woke up, sleep function is: <built-in function sleep>")
        self.assertEqual(err, b"")
Neal Norwitz's avatar
Neal Norwitz committed
342

343 344 345 346
    def test_enumerate_after_join(self):
        # Try hard to trigger #1703448: a thread is still returned in
        # threading.enumerate() after it has been join()ed.
        enum = threading.enumerate
347
        old_interval = sys.getswitchinterval()
348
        try:
349
            for i in range(1, 100):
350
                sys.setswitchinterval(i * 0.0002)
351 352 353 354
                t = threading.Thread(target=lambda: None)
                t.start()
                t.join()
                l = enum()
355
                self.assertNotIn(t, l,
356 357
                    "#1703448 triggered after %d trials: %s" % (i, l))
        finally:
358
            sys.setswitchinterval(old_interval)
359

360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
    def test_no_refcycle_through_target(self):
        # A finished thread must not keep its target, args or kwargs alive,
        # whether the target returned normally or raised.
        class RunSelfFunction(object):
            def __init__(self, should_raise):
                # The links in this refcycle from Thread back to self
                # should be cleaned up when the thread completes.
                self.should_raise = should_raise
                self.thread = threading.Thread(target=self._run,
                                               args=(self,),
                                               kwargs={'yet_another':self})
                self.thread.start()

            def _run(self, other_ref, yet_another):
                if self.should_raise:
                    raise SystemExit

        cyclic_object = RunSelfFunction(should_raise=False)
        weak_cyclic_object = weakref.ref(cyclic_object)
        cyclic_object.thread.join()
        del cyclic_object
        self.assertIsNone(weak_cyclic_object(),
                         msg=('%d references still around' %
                              sys.getrefcount(weak_cyclic_object())))

        raising_cyclic_object = RunSelfFunction(should_raise=True)
        weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
        raising_cyclic_object.thread.join()
        del raising_cyclic_object
        self.assertIsNone(weak_raising_cyclic_object(),
                         msg=('%d references still around' %
                              sys.getrefcount(weak_raising_cyclic_object())))
390

391 392 393
    def test_old_threading_api(self):
        # Just a quick sanity check to make sure the old method names are
        # still present.  The calls only have to succeed; the values they
        # return are not checked.
        t = threading.Thread()
        t.isDaemon()
        t.setDaemon(True)
        t.getName()
        t.setName("name")
        t.isAlive()
        e = threading.Event()
        e.isSet()
        threading.activeCount()
403

404 405 406 407 408
    def test_repr_daemon(self):
        t = threading.Thread()
        self.assertFalse('daemon' in repr(t))
        t.daemon = True
        self.assertTrue('daemon' in repr(t))
409

410

411
class ThreadJoinOnShutdown(BaseTestCase):
Jesse Noller's avatar
Jesse Noller committed
412

413 414 415 416 417 418 419
    # Between fork() and exec(), only async-safe functions are allowed (issues
    # #12316 and #11870), and fork() from a worker thread is known to trigger
    # problems with some operating systems (issue #3863): skip problematic tests
    # on platforms known to behave badly.
    # The skipIf decorators on the fork-based tests in this class compare
    # sys.platform against this tuple.
    platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
                         'os2emx')

Jesse Noller's avatar
Jesse Noller committed
420 421 422 423 424 425 426 427
    def _run_and_join(self, script):
        """Run *script* after a shared prelude and check the output order.

        The prelude imports sys, os, time and threading and defines
        ``joiningfunc(mainthread)``, which joins the given thread and then
        prints 'end of thread'.  The combined script must print
        'end of main' followed by 'end of thread'.
        """
        script = """if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print('end of thread')
                # stdout is fully buffered because not a tty, we have to flush
                # before exit.
                sys.stdout.flush()
        \n""" + script

        # Same run/decode/compare steps as assertScriptHasOutput, so reuse it.
        self.assertScriptHasOutput(script, "end of main\nend of thread\n")
Jesse Noller's avatar
Jesse Noller committed
436 437 438 439 440 441 442 443 444 445 446 447 448

    def test_1_join_on_shutdown(self):
        # The usual case: on exit, wait for a non-daemon thread
        self._run_and_join("""if 1:
            import os
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            time.sleep(0.1)
            print('end of main')
            """)

449
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_2_join_in_forked_process(self):
        # Like the test above, but from a forked interpreter
        script = """if 1:
            childpid = os.fork()
            if childpid != 0:
                os.waitpid(childpid, 0)
                sys.exit(0)

            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            print('end of main')
            """
        self._run_and_join(script)

466
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_3_join_in_forked_from_thread(self):
        # Like the test above, but fork() was called from a worker thread
        # In the forked process, the main Thread object must be marked as stopped.

        script = """if 1:
            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    os.waitpid(childpid, 0)
                    sys.exit(0)

                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print('end of main')
                t.start()
                t.join() # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            """
        self._run_and_join(script)

491
    def assertScriptHasOutput(self, script, expected_output):
        """Run *script* with ``-c`` and compare its stdout to *expected_output*.

        The captured stdout is decoded and carriage returns are removed
        before the comparison.
        """
        rc, out, err = assert_python_ok("-c", script)
        data = out.decode().replace('\r', '')
        self.assertEqual(data, expected_output)

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_4_joining_across_fork_in_worker_thread(self):
        # There used to be a possible deadlock when forking from a child
        # thread.  See http://bugs.python.org/issue6643.

        # The script takes the following steps:
        # - The main thread in the parent process starts a new thread and then
        #   tries to join it.
        # - The join operation acquires the Lock inside the thread's _block
        #   Condition.  (See threading.py:Thread.join().)
        # - We stub out the acquire method on the condition to force it to wait
        #   until the child thread forks.  (See LOCK ACQUIRED HERE)
        # - The child thread forks.  (See LOCK HELD and WORKER THREAD FORKS
        #   HERE)
        # - The main thread of the parent process enters Condition.wait(),
        #   which releases the lock on the child thread.
        # - The child process returns.  Without the necessary fix, when the
        #   main thread of the child process (which used to be the child thread
        #   in the parent process) attempts to exit, it will try to acquire the
        #   lock in the Thread._block Condition object and hang, because the
        #   lock was held across the fork.

        script = """if 1:
            import os, time, threading

            finish_join = False
            start_fork = False

            def worker():
                # Wait until this thread's lock is acquired before forking to
                # create the deadlock.
                global finish_join
                while not start_fork:
                    time.sleep(0.01)
                # LOCK HELD: Main thread holds lock across this call.
                childpid = os.fork()
                finish_join = True
                if childpid != 0:
                    # Parent process just waits for child.
                    os.waitpid(childpid, 0)
                # Child process should just return.

            w = threading.Thread(target=worker)

            # Stub out the private condition variable's lock acquire method.
            # This acquires the lock and then waits until the child has forked
            # before returning, which will release the lock soon after.  If
            # someone else tries to fix this test case by acquiring this lock
            # before forking instead of resetting it, the test case will
            # deadlock when it shouldn't.
            condition = w._block
            orig_acquire = condition.acquire
            call_count_lock = threading.Lock()
            call_count = 0
            def my_acquire():
                global call_count
                global start_fork
                orig_acquire()  # LOCK ACQUIRED HERE
                start_fork = True
                if call_count == 0:
                    while not finish_join:
                        time.sleep(0.01)  # WORKER THREAD FORKS HERE
                with call_count_lock:
                    call_count += 1
            condition.acquire = my_acquire

            w.start()
            w.join()
            print('end of main')
            """
        self.assertScriptHasOutput(script, "end of main\n")

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_5_clear_waiter_locks_to_avoid_crash(self):
        # Check that a spawned thread that forks doesn't segfault on certain
        # platforms, namely OS X.  This used to happen if there was a waiter
        # lock in the thread's condition variable's waiters list.  Even though
        # we know the lock will be held across the fork, it is not safe to
        # release locks held across forks on all platforms, so releasing the
        # waiter lock caused a segfault on OS X.  Furthermore, since locks on
        # OS X are (as of this writing) implemented with a mutex + condition
        # variable instead of a semaphore, while we know that the Python-level
        # lock will be acquired, we can't know if the internal mutex will be
        # acquired at the time of the fork.

        script = """if True:
            import os, time, threading

            start_fork = False

            def worker():
                # Wait until the main thread has attempted to join this thread
                # before continuing.
                while not start_fork:
                    time.sleep(0.01)
                childpid = os.fork()
                if childpid != 0:
                    # Parent process just waits for child.
                    (cpid, rc) = os.waitpid(childpid, 0)
                    assert cpid == childpid
                    assert rc == 0
                    print('end of worker thread')
                else:
                    # Child process should just return.
                    pass

            w = threading.Thread(target=worker)

            # Stub out the private condition variable's _release_save method.
            # This releases the condition's lock and flips the global that
            # causes the worker to fork.  At this point, the problematic waiter
            # lock has been acquired once by the waiter and has been put onto
            # the waiters list.
            condition = w._block
            orig_release_save = condition._release_save
            def my_release_save():
                global start_fork
                orig_release_save()
                # Waiter lock held here, condition lock released.
                start_fork = True
            condition._release_save = my_release_save

            w.start()
            w.join()
            print('end of main thread')
            """
        output = "end of worker thread\nend of main thread\n"
        self.assertScriptHasOutput(script, output)

627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642
    def test_6_daemon_threads(self):
        # Check that a daemon thread cannot crash the interpreter on shutdown
        # by manipulating internal structures that are being disposed of in
        # the main thread.
        script = """if True:
            import os
            import random
            import sys
            import time
            import threading

            thread_has_run = set()

            def random_io():
                '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
                while True:
                    in_f = open(os.__file__, 'rb')
                    stuff = in_f.read(200)
                    null_f = open(os.devnull, 'wb')
                    null_f.write(stuff)
                    time.sleep(random.random() / 1995)
                    null_f.close()
                    in_f.close()
                    thread_has_run.add(threading.current_thread())

            def main():
                count = 0
                for _ in range(40):
                    new_thread = threading.Thread(target=random_io)
                    new_thread.daemon = True
                    new_thread.start()
                    count += 1
                while len(thread_has_run) < count:
                    time.sleep(0.001)
                # Trigger process shutdown
                sys.exit(0)

            main()
            """
        rc, out, err = assert_python_ok('-c', script)
        # Nothing may be written to stderr while the interpreter shuts down.
        self.assertFalse(err)

669
    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_reinit_tls_after_fork(self):
        # Issue #13817: fork() would deadlock in a multithreaded program with
        # the ad-hoc TLS implementation.

        def do_fork_and_wait():
            # just fork a child process and wait it
            pid = os.fork()
            if pid > 0:
                os.waitpid(pid, 0)
            else:
                # Child: leave immediately, bypassing normal interpreter exit.
                os._exit(0)

        # start a bunch of threads that will fork() child processes
        threads = []
        for i in range(16):
            t = threading.Thread(target=do_fork_and_wait)
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

Jesse Noller's avatar
Jesse Noller committed
693

694
class ThreadingExceptionTests(BaseTestCase):
695 696 697 698 699 700 701 702
    # A RuntimeError should be raised if Thread.start() is called
    # multiple times.
    def test_start_thread_again(self):
        thread = threading.Thread()
        thread.start()
        self.assertRaises(RuntimeError, thread.start)

    def test_joining_current_thread(self):
703 704
        current_thread = threading.current_thread()
        self.assertRaises(RuntimeError, current_thread.join);
705 706 707 708 709 710 711 712

    def test_joining_inactive_thread(self):
        thread = threading.Thread()
        self.assertRaises(RuntimeError, thread.join)

    def test_daemonize_active_thread(self):
        thread = threading.Thread()
        thread.start()
713
        self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
714

715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745
    @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem')
    def test_recursion_limit(self):
        # Issue 9670
        # test that excessive recursion within a non-main thread causes
        # an exception rather than crashing the interpreter on platforms
        # like Mac OS X or FreeBSD which have small default stack sizes
        # for threads
        script = """if True:
            import threading

            def recurse():
                return recurse()

            def outer():
                try:
                    recurse()
                except RuntimeError:
                    pass

            w = threading.Thread(target=outer)
            w.start()
            w.join()
            print('end of main thread')
            """
        expected_output = "end of main thread\n"
        # Capture stderr as well so a crash in the child shows up in the
        # assertion message instead of being lost.
        p = subprocess.Popen([sys.executable, "-c", script],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()
        data = stdout.decode().replace('\r', '')
        self.assertEqual(p.returncode, 0,
                         "Unexpected error: " + stderr.decode())
        self.assertEqual(data, expected_output)
746

747 748 749
class LockTests(lock_tests.LockTests):
    # Binds the inherited lock_tests.LockTests cases to threading.Lock.
    # staticmethod() keeps the factory from being bound as a method when it
    # is looked up on a test instance.
    locktype = staticmethod(threading.Lock)

750 751 752
class PyRLockTests(lock_tests.RLockTests):
    # Binds the inherited lock_tests.RLockTests cases to threading._PyRLock.
    locktype = staticmethod(threading._PyRLock)

753
@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
class CRLockTests(lock_tests.RLockTests):
    # Binds the inherited lock_tests.RLockTests cases to threading._CRLock;
    # the class is skipped when threading._CRLock is None.
    locktype = staticmethod(threading._CRLock)
756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772

class EventTests(lock_tests.EventTests):
    # Binds the inherited lock_tests.EventTests cases to threading.Event.
    eventtype = staticmethod(threading.Event)

class ConditionAsRLockTests(lock_tests.RLockTests):
    # A Condition uses an RLock by default and exports its API, so the
    # inherited lock_tests.RLockTests cases are bound to threading.Condition.
    locktype = staticmethod(threading.Condition)

class ConditionTests(lock_tests.ConditionTests):
    # Binds the inherited lock_tests.ConditionTests cases to
    # threading.Condition.
    condtype = staticmethod(threading.Condition)

class SemaphoreTests(lock_tests.SemaphoreTests):
    # Binds the inherited lock_tests.SemaphoreTests cases to
    # threading.Semaphore.
    semtype = staticmethod(threading.Semaphore)

class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
    # Binds the inherited lock_tests.BoundedSemaphoreTests cases to
    # threading.BoundedSemaphore.
    semtype = staticmethod(threading.BoundedSemaphore)

Kristján Valur Jónsson's avatar
Kristján Valur Jónsson committed
773 774
class BarrierTests(lock_tests.BarrierTests):
    # Binds the inherited lock_tests.BarrierTests cases to threading.Barrier.
    barriertype = staticmethod(threading.Barrier)
775

776
def test_main():
    """Pass every test class of this module to test.support.run_unittest()."""
    test.support.run_unittest(LockTests, PyRLockTests, CRLockTests, EventTests,
                              ConditionAsRLockTests, ConditionTests,
                              SemaphoreTests, BoundedSemaphoreTests,
                              ThreadTests,
                              ThreadJoinOnShutdown,
                              ThreadingExceptionTests,
                              BarrierTests
                              )
785 786 787

if __name__ == "__main__":
    test_main()