# test asynchat

from test import support

# If this fails, the test will be skipped.
thread = support.import_module('_thread')

import asynchat
import asyncore
import errno
import socket
import sys
import time
import unittest
import warnings
import unittest.mock
try:
    import threading
except ImportError:
    threading = None

# Address the echo client connects to.
HOST = support.HOST
# Message that tells the echo server to stop collecting and start echoing.
SERVER_QUIT = b'QUIT\n'
# Seconds to wait for the server thread to finish before failing a test.
TIMEOUT = 3.0


if threading:
    class echo_server(threading.Thread):
        """Single-connection echo server running in its own thread.

        The server accepts one client, collects bytes until SERVER_QUIT is
        seen (or the client disconnects), strips SERVER_QUIT and then sends
        the collected data back ``chunk_size`` bytes at a time.
        """

        # parameter to determine the number of bytes passed back to the
        # client each send
        chunk_size = 1

        def __init__(self, event):
            """Create the listening socket; *event* is set once listening."""
            threading.Thread.__init__(self)
            self.event = event
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(self.sock)
            # This will be set if the client wants us to wait before echoing
            # data back.
            self.start_resend_event = None

        def run(self):
            """Serve exactly one client, then close both sockets."""
            self.sock.listen()
            self.event.set()
            try:
                conn, _ = self.sock.accept()
                try:
                    self._echo(conn)
                finally:
                    conn.close()
            finally:
                # Always release the listening socket, even if the
                # conversation with the client failed part-way through.
                self.sock.close()

        def _echo(self, conn):
            """Collect data from *conn* until SERVER_QUIT, then send it back."""
            self.buffer = b""
            # collect data until quit message is seen
            while SERVER_QUIT not in self.buffer:
                data = conn.recv(1)
                if not data:
                    break
                self.buffer = self.buffer + data

            # remove the SERVER_QUIT message
            self.buffer = self.buffer.replace(SERVER_QUIT, b'')

            if self.start_resend_event:
                self.start_resend_event.wait()

            # re-send entire set of collected data
            try:
                # this may fail on some tests, such as test_close_when_done,
                # since the client closes the channel when it's done sending
                while self.buffer:
                    n = conn.send(self.buffer[:self.chunk_size])
                    time.sleep(0.001)
                    self.buffer = self.buffer[n:]
            except OSError:
                # Best effort only: the peer may already be gone.  Whatever
                # could not be sent stays in self.buffer for the test to
                # inspect.
                pass

    class echo_client(asynchat.async_chat):
        """async_chat client that records every terminated chunk it receives.

        Completed chunks are appended to ``self.contents``; data received
        since the last terminator is kept in ``self.buffer``.
        """

        def __init__(self, terminator, server_port):
            """Connect to (HOST, server_port) and install *terminator*."""
            asynchat.async_chat.__init__(self)
            self.contents = []
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.connect((HOST, server_port))
            self.set_terminator(terminator)
            self.buffer = b""

        # NOTE: handle_connect/handle_expt must live at class level.  Defined
        # inside __init__ they are just unused local functions and never
        # override the dispatcher's methods.
        def handle_connect(self):
            # Nothing to do on connect; overriding silences asyncore's
            # default "unhandled connect event" log message.
            pass

        if sys.platform == 'darwin':
            # select.poll returns a select.POLLHUP at the end of the tests
            # on darwin, so just ignore it
            def handle_expt(self):
                pass

        def collect_incoming_data(self, data):
            """Accumulate *data* until the terminator is found."""
            self.buffer += data

        def found_terminator(self):
            """Store the accumulated chunk and start a fresh one."""
            self.contents.append(self.buffer)
            self.buffer = b""

    def start_echo_server():
        """Start an echo_server thread and wait until it is listening.

        Returns the ``(server, event)`` pair; the event has been cleared
        again by the time this function returns.
        """
        event = threading.Event()
        s = echo_server(event)
        s.start()
        event.wait()
        event.clear()
        time.sleep(0.01)   # Give server time to start accepting.
        return s, event


