"""Event loop using a selector and related classes.

A selector is a "notify-when-ready" multiplexer.  For a subclass which
also includes support for signal handling, see the unix_events sub-module.
"""

__all__ = 'BaseSelectorEventLoop',

import collections
import errno
import functools
import selectors
import socket
import warnings
import weakref
try:
    import ssl
except ImportError:  # pragma: no cover
    ssl = None

from . import base_events
from . import constants
from . import events
from . import futures
from . import protocols
from . import sslproto
from . import transports
from . import trsock
from .log import logger


def _test_selector_event(selector, fd, event):
    """Return True if *selector* is monitoring *event* for *fd*.

    Returns False when *fd* is not registered with the selector at all.
    """
    try:
        key = selector.get_key(fd)
    except KeyError:
        return False
    else:
        return bool(key.events & event)


class BaseSelectorEventLoop(base_events.BaseEventLoop):
    """Selector event loop.

    See events.EventLoop for API specification.
    """

    def __init__(self, selector=None):
        super().__init__()

        if selector is None:
            selector = selectors.DefaultSelector()
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        self._make_self_pipe()
        # Maps file descriptor -> transport; weak values so that the
        # registry does not keep transports alive.
        self._transports = weakref.WeakValueDictionary()

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Return a _SelectorSocketTransport for *sock* and *protocol*."""
        return _SelectorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_ssl_transport(
            self, rawsock, protocol, sslcontext, waiter=None,
            *, server_side=False, server_hostname=None,
            extra=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
            ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
    ):
        """Wrap *rawsock* in an SSLProtocol and return its app transport."""
        ssl_protocol = sslproto.SSLProtocol(
            self, protocol, sslcontext, waiter,
            server_side, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout
        )
        # The raw socket transport is created for its side effects only
        # (it drives ssl_protocol); callers get the application transport.
        _SelectorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Return a _SelectorDatagramTransport for *sock* and *protocol*."""
        return _SelectorDatagramTransport(self, sock, protocol,
                                          address, waiter, extra)

    def close(self):
        """Close the loop, its self-pipe and its selector.

        Raises RuntimeError if the loop is running; does nothing if the
        loop is already closed.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return
        self._close_self_pipe()
        super().close()
        if self._selector is not None:
            self._selector.close()
            self._selector = None

    def _close_self_pipe(self):
        """Unregister and close both ends of the self-pipe."""
        self._remove_reader(self._ssock.fileno())
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the non-blocking self-pipe and watch its read end."""
        # A self-socket, really. :-)
        self._ssock, self._csock = socket.socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self._add_reader(self._ssock.fileno(), self._read_from_self)

    def _process_self_data(self, data):
        """Handle *data* read from the self-pipe; does nothing here."""
        pass

    def _read_from_self(self):
        """Drain the self-pipe until it is empty or reports EOF."""
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    break
                self._process_self_data(data)
            except InterruptedError:
                continue
            except BlockingIOError:
                break

    def _write_to_self(self):
        """Write a null byte into the self-pipe; failures are not raised."""
        # This may be called from a different thread, possibly after
        # _close_self_pipe() has been called or even while it is
        # running.  Guard for self._csock being None or closed.  When
        # a socket is closed, send() raises OSError (with errno set to
        # EBADF, but let's not rely on the exact error code).
        csock = self._csock
        if csock is None:
            return

        try:
            csock.send(b'\0')
        except OSError:
            if self._debug:
                logger.debug("Fail to write a null byte into the "
                             "self-pipe socket",
                             exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None, backlog=100,
                       ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
                       ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
        """Run _accept_connection whenever *sock* becomes readable."""
        self._add_reader(sock.fileno(), self._accept_connection,
                         protocol_factory, sock, sslcontext, server, backlog,
                         ssl_handshake_timeout, ssl_shutdown_timeout)

    def _accept_connection(
            self, protocol_factory, sock,
            sslcontext=None, server=None, backlog=100,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
            ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
        """Accept up to *backlog* pending connections on *sock*.

        A task running _accept_connection2() is created for every
        accepted connection.
        """
        # This method is only called once for each event loop tick where the
        # listening socket has triggered an EVENT_READ. There may be multiple
        # connections waiting for an .accept() so it is called in a loop.
        # See https://bugs.python.org/issue27906 for more details.
        for _ in range(backlog):
            try:
                conn, addr = sock.accept()
                if self._debug:
                    logger.debug("%r got a new connection from %r: %r",
                                 server, addr, conn)
                conn.setblocking(False)
            except (BlockingIOError, InterruptedError, ConnectionAbortedError):
                # Early exit because the socket accept buffer is empty.
                return None
            except OSError as exc:
                # There's nowhere to send the error, so just log it.
                if exc.errno in (errno.EMFILE, errno.ENFILE,
                                 errno.ENOBUFS, errno.ENOMEM):
                    # Some platforms (e.g. Linux) keep reporting the FD as
                    # ready, so we remove the read handler temporarily.
                    # We'll try again in a while.
                    # NOTE(review): the for-loop keeps iterating after this
                    # branch, so accept() is retried immediately within the
                    # same call -- confirm whether that is intended.
                    self.call_exception_handler({
                        'message': 'socket.accept() out of system resource',
                        'exception': exc,
                        'socket': trsock.TransportSocket(sock),
                    })
                    self._remove_reader(sock.fileno())
                    self.call_later(constants.ACCEPT_RETRY_DELAY,
                                    self._start_serving,
                                    protocol_factory, sock, sslcontext, server,
                                    backlog, ssl_handshake_timeout,
                                    ssl_shutdown_timeout)
                else:
                    raise  # The event loop will catch, log and ignore it.
            else:
                extra = {'peername': addr}
                accept = self._accept_connection2(
                    protocol_factory, conn, extra, sslcontext, server,
                    ssl_handshake_timeout, ssl_shutdown_timeout)
                self.create_task(accept)

    async def _accept_connection2(
            self, protocol_factory, conn, extra,
            sslcontext=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
            ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
        """Create the protocol and transport for an accepted connection.

        SystemExit and KeyboardInterrupt propagate; any other error is
        swallowed and, in debug mode only, reported to the loop's
        exception handler.
        """
        protocol = None
        transport = None
        try:
            protocol = protocol_factory()
            waiter = self.create_future()
            if sslcontext:
                transport = self._make_ssl_transport(
                    conn, protocol, sslcontext, waiter=waiter,
                    server_side=True, extra=extra, server=server,
                    ssl_handshake_timeout=ssl_handshake_timeout,
                    ssl_shutdown_timeout=ssl_shutdown_timeout)
            else:
                transport = self._make_socket_transport(
                    conn, protocol, waiter=waiter, extra=extra,
                    server=server)

            try:
                await waiter
            except BaseException:
                transport.close()
                raise
            # It's now up to the protocol to handle the connection.

        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if self._debug:
                context = {
                    'message':
                        'Error on transport creation for incoming connection',
                    'exception': exc,
                }
                if protocol is not None:
                    context['protocol'] = protocol
                if transport is not None:
                    context['transport'] = transport
                self.call_exception_handler(context)

    def _ensure_fd_no_transport(self, fd):
        """Refuse *fd* if it belongs to a transport that is not closing.

        *fd* may be an int or an object with a fileno() method.  Raises
        ValueError for an invalid file object and RuntimeError when the
        descriptor is owned by a live transport.
        """
        fileno = fd
        if not isinstance(fileno, int):
            try:
                fileno = int(fileno.fileno())
            except (AttributeError, TypeError, ValueError):
                # This code matches selectors._fileobj_to_fd function.
                raise ValueError(f"Invalid file object: {fd!r}") from None
        try:
            transport = self._transports[fileno]
        except KeyError:
            pass
        else:
            if not transport.is_closing():
                raise RuntimeError(
                    f'File descriptor {fd!r} is used by transport '
                    f'{transport!r}')

    def _add_reader(self, fd, callback, *args):
        """Register *callback* for read events on *fd*; return its Handle.

        A reader previously registered for *fd* is replaced and cancelled.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_READ,
                                  (handle, writer))
            if reader is not None:
                reader.cancel()
        return handle

    def _remove_reader(self, fd):
        """Stop watching *fd* for reads; return True if a reader existed."""
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            mask &= ~selectors.EVENT_READ
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (None, writer))

            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

    def _add_writer(self, fd, callback, *args):
        """Register *callback* for write events on *fd*; return its Handle.

        A writer previously registered for *fd* is replaced and cancelled.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_WRITE,
                                    (None, handle))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
                                  (reader, handle))
            if writer is not None:
                writer.cancel()
        return handle

    def _remove_writer(self, fd):
        """Remove a writer callback; return True if a writer existed."""
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            # Drop write interest; keep the reader (if any) registered.
            mask &= ~selectors.EVENT_WRITE
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (reader, None))

            if writer is not None:
                writer.cancel()
                return True
            else:
                return False

    def add_reader(self, fd, callback, *args):
        """Add a reader callback."""
        self._ensure_fd_no_transport(fd)
        self._add_reader(fd, callback, *args)

    def remove_reader(self, fd):
        """Remove a reader callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_reader(fd)

    def add_writer(self, fd, callback, *args):
        """Add a writer callback."""
        self._ensure_fd_no_transport(fd)
        self._add_writer(fd, callback, *args)

    def remove_writer(self, fd):
        """Remove a writer callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_writer(fd)

    async def sock_recv(self, sock, n):
        """Receive data from the socket.

        The return value is a bytes object representing the data received.
        The maximum amount of data to be received at once is specified by
        n.
        """
        base_events._check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        try:
            return sock.recv(n)
        except (BlockingIOError, InterruptedError):
            pass
        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd, handle=handle))
        return await fut

    def _sock_read_done(self, fd, fut, handle=None):
        """Future done-callback: unregister the reader unless cancelled."""
        if handle is None or not handle.cancelled():
            self.remove_reader(fd)

    def _sock_recv(self, fut, sock, n):
        # _sock_recv() can add itself as an I/O callback if the operation can't
        # be done immediately. Don't use it directly, call sock_recv().
        if fut.done():
            return
        try:
            data = sock.recv(n)
        except (BlockingIOError, InterruptedError):
            return  # try again next time
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(data)

    async def sock_recv_into(self, sock, buf):
        """Receive data from the socket.

        The received data is written into *buf* (a writable buffer).
        The return value is the number of bytes written.
        """
        base_events._check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        try:
            return sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            pass
        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd, handle=handle))
        return await fut

    def _sock_recv_into(self, fut, sock, buf):
        # _sock_recv_into() can add itself as an I/O callback if the operation
        # can't be done immediately. Don't use it directly, call
        # sock_recv_into().
        if fut.done():
            return
        try:
            nbytes = sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            return  # try again next time
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(nbytes)

    async def sock_recvfrom(self, sock, bufsize):
        """Receive a datagram from a datagram socket.

        The return value is a tuple of (bytes, address) representing the
        datagram received and the address it came from.
        The maximum amount of data to be received at once is specified by
        bufsize.
        """
        base_events._check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        try:
            return sock.recvfrom(bufsize)
        except (BlockingIOError, InterruptedError):
            pass
        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize)
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd, handle=handle))
        return await fut

    def _sock_recvfrom(self, fut, sock, bufsize):
        # _sock_recvfrom() can add itself as an I/O callback if the operation
        # can't be done immediately. Don't use it directly, call
        # sock_recvfrom().
        if fut.done():
            return
        try:
            result = sock.recvfrom(bufsize)
        except (BlockingIOError, InterruptedError):
            return  # try again next time
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(result)

    async def sock_recvfrom_into(self, sock, buf, nbytes=0):
        """Receive data from the socket.

        The received data is written into *buf* (a writable buffer).
        The return value is a tuple of (number of bytes written, address).
        If *nbytes* is 0 (the default), len(buf) is used.
        """
        base_events._check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        if not nbytes:
            nbytes = len(buf)

        try:
            return sock.recvfrom_into(buf, nbytes)
        except (BlockingIOError, InterruptedError):
            pass
        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf,
                                  nbytes)
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd, handle=handle))
        return await fut

    def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
        # _sock_recvfrom_into() can add itself as an I/O callback if the
        # operation can't be done immediately. Don't use it directly, call
        # sock_recvfrom_into().
        if fut.done():
            return
        try:
            result = sock.recvfrom_into(buf, bufsize)
        except (BlockingIOError, InterruptedError):
            return  # try again next time
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(result)

    async def sock_sendall(self, sock, data):
        """Send data to the socket.

        The socket must be connected to a remote socket. This method continues
        to send data from data until either all data has been sent or an
        error occurs. None is returned on success. On error, an exception is
        raised, and there is no way to determine how much data, if any, was
        successfully processed by the receiving end of the connection.
        """
        base_events._check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        try:
            n = sock.send(data)
        except (BlockingIOError, InterruptedError):
            n = 0

        if n == len(data):
            # all data sent
            return

        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        # use a trick with a list in closure to store a mutable state
        handle = self._add_writer(fd, self._sock_sendall, fut, sock,
                                  memoryview(data), [n])
        fut.add_done_callback(
            functools.partial(self._sock_write_done, fd, handle=handle))
        return await fut

    def _sock_sendall(self, fut, sock, view, pos):
        """Writer callback: send the rest of *view* starting at pos[0]."""
        if fut.done():
            # Future cancellation can be scheduled on previous loop iteration
            return
        start = pos[0]
        try:
            n = sock.send(view[start:])
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
            return

        start += n

        if start == len(view):
            fut.set_result(None)
        else:
            # Remember progress for the next invocation.
            pos[0] = start

    async def sock_sendto(self, sock, data, address):
        """Send a datagram from the socket to *address*.

        The return value is the result of sock.sendto(), i.e. the number
        of bytes sent.  If the socket is not ready, the send is retried
        once it becomes writable.
        """
        base_events._check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        try:
            return sock.sendto(data, address)
        except (BlockingIOError, InterruptedError):
            pass

        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        handle = self._add_writer(fd, self._sock_sendto, fut, sock, data,
                                  address)
        fut.add_done_callback(
            functools.partial(self._sock_write_done, fd, handle=handle))
        return await fut

    def _sock_sendto(self, fut, sock, data, address):
        """Writer callback: retry sendto() and resolve *fut* with its result."""
        if fut.done():
            # Future cancellation can be scheduled on previous loop iteration
            return
        try:
            n = sock.sendto(data, 0, address)
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(n)

    async def sock_connect(self, sock, address):
        """Connect to a remote socket at address.

        This method is a coroutine.
        """
        base_events._check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")

        if sock.family == socket.AF_INET or (
                base_events._HAS_IPv6 and sock.family == socket.AF_INET6):
            resolved = await self._ensure_resolved(
                address, family=sock.family, type=sock.type, proto=sock.proto,
                loop=self,
            )
            _, _, _, _, address = resolved[0]

        fut = self.create_future()
        self._sock_connect(fut, sock, address)
        try:
            return await fut
        finally:
            # Needed to break cycles when an exception occurs.
            fut = None

    def _sock_connect(self, fut, sock, address):
        """Start connecting *sock*; resolve *fut* now or when writable."""
        fd = sock.fileno()
        try:
            sock.connect(address)
        except (BlockingIOError, InterruptedError):
            # Issue #23618: When the C function connect() fails with EINTR, the
            # connection runs in background. We have to wait until the socket
            # becomes writable to be notified when the connection succeed or
            # fails.
            self._ensure_fd_no_transport(fd)
            handle = self._add_writer(
                fd, self._sock_connect_cb, fut, sock, address)
            fut.add_done_callback(
                functools.partial(self._sock_write_done, fd, handle=handle))
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)
        finally:
            fut = None

    def _sock_write_done(self, fd, fut, handle=None):
        """Future done-callback: unregister the writer unless cancelled."""
        if handle is None or not handle.cancelled():
            self.remove_writer(fd)

    def _sock_connect_cb(self, fut, sock, address):
        """Writer callback: report the outcome of a background connect()."""
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
                raise OSError(err, f'Connect call failed {address}')
        except (BlockingIOError, InterruptedError):
            # socket is still registered, the callback will be retried later
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)
        finally:
            fut = None

    async def sock_accept(self, sock):
        """Accept a connection.

        The socket must be bound to an address and listening for connections.
        The return value is a pair (conn, address) where conn is a new socket
        object usable to send and receive data on the connection, and address
        is the address bound to the socket on the other end of the connection.
        """
        base_events._check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        self._sock_accept(fut, sock)
        return await fut

    def _sock_accept(self, fut, sock):
        """Try accept(); on would-block, re-register itself as a reader."""
        fd = sock.fileno()
        try:
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            self._ensure_fd_no_transport(fd)
            handle = self._add_reader(fd, self._sock_accept, fut, sock)
            fut.add_done_callback(
                functools.partial(self._sock_read_done, fd, handle=handle))
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result((conn, address))

    async def _sendfile_native(self, transp, file, offset, count):
        """Send *file* over the transport's socket via sock_sendfile().

        The transport is removed from the fd registry and its reading is
        paused while the transfer runs; both are restored afterwards.
        """
        del self._transports[transp._sock_fd]
        resume_reading = transp.is_reading()
        transp.pause_reading()
        # Wait until the transport's own write buffer has been flushed.
        await transp._make_empty_waiter()
        try:
            return await self.sock_sendfile(transp._sock, file, offset, count,
                                            fallback=False)
        finally:
            transp._reset_empty_waiter()
            if resume_reading:
                transp.resume_reading()
            self._transports[transp._sock_fd] = transp

    def _process_events(self, event_list):
        """Schedule the handles of ready fds; drop cancelled ones."""
        for key, mask in event_list:
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ and reader is not None:
                if reader._cancelled:
                    self._remove_reader(fileobj)
                else:
                    self._add_callback(reader)
            if mask & selectors.EVENT_WRITE and writer is not None:
                if writer._cancelled:
                    self._remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _stop_serving(self, sock):
        """Stop watching the listening socket *sock* and close it."""
        self._remove_reader(sock.fileno())
        sock.close()


