# test_thread.py
import os
import unittest
import random
from test import test_support
import thread
import time


# Number of worker threads each test starts (also the Barrier size).
NUMTASKS = 10
# Number of times each worker passes through the barrier in BarrierTest.
NUMTRIPS = 3


# Held around the print in verbose_print() so only one thread prints at a time.
_print_mutex = thread.allocate_lock()

def verbose_print(arg):
    """Print *arg* when the test run is in verbose mode.

    Does nothing unless ``test_support.verbose`` is true.  The print is done
    while holding the module-level ``_print_mutex`` so that only one thread
    prints at a time.
    """
    if not test_support.verbose:
        return
    with _print_mutex:
        # The parenthesized form prints the same text under the Python 2
        # print statement and is also valid call syntax.
        print(arg)


class BasicThreadTest(unittest.TestCase):
    """Shared fixture: the locks and counters used by the thread tests."""

    def setUp(self):
        """Create fresh locks and zeroed counters for one test."""
        # Bookkeeping counters.
        self.next_ident = 0
        self.running = 0

        # Plain locks, both created unlocked.
        self.random_mutex = thread.allocate_lock()
        self.running_mutex = thread.allocate_lock()

        # The completion lock starts out held; a later acquire() on it
        # blocks until some thread releases it.
        completion_lock = thread.allocate_lock()
        completion_lock.acquire()
        self.done_mutex = completion_lock


class ThreadRunningTests(BasicThreadTest):
    """Start batches of NUMTASKS short-lived threads and wait for them."""

    def setUp(self):
        """Extend the base fixture with a count of tasks started so far."""
        BasicThreadTest.setUp(self)
        # Tasks started in the current batch; updated under running_mutex.
        # task() signals completion only once this has reached NUMTASKS.
        self.created = 0

    def newtask(self):
        """Start one more task thread and account for it."""
        with self.running_mutex:
            self.next_ident += 1
            verbose_print("creating task %s" % self.next_ident)
            thread.start_new_thread(self.task, (self.next_ident,))
            self.created += 1
            self.running += 1

    def task(self, ident):
        """Thread body: sleep for a short random time, then report done.

        The thread that brings ``running`` to zero after the whole batch has
        been created releases ``done_mutex`` to wake the waiting test.
        """
        with self.random_mutex:
            delay = random.random() / 10000.0
        verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
        time.sleep(delay)
        verbose_print("task %s done" % ident)
        with self.running_mutex:
            self.running -= 1
            # A task can finish before the next newtask() call, so
            # running == 0 alone does not mean the batch is complete.
            # Signalling at that point would release done_mutex early and a
            # later task would then release an already unlocked lock.
            finished = self.created == NUMTASKS and self.running == 0
        # Release done_mutex only after running_mutex has been dropped,
        # the same order task2 in BarrierTest uses.
        if finished:
            self.done_mutex.release()

    def test_starting_threads(self):
        # Basic test for thread creation.
        for _ in range(NUMTASKS):
            self.newtask()
        verbose_print("waiting for tasks to complete...")
        self.done_mutex.acquire()
        verbose_print("all tasks done")

    def test_stack_size(self):
        # Various stack size tests.
        self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")

        thread.stack_size(0)
        self.assertEqual(thread.stack_size(), 0,
                         "stack_size not reset to default")

        if os.name not in ("nt", "os2", "posix"):
            return

        tss_supported = True
        try:
            thread.stack_size(4096)
        except ValueError:
            verbose_print("caught expected ValueError setting "
                          "stack_size(4096)")
        except thread.error:
            tss_supported = False
            verbose_print("platform does not support changing thread stack "
                          "size")

        if not tss_supported:
            return

        try:
            fail_msg = "stack_size(%d) failed - should succeed"
            for tss in (262144, 0x100000, 0):
                thread.stack_size(tss)
                self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
                verbose_print("successfully set stack_size(%d)" % tss)

            for tss in (262144, 0x100000):
                verbose_print("trying stack_size = (%d)" % tss)
                # Actually apply the size being tried; the loop above ends
                # by setting the size back to 0.
                thread.stack_size(tss)
                # Start a fresh batch.
                self.next_ident = 0
                self.created = 0
                for _ in range(NUMTASKS):
                    self.newtask()

                verbose_print("waiting for all tasks to complete")
                self.done_mutex.acquire()
                verbose_print("all tasks done")
        finally:
            # Restore the default even when an assertion above fails.
            thread.stack_size(0)


