# test asynchat -- requires threading

import thread # If this fails, we can't test this module
import asyncore, asynchat, socket, threading, time
import unittest
import sys
from test import test_support

# Host name the echo client connects to (taken from test_support).
HOST = test_support.HOST
# Message that tells the echo server to stop collecting data and start
# echoing it back; it is stripped from the data before the echo.
SERVER_QUIT = b'QUIT\n'

class echo_server(threading.Thread):
    """One-shot echo server running in its own thread.

    It accepts a single connection, collects everything the client sends
    until SERVER_QUIT shows up (or the client closes the connection), and
    then sends the collected data, minus the quit message, back to the
    client.  Data that could not be sent back is left in ``self.buffer``
    when the thread finishes.
    """

    # parameter to determine the number of bytes passed back to the
    # client each send
    chunk_size = 1

    def __init__(self, event):
        """Create the listening socket and bind it to a free port.

        event is a threading.Event that run() sets once the socket is
        listening.  The bound port is published as ``self.port``.
        """
        threading.Thread.__init__(self)
        self.event = event
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = test_support.bind_port(self.sock)

    def run(self):
        """Serve exactly one client, then close both sockets."""
        try:
            self.sock.listen(1)
            # tell the starting thread that the socket is listening
            self.event.set()
            conn, _ = self.sock.accept()
            try:
                self._collect_until_quit(conn)
                self._echo_back(conn)
            finally:
                conn.close()
        finally:
            # always release the listening socket, even if the exchange
            # above raised
            self.sock.close()

    def _collect_until_quit(self, conn):
        """Read from conn into self.buffer until SERVER_QUIT or EOF."""
        self.buffer = b""
        # collect data until quit message is seen
        while SERVER_QUIT not in self.buffer:
            data = conn.recv(1)
            if not data:
                break
            self.buffer = self.buffer + data

        # remove the SERVER_QUIT message
        self.buffer = self.buffer.replace(SERVER_QUIT, b'')

    def _echo_back(self, conn):
        """Re-send self.buffer to conn, chunk_size bytes per send call."""
        try:
            while self.buffer:
                n = conn.send(self.buffer[:self.chunk_size])
                time.sleep(0.001)
                self.buffer = self.buffer[n:]
        except socket.error:
            # this may fail on some tests, such as test_close_when_done,
            # since the client closes the channel when it's done sending
            pass

class echo_client(asynchat.async_chat):
    """async_chat client that records every terminated message it receives.

    Incoming data accumulates in ``self.buffer``; each time the terminator
    is found the buffer is appended to ``self.contents`` and reset.
    """

    def __init__(self, terminator, server_port):
        """Connect to (HOST, server_port) and install the given terminator."""
        asynchat.async_chat.__init__(self)
        self.contents = []
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((HOST, server_port))
        self.set_terminator(terminator)
        self.buffer = b""

    def handle_connect(self):
        # nothing to do once the connection is established
        pass

    if sys.platform == 'darwin':
        # select.poll returns a select.POLLHUP at the end of the tests
        # on darwin, so just ignore it
        def handle_expt(self):
            pass

    def collect_incoming_data(self, data):
        # accumulate everything received since the last terminator
        self.buffer += data

    def found_terminator(self):
        # one complete message: store it and start over with an empty buffer
        self.contents.append(self.buffer)
        self.buffer = b""


# Helper used by the tests to launch the echo server thread.
def start_echo_server():
    """Start an echo_server thread and wait until it signals its event.

    Returns a ``(server, event)`` pair.  The event has already been
    cleared again by the time it is returned.
    """
    ready = threading.Event()
    server = echo_server(ready)
    server.start()
    # block until the server thread has set the event, then reset it
    ready.wait()
    ready.clear()
    # Give server time to start accepting.
    time.sleep(0.01)
    return server, ready


