"""Event loop using a proactor and related classes.

A proactor is a "notify-on-completion" multiplexer.  Currently a
proactor is only implemented on Windows with IOCP.
"""

__all__ = 'BaseProactorEventLoop',

import io
import os
import socket
import warnings
import signal
import threading
import collections

from . import base_events
from . import constants
from . import futures
from . import exceptions
from . import protocols
from . import sslproto
from . import transports
from . import trsock
from .log import logger


def _set_socket_extra(transport, sock):
    """Fill the transport's extra info with socket related entries.

    Sets the 'socket' and 'sockname' entries, and 'peername' unless the
    caller already provided one.  A failing getsockname() is only logged
    (in debug mode); a failing getpeername() stores None.
    """
    transport._extra['socket'] = trsock.TransportSocket(sock)

    try:
        transport._extra['sockname'] = sock.getsockname()
    except socket.error:
        if transport._loop.get_debug():
            logger.warning(
                "getsockname() failed on %r", sock, exc_info=True)

    if 'peername' not in transport._extra:
        try:
            transport._extra['peername'] = sock.getpeername()
        except socket.error:
            # UDP sockets may not have a peer name
            transport._extra['peername'] = None


class _ProactorBasePipeTransport(transports._FlowControlMixin,
                                 transports.BaseTransport):
    """Base class for pipe and socket transports."""

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        super().__init__(extra, loop)
        self._set_extra(sock)
        self._sock = sock
        self.set_protocol(protocol)
        self._server = server
        self._buffer = None  # None or bytearray.
        self._read_fut = None
        self._write_fut = None
        self._pending_write = 0
        self._conn_lost = 0
        self._closing = False  # Set when close() called.
        self._called_connection_lost = False
        self._eof_written = False
        # _force_close() reads this attribute on every transport, including
        # read-only ones which never create an empty waiter, so it has to
        # exist on the base class.
        self._empty_waiter = None
        if self._server is not None:
            self._server._attach()
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._sock is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        if self._sock is not None:
            info.append(f'fd={self._sock.fileno()}')
        if self._read_fut is not None:
            info.append(f'read={self._read_fut!r}')
        if self._write_fut is not None:
            info.append(f'write={self._write_fut!r}')
        if self._buffer:
            info.append(f'write_bufsize={len(self._buffer)}')
        if self._eof_written:
            info.append('EOF written')
        return '<{}>'.format(' '.join(info))

    def _set_extra(self, sock):
        """Record the underlying object in the extra info as 'pipe'."""
        self._extra['pipe'] = sock

    def set_protocol(self, protocol):
        """Set the protocol used by this transport."""
        self._protocol = protocol

    def get_protocol(self):
        """Return the protocol currently used by this transport."""
        return self._protocol

    def is_closing(self):
        """Return True once close() or _force_close() has been called."""
        return self._closing

    def close(self):
        """Close the transport.

        connection_lost(None) is scheduled right away when nothing is
        waiting to be written; otherwise the write loop schedules it once
        the pending data has been sent.  A pending read is cancelled.
        """
        if self._closing:
            return
        self._closing = True
        self._conn_lost += 1
        if not self._buffer and self._write_fut is None:
            self._loop.call_soon(self._call_connection_lost, None)
        if self._read_fut is not None:
            self._read_fut.cancel()
            self._read_fut = None

    def __del__(self, _warn=warnings.warn):
        if self._sock is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._sock.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and force-close the transport.

        OSError is only logged (in debug mode); any other exception is
        passed to the loop's exception handler.
        """
        try:
            if isinstance(exc, OSError):
                if self._loop.get_debug():
                    logger.debug("%r: %s", self, message, exc_info=True)
            else:
                self._loop.call_exception_handler({
                    'message': message,
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })
        finally:
            self._force_close(exc)

    def _force_close(self, exc):
        """Close immediately: drop buffered data and cancel pending I/O.

        A pending empty waiter is completed with *exc* (or None), and
        connection_lost(exc) is scheduled on the loop.
        """
        if self._empty_waiter is not None and not self._empty_waiter.done():
            if exc is None:
                self._empty_waiter.set_result(None)
            else:
                self._empty_waiter.set_exception(exc)
        if self._closing and self._called_connection_lost:
            return
        self._closing = True
        self._conn_lost += 1
        if self._write_fut:
            self._write_fut.cancel()
            self._write_fut = None
        if self._read_fut:
            self._read_fut.cancel()
            self._read_fut = None
        self._pending_write = 0
        self._buffer = None
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Call protocol.connection_lost(exc) once and release resources."""
        if self._called_connection_lost:
            return
        try:
            self._protocol.connection_lost(exc)
        finally:
            # XXX If there is a pending overlapped read on the other
            # end then it may fail with ERROR_NETNAME_DELETED if we
            # just close our end.  First calling shutdown() seems to
            # cure it, but maybe using DisconnectEx() would be better.
            if hasattr(self._sock, 'shutdown') and self._sock.fileno() != -1:
                self._sock.shutdown(socket.SHUT_RDWR)
            self._sock.close()
            self._sock = None
            server = self._server
            if server is not None:
                server._detach()
                self._server = None
            self._called_connection_lost = True

    def get_write_buffer_size(self):
        """Return the number of bytes in flight plus the bytes buffered."""
        size = self._pending_write
        if self._buffer is not None:
            size += len(self._buffer)
        return size


