# -*- Mode: Python -*-
#   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
#   Author: Sam Rushing <rushing@nightmare.com>

# ======================================================================
# Copyright 1996 by Sam Rushing
#
#                         All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================

"""Basic infrastructure for asynchronous socket service clients and servers.

There are only two ways to have a program on a single processor do "more
than one thing at a time".  Multi-threaded programming is the simplest and
most popular way to do it, but there is another very different technique,
that lets you have nearly all the advantages of multi-threading, without
actually using multiple threads. It's really only practical if your program
is largely I/O bound. If your program is CPU bound, then pre-emptive
scheduled threads are probably what you really need. Network servers are
rarely CPU-bound, however.

If your operating system supports the select() system call in its I/O
library (and nearly all do), then you can use it to juggle multiple
communication channels at once; doing other work while your I/O is taking
place in the "background."  Although this strategy can seem strange and
complex, especially at first, it is in many ways easier to understand and
control than multi-threaded programming. The module documented here solves
many of the difficult problems for you, making the task of building
sophisticated high-performance network servers and clients a snap.
"""

49 50 51
import select
import socket
import sys
52
import time
53 54
import warnings

55
import os
56
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
57
     ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
Giampaolo Rodolà's avatar
Giampaolo Rodolà committed
58
     errorcode
59

60 61
# errno values treated as "the connection is gone": when a socket operation
# fails with one of these, the channel is closed through handle_close()
# instead of the error being re-raised or routed to handle_error().
_DISCONNECTED = frozenset((ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
                           EBADF))
62

63
# Default channel registry: maps a file descriptor to the dispatcher that
# owns it.  The NameError probe only creates a fresh dict when the name is
# not already bound, so an existing map is kept if this code runs again
# (presumably to survive a module reload -- confirm before relying on it).
try:
    socket_map
except NameError:
    socket_map = {}
67

68
def _strerror(err):
    """Return a readable description of the errno value *err*.

    os.strerror() is tried first.  If it raises ValueError, OverflowError
    or NameError, the symbolic name from errno.errorcode is used when one
    exists; otherwise the string "Unknown error <err>" is returned.
    """
    try:
        return os.strerror(err)
    except (ValueError, OverflowError, NameError):
        if err in errorcode:
            return errorcode[err]
        return "Unknown error %s" % err
75

76
class ExitNow(Exception):
    """Exception that the event wrappers re-raise instead of handling.

    It is listed in _reraised_exceptions, so read(), write(), _exception(),
    readwrite() and close_all() let it propagate to their caller rather
    than passing it to a channel's handle_error().
    """
    pass
78

79 80
# Exceptions that the event wrappers and close_all() always propagate to
# their caller instead of swallowing them or routing them to handle_error().
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)

Jeremy Hylton's avatar
Jeremy Hylton committed
81 82 83
def read(obj):
    """Deliver a read-readiness event to the channel *obj*.

    Calls obj.handle_read_event().  Exceptions listed in
    _reraised_exceptions propagate to the caller; every other exception is
    handed to obj.handle_error().
    """
    try:
        obj.handle_read_event()
    except _reraised_exceptions:
        raise
    except:
        # Deliberately broad: any other failure is delegated to the
        # channel's own error hook.
        obj.handle_error()

def write(obj):
    """Deliver a write-readiness event to the channel *obj*.

    Calls obj.handle_write_event().  Exceptions listed in
    _reraised_exceptions propagate to the caller; every other exception is
    handed to obj.handle_error().
    """
    try:
        obj.handle_write_event()
    except _reraised_exceptions:
        raise
    except:
        # Deliberately broad: any other failure is delegated to the
        # channel's own error hook.
        obj.handle_error()

97
def _exception(obj):
    """Deliver an exceptional-condition event to the channel *obj*.

    Calls obj.handle_expt_event().  Exceptions listed in
    _reraised_exceptions propagate to the caller; every other exception is
    handed to obj.handle_error().
    """
    try:
        obj.handle_expt_event()
    except _reraised_exceptions:
        raise
    except:
        # Deliberately broad: any other failure is delegated to the
        # channel's own error hook.
        obj.handle_error()

