asyncore.py 19.6 KB
Newer Older
1
# -*- Mode: Python -*-
Tim Peters's avatar
Tim Peters committed
2
#   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3
#   Author: Sam Rushing <rushing@nightmare.com>
4 5 6

# ======================================================================
# Copyright 1996 by Sam Rushing
Tim Peters's avatar
Tim Peters committed
7
#
8
#                         All Rights Reserved
Tim Peters's avatar
Tim Peters committed
9
#
10 11 12 13 14 15 16 17
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
Tim Peters's avatar
Tim Peters committed
18
#
19 20 21 22 23 24 25 26 27
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================

28 29 30
"""Basic infrastructure for asynchronous socket service clients and servers.

There are only two ways to have a program on a single processor do "more
Tim Peters's avatar
Tim Peters committed
31
than one thing at a time".  Multi-threaded programming is the simplest and
32 33 34 35 36
most popular way to do it, but there is another very different technique,
that lets you have nearly all the advantages of multi-threading, without
actually using multiple threads. it's really only practical if your program
is largely I/O bound. If your program is CPU bound, then pre-emptive
scheduled threads are probably what you really need. Network servers are
Tim Peters's avatar
Tim Peters committed
37
rarely CPU-bound, however.
38

Tim Peters's avatar
Tim Peters committed
39
If your operating system supports the select() system call in its I/O
40 41 42 43 44 45
library (and nearly all do), then you can use it to juggle multiple
communication channels at once; doing other work while your I/O is taking
place in the "background."  Although this strategy can seem strange and
complex, especially at first, it is in many ways easier to understand and
control than multi-threaded programming. The module documented here solves
many of the difficult problems for you, making the task of building
Tim Peters's avatar
Tim Peters committed
46
sophisticated high-performance network servers and clients a snap.
47 48
"""

49 50 51
import select
import socket
import sys
52
import time
53 54
import warnings

55
import os
56
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
57
     ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
Giampaolo Rodolà's avatar
Giampaolo Rodolà committed
58
     errorcode
59

60 61
_DISCONNECTED = frozenset({ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
                           EBADF})
62

63
try:
64
    socket_map
65
except NameError:
66
    socket_map = {}
67

68
def _strerror(err):
69
    try:
70 71
        return os.strerror(err)
    except (ValueError, OverflowError, NameError):
72 73 74
        if err in errorcode:
            return errorcode[err]
        return "Unknown error %s" %err
75

76
class ExitNow(Exception):
77
    pass
78

79 80
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)

Jeremy Hylton's avatar
Jeremy Hylton committed
81 82 83
def read(obj):
    try:
        obj.handle_read_event()
84
    except _reraised_exceptions:
Jeremy Hylton's avatar
Jeremy Hylton committed
85 86 87 88 89 90 91
        raise
    except:
        obj.handle_error()

def write(obj):
    try:
        obj.handle_write_event()
92
    except _reraised_exceptions:
Jeremy Hylton's avatar
Jeremy Hylton committed
93 94 95 96
        raise
    except:
        obj.handle_error()

97
def _exception(obj):
98 99
    try:
        obj.handle_expt_event()
100
    except _reraised_exceptions:
101 102 103 104
        raise
    except:
        obj.handle_error()

Jeremy Hylton's avatar
Jeremy Hylton committed
105 106
def readwrite(obj, flags):
    try:
107
        if flags & select.POLLIN:
Jeremy Hylton's avatar
Jeremy Hylton committed
108 109 110
            obj.handle_read_event()
        if flags & select.POLLOUT:
            obj.handle_write_event()
111 112
        if flags & select.POLLPRI:
            obj.handle_expt_event()
113 114
        if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
            obj.handle_close()
115
    except OSError as e:
116
        if e.args[0] not in _DISCONNECTED:
117 118 119
            obj.handle_error()
        else:
            obj.handle_close()
120
    except _reraised_exceptions:
Jeremy Hylton's avatar
Jeremy Hylton committed
121 122 123
        raise
    except:
        obj.handle_error()