@unittest.skipUnless(threading, 'Threading required for this test.')
class TestAsynchat(unittest.TestCase):
    """async_chat tests run against a threaded echo server.

    Each test pushes data through an echo_client, runs asyncore.loop(),
    waits for the server thread to finish and then checks what the client
    collected.
    """

    # Passed to asyncore.loop() as its use_poll argument.
    usepoll = False

    def setUp(self):
        self._threads = support.threading_setup()

    def tearDown(self):
        support.threading_cleanup(*self._threads)

    def line_terminator_check(self, term, server_chunk):
        """Echo two lines ending in *term*, sent back *server_chunk* bytes
        at a time, and check both lines are recovered."""
        event = threading.Event()
        s = echo_server(event)
        s.chunk_size = server_chunk
        s.start()
        event.wait()
        event.clear()
        time.sleep(0.01)   # Give server time to start accepting.
        c = echo_client(term, s.port)
        c.push(b"hello ")
        c.push(b"world" + term)
        c.push(b"I'm not dead yet!" + term)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join(timeout=TIMEOUT)
        if s.is_alive():
            self.fail("join() timed out")

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    # the line terminator tests below check receiving variously-sized
    # chunks back from the server in order to exercise all branches of
    # async_chat.handle_read

    def test_line_terminator1(self):
        # test one-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'\n', chunk)

    def test_line_terminator2(self):
        # test two-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'\r\n', chunk)

    def test_line_terminator3(self):
        # test three-character terminator
        for chunk in (1, 2, 3):
            self.line_terminator_check(b'qqq', chunk)

    def numeric_terminator_check(self, termlen):
        # Try reading a fixed number of bytes
        s, event = start_echo_server()
        c = echo_client(termlen, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join(timeout=TIMEOUT)
        if s.is_alive():
            self.fail("join() timed out")

        self.assertEqual(c.contents, [data[:termlen]])

    def test_numeric_terminator1(self):
        # check that ints & longs both work (since type is
        # explicitly checked in async_chat.handle_read)
        self.numeric_terminator_check(1)

    def test_numeric_terminator2(self):
        self.numeric_terminator_check(6)

    def test_none_terminator(self):
        # With a None terminator nothing is ever "terminated": all data
        # ends up in the client's buffer and contents stays empty.
        s, event = start_echo_server()
        c = echo_client(None, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join(timeout=TIMEOUT)
        if s.is_alive():
            self.fail("join() timed out")

        self.assertEqual(c.contents, [])
        self.assertEqual(c.buffer, data)

    def test_simple_producer(self):
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
        c.push_with_producer(p)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join(timeout=TIMEOUT)
        if s.is_alive():
            self.fail("join() timed out")

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_string_producer(self):
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        c.push_with_producer(data+SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join(timeout=TIMEOUT)
        if s.is_alive():
            self.fail("join() timed out")

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_empty_line(self):
        # checks that empty lines are handled correctly
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\n\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join(timeout=TIMEOUT)
        if s.is_alive():
            self.fail("join() timed out")

        self.assertEqual(c.contents,
                         [b"hello world", b"", b"I'm not dead yet!"])

    def test_close_when_done(self):
        s, event = start_echo_server()
        s.start_resend_event = threading.Event()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        c.close_when_done()
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)

        # Only allow the server to start echoing data back to the client after
        # the client has closed its connection.  This prevents a race condition
        # where the server echoes all of its data before we can check that it
        # got any down below.
        s.start_resend_event.set()
        s.join(timeout=TIMEOUT)
        if s.is_alive():
            self.fail("join() timed out")

        self.assertEqual(c.contents, [])
        # the server might have been able to send a byte or two back, but this
        # at least checks that it received something and didn't just fail
        # (which could still result in the client not having received anything)
        self.assertGreater(len(s.buffer), 0)

    def test_push(self):
        # Issue #12523: push() should raise a TypeError if it doesn't get
        # a bytes string
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b'bytes\n'
        c.push(data)
        c.push(bytearray(data))
        c.push(memoryview(data))
        self.assertRaises(TypeError, c.push, 10)
        self.assertRaises(TypeError, c.push, 'unicode')
        c.push(SERVER_QUIT)
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
        s.join(timeout=TIMEOUT)
        # Same liveness check as every other test: a hung server thread
        # should be reported as such, not as a confusing content mismatch.
        if s.is_alive():
            self.fail("join() timed out")
        self.assertEqual(c.contents, [b'bytes', b'bytes', b'bytes'])


class TestAsynchat_WithPoll(TestAsynchat):
    """Run the inherited TestAsynchat tests with usepoll set to True."""
    usepoll = True

class TestAsynchatMocked(unittest.TestCase):
    """async_chat checks that use a mocked socket instead of a real one."""

    def test_blockingioerror(self):
        # Issue #16133: handle_read() must ignore BlockingIOError
        fake_sock = unittest.mock.Mock()
        fake_sock.recv.side_effect = BlockingIOError(errno.EAGAIN)

        chat = asynchat.async_chat()
        chat.set_socket(fake_sock)
        self.addCleanup(chat.del_channel)

        # Replace handle_error so we can tell whether handle_read() called it.
        patcher = unittest.mock.patch.object(chat, 'handle_error')
        with patcher as handle_error:
            chat.handle_read()
        self.assertFalse(handle_error.called)


class TestHelperFunctions(unittest.TestCase):
    """Checks for asynchat's module-level helper functions."""

    def test_find_prefix_at_end(self):
        # (haystack, needle, expected return value)
        cases = (
            ("qwerty\r", "\r\n", 1),
            ("qwertydkjf", "\r\n", 0),
        )
        for haystack, needle, expected in cases:
            found = asynchat.find_prefix_at_end(haystack, needle)
            self.assertEqual(found, expected)

class TestFifo(unittest.TestCase):
    def test_basic(self):
302
        with self.assertWarns(DeprecationWarning) as cm:
303
            f = asynchat.fifo()
304 305
        self.assertEqual(str(cm.warning),
                         "fifo class will be removed in Python 3.6")
306 307 308 309 310 311 312 313 314 315 316 317 318 319
        f.push(7)
        f.push(b'a')
        self.assertEqual(len(f), 2)
        self.assertEqual(f.first(), 7)
        self.assertEqual(f.pop(), (1, 7))
        self.assertEqual(len(f), 1)
        self.assertEqual(f.first(), b'a')
        self.assertEqual(f.is_empty(), False)
        self.assertEqual(f.pop(), (1, b'a'))
        self.assertEqual(len(f), 0)
        self.assertEqual(f.is_empty(), True)
        self.assertEqual(f.pop(), (0, None))

    def test_given_list(self):
320
        with self.assertWarns(DeprecationWarning) as cm:
321
            f = asynchat.fifo([b'x', 17, 3])
322 323
        self.assertEqual(str(cm.warning),
                         "fifo class will be removed in Python 3.6")
324 325 326 327 328
        self.assertEqual(len(f), 3)
        self.assertEqual(f.pop(), (1, b'x'))
        self.assertEqual(f.pop(), (1, 17))
        self.assertEqual(f.pop(), (1, 3))
        self.assertEqual(f.pop(), (0, None))
329 330


class TestNotConnected(unittest.TestCase):
    """Checks on an async_chat instance that was never given a socket."""

    def test_disallow_negative_terminator(self):
        # Issue #11259
        chat = asynchat.async_chat()
        with self.assertRaises(ValueError):
            chat.set_terminator(-1)



# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()