Jeremy Hylton's avatar
Jeremy Hylton committed
105 106
def readwrite(obj, flags):
    """Dispatch the poll() event mask *flags* to the channel *obj*.

    The mask is tested bit by bit, in this order: POLLIN ->
    handle_read_event(), POLLOUT -> handle_write_event(), POLLPRI ->
    handle_expt_event(), and any of POLLHUP/POLLERR/POLLNVAL ->
    handle_close().

    A socket error raised by a handler closes the channel when its errno
    is in _DISCONNECTED and is passed to obj.handle_error() otherwise.
    Exceptions in _reraised_exceptions propagate; anything else goes to
    obj.handle_error().
    """
    try:
        if flags & select.POLLIN:
            obj.handle_read_event()
        if flags & select.POLLOUT:
            obj.handle_write_event()
        if flags & select.POLLPRI:
            obj.handle_expt_event()
        if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
            obj.handle_close()
    except socket.error as e:
        if e.args[0] not in _DISCONNECTED:
            obj.handle_error()
        else:
            obj.handle_close()
    except _reraised_exceptions:
        raise
    except:
        # Deliberately broad: any other failure is delegated to the
        # channel's own error hook.
        obj.handle_error()
124

125
def poll(timeout=0.0, map=None):
    """Run one select()-based polling pass over the channels in *map*.

    timeout -- seconds passed to select.select() (or to time.sleep() when
               no channel wants to be watched).
    map     -- mapping of file descriptor -> channel; defaults to the
               module-level socket_map.

    Each channel is asked readable()/writable() to build the select()
    lists, then read(), write() and _exception() are called for the
    descriptors select() reports.  Returns None.
    """
    if map is None:
        map = socket_map
    if map:
        r = []; w = []; e = []
        # Iterate over a snapshot so the map may change while the
        # predicates run.
        for fd, obj in list(map.items()):
            is_r = obj.readable()
            is_w = obj.writable()
            if is_r:
                r.append(fd)
            # accepting sockets should not be writable
            if is_w and not obj.accepting:
                w.append(fd)
            if is_r or is_w:
                e.append(fd)
        if [] == r == w == e:
            # Nothing to watch: wait out the timeout and return.
            time.sleep(timeout)
            return

        try:
            r, w, e = select.select(r, w, e, timeout)
        except InterruptedError:
            return

        # A channel may have been removed from the map by an earlier
        # handler in this same pass, hence the map.get() + None checks.
        for fd in r:
            obj = map.get(fd)
            if obj is None:
                continue
            read(obj)

        for fd in w:
            obj = map.get(fd)
            if obj is None:
                continue
            write(obj)

        for fd in e:
            obj = map.get(fd)
            if obj is None:
                continue
            _exception(obj)

