test_thread.py 5.2 KB
Newer Older
Christian Heimes's avatar
Christian Heimes committed
1 2
import os
import unittest
3
import random
4
from test import support
5
import _thread as thread
6 7
import time

Christian Heimes's avatar
Christian Heimes committed
8 9 10 11

# Number of worker threads each test starts.
NUMTASKS = 10
# Number of times each BarrierTest worker passes through the barrier.
NUMTRIPS = 3

Christian Heimes's avatar
Christian Heimes committed
12 13
# Held by verbose_print() while printing so that output lines coming from
# different threads are not interleaved.
_print_mutex = thread.allocate_lock()

Christian Heimes's avatar
Christian Heimes committed
14 15
def verbose_print(arg):
    """Helper function for printing out debugging output.

    Prints *arg* only when the test run is verbose (``support.verbose``).
    The print happens while holding ``_print_mutex`` so that concurrent
    callers emit whole lines.
    """
    if support.verbose:
        with _print_mutex:
            print(arg)
Christian Heimes's avatar
Christian Heimes committed
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41

class BasicThreadTest(unittest.TestCase):
    """Common fixture: the locks and counters the thread tests share."""

    def setUp(self):
        """Create fresh bookkeeping state for one test."""
        # Counters: threads currently running and the last identifier
        # handed out to a task.
        self.running = 0
        self.next_ident = 0

        # running_mutex guards the counters above; random_mutex is taken
        # around calls into the shared random number generator.
        self.running_mutex = thread.allocate_lock()
        self.random_mutex = thread.allocate_lock()

        # done_mutex starts out held; a worker releases it to signal the
        # main thread, which blocks by acquiring it a second time.
        completion_lock = thread.allocate_lock()
        completion_lock.acquire()
        self.done_mutex = completion_lock


class ThreadRunningTests(BasicThreadTest):
    """Start batches of NUMTASKS short-lived threads and wait for them."""

    def newtask(self):
        """Start one more ``task`` thread and count it as running.

        Everything happens under ``running_mutex`` so that a task which
        finishes immediately cannot decrement ``running`` before it has
        been incremented for that task.
        """
        with self.running_mutex:
            self.next_ident += 1
            verbose_print("creating task %s" % self.next_ident)
            thread.start_new_thread(self.task, (self.next_ident,))
            self.running += 1

    def task(self, ident):
        """Thread body: sleep for a short random time, then check out.

        The last task of a batch releases ``done_mutex`` to wake the main
        thread.
        """
        with self.random_mutex:
            delay = random.random() / 10000.0
        verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
        time.sleep(delay)
        verbose_print("task %s done" % ident)
        with self.running_mutex:
            self.running -= 1
            # next_ident equals the number of tasks created so far in this
            # batch.  Without that check, a task finishing before the next
            # one is created would see running == 0, release done_mutex
            # early, and a later task would release it a second time.
            finished = self.next_ident == NUMTASKS and self.running == 0
        # Signal only after running_mutex has been dropped, as
        # BarrierTest.task2 does, so the main thread is never woken while
        # this thread still holds a lock.
        if finished:
            self.done_mutex.release()

    def test_starting_threads(self):
        # Basic test for thread creation.
        for i in range(NUMTASKS):
            self.newtask()
        verbose_print("waiting for tasks to complete...")
        self.done_mutex.acquire()
        verbose_print("all tasks done")

    def test_stack_size(self):
        # Various stack size tests.
        self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")

        thread.stack_size(0)
        self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")

        if os.name not in ("nt", "os2", "posix"):
            return

        tss_supported = True
        try:
            thread.stack_size(4096)
        except ValueError:
            verbose_print("caught expected ValueError setting "
                            "stack_size(4096)")
        except thread.error:
            tss_supported = False
            verbose_print("platform does not support changing thread stack "
                            "size")

        if tss_supported:
            try:
                fail_msg = "stack_size(%d) failed - should succeed"
                for tss in (262144, 0x100000, 0):
                    thread.stack_size(tss)
                    self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
                    verbose_print("successfully set stack_size(%d)" % tss)

                for tss in (262144, 0x100000):
                    verbose_print("trying stack_size = (%d)" % tss)
                    # Restart the created-task count for this batch.
                    self.next_ident = 0
                    for i in range(NUMTASKS):
                        self.newtask()

                    verbose_print("waiting for all tasks to complete")
                    self.done_mutex.acquire()
                    verbose_print("all tasks done")
            finally:
                # Restore the default even when an assertion above fails,
                # so later tests do not inherit a modified stack size.
                thread.stack_size(0)


