test_thread.py 6.46 KB
Newer Older
1 2
import os
import unittest
3
import random
4
from test import test_support
5 6
import thread
import time
7
import sys
8

9
from test import lock_tests
10 11 12 13 14

# Number of worker threads started by each test round (also the size of the
# Barrier used in BarrierTest).
NUMTASKS = 10
# Number of times each worker passes through the barrier in BarrierTest.task2.
NUMTRIPS = 3


15 16
# Serializes the output written by verbose_print() across threads.
_print_mutex = thread.allocate_lock()

17 18 19
def verbose_print(arg):
    """Print *arg* when the test run is verbose.

    Nothing is printed unless ``test_support.verbose`` is true.  The write
    is done while holding the module-level ``_print_mutex`` so messages
    from concurrently running threads are emitted one at a time.
    """
    if test_support.verbose:
        with _print_mutex:
            # Single-argument call form: same output as the ``print arg``
            # statement on Python 2, and also valid Python 3 syntax.
            print(arg)
22 23 24 25 26 27 28 29 30


class BasicThreadTest(unittest.TestCase):
    """Shared fixture for the thread tests in this module.

    Provides the locks and counters that the subclasses' worker threads
    use to report progress back to the main (test) thread.
    """

    def setUp(self):
        # done_mutex starts out held: the test thread blocks on a second
        # acquire() until the last worker releases it.
        self.done_mutex = thread.allocate_lock()
        self.done_mutex.acquire()
        # Guards the created/running/next_ident counters below.
        self.running_mutex = thread.allocate_lock()
        # Held by workers around their calls to random.random().
        self.random_mutex = thread.allocate_lock()
        self.created = 0
        self.running = 0
        self.next_ident = 0


class ThreadRunningTests(BasicThreadTest):
    """Start batches of short-lived threads and wait for them to finish.

    Each batch consists of NUMTASKS workers running task(); the last
    worker to finish releases done_mutex, which wakes the test thread.
    """

    def newtask(self):
        # Start one worker.  running_mutex is held across the start and the
        # counter updates, so the new thread cannot decrement self.running
        # before it has been incremented here.
        with self.running_mutex:
            self.next_ident += 1
            verbose_print("creating task %s" % self.next_ident)
            thread.start_new_thread(self.task, (self.next_ident,))
            self.created += 1
            self.running += 1

    def task(self, ident):
        # Worker body: sleep for a random delay below 100us, then check out.
        with self.random_mutex:
            delay = random.random() / 10000.0
        verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
        time.sleep(delay)
        verbose_print("task %s done" % ident)
        with self.running_mutex:
            self.running -= 1
            finished = self.created == NUMTASKS and self.running == 0
        # Signal completion only after running_mutex has been dropped, the
        # same ordering BarrierTest.task2 uses: once done_mutex is released
        # the test thread may proceed immediately, so this worker should no
        # longer be holding any other lock at that point.
        if finished:
            self.done_mutex.release()

    def test_starting_threads(self):
        # Basic test for thread creation.
        for i in range(NUMTASKS):
            self.newtask()
        verbose_print("waiting for tasks to complete...")
        self.done_mutex.acquire()
        verbose_print("all tasks done")

    def test_stack_size(self):
        # Various stack size tests.
        self.assertEqual(thread.stack_size(), 0, "intial stack size is not 0")

        thread.stack_size(0)
        self.assertEqual(thread.stack_size(), 0,
                         "stack_size not reset to default")

        if os.name not in ("nt", "os2", "posix"):
            return

        tss_supported = True
        try:
            thread.stack_size(4096)
        except ValueError:
            verbose_print("caught expected ValueError setting "
                            "stack_size(4096)")
        except thread.error:
            tss_supported = False
            verbose_print("platform does not support changing thread stack "
                            "size")

        if not tss_supported:
            return

        try:
            fail_msg = "stack_size(%d) failed - should succeed"
            for tss in (262144, 0x100000, 0):
                thread.stack_size(tss)
                self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
                verbose_print("successfully set stack_size(%d)" % tss)

            for tss in (262144, 0x100000):
                verbose_print("trying stack_size = (%d)" % tss)
                # Reset the bookkeeping so task() recognises the end of
                # this batch (self.running is already back at 0).
                self.next_ident = 0
                self.created = 0
                for i in range(NUMTASKS):
                    self.newtask()

                verbose_print("waiting for all tasks to complete")
                self.done_mutex.acquire()
                verbose_print("all tasks done")
        finally:
            # The stack size is a process-wide setting: restore the default
            # even when an assertion above fails, so later tests are not
            # run with a leftover non-default size.
            thread.stack_size(0)


