test_asynchat.py 9.31 KB
Newer Older
1 2 3 4
# test asynchat

from test import support

5 6
import asynchat
import asyncore
7
import errno
8
import socket
9
import sys
10 11
import _thread as thread
import threading
12 13
import time
import unittest
14
import unittest.mock
15

16
# Address the echo client connects to.
HOST = support.HOST
# Sentinel the client sends to tell the echo server to stop collecting data.
SERVER_QUIT = b'QUIT\n'
# Timeout, in seconds, used when joining the echo server thread.
TIMEOUT = 3.0
19

20

21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
class echo_server(threading.Thread):
    """Single-connection echo server running in its own thread.

    The server accepts one client, collects everything it sends until
    SERVER_QUIT shows up (or the client disconnects), strips the quit
    marker and then sends the collected bytes back ``chunk_size`` bytes
    at a time.  Whatever has not been echoed yet stays in ``self.buffer``.
    """
    # parameter to determine the number of bytes passed back to the
    # client each send
    chunk_size = 1

    def __init__(self, event):
        """Create and bind the listening socket.

        *event* is set by run() once the socket is listening.
        """
        threading.Thread.__init__(self)
        self.event = event
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.port = support.bind_port(self.sock)
        # This will be set if the client wants us to wait before echoing
        # data back.
        self.start_resend_event = None

    def run(self):
        """Serve exactly one client, then close both sockets."""
        # The context managers close the connection first and the
        # listening socket second, even if accept() or recv() raises.
        with self.sock:
            self.sock.listen()
            self.event.set()
            conn, _ = self.sock.accept()
            with conn:
                self.buffer = b""
                # collect data until quit message is seen
                while SERVER_QUIT not in self.buffer:
                    data = conn.recv(1)
                    if not data:
                        break
                    self.buffer = self.buffer + data

                # remove the SERVER_QUIT message
                self.buffer = self.buffer.replace(SERVER_QUIT, b'')

                if self.start_resend_event:
                    self.start_resend_event.wait()

                # re-send entire set of collected data
                try:
                    while self.buffer:
                        n = conn.send(self.buffer[:self.chunk_size])
                        time.sleep(0.001)
                        self.buffer = self.buffer[n:]
                except OSError:
                    # Deliberately best-effort: this may fail on some
                    # tests, such as test_close_when_done, since the
                    # client closes the channel when it's done sending.
                    pass

class echo_client(asynchat.async_chat):
    """async_chat client that records every terminator-delimited message.

    Incoming data is accumulated in ``self.buffer``; each time the
    terminator is found the accumulated bytes are appended to
    ``self.contents`` and the buffer is reset.
    """

    def __init__(self, terminator, server_port):
        """Connect to (HOST, server_port) and install *terminator*."""
        asynchat.async_chat.__init__(self)
        self.contents = []
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((HOST, server_port))
        self.set_terminator(terminator)
        self.buffer = b""

    # These handlers must live at class level: defined inside __init__
    # they would only be unused local functions and the base-class
    # handlers would stay in effect.
    def handle_connect(self):
        pass

    if sys.platform == 'darwin':
        # select.poll returns a select.POLLHUP at the end of the tests
        # on darwin, so just ignore it
        def handle_expt(self):
            pass

    def collect_incoming_data(self, data):
        """Accumulate a chunk of received bytes."""
        self.buffer += data

    def found_terminator(self):
        """Store the completed message and start a fresh buffer."""
        self.contents.append(self.buffer)
        self.buffer = b""
92

93 94 95 96 97 98 99 100
def start_echo_server():
    """Start an echo_server thread and wait until it is listening.

    Returns a ``(server, event)`` tuple; the event is handed back cleared.
    """
    ready = threading.Event()
    server = echo_server(ready)
    server.start()
    # The server sets the event once its socket is listening.
    ready.wait()
    ready.clear()
    time.sleep(0.01)   # Give server time to start accepting.
    return server, ready
101 102