class _SelectorTransport(transports._FlowControlMixin,
                         transports.Transport):

    max_size = 256 * 1024  # Buffer size passed to recv().
756 757 _buffer_factory = bytearray # Constructs initial value for self._buffer. 758 759 # Attribute used in the destructor: it must be set even if the constructor 760 # is not called (see _SelectorSslTransport which may start by raising an 761 # exception) 762 _sock = None 763 764 def __init__(self, loop, sock, protocol, extra=None, server=None): 765 super().__init__(extra, loop) 766 self._extra['socket'] = trsock.TransportSocket(sock) 767 try: 768 self._extra['sockname'] = sock.getsockname() 769 except OSError: 770 self._extra['sockname'] = None 771 if 'peername' not in self._extra: 772 try: 773 self._extra['peername'] = sock.getpeername() 774 except socket.error: 775 self._extra['peername'] = None 776 self._sock = sock 777 self._sock_fd = sock.fileno() 778 779 self._protocol_connected = False 780 self.set_protocol(protocol) 781 782 self._server = server 783 self._buffer = self._buffer_factory() 784 self._conn_lost = 0 # Set when call to connection_lost scheduled. 785 self._closing = False # Set when close() called. 
786 self._paused = False # Set when pause_reading() called 787 788 if self._server is not None: 789 self._server._attach() 790 loop._transports[self._sock_fd] = self 791 792 def __repr__(self): 793 info = [self.__class__.__name__] 794 if self._sock is None: 795 info.append('closed') 796 elif self._closing: 797 info.append('closing') 798 info.append(f'fd={self._sock_fd}') 799 # test if the transport was closed 800 if self._loop is not None and not self._loop.is_closed(): 801 polling = _test_selector_event(self._loop._selector, 802 self._sock_fd, selectors.EVENT_READ) 803 if polling: 804 info.append('read=polling') 805 else: 806 info.append('read=idle') 807 808 polling = _test_selector_event(self._loop._selector, 809 self._sock_fd, 810 selectors.EVENT_WRITE) 811 if polling: 812 state = 'polling' 813 else: 814 state = 'idle' 815 816 bufsize = self.get_write_buffer_size() 817 info.append(f'write=<{state}, bufsize={bufsize}>') 818 return '<{}>'.format(' '.join(info)) 819 820 def abort(self): 821 self._force_close(None) 822 823 def set_protocol(self, protocol): 824 self._protocol = protocol 825 self._protocol_connected = True 826 827 def get_protocol(self): 828 return self._protocol 829 830 def is_closing(self): 831 return self._closing 832 833 def is_reading(self): 834 return not self.is_closing() and not self._paused 835 836 def pause_reading(self): 837 if not self.is_reading(): 838 return 839 self._paused = True 840 self._loop._remove_reader(self._sock_fd) 841 if self._loop.get_debug(): 842 logger.debug("%r pauses reading", self) 843 844 def resume_reading(self): 845 if self._closing or not self._paused: 846 return 847 self._paused = False 848 self._add_reader(self._sock_fd, self._read_ready) 849 if self._loop.get_debug(): 850 logger.debug("%r resumes reading", self) 851 852 def close(self): 853 if self._closing: 854 return 855 self._closing = True 856 self._loop._remove_reader(self._sock_fd) 857 if not self._buffer: 858 self._conn_lost += 1 859 
self._loop._remove_writer(self._sock_fd) 860 self._loop.call_soon(self._call_connection_lost, None) 861 862 def __del__(self, _warn=warnings.warn): 863 if self._sock is not None: 864 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 865 self._sock.close() 866 867 def _fatal_error(self, exc, message='Fatal error on transport'): 868 # Should be called from exception handler only. 869 if isinstance(exc, OSError): 870 if self._loop.get_debug(): 871 logger.debug("%r: %s", self, message, exc_info=True) 872 else: 873 self._loop.call_exception_handler({ 874 'message': message, 875 'exception': exc, 876 'transport': self, 877 'protocol': self._protocol, 878 }) 879 self._force_close(exc) 880 881 def _force_close(self, exc): 882 if self._conn_lost: 883 return 884 if self._buffer: 885 self._buffer.clear() 886 self._loop._remove_writer(self._sock_fd) 887 if not self._closing: 888 self._closing = True 889 self._loop._remove_reader(self._sock_fd) 890 self._conn_lost += 1 891 self._loop.call_soon(self._call_connection_lost, exc) 892 893 def _call_connection_lost(self, exc): 894 try: 895 if self._protocol_connected: 896 self._protocol.connection_lost(exc) 897 finally: 898 self._sock.close() 899 self._sock = None 900 self._protocol = None 901 self._loop = None 902 server = self._server 903 if server is not None: 904 server._detach() 905 self._server = None 906 907 def get_write_buffer_size(self): 908 return len(self._buffer) 909 910 def _add_reader(self, fd, callback, *args): 911 if not self.is_reading(): 912 return 913 self._loop._add_reader(fd, callback, *args) 914 915 916class _SelectorSocketTransport(_SelectorTransport): 917 918 _start_tls_compatible = True 919 _sendfile_compatible = constants._SendfileMode.TRY_NATIVE 920 921 def __init__(self, loop, sock, protocol, waiter=None, 922 extra=None, server=None): 923 924 self._read_ready_cb = None 925 super().__init__(loop, sock, protocol, extra, server) 926 self._eof = False 927 self._empty_waiter = None 928 929 # 
Disable the Nagle algorithm -- small writes will be 930 # sent without waiting for the TCP ACK. This generally 931 # decreases the latency (in some cases significantly.) 932 base_events._set_nodelay(self._sock) 933 934 self._loop.call_soon(self._protocol.connection_made, self) 935 # only start reading when connection_made() has been called 936 self._loop.call_soon(self._add_reader, 937 self._sock_fd, self._read_ready) 938 if waiter is not None: 939 # only wake up the waiter when connection_made() has been called 940 self._loop.call_soon(futures._set_result_unless_cancelled, 941 waiter, None) 942 943 def set_protocol(self, protocol): 944 if isinstance(protocol, protocols.BufferedProtocol): 945 self._read_ready_cb = self._read_ready__get_buffer 946 else: 947 self._read_ready_cb = self._read_ready__data_received 948 949 super().set_protocol(protocol) 950 951 def _read_ready(self): 952 self._read_ready_cb() 953 954 def _read_ready__get_buffer(self): 955 if self._conn_lost: 956 return 957 958 try: 959 buf = self._protocol.get_buffer(-1) 960 if not len(buf): 961 raise RuntimeError('get_buffer() returned an empty buffer') 962 except (SystemExit, KeyboardInterrupt): 963 raise 964 except BaseException as exc: 965 self._fatal_error( 966 exc, 'Fatal error: protocol.get_buffer() call failed.') 967 return 968 969 try: 970 nbytes = self._sock.recv_into(buf) 971 except (BlockingIOError, InterruptedError): 972 return 973 except (SystemExit, KeyboardInterrupt): 974 raise 975 except BaseException as exc: 976 self._fatal_error(exc, 'Fatal read error on socket transport') 977 return 978 979 if not nbytes: 980 self._read_ready__on_eof() 981 return 982 983 try: 984 self._protocol.buffer_updated(nbytes) 985 except (SystemExit, KeyboardInterrupt): 986 raise 987 except BaseException as exc: 988 self._fatal_error( 989 exc, 'Fatal error: protocol.buffer_updated() call failed.') 990 991 def _read_ready__data_received(self): 992 if self._conn_lost: 993 return 994 try: 995 data = 
self._sock.recv(self.max_size) 996 except (BlockingIOError, InterruptedError): 997 return 998 except (SystemExit, KeyboardInterrupt): 999 raise 1000 except BaseException as exc: 1001 self._fatal_error(exc, 'Fatal read error on socket transport') 1002 return 1003 1004 if not data: 1005 self._read_ready__on_eof() 1006 return 1007 1008 try: 1009 self._protocol.data_received(data) 1010 except (SystemExit, KeyboardInterrupt): 1011 raise 1012 except BaseException as exc: 1013 self._fatal_error( 1014 exc, 'Fatal error: protocol.data_received() call failed.') 1015 1016 def _read_ready__on_eof(self): 1017 if self._loop.get_debug(): 1018 logger.debug("%r received EOF", self) 1019 1020 try: 1021 keep_open = self._protocol.eof_received() 1022 except (SystemExit, KeyboardInterrupt): 1023 raise 1024 except BaseException as exc: 1025 self._fatal_error( 1026 exc, 'Fatal error: protocol.eof_received() call failed.') 1027 return 1028 1029 if keep_open: 1030 # We're keeping the connection open so the 1031 # protocol can write more, but we still can't 1032 # receive more, so remove the reader callback. 1033 self._loop._remove_reader(self._sock_fd) 1034 else: 1035 self.close() 1036 1037 def write(self, data): 1038 if not isinstance(data, (bytes, bytearray, memoryview)): 1039 raise TypeError(f'data argument must be a bytes-like object, ' 1040 f'not {type(data).__name__!r}') 1041 if self._eof: 1042 raise RuntimeError('Cannot call write() after write_eof()') 1043 if self._empty_waiter is not None: 1044 raise RuntimeError('unable to write; sendfile is in progress') 1045 if not data: 1046 return 1047 1048 if self._conn_lost: 1049 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 1050 logger.warning('socket.send() raised exception.') 1051 self._conn_lost += 1 1052 return 1053 1054 if not self._buffer: 1055 # Optimization: try to send now. 
1056 try: 1057 n = self._sock.send(data) 1058 except (BlockingIOError, InterruptedError): 1059 pass 1060 except (SystemExit, KeyboardInterrupt): 1061 raise 1062 except BaseException as exc: 1063 self._fatal_error(exc, 'Fatal write error on socket transport') 1064 return 1065 else: 1066 data = data[n:] 1067 if not data: 1068 return 1069 # Not all was written; register write handler. 1070 self._loop._add_writer(self._sock_fd, self._write_ready) 1071 1072 # Add it to the buffer. 1073 self._buffer.extend(data) 1074 self._maybe_pause_protocol() 1075 1076 def _write_ready(self): 1077 assert self._buffer, 'Data should not be empty' 1078 1079 if self._conn_lost: 1080 return 1081 try: 1082 n = self._sock.send(self._buffer) 1083 except (BlockingIOError, InterruptedError): 1084 pass 1085 except (SystemExit, KeyboardInterrupt): 1086 raise 1087 except BaseException as exc: 1088 self._loop._remove_writer(self._sock_fd) 1089 self._buffer.clear() 1090 self._fatal_error(exc, 'Fatal write error on socket transport') 1091 if self._empty_waiter is not None: 1092 self._empty_waiter.set_exception(exc) 1093 else: 1094 if n: 1095 del self._buffer[:n] 1096 self._maybe_resume_protocol() # May append to buffer. 
1097 if not self._buffer: 1098 self._loop._remove_writer(self._sock_fd) 1099 if self._empty_waiter is not None: 1100 self._empty_waiter.set_result(None) 1101 if self._closing: 1102 self._call_connection_lost(None) 1103 elif self._eof: 1104 self._sock.shutdown(socket.SHUT_WR) 1105 1106 def write_eof(self): 1107 if self._closing or self._eof: 1108 return 1109 self._eof = True 1110 if not self._buffer: 1111 self._sock.shutdown(socket.SHUT_WR) 1112 1113 def can_write_eof(self): 1114 return True 1115 1116 def _call_connection_lost(self, exc): 1117 super()._call_connection_lost(exc) 1118 if self._empty_waiter is not None: 1119 self._empty_waiter.set_exception( 1120 ConnectionError("Connection is closed by peer")) 1121 1122 def _make_empty_waiter(self): 1123 if self._empty_waiter is not None: 1124 raise RuntimeError("Empty waiter is already set") 1125 self._empty_waiter = self._loop.create_future() 1126 if not self._buffer: 1127 self._empty_waiter.set_result(None) 1128 return self._empty_waiter 1129 1130 def _reset_empty_waiter(self): 1131 self._empty_waiter = None 1132 1133 1134class _SelectorDatagramTransport(_SelectorTransport): 1135 1136 _buffer_factory = collections.deque 1137 1138 def __init__(self, loop, sock, protocol, address=None, 1139 waiter=None, extra=None): 1140 super().__init__(loop, sock, protocol, extra) 1141 self._address = address 1142 self._buffer_size = 0 1143 self._loop.call_soon(self._protocol.connection_made, self) 1144 # only start reading when connection_made() has been called 1145 self._loop.call_soon(self._add_reader, 1146 self._sock_fd, self._read_ready) 1147 if waiter is not None: 1148 # only wake up the waiter when connection_made() has been called 1149 self._loop.call_soon(futures._set_result_unless_cancelled, 1150 waiter, None) 1151 1152 def get_write_buffer_size(self): 1153 return self._buffer_size 1154 1155 def _read_ready(self): 1156 if self._conn_lost: 1157 return 1158 try: 1159 data, addr = self._sock.recvfrom(self.max_size) 1160 
except (BlockingIOError, InterruptedError): 1161 pass 1162 except OSError as exc: 1163 self._protocol.error_received(exc) 1164 except (SystemExit, KeyboardInterrupt): 1165 raise 1166 except BaseException as exc: 1167 self._fatal_error(exc, 'Fatal read error on datagram transport') 1168 else: 1169 self._protocol.datagram_received(data, addr) 1170 1171 def sendto(self, data, addr=None): 1172 if not isinstance(data, (bytes, bytearray, memoryview)): 1173 raise TypeError(f'data argument must be a bytes-like object, ' 1174 f'not {type(data).__name__!r}') 1175 if not data: 1176 return 1177 1178 if self._address: 1179 if addr not in (None, self._address): 1180 raise ValueError( 1181 f'Invalid address: must be None or {self._address}') 1182 addr = self._address 1183 1184 if self._conn_lost and self._address: 1185 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 1186 logger.warning('socket.send() raised exception.') 1187 self._conn_lost += 1 1188 return 1189 1190 if not self._buffer: 1191 # Attempt to send it right away first. 1192 try: 1193 if self._extra['peername']: 1194 self._sock.send(data) 1195 else: 1196 self._sock.sendto(data, addr) 1197 return 1198 except (BlockingIOError, InterruptedError): 1199 self._loop._add_writer(self._sock_fd, self._sendto_ready) 1200 except OSError as exc: 1201 self._protocol.error_received(exc) 1202 return 1203 except (SystemExit, KeyboardInterrupt): 1204 raise 1205 except BaseException as exc: 1206 self._fatal_error( 1207 exc, 'Fatal write error on datagram transport') 1208 return 1209 1210 # Ensure that what we buffer is immutable. 
1211 self._buffer.append((bytes(data), addr)) 1212 self._buffer_size += len(data) 1213 self._maybe_pause_protocol() 1214 1215 def _sendto_ready(self): 1216 while self._buffer: 1217 data, addr = self._buffer.popleft() 1218 self._buffer_size -= len(data) 1219 try: 1220 if self._extra['peername']: 1221 self._sock.send(data) 1222 else: 1223 self._sock.sendto(data, addr) 1224 except (BlockingIOError, InterruptedError): 1225 self._buffer.appendleft((data, addr)) # Try again later. 1226 self._buffer_size += len(data) 1227 break 1228 except OSError as exc: 1229 self._protocol.error_received(exc) 1230 return 1231 except (SystemExit, KeyboardInterrupt): 1232 raise 1233 except BaseException as exc: 1234 self._fatal_error( 1235 exc, 'Fatal write error on datagram transport') 1236 return 1237 1238 self._maybe_resume_protocol() # May append to buffer. 1239 if not self._buffer: 1240 self._loop._remove_writer(self._sock_fd) 1241 if self._closing: 1242 self._call_connection_lost(None) 1243