1# -*- Mode: Python -*-
2#   Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3#   Author: Sam Rushing <[email protected]>
4
5# ======================================================================
6# Copyright 1996 by Sam Rushing
7#
8#                         All Rights Reserved
9#
10# Permission to use, copy, modify, and distribute this software and
11# its documentation for any purpose and without fee is hereby
12# granted, provided that the above copyright notice appear in all
13# copies and that both that copyright notice and this permission
14# notice appear in supporting documentation, and that the name of Sam
15# Rushing not be used in advertising or publicity pertaining to
16# distribution of the software without specific, written prior
17# permission.
18#
19# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26# ======================================================================
27
28"""Basic infrastructure for asynchronous socket service clients and servers.
29
30There are only two ways to have a program on a single processor do "more
31than one thing at a time".  Multi-threaded programming is the simplest and
32most popular way to do it, but there is another very different technique,
33that lets you have nearly all the advantages of multi-threading, without
actually using multiple threads. It's really only practical if your program
35is largely I/O bound. If your program is CPU bound, then pre-emptive
36scheduled threads are probably what you really need. Network servers are
37rarely CPU-bound, however.
38
39If your operating system supports the select() system call in its I/O
40library (and nearly all do), then you can use it to juggle multiple
41communication channels at once; doing other work while your I/O is taking
42place in the "background."  Although this strategy can seem strange and
43complex, especially at first, it is in many ways easier to understand and
44control than multi-threaded programming. The module documented here solves
45many of the difficult problems for you, making the task of building
46sophisticated high-performance network servers and clients a snap.
47"""
48
49import select
50import socket
51import sys
52import time
53import warnings
54
55import os
56from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
57     ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
58     errorcode
59
# Message template handed to warnings._deprecated() below; {name} and
# {remove} are placeholders inside the template.
_DEPRECATION_MSG = ('The {name} module is deprecated and will be removed in '
                    'Python {remove}. The recommended replacement is asyncio')
# Runs at import time, i.e. merely importing this module reports the
# deprecation.
warnings._deprecated(__name__, _DEPRECATION_MSG, remove=(3, 12))


# errno values that dispatcher.send()/recv() and readwrite() answer with
# handle_close() (the connection is considered gone) rather than
# treating them as an error.
_DISCONNECTED = frozenset({ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
                           EBADF})

# Default channel map (file descriptor -> dispatcher) used whenever a
# function or dispatcher is not given an explicit map.  An already bound
# socket_map is kept as-is; an empty dict is only created when the name
# does not exist yet (presumably to survive re-execution of the module --
# confirm).
try:
    socket_map
except NameError:
    socket_map = {}
72
def _strerror(err):
    """Return a human-readable description of the errno value *err*.

    os.strerror() is tried first.  When it raises ValueError,
    OverflowError or NameError, the symbolic name from errno.errorcode
    is returned if *err* is known there, otherwise a generic
    "Unknown error N" string.
    """
    try:
        description = os.strerror(err)
    except (ValueError, OverflowError, NameError):
        # Fall back to the symbolic errno name, then to a generic text.
        return errorcode[err] if err in errorcode else "Unknown error %s" % err
    return description
80
class ExitNow(Exception):
    """Exception that the event helpers of this module always re-raise.

    It is listed in _reraised_exceptions, so read(), write(),
    _exception(), readwrite() and close_all() let it propagate instead
    of routing it to a channel's handle_error().
    """


# Exceptions that must propagate to the caller of the polling helpers
# rather than being reported through handle_error().
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
85
def read(obj):
    """Deliver a read event to *obj*.

    Calls obj.handle_read_event().  Exceptions listed in
    _reraised_exceptions propagate unchanged; any other exception is
    reported through obj.handle_error() while it is still being handled.
    """
    try:
        obj.handle_read_event()
    except BaseException as exc:
        if isinstance(exc, _reraised_exceptions):
            raise
        obj.handle_error()
93
def write(obj):
    """Deliver a write event to *obj*.

    Calls obj.handle_write_event().  Exceptions listed in
    _reraised_exceptions propagate unchanged; any other exception is
    reported through obj.handle_error() while it is still being handled.
    """
    try:
        obj.handle_write_event()
    except BaseException as exc:
        if isinstance(exc, _reraised_exceptions):
            raise
        obj.handle_error()