class Barrier:
    """Rendezvous point for a fixed number of threads, built on two locks.

    Every participating thread calls enter(); no call returns until
    ``num_threads`` threads have arrived.  Once all of them have left, the
    object is back in its initial state and can be used for another round.

    ``checkin_mutex`` admits arriving threads one at a time and is kept
    held while a round is draining; ``checkout_mutex`` is held while
    threads are still arriving and is passed from thread to thread as they
    leave.  Locks are deliberately released by threads other than the one
    that acquired them.
    """

    def __init__(self, num_threads):
        self.num_threads = num_threads
        self.waiting = 0
        self.checkin_mutex = thread.allocate_lock()
        self.checkout_mutex = thread.allocate_lock()
        # Start with the exit closed so early arrivals block in enter().
        self.checkout_mutex.acquire()

    def enter(self):
        """Block until ``num_threads`` threads have called enter()."""
        self.checkin_mutex.acquire()
        self.waiting += 1
        if self.waiting == self.num_threads:
            # Last arrival: leave checkin_mutex held so nobody can start
            # the next round yet, take ourselves off the count and open
            # the exit for the waiting threads.
            self.waiting = self.num_threads - 1
            self.checkout_mutex.release()
            return
        self.checkin_mutex.release()

        # Wait for the exit to open, then leave one thread at a time.
        self.checkout_mutex.acquire()
        self.waiting -= 1
        if self.waiting == 0:
            # Last one out: keep checkout_mutex held (exit closed again)
            # and reopen check-in for the next round.
            self.checkin_mutex.release()
            return
        self.checkout_mutex.release()


class BarrierTest(BasicThreadTest):
    """Drive NUMTASKS threads through a shared Barrier NUMTRIPS times."""

    def test_barrier(self):
        self.bar = Barrier(NUMTASKS)
        # All workers count as running up front; each task2() decrements
        # the counter once it has completed its last trip.
        self.running = NUMTASKS
        for i in range(NUMTASKS):
            thread.start_new_thread(self.task2, (i,))
        verbose_print("waiting for tasks to end")
        self.done_mutex.acquire()
        verbose_print("tasks done")

    def task2(self, ident):
        # Worker body: NUMTRIPS passes through self.bar, then check out.
        for i in range(NUMTRIPS):
            if ident == 0:
                # give it a good chance to enter the next
                # barrier before the others are all out
                # of the current one
                delay = 0
            else:
                with self.random_mutex:
                    delay = random.random() / 10000.0
            verbose_print("task %s will run for %sus" %
                          (ident, round(delay * 1e6)))
            time.sleep(delay)
            verbose_print("task %s entering %s" % (ident, i))
            self.bar.enter()
            verbose_print("task %s leaving barrier" % ident)
        with self.running_mutex:
            self.running -= 1
            # Must release mutex before releasing done, else the main thread can
            # exit and set mutex to None as part of global teardown; then
            # mutex.release() raises AttributeError.
            finished = self.running == 0
        if finished:
            self.done_mutex.release()


169 170 171 172
class LockTests(lock_tests.LockTests):
    # Inherits its test methods from lock_tests.LockTests and supplies
    # thread.allocate_lock as the lock factory.  NOTE(review): presumably the
    # base class creates the locks under test via self.locktype() -- confirm
    # in lock_tests, which is not part of this file.
    locktype = thread.allocate_lock


173 174 175 176
class TestForkInThread(unittest.TestCase):
    """Check that os.fork() works when called from a non-main thread.

    The forked child reports success to the parent by writing b"OK" into a
    pipe created in setUp().
    """

    def setUp(self):
        self.read_fd, self.write_fd = os.pipe()

    def _test_forkinthread(self):
        def thread1():
            try:
                pid = os.fork() # fork in a thread
            except RuntimeError:
                sys.exit(0) # exit the child

            if pid == 0: # child
                os.close(self.read_fd)
                # Bytes literal: identical to the plain string on
                # Python 2.6+, and what os.write() needs on Python 3.
                os.write(self.write_fd, b"OK")
                sys.exit(0)
            else: # parent
                os.close(self.write_fd)

        thread.start_new_thread(thread1, ())
        # Blocks until the child has written its two-byte message.
        self.assertEqual(os.read(self.read_fd, 2), b"OK",
                         "Unable to fork() in thread")

    # Only expose the test under a name unittest collects on platforms
    # other than Windows.
    if not sys.platform.startswith('win'):
        test_forkinthread = _test_forkinthread

    def tearDown(self):
        # Either descriptor may already have been closed by the test
        # itself, so a failing close() is ignored.
        try:
            os.close(self.read_fd)
        except OSError:
            pass

        try:
            os.close(self.write_fd)
        except OSError:
            pass


210
def test_main():
    """Run this module's test classes through test_support.run_unittest()."""
    test_support.run_unittest(ThreadRunningTests, BarrierTest, LockTests,
                              TestForkInThread)
213 214 215

# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()