1*cda5da8dSAndroid Build Coastguard Worker"""Base implementation of event loop. 2*cda5da8dSAndroid Build Coastguard Worker 3*cda5da8dSAndroid Build Coastguard WorkerThe event loop can be broken up into a multiplexer (the part 4*cda5da8dSAndroid Build Coastguard Workerresponsible for notifying us of I/O events) and the event loop proper, 5*cda5da8dSAndroid Build Coastguard Workerwhich wraps a multiplexer with functionality for scheduling callbacks, 6*cda5da8dSAndroid Build Coastguard Workerimmediately or at a given time in the future. 7*cda5da8dSAndroid Build Coastguard Worker 8*cda5da8dSAndroid Build Coastguard WorkerWhenever a public API takes a callback, subsequent positional 9*cda5da8dSAndroid Build Coastguard Workerarguments will be passed to the callback if/when it is called. This 10*cda5da8dSAndroid Build Coastguard Workeravoids the proliferation of trivial lambdas implementing closures. 11*cda5da8dSAndroid Build Coastguard WorkerKeyword arguments for the callback are not supported; this is a 12*cda5da8dSAndroid Build Coastguard Workerconscious design decision, leaving the door open for keyword arguments 13*cda5da8dSAndroid Build Coastguard Workerto modify the meaning of the API call itself. 14*cda5da8dSAndroid Build Coastguard Worker""" 15*cda5da8dSAndroid Build Coastguard Worker 16*cda5da8dSAndroid Build Coastguard Workerimport collections 17*cda5da8dSAndroid Build Coastguard Workerimport collections.abc 18*cda5da8dSAndroid Build Coastguard Workerimport concurrent.futures 19*cda5da8dSAndroid Build Coastguard Workerimport functools 20*cda5da8dSAndroid Build Coastguard Workerimport heapq 21*cda5da8dSAndroid Build Coastguard Workerimport itertools 22*cda5da8dSAndroid Build Coastguard Workerimport os 23*cda5da8dSAndroid Build Coastguard Workerimport socket 24*cda5da8dSAndroid Build Coastguard Workerimport stat 25*cda5da8dSAndroid Build Coastguard Workerimport subprocess 26*cda5da8dSAndroid Build Coastguard Workerimport threading 27*cda5da8dSAndroid Build Coastguard Workerimport time 28*cda5da8dSAndroid Build Coastguard Workerimport traceback 29*cda5da8dSAndroid Build Coastguard Workerimport sys 30*cda5da8dSAndroid Build Coastguard Workerimport warnings 31*cda5da8dSAndroid Build Coastguard Workerimport weakref 32*cda5da8dSAndroid Build Coastguard Worker 33*cda5da8dSAndroid Build Coastguard Workertry: 34*cda5da8dSAndroid Build Coastguard Worker import ssl 35*cda5da8dSAndroid Build Coastguard Workerexcept ImportError: # pragma: no cover 36*cda5da8dSAndroid Build Coastguard Worker ssl = None 37*cda5da8dSAndroid Build Coastguard Worker 38*cda5da8dSAndroid Build Coastguard Workerfrom . import constants 39*cda5da8dSAndroid Build Coastguard Workerfrom . import coroutines 40*cda5da8dSAndroid Build Coastguard Workerfrom . import events 41*cda5da8dSAndroid Build Coastguard Workerfrom . import exceptions 42*cda5da8dSAndroid Build Coastguard Workerfrom . import futures 43*cda5da8dSAndroid Build Coastguard Workerfrom . import protocols 44*cda5da8dSAndroid Build Coastguard Workerfrom . import sslproto 45*cda5da8dSAndroid Build Coastguard Workerfrom . import staggered 46*cda5da8dSAndroid Build Coastguard Workerfrom . import tasks 47*cda5da8dSAndroid Build Coastguard Workerfrom . import transports 48*cda5da8dSAndroid Build Coastguard Workerfrom . import trsock 49*cda5da8dSAndroid Build Coastguard Workerfrom .log import logger 50*cda5da8dSAndroid Build Coastguard Worker 51*cda5da8dSAndroid Build Coastguard Worker 52*cda5da8dSAndroid Build Coastguard Worker__all__ = 'BaseEventLoop','Server', 53*cda5da8dSAndroid Build Coastguard Worker 54*cda5da8dSAndroid Build Coastguard Worker 55*cda5da8dSAndroid Build Coastguard Worker# Minimum number of _scheduled timer handles before cleanup of 56*cda5da8dSAndroid Build Coastguard Worker# cancelled handles is performed. 57*cda5da8dSAndroid Build Coastguard Worker_MIN_SCHEDULED_TIMER_HANDLES = 100 58*cda5da8dSAndroid Build Coastguard Worker 59*cda5da8dSAndroid Build Coastguard Worker# Minimum fraction of _scheduled timer handles that are cancelled 60*cda5da8dSAndroid Build Coastguard Worker# before cleanup of cancelled handles is performed. 61*cda5da8dSAndroid Build Coastguard Worker_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 62*cda5da8dSAndroid Build Coastguard Worker 63*cda5da8dSAndroid Build Coastguard Worker 64*cda5da8dSAndroid Build Coastguard Worker_HAS_IPv6 = hasattr(socket, 'AF_INET6') 65*cda5da8dSAndroid Build Coastguard Worker 66*cda5da8dSAndroid Build Coastguard Worker# Maximum timeout passed to select to avoid OS limitations 67*cda5da8dSAndroid Build Coastguard WorkerMAXIMUM_SELECT_TIMEOUT = 24 * 3600 68*cda5da8dSAndroid Build Coastguard Worker 69*cda5da8dSAndroid Build Coastguard Worker 70*cda5da8dSAndroid Build Coastguard Workerdef _format_handle(handle): 71*cda5da8dSAndroid Build Coastguard Worker cb = handle._callback 72*cda5da8dSAndroid Build Coastguard Worker if isinstance(getattr(cb, '__self__', None), tasks.Task): 73*cda5da8dSAndroid Build Coastguard Worker # format the task 74*cda5da8dSAndroid Build Coastguard Worker return repr(cb.__self__) 75*cda5da8dSAndroid Build Coastguard Worker else: 76*cda5da8dSAndroid Build Coastguard Worker return str(handle) 77*cda5da8dSAndroid Build Coastguard Worker 78*cda5da8dSAndroid Build Coastguard Worker 79*cda5da8dSAndroid Build Coastguard Workerdef _format_pipe(fd): 80*cda5da8dSAndroid Build Coastguard Worker if fd == subprocess.PIPE: 81*cda5da8dSAndroid Build Coastguard Worker return '<pipe>' 82*cda5da8dSAndroid Build Coastguard Worker elif fd == subprocess.STDOUT: 83*cda5da8dSAndroid Build Coastguard Worker return '<stdout>' 84*cda5da8dSAndroid Build Coastguard Worker else: 85*cda5da8dSAndroid Build Coastguard Worker return repr(fd) 86*cda5da8dSAndroid Build Coastguard Worker 87*cda5da8dSAndroid Build Coastguard Worker 88*cda5da8dSAndroid Build Coastguard Workerdef _set_reuseport(sock): 89*cda5da8dSAndroid Build Coastguard Worker if not hasattr(socket, 'SO_REUSEPORT'): 90*cda5da8dSAndroid Build Coastguard Worker raise ValueError('reuse_port not supported by socket module') 91*cda5da8dSAndroid Build Coastguard Worker else: 92*cda5da8dSAndroid Build Coastguard Worker try: 93*cda5da8dSAndroid Build Coastguard Worker sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 94*cda5da8dSAndroid Build Coastguard Worker except OSError: 95*cda5da8dSAndroid Build Coastguard Worker raise ValueError('reuse_port not supported by socket module, ' 96*cda5da8dSAndroid Build Coastguard Worker 'SO_REUSEPORT defined but not implemented.') 97*cda5da8dSAndroid Build Coastguard Worker 98*cda5da8dSAndroid Build Coastguard Worker 99*cda5da8dSAndroid Build Coastguard Workerdef _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0): 100*cda5da8dSAndroid Build Coastguard Worker # Try to skip getaddrinfo if "host" is already an IP. Users might have 101*cda5da8dSAndroid Build Coastguard Worker # handled name resolution in their own code and pass in resolved IPs. 102*cda5da8dSAndroid Build Coastguard Worker if not hasattr(socket, 'inet_pton'): 103*cda5da8dSAndroid Build Coastguard Worker return 104*cda5da8dSAndroid Build Coastguard Worker 105*cda5da8dSAndroid Build Coastguard Worker if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \ 106*cda5da8dSAndroid Build Coastguard Worker host is None: 107*cda5da8dSAndroid Build Coastguard Worker return None 108*cda5da8dSAndroid Build Coastguard Worker 109*cda5da8dSAndroid Build Coastguard Worker if type == socket.SOCK_STREAM: 110*cda5da8dSAndroid Build Coastguard Worker proto = socket.IPPROTO_TCP 111*cda5da8dSAndroid Build Coastguard Worker elif type == socket.SOCK_DGRAM: 112*cda5da8dSAndroid Build Coastguard Worker proto = socket.IPPROTO_UDP 113*cda5da8dSAndroid Build Coastguard Worker else: 114*cda5da8dSAndroid Build Coastguard Worker return None 115*cda5da8dSAndroid Build Coastguard Worker 116*cda5da8dSAndroid Build Coastguard Worker if port is None: 117*cda5da8dSAndroid Build Coastguard Worker port = 0 118*cda5da8dSAndroid Build Coastguard Worker elif isinstance(port, bytes) and port == b'': 119*cda5da8dSAndroid Build Coastguard Worker port = 0 120*cda5da8dSAndroid Build Coastguard Worker elif isinstance(port, str) and port == '': 121*cda5da8dSAndroid Build Coastguard Worker port = 0 122*cda5da8dSAndroid Build Coastguard Worker else: 123*cda5da8dSAndroid Build Coastguard Worker # If port's a service name like "http", don't skip getaddrinfo. 124*cda5da8dSAndroid Build Coastguard Worker try: 125*cda5da8dSAndroid Build Coastguard Worker port = int(port) 126*cda5da8dSAndroid Build Coastguard Worker except (TypeError, ValueError): 127*cda5da8dSAndroid Build Coastguard Worker return None 128*cda5da8dSAndroid Build Coastguard Worker 129*cda5da8dSAndroid Build Coastguard Worker if family == socket.AF_UNSPEC: 130*cda5da8dSAndroid Build Coastguard Worker afs = [socket.AF_INET] 131*cda5da8dSAndroid Build Coastguard Worker if _HAS_IPv6: 132*cda5da8dSAndroid Build Coastguard Worker afs.append(socket.AF_INET6) 133*cda5da8dSAndroid Build Coastguard Worker else: 134*cda5da8dSAndroid Build Coastguard Worker afs = [family] 135*cda5da8dSAndroid Build Coastguard Worker 136*cda5da8dSAndroid Build Coastguard Worker if isinstance(host, bytes): 137*cda5da8dSAndroid Build Coastguard Worker host = host.decode('idna') 138*cda5da8dSAndroid Build Coastguard Worker if '%' in host: 139*cda5da8dSAndroid Build Coastguard Worker # Linux's inet_pton doesn't accept an IPv6 zone index after host, 140*cda5da8dSAndroid Build Coastguard Worker # like '::1%lo0'. 141*cda5da8dSAndroid Build Coastguard Worker return None 142*cda5da8dSAndroid Build Coastguard Worker 143*cda5da8dSAndroid Build Coastguard Worker for af in afs: 144*cda5da8dSAndroid Build Coastguard Worker try: 145*cda5da8dSAndroid Build Coastguard Worker socket.inet_pton(af, host) 146*cda5da8dSAndroid Build Coastguard Worker # The host has already been resolved. 147*cda5da8dSAndroid Build Coastguard Worker if _HAS_IPv6 and af == socket.AF_INET6: 148*cda5da8dSAndroid Build Coastguard Worker return af, type, proto, '', (host, port, flowinfo, scopeid) 149*cda5da8dSAndroid Build Coastguard Worker else: 150*cda5da8dSAndroid Build Coastguard Worker return af, type, proto, '', (host, port) 151*cda5da8dSAndroid Build Coastguard Worker except OSError: 152*cda5da8dSAndroid Build Coastguard Worker pass 153*cda5da8dSAndroid Build Coastguard Worker 154*cda5da8dSAndroid Build Coastguard Worker # "host" is not an IP address. 155*cda5da8dSAndroid Build Coastguard Worker return None 156*cda5da8dSAndroid Build Coastguard Worker 157*cda5da8dSAndroid Build Coastguard Worker 158*cda5da8dSAndroid Build Coastguard Workerdef _interleave_addrinfos(addrinfos, first_address_family_count=1): 159*cda5da8dSAndroid Build Coastguard Worker """Interleave list of addrinfo tuples by family.""" 160*cda5da8dSAndroid Build Coastguard Worker # Group addresses by family 161*cda5da8dSAndroid Build Coastguard Worker addrinfos_by_family = collections.OrderedDict() 162*cda5da8dSAndroid Build Coastguard Worker for addr in addrinfos: 163*cda5da8dSAndroid Build Coastguard Worker family = addr[0] 164*cda5da8dSAndroid Build Coastguard Worker if family not in addrinfos_by_family: 165*cda5da8dSAndroid Build Coastguard Worker addrinfos_by_family[family] = [] 166*cda5da8dSAndroid Build Coastguard Worker addrinfos_by_family[family].append(addr) 167*cda5da8dSAndroid Build Coastguard Worker addrinfos_lists = list(addrinfos_by_family.values()) 168*cda5da8dSAndroid Build Coastguard Worker 169*cda5da8dSAndroid Build Coastguard Worker reordered = [] 170*cda5da8dSAndroid Build Coastguard Worker if first_address_family_count > 1: 171*cda5da8dSAndroid Build Coastguard Worker reordered.extend(addrinfos_lists[0][:first_address_family_count - 1]) 172*cda5da8dSAndroid Build Coastguard Worker del addrinfos_lists[0][:first_address_family_count - 1] 173*cda5da8dSAndroid Build Coastguard Worker reordered.extend( 174*cda5da8dSAndroid Build Coastguard Worker a for a in itertools.chain.from_iterable( 175*cda5da8dSAndroid Build Coastguard Worker itertools.zip_longest(*addrinfos_lists) 176*cda5da8dSAndroid Build Coastguard Worker ) if a is not None) 177*cda5da8dSAndroid Build Coastguard Worker return reordered 178*cda5da8dSAndroid Build Coastguard Worker 179*cda5da8dSAndroid Build Coastguard Worker 180*cda5da8dSAndroid Build Coastguard Workerdef _run_until_complete_cb(fut): 181*cda5da8dSAndroid Build Coastguard Worker if not fut.cancelled(): 182*cda5da8dSAndroid Build Coastguard Worker exc = fut.exception() 183*cda5da8dSAndroid Build Coastguard Worker if isinstance(exc, (SystemExit, KeyboardInterrupt)): 184*cda5da8dSAndroid Build Coastguard Worker # Issue #22429: run_forever() already finished, no need to 185*cda5da8dSAndroid Build Coastguard Worker # stop it. 186*cda5da8dSAndroid Build Coastguard Worker return 187*cda5da8dSAndroid Build Coastguard Worker futures._get_loop(fut).stop() 188*cda5da8dSAndroid Build Coastguard Worker 189*cda5da8dSAndroid Build Coastguard Worker 190*cda5da8dSAndroid Build Coastguard Workerif hasattr(socket, 'TCP_NODELAY'): 191*cda5da8dSAndroid Build Coastguard Worker def _set_nodelay(sock): 192*cda5da8dSAndroid Build Coastguard Worker if (sock.family in {socket.AF_INET, socket.AF_INET6} and 193*cda5da8dSAndroid Build Coastguard Worker sock.type == socket.SOCK_STREAM and 194*cda5da8dSAndroid Build Coastguard Worker sock.proto == socket.IPPROTO_TCP): 195*cda5da8dSAndroid Build Coastguard Worker sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 196*cda5da8dSAndroid Build Coastguard Workerelse: 197*cda5da8dSAndroid Build Coastguard Worker def _set_nodelay(sock): 198*cda5da8dSAndroid Build Coastguard Worker pass 199*cda5da8dSAndroid Build Coastguard Worker 200*cda5da8dSAndroid Build Coastguard Worker 201*cda5da8dSAndroid Build Coastguard Workerdef _check_ssl_socket(sock): 202*cda5da8dSAndroid Build Coastguard Worker if ssl is not None and isinstance(sock, ssl.SSLSocket): 203*cda5da8dSAndroid Build Coastguard Worker raise TypeError("Socket cannot be of type SSLSocket") 204*cda5da8dSAndroid Build Coastguard Worker 205*cda5da8dSAndroid Build Coastguard Worker 206*cda5da8dSAndroid Build Coastguard Workerclass _SendfileFallbackProtocol(protocols.Protocol): 207*cda5da8dSAndroid Build Coastguard Worker def __init__(self, transp): 208*cda5da8dSAndroid Build Coastguard Worker if not isinstance(transp, transports._FlowControlMixin): 209*cda5da8dSAndroid Build Coastguard Worker raise TypeError("transport should be _FlowControlMixin instance") 210*cda5da8dSAndroid Build Coastguard Worker self._transport = transp 211*cda5da8dSAndroid Build Coastguard Worker self._proto = transp.get_protocol() 212*cda5da8dSAndroid Build Coastguard Worker self._should_resume_reading = transp.is_reading() 213*cda5da8dSAndroid Build Coastguard Worker self._should_resume_writing = transp._protocol_paused 214*cda5da8dSAndroid Build Coastguard Worker transp.pause_reading() 215*cda5da8dSAndroid Build Coastguard Worker transp.set_protocol(self) 216*cda5da8dSAndroid Build Coastguard Worker if self._should_resume_writing: 217*cda5da8dSAndroid Build Coastguard Worker self._write_ready_fut = self._transport._loop.create_future() 218*cda5da8dSAndroid Build Coastguard Worker else: 219*cda5da8dSAndroid Build Coastguard Worker self._write_ready_fut = None 220*cda5da8dSAndroid Build Coastguard Worker 221*cda5da8dSAndroid Build Coastguard Worker async def drain(self): 222*cda5da8dSAndroid Build Coastguard Worker if self._transport.is_closing(): 223*cda5da8dSAndroid Build Coastguard Worker raise ConnectionError("Connection closed by peer") 224*cda5da8dSAndroid Build Coastguard Worker fut = self._write_ready_fut 225*cda5da8dSAndroid Build Coastguard Worker if fut is None: 226*cda5da8dSAndroid Build Coastguard Worker return 227*cda5da8dSAndroid Build Coastguard Worker await fut 228*cda5da8dSAndroid Build Coastguard Worker 229*cda5da8dSAndroid Build Coastguard Worker def connection_made(self, transport): 230*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError("Invalid state: " 231*cda5da8dSAndroid Build Coastguard Worker "connection should have been established already.") 232*cda5da8dSAndroid Build Coastguard Worker 233*cda5da8dSAndroid Build Coastguard Worker def connection_lost(self, exc): 234*cda5da8dSAndroid Build Coastguard Worker if self._write_ready_fut is not None: 235*cda5da8dSAndroid Build Coastguard Worker # Never happens if peer disconnects after sending the whole content 236*cda5da8dSAndroid Build Coastguard Worker # Thus disconnection is always an exception from user perspective 237*cda5da8dSAndroid Build Coastguard Worker if exc is None: 238*cda5da8dSAndroid Build Coastguard Worker self._write_ready_fut.set_exception( 239*cda5da8dSAndroid Build Coastguard Worker ConnectionError("Connection is closed by peer")) 240*cda5da8dSAndroid Build Coastguard Worker else: 241*cda5da8dSAndroid Build Coastguard Worker self._write_ready_fut.set_exception(exc) 242*cda5da8dSAndroid Build Coastguard Worker self._proto.connection_lost(exc) 243*cda5da8dSAndroid Build Coastguard Worker 244*cda5da8dSAndroid Build Coastguard Worker def pause_writing(self): 245*cda5da8dSAndroid Build Coastguard Worker if self._write_ready_fut is not None: 246*cda5da8dSAndroid Build Coastguard Worker return 247*cda5da8dSAndroid Build Coastguard Worker self._write_ready_fut = self._transport._loop.create_future() 248*cda5da8dSAndroid Build Coastguard Worker 249*cda5da8dSAndroid Build Coastguard Worker def resume_writing(self): 250*cda5da8dSAndroid Build Coastguard Worker if self._write_ready_fut is None: 251*cda5da8dSAndroid Build Coastguard Worker return 252*cda5da8dSAndroid Build Coastguard Worker self._write_ready_fut.set_result(False) 253*cda5da8dSAndroid Build Coastguard Worker self._write_ready_fut = None 254*cda5da8dSAndroid Build Coastguard Worker 255*cda5da8dSAndroid Build Coastguard Worker def data_received(self, data): 256*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError("Invalid state: reading should be paused") 257*cda5da8dSAndroid Build Coastguard Worker 258*cda5da8dSAndroid Build Coastguard Worker def eof_received(self): 259*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError("Invalid state: reading should be paused") 260*cda5da8dSAndroid Build Coastguard Worker 261*cda5da8dSAndroid Build Coastguard Worker async def restore(self): 262*cda5da8dSAndroid Build Coastguard Worker self._transport.set_protocol(self._proto) 263*cda5da8dSAndroid Build Coastguard Worker if self._should_resume_reading: 264*cda5da8dSAndroid Build Coastguard Worker self._transport.resume_reading() 265*cda5da8dSAndroid Build Coastguard Worker if self._write_ready_fut is not None: 266*cda5da8dSAndroid Build Coastguard Worker # Cancel the future. 267*cda5da8dSAndroid Build Coastguard Worker # Basically it has no effect because protocol is switched back, 268*cda5da8dSAndroid Build Coastguard Worker # no code should wait for it anymore. 269*cda5da8dSAndroid Build Coastguard Worker self._write_ready_fut.cancel() 270*cda5da8dSAndroid Build Coastguard Worker if self._should_resume_writing: 271*cda5da8dSAndroid Build Coastguard Worker self._proto.resume_writing() 272*cda5da8dSAndroid Build Coastguard Worker 273*cda5da8dSAndroid Build Coastguard Worker 274*cda5da8dSAndroid Build Coastguard Workerclass Server(events.AbstractServer): 275*cda5da8dSAndroid Build Coastguard Worker 276*cda5da8dSAndroid Build Coastguard Worker def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, 277*cda5da8dSAndroid Build Coastguard Worker ssl_handshake_timeout, ssl_shutdown_timeout=None): 278*cda5da8dSAndroid Build Coastguard Worker self._loop = loop 279*cda5da8dSAndroid Build Coastguard Worker self._sockets = sockets 280*cda5da8dSAndroid Build Coastguard Worker self._active_count = 0 281*cda5da8dSAndroid Build Coastguard Worker self._waiters = [] 282*cda5da8dSAndroid Build Coastguard Worker self._protocol_factory = protocol_factory 283*cda5da8dSAndroid Build Coastguard Worker self._backlog = backlog 284*cda5da8dSAndroid Build Coastguard Worker self._ssl_context = ssl_context 285*cda5da8dSAndroid Build Coastguard Worker self._ssl_handshake_timeout = ssl_handshake_timeout 286*cda5da8dSAndroid Build Coastguard Worker self._ssl_shutdown_timeout = ssl_shutdown_timeout 287*cda5da8dSAndroid Build Coastguard Worker self._serving = False 288*cda5da8dSAndroid Build Coastguard Worker self._serving_forever_fut = None 289*cda5da8dSAndroid Build Coastguard Worker 290*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 291*cda5da8dSAndroid Build Coastguard Worker return f'<{self.__class__.__name__} sockets={self.sockets!r}>' 292*cda5da8dSAndroid Build Coastguard Worker 293*cda5da8dSAndroid Build Coastguard Worker def _attach(self): 294*cda5da8dSAndroid Build Coastguard Worker assert self._sockets is not None 295*cda5da8dSAndroid Build Coastguard Worker self._active_count += 1 296*cda5da8dSAndroid Build Coastguard Worker 297*cda5da8dSAndroid Build Coastguard Worker def _detach(self): 298*cda5da8dSAndroid Build Coastguard Worker assert self._active_count > 0 299*cda5da8dSAndroid Build Coastguard Worker self._active_count -= 1 300*cda5da8dSAndroid Build Coastguard Worker if self._active_count == 0 and self._sockets is None: 301*cda5da8dSAndroid Build Coastguard Worker self._wakeup() 302*cda5da8dSAndroid Build Coastguard Worker 303*cda5da8dSAndroid Build Coastguard Worker def _wakeup(self): 304*cda5da8dSAndroid Build Coastguard Worker waiters = self._waiters 305*cda5da8dSAndroid Build Coastguard Worker self._waiters = None 306*cda5da8dSAndroid Build Coastguard Worker for waiter in waiters: 307*cda5da8dSAndroid Build Coastguard Worker if not waiter.done(): 308*cda5da8dSAndroid Build Coastguard Worker waiter.set_result(waiter) 309*cda5da8dSAndroid Build Coastguard Worker 310*cda5da8dSAndroid Build Coastguard Worker def _start_serving(self): 311*cda5da8dSAndroid Build Coastguard Worker if self._serving: 312*cda5da8dSAndroid Build Coastguard Worker return 313*cda5da8dSAndroid Build Coastguard Worker self._serving = True 314*cda5da8dSAndroid Build Coastguard Worker for sock in self._sockets: 315*cda5da8dSAndroid Build Coastguard Worker sock.listen(self._backlog) 316*cda5da8dSAndroid Build Coastguard Worker self._loop._start_serving( 317*cda5da8dSAndroid Build Coastguard Worker self._protocol_factory, sock, self._ssl_context, 318*cda5da8dSAndroid Build Coastguard Worker self, self._backlog, self._ssl_handshake_timeout, 319*cda5da8dSAndroid Build Coastguard Worker self._ssl_shutdown_timeout) 320*cda5da8dSAndroid Build Coastguard Worker 321*cda5da8dSAndroid Build Coastguard Worker def get_loop(self): 322*cda5da8dSAndroid Build Coastguard Worker return self._loop 323*cda5da8dSAndroid Build Coastguard Worker 324*cda5da8dSAndroid Build Coastguard Worker def is_serving(self): 325*cda5da8dSAndroid Build Coastguard Worker return self._serving 326*cda5da8dSAndroid Build Coastguard Worker 327*cda5da8dSAndroid Build Coastguard Worker @property 328*cda5da8dSAndroid Build Coastguard Worker def sockets(self): 329*cda5da8dSAndroid Build Coastguard Worker if self._sockets is None: 330*cda5da8dSAndroid Build Coastguard Worker return () 331*cda5da8dSAndroid Build Coastguard Worker return tuple(trsock.TransportSocket(s) for s in self._sockets) 332*cda5da8dSAndroid Build Coastguard Worker 333*cda5da8dSAndroid Build Coastguard Worker def close(self): 334*cda5da8dSAndroid Build Coastguard Worker sockets = self._sockets 335*cda5da8dSAndroid Build Coastguard Worker if sockets is None: 336*cda5da8dSAndroid Build Coastguard Worker return 337*cda5da8dSAndroid Build Coastguard Worker self._sockets = None 338*cda5da8dSAndroid Build Coastguard Worker 339*cda5da8dSAndroid Build Coastguard Worker for sock in sockets: 340*cda5da8dSAndroid Build Coastguard Worker self._loop._stop_serving(sock) 341*cda5da8dSAndroid Build Coastguard Worker 342*cda5da8dSAndroid Build Coastguard Worker self._serving = False 343*cda5da8dSAndroid Build Coastguard Worker 344*cda5da8dSAndroid Build Coastguard Worker if (self._serving_forever_fut is not None and 345*cda5da8dSAndroid Build Coastguard Worker not self._serving_forever_fut.done()): 346*cda5da8dSAndroid Build Coastguard Worker self._serving_forever_fut.cancel() 347*cda5da8dSAndroid Build Coastguard Worker self._serving_forever_fut = None 348*cda5da8dSAndroid Build Coastguard Worker 349*cda5da8dSAndroid Build Coastguard Worker if self._active_count == 0: 350*cda5da8dSAndroid Build Coastguard Worker self._wakeup() 351*cda5da8dSAndroid Build Coastguard Worker 352*cda5da8dSAndroid Build Coastguard Worker async def start_serving(self): 353*cda5da8dSAndroid Build Coastguard Worker self._start_serving() 354*cda5da8dSAndroid Build Coastguard Worker # Skip one loop iteration so that all 'loop.add_reader' 355*cda5da8dSAndroid Build Coastguard Worker # go through. 356*cda5da8dSAndroid Build Coastguard Worker await tasks.sleep(0) 357*cda5da8dSAndroid Build Coastguard Worker 358*cda5da8dSAndroid Build Coastguard Worker async def serve_forever(self): 359*cda5da8dSAndroid Build Coastguard Worker if self._serving_forever_fut is not None: 360*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError( 361*cda5da8dSAndroid Build Coastguard Worker f'server {self!r} is already being awaited on serve_forever()') 362*cda5da8dSAndroid Build Coastguard Worker if self._sockets is None: 363*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError(f'server {self!r} is closed') 364*cda5da8dSAndroid Build Coastguard Worker 365*cda5da8dSAndroid Build Coastguard Worker self._start_serving() 366*cda5da8dSAndroid Build Coastguard Worker self._serving_forever_fut = self._loop.create_future() 367*cda5da8dSAndroid Build Coastguard Worker 368*cda5da8dSAndroid Build Coastguard Worker try: 369*cda5da8dSAndroid Build Coastguard Worker await self._serving_forever_fut 370*cda5da8dSAndroid Build Coastguard Worker except exceptions.CancelledError: 371*cda5da8dSAndroid Build Coastguard Worker try: 372*cda5da8dSAndroid Build Coastguard Worker self.close() 373*cda5da8dSAndroid Build Coastguard Worker await self.wait_closed() 374*cda5da8dSAndroid Build Coastguard Worker finally: 375*cda5da8dSAndroid Build Coastguard Worker raise 376*cda5da8dSAndroid Build Coastguard Worker finally: 377*cda5da8dSAndroid Build Coastguard Worker self._serving_forever_fut = None 378*cda5da8dSAndroid Build Coastguard Worker 379*cda5da8dSAndroid Build Coastguard Worker async def wait_closed(self): 380*cda5da8dSAndroid Build Coastguard Worker if self._sockets is None or self._waiters is None: 381*cda5da8dSAndroid Build Coastguard Worker return 382*cda5da8dSAndroid Build Coastguard Worker waiter = self._loop.create_future() 383*cda5da8dSAndroid Build Coastguard Worker self._waiters.append(waiter) 384*cda5da8dSAndroid Build Coastguard Worker await waiter 385*cda5da8dSAndroid Build Coastguard Worker 386*cda5da8dSAndroid Build Coastguard Worker 387*cda5da8dSAndroid Build Coastguard Workerclass BaseEventLoop(events.AbstractEventLoop): 388*cda5da8dSAndroid Build Coastguard Worker 389*cda5da8dSAndroid Build Coastguard Worker def __init__(self): 390*cda5da8dSAndroid Build Coastguard Worker self._timer_cancelled_count = 0 391*cda5da8dSAndroid Build Coastguard Worker self._closed = False 392*cda5da8dSAndroid Build Coastguard Worker self._stopping = False 393*cda5da8dSAndroid Build Coastguard Worker self._ready = collections.deque() 394*cda5da8dSAndroid Build Coastguard Worker self._scheduled = [] 395*cda5da8dSAndroid Build Coastguard Worker self._default_executor = None 396*cda5da8dSAndroid Build Coastguard Worker self._internal_fds = 0 397*cda5da8dSAndroid Build Coastguard Worker # Identifier of the thread running the event loop, or None if the 398*cda5da8dSAndroid Build Coastguard Worker # event loop is not running 399*cda5da8dSAndroid Build Coastguard Worker self._thread_id = None 400*cda5da8dSAndroid Build Coastguard Worker self._clock_resolution = time.get_clock_info('monotonic').resolution 401*cda5da8dSAndroid Build Coastguard Worker self._exception_handler = None 402*cda5da8dSAndroid Build Coastguard Worker self.set_debug(coroutines._is_debug_mode()) 403*cda5da8dSAndroid Build Coastguard Worker # In debug mode, if the execution of a callback or a step of a task 404*cda5da8dSAndroid Build Coastguard Worker # exceed this duration in seconds, the slow callback/task is logged. 405*cda5da8dSAndroid Build Coastguard Worker self.slow_callback_duration = 0.1 406*cda5da8dSAndroid Build Coastguard Worker self._current_handle = None 407*cda5da8dSAndroid Build Coastguard Worker self._task_factory = None 408*cda5da8dSAndroid Build Coastguard Worker self._coroutine_origin_tracking_enabled = False 409*cda5da8dSAndroid Build Coastguard Worker self._coroutine_origin_tracking_saved_depth = None 410*cda5da8dSAndroid Build Coastguard Worker 411*cda5da8dSAndroid Build Coastguard Worker # A weak set of all asynchronous generators that are 412*cda5da8dSAndroid Build Coastguard Worker # being iterated by the loop. 413*cda5da8dSAndroid Build Coastguard Worker self._asyncgens = weakref.WeakSet() 414*cda5da8dSAndroid Build Coastguard Worker # Set to True when `loop.shutdown_asyncgens` is called. 415*cda5da8dSAndroid Build Coastguard Worker self._asyncgens_shutdown_called = False 416*cda5da8dSAndroid Build Coastguard Worker # Set to True when `loop.shutdown_default_executor` is called. 417*cda5da8dSAndroid Build Coastguard Worker self._executor_shutdown_called = False 418*cda5da8dSAndroid Build Coastguard Worker 419*cda5da8dSAndroid Build Coastguard Worker def __repr__(self): 420*cda5da8dSAndroid Build Coastguard Worker return ( 421*cda5da8dSAndroid Build Coastguard Worker f'<{self.__class__.__name__} running={self.is_running()} ' 422*cda5da8dSAndroid Build Coastguard Worker f'closed={self.is_closed()} debug={self.get_debug()}>' 423*cda5da8dSAndroid Build Coastguard Worker ) 424*cda5da8dSAndroid Build Coastguard Worker 425*cda5da8dSAndroid Build Coastguard Worker def create_future(self): 426*cda5da8dSAndroid Build Coastguard Worker """Create a Future object attached to the loop.""" 427*cda5da8dSAndroid Build Coastguard Worker return futures.Future(loop=self) 428*cda5da8dSAndroid Build Coastguard Worker 429*cda5da8dSAndroid Build Coastguard Worker def create_task(self, coro, *, name=None, context=None): 430*cda5da8dSAndroid Build Coastguard Worker """Schedule a coroutine object. 431*cda5da8dSAndroid Build Coastguard Worker 432*cda5da8dSAndroid Build Coastguard Worker Return a task object. 433*cda5da8dSAndroid Build Coastguard Worker """ 434*cda5da8dSAndroid Build Coastguard Worker self._check_closed() 435*cda5da8dSAndroid Build Coastguard Worker if self._task_factory is None: 436*cda5da8dSAndroid Build Coastguard Worker task = tasks.Task(coro, loop=self, name=name, context=context) 437*cda5da8dSAndroid Build Coastguard Worker if task._source_traceback: 438*cda5da8dSAndroid Build Coastguard Worker del task._source_traceback[-1] 439*cda5da8dSAndroid Build Coastguard Worker else: 440*cda5da8dSAndroid Build Coastguard Worker if context is None: 441*cda5da8dSAndroid Build Coastguard Worker # Use legacy API if context is not needed 442*cda5da8dSAndroid Build Coastguard Worker task = self._task_factory(self, coro) 443*cda5da8dSAndroid Build Coastguard Worker else: 444*cda5da8dSAndroid Build Coastguard Worker task = self._task_factory(self, coro, context=context) 445*cda5da8dSAndroid Build Coastguard Worker 446*cda5da8dSAndroid Build Coastguard Worker tasks._set_task_name(task, name) 447*cda5da8dSAndroid Build Coastguard Worker 448*cda5da8dSAndroid Build Coastguard Worker return task 449*cda5da8dSAndroid Build Coastguard Worker 450*cda5da8dSAndroid Build Coastguard Worker def set_task_factory(self, factory): 451*cda5da8dSAndroid Build Coastguard Worker """Set a task factory that will be used by loop.create_task(). 452*cda5da8dSAndroid Build Coastguard Worker 453*cda5da8dSAndroid Build Coastguard Worker If factory is None the default task factory will be set. 454*cda5da8dSAndroid Build Coastguard Worker 455*cda5da8dSAndroid Build Coastguard Worker If factory is a callable, it should have a signature matching 456*cda5da8dSAndroid Build Coastguard Worker '(loop, coro)', where 'loop' will be a reference to the active 457*cda5da8dSAndroid Build Coastguard Worker event loop, 'coro' will be a coroutine object. The callable 458*cda5da8dSAndroid Build Coastguard Worker must return a Future. 459*cda5da8dSAndroid Build Coastguard Worker """ 460*cda5da8dSAndroid Build Coastguard Worker if factory is not None and not callable(factory): 461*cda5da8dSAndroid Build Coastguard Worker raise TypeError('task factory must be a callable or None') 462*cda5da8dSAndroid Build Coastguard Worker self._task_factory = factory 463*cda5da8dSAndroid Build Coastguard Worker 464*cda5da8dSAndroid Build Coastguard Worker def get_task_factory(self): 465*cda5da8dSAndroid Build Coastguard Worker """Return a task factory, or None if the default one is in use.""" 466*cda5da8dSAndroid Build Coastguard Worker return self._task_factory 467*cda5da8dSAndroid Build Coastguard Worker 468*cda5da8dSAndroid Build Coastguard Worker def _make_socket_transport(self, sock, protocol, waiter=None, *, 469*cda5da8dSAndroid Build Coastguard Worker extra=None, server=None): 470*cda5da8dSAndroid Build Coastguard Worker """Create socket transport.""" 471*cda5da8dSAndroid Build Coastguard Worker raise NotImplementedError 472*cda5da8dSAndroid Build Coastguard Worker 473*cda5da8dSAndroid Build Coastguard Worker def _make_ssl_transport( 474*cda5da8dSAndroid Build Coastguard Worker self, rawsock, protocol, sslcontext, waiter=None, 475*cda5da8dSAndroid Build Coastguard Worker *, server_side=False, server_hostname=None, 476*cda5da8dSAndroid Build Coastguard Worker extra=None, server=None, 477*cda5da8dSAndroid Build Coastguard Worker ssl_handshake_timeout=None, 478*cda5da8dSAndroid Build Coastguard Worker ssl_shutdown_timeout=None, 479*cda5da8dSAndroid Build Coastguard Worker call_connection_made=True): 480*cda5da8dSAndroid Build Coastguard Worker """Create SSL transport.""" 481*cda5da8dSAndroid Build Coastguard Worker raise NotImplementedError 482*cda5da8dSAndroid Build Coastguard Worker 483*cda5da8dSAndroid Build Coastguard Worker def _make_datagram_transport(self, sock, protocol, 484*cda5da8dSAndroid Build Coastguard Worker address=None, waiter=None, extra=None): 485*cda5da8dSAndroid Build Coastguard Worker """Create datagram transport.""" 486*cda5da8dSAndroid Build Coastguard Worker raise NotImplementedError 487*cda5da8dSAndroid Build Coastguard Worker 488*cda5da8dSAndroid Build Coastguard Worker def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 489*cda5da8dSAndroid Build Coastguard Worker extra=None): 490*cda5da8dSAndroid Build Coastguard Worker """Create read pipe transport.""" 491*cda5da8dSAndroid Build Coastguard Worker raise NotImplementedError 492*cda5da8dSAndroid Build Coastguard Worker 493*cda5da8dSAndroid Build Coastguard Worker def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 494*cda5da8dSAndroid Build Coastguard Worker extra=None): 495*cda5da8dSAndroid Build Coastguard Worker """Create write pipe transport.""" 496*cda5da8dSAndroid Build Coastguard Worker raise NotImplementedError 497*cda5da8dSAndroid Build Coastguard Worker 498*cda5da8dSAndroid Build Coastguard Worker async def _make_subprocess_transport(self, protocol, args, shell, 499*cda5da8dSAndroid Build Coastguard Worker stdin, stdout, stderr, bufsize, 500*cda5da8dSAndroid Build Coastguard Worker extra=None, **kwargs): 501*cda5da8dSAndroid Build Coastguard Worker """Create subprocess transport.""" 502*cda5da8dSAndroid Build Coastguard Worker raise NotImplementedError 503*cda5da8dSAndroid Build Coastguard Worker 504*cda5da8dSAndroid Build Coastguard Worker def _write_to_self(self): 505*cda5da8dSAndroid Build Coastguard Worker """Write a byte to self-pipe, to wake up the event loop. 506*cda5da8dSAndroid Build Coastguard Worker 507*cda5da8dSAndroid Build Coastguard Worker This may be called from a different thread. 508*cda5da8dSAndroid Build Coastguard Worker 509*cda5da8dSAndroid Build Coastguard Worker The subclass is responsible for implementing the self-pipe. 510*cda5da8dSAndroid Build Coastguard Worker """ 511*cda5da8dSAndroid Build Coastguard Worker raise NotImplementedError 512*cda5da8dSAndroid Build Coastguard Worker 513*cda5da8dSAndroid Build Coastguard Worker def _process_events(self, event_list): 514*cda5da8dSAndroid Build Coastguard Worker """Process selector events.""" 515*cda5da8dSAndroid Build Coastguard Worker raise NotImplementedError 516*cda5da8dSAndroid Build Coastguard Worker 517*cda5da8dSAndroid Build Coastguard Worker def _check_closed(self): 518*cda5da8dSAndroid Build Coastguard Worker if self._closed: 519*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError('Event loop is closed') 520*cda5da8dSAndroid Build Coastguard Worker 521*cda5da8dSAndroid Build Coastguard Worker def _check_default_executor(self): 522*cda5da8dSAndroid Build Coastguard Worker if self._executor_shutdown_called: 523*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError('Executor shutdown has been called') 524*cda5da8dSAndroid Build Coastguard Worker 525*cda5da8dSAndroid Build Coastguard Worker def _asyncgen_finalizer_hook(self, agen): 526*cda5da8dSAndroid Build Coastguard Worker self._asyncgens.discard(agen) 527*cda5da8dSAndroid Build Coastguard Worker if not self.is_closed(): 528*cda5da8dSAndroid Build Coastguard Worker self.call_soon_threadsafe(self.create_task, agen.aclose()) 529*cda5da8dSAndroid Build Coastguard Worker 530*cda5da8dSAndroid Build Coastguard Worker def _asyncgen_firstiter_hook(self, agen): 531*cda5da8dSAndroid Build Coastguard Worker if self._asyncgens_shutdown_called: 532*cda5da8dSAndroid Build Coastguard Worker warnings.warn( 533*cda5da8dSAndroid Build Coastguard Worker f"asynchronous generator {agen!r} was scheduled after " 534*cda5da8dSAndroid Build Coastguard Worker f"loop.shutdown_asyncgens() call", 535*cda5da8dSAndroid Build Coastguard Worker ResourceWarning, source=self) 536*cda5da8dSAndroid Build Coastguard Worker 537*cda5da8dSAndroid Build Coastguard Worker self._asyncgens.add(agen) 538*cda5da8dSAndroid Build Coastguard Worker 539*cda5da8dSAndroid Build Coastguard Worker async def shutdown_asyncgens(self): 540*cda5da8dSAndroid Build Coastguard Worker """Shutdown all active asynchronous generators.""" 541*cda5da8dSAndroid Build Coastguard Worker self._asyncgens_shutdown_called = True 542*cda5da8dSAndroid Build Coastguard Worker 543*cda5da8dSAndroid Build Coastguard Worker if not len(self._asyncgens): 544*cda5da8dSAndroid Build Coastguard Worker # If Python version is <3.6 or we don't have any asynchronous 545*cda5da8dSAndroid Build Coastguard Worker # generators alive. 546*cda5da8dSAndroid Build Coastguard Worker return 547*cda5da8dSAndroid Build Coastguard Worker 548*cda5da8dSAndroid Build Coastguard Worker closing_agens = list(self._asyncgens) 549*cda5da8dSAndroid Build Coastguard Worker self._asyncgens.clear() 550*cda5da8dSAndroid Build Coastguard Worker 551*cda5da8dSAndroid Build Coastguard Worker results = await tasks.gather( 552*cda5da8dSAndroid Build Coastguard Worker *[ag.aclose() for ag in closing_agens], 553*cda5da8dSAndroid Build Coastguard Worker return_exceptions=True) 554*cda5da8dSAndroid Build Coastguard Worker 555*cda5da8dSAndroid Build Coastguard Worker for result, agen in zip(results, closing_agens): 556*cda5da8dSAndroid Build Coastguard Worker if isinstance(result, Exception): 557*cda5da8dSAndroid Build Coastguard Worker self.call_exception_handler({ 558*cda5da8dSAndroid Build Coastguard Worker 'message': f'an error occurred during closing of ' 559*cda5da8dSAndroid Build Coastguard Worker f'asynchronous generator {agen!r}', 560*cda5da8dSAndroid Build Coastguard Worker 'exception': result, 561*cda5da8dSAndroid Build Coastguard Worker 'asyncgen': agen 562*cda5da8dSAndroid Build Coastguard Worker }) 563*cda5da8dSAndroid Build Coastguard Worker 564*cda5da8dSAndroid Build Coastguard Worker async def shutdown_default_executor(self): 565*cda5da8dSAndroid Build Coastguard Worker """Schedule the shutdown of the default executor.""" 566*cda5da8dSAndroid Build Coastguard Worker self._executor_shutdown_called = True 567*cda5da8dSAndroid Build Coastguard Worker if self._default_executor is None: 568*cda5da8dSAndroid Build Coastguard Worker return 569*cda5da8dSAndroid Build Coastguard Worker future = self.create_future() 570*cda5da8dSAndroid Build Coastguard Worker thread = threading.Thread(target=self._do_shutdown, args=(future,)) 571*cda5da8dSAndroid Build Coastguard Worker thread.start() 572*cda5da8dSAndroid Build Coastguard Worker try: 573*cda5da8dSAndroid Build Coastguard Worker await future 574*cda5da8dSAndroid Build Coastguard Worker finally: 575*cda5da8dSAndroid Build Coastguard Worker thread.join() 576*cda5da8dSAndroid Build Coastguard Worker 577*cda5da8dSAndroid Build Coastguard Worker def _do_shutdown(self, future): 578*cda5da8dSAndroid Build Coastguard Worker try: 579*cda5da8dSAndroid Build Coastguard Worker self._default_executor.shutdown(wait=True) 580*cda5da8dSAndroid Build Coastguard Worker if not self.is_closed(): 581*cda5da8dSAndroid Build Coastguard Worker self.call_soon_threadsafe(future.set_result, None) 582*cda5da8dSAndroid Build Coastguard Worker except Exception as ex: 583*cda5da8dSAndroid Build Coastguard Worker if not self.is_closed(): 584*cda5da8dSAndroid Build Coastguard Worker self.call_soon_threadsafe(future.set_exception, ex) 585*cda5da8dSAndroid Build Coastguard Worker 586*cda5da8dSAndroid Build Coastguard Worker def _check_running(self): 587*cda5da8dSAndroid Build Coastguard Worker if self.is_running(): 588*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError('This event loop is already running') 589*cda5da8dSAndroid Build Coastguard Worker if events._get_running_loop() is not None: 590*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError( 591*cda5da8dSAndroid Build Coastguard Worker 'Cannot run the event loop while another loop is running') 592*cda5da8dSAndroid Build Coastguard Worker 593*cda5da8dSAndroid Build Coastguard Worker def run_forever(self): 594*cda5da8dSAndroid Build Coastguard Worker """Run until stop() is called.""" 595*cda5da8dSAndroid Build Coastguard Worker self._check_closed() 596*cda5da8dSAndroid Build Coastguard Worker self._check_running() 597*cda5da8dSAndroid Build Coastguard Worker self._set_coroutine_origin_tracking(self._debug) 598*cda5da8dSAndroid Build Coastguard Worker 599*cda5da8dSAndroid Build Coastguard Worker old_agen_hooks = sys.get_asyncgen_hooks() 600*cda5da8dSAndroid Build Coastguard Worker try: 601*cda5da8dSAndroid Build Coastguard Worker self._thread_id = threading.get_ident() 602*cda5da8dSAndroid Build Coastguard Worker sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, 603*cda5da8dSAndroid Build Coastguard Worker finalizer=self._asyncgen_finalizer_hook) 604*cda5da8dSAndroid Build Coastguard Worker 605*cda5da8dSAndroid Build Coastguard Worker events._set_running_loop(self) 606*cda5da8dSAndroid Build Coastguard Worker while True: 607*cda5da8dSAndroid Build Coastguard Worker self._run_once() 608*cda5da8dSAndroid Build Coastguard Worker if self._stopping: 609*cda5da8dSAndroid Build Coastguard Worker break 610*cda5da8dSAndroid Build Coastguard Worker finally: 611*cda5da8dSAndroid Build Coastguard Worker self._stopping = False 612*cda5da8dSAndroid Build Coastguard Worker self._thread_id = None 613*cda5da8dSAndroid Build Coastguard Worker events._set_running_loop(None) 614*cda5da8dSAndroid Build Coastguard Worker self._set_coroutine_origin_tracking(False) 615*cda5da8dSAndroid Build Coastguard Worker sys.set_asyncgen_hooks(*old_agen_hooks) 616*cda5da8dSAndroid Build Coastguard Worker 617*cda5da8dSAndroid Build Coastguard Worker def run_until_complete(self, future): 618*cda5da8dSAndroid Build Coastguard Worker """Run until the Future is done. 619*cda5da8dSAndroid Build Coastguard Worker 620*cda5da8dSAndroid Build Coastguard Worker If the argument is a coroutine, it is wrapped in a Task. 621*cda5da8dSAndroid Build Coastguard Worker 622*cda5da8dSAndroid Build Coastguard Worker WARNING: It would be disastrous to call run_until_complete() 623*cda5da8dSAndroid Build Coastguard Worker with the same coroutine twice -- it would wrap it in two 624*cda5da8dSAndroid Build Coastguard Worker different Tasks and that can't be good. 625*cda5da8dSAndroid Build Coastguard Worker 626*cda5da8dSAndroid Build Coastguard Worker Return the Future's result, or raise its exception. 627*cda5da8dSAndroid Build Coastguard Worker """ 628*cda5da8dSAndroid Build Coastguard Worker self._check_closed() 629*cda5da8dSAndroid Build Coastguard Worker self._check_running() 630*cda5da8dSAndroid Build Coastguard Worker 631*cda5da8dSAndroid Build Coastguard Worker new_task = not futures.isfuture(future) 632*cda5da8dSAndroid Build Coastguard Worker future = tasks.ensure_future(future, loop=self) 633*cda5da8dSAndroid Build Coastguard Worker if new_task: 634*cda5da8dSAndroid Build Coastguard Worker # An exception is raised if the future didn't complete, so there 635*cda5da8dSAndroid Build Coastguard Worker # is no need to log the "destroy pending task" message 636*cda5da8dSAndroid Build Coastguard Worker future._log_destroy_pending = False 637*cda5da8dSAndroid Build Coastguard Worker 638*cda5da8dSAndroid Build Coastguard Worker future.add_done_callback(_run_until_complete_cb) 639*cda5da8dSAndroid Build Coastguard Worker try: 640*cda5da8dSAndroid Build Coastguard Worker self.run_forever() 641*cda5da8dSAndroid Build Coastguard Worker except: 642*cda5da8dSAndroid Build Coastguard Worker if new_task and future.done() and not future.cancelled(): 643*cda5da8dSAndroid Build Coastguard Worker # The coroutine raised a BaseException. Consume the exception 644*cda5da8dSAndroid Build Coastguard Worker # to not log a warning, the caller doesn't have access to the 645*cda5da8dSAndroid Build Coastguard Worker # local task. 646*cda5da8dSAndroid Build Coastguard Worker future.exception() 647*cda5da8dSAndroid Build Coastguard Worker raise 648*cda5da8dSAndroid Build Coastguard Worker finally: 649*cda5da8dSAndroid Build Coastguard Worker future.remove_done_callback(_run_until_complete_cb) 650*cda5da8dSAndroid Build Coastguard Worker if not future.done(): 651*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError('Event loop stopped before Future completed.') 652*cda5da8dSAndroid Build Coastguard Worker 653*cda5da8dSAndroid Build Coastguard Worker return future.result() 654*cda5da8dSAndroid Build Coastguard Worker 655*cda5da8dSAndroid Build Coastguard Worker def stop(self): 656*cda5da8dSAndroid Build Coastguard Worker """Stop running the event loop. 657*cda5da8dSAndroid Build Coastguard Worker 658*cda5da8dSAndroid Build Coastguard Worker Every callback already scheduled will still run. This simply informs 659*cda5da8dSAndroid Build Coastguard Worker run_forever to stop looping after a complete iteration. 660*cda5da8dSAndroid Build Coastguard Worker """ 661*cda5da8dSAndroid Build Coastguard Worker self._stopping = True 662*cda5da8dSAndroid Build Coastguard Worker 663*cda5da8dSAndroid Build Coastguard Worker def close(self): 664*cda5da8dSAndroid Build Coastguard Worker """Close the event loop. 665*cda5da8dSAndroid Build Coastguard Worker 666*cda5da8dSAndroid Build Coastguard Worker This clears the queues and shuts down the executor, 667*cda5da8dSAndroid Build Coastguard Worker but does not wait for the executor to finish. 668*cda5da8dSAndroid Build Coastguard Worker 669*cda5da8dSAndroid Build Coastguard Worker The event loop must not be running. 670*cda5da8dSAndroid Build Coastguard Worker """ 671*cda5da8dSAndroid Build Coastguard Worker if self.is_running(): 672*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError("Cannot close a running event loop") 673*cda5da8dSAndroid Build Coastguard Worker if self._closed: 674*cda5da8dSAndroid Build Coastguard Worker return 675*cda5da8dSAndroid Build Coastguard Worker if self._debug: 676*cda5da8dSAndroid Build Coastguard Worker logger.debug("Close %r", self) 677*cda5da8dSAndroid Build Coastguard Worker self._closed = True 678*cda5da8dSAndroid Build Coastguard Worker self._ready.clear() 679*cda5da8dSAndroid Build Coastguard Worker self._scheduled.clear() 680*cda5da8dSAndroid Build Coastguard Worker self._executor_shutdown_called = True 681*cda5da8dSAndroid Build Coastguard Worker executor = self._default_executor 682*cda5da8dSAndroid Build Coastguard Worker if executor is not None: 683*cda5da8dSAndroid Build Coastguard Worker self._default_executor = None 684*cda5da8dSAndroid Build Coastguard Worker executor.shutdown(wait=False) 685*cda5da8dSAndroid Build Coastguard Worker 686*cda5da8dSAndroid Build Coastguard Worker def is_closed(self): 687*cda5da8dSAndroid Build Coastguard Worker """Returns True if the event loop was closed.""" 688*cda5da8dSAndroid Build Coastguard Worker return self._closed 689*cda5da8dSAndroid Build Coastguard Worker 690*cda5da8dSAndroid Build Coastguard Worker def __del__(self, _warn=warnings.warn): 691*cda5da8dSAndroid Build Coastguard Worker if not self.is_closed(): 692*cda5da8dSAndroid Build Coastguard Worker _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self) 693*cda5da8dSAndroid Build Coastguard Worker if not self.is_running(): 694*cda5da8dSAndroid Build Coastguard Worker self.close() 695*cda5da8dSAndroid Build Coastguard Worker 696*cda5da8dSAndroid Build Coastguard Worker def is_running(self): 697*cda5da8dSAndroid Build Coastguard Worker """Returns True if the event loop is running.""" 698*cda5da8dSAndroid Build Coastguard Worker return (self._thread_id is not None) 699*cda5da8dSAndroid Build Coastguard Worker 700*cda5da8dSAndroid Build Coastguard Worker def time(self): 701*cda5da8dSAndroid Build Coastguard Worker """Return the time according to the event loop's clock. 702*cda5da8dSAndroid Build Coastguard Worker 703*cda5da8dSAndroid Build Coastguard Worker This is a float expressed in seconds since an epoch, but the 704*cda5da8dSAndroid Build Coastguard Worker epoch, precision, accuracy and drift are unspecified and may 705*cda5da8dSAndroid Build Coastguard Worker differ per event loop. 706*cda5da8dSAndroid Build Coastguard Worker """ 707*cda5da8dSAndroid Build Coastguard Worker return time.monotonic() 708*cda5da8dSAndroid Build Coastguard Worker 709*cda5da8dSAndroid Build Coastguard Worker def call_later(self, delay, callback, *args, context=None): 710*cda5da8dSAndroid Build Coastguard Worker """Arrange for a callback to be called at a given time. 711*cda5da8dSAndroid Build Coastguard Worker 712*cda5da8dSAndroid Build Coastguard Worker Return a Handle: an opaque object with a cancel() method that 713*cda5da8dSAndroid Build Coastguard Worker can be used to cancel the call. 714*cda5da8dSAndroid Build Coastguard Worker 715*cda5da8dSAndroid Build Coastguard Worker The delay can be an int or float, expressed in seconds. It is 716*cda5da8dSAndroid Build Coastguard Worker always relative to the current time. 717*cda5da8dSAndroid Build Coastguard Worker 718*cda5da8dSAndroid Build Coastguard Worker Each callback will be called exactly once. If two callbacks 719*cda5da8dSAndroid Build Coastguard Worker are scheduled for exactly the same time, it undefined which 720*cda5da8dSAndroid Build Coastguard Worker will be called first. 721*cda5da8dSAndroid Build Coastguard Worker 722*cda5da8dSAndroid Build Coastguard Worker Any positional arguments after the callback will be passed to 723*cda5da8dSAndroid Build Coastguard Worker the callback when it is called. 724*cda5da8dSAndroid Build Coastguard Worker """ 725*cda5da8dSAndroid Build Coastguard Worker if delay is None: 726*cda5da8dSAndroid Build Coastguard Worker raise TypeError('delay must not be None') 727*cda5da8dSAndroid Build Coastguard Worker timer = self.call_at(self.time() + delay, callback, *args, 728*cda5da8dSAndroid Build Coastguard Worker context=context) 729*cda5da8dSAndroid Build Coastguard Worker if timer._source_traceback: 730*cda5da8dSAndroid Build Coastguard Worker del timer._source_traceback[-1] 731*cda5da8dSAndroid Build Coastguard Worker return timer 732*cda5da8dSAndroid Build Coastguard Worker 733*cda5da8dSAndroid Build Coastguard Worker def call_at(self, when, callback, *args, context=None): 734*cda5da8dSAndroid Build Coastguard Worker """Like call_later(), but uses an absolute time. 735*cda5da8dSAndroid Build Coastguard Worker 736*cda5da8dSAndroid Build Coastguard Worker Absolute time corresponds to the event loop's time() method. 737*cda5da8dSAndroid Build Coastguard Worker """ 738*cda5da8dSAndroid Build Coastguard Worker if when is None: 739*cda5da8dSAndroid Build Coastguard Worker raise TypeError("when cannot be None") 740*cda5da8dSAndroid Build Coastguard Worker self._check_closed() 741*cda5da8dSAndroid Build Coastguard Worker if self._debug: 742*cda5da8dSAndroid Build Coastguard Worker self._check_thread() 743*cda5da8dSAndroid Build Coastguard Worker self._check_callback(callback, 'call_at') 744*cda5da8dSAndroid Build Coastguard Worker timer = events.TimerHandle(when, callback, args, self, context) 745*cda5da8dSAndroid Build Coastguard Worker if timer._source_traceback: 746*cda5da8dSAndroid Build Coastguard Worker del timer._source_traceback[-1] 747*cda5da8dSAndroid Build Coastguard Worker heapq.heappush(self._scheduled, timer) 748*cda5da8dSAndroid Build Coastguard Worker timer._scheduled = True 749*cda5da8dSAndroid Build Coastguard Worker return timer 750*cda5da8dSAndroid Build Coastguard Worker 751*cda5da8dSAndroid Build Coastguard Worker def call_soon(self, callback, *args, context=None): 752*cda5da8dSAndroid Build Coastguard Worker """Arrange for a callback to be called as soon as possible. 753*cda5da8dSAndroid Build Coastguard Worker 754*cda5da8dSAndroid Build Coastguard Worker This operates as a FIFO queue: callbacks are called in the 755*cda5da8dSAndroid Build Coastguard Worker order in which they are registered. Each callback will be 756*cda5da8dSAndroid Build Coastguard Worker called exactly once. 757*cda5da8dSAndroid Build Coastguard Worker 758*cda5da8dSAndroid Build Coastguard Worker Any positional arguments after the callback will be passed to 759*cda5da8dSAndroid Build Coastguard Worker the callback when it is called. 760*cda5da8dSAndroid Build Coastguard Worker """ 761*cda5da8dSAndroid Build Coastguard Worker self._check_closed() 762*cda5da8dSAndroid Build Coastguard Worker if self._debug: 763*cda5da8dSAndroid Build Coastguard Worker self._check_thread() 764*cda5da8dSAndroid Build Coastguard Worker self._check_callback(callback, 'call_soon') 765*cda5da8dSAndroid Build Coastguard Worker handle = self._call_soon(callback, args, context) 766*cda5da8dSAndroid Build Coastguard Worker if handle._source_traceback: 767*cda5da8dSAndroid Build Coastguard Worker del handle._source_traceback[-1] 768*cda5da8dSAndroid Build Coastguard Worker return handle 769*cda5da8dSAndroid Build Coastguard Worker 770*cda5da8dSAndroid Build Coastguard Worker def _check_callback(self, callback, method): 771*cda5da8dSAndroid Build Coastguard Worker if (coroutines.iscoroutine(callback) or 772*cda5da8dSAndroid Build Coastguard Worker coroutines.iscoroutinefunction(callback)): 773*cda5da8dSAndroid Build Coastguard Worker raise TypeError( 774*cda5da8dSAndroid Build Coastguard Worker f"coroutines cannot be used with {method}()") 775*cda5da8dSAndroid Build Coastguard Worker if not callable(callback): 776*cda5da8dSAndroid Build Coastguard Worker raise TypeError( 777*cda5da8dSAndroid Build Coastguard Worker f'a callable object was expected by {method}(), ' 778*cda5da8dSAndroid Build Coastguard Worker f'got {callback!r}') 779*cda5da8dSAndroid Build Coastguard Worker 780*cda5da8dSAndroid Build Coastguard Worker def _call_soon(self, callback, args, context): 781*cda5da8dSAndroid Build Coastguard Worker handle = events.Handle(callback, args, self, context) 782*cda5da8dSAndroid Build Coastguard Worker if handle._source_traceback: 783*cda5da8dSAndroid Build Coastguard Worker del handle._source_traceback[-1] 784*cda5da8dSAndroid Build Coastguard Worker self._ready.append(handle) 785*cda5da8dSAndroid Build Coastguard Worker return handle 786*cda5da8dSAndroid Build Coastguard Worker 787*cda5da8dSAndroid Build Coastguard Worker def _check_thread(self): 788*cda5da8dSAndroid Build Coastguard Worker """Check that the current thread is the thread running the event loop. 789*cda5da8dSAndroid Build Coastguard Worker 790*cda5da8dSAndroid Build Coastguard Worker Non-thread-safe methods of this class make this assumption and will 791*cda5da8dSAndroid Build Coastguard Worker likely behave incorrectly when the assumption is violated. 792*cda5da8dSAndroid Build Coastguard Worker 793*cda5da8dSAndroid Build Coastguard Worker Should only be called when (self._debug == True). The caller is 794*cda5da8dSAndroid Build Coastguard Worker responsible for checking this condition for performance reasons. 795*cda5da8dSAndroid Build Coastguard Worker """ 796*cda5da8dSAndroid Build Coastguard Worker if self._thread_id is None: 797*cda5da8dSAndroid Build Coastguard Worker return 798*cda5da8dSAndroid Build Coastguard Worker thread_id = threading.get_ident() 799*cda5da8dSAndroid Build Coastguard Worker if thread_id != self._thread_id: 800*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError( 801*cda5da8dSAndroid Build Coastguard Worker "Non-thread-safe operation invoked on an event loop other " 802*cda5da8dSAndroid Build Coastguard Worker "than the current one") 803*cda5da8dSAndroid Build Coastguard Worker 804*cda5da8dSAndroid Build Coastguard Worker def call_soon_threadsafe(self, callback, *args, context=None): 805*cda5da8dSAndroid Build Coastguard Worker """Like call_soon(), but thread-safe.""" 806*cda5da8dSAndroid Build Coastguard Worker self._check_closed() 807*cda5da8dSAndroid Build Coastguard Worker if self._debug: 808*cda5da8dSAndroid Build Coastguard Worker self._check_callback(callback, 'call_soon_threadsafe') 809*cda5da8dSAndroid Build Coastguard Worker handle = self._call_soon(callback, args, context) 810*cda5da8dSAndroid Build Coastguard Worker if handle._source_traceback: 811*cda5da8dSAndroid Build Coastguard Worker del handle._source_traceback[-1] 812*cda5da8dSAndroid Build Coastguard Worker self._write_to_self() 813*cda5da8dSAndroid Build Coastguard Worker return handle 814*cda5da8dSAndroid Build Coastguard Worker 815*cda5da8dSAndroid Build Coastguard Worker def run_in_executor(self, executor, func, *args): 816*cda5da8dSAndroid Build Coastguard Worker self._check_closed() 817*cda5da8dSAndroid Build Coastguard Worker if self._debug: 818*cda5da8dSAndroid Build Coastguard Worker self._check_callback(func, 'run_in_executor') 819*cda5da8dSAndroid Build Coastguard Worker if executor is None: 820*cda5da8dSAndroid Build Coastguard Worker executor = self._default_executor 821*cda5da8dSAndroid Build Coastguard Worker # Only check when the default executor is being used 822*cda5da8dSAndroid Build Coastguard Worker self._check_default_executor() 823*cda5da8dSAndroid Build Coastguard Worker if executor is None: 824*cda5da8dSAndroid Build Coastguard Worker executor = concurrent.futures.ThreadPoolExecutor( 825*cda5da8dSAndroid Build Coastguard Worker thread_name_prefix='asyncio' 826*cda5da8dSAndroid Build Coastguard Worker ) 827*cda5da8dSAndroid Build Coastguard Worker self._default_executor = executor 828*cda5da8dSAndroid Build Coastguard Worker return futures.wrap_future( 829*cda5da8dSAndroid Build Coastguard Worker executor.submit(func, *args), loop=self) 830*cda5da8dSAndroid Build Coastguard Worker 831*cda5da8dSAndroid Build Coastguard Worker def set_default_executor(self, executor): 832*cda5da8dSAndroid Build Coastguard Worker if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): 833*cda5da8dSAndroid Build Coastguard Worker raise TypeError('executor must be ThreadPoolExecutor instance') 834*cda5da8dSAndroid Build Coastguard Worker self._default_executor = executor 835*cda5da8dSAndroid Build Coastguard Worker 836*cda5da8dSAndroid Build Coastguard Worker def _getaddrinfo_debug(self, host, port, family, type, proto, flags): 837*cda5da8dSAndroid Build Coastguard Worker msg = [f"{host}:{port!r}"] 838*cda5da8dSAndroid Build Coastguard Worker if family: 839*cda5da8dSAndroid Build Coastguard Worker msg.append(f'family={family!r}') 840*cda5da8dSAndroid Build Coastguard Worker if type: 841*cda5da8dSAndroid Build Coastguard Worker msg.append(f'type={type!r}') 842*cda5da8dSAndroid Build Coastguard Worker if proto: 843*cda5da8dSAndroid Build Coastguard Worker msg.append(f'proto={proto!r}') 844*cda5da8dSAndroid Build Coastguard Worker if flags: 845*cda5da8dSAndroid Build Coastguard Worker msg.append(f'flags={flags!r}') 846*cda5da8dSAndroid Build Coastguard Worker msg = ', '.join(msg) 847*cda5da8dSAndroid Build Coastguard Worker logger.debug('Get address info %s', msg) 848*cda5da8dSAndroid Build Coastguard Worker 849*cda5da8dSAndroid Build Coastguard Worker t0 = self.time() 850*cda5da8dSAndroid Build Coastguard Worker addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags) 851*cda5da8dSAndroid Build Coastguard Worker dt = self.time() - t0 852*cda5da8dSAndroid Build Coastguard Worker 853*cda5da8dSAndroid Build Coastguard Worker msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}' 854*cda5da8dSAndroid Build Coastguard Worker if dt >= self.slow_callback_duration: 855*cda5da8dSAndroid Build Coastguard Worker logger.info(msg) 856*cda5da8dSAndroid Build Coastguard Worker else: 857*cda5da8dSAndroid Build Coastguard Worker logger.debug(msg) 858*cda5da8dSAndroid Build Coastguard Worker return addrinfo 859*cda5da8dSAndroid Build Coastguard Worker 860*cda5da8dSAndroid Build Coastguard Worker async def getaddrinfo(self, host, port, *, 861*cda5da8dSAndroid Build Coastguard Worker family=0, type=0, proto=0, flags=0): 862*cda5da8dSAndroid Build Coastguard Worker if self._debug: 863*cda5da8dSAndroid Build Coastguard Worker getaddr_func = self._getaddrinfo_debug 864*cda5da8dSAndroid Build Coastguard Worker else: 865*cda5da8dSAndroid Build Coastguard Worker getaddr_func = socket.getaddrinfo 866*cda5da8dSAndroid Build Coastguard Worker 867*cda5da8dSAndroid Build Coastguard Worker return await self.run_in_executor( 868*cda5da8dSAndroid Build Coastguard Worker None, getaddr_func, host, port, family, type, proto, flags) 869*cda5da8dSAndroid Build Coastguard Worker 870*cda5da8dSAndroid Build Coastguard Worker async def getnameinfo(self, sockaddr, flags=0): 871*cda5da8dSAndroid Build Coastguard Worker return await self.run_in_executor( 872*cda5da8dSAndroid Build Coastguard Worker None, socket.getnameinfo, sockaddr, flags) 873*cda5da8dSAndroid Build Coastguard Worker 874*cda5da8dSAndroid Build Coastguard Worker async def sock_sendfile(self, sock, file, offset=0, count=None, 875*cda5da8dSAndroid Build Coastguard Worker *, fallback=True): 876*cda5da8dSAndroid Build Coastguard Worker if self._debug and sock.gettimeout() != 0: 877*cda5da8dSAndroid Build Coastguard Worker raise ValueError("the socket must be non-blocking") 878*cda5da8dSAndroid Build Coastguard Worker _check_ssl_socket(sock) 879*cda5da8dSAndroid Build Coastguard Worker self._check_sendfile_params(sock, file, offset, count) 880*cda5da8dSAndroid Build Coastguard Worker try: 881*cda5da8dSAndroid Build Coastguard Worker return await self._sock_sendfile_native(sock, file, 882*cda5da8dSAndroid Build Coastguard Worker offset, count) 883*cda5da8dSAndroid Build Coastguard Worker except exceptions.SendfileNotAvailableError as exc: 884*cda5da8dSAndroid Build Coastguard Worker if not fallback: 885*cda5da8dSAndroid Build Coastguard Worker raise 886*cda5da8dSAndroid Build Coastguard Worker return await self._sock_sendfile_fallback(sock, file, 887*cda5da8dSAndroid Build Coastguard Worker offset, count) 888*cda5da8dSAndroid Build Coastguard Worker 889*cda5da8dSAndroid Build Coastguard Worker async def _sock_sendfile_native(self, sock, file, offset, count): 890*cda5da8dSAndroid Build Coastguard Worker # NB: sendfile syscall is not supported for SSL sockets and 891*cda5da8dSAndroid Build Coastguard Worker # non-mmap files even if sendfile is supported by OS 892*cda5da8dSAndroid Build Coastguard Worker raise exceptions.SendfileNotAvailableError( 893*cda5da8dSAndroid Build Coastguard Worker f"syscall sendfile is not available for socket {sock!r} " 894*cda5da8dSAndroid Build Coastguard Worker f"and file {file!r} combination") 895*cda5da8dSAndroid Build Coastguard Worker 896*cda5da8dSAndroid Build Coastguard Worker async def _sock_sendfile_fallback(self, sock, file, offset, count): 897*cda5da8dSAndroid Build Coastguard Worker if offset: 898*cda5da8dSAndroid Build Coastguard Worker file.seek(offset) 899*cda5da8dSAndroid Build Coastguard Worker blocksize = ( 900*cda5da8dSAndroid Build Coastguard Worker min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE) 901*cda5da8dSAndroid Build Coastguard Worker if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE 902*cda5da8dSAndroid Build Coastguard Worker ) 903*cda5da8dSAndroid Build Coastguard Worker buf = bytearray(blocksize) 904*cda5da8dSAndroid Build Coastguard Worker total_sent = 0 905*cda5da8dSAndroid Build Coastguard Worker try: 906*cda5da8dSAndroid Build Coastguard Worker while True: 907*cda5da8dSAndroid Build Coastguard Worker if count: 908*cda5da8dSAndroid Build Coastguard Worker blocksize = min(count - total_sent, blocksize) 909*cda5da8dSAndroid Build Coastguard Worker if blocksize <= 0: 910*cda5da8dSAndroid Build Coastguard Worker break 911*cda5da8dSAndroid Build Coastguard Worker view = memoryview(buf)[:blocksize] 912*cda5da8dSAndroid Build Coastguard Worker read = await self.run_in_executor(None, file.readinto, view) 913*cda5da8dSAndroid Build Coastguard Worker if not read: 914*cda5da8dSAndroid Build Coastguard Worker break # EOF 915*cda5da8dSAndroid Build Coastguard Worker await self.sock_sendall(sock, view[:read]) 916*cda5da8dSAndroid Build Coastguard Worker total_sent += read 917*cda5da8dSAndroid Build Coastguard Worker return total_sent 918*cda5da8dSAndroid Build Coastguard Worker finally: 919*cda5da8dSAndroid Build Coastguard Worker if total_sent > 0 and hasattr(file, 'seek'): 920*cda5da8dSAndroid Build Coastguard Worker file.seek(offset + total_sent) 921*cda5da8dSAndroid Build Coastguard Worker 922*cda5da8dSAndroid Build Coastguard Worker def _check_sendfile_params(self, sock, file, offset, count): 923*cda5da8dSAndroid Build Coastguard Worker if 'b' not in getattr(file, 'mode', 'b'): 924*cda5da8dSAndroid Build Coastguard Worker raise ValueError("file should be opened in binary mode") 925*cda5da8dSAndroid Build Coastguard Worker if not sock.type == socket.SOCK_STREAM: 926*cda5da8dSAndroid Build Coastguard Worker raise ValueError("only SOCK_STREAM type sockets are supported") 927*cda5da8dSAndroid Build Coastguard Worker if count is not None: 928*cda5da8dSAndroid Build Coastguard Worker if not isinstance(count, int): 929*cda5da8dSAndroid Build Coastguard Worker raise TypeError( 930*cda5da8dSAndroid Build Coastguard Worker "count must be a positive integer (got {!r})".format(count)) 931*cda5da8dSAndroid Build Coastguard Worker if count <= 0: 932*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 933*cda5da8dSAndroid Build Coastguard Worker "count must be a positive integer (got {!r})".format(count)) 934*cda5da8dSAndroid Build Coastguard Worker if not isinstance(offset, int): 935*cda5da8dSAndroid Build Coastguard Worker raise TypeError( 936*cda5da8dSAndroid Build Coastguard Worker "offset must be a non-negative integer (got {!r})".format( 937*cda5da8dSAndroid Build Coastguard Worker offset)) 938*cda5da8dSAndroid Build Coastguard Worker if offset < 0: 939*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 940*cda5da8dSAndroid Build Coastguard Worker "offset must be a non-negative integer (got {!r})".format( 941*cda5da8dSAndroid Build Coastguard Worker offset)) 942*cda5da8dSAndroid Build Coastguard Worker 943*cda5da8dSAndroid Build Coastguard Worker async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None): 944*cda5da8dSAndroid Build Coastguard Worker """Create, bind and connect one socket.""" 945*cda5da8dSAndroid Build Coastguard Worker my_exceptions = [] 946*cda5da8dSAndroid Build Coastguard Worker exceptions.append(my_exceptions) 947*cda5da8dSAndroid Build Coastguard Worker family, type_, proto, _, address = addr_info 948*cda5da8dSAndroid Build Coastguard Worker sock = None 949*cda5da8dSAndroid Build Coastguard Worker try: 950*cda5da8dSAndroid Build Coastguard Worker sock = socket.socket(family=family, type=type_, proto=proto) 951*cda5da8dSAndroid Build Coastguard Worker sock.setblocking(False) 952*cda5da8dSAndroid Build Coastguard Worker if local_addr_infos is not None: 953*cda5da8dSAndroid Build Coastguard Worker for lfamily, _, _, _, laddr in local_addr_infos: 954*cda5da8dSAndroid Build Coastguard Worker # skip local addresses of different family 955*cda5da8dSAndroid Build Coastguard Worker if lfamily != family: 956*cda5da8dSAndroid Build Coastguard Worker continue 957*cda5da8dSAndroid Build Coastguard Worker try: 958*cda5da8dSAndroid Build Coastguard Worker sock.bind(laddr) 959*cda5da8dSAndroid Build Coastguard Worker break 960*cda5da8dSAndroid Build Coastguard Worker except OSError as exc: 961*cda5da8dSAndroid Build Coastguard Worker msg = ( 962*cda5da8dSAndroid Build Coastguard Worker f'error while attempting to bind on ' 963*cda5da8dSAndroid Build Coastguard Worker f'address {laddr!r}: ' 964*cda5da8dSAndroid Build Coastguard Worker f'{exc.strerror.lower()}' 965*cda5da8dSAndroid Build Coastguard Worker ) 966*cda5da8dSAndroid Build Coastguard Worker exc = OSError(exc.errno, msg) 967*cda5da8dSAndroid Build Coastguard Worker my_exceptions.append(exc) 968*cda5da8dSAndroid Build Coastguard Worker else: # all bind attempts failed 969*cda5da8dSAndroid Build Coastguard Worker if my_exceptions: 970*cda5da8dSAndroid Build Coastguard Worker raise my_exceptions.pop() 971*cda5da8dSAndroid Build Coastguard Worker else: 972*cda5da8dSAndroid Build Coastguard Worker raise OSError(f"no matching local address with {family=} found") 973*cda5da8dSAndroid Build Coastguard Worker await self.sock_connect(sock, address) 974*cda5da8dSAndroid Build Coastguard Worker return sock 975*cda5da8dSAndroid Build Coastguard Worker except OSError as exc: 976*cda5da8dSAndroid Build Coastguard Worker my_exceptions.append(exc) 977*cda5da8dSAndroid Build Coastguard Worker if sock is not None: 978*cda5da8dSAndroid Build Coastguard Worker sock.close() 979*cda5da8dSAndroid Build Coastguard Worker raise 980*cda5da8dSAndroid Build Coastguard Worker except: 981*cda5da8dSAndroid Build Coastguard Worker if sock is not None: 982*cda5da8dSAndroid Build Coastguard Worker sock.close() 983*cda5da8dSAndroid Build Coastguard Worker raise 984*cda5da8dSAndroid Build Coastguard Worker finally: 985*cda5da8dSAndroid Build Coastguard Worker exceptions = my_exceptions = None 986*cda5da8dSAndroid Build Coastguard Worker 987*cda5da8dSAndroid Build Coastguard Worker async def create_connection( 988*cda5da8dSAndroid Build Coastguard Worker self, protocol_factory, host=None, port=None, 989*cda5da8dSAndroid Build Coastguard Worker *, ssl=None, family=0, 990*cda5da8dSAndroid Build Coastguard Worker proto=0, flags=0, sock=None, 991*cda5da8dSAndroid Build Coastguard Worker local_addr=None, server_hostname=None, 992*cda5da8dSAndroid Build Coastguard Worker ssl_handshake_timeout=None, 993*cda5da8dSAndroid Build Coastguard Worker ssl_shutdown_timeout=None, 994*cda5da8dSAndroid Build Coastguard Worker happy_eyeballs_delay=None, interleave=None): 995*cda5da8dSAndroid Build Coastguard Worker """Connect to a TCP server. 996*cda5da8dSAndroid Build Coastguard Worker 997*cda5da8dSAndroid Build Coastguard Worker Create a streaming transport connection to a given internet host and 998*cda5da8dSAndroid Build Coastguard Worker port: socket family AF_INET or socket.AF_INET6 depending on host (or 999*cda5da8dSAndroid Build Coastguard Worker family if specified), socket type SOCK_STREAM. protocol_factory must be 1000*cda5da8dSAndroid Build Coastguard Worker a callable returning a protocol instance. 1001*cda5da8dSAndroid Build Coastguard Worker 1002*cda5da8dSAndroid Build Coastguard Worker This method is a coroutine which will try to establish the connection 1003*cda5da8dSAndroid Build Coastguard Worker in the background. When successful, the coroutine returns a 1004*cda5da8dSAndroid Build Coastguard Worker (transport, protocol) pair. 1005*cda5da8dSAndroid Build Coastguard Worker """ 1006*cda5da8dSAndroid Build Coastguard Worker if server_hostname is not None and not ssl: 1007*cda5da8dSAndroid Build Coastguard Worker raise ValueError('server_hostname is only meaningful with ssl') 1008*cda5da8dSAndroid Build Coastguard Worker 1009*cda5da8dSAndroid Build Coastguard Worker if server_hostname is None and ssl: 1010*cda5da8dSAndroid Build Coastguard Worker # Use host as default for server_hostname. It is an error 1011*cda5da8dSAndroid Build Coastguard Worker # if host is empty or not set, e.g. when an 1012*cda5da8dSAndroid Build Coastguard Worker # already-connected socket was passed or when only a port 1013*cda5da8dSAndroid Build Coastguard Worker # is given. To avoid this error, you can pass 1014*cda5da8dSAndroid Build Coastguard Worker # server_hostname='' -- this will bypass the hostname 1015*cda5da8dSAndroid Build Coastguard Worker # check. (This also means that if host is a numeric 1016*cda5da8dSAndroid Build Coastguard Worker # IP/IPv6 address, we will attempt to verify that exact 1017*cda5da8dSAndroid Build Coastguard Worker # address; this will probably fail, but it is possible to 1018*cda5da8dSAndroid Build Coastguard Worker # create a certificate for a specific IP address, so we 1019*cda5da8dSAndroid Build Coastguard Worker # don't judge it here.) 1020*cda5da8dSAndroid Build Coastguard Worker if not host: 1021*cda5da8dSAndroid Build Coastguard Worker raise ValueError('You must set server_hostname ' 1022*cda5da8dSAndroid Build Coastguard Worker 'when using ssl without a host') 1023*cda5da8dSAndroid Build Coastguard Worker server_hostname = host 1024*cda5da8dSAndroid Build Coastguard Worker 1025*cda5da8dSAndroid Build Coastguard Worker if ssl_handshake_timeout is not None and not ssl: 1026*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 1027*cda5da8dSAndroid Build Coastguard Worker 'ssl_handshake_timeout is only meaningful with ssl') 1028*cda5da8dSAndroid Build Coastguard Worker 1029*cda5da8dSAndroid Build Coastguard Worker if ssl_shutdown_timeout is not None and not ssl: 1030*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 1031*cda5da8dSAndroid Build Coastguard Worker 'ssl_shutdown_timeout is only meaningful with ssl') 1032*cda5da8dSAndroid Build Coastguard Worker 1033*cda5da8dSAndroid Build Coastguard Worker if sock is not None: 1034*cda5da8dSAndroid Build Coastguard Worker _check_ssl_socket(sock) 1035*cda5da8dSAndroid Build Coastguard Worker 1036*cda5da8dSAndroid Build Coastguard Worker if happy_eyeballs_delay is not None and interleave is None: 1037*cda5da8dSAndroid Build Coastguard Worker # If using happy eyeballs, default to interleave addresses by family 1038*cda5da8dSAndroid Build Coastguard Worker interleave = 1 1039*cda5da8dSAndroid Build Coastguard Worker 1040*cda5da8dSAndroid Build Coastguard Worker if host is not None or port is not None: 1041*cda5da8dSAndroid Build Coastguard Worker if sock is not None: 1042*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 1043*cda5da8dSAndroid Build Coastguard Worker 'host/port and sock can not be specified at the same time') 1044*cda5da8dSAndroid Build Coastguard Worker 1045*cda5da8dSAndroid Build Coastguard Worker infos = await self._ensure_resolved( 1046*cda5da8dSAndroid Build Coastguard Worker (host, port), family=family, 1047*cda5da8dSAndroid Build Coastguard Worker type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self) 1048*cda5da8dSAndroid Build Coastguard Worker if not infos: 1049*cda5da8dSAndroid Build Coastguard Worker raise OSError('getaddrinfo() returned empty list') 1050*cda5da8dSAndroid Build Coastguard Worker 1051*cda5da8dSAndroid Build Coastguard Worker if local_addr is not None: 1052*cda5da8dSAndroid Build Coastguard Worker laddr_infos = await self._ensure_resolved( 1053*cda5da8dSAndroid Build Coastguard Worker local_addr, family=family, 1054*cda5da8dSAndroid Build Coastguard Worker type=socket.SOCK_STREAM, proto=proto, 1055*cda5da8dSAndroid Build Coastguard Worker flags=flags, loop=self) 1056*cda5da8dSAndroid Build Coastguard Worker if not laddr_infos: 1057*cda5da8dSAndroid Build Coastguard Worker raise OSError('getaddrinfo() returned empty list') 1058*cda5da8dSAndroid Build Coastguard Worker else: 1059*cda5da8dSAndroid Build Coastguard Worker laddr_infos = None 1060*cda5da8dSAndroid Build Coastguard Worker 1061*cda5da8dSAndroid Build Coastguard Worker if interleave: 1062*cda5da8dSAndroid Build Coastguard Worker infos = _interleave_addrinfos(infos, interleave) 1063*cda5da8dSAndroid Build Coastguard Worker 1064*cda5da8dSAndroid Build Coastguard Worker exceptions = [] 1065*cda5da8dSAndroid Build Coastguard Worker if happy_eyeballs_delay is None: 1066*cda5da8dSAndroid Build Coastguard Worker # not using happy eyeballs 1067*cda5da8dSAndroid Build Coastguard Worker for addrinfo in infos: 1068*cda5da8dSAndroid Build Coastguard Worker try: 1069*cda5da8dSAndroid Build Coastguard Worker sock = await self._connect_sock( 1070*cda5da8dSAndroid Build Coastguard Worker exceptions, addrinfo, laddr_infos) 1071*cda5da8dSAndroid Build Coastguard Worker break 1072*cda5da8dSAndroid Build Coastguard Worker except OSError: 1073*cda5da8dSAndroid Build Coastguard Worker continue 1074*cda5da8dSAndroid Build Coastguard Worker else: # using happy eyeballs 1075*cda5da8dSAndroid Build Coastguard Worker sock, _, _ = await staggered.staggered_race( 1076*cda5da8dSAndroid Build Coastguard Worker (functools.partial(self._connect_sock, 1077*cda5da8dSAndroid Build Coastguard Worker exceptions, addrinfo, laddr_infos) 1078*cda5da8dSAndroid Build Coastguard Worker for addrinfo in infos), 1079*cda5da8dSAndroid Build Coastguard Worker happy_eyeballs_delay, loop=self) 1080*cda5da8dSAndroid Build Coastguard Worker 1081*cda5da8dSAndroid Build Coastguard Worker if sock is None: 1082*cda5da8dSAndroid Build Coastguard Worker exceptions = [exc for sub in exceptions for exc in sub] 1083*cda5da8dSAndroid Build Coastguard Worker try: 1084*cda5da8dSAndroid Build Coastguard Worker if len(exceptions) == 1: 1085*cda5da8dSAndroid Build Coastguard Worker raise exceptions[0] 1086*cda5da8dSAndroid Build Coastguard Worker else: 1087*cda5da8dSAndroid Build Coastguard Worker # If they all have the same str(), raise one. 1088*cda5da8dSAndroid Build Coastguard Worker model = str(exceptions[0]) 1089*cda5da8dSAndroid Build Coastguard Worker if all(str(exc) == model for exc in exceptions): 1090*cda5da8dSAndroid Build Coastguard Worker raise exceptions[0] 1091*cda5da8dSAndroid Build Coastguard Worker # Raise a combined exception so the user can see all 1092*cda5da8dSAndroid Build Coastguard Worker # the various error messages. 1093*cda5da8dSAndroid Build Coastguard Worker raise OSError('Multiple exceptions: {}'.format( 1094*cda5da8dSAndroid Build Coastguard Worker ', '.join(str(exc) for exc in exceptions))) 1095*cda5da8dSAndroid Build Coastguard Worker finally: 1096*cda5da8dSAndroid Build Coastguard Worker exceptions = None 1097*cda5da8dSAndroid Build Coastguard Worker 1098*cda5da8dSAndroid Build Coastguard Worker else: 1099*cda5da8dSAndroid Build Coastguard Worker if sock is None: 1100*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 1101*cda5da8dSAndroid Build Coastguard Worker 'host and port was not specified and no sock specified') 1102*cda5da8dSAndroid Build Coastguard Worker if sock.type != socket.SOCK_STREAM: 1103*cda5da8dSAndroid Build Coastguard Worker # We allow AF_INET, AF_INET6, AF_UNIX as long as they 1104*cda5da8dSAndroid Build Coastguard Worker # are SOCK_STREAM. 1105*cda5da8dSAndroid Build Coastguard Worker # We support passing AF_UNIX sockets even though we have 1106*cda5da8dSAndroid Build Coastguard Worker # a dedicated API for that: create_unix_connection. 1107*cda5da8dSAndroid Build Coastguard Worker # Disallowing AF_UNIX in this method, breaks backwards 1108*cda5da8dSAndroid Build Coastguard Worker # compatibility. 1109*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 1110*cda5da8dSAndroid Build Coastguard Worker f'A Stream Socket was expected, got {sock!r}') 1111*cda5da8dSAndroid Build Coastguard Worker 1112*cda5da8dSAndroid Build Coastguard Worker transport, protocol = await self._create_connection_transport( 1113*cda5da8dSAndroid Build Coastguard Worker sock, protocol_factory, ssl, server_hostname, 1114*cda5da8dSAndroid Build Coastguard Worker ssl_handshake_timeout=ssl_handshake_timeout, 1115*cda5da8dSAndroid Build Coastguard Worker ssl_shutdown_timeout=ssl_shutdown_timeout) 1116*cda5da8dSAndroid Build Coastguard Worker if self._debug: 1117*cda5da8dSAndroid Build Coastguard Worker # Get the socket from the transport because SSL transport closes 1118*cda5da8dSAndroid Build Coastguard Worker # the old socket and creates a new SSL socket 1119*cda5da8dSAndroid Build Coastguard Worker sock = transport.get_extra_info('socket') 1120*cda5da8dSAndroid Build Coastguard Worker logger.debug("%r connected to %s:%r: (%r, %r)", 1121*cda5da8dSAndroid Build Coastguard Worker sock, host, port, transport, protocol) 1122*cda5da8dSAndroid Build Coastguard Worker return transport, protocol 1123*cda5da8dSAndroid Build Coastguard Worker 1124*cda5da8dSAndroid Build Coastguard Worker async def _create_connection_transport( 1125*cda5da8dSAndroid Build Coastguard Worker self, sock, protocol_factory, ssl, 1126*cda5da8dSAndroid Build Coastguard Worker server_hostname, server_side=False, 1127*cda5da8dSAndroid Build Coastguard Worker ssl_handshake_timeout=None, 1128*cda5da8dSAndroid Build Coastguard Worker ssl_shutdown_timeout=None): 1129*cda5da8dSAndroid Build Coastguard Worker 1130*cda5da8dSAndroid Build Coastguard Worker sock.setblocking(False) 1131*cda5da8dSAndroid Build Coastguard Worker 1132*cda5da8dSAndroid Build Coastguard Worker protocol = protocol_factory() 1133*cda5da8dSAndroid Build Coastguard Worker waiter = self.create_future() 1134*cda5da8dSAndroid Build Coastguard Worker if ssl: 1135*cda5da8dSAndroid Build Coastguard Worker sslcontext = None if isinstance(ssl, bool) else ssl 1136*cda5da8dSAndroid Build Coastguard Worker transport = self._make_ssl_transport( 1137*cda5da8dSAndroid Build Coastguard Worker sock, protocol, sslcontext, waiter, 1138*cda5da8dSAndroid Build Coastguard Worker server_side=server_side, server_hostname=server_hostname, 1139*cda5da8dSAndroid Build Coastguard Worker ssl_handshake_timeout=ssl_handshake_timeout, 1140*cda5da8dSAndroid Build Coastguard Worker ssl_shutdown_timeout=ssl_shutdown_timeout) 1141*cda5da8dSAndroid Build Coastguard Worker else: 1142*cda5da8dSAndroid Build Coastguard Worker transport = self._make_socket_transport(sock, protocol, waiter) 1143*cda5da8dSAndroid Build Coastguard Worker 1144*cda5da8dSAndroid Build Coastguard Worker try: 1145*cda5da8dSAndroid Build Coastguard Worker await waiter 1146*cda5da8dSAndroid Build Coastguard Worker except: 1147*cda5da8dSAndroid Build Coastguard Worker transport.close() 1148*cda5da8dSAndroid Build Coastguard Worker raise 1149*cda5da8dSAndroid Build Coastguard Worker 1150*cda5da8dSAndroid Build Coastguard Worker return transport, protocol 1151*cda5da8dSAndroid Build Coastguard Worker 1152*cda5da8dSAndroid Build Coastguard Worker async def sendfile(self, transport, file, offset=0, count=None, 1153*cda5da8dSAndroid Build Coastguard Worker *, fallback=True): 1154*cda5da8dSAndroid Build Coastguard Worker """Send a file to transport. 1155*cda5da8dSAndroid Build Coastguard Worker 1156*cda5da8dSAndroid Build Coastguard Worker Return the total number of bytes which were sent. 1157*cda5da8dSAndroid Build Coastguard Worker 1158*cda5da8dSAndroid Build Coastguard Worker The method uses high-performance os.sendfile if available. 1159*cda5da8dSAndroid Build Coastguard Worker 1160*cda5da8dSAndroid Build Coastguard Worker file must be a regular file object opened in binary mode. 1161*cda5da8dSAndroid Build Coastguard Worker 1162*cda5da8dSAndroid Build Coastguard Worker offset tells from where to start reading the file. If specified, 1163*cda5da8dSAndroid Build Coastguard Worker count is the total number of bytes to transmit as opposed to 1164*cda5da8dSAndroid Build Coastguard Worker sending the file until EOF is reached. File position is updated on 1165*cda5da8dSAndroid Build Coastguard Worker return or also in case of error in which case file.tell() 1166*cda5da8dSAndroid Build Coastguard Worker can be used to figure out the number of bytes 1167*cda5da8dSAndroid Build Coastguard Worker which were sent. 1168*cda5da8dSAndroid Build Coastguard Worker 1169*cda5da8dSAndroid Build Coastguard Worker fallback set to True makes asyncio to manually read and send 1170*cda5da8dSAndroid Build Coastguard Worker the file when the platform does not support the sendfile syscall 1171*cda5da8dSAndroid Build Coastguard Worker (e.g. Windows or SSL socket on Unix). 1172*cda5da8dSAndroid Build Coastguard Worker 1173*cda5da8dSAndroid Build Coastguard Worker Raise SendfileNotAvailableError if the system does not support 1174*cda5da8dSAndroid Build Coastguard Worker sendfile syscall and fallback is False. 1175*cda5da8dSAndroid Build Coastguard Worker """ 1176*cda5da8dSAndroid Build Coastguard Worker if transport.is_closing(): 1177*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError("Transport is closing") 1178*cda5da8dSAndroid Build Coastguard Worker mode = getattr(transport, '_sendfile_compatible', 1179*cda5da8dSAndroid Build Coastguard Worker constants._SendfileMode.UNSUPPORTED) 1180*cda5da8dSAndroid Build Coastguard Worker if mode is constants._SendfileMode.UNSUPPORTED: 1181*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError( 1182*cda5da8dSAndroid Build Coastguard Worker f"sendfile is not supported for transport {transport!r}") 1183*cda5da8dSAndroid Build Coastguard Worker if mode is constants._SendfileMode.TRY_NATIVE: 1184*cda5da8dSAndroid Build Coastguard Worker try: 1185*cda5da8dSAndroid Build Coastguard Worker return await self._sendfile_native(transport, file, 1186*cda5da8dSAndroid Build Coastguard Worker offset, count) 1187*cda5da8dSAndroid Build Coastguard Worker except exceptions.SendfileNotAvailableError as exc: 1188*cda5da8dSAndroid Build Coastguard Worker if not fallback: 1189*cda5da8dSAndroid Build Coastguard Worker raise 1190*cda5da8dSAndroid Build Coastguard Worker 1191*cda5da8dSAndroid Build Coastguard Worker if not fallback: 1192*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError( 1193*cda5da8dSAndroid Build Coastguard Worker f"fallback is disabled and native sendfile is not " 1194*cda5da8dSAndroid Build Coastguard Worker f"supported for transport {transport!r}") 1195*cda5da8dSAndroid Build Coastguard Worker 1196*cda5da8dSAndroid Build Coastguard Worker return await self._sendfile_fallback(transport, file, 1197*cda5da8dSAndroid Build Coastguard Worker offset, count) 1198*cda5da8dSAndroid Build Coastguard Worker 1199*cda5da8dSAndroid Build Coastguard Worker async def _sendfile_native(self, transp, file, offset, count): 1200*cda5da8dSAndroid Build Coastguard Worker raise exceptions.SendfileNotAvailableError( 1201*cda5da8dSAndroid Build Coastguard Worker "sendfile syscall is not supported") 1202*cda5da8dSAndroid Build Coastguard Worker 1203*cda5da8dSAndroid Build Coastguard Worker async def _sendfile_fallback(self, transp, file, offset, count): 1204*cda5da8dSAndroid Build Coastguard Worker if offset: 1205*cda5da8dSAndroid Build Coastguard Worker file.seek(offset) 1206*cda5da8dSAndroid Build Coastguard Worker blocksize = min(count, 16384) if count else 16384 1207*cda5da8dSAndroid Build Coastguard Worker buf = bytearray(blocksize) 1208*cda5da8dSAndroid Build Coastguard Worker total_sent = 0 1209*cda5da8dSAndroid Build Coastguard Worker proto = _SendfileFallbackProtocol(transp) 1210*cda5da8dSAndroid Build Coastguard Worker try: 1211*cda5da8dSAndroid Build Coastguard Worker while True: 1212*cda5da8dSAndroid Build Coastguard Worker if count: 1213*cda5da8dSAndroid Build Coastguard Worker blocksize = min(count - total_sent, blocksize) 1214*cda5da8dSAndroid Build Coastguard Worker if blocksize <= 0: 1215*cda5da8dSAndroid Build Coastguard Worker return total_sent 1216*cda5da8dSAndroid Build Coastguard Worker view = memoryview(buf)[:blocksize] 1217*cda5da8dSAndroid Build Coastguard Worker read = await self.run_in_executor(None, file.readinto, view) 1218*cda5da8dSAndroid Build Coastguard Worker if not read: 1219*cda5da8dSAndroid Build Coastguard Worker return total_sent # EOF 1220*cda5da8dSAndroid Build Coastguard Worker await proto.drain() 1221*cda5da8dSAndroid Build Coastguard Worker transp.write(view[:read]) 1222*cda5da8dSAndroid Build Coastguard Worker total_sent += read 1223*cda5da8dSAndroid Build Coastguard Worker finally: 1224*cda5da8dSAndroid Build Coastguard Worker if total_sent > 0 and hasattr(file, 'seek'): 1225*cda5da8dSAndroid Build Coastguard Worker file.seek(offset + total_sent) 1226*cda5da8dSAndroid Build Coastguard Worker await proto.restore() 1227*cda5da8dSAndroid Build Coastguard Worker 1228*cda5da8dSAndroid Build Coastguard Worker async def start_tls(self, transport, protocol, sslcontext, *, 1229*cda5da8dSAndroid Build Coastguard Worker server_side=False, 1230*cda5da8dSAndroid Build Coastguard Worker server_hostname=None, 1231*cda5da8dSAndroid Build Coastguard Worker ssl_handshake_timeout=None, 1232*cda5da8dSAndroid Build Coastguard Worker ssl_shutdown_timeout=None): 1233*cda5da8dSAndroid Build Coastguard Worker """Upgrade transport to TLS. 1234*cda5da8dSAndroid Build Coastguard Worker 1235*cda5da8dSAndroid Build Coastguard Worker Return a new transport that *protocol* should start using 1236*cda5da8dSAndroid Build Coastguard Worker immediately. 1237*cda5da8dSAndroid Build Coastguard Worker """ 1238*cda5da8dSAndroid Build Coastguard Worker if ssl is None: 1239*cda5da8dSAndroid Build Coastguard Worker raise RuntimeError('Python ssl module is not available') 1240*cda5da8dSAndroid Build Coastguard Worker 1241*cda5da8dSAndroid Build Coastguard Worker if not isinstance(sslcontext, ssl.SSLContext): 1242*cda5da8dSAndroid Build Coastguard Worker raise TypeError( 1243*cda5da8dSAndroid Build Coastguard Worker f'sslcontext is expected to be an instance of ssl.SSLContext, ' 1244*cda5da8dSAndroid Build Coastguard Worker f'got {sslcontext!r}') 1245*cda5da8dSAndroid Build Coastguard Worker 1246*cda5da8dSAndroid Build Coastguard Worker if not getattr(transport, '_start_tls_compatible', False): 1247*cda5da8dSAndroid Build Coastguard Worker raise TypeError( 1248*cda5da8dSAndroid Build Coastguard Worker f'transport {transport!r} is not supported by start_tls()') 1249*cda5da8dSAndroid Build Coastguard Worker 1250*cda5da8dSAndroid Build Coastguard Worker waiter = self.create_future() 1251*cda5da8dSAndroid Build Coastguard Worker ssl_protocol = sslproto.SSLProtocol( 1252*cda5da8dSAndroid Build Coastguard Worker self, protocol, sslcontext, waiter, 1253*cda5da8dSAndroid Build Coastguard Worker server_side, server_hostname, 1254*cda5da8dSAndroid Build Coastguard Worker ssl_handshake_timeout=ssl_handshake_timeout, 1255*cda5da8dSAndroid Build Coastguard Worker ssl_shutdown_timeout=ssl_shutdown_timeout, 1256*cda5da8dSAndroid Build Coastguard Worker call_connection_made=False) 1257*cda5da8dSAndroid Build Coastguard Worker 1258*cda5da8dSAndroid Build Coastguard Worker # Pause early so that "ssl_protocol.data_received()" doesn't 1259*cda5da8dSAndroid Build Coastguard Worker # have a chance to get called before "ssl_protocol.connection_made()". 1260*cda5da8dSAndroid Build Coastguard Worker transport.pause_reading() 1261*cda5da8dSAndroid Build Coastguard Worker 1262*cda5da8dSAndroid Build Coastguard Worker transport.set_protocol(ssl_protocol) 1263*cda5da8dSAndroid Build Coastguard Worker conmade_cb = self.call_soon(ssl_protocol.connection_made, transport) 1264*cda5da8dSAndroid Build Coastguard Worker resume_cb = self.call_soon(transport.resume_reading) 1265*cda5da8dSAndroid Build Coastguard Worker 1266*cda5da8dSAndroid Build Coastguard Worker try: 1267*cda5da8dSAndroid Build Coastguard Worker await waiter 1268*cda5da8dSAndroid Build Coastguard Worker except BaseException: 1269*cda5da8dSAndroid Build Coastguard Worker transport.close() 1270*cda5da8dSAndroid Build Coastguard Worker conmade_cb.cancel() 1271*cda5da8dSAndroid Build Coastguard Worker resume_cb.cancel() 1272*cda5da8dSAndroid Build Coastguard Worker raise 1273*cda5da8dSAndroid Build Coastguard Worker 1274*cda5da8dSAndroid Build Coastguard Worker return ssl_protocol._app_transport 1275*cda5da8dSAndroid Build Coastguard Worker 1276*cda5da8dSAndroid Build Coastguard Worker async def create_datagram_endpoint(self, protocol_factory, 1277*cda5da8dSAndroid Build Coastguard Worker local_addr=None, remote_addr=None, *, 1278*cda5da8dSAndroid Build Coastguard Worker family=0, proto=0, flags=0, 1279*cda5da8dSAndroid Build Coastguard Worker reuse_port=None, 1280*cda5da8dSAndroid Build Coastguard Worker allow_broadcast=None, sock=None): 1281*cda5da8dSAndroid Build Coastguard Worker """Create datagram connection.""" 1282*cda5da8dSAndroid Build Coastguard Worker if sock is not None: 1283*cda5da8dSAndroid Build Coastguard Worker if sock.type != socket.SOCK_DGRAM: 1284*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 1285*cda5da8dSAndroid Build Coastguard Worker f'A UDP Socket was expected, got {sock!r}') 1286*cda5da8dSAndroid Build Coastguard Worker if (local_addr or remote_addr or 1287*cda5da8dSAndroid Build Coastguard Worker family or proto or flags or 1288*cda5da8dSAndroid Build Coastguard Worker reuse_port or allow_broadcast): 1289*cda5da8dSAndroid Build Coastguard Worker # show the problematic kwargs in exception msg 1290*cda5da8dSAndroid Build Coastguard Worker opts = dict(local_addr=local_addr, remote_addr=remote_addr, 1291*cda5da8dSAndroid Build Coastguard Worker family=family, proto=proto, flags=flags, 1292*cda5da8dSAndroid Build Coastguard Worker reuse_port=reuse_port, 1293*cda5da8dSAndroid Build Coastguard Worker allow_broadcast=allow_broadcast) 1294*cda5da8dSAndroid Build Coastguard Worker problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v) 1295*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 1296*cda5da8dSAndroid Build Coastguard Worker f'socket modifier keyword arguments can not be used ' 1297*cda5da8dSAndroid Build Coastguard Worker f'when sock is specified. ({problems})') 1298*cda5da8dSAndroid Build Coastguard Worker sock.setblocking(False) 1299*cda5da8dSAndroid Build Coastguard Worker r_addr = None 1300*cda5da8dSAndroid Build Coastguard Worker else: 1301*cda5da8dSAndroid Build Coastguard Worker if not (local_addr or remote_addr): 1302*cda5da8dSAndroid Build Coastguard Worker if family == 0: 1303*cda5da8dSAndroid Build Coastguard Worker raise ValueError('unexpected address family') 1304*cda5da8dSAndroid Build Coastguard Worker addr_pairs_info = (((family, proto), (None, None)),) 1305*cda5da8dSAndroid Build Coastguard Worker elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX: 1306*cda5da8dSAndroid Build Coastguard Worker for addr in (local_addr, remote_addr): 1307*cda5da8dSAndroid Build Coastguard Worker if addr is not None and not isinstance(addr, str): 1308*cda5da8dSAndroid Build Coastguard Worker raise TypeError('string is expected') 1309*cda5da8dSAndroid Build Coastguard Worker 1310*cda5da8dSAndroid Build Coastguard Worker if local_addr and local_addr[0] not in (0, '\x00'): 1311*cda5da8dSAndroid Build Coastguard Worker try: 1312*cda5da8dSAndroid Build Coastguard Worker if stat.S_ISSOCK(os.stat(local_addr).st_mode): 1313*cda5da8dSAndroid Build Coastguard Worker os.remove(local_addr) 1314*cda5da8dSAndroid Build Coastguard Worker except FileNotFoundError: 1315*cda5da8dSAndroid Build Coastguard Worker pass 1316*cda5da8dSAndroid Build Coastguard Worker except OSError as err: 1317*cda5da8dSAndroid Build Coastguard Worker # Directory may have permissions only to create socket. 1318*cda5da8dSAndroid Build Coastguard Worker logger.error('Unable to check or remove stale UNIX ' 1319*cda5da8dSAndroid Build Coastguard Worker 'socket %r: %r', 1320*cda5da8dSAndroid Build Coastguard Worker local_addr, err) 1321*cda5da8dSAndroid Build Coastguard Worker 1322*cda5da8dSAndroid Build Coastguard Worker addr_pairs_info = (((family, proto), 1323*cda5da8dSAndroid Build Coastguard Worker (local_addr, remote_addr)), ) 1324*cda5da8dSAndroid Build Coastguard Worker else: 1325*cda5da8dSAndroid Build Coastguard Worker # join address by (family, protocol) 1326*cda5da8dSAndroid Build Coastguard Worker addr_infos = {} # Using order preserving dict 1327*cda5da8dSAndroid Build Coastguard Worker for idx, addr in ((0, local_addr), (1, remote_addr)): 1328*cda5da8dSAndroid Build Coastguard Worker if addr is not None: 1329*cda5da8dSAndroid Build Coastguard Worker if not (isinstance(addr, tuple) and len(addr) == 2): 1330*cda5da8dSAndroid Build Coastguard Worker raise TypeError('2-tuple is expected') 1331*cda5da8dSAndroid Build Coastguard Worker 1332*cda5da8dSAndroid Build Coastguard Worker infos = await self._ensure_resolved( 1333*cda5da8dSAndroid Build Coastguard Worker addr, family=family, type=socket.SOCK_DGRAM, 1334*cda5da8dSAndroid Build Coastguard Worker proto=proto, flags=flags, loop=self) 1335*cda5da8dSAndroid Build Coastguard Worker if not infos: 1336*cda5da8dSAndroid Build Coastguard Worker raise OSError('getaddrinfo() returned empty list') 1337*cda5da8dSAndroid Build Coastguard Worker 1338*cda5da8dSAndroid Build Coastguard Worker for fam, _, pro, _, address in infos: 1339*cda5da8dSAndroid Build Coastguard Worker key = (fam, pro) 1340*cda5da8dSAndroid Build Coastguard Worker if key not in addr_infos: 1341*cda5da8dSAndroid Build Coastguard Worker addr_infos[key] = [None, None] 1342*cda5da8dSAndroid Build Coastguard Worker addr_infos[key][idx] = address 1343*cda5da8dSAndroid Build Coastguard Worker 1344*cda5da8dSAndroid Build Coastguard Worker # each addr has to have info for each (family, proto) pair 1345*cda5da8dSAndroid Build Coastguard Worker addr_pairs_info = [ 1346*cda5da8dSAndroid Build Coastguard Worker (key, addr_pair) for key, addr_pair in addr_infos.items() 1347*cda5da8dSAndroid Build Coastguard Worker if not ((local_addr and addr_pair[0] is None) or 1348*cda5da8dSAndroid Build Coastguard Worker (remote_addr and addr_pair[1] is None))] 1349*cda5da8dSAndroid Build Coastguard Worker 1350*cda5da8dSAndroid Build Coastguard Worker if not addr_pairs_info: 1351*cda5da8dSAndroid Build Coastguard Worker raise ValueError('can not get address information') 1352*cda5da8dSAndroid Build Coastguard Worker 1353*cda5da8dSAndroid Build Coastguard Worker exceptions = [] 1354*cda5da8dSAndroid Build Coastguard Worker 1355*cda5da8dSAndroid Build Coastguard Worker for ((family, proto), 1356*cda5da8dSAndroid Build Coastguard Worker (local_address, remote_address)) in addr_pairs_info: 1357*cda5da8dSAndroid Build Coastguard Worker sock = None 1358*cda5da8dSAndroid Build Coastguard Worker r_addr = None 1359*cda5da8dSAndroid Build Coastguard Worker try: 1360*cda5da8dSAndroid Build Coastguard Worker sock = socket.socket( 1361*cda5da8dSAndroid Build Coastguard Worker family=family, type=socket.SOCK_DGRAM, proto=proto) 1362*cda5da8dSAndroid Build Coastguard Worker if reuse_port: 1363*cda5da8dSAndroid Build Coastguard Worker _set_reuseport(sock) 1364*cda5da8dSAndroid Build Coastguard Worker if allow_broadcast: 1365*cda5da8dSAndroid Build Coastguard Worker sock.setsockopt( 1366*cda5da8dSAndroid Build Coastguard Worker socket.SOL_SOCKET, socket.SO_BROADCAST, 1) 1367*cda5da8dSAndroid Build Coastguard Worker sock.setblocking(False) 1368*cda5da8dSAndroid Build Coastguard Worker 1369*cda5da8dSAndroid Build Coastguard Worker if local_addr: 1370*cda5da8dSAndroid Build Coastguard Worker sock.bind(local_address) 1371*cda5da8dSAndroid Build Coastguard Worker if remote_addr: 1372*cda5da8dSAndroid Build Coastguard Worker if not allow_broadcast: 1373*cda5da8dSAndroid Build Coastguard Worker await self.sock_connect(sock, remote_address) 1374*cda5da8dSAndroid Build Coastguard Worker r_addr = remote_address 1375*cda5da8dSAndroid Build Coastguard Worker except OSError as exc: 1376*cda5da8dSAndroid Build Coastguard Worker if sock is not None: 1377*cda5da8dSAndroid Build Coastguard Worker sock.close() 1378*cda5da8dSAndroid Build Coastguard Worker exceptions.append(exc) 1379*cda5da8dSAndroid Build Coastguard Worker except: 1380*cda5da8dSAndroid Build Coastguard Worker if sock is not None: 1381*cda5da8dSAndroid Build Coastguard Worker sock.close() 1382*cda5da8dSAndroid Build Coastguard Worker raise 1383*cda5da8dSAndroid Build Coastguard Worker else: 1384*cda5da8dSAndroid Build Coastguard Worker break 1385*cda5da8dSAndroid Build Coastguard Worker else: 1386*cda5da8dSAndroid Build Coastguard Worker raise exceptions[0] 1387*cda5da8dSAndroid Build Coastguard Worker 1388*cda5da8dSAndroid Build Coastguard Worker protocol = protocol_factory() 1389*cda5da8dSAndroid Build Coastguard Worker waiter = self.create_future() 1390*cda5da8dSAndroid Build Coastguard Worker transport = self._make_datagram_transport( 1391*cda5da8dSAndroid Build Coastguard Worker sock, protocol, r_addr, waiter) 1392*cda5da8dSAndroid Build Coastguard Worker if self._debug: 1393*cda5da8dSAndroid Build Coastguard Worker if local_addr: 1394*cda5da8dSAndroid Build Coastguard Worker logger.info("Datagram endpoint local_addr=%r remote_addr=%r " 1395*cda5da8dSAndroid Build Coastguard Worker "created: (%r, %r)", 1396*cda5da8dSAndroid Build Coastguard Worker local_addr, remote_addr, transport, protocol) 1397*cda5da8dSAndroid Build Coastguard Worker else: 1398*cda5da8dSAndroid Build Coastguard Worker logger.debug("Datagram endpoint remote_addr=%r created: " 1399*cda5da8dSAndroid Build Coastguard Worker "(%r, %r)", 1400*cda5da8dSAndroid Build Coastguard Worker remote_addr, transport, protocol) 1401*cda5da8dSAndroid Build Coastguard Worker 1402*cda5da8dSAndroid Build Coastguard Worker try: 1403*cda5da8dSAndroid Build Coastguard Worker await waiter 1404*cda5da8dSAndroid Build Coastguard Worker except: 1405*cda5da8dSAndroid Build Coastguard Worker transport.close() 1406*cda5da8dSAndroid Build Coastguard Worker raise 1407*cda5da8dSAndroid Build Coastguard Worker 1408*cda5da8dSAndroid Build Coastguard Worker return transport, protocol 1409*cda5da8dSAndroid Build Coastguard Worker 1410*cda5da8dSAndroid Build Coastguard Worker async def _ensure_resolved(self, address, *, 1411*cda5da8dSAndroid Build Coastguard Worker family=0, type=socket.SOCK_STREAM, 1412*cda5da8dSAndroid Build Coastguard Worker proto=0, flags=0, loop): 1413*cda5da8dSAndroid Build Coastguard Worker host, port = address[:2] 1414*cda5da8dSAndroid Build Coastguard Worker info = _ipaddr_info(host, port, family, type, proto, *address[2:]) 1415*cda5da8dSAndroid Build Coastguard Worker if info is not None: 1416*cda5da8dSAndroid Build Coastguard Worker # "host" is already a resolved IP. 1417*cda5da8dSAndroid Build Coastguard Worker return [info] 1418*cda5da8dSAndroid Build Coastguard Worker else: 1419*cda5da8dSAndroid Build Coastguard Worker return await loop.getaddrinfo(host, port, family=family, type=type, 1420*cda5da8dSAndroid Build Coastguard Worker proto=proto, flags=flags) 1421*cda5da8dSAndroid Build Coastguard Worker 1422*cda5da8dSAndroid Build Coastguard Worker async def _create_server_getaddrinfo(self, host, port, family, flags): 1423*cda5da8dSAndroid Build Coastguard Worker infos = await self._ensure_resolved((host, port), family=family, 1424*cda5da8dSAndroid Build Coastguard Worker type=socket.SOCK_STREAM, 1425*cda5da8dSAndroid Build Coastguard Worker flags=flags, loop=self) 1426*cda5da8dSAndroid Build Coastguard Worker if not infos: 1427*cda5da8dSAndroid Build Coastguard Worker raise OSError(f'getaddrinfo({host!r}) returned empty list') 1428*cda5da8dSAndroid Build Coastguard Worker return infos 1429*cda5da8dSAndroid Build Coastguard Worker 1430*cda5da8dSAndroid Build Coastguard Worker async def create_server( 1431*cda5da8dSAndroid Build Coastguard Worker self, protocol_factory, host=None, port=None, 1432*cda5da8dSAndroid Build Coastguard Worker *, 1433*cda5da8dSAndroid Build Coastguard Worker family=socket.AF_UNSPEC, 1434*cda5da8dSAndroid Build Coastguard Worker flags=socket.AI_PASSIVE, 1435*cda5da8dSAndroid Build Coastguard Worker sock=None, 1436*cda5da8dSAndroid Build Coastguard Worker backlog=100, 1437*cda5da8dSAndroid Build Coastguard Worker ssl=None, 1438*cda5da8dSAndroid Build Coastguard Worker reuse_address=None, 1439*cda5da8dSAndroid Build Coastguard Worker reuse_port=None, 1440*cda5da8dSAndroid Build Coastguard Worker ssl_handshake_timeout=None, 1441*cda5da8dSAndroid Build Coastguard Worker ssl_shutdown_timeout=None, 1442*cda5da8dSAndroid Build Coastguard Worker start_serving=True): 1443*cda5da8dSAndroid Build Coastguard Worker """Create a TCP server. 1444*cda5da8dSAndroid Build Coastguard Worker 1445*cda5da8dSAndroid Build Coastguard Worker The host parameter can be a string, in that case the TCP server is 1446*cda5da8dSAndroid Build Coastguard Worker bound to host and port. 1447*cda5da8dSAndroid Build Coastguard Worker 1448*cda5da8dSAndroid Build Coastguard Worker The host parameter can also be a sequence of strings and in that case 1449*cda5da8dSAndroid Build Coastguard Worker the TCP server is bound to all hosts of the sequence. If a host 1450*cda5da8dSAndroid Build Coastguard Worker appears multiple times (possibly indirectly e.g. when hostnames 1451*cda5da8dSAndroid Build Coastguard Worker resolve to the same IP address), the server is only bound once to that 1452*cda5da8dSAndroid Build Coastguard Worker host. 1453*cda5da8dSAndroid Build Coastguard Worker 1454*cda5da8dSAndroid Build Coastguard Worker Return a Server object which can be used to stop the service. 1455*cda5da8dSAndroid Build Coastguard Worker 1456*cda5da8dSAndroid Build Coastguard Worker This method is a coroutine. 1457*cda5da8dSAndroid Build Coastguard Worker """ 1458*cda5da8dSAndroid Build Coastguard Worker if isinstance(ssl, bool): 1459*cda5da8dSAndroid Build Coastguard Worker raise TypeError('ssl argument must be an SSLContext or None') 1460*cda5da8dSAndroid Build Coastguard Worker 1461*cda5da8dSAndroid Build Coastguard Worker if ssl_handshake_timeout is not None and ssl is None: 1462*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 1463*cda5da8dSAndroid Build Coastguard Worker 'ssl_handshake_timeout is only meaningful with ssl') 1464*cda5da8dSAndroid Build Coastguard Worker 1465*cda5da8dSAndroid Build Coastguard Worker if ssl_shutdown_timeout is not None and ssl is None: 1466*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 1467*cda5da8dSAndroid Build Coastguard Worker 'ssl_shutdown_timeout is only meaningful with ssl') 1468*cda5da8dSAndroid Build Coastguard Worker 1469*cda5da8dSAndroid Build Coastguard Worker if sock is not None: 1470*cda5da8dSAndroid Build Coastguard Worker _check_ssl_socket(sock) 1471*cda5da8dSAndroid Build Coastguard Worker 1472*cda5da8dSAndroid Build Coastguard Worker if host is not None or port is not None: 1473*cda5da8dSAndroid Build Coastguard Worker if sock is not None: 1474*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 1475*cda5da8dSAndroid Build Coastguard Worker 'host/port and sock can not be specified at the same time') 1476*cda5da8dSAndroid Build Coastguard Worker 1477*cda5da8dSAndroid Build Coastguard Worker if reuse_address is None: 1478*cda5da8dSAndroid Build Coastguard Worker reuse_address = os.name == "posix" and sys.platform != "cygwin" 1479*cda5da8dSAndroid Build Coastguard Worker sockets = [] 1480*cda5da8dSAndroid Build Coastguard Worker if host == '': 1481*cda5da8dSAndroid Build Coastguard Worker hosts = [None] 1482*cda5da8dSAndroid Build Coastguard Worker elif (isinstance(host, str) or 1483*cda5da8dSAndroid Build Coastguard Worker not isinstance(host, collections.abc.Iterable)): 1484*cda5da8dSAndroid Build Coastguard Worker hosts = [host] 1485*cda5da8dSAndroid Build Coastguard Worker else: 1486*cda5da8dSAndroid Build Coastguard Worker hosts = host 1487*cda5da8dSAndroid Build Coastguard Worker 1488*cda5da8dSAndroid Build Coastguard Worker fs = [self._create_server_getaddrinfo(host, port, family=family, 1489*cda5da8dSAndroid Build Coastguard Worker flags=flags) 1490*cda5da8dSAndroid Build Coastguard Worker for host in hosts] 1491*cda5da8dSAndroid Build Coastguard Worker infos = await tasks.gather(*fs) 1492*cda5da8dSAndroid Build Coastguard Worker infos = set(itertools.chain.from_iterable(infos)) 1493*cda5da8dSAndroid Build Coastguard Worker 1494*cda5da8dSAndroid Build Coastguard Worker completed = False 1495*cda5da8dSAndroid Build Coastguard Worker try: 1496*cda5da8dSAndroid Build Coastguard Worker for res in infos: 1497*cda5da8dSAndroid Build Coastguard Worker af, socktype, proto, canonname, sa = res 1498*cda5da8dSAndroid Build Coastguard Worker try: 1499*cda5da8dSAndroid Build Coastguard Worker sock = socket.socket(af, socktype, proto) 1500*cda5da8dSAndroid Build Coastguard Worker except socket.error: 1501*cda5da8dSAndroid Build Coastguard Worker # Assume it's a bad family/type/protocol combination. 1502*cda5da8dSAndroid Build Coastguard Worker if self._debug: 1503*cda5da8dSAndroid Build Coastguard Worker logger.warning('create_server() failed to create ' 1504*cda5da8dSAndroid Build Coastguard Worker 'socket.socket(%r, %r, %r)', 1505*cda5da8dSAndroid Build Coastguard Worker af, socktype, proto, exc_info=True) 1506*cda5da8dSAndroid Build Coastguard Worker continue 1507*cda5da8dSAndroid Build Coastguard Worker sockets.append(sock) 1508*cda5da8dSAndroid Build Coastguard Worker if reuse_address: 1509*cda5da8dSAndroid Build Coastguard Worker sock.setsockopt( 1510*cda5da8dSAndroid Build Coastguard Worker socket.SOL_SOCKET, socket.SO_REUSEADDR, True) 1511*cda5da8dSAndroid Build Coastguard Worker if reuse_port: 1512*cda5da8dSAndroid Build Coastguard Worker _set_reuseport(sock) 1513*cda5da8dSAndroid Build Coastguard Worker # Disable IPv4/IPv6 dual stack support (enabled by 1514*cda5da8dSAndroid Build Coastguard Worker # default on Linux) which makes a single socket 1515*cda5da8dSAndroid Build Coastguard Worker # listen on both address families. 1516*cda5da8dSAndroid Build Coastguard Worker if (_HAS_IPv6 and 1517*cda5da8dSAndroid Build Coastguard Worker af == socket.AF_INET6 and 1518*cda5da8dSAndroid Build Coastguard Worker hasattr(socket, 'IPPROTO_IPV6')): 1519*cda5da8dSAndroid Build Coastguard Worker sock.setsockopt(socket.IPPROTO_IPV6, 1520*cda5da8dSAndroid Build Coastguard Worker socket.IPV6_V6ONLY, 1521*cda5da8dSAndroid Build Coastguard Worker True) 1522*cda5da8dSAndroid Build Coastguard Worker try: 1523*cda5da8dSAndroid Build Coastguard Worker sock.bind(sa) 1524*cda5da8dSAndroid Build Coastguard Worker except OSError as err: 1525*cda5da8dSAndroid Build Coastguard Worker raise OSError(err.errno, 'error while attempting ' 1526*cda5da8dSAndroid Build Coastguard Worker 'to bind on address %r: %s' 1527*cda5da8dSAndroid Build Coastguard Worker % (sa, err.strerror.lower())) from None 1528*cda5da8dSAndroid Build Coastguard Worker completed = True 1529*cda5da8dSAndroid Build Coastguard Worker finally: 1530*cda5da8dSAndroid Build Coastguard Worker if not completed: 1531*cda5da8dSAndroid Build Coastguard Worker for sock in sockets: 1532*cda5da8dSAndroid Build Coastguard Worker sock.close() 1533*cda5da8dSAndroid Build Coastguard Worker else: 1534*cda5da8dSAndroid Build Coastguard Worker if sock is None: 1535*cda5da8dSAndroid Build Coastguard Worker raise ValueError('Neither host/port nor sock were specified') 1536*cda5da8dSAndroid Build Coastguard Worker if sock.type != socket.SOCK_STREAM: 1537*cda5da8dSAndroid Build Coastguard Worker raise ValueError(f'A Stream Socket was expected, got {sock!r}') 1538*cda5da8dSAndroid Build Coastguard Worker sockets = [sock] 1539*cda5da8dSAndroid Build Coastguard Worker 1540*cda5da8dSAndroid Build Coastguard Worker for sock in sockets: 1541*cda5da8dSAndroid Build Coastguard Worker sock.setblocking(False) 1542*cda5da8dSAndroid Build Coastguard Worker 1543*cda5da8dSAndroid Build Coastguard Worker server = Server(self, sockets, protocol_factory, 1544*cda5da8dSAndroid Build Coastguard Worker ssl, backlog, ssl_handshake_timeout, 1545*cda5da8dSAndroid Build Coastguard Worker ssl_shutdown_timeout) 1546*cda5da8dSAndroid Build Coastguard Worker if start_serving: 1547*cda5da8dSAndroid Build Coastguard Worker server._start_serving() 1548*cda5da8dSAndroid Build Coastguard Worker # Skip one loop iteration so that all 'loop.add_reader' 1549*cda5da8dSAndroid Build Coastguard Worker # go through. 1550*cda5da8dSAndroid Build Coastguard Worker await tasks.sleep(0) 1551*cda5da8dSAndroid Build Coastguard Worker 1552*cda5da8dSAndroid Build Coastguard Worker if self._debug: 1553*cda5da8dSAndroid Build Coastguard Worker logger.info("%r is serving", server) 1554*cda5da8dSAndroid Build Coastguard Worker return server 1555*cda5da8dSAndroid Build Coastguard Worker 1556*cda5da8dSAndroid Build Coastguard Worker async def connect_accepted_socket( 1557*cda5da8dSAndroid Build Coastguard Worker self, protocol_factory, sock, 1558*cda5da8dSAndroid Build Coastguard Worker *, ssl=None, 1559*cda5da8dSAndroid Build Coastguard Worker ssl_handshake_timeout=None, 1560*cda5da8dSAndroid Build Coastguard Worker ssl_shutdown_timeout=None): 1561*cda5da8dSAndroid Build Coastguard Worker if sock.type != socket.SOCK_STREAM: 1562*cda5da8dSAndroid Build Coastguard Worker raise ValueError(f'A Stream Socket was expected, got {sock!r}') 1563*cda5da8dSAndroid Build Coastguard Worker 1564*cda5da8dSAndroid Build Coastguard Worker if ssl_handshake_timeout is not None and not ssl: 1565*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 1566*cda5da8dSAndroid Build Coastguard Worker 'ssl_handshake_timeout is only meaningful with ssl') 1567*cda5da8dSAndroid Build Coastguard Worker 1568*cda5da8dSAndroid Build Coastguard Worker if ssl_shutdown_timeout is not None and not ssl: 1569*cda5da8dSAndroid Build Coastguard Worker raise ValueError( 1570*cda5da8dSAndroid Build Coastguard Worker 'ssl_shutdown_timeout is only meaningful with ssl') 1571*cda5da8dSAndroid Build Coastguard Worker 1572*cda5da8dSAndroid Build Coastguard Worker if sock is not None: 1573*cda5da8dSAndroid Build Coastguard Worker _check_ssl_socket(sock) 1574*cda5da8dSAndroid Build Coastguard Worker 1575*cda5da8dSAndroid Build Coastguard Worker transport, protocol = await self._create_connection_transport( 1576*cda5da8dSAndroid Build Coastguard Worker sock, protocol_factory, ssl, '', server_side=True, 1577*cda5da8dSAndroid Build Coastguard Worker ssl_handshake_timeout=ssl_handshake_timeout, 1578*cda5da8dSAndroid Build Coastguard Worker ssl_shutdown_timeout=ssl_shutdown_timeout) 1579*cda5da8dSAndroid Build Coastguard Worker if self._debug: 1580*cda5da8dSAndroid Build Coastguard Worker # Get the socket from the transport because SSL transport closes 1581*cda5da8dSAndroid Build Coastguard Worker # the old socket and creates a new SSL socket 1582*cda5da8dSAndroid Build Coastguard Worker sock = transport.get_extra_info('socket') 1583*cda5da8dSAndroid Build Coastguard Worker logger.debug("%r handled: (%r, %r)", sock, transport, protocol) 1584*cda5da8dSAndroid Build Coastguard Worker return transport, protocol 1585*cda5da8dSAndroid Build Coastguard Worker 1586*cda5da8dSAndroid Build Coastguard Worker async def connect_read_pipe(self, protocol_factory, pipe): 1587*cda5da8dSAndroid Build Coastguard Worker protocol = protocol_factory() 1588*cda5da8dSAndroid Build Coastguard Worker waiter = self.create_future() 1589*cda5da8dSAndroid Build Coastguard Worker transport = self._make_read_pipe_transport(pipe, protocol, waiter) 1590*cda5da8dSAndroid Build Coastguard Worker 1591*cda5da8dSAndroid Build Coastguard Worker try: 1592*cda5da8dSAndroid Build Coastguard Worker await waiter 1593*cda5da8dSAndroid Build Coastguard Worker except: 1594*cda5da8dSAndroid Build Coastguard Worker transport.close() 1595*cda5da8dSAndroid Build Coastguard Worker raise 1596*cda5da8dSAndroid Build Coastguard Worker 1597*cda5da8dSAndroid Build Coastguard Worker if self._debug: 1598*cda5da8dSAndroid Build Coastguard Worker logger.debug('Read pipe %r connected: (%r, %r)', 1599*cda5da8dSAndroid Build Coastguard Worker pipe.fileno(), transport, protocol) 1600*cda5da8dSAndroid Build Coastguard Worker return transport, protocol 1601*cda5da8dSAndroid Build Coastguard Worker 1602*cda5da8dSAndroid Build Coastguard Worker async def connect_write_pipe(self, protocol_factory, pipe): 1603*cda5da8dSAndroid Build Coastguard Worker protocol = protocol_factory() 1604*cda5da8dSAndroid Build Coastguard Worker waiter = self.create_future() 1605*cda5da8dSAndroid Build Coastguard Worker transport = self._make_write_pipe_transport(pipe, protocol, waiter) 1606*cda5da8dSAndroid Build Coastguard Worker 1607*cda5da8dSAndroid Build Coastguard Worker try: 1608*cda5da8dSAndroid Build Coastguard Worker await waiter 1609*cda5da8dSAndroid Build Coastguard Worker except: 1610*cda5da8dSAndroid Build Coastguard Worker transport.close() 1611*cda5da8dSAndroid Build Coastguard Worker raise 1612*cda5da8dSAndroid Build Coastguard Worker 1613*cda5da8dSAndroid Build Coastguard Worker if self._debug: 1614*cda5da8dSAndroid Build Coastguard Worker logger.debug('Write pipe %r connected: (%r, %r)', 1615*cda5da8dSAndroid Build Coastguard Worker pipe.fileno(), transport, protocol) 1616*cda5da8dSAndroid Build Coastguard Worker return transport, protocol 1617*cda5da8dSAndroid Build Coastguard Worker 1618*cda5da8dSAndroid Build Coastguard Worker def _log_subprocess(self, msg, stdin, stdout, stderr): 1619*cda5da8dSAndroid Build Coastguard Worker info = [msg] 1620*cda5da8dSAndroid Build Coastguard Worker if stdin is not None: 1621*cda5da8dSAndroid Build Coastguard Worker info.append(f'stdin={_format_pipe(stdin)}') 1622*cda5da8dSAndroid Build Coastguard Worker if stdout is not None and stderr == subprocess.STDOUT: 1623*cda5da8dSAndroid Build Coastguard Worker info.append(f'stdout=stderr={_format_pipe(stdout)}') 1624*cda5da8dSAndroid Build Coastguard Worker else: 1625*cda5da8dSAndroid Build Coastguard Worker if stdout is not None: 1626*cda5da8dSAndroid Build Coastguard Worker info.append(f'stdout={_format_pipe(stdout)}') 1627*cda5da8dSAndroid Build Coastguard Worker if stderr is not None: 1628*cda5da8dSAndroid Build Coastguard Worker info.append(f'stderr={_format_pipe(stderr)}') 1629*cda5da8dSAndroid Build Coastguard Worker logger.debug(' '.join(info)) 1630*cda5da8dSAndroid Build Coastguard Worker 1631*cda5da8dSAndroid Build Coastguard Worker async def subprocess_shell(self, protocol_factory, cmd, *, 1632*cda5da8dSAndroid Build Coastguard Worker stdin=subprocess.PIPE, 1633*cda5da8dSAndroid Build Coastguard Worker stdout=subprocess.PIPE, 1634*cda5da8dSAndroid Build Coastguard Worker stderr=subprocess.PIPE, 1635*cda5da8dSAndroid Build Coastguard Worker universal_newlines=False, 1636*cda5da8dSAndroid Build Coastguard Worker shell=True, bufsize=0, 1637*cda5da8dSAndroid Build Coastguard Worker encoding=None, errors=None, text=None, 1638*cda5da8dSAndroid Build Coastguard Worker **kwargs): 1639*cda5da8dSAndroid Build Coastguard Worker if not isinstance(cmd, (bytes, str)): 1640*cda5da8dSAndroid Build Coastguard Worker raise ValueError("cmd must be a string") 1641*cda5da8dSAndroid Build Coastguard Worker if universal_newlines: 1642*cda5da8dSAndroid Build Coastguard Worker raise ValueError("universal_newlines must be False") 1643*cda5da8dSAndroid Build Coastguard Worker if not shell: 1644*cda5da8dSAndroid Build Coastguard Worker raise ValueError("shell must be True") 1645*cda5da8dSAndroid Build Coastguard Worker if bufsize != 0: 1646*cda5da8dSAndroid Build Coastguard Worker raise ValueError("bufsize must be 0") 1647*cda5da8dSAndroid Build Coastguard Worker if text: 1648*cda5da8dSAndroid Build Coastguard Worker raise ValueError("text must be False") 1649*cda5da8dSAndroid Build Coastguard Worker if encoding is not None: 1650*cda5da8dSAndroid Build Coastguard Worker raise ValueError("encoding must be None") 1651*cda5da8dSAndroid Build Coastguard Worker if errors is not None: 1652*cda5da8dSAndroid Build Coastguard Worker raise ValueError("errors must be None") 1653*cda5da8dSAndroid Build Coastguard Worker 1654*cda5da8dSAndroid Build Coastguard Worker protocol = protocol_factory() 1655*cda5da8dSAndroid Build Coastguard Worker debug_log = None 1656*cda5da8dSAndroid Build Coastguard Worker if self._debug: 1657*cda5da8dSAndroid Build Coastguard Worker # don't log parameters: they may contain sensitive information 1658*cda5da8dSAndroid Build Coastguard Worker # (password) and may be too long 1659*cda5da8dSAndroid Build Coastguard Worker debug_log = 'run shell command %r' % cmd 1660*cda5da8dSAndroid Build Coastguard Worker self._log_subprocess(debug_log, stdin, stdout, stderr) 1661*cda5da8dSAndroid Build Coastguard Worker transport = await self._make_subprocess_transport( 1662*cda5da8dSAndroid Build Coastguard Worker protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs) 1663*cda5da8dSAndroid Build Coastguard Worker if self._debug and debug_log is not None: 1664*cda5da8dSAndroid Build Coastguard Worker logger.info('%s: %r', debug_log, transport) 1665*cda5da8dSAndroid Build Coastguard Worker return transport, protocol 1666*cda5da8dSAndroid Build Coastguard Worker 1667*cda5da8dSAndroid Build Coastguard Worker async def subprocess_exec(self, protocol_factory, program, *args, 1668*cda5da8dSAndroid Build Coastguard Worker stdin=subprocess.PIPE, stdout=subprocess.PIPE, 1669*cda5da8dSAndroid Build Coastguard Worker stderr=subprocess.PIPE, universal_newlines=False, 1670*cda5da8dSAndroid Build Coastguard Worker shell=False, bufsize=0, 1671*cda5da8dSAndroid Build Coastguard Worker encoding=None, errors=None, text=None, 1672*cda5da8dSAndroid Build Coastguard Worker **kwargs): 1673*cda5da8dSAndroid Build Coastguard Worker if universal_newlines: 1674*cda5da8dSAndroid Build Coastguard Worker raise ValueError("universal_newlines must be False") 1675*cda5da8dSAndroid Build Coastguard Worker if shell: 1676*cda5da8dSAndroid Build Coastguard Worker raise ValueError("shell must be False") 1677*cda5da8dSAndroid Build Coastguard Worker if bufsize != 0: 1678*cda5da8dSAndroid Build Coastguard Worker raise ValueError("bufsize must be 0") 1679*cda5da8dSAndroid Build Coastguard Worker if text: 1680*cda5da8dSAndroid Build Coastguard Worker raise ValueError("text must be False") 1681*cda5da8dSAndroid Build Coastguard Worker if encoding is not None: 1682*cda5da8dSAndroid Build Coastguard Worker raise ValueError("encoding must be None") 1683*cda5da8dSAndroid Build Coastguard Worker if errors is not None: 1684*cda5da8dSAndroid Build Coastguard Worker raise ValueError("errors must be None") 1685*cda5da8dSAndroid Build Coastguard Worker 1686*cda5da8dSAndroid Build Coastguard Worker popen_args = (program,) + args 1687*cda5da8dSAndroid Build Coastguard Worker protocol = protocol_factory() 1688*cda5da8dSAndroid Build Coastguard Worker debug_log = None 1689*cda5da8dSAndroid Build Coastguard Worker if self._debug: 1690*cda5da8dSAndroid Build Coastguard Worker # don't log parameters: they may contain sensitive information 1691*cda5da8dSAndroid Build Coastguard Worker # (password) and may be too long 1692*cda5da8dSAndroid Build Coastguard Worker debug_log = f'execute program {program!r}' 1693*cda5da8dSAndroid Build Coastguard Worker self._log_subprocess(debug_log, stdin, stdout, stderr) 1694*cda5da8dSAndroid Build Coastguard Worker transport = await self._make_subprocess_transport( 1695*cda5da8dSAndroid Build Coastguard Worker protocol, popen_args, False, stdin, stdout, stderr, 1696*cda5da8dSAndroid Build Coastguard Worker bufsize, **kwargs) 1697*cda5da8dSAndroid Build Coastguard Worker if self._debug and debug_log is not None: 1698*cda5da8dSAndroid Build Coastguard Worker logger.info('%s: %r', debug_log, transport) 1699*cda5da8dSAndroid Build Coastguard Worker return transport, protocol 1700*cda5da8dSAndroid Build Coastguard Worker 1701*cda5da8dSAndroid Build Coastguard Worker def get_exception_handler(self): 1702*cda5da8dSAndroid Build Coastguard Worker """Return an exception handler, or None if the default one is in use. 1703*cda5da8dSAndroid Build Coastguard Worker """ 1704*cda5da8dSAndroid Build Coastguard Worker return self._exception_handler 1705*cda5da8dSAndroid Build Coastguard Worker 1706*cda5da8dSAndroid Build Coastguard Worker def set_exception_handler(self, handler): 1707*cda5da8dSAndroid Build Coastguard Worker """Set handler as the new event loop exception handler. 1708*cda5da8dSAndroid Build Coastguard Worker 1709*cda5da8dSAndroid Build Coastguard Worker If handler is None, the default exception handler will 1710*cda5da8dSAndroid Build Coastguard Worker be set. 1711*cda5da8dSAndroid Build Coastguard Worker 1712*cda5da8dSAndroid Build Coastguard Worker If handler is a callable object, it should have a 1713*cda5da8dSAndroid Build Coastguard Worker signature matching '(loop, context)', where 'loop' 1714*cda5da8dSAndroid Build Coastguard Worker will be a reference to the active event loop, 'context' 1715*cda5da8dSAndroid Build Coastguard Worker will be a dict object (see `call_exception_handler()` 1716*cda5da8dSAndroid Build Coastguard Worker documentation for details about context). 1717*cda5da8dSAndroid Build Coastguard Worker """ 1718*cda5da8dSAndroid Build Coastguard Worker if handler is not None and not callable(handler): 1719*cda5da8dSAndroid Build Coastguard Worker raise TypeError(f'A callable object or None is expected, ' 1720*cda5da8dSAndroid Build Coastguard Worker f'got {handler!r}') 1721*cda5da8dSAndroid Build Coastguard Worker self._exception_handler = handler 1722*cda5da8dSAndroid Build Coastguard Worker 1723*cda5da8dSAndroid Build Coastguard Worker def default_exception_handler(self, context): 1724*cda5da8dSAndroid Build Coastguard Worker """Default exception handler. 1725*cda5da8dSAndroid Build Coastguard Worker 1726*cda5da8dSAndroid Build Coastguard Worker This is called when an exception occurs and no exception 1727*cda5da8dSAndroid Build Coastguard Worker handler is set, and can be called by a custom exception 1728*cda5da8dSAndroid Build Coastguard Worker handler that wants to defer to the default behavior. 1729*cda5da8dSAndroid Build Coastguard Worker 1730*cda5da8dSAndroid Build Coastguard Worker This default handler logs the error message and other 1731*cda5da8dSAndroid Build Coastguard Worker context-dependent information. In debug mode, a truncated 1732*cda5da8dSAndroid Build Coastguard Worker stack trace is also appended showing where the given object 1733*cda5da8dSAndroid Build Coastguard Worker (e.g. a handle or future or task) was created, if any. 1734*cda5da8dSAndroid Build Coastguard Worker 1735*cda5da8dSAndroid Build Coastguard Worker The context parameter has the same meaning as in 1736*cda5da8dSAndroid Build Coastguard Worker `call_exception_handler()`. 1737*cda5da8dSAndroid Build Coastguard Worker """ 1738*cda5da8dSAndroid Build Coastguard Worker message = context.get('message') 1739*cda5da8dSAndroid Build Coastguard Worker if not message: 1740*cda5da8dSAndroid Build Coastguard Worker message = 'Unhandled exception in event loop' 1741*cda5da8dSAndroid Build Coastguard Worker 1742*cda5da8dSAndroid Build Coastguard Worker exception = context.get('exception') 1743*cda5da8dSAndroid Build Coastguard Worker if exception is not None: 1744*cda5da8dSAndroid Build Coastguard Worker exc_info = (type(exception), exception, exception.__traceback__) 1745*cda5da8dSAndroid Build Coastguard Worker else: 1746*cda5da8dSAndroid Build Coastguard Worker exc_info = False 1747*cda5da8dSAndroid Build Coastguard Worker 1748*cda5da8dSAndroid Build Coastguard Worker if ('source_traceback' not in context and 1749*cda5da8dSAndroid Build Coastguard Worker self._current_handle is not None and 1750*cda5da8dSAndroid Build Coastguard Worker self._current_handle._source_traceback): 1751*cda5da8dSAndroid Build Coastguard Worker context['handle_traceback'] = \ 1752*cda5da8dSAndroid Build Coastguard Worker self._current_handle._source_traceback 1753*cda5da8dSAndroid Build Coastguard Worker 1754*cda5da8dSAndroid Build Coastguard Worker log_lines = [message] 1755*cda5da8dSAndroid Build Coastguard Worker for key in sorted(context): 1756*cda5da8dSAndroid Build Coastguard Worker if key in {'message', 'exception'}: 1757*cda5da8dSAndroid Build Coastguard Worker continue 1758*cda5da8dSAndroid Build Coastguard Worker value = context[key] 1759*cda5da8dSAndroid Build Coastguard Worker if key == 'source_traceback': 1760*cda5da8dSAndroid Build Coastguard Worker tb = ''.join(traceback.format_list(value)) 1761*cda5da8dSAndroid Build Coastguard Worker value = 'Object created at (most recent call last):\n' 1762*cda5da8dSAndroid Build Coastguard Worker value += tb.rstrip() 1763*cda5da8dSAndroid Build Coastguard Worker elif key == 'handle_traceback': 1764*cda5da8dSAndroid Build Coastguard Worker tb = ''.join(traceback.format_list(value)) 1765*cda5da8dSAndroid Build Coastguard Worker value = 'Handle created at (most recent call last):\n' 1766*cda5da8dSAndroid Build Coastguard Worker value += tb.rstrip() 1767*cda5da8dSAndroid Build Coastguard Worker else: 1768*cda5da8dSAndroid Build Coastguard Worker value = repr(value) 1769*cda5da8dSAndroid Build Coastguard Worker log_lines.append(f'{key}: {value}') 1770*cda5da8dSAndroid Build Coastguard Worker 1771*cda5da8dSAndroid Build Coastguard Worker logger.error('\n'.join(log_lines), exc_info=exc_info) 1772*cda5da8dSAndroid Build Coastguard Worker 1773*cda5da8dSAndroid Build Coastguard Worker def call_exception_handler(self, context): 1774*cda5da8dSAndroid Build Coastguard Worker """Call the current event loop's exception handler. 1775*cda5da8dSAndroid Build Coastguard Worker 1776*cda5da8dSAndroid Build Coastguard Worker The context argument is a dict containing the following keys: 1777*cda5da8dSAndroid Build Coastguard Worker 1778*cda5da8dSAndroid Build Coastguard Worker - 'message': Error message; 1779*cda5da8dSAndroid Build Coastguard Worker - 'exception' (optional): Exception object; 1780*cda5da8dSAndroid Build Coastguard Worker - 'future' (optional): Future instance; 1781*cda5da8dSAndroid Build Coastguard Worker - 'task' (optional): Task instance; 1782*cda5da8dSAndroid Build Coastguard Worker - 'handle' (optional): Handle instance; 1783*cda5da8dSAndroid Build Coastguard Worker - 'protocol' (optional): Protocol instance; 1784*cda5da8dSAndroid Build Coastguard Worker - 'transport' (optional): Transport instance; 1785*cda5da8dSAndroid Build Coastguard Worker - 'socket' (optional): Socket instance; 1786*cda5da8dSAndroid Build Coastguard Worker - 'asyncgen' (optional): Asynchronous generator that caused 1787*cda5da8dSAndroid Build Coastguard Worker the exception. 1788*cda5da8dSAndroid Build Coastguard Worker 1789*cda5da8dSAndroid Build Coastguard Worker New keys maybe introduced in the future. 1790*cda5da8dSAndroid Build Coastguard Worker 1791*cda5da8dSAndroid Build Coastguard Worker Note: do not overload this method in an event loop subclass. 1792*cda5da8dSAndroid Build Coastguard Worker For custom exception handling, use the 1793*cda5da8dSAndroid Build Coastguard Worker `set_exception_handler()` method. 1794*cda5da8dSAndroid Build Coastguard Worker """ 1795*cda5da8dSAndroid Build Coastguard Worker if self._exception_handler is None: 1796*cda5da8dSAndroid Build Coastguard Worker try: 1797*cda5da8dSAndroid Build Coastguard Worker self.default_exception_handler(context) 1798*cda5da8dSAndroid Build Coastguard Worker except (SystemExit, KeyboardInterrupt): 1799*cda5da8dSAndroid Build Coastguard Worker raise 1800*cda5da8dSAndroid Build Coastguard Worker except BaseException: 1801*cda5da8dSAndroid Build Coastguard Worker # Second protection layer for unexpected errors 1802*cda5da8dSAndroid Build Coastguard Worker # in the default implementation, as well as for subclassed 1803*cda5da8dSAndroid Build Coastguard Worker # event loops with overloaded "default_exception_handler". 1804*cda5da8dSAndroid Build Coastguard Worker logger.error('Exception in default exception handler', 1805*cda5da8dSAndroid Build Coastguard Worker exc_info=True) 1806*cda5da8dSAndroid Build Coastguard Worker else: 1807*cda5da8dSAndroid Build Coastguard Worker try: 1808*cda5da8dSAndroid Build Coastguard Worker self._exception_handler(self, context) 1809*cda5da8dSAndroid Build Coastguard Worker except (SystemExit, KeyboardInterrupt): 1810*cda5da8dSAndroid Build Coastguard Worker raise 1811*cda5da8dSAndroid Build Coastguard Worker except BaseException as exc: 1812*cda5da8dSAndroid Build Coastguard Worker # Exception in the user set custom exception handler. 1813*cda5da8dSAndroid Build Coastguard Worker try: 1814*cda5da8dSAndroid Build Coastguard Worker # Let's try default handler. 1815*cda5da8dSAndroid Build Coastguard Worker self.default_exception_handler({ 1816*cda5da8dSAndroid Build Coastguard Worker 'message': 'Unhandled error in exception handler', 1817*cda5da8dSAndroid Build Coastguard Worker 'exception': exc, 1818*cda5da8dSAndroid Build Coastguard Worker 'context': context, 1819*cda5da8dSAndroid Build Coastguard Worker }) 1820*cda5da8dSAndroid Build Coastguard Worker except (SystemExit, KeyboardInterrupt): 1821*cda5da8dSAndroid Build Coastguard Worker raise 1822*cda5da8dSAndroid Build Coastguard Worker except BaseException: 1823*cda5da8dSAndroid Build Coastguard Worker # Guard 'default_exception_handler' in case it is 1824*cda5da8dSAndroid Build Coastguard Worker # overloaded. 1825*cda5da8dSAndroid Build Coastguard Worker logger.error('Exception in default exception handler ' 1826*cda5da8dSAndroid Build Coastguard Worker 'while handling an unexpected error ' 1827*cda5da8dSAndroid Build Coastguard Worker 'in custom exception handler', 1828*cda5da8dSAndroid Build Coastguard Worker exc_info=True) 1829*cda5da8dSAndroid Build Coastguard Worker 1830*cda5da8dSAndroid Build Coastguard Worker def _add_callback(self, handle): 1831*cda5da8dSAndroid Build Coastguard Worker """Add a Handle to _ready.""" 1832*cda5da8dSAndroid Build Coastguard Worker if not handle._cancelled: 1833*cda5da8dSAndroid Build Coastguard Worker self._ready.append(handle) 1834*cda5da8dSAndroid Build Coastguard Worker 1835*cda5da8dSAndroid Build Coastguard Worker def _add_callback_signalsafe(self, handle): 1836*cda5da8dSAndroid Build Coastguard Worker """Like _add_callback() but called from a signal handler.""" 1837*cda5da8dSAndroid Build Coastguard Worker self._add_callback(handle) 1838*cda5da8dSAndroid Build Coastguard Worker self._write_to_self() 1839*cda5da8dSAndroid Build Coastguard Worker 1840*cda5da8dSAndroid Build Coastguard Worker def _timer_handle_cancelled(self, handle): 1841*cda5da8dSAndroid Build Coastguard Worker """Notification that a TimerHandle has been cancelled.""" 1842*cda5da8dSAndroid Build Coastguard Worker if handle._scheduled: 1843*cda5da8dSAndroid Build Coastguard Worker self._timer_cancelled_count += 1 1844*cda5da8dSAndroid Build Coastguard Worker 1845*cda5da8dSAndroid Build Coastguard Worker def _run_once(self): 1846*cda5da8dSAndroid Build Coastguard Worker """Run one full iteration of the event loop. 1847*cda5da8dSAndroid Build Coastguard Worker 1848*cda5da8dSAndroid Build Coastguard Worker This calls all currently ready callbacks, polls for I/O, 1849*cda5da8dSAndroid Build Coastguard Worker schedules the resulting callbacks, and finally schedules 1850*cda5da8dSAndroid Build Coastguard Worker 'call_later' callbacks. 1851*cda5da8dSAndroid Build Coastguard Worker """ 1852*cda5da8dSAndroid Build Coastguard Worker 1853*cda5da8dSAndroid Build Coastguard Worker sched_count = len(self._scheduled) 1854*cda5da8dSAndroid Build Coastguard Worker if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and 1855*cda5da8dSAndroid Build Coastguard Worker self._timer_cancelled_count / sched_count > 1856*cda5da8dSAndroid Build Coastguard Worker _MIN_CANCELLED_TIMER_HANDLES_FRACTION): 1857*cda5da8dSAndroid Build Coastguard Worker # Remove delayed calls that were cancelled if their number 1858*cda5da8dSAndroid Build Coastguard Worker # is too high 1859*cda5da8dSAndroid Build Coastguard Worker new_scheduled = [] 1860*cda5da8dSAndroid Build Coastguard Worker for handle in self._scheduled: 1861*cda5da8dSAndroid Build Coastguard Worker if handle._cancelled: 1862*cda5da8dSAndroid Build Coastguard Worker handle._scheduled = False 1863*cda5da8dSAndroid Build Coastguard Worker else: 1864*cda5da8dSAndroid Build Coastguard Worker new_scheduled.append(handle) 1865*cda5da8dSAndroid Build Coastguard Worker 1866*cda5da8dSAndroid Build Coastguard Worker heapq.heapify(new_scheduled) 1867*cda5da8dSAndroid Build Coastguard Worker self._scheduled = new_scheduled 1868*cda5da8dSAndroid Build Coastguard Worker self._timer_cancelled_count = 0 1869*cda5da8dSAndroid Build Coastguard Worker else: 1870*cda5da8dSAndroid Build Coastguard Worker # Remove delayed calls that were cancelled from head of queue. 1871*cda5da8dSAndroid Build Coastguard Worker while self._scheduled and self._scheduled[0]._cancelled: 1872*cda5da8dSAndroid Build Coastguard Worker self._timer_cancelled_count -= 1 1873*cda5da8dSAndroid Build Coastguard Worker handle = heapq.heappop(self._scheduled) 1874*cda5da8dSAndroid Build Coastguard Worker handle._scheduled = False 1875*cda5da8dSAndroid Build Coastguard Worker 1876*cda5da8dSAndroid Build Coastguard Worker timeout = None 1877*cda5da8dSAndroid Build Coastguard Worker if self._ready or self._stopping: 1878*cda5da8dSAndroid Build Coastguard Worker timeout = 0 1879*cda5da8dSAndroid Build Coastguard Worker elif self._scheduled: 1880*cda5da8dSAndroid Build Coastguard Worker # Compute the desired timeout. 1881*cda5da8dSAndroid Build Coastguard Worker when = self._scheduled[0]._when 1882*cda5da8dSAndroid Build Coastguard Worker timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT) 1883*cda5da8dSAndroid Build Coastguard Worker 1884*cda5da8dSAndroid Build Coastguard Worker event_list = self._selector.select(timeout) 1885*cda5da8dSAndroid Build Coastguard Worker self._process_events(event_list) 1886*cda5da8dSAndroid Build Coastguard Worker # Needed to break cycles when an exception occurs. 1887*cda5da8dSAndroid Build Coastguard Worker event_list = None 1888*cda5da8dSAndroid Build Coastguard Worker 1889*cda5da8dSAndroid Build Coastguard Worker # Handle 'later' callbacks that are ready. 1890*cda5da8dSAndroid Build Coastguard Worker end_time = self.time() + self._clock_resolution 1891*cda5da8dSAndroid Build Coastguard Worker while self._scheduled: 1892*cda5da8dSAndroid Build Coastguard Worker handle = self._scheduled[0] 1893*cda5da8dSAndroid Build Coastguard Worker if handle._when >= end_time: 1894*cda5da8dSAndroid Build Coastguard Worker break 1895*cda5da8dSAndroid Build Coastguard Worker handle = heapq.heappop(self._scheduled) 1896*cda5da8dSAndroid Build Coastguard Worker handle._scheduled = False 1897*cda5da8dSAndroid Build Coastguard Worker self._ready.append(handle) 1898*cda5da8dSAndroid Build Coastguard Worker 1899*cda5da8dSAndroid Build Coastguard Worker # This is the only place where callbacks are actually *called*. 1900*cda5da8dSAndroid Build Coastguard Worker # All other places just add them to ready. 1901*cda5da8dSAndroid Build Coastguard Worker # Note: We run all currently scheduled callbacks, but not any 1902*cda5da8dSAndroid Build Coastguard Worker # callbacks scheduled by callbacks run this time around -- 1903*cda5da8dSAndroid Build Coastguard Worker # they will be run the next time (after another I/O poll). 1904*cda5da8dSAndroid Build Coastguard Worker # Use an idiom that is thread-safe without using locks. 1905*cda5da8dSAndroid Build Coastguard Worker ntodo = len(self._ready) 1906*cda5da8dSAndroid Build Coastguard Worker for i in range(ntodo): 1907*cda5da8dSAndroid Build Coastguard Worker handle = self._ready.popleft() 1908*cda5da8dSAndroid Build Coastguard Worker if handle._cancelled: 1909*cda5da8dSAndroid Build Coastguard Worker continue 1910*cda5da8dSAndroid Build Coastguard Worker if self._debug: 1911*cda5da8dSAndroid Build Coastguard Worker try: 1912*cda5da8dSAndroid Build Coastguard Worker self._current_handle = handle 1913*cda5da8dSAndroid Build Coastguard Worker t0 = self.time() 1914*cda5da8dSAndroid Build Coastguard Worker handle._run() 1915*cda5da8dSAndroid Build Coastguard Worker dt = self.time() - t0 1916*cda5da8dSAndroid Build Coastguard Worker if dt >= self.slow_callback_duration: 1917*cda5da8dSAndroid Build Coastguard Worker logger.warning('Executing %s took %.3f seconds', 1918*cda5da8dSAndroid Build Coastguard Worker _format_handle(handle), dt) 1919*cda5da8dSAndroid Build Coastguard Worker finally: 1920*cda5da8dSAndroid Build Coastguard Worker self._current_handle = None 1921*cda5da8dSAndroid Build Coastguard Worker else: 1922*cda5da8dSAndroid Build Coastguard Worker handle._run() 1923*cda5da8dSAndroid Build Coastguard Worker handle = None # Needed to break cycles when an exception occurs. 1924*cda5da8dSAndroid Build Coastguard Worker 1925*cda5da8dSAndroid Build Coastguard Worker def _set_coroutine_origin_tracking(self, enabled): 1926*cda5da8dSAndroid Build Coastguard Worker if bool(enabled) == bool(self._coroutine_origin_tracking_enabled): 1927*cda5da8dSAndroid Build Coastguard Worker return 1928*cda5da8dSAndroid Build Coastguard Worker 1929*cda5da8dSAndroid Build Coastguard Worker if enabled: 1930*cda5da8dSAndroid Build Coastguard Worker self._coroutine_origin_tracking_saved_depth = ( 1931*cda5da8dSAndroid Build Coastguard Worker sys.get_coroutine_origin_tracking_depth()) 1932*cda5da8dSAndroid Build Coastguard Worker sys.set_coroutine_origin_tracking_depth( 1933*cda5da8dSAndroid Build Coastguard Worker constants.DEBUG_STACK_DEPTH) 1934*cda5da8dSAndroid Build Coastguard Worker else: 1935*cda5da8dSAndroid Build Coastguard Worker sys.set_coroutine_origin_tracking_depth( 1936*cda5da8dSAndroid Build Coastguard Worker self._coroutine_origin_tracking_saved_depth) 1937*cda5da8dSAndroid Build Coastguard Worker 1938*cda5da8dSAndroid Build Coastguard Worker self._coroutine_origin_tracking_enabled = enabled 1939*cda5da8dSAndroid Build Coastguard Worker 1940*cda5da8dSAndroid Build Coastguard Worker def get_debug(self): 1941*cda5da8dSAndroid Build Coastguard Worker return self._debug 1942*cda5da8dSAndroid Build Coastguard Worker 1943*cda5da8dSAndroid Build Coastguard Worker def set_debug(self, enabled): 1944*cda5da8dSAndroid Build Coastguard Worker self._debug = enabled 1945*cda5da8dSAndroid Build Coastguard Worker 1946*cda5da8dSAndroid Build Coastguard Worker if self.is_running(): 1947*cda5da8dSAndroid Build Coastguard Worker self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled) 1948