xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/asyncio/tasks.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1"""Support for tasks, coroutines and the scheduler."""
2
3__all__ = (
4    'Task', 'create_task',
5    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
6    'wait', 'wait_for', 'as_completed', 'sleep',
7    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
8    'current_task', 'all_tasks',
9    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
10)
11
12import concurrent.futures
13import contextvars
14import functools
15import inspect
16import itertools
17import types
18import warnings
19import weakref
20from types import GenericAlias
21
22from . import base_tasks
23from . import coroutines
24from . import events
25from . import exceptions
26from . import futures
27from .coroutines import _is_coroutine
28
29# Helper to generate new task names
30# This uses itertools.count() instead of a "+= 1" operation because the latter
31# is not thread safe. See bpo-11866 for a longer explanation.
32_task_name_counter = itertools.count(1).__next__
33
34
35def current_task(loop=None):
36    """Return a currently executed task."""
37    if loop is None:
38        loop = events.get_running_loop()
39    return _current_tasks.get(loop)
40
41
42def all_tasks(loop=None):
43    """Return a set of all tasks for the loop."""
44    if loop is None:
45        loop = events.get_running_loop()
46    # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
47    # thread while we do so. Therefore we cast it to list prior to filtering. The list
48    # cast itself requires iteration, so we repeat it several times ignoring
49    # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
50    # details.
51    i = 0
52    while True:
53        try:
54            tasks = list(_all_tasks)
55        except RuntimeError:
56            i += 1
57            if i >= 1000:
58                raise
59        else:
60            break
61    return {t for t in tasks
62            if futures._get_loop(t) is loop and not t.done()}
63
64
65def _set_task_name(task, name):
66    if name is not None:
67        try:
68            set_name = task.set_name
69        except AttributeError:
70            warnings.warn("Task.set_name() was added in Python 3.8, "
71                      "the method support will be mandatory for third-party "
72                      "task implementations since 3.13.",
73                      DeprecationWarning, stacklevel=3)
74        else:
75            set_name(name)
76
77
78class Task(futures._PyFuture):  # Inherit Python Task implementation
79                                # from a Python Future implementation.
80
81    """A coroutine wrapped in a Future."""
82
83    # An important invariant maintained while a Task not done:
84    #
85    # - Either _fut_waiter is None, and _step() is scheduled;
86    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
87    #
88    # The only transition from the latter to the former is through
89    # _wakeup().  When _fut_waiter is not None, one of its callbacks
90    # must be _wakeup().
91
92    # If False, don't log a message if the task is destroyed whereas its
93    # status is still pending
94    _log_destroy_pending = True
95
96    def __init__(self, coro, *, loop=None, name=None, context=None):
97        super().__init__(loop=loop)
98        if self._source_traceback:
99            del self._source_traceback[-1]
100        if not coroutines.iscoroutine(coro):
101            # raise after Future.__init__(), attrs are required for __del__
102            # prevent logging for pending task in __del__
103            self._log_destroy_pending = False
104            raise TypeError(f"a coroutine was expected, got {coro!r}")
105
106        if name is None:
107            self._name = f'Task-{_task_name_counter()}'
108        else:
109            self._name = str(name)
110
111        self._num_cancels_requested = 0
112        self._must_cancel = False
113        self._fut_waiter = None
114        self._coro = coro
115        if context is None:
116            self._context = contextvars.copy_context()
117        else:
118            self._context = context
119
120        self._loop.call_soon(self.__step, context=self._context)
121        _register_task(self)
122
123    def __del__(self):
124        if self._state == futures._PENDING and self._log_destroy_pending:
125            context = {
126                'task': self,
127                'message': 'Task was destroyed but it is pending!',
128            }
129            if self._source_traceback:
130                context['source_traceback'] = self._source_traceback
131            self._loop.call_exception_handler(context)
132        super().__del__()
133
134    __class_getitem__ = classmethod(GenericAlias)
135
136    def __repr__(self):
137        return base_tasks._task_repr(self)
138
139    def get_coro(self):
140        return self._coro
141
142    def get_name(self):
143        return self._name
144
145    def set_name(self, value):
146        self._name = str(value)
147
148    def set_result(self, result):
149        raise RuntimeError('Task does not support set_result operation')
150
151    def set_exception(self, exception):
152        raise RuntimeError('Task does not support set_exception operation')
153
154    def get_stack(self, *, limit=None):
155        """Return the list of stack frames for this task's coroutine.
156
157        If the coroutine is not done, this returns the stack where it is
158        suspended.  If the coroutine has completed successfully or was
159        cancelled, this returns an empty list.  If the coroutine was
160        terminated by an exception, this returns the list of traceback
161        frames.
162
163        The frames are always ordered from oldest to newest.
164
165        The optional limit gives the maximum number of frames to
166        return; by default all available frames are returned.  Its
167        meaning differs depending on whether a stack or a traceback is
168        returned: the newest frames of a stack are returned, but the
169        oldest frames of a traceback are returned.  (This matches the
170        behavior of the traceback module.)
171
172        For reasons beyond our control, only one stack frame is
173        returned for a suspended coroutine.
174        """
175        return base_tasks._task_get_stack(self, limit)
176
177    def print_stack(self, *, limit=None, file=None):
178        """Print the stack or traceback for this task's coroutine.
179
180        This produces output similar to that of the traceback module,
181        for the frames retrieved by get_stack().  The limit argument
182        is passed to get_stack().  The file argument is an I/O stream
183        to which the output is written; by default output is written
184        to sys.stderr.
185        """
186        return base_tasks._task_print_stack(self, limit, file)
187
188    def cancel(self, msg=None):
189        """Request that this task cancel itself.
190
191        This arranges for a CancelledError to be thrown into the
192        wrapped coroutine on the next cycle through the event loop.
193        The coroutine then has a chance to clean up or even deny
194        the request using try/except/finally.
195
196        Unlike Future.cancel, this does not guarantee that the
197        task will be cancelled: the exception might be caught and
198        acted upon, delaying cancellation of the task or preventing
199        cancellation completely.  The task may also return a value or
200        raise a different exception.
201
202        Immediately after this method is called, Task.cancelled() will
203        not return True (unless the task was already cancelled).  A
204        task will be marked as cancelled when the wrapped coroutine
205        terminates with a CancelledError exception (even if cancel()
206        was not called).
207
208        This also increases the task's count of cancellation requests.
209        """
210        self._log_traceback = False
211        if self.done():
212            return False
213        self._num_cancels_requested += 1
214        # These two lines are controversial.  See discussion starting at
215        # https://github.com/python/cpython/pull/31394#issuecomment-1053545331
216        # Also remember that this is duplicated in _asynciomodule.c.
217        # if self._num_cancels_requested > 1:
218        #     return False
219        if self._fut_waiter is not None:
220            if self._fut_waiter.cancel(msg=msg):
221                # Leave self._fut_waiter; it may be a Task that
222                # catches and ignores the cancellation so we may have
223                # to cancel it again later.
224                return True
225        # It must be the case that self.__step is already scheduled.
226        self._must_cancel = True
227        self._cancel_message = msg
228        return True
229
230    def cancelling(self):
231        """Return the count of the task's cancellation requests.
232
233        This count is incremented when .cancel() is called
234        and may be decremented using .uncancel().
235        """
236        return self._num_cancels_requested
237
238    def uncancel(self):
239        """Decrement the task's count of cancellation requests.
240
241        This should be called by the party that called `cancel()` on the task
242        beforehand.
243
244        Returns the remaining number of cancellation requests.
245        """
246        if self._num_cancels_requested > 0:
247            self._num_cancels_requested -= 1
248        return self._num_cancels_requested
249
250    def __step(self, exc=None):
251        if self.done():
252            raise exceptions.InvalidStateError(
253                f'_step(): already done: {self!r}, {exc!r}')
254        if self._must_cancel:
255            if not isinstance(exc, exceptions.CancelledError):
256                exc = self._make_cancelled_error()
257            self._must_cancel = False
258        coro = self._coro
259        self._fut_waiter = None
260
261        _enter_task(self._loop, self)
262        # Call either coro.throw(exc) or coro.send(None).
263        try:
264            if exc is None:
265                # We use the `send` method directly, because coroutines
266                # don't have `__iter__` and `__next__` methods.
267                result = coro.send(None)
268            else:
269                result = coro.throw(exc)
270        except StopIteration as exc:
271            if self._must_cancel:
272                # Task is cancelled right before coro stops.
273                self._must_cancel = False
274                super().cancel(msg=self._cancel_message)
275            else:
276                super().set_result(exc.value)
277        except exceptions.CancelledError as exc:
278            # Save the original exception so we can chain it later.
279            self._cancelled_exc = exc
280            super().cancel()  # I.e., Future.cancel(self).
281        except (KeyboardInterrupt, SystemExit) as exc:
282            super().set_exception(exc)
283            raise
284        except BaseException as exc:
285            super().set_exception(exc)
286        else:
287            blocking = getattr(result, '_asyncio_future_blocking', None)
288            if blocking is not None:
289                # Yielded Future must come from Future.__iter__().
290                if futures._get_loop(result) is not self._loop:
291                    new_exc = RuntimeError(
292                        f'Task {self!r} got Future '
293                        f'{result!r} attached to a different loop')
294                    self._loop.call_soon(
295                        self.__step, new_exc, context=self._context)
296                elif blocking:
297                    if result is self:
298                        new_exc = RuntimeError(
299                            f'Task cannot await on itself: {self!r}')
300                        self._loop.call_soon(
301                            self.__step, new_exc, context=self._context)
302                    else:
303                        result._asyncio_future_blocking = False
304                        result.add_done_callback(
305                            self.__wakeup, context=self._context)
306                        self._fut_waiter = result
307                        if self._must_cancel:
308                            if self._fut_waiter.cancel(
309                                    msg=self._cancel_message):
310                                self._must_cancel = False
311                else:
312                    new_exc = RuntimeError(
313                        f'yield was used instead of yield from '
314                        f'in task {self!r} with {result!r}')
315                    self._loop.call_soon(
316                        self.__step, new_exc, context=self._context)
317
318            elif result is None:
319                # Bare yield relinquishes control for one event loop iteration.
320                self._loop.call_soon(self.__step, context=self._context)
321            elif inspect.isgenerator(result):
322                # Yielding a generator is just wrong.
323                new_exc = RuntimeError(
324                    f'yield was used instead of yield from for '
325                    f'generator in task {self!r} with {result!r}')
326                self._loop.call_soon(
327                    self.__step, new_exc, context=self._context)
328            else:
329                # Yielding something else is an error.
330                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
331                self._loop.call_soon(
332                    self.__step, new_exc, context=self._context)
333        finally:
334            _leave_task(self._loop, self)
335            self = None  # Needed to break cycles when an exception occurs.
336
337    def __wakeup(self, future):
338        try:
339            future.result()
340        except BaseException as exc:
341            # This may also be a cancellation.
342            self.__step(exc)
343        else:
344            # Don't pass the value of `future.result()` explicitly,
345            # as `Future.__iter__` and `Future.__await__` don't need it.
346            # If we call `_step(value, None)` instead of `_step()`,
347            # Python eval loop would use `.send(value)` method call,
348            # instead of `__next__()`, which is slower for futures
349            # that return non-generator iterators from their `__iter__`.
350            self.__step()
351        self = None  # Needed to break cycles when an exception occurs.
352
353
354_PyTask = Task
355
356
357try:
358    import _asyncio
359except ImportError:
360    pass
361else:
362    # _CTask is needed for tests.
363    Task = _CTask = _asyncio.Task
364
365
366def create_task(coro, *, name=None, context=None):
367    """Schedule the execution of a coroutine object in a spawn task.
368
369    Return a Task object.
370    """
371    loop = events.get_running_loop()
372    if context is None:
373        # Use legacy API if context is not needed
374        task = loop.create_task(coro)
375    else:
376        task = loop.create_task(coro, context=context)
377
378    _set_task_name(task, name)
379    return task
380
381
382# wait() and as_completed() similar to those in PEP 3148.
383
384FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
385FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
386ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
387
388
389async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
390    """Wait for the Futures or Tasks given by fs to complete.
391
392    The fs iterable must not be empty.
393
394    Coroutines will be wrapped in Tasks.
395
396    Returns two sets of Future: (done, pending).
397
398    Usage:
399
400        done, pending = await asyncio.wait(fs)
401
402    Note: This does not raise TimeoutError! Futures that aren't done
403    when the timeout occurs are returned in the second set.
404    """
405    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
406        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
407    if not fs:
408        raise ValueError('Set of Tasks/Futures is empty.')
409    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
410        raise ValueError(f'Invalid return_when value: {return_when}')
411
412    fs = set(fs)
413
414    if any(coroutines.iscoroutine(f) for f in fs):
415        raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
416
417    loop = events.get_running_loop()
418    return await _wait(fs, timeout, return_when, loop)
419
420
421def _release_waiter(waiter, *args):
422    if not waiter.done():
423        waiter.set_result(None)
424
425
426async def wait_for(fut, timeout):
427    """Wait for the single Future or coroutine to complete, with timeout.
428
429    Coroutine will be wrapped in Task.
430
431    Returns result of the Future or coroutine.  When a timeout occurs,
432    it cancels the task and raises TimeoutError.  To avoid the task
433    cancellation, wrap it in shield().
434
435    If the wait is cancelled, the task is also cancelled.
436
437    This function is a coroutine.
438    """
439    loop = events.get_running_loop()
440
441    if timeout is None:
442        return await fut
443
444    if timeout <= 0:
445        fut = ensure_future(fut, loop=loop)
446
447        if fut.done():
448            return fut.result()
449
450        await _cancel_and_wait(fut, loop=loop)
451        try:
452            return fut.result()
453        except exceptions.CancelledError as exc:
454            raise exceptions.TimeoutError() from exc
455
456    waiter = loop.create_future()
457    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
458    cb = functools.partial(_release_waiter, waiter)
459
460    fut = ensure_future(fut, loop=loop)
461    fut.add_done_callback(cb)
462
463    try:
464        # wait until the future completes or the timeout
465        try:
466            await waiter
467        except exceptions.CancelledError:
468            if fut.done():
469                return fut.result()
470            else:
471                fut.remove_done_callback(cb)
472                # We must ensure that the task is not running
473                # after wait_for() returns.
474                # See https://bugs.python.org/issue32751
475                await _cancel_and_wait(fut, loop=loop)
476                raise
477
478        if fut.done():
479            return fut.result()
480        else:
481            fut.remove_done_callback(cb)
482            # We must ensure that the task is not running
483            # after wait_for() returns.
484            # See https://bugs.python.org/issue32751
485            await _cancel_and_wait(fut, loop=loop)
486            # In case task cancellation failed with some
487            # exception, we should re-raise it
488            # See https://bugs.python.org/issue40607
489            try:
490                return fut.result()
491            except exceptions.CancelledError as exc:
492                raise exceptions.TimeoutError() from exc
493    finally:
494        timeout_handle.cancel()
495
496
497async def _wait(fs, timeout, return_when, loop):
498    """Internal helper for wait().
499
500    The fs argument must be a collection of Futures.
501    """
502    assert fs, 'Set of Futures is empty.'
503    waiter = loop.create_future()
504    timeout_handle = None
505    if timeout is not None:
506        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
507    counter = len(fs)
508
509    def _on_completion(f):
510        nonlocal counter
511        counter -= 1
512        if (counter <= 0 or
513            return_when == FIRST_COMPLETED or
514            return_when == FIRST_EXCEPTION and (not f.cancelled() and
515                                                f.exception() is not None)):
516            if timeout_handle is not None:
517                timeout_handle.cancel()
518            if not waiter.done():
519                waiter.set_result(None)
520
521    for f in fs:
522        f.add_done_callback(_on_completion)
523
524    try:
525        await waiter
526    finally:
527        if timeout_handle is not None:
528            timeout_handle.cancel()
529        for f in fs:
530            f.remove_done_callback(_on_completion)
531
532    done, pending = set(), set()
533    for f in fs:
534        if f.done():
535            done.add(f)
536        else:
537            pending.add(f)
538    return done, pending
539
540
541async def _cancel_and_wait(fut, loop):
542    """Cancel the *fut* future or task and wait until it completes."""
543
544    waiter = loop.create_future()
545    cb = functools.partial(_release_waiter, waiter)
546    fut.add_done_callback(cb)
547
548    try:
549        fut.cancel()
550        # We cannot wait on *fut* directly to make
551        # sure _cancel_and_wait itself is reliably cancellable.
552        await waiter
553    finally:
554        fut.remove_done_callback(cb)
555
556
557# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
558def as_completed(fs, *, timeout=None):
559    """Return an iterator whose values are coroutines.
560
561    When waiting for the yielded coroutines you'll get the results (or
562    exceptions!) of the original Futures (or coroutines), in the order
563    in which and as soon as they complete.
564
565    This differs from PEP 3148; the proper way to use this is:
566
567        for f in as_completed(fs):
568            result = await f  # The 'await' may raise.
569            # Use result.
570
571    If a timeout is specified, the 'await' will raise
572    TimeoutError when the timeout occurs before all Futures are done.
573
574    Note: The futures 'f' are not necessarily members of fs.
575    """
576    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
577        raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
578
579    from .queues import Queue  # Import here to avoid circular import problem.
580    done = Queue()
581
582    loop = events._get_event_loop()
583    todo = {ensure_future(f, loop=loop) for f in set(fs)}
584    timeout_handle = None
585
586    def _on_timeout():
587        for f in todo:
588            f.remove_done_callback(_on_completion)
589            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
590        todo.clear()  # Can't do todo.remove(f) in the loop.
591
592    def _on_completion(f):
593        if not todo:
594            return  # _on_timeout() was here first.
595        todo.remove(f)
596        done.put_nowait(f)
597        if not todo and timeout_handle is not None:
598            timeout_handle.cancel()
599
600    async def _wait_for_one():
601        f = await done.get()
602        if f is None:
603            # Dummy value from _on_timeout().
604            raise exceptions.TimeoutError
605        return f.result()  # May raise f.exception().
606
607    for f in todo:
608        f.add_done_callback(_on_completion)
609    if todo and timeout is not None:
610        timeout_handle = loop.call_later(timeout, _on_timeout)
611    for _ in range(len(todo)):
612        yield _wait_for_one()
613
614
615@types.coroutine
616def __sleep0():
617    """Skip one event loop run cycle.
618
619    This is a private helper for 'asyncio.sleep()', used
620    when the 'delay' is set to 0.  It uses a bare 'yield'
621    expression (which Task.__step knows how to handle)
622    instead of creating a Future object.
623    """
624    yield
625
626
627async def sleep(delay, result=None):
628    """Coroutine that completes after a given time (in seconds)."""
629    if delay <= 0:
630        await __sleep0()
631        return result
632
633    loop = events.get_running_loop()
634    future = loop.create_future()
635    h = loop.call_later(delay,
636                        futures._set_result_unless_cancelled,
637                        future, result)
638    try:
639        return await future
640    finally:
641        h.cancel()
642
643
644def ensure_future(coro_or_future, *, loop=None):
645    """Wrap a coroutine or an awaitable in a future.
646
647    If the argument is a Future, it is returned directly.
648    """
649    return _ensure_future(coro_or_future, loop=loop)
650
651
652def _ensure_future(coro_or_future, *, loop=None):
653    if futures.isfuture(coro_or_future):
654        if loop is not None and loop is not futures._get_loop(coro_or_future):
655            raise ValueError('The future belongs to a different loop than '
656                            'the one specified as the loop argument')
657        return coro_or_future
658    called_wrap_awaitable = False
659    if not coroutines.iscoroutine(coro_or_future):
660        if inspect.isawaitable(coro_or_future):
661            coro_or_future = _wrap_awaitable(coro_or_future)
662            called_wrap_awaitable = True
663        else:
664            raise TypeError('An asyncio.Future, a coroutine or an awaitable '
665                            'is required')
666
667    if loop is None:
668        loop = events._get_event_loop(stacklevel=4)
669    try:
670        return loop.create_task(coro_or_future)
671    except RuntimeError:
672        if not called_wrap_awaitable:
673            coro_or_future.close()
674        raise
675
676
677@types.coroutine
678def _wrap_awaitable(awaitable):
679    """Helper for asyncio.ensure_future().
680
681    Wraps awaitable (an object with __await__) into a coroutine
682    that will later be wrapped in a Task by ensure_future().
683    """
684    return (yield from awaitable.__await__())
685
686_wrap_awaitable._is_coroutine = _is_coroutine
687
688
689class _GatheringFuture(futures.Future):
690    """Helper for gather().
691
692    This overrides cancel() to cancel all the children and act more
693    like Task.cancel(), which doesn't immediately mark itself as
694    cancelled.
695    """
696
697    def __init__(self, children, *, loop):
698        assert loop is not None
699        super().__init__(loop=loop)
700        self._children = children
701        self._cancel_requested = False
702
703    def cancel(self, msg=None):
704        if self.done():
705            return False
706        ret = False
707        for child in self._children:
708            if child.cancel(msg=msg):
709                ret = True
710        if ret:
711            # If any child tasks were actually cancelled, we should
712            # propagate the cancellation request regardless of
713            # *return_exceptions* argument.  See issue 32684.
714            self._cancel_requested = True
715        return ret
716
717
718def gather(*coros_or_futures, return_exceptions=False):
719    """Return a future aggregating results from the given coroutines/futures.
720
721    Coroutines will be wrapped in a future and scheduled in the event
722    loop. They will not necessarily be scheduled in the same order as
723    passed in.
724
725    All futures must share the same event loop.  If all the tasks are
726    done successfully, the returned future's result is the list of
727    results (in the order of the original sequence, not necessarily
728    the order of results arrival).  If *return_exceptions* is True,
729    exceptions in the tasks are treated the same as successful
730    results, and gathered in the result list; otherwise, the first
731    raised exception will be immediately propagated to the returned
732    future.
733
734    Cancellation: if the outer Future is cancelled, all children (that
735    have not completed yet) are also cancelled.  If any child is
736    cancelled, this is treated as if it raised CancelledError --
737    the outer Future is *not* cancelled in this case.  (This is to
738    prevent the cancellation of one child to cause other children to
739    be cancelled.)
740
741    If *return_exceptions* is False, cancelling gather() after it
742    has been marked done won't cancel any submitted awaitables.
743    For instance, gather can be marked done after propagating an
744    exception to the caller, therefore, calling ``gather.cancel()``
745    after catching an exception (raised by one of the awaitables) from
746    gather won't cancel any other awaitables.
747    """
748    if not coros_or_futures:
749        loop = events._get_event_loop()
750        outer = loop.create_future()
751        outer.set_result([])
752        return outer
753
754    def _done_callback(fut):
755        nonlocal nfinished
756        nfinished += 1
757
758        if outer is None or outer.done():
759            if not fut.cancelled():
760                # Mark exception retrieved.
761                fut.exception()
762            return
763
764        if not return_exceptions:
765            if fut.cancelled():
766                # Check if 'fut' is cancelled first, as
767                # 'fut.exception()' will *raise* a CancelledError
768                # instead of returning it.
769                exc = fut._make_cancelled_error()
770                outer.set_exception(exc)
771                return
772            else:
773                exc = fut.exception()
774                if exc is not None:
775                    outer.set_exception(exc)
776                    return
777
778        if nfinished == nfuts:
779            # All futures are done; create a list of results
780            # and set it to the 'outer' future.
781            results = []
782
783            for fut in children:
784                if fut.cancelled():
785                    # Check if 'fut' is cancelled first, as 'fut.exception()'
786                    # will *raise* a CancelledError instead of returning it.
787                    # Also, since we're adding the exception return value
788                    # to 'results' instead of raising it, don't bother
789                    # setting __context__.  This also lets us preserve
790                    # calling '_make_cancelled_error()' at most once.
791                    res = exceptions.CancelledError(
792                        '' if fut._cancel_message is None else
793                        fut._cancel_message)
794                else:
795                    res = fut.exception()
796                    if res is None:
797                        res = fut.result()
798                results.append(res)
799
800            if outer._cancel_requested:
801                # If gather is being cancelled we must propagate the
802                # cancellation regardless of *return_exceptions* argument.
803                # See issue 32684.
804                exc = fut._make_cancelled_error()
805                outer.set_exception(exc)
806            else:
807                outer.set_result(results)
808
809    arg_to_fut = {}
810    children = []
811    nfuts = 0
812    nfinished = 0
813    loop = None
814    outer = None  # bpo-46672
815    for arg in coros_or_futures:
816        if arg not in arg_to_fut:
817            fut = _ensure_future(arg, loop=loop)
818            if loop is None:
819                loop = futures._get_loop(fut)
820            if fut is not arg:
821                # 'arg' was not a Future, therefore, 'fut' is a new
822                # Future created specifically for 'arg'.  Since the caller
823                # can't control it, disable the "destroy pending task"
824                # warning.
825                fut._log_destroy_pending = False
826
827            nfuts += 1
828            arg_to_fut[arg] = fut
829            fut.add_done_callback(_done_callback)
830
831        else:
832            # There's a duplicate Future object in coros_or_futures.
833            fut = arg_to_fut[arg]
834
835        children.append(fut)
836
837    outer = _GatheringFuture(children, loop=loop)
838    return outer
839
840
841def shield(arg):
842    """Wait for a future, shielding it from cancellation.
843
844    The statement
845
846        task = asyncio.create_task(something())
847        res = await shield(task)
848
849    is exactly equivalent to the statement
850
851        res = await something()
852
853    *except* that if the coroutine containing it is cancelled, the
854    task running in something() is not cancelled.  From the POV of
855    something(), the cancellation did not happen.  But its caller is
856    still cancelled, so the yield-from expression still raises
857    CancelledError.  Note: If something() is cancelled by other means
858    this will still cancel shield().
859
860    If you want to completely ignore cancellation (not recommended)
861    you can combine shield() with a try/except clause, as follows:
862
863        task = asyncio.create_task(something())
864        try:
865            res = await shield(task)
866        except CancelledError:
867            res = None
868
869    Save a reference to tasks passed to this function, to avoid
870    a task disappearing mid-execution. The event loop only keeps
871    weak references to tasks. A task that isn't referenced elsewhere
872    may get garbage collected at any time, even before it's done.
873    """
874    inner = _ensure_future(arg)
875    if inner.done():
876        # Shortcut.
877        return inner
878    loop = futures._get_loop(inner)
879    outer = loop.create_future()
880
881    def _inner_done_callback(inner):
882        if outer.cancelled():
883            if not inner.cancelled():
884                # Mark inner's result as retrieved.
885                inner.exception()
886            return
887
888        if inner.cancelled():
889            outer.cancel()
890        else:
891            exc = inner.exception()
892            if exc is not None:
893                outer.set_exception(exc)
894            else:
895                outer.set_result(inner.result())
896
897
898    def _outer_done_callback(outer):
899        if not inner.done():
900            inner.remove_done_callback(_inner_done_callback)
901
902    inner.add_done_callback(_inner_done_callback)
903    outer.add_done_callback(_outer_done_callback)
904    return outer
905
906
907def run_coroutine_threadsafe(coro, loop):
908    """Submit a coroutine object to a given event loop.
909
910    Return a concurrent.futures.Future to access the result.
911    """
912    if not coroutines.iscoroutine(coro):
913        raise TypeError('A coroutine object is required')
914    future = concurrent.futures.Future()
915
916    def callback():
917        try:
918            futures._chain_future(ensure_future(coro, loop=loop), future)
919        except (SystemExit, KeyboardInterrupt):
920            raise
921        except BaseException as exc:
922            if future.set_running_or_notify_cancel():
923                future.set_exception(exc)
924            raise
925
926    loop.call_soon_threadsafe(callback)
927    return future
928
929
930# WeakSet containing all alive tasks.
931_all_tasks = weakref.WeakSet()
932
933# Dictionary containing tasks that are currently active in
934# all running event loops.  {EventLoop: Task}
935_current_tasks = {}
936
937
938def _register_task(task):
939    """Register a new task in asyncio as executed by loop."""
940    _all_tasks.add(task)
941
942
943def _enter_task(loop, task):
944    current_task = _current_tasks.get(loop)
945    if current_task is not None:
946        raise RuntimeError(f"Cannot enter into task {task!r} while another "
947                           f"task {current_task!r} is being executed.")
948    _current_tasks[loop] = task
949
950
951def _leave_task(loop, task):
952    current_task = _current_tasks.get(loop)
953    if current_task is not task:
954        raise RuntimeError(f"Leaving task {task!r} does not match "
955                           f"the current task {current_task!r}.")
956    del _current_tasks[loop]
957
958
959def _unregister_task(task):
960    """Unregister a task."""
961    _all_tasks.discard(task)
962
963
964_py_register_task = _register_task
965_py_unregister_task = _unregister_task
966_py_enter_task = _enter_task
967_py_leave_task = _leave_task
968
969
970try:
971    from _asyncio import (_register_task, _unregister_task,
972                          _enter_task, _leave_task,
973                          _all_tasks, _current_tasks)
974except ImportError:
975    pass
976else:
977    _c_register_task = _register_task
978    _c_unregister_task = _unregister_task
979    _c_enter_task = _enter_task
980    _c_leave_task = _leave_task
981