# Very rudimentary test of threading module

import test.test_support
from test.test_support import verbose
import os
import random
import re
import subprocess
import sys
thread = test.test_support.import_module('thread')
threading = test.test_support.import_module('threading')
import time
import unittest
import weakref

from test import lock_tests

# A trivial mutable counter.
class Counter(object):
    """Mutable integer cell that starts at zero.

    The current number is kept in the public ``value`` attribute and can be
    stepped up or down by one through ``inc()`` and ``dec()``.
    """

    def __init__(self):
        # Every counter begins at zero.
        self.value = 0

    def inc(self):
        """Add one to the stored value."""
        self.value = self.value + 1

    def dec(self):
        """Subtract one from the stored value."""
        self.value = self.value - 1

    def get(self):
        """Return the stored value."""
        current = self.value
        return current

class TestThread(threading.Thread):
    """Worker thread used by ThreadTests.test_various_ops.

    Each worker enters the shared semaphore, bumps the shared running-task
    counter under the mutex, sleeps for a short random delay, then
    decrements the counter again.  While it runs it asserts, through the
    owning test case, that the counter stays within 0..3.
    """

    def __init__(self, name, testcase, sema, mutex, nrunning):
        # name:     thread name, forwarded to threading.Thread
        # testcase: object providing the assert* methods used in run()
        # sema:     context manager limiting how many workers run at once
        # mutex:    context manager guarding access to `nrunning`
        # nrunning: counter object exposing inc(), dec() and get()
        threading.Thread.__init__(self, name=name)
        self.testcase = testcase
        self.sema = sema
        self.mutex = mutex
        self.nrunning = nrunning

    def run(self):
        # random.random() is in [0, 1), so the delay is below 100 usec.
        delay = random.random() / 10000.0
        if verbose:
            print('task %s will run for %.1f usec' % (self.name, delay * 1e6))

        with self.sema:
            with self.mutex:
                self.nrunning.inc()
                if verbose:
                    print('%s tasks are running' % self.nrunning.get())
                self.testcase.assertLessEqual(self.nrunning.get(), 3)

            # Sleep outside the mutex so other workers can update the counter.
            time.sleep(delay)
            if verbose:
                print('task %s done' % self.name)

            with self.mutex:
                self.nrunning.dec()
                self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
                if verbose:
                    print('%s is finished. %d tasks are running' % (
                        self.name, self.nrunning.get()))


class BaseTestCase(unittest.TestCase):
    """Common base for the thread tests in this module.

    setUp() stores whatever test_support.threading_setup() returns and
    tearDown() hands it back to test_support.threading_cleanup() before
    reaping child processes.
    """

    def setUp(self):
        support = test.test_support
        self._threads = support.threading_setup()

    def tearDown(self):
        support = test.test_support
        saved = self._threads
        support.threading_cleanup(*saved)
        support.reap_children()