124

125
def poll(timeout=0.0, map=None):
126 127 128 129
    if map is None:
        map = socket_map
    if map:
        r = []; w = []; e = []
130
        for fd, obj in list(map.items()):
131 132 133
            is_r = obj.readable()
            is_w = obj.writable()
            if is_r:
Jeremy Hylton's avatar
Jeremy Hylton committed
134
                r.append(fd)
135 136
            # accepting sockets should not be writable
            if is_w and not obj.accepting:
Jeremy Hylton's avatar
Jeremy Hylton committed
137
                w.append(fd)
138 139
            if is_r or is_w:
                e.append(fd)
140 141
        if [] == r == w == e:
            time.sleep(timeout)
142 143
            return

144
        r, w, e = select.select(r, w, e, timeout)
145 146

        for fd in r:
Jeremy Hylton's avatar
Jeremy Hylton committed
147 148
            obj = map.get(fd)
            if obj is None:
149
                continue
Jeremy Hylton's avatar
Jeremy Hylton committed
150
            read(obj)
151 152

        for fd in w:
Jeremy Hylton's avatar
Jeremy Hylton committed
153 154
            obj = map.get(fd)
            if obj is None:
155
                continue
Jeremy Hylton's avatar
Jeremy Hylton committed
156
            write(obj)
157

158 159 160 161 162 163
        for fd in e:
            obj = map.get(fd)
            if obj is None:
                continue
            _exception(obj)

164
def poll2(timeout=0.0, map=None):
165 166
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
167
        map = socket_map
168 169 170
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout*1000)
171 172
    pollster = select.poll()
    if map:
173
        for fd, obj in list(map.items()):
174
            flags = 0
175
            if obj.readable():
176
                flags |= select.POLLIN | select.POLLPRI
177 178
            # accepting sockets should not be writable
            if obj.writable() and not obj.accepting:
179
                flags |= select.POLLOUT
180 181
            if flags:
                pollster.register(fd, flags)
182 183

        r = pollster.poll(timeout)
184
        for fd, flags in r:
Jeremy Hylton's avatar
Jeremy Hylton committed
185 186
            obj = map.get(fd)
            if obj is None:
187
                continue
Jeremy Hylton's avatar
Jeremy Hylton committed
188
            readwrite(obj, flags)
189

190 191
poll3 = poll2                           # Alias for backward compatibility

192
def loop(timeout=30.0, use_poll=False, map=None, count=None):
193
    if map is None:
Jeremy Hylton's avatar
Jeremy Hylton committed
194
        map = socket_map
195

196 197
    if use_poll and hasattr(select, 'poll'):
        poll_fun = poll2
198 199
    else:
        poll_fun = poll
200

201 202 203 204 205 206 207 208
    if count is None:
        while map:
            poll_fun(timeout, map)

    else:
        while map and count > 0:
            poll_fun(timeout, map)
            count = count - 1
209 210

class dispatcher:
211

212
    debug = False
213 214
    connected = False
    accepting = False
215
    connecting = False
216
    closing = False
217
    addr = None
218
    ignore_log_types = frozenset({'warning'})
219

220
    def __init__(self, sock=None, map=None):
221 222 223 224 225
        if map is None:
            self._map = socket_map
        else:
            self._map = map

226 227
        self._fileno = None

228
        if sock:
229 230 231
            # Set to nonblocking just to make sure for cases where we
            # get a socket from a blocking source.
            sock.setblocking(0)
232
            self.set_socket(sock, map)
233
            self.connected = True
234 235
            # The constructor no longer requires that the socket
            # passed be connected.
236 237
            try:
                self.addr = sock.getpeername()
238
            except OSError as err:
239
                if err.args[0] in (ENOTCONN, EINVAL):
240 241 242 243 244 245 246 247 248
                    # To handle the case where we got an unconnected
                    # socket.
                    self.connected = False
                else:
                    # The socket is broken in some unknown way, alert
                    # the user and remove it from the map (to prevent
                    # polling of broken sockets).
                    self.del_channel(map)
                    raise