class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
                                 transports.ReadTransport):
    """Transport for read pipes."""

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None, buffer_size=65536):
        # Length of data received while paused; -1 means nothing is pending.
        self._pending_data_length = -1
        self._paused = True
        super().__init__(loop, sock, protocol, waiter, extra, server)

        self._data = bytearray(buffer_size)
        self._loop.call_soon(self._loop_reading)
        self._paused = False

    def is_reading(self):
        """Return True if the transport is neither paused nor closing."""
        return not self._paused and not self._closing

    def pause_reading(self):
        """Stop delivering data to the protocol until resume_reading()."""
        if self._closing or self._paused:
            return
        self._paused = True

        # bpo-33694: Don't cancel self._read_fut because cancelling an
        # overlapped WSASend() silently loses data with the current proactor
        # implementation.
        #
        # If CancelIoEx() fails with ERROR_NOT_FOUND, it means that WSASend()
        # completed (even if HasOverlappedIoCompleted() returns 0), but
        # Overlapped.cancel() currently silently ignores the ERROR_NOT_FOUND
        # error. Once the overlapped is ignored, the IOCP loop will ignore the
        # completion I/O event and so not read the result of the overlapped
        # WSARecv().

        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Resume reading and deliver data received while paused."""
        if self._closing or not self._paused:
            return

        self._paused = False
        if self._read_fut is None:
            self._loop.call_soon(self._loop_reading, None)

        length = self._pending_data_length
        self._pending_data_length = -1
        if length > -1:
            # Call the protocol method after calling _loop_reading(),
            # since the protocol can decide to pause reading again.
            self._loop.call_soon(self._data_received, self._data[:length], length)

        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def _eof_received(self):
        """Notify the protocol of EOF; close unless it asks to stay open."""
        if self._loop.get_debug():
            logger.debug("%r received EOF", self)

        try:
            keep_open = self._protocol.eof_received()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.eof_received() call failed.')
            return

        if not keep_open:
            self.close()

    def _data_received(self, data, length):
        """Hand *length* bytes of *data* to the protocol.

        A length of 0 means end-of-file.  While reading is paused only the
        length is remembered so that resume_reading() can deliver the data.
        """
        if self._paused:
            # Don't call any protocol method while reading is paused.
            # The protocol will be called on resume_reading().
            assert self._pending_data_length == -1
            self._pending_data_length = length
            return

        if length == 0:
            self._eof_received()
            return

        if isinstance(self._protocol, protocols.BufferedProtocol):
            try:
                protocols._feed_data_to_buffered_proto(self._protocol, data)
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(exc,
                                  'Fatal error: protocol.buffer_updated() '
                                  'call failed.')
                return
        else:
            self._protocol.data_received(data)

    def _loop_reading(self, fut=None):
        """Collect the result of a finished read and schedule the next one.

        Used both to start reading (fut is None) and as the done callback
        of the pending read future.
        """
        length = -1
        data = None
        try:
            if fut is not None:
                assert self._read_fut is fut or (self._read_fut is None and
                                                 self._closing)
                self._read_fut = None
                if fut.done():
                    # deliver data later in "finally" clause
                    length = fut.result()
                    if length == 0:
                        # we got end-of-file so no need to reschedule a new read
                        return

                    data = self._data[:length]
                else:
                    # the future will be replaced by next proactor.recv call
                    fut.cancel()

            if self._closing:
                # since close() has been called we ignore any read data
                return

            # bpo-33694: buffer_updated() has currently no fast path because of
            # a data loss issue caused by overlapped WSASend() cancellation.

            if not self._paused:
                # reschedule a new read
                self._read_fut = self._loop._proactor.recv_into(self._sock, self._data)
        except ConnectionAbortedError as exc:
            if not self._closing:
                self._fatal_error(exc, 'Fatal read error on pipe transport')
            elif self._loop.get_debug():
                logger.debug("Read error on pipe transport while closing",
                             exc_info=True)
        except ConnectionResetError as exc:
            self._force_close(exc)
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        except exceptions.CancelledError:
            if not self._closing:
                raise
        else:
            if not self._paused:
                self._read_fut.add_done_callback(self._loop_reading)
        finally:
            if length > -1:
                self._data_received(data, length)


class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
                                      transports.WriteTransport):
    """Transport for write pipes."""

    _start_tls_compatible = True

    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)
        self._empty_waiter = None

    def write(self, data):
        """Queue *data* for sending.

        Raises TypeError if *data* is not bytes-like and RuntimeError if
        write_eof() was already called or a sendfile is in progress.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(
                "data argument must be a bytes-like object, "
                f"not {type(data).__name__}")
        if self._eof_written:
            raise RuntimeError('write_eof() already called')
        if self._empty_waiter is not None:
            raise RuntimeError('unable to write; sendfile is in progress')

        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        # Observable states:
        # 1. IDLE: _write_fut and _buffer both None
        # 2. WRITING: _write_fut set; _buffer None
        # 3. BACKED UP: _write_fut set; _buffer a bytearray
        # We always copy the data, so the caller can't modify it
        # while we're still waiting for the I/O to happen.
        if self._write_fut is None:  # IDLE -> WRITING
            assert self._buffer is None
            # Pass a copy, except if it's already immutable.
            self._loop_writing(data=bytes(data))
        elif not self._buffer:  # WRITING -> BACKED UP
            # Make a mutable copy which we can extend.
            self._buffer = bytearray(data)
            self._maybe_pause_protocol()
        else:  # BACKED UP
            # Append to buffer (also copies).
            self._buffer.extend(data)
            self._maybe_pause_protocol()

    def _loop_writing(self, f=None, data=None):
        """Send *data* or the buffered data; done callback of the write."""
        try:
            if f is not None and self._write_fut is None and self._closing:
                # XXX most likely self._force_close() has been called, and
                # it has set self._write_fut to None.
                return
            assert f is self._write_fut
            self._write_fut = None
            self._pending_write = 0
            if f:
                f.result()
            if data is None:
                data = self._buffer
                self._buffer = None
            if not data:
                if self._closing:
                    self._loop.call_soon(self._call_connection_lost, None)
                if self._eof_written:
                    self._sock.shutdown(socket.SHUT_WR)
                # Now that we've reduced the buffer size, tell the
                # protocol to resume writing if it was paused.  Note that
                # we do this last since the callback is called immediately
                # and it may add more data to the buffer (even causing the
                # protocol to be paused again).
                self._maybe_resume_protocol()
            else:
                self._write_fut = self._loop._proactor.send(self._sock, data)
                if not self._write_fut.done():
                    assert self._pending_write == 0
                    self._pending_write = len(data)
                    self._write_fut.add_done_callback(self._loop_writing)
                    self._maybe_pause_protocol()
                else:
                    self._write_fut.add_done_callback(self._loop_writing)
            if self._empty_waiter is not None and self._write_fut is None:
                self._empty_waiter.set_result(None)
        except ConnectionResetError as exc:
            self._force_close(exc)
        except OSError as exc:
            self._fatal_error(exc, 'Fatal write error on pipe transport')

    def can_write_eof(self):
        return True

    def write_eof(self):
        self.close()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._force_close(None)

    def _make_empty_waiter(self):
        """Return a future completed when no write is in progress.

        Raises RuntimeError if an empty waiter is already set.
        """
        if self._empty_waiter is not None:
            raise RuntimeError("Empty waiter is already set")
        self._empty_waiter = self._loop.create_future()
        if self._write_fut is None:
            self._empty_waiter.set_result(None)
        return self._empty_waiter

    def _reset_empty_waiter(self):
        self._empty_waiter = None