class ThreadTests(BaseTestCase):
    """Tests for threading.Thread objects and interpreter-level thread state.

    Several tests spawn a child interpreter via subprocess so that behaviour
    at interpreter shutdown can be observed from outside.
    """

    # Create a bunch of threads, let each do some work, wait until all are
    # done.
    def test_various_ops(self):
        # Each TestThread sleeps for less than 100 usec, so this finishes
        # quickly; join() is still bounded by NUMTASKS seconds per thread.
        NUMTASKS = 10

        # no more than 3 of the 10 can run at once
        sema = threading.BoundedSemaphore(value=3)
        mutex = threading.RLock()
        numrunning = Counter()

        threads = []

        for i in range(NUMTASKS):
            t = TestThread("<thread %d>" % i, self, sema, mutex, numrunning)
            threads.append(t)
            # A thread that has not been started has no ident yet.
            self.assertIsNone(t.ident)
            self.assertTrue(re.match(r'<TestThread\(.*, initial\)>', repr(t)))
            t.start()

        if verbose:
            print('waiting for all tasks to complete')
        for t in threads:
            t.join(NUMTASKS)
            self.assertFalse(t.is_alive())
            self.assertNotEqual(t.ident, 0)
            self.assertIsNotNone(t.ident)
            self.assertTrue(
                re.match(r'<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
        if verbose:
            print('all tasks done')
        self.assertEqual(numrunning.get(), 0)

    def test_ident_of_no_threading_threads(self):
        # The ident still must work for the main thread and dummy threads.
        self.assertIsNotNone(threading.currentThread().ident)

        def f():
            ident.append(threading.currentThread().ident)
            done.set()
        done = threading.Event()
        ident = []
        thread.start_new_thread(f, ())
        done.wait()
        self.assertIsNotNone(ident[0])
        # Kill the "immortal" _DummyThread
        del threading._active[ident[0]]

    def _run_various_ops_with_stack_size(self, size):
        # Run test_various_ops() with the thread stack size set to `size`
        # bytes.  The test is skipped when the platform rejects the change,
        # and stack_size(0) is always called afterwards so that a failure
        # cannot leak the modified setting into later tests.
        try:
            threading.stack_size(size)
        except thread.error:
            self.skipTest(
                'platform does not support changing thread stack size')
        try:
            self.test_various_ops()
        finally:
            threading.stack_size(0)

    # run with a small(ish) thread stack size (256kB)
    def test_various_ops_small_stack(self):
        if verbose:
            print('with 256kB thread stack size...')
        self._run_various_ops_with_stack_size(262144)

    # run with a large thread stack size (1MB)
    def test_various_ops_large_stack(self):
        if verbose:
            print('with 1MB thread stack size...')
        self._run_various_ops_with_stack_size(0x100000)

    def test_foreign_thread(self):
        # Check that a "foreign" thread can use the threading module.
        def f(mutex):
            # Calling current_thread() forces an entry for the foreign
            # thread to get made in the threading._active map.
            threading.current_thread()
            mutex.release()

        mutex = threading.Lock()
        mutex.acquire()
        tid = thread.start_new_thread(f, (mutex,))
        # Wait for the thread to finish.
        mutex.acquire()
        self.assertIn(tid, threading._active)
        self.assertIsInstance(threading._active[tid], threading._DummyThread)
        del threading._active[tid]

    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
    def test_PyThreadState_SetAsyncExc(self):
        try:
            import ctypes
        except ImportError:
            # Report a skip rather than a silent pass.
            self.skipTest("requires ctypes")

        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc

        class AsyncExc(Exception):
            pass

        exception = ctypes.py_object(AsyncExc)

        # First check it works when setting the exception from the same thread.
        tid = thread.get_ident()

        try:
            result = set_async_exc(ctypes.c_long(tid), exception)
            # The exception is async, so we might have to keep the VM busy until
            # it notices.
            while True:
                pass
        except AsyncExc:
            pass
        else:
            # This code is unreachable but it reflects the intent. If we wanted
            # to be smarter the above loop wouldn't be infinite.
            self.fail("AsyncExc not raised")
        try:
            self.assertEqual(result, 1)  # one thread state modified
        except UnboundLocalError:
            # The exception was raised too quickly for us to get the result.
            pass

        # `worker_started` is set by the thread when it's inside a try/except
        # block waiting to catch the asynchronously set AsyncExc exception.
        # `worker_saw_exception` is set by the thread upon catching that
        # exception.
        worker_started = threading.Event()
        worker_saw_exception = threading.Event()

        class Worker(threading.Thread):
            def run(self):
                self.id = thread.get_ident()
                self.finished = False

                try:
                    while True:
                        worker_started.set()
                        time.sleep(0.1)
                except AsyncExc:
                    self.finished = True
                    worker_saw_exception.set()

        t = Worker()
        t.daemon = True  # so if this fails, we don't hang Python at shutdown
        t.start()
        if verbose:
            print("    started worker thread")

        # Try a thread id that doesn't make sense.
        if verbose:
            print("    trying nonsensical thread id")
        result = set_async_exc(ctypes.c_long(-1), exception)
        self.assertEqual(result, 0)  # no thread states modified

        # Now raise an exception in the worker thread.
        if verbose:
            print("    waiting for worker thread to get started")
        ret = worker_started.wait()
        self.assertTrue(ret)
        if verbose:
            print("    verifying worker hasn't exited")
        self.assertFalse(t.finished)
        if verbose:
            print("    attempting to raise asynch exception in worker")
        result = set_async_exc(ctypes.c_long(t.id), exception)
        self.assertEqual(result, 1)  # one thread state modified
        if verbose:
            print("    waiting for worker to say it caught the exception")
        worker_saw_exception.wait(timeout=10)
        self.assertTrue(t.finished)
        if verbose:
            print("    all OK -- joining worker")
        if t.finished:
            t.join()
        # else the thread is still running, and we have no way to kill it

    def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def fail_new_thread(*args):
            raise thread.error()
        _start_new_thread = threading._start_new_thread
        threading._start_new_thread = fail_new_thread
        try:
            t = threading.Thread(target=lambda: None)
            self.assertRaises(thread.error, t.start)
            self.assertNotIn(
                t, threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            # Always restore the real implementation.
            threading._start_new_thread = _start_new_thread

    def test_finalize_runnning_thread(self):
        # Issue 1402: the PyGILState_Ensure / _Release functions may be called
        # very late on python exit: on deallocation of a running thread for
        # example.
        try:
            import ctypes
        except ImportError:
            # Report a skip rather than a silent pass.
            self.skipTest("requires ctypes")

        rc = subprocess.call([sys.executable, "-c", """if 1:
            import ctypes, sys, time, thread

            # This lock is used as a simple event variable.
            ready = thread.allocate_lock()
            ready.acquire()

            # Module globals are cleared before __del__ is run
            # So we save the functions in class dict
            class C:
                ensure = ctypes.pythonapi.PyGILState_Ensure
                release = ctypes.pythonapi.PyGILState_Release
                def __del__(self):
                    state = self.ensure()
                    self.release(state)

            def waitingThread():
                x = C()
                ready.release()
                time.sleep(100)

            thread.start_new_thread(waitingThread, ())
            ready.acquire()  # Be sure the other thread is waiting.
            sys.exit(42)
            """])
        self.assertEqual(rc, 42)

    def test_finalize_with_trace(self):
        # Issue1733757
        # Avoid a deadlock when sys.settrace steps into threading._shutdown
        p = subprocess.Popen([sys.executable, "-c", """if 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # testsuite to hang forever
            def killer():
                import os, time
                time.sleep(2)
                print 'program blocked; aborting'
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
            """],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        stdout, stderr = p.communicate()
        rc = p.returncode
        # Exit status 2 is what the child's killer thread uses.
        self.assertNotEqual(rc, 2, "interpreter was blocked")
        self.assertEqual(rc, 0, "Unexpected error: " + repr(stderr))

    def test_join_nondaemon_on_shutdown(self):
        # Issue 1722344
        # Raising SystemExit skipped threading._shutdown
        p = subprocess.Popen([sys.executable, "-c", """if 1:
                import threading
                from time import sleep

                def child():
                    sleep(1)
                    # As a non-daemon thread we SHOULD wake up and nothing
                    # should be torn down yet
                    print "Woke up, sleep function is:", sleep

                threading.Thread(target=child).start()
                raise SystemExit
            """],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        stdout, stderr = p.communicate()
        self.assertEqual(stdout.strip(),
            "Woke up, sleep function is: <built-in function sleep>")
        # The fourth positional argument of re.sub() is `count`, so the flag
        # has to be passed by keyword to take effect.
        stderr = re.sub(r"^\[\d+ refs\]", "", stderr,
                        flags=re.MULTILINE).strip()
        self.assertEqual(stderr, "")

    def test_enumerate_after_join(self):
        # Try hard to trigger #1703448: a thread is still returned in
        # threading.enumerate() after it has been join()ed.
        enum = threading.enumerate
        old_interval = sys.getcheckinterval()
        try:
            for i in xrange(1, 100):
                # Try a couple times at each thread-switching interval
                # to get more interleavings.
                sys.setcheckinterval(i // 5)
                t = threading.Thread(target=lambda: None)
                t.start()
                t.join()
                alive = enum()
                self.assertNotIn(t, alive,
                    "#1703448 triggered after %d trials: %s" % (i, alive))
        finally:
            sys.setcheckinterval(old_interval)

    def test_no_refcycle_through_target(self):
        class RunSelfFunction(object):
            def __init__(self, should_raise):
                # The links in this refcycle from Thread back to self
                # should be cleaned up when the thread completes.
                self.should_raise = should_raise
                self.thread = threading.Thread(target=self._run,
                                               args=(self,),
                                               kwargs={'yet_another': self})
                self.thread.start()

            def _run(self, other_ref, yet_another):
                if self.should_raise:
                    raise SystemExit

        cyclic_object = RunSelfFunction(should_raise=False)
        weak_cyclic_object = weakref.ref(cyclic_object)
        cyclic_object.thread.join()
        del cyclic_object
        self.assertEqual(None, weak_cyclic_object(),
                         msg=('%d references still around' %
                              sys.getrefcount(weak_cyclic_object())))

        # Same check when the thread's target exits by raising SystemExit.
        raising_cyclic_object = RunSelfFunction(should_raise=True)
        weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
        raising_cyclic_object.thread.join()
        del raising_cyclic_object
        self.assertEqual(None, weak_raising_cyclic_object(),
                         msg=('%d references still around' %
                              sys.getrefcount(weak_raising_cyclic_object())))


class ThreadJoinOnShutdown(BaseTestCase):
    """Check that a non-daemon thread can join the main thread at exit.

    Each test runs a small script in a child interpreter and verifies both
    its output and its exit status.
    """

    def _run_and_join(self, script):
        # Prefix `script` with the shared imports and the joiningfunc()
        # helper, run it in a child interpreter, and check that it printed
        # 'end of main' followed by 'end of thread' and exited with status 0.
        script = """if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print 'end of thread'
        \n""" + script

        p = subprocess.Popen([sys.executable, "-c", script],
                             stdout=subprocess.PIPE)
        # communicate() drains the pipe while waiting, so the child cannot
        # block on a full pipe the way it could with wait() followed by
        # read().
        stdout, _ = p.communicate()
        rc = p.returncode
        data = stdout.replace('\r', '')
        self.assertEqual(data, "end of main\nend of thread\n")
        self.assertNotEqual(rc, 2, "interpreter was blocked")
        self.assertEqual(rc, 0, "Unexpected error")

    def test_1_join_on_shutdown(self):
        # The usual case: on exit, wait for a non-daemon thread
        script = """if 1:
            import os
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            time.sleep(0.1)
            print 'end of main'
            """
        self._run_and_join(script)

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    def test_2_join_in_forked_process(self):
        # Like the test above, but from a forked interpreter
        script = """if 1:
            childpid = os.fork()
            if childpid != 0:
                os.waitpid(childpid, 0)
                sys.exit(0)

            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            print 'end of main'
            """
        self._run_and_join(script)

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    def test_3_join_in_forked_from_thread(self):
        # Like the test above, but fork() was called from a worker thread
        # In the forked process, the main Thread object must be marked as stopped.

        # Skip platforms with known problems forking from a worker thread.
        # See http://bugs.python.org/issue3863.
        if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
                            'os2emx'):
            self.skipTest('due to known OS bugs on ' + sys.platform)
        script = """if 1:
            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    os.waitpid(childpid, 0)
                    sys.exit(0)

                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print 'end of main'
                t.start()
                t.join() # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            """
        self._run_and_join(script)


class ThreadingExceptionTests(BaseTestCase):
    """Check that misuse of the Thread API raises RuntimeError."""

    # A RuntimeError should be raised if Thread.start() is called
    # multiple times.
    def test_start_thread_again(self):
        # Named `t` so the module-level `thread` is not shadowed.
        t = threading.Thread()
        t.start()
        self.assertRaises(RuntimeError, t.start)
        # Do not leave the started thread behind for tearDown.
        t.join()

    def test_joining_current_thread(self):
        current_thread = threading.current_thread()
        self.assertRaises(RuntimeError, current_thread.join)

    def test_joining_inactive_thread(self):
        # join() on a thread that was never started must fail.
        t = threading.Thread()
        self.assertRaises(RuntimeError, t.join)

    def test_daemonize_active_thread(self):
        t = threading.Thread()
        t.start()
        self.assertRaises(RuntimeError, setattr, t, "daemon", True)
        # Do not leave the started thread behind for tearDown.
        t.join()


class LockTests(lock_tests.LockTests):
    # Bind the inherited lock_tests.LockTests cases to threading.Lock;
    # staticmethod() keeps the factory from being bound as a method.
    locktype = staticmethod(threading.Lock)

class RLockTests(lock_tests.RLockTests):
    # Bind the inherited lock_tests.RLockTests cases to threading.RLock.
    locktype = staticmethod(threading.RLock)

class EventTests(lock_tests.EventTests):
    # Bind the inherited lock_tests.EventTests cases to threading.Event.
    eventtype = staticmethod(threading.Event)

class ConditionAsRLockTests(lock_tests.RLockTests):
    # A Condition uses an RLock by default and exports its API, so the
    # inherited RLock cases are run with threading.Condition as the factory.
    locktype = staticmethod(threading.Condition)

class ConditionTests(lock_tests.ConditionTests):
    # Bind the inherited lock_tests.ConditionTests cases to
    # threading.Condition.
    condtype = staticmethod(threading.Condition)

class SemaphoreTests(lock_tests.SemaphoreTests):
    # Bind the inherited lock_tests.SemaphoreTests cases to
    # threading.Semaphore.
    semtype = staticmethod(threading.Semaphore)

class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
    # Bind the inherited lock_tests.BoundedSemaphoreTests cases to
    # threading.BoundedSemaphore.
    semtype = staticmethod(threading.BoundedSemaphore)


def test_main():
    """Run this module's test case classes via test_support.run_unittest."""
    test.test_support.run_unittest(LockTests, RLockTests, EventTests,
                                   ConditionAsRLockTests, ConditionTests,
                                   SemaphoreTests, BoundedSemaphoreTests,
                                   ThreadTests,
                                   ThreadJoinOnShutdown,
                                   ThreadingExceptionTests,
                                   )

# Run the suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()