249 250
        else:
            self.socket = None
251

252
    def __repr__(self):
253
        status = [self.__class__.__module__+"."+self.__class__.__qualname__]
Martin v. Löwis's avatar
Martin v. Löwis committed
254
        if self.accepting and self.addr:
255
            status.append('listening')
Martin v. Löwis's avatar
Martin v. Löwis committed
256
        elif self.connected:
257
            status.append('connected')
Martin v. Löwis's avatar
Martin v. Löwis committed
258 259
        if self.addr is not None:
            try:
260
                status.append('%s:%d' % self.addr)
Martin v. Löwis's avatar
Martin v. Löwis committed
261
            except TypeError:
262 263
                status.append(repr(self.addr))
        return '<%s at %#x>' % (' '.join(status), id(self))
264

265 266
    __str__ = __repr__

267 268
    def add_channel(self, map=None):
        #self.log_info('adding channel %s' % self)
269
        if map is None:
270
            map = self._map
271
        map[self._fileno] = self
272

273
    def del_channel(self, map=None):
274 275
        fd = self._fileno
        if map is None:
276
            map = self._map
277
        if fd in map:
278
            #self.log_info('closing channel %d:%s' % (fd, self))
279
            del map[fd]
280
        self._fileno = None
281

282
    def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
283
        self.family_and_type = family, type
284 285 286
        sock = socket.socket(family, type)
        sock.setblocking(0)
        self.set_socket(sock)
287

288
    def set_socket(self, sock, map=None):
289
        self.socket = sock
290
        self._fileno = sock.fileno()
291
        self.add_channel(map)
292

293
    def set_reuse_addr(self):
294 295
        # try to re-use a server port if possible
        try:
296
            self.socket.setsockopt(
297
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
298
                self.socket.getsockopt(socket.SOL_SOCKET,
299
                                       socket.SO_REUSEADDR) | 1
300
                )
301
        except OSError:
302
            pass
303

304 305 306 307 308 309
    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

310
    def readable(self):
311
        return True
312

313 314
    def writable(self):
        return True
315 316 317 318 319

    # ==================================================
    # socket object methods.
    # ==================================================

320
    def listen(self, num):
321
        self.accepting = True
322
        if os.name == 'nt' and num > 5:
323
            num = 5
324
        return self.socket.listen(num)
325

326
    def bind(self, addr):
327
        self.addr = addr
328
        return self.socket.bind(addr)
329

330
    def connect(self, address):
331
        self.connected = False
332
        self.connecting = True
333
        err = self.socket.connect_ex(address)
334
        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
335
        or err == EINVAL and os.name == 'nt':
336
            self.addr = address
337 338 339
            return
        if err in (0, EISCONN):
            self.addr = address
340
            self.handle_connect_event()
341
        else:
342
            raise OSError(err, errorcode[err])
343

344
    def accept(self):
345
        # XXX can return either an address pair or None
346 347
        try:
            conn, addr = self.socket.accept()
348 349
        except TypeError:
            return None
350
        except OSError as why:
351
            if why.args[0] in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
352
                return None
353
            else:
Tim Peters's avatar
Tim Peters committed
354
                raise
355 356
        else:
            return conn, addr
357

358
    def send(self, data):
359
        try:
360
            result = self.socket.send(data)
361
            return result
362
        except OSError as why:
363
            if why.args[0] == EWOULDBLOCK:
364
                return 0
365
            elif why.args[0] in _DISCONNECTED:
366
                self.handle_close()
367 368
                return 0
            else:
Tim Peters's avatar
Tim Peters committed
369
                raise
370

371
    def recv(self, buffer_size):
372
        try:
373
            data = self.socket.recv(buffer_size)
374 375 376 377
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
378
                return b''
379 380
            else:
                return data
381
        except OSError as why:
382
            # winsock sometimes raises ENOTCONN
383
            if why.args[0] in _DISCONNECTED:
