test_dummy_thread.py 7 KB
Newer Older
1 2 3 4 5 6 7
"""Generic thread tests.

Meant to be used by _dummy_thread and _thread.  To allow for different
modules to be used, test_main() can be called with the module to use as the
thread implementation as its sole argument.

"""
8
import _dummy_thread as _thread
9
import time
10
import queue
11 12
import random
import unittest
13
from test import support
14

15 16
# Number of seconds the timing-sensitive tests wait on spawned threads.
# test_main() rebinds it to 2 when it is handed a replacement thread module.
DELAY = 0 # Set > 0 when testing a module other than _dummy_thread, such as
          # the '_thread' module.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35

class LockTests(unittest.TestCase):
    """Test lock objects.

    Every test operates on a fresh lock that setUp() obtains from the
    module-level ``_thread`` implementation.
    """

    def setUp(self):
        # Create a lock
        self.lock = _thread.allocate_lock()

    def test_initlock(self):
        # Make sure locks start unlocked.
        self.assertFalse(self.lock.locked(),
                         "Lock object is not initialized unlocked.")

    def test_release(self):
        # Test self.lock.release(): after an acquire/release pair the lock
        # must report itself as unlocked again.
        self.lock.acquire()
        self.lock.release()
        self.assertFalse(self.lock.locked(),
                         "Lock object did not release properly.")

    def test_improper_release(self):
        # Make sure release of an unlocked lock raises _thread.error
        self.assertRaises(_thread.error, self.lock.release)

    def test_cond_acquire_success(self):
        # Make sure the conditional (wait flag 0) acquiring of the lock works.
        self.assertTrue(self.lock.acquire(0),
                        "Conditional acquiring of the lock failed.")

    def test_cond_acquire_fail(self):
        # Test that conditionally acquiring an already locked lock returns
        # a false value.
        self.lock.acquire(0)
        self.assertFalse(self.lock.acquire(0),
                         "Conditional acquiring of a locked lock incorrectly "
                         "succeeded.")

    def test_uncond_acquire_success(self):
        # Make sure unconditional acquiring of a lock works.
        self.lock.acquire()
        self.assertTrue(self.lock.locked(),
                        "Uncondional locking failed.")

    def test_uncond_acquire_return_val(self):
        # Make sure that an unconditional locking returns True, both with an
        # explicit wait flag and with the default arguments.
        self.assertTrue(self.lock.acquire(1) is True,
                        "Unconditional locking did not return True.")
        # NOTE(review): the lock is still held from the call above, so this
        # second acquire presumably only returns with a non-blocking
        # implementation such as _dummy_thread -- confirm before running
        # this test against a real threading module.
        self.assertTrue(self.lock.acquire() is True)

    def test_uncond_acquire_blocking(self):
        # Make sure that unconditional acquiring of a locked lock blocks.
        def delay_unlock(to_unlock, delay):
            """Hold on to lock for a set amount of time before unlocking."""
            time.sleep(delay)
            to_unlock.release()

        self.lock.acquire()
        # Timestamps are truncated to whole seconds, so the elapsed time
        # compared against DELAY below is measured in whole seconds too.
        start_time = int(time.time())
        _thread.start_new_thread(delay_unlock, (self.lock, DELAY))
        if support.verbose:
            print()
            print("*** Waiting for thread to release the lock "
                  "(approx. %s sec.) ***" % DELAY)
        # This second acquire is expected to return only once delay_unlock
        # has released the lock.
        self.lock.acquire()
        end_time = int(time.time())
        if support.verbose:
            print("done")
        self.assertTrue((end_time - start_time) >= DELAY,
                        "Blocking by unconditional acquiring failed.")

