1"""Support for tasks, coroutines and the scheduler.""" 2 3__all__ = ( 4 'Task', 'create_task', 5 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 6 'wait', 'wait_for', 'as_completed', 'sleep', 7 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', 8 'current_task', 'all_tasks', 9 '_register_task', '_unregister_task', '_enter_task', '_leave_task', 10) 11 12import concurrent.futures 13import contextvars 14import functools 15import inspect 16import itertools 17import types 18import warnings 19import weakref 20from types import GenericAlias 21 22from . import base_tasks 23from . import coroutines 24from . import events 25from . import exceptions 26from . import futures 27from .coroutines import _is_coroutine 28 29# Helper to generate new task names 30# This uses itertools.count() instead of a "+= 1" operation because the latter 31# is not thread safe. See bpo-11866 for a longer explanation. 32_task_name_counter = itertools.count(1).__next__ 33 34 35def current_task(loop=None): 36 """Return a currently executed task.""" 37 if loop is None: 38 loop = events.get_running_loop() 39 return _current_tasks.get(loop) 40 41 42def all_tasks(loop=None): 43 """Return a set of all tasks for the loop.""" 44 if loop is None: 45 loop = events.get_running_loop() 46 # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another 47 # thread while we do so. Therefore we cast it to list prior to filtering. The list 48 # cast itself requires iteration, so we repeat it several times ignoring 49 # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for 50 # details. 51 i = 0 52 while True: 53 try: 54 tasks = list(_all_tasks) 55 except RuntimeError: 56 i += 1 57 if i >= 1000: 58 raise 59 else: 60 break 61 return {t for t in tasks 62 if futures._get_loop(t) is loop and not t.done()} 63 64 65def _set_task_name(task, name): 66 if name is not None: 67 try: 68 set_name = task.set_name 69 except AttributeError: 70 warnings.warn("Task.set_name() was added in Python 3.8, " 71 "the method support will be mandatory for third-party " 72 "task implementations since 3.13.", 73 DeprecationWarning, stacklevel=3) 74 else: 75 set_name(name) 76 77 78class Task(futures._PyFuture): # Inherit Python Task implementation 79 # from a Python Future implementation. 80 81 """A coroutine wrapped in a Future.""" 82 83 # An important invariant maintained while a Task not done: 84 # 85 # - Either _fut_waiter is None, and _step() is scheduled; 86 # - or _fut_waiter is some Future, and _step() is *not* scheduled. 87 # 88 # The only transition from the latter to the former is through 89 # _wakeup(). When _fut_waiter is not None, one of its callbacks 90 # must be _wakeup(). 91 92 # If False, don't log a message if the task is destroyed whereas its 93 # status is still pending 94 _log_destroy_pending = True 95 96 def __init__(self, coro, *, loop=None, name=None, context=None): 97 super().__init__(loop=loop) 98 if self._source_traceback: 99 del self._source_traceback[-1] 100 if not coroutines.iscoroutine(coro): 101 # raise after Future.__init__(), attrs are required for __del__ 102 # prevent logging for pending task in __del__ 103 self._log_destroy_pending = False 104 raise TypeError(f"a coroutine was expected, got {coro!r}") 105 106 if name is None: 107 self._name = f'Task-{_task_name_counter()}' 108 else: 109 self._name = str(name) 110 111 self._num_cancels_requested = 0 112 self._must_cancel = False 113 self._fut_waiter = None 114 self._coro = coro 115 if context is None: 116 self._context = contextvars.copy_context() 117 else: 118 self._context = context 119 120 self._loop.call_soon(self.__step, context=self._context) 121 _register_task(self) 122 123 def __del__(self): 124 if self._state == futures._PENDING and self._log_destroy_pending: 125 context = { 126 'task': self, 127 'message': 'Task was destroyed but it is pending!', 128 } 129 if self._source_traceback: 130 context['source_traceback'] = self._source_traceback 131 self._loop.call_exception_handler(context) 132 super().__del__() 133 134 __class_getitem__ = classmethod(GenericAlias) 135 136 def __repr__(self): 137 return base_tasks._task_repr(self) 138 139 def get_coro(self): 140 return self._coro 141 142 def get_name(self): 143 return self._name 144 145 def set_name(self, value): 146 self._name = str(value) 147 148 def set_result(self, result): 149 raise RuntimeError('Task does not support set_result operation') 150 151 def set_exception(self, exception): 152 raise RuntimeError('Task does not support set_exception operation') 153 154 def get_stack(self, *, limit=None): 155 """Return the list of stack frames for this task's coroutine. 156 157 If the coroutine is not done, this returns the stack where it is 158 suspended. If the coroutine has completed successfully or was 159 cancelled, this returns an empty list. If the coroutine was 160 terminated by an exception, this returns the list of traceback 161 frames. 162 163 The frames are always ordered from oldest to newest. 164 165 The optional limit gives the maximum number of frames to 166 return; by default all available frames are returned. Its 167 meaning differs depending on whether a stack or a traceback is 168 returned: the newest frames of a stack are returned, but the 169 oldest frames of a traceback are returned. (This matches the 170 behavior of the traceback module.) 171 172 For reasons beyond our control, only one stack frame is 173 returned for a suspended coroutine. 174 """ 175 return base_tasks._task_get_stack(self, limit) 176 177 def print_stack(self, *, limit=None, file=None): 178 """Print the stack or traceback for this task's coroutine. 179 180 This produces output similar to that of the traceback module, 181 for the frames retrieved by get_stack(). The limit argument 182 is passed to get_stack(). The file argument is an I/O stream 183 to which the output is written; by default output is written 184 to sys.stderr. 185 """ 186 return base_tasks._task_print_stack(self, limit, file) 187 188 def cancel(self, msg=None): 189 """Request that this task cancel itself. 190 191 This arranges for a CancelledError to be thrown into the 192 wrapped coroutine on the next cycle through the event loop. 193 The coroutine then has a chance to clean up or even deny 194 the request using try/except/finally. 195 196 Unlike Future.cancel, this does not guarantee that the 197 task will be cancelled: the exception might be caught and 198 acted upon, delaying cancellation of the task or preventing 199 cancellation completely. The task may also return a value or 200 raise a different exception. 201 202 Immediately after this method is called, Task.cancelled() will 203 not return True (unless the task was already cancelled). A 204 task will be marked as cancelled when the wrapped coroutine 205 terminates with a CancelledError exception (even if cancel() 206 was not called). 207 208 This also increases the task's count of cancellation requests. 209 """ 210 self._log_traceback = False 211 if self.done(): 212 return False 213 self._num_cancels_requested += 1 214 # These two lines are controversial. See discussion starting at 215 # https://github.com/python/cpython/pull/31394#issuecomment-1053545331 216 # Also remember that this is duplicated in _asynciomodule.c. 217 # if self._num_cancels_requested > 1: 218 # return False 219 if self._fut_waiter is not None: 220 if self._fut_waiter.cancel(msg=msg): 221 # Leave self._fut_waiter; it may be a Task that 222 # catches and ignores the cancellation so we may have 223 # to cancel it again later. 224 return True 225 # It must be the case that self.__step is already scheduled. 226 self._must_cancel = True 227 self._cancel_message = msg 228 return True 229 230 def cancelling(self): 231 """Return the count of the task's cancellation requests. 232 233 This count is incremented when .cancel() is called 234 and may be decremented using .uncancel(). 235 """ 236 return self._num_cancels_requested 237 238 def uncancel(self): 239 """Decrement the task's count of cancellation requests. 240 241 This should be called by the party that called `cancel()` on the task 242 beforehand. 243 244 Returns the remaining number of cancellation requests. 245 """ 246 if self._num_cancels_requested > 0: 247 self._num_cancels_requested -= 1 248 return self._num_cancels_requested 249 250 def __step(self, exc=None): 251 if self.done(): 252 raise exceptions.InvalidStateError( 253 f'_step(): already done: {self!r}, {exc!r}') 254 if self._must_cancel: 255 if not isinstance(exc, exceptions.CancelledError): 256 exc = self._make_cancelled_error() 257 self._must_cancel = False 258 coro = self._coro 259 self._fut_waiter = None 260 261 _enter_task(self._loop, self) 262 # Call either coro.throw(exc) or coro.send(None). 263 try: 264 if exc is None: 265 # We use the `send` method directly, because coroutines 266 # don't have `__iter__` and `__next__` methods. 267 result = coro.send(None) 268 else: 269 result = coro.throw(exc) 270 except StopIteration as exc: 271 if self._must_cancel: 272 # Task is cancelled right before coro stops. 273 self._must_cancel = False 274 super().cancel(msg=self._cancel_message) 275 else: 276 super().set_result(exc.value) 277 except exceptions.CancelledError as exc: 278 # Save the original exception so we can chain it later. 279 self._cancelled_exc = exc 280 super().cancel() # I.e., Future.cancel(self). 281 except (KeyboardInterrupt, SystemExit) as exc: 282 super().set_exception(exc) 283 raise 284 except BaseException as exc: 285 super().set_exception(exc) 286 else: 287 blocking = getattr(result, '_asyncio_future_blocking', None) 288 if blocking is not None: 289 # Yielded Future must come from Future.__iter__(). 290 if futures._get_loop(result) is not self._loop: 291 new_exc = RuntimeError( 292 f'Task {self!r} got Future ' 293 f'{result!r} attached to a different loop') 294 self._loop.call_soon( 295 self.__step, new_exc, context=self._context) 296 elif blocking: 297 if result is self: 298 new_exc = RuntimeError( 299 f'Task cannot await on itself: {self!r}') 300 self._loop.call_soon( 301 self.__step, new_exc, context=self._context) 302 else: 303 result._asyncio_future_blocking = False 304 result.add_done_callback( 305 self.__wakeup, context=self._context) 306 self._fut_waiter = result 307 if self._must_cancel: 308 if self._fut_waiter.cancel( 309 msg=self._cancel_message): 310 self._must_cancel = False 311 else: 312 new_exc = RuntimeError( 313 f'yield was used instead of yield from ' 314 f'in task {self!r} with {result!r}') 315 self._loop.call_soon( 316 self.__step, new_exc, context=self._context) 317 318 elif result is None: 319 # Bare yield relinquishes control for one event loop iteration. 320 self._loop.call_soon(self.__step, context=self._context) 321 elif inspect.isgenerator(result): 322 # Yielding a generator is just wrong. 323 new_exc = RuntimeError( 324 f'yield was used instead of yield from for ' 325 f'generator in task {self!r} with {result!r}') 326 self._loop.call_soon( 327 self.__step, new_exc, context=self._context) 328 else: 329 # Yielding something else is an error. 330 new_exc = RuntimeError(f'Task got bad yield: {result!r}') 331 self._loop.call_soon( 332 self.__step, new_exc, context=self._context) 333 finally: 334 _leave_task(self._loop, self) 335 self = None # Needed to break cycles when an exception occurs. 336 337 def __wakeup(self, future): 338 try: 339 future.result() 340 except BaseException as exc: 341 # This may also be a cancellation. 342 self.__step(exc) 343 else: 344 # Don't pass the value of `future.result()` explicitly, 345 # as `Future.__iter__` and `Future.__await__` don't need it. 346 # If we call `_step(value, None)` instead of `_step()`, 347 # Python eval loop would use `.send(value)` method call, 348 # instead of `__next__()`, which is slower for futures 349 # that return non-generator iterators from their `__iter__`. 350 self.__step() 351 self = None # Needed to break cycles when an exception occurs. 352 353 354_PyTask = Task 355 356 357try: 358 import _asyncio 359except ImportError: 360 pass 361else: 362 # _CTask is needed for tests. 363 Task = _CTask = _asyncio.Task 364 365 366def create_task(coro, *, name=None, context=None): 367 """Schedule the execution of a coroutine object in a spawn task. 368 369 Return a Task object. 370 """ 371 loop = events.get_running_loop() 372 if context is None: 373 # Use legacy API if context is not needed 374 task = loop.create_task(coro) 375 else: 376 task = loop.create_task(coro, context=context) 377 378 _set_task_name(task, name) 379 return task 380 381 382# wait() and as_completed() similar to those in PEP 3148. 383 384FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED 385FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION 386ALL_COMPLETED = concurrent.futures.ALL_COMPLETED 387 388 389async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED): 390 """Wait for the Futures or Tasks given by fs to complete. 391 392 The fs iterable must not be empty. 393 394 Coroutines will be wrapped in Tasks. 395 396 Returns two sets of Future: (done, pending). 397 398 Usage: 399 400 done, pending = await asyncio.wait(fs) 401 402 Note: This does not raise TimeoutError! Futures that aren't done 403 when the timeout occurs are returned in the second set. 404 """ 405 if futures.isfuture(fs) or coroutines.iscoroutine(fs): 406 raise TypeError(f"expect a list of futures, not {type(fs).__name__}") 407 if not fs: 408 raise ValueError('Set of Tasks/Futures is empty.') 409 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): 410 raise ValueError(f'Invalid return_when value: {return_when}') 411 412 fs = set(fs) 413 414 if any(coroutines.iscoroutine(f) for f in fs): 415 raise TypeError("Passing coroutines is forbidden, use tasks explicitly.") 416 417 loop = events.get_running_loop() 418 return await _wait(fs, timeout, return_when, loop) 419 420 421def _release_waiter(waiter, *args): 422 if not waiter.done(): 423 waiter.set_result(None) 424 425 426async def wait_for(fut, timeout): 427 """Wait for the single Future or coroutine to complete, with timeout. 428 429 Coroutine will be wrapped in Task. 430 431 Returns result of the Future or coroutine. When a timeout occurs, 432 it cancels the task and raises TimeoutError. To avoid the task 433 cancellation, wrap it in shield(). 434 435 If the wait is cancelled, the task is also cancelled. 436 437 This function is a coroutine. 438 """ 439 loop = events.get_running_loop() 440 441 if timeout is None: 442 return await fut 443 444 if timeout <= 0: 445 fut = ensure_future(fut, loop=loop) 446 447 if fut.done(): 448 return fut.result() 449 450 await _cancel_and_wait(fut, loop=loop) 451 try: 452 return fut.result() 453 except exceptions.CancelledError as exc: 454 raise exceptions.TimeoutError() from exc 455 456 waiter = loop.create_future() 457 timeout_handle = loop.call_later(timeout, _release_waiter, waiter) 458 cb = functools.partial(_release_waiter, waiter) 459 460 fut = ensure_future(fut, loop=loop) 461 fut.add_done_callback(cb) 462 463 try: 464 # wait until the future completes or the timeout 465 try: 466 await waiter 467 except exceptions.CancelledError: 468 if fut.done(): 469 return fut.result() 470 else: 471 fut.remove_done_callback(cb) 472 # We must ensure that the task is not running 473 # after wait_for() returns. 474 # See https://bugs.python.org/issue32751 475 await _cancel_and_wait(fut, loop=loop) 476 raise 477 478 if fut.done(): 479 return fut.result() 480 else: 481 fut.remove_done_callback(cb) 482 # We must ensure that the task is not running 483 # after wait_for() returns. 484 # See https://bugs.python.org/issue32751 485 await _cancel_and_wait(fut, loop=loop) 486 # In case task cancellation failed with some 487 # exception, we should re-raise it 488 # See https://bugs.python.org/issue40607 489 try: 490 return fut.result() 491 except exceptions.CancelledError as exc: 492 raise exceptions.TimeoutError() from exc 493 finally: 494 timeout_handle.cancel() 495 496 497async def _wait(fs, timeout, return_when, loop): 498 """Internal helper for wait(). 499 500 The fs argument must be a collection of Futures. 501 """ 502 assert fs, 'Set of Futures is empty.' 503 waiter = loop.create_future() 504 timeout_handle = None 505 if timeout is not None: 506 timeout_handle = loop.call_later(timeout, _release_waiter, waiter) 507 counter = len(fs) 508 509 def _on_completion(f): 510 nonlocal counter 511 counter -= 1 512 if (counter <= 0 or 513 return_when == FIRST_COMPLETED or 514 return_when == FIRST_EXCEPTION and (not f.cancelled() and 515 f.exception() is not None)): 516 if timeout_handle is not None: 517 timeout_handle.cancel() 518 if not waiter.done(): 519 waiter.set_result(None) 520 521 for f in fs: 522 f.add_done_callback(_on_completion) 523 524 try: 525 await waiter 526 finally: 527 if timeout_handle is not None: 528 timeout_handle.cancel() 529 for f in fs: 530 f.remove_done_callback(_on_completion) 531 532 done, pending = set(), set() 533 for f in fs: 534 if f.done(): 535 done.add(f) 536 else: 537 pending.add(f) 538 return done, pending 539 540 541async def _cancel_and_wait(fut, loop): 542 """Cancel the *fut* future or task and wait until it completes.""" 543 544 waiter = loop.create_future() 545 cb = functools.partial(_release_waiter, waiter) 546 fut.add_done_callback(cb) 547 548 try: 549 fut.cancel() 550 # We cannot wait on *fut* directly to make 551 # sure _cancel_and_wait itself is reliably cancellable. 552 await waiter 553 finally: 554 fut.remove_done_callback(cb) 555 556 557# This is *not* a @coroutine! It is just an iterator (yielding Futures). 558def as_completed(fs, *, timeout=None): 559 """Return an iterator whose values are coroutines. 560 561 When waiting for the yielded coroutines you'll get the results (or 562 exceptions!) of the original Futures (or coroutines), in the order 563 in which and as soon as they complete. 564 565 This differs from PEP 3148; the proper way to use this is: 566 567 for f in as_completed(fs): 568 result = await f # The 'await' may raise. 569 # Use result. 570 571 If a timeout is specified, the 'await' will raise 572 TimeoutError when the timeout occurs before all Futures are done. 573 574 Note: The futures 'f' are not necessarily members of fs. 575 """ 576 if futures.isfuture(fs) or coroutines.iscoroutine(fs): 577 raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}") 578 579 from .queues import Queue # Import here to avoid circular import problem. 580 done = Queue() 581 582 loop = events._get_event_loop() 583 todo = {ensure_future(f, loop=loop) for f in set(fs)} 584 timeout_handle = None 585 586 def _on_timeout(): 587 for f in todo: 588 f.remove_done_callback(_on_completion) 589 done.put_nowait(None) # Queue a dummy value for _wait_for_one(). 590 todo.clear() # Can't do todo.remove(f) in the loop. 591 592 def _on_completion(f): 593 if not todo: 594 return # _on_timeout() was here first. 595 todo.remove(f) 596 done.put_nowait(f) 597 if not todo and timeout_handle is not None: 598 timeout_handle.cancel() 599 600 async def _wait_for_one(): 601 f = await done.get() 602 if f is None: 603 # Dummy value from _on_timeout(). 604 raise exceptions.TimeoutError 605 return f.result() # May raise f.exception(). 606 607 for f in todo: 608 f.add_done_callback(_on_completion) 609 if todo and timeout is not None: 610 timeout_handle = loop.call_later(timeout, _on_timeout) 611 for _ in range(len(todo)): 612 yield _wait_for_one() 613 614 615@types.coroutine 616def __sleep0(): 617 """Skip one event loop run cycle. 618 619 This is a private helper for 'asyncio.sleep()', used 620 when the 'delay' is set to 0. It uses a bare 'yield' 621 expression (which Task.__step knows how to handle) 622 instead of creating a Future object. 623 """ 624 yield 625 626 627async def sleep(delay, result=None): 628 """Coroutine that completes after a given time (in seconds).""" 629 if delay <= 0: 630 await __sleep0() 631 return result 632 633 loop = events.get_running_loop() 634 future = loop.create_future() 635 h = loop.call_later(delay, 636 futures._set_result_unless_cancelled, 637 future, result) 638 try: 639 return await future 640 finally: 641 h.cancel() 642 643 644def ensure_future(coro_or_future, *, loop=None): 645 """Wrap a coroutine or an awaitable in a future. 646 647 If the argument is a Future, it is returned directly. 648 """ 649 return _ensure_future(coro_or_future, loop=loop) 650 651 652def _ensure_future(coro_or_future, *, loop=None): 653 if futures.isfuture(coro_or_future): 654 if loop is not None and loop is not futures._get_loop(coro_or_future): 655 raise ValueError('The future belongs to a different loop than ' 656 'the one specified as the loop argument') 657 return coro_or_future 658 called_wrap_awaitable = False 659 if not coroutines.iscoroutine(coro_or_future): 660 if inspect.isawaitable(coro_or_future): 661 coro_or_future = _wrap_awaitable(coro_or_future) 662 called_wrap_awaitable = True 663 else: 664 raise TypeError('An asyncio.Future, a coroutine or an awaitable ' 665 'is required') 666 667 if loop is None: 668 loop = events._get_event_loop(stacklevel=4) 669 try: 670 return loop.create_task(coro_or_future) 671 except RuntimeError: 672 if not called_wrap_awaitable: 673 coro_or_future.close() 674 raise 675 676 677@types.coroutine 678def _wrap_awaitable(awaitable): 679 """Helper for asyncio.ensure_future(). 680 681 Wraps awaitable (an object with __await__) into a coroutine 682 that will later be wrapped in a Task by ensure_future(). 683 """ 684 return (yield from awaitable.__await__()) 685 686_wrap_awaitable._is_coroutine = _is_coroutine 687 688 689class _GatheringFuture(futures.Future): 690 """Helper for gather(). 691 692 This overrides cancel() to cancel all the children and act more 693 like Task.cancel(), which doesn't immediately mark itself as 694 cancelled. 695 """ 696 697 def __init__(self, children, *, loop): 698 assert loop is not None 699 super().__init__(loop=loop) 700 self._children = children 701 self._cancel_requested = False 702 703 def cancel(self, msg=None): 704 if self.done(): 705 return False 706 ret = False 707 for child in self._children: 708 if child.cancel(msg=msg): 709 ret = True 710 if ret: 711 # If any child tasks were actually cancelled, we should 712 # propagate the cancellation request regardless of 713 # *return_exceptions* argument. See issue 32684. 714 self._cancel_requested = True 715 return ret 716 717 718def gather(*coros_or_futures, return_exceptions=False): 719 """Return a future aggregating results from the given coroutines/futures. 720 721 Coroutines will be wrapped in a future and scheduled in the event 722 loop. They will not necessarily be scheduled in the same order as 723 passed in. 724 725 All futures must share the same event loop. If all the tasks are 726 done successfully, the returned future's result is the list of 727 results (in the order of the original sequence, not necessarily 728 the order of results arrival). If *return_exceptions* is True, 729 exceptions in the tasks are treated the same as successful 730 results, and gathered in the result list; otherwise, the first 731 raised exception will be immediately propagated to the returned 732 future. 733 734 Cancellation: if the outer Future is cancelled, all children (that 735 have not completed yet) are also cancelled. If any child is 736 cancelled, this is treated as if it raised CancelledError -- 737 the outer Future is *not* cancelled in this case. (This is to 738 prevent the cancellation of one child to cause other children to 739 be cancelled.) 740 741 If *return_exceptions* is False, cancelling gather() after it 742 has been marked done won't cancel any submitted awaitables. 743 For instance, gather can be marked done after propagating an 744 exception to the caller, therefore, calling ``gather.cancel()`` 745 after catching an exception (raised by one of the awaitables) from 746 gather won't cancel any other awaitables. 747 """ 748 if not coros_or_futures: 749 loop = events._get_event_loop() 750 outer = loop.create_future() 751 outer.set_result([]) 752 return outer 753 754 def _done_callback(fut): 755 nonlocal nfinished 756 nfinished += 1 757 758 if outer is None or outer.done(): 759 if not fut.cancelled(): 760 # Mark exception retrieved. 761 fut.exception() 762 return 763 764 if not return_exceptions: 765 if fut.cancelled(): 766 # Check if 'fut' is cancelled first, as 767 # 'fut.exception()' will *raise* a CancelledError 768 # instead of returning it. 769 exc = fut._make_cancelled_error() 770 outer.set_exception(exc) 771 return 772 else: 773 exc = fut.exception() 774 if exc is not None: 775 outer.set_exception(exc) 776 return 777 778 if nfinished == nfuts: 779 # All futures are done; create a list of results 780 # and set it to the 'outer' future. 781 results = [] 782 783 for fut in children: 784 if fut.cancelled(): 785 # Check if 'fut' is cancelled first, as 'fut.exception()' 786 # will *raise* a CancelledError instead of returning it. 787 # Also, since we're adding the exception return value 788 # to 'results' instead of raising it, don't bother 789 # setting __context__. This also lets us preserve 790 # calling '_make_cancelled_error()' at most once. 791 res = exceptions.CancelledError( 792 '' if fut._cancel_message is None else 793 fut._cancel_message) 794 else: 795 res = fut.exception() 796 if res is None: 797 res = fut.result() 798 results.append(res) 799 800 if outer._cancel_requested: 801 # If gather is being cancelled we must propagate the 802 # cancellation regardless of *return_exceptions* argument. 803 # See issue 32684. 804 exc = fut._make_cancelled_error() 805 outer.set_exception(exc) 806 else: 807 outer.set_result(results) 808 809 arg_to_fut = {} 810 children = [] 811 nfuts = 0 812 nfinished = 0 813 loop = None 814 outer = None # bpo-46672 815 for arg in coros_or_futures: 816 if arg not in arg_to_fut: 817 fut = _ensure_future(arg, loop=loop) 818 if loop is None: 819 loop = futures._get_loop(fut) 820 if fut is not arg: 821 # 'arg' was not a Future, therefore, 'fut' is a new 822 # Future created specifically for 'arg'. Since the caller 823 # can't control it, disable the "destroy pending task" 824 # warning. 825 fut._log_destroy_pending = False 826 827 nfuts += 1 828 arg_to_fut[arg] = fut 829 fut.add_done_callback(_done_callback) 830 831 else: 832 # There's a duplicate Future object in coros_or_futures. 833 fut = arg_to_fut[arg] 834 835 children.append(fut) 836 837 outer = _GatheringFuture(children, loop=loop) 838 return outer 839 840 841def shield(arg): 842 """Wait for a future, shielding it from cancellation. 843 844 The statement 845 846 task = asyncio.create_task(something()) 847 res = await shield(task) 848 849 is exactly equivalent to the statement 850 851 res = await something() 852 853 *except* that if the coroutine containing it is cancelled, the 854 task running in something() is not cancelled. From the POV of 855 something(), the cancellation did not happen. But its caller is 856 still cancelled, so the yield-from expression still raises 857 CancelledError. Note: If something() is cancelled by other means 858 this will still cancel shield(). 859 860 If you want to completely ignore cancellation (not recommended) 861 you can combine shield() with a try/except clause, as follows: 862 863 task = asyncio.create_task(something()) 864 try: 865 res = await shield(task) 866 except CancelledError: 867 res = None 868 869 Save a reference to tasks passed to this function, to avoid 870 a task disappearing mid-execution. The event loop only keeps 871 weak references to tasks. A task that isn't referenced elsewhere 872 may get garbage collected at any time, even before it's done. 873 """ 874 inner = _ensure_future(arg) 875 if inner.done(): 876 # Shortcut. 877 return inner 878 loop = futures._get_loop(inner) 879 outer = loop.create_future() 880 881 def _inner_done_callback(inner): 882 if outer.cancelled(): 883 if not inner.cancelled(): 884 # Mark inner's result as retrieved. 885 inner.exception() 886 return 887 888 if inner.cancelled(): 889 outer.cancel() 890 else: 891 exc = inner.exception() 892 if exc is not None: 893 outer.set_exception(exc) 894 else: 895 outer.set_result(inner.result()) 896 897 898 def _outer_done_callback(outer): 899 if not inner.done(): 900 inner.remove_done_callback(_inner_done_callback) 901 902 inner.add_done_callback(_inner_done_callback) 903 outer.add_done_callback(_outer_done_callback) 904 return outer 905 906 907def run_coroutine_threadsafe(coro, loop): 908 """Submit a coroutine object to a given event loop. 909 910 Return a concurrent.futures.Future to access the result. 911 """ 912 if not coroutines.iscoroutine(coro): 913 raise TypeError('A coroutine object is required') 914 future = concurrent.futures.Future() 915 916 def callback(): 917 try: 918 futures._chain_future(ensure_future(coro, loop=loop), future) 919 except (SystemExit, KeyboardInterrupt): 920 raise 921 except BaseException as exc: 922 if future.set_running_or_notify_cancel(): 923 future.set_exception(exc) 924 raise 925 926 loop.call_soon_threadsafe(callback) 927 return future 928 929 930# WeakSet containing all alive tasks. 931_all_tasks = weakref.WeakSet() 932 933# Dictionary containing tasks that are currently active in 934# all running event loops. {EventLoop: Task} 935_current_tasks = {} 936 937 938def _register_task(task): 939 """Register a new task in asyncio as executed by loop.""" 940 _all_tasks.add(task) 941 942 943def _enter_task(loop, task): 944 current_task = _current_tasks.get(loop) 945 if current_task is not None: 946 raise RuntimeError(f"Cannot enter into task {task!r} while another " 947 f"task {current_task!r} is being executed.") 948 _current_tasks[loop] = task 949 950 951def _leave_task(loop, task): 952 current_task = _current_tasks.get(loop) 953 if current_task is not task: 954 raise RuntimeError(f"Leaving task {task!r} does not match " 955 f"the current task {current_task!r}.") 956 del _current_tasks[loop] 957 958 959def _unregister_task(task): 960 """Unregister a task.""" 961 _all_tasks.discard(task) 962 963 964_py_register_task = _register_task 965_py_unregister_task = _unregister_task 966_py_enter_task = _enter_task 967_py_leave_task = _leave_task 968 969 970try: 971 from _asyncio import (_register_task, _unregister_task, 972 _enter_task, _leave_task, 973 _all_tasks, _current_tasks) 974except ImportError: 975 pass 976else: 977 _c_register_task = _register_task 978 _c_unregister_task = _unregister_task 979 _c_enter_task = _enter_task 980 _c_leave_task = _leave_task 981