1"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of I/O events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called.  This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16import collections
17import collections.abc
18import concurrent.futures
19import functools
20import heapq
21import itertools
22import os
23import socket
24import stat
25import subprocess
26import threading
27import time
28import traceback
29import sys
30import warnings
31import weakref
32
33try:
34    import ssl
35except ImportError:  # pragma: no cover
36    ssl = None
37
38from . import constants
39from . import coroutines
40from . import events
41from . import exceptions
42from . import futures
43from . import protocols
44from . import sslproto
45from . import staggered
46from . import tasks
47from . import transports
48from . import trsock
49from .log import logger
50
51
52__all__ = 'BaseEventLoop','Server',
53
54
55# Minimum number of _scheduled timer handles before cleanup of
56# cancelled handles is performed.
57_MIN_SCHEDULED_TIMER_HANDLES = 100
58
59# Minimum fraction of _scheduled timer handles that are cancelled
60# before cleanup of cancelled handles is performed.
61_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
62
63
64_HAS_IPv6 = hasattr(socket, 'AF_INET6')
65
66# Maximum timeout passed to select to avoid OS limitations
67MAXIMUM_SELECT_TIMEOUT = 24 * 3600
68
69
70def _format_handle(handle):
71    cb = handle._callback
72    if isinstance(getattr(cb, '__self__', None), tasks.Task):
73        # format the task
74        return repr(cb.__self__)
75    else:
76        return str(handle)
77
78
79def _format_pipe(fd):
80    if fd == subprocess.PIPE:
81        return '<pipe>'
82    elif fd == subprocess.STDOUT:
83        return '<stdout>'
84    else:
85        return repr(fd)
86
87
88def _set_reuseport(sock):
89    if not hasattr(socket, 'SO_REUSEPORT'):
90        raise ValueError('reuse_port not supported by socket module')
91    else:
92        try:
93            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
94        except OSError:
95            raise ValueError('reuse_port not supported by socket module, '
96                             'SO_REUSEPORT defined but not implemented.')
97
98
99def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0):
100    # Try to skip getaddrinfo if "host" is already an IP. Users might have
101    # handled name resolution in their own code and pass in resolved IPs.
102    if not hasattr(socket, 'inet_pton'):
103        return
104
105    if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \
106            host is None:
107        return None
108
109    if type == socket.SOCK_STREAM:
110        proto = socket.IPPROTO_TCP
111    elif type == socket.SOCK_DGRAM:
112        proto = socket.IPPROTO_UDP
113    else:
114        return None
115
116    if port is None:
117        port = 0
118    elif isinstance(port, bytes) and port == b'':
119        port = 0
120    elif isinstance(port, str) and port == '':
121        port = 0
122    else:
123        # If port's a service name like "http", don't skip getaddrinfo.
124        try:
125            port = int(port)
126        except (TypeError, ValueError):
127            return None
128
129    if family == socket.AF_UNSPEC:
130        afs = [socket.AF_INET]
131        if _HAS_IPv6:
132            afs.append(socket.AF_INET6)
133    else:
134        afs = [family]
135
136    if isinstance(host, bytes):
137        host = host.decode('idna')
138    if '%' in host:
139        # Linux's inet_pton doesn't accept an IPv6 zone index after host,
140        # like '::1%lo0'.
141        return None
142
143    for af in afs:
144        try:
145            socket.inet_pton(af, host)
146            # The host has already been resolved.
147            if _HAS_IPv6 and af == socket.AF_INET6:
148                return af, type, proto, '', (host, port, flowinfo, scopeid)
149            else:
150                return af, type, proto, '', (host, port)
151        except OSError:
152            pass
153
154    # "host" is not an IP address.
155    return None
156
157
158def _interleave_addrinfos(addrinfos, first_address_family_count=1):
159    """Interleave list of addrinfo tuples by family."""
160    # Group addresses by family
161    addrinfos_by_family = collections.OrderedDict()
162    for addr in addrinfos:
163        family = addr[0]
164        if family not in addrinfos_by_family:
165            addrinfos_by_family[family] = []
166        addrinfos_by_family[family].append(addr)
167    addrinfos_lists = list(addrinfos_by_family.values())
168
169    reordered = []
170    if first_address_family_count > 1:
171        reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
172        del addrinfos_lists[0][:first_address_family_count - 1]
173    reordered.extend(
174        a for a in itertools.chain.from_iterable(
175            itertools.zip_longest(*addrinfos_lists)
176        ) if a is not None)
177    return reordered
178
179
180def _run_until_complete_cb(fut):
181    if not fut.cancelled():
182        exc = fut.exception()
183        if isinstance(exc, (SystemExit, KeyboardInterrupt)):
184            # Issue #22429: run_forever() already finished, no need to
185            # stop it.
186            return
187    futures._get_loop(fut).stop()
188
189
190if hasattr(socket, 'TCP_NODELAY'):
191    def _set_nodelay(sock):
192        if (sock.family in {socket.AF_INET, socket.AF_INET6} and
193                sock.type == socket.SOCK_STREAM and
194                sock.proto == socket.IPPROTO_TCP):
195            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
196else:
197    def _set_nodelay(sock):
198        pass
199
200
201def _check_ssl_socket(sock):
202    if ssl is not None and isinstance(sock, ssl.SSLSocket):
203        raise TypeError("Socket cannot be of type SSLSocket")
204
205
206class _SendfileFallbackProtocol(protocols.Protocol):
207    def __init__(self, transp):
208        if not isinstance(transp, transports._FlowControlMixin):
209            raise TypeError("transport should be _FlowControlMixin instance")
210        self._transport = transp
211        self._proto = transp.get_protocol()
212        self._should_resume_reading = transp.is_reading()
213        self._should_resume_writing = transp._protocol_paused
214        transp.pause_reading()
215        transp.set_protocol(self)
216        if self._should_resume_writing:
217            self._write_ready_fut = self._transport._loop.create_future()
218        else:
219            self._write_ready_fut = None
220
221    async def drain(self):
222        if self._transport.is_closing():
223            raise ConnectionError("Connection closed by peer")
224        fut = self._write_ready_fut
225        if fut is None:
226            return
227        await fut
228
229    def connection_made(self, transport):
230        raise RuntimeError("Invalid state: "
231                           "connection should have been established already.")
232
233    def connection_lost(self, exc):
234        if self._write_ready_fut is not None:
235            # Never happens if peer disconnects after sending the whole content
236            # Thus disconnection is always an exception from user perspective
237            if exc is None:
238                self._write_ready_fut.set_exception(
239                    ConnectionError("Connection is closed by peer"))
240            else:
241                self._write_ready_fut.set_exception(exc)
242        self._proto.connection_lost(exc)
243
244    def pause_writing(self):
245        if self._write_ready_fut is not None:
246            return
247        self._write_ready_fut = self._transport._loop.create_future()
248
249    def resume_writing(self):
250        if self._write_ready_fut is None:
251            return
252        self._write_ready_fut.set_result(False)
253        self._write_ready_fut = None
254
255    def data_received(self, data):
256        raise RuntimeError("Invalid state: reading should be paused")
257
258    def eof_received(self):
259        raise RuntimeError("Invalid state: reading should be paused")
260
261    async def restore(self):
262        self._transport.set_protocol(self._proto)
263        if self._should_resume_reading:
264            self._transport.resume_reading()
265        if self._write_ready_fut is not None:
266            # Cancel the future.
267            # Basically it has no effect because protocol is switched back,
268            # no code should wait for it anymore.
269            self._write_ready_fut.cancel()
270        if self._should_resume_writing:
271            self._proto.resume_writing()
272
273
274class Server(events.AbstractServer):
275
276    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
277                 ssl_handshake_timeout, ssl_shutdown_timeout=None):
278        self._loop = loop
279        self._sockets = sockets
280        self._active_count = 0
281        self._waiters = []
282        self._protocol_factory = protocol_factory
283        self._backlog = backlog
284        self._ssl_context = ssl_context
285        self._ssl_handshake_timeout = ssl_handshake_timeout
286        self._ssl_shutdown_timeout = ssl_shutdown_timeout
287        self._serving = False
288        self._serving_forever_fut = None
289
290    def __repr__(self):
291        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
292
293    def _attach(self):
294        assert self._sockets is not None
295        self._active_count += 1
296
297    def _detach(self):
298        assert self._active_count > 0
299        self._active_count -= 1
300        if self._active_count == 0 and self._sockets is None:
301            self._wakeup()
302
303    def _wakeup(self):
304        waiters = self._waiters
305        self._waiters = None
306        for waiter in waiters:
307            if not waiter.done():
308                waiter.set_result(waiter)
309
310    def _start_serving(self):
311        if self._serving:
312            return
313        self._serving = True
314        for sock in self._sockets:
315            sock.listen(self._backlog)
316            self._loop._start_serving(
317                self._protocol_factory, sock, self._ssl_context,
318                self, self._backlog, self._ssl_handshake_timeout,
319                self._ssl_shutdown_timeout)
320
321    def get_loop(self):
322        return self._loop
323
324    def is_serving(self):
325        return self._serving
326
327    @property
328    def sockets(self):
329        if self._sockets is None:
330            return ()
331        return tuple(trsock.TransportSocket(s) for s in self._sockets)
332
333    def close(self):
334        sockets = self._sockets
335        if sockets is None:
336            return
337        self._sockets = None
338
339        for sock in sockets:
340            self._loop._stop_serving(sock)
341
342        self._serving = False
343
344        if (self._serving_forever_fut is not None and
345                not self._serving_forever_fut.done()):
346            self._serving_forever_fut.cancel()
347            self._serving_forever_fut = None
348
349        if self._active_count == 0:
350            self._wakeup()
351
352    async def start_serving(self):
353        self._start_serving()
354        # Skip one loop iteration so that all 'loop.add_reader'
355        # go through.
356        await tasks.sleep(0)
357
358    async def serve_forever(self):
359        if self._serving_forever_fut is not None:
360            raise RuntimeError(
361                f'server {self!r} is already being awaited on serve_forever()')
362        if self._sockets is None:
363            raise RuntimeError(f'server {self!r} is closed')
364
365        self._start_serving()
366        self._serving_forever_fut = self._loop.create_future()
367
368        try:
369            await self._serving_forever_fut
370        except exceptions.CancelledError:
371            try:
372                self.close()
373                await self.wait_closed()
374            finally:
375                raise
376        finally:
377            self._serving_forever_fut = None
378
379    async def wait_closed(self):
380        if self._sockets is None or self._waiters is None:
381            return
382        waiter = self._loop.create_future()
383        self._waiters.append(waiter)
384        await waiter
385
386
387class BaseEventLoop(events.AbstractEventLoop):
388
389    def __init__(self):
390        self._timer_cancelled_count = 0
391        self._closed = False
392        self._stopping = False
393        self._ready = collections.deque()
394        self._scheduled = []
395        self._default_executor = None
396        self._internal_fds = 0
397        # Identifier of the thread running the event loop, or None if the
398        # event loop is not running
399        self._thread_id = None
400        self._clock_resolution = time.get_clock_info('monotonic').resolution
401        self._exception_handler = None
402        self.set_debug(coroutines._is_debug_mode())
403        # In debug mode, if the execution of a callback or a step of a task
404        # exceed this duration in seconds, the slow callback/task is logged.
405        self.slow_callback_duration = 0.1
406        self._current_handle = None
407        self._task_factory = None
408        self._coroutine_origin_tracking_enabled = False
409        self._coroutine_origin_tracking_saved_depth = None
410
411        # A weak set of all asynchronous generators that are
412        # being iterated by the loop.
413        self._asyncgens = weakref.WeakSet()
414        # Set to True when `loop.shutdown_asyncgens` is called.
415        self._asyncgens_shutdown_called = False
416        # Set to True when `loop.shutdown_default_executor` is called.
417        self._executor_shutdown_called = False
418
419    def __repr__(self):
420        return (
421            f'<{self.__class__.__name__} running={self.is_running()} '
422            f'closed={self.is_closed()} debug={self.get_debug()}>'
423        )
424
425    def create_future(self):
426        """Create a Future object attached to the loop."""
427        return futures.Future(loop=self)
428
429    def create_task(self, coro, *, name=None, context=None):
430        """Schedule a coroutine object.
431
432        Return a task object.
433        """
434        self._check_closed()
435        if self._task_factory is None:
436            task = tasks.Task(coro, loop=self, name=name, context=context)
437            if task._source_traceback:
438                del task._source_traceback[-1]
439        else:
440            if context is None:
441                # Use legacy API if context is not needed
442                task = self._task_factory(self, coro)
443            else:
444                task = self._task_factory(self, coro, context=context)
445
446            tasks._set_task_name(task, name)
447
448        return task
449
450    def set_task_factory(self, factory):
451        """Set a task factory that will be used by loop.create_task().
452
453        If factory is None the default task factory will be set.
454
455        If factory is a callable, it should have a signature matching
456        '(loop, coro)', where 'loop' will be a reference to the active
457        event loop, 'coro' will be a coroutine object.  The callable
458        must return a Future.
459        """
460        if factory is not None and not callable(factory):
461            raise TypeError('task factory must be a callable or None')
462        self._task_factory = factory
463
464    def get_task_factory(self):
465        """Return a task factory, or None if the default one is in use."""
466        return self._task_factory
467
468    def _make_socket_transport(self, sock, protocol, waiter=None, *,
469                               extra=None, server=None):
470        """Create socket transport."""
471        raise NotImplementedError
472
473    def _make_ssl_transport(
474            self, rawsock, protocol, sslcontext, waiter=None,
475            *, server_side=False, server_hostname=None,
476            extra=None, server=None,
477            ssl_handshake_timeout=None,
478            ssl_shutdown_timeout=None,
479            call_connection_made=True):
480        """Create SSL transport."""
481        raise NotImplementedError
482
483    def _make_datagram_transport(self, sock, protocol,
484                                 address=None, waiter=None, extra=None):
485        """Create datagram transport."""
486        raise NotImplementedError
487
488    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
489                                  extra=None):
490        """Create read pipe transport."""
491        raise NotImplementedError
492
493    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
494                                   extra=None):
495        """Create write pipe transport."""
496        raise NotImplementedError
497
498    async def _make_subprocess_transport(self, protocol, args, shell,
499                                         stdin, stdout, stderr, bufsize,
500                                         extra=None, **kwargs):
501        """Create subprocess transport."""
502        raise NotImplementedError
503
504    def _write_to_self(self):
505        """Write a byte to self-pipe, to wake up the event loop.
506
507        This may be called from a different thread.
508
509        The subclass is responsible for implementing the self-pipe.
510        """
511        raise NotImplementedError
512
513    def _process_events(self, event_list):
514        """Process selector events."""
515        raise NotImplementedError
516
517    def _check_closed(self):
518        if self._closed:
519            raise RuntimeError('Event loop is closed')
520
521    def _check_default_executor(self):
522        if self._executor_shutdown_called:
523            raise RuntimeError('Executor shutdown has been called')
524
525    def _asyncgen_finalizer_hook(self, agen):
526        self._asyncgens.discard(agen)
527        if not self.is_closed():
528            self.call_soon_threadsafe(self.create_task, agen.aclose())
529
530    def _asyncgen_firstiter_hook(self, agen):
531        if self._asyncgens_shutdown_called:
532            warnings.warn(
533                f"asynchronous generator {agen!r} was scheduled after "
534                f"loop.shutdown_asyncgens() call",
535                ResourceWarning, source=self)
536
537        self._asyncgens.add(agen)
538
539    async def shutdown_asyncgens(self):
540        """Shutdown all active asynchronous generators."""
541        self._asyncgens_shutdown_called = True
542
543        if not len(self._asyncgens):
544            # If Python version is <3.6 or we don't have any asynchronous
545            # generators alive.
546            return
547
548        closing_agens = list(self._asyncgens)
549        self._asyncgens.clear()
550
551        results = await tasks.gather(
552            *[ag.aclose() for ag in closing_agens],
553            return_exceptions=True)
554
555        for result, agen in zip(results, closing_agens):
556            if isinstance(result, Exception):
557                self.call_exception_handler({
558                    'message': f'an error occurred during closing of '
559                               f'asynchronous generator {agen!r}',
560                    'exception': result,
561                    'asyncgen': agen
562                })
563
564    async def shutdown_default_executor(self):
565        """Schedule the shutdown of the default executor."""
566        self._executor_shutdown_called = True
567        if self._default_executor is None:
568            return
569        future = self.create_future()
570        thread = threading.Thread(target=self._do_shutdown, args=(future,))
571        thread.start()
572        try:
573            await future
574        finally:
575            thread.join()
576
577    def _do_shutdown(self, future):
578        try:
579            self._default_executor.shutdown(wait=True)
580            if not self.is_closed():
581                self.call_soon_threadsafe(future.set_result, None)
582        except Exception as ex:
583            if not self.is_closed():
584                self.call_soon_threadsafe(future.set_exception, ex)
585
586    def _check_running(self):
587        if self.is_running():
588            raise RuntimeError('This event loop is already running')
589        if events._get_running_loop() is not None:
590            raise RuntimeError(
591                'Cannot run the event loop while another loop is running')
592
593    def run_forever(self):
594        """Run until stop() is called."""
595        self._check_closed()
596        self._check_running()
597        self._set_coroutine_origin_tracking(self._debug)
598
599        old_agen_hooks = sys.get_asyncgen_hooks()
600        try:
601            self._thread_id = threading.get_ident()
602            sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
603                                   finalizer=self._asyncgen_finalizer_hook)
604
605            events._set_running_loop(self)
606            while True:
607                self._run_once()
608                if self._stopping:
609                    break
610        finally:
611            self._stopping = False
612            self._thread_id = None
613            events._set_running_loop(None)
614            self._set_coroutine_origin_tracking(False)
615            sys.set_asyncgen_hooks(*old_agen_hooks)
616
617    def run_until_complete(self, future):
618        """Run until the Future is done.
619
620        If the argument is a coroutine, it is wrapped in a Task.
621
622        WARNING: It would be disastrous to call run_until_complete()
623        with the same coroutine twice -- it would wrap it in two
624        different Tasks and that can't be good.
625
626        Return the Future's result, or raise its exception.
627        """
628        self._check_closed()
629        self._check_running()
630
631        new_task = not futures.isfuture(future)
632        future = tasks.ensure_future(future, loop=self)
633        if new_task:
634            # An exception is raised if the future didn't complete, so there
635            # is no need to log the "destroy pending task" message
636            future._log_destroy_pending = False
637
638        future.add_done_callback(_run_until_complete_cb)
639        try:
640            self.run_forever()
641        except:
642            if new_task and future.done() and not future.cancelled():
643                # The coroutine raised a BaseException. Consume the exception
644                # to not log a warning, the caller doesn't have access to the
645                # local task.
646                future.exception()
647            raise
648        finally:
649            future.remove_done_callback(_run_until_complete_cb)
650        if not future.done():
651            raise RuntimeError('Event loop stopped before Future completed.')
652
653        return future.result()
654
655    def stop(self):
656        """Stop running the event loop.
657
658        Every callback already scheduled will still run.  This simply informs
659        run_forever to stop looping after a complete iteration.
660        """
661        self._stopping = True
662
663    def close(self):
664        """Close the event loop.
665
666        This clears the queues and shuts down the executor,
667        but does not wait for the executor to finish.
668
669        The event loop must not be running.
670        """
671        if self.is_running():
672            raise RuntimeError("Cannot close a running event loop")
673        if self._closed:
674            return
675        if self._debug:
676            logger.debug("Close %r", self)
677        self._closed = True
678        self._ready.clear()
679        self._scheduled.clear()
680        self._executor_shutdown_called = True
681        executor = self._default_executor
682        if executor is not None:
683            self._default_executor = None
684            executor.shutdown(wait=False)
685
686    def is_closed(self):
687        """Returns True if the event loop was closed."""
688        return self._closed
689
690    def __del__(self, _warn=warnings.warn):
691        if not self.is_closed():
692            _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self)
693            if not self.is_running():
694                self.close()
695
696    def is_running(self):
697        """Returns True if the event loop is running."""
698        return (self._thread_id is not None)
699
700    def time(self):
701        """Return the time according to the event loop's clock.
702
703        This is a float expressed in seconds since an epoch, but the
704        epoch, precision, accuracy and drift are unspecified and may
705        differ per event loop.
706        """
707        return time.monotonic()
708
709    def call_later(self, delay, callback, *args, context=None):
710        """Arrange for a callback to be called at a given time.
711
712        Return a Handle: an opaque object with a cancel() method that
713        can be used to cancel the call.
714
715        The delay can be an int or float, expressed in seconds.  It is
716        always relative to the current time.
717
718        Each callback will be called exactly once.  If two callbacks
719        are scheduled for exactly the same time, it undefined which
720        will be called first.
721
722        Any positional arguments after the callback will be passed to
723        the callback when it is called.
724        """
725        if delay is None:
726            raise TypeError('delay must not be None')
727        timer = self.call_at(self.time() + delay, callback, *args,
728                             context=context)
729        if timer._source_traceback:
730            del timer._source_traceback[-1]
731        return timer
732
733    def call_at(self, when, callback, *args, context=None):
734        """Like call_later(), but uses an absolute time.
735
736        Absolute time corresponds to the event loop's time() method.
737        """
738        if when is None:
739            raise TypeError("when cannot be None")
740        self._check_closed()
741        if self._debug:
742            self._check_thread()
743            self._check_callback(callback, 'call_at')
744        timer = events.TimerHandle(when, callback, args, self, context)
745        if timer._source_traceback:
746            del timer._source_traceback[-1]
747        heapq.heappush(self._scheduled, timer)
748        timer._scheduled = True
749        return timer
750
751    def call_soon(self, callback, *args, context=None):
752        """Arrange for a callback to be called as soon as possible.
753
754        This operates as a FIFO queue: callbacks are called in the
755        order in which they are registered.  Each callback will be
756        called exactly once.
757
758        Any positional arguments after the callback will be passed to
759        the callback when it is called.
760        """
761        self._check_closed()
762        if self._debug:
763            self._check_thread()
764            self._check_callback(callback, 'call_soon')
765        handle = self._call_soon(callback, args, context)
766        if handle._source_traceback:
767            del handle._source_traceback[-1]
768        return handle
769
770    def _check_callback(self, callback, method):
771        if (coroutines.iscoroutine(callback) or
772                coroutines.iscoroutinefunction(callback)):
773            raise TypeError(
774                f"coroutines cannot be used with {method}()")
775        if not callable(callback):
776            raise TypeError(
777                f'a callable object was expected by {method}(), '
778                f'got {callback!r}')
779
780    def _call_soon(self, callback, args, context):
781        handle = events.Handle(callback, args, self, context)
782        if handle._source_traceback:
783            del handle._source_traceback[-1]
784        self._ready.append(handle)
785        return handle
786
787    def _check_thread(self):
788        """Check that the current thread is the thread running the event loop.
789
790        Non-thread-safe methods of this class make this assumption and will
791        likely behave incorrectly when the assumption is violated.
792
793        Should only be called when (self._debug == True).  The caller is
794        responsible for checking this condition for performance reasons.
795        """
796        if self._thread_id is None:
797            return
798        thread_id = threading.get_ident()
799        if thread_id != self._thread_id:
800            raise RuntimeError(
801                "Non-thread-safe operation invoked on an event loop other "
802                "than the current one")
803
804    def call_soon_threadsafe(self, callback, *args, context=None):
805        """Like call_soon(), but thread-safe."""
806        self._check_closed()
807        if self._debug:
808            self._check_callback(callback, 'call_soon_threadsafe')
809        handle = self._call_soon(callback, args, context)
810        if handle._source_traceback:
811            del handle._source_traceback[-1]
812        self._write_to_self()
813        return handle
814
815    def run_in_executor(self, executor, func, *args):
816        self._check_closed()
817        if self._debug:
818            self._check_callback(func, 'run_in_executor')
819        if executor is None:
820            executor = self._default_executor
821            # Only check when the default executor is being used
822            self._check_default_executor()
823            if executor is None:
824                executor = concurrent.futures.ThreadPoolExecutor(
825                    thread_name_prefix='asyncio'
826                )
827                self._default_executor = executor
828        return futures.wrap_future(
829            executor.submit(func, *args), loop=self)
830
831    def set_default_executor(self, executor):
832        if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
833            raise TypeError('executor must be ThreadPoolExecutor instance')
834        self._default_executor = executor
835
836    def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
837        msg = [f"{host}:{port!r}"]
838        if family:
839            msg.append(f'family={family!r}')
840        if type:
841            msg.append(f'type={type!r}')
842        if proto:
843            msg.append(f'proto={proto!r}')
844        if flags:
845            msg.append(f'flags={flags!r}')
846        msg = ', '.join(msg)
847        logger.debug('Get address info %s', msg)
848
849        t0 = self.time()
850        addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
851        dt = self.time() - t0
852
853        msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}'
854        if dt >= self.slow_callback_duration:
855            logger.info(msg)
856        else:
857            logger.debug(msg)
858        return addrinfo
859
860    async def getaddrinfo(self, host, port, *,
861                          family=0, type=0, proto=0, flags=0):
862        if self._debug:
863            getaddr_func = self._getaddrinfo_debug
864        else:
865            getaddr_func = socket.getaddrinfo
866
867        return await self.run_in_executor(
868            None, getaddr_func, host, port, family, type, proto, flags)
869
870    async def getnameinfo(self, sockaddr, flags=0):
871        return await self.run_in_executor(
872            None, socket.getnameinfo, sockaddr, flags)
873
874    async def sock_sendfile(self, sock, file, offset=0, count=None,
875                            *, fallback=True):
876        if self._debug and sock.gettimeout() != 0:
877            raise ValueError("the socket must be non-blocking")
878        _check_ssl_socket(sock)
879        self._check_sendfile_params(sock, file, offset, count)
880        try:
881            return await self._sock_sendfile_native(sock, file,
882                                                    offset, count)
883        except exceptions.SendfileNotAvailableError as exc:
884            if not fallback:
885                raise
886        return await self._sock_sendfile_fallback(sock, file,
887                                                  offset, count)
888
889    async def _sock_sendfile_native(self, sock, file, offset, count):
890        # NB: sendfile syscall is not supported for SSL sockets and
891        # non-mmap files even if sendfile is supported by OS
892        raise exceptions.SendfileNotAvailableError(
893            f"syscall sendfile is not available for socket {sock!r} "
894            f"and file {file!r} combination")
895
896    async def _sock_sendfile_fallback(self, sock, file, offset, count):
897        if offset:
898            file.seek(offset)
899        blocksize = (
900            min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
901            if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
902        )
903        buf = bytearray(blocksize)
904        total_sent = 0
905        try:
906            while True:
907                if count:
908                    blocksize = min(count - total_sent, blocksize)
909                    if blocksize <= 0:
910                        break
911                view = memoryview(buf)[:blocksize]
912                read = await self.run_in_executor(None, file.readinto, view)
913                if not read:
914                    break  # EOF
915                await self.sock_sendall(sock, view[:read])
916                total_sent += read
917            return total_sent
918        finally:
919            if total_sent > 0 and hasattr(file, 'seek'):
920                file.seek(offset + total_sent)
921
922    def _check_sendfile_params(self, sock, file, offset, count):
923        if 'b' not in getattr(file, 'mode', 'b'):
924            raise ValueError("file should be opened in binary mode")
925        if not sock.type == socket.SOCK_STREAM:
926            raise ValueError("only SOCK_STREAM type sockets are supported")
927        if count is not None:
928            if not isinstance(count, int):
929                raise TypeError(
930                    "count must be a positive integer (got {!r})".format(count))
931            if count <= 0:
932                raise ValueError(
933                    "count must be a positive integer (got {!r})".format(count))
934        if not isinstance(offset, int):
935            raise TypeError(
936                "offset must be a non-negative integer (got {!r})".format(
937                    offset))
938        if offset < 0:
939            raise ValueError(
940                "offset must be a non-negative integer (got {!r})".format(
941                    offset))
942
943    async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
944        """Create, bind and connect one socket."""
945        my_exceptions = []
946        exceptions.append(my_exceptions)
947        family, type_, proto, _, address = addr_info
948        sock = None
949        try:
950            sock = socket.socket(family=family, type=type_, proto=proto)
951            sock.setblocking(False)
952            if local_addr_infos is not None:
953                for lfamily, _, _, _, laddr in local_addr_infos:
954                    # skip local addresses of different family
955                    if lfamily != family:
956                        continue
957                    try:
958                        sock.bind(laddr)
959                        break
960                    except OSError as exc:
961                        msg = (
962                            f'error while attempting to bind on '
963                            f'address {laddr!r}: '
964                            f'{exc.strerror.lower()}'
965                        )
966                        exc = OSError(exc.errno, msg)
967                        my_exceptions.append(exc)
968                else:  # all bind attempts failed
969                    if my_exceptions:
970                        raise my_exceptions.pop()
971                    else:
972                        raise OSError(f"no matching local address with {family=} found")
973            await self.sock_connect(sock, address)
974            return sock
975        except OSError as exc:
976            my_exceptions.append(exc)
977            if sock is not None:
978                sock.close()
979            raise
980        except:
981            if sock is not None:
982                sock.close()
983            raise
984        finally:
985            exceptions = my_exceptions = None
986
987    async def create_connection(
988            self, protocol_factory, host=None, port=None,
989            *, ssl=None, family=0,
990            proto=0, flags=0, sock=None,
991            local_addr=None, server_hostname=None,
992            ssl_handshake_timeout=None,
993            ssl_shutdown_timeout=None,
994            happy_eyeballs_delay=None, interleave=None):
995        """Connect to a TCP server.
996
997        Create a streaming transport connection to a given internet host and
998        port: socket family AF_INET or socket.AF_INET6 depending on host (or
999        family if specified), socket type SOCK_STREAM. protocol_factory must be
1000        a callable returning a protocol instance.
1001
1002        This method is a coroutine which will try to establish the connection
1003        in the background.  When successful, the coroutine returns a
1004        (transport, protocol) pair.
1005        """
1006        if server_hostname is not None and not ssl:
1007            raise ValueError('server_hostname is only meaningful with ssl')
1008
1009        if server_hostname is None and ssl:
1010            # Use host as default for server_hostname.  It is an error
1011            # if host is empty or not set, e.g. when an
1012            # already-connected socket was passed or when only a port
1013            # is given.  To avoid this error, you can pass
1014            # server_hostname='' -- this will bypass the hostname
1015            # check.  (This also means that if host is a numeric
1016            # IP/IPv6 address, we will attempt to verify that exact
1017            # address; this will probably fail, but it is possible to
1018            # create a certificate for a specific IP address, so we
1019            # don't judge it here.)
1020            if not host:
1021                raise ValueError('You must set server_hostname '
1022                                 'when using ssl without a host')
1023            server_hostname = host
1024
1025        if ssl_handshake_timeout is not None and not ssl:
1026            raise ValueError(
1027                'ssl_handshake_timeout is only meaningful with ssl')
1028
1029        if ssl_shutdown_timeout is not None and not ssl:
1030            raise ValueError(
1031                'ssl_shutdown_timeout is only meaningful with ssl')
1032
1033        if sock is not None:
1034            _check_ssl_socket(sock)
1035
1036        if happy_eyeballs_delay is not None and interleave is None:
1037            # If using happy eyeballs, default to interleave addresses by family
1038            interleave = 1
1039
1040        if host is not None or port is not None:
1041            if sock is not None:
1042                raise ValueError(
1043                    'host/port and sock can not be specified at the same time')
1044
1045            infos = await self._ensure_resolved(
1046                (host, port), family=family,
1047                type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
1048            if not infos:
1049                raise OSError('getaddrinfo() returned empty list')
1050
1051            if local_addr is not None:
1052                laddr_infos = await self._ensure_resolved(
1053                    local_addr, family=family,
1054                    type=socket.SOCK_STREAM, proto=proto,
1055                    flags=flags, loop=self)
1056                if not laddr_infos:
1057                    raise OSError('getaddrinfo() returned empty list')
1058            else:
1059                laddr_infos = None
1060
1061            if interleave:
1062                infos = _interleave_addrinfos(infos, interleave)
1063
1064            exceptions = []
1065            if happy_eyeballs_delay is None:
1066                # not using happy eyeballs
1067                for addrinfo in infos:
1068                    try:
1069                        sock = await self._connect_sock(
1070                            exceptions, addrinfo, laddr_infos)
1071                        break
1072                    except OSError:
1073                        continue
1074            else:  # using happy eyeballs
1075                sock, _, _ = await staggered.staggered_race(
1076                    (functools.partial(self._connect_sock,
1077                                       exceptions, addrinfo, laddr_infos)
1078                     for addrinfo in infos),
1079                    happy_eyeballs_delay, loop=self)
1080
1081            if sock is None:
1082                exceptions = [exc for sub in exceptions for exc in sub]
1083                try:
1084                    if len(exceptions) == 1:
1085                        raise exceptions[0]
1086                    else:
1087                        # If they all have the same str(), raise one.
1088                        model = str(exceptions[0])
1089                        if all(str(exc) == model for exc in exceptions):
1090                            raise exceptions[0]
1091                        # Raise a combined exception so the user can see all
1092                        # the various error messages.
1093                        raise OSError('Multiple exceptions: {}'.format(
1094                            ', '.join(str(exc) for exc in exceptions)))
1095                finally:
1096                    exceptions = None
1097
1098        else:
1099            if sock is None:
1100                raise ValueError(
1101                    'host and port was not specified and no sock specified')
1102            if sock.type != socket.SOCK_STREAM:
1103                # We allow AF_INET, AF_INET6, AF_UNIX as long as they
1104                # are SOCK_STREAM.
1105                # We support passing AF_UNIX sockets even though we have
1106                # a dedicated API for that: create_unix_connection.
1107                # Disallowing AF_UNIX in this method, breaks backwards
1108                # compatibility.
1109                raise ValueError(
1110                    f'A Stream Socket was expected, got {sock!r}')
1111
1112        transport, protocol = await self._create_connection_transport(
1113            sock, protocol_factory, ssl, server_hostname,
1114            ssl_handshake_timeout=ssl_handshake_timeout,
1115            ssl_shutdown_timeout=ssl_shutdown_timeout)
1116        if self._debug:
1117            # Get the socket from the transport because SSL transport closes
1118            # the old socket and creates a new SSL socket
1119            sock = transport.get_extra_info('socket')
1120            logger.debug("%r connected to %s:%r: (%r, %r)",
1121                         sock, host, port, transport, protocol)
1122        return transport, protocol
1123
1124    async def _create_connection_transport(
1125            self, sock, protocol_factory, ssl,
1126            server_hostname, server_side=False,
1127            ssl_handshake_timeout=None,
1128            ssl_shutdown_timeout=None):
1129
1130        sock.setblocking(False)
1131
1132        protocol = protocol_factory()
1133        waiter = self.create_future()
1134        if ssl:
1135            sslcontext = None if isinstance(ssl, bool) else ssl
1136            transport = self._make_ssl_transport(
1137                sock, protocol, sslcontext, waiter,
1138                server_side=server_side, server_hostname=server_hostname,
1139                ssl_handshake_timeout=ssl_handshake_timeout,
1140                ssl_shutdown_timeout=ssl_shutdown_timeout)
1141        else:
1142            transport = self._make_socket_transport(sock, protocol, waiter)
1143
1144        try:
1145            await waiter
1146        except:
1147            transport.close()
1148            raise
1149
1150        return transport, protocol
1151
1152    async def sendfile(self, transport, file, offset=0, count=None,
1153                       *, fallback=True):
1154        """Send a file to transport.
1155
1156        Return the total number of bytes which were sent.
1157
1158        The method uses high-performance os.sendfile if available.
1159
1160        file must be a regular file object opened in binary mode.
1161
1162        offset tells from where to start reading the file. If specified,
1163        count is the total number of bytes to transmit as opposed to
1164        sending the file until EOF is reached. File position is updated on
1165        return or also in case of error in which case file.tell()
1166        can be used to figure out the number of bytes
1167        which were sent.
1168
1169        fallback set to True makes asyncio to manually read and send
1170        the file when the platform does not support the sendfile syscall
1171        (e.g. Windows or SSL socket on Unix).
1172
1173        Raise SendfileNotAvailableError if the system does not support
1174        sendfile syscall and fallback is False.
1175        """
1176        if transport.is_closing():
1177            raise RuntimeError("Transport is closing")
1178        mode = getattr(transport, '_sendfile_compatible',
1179                       constants._SendfileMode.UNSUPPORTED)
1180        if mode is constants._SendfileMode.UNSUPPORTED:
1181            raise RuntimeError(
1182                f"sendfile is not supported for transport {transport!r}")
1183        if mode is constants._SendfileMode.TRY_NATIVE:
1184            try:
1185                return await self._sendfile_native(transport, file,
1186                                                   offset, count)
1187            except exceptions.SendfileNotAvailableError as exc:
1188                if not fallback:
1189                    raise
1190
1191        if not fallback:
1192            raise RuntimeError(
1193                f"fallback is disabled and native sendfile is not "
1194                f"supported for transport {transport!r}")
1195
1196        return await self._sendfile_fallback(transport, file,
1197                                             offset, count)
1198
1199    async def _sendfile_native(self, transp, file, offset, count):
1200        raise exceptions.SendfileNotAvailableError(
1201            "sendfile syscall is not supported")
1202
1203    async def _sendfile_fallback(self, transp, file, offset, count):
1204        if offset:
1205            file.seek(offset)
1206        blocksize = min(count, 16384) if count else 16384
1207        buf = bytearray(blocksize)
1208        total_sent = 0
1209        proto = _SendfileFallbackProtocol(transp)
1210        try:
1211            while True:
1212                if count:
1213                    blocksize = min(count - total_sent, blocksize)
1214                    if blocksize <= 0:
1215                        return total_sent
1216                view = memoryview(buf)[:blocksize]
1217                read = await self.run_in_executor(None, file.readinto, view)
1218                if not read:
1219                    return total_sent  # EOF
1220                await proto.drain()
1221                transp.write(view[:read])
1222                total_sent += read
1223        finally:
1224            if total_sent > 0 and hasattr(file, 'seek'):
1225                file.seek(offset + total_sent)
1226            await proto.restore()
1227
1228    async def start_tls(self, transport, protocol, sslcontext, *,
1229                        server_side=False,
1230                        server_hostname=None,
1231                        ssl_handshake_timeout=None,
1232                        ssl_shutdown_timeout=None):
1233        """Upgrade transport to TLS.
1234
1235        Return a new transport that *protocol* should start using
1236        immediately.
1237        """
1238        if ssl is None:
1239            raise RuntimeError('Python ssl module is not available')
1240
1241        if not isinstance(sslcontext, ssl.SSLContext):
1242            raise TypeError(
1243                f'sslcontext is expected to be an instance of ssl.SSLContext, '
1244                f'got {sslcontext!r}')
1245
1246        if not getattr(transport, '_start_tls_compatible', False):
1247            raise TypeError(
1248                f'transport {transport!r} is not supported by start_tls()')
1249
1250        waiter = self.create_future()
1251        ssl_protocol = sslproto.SSLProtocol(
1252            self, protocol, sslcontext, waiter,
1253            server_side, server_hostname,
1254            ssl_handshake_timeout=ssl_handshake_timeout,
1255            ssl_shutdown_timeout=ssl_shutdown_timeout,
1256            call_connection_made=False)
1257
1258        # Pause early so that "ssl_protocol.data_received()" doesn't
1259        # have a chance to get called before "ssl_protocol.connection_made()".
1260        transport.pause_reading()
1261
1262        transport.set_protocol(ssl_protocol)
1263        conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
1264        resume_cb = self.call_soon(transport.resume_reading)
1265
1266        try:
1267            await waiter
1268        except BaseException:
1269            transport.close()
1270            conmade_cb.cancel()
1271            resume_cb.cancel()
1272            raise
1273
1274        return ssl_protocol._app_transport
1275
1276    async def create_datagram_endpoint(self, protocol_factory,
1277                                       local_addr=None, remote_addr=None, *,
1278                                       family=0, proto=0, flags=0,
1279                                       reuse_port=None,
1280                                       allow_broadcast=None, sock=None):
1281        """Create datagram connection."""
1282        if sock is not None:
1283            if sock.type != socket.SOCK_DGRAM:
1284                raise ValueError(
1285                    f'A UDP Socket was expected, got {sock!r}')
1286            if (local_addr or remote_addr or
1287                    family or proto or flags or
1288                    reuse_port or allow_broadcast):
1289                # show the problematic kwargs in exception msg
1290                opts = dict(local_addr=local_addr, remote_addr=remote_addr,
1291                            family=family, proto=proto, flags=flags,
1292                            reuse_port=reuse_port,
1293                            allow_broadcast=allow_broadcast)
1294                problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
1295                raise ValueError(
1296                    f'socket modifier keyword arguments can not be used '
1297                    f'when sock is specified. ({problems})')
1298            sock.setblocking(False)
1299            r_addr = None
1300        else:
1301            if not (local_addr or remote_addr):
1302                if family == 0:
1303                    raise ValueError('unexpected address family')
1304                addr_pairs_info = (((family, proto), (None, None)),)
1305            elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
1306                for addr in (local_addr, remote_addr):
1307                    if addr is not None and not isinstance(addr, str):
1308                        raise TypeError('string is expected')
1309
1310                if local_addr and local_addr[0] not in (0, '\x00'):
1311                    try:
1312                        if stat.S_ISSOCK(os.stat(local_addr).st_mode):
1313                            os.remove(local_addr)
1314                    except FileNotFoundError:
1315                        pass
1316                    except OSError as err:
1317                        # Directory may have permissions only to create socket.
1318                        logger.error('Unable to check or remove stale UNIX '
1319                                     'socket %r: %r',
1320                                     local_addr, err)
1321
1322                addr_pairs_info = (((family, proto),
1323                                    (local_addr, remote_addr)), )
1324            else:
1325                # join address by (family, protocol)
1326                addr_infos = {}  # Using order preserving dict
1327                for idx, addr in ((0, local_addr), (1, remote_addr)):
1328                    if addr is not None:
1329                        if not (isinstance(addr, tuple) and len(addr) == 2):
1330                            raise TypeError('2-tuple is expected')
1331
1332                        infos = await self._ensure_resolved(
1333                            addr, family=family, type=socket.SOCK_DGRAM,
1334                            proto=proto, flags=flags, loop=self)
1335                        if not infos:
1336                            raise OSError('getaddrinfo() returned empty list')
1337
1338                        for fam, _, pro, _, address in infos:
1339                            key = (fam, pro)
1340                            if key not in addr_infos:
1341                                addr_infos[key] = [None, None]
1342                            addr_infos[key][idx] = address
1343
1344                # each addr has to have info for each (family, proto) pair
1345                addr_pairs_info = [
1346                    (key, addr_pair) for key, addr_pair in addr_infos.items()
1347                    if not ((local_addr and addr_pair[0] is None) or
1348                            (remote_addr and addr_pair[1] is None))]
1349
1350                if not addr_pairs_info:
1351                    raise ValueError('can not get address information')
1352
1353            exceptions = []
1354
1355            for ((family, proto),
1356                 (local_address, remote_address)) in addr_pairs_info:
1357                sock = None
1358                r_addr = None
1359                try:
1360                    sock = socket.socket(
1361                        family=family, type=socket.SOCK_DGRAM, proto=proto)
1362                    if reuse_port:
1363                        _set_reuseport(sock)
1364                    if allow_broadcast:
1365                        sock.setsockopt(
1366                            socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
1367                    sock.setblocking(False)
1368
1369                    if local_addr:
1370                        sock.bind(local_address)
1371                    if remote_addr:
1372                        if not allow_broadcast:
1373                            await self.sock_connect(sock, remote_address)
1374                        r_addr = remote_address
1375                except OSError as exc:
1376                    if sock is not None:
1377                        sock.close()
1378                    exceptions.append(exc)
1379                except:
1380                    if sock is not None:
1381                        sock.close()
1382                    raise
1383                else:
1384                    break
1385            else:
1386                raise exceptions[0]
1387
1388        protocol = protocol_factory()
1389        waiter = self.create_future()
1390        transport = self._make_datagram_transport(
1391            sock, protocol, r_addr, waiter)
1392        if self._debug:
1393            if local_addr:
1394                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
1395                            "created: (%r, %r)",
1396                            local_addr, remote_addr, transport, protocol)
1397            else:
1398                logger.debug("Datagram endpoint remote_addr=%r created: "
1399                             "(%r, %r)",
1400                             remote_addr, transport, protocol)
1401
1402        try:
1403            await waiter
1404        except:
1405            transport.close()
1406            raise
1407
1408        return transport, protocol
1409
1410    async def _ensure_resolved(self, address, *,
1411                               family=0, type=socket.SOCK_STREAM,
1412                               proto=0, flags=0, loop):
1413        host, port = address[:2]
1414        info = _ipaddr_info(host, port, family, type, proto, *address[2:])
1415        if info is not None:
1416            # "host" is already a resolved IP.
1417            return [info]
1418        else:
1419            return await loop.getaddrinfo(host, port, family=family, type=type,
1420                                          proto=proto, flags=flags)
1421
1422    async def _create_server_getaddrinfo(self, host, port, family, flags):
1423        infos = await self._ensure_resolved((host, port), family=family,
1424                                            type=socket.SOCK_STREAM,
1425                                            flags=flags, loop=self)
1426        if not infos:
1427            raise OSError(f'getaddrinfo({host!r}) returned empty list')
1428        return infos
1429
1430    async def create_server(
1431            self, protocol_factory, host=None, port=None,
1432            *,
1433            family=socket.AF_UNSPEC,
1434            flags=socket.AI_PASSIVE,
1435            sock=None,
1436            backlog=100,
1437            ssl=None,
1438            reuse_address=None,
1439            reuse_port=None,
1440            ssl_handshake_timeout=None,
1441            ssl_shutdown_timeout=None,
1442            start_serving=True):
1443        """Create a TCP server.
1444
1445        The host parameter can be a string, in that case the TCP server is
1446        bound to host and port.
1447
1448        The host parameter can also be a sequence of strings and in that case
1449        the TCP server is bound to all hosts of the sequence. If a host
1450        appears multiple times (possibly indirectly e.g. when hostnames
1451        resolve to the same IP address), the server is only bound once to that
1452        host.
1453
1454        Return a Server object which can be used to stop the service.
1455
1456        This method is a coroutine.
1457        """
1458        if isinstance(ssl, bool):
1459            raise TypeError('ssl argument must be an SSLContext or None')
1460
1461        if ssl_handshake_timeout is not None and ssl is None:
1462            raise ValueError(
1463                'ssl_handshake_timeout is only meaningful with ssl')
1464
1465        if ssl_shutdown_timeout is not None and ssl is None:
1466            raise ValueError(
1467                'ssl_shutdown_timeout is only meaningful with ssl')
1468
1469        if sock is not None:
1470            _check_ssl_socket(sock)
1471
1472        if host is not None or port is not None:
1473            if sock is not None:
1474                raise ValueError(
1475                    'host/port and sock can not be specified at the same time')
1476
1477            if reuse_address is None:
1478                reuse_address = os.name == "posix" and sys.platform != "cygwin"
1479            sockets = []
1480            if host == '':
1481                hosts = [None]
1482            elif (isinstance(host, str) or
1483                  not isinstance(host, collections.abc.Iterable)):
1484                hosts = [host]
1485            else:
1486                hosts = host
1487
1488            fs = [self._create_server_getaddrinfo(host, port, family=family,
1489                                                  flags=flags)
1490                  for host in hosts]
1491            infos = await tasks.gather(*fs)
1492            infos = set(itertools.chain.from_iterable(infos))
1493
1494            completed = False
1495            try:
1496                for res in infos:
1497                    af, socktype, proto, canonname, sa = res
1498                    try:
1499                        sock = socket.socket(af, socktype, proto)
1500                    except socket.error:
1501                        # Assume it's a bad family/type/protocol combination.
1502                        if self._debug:
1503                            logger.warning('create_server() failed to create '
1504                                           'socket.socket(%r, %r, %r)',
1505                                           af, socktype, proto, exc_info=True)
1506                        continue
1507                    sockets.append(sock)
1508                    if reuse_address:
1509                        sock.setsockopt(
1510                            socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
1511                    if reuse_port:
1512                        _set_reuseport(sock)
1513                    # Disable IPv4/IPv6 dual stack support (enabled by
1514                    # default on Linux) which makes a single socket
1515                    # listen on both address families.
1516                    if (_HAS_IPv6 and
1517                            af == socket.AF_INET6 and
1518                            hasattr(socket, 'IPPROTO_IPV6')):
1519                        sock.setsockopt(socket.IPPROTO_IPV6,
1520                                        socket.IPV6_V6ONLY,
1521                                        True)
1522                    try:
1523                        sock.bind(sa)
1524                    except OSError as err:
1525                        raise OSError(err.errno, 'error while attempting '
1526                                      'to bind on address %r: %s'
1527                                      % (sa, err.strerror.lower())) from None
1528                completed = True
1529            finally:
1530                if not completed:
1531                    for sock in sockets:
1532                        sock.close()
1533        else:
1534            if sock is None:
1535                raise ValueError('Neither host/port nor sock were specified')
1536            if sock.type != socket.SOCK_STREAM:
1537                raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1538            sockets = [sock]
1539
1540        for sock in sockets:
1541            sock.setblocking(False)
1542
1543        server = Server(self, sockets, protocol_factory,
1544                        ssl, backlog, ssl_handshake_timeout,
1545                        ssl_shutdown_timeout)
1546        if start_serving:
1547            server._start_serving()
1548            # Skip one loop iteration so that all 'loop.add_reader'
1549            # go through.
1550            await tasks.sleep(0)
1551
1552        if self._debug:
1553            logger.info("%r is serving", server)
1554        return server
1555
1556    async def connect_accepted_socket(
1557            self, protocol_factory, sock,
1558            *, ssl=None,
1559            ssl_handshake_timeout=None,
1560            ssl_shutdown_timeout=None):
1561        if sock.type != socket.SOCK_STREAM:
1562            raise ValueError(f'A Stream Socket was expected, got {sock!r}')
1563
1564        if ssl_handshake_timeout is not None and not ssl:
1565            raise ValueError(
1566                'ssl_handshake_timeout is only meaningful with ssl')
1567
1568        if ssl_shutdown_timeout is not None and not ssl:
1569            raise ValueError(
1570                'ssl_shutdown_timeout is only meaningful with ssl')
1571
1572        if sock is not None:
1573            _check_ssl_socket(sock)
1574
1575        transport, protocol = await self._create_connection_transport(
1576            sock, protocol_factory, ssl, '', server_side=True,
1577            ssl_handshake_timeout=ssl_handshake_timeout,
1578            ssl_shutdown_timeout=ssl_shutdown_timeout)
1579        if self._debug:
1580            # Get the socket from the transport because SSL transport closes
1581            # the old socket and creates a new SSL socket
1582            sock = transport.get_extra_info('socket')
1583            logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
1584        return transport, protocol
1585
1586    async def connect_read_pipe(self, protocol_factory, pipe):
1587        protocol = protocol_factory()
1588        waiter = self.create_future()
1589        transport = self._make_read_pipe_transport(pipe, protocol, waiter)
1590
1591        try:
1592            await waiter
1593        except:
1594            transport.close()
1595            raise
1596
1597        if self._debug:
1598            logger.debug('Read pipe %r connected: (%r, %r)',
1599                         pipe.fileno(), transport, protocol)
1600        return transport, protocol
1601
1602    async def connect_write_pipe(self, protocol_factory, pipe):
1603        protocol = protocol_factory()
1604        waiter = self.create_future()
1605        transport = self._make_write_pipe_transport(pipe, protocol, waiter)
1606
1607        try:
1608            await waiter
1609        except:
1610            transport.close()
1611            raise
1612
1613        if self._debug:
1614            logger.debug('Write pipe %r connected: (%r, %r)',
1615                         pipe.fileno(), transport, protocol)
1616        return transport, protocol
1617
1618    def _log_subprocess(self, msg, stdin, stdout, stderr):
1619        info = [msg]
1620        if stdin is not None:
1621            info.append(f'stdin={_format_pipe(stdin)}')
1622        if stdout is not None and stderr == subprocess.STDOUT:
1623            info.append(f'stdout=stderr={_format_pipe(stdout)}')
1624        else:
1625            if stdout is not None:
1626                info.append(f'stdout={_format_pipe(stdout)}')
1627            if stderr is not None:
1628                info.append(f'stderr={_format_pipe(stderr)}')
1629        logger.debug(' '.join(info))
1630
1631    async def subprocess_shell(self, protocol_factory, cmd, *,
1632                               stdin=subprocess.PIPE,
1633                               stdout=subprocess.PIPE,
1634                               stderr=subprocess.PIPE,
1635                               universal_newlines=False,
1636                               shell=True, bufsize=0,
1637                               encoding=None, errors=None, text=None,
1638                               **kwargs):
1639        if not isinstance(cmd, (bytes, str)):
1640            raise ValueError("cmd must be a string")
1641        if universal_newlines:
1642            raise ValueError("universal_newlines must be False")
1643        if not shell:
1644            raise ValueError("shell must be True")
1645        if bufsize != 0:
1646            raise ValueError("bufsize must be 0")
1647        if text:
1648            raise ValueError("text must be False")
1649        if encoding is not None:
1650            raise ValueError("encoding must be None")
1651        if errors is not None:
1652            raise ValueError("errors must be None")
1653
1654        protocol = protocol_factory()
1655        debug_log = None
1656        if self._debug:
1657            # don't log parameters: they may contain sensitive information
1658            # (password) and may be too long
1659            debug_log = 'run shell command %r' % cmd
1660            self._log_subprocess(debug_log, stdin, stdout, stderr)
1661        transport = await self._make_subprocess_transport(
1662            protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
1663        if self._debug and debug_log is not None:
1664            logger.info('%s: %r', debug_log, transport)
1665        return transport, protocol
1666
1667    async def subprocess_exec(self, protocol_factory, program, *args,
1668                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1669                              stderr=subprocess.PIPE, universal_newlines=False,
1670                              shell=False, bufsize=0,
1671                              encoding=None, errors=None, text=None,
1672                              **kwargs):
1673        if universal_newlines:
1674            raise ValueError("universal_newlines must be False")
1675        if shell:
1676            raise ValueError("shell must be False")
1677        if bufsize != 0:
1678            raise ValueError("bufsize must be 0")
1679        if text:
1680            raise ValueError("text must be False")
1681        if encoding is not None:
1682            raise ValueError("encoding must be None")
1683        if errors is not None:
1684            raise ValueError("errors must be None")
1685
1686        popen_args = (program,) + args
1687        protocol = protocol_factory()
1688        debug_log = None
1689        if self._debug:
1690            # don't log parameters: they may contain sensitive information
1691            # (password) and may be too long
1692            debug_log = f'execute program {program!r}'
1693            self._log_subprocess(debug_log, stdin, stdout, stderr)
1694        transport = await self._make_subprocess_transport(
1695            protocol, popen_args, False, stdin, stdout, stderr,
1696            bufsize, **kwargs)
1697        if self._debug and debug_log is not None:
1698            logger.info('%s: %r', debug_log, transport)
1699        return transport, protocol
1700
1701    def get_exception_handler(self):
1702        """Return an exception handler, or None if the default one is in use.
1703        """
1704        return self._exception_handler
1705
1706    def set_exception_handler(self, handler):
1707        """Set handler as the new event loop exception handler.
1708
1709        If handler is None, the default exception handler will
1710        be set.
1711
1712        If handler is a callable object, it should have a
1713        signature matching '(loop, context)', where 'loop'
1714        will be a reference to the active event loop, 'context'
1715        will be a dict object (see `call_exception_handler()`
1716        documentation for details about context).
1717        """
1718        if handler is not None and not callable(handler):
1719            raise TypeError(f'A callable object or None is expected, '
1720                            f'got {handler!r}')
1721        self._exception_handler = handler
1722
1723    def default_exception_handler(self, context):
1724        """Default exception handler.
1725
1726        This is called when an exception occurs and no exception
1727        handler is set, and can be called by a custom exception
1728        handler that wants to defer to the default behavior.
1729
1730        This default handler logs the error message and other
1731        context-dependent information.  In debug mode, a truncated
1732        stack trace is also appended showing where the given object
1733        (e.g. a handle or future or task) was created, if any.
1734
1735        The context parameter has the same meaning as in
1736        `call_exception_handler()`.
1737        """
1738        message = context.get('message')
1739        if not message:
1740            message = 'Unhandled exception in event loop'
1741
1742        exception = context.get('exception')
1743        if exception is not None:
1744            exc_info = (type(exception), exception, exception.__traceback__)
1745        else:
1746            exc_info = False
1747
1748        if ('source_traceback' not in context and
1749                self._current_handle is not None and
1750                self._current_handle._source_traceback):
1751            context['handle_traceback'] = \
1752                self._current_handle._source_traceback
1753
1754        log_lines = [message]
1755        for key in sorted(context):
1756            if key in {'message', 'exception'}:
1757                continue
1758            value = context[key]
1759            if key == 'source_traceback':
1760                tb = ''.join(traceback.format_list(value))
1761                value = 'Object created at (most recent call last):\n'
1762                value += tb.rstrip()
1763            elif key == 'handle_traceback':
1764                tb = ''.join(traceback.format_list(value))
1765                value = 'Handle created at (most recent call last):\n'
1766                value += tb.rstrip()
1767            else:
1768                value = repr(value)
1769            log_lines.append(f'{key}: {value}')
1770
1771        logger.error('\n'.join(log_lines), exc_info=exc_info)
1772
1773    def call_exception_handler(self, context):
1774        """Call the current event loop's exception handler.
1775
1776        The context argument is a dict containing the following keys:
1777
1778        - 'message': Error message;
1779        - 'exception' (optional): Exception object;
1780        - 'future' (optional): Future instance;
1781        - 'task' (optional): Task instance;
1782        - 'handle' (optional): Handle instance;
1783        - 'protocol' (optional): Protocol instance;
1784        - 'transport' (optional): Transport instance;
1785        - 'socket' (optional): Socket instance;
1786        - 'asyncgen' (optional): Asynchronous generator that caused
1787                                 the exception.
1788
1789        New keys maybe introduced in the future.
1790
1791        Note: do not overload this method in an event loop subclass.
1792        For custom exception handling, use the
1793        `set_exception_handler()` method.
1794        """
1795        if self._exception_handler is None:
1796            try:
1797                self.default_exception_handler(context)
1798            except (SystemExit, KeyboardInterrupt):
1799                raise
1800            except BaseException:
1801                # Second protection layer for unexpected errors
1802                # in the default implementation, as well as for subclassed
1803                # event loops with overloaded "default_exception_handler".
1804                logger.error('Exception in default exception handler',
1805                             exc_info=True)
1806        else:
1807            try:
1808                self._exception_handler(self, context)
1809            except (SystemExit, KeyboardInterrupt):
1810                raise
1811            except BaseException as exc:
1812                # Exception in the user set custom exception handler.
1813                try:
1814                    # Let's try default handler.
1815                    self.default_exception_handler({
1816                        'message': 'Unhandled error in exception handler',
1817                        'exception': exc,
1818                        'context': context,
1819                    })
1820                except (SystemExit, KeyboardInterrupt):
1821                    raise
1822                except BaseException:
1823                    # Guard 'default_exception_handler' in case it is
1824                    # overloaded.
1825                    logger.error('Exception in default exception handler '
1826                                 'while handling an unexpected error '
1827                                 'in custom exception handler',
1828                                 exc_info=True)
1829
1830    def _add_callback(self, handle):
1831        """Add a Handle to _ready."""
1832        if not handle._cancelled:
1833            self._ready.append(handle)
1834
1835    def _add_callback_signalsafe(self, handle):
1836        """Like _add_callback() but called from a signal handler."""
1837        self._add_callback(handle)
1838        self._write_to_self()
1839
1840    def _timer_handle_cancelled(self, handle):
1841        """Notification that a TimerHandle has been cancelled."""
1842        if handle._scheduled:
1843            self._timer_cancelled_count += 1
1844
1845    def _run_once(self):
1846        """Run one full iteration of the event loop.
1847
1848        This calls all currently ready callbacks, polls for I/O,
1849        schedules the resulting callbacks, and finally schedules
1850        'call_later' callbacks.
1851        """
1852
1853        sched_count = len(self._scheduled)
1854        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1855            self._timer_cancelled_count / sched_count >
1856                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
1857            # Remove delayed calls that were cancelled if their number
1858            # is too high
1859            new_scheduled = []
1860            for handle in self._scheduled:
1861                if handle._cancelled:
1862                    handle._scheduled = False
1863                else:
1864                    new_scheduled.append(handle)
1865
1866            heapq.heapify(new_scheduled)
1867            self._scheduled = new_scheduled
1868            self._timer_cancelled_count = 0
1869        else:
1870            # Remove delayed calls that were cancelled from head of queue.
1871            while self._scheduled and self._scheduled[0]._cancelled:
1872                self._timer_cancelled_count -= 1
1873                handle = heapq.heappop(self._scheduled)
1874                handle._scheduled = False
1875
1876        timeout = None
1877        if self._ready or self._stopping:
1878            timeout = 0
1879        elif self._scheduled:
1880            # Compute the desired timeout.
1881            when = self._scheduled[0]._when
1882            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
1883
1884        event_list = self._selector.select(timeout)
1885        self._process_events(event_list)
1886        # Needed to break cycles when an exception occurs.
1887        event_list = None
1888
1889        # Handle 'later' callbacks that are ready.
1890        end_time = self.time() + self._clock_resolution
1891        while self._scheduled:
1892            handle = self._scheduled[0]
1893            if handle._when >= end_time:
1894                break
1895            handle = heapq.heappop(self._scheduled)
1896            handle._scheduled = False
1897            self._ready.append(handle)
1898
1899        # This is the only place where callbacks are actually *called*.
1900        # All other places just add them to ready.
1901        # Note: We run all currently scheduled callbacks, but not any
1902        # callbacks scheduled by callbacks run this time around --
1903        # they will be run the next time (after another I/O poll).
1904        # Use an idiom that is thread-safe without using locks.
1905        ntodo = len(self._ready)
1906        for i in range(ntodo):
1907            handle = self._ready.popleft()
1908            if handle._cancelled:
1909                continue
1910            if self._debug:
1911                try:
1912                    self._current_handle = handle
1913                    t0 = self.time()
1914                    handle._run()
1915                    dt = self.time() - t0
1916                    if dt >= self.slow_callback_duration:
1917                        logger.warning('Executing %s took %.3f seconds',
1918                                       _format_handle(handle), dt)
1919                finally:
1920                    self._current_handle = None
1921            else:
1922                handle._run()
1923        handle = None  # Needed to break cycles when an exception occurs.
1924
1925    def _set_coroutine_origin_tracking(self, enabled):
1926        if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
1927            return
1928
1929        if enabled:
1930            self._coroutine_origin_tracking_saved_depth = (
1931                sys.get_coroutine_origin_tracking_depth())
1932            sys.set_coroutine_origin_tracking_depth(
1933                constants.DEBUG_STACK_DEPTH)
1934        else:
1935            sys.set_coroutine_origin_tracking_depth(
1936                self._coroutine_origin_tracking_saved_depth)
1937
1938        self._coroutine_origin_tracking_enabled = enabled
1939
1940    def get_debug(self):
1941        return self._debug
1942
1943    def set_debug(self, enabled):
1944        self._debug = enabled
1945
1946        if self.is_running():
1947            self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)
1948