xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/asyncio/unix_events.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1"""Selector event loop for Unix with signal handling."""
2
3import errno
4import io
5import itertools
6import os
7import selectors
8import signal
9import socket
10import stat
11import subprocess
12import sys
13import threading
14import warnings
15
16from . import base_events
17from . import base_subprocess
18from . import constants
19from . import coroutines
20from . import events
21from . import exceptions
22from . import futures
23from . import selector_events
24from . import tasks
25from . import transports
26from .log import logger
27
28
# Public names exported by this module (used by ``from ... import *``).
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher', 'SafeChildWatcher',
    'FastChildWatcher', 'PidfdChildWatcher',
    'MultiLoopChildWatcher', 'ThreadedChildWatcher',
    'DefaultEventLoopPolicy',
)


# Refuse to import on Windows: the signal-based machinery defined in this
# module is not usable there.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
40
41
42def _sighandler_noop(signum, frame):
43    """Dummy signal handler."""
44    pass
45
46
def waitstatus_to_exitcode(status):
    """Convert a wait status to an exit code, tolerating unknown statuses.

    Delegates to os.waitstatus_to_exitcode().  If that function rejects
    the status with ValueError, the raw status is returned unchanged so
    that the caller still has something to inspect when debugging.
    """
    try:
        exitcode = os.waitstatus_to_exitcode(status)
    except ValueError:
        # The child exited but its status is not understood; hand the
        # untouched value back rather than failing.
        exitcode = status
    return exitcode