class MiscTests(unittest.TestCase):
    """Miscellaneous tests."""

    def test_exit(self):
        # Make sure _thread.exit() raises SystemExit
        self.assertRaises(SystemExit, _thread.exit)

    def test_ident(self):
        # Test sanity of _thread.get_ident(): it must be a non-zero integer.
        self.assertTrue(isinstance(_thread.get_ident(), int),
                        "_thread.get_ident() returned a non-integer")
        self.assertNotEqual(_thread.get_ident(), 0,
                            "_thread.get_ident() returned 0")

    def test_LockType(self):
        # Make sure _thread.LockType is the same type as what
        # _thread.allocate_lock() returns.
        self.assertTrue(isinstance(_thread.allocate_lock(), _thread.LockType),
                        "_thread.LockType is not an instance of what is "
                        "returned by _thread.allocate_lock()")

    def test_interrupt_main(self):
        # Calling start_new_thread with a function that executes
        # interrupt_main should raise KeyboardInterrupt upon completion.
        def call_interrupt():
            _thread.interrupt_main()

        self.assertRaises(KeyboardInterrupt, _thread.start_new_thread,
                          call_interrupt, ())

    def test_interrupt_in_main(self):
        # Make sure that if interrupt_main is called in the main thread,
        # KeyboardInterrupt is raised instantly.
        self.assertRaises(KeyboardInterrupt, _thread.interrupt_main)
118

119 120 121 122 123 124 125 126 127
class ThreadTests(unittest.TestCase):
    """Test thread creation."""

    def test_arg_passing(self):
        # Make sure that parameter passing works for positional arguments,
        # keyword arguments, and a mix of both.
        def arg_tester(queue, arg1=False, arg2=False):
            """Use to test _thread.start_new_thread() passes args properly."""
            # The parameter must stay named ``queue``: one call below passes
            # it by keyword.
            queue.put((arg1, arg2))

        testing_queue = queue.Queue(1)

        # Positional arguments only.
        _thread.start_new_thread(arg_tester, (testing_queue, True, True))
        result = testing_queue.get()
        self.assertTrue(result[0] and result[1],
                        "Argument passing for thread creation using tuple failed")

        # Keyword arguments only.
        _thread.start_new_thread(arg_tester, (), {'queue': testing_queue,
                                                  'arg1': True, 'arg2': True})
        result = testing_queue.get()
        self.assertTrue(result[0] and result[1],
                        "Argument passing for thread creation using kwargs failed")

        # Positional and keyword arguments combined.
        _thread.start_new_thread(arg_tester, (testing_queue, True), {'arg2': True})
        result = testing_queue.get()
        self.assertTrue(result[0] and result[1],
                        "Argument passing for thread creation using both tuple"
                        " and kwargs failed")

    def test_multi_creation(self):
        # Make sure multiple threads can be created.
        def queue_mark(out_queue, delay):
            """Wait for ``delay`` seconds and then put something into ``out_queue``"""
            time.sleep(delay)
            out_queue.put(_thread.get_ident())

        thread_count = 5
        testing_queue = queue.Queue(thread_count)
        if support.verbose:
            print()
            print("*** Testing multiple thread creation "
                  "(will take approx. %s to %s sec.) ***" % (DELAY, thread_count))
        for _ in range(thread_count):
            # Only stagger the workers when a delay is configured; with
            # DELAY == 0 every worker puts its mark without sleeping.
            if DELAY:
                local_delay = round(random.random(), 1)
            else:
                local_delay = 0
            _thread.start_new_thread(queue_mark,
                                     (testing_queue, local_delay))
        # Give the workers DELAY seconds to finish before counting marks.
        time.sleep(DELAY)
        if support.verbose:
            print('done')
        self.assertEqual(testing_queue.qsize(), thread_count,
                         "Not all %s threads executed properly after %s sec." %
                         (thread_count, DELAY))
170 171

def test_main(imported_module=None):
    """Run LockTests, MiscTests and ThreadTests through support.run_unittest.

    When ``imported_module`` is truthy it replaces the module-level
    ``_thread`` implementation used by the tests and the module-level
    ``DELAY`` is set to 2; otherwise both globals are left untouched.
    """
    global _thread, DELAY
    if imported_module:
        _thread, DELAY = imported_module, 2
    if support.verbose:
        banner = "*** Using %s as _thread module ***" % _thread
        print()
        print(banner)
    support.run_unittest(LockTests, MiscTests, ThreadTests)
180 181 182

# When executed as a script, run the suite with no replacement module, i.e.
# against whatever the module-level ``_thread`` name is already bound to.
if __name__ == '__main__':
    test_main()