class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
    """Write pipe transport which detects the other end being closed."""

    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)
        # This read only serves to notice when the other end closes.
        self._read_fut = self._loop._proactor.recv(self._sock, 16)
        self._read_fut.add_done_callback(self._pipe_closed)

    def _pipe_closed(self, fut):
        """Done callback of the read started in the constructor."""
        if fut.cancelled():
            # the transport has been closed
            return
        assert fut.result() == b''
        if self._closing:
            assert self._read_fut is None
            return
        assert fut is self._read_fut, (fut, self._read_fut)
        self._read_fut = None
        if self._write_fut is not None:
            self._force_close(BrokenPipeError())
        else:
            self.close()


class _ProactorDatagramTransport(_ProactorBasePipeTransport,
                                 transports.DatagramTransport):
    """Transport for datagram sockets."""

    max_size = 256 * 1024

    def __init__(self, loop, sock, protocol, address=None,
                 waiter=None, extra=None):
        self._address = address
        self._empty_waiter = None
        self._buffer_size = 0
        # We don't need to call _protocol.connection_made() since our base
        # constructor does it for us.
        super().__init__(loop, sock, protocol, waiter=waiter, extra=extra)

        # The base constructor sets _buffer = None, so we set it here
        self._buffer = collections.deque()
        self._loop.call_soon(self._loop_reading)

    def _set_extra(self, sock):
        _set_socket_extra(self, sock)

    def get_write_buffer_size(self):
        """Return the total size of the datagrams waiting to be sent."""
        return self._buffer_size

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._force_close(None)

    def sendto(self, data, addr=None):
        """Queue the datagram *data* for sending to *addr*.

        Raises TypeError if *data* is not bytes-like, and ValueError if the
        transport has a fixed remote address and *addr* is neither None nor
        that address.  Empty data is ignored.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            # Build the message explicitly: passing the format string and
            # its argument separately to TypeError() never interpolates.
            raise TypeError(
                "data argument must be a bytes-like object, "
                f"not {type(data).__name__}")

        if not data:
            return

        if self._address is not None and addr not in (None, self._address):
            raise ValueError(
                f'Invalid address: must be None or {self._address}')

        if self._conn_lost and self._address:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.sendto() raised exception.')
            self._conn_lost += 1
            return

        # Ensure that what we buffer is immutable.
        self._buffer.append((bytes(data), addr))
        self._buffer_size += len(data)

        if self._write_fut is None:
            # No current write operations are active, kick one off
            self._loop_writing()
        # else: A write operation is already kicked off

        self._maybe_pause_protocol()

    def _loop_writing(self, fut=None):
        """Send the next buffered datagram; done callback of the write."""
        try:
            if self._conn_lost:
                return

            assert fut is self._write_fut
            self._write_fut = None
            if fut:
                # We are in a _loop_writing() done callback, get the result
                fut.result()

            if not self._buffer or (self._conn_lost and self._address):
                # The connection has been closed
                if self._closing:
                    self._loop.call_soon(self._call_connection_lost, None)
                return

            data, addr = self._buffer.popleft()
            self._buffer_size -= len(data)
            if self._address is not None:
                self._write_fut = self._loop._proactor.send(self._sock,
                                                            data)
            else:
                self._write_fut = self._loop._proactor.sendto(self._sock,
                                                              data,
                                                              addr=addr)
        except OSError as exc:
            self._protocol.error_received(exc)
        except Exception as exc:
            self._fatal_error(exc, 'Fatal write error on datagram transport')
        else:
            self._write_fut.add_done_callback(self._loop_writing)
            self._maybe_resume_protocol()

    def _loop_reading(self, fut=None):
        """Deliver a received datagram and schedule the next read."""
        data = None
        try:
            if self._conn_lost:
                return

            assert self._read_fut is fut or (self._read_fut is None and
                                             self._closing)

            self._read_fut = None
            if fut is not None:
                res = fut.result()

                if self._closing:
                    # since close() has been called we ignore any read data
                    data = None
                    return

                if self._address is not None:
                    data, addr = res, self._address
                else:
                    data, addr = res

            if self._conn_lost:
                return
            if self._address is not None:
                self._read_fut = self._loop._proactor.recv(self._sock,
                                                           self.max_size)
            else:
                self._read_fut = self._loop._proactor.recvfrom(self._sock,
                                                               self.max_size)
        except OSError as exc:
            self._protocol.error_received(exc)
        except exceptions.CancelledError:
            if not self._closing:
                raise
        else:
            if self._read_fut is not None:
                self._read_fut.add_done_callback(self._loop_reading)
        finally:
            if data:
                self._protocol.datagram_received(data, addr)


class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport,
                                   _ProactorBaseWritePipeTransport,
                                   transports.Transport):
    """Transport for duplex pipes."""

    def can_write_eof(self):
        return False

    def write_eof(self):
        raise NotImplementedError


class _ProactorSocketTransport(_ProactorReadPipeTransport,
                               _ProactorBaseWritePipeTransport,
                               transports.Transport):
    """Transport for connected sockets."""

    _sendfile_compatible = constants._SendfileMode.TRY_NATIVE

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        super().__init__(loop, sock, protocol, waiter, extra, server)
        base_events._set_nodelay(sock)

    def _set_extra(self, sock):
        _set_socket_extra(self, sock)

    def can_write_eof(self):
        return True

    def write_eof(self):
        """Shut down the write side once pending data has been sent."""
        if self._closing or self._eof_written:
            return
        self._eof_written = True
        if self._write_fut is None:
            self._sock.shutdown(socket.SHUT_WR)


class BaseProactorEventLoop(base_events.BaseEventLoop):
    """Event loop whose I/O operations are delegated to a proactor."""

    def __init__(self, proactor):
        super().__init__()
        logger.debug('Using proactor: %s', proactor.__class__.__name__)
        self._proactor = proactor
        self._selector = proactor   # convenient alias
        self._self_reading_future = None
        self._accept_futures = {}   # socket file descriptor => Future
        proactor.set_loop(self)
        self._make_self_pipe()
        if threading.current_thread() is threading.main_thread():
            # wakeup fd can only be installed to a file descriptor from the main thread
            signal.set_wakeup_fd(self._csock.fileno())

    def _make_socket_transport(self, sock, protocol, waiter=None,
                               extra=None, server=None):
        return _ProactorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_ssl_transport(
            self, rawsock, protocol, sslcontext, waiter=None,
            *, server_side=False, server_hostname=None,
            extra=None, server=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        ssl_protocol = sslproto.SSLProtocol(
                self, protocol, sslcontext, waiter,
                server_side, server_hostname,
                ssl_handshake_timeout=ssl_handshake_timeout,
                ssl_shutdown_timeout=ssl_shutdown_timeout)
        _ProactorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        return _ProactorDatagramTransport(self, sock, protocol, address,
                                          waiter, extra)

    def _make_duplex_pipe_transport(self, sock, protocol, waiter=None,
                                    extra=None):
        return _ProactorDuplexPipeTransport(self,
                                            sock, protocol, waiter, extra)

    def _make_read_pipe_transport(self, sock, protocol, waiter=None,
                                  extra=None):
        return _ProactorReadPipeTransport(self, sock, protocol, waiter, extra)

    def _make_write_pipe_transport(self, sock, protocol, waiter=None,
                                   extra=None):
        # We want connection_lost() to be called when other end closes
        return _ProactorWritePipeTransport(self,
                                           sock, protocol, waiter, extra)

    def close(self):
        """Close the event loop.

        Raises RuntimeError if the loop is running; does nothing if it is
        already closed.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return

        if threading.current_thread() is threading.main_thread():
            signal.set_wakeup_fd(-1)
        # Call these methods before closing the event loop (before calling
        # BaseEventLoop.close), because they can schedule callbacks with
        # call_soon(), which is forbidden when the event loop is closed.
        self._stop_accept_futures()
        self._close_self_pipe()
        self._proactor.close()
        self._proactor = None
        self._selector = None

        # Close the event loop
        super().close()

    async def sock_recv(self, sock, n):
        return await self._proactor.recv(sock, n)

    async def sock_recv_into(self, sock, buf):
        return await self._proactor.recv_into(sock, buf)

    async def sock_recvfrom(self, sock, bufsize):
        return await self._proactor.recvfrom(sock, bufsize)

    async def sock_recvfrom_into(self, sock, buf, nbytes=0):
        if not nbytes:
            nbytes = len(buf)

        return await self._proactor.recvfrom_into(sock, buf, nbytes)

    async def sock_sendall(self, sock, data):
        return await self._proactor.send(sock, data)

    async def sock_sendto(self, sock, data, address):
        return await self._proactor.sendto(sock, data, 0, address)

    async def sock_connect(self, sock, address):
        return await self._proactor.connect(sock, address)

    async def sock_accept(self, sock):
        return await self._proactor.accept(sock)

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* with the proactor's sendfile().

        Returns the number of bytes sent.  Raises
        SendfileNotAvailableError when *file* has no usable file
        descriptor.
        """
        try:
            fileno = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise exceptions.SendfileNotAvailableError("not a regular file")
        try:
            fsize = os.fstat(fileno).st_size
        except OSError:
            raise exceptions.SendfileNotAvailableError("not a regular file")
        blocksize = count if count else fsize
        if not blocksize:
            return 0  # empty file

        blocksize = min(blocksize, 0xffff_ffff)
        end_pos = min(offset + count, fsize) if count else fsize
        offset = min(offset, fsize)
        total_sent = 0
        try:
            while True:
                blocksize = min(end_pos - offset, blocksize)
                if blocksize <= 0:
                    return total_sent
                await self._proactor.sendfile(sock, file, offset, blocksize)
                offset += blocksize
                total_sent += blocksize
        finally:
            if total_sent > 0:
                file.seek(offset)

    async def _sendfile_native(self, transp, file, offset, count):
        """Send *file* through the socket of transport *transp*.

        Reading is paused and pending writes are awaited first; the
        previous reading state is restored afterwards.
        """
        resume_reading = transp.is_reading()
        transp.pause_reading()
        await transp._make_empty_waiter()
        try:
            return await self.sock_sendfile(transp._sock, file, offset, count,
                                            fallback=False)
        finally:
            transp._reset_empty_waiter()
            if resume_reading:
                transp.resume_reading()

    def _close_self_pipe(self):
        if self._self_reading_future is not None:
            self._self_reading_future.cancel()
            self._self_reading_future = None
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        # A self-socket, really. :-)
        self._ssock, self._csock = socket.socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1

    def _loop_self_reading(self, f=None):
        """Keep a read pending on the self-pipe; done callback of it."""
        try:
            if f is not None:
                f.result()  # may raise
            if self._self_reading_future is not f:
                # When we scheduled this Future, we assigned it to
                # _self_reading_future. If it's not there now, something has
                # tried to cancel the loop while this callback was still in the
                # queue (see windows_events.ProactorEventLoop.run_forever). In
                # that case stop here instead of continuing to schedule a new
                # iteration.
                return
            f = self._proactor.recv(self._ssock, 4096)
        except exceptions.CancelledError:
            # _close_self_pipe() has been called, stop waiting for data
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self.call_exception_handler({
                'message': 'Error on reading from the event loop self pipe',
                'exception': exc,
                'loop': self,
            })
        else:
            self._self_reading_future = f
            f.add_done_callback(self._loop_self_reading)

    def _write_to_self(self):
        # This may be called from a different thread, possibly after
        # _close_self_pipe() has been called or even while it is
        # running.  Guard for self._csock being None or closed.  When
        # a socket is closed, send() raises OSError (with errno set to
        # EBADF, but let's not rely on the exact error code).
        csock = self._csock
        if csock is None:
            return

        try:
            csock.send(b'\0')
        except OSError:
            if self._debug:
                logger.debug("Fail to write a null byte into the "
                             "self-pipe socket",
                             exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None, backlog=100,
                       ssl_handshake_timeout=None,
                       ssl_shutdown_timeout=None):
        """Start accepting connections on the listening socket *sock*.

        A transport is created for every accepted connection, with a
        protocol obtained from *protocol_factory*.
        """

        def loop(f=None):
            try:
                if f is not None:
                    conn, addr = f.result()
                    if self._debug:
                        logger.debug("%r got a new connection from %r: %r",
                                     server, addr, conn)
                    protocol = protocol_factory()
                    if sslcontext is not None:
                        self._make_ssl_transport(
                            conn, protocol, sslcontext, server_side=True,
                            extra={'peername': addr}, server=server,
                            ssl_handshake_timeout=ssl_handshake_timeout,
                            ssl_shutdown_timeout=ssl_shutdown_timeout)
                    else:
                        self._make_socket_transport(
                            conn, protocol,
                            extra={'peername': addr}, server=server)
                if self.is_closed():
                    return
                f = self._proactor.accept(sock)
            except OSError as exc:
                if sock.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Accept failed on a socket',
                        'exception': exc,
                        'socket': trsock.TransportSocket(sock),
                    })
                    sock.close()
                elif self._debug:
                    logger.debug("Accept failed on socket %r",
                                 sock, exc_info=True)
            except exceptions.CancelledError:
                sock.close()
            else:
                self._accept_futures[sock.fileno()] = f
                f.add_done_callback(loop)

        self.call_soon(loop)

    def _process_events(self, event_list):
        # Events are processed in the IocpProactor._poll() method
        pass

    def _stop_accept_futures(self):
        for future in self._accept_futures.values():
            future.cancel()
        self._accept_futures.clear()

    def _stop_serving(self, sock):
        future = self._accept_futures.pop(sock.fileno(), None)
        if future:
            future.cancel()
        self._proactor._stop_serving(sock)
        sock.close()