384
                self.handle_close()
385
                return b''
386
            else:
Tim Peters's avatar
Tim Peters committed
387
                raise
388

389
    def close(self):
390 391
        self.connected = False
        self.accepting = False
392
        self.connecting = False
393
        self.del_channel()
394 395 396 397 398 399
        if self.socket is not None:
            try:
                self.socket.close()
            except OSError as why:
                if why.args[0] not in (ENOTCONN, EBADF):
                    raise
400

Andrew M. Kuchling's avatar
Andrew M. Kuchling committed
401
    # log and log_info may be overridden to provide more sophisticated
402
    # logging and warning methods. In general, log is for 'hit' logging
Tim Peters's avatar
Tim Peters committed
403
    # and 'log_info' is for informational, warning and error logging.
404

405 406
    def log(self, message):
        sys.stderr.write('log: %s\n' % str(message))
407

408
    def log_info(self, message, type='info'):
409
        if type not in self.ignore_log_types:
410
            print('%s: %s' % (type, message))
411

412
    def handle_read_event(self):
413
        if self.accepting:
414 415
            # accepting sockets are never connected, they "spawn" new
            # sockets that are connected
416 417
            self.handle_accept()
        elif not self.connected:
418 419
            if self.connecting:
                self.handle_connect_event()
420 421 422 423
            self.handle_read()
        else:
            self.handle_read()

424
    def handle_connect_event(self):
425 426
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
427
            raise OSError(err, _strerror(err))
428
        self.handle_connect()
429
        self.connected = True
430
        self.connecting = False
431

432
    def handle_write_event(self):
433 434 435 436 437
        if self.accepting:
            # Accepting sockets shouldn't get a write event.
            # We will pretend it didn't happen.
            return

438
        if not self.connected:
439 440
            if self.connecting:
                self.handle_connect_event()
441 442
        self.handle_write()

443
    def handle_expt_event(self):
444 445 446 447 448 449 450 451 452 453 454
        # handle_expt_event() is called if there might be an error on the
        # socket, or if there is OOB data
        # check for the error condition first
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            # we can get here when select.select() says that there is an
            # exceptional condition on the socket
            # since there is an error, we'll go ahead and close the socket
            # like we would in a subclassed handle_read() that received no
            # data
            self.handle_close()
455 456
        else:
            self.handle_expt()
457

458
    def handle_error(self):
459
        nil, t, v, tbinfo = compact_traceback()
460 461 462

        # sometimes a user repr method will crash.
        try:
463
            self_repr = repr(self)
464
        except:
465
            self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
466