167
def poll2(timeout=0.0, map=None):
    """Run one select.poll()-based polling pass over the channels in *map*.

    timeout -- seconds, or None; a number is converted to whole
               milliseconds before being handed to the poll object.
    map     -- mapping of file descriptor -> channel; defaults to the
               module-level socket_map.

    Readable channels are registered for POLLIN | POLLPRI, writable
    non-accepting channels for POLLOUT.  Every reported (fd, flags) pair is
    dispatched through readwrite().  Returns None.
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout*1000)
    pollster = select.poll()
    if map:
        # Iterate over a snapshot so the map may change while the
        # predicates run.
        for fd, obj in list(map.items()):
            flags = 0
            if obj.readable():
                flags |= select.POLLIN | select.POLLPRI
            # accepting sockets should not be writable
            if obj.writable() and not obj.accepting:
                flags |= select.POLLOUT
            if flags:
                pollster.register(fd, flags)
        try:
            r = pollster.poll(timeout)
        except InterruptedError:
            r = []
        for fd, flags in r:
            obj = map.get(fd)
            if obj is None:
                continue
            readwrite(obj, flags)


poll3 = poll2                           # Alias for backward compatibility

197
def loop(timeout=30.0, use_poll=False, map=None, count=None):
    """Poll repeatedly until the channel map is empty or *count* runs out.

    timeout  -- value forwarded to the polling function on every pass.
    use_poll -- use poll2() when true and select.poll exists, else poll().
    map      -- mapping of file descriptor -> channel; defaults to the
                module-level socket_map.
    count    -- maximum number of passes; None means keep going for as
                long as the map is non-empty.
    """
    if map is None:
        map = socket_map

    if use_poll and hasattr(select, 'poll'):
        poll_fun = poll2
    else:
        poll_fun = poll

    if count is None:
        while map:
            poll_fun(timeout, map)

    else:
        while map and count > 0:
            poll_fun(timeout, map)
            count = count - 1
214 215

class dispatcher:
    """Wrapper around a non-blocking socket that plugs into the poll loop.

    An instance registers itself in a channel map (the module-level
    socket_map unless another mapping is passed) under its file descriptor.
    The polling functions call the handle_*_event() methods, which in turn
    invoke the overridable callbacks handle_read(), handle_write(),
    handle_connect(), handle_accept()/handle_accepted(), handle_expt() and
    handle_close().
    """

    debug = False
    connected = False
    accepting = False
    connecting = False
    closing = False
    addr = None
    # log_info() drops messages whose type is in this set.
    ignore_log_types = frozenset(['warning'])

    def __init__(self, sock=None, map=None):
        """Create a channel, optionally adopting the existing socket *sock*.

        sock -- socket to wrap; it is switched to non-blocking mode and
                registered in the map.  When omitted, self.socket is None
                until create_socket()/set_socket() is called.
        map  -- channel map to register in; defaults to socket_map.

        Raises the socket error from getpeername() when it is neither
        ENOTCONN nor EINVAL (after removing the channel from the map).
        """
        if map is None:
            self._map = socket_map
        else:
            self._map = map

        self._fileno = None

        if sock:
            # Set to nonblocking just to make sure for cases where we
            # get a socket from a blocking source.
            sock.setblocking(0)
            self.set_socket(sock, map)
            self.connected = True
            # The constructor no longer requires that the socket
            # passed be connected.
            try:
                self.addr = sock.getpeername()
            except socket.error as err:
                if err.args[0] in (ENOTCONN, EINVAL):
                    # To handle the case where we got an unconnected
                    # socket.
                    self.connected = False
                else:
                    # The socket is broken in some unknown way, alert
                    # the user and remove it from the map (to prevent
                    # polling of broken sockets).
                    self.del_channel(map)
                    raise
        else:
            self.socket = None

    def __repr__(self):
        """Return '<module.Class [listening|connected] [addr] at 0x...>'."""
        status = [self.__class__.__module__+"."+self.__class__.__name__]
        if self.accepting and self.addr:
            status.append('listening')
        elif self.connected:
            status.append('connected')
        if self.addr is not None:
            try:
                status.append('%s:%d' % self.addr)
            except TypeError:
                # addr is not a (host, port) pair; show it verbatim.
                status.append(repr(self.addr))
        return '<%s at %#x>' % (' '.join(status), id(self))

    __str__ = __repr__

    def add_channel(self, map=None):
        """Register this channel in *map* (default: its own map) by fd."""
        #self.log_info('adding channel %s' % self)
        if map is None:
            map = self._map
        map[self._fileno] = self

    def del_channel(self, map=None):
        """Remove this channel from *map* (default: its own map).

        The stored file descriptor is reset to None whether or not the
        channel was present in the map.
        """
        fd = self._fileno
        if map is None:
            map = self._map
        if fd in map:
            #self.log_info('closing channel %d:%s' % (fd, self))
            del map[fd]
        self._fileno = None

    def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
        """Create a new non-blocking socket and adopt it via set_socket()."""
        self.family_and_type = family, type
        sock = socket.socket(family, type)
        sock.setblocking(0)
        self.set_socket(sock)

    def set_socket(self, sock, map=None):
        """Adopt *sock* as the underlying socket and register the channel."""
        self.socket = sock
        self._fileno = sock.fileno()
        self.add_channel(map)

    def set_reuse_addr(self):
        """Turn on SO_REUSEADDR; socket errors are silently ignored."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR) | 1
                )
        except socket.error:
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable(self):
        """Return True if the poller should watch this channel for reads."""
        return True

    def writable(self):
        """Return True if the poller should watch this channel for writes."""
        return True

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen(self, num):
        """Mark the channel as accepting and start listening.

        On Windows (os.name == 'nt') the backlog *num* is capped at 5.
        """
        self.accepting = True
        if os.name == 'nt' and num > 5:
            num = 5
        return self.socket.listen(num)

    def bind(self, addr):
        """Remember *addr* as self.addr and bind the socket to it."""
        self.addr = addr
        return self.socket.bind(addr)

    def connect(self, address):
        """Start a non-blocking connect to *address*.

        If connect_ex() reports that the connection is still in progress,
        the method just records the address and returns; completion is
        detected later by the event handlers.  If the socket is already
        connected, handle_connect_event() runs immediately.  Any other
        result raises socket.error(errno, message).
        """
        self.connected = False
        self.connecting = True
        err = self.socket.connect_ex(address)
        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
        or (err == EINVAL and os.name in ('nt', 'ce')):
            self.addr = address
            return
        if err in (0, EISCONN):
            self.addr = address
            self.handle_connect_event()
        else:
            # Fall back to _strerror() for codes missing from errorcode so
            # an unknown errno surfaces as a socket error, not a KeyError.
            message = errorcode[err] if err in errorcode else _strerror(err)
            raise socket.error(err, message)

    def accept(self):
        """Accept a pending connection.

        Returns a (socket, address) pair, or None when nothing could be
        accepted (EWOULDBLOCK, ECONNABORTED, EAGAIN, or a TypeError from
        the underlying accept()).  Other socket errors are re-raised.
        """
        # XXX can return either an address pair or None
        try:
            conn, addr = self.socket.accept()
        except TypeError:
            return None
        except socket.error as why:
            if why.args[0] in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
                return None
            else:
                raise
        else:
            return conn, addr

    def send(self, data):
        """Send *data* and return the number of bytes actually sent.

        Returns 0 on EWOULDBLOCK.  On an errno in _DISCONNECTED the channel
        is closed via handle_close() and 0 is returned.  Other socket
        errors are re-raised.
        """
        try:
            result = self.socket.send(data)
            return result
        except socket.error as why:
            if why.args[0] == EWOULDBLOCK:
                return 0
            elif why.args[0] in _DISCONNECTED:
                self.handle_close()
                return 0
            else:
                raise

    def recv(self, buffer_size):
        """Receive up to *buffer_size* bytes.

        An empty read, or an errno in _DISCONNECTED, triggers
        handle_close() and yields b''.  Other socket errors are re-raised.
        """
        try:
            data = self.socket.recv(buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return b''
            else:
                return data
        except socket.error as why:
            # winsock sometimes throws ENOTCONN
            if why.args[0] in _DISCONNECTED:
                self.handle_close()
                return b''
            else:
                raise

    def close(self):
        """Unregister the channel and close its socket, if it has one.

        ENOTCONN and EBADF from the socket's close() are ignored; other
        socket errors are re-raised.
        """
        self.connected = False
        self.accepting = False
        self.connecting = False
        self.del_channel()
        # A channel built without a socket has self.socket set to None;
        # closing it must not fail.
        if self.socket is not None:
            try:
                self.socket.close()
            except socket.error as why:
                if why.args[0] not in (ENOTCONN, EBADF):
                    raise

    # cheap inheritance, used to pass all other attribute
    # references to the underlying socket object.
    def __getattr__(self, attr):
        """Forward unknown attributes to self.socket, with a deprecation warning.

        Raises AttributeError when neither the dispatcher nor its socket
        has the attribute.
        """
        if attr == 'socket':
            # 'socket' itself is not set on this instance.  Evaluating
            # self.socket below would re-enter __getattr__ without end.
            raise AttributeError("%s instance has no attribute '%s'"
                                 %(self.__class__.__name__, attr))
        try:
            retattr = getattr(self.socket, attr)
        except AttributeError:
            raise AttributeError("%s instance has no attribute '%s'"
                                 %(self.__class__.__name__, attr))
        else:
            msg = "%(me)s.%(attr)s is deprecated; use %(me)s.socket.%(attr)s " \
                  "instead" % {'me' : self.__class__.__name__, 'attr' : attr}
            warnings.warn(msg, DeprecationWarning, stacklevel=2)
            return retattr

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log(self, message):
        """Write 'log: <message>' to sys.stderr."""
        sys.stderr.write('log: %s\n' % str(message))

    def log_info(self, message, type='info'):
        """Print '<type>: <message>' unless *type* is in ignore_log_types."""
        if type not in self.ignore_log_types:
            print('%s: %s' % (type, message))

    def handle_read_event(self):
        """Translate a read-readiness event into the matching callback."""
        if self.accepting:
            # accepting sockets are never connected, they "spawn" new
            # sockets that are connected
            self.handle_accept()
        elif not self.connected:
            if self.connecting:
                self.handle_connect_event()
            self.handle_read()
        else:
            self.handle_read()

    def handle_connect_event(self):
        """Finish a pending connect.

        Raises socket.error when SO_ERROR reports a failure; otherwise
        calls handle_connect() and marks the channel connected.
        """
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            raise socket.error(err, _strerror(err))
        self.handle_connect()
        self.connected = True
        self.connecting = False

    def handle_write_event(self):
        """Translate a write-readiness event into the matching callback."""
        if self.accepting:
            # Accepting sockets shouldn't get a write event.
            # We will pretend it didn't happen.
            return

        if not self.connected:
            if self.connecting:
                self.handle_connect_event()
        self.handle_write()

    def handle_expt_event(self):
        """Handle an exceptional condition reported for the socket."""
        # handle_expt_event() is called if there might be an error on the
        # socket, or if there is OOB data
        # check for the error condition first
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            # we can get here when select.select() says that there is an
            # exceptional condition on the socket
            # since there is an error, we'll go ahead and close the socket
            # like we would in a subclassed handle_read() that received no
            # data
            self.handle_close()
        else:
            self.handle_expt()

    def handle_error(self):
        """Log the exception currently being handled, then close the channel."""
        nil, t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr(self)
        except:
            self_repr = '<__repr__(self) failed for object at %0x>' % id(self)

        self.log_info(
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.handle_close()

    def handle_expt(self):
        """Default callback for priority data: log a warning."""
        self.log_info('unhandled incoming priority event', 'warning')

    def handle_read(self):
        """Default callback for readable data: log a warning."""
        self.log_info('unhandled read event', 'warning')

    def handle_write(self):
        """Default callback for write readiness: log a warning."""
        self.log_info('unhandled write event', 'warning')

    def handle_connect(self):
        """Default callback for a completed connect: log a warning."""
        self.log_info('unhandled connect event', 'warning')

    def handle_accept(self):
        """Accept a connection and pass it to handle_accepted(), if any."""
        pair = self.accept()
        if pair is not None:
            self.handle_accepted(*pair)

    def handle_accepted(self, sock, addr):
        """Default callback for a new connection: close it and log a warning."""
        sock.close()
        self.log_info('unhandled accepted event', 'warning')

    def handle_close(self):
        """Default callback for a closed connection: log a warning and close."""
        self.log_info('unhandled close event', 'warning')
        self.close()
521 522 523 524 525 526

# ---------------------------------------------------------------------------
# adds simple buffered output capability, useful for simple clients.
# [for more sophisticated usage use asynchat.async_chat]
# ---------------------------------------------------------------------------

527 528
class dispatcher_with_send(dispatcher):
    """dispatcher with a simple outgoing byte buffer.

    send() appends to self.out_buffer and tries to flush; whatever is left
    over is sent from handle_write() on later write events.
    """

    def __init__(self, sock=None, map=None):
        dispatcher.__init__(self, sock, map)
        self.out_buffer = b''

    def initiate_send(self):
        """Send up to 512 buffered bytes and drop what was accepted."""
        num_sent = dispatcher.send(self, self.out_buffer[:512])
        self.out_buffer = self.out_buffer[num_sent:]

    def handle_write(self):
        """Flush more of the buffer when the socket becomes writable."""
        self.initiate_send()

    def writable(self):
        """Truthy while not yet connected or while data is still buffered."""
        return (not self.connected) or len(self.out_buffer)

    def send(self, data):
        """Queue *data* for sending and attempt an immediate flush."""
        if self.debug:
            self.log_info('sending %s' % repr(data))
        self.out_buffer = self.out_buffer + data
        self.initiate_send()
549 550 551 552 553

# ---------------------------------------------------------------------------
# used for debugging.
# ---------------------------------------------------------------------------

554
def compact_traceback():
    """Summarise the exception currently being handled.

    Returns ((file, function, line), exc_type, exc_value, info) where the
    first tuple describes the innermost traceback frame and *info* is a
    space-separated string of '[file|function|line]' entries, outermost
    frame first.

    Raises AssertionError when there is no active traceback.
    """
    t, v, tb = sys.exc_info()
    tbinfo = []
    if not tb: # Must have a traceback
        raise AssertionError("traceback does not exist")
    while tb:
        tbinfo.append((
            tb.tb_frame.f_code.co_filename,
            tb.tb_frame.f_code.co_name,
            str(tb.tb_lineno)
            ))
        tb = tb.tb_next

    # just to be safe
    del tb

    file, function, line = tbinfo[-1]
    info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
    return (file, function, line), t, v, info
573

574
def close_all(map=None, ignore_all=False):
    """Close every channel in *map* and then empty the map.

    map        -- mapping of file descriptor -> channel; defaults to the
                  module-level socket_map.
    ignore_all -- when true, errors raised by a channel's close() are
                  suppressed (except those in _reraised_exceptions).

    An OSError with errno EBADF is always ignored.  If an error is
    re-raised, the map is left uncleared.
    """
    if map is None:
        map = socket_map
    # Iterate over a snapshot: close() normally removes the channel from
    # the map.  Distinct names are used for the channel and the caught
    # error so the except clause does not rebind the loop variable.
    for channel in list(map.values()):
        try:
            channel.close()
        except OSError as err:
            if err.args[0] == EBADF:
                pass
            elif not ignore_all:
                raise
        except _reraised_exceptions:
            raise
        except:
            if not ignore_all:
                raise
    map.clear()
591 592 593 594 595

# Asynchronous File I/O:
#
# After a little research (reading man pages on various unixen, and
# digging through the linux kernel), I've determined that select()
596
# isn't meant for doing asynchronous file i/o.
597 598 599 600 601 602 603 604 605
# Heartening, though - reading linux/mm/filemap.c shows that linux
# supports asynchronous read-ahead.  So _MOST_ of the time, the data
# will be sitting in memory for us already when we go to read it.
#
# What other OS's (besides NT) support async file i/o?  [VMS?]
#
# Regardless, this is useful for pipes, and stdin/stdout...

if os.name == 'posix':
    import fcntl

    class file_wrapper:
        """Give a file descriptor the subset of the socket API asyncore uses.

        The descriptor passed in is duplicated with os.dup(); the wrapper
        owns and closes only the duplicate.
        """
        # Here we override just enough to make a file
        # look like a socket for the purposes of asyncore.
        # The passed fd is automatically os.dup()'d

        def __init__(self, fd):
            self.fd = os.dup(fd)

        def recv(self, *args):
            """Read from the descriptor; arguments are passed to os.read()."""
            return os.read(self.fd, *args)

        def send(self, *args):
            """Write to the descriptor; arguments are passed to os.write()."""
            return os.write(self.fd, *args)

        def getsockopt(self, level, optname, buflen=None):
            """Answer the SO_ERROR query asyncore makes; always 0 (no error).

            Any other option raises NotImplementedError.
            """
            if (level == socket.SOL_SOCKET and
                optname == socket.SO_ERROR and
                not buflen):
                return 0
            raise NotImplementedError("Only asyncore specific behaviour "
                                      "implemented.")

        read = recv
        write = send

        def close(self):
            """Close the duplicated descriptor; later calls do nothing."""
            if self.fd < 0:
                return
            # Invalidate first so a repeated close() can never hit a
            # descriptor number that has since been reused.
            fd = self.fd
            self.fd = -1
            os.close(fd)

        def fileno(self):
            """Return the wrapped descriptor, or -1 once closed."""
            return self.fd

639 640
    class file_dispatcher(dispatcher):
        """dispatcher for a plain file descriptor (or an object with fileno())."""

        def __init__(self, fd, map=None):
            """Wrap *fd* and register the channel in *map*.

            fd  -- an integer descriptor, or any object with a fileno()
                   method.
            map -- channel map handed to dispatcher.__init__().
            """
            dispatcher.__init__(self, None, map)
            self.connected = True
            try:
                fd = fd.fileno()
            except AttributeError:
                # Already a plain descriptor.
                pass
            self.set_file(fd)
            # set it to non-blocking mode
            flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
            flags = flags | os.O_NONBLOCK
            fcntl.fcntl(fd, fcntl.F_SETFL, flags)

        def set_file(self, fd):
            """Wrap *fd* in a file_wrapper and add the channel to the map."""
            self.socket = file_wrapper(fd)
            self._fileno = self.socket.fileno()
            self.add_channel()