103
class TestAsynchat(unittest.TestCase):
    """Round-trip tests for async_chat against a threaded echo server.

    ``usepoll`` is forwarded to asyncore.loop() as ``use_poll``; subclasses
    may override it to run the same tests with the other loop flavour.
    """
    usepoll = False

    def setUp(self):
        self._threads = support.threading_setup()

    def tearDown(self):
        support.threading_cleanup(*self._threads)

    def _run_loop(self):
        # Drive the asyncore loop with the settings shared by every test.
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)

    def line_terminator_check(self, term, server_chunk):
        """Send two *term*-terminated lines; expect them echoed back intact.

        *server_chunk* is the number of bytes the server echoes per send.
        """
        s, _ = start_echo_server()
        # Setting this after the thread has started is fine: the server
        # only reads chunk_size once it starts echoing, which cannot
        # happen before the client below has sent SERVER_QUIT.
        s.chunk_size = server_chunk
        c = echo_client(term, s.port)
        c.push(b"hello ")
        c.push(b"world" + term)
        c.push(b"I'm not dead yet!" + term)
        c.push(SERVER_QUIT)
        self._run_loop()
        support.join_thread(s, timeout=TIMEOUT)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    # the line terminator tests below check receiving variously-sized
    # chunks back from the server in order to exercise all branches of
    # async_chat.handle_read

    def test_line_terminator1(self):
        # test one-character terminator
        for chunk in (1, 2, 3):
            with self.subTest(server_chunk=chunk):
                self.line_terminator_check(b'\n', chunk)

    def test_line_terminator2(self):
        # test two-character terminator
        for chunk in (1, 2, 3):
            with self.subTest(server_chunk=chunk):
                self.line_terminator_check(b'\r\n', chunk)

    def test_line_terminator3(self):
        # test three-character terminator
        for chunk in (1, 2, 3):
            with self.subTest(server_chunk=chunk):
                self.line_terminator_check(b'qqq', chunk)

    def numeric_terminator_check(self, termlen):
        # Try reading a fixed number of bytes
        s, _ = start_echo_server()
        c = echo_client(termlen, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        self._run_loop()
        support.join_thread(s, timeout=TIMEOUT)

        self.assertEqual(c.contents, [data[:termlen]])

    def test_numeric_terminator1(self):
        # check that ints & longs both work (since type is
        # explicitly checked in async_chat.handle_read)
        self.numeric_terminator_check(1)

    def test_numeric_terminator2(self):
        self.numeric_terminator_check(6)

    def test_none_terminator(self):
        # With a None terminator no message boundary is ever reported:
        # everything received is collected and found_terminator() is
        # never called.
        s, _ = start_echo_server()
        c = echo_client(None, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        self._run_loop()
        support.join_thread(s, timeout=TIMEOUT)

        self.assertEqual(c.contents, [])
        self.assertEqual(c.buffer, data)

    def test_simple_producer(self):
        s, _ = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
        c.push_with_producer(p)
        self._run_loop()
        support.join_thread(s, timeout=TIMEOUT)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_string_producer(self):
        s, _ = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        c.push_with_producer(data+SERVER_QUIT)
        self._run_loop()
        support.join_thread(s, timeout=TIMEOUT)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_empty_line(self):
        # checks that empty lines are handled correctly
        s, _ = start_echo_server()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\n\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        self._run_loop()
        support.join_thread(s, timeout=TIMEOUT)

        self.assertEqual(c.contents,
                         [b"hello world", b"", b"I'm not dead yet!"])

    def test_close_when_done(self):
        s, _ = start_echo_server()
        s.start_resend_event = threading.Event()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        c.close_when_done()
        self._run_loop()

        # Only allow the server to start echoing data back to the client after
        # the client has closed its connection.  This prevents a race condition
        # where the server echoes all of its data before we can check that it
        # got any down below.
        s.start_resend_event.set()
        support.join_thread(s, timeout=TIMEOUT)

        self.assertEqual(c.contents, [])
        # the server might have been able to send a byte or two back, but this
        # at least checks that it received something and didn't just fail
        # (which could still result in the client not having received anything)
        self.assertGreater(len(s.buffer), 0)

    def test_push(self):
        # Issue #12523: push() should raise a TypeError if it doesn't get
        # a bytes string
        s, _ = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b'bytes\n'
        c.push(data)
        c.push(bytearray(data))
        c.push(memoryview(data))
        self.assertRaises(TypeError, c.push, 10)
        self.assertRaises(TypeError, c.push, 'unicode')
        c.push(SERVER_QUIT)
        self._run_loop()
        support.join_thread(s, timeout=TIMEOUT)
        self.assertEqual(c.contents, [b'bytes', b'bytes', b'bytes'])

253 254 255 256

class TestAsynchat_WithPoll(TestAsynchat):
    """Re-run every TestAsynchat test with use_poll=True for asyncore.loop()."""
    usepoll = True

257

258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
class TestAsynchatMocked(unittest.TestCase):
    """async_chat behaviour checked against a mocked socket."""

    def test_blockingioerror(self):
        # Issue #16133: handle_read() must ignore BlockingIOError
        fake_sock = unittest.mock.Mock()
        fake_sock.recv.side_effect = BlockingIOError(errno.EAGAIN)

        chat = asynchat.async_chat()
        chat.set_socket(fake_sock)
        # set_socket() is undone here so the channel does not outlive
        # the test.
        self.addCleanup(chat.del_channel)

        patcher = unittest.mock.patch.object(chat, 'handle_error')
        with patcher as handle_error:
            chat.handle_read()
        self.assertFalse(handle_error.called)


273 274 275 276 277
class TestHelperFunctions(unittest.TestCase):
    """Checks for asynchat's module-level helper functions."""

    def test_find_prefix_at_end(self):
        find = asynchat.find_prefix_at_end
        # haystack ends with the first character of the needle
        self.assertEqual(find("qwerty\r", "\r\n"), 1)
        # haystack does not end with any prefix of the needle
        self.assertEqual(find("qwertydkjf", "\r\n"), 0)

278

279 280 281 282 283 284 285 286
class TestNotConnected(unittest.TestCase):
    """Tests that need an async_chat instance but no connection."""

    def test_disallow_negative_terminator(self):
        # Issue #11259
        chat = asynchat.async_chat()
        with self.assertRaises(ValueError):
            chat.set_terminator(-1)



287
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()