101
def _exception(obj):
    """Deliver an exceptional-condition event to *obj*.

    Calls obj.handle_expt_event().  Exceptions listed in
    _reraised_exceptions propagate unchanged; any other exception is
    reported through obj.handle_error() while it is still being handled.
    """
    try:
        obj.handle_expt_event()
    except BaseException as exc:
        if isinstance(exc, _reraised_exceptions):
            raise
        obj.handle_error()
109
def readwrite(obj, flags):
    """Translate a poll() event mask into handler calls on *obj*.

    *flags* is tested, in this order, for POLLIN (handle_read_event),
    POLLOUT (handle_write_event), POLLPRI (handle_expt_event) and
    POLLHUP/POLLERR/POLLNVAL (handle_close).

    An OSError raised by a handler closes the channel via handle_close()
    when its errno is in _DISCONNECTED and is reported via handle_error()
    otherwise.  Exceptions in _reraised_exceptions propagate; everything
    else goes to handle_error().
    """
    try:
        if flags & select.POLLIN:
            obj.handle_read_event()
        if flags & select.POLLOUT:
            obj.handle_write_event()
        if flags & select.POLLPRI:
            obj.handle_expt_event()
        if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
            obj.handle_close()
    except OSError as err:
        # A vanished peer is a normal close, anything else is an error.
        if err.errno in _DISCONNECTED:
            obj.handle_close()
        else:
            obj.handle_error()
    except BaseException as exc:
        if isinstance(exc, _reraised_exceptions):
            raise
        obj.handle_error()
129
def poll(timeout=0.0, map=None):
    """Run one select()-based polling pass over the channels in *map*.

    *map* defaults to the module-wide socket_map; an empty map makes the
    call a no-op.  Each channel is asked readable()/writable(); accepting
    channels are never put in the write set.  Every channel that is
    readable or writable is also watched for exceptional conditions.

    When no descriptor ends up being watched the function just sleeps
    for *timeout* seconds.  Otherwise select.select() is called and the
    ready descriptors are dispatched through read(), write() and
    _exception(), in that order.  Descriptors that have left the map in
    the meantime are skipped.
    """
    if map is None:
        map = socket_map
    if not map:
        return

    readers, writers, watched = [], [], []
    for fd, channel in list(map.items()):
        wants_read = channel.readable()
        wants_write = channel.writable()
        if wants_read:
            readers.append(fd)
        # accepting sockets should not be writable
        if wants_write and not channel.accepting:
            writers.append(fd)
        if wants_read or wants_write:
            watched.append(fd)

    if not (readers or writers or watched):
        # Nothing to wait on: honour the timeout without calling select().
        time.sleep(timeout)
        return

    ready_r, ready_w, ready_x = select.select(readers, writers, watched,
                                              timeout)

    for fd in ready_r:
        channel = map.get(fd)
        if channel is not None:
            read(channel)

    for fd in ready_w:
        channel = map.get(fd)
        if channel is not None:
            write(channel)

    for fd in ready_x:
        channel = map.get(fd)
        if channel is not None:
            _exception(channel)
