test_queue.py 12.8 KB
Newer Older
# Some simple queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable.
import queue
4
import time
5
import unittest
6
from test import support
7
threading = support.import_module('threading')
8

9
# Capacity (maxsize) given to the bounded queues built by the tests below.
QUEUE_SIZE = 5
10

11 12 13
def qfull(q):
    """Return True when the bounded queue *q* holds at least maxsize items.

    A queue whose maxsize is zero or negative is unbounded and is never
    reported as full.  The comparison is ``>=`` rather than ``==`` so that a
    queue whose maxsize was lowered below its current size still counts as
    full, which matches what Queue.full() reports.
    """
    return 0 < q.maxsize <= q.qsize()

# A thread to run a function that unclogs a blocked Queue.
class _TriggerThread(threading.Thread):
    """Thread that waits briefly, flags that it started, then calls fn(*args).

    Attributes:
        fn: the callable run by the thread.
        args: positional arguments passed to ``fn``.
        startedEvent: threading.Event set just before ``fn`` is invoked.
    """

    def __init__(self, fn, args):
        super().__init__()
        self.startedEvent = threading.Event()
        self.fn, self.args = fn, args

    def run(self):
        # This pause is not strictly required.  It gives the blocking call in
        # the main thread a chance to really block before we release it.  If
        # the pause were longer than the timeouts used by the timeout-based
        # tests, those tests would fail, so they use far larger timeouts
        # (around 10 seconds) than this delay.  They should never wait that
        # long: they make progress as soon as self.fn() is called.
        time.sleep(0.1)
        self.startedEvent.set()
        self.fn(*self.args)

35

36
# Execute a function that blocks, and in a separate thread, a function that
# triggers the release.  Returns the result of the blocking function.  Caution:
# block_func must guarantee to block until trigger_func is called, and
# trigger_func must guarantee to change queue state so that block_func can make
# enough progress to return.  In particular, a block_func that just raises an
# exception regardless of whether trigger_func is called will lead to
# timing-dependent sporadic failures; one such failure was rarely seen and
# went undiagnosed for years.  Now block_func must be unexceptional.  If
# block_func is supposed to raise an exception, call
# do_exceptional_blocking_test() instead.

class BlockingTestMixin:
    """Helpers that run a blocking call while a second thread releases it.

    Each helper starts a _TriggerThread that runs ``trigger_func`` and then
    calls ``block_func`` in the current thread.  The trigger thread is always
    joined before the helper returns or raises, so no thread is left behind
    when the blocking call fails.
    """

    def tearDown(self):
        # Drop the reference to the last trigger thread.
        self.t = None

    def _join_trigger_thread(self, trigger_func):
        """Wait up to 10 seconds for self.t; fail the test if it is still alive."""
        self.t.join(10) # make sure the thread terminates
        if self.t.is_alive():
            self.fail("trigger function '%r' appeared to not return" %
                      trigger_func)

    def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
        """Call block_func(*block_args) while a thread runs trigger_func.

        Returns the value returned by ``block_func`` (also stored in
        ``self.result``).  Fails the test if ``block_func`` returned before
        the trigger thread set its started event, or if the trigger thread is
        still alive 10 seconds after ``block_func`` finished.
        """
        self.t = _TriggerThread(trigger_func, trigger_args)
        self.t.start()
        try:
            self.result = block_func(*block_args)
            # If block_func returned before our thread made the call, we failed!
            if not self.t.startedEvent.is_set():
                self.fail("blocking function '%r' appeared not to block" %
                          block_func)
        finally:
            # Join even when block_func raised or the check above failed, so
            # the trigger thread never outlives this call.
            self._join_trigger_thread(trigger_func)
        return self.result

    # Call this instead if block_func is supposed to raise an exception.
    def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
                                   trigger_args, expected_exception_class):
        """Call block_func(*block_args), which is expected to raise.

        An exception raised by ``block_func`` propagates to the caller
        unchanged.  If ``block_func`` returns normally the test fails.  The
        trigger thread is joined in every case; the test also fails if that
        thread is still alive after 10 seconds or never set its started event.
        """
        self.t = _TriggerThread(trigger_func, trigger_args)
        self.t.start()
        try:
            block_func(*block_args)
            # Only reached when block_func did not raise at all.
            self.fail("expected exception of kind %r" %
                             expected_exception_class)
        finally:
            self._join_trigger_thread(trigger_func)
            if not self.t.startedEvent.is_set():
                self.fail("trigger thread ended but event never set")