467
        self.log_info(
468 469 470 471 472 473 474 475
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
476
        self.handle_close()
477

478
    def handle_expt(self):
479
        self.log_info('unhandled incoming priority event', 'warning')
480

481 482
    def handle_read(self):
        self.log_info('unhandled read event', 'warning')
483

484 485
    def handle_write(self):
        self.log_info('unhandled write event', 'warning')
486

487 488
    def handle_connect(self):
        self.log_info('unhandled connect event', 'warning')
489

490
    def handle_accept(self):
491 492 493 494 495 496 497
        pair = self.accept()
        if pair is not None:
            self.handle_accepted(*pair)

    def handle_accepted(self, sock, addr):
        sock.close()
        self.log_info('unhandled accepted event', 'warning')
498

499 500
    def handle_close(self):
        self.log_info('unhandled close event', 'warning')
501
        self.close()
502 503 504 505 506 507

# ---------------------------------------------------------------------------
# adds simple buffered output capability, useful for simple clients.
# [for more sophisticated usage use asynchat.async_chat]
# ---------------------------------------------------------------------------

508 509
class dispatcher_with_send(dispatcher):

510 511
    def __init__(self, sock=None, map=None):
        dispatcher.__init__(self, sock, map)
512
        self.out_buffer = b''
513

514
    def initiate_send(self):
515
        num_sent = 0
516
        num_sent = dispatcher.send(self, self.out_buffer[:65536])
517
        self.out_buffer = self.out_buffer[num_sent:]
518

519
    def handle_write(self):
520
        self.initiate_send()
521

522
    def writable(self):
523
        return (not self.connected) or len(self.out_buffer)
524

525
    def send(self, data):
526 527
        if self.debug:
            self.log_info('sending %s' % repr(data))
528 529
        self.out_buffer = self.out_buffer + data
        self.initiate_send()
530 531 532 533 534

# ---------------------------------------------------------------------------
# used for debugging.
# ---------------------------------------------------------------------------

535
def compact_traceback():
536
    t, v, tb = sys.exc_info()
537
    tbinfo = []
538 539
    if not tb: # Must have a traceback
        raise AssertionError("traceback does not exist")
540
    while tb:
541
        tbinfo.append((
542
            tb.tb_frame.f_code.co_filename,
Tim Peters's avatar
Tim Peters committed
543
            tb.tb_frame.f_code.co_name,
544 545 546 547 548 549 550 551
            str(tb.tb_lineno)
            ))
        tb = tb.tb_next

    # just to be safe
    del tb

    file, function, line = tbinfo[-1]
552
    info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
553
    return (file, function, line), t, v, info
554

555
def close_all(map=None, ignore_all=False):
556
    if map is None:
557
        map = socket_map
558 559 560 561
    for x in list(map.values()):
        try:
            x.close()
        except OSError as x:
562
            if x.args[0] == EBADF:
563 564 565
                pass
            elif not ignore_all:
                raise
566
        except _reraised_exceptions:
567 568 569 570
            raise
        except:
            if not ignore_all:
                raise
571
    map.clear()
572 573 574 575 576

# Asynchronous File I/O:
#
# After a little research (reading man pages on various unixen, and
# digging through the linux kernel), I've determined that select()
577
# isn't meant for doing asynchronous file i/o.
578 579 580 581 582 583 584 585 586
# Heartening, though - reading linux/mm/filemap.c shows that linux
# supports asynchronous read-ahead.  So _MOST_ of the time, the data
# will be sitting in memory for us already when we go to read it.
#
# What other OS's (besides NT) support async file i/o?  [VMS?]
#
# Regardless, this is useful for pipes, and stdin/stdout...

if os.name == 'posix':
587
    class file_wrapper:
588
        # Here we override just enough to make a file
589
        # look like a socket for the purposes of asyncore.
590
        # The passed fd is automatically os.dup()'d
591 592

        def __init__(self, fd):
593
            self.fd = os.dup(fd)
594

595 596
        def __del__(self):
            if self.fd >= 0:
597 598
                warnings.warn("unclosed file %r" % self, ResourceWarning,
                              source=self)
599 600
            self.close()

601
        def recv(self, *args):
602
            return os.read(self.fd, *args)
603

604
        def send(self, *args):
605
            return os.write(self.fd, *args)
606

607 608 609 610 611 612 613 614
        def getsockopt(self, level, optname, buflen=None):
            if (level == socket.SOL_SOCKET and
                optname == socket.SO_ERROR and
                not buflen):
                return 0
            raise NotImplementedError("Only asyncore specific behaviour "
                                      "implemented.")

615 616 617
        read = recv
        write = send

618
        def close(self):
619 620
            if self.fd < 0:
                return
621
            fd = self.fd
622
            self.fd = -1
623
            os.close(fd)
624

625
        def fileno(self):
626 627
            return self.fd

628 629
    class file_dispatcher(dispatcher):

630 631
        def __init__(self, fd, map=None):
            dispatcher.__init__(self, None, map)
632
            self.connected = True
633 634 635 636
            try:
                fd = fd.fileno()
            except AttributeError:
                pass
637
            self.set_file(fd)
638
            # set it to non-blocking mode
639
            os.set_blocking(fd, False)
640

641 642
        def set_file(self, fd):
            self.socket = file_wrapper(fd)
643
            self._fileno = self.socket.fileno()
644
            self.add_channel()