168
def poll2(timeout=0.0, map=None):
    """Run one polling pass using select.poll() instead of select.select().

    *timeout* is given in seconds and converted to whole milliseconds
    for the poll object; None is passed through unchanged.  *map*
    defaults to the module-wide socket_map.

    Readable channels are registered for POLLIN | POLLPRI, writable
    non-accepting channels for POLLOUT.  Every reported (fd, flags) pair
    whose descriptor is still in the map is handed to readwrite().
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout*1000)
    pollster = select.poll()
    if not map:
        return

    for fd, channel in list(map.items()):
        mask = 0
        if channel.readable():
            mask |= select.POLLIN | select.POLLPRI
        # accepting sockets should not be writable
        if channel.writable() and not channel.accepting:
            mask |= select.POLLOUT
        if mask:
            pollster.register(fd, mask)

    for fd, events in pollster.poll(timeout):
        channel = map.get(fd)
        if channel is not None:
            readwrite(channel, events)

poll3 = poll2                           # Alias for backward compatibility
196
def loop(timeout=30.0, use_poll=False, map=None, count=None):
    """Keep polling until the channel map is empty or *count* passes ran.

    timeout  -- forwarded to every polling pass.
    use_poll -- prefer poll2() over poll() when select.poll is available.
    map      -- channel map to serve; defaults to the module-wide
                socket_map.
    count    -- maximum number of passes; None means "until the map is
                empty".
    """
    if map is None:
        map = socket_map

    poll_fun = poll2 if (use_poll and hasattr(select, 'poll')) else poll

    if count is None:
        while map:
            poll_fun(timeout, map)
        return

    while map and count > 0:
        poll_fun(timeout, map)
        count = count - 1
214
class dispatcher:
    """Event-driven wrapper around a single non-blocking socket.

    An instance registers itself in a channel map (file descriptor ->
    dispatcher) so that the polling functions of this module can query
    readable()/writable() and deliver events through the
    handle_*_event() methods.  Subclasses override the handle_*() hooks;
    the default implementations only log an 'unhandled ...' warning.
    """

    # Read by dispatcher_with_send.send() to decide whether to log the
    # outgoing data.
    debug = False
    # State flags.  listen() sets accepting, connect() sets connecting,
    # handle_connect_event() sets connected, close() resets all three.
    # closing is never modified by this class.
    connected = False
    accepting = False
    connecting = False
    closing = False
    # Address recorded by bind(), connect() or __init__ (peer name).
    addr = None
    # log_info() drops messages whose type is listed here.
    ignore_log_types = frozenset({'warning'})

    def __init__(self, sock=None, map=None):
        """Create a dispatcher, optionally adopting an existing socket.

        sock -- socket to wrap; it is switched to non-blocking mode and
                registered.  Without a socket, self.socket is None until
                create_socket()/set_socket() is called.
        map  -- channel map to register in; defaults to the module-wide
                socket_map.

        Raises OSError when sock.getpeername() fails for a reason other
        than ENOTCONN/EINVAL; the channel is removed from the map first.
        """
        if map is None:
            self._map = socket_map
        else:
            self._map = map

        self._fileno = None

        if sock:
            # Set to nonblocking just to make sure for cases where we
            # get a socket from a blocking source.
            sock.setblocking(False)
            self.set_socket(sock, map)
            self.connected = True
            # The constructor no longer requires that the socket
            # passed be connected.
            try:
                self.addr = sock.getpeername()
            except OSError as err:
                if err.errno in (ENOTCONN, EINVAL):
                    # To handle the case where we got an unconnected
                    # socket.
                    self.connected = False
                else:
                    # The socket is broken in some unknown way, alert
                    # the user and remove it from the map (to prevent
                    # polling of broken sockets).
                    self.del_channel(map)
                    raise
        else:
            self.socket = None

    def __repr__(self):
        """Return '<module.Class [listening|connected] [addr] at 0x...>'."""
        status = [self.__class__.__module__+"."+self.__class__.__qualname__]
        if self.accepting and self.addr:
            status.append('listening')
        elif self.connected:
            status.append('connected')
        if self.addr is not None:
            try:
                status.append('%s:%d' % self.addr)
            except TypeError:
                # addr is not a (host, port) pair; show it verbatim.
                status.append(repr(self.addr))
        return '<%s at %#x>' % (' '.join(status), id(self))

    def add_channel(self, map=None):
        """Register this dispatcher under its file descriptor in *map*.

        *map* defaults to the map chosen at construction time.
        """
        #self.log_info('adding channel %s' % self)
        if map is None:
            map = self._map
        map[self._fileno] = self

    def del_channel(self, map=None):
        """Remove this dispatcher from *map* and forget its descriptor.

        *map* defaults to the map chosen at construction time.  A missing
        entry is not an error.
        """
        fd = self._fileno
        if map is None:
            map = self._map
        if fd in map:
            #self.log_info('closing channel %d:%s' % (fd, self))
            del map[fd]
        self._fileno = None

    def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
        """Create a new non-blocking socket and adopt it as the channel."""
        self.family_and_type = family, type
        sock = socket.socket(family, type)
        sock.setblocking(False)
        self.set_socket(sock)

    def set_socket(self, sock, map=None):
        """Adopt *sock* as this channel's socket and register it in *map*."""
        self.socket = sock
        self._fileno = sock.fileno()
        self.add_channel(map)

    def set_reuse_addr(self):
        """Turn on SO_REUSEADDR for the socket; OSError is ignored."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR) | 1
                )
        except OSError:
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable(self):
        """Return True if the channel wants read events (default: always)."""
        return True

    def writable(self):
        """Return True if the channel wants write events (default: always)."""
        return True

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen(self, num):
        """Mark the channel as accepting and call socket.listen().

        On Windows (os.name == 'nt') the backlog *num* is capped at 5.
        """
        self.accepting = True
        if os.name == 'nt' and num > 5:
            num = 5
        return self.socket.listen(num)

    def bind(self, addr):
        """Record *addr* and bind the socket to it."""
        self.addr = addr
        return self.socket.bind(addr)

    def connect(self, address):
        """Start a non-blocking connect to *address*.

        If connect_ex() reports that the connect is still in progress
        the method returns immediately and the connection is completed
        later by handle_connect_event().  If the socket is already
        connected (0 or EISCONN), handle_connect_event() runs right away.

        Raises OSError(err, message) for every other result.  The message
        is the symbolic errno name when the code is known to
        errno.errorcode, otherwise "Unknown error N".
        """
        self.connected = False
        self.connecting = True
        err = self.socket.connect_ex(address)
        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
        or err == EINVAL and os.name == 'nt':
            self.addr = address
            return
        if err in (0, EISCONN):
            self.addr = address
            self.handle_connect_event()
        else:
            # errorcode does not necessarily know every value that
            # connect_ex() can return; fall back to a generic message
            # instead of masking the real failure with a KeyError.
            raise OSError(err, errorcode.get(err, "Unknown error %s" % err))

    def accept(self):
        """Accept a pending connection.

        Returns a (conn, addr) pair, or None when socket.accept() raises
        TypeError or an OSError with errno EWOULDBLOCK, ECONNABORTED or
        EAGAIN.  Other OSErrors propagate.
        """
        # XXX can return either an address pair or None
        try:
            conn, addr = self.socket.accept()
        except TypeError:
            return None
        except OSError as why:
            if why.errno in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
                return None
            else:
                raise
        else:
            return conn, addr

    def send(self, data):
        """Send *data* and return the number of bytes written.

        Returns 0 when the socket would block.  When the errno is in
        _DISCONNECTED, handle_close() is called and 0 is returned.
        Other OSErrors propagate.
        """
        try:
            result = self.socket.send(data)
            return result
        except OSError as why:
            if why.errno == EWOULDBLOCK:
                return 0
            elif why.errno in _DISCONNECTED:
                self.handle_close()
                return 0
            else:
                raise

    def recv(self, buffer_size):
        """Receive up to *buffer_size* bytes.

        Returns b'' after calling handle_close() when the peer closed the
        connection (empty read) or the errno is in _DISCONNECTED.  Other
        OSErrors propagate.
        """
        try:
            data = self.socket.recv(buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return b''
            else:
                return data
        except OSError as why:
            # winsock sometimes raises ENOTCONN
            if why.errno in _DISCONNECTED:
                self.handle_close()
                return b''
            else:
                raise

    def close(self):
        """Reset the state flags, leave the channel map, close the socket.

        ENOTCONN and EBADF raised by socket.close() are ignored.
        """
        self.connected = False
        self.accepting = False
        self.connecting = False
        self.del_channel()
        if self.socket is not None:
            try:
                self.socket.close()
            except OSError as why:
                if why.errno not in (ENOTCONN, EBADF):
                    raise

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log(self, message):
        """Write 'log: <message>' to sys.stderr."""
        sys.stderr.write('log: %s\n' % str(message))

    def log_info(self, message, type='info'):
        """Print '<type>: <message>' unless *type* is in ignore_log_types."""
        if type not in self.ignore_log_types:
            print('%s: %s' % (type, message))

    def handle_read_event(self):
        """Turn a low-level read event into handle_accept()/handle_read().

        A channel that is still connecting first completes the connect
        via handle_connect_event().
        """
        if self.accepting:
            # accepting sockets are never connected, they "spawn" new
            # sockets that are connected
            self.handle_accept()
        elif not self.connected:
            if self.connecting:
                self.handle_connect_event()
            self.handle_read()
        else:
            self.handle_read()

    def handle_connect_event(self):
        """Finish a pending connect.

        Raises OSError when SO_ERROR reports a failure; otherwise calls
        handle_connect() and flips the state flags to 'connected'.
        """
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            raise OSError(err, _strerror(err))
        self.handle_connect()
        self.connected = True
        self.connecting = False

    def handle_write_event(self):
        """Turn a low-level write event into handle_write().

        Accepting channels ignore the event; a connecting channel first
        completes the connect via handle_connect_event().
        """
        if self.accepting:
            # Accepting sockets shouldn't get a write event.
            # We will pretend it didn't happen.
            return

        if not self.connected:
            if self.connecting:
                self.handle_connect_event()
        self.handle_write()

    def handle_expt_event(self):
        """Handle an exceptional condition: close on error, else handle_expt()."""
        # handle_expt_event() is called if there might be an error on the
        # socket, or if there is OOB data
        # check for the error condition first
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            # we can get here when select.select() says that there is an
            # exceptional condition on the socket
            # since there is an error, we'll go ahead and close the socket
            # like we would in a subclassed handle_read() that received no
            # data
            self.handle_close()
        else:
            self.handle_expt()

    def handle_error(self):
        """Log the exception currently being handled and close the channel.

        Must be called while an exception is being handled, because
        compact_traceback() needs the active traceback.
        """
        nil, t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr(self)
        except:
            self_repr = '<__repr__(self) failed for object at %0x>' % id(self)

        self.log_info(
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.handle_close()

    def handle_expt(self):
        """Hook for exceptional conditions; the default only logs a warning."""
        self.log_info('unhandled incoming priority event', 'warning')

    def handle_read(self):
        """Hook for readable data; the default only logs a warning."""
        self.log_info('unhandled read event', 'warning')

    def handle_write(self):
        """Hook for write readiness; the default only logs a warning."""
        self.log_info('unhandled write event', 'warning')

    def handle_connect(self):
        """Hook for a completed connect; the default only logs a warning."""
        self.log_info('unhandled connect event', 'warning')

    def handle_accept(self):
        """Accept a pending connection and pass it to handle_accepted()."""
        pair = self.accept()
        if pair is not None:
            self.handle_accepted(*pair)

    def handle_accepted(self, sock, addr):
        """Hook for a new connection; the default closes it and logs a warning."""
        sock.close()
        self.log_info('unhandled accepted event', 'warning')

    def handle_close(self):
        """Hook for a closed connection; the default logs a warning and closes."""
        self.log_info('unhandled close event', 'warning')
        self.close()
505
506# ---------------------------------------------------------------------------
507# adds simple buffered output capability, useful for simple clients.
508# [for more sophisticated usage use asynchat.async_chat]
509# ---------------------------------------------------------------------------
510
class dispatcher_with_send(dispatcher):
    """dispatcher variant that queues outgoing data in self.out_buffer."""

    def __init__(self, sock=None, map=None):
        """Initialise the base dispatcher and start with an empty buffer."""
        dispatcher.__init__(self, sock, map)
        self.out_buffer = b''

    def initiate_send(self):
        """Push at most 65536 buffered bytes out and drop what was sent."""
        sent = dispatcher.send(self, self.out_buffer[:65536])
        self.out_buffer = self.out_buffer[sent:]

    def handle_write(self):
        """Flush buffered data whenever the socket becomes writable."""
        self.initiate_send()

    def writable(self):
        """Return a true value while unconnected or while data is queued.

        The result is True when not connected, otherwise the length of
        the output buffer.
        """
        if not self.connected:
            return True
        return len(self.out_buffer)

    def send(self, data):
        """Append *data* to the output buffer and try to send right away."""
        if self.debug:
            self.log_info('sending %s' % repr(data))
        self.out_buffer = self.out_buffer + data
        self.initiate_send()
533
534# ---------------------------------------------------------------------------
535# used for debugging.
536# ---------------------------------------------------------------------------
537
def compact_traceback():
    """Summarise the exception currently being handled.

    Returns ((file, function, line), exc_type, exc_value, info) where the
    first item describes the last frame of the traceback and *info* is a
    space-separated list of '[file|function|line]' entries, one per
    traceback level.  Line numbers are strings.

    Raises AssertionError when no traceback is available.
    """
    exc_type, exc_value, tb = sys.exc_info()
    if not tb: # Must have a traceback
        raise AssertionError("traceback does not exist")

    frames = []
    while tb:
        code = tb.tb_frame.f_code
        frames.append((code.co_filename, code.co_name, str(tb.tb_lineno)))
        tb = tb.tb_next

    # just to be safe
    del tb

    info = ' '.join('[%s|%s|%s]' % entry for entry in frames)
    return frames[-1], exc_type, exc_value, info
557
def close_all(map=None, ignore_all=False):
    """Close every channel in *map* and then empty the map.

    map        -- channel map to close; defaults to the module-wide
                  socket_map.
    ignore_all -- when true, errors raised by close() are swallowed,
                  except for the exceptions in _reraised_exceptions,
                  which always propagate.

    An OSError with errno EBADF is always ignored.  If an exception
    propagates, the map is left uncleared.
    """
    channels = socket_map if map is None else map
    for channel in list(channels.values()):
        try:
            channel.close()
        except OSError as err:
            if err.errno != EBADF and not ignore_all:
                raise
        except _reraised_exceptions:
            raise
        except:
            if not ignore_all:
                raise
    channels.clear()
575
576# Asynchronous File I/O:
577#
578# After a little research (reading man pages on various unixen, and
579# digging through the linux kernel), I've determined that select()
580# isn't meant for doing asynchronous file i/o.
581# Heartening, though - reading linux/mm/filemap.c shows that linux
582# supports asynchronous read-ahead.  So _MOST_ of the time, the data
583# will be sitting in memory for us already when we go to read it.
584#
585# What other OS's (besides NT) support async file i/o?  [VMS?]
586#
587# Regardless, this is useful for pipes, and stdin/stdout...
588
589if os.name == 'posix':
590    class file_wrapper:
591        # Here we override just enough to make a file
592        # look like a socket for the purposes of asyncore.
593        # The passed fd is automatically os.dup()'d
594
595        def __init__(self, fd):
596            self.fd = os.dup(fd)
597
598        def __del__(self):
599            if self.fd >= 0:
600                warnings.warn("unclosed file %r" % self, ResourceWarning,
601                              source=self)
602            self.close()
603
604        def recv(self, *args):
605            return os.read(self.fd, *args)
606
607        def send(self, *args):
608            return os.write(self.fd, *args)
609
610        def getsockopt(self, level, optname, buflen=None):
611            if (level == socket.SOL_SOCKET and
612                optname == socket.SO_ERROR and
613                not buflen):
614                return 0
615            raise NotImplementedError("Only asyncore specific behaviour "
616                                      "implemented.")
617
618        read = recv
619        write = send
620
621        def close(self):
622            if self.fd < 0:
623                return
624            fd = self.fd
625            self.fd = -1
626            os.close(fd)
627
628        def fileno(self):
629            return self.fd
630
631    class file_dispatcher(dispatcher):
632
633        def __init__(self, fd, map=None):
634            dispatcher.__init__(self, None, map)
635            self.connected = True
636            try:
637                fd = fd.fileno()
638            except AttributeError:
639                pass
640            self.set_file(fd)
641            # set it to non-blocking mode
642            os.set_blocking(fd, False)
643
644        def set_file(self, fd):
645            self.socket = file_wrapper(fd)
646            self._fileno = self.socket.fileno()
647            self.add_channel()
648