test_sched.py 6.43 KB
Newer Older
1
import queue
2 3 4 5
import sched
import time
import unittest
from test import support
6 7 8 9
try:
    import threading
except ImportError:
    threading = None
10

11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
TIMEOUT = 10


class Timer:
    def __init__(self):
        self._cond = threading.Condition()
        self._time = 0
        self._stop = 0

    def time(self):
        with self._cond:
            return self._time

    # increase the time but not beyond the established limit
    def sleep(self, t):
        assert t >= 0
        with self._cond:
            t += self._time
            while self._stop < t:
                self._time = self._stop
                self._cond.wait()
            self._time = t

    # advance time limit for user code
    def advance(self, t):
        assert t >= 0
        with self._cond:
            self._stop += t
            self._cond.notify_all()


42 43 44 45 46 47
class TestCase(unittest.TestCase):

    def test_enter(self):
        l = []
        fun = lambda x: l.append(x)
        scheduler = sched.scheduler(time.time, time.sleep)
48
        for x in [0.5, 0.4, 0.3, 0.2, 0.1]:
49 50
            z = scheduler.enter(x, 1, fun, (x,))
        scheduler.run()
51
        self.assertEqual(l, [0.1, 0.2, 0.3, 0.4, 0.5])
52 53 54 55 56 57 58 59 60 61

    def test_enterabs(self):
        l = []
        fun = lambda x: l.append(x)
        scheduler = sched.scheduler(time.time, time.sleep)
        for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
            z = scheduler.enterabs(x, 1, fun, (x,))
        scheduler.run()
        self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])

62 63
    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_enter_concurrent(self):
64 65 66 67 68 69
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        scheduler.enter(1, 1, fun, (1,))
        scheduler.enter(3, 1, fun, (3,))
70 71
        t = threading.Thread(target=scheduler.run)
        t.start()
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 1)
        self.assertTrue(q.empty())
        for x in [4, 5, 2]:
            z = scheduler.enter(x - 1, 1, fun, (x,))
        timer.advance(2)
        self.assertEqual(q.get(timeout=TIMEOUT), 2)
        self.assertEqual(q.get(timeout=TIMEOUT), 3)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 4)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 5)
        self.assertTrue(q.empty())
        timer.advance(1000)
        t.join(timeout=TIMEOUT)
        self.assertFalse(t.is_alive())
        self.assertTrue(q.empty())
        self.assertEqual(timer.time(), 5)
92

93 94 95 96 97
    def test_priority(self):
        l = []
        fun = lambda x: l.append(x)
        scheduler = sched.scheduler(time.time, time.sleep)
        for priority in [1, 2, 3, 4, 5]:
98
            z = scheduler.enterabs(0.01, priority, fun, (priority,))
99 100 101 102 103 104 105
        scheduler.run()
        self.assertEqual(l, [1, 2, 3, 4, 5])

    def test_cancel(self):
        l = []
        fun = lambda x: l.append(x)
        scheduler = sched.scheduler(time.time, time.sleep)
106 107 108 109 110 111
        now = time.time()
        event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
        event2 = scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
        event3 = scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
        event4 = scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
        event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
112 113 114 115 116
        scheduler.cancel(event1)
        scheduler.cancel(event5)
        scheduler.run()
        self.assertEqual(l, [0.02, 0.03, 0.04])

117 118
    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_cancel_concurrent(self):
119 120 121 122 123 124 125 126 127 128
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        now = timer.time()
        event1 = scheduler.enterabs(now + 1, 1, fun, (1,))
        event2 = scheduler.enterabs(now + 2, 1, fun, (2,))
        event4 = scheduler.enterabs(now + 4, 1, fun, (4,))
        event5 = scheduler.enterabs(now + 5, 1, fun, (5,))
        event3 = scheduler.enterabs(now + 3, 1, fun, (3,))
129 130
        t = threading.Thread(target=scheduler.run)
        t.start()
131 132 133 134
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 1)
        self.assertTrue(q.empty())
        scheduler.cancel(event2)
135
        scheduler.cancel(event5)
136 137 138 139 140 141 142 143 144 145 146 147 148
        timer.advance(1)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 3)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 4)
        self.assertTrue(q.empty())
        timer.advance(1000)
        t.join(timeout=TIMEOUT)
        self.assertFalse(t.is_alive())
        self.assertTrue(q.empty())
        self.assertEqual(timer.time(), 4)
149

150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
    def test_empty(self):
        l = []
        fun = lambda x: l.append(x)
        scheduler = sched.scheduler(time.time, time.sleep)
        self.assertTrue(scheduler.empty())
        for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
            z = scheduler.enterabs(x, 1, fun, (x,))
        self.assertFalse(scheduler.empty())
        scheduler.run()
        self.assertTrue(scheduler.empty())

    def test_queue(self):
        l = []
        fun = lambda x: l.append(x)
        scheduler = sched.scheduler(time.time, time.sleep)
165 166 167 168 169 170
        now = time.time()
        e5 = scheduler.enterabs(now + 0.05, 1, fun)
        e1 = scheduler.enterabs(now + 0.01, 1, fun)
        e2 = scheduler.enterabs(now + 0.02, 1, fun)
        e4 = scheduler.enterabs(now + 0.04, 1, fun)
        e3 = scheduler.enterabs(now + 0.03, 1, fun)
171 172
        # queue property is supposed to return an order list of
        # upcoming events
173
        self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5])
174

Giampaolo Rodola''s avatar
Giampaolo Rodola' committed
175 176 177 178 179 180 181 182 183 184 185 186
    def test_args_kwargs(self):
        flag = []

        def fun(*a, **b):
            flag.append(None)
            self.assertEqual(a, (1,2,3))
            self.assertEqual(b, {"foo":1})

        scheduler = sched.scheduler(time.time, time.sleep)
        z = scheduler.enterabs(0.01, 1, fun, argument=(1,2,3), kwargs={"foo":1})
        scheduler.run()
        self.assertEqual(flag, [None])
187

188 189 190 191 192 193 194 195 196 197
    def test_run_non_blocking(self):
        l = []
        fun = lambda x: l.append(x)
        scheduler = sched.scheduler(time.time, time.sleep)
        for x in [10, 9, 8, 7, 6]:
            scheduler.enter(x, 1, fun, (x,))
        scheduler.run(blocking=False)
        self.assertEqual(l, [])


198
if __name__ == "__main__":
199
    unittest.main()