test_kqueue.py 8.81 KB
Newer Older
Christian Heimes's avatar
Christian Heimes committed
1 2 3 4
"""
Tests for kqueue wrapper.
"""
import errno
5
import os
Christian Heimes's avatar
Christian Heimes committed
6
import select
7 8
import socket
import time
Christian Heimes's avatar
Christian Heimes committed
9 10 11
import unittest

if not hasattr(select, "kqueue"):
Benjamin Peterson's avatar
Benjamin Peterson committed
12
    raise unittest.SkipTest("test works only on BSD")
Christian Heimes's avatar
Christian Heimes committed
13 14 15 16

class TestKQueue(unittest.TestCase):
    def test_create_queue(self):
        kq = select.kqueue()
17 18
        self.assertTrue(kq.fileno() > 0, kq.fileno())
        self.assertTrue(not kq.closed)
Christian Heimes's avatar
Christian Heimes committed
19
        kq.close()
20
        self.assertTrue(kq.closed)
Christian Heimes's avatar
Christian Heimes committed
21 22 23
        self.assertRaises(ValueError, kq.fileno)

    def test_create_event(self):
24
        from operator import lt, le, gt, ge
25 26 27 28

        fd = os.open(os.devnull, os.O_WRONLY)
        self.addCleanup(os.close, fd)

Christian Heimes's avatar
Christian Heimes committed
29 30 31 32 33 34 35 36 37 38
        ev = select.kevent(fd)
        other = select.kevent(1000)
        self.assertEqual(ev.ident, fd)
        self.assertEqual(ev.filter, select.KQ_FILTER_READ)
        self.assertEqual(ev.flags, select.KQ_EV_ADD)
        self.assertEqual(ev.fflags, 0)
        self.assertEqual(ev.data, 0)
        self.assertEqual(ev.udata, 0)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)
39 40
        self.assertTrue(ev < other)
        self.assertTrue(other >= ev)
41 42 43 44
        for op in lt, le, gt, ge:
            self.assertRaises(TypeError, op, ev, None)
            self.assertRaises(TypeError, op, ev, 1)
            self.assertRaises(TypeError, op, ev, "ev")
Christian Heimes's avatar
Christian Heimes committed
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75

        ev = select.kevent(fd, select.KQ_FILTER_WRITE)
        self.assertEqual(ev.ident, fd)
        self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
        self.assertEqual(ev.flags, select.KQ_EV_ADD)
        self.assertEqual(ev.fflags, 0)
        self.assertEqual(ev.data, 0)
        self.assertEqual(ev.udata, 0)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

        ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
        self.assertEqual(ev.ident, fd)
        self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
        self.assertEqual(ev.flags, select.KQ_EV_ONESHOT)
        self.assertEqual(ev.fflags, 0)
        self.assertEqual(ev.data, 0)
        self.assertEqual(ev.udata, 0)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

        ev = select.kevent(1, 2, 3, 4, 5, 6)
        self.assertEqual(ev.ident, 1)
        self.assertEqual(ev.filter, 2)
        self.assertEqual(ev.flags, 3)
        self.assertEqual(ev.fflags, 4)
        self.assertEqual(ev.data, 5)
        self.assertEqual(ev.udata, 6)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

76 77
        bignum = 0x7fff
        ev = select.kevent(bignum, 1, 2, 3, bignum - 1, bignum)
78 79 80 81
        self.assertEqual(ev.ident, bignum)
        self.assertEqual(ev.filter, 1)
        self.assertEqual(ev.flags, 2)
        self.assertEqual(ev.fflags, 3)
82
        self.assertEqual(ev.data, bignum - 1)
83 84 85 86
        self.assertEqual(ev.udata, bignum)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
        # Issue 11973
        bignum = 0xffff
        ev = select.kevent(0, 1, bignum)
        self.assertEqual(ev.ident, 0)
        self.assertEqual(ev.filter, 1)
        self.assertEqual(ev.flags, bignum)
        self.assertEqual(ev.fflags, 0)
        self.assertEqual(ev.data, 0)
        self.assertEqual(ev.udata, 0)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)

        # Issue 11973
        bignum = 0xffffffff
        ev = select.kevent(0, 1, 2, bignum)
        self.assertEqual(ev.ident, 0)
        self.assertEqual(ev.filter, 1)
        self.assertEqual(ev.flags, 2)
        self.assertEqual(ev.fflags, bignum)
        self.assertEqual(ev.data, 0)
        self.assertEqual(ev.udata, 0)
        self.assertEqual(ev, ev)
        self.assertNotEqual(ev, other)