88
class BaseQueueTestMixin(BlockingTestMixin):
    """Queue tests shared by every queue class under test.

    Concrete test cases provide the queue class through the ``type2test``
    class attribute.
    """

    def setUp(self):
        # Running total built by worker(), guarded by cumlock.
        self.cum = 0
        self.cumlock = threading.Lock()

    def simple_queue_test(self, q):
        """Exercise put/get on the empty, bounded queue *q*.

        *q* must be empty and have maxsize QUEUE_SIZE.  It is left empty on
        return, so the same instance can be passed in again.

        Raises:
            RuntimeError: if *q* is not empty on entry.
        """
        if q.qsize():
            raise RuntimeError("Call this function with an empty queue")
        self.assertTrue(q.empty())
        self.assertFalse(q.full())
        # I guess we better check things actually queue correctly a little :)
        q.put(111)
        q.put(333)
        q.put(222)
        target_order = dict(Queue = [111, 333, 222],
                            LifoQueue = [222, 333, 111],
                            PriorityQueue = [111, 222, 333])
        actual_order = [q.get(), q.get(), q.get()]
        self.assertEqual(actual_order, target_order[q.__class__.__name__],
                         "Didn't seem to queue the correct data!")
        for i in range(QUEUE_SIZE-1):
            q.put(i)
            self.assertTrue(q.qsize(), "Queue should not be empty")
        self.assertFalse(qfull(q), "Queue should not be full")
        last = 2 * QUEUE_SIZE
        full = 3 * 2 * QUEUE_SIZE
        q.put(last)
        self.assertTrue(qfull(q), "Queue should be full")
        self.assertFalse(q.empty())
        self.assertTrue(q.full())
        with self.assertRaises(queue.Full,
                               msg="Didn't appear to block with a full queue"):
            q.put(full, block=0)
        with self.assertRaises(queue.Full,
                               msg="Didn't appear to time-out with a full queue"):
            q.put(full, timeout=0.01)
        # Test a blocking put
        self.do_blocking_test(q.put, (full,), q.get, ())
        self.do_blocking_test(q.put, (full, True, 10), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        with self.assertRaises(queue.Empty,
                               msg="Didn't appear to block with an empty queue"):
            q.get(block=0)
        with self.assertRaises(queue.Empty,
                               msg="Didn't appear to time-out with an empty queue"):
            q.get(timeout=0.01)
        # Test a blocking get
        self.do_blocking_test(q.get, (), q.put, ('empty',))
        self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))

    def worker(self, q):
        """Add every item taken from *q* to self.cum until a negative item arrives.

        task_done() is called once per item, including the negative sentinel.
        """
        while True:
            x = q.get()
            if x < 0:
                q.task_done()
                return
            with self.cumlock:
                self.cum += x
            q.task_done()

    def queue_join_test(self, q):
        """Check that q.join() waits until two workers have processed 100 items."""
        self.cum = 0
        workers = []
        for i in range(2):
            thread = threading.Thread(target=self.worker, args=(q,))
            thread.start()
            workers.append(thread)
        for i in range(100):
            q.put(i)
        q.join()
        self.assertEqual(self.cum, sum(range(100)),
                         "q.join() did not block until all tasks were done")
        for i in range(len(workers)):
            q.put(-1)         # instruct the threads to close
        q.join()                # verify that you can join twice
        # Each worker exits after one sentinel; wait for them so that no
        # thread outlives the test.
        for thread in workers:
            thread.join()

    def test_queue_task_done(self):
        # Test to make sure a queue task completed successfully.
        q = self.type2test()
        with self.assertRaises(ValueError,
                               msg="Did not detect task count going negative"):
            q.task_done()

    def test_queue_join(self):
        # Test that a queue join()s successfully, and before anything else
        # (done twice for insurance).
        q = self.type2test()
        self.queue_join_test(q)
        self.queue_join_test(q)
        with self.assertRaises(ValueError,
                               msg="Did not detect task count going negative"):
            q.task_done()

    def test_simple_queue(self):
        # Do it a couple of times on the same queue.
        # Done twice to make sure works with same instance reused.
        q = self.type2test(QUEUE_SIZE)
        self.simple_queue_test(q)
        self.simple_queue_test(q)

    def test_negative_timeout_raises_exception(self):
        q = self.type2test(QUEUE_SIZE)
        with self.assertRaises(ValueError):
            q.put(1, timeout=-1)
        with self.assertRaises(ValueError):
            q.get(1, timeout=-1)

    def test_nowait(self):
        q = self.type2test(QUEUE_SIZE)
        for i in range(QUEUE_SIZE):
            q.put_nowait(1)
        with self.assertRaises(queue.Full):
            q.put_nowait(1)

        for i in range(QUEUE_SIZE):
            q.get_nowait()
        with self.assertRaises(queue.Empty):
            q.get_nowait()

    def test_shrinking_queue(self):
        # issue 10110
        q = self.type2test(3)
        q.put(1)
        q.put(2)
        q.put(3)
        with self.assertRaises(queue.Full):
            q.put_nowait(4)
        self.assertEqual(q.qsize(), 3)
        q.maxsize = 2                       # shrink the queue
        with self.assertRaises(queue.Full):
            q.put_nowait(4)
