1"""Base implementation of event loop. 2 3The event loop can be broken up into a multiplexer (the part 4responsible for notifying us of I/O events) and the event loop proper, 5which wraps a multiplexer with functionality for scheduling callbacks, 6immediately or at a given time in the future. 7 8Whenever a public API takes a callback, subsequent positional 9arguments will be passed to the callback if/when it is called. This 10avoids the proliferation of trivial lambdas implementing closures. 11Keyword arguments for the callback are not supported; this is a 12conscious design decision, leaving the door open for keyword arguments 13to modify the meaning of the API call itself. 14""" 15 16import collections 17import collections.abc 18import concurrent.futures 19import functools 20import heapq 21import itertools 22import os 23import socket 24import stat 25import subprocess 26import threading 27import time 28import traceback 29import sys 30import warnings 31import weakref 32 33try: 34 import ssl 35except ImportError: # pragma: no cover 36 ssl = None 37 38from . import constants 39from . import coroutines 40from . import events 41from . import exceptions 42from . import futures 43from . import protocols 44from . import sslproto 45from . import staggered 46from . import tasks 47from . import transports 48from . import trsock 49from .log import logger 50 51 52__all__ = 'BaseEventLoop','Server', 53 54 55# Minimum number of _scheduled timer handles before cleanup of 56# cancelled handles is performed. 57_MIN_SCHEDULED_TIMER_HANDLES = 100 58 59# Minimum fraction of _scheduled timer handles that are cancelled 60# before cleanup of cancelled handles is performed. 61_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 62 63 64_HAS_IPv6 = hasattr(socket, 'AF_INET6') 65 66# Maximum timeout passed to select to avoid OS limitations 67MAXIMUM_SELECT_TIMEOUT = 24 * 3600 68 69 70def _format_handle(handle): 71 cb = handle._callback 72 if isinstance(getattr(cb, '__self__', None), tasks.Task): 73 # format the task 74 return repr(cb.__self__) 75 else: 76 return str(handle) 77 78 79def _format_pipe(fd): 80 if fd == subprocess.PIPE: 81 return '<pipe>' 82 elif fd == subprocess.STDOUT: 83 return '<stdout>' 84 else: 85 return repr(fd) 86 87 88def _set_reuseport(sock): 89 if not hasattr(socket, 'SO_REUSEPORT'): 90 raise ValueError('reuse_port not supported by socket module') 91 else: 92 try: 93 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 94 except OSError: 95 raise ValueError('reuse_port not supported by socket module, ' 96 'SO_REUSEPORT defined but not implemented.') 97 98 99def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0): 100 # Try to skip getaddrinfo if "host" is already an IP. Users might have 101 # handled name resolution in their own code and pass in resolved IPs. 102 if not hasattr(socket, 'inet_pton'): 103 return 104 105 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \ 106 host is None: 107 return None 108 109 if type == socket.SOCK_STREAM: 110 proto = socket.IPPROTO_TCP 111 elif type == socket.SOCK_DGRAM: 112 proto = socket.IPPROTO_UDP 113 else: 114 return None 115 116 if port is None: 117 port = 0 118 elif isinstance(port, bytes) and port == b'': 119 port = 0 120 elif isinstance(port, str) and port == '': 121 port = 0 122 else: 123 # If port's a service name like "http", don't skip getaddrinfo. 
        try:
            port = int(port)
        except (TypeError, ValueError):
            return None

    if family == socket.AF_UNSPEC:
        afs = [socket.AF_INET]
        if _HAS_IPv6:
            afs.append(socket.AF_INET6)
    else:
        afs = [family]

    if isinstance(host, bytes):
        host = host.decode('idna')
    if '%' in host:
        # Linux's inet_pton doesn't accept an IPv6 zone index after host,
        # like '::1%lo0'.
        return None

    for af in afs:
        try:
            socket.inet_pton(af, host)
            # The host has already been resolved.
            if _HAS_IPv6 and af == socket.AF_INET6:
                return af, type, proto, '', (host, port, flowinfo, scopeid)
            else:
                return af, type, proto, '', (host, port)
        except OSError:
            pass

    # "host" is not an IP address.
    return None


def _interleave_addrinfos(addrinfos, first_address_family_count=1):
    """Interleave list of addrinfo tuples by family."""
    # Group addresses by family
    addrinfos_by_family = collections.OrderedDict()
    for addr in addrinfos:
        family = addr[0]
        if family not in addrinfos_by_family:
            addrinfos_by_family[family] = []
        addrinfos_by_family[family].append(addr)
    addrinfos_lists = list(addrinfos_by_family.values())

    reordered = []
    if first_address_family_count > 1:
        reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
        del addrinfos_lists[0][:first_address_family_count - 1]
    reordered.extend(
        a for a in itertools.chain.from_iterable(
            itertools.zip_longest(*addrinfos_lists)
        ) if a is not None)
    return reordered


def _run_until_complete_cb(fut):
    if not fut.cancelled():
        exc = fut.exception()
        if isinstance(exc, (SystemExit, KeyboardInterrupt)):
            # Issue #22429: run_forever() already finished, no need to
            # stop it.
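            # Calling stop() here would set _stopping on an idle loop and
            # make the next run_forever() exit after a single iteration.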
            return
    futures._get_loop(fut).stop()


if hasattr(socket, 'TCP_NODELAY'):
    def _set_nodelay(sock):
        if (sock.family in {socket.AF_INET, socket.AF_INET6} and
                sock.type == socket.SOCK_STREAM and
                sock.proto == socket.IPPROTO_TCP):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
else:
    def _set_nodelay(sock):
        pass


def _check_ssl_socket(sock):
    if ssl is not None and isinstance(sock, ssl.SSLSocket):
        raise TypeError("Socket cannot be of type SSLSocket")


class _SendfileFallbackProtocol(protocols.Protocol):
    def __init__(self, transp):
        if not isinstance(transp, transports._FlowControlMixin):
            raise TypeError("transport should be _FlowControlMixin instance")
        self._transport = transp
        self._proto = transp.get_protocol()
        self._should_resume_reading = transp.is_reading()
        self._should_resume_writing = transp._protocol_paused
        transp.pause_reading()
        transp.set_protocol(self)
        if self._should_resume_writing:
            self._write_ready_fut = self._transport._loop.create_future()
        else:
            self._write_ready_fut = None

    async def drain(self):
        if self._transport.is_closing():
            raise ConnectionError("Connection closed by peer")
        fut = self._write_ready_fut
        if fut is None:
            return
        await fut

    def connection_made(self, transport):
        raise RuntimeError("Invalid state: "
                           "connection should have been established already.")

    def connection_lost(self, exc):
        if self._write_ready_fut is not None:
            # Never happens if the peer disconnects after sending the whole
            # content.  Thus disconnection is always an exception from the
            # user's perspective.
            if exc is None:
                self._write_ready_fut.set_exception(
                    ConnectionError("Connection is closed by peer"))
            else:
                self._write_ready_fut.set_exception(exc)
        self._proto.connection_lost(exc)

    def pause_writing(self):
        if self._write_ready_fut is not None:
            return
        self._write_ready_fut = self._transport._loop.create_future()

    def resume_writing(self):
        if self._write_ready_fut is None:
            return
        self._write_ready_fut.set_result(False)
        self._write_ready_fut = None

    def data_received(self, data):
        raise RuntimeError("Invalid state: reading should be paused")

    def eof_received(self):
        raise RuntimeError("Invalid state: reading should be paused")

    async def restore(self):
        self._transport.set_protocol(self._proto)
        if self._should_resume_reading:
            self._transport.resume_reading()
        if self._write_ready_fut is not None:
            # Cancel the future.
            # Basically it has no effect because protocol is switched back,
            # no code should wait for it anymore.
            self._write_ready_fut.cancel()
        if self._should_resume_writing:
            self._proto.resume_writing()


class Server(events.AbstractServer):

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout, ssl_shutdown_timeout=None):
        self._loop = loop
        self._sockets = sockets
        self._active_count = 0
        self._waiters = []
        self._protocol_factory = protocol_factory
        self._backlog = backlog
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._ssl_shutdown_timeout = ssl_shutdown_timeout
        self._serving = False
        self._serving_forever_fut = None

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def _attach(self):
        assert self._sockets is not None
        self._active_count += 1

    def _detach(self):
        assert self._active_count > 0
        self._active_count -= 1
        if self._active_count == 0 and self._sockets is None:
            self._wakeup()

    def _wakeup(self):
        waiters = self._waiters
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(waiter)

    def _start_serving(self):
        if self._serving:
            return
        self._serving = True
        for sock in self._sockets:
            sock.listen(self._backlog)
            self._loop._start_serving(
                self._protocol_factory, sock, self._ssl_context,
                self, self._backlog, self._ssl_handshake_timeout,
                self._ssl_shutdown_timeout)

    def get_loop(self):
        return self._loop

    def is_serving(self):
        return self._serving

    @property
    def sockets(self):
        if self._sockets is None:
            return ()
        return tuple(trsock.TransportSocket(s) for s in self._sockets)

    def close(self):
        sockets = self._sockets
        if sockets is None:
            return
        self._sockets = None

        for sock in sockets:
            self._loop._stop_serving(sock)

        self._serving = False

        if (self._serving_forever_fut is not None and
                not self._serving_forever_fut.done()):
            self._serving_forever_fut.cancel()
            self._serving_forever_fut = None

        if self._active_count == 0:
            self._wakeup()

    async def start_serving(self):
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
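        # sleep(0) suspends for exactly one loop iteration without
        # installing a timer, letting the scheduled callbacks run first.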
        await tasks.sleep(0)

    async def serve_forever(self):
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except exceptions.CancelledError:
            try:
                self.close()
                await self.wait_closed()
            finally:
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        if self._sockets is None or self._waiters is None:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter


class BaseEventLoop(events.AbstractEventLoop):

    def __init__(self):
        self._timer_cancelled_count = 0
        self._closed = False
        self._stopping = False
        self._ready = collections.deque()
        self._scheduled = []
        self._default_executor = None
        self._internal_fds = 0
        # Identifier of the thread running the event loop, or None if the
        # event loop is not running
        self._thread_id = None
        self._clock_resolution = time.get_clock_info('monotonic').resolution
        self._exception_handler = None
        self.set_debug(coroutines._is_debug_mode())
        # In debug mode, if the execution of a callback or a step of a task
        # exceeds this duration in seconds, the slow callback/task is logged.
        self.slow_callback_duration = 0.1
        self._current_handle = None
        self._task_factory = None
        self._coroutine_origin_tracking_enabled = False
        self._coroutine_origin_tracking_saved_depth = None

        # A weak set of all asynchronous generators that are
        # being iterated by the loop.
        self._asyncgens = weakref.WeakSet()
        # Set to True when `loop.shutdown_asyncgens` is called.
        self._asyncgens_shutdown_called = False
        # Set to True when `loop.shutdown_default_executor` is called.
        self._executor_shutdown_called = False

    def __repr__(self):
        return (
            f'<{self.__class__.__name__} running={self.is_running()} '
            f'closed={self.is_closed()} debug={self.get_debug()}>'
        )

    def create_future(self):
        """Create a Future object attached to the loop."""
        return futures.Future(loop=self)

    def create_task(self, coro, *, name=None, context=None):
        """Schedule a coroutine object.

        Return a task object.
        """
        self._check_closed()
        if self._task_factory is None:
            task = tasks.Task(coro, loop=self, name=name, context=context)
            if task._source_traceback:
                del task._source_traceback[-1]
        else:
            if context is None:
                # Use legacy API if context is not needed
                task = self._task_factory(self, coro)
            else:
                task = self._task_factory(self, coro, context=context)

            tasks._set_task_name(task, name)

        return task

    def set_task_factory(self, factory):
        """Set a task factory that will be used by loop.create_task().

        If factory is None the default task factory will be set.

        If factory is a callable, it should have a signature matching
        '(loop, coro)', where 'loop' will be a reference to the active
        event loop, 'coro' will be a coroutine object.  The callable
        must return a Future.
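
        Example (an illustrative sketch, not part of the original docs):

            def factory(loop, coro):
                # behave like the default factory
                return tasks.Task(coro, loop=loop)

            loop.set_task_factory(factory)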
459 """ 460 if factory is not None and not callable(factory): 461 raise TypeError('task factory must be a callable or None') 462 self._task_factory = factory 463 464 def get_task_factory(self): 465 """Return a task factory, or None if the default one is in use.""" 466 return self._task_factory 467 468 def _make_socket_transport(self, sock, protocol, waiter=None, *, 469 extra=None, server=None): 470 """Create socket transport.""" 471 raise NotImplementedError 472 473 def _make_ssl_transport( 474 self, rawsock, protocol, sslcontext, waiter=None, 475 *, server_side=False, server_hostname=None, 476 extra=None, server=None, 477 ssl_handshake_timeout=None, 478 ssl_shutdown_timeout=None, 479 call_connection_made=True): 480 """Create SSL transport.""" 481 raise NotImplementedError 482 483 def _make_datagram_transport(self, sock, protocol, 484 address=None, waiter=None, extra=None): 485 """Create datagram transport.""" 486 raise NotImplementedError 487 488 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 489 extra=None): 490 """Create read pipe transport.""" 491 raise NotImplementedError 492 493 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 494 extra=None): 495 """Create write pipe transport.""" 496 raise NotImplementedError 497 498 async def _make_subprocess_transport(self, protocol, args, shell, 499 stdin, stdout, stderr, bufsize, 500 extra=None, **kwargs): 501 """Create subprocess transport.""" 502 raise NotImplementedError 503 504 def _write_to_self(self): 505 """Write a byte to self-pipe, to wake up the event loop. 506 507 This may be called from a different thread. 508 509 The subclass is responsible for implementing the self-pipe. 510 """ 511 raise NotImplementedError 512 513 def _process_events(self, event_list): 514 """Process selector events.""" 515 raise NotImplementedError 516 517 def _check_closed(self): 518 if self._closed: 519 raise RuntimeError('Event loop is closed') 520 521 def _check_default_executor(self): 522 if self._executor_shutdown_called: 523 raise RuntimeError('Executor shutdown has been called') 524 525 def _asyncgen_finalizer_hook(self, agen): 526 self._asyncgens.discard(agen) 527 if not self.is_closed(): 528 self.call_soon_threadsafe(self.create_task, agen.aclose()) 529 530 def _asyncgen_firstiter_hook(self, agen): 531 if self._asyncgens_shutdown_called: 532 warnings.warn( 533 f"asynchronous generator {agen!r} was scheduled after " 534 f"loop.shutdown_asyncgens() call", 535 ResourceWarning, source=self) 536 537 self._asyncgens.add(agen) 538 539 async def shutdown_asyncgens(self): 540 """Shutdown all active asynchronous generators.""" 541 self._asyncgens_shutdown_called = True 542 543 if not len(self._asyncgens): 544 # If Python version is <3.6 or we don't have any asynchronous 545 # generators alive. 
            return

        closing_agens = list(self._asyncgens)
        self._asyncgens.clear()

        results = await tasks.gather(
            *[ag.aclose() for ag in closing_agens],
            return_exceptions=True)

        for result, agen in zip(results, closing_agens):
            if isinstance(result, Exception):
                self.call_exception_handler({
                    'message': f'an error occurred during closing of '
                               f'asynchronous generator {agen!r}',
                    'exception': result,
                    'asyncgen': agen
                })

    async def shutdown_default_executor(self):
        """Schedule the shutdown of the default executor."""
        self._executor_shutdown_called = True
        if self._default_executor is None:
            return
        future = self.create_future()
        thread = threading.Thread(target=self._do_shutdown, args=(future,))
        thread.start()
        try:
            await future
        finally:
            thread.join()

    def _do_shutdown(self, future):
        try:
            self._default_executor.shutdown(wait=True)
            if not self.is_closed():
                self.call_soon_threadsafe(future.set_result, None)
        except Exception as ex:
            if not self.is_closed():
                self.call_soon_threadsafe(future.set_exception, ex)

    def _check_running(self):
        if self.is_running():
            raise RuntimeError('This event loop is already running')
        if events._get_running_loop() is not None:
            raise RuntimeError(
                'Cannot run the event loop while another loop is running')

    def run_forever(self):
        """Run until stop() is called."""
        self._check_closed()
        self._check_running()
        self._set_coroutine_origin_tracking(self._debug)

        old_agen_hooks = sys.get_asyncgen_hooks()
        try:
            self._thread_id = threading.get_ident()
            sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                                   finalizer=self._asyncgen_finalizer_hook)

            events._set_running_loop(self)
            while True:
                self._run_once()
                if self._stopping:
                    break
        finally:
            self._stopping = False
            self._thread_id = None
            events._set_running_loop(None)
            self._set_coroutine_origin_tracking(False)
            sys.set_asyncgen_hooks(*old_agen_hooks)

    def run_until_complete(self, future):
        """Run until the Future is done.

        If the argument is a coroutine, it is wrapped in a Task.

        WARNING: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.

        Return the Future's result, or raise its exception.
        """
        self._check_closed()
        self._check_running()

        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()

    def stop(self):
        """Stop running the event loop.

        Every callback already scheduled will still run.  This simply
        informs run_forever to stop looping after a complete iteration.
        """
        self._stopping = True

    def close(self):
        """Close the event loop.

        This clears the queues and shuts down the executor,
        but does not wait for the executor to finish.

        The event loop must not be running.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self._closed:
            return
        if self._debug:
            logger.debug("Close %r", self)
        self._closed = True
        self._ready.clear()
        self._scheduled.clear()
        self._executor_shutdown_called = True
        executor = self._default_executor
        if executor is not None:
            self._default_executor = None
            executor.shutdown(wait=False)

    def is_closed(self):
        """Returns True if the event loop was closed."""
        return self._closed

    def __del__(self, _warn=warnings.warn):
        if not self.is_closed():
            _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
            if not self.is_running():
                self.close()

    def is_running(self):
        """Returns True if the event loop is running."""
        return (self._thread_id is not None)

    def time(self):
        """Return the time according to the event loop's clock.

        This is a float expressed in seconds since an epoch, but the
        epoch, precision, accuracy and drift are unspecified and may
        differ per event loop.
        """
        return time.monotonic()

    def call_later(self, delay, callback, *args, context=None):
        """Arrange for a callback to be called at a given time.

        Return a Handle: an opaque object with a cancel() method that
        can be used to cancel the call.

        The delay can be an int or float, expressed in seconds.  It is
        always relative to the current time.

        Each callback will be called exactly once.  If two callbacks
        are scheduled for exactly the same time, it is undefined which
        will be called first.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        if delay is None:
            raise TypeError('delay must not be None')
        timer = self.call_at(self.time() + delay, callback, *args,
                             context=context)
        if timer._source_traceback:
            del timer._source_traceback[-1]
        return timer

    def call_at(self, when, callback, *args, context=None):
        """Like call_later(), but uses an absolute time.

        Absolute time corresponds to the event loop's time() method.
        """
        if when is None:
            raise TypeError("when cannot be None")
        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_at')
        timer = events.TimerHandle(when, callback, args, self, context)
        if timer._source_traceback:
            del timer._source_traceback[-1]
        heapq.heappush(self._scheduled, timer)
        timer._scheduled = True
        return timer

    def call_soon(self, callback, *args, context=None):
        """Arrange for a callback to be called as soon as possible.

        This operates as a FIFO queue: callbacks are called in the
        order in which they are registered.  Each callback will be
        called exactly once.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
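
        Example (an illustrative sketch, not part of the original docs):

            def on_tick(label):
                print('tick', label)

            loop.call_soon(on_tick, 'a')  # 'a' is forwarded positionally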
760 """ 761 self._check_closed() 762 if self._debug: 763 self._check_thread() 764 self._check_callback(callback, 'call_soon') 765 handle = self._call_soon(callback, args, context) 766 if handle._source_traceback: 767 del handle._source_traceback[-1] 768 return handle 769 770 def _check_callback(self, callback, method): 771 if (coroutines.iscoroutine(callback) or 772 coroutines.iscoroutinefunction(callback)): 773 raise TypeError( 774 f"coroutines cannot be used with {method}()") 775 if not callable(callback): 776 raise TypeError( 777 f'a callable object was expected by {method}(), ' 778 f'got {callback!r}') 779 780 def _call_soon(self, callback, args, context): 781 handle = events.Handle(callback, args, self, context) 782 if handle._source_traceback: 783 del handle._source_traceback[-1] 784 self._ready.append(handle) 785 return handle 786 787 def _check_thread(self): 788 """Check that the current thread is the thread running the event loop. 789 790 Non-thread-safe methods of this class make this assumption and will 791 likely behave incorrectly when the assumption is violated. 792 793 Should only be called when (self._debug == True). The caller is 794 responsible for checking this condition for performance reasons. 795 """ 796 if self._thread_id is None: 797 return 798 thread_id = threading.get_ident() 799 if thread_id != self._thread_id: 800 raise RuntimeError( 801 "Non-thread-safe operation invoked on an event loop other " 802 "than the current one") 803 804 def call_soon_threadsafe(self, callback, *args, context=None): 805 """Like call_soon(), but thread-safe.""" 806 self._check_closed() 807 if self._debug: 808 self._check_callback(callback, 'call_soon_threadsafe') 809 handle = self._call_soon(callback, args, context) 810 if handle._source_traceback: 811 del handle._source_traceback[-1] 812 self._write_to_self() 813 return handle 814 815 def run_in_executor(self, executor, func, *args): 816 self._check_closed() 817 if self._debug: 818 self._check_callback(func, 'run_in_executor') 819 if executor is None: 820 executor = self._default_executor 821 # Only check when the default executor is being used 822 self._check_default_executor() 823 if executor is None: 824 executor = concurrent.futures.ThreadPoolExecutor( 825 thread_name_prefix='asyncio' 826 ) 827 self._default_executor = executor 828 return futures.wrap_future( 829 executor.submit(func, *args), loop=self) 830 831 def set_default_executor(self, executor): 832 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): 833 raise TypeError('executor must be ThreadPoolExecutor instance') 834 self._default_executor = executor 835 836 def _getaddrinfo_debug(self, host, port, family, type, proto, flags): 837 msg = [f"{host}:{port!r}"] 838 if family: 839 msg.append(f'family={family!r}') 840 if type: 841 msg.append(f'type={type!r}') 842 if proto: 843 msg.append(f'proto={proto!r}') 844 if flags: 845 msg.append(f'flags={flags!r}') 846 msg = ', '.join(msg) 847 logger.debug('Get address info %s', msg) 848 849 t0 = self.time() 850 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags) 851 dt = self.time() - t0 852 853 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}' 854 if dt >= self.slow_callback_duration: 855 logger.info(msg) 856 else: 857 logger.debug(msg) 858 return addrinfo 859 860 async def getaddrinfo(self, host, port, *, 861 family=0, type=0, proto=0, flags=0): 862 if self._debug: 863 getaddr_func = self._getaddrinfo_debug 864 else: 865 getaddr_func = socket.getaddrinfo 866 867 return await 
        return await self.run_in_executor(
            None, getaddr_func, host, port, family, type, proto, flags)

    async def getnameinfo(self, sockaddr, flags=0):
        return await self.run_in_executor(
            None, socket.getnameinfo, sockaddr, flags)

    async def sock_sendfile(self, sock, file, offset=0, count=None,
                            *, fallback=True):
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        _check_ssl_socket(sock)
        self._check_sendfile_params(sock, file, offset, count)
        try:
            return await self._sock_sendfile_native(sock, file,
                                                    offset, count)
        except exceptions.SendfileNotAvailableError as exc:
            if not fallback:
                raise
        return await self._sock_sendfile_fallback(sock, file,
                                                  offset, count)

    async def _sock_sendfile_native(self, sock, file, offset, count):
        # NB: sendfile syscall is not supported for SSL sockets and
        # non-mmap files even if sendfile is supported by OS
        raise exceptions.SendfileNotAvailableError(
            f"syscall sendfile is not available for socket {sock!r} "
            f"and file {file!r} combination")

    async def _sock_sendfile_fallback(self, sock, file, offset, count):
        if offset:
            file.seek(offset)
        blocksize = (
            min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
            if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
        )
        buf = bytearray(blocksize)
        total_sent = 0
        try:
            while True:
                if count:
                    blocksize = min(count - total_sent, blocksize)
                    if blocksize <= 0:
                        break
                view = memoryview(buf)[:blocksize]
                read = await self.run_in_executor(None, file.readinto, view)
                if not read:
                    break  # EOF
                await self.sock_sendall(sock, view[:read])
                total_sent += read
            return total_sent
        finally:
            if total_sent > 0 and hasattr(file, 'seek'):
                file.seek(offset + total_sent)

    def _check_sendfile_params(self, sock, file, offset, count):
        if 'b' not in getattr(file, 'mode', 'b'):
            raise ValueError("file should be opened in binary mode")
        if not sock.type == socket.SOCK_STREAM:
            raise ValueError("only SOCK_STREAM type sockets are supported")
        if count is not None:
            if not isinstance(count, int):
                raise TypeError(
                    "count must be a positive integer (got {!r})".format(count))
            if count <= 0:
                raise ValueError(
                    "count must be a positive integer (got {!r})".format(count))
        if not isinstance(offset, int):
            raise TypeError(
                "offset must be a non-negative integer (got {!r})".format(
                    offset))
        if offset < 0:
            raise ValueError(
                "offset must be a non-negative integer (got {!r})".format(
                    offset))

    async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
        """Create, bind and connect one socket."""
        my_exceptions = []
        exceptions.append(my_exceptions)
        family, type_, proto, _, address = addr_info
        sock = None
        try:
            sock = socket.socket(family=family, type=type_, proto=proto)
            sock.setblocking(False)
            if local_addr_infos is not None:
                for lfamily, _, _, _, laddr in local_addr_infos:
                    # skip local addresses of different family
                    if lfamily != family:
                        continue
                    try:
                        sock.bind(laddr)
                        break
                    except OSError as exc:
                        msg = (
                            f'error while attempting to bind on '
                            f'address {laddr!r}: '
                            f'{exc.strerror.lower()}'
                        )
                        exc = OSError(exc.errno, msg)
                        my_exceptions.append(exc)
                else:  # all bind attempts failed
                    if my_exceptions:
                        raise my_exceptions.pop()
                    else:
OSError(f"no matching local address with {family=} found") 973 await self.sock_connect(sock, address) 974 return sock 975 except OSError as exc: 976 my_exceptions.append(exc) 977 if sock is not None: 978 sock.close() 979 raise 980 except: 981 if sock is not None: 982 sock.close() 983 raise 984 finally: 985 exceptions = my_exceptions = None 986 987 async def create_connection( 988 self, protocol_factory, host=None, port=None, 989 *, ssl=None, family=0, 990 proto=0, flags=0, sock=None, 991 local_addr=None, server_hostname=None, 992 ssl_handshake_timeout=None, 993 ssl_shutdown_timeout=None, 994 happy_eyeballs_delay=None, interleave=None): 995 """Connect to a TCP server. 996 997 Create a streaming transport connection to a given internet host and 998 port: socket family AF_INET or socket.AF_INET6 depending on host (or 999 family if specified), socket type SOCK_STREAM. protocol_factory must be 1000 a callable returning a protocol instance. 1001 1002 This method is a coroutine which will try to establish the connection 1003 in the background. When successful, the coroutine returns a 1004 (transport, protocol) pair. 1005 """ 1006 if server_hostname is not None and not ssl: 1007 raise ValueError('server_hostname is only meaningful with ssl') 1008 1009 if server_hostname is None and ssl: 1010 # Use host as default for server_hostname. It is an error 1011 # if host is empty or not set, e.g. when an 1012 # already-connected socket was passed or when only a port 1013 # is given. To avoid this error, you can pass 1014 # server_hostname='' -- this will bypass the hostname 1015 # check. (This also means that if host is a numeric 1016 # IP/IPv6 address, we will attempt to verify that exact 1017 # address; this will probably fail, but it is possible to 1018 # create a certificate for a specific IP address, so we 1019 # don't judge it here.) 
            if not host:
                raise ValueError('You must set server_hostname '
                                 'when using ssl without a host')
            server_hostname = host

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if ssl_shutdown_timeout is not None and not ssl:
            raise ValueError(
                'ssl_shutdown_timeout is only meaningful with ssl')

        if sock is not None:
            _check_ssl_socket(sock)

        if happy_eyeballs_delay is not None and interleave is None:
            # If using happy eyeballs, default to interleave addresses by family
            interleave = 1

        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            infos = await self._ensure_resolved(
                (host, port), family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
            if not infos:
                raise OSError('getaddrinfo() returned empty list')

            if local_addr is not None:
                laddr_infos = await self._ensure_resolved(
                    local_addr, family=family,
                    type=socket.SOCK_STREAM, proto=proto,
                    flags=flags, loop=self)
                if not laddr_infos:
                    raise OSError('getaddrinfo() returned empty list')
            else:
                laddr_infos = None

            if interleave:
                infos = _interleave_addrinfos(infos, interleave)

            exceptions = []
            if happy_eyeballs_delay is None:
                # not using happy eyeballs
                for addrinfo in infos:
                    try:
                        sock = await self._connect_sock(
                            exceptions, addrinfo, laddr_infos)
                        break
                    except OSError:
                        continue
            else:  # using happy eyeballs
                sock, _, _ = await staggered.staggered_race(
                    (functools.partial(self._connect_sock,
                                       exceptions, addrinfo, laddr_infos)
                     for addrinfo in infos),
                    happy_eyeballs_delay, loop=self)

            if sock is None:
                exceptions = [exc for sub in exceptions for exc in sub]
                try:
                    if len(exceptions) == 1:
                        raise exceptions[0]
                    else:
                        # If they all have the same str(), raise one.
                        model = str(exceptions[0])
                        if all(str(exc) == model for exc in exceptions):
                            raise exceptions[0]
                        # Raise a combined exception so the user can see all
                        # the various error messages.
                        raise OSError('Multiple exceptions: {}'.format(
                            ', '.join(str(exc) for exc in exceptions)))
                finally:
                    exceptions = None

        else:
            if sock is None:
                raise ValueError(
                    'host and port were not specified and no sock specified')
            if sock.type != socket.SOCK_STREAM:
                # We allow AF_INET, AF_INET6, AF_UNIX as long as they
                # are SOCK_STREAM.
                # We support passing AF_UNIX sockets even though we have
                # a dedicated API for that: create_unix_connection.
                # Disallowing AF_UNIX in this method breaks backwards
                # compatibility.
                raise ValueError(
                    f'A Stream Socket was expected, got {sock!r}')

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout)
        if self._debug:
            # Get the socket from the transport because SSL transport closes
            # the old socket and creates a new SSL socket
            sock = transport.get_extra_info('socket')
            logger.debug("%r connected to %s:%r: (%r, %r)",
                         sock, host, port, transport, protocol)
        return transport, protocol

    async def _create_connection_transport(
            self, sock, protocol_factory, ssl,
            server_hostname, server_side=False,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):

        sock.setblocking(False)

        protocol = protocol_factory()
        waiter = self.create_future()
        if ssl:
            sslcontext = None if isinstance(ssl, bool) else ssl
            transport = self._make_ssl_transport(
                sock, protocol, sslcontext, waiter,
                server_side=server_side, server_hostname=server_hostname,
                ssl_handshake_timeout=ssl_handshake_timeout,
                ssl_shutdown_timeout=ssl_shutdown_timeout)
        else:
            transport = self._make_socket_transport(sock, protocol, waiter)

        try:
            await waiter
        except:
            transport.close()
            raise

        return transport, protocol

    async def sendfile(self, transport, file, offset=0, count=None,
                       *, fallback=True):
        """Send a file to transport.

        Return the total number of bytes which were sent.

        The method uses high-performance os.sendfile if available.

        file must be a regular file object opened in binary mode.

        offset tells from where to start reading the file.  If specified,
        count is the total number of bytes to transmit as opposed to
        sending the file until EOF is reached.  The file position is updated
        on return, including in case of error, in which case file.tell()
        can be used to figure out the number of bytes which were sent.

        fallback set to True makes asyncio manually read and send
        the file when the platform does not support the sendfile syscall
        (e.g. Windows or SSL socket on Unix).

        Raise SendfileNotAvailableError if the system does not support
        sendfile syscall and fallback is False.
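
        Example (an illustrative sketch, not part of the original docs):

            with open('payload.bin', 'rb') as f:
                sent = await loop.sendfile(transport, f, offset=0)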
1175 """ 1176 if transport.is_closing(): 1177 raise RuntimeError("Transport is closing") 1178 mode = getattr(transport, '_sendfile_compatible', 1179 constants._SendfileMode.UNSUPPORTED) 1180 if mode is constants._SendfileMode.UNSUPPORTED: 1181 raise RuntimeError( 1182 f"sendfile is not supported for transport {transport!r}") 1183 if mode is constants._SendfileMode.TRY_NATIVE: 1184 try: 1185 return await self._sendfile_native(transport, file, 1186 offset, count) 1187 except exceptions.SendfileNotAvailableError as exc: 1188 if not fallback: 1189 raise 1190 1191 if not fallback: 1192 raise RuntimeError( 1193 f"fallback is disabled and native sendfile is not " 1194 f"supported for transport {transport!r}") 1195 1196 return await self._sendfile_fallback(transport, file, 1197 offset, count) 1198 1199 async def _sendfile_native(self, transp, file, offset, count): 1200 raise exceptions.SendfileNotAvailableError( 1201 "sendfile syscall is not supported") 1202 1203 async def _sendfile_fallback(self, transp, file, offset, count): 1204 if offset: 1205 file.seek(offset) 1206 blocksize = min(count, 16384) if count else 16384 1207 buf = bytearray(blocksize) 1208 total_sent = 0 1209 proto = _SendfileFallbackProtocol(transp) 1210 try: 1211 while True: 1212 if count: 1213 blocksize = min(count - total_sent, blocksize) 1214 if blocksize <= 0: 1215 return total_sent 1216 view = memoryview(buf)[:blocksize] 1217 read = await self.run_in_executor(None, file.readinto, view) 1218 if not read: 1219 return total_sent # EOF 1220 await proto.drain() 1221 transp.write(view[:read]) 1222 total_sent += read 1223 finally: 1224 if total_sent > 0 and hasattr(file, 'seek'): 1225 file.seek(offset + total_sent) 1226 await proto.restore() 1227 1228 async def start_tls(self, transport, protocol, sslcontext, *, 1229 server_side=False, 1230 server_hostname=None, 1231 ssl_handshake_timeout=None, 1232 ssl_shutdown_timeout=None): 1233 """Upgrade transport to TLS. 1234 1235 Return a new transport that *protocol* should start using 1236 immediately. 1237 """ 1238 if ssl is None: 1239 raise RuntimeError('Python ssl module is not available') 1240 1241 if not isinstance(sslcontext, ssl.SSLContext): 1242 raise TypeError( 1243 f'sslcontext is expected to be an instance of ssl.SSLContext, ' 1244 f'got {sslcontext!r}') 1245 1246 if not getattr(transport, '_start_tls_compatible', False): 1247 raise TypeError( 1248 f'transport {transport!r} is not supported by start_tls()') 1249 1250 waiter = self.create_future() 1251 ssl_protocol = sslproto.SSLProtocol( 1252 self, protocol, sslcontext, waiter, 1253 server_side, server_hostname, 1254 ssl_handshake_timeout=ssl_handshake_timeout, 1255 ssl_shutdown_timeout=ssl_shutdown_timeout, 1256 call_connection_made=False) 1257 1258 # Pause early so that "ssl_protocol.data_received()" doesn't 1259 # have a chance to get called before "ssl_protocol.connection_made()". 
        transport.pause_reading()

        transport.set_protocol(ssl_protocol)
        conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
        resume_cb = self.call_soon(transport.resume_reading)

        try:
            await waiter
        except BaseException:
            transport.close()
            conmade_cb.cancel()
            resume_cb.cancel()
            raise

        return ssl_protocol._app_transport

    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_port=None,
                                       allow_broadcast=None, sock=None):
        """Create datagram connection."""
        if sock is not None:
            if sock.type != socket.SOCK_DGRAM:
                raise ValueError(
                    f'A UDP Socket was expected, got {sock!r}')
            if (local_addr or remote_addr or
                    family or proto or flags or
                    reuse_port or allow_broadcast):
                # show the problematic kwargs in exception msg
                opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                            family=family, proto=proto, flags=flags,
                            reuse_port=reuse_port,
                            allow_broadcast=allow_broadcast)
                problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
                raise ValueError(
                    f'socket modifier keyword arguments can not be used '
                    f'when sock is specified. ({problems})')
            sock.setblocking(False)
            r_addr = None
        else:
            if not (local_addr or remote_addr):
                if family == 0:
                    raise ValueError('unexpected address family')
                addr_pairs_info = (((family, proto), (None, None)),)
            elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
                for addr in (local_addr, remote_addr):
                    if addr is not None and not isinstance(addr, str):
                        raise TypeError('string is expected')

                if local_addr and local_addr[0] not in (0, '\x00'):
                    try:
                        if stat.S_ISSOCK(os.stat(local_addr).st_mode):
                            os.remove(local_addr)
                    except FileNotFoundError:
                        pass
                    except OSError as err:
                        # Directory may have permissions only to create socket.
                        logger.error('Unable to check or remove stale UNIX '
                                     'socket %r: %r',
                                     local_addr, err)

                addr_pairs_info = (((family, proto),
                                    (local_addr, remote_addr)), )
            else:
                # join address by (family, protocol)
                addr_infos = {}  # Using order preserving dict
                for idx, addr in ((0, local_addr), (1, remote_addr)):
                    if addr is not None:
                        if not (isinstance(addr, tuple) and len(addr) == 2):
                            raise TypeError('2-tuple is expected')

                        infos = await self._ensure_resolved(
                            addr, family=family, type=socket.SOCK_DGRAM,
                            proto=proto, flags=flags, loop=self)
                        if not infos:
                            raise OSError('getaddrinfo() returned empty list')

                        for fam, _, pro, _, address in infos:
                            key = (fam, pro)
                            if key not in addr_infos:
                                addr_infos[key] = [None, None]
                            addr_infos[key][idx] = address

                # each addr has to have info for each (family, proto) pair
                addr_pairs_info = [
                    (key, addr_pair) for key, addr_pair in addr_infos.items()
                    if not ((local_addr and addr_pair[0] is None) or
                            (remote_addr and addr_pair[1] is None))]

                if not addr_pairs_info:
                    raise ValueError('can not get address information')

            exceptions = []

            for ((family, proto),
                 (local_address, remote_address)) in addr_pairs_info:
                sock = None
                r_addr = None
                try:
                    sock = socket.socket(
                        family=family, type=socket.SOCK_DGRAM, proto=proto)
                    if reuse_port:
                        _set_reuseport(sock)
                    if allow_broadcast:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                    sock.setblocking(False)

                    if local_addr:
                        sock.bind(local_address)
                    if remote_addr:
                        if not allow_broadcast:
                            await self.sock_connect(sock, remote_address)
                        r_addr = remote_address
                except OSError as exc:
                    if sock is not None:
                        sock.close()
                    exceptions.append(exc)
                except:
                    if sock is not None:
                        sock.close()
                    raise
                else:
                    break
            else:
                raise exceptions[0]

        protocol = protocol_factory()
        waiter = self.create_future()
        transport = self._make_datagram_transport(
            sock, protocol, r_addr, waiter)
        if self._debug:
            if local_addr:
                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                            "created: (%r, %r)",
                            local_addr, remote_addr, transport, protocol)
            else:
                logger.debug("Datagram endpoint remote_addr=%r created: "
                             "(%r, %r)",
                             remote_addr, transport, protocol)

        try:
            await waiter
        except:
            transport.close()
            raise

        return transport, protocol

    async def _ensure_resolved(self, address, *,
                               family=0, type=socket.SOCK_STREAM,
                               proto=0, flags=0, loop):
        host, port = address[:2]
        info = _ipaddr_info(host, port, family, type, proto, *address[2:])
        if info is not None:
            # "host" is already a resolved IP.
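            # info is already in getaddrinfo()'s 5-tuple format:
            # (family, type, proto, canonname, sockaddr).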
            return [info]
        else:
            return await loop.getaddrinfo(host, port, family=family, type=type,
                                          proto=proto, flags=flags)

    async def _create_server_getaddrinfo(self, host, port, family, flags):
        infos = await self._ensure_resolved((host, port), family=family,
                                            type=socket.SOCK_STREAM,
                                            flags=flags, loop=self)
        if not infos:
            raise OSError(f'getaddrinfo({host!r}) returned empty list')
        return infos

    async def create_server(
            self, protocol_factory, host=None, port=None,
            *,
            family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE,
            sock=None,
            backlog=100,
            ssl=None,
            reuse_address=None,
            reuse_port=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        """Create a TCP server.

        The host parameter can be a string, in that case the TCP server is
        bound to host and port.

        The host parameter can also be a sequence of strings, and in that
        case the TCP server is bound to all hosts of the sequence.  If a
        host appears multiple times (possibly indirectly e.g. when hostnames
        resolve to the same IP address), the server is only bound once to
        that host.

        Return a Server object which can be used to stop the service.

        This method is a coroutine.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and ssl is None:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if ssl_shutdown_timeout is not None and ssl is None:
            raise ValueError(
                'ssl_shutdown_timeout is only meaningful with ssl')

        if sock is not None:
            _check_ssl_socket(sock)

        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            if reuse_address is None:
                reuse_address = os.name == "posix" and sys.platform != "cygwin"
            sockets = []
            if host == '':
                hosts = [None]
            elif (isinstance(host, str) or
                  not isinstance(host, collections.abc.Iterable)):
                hosts = [host]
            else:
                hosts = host

            fs = [self._create_server_getaddrinfo(host, port, family=family,
                                                  flags=flags)
                  for host in hosts]
            infos = await tasks.gather(*fs)
            infos = set(itertools.chain.from_iterable(infos))

            completed = False
            try:
                for res in infos:
                    af, socktype, proto, canonname, sa = res
                    try:
                        sock = socket.socket(af, socktype, proto)
                    except socket.error:
                        # Assume it's a bad family/type/protocol combination.
                        if self._debug:
                            logger.warning('create_server() failed to create '
                                           'socket.socket(%r, %r, %r)',
                                           af, socktype, proto, exc_info=True)
                        continue
                    sockets.append(sock)
                    if reuse_address:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                    if reuse_port:
                        _set_reuseport(sock)
                    # Disable IPv4/IPv6 dual stack support (enabled by
                    # default on Linux) which makes a single socket
                    # listen on both address families.
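                    # IPV6_V6ONLY=True restricts an AF_INET6 socket to
                    # IPv6 traffic only.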
                    if (_HAS_IPv6 and
                            af == socket.AF_INET6 and
                            hasattr(socket, 'IPPROTO_IPV6')):
                        sock.setsockopt(socket.IPPROTO_IPV6,
                                        socket.IPV6_V6ONLY,
                                        True)
                    try:
                        sock.bind(sa)
                    except OSError as err:
                        raise OSError(err.errno, 'error while attempting '
                                      'to bind on address %r: %s'
                                      % (sa, err.strerror.lower())) from None
                completed = True
            finally:
                if not completed:
                    for sock in sockets:
                        sock.close()
        else:
            if sock is None:
                raise ValueError('Neither host/port nor sock were specified')
            if sock.type != socket.SOCK_STREAM:
                raise ValueError(f'A Stream Socket was expected, got {sock!r}')
            sockets = [sock]

        for sock in sockets:
            sock.setblocking(False)

        server = Server(self, sockets, protocol_factory,
                        ssl, backlog, ssl_handshake_timeout,
                        ssl_shutdown_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.
            await tasks.sleep(0)

        if self._debug:
            logger.info("%r is serving", server)
        return server

    async def connect_accepted_socket(
            self, protocol_factory, sock,
            *, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        if sock.type != socket.SOCK_STREAM:
            raise ValueError(f'A Stream Socket was expected, got {sock!r}')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if ssl_shutdown_timeout is not None and not ssl:
            raise ValueError(
                'ssl_shutdown_timeout is only meaningful with ssl')

        if sock is not None:
            _check_ssl_socket(sock)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, '', server_side=True,
            ssl_handshake_timeout=ssl_handshake_timeout,
            ssl_shutdown_timeout=ssl_shutdown_timeout)
        if self._debug:
            # Get the socket from the transport because SSL transport closes
            # the old socket and creates a new SSL socket
            sock = transport.get_extra_info('socket')
            logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
        return transport, protocol

    async def connect_read_pipe(self, protocol_factory, pipe):
        protocol = protocol_factory()
        waiter = self.create_future()
        transport = self._make_read_pipe_transport(pipe, protocol, waiter)

        try:
            await waiter
        except:
            transport.close()
            raise

        if self._debug:
            logger.debug('Read pipe %r connected: (%r, %r)',
                         pipe.fileno(), transport, protocol)
        return transport, protocol

    async def connect_write_pipe(self, protocol_factory, pipe):
        protocol = protocol_factory()
        waiter = self.create_future()
        transport = self._make_write_pipe_transport(pipe, protocol, waiter)

        try:
            await waiter
        except:
            transport.close()
            raise

        if self._debug:
            logger.debug('Write pipe %r connected: (%r, %r)',
                         pipe.fileno(), transport, protocol)
        return transport, protocol

    def _log_subprocess(self, msg, stdin, stdout, stderr):
        info = [msg]
        if stdin is not None:
            info.append(f'stdin={_format_pipe(stdin)}')
        if stdout is not None and stderr == subprocess.STDOUT:
            info.append(f'stdout=stderr={_format_pipe(stdout)}')
        else:
            if stdout is not None:
                info.append(f'stdout={_format_pipe(stdout)}')
            if stderr is not None:
                info.append(f'stderr={_format_pipe(stderr)}')
        logger.debug(' '.join(info))

    async def subprocess_shell(self, protocol_factory, cmd, *,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               universal_newlines=False,
                               shell=True, bufsize=0,
                               encoding=None, errors=None, text=None,
                               **kwargs):
        if not isinstance(cmd, (bytes, str)):
            raise ValueError("cmd must be a string")
        if universal_newlines:
            raise ValueError("universal_newlines must be False")
        if not shell:
            raise ValueError("shell must be True")
        if bufsize != 0:
            raise ValueError("bufsize must be 0")
        if text:
            raise ValueError("text must be False")
        if encoding is not None:
            raise ValueError("encoding must be None")
        if errors is not None:
            raise ValueError("errors must be None")

        protocol = protocol_factory()
        debug_log = None
        if self._debug:
            # don't log parameters: they may contain sensitive information
            # (password) and may be too long
            debug_log = 'run shell command %r' % cmd
            self._log_subprocess(debug_log, stdin, stdout, stderr)
        transport = await self._make_subprocess_transport(
            protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
        if self._debug and debug_log is not None:
            logger.info('%s: %r', debug_log, transport)
        return transport, protocol

    async def subprocess_exec(self, protocol_factory, program, *args,
                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, universal_newlines=False,
                              shell=False, bufsize=0,
                              encoding=None, errors=None, text=None,
                              **kwargs):
        if universal_newlines:
            raise ValueError("universal_newlines must be False")
        if shell:
            raise ValueError("shell must be False")
        if bufsize != 0:
            raise ValueError("bufsize must be 0")
        if text:
            raise ValueError("text must be False")
        if encoding is not None:
            raise ValueError("encoding must be None")
        if errors is not None:
            raise ValueError("errors must be None")

        popen_args = (program,) + args
        protocol = protocol_factory()
        debug_log = None
        if self._debug:
            # don't log parameters: they may contain sensitive information
            # (password) and may be too long
            debug_log = f'execute program {program!r}'
            self._log_subprocess(debug_log, stdin, stdout, stderr)
        transport = await self._make_subprocess_transport(
            protocol, popen_args, False, stdin, stdout, stderr,
            bufsize, **kwargs)
        if self._debug and debug_log is not None:
            logger.info('%s: %r', debug_log, transport)
        return transport, protocol

    def get_exception_handler(self):
        """Return an exception handler, or None if the default one is in use.
        """
        return self._exception_handler

    def set_exception_handler(self, handler):
        """Set handler as the new event loop exception handler.

        If handler is None, the default exception handler will
        be set.

        If handler is a callable object, it should have a
        signature matching '(loop, context)', where 'loop'
        will be a reference to the active event loop, 'context'
        will be a dict object (see `call_exception_handler()`
        documentation for details about context).
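
        Example (an illustrative sketch, not part of the original docs):

            def handler(loop, context):
                # inspect context, then fall back to the default behavior
                loop.default_exception_handler(context)

            loop.set_exception_handler(handler)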
1717 """ 1718 if handler is not None and not callable(handler): 1719 raise TypeError(f'A callable object or None is expected, ' 1720 f'got {handler!r}') 1721 self._exception_handler = handler 1722 1723 def default_exception_handler(self, context): 1724 """Default exception handler. 1725 1726 This is called when an exception occurs and no exception 1727 handler is set, and can be called by a custom exception 1728 handler that wants to defer to the default behavior. 1729 1730 This default handler logs the error message and other 1731 context-dependent information. In debug mode, a truncated 1732 stack trace is also appended showing where the given object 1733 (e.g. a handle or future or task) was created, if any. 1734 1735 The context parameter has the same meaning as in 1736 `call_exception_handler()`. 1737 """ 1738 message = context.get('message') 1739 if not message: 1740 message = 'Unhandled exception in event loop' 1741 1742 exception = context.get('exception') 1743 if exception is not None: 1744 exc_info = (type(exception), exception, exception.__traceback__) 1745 else: 1746 exc_info = False 1747 1748 if ('source_traceback' not in context and 1749 self._current_handle is not None and 1750 self._current_handle._source_traceback): 1751 context['handle_traceback'] = \ 1752 self._current_handle._source_traceback 1753 1754 log_lines = [message] 1755 for key in sorted(context): 1756 if key in {'message', 'exception'}: 1757 continue 1758 value = context[key] 1759 if key == 'source_traceback': 1760 tb = ''.join(traceback.format_list(value)) 1761 value = 'Object created at (most recent call last):\n' 1762 value += tb.rstrip() 1763 elif key == 'handle_traceback': 1764 tb = ''.join(traceback.format_list(value)) 1765 value = 'Handle created at (most recent call last):\n' 1766 value += tb.rstrip() 1767 else: 1768 value = repr(value) 1769 log_lines.append(f'{key}: {value}') 1770 1771 logger.error('\n'.join(log_lines), exc_info=exc_info) 1772 1773 def call_exception_handler(self, context): 1774 """Call the current event loop's exception handler. 1775 1776 The context argument is a dict containing the following keys: 1777 1778 - 'message': Error message; 1779 - 'exception' (optional): Exception object; 1780 - 'future' (optional): Future instance; 1781 - 'task' (optional): Task instance; 1782 - 'handle' (optional): Handle instance; 1783 - 'protocol' (optional): Protocol instance; 1784 - 'transport' (optional): Transport instance; 1785 - 'socket' (optional): Socket instance; 1786 - 'asyncgen' (optional): Asynchronous generator that caused 1787 the exception. 1788 1789 New keys maybe introduced in the future. 1790 1791 Note: do not overload this method in an event loop subclass. 1792 For custom exception handling, use the 1793 `set_exception_handler()` method. 1794 """ 1795 if self._exception_handler is None: 1796 try: 1797 self.default_exception_handler(context) 1798 except (SystemExit, KeyboardInterrupt): 1799 raise 1800 except BaseException: 1801 # Second protection layer for unexpected errors 1802 # in the default implementation, as well as for subclassed 1803 # event loops with overloaded "default_exception_handler". 1804 logger.error('Exception in default exception handler', 1805 exc_info=True) 1806 else: 1807 try: 1808 self._exception_handler(self, context) 1809 except (SystemExit, KeyboardInterrupt): 1810 raise 1811 except BaseException as exc: 1812 # Exception in the user set custom exception handler. 1813 try: 1814 # Let's try default handler. 
                    self.default_exception_handler({
                        'message': 'Unhandled error in exception handler',
                        'exception': exc,
                        'context': context,
                    })
                except (SystemExit, KeyboardInterrupt):
                    raise
                except BaseException:
                    # Guard 'default_exception_handler' in case it is
                    # overloaded.
                    logger.error('Exception in default exception handler '
                                 'while handling an unexpected error '
                                 'in custom exception handler',
                                 exc_info=True)

    def _add_callback(self, handle):
        """Add a Handle to _ready."""
        if not handle._cancelled:
            self._ready.append(handle)

    def _add_callback_signalsafe(self, handle):
        """Like _add_callback() but called from a signal handler."""
        self._add_callback(handle)
        self._write_to_self()

    def _timer_handle_cancelled(self, handle):
        """Notification that a TimerHandle has been cancelled."""
        if handle._scheduled:
            self._timer_cancelled_count += 1

    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """

        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
                self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

        event_list = self._selector.select(timeout)
        self._process_events(event_list)
        # Needed to break cycles when an exception occurs.
        event_list = None

        # Handle 'later' callbacks that are ready.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
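        # Taking len() once and popping exactly that many items relies on
        # deque.append()/popleft() being atomic, so no lock is needed.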
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

    def _set_coroutine_origin_tracking(self, enabled):
        if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
            return

        if enabled:
            self._coroutine_origin_tracking_saved_depth = (
                sys.get_coroutine_origin_tracking_depth())
            sys.set_coroutine_origin_tracking_depth(
                constants.DEBUG_STACK_DEPTH)
        else:
            sys.set_coroutine_origin_tracking_depth(
                self._coroutine_origin_tracking_saved_depth)

        self._coroutine_origin_tracking_enabled = enabled

    def get_debug(self):
        return self._debug

    def set_debug(self, enabled):
        self._debug = enabled

        if self.is_running():
            self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)
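

# Illustrative sketch (not part of the module): applications normally drive a
# concrete subclass of BaseEventLoop through asyncio's public helpers, e.g.
#
#     import asyncio
#
#     async def main():
#         loop = asyncio.get_running_loop()
#         loop.call_soon(print, 'hello from BaseEventLoop._run_once')
#         await asyncio.sleep(0)
#
#     asyncio.run(main())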