class Barrier:
    """Reusable barrier for a fixed number of threads, built from two locks.

    ``checkin_mutex`` guards arrival and ``checkout_mutex`` guards departure.
    The last thread to arrive keeps ``checkin_mutex`` held and opens
    ``checkout_mutex``; the last thread to leave re-opens ``checkin_mutex``
    and leaves ``checkout_mutex`` held.  A thread that re-enters early
    therefore blocks until every thread has left the previous round.
    """

    def __init__(self, num_threads):
        """Create a barrier that lets threads through *num_threads* at a time."""
        self.num_threads = num_threads
        # Arrival count during check-in; threads still to leave during
        # check-out.
        self.waiting = 0
        self.checkin_mutex = thread.allocate_lock()
        self.checkout_mutex = thread.allocate_lock()
        # Check-out starts closed; the last arrival opens it.
        self.checkout_mutex.acquire()

    def enter(self):
        """Block until ``num_threads`` threads have called enter()."""
        self.checkin_mutex.acquire()
        self.waiting = self.waiting + 1
        if self.waiting == self.num_threads:
            # Last arrival: everyone else still has to check out.
            self.waiting = self.num_threads - 1
            if self.waiting == 0:
                # Sole participant: nobody will check out, so re-open
                # check-in here and leave check-out closed.
                self.checkin_mutex.release()
            else:
                # checkin_mutex stays held until the last thread has left.
                self.checkout_mutex.release()
            return
        self.checkin_mutex.release()

        # Wait for the last arrival (or the previous leaver) to open check-out.
        self.checkout_mutex.acquire()
        self.waiting = self.waiting - 1
        if self.waiting == 0:
            # Last one out: re-open check-in and keep check-out closed.
            self.checkin_mutex.release()
            return
        # Pass the check-out lock on to the next waiting thread.
        self.checkout_mutex.release()


class BarrierTest(BasicThreadTest):
    """Send NUMTASKS threads through a shared Barrier, NUMTRIPS times each."""

    def test_barrier(self):
        """Start the workers and block until the last one has finished."""
        self.bar = Barrier(NUMTASKS)
        # All workers are counted up front; task2 only decrements.
        self.running = NUMTASKS
        for i in range(NUMTASKS):
            thread.start_new_thread(self.task2, (i,))
        verbose_print("waiting for tasks to end")
        # done_mutex is held since setUp, so this blocks until the last
        # worker releases it.
        self.done_mutex.acquire()
        verbose_print("tasks done")

    def task2(self, ident):
        """Thread body: sleep briefly, enter the barrier, repeat NUMTRIPS times."""
        for i in range(NUMTRIPS):
            if ident == 0:
                # give it a good chance to enter the next
                # barrier before the others are all out
                # of the current one
                delay = 0
            else:
                with self.random_mutex:
                    delay = random.random() / 10000.0
            verbose_print("task %s will run for %sus" %
                          (ident, round(delay * 1e6)))
            time.sleep(delay)
            verbose_print("task %s entering %s" % (ident, i))
            self.bar.enter()
            verbose_print("task %s leaving barrier" % ident)
        with self.running_mutex:
            self.running -= 1
            # Must release mutex before releasing done, else the main thread can
            # exit and set mutex to None as part of global teardown; then
            # mutex.release() raises AttributeError.
            finished = self.running == 0
        if finished:
            self.done_mutex.release()


def test_main():
    """Run both test-case classes through ``test_support.run_unittest``."""
    cases = (ThreadRunningTests, BarrierTest)
    test_support.run_unittest(*cases)

# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()