# Tests that run an echo_client against the threaded echo_server.
class TestAsynchat(unittest.TestCase):
    # Each test starts an echo_server thread, pushes data through an
    # echo_client, runs the asyncore loop and then checks what the client
    # collected.  usepoll is passed to asyncore.loop() as use_poll, so a
    # subclass can re-run the same tests with a different setting.
    usepoll = False

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def line_terminator_check(self, term, server_chunk):
        # Send two messages ending in term and check both are received.
        #   term         -- terminator as bytes (an ASCII str is still
        #                   accepted and encoded, for older callers)
        #   server_chunk -- number of bytes the server sends back per call
        if isinstance(term, str):
            term = bytes(term, "ascii")
        # The server is set up by hand here, rather than through
        # start_echo_server(), so chunk_size is set before the thread starts.
        event = threading.Event()
        s = echo_server(event)
        s.chunk_size = server_chunk
        s.start()
        event.wait()
        event.clear()
        time.sleep(0.01) # Give server time to start accepting.
        c = echo_client(term, s.port)
        c.push(b"hello ")
        c.push(b"world" + term)
        c.push(b"I'm not dead yet!" + term)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    # the line terminator tests below check receiving variously-sized
    # chunks back from the server in order to exercise all branches of
    # async_chat.handle_read

    def test_line_terminator1(self):
        # test one-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'\n', chunk)

    def test_line_terminator2(self):
        # test two-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'\r\n', chunk)

    def test_line_terminator3(self):
        # test three-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'qqq', chunk)

    def numeric_terminator_check(self, termlen):
        # Try reading a fixed number of bytes
        s, event = start_echo_server()
        c = echo_client(termlen, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        # only the first termlen bytes form a complete message
        self.assertEqual(c.contents, [data[:termlen]])

    def test_numeric_terminator1(self):
        # check that integer terminators work (the terminator's type is
        # explicitly checked in async_chat.handle_read)
        self.numeric_terminator_check(1)

    def test_numeric_terminator2(self):
        self.numeric_terminator_check(6)

    def test_none_terminator(self):
        # With a terminator of None no message is ever completed: all
        # received data stays in the client's buffer.
        s, event = start_echo_server()
        c = echo_client(None, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [])
        self.assertEqual(c.buffer, data)

    def test_simple_producer(self):
        # feed the data through an explicit simple_producer
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
        c.push_with_producer(p)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_string_producer(self):
        # hand the raw data straight to push_with_producer
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        c.push_with_producer(data+SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_empty_line(self):
        # checks that empty lines are handled correctly
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        # pushed as bytes, like every other test in this class
        c.push(b"hello world\n\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents,
                         [b"hello world", b"", b"I'm not dead yet!"])

    def test_close_when_done(self):
        # the client closes its channel as soon as its output is sent
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        # pushed as bytes, like every other test in this class
        c.push(b"hello world\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        c.close_when_done()
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join()

        self.assertEqual(c.contents, [])
        # the server might have been able to send a byte or two back, but this
        # at least checks that it received something and didn't just fail
        # (which could still result in the client not having received anything)
        self.assertTrue(len(s.buffer) > 0)


class TestAsynchat_WithPoll(TestAsynchat):
    # Runs every inherited TestAsynchat test again, this time passing
    # use_poll=True to asyncore.loop().
    usepoll = True

class TestHelperFunctions(unittest.TestCase):
    # Checks for the module-level helper functions of asynchat.

    def test_find_prefix_at_end(self):
        # (haystack, needle, expected result), checked in this order
        cases = (
            ("qwerty\r", "\r\n", 1),
            ("qwertydkjf", "\r\n", 0),
        )
        for haystack, needle, expected in cases:
            found = asynchat.find_prefix_at_end(haystack, needle)
            self.assertEqual(found, expected)

class TestFifo(unittest.TestCase):
    # Exercises asynchat.fifo: push, first, pop, len() and is_empty.

    def test_basic(self):
        # start empty, add two items, then drain the queue step by step
        queue = asynchat.fifo()
        for item in (7, b'a'):
            queue.push(item)
        self.assertEqual(len(queue), 2)

        # the first item comes out first
        self.assertEqual(queue.first(), 7)
        self.assertEqual(queue.pop(), (1, 7))
        self.assertEqual(len(queue), 1)

        # then the second one
        self.assertEqual(queue.first(), b'a')
        self.assertEqual(queue.is_empty(), False)
        self.assertEqual(queue.pop(), (1, b'a'))

        # popping from the now empty queue reports (0, None)
        self.assertEqual(len(queue), 0)
        self.assertEqual(queue.is_empty(), True)
        self.assertEqual(queue.pop(), (0, None))

    def test_given_list(self):
        # a fifo built from a list hands the items back in list order
        initial = [b'x', 17, 3]
        queue = asynchat.fifo(initial)
        self.assertEqual(len(queue), 3)
        for expected in (b'x', 17, 3):
            self.assertEqual(queue.pop(), (1, expected))
        self.assertEqual(queue.pop(), (0, None))
# Entry points for running this test module.


def test_main(verbose=None):
    """Run all test case classes of this module via test_support.

    verbose is accepted but not used by this function.
    """
    test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll,
                              TestHelperFunctions, TestFifo)

# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main(verbose=True)