1"""Selector event loop for Unix with signal handling.""" 2 3import errno 4import io 5import itertools 6import os 7import selectors 8import signal 9import socket 10import stat 11import subprocess 12import sys 13import threading 14import warnings 15 16from . import base_events 17from . import base_subprocess 18from . import constants 19from . import coroutines 20from . import events 21from . import exceptions 22from . import futures 23from . import selector_events 24from . import tasks 25from . import transports 26from .log import logger 27 28 29__all__ = ( 30 'SelectorEventLoop', 31 'AbstractChildWatcher', 'SafeChildWatcher', 32 'FastChildWatcher', 'PidfdChildWatcher', 33 'MultiLoopChildWatcher', 'ThreadedChildWatcher', 34 'DefaultEventLoopPolicy', 35) 36 37 38if sys.platform == 'win32': # pragma: no cover 39 raise ImportError('Signals are not really supported on Windows') 40 41 42def _sighandler_noop(signum, frame): 43 """Dummy signal handler.""" 44 pass 45 46 47def waitstatus_to_exitcode(status): 48 try: 49 return os.waitstatus_to_exitcode(status) 50 except ValueError: 51 # The child exited, but we don't understand its status. 52 # This shouldn't happen, but if it does, let's just 53 # return that status; perhaps that helps debug it. 54 return status 55 56 57class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): 58 """Unix event loop. 59 60 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop. 61 """ 62 63 def __init__(self, selector=None): 64 super().__init__(selector) 65 self._signal_handlers = {} 66 67 def close(self): 68 super().close() 69 if not sys.is_finalizing(): 70 for sig in list(self._signal_handlers): 71 self.remove_signal_handler(sig) 72 else: 73 if self._signal_handlers: 74 warnings.warn(f"Closing the loop {self!r} " 75 f"on interpreter shutdown " 76 f"stage, skipping signal handlers removal", 77 ResourceWarning, 78 source=self) 79 self._signal_handlers.clear() 80 81 def _process_self_data(self, data): 82 for signum in data: 83 if not signum: 84 # ignore null bytes written by _write_to_self() 85 continue 86 self._handle_signal(signum) 87 88 def add_signal_handler(self, sig, callback, *args): 89 """Add a handler for a signal. UNIX only. 90 91 Raise ValueError if the signal number is invalid or uncatchable. 92 Raise RuntimeError if there is a problem setting up the handler. 93 """ 94 if (coroutines.iscoroutine(callback) or 95 coroutines.iscoroutinefunction(callback)): 96 raise TypeError("coroutines cannot be used " 97 "with add_signal_handler()") 98 self._check_signal(sig) 99 self._check_closed() 100 try: 101 # set_wakeup_fd() raises ValueError if this is not the 102 # main thread. By calling it early we ensure that an 103 # event loop running in another thread cannot add a signal 104 # handler. 105 signal.set_wakeup_fd(self._csock.fileno()) 106 except (ValueError, OSError) as exc: 107 raise RuntimeError(str(exc)) 108 109 handle = events.Handle(callback, args, self, None) 110 self._signal_handlers[sig] = handle 111 112 try: 113 # Register a dummy signal handler to ask Python to write the signal 114 # number in the wakeup file descriptor. _process_self_data() will 115 # read signal numbers from this file descriptor to handle signals. 116 signal.signal(sig, _sighandler_noop) 117 118 # Set SA_RESTART to limit EINTR occurrences. 119 signal.siginterrupt(sig, False) 120 except OSError as exc: 121 del self._signal_handlers[sig] 122 if not self._signal_handlers: 123 try: 124 signal.set_wakeup_fd(-1) 125 except (ValueError, OSError) as nexc: 126 logger.info('set_wakeup_fd(-1) failed: %s', nexc) 127 128 if exc.errno == errno.EINVAL: 129 raise RuntimeError(f'sig {sig} cannot be caught') 130 else: 131 raise 132 133 def _handle_signal(self, sig): 134 """Internal helper that is the actual signal handler.""" 135 handle = self._signal_handlers.get(sig) 136 if handle is None: 137 return # Assume it's some race condition. 138 if handle._cancelled: 139 self.remove_signal_handler(sig) # Remove it properly. 140 else: 141 self._add_callback_signalsafe(handle) 142 143 def remove_signal_handler(self, sig): 144 """Remove a handler for a signal. UNIX only. 145 146 Return True if a signal handler was removed, False if not. 147 """ 148 self._check_signal(sig) 149 try: 150 del self._signal_handlers[sig] 151 except KeyError: 152 return False 153 154 if sig == signal.SIGINT: 155 handler = signal.default_int_handler 156 else: 157 handler = signal.SIG_DFL 158 159 try: 160 signal.signal(sig, handler) 161 except OSError as exc: 162 if exc.errno == errno.EINVAL: 163 raise RuntimeError(f'sig {sig} cannot be caught') 164 else: 165 raise 166 167 if not self._signal_handlers: 168 try: 169 signal.set_wakeup_fd(-1) 170 except (ValueError, OSError) as exc: 171 logger.info('set_wakeup_fd(-1) failed: %s', exc) 172 173 return True 174 175 def _check_signal(self, sig): 176 """Internal helper to validate a signal. 177 178 Raise ValueError if the signal number is invalid or uncatchable. 179 Raise RuntimeError if there is a problem setting up the handler. 180 """ 181 if not isinstance(sig, int): 182 raise TypeError(f'sig must be an int, not {sig!r}') 183 184 if sig not in signal.valid_signals(): 185 raise ValueError(f'invalid signal number {sig}') 186 187 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 188 extra=None): 189 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra) 190 191 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 192 extra=None): 193 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra) 194 195 async def _make_subprocess_transport(self, protocol, args, shell, 196 stdin, stdout, stderr, bufsize, 197 extra=None, **kwargs): 198 with events.get_child_watcher() as watcher: 199 if not watcher.is_active(): 200 # Check early. 201 # Raising exception before process creation 202 # prevents subprocess execution if the watcher 203 # is not ready to handle it. 204 raise RuntimeError("asyncio.get_child_watcher() is not activated, " 205 "subprocess support is not installed.") 206 waiter = self.create_future() 207 transp = _UnixSubprocessTransport(self, protocol, args, shell, 208 stdin, stdout, stderr, bufsize, 209 waiter=waiter, extra=extra, 210 **kwargs) 211 212 watcher.add_child_handler(transp.get_pid(), 213 self._child_watcher_callback, transp) 214 try: 215 await waiter 216 except (SystemExit, KeyboardInterrupt): 217 raise 218 except BaseException: 219 transp.close() 220 await transp._wait() 221 raise 222 223 return transp 224 225 def _child_watcher_callback(self, pid, returncode, transp): 226 # Skip one iteration for callbacks to be executed 227 self.call_soon_threadsafe(self.call_soon, transp._process_exited, returncode) 228 229 async def create_unix_connection( 230 self, protocol_factory, path=None, *, 231 ssl=None, sock=None, 232 server_hostname=None, 233 ssl_handshake_timeout=None, 234 ssl_shutdown_timeout=None): 235 assert server_hostname is None or isinstance(server_hostname, str) 236 if ssl: 237 if server_hostname is None: 238 raise ValueError( 239 'you have to pass server_hostname when using ssl') 240 else: 241 if server_hostname is not None: 242 raise ValueError('server_hostname is only meaningful with ssl') 243 if ssl_handshake_timeout is not None: 244 raise ValueError( 245 'ssl_handshake_timeout is only meaningful with ssl') 246 if ssl_shutdown_timeout is not None: 247 raise ValueError( 248 'ssl_shutdown_timeout is only meaningful with ssl') 249 250 if path is not None: 251 if sock is not None: 252 raise ValueError( 253 'path and sock can not be specified at the same time') 254 255 path = os.fspath(path) 256 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) 257 try: 258 sock.setblocking(False) 259 await self.sock_connect(sock, path) 260 except: 261 sock.close() 262 raise 263 264 else: 265 if sock is None: 266 raise ValueError('no path and sock were specified') 267 if (sock.family != socket.AF_UNIX or 268 sock.type != socket.SOCK_STREAM): 269 raise ValueError( 270 f'A UNIX Domain Stream Socket was expected, got {sock!r}') 271 sock.setblocking(False) 272 273 transport, protocol = await self._create_connection_transport( 274 sock, protocol_factory, ssl, server_hostname, 275 ssl_handshake_timeout=ssl_handshake_timeout, 276 ssl_shutdown_timeout=ssl_shutdown_timeout) 277 return transport, protocol 278 279 async def create_unix_server( 280 self, protocol_factory, path=None, *, 281 sock=None, backlog=100, ssl=None, 282 ssl_handshake_timeout=None, 283 ssl_shutdown_timeout=None, 284 start_serving=True): 285 if isinstance(ssl, bool): 286 raise TypeError('ssl argument must be an SSLContext or None') 287 288 if ssl_handshake_timeout is not None and not ssl: 289 raise ValueError( 290 'ssl_handshake_timeout is only meaningful with ssl') 291 292 if ssl_shutdown_timeout is not None and not ssl: 293 raise ValueError( 294 'ssl_shutdown_timeout is only meaningful with ssl') 295 296 if path is not None: 297 if sock is not None: 298 raise ValueError( 299 'path and sock can not be specified at the same time') 300 301 path = os.fspath(path) 302 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 303 304 # Check for abstract socket. `str` and `bytes` paths are supported. 305 if path[0] not in (0, '\x00'): 306 try: 307 if stat.S_ISSOCK(os.stat(path).st_mode): 308 os.remove(path) 309 except FileNotFoundError: 310 pass 311 except OSError as err: 312 # Directory may have permissions only to create socket. 313 logger.error('Unable to check or remove stale UNIX socket ' 314 '%r: %r', path, err) 315 316 try: 317 sock.bind(path) 318 except OSError as exc: 319 sock.close() 320 if exc.errno == errno.EADDRINUSE: 321 # Let's improve the error message by adding 322 # with what exact address it occurs. 323 msg = f'Address {path!r} is already in use' 324 raise OSError(errno.EADDRINUSE, msg) from None 325 else: 326 raise 327 except: 328 sock.close() 329 raise 330 else: 331 if sock is None: 332 raise ValueError( 333 'path was not specified, and no sock specified') 334 335 if (sock.family != socket.AF_UNIX or 336 sock.type != socket.SOCK_STREAM): 337 raise ValueError( 338 f'A UNIX Domain Stream Socket was expected, got {sock!r}') 339 340 sock.setblocking(False) 341 server = base_events.Server(self, [sock], protocol_factory, 342 ssl, backlog, ssl_handshake_timeout, 343 ssl_shutdown_timeout) 344 if start_serving: 345 server._start_serving() 346 # Skip one loop iteration so that all 'loop.add_reader' 347 # go through. 348 await tasks.sleep(0) 349 350 return server 351 352 async def _sock_sendfile_native(self, sock, file, offset, count): 353 try: 354 os.sendfile 355 except AttributeError: 356 raise exceptions.SendfileNotAvailableError( 357 "os.sendfile() is not available") 358 try: 359 fileno = file.fileno() 360 except (AttributeError, io.UnsupportedOperation) as err: 361 raise exceptions.SendfileNotAvailableError("not a regular file") 362 try: 363 fsize = os.fstat(fileno).st_size 364 except OSError: 365 raise exceptions.SendfileNotAvailableError("not a regular file") 366 blocksize = count if count else fsize 367 if not blocksize: 368 return 0 # empty file 369 370 fut = self.create_future() 371 self._sock_sendfile_native_impl(fut, None, sock, fileno, 372 offset, count, blocksize, 0) 373 return await fut 374 375 def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno, 376 offset, count, blocksize, total_sent): 377 fd = sock.fileno() 378 if registered_fd is not None: 379 # Remove the callback early. It should be rare that the 380 # selector says the fd is ready but the call still returns 381 # EAGAIN, and I am willing to take a hit in that case in 382 # order to simplify the common case. 383 self.remove_writer(registered_fd) 384 if fut.cancelled(): 385 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 386 return 387 if count: 388 blocksize = count - total_sent 389 if blocksize <= 0: 390 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 391 fut.set_result(total_sent) 392 return 393 394 try: 395 sent = os.sendfile(fd, fileno, offset, blocksize) 396 except (BlockingIOError, InterruptedError): 397 if registered_fd is None: 398 self._sock_add_cancellation_callback(fut, sock) 399 self.add_writer(fd, self._sock_sendfile_native_impl, fut, 400 fd, sock, fileno, 401 offset, count, blocksize, total_sent) 402 except OSError as exc: 403 if (registered_fd is not None and 404 exc.errno == errno.ENOTCONN and 405 type(exc) is not ConnectionError): 406 # If we have an ENOTCONN and this isn't a first call to 407 # sendfile(), i.e. the connection was closed in the middle 408 # of the operation, normalize the error to ConnectionError 409 # to make it consistent across all Posix systems. 410 new_exc = ConnectionError( 411 "socket is not connected", errno.ENOTCONN) 412 new_exc.__cause__ = exc 413 exc = new_exc 414 if total_sent == 0: 415 # We can get here for different reasons, the main 416 # one being 'file' is not a regular mmap(2)-like 417 # file, in which case we'll fall back on using 418 # plain send(). 419 err = exceptions.SendfileNotAvailableError( 420 "os.sendfile call failed") 421 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 422 fut.set_exception(err) 423 else: 424 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 425 fut.set_exception(exc) 426 except (SystemExit, KeyboardInterrupt): 427 raise 428 except BaseException as exc: 429 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 430 fut.set_exception(exc) 431 else: 432 if sent == 0: 433 # EOF 434 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 435 fut.set_result(total_sent) 436 else: 437 offset += sent 438 total_sent += sent 439 if registered_fd is None: 440 self._sock_add_cancellation_callback(fut, sock) 441 self.add_writer(fd, self._sock_sendfile_native_impl, fut, 442 fd, sock, fileno, 443 offset, count, blocksize, total_sent) 444 445 def _sock_sendfile_update_filepos(self, fileno, offset, total_sent): 446 if total_sent > 0: 447 os.lseek(fileno, offset, os.SEEK_SET) 448 449 def _sock_add_cancellation_callback(self, fut, sock): 450 def cb(fut): 451 if fut.cancelled(): 452 fd = sock.fileno() 453 if fd != -1: 454 self.remove_writer(fd) 455 fut.add_done_callback(cb) 456 457 458class _UnixReadPipeTransport(transports.ReadTransport): 459 460 max_size = 256 * 1024 # max bytes we read in one event loop iteration 461 462 def __init__(self, loop, pipe, protocol, waiter=None, extra=None): 463 super().__init__(extra) 464 self._extra['pipe'] = pipe 465 self._loop = loop 466 self._pipe = pipe 467 self._fileno = pipe.fileno() 468 self._protocol = protocol 469 self._closing = False 470 self._paused = False 471 472 mode = os.fstat(self._fileno).st_mode 473 if not (stat.S_ISFIFO(mode) or 474 stat.S_ISSOCK(mode) or 475 stat.S_ISCHR(mode)): 476 self._pipe = None 477 self._fileno = None 478 self._protocol = None 479 raise ValueError("Pipe transport is for pipes/sockets only.") 480 481 os.set_blocking(self._fileno, False) 482 483 self._loop.call_soon(self._protocol.connection_made, self) 484 # only start reading when connection_made() has been called 485 self._loop.call_soon(self._add_reader, 486 self._fileno, self._read_ready) 487 if waiter is not None: 488 # only wake up the waiter when connection_made() has been called 489 self._loop.call_soon(futures._set_result_unless_cancelled, 490 waiter, None) 491 492 def _add_reader(self, fd, callback): 493 if not self.is_reading(): 494 return 495 self._loop._add_reader(fd, callback) 496 497 def is_reading(self): 498 return not self._paused and not self._closing 499 500 def __repr__(self): 501 info = [self.__class__.__name__] 502 if self._pipe is None: 503 info.append('closed') 504 elif self._closing: 505 info.append('closing') 506 info.append(f'fd={self._fileno}') 507 selector = getattr(self._loop, '_selector', None) 508 if self._pipe is not None and selector is not None: 509 polling = selector_events._test_selector_event( 510 selector, self._fileno, selectors.EVENT_READ) 511 if polling: 512 info.append('polling') 513 else: 514 info.append('idle') 515 elif self._pipe is not None: 516 info.append('open') 517 else: 518 info.append('closed') 519 return '<{}>'.format(' '.join(info)) 520 521 def _read_ready(self): 522 try: 523 data = os.read(self._fileno, self.max_size) 524 except (BlockingIOError, InterruptedError): 525 pass 526 except OSError as exc: 527 self._fatal_error(exc, 'Fatal read error on pipe transport') 528 else: 529 if data: 530 self._protocol.data_received(data) 531 else: 532 if self._loop.get_debug(): 533 logger.info("%r was closed by peer", self) 534 self._closing = True 535 self._loop._remove_reader(self._fileno) 536 self._loop.call_soon(self._protocol.eof_received) 537 self._loop.call_soon(self._call_connection_lost, None) 538 539 def pause_reading(self): 540 if not self.is_reading(): 541 return 542 self._paused = True 543 self._loop._remove_reader(self._fileno) 544 if self._loop.get_debug(): 545 logger.debug("%r pauses reading", self) 546 547 def resume_reading(self): 548 if self._closing or not self._paused: 549 return 550 self._paused = False 551 self._loop._add_reader(self._fileno, self._read_ready) 552 if self._loop.get_debug(): 553 logger.debug("%r resumes reading", self) 554 555 def set_protocol(self, protocol): 556 self._protocol = protocol 557 558 def get_protocol(self): 559 return self._protocol 560 561 def is_closing(self): 562 return self._closing 563 564 def close(self): 565 if not self._closing: 566 self._close(None) 567 568 def __del__(self, _warn=warnings.warn): 569 if self._pipe is not None: 570 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 571 self._pipe.close() 572 573 def _fatal_error(self, exc, message='Fatal error on pipe transport'): 574 # should be called by exception handler only 575 if (isinstance(exc, OSError) and exc.errno == errno.EIO): 576 if self._loop.get_debug(): 577 logger.debug("%r: %s", self, message, exc_info=True) 578 else: 579 self._loop.call_exception_handler({ 580 'message': message, 581 'exception': exc, 582 'transport': self, 583 'protocol': self._protocol, 584 }) 585 self._close(exc) 586 587 def _close(self, exc): 588 self._closing = True 589 self._loop._remove_reader(self._fileno) 590 self._loop.call_soon(self._call_connection_lost, exc) 591 592 def _call_connection_lost(self, exc): 593 try: 594 self._protocol.connection_lost(exc) 595 finally: 596 self._pipe.close() 597 self._pipe = None 598 self._protocol = None 599 self._loop = None 600 601 602class _UnixWritePipeTransport(transports._FlowControlMixin, 603 transports.WriteTransport): 604 605 def __init__(self, loop, pipe, protocol, waiter=None, extra=None): 606 super().__init__(extra, loop) 607 self._extra['pipe'] = pipe 608 self._pipe = pipe 609 self._fileno = pipe.fileno() 610 self._protocol = protocol 611 self._buffer = bytearray() 612 self._conn_lost = 0 613 self._closing = False # Set when close() or write_eof() called. 614 615 mode = os.fstat(self._fileno).st_mode 616 is_char = stat.S_ISCHR(mode) 617 is_fifo = stat.S_ISFIFO(mode) 618 is_socket = stat.S_ISSOCK(mode) 619 if not (is_char or is_fifo or is_socket): 620 self._pipe = None 621 self._fileno = None 622 self._protocol = None 623 raise ValueError("Pipe transport is only for " 624 "pipes, sockets and character devices") 625 626 os.set_blocking(self._fileno, False) 627 self._loop.call_soon(self._protocol.connection_made, self) 628 629 # On AIX, the reader trick (to be notified when the read end of the 630 # socket is closed) only works for sockets. On other platforms it 631 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) 632 if is_socket or (is_fifo and not sys.platform.startswith("aix")): 633 # only start reading when connection_made() has been called 634 self._loop.call_soon(self._loop._add_reader, 635 self._fileno, self._read_ready) 636 637 if waiter is not None: 638 # only wake up the waiter when connection_made() has been called 639 self._loop.call_soon(futures._set_result_unless_cancelled, 640 waiter, None) 641 642 def __repr__(self): 643 info = [self.__class__.__name__] 644 if self._pipe is None: 645 info.append('closed') 646 elif self._closing: 647 info.append('closing') 648 info.append(f'fd={self._fileno}') 649 selector = getattr(self._loop, '_selector', None) 650 if self._pipe is not None and selector is not None: 651 polling = selector_events._test_selector_event( 652 selector, self._fileno, selectors.EVENT_WRITE) 653 if polling: 654 info.append('polling') 655 else: 656 info.append('idle') 657 658 bufsize = self.get_write_buffer_size() 659 info.append(f'bufsize={bufsize}') 660 elif self._pipe is not None: 661 info.append('open') 662 else: 663 info.append('closed') 664 return '<{}>'.format(' '.join(info)) 665 666 def get_write_buffer_size(self): 667 return len(self._buffer) 668 669 def _read_ready(self): 670 # Pipe was closed by peer. 671 if self._loop.get_debug(): 672 logger.info("%r was closed by peer", self) 673 if self._buffer: 674 self._close(BrokenPipeError()) 675 else: 676 self._close() 677 678 def write(self, data): 679 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data) 680 if isinstance(data, bytearray): 681 data = memoryview(data) 682 if not data: 683 return 684 685 if self._conn_lost or self._closing: 686 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 687 logger.warning('pipe closed by peer or ' 688 'os.write(pipe, data) raised exception.') 689 self._conn_lost += 1 690 return 691 692 if not self._buffer: 693 # Attempt to send it right away first. 694 try: 695 n = os.write(self._fileno, data) 696 except (BlockingIOError, InterruptedError): 697 n = 0 698 except (SystemExit, KeyboardInterrupt): 699 raise 700 except BaseException as exc: 701 self._conn_lost += 1 702 self._fatal_error(exc, 'Fatal write error on pipe transport') 703 return 704 if n == len(data): 705 return 706 elif n > 0: 707 data = memoryview(data)[n:] 708 self._loop._add_writer(self._fileno, self._write_ready) 709 710 self._buffer += data 711 self._maybe_pause_protocol() 712 713 def _write_ready(self): 714 assert self._buffer, 'Data should not be empty' 715 716 try: 717 n = os.write(self._fileno, self._buffer) 718 except (BlockingIOError, InterruptedError): 719 pass 720 except (SystemExit, KeyboardInterrupt): 721 raise 722 except BaseException as exc: 723 self._buffer.clear() 724 self._conn_lost += 1 725 # Remove writer here, _fatal_error() doesn't it 726 # because _buffer is empty. 727 self._loop._remove_writer(self._fileno) 728 self._fatal_error(exc, 'Fatal write error on pipe transport') 729 else: 730 if n == len(self._buffer): 731 self._buffer.clear() 732 self._loop._remove_writer(self._fileno) 733 self._maybe_resume_protocol() # May append to buffer. 734 if self._closing: 735 self._loop._remove_reader(self._fileno) 736 self._call_connection_lost(None) 737 return 738 elif n > 0: 739 del self._buffer[:n] 740 741 def can_write_eof(self): 742 return True 743 744 def write_eof(self): 745 if self._closing: 746 return 747 assert self._pipe 748 self._closing = True 749 if not self._buffer: 750 self._loop._remove_reader(self._fileno) 751 self._loop.call_soon(self._call_connection_lost, None) 752 753 def set_protocol(self, protocol): 754 self._protocol = protocol 755 756 def get_protocol(self): 757 return self._protocol 758 759 def is_closing(self): 760 return self._closing 761 762 def close(self): 763 if self._pipe is not None and not self._closing: 764 # write_eof is all what we needed to close the write pipe 765 self.write_eof() 766 767 def __del__(self, _warn=warnings.warn): 768 if self._pipe is not None: 769 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 770 self._pipe.close() 771 772 def abort(self): 773 self._close(None) 774 775 def _fatal_error(self, exc, message='Fatal error on pipe transport'): 776 # should be called by exception handler only 777 if isinstance(exc, OSError): 778 if self._loop.get_debug(): 779 logger.debug("%r: %s", self, message, exc_info=True) 780 else: 781 self._loop.call_exception_handler({ 782 'message': message, 783 'exception': exc, 784 'transport': self, 785 'protocol': self._protocol, 786 }) 787 self._close(exc) 788 789 def _close(self, exc=None): 790 self._closing = True 791 if self._buffer: 792 self._loop._remove_writer(self._fileno) 793 self._buffer.clear() 794 self._loop._remove_reader(self._fileno) 795 self._loop.call_soon(self._call_connection_lost, exc) 796 797 def _call_connection_lost(self, exc): 798 try: 799 self._protocol.connection_lost(exc) 800 finally: 801 self._pipe.close() 802 self._pipe = None 803 self._protocol = None 804 self._loop = None 805 806 807class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport): 808 809 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): 810 stdin_w = None 811 if stdin == subprocess.PIPE and sys.platform.startswith('aix'): 812 # Use a socket pair for stdin on AIX, since it does not 813 # support selecting read events on the write end of a 814 # socket (which we use in order to detect closing of the 815 # other end). 816 stdin, stdin_w = socket.socketpair() 817 try: 818 self._proc = subprocess.Popen( 819 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, 820 universal_newlines=False, bufsize=bufsize, **kwargs) 821 if stdin_w is not None: 822 stdin.close() 823 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize) 824 stdin_w = None 825 finally: 826 if stdin_w is not None: 827 stdin.close() 828 stdin_w.close() 829 830 831class AbstractChildWatcher: 832 """Abstract base class for monitoring child processes. 833 834 Objects derived from this class monitor a collection of subprocesses and 835 report their termination or interruption by a signal. 836 837 New callbacks are registered with .add_child_handler(). Starting a new 838 process must be done within a 'with' block to allow the watcher to suspend 839 its activity until the new process if fully registered (this is needed to 840 prevent a race condition in some implementations). 841 842 Example: 843 with watcher: 844 proc = subprocess.Popen("sleep 1") 845 watcher.add_child_handler(proc.pid, callback) 846 847 Notes: 848 Implementations of this class must be thread-safe. 849 850 Since child watcher objects may catch the SIGCHLD signal and call 851 waitpid(-1), there should be only one active object per process. 852 """ 853 854 def add_child_handler(self, pid, callback, *args): 855 """Register a new child handler. 856 857 Arrange for callback(pid, returncode, *args) to be called when 858 process 'pid' terminates. Specifying another callback for the same 859 process replaces the previous handler. 860 861 Note: callback() must be thread-safe. 862 """ 863 raise NotImplementedError() 864 865 def remove_child_handler(self, pid): 866 """Removes the handler for process 'pid'. 867 868 The function returns True if the handler was successfully removed, 869 False if there was nothing to remove.""" 870 871 raise NotImplementedError() 872 873 def attach_loop(self, loop): 874 """Attach the watcher to an event loop. 875 876 If the watcher was previously attached to an event loop, then it is 877 first detached before attaching to the new loop. 878 879 Note: loop may be None. 880 """ 881 raise NotImplementedError() 882 883 def close(self): 884 """Close the watcher. 885 886 This must be called to make sure that any underlying resource is freed. 887 """ 888 raise NotImplementedError() 889 890 def is_active(self): 891 """Return ``True`` if the watcher is active and is used by the event loop. 892 893 Return True if the watcher is installed and ready to handle process exit 894 notifications. 895 896 """ 897 raise NotImplementedError() 898 899 def __enter__(self): 900 """Enter the watcher's context and allow starting new processes 901 902 This function must return self""" 903 raise NotImplementedError() 904 905 def __exit__(self, a, b, c): 906 """Exit the watcher's context""" 907 raise NotImplementedError() 908 909 910class PidfdChildWatcher(AbstractChildWatcher): 911 """Child watcher implementation using Linux's pid file descriptors. 912 913 This child watcher polls process file descriptors (pidfds) to await child 914 process termination. In some respects, PidfdChildWatcher is a "Goldilocks" 915 child watcher implementation. It doesn't require signals or threads, doesn't 916 interfere with any processes launched outside the event loop, and scales 917 linearly with the number of subprocesses launched by the event loop. The 918 main disadvantage is that pidfds are specific to Linux, and only work on 919 recent (5.3+) kernels. 920 """ 921 922 def __init__(self): 923 self._loop = None 924 self._callbacks = {} 925 926 def __enter__(self): 927 return self 928 929 def __exit__(self, exc_type, exc_value, exc_traceback): 930 pass 931 932 def is_active(self): 933 return self._loop is not None and self._loop.is_running() 934 935 def close(self): 936 self.attach_loop(None) 937 938 def attach_loop(self, loop): 939 if self._loop is not None and loop is None and self._callbacks: 940 warnings.warn( 941 'A loop is being detached ' 942 'from a child watcher with pending handlers', 943 RuntimeWarning) 944 for pidfd, _, _ in self._callbacks.values(): 945 self._loop._remove_reader(pidfd) 946 os.close(pidfd) 947 self._callbacks.clear() 948 self._loop = loop 949 950 def add_child_handler(self, pid, callback, *args): 951 existing = self._callbacks.get(pid) 952 if existing is not None: 953 self._callbacks[pid] = existing[0], callback, args 954 else: 955 pidfd = os.pidfd_open(pid) 956 self._loop._add_reader(pidfd, self._do_wait, pid) 957 self._callbacks[pid] = pidfd, callback, args 958 959 def _do_wait(self, pid): 960 pidfd, callback, args = self._callbacks.pop(pid) 961 self._loop._remove_reader(pidfd) 962 try: 963 _, status = os.waitpid(pid, 0) 964 except ChildProcessError: 965 # The child process is already reaped 966 # (may happen if waitpid() is called elsewhere). 967 returncode = 255 968 logger.warning( 969 "child process pid %d exit status already read: " 970 " will report returncode 255", 971 pid) 972 else: 973 returncode = waitstatus_to_exitcode(status) 974 975 os.close(pidfd) 976 callback(pid, returncode, *args) 977 978 def remove_child_handler(self, pid): 979 try: 980 pidfd, _, _ = self._callbacks.pop(pid) 981 except KeyError: 982 return False 983 self._loop._remove_reader(pidfd) 984 os.close(pidfd) 985 return True 986 987 988class BaseChildWatcher(AbstractChildWatcher): 989 990 def __init__(self): 991 self._loop = None 992 self._callbacks = {} 993 994 def close(self): 995 self.attach_loop(None) 996 997 def is_active(self): 998 return self._loop is not None and self._loop.is_running() 999 1000 def _do_waitpid(self, expected_pid): 1001 raise NotImplementedError() 1002 1003 def _do_waitpid_all(self): 1004 raise NotImplementedError() 1005 1006 def attach_loop(self, loop): 1007 assert loop is None or isinstance(loop, events.AbstractEventLoop) 1008 1009 if self._loop is not None and loop is None and self._callbacks: 1010 warnings.warn( 1011 'A loop is being detached ' 1012 'from a child watcher with pending handlers', 1013 RuntimeWarning) 1014 1015 if self._loop is not None: 1016 self._loop.remove_signal_handler(signal.SIGCHLD) 1017 1018 self._loop = loop 1019 if loop is not None: 1020 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld) 1021 1022 # Prevent a race condition in case a child terminated 1023 # during the switch. 1024 self._do_waitpid_all() 1025 1026 def _sig_chld(self): 1027 try: 1028 self._do_waitpid_all() 1029 except (SystemExit, KeyboardInterrupt): 1030 raise 1031 except BaseException as exc: 1032 # self._loop should always be available here 1033 # as '_sig_chld' is added as a signal handler 1034 # in 'attach_loop' 1035 self._loop.call_exception_handler({ 1036 'message': 'Unknown exception in SIGCHLD handler', 1037 'exception': exc, 1038 }) 1039 1040 1041class SafeChildWatcher(BaseChildWatcher): 1042 """'Safe' child watcher implementation. 1043 1044 This implementation avoids disrupting other code spawning processes by 1045 polling explicitly each process in the SIGCHLD handler instead of calling 1046 os.waitpid(-1). 1047 1048 This is a safe solution but it has a significant overhead when handling a 1049 big number of children (O(n) each time SIGCHLD is raised) 1050 """ 1051 1052 def close(self): 1053 self._callbacks.clear() 1054 super().close() 1055 1056 def __enter__(self): 1057 return self 1058 1059 def __exit__(self, a, b, c): 1060 pass 1061 1062 def add_child_handler(self, pid, callback, *args): 1063 self._callbacks[pid] = (callback, args) 1064 1065 # Prevent a race condition in case the child is already terminated. 1066 self._do_waitpid(pid) 1067 1068 def remove_child_handler(self, pid): 1069 try: 1070 del self._callbacks[pid] 1071 return True 1072 except KeyError: 1073 return False 1074 1075 def _do_waitpid_all(self): 1076 1077 for pid in list(self._callbacks): 1078 self._do_waitpid(pid) 1079 1080 def _do_waitpid(self, expected_pid): 1081 assert expected_pid > 0 1082 1083 try: 1084 pid, status = os.waitpid(expected_pid, os.WNOHANG) 1085 except ChildProcessError: 1086 # The child process is already reaped 1087 # (may happen if waitpid() is called elsewhere). 1088 pid = expected_pid 1089 returncode = 255 1090 logger.warning( 1091 "Unknown child process pid %d, will report returncode 255", 1092 pid) 1093 else: 1094 if pid == 0: 1095 # The child process is still alive. 1096 return 1097 1098 returncode = waitstatus_to_exitcode(status) 1099 if self._loop.get_debug(): 1100 logger.debug('process %s exited with returncode %s', 1101 expected_pid, returncode) 1102 1103 try: 1104 callback, args = self._callbacks.pop(pid) 1105 except KeyError: # pragma: no cover 1106 # May happen if .remove_child_handler() is called 1107 # after os.waitpid() returns. 1108 if self._loop.get_debug(): 1109 logger.warning("Child watcher got an unexpected pid: %r", 1110 pid, exc_info=True) 1111 else: 1112 callback(pid, returncode, *args) 1113 1114 1115class FastChildWatcher(BaseChildWatcher): 1116 """'Fast' child watcher implementation. 1117 1118 This implementation reaps every terminated processes by calling 1119 os.waitpid(-1) directly, possibly breaking other code spawning processes 1120 and waiting for their termination. 1121 1122 There is no noticeable overhead when handling a big number of children 1123 (O(1) each time a child terminates). 1124 """ 1125 def __init__(self): 1126 super().__init__() 1127 self._lock = threading.Lock() 1128 self._zombies = {} 1129 self._forks = 0 1130 1131 def close(self): 1132 self._callbacks.clear() 1133 self._zombies.clear() 1134 super().close() 1135 1136 def __enter__(self): 1137 with self._lock: 1138 self._forks += 1 1139 1140 return self 1141 1142 def __exit__(self, a, b, c): 1143 with self._lock: 1144 self._forks -= 1 1145 1146 if self._forks or not self._zombies: 1147 return 1148 1149 collateral_victims = str(self._zombies) 1150 self._zombies.clear() 1151 1152 logger.warning( 1153 "Caught subprocesses termination from unknown pids: %s", 1154 collateral_victims) 1155 1156 def add_child_handler(self, pid, callback, *args): 1157 assert self._forks, "Must use the context manager" 1158 1159 with self._lock: 1160 try: 1161 returncode = self._zombies.pop(pid) 1162 except KeyError: 1163 # The child is running. 1164 self._callbacks[pid] = callback, args 1165 return 1166 1167 # The child is dead already. We can fire the callback. 1168 callback(pid, returncode, *args) 1169 1170 def remove_child_handler(self, pid): 1171 try: 1172 del self._callbacks[pid] 1173 return True 1174 except KeyError: 1175 return False 1176 1177 def _do_waitpid_all(self): 1178 # Because of signal coalescing, we must keep calling waitpid() as 1179 # long as we're able to reap a child. 1180 while True: 1181 try: 1182 pid, status = os.waitpid(-1, os.WNOHANG) 1183 except ChildProcessError: 1184 # No more child processes exist. 1185 return 1186 else: 1187 if pid == 0: 1188 # A child process is still alive. 1189 return 1190 1191 returncode = waitstatus_to_exitcode(status) 1192 1193 with self._lock: 1194 try: 1195 callback, args = self._callbacks.pop(pid) 1196 except KeyError: 1197 # unknown child 1198 if self._forks: 1199 # It may not be registered yet. 1200 self._zombies[pid] = returncode 1201 if self._loop.get_debug(): 1202 logger.debug('unknown process %s exited ' 1203 'with returncode %s', 1204 pid, returncode) 1205 continue 1206 callback = None 1207 else: 1208 if self._loop.get_debug(): 1209 logger.debug('process %s exited with returncode %s', 1210 pid, returncode) 1211 1212 if callback is None: 1213 logger.warning( 1214 "Caught subprocess termination from unknown pid: " 1215 "%d -> %d", pid, returncode) 1216 else: 1217 callback(pid, returncode, *args) 1218 1219 1220class MultiLoopChildWatcher(AbstractChildWatcher): 1221 """A watcher that doesn't require running loop in the main thread. 1222 1223 This implementation registers a SIGCHLD signal handler on 1224 instantiation (which may conflict with other code that 1225 install own handler for this signal). 1226 1227 The solution is safe but it has a significant overhead when 1228 handling a big number of processes (*O(n)* each time a 1229 SIGCHLD is received). 1230 """ 1231 1232 # Implementation note: 1233 # The class keeps compatibility with AbstractChildWatcher ABC 1234 # To achieve this it has empty attach_loop() method 1235 # and doesn't accept explicit loop argument 1236 # for add_child_handler()/remove_child_handler() 1237 # but retrieves the current loop by get_running_loop() 1238 1239 def __init__(self): 1240 self._callbacks = {} 1241 self._saved_sighandler = None 1242 1243 def is_active(self): 1244 return self._saved_sighandler is not None 1245 1246 def close(self): 1247 self._callbacks.clear() 1248 if self._saved_sighandler is None: 1249 return 1250 1251 handler = signal.getsignal(signal.SIGCHLD) 1252 if handler != self._sig_chld: 1253 logger.warning("SIGCHLD handler was changed by outside code") 1254 else: 1255 signal.signal(signal.SIGCHLD, self._saved_sighandler) 1256 self._saved_sighandler = None 1257 1258 def __enter__(self): 1259 return self 1260 1261 def __exit__(self, exc_type, exc_val, exc_tb): 1262 pass 1263 1264 def add_child_handler(self, pid, callback, *args): 1265 loop = events.get_running_loop() 1266 self._callbacks[pid] = (loop, callback, args) 1267 1268 # Prevent a race condition in case the child is already terminated. 1269 self._do_waitpid(pid) 1270 1271 def remove_child_handler(self, pid): 1272 try: 1273 del self._callbacks[pid] 1274 return True 1275 except KeyError: 1276 return False 1277 1278 def attach_loop(self, loop): 1279 # Don't save the loop but initialize itself if called first time 1280 # The reason to do it here is that attach_loop() is called from 1281 # unix policy only for the main thread. 1282 # Main thread is required for subscription on SIGCHLD signal 1283 if self._saved_sighandler is not None: 1284 return 1285 1286 self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld) 1287 if self._saved_sighandler is None: 1288 logger.warning("Previous SIGCHLD handler was set by non-Python code, " 1289 "restore to default handler on watcher close.") 1290 self._saved_sighandler = signal.SIG_DFL 1291 1292 # Set SA_RESTART to limit EINTR occurrences. 1293 signal.siginterrupt(signal.SIGCHLD, False) 1294 1295 def _do_waitpid_all(self): 1296 for pid in list(self._callbacks): 1297 self._do_waitpid(pid) 1298 1299 def _do_waitpid(self, expected_pid): 1300 assert expected_pid > 0 1301 1302 try: 1303 pid, status = os.waitpid(expected_pid, os.WNOHANG) 1304 except ChildProcessError: 1305 # The child process is already reaped 1306 # (may happen if waitpid() is called elsewhere). 1307 pid = expected_pid 1308 returncode = 255 1309 logger.warning( 1310 "Unknown child process pid %d, will report returncode 255", 1311 pid) 1312 debug_log = False 1313 else: 1314 if pid == 0: 1315 # The child process is still alive. 1316 return 1317 1318 returncode = waitstatus_to_exitcode(status) 1319 debug_log = True 1320 try: 1321 loop, callback, args = self._callbacks.pop(pid) 1322 except KeyError: # pragma: no cover 1323 # May happen if .remove_child_handler() is called 1324 # after os.waitpid() returns. 1325 logger.warning("Child watcher got an unexpected pid: %r", 1326 pid, exc_info=True) 1327 else: 1328 if loop.is_closed(): 1329 logger.warning("Loop %r that handles pid %r is closed", loop, pid) 1330 else: 1331 if debug_log and loop.get_debug(): 1332 logger.debug('process %s exited with returncode %s', 1333 expected_pid, returncode) 1334 loop.call_soon_threadsafe(callback, pid, returncode, *args) 1335 1336 def _sig_chld(self, signum, frame): 1337 try: 1338 self._do_waitpid_all() 1339 except (SystemExit, KeyboardInterrupt): 1340 raise 1341 except BaseException: 1342 logger.warning('Unknown exception in SIGCHLD handler', exc_info=True) 1343 1344 1345class ThreadedChildWatcher(AbstractChildWatcher): 1346 """Threaded child watcher implementation. 1347 1348 The watcher uses a thread per process 1349 for waiting for the process finish. 1350 1351 It doesn't require subscription on POSIX signal 1352 but a thread creation is not free. 1353 1354 The watcher has O(1) complexity, its performance doesn't depend 1355 on amount of spawn processes. 1356 """ 1357 1358 def __init__(self): 1359 self._pid_counter = itertools.count(0) 1360 self._threads = {} 1361 1362 def is_active(self): 1363 return True 1364 1365 def close(self): 1366 self._join_threads() 1367 1368 def _join_threads(self): 1369 """Internal: Join all non-daemon threads""" 1370 threads = [thread for thread in list(self._threads.values()) 1371 if thread.is_alive() and not thread.daemon] 1372 for thread in threads: 1373 thread.join() 1374 1375 def __enter__(self): 1376 return self 1377 1378 def __exit__(self, exc_type, exc_val, exc_tb): 1379 pass 1380 1381 def __del__(self, _warn=warnings.warn): 1382 threads = [thread for thread in list(self._threads.values()) 1383 if thread.is_alive()] 1384 if threads: 1385 _warn(f"{self.__class__} has registered but not finished child processes", 1386 ResourceWarning, 1387 source=self) 1388 1389 def add_child_handler(self, pid, callback, *args): 1390 loop = events.get_running_loop() 1391 thread = threading.Thread(target=self._do_waitpid, 1392 name=f"waitpid-{next(self._pid_counter)}", 1393 args=(loop, pid, callback, args), 1394 daemon=True) 1395 self._threads[pid] = thread 1396 thread.start() 1397 1398 def remove_child_handler(self, pid): 1399 # asyncio never calls remove_child_handler() !!! 1400 # The method is no-op but is implemented because 1401 # abstract base classes require it. 1402 return True 1403 1404 def attach_loop(self, loop): 1405 pass 1406 1407 def _do_waitpid(self, loop, expected_pid, callback, args): 1408 assert expected_pid > 0 1409 1410 try: 1411 pid, status = os.waitpid(expected_pid, 0) 1412 except ChildProcessError: 1413 # The child process is already reaped 1414 # (may happen if waitpid() is called elsewhere). 1415 pid = expected_pid 1416 returncode = 255 1417 logger.warning( 1418 "Unknown child process pid %d, will report returncode 255", 1419 pid) 1420 else: 1421 returncode = waitstatus_to_exitcode(status) 1422 if loop.get_debug(): 1423 logger.debug('process %s exited with returncode %s', 1424 expected_pid, returncode) 1425 1426 if loop.is_closed(): 1427 logger.warning("Loop %r that handles pid %r is closed", loop, pid) 1428 else: 1429 loop.call_soon_threadsafe(callback, pid, returncode, *args) 1430 1431 self._threads.pop(expected_pid) 1432 1433 1434class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): 1435 """UNIX event loop policy with a watcher for child processes.""" 1436 _loop_factory = _UnixSelectorEventLoop 1437 1438 def __init__(self): 1439 super().__init__() 1440 self._watcher = None 1441 1442 def _init_watcher(self): 1443 with events._lock: 1444 if self._watcher is None: # pragma: no branch 1445 self._watcher = ThreadedChildWatcher() 1446 if threading.current_thread() is threading.main_thread(): 1447 self._watcher.attach_loop(self._local._loop) 1448 1449 def set_event_loop(self, loop): 1450 """Set the event loop. 1451 1452 As a side effect, if a child watcher was set before, then calling 1453 .set_event_loop() from the main thread will call .attach_loop(loop) on 1454 the child watcher. 1455 """ 1456 1457 super().set_event_loop(loop) 1458 1459 if (self._watcher is not None and 1460 threading.current_thread() is threading.main_thread()): 1461 self._watcher.attach_loop(loop) 1462 1463 def get_child_watcher(self): 1464 """Get the watcher for child processes. 1465 1466 If not yet set, a ThreadedChildWatcher object is automatically created. 1467 """ 1468 if self._watcher is None: 1469 self._init_watcher() 1470 1471 return self._watcher 1472 1473 def set_child_watcher(self, watcher): 1474 """Set the watcher for child processes.""" 1475 1476 assert watcher is None or isinstance(watcher, AbstractChildWatcher) 1477 1478 if self._watcher is not None: 1479 self._watcher.close() 1480 1481 self._watcher = watcher 1482 1483 1484SelectorEventLoop = _UnixSelectorEventLoop 1485DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy 1486