# test_epoll.py
# Copyright (c) 2001-2006 Twisted Matrix Laboratories.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
Tests for epoll wrapper.
"""
import socket
import errno
import time
import select
import unittest

from test import support

# Skip the whole module when this build of the select module has no epoll.
if not hasattr(select, "epoll"):
    raise unittest.SkipTest("test works only on Linux 2.6")

# select.epoll can be present while the epoll() call itself still fails with
# ENOSYS; probe once at import time and skip the module in that case.
try:
    # Close the probe object immediately so it does not hold a descriptor.
    select.epoll().close()
except OSError as e:
    if e.errno == errno.ENOSYS:
        raise unittest.SkipTest("kernel doesn't support epoll()")
    # Any other error is unexpected: surface it instead of silently
    # continuing into tests that would fail for the same reason.
    raise


class TestEPoll(unittest.TestCase):
    """Tests for the select.epoll wrapper using loopback TCP sockets."""

    def setUp(self):
        """Open a listening socket on an ephemeral loopback port."""
        self.serverSocket = socket.socket()
        self.serverSocket.bind(('127.0.0.1', 0))
        self.serverSocket.listen(1)
        # Every socket created by a test is tracked here for tearDown.
        self.connections = [self.serverSocket]

    def tearDown(self):
        """Close every socket that was opened during the test."""
        for skt in self.connections:
            skt.close()

    def _connected_pair(self):
        """Return a connected ``(client, server)`` socket pair.

        The client is non-blocking, so its connect() is expected to fail
        with EINPROGRESS; the connection is then completed by accept().
        """
        client = socket.socket()
        # Track the client right away so tearDown closes it even when the
        # connect/accept steps below fail.
        self.connections.append(client)
        client.setblocking(False)
        try:
            client.connect(('127.0.0.1', self.serverSocket.getsockname()[1]))
        except OSError as e:
            self.assertEqual(e.args[0], errno.EINPROGRESS)
        else:
            raise AssertionError("Connect should have raised EINPROGRESS")
        server, _ = self.serverSocket.accept()
        self.connections.append(server)
        return client, server

    def _poll_quickly(self, ep, limit):
        """Poll *ep* (1 s timeout, at most 4 events) and return the events.

        Fails the test when the call takes longer than *limit* seconds.
        """
        start = time.monotonic()
        events = ep.poll(1, 4)
        elapsed = time.monotonic() - start
        self.assertFalse(elapsed > limit, elapsed)
        return events

    def test_create(self):
        """An epoll object exposes fileno()/closed and can be closed."""
        try:
            ep = select.epoll(16)
        except OSError as e:
            raise AssertionError(str(e))
        # close() may be called more than once, so this is safe even though
        # the test closes the object itself below.
        self.addCleanup(ep.close)
        self.assertTrue(ep.fileno() > 0, ep.fileno())
        self.assertTrue(not ep.closed)
        ep.close()
        self.assertTrue(ep.closed)
        self.assertRaises(ValueError, ep.fileno)

    def test_badcreate(self):
        """Invalid constructor arguments raise TypeError."""
        self.assertRaises(TypeError, select.epoll, 1, 2, 3)
        self.assertRaises(TypeError, select.epoll, 'foo')
        self.assertRaises(TypeError, select.epoll, None)
        self.assertRaises(TypeError, select.epoll, ())
        self.assertRaises(TypeError, select.epoll, ['foo'])
        self.assertRaises(TypeError, select.epoll, {})

    def test_add(self):
        """register() accepts fds and fileno() objects and rejects bad input."""
        client, server = self._connected_pair()

        ep = select.epoll(2)
        try:
            ep.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
            ep.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)
        finally:
            ep.close()

        # adding by object w/ fileno works, too.
        ep = select.epoll(2)
        try:
            ep.register(server, select.EPOLLIN | select.EPOLLOUT)
            ep.register(client, select.EPOLLIN | select.EPOLLOUT)
        finally:
            ep.close()

        ep = select.epoll(2)
        try:
            # TypeError: argument must be an int, or have a fileno() method.
            self.assertRaises(TypeError, ep.register, object(),
                              select.EPOLLIN | select.EPOLLOUT)
            self.assertRaises(TypeError, ep.register, None,
                              select.EPOLLIN | select.EPOLLOUT)
            # ValueError: file descriptor cannot be a negative integer (-1)
            self.assertRaises(ValueError, ep.register, -1,
                              select.EPOLLIN | select.EPOLLOUT)
            # OSError: [Errno 9] Bad file descriptor
            self.assertRaises(OSError, ep.register, 10000,
                              select.EPOLLIN | select.EPOLLOUT)
            # registering twice also raises an exception
            ep.register(server, select.EPOLLIN | select.EPOLLOUT)
            self.assertRaises(OSError, ep.register, server,
                              select.EPOLLIN | select.EPOLLOUT)
        finally:
            ep.close()

    def test_fromfd(self):
        """An object built with fromfd() shares the original's descriptor."""
        client, server = self._connected_pair()

        ep = select.epoll(2)
        try:
            ep2 = select.epoll.fromfd(ep.fileno())

            ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
            ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)

            events = ep.poll(1, 4)
            events2 = ep2.poll(0.9, 4)
            self.assertEqual(len(events), 2)
            self.assertEqual(len(events2), 2)
        finally:
            # Closing ep closes the descriptor that ep2 also wraps.
            ep.close()

        try:
            ep2.poll(1, 4)
        except OSError as e:
            self.assertEqual(e.args[0], errno.EBADF, e)
        else:
            self.fail("epoll on closed fd didn't raise EBADF")

    def test_control_and_wait(self):
        """register/modify/unregister change what poll() reports."""
        client, server = self._connected_pair()

        ep = select.epoll(16)
        self.addCleanup(ep.close)
        ep.register(server.fileno(),
                    select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)
        ep.register(client.fileno(),
                    select.EPOLLIN | select.EPOLLOUT | select.EPOLLET)

        # Both sockets are writable straight away.
        events = self._poll_quickly(ep, 0.1)
        expected = [(client.fileno(), select.EPOLLOUT),
                    (server.fileno(), select.EPOLLOUT)]
        self.assertEqual(sorted(events), sorted(expected))

        # Registered with EPOLLET and nothing has changed since the last
        # poll, so this call is expected to time out with no events.
        events = ep.poll(timeout=2.1, maxevents=4)
        self.assertFalse(events)

        client.send(b"Hello!")
        server.send(b"world!!!")

        # Each side now has data to read as well as room to write.
        events = self._poll_quickly(ep, 0.01)
        expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
                    (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
        self.assertEqual(sorted(events), sorted(expected))

        ep.unregister(client.fileno())
        ep.modify(server.fileno(), select.EPOLLOUT)
        events = self._poll_quickly(ep, 0.01)

        expected = [(server.fileno(), select.EPOLLOUT)]
        self.assertEqual(events, expected)

    def test_errors(self):
        """Negative size hints and negative descriptors raise ValueError."""
        self.assertRaises(ValueError, select.epoll, -2)
        ep = select.epoll()
        self.addCleanup(ep.close)
        self.assertRaises(ValueError, ep.register, -1, select.EPOLLIN)

    def test_unregister_closed(self):
        """Unregistering a descriptor whose socket was already closed."""
        client, server = self._connected_pair()
        fd = server.fileno()
        ep = select.epoll(16)
        self.addCleanup(ep.close)
        ep.register(server)

        self._poll_quickly(ep, 0.01)

        server.close()
        # Python 3.9+ reports EBADF here, while earlier versions ignored
        # the error; accept either behaviour.
        try:
            ep.unregister(fd)
        except OSError as e:
            self.assertEqual(e.errno, errno.EBADF)

def test_main():
    """Run the TestEPoll test case through test.support's runner."""
    support.run_unittest(TestEPoll)


if __name__ == "__main__":
    test_main()