Christian Heimes's avatar
Christian Heimes committed
112 113 114
    def test_queue_event(self):
        serverSocket = socket.socket()
        serverSocket.bind(('127.0.0.1', 0))
115
        serverSocket.listen()
Christian Heimes's avatar
Christian Heimes committed
116 117 118 119
        client = socket.socket()
        client.setblocking(False)
        try:
            client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
120
        except OSError as e:
121
            self.assertEqual(e.args[0], errno.EINPROGRESS)
Christian Heimes's avatar
Christian Heimes committed
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
        else:
            #raise AssertionError("Connect should have raised EINPROGRESS")
            pass # FreeBSD doesn't raise an exception here
        server, addr = serverSocket.accept()

        kq = select.kqueue()
        kq2 = select.kqueue.fromfd(kq.fileno())

        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq.control([ev], 0)
        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq2.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        kq2.control([ev], 0)

        events = kq.control(None, 4, 1)
148 149 150 151
        events = set((e.ident, e.filter) for e in events)
        self.assertEqual(events, set([
            (client.fileno(), select.KQ_FILTER_WRITE),
            (server.fileno(), select.KQ_FILTER_WRITE)]))
Christian Heimes's avatar
Christian Heimes committed
152 153 154 155 156

        client.send(b"Hello!")
        server.send(b"world!!!")

        # We may need to call it several times
157 158
        for i in range(10):
            events = kq.control(None, 4, 1)
Christian Heimes's avatar
Christian Heimes committed
159 160
            if len(events) == 4:
                break
161 162 163 164
            time.sleep(1.0)
        else:
            self.fail('timeout waiting for event notifications')

165 166 167 168 169 170
        events = set((e.ident, e.filter) for e in events)
        self.assertEqual(events, set([
            (client.fileno(), select.KQ_FILTER_WRITE),
            (client.fileno(), select.KQ_FILTER_READ),
            (server.fileno(), select.KQ_FILTER_WRITE),
            (server.fileno(), select.KQ_FILTER_READ)]))
Christian Heimes's avatar
Christian Heimes committed
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186

        # Remove completely client, and server read part
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_WRITE,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0)
        ev = select.kevent(client.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0)
        ev = select.kevent(server.fileno(),
                           select.KQ_FILTER_READ,
                           select.KQ_EV_DELETE)
        kq.control([ev], 0, 0)

        events = kq.control([], 4, 0.99)
187 188 189
        events = set((e.ident, e.filter) for e in events)
        self.assertEqual(events, set([
            (server.fileno(), select.KQ_FILTER_WRITE)]))
Christian Heimes's avatar
Christian Heimes committed
190 191 192 193 194

        client.close()
        server.close()
        serverSocket.close()

195 196 197 198 199 200 201 202 203 204
    def testPair(self):
        kq = select.kqueue()
        a, b = socket.socketpair()

        a.send(b'foo')
        event1 = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        event2 = select.kevent(b, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
        r = kq.control([event1, event2], 1, 1)
        self.assertTrue(r)
        self.assertFalse(r[0].flags & select.KQ_EV_ERROR)
205
        self.assertEqual(b.recv(r[0].data), b'foo')
206 207 208 209 210

        a.close()
        b.close()
        kq.close()

211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
    def test_issue30058(self):
        # changelist must be an iterable
        kq = select.kqueue()
        a, b = socket.socketpair()
        ev = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)

        kq.control([ev], 0)
        # not a list
        kq.control((ev,), 0)
        # __len__ is not consistent with __iter__
        class BadList:
            def __len__(self):
                return 0
            def __iter__(self):
                for i in range(100):
                    yield ev
        kq.control(BadList(), 0)
        # doesn't have __len__
        kq.control(iter([ev]), 0)

        a.close()
        b.close()
        kq.close()

235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
    def test_close(self):
        open_file = open(__file__, "rb")
        self.addCleanup(open_file.close)
        fd = open_file.fileno()
        kqueue = select.kqueue()

        # test fileno() method and closed attribute
        self.assertIsInstance(kqueue.fileno(), int)
        self.assertFalse(kqueue.closed)

        # test close()
        kqueue.close()
        self.assertTrue(kqueue.closed)
        self.assertRaises(ValueError, kqueue.fileno)

        # close() can be called more than once
        kqueue.close()

        # operations must fail with ValueError("I/O operation on closed ...")
        self.assertRaises(ValueError, kqueue.control, None, 4)

256 257 258 259 260
    def test_fd_non_inheritable(self):
        kqueue = select.kqueue()
        self.addCleanup(kqueue.close)
        self.assertEqual(os.get_inheritable(kqueue.fileno()), False)

261

Christian Heimes's avatar
Christian Heimes committed
262
if __name__ == "__main__":
263
    unittest.main()