1# -*- Mode: Python -*- 2# Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp 3# Author: Sam Rushing <[email protected]> 4 5# ====================================================================== 6# Copyright 1996 by Sam Rushing 7# 8# All Rights Reserved 9# 10# Permission to use, copy, modify, and distribute this software and 11# its documentation for any purpose and without fee is hereby 12# granted, provided that the above copyright notice appear in all 13# copies and that both that copyright notice and this permission 14# notice appear in supporting documentation, and that the name of Sam 15# Rushing not be used in advertising or publicity pertaining to 16# distribution of the software without specific, written prior 17# permission. 18# 19# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, 20# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN 21# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR 22# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS 23# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, 24# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN 25# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 26# ====================================================================== 27 28"""Basic infrastructure for asynchronous socket service clients and servers. 29 30There are only two ways to have a program on a single processor do "more 31than one thing at a time". Multi-threaded programming is the simplest and 32most popular way to do it, but there is another very different technique, 33that lets you have nearly all the advantages of multi-threading, without 34actually using multiple threads. it's really only practical if your program 35is largely I/O bound. If your program is CPU bound, then pre-emptive 36scheduled threads are probably what you really need. Network servers are 37rarely CPU-bound, however. 38 39If your operating system supports the select() system call in its I/O 40library (and nearly all do), then you can use it to juggle multiple 41communication channels at once; doing other work while your I/O is taking 42place in the "background." Although this strategy can seem strange and 43complex, especially at first, it is in many ways easier to understand and 44control than multi-threaded programming. The module documented here solves 45many of the difficult problems for you, making the task of building 46sophisticated high-performance network servers and clients a snap. 47""" 48 49import select 50import socket 51import sys 52import time 53import warnings 54 55import os 56from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \ 57 ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \ 58 errorcode 59 60_DEPRECATION_MSG = ('The {name} module is deprecated and will be removed in ' 61 'Python {remove}. The recommended replacement is asyncio') 62warnings._deprecated(__name__, _DEPRECATION_MSG, remove=(3, 12)) 63 64 65_DISCONNECTED = frozenset({ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE, 66 EBADF}) 67 68try: 69 socket_map 70except NameError: 71 socket_map = {} 72 73def _strerror(err): 74 try: 75 return os.strerror(err) 76 except (ValueError, OverflowError, NameError): 77 if err in errorcode: 78 return errorcode[err] 79 return "Unknown error %s" %err 80 81class ExitNow(Exception): 82 pass 83 84_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit) 85 86def read(obj): 87 try: 88 obj.handle_read_event() 89 except _reraised_exceptions: 90 raise 91 except: 92 obj.handle_error() 93 94def write(obj): 95 try: 96 obj.handle_write_event() 97 except _reraised_exceptions: 98 raise 99 except: 100 obj.handle_error() 101 102def _exception(obj): 103 try: 104 obj.handle_expt_event() 105 except _reraised_exceptions: 106 raise 107 except: 108 obj.handle_error() 109 110def readwrite(obj, flags): 111 try: 112 if flags & select.POLLIN: 113 obj.handle_read_event() 114 if flags & select.POLLOUT: 115 obj.handle_write_event() 116 if flags & select.POLLPRI: 117 obj.handle_expt_event() 118 if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL): 119 obj.handle_close() 120 except OSError as e: 121 if e.errno not in _DISCONNECTED: 122 obj.handle_error() 123 else: 124 obj.handle_close() 125 except _reraised_exceptions: 126 raise 127 except: 128 obj.handle_error() 129 130def poll(timeout=0.0, map=None): 131 if map is None: 132 map = socket_map 133 if map: 134 r = []; w = []; e = [] 135 for fd, obj in list(map.items()): 136 is_r = obj.readable() 137 is_w = obj.writable() 138 if is_r: 139 r.append(fd) 140 # accepting sockets should not be writable 141 if is_w and not obj.accepting: 142 w.append(fd) 143 if is_r or is_w: 144 e.append(fd) 145 if [] == r == w == e: 146 time.sleep(timeout) 147 return 148 149 r, w, e = select.select(r, w, e, timeout) 150 151 for fd in r: 152 obj = map.get(fd) 153 if obj is None: 154 continue 155 read(obj) 156 157 for fd in w: 158 obj = map.get(fd) 159 if obj is None: 160 continue 161 write(obj) 162 163 for fd in e: 164 obj = map.get(fd) 165 if obj is None: 166 continue 167 _exception(obj) 168 169def poll2(timeout=0.0, map=None): 170 # Use the poll() support added to the select module in Python 2.0 171 if map is None: 172 map = socket_map 173 if timeout is not None: 174 # timeout is in milliseconds 175 timeout = int(timeout*1000) 176 pollster = select.poll() 177 if map: 178 for fd, obj in list(map.items()): 179 flags = 0 180 if obj.readable(): 181 flags |= select.POLLIN | select.POLLPRI 182 # accepting sockets should not be writable 183 if obj.writable() and not obj.accepting: 184 flags |= select.POLLOUT 185 if flags: 186 pollster.register(fd, flags) 187 188 r = pollster.poll(timeout) 189 for fd, flags in r: 190 obj = map.get(fd) 191 if obj is None: 192 continue 193 readwrite(obj, flags) 194 195poll3 = poll2 # Alias for backward compatibility 196 197def loop(timeout=30.0, use_poll=False, map=None, count=None): 198 if map is None: 199 map = socket_map 200 201 if use_poll and hasattr(select, 'poll'): 202 poll_fun = poll2 203 else: 204 poll_fun = poll 205 206 if count is None: 207 while map: 208 poll_fun(timeout, map) 209 210 else: 211 while map and count > 0: 212 poll_fun(timeout, map) 213 count = count - 1 214 215class dispatcher: 216 217 debug = False 218 connected = False 219 accepting = False 220 connecting = False 221 closing = False 222 addr = None 223 ignore_log_types = frozenset({'warning'}) 224 225 def __init__(self, sock=None, map=None): 226 if map is None: 227 self._map = socket_map 228 else: 229 self._map = map 230 231 self._fileno = None 232 233 if sock: 234 # Set to nonblocking just to make sure for cases where we 235 # get a socket from a blocking source. 236 sock.setblocking(False) 237 self.set_socket(sock, map) 238 self.connected = True 239 # The constructor no longer requires that the socket 240 # passed be connected. 241 try: 242 self.addr = sock.getpeername() 243 except OSError as err: 244 if err.errno in (ENOTCONN, EINVAL): 245 # To handle the case where we got an unconnected 246 # socket. 247 self.connected = False 248 else: 249 # The socket is broken in some unknown way, alert 250 # the user and remove it from the map (to prevent 251 # polling of broken sockets). 252 self.del_channel(map) 253 raise 254 else: 255 self.socket = None 256 257 def __repr__(self): 258 status = [self.__class__.__module__+"."+self.__class__.__qualname__] 259 if self.accepting and self.addr: 260 status.append('listening') 261 elif self.connected: 262 status.append('connected') 263 if self.addr is not None: 264 try: 265 status.append('%s:%d' % self.addr) 266 except TypeError: 267 status.append(repr(self.addr)) 268 return '<%s at %#x>' % (' '.join(status), id(self)) 269 270 def add_channel(self, map=None): 271 #self.log_info('adding channel %s' % self) 272 if map is None: 273 map = self._map 274 map[self._fileno] = self 275 276 def del_channel(self, map=None): 277 fd = self._fileno 278 if map is None: 279 map = self._map 280 if fd in map: 281 #self.log_info('closing channel %d:%s' % (fd, self)) 282 del map[fd] 283 self._fileno = None 284 285 def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM): 286 self.family_and_type = family, type 287 sock = socket.socket(family, type) 288 sock.setblocking(False) 289 self.set_socket(sock) 290 291 def set_socket(self, sock, map=None): 292 self.socket = sock 293 self._fileno = sock.fileno() 294 self.add_channel(map) 295 296 def set_reuse_addr(self): 297 # try to re-use a server port if possible 298 try: 299 self.socket.setsockopt( 300 socket.SOL_SOCKET, socket.SO_REUSEADDR, 301 self.socket.getsockopt(socket.SOL_SOCKET, 302 socket.SO_REUSEADDR) | 1 303 ) 304 except OSError: 305 pass 306 307 # ================================================== 308 # predicates for select() 309 # these are used as filters for the lists of sockets 310 # to pass to select(). 311 # ================================================== 312 313 def readable(self): 314 return True 315 316 def writable(self): 317 return True 318 319 # ================================================== 320 # socket object methods. 321 # ================================================== 322 323 def listen(self, num): 324 self.accepting = True 325 if os.name == 'nt' and num > 5: 326 num = 5 327 return self.socket.listen(num) 328 329 def bind(self, addr): 330 self.addr = addr 331 return self.socket.bind(addr) 332 333 def connect(self, address): 334 self.connected = False 335 self.connecting = True 336 err = self.socket.connect_ex(address) 337 if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \ 338 or err == EINVAL and os.name == 'nt': 339 self.addr = address 340 return 341 if err in (0, EISCONN): 342 self.addr = address 343 self.handle_connect_event() 344 else: 345 raise OSError(err, errorcode[err]) 346 347 def accept(self): 348 # XXX can return either an address pair or None 349 try: 350 conn, addr = self.socket.accept() 351 except TypeError: 352 return None 353 except OSError as why: 354 if why.errno in (EWOULDBLOCK, ECONNABORTED, EAGAIN): 355 return None 356 else: 357 raise 358 else: 359 return conn, addr 360 361 def send(self, data): 362 try: 363 result = self.socket.send(data) 364 return result 365 except OSError as why: 366 if why.errno == EWOULDBLOCK: 367 return 0 368 elif why.errno in _DISCONNECTED: 369 self.handle_close() 370 return 0 371 else: 372 raise 373 374 def recv(self, buffer_size): 375 try: 376 data = self.socket.recv(buffer_size) 377 if not data: 378 # a closed connection is indicated by signaling 379 # a read condition, and having recv() return 0. 380 self.handle_close() 381 return b'' 382 else: 383 return data 384 except OSError as why: 385 # winsock sometimes raises ENOTCONN 386 if why.errno in _DISCONNECTED: 387 self.handle_close() 388 return b'' 389 else: 390 raise 391 392 def close(self): 393 self.connected = False 394 self.accepting = False 395 self.connecting = False 396 self.del_channel() 397 if self.socket is not None: 398 try: 399 self.socket.close() 400 except OSError as why: 401 if why.errno not in (ENOTCONN, EBADF): 402 raise 403 404 # log and log_info may be overridden to provide more sophisticated 405 # logging and warning methods. In general, log is for 'hit' logging 406 # and 'log_info' is for informational, warning and error logging. 407 408 def log(self, message): 409 sys.stderr.write('log: %s\n' % str(message)) 410 411 def log_info(self, message, type='info'): 412 if type not in self.ignore_log_types: 413 print('%s: %s' % (type, message)) 414 415 def handle_read_event(self): 416 if self.accepting: 417 # accepting sockets are never connected, they "spawn" new 418 # sockets that are connected 419 self.handle_accept() 420 elif not self.connected: 421 if self.connecting: 422 self.handle_connect_event() 423 self.handle_read() 424 else: 425 self.handle_read() 426 427 def handle_connect_event(self): 428 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) 429 if err != 0: 430 raise OSError(err, _strerror(err)) 431 self.handle_connect() 432 self.connected = True 433 self.connecting = False 434 435 def handle_write_event(self): 436 if self.accepting: 437 # Accepting sockets shouldn't get a write event. 438 # We will pretend it didn't happen. 439 return 440 441 if not self.connected: 442 if self.connecting: 443 self.handle_connect_event() 444 self.handle_write() 445 446 def handle_expt_event(self): 447 # handle_expt_event() is called if there might be an error on the 448 # socket, or if there is OOB data 449 # check for the error condition first 450 err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) 451 if err != 0: 452 # we can get here when select.select() says that there is an 453 # exceptional condition on the socket 454 # since there is an error, we'll go ahead and close the socket 455 # like we would in a subclassed handle_read() that received no 456 # data 457 self.handle_close() 458 else: 459 self.handle_expt() 460 461 def handle_error(self): 462 nil, t, v, tbinfo = compact_traceback() 463 464 # sometimes a user repr method will crash. 465 try: 466 self_repr = repr(self) 467 except: 468 self_repr = '<__repr__(self) failed for object at %0x>' % id(self) 469 470 self.log_info( 471 'uncaptured python exception, closing channel %s (%s:%s %s)' % ( 472 self_repr, 473 t, 474 v, 475 tbinfo 476 ), 477 'error' 478 ) 479 self.handle_close() 480 481 def handle_expt(self): 482 self.log_info('unhandled incoming priority event', 'warning') 483 484 def handle_read(self): 485 self.log_info('unhandled read event', 'warning') 486 487 def handle_write(self): 488 self.log_info('unhandled write event', 'warning') 489 490 def handle_connect(self): 491 self.log_info('unhandled connect event', 'warning') 492 493 def handle_accept(self): 494 pair = self.accept() 495 if pair is not None: 496 self.handle_accepted(*pair) 497 498 def handle_accepted(self, sock, addr): 499 sock.close() 500 self.log_info('unhandled accepted event', 'warning') 501 502 def handle_close(self): 503 self.log_info('unhandled close event', 'warning') 504 self.close() 505 506# --------------------------------------------------------------------------- 507# adds simple buffered output capability, useful for simple clients. 508# [for more sophisticated usage use asynchat.async_chat] 509# --------------------------------------------------------------------------- 510 511class dispatcher_with_send(dispatcher): 512 513 def __init__(self, sock=None, map=None): 514 dispatcher.__init__(self, sock, map) 515 self.out_buffer = b'' 516 517 def initiate_send(self): 518 num_sent = 0 519 num_sent = dispatcher.send(self, self.out_buffer[:65536]) 520 self.out_buffer = self.out_buffer[num_sent:] 521 522 def handle_write(self): 523 self.initiate_send() 524 525 def writable(self): 526 return (not self.connected) or len(self.out_buffer) 527 528 def send(self, data): 529 if self.debug: 530 self.log_info('sending %s' % repr(data)) 531 self.out_buffer = self.out_buffer + data 532 self.initiate_send() 533 534# --------------------------------------------------------------------------- 535# used for debugging. 536# --------------------------------------------------------------------------- 537 538def compact_traceback(): 539 t, v, tb = sys.exc_info() 540 tbinfo = [] 541 if not tb: # Must have a traceback 542 raise AssertionError("traceback does not exist") 543 while tb: 544 tbinfo.append(( 545 tb.tb_frame.f_code.co_filename, 546 tb.tb_frame.f_code.co_name, 547 str(tb.tb_lineno) 548 )) 549 tb = tb.tb_next 550 551 # just to be safe 552 del tb 553 554 file, function, line = tbinfo[-1] 555 info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo]) 556 return (file, function, line), t, v, info 557 558def close_all(map=None, ignore_all=False): 559 if map is None: 560 map = socket_map 561 for x in list(map.values()): 562 try: 563 x.close() 564 except OSError as x: 565 if x.errno == EBADF: 566 pass 567 elif not ignore_all: 568 raise 569 except _reraised_exceptions: 570 raise 571 except: 572 if not ignore_all: 573 raise 574 map.clear() 575 576# Asynchronous File I/O: 577# 578# After a little research (reading man pages on various unixen, and 579# digging through the linux kernel), I've determined that select() 580# isn't meant for doing asynchronous file i/o. 581# Heartening, though - reading linux/mm/filemap.c shows that linux 582# supports asynchronous read-ahead. So _MOST_ of the time, the data 583# will be sitting in memory for us already when we go to read it. 584# 585# What other OS's (besides NT) support async file i/o? [VMS?] 586# 587# Regardless, this is useful for pipes, and stdin/stdout... 588 589if os.name == 'posix': 590 class file_wrapper: 591 # Here we override just enough to make a file 592 # look like a socket for the purposes of asyncore. 593 # The passed fd is automatically os.dup()'d 594 595 def __init__(self, fd): 596 self.fd = os.dup(fd) 597 598 def __del__(self): 599 if self.fd >= 0: 600 warnings.warn("unclosed file %r" % self, ResourceWarning, 601 source=self) 602 self.close() 603 604 def recv(self, *args): 605 return os.read(self.fd, *args) 606 607 def send(self, *args): 608 return os.write(self.fd, *args) 609 610 def getsockopt(self, level, optname, buflen=None): 611 if (level == socket.SOL_SOCKET and 612 optname == socket.SO_ERROR and 613 not buflen): 614 return 0 615 raise NotImplementedError("Only asyncore specific behaviour " 616 "implemented.") 617 618 read = recv 619 write = send 620 621 def close(self): 622 if self.fd < 0: 623 return 624 fd = self.fd 625 self.fd = -1 626 os.close(fd) 627 628 def fileno(self): 629 return self.fd 630 631 class file_dispatcher(dispatcher): 632 633 def __init__(self, fd, map=None): 634 dispatcher.__init__(self, None, map) 635 self.connected = True 636 try: 637 fd = fd.fileno() 638 except AttributeError: 639 pass 640 self.set_file(fd) 641 # set it to non-blocking mode 642 os.set_blocking(fd, False) 643 644 def set_file(self, fd): 645 self.socket = file_wrapper(fd) 646 self._fileno = self.socket.fileno() 647 self.add_channel() 648