55
56
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def close(self):
        """Close the loop and drop every signal handler it registered.

        While the interpreter is finalizing, the handlers are not
        uninstalled: a ResourceWarning is emitted (if any are registered)
        and the registry is simply cleared.
        """
        super().close()
        if not sys.is_finalizing():
            # Iterate over a copy: remove_signal_handler() mutates the dict.
            for sig in list(self._signal_handlers):
                self.remove_signal_handler(sig)
        else:
            if self._signal_handlers:
                warnings.warn(f"Closing the loop {self!r} "
                              f"on interpreter shutdown "
                              f"stage, skipping signal handlers removal",
                              ResourceWarning,
                              source=self)
                self._signal_handlers.clear()

    def _process_self_data(self, data):
        """Dispatch every non-zero item of *data* as a signal number."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine object or a coroutine
        function, or if sig is not an int.
        Raise ValueError if the signal number is invalid.
        Raise RuntimeError if the wakeup file descriptor cannot be
        installed (for instance when not called from the main thread) or
        if the signal cannot be caught.
        """
        if (coroutines.iscoroutine(callback) or
                coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self, None)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                # No handler left: the wakeup fd is no longer needed.
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.

        Raise TypeError or ValueError if sig is not a valid signal number.
        Raise RuntimeError if restoring the default handler fails with
        EINVAL.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT gets Python's default KeyboardInterrupt handler back;
        # every other signal is reset to the system default action.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is not one of signal.valid_signals().
        """
        if not isinstance(sig, int):
            raise TypeError(f'sig must be an int, not {sig!r}')

        if sig not in signal.valid_signals():
            raise ValueError(f'invalid signal number {sig}')

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport wrapping *pipe*."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport wrapping *pipe*."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Spawn a subprocess and return its transport once it is ready.

        The process is created inside the child watcher's ``with`` block
        and registered with the watcher.  Raise RuntimeError, before any
        process is created, if the watcher is not active.
        """
        with events.get_child_watcher() as watcher:
            if not watcher.is_active():
                # Check early.
                # Raising exception before process creation
                # prevents subprocess execution if the watcher
                # is not ready to handle it.
                raise RuntimeError("asyncio.get_child_watcher() is not activated, "
                                   "subprocess support is not installed.")
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                await waiter
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException:
                # Any other failure (including cancellation): close the
                # transport and wait for it before propagating.
                transp.close()
                await transp._wait()
                raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's exit to its transport (*pid* is unused here)."""
        # Skip one iteration for callbacks to be executed
        self.call_soon_threadsafe(self.call_soon, transp._process_exited, returncode)

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  Return a
        (transport, protocol) pair.

        Raise ValueError for inconsistent arguments: SSL-only options
        without ssl, ssl without server_hostname, both or neither of
        path/sock, or a *sock* that is not an AF_UNIX stream socket.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')
            if ssl_handshake_timeout is not None:
                raise ValueError(
                    'ssl_handshake_timeout is only meaningful with ssl')
            if ssl_shutdown_timeout is not None:
                raise ValueError(
                    'ssl_shutdown_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                await self.sock_connect(sock, path)
            except:
                # Bare except on purpose: the socket created here must be
                # closed whatever is raised; the error is re-raised.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
            sock.setblocking(False)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout)
        return transport, protocol

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  When *path* is
        given and is not an abstract address, an existing socket file at
        that path is removed before binding.  Return the
        base_events.Server object.

        Raise TypeError if ssl is a bool.  Raise ValueError for
        inconsistent arguments.  Raise OSError if binding fails; for
        EADDRINUSE the message names the offending address.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if ssl_shutdown_timeout is not None and not ssl:
            raise ValueError(
                'ssl_shutdown_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Check for abstract socket. `str` and `bytes` paths are supported.
            if path[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(path).st_mode):
                        os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX socket '
                                 '%r: %r', path, err)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = f'Address {path!r} is already in use'
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')

        sock.setblocking(False)
        server = base_events.Server(self, [sock], protocol_factory,
                                    ssl, backlog, ssl_handshake_timeout,
                                    ssl_shutdown_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.
            await tasks.sleep(0)

        return server

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* with os.sendfile().

        Return the number of bytes sent (0 for an empty file).  Raise
        exceptions.SendfileNotAvailableError if os.sendfile() is missing
        or *file* has no usable file descriptor.
        """
        try:
            os.sendfile
        except AttributeError:
            raise exceptions.SendfileNotAvailableError(
                "os.sendfile() is not available")
        try:
            fileno = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise exceptions.SendfileNotAvailableError("not a regular file")
        try:
            fsize = os.fstat(fileno).st_size
        except OSError:
            raise exceptions.SendfileNotAvailableError("not a regular file")
        # A falsy count means "send until EOF"; use the file size then.
        blocksize = count if count else fsize
        if not blocksize:
            return 0  # empty file

        fut = self.create_future()
        self._sock_sendfile_native_impl(fut, None, sock, fileno,
                                        offset, count, blocksize, 0)
        return await fut

    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
                                   offset, count, blocksize, total_sent):
        """Perform one os.sendfile() step and reschedule itself if needed.

        *fut* receives the total number of bytes sent, or the failure.
        *registered_fd* is None on the first call and the socket fd that
        was registered with add_writer() on every later call.
        """
        fd = sock.fileno()
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_writer(registered_fd)
        if fut.cancelled():
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            return
        if count:
            blocksize = count - total_sent
            if blocksize <= 0:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
                return

        # On 32-bit builds a very large count would make os.sendfile()
        # raise OverflowError; cap each request at sys.maxsize//2 + 1
        # (1 GiB there).  On 64-bit builds the cap is never reached in
        # practice, and the remainder is sent by the following steps.
        blocksize = min(blocksize, sys.maxsize//2 + 1)

        try:
            sent = os.sendfile(fd, fileno, offset, blocksize)
        except (BlockingIOError, InterruptedError):
            if registered_fd is None:
                self._sock_add_cancellation_callback(fut, sock)
            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                            fd, sock, fileno,
                            offset, count, blocksize, total_sent)
        except OSError as exc:
            if (registered_fd is not None and
                    exc.errno == errno.ENOTCONN and
                    type(exc) is not ConnectionError):
                # If we have an ENOTCONN and this isn't a first call to
                # sendfile(), i.e. the connection was closed in the middle
                # of the operation, normalize the error to ConnectionError
                # to make it consistent across all Posix systems.
                # OSError subclasses take (errno, strerror) in that order,
                # so that .errno is the number and .strerror the message.
                new_exc = ConnectionError(
                    errno.ENOTCONN, "socket is not connected")
                new_exc.__cause__ = exc
                exc = new_exc
            if total_sent == 0:
                # We can get here for different reasons, the main
                # one being 'file' is not a regular mmap(2)-like
                # file, in which case we'll fall back on using
                # plain send().
                err = exceptions.SendfileNotAvailableError(
                    "os.sendfile call failed")
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(err)
            else:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(exc)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            fut.set_exception(exc)
        else:
            if sent == 0:
                # EOF
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
            else:
                offset += sent
                total_sent += sent
                if registered_fd is None:
                    self._sock_add_cancellation_callback(fut, sock)
                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                                fd, sock, fileno,
                                offset, count, blocksize, total_sent)

    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
        """Seek *fileno* to *offset* if at least one byte has been sent."""
        if total_sent > 0:
            os.lseek(fileno, offset, os.SEEK_SET)

    def _sock_add_cancellation_callback(self, fut, sock):
        """Unregister the socket's writer if *fut* ends up cancelled."""
        def cb(fut):
            if fut.cancelled():
                fd = sock.fileno()
                # fileno() is -1 once the socket has been closed.
                if fd != -1:
                    self.remove_writer(fd)
        fut.add_done_callback(cb)
456
457
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a pipe, socket or character device."""

    # max bytes we read in one event loop iteration
    max_size = 1 << 18

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule connection_made() on *protocol*.

        Raise ValueError if the descriptor is not a FIFO, a socket or a
        character device.  If *waiter* is given, it is resolved after
        connection_made() has been scheduled.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False
        self._paused = False

        file_mode = os.fstat(self._fileno).st_mode
        supported = any(
            check(file_mode)
            for check in (stat.S_ISFIFO, stat.S_ISSOCK, stat.S_ISCHR))
        if not supported:
            # Reset the attributes so that __del__ sees a closed transport.
            self._pipe = self._fileno = self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        os.set_blocking(self._fileno, False)

        schedule = self._loop.call_soon
        schedule(self._protocol.connection_made, self)
        # Reading starts only once connection_made() has been called.
        schedule(self._add_reader, self._fileno, self._read_ready)
        if waiter is not None:
            # The waiter is woken up only after connection_made() as well.
            schedule(futures._set_result_unless_cancelled, waiter, None)

    def _add_reader(self, fd, callback):
        """Register *callback* for *fd* unless reading is paused/closing."""
        if self.is_reading():
            self._loop._add_reader(fd, callback)

    def is_reading(self):
        """Return True while the transport is neither paused nor closing."""
        return not (self._paused or self._closing)

    def __repr__(self):
        parts = [self.__class__.__name__]
        pipe_open = self._pipe is not None
        if not pipe_open:
            parts.append('closed')
        elif self._closing:
            parts.append('closing')
        parts.append(f'fd={self._fileno}')

        selector = getattr(self._loop, '_selector', None)
        if not pipe_open:
            state = 'closed'
        elif selector is None:
            state = 'open'
        else:
            is_polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            state = 'polling' if is_polling else 'idle'
        parts.append(state)
        return f"<{' '.join(parts)}>"

    def _read_ready(self):
        """Read one chunk and pass it on, or handle end-of-file."""
        try:
            chunk = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
            return

        if chunk:
            self._protocol.data_received(chunk)
            return

        # An empty read means the peer closed its end.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._protocol.eof_received)
        self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the descriptor; no-op if not currently reading."""
        if self.is_reading():
            self._paused = True
            self._loop._remove_reader(self._fileno)
            if self._loop.get_debug():
                logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Watch the descriptor again; no-op unless paused and not closing."""
        if self._paused and not self._closing:
            self._paused = False
            self._loop._add_reader(self._fileno, self._read_ready)
            if self._loop.get_debug():
                logger.debug("%r resumes reading", self)

    def set_protocol(self, protocol):
        """Replace the protocol that receives this transport's events."""
        self._protocol = protocol

    def get_protocol(self):
        """Return the protocol currently attached to this transport."""
        return self._protocol

    def is_closing(self):
        """Return True once the transport is closing or closed."""
        return self._closing

    def close(self):
        """Start closing the transport; no-op if already closing."""
        if self._closing:
            return
        self._close(None)

    def __del__(self, _warn=warnings.warn):
        if self._pipe is None:
            return
        _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
        self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        An OSError with errno EIO is only logged (in debug mode); anything
        else goes to the loop's exception handler.
        """
        # should be called by exception handler only
        is_eio = isinstance(exc, OSError) and exc.errno == errno.EIO
        if not is_eio:
            context = {
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            }
            self._loop.call_exception_handler(context)
        elif self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
        self._close(exc)

    def _close(self, exc):
        """Mark as closing, stop reading and schedule connection_lost()."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = self._protocol = self._loop = None
600
601
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a pipe, socket or character device."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* and schedule connection_made() on *protocol*.

        Raise ValueError if the descriptor is not a character device, a
        FIFO or a socket.  If *waiter* is given, it is resolved after
        connection_made() has been scheduled.
        """
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        # Data accepted by write() but not yet written to the descriptor.
        self._buffer = bytearray()
        # Incremented on write failures and on writes attempted after
        # the transport stopped accepting data.
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        mode = os.fstat(self._fileno).st_mode
        is_char = stat.S_ISCHR(mode)
        is_fifo = stat.S_ISFIFO(mode)
        is_socket = stat.S_ISSOCK(mode)
        if not (is_char or is_fifo or is_socket):
            # Reset the attributes so that __del__ sees a closed transport.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        os.set_blocking(self._fileno, False)
        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop._add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_WRITE)
            if polling:
                info.append('polling')
            else:
                info.append('idle')

            bufsize = self.get_write_buffer_size()
            info.append(f'bufsize={bufsize}')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<{}>'.format(' '.join(info))

    def get_write_buffer_size(self):
        """Return the number of bytes waiting in the write buffer."""
        return len(self._buffer)

    def _read_ready(self):
        """Handle the read event that signals the peer closed the pipe."""
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            # Pending data can no longer be delivered.
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data*, buffering whatever cannot be written right away.

        Empty data is ignored.  Once the transport is closing or has
        failed, data is dropped and a warning is logged after repeated
        attempts.
        """
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                n = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if n == len(data):
                return
            elif n > 0:
                # Keep only the part that still has to be written.
                data = memoryview(data)[n:]
            self._loop._add_writer(self._fileno, self._write_ready)

        self._buffer += data
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Flush as much of the buffer as the descriptor accepts."""
        assert self._buffer, 'Data should not be empty'

        try:
            n = os.write(self._fileno, self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._buffer.clear()
            self._conn_lost += 1
            # Remove the writer here: _fatal_error() doesn't do it
            # because _buffer is empty.
            self._loop._remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
        else:
            if n == len(self._buffer):
                self._buffer.clear()
                self._loop._remove_writer(self._fileno)
                self._maybe_resume_protocol()  # May append to buffer.
                if self._closing:
                    self._loop._remove_reader(self._fileno)
                    self._call_connection_lost(None)
                return
            elif n > 0:
                del self._buffer[:n]

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True

    def write_eof(self):
        """Close the write end once all buffered data has been flushed."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            # Nothing pending: finish right away.  Otherwise _write_ready()
            # completes the shutdown when the buffer drains.
            self._loop._remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)

    def set_protocol(self, protocol):
        """Replace the protocol that receives this transport's events."""
        self._protocol = protocol

    def get_protocol(self):
        """Return the protocol currently attached to this transport."""
        return self._protocol

    def is_closing(self):
        """Return True once the transport is closing or closed."""
        return self._closing

    def close(self):
        """Close the transport after flushing buffered data."""
        if self._pipe is not None and not self._closing:
            # write_eof is all what we needed to close the write pipe
            self.write_eof()

    def __del__(self, _warn=warnings.warn):
        if self._pipe is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._pipe.close()

    def abort(self):
        """Close the transport immediately, discarding buffered data.

        Calling it on a transport that has already been torn down is a
        no-op.
        """
        if self._pipe is None:
            # connection_lost() has already run and self._loop is gone;
            # there is nothing left to abort.
            return
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        OSError instances are only logged (in debug mode); anything else
        goes to the loop's exception handler.
        """
        # should be called by exception handler only
        if isinstance(exc, OSError):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc=None):
        """Drop pending data, stop watching the fd, schedule teardown."""
        self._closing = True
        if self._buffer:
            # A writer is registered only while data is buffered.
            self._loop._remove_writer(self._fileno)
        self._buffer.clear()
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references.

        The teardown may be scheduled more than once (for example close()
        on an empty buffer followed by abort()); only the first call does
        anything, so connection_lost() is delivered a single time.
        """
        if self._pipe is None:
            return
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
805
806
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that spawns its child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Launch the child process and store the Popen object in self._proc.

        The process is always started in binary mode
        (universal_newlines=False); extra keyword arguments are forwarded
        to subprocess.Popen unchanged.
        """
        parent_sock = None
        wants_pipe = stdin == subprocess.PIPE
        if wants_pipe and sys.platform.startswith('aix'):
            # Use a socket pair for stdin on AIX, since it does not
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).
            stdin, parent_sock = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args,
                shell=shell,
                stdin=stdin,
                stdout=stdout,
                stderr=stderr,
                universal_newlines=False,
                bufsize=bufsize,
                **kwargs)
            if parent_sock is not None:
                # The child owns its end now; expose ours as a binary
                # file object in place of the Popen stdin.
                stdin.close()
                self._proc.stdin = open(
                    parent_sock.detach(), 'wb', buffering=bufsize)
                parent_sock = None
        finally:
            # Still set only when something above failed before the
            # hand-over completed: release both ends of the socket pair.
            if parent_sock is not None:
                stdin.close()
                parent_sock.close()
829
830
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Subclasses watch a set of subprocesses and report when one of them
    terminates or is interrupted by a signal.

    Callbacks are registered with .add_child_handler(). A new process must
    be started inside a 'with' block so that the watcher can suspend its
    activity until the new process is fully registered (some
    implementations need this to avoid a race condition).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be invoked once
        process 'pid' terminates. Registering another callback for the
        same process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler registered for process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to an event loop is detached
        from it first, then attached to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is
        freed.
        """
        raise NotImplementedError

    def is_active(self):
        """Return ``True`` if the watcher is active and used by the event loop.

        That is, the watcher is installed and ready to handle process
        exit notifications.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
908
909
class PidfdChildWatcher(AbstractChildWatcher):
    """Child watcher implementation using Linux's pid file descriptors.

    The watcher polls process file descriptors (pidfds) to learn when a
    child process has terminated. In some respects it is a "Goldilocks"
    implementation: it needs neither signals nor threads, leaves processes
    launched outside the event loop alone, and scales linearly with the
    number of subprocesses launched by the event loop. The main drawback
    is that pidfds are Linux-specific and only work on recent (5.3+)
    kernels.
    """

    def __init__(self):
        # Event loop the pidfds are registered with (None while detached).
        self._loop = None
        # pid -> (pidfd, callback, args)
        self._callbacks = {}

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def is_active(self):
        """Return True when a loop is attached and that loop is running."""
        loop = self._loop
        return loop is not None and loop.is_running()

    def close(self):
        """Detach from the current loop, releasing every open pidfd."""
        self.attach_loop(None)

    def attach_loop(self, loop):
        """Switch to *loop*, dropping all handlers of the previous loop.

        Every registered pidfd is removed from the old loop and closed.
        A RuntimeWarning is emitted when handlers are still pending while
        the watcher is being detached (loop is None).
        """
        detaching = loop is None and self._loop is not None
        if detaching and self._callbacks:
            warnings.warn(
                'A loop is being detached '
                'from a child watcher with pending handlers',
                RuntimeWarning)
        for entry in self._callbacks.values():
            pidfd = entry[0]
            self._loop._remove_reader(pidfd)
            os.close(pidfd)
        self._callbacks.clear()
        self._loop = loop

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*.

        A pidfd is opened and registered as a reader only the first time
        a pid is seen; registering again for the same pid reuses the
        existing pidfd and replaces the callback.
        """
        entry = self._callbacks.get(pid)
        if entry is None:
            pidfd = os.pidfd_open(pid)
            self._loop._add_reader(pidfd, self._do_wait, pid)
        else:
            pidfd = entry[0]
        self._callbacks[pid] = (pidfd, callback, args)

    def _do_wait(self, pid):
        """Reader callback: reap *pid* and invoke its registered handler."""
        pidfd, callback, args = self._callbacks.pop(pid)
        self._loop._remove_reader(pidfd)
        try:
            status = os.waitpid(pid, 0)[1]
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            returncode = 255
            logger.warning(
                "child process pid %d exit status already read: "
                " will report returncode 255",
                pid)
        else:
            returncode = waitstatus_to_exitcode(status)

        os.close(pidfd)
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Forget the handler for *pid*; return whether one was registered."""
        entry = self._callbacks.pop(pid, None)
        if entry is None:
            return False
        pidfd = entry[0]
        self._loop._remove_reader(pidfd)
        os.close(pidfd)
        return True
986
987
class BaseChildWatcher(AbstractChildWatcher):
    """Shared plumbing for watchers driven by a loop-level SIGCHLD handler.

    Subclasses supply the reaping strategy through _do_waitpid() and
    _do_waitpid_all().
    """

    def __init__(self):
        # Loop whose SIGCHLD handler drives this watcher (None if detached).
        self._loop = None
        # Registered handlers, keyed by pid.
        self._callbacks = {}

    def close(self):
        """Detach the watcher from its event loop."""
        self.attach_loop(None)

    def is_active(self):
        """Return True when a loop is attached and that loop is running."""
        loop = self._loop
        return loop is not None and loop.is_running()

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD subscription from the current loop to *loop*.

        *loop* may be None, which only detaches. A RuntimeWarning is
        emitted when detaching while handlers are still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children and report unexpected errors."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)
1039
1040
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    Instead of calling os.waitpid(-1), the SIGCHLD handler polls each
    registered process explicitly, so other code that spawns processes is
    not disrupted.

    This is a safe solution, but it has a significant overhead when
    handling a large number of children (O(n) each time SIGCHLD is raised).
    """

    def close(self):
        """Drop every registered handler, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*."""
        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Forget the handler for *pid*; return whether one was registered."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Poll every registered pid once."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for registered_pid in tuple(self._callbacks):
            self._do_waitpid(registered_pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking and fire its handler if done."""
        assert expected_pid > 0

        try:
            reaped_pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            reaped_pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                reaped_pid)
        else:
            if reaped_pid == 0:
                # The child process is still alive.
                return

            returncode = waitstatus_to_exitcode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            handler, handler_args = self._callbacks.pop(reaped_pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               reaped_pid, exc_info=True)
            return

        handler(reaped_pid, returncode, *handler_args)
1113
1114
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated process by calling
    os.waitpid(-1) directly, possibly breaking other code spawning processes
    and waiting for their termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """
    def __init__(self):
        super().__init__()
        # Guards _zombies, _forks and the handler lookups done while reaping.
        self._lock = threading.Lock()
        # pid -> returncode of children reaped before a handler was
        # registered for them.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently entered.
        self._forks = 0

    def close(self):
        """Forget all handlers and recorded zombies, then detach."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        """Record that a process is about to be started; return self."""
        with self._lock:
            self._forks += 1

            return self

    def __exit__(self, a, b, c):
        """Leave the context; on the last exit, report unclaimed zombies.

        When this was the outermost context and some reaped children were
        never claimed by add_child_handler(), they are logged and dropped.
        """
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            collateral_victims = str(self._zombies)
            self._zombies.clear()

        # Logged after the lock has been released.
        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*.

        Must be called inside the watcher's 'with' block. If the child was
        already reaped, the callback is invoked immediately with the
        recorded return code.
        """
        assert self._forks, "Must use the context manager"

        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Forget the handler for *pid*; return whether one was registered."""
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its handler.

        Children without a handler are stored in _zombies while a 'with'
        block is active (their handler may not be registered yet);
        otherwise they are reported with a warning.
        """
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            else:
                if pid == 0:
                    # A child process is still alive.
                    return

                returncode = waitstatus_to_exitcode(status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    # No handler and no pending registration: mark the
                    # pid as unknown for the warning below.
                    callback = None
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            # The handler is invoked outside the lock.
            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
1218
1219
class MultiLoopChildWatcher(AbstractChildWatcher):
    """A watcher that doesn't require a running loop in the main thread.

    The SIGCHLD handler is installed with signal.signal() the first time
    attach_loop() is called, which may conflict with other code that
    installs its own handler for this signal.

    The solution is safe, but it has a significant overhead when handling
    a large number of processes (*O(n)* each time a SIGCHLD is received).
    """

    # Implementation note:
    # The class keeps compatibility with the AbstractChildWatcher ABC.
    # To achieve this, attach_loop() does not store the loop, and
    # add_child_handler()/remove_child_handler() take no explicit loop
    # argument; the current loop is obtained with get_running_loop().

    def __init__(self):
        # pid -> (loop, callback, args)
        self._callbacks = {}
        # SIGCHLD handler that was active before ours; None until installed.
        self._saved_sighandler = None

    def is_active(self):
        """Return True once the SIGCHLD handler has been installed."""
        return self._saved_sighandler is not None

    def close(self):
        """Drop all handlers and restore the saved SIGCHLD handler.

        The saved handler is only restored when our handler is still the
        one installed; otherwise a warning is logged instead.
        """
        self._callbacks.clear()
        if self._saved_sighandler is None:
            return

        current = signal.getsignal(signal.SIGCHLD)
        if current != self._sig_chld:
            logger.warning("SIGCHLD handler was changed by outside code")
        else:
            signal.signal(signal.SIGCHLD, self._saved_sighandler)
        self._saved_sighandler = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) on the running loop."""
        running_loop = events.get_running_loop()
        self._callbacks[pid] = (running_loop, callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Forget the handler for *pid*; return whether one was registered."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def attach_loop(self, loop):
        """Install the SIGCHLD handler on first call; *loop* is not stored."""
        # Don't save the loop but initialize itself if called first time
        # The reason to do it here is that attach_loop() is called from
        # unix policy only for the main thread.
        # Main thread is required for subscription on SIGCHLD signal
        if self._saved_sighandler is not None:
            return

        previous = signal.signal(signal.SIGCHLD, self._sig_chld)
        if previous is None:
            logger.warning("Previous SIGCHLD handler was set by non-Python code, "
                           "restore to default handler on watcher close.")
            previous = signal.SIG_DFL
        self._saved_sighandler = previous

        # Set SA_RESTART to limit EINTR occurrences.
        signal.siginterrupt(signal.SIGCHLD, False)

    def _do_waitpid_all(self):
        """Poll every registered pid once."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for registered_pid in tuple(self._callbacks):
            self._do_waitpid(registered_pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking and schedule its handler.

        The handler is handed to the loop that registered it through
        call_soon_threadsafe(), unless that loop is already closed.
        """
        assert expected_pid > 0

        try:
            reaped_pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            reaped_pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                reaped_pid)
            exit_observed = False
        else:
            if reaped_pid == 0:
                # The child process is still alive.
                return

            returncode = waitstatus_to_exitcode(status)
            exit_observed = True

        try:
            loop, callback, args = self._callbacks.pop(reaped_pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            logger.warning("Child watcher got an unexpected pid: %r",
                           reaped_pid, exc_info=True)
            return

        if loop.is_closed():
            logger.warning("Loop %r that handles pid %r is closed", loop, reaped_pid)
            return

        # The debug line is only emitted when the exit status was
        # actually read here.
        if exit_observed and loop.get_debug():
            logger.debug('process %s exited with returncode %s',
                         expected_pid, returncode)
        loop.call_soon_threadsafe(callback, reaped_pid, returncode, *args)

    def _sig_chld(self, signum, frame):
        """Signal handler for SIGCHLD: poll children, log unexpected errors."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
1343
1344
class ThreadedChildWatcher(AbstractChildWatcher):
    """Threaded child watcher implementation.

    The watcher starts one thread per child process; each thread blocks in
    os.waitpid() until its process finishes.

    It does not require a subscription to a POSIX signal, but creating a
    thread is not free.

    The watcher has O(1) complexity: its performance does not depend on
    the number of spawned processes.
    """

    def __init__(self):
        # Source of unique suffixes for the waiter thread names.
        self._pid_counter = itertools.count(0)
        # pid -> waiter thread most recently registered for that pid.
        self._threads = {}

    def is_active(self):
        """Return True unconditionally; no loop attachment is needed."""
        return True

    def close(self):
        """Wait for the non-daemon waiter threads that are still alive."""
        self._join_threads()

    def _join_threads(self):
        """Internal: Join all non-daemon threads"""
        threads = [thread for thread in list(self._threads.values())
                   if thread.is_alive() and not thread.daemon]
        for thread in threads:
            thread.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def __del__(self, _warn=warnings.warn):
        """Emit a ResourceWarning if waiter threads are still running."""
        threads = [thread for thread in list(self._threads.values())
                   if thread.is_alive()]
        if threads:
            _warn(f"{self.__class__} has registered but not finished child processes",
                  ResourceWarning,
                  source=self)

    def add_child_handler(self, pid, callback, *args):
        """Start a daemon thread that waits for *pid*.

        When the process finishes, callback(pid, returncode, *args) is
        scheduled on the loop that was running at registration time.
        """
        loop = events.get_running_loop()
        thread = threading.Thread(target=self._do_waitpid,
                                  name=f"waitpid-{next(self._pid_counter)}",
                                  args=(loop, pid, callback, args),
                                  daemon=True)
        self._threads[pid] = thread
        thread.start()

    def remove_child_handler(self, pid):
        """Do nothing and return True."""
        # asyncio never calls remove_child_handler() !!!
        # The method is no-op but is implemented because
        # abstract base classes require it.
        return True

    def attach_loop(self, loop):
        """Do nothing; the loop is captured per handler instead."""
        pass

    def _do_waitpid(self, loop, expected_pid, callback, args):
        """Thread target: block until *expected_pid* exits, then notify *loop*.

        The bookkeeping entry for *expected_pid* is always removed on the
        way out, whether or not the notification succeeded.
        """
        assert expected_pid > 0

        try:
            try:
                pid, status = os.waitpid(expected_pid, 0)
            except ChildProcessError:
                # The child process is already reaped
                # (may happen if waitpid() is called elsewhere).
                pid = expected_pid
                returncode = 255
                logger.warning(
                    "Unknown child process pid %d, will report returncode 255",
                    pid)
            else:
                returncode = waitstatus_to_exitcode(status)
                if loop.get_debug():
                    logger.debug('process %s exited with returncode %s',
                                 expected_pid, returncode)

            if loop.is_closed():
                logger.warning("Loop %r that handles pid %r is closed", loop, pid)
            else:
                loop.call_soon_threadsafe(callback, pid, returncode, *args)
        finally:
            # Once the child is reaped its pid may be reused, so another
            # waiter may already have replaced or removed this entry; a
            # missing key must not crash the thread. Running this in
            # 'finally' also keeps the table clean when scheduling the
            # callback raises (e.g. the loop was closed after the check).
            self._threads.pop(expected_pid, None)
1432
1433
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Child watcher, created lazily by get_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default ThreadedChildWatcher under the events lock.

        When called from the main thread, the new watcher is also attached
        to the loop stored in this policy's thread-local state.
        """
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                watcher = ThreadedChildWatcher()
                self._watcher = watcher
                if threading.current_thread() is threading.main_thread():
                    watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop)
        on the child watcher.
        """
        super().set_event_loop(loop)

        watcher = self._watcher
        if watcher is None:
            return
        if threading.current_thread() is threading.main_thread():
            watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a ThreadedChildWatcher object is automatically
        created.
        """
        if self._watcher is None:
            self._init_watcher()
        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        The watcher that was previously set, if any, is closed first.
        """
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
1482
1483
# Public aliases exported through __all__: the generic names resolve to the
# Unix-specific event loop and event loop policy classes of this module.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
1486