xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/asyncio/selector_events.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1"""Event loop using a selector and related classes.
2
3A selector is a "notify-when-ready" multiplexer.  For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
6
7__all__ = 'BaseSelectorEventLoop',
8
9import collections
10import errno
11import functools
12import selectors
13import socket
14import warnings
15import weakref
16try:
17    import ssl
18except ImportError:  # pragma: no cover
19    ssl = None
20
21from . import base_events
22from . import constants
23from . import events
24from . import futures
25from . import protocols
26from . import sslproto
27from . import transports
28from . import trsock
29from .log import logger
30
31
32def _test_selector_event(selector, fd, event):
33    # Test if the selector is monitoring 'event' events
34    # for the file descriptor 'fd'.
35    try:
36        key = selector.get_key(fd)
37    except KeyError:
38        return False
39    else:
40        return bool(key.events & event)
41
42
43class BaseSelectorEventLoop(base_events.BaseEventLoop):
44    """Selector event loop.
45
46    See events.EventLoop for API specification.
47    """
48
49    def __init__(self, selector=None):
50        super().__init__()
51
52        if selector is None:
53            selector = selectors.DefaultSelector()
54        logger.debug('Using selector: %s', selector.__class__.__name__)
55        self._selector = selector
56        self._make_self_pipe()
57        self._transports = weakref.WeakValueDictionary()
58
59    def _make_socket_transport(self, sock, protocol, waiter=None, *,
60                               extra=None, server=None):
61        return _SelectorSocketTransport(self, sock, protocol, waiter,
62                                        extra, server)
63
64    def _make_ssl_transport(
65            self, rawsock, protocol, sslcontext, waiter=None,
66            *, server_side=False, server_hostname=None,
67            extra=None, server=None,
68            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
69            ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT,
70    ):
71        ssl_protocol = sslproto.SSLProtocol(
72            self, protocol, sslcontext, waiter,
73            server_side, server_hostname,
74            ssl_handshake_timeout=ssl_handshake_timeout,
75            ssl_shutdown_timeout=ssl_shutdown_timeout
76        )
77        _SelectorSocketTransport(self, rawsock, ssl_protocol,
78                                 extra=extra, server=server)
79        return ssl_protocol._app_transport
80
81    def _make_datagram_transport(self, sock, protocol,
82                                 address=None, waiter=None, extra=None):
83        return _SelectorDatagramTransport(self, sock, protocol,
84                                          address, waiter, extra)
85
86    def close(self):
87        if self.is_running():
88            raise RuntimeError("Cannot close a running event loop")
89        if self.is_closed():
90            return
91        self._close_self_pipe()
92        super().close()
93        if self._selector is not None:
94            self._selector.close()
95            self._selector = None
96
97    def _close_self_pipe(self):
98        self._remove_reader(self._ssock.fileno())
99        self._ssock.close()
100        self._ssock = None
101        self._csock.close()
102        self._csock = None
103        self._internal_fds -= 1
104
105    def _make_self_pipe(self):
106        # A self-socket, really. :-)
107        self._ssock, self._csock = socket.socketpair()
108        self._ssock.setblocking(False)
109        self._csock.setblocking(False)
110        self._internal_fds += 1
111        self._add_reader(self._ssock.fileno(), self._read_from_self)
112
113    def _process_self_data(self, data):
114        pass
115
116    def _read_from_self(self):
117        while True:
118            try:
119                data = self._ssock.recv(4096)
120                if not data:
121                    break
122                self._process_self_data(data)
123            except InterruptedError:
124                continue
125            except BlockingIOError:
126                break
127
128    def _write_to_self(self):
129        # This may be called from a different thread, possibly after
130        # _close_self_pipe() has been called or even while it is
131        # running.  Guard for self._csock being None or closed.  When
132        # a socket is closed, send() raises OSError (with errno set to
133        # EBADF, but let's not rely on the exact error code).
134        csock = self._csock
135        if csock is None:
136            return
137
138        try:
139            csock.send(b'\0')
140        except OSError:
141            if self._debug:
142                logger.debug("Fail to write a null byte into the "
143                             "self-pipe socket",
144                             exc_info=True)
145
146    def _start_serving(self, protocol_factory, sock,
147                       sslcontext=None, server=None, backlog=100,
148                       ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
149                       ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
150        self._add_reader(sock.fileno(), self._accept_connection,
151                         protocol_factory, sock, sslcontext, server, backlog,
152                         ssl_handshake_timeout, ssl_shutdown_timeout)
153
154    def _accept_connection(
155            self, protocol_factory, sock,
156            sslcontext=None, server=None, backlog=100,
157            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
158            ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
159        # This method is only called once for each event loop tick where the
160        # listening socket has triggered an EVENT_READ. There may be multiple
161        # connections waiting for an .accept() so it is called in a loop.
162        # See https://bugs.python.org/issue27906 for more details.
163        for _ in range(backlog):
164            try:
165                conn, addr = sock.accept()
166                if self._debug:
167                    logger.debug("%r got a new connection from %r: %r",
168                                 server, addr, conn)
169                conn.setblocking(False)
170            except (BlockingIOError, InterruptedError, ConnectionAbortedError):
171                # Early exit because the socket accept buffer is empty.
172                return None
173            except OSError as exc:
174                # There's nowhere to send the error, so just log it.
175                if exc.errno in (errno.EMFILE, errno.ENFILE,
176                                 errno.ENOBUFS, errno.ENOMEM):
177                    # Some platforms (e.g. Linux keep reporting the FD as
178                    # ready, so we remove the read handler temporarily.
179                    # We'll try again in a while.
180                    self.call_exception_handler({
181                        'message': 'socket.accept() out of system resource',
182                        'exception': exc,
183                        'socket': trsock.TransportSocket(sock),
184                    })
185                    self._remove_reader(sock.fileno())
186                    self.call_later(constants.ACCEPT_RETRY_DELAY,
187                                    self._start_serving,
188                                    protocol_factory, sock, sslcontext, server,
189                                    backlog, ssl_handshake_timeout,
190                                    ssl_shutdown_timeout)
191                else:
192                    raise  # The event loop will catch, log and ignore it.
193            else:
194                extra = {'peername': addr}
195                accept = self._accept_connection2(
196                    protocol_factory, conn, extra, sslcontext, server,
197                    ssl_handshake_timeout, ssl_shutdown_timeout)
198                self.create_task(accept)
199
200    async def _accept_connection2(
201            self, protocol_factory, conn, extra,
202            sslcontext=None, server=None,
203            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
204            ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
205        protocol = None
206        transport = None
207        try:
208            protocol = protocol_factory()
209            waiter = self.create_future()
210            if sslcontext:
211                transport = self._make_ssl_transport(
212                    conn, protocol, sslcontext, waiter=waiter,
213                    server_side=True, extra=extra, server=server,
214                    ssl_handshake_timeout=ssl_handshake_timeout,
215                    ssl_shutdown_timeout=ssl_shutdown_timeout)
216            else:
217                transport = self._make_socket_transport(
218                    conn, protocol, waiter=waiter, extra=extra,
219                    server=server)
220
221            try:
222                await waiter
223            except BaseException:
224                transport.close()
225                raise
226                # It's now up to the protocol to handle the connection.
227
228        except (SystemExit, KeyboardInterrupt):
229            raise
230        except BaseException as exc:
231            if self._debug:
232                context = {
233                    'message':
234                        'Error on transport creation for incoming connection',
235                    'exception': exc,
236                }
237                if protocol is not None:
238                    context['protocol'] = protocol
239                if transport is not None:
240                    context['transport'] = transport
241                self.call_exception_handler(context)
242
243    def _ensure_fd_no_transport(self, fd):
244        fileno = fd
245        if not isinstance(fileno, int):
246            try:
247                fileno = int(fileno.fileno())
248            except (AttributeError, TypeError, ValueError):
249                # This code matches selectors._fileobj_to_fd function.
250                raise ValueError(f"Invalid file object: {fd!r}") from None
251        try:
252            transport = self._transports[fileno]
253        except KeyError:
254            pass
255        else:
256            if not transport.is_closing():
257                raise RuntimeError(
258                    f'File descriptor {fd!r} is used by transport '
259                    f'{transport!r}')
260
261    def _add_reader(self, fd, callback, *args):
262        self._check_closed()
263        handle = events.Handle(callback, args, self, None)
264        try:
265            key = self._selector.get_key(fd)
266        except KeyError:
267            self._selector.register(fd, selectors.EVENT_READ,
268                                    (handle, None))
269        else:
270            mask, (reader, writer) = key.events, key.data
271            self._selector.modify(fd, mask | selectors.EVENT_READ,
272                                  (handle, writer))
273            if reader is not None:
274                reader.cancel()
275        return handle
276
277    def _remove_reader(self, fd):
278        if self.is_closed():
279            return False
280        try:
281            key = self._selector.get_key(fd)
282        except KeyError:
283            return False
284        else:
285            mask, (reader, writer) = key.events, key.data
286            mask &= ~selectors.EVENT_READ
287            if not mask:
288                self._selector.unregister(fd)
289            else:
290                self._selector.modify(fd, mask, (None, writer))
291
292            if reader is not None:
293                reader.cancel()
294                return True
295            else:
296                return False
297
298    def _add_writer(self, fd, callback, *args):
299        self._check_closed()
300        handle = events.Handle(callback, args, self, None)
301        try:
302            key = self._selector.get_key(fd)
303        except KeyError:
304            self._selector.register(fd, selectors.EVENT_WRITE,
305                                    (None, handle))
306        else:
307            mask, (reader, writer) = key.events, key.data
308            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
309                                  (reader, handle))
310            if writer is not None:
311                writer.cancel()
312        return handle
313
314    def _remove_writer(self, fd):
315        """Remove a writer callback."""
316        if self.is_closed():
317            return False
318        try:
319            key = self._selector.get_key(fd)
320        except KeyError:
321            return False
322        else:
323            mask, (reader, writer) = key.events, key.data
324            # Remove both writer and connector.
325            mask &= ~selectors.EVENT_WRITE
326            if not mask:
327                self._selector.unregister(fd)
328            else:
329                self._selector.modify(fd, mask, (reader, None))
330
331            if writer is not None:
332                writer.cancel()
333                return True
334            else:
335                return False
336
337    def add_reader(self, fd, callback, *args):
338        """Add a reader callback."""
339        self._ensure_fd_no_transport(fd)
340        self._add_reader(fd, callback, *args)
341
342    def remove_reader(self, fd):
343        """Remove a reader callback."""
344        self._ensure_fd_no_transport(fd)
345        return self._remove_reader(fd)
346
347    def add_writer(self, fd, callback, *args):
348        """Add a writer callback.."""
349        self._ensure_fd_no_transport(fd)
350        self._add_writer(fd, callback, *args)
351
352    def remove_writer(self, fd):
353        """Remove a writer callback."""
354        self._ensure_fd_no_transport(fd)
355        return self._remove_writer(fd)
356
357    async def sock_recv(self, sock, n):
358        """Receive data from the socket.
359
360        The return value is a bytes object representing the data received.
361        The maximum amount of data to be received at once is specified by
362        nbytes.
363        """
364        base_events._check_ssl_socket(sock)
365        if self._debug and sock.gettimeout() != 0:
366            raise ValueError("the socket must be non-blocking")
367        try:
368            return sock.recv(n)
369        except (BlockingIOError, InterruptedError):
370            pass
371        fut = self.create_future()
372        fd = sock.fileno()
373        self._ensure_fd_no_transport(fd)
374        handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
375        fut.add_done_callback(
376            functools.partial(self._sock_read_done, fd, handle=handle))
377        return await fut
378
379    def _sock_read_done(self, fd, fut, handle=None):
380        if handle is None or not handle.cancelled():
381            self.remove_reader(fd)
382
383    def _sock_recv(self, fut, sock, n):
384        # _sock_recv() can add itself as an I/O callback if the operation can't
385        # be done immediately. Don't use it directly, call sock_recv().
386        if fut.done():
387            return
388        try:
389            data = sock.recv(n)
390        except (BlockingIOError, InterruptedError):
391            return  # try again next time
392        except (SystemExit, KeyboardInterrupt):
393            raise
394        except BaseException as exc:
395            fut.set_exception(exc)
396        else:
397            fut.set_result(data)
398
399    async def sock_recv_into(self, sock, buf):
400        """Receive data from the socket.
401
402        The received data is written into *buf* (a writable buffer).
403        The return value is the number of bytes written.
404        """
405        base_events._check_ssl_socket(sock)
406        if self._debug and sock.gettimeout() != 0:
407            raise ValueError("the socket must be non-blocking")
408        try:
409            return sock.recv_into(buf)
410        except (BlockingIOError, InterruptedError):
411            pass
412        fut = self.create_future()
413        fd = sock.fileno()
414        self._ensure_fd_no_transport(fd)
415        handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
416        fut.add_done_callback(
417            functools.partial(self._sock_read_done, fd, handle=handle))
418        return await fut
419
420    def _sock_recv_into(self, fut, sock, buf):
421        # _sock_recv_into() can add itself as an I/O callback if the operation
422        # can't be done immediately. Don't use it directly, call
423        # sock_recv_into().
424        if fut.done():
425            return
426        try:
427            nbytes = sock.recv_into(buf)
428        except (BlockingIOError, InterruptedError):
429            return  # try again next time
430        except (SystemExit, KeyboardInterrupt):
431            raise
432        except BaseException as exc:
433            fut.set_exception(exc)
434        else:
435            fut.set_result(nbytes)
436
437    async def sock_recvfrom(self, sock, bufsize):
438        """Receive a datagram from a datagram socket.
439
440        The return value is a tuple of (bytes, address) representing the
441        datagram received and the address it came from.
442        The maximum amount of data to be received at once is specified by
443        nbytes.
444        """
445        base_events._check_ssl_socket(sock)
446        if self._debug and sock.gettimeout() != 0:
447            raise ValueError("the socket must be non-blocking")
448        try:
449            return sock.recvfrom(bufsize)
450        except (BlockingIOError, InterruptedError):
451            pass
452        fut = self.create_future()
453        fd = sock.fileno()
454        self._ensure_fd_no_transport(fd)
455        handle = self._add_reader(fd, self._sock_recvfrom, fut, sock, bufsize)
456        fut.add_done_callback(
457            functools.partial(self._sock_read_done, fd, handle=handle))
458        return await fut
459
460    def _sock_recvfrom(self, fut, sock, bufsize):
461        # _sock_recvfrom() can add itself as an I/O callback if the operation
462        # can't be done immediately. Don't use it directly, call
463        # sock_recvfrom().
464        if fut.done():
465            return
466        try:
467            result = sock.recvfrom(bufsize)
468        except (BlockingIOError, InterruptedError):
469            return  # try again next time
470        except (SystemExit, KeyboardInterrupt):
471            raise
472        except BaseException as exc:
473            fut.set_exception(exc)
474        else:
475            fut.set_result(result)
476
477    async def sock_recvfrom_into(self, sock, buf, nbytes=0):
478        """Receive data from the socket.
479
480        The received data is written into *buf* (a writable buffer).
481        The return value is a tuple of (number of bytes written, address).
482        """
483        base_events._check_ssl_socket(sock)
484        if self._debug and sock.gettimeout() != 0:
485            raise ValueError("the socket must be non-blocking")
486        if not nbytes:
487            nbytes = len(buf)
488
489        try:
490            return sock.recvfrom_into(buf, nbytes)
491        except (BlockingIOError, InterruptedError):
492            pass
493        fut = self.create_future()
494        fd = sock.fileno()
495        self._ensure_fd_no_transport(fd)
496        handle = self._add_reader(fd, self._sock_recvfrom_into, fut, sock, buf,
497                                  nbytes)
498        fut.add_done_callback(
499            functools.partial(self._sock_read_done, fd, handle=handle))
500        return await fut
501
502    def _sock_recvfrom_into(self, fut, sock, buf, bufsize):
503        # _sock_recv_into() can add itself as an I/O callback if the operation
504        # can't be done immediately. Don't use it directly, call
505        # sock_recv_into().
506        if fut.done():
507            return
508        try:
509            result = sock.recvfrom_into(buf, bufsize)
510        except (BlockingIOError, InterruptedError):
511            return  # try again next time
512        except (SystemExit, KeyboardInterrupt):
513            raise
514        except BaseException as exc:
515            fut.set_exception(exc)
516        else:
517            fut.set_result(result)
518
519    async def sock_sendall(self, sock, data):
520        """Send data to the socket.
521
522        The socket must be connected to a remote socket. This method continues
523        to send data from data until either all data has been sent or an
524        error occurs. None is returned on success. On error, an exception is
525        raised, and there is no way to determine how much data, if any, was
526        successfully processed by the receiving end of the connection.
527        """
528        base_events._check_ssl_socket(sock)
529        if self._debug and sock.gettimeout() != 0:
530            raise ValueError("the socket must be non-blocking")
531        try:
532            n = sock.send(data)
533        except (BlockingIOError, InterruptedError):
534            n = 0
535
536        if n == len(data):
537            # all data sent
538            return
539
540        fut = self.create_future()
541        fd = sock.fileno()
542        self._ensure_fd_no_transport(fd)
543        # use a trick with a list in closure to store a mutable state
544        handle = self._add_writer(fd, self._sock_sendall, fut, sock,
545                                  memoryview(data), [n])
546        fut.add_done_callback(
547            functools.partial(self._sock_write_done, fd, handle=handle))
548        return await fut
549
550    def _sock_sendall(self, fut, sock, view, pos):
551        if fut.done():
552            # Future cancellation can be scheduled on previous loop iteration
553            return
554        start = pos[0]
555        try:
556            n = sock.send(view[start:])
557        except (BlockingIOError, InterruptedError):
558            return
559        except (SystemExit, KeyboardInterrupt):
560            raise
561        except BaseException as exc:
562            fut.set_exception(exc)
563            return
564
565        start += n
566
567        if start == len(view):
568            fut.set_result(None)
569        else:
570            pos[0] = start
571
572    async def sock_sendto(self, sock, data, address):
573        """Send data to the socket.
574
575        The socket must be connected to a remote socket. This method continues
576        to send data from data until either all data has been sent or an
577        error occurs. None is returned on success. On error, an exception is
578        raised, and there is no way to determine how much data, if any, was
579        successfully processed by the receiving end of the connection.
580        """
581        base_events._check_ssl_socket(sock)
582        if self._debug and sock.gettimeout() != 0:
583            raise ValueError("the socket must be non-blocking")
584        try:
585            return sock.sendto(data, address)
586        except (BlockingIOError, InterruptedError):
587            pass
588
589        fut = self.create_future()
590        fd = sock.fileno()
591        self._ensure_fd_no_transport(fd)
592        # use a trick with a list in closure to store a mutable state
593        handle = self._add_writer(fd, self._sock_sendto, fut, sock, data,
594                                  address)
595        fut.add_done_callback(
596            functools.partial(self._sock_write_done, fd, handle=handle))
597        return await fut
598
599    def _sock_sendto(self, fut, sock, data, address):
600        if fut.done():
601            # Future cancellation can be scheduled on previous loop iteration
602            return
603        try:
604            n = sock.sendto(data, 0, address)
605        except (BlockingIOError, InterruptedError):
606            return
607        except (SystemExit, KeyboardInterrupt):
608            raise
609        except BaseException as exc:
610            fut.set_exception(exc)
611        else:
612            fut.set_result(n)
613
614    async def sock_connect(self, sock, address):
615        """Connect to a remote socket at address.
616
617        This method is a coroutine.
618        """
619        base_events._check_ssl_socket(sock)
620        if self._debug and sock.gettimeout() != 0:
621            raise ValueError("the socket must be non-blocking")
622
623        if sock.family == socket.AF_INET or (
624                base_events._HAS_IPv6 and sock.family == socket.AF_INET6):
625            resolved = await self._ensure_resolved(
626                address, family=sock.family, type=sock.type, proto=sock.proto,
627                loop=self,
628            )
629            _, _, _, _, address = resolved[0]
630
631        fut = self.create_future()
632        self._sock_connect(fut, sock, address)
633        try:
634            return await fut
635        finally:
636            # Needed to break cycles when an exception occurs.
637            fut = None
638
639    def _sock_connect(self, fut, sock, address):
640        fd = sock.fileno()
641        try:
642            sock.connect(address)
643        except (BlockingIOError, InterruptedError):
644            # Issue #23618: When the C function connect() fails with EINTR, the
645            # connection runs in background. We have to wait until the socket
646            # becomes writable to be notified when the connection succeed or
647            # fails.
648            self._ensure_fd_no_transport(fd)
649            handle = self._add_writer(
650                fd, self._sock_connect_cb, fut, sock, address)
651            fut.add_done_callback(
652                functools.partial(self._sock_write_done, fd, handle=handle))
653        except (SystemExit, KeyboardInterrupt):
654            raise
655        except BaseException as exc:
656            fut.set_exception(exc)
657        else:
658            fut.set_result(None)
659        finally:
660            fut = None
661
662    def _sock_write_done(self, fd, fut, handle=None):
663        if handle is None or not handle.cancelled():
664            self.remove_writer(fd)
665
666    def _sock_connect_cb(self, fut, sock, address):
667        if fut.done():
668            return
669
670        try:
671            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
672            if err != 0:
673                # Jump to any except clause below.
674                raise OSError(err, f'Connect call failed {address}')
675        except (BlockingIOError, InterruptedError):
676            # socket is still registered, the callback will be retried later
677            pass
678        except (SystemExit, KeyboardInterrupt):
679            raise
680        except BaseException as exc:
681            fut.set_exception(exc)
682        else:
683            fut.set_result(None)
684        finally:
685            fut = None
686
687    async def sock_accept(self, sock):
688        """Accept a connection.
689
690        The socket must be bound to an address and listening for connections.
691        The return value is a pair (conn, address) where conn is a new socket
692        object usable to send and receive data on the connection, and address
693        is the address bound to the socket on the other end of the connection.
694        """
695        base_events._check_ssl_socket(sock)
696        if self._debug and sock.gettimeout() != 0:
697            raise ValueError("the socket must be non-blocking")
698        fut = self.create_future()
699        self._sock_accept(fut, sock)
700        return await fut
701
702    def _sock_accept(self, fut, sock):
703        fd = sock.fileno()
704        try:
705            conn, address = sock.accept()
706            conn.setblocking(False)
707        except (BlockingIOError, InterruptedError):
708            self._ensure_fd_no_transport(fd)
709            handle = self._add_reader(fd, self._sock_accept, fut, sock)
710            fut.add_done_callback(
711                functools.partial(self._sock_read_done, fd, handle=handle))
712        except (SystemExit, KeyboardInterrupt):
713            raise
714        except BaseException as exc:
715            fut.set_exception(exc)
716        else:
717            fut.set_result((conn, address))
718
719    async def _sendfile_native(self, transp, file, offset, count):
720        del self._transports[transp._sock_fd]
721        resume_reading = transp.is_reading()
722        transp.pause_reading()
723        await transp._make_empty_waiter()
724        try:
725            return await self.sock_sendfile(transp._sock, file, offset, count,
726                                            fallback=False)
727        finally:
728            transp._reset_empty_waiter()
729            if resume_reading:
730                transp.resume_reading()
731            self._transports[transp._sock_fd] = transp
732
733    def _process_events(self, event_list):
734        for key, mask in event_list:
735            fileobj, (reader, writer) = key.fileobj, key.data
736            if mask & selectors.EVENT_READ and reader is not None:
737                if reader._cancelled:
738                    self._remove_reader(fileobj)
739                else:
740                    self._add_callback(reader)
741            if mask & selectors.EVENT_WRITE and writer is not None:
742                if writer._cancelled:
743                    self._remove_writer(fileobj)
744                else:
745                    self._add_callback(writer)
746
747    def _stop_serving(self, sock):
748        self._remove_reader(sock.fileno())
749        sock.close()
750
751
752class _SelectorTransport(transports._FlowControlMixin,
753                         transports.Transport):
754
755    max_size = 256 * 1024  # Buffer size passed to recv().
756
757    _buffer_factory = bytearray  # Constructs initial value for self._buffer.
758
759    # Attribute used in the destructor: it must be set even if the constructor
760    # is not called (see _SelectorSslTransport which may start by raising an
761    # exception)
762    _sock = None
763
764    def __init__(self, loop, sock, protocol, extra=None, server=None):
765        super().__init__(extra, loop)
766        self._extra['socket'] = trsock.TransportSocket(sock)
767        try:
768            self._extra['sockname'] = sock.getsockname()
769        except OSError:
770            self._extra['sockname'] = None
771        if 'peername' not in self._extra:
772            try:
773                self._extra['peername'] = sock.getpeername()
774            except socket.error:
775                self._extra['peername'] = None
776        self._sock = sock
777        self._sock_fd = sock.fileno()
778
779        self._protocol_connected = False
780        self.set_protocol(protocol)
781
782        self._server = server
783        self._buffer = self._buffer_factory()
784        self._conn_lost = 0  # Set when call to connection_lost scheduled.
785        self._closing = False  # Set when close() called.
786        self._paused = False  # Set when pause_reading() called
787
788        if self._server is not None:
789            self._server._attach()
790        loop._transports[self._sock_fd] = self
791
792    def __repr__(self):
793        info = [self.__class__.__name__]
794        if self._sock is None:
795            info.append('closed')
796        elif self._closing:
797            info.append('closing')
798        info.append(f'fd={self._sock_fd}')
799        # test if the transport was closed
800        if self._loop is not None and not self._loop.is_closed():
801            polling = _test_selector_event(self._loop._selector,
802                                           self._sock_fd, selectors.EVENT_READ)
803            if polling:
804                info.append('read=polling')
805            else:
806                info.append('read=idle')
807
808            polling = _test_selector_event(self._loop._selector,
809                                           self._sock_fd,
810                                           selectors.EVENT_WRITE)
811            if polling:
812                state = 'polling'
813            else:
814                state = 'idle'
815
816            bufsize = self.get_write_buffer_size()
817            info.append(f'write=<{state}, bufsize={bufsize}>')
818        return '<{}>'.format(' '.join(info))
819
820    def abort(self):
821        self._force_close(None)
822
823    def set_protocol(self, protocol):
824        self._protocol = protocol
825        self._protocol_connected = True
826
827    def get_protocol(self):
828        return self._protocol
829
830    def is_closing(self):
831        return self._closing
832
833    def is_reading(self):
834        return not self.is_closing() and not self._paused
835
836    def pause_reading(self):
837        if not self.is_reading():
838            return
839        self._paused = True
840        self._loop._remove_reader(self._sock_fd)
841        if self._loop.get_debug():
842            logger.debug("%r pauses reading", self)
843
844    def resume_reading(self):
845        if self._closing or not self._paused:
846            return
847        self._paused = False
848        self._add_reader(self._sock_fd, self._read_ready)
849        if self._loop.get_debug():
850            logger.debug("%r resumes reading", self)
851
852    def close(self):
853        if self._closing:
854            return
855        self._closing = True
856        self._loop._remove_reader(self._sock_fd)
857        if not self._buffer:
858            self._conn_lost += 1
859            self._loop._remove_writer(self._sock_fd)
860            self._loop.call_soon(self._call_connection_lost, None)
861
862    def __del__(self, _warn=warnings.warn):
863        if self._sock is not None:
864            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
865            self._sock.close()
866
867    def _fatal_error(self, exc, message='Fatal error on transport'):
868        # Should be called from exception handler only.
869        if isinstance(exc, OSError):
870            if self._loop.get_debug():
871                logger.debug("%r: %s", self, message, exc_info=True)
872        else:
873            self._loop.call_exception_handler({
874                'message': message,
875                'exception': exc,
876                'transport': self,
877                'protocol': self._protocol,
878            })
879        self._force_close(exc)
880
881    def _force_close(self, exc):
882        if self._conn_lost:
883            return
884        if self._buffer:
885            self._buffer.clear()
886            self._loop._remove_writer(self._sock_fd)
887        if not self._closing:
888            self._closing = True
889            self._loop._remove_reader(self._sock_fd)
890        self._conn_lost += 1
891        self._loop.call_soon(self._call_connection_lost, exc)
892
893    def _call_connection_lost(self, exc):
894        try:
895            if self._protocol_connected:
896                self._protocol.connection_lost(exc)
897        finally:
898            self._sock.close()
899            self._sock = None
900            self._protocol = None
901            self._loop = None
902            server = self._server
903            if server is not None:
904                server._detach()
905                self._server = None
906
907    def get_write_buffer_size(self):
908        return len(self._buffer)
909
910    def _add_reader(self, fd, callback, *args):
911        if not self.is_reading():
912            return
913        self._loop._add_reader(fd, callback, *args)
914
915
916class _SelectorSocketTransport(_SelectorTransport):
917
918    _start_tls_compatible = True
919    _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
920
921    def __init__(self, loop, sock, protocol, waiter=None,
922                 extra=None, server=None):
923
924        self._read_ready_cb = None
925        super().__init__(loop, sock, protocol, extra, server)
926        self._eof = False
927        self._empty_waiter = None
928
929        # Disable the Nagle algorithm -- small writes will be
930        # sent without waiting for the TCP ACK.  This generally
931        # decreases the latency (in some cases significantly.)
932        base_events._set_nodelay(self._sock)
933
934        self._loop.call_soon(self._protocol.connection_made, self)
935        # only start reading when connection_made() has been called
936        self._loop.call_soon(self._add_reader,
937                             self._sock_fd, self._read_ready)
938        if waiter is not None:
939            # only wake up the waiter when connection_made() has been called
940            self._loop.call_soon(futures._set_result_unless_cancelled,
941                                 waiter, None)
942
943    def set_protocol(self, protocol):
944        if isinstance(protocol, protocols.BufferedProtocol):
945            self._read_ready_cb = self._read_ready__get_buffer
946        else:
947            self._read_ready_cb = self._read_ready__data_received
948
949        super().set_protocol(protocol)
950
951    def _read_ready(self):
952        self._read_ready_cb()
953
954    def _read_ready__get_buffer(self):
955        if self._conn_lost:
956            return
957
958        try:
959            buf = self._protocol.get_buffer(-1)
960            if not len(buf):
961                raise RuntimeError('get_buffer() returned an empty buffer')
962        except (SystemExit, KeyboardInterrupt):
963            raise
964        except BaseException as exc:
965            self._fatal_error(
966                exc, 'Fatal error: protocol.get_buffer() call failed.')
967            return
968
969        try:
970            nbytes = self._sock.recv_into(buf)
971        except (BlockingIOError, InterruptedError):
972            return
973        except (SystemExit, KeyboardInterrupt):
974            raise
975        except BaseException as exc:
976            self._fatal_error(exc, 'Fatal read error on socket transport')
977            return
978
979        if not nbytes:
980            self._read_ready__on_eof()
981            return
982
983        try:
984            self._protocol.buffer_updated(nbytes)
985        except (SystemExit, KeyboardInterrupt):
986            raise
987        except BaseException as exc:
988            self._fatal_error(
989                exc, 'Fatal error: protocol.buffer_updated() call failed.')
990
991    def _read_ready__data_received(self):
992        if self._conn_lost:
993            return
994        try:
995            data = self._sock.recv(self.max_size)
996        except (BlockingIOError, InterruptedError):
997            return
998        except (SystemExit, KeyboardInterrupt):
999            raise
1000        except BaseException as exc:
1001            self._fatal_error(exc, 'Fatal read error on socket transport')
1002            return
1003
1004        if not data:
1005            self._read_ready__on_eof()
1006            return
1007
1008        try:
1009            self._protocol.data_received(data)
1010        except (SystemExit, KeyboardInterrupt):
1011            raise
1012        except BaseException as exc:
1013            self._fatal_error(
1014                exc, 'Fatal error: protocol.data_received() call failed.')
1015
1016    def _read_ready__on_eof(self):
1017        if self._loop.get_debug():
1018            logger.debug("%r received EOF", self)
1019
1020        try:
1021            keep_open = self._protocol.eof_received()
1022        except (SystemExit, KeyboardInterrupt):
1023            raise
1024        except BaseException as exc:
1025            self._fatal_error(
1026                exc, 'Fatal error: protocol.eof_received() call failed.')
1027            return
1028
1029        if keep_open:
1030            # We're keeping the connection open so the
1031            # protocol can write more, but we still can't
1032            # receive more, so remove the reader callback.
1033            self._loop._remove_reader(self._sock_fd)
1034        else:
1035            self.close()
1036
1037    def write(self, data):
1038        if not isinstance(data, (bytes, bytearray, memoryview)):
1039            raise TypeError(f'data argument must be a bytes-like object, '
1040                            f'not {type(data).__name__!r}')
1041        if self._eof:
1042            raise RuntimeError('Cannot call write() after write_eof()')
1043        if self._empty_waiter is not None:
1044            raise RuntimeError('unable to write; sendfile is in progress')
1045        if not data:
1046            return
1047
1048        if self._conn_lost:
1049            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
1050                logger.warning('socket.send() raised exception.')
1051            self._conn_lost += 1
1052            return
1053
1054        if not self._buffer:
1055            # Optimization: try to send now.
1056            try:
1057                n = self._sock.send(data)
1058            except (BlockingIOError, InterruptedError):
1059                pass
1060            except (SystemExit, KeyboardInterrupt):
1061                raise
1062            except BaseException as exc:
1063                self._fatal_error(exc, 'Fatal write error on socket transport')
1064                return
1065            else:
1066                data = data[n:]
1067                if not data:
1068                    return
1069            # Not all was written; register write handler.
1070            self._loop._add_writer(self._sock_fd, self._write_ready)
1071
1072        # Add it to the buffer.
1073        self._buffer.extend(data)
1074        self._maybe_pause_protocol()
1075
1076    def _write_ready(self):
1077        assert self._buffer, 'Data should not be empty'
1078
1079        if self._conn_lost:
1080            return
1081        try:
1082            n = self._sock.send(self._buffer)
1083        except (BlockingIOError, InterruptedError):
1084            pass
1085        except (SystemExit, KeyboardInterrupt):
1086            raise
1087        except BaseException as exc:
1088            self._loop._remove_writer(self._sock_fd)
1089            self._buffer.clear()
1090            self._fatal_error(exc, 'Fatal write error on socket transport')
1091            if self._empty_waiter is not None:
1092                self._empty_waiter.set_exception(exc)
1093        else:
1094            if n:
1095                del self._buffer[:n]
1096            self._maybe_resume_protocol()  # May append to buffer.
1097            if not self._buffer:
1098                self._loop._remove_writer(self._sock_fd)
1099                if self._empty_waiter is not None:
1100                    self._empty_waiter.set_result(None)
1101                if self._closing:
1102                    self._call_connection_lost(None)
1103                elif self._eof:
1104                    self._sock.shutdown(socket.SHUT_WR)
1105
1106    def write_eof(self):
1107        if self._closing or self._eof:
1108            return
1109        self._eof = True
1110        if not self._buffer:
1111            self._sock.shutdown(socket.SHUT_WR)
1112
1113    def can_write_eof(self):
1114        return True
1115
1116    def _call_connection_lost(self, exc):
1117        super()._call_connection_lost(exc)
1118        if self._empty_waiter is not None:
1119            self._empty_waiter.set_exception(
1120                ConnectionError("Connection is closed by peer"))
1121
1122    def _make_empty_waiter(self):
1123        if self._empty_waiter is not None:
1124            raise RuntimeError("Empty waiter is already set")
1125        self._empty_waiter = self._loop.create_future()
1126        if not self._buffer:
1127            self._empty_waiter.set_result(None)
1128        return self._empty_waiter
1129
1130    def _reset_empty_waiter(self):
1131        self._empty_waiter = None
1132
1133
1134class _SelectorDatagramTransport(_SelectorTransport):
1135
1136    _buffer_factory = collections.deque
1137
1138    def __init__(self, loop, sock, protocol, address=None,
1139                 waiter=None, extra=None):
1140        super().__init__(loop, sock, protocol, extra)
1141        self._address = address
1142        self._buffer_size = 0
1143        self._loop.call_soon(self._protocol.connection_made, self)
1144        # only start reading when connection_made() has been called
1145        self._loop.call_soon(self._add_reader,
1146                             self._sock_fd, self._read_ready)
1147        if waiter is not None:
1148            # only wake up the waiter when connection_made() has been called
1149            self._loop.call_soon(futures._set_result_unless_cancelled,
1150                                 waiter, None)
1151
1152    def get_write_buffer_size(self):
1153        return self._buffer_size
1154
1155    def _read_ready(self):
1156        if self._conn_lost:
1157            return
1158        try:
1159            data, addr = self._sock.recvfrom(self.max_size)
1160        except (BlockingIOError, InterruptedError):
1161            pass
1162        except OSError as exc:
1163            self._protocol.error_received(exc)
1164        except (SystemExit, KeyboardInterrupt):
1165            raise
1166        except BaseException as exc:
1167            self._fatal_error(exc, 'Fatal read error on datagram transport')
1168        else:
1169            self._protocol.datagram_received(data, addr)
1170
1171    def sendto(self, data, addr=None):
1172        if not isinstance(data, (bytes, bytearray, memoryview)):
1173            raise TypeError(f'data argument must be a bytes-like object, '
1174                            f'not {type(data).__name__!r}')
1175        if not data:
1176            return
1177
1178        if self._address:
1179            if addr not in (None, self._address):
1180                raise ValueError(
1181                    f'Invalid address: must be None or {self._address}')
1182            addr = self._address
1183
1184        if self._conn_lost and self._address:
1185            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
1186                logger.warning('socket.send() raised exception.')
1187            self._conn_lost += 1
1188            return
1189
1190        if not self._buffer:
1191            # Attempt to send it right away first.
1192            try:
1193                if self._extra['peername']:
1194                    self._sock.send(data)
1195                else:
1196                    self._sock.sendto(data, addr)
1197                return
1198            except (BlockingIOError, InterruptedError):
1199                self._loop._add_writer(self._sock_fd, self._sendto_ready)
1200            except OSError as exc:
1201                self._protocol.error_received(exc)
1202                return
1203            except (SystemExit, KeyboardInterrupt):
1204                raise
1205            except BaseException as exc:
1206                self._fatal_error(
1207                    exc, 'Fatal write error on datagram transport')
1208                return
1209
1210        # Ensure that what we buffer is immutable.
1211        self._buffer.append((bytes(data), addr))
1212        self._buffer_size += len(data)
1213        self._maybe_pause_protocol()
1214
1215    def _sendto_ready(self):
1216        while self._buffer:
1217            data, addr = self._buffer.popleft()
1218            self._buffer_size -= len(data)
1219            try:
1220                if self._extra['peername']:
1221                    self._sock.send(data)
1222                else:
1223                    self._sock.sendto(data, addr)
1224            except (BlockingIOError, InterruptedError):
1225                self._buffer.appendleft((data, addr))  # Try again later.
1226                self._buffer_size += len(data)
1227                break
1228            except OSError as exc:
1229                self._protocol.error_received(exc)
1230                return
1231            except (SystemExit, KeyboardInterrupt):
1232                raise
1233            except BaseException as exc:
1234                self._fatal_error(
1235                    exc, 'Fatal write error on datagram transport')
1236                return
1237
1238        self._maybe_resume_protocol()  # May append to buffer.
1239        if not self._buffer:
1240            self._loop._remove_writer(self._sock_fd)
1241            if self._closing:
1242                self._call_connection_lost(None)
1243