class Barrier:
    """Reusable barrier for a fixed number of threads, built from two locks.

    ``checkin_mutex`` admits arriving threads one at a time and stays held
    while a full group is leaving, so nobody can enter the next round
    early.  ``checkout_mutex`` starts out held; waiting threads block on it
    and are let out one after another once the last thread has arrived.
    """

    def __init__(self, num_threads):
        """Create a barrier that trips when *num_threads* threads have entered."""
        self.num_threads = num_threads
        self.waiting = 0
        self.checkin_mutex = thread.allocate_lock()
        self.checkout_mutex = thread.allocate_lock()
        self.checkout_mutex.acquire()

    def enter(self):
        """Block until ``num_threads`` threads have called ``enter()``."""
        self.checkin_mutex.acquire()
        self.waiting += 1
        if self.waiting == self.num_threads:
            # Last arrival: everybody else is blocked on checkout_mutex.
            self.waiting = self.num_threads - 1
            if self.waiting == 0:
                # Single-thread barrier: there is nobody to wake up, so
                # just reopen check-in and leave checkout_mutex held.
                # Handing over checkout_mutex here would leave both locks
                # in the wrong state and deadlock the next enter().
                self.checkin_mutex.release()
            else:
                # Keep checkin_mutex held and start letting waiters out.
                self.checkout_mutex.release()
            return
        self.checkin_mutex.release()

        self.checkout_mutex.acquire()
        self.waiting -= 1
        if self.waiting == 0:
            # Last one out: reopen check-in for the next round and leave
            # checkout_mutex held, which restores the initial state.
            self.checkin_mutex.release()
            return
        # Pass checkout_mutex on to the next waiting thread.
        self.checkout_mutex.release()


class BarrierTest(BasicThreadTest):
    """Run NUMTASKS threads through a shared Barrier NUMTRIPS times each."""

    def test_barrier(self):
        # Start the workers, then block on done_mutex, which the last
        # worker to finish releases.
        self.bar = Barrier(NUMTASKS)
        self.running = NUMTASKS
        for i in range(NUMTASKS):
            thread.start_new_thread(self.task2, (i,))
        verbose_print("waiting for tasks to end")
        self.done_mutex.acquire()
        verbose_print("tasks done")

    def task2(self, ident):
        """Thread body: pass the barrier NUMTRIPS times, then check out."""
        for i in range(NUMTRIPS):
            if ident == 0:
                # give it a good chance to enter the next
                # barrier before the others are all out
                # of the current one
                delay = 0
            else:
                with self.random_mutex:
                    delay = random.random() / 10000.0
            verbose_print("task %s will run for %sus" %
                          (ident, round(delay * 1e6)))
            time.sleep(delay)
            verbose_print("task %s entering %s" % (ident, i))
            self.bar.enter()
            verbose_print("task %s leaving barrier" % ident)
        with self.running_mutex:
            self.running -= 1
            # Must release mutex before releasing done, else the main thread can
            # exit and set mutex to None as part of global teardown; then
            # mutex.release() raises AttributeError.
            finished = self.running == 0
        if finished:
            self.done_mutex.release()

def test_main():
    """Run both test classes of this module through ``support.run_unittest``."""
    support.run_unittest(ThreadRunningTests, BarrierTest)
Christian Heimes's avatar
Christian Heimes committed
163 164 165

# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()