234

235
class QueueTest(BaseQueueTestMixin, unittest.TestCase):
    """Run the shared queue tests against queue.Queue."""

    type2test = queue.Queue
237

238
class LifoQueueTest(BaseQueueTestMixin, unittest.TestCase):
    """Run the shared queue tests against queue.LifoQueue."""

    type2test = queue.LifoQueue
240

241
class PriorityQueueTest(BaseQueueTestMixin, unittest.TestCase):
    """Run the shared queue tests against queue.PriorityQueue."""

    type2test = queue.PriorityQueue
243 244


245 246 247 248 249

# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
    """Raised by FailingQueue when it has been told to fail the next put or get."""

250
class FailingQueue(queue.Queue):
    """A Queue whose next put or get can be made to raise on demand.

    Setting ``fail_next_put`` (or ``fail_next_get``) to True makes the next
    _put() (or _get()) raise FailingQueueException.  The flag is cleared
    before raising, so only that one operation fails.
    """

    def __init__(self, *args, **kwargs):
        # Accept keyword arguments too, so FailingQueue(maxsize=n) works the
        # same way queue.Queue(maxsize=n) does.
        self.fail_next_put = False
        self.fail_next_get = False
        super().__init__(*args, **kwargs)

    def _put(self, item):
        if self.fail_next_put:
            self.fail_next_put = False
            raise FailingQueueException("You Lose")
        return super()._put(item)

    def _get(self):
        if self.fail_next_get:
            self.fail_next_get = False
            raise FailingQueueException("You Lose")
        return super()._get()
265

266
class FailingQueueTest(BlockingTestMixin, unittest.TestCase):
    """Check that a FailingQueue stays usable after a put or get raises."""

    def _expect_failure(self):
        """Return a context manager that requires FailingQueueException."""
        return self.assertRaises(
            FailingQueueException,
            msg="The queue didn't fail when it should have")

    def failing_queue_test(self, q):
        """Provoke put and get failures on the empty FailingQueue *q*.

        *q* must be empty and have maxsize QUEUE_SIZE.  It is left empty on
        return, so the same instance can be passed in again.

        Raises:
            RuntimeError: if *q* is not empty on entry.
        """
        if q.qsize():
            raise RuntimeError("Call this function with an empty queue")
        for i in range(QUEUE_SIZE-1):
            q.put(i)
        # Test a failing non-blocking put.
        q.fail_next_put = True
        with self._expect_failure():
            q.put("oops", block=0)
        q.fail_next_put = True
        with self._expect_failure():
            q.put("oops", timeout=0.1)
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        # Test a failing blocking put.  The put is expected to raise, so it
        # goes through do_exceptional_blocking_test rather than
        # do_blocking_test.
        q.fail_next_put = True
        with self._expect_failure():
            self.do_exceptional_blocking_test(q.put, ("full",), q.get, (),
                                              FailingQueueException)
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        # Test a failing timeout put
        q.fail_next_put = True
        with self._expect_failure():
            self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
                                              FailingQueueException)
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        q.get()
        self.assertFalse(qfull(q), "Queue should not be full")
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        # Test a blocking put
        self.do_blocking_test(q.put, ("full",), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        q.put("first")
        q.fail_next_get = True
        with self._expect_failure():
            q.get()
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.fail_next_get = True
        with self._expect_failure():
            q.get(timeout=0.1)
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        q.fail_next_get = True
        with self._expect_failure():
            self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
                                              FailingQueueException)
        # put succeeded, but get failed.
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")

    def test_failing_queue(self):
        # Test to make sure a queue is functioning correctly.
        # Done twice to the same instance.
        q = FailingQueue(QUEUE_SIZE)
        self.failing_queue_test(q)
        self.failing_queue_test(q)


def test_main():
    """Run all four queue test case classes through support.run_unittest."""
    test_classes = (
        QueueTest,
        LifoQueueTest,
        PriorityQueueTest,
        FailingQueueTest,
    )
    support.run_unittest(*test_classes)


# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()